From 5e14328dd420463802834beab2facd78f405c3b7 Mon Sep 17 00:00:00 2001 From: Jannik Lindemann Date: Thu, 6 Nov 2025 17:19:10 +0100 Subject: [PATCH 1/3] [SYSTEMDS-3932] CSV reader for out-of-core streams --- .../org/apache/sysds/lops/CSVReBlock.java | 4 +- .../instructions/OOCInstructionParser.java | 3 + .../ooc/CSVReblockOOCInstruction.java | 590 ++++++++++++++++++ .../test/functions/ooc/CSVReaderTest.java | 140 +++++ src/test/scripts/functions/ooc/CSVReader.dml | 23 + 5 files changed, 758 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/apache/sysds/runtime/instructions/ooc/CSVReblockOOCInstruction.java create mode 100644 src/test/java/org/apache/sysds/test/functions/ooc/CSVReaderTest.java create mode 100644 src/test/scripts/functions/ooc/CSVReader.dml diff --git a/src/main/java/org/apache/sysds/lops/CSVReBlock.java b/src/main/java/org/apache/sysds/lops/CSVReBlock.java index b554b787713..5820a732ee9 100644 --- a/src/main/java/org/apache/sysds/lops/CSVReBlock.java +++ b/src/main/java/org/apache/sysds/lops/CSVReBlock.java @@ -44,8 +44,8 @@ public CSVReBlock(Lop input, int blen, DataType dt, ValueType vt, ExecType et) _blocksize = blen; - if(et == ExecType.SPARK) { - lps.setProperties( inputs, ExecType.SPARK); + if(et == ExecType.SPARK || et == ExecType.OOC) { + lps.setProperties( inputs, et ); } else { throw new LopsException("Incorrect execution type for CSVReblock:" + et); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java index 4e9a92ecb78..03c806b09ae 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java @@ -25,6 +25,7 @@ import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.instructions.ooc.AggregateUnaryOOCInstruction; import 
org.apache.sysds.runtime.instructions.ooc.BinaryOOCInstruction; +import org.apache.sysds.runtime.instructions.ooc.CSVReblockOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.CentralMomentOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.CtableOOCInstruction; import org.apache.sysds.runtime.instructions.ooc.OOCInstruction; @@ -56,6 +57,8 @@ public static OOCInstruction parseSingleInstruction(InstructionType ooctype, Str switch(ooctype) { case Reblock: return ReblockOOCInstruction.parseInstruction(str); + case CSVReblock: + return CSVReblockOOCInstruction.parseInstruction(str); case AggregateUnary: return AggregateUnaryOOCInstruction.parseInstruction(str); case Unary: diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/CSVReblockOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CSVReblockOOCInstruction.java new file mode 100644 index 00000000000..b7a2e72d3aa --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CSVReblockOOCInstruction.java @@ -0,0 +1,590 @@ +package org.apache.sysds.runtime.instructions.ooc; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.sysds.common.Opcodes; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.instructions.InstructionUtils; +import org.apache.sysds.runtime.instructions.cp.CPOperand; +import 
org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; +import org.apache.sysds.runtime.io.FileFormatProperties; +import org.apache.sysds.runtime.io.FileFormatPropertiesCSV; +import org.apache.sysds.runtime.io.IOUtilFunctions; +import org.apache.sysds.runtime.io.MatrixReader; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.data.MatrixIndexes; +import org.apache.sysds.runtime.matrix.operators.Operator; +import org.apache.sysds.runtime.meta.DataCharacteristics; +import org.apache.sysds.runtime.util.UtilFunctions; + +public class CSVReblockOOCInstruction extends ComputationOOCInstruction { + private static final int MAX_BLOCKS_IN_CACHE = 40; + + private final int blen; + + private CSVReblockOOCInstruction(Operator op, CPOperand in, CPOperand out, int blocklength, String opcode, + String instr) { + super(OOCType.Reblock, op, in, out, opcode, instr); + blen = blocklength; + } + + public static CSVReblockOOCInstruction parseInstruction(String str) { + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); + String opcode = parts[0]; + if(!opcode.equals(Opcodes.CSVRBLK.toString())) + throw new DMLRuntimeException("Incorrect opcode for CSVReblockOOCInstruction:" + opcode); + + CPOperand in = new CPOperand(parts[1]); + CPOperand out = new CPOperand(parts[2]); + int blen = Integer.parseInt(parts[3]); + return new CSVReblockOOCInstruction(null, in, out, blen, opcode, str); + } + + @Override + public void processInstruction(ExecutionContext ec) { + MatrixObject min = ec.getMatrixObject(input1); + DataCharacteristics mc = ec.getDataCharacteristics(input1.getName()); + DataCharacteristics mcOut = ec.getDataCharacteristics(output.getName()); + mcOut.set(mc.getRows(), mc.getCols(), blen, mc.getNonZeros()); + + OOCStream qOut = createWritableStream(); + addOutStream(qOut); + + FileFormatProperties props = min.getFileFormatProperties(); + FileFormatPropertiesCSV csvProps = props instanceof 
FileFormatPropertiesCSV ? (FileFormatPropertiesCSV) props : new FileFormatPropertiesCSV(); + + final Path path = new Path(min.getFileName()); + final JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); + + try { + final FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + MatrixReader.checkValidInputFile(fs, path); + + final List files = collectInputFiles(fs, path); + + if(files.size() == 1) { + submitOOCTask(() -> { + readCSVBlock(qOut, files.get(0), csvProps); + qOut.closeInput(); + }, qOut); + } + else { + submitOOCTask(() -> { + try(MultiFileBufferedSeekableInput in = new MultiFileBufferedSeekableInput(fs, files)) { + readCSVBlock(qOut, in, csvProps); + } + catch(IOException ioe) { + throw new DMLRuntimeException(ioe); + } + qOut.closeInput(); + }, qOut); + } + + MatrixObject mout = ec.getMatrixObject(output); + mout.setStreamHandle(qOut); + } + catch(IOException e) { + throw new DMLRuntimeException(e); + } + } + + private static List collectInputFiles(FileSystem fs, Path path) throws IOException { + if(!fs.getFileStatus(path).isDirectory()) + return Collections.singletonList(path); + + final List files = new ArrayList<>(); + for(FileStatus stat : fs.listStatus(path, IOUtilFunctions.hiddenFileFilter)) + files.add(stat.getPath()); + Collections.sort(files); + return files; + } + + private void readCSVBlock(OOCStream qOut, Path path, FileFormatPropertiesCSV props) { + final JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); + + try { + final FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + MatrixReader.checkValidInputFile(fs, path); + + if(props.getDelim().length() != 1) + throw new DMLRuntimeException("Can only read CSVs with single char delimiters"); + + try(FSDataInputStream rawIn = fs.open(path); BufferedSeekableInput in = new BufferedSeekableInput(rawIn)) { + readCSVBlock(qOut, in, props); + } + } + catch(IOException ex) { + throw new DMLRuntimeException(ex); + } + } + + private void readCSVBlock(OOCStream 
qOut, SeekableInput in, FileFormatPropertiesCSV props) + throws IOException { + final int delim = props.getDelim().charAt(0); + final ColumnInfo columnInfo = detectColumnInfo(in, delim, props.hasHeader()); + final int ncols = columnInfo.ncols; + + if(ncols <= 0) + return; + + in.seek(columnInfo.dataStart); + + final int segLenMax = Math.min(MAX_BLOCKS_IN_CACHE * blen, ncols); + final int segLen = Math.min(segLenMax, ncols); + final boolean fill = props.isFill(); + final double fillValue = props.getFillValue(); + final Set naStrings = props.getNAStrings(); + + final List segStartList = new ArrayList<>(); + MatrixBlock[] firstBlocks = null; + DenseBlock[] firstDense = null; + int rowsInBand = 0; + int brow = 0; + int rowOffset = 0; + + while(true) { + int next = peek(in); + if(next == -1) + break; + + if(rowsInBand == 0) { + firstBlocks = allocateBlocks(blen, 0, segLen, ncols); + firstDense = extractDense(firstBlocks); + } + + final long nextSegPos = fillFirstSegmentRow(in, firstDense, rowsInBand, segLen, ncols, delim, fill, + fillValue, naStrings); + segStartList.add(nextSegPos); + + rowsInBand++; + if(rowsInBand == blen) { + emitBlocks(qOut, firstBlocks, rowsInBand, brow, 0); + firstBlocks = null; + firstDense = null; + + fillFollowingSegments(qOut, in, segStartList.subList(rowOffset, rowOffset + rowsInBand), segLen, ncols, + brow, rowsInBand, blen, delim, fill, fillValue, naStrings); + + rowsInBand = 0; + brow++; + rowOffset += blen; + } + } + + // Process remainder + if(rowsInBand > 0) { + emitBlocks(qOut, firstBlocks, rowsInBand, brow, 0); + fillFollowingSegments(qOut, in, segStartList.subList(rowOffset, rowOffset + rowsInBand), segLen, ncols, + brow, rowsInBand, blen, delim, fill, fillValue, naStrings); + } + } + + private void fillFollowingSegments(OOCStream qOut, SeekableInput in, List segStartList, + int segLen, int ncols, int brow, int rowsInBand, int blen, int delim, boolean fill, double fillValue, + Set naStrings) throws IOException { + for(int c0 = 
segLen; c0 < ncols; c0 += segLen) { + final int seg = Math.min(segLen, ncols - c0); + final int firstBlockCol = c0 / blen; + + //final int rows = rowsInBand; + final MatrixBlock[] blocks = allocateBlocks(rowsInBand, firstBlockCol, seg, ncols); + final DenseBlock[] dense = extractDense(blocks); + + for(int i = 0; i < rowsInBand; i++) { + in.seek(segStartList.get(i)); + int col = c0; + int read = 0; + + while(read < seg) { + final Token token = parseToken(in, delim, fill, fillValue, naStrings); + final int bci = (col / blen) - firstBlockCol; + final int within = col % blen; + dense[bci].set(i, within, token.value); + if(token.term == delim) + in.read(); + read++; + col++; + } + + segStartList.set(i, in.getPos()); + skipRestOfLineFast(in); + } + + emitBlocks(qOut, blocks, rowsInBand, brow, firstBlockCol); + } + } + + private static ColumnInfo detectColumnInfo(SeekableInput in, int delim, boolean hasHeader) throws IOException { + in.seek(0); + if(hasHeader) + skipRestOfLineFast(in); + + final long dataStart = in.getPos(); + int ch; + int ncols = 0; + boolean seenToken = false; + while((ch = in.read()) != -1) { + if(ch == delim) { + ncols++; + seenToken = false; + } + else if(ch == '\n') { + if(seenToken || ncols > 0) + ncols++; + break; + } + else if(ch == '\r') { + if(consumeLF(in)) + ch = '\n'; + if(seenToken || ncols > 0) + ncols++; + break; + } + else { + seenToken = true; + } + } + + if(ch == -1 && (seenToken || ncols > 0)) + ncols++; + + in.seek(dataStart); + return new ColumnInfo(ncols, dataStart); + } + + private long fillFirstSegmentRow(SeekableInput in, DenseBlock[] denseBlocks, int rowOffset, int segLen, int ncols, + int delim, boolean fill, double fillValue, Set naStrings) throws IOException { + int col = 0; + long nextSegPos = -1; + + while(col < segLen) { + final Token token = parseToken(in, delim, fill, fillValue, naStrings); + final int bci = col / blen; + final int within = col % blen; + denseBlocks[bci].set(rowOffset, within, token.value); + col++; + 
+ if(token.term == delim) { + in.read(); + if(col == segLen) + nextSegPos = in.getPos(); + continue; + } + + nextSegPos = in.getPos(); + break; + } + + if(nextSegPos < 0) + nextSegPos = in.getPos(); + + skipRestOfLineFast(in); + return nextSegPos; + } + + private MatrixBlock[] allocateBlocks(int rows, int firstBlockCol, int segLen, int ncols) { + final int c0 = firstBlockCol * blen; + final int lastBlockCol = (c0 + segLen - 1) / blen; + final int numBlocks = lastBlockCol - firstBlockCol + 1; + final MatrixBlock[] blocks = new MatrixBlock[numBlocks]; + + for(int bci = 0; bci < numBlocks; bci++) { + final int bcol = firstBlockCol + bci; + final int cStart = bcol * blen; + final int cEnd = Math.min(ncols, cStart + blen); + final MatrixBlock block = new MatrixBlock(rows, cEnd - cStart, false); + block.allocateDenseBlock(); + blocks[bci] = block; + } + + return blocks; + } + + private DenseBlock[] extractDense(MatrixBlock[] blocks) { + final DenseBlock[] dense = new DenseBlock[blocks.length]; + for(int i = 0; i < blocks.length; i++) { + final DenseBlock db = blocks[i].getDenseBlock(); + dense[i] = db; + } + return dense; + } + + private void emitBlocks(OOCStream qOut, MatrixBlock[] blocks, int rowsInBand, int brow, + int firstBlockCol) { + for(int bci = 0; bci < blocks.length; bci++) { + MatrixBlock block = blocks[bci]; + + if(block.getNumRows() != rowsInBand) + block = block.slice(0, rowsInBand - 1, 0, block.getNumColumns() - 1); + + block.recomputeNonZeros(); + block.examSparsity(); + final MatrixIndexes idx = new MatrixIndexes(brow + 1, firstBlockCol + bci + 1); + qOut.enqueue(new IndexedMatrixValue(idx, block)); + } + } + + private static Token parseToken(SeekableInput in, int delim, boolean fill, double fillValue, Set naStrings) + throws IOException { + int ch; + do { + ch = in.read(); + if(ch == -1) + throw new DMLRuntimeException("Unexpected EOF in CSV token"); + } + while(ch == ' ' || ch == '\t'); + + final StringBuilder buf = new StringBuilder(32); + while(ch 
!= -1 && ch != delim && ch != '\n' && ch != '\r') { + buf.append((char) ch); + ch = in.read(); + } + if(ch != -1) + in.seek(in.getPos() - 1); + + int len = buf.length(); + while(len > 0 && (buf.charAt(len - 1) == ' ' || buf.charAt(len - 1) == '\t')) + buf.setLength(--len); + + final double value; + if(len == 0) { + if(fill) + value = fillValue; + else + throw new DMLRuntimeException("Empty value in CSV input"); + } + else { + value = UtilFunctions.parseToDouble(buf.toString(), naStrings); + } + + return new Token(value, ch); + } + + private static void skipRestOfLineFast(SeekableInput in) throws IOException { + int ch; + while((ch = in.read()) != -1) { + if(ch == '\n') + return; + if(ch == '\r') { + consumeLF(in); + return; + } + } + } + + private static boolean consumeLF(SeekableInput in) throws IOException { + final long pos = in.getPos(); + final int next = in.read(); + if(next == '\n') + return true; + if(next != -1) + in.seek(pos); + return false; + } + + private static int peek(SeekableInput in) throws IOException { + final long pos = in.getPos(); + final int ch = in.read(); + if(ch != -1) + in.seek(pos); + return ch; + } + + private interface SeekableInput extends AutoCloseable { + int read() throws IOException; + + void seek(long pos) throws IOException; + + long getPos(); + + @Override + void close() throws IOException; + } + + private static final class MultiFileBufferedSeekableInput implements SeekableInput { + private final FileSystem fs; + private final List files; + private final long[] offsets; + private final long totalLength; + private final BufferedSeekableInput[] streams; + + private int currentIdx; + private BufferedSeekableInput current; + private long position; + + private MultiFileBufferedSeekableInput(FileSystem fs, List files) throws IOException { + if(files.isEmpty()) + throw new DMLRuntimeException("No CSV files to read"); + this.fs = fs; + this.files = files; + offsets = new long[files.size()]; + streams = new 
BufferedSeekableInput[files.size()]; + long offset = 0; + for(int i = 0; i < files.size(); i++) { + offsets[i] = offset; + offset += fs.getFileStatus(files.get(i)).getLen(); + } + totalLength = offset; + currentIdx = 0; + openIfNeeded(0); + current = streams[0]; + position = 0; + } + + private void openIfNeeded(int idx) throws IOException { + if(idx >= files.size()) + return; + + if(streams[idx] == null) + streams[idx] = new BufferedSeekableInput(fs.open(files.get(idx))); + } + + @Override + public int read() throws IOException { + if(current == null) + return -1; + int b = current.read(); + while(b == -1 && currentIdx + 1 < files.size()) { + currentIdx++; + openIfNeeded(currentIdx); + current = streams[currentIdx]; + current.seek(0); + b = current.read(); + } + if(b != -1) + position++; + return b; + } + + @Override + public void seek(long pos) throws IOException { + if(pos < 0 || pos > totalLength) + throw new IOException("Seek position out of range: " + pos); + if(pos == totalLength) { + currentIdx = files.size(); + current = null; + position = pos; + return; + } + int idx = findFile(pos); + openIfNeeded(idx); + currentIdx = idx; + current = streams[idx]; + current.seek(pos - offsets[idx]); + position = pos; + } + + @Override + public long getPos() { + return position; + } + + @Override + public void close() throws IOException { + for(BufferedSeekableInput stream : streams) { + if(stream != null) + stream.close(); + } + } + + private int findFile(long pos) { + for(int i = 0; i < offsets.length - 1; i++) { + if(pos < offsets[i + 1]) + return i; + } + return offsets.length - 1; + } + } + + private static final class BufferedSeekableInput implements SeekableInput { + private static final int BUF_SIZE = 4 * 1024; + + private final FSDataInputStream in; + private final byte[] buf = new byte[BUF_SIZE]; + private long bufStart = 0; + private int bufLen = 0; + private int bufPos = 0; + + private BufferedSeekableInput(FSDataInputStream in) { + this.in = in; + } + + 
@Override + public int read() throws IOException { + if(bufPos >= bufLen) { + if(!fill()) + return -1; + } + return buf[bufPos++] & 0xFF; + } + + private boolean fill() throws IOException { + bufStart = in.getPos(); + bufLen = in.read(buf, 0, BUF_SIZE); + if(bufLen <= 0) { + bufLen = 0; + bufPos = 0; + return false; + } + bufPos = 0; + return true; + } + + @Override + public void seek(long pos) throws IOException { + final long bufEnd = bufStart + bufLen; + if(pos >= bufStart && pos < bufEnd) { + bufPos = (int) (pos - bufStart); + } + else { + in.seek(pos); + bufStart = pos; + bufLen = 0; + bufPos = 0; + } + } + + @Override + public long getPos() { + return bufStart + bufPos; + } + + @Override + public void close() throws IOException { + in.close(); + } + } + + private static final class ColumnInfo { + private final int ncols; + private final long dataStart; + + private ColumnInfo(int ncols, long dataStart) { + this.ncols = ncols; + this.dataStart = dataStart; + } + } + + private static final class Token { + private final double value; + private final int term; + + private Token(double value, int term) { + this.value = value; + this.term = term; + } + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/CSVReaderTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/CSVReaderTest.java new file mode 100644 index 00000000000..5f5f7fb42f6 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/ooc/CSVReaderTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.functions.ooc; + +import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.io.MatrixWriter; +import org.apache.sysds.runtime.io.MatrixWriterFactory; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.meta.MatrixCharacteristics; +import org.apache.sysds.runtime.util.DataConverter; +import org.apache.sysds.runtime.util.HDFSTool; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; + +import java.io.IOException; + +public class CSVReaderTest extends AutomatedTestBase { + private final static String TEST_NAME1 = "CSVReader"; + private final static String TEST_DIR = "functions/ooc/"; + private final static String TEST_CLASS_DIR = TEST_DIR + CSVReaderTest.class.getSimpleName() + "/"; + private final static double eps = 1e-8; + private static final String INPUT_NAME = "X"; + private static final String OUTPUT_NAME = "res"; + + private final static int maxVal = 7; + private final static double sparsity1 = 0.65; + private final static double sparsity2 = 0.05; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1); + addTestConfiguration(TEST_NAME1, config); + } + + @Test + public void testCSVReaderDense1() { + runCSVReaderTest(false, 1800, 1100); + } + + @Test + public void testCSVReaderSparse1() { + runCSVReaderTest(true, 1800, 1100); + } + + @Test + public 
void testCSVReaderDenseWide() { + runCSVReaderTest(false, 50, 12100); + } + + @Test + public void testCSVReaderSparseWide() { + runCSVReaderTest(true, 500, 50000); + } + + @Test + public void testCSVReaderDenseUltraWide() { + runCSVReaderTest(false, 50, 200000); + } + + @Test + public void testCSVReaderDenseLarge() { + runCSVReaderTest(false, 750, 50000); + } + + @Test + public void testCSVReaderSparseLarge() { + runCSVReaderTest(true, 500, 50000); + } + + @Test + public void testCSVReaderDenseLarge2() { + runCSVReaderTest(false, 1200, 25000); + } + + private void runCSVReaderTest(boolean sparse, int rows, int cols) { + Types.ExecMode platformOld = setExecMode(Types.ExecMode.SINGLE_NODE); + + try { + getAndLoadTestConfiguration(TEST_NAME1); + + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME1 + ".dml"; + programArgs = new String[] {"-explain", "-stats", "-ooc", "-args", input(INPUT_NAME), output(OUTPUT_NAME)}; + + // 1. Generate the data in-memory as a double[][] array + double[][] A_data = getRandomMatrix(rows, cols, 1, maxVal, sparse ? sparsity2 : sparsity1, 7); + + // 2. Convert the double array to a MatrixBlock + MatrixBlock A_mb = DataConverter.convertToMatrixBlock(A_data); + + // 3. Create a CSV matrix writer + MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(Types.FileFormat.CSV); + + // 4. 
Write matrix A and its metadata file in CSV format + writer.writeMatrixToHDFS(A_mb, input(INPUT_NAME), rows, cols, 1000, A_mb.getNonZeros()); + HDFSTool.writeMetaDataFile(input(INPUT_NAME + ".mtd"), Types.ValueType.FP64, + new MatrixCharacteristics(rows, cols, 1000, A_mb.getNonZeros()), Types.FileFormat.CSV); + + runTest(true, false, null, -1); + + //compare results + + // rerun without ooc flag + programArgs = new String[] {"-explain", "-stats", "-args", input(INPUT_NAME), + output(OUTPUT_NAME + "_target")}; + runTest(true, false, null, -1); + + // compare matrices + MatrixBlock ret1 = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME), Types.FileFormat.BINARY, rows, cols, 1000); + MatrixBlock ret2 = DataConverter.readMatrixFromHDFS(output(OUTPUT_NAME + "_target"), Types.FileFormat.BINARY, rows, cols, 1000); + TestUtils.compareMatrices(ret1, ret2, eps); + } + catch(IOException e) { + throw new RuntimeException(e); + } + finally { + resetExecMode(platformOld); + } + } +} diff --git a/src/test/scripts/functions/ooc/CSVReader.dml b/src/test/scripts/functions/ooc/CSVReader.dml new file mode 100644 index 00000000000..12e5b02cd0c --- /dev/null +++ b/src/test/scripts/functions/ooc/CSVReader.dml @@ -0,0 +1,23 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- +A = read($1); + +write(A, $2, format="binary"); From c0e87a40337141456225773a7d6cfa3e2234144d Mon Sep 17 00:00:00 2001 From: Jannik Lindemann Date: Tue, 11 Nov 2025 14:12:50 +0100 Subject: [PATCH 2/3] Add Missing License --- .../ooc/CSVReblockOOCInstruction.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/CSVReblockOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CSVReblockOOCInstruction.java index b7a2e72d3aa..0a70469f11c 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/CSVReblockOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CSVReblockOOCInstruction.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.sysds.runtime.instructions.ooc; import java.io.IOException; From 76da4d810a4970ec557d9c16f902cf634a5ee3eb Mon Sep 17 00:00:00 2001 From: Jannik Lindemann <52833175+janniklinde@users.noreply.github.com> Date: Fri, 14 Nov 2025 10:31:21 +0100 Subject: [PATCH 3/3] CSV Reader Simplification --- .../ooc/CSVReblockOOCInstruction.java | 553 +----------------- .../runtime/io/ReaderTextCSVParallel.java | 349 +++++++++-- 2 files changed, 313 insertions(+), 589 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/CSVReblockOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CSVReblockOOCInstruction.java index 0a70469f11c..a4f8c497050 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/CSVReblockOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/CSVReblockOOCInstruction.java @@ -19,39 +19,20 @@ package org.apache.sysds.runtime.instructions.ooc; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; import org.apache.sysds.common.Opcodes; -import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; -import org.apache.sysds.runtime.data.DenseBlock; import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.instructions.cp.CPOperand; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.io.FileFormatProperties; import org.apache.sysds.runtime.io.FileFormatPropertiesCSV; -import 
org.apache.sysds.runtime.io.IOUtilFunctions; -import org.apache.sysds.runtime.io.MatrixReader; -import org.apache.sysds.runtime.matrix.data.MatrixBlock; -import org.apache.sysds.runtime.matrix.data.MatrixIndexes; +import org.apache.sysds.runtime.io.ReaderTextCSVParallel; import org.apache.sysds.runtime.matrix.operators.Operator; import org.apache.sysds.runtime.meta.DataCharacteristics; -import org.apache.sysds.runtime.util.UtilFunctions; public class CSVReblockOOCInstruction extends ComputationOOCInstruction { - private static final int MAX_BLOCKS_IN_CACHE = 40; - private final int blen; private CSVReblockOOCInstruction(Operator op, CPOperand in, CPOperand out, int blocklength, String opcode, @@ -83,527 +64,25 @@ public void processInstruction(ExecutionContext ec) { addOutStream(qOut); FileFormatProperties props = min.getFileFormatProperties(); - FileFormatPropertiesCSV csvProps = props instanceof FileFormatPropertiesCSV ? (FileFormatPropertiesCSV) props : new FileFormatPropertiesCSV(); - - final Path path = new Path(min.getFileName()); - final JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - - try { - final FileSystem fs = IOUtilFunctions.getFileSystem(path, job); - MatrixReader.checkValidInputFile(fs, path); - - final List files = collectInputFiles(fs, path); - - if(files.size() == 1) { - submitOOCTask(() -> { - readCSVBlock(qOut, files.get(0), csvProps); - qOut.closeInput(); - }, qOut); - } - else { - submitOOCTask(() -> { - try(MultiFileBufferedSeekableInput in = new MultiFileBufferedSeekableInput(fs, files)) { - readCSVBlock(qOut, in, csvProps); - } - catch(IOException ioe) { - throw new DMLRuntimeException(ioe); - } - qOut.closeInput(); - }, qOut); - } - - MatrixObject mout = ec.getMatrixObject(output); - mout.setStreamHandle(qOut); - } - catch(IOException e) { - throw new DMLRuntimeException(e); - } - } - - private static List collectInputFiles(FileSystem fs, Path path) throws IOException { - if(!fs.getFileStatus(path).isDirectory()) - 
return Collections.singletonList(path); - - final List files = new ArrayList<>(); - for(FileStatus stat : fs.listStatus(path, IOUtilFunctions.hiddenFileFilter)) - files.add(stat.getPath()); - Collections.sort(files); - return files; - } - - private void readCSVBlock(OOCStream qOut, Path path, FileFormatPropertiesCSV props) { - final JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - - try { - final FileSystem fs = IOUtilFunctions.getFileSystem(path, job); - MatrixReader.checkValidInputFile(fs, path); - - if(props.getDelim().length() != 1) - throw new DMLRuntimeException("Can only read CSVs with single char delimiters"); - - try(FSDataInputStream rawIn = fs.open(path); BufferedSeekableInput in = new BufferedSeekableInput(rawIn)) { - readCSVBlock(qOut, in, props); - } - } - catch(IOException ex) { - throw new DMLRuntimeException(ex); - } - } - - private void readCSVBlock(OOCStream qOut, SeekableInput in, FileFormatPropertiesCSV props) - throws IOException { - final int delim = props.getDelim().charAt(0); - final ColumnInfo columnInfo = detectColumnInfo(in, delim, props.hasHeader()); - final int ncols = columnInfo.ncols; - - if(ncols <= 0) - return; - - in.seek(columnInfo.dataStart); - - final int segLenMax = Math.min(MAX_BLOCKS_IN_CACHE * blen, ncols); - final int segLen = Math.min(segLenMax, ncols); - final boolean fill = props.isFill(); - final double fillValue = props.getFillValue(); - final Set naStrings = props.getNAStrings(); - - final List segStartList = new ArrayList<>(); - MatrixBlock[] firstBlocks = null; - DenseBlock[] firstDense = null; - int rowsInBand = 0; - int brow = 0; - int rowOffset = 0; - - while(true) { - int next = peek(in); - if(next == -1) - break; - - if(rowsInBand == 0) { - firstBlocks = allocateBlocks(blen, 0, segLen, ncols); - firstDense = extractDense(firstBlocks); - } - - final long nextSegPos = fillFirstSegmentRow(in, firstDense, rowsInBand, segLen, ncols, delim, fill, - fillValue, naStrings); - 
segStartList.add(nextSegPos); - - rowsInBand++; - if(rowsInBand == blen) { - emitBlocks(qOut, firstBlocks, rowsInBand, brow, 0); - firstBlocks = null; - firstDense = null; - - fillFollowingSegments(qOut, in, segStartList.subList(rowOffset, rowOffset + rowsInBand), segLen, ncols, - brow, rowsInBand, blen, delim, fill, fillValue, naStrings); - - rowsInBand = 0; - brow++; - rowOffset += blen; - } - } - - // Process remainder - if(rowsInBand > 0) { - emitBlocks(qOut, firstBlocks, rowsInBand, brow, 0); - fillFollowingSegments(qOut, in, segStartList.subList(rowOffset, rowOffset + rowsInBand), segLen, ncols, - brow, rowsInBand, blen, delim, fill, fillValue, naStrings); - } - } - - private void fillFollowingSegments(OOCStream qOut, SeekableInput in, List segStartList, - int segLen, int ncols, int brow, int rowsInBand, int blen, int delim, boolean fill, double fillValue, - Set naStrings) throws IOException { - for(int c0 = segLen; c0 < ncols; c0 += segLen) { - final int seg = Math.min(segLen, ncols - c0); - final int firstBlockCol = c0 / blen; - - //final int rows = rowsInBand; - final MatrixBlock[] blocks = allocateBlocks(rowsInBand, firstBlockCol, seg, ncols); - final DenseBlock[] dense = extractDense(blocks); - - for(int i = 0; i < rowsInBand; i++) { - in.seek(segStartList.get(i)); - int col = c0; - int read = 0; - - while(read < seg) { - final Token token = parseToken(in, delim, fill, fillValue, naStrings); - final int bci = (col / blen) - firstBlockCol; - final int within = col % blen; - dense[bci].set(i, within, token.value); - if(token.term == delim) - in.read(); - read++; - col++; - } - - segStartList.set(i, in.getPos()); - skipRestOfLineFast(in); - } - - emitBlocks(qOut, blocks, rowsInBand, brow, firstBlockCol); - } - } - - private static ColumnInfo detectColumnInfo(SeekableInput in, int delim, boolean hasHeader) throws IOException { - in.seek(0); - if(hasHeader) - skipRestOfLineFast(in); - - final long dataStart = in.getPos(); - int ch; - int ncols = 0; - boolean 
seenToken = false; - while((ch = in.read()) != -1) { - if(ch == delim) { - ncols++; - seenToken = false; - } - else if(ch == '\n') { - if(seenToken || ncols > 0) - ncols++; - break; - } - else if(ch == '\r') { - if(consumeLF(in)) - ch = '\n'; - if(seenToken || ncols > 0) - ncols++; - break; - } - else { - seenToken = true; - } - } - - if(ch == -1 && (seenToken || ncols > 0)) - ncols++; - - in.seek(dataStart); - return new ColumnInfo(ncols, dataStart); - } - - private long fillFirstSegmentRow(SeekableInput in, DenseBlock[] denseBlocks, int rowOffset, int segLen, int ncols, - int delim, boolean fill, double fillValue, Set naStrings) throws IOException { - int col = 0; - long nextSegPos = -1; - - while(col < segLen) { - final Token token = parseToken(in, delim, fill, fillValue, naStrings); - final int bci = col / blen; - final int within = col % blen; - denseBlocks[bci].set(rowOffset, within, token.value); - col++; - - if(token.term == delim) { - in.read(); - if(col == segLen) - nextSegPos = in.getPos(); - continue; - } - - nextSegPos = in.getPos(); - break; - } - - if(nextSegPos < 0) - nextSegPos = in.getPos(); - - skipRestOfLineFast(in); - return nextSegPos; - } - - private MatrixBlock[] allocateBlocks(int rows, int firstBlockCol, int segLen, int ncols) { - final int c0 = firstBlockCol * blen; - final int lastBlockCol = (c0 + segLen - 1) / blen; - final int numBlocks = lastBlockCol - firstBlockCol + 1; - final MatrixBlock[] blocks = new MatrixBlock[numBlocks]; - - for(int bci = 0; bci < numBlocks; bci++) { - final int bcol = firstBlockCol + bci; - final int cStart = bcol * blen; - final int cEnd = Math.min(ncols, cStart + blen); - final MatrixBlock block = new MatrixBlock(rows, cEnd - cStart, false); - block.allocateDenseBlock(); - blocks[bci] = block; - } - - return blocks; - } - - private DenseBlock[] extractDense(MatrixBlock[] blocks) { - final DenseBlock[] dense = new DenseBlock[blocks.length]; - for(int i = 0; i < blocks.length; i++) { - final DenseBlock db = 
blocks[i].getDenseBlock(); - dense[i] = db; - } - return dense; - } - - private void emitBlocks(OOCStream qOut, MatrixBlock[] blocks, int rowsInBand, int brow, - int firstBlockCol) { - for(int bci = 0; bci < blocks.length; bci++) { - MatrixBlock block = blocks[bci]; - - if(block.getNumRows() != rowsInBand) - block = block.slice(0, rowsInBand - 1, 0, block.getNumColumns() - 1); - - block.recomputeNonZeros(); - block.examSparsity(); - final MatrixIndexes idx = new MatrixIndexes(brow + 1, firstBlockCol + bci + 1); - qOut.enqueue(new IndexedMatrixValue(idx, block)); - } - } - - private static Token parseToken(SeekableInput in, int delim, boolean fill, double fillValue, Set naStrings) - throws IOException { - int ch; - do { - ch = in.read(); - if(ch == -1) - throw new DMLRuntimeException("Unexpected EOF in CSV token"); - } - while(ch == ' ' || ch == '\t'); - - final StringBuilder buf = new StringBuilder(32); - while(ch != -1 && ch != delim && ch != '\n' && ch != '\r') { - buf.append((char) ch); - ch = in.read(); - } - if(ch != -1) - in.seek(in.getPos() - 1); - - int len = buf.length(); - while(len > 0 && (buf.charAt(len - 1) == ' ' || buf.charAt(len - 1) == '\t')) - buf.setLength(--len); - - final double value; - if(len == 0) { - if(fill) - value = fillValue; - else - throw new DMLRuntimeException("Empty value in CSV input"); - } - else { - value = UtilFunctions.parseToDouble(buf.toString(), naStrings); - } - - return new Token(value, ch); - } - - private static void skipRestOfLineFast(SeekableInput in) throws IOException { - int ch; - while((ch = in.read()) != -1) { - if(ch == '\n') - return; - if(ch == '\r') { - consumeLF(in); - return; - } - } - } - - private static boolean consumeLF(SeekableInput in) throws IOException { - final long pos = in.getPos(); - final int next = in.read(); - if(next == '\n') - return true; - if(next != -1) - in.seek(pos); - return false; - } - - private static int peek(SeekableInput in) throws IOException { - final long pos = in.getPos(); - 
final int ch = in.read(); - if(ch != -1) - in.seek(pos); - return ch; - } - - private interface SeekableInput extends AutoCloseable { - int read() throws IOException; - - void seek(long pos) throws IOException; - - long getPos(); - - @Override - void close() throws IOException; - } + final FileFormatPropertiesCSV csvProps = props instanceof FileFormatPropertiesCSV ? (FileFormatPropertiesCSV) props + : new FileFormatPropertiesCSV(); - private static final class MultiFileBufferedSeekableInput implements SeekableInput { - private final FileSystem fs; - private final List files; - private final long[] offsets; - private final long totalLength; - private final BufferedSeekableInput[] streams; + final ReaderTextCSVParallel reader = new ReaderTextCSVParallel(csvProps); + final String fileName = min.getFileName(); + final long rows = mc.getRows(); + final long cols = mc.getCols(); + final long nnz = mc.getNonZeros(); - private int currentIdx; - private BufferedSeekableInput current; - private long position; - - private MultiFileBufferedSeekableInput(FileSystem fs, List files) throws IOException { - if(files.isEmpty()) - throw new DMLRuntimeException("No CSV files to read"); - this.fs = fs; - this.files = files; - offsets = new long[files.size()]; - streams = new BufferedSeekableInput[files.size()]; - long offset = 0; - for(int i = 0; i < files.size(); i++) { - offsets[i] = offset; - offset += fs.getFileStatus(files.get(i)).getLen(); - } - totalLength = offset; - currentIdx = 0; - openIfNeeded(0); - current = streams[0]; - position = 0; - } - - private void openIfNeeded(int idx) throws IOException { - if(idx >= files.size()) - return; - - if(streams[idx] == null) - streams[idx] = new BufferedSeekableInput(fs.open(files.get(idx))); - } - - @Override - public int read() throws IOException { - if(current == null) - return -1; - int b = current.read(); - while(b == -1 && currentIdx + 1 < files.size()) { - currentIdx++; - openIfNeeded(currentIdx); - current = 
streams[currentIdx]; - current.seek(0); - b = current.read(); + submitOOCTask(() -> { + try { + reader.readMatrixAsStream(qOut, fileName, rows, cols, blen, nnz); } - if(b != -1) - position++; - return b; - } - - @Override - public void seek(long pos) throws IOException { - if(pos < 0 || pos > totalLength) - throw new IOException("Seek position out of range: " + pos); - if(pos == totalLength) { - currentIdx = files.size(); - current = null; - position = pos; - return; + catch(Exception ex) { + throw (ex instanceof DMLRuntimeException) ? (DMLRuntimeException) ex : new DMLRuntimeException(ex); } - int idx = findFile(pos); - openIfNeeded(idx); - currentIdx = idx; - current = streams[idx]; - current.seek(pos - offsets[idx]); - position = pos; - } - - @Override - public long getPos() { - return position; - } - - @Override - public void close() throws IOException { - for(BufferedSeekableInput stream : streams) { - if(stream != null) - stream.close(); - } - } - - private int findFile(long pos) { - for(int i = 0; i < offsets.length - 1; i++) { - if(pos < offsets[i + 1]) - return i; - } - return offsets.length - 1; - } - } - - private static final class BufferedSeekableInput implements SeekableInput { - private static final int BUF_SIZE = 4 * 1024; - - private final FSDataInputStream in; - private final byte[] buf = new byte[BUF_SIZE]; - private long bufStart = 0; - private int bufLen = 0; - private int bufPos = 0; - - private BufferedSeekableInput(FSDataInputStream in) { - this.in = in; - } - - @Override - public int read() throws IOException { - if(bufPos >= bufLen) { - if(!fill()) - return -1; - } - return buf[bufPos++] & 0xFF; - } - - private boolean fill() throws IOException { - bufStart = in.getPos(); - bufLen = in.read(buf, 0, BUF_SIZE); - if(bufLen <= 0) { - bufLen = 0; - bufPos = 0; - return false; - } - bufPos = 0; - return true; - } - - @Override - public void seek(long pos) throws IOException { - final long bufEnd = bufStart + bufLen; - if(pos >= bufStart && pos 
< bufEnd) { - bufPos = (int) (pos - bufStart); - } - else { - in.seek(pos); - bufStart = pos; - bufLen = 0; - bufPos = 0; - } - } - - @Override - public long getPos() { - return bufStart + bufPos; - } - - @Override - public void close() throws IOException { - in.close(); - } - } - - private static final class ColumnInfo { - private final int ncols; - private final long dataStart; - - private ColumnInfo(int ncols, long dataStart) { - this.ncols = ncols; - this.dataStart = dataStart; - } - } - - private static final class Token { - private final double value; - private final int term; + }, qOut); - private Token(double value, int term) { - this.value = value; - this.term = term; - } + MatrixObject mout = ec.getMatrixObject(output); + mout.setStreamHandle(qOut); } } diff --git a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java index 5b297a5d530..3e9a7a881a6 100644 --- a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java +++ b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java @@ -22,9 +22,12 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; @@ -43,8 +46,11 @@ import org.apache.sysds.runtime.data.DenseBlock; import org.apache.sysds.runtime.data.SparseBlock; import org.apache.sysds.runtime.data.SparseRow; +import org.apache.sysds.runtime.instructions.ooc.OOCStream; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.io.IOUtilFunctions.CountRowsTask; import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import 
org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.util.CommonThreadPool; import org.apache.sysds.runtime.util.UtilFunctions; @@ -65,6 +71,7 @@ public class ReaderTextCSVParallel extends MatrixReader { protected int _rLen; protected int _cLen; protected JobConf _job; + protected boolean _streamSparse = false; public ReaderTextCSVParallel(FileFormatPropertiesCSV props) { _numThreads = OptimizerUtils.getParallelTextReadParallelism(); @@ -97,7 +104,7 @@ public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int bl MatrixBlock ret = computeCSVSizeAndCreateOutputMatrixBlock(splits, path, rlen, clen, blen, estnnz); // Second Read Pass (read, parse strings, append to matrix block) - readCSVMatrixFromHDFS(splits, path, ret); + readCSVMatrixFromHDFS(splits, path, ret, null); // post-processing (representation-specific, change of sparse/dense block representation) // - no sorting required for CSV because it is read in sorted order per row @@ -112,6 +119,53 @@ public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int bl return ret; } + public MatrixBlock readMatrixAsStream(OOCStream outStream, String fname, long rlen, long clen, + int blen, long estnnz) throws IOException, DMLRuntimeException { + _bLen = blen; + + // prepare file access + _job = new JobConf(ConfigurationManager.getCachedJobConf()); + + Path path = new Path(fname); + FileSystem fs = IOUtilFunctions.getFileSystem(path, _job); + + FileInputFormat.addInputPath(_job, path); + TextInputFormat informat = new TextInputFormat(); + informat.configure(_job); + + InputSplit[] splits = informat.getSplits(_job, _numThreads); + splits = IOUtilFunctions.sortInputSplits(splits); + + // check existence and non-empty file + checkValidInputFile(fs, path); + + // count rows/cols to populate meta data and split offsets + long estnnz2; + ExecutorService pool = CommonThreadPool.get(_numThreads); + try { + estnnz2 = computeCSVSize(splits, path, rlen, clen, 
estnnz, pool); + } + catch(Exception e) { + throw new IOException("Thread pool Error " + e.getMessage(), e); + } + finally { + pool.shutdown(); + } + + _streamSparse = MatrixBlock.evalSparseFormatInMemory(_rLen, _cLen, estnnz2); + + // stream CSV into blen x blen blocks + try { + BlockBuffer buffer = new BlockBuffer(outStream, _streamSparse); + readCSVMatrixFromHDFS(splits, path, null, buffer); + buffer.flushRemaining(); + } + finally { + outStream.closeInput(); + } + return null; + } + @Override public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int blen, long estnnz) throws IOException, DMLRuntimeException { @@ -119,7 +173,8 @@ public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long cle return new ReaderTextCSV(_props).readMatrixFromInputStream(is, rlen, clen, blen, estnnz); } - private void readCSVMatrixFromHDFS(InputSplit[] splits, Path path, MatrixBlock dest) throws IOException { + private void readCSVMatrixFromHDFS(InputSplit[] splits, Path path, MatrixBlock dest, BlockBuffer streamBuffer) + throws IOException { FileInputFormat.addInputPath(_job, path); TextInputFormat informat = new TextInputFormat(); @@ -131,17 +186,19 @@ private void readCSVMatrixFromHDFS(InputSplit[] splits, Path path, MatrixBlock d // create read tasks for all splits ArrayList> tasks = new ArrayList<>(); int splitCount = 0; + final boolean sparseOut = (streamBuffer != null) ? 
streamBuffer.isSparseBlocks() : + dest.isInSparseFormat(); for(InputSplit split : splits) { - if(dest.isInSparseFormat() && _props.getNAStrings() != null) - tasks.add(new CSVReadSparseNanTask(split, informat, dest, splitCount++)); - else if(dest.isInSparseFormat() && _props.getFillValue() == 0) - tasks.add(new CSVReadSparseNoNanTaskAndFill(split, informat, dest, splitCount++)); - else if(dest.isInSparseFormat()) - tasks.add(new CSVReadSparseNoNanTask(split, informat, dest, splitCount++)); + if(sparseOut && _props.getNAStrings() != null) + tasks.add(new CSVReadSparseNanTask(split, informat, dest, splitCount++, streamBuffer)); + else if(sparseOut && _props.getFillValue() == 0) + tasks.add(new CSVReadSparseNoNanTaskAndFill(split, informat, dest, splitCount++, streamBuffer)); + else if(sparseOut) + tasks.add(new CSVReadSparseNoNanTask(split, informat, dest, splitCount++, streamBuffer)); else if(_props.getNAStrings() != null) - tasks.add(new CSVReadDenseNanTask(split, informat, dest, splitCount++)); + tasks.add(new CSVReadDenseNanTask(split, informat, dest, splitCount++, streamBuffer)); else - tasks.add(new CSVReadDenseNoNanTask(split, informat, dest, splitCount++)); + tasks.add(new CSVReadDenseNoNanTask(split, informat, dest, splitCount++, streamBuffer)); } // check return codes and aggregate nnz @@ -149,7 +206,8 @@ else if(_props.getNAStrings() != null) for(Future rt : pool.invokeAll(tasks)) lnnz += rt.get(); - dest.setNonZeros(lnnz); + if(dest != null) + dest.setNonZeros(lnnz); } catch(Exception e) { throw new IOException("Thread pool issue, while parallel read.", e); @@ -159,6 +217,7 @@ else if(_props.getNAStrings() != null) } } + private MatrixBlock computeCSVSizeAndCreateOutputMatrixBlock(InputSplit[] splits, Path path, long rlen, long clen, int blen, long estnnz) throws IOException, DMLRuntimeException { _rLen = 0; @@ -172,10 +231,29 @@ private MatrixBlock computeCSVSizeAndCreateOutputMatrixBlock(InputSplit[] splits Future ret = (rlen<0 || clen<0 || estnnz<0) ? 
null : pool.submit(() -> createOutputMatrixBlock(rlen, clen, blen, estnnz, true, true)); + long estnnz2 = computeCSVSize(splits, path, rlen, clen, estnnz, pool); + return (ret!=null) ? UtilFunctions.getSafe(ret) : + createOutputMatrixBlock(_rLen, _cLen, blen, estnnz2, true, true); + } + catch(Exception e) { + throw new IOException("Thread pool Error " + e.getMessage(), e); + } + finally{ + pool.shutdown(); + } + } + + private long computeCSVSize(InputSplit[] splits, + Path path, long rlen, long clen, long estnnz, ExecutorService pool) throws IOException { + _rLen = 0; + _cLen = 0; + + // count rows in parallel per split + try { FileInputFormat.addInputPath(_job, path); TextInputFormat informat = new TextInputFormat(); informat.configure(_job); - + // count number of entities in the first non-header row LongWritable key = new LongWritable(); Text oneLine = new Text(); @@ -196,7 +274,7 @@ private MatrixBlock computeCSVSizeAndCreateOutputMatrixBlock(InputSplit[] splits tasks.add(new CountRowsTask(split, informat, _job, hasHeader)); hasHeader = false; } - + // collect row counts for offset computation // early error notify in case not all tasks successful _offsets = new SplitOffsetInfos(tasks.size()); @@ -208,7 +286,7 @@ private MatrixBlock computeCSVSizeAndCreateOutputMatrixBlock(InputSplit[] splits _rLen = _rLen + lnrow; i++; } - + // robustness for wrong dimensions which are already compiled into the plan if((rlen != -1 && _rLen != rlen) || (clen != -1 && _cLen != clen)) { @@ -229,8 +307,7 @@ private MatrixBlock computeCSVSizeAndCreateOutputMatrixBlock(InputSplit[] splits // allocate target matrix block based on given size; // need to allocate sparse as well since lock-free insert into target long estnnz2 = (estnnz < 0) ? (long) _rLen * _cLen : estnnz; - return (ret!=null) ? 
UtilFunctions.getSafe(ret) : - createOutputMatrixBlock(_rLen, _cLen, blen, estnnz2, true, true); + return estnnz2; } catch(Exception e) { throw new IOException("Thread pool Error " + e.getMessage(), e); @@ -271,16 +348,19 @@ private abstract class CSVReadTask implements Callable { protected final InputSplit _split; protected final TextInputFormat _informat; protected final MatrixBlock _dest; + protected final BlockBuffer _streamBuffer; protected final boolean _isFirstSplit; protected final int _splitCount; protected int _row = 0; protected int _col = 0; - public CSVReadTask(InputSplit split, TextInputFormat informat, MatrixBlock dest, int splitCount) { + public CSVReadTask(InputSplit split, TextInputFormat informat, MatrixBlock dest, int splitCount, + BlockBuffer buffer) { _split = split; _informat = informat; _dest = dest; + _streamBuffer = buffer; _isFirstSplit = (splitCount == 0); _splitCount = splitCount; } @@ -335,24 +415,184 @@ protected void verifyRows(Text value) throws IOException { + value); } } + + protected void finishRow(int row) { + if(_streamBuffer != null) + _streamBuffer.finishRow(row); + } + } + + private interface RowWriter { + void set(int col, double value); + } + + private static class DenseRowWriter implements RowWriter { + private final double[] _vals; + private final int _pos; + + public DenseRowWriter(DenseBlock block, int row) { + _vals = block.values(row); + _pos = block.pos(row); + } + + @Override + public void set(int col, double value) { + _vals[_pos + col] = value; + } + } + + private static class SparseRowWriter implements RowWriter { + private final SparseRow _row; + + public SparseRowWriter(SparseBlock block, int row) { + block.allocate(row); + _row = block.get(row); + } + + @Override + public void set(int col, double value) { + _row.append(col, value); + } + } + + private class BlockBuffer { + private final OOCStream _stream; + private final boolean _sparseBlocks; + private final int _numBlockCols; + private final 
ConcurrentHashMap _states = new ConcurrentHashMap<>(); + + public BlockBuffer(OOCStream stream, boolean sparseBlocks) { + _stream = stream; + _sparseBlocks = sparseBlocks; + _numBlockCols = Math.max(1, (int) Math.ceil((double) _cLen / _bLen)); + } + + public boolean isSparseBlocks() { + return _sparseBlocks; + } + + public RowWriter getRowWriter(int row) { + int brow = row / _bLen; + BlockRowState state = _states.computeIfAbsent(brow, BlockRowState::new); + return state.createRowWriter(row % _bLen); + } + + public void finishRow(int row) { + int brow = row / _bLen; + BlockRowState state = _states.get(brow); + if(state != null && state.finishRow()) { + if(_states.remove(brow, state)) + state.flush(brow); + } + } + + public void flushRemaining() { + for(Map.Entry entry : _states.entrySet()) { + if(_states.remove(entry.getKey(), entry.getValue())) + entry.getValue().flush(entry.getKey()); + } + } + + private class StreamRowWriter implements RowWriter { + private final BlockRowState _state; + private final int _rowInBlock; + + public StreamRowWriter(BlockRowState state, int rowInBlock) { + _state = state; + _rowInBlock = rowInBlock; + } + + @Override + public void set(int col, double value) { + if(value == 0) + return; + int bcol = col / _bLen; + MatrixBlock block = _state.getOrCreateBlock(bcol); + int localCol = col % _bLen; + if(_sparseBlocks) { + SparseBlock sb = block.getSparseBlock(); + sb.allocate(_rowInBlock); + sb.get(_rowInBlock).append(localCol, value); + } + else { + DenseBlock db = block.getDenseBlock(); + double[] vals = db.values(_rowInBlock); + int pos = db.pos(_rowInBlock); + vals[pos + localCol] = value; + } + } + } + + private class BlockRowState { + private final MatrixBlock[] _blocks; + private final int _rowsInBlock; + private final AtomicInteger _rowsCompleted = new AtomicInteger(); + + public BlockRowState(int brow) { + _blocks = new MatrixBlock[_numBlockCols]; + _rowsInBlock = Math.min(_bLen, _rLen - brow * _bLen); + } + + public RowWriter 
createRowWriter(int rowInBlock) { + return new StreamRowWriter(this, rowInBlock); + } + + public boolean finishRow() { + return _rowsCompleted.incrementAndGet() == _rowsInBlock; + } + + public void flush(int brow) { + for(int bci = 0; bci < _blocks.length; bci++) { + MatrixBlock block = _blocks[bci]; + if(block == null) + continue; + block.recomputeNonZeros(); + if(block.getNonZeros() == 0) + continue; + block.examSparsity(); + MatrixIndexes idx = new MatrixIndexes(brow + 1, bci + 1); + _stream.enqueue(new IndexedMatrixValue(idx, block)); + } + } + + private MatrixBlock getOrCreateBlock(int bcol) { + MatrixBlock block = _blocks[bcol]; + if(block == null) { + synchronized(this) { + block = _blocks[bcol]; + if(block == null) { + int cols = Math.min(_bLen, _cLen - bcol * _bLen); + block = new MatrixBlock(_rowsInBlock, cols, _sparseBlocks); + if(_sparseBlocks) + block.allocateSparseRowsBlock(); + else + block.allocateDenseBlock(); + _blocks[bcol] = block; + } + } + } + return block; + } + } } private class CSVReadDenseNoNanTask extends CSVReadTask { - public CSVReadDenseNoNanTask(InputSplit split, TextInputFormat informat, MatrixBlock dest, int splitCount) { - super(split, informat, dest, splitCount); + public CSVReadDenseNoNanTask(InputSplit split, TextInputFormat informat, MatrixBlock dest, int splitCount, + BlockBuffer buffer) { + super(split, informat, dest, splitCount, buffer); } protected long parse(RecordReader reader, LongWritable key, Text value) throws IOException { - DenseBlock a = _dest.getDenseBlock(); + DenseBlock a = (_streamBuffer == null) ? _dest.getDenseBlock() : null; double cellValue = 0; long nnz = 0; boolean noFillEmpty = false; while(reader.next(key, value)) { // foreach line final String cellStr = value.toString().trim(); - double[] avals = a.values(_row); - int apos = a.pos(_row); + RowWriter rowWriter = (_streamBuffer != null) ? + _streamBuffer.getRowWriter(_row) : new DenseRowWriter(a, _row); final String[] parts = _cLen == 1 ? 
null : IOUtilFunctions.split(cellStr, _props.getDelim()); @@ -365,14 +605,14 @@ protected long parse(RecordReader reader, LongWritable key, else { cellValue = Double.parseDouble(part); } - if(cellValue != 0) { - avals[apos+j] = cellValue; + rowWriter.set(j, cellValue); + if(cellValue != 0) nnz++; - } } // sanity checks (number of columns, fill values) IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), noFillEmpty); IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen); + finishRow(_row); _row++; } @@ -383,20 +623,21 @@ protected long parse(RecordReader reader, LongWritable key, private class CSVReadDenseNanTask extends CSVReadTask { - public CSVReadDenseNanTask(InputSplit split, TextInputFormat informat, MatrixBlock dest, int splitCount) { - super(split, informat, dest, splitCount); + public CSVReadDenseNanTask(InputSplit split, TextInputFormat informat, MatrixBlock dest, int splitCount, + BlockBuffer buffer) { + super(split, informat, dest, splitCount, buffer); } protected long parse(RecordReader reader, LongWritable key, Text value) throws IOException { - DenseBlock a = _dest.getDenseBlock(); + DenseBlock a = (_streamBuffer == null) ? _dest.getDenseBlock() : null; double cellValue = 0; boolean noFillEmpty = false; long nnz = 0; while(reader.next(key, value)) { // foreach line String cellStr = value.toString().trim(); String[] parts = IOUtilFunctions.split(cellStr, _props.getDelim()); - double[] avals = a.values(_row); - int apos = a.pos(_row); + RowWriter rowWriter = (_streamBuffer != null) ? 
+ _streamBuffer.getRowWriter(_row) : new DenseRowWriter(a, _row); for(int j = 0; j < _cLen; j++) { // foreach cell String part = parts[j].trim(); if(part.isEmpty()) { @@ -406,14 +647,14 @@ protected long parse(RecordReader reader, LongWritable key, else cellValue = UtilFunctions.parseToDouble(part, _props.getNAStrings()); - if(cellValue != 0) { - avals[apos+j] = cellValue; + rowWriter.set(j, cellValue); + if(cellValue != 0) nnz++; - } } // sanity checks (number of columns, fill values) IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), noFillEmpty); IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen); + finishRow(_row); _row++; } return nnz; @@ -422,23 +663,23 @@ protected long parse(RecordReader reader, LongWritable key, private class CSVReadSparseNanTask extends CSVReadTask { - public CSVReadSparseNanTask(InputSplit split, TextInputFormat informat, MatrixBlock dest, int splitCount) { - super(split, informat, dest, splitCount); + public CSVReadSparseNanTask(InputSplit split, TextInputFormat informat, MatrixBlock dest, int splitCount, + BlockBuffer buffer) { + super(split, informat, dest, splitCount, buffer); } protected long parse(RecordReader reader, LongWritable key, Text value) throws IOException { boolean noFillEmpty = false; double cellValue = 0; - final SparseBlock sb = _dest.getSparseBlock(); + final SparseBlock sb = (_streamBuffer == null) ? _dest.getSparseBlock() : null; long nnz = 0; while(reader.next(key, value)) { final String cellStr = value.toString().trim(); final String[] parts = IOUtilFunctions.split(cellStr, _props.getDelim()); _col = 0; - sb.allocate(_row); - SparseRow r = sb.get(_row); - + RowWriter rowWriter = (_streamBuffer != null) ? 
+ _streamBuffer.getRowWriter(_row) : new SparseRowWriter(sb, _row); for(String part : parts) { part = part.trim(); if(part.isEmpty()) { @@ -450,7 +691,7 @@ protected long parse(RecordReader reader, LongWritable key, } if(cellValue != 0) { - r.append(_col, cellValue); + rowWriter.set(_col, cellValue); nnz++; } _col++; @@ -460,6 +701,7 @@ protected long parse(RecordReader reader, LongWritable key, IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), noFillEmpty); IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen); + finishRow(_row); _row++; } return nnz; @@ -467,12 +709,13 @@ protected long parse(RecordReader reader, LongWritable key, } private class CSVReadSparseNoNanTask extends CSVReadTask { - public CSVReadSparseNoNanTask(InputSplit split, TextInputFormat informat, MatrixBlock dest, int splitCount) { - super(split, informat, dest, splitCount); + public CSVReadSparseNoNanTask(InputSplit split, TextInputFormat informat, MatrixBlock dest, int splitCount, + BlockBuffer buffer) { + super(split, informat, dest, splitCount, buffer); } protected long parse(RecordReader reader, LongWritable key, Text value) throws IOException { - final SparseBlock sb = _dest.getSparseBlock(); + final SparseBlock sb = (_streamBuffer == null) ? _dest.getSparseBlock() : null; long nnz = 0; double cellValue = 0; boolean noFillEmpty = false; @@ -480,8 +723,8 @@ protected long parse(RecordReader reader, LongWritable key, _col = 0; final String cellStr = value.toString().trim(); final String[] parts = IOUtilFunctions.split(cellStr, _props.getDelim()); - sb.allocate(_row); - SparseRow r = sb.get(_row); + RowWriter rowWriter = (_streamBuffer != null) ? 
+ _streamBuffer.getRowWriter(_row) : new SparseRowWriter(sb, _row); for(String part : parts) { part = part.trim(); if(part.isEmpty()) { @@ -493,7 +736,7 @@ protected long parse(RecordReader reader, LongWritable key, } if(cellValue != 0) { - r.append(_col, cellValue); + rowWriter.set(_col, cellValue); nnz++; } _col++; @@ -503,6 +746,7 @@ protected long parse(RecordReader reader, LongWritable key, IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), noFillEmpty); IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen); + finishRow(_row); _row++; } return nnz; @@ -511,25 +755,25 @@ protected long parse(RecordReader reader, LongWritable key, private class CSVReadSparseNoNanTaskAndFill extends CSVReadTask { public CSVReadSparseNoNanTaskAndFill(InputSplit split, TextInputFormat informat, MatrixBlock dest, - int splitCount) { - super(split, informat, dest, splitCount); + int splitCount, BlockBuffer buffer) { + super(split, informat, dest, splitCount, buffer); } protected long parse(RecordReader reader, LongWritable key, Text value) throws IOException { - final SparseBlock sb = _dest.getSparseBlock(); + final SparseBlock sb = (_streamBuffer == null) ? _dest.getSparseBlock() : null; long nnz = 0; double cellValue = 0; while(reader.next(key, value)) { _col = 0; final String cellStr = value.toString().trim(); final String[] parts = IOUtilFunctions.split(cellStr, _props.getDelim()); - sb.allocate(_row); - SparseRow r = sb.get(_row); + RowWriter rowWriter = (_streamBuffer != null) ? + _streamBuffer.getRowWriter(_row) : new SparseRowWriter(sb, _row); for(String part : parts) { if(!part.isEmpty()) { cellValue = Double.parseDouble(part); if(cellValue != 0) { - r.append(_col, cellValue); + rowWriter.set(_col, cellValue); nnz++; } } @@ -538,6 +782,7 @@ protected long parse(RecordReader reader, LongWritable key, IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen); + finishRow(_row); _row++; } return nnz;