From 5ca3fd1d8e9d5fa6ae0daf24c83e72ef96045104 Mon Sep 17 00:00:00 2001 From: Peng Meng Date: Thu, 13 Jul 2017 15:33:45 +0800 Subject: [PATCH 1/9] add poll for PriorityQueue --- .../scala/org/apache/spark/util/BoundedPriorityQueue.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala index 1b2b1932e0c3..eff0aa4453f0 100644 --- a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala +++ b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala @@ -51,6 +51,10 @@ private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin this } + def poll(): A = { + underlying.poll() + } + override def +=(elem1: A, elem2: A, elems: A*): this.type = { this += elem1 += elem2 ++= elems } From 215efc3114012ebc19af984a3d0172aecb22f255 Mon Sep 17 00:00:00 2001 From: Peng Meng Date: Thu, 13 Jul 2017 18:39:44 +0800 Subject: [PATCH 2/9] test pass --- .../MatrixFactorizationModel.scala | 131 +++++++++++++++--- 1 file changed, 108 insertions(+), 23 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index ac709ad72f0c..ed0d79919498 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -20,6 +20,8 @@ package org.apache.spark.mllib.recommendation import java.io.IOException import java.lang.{Integer => JavaInteger} +import scala.collection.mutable + import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import com.github.fommil.netlib.BLAS.{getInstance => blas} import org.apache.hadoop.fs.Path @@ -31,7 +33,7 @@ import org.apache.spark.SparkContext import org.apache.spark.annotation.Since import org.apache.spark.api.java.{JavaPairRDD, JavaRDD} import org.apache.spark.internal.Logging -import org.apache.spark.mllib.linalg.BLAS +import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.rdd.MLPairRDDFunctions._ import org.apache.spark.mllib.util.{Loader, Saveable} import org.apache.spark.rdd.RDD @@ -286,40 +288,123 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { - val srcBlocks = blockify(srcFeatures) - val dstBlocks = blockify(dstFeatures) - val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) + val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() + val dstBlocks = blockify(rank, dstFeatures) + val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => + val m = srcIds.length + val n = dstIds.length + val dstIdMatrix = new Array[Int](m * num) + val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue) + val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) + + val ratings = srcFactors.transpose.multiply(dstFactors) + var i = 0 + var j = 0 + while (i < m) { + var k = 0 + while (k < n) { + pq += dstIds(k) -> ratings(i, k) + k += 1 + } + var size = pq.size + while(size > 0) { + size -= 1 + val factor = pq.poll + dstIdMatrix(j + size) = factor._1 + scoreMatrix(j + size) = factor._2 + } + i += 1 + // pq.size maybe less than num, corner case + j += num + pq.clear + } + (index -> (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix))) + } + ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)( + (rateSum, rate) => { + mergeFunc(rateSum, rate, num) + }, + (rateSum1, rateSum2) => { + mergeFunc(rateSum1, rateSum2, num) + } + ).flatMap(value => { + // to avoid corner case that the number of items is less than recommendation num + var col: Int = 0 + while (col < num && value._2._3(0, col) > Double.MinValue) { + col += 1 + } + val row = value._2._3.numRows + val output = new Array[(Int, Array[(Int, Double)])](row) var i = 0 - val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) - srcIter.foreach { case (srcId, srcFactor) => - dstIter.foreach { case (dstId, dstFactor) => - // We use F2jBLAS which is faster than a call to native BLAS for vector dot product - val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1) - pq += dstId -> score + while (i < row) { + val factors = new Array[(Int, Double)](col) + var j = 0 + while (j < col) { + factors(j) = (value._2._2(i * num + j), value._2._3(i, j)) + j += 1 } - pq.foreach { case (dstId, score) => - output(i) = (srcId, (dstId, score)) - i += 1 + output(i) = (value._2._1(i), factors) + i += 1 + } + output.toSeq}) + } + + private def mergeFunc(rateSum: (Array[Int], Array[Int], DenseMatrix), + rate: (Array[Int], Array[Int], DenseMatrix), + num: Int): (Array[Int], Array[Int], DenseMatrix) = { + if (rateSum._1 == null) { + rate + } else { + val row = rateSum._3.numRows + var i = 0 + val tempIdMatrix = new Array[Int](row * num) + val tempScoreMatrix = new Array[Double](row * num) + while (i < row) { + var j = 0 + var sum_index = 0 + var rate_index = 0 + while (j < num) { + if (rate._3(i, rate_index) > rateSum._3(i, sum_index)) { + tempIdMatrix(i * num + j) = rate._2(i * num + rate_index) + tempScoreMatrix(i * num + j) = rate._3(i, rate_index) + rate_index += 1 + } else if (rate._3(i, rate_index) < rateSum._3(i, sum_index)) { + tempIdMatrix(i * num + j) = rateSum._2(i * num + sum_index) + tempScoreMatrix(i * num + j) = rateSum._3(i, sum_index) + sum_index += 1 + } + j += 1 } - pq.clear() + i += 1 } - output.toSeq + (rateSum._1, tempIdMatrix, new DenseMatrix(row, num, tempScoreMatrix)) } - ratings.topByKey(num)(Ordering.by(_._2)) } /** * Blockifies features to improve the efficiency of cartesian product * TODO: SPARK-20443 - expose blockSize as a param? */ - private def blockify( - features: RDD[(Int, Array[Double])], - blockSize: Int = 4096): RDD[Seq[(Int, Array[Double])]] = { + def blockify( + rank: Int, + features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = { + val blockSize = 2000 // TODO: tune the block size + val blockStorage = rank * blockSize features.mapPartitions { iter => - iter.grouped(blockSize) + iter.grouped(blockSize).map { grouped => + val ids = mutable.ArrayBuilder.make[Int] + ids.sizeHint(blockSize) + val factors = mutable.ArrayBuilder.make[Double] + factors.sizeHint(blockStorage) + var i = 0 + grouped.foreach { case (id, factor) => + ids += id + factors ++= factor + i += 1 + } + (ids.result(), new DenseMatrix(rank, i, factors.result())) + } } } From 7c587f4070c0951425d1686429816feb712c0273 Mon Sep 17 00:00:00 2001 From: Peng Meng Date: Thu, 13 Jul 2017 19:08:41 +0800 Subject: [PATCH 3/9] fix bug --- .../spark/mllib/recommendation/MatrixFactorizationModel.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index ed0d79919498..6e2335fff9aa 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -359,7 +359,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { val row = rateSum._3.numRows var i = 0 val tempIdMatrix = new Array[Int](row * num) - val tempScoreMatrix = new Array[Double](row * num) + val tempScoreMatrix = Array.fill[Double](row * num)(Double.MinValue) while (i < row) { var j = 0 var sum_index = 0 From e8a40edb25db8a6ecdfe67bd54f38071e7a99781 Mon Sep 17 00:00:00 2001 From: Peng Meng Date: Thu, 13 Jul 2017 19:56:50 +0800 Subject: [PATCH 4/9] code style change --- .../spark/mllib/recommendation/MatrixFactorizationModel.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 6e2335fff9aa..6469ace9dbbb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -347,7 +347,8 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { output(i) = (value._2._1(i), factors) i += 1 } - output.toSeq}) + output.toSeq + }) } private def mergeFunc(rateSum: (Array[Int], Array[Int], DenseMatrix), From 963dae2695afb497c91ce60e64e91200b007e560 Mon Sep 17 00:00:00 2001 From: Peng Meng Date: Thu, 13 Jul 2017 21:57:41 +0800 Subject: [PATCH 5/9] Code style fix --- .../MatrixFactorizationModel.scala | 42 +++++++++---------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 6469ace9dbbb..9fd35f95e1b2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -295,7 +295,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { val m = srcIds.length val n = dstIds.length val dstIdMatrix = new Array[Int](m * num) - val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue) + val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity) val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) val ratings = srcFactors.transpose.multiply(dstFactors) @@ -308,47 +308,42 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { k += 1 } var size = pq.size - while(size > 0) { + while (size > 0) { size -= 1 - val factor = pq.poll + val factor = pq.poll() dstIdMatrix(j + size) = factor._1 scoreMatrix(j + size) = factor._2 } i += 1 // pq.size maybe less than num, corner case j += num - pq.clear + pq.clear() } - (index -> (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix))) + index -> (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix)) } ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)( - (rateSum, rate) => { - mergeFunc(rateSum, rate, num) - }, - (rateSum1, rateSum2) => { - mergeFunc(rateSum1, rateSum2, num) - } - ).flatMap(value => { + (rateSum, rate) => mergeFunc(rateSum, rate, num), + (rateSum1, rateSum2) => mergeFunc(rateSum1, rateSum2, num) + ).flatMap { case (index, (srcIds, dstIdMatrix, scoreMatrix)) => // to avoid corner case that the number of items is less than recommendation num var col: Int = 0 - while (col < num && value._2._3(0, col) > Double.MinValue) { + while (col < num && scoreMatrix(0, col) > Double.NegativeInfinity) { col += 1 } - val row = value._2._3.numRows + val row = scoreMatrix.numRows val output = new Array[(Int, Array[(Int, Double)])](row) var i = 0 while (i < row) { val factors = new Array[(Int, Double)](col) var j = 0 while (j < col) { - factors(j) = (value._2._2(i * num + j), value._2._3(i, j)) + factors(j) = (dstIdMatrix(i * num + j), scoreMatrix(i, j)) j += 1 } - output(i) = (value._2._1(i), factors) + output(i) = (srcIds(i), factors) i += 1 } - output.toSeq - }) + output.toSeq} } private def mergeFunc(rateSum: (Array[Int], Array[Int], DenseMatrix), @@ -360,19 +355,20 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { val row = rateSum._3.numRows var i = 0 val tempIdMatrix = new Array[Int](row * num) - val tempScoreMatrix = Array.fill[Double](row * num)(Double.MinValue) + val tempScoreMatrix = Array.fill[Double](row * num)(Double.NegativeInfinity) while (i < row) { var j = 0 var sum_index = 0 var rate_index = 0 + val matrixIndex = i * num while (j < num) { if (rate._3(i, rate_index) > rateSum._3(i, sum_index)) { - tempIdMatrix(i * num + j) = rate._2(i * num + rate_index) - tempScoreMatrix(i * num + j) = rate._3(i, rate_index) + tempIdMatrix(matrixIndex + j) = rate._2(matrixIndex + rate_index) + tempScoreMatrix(matrixIndex + j) = rate._3(i, rate_index) rate_index += 1 } else if (rate._3(i, rate_index) < rateSum._3(i, sum_index)) { - tempIdMatrix(i * num + j) = rateSum._2(i * num + sum_index) - tempScoreMatrix(i * num + j) = rateSum._3(i, sum_index) + tempIdMatrix(matrixIndex + j) = rateSum._2(matrixIndex + sum_index) + tempScoreMatrix(matrixIndex + j) = rateSum._3(i, sum_index) sum_index += 1 } j += 1 From e5881f037db5b0cc66ca2e4eb74879d1bc9df9b1 Mon Sep 17 00:00:00 2001 From: Peng Meng Date: Fri, 14 Jul 2017 09:30:27 +0800 Subject: [PATCH 6/9] fix build error --- .../spark/mllib/recommendation/MatrixFactorizationModel.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 9fd35f95e1b2..050c82d37757 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -319,7 +319,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { j += num pq.clear() } - index -> (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix)) + (index, (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix))) } ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)( (rateSum, rate) => mergeFunc(rateSum, rate, num), From 4a9b35031b006f5320fbfd6ed62a00e022e87b2e Mon Sep 17 00:00:00 2001 From: Peng Meng Date: Mon, 17 Jul 2017 11:18:06 +0800 Subject: [PATCH 7/9] fix DenseMatrix Store bugs --- .../mllib/recommendation/MatrixFactorizationModel.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 050c82d37757..e22b84195c0e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -319,7 +319,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { j += num pq.clear() } - (index, (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix))) + (index, (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix, true))) } ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)( (rateSum, rate) => mergeFunc(rateSum, rate, num), @@ -366,7 +366,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { tempIdMatrix(matrixIndex + j) = rate._2(matrixIndex + rate_index) tempScoreMatrix(matrixIndex + j) = rate._3(i, rate_index) rate_index += 1 - } else if (rate._3(i, rate_index) < rateSum._3(i, sum_index)) { + } else { tempIdMatrix(matrixIndex + j) = rateSum._2(matrixIndex + sum_index) tempScoreMatrix(matrixIndex + j) = rateSum._3(i, sum_index) sum_index += 1 @@ -375,7 +375,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { } i += 1 } - (rateSum._1, tempIdMatrix, new DenseMatrix(row, num, tempScoreMatrix)) + (rateSum._1, tempIdMatrix, new DenseMatrix(row, num, tempScoreMatrix, true)) } } From a5bf101f2db29e85d31e0a7ccf7b07e9b730455b Mon Sep 17 00:00:00 2001 From: Peng Meng Date: Mon, 17 Jul 2017 17:48:25 +0800 Subject: [PATCH 8/9] change poll to sortBy --- .../recommendation/MatrixFactorizationModel.scala | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index e22b84195c0e..4f0bde2c6888 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -307,16 +307,15 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { pq += dstIds(k) -> ratings(i, k) k += 1 } - var size = pq.size - while (size > 0) { - size -= 1 - val factor = pq.poll() - dstIdMatrix(j + size) = factor._1 - scoreMatrix(j + size) = factor._2 + k = 0 + pq.toArray.sortBy(-_._2).foreach { case (id, score) => + dstIdMatrix(j + k) = id + scoreMatrix(j + k) = score + k += 1 } - i += 1 // pq.size maybe less than num, corner case j += num + i += 1 pq.clear() } (index, (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix, true))) From f36706ae46f4b0562b1d85627a526fcf642e913c Mon Sep 17 00:00:00 2001 From: Peng Meng Date: Tue, 12 Dec 2017 11:30:21 +0800 Subject: [PATCH 9/9] reset code --- .../scala/org/apache/spark/util/BoundedPriorityQueue.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala index eff0aa4453f0..1b2b1932e0c3 100644 --- a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala +++ b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala @@ -51,10 +51,6 @@ private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin this } - def poll(): A = { - underlying.poll() - } - override def +=(elem1: A, elem2: A, elems: A*): this.type = { this += elem1 += elem2 ++= elems }