From 88618eacec283619b8ff16f8687cb506af2eef34 Mon Sep 17 00:00:00 2001 From: Antonio Murgia Date: Thu, 26 Nov 2015 03:23:06 +0100 Subject: [PATCH 1/7] [SPARK-11932] Implement word2vec model partitioning --- .../apache/spark/mllib/feature/Word2Vec.scala | 41 ++++++++++--------- .../spark/mllib/feature/Word2VecSuite.scala | 21 ++++++++++ 2 files changed, 43 insertions(+), 19 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index a47f27b0afb14..6f44c97fe56d9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -41,15 +41,15 @@ import org.apache.spark.util.random.XORShiftRandom import org.apache.spark.sql.SQLContext /** - * Entry in vocabulary + * Entry in vocabulary */ private case class VocabWord( - var word: String, - var cn: Int, - var point: Array[Int], - var code: Array[Int], - var codeLen: Int -) + var word: String, + var cn: Int, + var point: Array[Int], + var code: Array[Int], + var codeLen: Int + ) /** * Word2Vec creates vector representation of words in a text corpus. @@ -392,8 +392,8 @@ class Word2Vec extends Serializable with Logging { }.flatten } val synAgg = partial.reduceByKey { case (v1, v2) => - blas.saxpy(vectorSize, 1.0f, v2, 1, v1, 1) - v1 + blas.saxpy(vectorSize, 1.0f, v2, 1, v1, 1) + v1 }.collect() var i = 0 while (i < synAgg.length) { @@ -432,9 +432,9 @@ class Word2Vec extends Serializable with Logging { * (i * vectorSize, i * vectorSize + vectorSize) */ @Since("1.1.0") -class Word2VecModel private[spark] ( - private[spark] val wordIndex: Map[String, Int], - private[spark] val wordVectors: Array[Float]) extends Serializable with Saveable { +class Word2VecModel private[spark]( + private[spark] val wordIndex: Map[String, Int], + private[spark] val wordVectors: Array[Float]) extends Serializable with Saveable { private val numWords = wordIndex.size // vectorSize: Dimension of each word's vector. @@ -535,7 +535,7 @@ class Word2VecModel private[spark] ( } wordList.zip(cosVec) .toSeq - .sortBy(- _._2) + .sortBy(-_._2) .take(num + 1) .tail .toArray @@ -600,12 +600,15 @@ object Word2VecModel extends Loader[Word2VecModel] { val vectorSize = model.values.head.size val numWords = model.size val metadata = compact(render - (("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~ - ("vectorSize" -> vectorSize) ~ ("numWords" -> numWords))) + (("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~ + ("vectorSize" -> vectorSize) ~ ("numWords" -> numWords))) sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + val partitionSize = 33554432l //32MB + val approxSize = 4l * numWords * vectorSize //4 Byte is sizeOf(float) + val nPartitions = ((approxSize / partitionSize) + 1).toInt val dataArray = model.toSeq.map { case (w, v) => Data(w, v) } - sc.parallelize(dataArray.toSeq, 1).toDF().write.parquet(Loader.dataPath(path)) + sc.parallelize(dataArray.toSeq, nPartitions).toDF().write.parquet(Loader.dataPath(path)) } } @@ -624,14 +627,14 @@ object Word2VecModel extends Loader[Word2VecModel] { val numWords = model.getVectors.size require(expectedVectorSize == vectorSize, s"Word2VecModel requires each word to be mapped to a vector of size " + - s"$expectedVectorSize, got vector of size $vectorSize") + s"$expectedVectorSize, got vector of size $vectorSize") require(expectedNumWords == numWords, s"Word2VecModel requires $expectedNumWords words, but got $numWords") model case _ => throw new Exception( s"Word2VecModel.load did not recognize model with (className, format version):" + - s"($loadedClassName, $loadedVersion). Supported:\n" + - s" ($classNameV1_0, 1.0)") + s"($loadedClassName, $loadedVersion). Supported:\n" + + s" ($classNameV1_0, 1.0)") } } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala index a864eec460f2b..2d492dc66754b 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala @@ -92,4 +92,25 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext { } } + + test("big model load / save") { + val word2VecMap = Map( + (0 to 9000) + .map(i => s"$i" -> Array.fill(1000)(0.1f)):_* + ) + val model = new Word2VecModel(word2VecMap) + + val tempDir = Utils.createTempDir() + val path = tempDir.toURI.toString + + try { + model.save(sc, path) + val sameModel = Word2VecModel.load(sc, path) + assert(sameModel.getVectors.mapValues(_.toSeq) === model.getVectors.mapValues(_.toSeq)) + } finally { + Utils.deleteRecursively(tempDir) + } + } + + } From 2f7052303a9b74bc720f6c9c936b17d7621917d1 Mon Sep 17 00:00:00 2001 From: Antonio Murgia Date: Thu, 26 Nov 2015 03:23:06 +0100 Subject: [PATCH 2/7] [SPARK-11932] Implement word2vec model partitioning --- .../apache/spark/mllib/feature/Word2Vec.scala | 9 +++++--- .../spark/mllib/feature/Word2VecSuite.scala | 21 +++++++++++++++++++ 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index a47f27b0afb14..71ab82b1d1ee8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -600,12 +600,15 @@ object Word2VecModel extends Loader[Word2VecModel] { val vectorSize = model.values.head.size val numWords = model.size val metadata = compact(render - (("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~ - ("vectorSize" -> vectorSize) ~ ("numWords" -> numWords))) + (("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~ + ("vectorSize" -> vectorSize) ~ ("numWords" -> numWords))) sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + val partitionSize = 33554432l //32MB + val approxSize = 4l * numWords * vectorSize //4 Byte is sizeOf(float) + val nPartitions = ((approxSize / partitionSize) + 1).toInt val dataArray = model.toSeq.map { case (w, v) => Data(w, v) } - sc.parallelize(dataArray.toSeq, 1).toDF().write.parquet(Loader.dataPath(path)) + sc.parallelize(dataArray.toSeq, nPartitions).toDF().write.parquet(Loader.dataPath(path)) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala index a864eec460f2b..2d492dc66754b 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala @@ -92,4 +92,25 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext { } } + + test("big model load / save") { + val word2VecMap = Map( + (0 to 9000) + .map(i => s"$i" -> Array.fill(1000)(0.1f)):_* + ) + val model = new Word2VecModel(word2VecMap) + + val tempDir = Utils.createTempDir() + val path = tempDir.toURI.toString + + try { + model.save(sc, path) + val sameModel = Word2VecModel.load(sc, path) + assert(sameModel.getVectors.mapValues(_.toSeq) === model.getVectors.mapValues(_.toSeq)) + } finally { + Utils.deleteRecursively(tempDir) + } + } + + } From 38e6750e957ce0c136efbb3356266ca382af6782 Mon Sep 17 00:00:00 2001 From: Antonio Murgia Date: Thu, 26 Nov 2015 03:27:19 +0100 Subject: [PATCH 3/7] [SPARK-11932] Revert format/spaces unwanted changes --- .../apache/spark/mllib/feature/Word2Vec.scala | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index 6f44c97fe56d9..71ab82b1d1ee8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -41,15 +41,15 @@ import org.apache.spark.util.random.XORShiftRandom import org.apache.spark.sql.SQLContext /** - * Entry in vocabulary + * Entry in vocabulary */ private case class VocabWord( - var word: String, - var cn: Int, - var point: Array[Int], - var code: Array[Int], - var codeLen: Int - ) + var word: String, + var cn: Int, + var point: Array[Int], + var code: Array[Int], + var codeLen: Int +) /** * Word2Vec creates vector representation of words in a text corpus. @@ -392,8 +392,8 @@ class Word2Vec extends Serializable with Logging { }.flatten } val synAgg = partial.reduceByKey { case (v1, v2) => - blas.saxpy(vectorSize, 1.0f, v2, 1, v1, 1) - v1 + blas.saxpy(vectorSize, 1.0f, v2, 1, v1, 1) + v1 }.collect() var i = 0 while (i < synAgg.length) { @@ -432,9 +432,9 @@ class Word2Vec extends Serializable with Logging { * (i * vectorSize, i * vectorSize + vectorSize) */ @Since("1.1.0") -class Word2VecModel private[spark]( - private[spark] val wordIndex: Map[String, Int], - private[spark] val wordVectors: Array[Float]) extends Serializable with Saveable { +class Word2VecModel private[spark] ( + private[spark] val wordIndex: Map[String, Int], + private[spark] val wordVectors: Array[Float]) extends Serializable with Saveable { private val numWords = wordIndex.size // vectorSize: Dimension of each word's vector. @@ -535,7 +535,7 @@ class Word2VecModel private[spark]( } wordList.zip(cosVec) .toSeq - .sortBy(-_._2) + .sortBy(- _._2) .take(num + 1) .tail .toArray @@ -627,14 +627,14 @@ object Word2VecModel extends Loader[Word2VecModel] { val numWords = model.getVectors.size require(expectedVectorSize == vectorSize, s"Word2VecModel requires each word to be mapped to a vector of size " + - s"$expectedVectorSize, got vector of size $vectorSize") + s"$expectedVectorSize, got vector of size $vectorSize") require(expectedNumWords == numWords, s"Word2VecModel requires $expectedNumWords words, but got $numWords") model case _ => throw new Exception( s"Word2VecModel.load did not recognize model with (className, format version):" + - s"($loadedClassName, $loadedVersion). Supported:\n" + - s" ($classNameV1_0, 1.0)") + s"($loadedClassName, $loadedVersion). Supported:\n" + + s" ($classNameV1_0, 1.0)") } } } From a2f6b0bd6276e017ef0adab2d3fd45a456e0b55a Mon Sep 17 00:00:00 2001 From: Antonio Murgia Date: Thu, 26 Nov 2015 11:42:38 +0100 Subject: [PATCH 4/7] [SPARK-11932] Add comments and adjust code for better readability --- .../org/apache/spark/mllib/feature/Word2Vec.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index 71ab82b1d1ee8..98e06f1874076 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -599,13 +599,18 @@ object Word2VecModel extends Loader[Word2VecModel] { val vectorSize = model.values.head.size val numWords = model.size - val metadata = compact(render - (("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~ + val metadata = compact(render( + ("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~ ("vectorSize" -> vectorSize) ~ ("numWords" -> numWords))) sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) - val partitionSize = 33554432l //32MB - val approxSize = 4l * numWords * vectorSize //4 Byte is sizeOf(float) + //we want to partition the model in partitions of size 32MB + val partitionSize = (1L << 25) + //we calculate the approximate size of the model + //we only calculate the array size, not considering + //the string size, the formula is: + //floatSize * numWords * vectorSize + val approxSize = 4L * numWords * vectorSize val nPartitions = ((approxSize / partitionSize) + 1).toInt val dataArray = model.toSeq.map { case (w, v) => Data(w, v) } sc.parallelize(dataArray.toSeq, nPartitions).toDF().write.parquet(Loader.dataPath(path)) From fdbe2a7828a08e58d104f7f27b2ded4a4dc3192a Mon Sep 17 00:00:00 2001 From: Antonio Murgia Date: Sat, 28 Nov 2015 10:36:35 +0100 Subject: [PATCH 5/7] [SPARK-11934] Fix Word2VecSuite code style --- .../org/apache/spark/mllib/feature/Word2VecSuite.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala index 2d492dc66754b..a714ac709d613 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala @@ -94,10 +94,8 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext { } test("big model load / save") { - val word2VecMap = Map( - (0 to 9000) - .map(i => s"$i" -> Array.fill(1000)(0.1f)):_* - ) + //create a model bigger than 32MB since 9000 * 1000 * 4 > 2^25 + val word2VecMap = Map((0 to 9000).map(i => s"$i" -> Array.fill(1000)(0.1f)): _*) val model = new Word2VecModel(word2VecMap) val tempDir = Utils.createTempDir() From e0724834996b7b8d65db33ed7cae31e8c94a7cf0 Mon Sep 17 00:00:00 2001 From: Antonio Murgia Date: Sat, 28 Nov 2015 11:17:29 +0100 Subject: [PATCH 6/7] Add space after comment --- .../scala/org/apache/spark/mllib/feature/Word2VecSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala index a714ac709d613..37d01e2876695 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala @@ -94,7 +94,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext { } test("big model load / save") { - //create a model bigger than 32MB since 9000 * 1000 * 4 > 2^25 + // create a model bigger than 32MB since 9000 * 1000 * 4 > 2^25 val word2VecMap = Map((0 to 9000).map(i => s"$i" -> Array.fill(1000)(0.1f)): _*) val model = new Word2VecModel(word2VecMap) From e286e66ec0187decfdcebca05af8323c4bcb5acb Mon Sep 17 00:00:00 2001 From: Antonio Murgia Date: Sat, 28 Nov 2015 11:22:22 +0100 Subject: [PATCH 7/7] Add space after comment start --- .../org/apache/spark/mllib/feature/Word2Vec.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index 98e06f1874076..945ac68413341 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -604,12 +604,12 @@ object Word2VecModel extends Loader[Word2VecModel] { ("vectorSize" -> vectorSize) ~ ("numWords" -> numWords))) sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) - //we want to partition the model in partitions of size 32MB + // We want to partition the model in partitions of size 32MB val partitionSize = (1L << 25) - //we calculate the approximate size of the model - //we only calculate the array size, not considering - //the string size, the formula is: - //floatSize * numWords * vectorSize + // We calculate the approximate size of the model + // We only calculate the array size, not considering + // the string size, the formula is: + // floatSize * numWords * vectorSize val approxSize = 4L * numWords * vectorSize val nPartitions = ((approxSize / partitionSize) + 1).toInt val dataArray = model.toSeq.map { case (w, v) => Data(w, v) }