diff --git a/.github/workflows/javaTests.yml b/.github/workflows/javaTests.yml index 747d8930cd3..0878c5d12ba 100644 --- a/.github/workflows/javaTests.yml +++ b/.github/workflows/javaTests.yml @@ -60,7 +60,7 @@ jobs: "org.apache.sysds.test.applications.**", "**.test.usertest.**", "**.component.c**.** -Dtest-threadCount=1 -Dtest-forkCount=1", - "**.component.e**.**,**.component.f**.**,**.component.m**.**,**.component.o**.**", + "**.component.e**.**,**.component.f**.**,**.component.i**.**,**.component.m**.**,**.component.o**.**", "**.component.p**.**,**.component.r**.**,**.component.s**.**,**.component.t**.**,**.component.u**.**", "**.functions.a**.**,**.functions.binary.matrix.**,**.functions.binary.scalar.**,**.functions.binary.tensor.**", "**.functions.blocks.**,**.functions.data.rand.**,", diff --git a/pom.xml b/pom.xml index cfd3d8464fb..44454361318 100644 --- a/pom.xml +++ b/pom.xml @@ -46,6 +46,8 @@ 2.15.4 2.12.18 2.12 + 3.3.2 + 1.13.1 yyyy-MM-dd HH:mm:ss z 1 false @@ -968,6 +970,37 @@ + + + io.delta + delta-kernel-api + ${delta-kernel.version} + + + io.delta + delta-kernel-defaults + ${delta-kernel.version} + + + + org.apache.parquet + parquet-hadoop + ${parquet.version} + + + org.apache.parquet + parquet-column + ${parquet.version} + + + org.apache.parquet + parquet-common + ${parquet.version} + org.jcuda jcuda diff --git a/src/main/java/org/apache/sysds/common/Types.java b/src/main/java/org/apache/sysds/common/Types.java index c2832aeb8cd..624c9eed3c6 100644 --- a/src/main/java/org/apache/sysds/common/Types.java +++ b/src/main/java/org/apache/sysds/common/Types.java @@ -878,6 +878,7 @@ public enum FileFormat { HDF5, // Hierarchical Data Format (HDF) COG, // Cloud-optimized GeoTIFF PARQUET, // parquet format for columnar data storage + DELTA, // Delta Lake table (transaction log + parquet), read/written via Delta Kernel UNKNOWN; public boolean isIJV() { @@ -885,7 +886,7 @@ public boolean isIJV() { } public boolean isTextFormat() { - return this != BINARY && this != COMPRESSED; + return this != BINARY && this != COMPRESSED && this != DELTA; } public static boolean isTextFormat(String fmt) { diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java index 0d5ee888d86..4c6e511c18f 100644 --- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java +++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java @@ -258,6 +258,16 @@ public static int getFederatedTimeout(){ return getDMLConfig().getIntValue(DMLConfig.FEDERATED_TIMEOUT); } + /** @return rows per parquet read batch for the native Delta reader */ + public static int getDeltaReaderBatchSize() { + return getDMLConfig().getIntValue(DMLConfig.DELTA_READER_BATCH_SIZE); + } + + /** @return target data-file size (bytes) for the native Delta writer */ + public static long getDeltaWriterTargetFileSize() { + return Long.parseLong(getDMLConfig().getTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE)); + } + public static boolean isFederatedSSL(){ return getDMLConfig().getBooleanValue(DMLConfig.USE_SSL_FEDERATED_COMMUNICATION); } diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java index a6339656fb0..d1cb5337e20 100644 --- a/src/main/java/org/apache/sysds/conf/DMLConfig.java +++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java @@ -71,6 +71,16 @@ public class DMLConfig public static final String CP_PARALLEL_OPS = "sysds.cp.parallel.ops"; public static final String CP_PARALLEL_IO = "sysds.cp.parallel.io"; public static final String IO_COMPRESSION_CODEC = "sysds.io.compression.encoding"; + public static final String DELTA_READER_BATCH_SIZE = "sysds.io.delta.reader.batchsize"; // int: rows per parquet read batch + public static final String DELTA_WRITER_TARGET_FILE_SIZE = "sysds.io.delta.writer.targetfilesize"; // long: target data-file size in bytes (smaller -> more files -> more parallel-read throughput) + /** Default rows per parquet read batch (Delta Kernel default is 1024). A larger batch slightly + * reduces per-batch object/transform overhead; the dominant per-value decode cost is unaffected. */ + public static final int DELTA_READER_BATCH_SIZE_DEFAULT = 4096; + /** Default target data-file size for the Delta writer in bytes (Delta Kernel default is 128MB). The + * native Delta read decode is CPU-bound and parallelizes across data files, so writing smaller files + * lets a parallel reader use more threads. 64MB roughly doubles the file count (and read parallelism) + * versus the default with only a modest increase in per-file metadata overhead. */ + public static final long DELTA_WRITER_TARGET_FILE_SIZE_DEFAULT = 64L * 1024 * 1024; public static final String PARALLEL_ENCODE = "sysds.parallel.encode"; // boolean: enable multi-threaded transformencode and apply public static final String PARALLEL_ENCODE_STAGED = "sysds.parallel.encode.staged"; public static final String PARALLEL_ENCODE_APPLY_BLOCKS = "sysds.parallel.encode.applyBlocks"; @@ -158,6 +168,8 @@ public class DMLConfig _defaultVals.put(CP_PARALLEL_OPS, "true" ); _defaultVals.put(CP_PARALLEL_IO, "true" ); _defaultVals.put(IO_COMPRESSION_CODEC, "none"); + _defaultVals.put(DELTA_READER_BATCH_SIZE, String.valueOf(DELTA_READER_BATCH_SIZE_DEFAULT)); + _defaultVals.put(DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(DELTA_WRITER_TARGET_FILE_SIZE_DEFAULT)); _defaultVals.put(PARALLEL_TOKENIZE, "false"); _defaultVals.put(PARALLEL_TOKENIZE_NUM_BLOCKS, "64"); _defaultVals.put(FRAME_TO_MATRIX_WARN_CAST, "false"); diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java b/src/main/java/org/apache/sysds/parser/DMLTranslator.java index e14cfd31388..a8e1667d049 100644 --- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java +++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java @@ -1057,7 +1057,8 @@ public void constructHops(StatementBlock sb) { case CSV: case LIBSVM: case HDF5: - // write output in textcell format + case DELTA: + // columnar/text formats: no block layout (blocksize -1) ae.setOutputParams(ae.getDim1(), ae.getDim2(), ae.getNnz(), ae.getUpdateType(), -1); break; case BINARY: diff --git a/src/main/java/org/apache/sysds/parser/DataExpression.java b/src/main/java/org/apache/sysds/parser/DataExpression.java index 22dbe21c187..68a3d1b7ffe 100644 --- a/src/main/java/org/apache/sysds/parser/DataExpression.java +++ b/src/main/java/org/apache/sysds/parser/DataExpression.java @@ -1178,6 +1178,10 @@ else if( getVarParam(READNNZPARAM) != null ) { boolean isCOG = (formatTypeString != null && formatTypeString.equalsIgnoreCase(FileFormat.COG.toString())); + // Delta tables are self-describing (schema + dimensions discovered from the + // transaction log at read time), so dimensions are optional like CSV. + boolean isDelta = (formatTypeString != null && formatTypeString.equalsIgnoreCase(FileFormat.DELTA.toString())); + dataTypeString = (getVarParam(DATATYPEPARAM) == null) ? null : getVarParam(DATATYPEPARAM).toString(); if ( dataTypeString == null || dataTypeString.equalsIgnoreCase(Statement.MATRIX_DATA_TYPE) @@ -1202,8 +1206,8 @@ else if( getVarParam(READNNZPARAM) != null ) { // initialize size of target data identifier to UNKNOWN getOutput().setDimensions(-1, -1); - if (!isCSV && !isLIBSVM && !isHDF5 && !isCOG && ConfigurationManager.getCompilerConfig() - .getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) //skip check for csv/libsvm format / jmlc api + if (!isCSV && !isLIBSVM && !isHDF5 && !isCOG && !isDelta && ConfigurationManager.getCompilerConfig() + .getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) //skip check for csv/libsvm/delta format / jmlc api && (getVarParam(READROWPARAM) == null || getVarParam(READCOLPARAM) == null) ) { raiseValidateError("Missing or incomplete dimension information in read statement: " + mtdFileName, conditional, LanguageErrorCodes.INVALID_PARAMETERS); @@ -1215,7 +1219,7 @@ && getVarParam(READCOLPARAM) instanceof ConstIdentifier) // these are strings that are long values Long dim1 = (getVarParam(READROWPARAM) == null) ? null : Long.valueOf( getVarParam(READROWPARAM).toString()); Long dim2 = (getVarParam(READCOLPARAM) == null) ? null : Long.valueOf( getVarParam(READCOLPARAM).toString()); - if ( !isCSV && (dim1 < 0 || dim2 < 0) && ConfigurationManager + if ( !isCSV && !isDelta && (dim1 < 0 || dim2 < 0) && ConfigurationManager .getCompilerConfig().getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) ) { raiseValidateError("Invalid dimension information in read statement", conditional, LanguageErrorCodes.INVALID_PARAMETERS); } @@ -1333,7 +1337,8 @@ else if (valueTypeString.equalsIgnoreCase(ValueType.UNKNOWN.name())) } //validate read filename - if (getVarParam(FORMAT_TYPE) == null || FileFormat.isTextFormat(getVarParam(FORMAT_TYPE).toString())) + if (getVarParam(FORMAT_TYPE) == null || FileFormat.isTextFormat(getVarParam(FORMAT_TYPE).toString()) + || checkFormatType(FileFormat.DELTA)) //delta: columnar, no block layout getOutput().setBlocksize(-1); else if (checkFormatType(FileFormat.BINARY, FileFormat.COMPRESSED, FileFormat.UNKNOWN)) { if( getVarParam(ROWBLOCKCOUNTPARAM)!=null ) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java index 7151d87211c..219f954cc52 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java @@ -23,6 +23,7 @@ import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.tuple.Pair; +import org.apache.sysds.api.DMLScript; import org.apache.sysds.common.Types.DataType; import org.apache.sysds.common.Types.FileFormat; import org.apache.sysds.common.Types.ValueType; @@ -203,9 +204,14 @@ protected FrameBlock readBlobFromHDFS(String fname, long[] dims) throws IOExcept .createFrameReader(iimd.getFileFormat(), getFileFormatProperties()) .readFrameFromHDFS(fname, lschema, dc.getRows(), dc.getCols()); - if(iimd.getFileFormat() == FileFormat.CSV) + //Delta and CSV discover dimensions (and Delta also schema) at read time, so + //refresh the cached metadata to reflect the materialized frame block. + if(iimd.getFileFormat() == FileFormat.CSV || iimd.getFileFormat() == FileFormat.DELTA) { _metaData = _metaData instanceof MetaDataFormat ? new MetaDataFormat(data.getDataCharacteristics(), iimd.getFileFormat()) : new MetaData(data.getDataCharacteristics()); + if(iimd.getFileFormat() == FileFormat.DELTA) + _schema = data.getSchema(); + } // sanity check correct output if(data == null) @@ -293,6 +299,9 @@ protected void writeBlobToHDFS(String fname, String ofmt, int rep, FileFormatPro FrameWriter writer = FrameWriterFactory.createFrameWriter(fmt, fprop); writer.writeFrameToHDFS(_data, fname, getNumRows(), getNumColumns()); + + if(DMLScript.STATISTICS) + CacheStatistics.incrementHDFSWrites(); } @Override diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java index 0b1a1ee27cb..5430a7a6c32 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java @@ -453,7 +453,8 @@ protected MatrixBlock readBlobFromHDFS(String fname, long[] dims) throws IOExcep DataConverter.readMatrixFromHDFS(fname, iimd.getFileFormat(), rlen, clen, blen, mc.getNonZeros(), getFileFormatProperties()); - if(iimd.getFileFormat() == FileFormat.CSV) { + if(iimd.getFileFormat() == FileFormat.CSV || iimd.getFileFormat() == FileFormat.DELTA) { + //dimensions/nnz are discovered at read time for these self-describing formats _metaData = _metaData instanceof MetaDataFormat ? new MetaDataFormat(newData.getDataCharacteristics(), iimd.getFileFormat()) : new MetaData(newData.getDataCharacteristics()); } diff --git a/src/main/java/org/apache/sysds/runtime/io/DeltaKernelUtils.java b/src/main/java/org/apache/sysds/runtime/io/DeltaKernelUtils.java new file mode 100644 index 00000000000..9485e4c9d54 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/io/DeltaKernelUtils.java @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.io; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.util.HDFSTool; + +import io.delta.kernel.DataWriteContext; +import io.delta.kernel.Operation; +import io.delta.kernel.Scan; +import io.delta.kernel.Snapshot; +import io.delta.kernel.Table; +import io.delta.kernel.Transaction; +import io.delta.kernel.TransactionBuilder; +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.InternalScanFileUtils; +import io.delta.kernel.internal.ScanImpl; +import io.delta.kernel.internal.data.ScanStateRow; +import io.delta.kernel.internal.util.Utils; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterable; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.DataFileStatus; +import io.delta.kernel.utils.FileStatus; + +/** + * Shared helpers for the native (Spark-free) Delta Lake read/write paths used + * by both the matrix and frame readers/writers. Centralizes engine creation, + * path qualification, the scan loop (snapshot -> data files -> logical + * columnar batches, honoring deletion vectors), and the write transaction + * (logical data -> parquet -> commit). + */ +public class DeltaKernelUtils { + + private static final String ENGINE_INFO = "Apache SystemDS"; + + /** + * Consumes a whole columnar batch. {@code selected} is {@code null} when all + * {@code size} rows are live; otherwise {@code selected[r]} indicates whether + * row {@code r} survived the deletion/selection vector. Batch-level consumption + * lets callers extract data column-at-a-time (cache friendly, boxing free) + * instead of paying a per-row callback. + */ + @FunctionalInterface + public interface BatchConsumer { + void accept(ColumnVector[] cols, int size, boolean[] selected); + } + + /** Internal Delta column type codes shared by the matrix and frame readers to + * dispatch boxing-free primitive column access. */ + public static final int T_DOUBLE = 0; + public static final int T_FLOAT = 1; + public static final int T_LONG = 2; + public static final int T_INT = 3; + public static final int T_SHORT = 4; + public static final int T_BYTE = 5; + public static final int T_BOOLEAN = 6; + public static final int T_STRING = 7; + + /** + * Map a Delta Kernel {@link DataType} to an internal type code (see the + * {@code T_*} constants). Returned once per column so the per-cell read loop + * can switch on a primitive int instead of repeating {@code instanceof} checks. + * + * @param dt the Delta column data type + * @return the matching {@code T_*} code, or {@code -1} if the type is not supported + */ + public static int typeCode(DataType dt) { + if( dt instanceof DoubleType ) return T_DOUBLE; + if( dt instanceof FloatType ) return T_FLOAT; + if( dt instanceof LongType ) return T_LONG; + if( dt instanceof IntegerType ) return T_INT; + if( dt instanceof ShortType ) return T_SHORT; + if( dt instanceof ByteType ) return T_BYTE; + if( dt instanceof BooleanType ) return T_BOOLEAN; + if( dt instanceof StringType ) return T_STRING; + return -1; + } + + /** + * @param size number of rows in the batch + * @param selected per-row selection mask, or {@code null} if all rows are live + * @return the number of live rows in the batch + */ + public static int countSelected(int size, boolean[] selected) { + if(selected == null) + return size; + int n = 0; + for(int r = 0; r < size; r++) + if(selected[r]) + n++; + return n; + } + + private DeltaKernelUtils() {} + + /** Delta Kernel config key: number of rows per parquet read batch. Defaults from + * {@link org.apache.sysds.conf.DMLConfig#DELTA_READER_BATCH_SIZE_DEFAULT}, overridable via + * {@link org.apache.sysds.conf.DMLConfig#DELTA_READER_BATCH_SIZE}. */ + private static final String CONF_READER_BATCH_SIZE = "delta.kernel.default.parquet.reader.batch-size"; + /** Delta Kernel config key: target size (bytes) at which the writer rolls a new data file. Defaults from + * {@link org.apache.sysds.conf.DMLConfig#DELTA_WRITER_TARGET_FILE_SIZE_DEFAULT}, overridable via + * {@link org.apache.sysds.conf.DMLConfig#DELTA_WRITER_TARGET_FILE_SIZE}. */ + private static final String CONF_WRITER_TARGET_FILE_SIZE = "delta.kernel.default.parquet.writer.targetMaxFileSize"; + + //derived configuration cached to avoid copying the (large) base conf on every + //engine creation (createEngine is called once per data file in parallel reads); + //rebuilt whenever the base conf or the relevant SystemDS settings change. + private static Configuration cachedConf; + private static Configuration cachedConfBase; + private static int cachedBatchSize; + private static long cachedTargetFileSize; + + private static synchronized Configuration deltaConf() { + Configuration base = ConfigurationManager.getCachedJobConf(); + int batchSize = ConfigurationManager.getDeltaReaderBatchSize(); + long targetFileSize = ConfigurationManager.getDeltaWriterTargetFileSize(); + if(cachedConf == null || cachedConfBase != base + || cachedBatchSize != batchSize || cachedTargetFileSize != targetFileSize) + { + Configuration c = new Configuration(base); + c.setInt(CONF_READER_BATCH_SIZE, batchSize); + c.setLong(CONF_WRITER_TARGET_FILE_SIZE, targetFileSize); + cachedConf = c; + cachedConfBase = base; + cachedBatchSize = batchSize; + cachedTargetFileSize = targetFileSize; + } + return cachedConf; + } + + public static Engine createEngine() { + return DefaultEngine.create(deltaConf()); + } + + /** + * Resolve a (possibly relative) path to a fully-qualified URI so the + * kernel's default engine can locate the table on the right filesystem. + * + * @param fname input path + * @return fully-qualified table path + */ + public static String qualify(String fname) { + try { + Configuration conf = ConfigurationManager.getCachedJobConf(); + Path path = new Path(fname); + return path.getFileSystem(conf).makeQualified(path).toString(); + } + catch(IOException ex) { + throw new DMLRuntimeException("Failed to resolve Delta table path: " + fname, ex); + } + } + + /** + * Opened latest snapshot of a Delta table: the logical schema plus everything + * needed to (re)read its data files, including the list of per-data-file scan + * rows. Delta Kernel scan-file rows are self-contained (the kernel's + * distributed design serializes them to workers), so they can be retained and + * read independently / in parallel. + */ + public static final class ScanHandle { + public final StructType schema; + public final Row scanState; + public final StructType physicalReadSchema; + public final List scanFiles; + /** + * Per-file record counts taken from the Delta {@code numRecords} statistic, + * aligned with {@link #scanFiles}; {@code -1} where the statistic is absent. + */ + public final long[] numRecords; + /** + * Per-file flag indicating a deletion vector is present (so the live row + * count differs from {@link #numRecords}), aligned with {@link #scanFiles}. + */ + public final boolean[] hasDeletionVector; + + private ScanHandle(StructType schema, Row scanState, StructType physicalReadSchema, + List scanFiles, long[] numRecords, boolean[] hasDeletionVector) + { + this.schema = schema; + this.scanState = scanState; + this.physicalReadSchema = physicalReadSchema; + this.scanFiles = scanFiles; + this.numRecords = numRecords; + this.hasDeletionVector = hasDeletionVector; + } + + /** + * @return true iff every data file carries a {@code numRecords} statistic + * and none has a deletion vector, i.e. exact per-file row offsets + * can be derived from metadata without reading the data. + */ + public boolean hasExactRowCounts() { + for( int i=0; i scanFileIter = (scan instanceof ScanImpl) + ? ((ScanImpl) scan).getScanFiles(engine, true) + : scan.getScanFiles(engine); + + List files = new ArrayList<>(); + List recs = new ArrayList<>(); + List dvs = new ArrayList<>(); + try( CloseableIterator scanFiles = scanFileIter ) { + while( scanFiles.hasNext() ) { + FilteredColumnarBatch scanFileBatch = scanFiles.next(); + try( CloseableIterator scanFileRows = scanFileBatch.getRows() ) { + while( scanFileRows.hasNext() ) { + Row scanFileRow = scanFileRows.next(); + files.add(scanFileRow); + recs.add(numRecords(scanFileRow)); + dvs.add(InternalScanFileUtils.getDeletionVectorDescriptorFromRow(scanFileRow) != null); + } + } + } + } + long[] numRecords = new long[recs.size()]; + boolean[] hasDv = new boolean[dvs.size()]; + for( int i=0; i= '0' && stats.charAt(i) <= '9' ) { + val = val * 10 + (stats.charAt(i) - '0'); + any = true; + i++; + } + return any ? val : -1; + } + + /** + * Read a single Delta data file (identified by its scan-file row), decoding + * its parquet batches and applying any deletion vector, invoking the consumer + * once per (logical) batch. Safe to call concurrently for distinct files as + * long as each call uses its own {@code engine}. + * + * @param engine delta kernel engine + * @param scanState scan state from {@link #openScan} + * @param physicalReadSchema physical read schema from {@link #openScan} + * @param scanFileRow the data file's scan-file row + * @param consumer batch consumer + * @throws IOException on read failure + */ + public static void readScanFile(Engine engine, Row scanState, StructType physicalReadSchema, + Row scanFileRow, BatchConsumer consumer) throws IOException + { + FileStatus dataFile = InternalScanFileUtils.getAddFileStatus(scanFileRow); + CloseableIterator physicalData = engine.getParquetHandler() + .readParquetFiles(Utils.singletonCloseableIterator(dataFile), physicalReadSchema, Optional.empty()); + try( CloseableIterator logicalData = + Scan.transformPhysicalData(engine, scanState, scanFileRow, physicalData) ) + { + while( logicalData.hasNext() ) + consumeBatch(logicalData.next(), consumer); + } + } + + /** + * Scan the latest snapshot of a Delta table sequentially, invoking the batch + * consumer for every data batch. The consumer is created lazily from the table + * schema (so callers can size buffers / derive per-column types up front). + * + * @param engine delta kernel engine + * @param tablePath fully-qualified table path + * @param consumerFactory builds the batch consumer from the table schema + * @return the logical table schema + * @throws IOException on read failure + */ + public static StructType scan(Engine engine, String tablePath, Function consumerFactory) + throws IOException + { + ScanHandle h = openScan(engine, tablePath); + BatchConsumer consumer = consumerFactory.apply(h.schema); + for( Row scanFileRow : h.scanFiles ) + readScanFile(engine, h.scanState, h.physicalReadSchema, scanFileRow, consumer); + return h.schema; + } + + private static void consumeBatch(FilteredColumnarBatch fcb, BatchConsumer consumer) { + ColumnarBatch batch = fcb.getData(); + int ncol = batch.getSchema().length(); + ColumnVector[] cols = new ColumnVector[ncol]; + for( int c=0; c all rows live) + Optional selVector = fcb.getSelectionVector(); + boolean[] selected = null; + if( selVector.isPresent() ) { + ColumnVector sv = selVector.get(); + selected = new boolean[size]; + for( int r=0; r logicalData) throws IOException + { + //replace any existing table at the path (the other SystemDS writers delete + //the output first; the caching layer does not do it on our behalf) + HDFSTool.deleteFileIfExistOnHDFS(tablePath); + + Table table = Table.forPath(engine, tablePath); + TransactionBuilder txnBuilder = table + .createTransactionBuilder(engine, ENGINE_INFO, Operation.CREATE_TABLE) + .withSchema(engine, schema); + Transaction txn = txnBuilder.build(engine); + Row txnState = txn.getTransactionState(engine); + + CloseableIterator physicalData = + Transaction.transformLogicalData(engine, txnState, logicalData, Collections.emptyMap()); + DataWriteContext writeContext = + Transaction.getWriteContext(engine, txnState, Collections.emptyMap()); + CloseableIterator dataFiles = engine.getParquetHandler() + .writeParquetFiles(writeContext.getTargetDirectory(), physicalData, writeContext.getStatisticsColumns()); + CloseableIterator appendActions = + Transaction.generateAppendActions(engine, txnState, dataFiles, writeContext); + txn.commit(engine, CloseableIterable.inMemoryIterable(appendActions)); + } +} diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderDelta.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderDelta.java new file mode 100644 index 00000000000..9d5df380552 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderDelta.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.io; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; + +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.frame.data.columns.Array; +import org.apache.sysds.runtime.frame.data.columns.ArrayFactory; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +/** + * Single-threaded native Delta Lake reader for frames, built on the Spark-free + * Delta Kernel library. It opens the latest snapshot of a Delta table, reads + * its parquet data files through the kernel's default engine (honoring deletion + * vectors), and materializes the columns into a {@link FrameBlock} whose schema + * and column names are derived from the Delta table schema. + * + *

Data is extracted column-at-a-time into primitive arrays (no per-cell + * boxing or {@code FrameBlock.set} dispatch) and the frame is constructed + * directly from typed column {@link Array}s. Supported column types map to + * SystemDS value types: double, float, long, int, short, byte, boolean, and + * string. Neither the schema nor the dimensions need to be supplied; they are + * discovered from the table.

+ */ +public class FrameReaderDelta extends FrameReader { + + //per-column read codes (how to pull a value out of the Delta column vector); + //package visible so the parallel reader can reuse the same dispatch. + static final int R_DOUBLE = 0, R_FLOAT = 1, R_LONG = 2, R_INT = 3, + R_SHORT = 4, R_BYTE = 5, R_BOOLEAN = 6, R_STRING = 7; + + @Override + public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen) + throws IOException, DMLRuntimeException + { + Engine engine = DeltaKernelUtils.createEngine(); + String tablePath = DeltaKernelUtils.qualify(fname); + + //per-batch, per-column extracted arrays (boxing free) + ArrayList batchCols = new ArrayList<>(); + ArrayList batchSizes = new ArrayList<>(); + int[] nrowH = new int[1]; + ValueType[][] vtH = new ValueType[1][]; + String[][] nameH = new String[1][]; + int[][] readCodeH = new int[1][]; + + DeltaKernelUtils.scan(engine, tablePath, sch -> { + int ncol = sch.length(); + int[] readCode = new int[ncol]; + ValueType[] vt = new ValueType[ncol]; + String[] cnames = new String[ncol]; + for( int c=0; c { + int n = DeltaKernelUtils.countSelected(size, selected); + Object[] extracted = new Object[ncol]; + for( int c=0; c[] columns = new Array[ncol]; + for( int c=0; c buildColumn(ValueType vt, int nrow, ArrayList batchCols, + ArrayList batchSizes, int c) + { + switch( vt ) { + case FP64: { + double[] all = new double[nrow]; + int off = 0; + for( int b=0; bIt mirrors {@link ReaderDeltaParallel} (the matrix variant) but produces + * typed column {@link Array}s instead of a dense {@code double[]}. As with the + * matrix reader, the expensive part of a Delta read is the per-file parquet + * decode, so parallelizing across data files is the natural speedup. A table + * backed by a single data file cannot be split this way, so the reader + * transparently falls back to the sequential {@link FrameReaderDelta}.

+ */ +public class FrameReaderDeltaParallel extends FrameReaderDelta { + + private final int _numThreads; + + public FrameReaderDeltaParallel() { + _numThreads = OptimizerUtils.getParallelBinaryReadParallelism(); + } + + @Override + public FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen) + throws IOException, DMLRuntimeException + { + Engine engine = DeltaKernelUtils.createEngine(); + String tablePath = DeltaKernelUtils.qualify(fname); + DeltaKernelUtils.ScanHandle handle = DeltaKernelUtils.openScan(engine, tablePath); + + final int nfiles = handle.scanFiles.size(); + //nothing to gain from parallelism for single-file (or empty) tables + if( _numThreads <= 1 || nfiles <= 1 ) + return super.readFrameFromHDFS(fname, schema, names, rlen, clen); + + //derive per-column read codes, value types and names once from the schema + final int ncol = handle.schema.length(); + final int[] readCodes = new int[ncol]; + final ValueType[] vt = new ValueType[ncol]; + final String[] cnames = new String[ncol]; + for( int c=0; c pre-size + //one typed array per column and let each thread decode directly into its + //row offset (no intermediate buffers, no serial concatenation). + if( useDirectPath(handle) ) { + long total = 0; + for( long r : handle.numRecords ) + total += r; + if( total > 0 && total <= Integer.MAX_VALUE ) + return readDirect(fname, handle, ncol, readCodes, vt, cnames, (int) total); + } + + return readBuffered(fname, handle, ncol, readCodes, vt, cnames); + } + + /** + * Whether the metadata-driven direct-write fast path can be used for this + * table (exact per-file row counts and no deletion vectors). Visible for + * testing: the buffered fallback is otherwise only reachable for tables + * lacking row statistics or carrying deletion vectors, which the SystemDS + * Delta writer never produces. + * + * @param handle the opened scan handle + * @return true if the direct path is applicable + */ + protected boolean useDirectPath(DeltaKernelUtils.ScanHandle handle) { + return handle.hasExactRowCounts(); + } + + /** + * Fast path: each thread decodes one data file straight into the final typed + * column arrays at a metadata-derived row offset. Single allocation per + * column, fully parallel. + */ + private FrameBlock readDirect(String fname, DeltaKernelUtils.ScanHandle handle, + int ncol, int[] readCodes, ValueType[] vt, String[] cnames, int nrow) throws IOException + { + final int nfiles = handle.scanFiles.size(); + final int[] rowOffset = new int[nfiles]; + int acc = 0; + for( int i=0; i> tasks = new ArrayList<>(nfiles); + for( int i=0; i { + int[] cur = new int[] {base}; + Engine eng = DeltaKernelUtils.createEngine(); + DeltaKernelUtils.readScanFile(eng, handle.scanState, handle.physicalReadSchema, scanFileRow, + (cols, size, selected) -> { + for( int c=0; c f : pool.invokeAll(tasks) ) + f.get(); + } + catch(Exception ex) { + throw new IOException("Failed parallel read of Delta table: " + fname, ex); + } + finally { + pool.shutdown(); + } + + Array[] columns = new Array[ncol]; + for( int c=0; c[] fileCols = new ArrayList[nfiles]; + @SuppressWarnings("unchecked") + final ArrayList[] fileSizes = new ArrayList[nfiles]; + final ExecutorService pool = CommonThreadPool.get(_numThreads); + try { + ArrayList> tasks = new ArrayList<>(nfiles); + for( int i=0; i { + ArrayList bcs = new ArrayList<>(); + ArrayList bss = new ArrayList<>(); + Engine eng = DeltaKernelUtils.createEngine(); + DeltaKernelUtils.readScanFile(eng, handle.scanState, handle.physicalReadSchema, scanFileRow, + (cols, size, selected) -> { + int n = DeltaKernelUtils.countSelected(size, selected); + Object[] extracted = new Object[ncol]; + for( int c=0; c f : pool.invokeAll(tasks) ) + f.get(); + } + catch(Exception ex) { + throw new IOException("Failed parallel read of Delta table: " + fname, ex); + } + finally { + pool.shutdown(); + } + + //flatten the per-file batches in file order and concatenate per column + ArrayList batchCols = new ArrayList<>(); + ArrayList batchSizes = new ArrayList<>(); + int nrow = 0; + for( int i=0; i[] columns = new Array[ncol]; + for( int c=0; c createColumn(ValueType vt, Object full) { + switch( vt ) { + case FP64: return ArrayFactory.create((double[]) full); + case FP32: return ArrayFactory.create((float[]) full); + case INT64: return ArrayFactory.create((long[]) full); + case INT32: return ArrayFactory.create((int[]) full); + case BOOLEAN: return ArrayFactory.create((boolean[]) full); + default: return ArrayFactory.create((String[]) full); // STRING + } + } + + /** + * Decode the live (selected, after deletion vector) rows of one column batch + * directly into a pre-sized typed array starting at absolute row {@code destOff}. + * Null numeric cells keep the array default (0); string nulls are stored as null. + */ + private static void extractColumnInto(ColumnVector col, int size, boolean[] selected, + int readCode, Object dest, int destOff) + { + switch( readCode ) { + case R_DOUBLE: { + double[] a = (double[]) dest; + int lr = destOff; + for( int r=0; r[] cols = new Array[ncol]; + boolean[] nullable = new boolean[ncol]; + for( int c=0; c { + private final Array[] _cols; + private final boolean[] _nullable; + private final StructType _schema; + private final int _nrow; + private final int _ncol; + private int _pos = 0; + + FrameBatchIterator(Array[] cols, boolean[] nullable, StructType schema, int nrow, int ncol) { + _cols = cols; + _nullable = nullable; + _schema = schema; + _nrow = nrow; + _ncol = ncol; + } + + @Override + public boolean hasNext() { + return _pos < _nrow; + } + + @Override + public FilteredColumnarBatch next() { + if( !hasNext() ) + throw new NoSuchElementException(); + int size = Math.min(BATCH_ROWS, _nrow - _pos); + ColumnarBatch batch = new FrameColumnarBatch(_cols, _nullable, _schema, _pos, size, _ncol); + _pos += size; + return new FilteredColumnarBatch(batch, Optional.empty()); + } + + @Override + public void close() { + //nothing to release + } + } + + /** Read-only view of a row range of the frame columns as a Delta Kernel columnar batch. */ + private static class FrameColumnarBatch implements ColumnarBatch { + private final Array[] _cols; + private final boolean[] _nullable; + private final StructType _schema; + private final int _rowStart; + private final int _size; + private final int _ncol; + + FrameColumnarBatch(Array[] cols, boolean[] nullable, StructType schema, int rowStart, int size, int ncol) { + _cols = cols; + _nullable = nullable; + _schema = schema; + _rowStart = rowStart; + _size = size; + _ncol = ncol; + } + + @Override + public StructType getSchema() { + return _schema; + } + + @Override + public ColumnVector getColumnVector(int ordinal) { + if( ordinal < 0 || ordinal >= _ncol ) + throw new IndexOutOfBoundsException("column ordinal " + ordinal); + return new FrameColumnVector(_cols[ordinal], _nullable[ordinal], + _schema.at(ordinal).getDataType(), _rowStart, _size); + } + + @Override + public int getSize() { + return _size; + } + } + + /** + * Read-only typed column view over one column {@link Array} row range. Numeric + * values are read through {@link Array#getAsDouble(int)} to avoid boxing, and + * non-nullable columns short-circuit {@code isNullAt} so the kernel never pays + * for a redundant boxed fetch. + */ + private static class FrameColumnVector implements ColumnVector { + private final Array _col; + private final boolean _nullable; + private final DataType _type; + private final int _rowStart; + private final int _size; + + FrameColumnVector(Array col, boolean nullable, DataType type, int rowStart, int size) { + _col = col; + _nullable = nullable; + _type = type; + _rowStart = rowStart; + _size = size; + } + + @Override + public DataType getDataType() { + return _type; + } + + @Override + public int getSize() { + return _size; + } + + @Override + public boolean isNullAt(int rowId) { + return _nullable && _col.get(_rowStart + rowId) == null; + } + + @Override + public String getString(int rowId) { + Object v = _col.get(_rowStart + rowId); + return (v == null) ? null : v.toString(); + } + + @Override + public boolean getBoolean(int rowId) { + return _col.getAsDouble(_rowStart + rowId) != 0; + } + + @Override + public double getDouble(int rowId) { + return _col.getAsDouble(_rowStart + rowId); + } + + @Override + public float getFloat(int rowId) { + return (float) _col.getAsDouble(_rowStart + rowId); + } + + @Override + public long getLong(int rowId) { + //exact for INT64 (getAsDouble would lose precision beyond 2^53) + return ((Number) _col.get(_rowStart + rowId)).longValue(); + } + + @Override + public int getInt(int rowId) { + return (int) _col.getAsDouble(_rowStart + rowId); + } + + @Override + public void close() { + //nothing to release + } + } +} diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java b/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java index 3fb3968c96f..ff38eb395dd 100644 --- a/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java +++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterFactory.java @@ -50,6 +50,8 @@ public static FrameWriter createFrameWriter(FileFormat fmt, FileFormatProperties return binaryParallel ? new FrameWriterBinaryBlockParallel() : new FrameWriterBinaryBlock(); case PROTO: return new FrameWriterProto(); + case DELTA: + return new FrameWriterDelta(); default: throw new DMLRuntimeException("Failed to create frame writer for unknown format: " + fmt.toString()); } diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java b/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java index dc1c7da230c..e10d358d629 100644 --- a/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java +++ b/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java @@ -78,7 +78,11 @@ public static MatrixReader createMatrixReader(FileFormat fmt) { case COMPRESSED: reader = ReaderCompressed.create(); break; - + + case DELTA: + reader = par ? new ReaderDeltaParallel() : new ReaderDelta(); + break; + default: throw new DMLRuntimeException("Failed to create matrix reader for unknown format: " + fmt.toString()); } @@ -140,6 +144,11 @@ public static MatrixReader createMatrixReader( ReadProperties props ) { case COMPRESSED: reader = new ReaderCompressed(); break; + + case DELTA: + reader = par ? new ReaderDeltaParallel() : new ReaderDelta(); + break; + default: throw new DMLRuntimeException("Failed to create matrix reader for unknown format: " + fmt.toString()); } diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java b/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java index bb0b0c940f7..091194edc81 100644 --- a/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java +++ b/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java @@ -94,6 +94,9 @@ else if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE case COMPRESSED: return WriterCompressed.create(props); + case DELTA: + return new WriterDelta(); + default: throw new DMLRuntimeException("Failed to create matrix writer for unknown format: " + fmt.toString()); } diff --git a/src/main/java/org/apache/sysds/runtime/io/ReaderDelta.java b/src/main/java/org/apache/sysds/runtime/io/ReaderDelta.java new file mode 100644 index 00000000000..5d987e9a2a4 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/io/ReaderDelta.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.io; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; + +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.StructType; + +/** + * Single-threaded native Delta Lake reader for matrices, built on the + * Spark-free Delta Kernel library. It opens the latest snapshot of a Delta + * table directory, reads its parquet data files through the kernel's default + * engine (honoring deletion vectors), and materializes the numeric columns + * into a dense {@link MatrixBlock}. + * + *

Only numeric columns (double/float/long/int/short/byte/boolean) are + * supported, matching the all-double nature of a SystemDS matrix. Dimensions + * do not need to be known up front: the row count is discovered while scanning + * and the column count is taken from the table schema.

+ */ +public class ReaderDelta extends MatrixReader { + + @Override + public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int blen, long estnnz) + throws IOException, DMLRuntimeException + { + Engine engine = DeltaKernelUtils.createEngine(); + String tablePath = DeltaKernelUtils.qualify(fname); + + //Scan column-at-a-time into one row-major buffer per batch (no per-row + //allocation, no boxing, no per-cell set()). Buffers are concatenated into + //the dense output via bulk array copies below. + ArrayList batches = new ArrayList<>(); + int[] nrowH = new int[1]; + StructType schema = DeltaKernelUtils.scan(engine, tablePath, sch -> { + int[] types = columnTypes(sch); + int ncol = sch.length(); + return (cols, size, selected) -> { + batches.add(extractBatch(cols, size, selected, types, ncol)); + nrowH[0] += DeltaKernelUtils.countSelected(size, selected); + }; + }); + + int ncol = schema.length(); + int nrow = nrowH[0]; + long lestnnz = (estnnz >= 0) ? estnnz : (long) nrow * ncol; + MatrixBlock ret = createOutputMatrixBlock(nrow, ncol, Math.max(nrow, 1), lestnnz, true, false); + + if( nrow > 0 && ncol > 0 ) + fillDense(ret, batches); + ret.recomputeNonZeros(); + ret.examSparsity(); + return ret; + } + + /** Derive the per-column internal type codes from the table schema. */ + static int[] columnTypes(StructType schema) { + int ncol = schema.length(); + int[] types = new int[ncol]; + for( int c=0; c batches) { + DenseBlock db = ret.getDenseBlock(); + if( db.isContiguous() ) { + double[] dv = db.valuesAt(0); + int off = 0; + for( double[] buf : batches ) { + System.arraycopy(buf, 0, dv, off, buf.length); + off += buf.length; + } + } + else { + //rare large multi-block fallback: route each row through the block API + int ncol = ret.getNumColumns(); + int r = 0; + for( double[] buf : batches ) { + int rowsInBuf = buf.length / ncol; + for( int i=0; iThe expensive part of a Delta read is the parquet decode, which the kernel + * performs per data file; parallelizing across files is therefore the natural + * way to bridge the gap to the (near-raw) binary reader. A table backed by a + * single data file (the default for tables <= the parquet target file size) + * cannot be split this way, so the reader transparently falls back to the + * sequential {@link ReaderDelta} path in that case.

+ */ +public class ReaderDeltaParallel extends ReaderDelta { + + private final int _numThreads; + + public ReaderDeltaParallel() { + _numThreads = OptimizerUtils.getParallelBinaryReadParallelism(); + } + + @Override + public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int blen, long estnnz) + throws IOException, DMLRuntimeException + { + Engine engine = DeltaKernelUtils.createEngine(); + String tablePath = DeltaKernelUtils.qualify(fname); + DeltaKernelUtils.ScanHandle handle = DeltaKernelUtils.openScan(engine, tablePath); + + final int nfiles = handle.scanFiles.size(); + //nothing to gain from parallelism for single-file (or empty) tables + if( _numThreads <= 1 || nfiles <= 1 ) + return super.readMatrixFromHDFS(fname, rlen, clen, blen, estnnz); + + final int ncol = handle.schema.length(); + final int[] types = columnTypes(handle.schema); + + //fast path: exact per-file row counts are known from metadata and the dense + //output fits a single contiguous array -> pre-size once and let each thread + //decode directly into its slice (no intermediate buffers, no serial copy). + if( useDirectPath(handle) ) { + long total = 0; + for( long r : handle.numRecords ) + total += r; + if( total > 0 && (long) total * ncol <= Integer.MAX_VALUE ) + return readDirect(fname, handle, ncol, types, (int) total, estnnz); + } + + return readBuffered(fname, handle, ncol, types, estnnz); + } + + /** + * Whether the metadata-driven direct-write fast path can be used for this + * table (exact per-file row counts and no deletion vectors). Visible for + * testing: the buffered fallback is otherwise only reachable for tables + * lacking row statistics or carrying deletion vectors, which the SystemDS + * Delta writer never produces. + * + * @param handle the opened scan handle + * @return true if the direct path is applicable + */ + protected boolean useDirectPath(DeltaKernelUtils.ScanHandle handle) { + return handle.hasExactRowCounts(); + } + + /** + * Fast path: each thread decodes one data file straight into the final dense + * array at a metadata-derived row offset. Single allocation, fully parallel. + */ + private MatrixBlock readDirect(String fname, DeltaKernelUtils.ScanHandle handle, + int ncol, int[] types, int nrow, long estnnz) throws IOException + { + final int nfiles = handle.scanFiles.size(); + final int[] rowOffset = new int[nfiles]; + int acc = 0; + for( int i=0; i> tasks = new ArrayList<>(nfiles); + for( int i=0; i { + int[] cur = new int[] {base}; + Engine eng = DeltaKernelUtils.createEngine(); + DeltaKernelUtils.readScanFile(eng, handle.scanState, handle.physicalReadSchema, scanFileRow, + (cols, size, selected) -> + cur[0] += extractBatchInto(cols, size, selected, types, ncol, dv, cur[0])); + return null; + }); + } + for( Future f : pool.invokeAll(tasks) ) + f.get(); + } + catch(Exception ex) { + throw new IOException("Failed parallel read of Delta table: " + fname, ex); + } + finally { + pool.shutdown(); + } + + ret.recomputeNonZeros(_numThreads); + ret.examSparsity(); + return ret; + } + + /** + * Fallback path: decode each file in parallel into per-file buffers (used when + * row counts are unknown, deletion vectors are present, or the matrix exceeds a + * single contiguous array), then concatenate in file order. + */ + private MatrixBlock readBuffered(String fname, DeltaKernelUtils.ScanHandle handle, + int ncol, int[] types, long estnnz) throws IOException + { + final int nfiles = handle.scanFiles.size(); + @SuppressWarnings("unchecked") + final ArrayList[] fileBufs = new ArrayList[nfiles]; + final int[] fileRows = new int[nfiles]; + final ExecutorService pool = CommonThreadPool.get(_numThreads); + try { + ArrayList> tasks = new ArrayList<>(nfiles); + for( int i=0; i { + ArrayList bufs = new ArrayList<>(); + int[] rows = new int[1]; + Engine eng = DeltaKernelUtils.createEngine(); + DeltaKernelUtils.readScanFile(eng, handle.scanState, handle.physicalReadSchema, scanFileRow, + (cols, size, selected) -> { + bufs.add(extractBatch(cols, size, selected, types, ncol)); + rows[0] += DeltaKernelUtils.countSelected(size, selected); + }); + fileBufs[fi] = bufs; + fileRows[fi] = rows[0]; + return null; + }); + } + for( Future f : pool.invokeAll(tasks) ) + f.get(); + } + catch(Exception ex) { + throw new IOException("Failed parallel read of Delta table: " + fname, ex); + } + finally { + pool.shutdown(); + } + + int nrow = 0; + for( int i=0; i ordered = new ArrayList<>(); + for( int i=0; i= 0) ? estnnz : (long) nrow * ncol; + MatrixBlock ret = createOutputMatrixBlock(nrow, ncol, Math.max(nrow, 1), lestnnz, true, false); + if( nrow > 0 && ncol > 0 ) + fillDense(ret, ordered); + ret.recomputeNonZeros(); + ret.examSparsity(); + return ret; + } +} diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterDelta.java b/src/main/java/org/apache/sysds/runtime/io/WriterDelta.java new file mode 100644 index 00000000000..af2aa8258ed --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/io/WriterDelta.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.io; + +import java.io.IOException; +import java.util.NoSuchElementException; +import java.util.Optional; + +import org.apache.sysds.runtime.instructions.ooc.OOCStream; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterable; +import io.delta.kernel.utils.CloseableIterator; + +/** + * Single-threaded native Delta Lake writer for matrices, built on the + * Spark-free Delta Kernel library. It creates a Delta table at the target + * directory with an all-double schema {@code c0..c(n-1)} (replacing any existing + * table at that path), streams the {@link MatrixBlock} rows as columnar batches + * into parquet data files via the kernel's default engine, and commits the + * corresponding add-file actions to the transaction log. + */ +public class WriterDelta extends MatrixWriter { + + /** Number of matrix rows materialized per columnar batch handed to the engine. */ + private static final int BATCH_ROWS = 4096; + + @Override + public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz, boolean diag) + throws IOException + { + if( src.getNumRows() != rlen || src.getNumColumns() != clen ) + throw new IOException("Matrix dimensions mismatch with metadata: (" + + src.getNumRows() + "x" + src.getNumColumns() + ") vs (" + rlen + "x" + clen + ")."); + int ncol = (int) clen; + int nrow = (int) rlen; + //fast path: a contiguous dense block lets the column views read straight + //from the backing double[] (avoids per-cell MatrixBlock.get dispatch). + double[] dense = (!src.isInSparseFormat() && src.getDenseBlock() != null + && src.getDenseBlock().isContiguous()) ? src.getDenseBlockValues() : null; + Engine engine = DeltaKernelUtils.createEngine(); + DeltaKernelUtils.commit(engine, DeltaKernelUtils.qualify(fname), + buildSchema(ncol), new MatrixBatchIterator(src, dense, nrow, ncol)); + } + + @Override + public void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int blen) + throws IOException + { + //empty table: create with schema but no data files + Engine engine = DeltaKernelUtils.createEngine(); + DeltaKernelUtils.commit(engine, DeltaKernelUtils.qualify(fname), + buildSchema((int) clen), CloseableIterable.emptyIterable().iterator()); + } + + private static StructType buildSchema(int ncol) { + StructType schema = new StructType(); + for( int c=0; c stream, long rlen, long clen, int blen) + throws IOException + { + throw new UnsupportedOperationException("Out-of-core stream write is not supported for the Delta format."); + } + + /** Chunks a MatrixBlock into fixed-size columnar batches for the kernel write path. */ + private static class MatrixBatchIterator implements CloseableIterator { + private final MatrixBlock _mb; + private final double[] _dense; + private final int _nrow; + private final int _ncol; + private final StructType _schema; + private int _pos = 0; + + MatrixBatchIterator(MatrixBlock mb, double[] dense, int nrow, int ncol) { + _mb = mb; + _dense = dense; + _nrow = nrow; + _ncol = ncol; + _schema = buildSchema(ncol); + } + + @Override + public boolean hasNext() { + return _pos < _nrow; + } + + @Override + public FilteredColumnarBatch next() { + if( !hasNext() ) + throw new NoSuchElementException(); + int size = Math.min(BATCH_ROWS, _nrow - _pos); + ColumnarBatch batch = new MatrixColumnarBatch(_mb, _dense, _schema, _pos, size, _ncol); + _pos += size; + //no selection vector: all rows in the batch are written + return new FilteredColumnarBatch(batch, Optional.empty()); + } + + @Override + public void close() { + //nothing to release + } + } + + /** Read-only view of a row range of a MatrixBlock as a Delta Kernel columnar batch. */ + private static class MatrixColumnarBatch implements ColumnarBatch { + private final MatrixBlock _mb; + private final double[] _dense; + private final StructType _schema; + private final int _rowStart; + private final int _size; + private final int _ncol; + + MatrixColumnarBatch(MatrixBlock mb, double[] dense, StructType schema, int rowStart, int size, int ncol) { + _mb = mb; + _dense = dense; + _schema = schema; + _rowStart = rowStart; + _size = size; + _ncol = ncol; + } + + @Override + public StructType getSchema() { + return _schema; + } + + @Override + public ColumnVector getColumnVector(int ordinal) { + if( ordinal < 0 || ordinal >= _ncol ) + throw new IndexOutOfBoundsException("column ordinal " + ordinal); + return new MatrixColumnVector(_mb, _dense, _rowStart, _size, _ncol, ordinal); + } + + @Override + public int getSize() { + return _size; + } + } + + /** Read-only double column view over one column of a MatrixBlock row range. */ + private static class MatrixColumnVector implements ColumnVector { + private final MatrixBlock _mb; + private final double[] _dense; // contiguous dense backing array, or null + private final int _rowStart; + private final int _size; + private final int _ncol; + private final int _col; + + MatrixColumnVector(MatrixBlock mb, double[] dense, int rowStart, int size, int ncol, int col) { + _mb = mb; + _dense = dense; + _rowStart = rowStart; + _size = size; + _ncol = ncol; + _col = col; + } + + @Override + public DataType getDataType() { + return DoubleType.DOUBLE; + } + + @Override + public int getSize() { + return _size; + } + + @Override + public boolean isNullAt(int rowId) { + return false; + } + + @Override + public double getDouble(int rowId) { + //dense contiguous single block => index fits in int (getDenseBlockValues + //is only handed over for single-block dense matrices) + return (_dense != null) + ? _dense[(_rowStart + rowId) * _ncol + _col] + : _mb.get(_rowStart + rowId, _col); + } + + @Override + public void close() { + //nothing to release + } + } +} diff --git a/src/test/java/org/apache/sysds/test/component/io/DeltaFrameReadWriteTest.java b/src/test/java/org/apache/sysds/test/component/io/DeltaFrameReadWriteTest.java new file mode 100644 index 00000000000..1c1d33a80d6 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/io/DeltaFrameReadWriteTest.java @@ -0,0 +1,624 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Optional; +import java.util.Random; + +import org.apache.commons.io.FileUtils; +import org.apache.sysds.common.Types.FileFormat; +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.conf.CompilerConfig; +import org.apache.sysds.conf.CompilerConfig.ConfigType; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.conf.DMLConfig; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.io.DeltaKernelUtils; +import org.apache.sysds.runtime.io.FrameReader; +import org.apache.sysds.runtime.io.FrameReaderDelta; +import org.apache.sysds.runtime.io.FrameReaderDeltaParallel; +import org.apache.sysds.runtime.io.FrameReaderFactory; +import org.apache.sysds.runtime.io.FrameWriterDelta; +import org.junit.Test; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; + +/** + * Direct (no DML) round-trip tests for the native Delta Kernel based frame + * reader/writer. Each test writes a FrameBlock to a fresh local Delta table + * directory and reads it back, asserting the discovered schema, column names, + * dimensions, and per-cell values match. Several tests additionally assert that + * the parallel reader ({@link FrameReaderDeltaParallel}) agrees with the serial + * reader cell-for-cell across a multi-file table (both its direct and buffered + * paths). + */ +public class DeltaFrameReadWriteTest { + + //nonsense schema/dims handed to the reader to confirm it discovers everything + private static final ValueType[] NO_SCHEMA = new ValueType[] {ValueType.STRING}; + private static final String[] NO_NAMES = new String[] {"x"}; + + //small target file size + enough random rows so the writer rolls multiple + //data files, exercising the per-file parallel read path rather than the + //single-file serial fallback. + private static final long SMALL_TARGET_FILE_SIZE = 512L * 1024; + private static final int ROWS_MULTI_FILE = 150_000; + + private static FrameBlock writeThenRead(FrameBlock in) throws Exception { + Path dir = Files.createTempDirectory("sysds_delta_frame_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new FrameWriterDelta().writeFrameToHDFS(in, tablePath, in.getNumRows(), in.getNumColumns()); + //pass nonsense schema/dims: the reader must discover everything from the table + return new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + private static FrameBlock alloc(ValueType[] schema, String[] names, int nrow) { + FrameBlock fb = new FrameBlock(schema, names); + fb.ensureAllocatedColumns(nrow); + return fb; + } + + @Test + public void roundTripMixedTypes() throws Exception { + ValueType[] schema = {ValueType.STRING, ValueType.INT64, ValueType.FP64, + ValueType.BOOLEAN, ValueType.INT32, ValueType.FP32}; + String[] names = {"name", "id", "score", "active", "count", "ratio"}; + int nrow = 5; + FrameBlock in = alloc(schema, names, nrow); + for( int r=0; r readBuffered() + FrameBlock buffered = new FrameReaderDeltaParallel() { + @Override protected boolean useDirectPath(DeltaKernelUtils.ScanHandle h) { return false; } + }.readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + + assertFramesEqual(serial, buffered); + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void factoryRoutesDeltaToParallelWhenEnabled() { + //the factory must pick the parallel frame reader iff parallel CP read is enabled + CompilerConfig cc = ConfigurationManager.getCompilerConfig(); + try { + cc.set(ConfigType.PARALLEL_CP_READ_TEXTFORMATS, true); + ConfigurationManager.setLocalConfig(cc); + FrameReader par = FrameReaderFactory.createFrameReader(FileFormat.DELTA); + assertTrue("expected FrameReaderDeltaParallel when parallel read enabled", + par instanceof FrameReaderDeltaParallel); + + cc.set(ConfigType.PARALLEL_CP_READ_TEXTFORMATS, false); + ConfigurationManager.setLocalConfig(cc); + FrameReader ser = FrameReaderFactory.createFrameReader(FileFormat.DELTA); + assertTrue("expected serial FrameReaderDelta when parallel read disabled", + ser instanceof FrameReaderDelta && !(ser instanceof FrameReaderDeltaParallel)); + } + finally { + ConfigurationManager.clearLocalConfigs(); + } + } + + @Test + public void readerBatchSizeConfigRoundTrips() throws Exception { + //a non-default reader batch size must not change the result (more, smaller + //batches exercise the per-batch extract/concatenate loop more often). + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_READER_BATCH_SIZE, "128"); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_frame_bs_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + assertEquals("config getter reflects the override", + 128, ConfigurationManager.getDeltaReaderBatchSize()); + + FrameBlock in = genMixedFrame(5000, 31); + new FrameWriterDelta().writeFrameToHDFS(in, tablePath, in.getNumRows(), in.getNumColumns()); + FrameBlock out = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + assertFramesEqual(in, out); + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void writerTargetFileSizeConfigProducesMoreFiles() throws Exception { + //a smaller configured target file size must make the writer roll more + //data files for the same frame (the lever the parallel reader relies on). + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(SMALL_TARGET_FILE_SIZE)); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_frame_cfg_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + assertEquals("config getter reflects the override", + SMALL_TARGET_FILE_SIZE, ConfigurationManager.getDeltaWriterTargetFileSize()); + + FrameBlock in = genMixedFrame(ROWS_MULTI_FILE, 41); + new FrameWriterDelta().writeFrameToHDFS(in, tablePath, in.getNumRows(), in.getNumColumns()); + assertMultiFile(tablePath); + + //data still round-trips correctly with the custom layout + FrameBlock out = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + assertFramesEqual(in, out); + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void emptyFrameRoundTrip() throws Exception { + //a schema-only Delta table (no data files, 0 rows); the reader must + //rebuild empty typed columns and discover the schema/names from the table. + ValueType[] schema = {ValueType.STRING, ValueType.FP64, ValueType.INT64}; + String[] names = {"s", "d", "k"}; + DataType[] dtypes = {StringType.STRING, DoubleType.DOUBLE, LongType.LONG}; + + Path dir = Files.createTempDirectory("sysds_delta_frame_empty_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + writeEmptyTable(tablePath, names, dtypes); + FrameBlock out = new FrameReaderDelta().readFrameFromHDFS(tablePath, NO_SCHEMA, NO_NAMES, -1, -1); + assertEquals("rows", 0, out.getNumRows()); + assertEquals("cols", schema.length, out.getNumColumns()); + for( int c=0; c writer must reject + new FrameWriterDelta().writeFrameToHDFS(fb, tablePath, + fb.getNumRows() + 1, fb.getNumColumns()); + fail("expected an IOException for a frame/metadata dimension mismatch"); + } + catch(IOException ex) { + assertTrue("message should mention the dimension mismatch, got: " + ex.getMessage(), + ex.getMessage() != null && ex.getMessage().contains("dimensions mismatch")); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void readFromInputStreamUnsupported() throws Exception { + //Delta is a directory-based table format; stream reads are not supported + try { + new FrameReaderDelta().readFrameFromInputStream(null, NO_SCHEMA, NO_NAMES, -1, -1); + fail("expected UnsupportedOperationException for a Delta input-stream read"); + } + catch(UnsupportedOperationException ex) { + //expected: must throw before touching the (null) stream + } + } + + @Test + public void parallelReadStringNullsMatchSerialMultiFile() throws Exception { + //string nulls across a multi-file table: the parallel direct path must + //reproduce the serial read cell-for-cell (assertFramesEqual uses + //Objects.equals, so nulls are compared faithfully). + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(SMALL_TARGET_FILE_SIZE)); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_frame_parnull_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + ValueType[] schema = {ValueType.STRING, ValueType.INT64}; + String[] names = {"s", "k"}; + int nrow = ROWS_MULTI_FILE; + FrameBlock in = alloc(schema, names, nrow); + for( int r=0; r s = Files.walk(new File(tablePath).toPath()) ) { + files = s.filter(p -> p.toString().endsWith(".parquet")).count(); + } + assertTrue("expected a multi-file Delta table to exercise the parallel path, got " + files, + files > 1); + } + + private static void assertFramesEqual(FrameBlock expected, FrameBlock actual) { + assertEquals("rows", expected.getNumRows(), actual.getNumRows()); + assertEquals("cols", expected.getNumColumns(), actual.getNumColumns()); + int ncol = expected.getNumColumns(); + for( int c=0; c empty() { + return new CloseableIterator() { + @Override public boolean hasNext() { return false; } + @Override public FilteredColumnarBatch next() { throw new NoSuchElementException(); } + @Override public void close() {} + }; + } + + /** Writes a single date column (kernel stores dates as INT32 days) used to + * assert the frame reader rejects a non-mappable column type. */ + private static void writeDateColumn(String tablePath, int[] days) throws Exception { + Engine engine = DeltaKernelUtils.createEngine(); + final StructType schema = new StructType().add("d", DateType.DATE, false); + ColumnarBatch batch = new ColumnarBatch() { + @Override public StructType getSchema() { return schema; } + @Override public int getSize() { return days.length; } + @Override public ColumnVector getColumnVector(int ordinal) { return new DateVector(days); } + }; + FilteredColumnarBatch fcb = new FilteredColumnarBatch(batch, Optional.empty()); + DeltaKernelUtils.commit(engine, DeltaKernelUtils.qualify(tablePath), schema, singleton(fcb)); + } + + /** Writes a short column and a byte column (kernel stores these as 16/8-bit + * integers) used to assert the frame reader coerces both to INT32. */ + private static void writeShortByteColumns(String tablePath, short[] shorts, byte[] bytes) throws Exception { + Engine engine = DeltaKernelUtils.createEngine(); + final StructType schema = new StructType() + .add("sh", ShortType.SHORT, false) + .add("by", ByteType.BYTE, false); + ColumnarBatch batch = new ColumnarBatch() { + @Override public StructType getSchema() { return schema; } + @Override public int getSize() { return shorts.length; } + @Override public ColumnVector getColumnVector(int ordinal) { + return (ordinal == 0) ? new ShortVector(shorts) : new ByteVector(bytes); + } + }; + FilteredColumnarBatch fcb = new FilteredColumnarBatch(batch, Optional.empty()); + DeltaKernelUtils.commit(engine, DeltaKernelUtils.qualify(tablePath), schema, singleton(fcb)); + } + + private static CloseableIterator singleton(FilteredColumnarBatch fcb) { + return new CloseableIterator() { + private boolean _done = false; + @Override public boolean hasNext() { return !_done; } + @Override public FilteredColumnarBatch next() { + if( _done ) throw new NoSuchElementException(); + _done = true; + return fcb; + } + @Override public void close() {} + }; + } + + /** Column view exposing an int[] as a Delta date column. */ + private static class DateVector implements ColumnVector { + private final int[] _days; + DateVector(int[] days) { _days = days; } + @Override public DataType getDataType() { return DateType.DATE; } + @Override public int getSize() { return _days.length; } + @Override public boolean isNullAt(int rowId) { return false; } + @Override public int getInt(int rowId) { return _days[rowId]; } + @Override public void close() {} + } + + /** Column view exposing a short[] as a Delta short column. */ + private static class ShortVector implements ColumnVector { + private final short[] _vals; + ShortVector(short[] vals) { _vals = vals; } + @Override public DataType getDataType() { return ShortType.SHORT; } + @Override public int getSize() { return _vals.length; } + @Override public boolean isNullAt(int rowId) { return false; } + @Override public short getShort(int rowId) { return _vals[rowId]; } + @Override public void close() {} + } + + /** Column view exposing a byte[] as a Delta byte column. */ + private static class ByteVector implements ColumnVector { + private final byte[] _vals; + ByteVector(byte[] vals) { _vals = vals; } + @Override public DataType getDataType() { return ByteType.BYTE; } + @Override public int getSize() { return _vals.length; } + @Override public boolean isNullAt(int rowId) { return false; } + @Override public byte getByte(int rowId) { return _vals[rowId]; } + @Override public void close() {} + } +} diff --git a/src/test/java/org/apache/sysds/test/component/io/DeltaMatrixReadWriteTest.java b/src/test/java/org/apache/sysds/test/component/io/DeltaMatrixReadWriteTest.java new file mode 100644 index 00000000000..54a3bcf6334 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/io/DeltaMatrixReadWriteTest.java @@ -0,0 +1,595 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.NoSuchElementException; +import java.util.Optional; + +import org.apache.commons.io.FileUtils; +import org.apache.sysds.common.Types.FileFormat; +import org.apache.sysds.conf.CompilerConfig; +import org.apache.sysds.conf.CompilerConfig.ConfigType; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.conf.DMLConfig; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.io.DeltaKernelUtils; +import org.apache.sysds.runtime.io.MatrixReader; +import org.apache.sysds.runtime.io.MatrixReaderFactory; +import org.apache.sysds.runtime.io.ReaderDelta; +import org.apache.sysds.runtime.io.ReaderDeltaParallel; +import org.apache.sysds.runtime.io.WriterDelta; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; + +/** + * Direct (no DML) round-trip tests for the native Delta Kernel based matrix + * reader/writer. Each test writes a MatrixBlock to a fresh local Delta table + * directory and reads it back, asserting dimensions and values match. + */ +public class DeltaMatrixReadWriteTest { + + //small writer target file size (bytes) used to force a multi-file table + //layout cheaply, instead of brute-forcing huge row counts. + private static final long SMALL_TARGET_FILE_SIZE = 256L * 1024; + private static final int ROWS_MULTI_FILE = 100_000; + + private static MatrixBlock writeThenRead(MatrixBlock in) throws Exception { + Path dir = Files.createTempDirectory("sysds_delta_"); + //WriterDelta creates the table at the given (empty) directory + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + WriterDelta writer = new WriterDelta(); + writer.writeMatrixToHDFS(in, tablePath, in.getNumRows(), in.getNumColumns(), -1, in.getNonZeros()); + + MatrixReader reader = new ReaderDelta(); + return reader.readMatrixFromHDFS(tablePath, in.getNumRows(), in.getNumColumns(), -1, in.getNonZeros()); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void parallelReadMatchesSerialMultiFile() throws Exception { + //force a multi-file table cheaply via a small writer target file size + //(rather than a huge row count), so the parallel per-file path is + //actually exercised rather than falling back to serial. + MatrixBlock in = TestUtils.generateTestMatrixBlock(ROWS_MULTI_FILE, 8, -10, 10, 1.0, 13); + in.recomputeNonZeros(); + + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(SMALL_TARGET_FILE_SIZE)); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_par_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new WriterDelta().writeMatrixToHDFS(in, tablePath, + in.getNumRows(), in.getNumColumns(), -1, in.getNonZeros()); + + //sanity: confirm the table really is split across multiple files + long files; + try( java.util.stream.Stream s = Files.walk(new File(tablePath).toPath()) ) { + files = s.filter(p -> p.toString().endsWith(".parquet")).count(); + } + assertTrue("expected a multi-file Delta table to exercise the parallel path, got " + files, + files > 1); + + MatrixBlock serial = new ReaderDelta() + .readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + MatrixBlock parallel = new ReaderDeltaParallel() + .readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + + assertEquals("rows", serial.getNumRows(), parallel.getNumRows()); + assertEquals("cols", serial.getNumColumns(), parallel.getNumColumns()); + assertEquals("nnz", serial.getNonZeros(), parallel.getNonZeros()); + TestUtils.compareMatrices(serial, parallel, 0, "serial-vs-parallel"); + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void roundTripDenseSmall() throws Exception { + MatrixBlock in = new MatrixBlock(3, 4, false); + double v = 1.0; + for( int i=0; i<3; i++ ) + for( int j=0; j<4; j++ ) + in.set(i, j, v++); + in.recomputeNonZeros(); + + MatrixBlock out = writeThenRead(in); + assertEquals("rows", 3, out.getNumRows()); + assertEquals("cols", 4, out.getNumColumns()); + TestUtils.compareMatrices(in, out, 1e-12, "dense-small"); + } + + @Test + public void roundTripDenseRandom() throws Exception { + MatrixBlock in = TestUtils.generateTestMatrixBlock(500, 17, -10, 10, 1.0, 7); + MatrixBlock out = writeThenRead(in); + assertEquals("rows", in.getNumRows(), out.getNumRows()); + assertEquals("cols", in.getNumColumns(), out.getNumColumns()); + TestUtils.compareMatrices(in, out, 1e-12, "dense-random"); + } + + @Test + public void roundTripSparseRandom() throws Exception { + //values written are dense parquet, but exercise a sparse-ish input + MatrixBlock in = TestUtils.generateTestMatrixBlock(1200, 9, -5, 5, 0.1, 13); + in.recomputeNonZeros(); + MatrixBlock out = writeThenRead(in); + assertEquals("rows", in.getNumRows(), out.getNumRows()); + assertEquals("cols", in.getNumColumns(), out.getNumColumns()); + TestUtils.compareMatrices(in, out, 1e-12, "sparse-random"); + } + + @Test + public void roundTripMultiBatch() throws Exception { + //more rows than the writer batch size (4096) to exercise chunking + MatrixBlock in = TestUtils.generateTestMatrixBlock(10000, 5, 0, 100, 1.0, 1); + MatrixBlock out = writeThenRead(in); + assertEquals("rows", 10000, out.getNumRows()); + assertEquals("cols", 5, out.getNumColumns()); + TestUtils.compareMatrices(in, out, 1e-12, "multi-batch"); + } + + @Test + public void readDiscoversUnknownDimensions() throws Exception { + MatrixBlock in = TestUtils.generateTestMatrixBlock(123, 6, -1, 1, 1.0, 3); + Path dir = Files.createTempDirectory("sysds_delta_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new WriterDelta().writeMatrixToHDFS(in, tablePath, in.getNumRows(), in.getNumColumns(), -1, in.getNonZeros()); + //pass -1 dimensions: the reader must discover them from the table + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + assertEquals("rows", 123, out.getNumRows()); + assertEquals("cols", 6, out.getNumColumns()); + TestUtils.compareMatrices(in, out, 1e-12, "unknown-dims"); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void emptyMatrixRoundTrip() throws Exception { + Path dir = Files.createTempDirectory("sysds_delta_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new WriterDelta().writeEmptyMatrixToHDFS(tablePath, 0, 4, -1); + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + assertEquals("rows", 0, out.getNumRows()); + assertEquals("cols", 4, out.getNumColumns()); + assertEquals("nnz", 0, out.getNonZeros()); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void readNonDoubleNumericColumns() throws Exception { + //tables produced by external tools (or the frame writer) can carry + //long/int/boolean columns; the matrix reader must coerce them to double + Path dir = Files.createTempDirectory("sysds_delta_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + double[] longVals = {1, -2, 1_000_000_000L, 0}; + double[] intVals = {7, -8, 123456, 0}; + double[] boolVals = {1, 0, 1, 0}; + writeTypedColumns(tablePath, + new DataType[] {LongType.LONG, IntegerType.INTEGER, BooleanType.BOOLEAN}, + new double[][] {longVals, intVals, boolVals}); + + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + assertEquals("rows", 4, out.getNumRows()); + assertEquals("cols", 3, out.getNumColumns()); + for( int r=0; r<4; r++ ) { + assertEquals("long col r" + r, longVals[r], out.get(r, 0), 0.0); + assertEquals("int col r" + r, intVals[r], out.get(r, 1), 0.0); + assertEquals("bool col r" + r, boolVals[r], out.get(r, 2), 0.0); + } + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void rewriteSamePathReplacesData() throws Exception { + //writing to a path that already holds a Delta table must fully replace it + Path dir = Files.createTempDirectory("sysds_delta_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + MatrixBlock first = TestUtils.generateTestMatrixBlock(50, 8, 0, 100, 1.0, 1); + new WriterDelta().writeMatrixToHDFS(first, tablePath, 50, 8, -1, first.getNonZeros()); + + //second write has different dimensions and values + MatrixBlock second = TestUtils.generateTestMatrixBlock(20, 3, -5, 5, 1.0, 2); + new WriterDelta().writeMatrixToHDFS(second, tablePath, 20, 3, -1, second.getNonZeros()); + + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + assertEquals("rows", 20, out.getNumRows()); + assertEquals("cols", 3, out.getNumColumns()); + TestUtils.compareMatrices(second, out, 1e-12, "rewrite-replace"); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void parallelBufferedPathMatchesSerial() throws Exception { + //the direct fast path is always taken for SystemDS-written tables (exact + //row stats, no deletion vectors); force the buffered fallback to exercise + //its per-file decode + serial concatenation and assert it matches serial. + //force a multi-file table cheaply via a small writer target file size. + MatrixBlock in = TestUtils.generateTestMatrixBlock(ROWS_MULTI_FILE, 8, -10, 10, 1.0, 23); + in.recomputeNonZeros(); + + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(SMALL_TARGET_FILE_SIZE)); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_buf_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new WriterDelta().writeMatrixToHDFS(in, tablePath, + in.getNumRows(), in.getNumColumns(), -1, in.getNonZeros()); + + long files; + try( java.util.stream.Stream s = Files.walk(new File(tablePath).toPath()) ) { + files = s.filter(p -> p.toString().endsWith(".parquet")).count(); + } + assertTrue("expected a multi-file Delta table, got " + files, files > 1); + + MatrixBlock serial = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + //subclass that always declines the direct path -> readBuffered() + MatrixBlock buffered = new ReaderDeltaParallel() { + @Override protected boolean useDirectPath(DeltaKernelUtils.ScanHandle h) { return false; } + }.readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + + assertEquals("rows", serial.getNumRows(), buffered.getNumRows()); + assertEquals("cols", serial.getNumColumns(), buffered.getNumColumns()); + assertEquals("nnz", serial.getNonZeros(), buffered.getNonZeros()); + TestUtils.compareMatrices(serial, buffered, 0, "serial-vs-buffered"); + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void writerTargetFileSizeConfigProducesMoreFiles() throws Exception { + //a smaller configured target file size must make the writer roll more + //data files for the same matrix (the lever the parallel reader relies on). + MatrixBlock in = TestUtils.generateTestMatrixBlock(400_000, 16, -10, 10, 1.0, 7); + in.recomputeNonZeros(); + + //isolate the override in a fresh thread-local config (restored in finally) + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(1L * 1024 * 1024)); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_cfg_"); + try { + assertEquals("config getter reflects the override", + 1L * 1024 * 1024, ConfigurationManager.getDeltaWriterTargetFileSize()); + + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + new WriterDelta().writeMatrixToHDFS(in, tablePath, + in.getNumRows(), in.getNumColumns(), -1, in.getNonZeros()); + long files; + try( java.util.stream.Stream s = Files.walk(new File(tablePath).toPath()) ) { + files = s.filter(p -> p.toString().endsWith(".parquet")).count(); + } + assertTrue("expected >1 data file with a 1MB target, got " + files, files > 1); + + //data still round-trips correctly with the custom layout + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + TestUtils.compareMatrices(in, out, 1e-12, "small-target-roundtrip"); + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void readerBatchSizeConfigRoundTrips() throws Exception { + //a non-default reader batch size must not change the result (more, smaller + //batches exercise the per-batch extract/concatenate loop more often). + MatrixBlock in = TestUtils.generateTestMatrixBlock(5000, 7, -10, 10, 1.0, 11); + //isolate the override in a fresh thread-local config (restored in finally) + DMLConfig conf = new DMLConfig(); + conf.setTextValue(DMLConfig.DELTA_READER_BATCH_SIZE, "128"); + ConfigurationManager.setLocalConfig(conf); + Path dir = Files.createTempDirectory("sysds_delta_bs_"); + try { + assertEquals("config getter reflects the override", + 128, ConfigurationManager.getDeltaReaderBatchSize()); + + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + new WriterDelta().writeMatrixToHDFS(in, tablePath, + in.getNumRows(), in.getNumColumns(), -1, in.getNonZeros()); + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + TestUtils.compareMatrices(in, out, 1e-12, "small-batch-roundtrip"); + } + finally { + ConfigurationManager.clearLocalConfigs(); + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void factoryRoutesDeltaToParallelWhenEnabled() { + //the factory must pick the parallel reader iff parallel CP read is enabled + CompilerConfig cc = ConfigurationManager.getCompilerConfig(); + try { + cc.set(ConfigType.PARALLEL_CP_READ_TEXTFORMATS, true); + ConfigurationManager.setLocalConfig(cc); + MatrixReader par = MatrixReaderFactory.createMatrixReader(FileFormat.DELTA); + assertTrue("expected ReaderDeltaParallel when parallel read enabled", + par instanceof ReaderDeltaParallel); + + cc.set(ConfigType.PARALLEL_CP_READ_TEXTFORMATS, false); + ConfigurationManager.setLocalConfig(cc); + MatrixReader ser = MatrixReaderFactory.createMatrixReader(FileFormat.DELTA); + assertTrue("expected serial ReaderDelta when parallel read disabled", + ser instanceof ReaderDelta && !(ser instanceof ReaderDeltaParallel)); + } + finally { + ConfigurationManager.clearLocalConfigs(); + } + } + + @Test + public void readFloatColumnsCoercedToDouble() throws Exception { + //float columns must be widened to double on read (exact-representable values) + Path dir = Files.createTempDirectory("sysds_delta_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + double[] f0 = {1.5, -2.25, 0.0, 1024.5}; + double[] f1 = {-0.5, 3.75, 100.125, -7.0}; + writeTypedColumns(tablePath, + new DataType[] {FloatType.FLOAT, FloatType.FLOAT}, + new double[][] {f0, f1}); + + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + assertEquals("rows", 4, out.getNumRows()); + assertEquals("cols", 2, out.getNumColumns()); + for( int r=0; r<4; r++ ) { + assertEquals("f0 r" + r, f0[r], out.get(r, 0), 0.0); + assertEquals("f1 r" + r, f1[r], out.get(r, 1), 0.0); + } + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void readShortByteColumnsCoercedToDouble() throws Exception { + //short/byte columns must be coerced to double on read, exercising the + //T_SHORT / T_BYTE branches of ReaderDelta.getDoubleValue. + Path dir = Files.createTempDirectory("sysds_delta_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + double[] shortVals = {1, -2, 30000, 0}; + double[] byteVals = {7, -8, 120, 0}; + writeTypedColumns(tablePath, + new DataType[] {ShortType.SHORT, ByteType.BYTE}, + new double[][] {shortVals, byteVals}); + + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + assertEquals("rows", 4, out.getNumRows()); + assertEquals("cols", 2, out.getNumColumns()); + for( int r=0; r<4; r++ ) { + assertEquals("short col r" + r, shortVals[r], out.get(r, 0), 0.0); + assertEquals("byte col r" + r, byteVals[r], out.get(r, 1), 0.0); + } + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void writerRejectsDimensionMismatch() throws Exception { + //WriterDelta validates that the passed rlen/clen match the MatrixBlock + //and rejects a mismatch with an IOException. + MatrixBlock in = TestUtils.generateTestMatrixBlock(10, 4, -1, 1, 1.0, 5); + in.recomputeNonZeros(); + Path dir = Files.createTempDirectory("sysds_delta_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + new WriterDelta().writeMatrixToHDFS(in, tablePath, 11, 4, -1, in.getNonZeros()); + fail("expected an IOException for mismatched matrix dimensions"); + } + catch(java.io.IOException ex) { + assertTrue("message should mention the dimension mismatch, got: " + ex.getMessage(), + ex.getMessage() != null && ex.getMessage().contains("dimensions mismatch")); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void readNullCellsBecomeZero() throws Exception { + //nullable numeric columns with null cells must read back as 0.0 + Path dir = Files.createTempDirectory("sysds_delta_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + double[] vals = {3.0, 7.0, 9.0, 11.0}; + boolean[] nulls = {false, true, false, true}; + writeNullableDoubleColumn(tablePath, vals, nulls); + + MatrixBlock out = new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + assertEquals("rows", 4, out.getNumRows()); + assertEquals("cols", 1, out.getNumColumns()); + for( int r=0; r<4; r++ ) + assertEquals("r" + r, nulls[r] ? 0.0 : vals[r], out.get(r, 0), 0.0); + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + @Test + public void readStringColumnRejected() throws Exception { + //string columns cannot back an all-double matrix -> reader must reject them + Path dir = Files.createTempDirectory("sysds_delta_"); + String tablePath = new File(dir.toFile(), "table").getAbsolutePath(); + try { + writeStringColumn(tablePath, new String[] {"a", "b", "c"}); + try { + new ReaderDelta().readMatrixFromHDFS(tablePath, -1, -1, -1, -1); + fail("expected a DMLRuntimeException for a non-numeric (string) Delta column"); + } + catch(DMLRuntimeException ex) { + assertTrue("message should mention the non-numeric column, got: " + ex.getMessage(), + ex.getMessage() != null && ex.getMessage().contains("non-numeric")); + } + } + finally { + FileUtils.deleteQuietly(dir.toFile()); + } + } + + /** Writes a single-batch Delta table with one column per given (type, values) pair. */ + private static void writeTypedColumns(String tablePath, DataType[] types, double[][] vals) throws Exception { + Engine engine = DeltaKernelUtils.createEngine(); + StructType schema = new StructType(); + for( int c=0; c singleton(FilteredColumnarBatch fcb) { + return new CloseableIterator() { + private boolean _done = false; + @Override public boolean hasNext() { return !_done; } + @Override public FilteredColumnarBatch next() { + if( _done ) throw new NoSuchElementException(); + _done = true; + return fcb; + } + @Override public void close() {} + }; + } + + /** Minimal in-memory columnar batch backed by per-column double[] values, with + * an optional per-column null mask ({@code nulls==null} => no nulls). */ + private static class TypedBatch implements ColumnarBatch { + private final StructType _schema; + private final DataType[] _types; + private final double[][] _vals; + private final boolean[][] _nulls; + TypedBatch(StructType schema, DataType[] types, double[][] vals, boolean[][] nulls) { + _schema = schema; _types = types; _vals = vals; _nulls = nulls; + } + @Override public StructType getSchema() { return _schema; } + @Override public int getSize() { return _vals[0].length; } + @Override public ColumnVector getColumnVector(int ordinal) { + return new TypedVector(_types[ordinal], _vals[ordinal], + _nulls == null ? null : _nulls[ordinal]); + } + } + + /** Column view exposing a double[] as the requested Delta primitive type. */ + private static class TypedVector implements ColumnVector { + private final DataType _type; + private final double[] _vals; + private final boolean[] _nulls; + TypedVector(DataType type, double[] vals, boolean[] nulls) { _type = type; _vals = vals; _nulls = nulls; } + @Override public DataType getDataType() { return _type; } + @Override public int getSize() { return _vals.length; } + @Override public boolean isNullAt(int rowId) { return _nulls != null && _nulls[rowId]; } + @Override public double getDouble(int rowId) { return _vals[rowId]; } + @Override public float getFloat(int rowId) { return (float) _vals[rowId]; } + @Override public long getLong(int rowId) { return (long) _vals[rowId]; } + @Override public int getInt(int rowId) { return (int) _vals[rowId]; } + @Override public short getShort(int rowId) { return (short) _vals[rowId]; } + @Override public byte getByte(int rowId) { return (byte) _vals[rowId]; } + @Override public boolean getBoolean(int rowId) { return _vals[rowId] != 0; } + @Override public void close() {} + } + + /** Column view exposing a String[] as a Delta string column. */ + private static class StringVector implements ColumnVector { + private final String[] _vals; + StringVector(String[] vals) { _vals = vals; } + @Override public DataType getDataType() { return StringType.STRING; } + @Override public int getSize() { return _vals.length; } + @Override public boolean isNullAt(int rowId) { return _vals[rowId] == null; } + @Override public String getString(int rowId) { return _vals[rowId]; } + @Override public void close() {} + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/io/delta/DeltaReadWriteTest.java b/src/test/java/org/apache/sysds/test/functions/io/delta/DeltaReadWriteTest.java new file mode 100644 index 00000000000..a4013c3672d --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/io/delta/DeltaReadWriteTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.functions.io.delta; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.HashMap; + +import org.apache.sysds.runtime.controlprogram.caching.CacheStatistics; +import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; + +/** + * End-to-end DML test of the native Delta read/write path. + * + *

The write and the read are run as two separate SystemDS executions + * on purpose. If they shared a single script/process, SystemDS would reuse the + * still-materialized in-memory matrix for the subsequent read and never invoke + * {@link org.apache.sysds.runtime.io.ReaderDelta} at all (verified: the cache + * reports 0 HDFS hits in that case). Splitting the executions forces a genuine + * read from disk, and we additionally assert via {@link CacheStatistics} that + * the read run actually performed HDFS reads (the Delta table + the text + * reference) rather than serving the matrix from cache.

+ */ +public class DeltaReadWriteTest extends AutomatedTestBase { + + private final static String TEST_DIR = "functions/io/delta/"; + private final static String TEST_CLASS_DIR = TEST_DIR + DeltaReadWriteTest.class.getSimpleName() + "/"; + private final static String WRITE_NAME = "DeltaWrite"; + private final static String READ_NAME = "DeltaReadCompare"; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(WRITE_NAME, + new TestConfiguration(TEST_CLASS_DIR, WRITE_NAME, new String[] { "ref" })); + addTestConfiguration(READ_NAME, + new TestConfiguration(TEST_CLASS_DIR, READ_NAME, new String[] { "R" })); + } + + @Test + public void testDenseRoundTrip() { + runDeltaRoundTrip(200, 12, 1.0); + } + + @Test + public void testSparseRoundTrip() { + runDeltaRoundTrip(640, 8, 0.2); + } + + @Test + public void testMultiBatchRoundTrip() { + runDeltaRoundTrip(9000, 4, 1.0); + } + + private void runDeltaRoundTrip(int rows, int cols, double sparsity) { + try { + String HOME = SCRIPT_DIR + TEST_DIR; + + // ---- phase 1: write the matrix as a Delta table + text reference ---- + getAndLoadTestConfiguration(WRITE_NAME); + String deltaPath = output("deltaTable"); + String refPath = output("ref"); + fullDMLScriptName = HOME + WRITE_NAME + ".dml"; + programArgs = new String[] { "-stats", "-args", + String.valueOf(rows), String.valueOf(cols), String.valueOf(sparsity), + deltaPath, refPath }; + runTest(true, false, null, -1); + + // the write run must have materialized two matrices to disk (the Delta + // table under test + the text reference); WriterDelta genuinely hitting + // HDFS is what produces these write-side cache statistics. + long hdfsWrites = CacheStatistics.getHDFSWrites(); + assertTrue("expected >= 2 HDFS writes in the write run (delta + reference), got " + + hdfsWrites, hdfsWrites >= 2); + // and a real Delta table (transaction log) must have been created + assertTrue("missing Delta transaction log under " + deltaPath, + new File(deltaPath, "_delta_log").isDirectory()); + + // ---- phase 2: fresh execution reads the Delta table and compares ---- + getAndLoadTestConfiguration(READ_NAME); + fullDMLScriptName = HOME + READ_NAME + ".dml"; + programArgs = new String[] { "-stats", "-args", + deltaPath, refPath, output("R") }; + runTest(true, false, null, -1); + + // the read run must have materialized two matrices from disk (the Delta + // table under test + the text reference); a cached/short-circuited read + // would report fewer HDFS hits and fail here. + long hdfsReads = CacheStatistics.getHDFSHits(); + assertTrue("expected >= 2 HDFS reads in the read run (delta + reference), got " + + hdfsReads, hdfsReads >= 2); + + HashMap R = readDMLMatrixFromOutputDir("R"); + //text-cell output omits exact zeros, so a missing cell means 0.0 + double diff = R.getOrDefault(new CellIndex(1, 1), 0.0); + double nrow = R.getOrDefault(new CellIndex(1, 2), 0.0); + double ncol = R.getOrDefault(new CellIndex(1, 3), 0.0); + + assertEquals("reconstruction error", 0.0, diff, 1e-12); + assertEquals("discovered rows", rows, (int) nrow); + assertEquals("discovered cols", cols, (int) ncol); + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/io/delta/FrameDeltaReadWriteTest.java b/src/test/java/org/apache/sysds/test/functions/io/delta/FrameDeltaReadWriteTest.java new file mode 100644 index 00000000000..0a6bba5a163 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/io/delta/FrameDeltaReadWriteTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.functions.io.delta; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.HashMap; + +import org.apache.sysds.runtime.controlprogram.caching.CacheStatistics; +import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; + +/** + * End-to-end DML test of the native Delta frame read/write path. + * + *

As in the matrix variant, the write and the read run as two separate + * SystemDS executions so the read is a genuine disk read rather than an + * in-memory cache hit. We additionally assert via {@link CacheStatistics} that + * the write run wrote (delta + text reference) and the read run read (delta + + * text reference) from HDFS, so a short-circuited path would fail the test.

+ */ +public class FrameDeltaReadWriteTest extends AutomatedTestBase { + + private final static String TEST_DIR = "functions/io/delta/"; + private final static String TEST_CLASS_DIR = TEST_DIR + FrameDeltaReadWriteTest.class.getSimpleName() + "/"; + private final static String WRITE_NAME = "FrameDeltaWrite"; + private final static String READ_NAME = "FrameDeltaReadCompare"; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(WRITE_NAME, + new TestConfiguration(TEST_CLASS_DIR, WRITE_NAME, new String[] { "ref" })); + addTestConfiguration(READ_NAME, + new TestConfiguration(TEST_CLASS_DIR, READ_NAME, new String[] { "R" })); + } + + @Test + public void testDenseRoundTrip() { + runFrameDeltaRoundTrip(200, 12, 1.0); + } + + @Test + public void testSparseRoundTrip() { + runFrameDeltaRoundTrip(640, 8, 0.2); + } + + @Test + public void testMultiBatchRoundTrip() { + runFrameDeltaRoundTrip(9000, 4, 1.0); + } + + private void runFrameDeltaRoundTrip(int rows, int cols, double sparsity) { + try { + String HOME = SCRIPT_DIR + TEST_DIR; + + // ---- phase 1: write the frame as a Delta table + text reference ---- + getAndLoadTestConfiguration(WRITE_NAME); + String deltaPath = output("deltaTable"); + String refPath = output("ref"); + fullDMLScriptName = HOME + WRITE_NAME + ".dml"; + programArgs = new String[] { "-stats", "-args", + String.valueOf(rows), String.valueOf(cols), String.valueOf(sparsity), + deltaPath, refPath }; + runTest(true, false, null, -1); + + //the write run must materialize two objects to disk: the frame Delta + //table under test + the matrix text reference. FrameWriterDelta genuinely + //hitting HDFS is what produces the frame-side write statistic. + long hdfsWrites = CacheStatistics.getHDFSWrites(); + assertTrue("expected >= 2 HDFS writes in the write run (delta frame + reference), got " + + hdfsWrites, hdfsWrites >= 2); + //and a real Delta table (transaction log) must have been created + assertTrue("missing Delta transaction log under " + deltaPath, + new File(deltaPath, "_delta_log").isDirectory()); + + // ---- phase 2: fresh execution reads the Delta frame and compares ---- + getAndLoadTestConfiguration(READ_NAME); + fullDMLScriptName = HOME + READ_NAME + ".dml"; + programArgs = new String[] { "-stats", "-args", + deltaPath, refPath, output("R") }; + runTest(true, false, null, -1); + + long hdfsReads = CacheStatistics.getHDFSHits(); + assertTrue("expected >= 2 HDFS reads in the read run (delta + reference), got " + + hdfsReads, hdfsReads >= 2); + + HashMap R = readDMLMatrixFromOutputDir("R"); + double diff = R.getOrDefault(new CellIndex(1, 1), 0.0); + double nrow = R.getOrDefault(new CellIndex(1, 2), 0.0); + double ncol = R.getOrDefault(new CellIndex(1, 3), 0.0); + + assertEquals("reconstruction error", 0.0, diff, 1e-12); + assertEquals("discovered rows", rows, (int) nrow); + assertEquals("discovered cols", cols, (int) ncol); + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + } +} diff --git a/src/test/scripts/functions/io/delta/DeltaReadCompare.dml b/src/test/scripts/functions/io/delta/DeltaReadCompare.dml new file mode 100644 index 00000000000..5caf992f39c --- /dev/null +++ b/src/test/scripts/functions/io/delta/DeltaReadCompare.dml @@ -0,0 +1,34 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Reader side of the native Delta round-trip test. Reads the Delta table +# (dimensions discovered from the transaction log) and the text reference, +# both genuine HDFS reads in a fresh process, and reports the elementwise +# reconstruction error together with the discovered dimensions. + +Y = read($1, format="delta") +Xref = read($2, format="text") + +R = matrix(0, rows=1, cols=3) +R[1,1] = sum(abs(Xref - Y)) # 0 if ReaderDelta reconstructed the matrix exactly +R[1,2] = nrow(Y) # discovered row count +R[1,3] = ncol(Y) # discovered column count +write(R, $3) diff --git a/src/test/scripts/functions/io/delta/DeltaWrite.dml b/src/test/scripts/functions/io/delta/DeltaWrite.dml new file mode 100644 index 00000000000..41c5b0899a7 --- /dev/null +++ b/src/test/scripts/functions/io/delta/DeltaWrite.dml @@ -0,0 +1,30 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Writer side of the native Delta round-trip test. Generates a matrix and +# materializes it twice: once as a Delta table (under test) and once as a +# plain text reference. Running the read/compare in a SEPARATE process is +# intentional: it prevents SystemDS from short-circuiting the subsequent +# read against the still-in-memory matrix, so ReaderDelta is actually used. + +X = rand(rows=$1, cols=$2, min=-5, max=5, seed=7, sparsity=$3) +write(X, $4, format="delta") +write(X, $5, format="text") diff --git a/src/test/scripts/functions/io/delta/FrameDeltaReadCompare.dml b/src/test/scripts/functions/io/delta/FrameDeltaReadCompare.dml new file mode 100644 index 00000000000..cdf1f0794fc --- /dev/null +++ b/src/test/scripts/functions/io/delta/FrameDeltaReadCompare.dml @@ -0,0 +1,35 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Reader side of the native Delta frame round-trip test. Reads the Delta table +# as a frame (schema + dimensions discovered from the transaction log) and the +# text matrix reference, both genuine HDFS reads in a fresh process, then +# reports the elementwise reconstruction error and the discovered dimensions. + +Y = read($1, data_type="frame", format="delta") +Xref = read($2, format="text") + +M = as.matrix(Y) +R = matrix(0, rows=1, cols=3) +R[1,1] = sum(abs(Xref - M)) # 0 if FrameReaderDelta reconstructed the frame exactly +R[1,2] = nrow(Y) # discovered row count +R[1,3] = ncol(Y) # discovered column count +write(R, $3) diff --git a/src/test/scripts/functions/io/delta/FrameDeltaWrite.dml b/src/test/scripts/functions/io/delta/FrameDeltaWrite.dml new file mode 100644 index 00000000000..5e152dde013 --- /dev/null +++ b/src/test/scripts/functions/io/delta/FrameDeltaWrite.dml @@ -0,0 +1,32 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Writer side of the native Delta frame round-trip test. Generates a matrix, +# converts it to a frame, and materializes it as a Delta table (under test). +# The same matrix is also written as a plain text reference. Running the +# read/compare in a SEPARATE process prevents SystemDS from short-circuiting +# the subsequent read against the in-memory frame, so FrameReaderDelta is +# actually exercised. + +X = rand(rows=$1, cols=$2, min=-5, max=5, seed=7, sparsity=$3) +F = as.frame(X) +write(F, $4, format="delta") +write(X, $5, format="text")