From f0571890522eb09d092f01e8b5a208d4849e56df Mon Sep 17 00:00:00 2001 From: fjiang6 Date: Thu, 21 Aug 2014 15:34:15 -0700 Subject: [PATCH 1/7] add example HuberRobustRegression.scala --- .../mllib/HuberRobustRegression.scala | 134 +++++++++++++++ .../spark/mllib/optimization/Gradient.scala | 58 +++++++ .../mllib/regression/RobustRegression.scala | 159 ++++++++++++++++++ .../regression/RobustRegressionSuite.scala | 127 ++++++++++++++ 4 files changed, 478 insertions(+) create mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala new file mode 100644 index 0000000000000..6d7c4520202fc --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/HuberRobustRegression.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.log4j.{Level, Logger} +import scopt.OptionParser + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.mllib.regression.HuberRobustRegressionWithSGD +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.mllib.optimization.{SimpleUpdater, SquaredL2Updater, L1Updater} + +/** + * An example app for Huber Robust regression. Run with + * {{{ + * bin/run-example org.apache.spark.examples.mllib.HuberRobustRegression + * }}} + * A synthetic dataset can be found at `data/mllib/sample_linear_regression_data.txt`. + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object HuberRobustRegression extends App { + + object RegType extends Enumeration { + type RegType = Value + val NONE, L1, L2 = Value + } + + import RegType._ + + case class Params( + input: String = null, + numIterations: Int = 100, + stepSize: Double = 1.0, + regType: RegType = L2, + regParam: Double = 0.1) + + val defaultParams = Params() + + val parser = new OptionParser[Params]("HuberRobustRegression") { + head("HuberRobustRegression: an example app for Huber M-Estimation Robust regression.") + opt[Int]("numIterations") + .text("number of iterations") + .action((x, c) => c.copy(numIterations = x)) + opt[Double]("stepSize") + .text(s"initial step size, default: ${defaultParams.stepSize}") + .action((x, c) => c.copy(stepSize = x)) + opt[String]("regType") + .text(s"regularization type (${RegType.values.mkString(",")}), " + + s"default: ${defaultParams.regType}") + .action((x, c) => c.copy(regType = RegType.withName(x))) + opt[Double]("regParam") + .text(s"regularization parameter, default: ${defaultParams.regParam}") + arg[String]("") + .required() + .text("input paths to labeled examples in LIBSVM format") + .action((x, c) => c.copy(input = x)) + note( + """ + |For example, the following command runs this app on a synthetic dataset: + | + | bin/spark-submit --class org.apache.spark.examples.mllib.HuberRobustRegression \ + | examples/target/scala-*/spark-examples-*.jar \ + | data/mllib/sample_linear_regression_data.txt + """.stripMargin) + } + + parser.parse(args, defaultParams).map { params => + run(params) + } getOrElse { + sys.exit(1) + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"HuberRobustRegression with $params") + val sc = new SparkContext(conf) + + Logger.getRootLogger.setLevel(Level.WARN) + + val examples = MLUtils.loadLibSVMFile(sc, params.input).cache() + + val splits = examples.randomSplit(Array(0.8, 0.2)) + val training = splits(0).cache() + val test = splits(1).cache() + + val numTraining = training.count() + val numTest = test.count() + println(s"Training: $numTraining, test: $numTest.") + + examples.unpersist(blocking = false) + + val updater = params.regType match { + case NONE => new SimpleUpdater() + case L1 => new L1Updater() + case L2 => new SquaredL2Updater() + } + + val algorithm = new HuberRobustRegressionWithSGD() + algorithm.optimizer + .setNumIterations(params.numIterations) + .setStepSize(params.stepSize) + .setUpdater(updater) + .setRegParam(params.regParam) + + val model = algorithm.run(training) + + val prediction = model.predict(test.map(_.features)) + val predictionAndLabel = prediction.zip(test.map(_.label)) + + val loss = predictionAndLabel.map { case (p, l) => + val err = p - l + err * err + }.reduce(_ + _) + val rmse = math.sqrt(loss / numTest) + + println(s"Test RMSE = $rmse.") + + sc.stop() + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala index fdd67160114ca..17af6c77b0cda 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala @@ -20,6 +20,7 @@ package org.apache.spark.mllib.optimization import org.apache.spark.annotation.DeveloperApi import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.linalg.BLAS.{axpy, dot, scal} +import scala.math.pow /** * :: DeveloperApi :: @@ -118,6 +119,63 @@ class LeastSquaresGradient extends Gradient { } } +/** + * :: DeveloperApi :: + * Compute gradient and loss for a M-estimation Huber objective function, as used in robust regression. + * The Huber M-estimator corresponds to a probability distribution for the errors which is normal in the centre but like + * a double exponential distribution in the tails (Hogg 1979: 109). + * L = 1/2 ||A weights-y||^2 if |A weights-y| <= k + * L = k |A weights-y| - 1/2 K^2 if |A weights-y| > k + * where k = 1.345 which produce 95% efficiency when the errors are normal and substantial resistance to outliers otherwise. + * See also the documentation for the precise formulation. + */ +@DeveloperApi +class HuberRobustGradient extends Gradient { + override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = { + val diff = dot(data, weights) - label + val loss = diff * diff + val gradient = data.copy + val k = 1.345 + //Tuning constant is generally picked to give reasonably high efficiency in the normal case. + //Smaller values produce more resistance to outliers while at the expense of lower efficiency when the errors are normally distributed. + if(diff < -k){ + scal(-k, gradient) + (gradient, (-k * diff - 1.0 / 2.0 * pow(k, 2))) + }else if(diff >= -k && diff <= k){ + scal(diff, gradient) + (gradient, (1.0 / 2.0 * pow(diff, 2))) + }else { + scal(k, gradient) + (gradient, (k * diff - 1.0 / 2.0 * pow(k, 2))) + } + } + + override def compute( + data: Vector, + label: Double, + weights: Vector, + cumGradient: Vector): Double = { + val diff = dot(data, weights) - label + val k = 1.345 + // + if(diff < -k){ + axpy(-k, data, cumGradient) + }else if(diff >= -k && diff <= k){ + axpy(diff, data, cumGradient) + }else { + axpy(k, data, cumGradient) + } + + if(diff < -k){ + -k * diff - 1.0 / 2.0 * pow(k, 2) + }else if(diff >= -k && diff <= k){ + 1.0 / 2.0 * pow(diff, 2) + }else { + k * diff - 1.0 /2.0 * pow(k, 2) + } + } +} + /** * :: DeveloperApi :: * Compute gradient and loss for a Hinge loss function, as used in SVM binary classification. diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala new file mode 100644 index 0000000000000..1b7039e444f43 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.optimization._ +import org.apache.spark.annotation.Experimental + +/** + * Regression model trained using Huber M-Estimation RobustRegression. + * + * @param weights Weights computed for every feature. + * @param intercept Intercept computed for this model. + */ +class HuberRobustRegressionModel ( + override val weights: Vector, + override val intercept: Double) + extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable { + + override protected def predictPoint( + dataMatrix: Vector, + weightMatrix: Vector, + intercept: Double): Double = { + weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept + } +} + +/** + * Train a Huber M-Estimation Robust regression model with no regularization using Stochastic Gradient Descent. + * This solves the Huber objective function + * f(weights) = 1/2 ||A weights-y||^2 if |A weights-y| <= k + * f(weights) = k |A weights-y| - 1/2 K^2 if |A weights-y| > k + * where k = 1.345 which produce 95% efficiency when the errors are normal and substantial resistance to outliers otherwise. + * Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with + * its corresponding right hand side label y. + * See also the documentation for the precise formulation. + */ +class HuberRobustRegressionWithSGD private[mllib] ( + private var stepSize: Double, + private var numIterations: Int, + private var miniBatchFraction: Double) + extends GeneralizedLinearAlgorithm[HuberRobustRegressionModel] with Serializable { + + private val gradient = new HuberRobustGradient() + private val updater = new SimpleUpdater() + override val optimizer = new GradientDescent(gradient, updater) + .setStepSize(stepSize) + .setNumIterations(numIterations) + .setMiniBatchFraction(miniBatchFraction) + + /** + * Construct a Huber M-Estimation RobustRegression object with default parameters: {stepSize: 1.0, + * numIterations: 100, miniBatchFraction: 1.0}. + */ + def this() = this(1.0, 100, 1.0) + + override protected def createModel(weights: Vector, intercept: Double) = { + new HuberRobustRegressionModel(weights, intercept) + } +} + +/** + * Top-level methods for calling HuberRobustRegression. + */ +object HuberRobustRegressionWithSGD { + + /** + * Train a HuberRobust Regression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate a stochastic gradient. The weights used + * in gradient descent are initialized using the initial weights provided. + * + * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data + * matrix A as well as the corresponding right hand side label y + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param miniBatchFraction Fraction of data to be used per iteration. + * @param initialWeights Initial set of weights to be used. Array should be equal in size to + * the number of features in the data. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double, + initialWeights: Vector): HuberRobustRegressionModel = { + new HuberRobustRegressionWithSGD(stepSize, numIterations, miniBatchFraction) + .run(input, initialWeights) + } + + /** + * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate a stochastic gradient. + * + * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data + * matrix A as well as the corresponding right hand side label y + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param miniBatchFraction Fraction of data to be used per iteration. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double): HuberRobustRegressionModel = { + new HuberRobustRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input) + } + + /** + * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using the specified step size. We use the entire data set to + * compute the true gradient in each iteration. + * + * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data + * matrix A as well as the corresponding right hand side label y + * @param stepSize Step size to be used for each iteration of Gradient Descent. + * @param numIterations Number of iterations of gradient descent to run. + * @return a HuberRobustRegressionModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double): HuberRobustRegressionModel = { + train(input, numIterations, stepSize, 1.0) + } + + /** + * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed number + * of iterations of gradient descent using a step size of 1.0. We use the entire data set to + * compute the true gradient in each iteration. + * + * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data + * matrix A as well as the corresponding right hand side label y + * @param numIterations Number of iterations of gradient descent to run. + * @return a HuberRobustRegressionModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int): HuberRobustRegressionModel = { + train(input, numIterations, 1.0, 1.0) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala new file mode 100644 index 0000000000000..ca354421cd72f --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.regression + +import scala.util.Random + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.util.{LocalClusterSparkContext, LinearDataGenerator, + LocalSparkContext} + +class RobustRegressionSuite extends FunSuite with LocalSparkContext { + + def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) { + val numOffPredictions = predictions.zip(input).count { case (prediction, expected) => + // A prediction is off if the prediction is more than 0.5 away from expected value. + math.abs(prediction - expected.label) > 0.5 + } + // At least 80% of the predictions should be on. + assert(numOffPredictions < input.length / 5) + } + + // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2 + test("Huber Robust regression") { + val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput( + 3.0, Array(10.0, 10.0), 100, 42), 2).cache() + val linReg = new HuberRobustRegressionWithSGD().setIntercept(true) + linReg.optimizer.setNumIterations(1000).setStepSize(1.0) + + val model = linReg.run(testRDD) + assert(model.intercept >= 2.5 && model.intercept <= 3.5) + + val weights = model.weights + assert(weights.size === 2) + assert(weights(0) >= 9.0 && weights(0) <= 11.0) + assert(weights(1) >= 9.0 && weights(1) <= 11.0) + + val validationData = LinearDataGenerator.generateLinearInput( + 3.0, Array(10.0, 10.0), 100, 17) + val validationRDD = sc.parallelize(validationData, 2).cache() + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + // Test if we can correctly learn Y = 10*X1 + 10*X2 + test("Huber Robust regression without intercept") { + val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput( + 0.0, Array(10.0, 10.0), 100, 42), 2).cache() + val linReg = new HuberRobustRegressionWithSGD().setIntercept(false) + linReg.optimizer.setNumIterations(1000).setStepSize(1.0) + + val model = linReg.run(testRDD) + + assert(model.intercept === 0.0) + + val weights = model.weights + assert(weights.size === 2) + assert(weights(0) >= 9.0 && weights(0) <= 11.0) + assert(weights(1) >= 9.0 && weights(1) <= 11.0) + + val validationData = LinearDataGenerator.generateLinearInput( + 0.0, Array(10.0, 10.0), 100, 17) + val validationRDD = sc.parallelize(validationData, 2).cache() + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + // Test if we can correctly learn Y = 10*X1 + 10*X10000 + test("sparse Huber Robust regression without intercept") { + val denseRDD = sc.parallelize( + LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42), 2) + val sparseRDD = denseRDD.map { case LabeledPoint(label, v) => + val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1)))) + LabeledPoint(label, sv) + }.cache() + val linReg = new HuberRobustRegressionWithSGD().setIntercept(false) + linReg.optimizer.setNumIterations(1000).setStepSize(1.0) + + val model = linReg.run(sparseRDD) + + assert(model.intercept === 0.0) + + val weights = model.weights + assert(weights.size === 10000) + assert(weights(0) >= 9.0 && weights(0) <= 11.0) + assert(weights(9999) >= 9.0 && weights(9999) <= 11.0) + + val validationData = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 17) + val sparseValidationData = validationData.map { case LabeledPoint(label, v) => + val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1)))) + LabeledPoint(label, sv) + } + val sparseValidationRDD = sc.parallelize(sparseValidationData, 2) + + // Test prediction on RDD. + validatePrediction( + model.predict(sparseValidationRDD.map(_.features)).collect(), sparseValidationData) + + // Test prediction on Array. + validatePrediction( + sparseValidationData.map(row => model.predict(row.features)), sparseValidationData) + } +} From 38c1b3c0f7dd2a6f62eae2fecb006495ffc8a064 Mon Sep 17 00:00:00 2001 From: fanjiang Date: Fri, 22 Aug 2014 02:49:41 -0700 Subject: [PATCH 2/7] add new class HuberRobustGradient and RobustRegression.scala --- .../spark/mllib/optimization/Gradient.scala | 18 ++++++++----- .../mllib/regression/RobustRegression.scala | 25 ++++++++++--------- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala index 17af6c77b0cda..7834e683193c0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala @@ -121,12 +121,13 @@ class LeastSquaresGradient extends Gradient { /** * :: DeveloperApi :: - * Compute gradient and loss for a M-estimation Huber objective function, as used in robust regression. - * The Huber M-estimator corresponds to a probability distribution for the errors which is normal in the centre but like - * a double exponential distribution in the tails (Hogg 1979: 109). + * Compute gradient and loss for Huber objective function, as used in robust regression. + * The Huber M-estimator corresponds to a probability distribution for the errors which is normal + * in the centre but like a double exponential distribution in the tails (Hogg 1979: 109). * L = 1/2 ||A weights-y||^2 if |A weights-y| <= k * L = k |A weights-y| - 1/2 K^2 if |A weights-y| > k - * where k = 1.345 which produce 95% efficiency when the errors are normal and substantial resistance to outliers otherwise. + * where k = 1.345 which produce 95% efficiency when the errors are normal and + * substantial resistance to outliers otherwise. * See also the documentation for the precise formulation. */ @DeveloperApi @@ -136,8 +137,13 @@ class HuberRobustGradient extends Gradient { val loss = diff * diff val gradient = data.copy val k = 1.345 - //Tuning constant is generally picked to give reasonably high efficiency in the normal case. - //Smaller values produce more resistance to outliers while at the expense of lower efficiency when the errors are normally distributed. + + /** + * Tuning constant is generally picked to give reasonably high efficiency in the normal case. + * Smaller values produce more resistance to outliers while at the expense of + * lower efficiency when the errors are normally distributed. + */ + if(diff < -k){ scal(-k, gradient) (gradient, (-k * diff - 1.0 / 2.0 * pow(k, 2))) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala index 1b7039e444f43..4e50ccb1c8dc3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala @@ -42,11 +42,12 @@ class HuberRobustRegressionModel ( } /** - * Train a Huber M-Estimation Robust regression model with no regularization using Stochastic Gradient Descent. + * Train a Huber Robust regression model with no regularization using Stochastic Gradient Descent. * This solves the Huber objective function * f(weights) = 1/2 ||A weights-y||^2 if |A weights-y| <= k * f(weights) = k |A weights-y| - 1/2 K^2 if |A weights-y| > k - * where k = 1.345 which produce 95% efficiency when the errors are normal and substantial resistance to outliers otherwise. + * where k = 1.345 which produce 95% efficiency when the errors are normal and + * substantial resistance to outliers otherwise. * Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with * its corresponding right hand side label y. * See also the documentation for the precise formulation. @@ -81,8 +82,8 @@ class HuberRobustRegressionWithSGD private[mllib] ( object HuberRobustRegressionWithSGD { /** - * Train a HuberRobust Regression model given an RDD of (label, features) pairs. We run a fixed number - * of iterations of gradient descent using the specified step size. Each iteration uses + * Train a HuberRobust Regression model given an RDD of (label, features) pairs. We run a fixed + * number of iterations of gradient descent using the specified step size. Each iteration uses * `miniBatchFraction` fraction of the data to calculate a stochastic gradient. The weights used * in gradient descent are initialized using the initial weights provided. * @@ -105,8 +106,8 @@ object HuberRobustRegressionWithSGD { } /** - * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed number - * of iterations of gradient descent using the specified step size. Each iteration uses + * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed + * number of iterations of gradient descent using the specified step size. Each iteration uses * `miniBatchFraction` fraction of the data to calculate a stochastic gradient. * * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data @@ -124,9 +125,9 @@ object HuberRobustRegressionWithSGD { } /** - * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed number - * of iterations of gradient descent using the specified step size. We use the entire data set to - * compute the true gradient in each iteration. + * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed + * number of iterations of gradient descent using the specified step size. We use the entire + * data set to compute the true gradient in each iteration. * * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data * matrix A as well as the corresponding right hand side label y @@ -142,9 +143,9 @@ object HuberRobustRegressionWithSGD { } /** - * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed number - * of iterations of gradient descent using a step size of 1.0. We use the entire data set to - * compute the true gradient in each iteration. + * Train a HuberRobustRegression model given an RDD of (label, features) pairs. We run a fixed + * number of iterations of gradient descent using a step size of 1.0. We use the entire data + * set to compute the true gradient in each iteration. * * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data * matrix A as well as the corresponding right hand side label y From ba49567719299eb90b8f7a3a3db7561a912a210f Mon Sep 17 00:00:00 2001 From: fjiang6 Date: Sat, 23 Aug 2014 18:34:14 -0700 Subject: [PATCH 3/7] add Robust Regression Algorithm with Turkey bisquare weight function (Biweight Estimates) --- .../mllib/BiweightRobustRegression.scala | 134 +++++++++++++++++ .../spark/mllib/optimization/Gradient.scala | 61 +++++++- .../mllib/regression/RobustRegression.scala | 137 ++++++++++++++++++ .../regression/RobustRegressionSuite.scala | 89 ++++++++++++ 4 files changed, 415 insertions(+), 6 deletions(-) create mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/BiweightRobustRegression.scala diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/BiweightRobustRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/BiweightRobustRegression.scala new file mode 100644 index 0000000000000..7683d74fc035c --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/BiweightRobustRegression.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.log4j.{Level, Logger} +import scopt.OptionParser + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.mllib.regression.BiweightRobustRegressionWithSGD +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.mllib.optimization.{SimpleUpdater, SquaredL2Updater, L1Updater} + +/** + * An example app for Biweight Robust regression. Run with + * {{{ + * bin/run-example org.apache.spark.examples.mllib.BiweightRobustRegression + * }}} + * A synthetic dataset can be found at `data/mllib/sample_linear_regression_data.txt`. + * If you use it as a template to create your own app, please use `spark-submit` to submit your app + */ +object BiweightRobustRegression extends App { + + object RegType extends Enumeration { + type RegType = Value + val NONE, L1, L2 = Value + } + + import RegType._ + + case class Params( + input: String = null, + numIterations: Int = 100, + stepSize: Double = 1.0, + regType: RegType = L2, + regParam: Double = 0.1) + + val defaultParams = Params() + + val parser = new OptionParser[Params]("BiweightRobustRegression") { + head("BiweightRobustRegression: an example app for Biweight M-Estimation Robust regression.") + opt[Int]("numIterations") + .text("number of iterations") + .action((x, c) => c.copy(numIterations = x)) + opt[Double]("stepSize") + .text(s"initial step size, default: ${defaultParams.stepSize}") + .action((x, c) => c.copy(stepSize = x)) + opt[String]("regType") + .text(s"regularization type (${RegType.values.mkString(",")}), " + + s"default: ${defaultParams.regType}") + .action((x, c) => c.copy(regType = RegType.withName(x))) + opt[Double]("regParam") + .text(s"regularization parameter, default: ${defaultParams.regParam}") + arg[String]("") + .required() + .text("input paths to labeled examples in LIBSVM format") + .action((x, c) => c.copy(input = x)) + note( + """ + |For example, the following command runs this app on a synthetic dataset: + | + | bin/spark-submit --class org.apache.spark.examples.mllib.BiweightRobustRegression \ + | examples/target/scala-*/spark-examples-*.jar \ + | data/mllib/sample_linear_regression_data.txt + """.stripMargin) + } + + parser.parse(args, defaultParams).map { params => + run(params) + } getOrElse { + sys.exit(1) + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"BiweightRobustRegression with $params") + val sc = new SparkContext(conf) + + Logger.getRootLogger.setLevel(Level.WARN) + + val examples = MLUtils.loadLibSVMFile(sc, params.input).cache() + + val splits = examples.randomSplit(Array(0.8, 0.2)) + val training = splits(0).cache() + val test = splits(1).cache() + + val numTraining = training.count() + val numTest = test.count() + println(s"Training: $numTraining, test: $numTest.") + + examples.unpersist(blocking = false) + + val updater = params.regType match { + case NONE => new SimpleUpdater() + case L1 => new L1Updater() + case L2 => new SquaredL2Updater() + } + + val algorithm = new BiweightRobustRegressionWithSGD() + algorithm.optimizer + .setNumIterations(params.numIterations) + .setStepSize(params.stepSize) + .setUpdater(updater) + .setRegParam(params.regParam) + + val model = algorithm.run(training) + + val prediction = model.predict(test.map(_.features)) + val predictionAndLabel = prediction.zip(test.map(_.label)) + + val loss = predictionAndLabel.map { case (p, l) => + val err = p - l + err * err + }.reduce(_ + _) + val rmse = math.sqrt(loss / numTest) + + println(s"Test RMSE = $rmse.") + + sc.stop() + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala index 7834e683193c0..8cbe3608a9bd7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala @@ -137,19 +137,17 @@ class HuberRobustGradient extends Gradient { val loss = diff * diff val gradient = data.copy val k = 1.345 - /** * Tuning constant is generally picked to give reasonably high efficiency in the normal case. * Smaller values produce more resistance to outliers while at the expense of * lower efficiency when the errors are normally distributed. */ - if(diff < -k){ scal(-k, gradient) (gradient, (-k * diff - 1.0 / 2.0 * pow(k, 2))) }else if(diff >= -k && diff <= k){ scal(diff, gradient) - (gradient, (1.0 / 2.0 * pow(diff, 2))) + (gradient, (1.0 / 2.0 * loss)) }else { scal(k, gradient) (gradient, (k * diff - 1.0 / 2.0 * pow(k, 2))) @@ -163,7 +161,6 @@ class HuberRobustGradient extends Gradient { cumGradient: Vector): Double = { val diff = dot(data, weights) - label val k = 1.345 - // if(diff < -k){ axpy(-k, data, cumGradient) }else if(diff >= -k && diff <= k){ @@ -171,17 +168,69 @@ class HuberRobustGradient extends Gradient { }else { axpy(k, data, cumGradient) } - if(diff < -k){ -k * diff - 1.0 / 2.0 * pow(k, 2) }else if(diff >= -k && diff <= k){ - 1.0 / 2.0 * pow(diff, 2) + 1.0 / 2.0 * loss }else { k * diff - 1.0 /2.0 * pow(k, 2) } } } +/** + * :: DeveloperApi :: + * Compute gradient and loss for Turkey bisquare weight function, as used in robust regression. + * The biweight function produces and M-estimator that is more resistant to regression + * outliers than the Huber M-estimator (Andersen 2008: 19). Especially on the extreme tails. + * L = k^2 / 6 * (1 - (1 - ||A weights-y||^2 / k^2)^3) if |A weights-y| <= k + * L = k^2 / 6 if |A weights-y| > k + * where k = 4.685 which produce 95% efficiency when the errors are normal and + * substantial resistance to outliers otherwise. + * See also the documentation for the precise formulation. + */ +@DeveloperApi +class BiweightRobustGradient extends Gradient { + override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = { + val diff = dot(data, weights) - label + val loss = diff * diff + val gradient = data.copy + val k = 4.685 + /** + * Tuning constant is generally picked to give reasonably high efficiency in the normal case. + * Smaller values produce more resistance to outliers while at the expense of + * lower efficiency when the errors are normally distributed. + */ + if(diff >= -k && diff <= k){ + scal(pow((1 - loss / pow(k, 2)), 2) * diff, gradient) + (gradient, (pow(k, 2) / 6.0 * (1 - pow((1 - loss / pow(k, 2)), 3)))) + }else { + scal(0.0, gradient) + (gradient, pow(k, 2) / 6.0) + } + } + + override def compute( + data: Vector, + label: Double, + weights: Vector, + cumGradient: Vector): Double = { + val diff = dot(data, weights) - label + val loss = diff * diff + val k = 4.685 + if(diff >= -k && diff <= k){ + axpy(pow((1 - loss / pow(k, 2)), 2) * diff, data, cumGradient) + }else { + axpy(0.0, data, cumGradient) + } + if(diff >= -k && diff <= k){ + pow(k, 2) / 6.0 * (1 - pow((1 - loss / pow(k, 2)), 3)) + }else { + pow(k, 2) / 6.0 + } + } +} + /** * :: DeveloperApi :: * Compute gradient and loss for a Hinge loss function, as used in SVM binary classification. diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala index 4e50ccb1c8dc3..499813a2da1b7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala @@ -158,3 +158,140 @@ object HuberRobustRegressionWithSGD { train(input, numIterations, 1.0, 1.0) } } + +/** + * Regression model trained using Turkey bisquare (Biweight) M-Estimation RobustRegression. + * + * @param weights Weights computed for every feature. + * @param intercept Intercept computed for this model. + */ +class BiweightRobustRegressionModel ( + override val weights: Vector, + override val intercept: Double) + extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable { + + override protected def predictPoint( + dataMatrix: Vector, + weightMatrix: Vector, + intercept: Double): Double = { + weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept + } +} + +/** + * Train a Biweight Robust regression model with no regularization using SGD. + * This solves the Biweight objective function + * L = k^2 / 6 * (1 - (1 - ||A weights-y||^2 / k^2)^3) if |A weights-y| <= k + * L = k^2 / 6 if |A weights-y| > k + * where k = 4.685 which produce 95% efficiency when the errors are normal and + * substantial resistance to outliers otherwise. + * Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with + * its corresponding right hand side label y. + * See also the documentation for the precise formulation. + */ +class BiweightRobustRegressionWithSGD private[mllib] ( + private var stepSize: Double, + private var numIterations: Int, + private var miniBatchFraction: Double) + extends GeneralizedLinearAlgorithm[BiweightRobustRegressionModel] with Serializable { + + private val gradient = new BiweightRobustGradient() + private val updater = new SimpleUpdater() + override val optimizer = new GradientDescent(gradient, updater) + .setStepSize(stepSize) + .setNumIterations(numIterations) + .setMiniBatchFraction(miniBatchFraction) + + /** + * Construct a Biweight M-Estimation RobustRegression object with default parameters: + * {stepSize: 1.0, numIterations: 100, miniBatchFraction: 1.0}. + */ + def this() = this(1.0, 100, 1.0) + + override protected def createModel(weights: Vector, intercept: Double) = { + new BiweightRobustRegressionModel(weights, intercept) + } +} + +/** + * Top-level methods for calling BiweightRobustRegression. + */ +object BiweightRobustRegressionWithSGD { + + /** + * Train a BiweightRobust Regression model given an RDD of (label, features) pairs. We run a + * fixed number of iterations of gradient descent using the specified step size. Each iteration + * uses `miniBatchFraction` fraction of the data to calculate a stochastic gradient. The weights + * used in gradient descent are initialized using the initial weights provided. + * + * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data + * matrix A as well as the corresponding right hand side label y + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param miniBatchFraction Fraction of data to be used per iteration. + * @param initialWeights Initial set of weights to be used. Array should be equal in size to + * the number of features in the data. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double, + initialWeights: Vector): BiweightRobustRegressionModel = { + new BiweightRobustRegressionWithSGD(stepSize, numIterations, miniBatchFraction) + .run(input, initialWeights) + } + + /** + * Train a BiweightRobustRegression model given an RDD of (label,features) pairs. We run a fixed + * number of iterations of gradient descent using the specified step size. Each iteration uses + * `miniBatchFraction` fraction of the data to calculate a stochastic gradient. + * + * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data + * matrix A as well as the corresponding right hand side label y + * @param numIterations Number of iterations of gradient descent to run. + * @param stepSize Step size to be used for each iteration of gradient descent. + * @param miniBatchFraction Fraction of data to be used per iteration. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double, + miniBatchFraction: Double): BiweightRobustRegressionModel = { + new BiweightRobustRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input) + } + + /** + * Train a BiweightRobustRegression model given an RDD of (label,features) pairs. We run a fixed + * number of iterations of gradient descent using the specified step size. We use the entire + * data set to compute the true gradient in each iteration. + * + * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data + * matrix A as well as the corresponding right hand side label y + * @param stepSize Step size to be used for each iteration of Gradient Descent. + * @param numIterations Number of iterations of gradient descent to run. + * @return a BiweightRobustRegressionModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int, + stepSize: Double): BiweightRobustRegressionModel = { + train(input, numIterations, stepSize, 1.0) + } + + /** + * Train a BiweightRobustRegression model given an RDD of (label,features) pairs. We run a fixed + * number of iterations of gradient descent using a step size of 1.0. We use the entire data + * set to compute the true gradient in each iteration. + * + * @param input RDD of (label, array of features) pairs. Each pair describes a row of the data + * matrix A as well as the corresponding right hand side label y + * @param numIterations Number of iterations of gradient descent to run. + * @return a BiweightRobustRegressionModel which has the weights and offset from training. + */ + def train( + input: RDD[LabeledPoint], + numIterations: Int): BiweightRobustRegressionModel = { + train(input, numIterations, 1.0, 1.0) + } +} \ No newline at end of file diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala index ca354421cd72f..8fbdb5b9594c5 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala @@ -124,4 +124,93 @@ class RobustRegressionSuite extends FunSuite with LocalSparkContext { validatePrediction( sparseValidationData.map(row => model.predict(row.features)), sparseValidationData) } + + // Test if we can correctly learn Y = 3 + 10*X1 + 10*X2 + test("Biweight Robust regression") { + val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput( + 3.0, Array(10.0, 10.0), 100, 42), 2).cache() + val linReg = new BiweightRobustRegressionWithSGD().setIntercept(true) + linReg.optimizer.setNumIterations(1000).setStepSize(1.0) + + val model = linReg.run(testRDD) + assert(model.intercept >= 2.5 && model.intercept <= 3.5) + + val weights = model.weights + assert(weights.size === 2) + assert(weights(0) >= 9.0 && weights(0) <= 11.0) + assert(weights(1) >= 9.0 && weights(1) <= 11.0) + + val validationData = LinearDataGenerator.generateLinearInput( + 3.0, Array(10.0, 10.0), 100, 17) + val validationRDD = sc.parallelize(validationData, 2).cache() + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + // Test if we can correctly learn Y = 10*X1 + 10*X2 + test("Biweight Robust regression without intercept") { + val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput( + 0.0, Array(10.0, 10.0), 100, 42), 2).cache() + val linReg = new BiweightRobustRegressionWithSGD().setIntercept(false) + linReg.optimizer.setNumIterations(1000).setStepSize(1.0) + + val model = linReg.run(testRDD) + + assert(model.intercept === 0.0) + + val weights = model.weights + assert(weights.size === 2) + assert(weights(0) >= 9.0 && weights(0) <= 11.0) + assert(weights(1) >= 9.0 && weights(1) <= 11.0) + + val validationData = LinearDataGenerator.generateLinearInput( + 0.0, Array(10.0, 10.0), 100, 17) + val validationRDD = sc.parallelize(validationData, 2).cache() + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_.features)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row.features)), validationData) + } + + // Test if we can correctly learn Y = 10*X1 + 10*X10000 + test("sparse Biweight Robust regression without intercept") { + val denseRDD = sc.parallelize( + LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42), 2) + val sparseRDD = denseRDD.map { case LabeledPoint(label, v) => + val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1)))) + LabeledPoint(label, sv) + }.cache() + val linReg = new BiweightRobustRegressionWithSGD().setIntercept(false) + linReg.optimizer.setNumIterations(1000).setStepSize(1.0) + + val model = linReg.run(sparseRDD) + + assert(model.intercept === 0.0) + + val weights = model.weights + assert(weights.size === 10000) + assert(weights(0) >= 9.0 && weights(0) <= 11.0) + assert(weights(9999) >= 9.0 && weights(9999) <= 11.0) + + val validationData = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 17) + val sparseValidationData = validationData.map { case LabeledPoint(label, v) => + val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1)))) + LabeledPoint(label, sv) + } + val sparseValidationRDD = sc.parallelize(sparseValidationData, 2) + + // Test prediction on RDD. + validatePrediction( + model.predict(sparseValidationRDD.map(_.features)).collect(), sparseValidationData) + + // Test prediction on Array. + validatePrediction( + sparseValidationData.map(row => model.predict(row.features)), sparseValidationData) + } } From f4a84a3cb938b9917ca1faa5f173152eb9b01998 Mon Sep 17 00:00:00 2001 From: fjiang6 Date: Sun, 24 Aug 2014 10:17:39 -0700 Subject: [PATCH 4/7] adjust Gradient --- .../scala/org/apache/spark/mllib/optimization/Gradient.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala index 8cbe3608a9bd7..24fd1a41e35c2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala @@ -160,6 +160,7 @@ class HuberRobustGradient extends Gradient { weights: Vector, cumGradient: Vector): Double = { val diff = dot(data, weights) - label + val loss = diff * diff val k = 1.345 if(diff < -k){ axpy(-k, data, cumGradient) From 5f9e03fd393992074edff0ccea48f4887f556578 Mon Sep 17 00:00:00 2001 From: fjiang6 Date: Sun, 24 Aug 2014 14:15:48 -0700 Subject: [PATCH 5/7] =?UTF-8?q?adjust=20Robust=20Regression=20Algorithm=20?= =?UTF-8?q?with=20Turkey=20bisquare=20weight=20function=20=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../spark/mllib/optimization/Gradient.scala | 15 +++++---------- .../spark/mllib/regression/RobustRegression.scala | 6 +++--- .../mllib/regression/RobustRegressionSuite.scala | 7 ++++--- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala index 24fd1a41e35c2..0cb090155733d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala @@ -181,7 +181,7 @@ class HuberRobustGradient extends Gradient { /** * :: DeveloperApi :: - * Compute gradient and loss for Turkey bisquare weight function, as used in robust regression. + * Compute gradient and loss for Tukey bisquare weight function, as used in robust regression. * The biweight function produces and M-estimator that is more resistant to regression * outliers than the Huber M-estimator (Andersen 2008: 19). Especially on the extreme tails. * L = k^2 / 6 * (1 - (1 - ||A weights-y||^2 / k^2)^3) if |A weights-y| <= k @@ -195,7 +195,6 @@ class BiweightRobustGradient extends Gradient { override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = { val diff = dot(data, weights) - label val loss = diff * diff - val gradient = data.copy val k = 4.685 /** * Tuning constant is generally picked to give reasonably high efficiency in the normal case. @@ -203,11 +202,11 @@ class BiweightRobustGradient extends Gradient { * lower efficiency when the errors are normally distributed. */ if(diff >= -k && diff <= k){ + val gradient = data.copy scal(pow((1 - loss / pow(k, 2)), 2) * diff, gradient) (gradient, (pow(k, 2) / 6.0 * (1 - pow((1 - loss / pow(k, 2)), 3)))) }else { - scal(0.0, gradient) - (gradient, pow(k, 2) / 6.0) + (Vectors.sparse(weights.size, Array.empty, Array.empty), (pow(k, 2) / 6.0)) } } @@ -220,12 +219,8 @@ class BiweightRobustGradient extends Gradient { val loss = diff * diff val k = 4.685 if(diff >= -k && diff <= k){ - axpy(pow((1 - loss / pow(k, 2)), 2) * diff, data, cumGradient) - }else { - axpy(0.0, data, cumGradient) - } - if(diff >= -k && diff <= k){ - pow(k, 2) / 6.0 * (1 - pow((1 - loss / pow(k, 2)), 3)) + axpy((pow((1.0 - loss / pow(k, 2)), 2) * diff), data, cumGradient) + pow(k, 2) / 6.0 * (1.0 - pow((1.0 - loss / pow(k, 2)), 3)) }else { pow(k, 2) / 6.0 } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala index 499813a2da1b7..f3afd94a59312 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala @@ -160,7 +160,7 @@ object HuberRobustRegressionWithSGD { } /** - * Regression model trained using Turkey bisquare (Biweight) M-Estimation RobustRegression. + * Regression model trained using Tukey bisquare (Biweight) M-Estimation RobustRegression. * * @param weights Weights computed for every feature. * @param intercept Intercept computed for this model. @@ -204,9 +204,9 @@ class BiweightRobustRegressionWithSGD private[mllib] ( /** * Construct a Biweight M-Estimation RobustRegression object with default parameters: - * {stepSize: 1.0, numIterations: 100, miniBatchFraction: 1.0}. + * {stepSize: 1.0, numIterations: 5000, miniBatchFraction: 1.0}. */ - def this() = this(1.0, 100, 1.0) + def this() = this(1.0, 5000, 1.0) override protected def createModel(weights: Vector, intercept: Double) = { new BiweightRobustRegressionModel(weights, intercept) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala index 8fbdb5b9594c5..3362c5abae17c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RobustRegressionSuite.scala @@ -130,7 +130,7 @@ class RobustRegressionSuite extends FunSuite with LocalSparkContext { val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput( 3.0, Array(10.0, 10.0), 100, 42), 2).cache() val linReg = new BiweightRobustRegressionWithSGD().setIntercept(true) - linReg.optimizer.setNumIterations(1000).setStepSize(1.0) + linReg.optimizer.setNumIterations(3000).setStepSize(1.0) //Default numIterations: 5000 val model = linReg.run(testRDD) assert(model.intercept >= 2.5 && model.intercept <= 3.5) @@ -156,7 +156,7 @@ class RobustRegressionSuite extends FunSuite with LocalSparkContext { val testRDD = sc.parallelize(LinearDataGenerator.generateLinearInput( 0.0, Array(10.0, 10.0), 100, 42), 2).cache() val linReg = new BiweightRobustRegressionWithSGD().setIntercept(false) - linReg.optimizer.setNumIterations(1000).setStepSize(1.0) + linReg.optimizer.setNumIterations(4000).setStepSize(1.0) //Default numIterations: 5000 val model = linReg.run(testRDD) @@ -187,7 +187,7 @@ class RobustRegressionSuite extends FunSuite with LocalSparkContext { LabeledPoint(label, sv) }.cache() val linReg = new BiweightRobustRegressionWithSGD().setIntercept(false) - linReg.optimizer.setNumIterations(1000).setStepSize(1.0) + linReg.optimizer.setNumIterations(4000).setStepSize(1.0) //Default numIterations: 5000 val model = linReg.run(sparseRDD) @@ -198,6 +198,7 @@ class RobustRegressionSuite extends FunSuite with LocalSparkContext { assert(weights(0) >= 9.0 && weights(0) <= 11.0) assert(weights(9999) >= 9.0 && weights(9999) <= 11.0) + val validationData = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 17) val sparseValidationData = validationData.map { case LabeledPoint(label, v) => val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1)))) From c9cafb31b234791d9728d1c5e0d2084214b1c3b5 Mon Sep 17 00:00:00 2001 From: fjiang6 Date: Sun, 24 Aug 2014 14:31:19 -0700 Subject: [PATCH 6/7] adjust Tukey bisquare (Biweight) function for Robust Regression --- .../apache/spark/examples/mllib/BiweightRobustRegression.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/BiweightRobustRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/BiweightRobustRegression.scala index 7683d74fc035c..ed1ffb643cc58 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/BiweightRobustRegression.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/BiweightRobustRegression.scala @@ -44,7 +44,7 @@ object BiweightRobustRegression extends App { case class Params( input: String = null, - numIterations: Int = 100, + numIterations: Int = 5000, stepSize: Double = 1.0, regType: RegType = L2, regParam: Double = 0.1) From 07e0e78d28d198ef36910933d0e199d0d4fa91ec Mon Sep 17 00:00:00 2001 From: fjiang6 Date: Sun, 24 Aug 2014 14:42:32 -0700 Subject: [PATCH 7/7] to validate the code style --- .../org/apache/spark/mllib/regression/RobustRegression.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala index f3afd94a59312..061b6da75c6c7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RobustRegression.scala @@ -294,4 +294,4 @@ object BiweightRobustRegressionWithSGD { numIterations: Int): BiweightRobustRegressionModel = { train(input, numIterations, 1.0, 1.0) } -} \ No newline at end of file +}