From 170871772dfcfa926e60fa59a301b3afe044fefb Mon Sep 17 00:00:00 2001 From: Vi Vuong Date: Fri, 27 Jun 2025 15:06:43 +0200 Subject: [PATCH 1/5] ein test --- basjxbla.txt | 1 + 1 file changed, 1 insertion(+) create mode 100644 basjxbla.txt diff --git a/basjxbla.txt b/basjxbla.txt new file mode 100644 index 00000000000..e5b219ba346 --- /dev/null +++ b/basjxbla.txt @@ -0,0 +1 @@ +hallo sandra \ No newline at end of file From 887458458bcb86f7e7f7fbbde70e6b4196f96817 Mon Sep 17 00:00:00 2001 From: lcl2a Date: Fri, 27 Jun 2025 15:08:12 +0200 Subject: [PATCH 2/5] vi stinkt --- basjxbla.txt | 1 - 1 file changed, 1 deletion(-) delete mode 100644 basjxbla.txt diff --git a/basjxbla.txt b/basjxbla.txt deleted file mode 100644 index e5b219ba346..00000000000 --- a/basjxbla.txt +++ /dev/null @@ -1 +0,0 @@ -hallo sandra \ No newline at end of file From f8a5e96e1faed73c0ade6cf847f8ca03fc2be98f Mon Sep 17 00:00:00 2001 From: Vi Vuong Date: Fri, 27 Jun 2025 17:25:09 +0200 Subject: [PATCH 3/5] rowcumsum implementation and tests --- .../org/apache/sysds/common/Builtins.java | 1 + .../java/org/apache/sysds/common/Opcodes.java | 3 + .../java/org/apache/sysds/common/Types.java | 4 +- .../parser/BuiltinFunctionExpression.java | 1 + .../apache/sysds/parser/DMLTranslator.java | 1 + .../runtime/functionobjects/Builtin.java | 3 +- .../instructions/InstructionUtils.java | 2 + .../spark/CumulativeOffsetSPInstruction.java | 2 + .../spark/UnaryMatrixSPInstruction.java | 210 +++++++++++++++++ .../runtime/matrix/data/LibMatrixAgg.java | 107 ++++++++- .../unary/matrix/FullRowcumsumTest.java | 222 ++++++++++++++++++ .../functions/unary/matrix/Rowcumsum.R | 36 +++ .../functions/unary/matrix/Rowcumsum.dml | 26 ++ 13 files changed, 615 insertions(+), 3 deletions(-) create mode 100644 src/test/java/org/apache/sysds/test/functions/unary/matrix/FullRowcumsumTest.java create mode 100644 src/test/scripts/functions/unary/matrix/Rowcumsum.R create mode 100644 src/test/scripts/functions/unary/matrix/Rowcumsum.dml diff --git a/src/main/java/org/apache/sysds/common/Builtins.java b/src/main/java/org/apache/sysds/common/Builtins.java index 423679d038c..fe75aec6a05 100644 --- a/src/main/java/org/apache/sysds/common/Builtins.java +++ b/src/main/java/org/apache/sysds/common/Builtins.java @@ -291,6 +291,7 @@ public enum Builtins { ROLL("roll", false), ROUND("round", false), ROW_COUNT_DISTINCT("rowCountDistinct",false), + ROWCUMSUM("rowcumsum", false), ROWINDEXMAX("rowIndexMax", false), ROWINDEXMIN("rowIndexMin", false), ROWMAX("rowMaxs", false), diff --git a/src/main/java/org/apache/sysds/common/Opcodes.java b/src/main/java/org/apache/sysds/common/Opcodes.java index 28c5a7a6a8e..623271bf12a 100644 --- a/src/main/java/org/apache/sysds/common/Opcodes.java +++ b/src/main/java/org/apache/sysds/common/Opcodes.java @@ -36,6 +36,7 @@ public enum Opcodes { UAKP("uak+", InstructionType.AggregateUnary), UARKP("uark+", InstructionType.AggregateUnary), UACKP("uack+", InstructionType.AggregateUnary), + UARCKP("uarck+", InstructionType.AggregateUnary), UASQKP("uasqk+", InstructionType.AggregateUnary), UARSQKP("uarsqk+", InstructionType.AggregateUnary), UACSQKP("uacsqk+", InstructionType.AggregateUnary), @@ -151,6 +152,7 @@ public enum Opcodes { CEIL("ceil", InstructionType.Unary), FLOOR("floor", InstructionType.Unary), UCUMKP("ucumk+", InstructionType.Unary), + UROWCUMKP("urowcumk+", InstructionType.Unary), UCUMM("ucum*", InstructionType.Unary), UCUMKPM("ucumk+*", InstructionType.Unary), UCUMMIN("ucummin", InstructionType.Unary), @@ -383,6 +385,7 @@ public enum Opcodes { UCUMACMIN("ucumacmin", InstructionType.CumsumAggregate), UCUMACMAX("ucumacmax", InstructionType.CumsumAggregate), BCUMOFFKP("bcumoffk+", InstructionType.CumsumOffset), + BROWCUMOFFKP("browcumoffk+", InstructionType.CumsumOffset), BCUMOFFM("bcumoff*", InstructionType.CumsumOffset), BCUMOFFPM("bcumoff+*", InstructionType.CumsumOffset), BCUMOFFMIN("bcumoffmin", InstructionType.CumsumOffset), diff --git a/src/main/java/org/apache/sysds/common/Types.java b/src/main/java/org/apache/sysds/common/Types.java index e69ad375b20..239cdfbd37d 100644 --- a/src/main/java/org/apache/sysds/common/Types.java +++ b/src/main/java/org/apache/sysds/common/Types.java @@ -545,7 +545,7 @@ public enum OpOp1 { CEIL, CHOLESKY, COS, COSH, CUMMAX, CUMMIN, CUMPROD, CUMSUM, CUMSUMPROD, DET, DETECTSCHEMA, COLNAMES, EIGEN, EXISTS, EXP, FLOOR, INVERSE, IQM, ISNA, ISNAN, ISINF, LENGTH, LINEAGE, LOG, NCOL, NOT, NROW, - MEDIAN, PREFETCH, PRINT, ROUND, SIN, SINH, SIGN, SOFTMAX, SQRT, STOP, _EVICT, + MEDIAN, PREFETCH, PRINT, ROUND, ROWCUMSUM, SIN, SINH, SIGN, SOFTMAX, SQRT, STOP, _EVICT, SVD, TAN, TANH, TYPEOF, TRIGREMOTE, SQRT_MATRIX_JAVA, //fused ML-specific operators for performance SPROP, //sample proportion: P * (1 - P) @@ -589,6 +589,7 @@ public String toString() { case MULT2: return Opcodes.MULT2.toString(); case NOT: return Opcodes.NOT.toString(); case POW2: return Opcodes.POW2.toString(); + case ROWCUMSUM: return Opcodes.UROWCUMKP.toString(); case TYPEOF: return Opcodes.TYPEOF.toString(); default: return name().toLowerCase(); } @@ -608,6 +609,7 @@ public static OpOp1 valueOfByOpcode(String opcode) { case "ucummin": return CUMMIN; case "ucum*": return CUMPROD; case "ucumk+": return CUMSUM; + case "urowcumk+": return ROWCUMSUM; case "ucumk+*": return CUMSUMPROD; case "detectSchema": return DETECTSCHEMA; case "*2": return MULT2; diff --git a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java index ae582b052b2..540b522a8bb 100644 --- a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java +++ b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java @@ -1034,6 +1034,7 @@ else if( getAllExpr().length == 2 ) { //binary break; case CUMSUM: + case ROWCUMSUM: case CUMPROD: case CUMSUMPROD: case CUMMIN: diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java b/src/main/java/org/apache/sysds/parser/DMLTranslator.java index c4f7f672abe..6827bcc4bfa 100644 --- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java +++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java @@ -2616,6 +2616,7 @@ else if ( sop.equalsIgnoreCase(Opcodes.NOTEQUAL.toString()) ) case CEIL: case FLOOR: case CUMSUM: + case ROWCUMSUM: case CUMPROD: case CUMSUMPROD: case CUMMIN: diff --git a/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java b/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java index 6b196489eac..8e9aef94660 100644 --- a/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java +++ b/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java @@ -49,7 +49,7 @@ public class Builtin extends ValueFunction public enum BuiltinCode { AUTODIFF, SIN, COS, TAN, SINH, COSH, TANH, ASIN, ACOS, ATAN, LOG, LOG_NZ, MIN, MAX, ABS, SIGN, SQRT, EXP, PLOGP, PRINT, PRINTF, NROW, NCOL, LENGTH, LINEAGE, ROUND, MAXINDEX, MININDEX, - STOP, CEIL, FLOOR, CUMSUM, CUMPROD, CUMMIN, CUMMAX, CUMSUMPROD, INVERSE, SPROP, SIGMOID, EVAL, LIST, + STOP, CEIL, FLOOR, CUMSUM, ROWCUMSUM, CUMPROD, CUMMIN, CUMMAX, CUMSUMPROD, INVERSE, SPROP, SIGMOID, EVAL, LIST, TYPEOF, APPLY_SCHEMA, DETECTSCHEMA, ISNA, ISNAN, ISINF, DROP_INVALID_TYPE, DROP_INVALID_LENGTH, VALUE_SWAP, FRAME_ROW_REPLICATE, MAP, COUNT_DISTINCT, COUNT_DISTINCT_APPROX, UNIQUE} @@ -95,6 +95,7 @@ public enum BuiltinCode { AUTODIFF, SIN, COS, TAN, SINH, COSH, TANH, ASIN, ACOS, String2BuiltinCode.put( "ceil" , BuiltinCode.CEIL); String2BuiltinCode.put( "floor" , BuiltinCode.FLOOR); String2BuiltinCode.put( "ucumk+" , BuiltinCode.CUMSUM); + String2BuiltinCode.put( "urowcumk+" , BuiltinCode.ROWCUMSUM); String2BuiltinCode.put( "ucum*" , BuiltinCode.CUMPROD); String2BuiltinCode.put( "ucumk+*", BuiltinCode.CUMSUMPROD); String2BuiltinCode.put( "ucummin", BuiltinCode.CUMMIN); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java index 3c1cf9d7758..391ec1cd1d2 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java @@ -552,6 +552,8 @@ public static AggregateUnaryOperator parseBasicCumulativeAggregateUnaryOperator( Builtin f = (Builtin)uop.fn; if( f.getBuiltinCode()==BuiltinCode.CUMSUM ) return parseBasicAggregateUnaryOperator(Opcodes.UACKP.toString()) ; + else if( f.getBuiltinCode()==BuiltinCode.ROWCUMSUM ) + return parseBasicAggregateUnaryOperator(Opcodes.UARCKP.toString()) ; else if( f.getBuiltinCode()==BuiltinCode.CUMPROD ) return parseBasicAggregateUnaryOperator(Opcodes.UACM.toString()) ; else if( f.getBuiltinCode()==BuiltinCode.CUMMIN ) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java index 9f469922adb..61b61b15332 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java @@ -56,6 +56,8 @@ private CumulativeOffsetSPInstruction(Operator op, CPOperand in1, CPOperand in2, if (Opcodes.BCUMOFFKP.toString().equals(opcode)) _uop = new UnaryOperator(Builtin.getBuiltinFnObject("ucumk+")); + else if (Opcodes.BROWCUMOFFKP.toString().equals(opcode)) + _uop = new UnaryOperator(Builtin.getBuiltinFnObject("urowcumk+")); else if (Opcodes.BCUMOFFM.toString().equals(opcode)) _uop = new UnaryOperator(Builtin.getBuiltinFnObject("ucum*")); else if (Opcodes.BCUMOFFPM.toString().equals(opcode)) { diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java index e2653f4310d..d4cfc9defac 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java @@ -21,16 +21,22 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; import org.apache.sysds.common.Types.DataType; import org.apache.sysds.common.Types.ValueType; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysds.runtime.functionobjects.KahanPlus; import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.instructions.cp.CPOperand; +import org.apache.sysds.runtime.instructions.cp.KahanObject; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.matrix.operators.Operator; import org.apache.sysds.runtime.matrix.operators.UnaryOperator; +import scala.Serializable; +import scala.Tuple2; +import java.util.*; public class UnaryMatrixSPInstruction extends UnarySPInstruction { @@ -61,6 +67,210 @@ public void processInstruction(ExecutionContext ec) { updateUnaryOutputDataCharacteristics(sec); sec.setRDDHandleForVariable(output.getName(), out); sec.addLineageRDD(output.getName(), input1.getName()); + + if ( "urowcumk+".equals(getOpcode()) ) { + + JavaPairRDD< MatrixIndexes, Tuple2 > localRowcumsum = in.mapToPair( new LocalRowCumsumFunction() ); + + + // Collect end-values of every block of every row for offset calc by grouping by global row index + JavaPairRDD< Long, Iterable> > rowEndValues = localRowcumsum + .mapToPair( tuple2 -> { + + // get index of block + MatrixIndexes indexes = tuple2._1; + // get cum matrix block + MatrixBlock localRowcumsumBlock = tuple2._2._2; + + // get row and column block index + long rowBlockIndex = indexes.getRowIndex(); + long colBlockIndex = indexes.getColumnIndex(); + + // Save end value of every row of every block (if block is empty save 0) + double[] endValues = new double[ localRowcumsumBlock.getNumRows() ]; + + for ( int i = 0; i < localRowcumsumBlock.getNumRows(); i ++ ) { + if (localRowcumsumBlock.getNumColumns() > 0) { + endValues[i] = localRowcumsumBlock.get(i, localRowcumsumBlock.getNumColumns() - 1); + } else { + endValues[i] = 0.0 ; + } + } + + return new Tuple2<>(rowBlockIndex, new Tuple3<>(rowBlockIndex, colBlockIndex, endValues)); + } + + ).groupByKey(); + + + + + // compute offset for every block + List< Tuple2 , double[]> > offsetList = rowEndValues + .flatMapToPair(tuple2 -> { + + Long rowBlockIndex = tuple2._1; + + List< Tuple3 > colValues = new ArrayList<>(); + for ( Tuple3 cv : tuple2._2 ) { + colValues.add(cv); + } + + // sort blocks from one row by column index + colValues.sort(Comparator.comparing(Tuple3::_2)); + + // get number of rows of a block by counting amount of end (row) values of said block + int numberOfRows = 0; + if ( !colValues.isEmpty() ) { + Tuple3 firstTuple = colValues.get(0); + double[] lastValuesArray = firstTuple._3(); + numberOfRows = lastValuesArray.length; + } + + + List, double[]>> blockOffsets = new ArrayList<>(); + + double[] cumulativeOffsets = new double[numberOfRows]; + + for (Tuple3 colValue : colValues) { + + Long colBlockIndex = colValue._2(); + double[] endValues = colValue._3(); + + // copy current offsets + double[] currentOffsets = cumulativeOffsets.clone(); + + // and save block indexes with its offsets + blockOffsets.add( new Tuple2<>(new Tuple2<>(rowBlockIndex, colBlockIndex), currentOffsets) ); + + for ( int i = 0; i < numberOfRows && i < endValues.length; i++ ) { + cumulativeOffsets[i] += endValues[i]; + } + + } + return blockOffsets.iterator(); + } + ).collect(); + + + // convert list to map for easier access to offsets + Map< Tuple2, double[] > offsetMap = new HashMap<>(); + for (Tuple2, double[]> offset : offsetList) { + offsetMap.put(offset._1, offset._2); + } + + + out = localRowcumsum.mapToPair( new FinalRowCumsumFunction(offsetMap)) ; + + updateUnaryOutputDataCharacteristics(sec); + sec.setRDDHandleForVariable(output.getName(), out); + sec.addLineageRDD(output.getName(), input1.getName()); + } + } + + + + private static class LocalRowCumsumFunction implements PairFunction< Tuple2, MatrixIndexes, Tuple2 > { + + @Override + public Tuple2< MatrixIndexes, Tuple2 > call(Tuple2 tuple2) { + + + MatrixBlock inputBlock = tuple2._2; + MatrixBlock cumsumBlock = new MatrixBlock( inputBlock.getNumRows(), inputBlock.getNumColumns(), false ); + + + for ( int i = 0; i < inputBlock.getNumRows(); i++ ) { + + KahanObject kbuff = new KahanObject(0, 0); + KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); + + for ( int j = 0; j < inputBlock.getNumColumns(); j++ ) { + + double val = inputBlock.get(i, j); + kplus.execute2(kbuff, val); + cumsumBlock.set(i, j, kbuff._sum); + } + } + // original index, original matrix and local cumsum block + return new Tuple2<>( tuple2._1, new Tuple2<>(inputBlock, cumsumBlock) ); + } + } + + + + + private static class FinalRowCumsumFunction implements PairFunction >, MatrixIndexes, MatrixBlock> { + + + // map block indexes to the row offsets + private final Map< Tuple2, double[] > offsetMap; + + public FinalRowCumsumFunction(Map, double[]> offsetMap) { + this.offsetMap = offsetMap; + } + + + @Override + public Tuple2 call( Tuple2< MatrixIndexes, Tuple2 > tuple ) { + + MatrixIndexes indexes = tuple._1; + MatrixBlock inputBlock = tuple._2._1; + MatrixBlock localRowCumsumBlock = tuple._2._2; + + // key to get the row offset for this block + Tuple2 blockKey = new Tuple2<>( indexes.getRowIndex(), indexes.getColumnIndex()) ; + double[] offsets = offsetMap.get(blockKey); + + MatrixBlock cumsumBlock = new MatrixBlock( inputBlock.getNumRows(), inputBlock.getNumColumns(), false ); + + + for ( int i = 0; i < inputBlock.getNumRows(); i++ ) { + + double rowOffset = 0.0; + if ( offsets != null && i < offsets.length ) { + rowOffset = offsets[i]; + } + + for ( int j = 0; j < inputBlock.getNumColumns(); j++ ) { + double cumsumValue = localRowCumsumBlock.get(i, j); + cumsumBlock.set(i, j, cumsumValue + rowOffset); + } + } + + // block index and final cumsum block + return new Tuple2<>(indexes, cumsumBlock); + } + } + + + + // helper class + private static class Tuple3 implements Serializable { + + private static final long serialVersionUID = 123; + private final Type1 _1; + private final Type2 _2; + private final Type3 _3; + + + public Tuple3( Type1 _1, Type2 _2, Type3 _3 ) { + this._1 = _1; + this._2 = _2; + this._3 = _3; + } + + public Type1 _1() { + return _1; + } + + public Type2 _2() { + return _2; + } + + public Type3 _3() { + return _3; + } } private static class RDDMatrixBuiltinUnaryOp implements Function diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java index 3cba9fb8c57..b9827d49ee1 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java @@ -112,6 +112,7 @@ private enum AggType { SUM, SUM_SQ, CUM_KAHAN_SUM, + ROW_CUM_SUM, CUM_MIN, CUM_MAX, CUM_PROD, @@ -783,6 +784,7 @@ private static AggType getAggType( UnaryOperator op ) { BuiltinCode bfunc = ((Builtin) vfn).bFunc; switch( bfunc ) { case CUMSUM: return AggType.CUM_KAHAN_SUM; + case ROWCUMSUM: return AggType.ROW_CUM_SUM; case CUMPROD: return AggType.CUM_PROD; case CUMMIN: return AggType.CUM_MIN; case CUMMAX: return AggType.CUM_MAX; @@ -1548,6 +1550,12 @@ else if( ixFn instanceof ReduceRow ) //COLSUM_SQ d_ucumkp(in.getDenseBlock(), null, out.getDenseBlock(), n, kbuff, kplus, rl, ru); break; } + case ROW_CUM_SUM: { //ROWCUMSUM + KahanObject kbuff = new KahanObject(0, 0); + KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); + d_urowcumkp(in.getDenseBlock(), null, out.getDenseBlock(), n, kbuff, kplus, rl, ru); + break; + } case CUM_PROD: { //CUMPROD d_ucumm(in.getDenseBlockValues(), null, out.getDenseBlockValues(), n, rl, ru); break; @@ -1666,6 +1674,12 @@ else if( ixFn instanceof ReduceRow ) //COLSUM_SQ s_ucumkp(a, null, out.getDenseBlock(), m, n, kbuff, kplus, rl, ru); break; } + case ROW_CUM_SUM: { //ROWCUMSUM + KahanObject kbuff = new KahanObject(0, 0); + KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); + s_urowcumkp(a, null, out.getDenseBlock(), m, n, kbuff, kplus, rl, ru); + break; + } case CUM_PROD: { //CUMPROD s_ucumm(a, null, out.getDenseBlockValues(), n, rl, ru); break; @@ -1747,6 +1761,12 @@ private static void cumaggregateUnaryMatrixDense(MatrixBlock in, MatrixBlock out d_ucumkp(da, agg, dc, n, kbuff, kplus, rl, ru); break; } + case ROW_CUM_SUM: { //ROWCUMSUM + KahanObject kbuff = new KahanObject(0, 0); + KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); + d_urowcumkp(da, agg, dc, n, kbuff, kplus, rl, ru); + break; + } case CUM_SUM_PROD: { //CUMSUMPROD if( n != 2 ) throw new DMLRuntimeException("Cumsumprod expects two-column input (n="+n+")."); @@ -1791,6 +1811,12 @@ private static void cumaggregateUnaryMatrixSparse(MatrixBlock in, MatrixBlock ou s_ucumkp(a, agg, dc, m, n, kbuff, kplus, rl, ru); break; } + case ROW_CUM_SUM: { //ROWCUMSUM + KahanObject kbuff = new KahanObject(0, 0); + KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); + s_urowcumkp(a, agg, dc, m, n, kbuff, kplus, rl, ru); + break; + } case CUM_SUM_PROD: { //CUMSUMPROD if( n != 2 ) throw new DMLRuntimeException("Cumsumprod expects two-column input (n="+n+")."); @@ -1821,6 +1847,7 @@ private static MatrixBlock aggregateUnaryMatrixEmpty(MatrixBlock in, MatrixBlock case SUM: case SUM_SQ: case KAHAN_SUM: + case ROW_CUM_SUM: case KAHAN_SUM_SQ: val = 0; break; case MIN: val = Double.POSITIVE_INFINITY; break; case MAX: val = Double.NEGATIVE_INFINITY; break; @@ -1838,7 +1865,7 @@ private static MatrixBlock aggregateUnaryMatrixEmpty(MatrixBlock in, MatrixBlock if(optype == AggType.KAHAN_SUM || optype == AggType.KAHAN_SUM_SQ || optype == AggType.SUM || optype == AggType.SUM_SQ || optype == AggType.MIN || optype == AggType.MAX || optype == AggType.PROD - || optype == AggType.CUM_KAHAN_SUM || optype == AggType.CUM_PROD + || optype == AggType.CUM_KAHAN_SUM || optype == AggType.ROW_CUM_SUM || optype == AggType.CUM_PROD || optype == AggType.CUM_MIN || optype == AggType.CUM_MAX) { return out; @@ -2099,6 +2126,39 @@ private static void d_ucumkp( DenseBlock a, double[] agg, DenseBlock c, int n, K c.set(i, csums.values(0)); } } + + /** + * ROWCUMSUM, opcode: urowcumk+, dense input. + * + * @param a + * @param agg + * @param c + * @param n + * @param kbuff + * @param kplus + * @param rl + * @param ru + */ + private static void d_urowcumkp( DenseBlock a, double[] agg, DenseBlock c, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru ) { + //row-wise cumulative sum w/ optional row offsets + for (int i = rl; i < ru; i++) { + double start = 0.0; + int localRow = i - rl; + if (agg != null) { + if (localRow >= 0 && localRow < agg.length) { + start = agg[localRow]; + } + } + kbuff.set(start, 0); + //compute cumulative sum over row + for (int j = 0; j < n; j++) { + double val = a.get(i, j); + kplus.execute2(kbuff, val); + c.set(i, j, kbuff._sum); + } + + } + } /** * CUMSUMPROD, opcode: ucumk+*, dense input. @@ -2750,6 +2810,51 @@ private static void s_ucumkp( SparseBlock a, double[] agg, DenseBlock c, int m, c.set(i, csums.values(0)); } } + + /** + * ROWCUMSUM, opcode: urowcumk+, sparse input. + * + * @param a ? + * @param agg ? + * @param c ? + * @param m ? + * @param n ? + * @param kbuff ? + * @param kplus ? + * @param rl row lower index + * @param ru row upper index + */ + private static void s_urowcumkp(SparseBlock a, double[] agg, DenseBlock c, int m, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru) { + //scan rows and compute row-wise prefix sums + for (int i = rl; i < ru; i++) { + double start = 0.0; + int localRow = i - rl; + if (agg != null && localRow >= 0 && localRow < agg.length) + start = agg[localRow]; + if (!a.isEmpty(i)) { + double[] ain = a.values(i); + int[] aix = a.indexes(i); + int apos = a.pos(i); + int alen = a.size(i); + kbuff.set(start, 0); + int sparseIdx = 0; + //prefix sum over sparse row + for (int j = 0; j < n; j++) { + if (sparseIdx < alen && aix[apos + sparseIdx] == j) { + kplus.execute2(kbuff, ain[apos + sparseIdx]); + start = kbuff._sum; + sparseIdx++; + } + c.set(i, j, start); + } + } + else { + //fill empty row with start value + for (int j = 0; j < n; j++) + c.set(i, j, start); + } + } + } /** diff --git a/src/test/java/org/apache/sysds/test/functions/unary/matrix/FullRowcumsumTest.java b/src/test/java/org/apache/sysds/test/functions/unary/matrix/FullRowcumsumTest.java new file mode 100644 index 00000000000..a3b48549953 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/unary/matrix/FullRowcumsumTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.functions.unary.matrix; + +import org.apache.sysds.api.DMLScript; +import org.apache.sysds.common.Types.ExecMode; +import org.apache.sysds.common.Types.ExecType; +import org.apache.sysds.hops.OptimizerUtils; +import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; +import org.apache.sysds.utils.Statistics; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.HashMap; + +public class FullRowcumsumTest extends AutomatedTestBase +{ + private final static String TEST_NAME = "rowcumsum"; + private final static String TEST_DIR = "functions/unary/matrix/"; + private static final String TEST_CLASS_DIR = TEST_DIR + FullRowcumsumTest.class.getSimpleName() + "/"; + + private final static double eps = 1e-10; + + private final static int rowsMatrix = 1201; + private final static int colsMatrix = 1103; + private final static double spSparse = 0.1; + private final static double spDense = 0.9; + + private enum InputType { + COL_VECTOR, + ROW_VECTOR, + MATRIX + } + + @Override + public void setUp() + { + addTestConfiguration(TEST_NAME,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME,new String[]{"B"})); + + if (TEST_CACHE_ENABLED) { + setOutAndExpectedDeletionDisabled(true); + } + } + + @BeforeClass + public static void init() { + TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR); + } + + @AfterClass + public static void cleanUp() { + if (TEST_CACHE_ENABLED) { + TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR); + } + } + + @Test + public void testCumsumColVectorDenseCP() { + runColAggregateOperationTest(InputType.COL_VECTOR, false, ExecType.CP); + } + + @Test + public void testCumsumRowVectorDenseCP() { + runColAggregateOperationTest(InputType.ROW_VECTOR, false, ExecType.CP); + } + + @Test + public void testCumsumRowVectorDenseNoRewritesCP() { + runColAggregateOperationTest(InputType.ROW_VECTOR, false, ExecType.CP, false); + } + + @Test + public void testCumsumMatrixDenseCP() { + runColAggregateOperationTest(InputType.MATRIX, false, ExecType.CP); + } + + @Test + public void testCumsumColVectorSparseCP() { + runColAggregateOperationTest(InputType.COL_VECTOR, true, ExecType.CP); + } + + @Test + public void testCumsumRowVectorSparseCP() { + runColAggregateOperationTest(InputType.ROW_VECTOR, true, ExecType.CP); + } + + @Test + public void testCumsumRowVectorSparseNoRewritesCP() { + runColAggregateOperationTest(InputType.ROW_VECTOR, true, ExecType.CP, false); + } + + @Test + public void testCumsumMatrixSparseCP() { + runColAggregateOperationTest(InputType.MATRIX, true, ExecType.CP); + } + + @Test + public void testCumsumColVectorDenseSP() { + runColAggregateOperationTest(InputType.COL_VECTOR, false, ExecType.SPARK); + } + + @Test + public void testCumsumRowVectorDenseSP() { + runColAggregateOperationTest(InputType.ROW_VECTOR, false, ExecType.SPARK); + } + + @Test + public void testCumsumRowVectorDenseNoRewritesSP() { + runColAggregateOperationTest(InputType.ROW_VECTOR, false, ExecType.SPARK, false); + } + + @Test + public void testCumsumMatrixDenseSP() { + runColAggregateOperationTest(InputType.MATRIX, false, ExecType.SPARK); + } + + @Test + public void testCumsumColVectorSparseSP() { //das hier testen + runColAggregateOperationTest(InputType.COL_VECTOR, true, ExecType.SPARK); + } + + @Test + public void testCumsumRowVectorSparseSP() { + runColAggregateOperationTest(InputType.ROW_VECTOR, true, ExecType.SPARK); + } + + @Test + public void testCumsumRowVectorSparseNoRewritesSP() { + runColAggregateOperationTest(InputType.ROW_VECTOR, true, ExecType.SPARK, false); + } + + @Test + public void testCumsumMatrixSparseSP() { + runColAggregateOperationTest(InputType.MATRIX, true, ExecType.SPARK); + } + + private void runColAggregateOperationTest( InputType type, boolean sparse, ExecType instType) { + //by default we apply algebraic simplification rewrites + runColAggregateOperationTest(type, sparse, instType, true); + } + + private void runColAggregateOperationTest( InputType type, boolean sparse, ExecType instType, boolean rewrites) + { + ExecMode platformOld = rtplatform; + switch( instType ){ + case SPARK: rtplatform = ExecMode.SPARK; break; + default: rtplatform = ExecMode.HYBRID; break; + } + + boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; + if( rtplatform == ExecMode.SPARK ) + DMLScript.USE_LOCAL_SPARK_CONFIG = true; + + //rewrites + boolean oldFlagRewrites = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION; + OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = rewrites; + + try + { + int cols = (type== InputType.COL_VECTOR) ? 1 : colsMatrix; + int rows = (type== InputType.ROW_VECTOR) ? 1 : rowsMatrix; + double sparsity = (sparse) ? spSparse : spDense; + + String TEST_CACHE_DIR = !TEST_CACHE_ENABLED ? "" : + type.ordinal() + "_" + sparsity + "/"; + + TestConfiguration config = getTestConfiguration(TEST_NAME); + loadTestConfiguration(config, TEST_CACHE_DIR); + + // This is for running the junit test the new way, i.e., construct the arguments directly + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME + ".dml"; + programArgs = new String[]{"-explain", "-args", input("A"), output("B") }; + + fullRScriptName = HOME + TEST_NAME + ".R"; + rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() + " " + expectedDir(); + + //generate actual dataset + double[][] A = getRandomMatrix(rows, cols, -0.05, 1, sparsity, 7); + writeInputMatrixWithMTD("A", A, true); + + runTest(true, false, null, -1); + if( instType==ExecType.CP ) //in CP no spark jobs should be executed + Assert.assertEquals("Unexpected number of executed MR jobs.", 0, Statistics.getNoOfExecutedSPInst()); + + runRScript(true); + + //compare matrices + HashMap dmlfile = readDMLMatrixFromOutputDir("B"); + HashMap rfile = readRMatrixFromExpectedDir("B"); + TestUtils.compareMatrices(dmlfile, rfile, eps, "Stat-DML", "Stat-R"); + } + finally + { + rtplatform = platformOld; + DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = oldFlagRewrites; + } + } +} diff --git a/src/test/scripts/functions/unary/matrix/Rowcumsum.R b/src/test/scripts/functions/unary/matrix/Rowcumsum.R new file mode 100644 index 00000000000..7c80e304f63 --- /dev/null +++ b/src/test/scripts/functions/unary/matrix/Rowcumsum.R @@ -0,0 +1,36 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + + +args <- commandArgs(TRUE) +options(digits=22) + +library("Matrix") + +A = as.matrix(readMM(paste(args[1], "A.mtx", sep=""))) + +if( ncol(A)>1 ){ + B = t(apply(A, 1, cumsum)); +} else { + B = A; +} + +writeMM(as(B, "CsparseMatrix"), paste(args[2], "B", sep="")); \ No newline at end of file diff --git a/src/test/scripts/functions/unary/matrix/Rowcumsum.dml b/src/test/scripts/functions/unary/matrix/Rowcumsum.dml new file mode 100644 index 00000000000..8b4b4fb0a90 --- /dev/null +++ b/src/test/scripts/functions/unary/matrix/Rowcumsum.dml @@ -0,0 +1,26 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + + +A = read($1); +B = rowcumsum(A); +write(B, $2); + From 67a346ca021f35d2754cde7bc8da847e8d70cbfc Mon Sep 17 00:00:00 2001 From: Vi Vuong Date: Tue, 22 Jul 2025 15:05:47 +0200 Subject: [PATCH 4/5] Automatic Rewrites, Documentation, Linux bug fix --- docs/site/dml-language-reference.md | 45 ++--- .../RewriteAlgebraicSimplificationStatic.java | 23 +++ .../RewriteSimplifyTransposedCumsumTest.java | 181 ++++++++++++++++++ .../unary/matrix/FullRowcumsumTest.java | 54 ++++-- .../rewrite/RewriteSimplifyTransposedCumsum.R | 36 ++++ .../RewriteSimplifyTransposedCumsum.dml | 25 +++ 6 files changed, 325 insertions(+), 39 deletions(-) create mode 100644 src/test/java/org/apache/sysds/test/functions/rewrite/RewriteSimplifyTransposedCumsumTest.java create mode 100644 src/test/scripts/functions/rewrite/RewriteSimplifyTransposedCumsum.R create mode 100644 src/test/scripts/functions/rewrite/RewriteSimplifyTransposedCumsum.dml diff --git a/docs/site/dml-language-reference.md b/docs/site/dml-language-reference.md index abafb74a5ac..4fe2e17b3cf 100644 --- a/docs/site/dml-language-reference.md +++ b/docs/site/dml-language-reference.md @@ -686,29 +686,30 @@ as.double(),
as.integer(),
as.logical() | A variable is cast as the **Table 7**: Statistical Built-In Functions -Function | Description | Parameters | Example --------- | ----------- | ---------- | ------- -mean()
avg() | Return the mean value of all cells in matrix | Input: matrix
Output: scalar | mean(X) -var()
sd() | Return the variance/stdDev value of all cells in matrix. Both use unbiased estimators with (n-1) denominator. | Input: matrix
Output: scalar | var(X)
sd(X) -moment() | Returns the kth central moment of values in a column matrix V, where k = 2, 3, or 4. It can be used to compute statistical measures like Variance, Kurtosis, and Skewness. This function also takes an optional weights parameter W. | Input: (X <(n x 1) matrix>, [W <(n x 1) matrix>),] k <scalar>)
Output: <scalar> | A = rand(rows=100000,cols=1, pdf="normal")
print("Variance from our (standard normal) random generator is approximately " + moment(A,2)) -colSums()
colMeans()
colVars()
colSds()
colMaxs()
colMins() | Column-wise computations -- for each column, compute the sum/mean/variance/stdDev/max/min of cell values | Input: matrix
Output: (1 x n) matrix | colSums(X)
colMeans(X)
colVars(X)
colSds(X)
colMaxs(X)
colMins(X) -cov() | Returns the covariance between two 1-dimensional column matrices X and Y. The function takes an optional weights parameter W. All column matrices X, Y, and W (when specified) must have the exact same dimension. | Input: (X <(n x 1) matrix>, Y <(n x 1) matrix> [, W <(n x 1) matrix>)])
Output: <scalar> | cov(X,Y)
cov(X,Y,W) -contains() | Indicates if the target matrix contains at least one pattern value (with handling of special values like Not-a-Number). | Input: (target=<matrix>,pattern=<scalar>)
Output: <scalar> | hasNaNs = contains(target=X, pattern=NaN) -table() | Returns the contingency table of two vectors A and B. The resulting table F consists of max(A) rows and max(B) columns.
More precisely, F[i,j] = \\|{ k \\| A[k] = i and B[k] = j, 1 ≤ k ≤ n }\\|, where A and B are two n-dimensional vectors.
This function supports multiple other variants, which can be found below, at the end of this Table 7. | Input: (<(n x 1) matrix>, <(n x 1) matrix>), [<(n x 1) matrix>])
Output: <matrix> | F = table(A, B)
F = table(A, B, C)
And, several other forms (see below Table 7.) -cdf()
pnorm()
pexp()
pchisq()
pf()
pt()
icdf()
qnorm()
qexp()
qchisq()
qf()
qt() | p=cdf(target=q, ...) returns the cumulative probability P[X <= q].
q=icdf(target=p, ...) returns the inverse cumulative probability i.e., it returns q such that the given target p = P[X<=q].
For more details, please see the section "Probability Distribution Functions" below Table 7. | Input: (target=<scalar>, dist="...", ...)
Output: <scalar> | p = cdf(target=q, dist="normal", mean=1.5, sd=2); is same as p=pnorm(target=q, mean=1.5, sd=2);
q=icdf(target=p, dist="normal") is same as q=qnorm(target=p, mean=0,sd=1)
More examples can be found in the section "Probability Distribution Functions" below Table 7. +Function | Description | Parameters | Example +-------- |--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ---------- | ------- +mean()
avg() | Return the mean value of all cells in matrix | Input: matrix
Output: scalar | mean(X) +var()
sd() | Return the variance/stdDev value of all cells in matrix. Both use unbiased estimators with (n-1) denominator. | Input: matrix
Output: scalar | var(X)
sd(X) +moment() | Returns the kth central moment of values in a column matrix V, where k = 2, 3, or 4. It can be used to compute statistical measures like Variance, Kurtosis, and Skewness. This function also takes an optional weights parameter W. | Input: (X <(n x 1) matrix>, [W <(n x 1) matrix>),] k <scalar>)
Output: <scalar> | A = rand(rows=100000,cols=1, pdf="normal")
print("Variance from our (standard normal) random generator is approximately " + moment(A,2)) +colSums()
colMeans()
colVars()
colSds()
colMaxs()
colMins() | Column-wise computations -- for each column, compute the sum/mean/variance/stdDev/max/min of cell values | Input: matrix
Output: (1 x n) matrix | colSums(X)
colMeans(X)
colVars(X)
colSds(X)
colMaxs(X)
colMins(X) +cov() | Returns the covariance between two 1-dimensional column matrices X and Y. The function takes an optional weights parameter W. All column matrices X, Y, and W (when specified) must have the exact same dimension. | Input: (X <(n x 1) matrix>, Y <(n x 1) matrix> [, W <(n x 1) matrix>)])
Output: <scalar> | cov(X,Y)
cov(X,Y,W) +contains() | Indicates if the target matrix contains at least one pattern value (with handling of special values like Not-a-Number). | Input: (target=<matrix>,pattern=<scalar>)
Output: <scalar> | hasNaNs = contains(target=X, pattern=NaN) +table() | Returns the contingency table of two vectors A and B. The resulting table F consists of max(A) rows and max(B) columns.
More precisely, F[i,j] = \\|{ k \\| A[k] = i and B[k] = j, 1 ≤ k ≤ n }\\|, where A and B are two n-dimensional vectors.
This function supports multiple other variants, which can be found below, at the end of this Table 7. | Input: (<(n x 1) matrix>, <(n x 1) matrix>), [<(n x 1) matrix>])
Output: <matrix> | F = table(A, B)
F = table(A, B, C)
And, several other forms (see below Table 7.) +cdf()
pnorm()
pexp()
pchisq()
pf()
pt()
icdf()
qnorm()
qexp()
qchisq()
qf()
qt() | p=cdf(target=q, ...) returns the cumulative probability P[X <= q].
q=icdf(target=p, ...) returns the inverse cumulative probability i.e., it returns q such that the given target p = P[X<=q].
For more details, please see the section "Probability Distribution Functions" below Table 7. | Input: (target=<scalar>, dist="...", ...)
Output: <scalar> | p = cdf(target=q, dist="normal", mean=1.5, sd=2); is same as p=pnorm(target=q, mean=1.5, sd=2);
q=icdf(target=p, dist="normal") is same as q=qnorm(target=p, mean=0,sd=1)
More examples can be found in the section "Probability Distribution Functions" below Table 7. aggregate() | Splits/groups the values from X according to the corresponding values from G, and then applies the function fn on each group.
The result F is a column matrix, in which each row contains the value computed from a distinct group in G. More specifically, F[k,1] = fn( {X[i,1] \\| 1<=i<=n and G[i,1] = k} ), where n = nrow(X) = nrow(G).
Note that the distinct values in G are used as row indexes in the result matrix F. Therefore, nrow(F) = max(G). It is thus recommended that the values in G are consecutive and start from 1.
This function supports multiple other variants, which can be found below, at the end of this Table 7. | Input:
(target = X <(n x 1) matrix, or matrix>,
   groups = G <(n x 1) matrix>,
   fn= "..."
   [,weights= W<(n x 1) matrix>]
   [,ngroups=N] )
