From a843a0b8d8583ec885d354fef83a3f517a83c2a9 Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Tue, 29 Jan 2019 14:44:19 +1100 Subject: [PATCH 1/3] [SPARK-26757][GraphX] Return 0 for `count` on empty Edge/Vertex RDDs Previously a "java.lang.UnsupportedOperationException: empty collection" exception would be thrown due to using `reduce`, rather than `fold` or similar that can tolerate empty RDDs. This behaviour has existed for the Vertex RDDs since it was introduced in b30e0ae0351be1cbc0b1cf179293587b466ee026. It seems this behaviour was inherited by the Edge RDDs via copy-paste in ee29ef3800438501e0ff207feb00a28973fc0769. --- .../org/apache/spark/graphx/impl/EdgeRDDImpl.scala | 2 +- .../org/apache/spark/graphx/impl/VertexRDDImpl.scala | 2 +- .../scala/org/apache/spark/graphx/EdgeRDDSuite.scala | 11 +++++++++++ .../org/apache/spark/graphx/VertexRDDSuite.scala | 11 +++++++++++ 4 files changed, 24 insertions(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index 376c7b06f9d2b..eb8abd1846d0e 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -87,7 +87,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( /** The number of edges in the RDD. */ override def count(): Long = { - partitionsRDD.map(_._2.size.toLong).reduce(_ + _) + partitionsRDD.map(_._2.size.toLong).fold(0)(_ + _) } override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDDImpl[ED2, VD] = diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index 3c6f22d97360d..2da9762fb0452 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -87,7 +87,7 @@ class VertexRDDImpl[VD] private[graphx] ( /** The number of vertices in the RDD. */ override def count(): Long = { - partitionsRDD.map(_.size.toLong).reduce(_ + _) + partitionsRDD.map(_.size.toLong).fold(0)(_ + _) } override private[graphx] def mapVertexPartitions[VD2: ClassTag]( diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala index 7a24e320c3e04..0ae55d50858a7 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala @@ -60,4 +60,15 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext { } } + test("count") { + withSpark { sc => + val empty = EdgeRDD.fromEdges(sc.emptyRDD[Edge[Int]]) + assert(empty.count === 0) + + val n = 100 + val edges = List(Edge(0, 1, ()), Edge(1, 2, ()), Edge(2, 0, ())) + val nonempty = EdgeRDD.fromEdges(sc.parallelize(edges)) + assert(nonempty.count === edges.size) + } + } } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala index 8e630435279de..434e6a84edf6d 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala @@ -223,4 +223,15 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext { assert(verts.collect().toSeq === data) // test checkpointed RDD } } + + test("count") { + withSpark { sc => + val empty = VertexRDD(sc.emptyRDD[(Long, Unit)]) + assert(empty.count === 0) + + val n = 100 + val nonempty = vertices(sc, n) + assert(nonempty.count === n + 1) + } + } } From 1f9dd8f5c9c70a89eb66cc41e7b2d4c61fe87e50 Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Thu, 31 Jan 2019 13:41:53 +1100 Subject: [PATCH 2/3] [SPARK-26757][GraphX] Remove unused variable --- graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala index 0ae55d50858a7..8fd3e6f5229cc 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala @@ -65,7 +65,6 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext { val empty = EdgeRDD.fromEdges(sc.emptyRDD[Edge[Int]]) assert(empty.count === 0) - val n = 100 val edges = List(Edge(0, 1, ()), Edge(1, 2, ()), Edge(2, 0, ())) val nonempty = EdgeRDD.fromEdges(sc.parallelize(edges)) assert(nonempty.count === edges.size) From de47631a0db84d285e4daafb5fdfffcace96a7c2 Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Thu, 31 Jan 2019 13:42:18 +1100 Subject: [PATCH 3/3] [SPARK-26757][GraphX] Make SVD++ work with no edges Similar to `VertexRDD.count` and `EdgeRDD.count`, this used `reduce` which fails on an empty RDD. --- .../scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala | 2 +- .../org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala index 59fdd855e6f37..2847a4e172d40 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala @@ -72,7 +72,7 @@ object SVDPlusPlus { // calculate global rating mean edges.cache() - val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2)) + val (rs, rc) = edges.map(e => (e.attr, 1L)).fold((0, 0))((a, b) => (a._1 + b._1, a._2 + b._2)) val u = rs / rc // construct graph diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala index 2991438f5e57e..da0457c354b51 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala @@ -40,4 +40,13 @@ class SVDPlusPlusSuite extends SparkFunSuite with LocalSparkContext { } } + test("Test SVD++ with no edges") { + withSpark { sc => + val edges = sc.emptyRDD[Edge[Double]] + val conf = new SVDPlusPlus.Conf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015) // 2 iterations + val (graph, _) = SVDPlusPlus.run(edges, conf) + assert(graph.vertices.count == 0) + assert(graph.edges.count == 0) + } + } }