From 34107aa9654d9a7418fc5db9a67de556efde42c1 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 3 Sep 2015 00:24:43 -0700 Subject: [PATCH 1/7] implement weighted least squares via normal equation --- .../spark/ml/optim/WeightedLeastSquares.scala | 178 ++++++++++++++++++ .../org/apache/spark/mllib/linalg/BLAS.scala | 6 + .../apache/spark/mllib/linalg/Vectors.scala | 2 + .../mllib/linalg/distributed/RowMatrix.scala | 2 +- .../ml/optim/WeightedLeastSquaresSuite.scala | 75 ++++++++ 5 files changed, 262 insertions(+), 1 deletion(-) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala create mode 100644 mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala new file mode 100644 index 0000000000000..44f576777bd61 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.optim + +import com.github.fommil.netlib.LAPACK.{getInstance => lapack} +import org.netlib.util.intW + +import org.apache.spark.Logging +import org.apache.spark.mllib.linalg._ +import org.apache.spark.mllib.linalg.distributed.RowMatrix +import org.apache.spark.rdd.RDD + +private[ml] class WeightedLeastSquaresModel( + val coefficients: DenseVector, + val intercept: Double) extends Serializable + +private[ml] class WeightedLeastSquares( + val fitIntercept: Boolean, + val regParam: Double, + val standardization: Boolean) extends Logging with Serializable { + import WeightedLeastSquares._ + + require(regParam >= 0.0, s"regParam cannot be negative: $regParam") + if (regParam == 0.0) { + logWarning("regParam is zero, which might cause numerical instability and overfit.") + } + + /** + * Creates a [[WeightedLeastSquaresModel]] from an RDD of [[Instance]]s. + */ + def fit(instances: RDD[Instance]): WeightedLeastSquaresModel = { + val summary = instances.treeAggregate(new Aggregator)(_.add(_), _.merge(_)) + assert(summary.initialized, "Training dataset is empty.") + assert(summary.wSum > 0.0, "Sum of weights cannot be zero.") + assert(summary.count > 1.0, "Must have more than one instances.") + val triK = summary.triK + val wSum = summary.wSum + val wwSum = summary.wwSum + val bSum = summary.bSum + val aSum = summary.aSum + val abSum = summary.abSum + val aaSum = summary.aaSum + val aaValues = aaSum.values + + // add regularization to diagonals + var i = 0 + var j = 2 + while (i < triK) { + val scale = if (standardization) { + val l = j - 2 + aaValues(i) - aSum(l) * aSum(l) / wSum + } else { + wSum + } + aaValues(i) += scale * regParam + i += j + j += 1 + } + + if (fitIntercept) { + // shift centers + BLAS.axpy(-bSum / wSum, aSum, abSum) + RowMatrix.dspr(-1.0 / wSum, aSum, aaValues) + } + + if (standardization) { + // correct bias + BLAS.scal(1.0 - wwSum / (wSum * wSum), abSum) + } + + val x = choleskySolve(aaValues, abSum) + + // compute intercept + val intercept = if (fitIntercept) { + (bSum - BLAS.dot(aSum, x)) / wSum + } else { + 0.0 + } + + new WeightedLeastSquaresModel(x, intercept) + } + + private def choleskySolve(A: Array[Double], bx: DenseVector): DenseVector = { + val k = bx.size + val info = new intW(0) + lapack.dppsv("U", k, 1, A, bx.values, k, info) + val code = info.`val` + assert(code == 0, s"lapack.dpotrs returned $code.") + bx + } +} + +private[ml] object WeightedLeastSquares { + + case class Instance(w: Double, a: Vector, b: Double) { + require(w >= 0.0, s"Weight cannot be negative: $w.") + } + + private class Aggregator extends Serializable { + var initialized: Boolean = false + var k: Int = _ + var count: Long = _ + var triK: Int = _ + var wSum: Double = _ + var wwSum: Double = _ + var bSum: Double = _ + var aSum: DenseVector = _ + var abSum: DenseVector = _ + var aaSum: DenseVector = _ + + private def init(k: Int): Unit = { + require(k <= 4096, "In order to take the normal equation approach efficiently, " + + s"we set the max number of features to 4096 but got $k.") + this.k = k + triK = k * (k + 1) / 2 + count = 0L + wSum = 0.0 + wwSum = 0.0 + bSum = 0.0 + aSum = DenseVector.zeros(k) + abSum = DenseVector.zeros(k) + aaSum = DenseVector.zeros(triK) + initialized = true + } + + def add(instance: Instance): this.type = { + val Instance(w, a, b) = instance + val ak = a.size + if (!initialized) { + init(ak) + initialized = true + } + assert(ak == k, s"Dimension mismatch. Expect vectors of size $k but got $ak.") + count += 1L + wSum += w + wwSum += w * w + bSum += w * b + BLAS.axpy(w, a, aSum) + BLAS.axpy(w * b, a, abSum) + RowMatrix.dspr(w, a, aaSum.values) + this + } + + def merge(other: Aggregator): this.type = { + if (!other.initialized) { + this + } else { + if (!initialized) { + init(other.k) + } + assert(k == other.k) + count += other.count + wSum += other.wSum + wwSum += other.wwSum + bSum += other.bSum + BLAS.axpy(1.0, other.aSum, aSum) + BLAS.axpy(1.0, other.abSum, abSum) + BLAS.axpy(1.0, other.aaSum, aaSum) + this + } + } + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala index ab475af264dd3..06e94d33ef165 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala @@ -92,6 +92,12 @@ private[spark] object BLAS extends Serializable with Logging { } } + private[spark] def axpy(a: Double, X: DenseMatrix, Y: DenseMatrix): Unit = { + require(X.numRows == Y.numRows && X.numCols == Y.numCols, "Dimension mismatch: " + + s"size(X) = ${(X.numRows, X.numCols)} but size(Y) = ${(Y.numRows, Y.numCols)}.") + f2jBLAS.daxpy(X.numRows * X.numCols, a, X.values, 1, Y.values, 1) + } + /** * dot(x, y) */ diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 3642e9286504f..c8d393e32cd85 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -655,6 +655,8 @@ object DenseVector { /** Extracts the value array from a dense vector. */ @Since("1.3.0") def unapply(dv: DenseVector): Option[Array[Double]] = Some(dv.values) + + def zeros(size: Int): DenseVector = new DenseVector(Array.ofDim(size)) } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 9a423ddafdc09..8ef1f459577a1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -678,7 +678,7 @@ object RowMatrix { * * @param U the upper triangular part of the matrix packed in an array (column major) */ - private def dspr(alpha: Double, v: Vector, U: Array[Double]): Unit = { + private[spark] def dspr(alpha: Double, v: Vector, U: Array[Double]): Unit = { // TODO: Find a better home (breeze?) for this method. val n = v.size v match { diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala new file mode 100644 index 0000000000000..f7db270288fbb --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.optim + +import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.optim.WeightedLeastSquares.Instance +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.mllib.util.TestingUtils._ +import org.apache.spark.rdd.RDD + +class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext { + + private var instances: RDD[Instance] = _ + + override def beforeAll(): Unit = { + super.beforeAll() + instances = sc.parallelize(Seq( + Instance(1.0, Vectors.dense(0.0, 5.0).toSparse, 17.0), + Instance(2.0, Vectors.dense(1.0, 7.0), 19.0), + Instance(3.0, Vectors.dense(2.0, 11.0), 23.0), + Instance(4.0, Vectors.dense(3.0, 13.0), 29.0) + ), 2) + } + + test("WLS against glmnet") { + /* + R code: + +library(glmnet) + +A <- matrix(c(0, 1, 2, 3, 5, 7, 11, 13), 4, 2) +b <- c(17, 19, 23, 29) +w <- c(1, 2, 3, 4) + +for (intercept in c(FALSE, TRUE)) { + model <- glmnet(A, b, weights=w, intercept=intercept, lambda=0.0, standardize=FALSE, alpha=0) + print(as.vector(coef(model))) +} + +[1] 0.000000 -3.713230 3.007162 +[1] 17.9935922 6.0267241 -0.5814462 + */ + + val expected = Seq( + Vectors.dense(0.0, -3.713230, 3.007162), + Vectors.dense(17.9935922, 6.0267241, -0.5814462)) + + var idx = 0 + for (fitIntercept <- Seq(false, true); + regParam <- Seq(0.0); + standardization <- Seq(false)) { + val wls = new WeightedLeastSquares(fitIntercept, regParam, standardization) + .fit(instances) + val actual = Vectors.dense(wls.intercept, wls.coefficients(0), wls.coefficients(1)) + assert(actual ~== expected(idx) absTol 1e-1) + idx += 1 + } + } +} From c75ff923428052c68e43004f5f2488cf0b3d72dc Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 3 Sep 2015 10:51:07 -0700 Subject: [PATCH 2/7] tighten threshold --- .../spark/ml/optim/WeightedLeastSquaresSuite.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala index f7db270288fbb..16dce7b051948 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala @@ -49,17 +49,18 @@ b <- c(17, 19, 23, 29) w <- c(1, 2, 3, 4) for (intercept in c(FALSE, TRUE)) { - model <- glmnet(A, b, weights=w, intercept=intercept, lambda=0.0, standardize=FALSE, alpha=0) + model <- glmnet(A, b, weights=w, intercept=intercept, lambda=0.0, standardize=FALSE, + alpha=0, thresh=1E-14) print(as.vector(coef(model))) } -[1] 0.000000 -3.713230 3.007162 -[1] 17.9935922 6.0267241 -0.5814462 +[1] 0.000000 -3.727117 3.009982 +[1] 18.0799727 6.0799832 -0.5999941 */ val expected = Seq( - Vectors.dense(0.0, -3.713230, 3.007162), - Vectors.dense(17.9935922, 6.0267241, -0.5814462)) + Vectors.dense(0.0, -3.727117, 3.009982), + Vectors.dense(18.0799727, 6.0799832, -0.5999941)) var idx = 0 for (fitIntercept <- Seq(false, true); @@ -68,7 +69,7 @@ for (intercept in c(FALSE, TRUE)) { val wls = new WeightedLeastSquares(fitIntercept, regParam, standardization) .fit(instances) val actual = Vectors.dense(wls.intercept, wls.coefficients(0), wls.coefficients(1)) - assert(actual ~== expected(idx) absTol 1e-1) + assert(actual ~== expected(idx) absTol 1e-2) idx += 1 } } From 03c56c9bf6b3b59a192ea4eebbb6fffd00d7aeb8 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 3 Sep 2015 20:36:01 -0700 Subject: [PATCH 3/7] match glmnet (pending standardization --- .../spark/ml/optim/WeightedLeastSquares.scala | 86 +++++++++++++------ .../ml/optim/WeightedLeastSquaresSuite.scala | 29 +++++-- 2 files changed, 83 insertions(+), 32 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala index 44f576777bd61..a9ad7a9fae5cf 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala @@ -46,48 +46,41 @@ private[ml] class WeightedLeastSquares( def fit(instances: RDD[Instance]): WeightedLeastSquaresModel = { val summary = instances.treeAggregate(new Aggregator)(_.add(_), _.merge(_)) assert(summary.initialized, "Training dataset is empty.") - assert(summary.wSum > 0.0, "Sum of weights cannot be zero.") - assert(summary.count > 1.0, "Must have more than one instances.") + // assert(summary.wSum > 0.0, "Sum of weights cannot be zero.") + // assert(summary.count > 1.0, "Must have more than one instances.") val triK = summary.triK - val wSum = summary.wSum - val wwSum = summary.wwSum - val bSum = summary.bSum - val aSum = summary.aSum - val abSum = summary.abSum - val aaSum = summary.aaSum - val aaValues = aaSum.values + val bBar = summary.bBar + val bVar = summary.bVar + val aBar = summary.aBar + val aVar = summary.aVar + val abBar = summary.abBar + val aaBar = summary.aaBar + val aaValues = aaBar.values + + if (fitIntercept) { + RowMatrix.dspr(-1.0, aBar, aaValues) + BLAS.axpy(-bBar, aBar, abBar) + } // add regularization to diagonals var i = 0 var j = 2 while (i < triK) { val scale = if (standardization) { - val l = j - 2 - aaValues(i) - aSum(l) * aSum(l) / wSum + aVar(j - 2) / math.sqrt(bVar) } else { - wSum + 1.0 / math.sqrt(bVar) } aaValues(i) += scale * regParam i += j j += 1 } - if (fitIntercept) { - // shift centers - BLAS.axpy(-bSum / wSum, aSum, abSum) - RowMatrix.dspr(-1.0 / wSum, aSum, aaValues) - } - - if (standardization) { - // correct bias - BLAS.scal(1.0 - wwSum / (wSum * wSum), abSum) - } - - val x = choleskySolve(aaValues, abSum) + val x = choleskySolve(aaBar.values, abBar) // compute intercept val intercept = if (fitIntercept) { - (bSum - BLAS.dot(aSum, x)) / wSum + bBar - BLAS.dot(aBar, x) } else { 0.0 } @@ -119,6 +112,7 @@ private[ml] object WeightedLeastSquares { var wSum: Double = _ var wwSum: Double = _ var bSum: Double = _ + var bbSum: Double = _ var aSum: DenseVector = _ var abSum: DenseVector = _ var aaSum: DenseVector = _ @@ -132,6 +126,7 @@ private[ml] object WeightedLeastSquares { wSum = 0.0 wwSum = 0.0 bSum = 0.0 + bbSum = 0.0 aSum = DenseVector.zeros(k) abSum = DenseVector.zeros(k) aaSum = DenseVector.zeros(triK) @@ -150,6 +145,7 @@ private[ml] object WeightedLeastSquares { wSum += w wwSum += w * w bSum += w * b + bbSum += w * b * b BLAS.axpy(w, a, aSum) BLAS.axpy(w * b, a, abSum) RowMatrix.dspr(w, a, aaSum.values) @@ -168,11 +164,51 @@ private[ml] object WeightedLeastSquares { wSum += other.wSum wwSum += other.wwSum bSum += other.bSum + bbSum += other.bbSum BLAS.axpy(1.0, other.aSum, aSum) BLAS.axpy(1.0, other.abSum, abSum) BLAS.axpy(1.0, other.aaSum, aaSum) this } } + + def aBar: DenseVector = { + val output = aSum.copy + BLAS.scal(1.0 / wSum, output) + output + } + + def bBar: Double = bSum / wSum + + def bVar: Double = bbSum / wSum - bBar * bBar + + def abBar: DenseVector = { + val output = abSum.copy + BLAS.scal(1.0 / wSum, output) + output + } + + def aaBar: DenseVector = { + val output = aaSum.copy + BLAS.scal(1.0 / wSum, output) + output + } + + def aVar: DenseVector = { + val variance = Array.ofDim[Double](k) + var i = 0 + var j = 2 + val aaValues = aaSum.values + while (i < triK) { + val l = j - 2 + variance(l) = wSum * aaValues(i) - aSum(l) * aSum(l) + i += j + j += 1 + } + val output = new DenseVector(variance) + // correct bias + BLAS.scal(1.0 / (wSum * wSum), output) + output + } } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala index 16dce7b051948..266e11916998f 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala @@ -49,28 +49,43 @@ b <- c(17, 19, 23, 29) w <- c(1, 2, 3, 4) for (intercept in c(FALSE, TRUE)) { - model <- glmnet(A, b, weights=w, intercept=intercept, lambda=0.0, standardize=FALSE, - alpha=0, thresh=1E-14) - print(as.vector(coef(model))) + for (lambda in c(0.0, 0.1, 1.0)) { + for (standardize in c(FALSE)) { + model <- glmnet(A, b, weights=w, intercept=intercept, lambda=lambda, standardize=standardize, + alpha=0, thresh=1E-14) + print(as.vector(coef(model))) + } + } } [1] 0.000000 -3.727117 3.009982 +[1] 0.000000 -3.307532 2.924206 +[1] 0.000000 -1.526575 2.558158 [1] 18.0799727 6.0799832 -0.5999941 +[1] 13.5356178 3.2714044 0.3770744 +[1] 10.1238013 0.9708569 1.1475466 */ val expected = Seq( - Vectors.dense(0.0, -3.727117, 3.009982), - Vectors.dense(18.0799727, 6.0799832, -0.5999941)) + Vectors.dense(0.000000, -3.727117, 3.009982), + Vectors.dense(0.000000, -3.307532, 2.924206), + Vectors.dense(0.000000, -1.526575, 2.558158), + Vectors.dense(18.0799727, 6.0799832, -0.5999941), + Vectors.dense(13.5356178, 3.2714044, 0.3770744), + Vectors.dense(10.1238013, 0.9708569, 1.1475466)) var idx = 0 for (fitIntercept <- Seq(false, true); - regParam <- Seq(0.0); + regParam <- Seq(0.0, 0.1, 1.0); standardization <- Seq(false)) { val wls = new WeightedLeastSquares(fitIntercept, regParam, standardization) .fit(instances) val actual = Vectors.dense(wls.intercept, wls.coefficients(0), wls.coefficients(1)) - assert(actual ~== expected(idx) absTol 1e-2) + println(actual, expected(idx)) + // assert(actual ~== expected(idx) absTol 1e-2) idx += 1 } + + assert(false) } } From 3111d9561940126ebfa82a573f194f627e4fcd1b Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 3 Sep 2015 20:41:58 -0700 Subject: [PATCH 4/7] match standardization --- .../ml/optim/WeightedLeastSquaresSuite.scala | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala index 266e11916998f..69f45cc7f76c5 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala @@ -50,7 +50,7 @@ w <- c(1, 2, 3, 4) for (intercept in c(FALSE, TRUE)) { for (lambda in c(0.0, 0.1, 1.0)) { - for (standardize in c(FALSE)) { + for (standardize in c(FALSE, TRUE)) { model <- glmnet(A, b, weights=w, intercept=intercept, lambda=lambda, standardize=standardize, alpha=0, thresh=1E-14) print(as.vector(coef(model))) @@ -58,34 +58,44 @@ for (intercept in c(FALSE, TRUE)) { } } +[1] 0.000000 -3.727117 3.009982 [1] 0.000000 -3.727117 3.009982 [1] 0.000000 -3.307532 2.924206 +[1] 0.000000 -2.914790 2.840627 [1] 0.000000 -1.526575 2.558158 +[1] 0.00000000 0.06984238 2.20488344 +[1] 18.0799727 6.0799832 -0.5999941 [1] 18.0799727 6.0799832 -0.5999941 [1] 13.5356178 3.2714044 0.3770744 +[1] 14.064629 3.565802 0.269593 [1] 10.1238013 0.9708569 1.1475466 +[1] 13.1860638 2.1761382 0.6213134 */ val expected = Seq( - Vectors.dense(0.000000, -3.727117, 3.009982), - Vectors.dense(0.000000, -3.307532, 2.924206), - Vectors.dense(0.000000, -1.526575, 2.558158), + Vectors.dense(0.0, -3.727117, 3.009982), + Vectors.dense(0.0, -3.727117, 3.009982), + Vectors.dense(0.0, -3.307532, 2.924206), + Vectors.dense(0.0, -2.914790, 2.840627), + Vectors.dense(0.0, -1.526575, 2.558158), + Vectors.dense(0.0, 0.06984238, 2.20488344), + Vectors.dense(18.0799727, 6.0799832, -0.5999941), Vectors.dense(18.0799727, 6.0799832, -0.5999941), Vectors.dense(13.5356178, 3.2714044, 0.3770744), - Vectors.dense(10.1238013, 0.9708569, 1.1475466)) + Vectors.dense(14.064629, 3.565802, 0.269593), + Vectors.dense(10.1238013, 0.9708569, 1.1475466), + Vectors.dense(13.1860638, 2.1761382, 0.6213134)) var idx = 0 for (fitIntercept <- Seq(false, true); regParam <- Seq(0.0, 0.1, 1.0); - standardization <- Seq(false)) { + standardization <- Seq(false, true)) { val wls = new WeightedLeastSquares(fitIntercept, regParam, standardization) .fit(instances) val actual = Vectors.dense(wls.intercept, wls.coefficients(0), wls.coefficients(1)) - println(actual, expected(idx)) - // assert(actual ~== expected(idx) absTol 1e-2) + // println(actual, expected(idx)) + assert(actual ~== expected(idx) absTol 1e-4) idx += 1 } - - assert(false) } } From e05acb2a4aadbeda66c46861ce8d6b69c14f7a4e Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 3 Sep 2015 21:47:29 -0700 Subject: [PATCH 5/7] add doc --- .../spark/ml/optim/WeightedLeastSquares.scala | 120 ++++++++++++++---- .../ml/optim/WeightedLeastSquaresSuite.scala | 46 ++++++- 2 files changed, 137 insertions(+), 29 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala index a9ad7a9fae5cf..c2fc2086d575c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala @@ -25,14 +25,43 @@ import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.rdd.RDD +/** + * Model fitted by [[WeightedLeastSquares]]. + * @param coefficients model coefficients + * @param intercept model intercept + */ private[ml] class WeightedLeastSquaresModel( val coefficients: DenseVector, val intercept: Double) extends Serializable +/** + * Weighted least squares solver via normal equation. + * Given weighted observations (w,,i,,, a,,i,,, b,,i,,), we use the following weighted least squares + * formulation: + * + * min,,x,z,, 1/2 sum,,i,, w,,i,, (a,,i,,^T^ x + z - b,,i,,)^2^ / sum,,i,, w_i + * + 1/2 lambda delta sum,,j,, (sigma,,j,, x,,j,,)^2^, + * + * where lambda is the regularization parameter, and delta and sigma,,j,, are controlled by + * [[standardizeLabel]] and [[standardizeFeatures]], respectively. + * + * Set [[regParam]] to 0.0 and turn off both [[standardizeFeatures]] and [[standardizeLabel]] to + * match R's `lm`. + * Turn on [[standardizeLabel]] to match R's `glmnet`. + * + * @param fitIntercept whether to fit intercept. If false, z is 0.0. + * @param regParam L2 regularization parameter (lambda) + * @param standardizeFeatures whether to standardize features. If true, sigma_,,j,, is the + * population standard deviation of the j-th column of A. Otherwise, + * sigma,,j,, is 1.0. + * @param standardizeLabel whether to standardize label. If true, delta is the population standard + * deviation of the label column b. Otherwise, delta is 1.0. + */ private[ml] class WeightedLeastSquares( val fitIntercept: Boolean, val regParam: Double, - val standardization: Boolean) extends Logging with Serializable { + val standardizeFeatures: Boolean, + val standardizeLabel: Boolean) extends Logging with Serializable { import WeightedLeastSquares._ require(regParam >= 0.0, s"regParam cannot be negative: $regParam") @@ -45,12 +74,11 @@ private[ml] class WeightedLeastSquares( */ def fit(instances: RDD[Instance]): WeightedLeastSquaresModel = { val summary = instances.treeAggregate(new Aggregator)(_.add(_), _.merge(_)) - assert(summary.initialized, "Training dataset is empty.") - // assert(summary.wSum > 0.0, "Sum of weights cannot be zero.") - // assert(summary.count > 1.0, "Must have more than one instances.") + summary.validate() + logInfo(s"Number of instances: ${summary.count}.") val triK = summary.triK val bBar = summary.bBar - val bVar = summary.bVar + val bStd = summary.bStd val aBar = summary.aBar val aVar = summary.aVar val abBar = summary.abBar @@ -66,10 +94,12 @@ private[ml] class WeightedLeastSquares( var i = 0 var j = 2 while (i < triK) { - val scale = if (standardization) { - aVar(j - 2) / math.sqrt(bVar) - } else { - 1.0 / math.sqrt(bVar) + var scale = 1.0 + if (standardizeFeatures) { + scale *= aVar(j - 2) + } + if (standardizeLabel) { + scale /= bStd } aaValues(i) += scale * regParam i += j @@ -88,6 +118,13 @@ private[ml] class WeightedLeastSquares( new WeightedLeastSquaresModel(x, intercept) } + /** + * Solves a symmetric positive definite linear system via Cholesky factorization. + * The input arguments are modified in-place to store the factorization and the solution. + * @param A the upper triangular part of A + * @param bx right-hand side + * @return the solution vector + */ private def choleskySolve(A: Array[Double], bx: DenseVector): DenseVector = { val k = bx.size val info = new intW(0) @@ -100,22 +137,31 @@ private[ml] class WeightedLeastSquares( private[ml] object WeightedLeastSquares { + /** + * Case class for weighted observations. + * @param w weight, must be positive + * @param a features + * @param b label + */ case class Instance(w: Double, a: Vector, b: Double) { require(w >= 0.0, s"Weight cannot be negative: $w.") } + /** + * Aggregator to provide necessary summary statistics for solving [[WeightedLeastSquares]]. + */ private class Aggregator extends Serializable { var initialized: Boolean = false var k: Int = _ var count: Long = _ var triK: Int = _ - var wSum: Double = _ - var wwSum: Double = _ - var bSum: Double = _ - var bbSum: Double = _ - var aSum: DenseVector = _ - var abSum: DenseVector = _ - var aaSum: DenseVector = _ + private var wSum: Double = _ + private var wwSum: Double = _ + private var bSum: Double = _ + private var bbSum: Double = _ + private var aSum: DenseVector = _ + private var abSum: DenseVector = _ + private var aaSum: DenseVector = _ private def init(k: Int): Unit = { require(k <= 4096, "In order to take the normal equation approach efficiently, " + @@ -133,6 +179,9 @@ private[ml] object WeightedLeastSquares { initialized = true } + /** + * Adds an instance. + */ def add(instance: Instance): this.type = { val Instance(w, a, b) = instance val ak = a.size @@ -152,6 +201,9 @@ private[ml] object WeightedLeastSquares { this } + /** + * Merges another [[Aggregator]]. + */ def merge(other: Aggregator): this.type = { if (!other.initialized) { this @@ -172,28 +224,54 @@ private[ml] object WeightedLeastSquares { } } + /** + * Validates that we have seen observations. + */ + def validate(): Unit = { + assert(initialized, "Training dataset is empty.") + assert(wSum > 0.0, "Sum of weights cannot be zero.") + } + + /** + * Weighted mean of features. + */ def aBar: DenseVector = { val output = aSum.copy BLAS.scal(1.0 / wSum, output) output } + /** + * Weighted mean of labels. + */ def bBar: Double = bSum / wSum - def bVar: Double = bbSum / wSum - bBar * bBar + /** + * Weighted population standard deviation of labels. + */ + def bStd: Double = math.sqrt(bbSum / wSum - bBar * bBar) + /** + * Weighted mean of (label * features). + */ def abBar: DenseVector = { val output = abSum.copy BLAS.scal(1.0 / wSum, output) output } + /** + * Weighted mean of (features * features^T^). + */ def aaBar: DenseVector = { val output = aaSum.copy BLAS.scal(1.0 / wSum, output) output } + /** + * Weighted population variance of features. + */ def aVar: DenseVector = { val variance = Array.ofDim[Double](k) var i = 0 @@ -201,14 +279,12 @@ private[ml] object WeightedLeastSquares { val aaValues = aaSum.values while (i < triK) { val l = j - 2 - variance(l) = wSum * aaValues(i) - aSum(l) * aSum(l) + val aw = aSum(l) / wSum + variance(l) = aaValues(i) / wSum - aw * aw i += j j += 1 } - val output = new DenseVector(variance) - // correct bias - BLAS.scal(1.0 / (wSum * wSum), output) - output + new DenseVector(variance) } } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala index 69f45cc7f76c5..d76c24f9631dc 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala @@ -30,6 +30,13 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext override def beforeAll(): Unit = { super.beforeAll() + /* + R code: + +A <- matrix(c(0, 1, 2, 3, 5, 7, 11, 13), 4, 2) +b <- c(17, 19, 23, 29) +w <- c(1, 2, 3, 4) + */ instances = sc.parallelize(Seq( Instance(1.0, Vectors.dense(0.0, 5.0).toSparse, 17.0), Instance(2.0, Vectors.dense(1.0, 7.0), 19.0), @@ -38,16 +45,41 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext ), 2) } + test("WLS against lm") { + /* + R code: + +df <- as.data.frame(cbind(A, b)) +for (formula in c(b ~ . -1, b ~ .)) { + model <- lm(formula, data=df, weights=w) + print(as.vector(coef(model))) +} + +[1] -3.727121 3.009983 +[1] 18.08 6.08 -0.60 + */ + + val expected = Seq( + Vectors.dense(0.0, -3.727121, 3.009983), + Vectors.dense(18.08, 6.08, -0.60)) + + var idx = 0 + for (fitIntercept <- Seq(false, true)) { + val wls = new WeightedLeastSquares( + fitIntercept, regParam = 0.0, standardizeFeatures = false, standardizeLabel = false) + .fit(instances) + val actual = Vectors.dense(wls.intercept, wls.coefficients(0), wls.coefficients(1)) + assert(actual ~== expected(idx) absTol 1e-4) + idx += 1 + } + } + test("WLS against glmnet") { /* R code: library(glmnet) -A <- matrix(c(0, 1, 2, 3, 5, 7, 11, 13), 4, 2) -b <- c(17, 19, 23, 29) -w <- c(1, 2, 3, 4) - for (intercept in c(FALSE, TRUE)) { for (lambda in c(0.0, 0.1, 1.0)) { for (standardize in c(FALSE, TRUE)) { @@ -89,11 +121,11 @@ for (intercept in c(FALSE, TRUE)) { var idx = 0 for (fitIntercept <- Seq(false, true); regParam <- Seq(0.0, 0.1, 1.0); - standardization <- Seq(false, true)) { - val wls = new WeightedLeastSquares(fitIntercept, regParam, standardization) + standardizeFeatures <- Seq(false, true)) { + val wls = new WeightedLeastSquares( + fitIntercept, regParam, standardizeFeatures, standardizeLabel = true) .fit(instances) val actual = Vectors.dense(wls.intercept, wls.coefficients(0), wls.coefficients(1)) - // println(actual, expected(idx)) assert(actual ~== expected(idx) absTol 1e-4) idx += 1 } From 1614f2280603df92af049e6deb016cb1de768e80 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 3 Sep 2015 21:50:18 -0700 Subject: [PATCH 6/7] minor updates --- .../spark/ml/optim/WeightedLeastSquares.scala | 21 ++++++++++++------- .../org/apache/spark/mllib/linalg/BLAS.scala | 1 + .../apache/spark/mllib/linalg/Vectors.scala | 2 -- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala index c2fc2086d575c..0f0e56e8d886a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala @@ -40,7 +40,7 @@ private[ml] class WeightedLeastSquaresModel( * formulation: * * min,,x,z,, 1/2 sum,,i,, w,,i,, (a,,i,,^T^ x + z - b,,i,,)^2^ / sum,,i,, w_i - * + 1/2 lambda delta sum,,j,, (sigma,,j,, x,,j,,)^2^, + * + 1/2 lambda / delta sum,,j,, (sigma,,j,, x,,j,,)^2^, * * where lambda is the regularization parameter, and delta and sigma,,j,, are controlled by * [[standardizeLabel]] and [[standardizeFeatures]], respectively. @@ -86,7 +86,10 @@ private[ml] class WeightedLeastSquares( val aaValues = aaBar.values if (fitIntercept) { + // shift centers + // A^T A - aBar aBar^T RowMatrix.dspr(-1.0, aBar, aaValues) + // A^T b - bBar aBar BLAS.axpy(-bBar, aBar, abBar) } @@ -94,14 +97,15 @@ private[ml] class WeightedLeastSquares( var i = 0 var j = 2 while (i < triK) { - var scale = 1.0 + var lambda = regParam if (standardizeFeatures) { - scale *= aVar(j - 2) + lambda *= aVar(j - 2) } if (standardizeLabel) { - scale /= bStd + // TODO: handle the case when bStd = 0 + lambda /= bStd } - aaValues(i) += scale * regParam + aaValues(i) += lambda i += j j += 1 } @@ -150,6 +154,7 @@ private[ml] object WeightedLeastSquares { /** * Aggregator to provide necessary summary statistics for solving [[WeightedLeastSquares]]. */ + // TODO: consolidate aggregates for summary statistics private class Aggregator extends Serializable { var initialized: Boolean = false var k: Int = _ @@ -173,9 +178,9 @@ private[ml] object WeightedLeastSquares { wwSum = 0.0 bSum = 0.0 bbSum = 0.0 - aSum = DenseVector.zeros(k) - abSum = DenseVector.zeros(k) - aaSum = DenseVector.zeros(triK) + aSum = new DenseVector(Array.ofDim(k)) + abSum = new DenseVector(Array.ofDim(k)) + aaSum = new DenseVector(Array.ofDim(triK)) initialized = true } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala index 06e94d33ef165..9ee81eda8a8c0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala @@ -92,6 +92,7 @@ private[spark] object BLAS extends Serializable with Logging { } } + /** Y += a * x */ private[spark] def axpy(a: Double, X: DenseMatrix, Y: DenseMatrix): Unit = { require(X.numRows == Y.numRows && X.numCols == Y.numCols, "Dimension mismatch: " + s"size(X) = ${(X.numRows, X.numCols)} but size(Y) = ${(Y.numRows, Y.numCols)}.") diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index c8d393e32cd85..3642e9286504f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -655,8 +655,6 @@ object DenseVector { /** Extracts the value array from a dense vector. */ @Since("1.3.0") def unapply(dv: DenseVector): Option[Array[Double]] = Some(dv.values) - - def zeros(size: Int): DenseVector = new DenseVector(Array.ofDim(size)) } /** From c2ec746ab6e9aee84dc984c912ab4f0ee2b4e75e Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 8 Sep 2015 11:04:58 -0700 Subject: [PATCH 7/7] address comments --- .../spark/ml/optim/WeightedLeastSquares.scala | 5 +- .../mllib/linalg/distributed/RowMatrix.scala | 1 + .../ml/optim/WeightedLeastSquaresSuite.scala | 68 +++++++++---------- 3 files changed, 38 insertions(+), 36 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala index 0f0e56e8d886a..a99e2ac4c6913 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala @@ -66,7 +66,7 @@ private[ml] class WeightedLeastSquares( require(regParam >= 0.0, s"regParam cannot be negative: $regParam") if (regParam == 0.0) { - logWarning("regParam is zero, which might cause numerical instability and overfit.") + logWarning("regParam is zero, which might cause numerical instability and overfitting.") } /** @@ -129,6 +129,7 @@ private[ml] class WeightedLeastSquares( * @param bx right-hand side * @return the solution vector */ + // TODO: SPARK-10490 - consolidate this and the Cholesky solver in ALS private def choleskySolve(A: Array[Double], bx: DenseVector): DenseVector = { val k = bx.size val info = new intW(0) @@ -216,7 +217,7 @@ private[ml] object WeightedLeastSquares { if (!initialized) { init(other.k) } - assert(k == other.k) + assert(k == other.k, s"dimension mismatch: this.k = $k but other.k = ${other.k}") count += other.count wSum += other.wSum wwSum += other.wwSum diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 8ef1f459577a1..83779ac88989b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -678,6 +678,7 @@ object RowMatrix { * * @param U the upper triangular part of the matrix packed in an array (column major) */ + // TODO: SPARK-10491 - move this method to linalg.BLAS private[spark] def dspr(alpha: Double, v: Vector, U: Array[Double]): Unit = { // TODO: Find a better home (breeze?) for this method. val n = v.size diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala index d76c24f9631dc..652f3adb984d3 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/WeightedLeastSquaresSuite.scala @@ -33,9 +33,9 @@ class WeightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext /* R code: -A <- matrix(c(0, 1, 2, 3, 5, 7, 11, 13), 4, 2) -b <- c(17, 19, 23, 29) -w <- c(1, 2, 3, 4) + A <- matrix(c(0, 1, 2, 3, 5, 7, 11, 13), 4, 2) + b <- c(17, 19, 23, 29) + w <- c(1, 2, 3, 4) */ instances = sc.parallelize(Seq( Instance(1.0, Vectors.dense(0.0, 5.0).toSparse, 17.0), @@ -49,14 +49,14 @@ w <- c(1, 2, 3, 4) /* R code: -df <- as.data.frame(cbind(A, b)) -for (formula in c(b ~ . -1, b ~ .)) { - model <- lm(formula, data=df, weights=w) - print(as.vector(coef(model))) -} + df <- as.data.frame(cbind(A, b)) + for (formula in c(b ~ . -1, b ~ .)) { + model <- lm(formula, data=df, weights=w) + print(as.vector(coef(model))) + } -[1] -3.727121 3.009983 -[1] 18.08 6.08 -0.60 + [1] -3.727121 3.009983 + [1] 18.08 6.08 -0.60 */ val expected = Seq( @@ -78,30 +78,30 @@ for (formula in c(b ~ . -1, b ~ .)) { /* R code: -library(glmnet) - -for (intercept in c(FALSE, TRUE)) { - for (lambda in c(0.0, 0.1, 1.0)) { - for (standardize in c(FALSE, TRUE)) { - model <- glmnet(A, b, weights=w, intercept=intercept, lambda=lambda, standardize=standardize, - alpha=0, thresh=1E-14) - print(as.vector(coef(model))) - } - } -} - -[1] 0.000000 -3.727117 3.009982 -[1] 0.000000 -3.727117 3.009982 -[1] 0.000000 -3.307532 2.924206 -[1] 0.000000 -2.914790 2.840627 -[1] 0.000000 -1.526575 2.558158 -[1] 0.00000000 0.06984238 2.20488344 -[1] 18.0799727 6.0799832 -0.5999941 -[1] 18.0799727 6.0799832 -0.5999941 -[1] 13.5356178 3.2714044 0.3770744 -[1] 14.064629 3.565802 0.269593 -[1] 10.1238013 0.9708569 1.1475466 -[1] 13.1860638 2.1761382 0.6213134 + library(glmnet) + + for (intercept in c(FALSE, TRUE)) { + for (lambda in c(0.0, 0.1, 1.0)) { + for (standardize in c(FALSE, TRUE)) { + model <- glmnet(A, b, weights=w, intercept=intercept, lambda=lambda, + standardize=standardize, alpha=0, thresh=1E-14) + print(as.vector(coef(model))) + } + } + } + + [1] 0.000000 -3.727117 3.009982 + [1] 0.000000 -3.727117 3.009982 + [1] 0.000000 -3.307532 2.924206 + [1] 0.000000 -2.914790 2.840627 + [1] 0.000000 -1.526575 2.558158 + [1] 0.00000000 0.06984238 2.20488344 + [1] 18.0799727 6.0799832 -0.5999941 + [1] 18.0799727 6.0799832 -0.5999941 + [1] 13.5356178 3.2714044 0.3770744 + [1] 14.064629 3.565802 0.269593 + [1] 10.1238013 0.9708569 1.1475466 + [1] 13.1860638 2.1761382 0.6213134 */ val expected = Seq(