From 8832097ffac44406ab51517d9262a8849b0585d3 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sat, 2 Aug 2025 21:11:09 +0530 Subject: [PATCH 01/12] add abstract method writeMatrixFromStream --- .../runtime/compress/io/WriterCompressed.java | 7 +++++++ .../apache/sysds/runtime/io/MatrixWriter.java | 16 ++++++++++++++++ .../sysds/runtime/io/WriterBinaryBlock.java | 7 +++++++ .../org/apache/sysds/runtime/io/WriterHDF5.java | 7 +++++++ .../sysds/runtime/io/WriterMatrixMarket.java | 7 +++++++ .../apache/sysds/runtime/io/WriterTextCSV.java | 7 +++++++ .../apache/sysds/runtime/io/WriterTextCell.java | 7 +++++++ .../sysds/runtime/io/WriterTextLIBSVM.java | 7 +++++++ 8 files changed, 65 insertions(+) diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java index cf39ca6fba9..037bdbfc366 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java +++ b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java @@ -46,7 +46,9 @@ import org.apache.sysds.runtime.compress.lib.CLALibSeparator; import org.apache.sysds.runtime.compress.lib.CLALibSeparator.SeparatedGroups; import org.apache.sysds.runtime.compress.lib.CLALibSlice; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.instructions.spark.CompressionSPInstruction.CompressionFunction; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils; import org.apache.sysds.runtime.io.FileFormatProperties; import org.apache.sysds.runtime.io.IOUtilFunctions; @@ -407,4 +409,9 @@ public Object call() throws Exception { } + @Override + public void writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { + throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the HDF5 format."); + }; + } diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java b/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java index 0f335477bdc..48b6a96f558 100644 --- a/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java +++ b/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java @@ -23,6 +23,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.MatrixBlock; /** @@ -42,6 +44,20 @@ public void writeMatrixToHDFS( MatrixBlock src, String fname, long rlen, long cl public abstract void writeMatrixToHDFS( MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz, boolean diag ) throws IOException; + + /** + * Consumes an out-of-core stream of matrix blocks and writes them to a single file. + * This method must be implemented by writers that support OOC streaming output. + * + * @param fname The target output filename + * @param stream The OOC stream of matrix blocks to consume + * @param rlen The total number of rows in the matrix + * @param clen The total number of columns in the matrix + * @param blen The block size + * @throws IOException if an I/O error occurs + */ + public abstract void writeMatrixFromStream(String fname, LocalTaskQueue stream, + long rlen, long clen, int blen) throws IOException; public void setForcedParallel(boolean par) { _forcedParallel = par; diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java index e3dd3935c69..690d49e1ddf 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java @@ -30,6 +30,8 @@ import org.apache.sysds.hops.OptimizerUtils; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.compress.CompressedMatrixBlock; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.util.HDFSTool; @@ -228,4 +230,9 @@ protected final void writeDiagBinaryBlockMatrixToHDFS(Path path, JobConf job, M IOUtilFunctions.closeSilently(writer); } } + + @Override + public void writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { + throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the HDF5 format."); + }; } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java b/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java index 34b34333e6f..17f25c9d85a 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java @@ -24,8 +24,10 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.data.DenseBlock; import org.apache.sysds.runtime.data.SparseBlock; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.io.hdf5.H5; import org.apache.sysds.runtime.io.hdf5.H5RootObject; import org.apache.sysds.runtime.matrix.data.MatrixBlock; @@ -129,4 +131,9 @@ protected static void writeHDF5MatrixToFile(Path path, JobConf job, FileSystem f IOUtilFunctions.closeSilently(bos); } } + + @Override + public void writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { + throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the HDF5 format."); + }; } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarket.java b/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarket.java index d5eeabeb508..dc763167b5a 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarket.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarket.java @@ -35,7 +35,9 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.IJV; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.HDFSTool; @@ -220,4 +222,9 @@ public static void mergeTextcellToMatrixMarket( String srcFileName, String fileN throw new IOException(src.toString() + ": No such file or directory"); } } + + @Override + public void writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { + throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the HDF5 format."); + }; } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterTextCSV.java b/src/main/java/org/apache/sysds/runtime/io/WriterTextCSV.java index 4e3cf74239b..34f3759cd07 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterTextCSV.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterTextCSV.java @@ -35,8 +35,10 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.data.DenseBlock; import org.apache.sysds.runtime.data.SparseBlock; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.HDFSTool; @@ -341,4 +343,9 @@ public final void addHeaderToCSV(String srcFileName, String destFileName, long r throw new IOException(srcFilePath.toString() + ": No such file or directory"); } } + + @Override + public void writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { + throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the HDF5 format."); + }; } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterTextCell.java b/src/main/java/org/apache/sysds/runtime/io/WriterTextCell.java index f5dd64af8f2..da1ea2ebbb0 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterTextCell.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterTextCell.java @@ -30,7 +30,9 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.IJV; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.HDFSTool; @@ -137,4 +139,9 @@ protected static void writeTextCellMatrixToFile( Path path, JobConf job, FileSys br.write(IOUtilFunctions.EMPTY_TEXT_LINE); } } + + @Override + public void writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { + throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the HDF5 format."); + }; } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVM.java b/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVM.java index 125217a2a03..a410ceb9a6c 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVM.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVM.java @@ -28,8 +28,10 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.data.DenseBlock; import org.apache.sysds.runtime.data.SparseBlock; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.HDFSTool; @@ -156,4 +158,9 @@ protected static void appendIndexValLibsvm(StringBuilder sb, int index, double v sb.append(_props.getIndexDelim()); sb.append(value); } + + @Override + public void writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { + throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the HDF5 format."); + }; } From daea90cdae187be62d3c6212ced3a8b2f3fa2dc5 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sat, 2 Aug 2025 23:21:39 +0530 Subject: [PATCH 02/12] implement writeMatrixFromStream. while loop calls mb.write(dostream) for every block. This results in a corrupted output file that looks like this: [Header for Block 1] [Data for Block 1] [Header for Block 2] [Data for Block 2] [Header for Block 3] [Data for Block 3] --- .../cp/VariableCPInstruction.java | 34 +++++++++++++------ .../sysds/runtime/io/WriterBinaryBlock.java | 26 ++++++++++++-- src/test/scripts/functions/ooc/Unary.dml | 6 ++-- 3 files changed, 50 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java index 8400ec54e6f..09da39bd5ed 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java @@ -41,20 +41,14 @@ import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType; import org.apache.sysds.runtime.controlprogram.caching.TensorObject; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysds.runtime.data.TensorBlock; import org.apache.sysds.runtime.frame.data.FrameBlock; import org.apache.sysds.runtime.instructions.Instruction; import org.apache.sysds.runtime.instructions.InstructionUtils; -import org.apache.sysds.runtime.io.FileFormatProperties; -import org.apache.sysds.runtime.io.FileFormatPropertiesCSV; -import org.apache.sysds.runtime.io.FileFormatPropertiesHDF5; -import org.apache.sysds.runtime.io.FileFormatPropertiesLIBSVM; -import org.apache.sysds.runtime.io.ListReader; -import org.apache.sysds.runtime.io.ListWriter; -import org.apache.sysds.runtime.io.WriterHDF5; -import org.apache.sysds.runtime.io.WriterMatrixMarket; -import org.apache.sysds.runtime.io.WriterTextCSV; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; +import org.apache.sysds.runtime.io.*; import org.apache.sysds.runtime.lineage.LineageItem; import org.apache.sysds.runtime.lineage.LineageItemUtils; import org.apache.sysds.runtime.lineage.LineageTraceable; @@ -1060,6 +1054,24 @@ private void processWriteInstruction(ExecutionContext ec) { HDFSTool.writeScalarToHDFS(ec.getScalarInput(getInput1()), fname); } else if( getInput1().getDataType() == DataType.MATRIX ) { + MatrixObject mo = ec.getMatrixObject(getInput1().getName()); + int blen = Integer.parseInt(getInput4().getName()); + LocalTaskQueue stream = mo.getStreamHandle(); + + if (stream != null) { + + try { + MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(fmt); + + writer.writeMatrixFromStream(fname, stream, mo.getNumRows(), mo.getNumColumns(), blen); + HDFSTool.writeMetaDataFile(fname + ".mtd", mo.getValueType(), + mo.getMetaData().getDataCharacteristics(), fmt); + + } + catch(Exception ex) { + throw new DMLRuntimeException("Failed to write OOC stream to " + fname, ex); + } + } if( fmt == FileFormat.MM ) writeMMFile(ec, fname); else if( fmt == FileFormat.CSV ) @@ -1070,8 +1082,8 @@ else if(fmt == FileFormat.HDF5) writeHDF5File(ec, fname); else { // Default behavior (text, binary) - MatrixObject mo = ec.getMatrixObject(getInput1().getName()); - int blen = Integer.parseInt(getInput4().getName()); +// MatrixObject mo = ec.getMatrixObject(getInput1().getName()); +// int blen = Integer.parseInt(getInput4().getName()); mo.exportData(fname, fmtStr, new FileFormatProperties(blen)); } } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java index 690d49e1ddf..ec0c7f76e18 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java @@ -19,6 +19,7 @@ package org.apache.sysds.runtime.io; +import java.io.DataOutputStream; import java.io.IOException; import org.apache.hadoop.fs.FileSystem; @@ -36,6 +37,8 @@ import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.util.HDFSTool; +import static org.apache.sysds.runtime.util.HDFSTool.getHDFSDataOutputStream; + public class WriterBinaryBlock extends MatrixWriter { protected int _replication = -1; @@ -233,6 +236,25 @@ protected final void writeDiagBinaryBlockMatrixToHDFS(Path path, JobConf job, M @Override public void writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { - throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the HDF5 format."); - }; + DataOutputStream dostream = null; + try { + dostream = getHDFSDataOutputStream(fname, true); + dostream.writeLong(rlen); + dostream.writeLong(clen); + + IndexedMatrixValue i_val = null; + while((i_val = stream.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) { + MatrixBlock mb = (MatrixBlock) i_val.getValue(); + mb.write(dostream); + } + + } catch (IOException ex) { + throw new RuntimeException(ex); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + IOUtilFunctions.closeSilently(dostream); + } + + }; } diff --git a/src/test/scripts/functions/ooc/Unary.dml b/src/test/scripts/functions/ooc/Unary.dml index 6d34e8fd763..24c0d98fb4c 100644 --- a/src/test/scripts/functions/ooc/Unary.dml +++ b/src/test/scripts/functions/ooc/Unary.dml @@ -22,8 +22,8 @@ # Read input matrix and operator from command line args X = read($1); #print(toString(X)) -Y = ceil(X); +res = ceil(X); #print(toString(Y)) -res = as.matrix(sum(Y)); +#res = as.matrix(sum(Y)); # Write the final matrix result -write(res, $2); +write(res, $2, format="binary"); From 49c1e5a033733d547fdc544c2336b9b0e0c23767 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sun, 3 Aug 2025 12:34:04 +0530 Subject: [PATCH 03/12] implement 2 PASS method for write and merge with concat at present, Concat is not supported by ChecksumFileSystem --- .../sysds/runtime/io/WriterBinaryBlock.java | 58 ++++++++++++++++--- 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java index ec0c7f76e18..27a22e1aa02 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java @@ -235,25 +235,67 @@ protected final void writeDiagBinaryBlockMatrixToHDFS(Path path, JobConf job, M } @Override - public void writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { - DataOutputStream dostream = null; - try { - dostream = getHDFSDataOutputStream(fname, true); - dostream.writeLong(rlen); - dostream.writeLong(clen); + public void writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) throws IOException { + DataOutputStream dostream_data = null; + DataOutputStream dostream_header = null; + + String tempDataFname = fname + "._data"; + String tempHeaderFname = fname + "._header"; + Path dataPath = new Path(tempDataFname); + Path headerPath = new Path(tempHeaderFname); + Path finalPath = new Path(fname); + + FileSystem fs = null; + try { + // PASS 1: Stream to a temporary raw data file and count NNZ + fs = IOUtilFunctions.getFileSystem(dataPath); +// dostream = getHDFSDataOutputStream(fname, true); + dostream_data = fs.create(dataPath, true); +// dostream_data.writeLong(rlen); +// dostream_data.writeLong(clen); + + long totalNnz = 0; IndexedMatrixValue i_val = null; while((i_val = stream.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) { MatrixBlock mb = (MatrixBlock) i_val.getValue(); - mb.write(dostream); + totalNnz += mb.getNonZeros(); + + double[] denseValues = mb.getDenseBlockValues(); + if (denseValues != null) { + for (double v : denseValues) { + dostream_data.writeDouble(v); + } + } +// mb.write(dostream); } + IOUtilFunctions.closeSilently(dostream_data); + + // PASS 2: Create a header file in RAM (very small) + dostream_header = fs.create(headerPath, true); + dostream_header.writeLong(rlen); + dostream_header.writeLong(clen); + dostream_header.writeInt(blen); + dostream_header.writeBoolean(false); // isSparse + dostream_header.writeLong(totalNnz); + IOUtilFunctions.closeSilently(dostream_header); + + // MERGE STEP: Use HDFS concat for metadata-only merge + fs.concat(finalPath, new Path[]{dataPath, headerPath}); } catch (IOException ex) { throw new RuntimeException(ex); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { - IOUtilFunctions.closeSilently(dostream); + // Cleanup incase of failure before concat +// IOUtilFunctions.closeSilently(dostream_data); +// IOUtilFunctions.closeSilently(dostream_header); +// +// if (fs != null) { +// if (fs.exists(dataPath)) fs.delete(dataPath, false); +// if (fs.exists(headerPath)) fs.delete(headerPath, false); +// } } }; From 5aed9e7dfbfcca815d88a23fd10a1bb83258d31d Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sun, 3 Aug 2025 13:05:14 +0530 Subject: [PATCH 04/12] implementation for data not supported fs case --- .../sysds/runtime/io/WriterBinaryBlock.java | 32 ++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java index 27a22e1aa02..f3bedbe7c70 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java @@ -19,9 +19,12 @@ package org.apache.sysds.runtime.io; +import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile.Writer; @@ -283,7 +286,34 @@ public void writeMatrixFromStream(String fname, LocalTaskQueue Date: Sun, 3 Aug 2025 13:37:13 +0530 Subject: [PATCH 05/12] return nnz value from writeMatrixFromStream and use it in writeMetaDataFile --- .../sysds/runtime/compress/io/WriterCompressed.java | 2 +- .../runtime/instructions/cp/VariableCPInstruction.java | 8 +++++--- .../java/org/apache/sysds/runtime/io/MatrixWriter.java | 2 +- .../org/apache/sysds/runtime/io/WriterBinaryBlock.java | 10 ++++------ .../java/org/apache/sysds/runtime/io/WriterHDF5.java | 2 +- .../apache/sysds/runtime/io/WriterMatrixMarket.java | 4 ++-- .../org/apache/sysds/runtime/io/WriterTextCSV.java | 4 ++-- .../org/apache/sysds/runtime/io/WriterTextCell.java | 4 ++-- .../org/apache/sysds/runtime/io/WriterTextLIBSVM.java | 4 ++-- 9 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java index 037bdbfc366..c4d9db367bb 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java +++ b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java @@ -410,7 +410,7 @@ public Object call() throws Exception { } @Override - public void writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { + public long writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the HDF5 format."); }; diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java index 09da39bd5ed..1a4447eb995 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java @@ -1062,10 +1062,12 @@ else if( getInput1().getDataType() == DataType.MATRIX ) { try { MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(fmt); + long nrows = mo.getNumRows(); + long ncols = mo.getNumColumns(); - writer.writeMatrixFromStream(fname, stream, mo.getNumRows(), mo.getNumColumns(), blen); - HDFSTool.writeMetaDataFile(fname + ".mtd", mo.getValueType(), - mo.getMetaData().getDataCharacteristics(), fmt); + long totalNnz = writer.writeMatrixFromStream(fname, stream, nrows, ncols, blen); + MatrixCharacteristics mc = new MatrixCharacteristics(nrows, ncols, blen, totalNnz); + HDFSTool.writeMetaDataFile(fname + ".mtd", mo.getValueType(), mc, fmt); } catch(Exception ex) { diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java b/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java index 48b6a96f558..1844cc1af79 100644 --- a/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java +++ b/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java @@ -56,7 +56,7 @@ public abstract void writeMatrixToHDFS( MatrixBlock src, String fname, long rlen * @param blen The block size * @throws IOException if an I/O error occurs */ - public abstract void writeMatrixFromStream(String fname, LocalTaskQueue stream, + public abstract long writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) throws IOException; public void setForcedParallel(boolean par) { diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java index f3bedbe7c70..6597ceb46c4 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java @@ -19,7 +19,6 @@ package org.apache.sysds.runtime.io; -import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -40,7 +39,6 @@ import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.util.HDFSTool; -import static org.apache.sysds.runtime.util.HDFSTool.getHDFSDataOutputStream; public class WriterBinaryBlock extends MatrixWriter { protected int _replication = -1; @@ -238,7 +236,7 @@ protected final void writeDiagBinaryBlockMatrixToHDFS(Path path, JobConf job, M } @Override - public void writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) throws IOException { + public long writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) throws IOException { DataOutputStream dostream_data = null; DataOutputStream dostream_header = null; @@ -249,7 +247,7 @@ public void writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { + public long writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the HDF5 format."); }; } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarket.java b/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarket.java index dc763167b5a..39855968202 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarket.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarket.java @@ -224,7 +224,7 @@ public static void mergeTextcellToMatrixMarket( String srcFileName, String fileN } @Override - public void writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { - throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the HDF5 format."); + public long writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { + throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the MatrixMarket format."); }; } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterTextCSV.java b/src/main/java/org/apache/sysds/runtime/io/WriterTextCSV.java index 34f3759cd07..9bc1edace9d 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterTextCSV.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterTextCSV.java @@ -345,7 +345,7 @@ public final void addHeaderToCSV(String srcFileName, String destFileName, long r } @Override - public void writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { - throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the HDF5 format."); + public long writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { + throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the TextCSV format."); }; } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterTextCell.java b/src/main/java/org/apache/sysds/runtime/io/WriterTextCell.java index da1ea2ebbb0..b876f21752b 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterTextCell.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterTextCell.java @@ -141,7 +141,7 @@ protected static void writeTextCellMatrixToFile( Path path, JobConf job, FileSys } @Override - public void writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { - throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the HDF5 format."); + public long writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { + throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the TextCell format."); }; } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVM.java b/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVM.java index a410ceb9a6c..4a97abefc55 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVM.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVM.java @@ -160,7 +160,7 @@ protected static void appendIndexValLibsvm(StringBuilder sb, int index, double v } @Override - public void writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { - throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the HDF5 format."); + public long writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) { + throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the LIBSVM format."); }; } From 6c24e03047d9ed6e25d01974f9c194cce1a74be8 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sun, 3 Aug 2025 14:20:43 +0530 Subject: [PATCH 06/12] filesystem files are written correctly UnaryTest/ in/ .X.crc .X.mtd.crc X X.mtd out/ .res._data.crc .res._header.crc .res.crc .res.mtd.crc res res._data res._header res.mtd --- src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java index 6597ceb46c4..79c25a179ff 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java @@ -263,6 +263,7 @@ public long writeMatrixFromStream(String fname, LocalTaskQueue Date: Sun, 3 Aug 2025 16:23:46 +0530 Subject: [PATCH 07/12] set matrixobject characterstics to avoid recompilation --- .../runtime/instructions/cp/VariableCPInstruction.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java index 1a4447eb995..ae81df08d30 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java @@ -1069,6 +1069,15 @@ else if( getInput1().getDataType() == DataType.MATRIX ) { MatrixCharacteristics mc = new MatrixCharacteristics(nrows, ncols, blen, totalNnz); HDFSTool.writeMetaDataFile(fname + ".mtd", mo.getValueType(), mc, fmt); + // 1. Update the metadata of the MatrixObject in the symbol table. + mo.updateDataCharacteristics(mc); + System.out.println("MO characterstics updated to avoid recompilation"); + + // 2. Clear its dirty flag and update its file path to the result we just wrote. + // This tells the system that the data for this variable now lives in 'fname'. + mo.setFileName(fname); + mo.setDirty(false); + } catch(Exception ex) { throw new DMLRuntimeException("Failed to write OOC stream to " + fname, ex); From 1a15b57a7b0ef7aff5743d67351ad58411a9bb52 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sun, 3 Aug 2025 16:53:51 +0530 Subject: [PATCH 08/12] workaround: copy fname to scratch file for consistency --- .../sysds/runtime/instructions/cp/VariableCPInstruction.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java index ae81df08d30..bce9e987186 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java @@ -1075,7 +1075,8 @@ else if( getInput1().getDataType() == DataType.MATRIX ) { // 2. Clear its dirty flag and update its file path to the result we just wrote. // This tells the system that the data for this variable now lives in 'fname'. - mo.setFileName(fname); +// mo.setFileName(fname); + HDFSTool.copyFileOnHDFS(fname, mo.getFileName()); mo.setDirty(false); } From 8753168dc72de7ed05fc42113a8f748b8b3d0558 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sun, 3 Aug 2025 20:52:13 +0530 Subject: [PATCH 09/12] address review comments, use sequenceFile --- .../sysds/runtime/io/WriterBinaryBlock.java | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java index 79c25a179ff..dde86af85a8 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java @@ -26,12 +26,14 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.Writer; import org.apache.hadoop.mapred.JobConf; import org.apache.sysds.conf.CompilerConfig.ConfigType; import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.hops.OptimizerUtils; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.DMLScriptException; import org.apache.sysds.runtime.compress.CompressedMatrixBlock; import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; @@ -237,6 +239,39 @@ protected final void writeDiagBinaryBlockMatrixToHDFS(Path path, JobConf job, M @Override public long writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) throws IOException { + JobConf conf = ConfigurationManager.getCachedJobConf(); + Path path = new Path(fname); + FileSystem fs = IOUtilFunctions.getFileSystem(path, conf); + + SequenceFile.Writer writer = null; + + long totalNnz = 0; + try { + // 1. Create Sequence file writer for the final destination file + writer = new SequenceFile.Writer(fs, conf, path, MatrixIndexes.class, MatrixBlock.class); + + // 2. Loop through OOC stream + IndexedMatrixValue i_val = null; + while((i_val = stream.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) { + MatrixBlock mb = (MatrixBlock) i_val.getValue(); + MatrixIndexes ix = i_val.getIndexes(); + + // 3. Append (key, value) record as a new value in the file + writer.append(ix, mb); + + totalNnz += mb.getNonZeros(); + } + + } catch (IOException | InterruptedException e) { + throw new DMLRuntimeException(e); + } finally { + IOUtilFunctions.closeSilently(writer); + } + + return totalNnz; + } + + public long writeMatrixFromStream1(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) throws IOException { DataOutputStream dostream_data = null; DataOutputStream dostream_header = null; @@ -246,6 +281,7 @@ public long writeMatrixFromStream(String fname, LocalTaskQueue Date: Sun, 3 Aug 2025 21:14:30 +0530 Subject: [PATCH 10/12] use createWriter instead of SequenceFile.Writer --- .../java/org/apache/sysds/runtime/io/WriterBinaryBlock.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java index dde86af85a8..ddd8ea2ab60 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java @@ -248,7 +248,8 @@ public long writeMatrixFromStream(String fname, LocalTaskQueue Date: Sun, 3 Aug 2025 22:59:53 +0530 Subject: [PATCH 11/12] remove unused imports and code --- .../cp/VariableCPInstruction.java | 15 ++- .../sysds/runtime/io/WriterBinaryBlock.java | 103 +----------------- 2 files changed, 12 insertions(+), 106 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java index bce9e987186..bd40f253b4e 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java @@ -48,7 +48,17 @@ import org.apache.sysds.runtime.instructions.Instruction; import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; -import org.apache.sysds.runtime.io.*; +import org.apache.sysds.runtime.io.FileFormatProperties; +import org.apache.sysds.runtime.io.FileFormatPropertiesCSV; +import org.apache.sysds.runtime.io.FileFormatPropertiesHDF5; +import org.apache.sysds.runtime.io.FileFormatPropertiesLIBSVM; +import org.apache.sysds.runtime.io.ListReader; +import org.apache.sysds.runtime.io.ListWriter; +import org.apache.sysds.runtime.io.WriterHDF5; +import org.apache.sysds.runtime.io.WriterMatrixMarket; +import org.apache.sysds.runtime.io.WriterTextCSV; +import org.apache.sysds.runtime.io.MatrixWriterFactory; +import org.apache.sysds.runtime.io.MatrixWriter; import org.apache.sysds.runtime.lineage.LineageItem; import org.apache.sysds.runtime.lineage.LineageItemUtils; import org.apache.sysds.runtime.lineage.LineageTraceable; @@ -1075,7 +1085,6 @@ else if( getInput1().getDataType() == DataType.MATRIX ) { // 2. Clear its dirty flag and update its file path to the result we just wrote. // This tells the system that the data for this variable now lives in 'fname'. -// mo.setFileName(fname); HDFSTool.copyFileOnHDFS(fname, mo.getFileName()); mo.setDirty(false); @@ -1094,8 +1103,6 @@ else if(fmt == FileFormat.HDF5) writeHDF5File(ec, fname); else { // Default behavior (text, binary) -// MatrixObject mo = ec.getMatrixObject(getInput1().getName()); -// int blen = Integer.parseInt(getInput4().getName()); mo.exportData(fname, fmtStr, new FileFormatProperties(blen)); } } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java index ddd8ea2ab60..a3798a45136 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java @@ -19,11 +19,8 @@ package org.apache.sysds.runtime.io; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; -import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; @@ -33,7 +30,6 @@ import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.hops.OptimizerUtils; import org.apache.sysds.runtime.DMLRuntimeException; -import org.apache.sysds.runtime.DMLScriptException; import org.apache.sysds.runtime.compress.CompressedMatrixBlock; import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; @@ -247,8 +243,7 @@ public long writeMatrixFromStream(String fname, LocalTaskQueue stream, long rlen, long clen, int blen) throws IOException { - DataOutputStream dostream_data = null; - DataOutputStream dostream_header = null; - - String tempDataFname = fname + "._data"; - String tempHeaderFname = fname + "._header"; - Path dataPath = new Path(tempDataFname); - Path headerPath = new Path(tempHeaderFname); - Path finalPath = new Path(fname); - - - FileSystem fs = null; - long totalNnz = 0; - try { - // PASS 1: Stream to a temporary raw data file and count NNZ - fs = IOUtilFunctions.getFileSystem(dataPath); -// dostream = getHDFSDataOutputStream(fname, true); - dostream_data = fs.create(dataPath, true); -// dostream_data.writeLong(rlen); -// dostream_data.writeLong(clen); - -// long totalNnz = 0; - IndexedMatrixValue i_val = null; - while((i_val = stream.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) { - MatrixBlock mb = (MatrixBlock) i_val.getValue(); - totalNnz += mb.getNonZeros(); - - double[] denseValues = mb.getDenseBlockValues(); - System.out.println("totalNnz: " + totalNnz); - if (denseValues != null) { - for (double v : denseValues) { - dostream_data.writeDouble(v); - } - } -// mb.write(dostream); - } - IOUtilFunctions.closeSilently(dostream_data); - - // PASS 2: Create a header file in RAM (very small) - dostream_header = fs.create(headerPath, true); - dostream_header.writeLong(rlen); - dostream_header.writeLong(clen); - dostream_header.writeInt(blen); - dostream_header.writeBoolean(false); // isSparse - dostream_header.writeLong(totalNnz); - IOUtilFunctions.closeSilently(dostream_header); - - // MERGE STEP: Use HDFS concat for metadata-only merge - fs.concat(finalPath, new Path[]{dataPath, headerPath}); - System.out.println("merged file available"); - - } catch (UnsupportedOperationException ex) { - LOG.warn(ex.getMessage()); - System.out.println("concat is not available"); - - DataInputStream distream_header = null; - DataInputStream distream_data = null; - DataOutputStream dostream_final = null; - - try { - dostream_final = fs.create(finalPath, true); - - // 1. Copy header file content - distream_header = fs.open(headerPath); - IOUtils.copy(distream_header, dostream_final); - IOUtilFunctions.closeSilently(distream_header); - - // 2. Copy data file content - distream_data = fs.open(dataPath); - IOUtils.copy(distream_data, dostream_final); - IOUtilFunctions.closeSilently(distream_data); - } - finally { - IOUtilFunctions.closeSilently(dostream_final); - } - - -// throw new IOException("The filesystem doesn't support concat, required for OOC", ex); - } - catch (IOException ex) { - throw new RuntimeException(ex); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } finally { - // Cleanup incase of failure before concat -// IOUtilFunctions.closeSilently(dostream_data); -// IOUtilFunctions.closeSilently(dostream_header); -// -// if (fs != null) { -// if (fs.exists(dataPath)) fs.delete(dataPath, false); -// if (fs.exists(headerPath)) fs.delete(headerPath, false); -// } - } - return totalNnz; - }; } From 4995bfd5163be07437ac4aab960b9a99f59ecd04 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sun, 3 Aug 2025 23:20:46 +0530 Subject: [PATCH 12/12] test successful for write operation --- .../sysds/test/functions/ooc/UnaryTest.java | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java index a81689af375..fc6f01a37f8 100644 --- a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java +++ b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java @@ -31,6 +31,7 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixValue; import org.apache.sysds.runtime.meta.MatrixCharacteristics; +import org.apache.sysds.runtime.util.DataConverter; import org.apache.sysds.runtime.util.HDFSTool; import org.apache.sysds.test.AutomatedTestBase; import org.apache.sysds.test.TestConfiguration; @@ -38,8 +39,11 @@ import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.HashMap; +import static org.apache.sysds.test.TestUtils.readDMLMatrixFromHDFS; + public class UnaryTest extends AutomatedTestBase { private static final String TEST_NAME = "Unary"; @@ -86,17 +90,17 @@ public void testUnaryOperation(boolean rewrite) runTest(true, false, null, -1); - HashMap dmlfile = readDMLMatrixFromOutputDir(OUTPUT_NAME); - Double result = dmlfile.get(new MatrixValue.CellIndex(1, 1)); + double[][] C1 = readMatrix(output(OUTPUT_NAME), FileFormat.BINARY, rows, cols, 1000, 1000); double expected = 0.0; + double result = 0.0; for(int i = 0; i < rows; i++) { for(int j = 0; j < cols; j++) { - expected += Math.ceil(mb.get(i, j)); + expected = Math.ceil(mb.get(i, j)); + result = C1[i][j]; + Assert.assertEquals(expected, result, 1e-10); } } - Assert.assertEquals(expected, result, 1e-10); - String prefix = Instruction.OOC_INST_PREFIX; Assert.assertTrue("OOC wasn't used for RBLK", heavyHittersContainsString(prefix + Opcodes.RBLK)); @@ -111,4 +115,12 @@ public void testUnaryOperation(boolean rewrite) resetExecMode(platformOld); } } + + private static double[][] readMatrix( String fname, FileFormat fmt, long rows, long cols, int brows, int bcols ) + throws IOException + { + MatrixBlock mb = DataConverter.readMatrixFromHDFS(fname, fmt, rows, cols, brows, bcols); + double[][] C = DataConverter.convertToDoubleMatrix(mb); + return C; + } }