From c1ee8fe3e1e8e63599cf0ee077d85aae057bfa32 Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Sat, 23 May 2015 21:49:33 +0200 Subject: [PATCH 1/4] Add Scala code in module spark to expose utility functions for display --- spark/pom.xml | 36 +++++ .../zeppelin/spark/SparkInterpreter.java | 3 + .../zeppelin/spark/utils/DisplayUtils.scala | 70 ++++++++++ .../spark/utils/DisplayFunctionsTest.scala | 130 ++++++++++++++++++ zeppelin-interpreter/pom.xml | 1 + 5 files changed, 240 insertions(+) create mode 100644 spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala create mode 100644 spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala diff --git a/spark/pom.xml b/spark/pom.xml index 0475515305e..71fb8a69550 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -367,6 +367,14 @@ 1.1 + + + org.scalatest + scalatest_${scala.binary.version} + 2.2.4 + test + + junit junit @@ -760,6 +768,34 @@ + + + org.scala-tools + maven-scala-plugin + + + compile + + compile + + compile + + + test-compile + + testCompile + + test-compile + + + process-resources + + compile + + + + + diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 935b2a59c19..68732cf590b 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -461,6 +461,9 @@ public void open() { intp.interpret("import org.apache.spark.sql.functions._"); } + // Utility functions for display + intp.interpret("import org.apache.zeppelin.spark.utils.DisplayUtils._"); + // add jar if (depInterpreter != null) { DependencyContext depc = depInterpreter.getDependencyContext(); diff --git a/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala b/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala new file mode 100644 index 00000000000..45991af5780 --- /dev/null +++ b/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.spark.utils + +import java.lang.StringBuilder +import org.apache.spark.rdd.RDD +import scala.collection.IterableLike + +object DisplayUtils { + + implicit def toDisplayFunctions[T <: Product](rdd: RDD[T]): DisplayFunctions[T] = new DisplayFunctions[T](rdd) + + def html(htmlContent: String = "") = s"%html $htmlContent" + + def img64(base64Content: String = "") = s"%img $base64Content" + + def img(url: String) = s"" +} + +class DisplayFunctions[T <: Product](val rdd: RDD[T]) { + def displayAsTable(columnLabels: String*): Unit = { + val providedLabelCount: Int = columnLabels.size + var maxColumnCount:Int = 1 + val headers = new StringBuilder("%table ") + + val data = new StringBuilder("") + rdd.collect().foreach(tuple => { + maxColumnCount = math.max(maxColumnCount,tuple.productArity) + data.append(tuple.productIterator.mkString("\t")).append("\n") + }) + + + if (providedLabelCount > maxColumnCount) { + headers.append(columnLabels.take(maxColumnCount).mkString("\t")).append("\n") + } else if (providedLabelCount < maxColumnCount) { + val missingColumnHeaders = ((providedLabelCount+1) to maxColumnCount).foldLeft[String](""){ + (stringAccumulator,index) => { + if (index==1) s"Column$index" else s"$stringAccumulator\tColumn$index" + } + } + + headers.append(columnLabels.mkString("\t")).append(missingColumnHeaders).append("\n") + } else { + headers.append(columnLabels.mkString("\t")).append("\n") + } + + headers.append(data) + + print(headers.toString) + } + + +} + + diff --git a/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala b/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala new file mode 100644 index 00000000000..97762a207b2 --- /dev/null +++ b/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.spark.utils + +import java.io.ByteArrayOutputStream + +import org.apache.spark.rdd.RDD +import org.apache.spark.{SparkConf, SparkContext} +import org.scalatest.{BeforeAndAfter, _} + +case class Person(login : String, name: String, age: Int) + +class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAfterEach with Matchers { + var sc: SparkContext = null + var testRDDTuples: RDD[(String,String,Int)] = null + var testRDDPersons: RDD[Person] = null + var stream: ByteArrayOutputStream = null + + before { + val sparkConf: SparkConf = new SparkConf(true) + .setAppName("test-DisplayFunctions") + .setMaster("local") + sc = new SparkContext(sparkConf) + testRDDTuples = sc.parallelize(List(("jdoe","John DOE",32),("hsue","Helen SUE",27),("rsmith","Richard SMITH",45))) + testRDDPersons = sc.parallelize(List(Person("jdoe","John DOE",32),Person("hsue","Helen SUE",27),Person("rsmith","Richard SMITH",45))) + } + + override def beforeEach() { + stream = new java.io.ByteArrayOutputStream() + super.beforeEach() // To be stackable, must call super.beforeEach + } + + + "DisplayFunctions" should "generate correct column headers for tuples" in { + + Console.withOut(stream) { + new DisplayFunctions[(String,String,Int)](testRDDTuples).displayAsTable("Login","Name","Age") + } + + stream.toString("UTF-8") should be("%table Login\tName\tAge\n" + + "jdoe\tJohn DOE\t32\n" + + "hsue\tHelen SUE\t27\n" + + "rsmith\tRichard SMITH\t45\n") + } + + "DisplayFunctions" should "generate correct column headers for case class" in { + + Console.withOut(stream) { + new DisplayFunctions[Person](testRDDPersons).displayAsTable("Login","Name","Age") + } + + stream.toString("UTF-8") should be("%table Login\tName\tAge\n" + + "jdoe\tJohn DOE\t32\n" + + "hsue\tHelen SUE\t27\n" + + "rsmith\tRichard SMITH\t45\n") + } + + "DisplayFunctions" should "truncate exceeding column headers for tuples" in { + + Console.withOut(stream) { + new DisplayFunctions[(String,String,Int)](testRDDTuples).displayAsTable("Login","Name","Age","xxx","yyy") + } + + stream.toString("UTF-8") should be("%table Login\tName\tAge\n" + + "jdoe\tJohn DOE\t32\n" + + "hsue\tHelen SUE\t27\n" + + "rsmith\tRichard SMITH\t45\n") + } + + "DisplayFunctions" should "pad missing column headers with ColumnXXX for tuples" in { + + Console.withOut(stream) { + new DisplayFunctions[(String,String,Int)](testRDDTuples).displayAsTable("Login") + } + + stream.toString("UTF-8") should be("%table Login\tColumn2\tColumn3\n" + + "jdoe\tJohn DOE\t32\n" + + "hsue\tHelen SUE\t27\n" + + "rsmith\tRichard SMITH\t45\n") + } + + "DisplayFunctions" should "pad all missing column headers with ColumnXXX for tuples" in { + + Console.withOut(stream) { + new DisplayFunctions[(String,String,Int)](testRDDTuples).displayAsTable() + } + + stream.toString("UTF-8") should be("%table Column1\tColumn2\tColumn3\n" + + "jdoe\tJohn DOE\t32\n" + + "hsue\tHelen SUE\t27\n" + + "rsmith\tRichard SMITH\t45\n") + } + + "DisplayUtils" should "display HTML" in { + DisplayUtils.html() should be ("%html ") + DisplayUtils.html("test") should be ("%html test") + } + + "DisplayUtils" should "display img" in { + DisplayUtils.img("http://www.google.com") should be ("") + DisplayUtils.img64() should be ("%img ") + DisplayUtils.img64("abcde") should be ("%img abcde") + } + + override def afterEach() { + try super.afterEach() // To be stackable, must call super.afterEach + stream = null + } + + after { + sc.stop() + } + + +} diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml index 980fe4ac568..c2a67524a95 100644 --- a/zeppelin-interpreter/pom.xml +++ b/zeppelin-interpreter/pom.xml @@ -36,6 +36,7 @@ http://zeppelin.incubator.apache.org + org.apache.thrift libthrift From a15294ea79c03bbaa9df479c12e22398b8cbb6ef Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Tue, 2 Jun 2015 22:15:39 +0200 Subject: [PATCH 2/4] Enhance display function utility to accept Scala Traversable in addition to RDD --- .../zeppelin/spark/utils/DisplayUtils.scala | 32 ++++++++++---- .../spark/utils/DisplayFunctionsTest.scala | 42 +++++++++++++------ 2 files changed, 54 insertions(+), 20 deletions(-) diff --git a/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala b/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala index 45991af5780..be0e69b0324 100644 --- a/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala +++ b/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala @@ -18,12 +18,16 @@ package org.apache.zeppelin.spark.utils import java.lang.StringBuilder + import org.apache.spark.rdd.RDD + import scala.collection.IterableLike object DisplayUtils { - implicit def toDisplayFunctions[T <: Product](rdd: RDD[T]): DisplayFunctions[T] = new DisplayFunctions[T](rdd) + implicit def toDisplayRDDFunctions[T <: Product](rdd: RDD[T]): DisplayRDDFunctions[T] = new DisplayRDDFunctions[T](rdd) + + implicit def toDisplayTraversableFunctions[T <: Product](traversable: Traversable[T]): DisplayTraversableFunctions[T] = new DisplayTraversableFunctions[T](traversable) def html(htmlContent: String = "") = s"%html $htmlContent" @@ -32,26 +36,25 @@ object DisplayUtils { def img(url: String) = s"" } -class DisplayFunctions[T <: Product](val rdd: RDD[T]) { - def displayAsTable(columnLabels: String*): Unit = { +trait DisplayCollection[T <: Product] { + + def printFormattedData(traversable: Traversable[T], columnLabels: String*): Unit = { val providedLabelCount: Int = columnLabels.size var maxColumnCount:Int = 1 val headers = new StringBuilder("%table ") val data = new StringBuilder("") - rdd.collect().foreach(tuple => { + + traversable.foreach(tuple => { maxColumnCount = math.max(maxColumnCount,tuple.productArity) data.append(tuple.productIterator.mkString("\t")).append("\n") }) - if (providedLabelCount > maxColumnCount) { headers.append(columnLabels.take(maxColumnCount).mkString("\t")).append("\n") } else if (providedLabelCount < maxColumnCount) { val missingColumnHeaders = ((providedLabelCount+1) to maxColumnCount).foldLeft[String](""){ - (stringAccumulator,index) => { - if (index==1) s"Column$index" else s"$stringAccumulator\tColumn$index" - } + (stringAccumulator,index) => if (index==1) s"Column$index" else s"$stringAccumulator\tColumn$index" } headers.append(columnLabels.mkString("\t")).append(missingColumnHeaders).append("\n") @@ -64,7 +67,20 @@ class DisplayFunctions[T <: Product](val rdd: RDD[T]) { print(headers.toString) } +} + +class DisplayRDDFunctions[T <: Product] (val rdd: RDD[T]) extends DisplayCollection[T] { + + def displayAsTable(columnLabels: String*): Unit = { + printFormattedData(rdd.collect(), columnLabels: _*) + } +} + +class DisplayTraversableFunctions[T <: Product] (val traversable: Traversable[T]) extends DisplayCollection[T] { + def displayAsTable(columnLabels: String*): Unit = { + printFormattedData(traversable, columnLabels: _*) + } } diff --git a/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala b/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala index 97762a207b2..8deec4a0815 100644 --- a/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala +++ b/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala @@ -14,19 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.zeppelin.spark.utils import java.io.ByteArrayOutputStream import org.apache.spark.rdd.RDD -import org.apache.spark.{SparkConf, SparkContext} -import org.scalatest.{BeforeAndAfter, _} +import org.apache.spark.{SparkContext, SparkConf} +import org.scalatest._ +import org.scalatest.{BeforeAndAfter} case class Person(login : String, name: String, age: Int) class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAfterEach with Matchers { var sc: SparkContext = null + var testTuples:List[(String, String, Int)] = null + var testPersons:List[Person] = null var testRDDTuples: RDD[(String,String,Int)] = null var testRDDPersons: RDD[Person] = null var stream: ByteArrayOutputStream = null @@ -36,8 +38,10 @@ class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAf .setAppName("test-DisplayFunctions") .setMaster("local") sc = new SparkContext(sparkConf) - testRDDTuples = sc.parallelize(List(("jdoe","John DOE",32),("hsue","Helen SUE",27),("rsmith","Richard SMITH",45))) - testRDDPersons = sc.parallelize(List(Person("jdoe","John DOE",32),Person("hsue","Helen SUE",27),Person("rsmith","Richard SMITH",45))) + testTuples = List(("jdoe", "John DOE", 32), ("hsue", "Helen SUE", 27), ("rsmith", "Richard SMITH", 45)) + testRDDTuples = sc.parallelize(testTuples) + testPersons = List(Person("jdoe", "John DOE", 32), Person("hsue", "Helen SUE", 27), Person("rsmith", "Richard SMITH", 45)) + testRDDPersons = sc.parallelize(testPersons) } override def beforeEach() { @@ -49,7 +53,7 @@ class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAf "DisplayFunctions" should "generate correct column headers for tuples" in { Console.withOut(stream) { - new DisplayFunctions[(String,String,Int)](testRDDTuples).displayAsTable("Login","Name","Age") + new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).displayAsTable("Login","Name","Age") } stream.toString("UTF-8") should be("%table Login\tName\tAge\n" + @@ -61,7 +65,7 @@ class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAf "DisplayFunctions" should "generate correct column headers for case class" in { Console.withOut(stream) { - new DisplayFunctions[Person](testRDDPersons).displayAsTable("Login","Name","Age") + new DisplayRDDFunctions[Person](testRDDPersons).displayAsTable("Login","Name","Age") } stream.toString("UTF-8") should be("%table Login\tName\tAge\n" + @@ -73,7 +77,7 @@ class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAf "DisplayFunctions" should "truncate exceeding column headers for tuples" in { Console.withOut(stream) { - new DisplayFunctions[(String,String,Int)](testRDDTuples).displayAsTable("Login","Name","Age","xxx","yyy") + new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).displayAsTable("Login","Name","Age","xxx","yyy") } stream.toString("UTF-8") should be("%table Login\tName\tAge\n" + @@ -85,7 +89,7 @@ class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAf "DisplayFunctions" should "pad missing column headers with ColumnXXX for tuples" in { Console.withOut(stream) { - new DisplayFunctions[(String,String,Int)](testRDDTuples).displayAsTable("Login") + new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).displayAsTable("Login") } stream.toString("UTF-8") should be("%table Login\tColumn2\tColumn3\n" + @@ -94,13 +98,25 @@ class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAf "rsmith\tRichard SMITH\t45\n") } - "DisplayFunctions" should "pad all missing column headers with ColumnXXX for tuples" in { + "DisplayFunctions" should "display traversable of tuples" in { + + Console.withOut(stream) { + new DisplayTraversableFunctions[(String,String,Int)](testTuples).displayAsTable("Login","Name","Age") + } + + stream.toString("UTF-8") should be("%table Login\tName\tAge\n" + + "jdoe\tJohn DOE\t32\n" + + "hsue\tHelen SUE\t27\n" + + "rsmith\tRichard SMITH\t45\n") + } + + "DisplayFunctions" should "display traversable of case class" in { Console.withOut(stream) { - new DisplayFunctions[(String,String,Int)](testRDDTuples).displayAsTable() + new DisplayTraversableFunctions[Person](testPersons).displayAsTable("Login","Name","Age") } - stream.toString("UTF-8") should be("%table Column1\tColumn2\tColumn3\n" + + stream.toString("UTF-8") should be("%table Login\tName\tAge\n" + "jdoe\tJohn DOE\t32\n" + "hsue\tHelen SUE\t27\n" + "rsmith\tRichard SMITH\t45\n") @@ -128,3 +144,5 @@ class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAf } + + From 47a1b1f4b96cb9c32eb22546f44151d975318d26 Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Fri, 12 Jun 2015 11:31:15 +0200 Subject: [PATCH 3/4] Rename displayAsTable() to display() --- .../apache/zeppelin/spark/utils/DisplayUtils.scala | 4 ++-- .../zeppelin/spark/utils/DisplayFunctionsTest.scala | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala b/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala index be0e69b0324..bc93de0ee6c 100644 --- a/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala +++ b/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala @@ -71,14 +71,14 @@ trait DisplayCollection[T <: Product] { class DisplayRDDFunctions[T <: Product] (val rdd: RDD[T]) extends DisplayCollection[T] { - def displayAsTable(columnLabels: String*): Unit = { + def display(columnLabels: String*): Unit = { printFormattedData(rdd.collect(), columnLabels: _*) } } class DisplayTraversableFunctions[T <: Product] (val traversable: Traversable[T]) extends DisplayCollection[T] { - def displayAsTable(columnLabels: String*): Unit = { + def display(columnLabels: String*): Unit = { printFormattedData(traversable, columnLabels: _*) } } diff --git a/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala b/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala index 8deec4a0815..753dbf31ab5 100644 --- a/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala +++ b/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala @@ -53,7 +53,7 @@ class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAf "DisplayFunctions" should "generate correct column headers for tuples" in { Console.withOut(stream) { - new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).displayAsTable("Login","Name","Age") + new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login","Name","Age") } stream.toString("UTF-8") should be("%table Login\tName\tAge\n" + @@ -65,7 +65,7 @@ class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAf "DisplayFunctions" should "generate correct column headers for case class" in { Console.withOut(stream) { - new DisplayRDDFunctions[Person](testRDDPersons).displayAsTable("Login","Name","Age") + new DisplayRDDFunctions[Person](testRDDPersons).display("Login","Name","Age") } stream.toString("UTF-8") should be("%table Login\tName\tAge\n" + @@ -77,7 +77,7 @@ class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAf "DisplayFunctions" should "truncate exceeding column headers for tuples" in { Console.withOut(stream) { - new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).displayAsTable("Login","Name","Age","xxx","yyy") + new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login","Name","Age","xxx","yyy") } stream.toString("UTF-8") should be("%table Login\tName\tAge\n" + @@ -89,7 +89,7 @@ class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAf "DisplayFunctions" should "pad missing column headers with ColumnXXX for tuples" in { Console.withOut(stream) { - new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).displayAsTable("Login") + new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login") } stream.toString("UTF-8") should be("%table Login\tColumn2\tColumn3\n" + @@ -101,7 +101,7 @@ class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAf "DisplayFunctions" should "display traversable of tuples" in { Console.withOut(stream) { - new DisplayTraversableFunctions[(String,String,Int)](testTuples).displayAsTable("Login","Name","Age") + new DisplayTraversableFunctions[(String,String,Int)](testTuples).display("Login","Name","Age") } stream.toString("UTF-8") should be("%table Login\tName\tAge\n" + @@ -113,7 +113,7 @@ class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAf "DisplayFunctions" should "display traversable of case class" in { Console.withOut(stream) { - new DisplayTraversableFunctions[Person](testPersons).displayAsTable("Login","Name","Age") + new DisplayTraversableFunctions[Person](testPersons).display("Login","Name","Age") } stream.toString("UTF-8") should be("%table Login\tName\tAge\n" + From 62a2311a4a2411bb2731503fea7a84745ceb120e Mon Sep 17 00:00:00 2001 From: DuyHai DOAN Date: Fri, 12 Jun 2015 13:35:48 +0200 Subject: [PATCH 4/4] Add default limit to RDD using zeppelin.spark.maxResult property --- .../zeppelin/spark/SparkInterpreter.java | 5 +++ .../zeppelin/spark/utils/DisplayUtils.scala | 10 ++++-- .../spark/utils/DisplayFunctionsTest.scala | 33 ++++++++++++++++--- 3 files changed, 41 insertions(+), 7 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 68732cf590b..1c4c5e7c9cd 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -464,6 +464,11 @@ public void open() { // Utility functions for display intp.interpret("import org.apache.zeppelin.spark.utils.DisplayUtils._"); + // Scala implicit value for spark.maxResult + intp.interpret("import org.apache.zeppelin.spark.utils.SparkMaxResult"); + intp.interpret("implicit val sparkMaxResult = new SparkMaxResult(" + + Integer.parseInt(getProperty("zeppelin.spark.maxResult")) + ")"); + // add jar if (depInterpreter != null) { DependencyContext depc = depInterpreter.getDependencyContext(); diff --git a/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala b/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala index bc93de0ee6c..81814349c18 100644 --- a/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala +++ b/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala @@ -71,8 +71,12 @@ trait DisplayCollection[T <: Product] { class DisplayRDDFunctions[T <: Product] (val rdd: RDD[T]) extends DisplayCollection[T] { - def display(columnLabels: String*): Unit = { - printFormattedData(rdd.collect(), columnLabels: _*) + def display(columnLabels: String*)(implicit sparkMaxResult: SparkMaxResult): Unit = { + printFormattedData(rdd.take(sparkMaxResult.maxResult), columnLabels: _*) + } + + def display(sparkMaxResult:Int, columnLabels: String*): Unit = { + printFormattedData(rdd.take(sparkMaxResult), columnLabels: _*) } } @@ -83,4 +87,4 @@ class DisplayTraversableFunctions[T <: Product] (val traversable: Traversable[T] } } - +class SparkMaxResult(val maxResult: Int) extends Serializable diff --git a/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala b/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala index 753dbf31ab5..2638f1710e9 100644 --- a/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala +++ b/spark/src/test/scala/org/apache/zeppelin/spark/utils/DisplayFunctionsTest.scala @@ -51,7 +51,7 @@ class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAf "DisplayFunctions" should "generate correct column headers for tuples" in { - + implicit val sparkMaxResult = new SparkMaxResult(100) Console.withOut(stream) { new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login","Name","Age") } @@ -63,7 +63,7 @@ class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAf } "DisplayFunctions" should "generate correct column headers for case class" in { - + implicit val sparkMaxResult = new SparkMaxResult(100) Console.withOut(stream) { new DisplayRDDFunctions[Person](testRDDPersons).display("Login","Name","Age") } @@ -75,7 +75,7 @@ class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAf } "DisplayFunctions" should "truncate exceeding column headers for tuples" in { - + implicit val sparkMaxResult = new SparkMaxResult(100) Console.withOut(stream) { new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login","Name","Age","xxx","yyy") } @@ -87,7 +87,7 @@ class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAf } "DisplayFunctions" should "pad missing column headers with ColumnXXX for tuples" in { - + implicit val sparkMaxResult = new SparkMaxResult(100) Console.withOut(stream) { new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login") } @@ -98,6 +98,31 @@ class DisplayFunctionsTest extends FlatSpec with BeforeAndAfter with BeforeAndAf "rsmith\tRichard SMITH\t45\n") } + "DisplayUtils" should "restricts RDD to sparkMaxresult with implicit limit" in { + + implicit val sparkMaxResult = new SparkMaxResult(2) + + Console.withOut(stream) { + new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display("Login") + } + + stream.toString("UTF-8") should be("%table Login\tColumn2\tColumn3\n" + + "jdoe\tJohn DOE\t32\n" + + "hsue\tHelen SUE\t27\n") + } + + "DisplayUtils" should "restricts RDD to sparkMaxresult with explicit limit" in { + + implicit val sparkMaxResult = new SparkMaxResult(2) + + Console.withOut(stream) { + new DisplayRDDFunctions[(String,String,Int)](testRDDTuples).display(1,"Login") + } + + stream.toString("UTF-8") should be("%table Login\tColumn2\tColumn3\n" + + "jdoe\tJohn DOE\t32\n") + } + "DisplayFunctions" should "display traversable of tuples" in { Console.withOut(stream) {