From f4b4ec80651c205a5dfc510dc3a239ab15a67840 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Wed, 30 Jul 2025 21:12:20 +0530 Subject: [PATCH 1/4] choose synchronous + use existing processWriteInstruction --- .../cp/VariableCPInstruction.java | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java index 8400ec54e6f..43ecf7ded81 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java @@ -41,11 +41,13 @@ import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType; import org.apache.sysds.runtime.controlprogram.caching.TensorObject; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysds.runtime.data.TensorBlock; import org.apache.sysds.runtime.frame.data.FrameBlock; import org.apache.sysds.runtime.instructions.Instruction; import org.apache.sysds.runtime.instructions.InstructionUtils; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.io.FileFormatProperties; import org.apache.sysds.runtime.io.FileFormatPropertiesCSV; import org.apache.sysds.runtime.io.FileFormatPropertiesHDF5; @@ -1060,19 +1062,27 @@ private void processWriteInstruction(ExecutionContext ec) { HDFSTool.writeScalarToHDFS(ec.getScalarInput(getInput1()), fname); } else if( getInput1().getDataType() == DataType.MATRIX ) { - if( fmt == FileFormat.MM ) - writeMMFile(ec, fname); - else if( fmt == FileFormat.CSV ) - writeCSVFile(ec, fname); - else if(fmt == FileFormat.LIBSVM) - writeLIBSVMFile(ec, fname); - else if(fmt == FileFormat.HDF5) - writeHDF5File(ec, fname); + + MatrixObject mo = ec.getMatrixObject(getInput1().getName()); + LocalTaskQueue stream = mo.getStreamHandle(); + + if (stream != null) { + System.out.println("Write OOC instruction: " + getInput1().getName() ); + } else { - // Default behavior (text, binary) - MatrixObject mo = ec.getMatrixObject(getInput1().getName()); - int blen = Integer.parseInt(getInput4().getName()); - mo.exportData(fname, fmtStr, new FileFormatProperties(blen)); + if( fmt == FileFormat.MM ) + writeMMFile(ec, fname); + else if( fmt == FileFormat.CSV ) + writeCSVFile(ec, fname); + else if(fmt == FileFormat.LIBSVM) + writeLIBSVMFile(ec, fname); + else if(fmt == FileFormat.HDF5) + writeHDF5File(ec, fname); + else { + // Default behavior (text, binary) + int blen = Integer.parseInt(getInput4().getName()); + mo.exportData(fname, fmtStr, new FileFormatProperties(blen)); + } } } else if( getInput1().getDataType() == DataType.FRAME ) { From b5ad48c0df61b8d1491bd5a01f72fbccbd5b96b6 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Wed, 30 Jul 2025 22:00:26 +0530 Subject: [PATCH 2/4] add writer --- .../cp/VariableCPInstruction.java | 39 ++++++++++++++----- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java index 43ecf7ded81..3083d281af2 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java @@ -26,7 +26,9 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.fs.Options; import org.apache.sysds.api.DMLScript; +import org.apache.sysds.api.mlcontext.MatrixMetadata; import org.apache.sysds.common.Opcodes; import org.apache.sysds.common.Types.DataType; import org.apache.sysds.common.Types.FileFormat; @@ -48,15 +50,7 @@ import org.apache.sysds.runtime.instructions.Instruction; import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; -import org.apache.sysds.runtime.io.FileFormatProperties; -import org.apache.sysds.runtime.io.FileFormatPropertiesCSV; -import org.apache.sysds.runtime.io.FileFormatPropertiesHDF5; -import org.apache.sysds.runtime.io.FileFormatPropertiesLIBSVM; -import org.apache.sysds.runtime.io.ListReader; -import org.apache.sysds.runtime.io.ListWriter; -import org.apache.sysds.runtime.io.WriterHDF5; -import org.apache.sysds.runtime.io.WriterMatrixMarket; -import org.apache.sysds.runtime.io.WriterTextCSV; +import org.apache.sysds.runtime.io.*; import org.apache.sysds.runtime.lineage.LineageItem; import org.apache.sysds.runtime.lineage.LineageItemUtils; import org.apache.sysds.runtime.lineage.LineageTraceable; @@ -1067,7 +1061,32 @@ else if( getInput1().getDataType() == DataType.MATRIX ) { LocalTaskQueue stream = mo.getStreamHandle(); if (stream != null) { - System.out.println("Write OOC instruction: " + getInput1().getName() ); + System.out.println("Write OOC instruction: " + getInput1().getName() + "to file name: " + fname); + + try { + IndexedMatrixValue tmp = null; + MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(fmt); + System.out.println("mo details: "+ mo.getDataCharacteristics()); + + + while((tmp = stream.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) { + MatrixBlock mb = (MatrixBlock)tmp.getValue(); + + +// writer.writeMatrixToHDFS(tmp.getValue(), fname, FileFormat.HDF5, mo.getMetaData().getDataCharacteristics()); + writer.writeMatrixToHDFS(mb, fname, mb.getNumRows(), mb.getNumColumns(), mo.getBlocksize(),mb.getNonZeros()); +// writer.writeMatrixToHDFS((MatrixBlock)tmp.getValue(), fname, mc.getRows(), mc.getCols(), +// mc.getBlocksize(), mc.getNonZeros()); + System.out.println(tmp); + + } +// IOUtilFunctions.closeSilently(writer); + HDFSTool.writeMetaDataFile(fname + ".mtd", mo.getValueType(), + mo.getMetaData().getDataCharacteristics(), FileFormat.HDF5, _formatProperties); + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } } else { if( fmt == FileFormat.MM ) From 575351a2539c50f6b612eb82bef5fb37e6d50353 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Thu, 31 Jul 2025 08:32:12 +0530 Subject: [PATCH 3/4] compare write operation with each single element --- .../org/apache/sysds/test/functions/ooc/UnaryTest.java | 10 ++++++---- src/test/scripts/functions/ooc/Unary.dml | 4 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java index a81689af375..c30214e1ca2 100644 --- a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java +++ b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java @@ -87,16 +87,18 @@ public void testUnaryOperation(boolean rewrite) runTest(true, false, null, -1); HashMap dmlfile = readDMLMatrixFromOutputDir(OUTPUT_NAME); - Double result = dmlfile.get(new MatrixValue.CellIndex(1, 1)); + double expected = 0.0; for(int i = 0; i < rows; i++) { for(int j = 0; j < cols; j++) { - expected += Math.ceil(mb.get(i, j)); + Double dmlResult = dmlfile.get(new MatrixValue.CellIndex(i+1 , j+1 )); // Note: MM format is 1-based index + double actualValue = (dmlResult == null) ? 0.0 : dmlResult; + expected = Math.abs(Math.ceil(mb.get(i, j))); + Assert.assertEquals(expected, actualValue, 1e-10); + System.out.println("("+i+","+j+"): " + actualValue + "actual: " + expected); } } - Assert.assertEquals(expected, result, 1e-10); - String prefix = Instruction.OOC_INST_PREFIX; Assert.assertTrue("OOC wasn't used for RBLK", heavyHittersContainsString(prefix + Opcodes.RBLK)); diff --git a/src/test/scripts/functions/ooc/Unary.dml b/src/test/scripts/functions/ooc/Unary.dml index 6d34e8fd763..7f20e4133e4 100644 --- a/src/test/scripts/functions/ooc/Unary.dml +++ b/src/test/scripts/functions/ooc/Unary.dml @@ -24,6 +24,6 @@ X = read($1); #print(toString(X)) Y = ceil(X); #print(toString(Y)) -res = as.matrix(sum(Y)); +#res = as.matrix(sum(Y)); # Write the final matrix result -write(res, $2); +write(Y, $2); From f7b230c4c2df1bfd3439178021f3bef83a2c2540 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Thu, 31 Jul 2025 09:27:56 +0530 Subject: [PATCH 4/4] use file partition to work with multiple blocks in parts --- .../cp/VariableCPInstruction.java | 33 ++++++++++--------- .../sysds/test/functions/ooc/UnaryTest.java | 8 +++-- src/test/scripts/functions/ooc/Unary.dml | 6 ++-- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java index 3083d281af2..9d08d32521d 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java @@ -26,9 +26,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.fs.Options; import org.apache.sysds.api.DMLScript; -import org.apache.sysds.api.mlcontext.MatrixMetadata; import org.apache.sysds.common.Opcodes; import org.apache.sysds.common.Types.DataType; import org.apache.sysds.common.Types.FileFormat; @@ -50,11 +48,22 @@ import org.apache.sysds.runtime.instructions.Instruction; import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; -import org.apache.sysds.runtime.io.*; +import org.apache.sysds.runtime.io.FileFormatProperties; +import org.apache.sysds.runtime.io.FileFormatPropertiesCSV; +import org.apache.sysds.runtime.io.FileFormatPropertiesHDF5; +import org.apache.sysds.runtime.io.FileFormatPropertiesLIBSVM; +import org.apache.sysds.runtime.io.ListReader; +import org.apache.sysds.runtime.io.ListWriter; +import org.apache.sysds.runtime.io.MatrixWriter; +import org.apache.sysds.runtime.io.MatrixWriterFactory; +import org.apache.sysds.runtime.io.WriterHDF5; +import org.apache.sysds.runtime.io.WriterMatrixMarket; +import org.apache.sysds.runtime.io.WriterTextCSV; import org.apache.sysds.runtime.lineage.LineageItem; import org.apache.sysds.runtime.lineage.LineageItemUtils; import org.apache.sysds.runtime.lineage.LineageTraceable; import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.meta.DataCharacteristics; import org.apache.sysds.runtime.meta.MatrixCharacteristics; import org.apache.sysds.runtime.meta.MetaData; @@ -1061,31 +1070,25 @@ else if( getInput1().getDataType() == DataType.MATRIX ) { LocalTaskQueue stream = mo.getStreamHandle(); if (stream != null) { - System.out.println("Write OOC instruction: " + getInput1().getName() + "to file name: " + fname); try { IndexedMatrixValue tmp = null; MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(fmt); - System.out.println("mo details: "+ mo.getDataCharacteristics()); - while((tmp = stream.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) { MatrixBlock mb = (MatrixBlock)tmp.getValue(); + MatrixIndexes mi = tmp.getIndexes(); + // Construct a unique filename for each part-file inside the output directory + String partFilePath = fname + "/part-" + mi.getRowIndex() + "-" + mi.getColumnIndex(); -// writer.writeMatrixToHDFS(tmp.getValue(), fname, FileFormat.HDF5, mo.getMetaData().getDataCharacteristics()); - writer.writeMatrixToHDFS(mb, fname, mb.getNumRows(), mb.getNumColumns(), mo.getBlocksize(),mb.getNonZeros()); -// writer.writeMatrixToHDFS((MatrixBlock)tmp.getValue(), fname, mc.getRows(), mc.getCols(), -// mc.getBlocksize(), mc.getNonZeros()); - System.out.println(tmp); - + writer.writeMatrixToHDFS(mb, partFilePath, mb.getNumRows(), mb.getNumColumns(), (int) mb.getLength() , mb.getNonZeros()); } -// IOUtilFunctions.closeSilently(writer); - HDFSTool.writeMetaDataFile(fname + ".mtd", mo.getValueType(), + HDFSTool.writeMetaDataFile(fname + "/.mtd", mo.getValueType(), mo.getMetaData().getDataCharacteristics(), FileFormat.HDF5, _formatProperties); } catch(Exception ex) { - throw new DMLRuntimeException(ex); + throw new DMLRuntimeException("Failed to write OOC stream to " + fname, ex); } } else { diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java index c30214e1ca2..85f9166dc52 100644 --- a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java +++ b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java @@ -56,7 +56,7 @@ public void setUp() { } /** - * Test the sum of scalar multiplication, "sum(X*7)", with OOC backend. + * Test the unary operation, "ceil(X)", with OOC backend. */ @Test public void testUnary() { @@ -77,7 +77,7 @@ public void testUnaryOperation(boolean rewrite) programArgs = new String[] {"-explain", "-stats", "-ooc", "-args", input(INPUT_NAME), output(OUTPUT_NAME)}; - int rows = 1000, cols = 1000; + int rows = 5000, cols = 5000; MatrixBlock mb = MatrixBlock.randOperations(rows, cols, 1.0, -1, 1, "uniform", 7); MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(FileFormat.BINARY); writer.writeMatrixToHDFS(mb, input(INPUT_NAME), rows, cols, 1000, rows*cols); @@ -94,8 +94,8 @@ public void testUnaryOperation(boolean rewrite) Double dmlResult = dmlfile.get(new MatrixValue.CellIndex(i+1 , j+1 )); // Note: MM format is 1-based index double actualValue = (dmlResult == null) ? 0.0 : dmlResult; expected = Math.abs(Math.ceil(mb.get(i, j))); - Assert.assertEquals(expected, actualValue, 1e-10); System.out.println("("+i+","+j+"): " + actualValue + "actual: " + expected); + Assert.assertEquals(expected, actualValue, 1e-10); } } @@ -104,6 +104,8 @@ public void testUnaryOperation(boolean rewrite) heavyHittersContainsString(prefix + Opcodes.RBLK)); Assert.assertTrue("OOC wasn't used for CEIL", heavyHittersContainsString(prefix + Opcodes.CEIL)); + Assert.assertTrue("Stream Aware WRITE wasn't used", + heavyHittersContainsString(String.valueOf(Opcodes.WRITE))); } catch(Exception ex) { Assert.fail(ex.getMessage()); diff --git a/src/test/scripts/functions/ooc/Unary.dml b/src/test/scripts/functions/ooc/Unary.dml index 7f20e4133e4..404fc9b0100 100644 --- a/src/test/scripts/functions/ooc/Unary.dml +++ b/src/test/scripts/functions/ooc/Unary.dml @@ -21,9 +21,7 @@ # Read input matrix and operator from command line args X = read($1); -#print(toString(X)) Y = ceil(X); -#print(toString(Y)) -#res = as.matrix(sum(Y)); + # Write the final matrix result -write(Y, $2); +write(Y, $2, format="binary");