Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
import org.apache.sysds.runtime.compress.lib.CLALibSeparator;
import org.apache.sysds.runtime.compress.lib.CLALibSeparator.SeparatedGroups;
import org.apache.sysds.runtime.compress.lib.CLALibSlice;
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
import org.apache.sysds.runtime.instructions.spark.CompressionSPInstruction.CompressionFunction;
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils;
import org.apache.sysds.runtime.io.FileFormatProperties;
import org.apache.sysds.runtime.io.IOUtilFunctions;
Expand Down Expand Up @@ -407,4 +409,9 @@ public Object call() throws Exception {

}

/**
 * Streaming (out-of-core) output is not implemented by this writer; every call fails.
 *
 * @param fname  target output filename (unused)
 * @param stream OOC stream of matrix blocks (unused, never consumed)
 * @param rlen   total number of rows (unused)
 * @param clen   total number of columns (unused)
 * @param blen   block size (unused)
 * @return never returns normally
 * @throws UnsupportedOperationException on every call
 */
@Override
public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
	// NOTE(review): the previous message named the HDF5 format, which appears to be a
	// copy-paste from the HDF5 writer; this file imports the compression libraries,
	// so the message now names the compressed format -- confirm against the enclosing class.
	throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the compressed format.");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType;
import org.apache.sysds.runtime.controlprogram.caching.TensorObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysds.runtime.data.TensorBlock;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.instructions.Instruction;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
import org.apache.sysds.runtime.io.FileFormatProperties;
import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
import org.apache.sysds.runtime.io.FileFormatPropertiesHDF5;
Expand All @@ -55,6 +57,8 @@
import org.apache.sysds.runtime.io.WriterHDF5;
import org.apache.sysds.runtime.io.WriterMatrixMarket;
import org.apache.sysds.runtime.io.WriterTextCSV;
import org.apache.sysds.runtime.io.MatrixWriterFactory;
import org.apache.sysds.runtime.io.MatrixWriter;
import org.apache.sysds.runtime.lineage.LineageItem;
import org.apache.sysds.runtime.lineage.LineageItemUtils;
import org.apache.sysds.runtime.lineage.LineageTraceable;
Expand Down Expand Up @@ -1060,6 +1064,35 @@ private void processWriteInstruction(ExecutionContext ec) {
HDFSTool.writeScalarToHDFS(ec.getScalarInput(getInput1()), fname);
}
else if( getInput1().getDataType() == DataType.MATRIX ) {
MatrixObject mo = ec.getMatrixObject(getInput1().getName());
int blen = Integer.parseInt(getInput4().getName());
LocalTaskQueue<IndexedMatrixValue> stream = mo.getStreamHandle();

if (stream != null) {

@j143 j143 Aug 3, 2025

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to recompilation, this code path is visited a second time. Since the stream is null on the second visit, it fails.

So I tried setting the values explicitly, but recompilation is still happening.

mo.updateDataCharacteristics(mc);
HDFSTool.copyFileOnHDFS(fname, mo.getFileName());
mo.setDirty(false);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend simply probing whether a stream exists and, if it does, not consuming the matrix.

@j143 j143 Aug 3, 2025

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, I've not understood this part! :)


try {
MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(fmt);
long nrows = mo.getNumRows();
long ncols = mo.getNumColumns();

long totalNnz = writer.writeMatrixFromStream(fname, stream, nrows, ncols, blen);
MatrixCharacteristics mc = new MatrixCharacteristics(nrows, ncols, blen, totalNnz);
HDFSTool.writeMetaDataFile(fname + ".mtd", mo.getValueType(), mc, fmt);

// 1. Update the metadata of the MatrixObject in the symbol table.
mo.updateDataCharacteristics(mc);
System.out.println("MO characterstics updated to avoid recompilation");

// 2. Clear its dirty flag and update its file path to the result we just wrote.
// This tells the system that the data for this variable now lives in 'fname'.
HDFSTool.copyFileOnHDFS(fname, mo.getFileName());
mo.setDirty(false);

}
catch(Exception ex) {
throw new DMLRuntimeException("Failed to write OOC stream to " + fname, ex);
}
}
if( fmt == FileFormat.MM )
writeMMFile(ec, fname);
else if( fmt == FileFormat.CSV )
Expand All @@ -1070,8 +1103,6 @@ else if(fmt == FileFormat.HDF5)
writeHDF5File(ec, fname);
else {
// Default behavior (text, binary)
MatrixObject mo = ec.getMatrixObject(getInput1().getName());
int blen = Integer.parseInt(getInput4().getName());
mo.exportData(fname, fmtStr, new FileFormatProperties(blen));
}
}
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;

