From b41b6db74f8275ba336d31669412fecbb9ea9b25 Mon Sep 17 00:00:00 2001 From: Sebastian Baunsgaard Date: Fri, 26 Jun 2026 10:06:36 +0000 Subject: [PATCH 1/3] [SYSTEMDS-3949] Add native Delta Lake matrix read/write via Delta Kernel Introduce a DELTA file format that reads and writes Delta Lake tables natively through the Spark-free Delta Kernel library, for matrices on the single-node CP path. DML read/write with format="delta" now operates directly on Delta tables without a Spark DataFrame round-trip. - Add FileFormat.DELTA and exclude it from the text formats - Accept format="delta" with unknown dimensions in the parser and set blocksize -1 for the columnar format - Wire DELTA into the matrix reader and writer factories - Add DeltaKernelUtils plus serial and parallel native Delta readers and WriterDelta with column-at-a-time, boxing-free data transfer - Expose Delta reader batch size and writer target file size via DMLConfig - Refresh cached matrix metadata after a Delta read (discovered dimensions) - Add a parquet.version property and pin delta-kernel 3.3.2 - Run Delta component IO tests in CI and broaden matrix coverage Append/overwrite table semantics, distributed execution, frames, and time travel are out of scope. --- .github/workflows/javaTests.yml | 2 +- pom.xml | 33 + .../java/org/apache/sysds/common/Types.java | 3 +- .../sysds/conf/ConfigurationManager.java | 15 + .../java/org/apache/sysds/conf/DMLConfig.java | 6 + .../apache/sysds/parser/DMLTranslator.java | 3 +- .../apache/sysds/parser/DataExpression.java | 13 +- .../controlprogram/caching/MatrixObject.java | 3 +- .../sysds/runtime/io/DeltaKernelUtils.java | 425 +++++++++++++ .../sysds/runtime/io/MatrixReaderFactory.java | 11 +- .../sysds/runtime/io/MatrixWriterFactory.java | 3 + .../apache/sysds/runtime/io/ReaderDelta.java | 179 ++++++ .../sysds/runtime/io/ReaderDeltaParallel.java | 209 ++++++ .../apache/sysds/runtime/io/WriterDelta.java | 219 +++++++ .../io/DeltaMatrixReadWriteTest.java | 595 ++++++++++++++++++ .../io/delta/DeltaReadWriteTest.java | 130 ++++ .../functions/io/delta/DeltaReadCompare.dml | 34 + .../scripts/functions/io/delta/DeltaWrite.dml | 30 + 18 files changed, 1904 insertions(+), 9 deletions(-) create mode 100644 src/main/java/org/apache/sysds/runtime/io/DeltaKernelUtils.java create mode 100644 src/main/java/org/apache/sysds/runtime/io/ReaderDelta.java create mode 100644 src/main/java/org/apache/sysds/runtime/io/ReaderDeltaParallel.java create mode 100644 src/main/java/org/apache/sysds/runtime/io/WriterDelta.java create mode 100644 src/test/java/org/apache/sysds/test/component/io/DeltaMatrixReadWriteTest.java create mode 100644 src/test/java/org/apache/sysds/test/functions/io/delta/DeltaReadWriteTest.java create mode 100644 src/test/scripts/functions/io/delta/DeltaReadCompare.dml create mode 100644 src/test/scripts/functions/io/delta/DeltaWrite.dml diff --git a/.github/workflows/javaTests.yml b/.github/workflows/javaTests.yml index 61089807820..16e925d155b 100644 --- a/.github/workflows/javaTests.yml +++ b/.github/workflows/javaTests.yml @@ -60,7 +60,7 @@ jobs: "org.apache.sysds.test.applications.**", "**.test.usertest.**", "**.component.c**.**", - "**.component.e**.**,**.component.f**.**,**.component.m**.**,**.component.o**.**", + "**.component.e**.**,**.component.f**.**,**.component.i**.**,**.component.m**.**,**.component.o**.**", "**.component.p**.**,**.component.r**.**,**.component.s**.**,**.component.t**.**,**.component.u**.**", "**.functions.a**.**,**.functions.binary.matrix.**,**.functions.binary.scalar.**,**.functions.binary.tensor.**", "**.functions.blocks.**,**.functions.data.rand.**,", diff --git a/pom.xml b/pom.xml index cfd3d8464fb..44454361318 100644 --- a/pom.xml +++ b/pom.xml @@ -46,6 +46,8 @@ 2.15.4 2.12.18 2.12 + 3.3.2 + 1.13.1 yyyy-MM-dd HH:mm:ss z 1 false @@ -968,6 +970,37 @@ + + + io.delta + delta-kernel-api + ${delta-kernel.version} + + + io.delta + delta-kernel-defaults + ${delta-kernel.version} + + + + org.apache.parquet + parquet-hadoop + ${parquet.version} + + + org.apache.parquet + parquet-column + ${parquet.version} + + + org.apache.parquet + parquet-common + ${parquet.version} + org.jcuda jcuda diff --git a/src/main/java/org/apache/sysds/common/Types.java b/src/main/java/org/apache/sysds/common/Types.java index c2832aeb8cd..624c9eed3c6 100644 --- a/src/main/java/org/apache/sysds/common/Types.java +++ b/src/main/java/org/apache/sysds/common/Types.java @@ -878,6 +878,7 @@ public enum FileFormat { HDF5, // Hierarchical Data Format (HDF) COG, // Cloud-optimized GeoTIFF PARQUET, // parquet format for columnar data storage + DELTA, // Delta Lake table (transaction log + parquet), read/written via Delta Kernel UNKNOWN; public boolean isIJV() { @@ -885,7 +886,7 @@ public boolean isIJV() { } public boolean isTextFormat() { - return this != BINARY && this != COMPRESSED; + return this != BINARY && this != COMPRESSED && this != DELTA; } public static boolean isTextFormat(String fmt) { diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java index 0d5ee888d86..83676da47a7 100644 --- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java +++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java @@ -258,6 +258,21 @@ public static int getFederatedTimeout(){ return getDMLConfig().getIntValue(DMLConfig.FEDERATED_TIMEOUT); } + /** @return rows per parquet read batch for the native Delta reader */ + public static int getDeltaReaderBatchSize() { + return getDMLConfig().getIntValue(DMLConfig.DELTA_READER_BATCH_SIZE); + } + + /** @return matrix rows materialized per columnar batch for the native Delta writer */ + public static int getDeltaWriterBatchSize() { + return getDMLConfig().getIntValue(DMLConfig.DELTA_WRITER_BATCH_SIZE); + } + + /** @return target data-file size (bytes) for the native Delta writer */ + public static long getDeltaWriterTargetFileSize() { + return Long.parseLong(getDMLConfig().getTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE)); + } + public static boolean isFederatedSSL(){ return getDMLConfig().getBooleanValue(DMLConfig.USE_SSL_FEDERATED_COMMUNICATION); } diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java index a6339656fb0..e06b58b07c8 100644 --- a/src/main/java/org/apache/sysds/conf/DMLConfig.java +++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java @@ -71,6 +71,9 @@ public class DMLConfig public static final String CP_PARALLEL_OPS = "sysds.cp.parallel.ops"; public static final String CP_PARALLEL_IO = "sysds.cp.parallel.io"; public static final String IO_COMPRESSION_CODEC = "sysds.io.compression.encoding"; + public static final String DELTA_READER_BATCH_SIZE = "sysds.io.delta.reader.batchsize"; // int: rows per parquet read batch + public static final String DELTA_WRITER_BATCH_SIZE = "sysds.io.delta.writer.batchsize"; // int: matrix rows materialized per columnar batch handed to the engine + public static final String DELTA_WRITER_TARGET_FILE_SIZE = "sysds.io.delta.writer.targetfilesize"; // long: target data-file size in bytes (smaller -> more files -> more parallel-read throughput) public static final String PARALLEL_ENCODE = "sysds.parallel.encode"; // boolean: enable multi-threaded transformencode and apply public static final String PARALLEL_ENCODE_STAGED = "sysds.parallel.encode.staged"; public static final String PARALLEL_ENCODE_APPLY_BLOCKS = "sysds.parallel.encode.applyBlocks"; @@ -158,6 +161,9 @@ public class DMLConfig _defaultVals.put(CP_PARALLEL_OPS, "true" ); _defaultVals.put(CP_PARALLEL_IO, "true" ); _defaultVals.put(IO_COMPRESSION_CODEC, "none"); + _defaultVals.put(DELTA_READER_BATCH_SIZE, "4096"); // rows per parquet read batch (Delta Kernel default 1024) + _defaultVals.put(DELTA_WRITER_BATCH_SIZE, "4096"); // matrix rows materialized per columnar batch handed to the engine + _defaultVals.put(DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(64L * 1024 * 1024)); // 64MB target data-file size (Delta Kernel default 128MB) -> more files -> more parallel-read throughput _defaultVals.put(PARALLEL_TOKENIZE, "false"); _defaultVals.put(PARALLEL_TOKENIZE_NUM_BLOCKS, "64"); _defaultVals.put(FRAME_TO_MATRIX_WARN_CAST, "false"); diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java b/src/main/java/org/apache/sysds/parser/DMLTranslator.java index e14cfd31388..a8e1667d049 100644 --- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java +++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java @@ -1057,7 +1057,8 @@ public void constructHops(StatementBlock sb) { case CSV: case LIBSVM: case HDF5: - // write output in textcell format + case DELTA: + // columnar/text formats: no block layout (blocksize -1) ae.setOutputParams(ae.getDim1(), ae.getDim2(), ae.getNnz(), ae.getUpdateType(), -1); break; case BINARY: diff --git a/src/main/java/org/apache/sysds/parser/DataExpression.java b/src/main/java/org/apache/sysds/parser/DataExpression.java index 22dbe21c187..68a3d1b7ffe 100644 --- a/src/main/java/org/apache/sysds/parser/DataExpression.java +++ b/src/main/java/org/apache/sysds/parser/DataExpression.java @@ -1178,6 +1178,10 @@ else if( getVarParam(READNNZPARAM) != null ) { boolean isCOG = (formatTypeString != null && formatTypeString.equalsIgnoreCase(FileFormat.COG.toString())); + // Delta tables are self-describing (schema + dimensions discovered from the + // transaction log at read time), so dimensions are optional like CSV. + boolean isDelta = (formatTypeString != null && formatTypeString.equalsIgnoreCase(FileFormat.DELTA.toString())); + dataTypeString = (getVarParam(DATATYPEPARAM) == null) ? null : getVarParam(DATATYPEPARAM).toString(); if ( dataTypeString == null || dataTypeString.equalsIgnoreCase(Statement.MATRIX_DATA_TYPE) @@ -1202,8 +1206,8 @@ else if( getVarParam(READNNZPARAM) != null ) { // initialize size of target data identifier to UNKNOWN getOutput().setDimensions(-1, -1); - if (!isCSV && !isLIBSVM && !isHDF5 && !isCOG && ConfigurationManager.getCompilerConfig() - .getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) //skip check for csv/libsvm format / jmlc api + if (!isCSV && !isLIBSVM && !isHDF5 && !isCOG && !isDelta && ConfigurationManager.getCompilerConfig() + .getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) //skip check for csv/libsvm/delta format / jmlc api && (getVarParam(READROWPARAM) == null || getVarParam(READCOLPARAM) == null) ) { raiseValidateError("Missing or incomplete dimension information in read statement: " + mtdFileName, conditional, LanguageErrorCodes.INVALID_PARAMETERS); @@ -1215,7 +1219,7 @@ && getVarParam(READCOLPARAM) instanceof ConstIdentifier) // these are strings that are long values Long dim1 = (getVarParam(READROWPARAM) == null) ? null : Long.valueOf( getVarParam(READROWPARAM).toString()); Long dim2 = (getVarParam(READCOLPARAM) == null) ? null : Long.valueOf( getVarParam(READCOLPARAM).toString()); - if ( !isCSV && (dim1 < 0 || dim2 < 0) && ConfigurationManager + if ( !isCSV && !isDelta && (dim1 < 0 || dim2 < 0) && ConfigurationManager .getCompilerConfig().getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) ) { raiseValidateError("Invalid dimension information in read statement", conditional, LanguageErrorCodes.INVALID_PARAMETERS); } @@ -1333,7 +1337,8 @@ else if (valueTypeString.equalsIgnoreCase(ValueType.UNKNOWN.name())) } //validate read filename - if (getVarParam(FORMAT_TYPE) == null || FileFormat.isTextFormat(getVarParam(FORMAT_TYPE).toString())) + if (getVarParam(FORMAT_TYPE) == null || FileFormat.isTextFormat(getVarParam(FORMAT_TYPE).toString()) + || checkFormatType(FileFormat.DELTA)) //delta: columnar, no block layout getOutput().setBlocksize(-1); else if (checkFormatType(FileFormat.BINARY, FileFormat.COMPRESSED, FileFormat.UNKNOWN)) { if( getVarParam(ROWBLOCKCOUNTPARAM)!=null ) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java index 0b1a1ee27cb..5430a7a6c32 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java @@ -453,7 +453,8 @@ protected MatrixBlock readBlobFromHDFS(String fname, long[] dims) throws IOExcep DataConverter.readMatrixFromHDFS(fname, iimd.getFileFormat(), rlen, clen, blen, mc.getNonZeros(), getFileFormatProperties()); - if(iimd.getFileFormat() == FileFormat.CSV) { + if(iimd.getFileFormat() == FileFormat.CSV || iimd.getFileFormat() == FileFormat.DELTA) { + //dimensions/nnz are discovered at read time for these self-describing formats _metaData = _metaData instanceof MetaDataFormat ? new MetaDataFormat(newData.getDataCharacteristics(), iimd.getFileFormat()) : new MetaData(newData.getDataCharacteristics()); } diff --git a/src/main/java/org/apache/sysds/runtime/io/DeltaKernelUtils.java b/src/main/java/org/apache/sysds/runtime/io/DeltaKernelUtils.java new file mode 100644 index 00000000000..1e06f9acb56 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/io/DeltaKernelUtils.java @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.io; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.util.HDFSTool; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.delta.kernel.DataWriteContext; +import io.delta.kernel.Operation; +import io.delta.kernel.Scan; +import io.delta.kernel.Snapshot; +import io.delta.kernel.Table; +import io.delta.kernel.Transaction; +import io.delta.kernel.TransactionBuilder; +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.InternalScanFileUtils; +import io.delta.kernel.internal.ScanImpl; +import io.delta.kernel.internal.data.ScanStateRow; +import io.delta.kernel.internal.util.Utils; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterable; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.DataFileStatus; +import io.delta.kernel.utils.FileStatus; + +/** + * Shared helpers for the native (Spark-free) Delta Lake read/write paths used + * by both the matrix and frame readers/writers. Centralizes engine creation, + * path qualification, the scan loop (snapshot -> data files -> logical + * columnar batches, honoring deletion vectors), and the write transaction + * (logical data -> parquet -> commit). + */ +public class DeltaKernelUtils { + + private static final String ENGINE_INFO = "Apache SystemDS"; + + /** Reused thread-safe JSON reader for the per-file Delta stats (numRecords). */ + private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); + + /** Delta Kernel config key: number of rows per parquet read batch, overridable via + * {@link org.apache.sysds.conf.DMLConfig#DELTA_READER_BATCH_SIZE}. */ + private static final String CONF_READER_BATCH_SIZE = "delta.kernel.default.parquet.reader.batch-size"; + /** Delta Kernel config key: target size (bytes) at which the writer rolls a new data file, overridable via + * {@link org.apache.sysds.conf.DMLConfig#DELTA_WRITER_TARGET_FILE_SIZE}. */ + private static final String CONF_WRITER_TARGET_FILE_SIZE = "delta.kernel.default.parquet.writer.targetMaxFileSize"; + + /** Internal Delta column type codes shared by the matrix and frame readers to + * dispatch boxing-free primitive column access. */ + public static final int T_DOUBLE = 0; + public static final int T_FLOAT = 1; + public static final int T_LONG = 2; + public static final int T_INT = 3; + public static final int T_SHORT = 4; + public static final int T_BYTE = 5; + public static final int T_BOOLEAN = 6; + public static final int T_STRING = 7; + + //derived configuration cached to avoid copying the (large) base conf on every + //engine creation (createEngine is called once per data file in parallel reads); + //rebuilt whenever the base conf or the relevant SystemDS settings change. + private static Configuration cachedConf; + private static Configuration cachedConfBase; + private static int cachedBatchSize; + private static long cachedTargetFileSize; + + private DeltaKernelUtils() {} + + /** + * Consumes a whole columnar batch. {@code selected} is {@code null} when all + * {@code size} rows are live; otherwise {@code selected[r]} indicates whether + * row {@code r} survived the deletion/selection vector. Batch-level consumption + * lets callers extract data column-at-a-time (cache friendly, boxing free) + * instead of paying a per-row callback. + */ + @FunctionalInterface + public interface BatchConsumer { + void accept(ColumnVector[] cols, int size, boolean[] selected); + } + + /** + * Map a Delta Kernel {@link DataType} to an internal type code (see the + * {@code T_*} constants). Returned once per column so the per-cell read loop + * can switch on a primitive int instead of repeating {@code instanceof} checks. + * + * @param dt the Delta column data type + * @return the matching {@code T_*} code, or {@code -1} if the type is not supported + */ + public static int typeCode(DataType dt) { + if( dt instanceof DoubleType ) return T_DOUBLE; + if( dt instanceof FloatType ) return T_FLOAT; + if( dt instanceof LongType ) return T_LONG; + if( dt instanceof IntegerType ) return T_INT; + if( dt instanceof ShortType ) return T_SHORT; + if( dt instanceof ByteType ) return T_BYTE; + if( dt instanceof BooleanType ) return T_BOOLEAN; + if( dt instanceof StringType ) return T_STRING; + return -1; + } + + /** + * @param size number of rows in the batch + * @param selected per-row selection mask, or {@code null} if all rows are live + * @return the number of live rows in the batch + */ + public static int countSelected(int size, boolean[] selected) { + if(selected == null) + return size; + int n = 0; + for(int r = 0; r < size; r++) + if(selected[r]) + n++; + return n; + } + + private static synchronized Configuration deltaConf() { + Configuration base = ConfigurationManager.getCachedJobConf(); + int batchSize = ConfigurationManager.getDeltaReaderBatchSize(); + long targetFileSize = ConfigurationManager.getDeltaWriterTargetFileSize(); + if(cachedConf == null || cachedConfBase != base + || cachedBatchSize != batchSize || cachedTargetFileSize != targetFileSize) + { + Configuration c = new Configuration(base); + c.setInt(CONF_READER_BATCH_SIZE, batchSize); + c.setLong(CONF_WRITER_TARGET_FILE_SIZE, targetFileSize); + cachedConf = c; + cachedConfBase = base; + cachedBatchSize = batchSize; + cachedTargetFileSize = targetFileSize; + } + return cachedConf; + } + + public static Engine createEngine() { + return DefaultEngine.create(deltaConf()); + } + + /** + * Resolve a (possibly relative) path to a fully-qualified URI so the + * kernel's default engine can locate the table on the right filesystem. + * + * @param fname input path + * @return fully-qualified table path + */ + public static String qualify(String fname) { + try { + Configuration conf = ConfigurationManager.getCachedJobConf(); + Path path = new Path(fname); + return path.getFileSystem(conf).makeQualified(path).toString(); + } + catch(IOException ex) { + throw new DMLRuntimeException("Failed to resolve Delta table path: " + fname, ex); + } + } + + /** + * Opened latest snapshot of a Delta table: the logical schema plus everything + * needed to (re)read its data files, including the list of per-data-file scan + * rows. Delta Kernel scan-file rows are self-contained (the kernel's + * distributed design serializes them to workers), so they can be retained and + * read independently / in parallel. + */ + public static final class ScanHandle { + public final StructType schema; + public final Row scanState; + public final StructType physicalReadSchema; + public final List scanFiles; + /** + * Per-file record counts taken from the Delta {@code numRecords} statistic, + * aligned with {@link #scanFiles}; {@code -1} where the statistic is absent. + */ + public final long[] numRecords; + /** + * Per-file flag indicating a deletion vector is present (so the live row + * count differs from {@link #numRecords}), aligned with {@link #scanFiles}. + */ + public final boolean[] hasDeletionVector; + + private ScanHandle(StructType schema, Row scanState, StructType physicalReadSchema, + List scanFiles, long[] numRecords, boolean[] hasDeletionVector) + { + this.schema = schema; + this.scanState = scanState; + this.physicalReadSchema = physicalReadSchema; + this.scanFiles = scanFiles; + this.numRecords = numRecords; + this.hasDeletionVector = hasDeletionVector; + } + + /** + * @return true iff every data file carries a {@code numRecords} statistic + * and none has a deletion vector, i.e. exact per-file row offsets + * can be derived from metadata without reading the data. + */ + public boolean hasExactRowCounts() { + for( int i=0; i scanFileIter = (scan instanceof ScanImpl) + ? ((ScanImpl) scan).getScanFiles(engine, true) + : scan.getScanFiles(engine); + + List files = new ArrayList<>(); + List recs = new ArrayList<>(); + List dvs = new ArrayList<>(); + try( CloseableIterator scanFiles = scanFileIter ) { + while( scanFiles.hasNext() ) { + FilteredColumnarBatch scanFileBatch = scanFiles.next(); + try( CloseableIterator scanFileRows = scanFileBatch.getRows() ) { + while( scanFileRows.hasNext() ) { + Row scanFileRow = scanFileRows.next(); + files.add(scanFileRow); + recs.add(numRecords(scanFileRow)); + dvs.add(InternalScanFileUtils.getDeletionVectorDescriptorFromRow(scanFileRow) != null); + } + } + } + } + long[] numRecords = new long[recs.size()]; + boolean[] hasDv = new boolean[dvs.size()]; + for( int i=0; i physicalData = engine.getParquetHandler() + .readParquetFiles(Utils.singletonCloseableIterator(dataFile), physicalReadSchema, Optional.empty()); + try( CloseableIterator logicalData = + Scan.transformPhysicalData(engine, scanState, scanFileRow, physicalData) ) + { + while( logicalData.hasNext() ) + consumeBatch(logicalData.next(), consumer); + } + } + + /** + * Scan the latest snapshot of a Delta table sequentially, invoking the batch + * consumer for every data batch. The consumer is created lazily from the table + * schema (so callers can size buffers / derive per-column types up front). + * + * @param engine delta kernel engine + * @param tablePath fully-qualified table path + * @param consumerFactory builds the batch consumer from the table schema + * @return the logical table schema + * @throws IOException on read failure + */ + public static StructType scan(Engine engine, String tablePath, Function consumerFactory) + throws IOException + { + ScanHandle h = openScan(engine, tablePath); + BatchConsumer consumer = consumerFactory.apply(h.schema); + for( Row scanFileRow : h.scanFiles ) + readScanFile(engine, h.scanState, h.physicalReadSchema, scanFileRow, consumer); + return h.schema; + } + + private static void consumeBatch(FilteredColumnarBatch fcb, BatchConsumer consumer) { + ColumnarBatch batch = fcb.getData(); + int ncol = batch.getSchema().length(); + ColumnVector[] cols = new ColumnVector[ncol]; + for( int c=0; c all rows live) + Optional selVector = fcb.getSelectionVector(); + boolean[] selected = null; + if( selVector.isPresent() ) { + ColumnVector sv = selVector.get(); + selected = new boolean[size]; + for( int r=0; r logicalData) throws IOException + { + //replace any existing table at the path (the other SystemDS writers delete + //the output first; the caching layer does not do it on our behalf) + HDFSTool.deleteFileIfExistOnHDFS(tablePath); + + Table table = Table.forPath(engine, tablePath); + TransactionBuilder txnBuilder = table + .createTransactionBuilder(engine, ENGINE_INFO, Operation.CREATE_TABLE) + .withSchema(engine, schema); + Transaction txn = txnBuilder.build(engine); + Row txnState = txn.getTransactionState(engine); + + CloseableIterator physicalData = + Transaction.transformLogicalData(engine, txnState, logicalData, Collections.emptyMap()); + DataWriteContext writeContext = + Transaction.getWriteContext(engine, txnState, Collections.emptyMap()); + CloseableIterator dataFiles = engine.getParquetHandler() + .writeParquetFiles(writeContext.getTargetDirectory(), physicalData, writeContext.getStatisticsColumns()); + CloseableIterator appendActions = + Transaction.generateAppendActions(engine, txnState, dataFiles, writeContext); + txn.commit(engine, CloseableIterable.inMemoryIterable(appendActions)); + } +} diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java b/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java index dc1c7da230c..e10d358d629 100644 --- a/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java +++ b/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java @@ -78,7 +78,11 @@ public static MatrixReader createMatrixReader(FileFormat fmt) { case COMPRESSED: reader = ReaderCompressed.create(); break; - + + case DELTA: + reader = par ? new ReaderDeltaParallel() : new ReaderDelta(); + break; + default: throw new DMLRuntimeException("Failed to create matrix reader for unknown format: " + fmt.toString()); } @@ -140,6 +144,11 @@ public static MatrixReader createMatrixReader( ReadProperties props ) { case COMPRESSED: reader = new ReaderCompressed(); break; + + case DELTA: + reader = par ? new ReaderDeltaParallel() : new ReaderDelta(); + break; + default: throw new DMLRuntimeException("Failed to create matrix reader for unknown format: " + fmt.toString()); } diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java b/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java index bb0b0c940f7..091194edc81 100644 --- a/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java +++ b/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java @@ -94,6 +94,9 @@ else if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE case COMPRESSED: return WriterCompressed.create(props); + case DELTA: + return new WriterDelta(); + default: throw new DMLRuntimeException("Failed to create matrix writer for unknown format: " + fmt.toString()); } diff --git a/src/main/java/org/apache/sysds/runtime/io/ReaderDelta.java b/src/main/java/org/apache/sysds/runtime/io/ReaderDelta.java new file mode 100644 index 00000000000..58a98741975 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/io/ReaderDelta.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.io; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; + +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.StructType; + +/** + * Single-threaded native Delta Lake reader for matrices, built on the + * Spark-free Delta Kernel library. It opens the latest snapshot of a Delta + * table directory, reads its parquet data files through the kernel's default + * engine (honoring deletion vectors), and materializes the numeric columns + * into a dense {@link MatrixBlock}. + * + *

Only numeric columns (double/float/long/int/short/byte/boolean) are + * supported, matching the all-double nature of a SystemDS matrix. Dimensions + * do not need to be known up front: the row count is discovered while scanning + * and the column count is taken from the table schema.

+ */ +public class ReaderDelta extends MatrixReader { + + @Override + public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int blen, long estnnz) + throws IOException, DMLRuntimeException + { + Engine engine = DeltaKernelUtils.createEngine(); + String tablePath = DeltaKernelUtils.qualify(fname); + + //Scan column-at-a-time into one row-major buffer per batch (no per-row + //allocation, no boxing, no per-cell set()). Buffers are concatenated into + //the dense output via bulk array copies below. + ArrayList batches = new ArrayList<>(); + int[] nrowH = new int[1]; + StructType schema = DeltaKernelUtils.scan(engine, tablePath, sch -> { + int[] types = columnTypes(sch); + int ncol = sch.length(); + return (cols, size, selected) -> { + batches.add(extractBatch(cols, size, selected, types, ncol)); + nrowH[0] += DeltaKernelUtils.countSelected(size, selected); + }; + }); + + int ncol = schema.length(); + int nrow = nrowH[0]; + long lestnnz = (estnnz >= 0) ? estnnz : (long) nrow * ncol; + MatrixBlock ret = createOutputMatrixBlock(nrow, ncol, Math.max(nrow, 1), lestnnz, true, false); + + if( nrow > 0 && ncol > 0 ) + fillDense(ret, batches); + ret.recomputeNonZeros(); + ret.examSparsity(); + return ret; + } + + /** Derive the per-column internal type codes from the table schema. */ + static int[] columnTypes(StructType schema) { + int ncol = schema.length(); + int[] types = new int[ncol]; + for( int c=0; c batches) { + DenseBlock db = ret.getDenseBlock(); + if( db.isContiguous() ) { + double[] dv = db.valuesAt(0); + int off = 0; + for( double[] buf : batches ) { + System.arraycopy(buf, 0, dv, off, buf.length); + off += buf.length; + } + } + else { + //rare large multi-block fallback: route each row through the block API + int ncol = ret.getNumColumns(); + int r = 0; + for( double[] buf : batches ) { + int rowsInBuf = buf.length / ncol; + for( int i=0; iThe expensive part of a Delta read is the parquet decode, which the kernel + * performs per data file; parallelizing across files is therefore the natural + * way to bridge the gap to the (near-raw) binary reader. A table backed by a + * single data file (the default for tables <= the parquet target file size) + * cannot be split this way, so the reader transparently falls back to the + * sequential {@link ReaderDelta} path in that case.

+ */ +public class ReaderDeltaParallel extends ReaderDelta { + + private final int _numThreads; + + public ReaderDeltaParallel() { + _numThreads = OptimizerUtils.getParallelBinaryReadParallelism(); + } + + @Override + public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int blen, long estnnz) + throws IOException, DMLRuntimeException + { + Engine engine = DeltaKernelUtils.createEngine(); + String tablePath = DeltaKernelUtils.qualify(fname); + DeltaKernelUtils.ScanHandle handle = DeltaKernelUtils.openScan(engine, tablePath); + + final int nfiles = handle.scanFiles.size(); + //nothing to gain from parallelism for single-file (or empty) tables + if( _numThreads <= 1 || nfiles <= 1 ) + return super.readMatrixFromHDFS(fname, rlen, clen, blen, estnnz); + + final int ncol = handle.schema.length(); + final int[] types = columnTypes(handle.schema); + + //fast path: exact per-file row counts are known from metadata and the dense + //output fits a single contiguous array -> pre-size once and let each thread + //decode directly into its slice (no intermediate buffers, no serial copy). + if( useDirectPath(handle) ) { + long total = 0; + for( long r : handle.numRecords ) + total += r; + if( total > 0 && (long) total * ncol <= Integer.MAX_VALUE ) + return readDirect(fname, handle, ncol, types, (int) total, estnnz); + } + + return readBuffered(fname, handle, ncol, types, estnnz); + } + + /** + * Whether the metadata-driven direct-write fast path can be used for this + * table (exact per-file row counts and no deletion vectors). Visible for + * testing: the buffered fallback is otherwise only reachable for tables + * lacking row statistics or carrying deletion vectors, which the SystemDS + * Delta writer never produces. + * + * @param handle the opened scan handle + * @return true if the direct path is applicable + */ + protected boolean useDirectPath(DeltaKernelUtils.ScanHandle handle) { + return handle.hasExactRowCounts(); + } + + /** + * Fast path: each thread decodes one data file straight into the final dense + * array at a metadata-derived row offset. Single allocation, fully parallel. + */ + private MatrixBlock readDirect(String fname, DeltaKernelUtils.ScanHandle handle, + int ncol, int[] types, int nrow, long estnnz) throws IOException + { + final int nfiles = handle.scanFiles.size(); + final int[] rowOffset = new int[nfiles]; + int acc = 0; + for( int i=0; i> tasks = new ArrayList<>(nfiles); + for( int i=0; i { + int[] cur = new int[] {base}; + Engine eng = DeltaKernelUtils.createEngine(); + DeltaKernelUtils.readScanFile(eng, handle.scanState, handle.physicalReadSchema, scanFileRow, + (cols, size, selected) -> + cur[0] += extractBatchInto(cols, size, selected, types, ncol, dv, cur[0])); + return null; + }); + } + awaitFileTasks(tasks, fname); + + ret.recomputeNonZeros(_numThreads); + ret.examSparsity(); + return ret; + } + + /** + * Fallback path: decode each file in parallel into per-file buffers (used when + * row counts are unknown, deletion vectors are present, or the matrix exceeds a + * single contiguous array), then concatenate in file order. + */ + private MatrixBlock readBuffered(String fname, DeltaKernelUtils.ScanHandle handle, + int ncol, int[] types, long estnnz) throws IOException + { + final int nfiles = handle.scanFiles.size(); + @SuppressWarnings("unchecked") + final ArrayList[] fileBufs = new ArrayList[nfiles]; + final int[] fileRows = new int[nfiles]; + ArrayList> tasks = new ArrayList<>(nfiles); + for( int i=0; i { + ArrayList bufs = new ArrayList<>(); + int[] rows = new int[1]; + Engine eng = DeltaKernelUtils.createEngine(); + DeltaKernelUtils.readScanFile(eng, handle.scanState, handle.physicalReadSchema, scanFileRow, + (cols, size, selected) -> { + bufs.add(extractBatch(cols, size, selected, types, ncol)); + rows[0] += DeltaKernelUtils.countSelected(size, selected); + }); + fileBufs[fi] = bufs; + fileRows[fi] = rows[0]; + return null; + }); + } + awaitFileTasks(tasks, fname); + + int nrow = 0; + for( int i=0; i ordered = new ArrayList<>(); + for( int i=0; i= 0) ? estnnz : (long) nrow * ncol; + MatrixBlock ret = createOutputMatrixBlock(nrow, ncol, Math.max(nrow, 1), lestnnz, true, false); + if( nrow > 0 && ncol > 0 ) + fillDense(ret, ordered); + ret.recomputeNonZeros(); + ret.examSparsity(); + return ret; + } + + /** + * Run one decode task per data file on the shared common thread pool and await + * completion. Full parallelism is requested (the task count, one per data file, + * naturally caps concurrency); this avoids the per-thread pool-size caching in + * {@code CommonThreadPool.get(k)} that could otherwise throttle this reader to a + * smaller pool created earlier on the same thread. + */ + private void awaitFileTasks(List> tasks, String fname) throws IOException { + ExecutorService pool = CommonThreadPool.get(_numThreads); + try { + for( Future f : pool.invokeAll(tasks) ) + f.get(); + } + catch(Exception ex) { + throw new IOException("Failed parallel read of Delta table: " + fname, ex); + } + finally { + pool.shutdown(); + } + } +} diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterDelta.java b/src/main/java/org/apache/sysds/runtime/io/WriterDelta.java new file mode 100644 index 00000000000..0f08bf5517d --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/io/WriterDelta.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.io; + +import java.io.IOException; +import java.util.NoSuchElementException; +import java.util.Optional; + +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.runtime.instructions.ooc.OOCStream; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterable; +import io.delta.kernel.utils.CloseableIterator; + +/** + * Single-threaded native Delta Lake writer for matrices, built on the + * Spark-free Delta Kernel library. It creates a Delta table at the target + * directory with an all-double schema {@code c0..c(n-1)} (replacing any existing + * table at that path), streams the {@link MatrixBlock} rows as columnar batches + * into parquet data files via the kernel's default engine, and commits the + * corresponding add-file actions to the transaction log. + */ +public class WriterDelta extends MatrixWriter { + + @Override + public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz, boolean diag) + throws IOException + { + if( src.getNumRows() != rlen || src.getNumColumns() != clen ) + throw new IOException("Matrix dimensions mismatch with metadata: (" + + src.getNumRows() + "x" + src.getNumColumns() + ") vs (" + rlen + "x" + clen + ")."); + int ncol = (int) clen; + int nrow = (int) rlen; + int batchRows = ConfigurationManager.getDeltaWriterBatchSize(); + //fast path: a contiguous dense block lets the column views read straight + //from the backing double[] (avoids per-cell MatrixBlock.get dispatch). + double[] dense = (!src.isInSparseFormat() && src.getDenseBlock() != null + && src.getDenseBlock().isContiguous()) ? src.getDenseBlockValues() : null; + Engine engine = DeltaKernelUtils.createEngine(); + DeltaKernelUtils.commit(engine, DeltaKernelUtils.qualify(fname), + buildSchema(ncol), new MatrixBatchIterator(src, dense, nrow, ncol, batchRows)); + } + + @Override + public void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int blen) + throws IOException + { + //empty table: create with schema but no data files + Engine engine = DeltaKernelUtils.createEngine(); + DeltaKernelUtils.commit(engine, DeltaKernelUtils.qualify(fname), + buildSchema((int) clen), CloseableIterable.emptyIterable().iterator()); + } + + private static StructType buildSchema(int ncol) { + StructType schema = new StructType(); + for( int c=0; c stream, long rlen, long clen, int blen) + throws IOException + { + throw new UnsupportedOperationException("Out-of-core stream write is not supported for the Delta format."); + } + + /** Chunks a MatrixBlock into fixed-size columnar batches for the kernel write path. */ + private static class MatrixBatchIterator implements CloseableIterator { + private final MatrixBlock _mb; + private final double[] _dense; + private final int _nrow; + private final int _ncol; + private final int _batchRows; + private final StructType _schema; + private int _pos = 0; + + MatrixBatchIterator(MatrixBlock mb, double[] dense, int nrow, int ncol, int batchRows) { + _mb = mb; + _dense = dense; + _nrow = nrow; + _ncol = ncol; + _batchRows = batchRows; + _schema = buildSchema(ncol); + } + + @Override + public boolean hasNext() { + return _pos < _nrow; + } + + @Override + public FilteredColumnarBatch next() { + if( !hasNext() ) + throw new NoSuchElementException(); + int size = Math.min(_batchRows, _nrow - _pos); + ColumnarBatch batch = new MatrixColumnarBatch(_mb, _dense, _schema, _pos, size, _ncol); + _pos += size; + //no selection vector: all rows in the batch are written + return new FilteredColumnarBatch(batch, Optional.empty()); + } + + @Override + public void close() { + //nothing to release + } + } + + /** Read-only view of a row range of a MatrixBlock as a Delta Kernel columnar batch. */ + private static class MatrixColumnarBatch implements ColumnarBatch { + private final MatrixBlock _mb; + private final double[] _dense; + private final StructType _schema; + private final int _rowStart; + private final int _size; + private final int _ncol; + + MatrixColumnarBatch(MatrixBlock mb, double[] dense, StructType schema, int rowStart, int size, int ncol) { + _mb = mb; + _dense = dense; + _schema = schema; + _rowStart = rowStart; + _size = size; + _ncol = ncol; + } + + @Override + public StructType getSchema() { + return _schema; + } + + @Override + public ColumnVector getColumnVector(int ordinal) { + if( ordinal < 0 || ordinal >= _ncol ) + throw new IndexOutOfBoundsException("column ordinal " + ordinal); + return new MatrixColumnVector(_mb, _dense, _rowStart, _size, _ncol, ordinal); + } + + @Override + public int getSize() { + return _size; + } + } + + /** Read-only double column view over one column of a MatrixBlock row range. */ + private static class MatrixColumnVector implements ColumnVector { + private final MatrixBlock _mb; + private final double[] _dense; // contiguous dense backing array, or null + private final int _rowStart; + private final int _size; + private final int _ncol; + private final int _col; + + MatrixColumnVector(MatrixBlock mb, double[] dense, int rowStart, int size, int ncol, int col) { + _mb = mb; + _dense = dense; + _rowStart = rowStart; + _size = size; + _ncol = ncol; + _col = col; + } + + @Override + public DataType getDataType() { + return DoubleType.DOUBLE; + } + + @Override + public int getSize() { + return _size; + } + + @Override + public boolean isNullAt(int rowId) { + return false; + } + + @Override + public double getDouble(int rowId) { + //dense contiguous single block => index fits in int (getDenseBlockValues + //is only handed over for single-block dense matrices) + return (_dense != null) + ? _dense[(_rowStart + rowId) * _ncol + _col] + : _mb.get(_rowStart + rowId, _col); + } + + @Override + public void close() { + //nothing to release + } + } +} diff --git a/src/test/java/org/apache/sysds/test/component/io/DeltaMatrixReadWriteTest.java b/src/test/java/org/apache/sysds/test/component/io/DeltaMatrixReadWriteTest.java new file mode 100644 index 00000000000..54a3bcf6334 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/io/DeltaMatrixReadWriteTest.java @@ -0,0 +1,595 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.NoSuchElementException; +import java.util.Optional; + +import org.apache.commons.io.FileUtils; +import org.apache.sysds.common.Types.FileFormat; +import org.apache.sysds.conf.CompilerConfig; +import org.apache.sysds.conf.CompilerConfig.ConfigType; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.conf.DMLConfig; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.io.DeltaKernelUtils; +import org.apache.sysds.runtime.io.MatrixReader; +import org.apache.sysds.runtime.io.MatrixReaderFactory; +import org.apache.sysds.runtime.io.ReaderDelta; +import org.apache.sysds.runtime.io.ReaderDeltaParallel; +import org.apache.sysds.runtime.io.WriterDelta; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; + +/** + * Direct (no DML) round-trip tests for the native Delta Kernel based matrix + * reader/writer. Each test writes a MatrixBlock to a fresh local Delta table + * directory and reads it back, asserting dimensions and values match. + */ +public class DeltaMatrixReadWriteTest { + + //small writer target file size (bytes) used to force a multi-file table + //layout cheaply, instead of brute-forcing huge row counts. + private static final long SMALL_TARGET_FILE_SIZE = 256L * 1024; + private static final int ROWS_MULTI_FILE = 100_000; + + private static MatrixBlock writeThenRead(MatrixBlock in) throws Exception { + Path dir = Files.createTempDirectory("sysds_delta_"); + //WriterDelta creates the table at the given (empty) directory + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + WriterDelta writer = new WriterDelta(); + writer.writeMatrixToHDFS(in, tablePath, in.getNumRows(), in.getNumColumns(), -1, in.getNonZeros()); + + MatrixReader reader = new ReaderDelta(); + return reader.readMatrixFromHDFS(tablePath, in.getNumRows(), in.getNumColumns(), -1, in.getNonZeros()); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void parallelReadMatchesSerialMultiFile() throws Exception { + //force a multi-file table cheaply via a small writer target file size + //(rather than a huge row count), so the parallel per-file path is + //actually exercised rather than falling back to serial. + MatrixBlock in = TestUtils.generateTestMatrixBlock(ROWS_MULTI_FILE, 8, -10, 10, 1.0, 13); + in.recomputeNonZeros(); + + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(SMALL_TARGET_FILE_SIZE)); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_par_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new WriterDelta().writeMatrixToHDFS(in, tablePath, + in.getNumRows(), in.getNumColumns(), -1, in.getNonZeros()); + + //sanity: confirm the table really is split across multiple files + long files; + try( java.util.stream.Stream s = Files.walk(new File(tablePath).toPath()) ) { + files = s.filter(p -> p.toString().endsWith(".parquet")).count(); + } + assertTrue("expected a multi-file Delta table to exercise the parallel path, got " + files, + files > 1); + + MatrixBlock serial = new ReaderDelta() + .readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + MatrixBlock parallel = new ReaderDeltaParallel() + .readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + + assertEquals("rows", serial.getNumRows(), parallel.getNumRows()); + assertEquals("cols", serial.getNumColumns(), parallel.getNumColumns()); + assertEquals("nnz", serial.getNonZeros(), parallel.getNonZeros()); + TestUtils.compareMatrices(serial, parallel, 0, "serial-vs-parallel"); + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void roundTripDenseSmall() throws Exception { + MatrixBlock in = new MatrixBlock(3, 4, false); + double v = 1.0; + for( int i=0; i<3; i++ ) + for( int j=0; j<4; j++ ) + in.set(i, j, v++); + in.recomputeNonZeros(); + + MatrixBlock out = writeThenRead(in); + assertEquals("rows", 3, out.getNumRows()); + assertEquals("cols", 4, out.getNumColumns()); + TestUtils.compareMatrices(in, out, 1e-12, "dense-small"); + } + + @Test + public void roundTripDenseRandom() throws Exception { + MatrixBlock in = TestUtils.generateTestMatrixBlock(500, 17, -10, 10, 1.0, 7); + MatrixBlock out = writeThenRead(in); + assertEquals("rows", in.getNumRows(), out.getNumRows()); + assertEquals("cols", in.getNumColumns(), out.getNumColumns()); + TestUtils.compareMatrices(in, out, 1e-12, "dense-random"); + } + + @Test + public void roundTripSparseRandom() throws Exception { + //values written are dense parquet, but exercise a sparse-ish input + MatrixBlock in = TestUtils.generateTestMatrixBlock(1200, 9, -5, 5, 0.1, 13); + in.recomputeNonZeros(); + MatrixBlock out = writeThenRead(in); + assertEquals("rows", in.getNumRows(), out.getNumRows()); + assertEquals("cols", in.getNumColumns(), out.getNumColumns()); + TestUtils.compareMatrices(in, out, 1e-12, "sparse-random"); + } + + @Test + public void roundTripMultiBatch() throws Exception { + //more rows than the writer batch size (4096) to exercise chunking + MatrixBlock in = TestUtils.generateTestMatrixBlock(10000, 5, 0, 100, 1.0, 1); + MatrixBlock out = writeThenRead(in); + assertEquals("rows", 10000, out.getNumRows()); + assertEquals("cols", 5, out.getNumColumns()); + TestUtils.compareMatrices(in, out, 1e-12, "multi-batch"); + } + + @Test + public void readDiscoversUnknownDimensions() throws Exception { + MatrixBlock in = TestUtils.generateTestMatrixBlock(123, 6, -1, 1, 1.0, 3); + Path dir = Files.createTempDirectory("sysds_delta_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new WriterDelta().writeMatrixToHDFS(in, tablePath, in.getNumRows(), in.getNumColumns(), -1, in.getNonZeros()); + //pass -1 dimensions: the reader must discover them from the table + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + assertEquals("rows", 123, out.getNumRows()); + assertEquals("cols", 6, out.getNumColumns()); + TestUtils.compareMatrices(in, out, 1e-12, "unknown-dims"); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void emptyMatrixRoundTrip() throws Exception { + Path dir = Files.createTempDirectory("sysds_delta_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new WriterDelta().writeEmptyMatrixToHDFS(tablePath, 0, 4, -1); + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + assertEquals("rows", 0, out.getNumRows()); + assertEquals("cols", 4, out.getNumColumns()); + assertEquals("nnz", 0, out.getNonZeros()); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void readNonDoubleNumericColumns() throws Exception { + //tables produced by external tools (or the frame writer) can carry + //long/int/boolean columns; the matrix reader must coerce them to double + Path dir = Files.createTempDirectory("sysds_delta_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + double[] longVals = {1, -2, 1_000_000_000L, 0}; + double[] intVals = {7, -8, 123456, 0}; + double[] boolVals = {1, 0, 1, 0}; + writeTypedColumns(tablePath, + new DataType[] {LongType.LONG, IntegerType.INTEGER, BooleanType.BOOLEAN}, + new double[][] {longVals, intVals, boolVals}); + + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + assertEquals("rows", 4, out.getNumRows()); + assertEquals("cols", 3, out.getNumColumns()); + for( int r=0; r<4; r++ ) { + assertEquals("long col r" + r, longVals[r], out.get(r, 0), 0.0); + assertEquals("int col r" + r, intVals[r], out.get(r, 1), 0.0); + assertEquals("bool col r" + r, boolVals[r], out.get(r, 2), 0.0); + } + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void rewriteSamePathReplacesData() throws Exception { + //writing to a path that already holds a Delta table must fully replace it + Path dir = Files.createTempDirectory("sysds_delta_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + MatrixBlock first = TestUtils.generateTestMatrixBlock(50, 8, 0, 100, 1.0, 1); + new WriterDelta().writeMatrixToHDFS(first, tablePath, 50, 8, -1, first.getNonZeros()); + + //second write has different dimensions and values + MatrixBlock second = TestUtils.generateTestMatrixBlock(20, 3, -5, 5, 1.0, 2); + new WriterDelta().writeMatrixToHDFS(second, tablePath, 20, 3, -1, second.getNonZeros()); + + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + assertEquals("rows", 20, out.getNumRows()); + assertEquals("cols", 3, out.getNumColumns()); + TestUtils.compareMatrices(second, out, 1e-12, "rewrite-replace"); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void parallelBufferedPathMatchesSerial() throws Exception { + //the direct fast path is always taken for SystemDS-written tables (exact + //row stats, no deletion vectors); force the buffered fallback to exercise + //its per-file decode + serial concatenation and assert it matches serial. + //force a multi-file table cheaply via a small writer target file size. + MatrixBlock in = TestUtils.generateTestMatrixBlock(ROWS_MULTI_FILE, 8, -10, 10, 1.0, 23); + in.recomputeNonZeros(); + + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(SMALL_TARGET_FILE_SIZE)); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_buf_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new WriterDelta().writeMatrixToHDFS(in, tablePath, + in.getNumRows(), in.getNumColumns(), -1, in.getNonZeros()); + + long files; + try( java.util.stream.Stream s = Files.walk(new File(tablePath).toPath()) ) { + files = s.filter(p -> p.toString().endsWith(".parquet")).count(); + } + assertTrue("expected a multi-file Delta table, got " + files, files > 1); + + MatrixBlock serial = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + //subclass that always declines the direct path -> readBuffered() + MatrixBlock buffered = new ReaderDeltaParallel() { + @Override protected boolean useDirectPath(DeltaKernelUtils.ScanHandle h) { return false; } + }.readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + + assertEquals("rows", serial.getNumRows(), buffered.getNumRows()); + assertEquals("cols", serial.getNumColumns(), buffered.getNumColumns()); + assertEquals("nnz", serial.getNonZeros(), buffered.getNonZeros()); + TestUtils.compareMatrices(serial, buffered, 0, "serial-vs-buffered"); + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void writerTargetFileSizeConfigProducesMoreFiles() throws Exception { + //a smaller configured target file size must make the writer roll more + //data files for the same matrix (the lever the parallel reader relies on). + MatrixBlock in = TestUtils.generateTestMatrixBlock(400_000, 16, -10, 10, 1.0, 7); + in.recomputeNonZeros(); + + //isolate the override in a fresh thread-local config (restored in finally) + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(1L * 1024 * 1024)); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_cfg_"); + try { + assertEquals("config getter reflects the override", + 1L * 1024 * 1024, ConfigurationManager.getDeltaWriterTargetFileSize()); + + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + new WriterDelta().writeMatrixToHDFS(in, tablePath, + in.getNumRows(), in.getNumColumns(), -1, in.getNonZeros()); + long files; + try( java.util.stream.Stream s = Files.walk(new File(tablePath).toPath()) ) { + files = s.filter(p -> p.toString().endsWith(".parquet")).count(); + } + assertTrue("expected >1 data file with a 1MB target, got " + files, files > 1); + + //data still round-trips correctly with the custom layout + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + TestUtils.compareMatrices(in, out, 1e-12, "small-target-roundtrip"); + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void readerBatchSizeConfigRoundTrips() throws Exception { + //a non-default reader batch size must not change the result (more, smaller + //batches exercise the per-batch extract/concatenate loop more often). + MatrixBlock in = TestUtils.generateTestMatrixBlock(5000, 7, -10, 10, 1.0, 11); + //isolate the override in a fresh thread-local config (restored in finally) + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_READER_BATCH_SIZE, "128"); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_bs_"); + try { + assertEquals("config getter reflects the override", + 128, ConfigurationManager.getDeltaReaderBatchSize()); + + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + new WriterDelta().writeMatrixToHDFS(in, tablePath, + in.getNumRows(), in.getNumColumns(), -1, in.getNonZeros()); + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + TestUtils.compareMatrices(in, out, 1e-12, "small-batch-roundtrip"); + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void factoryRoutesDeltaToParallelWhenEnabled() { + //the factory must pick the parallel reader iff parallel CP read is enabled + CompilerConfig cc = ConfigurationManager.getCompilerConfig(); + try { + cc.set(ConfigType.PARALLEL_CP_READ_TEXTFORMATS, true); + ConfigurationManager.setLocalConfig(cc); + MatrixReader par = MatrixReaderFactory.createMatrixReader(FileFormat.DELTA); + assertTrue("expected ReaderDeltaParallel when parallel read enabled", + par instanceof ReaderDeltaParallel); + + cc.set(ConfigType.PARALLEL_CP_READ_TEXTFORMATS, false); + ConfigurationManager.setLocalConfig(cc); + MatrixReader ser = MatrixReaderFactory.createMatrixReader(FileFormat.DELTA); + assertTrue("expected serial ReaderDelta when parallel read disabled", + ser instanceof ReaderDelta && !(ser instanceof ReaderDeltaParallel)); + } + finally { + ConfigurationManager.clearLocalConfigs(); + } + } + + @Test + public void readFloatColumnsCoercedToDouble() throws Exception { + //float columns must be widened to double on read (exact-representable values) + Path dir = Files.createTempDirectory("sysds_delta_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + double[] f0 = {1.5, -2.25, 0.0, 1024.5}; + double[] f1 = {-0.5, 3.75, 100.125, -7.0}; + writeTypedColumns(tablePath, + new DataType[] {FloatType.FLOAT, FloatType.FLOAT}, + new double[][] {f0, f1}); + + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + assertEquals("rows", 4, out.getNumRows()); + assertEquals("cols", 2, out.getNumColumns()); + for( int r=0; r<4; r++ ) { + assertEquals("f0 r" + r, f0[r], out.get(r, 0), 0.0); + assertEquals("f1 r" + r, f1[r], out.get(r, 1), 0.0); + } + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void readShortByteColumnsCoercedToDouble() throws Exception { + //short/byte columns must be coerced to double on read, exercising the + //T_SHORT / T_BYTE branches of ReaderDelta.getDoubleValue. + Path dir = Files.createTempDirectory("sysds_delta_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + double[] shortVals = {1, -2, 30000, 0}; + double[] byteVals = {7, -8, 120, 0}; + writeTypedColumns(tablePath, + new DataType[] {ShortType.SHORT, ByteType.BYTE}, + new double[][] {shortVals, byteVals}); + + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + assertEquals("rows", 4, out.getNumRows()); + assertEquals("cols", 2, out.getNumColumns()); + for( int r=0; r<4; r++ ) { + assertEquals("short col r" + r, shortVals[r], out.get(r, 0), 0.0); + assertEquals("byte col r" + r, byteVals[r], out.get(r, 1), 0.0); + } + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void writerRejectsDimensionMismatch() throws Exception { + //WriterDelta validates that the passed rlen/clen match the MatrixBlock + //and rejects a mismatch with an IOException. + MatrixBlock in = TestUtils.generateTestMatrixBlock(10, 4, -1, 1, 1.0, 5); + in.recomputeNonZeros(); + Path dir = Files.createTempDirectory("sysds_delta_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new WriterDelta().writeMatrixToHDFS(in, tablePath, 11, 4, -1, in.getNonZeros()); + fail("expected an IOException for mismatched matrix dimensions"); + } + catch(java.io.IOException ex) { + assertTrue("message should mention the dimension mismatch, got: " + ex.getMessage(), + ex.getMessage() != null && ex.getMessage().contains("dimensions mismatch")); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void readNullCellsBecomeZero() throws Exception { + //nullable numeric columns with null cells must read back as 0.0 + Path dir = Files.createTempDirectory("sysds_delta_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + double[] vals = {3.0, 7.0, 9.0, 11.0}; + boolean[] nulls = {false, true, false, true}; + writeNullableDoubleColumn(tablePath, vals, nulls); + + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + assertEquals("rows", 4, out.getNumRows()); + assertEquals("cols", 1, out.getNumColumns()); + for( int r=0; r<4; r++ ) + assertEquals("r" + r, nulls[r] ? 0.0 : vals[r], out.get(r, 0), 0.0); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void readStringColumnRejected() throws Exception { + //string columns cannot back an all-double matrix -> reader must reject them + Path dir = Files.createTempDirectory("sysds_delta_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + writeStringColumn(tablePath, new String[] {"a", "b", "c"}); + try { + new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + fail("expected a DMLRuntimeException for a non-numeric (string) Delta column"); + } + catch(DMLRuntimeException ex) { + assertTrue("message should mention the non-numeric column, got: " + ex.getMessage(), + ex.getMessage() != null && ex.getMessage().contains("non-numeric")); + } + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + /** Writes a single-batch Delta table with one column per given (type, values) pair. */ + private static void writeTypedColumns(String tablePath, DataType[] types, double[][] vals) throws Exception { + Engine engine = DeltaKernelUtils.createEngine(); + StructType schema = new StructType(); + for( int c=0; c singleton(FilteredColumnarBatch fcb) { + return new CloseableIterator() { + private boolean _done = false; + @Override public boolean hasNext() { return !_done; } + @Override public FilteredColumnarBatch next() { + if( _done ) throw new NoSuchElementException(); + _done = true; + return fcb; + } + @Override public void close() {} + }; + } + + /** Minimal in-memory columnar batch backed by per-column double[] values, with + * an optional per-column null mask ({@code nulls==null} => no nulls). */ + private static class TypedBatch implements ColumnarBatch { + private final StructType _schema; + private final DataType[] _types; + private final double[][] _vals; + private final boolean[][] _nulls; + TypedBatch(StructType schema, DataType[] types, double[][] vals, boolean[][] nulls) { + _schema = schema; _types = types; _vals = vals; _nulls = nulls; + } + @Override public StructType getSchema() { return _schema; } + @Override public int getSize() { return _vals[0].length; } + @Override public ColumnVector getColumnVector(int ordinal) { + return new TypedVector(_types[ordinal], _vals[ordinal], + _nulls == null ? null : _nulls[ordinal]); + } + } + + /** Column view exposing a double[] as the requested Delta primitive type. */ + private static class TypedVector implements ColumnVector { + private final DataType _type; + private final double[] _vals; + private final boolean[] _nulls; + TypedVector(DataType type, double[] vals, boolean[] nulls) { _type = type; _vals = vals; _nulls = nulls; } + @Override public DataType getDataType() { return _type; } + @Override public int getSize() { return _vals.length; } + @Override public boolean isNullAt(int rowId) { return _nulls != null && _nulls[rowId]; } + @Override public double getDouble(int rowId) { return _vals[rowId]; } + @Override public float getFloat(int rowId) { return (float) _vals[rowId]; } + @Override public long getLong(int rowId) { return (long) _vals[rowId]; } + @Override public int getInt(int rowId) { return (int) _vals[rowId]; } + @Override public short getShort(int rowId) { return (short) _vals[rowId]; } + @Override public byte getByte(int rowId) { return (byte) _vals[rowId]; } + @Override public boolean getBoolean(int rowId) { return _vals[rowId] != 0; } + @Override public void close() {} + } + + /** Column view exposing a String[] as a Delta string column. */ + private static class StringVector implements ColumnVector { + private final String[] _vals; + StringVector(String[] vals) { _vals = vals; } + @Override public DataType getDataType() { return StringType.STRING; } + @Override public int getSize() { return _vals.length; } + @Override public boolean isNullAt(int rowId) { return _vals[rowId] == null; } + @Override public String getString(int rowId) { return _vals[rowId]; } + @Override public void close() {} + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/io/delta/DeltaReadWriteTest.java b/src/test/java/org/apache/sysds/test/functions/io/delta/DeltaReadWriteTest.java new file mode 100644 index 00000000000..a4013c3672d --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/io/delta/DeltaReadWriteTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.functions.io.delta; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.HashMap; + +import org.apache.sysds.runtime.controlprogram.caching.CacheStatistics; +import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; + +/** + * End-to-end DML test of the native Delta read/write path. + * + *

The write and the read are run as two separate SystemDS executions + * on purpose. If they shared a single script/process, SystemDS would reuse the + * still-materialized in-memory matrix for the subsequent read and never invoke + * {@link org.apache.sysds.runtime.io.ReaderDelta} at all (verified: the cache + * reports 0 HDFS hits in that case). Splitting the executions forces a genuine + * read from disk, and we additionally assert via {@link CacheStatistics} that + * the read run actually performed HDFS reads (the Delta table + the text + * reference) rather than serving the matrix from cache.

+ */ +public class DeltaReadWriteTest extends AutomatedTestBase { + + private final static String TEST_DIR = "functions/io/delta/"; + private final static String TEST_CLASS_DIR = TEST_DIR + DeltaReadWriteTest.class.getSimpleName() + "/"; + private final static String WRITE_NAME = "DeltaWrite"; + private final static String READ_NAME = "DeltaReadCompare"; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(WRITE_NAME, + new TestConfiguration(TEST_CLASS_DIR, WRITE_NAME, new String[] { "ref" })); + addTestConfiguration(READ_NAME, + new TestConfiguration(TEST_CLASS_DIR, READ_NAME, new String[] { "R" })); + } + + @Test + public void testDenseRoundTrip() { + runDeltaRoundTrip(200, 12, 1.0); + } + + @Test + public void testSparseRoundTrip() { + runDeltaRoundTrip(640, 8, 0.2); + } + + @Test + public void testMultiBatchRoundTrip() { + runDeltaRoundTrip(9000, 4, 1.0); + } + + private void runDeltaRoundTrip(int rows, int cols, double sparsity) { + try { + String HOME = SCRIPT_DIR + TEST_DIR; + + // ---- phase 1: write the matrix as a Delta table + text reference ---- + getAndLoadTestConfiguration(WRITE_NAME); + String deltaPath = output("deltaTable"); + String refPath = output("ref"); + fullDMLScriptName = HOME + WRITE_NAME + ".dml"; + programArgs = new String[] { "-stats", "-args", + String.valueOf(rows), String.valueOf(cols), String.valueOf(sparsity), + deltaPath, refPath }; + runTest(true, false, null, -1); + + // the write run must have materialized two matrices to disk (the Delta + // table under test + the text reference); WriterDelta genuinely hitting + // HDFS is what produces these write-side cache statistics. + long hdfsWrites = CacheStatistics.getHDFSWrites(); + assertTrue("expected >= 2 HDFS writes in the write run (delta + reference), got " + + hdfsWrites, hdfsWrites >= 2); + // and a real Delta table (transaction log) must have been created + assertTrue("missing Delta transaction log under " + deltaPath, + new File(deltaPath, "_delta_log").isDirectory()); + + // ---- phase 2: fresh execution reads the Delta table and compares ---- + getAndLoadTestConfiguration(READ_NAME); + fullDMLScriptName = HOME + READ_NAME + ".dml"; + programArgs = new String[] { "-stats", "-args", + deltaPath, refPath, output("R") }; + runTest(true, false, null, -1); + + // the read run must have materialized two matrices from disk (the Delta + // table under test + the text reference); a cached/short-circuited read + // would report fewer HDFS hits and fail here. + long hdfsReads = CacheStatistics.getHDFSHits(); + assertTrue("expected >= 2 HDFS reads in the read run (delta + reference), got " + + hdfsReads, hdfsReads >= 2); + + HashMap R = readDMLMatrixFromOutputDir("R"); + //text-cell output omits exact zeros, so a missing cell means 0.0 + double diff = R.getOrDefault(new CellIndex(1, 1), 0.0); + double nrow = R.getOrDefault(new CellIndex(1, 2), 0.0); + double ncol = R.getOrDefault(new CellIndex(1, 3), 0.0); + + assertEquals("reconstruction error", 0.0, diff, 1e-12); + assertEquals("discovered rows", rows, (int) nrow); + assertEquals("discovered cols", cols, (int) ncol); + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + } +} diff --git a/src/test/scripts/functions/io/delta/DeltaReadCompare.dml b/src/test/scripts/functions/io/delta/DeltaReadCompare.dml new file mode 100644 index 00000000000..5caf992f39c --- /dev/null +++ b/src/test/scripts/functions/io/delta/DeltaReadCompare.dml @@ -0,0 +1,34 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Reader side of the native Delta round-trip test. Reads the Delta table +# (dimensions discovered from the transaction log) and the text reference, +# both genuine HDFS reads in a fresh process, and reports the elementwise +# reconstruction error together with the discovered dimensions. + +Y = read($1, format="delta") +Xref = read($2, format="text") + +R = matrix(0, rows=1, cols=3) +R[1,1] = sum(abs(Xref - Y)) # 0 if ReaderDelta reconstructed the matrix exactly +R[1,2] = nrow(Y) # discovered row count +R[1,3] = ncol(Y) # discovered column count +write(R, $3) diff --git a/src/test/scripts/functions/io/delta/DeltaWrite.dml b/src/test/scripts/functions/io/delta/DeltaWrite.dml new file mode 100644 index 00000000000..41c5b0899a7 --- /dev/null +++ b/src/test/scripts/functions/io/delta/DeltaWrite.dml @@ -0,0 +1,30 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Writer side of the native Delta round-trip test. Generates a matrix and +# materializes it twice: once as a Delta table (under test) and once as a +# plain text reference. Running the read/compare in a SEPARATE process is +# intentional: it prevents SystemDS from short-circuiting the subsequent +# read against the still-in-memory matrix, so ReaderDelta is actually used. + +X = rand(rows=$1, cols=$2, min=-5, max=5, seed=7, sparsity=$3) +write(X, $4, format="delta") +write(X, $5, format="text") From f2c304dbe071c3c3c1ef2f19f17582d9c585a407 Mon Sep 17 00:00:00 2001 From: Sebastian Baunsgaard Date: Sat, 27 Jun 2026 16:49:16 +0000 Subject: [PATCH 2/3] Harden Delta parallel matrix reader against bad row stats and slow nnz - Bound each per-file decode task in the direct parallel read path to its numRecords-derived row slice, so a Delta file that decodes more rows than its statistic claims fails with a clear error instead of overflowing into the next file's region (concurrent overlapping writes) or off the array. - Use the parallel recomputeNonZeros(_numThreads) in the buffered read path to match the direct path; the buffered fallback handles the largest matrices. --- .../sysds/runtime/io/ReaderDeltaParallel.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/io/ReaderDeltaParallel.java b/src/main/java/org/apache/sysds/runtime/io/ReaderDeltaParallel.java index e0e8c224bfd..c4224e8e4ea 100644 --- a/src/main/java/org/apache/sysds/runtime/io/ReaderDeltaParallel.java +++ b/src/main/java/org/apache/sysds/runtime/io/ReaderDeltaParallel.java @@ -122,12 +122,20 @@ private MatrixBlock readDirect(String fname, DeltaKernelUtils.ScanHandle handle, for( int i=0; i { int[] cur = new int[] {base}; Engine eng = DeltaKernelUtils.createEngine(); DeltaKernelUtils.readScanFile(eng, handle.scanState, handle.physicalReadSchema, scanFileRow, - (cols, size, selected) -> - cur[0] += extractBatchInto(cols, size, selected, types, ncol, dv, cur[0])); + (cols, size, selected) -> { + if( cur[0] + DeltaKernelUtils.countSelected(size, selected) > limit ) + throw new DMLRuntimeException("Delta file produced more rows than its " + + "numRecords statistic; refusing parallel direct read of " + fname); + cur[0] += extractBatchInto(cols, size, selected, types, ncol, dv, cur[0]); + }); return null; }); } @@ -181,7 +189,7 @@ private MatrixBlock readBuffered(String fname, DeltaKernelUtils.ScanHandle handl MatrixBlock ret = createOutputMatrixBlock(nrow, ncol, Math.max(nrow, 1), lestnnz, true, false); if( nrow > 0 && ncol > 0 ) fillDense(ret, ordered); - ret.recomputeNonZeros(); + ret.recomputeNonZeros(_numThreads); ret.examSparsity(); return ret; } From 55f5e92a236e46dfd3eb24fab340c883c9f16667 Mon Sep 17 00:00:00 2001 From: Sebastian Baunsgaard Date: Sun, 28 Jun 2026 18:49:48 +0000 Subject: [PATCH 3/3] Add Delta cross-engine interop and error-path matrix tests - Add a test-only dependency on Delta's Spark connector (delta-spark) and a cross-engine interop suite: SystemDS-written tables are read back by the reference Delta/Spark engine, and Spark-written tables (multi-file, plus a deletion-vector + second-commit layout the SystemDS writer never emits) are read back by both the serial and parallel SystemDS readers. All comparisons are keyed by an id column since neither engine guarantees row order. - Add targeted coverage for the Delta read/write defensive branches the round-trip tests miss: unresolvable table paths, unsupported column types, unsupported stream operations, malformed/absent per-file statistics (mocked scan rows), the non-contiguous dense fill fallback, a failed parallel per-file decode, and the sparse-format writer input path. --- pom.xml | 11 + .../component/io/DeltaMatrixCoverageTest.java | 301 ++++++++++++++++++ .../io/DeltaMatrixSparkInteropTest.java | 271 ++++++++++++++++ 3 files changed, 583 insertions(+) create mode 100644 src/test/java/org/apache/sysds/test/component/io/DeltaMatrixCoverageTest.java create mode 100644 src/test/java/org/apache/sysds/test/component/io/DeltaMatrixSparkInteropTest.java diff --git a/pom.xml b/pom.xml index 44454361318..068bed2e8ea 100644 --- a/pom.xml +++ b/pom.xml @@ -1001,6 +1001,17 @@ parquet-common ${parquet.version} + + + io.delta + delta-spark_${scala.binary.version} + ${delta-kernel.version} + test + org.jcuda jcuda diff --git a/src/test/java/org/apache/sysds/test/component/io/DeltaMatrixCoverageTest.java b/src/test/java/org/apache/sysds/test/component/io/DeltaMatrixCoverageTest.java new file mode 100644 index 00000000000..8d7ad14539a --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/io/DeltaMatrixCoverageTest.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.commons.io.FileUtils; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.conf.DMLConfig; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.data.DenseBlockLDRB; +import org.apache.sysds.runtime.data.DenseBlockLFP64; +import org.apache.sysds.runtime.io.DeltaKernelUtils; +import org.apache.sysds.runtime.io.ReaderDelta; +import org.apache.sysds.runtime.io.ReaderDeltaParallel; +import org.apache.sysds.runtime.io.WriterDelta; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.Row; +import io.delta.kernel.internal.InternalScanFileUtils; +import io.delta.kernel.types.BinaryType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructType; +import io.delta.kernel.types.TimestampType; + +/** + * Targeted tests for the error/defensive branches of the native Delta matrix + * read/write code that the round-trip and interop tests do not reach: malformed + * per-file statistics, unsupported column types, unsupported stream operations, + * bad table paths, and the non-dense writer input path. + * + *

A few of these branches guard against inputs that the SystemDS writer and + * the Delta Kernel scan API never produce in a normal round trip (e.g. a + * statistics JSON without {@code numRecords}, or a column type code outside the + * supported set). They are exercised here by mocking the Delta Kernel data + * objects and invoking the (package-private) helpers reflectively, rather than + * widening their production visibility purely for testing. + */ +public class DeltaMatrixCoverageTest { + + // --------------------------------------------------------------------- + // public defensive paths + // --------------------------------------------------------------------- + + @Test + public void qualifyRejectsUnknownFilesystemScheme() { + try { + DeltaKernelUtils.qualify("nosuchfs://host/path/to/table"); + fail("expected a DMLRuntimeException for an unresolvable table path"); + } + catch(DMLRuntimeException ex) { + assertTrue("message should reference the bad path, got: " + ex.getMessage(), + ex.getMessage() != null && ex.getMessage().contains("Delta table path")); + } + } + + @Test + public void typeCodeReturnsNegativeForUnsupportedTypes() { + //non-numeric / unsupported Delta types must map to the sentinel -1 so the + //reader can reject them with a clear message rather than mis-decoding. + assertEquals(-1, DeltaKernelUtils.typeCode(DateType.DATE)); + assertEquals(-1, DeltaKernelUtils.typeCode(TimestampType.TIMESTAMP)); + assertEquals(-1, DeltaKernelUtils.typeCode(BinaryType.BINARY)); + } + + @Test(expected = UnsupportedOperationException.class) + public void readerRejectsInputStream() throws Exception { + new ReaderDelta().readMatrixFromInputStream(null, 1, 1, -1, -1); + } + + @Test(expected = UnsupportedOperationException.class) + public void writerRejectsStreamWrite() throws Exception { + new WriterDelta().writeMatrixFromStream("dummy", null, 1, 1, -1); + } + + // --------------------------------------------------------------------- + // mocked / reflective coverage of internal defensive branches + // --------------------------------------------------------------------- + + @Test + public void numRecordsHandlesAbsentNullAndMalformedStats() throws Exception { + //no "stats" field at all -> -1 + assertEquals(-1, numRecords(addFileRow(new StructType().add("path", StringType.STRING), false, null))); + //stats column present but null-at -> -1 + assertEquals(-1, numRecords(addFileRow(statsSchema(), true, null))); + //stats string explicitly null -> -1 + assertEquals(-1, numRecords(addFileRow(statsSchema(), false, null))); + //malformed JSON -> JsonProcessingException -> -1 + assertEquals(-1, numRecords(addFileRow(statsSchema(), false, "{not valid json"))); + //valid JSON but no numRecords field -> -1 + assertEquals(-1, numRecords(addFileRow(statsSchema(), false, "{\"minValues\":{}}"))); + //well-formed stats -> the parsed count + assertEquals(1234L, numRecords(addFileRow(statsSchema(), false, "{\"numRecords\":1234}"))); + } + + @Test + public void getDoubleValueRejectsUnknownTypeCode() throws Exception { + Method m = ReaderDelta.class.getDeclaredMethod("getDoubleValue", ColumnVector.class, int.class, int.class); + m.setAccessible(true); + try { + //type code outside the supported T_* set; the switch default must throw + //before touching the (null) vector. + m.invoke(null, (ColumnVector) null, 0, 999); + fail("expected a DMLRuntimeException for an unsupported type code"); + } + catch(InvocationTargetException ite) { + assertTrue(ite.getCause() instanceof DMLRuntimeException); + } + } + + @Test + public void numericTypeCodeRejectsNonNumericType() throws Exception { + Method m = ReaderDelta.class.getDeclaredMethod("numericTypeCode", DataType.class, String.class); + m.setAccessible(true); + try { + m.invoke(null, DateType.DATE, "d"); + fail("expected a DMLRuntimeException for a non-numeric column type"); + } + catch(InvocationTargetException ite) { + assertTrue(ite.getCause() instanceof DMLRuntimeException); + } + } + + @Test + public void parallelReadWrapsFileFailure() throws Exception { + //a per-file decode failure in the parallel reader must surface as a single + //clear IOException (the awaitFileTasks catch), not a raw executor error. + //Provoke it by deleting one data file after the table (and its log) exist. + MatrixBlock in = TestUtils.generateTestMatrixBlock(100_000, 8, -10, 10, 1.0, 13); + in.recomputeNonZeros(); + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(256L * 1024)); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_fail_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new WriterDelta().writeMatrixToHDFS(in, tablePath, + in.getNumRows(), in.getNumColumns(), -1, in.getNonZeros()); + + //delete one parquet data file; the transaction log still references it, + //so the scan enumerates it but the decode task fails. + File victim; + try( java.util.stream.Stream s = Files.walk(new File(tablePath).toPath()) ) { + victim = s.filter(p -> p.toString().endsWith(".parquet")) + .findFirst().map(Path::toFile).orElse(null); + } + assertTrue("expected at least one data file to delete", victim != null && victim.delete()); + + try { + new ReaderDeltaParallel().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + fail("expected an IOException when a Delta data file is missing"); + } + catch(java.io.IOException ex) { + assertTrue("message should describe the failed parallel read, got: " + ex.getMessage(), + ex.getMessage() != null && ex.getMessage().contains("parallel read of Delta table")); + } + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + // --------------------------------------------------------------------- + // non-dense writer input path + // --------------------------------------------------------------------- + + @Test + public void sparseFormatMatrixRoundTrips() throws Exception { + //a sparse-backed MatrixBlock takes the writer's non-contiguous path (no + //direct double[] view), exercising MatrixColumnVector.get via MatrixBlock. + MatrixBlock in = TestUtils.generateTestMatrixBlock(2000, 7, -5, 5, 0.05, 13); + in.recomputeNonZeros(); + in.examSparsity(); + assertTrue("input should be in sparse format to exercise the non-dense path", in.isInSparseFormat()); + + Path dir = Files.createTempDirectory("sysds_delta_sparse_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new WriterDelta().writeMatrixToHDFS(in, tablePath, + in.getNumRows(), in.getNumColumns(), -1, in.getNonZeros()); + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + assertEquals("rows", in.getNumRows(), out.getNumRows()); + assertEquals("cols", in.getNumColumns(), out.getNumColumns()); + TestUtils.compareMatrices(in, out, 1e-12, "sparse-format-roundtrip"); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void fillDenseHandlesNonContiguousBlock() throws Exception { + //the dense fill normally hits the contiguous fast path; force a multi-block + //(non-contiguous) dense block so the row-by-row fallback is exercised. Such + //blocks only arise for matrices beyond a single contiguous array, so we + //shrink the per-block allocation cap to provoke it on a tiny matrix. + int rows = 5, cols = 4; + int savedMaxAlloc = DenseBlockLDRB.MAX_ALLOC; + DenseBlock db; + try { + DenseBlockLDRB.MAX_ALLOC = 2 * cols; //~2 rows per block -> multiple blocks + db = new DenseBlockLFP64(new int[] {rows, cols}); + } + finally { + DenseBlockLDRB.MAX_ALLOC = savedMaxAlloc; + } + assertTrue("expected a non-contiguous (multi-block) dense block", !db.isContiguous()); + + //two row-major batches (3 rows + 2 rows) covering all 5 rows + double[] b0 = new double[3 * cols]; + double[] b1 = new double[2 * cols]; + for( int r = 0; r < 3; r++ ) + for( int c = 0; c < cols; c++ ) + b0[r * cols + c] = cell(r, c); + for( int r = 0; r < 2; r++ ) + for( int c = 0; c < cols; c++ ) + b1[r * cols + c] = cell(3 + r, c); + java.util.ArrayList batches = new java.util.ArrayList<>(); + batches.add(b0); + batches.add(b1); + + MatrixBlock ret = new MatrixBlock(rows, cols, db); + Method m = ReaderDelta.class.getDeclaredMethod("fillDense", MatrixBlock.class, java.util.ArrayList.class); + m.setAccessible(true); + m.invoke(null, ret, batches); + + for( int r = 0; r < rows; r++ ) + for( int c = 0; c < cols; c++ ) + assertEquals("r" + r + " c" + c, cell(r, c), ret.getDenseBlock().get(r, c), 0.0); + } + + private static double cell(int r, int c) { + return r * 10 + c; + } + + // --------------------------------------------------------------------- + // helpers + // --------------------------------------------------------------------- + + private static StructType statsSchema() { + return new StructType().add("stats", StringType.STRING); + } + + /** + * Build a mocked scan-file row whose AddFile child has the given schema, null + * flag and (when not null) stats string, matching what {@code numRecords} reads. + */ + private static Row addFileRow(StructType addSchema, boolean statsNull, String statsValue) { + Row outer = mock(Row.class); + Row add = mock(Row.class); + when(outer.getStruct(InternalScanFileUtils.ADD_FILE_ORDINAL)).thenReturn(add); + when(add.getSchema()).thenReturn(addSchema); + int statsOrd = addSchema.fieldNames().indexOf("stats"); + if( statsOrd >= 0 ) { + when(add.isNullAt(statsOrd)).thenReturn(statsNull); + if( !statsNull ) + when(add.getString(statsOrd)).thenReturn(statsValue); + } + return outer; //the scan-file row numRecords consumes (its AddFile child is 'add') + } + + private static long numRecords(Row scanFileRow) throws Exception { + Method m = DeltaKernelUtils.class.getDeclaredMethod("numRecords", Row.class); + m.setAccessible(true); + return (Long) m.invoke(null, scanFileRow); + } +} diff --git a/src/test/java/org/apache/sysds/test/component/io/DeltaMatrixSparkInteropTest.java b/src/test/java/org/apache/sysds/test/component/io/DeltaMatrixSparkInteropTest.java new file mode 100644 index 00000000000..2d79b79f2dd --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/io/DeltaMatrixSparkInteropTest.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.io.FileUtils; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.conf.DMLConfig; +import org.apache.sysds.runtime.io.ReaderDelta; +import org.apache.sysds.runtime.io.ReaderDeltaParallel; +import org.apache.sysds.runtime.io.WriterDelta; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Cross-engine interoperability tests for the native (Delta Kernel based) matrix + * reader/writer against the reference Delta implementation (Delta's Spark + * connector, {@code delta-spark}, pulled in test-only). + * + *

The other Delta matrix tests round-trip exclusively through SystemDS' own + * Kernel-based read/write paths, so they cannot catch a table that SystemDS + * writes in a way other Delta engines reject (or vice versa). These tests close + * that gap by routing data through two independent engines: + *

    + *
  • SystemDS writes -> Spark/Delta reads (our output is spec-compliant), and
  • + *
  • Spark/Delta writes -> SystemDS reads, including a multi-file layout and a + * table with deletion vectors / a second commit that the SystemDS writer + * never produces itself.
  • + *
+ * + *

Row order is never assumed: every table carries a unique id in column 0 and + * comparisons are keyed by that id, since neither engine guarantees row order + * across files. + */ +@net.jcip.annotations.NotThreadSafe +public class DeltaMatrixSparkInteropTest { + + private static SparkSession spark; + + @BeforeClass + public static void startSpark() { + //each test class runs in its own fork (surefire reuseForks=false), so this + //is the only SparkSession in the JVM and gets the Delta extensions injected. + SparkSession.clearActiveSession(); + SparkSession.clearDefaultSession(); + spark = SparkSession.builder() + .appName("sysds-delta-interop") + .master("local[2]") + .config("spark.ui.enabled", "false") + .config("spark.sql.shuffle.partitions", "2") + .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") + .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") + .getOrCreate(); + } + + @AfterClass + public static void stopSpark() { + if( spark != null ) + spark.stop(); + SparkSession.clearActiveSession(); + SparkSession.clearDefaultSession(); + spark = null; + } + + @Test + public void systemdsWriteSparkReadMultiFile() throws Exception { + //SystemDS writes a (forced) multi-file Delta table; the reference Delta + //engine (Spark) must read every data file back with matching values. + int rows = 500, cols = 5; + MatrixBlock in = indexedMatrix(rows, cols); + + //small target file size -> multiple parquet data files (exercise that an + //external reader stitches all of our data files, not just the first). + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(16L * 1024)); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_s2s_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new WriterDelta().writeMatrixToHDFS(in, tablePath, rows, cols, -1, in.getNonZeros()); + assertTrue("writer should have produced a multi-file table", countParquet(tablePath) > 1); + + Dataset df = spark.read().format("delta").load(tablePath); + assertEquals("rows", rows, df.count()); + assertEquals("cols", cols, df.schema().fields().length); + + List read = df.collectAsList(); + assertEquals(rows, read.size()); + for( Row r : read ) { + int id = (int) Math.round(r.getDouble(0)); + assertTrue("id in range: " + id, id >= 0 && id < rows); + for( int c = 0; c < cols; c++ ) + assertEquals("r" + id + " c" + c, in.get(id, c), r.getDouble(c), 1e-9); + } + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void sparkWriteSystemdsReadMultiFile() throws Exception { + //the reference Delta engine writes a multi-file table; both the serial and + //parallel SystemDS readers must reconstruct it (coercing long ids to double). + int rows = 600, cols = 4; + Dataset df = indexedDataFrame(rows, cols).repartition(3); //-> multiple data files + Path dir = Files.createTempDirectory("sysds_delta_p2s_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + df.write().format("delta").save(tablePath); + assertTrue("spark should have written a multi-file table", countParquet(tablePath) > 1); + + Map expected = expectedById(rows, cols); + assertMatchesById(new ReaderDelta() + .readMatrixFromHDFS(tablePath, -1, -1, -1, -1), expected, cols, "serial"); + assertMatchesById(new ReaderDeltaParallel() + .readMatrixFromHDFS(tablePath, -1, -1, -1, -1), expected, cols, "parallel"); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void sparkDeletionVectorsSystemdsRead() throws Exception { + //a Delta table with deletion vectors + a second commit (the DELETE) is a + //layout the SystemDS writer never emits; the readers must honor the DV and + //return only the surviving rows. This exercises the hasDeletionVector path. + int rows = 400, cols = 3, deleteBelow = 50; + Path dir = Files.createTempDirectory("sysds_delta_dv_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + //enable deletion vectors for tables created in this block, then delete a + //row range so Delta records a DV rather than rewriting the data files. + spark.conf().set(DV_DEFAULT, "true"); + indexedDataFrame(rows, cols).write().format("delta").save(tablePath); + spark.sql("DELETE FROM delta.`" + tablePath + "` WHERE c0 < " + deleteBelow); + + Map expected = expectedById(rows, cols); + expected.keySet().removeIf(id -> id < deleteBelow); + + MatrixBlock serial = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + assertEquals("surviving rows (serial)", rows - deleteBelow, serial.getNumRows()); + assertMatchesById(serial, expected, cols, "serial-dv"); + + MatrixBlock parallel = new ReaderDeltaParallel().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + assertEquals("surviving rows (parallel)", rows - deleteBelow, parallel.getNumRows()); + assertMatchesById(parallel, expected, cols, "parallel-dv"); + } + finally { + //fresh fork per test class, so simply clearing the override is enough + spark.conf().unset(DV_DEFAULT); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + private static final String DV_DEFAULT = + "spark.databricks.delta.properties.defaults.enableDeletionVectors"; + + /** Matrix whose column 0 is the row index and remaining columns are exact doubles. */ + private static MatrixBlock indexedMatrix(int rows, int cols) { + MatrixBlock mb = new MatrixBlock(rows, cols, false); + for( int r = 0; r < rows; r++ ) { + mb.set(r, 0, r); + for( int c = 1; c < cols; c++ ) + mb.set(r, c, value(r, c)); + } + mb.recomputeNonZeros(); + return mb; + } + + /** Spark DataFrame mirroring {@link #indexedMatrix} with columns c0..c(cols-1) as doubles. */ + private static Dataset indexedDataFrame(int rows, int cols) { + StructField[] fields = new StructField[cols]; + for( int c = 0; c < cols; c++ ) + fields[c] = DataTypes.createStructField("c" + c, DataTypes.DoubleType, false); + StructType schema = DataTypes.createStructType(fields); + + List data = new ArrayList<>(rows); + for( int r = 0; r < rows; r++ ) { + Object[] vals = new Object[cols]; + vals[0] = (double) r; + for( int c = 1; c < cols; c++ ) + vals[c] = value(r, c); + data.add(RowFactory.create(vals)); + } + return spark.createDataFrame(data, schema); + } + + /** Deterministic, exactly-representable cell value for (row,col), col>=1. */ + private static double value(int row, int col) { + return row * 0.5 - col; + } + + private static Map expectedById(int rows, int cols) { + Map exp = new HashMap<>(rows); + for( int r = 0; r < rows; r++ ) { + double[] row = new double[cols]; + row[0] = r; + for( int c = 1; c < cols; c++ ) + row[c] = value(r, c); + exp.put(r, row); + } + return exp; + } + + /** Asserts every row of {@code out} (keyed by its column-0 id) matches {@code expected}. */ + private static void assertMatchesById(MatrixBlock out, Map expected, int cols, String tag) { + assertEquals(tag + " rows", expected.size(), out.getNumRows()); + assertEquals(tag + " cols", cols, out.getNumColumns()); + boolean[] seen = new boolean[expected.size() == 0 ? 0 : maxId(expected) + 1]; + for( int r = 0; r < out.getNumRows(); r++ ) { + int id = (int) Math.round(out.get(r, 0)); + double[] exp = expected.get(id); + assertTrue(tag + ": unexpected/duplicate id " + id, exp != null && id < seen.length && !seen[id]); + seen[id] = true; + for( int c = 0; c < cols; c++ ) + assertEquals(tag + " id" + id + " c" + c, exp[c], out.get(r, c), 1e-9); + } + } + + private static int maxId(Map expected) { + int m = 0; + for( int id : expected.keySet() ) + m = Math.max(m, id); + return m; + } + + private static long countParquet(String tablePath) throws Exception { + try( java.util.stream.Stream s = Files.walk(new File(tablePath).toPath()) ) { + return s.filter(p -> p.toString().endsWith(".parquet")).count(); + } + } +}