diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java index 3a79443157b..d0ba5363939 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java @@ -59,8 +59,8 @@ import org.apache.sysds.runtime.compress.lib.CLALibMatrixMult; import org.apache.sysds.runtime.compress.lib.CLALibMerge; import org.apache.sysds.runtime.compress.lib.CLALibRemoveEmpty; -import org.apache.sysds.runtime.compress.lib.CLALibReplace; import org.apache.sysds.runtime.compress.lib.CLALibReorg; +import org.apache.sysds.runtime.compress.lib.CLALibReplace; import org.apache.sysds.runtime.compress.lib.CLALibReshape; import org.apache.sysds.runtime.compress.lib.CLALibRexpand; import org.apache.sysds.runtime.compress.lib.CLALibScalar; @@ -103,6 +103,7 @@ import org.apache.sysds.runtime.util.IndexRange; import org.apache.sysds.utils.DMLCompressionStatistics; import org.apache.sysds.utils.stats.InfrastructureAnalyzer; +import org.apache.sysds.utils.stats.Timing; public class CompressedMatrixBlock extends MatrixBlock { private static final Log LOG = LogFactory.getLog(CompressedMatrixBlock.class.getName()); @@ -477,16 +478,20 @@ public void readFields(DataInput in) throws IOException { } public static CompressedMatrixBlock read(DataInput in) throws IOException { + Timing t = new Timing(); int rlen = in.readInt(); int clen = in.readInt(); long nonZeros = in.readLong(); boolean overlappingColGroups = in.readBoolean(); List groups = ColGroupIO.readGroups(in, rlen); - return new CompressedMatrixBlock(rlen, clen, nonZeros, overlappingColGroups, groups); + CompressedMatrixBlock ret = new CompressedMatrixBlock(rlen, clen, nonZeros, overlappingColGroups, groups); + LOG.debug("Compressed read serialization time: " + t.stop()); + return ret; } @Override public void write(DataOutput out) throws IOException { + Timing t = new Timing(); final long estimateUncompressed = nonZeros > 0 ? MatrixBlock.estimateSizeOnDisk(rlen, clen, nonZeros) : Long.MAX_VALUE; final long estDisk = nonZeros > 0 ? getExactSizeOnDisk() : Long.MAX_VALUE; @@ -514,6 +519,7 @@ public void write(DataOutput out) throws IOException { out.writeLong(nonZeros); out.writeBoolean(overlappingColGroups); ColGroupIO.writeGroups(out, _colGroups); + LOG.debug("Compressed write serialization time: " + t.stop()); } /** @@ -613,16 +619,7 @@ public MatrixBlock aggregateUnaryOperations(AggregateUnaryOperator op, MatrixVal public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype, int k) { // check for transpose type if(tstype == MMTSJType.LEFT) { - if(isEmpty()) - return new MatrixBlock(clen, clen, true); - // create output matrix block - if(out == null) - out = new MatrixBlock(clen, clen, false); - else - out.reset(clen, clen, false); - out.allocateDenseBlock(); - CLALibTSMM.leftMultByTransposeSelf(this, out, k); - return out; + return CLALibTSMM.leftMultByTransposeSelf(this, out, k); } else { throw new DMLRuntimeException("Invalid MMTSJ type '" + tstype.toString() + "'."); diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java index 4c48effb4df..7eae9ca0a7e 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java @@ -64,6 +64,9 @@ public class CompressedMatrixBlockFactory { private static final Log LOG = LogFactory.getLog(CompressedMatrixBlockFactory.class.getName()); + /** Global lock serializing all async compressions to bound concurrent compression memory/CPU. */ + private static final Object asyncCompressLock = new Object(); + /** Timing object to measure the time of each phase in the compression */ private final Timing time = new Timing(true); /** Compression statistics gathered throughout the compression */ @@ -181,21 +184,23 @@ public static Future compressAsync(ExecutionContext ec, String varName) { } public static Future compressAsync(ExecutionContext ec, String varName, InstructionTypeCounter ins) { - LOG.debug("Compressing Async"); final ExecutorService pool = CommonThreadPool.get(); // We have to guarantee that a thread pool is allocated. return CompletableFuture.runAsync(() -> { // method call or code to be async try { CacheableData data = ec.getCacheableData(varName); - if(data instanceof MatrixObject) { - MatrixObject mo = (MatrixObject) data; - MatrixBlock mb = mo.acquireReadAndRelease(); - MatrixBlock mbc = CompressedMatrixBlockFactory.compress(mo.acquireReadAndRelease(), ins).getLeft(); - if(mbc instanceof CompressedMatrixBlock) { - ExecutionContext.createCacheableData(mb); - mo.acquireModify(mbc); - mo.release(); - mbc.sum(); // calculate sum to forcefully materialize counts + synchronized(asyncCompressLock) { // global lock: serialize all async compressions (not per-matrix) + if(data instanceof MatrixObject) { + LOG.debug("Compressing Async"); + MatrixObject mo = (MatrixObject) data; + MatrixBlock mb = mo.acquireReadAndRelease(); + MatrixBlock mbc = CompressedMatrixBlockFactory.compress(mb, ins).getLeft(); + if(mbc instanceof CompressedMatrixBlock) { + ExecutionContext.createCacheableData(mb); + mo.acquireModify(mbc); + mo.release(); + mbc.sum(); // calculate sum to forcefully materialize counts + } } } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java index ce52bcd23fd..d981ab87838 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java @@ -48,6 +48,7 @@ import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData; import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory; import org.apache.sysds.runtime.compress.colgroup.offset.AIterator; +import org.apache.sysds.runtime.compress.utils.HashMapIntToInt; import org.apache.sysds.runtime.data.DenseBlock; import org.apache.sysds.runtime.data.DenseBlockFP64; import org.apache.sysds.runtime.data.SparseBlock; @@ -55,7 +56,6 @@ import org.apache.sysds.runtime.data.SparseRow; import org.apache.sysds.runtime.data.SparseRowScalar; import org.apache.sysds.runtime.data.SparseRowVector; -import org.apache.sysds.runtime.frame.data.columns.HashMapToInt; import org.apache.sysds.runtime.functionobjects.Divide; import org.apache.sysds.runtime.functionobjects.Minus; import org.apache.sysds.runtime.functionobjects.Multiply; @@ -77,7 +77,7 @@ public final class CLALibBinaryCellOp { private static final Log LOG = LogFactory.getLog(CLALibBinaryCellOp.class.getName()); - public static final int DECOMPRESSION_BLEN = 16384; + public static final int DECOMPRESSION_BLEN = 8192; private CLALibBinaryCellOp() { // empty private constructor. @@ -86,7 +86,7 @@ private CLALibBinaryCellOp() { public static MatrixBlock binaryOperationsRight(BinaryOperator op, CompressedMatrixBlock m1, MatrixBlock that) { try { - op = LibMatrixBincell.replaceOpWithSparseSafeIfApplicable(m1, that, op); + op = LibMatrixBincell.replaceOpWithSparseSafeIfApplicable(m1, that, op); if((that.getNumRows() == 1 && that.getNumColumns() == 1) || that.isEmpty()) { ScalarOperator sop = new RightScalarOperator(op.fn, that.get(0, 0), op.getNumThreads()); @@ -113,7 +113,7 @@ public static MatrixBlock binaryOperationsLeft(BinaryOperator op, CompressedMatr return selectProcessingBasedOnAccessType(op, m1, that, atype, true); } catch(Exception e) { - throw new DMLRuntimeException("Failed Left Binary Compressed Operation", e); + throw new DMLRuntimeException("Failed Left Binary Compressed Operation: " + op, e); } } @@ -122,8 +122,8 @@ private static MatrixBlock binaryOperationsRightFiltered(BinaryOperator op, Comp BinaryAccessType atype = LibMatrixBincell.getBinaryAccessTypeExtended(m1, that); if(isDoubleCompressedOpApplicable(m1, that)) return doubleCompressedBinaryOp(op, m1, (CompressedMatrixBlock) that); - if(that instanceof CompressedMatrixBlock && that.getNumColumns() == m1.getNumColumns() - && that.getInMemorySize() < m1.getInMemorySize() ) { + if(that instanceof CompressedMatrixBlock && that.getNumColumns() == m1.getNumColumns() && + that.getInMemorySize() < m1.getInMemorySize()) { MatrixBlock m1uc = CompressedMatrixBlock.getUncompressed(m1, "Decompressing left side in BinaryOps"); return selectProcessingBasedOnAccessType(op, (CompressedMatrixBlock) that, m1uc, atype, true); } @@ -135,16 +135,15 @@ private static MatrixBlock binaryOperationsRightFiltered(BinaryOperator op, Comp } private static boolean isDoubleCompressedOpApplicable(CompressedMatrixBlock m1, MatrixBlock that) { - return that instanceof CompressedMatrixBlock - && !m1.isOverlapping() - && m1.getColGroups().get(0) instanceof ColGroupDDC - && !((CompressedMatrixBlock) that).isOverlapping() - && ((CompressedMatrixBlock) that).getColGroups().get(0) instanceof ColGroupDDC - && ((IMapToDataGroup) m1.getColGroups().get(0)).getMapToData() == - ((IMapToDataGroup) ((CompressedMatrixBlock) that).getColGroups().get(0)).getMapToData(); + return that instanceof CompressedMatrixBlock && !m1.isOverlapping() && + m1.getColGroups().get(0) instanceof ColGroupDDC && !((CompressedMatrixBlock) that).isOverlapping() && + ((CompressedMatrixBlock) that).getColGroups().get(0) instanceof ColGroupDDC && + ((IMapToDataGroup) m1.getColGroups().get(0)) + .getMapToData() == ((IMapToDataGroup) ((CompressedMatrixBlock) that).getColGroups().get(0)).getMapToData(); } - private static CompressedMatrixBlock doubleCompressedBinaryOp(BinaryOperator op, CompressedMatrixBlock m1, CompressedMatrixBlock m2) { + private static CompressedMatrixBlock doubleCompressedBinaryOp(BinaryOperator op, CompressedMatrixBlock m1, + CompressedMatrixBlock m2) { LOG.debug("Double Compressed BinaryOp"); AColGroup left = m1.getColGroups().get(0); AColGroup right = m2.getColGroups().get(0); @@ -201,6 +200,7 @@ private static MatrixBlock mvCol(BinaryOperator op, CompressedMatrixBlock m1, Ma // Column vector access MatrixBlock d_compressed = m1.getCachedDecompressed(); if(d_compressed != null) { + LOG.debug("Using cached decompressed for Matrix column vector compressed operation"); if(left) throw new NotImplementedException("Binary row op left is not supported for Uncompressed Matrix, " + "Implement support for VMr in MatrixBlock Binary Cell operations"); @@ -416,17 +416,29 @@ private static MatrixBlock mvColCompressed(CompressedMatrixBlock m1, MatrixBlock Pair tuple = evaluateSparsityMVCol(m1, m2, op, left); double estSparsity = tuple.getKey(); double estNnzPerRow = tuple.getValue(); - boolean shouldBeSparseOut = MatrixBlock.evalSparseFormatInMemory(nRows, nCols, (long) (estSparsity * nRows * nCols)); + boolean shouldBeSparseOut = MatrixBlock.evalSparseFormatInMemory(nRows, nCols, + (long) (estSparsity * nRows * nCols)); // currently also jump into that case if estNnzPerRow == 0 - if(estNnzPerRow <= 2 && nCols <= 31 && op.fn instanceof ValueComparisonFunction){ - return k <= 1 ? binaryMVComparisonColSingleThreadCompressed(m1, m2, op, left) : - binaryMVComparisonColMultiCompressed(m1, m2, op, left); + if(estNnzPerRow <= 2 && nCols <= 31 && op.fn instanceof ValueComparisonFunction) { + return k <= 1 ? binaryMVComparisonColSingleThreadCompressed(m1, m2, op, + left) : binaryMVComparisonColMultiCompressed(m1, m2, op, left); } MatrixBlock ret = new MatrixBlock(nRows, nCols, shouldBeSparseOut, -1).allocateBlock(); if(shouldBeSparseOut) { - if(k <= 1) + // The sparse-sparse path only visits the stored non-zeros of m1, so it is correct only when the operation + // maps a zero in m1 to a zero output for every value of the vector m2 (i.e. it does not introduce non-zeros + // from m1's zeros). Otherwise fall back to the dense-scan sparse path that evaluates every cell. + final boolean sparseSafeOnM1Zeros = left ? op.isRowSafeLeft(m2) : op.isRowSafeRight(m2); + if(sparseSafeOnM1Zeros && !m1.isOverlapping() && + MatrixBlock.evalSparseFormatInMemory(nRows, nCols, m1.getNonZeros())) { + if(k <= 1) + nnz = binaryMVColSingleThreadSparseSparse(m1, m2, op, left, ret); + else + nnz = binaryMVColMultiThreadSparseSparse(m1, m2, op, left, ret); + } + else if(k <= 1) nnz = binaryMVColSingleThreadSparse(m1, m2, op, left, ret); else nnz = binaryMVColMultiThreadSparse(m1, m2, op, left, ret); @@ -438,7 +450,7 @@ private static MatrixBlock mvColCompressed(CompressedMatrixBlock m1, MatrixBlock nnz = binaryMVColMultiThreadDense(m1, m2, op, left, ret); } - if(op.fn instanceof ValueComparisonFunction) { + if(op.fn instanceof ValueComparisonFunction) { // potentially empty or filled. if(nnz == (long) nRows * nCols)// all was 1 return CompressedMatrixBlockFactory.createConstant(nRows, nCols, 1.0); else if(nnz == 0) // all was 0 -> return empty. @@ -452,19 +464,19 @@ else if(nnz == 0) // all was 0 -> return empty. } private static MatrixBlock binaryMVComparisonColSingleThreadCompressed(CompressedMatrixBlock m1, MatrixBlock m2, - BinaryOperator op, boolean left) { + BinaryOperator op, boolean left) { final int nRows = m1.getNumRows(); final int nCols = m1.getNumColumns(); // get indicators (one-hot-encoded comparison results) - BinaryMVColTaskCompressed task = new BinaryMVColTaskCompressed(m1, m2, 0, nRows, op, left); + BinaryMVColTaskCompressed task = new BinaryMVColTaskCompressed(m1, m2, 0, nRows, op, left); long nnz = task.call(); int[] indicators = task._ret; // map each unique indicator to an index - HashMapToInt hm = new HashMapToInt<>(nCols*3); + HashMapIntToInt hm = new HashMapIntToInt(nCols * 3); int[] colMap = new int[nRows]; - for(int i = 0; i < m1.getNumRows(); i++){ + for(int i = 0; i < m1.getNumRows(); i++) { int nextId = hm.size(); int id = hm.putIfAbsentI(indicators[i], nextId); colMap[i] = id == -1 ? nextId : id; @@ -477,37 +489,39 @@ private static MatrixBlock binaryMVComparisonColSingleThreadCompressed(Compresse return getCompressedMatrixBlock(m1, colMap, hm.size(), outMb, nRows, nCols, nnz); } - private static void fillSparseBlockFromIndicatorFromIndicatorInt(int numCol, Integer indicator, Integer rix, SparseBlockMCSR out) { + private static void fillSparseBlockFromIndicatorFromIndicatorInt(int numCol, Integer indicator, Integer rix, + SparseBlockMCSR out) { ArrayList colIndices = new ArrayList<>(8); - for (int c = numCol - 1; c >= 0; c--) { + for(int c = numCol - 1; c >= 0; c--) { if(indicator <= 0) break; - if(indicator % 2 == 1){ + if(indicator % 2 == 1) { colIndices.add(c); } indicator = indicator >> 1; } SparseRow row = null; - if(colIndices.size() > 1){ + if(colIndices.size() > 1) { double[] vals = new double[colIndices.size()]; Arrays.fill(vals, 1); int[] indices = new int[colIndices.size()]; - for (int i = 0, j = colIndices.size() - 1; i < colIndices.size(); i++, j--) + for(int i = 0, j = colIndices.size() - 1; i < colIndices.size(); i++, j--) indices[i] = colIndices.get(j); row = new SparseRowVector(vals, indices); - } else if(colIndices.size() == 1){ + } + else if(colIndices.size() == 1) { row = new SparseRowScalar(colIndices.get(0), 1.0); } out.set(rix, row, false); } private static MatrixBlock binaryMVComparisonColMultiCompressed(CompressedMatrixBlock m1, MatrixBlock m2, - BinaryOperator op, boolean left) throws Exception { + BinaryOperator op, boolean left) throws Exception { final int nRows = m1.getNumRows(); final int nCols = m1.getNumColumns(); final int k = op.getNumThreads(); - final int blkz = nRows / k; + final int blkz = Math.max((nRows + k) / k, 1000); // get indicators (one-hot-encoded comparison results) long nnz = 0; @@ -518,14 +532,11 @@ private static MatrixBlock binaryMVComparisonColMultiCompressed(CompressedMatrix tasks.add(new BinaryMVColTaskCompressed(m1, m2, i, Math.min(nRows, i + blkz), op, left)); } List> futures = pool.invokeAll(tasks); - HashMapToInt hm = new HashMapToInt<>(nCols*2); + HashMapIntToInt hm = new HashMapIntToInt(nCols * 2); int[] colMap = new int[nRows]; - for(Future f : futures) - nnz += f.get(); - // map each unique indicator to an index - mergeMVColTaskResults(tasks, blkz, hm, colMap); + nnz = mergeMVColTaskResults(futures, tasks, blkz, hm, colMap); // decode the unique indicator ints to SparseVectors MatrixBlock outMb = getMCSRMatrixBlock(hm, nCols); @@ -539,48 +550,53 @@ private static MatrixBlock binaryMVComparisonColMultiCompressed(CompressedMatrix } - private static void mergeMVColTaskResults(ArrayList tasks, int blkz, HashMapToInt hm, int[] colMap) { - + private static long mergeMVColTaskResults(List> futures, ArrayList tasks, + int blkz, HashMapIntToInt hm, int[] colMap) throws InterruptedException, ExecutionException { + long nnz = 0; for(int j = 0; j < tasks.size(); j++) { + nnz += futures.get(j).get(); // ensure task was finished. int[] indicators = tasks.get(j)._ret; - int offset = j* blkz; - - final int remainders = indicators.length % 8; - final int endVecLen = indicators.length - remainders; - for (int i = 0; i < endVecLen; i+= 8) { - colMap[offset + i] = hm.putIfAbsentReturnVal(indicators[i], hm.size()); - colMap[offset + i + 1] = hm.putIfAbsentReturnVal(indicators[i + 1], hm.size()); - colMap[offset + i + 2] = hm.putIfAbsentReturnVal(indicators[i + 2], hm.size()); - colMap[offset + i + 3] = hm.putIfAbsentReturnVal(indicators[i + 3], hm.size()); - colMap[offset + i + 4] = hm.putIfAbsentReturnVal(indicators[i + 4], hm.size()); - colMap[offset + i + 5] = hm.putIfAbsentReturnVal(indicators[i + 5], hm.size()); - colMap[offset + i + 6] = hm.putIfAbsentReturnVal(indicators[i + 6], hm.size()); - colMap[offset + i + 7] = hm.putIfAbsentReturnVal(indicators[i + 7], hm.size()); + int offset = j * blkz; - } - for (int i = 0; i < remainders; i++) { - colMap[offset + endVecLen + i] = hm.putIfAbsentReturnVal(indicators[endVecLen + i], hm.size()); - } + mergeMVColUnrolled(hm, colMap, indicators, offset); } + return nnz; } + private static void mergeMVColUnrolled(HashMapIntToInt hm, int[] colMap, int[] indicators, int offset) { + final int remainders = indicators.length % 8; + final int endVecLen = indicators.length - remainders; + for(int i = 0; i < endVecLen; i += 8) { + colMap[offset + i] = hm.putIfAbsentReturnVal(indicators[i], hm.size()); + colMap[offset + i + 1] = hm.putIfAbsentReturnVal(indicators[i + 1], hm.size()); + colMap[offset + i + 2] = hm.putIfAbsentReturnVal(indicators[i + 2], hm.size()); + colMap[offset + i + 3] = hm.putIfAbsentReturnVal(indicators[i + 3], hm.size()); + colMap[offset + i + 4] = hm.putIfAbsentReturnVal(indicators[i + 4], hm.size()); + colMap[offset + i + 5] = hm.putIfAbsentReturnVal(indicators[i + 5], hm.size()); + colMap[offset + i + 6] = hm.putIfAbsentReturnVal(indicators[i + 6], hm.size()); + colMap[offset + i + 7] = hm.putIfAbsentReturnVal(indicators[i + 7], hm.size()); + + } + for(int i = 0; i < remainders; i++) { + colMap[offset + endVecLen + i] = hm.putIfAbsentReturnVal(indicators[endVecLen + i], hm.size()); + } + } - private static CompressedMatrixBlock getCompressedMatrixBlock(CompressedMatrixBlock m1, int[] colMap, - int mapSize, MatrixBlock outMb, int nRows, int nCols, long nnz) { + private static CompressedMatrixBlock getCompressedMatrixBlock(CompressedMatrixBlock m1, int[] colMap, int mapSize, + MatrixBlock outMb, int nRows, int nCols, long nnz) { final IColIndex i = ColIndexFactory.create(0, m1.getNumColumns()); final AMapToData map = MapToFactory.create(m1.getNumRows(), colMap, mapSize); final AColGroup rgroup = ColGroupDDC.create(i, MatrixBlockDictionary.create(outMb), map, null); final ArrayList groups = new ArrayList<>(1); groups.add(rgroup); - return new CompressedMatrixBlock(nRows, nCols, nnz, false, groups); + return new CompressedMatrixBlock(nRows, nCols, nnz, false, groups); } - private static MatrixBlock getMCSRMatrixBlock(HashMapToInt hm, int nCols) { + private static MatrixBlock getMCSRMatrixBlock(HashMapIntToInt hm, int nCols) { // decode the unique indicator ints to SparseVectors SparseBlockMCSR out = new SparseBlockMCSR(hm.size()); - hm.forEach((indicator, rix) -> - fillSparseBlockFromIndicatorFromIndicatorInt(nCols, indicator, rix, out)); - return new MatrixBlock(hm.size(), nCols, -1, out); + hm.forEach((indicator, rix) -> fillSparseBlockFromIndicatorFromIndicatorInt(nCols, indicator, rix, out)); + return new MatrixBlock(hm.size(), nCols, -1, out); } private static long binaryMVColSingleThreadDense(CompressedMatrixBlock m1, MatrixBlock m2, BinaryOperator op, @@ -599,6 +615,14 @@ private static long binaryMVColSingleThreadSparse(CompressedMatrixBlock m1, Matr return nnz; } + private static long binaryMVColSingleThreadSparseSparse(CompressedMatrixBlock m1, MatrixBlock m2, BinaryOperator op, + boolean left, MatrixBlock ret) { + final int nRows = m1.getNumRows(); + long nnz = 0; + nnz += new BinaryMVColTaskSparseSparse(m1, m2, ret, 0, nRows, op, left).call(); + return nnz; + } + private static long binaryMVColMultiThreadDense(CompressedMatrixBlock m1, MatrixBlock m2, BinaryOperator op, boolean left, MatrixBlock ret) throws Exception { final int nRows = m1.getNumRows(); @@ -641,6 +665,27 @@ private static long binaryMVColMultiThreadSparse(CompressedMatrixBlock m1, Matri return nnz; } + private static long binaryMVColMultiThreadSparseSparse(CompressedMatrixBlock m1, MatrixBlock m2, BinaryOperator op, + boolean left, MatrixBlock ret) throws Exception { + final int nRows = m1.getNumRows(); + final int k = op.getNumThreads(); + final int blkz = Math.max(nRows / k, 64); + long nnz = 0; + final ExecutorService pool = CommonThreadPool.get(op.getNumThreads()); + try { + final ArrayList> tasks = new ArrayList<>(); + for(int i = 0; i < nRows; i += blkz) { + tasks.add(new BinaryMVColTaskSparseSparse(m1, m2, ret, i, Math.min(nRows, i + blkz), op, left)); + } + for(Future f : pool.invokeAll(tasks)) + nnz += f.get(); + } + finally { + pool.shutdown(); + } + return nnz; + } + private static MatrixBlock mmCompressed(CompressedMatrixBlock m1, MatrixBlock m2, BinaryOperator op, boolean left) throws Exception { final int nCols = m1.getNumColumns(); @@ -724,8 +769,8 @@ private static class BinaryMVColTaskCompressed implements Callable { private MatrixBlock tmp; - protected BinaryMVColTaskCompressed(CompressedMatrixBlock m1, MatrixBlock m2, int rl, int ru, - BinaryOperator op, boolean left) { + protected BinaryMVColTaskCompressed(CompressedMatrixBlock m1, MatrixBlock m2, int rl, int ru, BinaryOperator op, + boolean left) { _m1 = m1; _m2 = m2; _op = op; @@ -738,21 +783,21 @@ protected BinaryMVColTaskCompressed(CompressedMatrixBlock m1, MatrixBlock m2, in @Override public Long call() { - tmp = allocateTempUncompressedBlock(_m1.getNumColumns()); - final int _blklen = tmp.getNumRows(); + final int _blklen = Math.max(DECOMPRESSION_BLEN / _m1.getNumColumns(), 64); + tmp = allocateTempUncompressedBlock(_blklen, _m1.getNumColumns()); final List groups = _m1.getColGroups(); final AIterator[] its = getIterators(groups, _rl); long nnz = 0; if(!_left) - for (int rl = _rl, retIxOff = 0; rl < _ru; rl += _blklen, retIxOff += _blklen){ + for(int rl = _rl, retIxOff = 0; rl < _ru; rl += _blklen, retIxOff += _blklen) { int ru = Math.min(rl + _blklen, _ru); decompressToTmpBlock(rl, ru, tmp.getDenseBlock(), groups, its); nnz += processDense(rl, ru, retIxOff); tmp.reset(); } else - for (int rl = _rl, retIxOff = 0; rl < _ru; rl += _blklen, retIxOff += _blklen){ + for(int rl = _rl, retIxOff = 0; rl < _ru; rl += _blklen, retIxOff += _blklen) { int ru = Math.min(rl + _blklen, _ru); decompressToTmpBlock(rl, ru, tmp.getDenseBlock(), groups, its); nnz += processDenseLeft(rl, ru, retIxOff); @@ -770,15 +815,21 @@ private final long processDense(final int rl, final int ru, final int retIxOffse for(int row = rl, retIx = retIxOffset; row < ru; row++, retIx++) { final double vr = _m2Dense[row]; final int tmpOff = (row - rl) * nCol; - int indicatorVector = 0; - for(int col = 0; col < nCol; col++) { - indicatorVector = indicatorVector << 1; - int indicator = _compFn.compare(_tmpDense[tmpOff + col], vr) ? 1 : 0; - indicatorVector += indicator; - nnz += indicator; - } - _ret[retIx] = indicatorVector; + nnz = processRow(nCol, _tmpDense, nnz, retIx, vr, tmpOff); + } + return nnz; + } + + private final long processRow(final int nCol, final double[] _tmpDense, long nnz, int retIx, final double vr, + final int tmpOff) { + int indicatorVector = 0; + for(int col = tmpOff; col < nCol + tmpOff; col++) { + indicatorVector = indicatorVector << 1; + int indicator = _compFn.compare(_tmpDense[col], vr) ? 1 : 0; + indicatorVector += indicator; + nnz += indicator; } + _ret[retIx] = indicatorVector; return nnz; } @@ -847,7 +898,8 @@ private final void processBlock(final int rl, final int ru, final List groups, final AIterator[] its) { + private final void processBlockLeft(final int rl, final int ru, final List groups, + final AIterator[] its) { // unsafe decompress, since we count nonzeros afterwards. final DenseBlock db = _ret.getDenseBlock(); decompressToSubBlock(rl, ru, db, groups, its); @@ -887,7 +939,7 @@ private void processRow(final int ncol, final double[] ret, final int posR, fina private void processRowLeft(final int ncol, final double[] ret, final int posR, final double vr) { for(int col = 0; col < ncol; col++) - ret[posR + col] = _op.fn.execute(vr,ret[posR + col]); + ret[posR + col] = _op.fn.execute(vr, ret[posR + col]); } } @@ -917,8 +969,8 @@ protected BinaryMVColTaskSparse(CompressedMatrixBlock m1, MatrixBlock m2, Matrix @Override public Long call() { - tmp = allocateTempUncompressedBlock(_m1.getNumColumns()); - final int _blklen = tmp.getNumRows(); + final int _blklen = Math.max(DECOMPRESSION_BLEN / _m1.getNumColumns(), 64); + tmp = allocateTempUncompressedBlock(_blklen, _m1.getNumColumns()); final List groups = _m1.getColGroups(); final AIterator[] its = getIterators(groups, _rl); if(!_left) @@ -936,7 +988,8 @@ private final void processBlock(final int rl, final int ru, final List groups, final AIterator[] its) { + private final void processBlockLeft(final int rl, final int ru, final List groups, + final AIterator[] its) { decompressToTmpBlock(rl, ru, tmp.getDenseBlock(), groups, its); processDenseLeft(rl, ru); tmp.reset(); @@ -971,8 +1024,110 @@ private final void processDenseLeft(final int rl, final int ru) { } } - private static MatrixBlock allocateTempUncompressedBlock(int cols) { - MatrixBlock out = new MatrixBlock(Math.max(DECOMPRESSION_BLEN / cols, 64), cols, false); + private static class BinaryMVColTaskSparseSparse implements Callable { + private final int _rl; + private final int _ru; + private final CompressedMatrixBlock _m1; + private final MatrixBlock _m2; + private final MatrixBlock _ret; + private final BinaryOperator _op; + + private MatrixBlock tmp; + + private boolean _left; + + protected BinaryMVColTaskSparseSparse(CompressedMatrixBlock m1, MatrixBlock m2, MatrixBlock ret, int rl, int ru, + BinaryOperator op, boolean left) { + _m1 = m1; + _m2 = m2; + _ret = ret; + _op = op; + _rl = rl; + _ru = ru; + _left = left; + } + + @Override + public Long call() { + final int _blklen = Math.max(DECOMPRESSION_BLEN / _m1.getNumColumns(), 64); + tmp = allocateTempUncompressedBlockSparse(_blklen, _m1.getNumColumns()); + final List groups = _m1.getColGroups(); + final AIterator[] its = getIterators(groups, _rl); + if(!_left) + for(int r = _rl; r < _ru; r += _blklen) + processBlock(r, Math.min(r + _blklen, _ru), groups, its); + else + for(int r = _rl; r < _ru; r += _blklen) + processBlockLeft(r, Math.min(r + _blklen, _ru), groups, its); + return _ret.recomputeNonZeros(_rl, _ru - 1); + } + + private final void processBlock(final int rl, final int ru, final List groups, final AIterator[] its) { + decompressToTmpBlock(rl, ru, tmp.getSparseBlock(), groups, its); + // decompressing multiple column groups can leave the temp rows with unsorted column indices, so sort + // before reading them in stored order into the (column-sorted) output sparse block. + tmp.sortSparseRows(0, ru - rl); + processSparse(rl, ru); + tmp.reset(); + } + + private final void processBlockLeft(final int rl, final int ru, final List groups, + final AIterator[] its) { + decompressToTmpBlock(rl, ru, tmp.getSparseBlock(), groups, its); + tmp.sortSparseRows(0, ru - rl); + processSparseLeft(rl, ru); + tmp.reset(); + } + + private final void processSparse(final int rl, final int ru) { + final SparseBlock sb = _ret.getSparseBlock(); + final SparseBlock _tmpSparse = tmp.getSparseBlock(); + final double[] _m2Dense = _m2.getDenseBlockValues(); + for(int row = rl; row < ru; row++) { + final double vr = _m2Dense[row]; + final int tmpOff = (row - rl); + if(!_tmpSparse.isEmpty(tmpOff)) { + int[] aoff = _tmpSparse.indexes(tmpOff); + double[] aval = _tmpSparse.values(tmpOff); + int apos = _tmpSparse.pos(tmpOff); + int alen = apos + _tmpSparse.size(tmpOff); + + for(int j = apos; j < alen; j++) { + sb.append(row, aoff[j], _op.fn.execute(aval[j], vr)); + } + } + + } + } + + private final void processSparseLeft(final int rl, final int ru) { + final SparseBlock sb = _ret.getSparseBlock(); + final SparseBlock _tmpSparse = tmp.getSparseBlock(); + final double[] _m2Dense = _m2.getDenseBlockValues(); + for(int row = rl; row < ru; row++) { + final double vr = _m2Dense[row]; + final int tmpOff = (row - rl); + if(!_tmpSparse.isEmpty(tmpOff)) { + int[] aoff = _tmpSparse.indexes(tmpOff); + double[] aval = _tmpSparse.values(tmpOff); + int apos = _tmpSparse.pos(tmpOff); + int alen = apos + _tmpSparse.size(tmpOff); + for(int j = apos; j < alen; j++) { + sb.append(row, aoff[j], _op.fn.execute(vr, aval[j])); + } + } + } + } + } + + private static MatrixBlock allocateTempUncompressedBlock(int blklen, int cols) { + MatrixBlock out = new MatrixBlock(blklen, cols, false); + out.allocateBlock(); + return out; + } + + private static MatrixBlock allocateTempUncompressedBlockSparse(int blklen, int cols) { + MatrixBlock out = new MatrixBlock(blklen, cols, true); out.allocateBlock(); return out; } @@ -1199,6 +1354,25 @@ protected static void decompressToTmpBlock(final int rl, final int ru, final Den } } + protected static void decompressToTmpBlock(final int rl, final int ru, final SparseBlock db, + final List groups, final AIterator[] its) { + Timing time = new Timing(true); + for(int i = 0; i < groups.size(); i++) { + final AColGroup g = groups.get(i); + if(g.getCompType() == CompressionType.SDC) + ((ASDCZero) g).decompressToSparseBlock(db, rl, ru, -rl, 0, its[i]); + else + g.decompressToSparseBlock(db, rl, ru, -rl, 0); + } + + if(DMLScript.STATISTICS) { + final double t = time.stop(); + DMLCompressionStatistics.addDecompressToBlockTime(t, 1); + if(LOG.isTraceEnabled()) + LOG.trace("decompressed block w/ k=" + 1 + " in " + t + "ms."); + } + } + protected static AIterator[] getIterators(final List groups, final int rl) { final AIterator[] its = new AIterator[groups.size()]; for(int i = 0; i < groups.size(); i++) { @@ -1210,8 +1384,8 @@ protected static AIterator[] getIterators(final List groups, final in return its; } - private static Pair evaluateSparsityMVCol(CompressedMatrixBlock m1, MatrixBlock m2, BinaryOperator op, - boolean left) { + private static Pair evaluateSparsityMVCol(CompressedMatrixBlock m1, MatrixBlock m2, + BinaryOperator op, boolean left) { final List groups = m1.getColGroups(); final int nCol = m1.getNumColumns(); final int nRow = m1.getNumRows(); @@ -1247,7 +1421,7 @@ private static Pair evaluateSparsityMVCol(CompressedMatrixBlock for(int r = 0; r < sampleRow; r++) { final double m = m2v[r]; final int off = r * sampleCol; - for(int c = 0; c < sampleCol; c++){ + for(int c = 0; c < sampleCol; c++) { int outVal = op.fn.execute(dv[off + c], m) != 0 ? 1 : 0; nnz += outVal; nnzPerRow[r] += outVal; diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryMatrixMatrixCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryMatrixMatrixCPInstruction.java index 2ec23037385..d76dbe0d45e 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryMatrixMatrixCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryMatrixMatrixCPInstruction.java @@ -80,8 +80,15 @@ public void processInstruction(ExecutionContext ec) { retBlock = inBlock1; } else { - if(LibCommonsMath.isSupportedMatrixMatrixOperation(getOpcode()) && !compressedLeft && !compressedRight) + if(LibCommonsMath.isSupportedMatrixMatrixOperation(getOpcode()) ){ + if(compressedLeft) + inBlock1 = CompressedMatrixBlock.getUncompressed(inBlock1, getOpcode()); + + if(compressedRight) + inBlock2 = CompressedMatrixBlock.getUncompressed(inBlock2, getOpcode()); + retBlock = LibCommonsMath.matrixMatrixOperations(inBlock1, inBlock2, getOpcode()); + } else { // Perform computation using input matrices, and produce the result matrix BinaryOperator bop = (BinaryOperator) _optr; diff --git a/src/test/java/org/apache/sysds/test/component/compress/lib/CLALibBinaryCellOpCustomTest.java b/src/test/java/org/apache/sysds/test/component/compress/lib/CLALibBinaryCellOpCustomTest.java index 1ce05fab616..48bac7a7920 100644 --- a/src/test/java/org/apache/sysds/test/component/compress/lib/CLALibBinaryCellOpCustomTest.java +++ b/src/test/java/org/apache/sysds/test/component/compress/lib/CLALibBinaryCellOpCustomTest.java @@ -22,14 +22,20 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import static org.junit.Assert.assertTrue; + import org.apache.commons.lang3.tuple.Pair; import org.apache.sysds.runtime.compress.CompressedMatrixBlock; import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; import org.apache.sysds.runtime.compress.CompressionStatistics; +import org.apache.sysds.runtime.compress.colgroup.AColGroup; +import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType; import org.apache.sysds.runtime.compress.lib.CLALibBinaryCellOp; import org.apache.sysds.runtime.functionobjects.GreaterThanEquals; import org.apache.sysds.runtime.functionobjects.LessThanEquals; import org.apache.sysds.runtime.functionobjects.Minus; +import org.apache.sysds.runtime.functionobjects.Multiply; +import org.apache.sysds.runtime.matrix.data.LibMatrixBincell; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.operators.BinaryOperator; import org.apache.sysds.test.TestUtils; @@ -132,6 +138,41 @@ public void OVV2() { TestUtils.compareMatricesBitAvgDistance(new MatrixBlock(10, 10, 2.5 - 324.0), cRet, 0, 0, op.toString()); } + @Test + public void sparseSparseColVectorNonSDCGroup() { + // Drive the sparse-sparse column-vector path of CLALibBinaryCellOp through a compressed input whose + // column groups are not all SDC. The constant non-zero first column compresses to a non-SDC (Const/DDC) + // group, while the remaining columns stay sparse so the overall matrix is sparse enough to pick the + // sparse-sparse path. This exercises the non-SDC branch of decompressToTmpBlock(SparseBlock), which the + // all-SDC sparse inputs of CLALibBinaryCellOpTest never reach. + final int nRow = 300; + final int nCol = 12; + MatrixBlock mb = new MatrixBlock(nRow, nCol, false); + mb.allocateDenseBlock(); + for(int i = 0; i < nRow; i++) + mb.set(i, 0, 3.0); // constant non-zero column -> non-SDC group + for(int i = 0; i < nRow; i += 7) + mb.set(i, 3, 1.0 + (i % 4)); // a few sparse non-zeros + for(int i = 0; i < nRow; i += 11) + mb.set(i, 8, 4.0); + mb.recomputeNonZeros(); + + CompressedMatrixBlock cmb = (CompressedMatrixBlock) CompressedMatrixBlockFactory.compress(mb, 1).getLeft(); + assertTrue("input must compress to exercise the compressed path", cmb instanceof CompressedMatrixBlock); + boolean hasNonSDC = false; + for(AColGroup g : cmb.getColGroups()) + hasNonSDC |= g.getCompType() != CompressionType.SDC; + assertTrue("need a non-SDC column group to cover the non-SDC decompress branch", hasNonSDC); + + // Multiply is sparse-safe (f(0,v)==0), so the sparse-safe gate routes it through the sparse-sparse path. + BinaryOperator op = new BinaryOperator(Multiply.getMultiplyFnObject(), 1); + MatrixBlock cv = TestUtils.round(TestUtils.generateTestMatrixBlock(nRow, 1, -5, 5, 1.0, 7)); + + MatrixBlock cRet = CLALibBinaryCellOp.binaryOperationsRight(op, cmb, cv); + MatrixBlock uRet = LibMatrixBincell.bincellOp(mb, cv, null, op); + TestUtils.compareMatricesBitAvgDistance(uRet, cRet, 0, 0, op.toString()); + } + @Test public void overwriteToCompressedOnSecondCompressed() { BinaryOperator op = new BinaryOperator(Minus.getMinusFnObject(), 2); diff --git a/src/test/java/org/apache/sysds/test/component/compress/lib/CompressedBinaryMatrixMatrixSolveTest.java b/src/test/java/org/apache/sysds/test/component/compress/lib/CompressedBinaryMatrixMatrixSolveTest.java new file mode 100644 index 00000000000..549010a78cb --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/compress/lib/CompressedBinaryMatrixMatrixSolveTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.compress.lib; + +import static org.junit.Assert.assertTrue; + +import org.apache.sysds.common.Types.DataType; +import org.apache.sysds.common.Types.FileFormat; +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.runtime.compress.CompressedMatrixBlock; +import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; +import org.apache.sysds.runtime.controlprogram.LocalVariableMap; +import org.apache.sysds.runtime.controlprogram.caching.CacheableData; +import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysds.runtime.instructions.InstructionUtils; +import org.apache.sysds.runtime.instructions.cp.BinaryCPInstruction; +import org.apache.sysds.runtime.instructions.cp.BinaryMatrixMatrixCPInstruction; +import org.apache.sysds.runtime.matrix.data.LibCommonsMath; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.meta.MatrixCharacteristics; +import org.apache.sysds.runtime.meta.MetaDataFormat; +import org.apache.sysds.test.TestUtils; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Drive the solve opcode through {@link BinaryMatrixMatrixCPInstruction} with compressed inputs to cover the + * commons-math matrix-matrix branch that decompresses compressed left/right operands before solving. The + * script-level solve tests only ever see uncompressed inputs, so this branch is otherwise unreached. + */ +public class CompressedBinaryMatrixMatrixSolveTest { + + private static final String SOLVE = "solve"; + + @BeforeClass + public static void init() throws java.io.IOException { + CacheableData.initCaching("compressed_solve_instruction_test"); + } + + @Test + public void solveCompressedLeftCompressedRight() { + // A is a compressible, invertible matrix (constant off-diagonal with a larger diagonal), b is a + // compressed constant right-hand side: both inputs are CompressedMatrixBlock so the instruction must + // decompress both before delegating to commons-math solve. + final int n = 200; + MatrixBlock aUC = new MatrixBlock(n, n, false); + aUC.allocateDenseBlock(); + for(int i = 0; i < n; i++) + for(int j = 0; j < n; j++) + aUC.set(i, j, i == j ? 5.0 : 1.0); + aUC.recomputeNonZeros(); + + CompressedMatrixBlock aC = (CompressedMatrixBlock) CompressedMatrixBlockFactory.compress(aUC, 1).getLeft(); + assertTrue("A must compress to exercise the compressed-left path", aC instanceof CompressedMatrixBlock); + CompressedMatrixBlock bC = CompressedMatrixBlockFactory.createConstant(n, 2, 1.0); + + MatrixBlock expected = LibCommonsMath.matrixMatrixOperations( + CompressedMatrixBlock.getUncompressed(aC), CompressedMatrixBlock.getUncompressed(bC), SOLVE); + + MatrixBlock actual = runSolve(aC, bC); + TestUtils.compareMatricesBitAvgDistance(expected, actual, 0, 0, SOLVE); + } + + @Test + public void solveCompressedLeftDenseRight() { + // Only the left operand is compressed; the right-hand side stays dense (a single column). + final int n = 200; + MatrixBlock aUC = new MatrixBlock(n, n, false); + aUC.allocateDenseBlock(); + for(int i = 0; i < n; i++) + for(int j = 0; j < n; j++) + aUC.set(i, j, i == j ? 5.0 : 1.0); + aUC.recomputeNonZeros(); + + CompressedMatrixBlock aC = (CompressedMatrixBlock) CompressedMatrixBlockFactory.compress(aUC, 1).getLeft(); + assertTrue("A must compress to exercise the compressed-left path", aC instanceof CompressedMatrixBlock); + MatrixBlock b = TestUtils.round(TestUtils.generateTestMatrixBlock(n, 1, -5, 5, 1.0, 7)); + + MatrixBlock expected = LibCommonsMath.matrixMatrixOperations( + CompressedMatrixBlock.getUncompressed(aC), b, SOLVE); + + MatrixBlock actual = runSolve(aC, b); + TestUtils.compareMatricesBitAvgDistance(expected, actual, 0, 0, SOLVE); + } + + private static MatrixBlock runSolve(MatrixBlock a, MatrixBlock b) { + ExecutionContext ec = new ExecutionContext(new LocalVariableMap()); + ec.setAutoCreateVars(true); + ec.setVariable("A", matrixObject("A", a)); + ec.setVariable("b", matrixObject("b", b)); + solveInstruction().processInstruction(ec); + return ec.getMatrixObject("out").acquireReadAndRelease(); + } + + private static BinaryMatrixMatrixCPInstruction solveInstruction() { + String in1 = InstructionUtils.concatOperandParts("A", DataType.MATRIX.name(), ValueType.FP64.name(), "false"); + String in2 = InstructionUtils.concatOperandParts("b", DataType.MATRIX.name(), ValueType.FP64.name(), "false"); + String out = InstructionUtils.concatOperandParts("out", DataType.MATRIX.name(), ValueType.FP64.name(), "false"); + String str = InstructionUtils.concatOperands("CP", SOLVE, in1, in2, out); + return (BinaryMatrixMatrixCPInstruction) BinaryCPInstruction.parseInstruction(str); + } + + private static MatrixObject matrixObject(String name, MatrixBlock mb) { + MatrixCharacteristics mc = new MatrixCharacteristics(mb.getNumRows(), mb.getNumColumns(), 1000, mb.getNonZeros()); + MatrixObject mo = new MatrixObject(ValueType.FP64, "/dev/null/" + name, + new MetaDataFormat(mc, FileFormat.BINARY), mb); + return mo; + } +}