From 5f0fd56e16fb029a626cc43f0616eb0126e0c637 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sat, 26 Jul 2025 15:51:23 +0530 Subject: [PATCH 01/13] add test to invoke ooc unary op --- .../instructions/OOCInstructionParser.java | 7 +- .../instructions/ooc/OOCInstruction.java | 2 +- .../instructions/ooc/UnaryOOCInstruction.java | 31 +++++ .../sysds/test/functions/ooc/UnaryTest.java | 117 ++++++++++++++++++ src/test/scripts/functions/ooc/Unary.dml | 32 +++++ 5 files changed, 184 insertions(+), 5 deletions(-) create mode 100644 src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java create mode 100644 src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java create mode 100644 src/test/scripts/functions/ooc/Unary.dml diff --git a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java index 0e5b3f1f512..b4d0c4a6eb0 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java @@ -23,10 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.sysds.common.InstructionType; import org.apache.sysds.runtime.DMLRuntimeException; -import org.apache.sysds.runtime.instructions.ooc.AggregateUnaryOOCInstruction; -import org.apache.sysds.runtime.instructions.ooc.BinaryOOCInstruction; -import org.apache.sysds.runtime.instructions.ooc.OOCInstruction; -import org.apache.sysds.runtime.instructions.ooc.ReblockOOCInstruction; +import org.apache.sysds.runtime.instructions.ooc.*; public class OOCInstructionParser extends InstructionParser { protected static final Log LOG = LogFactory.getLog(OOCInstructionParser.class.getName()); @@ -51,6 +48,8 @@ public static OOCInstruction parseSingleInstruction(InstructionType ooctype, Str return ReblockOOCInstruction.parseInstruction(str); case AggregateUnary: return AggregateUnaryOOCInstruction.parseInstruction(str); + case Unary: + return UnaryOOCInstruction.parseInstruction(str); case Binary: return BinaryOOCInstruction.parseInstruction(str); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java index fe73e57fd24..db3d2da8b11 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCInstruction.java @@ -30,7 +30,7 @@ public abstract class OOCInstruction extends Instruction { protected static final Log LOG = LogFactory.getLog(OOCInstruction.class.getName()); public enum OOCType { - Reblock, AggregateUnary, Binary + Reblock, AggregateUnary, Binary, Unary } protected final OOCInstruction.OOCType _ooctype; diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java new file mode 100644 index 00000000000..021e445e63e --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java @@ -0,0 +1,31 @@ +package org.apache.sysds.runtime.instructions.ooc; + +import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysds.runtime.instructions.InstructionUtils; +import org.apache.sysds.runtime.instructions.cp.CPOperand; +import org.apache.sysds.runtime.matrix.operators.Operator; +import org.apache.sysds.runtime.matrix.operators.UnaryOperator; + +public class UnaryOOCInstruction extends ComputationOOCInstruction { + private UnaryOperator _uop = null; + + protected UnaryOOCInstruction(OOCType type, Operator op, CPOperand in1, CPOperand out, String opcode, String istr) { + super(type, op, in1, out, opcode, istr); + } + + public static UnaryOOCInstruction parseInstruction(String str) { + String[] parts = InstructionUtils.getInstructionParts(str); + InstructionUtils.checkNumFields(parts, 2); + String opcode = parts[0]; + CPOperand in1 = new CPOperand(parts[1]); + CPOperand out = new CPOperand(parts[2]); + + UnaryOperator uopcode = InstructionUtils.parseUnaryOperator(opcode); + return new UnaryOOCInstruction(OOCType.Unary, uopcode, in1, out, str); + } + + public void processInstruction( ExecutionContext ec ) { + + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java new file mode 100644 index 00000000000..b4f90adf354 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.functions.ooc; + +import org.apache.sysds.common.Opcodes; +import org.apache.sysds.common.Types; +import org.apache.sysds.common.Types.FileFormat; +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.hops.OptimizerUtils; +import org.apache.sysds.runtime.instructions.Instruction; +import org.apache.sysds.runtime.io.MatrixWriter; +import org.apache.sysds.runtime.io.MatrixWriterFactory; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.data.MatrixValue; +import org.apache.sysds.runtime.meta.MatrixCharacteristics; +import org.apache.sysds.runtime.util.HDFSTool; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; + +public class UnaryTest extends AutomatedTestBase { + + private static final String TEST_NAME = "Unary"; + private static final String TEST_DIR = "functions/ooc/"; + private static final String TEST_CLASS_DIR = TEST_DIR + UnaryTest.class.getSimpleName() + "/"; + private static final String INPUT_NAME = "X"; + private static final String OUTPUT_NAME = "res"; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME); + addTestConfiguration(TEST_NAME, config); + } + + /** + * Test the sum of scalar multiplication, "sum(X*7)", with OOC backend. + */ + @Test + public void testUnary() { + testUnaryOperation(false); + } + + + public void testUnaryOperation(boolean rewrite) + { + Types.ExecMode platformOld = rtplatform; + rtplatform = Types.ExecMode.SINGLE_NODE; + boolean oldRewrite = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION; + OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = rewrite; + + try { + getAndLoadTestConfiguration(TEST_NAME); + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME + ".dml"; + programArgs = new String[] {"-explain", "-stats", "-ooc", + "-args", input(INPUT_NAME), output(OUTPUT_NAME)}; + + int rows = 3500, cols = 4; + MatrixBlock mb = MatrixBlock.randOperations(rows, cols, 1.0, -1, 1, "uniform", 7); + MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(FileFormat.BINARY); + writer.writeMatrixToHDFS(mb, input(INPUT_NAME), rows, cols, 1000, rows*cols); + HDFSTool.writeMetaDataFile(input(INPUT_NAME+".mtd"), ValueType.FP64, + new MatrixCharacteristics(rows,cols,1000,rows*cols), FileFormat.BINARY); + + runTest(true, false, null, -1); + + HashMap dmlfile = readDMLMatrixFromOutputDir(OUTPUT_NAME); + Double result = dmlfile.get(new MatrixValue.CellIndex(1, 1)); +// double expected = 0.0; +// for(int i = 0; i < rows; i++) { +// for(int j = 0; j < cols; j++) { +// expected += mb.get(i, j) * 7; +// } +// } +// +// Assert.assertEquals(expected, result, 1e-10); + + String prefix = Instruction.OOC_INST_PREFIX; + Assert.assertTrue("OOC wasn't used for RBLK", + heavyHittersContainsString(prefix + Opcodes.RBLK)); + if(!rewrite) + Assert.assertTrue("OOC wasn't used for SUM", + heavyHittersContainsString(prefix + Opcodes.MULT)); + Assert.assertTrue("OOC wasn't used for SUM", + heavyHittersContainsString(prefix + Opcodes.UAKP)); + } + catch(Exception ex) { + Assert.fail(ex.getMessage()); + } + finally { + OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = oldRewrite; + resetExecMode(platformOld); + } + } +} diff --git a/src/test/scripts/functions/ooc/Unary.dml b/src/test/scripts/functions/ooc/Unary.dml new file mode 100644 index 00000000000..a7c3bf5cea2 --- /dev/null +++ b/src/test/scripts/functions/ooc/Unary.dml @@ -0,0 +1,32 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Read input matrix and operator from command line args +X = read($1, format="binary"); +op = $3; + +# Apply the specified unary operation +if (op == "ceil") { + Y = ceil(X); +} + +# Write the final matrix result +write(Y, $2, format="binary"); From e03f6305f51ad981dfeea698c04e4b530aa43d12 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sat, 26 Jul 2025 15:54:19 +0530 Subject: [PATCH 02/13] fix dml file --- .../sysds/runtime/instructions/ooc/UnaryOOCInstruction.java | 2 +- src/test/scripts/functions/ooc/Unary.dml | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java index 021e445e63e..53efbf64d2f 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java @@ -22,7 +22,7 @@ public static UnaryOOCInstruction parseInstruction(String str) { CPOperand out = new CPOperand(parts[2]); UnaryOperator uopcode = InstructionUtils.parseUnaryOperator(opcode); - return new UnaryOOCInstruction(OOCType.Unary, uopcode, in1, out, str); + return new UnaryOOCInstruction(OOCType.Unary, uopcode, in1, out, str, str); } public void processInstruction( ExecutionContext ec ) { diff --git a/src/test/scripts/functions/ooc/Unary.dml b/src/test/scripts/functions/ooc/Unary.dml index a7c3bf5cea2..70c43dd29da 100644 --- a/src/test/scripts/functions/ooc/Unary.dml +++ b/src/test/scripts/functions/ooc/Unary.dml @@ -21,12 +21,8 @@ # Read input matrix and operator from command line args X = read($1, format="binary"); -op = $3; -# Apply the specified unary operation -if (op == "ceil") { - Y = ceil(X); -} +Y = ceil(X); # Write the final matrix result write(Y, $2, format="binary"); From 24598d616d992bbe3969d755da0c1a86fbaf4b83 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sat, 26 Jul 2025 16:16:37 +0530 Subject: [PATCH 03/13] ceil(X) output is empty --- src/test/scripts/functions/ooc/Unary.dml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/scripts/functions/ooc/Unary.dml b/src/test/scripts/functions/ooc/Unary.dml index 70c43dd29da..fd41caea205 100644 --- a/src/test/scripts/functions/ooc/Unary.dml +++ b/src/test/scripts/functions/ooc/Unary.dml @@ -20,9 +20,9 @@ #------------------------------------------------------------- # Read input matrix and operator from command line args -X = read($1, format="binary"); +X = read($1); Y = ceil(X); - +print(toString(Y)) # Write the final matrix result -write(Y, $2, format="binary"); +write(Y, $2); From ebaeff0d6fd9a2946438de2540f36e61a8083017 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sat, 26 Jul 2025 22:12:14 +0530 Subject: [PATCH 04/13] code related to unaryooc parseinstruction is called --- .../instructions/ooc/UnaryOOCInstruction.java | 39 +++++++++++++++++++ .../sysds/test/functions/ooc/UnaryTest.java | 4 +- src/test/scripts/functions/ooc/Unary.dml | 2 +- 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java index 53efbf64d2f..c1694f15cf8 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java @@ -1,11 +1,19 @@ package org.apache.sysds.runtime.instructions.ooc; import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.instructions.cp.CPOperand; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.operators.Operator; import org.apache.sysds.runtime.matrix.operators.UnaryOperator; +import org.apache.sysds.runtime.util.CommonThreadPool; + +import java.util.concurrent.ExecutorService; public class UnaryOOCInstruction extends ComputationOOCInstruction { private UnaryOperator _uop = null; @@ -21,11 +29,42 @@ public static UnaryOOCInstruction parseInstruction(String str) { CPOperand in1 = new CPOperand(parts[1]); CPOperand out = new CPOperand(parts[2]); + System.out.println("Here at UnaryOOCInstruction parseInstruction"); + UnaryOperator uopcode = InstructionUtils.parseUnaryOperator(opcode); return new UnaryOOCInstruction(OOCType.Unary, uopcode, in1, out, str, str); } public void processInstruction( ExecutionContext ec ) { + UnaryOperator uop = (UnaryOperator) _uop; + // Create thread and process the unary operation + MatrixObject min = ec.getMatrixObject(input1); + LocalTaskQueue qIn = min.getStreamHandle(); + LocalTaskQueue qOut = new LocalTaskQueue<>(); + ec.getMatrixObject(output).setStreamHandle(qOut); + System.out.println("Here at UnaryOOCInstruction processInstruction ExecutionContext"); + ExecutorService pool = CommonThreadPool.get(); + try { + pool.submit(() -> { + IndexedMatrixValue tmp = null; + try { + while ((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) { + IndexedMatrixValue tmpOut = new IndexedMatrixValue(); + System.out.println("Here at Inside thread"); + tmpOut.set(tmp.getIndexes(), + tmp.getValue().unaryOperations(uop, new MatrixBlock())); + qOut.enqueueTask(tmpOut); + } + qOut.dequeueTask(); + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + }); + } + finally { + pool.shutdown(); + } } } diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java index b4f90adf354..73be6bb93aa 100644 --- a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java +++ b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java @@ -86,8 +86,8 @@ public void testUnaryOperation(boolean rewrite) runTest(true, false, null, -1); - HashMap dmlfile = readDMLMatrixFromOutputDir(OUTPUT_NAME); - Double result = dmlfile.get(new MatrixValue.CellIndex(1, 1)); +// HashMap dmlfile = readDMLMatrixFromOutputDir(OUTPUT_NAME); +// Double result = dmlfile.get(new MatrixValue.CellIndex(1, 1)); // double expected = 0.0; // for(int i = 0; i < rows; i++) { // for(int j = 0; j < cols; j++) { diff --git a/src/test/scripts/functions/ooc/Unary.dml b/src/test/scripts/functions/ooc/Unary.dml index fd41caea205..e71ad31f73b 100644 --- a/src/test/scripts/functions/ooc/Unary.dml +++ b/src/test/scripts/functions/ooc/Unary.dml @@ -21,7 +21,7 @@ # Read input matrix and operator from command line args X = read($1); - +print(toString(X)) Y = ceil(X); print(toString(Y)) # Write the final matrix result From 10e76170d3ea74935c9533ab1ebf86c978a21826 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sun, 27 Jul 2025 07:13:24 +0530 Subject: [PATCH 05/13] program not reaching the threads --- .../sysds/runtime/instructions/ooc/UnaryOOCInstruction.java | 5 ++++- .../java/org/apache/sysds/test/functions/ooc/UnaryTest.java | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java index c1694f15cf8..6cf3853152f 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java @@ -5,6 +5,7 @@ import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; +import org.apache.sysds.runtime.functionobjects.Builtin; import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.instructions.cp.CPOperand; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; @@ -18,8 +19,10 @@ public class UnaryOOCInstruction extends ComputationOOCInstruction { private UnaryOperator _uop = null; - protected UnaryOOCInstruction(OOCType type, Operator op, CPOperand in1, CPOperand out, String opcode, String istr) { + protected UnaryOOCInstruction(OOCType type, UnaryOperator op, CPOperand in1, CPOperand out, String opcode, String istr) { super(type, op, in1, out, opcode, istr); + + _uop = op; } public static UnaryOOCInstruction parseInstruction(String str) { diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java index 73be6bb93aa..1ec05e3427a 100644 --- a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java +++ b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java @@ -77,7 +77,7 @@ public void testUnaryOperation(boolean rewrite) programArgs = new String[] {"-explain", "-stats", "-ooc", "-args", input(INPUT_NAME), output(OUTPUT_NAME)}; - int rows = 3500, cols = 4; + int rows = 3500, cols = 1000; MatrixBlock mb = MatrixBlock.randOperations(rows, cols, 1.0, -1, 1, "uniform", 7); MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(FileFormat.BINARY); writer.writeMatrixToHDFS(mb, input(INPUT_NAME), rows, cols, 1000, rows*cols); From 43f997a95a8e8535fbd03e61df86566a8520c900 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sun, 27 Jul 2025 09:16:59 +0530 Subject: [PATCH 06/13] fix cpoperand input types --- .../sysds/runtime/instructions/ooc/UnaryOOCInstruction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java index 6cf3853152f..3f993b797ba 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java @@ -26,7 +26,7 @@ protected UnaryOOCInstruction(OOCType type, UnaryOperator op, CPOperand in1, CPO } public static UnaryOOCInstruction parseInstruction(String str) { - String[] parts = InstructionUtils.getInstructionParts(str); + String[] parts = InstructionUtils.getInstructionPartsWithValueType(str); InstructionUtils.checkNumFields(parts, 2); String opcode = parts[0]; CPOperand in1 = new CPOperand(parts[1]); From 97892432ec6400217a170f30d95490b6ad23f495 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sun, 27 Jul 2025 12:24:56 +0530 Subject: [PATCH 07/13] input stream not yet reaching the threads --- .../sysds/runtime/instructions/ooc/UnaryOOCInstruction.java | 5 ++++- .../java/org/apache/sysds/test/functions/ooc/UnaryTest.java | 4 ++-- src/test/scripts/functions/ooc/Unary.dml | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java index 3f993b797ba..c9c85e6a5aa 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java @@ -59,11 +59,14 @@ public void processInstruction( ExecutionContext ec ) { tmp.getValue().unaryOperations(uop, new MatrixBlock())); qOut.enqueueTask(tmpOut); } - qOut.dequeueTask(); } catch(Exception ex) { throw new DMLRuntimeException(ex); } + finally { + qOut.closeInput(); + } + }); } finally { diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java index 1ec05e3427a..2c60940ce50 100644 --- a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java +++ b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java @@ -77,7 +77,7 @@ public void testUnaryOperation(boolean rewrite) programArgs = new String[] {"-explain", "-stats", "-ooc", "-args", input(INPUT_NAME), output(OUTPUT_NAME)}; - int rows = 3500, cols = 1000; + int rows = 1000, cols = 1000; MatrixBlock mb = MatrixBlock.randOperations(rows, cols, 1.0, -1, 1, "uniform", 7); MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(FileFormat.BINARY); writer.writeMatrixToHDFS(mb, input(INPUT_NAME), rows, cols, 1000, rows*cols); @@ -86,7 +86,7 @@ public void testUnaryOperation(boolean rewrite) runTest(true, false, null, -1); -// HashMap dmlfile = readDMLMatrixFromOutputDir(OUTPUT_NAME); + HashMap dmlfile = readDMLMatrixFromOutputDir(OUTPUT_NAME); // Double result = dmlfile.get(new MatrixValue.CellIndex(1, 1)); // double expected = 0.0; // for(int i = 0; i < rows; i++) { diff --git a/src/test/scripts/functions/ooc/Unary.dml b/src/test/scripts/functions/ooc/Unary.dml index e71ad31f73b..84cb4f3d3ba 100644 --- a/src/test/scripts/functions/ooc/Unary.dml +++ b/src/test/scripts/functions/ooc/Unary.dml @@ -25,4 +25,4 @@ print(toString(X)) Y = ceil(X); print(toString(Y)) # Write the final matrix result -write(Y, $2); +write(Y, $2, format="binary"); From 44aba46b741cad4f47f76f55ec4a31ada5ff7608 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sun, 27 Jul 2025 12:37:52 +0530 Subject: [PATCH 08/13] pass only the opcode for profiling Heavy hitter instructions: # Instruction Time(s) Count 1 ooc_rblk 0.055 1 2 toString 0.049 2 3 print 0.016 2 4 createvar 0.013 3 5 ooc_ceil 0.001 1 6 rmvar 0.000 3 --- .../sysds/runtime/instructions/ooc/UnaryOOCInstruction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java index c9c85e6a5aa..5f4884449f0 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java @@ -35,7 +35,7 @@ public static UnaryOOCInstruction parseInstruction(String str) { System.out.println("Here at UnaryOOCInstruction parseInstruction"); UnaryOperator uopcode = InstructionUtils.parseUnaryOperator(opcode); - return new UnaryOOCInstruction(OOCType.Unary, uopcode, in1, out, str, str); + return new UnaryOOCInstruction(OOCType.Unary, uopcode, in1, out, opcode, str); } public void processInstruction( ExecutionContext ec ) { From ab7a585de6a4b5b7ceb58826bc391a134ed1a537 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sun, 27 Jul 2025 13:56:48 +0530 Subject: [PATCH 09/13] add Future and task.get() to wait --- .../runtime/instructions/ooc/UnaryOOCInstruction.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java index 5f4884449f0..e2a2da8124a 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java @@ -14,7 +14,9 @@ import org.apache.sysds.runtime.matrix.operators.UnaryOperator; import org.apache.sysds.runtime.util.CommonThreadPool; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; public class UnaryOOCInstruction extends ComputationOOCInstruction { private UnaryOperator _uop = null; @@ -47,9 +49,10 @@ public void processInstruction( ExecutionContext ec ) { ec.getMatrixObject(output).setStreamHandle(qOut); System.out.println("Here at UnaryOOCInstruction processInstruction ExecutionContext"); + ExecutorService pool = CommonThreadPool.get(); try { - pool.submit(() -> { + Future task =pool.submit(() -> { IndexedMatrixValue tmp = null; try { while ((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) { @@ -68,8 +71,10 @@ public void processInstruction( ExecutionContext ec ) { } }); - } - finally { + task.get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } finally { pool.shutdown(); } } From f306f694a10cc0cecdf195031a1468b7db5930f7 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sun, 27 Jul 2025 15:12:21 +0530 Subject: [PATCH 10/13] test pass but write is not successful --- .../runtime/instructions/ooc/UnaryOOCInstruction.java | 5 +---- .../org/apache/sysds/test/functions/ooc/UnaryTest.java | 9 +++------ src/test/scripts/functions/ooc/Unary.dml | 4 ++-- 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java index e2a2da8124a..84f5acadbbe 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java @@ -62,14 +62,11 @@ public void processInstruction( ExecutionContext ec ) { tmp.getValue().unaryOperations(uop, new MatrixBlock())); qOut.enqueueTask(tmpOut); } + qOut.closeInput(); } catch(Exception ex) { throw new DMLRuntimeException(ex); } - finally { - qOut.closeInput(); - } - }); task.get(); } catch (ExecutionException | InterruptedException e) { diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java index 2c60940ce50..063e22ca6ce 100644 --- a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java +++ b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java @@ -86,7 +86,7 @@ public void testUnaryOperation(boolean rewrite) runTest(true, false, null, -1); - HashMap dmlfile = readDMLMatrixFromOutputDir(OUTPUT_NAME); +// HashMap dmlfile = readDMLMatrixFromOutputDir(OUTPUT_NAME); // Double result = dmlfile.get(new MatrixValue.CellIndex(1, 1)); // double expected = 0.0; // for(int i = 0; i < rows; i++) { @@ -100,11 +100,8 @@ public void testUnaryOperation(boolean rewrite) String prefix = Instruction.OOC_INST_PREFIX; Assert.assertTrue("OOC wasn't used for RBLK", heavyHittersContainsString(prefix + Opcodes.RBLK)); - if(!rewrite) - Assert.assertTrue("OOC wasn't used for SUM", - heavyHittersContainsString(prefix + Opcodes.MULT)); - Assert.assertTrue("OOC wasn't used for SUM", - heavyHittersContainsString(prefix + Opcodes.UAKP)); + Assert.assertTrue("OOC wasn't used for CEIL", + heavyHittersContainsString(prefix + Opcodes.CEIL)); } catch(Exception ex) { Assert.fail(ex.getMessage()); diff --git a/src/test/scripts/functions/ooc/Unary.dml b/src/test/scripts/functions/ooc/Unary.dml index 84cb4f3d3ba..af9932d0f0c 100644 --- a/src/test/scripts/functions/ooc/Unary.dml +++ b/src/test/scripts/functions/ooc/Unary.dml @@ -21,8 +21,8 @@ # Read input matrix and operator from command line args X = read($1); -print(toString(X)) +#print(toString(X)) Y = ceil(X); print(toString(Y)) # Write the final matrix result -write(Y, $2, format="binary"); +#write(Y, $2, format="binary"); From 54594995cbde63c92bf56ca715774e3f8f3c83f7 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sun, 27 Jul 2025 15:16:24 +0530 Subject: [PATCH 11/13] remove unused imports --- .../instructions/OOCInstructionParser.java | 6 ++++- .../instructions/ooc/UnaryOOCInstruction.java | 22 ++++++++++++++++--- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java index b4d0c4a6eb0..a744b5d8136 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/OOCInstructionParser.java @@ -23,7 +23,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.sysds.common.InstructionType; import org.apache.sysds.runtime.DMLRuntimeException; -import org.apache.sysds.runtime.instructions.ooc.*; +import org.apache.sysds.runtime.instructions.ooc.AggregateUnaryOOCInstruction; +import org.apache.sysds.runtime.instructions.ooc.BinaryOOCInstruction; +import org.apache.sysds.runtime.instructions.ooc.OOCInstruction; +import org.apache.sysds.runtime.instructions.ooc.ReblockOOCInstruction; +import org.apache.sysds.runtime.instructions.ooc.UnaryOOCInstruction; public class OOCInstructionParser extends InstructionParser { protected static final Log LOG = LogFactory.getLog(OOCInstructionParser.class.getName()); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java index 84f5acadbbe..50a0a53e204 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java @@ -1,16 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.sysds.runtime.instructions.ooc; -import org.apache.sysds.common.Types; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; -import org.apache.sysds.runtime.functionobjects.Builtin; import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.instructions.cp.CPOperand; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.MatrixBlock; -import org.apache.sysds.runtime.matrix.operators.Operator; import org.apache.sysds.runtime.matrix.operators.UnaryOperator; import org.apache.sysds.runtime.util.CommonThreadPool; From 65330526bd0659f347d7ddca8b42d38a35ea3ef9 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sun, 27 Jul 2025 15:20:33 +0530 Subject: [PATCH 12/13] address review comments --- .../sysds/runtime/instructions/ooc/UnaryOOCInstruction.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java index 50a0a53e204..d2fccd5fd6a 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java @@ -50,8 +50,6 @@ public static UnaryOOCInstruction parseInstruction(String str) { CPOperand in1 = new CPOperand(parts[1]); CPOperand out = new CPOperand(parts[2]); - System.out.println("Here at UnaryOOCInstruction parseInstruction"); - UnaryOperator uopcode = InstructionUtils.parseUnaryOperator(opcode); return new UnaryOOCInstruction(OOCType.Unary, uopcode, in1, out, opcode, str); } @@ -63,7 +61,6 @@ public void processInstruction( ExecutionContext ec ) { LocalTaskQueue qIn = min.getStreamHandle(); LocalTaskQueue qOut = new LocalTaskQueue<>(); ec.getMatrixObject(output).setStreamHandle(qOut); - System.out.println("Here at UnaryOOCInstruction processInstruction ExecutionContext"); ExecutorService pool = CommonThreadPool.get(); @@ -73,7 +70,6 @@ public void processInstruction( ExecutionContext ec ) { try { while ((tmp = qIn.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) { IndexedMatrixValue tmpOut = new IndexedMatrixValue(); - System.out.println("Here at Inside thread"); tmpOut.set(tmp.getIndexes(), tmp.getValue().unaryOperations(uop, new MatrixBlock())); qOut.enqueueTask(tmpOut); From 8819b62728060de14c4f0b62f7f1e224aa8acab7 Mon Sep 17 00:00:00 2001 From: Janardhan Pulivarthi Date: Sun, 27 Jul 2025 15:31:51 +0530 Subject: [PATCH 13/13] add a basic test, verify it with aggregation --- .../sysds/test/functions/ooc/UnaryTest.java | 20 +++++++++---------- src/test/scripts/functions/ooc/Unary.dml | 5 +++-- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java index 063e22ca6ce..90f5f8ff0cb 100644 --- a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java +++ b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java @@ -86,16 +86,16 @@ public void testUnaryOperation(boolean rewrite) runTest(true, false, null, -1); -// HashMap dmlfile = readDMLMatrixFromOutputDir(OUTPUT_NAME); -// Double result = dmlfile.get(new MatrixValue.CellIndex(1, 1)); -// double expected = 0.0; -// for(int i = 0; i < rows; i++) { -// for(int j = 0; j < cols; j++) { -// expected += mb.get(i, j) * 7; -// } -// } -// -// Assert.assertEquals(expected, result, 1e-10); + HashMap dmlfile = readDMLMatrixFromOutputDir(OUTPUT_NAME); + Double result = dmlfile.get(new MatrixValue.CellIndex(1, 1)); + double expected = 0.0; + for(int i = 0; i < rows; i++) { + for(int j = 0; j < cols; j++) { + expected += Math.ceil(mb.get(i, j)); + } + } + + Assert.assertEquals(expected, result, 1e-10); String prefix = Instruction.OOC_INST_PREFIX; Assert.assertTrue("OOC wasn't used for RBLK", diff --git a/src/test/scripts/functions/ooc/Unary.dml b/src/test/scripts/functions/ooc/Unary.dml index af9932d0f0c..6d34e8fd763 100644 --- a/src/test/scripts/functions/ooc/Unary.dml +++ b/src/test/scripts/functions/ooc/Unary.dml @@ -23,6 +23,7 @@ X = read($1); #print(toString(X)) Y = ceil(X); -print(toString(Y)) +#print(toString(Y)) +res = as.matrix(sum(Y)); # Write the final matrix result -#write(Y, $2, format="binary"); +write(res, $2);