diff --git a/scripts/databricks/.env.example b/scripts/databricks/.env.example new file mode 100644 index 00000000000..c2fddae4aeb --- /dev/null +++ b/scripts/databricks/.env.example @@ -0,0 +1,64 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Configuration for scripts/databricks/deploy.sh. +# +# Copy this file to ".env" (next to deploy.sh, at the repo root, or anywhere in +# a parent directory of the script) and fill in your own values. deploy.sh +# searches upward from its own location for the first .env it finds; you can +# also point it at a specific file with ENV_FILE=/path/to/.env. +# +# Values already exported in your shell take precedence over this file. + +# Databricks CLI profile to use (see ~/.databrickscfg). +# Authenticate once with: databricks auth login --profile +PROFILE=DEFAULT + +# Unity Catalog location for the SystemDS jar volume and demo tables. +CATALOG=main +SCHEMA=default +VOLUME=systemds + +# Compute policy that the cluster must satisfy. Leave empty for no policy. +# Example: Personal Compute policy id. +POLICY_ID= + +# Cluster shape. 
+# See "Choosing a node type" in README.md for NODE_TYPE options, or run: +# databricks clusters list-node-types -p "$PROFILE" -o json +SPARK_VERSION=16.4.x-scala2.12 +NODE_TYPE=i3.xlarge +NUM_WORKERS=0 +AUTOTERMINATION_MINUTES=30 +CLUSTER_NAME=systemds + +# Optional overrides (uncomment to use): +# Path to the SystemDS jar to upload (defaults to /target/SystemDS.jar). +# JAR_LOCAL=/abs/path/to/SystemDS.jar +# Workspace folder to import the notebooks into (defaults to /Users/). +# NB_DIR=/Users/me@example.com +# Notebooks to import (space-separated; language detected from extension). +# NB_FILES="SystemDS_MLContext_Demo.scala SystemDS_Delta_E2E.scala" +# Delta Kernel Maven library version installed by `deploy.sh libs` (>= 3.3.2; +# must match the delta-kernel.version in pom.xml). +# DELTA_KERNEL_VERSION=3.3.2 +# Override the auto-detected Databricks user name. +# USER_NAME=me@example.com diff --git a/scripts/databricks/.gitignore b/scripts/databricks/.gitignore new file mode 100644 index 00000000000..a8d37d17e76 --- /dev/null +++ b/scripts/databricks/.gitignore @@ -0,0 +1,3 @@ +# Local, user-specific configuration and state — never commit these. +.env +.cluster_id diff --git a/scripts/databricks/README.md b/scripts/databricks/README.md new file mode 100644 index 00000000000..04662775ff1 --- /dev/null +++ b/scripts/databricks/README.md @@ -0,0 +1,186 @@ + + +# Running SystemDS on Databricks + +Scripts and demo notebooks for deploying and running SystemDS on a Databricks +cluster. Tested against DBR 16.4 LTS (Spark 3.5.2 / Scala 2.12), where the +SystemDS jar runs unchanged. + +## Contents + +| File | Purpose | +| --- | --- | +| `deploy.sh` | Create a UC volume, upload `SystemDS.jar`, create a single-user cluster, install the Delta Kernel libraries, and import the demo notebooks. | +| `SystemDS_MLContext_Demo.scala` | Notebook: Unity Catalog round-trip using the SystemDS MLContext (Scala) API. Reads a table, runs a configurable DML script, writes the result back. 
| +| `SystemDS_Delta_E2E.scala` | Notebook: end-to-end Delta → linear regression on one Delta table. SystemDS reads it natively as a frame (`read(format="delta")`) → `transformencode` → `lm`; Spark ML reads the same table → `OneHotEncoder` → `LinearRegression`. Times read + encode + train for both. | +| `demo.dml` | Standalone DML smoke test: reads a matrix from storage, computes column sums and a Gram-matrix trace. | +| `.env.example` | Template for your local configuration. | + +## Prerequisites + +- The [Databricks CLI](https://docs.databricks.com/dev-tools/cli/) installed and + authenticated once interactively: + + ```bash + databricks auth login --profile + ``` + +- A built SystemDS jar at `/target/SystemDS.jar` + (`mvn -q -DskipTests package`), or set `JAR_LOCAL` to point at one. +- `python3` on your `PATH` (used to parse CLI JSON output). + +## Configuration + +All settings are read from environment variables. The easiest way is a `.env` +file: + +```bash +cp scripts/databricks/.env.example scripts/databricks/.env +# then edit scripts/databricks/.env +``` + +`deploy.sh` looks for a `.env` file by: + +1. `ENV_FILE=/abs/path/to/.env` if you set it explicitly, otherwise +2. searching upward from the script's own directory (script dir → repo root → + any parent directory) for the first `.env` it finds. + +Anything already exported in your shell overrides values from `.env`. + +| Variable | Default | Description | +| --- | --- | --- | +| `PROFILE` | `DEFAULT` | Databricks CLI profile. | +| `CATALOG` / `SCHEMA` / `VOLUME` | `main` / `default` / `systemds` | UC location for the jar volume (and notebook defaults). | +| `POLICY_ID` | _(empty)_ | Compute policy id; leave empty for none. | +| `SPARK_VERSION` | `16.4.x-scala2.12` | DBR runtime. | +| `NODE_TYPE` | `i3.xlarge` | Node type. | +| `NUM_WORKERS` | `0` | Worker count (0 = single node). | +| `AUTOTERMINATION_MINUTES` | `30` | Auto-terminate idle minutes. | +| `CLUSTER_NAME` | `systemds` | Cluster name. 
| +| `JAR_LOCAL` | `/target/SystemDS.jar` | Jar to upload. | +| `NB_DIR` | `/Users/` | Workspace folder to import notebooks into. | +| `NB_FILES` | _(the 2 demo notebooks)_ | Space-separated notebooks to import; language detected from extension. | +| `DELTA_KERNEL_VERSION` | `3.3.2` | Delta Kernel Maven library version installed by `deploy.sh libs` (>= 3.3.2; must match `pom.xml`). | +| `USER_NAME` | _(auto-detected)_ | Databricks user. | + +### Choosing a node type + +`NODE_TYPE` is a cloud instance type. The default `i3.xlarge` is a small, +storage-optimized AWS node (4 vCPU / 30.5 GiB RAM / 950 GB local NVMe SSD); the +fast local disk is handy because the notebook spills SystemDS scratch to +`/local_disk0`. With `NUM_WORKERS=0` this single node is both driver and +executor, so it bounds the total memory available to SystemDS. + +Some common AWS options (pick more cores/RAM for larger workloads): + +| Node type | vCPU | RAM | Local SSD | Notes | +| --- | --- | --- | --- | --- | +| `i3.xlarge` | 4 | 30.5 GiB | 950 GB | Default; storage-optimized. | +| `i3.2xlarge` | 8 | 61 GiB | 1900 GB | Same family, 2× bigger. | +| `i4i.xlarge` | 4 | 32 GiB | 937 GB | Newer gen, faster storage. | +| `m5d.xlarge` | 4 | 16 GiB | 150 GB | General purpose w/ local SSD. | +| `r5d.2xlarge` | 8 | 64 GiB | 300 GB | Memory-optimized w/ local SSD. | + +The exact set depends on your cloud (AWS / Azure / GCP) and workspace. List what +your workspace actually offers with: + +```bash +databricks clusters list-node-types -p "$PROFILE" -o json +``` + +References: +[AWS EC2 instance types](https://aws.amazon.com/ec2/instance-types/), +[Azure VM sizes](https://learn.microsoft.com/azure/virtual-machines/sizes), +[GCP machine families](https://cloud.google.com/compute/docs/machine-resource). 
+ +## Usage + +```bash +cd scripts/databricks +./deploy.sh upload # create UC volume + copy SystemDS.jar into it +./deploy.sh cluster # create the single-user cluster + install the jar +./deploy.sh libs # install the Delta Kernel Maven libraries on the cluster +./deploy.sh import # import the demo notebooks +./deploy.sh all # all of the above +``` + +The created cluster id is written to `scripts/databricks/.cluster_id`. + +### Delta Kernel libraries + +The Delta notebook (`SystemDS_Delta_E2E`) reads Delta tables natively through the +Spark-free Delta Kernel, which is not on the DBR classpath. `./deploy.sh libs` +installs `io.delta:delta-kernel-defaults` (version `DELTA_KERNEL_VERSION`, default +`3.3.2`) as a cluster Maven library. Use **>= 3.3.2**: earlier releases trip a +classloader conflict with DBR's bundled parquet. The version must match the +`delta-kernel.version` property in the SystemDS `pom.xml`. + +## Notebook configuration (`SystemDS_MLContext_Demo`) + +The Scala notebook is driven by widgets, so nothing is hardcoded — set them in +the notebook UI or pass them as job parameters: + +| Widget | Default | Description | +| --- | --- | --- | +| `catalog` / `schema` | `main` / `default` | Where the input/output tables live. | +| `input_table` / `output_table` | `systemds_input` / `systemds_output` | Table names. | +| `dml_path` | _(blank)_ | DML script to run. Blank uses the built-in z-score demo; otherwise a path readable from the driver (UC volume, `/Workspace`, or `dbfs:`). | +| `exec_type` | `DEFAULT` | SystemDS execution mode. `DEFAULT` lets SystemDS choose the plan; `DRIVER`, `SPARK`, or `DRIVER_AND_SPARK` force a mode. | + +Custom DML contract: the script receives the input matrix as `X` and must +produce a matrix `Y` and a scalar `checksum`. 
+ +## Notebook configuration (`SystemDS_Delta_E2E`) + +| Widget | Default | Description | +| --- | --- | --- | +| `catalog` / `schema` / `volume` | `main` / `default` / `systemds` | UC location; the Delta table is written under the volume. | +| `rows` | `1000000` | Rows in the generated Delta table. | +| `num_numeric` / `num_categorical` / `cardinality` | `100` / `20` / `30` | Feature shape. Defaults are deliberately encode-heavy (700 features) so the SystemDS-vs-Spark difference is visible. | +| `reg` | `1e-3` | L2 regularization for `lm`. | +| `recreate` | `true` | Rewrite the Delta table before running. | +| `statistics` | `true` | Print the SystemDS per-instruction breakdown. | + +The encode complexity (categoricals × cardinality), not the row count, drives the +gap: more categoricals blow up Spark's `StringIndexer` + `OneHotEncoder` (each a +shuffle stage), while SystemDS dummycodes in-memory. On a single node, raw rows +instead favor Spark, and very large tables can exhaust driver memory. + +Indicative single-node (`i3.xlarge`, 1M rows) numbers — single cold run, no warmup: + +| workload | Spark ML | SystemDS | speedup | +| --- | --- | --- | --- | +| `SystemDS_Delta_E2E`, 700 features (read + encode + train) | 116.6 s | 55.4 s | ~2.1× | + +The Spark side is the same speed on Spark 3.5.2 (DBR 16.4) and Spark 4.0.0 +(DBR 17.3 LTS), so the comparison is not an artifact of an old runtime. + +## Notes / gotchas baked into the scripts + +- UC clusters only accept JAR libraries from a **UC Volume** (not DBFS, not + `/Workspace`). +- The cluster must be **SINGLE_USER** (Assigned) mode; shared / USER_ISOLATION + blocks JAR libraries. +- SystemDS needs the Vector API module plus a full `--add-opens` set at JVM + launch (configured via `spark.{driver,executor}.extraJavaOptions`), and an + absolute scratch dir (the notebook pins `sysds.scratch`). + +`.env` and `.cluster_id` are git-ignored — they hold personal config and local +state and should never be committed. 
diff --git a/scripts/databricks/SystemDS_Delta_E2E.scala b/scripts/databricks/SystemDS_Delta_E2E.scala new file mode 100644 index 00000000000..cacc1c729f4 --- /dev/null +++ b/scripts/databricks/SystemDS_Delta_E2E.scala @@ -0,0 +1,236 @@ +// Databricks notebook source +//------------------------------------------------------------- +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +//------------------------------------------------------------- + +// End-to-end Delta -> linear regression, on the SAME Delta table for both engines: +// - SystemDS: read(format="delta") as a frame -> transformencode -> lm +// - Spark ML: spark.read delta -> StringIndexer + OneHotEncoder + LinearRegression +// +// SystemDS reads the Delta table natively through the Spark-free Delta Kernel +// (FrameReaderDelta), so the whole pipeline runs inside the SystemDS runtime. +// The timed region is end-to-end: read + encode + train, for both engines. +// +// Prereqs (handled by deploy.sh): +// - SystemDS.jar installed on the cluster, with the SystemDS JVM flags. +// - io.delta:delta-kernel-api + delta-kernel-defaults installed as Maven +// libraries (Delta Kernel is NOT on the DBR classpath and is NOT bundled in +// SystemDS.jar). 
+ +// COMMAND ---------- + +// On a single-node cluster SystemDS reads the whole table into driver memory, so +// both `rows` and the encoded width (num_numeric + num_categorical*cardinality) +// are bounded by the node size. On an i3.xlarge (~30 GB) ~10M rows OOMs in +// transformencode; scale up the node or lower these on OOM. +// +// What drives the SystemDS-vs-Spark gap is encoding complexity, not row count: +// more categoricals / higher cardinality blow up Spark's StringIndexer + +// OneHotEncoder (each a shuffle stage), while SystemDS dummycodes in-memory. +// Raw rows instead favor Spark (it parallelizes across cores; single-node CP +// does not). Defaults below are deliberately encode-heavy (700 features) so the +// difference is visible; the baseline 50/4/20 config is roughly a tie at 1M rows. +dbutils.widgets.text("catalog", "main", "Unity Catalog") +dbutils.widgets.text("schema", "default", "Schema") +dbutils.widgets.text("volume", "systemds", "Volume (table is written under it)") +dbutils.widgets.text("rows", "1000000", "Number of rows") +dbutils.widgets.text("num_numeric", "100", "Numeric feature columns") +dbutils.widgets.text("num_categorical", "20", "Categorical feature columns") +dbutils.widgets.text("cardinality", "30", "Distinct values per categorical") +dbutils.widgets.text("reg", "1e-3", "L2 regularization (lambda)") +dbutils.widgets.dropdown("recreate", "true", Seq("true", "false"), "Recreate the Delta table") +dbutils.widgets.dropdown("statistics", "true", Seq("true", "false"), "Print SystemDS statistics") + +val CATALOG = dbutils.widgets.get("catalog") +val SCHEMA = dbutils.widgets.get("schema") +val VOLUME = dbutils.widgets.get("volume") +val N = dbutils.widgets.get("rows").toLong +val DNUM = dbutils.widgets.get("num_numeric").toInt +val DCAT = dbutils.widgets.get("num_categorical").toInt +val CARD = dbutils.widgets.get("cardinality").toInt +val REG = dbutils.widgets.get("reg").toDouble +val RECREATE = 
dbutils.widgets.get("recreate").toBoolean +val STATS = dbutils.widgets.get("statistics").toBoolean + +val numCols = (0 until DNUM).map(i => s"num_$i").toArray +val catCols = (0 until DCAT).map(j => s"cat_$j").toArray + +// The table lives on a UC volume (FUSE-mounted locally at /Volumes/...). Spark +// reads it via the same path; SystemDS reads the local FUSE mount with an +// explicit file: scheme so the Delta Kernel's Hadoop engine uses the local +// filesystem rather than the cluster default (dbfs). +val tablePath = s"/Volumes/$CATALOG/$SCHEMA/$VOLUME/delta_e2e" +val sysdsPath = "file:" + tablePath +println(s"config: rows=$N numeric=$DNUM categorical=$DCAT cardinality=$CARD reg=$REG") +println(s"table : $tablePath") + +// COMMAND ---------- + +// MAGIC %md +// MAGIC ## Setup: materialize the dataset as a Delta table (once) + +// COMMAND ---------- + +import org.apache.spark.sql.functions._ + +// Deterministic linear weights so the target is actually learnable. +val weights = (0 until DNUM).map(i => ((i % 7) - 3) * 0.5) + +def writeDeltaTable(): Unit = { + var df = spark.range(0, N).toDF("id") + for (i <- 0 until DNUM) + df = df.withColumn(s"num_$i", rand(i.toLong) * 2.0 - 1.0) + for (j <- 0 until DCAT) + df = df.withColumn(s"cat_$j", (floor(rand(100L + j) * CARD) + 1).cast("string")) + val signal = (0 until DNUM).map(i => col(s"num_$i") * lit(weights(i))).reduce(_ + _) + // Column order written to Delta is [numeric.., categorical.., y]; SystemDS + // relies on this order for the transform spec and target column. + val out = df.withColumn("y", signal + (rand(999L) * 0.2 - 0.1)).drop("id") + .select((numCols ++ catCols ++ Array("y")).map(col): _*) + // overwriteSchema so re-running with a different feature count replaces an + // existing table whose schema no longer matches. 
+ out.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(tablePath) +} + +val exists = try { dbutils.fs.ls(tablePath); true } catch { case _: Throwable => false } +if (RECREATE || !exists) { + println(">> writing Delta table ...") + writeDeltaTable() +} +val tblRows = spark.read.format("delta").load(tablePath).count() +println(s">> Delta table ready: $tblRows rows at $tablePath") + +// COMMAND ---------- + +// MAGIC %md +// MAGIC ## SystemDS: read Delta (native Kernel) -> transformencode -> lm + +// COMMAND ---------- + +import org.apache.sysds.api.mlcontext._ +import org.apache.sysds.api.mlcontext.ScriptFactory._ + +val ml = new MLContext(sc) +// With statistics on, SystemDS prints a per-instruction breakdown (heavy hitters) +// after execute: the Delta read shows up as cache acquire-read (ACQr) time, plus +// transformencode and the lm operators (m_lm/tsmm/solve). Useful to see where the +// end-to-end time actually goes. +ml.setStatistics(STATS) +ml.setStatisticsMaxHeavyHitters(25) +ml.setConfigProperty("sysds.scratch", "/tmp/systemds_scratch") +ml.setConfigProperty("sysds.localtmpdir", "/local_disk0/tmp/systemds") +// Force single-node (CP) execution. The native Delta frame reader +// (FrameReaderDelta) is a control-program reader; under Spark execution the +// frame read is distributed and bypasses it (failing to parse the Delta parquet +// files). On a single-node cluster CP execution is the intended mode anyway. +ml.setExecutionType(MLContext.ExecutionType.DRIVER) + +// transform spec: one-hot (dummycode) the categorical columns. Delta column +// order is [numeric.., categorical.., y]; categoricals are 1-based indices +// DNUM+1 .. DNUM+DCAT. Numeric features and y pass through unchanged. +val catIdx = (DNUM + 1 to DNUM + DCAT).mkString(",") +val spec = s"""{"ids":true,"dummycode":[$catIdx]}""" + +// Whole pipeline in one script: native Delta frame read -> encode -> train. 
+val e2eDml = """ + F = read($path, data_type="frame", format="delta") + [X, M] = transformencode(target=F, spec=spec) + nc = ncol(X) + yv = X[, nc] + Xv = X[, 1:(nc-1)] + B = lm(X=Xv, y=yv, icpt=1, reg=reg, verbose=FALSE) + checksum = sum(B) + nfeat = ncol(Xv) + nrows = nrow(X) +""" + +def runSysds(path: String): (Double, Int, Long, Double) = { + val script = dml(e2eDml) + .in("$path", path).in("spec", spec).in("reg", REG) + .out("checksum", "nfeat", "nrows") + val t0 = System.nanoTime() + val res = ml.execute(script) + val secs = (System.nanoTime() - t0) / 1e9 + (secs, res.getDouble("nfeat").toInt, res.getDouble("nrows").toLong, res.getDouble("checksum")) +} + +val (sysdsSecs, sysdsFeat, sysdsRows, sysdsChk) = runSysds(sysdsPath) +println(f"SystemDS read+encode+train: $sysdsSecs%.2f s | rows=$sysdsRows features=$sysdsFeat checksum=$sysdsChk%.4f") + +// COMMAND ---------- + +// MAGIC %md +// MAGIC ## Spark ML: read Delta -> OneHotEncoder -> LinearRegression + +// COMMAND ---------- + +import org.apache.spark.ml.Pipeline +import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler} +import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel} + +def sparkPipeline(): Pipeline = { + val indexers = catCols.map(c => + new StringIndexer().setInputCol(c).setOutputCol(c + "_idx").setHandleInvalid("keep")) + val ohe = new OneHotEncoder() + .setInputCols(catCols.map(_ + "_idx")).setOutputCols(catCols.map(_ + "_oh")) + .setDropLast(false) // keep all categories, matching SystemDS dummycode + val assembler = new VectorAssembler() + .setInputCols(numCols ++ catCols.map(_ + "_oh")).setOutputCol("features") + val lr = new LinearRegression() + .setLabelCol("y").setFeaturesCol("features") + .setRegParam(REG).setElasticNetParam(0.0).setFitIntercept(true) + new Pipeline().setStages(indexers ++ Array(ohe, assembler, lr)) +} + +// End-to-end: time the lazy Delta read + encode + train together (the read is +// triggered inside pipeline.fit), 
symmetric to the SystemDS execute above. +def runSpark(path: String): (Double, Int, Double) = { + val t0 = System.nanoTime() + val df = spark.read.format("delta").load(path) + val model = sparkPipeline().fit(df) + val secs = (System.nanoTime() - t0) / 1e9 + val lr = model.stages.last.asInstanceOf[LinearRegressionModel] + (secs, lr.numFeatures, lr.intercept) +} + +val (sparkSecs, sparkFeat, sparkIntercept) = runSpark(tablePath) +println(f"Spark ML read+encode+train: $sparkSecs%.2f s | features=$sparkFeat intercept=$sparkIntercept%.4f") + +// COMMAND ---------- + +// MAGIC %md +// MAGIC ## Result + +// COMMAND ---------- + +val speedup = sparkSecs / sysdsSecs +println(f""" +=== E2E Delta -> linear regression (read + encode + train) === +dataset : rows=$N numeric=$DNUM categorical=$DCAT cardinality=$CARD +table : $tablePath +Spark ML : $sparkSecs%6.2f s (features=$sparkFeat) +SystemDS : $sysdsSecs%6.2f s (features=$sysdsFeat, rows read=$sysdsRows) +Speedup : $speedup%6.2fx (Spark / SystemDS) +""") + +dbutils.notebook.exit( + f"rows=$N numeric=$DNUM categorical=$DCAT card=$CARD " + + f"spark_s=$sparkSecs%.2f sysds_s=$sysdsSecs%.2f speedup=$speedup%.2f " + + f"sysds_features=$sysdsFeat sysds_rows=$sysdsRows") diff --git a/scripts/databricks/SystemDS_MLContext_Demo.scala b/scripts/databricks/SystemDS_MLContext_Demo.scala new file mode 100644 index 00000000000..26a237be491 --- /dev/null +++ b/scripts/databricks/SystemDS_MLContext_Demo.scala @@ -0,0 +1,153 @@ +// Databricks notebook source +//------------------------------------------------------------- +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +//------------------------------------------------------------- + +// SystemDS on Databricks: Unity Catalog round-trip +// +// Reads a table from Unity Catalog into SystemDS, runs a DML script over it, +// and writes the result back as a UC table. Prereq: `SystemDS.jar` installed +// on the cluster (DBR 16.4 LTS, Spark 3.5.2 / Scala 2.12) with the SystemDS +// JVM flags. +// +// Everything is configured via the notebook widgets (first cell) so this works +// in any workspace: +// - catalog / schema / input_table / output_table: where data is read/written +// - dml_path: path to a DML script to run (blank = built-in z-score demo) +// - exec_type: SystemDS execution mode (DEFAULT = let SystemDS decide) +// +// The DML script contract: it receives the input matrix as `X` and must +// produce a matrix `Y` and a scalar `checksum`. + +// COMMAND ---------- + +// Widgets make the notebook portable: set these per workspace instead of +// hardcoding values. The table location defaults to main.default. +dbutils.widgets.text("catalog", "main", "Unity Catalog") +dbutils.widgets.text("schema", "default", "Schema") +dbutils.widgets.text("input_table", "systemds_input", "Input table name") +dbutils.widgets.text("output_table", "systemds_output", "Output table name") +// DML script to run. Blank uses the built-in z-score demo below. Otherwise a +// path readable from the cluster driver, e.g. 
a UC volume, /Workspace, or dbfs: +// /Volumes////my_script.dml +dbutils.widgets.text("dml_path", "", "DML script path (blank = built-in demo)") +// SystemDS execution mode. DEFAULT lets SystemDS choose (no forcing). +dbutils.widgets.dropdown("exec_type", "DEFAULT", + Seq("DEFAULT", "DRIVER", "SPARK", "DRIVER_AND_SPARK"), "Execution type") + +val catalog = dbutils.widgets.get("catalog") +val schema = dbutils.widgets.get("schema") +val INPUT_TABLE = s"$catalog.$schema.${dbutils.widgets.get("input_table")}" +val OUTPUT_TABLE = s"$catalog.$schema.${dbutils.widgets.get("output_table")}" +val DML_PATH = dbutils.widgets.get("dml_path") +val EXEC_TYPE = dbutils.widgets.get("exec_type") + +// COMMAND ---------- + +import org.apache.sysds.api.mlcontext._ +import org.apache.sysds.api.mlcontext.ScriptFactory._ +import org.apache.sysds.utils.Statistics +import org.apache.spark.sql.functions._ + +val ml = new MLContext(sc) +ml.setStatistics(true) +// Only override the execution mode when explicitly requested; DEFAULT leaves +// SystemDS to pick the plan (it will use Spark only when it decides to). +if (EXEC_TYPE != "DEFAULT") + ml.setExecutionType(MLContext.ExecutionType.valueOf(EXEC_TYPE)) +// SystemDS defaults to a relative scratch dir, which the Databricks default +// filesystem rejects ("Path must be absolute"). Pin both to absolute paths. 
+ml.setConfigProperty("sysds.scratch", "/tmp/systemds_scratch") +ml.setConfigProperty("sysds.localtmpdir", "/local_disk0/tmp/systemds") +println("Spark version: " + sc.version + " | exec_type: " + EXEC_TYPE) + +// COMMAND ---------- + +// MAGIC %md +// MAGIC ## Setup: ensure an input table exists in the catalog + +// COMMAND ---------- + +if (!spark.catalog.tableExists(INPUT_TABLE)) { + val seed = spark.range(0, 5000).select( + (rand(1) * 100).as("f1"), + (rand(2) * 10 + 5).as("f2"), + (rand(3) - 0.5).as("f3"), + (rand(4) * 1000).as("f4")) + seed.write.mode("overwrite").saveAsTable(INPUT_TABLE) +} +println(s"input table $INPUT_TABLE rows = ${spark.table(INPUT_TABLE).count()}") + +// COMMAND ---------- + +// MAGIC %md +// MAGIC ## 1. Read a table from the catalog into SystemDS + +// COMMAND ---------- + +val inDF = spark.table(INPUT_TABLE) + .select(col("f1").cast("double"), col("f2").cast("double"), + col("f3").cast("double"), col("f4").cast("double")) + +// Built-in fallback: standardize the columns (z-score). Used when no dml_path +// widget is set. A custom script must read `X` and produce `Y` and `checksum`. +val defaultScript = """ + n = nrow(X) + mu = colMeans(X) + Xc = X - mu + variance = colSums(Xc^2) / (n - 1) + sigma = sqrt(variance) + Y = Xc / sigma + checksum = sum(Y) + print("standardized " + nrow(X) + " x " + ncol(X) + " matrix") +""" + +val baseScript = if (DML_PATH.nonEmpty) { + println(s"running DML from $DML_PATH") + dmlFromFile(DML_PATH) +} else { + println("running built-in z-score demo script") + dml(defaultScript) +} +val script = baseScript.in("X", inDF).out("Y", "checksum") + +val res = ml.execute(script) +val checksum = res.getDouble("checksum") +val outDF = res.getDataFrameDoubleNoIDColumn("Y") +println(s"checksum(Y) = $checksum (≈0 for standardized data)") + +// COMMAND ---------- + +// MAGIC %md +// MAGIC ## 2. 
Write the SystemDS result back to the catalog + +// COMMAND ---------- + +outDF.write.mode("overwrite").saveAsTable(OUTPUT_TABLE) +val outRows = spark.table(OUTPUT_TABLE).count() +val outCols = spark.table(OUTPUT_TABLE).columns.length +println(s"wrote $OUTPUT_TABLE: $outRows rows x $outCols cols") + +// COMMAND ---------- + +val spExecuted = Statistics.getNoOfExecutedSPInst() +dbutils.notebook.exit( + s"spark=${sc.version} in=$INPUT_TABLE out=$OUTPUT_TABLE " + + s"out_rows=$outRows out_cols=$outCols checksum=$checksum sp_executed=$spExecuted") diff --git a/scripts/databricks/demo.dml b/scripts/databricks/demo.dml new file mode 100644 index 00000000000..c6195bbf3d4 --- /dev/null +++ b/scripts/databricks/demo.dml @@ -0,0 +1,50 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# SystemDS on Databricks smoke test. +# Reads a matrix from storage and computes column sums and a Gram-matrix trace, +# so the read path plus a few distributed-friendly operations are exercised. 
+# +# Args (all optional, with defaults): +# -nvargs in= fmt= out= +# +# `in` is any path readable from the cluster driver (a UC volume, /Workspace, +# or dbfs:) and `fmt` any SystemDS-supported matrix format (csv, binary, +# libsvm, mm, ...). Example: +# in=/Volumes////demo_input.csv fmt=csv + +in = ifdef($in, "demo_input.csv") +fmt = ifdef($fmt, "csv") +out = ifdef($out, "demo_result.txt") + +X = read(in, format=fmt) + +# Column sums and a Gram-matrix trace: both push work through Spark +# instructions for large inputs. +colSums = colSums(X) +gramTrace = sum(X * X) + +s = sum(colSums) + gramTrace + +print("rows=" + nrow(X) + " cols=" + ncol(X)) +print("result=" + s) + +write(s, out, format="text") diff --git a/scripts/databricks/deploy.sh b/scripts/databricks/deploy.sh new file mode 100755 index 00000000000..a144c6d46c1 --- /dev/null +++ b/scripts/databricks/deploy.sh @@ -0,0 +1,257 @@ +#!/usr/bin/env bash +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Deploy + run SystemDS on a Databricks cluster. 
+# +# Tested against DBR 16.4 LTS (Spark 3.5.2 / Scala 2.12), where the SystemDS +# jar runs unchanged. +# +# Quick start: +# 1. Copy scripts/databricks/.env.example to .env and edit it (or place the +# .env at the root of your workspace; this script searches parent dirs). +# 2. Authenticate the Databricks CLI once, interactively: +# databricks auth login --profile <profile> +# 3. Build the SystemDS jar (mvn -q -DskipTests package) so target/SystemDS.jar +# exists, or point JAR_LOCAL at an existing jar. +# 4. Run a step: +# ./deploy.sh upload # create UC volume + copy SystemDS.jar into it +# ./deploy.sh cluster # create single-user cluster + install SystemDS.jar +# ./deploy.sh libs # install Delta Kernel Maven libraries on cluster +# ./deploy.sh import # import the demo notebook(s) +# ./deploy.sh all # upload + cluster + libs + import +# +# All configuration is read from environment variables (see .env.example). +# Anything already exported in your shell overrides the .env file. +# +# Hard-won requirements baked in below: +# - Cluster creation requires a compute policy (e.g. Personal Compute) that +# restricts node types and autotermination. +# - UC clusters only accept JAR libraries from a UC Volume (not DBFS, not +# /Workspace). +# - Must be SINGLE_USER (Assigned) mode; shared/USER_ISOLATION blocks JAR libs. +# - SystemDS needs the Vector API module + a full --add-opens set at JVM +# launch, and an absolute scratch dir (the notebook sets sysds.scratch). + +set -euo pipefail + +HERE="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +#------------------------------------------------------------- +# Load configuration from a .env file. +# Resolution order: +# 1. $ENV_FILE if explicitly set. +# 2. The first .env found walking up from this script's directory +# (script dir -> repo root -> workspace root -> ...; / itself is not checked).
#-------------------------------------------------------------