/**
Expand All @@ -42,6 +44,20 @@ public void writeMatrixToHDFS( MatrixBlock src, String fname, long rlen, long cl

public abstract void writeMatrixToHDFS( MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz, boolean diag )
throws IOException;

/**
 * Consumes an out-of-core stream of matrix blocks and writes them to a single file.
 * This method must be implemented by writers that support OOC streaming output.
 *
 * @param fname The target output filename
 * @param stream The OOC stream of matrix blocks to consume
 * @param rlen The total number of rows in the matrix
 * @param clen The total number of columns in the matrix
 * @param blen The block size
 * @return The total number of non-zero values in the written blocks
 * @throws IOException if an I/O error occurs
 */
public abstract long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream,
long rlen, long clen, int blen) throws IOException;

public void setForcedParallel(boolean par) {
_forcedParallel = par;
Expand Down
38 changes: 38 additions & 0 deletions src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,21 @@

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.CompilerConfig.ConfigType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.util.HDFSTool;


public class WriterBinaryBlock extends MatrixWriter {
protected int _replication = -1;

Expand Down Expand Up @@ -228,4 +232,38 @@ protected final void writeDiagBinaryBlockMatrixToHDFS(Path path, JobConf job, M
IOUtilFunctions.closeSilently(writer);
}
}

/**
 * Drains an out-of-core stream of matrix blocks and appends each block, keyed by its
 * block indexes, to a single sequence file at the given location.
 *
 * @param fname  target output filename
 * @param stream queue of indexed matrix blocks; consumed until
 *               {@code LocalTaskQueue.NO_MORE_TASKS} is dequeued
 * @param rlen   total number of rows (not used by this implementation)
 * @param clen   total number of columns (not used by this implementation)
 * @param blen   block size (not used by this implementation)
 * @return sum of the non-zero counts reported by all written blocks
 * @throws IOException if the file system for the target path cannot be obtained
 * @throws DMLRuntimeException if appending a block fails or the thread is interrupted
 *                             while dequeuing from the stream
 */
@Override
public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) throws IOException {
	JobConf conf = ConfigurationManager.getCachedJobConf();
	Path path = new Path(fname);
	FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);

	SequenceFile.Writer writer = null;
	long totalNnz = 0;
	try {
		// 1. Create the sequence file writer for the final destination file
		writer = SequenceFile.createWriter(fs, conf, path, MatrixIndexes.class, MatrixBlock.class);

		// 2. Loop through the OOC stream until the end-of-stream marker is seen
		IndexedMatrixValue i_val;
		while((i_val = stream.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
			MatrixBlock mb = (MatrixBlock) i_val.getValue();
			MatrixIndexes ix = i_val.getIndexes();

			// 3. Append the (key, value) record as a new entry in the file
			writer.append(ix, mb);

			totalNnz += mb.getNonZeros();
		}
	}
	catch(InterruptedException e) {
		// restore the interrupt flag so callers up the stack can still observe the interruption
		Thread.currentThread().interrupt();
		throw new DMLRuntimeException(e);
	}
	catch(IOException e) {
		throw new DMLRuntimeException(e);
	}
	finally {
		IOUtilFunctions.closeSilently(writer);
	}

	return totalNnz;
}
}
7 changes: 7 additions & 0 deletions src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
import org.apache.sysds.runtime.io.hdf5.H5;
import org.apache.sysds.runtime.io.hdf5.H5RootObject;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
Expand Down Expand Up @@ -129,4 +131,9 @@ protected static void writeHDF5MatrixToFile(Path path, JobConf job, FileSystem f
IOUtilFunctions.closeSilently(bos);
}
}

/**
 * Stream-based (out-of-core) writing is not implemented for HDF5; this override always fails.
 *
 * @throws UnsupportedOperationException on every call
 */
@Override
public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
	final String msg = "Writing from an OOC stream is not supported for the HDF5 format.";
	throw new UnsupportedOperationException(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
import org.apache.sysds.runtime.matrix.data.IJV;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.HDFSTool;
Expand Down Expand Up @@ -220,4 +222,9 @@ public static void mergeTextcellToMatrixMarket( String srcFileName, String fileN
throw new IOException(src.toString() + ": No such file or directory");
}
}

/**
 * Stream-based (out-of-core) writing is not implemented for MatrixMarket; this override always fails.
 *
 * @throws UnsupportedOperationException on every call
 */
@Override
public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
	final String msg = "Writing from an OOC stream is not supported for the MatrixMarket format.";
	throw new UnsupportedOperationException(msg);
}
}
7 changes: 7 additions & 0 deletions src/main/java/org/apache/sysds/runtime/io/WriterTextCSV.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.HDFSTool;

