From f77904d3162f3cfddd86d7d54b47eab725e67626 Mon Sep 17 00:00:00 2001 From: Ilya Matiach Date: Tue, 20 Dec 2016 13:30:24 -0500 Subject: [PATCH 1/7] [SPARK-16473][MLLIB] Fix BisectingKMeans Algorithm failing in edge case where no children exist in updateAssignments --- .../spark/mllib/clustering/BisectingKMeans.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala index 336f2fc114309..81709e88f7c3d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala @@ -339,10 +339,14 @@ private object BisectingKMeans extends Serializable { assignments.map { case (index, v) => if (divisibleIndices.contains(index)) { val children = Seq(leftChildIndex(index), rightChildIndex(index)) - val selected = children.minBy { child => - KMeans.fastSquaredDistance(newClusterCenters(child), v) + if (children.length > 0) { + val selected = children.minBy { child => + KMeans.fastSquaredDistance(newClusterCenters(child), v) + } + (selected, v) + } else { + (index, v) } - (selected, v) } else { (index, v) } From 55ab17945b299e6b376ea252b53185484cbf5a2d Mon Sep 17 00:00:00 2001 From: Ilya Matiach Date: Wed, 21 Dec 2016 11:44:19 -0500 Subject: [PATCH 2/7] Updated to validate children in newClusterCenters based on comments --- .../org/apache/spark/mllib/clustering/BisectingKMeans.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala index 81709e88f7c3d..31bd694fa12fa 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala @@ -339,8 +339,9 @@ private object BisectingKMeans extends Serializable { assignments.map { case (index, v) => if (divisibleIndices.contains(index)) { val children = Seq(leftChildIndex(index), rightChildIndex(index)) - if (children.length > 0) { - val selected = children.minBy { child => + val newClusterChildren = children.filter(newClusterCenters.contains(_)) + if (newClusterChildren.nonEmpty) { + val selected = newClusterChildren.minBy { child => KMeans.fastSquaredDistance(newClusterCenters(child), v) } (selected, v) From 75192a7f3ebb93152b91f0367cd51ff4dc8caa31 Mon Sep 17 00:00:00 2001 From: Ilya Matiach Date: Wed, 28 Dec 2016 16:33:29 -0500 Subject: [PATCH 3/7] Adding test case as requested in review. Test case generates synthetic sparse data which can generate the exception user encountered. --- .../ml/clustering/BisectingKMeansSuite.scala | 17 +++++++++++++++++ .../spark/ml/clustering/KMeansSuite.scala | 12 ++++++++++++ 2 files changed, 29 insertions(+) diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala index fc491cd6161fd..5e9d25faffb5a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala @@ -29,9 +29,12 @@ class BisectingKMeansSuite final val k = 5 @transient var dataset: Dataset[_] = _ + @transient var sparseDataset: Dataset[_] = _ + override def beforeAll(): Unit = { super.beforeAll() dataset = KMeansSuite.generateKMeansData(spark, 50, 3, k) + sparseDataset = KMeansSuite.generateSparseData(spark, 100, 1000, k, 42) } test("default parameters") { @@ -51,6 +54,20 @@ class BisectingKMeansSuite assert(copiedModel.hasSummary) } + test("SPARK-16473: Verify Bisecting K-Means does not fail in edge case where" + + "one cluster is empty after split") { + val bkm = new BisectingKMeans().setK(k).setMinDivisibleClusterSize(4).setMaxIter(4) + + assert(bkm.getK === k) + assert(bkm.getFeaturesCol === "features") + assert(bkm.getPredictionCol === "prediction") + assert(bkm.getMaxIter === 4) + assert(bkm.getMinDivisibleClusterSize === 4) + // Verify fit does not fail on very sparse data + val model = bkm.fit(sparseDataset) + assert(model.hasSummary) + } + test("setter/getter") { val bkm = new BisectingKMeans() .setK(9) diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala index c1b7242e11a8f..6915a34e6835a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.mllib.clustering.{KMeans => MLlibKMeans} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} +import scala.util.Random private[clustering] case class TestRow(features: Vector) @@ -160,6 +161,17 @@ object KMeansSuite { spark.createDataFrame(rdd) } + def generateSparseData(spark: SparkSession, rows: Int, dim: Int, k: Int, seed: Int): DataFrame = { + val sc = spark.sparkContext + val random = new Random(seed) + val nnz = random.nextInt(dim) + val rdd = sc.parallelize(1 to rows) + .map(i => Vectors.sparse(dim, random.shuffle(0 to dim - 1).slice(0, nnz).sorted.toArray, + Array.fill(nnz)(random.nextDouble()))) + .map(v => new TestRow(v)) + spark.createDataFrame(rdd) + } + /** * Mapping from all Params to valid settings which differ from the defaults. * This is useful for tests which need to exercise all Params, such as save/load. From 14c7ce3b3bdfbb71d840ad6a61f09b4b18a4d12b Mon Sep 17 00:00:00 2001 From: Ilya Matiach Date: Wed, 28 Dec 2016 18:36:57 -0500 Subject: [PATCH 4/7] Fixing scala style and test case --- .../apache/spark/mllib/clustering/BisectingKMeans.scala | 8 ++++---- .../apache/spark/ml/clustering/BisectingKMeansSuite.scala | 3 +++ .../org/apache/spark/ml/clustering/KMeansSuite.scala | 3 ++- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala index 31bd694fa12fa..ae98e24a75681 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala @@ -377,12 +377,12 @@ private object BisectingKMeans extends Serializable { internalIndex -= 1 val leftIndex = leftChildIndex(rawIndex) val rightIndex = rightChildIndex(rawIndex) - val height = math.sqrt(Seq(leftIndex, rightIndex).map { childIndex => + val indexes = Seq(leftIndex, rightIndex).filter(clusters.contains(_)) + val height = math.sqrt(indexes.map { childIndex => KMeans.fastSquaredDistance(center, clusters(childIndex).center) }.max) - val left = buildSubTree(leftIndex) - val right = buildSubTree(rightIndex) - new ClusteringTreeNode(index, size, center, cost, height, Array(left, right)) + val children = indexes.map(buildSubTree(_)).toArray + new ClusteringTreeNode(index, size, center, cost, height, children) } else { val index = leafIndex leafIndex += 1 diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala index 5e9d25faffb5a..8376e19854fea 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala @@ -66,6 +66,9 @@ class BisectingKMeansSuite // Verify fit does not fail on very sparse data val model = bkm.fit(sparseDataset) assert(model.hasSummary) + val result = model.transform(sparseDataset) + val numClusters = result.select("prediction").distinct().collect().length + assert(numClusters <= k && numClusters >= 1) } test("setter/getter") { diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala index 6915a34e6835a..b20bbb3a516db 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.ml.clustering +import scala.util.Random + import org.apache.spark.SparkFunSuite import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.ParamMap @@ -24,7 +26,6 @@ import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.mllib.clustering.{KMeans => MLlibKMeans} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} -import scala.util.Random private[clustering] case class TestRow(features: Vector) From a8cf2f0eb11090b2882e6308adc2432f4381ebd9 Mon Sep 17 00:00:00 2001 From: Ilya Matiach Date: Fri, 6 Jan 2017 17:14:46 -0500 Subject: [PATCH 5/7] Updated code based on comments in review --- .../apache/spark/ml/clustering/BisectingKMeansSuite.scala | 8 +++----- .../org/apache/spark/ml/clustering/KMeansSuite.scala | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala index 8376e19854fea..0ccce37210980 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala @@ -34,7 +34,7 @@ class BisectingKMeansSuite override def beforeAll(): Unit = { super.beforeAll() dataset = KMeansSuite.generateKMeansData(spark, 50, 3, k) - sparseDataset = KMeansSuite.generateSparseData(spark, 100, 1000, k, 42) + sparseDataset = KMeansSuite.generateSparseData(spark, 10, 1000, k, 42) } test("default parameters") { @@ -59,16 +59,14 @@ class BisectingKMeansSuite val bkm = new BisectingKMeans().setK(k).setMinDivisibleClusterSize(4).setMaxIter(4) assert(bkm.getK === k) - assert(bkm.getFeaturesCol === "features") - assert(bkm.getPredictionCol === "prediction") assert(bkm.getMaxIter === 4) assert(bkm.getMinDivisibleClusterSize === 4) // Verify fit does not fail on very sparse data val model = bkm.fit(sparseDataset) - assert(model.hasSummary) val result = model.transform(sparseDataset) val numClusters = result.select("prediction").distinct().collect().length - assert(numClusters <= k && numClusters >= 1) + // Verify we hit the edge case + assert(numClusters < k && numClusters > 1) } test("setter/getter") { diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala index b20bbb3a516db..91221f9815324 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala @@ -168,7 +168,7 @@ object KMeansSuite { val nnz = random.nextInt(dim) val rdd = sc.parallelize(1 to rows) .map(i => Vectors.sparse(dim, random.shuffle(0 to dim - 1).slice(0, nnz).sorted.toArray, - Array.fill(nnz)(random.nextDouble()))) + Array.fill(nnz)(random.nextInt(k).toDouble))) .map(v => new TestRow(v)) spark.createDataFrame(rdd) } From 138ab3478fb8b0f4f4569bb3b0e66c04d3d5cac1 Mon Sep 17 00:00:00 2001 From: Ilya Matiach Date: Thu, 12 Jan 2017 23:53:25 -0500 Subject: [PATCH 6/7] Updated based on latest comments - removed k, removed setters verification --- .../apache/spark/ml/clustering/BisectingKMeansSuite.scala | 5 +---- .../scala/org/apache/spark/ml/clustering/KMeansSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala index 0ccce37210980..a6d0cd16557ff 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala @@ -34,7 +34,7 @@ class BisectingKMeansSuite override def beforeAll(): Unit = { super.beforeAll() dataset = KMeansSuite.generateKMeansData(spark, 50, 3, k) - sparseDataset = KMeansSuite.generateSparseData(spark, 10, 1000, k, 42) + sparseDataset = KMeansSuite.generateSparseData(spark, 10, 1000, 42) } test("default parameters") { @@ -58,9 +58,6 @@ class BisectingKMeansSuite "one cluster is empty after split") { val bkm = new BisectingKMeans().setK(k).setMinDivisibleClusterSize(4).setMaxIter(4) - assert(bkm.getK === k) - assert(bkm.getMaxIter === 4) - assert(bkm.getMinDivisibleClusterSize === 4) // Verify fit does not fail on very sparse data val model = bkm.fit(sparseDataset) val result = model.transform(sparseDataset) diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala index 91221f9815324..e10127f7d108f 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala @@ -162,13 +162,13 @@ object KMeansSuite { spark.createDataFrame(rdd) } - def generateSparseData(spark: SparkSession, rows: Int, dim: Int, k: Int, seed: Int): DataFrame = { + def generateSparseData(spark: SparkSession, rows: Int, dim: Int, seed: Int): DataFrame = { val sc = spark.sparkContext val random = new Random(seed) val nnz = random.nextInt(dim) val rdd = sc.parallelize(1 to rows) .map(i => Vectors.sparse(dim, random.shuffle(0 to dim - 1).slice(0, nnz).sorted.toArray, - Array.fill(nnz)(random.nextInt(k).toDouble))) + Array.fill(nnz)(random.nextDouble()))) .map(v => new TestRow(v)) spark.createDataFrame(rdd) } From ba00ad057a1b63ffe609528d9700c0bdbd60abf6 Mon Sep 17 00:00:00 2001 From: Ilya Matiach Date: Tue, 17 Jan 2017 18:10:26 -0500 Subject: [PATCH 7/7] Added seed based on comment --- .../apache/spark/ml/clustering/BisectingKMeansSuite.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala index a6d0cd16557ff..30513c1e276ae 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala @@ -56,7 +56,11 @@ class BisectingKMeansSuite test("SPARK-16473: Verify Bisecting K-Means does not fail in edge case where" + "one cluster is empty after split") { - val bkm = new BisectingKMeans().setK(k).setMinDivisibleClusterSize(4).setMaxIter(4) + val bkm = new BisectingKMeans() + .setK(k) + .setMinDivisibleClusterSize(4) + .setMaxIter(4) + .setSeed(123) // Verify fit does not fail on very sparse data val model = bkm.fit(sparseDataset)