Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/javaTests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ jobs:
"org.apache.sysds.test.applications.**",
"**.test.usertest.**",
"**.component.c**.** -Dtest-threadCount=1 -Dtest-forkCount=1",
"**.component.e**.**,**.component.f**.**,**.component.m**.**,**.component.o**.**",
"**.component.e**.**,**.component.f**.**,**.component.i**.**,**.component.m**.**,**.component.o**.**",
"**.component.p**.**,**.component.r**.**,**.component.s**.**,**.component.t**.**,**.component.u**.**",
"**.functions.a**.**,**.functions.binary.matrix.**,**.functions.binary.scalar.**,**.functions.binary.tensor.**",
"**.functions.blocks.**,**.functions.data.rand.**,",
Expand Down
33 changes: 33 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
<jackson.version>2.15.4</jackson.version>
<scala.version>2.12.18</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<delta-kernel.version>3.3.2</delta-kernel.version>
<parquet.version>1.13.1</parquet.version>
<maven.build.timestamp.format>yyyy-MM-dd HH:mm:ss z</maven.build.timestamp.format>
<project.build.outputTimestamp>1</project.build.outputTimestamp>
<enableGPU>false</enableGPU>
Expand Down Expand Up @@ -968,6 +970,37 @@
</profiles>

<dependencies>
<!-- Delta Lake native (Spark-free) read/write via Delta Kernel -->
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-kernel-api</artifactId>
<version>${delta-kernel.version}</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-kernel-defaults</artifactId>
<version>${delta-kernel.version}</version>
</dependency>
<!-- Delta Kernel needs parquet >= 1.13 (the version Spark 3.5.x ships);
declared directly via ${parquet.version} so Maven's nearest-wins
mediation does not downgrade it to an older transitive version. The
jackson trio Delta Kernel relies on is pinned globally via
${jackson.version}. -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>org.jcuda</groupId>
<artifactId>jcuda</artifactId>
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/apache/sysds/common/Types.java
Original file line number Diff line number Diff line change
Expand Up @@ -878,14 +878,15 @@ public enum FileFormat {
HDF5, // Hierarchical Data Format (HDF)
COG, // Cloud-optimized GeoTIFF
PARQUET, // parquet format for columnar data storage
DELTA, // Delta Lake table (transaction log + parquet), read/written via Delta Kernel
UNKNOWN;

public boolean isIJV() {
return this == TEXT || this == MM;
}

public boolean isTextFormat() {
return this != BINARY && this != COMPRESSED;
return this != BINARY && this != COMPRESSED && this != DELTA;
}

public static boolean isTextFormat(String fmt) {
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/org/apache/sysds/conf/ConfigurationManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,16 @@ public static int getFederatedTimeout(){
return getDMLConfig().getIntValue(DMLConfig.FEDERATED_TIMEOUT);
}

/** @return rows per parquet read batch for the native Delta reader */
public static int getDeltaReaderBatchSize() {
return getDMLConfig().getIntValue(DMLConfig.DELTA_READER_BATCH_SIZE);
}

/** @return target data-file size (bytes) for the native Delta writer */
public static long getDeltaWriterTargetFileSize() {
return Long.parseLong(getDMLConfig().getTextValue(DMLConfig.DELTA_WRITER_TARGET_FILE_SIZE));
}

public static boolean isFederatedSSL(){
return getDMLConfig().getBooleanValue(DMLConfig.USE_SSL_FEDERATED_COMMUNICATION);
}
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/org/apache/sysds/conf/DMLConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ public class DMLConfig
public static final String CP_PARALLEL_OPS = "sysds.cp.parallel.ops";
public static final String CP_PARALLEL_IO = "sysds.cp.parallel.io";
public static final String IO_COMPRESSION_CODEC = "sysds.io.compression.encoding";
public static final String DELTA_READER_BATCH_SIZE = "sysds.io.delta.reader.batchsize"; // int: rows per parquet read batch
public static final String DELTA_WRITER_TARGET_FILE_SIZE = "sysds.io.delta.writer.targetfilesize"; // long: target data-file size in bytes (smaller -> more files -> more parallel-read throughput)
/** Default rows per parquet read batch (Delta Kernel default is 1024). A larger batch slightly
* reduces per-batch object/transform overhead; the dominant per-value decode cost is unaffected. */
public static final int DELTA_READER_BATCH_SIZE_DEFAULT = 4096;
/** Default target data-file size for the Delta writer in bytes (Delta Kernel default is 128MB). The
* native Delta read decode is CPU-bound and parallelizes across data files, so writing smaller files
* lets a parallel reader use more threads. 64MB roughly doubles the file count (and read parallelism)
* versus the default with only a modest increase in per-file metadata overhead. */
public static final long DELTA_WRITER_TARGET_FILE_SIZE_DEFAULT = 64L * 1024 * 1024;
public static final String PARALLEL_ENCODE = "sysds.parallel.encode"; // boolean: enable multi-threaded transformencode and apply
public static final String PARALLEL_ENCODE_STAGED = "sysds.parallel.encode.staged";
public static final String PARALLEL_ENCODE_APPLY_BLOCKS = "sysds.parallel.encode.applyBlocks";
Expand Down Expand Up @@ -158,6 +168,8 @@ public class DMLConfig
_defaultVals.put(CP_PARALLEL_OPS, "true" );
_defaultVals.put(CP_PARALLEL_IO, "true" );
_defaultVals.put(IO_COMPRESSION_CODEC, "none");
_defaultVals.put(DELTA_READER_BATCH_SIZE, String.valueOf(DELTA_READER_BATCH_SIZE_DEFAULT));
_defaultVals.put(DELTA_WRITER_TARGET_FILE_SIZE, String.valueOf(DELTA_WRITER_TARGET_FILE_SIZE_DEFAULT));
_defaultVals.put(PARALLEL_TOKENIZE, "false");
_defaultVals.put(PARALLEL_TOKENIZE_NUM_BLOCKS, "64");
_defaultVals.put(FRAME_TO_MATRIX_WARN_CAST, "false");
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/apache/sysds/parser/DMLTranslator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1057,7 +1057,8 @@ public void constructHops(StatementBlock sb) {
case CSV:
case LIBSVM:
case HDF5:
// write output in textcell format
case DELTA:
// columnar/text formats: no block layout (blocksize -1)
ae.setOutputParams(ae.getDim1(), ae.getDim2(), ae.getNnz(), ae.getUpdateType(), -1);
break;
case BINARY:
Expand Down
13 changes: 9 additions & 4 deletions src/main/java/org/apache/sysds/parser/DataExpression.java
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,10 @@ else if( getVarParam(READNNZPARAM) != null ) {

boolean isCOG = (formatTypeString != null && formatTypeString.equalsIgnoreCase(FileFormat.COG.toString()));

// Delta tables are self-describing (schema + dimensions discovered from the
// transaction log at read time), so dimensions are optional like CSV.
boolean isDelta = (formatTypeString != null && formatTypeString.equalsIgnoreCase(FileFormat.DELTA.toString()));

dataTypeString = (getVarParam(DATATYPEPARAM) == null) ? null : getVarParam(DATATYPEPARAM).toString();

if ( dataTypeString == null || dataTypeString.equalsIgnoreCase(Statement.MATRIX_DATA_TYPE)
Expand All @@ -1202,8 +1206,8 @@ else if( getVarParam(READNNZPARAM) != null ) {
// initialize size of target data identifier to UNKNOWN
getOutput().setDimensions(-1, -1);

if (!isCSV && !isLIBSVM && !isHDF5 && !isCOG && ConfigurationManager.getCompilerConfig()
.getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) //skip check for csv/libsvm format / jmlc api
if (!isCSV && !isLIBSVM && !isHDF5 && !isCOG && !isDelta && ConfigurationManager.getCompilerConfig()
.getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) //skip check for csv/libsvm/delta format / jmlc api
&& (getVarParam(READROWPARAM) == null || getVarParam(READCOLPARAM) == null) ) {
raiseValidateError("Missing or incomplete dimension information in read statement: "
+ mtdFileName, conditional, LanguageErrorCodes.INVALID_PARAMETERS);
Expand All @@ -1215,7 +1219,7 @@ && getVarParam(READCOLPARAM) instanceof ConstIdentifier)
// these are strings that are long values
Long dim1 = (getVarParam(READROWPARAM) == null) ? null : Long.valueOf( getVarParam(READROWPARAM).toString());
Long dim2 = (getVarParam(READCOLPARAM) == null) ? null : Long.valueOf( getVarParam(READCOLPARAM).toString());
if ( !isCSV && (dim1 < 0 || dim2 < 0) && ConfigurationManager
if ( !isCSV && !isDelta && (dim1 < 0 || dim2 < 0) && ConfigurationManager
.getCompilerConfig().getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) ) {
raiseValidateError("Invalid dimension information in read statement", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
}
Expand Down Expand Up @@ -1333,7 +1337,8 @@ else if (valueTypeString.equalsIgnoreCase(ValueType.UNKNOWN.name()))
}

//validate read filename
if (getVarParam(FORMAT_TYPE) == null || FileFormat.isTextFormat(getVarParam(FORMAT_TYPE).toString()))
if (getVarParam(FORMAT_TYPE) == null || FileFormat.isTextFormat(getVarParam(FORMAT_TYPE).toString())
|| checkFormatType(FileFormat.DELTA)) //delta: columnar, no block layout
getOutput().setBlocksize(-1);
else if (checkFormatType(FileFormat.BINARY, FileFormat.COMPRESSED, FileFormat.UNKNOWN)) {
if( getVarParam(ROWBLOCKCOUNTPARAM)!=null )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.ValueType;
Expand Down Expand Up @@ -203,9 +204,14 @@ protected FrameBlock readBlobFromHDFS(String fname, long[] dims) throws IOExcept
.createFrameReader(iimd.getFileFormat(), getFileFormatProperties())
.readFrameFromHDFS(fname, lschema, dc.getRows(), dc.getCols());

if(iimd.getFileFormat() == FileFormat.CSV)
//Delta and CSV discover dimensions (and Delta also schema) at read time, so
//refresh the cached metadata to reflect the materialized frame block.
if(iimd.getFileFormat() == FileFormat.CSV || iimd.getFileFormat() == FileFormat.DELTA) {
_metaData = _metaData instanceof MetaDataFormat ? new MetaDataFormat(data.getDataCharacteristics(),
iimd.getFileFormat()) : new MetaData(data.getDataCharacteristics());
if(iimd.getFileFormat() == FileFormat.DELTA)
_schema = data.getSchema();
}

// sanity check correct output
if(data == null)
Expand Down Expand Up @@ -293,6 +299,9 @@ protected void writeBlobToHDFS(String fname, String ofmt, int rep, FileFormatPro

FrameWriter writer = FrameWriterFactory.createFrameWriter(fmt, fprop);
writer.writeFrameToHDFS(_data, fname, getNumRows(), getNumColumns());

if(DMLScript.STATISTICS)
CacheStatistics.incrementHDFSWrites();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,8 @@ protected MatrixBlock readBlobFromHDFS(String fname, long[] dims) throws IOExcep
DataConverter.readMatrixFromHDFS(fname, iimd.getFileFormat(),
rlen, clen, blen, mc.getNonZeros(), getFileFormatProperties());

if(iimd.getFileFormat() == FileFormat.CSV) {
if(iimd.getFileFormat() == FileFormat.CSV || iimd.getFileFormat() == FileFormat.DELTA) {
//dimensions/nnz are discovered at read time for these self-describing formats
_metaData = _metaData instanceof MetaDataFormat ? new MetaDataFormat(newData.getDataCharacteristics(),
iimd.getFileFormat()) : new MetaData(newData.getDataCharacteristics());
}
Expand Down
Loading
Loading