diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala index b46a94d00c..13c4d288a3 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/Benchmark.scala @@ -24,11 +24,33 @@ import org.apache.spark.sql.SparkSession * Convenient runner for benchmarks. * * To run locally, use - * `$OPAQUE_HOME/build/sbt 'run edu.berkeley.cs.rise.opaque.benchmark.Benchmark'`. + * `$OPAQUE_HOME/build/sbt 'run edu.berkeley.cs.rise.opaque.benchmark.Benchmark '`. + * Available flags: + * --num-partitions: specify the number of partitions the data should be split into. + * Default: 2 * number of executors if exists, 4 otherwise + * --size: specify the size of the dataset that should be loaded into Spark. + * Default: sf_small + * --operations: select the different operations that should be benchmarked. + * Default: all + * Available operations: logistic-regression, tpc-h + * Syntax: --operations "logistic-regression,tpc-h" + * --run-local: boolean whether to use HDFS or the local filesystem + * Default: HDFS + * Leave --operations flag blank to run all benchmarks * * To run on a cluster, use `$SPARK_HOME/bin/spark-submit` with appropriate arguments. */ object Benchmark { + + val spark = SparkSession.builder() + .appName("Benchmark") + .getOrCreate() + var numPartitions = spark.sparkContext.defaultParallelism + var size = "sf_med" + + // Configure your HDFS namenode url here + var fileUrl = "hdfs://10.0.3.4:8020" + def dataDir: String = { if (System.getenv("SPARKSGX_DATA_DIR") == null) { throw new Exception("Set SPARKSGX_DATA_DIR") @@ -36,15 +58,9 @@ object Benchmark { System.getenv("SPARKSGX_DATA_DIR") } - def main(args: Array[String]): Unit = { - val spark = SparkSession.builder() - .appName("QEDBenchmark") - .getOrCreate() - Utils.initSQLContext(spark.sqlContext) - - // val numPartitions = - // if (spark.sparkContext.isLocal) 1 else spark.sparkContext.defaultParallelism - + def logisticRegression() = { + // TODO: this fails when Spark is ran on a cluster + /* // Warmup LogisticRegression.train(spark, Encrypted, 1000, 1) LogisticRegression.train(spark, Encrypted, 1000, 1) @@ -52,7 +68,73 @@ object Benchmark { // Run LogisticRegression.train(spark, Insecure, 100000, 1) LogisticRegression.train(spark, Encrypted, 100000, 1) + */ + } + def runAll() = { + logisticRegression() + TPCHBenchmark.run(spark.sqlContext, numPartitions, size, fileUrl) + } + + def main(args: Array[String]): Unit = { + Utils.initSQLContext(spark.sqlContext) + + if (args.length >= 2 && args(1) == "--help") { + println( +"""Available flags: + --num-partitions: specify the number of partitions the data should be split into. + Default: 2 * number of executors if exists, 4 otherwise + --size: specify the size of the dataset that should be loaded into Spark. + Default: sf_small + --operations: select the different operations that should be benchmarked. + Default: all + Available operations: logistic-regression, tpc-h + Syntax: --operations "logistic-regression,tpc-h" + Leave --operations flag blank to run all benchmarks + --run-local: boolean whether to use HDFS or the local filesystem + Default: HDFS""" + ) + } + + var runAll = true + args.slice(1, args.length).sliding(2, 2).toList.collect { + case Array("--num-partitions", numPartitions: String) => { + this.numPartitions = numPartitions.toInt + } + case Array("--size", size: String) => { + val supportedSizes = Set("sf_small, sf_med") + if (supportedSizes.contains(size)) { + this.size = size + } else { + println("Given size is not supported: available values are " + supportedSizes.toString()) + } + } + case Array("--run-local", runLocal: String) => { + runLocal match { + case "true" => { + fileUrl = "file://" + } + case _ => {} + } + } + case Array("--operations", operations: String) => { + runAll = false + val operationsArr = operations.split(",").map(_.trim) + for (operation <- operationsArr) { + operation match { + case "logistic-regression" => { + logisticRegression() + } + case "tpc-h" => { + TPCHBenchmark.run(spark.sqlContext, numPartitions, size, fileUrl) + } + } + } + } + } + if (runAll) { + this.runAll(); + } spark.stop() } } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala index e0bb4d4caf..ee905026c8 100644 --- a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCH.scala @@ -17,6 +17,7 @@ package edu.berkeley.cs.rise.opaque.benchmark +import java.io.File import scala.io.Source import org.apache.spark.sql.DataFrame @@ -162,7 +163,7 @@ object TPCH { .option("delimiter", "|") .load(s"${Benchmark.dataDir}/tpch/$size/customer.tbl") - def generateMap( + def generateDFs( sqlContext: SQLContext, size: String) : Map[String, DataFrame] = { Map("part" -> part(sqlContext, size), @@ -175,42 +176,73 @@ object TPCH { "customer" -> customer(sqlContext, size) ), } - - def apply(sqlContext: SQLContext, size: String) : TPCH = { - val tpch = new TPCH(sqlContext, size) - tpch.tableNames = tableNames - tpch.nameToDF = generateMap(sqlContext, size) - tpch.ensureCached() - tpch - } } -class TPCH(val sqlContext: SQLContext, val size: String) { +class TPCH(val sqlContext: SQLContext, val size: String, val fileUrl: String) { - var tableNames : Seq[String] = Seq() - var nameToDF : Map[String, DataFrame] = Map() + val tableNames = TPCH.tableNames + val nameToDF = TPCH.generateDFs(sqlContext, size) - def ensureCached() = { - for (name <- tableNames) { - nameToDF.get(name).foreach(df => { - Utils.ensureCached(df) - Utils.ensureCached(Encrypted.applyTo(df)) - }) - } + private var numPartitions: Int = -1 + private var nameToPath = Map[String, File]() + private var nameToEncryptedPath = Map[String, File]() + + def getQuery(queryNumber: Int) : String = { + val queryLocation = sys.env.getOrElse("OPAQUE_HOME", ".") + "/src/test/resources/tpch/" + Source.fromFile(queryLocation + s"q$queryNumber.sql").getLines().mkString("\n") } - def setupViews(securityLevel: SecurityLevel, numPartitions: Int) = { - for ((name, df) <- nameToDF) { - securityLevel.applyTo(df.repartition(numPartitions)).createOrReplaceTempView(name) + def generateFiles(numPartitions: Int) = { + if (numPartitions != this.numPartitions) { + this.numPartitions = numPartitions + for ((name, df) <- nameToDF) { + nameToPath.get(name).foreach{ path => Utils.deleteRecursively(path) } + + nameToPath += (name -> createPath(df, Insecure, numPartitions)) + nameToEncryptedPath += (name -> createPath(df, Encrypted, numPartitions)) + } } } - def query(queryNumber: Int, securityLevel: SecurityLevel, sqlContext: SQLContext, numPartitions: Int) : DataFrame = { - setupViews(securityLevel, numPartitions) + private def createPath(df: DataFrame, securityLevel: SecurityLevel, numPartitions: Int): File = { + val partitionedDF = securityLevel.applyTo(df.repartition(numPartitions)) + val path = Utils.createTempDir() + path.delete() + securityLevel match { + case Insecure => { + partitionedDF.write.format("com.databricks.spark.csv") + .option("ignoreLeadingWhiteSpace", false) + .option("ignoreTrailingWhiteSpace", false) + .save(fileUrl + path.toString) + } + case Encrypted => { + partitionedDF.write.format("edu.berkeley.cs.rise.opaque.EncryptedSource").save(fileUrl + path.toString) + } + } + path + } - val queryLocation = sys.env.getOrElse("OPAQUE_HOME", ".") + "/src/test/resources/tpch/" - val sqlStr = Source.fromFile(queryLocation + s"q$queryNumber.sql").getLines().mkString("\n") + private def loadViews(securityLevel: SecurityLevel) = { + val (map, formatStr) = if (securityLevel == Insecure) + (nameToPath, "com.databricks.spark.csv") else + (nameToEncryptedPath, "edu.berkeley.cs.rise.opaque.EncryptedSource") + for ((name, path) <- map) { + val df = sqlContext.sparkSession.read + .format(formatStr) + .schema(nameToDF.get(name).get.schema) + .load(fileUrl + path.toString) + df.createOrReplaceTempView(name) + } + } + def performQuery(sqlStr: String, securityLevel: SecurityLevel): DataFrame = { + loadViews(securityLevel) sqlContext.sparkSession.sql(sqlStr) } + + def query(queryNumber: Int, securityLevel: SecurityLevel, numPartitions: Int): DataFrame = { + val sqlStr = getQuery(queryNumber) + generateFiles(numPartitions) + performQuery(sqlStr, securityLevel) + } } diff --git a/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala new file mode 100644 index 0000000000..14d71a1d0c --- /dev/null +++ b/src/main/scala/edu/berkeley/cs/rise/opaque/benchmark/TPCHBenchmark.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package edu.berkeley.cs.rise.opaque.benchmark + +import edu.berkeley.cs.rise.opaque.Utils + +import org.apache.spark.sql.SQLContext + +object TPCHBenchmark { + + // Add query numbers here once they are supported + val supportedQueries = Seq(1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 17, 19, 20, 22) + + def query(queryNumber: Int, tpch: TPCH, sqlContext: SQLContext, numPartitions: Int) = { + val sqlStr = tpch.getQuery(queryNumber) + tpch.generateFiles(numPartitions) + + Utils.timeBenchmark( + "distributed" -> (numPartitions > 1), + "query" -> s"TPC-H $queryNumber", + "system" -> Insecure.name) { + + tpch.performQuery(sqlStr, Insecure).collect + } + + Utils.timeBenchmark( + "distributed" -> (numPartitions > 1), + "query" -> s"TPC-H $queryNumber", + "system" -> Encrypted.name) { + + tpch.performQuery(sqlStr, Encrypted).collect + } + } + + def run(sqlContext: SQLContext, numPartitions: Int, size: String, fileUrl: String) = { + val tpch = new TPCH(sqlContext, size, fileUrl) + + for (queryNumber <- supportedQueries) { + query(queryNumber, tpch, sqlContext, numPartitions) + } + } +} diff --git a/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala b/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala index 8d60dfa550..dabd99fa11 100644 --- a/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala +++ b/src/test/scala/edu/berkeley/cs/rise/opaque/TPCHTests.scala @@ -21,99 +21,19 @@ package edu.berkeley.cs.rise.opaque import org.apache.spark.sql.SparkSession import edu.berkeley.cs.rise.opaque.benchmark._ -import edu.berkeley.cs.rise.opaque.benchmark.TPCH trait TPCHTests extends OpaqueTestsBase { self => def size = "sf_small" - def tpch = TPCH(spark.sqlContext, size) + def tpch = new TPCH(spark.sqlContext, size, "file://") - testAgainstSpark("TPC-H 1") { securityLevel => - tpch.query(1, securityLevel, spark.sqlContext, numPartitions).collect - } - - testAgainstSpark("TPC-H 2", ignore) { securityLevel => - tpch.query(2, securityLevel, spark.sqlContext, numPartitions).collect - } - - testAgainstSpark("TPC-H 3") { securityLevel => - tpch.query(3, securityLevel, spark.sqlContext, numPartitions).collect - } - - testAgainstSpark("TPC-H 4") { securityLevel => - tpch.query(4, securityLevel, spark.sqlContext, numPartitions).collect - } - - testAgainstSpark("TPC-H 5") { securityLevel => - tpch.query(5, securityLevel, spark.sqlContext, numPartitions).collect - } - - testAgainstSpark("TPC-H 6") { securityLevel => - tpch.query(6, securityLevel, spark.sqlContext, numPartitions).collect.toSet - } - - testAgainstSpark("TPC-H 7") { securityLevel => - tpch.query(7, securityLevel, spark.sqlContext, numPartitions).collect - } - - testAgainstSpark("TPC-H 8") { securityLevel => - tpch.query(8, securityLevel, spark.sqlContext, numPartitions).collect - } - - testAgainstSpark("TPC-H 9") { securityLevel => - tpch.query(9, securityLevel, spark.sqlContext, numPartitions).collect - } - - testAgainstSpark("TPC-H 10") { securityLevel => - tpch.query(10, securityLevel, spark.sqlContext, numPartitions).collect - } - - testAgainstSpark("TPC-H 11") { securityLevel => - tpch.query(11, securityLevel, spark.sqlContext, numPartitions).collect - } - - testAgainstSpark("TPC-H 12") { securityLevel => - tpch.query(12, securityLevel, spark.sqlContext, numPartitions).collect - } - - testAgainstSpark("TPC-H 13", ignore) { securityLevel => - tpch.query(13, securityLevel, spark.sqlContext, numPartitions).collect - } - - testAgainstSpark("TPC-H 14") { securityLevel => - tpch.query(14, securityLevel, spark.sqlContext, numPartitions).collect.toSet - } - - testAgainstSpark("TPC-H 15") { securityLevel => - tpch.query(15, securityLevel, spark.sqlContext, numPartitions).collect - } - - testAgainstSpark("TPC-H 16", ignore) { securityLevel => - tpch.query(16, securityLevel, spark.sqlContext, numPartitions).collect - } - - testAgainstSpark("TPC-H 17") { securityLevel => - tpch.query(17, securityLevel, spark.sqlContext, numPartitions).collect.toSet - } - - testAgainstSpark("TPC-H 18", ignore) { securityLevel => - tpch.query(18, securityLevel, spark.sqlContext, numPartitions).collect - } - - testAgainstSpark("TPC-H 19") { securityLevel => - tpch.query(19, securityLevel, spark.sqlContext, numPartitions).collect.toSet - } - - testAgainstSpark("TPC-H 20") { securityLevel => - tpch.query(20, securityLevel, spark.sqlContext, numPartitions).collect.toSet - } - - testAgainstSpark("TPC-H 21", ignore) { securityLevel => - tpch.query(21, securityLevel, spark.sqlContext, numPartitions).collect - } - - testAgainstSpark("TPC-H 22") { securityLevel => - tpch.query(22, securityLevel, spark.sqlContext, numPartitions).collect + def runTests() = { + for (queryNum <- TPCHBenchmark.supportedQueries) { + val testStr = s"TPC-H $queryNum" + testAgainstSpark(testStr) { securityLevel => + tpch.query(queryNum, securityLevel, numPartitions).collect + } + } } } @@ -124,6 +44,8 @@ class TPCHSinglePartitionSuite extends TPCHTests { .appName("TPCHSinglePartitionSuite") .config("spark.sql.shuffle.partitions", numPartitions) .getOrCreate() + + runTests(); } class TPCHMultiplePartitionSuite extends TPCHTests { @@ -133,4 +55,6 @@ class TPCHMultiplePartitionSuite extends TPCHTests { .appName("TPCHMultiplePartitionSuite") .config("spark.sql.shuffle.partitions", numPartitions) .getOrCreate() + + runTests(); }