Output: F <matrix>
Note: X is a (n x 1) matrix unless ngroups is specified with no weights, in which case X is a regular (n x m) matrix.
The parameter fn takes one of the following functions: "count", "sum", "mean", "variance", "centralmoment". In the case of central moment, one must also provide the order of the moment that need to be computed (see example). | F = aggregate(target=X, groups=G, fn= "..." [,weights = W])
F = aggregate(target=X, groups=G1, fn= "sum");
F = aggregate(target=Y, groups=G2, fn= "mean", weights=W);
F = aggregate(target=Z, groups=G3, fn= "centralmoment", order= "2");
And, several other forms (see below Table 7.) -interQuartileMean() | Returns the mean of all x in X such that x>quantile(X, 0.25) and x<=quantile(X, 0.75). X, W are column matrices (vectors) of the same size. W contains the weights for data in X. | Input: (X <(n x 1) matrix> [, W <(n x 1) matrix>)])
Output: <scalar> | interQuartileMean(X)
interQuartileMean(X, W) -quantile () | The p-quantile for a random variable X is the value x such that Pr[X<x] <= p and Pr[X<= x] >= p
let n=nrow(X), i=ceiling(p*n), quantile() will return X[i]. p is a scalar (0<p<1) that specifies the quantile to be computed. Optionally, a weight vector may be provided for X. | Input: (X <(n x 1) matrix>, [W <(n x 1) matrix>),] p <scalar>)
Output: <scalar> | quantile(X, p)
quantile(X, W, p) -quantile () | Returns a column matrix with list of all quantiles requested in P. | Input: (X <(n x 1) matrix>, [W <(n x 1) matrix>),] P <(q x 1) matrix>)
Output: matrix | quantile(X, P)
quantile(X, W, P) -median() | Computes the median in a given column matrix of values | Input: (X <(n x 1) matrix>, [W <(n x 1) matrix>),])
Output: <scalar> | median(X)
median(X,W) -rowSums()
rowMeans()
rowVars()
rowSds()
rowMaxs()
rowMins() | Row-wise computations -- for each row, compute the sum/mean/variance/stdDev/max/min of cell value | Input: matrix
Output: (n x 1) matrix | rowSums(X)
rowMeans(X)
rowVars(X)
rowSds(X)
rowMaxs(X)
rowMins(X) -cumsum() | Column prefix-sum (For row-prefix sum, use cumsum(t(X)) | Input: matrix
Output: matrix of the same dimensions | A = matrix("1 2 3 4 5 6", rows=3, cols=2)
B = cumsum(A)
The output matrix B = [[1, 2], [4, 6], [9, 12]] -cumprod() | Column prefix-prod (For row-prefix prod, use cumprod(t(X)) | Input: matrix
Output: matrix of the same dimensions | A = matrix("1 2 3 4 5 6", rows=3, cols=2)
B = cumprod(A)
The output matrix B = [[1, 2], [3, 8], [15, 48]] -cummin() | Column prefix-min (For row-prefix min, use cummin(t(X)) | Input: matrix
Output: matrix of the same dimensions | A = matrix("3 4 1 6 5 2", rows=3, cols=2)
B = cummin(A)
The output matrix B = [[3, 4], [1, 4], [1, 2]] -cummax() | Column prefix-max (For row-prefix min, use cummax(t(X)) | Input: matrix
Output: matrix of the same dimensions | A = matrix("3 4 1 6 5 2", rows=3, cols=2)
B = cummax(A)
The output matrix B = [[3, 4], [3, 6], [5, 6]] -sample(range, size, replacement, seed) | Sample returns a column vector of length size, containing uniform random numbers from [1, range] | Input:
range: integer
size: integer
replacement: boolean (Optional, default: FALSE)
seed: integer (Optional)
Output: Matrix dimensions are size x 1 | sample(100, 5)
sample(100, 5, TRUE)
sample(100, 120, TRUE)
sample(100, 5, 1234) # 1234 is the seed
sample(100, 5, TRUE, 1234) -outer(vector1, vector2, "op") | Applies element wise binary operation "op" (for example: "<", "==", ">=", "*", "min") on the all combination of vector.
Note: Using "*", we get outer product of two vectors. | Input: vectors of same size d, string
Output: matrix of size d X d | A = matrix("1 4", rows = 2, cols = 1)
B = matrix("3 6", rows = 1, cols = 2)
C = outer(A, B, "<")
D = outer(A, B, "*")
The output matrix C = [[1, 1], [0, 1]]
The output matrix D = [[3, 6], [12, 24]]
-toOneHot(X, num_classes)| Converts a vector containing integers to a one-hot-encoded matrix | Input: vector with N integer entries between 1 and num_classes, number of columns (must be >= largest value in X)
Output: one-hot-encoded matrix with shape (N, num_classes) | X = round(rand(rows=10, cols=1, min=2, max=10));
num_classes = ​12;
Y = toOneHot(X, num_classes); +interQuartileMean() | Returns the mean of all x in X such that x>quantile(X, 0.25) and x<=quantile(X, 0.75). X, W are column matrices (vectors) of the same size. W contains the weights for data in X. | Input: (X <(n x 1) matrix> [, W <(n x 1) matrix>)])
Output: <scalar> | interQuartileMean(X)
interQuartileMean(X, W) +quantile () | The p-quantile for a random variable X is the value x such that Pr[X<x] <= p and Pr[X<= x] >= p
let n=nrow(X), i=ceiling(p*n), quantile() will return X[i]. p is a scalar (0<p<1) that specifies the quantile to be computed. Optionally, a weight vector may be provided for X. | Input: (X <(n x 1) matrix>, [W <(n x 1) matrix>),] p <scalar>)
Output: <scalar> | quantile(X, p)
quantile(X, W, p) +quantile () | Returns a column matrix with list of all quantiles requested in P. | Input: (X <(n x 1) matrix>, [W <(n x 1) matrix>),] P <(q x 1) matrix>)
Output: matrix | quantile(X, P)
quantile(X, W, P) +median() | Computes the median in a given column matrix of values | Input: (X <(n x 1) matrix>, [W <(n x 1) matrix>),])
Output: <scalar> | median(X)
median(X,W) +rowSums()
rowMeans()
rowVars()
rowSds()
rowMaxs()
rowMins() | Row-wise computations -- for each row, compute the sum/mean/variance/stdDev/max/min of cell value | Input: matrix
Output: (n x 1) matrix | rowSums(X)
rowMeans(X)
rowVars(X)
rowSds(X)
rowMaxs(X)
rowMins(X) +cumsum() | Column prefix-sum | Input: matrix
Output: matrix of the same dimensions | A = matrix("1 2 3 4 5 6", rows=3, cols=2)
B = cumsum(A)
The output matrix B = [[1, 2], [4, 6], [9, 12]] +rowcumsum() | Row prefix-sum | Input: matrix
Output: matrix of the same dimensions | A = matrix("1 2 3 4 5 6", rows=2, cols=3)
B = rowcumsum(A)
The output matrix B = [[1, 3, 6], [4, 9, 15]] +cumprod() | Column prefix-prod (For row-prefix prod, use cumprod(t(X)) | Input: matrix
Output: matrix of the same dimensions | A = matrix("1 2 3 4 5 6", rows=3, cols=2)
B = cumprod(A)
The output matrix B = [[1, 2], [3, 8], [15, 48]] +cummin() | Column prefix-min (For row-prefix min, use cummin(t(X)) | Input: matrix
Output: matrix of the same dimensions | A = matrix("3 4 1 6 5 2", rows=3, cols=2)
B = cummin(A)
The output matrix B = [[3, 4], [1, 4], [1, 2]] +cummax() | Column prefix-max (For row-prefix min, use cummax(t(X)) | Input: matrix
Output: matrix of the same dimensions | A = matrix("3 4 1 6 5 2", rows=3, cols=2)
B = cummax(A)
The output matrix B = [[3, 4], [3, 6], [5, 6]] +sample(range, size, replacement, seed) | Sample returns a column vector of length size, containing uniform random numbers from [1, range] | Input:
range: integer
size: integer
replacement: boolean (Optional, default: FALSE)
seed: integer (Optional)
Output: Matrix dimensions are size x 1 | sample(100, 5)
sample(100, 5, TRUE)
sample(100, 120, TRUE)
sample(100, 5, 1234) # 1234 is the seed
sample(100, 5, TRUE, 1234) +outer(vector1, vector2, "op") | Applies element wise binary operation "op" (for example: "<", "==", ">=", "*", "min") on the all combination of vector.
Note: Using "*", we get outer product of two vectors. | Input: vectors of same size d, string
Output: matrix of size d X d | A = matrix("1 4", rows = 2, cols = 1)
B = matrix("3 6", rows = 1, cols = 2)
C = outer(A, B, "<")
D = outer(A, B, "*")
The output matrix C = [[1, 1], [0, 1]]
The output matrix D = [[3, 6], [12, 24]]
+toOneHot(X, num_classes)| Converts a vector containing integers to a one-hot-encoded matrix | Input: vector with N integer entries between 1 and num_classes, number of columns (must be >= largest value in X)
Output: one-hot-encoded matrix with shape (N, num_classes) | X = round(rand(rows=10, cols=1, min=2, max=10));
num_classes = ​12;
Y = toOneHot(X, num_classes); #### Alternative forms of table() diff --git a/src/main/java/org/apache/sysds/hops/rewrite/RewriteAlgebraicSimplificationStatic.java b/src/main/java/org/apache/sysds/hops/rewrite/RewriteAlgebraicSimplificationStatic.java index ef5670dda80..4f47fa101ff 100644 --- a/src/main/java/org/apache/sysds/hops/rewrite/RewriteAlgebraicSimplificationStatic.java +++ b/src/main/java/org/apache/sysds/hops/rewrite/RewriteAlgebraicSimplificationStatic.java @@ -204,6 +204,7 @@ private void rule_AlgebraicSimplification(Hop hop, boolean descendFirst) hi = simplifyNotOverComparisons(hop, hi, i); //e.g., !(A>B) -> (A<=B) hi = simplifyMatrixScalarPMOperation(hop, hi, i); //e.g., a-A-b -> (a-b)-A; a+A-b -> (a-b)+A //hi = removeUnecessaryPPred(hop, hi, i); //e.g., ppred(X,X,"==")->matrix(1,rows=nrow(X),cols=ncol(X)) + hi = simplifyTransposedCumsum(hop, hi, i); //e.g., t(cumsum(t(X))) -> rowcumsum(X) //process childs recursively after rewrites (to investigate pattern newly created by rewrites) if( !descendFirst ) @@ -213,6 +214,28 @@ private void rule_AlgebraicSimplification(Hop hop, boolean descendFirst) hop.setVisited(); } + private static Hop simplifyTransposedCumsum( Hop parent, Hop hi, int pos ) + { + //e.g., t(cumsum(t(X))) -> rowcumsum(X) + if( HopRewriteUtils.isTransposeOperation(hi) + && hi.getInput(0) instanceof UnaryOp + && ((UnaryOp)hi.getInput(0)).getOp() == OpOp1.CUMSUM + && hi.getInput(0).getParent().size() == 1 + && HopRewriteUtils.isTransposeOperation(hi.getInput(0).getInput(0), 1)) //inner transpose with single consumer + { + UnaryOp cumsum=(UnaryOp)hi.getInput(0); + Hop innerMatrix = cumsum.getInput(0).getInput(0); + + UnaryOp rowcumsumOp = HopRewriteUtils.createUnary(innerMatrix, OpOp1.ROWCUMSUM); + HopRewriteUtils.replaceChildReference(parent,hi, rowcumsumOp, pos); + + hi = rowcumsumOp; + LOG.debug("Applied simplifyTransposedCumsum (line "+hi.getBeginLine()+")."); + } + + return hi; + } + private Hop simplifyMatrixScalarPMOperation(Hop parent, Hop hi, int pos) { if (!(hi instanceof BinaryOp)) return hi; diff --git a/src/test/java/org/apache/sysds/test/functions/rewrite/RewriteSimplifyTransposedCumsumTest.java b/src/test/java/org/apache/sysds/test/functions/rewrite/RewriteSimplifyTransposedCumsumTest.java new file mode 100644 index 00000000000..f6fb547de2f --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/rewrite/RewriteSimplifyTransposedCumsumTest.java @@ -0,0 +1,181 @@ +package org.apache.sysds.test.functions.rewrite; + +import org.apache.sysds.api.DMLScript; +import org.apache.sysds.common.Opcodes; +import org.apache.sysds.common.Types.ExecMode; +import org.apache.sysds.common.Types.ExecType; +import org.apache.sysds.hops.OptimizerUtils; +import org.apache.sysds.runtime.matrix.data.MatrixValue; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; +import org.apache.sysds.utils.Statistics; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.HashMap; + + +public class RewriteSimplifyTransposedCumsumTest extends AutomatedTestBase{ + private static final String TEST_NAME = "RewriteSimplifyTransposedCumsum"; + private static final String TEST_DIR = "functions/rewrite/"; + private static final String TEST_CLASS_DIR = TEST_DIR + RewriteSimplifyTransposedCumsumTest.class.getSimpleName() + "/"; + + private static final double eps = 1e-10; + + private static final int rowsMatrix = 1201; + private static final int colsMatrix = 1103; + private static final double spSparse = 0.1; + private static final double spDense = 0.9; + + private enum InputType { + COL_VECTOR, + ROW_VECTOR, + MATRIX + } + + @Override + public void setUp() { + addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"})); + if (TEST_CACHE_ENABLED) { + setOutAndExpectedDeletionDisabled(true); + } + } + + @BeforeClass + public static void init() { + TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR); + } + + @AfterClass + public static void cleanUp() { + if (TEST_CACHE_ENABLED) { + TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR); + } + } + + // dense cp + @Test public void testRewriteMatrixDenseCPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.MATRIX, false, ExecType.CP, false); } + @Test public void testRewriteMatrixDenseCP() { testRewriteSimplifyRowcumsum(InputType.MATRIX, false, ExecType.CP, true); } + + @Test public void testRewriteColVectorDenseCPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.COL_VECTOR, false, ExecType.CP, false); } + @Test public void testRewriteColVectorDenseCP() { testRewriteSimplifyRowcumsum(InputType.COL_VECTOR, false, ExecType.CP, true); } + + @Test public void testRewriteRowVectorDenseCPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.ROW_VECTOR, false, ExecType.CP, false); } + @Test public void testRewriteRowVectorDenseCP() { testRewriteSimplifyRowcumsum(InputType.ROW_VECTOR, false, ExecType.CP, true); } + + // sparse cp + @Test public void testRewriteMatrixSparseCPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.MATRIX, true, ExecType.CP, false); } + @Test public void testRewriteMatrixSparseCP() { testRewriteSimplifyRowcumsum(InputType.MATRIX, true, ExecType.CP, true); } + + @Test public void testRewriteColVectorSparseCPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.COL_VECTOR, true, ExecType.CP, false); } + @Test public void testRewriteColVectorSparseCP() { testRewriteSimplifyRowcumsum(InputType.COL_VECTOR, true, ExecType.CP, true); } + + @Test public void testRewriteRowVectorSparseCPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.ROW_VECTOR, true, ExecType.CP, false); } + @Test public void testRewriteRowVectorSparseCP() { testRewriteSimplifyRowcumsum(InputType.ROW_VECTOR, true, ExecType.CP, true); } + + + // dense sp + @Test public void testRewriteMatrixDenseSPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.MATRIX, false, ExecType.SPARK, false); } + @Test public void testRewriteMatrixDenseSP() { testRewriteSimplifyRowcumsum(InputType.MATRIX, false, ExecType.SPARK, true); } + + @Test public void testRewriteColVectorDenseSPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.COL_VECTOR, false, ExecType.SPARK, false); } + @Test public void testRewriteColVectorDenseSP() { testRewriteSimplifyRowcumsum(InputType.COL_VECTOR, false, ExecType.SPARK, true); } + + @Test public void testRewriteRowVectorDenseSPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.ROW_VECTOR, false, ExecType.SPARK, false); } + @Test public void testRewriteRowVectorDenseSP() { testRewriteSimplifyRowcumsum(InputType.ROW_VECTOR, false, ExecType.SPARK, true); } + + // sparse sp + @Test public void testRewriteMatrixSparseSPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.MATRIX, true, ExecType.SPARK, false); } + @Test public void testRewriteMatrixSparseSP() { testRewriteSimplifyRowcumsum(InputType.MATRIX, true, ExecType.SPARK, true); } + + @Test public void testRewriteColVectorSparseSPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.COL_VECTOR, true, ExecType.SPARK, false); } + @Test public void testRewriteColVectorSparseSP() { testRewriteSimplifyRowcumsum(InputType.COL_VECTOR, true, ExecType.SPARK, true); } + + @Test public void testRewriteRowVectorSparseSPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.ROW_VECTOR, true, ExecType.SPARK, false); } + @Test public void testRewriteRowVectorSparseSP() { testRewriteSimplifyRowcumsum(InputType.ROW_VECTOR, true, ExecType.SPARK, true); } + + + /** + * Helper method to run the rewrite test with specified input type, sparsity, execution type, and rewrite flag. + * @param type The type of the input matrix (MATRIX, COL_VECTOR, ROW_VECTOR). + * @param sparse True if the matrix should be sparse, false for dense. + * @param instType The execution type (CP or SPARK). + * @param rewrites True if algebraic simplifications (rewrites) should be enabled, false otherwise. + */ + private void testRewriteSimplifyRowcumsum(InputType type, boolean sparse, ExecType instType, boolean rewrites) { + + ExecMode platformOld = rtplatform; + switch( instType ){ + case SPARK: rtplatform = ExecMode.SPARK; break; + default: rtplatform = ExecMode.HYBRID; break; + } + + boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; + if( rtplatform == ExecMode.SPARK ) + DMLScript.USE_LOCAL_SPARK_CONFIG = true; + + //rewrites + boolean oldFlagRewrites = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION; + OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = rewrites; + + + try { + // Determine matrix dimensions based on InputType + int rows = (type == InputType.ROW_VECTOR) ? 1 : rowsMatrix; + int cols = (type == InputType.COL_VECTOR) ? 1 : colsMatrix; + double sparsity = (sparse) ? spSparse : spDense; + + String TEST_CACHE_DIR = !TEST_CACHE_ENABLED ? "" : + type.ordinal() + "_" + sparsity + "/"; + + TestConfiguration config = getTestConfiguration(TEST_NAME); + loadTestConfiguration(config, TEST_CACHE_DIR); + + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME + ".dml"; + programArgs = new String[] {"-stats", "-args", input("A"), output("B")}; + + fullRScriptName = HOME + TEST_NAME + ".R"; + rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() + " " + expectedDir(); + + // create and write matrix + double[][] A = getRandomMatrix(rows, cols, -0.05, 1, sparsity, 7); + writeInputMatrixWithMTD("A", A, true); + + runTest(true, false, null, -1); + if( instType == ExecType.CP ) { + Assert.assertEquals("Unexpected number of executed Spark jobs.", 0, Statistics.getNoOfExecutedSPInst()); + } + + runRScript(true); + + //compare matrices + HashMap dmlfile = readDMLMatrixFromOutputDir("B"); + HashMap rfile = readRMatrixFromExpectedDir("B"); + TestUtils.compareMatrices(dmlfile, rfile, eps, "Stat-DML", "Stat-R"); + + // Assertions for opcodes + if(rewrites) { + // rewrite is enabled, double transposed cumsum operation is ROWCUMSUM operation instead + Assert.assertFalse(heavyHittersContainsString(Opcodes.TRANSPOSE.toString()) || heavyHittersContainsString("sp_r'")); + Assert.assertTrue(heavyHittersContainsString(Opcodes.UROWCUMKP.toString()) || heavyHittersContainsString("sp_urowcumk+")); + } else { + // rewrite is disabled, use TRANSPOSE and CUMSUM operation + boolean transposeFound = heavyHittersContainsString(Opcodes.TRANSPOSE.toString()) || heavyHittersContainsString("sp_r'"); + Assert.assertTrue(transposeFound); + Assert.assertTrue(heavyHittersContainsString(Opcodes.UCUMKP.toString()) || heavyHittersContainsString("sp_bcumoffk+")); + + // Check that neither CP nor Spark ROWCUMSUM is present + Assert.assertFalse(heavyHittersContainsString(Opcodes.UROWCUMKP.toString()) || heavyHittersContainsString("sp_urowcumk+")); + } + } + finally { + rtplatform = platformOld; + DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = oldFlagRewrites; + } + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/unary/matrix/FullRowcumsumTest.java b/src/test/java/org/apache/sysds/test/functions/unary/matrix/FullRowcumsumTest.java index a3b48549953..297f39d6778 100644 --- a/src/test/java/org/apache/sysds/test/functions/unary/matrix/FullRowcumsumTest.java +++ b/src/test/java/org/apache/sysds/test/functions/unary/matrix/FullRowcumsumTest.java @@ -37,7 +37,7 @@ public class FullRowcumsumTest extends AutomatedTestBase { - private final static String TEST_NAME = "rowcumsum"; + private final static String TEST_NAME = "Rowcumsum"; private final static String TEST_DIR = "functions/unary/matrix/"; private static final String TEST_CLASS_DIR = TEST_DIR + FullRowcumsumTest.class.getSimpleName() + "/"; @@ -77,82 +77,102 @@ public static void cleanUp() { } @Test - public void testCumsumColVectorDenseCP() { + public void testRowcumsumColVectorDenseCP() { runColAggregateOperationTest(InputType.COL_VECTOR, false, ExecType.CP); } @Test - public void testCumsumRowVectorDenseCP() { + public void testRowcumsumRowVectorDenseCP() { runColAggregateOperationTest(InputType.ROW_VECTOR, false, ExecType.CP); } @Test - public void testCumsumRowVectorDenseNoRewritesCP() { + public void testRowcumsumRowVectorDenseNoRewritesCP() { runColAggregateOperationTest(InputType.ROW_VECTOR, false, ExecType.CP, false); } @Test - public void testCumsumMatrixDenseCP() { + public void testRowcumsumColVectorDenseNoRewritesCP() { + runColAggregateOperationTest(InputType.COL_VECTOR, false, ExecType.CP, false); + } + + @Test + public void testRowcumsumMatrixDenseCP() { runColAggregateOperationTest(InputType.MATRIX, false, ExecType.CP); } @Test - public void testCumsumColVectorSparseCP() { + public void testRowcumsumColVectorSparseCP() { runColAggregateOperationTest(InputType.COL_VECTOR, true, ExecType.CP); } @Test - public void testCumsumRowVectorSparseCP() { + public void testRowcumsumRowVectorSparseCP() { runColAggregateOperationTest(InputType.ROW_VECTOR, true, ExecType.CP); } @Test - public void testCumsumRowVectorSparseNoRewritesCP() { + public void testRowcumsumRowVectorSparseNoRewritesCP() { runColAggregateOperationTest(InputType.ROW_VECTOR, true, ExecType.CP, false); } @Test - public void testCumsumMatrixSparseCP() { + public void testRowcumsumColVectorSparseNoRewritesCP() { + runColAggregateOperationTest(InputType.COL_VECTOR, true, ExecType.CP, false); + } + + @Test + public void testRowcumsumMatrixSparseCP() { runColAggregateOperationTest(InputType.MATRIX, true, ExecType.CP); } @Test - public void testCumsumColVectorDenseSP() { + public void testRowcumsumColVectorDenseSP() { runColAggregateOperationTest(InputType.COL_VECTOR, false, ExecType.SPARK); } @Test - public void testCumsumRowVectorDenseSP() { + public void testRowcumsumRowVectorDenseSP() { runColAggregateOperationTest(InputType.ROW_VECTOR, false, ExecType.SPARK); } @Test - public void testCumsumRowVectorDenseNoRewritesSP() { + public void testRowcumsumRowVectorDenseNoRewritesSP() { runColAggregateOperationTest(InputType.ROW_VECTOR, false, ExecType.SPARK, false); } @Test - public void testCumsumMatrixDenseSP() { + public void testRowcumsumColVectorDenseNoRewritesSP() { + runColAggregateOperationTest(InputType.COL_VECTOR, false, ExecType.SPARK, false); + } + + @Test + public void testRowcumsumMatrixDenseSP() { runColAggregateOperationTest(InputType.MATRIX, false, ExecType.SPARK); } @Test - public void testCumsumColVectorSparseSP() { //das hier testen + public void testRowcumsumColVectorSparseSP() { runColAggregateOperationTest(InputType.COL_VECTOR, true, ExecType.SPARK); } @Test - public void testCumsumRowVectorSparseSP() { + public void testRowcumsumRowVectorSparseSP() { runColAggregateOperationTest(InputType.ROW_VECTOR, true, ExecType.SPARK); } @Test - public void testCumsumRowVectorSparseNoRewritesSP() { + public void testRowcumsumRowVectorSparseNoRewritesSP() { runColAggregateOperationTest(InputType.ROW_VECTOR, true, ExecType.SPARK, false); } @Test - public void testCumsumMatrixSparseSP() { + public void testRowcumsumColVectorSparseNoRewritesSP() { + runColAggregateOperationTest(InputType.COL_VECTOR, true, ExecType.SPARK, false); + } + + @Test + public void testRowcumsumMatrixSparseSP() { runColAggregateOperationTest(InputType.MATRIX, true, ExecType.SPARK); } diff --git a/src/test/scripts/functions/rewrite/RewriteSimplifyTransposedCumsum.R b/src/test/scripts/functions/rewrite/RewriteSimplifyTransposedCumsum.R new file mode 100644 index 00000000000..7c80e304f63 --- /dev/null +++ b/src/test/scripts/functions/rewrite/RewriteSimplifyTransposedCumsum.R @@ -0,0 +1,36 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + + +args <- commandArgs(TRUE) +options(digits=22) + +library("Matrix") + +A = as.matrix(readMM(paste(args[1], "A.mtx", sep=""))) + +if( ncol(A)>1 ){ + B = t(apply(A, 1, cumsum)); +} else { + B = A; +} + +writeMM(as(B, "CsparseMatrix"), paste(args[2], "B", sep="")); \ No newline at end of file diff --git a/src/test/scripts/functions/rewrite/RewriteSimplifyTransposedCumsum.dml b/src/test/scripts/functions/rewrite/RewriteSimplifyTransposedCumsum.dml new file mode 100644 index 00000000000..c2dff2fdec0 --- /dev/null +++ b/src/test/scripts/functions/rewrite/RewriteSimplifyTransposedCumsum.dml @@ -0,0 +1,25 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + + +A = read($1); +B = t(cumsum(t(A))); #this should trigger the rewrite: t(cumsum(t(A))) -> rowcumsum(A) +write(B, $2); \ No newline at end of file From f7201c087d35731a4f56e4e7eb543565095d8fc9 Mon Sep 17 00:00:00 2001 From: Vi Vuong Date: Tue, 22 Jul 2025 15:46:38 +0200 Subject: [PATCH 5/5] javadoc --- .../runtime/matrix/data/LibMatrixAgg.java | 30 +++++++++---------- .../RewriteSimplifyTransposedCumsumTest.java | 17 +++-------- 2 files changed, 19 insertions(+), 28 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java index b9827d49ee1..59301db7ece 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java @@ -2130,14 +2130,14 @@ private static void d_ucumkp( DenseBlock a, double[] agg, DenseBlock c, int n, K /** * ROWCUMSUM, opcode: urowcumk+, dense input. * - * @param a - * @param agg - * @param c - * @param n - * @param kbuff - * @param kplus - * @param rl - * @param ru + * @param a input matrix + * @param agg initial array + * @param c output matrix + * @param n number of rows + * @param kbuff collects sum + * @param kplus sums up + * @param rl row lower index + * @param ru row upper index */ private static void d_urowcumkp( DenseBlock a, double[] agg, DenseBlock c, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru ) { //row-wise cumulative sum w/ optional row offsets @@ -2814,13 +2814,13 @@ private static void s_ucumkp( SparseBlock a, double[] agg, DenseBlock c, int m, /** * ROWCUMSUM, opcode: urowcumk+, sparse input. * - * @param a ? - * @param agg ? - * @param c ? - * @param m ? - * @param n ? - * @param kbuff ? - * @param kplus ? + * @param a input matrix + * @param agg intial array + * @param c output matrix + * @param m number of columns + * @param n number of rows + * @param kbuff collects sum + * @param kplus sums up * @param rl row lower index * @param ru row upper index */ diff --git a/src/test/java/org/apache/sysds/test/functions/rewrite/RewriteSimplifyTransposedCumsumTest.java b/src/test/java/org/apache/sysds/test/functions/rewrite/RewriteSimplifyTransposedCumsumTest.java index f6fb547de2f..45842437d35 100644 --- a/src/test/java/org/apache/sysds/test/functions/rewrite/RewriteSimplifyTransposedCumsumTest.java +++ b/src/test/java/org/apache/sysds/test/functions/rewrite/RewriteSimplifyTransposedCumsumTest.java @@ -98,13 +98,6 @@ public static void cleanUp() { @Test public void testRewriteRowVectorSparseSP() { testRewriteSimplifyRowcumsum(InputType.ROW_VECTOR, true, ExecType.SPARK, true); } - /** - * Helper method to run the rewrite test with specified input type, sparsity, execution type, and rewrite flag. - * @param type The type of the input matrix (MATRIX, COL_VECTOR, ROW_VECTOR). - * @param sparse True if the matrix should be sparse, false for dense. - * @param instType The execution type (CP or SPARK). - * @param rewrites True if algebraic simplifications (rewrites) should be enabled, false otherwise. - */ private void testRewriteSimplifyRowcumsum(InputType type, boolean sparse, ExecType instType, boolean rewrites) { ExecMode platformOld = rtplatform; @@ -159,16 +152,14 @@ private void testRewriteSimplifyRowcumsum(InputType type, boolean sparse, ExecTy // Assertions for opcodes if(rewrites) { - // rewrite is enabled, double transposed cumsum operation is ROWCUMSUM operation instead + // rewrite is enabled: double transposed CUMSUM and CUMSUM is not found, ROWCUMSUM operation is found Assert.assertFalse(heavyHittersContainsString(Opcodes.TRANSPOSE.toString()) || heavyHittersContainsString("sp_r'")); + Assert.assertFalse(heavyHittersContainsString(Opcodes.UCUMKP.toString()) || heavyHittersContainsString("sp_bcumoffk+")); Assert.assertTrue(heavyHittersContainsString(Opcodes.UROWCUMKP.toString()) || heavyHittersContainsString("sp_urowcumk+")); } else { - // rewrite is disabled, use TRANSPOSE and CUMSUM operation - boolean transposeFound = heavyHittersContainsString(Opcodes.TRANSPOSE.toString()) || heavyHittersContainsString("sp_r'"); - Assert.assertTrue(transposeFound); + // rewrite is disabled: double transposed CUMSUM and CUMSUM is found, ROWCUMSUM operation is not found + Assert.assertTrue(heavyHittersContainsString(Opcodes.TRANSPOSE.toString()) || heavyHittersContainsString("sp_r'")); Assert.assertTrue(heavyHittersContainsString(Opcodes.UCUMKP.toString()) || heavyHittersContainsString("sp_bcumoffk+")); - - // Check that neither CP nor Spark ROWCUMSUM is present Assert.assertFalse(heavyHittersContainsString(Opcodes.UROWCUMKP.toString()) || heavyHittersContainsString("sp_urowcumk+")); } }