From 12dfb8836d21dceb641588bc06318bb2609829ed Mon Sep 17 00:00:00 2001 From: Sebastian Baunsgaard Date: Fri, 29 May 2026 14:42:10 +0000 Subject: [PATCH 1/4] Restructure CLALibBinaryCellOp and decompression fallbacks for binary ops Reworks how the compressed library handles binary cell-wise operations, broadening the set of inputs it can keep compressed and tightening the "give up and decompress" path so it is consistent across operation shapes. - CLALibBinaryCellOp: - large refactor (~340 lines): split per-operation routing into smaller dispatch helpers, push compressed/uncompressed decisions to the call site, and add fallbacks for shapes the compressed path does not support (SDC + complex shapes, etc.) - BinaryMatrixMatrixCPInstruction: - on supported LibCommonsMath ops with compressed inputs, eagerly decompress so the math library always sees a plain MatrixBlock - CompressedMatrixBlock: - centralize getUncompressed(opcode) callers; tighten how binaryOperationsLeft/Right/InPlace route into the lib - CompressedMatrixBlockFactory: - small adjustments to align with the new lib entry points --- .../compress/CompressedMatrixBlock.java | 18 +- .../CompressedMatrixBlockFactory.java | 24 +- .../compress/lib/CLALibBinaryCellOp.java | 340 +++++++++++++----- .../cp/BinaryMatrixMatrixCPInstruction.java | 9 +- 4 files changed, 283 insertions(+), 108 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java index 3a79443157b..64bf2aa43d2 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java @@ -59,8 +59,8 @@ import org.apache.sysds.runtime.compress.lib.CLALibMatrixMult; import org.apache.sysds.runtime.compress.lib.CLALibMerge; import org.apache.sysds.runtime.compress.lib.CLALibRemoveEmpty; -import org.apache.sysds.runtime.compress.lib.CLALibReplace; import org.apache.sysds.runtime.compress.lib.CLALibReorg; +import org.apache.sysds.runtime.compress.lib.CLALibReplace; import org.apache.sysds.runtime.compress.lib.CLALibReshape; import org.apache.sysds.runtime.compress.lib.CLALibRexpand; import org.apache.sysds.runtime.compress.lib.CLALibScalar; @@ -103,6 +103,7 @@ import org.apache.sysds.runtime.util.IndexRange; import org.apache.sysds.utils.DMLCompressionStatistics; import org.apache.sysds.utils.stats.InfrastructureAnalyzer; +import org.apache.sysds.utils.stats.Timing; public class CompressedMatrixBlock extends MatrixBlock { private static final Log LOG = LogFactory.getLog(CompressedMatrixBlock.class.getName()); @@ -477,16 +478,20 @@ public void readFields(DataInput in) throws IOException { } public static CompressedMatrixBlock read(DataInput in) throws IOException { + Timing t = new Timing(); int rlen = in.readInt(); int clen = in.readInt(); long nonZeros = in.readLong(); boolean overlappingColGroups = in.readBoolean(); List groups = ColGroupIO.readGroups(in, rlen); - return new CompressedMatrixBlock(rlen, clen, nonZeros, overlappingColGroups, groups); + CompressedMatrixBlock ret = new CompressedMatrixBlock(rlen, clen, nonZeros, overlappingColGroups, groups); + LOG.debug("Compressed read serialization time: " + t.stop()); + return ret; } @Override public void write(DataOutput out) throws IOException { + Timing t = new Timing(); final long estimateUncompressed = nonZeros > 0 ? MatrixBlock.estimateSizeOnDisk(rlen, clen, nonZeros) : Long.MAX_VALUE; final long estDisk = nonZeros > 0 ? getExactSizeOnDisk() : Long.MAX_VALUE; @@ -514,6 +519,7 @@ public void write(DataOutput out) throws IOException { out.writeLong(nonZeros); out.writeBoolean(overlappingColGroups); ColGroupIO.writeGroups(out, _colGroups); + LOG.debug("Compressed write serialization time: " + t.stop()); } /** @@ -613,14 +619,6 @@ public MatrixBlock aggregateUnaryOperations(AggregateUnaryOperator op, MatrixVal public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype, int k) { // check for transpose type if(tstype == MMTSJType.LEFT) { - if(isEmpty()) - return new MatrixBlock(clen, clen, true); - // create output matrix block - if(out == null) - out = new MatrixBlock(clen, clen, false); - else - out.reset(clen, clen, false); - out.allocateDenseBlock(); CLALibTSMM.leftMultByTransposeSelf(this, out, k); return out; } diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java index 4c48effb4df..f082d1ffc3d 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java @@ -64,6 +64,8 @@ public class CompressedMatrixBlockFactory { private static final Log LOG = LogFactory.getLog(CompressedMatrixBlockFactory.class.getName()); + private static final Object asyncCompressLock = new Object(); + /** Timing object to measure the time of each phase in the compression */ private final Timing time = new Timing(true); /** Compression statistics gathered throughout the compression */ @@ -181,21 +183,23 @@ public static Future compressAsync(ExecutionContext ec, String varName) { } public static Future compressAsync(ExecutionContext ec, String varName, InstructionTypeCounter ins) { - LOG.debug("Compressing Async"); final ExecutorService pool = CommonThreadPool.get(); // We have to guarantee that a thread pool is allocated. return CompletableFuture.runAsync(() -> { // method call or code to be async try { CacheableData data = ec.getCacheableData(varName); - if(data instanceof MatrixObject) { - MatrixObject mo = (MatrixObject) data; - MatrixBlock mb = mo.acquireReadAndRelease(); - MatrixBlock mbc = CompressedMatrixBlockFactory.compress(mo.acquireReadAndRelease(), ins).getLeft(); - if(mbc instanceof CompressedMatrixBlock) { - ExecutionContext.createCacheableData(mb); - mo.acquireModify(mbc); - mo.release(); - mbc.sum(); // calculate sum to forcefully materialize counts + synchronized(asyncCompressLock){ // synchronize on the data object to not allow multiple compressions of the same matrix. + if(data instanceof MatrixObject) { + LOG.debug("Compressing Async"); + MatrixObject mo = (MatrixObject) data; + MatrixBlock mb = mo.acquireReadAndRelease(); + MatrixBlock mbc = CompressedMatrixBlockFactory.compress(mb, ins).getLeft(); + if(mbc instanceof CompressedMatrixBlock) { + ExecutionContext.createCacheableData(mb); + mo.acquireModify(mbc); + mo.release(); + mbc.sum(); // calculate sum to forcefully materialize counts + } } } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java index ce52bcd23fd..26779215306 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java @@ -48,6 +48,7 @@ import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData; import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory; import org.apache.sysds.runtime.compress.colgroup.offset.AIterator; +import org.apache.sysds.runtime.compress.utils.HashMapIntToInt; import org.apache.sysds.runtime.data.DenseBlock; import org.apache.sysds.runtime.data.DenseBlockFP64; import org.apache.sysds.runtime.data.SparseBlock; @@ -55,7 +56,6 @@ import org.apache.sysds.runtime.data.SparseRow; import org.apache.sysds.runtime.data.SparseRowScalar; import org.apache.sysds.runtime.data.SparseRowVector; -import org.apache.sysds.runtime.frame.data.columns.HashMapToInt; import org.apache.sysds.runtime.functionobjects.Divide; import org.apache.sysds.runtime.functionobjects.Minus; import org.apache.sysds.runtime.functionobjects.Multiply; @@ -77,7 +77,7 @@ public final class CLALibBinaryCellOp { private static final Log LOG = LogFactory.getLog(CLALibBinaryCellOp.class.getName()); - public static final int DECOMPRESSION_BLEN = 16384; + public static final int DECOMPRESSION_BLEN = 16384 / 2; private CLALibBinaryCellOp() { // empty private constructor. @@ -86,7 +86,7 @@ private CLALibBinaryCellOp() { public static MatrixBlock binaryOperationsRight(BinaryOperator op, CompressedMatrixBlock m1, MatrixBlock that) { try { - op = LibMatrixBincell.replaceOpWithSparseSafeIfApplicable(m1, that, op); + op = LibMatrixBincell.replaceOpWithSparseSafeIfApplicable(m1, that, op); if((that.getNumRows() == 1 && that.getNumColumns() == 1) || that.isEmpty()) { ScalarOperator sop = new RightScalarOperator(op.fn, that.get(0, 0), op.getNumThreads()); @@ -113,7 +113,7 @@ public static MatrixBlock binaryOperationsLeft(BinaryOperator op, CompressedMatr return selectProcessingBasedOnAccessType(op, m1, that, atype, true); } catch(Exception e) { - throw new DMLRuntimeException("Failed Left Binary Compressed Operation", e); + throw new DMLRuntimeException("Failed Left Binary Compressed Operation: " + op, e); } } @@ -122,8 +122,8 @@ private static MatrixBlock binaryOperationsRightFiltered(BinaryOperator op, Comp BinaryAccessType atype = LibMatrixBincell.getBinaryAccessTypeExtended(m1, that); if(isDoubleCompressedOpApplicable(m1, that)) return doubleCompressedBinaryOp(op, m1, (CompressedMatrixBlock) that); - if(that instanceof CompressedMatrixBlock && that.getNumColumns() == m1.getNumColumns() - && that.getInMemorySize() < m1.getInMemorySize() ) { + if(that instanceof CompressedMatrixBlock && that.getNumColumns() == m1.getNumColumns() && + that.getInMemorySize() < m1.getInMemorySize()) { MatrixBlock m1uc = CompressedMatrixBlock.getUncompressed(m1, "Decompressing left side in BinaryOps"); return selectProcessingBasedOnAccessType(op, (CompressedMatrixBlock) that, m1uc, atype, true); } @@ -135,16 +135,15 @@ private static MatrixBlock binaryOperationsRightFiltered(BinaryOperator op, Comp } private static boolean isDoubleCompressedOpApplicable(CompressedMatrixBlock m1, MatrixBlock that) { - return that instanceof CompressedMatrixBlock - && !m1.isOverlapping() - && m1.getColGroups().get(0) instanceof ColGroupDDC - && !((CompressedMatrixBlock) that).isOverlapping() - && ((CompressedMatrixBlock) that).getColGroups().get(0) instanceof ColGroupDDC - && ((IMapToDataGroup) m1.getColGroups().get(0)).getMapToData() == - ((IMapToDataGroup) ((CompressedMatrixBlock) that).getColGroups().get(0)).getMapToData(); + return that instanceof CompressedMatrixBlock && !m1.isOverlapping() && + m1.getColGroups().get(0) instanceof ColGroupDDC && !((CompressedMatrixBlock) that).isOverlapping() && + ((CompressedMatrixBlock) that).getColGroups().get(0) instanceof ColGroupDDC && + ((IMapToDataGroup) m1.getColGroups().get(0)) + .getMapToData() == ((IMapToDataGroup) ((CompressedMatrixBlock) that).getColGroups().get(0)).getMapToData(); } - private static CompressedMatrixBlock doubleCompressedBinaryOp(BinaryOperator op, CompressedMatrixBlock m1, CompressedMatrixBlock m2) { + private static CompressedMatrixBlock doubleCompressedBinaryOp(BinaryOperator op, CompressedMatrixBlock m1, + CompressedMatrixBlock m2) { LOG.debug("Double Compressed BinaryOp"); AColGroup left = m1.getColGroups().get(0); AColGroup right = m2.getColGroups().get(0); @@ -201,6 +200,7 @@ private static MatrixBlock mvCol(BinaryOperator op, CompressedMatrixBlock m1, Ma // Column vector access MatrixBlock d_compressed = m1.getCachedDecompressed(); if(d_compressed != null) { + LOG.debug("Using cached decompressed for Matrix column vector compressed operation"); if(left) throw new NotImplementedException("Binary row op left is not supported for Uncompressed Matrix, " + "Implement support for VMr in MatrixBlock Binary Cell operations"); @@ -416,17 +416,24 @@ private static MatrixBlock mvColCompressed(CompressedMatrixBlock m1, MatrixBlock Pair tuple = evaluateSparsityMVCol(m1, m2, op, left); double estSparsity = tuple.getKey(); double estNnzPerRow = tuple.getValue(); - boolean shouldBeSparseOut = MatrixBlock.evalSparseFormatInMemory(nRows, nCols, (long) (estSparsity * nRows * nCols)); + boolean shouldBeSparseOut = MatrixBlock.evalSparseFormatInMemory(nRows, nCols, + (long) (estSparsity * nRows * nCols)); // currently also jump into that case if estNnzPerRow == 0 - if(estNnzPerRow <= 2 && nCols <= 31 && op.fn instanceof ValueComparisonFunction){ - return k <= 1 ? binaryMVComparisonColSingleThreadCompressed(m1, m2, op, left) : - binaryMVComparisonColMultiCompressed(m1, m2, op, left); + if(estNnzPerRow <= 2 && nCols <= 31 && op.fn instanceof ValueComparisonFunction) { + return k <= 1 ? binaryMVComparisonColSingleThreadCompressed(m1, m2, op, + left) : binaryMVComparisonColMultiCompressed(m1, m2, op, left); } MatrixBlock ret = new MatrixBlock(nRows, nCols, shouldBeSparseOut, -1).allocateBlock(); if(shouldBeSparseOut) { - if(k <= 1) + if(!m1.isOverlapping() && MatrixBlock.evalSparseFormatInMemory(nRows, nCols, m1.getNonZeros())) { + if(k <= 1) + nnz = binaryMVColSingleThreadSparseSparse(m1, m2, op, left, ret); + else + nnz = binaryMVColMultiThreadSparseSparse(m1, m2, op, left, ret); + } + else if(k <= 1) nnz = binaryMVColSingleThreadSparse(m1, m2, op, left, ret); else nnz = binaryMVColMultiThreadSparse(m1, m2, op, left, ret); @@ -438,7 +445,7 @@ private static MatrixBlock mvColCompressed(CompressedMatrixBlock m1, MatrixBlock nnz = binaryMVColMultiThreadDense(m1, m2, op, left, ret); } - if(op.fn instanceof ValueComparisonFunction) { + if(op.fn instanceof ValueComparisonFunction) { // potentially empty or filled. if(nnz == (long) nRows * nCols)// all was 1 return CompressedMatrixBlockFactory.createConstant(nRows, nCols, 1.0); else if(nnz == 0) // all was 0 -> return empty. @@ -452,19 +459,19 @@ else if(nnz == 0) // all was 0 -> return empty. } private static MatrixBlock binaryMVComparisonColSingleThreadCompressed(CompressedMatrixBlock m1, MatrixBlock m2, - BinaryOperator op, boolean left) { + BinaryOperator op, boolean left) { final int nRows = m1.getNumRows(); final int nCols = m1.getNumColumns(); // get indicators (one-hot-encoded comparison results) - BinaryMVColTaskCompressed task = new BinaryMVColTaskCompressed(m1, m2, 0, nRows, op, left); + BinaryMVColTaskCompressed task = new BinaryMVColTaskCompressed(m1, m2, 0, nRows, op, left); long nnz = task.call(); int[] indicators = task._ret; // map each unique indicator to an index - HashMapToInt hm = new HashMapToInt<>(nCols*3); + HashMapIntToInt hm = new HashMapIntToInt(nCols * 3); int[] colMap = new int[nRows]; - for(int i = 0; i < m1.getNumRows(); i++){ + for(int i = 0; i < m1.getNumRows(); i++) { int nextId = hm.size(); int id = hm.putIfAbsentI(indicators[i], nextId); colMap[i] = id == -1 ? nextId : id; @@ -477,37 +484,39 @@ private static MatrixBlock binaryMVComparisonColSingleThreadCompressed(Compresse return getCompressedMatrixBlock(m1, colMap, hm.size(), outMb, nRows, nCols, nnz); } - private static void fillSparseBlockFromIndicatorFromIndicatorInt(int numCol, Integer indicator, Integer rix, SparseBlockMCSR out) { + private static void fillSparseBlockFromIndicatorFromIndicatorInt(int numCol, Integer indicator, Integer rix, + SparseBlockMCSR out) { ArrayList colIndices = new ArrayList<>(8); - for (int c = numCol - 1; c >= 0; c--) { + for(int c = numCol - 1; c >= 0; c--) { if(indicator <= 0) break; - if(indicator % 2 == 1){ + if(indicator % 2 == 1) { colIndices.add(c); } indicator = indicator >> 1; } SparseRow row = null; - if(colIndices.size() > 1){ + if(colIndices.size() > 1) { double[] vals = new double[colIndices.size()]; Arrays.fill(vals, 1); int[] indices = new int[colIndices.size()]; - for (int i = 0, j = colIndices.size() - 1; i < colIndices.size(); i++, j--) + for(int i = 0, j = colIndices.size() - 1; i < colIndices.size(); i++, j--) indices[i] = colIndices.get(j); row = new SparseRowVector(vals, indices); - } else if(colIndices.size() == 1){ + } + else if(colIndices.size() == 1) { row = new SparseRowScalar(colIndices.get(0), 1.0); } out.set(rix, row, false); } private static MatrixBlock binaryMVComparisonColMultiCompressed(CompressedMatrixBlock m1, MatrixBlock m2, - BinaryOperator op, boolean left) throws Exception { + BinaryOperator op, boolean left) throws Exception { final int nRows = m1.getNumRows(); final int nCols = m1.getNumColumns(); final int k = op.getNumThreads(); - final int blkz = nRows / k; + final int blkz = Math.max((nRows + k) / k, 1000); // get indicators (one-hot-encoded comparison results) long nnz = 0; @@ -518,14 +527,11 @@ private static MatrixBlock binaryMVComparisonColMultiCompressed(CompressedMatrix tasks.add(new BinaryMVColTaskCompressed(m1, m2, i, Math.min(nRows, i + blkz), op, left)); } List> futures = pool.invokeAll(tasks); - HashMapToInt hm = new HashMapToInt<>(nCols*2); + HashMapIntToInt hm = new HashMapIntToInt(nCols * 2); int[] colMap = new int[nRows]; - for(Future f : futures) - nnz += f.get(); - // map each unique indicator to an index - mergeMVColTaskResults(tasks, blkz, hm, colMap); + nnz = mergeMVColTaskResults(futures, tasks, blkz, hm, colMap); // decode the unique indicator ints to SparseVectors MatrixBlock outMb = getMCSRMatrixBlock(hm, nCols); @@ -539,48 +545,53 @@ private static MatrixBlock binaryMVComparisonColMultiCompressed(CompressedMatrix } - private static void mergeMVColTaskResults(ArrayList tasks, int blkz, HashMapToInt hm, int[] colMap) { - + private static long mergeMVColTaskResults(List> futures, ArrayList tasks, + int blkz, HashMapIntToInt hm, int[] colMap) throws InterruptedException, ExecutionException { + long nnz = 0; for(int j = 0; j < tasks.size(); j++) { + nnz += futures.get(j).get(); // ensure task was finished. int[] indicators = tasks.get(j)._ret; - int offset = j* blkz; - - final int remainders = indicators.length % 8; - final int endVecLen = indicators.length - remainders; - for (int i = 0; i < endVecLen; i+= 8) { - colMap[offset + i] = hm.putIfAbsentReturnVal(indicators[i], hm.size()); - colMap[offset + i + 1] = hm.putIfAbsentReturnVal(indicators[i + 1], hm.size()); - colMap[offset + i + 2] = hm.putIfAbsentReturnVal(indicators[i + 2], hm.size()); - colMap[offset + i + 3] = hm.putIfAbsentReturnVal(indicators[i + 3], hm.size()); - colMap[offset + i + 4] = hm.putIfAbsentReturnVal(indicators[i + 4], hm.size()); - colMap[offset + i + 5] = hm.putIfAbsentReturnVal(indicators[i + 5], hm.size()); - colMap[offset + i + 6] = hm.putIfAbsentReturnVal(indicators[i + 6], hm.size()); - colMap[offset + i + 7] = hm.putIfAbsentReturnVal(indicators[i + 7], hm.size()); + int offset = j * blkz; - } - for (int i = 0; i < remainders; i++) { - colMap[offset + endVecLen + i] = hm.putIfAbsentReturnVal(indicators[endVecLen + i], hm.size()); - } + mergeMVColUnrolled(hm, colMap, indicators, offset); } + return nnz; } + private static void mergeMVColUnrolled(HashMapIntToInt hm, int[] colMap, int[] indicators, int offset) { + final int remainders = indicators.length % 8; + final int endVecLen = indicators.length - remainders; + for(int i = 0; i < endVecLen; i += 8) { + colMap[offset + i] = hm.putIfAbsentReturnVal(indicators[i], hm.size()); + colMap[offset + i + 1] = hm.putIfAbsentReturnVal(indicators[i + 1], hm.size()); + colMap[offset + i + 2] = hm.putIfAbsentReturnVal(indicators[i + 2], hm.size()); + colMap[offset + i + 3] = hm.putIfAbsentReturnVal(indicators[i + 3], hm.size()); + colMap[offset + i + 4] = hm.putIfAbsentReturnVal(indicators[i + 4], hm.size()); + colMap[offset + i + 5] = hm.putIfAbsentReturnVal(indicators[i + 5], hm.size()); + colMap[offset + i + 6] = hm.putIfAbsentReturnVal(indicators[i + 6], hm.size()); + colMap[offset + i + 7] = hm.putIfAbsentReturnVal(indicators[i + 7], hm.size()); - private static CompressedMatrixBlock getCompressedMatrixBlock(CompressedMatrixBlock m1, int[] colMap, - int mapSize, MatrixBlock outMb, int nRows, int nCols, long nnz) { + } + for(int i = 0; i < remainders; i++) { + colMap[offset + endVecLen + i] = hm.putIfAbsentReturnVal(indicators[endVecLen + i], hm.size()); + } + } + + private static CompressedMatrixBlock getCompressedMatrixBlock(CompressedMatrixBlock m1, int[] colMap, int mapSize, + MatrixBlock outMb, int nRows, int nCols, long nnz) { final IColIndex i = ColIndexFactory.create(0, m1.getNumColumns()); final AMapToData map = MapToFactory.create(m1.getNumRows(), colMap, mapSize); final AColGroup rgroup = ColGroupDDC.create(i, MatrixBlockDictionary.create(outMb), map, null); final ArrayList groups = new ArrayList<>(1); groups.add(rgroup); - return new CompressedMatrixBlock(nRows, nCols, nnz, false, groups); + return new CompressedMatrixBlock(nRows, nCols, nnz, false, groups); } - private static MatrixBlock getMCSRMatrixBlock(HashMapToInt hm, int nCols) { + private static MatrixBlock getMCSRMatrixBlock(HashMapIntToInt hm, int nCols) { // decode the unique indicator ints to SparseVectors SparseBlockMCSR out = new SparseBlockMCSR(hm.size()); - hm.forEach((indicator, rix) -> - fillSparseBlockFromIndicatorFromIndicatorInt(nCols, indicator, rix, out)); - return new MatrixBlock(hm.size(), nCols, -1, out); + hm.forEach((indicator, rix) -> fillSparseBlockFromIndicatorFromIndicatorInt(nCols, indicator, rix, out)); + return new MatrixBlock(hm.size(), nCols, -1, out); } private static long binaryMVColSingleThreadDense(CompressedMatrixBlock m1, MatrixBlock m2, BinaryOperator op, @@ -599,6 +610,14 @@ private static long binaryMVColSingleThreadSparse(CompressedMatrixBlock m1, Matr return nnz; } + private static long binaryMVColSingleThreadSparseSparse(CompressedMatrixBlock m1, MatrixBlock m2, BinaryOperator op, + boolean left, MatrixBlock ret) { + final int nRows = m1.getNumRows(); + long nnz = 0; + nnz += new BinaryMVColTaskSparseSparse(m1, m2, ret, 0, nRows, op, left).call(); + return nnz; + } + private static long binaryMVColMultiThreadDense(CompressedMatrixBlock m1, MatrixBlock m2, BinaryOperator op, boolean left, MatrixBlock ret) throws Exception { final int nRows = m1.getNumRows(); @@ -641,6 +660,27 @@ private static long binaryMVColMultiThreadSparse(CompressedMatrixBlock m1, Matri return nnz; } + private static long binaryMVColMultiThreadSparseSparse(CompressedMatrixBlock m1, MatrixBlock m2, BinaryOperator op, + boolean left, MatrixBlock ret) throws Exception { + final int nRows = m1.getNumRows(); + final int k = op.getNumThreads(); + final int blkz = Math.max(nRows / k, 64); + long nnz = 0; + final ExecutorService pool = CommonThreadPool.get(op.getNumThreads()); + try { + final ArrayList> tasks = new ArrayList<>(); + for(int i = 0; i < nRows; i += blkz) { + tasks.add(new BinaryMVColTaskSparseSparse(m1, m2, ret, i, Math.min(nRows, i + blkz), op, left)); + } + for(Future f : pool.invokeAll(tasks)) + nnz += f.get(); + } + finally { + pool.shutdown(); + } + return nnz; + } + private static MatrixBlock mmCompressed(CompressedMatrixBlock m1, MatrixBlock m2, BinaryOperator op, boolean left) throws Exception { final int nCols = m1.getNumColumns(); @@ -724,8 +764,8 @@ private static class BinaryMVColTaskCompressed implements Callable { private MatrixBlock tmp; - protected BinaryMVColTaskCompressed(CompressedMatrixBlock m1, MatrixBlock m2, int rl, int ru, - BinaryOperator op, boolean left) { + protected BinaryMVColTaskCompressed(CompressedMatrixBlock m1, MatrixBlock m2, int rl, int ru, BinaryOperator op, + boolean left) { _m1 = m1; _m2 = m2; _op = op; @@ -738,21 +778,21 @@ protected BinaryMVColTaskCompressed(CompressedMatrixBlock m1, MatrixBlock m2, in @Override public Long call() { - tmp = allocateTempUncompressedBlock(_m1.getNumColumns()); - final int _blklen = tmp.getNumRows(); + final int _blklen = Math.max(DECOMPRESSION_BLEN / _m1.getNumColumns(), 64); + tmp = allocateTempUncompressedBlock(_blklen, _m1.getNumColumns()); final List groups = _m1.getColGroups(); final AIterator[] its = getIterators(groups, _rl); long nnz = 0; if(!_left) - for (int rl = _rl, retIxOff = 0; rl < _ru; rl += _blklen, retIxOff += _blklen){ + for(int rl = _rl, retIxOff = 0; rl < _ru; rl += _blklen, retIxOff += _blklen) { int ru = Math.min(rl + _blklen, _ru); decompressToTmpBlock(rl, ru, tmp.getDenseBlock(), groups, its); nnz += processDense(rl, ru, retIxOff); tmp.reset(); } else - for (int rl = _rl, retIxOff = 0; rl < _ru; rl += _blklen, retIxOff += _blklen){ + for(int rl = _rl, retIxOff = 0; rl < _ru; rl += _blklen, retIxOff += _blklen) { int ru = Math.min(rl + _blklen, _ru); decompressToTmpBlock(rl, ru, tmp.getDenseBlock(), groups, its); nnz += processDenseLeft(rl, ru, retIxOff); @@ -770,18 +810,24 @@ private final long processDense(final int rl, final int ru, final int retIxOffse for(int row = rl, retIx = retIxOffset; row < ru; row++, retIx++) { final double vr = _m2Dense[row]; final int tmpOff = (row - rl) * nCol; - int indicatorVector = 0; - for(int col = 0; col < nCol; col++) { - indicatorVector = indicatorVector << 1; - int indicator = _compFn.compare(_tmpDense[tmpOff + col], vr) ? 1 : 0; - indicatorVector += indicator; - nnz += indicator; - } - _ret[retIx] = indicatorVector; + nnz = processRow(nCol, _tmpDense, nnz, retIx, vr, tmpOff); } return nnz; } + private final long processRow(final int nCol, final double[] _tmpDense, long nnz, int retIx, final double vr, + final int tmpOff) { + int indicatorVector = 0; + for(int col = tmpOff; col < nCol + tmpOff; col++) { + indicatorVector = indicatorVector << 1; + int indicator = _compFn.compare(_tmpDense[col], vr) ? 1 : 0; + indicatorVector += indicator; + nnz += indicator; + } + _ret[retIx] = indicatorVector; + return nnz; + } + private final long processDenseLeft(final int rl, final int ru, final int retIxOffset) { final int nCol = _m1.getNumColumns(); final double[] _tmpDense = tmp.getDenseBlockValues(); @@ -847,7 +893,8 @@ private final void processBlock(final int rl, final int ru, final List groups, final AIterator[] its) { + private final void processBlockLeft(final int rl, final int ru, final List groups, + final AIterator[] its) { // unsafe decompress, since we count nonzeros afterwards. final DenseBlock db = _ret.getDenseBlock(); decompressToSubBlock(rl, ru, db, groups, its); @@ -887,7 +934,7 @@ private void processRow(final int ncol, final double[] ret, final int posR, fina private void processRowLeft(final int ncol, final double[] ret, final int posR, final double vr) { for(int col = 0; col < ncol; col++) - ret[posR + col] = _op.fn.execute(vr,ret[posR + col]); + ret[posR + col] = _op.fn.execute(vr, ret[posR + col]); } } @@ -917,8 +964,8 @@ protected BinaryMVColTaskSparse(CompressedMatrixBlock m1, MatrixBlock m2, Matrix @Override public Long call() { - tmp = allocateTempUncompressedBlock(_m1.getNumColumns()); - final int _blklen = tmp.getNumRows(); + final int _blklen = Math.max(DECOMPRESSION_BLEN / _m1.getNumColumns(), 64); + tmp = allocateTempUncompressedBlock(_blklen, _m1.getNumColumns()); final List groups = _m1.getColGroups(); final AIterator[] its = getIterators(groups, _rl); if(!_left) @@ -936,7 +983,8 @@ private final void processBlock(final int rl, final int ru, final List groups, final AIterator[] its) { + private final void processBlockLeft(final int rl, final int ru, final List groups, + final AIterator[] its) { decompressToTmpBlock(rl, ru, tmp.getDenseBlock(), groups, its); processDenseLeft(rl, ru); tmp.reset(); @@ -971,8 +1019,107 @@ private final void processDenseLeft(final int rl, final int ru) { } } - private static MatrixBlock allocateTempUncompressedBlock(int cols) { - MatrixBlock out = new MatrixBlock(Math.max(DECOMPRESSION_BLEN / cols, 64), cols, false); + private static class BinaryMVColTaskSparseSparse implements Callable { + private final int _rl; + private final int _ru; + private final CompressedMatrixBlock _m1; + private final MatrixBlock _m2; + private final MatrixBlock _ret; + private final BinaryOperator _op; + + private MatrixBlock tmp; + + private boolean _left; + + protected BinaryMVColTaskSparseSparse(CompressedMatrixBlock m1, MatrixBlock m2, MatrixBlock ret, int rl, int ru, + BinaryOperator op, boolean left) { + _m1 = m1; + _m2 = m2; + _ret = ret; + _op = op; + _rl = rl; + _ru = ru; + _left = left; + } + + @Override + public Long call() { + final int _blklen = Math.max(DECOMPRESSION_BLEN / _m1.getNumColumns(), 64); + tmp = allocateTempUncompressedBlockSparse(_blklen, _m1.getNumColumns()); + final List groups = _m1.getColGroups(); + final AIterator[] its = getIterators(groups, _rl); + if(!_left) + for(int r = _rl; r < _ru; r += _blklen) + processBlock(r, Math.min(r + _blklen, _ru), groups, its); + else + for(int r = _rl; r < _ru; r += _blklen) + processBlockLeft(r, Math.min(r + _blklen, _ru), groups, its); + return _ret.recomputeNonZeros(_rl, _ru - 1); + } + + private final void processBlock(final int rl, final int ru, final List groups, final AIterator[] its) { + decompressToTmpBlock(rl, ru, tmp.getSparseBlock(), groups, its); + processDense(rl, ru); + tmp.reset(); + } + + private final void processBlockLeft(final int rl, final int ru, final List groups, + final AIterator[] its) { + decompressToTmpBlock(rl, ru, tmp.getSparseBlock(), groups, its); + processDenseLeft(rl, ru); + tmp.reset(); + } + + private final void processDense(final int rl, final int ru) { + final SparseBlock sb = _ret.getSparseBlock(); + final SparseBlock _tmpSparse = tmp.getSparseBlock(); + final double[] _m2Dense = _m2.getDenseBlockValues(); + for(int row = rl; row < ru; row++) { + final double vr = _m2Dense[row]; + final int tmpOff = (row - rl); + if(!_tmpSparse.isEmpty(tmpOff)){ + int[] aoff = _tmpSparse.indexes(tmpOff); + double[] aval = _tmpSparse.values(tmpOff); + int apos = _tmpSparse.pos(tmpOff); + int alen = apos + _tmpSparse.size(tmpOff); + + for(int j = apos; j < alen; j++){ + sb.append(row, aoff[j], _op.fn.execute(aval[j], vr)); + } + } + + } + } + + private final void processDenseLeft(final int rl, final int ru) { + final int nCol = _m1.getNumColumns(); + final SparseBlock sb = _ret.getSparseBlock(); + final SparseBlock _tmpSparse = tmp.getSparseBlock(); + final double[] _m2Dense = _m2.getDenseBlockValues(); + for(int row = rl; row < ru; row++) { + final double vr = _m2Dense[row]; + final int tmpOff = (row - rl) * nCol; + if(!_tmpSparse.isEmpty(tmpOff)){ + int[] aoff = _tmpSparse.indexes(tmpOff); + double[] aval = _tmpSparse.values(tmpOff); + int apos = _tmpSparse.pos(tmpOff); + int alen = apos + _tmpSparse.size(tmpOff); + for(int j = apos; j < alen; j++){ + sb.append(row, aoff[j], _op.fn.execute(vr,aval[j])); + } + } + } + } + } + + private static MatrixBlock allocateTempUncompressedBlock(int blklen, int cols) { + MatrixBlock out = new MatrixBlock(blklen, cols, false); + out.allocateBlock(); + return out; + } + + private static MatrixBlock allocateTempUncompressedBlockSparse(int blklen, int cols) { + MatrixBlock out = new MatrixBlock(blklen, cols, true); out.allocateBlock(); return out; } @@ -1199,6 +1346,25 @@ protected static void decompressToTmpBlock(final int rl, final int ru, final Den } } + protected static void decompressToTmpBlock(final int rl, final int ru, final SparseBlock db, + final List groups, final AIterator[] its) { + Timing time = new Timing(true); + for(int i = 0; i < groups.size(); i++) { + final AColGroup g = groups.get(i); + if(g.getCompType() == CompressionType.SDC) + ((ASDCZero) g).decompressToSparseBlock(db, rl, ru, -rl, 0, its[i]); + else + g.decompressToSparseBlock(db, rl, ru, -rl, 0); + } + + if(DMLScript.STATISTICS) { + final double t = time.stop(); + DMLCompressionStatistics.addDecompressToBlockTime(t, 1); + if(LOG.isTraceEnabled()) + LOG.trace("decompressed block w/ k=" + 1 + " in " + t + "ms."); + } + } + protected static AIterator[] getIterators(final List groups, final int rl) { final AIterator[] its = new AIterator[groups.size()]; for(int i = 0; i < groups.size(); i++) { @@ -1210,8 +1376,8 @@ protected static AIterator[] getIterators(final List groups, final in return its; } - private static Pair evaluateSparsityMVCol(CompressedMatrixBlock m1, MatrixBlock m2, BinaryOperator op, - boolean left) { + private static Pair evaluateSparsityMVCol(CompressedMatrixBlock m1, MatrixBlock m2, + BinaryOperator op, boolean left) { final List groups = m1.getColGroups(); final int nCol = m1.getNumColumns(); final int nRow = m1.getNumRows(); @@ -1247,7 +1413,7 @@ private static Pair evaluateSparsityMVCol(CompressedMatrixBlock for(int r = 0; r < sampleRow; r++) { final double m = m2v[r]; final int off = r * sampleCol; - for(int c = 0; c < sampleCol; c++){ + for(int c = 0; c < sampleCol; c++) { int outVal = op.fn.execute(dv[off + c], m) != 0 ? 1 : 0; nnz += outVal; nnzPerRow[r] += outVal; diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryMatrixMatrixCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryMatrixMatrixCPInstruction.java index 2ec23037385..d76dbe0d45e 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryMatrixMatrixCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryMatrixMatrixCPInstruction.java @@ -80,8 +80,15 @@ public void processInstruction(ExecutionContext ec) { retBlock = inBlock1; } else { - if(LibCommonsMath.isSupportedMatrixMatrixOperation(getOpcode()) && !compressedLeft && !compressedRight) + if(LibCommonsMath.isSupportedMatrixMatrixOperation(getOpcode()) ){ + if(compressedLeft) + inBlock1 = CompressedMatrixBlock.getUncompressed(inBlock1, getOpcode()); + + if(compressedRight) + inBlock2 = CompressedMatrixBlock.getUncompressed(inBlock2, getOpcode()); + retBlock = LibCommonsMath.matrixMatrixOperations(inBlock1, inBlock2, getOpcode()); + } else { // Perform computation using input matrices, and produce the result matrix BinaryOperator bop = (BinaryOperator) _optr; From 85abdf238b5e382045ecc172145543b6ee442d13 Mon Sep 17 00:00:00 2001 From: Sebastian Baunsgaard Date: Fri, 26 Jun 2026 16:20:51 +0000 Subject: [PATCH 2/4] Fix compressed left TSMM returning empty result transposeSelfMatrixMultOperations ignored the block returned by CLALibTSMM.leftMultByTransposeSelf and returned the passed-in out reference instead. The helper allocates (and for empty/null inputs returns a freshly created block) internally, so returning the stale out yielded a 0x0 result for the LEFT case. Return the helper's result directly. --- .../apache/sysds/runtime/compress/CompressedMatrixBlock.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java index 64bf2aa43d2..d0ba5363939 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java @@ -619,8 +619,7 @@ public MatrixBlock aggregateUnaryOperations(AggregateUnaryOperator op, MatrixVal public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype, int k) { // check for transpose type if(tstype == MMTSJType.LEFT) { - CLALibTSMM.leftMultByTransposeSelf(this, out, k); - return out; + return CLALibTSMM.leftMultByTransposeSelf(this, out, k); } else { throw new DMLRuntimeException("Invalid MMTSJ type '" + tstype.toString() + "'."); From 32648575ec9c60c73df9963f1e1683545f50aac1 Mon Sep 17 00:00:00 2001 From: Sebastian Baunsgaard Date: Sat, 27 Jun 2026 12:59:17 +0000 Subject: [PATCH 3/4] Fix correctness bugs in compressed binary column-vector sparse path The new sparse-sparse column-vector path in CLALibBinaryCellOp only visits the stored non-zeros of the compressed operand, which produced wrong results for several operation/format combinations exercised by CLALibBinaryCellOpTest: - Gate the sparse-sparse fast path on operator sparse-safety w.r.t. the compressed operand's zeros (isRowSafeLeft/isRowSafeRight on the vector). Non-sparse-safe ops (plus, minus, or, xor, min/max, ...) now fall back to the all-columns sparse path that evaluates every cell. - Index the temporary sparse block by row instead of a dense flat offset in the left path (was (row-rl)*nCol, causing wrong rows / out-of-bounds). - Sort the temporary sparse rows after decompression so values are appended to the output in ascending column order (decompressing multiple column groups can leave unsorted column indices, swapping output columns). Also rename the sparse task helpers from processDense* to processSparse*, replace the DECOMPRESSION_BLEN 16384/2 expression with 8192, and correct the asyncCompressLock comment to describe its actual global scope. --- .../CompressedMatrixBlockFactory.java | 5 +-- .../compress/lib/CLALibBinaryCellOp.java | 34 ++++++++++++------- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java index f082d1ffc3d..7eae9ca0a7e 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java @@ -64,7 +64,8 @@ public class CompressedMatrixBlockFactory { private static final Log LOG = LogFactory.getLog(CompressedMatrixBlockFactory.class.getName()); - private static final Object asyncCompressLock = new Object(); + /** Global lock serializing all async compressions to bound concurrent compression memory/CPU. */ + private static final Object asyncCompressLock = new Object(); /** Timing object to measure the time of each phase in the compression */ private final Timing time = new Timing(true); @@ -188,7 +189,7 @@ public static Future compressAsync(ExecutionContext ec, String varName, In // method call or code to be async try { CacheableData data = ec.getCacheableData(varName); - synchronized(asyncCompressLock){ // synchronize on the data object to not allow multiple compressions of the same matrix. + synchronized(asyncCompressLock) { // global lock: serialize all async compressions (not per-matrix) if(data instanceof MatrixObject) { LOG.debug("Compressing Async"); MatrixObject mo = (MatrixObject) data; diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java index 26779215306..d981ab87838 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java @@ -77,7 +77,7 @@ public final class CLALibBinaryCellOp { private static final Log LOG = LogFactory.getLog(CLALibBinaryCellOp.class.getName()); - public static final int DECOMPRESSION_BLEN = 16384 / 2; + public static final int DECOMPRESSION_BLEN = 8192; private CLALibBinaryCellOp() { // empty private constructor. @@ -427,7 +427,12 @@ private static MatrixBlock mvColCompressed(CompressedMatrixBlock m1, MatrixBlock MatrixBlock ret = new MatrixBlock(nRows, nCols, shouldBeSparseOut, -1).allocateBlock(); if(shouldBeSparseOut) { - if(!m1.isOverlapping() && MatrixBlock.evalSparseFormatInMemory(nRows, nCols, m1.getNonZeros())) { + // The sparse-sparse path only visits the stored non-zeros of m1, so it is correct only when the operation + // maps a zero in m1 to a zero output for every value of the vector m2 (i.e. it does not introduce non-zeros + // from m1's zeros). Otherwise fall back to the dense-scan sparse path that evaluates every cell. + final boolean sparseSafeOnM1Zeros = left ? op.isRowSafeLeft(m2) : op.isRowSafeRight(m2); + if(sparseSafeOnM1Zeros && !m1.isOverlapping() && + MatrixBlock.evalSparseFormatInMemory(nRows, nCols, m1.getNonZeros())) { if(k <= 1) nnz = binaryMVColSingleThreadSparseSparse(m1, m2, op, left, ret); else @@ -1059,31 +1064,35 @@ public Long call() { private final void processBlock(final int rl, final int ru, final List groups, final AIterator[] its) { decompressToTmpBlock(rl, ru, tmp.getSparseBlock(), groups, its); - processDense(rl, ru); + // decompressing multiple column groups can leave the temp rows with unsorted column indices, so sort + // before reading them in stored order into the (column-sorted) output sparse block. + tmp.sortSparseRows(0, ru - rl); + processSparse(rl, ru); tmp.reset(); } private final void processBlockLeft(final int rl, final int ru, final List groups, final AIterator[] its) { decompressToTmpBlock(rl, ru, tmp.getSparseBlock(), groups, its); - processDenseLeft(rl, ru); + tmp.sortSparseRows(0, ru - rl); + processSparseLeft(rl, ru); tmp.reset(); } - private final void processDense(final int rl, final int ru) { + private final void processSparse(final int rl, final int ru) { final SparseBlock sb = _ret.getSparseBlock(); final SparseBlock _tmpSparse = tmp.getSparseBlock(); final double[] _m2Dense = _m2.getDenseBlockValues(); for(int row = rl; row < ru; row++) { final double vr = _m2Dense[row]; final int tmpOff = (row - rl); - if(!_tmpSparse.isEmpty(tmpOff)){ + if(!_tmpSparse.isEmpty(tmpOff)) { int[] aoff = _tmpSparse.indexes(tmpOff); double[] aval = _tmpSparse.values(tmpOff); int apos = _tmpSparse.pos(tmpOff); int alen = apos + _tmpSparse.size(tmpOff); - for(int j = apos; j < alen; j++){ + for(int j = apos; j < alen; j++) { sb.append(row, aoff[j], _op.fn.execute(aval[j], vr)); } } @@ -1091,21 +1100,20 @@ private final void processDense(final int rl, final int ru) { } } - private final void processDenseLeft(final int rl, final int ru) { - final int nCol = _m1.getNumColumns(); + private final void processSparseLeft(final int rl, final int ru) { final SparseBlock sb = _ret.getSparseBlock(); final SparseBlock _tmpSparse = tmp.getSparseBlock(); final double[] _m2Dense = _m2.getDenseBlockValues(); for(int row = rl; row < ru; row++) { final double vr = _m2Dense[row]; - final int tmpOff = (row - rl) * nCol; - if(!_tmpSparse.isEmpty(tmpOff)){ + final int tmpOff = (row - rl); + if(!_tmpSparse.isEmpty(tmpOff)) { int[] aoff = _tmpSparse.indexes(tmpOff); double[] aval = _tmpSparse.values(tmpOff); int apos = _tmpSparse.pos(tmpOff); int alen = apos + _tmpSparse.size(tmpOff); - for(int j = apos; j < alen; j++){ - sb.append(row, aoff[j], _op.fn.execute(vr,aval[j])); + for(int j = apos; j < alen; j++) { + sb.append(row, aoff[j], _op.fn.execute(vr, aval[j])); } } } From 25d5b39d2fc37a41c2d6775528d803aa02cf3684 Mon Sep 17 00:00:00 2001 From: Sebastian Baunsgaard Date: Sat, 27 Jun 2026 17:05:56 +0000 Subject: [PATCH 4/4] Add tests for compressed solve and non-SDC sparse decompress branches --- .../lib/CLALibBinaryCellOpCustomTest.java | 41 ++++++ ...CompressedBinaryMatrixMatrixSolveTest.java | 127 ++++++++++++++++++ 2 files changed, 168 insertions(+) create mode 100644 src/test/java/org/apache/sysds/test/component/compress/lib/CompressedBinaryMatrixMatrixSolveTest.java diff --git a/src/test/java/org/apache/sysds/test/component/compress/lib/CLALibBinaryCellOpCustomTest.java b/src/test/java/org/apache/sysds/test/component/compress/lib/CLALibBinaryCellOpCustomTest.java index 1ce05fab616..48bac7a7920 100644 --- a/src/test/java/org/apache/sysds/test/component/compress/lib/CLALibBinaryCellOpCustomTest.java +++ b/src/test/java/org/apache/sysds/test/component/compress/lib/CLALibBinaryCellOpCustomTest.java @@ -22,14 +22,20 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import static org.junit.Assert.assertTrue; + import org.apache.commons.lang3.tuple.Pair; import org.apache.sysds.runtime.compress.CompressedMatrixBlock; import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; import org.apache.sysds.runtime.compress.CompressionStatistics; +import org.apache.sysds.runtime.compress.colgroup.AColGroup; +import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType; import org.apache.sysds.runtime.compress.lib.CLALibBinaryCellOp; import org.apache.sysds.runtime.functionobjects.GreaterThanEquals; import org.apache.sysds.runtime.functionobjects.LessThanEquals; import org.apache.sysds.runtime.functionobjects.Minus; +import org.apache.sysds.runtime.functionobjects.Multiply; +import org.apache.sysds.runtime.matrix.data.LibMatrixBincell; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.operators.BinaryOperator; import org.apache.sysds.test.TestUtils; @@ -132,6 +138,41 @@ public void OVV2() { TestUtils.compareMatricesBitAvgDistance(new MatrixBlock(10, 10, 2.5 - 324.0), cRet, 0, 0, op.toString()); } + @Test + public void sparseSparseColVectorNonSDCGroup() { + // Drive the sparse-sparse column-vector path of CLALibBinaryCellOp through a compressed input whose + // column groups are not all SDC. The constant non-zero first column compresses to a non-SDC (Const/DDC) + // group, while the remaining columns stay sparse so the overall matrix is sparse enough to pick the + // sparse-sparse path. This exercises the non-SDC branch of decompressToTmpBlock(SparseBlock), which the + // all-SDC sparse inputs of CLALibBinaryCellOpTest never reach. + final int nRow = 300; + final int nCol = 12; + MatrixBlock mb = new MatrixBlock(nRow, nCol, false); + mb.allocateDenseBlock(); + for(int i = 0; i < nRow; i++) + mb.set(i, 0, 3.0); // constant non-zero column -> non-SDC group + for(int i = 0; i < nRow; i += 7) + mb.set(i, 3, 1.0 + (i % 4)); // a few sparse non-zeros + for(int i = 0; i < nRow; i += 11) + mb.set(i, 8, 4.0); + mb.recomputeNonZeros(); + + CompressedMatrixBlock cmb = (CompressedMatrixBlock) CompressedMatrixBlockFactory.compress(mb, 1).getLeft(); + assertTrue("input must compress to exercise the compressed path", cmb instanceof CompressedMatrixBlock); + boolean hasNonSDC = false; + for(AColGroup g : cmb.getColGroups()) + hasNonSDC |= g.getCompType() != CompressionType.SDC; + assertTrue("need a non-SDC column group to cover the non-SDC decompress branch", hasNonSDC); + + // Multiply is sparse-safe (f(0,v)==0), so the sparse-safe gate routes it through the sparse-sparse path. + BinaryOperator op = new BinaryOperator(Multiply.getMultiplyFnObject(), 1); + MatrixBlock cv = TestUtils.round(TestUtils.generateTestMatrixBlock(nRow, 1, -5, 5, 1.0, 7)); + + MatrixBlock cRet = CLALibBinaryCellOp.binaryOperationsRight(op, cmb, cv); + MatrixBlock uRet = LibMatrixBincell.bincellOp(mb, cv, null, op); + TestUtils.compareMatricesBitAvgDistance(uRet, cRet, 0, 0, op.toString()); + } + @Test public void overwriteToCompressedOnSecondCompressed() { BinaryOperator op = new BinaryOperator(Minus.getMinusFnObject(), 2); diff --git a/src/test/java/org/apache/sysds/test/component/compress/lib/CompressedBinaryMatrixMatrixSolveTest.java b/src/test/java/org/apache/sysds/test/component/compress/lib/CompressedBinaryMatrixMatrixSolveTest.java new file mode 100644 index 00000000000..549010a78cb --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/compress/lib/CompressedBinaryMatrixMatrixSolveTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.compress.lib; + +import static org.junit.Assert.assertTrue; + +import org.apache.sysds.common.Types.DataType; +import org.apache.sysds.common.Types.FileFormat; +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.runtime.compress.CompressedMatrixBlock; +import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; +import org.apache.sysds.runtime.controlprogram.LocalVariableMap; +import org.apache.sysds.runtime.controlprogram.caching.CacheableData; +import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysds.runtime.instructions.InstructionUtils; +import org.apache.sysds.runtime.instructions.cp.BinaryCPInstruction; +import org.apache.sysds.runtime.instructions.cp.BinaryMatrixMatrixCPInstruction; +import org.apache.sysds.runtime.matrix.data.LibCommonsMath; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.meta.MatrixCharacteristics; +import org.apache.sysds.runtime.meta.MetaDataFormat; +import org.apache.sysds.test.TestUtils; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Drive the solve opcode through {@link BinaryMatrixMatrixCPInstruction} with compressed inputs to cover the + * commons-math matrix-matrix branch that decompresses compressed left/right operands before solving. The + * script-level solve tests only ever see uncompressed inputs, so this branch is otherwise unreached. + */ +public class CompressedBinaryMatrixMatrixSolveTest { + + private static final String SOLVE = "solve"; + + @BeforeClass + public static void init() throws java.io.IOException { + CacheableData.initCaching("compressed_solve_instruction_test"); + } + + @Test + public void solveCompressedLeftCompressedRight() { + // A is a compressible, invertible matrix (constant off-diagonal with a larger diagonal), b is a + // compressed constant right-hand side: both inputs are CompressedMatrixBlock so the instruction must + // decompress both before delegating to commons-math solve. + final int n = 200; + MatrixBlock aUC = new MatrixBlock(n, n, false); + aUC.allocateDenseBlock(); + for(int i = 0; i < n; i++) + for(int j = 0; j < n; j++) + aUC.set(i, j, i == j ? 5.0 : 1.0); + aUC.recomputeNonZeros(); + + CompressedMatrixBlock aC = (CompressedMatrixBlock) CompressedMatrixBlockFactory.compress(aUC, 1).getLeft(); + assertTrue("A must compress to exercise the compressed-left path", aC instanceof CompressedMatrixBlock); + CompressedMatrixBlock bC = CompressedMatrixBlockFactory.createConstant(n, 2, 1.0); + + MatrixBlock expected = LibCommonsMath.matrixMatrixOperations( + CompressedMatrixBlock.getUncompressed(aC), CompressedMatrixBlock.getUncompressed(bC), SOLVE); + + MatrixBlock actual = runSolve(aC, bC); + TestUtils.compareMatricesBitAvgDistance(expected, actual, 0, 0, SOLVE); + } + + @Test + public void solveCompressedLeftDenseRight() { + // Only the left operand is compressed; the right-hand side stays dense (a single column). + final int n = 200; + MatrixBlock aUC = new MatrixBlock(n, n, false); + aUC.allocateDenseBlock(); + for(int i = 0; i < n; i++) + for(int j = 0; j < n; j++) + aUC.set(i, j, i == j ? 5.0 : 1.0); + aUC.recomputeNonZeros(); + + CompressedMatrixBlock aC = (CompressedMatrixBlock) CompressedMatrixBlockFactory.compress(aUC, 1).getLeft(); + assertTrue("A must compress to exercise the compressed-left path", aC instanceof CompressedMatrixBlock); + MatrixBlock b = TestUtils.round(TestUtils.generateTestMatrixBlock(n, 1, -5, 5, 1.0, 7)); + + MatrixBlock expected = LibCommonsMath.matrixMatrixOperations( + CompressedMatrixBlock.getUncompressed(aC), b, SOLVE); + + MatrixBlock actual = runSolve(aC, b); + TestUtils.compareMatricesBitAvgDistance(expected, actual, 0, 0, SOLVE); + } + + private static MatrixBlock runSolve(MatrixBlock a, MatrixBlock b) { + ExecutionContext ec = new ExecutionContext(new LocalVariableMap()); + ec.setAutoCreateVars(true); + ec.setVariable("A", matrixObject("A", a)); + ec.setVariable("b", matrixObject("b", b)); + solveInstruction().processInstruction(ec); + return ec.getMatrixObject("out").acquireReadAndRelease(); + } + + private static BinaryMatrixMatrixCPInstruction solveInstruction() { + String in1 = InstructionUtils.concatOperandParts("A", DataType.MATRIX.name(), ValueType.FP64.name(), "false"); + String in2 = InstructionUtils.concatOperandParts("b", DataType.MATRIX.name(), ValueType.FP64.name(), "false"); + String out = InstructionUtils.concatOperandParts("out", DataType.MATRIX.name(), ValueType.FP64.name(), "false"); + String str = InstructionUtils.concatOperands("CP", SOLVE, in1, in2, out); + return (BinaryMatrixMatrixCPInstruction) BinaryCPInstruction.parseInstruction(str); + } + + private static MatrixObject matrixObject(String name, MatrixBlock mb) { + MatrixCharacteristics mc = new MatrixCharacteristics(mb.getNumRows(), mb.getNumColumns(), 1000, mb.getNonZeros()); + MatrixObject mo = new MatrixObject(ValueType.FP64, "/dev/null/" + name, + new MetaDataFormat(mc, FileFormat.BINARY), mb); + return mo; + } +}