From b80bb1f076d06b4d6cfb70d64a66f388d64fd24b Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Mon, 13 Jun 2016 12:47:42 -0700 Subject: [PATCH 01/26] add pic framework (model, class etc) --- .../clustering/PowerIterationClustering.scala | 214 ++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala new file mode 100644 index 0000000000000..a98b56bfad16f --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.clustering + +import org.apache.spark.SparkException +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.linalg.{Vector, VectorUDT} +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared._ +import org.apache.spark.ml.util._ +import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} +import org.apache.spark.mllib.linalg.VectorImplicits._ +import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.types.{IntegerType, StructType} + +/* + * Common params for PowerIterationClustering and PowerIterationClusteringModel + */ +private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter + with HasFeaturesCol with HasPredictionCol { + + /* + * The number of clusters to create (k). Must be > 1. Default: 2. + * @group param + */ + @Since("2.0.0") + final val k = new IntParam(this, "k", "The number of clusters to create. " + + "Must be > 1.", ParamValidators.gt(1)) + + /** @group getParam */ + @Since("2.0.0") + def getK: Int = $(k) + + @Since("2.0.0") + final val initMode = new Param[String](this, "initMode", "The initialization algorithm. " + + "Supported options: 'random' and 'degree'.", + (value: String) => validateInitMode(value)) + + private[spark] def validateInitMode(initMode: String): Boolean = { + initMode match { + case "random" => true + case "degree" => true + case _ => false + } + } + + /** @group expertGetParam */ + @Since("2.0.0") + def getInitMode: String = $(initMode) + + /** + * Validates and transforms the input schema. + * @param schema input schema + * @return output schema + */ + protected def validateAndTransformSchema(schema: StructType): StructType = { + SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT) + SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType) + } +} + + +@Since("2.0.0") +@Experimental +class PowerIterationClusteringModel private[ml] ( + @Since("2.0.0") override val uid: String) + extends Model[PowerIterationClusteringModel] with PowerIterationClusteringParams with MLWritable { + @Since("2.0.0") + override def copy(extra: ParamMap): PowerIterationClusteringModel = { + val copied = new PowerIterationClusteringModel(uid) + copyValues(copied, extra).setParent(this.parent) + } + + @Since("2.0.0") + override def transform(dataset: Dataset[_]): DataFrame = { + val predictUDF = udf((vector: Vector) => predict(vector)) + dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol)))) + } + + @Since("2.0.0") + override def transformSchema(schema: StructType): StructType = { + validateAndTransformSchema(schema) + } + + private[clustering] def predict(features: Vector): Int = ??? + + @Since("2.0.0") + override def write: MLWriter = + new PowerIterationClusteringModel.PowerIterationClusteringModelWriter(this) + + private var trainingSummary: Option[PowerIterationClusteringSummary] = None + + private[clustering] def setSummary(summary: PowerIterationClusteringSummary): this.type = { + this.trainingSummary = Some(summary) + this + } + + /** + * Return true if there exists summary of model. + */ + @Since("2.0.0") + def hasSummary: Boolean = trainingSummary.nonEmpty + + /** + * Gets summary of model on training set. An exception is + * thrown if `trainingSummary == None`. + */ + @Since("2.0.0") + def summary: PowerIterationClusteringSummary = trainingSummary.getOrElse { + throw new SparkException( + s"No training summary available for the ${this.getClass.getSimpleName}") + } +} + +@Since("2.0.0") +object PowerIterationClusteringModel extends MLReadable[PowerIterationClusteringModel] { + + @Since("2.0.0") + override def read: MLReader[PowerIterationClusteringModel] = + new PowerIterationClusteringModelReader() + + @Since("2.0.0") + override def load(path: String): PowerIterationClusteringModel = ??? + + /** [[MLWriter]] instance for [[KMeansModel]] */ + private[PowerIterationClusteringModel] class PowerIterationClusteringModelWriter + (instance: PowerIterationClusteringModel) extends MLWriter { + + override protected def saveImpl(path: String): Unit = ??? + } + + private class PowerIterationClusteringModelReader + extends MLReader[PowerIterationClusteringModel] { + + override def load(path: String): PowerIterationClusteringModel = ??? + } +} + +@Since("2.0.0") +@Experimental +class PowerIterationClustering @Since("2.0.0") ( + @Since("2.0.0") override val uid: String) + extends Estimator[PowerIterationClusteringModel] with PowerIterationClusteringParams + with DefaultParamsWritable { + + setDefault( + k -> 2, + maxIter -> 20, + initMode -> "random") + + @Since("2.0.0") + override def copy(extra: ParamMap): PowerIterationClustering = defaultCopy(extra) + + @Since("2.0.0") + def this() = this(Identifiable.randomUID("PowerIterationClustering")) + + /** @group setParam */ + @Since("2.0.0") + def setFeaturesCol(value: String): this.type = set(featuresCol, value) + + /** @group setParam */ + @Since("2.0.0") + def setPredictionCol(value: String): this.type = set(predictionCol, value) + + /** @group setParam */ + @Since("2.0.0") + def setK(value: Int): this.type = set(k, value) + + /** @group expertSetParam */ + @Since("2.0.0") + def setInitMode(value: String): this.type = set(initMode, value) + + /** @group setParam */ + @Since("2.0.0") + def setMaxIter(value: Int): this.type = set(maxIter, value) + + @Since("2.0.0") + override def fit(dataset: Dataset[_]): PowerIterationClusteringModel = ??? + + @Since("2.0.0") + override def transformSchema(schema: StructType): StructType = { + validateAndTransformSchema(schema) + } +} + +@Since("2.0.0") +object PowerIterationClustering extends DefaultParamsReadable[PowerIterationClustering] { + + @Since("2.0.0") + override def load(path: String): PowerIterationClustering = super.load(path) +} + +@Since("2.0.0") +@Experimental +class PowerIterationClusteringSummary private[clustering] () extends Serializable { + +} From 75004e8e4416d807ee9a2ddfb786da568342688c Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Mon, 13 Jun 2016 16:28:09 -0700 Subject: [PATCH 02/26] change a comment --- .../apache/spark/ml/clustering/PowerIterationClustering.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index a98b56bfad16f..1b65e31f55d37 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -139,7 +139,7 @@ object PowerIterationClusteringModel extends MLReadable[PowerIterationClustering @Since("2.0.0") override def load(path: String): PowerIterationClusteringModel = ??? - /** [[MLWriter]] instance for [[KMeansModel]] */ + /** [[MLWriter]] instance for [[PowerIterationClusteringModel]] */ private[PowerIterationClusteringModel] class PowerIterationClusteringModelWriter (instance: PowerIterationClusteringModel) extends MLWriter { From e1d9a3320336ff8a54f4ea441c6587431ab9e81c Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Fri, 17 Jun 2016 10:27:55 -0700 Subject: [PATCH 03/26] add missing functions fit predict load save etc. --- .../clustering/PowerIterationClustering.scala | 106 ++++++++++++++---- .../clustering/PowerIterationClustering.scala | 2 +- 2 files changed, 85 insertions(+), 23 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index 1b65e31f55d37..428671eff1b3c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -24,11 +24,13 @@ import org.apache.spark.ml.linalg.{Vector, VectorUDT} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ -import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} -import org.apache.spark.mllib.linalg.VectorImplicits._ +import org.apache.spark.mllib.clustering.{PowerIterationClustering => MLlibPowerIterationClustering} +import org.apache.spark.mllib.clustering.{PowerIterationClusteringModel => MLlibPowerIterationClusteringModel} +import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions.{col, udf} -import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} /* * Common params for PowerIterationClustering and PowerIterationClusteringModel @@ -36,7 +38,7 @@ import org.apache.spark.sql.types.{IntegerType, StructType} private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter with HasFeaturesCol with HasPredictionCol { - /* + /** * The number of clusters to create (k). Must be > 1. Default: 2. * @group param */ @@ -66,10 +68,10 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has def getInitMode: String = $(initMode) /** - * Validates and transforms the input schema. - * @param schema input schema - * @return output schema - */ + * Validates and transforms the input schema. + * @param schema input schema + * @return output schema + */ protected def validateAndTransformSchema(schema: StructType): StructType = { SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT) SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType) @@ -80,18 +82,33 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has @Since("2.0.0") @Experimental class PowerIterationClusteringModel private[ml] ( - @Since("2.0.0") override val uid: String) + @Since("2.0.0") override val uid: String, + private val parentModel: MLlibPowerIterationClusteringModel) extends Model[PowerIterationClusteringModel] with PowerIterationClusteringParams with MLWritable { + @Since("2.0.0") override def copy(extra: ParamMap): PowerIterationClusteringModel = { - val copied = new PowerIterationClusteringModel(uid) + val copied = new PowerIterationClusteringModel(uid, parentModel) copyValues(copied, extra).setParent(this.parent) } + def assignments: RDD[Assignment] = parentModel.assignments + + /** @group setParam */ + @Since("2.0.0") + def saveK(value: Int): this.type = set(k, value) + + /** @group expertSetParam */ + @Since("2.0.0") + def saveInitMode(value: String): this.type = set(initMode, value) + + /** @group setParam */ + @Since("2.0.0") + def saveMaxIter(value: Int): this.type = set(maxIter, value) + @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { - val predictUDF = udf((vector: Vector) => predict(vector)) - dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol)))) + predict(dataset) } @Since("2.0.0") @@ -99,7 +116,22 @@ class PowerIterationClusteringModel private[ml] ( validateAndTransformSchema(schema) } - private[clustering] def predict(features: Vector): Int = ??? + private[clustering] def predict(features: Dataset[_]): DataFrame = { + val sparkSession = features.sparkSession + val powerIterationClustering = new PowerIterationClustering().setK($(k)) + .setInitMode($(initMode)) + .setMaxIter($(maxIter)) + val model = powerIterationClustering.fit(features) + model.saveK($(k)) + .saveInitMode($(initMode)) + .saveMaxIter($(maxIter)) + val rows: RDD[Row] = model.assignments.map { + case assignment: Assignment => Row(assignment.cluster) + } + val schema = new StructType(Array(StructField("cluster", IntegerType))) + val predict = sparkSession.createDataFrame(rows, schema) + features.withColumn($(predictionCol), predict.col("cluster")) + } @Since("2.0.0") override def write: MLWriter = @@ -113,15 +145,15 @@ class PowerIterationClusteringModel private[ml] ( } /** - * Return true if there exists summary of model. - */ + * Return true if there exists summary of model. + */ @Since("2.0.0") def hasSummary: Boolean = trainingSummary.nonEmpty /** - * Gets summary of model on training set. An exception is - * thrown if `trainingSummary == None`. - */ + * Gets summary of model on training set. An exception is + * thrown if `trainingSummary == None`. + */ @Since("2.0.0") def summary: PowerIterationClusteringSummary = trainingSummary.getOrElse { throw new SparkException( @@ -137,19 +169,34 @@ object PowerIterationClusteringModel extends MLReadable[PowerIterationClustering new PowerIterationClusteringModelReader() @Since("2.0.0") - override def load(path: String): PowerIterationClusteringModel = ??? + override def load(path: String): PowerIterationClusteringModel = super.load(path) /** [[MLWriter]] instance for [[PowerIterationClusteringModel]] */ private[PowerIterationClusteringModel] class PowerIterationClusteringModelWriter (instance: PowerIterationClusteringModel) extends MLWriter { - override protected def saveImpl(path: String): Unit = ??? + override protected def saveImpl(path: String): Unit = { + // Save metadata and Params + DefaultParamsWriter.saveMetadata(instance, path, sc) + MLlibPowerIterationClusteringModel.SaveLoadV1_0.save(sc, instance.parentModel, path) + } } private class PowerIterationClusteringModelReader extends MLReader[PowerIterationClusteringModel] { - override def load(path: String): PowerIterationClusteringModel = ??? + /** Checked against metadata when loading model */ + private val className = classOf[PowerIterationClusteringModel].getName + + override def load(path: String): PowerIterationClusteringModel = { + + val metadata = DefaultParamsReader.loadMetadata(path, sc, className) + val parentModel = MLlibPowerIterationClusteringModel.SaveLoadV1_0.load(sc, path) + + val model = new PowerIterationClusteringModel(metadata.uid, parentModel) + DefaultParamsReader.getAndSetParams(model, metadata) + model + } } } @@ -192,7 +239,22 @@ class PowerIterationClustering @Since("2.0.0") ( def setMaxIter(value: Int): this.type = set(maxIter, value) @Since("2.0.0") - override def fit(dataset: Dataset[_]): PowerIterationClusteringModel = ??? + override def fit(dataset: Dataset[_]): PowerIterationClusteringModel = { + val rdd: RDD[(Long, Long, Double)] = dataset.select(col($(featuresCol))).rdd.map { + case Row(point: Vector) => point.asInstanceOf[(Long, Long, Double)] + } + + val algo = new MLlibPowerIterationClustering() + .setK($(k)) + .setInitializationMode($(initMode)) + .setMaxIterations($(maxIter)) + val parentModel = algo.run(rdd) + val model = copyValues(new PowerIterationClusteringModel(uid, parentModel).setParent(this)) + model.saveK($(k)) + .saveInitMode($(initMode)) + .saveMaxIter($(maxIter)) + model + } @Since("2.0.0") override def transformSchema(schema: StructType): StructType = { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala index b2437b845f826..71047326a9709 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala @@ -60,7 +60,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode PowerIterationClusteringModel.SaveLoadV1_0.load(sc, path) } - private[clustering] + private[spark] object SaveLoadV1_0 { private val thisFormatVersion = "1.0" From f8343e08818d22bdd6735e2e58cede0581cb4fec Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Fri, 17 Jun 2016 18:12:41 -0700 Subject: [PATCH 04/26] add unit test flie --- .../PowerIterationClusteringSuite.scala | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala new file mode 100644 index 0000000000000..aadca3e044336 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.clustering + +import scala.collection.mutable +import scala.util.Random +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.graphx.{Edge, Graph} +import org.apache.spark.ml.util.DefaultReadWriteTest +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.mllib.util.TestingUtils._ +import org.apache.spark.util.Utils + +class PowerIterationClusteringSuite extends SparkFunSuite + with MLlibTestSparkContext with DefaultReadWriteTest { + + import org.apache.spark.ml.clustering.PowerIterationClustering._ + + +} \ No newline at end of file From c62a2c02bbc5d4544078e40cc7471b41e729c1fc Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Mon, 20 Jun 2016 10:35:05 -0700 Subject: [PATCH 05/26] add test cases part 1 --- .../PowerIterationClusteringSuite.scala | 50 +++++++++++++++++-- 1 file changed, 47 insertions(+), 3 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala index aadca3e044336..5e152bd46a0cb 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala @@ -17,9 +17,7 @@ package org.apache.spark.ml.clustering -import scala.collection.mutable -import scala.util.Random -import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.SparkFunSuite import org.apache.spark.graphx.{Edge, Graph} import org.apache.spark.ml.util.DefaultReadWriteTest import org.apache.spark.mllib.util.MLlibTestSparkContext @@ -31,5 +29,51 @@ class PowerIterationClusteringSuite extends SparkFunSuite import org.apache.spark.ml.clustering.PowerIterationClustering._ + /** Generates a circle of points. */ + private def genCircle(r: Double, n: Int): Array[(Double, Double)] = { + Array.tabulate(n) { i => + val theta = 2.0 * math.Pi * i / n + (r * math.cos(theta), r * math.sin(theta)) + } + } + /** Computes Gaussian similarity. */ + private def sim(x: (Double, Double), y: (Double, Double)): Double = { + val dist2 = (x._1 - y._1) * (x._1 - y._1) + (x._2 - y._2) * (x._2 - y._2) + math.exp(-dist2 / 2.0) + } + + test("default parameters") { + val pic = new PowerIterationClustering() + + assert(pic.getK === 2) + assert(pic.getMaxIter === 20) + assert(pic.getInitMode === "random") + assert(pic.getFeaturesCol === "features") + assert(pic.getPredictionCol === "prediction") + } + + test("set parameters") { + val pic = new PowerIterationClustering() + .setK(9) + .setMaxIter(33) + .setInitMode("degree") + .setFeaturesCol("test_feature") + .setPredictionCol("test_prediction") + + assert(pic.getK === 9) + assert(pic.getMaxIter === 33) + assert(pic.getInitMode === "degree") + assert(pic.getFeaturesCol === "test_feature") + assert(pic.getPredictionCol === "test_prediction") + } + + test("parameters validation") { + intercept[IllegalArgumentException] { + new PowerIterationClustering().setK(1) + } + intercept[IllegalArgumentException] { + new PowerIterationClustering().setInitMode("no_such_a_mode") + } + } } \ No newline at end of file From 1277f75ae5c39e8024300f266fda175a0f0753ce Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Mon, 20 Jun 2016 13:29:54 -0700 Subject: [PATCH 06/26] add unit test part 2: test fit, parameters etc. --- .../clustering/PowerIterationClustering.scala | 5 +- .../PowerIterationClusteringSuite.scala | 46 +++++++++++++++++-- 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index 428671eff1b3c..e818d71b66ea8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -241,7 +241,10 @@ class PowerIterationClustering @Since("2.0.0") ( @Since("2.0.0") override def fit(dataset: Dataset[_]): PowerIterationClusteringModel = { val rdd: RDD[(Long, Long, Double)] = dataset.select(col($(featuresCol))).rdd.map { - case Row(point: Vector) => point.asInstanceOf[(Long, Long, Double)] + case Row(point: Vector) => + val array = point.toArray + require(array.size == 3, "The number of elements in each row must be 3.") + (array(0).toLong, array(1).toLong, array(2)) } val algo = new MLlibPowerIterationClustering() diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala index 5e152bd46a0cb..b6e95d7358699 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala @@ -17,12 +17,12 @@ package org.apache.spark.ml.clustering +import scala.collection.mutable + import org.apache.spark.SparkFunSuite -import org.apache.spark.graphx.{Edge, Graph} +import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.util.DefaultReadWriteTest import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.mllib.util.TestingUtils._ -import org.apache.spark.util.Utils class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { @@ -76,4 +76,44 @@ class PowerIterationClusteringSuite extends SparkFunSuite new PowerIterationClustering().setInitMode("no_such_a_mode") } } + + test("power iteration clustering") { + // Generate two circles following the example in the PIC paper. + val r1 = 1.0 + val n1 = 10 + val r2 = 4.0 + val n2 = 40 + val n = n1 + n2 + val points = genCircle(r1, n1) ++ genCircle(r2, n2) + val similarities = for (i <- 1 until n; j <- 0 until i) yield { + (i.toLong, j.toLong, sim(points(i), points(j))) + } + + val sc = spark.sparkContext + val rdd = sc.parallelize(similarities) + .map{case (i: Long, j: Long, sim: Double) => Vectors.dense(Array(i, j, sim))} + .map(v => TestRow(v)) + val data = spark.createDataFrame(rdd) + + val model = new PowerIterationClustering() + .setK(2) + .setMaxIter(40) + .fit(data) + val predictions = Array.fill(2)(mutable.Set.empty[Long]) + model.assignments.collect().foreach { a => + predictions(a.cluster) += a.id + } + assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet)) + + val model2 = new PowerIterationClustering() + .setK(2) + .setMaxIter(10) + .setInitMode("degree") + .fit(data) + val predictions2 = Array.fill(2)(mutable.Set.empty[Long]) + model2.assignments.collect().foreach { a => + predictions2(a.cluster) += a.id + } + assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet)) + } } \ No newline at end of file From f50873dbbc8bddd9b84f57cdd94f84d360099c12 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Mon, 20 Jun 2016 14:22:59 -0700 Subject: [PATCH 07/26] fix a type issue --- .../spark/ml/clustering/PowerIterationClusteringSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala index b6e95d7358699..868c154b345af 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala @@ -116,4 +116,4 @@ class PowerIterationClusteringSuite extends SparkFunSuite } assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet)) } -} \ No newline at end of file +} From 88a9ae06c3353a57f820a3804cff71485e069d4c Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Tue, 21 Jun 2016 13:07:27 -0700 Subject: [PATCH 08/26] add more unit tests --- .../clustering/PowerIterationClustering.scala | 23 ++-- .../clustering/PowerIterationClustering.scala | 2 +- .../PowerIterationClusteringSuite.scala | 113 +++++++++++++----- 3 files changed, 98 insertions(+), 40 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index e818d71b66ea8..91e7d101014bd 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -17,6 +17,8 @@ package org.apache.spark.ml.clustering +import org.apache.hadoop.fs.Path + import org.apache.spark.SparkException import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.{Estimator, Model} @@ -30,9 +32,9 @@ import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions.{col, udf} -import org.apache.spark.sql.types.{IntegerType, StructField, StructType} +import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} -/* +/** * Common params for PowerIterationClustering and PowerIterationClusteringModel */ private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter @@ -78,7 +80,6 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has } } - @Since("2.0.0") @Experimental class PowerIterationClusteringModel private[ml] ( @@ -126,11 +127,11 @@ class PowerIterationClusteringModel private[ml] ( .saveInitMode($(initMode)) .saveMaxIter($(maxIter)) val rows: RDD[Row] = model.assignments.map { - case assignment: Assignment => Row(assignment.cluster) + case assignment: Assignment => Row(assignment.id, assignment.cluster) } - val schema = new StructType(Array(StructField("cluster", IntegerType))) - val predict = sparkSession.createDataFrame(rows, schema) - features.withColumn($(predictionCol), predict.col("cluster")) + val schema = new StructType(Array(StructField($(featuresCol), LongType), + StructField($(predictionCol), IntegerType))) + sparkSession.createDataFrame(rows, schema) } @Since("2.0.0") @@ -178,7 +179,8 @@ object PowerIterationClusteringModel extends MLReadable[PowerIterationClustering override protected def saveImpl(path: String): Unit = { // Save metadata and Params DefaultParamsWriter.saveMetadata(instance, path, sc) - MLlibPowerIterationClusteringModel.SaveLoadV1_0.save(sc, instance.parentModel, path) + val dataPath = new Path(path, "data").toString + instance.parentModel.save(sc, dataPath) } } @@ -189,10 +191,9 @@ object PowerIterationClusteringModel extends MLReadable[PowerIterationClustering private val className = classOf[PowerIterationClusteringModel].getName override def load(path: String): PowerIterationClusteringModel = { - val metadata = DefaultParamsReader.loadMetadata(path, sc, className) - val parentModel = MLlibPowerIterationClusteringModel.SaveLoadV1_0.load(sc, path) - + val dataPath = new Path(path, "data").toString + val parentModel = MLlibPowerIterationClusteringModel.load(sc, dataPath) val model = new PowerIterationClusteringModel(metadata.uid, parentModel) DefaultParamsReader.getAndSetParams(model, metadata) model diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala index 71047326a9709..b2437b845f826 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala @@ -60,7 +60,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode PowerIterationClusteringModel.SaveLoadV1_0.load(sc, path) } - private[spark] + private[clustering] object SaveLoadV1_0 { private val thisFormatVersion = "1.0" diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala index 868c154b345af..bb2f74fb67097 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala @@ -23,24 +23,21 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.util.DefaultReadWriteTest import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { - import org.apache.spark.ml.clustering.PowerIterationClustering._ + @transient var data: Dataset[_] = _ + final val r1 = 1.0 + final val n1 = 10 + final val r2 = 4.0 + final val n2 = 40 - /** Generates a circle of points. */ - private def genCircle(r: Double, n: Int): Array[(Double, Double)] = { - Array.tabulate(n) { i => - val theta = 2.0 * math.Pi * i / n - (r * math.cos(theta), r * math.sin(theta)) - } - } + override def beforeAll(): Unit = { + super.beforeAll() - /** Computes Gaussian similarity. */ - private def sim(x: (Double, Double), y: (Double, Double)): Double = { - val dist2 = (x._1 - y._1) * (x._1 - y._1) + (x._2 - y._2) * (x._2 - y._2) - math.exp(-dist2 / 2.0) + data = PowerIterationClusteringSuite.generatePICData(spark, r1, r2, n1, n2) } test("default parameters") { @@ -78,23 +75,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite } test("power iteration clustering") { - // Generate two circles following the example in the PIC paper. - val r1 = 1.0 - val n1 = 10 - val r2 = 4.0 - val n2 = 40 val n = n1 + n2 - val points = genCircle(r1, n1) ++ genCircle(r2, n2) - val similarities = for (i <- 1 until n; j <- 0 until i) yield { - (i.toLong, j.toLong, sim(points(i), points(j))) - } - - val sc = spark.sparkContext - val rdd = sc.parallelize(similarities) - .map{case (i: Long, j: Long, sim: Double) => Vectors.dense(Array(i, j, sim))} - .map(v => TestRow(v)) - val data = spark.createDataFrame(rdd) - val model = new PowerIterationClustering() .setK(2) .setMaxIter(40) @@ -116,4 +97,80 @@ class PowerIterationClusteringSuite extends SparkFunSuite } assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet)) } + + test("transform") { + val predictionColName = "pic_prediction" + val model = new PowerIterationClustering() + .setK(2) + .setMaxIter(10) + .setPredictionCol(predictionColName) + .fit(data) + + val transformed = model.transform(data) + val expectedColumns = Array("features", predictionColName) + expectedColumns.foreach { column => + assert(transformed.columns.contains(column)) + } + } + + test("read/write") { + def checkModelData(model: PowerIterationClusteringModel, + model2: PowerIterationClusteringModel): Unit = { + assert(model.getK === model2.getK) + val modelAssignments = + model.assignments.map(x => (x.id, x.cluster)) + val model2Assignments = + model2.assignments.map(x => (x.id, x.cluster)) + val unequalElements = modelAssignments.join(model2Assignments).filter { + case (id, (c1, c2)) => c1 != c2 }.count() + assert(unequalElements === 0L) + } + val pic = new PowerIterationClustering() + testEstimatorAndModelReadWrite(pic, data, PowerIterationClusteringSuite.allParamSettings, + checkModelData) + } +} + +object PowerIterationClusteringSuite { + + /** Generates a circle of points. */ + private def genCircle(r: Double, n: Int): Array[(Double, Double)] = { + Array.tabulate(n) { i => + val theta = 2.0 * math.Pi * i / n + (r * math.cos(theta), r * math.sin(theta)) + } + } + + /** Computes Gaussian similarity. */ + private def sim(x: (Double, Double), y: (Double, Double)): Double = { + val dist2 = (x._1 - y._1) * (x._1 - y._1) + (x._2 - y._2) * (x._2 - y._2) + math.exp(-dist2 / 2.0) + } + + def generatePICData(spark: SparkSession, r1: Double, r2: Double, + n1: Int, n2: Int): DataFrame = { + // Generate two circles following the example in the PIC paper. + val n = n1 + n2 + val points = genCircle(r1, n1) ++ genCircle(r2, n2) + val similarities = for (i <- 1 until n; j <- 0 until i) yield { + (i.toLong, j.toLong, sim(points(i), points(j))) + } + val sc = spark.sparkContext + val rdd = sc.parallelize(similarities) + .map{case (i: Long, j: Long, sim: Double) => Vectors.dense(Array(i, j, sim))} + .map(v => TestRow(v)) + spark.createDataFrame(rdd) + } + + /** + * Mapping from all Params to valid settings which differ from the defaults. + * This is useful for tests which need to exercise all Params, such as save/load. + * This excludes input columns to simplify some tests. + */ + val allParamSettings: Map[String, Any] = Map( + "predictionCol" -> "myPrediction", + "k" -> 2, + "maxIter" -> 10, + "initMode" -> "random" + ) } From 06188153613f63b624f77c1c060434010394dc22 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Tue, 21 Jun 2016 14:46:25 -0700 Subject: [PATCH 09/26] delete unused import and add comments --- .../clustering/PowerIterationClustering.scala | 51 ++++++++----------- 1 file changed, 21 insertions(+), 30 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index 91e7d101014bd..4bf85023f4241 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -19,7 +19,6 @@ package org.apache.spark.ml.clustering import org.apache.hadoop.fs.Path -import org.apache.spark.SparkException import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.linalg.{Vector, VectorUDT} @@ -31,7 +30,7 @@ import org.apache.spark.mllib.clustering.{PowerIterationClusteringModel => MLlib import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} -import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.functions.{col} import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} /** @@ -52,6 +51,10 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has @Since("2.0.0") def getK: Int = $(k) + /** + * Param for the initialization algorithm. This can be either "random" to use a random vector + * as vertex properties, or "degree" to use normalized sum similarities. Default: random. + */ @Since("2.0.0") final val initMode = new Param[String](this, "initMode", "The initialization algorithm. " + "Supported options: 'random' and 'degree'.", @@ -80,6 +83,11 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has } } +/** + * :: Experimental :: + * Model fitted by PowerIterationClustering. + * @param parentModel a model trained by spark.mllib.clustering.PowerIterationClustering. + */ @Since("2.0.0") @Experimental class PowerIterationClusteringModel private[ml] ( @@ -134,32 +142,12 @@ class PowerIterationClusteringModel private[ml] ( sparkSession.createDataFrame(rows, schema) } - @Since("2.0.0") - override def write: MLWriter = - new PowerIterationClusteringModel.PowerIterationClusteringModelWriter(this) - - private var trainingSummary: Option[PowerIterationClusteringSummary] = None - - private[clustering] def setSummary(summary: PowerIterationClusteringSummary): this.type = { - this.trainingSummary = Some(summary) - this - } - /** - * Return true if there exists summary of model. + * Returns a [[org.apache.spark.ml.util.MLWriter]] instance for this ML instance. */ @Since("2.0.0") - def hasSummary: Boolean = trainingSummary.nonEmpty - - /** - * Gets summary of model on training set. An exception is - * thrown if `trainingSummary == None`. - */ - @Since("2.0.0") - def summary: PowerIterationClusteringSummary = trainingSummary.getOrElse { - throw new SparkException( - s"No training summary available for the ${this.getClass.getSimpleName}") - } + override def write: MLWriter = + new PowerIterationClusteringModel.PowerIterationClusteringModelWriter(this) } @Since("2.0.0") @@ -201,6 +189,14 @@ object PowerIterationClusteringModel extends MLReadable[PowerIterationClustering } } +/** + * Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by + * [[http://www.icml2010.org/papers/387.pdf Lin and Cohen]]. From the abstract: PIC finds a very + * low-dimensional embedding of a dataset using truncated power iteration on a normalized pair-wise + * similarity matrix of the data. + * + * @see [[http://en.wikipedia.org/wiki/Spectral_clustering Spectral clustering (Wikipedia)]] + */ @Since("2.0.0") @Experimental class PowerIterationClustering @Since("2.0.0") ( @@ -273,8 +269,3 @@ object PowerIterationClustering extends DefaultParamsReadable[PowerIterationClus override def load(path: String): PowerIterationClustering = super.load(path) } -@Since("2.0.0") -@Experimental -class PowerIterationClusteringSummary private[clustering] () extends Serializable { - -} From 04fddbd9c7c79c7a929580cdba87ee1c5a3dfd38 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Tue, 25 Oct 2016 14:28:12 -0700 Subject: [PATCH 10/26] change version to 2.1.0 --- .../clustering/PowerIterationClustering.scala | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index 4bf85023f4241..cc3c6ce447624 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -43,19 +43,19 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has * The number of clusters to create (k). Must be > 1. Default: 2. * @group param */ - @Since("2.0.0") + @Since("2.1.0") final val k = new IntParam(this, "k", "The number of clusters to create. " + "Must be > 1.", ParamValidators.gt(1)) /** @group getParam */ - @Since("2.0.0") + @Since("2.1.0") def getK: Int = $(k) /** * Param for the initialization algorithm. This can be either "random" to use a random vector * as vertex properties, or "degree" to use normalized sum similarities. Default: random. */ - @Since("2.0.0") + @Since("2.1.0") final val initMode = new Param[String](this, "initMode", "The initialization algorithm. " + "Supported options: 'random' and 'degree'.", (value: String) => validateInitMode(value)) @@ -69,7 +69,7 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has } /** @group expertGetParam */ - @Since("2.0.0") + @Since("2.1.0") def getInitMode: String = $(initMode) /** @@ -88,14 +88,14 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has * Model fitted by PowerIterationClustering. * @param parentModel a model trained by spark.mllib.clustering.PowerIterationClustering. */ -@Since("2.0.0") +@Since("2.1.0") @Experimental class PowerIterationClusteringModel private[ml] ( - @Since("2.0.0") override val uid: String, + @Since("2.1.0") override val uid: String, private val parentModel: MLlibPowerIterationClusteringModel) extends Model[PowerIterationClusteringModel] with PowerIterationClusteringParams with MLWritable { - @Since("2.0.0") + @Since("2.1.0") override def copy(extra: ParamMap): PowerIterationClusteringModel = { val copied = new PowerIterationClusteringModel(uid, parentModel) copyValues(copied, extra).setParent(this.parent) @@ -104,23 +104,23 @@ class PowerIterationClusteringModel private[ml] ( def assignments: RDD[Assignment] = parentModel.assignments /** @group setParam */ - @Since("2.0.0") + @Since("2.1.0") def saveK(value: Int): this.type = set(k, value) /** @group expertSetParam */ - @Since("2.0.0") + @Since("2.1.0") def saveInitMode(value: String): this.type = set(initMode, value) /** @group setParam */ - @Since("2.0.0") + @Since("2.1.0") def saveMaxIter(value: Int): this.type = set(maxIter, value) - @Since("2.0.0") + @Since("2.1.0") override def transform(dataset: Dataset[_]): DataFrame = { predict(dataset) } - @Since("2.0.0") + @Since("2.1.0") override def transformSchema(schema: StructType): StructType = { validateAndTransformSchema(schema) } @@ -145,19 +145,19 @@ class PowerIterationClusteringModel private[ml] ( /** * Returns a [[org.apache.spark.ml.util.MLWriter]] instance for this ML instance. */ - @Since("2.0.0") + @Since("2.1.0") override def write: MLWriter = new PowerIterationClusteringModel.PowerIterationClusteringModelWriter(this) } -@Since("2.0.0") +@Since("2.1.0") object PowerIterationClusteringModel extends MLReadable[PowerIterationClusteringModel] { - @Since("2.0.0") + @Since("2.1.0") override def read: MLReader[PowerIterationClusteringModel] = new PowerIterationClusteringModelReader() - @Since("2.0.0") + @Since("2.1.0") override def load(path: String): PowerIterationClusteringModel = super.load(path) /** [[MLWriter]] instance for [[PowerIterationClusteringModel]] */ @@ -197,10 +197,10 @@ object PowerIterationClusteringModel extends MLReadable[PowerIterationClustering * * @see [[http://en.wikipedia.org/wiki/Spectral_clustering Spectral clustering (Wikipedia)]] */ -@Since("2.0.0") +@Since("2.1.0") @Experimental -class PowerIterationClustering @Since("2.0.0") ( - @Since("2.0.0") override val uid: String) +class PowerIterationClustering @Since("2.1.0") ( + @Since("2.1.0") override val uid: String) extends Estimator[PowerIterationClusteringModel] with PowerIterationClusteringParams with DefaultParamsWritable { @@ -209,33 +209,33 @@ class PowerIterationClustering @Since("2.0.0") ( maxIter -> 20, initMode -> "random") - @Since("2.0.0") + @Since("2.1.0") override def copy(extra: ParamMap): PowerIterationClustering = defaultCopy(extra) - @Since("2.0.0") + @Since("2.1.0") def this() = this(Identifiable.randomUID("PowerIterationClustering")) /** @group setParam */ - @Since("2.0.0") + @Since("2.1.0") def setFeaturesCol(value: String): this.type = set(featuresCol, value) /** @group setParam */ - @Since("2.0.0") + @Since("2.1.0") def setPredictionCol(value: String): this.type = set(predictionCol, value) /** @group setParam */ - @Since("2.0.0") + @Since("2.1.0") def setK(value: Int): this.type = set(k, value) /** @group expertSetParam */ - @Since("2.0.0") + @Since("2.1.0") def setInitMode(value: String): this.type = set(initMode, value) /** @group setParam */ - @Since("2.0.0") + @Since("2.1.0") def setMaxIter(value: Int): this.type = set(maxIter, value) - @Since("2.0.0") + @Since("2.1.0") override def fit(dataset: Dataset[_]): PowerIterationClusteringModel = { val rdd: RDD[(Long, Long, Double)] = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => @@ -256,16 +256,16 @@ class PowerIterationClustering @Since("2.0.0") ( model } - @Since("2.0.0") + @Since("2.1.0") override def transformSchema(schema: StructType): StructType = { validateAndTransformSchema(schema) } } -@Since("2.0.0") +@Since("2.1.0") object PowerIterationClustering extends DefaultParamsReadable[PowerIterationClustering] { - @Since("2.0.0") + @Since("2.1.0") override def load(path: String): PowerIterationClustering = super.load(path) } From b49f4c793709b998ec4e84358bfdb441325e3edd Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Thu, 3 Nov 2016 16:26:01 -0700 Subject: [PATCH 11/26] change PIC as a Transformer --- .../clustering/PowerIterationClustering.scala | 186 ++++-------------- .../PowerIterationClusteringSuite.scala | 67 ++----- 2 files changed, 61 insertions(+), 192 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index cc3c6ce447624..10f883b7aea41 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -17,16 +17,13 @@ package org.apache.spark.ml.clustering -import org.apache.hadoop.fs.Path - import org.apache.spark.annotation.{Experimental, Since} -import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.Transformer import org.apache.spark.ml.linalg.{Vector, VectorUDT} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ import org.apache.spark.mllib.clustering.{PowerIterationClustering => MLlibPowerIterationClustering} -import org.apache.spark.mllib.clustering.{PowerIterationClusteringModel => MLlibPowerIterationClusteringModel} import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} @@ -34,7 +31,7 @@ import org.apache.spark.sql.functions.{col} import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} /** - * Common params for PowerIterationClustering and PowerIterationClusteringModel + * Common params for PowerIterationClustering */ private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter with HasFeaturesCol with HasPredictionCol { @@ -43,19 +40,19 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has * The number of clusters to create (k). Must be > 1. Default: 2. * @group param */ - @Since("2.1.0") + @Since("2.2.0") final val k = new IntParam(this, "k", "The number of clusters to create. " + "Must be > 1.", ParamValidators.gt(1)) /** @group getParam */ - @Since("2.1.0") + @Since("2.2.0") def getK: Int = $(k) /** * Param for the initialization algorithm. This can be either "random" to use a random vector * as vertex properties, or "degree" to use normalized sum similarities. Default: random. */ - @Since("2.1.0") + @Since("2.2.0") final val initMode = new Param[String](this, "initMode", "The initialization algorithm. " + "Supported options: 'random' and 'degree'.", (value: String) => validateInitMode(value)) @@ -69,203 +66,102 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has } /** @group expertGetParam */ - @Since("2.1.0") + @Since("2.2.0") def getInitMode: String = $(initMode) /** - * Validates and transforms the input schema. + * Validates the input schema * @param schema input schema - * @return output schema */ - protected def validateAndTransformSchema(schema: StructType): StructType = { - SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT) - SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType) + protected def validateSchema(schema: StructType): Unit = { + SchemaUtils.checkColumnType(schema, $(featuresCol), LongType) + SchemaUtils.checkColumnType(schema, $(predictionCol), IntegerType) } } /** * :: Experimental :: - * Model fitted by PowerIterationClustering. - * @param parentModel a model trained by spark.mllib.clustering.PowerIterationClustering. - */ -@Since("2.1.0") -@Experimental -class PowerIterationClusteringModel private[ml] ( - @Since("2.1.0") override val uid: String, - private val parentModel: MLlibPowerIterationClusteringModel) - extends Model[PowerIterationClusteringModel] with PowerIterationClusteringParams with MLWritable { - - @Since("2.1.0") - override def copy(extra: ParamMap): PowerIterationClusteringModel = { - val copied = new PowerIterationClusteringModel(uid, parentModel) - copyValues(copied, extra).setParent(this.parent) - } - - def assignments: RDD[Assignment] = parentModel.assignments - - /** @group setParam */ - @Since("2.1.0") - def saveK(value: Int): this.type = set(k, value) - - /** @group expertSetParam */ - @Since("2.1.0") - def saveInitMode(value: String): this.type = set(initMode, value) - - /** @group setParam */ - @Since("2.1.0") - def saveMaxIter(value: Int): this.type = set(maxIter, value) - - @Since("2.1.0") - override def transform(dataset: Dataset[_]): DataFrame = { - predict(dataset) - } - - @Since("2.1.0") - override def transformSchema(schema: StructType): StructType = { - validateAndTransformSchema(schema) - } - - private[clustering] def predict(features: Dataset[_]): DataFrame = { - val sparkSession = features.sparkSession - val powerIterationClustering = new PowerIterationClustering().setK($(k)) - .setInitMode($(initMode)) - .setMaxIter($(maxIter)) - val model = powerIterationClustering.fit(features) - model.saveK($(k)) - .saveInitMode($(initMode)) - .saveMaxIter($(maxIter)) - val rows: RDD[Row] = model.assignments.map { - case assignment: Assignment => Row(assignment.id, assignment.cluster) - } - val schema = new StructType(Array(StructField($(featuresCol), LongType), - StructField($(predictionCol), IntegerType))) - sparkSession.createDataFrame(rows, schema) - } - - /** - * Returns a [[org.apache.spark.ml.util.MLWriter]] instance for this ML instance. - */ - @Since("2.1.0") - override def write: MLWriter = - new PowerIterationClusteringModel.PowerIterationClusteringModelWriter(this) -} - -@Since("2.1.0") -object PowerIterationClusteringModel extends MLReadable[PowerIterationClusteringModel] { - - @Since("2.1.0") - override def read: MLReader[PowerIterationClusteringModel] = - new PowerIterationClusteringModelReader() - - @Since("2.1.0") - override def load(path: String): PowerIterationClusteringModel = super.load(path) - - /** [[MLWriter]] instance for [[PowerIterationClusteringModel]] */ - private[PowerIterationClusteringModel] class PowerIterationClusteringModelWriter - (instance: PowerIterationClusteringModel) extends MLWriter { - - override protected def saveImpl(path: String): Unit = { - // Save metadata and Params - DefaultParamsWriter.saveMetadata(instance, path, sc) - val dataPath = new Path(path, "data").toString - instance.parentModel.save(sc, dataPath) - } - } - - private class PowerIterationClusteringModelReader - extends MLReader[PowerIterationClusteringModel] { - - /** Checked against metadata when loading model */ - private val className = classOf[PowerIterationClusteringModel].getName - - override def load(path: String): PowerIterationClusteringModel = { - val metadata = DefaultParamsReader.loadMetadata(path, sc, className) - val dataPath = new Path(path, "data").toString - val parentModel = MLlibPowerIterationClusteringModel.load(sc, dataPath) - val model = new PowerIterationClusteringModel(metadata.uid, parentModel) - DefaultParamsReader.getAndSetParams(model, metadata) - model - } - } -} - -/** * Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by * [[http://www.icml2010.org/papers/387.pdf Lin and Cohen]]. From the abstract: PIC finds a very * low-dimensional embedding of a dataset using truncated power iteration on a normalized pair-wise * similarity matrix of the data. * + * Note that we implement [[PowerIterationClustering]] as a transformer. The [[transform]] is an + * expensive operation, because it uses PIC algorithm to cluster the whole input dataset. + * * @see [[http://en.wikipedia.org/wiki/Spectral_clustering Spectral clustering (Wikipedia)]] */ -@Since("2.1.0") +@Since("2.2.0") @Experimental -class PowerIterationClustering @Since("2.1.0") ( - @Since("2.1.0") override val uid: String) - extends Estimator[PowerIterationClusteringModel] with PowerIterationClusteringParams - with DefaultParamsWritable { +class PowerIterationClustering private[clustering] ( + @Since("2.2.0") override val uid: String) + extends Transformer with PowerIterationClusteringParams with DefaultParamsWritable { setDefault( k -> 2, maxIter -> 20, initMode -> "random") - @Since("2.1.0") + @Since("2.2.0") override def copy(extra: ParamMap): PowerIterationClustering = defaultCopy(extra) - @Since("2.1.0") + @Since("2.2.0") def this() = this(Identifiable.randomUID("PowerIterationClustering")) /** @group setParam */ - @Since("2.1.0") + @Since("2.2.0") def setFeaturesCol(value: String): this.type = set(featuresCol, value) /** @group setParam */ - @Since("2.1.0") + @Since("2.2.0") def setPredictionCol(value: String): this.type = set(predictionCol, value) /** @group setParam */ - @Since("2.1.0") + @Since("2.2.0") def setK(value: Int): this.type = set(k, value) /** @group expertSetParam */ - @Since("2.1.0") + @Since("2.2.0") def setInitMode(value: String): this.type = set(initMode, value) /** @group setParam */ - @Since("2.1.0") + @Since("2.2.0") def setMaxIter(value: Int): this.type = set(maxIter, value) - @Since("2.1.0") - override def fit(dataset: Dataset[_]): PowerIterationClusteringModel = { + @Since("2.2.0") + override def transform(dataset: Dataset[_]): DataFrame = { + val sparkSession = dataset.sparkSession val rdd: RDD[(Long, Long, Double)] = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => val array = point.toArray require(array.size == 3, "The number of elements in each row must be 3.") (array(0).toLong, array(1).toLong, array(2)) } - - val algo = new MLlibPowerIterationClustering() + val algorithm = new MLlibPowerIterationClustering() .setK($(k)) .setInitializationMode($(initMode)) .setMaxIterations($(maxIter)) - val parentModel = algo.run(rdd) - val model = copyValues(new PowerIterationClusteringModel(uid, parentModel).setParent(this)) - model.saveK($(k)) - .saveInitMode($(initMode)) - .saveMaxIter($(maxIter)) - model + val model = algorithm.run(rdd) + val rows: RDD[Row] = model.assignments.map { + case assignment: Assignment => Row(assignment.id, assignment.cluster) + } + val schema = transformSchema(new StructType(Array(StructField($(featuresCol), LongType), + StructField($(predictionCol), IntegerType)))) + sparkSession.createDataFrame(rows, schema) } - @Since("2.1.0") + @Since("2.2.0") override def transformSchema(schema: StructType): StructType = { - validateAndTransformSchema(schema) + validateSchema(schema) + schema } + } -@Since("2.1.0") +@Since("2.2.0") object PowerIterationClustering extends DefaultParamsReadable[PowerIterationClustering] { - @Since("2.1.0") + @Since("2.2.0") override def load(path: String): PowerIterationClustering = super.load(path) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala index bb2f74fb67097..c75776da68abe 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.util.DefaultReadWriteTest import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { @@ -76,58 +76,43 @@ class PowerIterationClusteringSuite extends SparkFunSuite test("power iteration clustering") { val n = n1 + n2 - val model = new PowerIterationClustering() + + val result = new PowerIterationClustering() .setK(2) .setMaxIter(40) - .fit(data) + .transform(data) + val predictions = Array.fill(2)(mutable.Set.empty[Long]) - model.assignments.collect().foreach { a => - predictions(a.cluster) += a.id + result.select("features", "prediction").collect().foreach { + case Row(id: Long, cluster: Integer) => predictions(cluster) += id } assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet)) - val model2 = new PowerIterationClustering() + val result2 = new PowerIterationClustering() .setK(2) .setMaxIter(10) .setInitMode("degree") - .fit(data) + .transform(data) val predictions2 = Array.fill(2)(mutable.Set.empty[Long]) - model2.assignments.collect().foreach { a => - predictions2(a.cluster) += a.id + result2.select("features", "prediction").collect().foreach { + case Row(id: Long, cluster: Integer) => predictions2(cluster) += id } assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet)) - } - - test("transform") { - val predictionColName = "pic_prediction" - val model = new PowerIterationClustering() - .setK(2) - .setMaxIter(10) - .setPredictionCol(predictionColName) - .fit(data) - val transformed = model.transform(data) - val expectedColumns = Array("features", predictionColName) + val expectedColumns = Array("features", "prediction") expectedColumns.foreach { column => - assert(transformed.columns.contains(column)) + assert(result2.columns.contains(column)) } } test("read/write") { - def checkModelData(model: PowerIterationClusteringModel, - model2: PowerIterationClusteringModel): Unit = { - assert(model.getK === model2.getK) - val modelAssignments = - model.assignments.map(x => (x.id, x.cluster)) - val model2Assignments = - model2.assignments.map(x => (x.id, x.cluster)) - val unequalElements = modelAssignments.join(model2Assignments).filter { - case (id, (c1, c2)) => c1 != c2 }.count() - assert(unequalElements === 0L) - } - val pic = new PowerIterationClustering() - testEstimatorAndModelReadWrite(pic, data, PowerIterationClusteringSuite.allParamSettings, - checkModelData) + val t = new PowerIterationClustering() + .setK(4) + .setMaxIter(100) + .setInitMode("degree") + .setFeaturesCol("test_feature") + .setPredictionCol("test_prediction") + testDefaultReadWrite(t) } } @@ -161,16 +146,4 @@ object PowerIterationClusteringSuite { .map(v => TestRow(v)) spark.createDataFrame(rdd) } - - /** - * Mapping from all Params to valid settings which differ from the defaults. - * This is useful for tests which need to exercise all Params, such as save/load. - * This excludes input columns to simplify some tests. - */ - val allParamSettings: Map[String, Any] = Map( - "predictionCol" -> "myPrediction", - "k" -> 2, - "maxIter" -> 10, - "initMode" -> "random" - ) } From d3f86d02f352f786c0aaac24ad52a9c4691685b4 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Fri, 4 Nov 2016 10:28:26 -0700 Subject: [PATCH 12/26] add LabelCol --- .../ml/clustering/PowerIterationClustering.scala | 15 ++++++++++----- .../PowerIterationClusteringSuite.scala | 10 +++++++--- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index 10f883b7aea41..7ddae0813d455 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -19,7 +19,7 @@ package org.apache.spark.ml.clustering import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.Transformer -import org.apache.spark.ml.linalg.{Vector, VectorUDT} +import org.apache.spark.ml.linalg.{Vector} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ @@ -34,7 +34,7 @@ import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructTyp * Common params for PowerIterationClustering */ private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter - with HasFeaturesCol with HasPredictionCol { + with HasFeaturesCol with HasPredictionCol with HasLabelCol { /** * The number of clusters to create (k). Must be > 1. Default: 2. @@ -74,7 +74,7 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has * @param schema input schema */ protected def validateSchema(schema: StructType): Unit = { - SchemaUtils.checkColumnType(schema, $(featuresCol), LongType) + SchemaUtils.checkColumnType(schema, $(labelCol), LongType) SchemaUtils.checkColumnType(schema, $(predictionCol), IntegerType) } } @@ -100,7 +100,8 @@ class PowerIterationClustering private[clustering] ( setDefault( k -> 2, maxIter -> 20, - initMode -> "random") + initMode -> "random", + labelCol -> "id") @Since("2.2.0") override def copy(extra: ParamMap): PowerIterationClustering = defaultCopy(extra) @@ -128,6 +129,10 @@ class PowerIterationClustering private[clustering] ( @Since("2.2.0") def setMaxIter(value: Int): this.type = set(maxIter, value) + /** @group setParam */ + @Since("2.2.0") + def setLabelCol(value: String): this.type = set(labelCol, value) + @Since("2.2.0") override def transform(dataset: Dataset[_]): DataFrame = { val sparkSession = dataset.sparkSession @@ -145,7 +150,7 @@ class PowerIterationClustering private[clustering] ( val rows: RDD[Row] = model.assignments.map { case assignment: Assignment => Row(assignment.id, assignment.cluster) } - val schema = transformSchema(new StructType(Array(StructField($(featuresCol), LongType), + val schema = transformSchema(new StructType(Array(StructField($(labelCol), LongType), StructField($(predictionCol), IntegerType)))) sparkSession.createDataFrame(rows, schema) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala index c75776da68abe..f97ae4f199159 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala @@ -48,6 +48,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite assert(pic.getInitMode === "random") assert(pic.getFeaturesCol === "features") assert(pic.getPredictionCol === "prediction") + assert(pic.getLabelCol === "id") } test("set parameters") { @@ -57,12 +58,14 @@ class PowerIterationClusteringSuite extends SparkFunSuite .setInitMode("degree") .setFeaturesCol("test_feature") .setPredictionCol("test_prediction") + .setLabelCol("test_id") assert(pic.getK === 9) assert(pic.getMaxIter === 33) assert(pic.getInitMode === "degree") assert(pic.getFeaturesCol === "test_feature") assert(pic.getPredictionCol === "test_prediction") + assert(pic.getLabelCol === "test_id") } test("parameters validation") { @@ -83,7 +86,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite .transform(data) val predictions = Array.fill(2)(mutable.Set.empty[Long]) - result.select("features", "prediction").collect().foreach { + result.select("id", "prediction").collect().foreach { case Row(id: Long, cluster: Integer) => predictions(cluster) += id } assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet)) @@ -94,12 +97,12 @@ class PowerIterationClusteringSuite extends SparkFunSuite .setInitMode("degree") .transform(data) val predictions2 = Array.fill(2)(mutable.Set.empty[Long]) - result2.select("features", "prediction").collect().foreach { + result2.select("id", "prediction").collect().foreach { case Row(id: Long, cluster: Integer) => predictions2(cluster) += id } assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet)) - val expectedColumns = Array("features", "prediction") + val expectedColumns = Array("id", "prediction") expectedColumns.foreach { column => assert(result2.columns.contains(column)) } @@ -112,6 +115,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite .setInitMode("degree") .setFeaturesCol("test_feature") .setPredictionCol("test_prediction") + .setLabelCol("test_id") testDefaultReadWrite(t) } } From 655bc675a5875bbc92ea0788496d2039a89c48d2 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Fri, 4 Nov 2016 11:36:09 -0700 Subject: [PATCH 13/26] change col implementation --- .../clustering/PowerIterationClustering.scala | 20 ++++++++++++++----- .../PowerIterationClusteringSuite.scala | 8 ++++---- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index 7ddae0813d455..53513a1b1313a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructTyp * Common params for PowerIterationClustering */ private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter - with HasFeaturesCol with HasPredictionCol with HasLabelCol { + with HasFeaturesCol with HasPredictionCol { /** * The number of clusters to create (k). Must be > 1. Default: 2. @@ -69,12 +69,22 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has @Since("2.2.0") def getInitMode: String = $(initMode) + /** + * Param for the column name for ids returned by [[PowerIterationClustering.transform()]]. + * Default: "id" + * @group param + */ + val idCol = new Param[String](this, "idCol", "column name for ids.") + + /** @group getParam */ + def getIdCol: String = $(idCol) + /** * Validates the input schema * @param schema input schema */ protected def validateSchema(schema: StructType): Unit = { - SchemaUtils.checkColumnType(schema, $(labelCol), LongType) + SchemaUtils.checkColumnType(schema, $(idCol), LongType) SchemaUtils.checkColumnType(schema, $(predictionCol), IntegerType) } } @@ -101,7 +111,7 @@ class PowerIterationClustering private[clustering] ( k -> 2, maxIter -> 20, initMode -> "random", - labelCol -> "id") + idCol -> "id") @Since("2.2.0") override def copy(extra: ParamMap): PowerIterationClustering = defaultCopy(extra) @@ -131,7 +141,7 @@ class PowerIterationClustering private[clustering] ( /** @group setParam */ @Since("2.2.0") - def setLabelCol(value: String): this.type = set(labelCol, value) + def setIdCol(value: String): this.type = set(idCol, value) @Since("2.2.0") override def transform(dataset: Dataset[_]): DataFrame = { @@ -150,7 +160,7 @@ class PowerIterationClustering private[clustering] ( val rows: RDD[Row] = model.assignments.map { case assignment: Assignment => Row(assignment.id, assignment.cluster) } - val schema = transformSchema(new StructType(Array(StructField($(labelCol), LongType), + val schema = transformSchema(new StructType(Array(StructField($(idCol), LongType), StructField($(predictionCol), IntegerType)))) sparkSession.createDataFrame(rows, schema) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala index f97ae4f199159..ed5bfce2d0d0d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala @@ -48,7 +48,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite assert(pic.getInitMode === "random") assert(pic.getFeaturesCol === "features") assert(pic.getPredictionCol === "prediction") - assert(pic.getLabelCol === "id") + assert(pic.getIdCol === "id") } test("set parameters") { @@ -58,14 +58,14 @@ class PowerIterationClusteringSuite extends SparkFunSuite .setInitMode("degree") .setFeaturesCol("test_feature") .setPredictionCol("test_prediction") - .setLabelCol("test_id") + .setIdCol("test_id") assert(pic.getK === 9) assert(pic.getMaxIter === 33) assert(pic.getInitMode === "degree") assert(pic.getFeaturesCol === "test_feature") assert(pic.getPredictionCol === "test_prediction") - assert(pic.getLabelCol === "test_id") + assert(pic.getIdCol === "test_id") } test("parameters validation") { @@ -115,7 +115,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite .setInitMode("degree") .setFeaturesCol("test_feature") .setPredictionCol("test_prediction") - .setLabelCol("test_id") + .setIdCol("test_id") testDefaultReadWrite(t) } } From d5975bc7e73f430c944fd01b122c945af8b370c7 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Fri, 17 Feb 2017 14:20:00 -0800 Subject: [PATCH 14/26] address some of the comments --- .../clustering/PowerIterationClustering.scala | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index 53513a1b1313a..2913163c73e0f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -19,7 +19,7 @@ package org.apache.spark.ml.clustering import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.Transformer -import org.apache.spark.ml.linalg.{Vector} +import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ @@ -27,7 +27,7 @@ import org.apache.spark.mllib.clustering.{PowerIterationClustering => MLlibPower import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} -import org.apache.spark.sql.functions.{col} +import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} /** @@ -53,16 +53,10 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has * as vertex properties, or "degree" to use normalized sum similarities. Default: random. */ @Since("2.2.0") - final val initMode = new Param[String](this, "initMode", "The initialization algorithm. " + - "Supported options: 'random' and 'degree'.", - (value: String) => validateInitMode(value)) - - private[spark] def validateInitMode(initMode: String): Boolean = { - initMode match { - case "random" => true - case "degree" => true - case _ => false - } + final val initMode = { + val allowedParams = ParamValidators.inArray(Array("random", "degree")) + new Param[String](this, "initMode", "The initialization algorithm. " + + "Supported options: 'random' and 'degree'.", allowedParams) } /** @group expertGetParam */ @@ -104,7 +98,7 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has @Since("2.2.0") @Experimental class PowerIterationClustering private[clustering] ( - @Since("2.2.0") override val uid: String) + @Since("2.2.0") override val uid: String) extends Transformer with PowerIterationClusteringParams with DefaultParamsWritable { setDefault( From f012624a5061c4df0aaad1f232e0ca17f366b824 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Tue, 21 Feb 2017 14:37:51 -0800 Subject: [PATCH 15/26] add additional test with dataset having more data --- .../PowerIterationClusteringSuite.scala | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala index ed5bfce2d0d0d..8aa963d0f0710 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.ml.clustering import scala.collection.mutable -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.util.DefaultReadWriteTest import org.apache.spark.mllib.util.MLlibTestSparkContext @@ -29,6 +29,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { @transient var data: Dataset[_] = _ + @transient var malData: Dataset[_] = _ final val r1 = 1.0 final val n1 = 10 final val r2 = 4.0 @@ -38,6 +39,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite super.beforeAll() data = PowerIterationClusteringSuite.generatePICData(spark, r1, r2, n1, n2) + malData = PowerIterationClusteringSuite.generateMalFormatData(spark) } test("default parameters") { @@ -80,10 +82,16 @@ class PowerIterationClusteringSuite extends SparkFunSuite test("power iteration clustering") { val n = n1 + n2 - val result = new PowerIterationClustering() + val model = new PowerIterationClustering() .setK(2) .setMaxIter(40) - .transform(data) + val result = model.transform(data) + + val thrownData = intercept[SparkException] { + model.transform(malData) + } + + assert(thrownData.getMessage().contains("The number of elements in each row must be 3")) val predictions = Array.fill(2)(mutable.Set.empty[Long]) result.select("id", "prediction").collect().foreach { @@ -150,4 +158,16 @@ object PowerIterationClusteringSuite { .map(v => TestRow(v)) spark.createDataFrame(rdd) } + + def generateMalFormatData(spark: SparkSession): DataFrame = { + val data = for (i <- 1 until 2; j <- 0 until i) yield { + (i.toLong, j.toLong, 0.01, (i + j).toLong) + } + val sc = spark.sparkContext + val rdd = sc.parallelize(data) + .map{case (i: Long, j: Long, sim: Double, k: Long) => Vectors.dense(Array(i, j, sim, k))} + .map(v => TestRow(v)) + spark.createDataFrame(rdd) + } + } From bef0594e1ecf0e94ca9ee90e488fe098b83b50b2 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Tue, 14 Mar 2017 16:13:45 -0700 Subject: [PATCH 16/26] change input data format --- .../clustering/PowerIterationClustering.scala | 31 +++++++------ .../PowerIterationClusteringSuite.scala | 43 ++++++++----------- 2 files changed, 35 insertions(+), 39 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index 2913163c73e0f..ae9cf2f26bf88 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -17,9 +17,10 @@ package org.apache.spark.ml.clustering +import scala.collection.mutable + import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.Transformer -import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ @@ -27,7 +28,6 @@ import org.apache.spark.mllib.clustering.{PowerIterationClustering => MLlibPower import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} -import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} /** @@ -86,14 +86,14 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has /** * :: Experimental :: * Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by - * [[http://www.icml2010.org/papers/387.pdf Lin and Cohen]]. From the abstract: PIC finds a very - * low-dimensional embedding of a dataset using truncated power iteration on a normalized pair-wise - * similarity matrix of the data. + * Lin and Cohen. From the abstract: + * PIC finds a very low-dimensional embedding of a dataset using truncated power + * iteration on a normalized pair-wise similarity matrix of the data. * * Note that we implement [[PowerIterationClustering]] as a transformer. The [[transform]] is an * expensive operation, because it uses PIC algorithm to cluster the whole input dataset. * - * @see [[http://en.wikipedia.org/wiki/Spectral_clustering Spectral clustering (Wikipedia)]] + * @see */ @Since("2.2.0") @Experimental @@ -140,23 +140,28 @@ class PowerIterationClustering private[clustering] ( @Since("2.2.0") override def transform(dataset: Dataset[_]): DataFrame = { val sparkSession = dataset.sparkSession - val rdd: RDD[(Long, Long, Double)] = dataset.select(col($(featuresCol))).rdd.map { - case Row(point: Vector) => - val array = point.toArray - require(array.size == 3, "The number of elements in each row must be 3.") - (array(0).toLong, array(1).toLong, array(2)) - } + + val rdd: RDD[(Long, Long, Double)] = dataset.select("id", "neighbor", "weight").rdd.map { + case Row(id: Long, nbr: mutable.WrappedArray[Long], weight: mutable.WrappedArray[Double]) + => (id, nbr, weight) + }.flatMap{ case (id, nbr, weight) => + val ids = Array.fill(nbr.length)(id) + ids.zip(nbr).zip(weight)}.map {case ((i, j), k) => (i, j, k)} val algorithm = new MLlibPowerIterationClustering() .setK($(k)) .setInitializationMode($(initMode)) .setMaxIterations($(maxIter)) val model = algorithm.run(rdd) + val rows: RDD[Row] = model.assignments.map { case assignment: Assignment => Row(assignment.id, assignment.cluster) } + val schema = transformSchema(new StructType(Array(StructField($(idCol), LongType), StructField($(predictionCol), IntegerType)))) - sparkSession.createDataFrame(rows, schema) + val result = sparkSession.createDataFrame(rows, schema) + + dataset.join(result, "id") } @Since("2.2.0") diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala index 8aa963d0f0710..eb140036576e4 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala @@ -19,8 +19,7 @@ package org.apache.spark.ml.clustering import scala.collection.mutable -import org.apache.spark.{SparkException, SparkFunSuite} -import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.SparkFunSuite import org.apache.spark.ml.util.DefaultReadWriteTest import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} @@ -39,7 +38,6 @@ class PowerIterationClusteringSuite extends SparkFunSuite super.beforeAll() data = PowerIterationClusteringSuite.generatePICData(spark, r1, r2, n1, n2) - malData = PowerIterationClusteringSuite.generateMalFormatData(spark) } test("default parameters") { @@ -87,17 +85,11 @@ class PowerIterationClusteringSuite extends SparkFunSuite .setMaxIter(40) val result = model.transform(data) - val thrownData = intercept[SparkException] { - model.transform(malData) - } - - assert(thrownData.getMessage().contains("The number of elements in each row must be 3")) - val predictions = Array.fill(2)(mutable.Set.empty[Long]) result.select("id", "prediction").collect().foreach { case Row(id: Long, cluster: Integer) => predictions(cluster) += id } - assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet)) + assert(predictions.toSet == Set((1 until n1).toSet, (n1 until n).toSet)) val result2 = new PowerIterationClustering() .setK(2) @@ -108,7 +100,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite result2.select("id", "prediction").collect().foreach { case Row(id: Long, cluster: Integer) => predictions2(cluster) += id } - assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet)) + assert(predictions2.toSet == Set((1 until n1).toSet, (n1 until n).toSet)) val expectedColumns = Array("id", "prediction") expectedColumns.foreach { column => @@ -130,6 +122,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite object PowerIterationClusteringSuite { + case class TestRow2(id: Long, neighbor: Array[Long], weight: Array[Double]) /** Generates a circle of points. */ private def genCircle(r: Double, n: Int): Array[(Double, Double)] = { Array.tabulate(n) { i => @@ -149,24 +142,22 @@ object PowerIterationClusteringSuite { // Generate two circles following the example in the PIC paper. val n = n1 + n2 val points = genCircle(r1, n1) ++ genCircle(r2, n2) - val similarities = for (i <- 1 until n; j <- 0 until i) yield { - (i.toLong, j.toLong, sim(points(i), points(j))) - } - val sc = spark.sparkContext - val rdd = sc.parallelize(similarities) - .map{case (i: Long, j: Long, sim: Double) => Vectors.dense(Array(i, j, sim))} - .map(v => TestRow(v)) - spark.createDataFrame(rdd) - } - def generateMalFormatData(spark: SparkSession): DataFrame = { - val data = for (i <- 1 until 2; j <- 0 until i) yield { - (i.toLong, j.toLong, 0.01, (i + j).toLong) + val similarities = for (i <- 1 until n) yield { + val neighbor = for (j <- 0 until i) yield { + j.toLong + } + val weight = for (j <- 0 until i) yield { + sim(points(i), points(j)) + } + (i.toLong, neighbor.toArray, weight.toArray) } + val sc = spark.sparkContext - val rdd = sc.parallelize(data) - .map{case (i: Long, j: Long, sim: Double, k: Long) => Vectors.dense(Array(i, j, sim, k))} - .map(v => TestRow(v)) + + val rdd = sc.parallelize(similarities).map{ + case (id: Long, nbr: Array[Long], weight: Array[Double]) => + TestRow2(id, nbr, weight)} spark.createDataFrame(rdd) } From a4bee8908f61226f891c2df2f8fe912a72d46b42 Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Wed, 15 Mar 2017 15:49:45 -0700 Subject: [PATCH 17/26] resolve warnings --- .../spark/ml/clustering/PowerIterationClustering.scala | 9 ++++----- .../ml/clustering/PowerIterationClusteringSuite.scala | 5 +++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index ae9cf2f26bf88..dcebf73b09c73 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -17,9 +17,8 @@ package org.apache.spark.ml.clustering -import scala.collection.mutable - import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.Transformer import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ @@ -142,11 +141,11 @@ class PowerIterationClustering private[clustering] ( val sparkSession = dataset.sparkSession val rdd: RDD[(Long, Long, Double)] = dataset.select("id", "neighbor", "weight").rdd.map { - case Row(id: Long, nbr: mutable.WrappedArray[Long], weight: mutable.WrappedArray[Double]) + case Row(id: Long, nbr: Vector, weight: Vector) => (id, nbr, weight) }.flatMap{ case (id, nbr, weight) => - val ids = Array.fill(nbr.length)(id) - ids.zip(nbr).zip(weight)}.map {case ((i, j), k) => (i, j, k)} + val ids = Array.fill(nbr.size)(id) + ids.zip(nbr.toArray).zip(weight.toArray)}.map {case ((i, j), k) => (i, j.toLong, k)} val algorithm = new MLlibPowerIterationClustering() .setK($(k)) .setInitializationMode($(initMode)) diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala index eb140036576e4..3cc0cffdd700d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.ml.clustering import scala.collection.mutable import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.util.DefaultReadWriteTest import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} @@ -122,7 +123,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite object PowerIterationClusteringSuite { - case class TestRow2(id: Long, neighbor: Array[Long], weight: Array[Double]) + case class TestRow2(id: Long, neighbor: Vector, weight: Vector) /** Generates a circle of points. */ private def genCircle(r: Double, n: Int): Array[(Double, Double)] = { Array.tabulate(n) { i => @@ -157,7 +158,7 @@ object PowerIterationClusteringSuite { val rdd = sc.parallelize(similarities).map{ case (id: Long, nbr: Array[Long], weight: Array[Double]) => - TestRow2(id, nbr, weight)} + TestRow2(id, Vectors.dense(nbr.map(i => i.toDouble)), Vectors.dense(weight))} spark.createDataFrame(rdd) } From 0f979073f8f7a588449fbbc10a700decc925f8df Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" Date: Thu, 16 Mar 2017 11:33:47 -0700 Subject: [PATCH 18/26] add neighbor and weight cols --- .../clustering/PowerIterationClustering.scala | 49 ++++++++++++++++--- .../PowerIterationClusteringSuite.scala | 6 +++ 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index dcebf73b09c73..b51cfcb5f1378 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -27,13 +27,14 @@ import org.apache.spark.mllib.clustering.{PowerIterationClustering => MLlibPower import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} /** * Common params for PowerIterationClustering */ private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter - with HasFeaturesCol with HasPredictionCol { + with HasFeaturesCol with HasPredictionCol with HasWeightCol { /** * The number of clusters to create (k). Must be > 1. Default: 2. @@ -67,11 +68,21 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has * Default: "id" * @group param */ - val idCol = new Param[String](this, "idCol", "column name for ids.") + val idCol = new Param[String](this, "id", "column name for ids.") /** @group getParam */ def getIdCol: String = $(idCol) + /** + * Param for the column name for neighbors required by [[PowerIterationClustering.transform()]]. + * Default: "neighbor" + * @group param + */ + val neighborCol = new Param[String](this, "neighbor", "column name for neighbors.") + + /** @group getParam */ + def getNeighborCol: String = $(neighborCol) + /** * Validates the input schema * @param schema input schema @@ -104,7 +115,9 @@ class PowerIterationClustering private[clustering] ( k -> 2, maxIter -> 20, initMode -> "random", - idCol -> "id") + idCol -> "id", + weightCol -> "weight", + neighborCol -> "neighbor") @Since("2.2.0") override def copy(extra: ParamMap): PowerIterationClustering = defaultCopy(extra) @@ -136,16 +149,36 @@ class PowerIterationClustering private[clustering] ( @Since("2.2.0") def setIdCol(value: String): this.type = set(idCol, value) + /** + * Sets the value of param [[weightCol]]. + * Default is "weight" + * + * @group setParam + */ + @Since("2.2.0") + def setWeightCol(value: String): this.type = set(weightCol, value) + + /** + * Sets the value of param [[neighborCol]]. + * Default is "neighbor" + * + * @group setParam + */ + @Since("2.2.0") + def setNeighborCol(value: String): this.type = set(neighborCol, value) + @Since("2.2.0") override def transform(dataset: Dataset[_]): DataFrame = { val sparkSession = dataset.sparkSession - val rdd: RDD[(Long, Long, Double)] = dataset.select("id", "neighbor", "weight").rdd.map { - case Row(id: Long, nbr: Vector, weight: Vector) - => (id, nbr, weight) + val rdd: RDD[(Long, Long, Double)] = + dataset.select(col($(idCol)), col($(neighborCol)), col($(weightCol))).rdd.map { + case Row(id: Long, nbr: Vector, weight: Vector) => (id, nbr, weight) }.flatMap{ case (id, nbr, weight) => - val ids = Array.fill(nbr.size)(id) - ids.zip(nbr.toArray).zip(weight.toArray)}.map {case ((i, j), k) => (i, j.toLong, k)} + require(nbr.size == weight.size, + "The length of neighbor list must be equal to the the length of the weight list.") + val ids = Array.fill(nbr.size)(id) + ids.zip(nbr.toArray).zip(weight.toArray)}.map {case ((i, j), k) => (i, j.toLong, k)} val algorithm = new MLlibPowerIterationClustering() .setK($(k)) .setInitializationMode($(initMode)) diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala index 3cc0cffdd700d..1c7f01b0dfdff 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala @@ -50,6 +50,8 @@ class PowerIterationClusteringSuite extends SparkFunSuite assert(pic.getFeaturesCol === "features") assert(pic.getPredictionCol === "prediction") assert(pic.getIdCol === "id") + assert(pic.getWeightCol === "weight") + assert(pic.getNeighborCol === "neighbor") } test("set parameters") { @@ -60,6 +62,8 @@ class PowerIterationClusteringSuite extends SparkFunSuite .setFeaturesCol("test_feature") .setPredictionCol("test_prediction") .setIdCol("test_id") + .setWeightCol("test_weight") + .setNeighborCol("test_neighbor") assert(pic.getK === 9) assert(pic.getMaxIter === 33) @@ -67,6 +71,8 @@ class PowerIterationClusteringSuite extends SparkFunSuite assert(pic.getFeaturesCol === "test_feature") assert(pic.getPredictionCol === "test_prediction") assert(pic.getIdCol === "test_id") + assert(pic.getWeightCol === "test_weight") + assert(pic.getNeighborCol === "test_neighbor") } test("parameters validation") { From 015383a4cedc7af14386e256e3e295da92594aae Mon Sep 17 00:00:00 2001 From: wangmiao1981 Date: Tue, 15 Aug 2017 14:13:55 -0700 Subject: [PATCH 19/26] address review comments 1 --- .../clustering/PowerIterationClustering.scala | 51 +++++++++++-------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index b51cfcb5f1378..53ab5cf80d5d4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -40,19 +40,19 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has * The number of clusters to create (k). Must be > 1. Default: 2. * @group param */ - @Since("2.2.0") + @Since("2.3.0") final val k = new IntParam(this, "k", "The number of clusters to create. " + "Must be > 1.", ParamValidators.gt(1)) /** @group getParam */ - @Since("2.2.0") + @Since("2.3.0") def getK: Int = $(k) /** * Param for the initialization algorithm. This can be either "random" to use a random vector * as vertex properties, or "degree" to use normalized sum similarities. Default: random. */ - @Since("2.2.0") + @Since("2.3.0") final val initMode = { val allowedParams = ParamValidators.inArray(Array("random", "degree")) new Param[String](this, "initMode", "The initialization algorithm. " + @@ -60,7 +60,7 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has } /** @group expertGetParam */ - @Since("2.2.0") + @Since("2.3.0") def getInitMode: String = $(initMode) /** @@ -105,10 +105,10 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has * * @see */ -@Since("2.2.0") +@Since("2.3.0") @Experimental class PowerIterationClustering private[clustering] ( - @Since("2.2.0") override val uid: String) + @Since("2.3.0") override val uid: String) extends Transformer with PowerIterationClusteringParams with DefaultParamsWritable { setDefault( @@ -119,34 +119,34 @@ class PowerIterationClustering private[clustering] ( weightCol -> "weight", neighborCol -> "neighbor") - @Since("2.2.0") + @Since("2.3.0") override def copy(extra: ParamMap): PowerIterationClustering = defaultCopy(extra) - @Since("2.2.0") + @Since("2.3.0") def this() = this(Identifiable.randomUID("PowerIterationClustering")) /** @group setParam */ - @Since("2.2.0") + @Since("2.3.0") def setFeaturesCol(value: String): this.type = set(featuresCol, value) /** @group setParam */ - @Since("2.2.0") + @Since("2.3.0") def setPredictionCol(value: String): this.type = set(predictionCol, value) /** @group setParam */ - @Since("2.2.0") + @Since("2.3.0") def setK(value: Int): this.type = set(k, value) /** @group expertSetParam */ - @Since("2.2.0") + @Since("2.3.0") def setInitMode(value: String): this.type = set(initMode, value) /** @group setParam */ - @Since("2.2.0") + @Since("2.3.0") def setMaxIter(value: Int): this.type = set(maxIter, value) /** @group setParam */ - @Since("2.2.0") + @Since("2.3.0") def setIdCol(value: String): this.type = set(idCol, value) /** @@ -155,7 +155,7 @@ class PowerIterationClustering private[clustering] ( * * @group setParam */ - @Since("2.2.0") + @Since("2.3.0") def setWeightCol(value: String): this.type = set(weightCol, value) /** @@ -164,13 +164,13 @@ class PowerIterationClustering private[clustering] ( * * @group setParam */ - @Since("2.2.0") + @Since("2.3.0") def setNeighborCol(value: String): this.type = set(neighborCol, value) - @Since("2.2.0") + @Since("2.3.0") override def transform(dataset: Dataset[_]): DataFrame = { val sparkSession = dataset.sparkSession - +/* val rdd: RDD[(Long, Long, Double)] = dataset.select(col($(idCol)), col($(neighborCol)), col($(weightCol))).rdd.map { case Row(id: Long, nbr: Vector, weight: Vector) => (id, nbr, weight) @@ -179,6 +179,15 @@ class PowerIterationClustering private[clustering] ( "The length of neighbor list must be equal to the the length of the weight list.") val ids = Array.fill(nbr.size)(id) ids.zip(nbr.toArray).zip(weight.toArray)}.map {case ((i, j), k) => (i, j.toLong, k)} +*/ + val rdd: RDD[(Long, Long, Double)] = + dataset.select(col($(idCol)), col($(neighborCol)), col($(weightCol))).rdd.flatMap { + case Row(id: Long, nbr: Vector, weight: Vector) => + require(nbr.size == weight.size, + "The length of neighbor list must be equal to the the length of the weight list.") + val ids = Array.fill(nbr.size)(id) + for (i <- 0 until ids.size) yield (ids(i), nbr(i).toLong, weight(i))} + val algorithm = new MLlibPowerIterationClustering() .setK($(k)) .setInitializationMode($(initMode)) @@ -196,7 +205,7 @@ class PowerIterationClustering private[clustering] ( dataset.join(result, "id") } - @Since("2.2.0") + @Since("2.3.0") override def transformSchema(schema: StructType): StructType = { validateSchema(schema) schema @@ -204,10 +213,10 @@ class PowerIterationClustering private[clustering] ( } -@Since("2.2.0") +@Since("2.3.0") object PowerIterationClustering extends DefaultParamsReadable[PowerIterationClustering] { - @Since("2.2.0") + @Since("2.3.0") override def load(path: String): PowerIterationClustering = super.load(path) } From 2d2957027428200ba8c5072caa51b4abc614be2e Mon Sep 17 00:00:00 2001 From: wangmiao1981 Date: Tue, 15 Aug 2017 14:23:39 -0700 Subject: [PATCH 20/26] fix style --- .../apache/spark/ml/clustering/PowerIterationClustering.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index 53ab5cf80d5d4..66a776429e5d8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -18,8 +18,8 @@ package org.apache.spark.ml.clustering import org.apache.spark.annotation.{Experimental, Since} -import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.Transformer +import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ From af549e8dca3cd6488c3217e2eb9fe61025b11cd7 Mon Sep 17 00:00:00 2001 From: wangmiao1981 Date: Tue, 15 Aug 2017 15:30:28 -0700 Subject: [PATCH 21/26] remove unused comments --- .../ml/clustering/PowerIterationClustering.scala | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index 66a776429e5d8..93f8b4dcd5546 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -170,16 +170,6 @@ class PowerIterationClustering private[clustering] ( @Since("2.3.0") override def transform(dataset: Dataset[_]): DataFrame = { val sparkSession = dataset.sparkSession -/* - val rdd: RDD[(Long, Long, Double)] = - dataset.select(col($(idCol)), col($(neighborCol)), col($(weightCol))).rdd.map { - case Row(id: Long, nbr: Vector, weight: Vector) => (id, nbr, weight) - }.flatMap{ case (id, nbr, weight) => - require(nbr.size == weight.size, - "The length of neighbor list must be equal to the the length of the weight list.") - val ids = Array.fill(nbr.size)(id) - ids.zip(nbr.toArray).zip(weight.toArray)}.map {case ((i, j), k) => (i, j.toLong, k)} -*/ val rdd: RDD[(Long, Long, Double)] = dataset.select(col($(idCol)), col($(neighborCol)), col($(weightCol))).rdd.flatMap { case Row(id: Long, nbr: Vector, weight: Vector) => @@ -187,7 +177,6 @@ class PowerIterationClustering private[clustering] ( "The length of neighbor list must be equal to the the length of the weight list.") val ids = Array.fill(nbr.size)(id) for (i <- 0 until ids.size) yield (ids(i), nbr(i).toLong, weight(i))} - val algorithm = new MLlibPowerIterationClustering() .setK($(k)) .setInitializationMode($(initMode)) From 9b4f3d50be6796d7f24fefae5690a5776c948ff7 Mon Sep 17 00:00:00 2001 From: wangmiao1981 Date: Tue, 15 Aug 2017 16:43:14 -0700 Subject: [PATCH 22/26] add Since --- .../apache/spark/ml/clustering/PowerIterationClustering.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index 93f8b4dcd5546..bdc66b16832ef 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -68,9 +68,11 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has * Default: "id" * @group param */ + @Since("2.3.0") val idCol = new Param[String](this, "id", "column name for ids.") /** @group getParam */ + @Since("2.3.0") def getIdCol: String = $(idCol) /** @@ -78,9 +80,11 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has * Default: "neighbor" * @group param */ + @Since("2.3.0") val neighborCol = new Param[String](this, "neighbor", "column name for neighbors.") /** @group getParam */ + @Since("2.3.0") def getNeighborCol: String = $(neighborCol) /** From e35fe546d334765f0a90c3393de0880a2c27981a Mon Sep 17 00:00:00 2001 From: wangmiao1981 Date: Wed, 16 Aug 2017 17:12:12 -0700 Subject: [PATCH 23/26] fix missing > --- .../apache/spark/ml/clustering/PowerIterationClustering.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index bdc66b16832ef..e63a3294575f1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -107,7 +107,8 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has * Note that we implement [[PowerIterationClustering]] as a transformer. The [[transform]] is an * expensive operation, because it uses PIC algorithm to cluster the whole input dataset. * - * @see + * @see + * Spectral clustering (Wikipedia) */ @Since("2.3.0") @Experimental From 73485d84671db335d90f7cffa08f983bccb97901 Mon Sep 17 00:00:00 2001 From: wangmiao1981 Date: Thu, 17 Aug 2017 10:26:40 -0700 Subject: [PATCH 24/26] fix doc --- .../spark/ml/clustering/PowerIterationClustering.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index e63a3294575f1..7387cc61c073e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -37,7 +37,7 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has with HasFeaturesCol with HasPredictionCol with HasWeightCol { /** - * The number of clusters to create (k). Must be > 1. Default: 2. + * The number of clusters to create (k). Must be > 1. Default: 2. * @group param */ @Since("2.3.0") @@ -64,7 +64,7 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has def getInitMode: String = $(initMode) /** - * Param for the column name for ids returned by [[PowerIterationClustering.transform()]]. + * Param for the column name for ids returned by PowerIterationClustering.transform(). * Default: "id" * @group param */ @@ -76,7 +76,7 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has def getIdCol: String = $(idCol) /** - * Param for the column name for neighbors required by [[PowerIterationClustering.transform()]]. + * Param for the column name for neighbors required by PowerIterationClustering.transform(). * Default: "neighbor" * @group param */ From 752b685892c1dbdf69811504985640e59756f679 Mon Sep 17 00:00:00 2001 From: wangmiao1981 Date: Wed, 25 Oct 2017 16:16:55 -0700 Subject: [PATCH 25/26] address review comments --- .../apache/spark/ml/clustering/PowerIterationClustering.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index 7387cc61c073e..6a2af105dff07 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -180,8 +180,8 @@ class PowerIterationClustering private[clustering] ( case Row(id: Long, nbr: Vector, weight: Vector) => require(nbr.size == weight.size, "The length of neighbor list must be equal to the the length of the weight list.") - val ids = Array.fill(nbr.size)(id) - for (i <- 0 until ids.size) yield (ids(i), nbr(i).toLong, weight(i))} + nbr.toArray.toIterator.zip(weight.toArray.toIterator) + .map(x => (id, x._1.toLong, x._2.toLong))} val algorithm = new MLlibPowerIterationClustering() .setK($(k)) .setInitializationMode($(initMode)) From cfa18af7ed27eccebc7af97be8d7e1f4227a5ffa Mon Sep 17 00:00:00 2001 From: wangmiao1981 Date: Mon, 30 Oct 2017 14:44:24 -0700 Subject: [PATCH 26/26] fix unit test --- .../apache/spark/ml/clustering/PowerIterationClustering.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala index 6a2af105dff07..a255ac4adc305 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala @@ -181,7 +181,7 @@ class PowerIterationClustering private[clustering] ( require(nbr.size == weight.size, "The length of neighbor list must be equal to the the length of the weight list.") nbr.toArray.toIterator.zip(weight.toArray.toIterator) - .map(x => (id, x._1.toLong, x._2.toLong))} + .map(x => (id, x._1.toLong, x._2))} val algorithm = new MLlibPowerIterationClustering() .setK($(k)) .setInitializationMode($(initMode))