From 78e060e3455ecdc95fdedb6adccc0a375188e2d5 Mon Sep 17 00:00:00 2001 From: Peng Date: Mon, 24 Apr 2017 13:01:13 +0800 Subject: [PATCH 1/4] set ALS blockify size --- .../recommendation/MatrixFactorizationModel.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 23045fa2b6863..ca9ccf448e516 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -273,9 +273,10 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { rank: Int, srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], - num: Int): RDD[(Int, Array[(Int, Double)])] = { - val srcBlocks = blockify(rank, srcFeatures) - val dstBlocks = blockify(rank, dstFeatures) + num: Int, + blockSize: Int = 4096): RDD[(Int, Array[(Int, Double)])] = { + val srcBlocks = blockify(rank, srcFeatures, blockSize) + val dstBlocks = blockify(rank, dstFeatures, blockSize) val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case ((srcIds, srcFactors), (dstIds, dstFactors)) => val m = srcIds.length @@ -297,8 +298,8 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { */ private def blockify( rank: Int, - features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = { - val blockSize = 4096 // TODO: tune the block size + features: RDD[(Int, Array[Double])], + blockSize: Int): RDD[(Array[Int], DenseMatrix)] = { val blockStorage = rank * blockSize features.mapPartitions { iter => iter.grouped(blockSize).map { grouped => From b4e392ea249d37e91995e1d604a0d463567a7624 Mon Sep 17 00:00:00 2001 From: Peng Date: Mon, 24 Apr 2017 21:42:51 +0800 Subject: [PATCH 2/4] Add blockSize parameter --- 
.../MatrixFactorizationModel.scala | 16 +++++++++++++--- project/MimaExcludes.scala | 3 +++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index ca9ccf448e516..2c2452d4acd70 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -56,7 +56,8 @@ import org.apache.spark.storage.StorageLevel class MatrixFactorizationModel @Since("0.8.0") ( @Since("0.8.0") val rank: Int, @Since("0.8.0") val userFeatures: RDD[(Int, Array[Double])], - @Since("0.8.0") val productFeatures: RDD[(Int, Array[Double])]) + @Since("0.8.0") val productFeatures: RDD[(Int, Array[Double])], + @Since("2.3.0") var blockSize: Int = 4096) extends Saveable with Serializable with Logging { require(rank > 0) @@ -216,7 +217,8 @@ class MatrixFactorizationModel @Since("0.8.0") ( */ @Since("1.4.0") def recommendProductsForUsers(num: Int): RDD[(Int, Array[Rating])] = { - MatrixFactorizationModel.recommendForAll(rank, userFeatures, productFeatures, num).map { + MatrixFactorizationModel.recommendForAll(rank, userFeatures, productFeatures, num, blockSize) + .map { case (user, top) => val ratings = top.map { case (product, rating) => Rating(user, product, rating) } (user, ratings) @@ -234,12 +236,20 @@ class MatrixFactorizationModel @Since("0.8.0") ( */ @Since("1.4.0") def recommendUsersForProducts(num: Int): RDD[(Int, Array[Rating])] = { - MatrixFactorizationModel.recommendForAll(rank, productFeatures, userFeatures, num).map { + MatrixFactorizationModel.recommendForAll(rank, productFeatures, userFeatures, num, blockSize) + .map { case (product, top) => val ratings = top.map { case (user, rating) => Rating(user, product, rating) } (product, ratings) } } + + /** Sets blockSize, 
which will be used for recommendForAll. */ + @Since("2.3.0") + def setBlockSize(blockSize: Int): this.type = { + this.blockSize = blockSize + this + } } @Since("1.3.0") diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index feae76a087dec..a44eb9acffee5 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -999,6 +999,9 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.RandomForestClassificationModel.setFeatureSubsetStrategy"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.numTrees"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.setFeatureSubsetStrategy") + ) ++ Seq( + // [SPARK-20443] set ALS blockify size + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.recommendation.MatrixFactorizationModel.this") ) } From cbd24021f0974afdded00c2667dffbfd5a05a909 Mon Sep 17 00:00:00 2001 From: Peng Date: Thu, 11 May 2017 12:53:22 +0800 Subject: [PATCH 3/4] add ML set blockSize --- .../apache/spark/ml/recommendation/ALS.scala | 25 +++++++++++++------ .../MatrixFactorizationModel.scala | 9 ++++--- .../spark/ml/recommendation/ALSSuite.scala | 4 +-- .../MatrixFactorizationModelSuite.scala | 4 +-- project/MimaExcludes.scala | 4 +++ 5 files changed, 32 insertions(+), 14 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index d626f04599670..adf52d4981e4c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -261,13 +261,16 @@ private[recommendation] trait ALSParams extends ALSModelParams with HasMaxIter w * @param rank rank of the matrix factorization model * @param userFactors a DataFrame that stores user 
factors in two columns: `id` and `features` * @param itemFactors a DataFrame that stores item factors in two columns: `id` and `features` + * @param blockSize Number of records for each block, adjust this parameter to improve + * the efficiency of cartesian product */ @Since("1.3.0") class ALSModel private[ml] ( @Since("1.4.0") override val uid: String, @Since("1.4.0") val rank: Int, @transient val userFactors: DataFrame, - @transient val itemFactors: DataFrame) + @transient val itemFactors: DataFrame, + @Since("2.2.0") var blockSize: Int = 4096) extends Model[ALSModel] with ALSModelParams with MLWritable { /** @group setParam */ @@ -282,6 +285,13 @@ class ALSModel private[ml] ( @Since("1.3.0") def setPredictionCol(value: String): this.type = set(predictionCol, value) + /** @group setParam */ + @Since("2.2.0") + def setBlockSize(blockSize: Int): this.type = { + this.blockSize = blockSize + this + } + /** @group expertSetParam */ @Since("2.2.0") def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value) @@ -340,7 +350,7 @@ class ALSModel private[ml] ( */ @Since("2.2.0") def recommendForAllUsers(numItems: Int): DataFrame = { - recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems) + recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems, blockSize) } /** @@ -351,7 +361,7 @@ class ALSModel private[ml] ( */ @Since("2.2.0") def recommendForAllItems(numUsers: Int): DataFrame = { - recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers) + recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers, blockSize) } /** @@ -374,6 +384,7 @@ class ALSModel private[ml] ( * @param srcOutputColumn name of the column for the source ID in the output DataFrame * @param dstOutputColumn name of the column for the destination ID in the output DataFrame * @param num max number of recommendations for each record + * @param blockSize number of records for each block * @return a DataFrame of 
(srcOutputColumn: Int, recommendations), where recommendations are * stored as an array of (dstOutputColumn: Int, rating: Float) Rows. */ @@ -382,11 +393,12 @@ class ALSModel private[ml] ( dstFactors: DataFrame, srcOutputColumn: String, dstOutputColumn: String, - num: Int): DataFrame = { + num: Int, + blockSize: Int = 4096): DataFrame = { import srcFactors.sparkSession.implicits._ - val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])]) - val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])]) + val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize) + val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize) val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked) .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])] .flatMap { case (srcIter, dstIter) => @@ -437,7 +449,6 @@ class ALSModel private[ml] ( /** * Blockifies factors to improve the efficiency of cross join - * TODO: SPARK-20443 - expose blockSize as a param? */ private def blockify( factors: Dataset[(Int, Array[Float])], diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 7b1fecfc4de00..a10a60b258d66 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -49,6 +49,8 @@ import org.apache.spark.util.BoundedPriorityQueue * the features computed for this user. * @param productFeatures RDD of tuples where each tuple represents the productId * and the features computed for this product. 
+ * @param blockSize number of records for each block; adjust this parameter to improve + * the efficiency of cartesian product * * @note If you create the model directly using constructor, please be aware that fast prediction * requires cached user/product features and their associated partitioners. @@ -58,7 +60,7 @@ class MatrixFactorizationModel @Since("0.8.0") ( @Since("0.8.0") val rank: Int, @Since("0.8.0") val userFeatures: RDD[(Int, Array[Double])], @Since("0.8.0") val productFeatures: RDD[(Int, Array[Double])], - @Since("2.3.0") var blockSize: Int = 4096) + @Since("2.2.0") var blockSize: Int = 4096) extends Saveable with Serializable with Logging { require(rank > 0) @@ -277,6 +279,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { * @param srcFeatures src features to receive recommendations * @param dstFeatures dst features used to make recommendations * @param num number of recommendations for each record + * @param blockSize number of records for each block * @return an RDD of (srcId: Int, recommendations), where recommendations are stored as an array * of (dstId, rating) pairs. */ @@ -284,7 +287,8 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { rank: Int, srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], - num: Int): RDD[(Int, Array[(Int, Double)])] = { + num: Int, + blockSize: Int = 4096): RDD[(Int, Array[(Int, Double)])] = { val srcBlocks = blockify(srcFeatures) val dstBlocks = blockify(dstFeatures) /** @@ -336,7 +340,6 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { /** * Blockifies features to improve the efficiency of cartesian product - * TODO: SPARK-20443 - expose blockSize as a param? 
*/ private def blockify( features: RDD[(Int, Array[Double])], diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index 9d31e792633cd..faf5153ad7bed 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -684,7 +684,7 @@ class ALSSuite Seq(2, 4, 6).foreach { k => val n = math.min(k, numItems).toInt val expectedUpToN = expected.mapValues(_.slice(0, n)) - val topItems = model.recommendForAllUsers(k) + val topItems = model.setBlockSize(2048).recommendForAllUsers(k) assert(topItems.count() == numUsers) assert(topItems.columns.contains("user")) checkRecommendations(topItems, expectedUpToN, "item") @@ -705,7 +705,7 @@ class ALSSuite Seq(2, 3, 4).foreach { k => val n = math.min(k, numUsers).toInt val expectedUpToN = expected.mapValues(_.slice(0, n)) - val topUsers = getALSModel.recommendForAllItems(k) + val topUsers = getALSModel.setBlockSize(2048).recommendForAllItems(k) assert(topUsers.count() == numItems) assert(topUsers.columns.contains("item")) checkRecommendations(topUsers, expectedUpToN, "user") diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala index 2c8ed057a516a..9bce5c45044a9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala @@ -75,7 +75,7 @@ class MatrixFactorizationModelSuite extends SparkFunSuite with MLlibTestSparkCon test("batch predict API recommendProductsForUsers") { val model = new MatrixFactorizationModel(rank, userFeatures, prodFeatures) val topK = 10 - val recommendations = model.recommendProductsForUsers(topK).collectAsMap() + val 
recommendations = model.setBlockSize(2048).recommendProductsForUsers(topK).collectAsMap() assert(recommendations(0)(0).rating ~== 17.0 relTol 1e-14) assert(recommendations(1)(0).rating ~== 39.0 relTol 1e-14) @@ -84,7 +84,7 @@ class MatrixFactorizationModelSuite extends SparkFunSuite with MLlibTestSparkCon test("batch predict API recommendUsersForProducts") { val model = new MatrixFactorizationModel(rank, userFeatures, prodFeatures) val topK = 10 - val recommendations = model.recommendUsersForProducts(topK).collectAsMap() + val recommendations = model.setBlockSize(2048).recommendUsersForProducts(topK).collectAsMap() assert(recommendations(2)(0).user == 1) assert(recommendations(2)(0).rating ~== 39.0 relTol 1e-14) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 2dff154967428..196bc5f451ba5 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -1076,6 +1076,10 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.setMinInstancesPerNode"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.util.MLWriter.context"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.util.MLReader.context") + ) ++ Seq( + // [SPARK-20443] The blockSize of MLlib ALS should be settable by the user + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.recommendation.ALSModel.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.recommendation.MatrixFactorizationModel.this") ) } From ffe79b8cd94ebda58fd6afd51d19aa4990727e2b Mon Sep 17 00:00:00 2001 From: Peng Date: Thu, 11 May 2017 14:36:15 +0800 Subject: [PATCH 4/4] typo --- .../src/main/scala/org/apache/spark/ml/recommendation/ALS.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala 
b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index adf52d4981e4c..9056e5fd678eb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -261,7 +261,7 @@ private[recommendation] trait ALSParams extends ALSModelParams with HasMaxIter w * @param rank rank of the matrix factorization model * @param userFactors a DataFrame that stores user factors in two columns: `id` and `features` * @param itemFactors a DataFrame that stores item factors in two columns: `id` and `features` - * @param blockSize Number of records for each block, adjust this parameter to improve + * @param blockSize number of records for each block, adjust this parameter to improve * the efficiency of cartesian product */ @Since("1.3.0")