From 92fc3c84277fd3823733fe6cd5504d627270ea48 Mon Sep 17 00:00:00 2001
From: Sebastian Baunsgaard
Date: Fri, 26 Jun 2026 10:06:36 +0000
Subject: [PATCH 1/2] [SYSTEMDS-3949] Add native Delta Lake matrix read/write
via Delta Kernel
Introduce a DELTA file format that reads and writes Delta Lake tables
natively through the Spark-free Delta Kernel library, for matrices on the
single-node CP path. DML read/write with format="delta" now operates
directly on Delta tables without a Spark DataFrame round-trip.
- Add FileFormat.DELTA and exclude it from the text formats
- Accept format="delta" with unknown dimensions in the parser and set
blocksize -1 for the columnar format
- Wire DELTA into the matrix reader and writer factories
- Add DeltaKernelUtils plus serial and parallel native Delta readers and
WriterDelta with column-at-a-time, boxing-free data transfer
- Expose Delta reader batch size and writer target file size via DMLConfig
- Refresh cached matrix metadata after a Delta read (discovered dimensions)
- Add a parquet.version property and pin delta-kernel 3.3.2
- Run Delta component IO tests in CI and broaden matrix coverage
Append/overwrite table semantics, distributed execution, frames, and time
travel are out of scope.
---
.github/workflows/javaTests.yml | 2 +-
pom.xml | 33 +
.../java/org/apache/sysds/common/Types.java | 3 +-
.../sysds/conf/ConfigurationManager.java | 10 +
.../java/org/apache/sysds/conf/DMLConfig.java | 12 +
.../apache/sysds/parser/DMLTranslator.java | 3 +-
.../apache/sysds/parser/DataExpression.java | 13 +-
.../controlprogram/caching/MatrixObject.java | 3 +-
.../sysds/runtime/io/DeltaKernelUtils.java | 431 +++++++++++++
.../sysds/runtime/io/MatrixReaderFactory.java | 11 +-
.../sysds/runtime/io/MatrixWriterFactory.java | 3 +
.../apache/sysds/runtime/io/ReaderDelta.java | 191 ++++++
.../sysds/runtime/io/ReaderDeltaParallel.java | 211 +++++++
.../apache/sysds/runtime/io/WriterDelta.java | 218 +++++++
.../io/DeltaMatrixReadWriteTest.java | 595 ++++++++++++++++++
.../io/delta/DeltaReadWriteTest.java | 130 ++++
.../functions/io/delta/DeltaReadCompare.dml | 34 +
.../scripts/functions/io/delta/DeltaWrite.dml | 30 +
18 files changed, 1924 insertions(+), 9 deletions(-)
create mode 100644 src/main/java/org/apache/sysds/runtime/io/DeltaKernelUtils.java
create mode 100644 src/main/java/org/apache/sysds/runtime/io/ReaderDelta.java
create mode 100644 src/main/java/org/apache/sysds/runtime/io/ReaderDeltaParallel.java
create mode 100644 src/main/java/org/apache/sysds/runtime/io/WriterDelta.java
create mode 100644 src/test/java/org/apache/sysds/test/component/io/DeltaMatrixReadWriteTest.java
create mode 100644 src/test/java/org/apache/sysds/test/functions/io/delta/DeltaReadWriteTest.java
create mode 100644 src/test/scripts/functions/io/delta/DeltaReadCompare.dml
create mode 100644 src/test/scripts/functions/io/delta/DeltaWrite.dml
diff --git a/.github/workflows/javaTests.yml b/.github/workflows/javaTests.yml
index 747d8930cd3..0878c5d12ba 100644
--- a/.github/workflows/javaTests.yml
+++ b/.github/workflows/javaTests.yml
@@ -60,7 +60,7 @@ jobs:
"org.apache.sysds.test.applications.**",
"**.test.usertest.**",
"**.component.c**.** -Dtest-threadCount=1 -Dtest-forkCount=1",
- "**.component.e**.**,**.component.f**.**,**.component.m**.**,**.component.o**.**",
+ "**.component.e**.**,**.component.f**.**,**.component.i**.**,**.component.m**.**,**.component.o**.**",
"**.component.p**.**,**.component.r**.**,**.component.s**.**,**.component.t**.**,**.component.u**.**",
"**.functions.a**.**,**.functions.binary.matrix.**,**.functions.binary.scalar.**,**.functions.binary.tensor.**",
"**.functions.blocks.**,**.functions.data.rand.**,",
diff --git a/pom.xml b/pom.xml
index cfd3d8464fb..44454361318 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,8 @@
2.15.42.12.182.12
+ 3.3.2
+ 1.13.1yyyy-MM-dd HH:mm:ss z1false
@@ -968,6 +970,37 @@
+
+
+ io.delta
+ delta-kernel-api
+ ${delta-kernel.version}
+
+
+ io.delta
+ delta-kernel-defaults
+ ${delta-kernel.version}
+
+
+
+ org.apache.parquet
+ parquet-hadoop
+ ${parquet.version}
+
+
+ org.apache.parquet
+ parquet-column
+ ${parquet.version}
+
+
+ org.apache.parquet
+ parquet-common
+ ${parquet.version}
+ org.jcudajcuda
diff --git a/src/main/java/org/apache/sysds/common/Types.java b/src/main/java/org/apache/sysds/common/Types.java
index c2832aeb8cd..624c9eed3c6 100644
--- a/src/main/java/org/apache/sysds/common/Types.java
+++ b/src/main/java/org/apache/sysds/common/Types.java
@@ -878,6 +878,7 @@ public enum FileFormat {
HDF5, // Hierarchical Data Format (HDF)
COG, // Cloud-optimized GeoTIFF
PARQUET, // parquet format for columnar data storage
+ DELTA, // Delta Lake table (transaction log + parquet), read/written via Delta Kernel
UNKNOWN;
public boolean isIJV() {
@@ -885,7 +886,7 @@ public boolean isIJV() {
}
public boolean isTextFormat() {
- return this != BINARY && this != COMPRESSED;
+ return this != BINARY && this != COMPRESSED && this != DELTA;
}
public static boolean isTextFormat(String fmt) {
diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
index 0d5ee888d86..4c6e511c18f 100644
--- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
+++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
@@ -258,6 +258,16 @@ public static int getFederatedTimeout(){
return getDMLConfig().getIntValue(DMLConfig.FEDERATED_TIMEOUT);
}
+ /** @return rows per parquet read batch for the native Delta reader */
+ public static int getDeltaReaderBatchSize() {
+ return getDMLConfig().getIntValue(DMLConfig.DELTA_READER_BATCH_SIZE);
+ }
+
+ /** @return target data-file size (bytes) for the native Delta writer */
+ public static long getDeltaWriterTargetFileSize() {
+ return Long.parseLong(getDMLConfig().getTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE));
+ }
+
public static boolean isFederatedSSL(){
return getDMLConfig().getBooleanValue(DMLConfig.USE_SSL_FEDERATED_COMMUNICATION);
}
diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index a6339656fb0..d1cb5337e20 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -71,6 +71,16 @@ public class DMLConfig
public static final String CP_PARALLEL_OPS = "sysds.cp.parallel.ops";
public static final String CP_PARALLEL_IO = "sysds.cp.parallel.io";
public static final String IO_COMPRESSION_CODEC = "sysds.io.compression.encoding";
+ public static final String DELTA_READER_BATCH_SIZE = "sysds.io.delta.reader.batchsize"; // int: rows per parquet read batch
+ public static final String DELTA_WRITER_TARGET_FILE_SIZE = "sysds.io.delta.writer.targetfilesize"; // long: target data-file size in bytes (smaller -> more files -> more parallel-read throughput)
+ /** Default rows per parquet read batch (Delta Kernel default is 1024). A larger batch slightly
+ * reduces per-batch object/transform overhead; the dominant per-value decode cost is unaffected. */
+ public static final int DELTA_READER_BATCH_SIZE_DEFAULT = 4096;
+ /** Default target data-file size for the Delta writer in bytes (Delta Kernel default is 128MB). The
+ * native Delta read decode is CPU-bound and parallelizes across data files, so writing smaller files
+ * lets a parallel reader use more threads. 64MB roughly doubles the file count (and read parallelism)
+ * versus the default with only a modest increase in per-file metadata overhead. */
+ public static final long DELTA_WRITER_TARGET_FILE_SIZE_DEFAULT = 64L * 1024 * 1024;
public static final String PARALLEL_ENCODE = "sysds.parallel.encode"; // boolean: enable multi-threaded transformencode and apply
public static final String PARALLEL_ENCODE_STAGED = "sysds.parallel.encode.staged";
public static final String PARALLEL_ENCODE_APPLY_BLOCKS = "sysds.parallel.encode.applyBlocks";
@@ -158,6 +168,8 @@ public class DMLConfig
_defaultVals.put(CP_PARALLEL_OPS, "true" );
_defaultVals.put(CP_PARALLEL_IO, "true" );
_defaultVals.put(IO_COMPRESSION_CODEC, "none");
+ _defaultVals.put(DELTA_READER_BATCH_SIZE, String.valueOf(DELTA_READER_BATCH_SIZE_DEFAULT));
+ _defaultVals.put(DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(DELTA_WRITER_TARGET_FILE_SIZE_DEFAULT));
_defaultVals.put(PARALLEL_TOKENIZE, "false");
_defaultVals.put(PARALLEL_TOKENIZE_NUM_BLOCKS, "64");
_defaultVals.put(FRAME_TO_MATRIX_WARN_CAST, "false");
diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
index e14cfd31388..a8e1667d049 100644
--- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
@@ -1057,7 +1057,8 @@ public void constructHops(StatementBlock sb) {
case CSV:
case LIBSVM:
case HDF5:
- // write output in textcell format
+ case DELTA:
+ // columnar/text formats: no block layout (blocksize -1)
ae.setOutputParams(ae.getDim1(), ae.getDim2(), ae.getNnz(), ae.getUpdateType(), -1);
break;
case BINARY:
diff --git a/src/main/java/org/apache/sysds/parser/DataExpression.java b/src/main/java/org/apache/sysds/parser/DataExpression.java
index 22dbe21c187..68a3d1b7ffe 100644
--- a/src/main/java/org/apache/sysds/parser/DataExpression.java
+++ b/src/main/java/org/apache/sysds/parser/DataExpression.java
@@ -1178,6 +1178,10 @@ else if( getVarParam(READNNZPARAM) != null ) {
boolean isCOG = (formatTypeString != null && formatTypeString.equalsIgnoreCase(FileFormat.COG.toString()));
+ // Delta tables are self-describing (schema + dimensions discovered from the
+ // transaction log at read time), so dimensions are optional like CSV.
+ boolean isDelta = (formatTypeString != null && formatTypeString.equalsIgnoreCase(FileFormat.DELTA.toString()));
+
dataTypeString = (getVarParam(DATATYPEPARAM) == null) ? null : getVarParam(DATATYPEPARAM).toString();
if ( dataTypeString == null || dataTypeString.equalsIgnoreCase(Statement.MATRIX_DATA_TYPE)
@@ -1202,8 +1206,8 @@ else if( getVarParam(READNNZPARAM) != null ) {
// initialize size of target data identifier to UNKNOWN
getOutput().setDimensions(-1, -1);
- if (!isCSV && !isLIBSVM && !isHDF5 && !isCOG && ConfigurationManager.getCompilerConfig()
- .getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) //skip check for csv/libsvm format / jmlc api
+ if (!isCSV && !isLIBSVM && !isHDF5 && !isCOG && !isDelta && ConfigurationManager.getCompilerConfig()
+ .getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) //skip check for csv/libsvm/delta format / jmlc api
&& (getVarParam(READROWPARAM) == null || getVarParam(READCOLPARAM) == null) ) {
raiseValidateError("Missing or incomplete dimension information in read statement: "
+ mtdFileName, conditional, LanguageErrorCodes.INVALID_PARAMETERS);
@@ -1215,7 +1219,7 @@ && getVarParam(READCOLPARAM) instanceof ConstIdentifier)
// these are strings that are long values
Long dim1 = (getVarParam(READROWPARAM) == null) ? null : Long.valueOf( getVarParam(READROWPARAM).toString());
Long dim2 = (getVarParam(READCOLPARAM) == null) ? null : Long.valueOf( getVarParam(READCOLPARAM).toString());
- if ( !isCSV && (dim1 < 0 || dim2 < 0) && ConfigurationManager
+ if ( !isCSV && !isDelta && (dim1 < 0 || dim2 < 0) && ConfigurationManager
.getCompilerConfig().getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) ) {
raiseValidateError("Invalid dimension information in read statement", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
}
@@ -1333,7 +1337,8 @@ else if (valueTypeString.equalsIgnoreCase(ValueType.UNKNOWN.name()))
}
//validate read filename
- if (getVarParam(FORMAT_TYPE) == null || FileFormat.isTextFormat(getVarParam(FORMAT_TYPE).toString()))
+ if (getVarParam(FORMAT_TYPE) == null || FileFormat.isTextFormat(getVarParam(FORMAT_TYPE).toString())
+ || checkFormatType(FileFormat.DELTA)) //delta: columnar, no block layout
getOutput().setBlocksize(-1);
else if (checkFormatType(FileFormat.BINARY, FileFormat.COMPRESSED, FileFormat.UNKNOWN)) {
if( getVarParam(ROWBLOCKCOUNTPARAM)!=null )
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
index 0b1a1ee27cb..5430a7a6c32 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
@@ -453,7 +453,8 @@ protected MatrixBlock readBlobFromHDFS(String fname, long[] dims) throws IOExcep
DataConverter.readMatrixFromHDFS(fname, iimd.getFileFormat(),
rlen, clen, blen, mc.getNonZeros(), getFileFormatProperties());
- if(iimd.getFileFormat() == FileFormat.CSV) {
+ if(iimd.getFileFormat() == FileFormat.CSV || iimd.getFileFormat() == FileFormat.DELTA) {
+ //dimensions/nnz are discovered at read time for these self-describing formats
_metaData = _metaData instanceof MetaDataFormat ? new MetaDataFormat(newData.getDataCharacteristics(),
iimd.getFileFormat()) : new MetaData(newData.getDataCharacteristics());
}
diff --git a/src/main/java/org/apache/sysds/runtime/io/DeltaKernelUtils.java b/src/main/java/org/apache/sysds/runtime/io/DeltaKernelUtils.java
new file mode 100644
index 00000000000..9485e4c9d54
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/io/DeltaKernelUtils.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.util.HDFSTool;
+
+import io.delta.kernel.DataWriteContext;
+import io.delta.kernel.Operation;
+import io.delta.kernel.Scan;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.Table;
+import io.delta.kernel.Transaction;
+import io.delta.kernel.TransactionBuilder;
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.ScanImpl;
+import io.delta.kernel.internal.data.ScanStateRow;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterable;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.DataFileStatus;
+import io.delta.kernel.utils.FileStatus;
+
+/**
+ * Shared helpers for the native (Spark-free) Delta Lake read/write paths used
+ * by both the matrix and frame readers/writers. Centralizes engine creation,
+ * path qualification, the scan loop (snapshot -> data files -> logical
+ * columnar batches, honoring deletion vectors), and the write transaction
+ * (logical data -> parquet -> commit).
+ */
+public class DeltaKernelUtils {
+
+ private static final String ENGINE_INFO = "Apache SystemDS";
+
+ /**
+ * Consumes a whole columnar batch. {@code selected} is {@code null} when all
+ * {@code size} rows are live; otherwise {@code selected[r]} indicates whether
+ * row {@code r} survived the deletion/selection vector. Batch-level consumption
+ * lets callers extract data column-at-a-time (cache friendly, boxing free)
+ * instead of paying a per-row callback.
+ */
+ @FunctionalInterface
+ public interface BatchConsumer {
+ void accept(ColumnVector[] cols, int size, boolean[] selected);
+ }
+
+ /** Internal Delta column type codes shared by the matrix and frame readers to
+ * dispatch boxing-free primitive column access. */
+ public static final int T_DOUBLE = 0;
+ public static final int T_FLOAT = 1;
+ public static final int T_LONG = 2;
+ public static final int T_INT = 3;
+ public static final int T_SHORT = 4;
+ public static final int T_BYTE = 5;
+ public static final int T_BOOLEAN = 6;
+ public static final int T_STRING = 7;
+
+ /**
+ * Map a Delta Kernel {@link DataType} to an internal type code (see the
+ * {@code T_*} constants). Returned once per column so the per-cell read loop
+ * can switch on a primitive int instead of repeating {@code instanceof} checks.
+ *
+ * @param dt the Delta column data type
+ * @return the matching {@code T_*} code, or {@code -1} if the type is not supported
+ */
+ public static int typeCode(DataType dt) {
+ if( dt instanceof DoubleType ) return T_DOUBLE;
+ if( dt instanceof FloatType ) return T_FLOAT;
+ if( dt instanceof LongType ) return T_LONG;
+ if( dt instanceof IntegerType ) return T_INT;
+ if( dt instanceof ShortType ) return T_SHORT;
+ if( dt instanceof ByteType ) return T_BYTE;
+ if( dt instanceof BooleanType ) return T_BOOLEAN;
+ if( dt instanceof StringType ) return T_STRING;
+ return -1;
+ }
+
+ /**
+ * @param size number of rows in the batch
+ * @param selected per-row selection mask, or {@code null} if all rows are live
+ * @return the number of live rows in the batch
+ */
+ public static int countSelected(int size, boolean[] selected) {
+ if(selected == null)
+ return size;
+ int n = 0;
+ for(int r = 0; r < size; r++)
+ if(selected[r])
+ n++;
+ return n;
+ }
+
+ private DeltaKernelUtils() {}
+
+ /** Delta Kernel config key: number of rows per parquet read batch. Defaults from
+ * {@link org.apache.sysds.conf.DMLConfig#DELTA_READER_BATCH_SIZE_DEFAULT}, overridable via
+ * {@link org.apache.sysds.conf.DMLConfig#DELTA_READER_BATCH_SIZE}. */
+ private static final String CONF_READER_BATCH_SIZE = "delta.kernel.default.parquet.reader.batch-size";
+ /** Delta Kernel config key: target size (bytes) at which the writer rolls a new data file. Defaults from
+ * {@link org.apache.sysds.conf.DMLConfig#DELTA_WRITER_TARGET_FILE_SIZE_DEFAULT}, overridable via
+ * {@link org.apache.sysds.conf.DMLConfig#DELTA_WRITER_TARGET_FILE_SIZE}. */
+ private static final String CONF_WRITER_TARGET_FILE_SIZE = "delta.kernel.default.parquet.writer.targetMaxFileSize";
+
+ //derived configuration cached to avoid copying the (large) base conf on every
+ //engine creation (createEngine is called once per data file in parallel reads);
+ //rebuilt whenever the base conf or the relevant SystemDS settings change.
+ private static Configuration cachedConf;
+ private static Configuration cachedConfBase;
+ private static int cachedBatchSize;
+ private static long cachedTargetFileSize;
+
+ private static synchronized Configuration deltaConf() {
+ Configuration base = ConfigurationManager.getCachedJobConf();
+ int batchSize = ConfigurationManager.getDeltaReaderBatchSize();
+ long targetFileSize = ConfigurationManager.getDeltaWriterTargetFileSize();
+ if(cachedConf == null || cachedConfBase != base
+ || cachedBatchSize != batchSize || cachedTargetFileSize != targetFileSize)
+ {
+ Configuration c = new Configuration(base);
+ c.setInt(CONF_READER_BATCH_SIZE, batchSize);
+ c.setLong(CONF_WRITER_TARGET_FILE_SIZE, targetFileSize);
+ cachedConf = c;
+ cachedConfBase = base;
+ cachedBatchSize = batchSize;
+ cachedTargetFileSize = targetFileSize;
+ }
+ return cachedConf;
+ }
+
+ public static Engine createEngine() {
+ return DefaultEngine.create(deltaConf());
+ }
+
+ /**
+ * Resolve a (possibly relative) path to a fully-qualified URI so the
+ * kernel's default engine can locate the table on the right filesystem.
+ *
+ * @param fname input path
+ * @return fully-qualified table path
+ */
+ public static String qualify(String fname) {
+ try {
+ Configuration conf = ConfigurationManager.getCachedJobConf();
+ Path path = new Path(fname);
+ return path.getFileSystem(conf).makeQualified(path).toString();
+ }
+ catch(IOException ex) {
+ throw new DMLRuntimeException("Failed to resolve Delta table path: " + fname, ex);
+ }
+ }
+
+ /**
+ * Opened latest snapshot of a Delta table: the logical schema plus everything
+ * needed to (re)read its data files, including the list of per-data-file scan
+ * rows. Delta Kernel scan-file rows are self-contained (the kernel's
+ * distributed design serializes them to workers), so they can be retained and
+ * read independently / in parallel.
+ */
+ public static final class ScanHandle {
+ public final StructType schema;
+ public final Row scanState;
+ public final StructType physicalReadSchema;
+ public final List scanFiles;
+ /**
+ * Per-file record counts taken from the Delta {@code numRecords} statistic,
+ * aligned with {@link #scanFiles}; {@code -1} where the statistic is absent.
+ */
+ public final long[] numRecords;
+ /**
+ * Per-file flag indicating a deletion vector is present (so the live row
+ * count differs from {@link #numRecords}), aligned with {@link #scanFiles}.
+ */
+ public final boolean[] hasDeletionVector;
+
+ private ScanHandle(StructType schema, Row scanState, StructType physicalReadSchema,
+ List scanFiles, long[] numRecords, boolean[] hasDeletionVector)
+ {
+ this.schema = schema;
+ this.scanState = scanState;
+ this.physicalReadSchema = physicalReadSchema;
+ this.scanFiles = scanFiles;
+ this.numRecords = numRecords;
+ this.hasDeletionVector = hasDeletionVector;
+ }
+
+ /**
+ * @return true iff every data file carries a {@code numRecords} statistic
+ * and none has a deletion vector, i.e. exact per-file row offsets
+ * can be derived from metadata without reading the data.
+ */
+ public boolean hasExactRowCounts() {
+ for( int i=0; i scanFileIter = (scan instanceof ScanImpl)
+ ? ((ScanImpl) scan).getScanFiles(engine, true)
+ : scan.getScanFiles(engine);
+
+ List files = new ArrayList<>();
+ List recs = new ArrayList<>();
+ List dvs = new ArrayList<>();
+ try( CloseableIterator scanFiles = scanFileIter ) {
+ while( scanFiles.hasNext() ) {
+ FilteredColumnarBatch scanFileBatch = scanFiles.next();
+ try( CloseableIterator scanFileRows = scanFileBatch.getRows() ) {
+ while( scanFileRows.hasNext() ) {
+ Row scanFileRow = scanFileRows.next();
+ files.add(scanFileRow);
+ recs.add(numRecords(scanFileRow));
+ dvs.add(InternalScanFileUtils.getDeletionVectorDescriptorFromRow(scanFileRow) != null);
+ }
+ }
+ }
+ }
+ long[] numRecords = new long[recs.size()];
+ boolean[] hasDv = new boolean[dvs.size()];
+ for( int i=0; i= '0' && stats.charAt(i) <= '9' ) {
+ val = val * 10 + (stats.charAt(i) - '0');
+ any = true;
+ i++;
+ }
+ return any ? val : -1;
+ }
+
+ /**
+ * Read a single Delta data file (identified by its scan-file row), decoding
+ * its parquet batches and applying any deletion vector, invoking the consumer
+ * once per (logical) batch. Safe to call concurrently for distinct files as
+ * long as each call uses its own {@code engine}.
+ *
+ * @param engine delta kernel engine
+ * @param scanState scan state from {@link #openScan}
+ * @param physicalReadSchema physical read schema from {@link #openScan}
+ * @param scanFileRow the data file's scan-file row
+ * @param consumer batch consumer
+ * @throws IOException on read failure
+ */
+ public static void readScanFile(Engine engine, Row scanState, StructType physicalReadSchema,
+ Row scanFileRow, BatchConsumer consumer) throws IOException
+ {
+ FileStatus dataFile = InternalScanFileUtils.getAddFileStatus(scanFileRow);
+ CloseableIterator physicalData = engine.getParquetHandler()
+ .readParquetFiles(Utils.singletonCloseableIterator(dataFile), physicalReadSchema, Optional.empty());
+ try( CloseableIterator logicalData =
+ Scan.transformPhysicalData(engine, scanState, scanFileRow, physicalData) )
+ {
+ while( logicalData.hasNext() )
+ consumeBatch(logicalData.next(), consumer);
+ }
+ }
+
+ /**
+ * Scan the latest snapshot of a Delta table sequentially, invoking the batch
+ * consumer for every data batch. The consumer is created lazily from the table
+ * schema (so callers can size buffers / derive per-column types up front).
+ *
+ * @param engine delta kernel engine
+ * @param tablePath fully-qualified table path
+ * @param consumerFactory builds the batch consumer from the table schema
+ * @return the logical table schema
+ * @throws IOException on read failure
+ */
+ public static StructType scan(Engine engine, String tablePath, Function consumerFactory)
+ throws IOException
+ {
+ ScanHandle h = openScan(engine, tablePath);
+ BatchConsumer consumer = consumerFactory.apply(h.schema);
+ for( Row scanFileRow : h.scanFiles )
+ readScanFile(engine, h.scanState, h.physicalReadSchema, scanFileRow, consumer);
+ return h.schema;
+ }
+
+ private static void consumeBatch(FilteredColumnarBatch fcb, BatchConsumer consumer) {
+ ColumnarBatch batch = fcb.getData();
+ int ncol = batch.getSchema().length();
+ ColumnVector[] cols = new ColumnVector[ncol];
+ for( int c=0; c all rows live)
+ Optional selVector = fcb.getSelectionVector();
+ boolean[] selected = null;
+ if( selVector.isPresent() ) {
+ ColumnVector sv = selVector.get();
+ selected = new boolean[size];
+ for( int r=0; r logicalData) throws IOException
+ {
+ //replace any existing table at the path (the other SystemDS writers delete
+ //the output first; the caching layer does not do it on our behalf)
+ HDFSTool.deleteFileIfExistOnHDFS(tablePath);
+
+ Table table = Table.forPath(engine, tablePath);
+ TransactionBuilder txnBuilder = table
+ .createTransactionBuilder(engine, ENGINE_INFO, Operation.CREATE_TABLE)
+ .withSchema(engine, schema);
+ Transaction txn = txnBuilder.build(engine);
+ Row txnState = txn.getTransactionState(engine);
+
+ CloseableIterator physicalData =
+ Transaction.transformLogicalData(engine, txnState, logicalData, Collections.emptyMap());
+ DataWriteContext writeContext =
+ Transaction.getWriteContext(engine, txnState, Collections.emptyMap());
+ CloseableIterator dataFiles = engine.getParquetHandler()
+ .writeParquetFiles(writeContext.getTargetDirectory(), physicalData, writeContext.getStatisticsColumns());
+ CloseableIterator appendActions =
+ Transaction.generateAppendActions(engine, txnState, dataFiles, writeContext);
+ txn.commit(engine, CloseableIterable.inMemoryIterable(appendActions));
+ }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java b/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java
index dc1c7da230c..e10d358d629 100644
--- a/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java
@@ -78,7 +78,11 @@ public static MatrixReader createMatrixReader(FileFormat fmt) {
case COMPRESSED:
reader = ReaderCompressed.create();
break;
-
+
+ case DELTA:
+ reader = par ? new ReaderDeltaParallel() : new ReaderDelta();
+ break;
+
default:
throw new DMLRuntimeException("Failed to create matrix reader for unknown format: " + fmt.toString());
}
@@ -140,6 +144,11 @@ public static MatrixReader createMatrixReader( ReadProperties props ) {
case COMPRESSED:
reader = new ReaderCompressed();
break;
+
+ case DELTA:
+ reader = par ? new ReaderDeltaParallel() : new ReaderDelta();
+ break;
+
default:
throw new DMLRuntimeException("Failed to create matrix reader for unknown format: " + fmt.toString());
}
diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java b/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java
index bb0b0c940f7..091194edc81 100644
--- a/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java
@@ -94,6 +94,9 @@ else if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE
case COMPRESSED:
return WriterCompressed.create(props);
+ case DELTA:
+ return new WriterDelta();
+
default:
throw new DMLRuntimeException("Failed to create matrix writer for unknown format: " + fmt.toString());
}
diff --git a/src/main/java/org/apache/sysds/runtime/io/ReaderDelta.java b/src/main/java/org/apache/sysds/runtime/io/ReaderDelta.java
new file mode 100644
index 00000000000..5d987e9a2a4
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderDelta.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.StructType;
+
+/**
+ * Single-threaded native Delta Lake reader for matrices, built on the
+ * Spark-free Delta Kernel library. It opens the latest snapshot of a Delta
+ * table directory, reads its parquet data files through the kernel's default
+ * engine (honoring deletion vectors), and materializes the numeric columns
+ * into a dense {@link MatrixBlock}.
+ *
+ *
Only numeric columns (double/float/long/int/short/byte/boolean) are
+ * supported, matching the all-double nature of a SystemDS matrix. Dimensions
+ * do not need to be known up front: the row count is discovered while scanning
+ * and the column count is taken from the table schema.
+ */
+public class ReaderDelta extends MatrixReader {
+
+ @Override
+ public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int blen, long estnnz)
+ throws IOException, DMLRuntimeException
+ {
+ Engine engine = DeltaKernelUtils.createEngine();
+ String tablePath = DeltaKernelUtils.qualify(fname);
+
+ //Scan column-at-a-time into one row-major buffer per batch (no per-row
+ //allocation, no boxing, no per-cell set()). Buffers are concatenated into
+ //the dense output via bulk array copies below.
+ ArrayList batches = new ArrayList<>();
+ int[] nrowH = new int[1];
+ StructType schema = DeltaKernelUtils.scan(engine, tablePath, sch -> {
+ int[] types = columnTypes(sch);
+ int ncol = sch.length();
+ return (cols, size, selected) -> {
+ batches.add(extractBatch(cols, size, selected, types, ncol));
+ nrowH[0] += DeltaKernelUtils.countSelected(size, selected);
+ };
+ });
+
+ int ncol = schema.length();
+ int nrow = nrowH[0];
+ long lestnnz = (estnnz >= 0) ? estnnz : (long) nrow * ncol;
+ MatrixBlock ret = createOutputMatrixBlock(nrow, ncol, Math.max(nrow, 1), lestnnz, true, false);
+
+ if( nrow > 0 && ncol > 0 )
+ fillDense(ret, batches);
+ ret.recomputeNonZeros();
+ ret.examSparsity();
+ return ret;
+ }
+
+ /** Derive the per-column internal type codes from the table schema. */
+ static int[] columnTypes(StructType schema) {
+ int ncol = schema.length();
+ int[] types = new int[ncol];
+ for( int c=0; c batches) {
+ DenseBlock db = ret.getDenseBlock();
+ if( db.isContiguous() ) {
+ double[] dv = db.valuesAt(0);
+ int off = 0;
+ for( double[] buf : batches ) {
+ System.arraycopy(buf, 0, dv, off, buf.length);
+ off += buf.length;
+ }
+ }
+ else {
+ //rare large multi-block fallback: route each row through the block API
+ int ncol = ret.getNumColumns();
+ int r = 0;
+ for( double[] buf : batches ) {
+ int rowsInBuf = buf.length / ncol;
+ for( int i=0; iThe expensive part of a Delta read is the parquet decode, which the kernel
+ * performs per data file; parallelizing across files is therefore the natural
+ * way to bridge the gap to the (near-raw) binary reader. A table backed by a
+ * single data file (the default for tables <= the parquet target file size)
+ * cannot be split this way, so the reader transparently falls back to the
+ * sequential {@link ReaderDelta} path in that case.
+ */
+public class ReaderDeltaParallel extends ReaderDelta {
+
+ private final int _numThreads;
+
+ public ReaderDeltaParallel() {
+ _numThreads = OptimizerUtils.getParallelBinaryReadParallelism();
+ }
+
+ @Override
+ public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int blen, long estnnz)
+ throws IOException, DMLRuntimeException
+ {
+ Engine engine = DeltaKernelUtils.createEngine();
+ String tablePath = DeltaKernelUtils.qualify(fname);
+ DeltaKernelUtils.ScanHandle handle = DeltaKernelUtils.openScan(engine, tablePath);
+
+ final int nfiles = handle.scanFiles.size();
+ //nothing to gain from parallelism for single-file (or empty) tables
+ if( _numThreads <= 1 || nfiles <= 1 )
+ return super.readMatrixFromHDFS(fname, rlen, clen, blen, estnnz);
+
+ final int ncol = handle.schema.length();
+ final int[] types = columnTypes(handle.schema);
+
+ //fast path: exact per-file row counts are known from metadata and the dense
+ //output fits a single contiguous array -> pre-size once and let each thread
+ //decode directly into its slice (no intermediate buffers, no serial copy).
+ if( useDirectPath(handle) ) {
+ long total = 0;
+ for( long r : handle.numRecords )
+ total += r;
+ if( total > 0 && (long) total * ncol <= Integer.MAX_VALUE )
+ return readDirect(fname, handle, ncol, types, (int) total, estnnz);
+ }
+
+ return readBuffered(fname, handle, ncol, types, estnnz);
+ }
+
+ /**
+ * Whether the metadata-driven direct-write fast path can be used for this
+ * table (exact per-file row counts and no deletion vectors). Visible for
+ * testing: the buffered fallback is otherwise only reachable for tables
+ * lacking row statistics or carrying deletion vectors, which the SystemDS
+ * Delta writer never produces.
+ *
+ * @param handle the opened scan handle
+ * @return true if the direct path is applicable
+ */
+ protected boolean useDirectPath(DeltaKernelUtils.ScanHandle handle) {
+ return handle.hasExactRowCounts();
+ }
+
+ /**
+ * Fast path: each thread decodes one data file straight into the final dense
+ * array at a metadata-derived row offset. Single allocation, fully parallel.
+ */
+ private MatrixBlock readDirect(String fname, DeltaKernelUtils.ScanHandle handle,
+ int ncol, int[] types, int nrow, long estnnz) throws IOException
+ {
+ final int nfiles = handle.scanFiles.size();
+ final int[] rowOffset = new int[nfiles];
+ int acc = 0;
+ for( int i=0; i> tasks = new ArrayList<>(nfiles);
+ for( int i=0; i {
+ int[] cur = new int[] {base};
+ Engine eng = DeltaKernelUtils.createEngine();
+ DeltaKernelUtils.readScanFile(eng, handle.scanState, handle.physicalReadSchema, scanFileRow,
+ (cols, size, selected) ->
+ cur[0] += extractBatchInto(cols, size, selected, types, ncol, dv, cur[0]));
+ return null;
+ });
+ }
+ for( Future