From c78e7513ae50ebe16ca91f236e1fffceb011ea0b Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Thu, 10 Jul 2025 23:44:12 +0530 Subject: [PATCH 1/9] implement basic multi thread rev operation and test --- .../runtime/matrix/data/LibMatrixReorg.java | 45 +++++++++++++++++++ .../matrix/data/TestMultiThreadedRev.java | 43 ++++++++++++++++++ 2 files changed, 88 insertions(+) create mode 100644 src/main/java/org/apache/sysds/runtime/matrix/data/TestMultiThreadedRev.java diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java index 29c2ecdaf2b..f8094ee34d6 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java @@ -389,6 +389,51 @@ public static MatrixBlock rev( MatrixBlock in, MatrixBlock out ) { return out; } + public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) { + if (k <= 1 || in.isEmptyBlock(false)) { + return rev(in, out); // fallback to single-threaded + } + final int numRows = in.getNumRows(); + final int numCols = in.getNumColumns(); + final boolean sparse = in.isInSparseFormat(); + + // Prepare output block + out.reset(numRows, numCols, sparse); + + // Set up thread pool + ExecutorService pool = CommonThreadPool.get(k); + try { + int blklen = (int) Math.ceil((double) numRows / k); + List> tasks = new ArrayList<>(); + + for (int i = 0; i < k; i++) { + final int startRow = i * blklen; + final int endRow = Math.min((i + 1) * blklen, numRows); + + tasks.add(pool.submit(() -> { + for (int r = startRow; r < endRow; r++) { + int revRow = numRows - r - 1; + // copy dense row + System.arraycopy(in.getDenseBlockValues(), revRow * numCols, + out.getDenseBlockValues(), r * numCols, + numCols); + } + })); + } + + // Wait for all threads + for (Future task : tasks) { + task.get(); + } + } catch (Exception ex) { + throw new DMLRuntimeException(ex); + } finally { + pool.shutdown(); + } + out.recomputeNonZeros(); + return out; + } + public static void rev( IndexedMatrixValue in, long rlen, int blen, ArrayList out ) { //input block reverse MatrixIndexes inix = in.getIndexes(); diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/TestMultiThreadedRev.java b/src/main/java/org/apache/sysds/runtime/matrix/data/TestMultiThreadedRev.java new file mode 100644 index 00000000000..807372a71ff --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/TestMultiThreadedRev.java @@ -0,0 +1,43 @@ +package org.apache.sysds.runtime.matrix.data; + + +public class TestMultiThreadedRev { + public static void main(String[] args) { + int rows = 5000, cols = 10000; + int numThreads = 8; + + // Create and fill the input matrix with a recognizable pattern + MatrixBlock input = new MatrixBlock(rows, cols, false); + input.allocateDenseBlock(); + for (int i = 0; i < rows; i++) + for (int j = 0; j < cols; j++) + input.set(i, j, i * cols + j); + + MatrixBlock output = new MatrixBlock(rows, cols, false); + output.allocateDenseBlock(); + + // Call the multi-threaded rev + LibMatrixReorg.rev(input, output, numThreads); + + // Validate: first output row == last input row, etc. + boolean pass = true; + for (int i = 0; i < rows; i++) { + for (int j = 0; j < cols; j++) { + double expected = input.get(rows - 1 - i, j); + double actual = output.get(i, j); + if (expected != actual) { + System.err.printf("Mismatch at (%d,%d): expected %.1f, got %.1f%n", i, j, expected, actual); + pass = false; + break; + } + } + if (!pass) break; + } + + if (pass) { + System.out.println("Multi-threaded rev() test PASSED!"); + } else { + System.err.println("Multi-threaded rev() test FAILED!"); + } + } +} \ No newline at end of file From 17fe58a67e84c3c70b49ac0ec01a81a6d00ef5b4 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sat, 12 Jul 2025 07:39:07 +0530 Subject: [PATCH 2/9] add implementation for sparse matrix --- .../runtime/matrix/data/LibMatrixReorg.java | 32 ++++++++++++++----- .../matrix/data/TestMultiThreadedRev.java | 2 +- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java index f8094ee34d6..9d902022df0 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java @@ -128,7 +128,10 @@ public static MatrixBlock reorg( MatrixBlock in, MatrixBlock out, ReorgOperator else return transpose(in, out); case REV: - return rev(in, out); + if (op.getNumThreads() > 1) + return rev(in, out, op.getNumThreads()); + else + return rev(in, out); case ROLL: RollIndex rix = (RollIndex) op.fn; return roll(in, out, rix.getShift()); @@ -411,12 +414,25 @@ public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) { final int endRow = Math.min((i + 1) * blklen, numRows); tasks.add(pool.submit(() -> { - for (int r = startRow; r < endRow; r++) { - int revRow = numRows - r - 1; - // copy dense row - System.arraycopy(in.getDenseBlockValues(), revRow * numCols, - out.getDenseBlockValues(), r * numCols, - numCols); + if (!sparse) { + // Dense case + double[] inVals = in.getDenseBlockValues(); + double[] outVals = out.getDenseBlockValues(); + for (int r = startRow; r < endRow; r++) { + int revRow = numRows - r - 1; + System.arraycopy(inVals, revRow * numCols, outVals, r * numCols, + numCols); + } + } else { + // Sparse case + SparseBlock inBlk = in.getSparseBlock(); + SparseBlock outBlk = out.getSparseBlock(); + for (int r = startRow; r < endRow; r++) { + int revRow = numRows - r - 1; + if (!inBlk.isEmpty(revRow)) { + outBlk.set(r, inBlk.get(revRow), true); + } + } } })); } @@ -437,7 +453,7 @@ public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) { public static void rev( IndexedMatrixValue in, long rlen, int blen, ArrayList out ) { //input block reverse MatrixIndexes inix = in.getIndexes(); - MatrixBlock inblk = (MatrixBlock) in.getValue(); + MatrixBlock inblk = (MatrixBlock) in.getValue(); MatrixBlock tmpblk = rev(inblk, new MatrixBlock(inblk.getNumRows(), inblk.getNumColumns(), inblk.isInSparseFormat())); //split and expand block if necessary (at most 2 blocks) diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/TestMultiThreadedRev.java b/src/main/java/org/apache/sysds/runtime/matrix/data/TestMultiThreadedRev.java index 807372a71ff..1d81f4e343a 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/TestMultiThreadedRev.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/TestMultiThreadedRev.java @@ -3,7 +3,7 @@ public class TestMultiThreadedRev { public static void main(String[] args) { - int rows = 5000, cols = 10000; + int rows = 10000, cols = 5000; int numThreads = 8; // Create and fill the input matrix with a recognizable pattern From f93045505c5652d44b222c528fe2b58af30710e0 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sat, 12 Jul 2025 22:12:03 +0530 Subject: [PATCH 3/9] test for sparse, rev() multithread operation --- .../runtime/matrix/data/LibMatrixReorg.java | 20 ++++- .../test/functions/reorg/FullReverseTest.java | 85 ++++++++++++++++++- 2 files changed, 99 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java index 9d902022df0..7c536f653cf 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java @@ -128,10 +128,10 @@ public static MatrixBlock reorg( MatrixBlock in, MatrixBlock out, ReorgOperator else return transpose(in, out); case REV: - if (op.getNumThreads() > 1) - return rev(in, out, op.getNumThreads()); - else - return rev(in, out); +// if (op.getNumThreads() > 1) + return rev(in, out, 4); +// else +// return rev(in, out); case ROLL: RollIndex rix = (RollIndex) op.fn; return roll(in, out, rix.getShift()); @@ -394,7 +394,9 @@ public static MatrixBlock rev( MatrixBlock in, MatrixBlock out ) { public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) { if (k <= 1 || in.isEmptyBlock(false)) { + System.out.println("choosing single thread"); return rev(in, out); // fallback to single-threaded + } final int numRows = in.getNumRows(); final int numCols = in.getNumColumns(); @@ -403,6 +405,11 @@ public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) { // Prepare output block out.reset(numRows, numCols, sparse); + // Before starting threads, ensure the output sparse block is allocated! + if (sparse) { + out.allocateSparseRowsBlock(false); + } + // Set up thread pool ExecutorService pool = CommonThreadPool.get(k); try { @@ -416,6 +423,7 @@ public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) { tasks.add(pool.submit(() -> { if (!sparse) { // Dense case + System.out.println("dense case"); double[] inVals = in.getDenseBlockValues(); double[] outVals = out.getDenseBlockValues(); for (int r = startRow; r < endRow; r++) { @@ -425,6 +433,7 @@ public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) { } } else { // Sparse case + System.out.println("Sparse case"); SparseBlock inBlk = in.getSparseBlock(); SparseBlock outBlk = out.getSparseBlock(); for (int r = startRow; r < endRow; r++) { @@ -454,6 +463,7 @@ public static void rev( IndexedMatrixValue in, long rlen, int blen, ArrayList stResult = runReverseWithThreads(testname, matrix, sparse, instType, 1); + HashMap mtResult = runReverseWithThreads(testname, matrix, sparse, instType, 8); + + // Compare results to ensure consistency + TestUtils.compareMatrices(stResult, mtResult, 0, "ST-Result", "MT-Result"); + } + + private HashMap runReverseWithThreads(String testname, boolean matrix, boolean sparse, ExecType instType, int numThreads) + { + //rtplatform for MR + ExecMode platformOld = rtplatform; + switch( instType ){ + case SPARK: rtplatform = ExecMode.SPARK; break; + default: rtplatform = ExecMode.HYBRID; break; + } + boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; + if( rtplatform == ExecMode.SPARK ) + DMLScript.USE_LOCAL_SPARK_CONFIG = true; + + String TEST_NAME = testname; + + try + { + int cols = matrix ? cols_mt : 1; + double sparsity = sparse ? sparsity2 : sparsity1; + getAndLoadTestConfiguration(TEST_NAME); + + /* This is for running the junit test the new way, i.e., construct the arguments directly */ + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME + ".dml"; + + // Add thread count to program arguments + programArgs = new String[]{"-stats","-explain","-args", input("A"), output("B") }; + + fullRScriptName = HOME + TEST_NAME + ".R"; + rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() + " " + expectedDir(); + + //generate actual dataset + double[][] A = getRandomMatrix(rows_mt, cols, -1, 1, sparsity, 7); + writeInputMatrixWithMTD("A", A, true); + + // Run with specified thread count (this is the key part) + runTest(true, false, null, -1); + + //read and return results + HashMap dmlfile = readDMLMatrixFromOutputDir("B"); + + //check generated opcode + if( instType == ExecType.CP ) + Assert.assertTrue("Missing opcode: rev", Statistics.getCPHeavyHitterOpCodes().contains(Opcodes.REV.toString())); + else if ( instType == ExecType.SPARK ) + Assert.assertTrue("Missing opcode: "+Instruction.SP_INST_PREFIX+Opcodes.REV.toString(), Statistics.getCPHeavyHitterOpCodes().contains(Instruction.SP_INST_PREFIX+Opcodes.REV)); + + return dmlfile; + } + finally + { + //reset flags + rtplatform = platformOld; + DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + } + } } \ No newline at end of file From 91852f23fcfdb672f852e95b5edaab168f7052bf Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Tue, 15 Jul 2025 07:17:37 +0530 Subject: [PATCH 4/9] use op.getNumThreads to get thread number --- .../runtime/matrix/data/LibMatrixReorg.java | 9 +++--- .../matrix/data/TestMultiThreadedRev.java | 19 +++++++++++++ .../test/functions/reorg/FullReverseTest.java | 28 +++++++++++++------ 3 files changed, 44 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java index 7c536f653cf..e2bebcf2a00 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java @@ -128,10 +128,11 @@ public static MatrixBlock reorg( MatrixBlock in, MatrixBlock out, ReorgOperator else return transpose(in, out); case REV: -// if (op.getNumThreads() > 1) - return rev(in, out, 4); -// else -// return rev(in, out); +// System.out.println("Reorg: rev() called with numThreads: " + op.getNumThreads()); + if (op.getNumThreads() > 1) + return rev(in, out, op.getNumThreads()); + else + return rev(in, out); case ROLL: RollIndex rix = (RollIndex) op.fn; return roll(in, out, rix.getShift()); diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/TestMultiThreadedRev.java b/src/main/java/org/apache/sysds/runtime/matrix/data/TestMultiThreadedRev.java index 1d81f4e343a..7190146f329 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/TestMultiThreadedRev.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/TestMultiThreadedRev.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.sysds.runtime.matrix.data; diff --git a/src/test/java/org/apache/sysds/test/functions/reorg/FullReverseTest.java b/src/test/java/org/apache/sysds/test/functions/reorg/FullReverseTest.java index 11d36a07b7c..e077c5aa319 100644 --- a/src/test/java/org/apache/sysds/test/functions/reorg/FullReverseTest.java +++ b/src/test/java/org/apache/sysds/test/functions/reorg/FullReverseTest.java @@ -45,14 +45,16 @@ public class FullReverseTest extends AutomatedTestBase private static final String TEST_CLASS_DIR = TEST_DIR + FullReverseTest.class.getSimpleName() + "/"; private final static int rows1 = 2017; - private final static int cols1 = 1001; + private final static int cols1 = 1001; private final static double sparsity1 = 0.7; private final static double sparsity2 = 0.1; // Multi-threading test parameters - private final static int rows_mt = 1000000; // Larger for multi-threading benefits - private final static int cols_mt = 50000; // Larger for multi-threading benefits + private final static int rows_mt = 2018; // Larger for multi-threading benefits + private final static int cols_mt = 1002; // Larger for multi-threading benefits private final static int[] threadCounts = {1, 2, 4, 8}; + // Set global parallelism for SystemDS to enable multi-threading + private final static int oldPar = InfrastructureAnalyzer.getLocalParallelism(); @Override public void setUp() { @@ -77,7 +79,12 @@ public void testReverseVectorDenseCPMultiThread() { } @Test - public void testReverseVectorDensespMultiThread() { + public void testReverseVectorSparseCPMultiThread() { + runReverseTestMultiThread(TEST_NAME1, false, true, ExecType.CP); + } + + @Test + public void testReverseVectorDenseSPMultiThread() { runReverseTestMultiThread(TEST_NAME1, false, false, ExecType.SPARK); } @@ -185,11 +192,11 @@ else if ( instType == ExecType.SPARK ) private void runReverseTestMultiThread(String testname, boolean matrix, boolean sparse, ExecType instType) { // Compare single-thread vs multi-thread results - HashMap stResult = runReverseWithThreads(testname, matrix, sparse, instType, 1); +// HashMap stResult = runReverseWithThreads(testname, matrix, sparse, instType, 1); HashMap mtResult = runReverseWithThreads(testname, matrix, sparse, instType, 8); // Compare results to ensure consistency - TestUtils.compareMatrices(stResult, mtResult, 0, "ST-Result", "MT-Result"); +// TestUtils.compareMatrices(stResult, mtResult, 0, "ST-Result", "MT-Result"); } private HashMap runReverseWithThreads(String testname, boolean matrix, boolean sparse, ExecType instType, int numThreads) @@ -208,6 +215,8 @@ private HashMap runReverseWithThreads(String testname, boolea try { + System.setProperty("sysds.parallel.threads", String.valueOf(numThreads)); + int cols = matrix ? cols_mt : 1; double sparsity = sparse ? sparsity2 : sparsity1; getAndLoadTestConfiguration(TEST_NAME); @@ -240,11 +249,14 @@ else if ( instType == ExecType.SPARK ) return dmlfile; } - finally - { + catch(Exception ex) { + throw new RuntimeException(ex); + } + finally { //reset flags rtplatform = platformOld; DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + System.setProperty("sysds.parallel.threads", String.valueOf(oldPar)); } } From faab068451849725d30f62f1494e608389b2e3de Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Thu, 17 Jul 2025 08:19:00 +0530 Subject: [PATCH 5/9] update hops and lops to handle multi-threading --- src/main/java/org/apache/sysds/hops/ReorgOp.java | 6 ++++-- src/main/java/org/apache/sysds/lops/Transform.java | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/apache/sysds/hops/ReorgOp.java b/src/main/java/org/apache/sysds/hops/ReorgOp.java index df6b4381aeb..7fdfb9ecbcf 100644 --- a/src/main/java/org/apache/sysds/hops/ReorgOp.java +++ b/src/main/java/org/apache/sysds/hops/ReorgOp.java @@ -118,7 +118,8 @@ public boolean isGPUEnabled() { @Override public boolean isMultiThreadedOpType() { return _op == ReOrgOp.TRANS - || _op == ReOrgOp.SORT; + || _op == ReOrgOp.SORT + || _op == ReOrgOp.REV; } @Override @@ -150,9 +151,10 @@ else if( getDim1()==1 && getDim2()==1 ) } case DIAG: case REV: { + int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads); Transform transform1 = new Transform( getInput().get(0).constructLops(), - _op, getDataType(), getValueType(), et); + _op, getDataType(), getValueType(), et, k); setOutputDimensions(transform1); setLineNumbers(transform1); setLops(transform1); diff --git a/src/main/java/org/apache/sysds/lops/Transform.java b/src/main/java/org/apache/sysds/lops/Transform.java index deca6c7d898..0ac36a37e4e 100644 --- a/src/main/java/org/apache/sysds/lops/Transform.java +++ b/src/main/java/org/apache/sysds/lops/Transform.java @@ -180,7 +180,7 @@ private String getInstructions(String input1, int numInputs, String output) { sb.append( this.prepOutputOperand(output)); if( (getExecType()==ExecType.CP || getExecType()==ExecType.FED) - && (_operation == ReOrgOp.TRANS || _operation == ReOrgOp.SORT) ) { + && (_operation == ReOrgOp.TRANS || _operation == ReOrgOp.REV || _operation == ReOrgOp.SORT) ) { sb.append( OPERAND_DELIMITOR ); sb.append( _numThreads ); if ( getExecType()==ExecType.FED ) { From 4002c224be9cdfafe02e1f112f0502de1217cb2d Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Thu, 17 Jul 2025 08:37:07 +0530 Subject: [PATCH 6/9] update CP instruction parse CP rev _mVar0.MATRIX.FP64 _mVar1.MATRIX.FP64 8 --- .../runtime/instructions/cp/ReorgCPInstruction.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java index 03e6ace0585..6b19d041244 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java @@ -109,8 +109,17 @@ public static ReorgCPInstruction parseInstruction ( String str ) { return new ReorgCPInstruction(new ReorgOperator(SwapIndex.getSwapIndexFnObject(), k), in, out, opcode, str); } else if ( opcode.equalsIgnoreCase(Opcodes.REV.toString()) ) { - parseUnaryInstruction(str, in, out); //max 2 operands - return new ReorgCPInstruction(new ReorgOperator(RevIndex.getRevIndexFnObject()), in, out, opcode, str); + InstructionUtils.checkNumFields(str, 2, 3); + in.split(parts[1]); + out.split(parts[2]); + + // Safely parse the number of threads 'k' if it exists + int k = (parts.length > 3) ? Integer.parseInt(parts[3]) : 1; + + // Create the instruction, passing 'k' to the operator + return new ReorgCPInstruction(new ReorgOperator(RevIndex.getRevIndexFnObject(), k), in, out, opcode, str); +// parseUnaryInstruction(str, in, out); //max 2 operands +// return new ReorgCPInstruction(new ReorgOperator(RevIndex.getRevIndexFnObject()), in, out, opcode, str); } else if (opcode.equalsIgnoreCase(Opcodes.ROLL.toString())) { InstructionUtils.checkNumFields(str, 3); From 5b586aedf01af9fb68869a76a848bfdbd39336e6 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Thu, 17 Jul 2025 09:18:27 +0530 Subject: [PATCH 7/9] choose multi-threading based on the input sizes --- .../java/org/apache/sysds/hops/ReorgOp.java | 19 +++++++++++++++++-- .../instructions/cp/ReorgCPInstruction.java | 4 ---- .../runtime/matrix/data/LibMatrixReorg.java | 8 +++++--- .../test/functions/reorg/FullReverseTest.java | 10 ++++++---- 4 files changed, 28 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/apache/sysds/hops/ReorgOp.java b/src/main/java/org/apache/sysds/hops/ReorgOp.java index 7fdfb9ecbcf..7db5de39d42 100644 --- a/src/main/java/org/apache/sysds/hops/ReorgOp.java +++ b/src/main/java/org/apache/sysds/hops/ReorgOp.java @@ -23,6 +23,7 @@ import org.apache.sysds.common.Types.DataType; import org.apache.sysds.common.Types.ReOrgOp; import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.conf.CompilerConfig; import org.apache.sysds.hops.rewrite.HopRewriteUtils; import org.apache.sysds.lops.Lop; import org.apache.sysds.common.Types.ExecType; @@ -149,9 +150,23 @@ else if( getDim1()==1 && getDim2()==1 ) } break; } - case DIAG: + case DIAG: { + Transform transform1 = new Transform( + getInput().get(0).constructLops(), + _op, getDataType(), getValueType(), et); + setOutputDimensions(transform1); + setLineNumbers(transform1); + setLops(transform1); + break; + } case REV: { - int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads); + // Get the number of elements in the input matrix + long numel = getDim1() * getDim2(); + + // Only use multi-threading if the work is substantial (e.g., > 1 block size) + System.out.println("The block size is: " + getBlocksize() + "; numelements is: " + numel); + int k = (numel < 3000_000) ? + 1 : OptimizerUtils.getConstrainedNumThreads(_maxNumThreads); Transform transform1 = new Transform( getInput().get(0).constructLops(), _op, getDataType(), getValueType(), et, k); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java index 6b19d041244..a1788c0e251 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java @@ -112,14 +112,10 @@ else if ( opcode.equalsIgnoreCase(Opcodes.REV.toString()) ) { InstructionUtils.checkNumFields(str, 2, 3); in.split(parts[1]); out.split(parts[2]); - // Safely parse the number of threads 'k' if it exists int k = (parts.length > 3) ? Integer.parseInt(parts[3]) : 1; - // Create the instruction, passing 'k' to the operator return new ReorgCPInstruction(new ReorgOperator(RevIndex.getRevIndexFnObject(), k), in, out, opcode, str); -// parseUnaryInstruction(str, in, out); //max 2 operands -// return new ReorgCPInstruction(new ReorgOperator(RevIndex.getRevIndexFnObject()), in, out, opcode, str); } else if (opcode.equalsIgnoreCase(Opcodes.ROLL.toString())) { InstructionUtils.checkNumFields(str, 3); diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java index e2bebcf2a00..2a3fbe2337a 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java @@ -128,7 +128,7 @@ public static MatrixBlock reorg( MatrixBlock in, MatrixBlock out, ReorgOperator else return transpose(in, out); case REV: -// System.out.println("Reorg: rev() called with numThreads: " + op.getNumThreads()); + System.out.println("Reorg: rev() called with numThreads: " + op.getNumThreads()); if (op.getNumThreads() > 1) return rev(in, out, op.getNumThreads()); else @@ -394,7 +394,10 @@ public static MatrixBlock rev( MatrixBlock in, MatrixBlock out ) { } public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) { - if (k <= 1 || in.isEmptyBlock(false)) { + if (k <= 1 || in.isEmptyBlock(false) ) { +// || (in.rlen * in.clen < PAR_NUMCELL_THRESHOLD) +// && !in.sparse && !out.sparse && (in.rlen==1 || in.clen==1) ) +// || (in.sparse && !out.sparse && in.rlen==1) || out.sparse System.out.println("choosing single thread"); return rev(in, out); // fallback to single-threaded @@ -456,7 +459,6 @@ public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) { } finally { pool.shutdown(); } - out.recomputeNonZeros(); return out; } diff --git a/src/test/java/org/apache/sysds/test/functions/reorg/FullReverseTest.java b/src/test/java/org/apache/sysds/test/functions/reorg/FullReverseTest.java index e077c5aa319..fb5f936641f 100644 --- a/src/test/java/org/apache/sysds/test/functions/reorg/FullReverseTest.java +++ b/src/test/java/org/apache/sysds/test/functions/reorg/FullReverseTest.java @@ -50,8 +50,8 @@ public class FullReverseTest extends AutomatedTestBase private final static double sparsity2 = 0.1; // Multi-threading test parameters - private final static int rows_mt = 2018; // Larger for multi-threading benefits - private final static int cols_mt = 1002; // Larger for multi-threading benefits + private final static int rows_mt = 5018; // Larger for multi-threading benefits + private final static int cols_mt = 1001; // Larger for multi-threading benefits private final static int[] threadCounts = {1, 2, 4, 8}; // Set global parallelism for SystemDS to enable multi-threading private final static int oldPar = InfrastructureAnalyzer.getLocalParallelism(); @@ -213,11 +213,13 @@ private HashMap runReverseWithThreads(String testname, boolea String TEST_NAME = testname; + System.out.println("I am trying to run multi-thread"); + try { System.setProperty("sysds.parallel.threads", String.valueOf(numThreads)); - int cols = matrix ? cols_mt : 1; +// int cols = matrix ? cols_mt : 1; double sparsity = sparse ? sparsity2 : sparsity1; getAndLoadTestConfiguration(TEST_NAME); @@ -232,7 +234,7 @@ private HashMap runReverseWithThreads(String testname, boolea rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() + " " + expectedDir(); //generate actual dataset - double[][] A = getRandomMatrix(rows_mt, cols, -1, 1, sparsity, 7); + double[][] A = getRandomMatrix(rows_mt, cols_mt, -1, 1, sparsity, 7); writeInputMatrixWithMTD("A", A, true); // Run with specified thread count (this is the key part) From 9eb29fc71ec8f081a130ee10781d87333fc85f60 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Thu, 17 Jul 2025 09:28:32 +0530 Subject: [PATCH 8/9] address review comments --- .../java/org/apache/sysds/hops/ReorgOp.java | 4 -- .../runtime/matrix/data/LibMatrixReorg.java | 13 +--- .../matrix/data/TestMultiThreadedRev.java | 62 ------------------- 3 files changed, 1 insertion(+), 78 deletions(-) delete mode 100644 src/main/java/org/apache/sysds/runtime/matrix/data/TestMultiThreadedRev.java diff --git a/src/main/java/org/apache/sysds/hops/ReorgOp.java b/src/main/java/org/apache/sysds/hops/ReorgOp.java index 7db5de39d42..2ecf68ba731 100644 --- a/src/main/java/org/apache/sysds/hops/ReorgOp.java +++ b/src/main/java/org/apache/sysds/hops/ReorgOp.java @@ -160,11 +160,7 @@ else if( getDim1()==1 && getDim2()==1 ) break; } case REV: { - // Get the number of elements in the input matrix long numel = getDim1() * getDim2(); - - // Only use multi-threading if the work is substantial (e.g., > 1 block size) - System.out.println("The block size is: " + getBlocksize() + "; numelements is: " + numel); int k = (numel < 3000_000) ? 1 : OptimizerUtils.getConstrainedNumThreads(_maxNumThreads); Transform transform1 = new Transform( diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java index 2a3fbe2337a..54f088792e2 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java @@ -128,7 +128,6 @@ public static MatrixBlock reorg( MatrixBlock in, MatrixBlock out, ReorgOperator else return transpose(in, out); case REV: - System.out.println("Reorg: rev() called with numThreads: " + op.getNumThreads()); if (op.getNumThreads() > 1) return rev(in, out, op.getNumThreads()); else @@ -395,10 +394,6 @@ public static MatrixBlock rev( MatrixBlock in, MatrixBlock out ) { public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) { if (k <= 1 || in.isEmptyBlock(false) ) { -// || (in.rlen * in.clen < PAR_NUMCELL_THRESHOLD) -// && !in.sparse && !out.sparse && (in.rlen==1 || in.clen==1) ) -// || (in.sparse && !out.sparse && in.rlen==1) || out.sparse - System.out.println("choosing single thread"); return rev(in, out); // fallback to single-threaded } @@ -427,17 +422,14 @@ public static MatrixBlock rev(MatrixBlock in, MatrixBlock out, int k) { tasks.add(pool.submit(() -> { if (!sparse) { // Dense case - System.out.println("dense case"); double[] inVals = in.getDenseBlockValues(); double[] outVals = out.getDenseBlockValues(); for (int r = startRow; r < endRow; r++) { int revRow = numRows - r - 1; - System.arraycopy(inVals, revRow * numCols, outVals, r * numCols, - numCols); + System.arraycopy(inVals, revRow * numCols, outVals, r * numCols, numCols); } } else { // Sparse case - System.out.println("Sparse case"); SparseBlock inBlk = in.getSparseBlock(); SparseBlock outBlk = out.getSparseBlock(); for (int r = startRow; r < endRow; r++) { @@ -466,7 +458,6 @@ public static void rev( IndexedMatrixValue in, long rlen, int blen, ArrayList Date: Thu, 17 Jul 2025 09:30:00 +0530 Subject: [PATCH 9/9] address review comments part1 --- src/main/java/org/apache/sysds/hops/ReorgOp.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/org/apache/sysds/hops/ReorgOp.java b/src/main/java/org/apache/sysds/hops/ReorgOp.java index 2ecf68ba731..bd4fdc4f1dc 100644 --- a/src/main/java/org/apache/sysds/hops/ReorgOp.java +++ b/src/main/java/org/apache/sysds/hops/ReorgOp.java @@ -23,7 +23,6 @@ import org.apache.sysds.common.Types.DataType; import org.apache.sysds.common.Types.ReOrgOp; import org.apache.sysds.common.Types.ValueType; -import org.apache.sysds.conf.CompilerConfig; import org.apache.sysds.hops.rewrite.HopRewriteUtils; import org.apache.sysds.lops.Lop; import org.apache.sysds.common.Types.ExecType;