From 64c1d8ba26a4cf852efb09485526364323b26eb3 Mon Sep 17 00:00:00 2001 From: sethah Date: Mon, 5 Jun 2017 13:40:41 -0700 Subject: [PATCH 1/7] tests pass --- .../classification/LogisticRegression.scala | 635 +++++++++--------- .../optim/aggregator/LogisticAggregator.scala | 377 +++++++++++ .../loss/DifferentiableRegularization.scala | 36 +- .../ml/regression/LinearRegression.scala | 3 +- .../LogisticRegressionSuite.scala | 10 +- 5 files changed, 742 insertions(+), 319 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 567af0488e1b4..fc95a20f761b8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -20,11 +20,9 @@ package org.apache.spark.ml.classification import java.util.Locale import scala.collection.mutable - import breeze.linalg.{DenseVector => BDV} import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS, LBFGSB => BreezeLBFGSB, OWLQN => BreezeOWLQN} import org.apache.hadoop.fs.Path - import org.apache.spark.SparkException import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.broadcast.Broadcast @@ -32,6 +30,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg._ import org.apache.spark.ml.linalg.BLAS._ +import org.apache.spark.ml.optim.aggregator.LogisticAggregator +import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ @@ -598,8 +598,23 @@ class LogisticRegression @Since("1.2.0") ( val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam) val bcFeaturesStd = instances.context.broadcast(featuresStd) - val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept), - $(standardization), bcFeaturesStd, regParamL2, multinomial = isMultinomial, +// val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept), +// $(standardization), bcFeaturesStd, regParamL2, multinomial = isMultinomial, +// $(aggregationDepth)) + val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept), + multinomial = isMultinomial)(_) + val getFeaturesStd = (j: Int) => featuresStd(j % numFeatures) + val regularization = if (regParamL2 != 0.0) { + val shouldApply = (idx: Int) => { + // do not apply to intercepts + idx >= 0 && idx < numFeatures * numClasses + } + Some(new L2Regularization(regParamL2, shouldApply, + if ($(standardization)) None else Some(getFeaturesStd))) + } else { + None + } + val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization, $(aggregationDepth)) val numCoeffsPlusIntercepts = numFeaturesPlusIntercept * numCoefficientSets @@ -1629,309 +1644,309 @@ class BinaryLogisticRegressionSummary private[classification] ( * for speed. We convert back to row major order when we create the model, * since this form is optimal for the matrix operations used for prediction. */ -private class LogisticAggregator( - bcCoefficients: Broadcast[Vector], - bcFeaturesStd: Broadcast[Array[Double]], - numClasses: Int, - fitIntercept: Boolean, - multinomial: Boolean) extends Serializable with Logging { - - private val numFeatures = bcFeaturesStd.value.length - private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures - private val coefficientSize = bcCoefficients.value.size - private val numCoefficientSets = if (multinomial) numClasses else 1 - if (multinomial) { - require(numClasses == coefficientSize / numFeaturesPlusIntercept, s"The number of " + - s"coefficients should be ${numClasses * numFeaturesPlusIntercept} but was $coefficientSize") - } else { - require(coefficientSize == numFeaturesPlusIntercept, s"Expected $numFeaturesPlusIntercept " + - s"coefficients but got $coefficientSize") - require(numClasses == 1 || numClasses == 2, s"Binary logistic aggregator requires numClasses " + - s"in {1, 2} but found $numClasses.") - } - - private var weightSum = 0.0 - private var lossSum = 0.0 - - @transient private lazy val coefficientsArray: Array[Double] = bcCoefficients.value match { - case DenseVector(values) => values - case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " + - s"got type ${bcCoefficients.value.getClass}.)") - } - private lazy val gradientSumArray = new Array[Double](coefficientSize) - - if (multinomial && numClasses <= 2) { - logInfo(s"Multinomial logistic regression for binary classification yields separate " + - s"coefficients for positive and negative classes. When no regularization is applied, the" + - s"result will be effectively the same as binary logistic regression. When regularization" + - s"is applied, multinomial loss will produce a result different from binary loss.") - } - - /** Update gradient and loss using binary loss function. */ - private def binaryUpdateInPlace( - features: Vector, - weight: Double, - label: Double): Unit = { - - val localFeaturesStd = bcFeaturesStd.value - val localCoefficients = coefficientsArray - val localGradientArray = gradientSumArray - val margin = - { - var sum = 0.0 - features.foreachActive { (index, value) => - if (localFeaturesStd(index) != 0.0 && value != 0.0) { - sum += localCoefficients(index) * value / localFeaturesStd(index) - } - } - if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) - sum - } - - val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label) - - features.foreachActive { (index, value) => - if (localFeaturesStd(index) != 0.0 && value != 0.0) { - localGradientArray(index) += multiplier * value / localFeaturesStd(index) - } - } - - if (fitIntercept) { - localGradientArray(numFeaturesPlusIntercept - 1) += multiplier - } - - if (label > 0) { - // The following is equivalent to log(1 + exp(margin)) but more numerically stable. - lossSum += weight * MLUtils.log1pExp(margin) - } else { - lossSum += weight * (MLUtils.log1pExp(margin) - margin) - } - } - - /** Update gradient and loss using multinomial (softmax) loss function. */ - private def multinomialUpdateInPlace( - features: Vector, - weight: Double, - label: Double): Unit = { - // TODO: use level 2 BLAS operations - /* - Note: this can still be used when numClasses = 2 for binary - logistic regression without pivoting. - */ - val localFeaturesStd = bcFeaturesStd.value - val localCoefficients = coefficientsArray - val localGradientArray = gradientSumArray - - // marginOfLabel is margins(label) in the formula - var marginOfLabel = 0.0 - var maxMargin = Double.NegativeInfinity - - val margins = new Array[Double](numClasses) - features.foreachActive { (index, value) => - val stdValue = value / localFeaturesStd(index) - var j = 0 - while (j < numClasses) { - margins(j) += localCoefficients(index * numClasses + j) * stdValue - j += 1 - } - } - var i = 0 - while (i < numClasses) { - if (fitIntercept) { - margins(i) += localCoefficients(numClasses * numFeatures + i) - } - if (i == label.toInt) marginOfLabel = margins(i) - if (margins(i) > maxMargin) { - maxMargin = margins(i) - } - i += 1 - } - - /** - * When maxMargin is greater than 0, the original formula could cause overflow. - * We address this by subtracting maxMargin from all the margins, so it's guaranteed - * that all of the new margins will be smaller than zero to prevent arithmetic overflow. - */ - val multipliers = new Array[Double](numClasses) - val sum = { - var temp = 0.0 - var i = 0 - while (i < numClasses) { - if (maxMargin > 0) margins(i) -= maxMargin - val exp = math.exp(margins(i)) - temp += exp - multipliers(i) = exp - i += 1 - } - temp - } - - margins.indices.foreach { i => - multipliers(i) = multipliers(i) / sum - (if (label == i) 1.0 else 0.0) - } - features.foreachActive { (index, value) => - if (localFeaturesStd(index) != 0.0 && value != 0.0) { - val stdValue = value / localFeaturesStd(index) - var j = 0 - while (j < numClasses) { - localGradientArray(index * numClasses + j) += - weight * multipliers(j) * stdValue - j += 1 - } - } - } - if (fitIntercept) { - var i = 0 - while (i < numClasses) { - localGradientArray(numFeatures * numClasses + i) += weight * multipliers(i) - i += 1 - } - } - - val loss = if (maxMargin > 0) { - math.log(sum) - marginOfLabel + maxMargin - } else { - math.log(sum) - marginOfLabel - } - lossSum += weight * loss - } - - /** - * Add a new training instance to this LogisticAggregator, and update the loss and gradient - * of the objective function. - * - * @param instance The instance of data point to be added. - * @return This LogisticAggregator object. - */ - def add(instance: Instance): this.type = { - instance match { case Instance(label, weight, features) => - - if (weight == 0.0) return this - - if (multinomial) { - multinomialUpdateInPlace(features, weight, label) - } else { - binaryUpdateInPlace(features, weight, label) - } - weightSum += weight - this - } - } - - /** - * Merge another LogisticAggregator, and update the loss and gradient - * of the objective function. - * (Note that it's in place merging; as a result, `this` object will be modified.) - * - * @param other The other LogisticAggregator to be merged. - * @return This LogisticAggregator object. - */ - def merge(other: LogisticAggregator): this.type = { - - if (other.weightSum != 0.0) { - weightSum += other.weightSum - lossSum += other.lossSum - - var i = 0 - val localThisGradientSumArray = this.gradientSumArray - val localOtherGradientSumArray = other.gradientSumArray - val len = localThisGradientSumArray.length - while (i < len) { - localThisGradientSumArray(i) += localOtherGradientSumArray(i) - i += 1 - } - } - this - } - - def loss: Double = { - require(weightSum > 0.0, s"The effective number of instances should be " + - s"greater than 0.0, but $weightSum.") - lossSum / weightSum - } - - def gradient: Matrix = { - require(weightSum > 0.0, s"The effective number of instances should be " + - s"greater than 0.0, but $weightSum.") - val result = Vectors.dense(gradientSumArray.clone()) - scal(1.0 / weightSum, result) - new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, result.toArray) - } -} - -/** - * LogisticCostFun implements Breeze's DiffFunction[T] for a multinomial (softmax) logistic loss - * function, as used in multi-class classification (it is also used in binary logistic regression). - * It returns the loss and gradient with L2 regularization at a particular point (coefficients). - * It's used in Breeze's convex optimization routines. - */ -private class LogisticCostFun( - instances: RDD[Instance], - numClasses: Int, - fitIntercept: Boolean, - standardization: Boolean, - bcFeaturesStd: Broadcast[Array[Double]], - regParamL2: Double, - multinomial: Boolean, - aggregationDepth: Int) extends DiffFunction[BDV[Double]] { - - override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = { - val coeffs = Vectors.fromBreeze(coefficients) - val bcCoeffs = instances.context.broadcast(coeffs) - val featuresStd = bcFeaturesStd.value - val numFeatures = featuresStd.length - val numCoefficientSets = if (multinomial) numClasses else 1 - val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures - - val logisticAggregator = { - val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance) - val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2) - - instances.treeAggregate( - new LogisticAggregator(bcCoeffs, bcFeaturesStd, numClasses, fitIntercept, - multinomial) - )(seqOp, combOp, aggregationDepth) - } - - val totalGradientMatrix = logisticAggregator.gradient - val coefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, coeffs.toArray) - // regVal is the sum of coefficients squares excluding intercept for L2 regularization. - val regVal = if (regParamL2 == 0.0) { - 0.0 - } else { - var sum = 0.0 - coefMatrix.foreachActive { case (classIndex, featureIndex, value) => - // We do not apply regularization to the intercepts - val isIntercept = fitIntercept && (featureIndex == numFeatures) - if (!isIntercept) { - // The following code will compute the loss of the regularization; also - // the gradient of the regularization, and add back to totalGradientArray. - sum += { - if (standardization) { - val gradValue = totalGradientMatrix(classIndex, featureIndex) - totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * value) - value * value - } else { - if (featuresStd(featureIndex) != 0.0) { - // If `standardization` is false, we still standardize the data - // to improve the rate of convergence; as a result, we have to - // perform this reverse standardization by penalizing each component - // differently to get effectively the same objective function when - // the training dataset is not standardized. - val temp = value / (featuresStd(featureIndex) * featuresStd(featureIndex)) - val gradValue = totalGradientMatrix(classIndex, featureIndex) - totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * temp) - value * temp - } else { - 0.0 - } - } - } - } - } - 0.5 * regParamL2 * sum - } - bcCoeffs.destroy(blocking = false) - - (logisticAggregator.loss + regVal, new BDV(totalGradientMatrix.toArray)) - } -} +//private class LogisticAggregator( +// bcCoefficients: Broadcast[Vector], +// bcFeaturesStd: Broadcast[Array[Double]], +// numClasses: Int, +// fitIntercept: Boolean, +// multinomial: Boolean) extends Serializable with Logging { +// +// private val numFeatures = bcFeaturesStd.value.length +// private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures +// private val coefficientSize = bcCoefficients.value.size +// private val numCoefficientSets = if (multinomial) numClasses else 1 +// if (multinomial) { +// require(numClasses == coefficientSize / numFeaturesPlusIntercept, s"The number of " + +// s"coefficients should be ${numClasses * numFeaturesPlusIntercept} but was $coefficientSize") +// } else { +// require(coefficientSize == numFeaturesPlusIntercept, s"Expected $numFeaturesPlusIntercept " + +// s"coefficients but got $coefficientSize") +// require(numClasses == 1 || numClasses == 2, s"Binary logistic aggregator requires numClasses " + +// s"in {1, 2} but found $numClasses.") +// } +// +// private var weightSum = 0.0 +// private var lossSum = 0.0 +// +// @transient private lazy val coefficientsArray: Array[Double] = bcCoefficients.value match { +// case DenseVector(values) => values +// case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " + +// s"got type ${bcCoefficients.value.getClass}.)") +// } +// private lazy val gradientSumArray = new Array[Double](coefficientSize) +// +// if (multinomial && numClasses <= 2) { +// logInfo(s"Multinomial logistic regression for binary classification yields separate " + +// s"coefficients for positive and negative classes. When no regularization is applied, the" + +// s"result will be effectively the same as binary logistic regression. When regularization" + +// s"is applied, multinomial loss will produce a result different from binary loss.") +// } +// +// /** Update gradient and loss using binary loss function. */ +// private def binaryUpdateInPlace( +// features: Vector, +// weight: Double, +// label: Double): Unit = { +// +// val localFeaturesStd = bcFeaturesStd.value +// val localCoefficients = coefficientsArray +// val localGradientArray = gradientSumArray +// val margin = - { +// var sum = 0.0 +// features.foreachActive { (index, value) => +// if (localFeaturesStd(index) != 0.0 && value != 0.0) { +// sum += localCoefficients(index) * value / localFeaturesStd(index) +// } +// } +// if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) +// sum +// } +// +// val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label) +// +// features.foreachActive { (index, value) => +// if (localFeaturesStd(index) != 0.0 && value != 0.0) { +// localGradientArray(index) += multiplier * value / localFeaturesStd(index) +// } +// } +// +// if (fitIntercept) { +// localGradientArray(numFeaturesPlusIntercept - 1) += multiplier +// } +// +// if (label > 0) { +// // The following is equivalent to log(1 + exp(margin)) but more numerically stable. +// lossSum += weight * MLUtils.log1pExp(margin) +// } else { +// lossSum += weight * (MLUtils.log1pExp(margin) - margin) +// } +// } +// +// /** Update gradient and loss using multinomial (softmax) loss function. */ +// private def multinomialUpdateInPlace( +// features: Vector, +// weight: Double, +// label: Double): Unit = { +// // TODO: use level 2 BLAS operations +// /* +// Note: this can still be used when numClasses = 2 for binary +// logistic regression without pivoting. +// */ +// val localFeaturesStd = bcFeaturesStd.value +// val localCoefficients = coefficientsArray +// val localGradientArray = gradientSumArray +// +// // marginOfLabel is margins(label) in the formula +// var marginOfLabel = 0.0 +// var maxMargin = Double.NegativeInfinity +// +// val margins = new Array[Double](numClasses) +// features.foreachActive { (index, value) => +// val stdValue = value / localFeaturesStd(index) +// var j = 0 +// while (j < numClasses) { +// margins(j) += localCoefficients(index * numClasses + j) * stdValue +// j += 1 +// } +// } +// var i = 0 +// while (i < numClasses) { +// if (fitIntercept) { +// margins(i) += localCoefficients(numClasses * numFeatures + i) +// } +// if (i == label.toInt) marginOfLabel = margins(i) +// if (margins(i) > maxMargin) { +// maxMargin = margins(i) +// } +// i += 1 +// } +// +// /** +// * When maxMargin is greater than 0, the original formula could cause overflow. +// * We address this by subtracting maxMargin from all the margins, so it's guaranteed +// * that all of the new margins will be smaller than zero to prevent arithmetic overflow. +// */ +// val multipliers = new Array[Double](numClasses) +// val sum = { +// var temp = 0.0 +// var i = 0 +// while (i < numClasses) { +// if (maxMargin > 0) margins(i) -= maxMargin +// val exp = math.exp(margins(i)) +// temp += exp +// multipliers(i) = exp +// i += 1 +// } +// temp +// } +// +// margins.indices.foreach { i => +// multipliers(i) = multipliers(i) / sum - (if (label == i) 1.0 else 0.0) +// } +// features.foreachActive { (index, value) => +// if (localFeaturesStd(index) != 0.0 && value != 0.0) { +// val stdValue = value / localFeaturesStd(index) +// var j = 0 +// while (j < numClasses) { +// localGradientArray(index * numClasses + j) += +// weight * multipliers(j) * stdValue +// j += 1 +// } +// } +// } +// if (fitIntercept) { +// var i = 0 +// while (i < numClasses) { +// localGradientArray(numFeatures * numClasses + i) += weight * multipliers(i) +// i += 1 +// } +// } +// +// val loss = if (maxMargin > 0) { +// math.log(sum) - marginOfLabel + maxMargin +// } else { +// math.log(sum) - marginOfLabel +// } +// lossSum += weight * loss +// } +// +// /** +// * Add a new training instance to this LogisticAggregator, and update the loss and gradient +// * of the objective function. +// * +// * @param instance The instance of data point to be added. +// * @return This LogisticAggregator object. +// */ +// def add(instance: Instance): this.type = { +// instance match { case Instance(label, weight, features) => +// +// if (weight == 0.0) return this +// +// if (multinomial) { +// multinomialUpdateInPlace(features, weight, label) +// } else { +// binaryUpdateInPlace(features, weight, label) +// } +// weightSum += weight +// this +// } +// } +// +// /** +// * Merge another LogisticAggregator, and update the loss and gradient +// * of the objective function. +// * (Note that it's in place merging; as a result, `this` object will be modified.) +// * +// * @param other The other LogisticAggregator to be merged. +// * @return This LogisticAggregator object. +// */ +// def merge(other: LogisticAggregator): this.type = { +// +// if (other.weightSum != 0.0) { +// weightSum += other.weightSum +// lossSum += other.lossSum +// +// var i = 0 +// val localThisGradientSumArray = this.gradientSumArray +// val localOtherGradientSumArray = other.gradientSumArray +// val len = localThisGradientSumArray.length +// while (i < len) { +// localThisGradientSumArray(i) += localOtherGradientSumArray(i) +// i += 1 +// } +// } +// this +// } +// +// def loss: Double = { +// require(weightSum > 0.0, s"The effective number of instances should be " + +// s"greater than 0.0, but $weightSum.") +// lossSum / weightSum +// } +// +// def gradient: Matrix = { +// require(weightSum > 0.0, s"The effective number of instances should be " + +// s"greater than 0.0, but $weightSum.") +// val result = Vectors.dense(gradientSumArray.clone()) +// scal(1.0 / weightSum, result) +// new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, result.toArray) +// } +//} +// +///** +// * LogisticCostFun implements Breeze's DiffFunction[T] for a multinomial (softmax) logistic loss +// * function, as used in multi-class classification (it is also used in binary logistic regression). +// * It returns the loss and gradient with L2 regularization at a particular point (coefficients). +// * It's used in Breeze's convex optimization routines. +// */ +//private class LogisticCostFun( +// instances: RDD[Instance], +// numClasses: Int, +// fitIntercept: Boolean, +// standardization: Boolean, +// bcFeaturesStd: Broadcast[Array[Double]], +// regParamL2: Double, +// multinomial: Boolean, +// aggregationDepth: Int) extends DiffFunction[BDV[Double]] { +// +// override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = { +// val coeffs = Vectors.fromBreeze(coefficients) +// val bcCoeffs = instances.context.broadcast(coeffs) +// val featuresStd = bcFeaturesStd.value +// val numFeatures = featuresStd.length +// val numCoefficientSets = if (multinomial) numClasses else 1 +// val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures +// +// val logisticAggregator = { +// val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance) +// val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2) +// +// instances.treeAggregate( +// new LogisticAggregator(bcCoeffs, bcFeaturesStd, numClasses, fitIntercept, +// multinomial) +// )(seqOp, combOp, aggregationDepth) +// } +// +// val totalGradientMatrix = logisticAggregator.gradient +// val coefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, coeffs.toArray) +// // regVal is the sum of coefficients squares excluding intercept for L2 regularization. +// val regVal = if (regParamL2 == 0.0) { +// 0.0 +// } else { +// var sum = 0.0 +// coefMatrix.foreachActive { case (classIndex, featureIndex, value) => +// // We do not apply regularization to the intercepts +// val isIntercept = fitIntercept && (featureIndex == numFeatures) +// if (!isIntercept) { +// // The following code will compute the loss of the regularization; also +// // the gradient of the regularization, and add back to totalGradientArray. +// sum += { +// if (standardization) { +// val gradValue = totalGradientMatrix(classIndex, featureIndex) +// totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * value) +// value * value +// } else { +// if (featuresStd(featureIndex) != 0.0) { +// // If `standardization` is false, we still standardize the data +// // to improve the rate of convergence; as a result, we have to +// // perform this reverse standardization by penalizing each component +// // differently to get effectively the same objective function when +// // the training dataset is not standardized. +// val temp = value / (featuresStd(featureIndex) * featuresStd(featureIndex)) +// val gradValue = totalGradientMatrix(classIndex, featureIndex) +// totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * temp) +// value * temp +// } else { +// 0.0 +// } +// } +// } +// } +// } +// 0.5 * regParamL2 * sum +// } +// bcCoeffs.destroy(blocking = false) +// +// (logisticAggregator.loss + regVal, new BDV(totalGradientMatrix.toArray)) +// } +//} diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala new file mode 100644 index 0000000000000..dfb33ce8c4f37 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.ml.optim.aggregator + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.internal.Logging +import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.linalg.{DenseVector, Vector} +import org.apache.spark.mllib.util.MLUtils + +/** + * LogisticAggregator computes the gradient and loss for binary or multinomial logistic (softmax) + * loss function, as used in classification for instances in sparse or dense vector in an online + * fashion. + * + * Two LogisticAggregators can be merged together to have a summary of loss and gradient of + * the corresponding joint dataset. + * + * For improving the convergence rate during the optimization process and also to prevent against + * features with very large variances exerting an overly large influence during model training, + * packages like R's GLMNET perform the scaling to unit variance and remove the mean in order to + * reduce the condition number. The model is then trained in this scaled space, but returns the + * coefficients in the original scale. See page 9 in + * http://cran.r-project.org/web/packages/glmnet/glmnet.pdf + * + * However, we don't want to apply the [[org.apache.spark.ml.feature.StandardScaler]] on the + * training dataset, and then cache the standardized dataset since it will create a lot of overhead. + * As a result, we perform the scaling implicitly when we compute the objective function (though + * we do not subtract the mean). + * + * Note that there is a difference between multinomial (softmax) and binary loss. The binary case + * uses one outcome class as a "pivot" and regresses the other class against the pivot. In the + * multinomial case, the softmax loss function is used to model each class probability + * independently. Using softmax loss produces `K` sets of coefficients, while using a pivot class + * produces `K - 1` sets of coefficients (a single coefficient vector in the binary case). In the + * binary case, we can say that the coefficients are shared between the positive and negative + * classes. When regularization is applied, multinomial (softmax) loss will produce a result + * different from binary loss since the positive and negative don't share the coefficients while the + * binary regression shares the coefficients between positive and negative. + * + * The following is a mathematical derivation for the multinomial (softmax) loss. + * + * The probability of the multinomial outcome $y$ taking on any of the K possible outcomes is: + * + *
+ * $$ + * P(y_i=0|\vec{x}_i, \beta) = \frac{e^{\vec{x}_i^T \vec{\beta}_0}}{\sum_{k=0}^{K-1} + * e^{\vec{x}_i^T \vec{\beta}_k}} \\ + * P(y_i=1|\vec{x}_i, \beta) = \frac{e^{\vec{x}_i^T \vec{\beta}_1}}{\sum_{k=0}^{K-1} + * e^{\vec{x}_i^T \vec{\beta}_k}}\\ + * P(y_i=K-1|\vec{x}_i, \beta) = \frac{e^{\vec{x}_i^T \vec{\beta}_{K-1}}\,}{\sum_{k=0}^{K-1} + * e^{\vec{x}_i^T \vec{\beta}_k}} + * $$ + *
+ * + * The model coefficients $\beta = (\beta_0, \beta_1, \beta_2, ..., \beta_{K-1})$ become a matrix + * which has dimension of $K \times (N+1)$ if the intercepts are added. If the intercepts are not + * added, the dimension will be $K \times N$. + * + * Note that the coefficients in the model above lack identifiability. That is, any constant scalar + * can be added to all of the coefficients and the probabilities remain the same. + * + *
+ * $$ + * \begin{align} + * \frac{e^{\vec{x}_i^T \left(\vec{\beta}_0 + \vec{c}\right)}}{\sum_{k=0}^{K-1} + * e^{\vec{x}_i^T \left(\vec{\beta}_k + \vec{c}\right)}} + * = \frac{e^{\vec{x}_i^T \vec{\beta}_0}e^{\vec{x}_i^T \vec{c}}\,}{e^{\vec{x}_i^T \vec{c}} + * \sum_{k=0}^{K-1} e^{\vec{x}_i^T \vec{\beta}_k}} + * = \frac{e^{\vec{x}_i^T \vec{\beta}_0}}{\sum_{k=0}^{K-1} e^{\vec{x}_i^T \vec{\beta}_k}} + * \end{align} + * $$ + *
+ * + * However, when regularization is added to the loss function, the coefficients are indeed + * identifiable because there is only one set of coefficients which minimizes the regularization + * term. When no regularization is applied, we choose the coefficients with the minimum L2 + * penalty for consistency and reproducibility. For further discussion see: + * + * Friedman, et al. "Regularization Paths for Generalized Linear Models via Coordinate Descent" + * + * The loss of objective function for a single instance of data (we do not include the + * regularization term here for simplicity) can be written as + * + *
+ * $$ + * \begin{align} + * \ell\left(\beta, x_i\right) &= -log{P\left(y_i \middle| \vec{x}_i, \beta\right)} \\ + * &= log\left(\sum_{k=0}^{K-1}e^{\vec{x}_i^T \vec{\beta}_k}\right) - \vec{x}_i^T \vec{\beta}_y\\ + * &= log\left(\sum_{k=0}^{K-1} e^{margins_k}\right) - margins_y + * \end{align} + * $$ + *
+ * + * where ${margins}_k = \vec{x}_i^T \vec{\beta}_k$. + * + * For optimization, we have to calculate the first derivative of the loss function, and a simple + * calculation shows that + * + *
+ * $$ + * \begin{align} + * \frac{\partial \ell(\beta, \vec{x}_i, w_i)}{\partial \beta_{j, k}} + * &= x_{i,j} \cdot w_i \cdot \left(\frac{e^{\vec{x}_i \cdot \vec{\beta}_k}}{\sum_{k'=0}^{K-1} + * e^{\vec{x}_i \cdot \vec{\beta}_{k'}}\,} - I_{y=k}\right) \\ + * &= x_{i, j} \cdot w_i \cdot multiplier_k + * \end{align} + * $$ + *
+ * + * where $w_i$ is the sample weight, $I_{y=k}$ is an indicator function + * + *
+ * $$ + * I_{y=k} = \begin{cases} + * 1 & y = k \\ + * 0 & else + * \end{cases} + * $$ + *
+ * + * and + * + *
+ * $$ + * multiplier_k = \left(\frac{e^{\vec{x}_i \cdot \vec{\beta}_k}}{\sum_{k=0}^{K-1} + * e^{\vec{x}_i \cdot \vec{\beta}_k}} - I_{y=k}\right) + * $$ + *
+ * + * If any of margins is larger than 709.78, the numerical computation of multiplier and loss + * function will suffer from arithmetic overflow. This issue occurs when there are outliers in + * data which are far away from the hyperplane, and this will cause the failing of training once + * infinity is introduced. Note that this is only a concern when max(margins) > 0. + * + * Fortunately, when max(margins) = maxMargin > 0, the loss function and the multiplier can + * easily be rewritten into the following equivalent numerically stable formula. + * + *
+ * $$ + * \ell\left(\beta, x\right) = log\left(\sum_{k=0}^{K-1} e^{margins_k - maxMargin}\right) - + * margins_{y} + maxMargin + * $$ + *
+ * + * Note that each term, $(margins_k - maxMargin)$ in the exponential is no greater than zero; as a + * result, overflow will not happen with this formula. + * + * For $multiplier$, a similar trick can be applied as the following, + * + *
+ * $$ + * multiplier_k = \left(\frac{e^{\vec{x}_i \cdot \vec{\beta}_k - maxMargin}}{\sum_{k'=0}^{K-1} + * e^{\vec{x}_i \cdot \vec{\beta}_{k'} - maxMargin}} - I_{y=k}\right) + * $$ + *
+ * + * + * @param bcCoefficients The broadcast coefficients corresponding to the features. + * @param bcFeaturesStd The broadcast standard deviation values of the features. + * @param numClasses the number of possible outcomes for k classes classification problem in + * Multinomial Logistic Regression. + * @param fitIntercept Whether to fit an intercept term. + * @param multinomial Whether to use multinomial (softmax) or binary loss + * @note In order to avoid unnecessary computation during calculation of the gradient updates + * we lay out the coefficients in column major order during training. This allows us to + * perform feature standardization once, while still retaining sequential memory access + * for speed. We convert back to row major order when we create the model, + * since this form is optimal for the matrix operations used for prediction. + */ +private[ml] class LogisticAggregator( + bcFeaturesStd: Broadcast[Array[Double]], + numClasses: Int, + fitIntercept: Boolean, + multinomial: Boolean)(bcCoefficients: Broadcast[Vector]) + extends DifferentiableLossAggregator[Instance, LogisticAggregator] with Logging { + + private val numFeatures = bcFeaturesStd.value.length + private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures + private val coefficientSize = bcCoefficients.value.size + protected override val dim: Int = coefficientSize + private val numCoefficientSets = if (multinomial) numClasses else 1 + if (multinomial) { + require(numClasses == coefficientSize / numFeaturesPlusIntercept, s"The number of " + + s"coefficients should be ${numClasses * numFeaturesPlusIntercept} but was $coefficientSize") + } else { + require(coefficientSize == numFeaturesPlusIntercept, s"Expected $numFeaturesPlusIntercept " + + s"coefficients but got $coefficientSize") + require(numClasses == 1 || numClasses == 2, s"Binary logistic aggregator requires numClasses " + + s"in {1, 2} but found $numClasses.") + } + + @transient private lazy val coefficientsArray: Array[Double] = bcCoefficients.value match { + case DenseVector(values) => values + case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " + + s"got type ${bcCoefficients.value.getClass}.)") + } + + if (multinomial && numClasses <= 2) { + logInfo(s"Multinomial logistic regression for binary classification yields separate " + + s"coefficients for positive and negative classes. When no regularization is applied, the" + + s"result will be effectively the same as binary logistic regression. When regularization" + + s"is applied, multinomial loss will produce a result different from binary loss.") + } + + /** Update gradient and loss using binary loss function. */ + private def binaryUpdateInPlace( + features: Vector, + weight: Double, + label: Double): Unit = { + + val localFeaturesStd = bcFeaturesStd.value + val localCoefficients = coefficientsArray + val localGradientArray = gradientSumArray + val margin = - { + var sum = 0.0 + features.foreachActive { (index, value) => + if (localFeaturesStd(index) != 0.0 && value != 0.0) { + sum += localCoefficients(index) * value / localFeaturesStd(index) + } + } + if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) + sum + } + + val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label) + + features.foreachActive { (index, value) => + if (localFeaturesStd(index) != 0.0 && value != 0.0) { + localGradientArray(index) += multiplier * value / localFeaturesStd(index) + } + } + + if (fitIntercept) { + localGradientArray(numFeaturesPlusIntercept - 1) += multiplier + } + + if (label > 0) { + // The following is equivalent to log(1 + exp(margin)) but more numerically stable. + lossSum += weight * MLUtils.log1pExp(margin) + } else { + lossSum += weight * (MLUtils.log1pExp(margin) - margin) + } + } + + /** Update gradient and loss using multinomial (softmax) loss function. */ + private def multinomialUpdateInPlace( + features: Vector, + weight: Double, + label: Double): Unit = { + // TODO: use level 2 BLAS operations + /* + Note: this can still be used when numClasses = 2 for binary + logistic regression without pivoting. + */ + val localFeaturesStd = bcFeaturesStd.value + val localCoefficients = coefficientsArray + val localGradientArray = gradientSumArray + + // marginOfLabel is margins(label) in the formula + var marginOfLabel = 0.0 + var maxMargin = Double.NegativeInfinity + + val margins = new Array[Double](numClasses) + features.foreachActive { (index, value) => + val stdValue = value / localFeaturesStd(index) + var j = 0 + while (j < numClasses) { + margins(j) += localCoefficients(index * numClasses + j) * stdValue + j += 1 + } + } + var i = 0 + while (i < numClasses) { + if (fitIntercept) { + margins(i) += localCoefficients(numClasses * numFeatures + i) + } + if (i == label.toInt) marginOfLabel = margins(i) + if (margins(i) > maxMargin) { + maxMargin = margins(i) + } + i += 1 + } + + /** + * When maxMargin is greater than 0, the original formula could cause overflow. + * We address this by subtracting maxMargin from all the margins, so it's guaranteed + * that all of the new margins will be smaller than zero to prevent arithmetic overflow. + */ + val multipliers = new Array[Double](numClasses) + val sum = { + var temp = 0.0 + var i = 0 + while (i < numClasses) { + if (maxMargin > 0) margins(i) -= maxMargin + val exp = math.exp(margins(i)) + temp += exp + multipliers(i) = exp + i += 1 + } + temp + } + + margins.indices.foreach { i => + multipliers(i) = multipliers(i) / sum - (if (label == i) 1.0 else 0.0) + } + features.foreachActive { (index, value) => + if (localFeaturesStd(index) != 0.0 && value != 0.0) { + val stdValue = value / localFeaturesStd(index) + var j = 0 + while (j < numClasses) { + localGradientArray(index * numClasses + j) += + weight * multipliers(j) * stdValue + j += 1 + } + } + } + if (fitIntercept) { + var i = 0 + while (i < numClasses) { + localGradientArray(numFeatures * numClasses + i) += weight * multipliers(i) + i += 1 + } + } + + val loss = if (maxMargin > 0) { + math.log(sum) - marginOfLabel + maxMargin + } else { + math.log(sum) - marginOfLabel + } + lossSum += weight * loss + } + + /** + * Add a new training instance to this LogisticAggregator, and update the loss and gradient + * of the objective function. + * + * @param instance The instance of data point to be added. + * @return This LogisticAggregator object. + */ + def add(instance: Instance): this.type = { + instance match { case Instance(label, weight, features) => + + if (weight == 0.0) return this + + if (multinomial) { + multinomialUpdateInPlace(features, weight, label) + } else { + binaryUpdateInPlace(features, weight, label) + } + weightSum += weight + this + } + } + +// def gradient: Matrix = { +// require(weightSum > 0.0, s"The effective number of instances should be " + +// s"greater than 0.0, but $weightSum.") +// val result = Vectors.dense(gradientSumArray.clone()) +// BLAS.scal(1.0 / weightSum, result) +// new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, result.toArray) +// } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala index 118c0ebfa513e..a8e75955677dc 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala @@ -17,6 +17,7 @@ package org.apache.spark.ml.optim.loss import breeze.optimize.DiffFunction +import org.apache.spark.ml.linalg._ /** * A Breeze diff function which represents a cost function for differentiable regularization @@ -29,6 +30,8 @@ private[ml] trait DifferentiableRegularization[T] extends DiffFunction[T] { /** Magnitude of the regularization penalty. */ def regParam: Double + def getReg: Int => Double = (x: Int) => regParam + } /** @@ -44,7 +47,7 @@ private[ml] trait DifferentiableRegularization[T] extends DiffFunction[T] { private[ml] class L2Regularization( val regParam: Double, shouldApply: Int => Boolean, - featuresStd: Option[Array[Double]]) extends DifferentiableRegularization[Array[Double]] { + featuresStd: Option[Int => Double]) extends DifferentiableRegularization[Array[Double]] { override def calculate(coefficients: Array[Double]): (Double, Array[Double]) = { var sum = 0.0 @@ -52,8 +55,8 @@ private[ml] class L2Regularization( coefficients.indices.filter(shouldApply).foreach { j => val coef = coefficients(j) featuresStd match { - case Some(stds) => - val std = stds(j) + case Some(getStd) => + val std = getStd(j) if (std != 0.0) { val temp = coef / (std * std) sum += coef * temp @@ -69,3 +72,30 @@ private[ml] class L2Regularization( (0.5 * sum * regParam, gradient) } } + +//class StandardizedRegularization(val getStd: Int => Double) +// extends DifferentiableRegularization[Vector] { +// val regParam = 0.0 +// +// override def calculate(coefficients: Vector): (Double, Vector) = { +// +// +// } +//} +// +//class Regularization(override val getReg: Int => Double) +// extends DifferentiableRegularization[Vector] { +// val regParam = 0.0 +// +// override def calculate(coefficients: Vector): (Double, Vector) = { +// var sum = 0.0 +// // TODO: handle sparse? +// val gradient = new Array[Double](coefficients.size) +// coefficients.foreachActive { (i, v) => +// val reg = getReg(i) +// sum += 0.5 * v * v * reg +// gradient(i) = v * reg +// } +// (sum, Vectors.dense(gradient)) +// } +//} diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index db5ac4f14bd3b..3c790fa83afd7 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -322,10 +322,11 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String val getAggregatorFunc = new LeastSquaresAggregator(yStd, yMean, $(fitIntercept), bcFeaturesStd, bcFeaturesMean)(_) + val getFeaturesStd = (j: Int) => featuresStd(j) val regularization = if (effectiveL2RegParam != 0.0) { val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures Some(new L2Regularization(effectiveL2RegParam, shouldApply, - if ($(standardization)) None else Some(featuresStd))) + if ($(standardization)) None else Some(getFeaturesStd))) } else { None } diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index 1ffd8dcd53d61..bfa32f9d63b12 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -21,12 +21,12 @@ import scala.collection.JavaConverters._ import scala.language.existentials import scala.util.Random import scala.util.control.Breaks._ - import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.ml.attribute.NominalAttribute import org.apache.spark.ml.classification.LogisticRegressionSuite._ import org.apache.spark.ml.feature.{Instance, LabeledPoint} import org.apache.spark.ml.linalg.{DenseMatrix, Matrices, Matrix, SparseMatrix, Vector, Vectors} +import org.apache.spark.ml.optim.aggregator.LogisticAggregator import org.apache.spark.ml.param.{ParamMap, ParamsSuite} import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ @@ -506,8 +506,8 @@ class LogisticRegressionSuite test("sparse coefficients in LogisticAggregator") { val bcCoefficientsBinary = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) - val binaryAgg = new LogisticAggregator(bcCoefficientsBinary, bcFeaturesStd, 2, - fitIntercept = true, multinomial = false) + val binaryAgg = new LogisticAggregator(bcFeaturesStd, 2, + fitIntercept = true, multinomial = false)(bcCoefficientsBinary) val thrownBinary = withClue("binary logistic aggregator cannot handle sparse coefficients") { intercept[IllegalArgumentException] { binaryAgg.add(Instance(1.0, 1.0, Vectors.dense(1.0))) @@ -516,8 +516,8 @@ class LogisticRegressionSuite assert(thrownBinary.getMessage.contains("coefficients only supports dense")) val bcCoefficientsMulti = spark.sparkContext.broadcast(Vectors.sparse(6, Array(0), Array(1.0))) - val multinomialAgg = new LogisticAggregator(bcCoefficientsMulti, bcFeaturesStd, 3, - fitIntercept = true, multinomial = true) + val multinomialAgg = new LogisticAggregator(bcFeaturesStd, 3, + fitIntercept = true, multinomial = true)(bcCoefficientsMulti) val thrown = withClue("multinomial logistic aggregator cannot handle sparse coefficients") { intercept[IllegalArgumentException] { multinomialAgg.add(Instance(1.0, 1.0, Vectors.dense(1.0))) From a5b18c22a68418c29a13aab7a1fd00eec5176658 Mon Sep 17 00:00:00 2001 From: sethah Date: Tue, 13 Jun 2017 22:57:19 -0700 Subject: [PATCH 2/7] passing tests, added tests to leastsquares agg --- .../classification/LogisticRegression.scala | 485 +----------------- .../optim/aggregator/LogisticAggregator.scala | 25 +- .../loss/DifferentiableRegularization.scala | 74 +-- .../spark/ml/optim/loss/RDDLossFunction.scala | 10 +- .../DifferentiableLossAggregatorSuite.scala | 38 ++ .../LeastSquaresAggregatorSuite.scala | 47 +- .../aggregator/LogisticAggregatorSuite.scala | 256 +++++++++ .../DifferentiableRegularizationSuite.scala | 7 +- .../ml/optim/loss/RDDLossFunctionSuite.scala | 6 +- 9 files changed, 371 insertions(+), 577 deletions(-) create mode 100644 mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index fc95a20f761b8..9d618642ab79b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -598,22 +598,22 @@ class LogisticRegression @Since("1.2.0") ( val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam) val bcFeaturesStd = instances.context.broadcast(featuresStd) -// val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept), -// $(standardization), bcFeaturesStd, regParamL2, multinomial = isMultinomial, -// $(aggregationDepth)) val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept), multinomial = isMultinomial)(_) - val getFeaturesStd = (j: Int) => featuresStd(j % numFeatures) + val getFeaturesStd = (j: Int) => if (j >= 0 && j < numCoefficientSets * numFeatures) { + featuresStd(j / numCoefficientSets) + } else { + 0.0 + } + val regularization = if (regParamL2 != 0.0) { - val shouldApply = (idx: Int) => { - // do not apply to intercepts - idx >= 0 && idx < numFeatures * numClasses - } + val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures * numCoefficientSets Some(new L2Regularization(regParamL2, shouldApply, if ($(standardization)) None else Some(getFeaturesStd))) } else { None } + val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization, $(aggregationDepth)) @@ -1251,7 +1251,7 @@ object LogisticRegressionModel extends MLReadable[LogisticRegressionModel] { * Two MultilabelSummarizer can be merged together to have a statistical summary of the * corresponding joint dataset. */ -private[classification] class MultiClassSummarizer extends Serializable { +private[ml] class MultiClassSummarizer extends Serializable { // The first element of value in distinctMap is the actually number of instances, // and the second element of value is sum of the weights. private val distinctMap = new mutable.HashMap[Int, (Long, Double)] @@ -1483,470 +1483,3 @@ class BinaryLogisticRegressionSummary private[classification] ( binaryMetrics.recallByThreshold().toDF("threshold", "recall") } } - -/** - * LogisticAggregator computes the gradient and loss for binary or multinomial logistic (softmax) - * loss function, as used in classification for instances in sparse or dense vector in an online - * fashion. - * - * Two LogisticAggregators can be merged together to have a summary of loss and gradient of - * the corresponding joint dataset. - * - * For improving the convergence rate during the optimization process and also to prevent against - * features with very large variances exerting an overly large influence during model training, - * packages like R's GLMNET perform the scaling to unit variance and remove the mean in order to - * reduce the condition number. The model is then trained in this scaled space, but returns the - * coefficients in the original scale. See page 9 in - * http://cran.r-project.org/web/packages/glmnet/glmnet.pdf - * - * However, we don't want to apply the [[org.apache.spark.ml.feature.StandardScaler]] on the - * training dataset, and then cache the standardized dataset since it will create a lot of overhead. - * As a result, we perform the scaling implicitly when we compute the objective function (though - * we do not subtract the mean). - * - * Note that there is a difference between multinomial (softmax) and binary loss. The binary case - * uses one outcome class as a "pivot" and regresses the other class against the pivot. In the - * multinomial case, the softmax loss function is used to model each class probability - * independently. Using softmax loss produces `K` sets of coefficients, while using a pivot class - * produces `K - 1` sets of coefficients (a single coefficient vector in the binary case). In the - * binary case, we can say that the coefficients are shared between the positive and negative - * classes. When regularization is applied, multinomial (softmax) loss will produce a result - * different from binary loss since the positive and negative don't share the coefficients while the - * binary regression shares the coefficients between positive and negative. - * - * The following is a mathematical derivation for the multinomial (softmax) loss. - * - * The probability of the multinomial outcome $y$ taking on any of the K possible outcomes is: - * - *
- * $$ - * P(y_i=0|\vec{x}_i, \beta) = \frac{e^{\vec{x}_i^T \vec{\beta}_0}}{\sum_{k=0}^{K-1} - * e^{\vec{x}_i^T \vec{\beta}_k}} \\ - * P(y_i=1|\vec{x}_i, \beta) = \frac{e^{\vec{x}_i^T \vec{\beta}_1}}{\sum_{k=0}^{K-1} - * e^{\vec{x}_i^T \vec{\beta}_k}}\\ - * P(y_i=K-1|\vec{x}_i, \beta) = \frac{e^{\vec{x}_i^T \vec{\beta}_{K-1}}\,}{\sum_{k=0}^{K-1} - * e^{\vec{x}_i^T \vec{\beta}_k}} - * $$ - *
- * - * The model coefficients $\beta = (\beta_0, \beta_1, \beta_2, ..., \beta_{K-1})$ become a matrix - * which has dimension of $K \times (N+1)$ if the intercepts are added. If the intercepts are not - * added, the dimension will be $K \times N$. - * - * Note that the coefficients in the model above lack identifiability. That is, any constant scalar - * can be added to all of the coefficients and the probabilities remain the same. - * - *
- * $$ - * \begin{align} - * \frac{e^{\vec{x}_i^T \left(\vec{\beta}_0 + \vec{c}\right)}}{\sum_{k=0}^{K-1} - * e^{\vec{x}_i^T \left(\vec{\beta}_k + \vec{c}\right)}} - * = \frac{e^{\vec{x}_i^T \vec{\beta}_0}e^{\vec{x}_i^T \vec{c}}\,}{e^{\vec{x}_i^T \vec{c}} - * \sum_{k=0}^{K-1} e^{\vec{x}_i^T \vec{\beta}_k}} - * = \frac{e^{\vec{x}_i^T \vec{\beta}_0}}{\sum_{k=0}^{K-1} e^{\vec{x}_i^T \vec{\beta}_k}} - * \end{align} - * $$ - *
- * - * However, when regularization is added to the loss function, the coefficients are indeed - * identifiable because there is only one set of coefficients which minimizes the regularization - * term. When no regularization is applied, we choose the coefficients with the minimum L2 - * penalty for consistency and reproducibility. For further discussion see: - * - * Friedman, et al. "Regularization Paths for Generalized Linear Models via Coordinate Descent" - * - * The loss of objective function for a single instance of data (we do not include the - * regularization term here for simplicity) can be written as - * - *
- * $$ - * \begin{align} - * \ell\left(\beta, x_i\right) &= -log{P\left(y_i \middle| \vec{x}_i, \beta\right)} \\ - * &= log\left(\sum_{k=0}^{K-1}e^{\vec{x}_i^T \vec{\beta}_k}\right) - \vec{x}_i^T \vec{\beta}_y\\ - * &= log\left(\sum_{k=0}^{K-1} e^{margins_k}\right) - margins_y - * \end{align} - * $$ - *
- * - * where ${margins}_k = \vec{x}_i^T \vec{\beta}_k$. - * - * For optimization, we have to calculate the first derivative of the loss function, and a simple - * calculation shows that - * - *
- * $$ - * \begin{align} - * \frac{\partial \ell(\beta, \vec{x}_i, w_i)}{\partial \beta_{j, k}} - * &= x_{i,j} \cdot w_i \cdot \left(\frac{e^{\vec{x}_i \cdot \vec{\beta}_k}}{\sum_{k'=0}^{K-1} - * e^{\vec{x}_i \cdot \vec{\beta}_{k'}}\,} - I_{y=k}\right) \\ - * &= x_{i, j} \cdot w_i \cdot multiplier_k - * \end{align} - * $$ - *
- * - * where $w_i$ is the sample weight, $I_{y=k}$ is an indicator function - * - *
- * $$ - * I_{y=k} = \begin{cases} - * 1 & y = k \\ - * 0 & else - * \end{cases} - * $$ - *
- * - * and - * - *
- * $$ - * multiplier_k = \left(\frac{e^{\vec{x}_i \cdot \vec{\beta}_k}}{\sum_{k=0}^{K-1} - * e^{\vec{x}_i \cdot \vec{\beta}_k}} - I_{y=k}\right) - * $$ - *
- * - * If any of margins is larger than 709.78, the numerical computation of multiplier and loss - * function will suffer from arithmetic overflow. This issue occurs when there are outliers in - * data which are far away from the hyperplane, and this will cause the failing of training once - * infinity is introduced. Note that this is only a concern when max(margins) > 0. - * - * Fortunately, when max(margins) = maxMargin > 0, the loss function and the multiplier can - * easily be rewritten into the following equivalent numerically stable formula. - * - *
- * $$ - * \ell\left(\beta, x\right) = log\left(\sum_{k=0}^{K-1} e^{margins_k - maxMargin}\right) - - * margins_{y} + maxMargin - * $$ - *
- * - * Note that each term, $(margins_k - maxMargin)$ in the exponential is no greater than zero; as a - * result, overflow will not happen with this formula. - * - * For $multiplier$, a similar trick can be applied as the following, - * - *
- * $$ - * multiplier_k = \left(\frac{e^{\vec{x}_i \cdot \vec{\beta}_k - maxMargin}}{\sum_{k'=0}^{K-1} - * e^{\vec{x}_i \cdot \vec{\beta}_{k'} - maxMargin}} - I_{y=k}\right) - * $$ - *
- * - * @param bcCoefficients The broadcast coefficients corresponding to the features. - * @param bcFeaturesStd The broadcast standard deviation values of the features. - * @param numClasses the number of possible outcomes for k classes classification problem in - * Multinomial Logistic Regression. - * @param fitIntercept Whether to fit an intercept term. - * @param multinomial Whether to use multinomial (softmax) or binary loss - * - * @note In order to avoid unnecessary computation during calculation of the gradient updates - * we lay out the coefficients in column major order during training. This allows us to - * perform feature standardization once, while still retaining sequential memory access - * for speed. We convert back to row major order when we create the model, - * since this form is optimal for the matrix operations used for prediction. - */ -//private class LogisticAggregator( -// bcCoefficients: Broadcast[Vector], -// bcFeaturesStd: Broadcast[Array[Double]], -// numClasses: Int, -// fitIntercept: Boolean, -// multinomial: Boolean) extends Serializable with Logging { -// -// private val numFeatures = bcFeaturesStd.value.length -// private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures -// private val coefficientSize = bcCoefficients.value.size -// private val numCoefficientSets = if (multinomial) numClasses else 1 -// if (multinomial) { -// require(numClasses == coefficientSize / numFeaturesPlusIntercept, s"The number of " + -// s"coefficients should be ${numClasses * numFeaturesPlusIntercept} but was $coefficientSize") -// } else { -// require(coefficientSize == numFeaturesPlusIntercept, s"Expected $numFeaturesPlusIntercept " + -// s"coefficients but got $coefficientSize") -// require(numClasses == 1 || numClasses == 2, s"Binary logistic aggregator requires numClasses " + -// s"in {1, 2} but found $numClasses.") -// } -// -// private var weightSum = 0.0 -// private var lossSum = 0.0 -// -// @transient private lazy val coefficientsArray: Array[Double] = bcCoefficients.value match { -// case DenseVector(values) => values -// case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " + -// s"got type ${bcCoefficients.value.getClass}.)") -// } -// private lazy val gradientSumArray = new Array[Double](coefficientSize) -// -// if (multinomial && numClasses <= 2) { -// logInfo(s"Multinomial logistic regression for binary classification yields separate " + -// s"coefficients for positive and negative classes. When no regularization is applied, the" + -// s"result will be effectively the same as binary logistic regression. When regularization" + -// s"is applied, multinomial loss will produce a result different from binary loss.") -// } -// -// /** Update gradient and loss using binary loss function. */ -// private def binaryUpdateInPlace( -// features: Vector, -// weight: Double, -// label: Double): Unit = { -// -// val localFeaturesStd = bcFeaturesStd.value -// val localCoefficients = coefficientsArray -// val localGradientArray = gradientSumArray -// val margin = - { -// var sum = 0.0 -// features.foreachActive { (index, value) => -// if (localFeaturesStd(index) != 0.0 && value != 0.0) { -// sum += localCoefficients(index) * value / localFeaturesStd(index) -// } -// } -// if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) -// sum -// } -// -// val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label) -// -// features.foreachActive { (index, value) => -// if (localFeaturesStd(index) != 0.0 && value != 0.0) { -// localGradientArray(index) += multiplier * value / localFeaturesStd(index) -// } -// } -// -// if (fitIntercept) { -// localGradientArray(numFeaturesPlusIntercept - 1) += multiplier -// } -// -// if (label > 0) { -// // The following is equivalent to log(1 + exp(margin)) but more numerically stable. -// lossSum += weight * MLUtils.log1pExp(margin) -// } else { -// lossSum += weight * (MLUtils.log1pExp(margin) - margin) -// } -// } -// -// /** Update gradient and loss using multinomial (softmax) loss function. */ -// private def multinomialUpdateInPlace( -// features: Vector, -// weight: Double, -// label: Double): Unit = { -// // TODO: use level 2 BLAS operations -// /* -// Note: this can still be used when numClasses = 2 for binary -// logistic regression without pivoting. -// */ -// val localFeaturesStd = bcFeaturesStd.value -// val localCoefficients = coefficientsArray -// val localGradientArray = gradientSumArray -// -// // marginOfLabel is margins(label) in the formula -// var marginOfLabel = 0.0 -// var maxMargin = Double.NegativeInfinity -// -// val margins = new Array[Double](numClasses) -// features.foreachActive { (index, value) => -// val stdValue = value / localFeaturesStd(index) -// var j = 0 -// while (j < numClasses) { -// margins(j) += localCoefficients(index * numClasses + j) * stdValue -// j += 1 -// } -// } -// var i = 0 -// while (i < numClasses) { -// if (fitIntercept) { -// margins(i) += localCoefficients(numClasses * numFeatures + i) -// } -// if (i == label.toInt) marginOfLabel = margins(i) -// if (margins(i) > maxMargin) { -// maxMargin = margins(i) -// } -// i += 1 -// } -// -// /** -// * When maxMargin is greater than 0, the original formula could cause overflow. -// * We address this by subtracting maxMargin from all the margins, so it's guaranteed -// * that all of the new margins will be smaller than zero to prevent arithmetic overflow. -// */ -// val multipliers = new Array[Double](numClasses) -// val sum = { -// var temp = 0.0 -// var i = 0 -// while (i < numClasses) { -// if (maxMargin > 0) margins(i) -= maxMargin -// val exp = math.exp(margins(i)) -// temp += exp -// multipliers(i) = exp -// i += 1 -// } -// temp -// } -// -// margins.indices.foreach { i => -// multipliers(i) = multipliers(i) / sum - (if (label == i) 1.0 else 0.0) -// } -// features.foreachActive { (index, value) => -// if (localFeaturesStd(index) != 0.0 && value != 0.0) { -// val stdValue = value / localFeaturesStd(index) -// var j = 0 -// while (j < numClasses) { -// localGradientArray(index * numClasses + j) += -// weight * multipliers(j) * stdValue -// j += 1 -// } -// } -// } -// if (fitIntercept) { -// var i = 0 -// while (i < numClasses) { -// localGradientArray(numFeatures * numClasses + i) += weight * multipliers(i) -// i += 1 -// } -// } -// -// val loss = if (maxMargin > 0) { -// math.log(sum) - marginOfLabel + maxMargin -// } else { -// math.log(sum) - marginOfLabel -// } -// lossSum += weight * loss -// } -// -// /** -// * Add a new training instance to this LogisticAggregator, and update the loss and gradient -// * of the objective function. -// * -// * @param instance The instance of data point to be added. -// * @return This LogisticAggregator object. -// */ -// def add(instance: Instance): this.type = { -// instance match { case Instance(label, weight, features) => -// -// if (weight == 0.0) return this -// -// if (multinomial) { -// multinomialUpdateInPlace(features, weight, label) -// } else { -// binaryUpdateInPlace(features, weight, label) -// } -// weightSum += weight -// this -// } -// } -// -// /** -// * Merge another LogisticAggregator, and update the loss and gradient -// * of the objective function. -// * (Note that it's in place merging; as a result, `this` object will be modified.) -// * -// * @param other The other LogisticAggregator to be merged. -// * @return This LogisticAggregator object. -// */ -// def merge(other: LogisticAggregator): this.type = { -// -// if (other.weightSum != 0.0) { -// weightSum += other.weightSum -// lossSum += other.lossSum -// -// var i = 0 -// val localThisGradientSumArray = this.gradientSumArray -// val localOtherGradientSumArray = other.gradientSumArray -// val len = localThisGradientSumArray.length -// while (i < len) { -// localThisGradientSumArray(i) += localOtherGradientSumArray(i) -// i += 1 -// } -// } -// this -// } -// -// def loss: Double = { -// require(weightSum > 0.0, s"The effective number of instances should be " + -// s"greater than 0.0, but $weightSum.") -// lossSum / weightSum -// } -// -// def gradient: Matrix = { -// require(weightSum > 0.0, s"The effective number of instances should be " + -// s"greater than 0.0, but $weightSum.") -// val result = Vectors.dense(gradientSumArray.clone()) -// scal(1.0 / weightSum, result) -// new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, result.toArray) -// } -//} -// -///** -// * LogisticCostFun implements Breeze's DiffFunction[T] for a multinomial (softmax) logistic loss -// * function, as used in multi-class classification (it is also used in binary logistic regression). -// * It returns the loss and gradient with L2 regularization at a particular point (coefficients). -// * It's used in Breeze's convex optimization routines. -// */ -//private class LogisticCostFun( -// instances: RDD[Instance], -// numClasses: Int, -// fitIntercept: Boolean, -// standardization: Boolean, -// bcFeaturesStd: Broadcast[Array[Double]], -// regParamL2: Double, -// multinomial: Boolean, -// aggregationDepth: Int) extends DiffFunction[BDV[Double]] { -// -// override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = { -// val coeffs = Vectors.fromBreeze(coefficients) -// val bcCoeffs = instances.context.broadcast(coeffs) -// val featuresStd = bcFeaturesStd.value -// val numFeatures = featuresStd.length -// val numCoefficientSets = if (multinomial) numClasses else 1 -// val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures -// -// val logisticAggregator = { -// val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance) -// val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2) -// -// instances.treeAggregate( -// new LogisticAggregator(bcCoeffs, bcFeaturesStd, numClasses, fitIntercept, -// multinomial) -// )(seqOp, combOp, aggregationDepth) -// } -// -// val totalGradientMatrix = logisticAggregator.gradient -// val coefMatrix = new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, coeffs.toArray) -// // regVal is the sum of coefficients squares excluding intercept for L2 regularization. -// val regVal = if (regParamL2 == 0.0) { -// 0.0 -// } else { -// var sum = 0.0 -// coefMatrix.foreachActive { case (classIndex, featureIndex, value) => -// // We do not apply regularization to the intercepts -// val isIntercept = fitIntercept && (featureIndex == numFeatures) -// if (!isIntercept) { -// // The following code will compute the loss of the regularization; also -// // the gradient of the regularization, and add back to totalGradientArray. -// sum += { -// if (standardization) { -// val gradValue = totalGradientMatrix(classIndex, featureIndex) -// totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * value) -// value * value -// } else { -// if (featuresStd(featureIndex) != 0.0) { -// // If `standardization` is false, we still standardize the data -// // to improve the rate of convergence; as a result, we have to -// // perform this reverse standardization by penalizing each component -// // differently to get effectively the same objective function when -// // the training dataset is not standardized. -// val temp = value / (featuresStd(featureIndex) * featuresStd(featureIndex)) -// val gradValue = totalGradientMatrix(classIndex, featureIndex) -// totalGradientMatrix.update(classIndex, featureIndex, gradValue + regParamL2 * temp) -// value * temp -// } else { -// 0.0 -// } -// } -// } -// } -// } -// 0.5 * regParamL2 * sum -// } -// bcCoeffs.destroy(blocking = false) -// -// (logisticAggregator.loss + regVal, new BDV(totalGradientMatrix.toArray)) -// } -//} diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala index dfb33ce8c4f37..66a52942e668c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala @@ -193,7 +193,6 @@ private[ml] class LogisticAggregator( private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures private val coefficientSize = bcCoefficients.value.size protected override val dim: Int = coefficientSize - private val numCoefficientSets = if (multinomial) numClasses else 1 if (multinomial) { require(numClasses == coefficientSize / numFeaturesPlusIntercept, s"The number of " + s"coefficients should be ${numClasses * numFeaturesPlusIntercept} but was $coefficientSize") @@ -218,10 +217,7 @@ private[ml] class LogisticAggregator( } /** Update gradient and loss using binary loss function. */ - private def binaryUpdateInPlace( - features: Vector, - weight: Double, - label: Double): Unit = { + private def binaryUpdateInPlace(features: Vector, weight: Double, label: Double): Unit = { val localFeaturesStd = bcFeaturesStd.value val localCoefficients = coefficientsArray @@ -258,10 +254,7 @@ private[ml] class LogisticAggregator( } /** Update gradient and loss using multinomial (softmax) loss function. */ - private def multinomialUpdateInPlace( - features: Vector, - weight: Double, - label: Double): Unit = { + private def multinomialUpdateInPlace(features: Vector, weight: Double, label: Double): Unit = { // TODO: use level 2 BLAS operations /* Note: this can still be used when numClasses = 2 for binary @@ -323,8 +316,7 @@ private[ml] class LogisticAggregator( val stdValue = value / localFeaturesStd(index) var j = 0 while (j < numClasses) { - localGradientArray(index * numClasses + j) += - weight * multipliers(j) * stdValue + localGradientArray(index * numClasses + j) += weight * multipliers(j) * stdValue j += 1 } } @@ -354,6 +346,9 @@ private[ml] class LogisticAggregator( */ def add(instance: Instance): this.type = { instance match { case Instance(label, weight, features) => + require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." + + s" Expecting $numFeatures but got ${features.size}.") + require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0") if (weight == 0.0) return this @@ -366,12 +361,4 @@ private[ml] class LogisticAggregator( this } } - -// def gradient: Matrix = { -// require(weightSum > 0.0, s"The effective number of instances should be " + -// s"greater than 0.0, but $weightSum.") -// val result = Vectors.dense(gradientSumArray.clone()) -// BLAS.scal(1.0 / weightSum, result) -// new DenseMatrix(numCoefficientSets, numFeaturesPlusIntercept, result.toArray) -// } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala index a8e75955677dc..6050dde66cf40 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala @@ -30,8 +30,6 @@ private[ml] trait DifferentiableRegularization[T] extends DiffFunction[T] { /** Magnitude of the regularization penalty. */ def regParam: Double - def getReg: Int => Double = (x: Int) => regParam - } /** @@ -47,55 +45,33 @@ private[ml] trait DifferentiableRegularization[T] extends DiffFunction[T] { private[ml] class L2Regularization( val regParam: Double, shouldApply: Int => Boolean, - featuresStd: Option[Int => Double]) extends DifferentiableRegularization[Array[Double]] { + featuresStd: Option[Int => Double]) extends DifferentiableRegularization[Vector] { - override def calculate(coefficients: Array[Double]): (Double, Array[Double]) = { - var sum = 0.0 - val gradient = new Array[Double](coefficients.length) - coefficients.indices.filter(shouldApply).foreach { j => - val coef = coefficients(j) - featuresStd match { - case Some(getStd) => - val std = getStd(j) - if (std != 0.0) { - val temp = coef / (std * std) - sum += coef * temp - gradient(j) = regParam * temp - } else { - 0.0 + override def calculate(coefficients: Vector): (Double, Vector) = { + coefficients match { + case dv: DenseVector => + var sum = 0.0 + val gradient = new Array[Double](dv.size) + dv.values.indices.filter(shouldApply).foreach { j => + val coef = coefficients(j) + featuresStd match { + case Some(getStd) => + val std = getStd(j) + if (std != 0.0) { + val temp = coef / (std * std) + sum += coef * temp + gradient(j) = regParam * temp + } else { + 0.0 + } + case None => + sum += coef * coef + gradient(j) = coef * regParam } - case None => - sum += coef * coef - gradient(j) = coef * regParam - } + } + (0.5 * sum * regParam, Vectors.dense(gradient)) + case _: SparseVector => + throw new IllegalArgumentException("SparseVector is not currently supported.") } - (0.5 * sum * regParam, gradient) } } - -//class StandardizedRegularization(val getStd: Int => Double) -// extends DifferentiableRegularization[Vector] { -// val regParam = 0.0 -// -// override def calculate(coefficients: Vector): (Double, Vector) = { -// -// -// } -//} -// -//class Regularization(override val getReg: Int => Double) -// extends DifferentiableRegularization[Vector] { -// val regParam = 0.0 -// -// override def calculate(coefficients: Vector): (Double, Vector) = { -// var sum = 0.0 -// // TODO: handle sparse? -// val gradient = new Array[Double](coefficients.size) -// coefficients.foreachActive { (i, v) => -// val reg = getReg(i) -// sum += 0.5 * v * v * reg -// gradient(i) = v * reg -// } -// (sum, Vectors.dense(gradient)) -// } -//} diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/loss/RDDLossFunction.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/loss/RDDLossFunction.scala index 3b1618eb0b6fe..173041688128f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/loss/RDDLossFunction.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/loss/RDDLossFunction.scala @@ -29,7 +29,7 @@ import org.apache.spark.rdd.RDD /** * This class computes the gradient and loss of a differentiable loss function by mapping a - * [[DifferentiableLossAggregator]] over an [[RDD]] of [[Instance]]s. The loss function is the + * [[DifferentiableLossAggregator]] over an [[RDD]]. The loss function is the * sum of the loss computed on a single instance across all points in the RDD. Therefore, the actual * analytical form of the loss function is specified by the aggregator, which computes each points * contribution to the overall loss. @@ -37,7 +37,7 @@ import org.apache.spark.rdd.RDD * A differentiable regularization component can also be added by providing a * [[DifferentiableRegularization]] loss function. * - * @param instances + * @param instances RDD containing the data to compute the loss function over. * @param getAggregator A function which gets a new loss aggregator in every tree aggregate step. * @param regularization An option representing the regularization loss function to apply to the * coefficients. @@ -50,7 +50,7 @@ private[ml] class RDDLossFunction[ Agg <: DifferentiableLossAggregator[T, Agg]: ClassTag]( instances: RDD[T], getAggregator: (Broadcast[Vector] => Agg), - regularization: Option[DifferentiableRegularization[Array[Double]]], + regularization: Option[DifferentiableRegularization[Vector]], aggregationDepth: Int = 2) extends DiffFunction[BDV[Double]] { @@ -62,8 +62,8 @@ private[ml] class RDDLossFunction[ val newAgg = instances.treeAggregate(thisAgg)(seqOp, combOp, aggregationDepth) val gradient = newAgg.gradient val regLoss = regularization.map { regFun => - val (regLoss, regGradient) = regFun.calculate(coefficients.data) - BLAS.axpy(1.0, Vectors.dense(regGradient), gradient) + val (regLoss, regGradient) = regFun.calculate(Vectors.fromBreeze(coefficients)) + BLAS.axpy(1.0, regGradient, gradient) regLoss }.getOrElse(0.0) bcCoefficients.destroy(blocking = false) diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala index 7a4faeb1c10bf..f56f8c27205de 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala @@ -17,9 +17,12 @@ package org.apache.spark.ml.optim.aggregator import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.classification.MultiClassSummarizer import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors} import org.apache.spark.ml.util.TestingUtils._ +import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer +import org.apache.spark.mllib.linalg.VectorImplicits._ class DifferentiableLossAggregatorSuite extends SparkFunSuite { @@ -157,4 +160,39 @@ object DifferentiableLossAggregatorSuite { this } } + + /** Get feature and label summarizers for provided data. */ + def getRegressionSummarizers( + instances: Array[Instance]): (MultivariateOnlineSummarizer, MultivariateOnlineSummarizer) = { + val seqOp = (c: (MultivariateOnlineSummarizer, MultivariateOnlineSummarizer), + instance: Instance) => + (c._1.add(instance.features, instance.weight), + c._2.add(Vectors.dense(instance.label), instance.weight)) + + val combOp = (c1: (MultivariateOnlineSummarizer, MultivariateOnlineSummarizer), + c2: (MultivariateOnlineSummarizer, MultivariateOnlineSummarizer)) => + (c1._1.merge(c2._1), c1._2.merge(c2._2)) + + instances.aggregate( + new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer + )(seqOp, combOp) + } + + /** Get feature and label summarizers for provided data. */ + def getClassificationSummarizers( + instances: Array[Instance]): + (MultivariateOnlineSummarizer, MultiClassSummarizer) = { + val seqOp = (c: (MultivariateOnlineSummarizer, MultiClassSummarizer), + instance: Instance) => + (c._1.add(instance.features, instance.weight), + c._2.add(instance.label, instance.weight)) + + val combOp = (c1: (MultivariateOnlineSummarizer, MultiClassSummarizer), + c2: (MultivariateOnlineSummarizer, MultiClassSummarizer)) => + (c1._1.merge(c2._1), c1._2.merge(c2._2)) + + instances.aggregate( + new MultivariateOnlineSummarizer, new MultiClassSummarizer + )(seqOp, combOp) + } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala index d1cb0d380e7a5..35b6944624707 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala @@ -20,12 +20,12 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors} import org.apache.spark.ml.util.TestingUtils._ -import org.apache.spark.mllib.linalg.VectorImplicits._ -import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer import org.apache.spark.mllib.util.MLlibTestSparkContext class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { + import DifferentiableLossAggregatorSuite.getRegressionSummarizers + @transient var instances: Array[Instance] = _ @transient var instancesConstantFeature: Array[Instance] = _ @transient var instancesConstantLabel: Array[Instance] = _ @@ -49,29 +49,12 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte ) } - /** Get feature and label summarizers for provided data. */ - def getSummarizers( - instances: Array[Instance]): (MultivariateOnlineSummarizer, MultivariateOnlineSummarizer) = { - val seqOp = (c: (MultivariateOnlineSummarizer, MultivariateOnlineSummarizer), - instance: Instance) => - (c._1.add(instance.features, instance.weight), - c._2.add(Vectors.dense(instance.label), instance.weight)) - - val combOp = (c1: (MultivariateOnlineSummarizer, MultivariateOnlineSummarizer), - c2: (MultivariateOnlineSummarizer, MultivariateOnlineSummarizer)) => - (c1._1.merge(c2._1), c1._2.merge(c2._2)) - - instances.aggregate( - new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer - )(seqOp, combOp) - } - /** Get summary statistics for some data and create a new LeastSquaresAggregator. */ - def getNewAggregator( + private def getNewAggregator( instances: Array[Instance], coefficients: Vector, fitIntercept: Boolean): LeastSquaresAggregator = { - val (featuresSummarizer, ySummarizer) = getSummarizers(instances) + val (featuresSummarizer, ySummarizer) = getRegressionSummarizers(instances) val yStd = math.sqrt(ySummarizer.variance(0)) val yMean = ySummarizer.mean(0) val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) @@ -83,6 +66,26 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte bcFeaturesMean)(bcCoefficients) } + test("aggregator add method input size") { + val coefficients = Vectors.dense(1.0, 2.0) + val agg = getNewAggregator(instances, coefficients, fitIntercept = true) + withClue("LeastSquaresAggregator features dimension must match coefficients dimension") { + intercept[IllegalArgumentException] { + agg.add(Instance(1.0, 1.0, Vectors.dense(2.0))) + } + } + } + + test("negative weight") { + val coefficients = Vectors.dense(1.0, 2.0) + val agg = getNewAggregator(instances, coefficients, fitIntercept = true) + withClue("LeastSquaresAggregator does not support negative instance weights.") { + intercept[IllegalArgumentException] { + agg.add(Instance(1.0, -1.0, Vectors.dense(2.0, 1.0))) + } + } + } + test("check sizes") { val coefficients = Vectors.dense(1.0, 2.0) val aggIntercept = getNewAggregator(instances, coefficients, fitIntercept = true) @@ -102,7 +105,7 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte */ val coefficients = Vectors.dense(1.0, 2.0) val numFeatures = coefficients.size - val (featuresSummarizer, ySummarizer) = getSummarizers(instances) + val (featuresSummarizer, ySummarizer) = getRegressionSummarizers(instances) val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) val featuresMean = featuresSummarizer.mean.toArray val yStd = math.sqrt(ySummarizer.variance(0)) diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala new file mode 100644 index 0000000000000..6ad690d506766 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.ml.optim.aggregator + +import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.linalg.{BLAS, Matrices, Vector, Vectors} +import org.apache.spark.ml.util.TestingUtils._ +import org.apache.spark.mllib.util.MLlibTestSparkContext + +class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { + + import DifferentiableLossAggregatorSuite.getClassificationSummarizers + + @transient var instances: Array[Instance] = _ + @transient var instancesConstantFeature: Array[Instance] = _ + + override def beforeAll(): Unit = { + super.beforeAll() + instances = Array( + Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), + Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)), + Instance(2.0, 0.3, Vectors.dense(4.0, 0.5)) + ) + instancesConstantFeature = Array( + Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)), + Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)), + Instance(2.0, 0.3, Vectors.dense(1.0, 0.5)) + ) + } + + + /** Get summary statistics for some data and create a new LogisticAggregator. */ + private def getNewAggregator( + instances: Array[Instance], + coefficients: Vector, + fitIntercept: Boolean, + isMultinomial: Boolean): LogisticAggregator = { + val (featuresSummarizer, ySummarizer) = + DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances) + val numClasses = ySummarizer.histogram.length + val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) + val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd) + val bcCoefficients = spark.sparkContext.broadcast(coefficients) + new LogisticAggregator(bcFeaturesStd, numClasses, fitIntercept, isMultinomial)(bcCoefficients) + } + + test("aggregator add method input size") { + val coefArray = Array(1.0, 2.0, -2.0, 3.0, 0.0, -1.0) + val interceptArray = Array(4.0, 2.0, -3.0) + val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), + fitIntercept = true, isMultinomial = true) + withClue("LogisticAggregator features dimension must match coefficients dimension") { + intercept[IllegalArgumentException] { + agg.add(Instance(1.0, 1.0, Vectors.dense(2.0))) + } + } + } + + test("negative weight") { + val coefArray = Array(1.0, 2.0, -2.0, 3.0, 0.0, -1.0) + val interceptArray = Array(4.0, 2.0, -3.0) + val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), + fitIntercept = true, isMultinomial = true) + withClue("LogisticAggregator does not support negative instance weights") { + intercept[IllegalArgumentException] { + agg.add(Instance(1.0, -1.0, Vectors.dense(2.0, 1.0))) + } + } + } + + test("check sizes multinomial") { + val rng = new scala.util.Random + val numFeatures = instances.head.features.size + val numClasses = instances.map(_.label).toSet.size + val coefWithIntercept = Vectors.dense( + Array.fill(numClasses * (numFeatures + 1))(rng.nextDouble)) + val coefWithoutIntercept = Vectors.dense( + Array.fill(numClasses * numFeatures)(rng.nextDouble)) + val aggIntercept = getNewAggregator(instances, coefWithIntercept, fitIntercept = true, + isMultinomial = true) + val aggNoIntercept = getNewAggregator(instances, coefWithoutIntercept, fitIntercept = false, + isMultinomial = true) + instances.foreach(aggIntercept.add) + instances.foreach(aggNoIntercept.add) + + // least squares agg does not include intercept in its gradient array + assert(aggIntercept.gradient.size === (numFeatures + 1) * numClasses) + assert(aggNoIntercept.gradient.size === numFeatures * numClasses) + } + + test("check sizes binomial") { + val rng = new scala.util.Random + val binaryInstances = instances.filter(_.label < 2.0) + val numFeatures = binaryInstances.head.features.size + val coefWithIntercept = Vectors.dense(Array.fill(numFeatures + 1)(rng.nextDouble)) + val coefWithoutIntercept = Vectors.dense(Array.fill(numFeatures)(rng.nextDouble)) + val aggIntercept = getNewAggregator(binaryInstances, coefWithIntercept, fitIntercept = true, + isMultinomial = false) + val aggNoIntercept = getNewAggregator(binaryInstances, coefWithoutIntercept, + fitIntercept = false, isMultinomial = false) + binaryInstances.foreach(aggIntercept.add) + binaryInstances.foreach(aggNoIntercept.add) + + // least squares agg does not include intercept in its gradient array + assert(aggIntercept.gradient.size === numFeatures + 1) + assert(aggNoIntercept.gradient.size === numFeatures) + } + + test("check correctness multinomial") { + /* + Check that the aggregator computes loss/gradient for: + -sum_i w_i * (beta_y dot x_i - log(sum_k e^(beta_k dot x_i))) + */ + val coefArray = Array(1.0, 2.0, -2.0, 3.0, 0.0, -1.0) + val interceptArray = Array(4.0, 2.0, -3.0) + val numFeatures = instances.head.features.size + val numClasses = instances.map(_.label).toSet.size + val intercepts = Vectors.dense(interceptArray) + val (featuresSummarizer, ySummarizer) = getClassificationSummarizers(instances) + val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) + val weightSum = instances.map(_.weight).sum + + val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), + fitIntercept = true, isMultinomial = true) + instances.foreach(agg.add) + + // compute the loss + val stdCoef = coefArray.indices.map(i => coefArray(i) / featuresStd(i / numClasses)).toArray + val linearPredictors = instances.map { case Instance(l, w, f) => + val result = intercepts.copy.toDense + BLAS.gemv(1.0, Matrices.dense(numClasses, numFeatures, stdCoef), f, 1.0, result) + (l, w, result) + } + + // sum_i w * beta_k dot x_i + val sumLinear = linearPredictors.map { case (l, w, p) => + w * p(l.toInt) + }.sum + + // sum_i w * log(sum_k e^(beta_K dot x_i)) + val sumLogs = linearPredictors.map { case (l, w, p) => + w * math.log(p.values.map(math.exp).sum) + }.sum + val loss = (sumLogs - sumLinear) / weightSum + + + // compute the gradients + val gradientCoef = new Array[Double](numFeatures * numClasses) + val gradientIntercept = new Array[Double](numClasses) + instances.foreach { case Instance(l, w, f) => + val margin = intercepts.copy.toDense + BLAS.gemv(1.0, Matrices.dense(numClasses, numFeatures, stdCoef), f, 1.0, margin) + val sum = margin.values.map(math.exp).sum + + gradientCoef.indices.foreach { i => + val fStd = f(i / numClasses) / featuresStd(i / numClasses) + val cidx = i % numClasses + if (cidx == l.toInt) gradientCoef(i) -= w * fStd + gradientCoef(i) += w * math.exp(margin(cidx)) / sum * fStd + } + + gradientIntercept.indices.foreach { i => + val cidx = i % numClasses + if (cidx == l.toInt) gradientIntercept(i) -= w + gradientIntercept(i) += w * math.exp(margin(cidx)) / sum + } + } + val gradient = Vectors.dense((gradientCoef ++ gradientIntercept).map(_ / weightSum)) + + assert(loss ~== agg.loss relTol 0.01) + assert(gradient ~== agg.gradient relTol 0.01) + } + + test("check correctness binomial") { + /* + Check that the aggregator computes loss/gradient for: + -sum_i y_i * log(1 / (1 + e^(-beta dot x_i)) + (1 - y_i) * log(1 - 1 / (1 + e^(-beta dot x_i)) + */ + val binaryInstances = instances.map { instance => + if (instance.label <= 1.0) instance else Instance(0.0, instance.weight, instance.features) + } + val coefArray = Array(1.0, 2.0) + val intercept = 1.0 + val numFeatures = binaryInstances.head.features.size + val (featuresSummarizer, _) = getClassificationSummarizers(binaryInstances) + val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) + val weightSum = binaryInstances.map(_.weight).sum + + val agg = getNewAggregator(binaryInstances, Vectors.dense(coefArray ++ Array(intercept)), + fitIntercept = true, isMultinomial = false) + binaryInstances.foreach(agg.add) + + // compute the loss + val stdCoef = coefArray.indices.map(i => coefArray(i) / featuresStd(i)).toArray + val lossSum = binaryInstances.map { case Instance(l, w, f) => + val margin = BLAS.dot(Vectors.dense(stdCoef), f) + intercept + val prob = 1.0 / (1.0 + math.exp(-margin)) + -w * l * math.log(prob) - w * (1.0 - l) * math.log(1.0 - prob) + }.sum + val loss = lossSum / weightSum + + + + // compute the gradients + val gradientCoef = new Array[Double](numFeatures) + var gradientIntercept = 0.0 + binaryInstances.foreach { case Instance(l, w, f) => + val margin = BLAS.dot(f, Vectors.dense(coefArray)) + intercept + gradientCoef.indices.foreach { i => + gradientCoef(i) += w * (1.0 / (1.0 + math.exp(-margin)) - l) * f(i) / featuresStd(i) + } + gradientIntercept += w * (1.0 / (1.0 + math.exp(-margin)) - l) + } + val gradient = Vectors.dense((gradientCoef ++ Array(gradientIntercept)).map(_ / weightSum)) + + assert(loss ~== agg.loss relTol 0.01) + assert(gradient ~== agg.gradient relTol 0.01) + } + + test("check with zero standard deviation") { + val binaryInstances = instancesConstantFeature.map { instance => + if (instance.label <= 1.0) instance else Instance(0.0, instance.weight, instance.features) + } + val coefArray = Array(1.0, 2.0, -2.0, 3.0, 0.0, -1.0) + val interceptArray = Array(4.0, 2.0, -3.0) + val aggConstantFeature = getNewAggregator(instancesConstantFeature, + Vectors.dense(coefArray ++ interceptArray), fitIntercept = true, isMultinomial = true) + instances.foreach(aggConstantFeature.add) + // constant features should not affect gradient + assert(aggConstantFeature.gradient(0) === 0.0) + + val binaryCoefArray = Array(1.0, 2.0) + val intercept = 1.0 + val aggConstantFeatureBinary = getNewAggregator(binaryInstances, + Vectors.dense(binaryCoefArray ++ Array(intercept)), fitIntercept = true, + isMultinomial = false) + instances.foreach(aggConstantFeatureBinary.add) + // constant features should not affect gradient + assert(aggConstantFeatureBinary.gradient(0) === 0.0) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularizationSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularizationSuite.scala index 0794417a8d4bb..71ba3118c04e5 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularizationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularizationSuite.scala @@ -17,20 +17,21 @@ package org.apache.spark.ml.optim.loss import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.linalg.{BLAS, Vectors} class DifferentiableRegularizationSuite extends SparkFunSuite { test("L2 regularization") { val shouldApply = (_: Int) => true val regParam = 0.3 - val coefficients = Array(1.0, 3.0, -2.0) + val coefficients = Vectors.dense(Array(1.0, 3.0, -2.0)) val numFeatures = coefficients.size // check without features standard val regFun = new L2Regularization(regParam, shouldApply, None) val (loss, grad) = regFun.calculate(coefficients) - assert(loss === 0.5 * regParam * coefficients.map(x => x * x).sum) - assert(grad === coefficients.map(_ * regParam)) + assert(loss === 0.5 * regParam * BLAS.dot(coefficients, coefficients)) + assert(grad === coefficients.toArray.map(_ * regParam)) // check with features standard val featuresStd = Array(0.1, 1.1, 0.5) diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/loss/RDDLossFunctionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/loss/RDDLossFunctionSuite.scala index cd5cebee5f7b8..f70da5750f2d5 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/loss/RDDLossFunctionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/loss/RDDLossFunctionSuite.scala @@ -46,11 +46,11 @@ class RDDLossFunctionSuite extends SparkFunSuite with MLlibTestSparkContext { val lossWithReg = new RDDLossFunction(instances, getAgg, Some(regLossFun)) val (loss1, grad1) = lossNoReg.calculate(coefficients.asBreeze.toDenseVector) - val (regLoss, regGrad) = regLossFun.calculate(coefficients.toArray) + val (regLoss, regGrad) = regLossFun.calculate(coefficients) val (loss2, grad2) = lossWithReg.calculate(coefficients.asBreeze.toDenseVector) - BLAS.axpy(1.0, Vectors.fromBreeze(grad1), Vectors.dense(regGrad)) - assert(Vectors.dense(regGrad) ~== Vectors.fromBreeze(grad2) relTol 1e-5) + BLAS.axpy(1.0, Vectors.fromBreeze(grad1), regGrad) + assert(regGrad ~== Vectors.fromBreeze(grad2) relTol 1e-5) assert(loss1 + regLoss === loss2) } From 6edd12893a255c1f44416a16e42c6ef79edc8f36 Mon Sep 17 00:00:00 2001 From: sethah Date: Wed, 14 Jun 2017 11:26:49 -0700 Subject: [PATCH 3/7] style checker --- .../spark/ml/classification/LogisticRegression.scala | 4 ++-- .../ml/optim/loss/DifferentiableRegularization.scala | 7 ++++--- .../apache/spark/ml/regression/LinearRegression.scala | 2 +- .../ml/classification/LogisticRegressionSuite.scala | 1 + .../aggregator/DifferentiableLossAggregatorSuite.scala | 9 ++++----- .../ml/optim/aggregator/LogisticAggregatorSuite.scala | 2 -- 6 files changed, 12 insertions(+), 13 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 9d618642ab79b..5ebbb3050f917 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -20,16 +20,16 @@ package org.apache.spark.ml.classification import java.util.Locale import scala.collection.mutable + import breeze.linalg.{DenseVector => BDV} import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS, LBFGSB => BreezeLBFGSB, OWLQN => BreezeOWLQN} import org.apache.hadoop.fs.Path + import org.apache.spark.SparkException import org.apache.spark.annotation.{Experimental, Since} -import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg._ -import org.apache.spark.ml.linalg.BLAS._ import org.apache.spark.ml.optim.aggregator.LogisticAggregator import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction} import org.apache.spark.ml.param._ diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala index 6050dde66cf40..7cb54286859c6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala @@ -17,6 +17,7 @@ package org.apache.spark.ml.optim.loss import breeze.optimize.DiffFunction + import org.apache.spark.ml.linalg._ /** @@ -39,11 +40,11 @@ private[ml] trait DifferentiableRegularization[T] extends DiffFunction[T] { * @param regParam The magnitude of the regularization. * @param shouldApply A function (Int => Boolean) indicating whether a given index should have * regularization applied to it. - * @param featuresStd Option indicating whether the regularization should be scaled by the standard - * deviation of the features. + * @param featuresStd Option for a function which maps coefficient index (column major) to the + * feature standard deviation. If `None`, no standardization is applied. */ private[ml] class L2Regularization( - val regParam: Double, + override val regParam: Double, shouldApply: Int => Boolean, featuresStd: Option[Int => Double]) extends DifferentiableRegularization[Vector] { diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 3c790fa83afd7..48be3122d621d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -322,7 +322,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String val getAggregatorFunc = new LeastSquaresAggregator(yStd, yMean, $(fitIntercept), bcFeaturesStd, bcFeaturesMean)(_) - val getFeaturesStd = (j: Int) => featuresStd(j) + val getFeaturesStd = (j: Int) => if (j >=0 && j < numFeatures) featuresStd(j) else 0.0 val regularization = if (effectiveL2RegParam != 0.0) { val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures Some(new L2Regularization(effectiveL2RegParam, shouldApply, diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala index bfa32f9d63b12..0570499e74516 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import scala.language.existentials import scala.util.Random import scala.util.control.Breaks._ + import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.ml.attribute.NominalAttribute import org.apache.spark.ml.classification.LogisticRegressionSuite._ diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala index f56f8c27205de..d7cdeae30be20 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala @@ -21,8 +21,8 @@ import org.apache.spark.ml.classification.MultiClassSummarizer import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors} import org.apache.spark.ml.util.TestingUtils._ -import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer import org.apache.spark.mllib.linalg.VectorImplicits._ +import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer class DifferentiableLossAggregatorSuite extends SparkFunSuite { @@ -162,7 +162,7 @@ object DifferentiableLossAggregatorSuite { } /** Get feature and label summarizers for provided data. */ - def getRegressionSummarizers( + private[ml] def getRegressionSummarizers( instances: Array[Instance]): (MultivariateOnlineSummarizer, MultivariateOnlineSummarizer) = { val seqOp = (c: (MultivariateOnlineSummarizer, MultivariateOnlineSummarizer), instance: Instance) => @@ -179,9 +179,8 @@ object DifferentiableLossAggregatorSuite { } /** Get feature and label summarizers for provided data. */ - def getClassificationSummarizers( - instances: Array[Instance]): - (MultivariateOnlineSummarizer, MultiClassSummarizer) = { + private[ml] def getClassificationSummarizers( + instances: Array[Instance]): (MultivariateOnlineSummarizer, MultiClassSummarizer) = { val seqOp = (c: (MultivariateOnlineSummarizer, MultiClassSummarizer), instance: Instance) => (c._1.add(instance.features, instance.weight), diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala index 6ad690d506766..4370823597c38 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala @@ -98,7 +98,6 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { instances.foreach(aggIntercept.add) instances.foreach(aggNoIntercept.add) - // least squares agg does not include intercept in its gradient array assert(aggIntercept.gradient.size === (numFeatures + 1) * numClasses) assert(aggNoIntercept.gradient.size === numFeatures * numClasses) } @@ -116,7 +115,6 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { binaryInstances.foreach(aggIntercept.add) binaryInstances.foreach(aggNoIntercept.add) - // least squares agg does not include intercept in its gradient array assert(aggIntercept.gradient.size === numFeatures + 1) assert(aggNoIntercept.gradient.size === numFeatures) } From fcf537208f25dac0b4fffa35f875944d82fdb180 Mon Sep 17 00:00:00 2001 From: sethah Date: Wed, 14 Jun 2017 13:05:53 -0700 Subject: [PATCH 4/7] fix broken test --- .../ml/optim/loss/DifferentiableRegularizationSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularizationSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularizationSuite.scala index 71ba3118c04e5..4377a6bd75dba 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularizationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularizationSuite.scala @@ -31,7 +31,7 @@ class DifferentiableRegularizationSuite extends SparkFunSuite { val regFun = new L2Regularization(regParam, shouldApply, None) val (loss, grad) = regFun.calculate(coefficients) assert(loss === 0.5 * regParam * BLAS.dot(coefficients, coefficients)) - assert(grad === coefficients.toArray.map(_ * regParam)) + assert(grad === Vectors.dense(coefficients.toArray.map(_ * regParam))) // check with features standard val featuresStd = Array(0.1, 1.1, 0.5) @@ -40,9 +40,9 @@ class DifferentiableRegularizationSuite extends SparkFunSuite { val expectedLossStd = 0.5 * regParam * (0 until numFeatures).map { j => coefficients(j) * coefficients(j) / (featuresStd(j) * featuresStd(j)) }.sum - val expectedGradientStd = (0 until numFeatures).map { j => + val expectedGradientStd = Vectors.dense((0 until numFeatures).map { j => regParam * coefficients(j) / (featuresStd(j) * featuresStd(j)) - }.toArray + }.toArray) assert(lossStd === expectedLossStd) assert(gradStd === expectedGradientStd) @@ -51,7 +51,7 @@ class DifferentiableRegularizationSuite extends SparkFunSuite { val regFunApply = new L2Regularization(regParam, shouldApply2, None) val (lossApply, gradApply) = regFunApply.calculate(coefficients) assert(lossApply === 0.5 * regParam * coefficients(1) * coefficients(1)) - assert(gradApply === Array(0.0, coefficients(1) * regParam, 0.0)) + assert(gradApply === Vectors.dense(0.0, coefficients(1) * regParam, 0.0)) // check with zero features standard val featuresStdZero = Array(0.1, 0.0, 0.5) From a19b385913f450254b6849098f68cab4c2dac618 Mon Sep 17 00:00:00 2001 From: sethah Date: Wed, 28 Jun 2017 11:18:15 -0700 Subject: [PATCH 5/7] code review changes --- .../spark/ml/optim/loss/DifferentiableRegularization.scala | 4 ++-- .../org/apache/spark/ml/regression/LinearRegression.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala index 7cb54286859c6..132dce76ca9f9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala @@ -34,7 +34,7 @@ private[ml] trait DifferentiableRegularization[T] extends DiffFunction[T] { } /** - * A Breeze diff function for computing the L2 regularized loss and gradient of an array of + * A Breeze diff function for computing the L2 regularized loss and gradient of a vector of * coefficients. * * @param regParam The magnitude of the regularization. @@ -72,7 +72,7 @@ private[ml] class L2Regularization( } (0.5 * sum * regParam, Vectors.dense(gradient)) case _: SparseVector => - throw new IllegalArgumentException("SparseVector is not currently supported.") + throw new IllegalArgumentException("Sparse coefficients are not currently supported.") } } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 48be3122d621d..4c14452d509a3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -322,7 +322,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String val getAggregatorFunc = new LeastSquaresAggregator(yStd, yMean, $(fitIntercept), bcFeaturesStd, bcFeaturesMean)(_) - val getFeaturesStd = (j: Int) => if (j >=0 && j < numFeatures) featuresStd(j) else 0.0 + val getFeaturesStd = (j: Int) => if (j >= 0 && j < numFeatures) featuresStd(j) else 0.0 val regularization = if (effectiveL2RegParam != 0.0) { val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures Some(new L2Regularization(effectiveL2RegParam, shouldApply, From a51e56512b608ed17b356b4672ef11113a031126 Mon Sep 17 00:00:00 2001 From: sethah Date: Wed, 5 Jul 2017 14:13:14 -0700 Subject: [PATCH 6/7] destroy broadcasts; applyFeaturesStd --- .../loss/DifferentiableRegularization.scala | 8 +-- .../LeastSquaresAggregatorSuite.scala | 37 ++++++++---- .../aggregator/LogisticAggregatorSuite.scala | 57 +++++++++++++------ 3 files changed, 70 insertions(+), 32 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala index 132dce76ca9f9..7ac7c225e5acb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala @@ -40,13 +40,13 @@ private[ml] trait DifferentiableRegularization[T] extends DiffFunction[T] { * @param regParam The magnitude of the regularization. * @param shouldApply A function (Int => Boolean) indicating whether a given index should have * regularization applied to it. - * @param featuresStd Option for a function which maps coefficient index (column major) to the - * feature standard deviation. If `None`, no standardization is applied. + * @param applyFeaturesStd Option for a function which maps coefficient index (column major) to the + * feature standard deviation. If `None`, no standardization is applied. */ private[ml] class L2Regularization( override val regParam: Double, shouldApply: Int => Boolean, - featuresStd: Option[Int => Double]) extends DifferentiableRegularization[Vector] { + applyFeaturesStd: Option[Int => Double]) extends DifferentiableRegularization[Vector] { override def calculate(coefficients: Vector): (Double, Vector) = { coefficients match { @@ -55,7 +55,7 @@ private[ml] class L2Regularization( val gradient = new Array[Double](dv.size) dv.values.indices.filter(shouldApply).foreach { j => val coef = coefficients(j) - featuresStd match { + applyFeaturesStd match { case Some(getStd) => val std = getStd(j) if (std != 0.0) { diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala index 35b6944624707..f8bdb50f4199e 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala @@ -16,7 +16,10 @@ */ package org.apache.spark.ml.optim.aggregator +import scala.util.{Failure, Success, Try} + import org.apache.spark.SparkFunSuite +import org.apache.spark.broadcast.Broadcast import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors} import org.apache.spark.ml.util.TestingUtils._ @@ -53,7 +56,7 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte private def getNewAggregator( instances: Array[Instance], coefficients: Vector, - fitIntercept: Boolean): LeastSquaresAggregator = { + fitIntercept: Boolean): (Seq[Broadcast[_]], LeastSquaresAggregator) = { val (featuresSummarizer, ySummarizer) = getRegressionSummarizers(instances) val yStd = math.sqrt(ySummarizer.variance(0)) val yMean = ySummarizer.mean(0) @@ -62,40 +65,53 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte val featuresMean = featuresSummarizer.mean val bcFeaturesMean = spark.sparkContext.broadcast(featuresMean.toArray) val bcCoefficients = spark.sparkContext.broadcast(coefficients) - new LeastSquaresAggregator(yStd, yMean, fitIntercept, bcFeaturesStd, - bcFeaturesMean)(bcCoefficients) + val broadcasts = List(bcCoefficients, bcFeaturesMean, bcFeaturesStd) + val agg = Try(new LeastSquaresAggregator(yStd, yMean, fitIntercept, bcFeaturesStd, + bcFeaturesMean)(bcCoefficients)) match { + case Success(a) => a + case Failure(exception) => + broadcasts.foreach(_.destroy(blocking = false)) + throw exception + } + (broadcasts, agg) } test("aggregator add method input size") { val coefficients = Vectors.dense(1.0, 2.0) - val agg = getNewAggregator(instances, coefficients, fitIntercept = true) + val (broadcasts, agg) = getNewAggregator(instances, coefficients, fitIntercept = true) withClue("LeastSquaresAggregator features dimension must match coefficients dimension") { intercept[IllegalArgumentException] { agg.add(Instance(1.0, 1.0, Vectors.dense(2.0))) } } + broadcasts.foreach(_.destroy(blocking = false)) } test("negative weight") { val coefficients = Vectors.dense(1.0, 2.0) - val agg = getNewAggregator(instances, coefficients, fitIntercept = true) + val (broadcasts, agg) = getNewAggregator(instances, coefficients, fitIntercept = true) withClue("LeastSquaresAggregator does not support negative instance weights.") { intercept[IllegalArgumentException] { agg.add(Instance(1.0, -1.0, Vectors.dense(2.0, 1.0))) } } + broadcasts.foreach(_.destroy(blocking = false)) } test("check sizes") { val coefficients = Vectors.dense(1.0, 2.0) - val aggIntercept = getNewAggregator(instances, coefficients, fitIntercept = true) - val aggNoIntercept = getNewAggregator(instances, coefficients, fitIntercept = false) + val (broadcastsIntercept, aggIntercept) = getNewAggregator(instances, coefficients, + fitIntercept = true) + val (broadcastsNoIntercept, aggNoIntercept) = getNewAggregator(instances, coefficients, + fitIntercept = false) instances.foreach(aggIntercept.add) instances.foreach(aggNoIntercept.add) // least squares agg does not include intercept in its gradient array assert(aggIntercept.gradient.size === 2) assert(aggNoIntercept.gradient.size === 2) + broadcastsIntercept.foreach(_.destroy(blocking = false)) + broadcastsNoIntercept.foreach(_.destroy(blocking = false)) } test("check correctness") { @@ -111,7 +127,7 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte val yStd = math.sqrt(ySummarizer.variance(0)) val yMean = ySummarizer.mean(0) - val agg = getNewAggregator(instances, coefficients, fitIntercept = true) + val (broadcasts, agg) = getNewAggregator(instances, coefficients, fitIntercept = true) instances.foreach(agg.add) // compute (y - pred) analytically @@ -145,11 +161,12 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte test("check with zero standard deviation") { val coefficients = Vectors.dense(1.0, 2.0) - val aggConstantFeature = getNewAggregator(instancesConstantFeature, coefficients, - fitIntercept = true) + val (broadcastsConstantFeature, aggConstantFeature) = getNewAggregator(instancesConstantFeature, + coefficients, fitIntercept = true) instances.foreach(aggConstantFeature.add) // constant features should not affect gradient assert(aggConstantFeature.gradient(0) === 0.0) + broadcastsConstantFeature.foreach(_.destroy(blocking = false)) withClue("LeastSquaresAggregator does not support zero standard deviation of the label") { intercept[IllegalArgumentException] { diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala index 4370823597c38..ebc8ff87cc524 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala @@ -16,7 +16,10 @@ */ package org.apache.spark.ml.optim.aggregator +import scala.util.{Failure, Success, Try} + import org.apache.spark.SparkFunSuite +import org.apache.spark.broadcast.Broadcast import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg.{BLAS, Matrices, Vector, Vectors} import org.apache.spark.ml.util.TestingUtils._ @@ -43,44 +46,54 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { ) } - /** Get summary statistics for some data and create a new LogisticAggregator. */ private def getNewAggregator( instances: Array[Instance], coefficients: Vector, fitIntercept: Boolean, - isMultinomial: Boolean): LogisticAggregator = { + isMultinomial: Boolean): (Seq[Broadcast[_]], LogisticAggregator) = { val (featuresSummarizer, ySummarizer) = DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances) val numClasses = ySummarizer.histogram.length val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd) val bcCoefficients = spark.sparkContext.broadcast(coefficients) - new LogisticAggregator(bcFeaturesStd, numClasses, fitIntercept, isMultinomial)(bcCoefficients) + val broadcasts = List(bcFeaturesStd, bcCoefficients) + val x = Try(5).failed + val agg = Try(new LogisticAggregator(bcFeaturesStd, numClasses, fitIntercept, + isMultinomial)(bcCoefficients)) match { + case Success(a) => a + case Failure(exception) => + broadcasts.foreach(_.destroy(blocking = false)) + throw exception + } + (broadcasts, agg) } test("aggregator add method input size") { val coefArray = Array(1.0, 2.0, -2.0, 3.0, 0.0, -1.0) val interceptArray = Array(4.0, 2.0, -3.0) - val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), + val (broadcasts, agg) = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), fitIntercept = true, isMultinomial = true) withClue("LogisticAggregator features dimension must match coefficients dimension") { intercept[IllegalArgumentException] { agg.add(Instance(1.0, 1.0, Vectors.dense(2.0))) } } + broadcasts.foreach(_.destroy(blocking = false)) } test("negative weight") { val coefArray = Array(1.0, 2.0, -2.0, 3.0, 0.0, -1.0) val interceptArray = Array(4.0, 2.0, -3.0) - val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), + val (broadcasts, agg) = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), fitIntercept = true, isMultinomial = true) withClue("LogisticAggregator does not support negative instance weights") { intercept[IllegalArgumentException] { agg.add(Instance(1.0, -1.0, Vectors.dense(2.0, 1.0))) } } + broadcasts.foreach(_.destroy(blocking = false)) } test("check sizes multinomial") { @@ -91,15 +104,17 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { Array.fill(numClasses * (numFeatures + 1))(rng.nextDouble)) val coefWithoutIntercept = Vectors.dense( Array.fill(numClasses * numFeatures)(rng.nextDouble)) - val aggIntercept = getNewAggregator(instances, coefWithIntercept, fitIntercept = true, - isMultinomial = true) - val aggNoIntercept = getNewAggregator(instances, coefWithoutIntercept, fitIntercept = false, - isMultinomial = true) + val (broadcastIntercept, aggIntercept) = getNewAggregator(instances, coefWithIntercept, + fitIntercept = true, isMultinomial = true) + val (broadcastNoIntercept, aggNoIntercept) = getNewAggregator(instances, coefWithoutIntercept, + fitIntercept = false, isMultinomial = true) instances.foreach(aggIntercept.add) instances.foreach(aggNoIntercept.add) assert(aggIntercept.gradient.size === (numFeatures + 1) * numClasses) assert(aggNoIntercept.gradient.size === numFeatures * numClasses) + broadcastIntercept.foreach(_.destroy(blocking = false)) + broadcastNoIntercept.foreach(_.destroy(blocking = false)) } test("check sizes binomial") { @@ -108,15 +123,17 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { val numFeatures = binaryInstances.head.features.size val coefWithIntercept = Vectors.dense(Array.fill(numFeatures + 1)(rng.nextDouble)) val coefWithoutIntercept = Vectors.dense(Array.fill(numFeatures)(rng.nextDouble)) - val aggIntercept = getNewAggregator(binaryInstances, coefWithIntercept, fitIntercept = true, - isMultinomial = false) - val aggNoIntercept = getNewAggregator(binaryInstances, coefWithoutIntercept, - fitIntercept = false, isMultinomial = false) + val (broadcastIntercept, aggIntercept) = getNewAggregator(binaryInstances, coefWithIntercept, + fitIntercept = true, isMultinomial = false) + val (broadcastNoIntercept, aggNoIntercept) = getNewAggregator(binaryInstances, + coefWithoutIntercept, fitIntercept = false, isMultinomial = false) binaryInstances.foreach(aggIntercept.add) binaryInstances.foreach(aggNoIntercept.add) assert(aggIntercept.gradient.size === numFeatures + 1) assert(aggNoIntercept.gradient.size === numFeatures) + broadcastIntercept.foreach(_.destroy(blocking = false)) + broadcastNoIntercept.foreach(_.destroy(blocking = false)) } test("check correctness multinomial") { @@ -133,7 +150,7 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) val weightSum = instances.map(_.weight).sum - val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), + val (broadcasts, agg) = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), fitIntercept = true, isMultinomial = true) instances.foreach(agg.add) @@ -182,6 +199,7 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { assert(loss ~== agg.loss relTol 0.01) assert(gradient ~== agg.gradient relTol 0.01) + broadcasts.foreach(_.destroy(blocking = false)) } test("check correctness binomial") { @@ -199,8 +217,8 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) val weightSum = binaryInstances.map(_.weight).sum - val agg = getNewAggregator(binaryInstances, Vectors.dense(coefArray ++ Array(intercept)), - fitIntercept = true, isMultinomial = false) + val (broadcasts, agg) = getNewAggregator(binaryInstances, + Vectors.dense(coefArray ++ Array(intercept)), fitIntercept = true, isMultinomial = false) binaryInstances.foreach(agg.add) // compute the loss @@ -228,6 +246,7 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { assert(loss ~== agg.loss relTol 0.01) assert(gradient ~== agg.gradient relTol 0.01) + broadcasts.foreach(_.destroy(blocking = false)) } test("check with zero standard deviation") { @@ -236,19 +255,21 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { } val coefArray = Array(1.0, 2.0, -2.0, 3.0, 0.0, -1.0) val interceptArray = Array(4.0, 2.0, -3.0) - val aggConstantFeature = getNewAggregator(instancesConstantFeature, + val (broadcastConstantFeature, aggConstantFeature) = getNewAggregator(instancesConstantFeature, Vectors.dense(coefArray ++ interceptArray), fitIntercept = true, isMultinomial = true) instances.foreach(aggConstantFeature.add) // constant features should not affect gradient assert(aggConstantFeature.gradient(0) === 0.0) + broadcastConstantFeature.foreach(_.destroy(blocking = false)) val binaryCoefArray = Array(1.0, 2.0) val intercept = 1.0 - val aggConstantFeatureBinary = getNewAggregator(binaryInstances, + val (broadcastConstantBinary, aggConstantFeatureBinary) = getNewAggregator(binaryInstances, Vectors.dense(binaryCoefArray ++ Array(intercept)), fitIntercept = true, isMultinomial = false) instances.foreach(aggConstantFeatureBinary.add) // constant features should not affect gradient assert(aggConstantFeatureBinary.gradient(0) === 0.0) + broadcastConstantBinary.foreach(_.destroy(blocking = false)) } } From cb40b3189b546283635683d7711e0e2e28366034 Mon Sep 17 00:00:00 2001 From: sethah Date: Tue, 18 Jul 2017 09:55:44 -0700 Subject: [PATCH 7/7] backout broadcast destroy --- .../LeastSquaresAggregatorSuite.scala | 37 ++++-------- .../aggregator/LogisticAggregatorSuite.scala | 56 ++++++------------- 2 files changed, 27 insertions(+), 66 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala index f8bdb50f4199e..35b6944624707 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala @@ -16,10 +16,7 @@ */ package org.apache.spark.ml.optim.aggregator -import scala.util.{Failure, Success, Try} - import org.apache.spark.SparkFunSuite -import org.apache.spark.broadcast.Broadcast import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors} import org.apache.spark.ml.util.TestingUtils._ @@ -56,7 +53,7 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte private def getNewAggregator( instances: Array[Instance], coefficients: Vector, - fitIntercept: Boolean): (Seq[Broadcast[_]], LeastSquaresAggregator) = { + fitIntercept: Boolean): LeastSquaresAggregator = { val (featuresSummarizer, ySummarizer) = getRegressionSummarizers(instances) val yStd = math.sqrt(ySummarizer.variance(0)) val yMean = ySummarizer.mean(0) @@ -65,53 +62,40 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte val featuresMean = featuresSummarizer.mean val bcFeaturesMean = spark.sparkContext.broadcast(featuresMean.toArray) val bcCoefficients = spark.sparkContext.broadcast(coefficients) - val broadcasts = List(bcCoefficients, bcFeaturesMean, bcFeaturesStd) - val agg = Try(new LeastSquaresAggregator(yStd, yMean, fitIntercept, bcFeaturesStd, - bcFeaturesMean)(bcCoefficients)) match { - case Success(a) => a - case Failure(exception) => - broadcasts.foreach(_.destroy(blocking = false)) - throw exception - } - (broadcasts, agg) + new LeastSquaresAggregator(yStd, yMean, fitIntercept, bcFeaturesStd, + bcFeaturesMean)(bcCoefficients) } test("aggregator add method input size") { val coefficients = Vectors.dense(1.0, 2.0) - val (broadcasts, agg) = getNewAggregator(instances, coefficients, fitIntercept = true) + val agg = getNewAggregator(instances, coefficients, fitIntercept = true) withClue("LeastSquaresAggregator features dimension must match coefficients dimension") { intercept[IllegalArgumentException] { agg.add(Instance(1.0, 1.0, Vectors.dense(2.0))) } } - broadcasts.foreach(_.destroy(blocking = false)) } test("negative weight") { val coefficients = Vectors.dense(1.0, 2.0) - val (broadcasts, agg) = getNewAggregator(instances, coefficients, fitIntercept = true) + val agg = getNewAggregator(instances, coefficients, fitIntercept = true) withClue("LeastSquaresAggregator does not support negative instance weights.") { intercept[IllegalArgumentException] { agg.add(Instance(1.0, -1.0, Vectors.dense(2.0, 1.0))) } } - broadcasts.foreach(_.destroy(blocking = false)) } test("check sizes") { val coefficients = Vectors.dense(1.0, 2.0) - val (broadcastsIntercept, aggIntercept) = getNewAggregator(instances, coefficients, - fitIntercept = true) - val (broadcastsNoIntercept, aggNoIntercept) = getNewAggregator(instances, coefficients, - fitIntercept = false) + val aggIntercept = getNewAggregator(instances, coefficients, fitIntercept = true) + val aggNoIntercept = getNewAggregator(instances, coefficients, fitIntercept = false) instances.foreach(aggIntercept.add) instances.foreach(aggNoIntercept.add) // least squares agg does not include intercept in its gradient array assert(aggIntercept.gradient.size === 2) assert(aggNoIntercept.gradient.size === 2) - broadcastsIntercept.foreach(_.destroy(blocking = false)) - broadcastsNoIntercept.foreach(_.destroy(blocking = false)) } test("check correctness") { @@ -127,7 +111,7 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte val yStd = math.sqrt(ySummarizer.variance(0)) val yMean = ySummarizer.mean(0) - val (broadcasts, agg) = getNewAggregator(instances, coefficients, fitIntercept = true) + val agg = getNewAggregator(instances, coefficients, fitIntercept = true) instances.foreach(agg.add) // compute (y - pred) analytically @@ -161,12 +145,11 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte test("check with zero standard deviation") { val coefficients = Vectors.dense(1.0, 2.0) - val (broadcastsConstantFeature, aggConstantFeature) = getNewAggregator(instancesConstantFeature, - coefficients, fitIntercept = true) + val aggConstantFeature = getNewAggregator(instancesConstantFeature, coefficients, + fitIntercept = true) instances.foreach(aggConstantFeature.add) // constant features should not affect gradient assert(aggConstantFeature.gradient(0) === 0.0) - broadcastsConstantFeature.foreach(_.destroy(blocking = false)) withClue("LeastSquaresAggregator does not support zero standard deviation of the label") { intercept[IllegalArgumentException] { diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala index ebc8ff87cc524..2b29c67d859db 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala @@ -16,10 +16,7 @@ */ package org.apache.spark.ml.optim.aggregator -import scala.util.{Failure, Success, Try} - import org.apache.spark.SparkFunSuite -import org.apache.spark.broadcast.Broadcast import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg.{BLAS, Matrices, Vector, Vectors} import org.apache.spark.ml.util.TestingUtils._ @@ -51,49 +48,38 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { instances: Array[Instance], coefficients: Vector, fitIntercept: Boolean, - isMultinomial: Boolean): (Seq[Broadcast[_]], LogisticAggregator) = { + isMultinomial: Boolean): LogisticAggregator = { val (featuresSummarizer, ySummarizer) = DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances) val numClasses = ySummarizer.histogram.length val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd) val bcCoefficients = spark.sparkContext.broadcast(coefficients) - val broadcasts = List(bcFeaturesStd, bcCoefficients) - val x = Try(5).failed - val agg = Try(new LogisticAggregator(bcFeaturesStd, numClasses, fitIntercept, - isMultinomial)(bcCoefficients)) match { - case Success(a) => a - case Failure(exception) => - broadcasts.foreach(_.destroy(blocking = false)) - throw exception - } - (broadcasts, agg) + new LogisticAggregator(bcFeaturesStd, numClasses, fitIntercept, isMultinomial)(bcCoefficients) } test("aggregator add method input size") { val coefArray = Array(1.0, 2.0, -2.0, 3.0, 0.0, -1.0) val interceptArray = Array(4.0, 2.0, -3.0) - val (broadcasts, agg) = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), + val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), fitIntercept = true, isMultinomial = true) withClue("LogisticAggregator features dimension must match coefficients dimension") { intercept[IllegalArgumentException] { agg.add(Instance(1.0, 1.0, Vectors.dense(2.0))) } } - broadcasts.foreach(_.destroy(blocking = false)) } test("negative weight") { val coefArray = Array(1.0, 2.0, -2.0, 3.0, 0.0, -1.0) val interceptArray = Array(4.0, 2.0, -3.0) - val (broadcasts, agg) = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), + val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), fitIntercept = true, isMultinomial = true) withClue("LogisticAggregator does not support negative instance weights") { intercept[IllegalArgumentException] { agg.add(Instance(1.0, -1.0, Vectors.dense(2.0, 1.0))) } } - broadcasts.foreach(_.destroy(blocking = false)) } test("check sizes multinomial") { @@ -104,17 +90,15 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { Array.fill(numClasses * (numFeatures + 1))(rng.nextDouble)) val coefWithoutIntercept = Vectors.dense( Array.fill(numClasses * numFeatures)(rng.nextDouble)) - val (broadcastIntercept, aggIntercept) = getNewAggregator(instances, coefWithIntercept, - fitIntercept = true, isMultinomial = true) - val (broadcastNoIntercept, aggNoIntercept) = getNewAggregator(instances, coefWithoutIntercept, - fitIntercept = false, isMultinomial = true) + val aggIntercept = getNewAggregator(instances, coefWithIntercept, fitIntercept = true, + isMultinomial = true) + val aggNoIntercept = getNewAggregator(instances, coefWithoutIntercept, fitIntercept = false, + isMultinomial = true) instances.foreach(aggIntercept.add) instances.foreach(aggNoIntercept.add) assert(aggIntercept.gradient.size === (numFeatures + 1) * numClasses) assert(aggNoIntercept.gradient.size === numFeatures * numClasses) - broadcastIntercept.foreach(_.destroy(blocking = false)) - broadcastNoIntercept.foreach(_.destroy(blocking = false)) } test("check sizes binomial") { @@ -123,17 +107,15 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { val numFeatures = binaryInstances.head.features.size val coefWithIntercept = Vectors.dense(Array.fill(numFeatures + 1)(rng.nextDouble)) val coefWithoutIntercept = Vectors.dense(Array.fill(numFeatures)(rng.nextDouble)) - val (broadcastIntercept, aggIntercept) = getNewAggregator(binaryInstances, coefWithIntercept, - fitIntercept = true, isMultinomial = false) - val (broadcastNoIntercept, aggNoIntercept) = getNewAggregator(binaryInstances, - coefWithoutIntercept, fitIntercept = false, isMultinomial = false) + val aggIntercept = getNewAggregator(binaryInstances, coefWithIntercept, fitIntercept = true, + isMultinomial = false) + val aggNoIntercept = getNewAggregator(binaryInstances, coefWithoutIntercept, + fitIntercept = false, isMultinomial = false) binaryInstances.foreach(aggIntercept.add) binaryInstances.foreach(aggNoIntercept.add) assert(aggIntercept.gradient.size === numFeatures + 1) assert(aggNoIntercept.gradient.size === numFeatures) - broadcastIntercept.foreach(_.destroy(blocking = false)) - broadcastNoIntercept.foreach(_.destroy(blocking = false)) } test("check correctness multinomial") { @@ -150,7 +132,7 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) val weightSum = instances.map(_.weight).sum - val (broadcasts, agg) = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), + val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray), fitIntercept = true, isMultinomial = true) instances.foreach(agg.add) @@ -199,7 +181,6 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { assert(loss ~== agg.loss relTol 0.01) assert(gradient ~== agg.gradient relTol 0.01) - broadcasts.foreach(_.destroy(blocking = false)) } test("check correctness binomial") { @@ -217,8 +198,8 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt) val weightSum = binaryInstances.map(_.weight).sum - val (broadcasts, agg) = getNewAggregator(binaryInstances, - Vectors.dense(coefArray ++ Array(intercept)), fitIntercept = true, isMultinomial = false) + val agg = getNewAggregator(binaryInstances, Vectors.dense(coefArray ++ Array(intercept)), + fitIntercept = true, isMultinomial = false) binaryInstances.foreach(agg.add) // compute the loss @@ -246,7 +227,6 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { assert(loss ~== agg.loss relTol 0.01) assert(gradient ~== agg.gradient relTol 0.01) - broadcasts.foreach(_.destroy(blocking = false)) } test("check with zero standard deviation") { @@ -255,21 +235,19 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext { } val coefArray = Array(1.0, 2.0, -2.0, 3.0, 0.0, -1.0) val interceptArray = Array(4.0, 2.0, -3.0) - val (broadcastConstantFeature, aggConstantFeature) = getNewAggregator(instancesConstantFeature, + val aggConstantFeature = getNewAggregator(instancesConstantFeature, Vectors.dense(coefArray ++ interceptArray), fitIntercept = true, isMultinomial = true) instances.foreach(aggConstantFeature.add) // constant features should not affect gradient assert(aggConstantFeature.gradient(0) === 0.0) - broadcastConstantFeature.foreach(_.destroy(blocking = false)) val binaryCoefArray = Array(1.0, 2.0) val intercept = 1.0 - val (broadcastConstantBinary, aggConstantFeatureBinary) = getNewAggregator(binaryInstances, + val aggConstantFeatureBinary = getNewAggregator(binaryInstances, Vectors.dense(binaryCoefArray ++ Array(intercept)), fitIntercept = true, isMultinomial = false) instances.foreach(aggConstantFeatureBinary.add) // constant features should not affect gradient assert(aggConstantFeatureBinary.gradient(0) === 0.0) - broadcastConstantBinary.foreach(_.destroy(blocking = false)) } }