Expand Down Expand Up @@ -341,4 +343,9 @@ public final void addHeaderToCSV(String srcFileName, String destFileName, long r
throw new IOException(srcFilePath.toString() + ": No such file or directory");
}
}

/**
 * Stream-based (out-of-core) writing is not implemented for text CSV; this override always fails.
 *
 * @throws UnsupportedOperationException on every call
 */
@Override
public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
	final String msg = "Writing from an OOC stream is not supported for the TextCSV format.";
	throw new UnsupportedOperationException(msg);
}
}
7 changes: 7 additions & 0 deletions src/main/java/org/apache/sysds/runtime/io/WriterTextCell.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
import org.apache.sysds.runtime.matrix.data.IJV;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.HDFSTool;
Expand Down Expand Up @@ -137,4 +139,9 @@ protected static void writeTextCellMatrixToFile( Path path, JobConf job, FileSys
br.write(IOUtilFunctions.EMPTY_TEXT_LINE);
}
}

/**
 * Stream-based (out-of-core) writing is not implemented for text cell; this override always fails.
 *
 * @throws UnsupportedOperationException on every call
 */
@Override
public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
	final String msg = "Writing from an OOC stream is not supported for the TextCell format.";
	throw new UnsupportedOperationException(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.HDFSTool;

Expand Down Expand Up @@ -156,4 +158,9 @@ protected static void appendIndexValLibsvm(StringBuilder sb, int index, double v
sb.append(_props.getIndexDelim());
sb.append(value);
}

/**
 * Stream-based (out-of-core) writing is not implemented for LIBSVM; this override always fails.
 *
 * @throws UnsupportedOperationException on every call
 */
@Override
public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
	final String msg = "Writing from an OOC stream is not supported for the LIBSVM format.";
	throw new UnsupportedOperationException(msg);
}
}
22 changes: 17 additions & 5 deletions src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,19 @@
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixValue;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.HDFSTool;
import org.apache.sysds.test.AutomatedTestBase;
import org.apache.sysds.test.TestConfiguration;
import org.apache.sysds.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.HashMap;

import static org.apache.sysds.test.TestUtils.readDMLMatrixFromHDFS;

public class UnaryTest extends AutomatedTestBase {

private static final String TEST_NAME = "Unary";
Expand Down Expand Up @@ -86,17 +90,17 @@ public void testUnaryOperation(boolean rewrite)

runTest(true, false, null, -1);

HashMap<MatrixValue.CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir(OUTPUT_NAME);
Double result = dmlfile.get(new MatrixValue.CellIndex(1, 1));
double[][] C1 = readMatrix(output(OUTPUT_NAME), FileFormat.BINARY, rows, cols, 1000, 1000);
double expected = 0.0;
double result = 0.0;
for(int i = 0; i < rows; i++) {
for(int j = 0; j < cols; j++) {
expected += Math.ceil(mb.get(i, j));
expected = Math.ceil(mb.get(i, j));
result = C1[i][j];
Assert.assertEquals(expected, result, 1e-10);
}
}

Assert.assertEquals(expected, result, 1e-10);

String prefix = Instruction.OOC_INST_PREFIX;
Assert.assertTrue("OOC wasn't used for RBLK",
heavyHittersContainsString(prefix + Opcodes.RBLK));
Expand All @@ -111,4 +115,12 @@ public void testUnaryOperation(boolean rewrite)
resetExecMode(platformOld);
}
}

/**
 * Reads a matrix file via {@code DataConverter.readMatrixFromHDFS} and converts the
 * resulting block into a two-dimensional double array.
 *
 * @param fname path of the matrix file to read
 * @param fmt   file format of the stored matrix
 * @param rows  number of rows passed to the reader
 * @param cols  number of columns passed to the reader
 * @param brows block row size passed to the reader
 * @param bcols block column size passed to the reader
 * @return the matrix content as a double array
 * @throws IOException if reading the file fails
 */
private static double[][] readMatrix( String fname, FileFormat fmt, long rows, long cols, int brows, int bcols )
	throws IOException
{
	return DataConverter.convertToDoubleMatrix(
		DataConverter.readMatrixFromHDFS(fname, fmt, rows, cols, brows, bcols));
}
}
6 changes: 3 additions & 3 deletions src/test/scripts/functions/ooc/Unary.dml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
# Read input matrix and operator from command line args
X = read($1);
#print(toString(X))
Y = ceil(X);
res = ceil(X);
#print(toString(Y))
res = as.matrix(sum(Y));
#res = as.matrix(sum(Y));
# Write the final matrix result
write(res, $2);
write(res, $2, format="binary");
Loading