From 45adf7e35175228821cc8f84a63855b851e25d55 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Fri, 31 Mar 2017 11:10:45 +0800 Subject: [PATCH 1/6] recreate pr --- .../scala/org/apache/spark/ml/Predictor.scala | 7 ++++++ .../classification/LogisticRegression.scala | 11 ++------- .../spark/ml/classification/OneVsRest.scala | 23 ++++++------------- .../apache/spark/ml/clustering/KMeans.scala | 12 ++++------ .../ml/regression/AFTSurvivalRegression.scala | 2 +- .../ml/regression/IsotonicRegression.scala | 2 +- .../ml/regression/LinearRegression.scala | 1 - .../classification/LogisticRegression.scala | 4 ++-- 8 files changed, 24 insertions(+), 38 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala index 08b0cb9b8f6a5..0d95a8f9608be 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala @@ -27,6 +27,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DataType, DoubleType, StructType} +import org.apache.spark.storage.StorageLevel /** * (private[ml]) Trait for parameters for prediction (regression and classification). @@ -85,6 +86,10 @@ abstract class Predictor[ M <: PredictionModel[FeaturesType, M]] extends Estimator[M] with PredictorParams { + protected[spark] var storageLevel = StorageLevel.NONE + + protected def handlePersistence = storageLevel == StorageLevel.NONE + /** @group setParam */ def setLabelCol(value: String): Learner = set(labelCol, value).asInstanceOf[Learner] @@ -99,6 +104,8 @@ abstract class Predictor[ // Developers only need to implement train(). transformSchema(dataset.schema, logging = true) + storageLevel = dataset.storageLevel + // Cast LabelCol to DoubleType and keep the metadata. val labelMeta = dataset.schema($(labelCol)).metadata val labelCasted = dataset.withColumn($(labelCol), col($(labelCol)).cast(DoubleType), labelMeta) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index f491a679b2422..7b4f988a4340c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -483,14 +483,7 @@ class LogisticRegression @Since("1.2.0") ( this } - override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = { - val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE - train(dataset, handlePersistence) - } - - protected[spark] def train( - dataset: Dataset[_], - handlePersistence: Boolean): LogisticRegressionModel = { + protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = { val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map { @@ -500,7 +493,7 @@ class LogisticRegression @Since("1.2.0") ( if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) - val instr = Instrumentation.create(this, instances) + val instr = Instrumentation.create(this, dataset) instr.logParams(regParam, elasticNetParam, standardization, threshold, maxIter, tol, fitIntercept) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index 05b8c3ab5456e..5afdc3a556a42 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -17,10 +17,8 @@ package org.apache.spark.ml.classification -import java.util.{List => JList} import java.util.UUID -import scala.collection.JavaConverters._ import scala.language.existentials import org.apache.hadoop.fs.Path @@ -164,10 +162,8 @@ final class OneVsRestModel private[ml] ( val newDataset = dataset.withColumn(accColName, initUDF()) // persist if underlying dataset is not persistent. - val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE - if (handlePersistence) { - newDataset.persist(StorageLevel.MEMORY_AND_DISK) - } + val handlePersistence = dataset.storageLevel == StorageLevel.NONE + if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK) // update the accumulator column with the result of prediction of models val aggregatedDataset = models.zipWithIndex.foldLeft[DataFrame](newDataset) { @@ -190,9 +186,7 @@ final class OneVsRestModel private[ml] ( updatedDataset.select(newColumns: _*).withColumnRenamed(tmpColName, accColName) } - if (handlePersistence) { - newDataset.unpersist() - } + if (handlePersistence) newDataset.unpersist() // output the index of the classifier with highest confidence as prediction val labelUDF = udf { (predictions: Map[Int, Double]) => @@ -347,10 +341,9 @@ final class OneVsRest @Since("1.4.0") ( } // persist if underlying dataset is not persistent. - val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE - if (handlePersistence) { - multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK) - } + val handlePersistence = dataset.storageLevel == StorageLevel.NONE + if (handlePersistence) multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK) + // create k columns, one for each binary classifier. val models = Range(0, numClasses).par.map { index => @@ -374,9 +367,7 @@ final class OneVsRest @Since("1.4.0") ( }.toArray[ClassificationModel[_, _]] instr.logNumFeatures(models.head.numFeatures) - if (handlePersistence) { - multiclassLabeled.unpersist() - } + if (handlePersistence) multiclassLabeled.unpersist() // extract label metadata from label column if present, or create a nominal attribute // to output the number of labels diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index e02b532ca8a93..84b938fbd487d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -304,16 +304,14 @@ class KMeans @Since("1.5.0") ( override def fit(dataset: Dataset[_]): KMeansModel = { transformSchema(dataset.schema, logging = true) - val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE + val handlePersistence = dataset.storageLevel == StorageLevel.NONE val instances: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => OldVectors.fromML(point) } - if (handlePersistence) { - instances.persist(StorageLevel.MEMORY_AND_DISK) - } + if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) - val instr = Instrumentation.create(this, instances) + val instr = Instrumentation.create(this, dataset) instr.logParams(featuresCol, predictionCol, k, initMode, initSteps, maxIter, seed, tol) val algo = new MLlibKMeans() .setK($(k)) @@ -329,9 +327,7 @@ class KMeans @Since("1.5.0") ( model.setSummary(Some(summary)) instr.logSuccess(model) - if (handlePersistence) { - instances.unpersist() - } + if (handlePersistence) instances.unpersist() model } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index 16821f317760e..4b46c3831d75f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -213,7 +213,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S override def fit(dataset: Dataset[_]): AFTSurvivalRegressionModel = { transformSchema(dataset.schema, logging = true) val instances = extractAFTPoints(dataset) - val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE + val handlePersistence = dataset.storageLevel == StorageLevel.NONE if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) val featuresSummarizer = { diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala index 529f66eadbcff..8faab52ea474b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala @@ -165,7 +165,7 @@ class IsotonicRegression @Since("1.5.0") (@Since("1.5.0") override val uid: Stri transformSchema(dataset.schema, logging = true) // Extract columns from data. If dataset is persisted, do not persist oldDataset. val instances = extractWeightedLabeledPoints(dataset) - val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE + val handlePersistence = dataset.storageLevel == StorageLevel.NONE if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) val instr = Instrumentation.create(this, dataset) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index ed431f550817e..15bc34bbc3fa9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -251,7 +251,6 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String return lrModel } - val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) val (featuresSummarizer, ySummarizer) = { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala index 4b650000736e2..7b6e8ba0ec88d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala @@ -448,9 +448,9 @@ class LogisticRegressionWithLBFGS val spark = SparkSession.builder().sparkContext(input.context).getOrCreate() val df = spark.createDataFrame(input.map(_.asML)) // Determine if we should cache the DF - val handlePersistence = input.getStorageLevel == StorageLevel.NONE + lr.storageLevel == input.getStorageLevel // Train our model - val mlLogisticRegressionModel = lr.train(df, handlePersistence) + val mlLogisticRegressionModel = lr.train(df) // convert the model val weights = Vectors.dense(mlLogisticRegressionModel.coefficients.toArray) createModel(weights, mlLogisticRegressionModel.intercept) From 208e1009037aed780ca194658a68dec2221db3bc Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Thu, 6 Jul 2017 13:06:04 +0800 Subject: [PATCH 2/6] del unused equation --- .../apache/spark/mllib/classification/LogisticRegression.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala index 7b6e8ba0ec88d..eb60103864769 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala @@ -447,8 +447,6 @@ class LogisticRegressionWithLBFGS // Convert our input into a DataFrame val spark = SparkSession.builder().sparkContext(input.context).getOrCreate() val df = spark.createDataFrame(input.map(_.asML)) - // Determine if we should cache the DF - lr.storageLevel == input.getStorageLevel // Train our model val mlLogisticRegressionModel = lr.train(df) // convert the model From 936d46634c69a8f0f8a546475edd3bfbd501d1e1 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Tue, 29 Aug 2017 17:52:28 +0800 Subject: [PATCH 3/6] revert Predictor --- mllib/src/main/scala/org/apache/spark/ml/Predictor.scala | 7 ------- .../spark/ml/classification/LogisticRegression.scala | 9 ++++++++- .../apache/spark/ml/regression/LinearRegression.scala | 1 + .../spark/mllib/classification/LogisticRegression.scala | 4 +++- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala index 0d95a8f9608be..08b0cb9b8f6a5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala @@ -27,7 +27,6 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DataType, DoubleType, StructType} -import org.apache.spark.storage.StorageLevel /** * (private[ml]) Trait for parameters for prediction (regression and classification). @@ -86,10 +85,6 @@ abstract class Predictor[ M <: PredictionModel[FeaturesType, M]] extends Estimator[M] with PredictorParams { - protected[spark] var storageLevel = StorageLevel.NONE - - protected def handlePersistence = storageLevel == StorageLevel.NONE - /** @group setParam */ def setLabelCol(value: String): Learner = set(labelCol, value).asInstanceOf[Learner] @@ -104,8 +99,6 @@ abstract class Predictor[ // Developers only need to implement train(). transformSchema(dataset.schema, logging = true) - storageLevel = dataset.storageLevel - // Cast LabelCol to DoubleType and keep the metadata. val labelMeta = dataset.schema($(labelCol)).metadata val labelCasted = dataset.withColumn($(labelCol), col($(labelCol)).cast(DoubleType), labelMeta) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 7b4f988a4340c..5868458a22190 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -483,7 +483,14 @@ class LogisticRegression @Since("1.2.0") ( this } - protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = { + override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = { + val handlePersistence = dataset.storageLevel == StorageLevel.NONE + train(dataset, handlePersistence) + } + + protected[spark] def train( + dataset: Dataset[_], + handlePersistence: Boolean): LogisticRegressionModel = { val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map { diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 15bc34bbc3fa9..b2a968118d1a9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -251,6 +251,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String return lrModel } + val handlePersistence = dataset.storageLevel == StorageLevel.NONE if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) val (featuresSummarizer, ySummarizer) = { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala index eb60103864769..4b650000736e2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala @@ -447,8 +447,10 @@ class LogisticRegressionWithLBFGS // Convert our input into a DataFrame val spark = SparkSession.builder().sparkContext(input.context).getOrCreate() val df = spark.createDataFrame(input.map(_.asML)) + // Determine if we should cache the DF + val handlePersistence = input.getStorageLevel == StorageLevel.NONE // Train our model - val mlLogisticRegressionModel = lr.train(df) + val mlLogisticRegressionModel = lr.train(df, handlePersistence) // convert the model val weights = Vectors.dense(mlLogisticRegressionModel.coefficients.toArray) createModel(weights, mlLogisticRegressionModel.intercept) From df4d263d21c3e4413b4d74aaabcd3a10348a0001 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Mon, 4 Sep 2017 13:27:56 +0800 Subject: [PATCH 4/6] create param HasHandlePersistence --- .../classification/LogisticRegression.scala | 20 ++++++------- .../spark/ml/classification/OneVsRest.scala | 29 ++++++++++++------- .../apache/spark/ml/clustering/KMeans.scala | 15 +++++++--- .../ml/param/shared/SharedParamsCodeGen.scala | 3 +- .../spark/ml/param/shared/sharedParams.scala | 17 +++++++++++ .../ml/regression/AFTSurvivalRegression.scala | 12 +++++--- .../ml/regression/IsotonicRegression.scala | 12 +++++--- .../ml/regression/LinearRegression.scala | 13 +++++---- 8 files changed, 81 insertions(+), 40 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 5868458a22190..ff48b3029a00b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -51,7 +51,8 @@ import org.apache.spark.util.VersionUtils */ private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol - with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth { + with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth + with HasHandlePersistence { import org.apache.spark.ml.classification.LogisticRegression.supportedFamilyNames @@ -431,6 +432,10 @@ class LogisticRegression @Since("1.2.0") ( @Since("2.2.0") def setUpperBoundsOnIntercepts(value: Vector): this.type = set(upperBoundsOnIntercepts, value) + /** @group setParam */ + @Since("2.3.0") + def setHandlePersistence(value: Boolean): this.type = set(handlePersistence, value) + private def assertBoundConstrainedOptimizationParamsValid( numCoefficientSets: Int, numFeatures: Int): Unit = { @@ -483,14 +488,7 @@ class LogisticRegression @Since("1.2.0") ( this } - override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = { - val handlePersistence = dataset.storageLevel == StorageLevel.NONE - train(dataset, handlePersistence) - } - - protected[spark] def train( - dataset: Dataset[_], - handlePersistence: Boolean): LogisticRegressionModel = { + protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = { val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map { @@ -498,7 +496,7 @@ class LogisticRegression @Since("1.2.0") ( Instance(label, weight, features) } - if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) + if ($(handlePersistence)) instances.persist(StorageLevel.MEMORY_AND_DISK) val instr = Instrumentation.create(this, dataset) instr.logParams(regParam, elasticNetParam, standardization, threshold, @@ -878,7 +876,7 @@ class LogisticRegression @Since("1.2.0") ( } } - if (handlePersistence) instances.unpersist() + if ($(handlePersistence)) instances.unpersist() val model = copyValues(new LogisticRegressionModel(uid, coefficientMatrix, interceptVector, numClasses, isMultinomial)) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index 5afdc3a556a42..0513932756cfc 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -32,7 +32,7 @@ import org.apache.spark.ml._ import org.apache.spark.ml.attribute._ import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params} -import org.apache.spark.ml.param.shared.HasWeightCol +import org.apache.spark.ml.param.shared.{HasHandlePersistence, HasWeightCol} import org.apache.spark.ml.util._ import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ @@ -53,7 +53,7 @@ private[ml] trait ClassifierTypeTrait { * Params for [[OneVsRest]]. */ private[ml] trait OneVsRestParams extends PredictorParams - with ClassifierTypeTrait with HasWeightCol { + with ClassifierTypeTrait with HasWeightCol with HasHandlePersistence { /** * param for the base binary classifier that we reduce multiclass classification into. @@ -65,6 +65,10 @@ private[ml] trait OneVsRestParams extends PredictorParams /** @group getParam */ def getClassifier: ClassifierType = $(classifier) + + /** @group setParam */ + @Since("2.3.0") + def setHandlePersistence(value: Boolean): this.type = set(handlePersistence, value) } private[ml] object OneVsRestParams extends ClassifierTypeTrait { @@ -161,9 +165,9 @@ final class OneVsRestModel private[ml] ( val initUDF = udf { () => Map[Int, Double]() } val newDataset = dataset.withColumn(accColName, initUDF()) - // persist if underlying dataset is not persistent. - val handlePersistence = dataset.storageLevel == StorageLevel.NONE - if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK) + if ($(handlePersistence)) { + newDataset.persist(StorageLevel.MEMORY_AND_DISK) + } // update the accumulator column with the result of prediction of models val aggregatedDataset = models.zipWithIndex.foldLeft[DataFrame](newDataset) { @@ -186,7 +190,9 @@ final class OneVsRestModel private[ml] ( updatedDataset.select(newColumns: _*).withColumnRenamed(tmpColName, accColName) } - if (handlePersistence) newDataset.unpersist() + if ($(handlePersistence)) { + newDataset.unpersist() + } // output the index of the classifier with highest confidence as prediction val labelUDF = udf { (predictions: Map[Int, Double]) => @@ -340,10 +346,9 @@ final class OneVsRest @Since("1.4.0") ( dataset.select($(labelCol), $(featuresCol)) } - // persist if underlying dataset is not persistent. - val handlePersistence = dataset.storageLevel == StorageLevel.NONE - if (handlePersistence) multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK) - + if ($(handlePersistence)) { + multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK) + } // create k columns, one for each binary classifier. val models = Range(0, numClasses).par.map { index => @@ -367,7 +372,9 @@ final class OneVsRest @Since("1.4.0") ( }.toArray[ClassificationModel[_, _]] instr.logNumFeatures(models.head.numFeatures) - if (handlePersistence) multiclassLabeled.unpersist() + if ($(handlePersistence)) { + multiclassLabeled.unpersist() + } // extract label metadata from label column if present, or create a nominal attribute // to output the number of labels diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 84b938fbd487d..3ea7f0b1594b9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -40,7 +40,7 @@ import org.apache.spark.util.VersionUtils.majorVersion * Common params for KMeans and KMeansModel */ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFeaturesCol - with HasSeed with HasPredictionCol with HasTol { + with HasSeed with HasPredictionCol with HasTol with HasHandlePersistence { /** * The number of clusters to create (k). Must be > 1. Note that it is possible for fewer than @@ -300,16 +300,21 @@ class KMeans @Since("1.5.0") ( @Since("1.5.0") def setSeed(value: Long): this.type = set(seed, value) + /** @group setParam */ + @Since("2.3.0") + def setHandlePersistence(value: Boolean): this.type = set(handlePersistence, value) + @Since("2.0.0") override def fit(dataset: Dataset[_]): KMeansModel = { transformSchema(dataset.schema, logging = true) - val handlePersistence = dataset.storageLevel == StorageLevel.NONE val instances: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => OldVectors.fromML(point) } - if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) + if ($(handlePersistence)) { + instances.persist(StorageLevel.MEMORY_AND_DISK) + } val instr = Instrumentation.create(this, dataset) instr.logParams(featuresCol, predictionCol, k, initMode, initSteps, maxIter, seed, tol) @@ -327,7 +332,9 @@ class KMeans @Since("1.5.0") ( model.setSummary(Some(summary)) instr.logSuccess(model) - if (handlePersistence) instances.unpersist() + if ($(handlePersistence)) { + instances.unpersist() + } model } diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala index 1860fe8361749..fe540a43d6e81 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala @@ -82,7 +82,8 @@ private[shared] object SharedParamsCodeGen { "all instance weights as 1.0"), ParamDesc[String]("solver", "the solver algorithm for optimization", finalFields = false), ParamDesc[Int]("aggregationDepth", "suggested depth for treeAggregate (>= 2)", Some("2"), - isValid = "ParamValidators.gtEq(2)", isExpertParam = true)) + isValid = "ParamValidators.gtEq(2)", isExpertParam = true), + ParamDesc[Boolean]("handlePersistence", "whether to handle data persistence", Some("true"))) val code = genSharedParams(params) val file = "src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala" diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index 6061d9ca0a084..a7a46e8a46e5c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -402,4 +402,21 @@ private[ml] trait HasAggregationDepth extends Params { /** @group expertGetParam */ final def getAggregationDepth: Int = $(aggregationDepth) } + +/** + * Trait for shared param handlePersistence (default: true). + */ +private[ml] trait HasHandlePersistence extends Params { + + /** + * Param for whether to handle data persistence. + * @group param + */ + final val handlePersistence: BooleanParam = new BooleanParam(this, "handlePersistence", "whether to handle data persistence") + + setDefault(handlePersistence, true) + + /** @group getParam */ + final def getHandlePersistence: Boolean = $(handlePersistence) +} // scalastyle:on diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index 4b46c3831d75f..e7dfa8b7b748f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -46,7 +46,8 @@ import org.apache.spark.storage.StorageLevel */ private[regression] trait AFTSurvivalRegressionParams extends Params with HasFeaturesCol with HasLabelCol with HasPredictionCol with HasMaxIter - with HasTol with HasFitIntercept with HasAggregationDepth with Logging { + with HasTol with HasFitIntercept with HasAggregationDepth with HasHandlePersistence + with Logging { /** * Param for censor column name. @@ -197,6 +198,10 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value) setDefault(aggregationDepth -> 2) + /** @group setParam */ + @Since("2.3.0") + def setHandlePersistence(value: Boolean): this.type = set(handlePersistence, value) + /** * Extract [[featuresCol]], [[labelCol]] and [[censorCol]] from input dataset, * and put it in an RDD with strong types. @@ -213,8 +218,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S override def fit(dataset: Dataset[_]): AFTSurvivalRegressionModel = { transformSchema(dataset.schema, logging = true) val instances = extractAFTPoints(dataset) - val handlePersistence = dataset.storageLevel == StorageLevel.NONE - if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) + if ($(handlePersistence)) instances.persist(StorageLevel.MEMORY_AND_DISK) val featuresSummarizer = { val seqOp = (c: MultivariateOnlineSummarizer, v: AFTPoint) => c.add(v.features) @@ -273,7 +277,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S } bcFeaturesStd.destroy(blocking = false) - if (handlePersistence) instances.unpersist() + if ($(handlePersistence)) instances.unpersist() val rawCoefficients = parameters.slice(2, parameters.length) var i = 0 diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala index 8faab52ea474b..301aaaea5d2e0 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala @@ -39,7 +39,8 @@ import org.apache.spark.storage.StorageLevel * Params for isotonic regression. */ private[regression] trait IsotonicRegressionBase extends Params with HasFeaturesCol - with HasLabelCol with HasPredictionCol with HasWeightCol with Logging { + with HasLabelCol with HasPredictionCol with HasWeightCol with HasHandlePersistence + with Logging { /** * Param for whether the output sequence should be isotonic/increasing (true) or @@ -157,6 +158,10 @@ class IsotonicRegression @Since("1.5.0") (@Since("1.5.0") override val uid: Stri @Since("1.5.0") def setFeatureIndex(value: Int): this.type = set(featureIndex, value) + /** @group setParam */ + @Since("2.3.0") + def setHandlePersistence(value: Boolean): this.type = set(handlePersistence, value) + @Since("1.5.0") override def copy(extra: ParamMap): IsotonicRegression = defaultCopy(extra) @@ -165,8 +170,7 @@ class IsotonicRegression @Since("1.5.0") (@Since("1.5.0") override val uid: Stri transformSchema(dataset.schema, logging = true) // Extract columns from data. If dataset is persisted, do not persist oldDataset. val instances = extractWeightedLabeledPoints(dataset) - val handlePersistence = dataset.storageLevel == StorageLevel.NONE - if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) + if ($(handlePersistence)) instances.persist(StorageLevel.MEMORY_AND_DISK) val instr = Instrumentation.create(this, dataset) instr.logParams(labelCol, featuresCol, weightCol, predictionCol, featureIndex, isotonic) @@ -175,7 +179,7 @@ class IsotonicRegression @Since("1.5.0") (@Since("1.5.0") override val uid: Stri val isotonicRegression = new MLlibIsotonicRegression().setIsotonic($(isotonic)) val oldModel = isotonicRegression.run(instances) - if (handlePersistence) instances.unpersist() + if ($(handlePersistence)) instances.unpersist() val model = copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this)) instr.logSuccess(model) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index b2a968118d1a9..dc090f85c9a34 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -53,7 +53,7 @@ import org.apache.spark.storage.StorageLevel private[regression] trait LinearRegressionParams extends PredictorParams with HasRegParam with HasElasticNetParam with HasMaxIter with HasTol with HasFitIntercept with HasStandardization with HasWeightCol with HasSolver - with HasAggregationDepth { + with HasAggregationDepth with HasHandlePersistence { import LinearRegression._ @@ -208,6 +208,10 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value) setDefault(aggregationDepth -> 2) + /** @group setParam */ + @Since("2.3.0") + def setHandlePersistence(value: Boolean): this.type = set(handlePersistence, value) + override protected def train(dataset: Dataset[_]): LinearRegressionModel = { // Extract the number of features before deciding optimization solver. val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size @@ -251,8 +255,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String return lrModel } - val handlePersistence = dataset.storageLevel == StorageLevel.NONE - if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) + if ($(handlePersistence)) instances.persist(StorageLevel.MEMORY_AND_DISK) val (featuresSummarizer, ySummarizer) = { val seqOp = (c: (MultivariateOnlineSummarizer, MultivariateOnlineSummarizer), @@ -285,7 +288,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String s"zeros and the intercept will be the mean of the label; as a result, " + s"training is not needed.") } - if (handlePersistence) instances.unpersist() + if ($(handlePersistence)) instances.unpersist() val coefficients = Vectors.sparse(numFeatures, Seq.empty) val intercept = yMean @@ -422,7 +425,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String 0.0 } - if (handlePersistence) instances.unpersist() + if ($(handlePersistence)) instances.unpersist() val model = copyValues(new LinearRegressionModel(uid, coefficients, intercept)) // Handle possible missing or invalid prediction columns From 971e52c4a18b4261d82ac14fefa6bb849367562c Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Mon, 4 Sep 2017 13:57:13 +0800 Subject: [PATCH 5/6] fix --- .../spark/mllib/classification/LogisticRegression.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala index 4b650000736e2..82511679cdc02 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala @@ -444,13 +444,13 @@ class LogisticRegressionWithLBFGS lr.setFitIntercept(addIntercept) lr.setMaxIter(optimizer.getNumIterations()) lr.setTol(optimizer.getConvergenceTol()) + // Determine if we should cache the DF + lr.setHandlePersistence(input.getStorageLevel == StorageLevel.NONE) // Convert our input into a DataFrame val spark = SparkSession.builder().sparkContext(input.context).getOrCreate() val df = spark.createDataFrame(input.map(_.asML)) - // Determine if we should cache the DF - val handlePersistence = input.getStorageLevel == StorageLevel.NONE // Train our model - val mlLogisticRegressionModel = lr.train(df, handlePersistence) + val mlLogisticRegressionModel = lr.train(df) // convert the model val weights = Vectors.dense(mlLogisticRegressionModel.coefficients.toArray) createModel(weights, mlLogisticRegressionModel.intercept) From f8fa9573a1b40ff236e9c52cf429e2742c8f2bd0 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Mon, 4 Sep 2017 14:26:18 +0800 Subject: [PATCH 6/6] fix mima --- project/MimaExcludes.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index dd299e074535e..a2706ae442cbd 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -68,7 +68,12 @@ object MimaExcludes { // [SPARK-14280] Support Scala 2.12 ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.FutureAction.transformWith"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.FutureAction.transform") + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.FutureAction.transform"), + + // [SPARK-18608] Add Param HasHandlePersistence + ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasHandlePersistence.handlePersistence"), + ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasHandlePersistence.getHandlePersistence"), + ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasHandlePersistence.org$apache$spark$ml$param$shared$HasHandlePersistence$_setter_$handlePersistence_=") ) // Exclude rules for 2.2.x