# Print the path of the .env file to load and return 0, or return 1 when there
# is none. An explicit $ENV_FILE always wins; otherwise walk upward from the
# script directory ($HERE) and stop at the first directory containing a .env.
# The filesystem root itself is not inspected.
find_env_file() {
  if [[ -n "${ENV_FILE:-}" ]]; then
    printf '%s\n' "$ENV_FILE"
    return 0
  fi
  local dir="${HERE:-}" parent
  while [[ -n "$dir" && "$dir" != "/" ]]; do
    if [[ -f "$dir/.env" ]]; then
      printf '%s\n' "$dir/.env"
      return 0
    fi
    parent="$(dirname "$dir")"
    # dirname of a relative path bottoms out at "."; stop instead of looping
    # forever on a directory that is its own parent.
    [[ "$parent" != "$dir" ]] || break
    dir="$parent"
  done
  return 1
}

# Source the env file given as $1 with auto-export, then re-apply every
# variable that was already exported before sourcing, so values from the
# calling shell take precedence over the file (as documented in .env.example).
# Returns non-zero when the file is unreadable or sourcing it fails.
# Locals carry a _lef_ prefix because assignments made by the sourced file
# would otherwise land in same-named function locals instead of globals.
load_env_file() {
  local _lef_file="$1" _lef_name _lef_i _lef_rc=0
  local -a _lef_names=() _lef_values=()

  if [[ ! -f "$_lef_file" || ! -r "$_lef_file" ]]; then
    printf '!! cannot read env file: %s\n' "$_lef_file" >&2
    return 1
  fi

  # Snapshot the exported variables that currently hold a value.
  while IFS= read -r _lef_name; do
    case "$_lef_name" in
      # Maintained by the shell itself; restoring them would be wrong.
      _ | PWD | OLDPWD) continue ;;
    esac
    [[ -n "${!_lef_name+x}" ]] || continue
    _lef_names+=("$_lef_name")
    _lef_values+=("${!_lef_name}")
  done < <(compgen -e)

  set -a
  # shellcheck disable=SC1090
  source "$_lef_file" || _lef_rc=$?
  set +a
  if [[ "$_lef_rc" -ne 0 ]]; then
    printf '!! failed to load env file: %s\n' "$_lef_file" >&2
    return "$_lef_rc"
  fi

  # Put back whatever the file overrode. Untouched variables are skipped,
  # which also avoids re-assigning read-only ones.
  for ((_lef_i = 0; _lef_i < ${#_lef_names[@]}; _lef_i++)); do
    _lef_name="${_lef_names[_lef_i]}"
    if [[ "${!_lef_name-}" != "${_lef_values[_lef_i]}" ]]; then
      export "$_lef_name=${_lef_values[_lef_i]}"
    fi
  done
  return 0
}

if ENV_PATH="$(find_env_file)"; then
  echo ">> loading config from $ENV_PATH"
  load_env_file "$ENV_PATH"
else
  echo ">> no .env found; relying on exported environment variables / defaults"
fi

#-------------------------------------------------------------
# Configuration (env var with sensible fallback).
#-------------------------------------------------------------
PROFILE="${PROFILE:-DEFAULT}"

# Repo root is used to locate the default jar (target/SystemDS.jar).
REPO_ROOT="${REPO_ROOT:-$(git -C "$HERE" rev-parse --show-toplevel 2>/dev/null || echo "$HERE/../..")}"
JAR_LOCAL="${JAR_LOCAL:-$REPO_ROOT/target/SystemDS.jar}"

# Run the Databricks CLI against the configured profile.
db() { databricks -p "$PROFILE" "$@"; }

# Resolve the current user lazily (only needed for the cluster owner and the
# notebook import target). Keeps a USER_NAME that is already set; otherwise
# asks the CLI and fails if the answer is empty.
resolve_user() {
  [[ -z "${USER_NAME:-}" ]] || return 0
  USER_NAME="$(db current-user me -o json \
    | python3 -c 'import sys,json;print(json.load(sys.stdin)["userName"])')"
  if [[ -z "$USER_NAME" ]]; then
    echo "!! could not resolve the Databricks user name (set USER_NAME)" >&2
    return 1
  fi
}

CATALOG="${CATALOG:-main}"
SCHEMA="${SCHEMA:-default}"
VOLUME="${VOLUME:-systemds}"
VOL_DIR="/Volumes/$CATALOG/$SCHEMA/$VOLUME"
JAR_REMOTE="$VOL_DIR/SystemDS.jar"

POLICY_ID="${POLICY_ID:-}"
SPARK_VERSION="${SPARK_VERSION:-16.4.x-scala2.12}"
NODE_TYPE="${NODE_TYPE:-i3.xlarge}"
NUM_WORKERS="${NUM_WORKERS:-0}"
AUTOTERMINATION_MINUTES="${AUTOTERMINATION_MINUTES:-30}"
CLUSTER_NAME="${CLUSTER_NAME:-systemds}"
CLUSTER_ID_FILE="${CLUSTER_ID_FILE:-$HERE/.cluster_id}"

# Delta Kernel is not on the DBR classpath and is not bundled in SystemDS.jar
# (the uber jar shades only wink + antlr).
# The native Delta read/write path needs
# delta-kernel-api + delta-kernel-defaults installed as cluster Maven libraries.
# Use >= 3.3.2: earlier releases (3.3.0/3.3.1) subclass parquet-mr's
# package-private InternalParquetRecordReader, which breaks across Databricks'
# library/app classloaders (IllegalAccessError, then NoSuchMethodError against
# DBR's parquet). 3.3.2 (delta PR #4494) switched to parquet's public
# ParquetReader API, so Kernel works with DBR's own parquet. 4.x targets Spark 4
# (wrong for DBR 16.4 = Spark 3.5). Must match the delta-kernel.version in pom.xml.
DELTA_KERNEL_VERSION="${DELTA_KERNEL_VERSION:-3.3.2}"

# SystemDS JVM flags (mirror of the JVM arguments configured in pom.xml).
JVMOPTS="${JVMOPTS:---add-modules=jdk.incubator.vector \
--add-opens=java.base/java.nio=ALL-UNNAMED \
--add-opens=java.base/java.io=ALL-UNNAMED \
--add-opens=java.base/java.util=ALL-UNNAMED \
--add-opens=java.base/java.lang=ALL-UNNAMED \
--add-opens=java.base/java.lang.ref=ALL-UNNAMED \
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED \
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED}"

# Notebooks to import (space-separated, relative to this dir). Language is
# detected from the extension (.scala -> SCALA, .py -> PYTHON).
NB_FILES="${NB_FILES:-SystemDS_MLContext_Demo.scala SystemDS_Delta_E2E.scala}"

#-------------------------------------------------------------
# Helpers.
#-------------------------------------------------------------

# Print a fatal message on stderr and abort the script.
die() {
  printf '!! %s\n' "$*" >&2
  exit 1
}

# Print $1 escaped for use between double quotes in a JSON document
# (backslash, double quote, newline, carriage return, tab).
json_escape() {
  local s="$1"
  s=${s//\\/\\\\}
  s=${s//\"/\\\"}
  s=${s//$'\n'/\\n}
  s=${s//$'\r'/\\r}
  s=${s//$'\t'/\\t}
  printf '%s' "$s"
}

# Print the JSON request body for "clusters create". Expects USER_NAME to be
# resolved and NUM_WORKERS / AUTOTERMINATION_MINUTES to be validated integers.
cluster_payload() {
  local policy_json="" single_node_json="" jvmopts_json
  if [[ -n "$POLICY_ID" ]]; then
    policy_json="\"policy_id\": \"$(json_escape "$POLICY_ID")\", \"apply_policy_default_values\": true,"
  fi
  # Local-mode Spark settings only make sense without workers; forcing
  # spark.master=local[*] on a multi-node cluster would leave the workers idle.
  if [[ "$NUM_WORKERS" == "0" ]]; then
    single_node_json='"spark.databricks.cluster.profile": "singleNode", "spark.master": "local[*]",'
  fi
  jvmopts_json="$(json_escape "$JVMOPTS")"
  cat <<EOF
{
  "cluster_name": "$(json_escape "$CLUSTER_NAME")",
  ${policy_json}
  "spark_version": "$(json_escape "$SPARK_VERSION")",
  "node_type_id": "$(json_escape "$NODE_TYPE")",
  "num_workers": ${NUM_WORKERS},
  "autotermination_minutes": ${AUTOTERMINATION_MINUTES},
  "data_security_mode": "SINGLE_USER",
  "single_user_name": "$(json_escape "$USER_NAME")",
  "spark_conf": {
    ${single_node_json}
    "spark.driver.extraJavaOptions": "${jvmopts_json}",
    "spark.executor.extraJavaOptions": "${jvmopts_json}"
  }
}
EOF
}

#-------------------------------------------------------------
# Steps.
#-------------------------------------------------------------

# Ensure the UC volume exists and copy the local SystemDS jar into it.
upload() {
  [[ -f "$JAR_LOCAL" ]] || die "jar not found: $JAR_LOCAL (build it or set JAR_LOCAL)"
  echo ">> ensuring UC volume $CATALOG.$SCHEMA.$VOLUME"
  # Deliberately best effort: the create call is allowed to fail (for example
  # when the volume is already there). If the volume is really unusable, the
  # copy below fails and aborts the script.
  db volumes create "$CATALOG" "$SCHEMA" "$VOLUME" MANAGED 2>/dev/null || true
  echo ">> uploading $JAR_LOCAL -> $JAR_REMOTE"
  db fs cp --overwrite "$JAR_LOCAL" "dbfs:$JAR_REMOTE"
}

# Create the single-user cluster, remember its id in $CLUSTER_ID_FILE and queue
# the SystemDS jar for installation. Sets the global CID.
cluster() {
  resolve_user
  # Both values are emitted as bare JSON numbers, so reject anything else.
  local int_re='^(0|[1-9][0-9]*)$'
  [[ "$NUM_WORKERS" =~ $int_re ]] \
    || die "NUM_WORKERS must be a non-negative integer, got: $NUM_WORKERS"
  [[ "$AUTOTERMINATION_MINUTES" =~ $int_re ]] \
    || die "AUTOTERMINATION_MINUTES must be a non-negative integer, got: $AUTOTERMINATION_MINUTES"

  echo ">> creating cluster $CLUSTER_NAME ($SPARK_VERSION, $NODE_TYPE, single-user)"
  CID="$(db clusters create --no-wait -o json --json "$(cluster_payload)" \
    | python3 -c 'import sys,json;print(json.load(sys.stdin)["cluster_id"])')"
  [[ -n "$CID" ]] || die "cluster creation returned no cluster_id"
  echo "cluster_id=$CID"
  printf '%s\n' "$CID" > "$CLUSTER_ID_FILE"

  echo ">> installing library $JAR_REMOTE (queued; installs once RUNNING)"
  db libraries install --json \
    "{\"cluster_id\":\"$(json_escape "$CID")\",\"libraries\":[{\"jar\":\"$(json_escape "$JAR_REMOTE")\"}]}"
}

# Resolve the target cluster id into CLUSTER_ID: keep an explicit CLUSTER_ID,
# else prefer the one written by cluster(), else look it up by name.
resolve_cluster_id() {
  if [[ -n "${CLUSTER_ID:-}" ]]; then
    return 0
  fi
  if [[ -f "$CLUSTER_ID_FILE" ]]; then
    # First token only; an empty or blank file falls through to the lookup.
    read -r CLUSTER_ID _ < "$CLUSTER_ID_FILE" || true
    [[ -z "${CLUSTER_ID:-}" ]] || return 0
  fi
  # The name is passed as an argument rather than spliced into the Python
  # source, so quotes in CLUSTER_NAME cannot break or alter the program.
  CLUSTER_ID="$(db clusters list -o json \
    | python3 -c '
import json, sys
wanted = sys.argv[1]
matches = [c for c in json.load(sys.stdin) if c.get("cluster_name") == wanted]
print(matches[0]["cluster_id"] if matches else "")
' "$CLUSTER_NAME")"
  [[ -n "$CLUSTER_ID" ]] \
    || die "could not resolve cluster id for $CLUSTER_NAME (run ./deploy.sh cluster first or set CLUSTER_ID)"
}

# Install the Delta Kernel Maven libraries on the cluster. delta-kernel-defaults
# pulls delta-kernel-api transitively; both come from Maven Central. parquet,
# hadoop and jackson are excluded so Kernel uses DBR's own copies (avoids
# duplicate-class / split-package issues across the library/app classloaders).
libs() {
  resolve_cluster_id
  echo ">> installing Delta Kernel $DELTA_KERNEL_VERSION Maven libs on $CLUSTER_ID"
  db libraries install --json "{
    \"cluster_id\": \"$(json_escape "$CLUSTER_ID")\",
    \"libraries\": [
      {\"maven\": {
        \"coordinates\": \"io.delta:delta-kernel-defaults:$(json_escape "$DELTA_KERNEL_VERSION")\",
        \"exclusions\": [
          \"org.apache.parquet:parquet-hadoop\",
          \"org.apache.hadoop:hadoop-client-runtime\",
          \"org.apache.hadoop:hadoop-client-api\",
          \"com.fasterxml.jackson.core:jackson-databind\"
        ]
      }}
    ]
  }"
  # Print a command the user can actually run; "db" only exists in this script.
  echo ">> queued; check status with: databricks -p $PROFILE libraries cluster-status $CLUSTER_ID"
}

# Import every notebook listed in NB_FILES (relative to $HERE) into the
# workspace folder NB_DIR (default /Users/<user name>). Files with an unknown
# extension are skipped; a listed file that does not exist aborts the script.
import() {
  resolve_user
  local dir="${NB_DIR:-/Users/$USER_NAME}"
  local nb lang base
  local -a notebooks=()
  # Split on whitespace (newlines included) without pathname expansion.
  # read hits end-of-input before its NUL delimiter, hence the "|| true".
  read -r -d '' -a notebooks <<< "$NB_FILES" || true
  if ((${#notebooks[@]} == 0)); then
    echo "!! NB_FILES is empty; nothing to import" >&2
    return 0
  fi
  for nb in "${notebooks[@]}"; do
    case "$nb" in
      *.scala) lang="SCALA" ;;
      *.py) lang="PYTHON" ;;
      *.sql) lang="SQL" ;;
      *.r | *.R) lang="R" ;;
      *)
        echo "!! skipping $nb (unknown notebook language)" >&2
        continue
        ;;
    esac
    [[ -f "$HERE/$nb" ]] || die "notebook not found: $HERE/$nb"
    base="$(basename "${nb%.*}")"
    echo ">> importing $nb -> $dir/$base ($lang)"
    db workspace import --overwrite --language "$lang" --format SOURCE \
      --file "$HERE/$nb" "$dir/$base"
  done
}

case "${1:-all}" in
  upload) upload ;;
  cluster) cluster ;;
  libs) libs ;;
  import) import ;;
  all)
    upload
    cluster
    libs
    import
    ;;
  *)
    echo "usage: $0 {upload|cluster|libs|import|all}" >&2
    exit 1
    ;;
esac