diff --git a/docs/site/dml-language-reference.md b/docs/site/dml-language-reference.md
index abafb74a5ac..4fe2e17b3cf 100644
--- a/docs/site/dml-language-reference.md
+++ b/docs/site/dml-language-reference.md
@@ -686,29 +686,30 @@ as.double(),
as.integer(),
as.logical() | A variable is cast as the
**Table 7**: Statistical Built-In Functions
-Function | Description | Parameters | Example
--------- | ----------- | ---------- | -------
-mean()
avg() | Return the mean value of all cells in matrix | Input: matrix
Output: scalar | mean(X)
-var()
sd() | Return the variance/stdDev value of all cells in matrix. Both use unbiased estimators with (n-1) denominator. | Input: matrix
Output: scalar | var(X)
sd(X)
-moment() | Returns the kth central moment of values in a column matrix V, where k = 2, 3, or 4. It can be used to compute statistical measures like Variance, Kurtosis, and Skewness. This function also takes an optional weights parameter W. | Input: (X <(n x 1) matrix>, [W <(n x 1) matrix>),] k <scalar>)
Output: <scalar> | A = rand(rows=100000,cols=1, pdf="normal")
print("Variance from our (standard normal) random generator is approximately " + moment(A,2))
-colSums()
colMeans()
colVars()
colSds()
colMaxs()
colMins() | Column-wise computations -- for each column, compute the sum/mean/variance/stdDev/max/min of cell values | Input: matrix
Output: (1 x n) matrix | colSums(X)
colMeans(X)
colVars(X)
colSds(X)
colMaxs(X)
colMins(X)
-cov() | Returns the covariance between two 1-dimensional column matrices X and Y. The function takes an optional weights parameter W. All column matrices X, Y, and W (when specified) must have the exact same dimension. | Input: (X <(n x 1) matrix>, Y <(n x 1) matrix> [, W <(n x 1) matrix>)])
Output: <scalar> | cov(X,Y)
cov(X,Y,W)
-contains() | Indicates if the target matrix contains at least one pattern value (with handling of special values like Not-a-Number). | Input: (target=<matrix>,pattern=<scalar>)
Output: <scalar> | hasNaNs = contains(target=X, pattern=NaN)
-table() | Returns the contingency table of two vectors A and B. The resulting table F consists of max(A) rows and max(B) columns.
More precisely, F[i,j] = \\|{ k \\| A[k] = i and B[k] = j, 1 ≤ k ≤ n }\\|, where A and B are two n-dimensional vectors.
This function supports multiple other variants, which can be found below, at the end of this Table 7. | Input: (<(n x 1) matrix>, <(n x 1) matrix>), [<(n x 1) matrix>])
Output: <matrix> | F = table(A, B)
F = table(A, B, C)
And, several other forms (see below Table 7.)
-cdf()
pnorm()
pexp()
pchisq()
pf()
pt()
icdf()
qnorm()
qexp()
qchisq()
qf()
qt() | p=cdf(target=q, ...) returns the cumulative probability P[X <= q].
q=icdf(target=p, ...) returns the inverse cumulative probability i.e., it returns q such that the given target p = P[X<=q].
For more details, please see the section "Probability Distribution Functions" below Table 7. | Input: (target=<scalar>, dist="...", ...)
Output: <scalar> | p = cdf(target=q, dist="normal", mean=1.5, sd=2); is same as p=pnorm(target=q, mean=1.5, sd=2);
q=icdf(target=p, dist="normal") is same as q=qnorm(target=p, mean=0,sd=1)
More examples can be found in the section "Probability Distribution Functions" below Table 7.
+Function | Description | Parameters | Example
+-------- |--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ---------- | -------
+mean()
avg() | Return the mean value of all cells in matrix | Input: matrix
Output: scalar | mean(X)
+var()
sd() | Return the variance/stdDev value of all cells in matrix. Both use unbiased estimators with (n-1) denominator. | Input: matrix
Output: scalar | var(X)
sd(X)
+moment() | Returns the kth central moment of values in a column matrix V, where k = 2, 3, or 4. It can be used to compute statistical measures like Variance, Kurtosis, and Skewness. This function also takes an optional weights parameter W. | Input: (X <(n x 1) matrix>, [W <(n x 1) matrix>),] k <scalar>)
Output: <scalar> | A = rand(rows=100000,cols=1, pdf="normal")
print("Variance from our (standard normal) random generator is approximately " + moment(A,2))
+colSums()
colMeans()
colVars()
colSds()
colMaxs()
colMins() | Column-wise computations -- for each column, compute the sum/mean/variance/stdDev/max/min of cell values | Input: matrix
Output: (1 x n) matrix | colSums(X)
colMeans(X)
colVars(X)
colSds(X)
colMaxs(X)
colMins(X)
+cov() | Returns the covariance between two 1-dimensional column matrices X and Y. The function takes an optional weights parameter W. All column matrices X, Y, and W (when specified) must have the exact same dimension. | Input: (X <(n x 1) matrix>, Y <(n x 1) matrix> [, W <(n x 1) matrix>)])
Output: <scalar> | cov(X,Y)
cov(X,Y,W)
+contains() | Indicates if the target matrix contains at least one pattern value (with handling of special values like Not-a-Number). | Input: (target=<matrix>,pattern=<scalar>)
Output: <scalar> | hasNaNs = contains(target=X, pattern=NaN)
+table() | Returns the contingency table of two vectors A and B. The resulting table F consists of max(A) rows and max(B) columns.
More precisely, F[i,j] = \\|{ k \\| A[k] = i and B[k] = j, 1 ≤ k ≤ n }\\|, where A and B are two n-dimensional vectors.
This function supports multiple other variants, which can be found below, at the end of this Table 7. | Input: (<(n x 1) matrix>, <(n x 1) matrix>), [<(n x 1) matrix>])
Output: <matrix> | F = table(A, B)
F = table(A, B, C)
And, several other forms (see below Table 7.)
+cdf()
pnorm()
pexp()
pchisq()
pf()
pt()
icdf()
qnorm()
qexp()
qchisq()
qf()
qt() | p=cdf(target=q, ...) returns the cumulative probability P[X <= q].
q=icdf(target=p, ...) returns the inverse cumulative probability i.e., it returns q such that the given target p = P[X<=q].
For more details, please see the section "Probability Distribution Functions" below Table 7. | Input: (target=<scalar>, dist="...", ...)
Output: <scalar> | p = cdf(target=q, dist="normal", mean=1.5, sd=2); is same as p=pnorm(target=q, mean=1.5, sd=2);
q=icdf(target=p, dist="normal") is same as q=qnorm(target=p, mean=0,sd=1)
More examples can be found in the section "Probability Distribution Functions" below Table 7.
aggregate() | Splits/groups the values from X according to the corresponding values from G, and then applies the function fn on each group.
The result F is a column matrix, in which each row contains the value computed from a distinct group in G. More specifically, F[k,1] = fn( {X[i,1] \\| 1<=i<=n and G[i,1] = k} ), where n = nrow(X) = nrow(G).
Note that the distinct values in G are used as row indexes in the result matrix F. Therefore, nrow(F) = max(G). It is thus recommended that the values in G are consecutive and start from 1.
This function supports multiple other variants, which can be found below, at the end of this Table 7. | Input:
(target = X <(n x 1) matrix, or matrix>,
groups = G <(n x 1) matrix>,
fn= "..."
[,weights= W<(n x 1) matrix>]
[,ngroups=N] )
Output: F <matrix>
Note: X is a (n x 1) matrix unless ngroups is specified with no weights, in which case X is a regular (n x m) matrix.
The parameter fn takes one of the following functions: "count", "sum", "mean", "variance", "centralmoment". In the case of central moment, one must also provide the order of the moment that need to be computed (see example). | F = aggregate(target=X, groups=G, fn= "..." [,weights = W])
F = aggregate(target=X, groups=G1, fn= "sum");
F = aggregate(target=Y, groups=G2, fn= "mean", weights=W);
F = aggregate(target=Z, groups=G3, fn= "centralmoment", order= "2");
And, several other forms (see below Table 7.)
-interQuartileMean() | Returns the mean of all x in X such that x>quantile(X, 0.25) and x<=quantile(X, 0.75). X, W are column matrices (vectors) of the same size. W contains the weights for data in X. | Input: (X <(n x 1) matrix> [, W <(n x 1) matrix>)])
Output: <scalar> | interQuartileMean(X)
interQuartileMean(X, W)
-quantile () | The p-quantile for a random variable X is the value x such that Pr[X<x] <= p and Pr[X<= x] >= p
let n=nrow(X), i=ceiling(p*n), quantile() will return X[i]. p is a scalar (0<p<1) that specifies the quantile to be computed. Optionally, a weight vector may be provided for X. | Input: (X <(n x 1) matrix>, [W <(n x 1) matrix>),] p <scalar>)
Output: <scalar> | quantile(X, p)
quantile(X, W, p)
-quantile () | Returns a column matrix with list of all quantiles requested in P. | Input: (X <(n x 1) matrix>, [W <(n x 1) matrix>),] P <(q x 1) matrix>)
Output: matrix | quantile(X, P)
quantile(X, W, P)
-median() | Computes the median in a given column matrix of values | Input: (X <(n x 1) matrix>, [W <(n x 1) matrix>),])
Output: <scalar> | median(X)
median(X,W)
-rowSums()
rowMeans()
rowVars()
rowSds()
rowMaxs()
rowMins() | Row-wise computations -- for each row, compute the sum/mean/variance/stdDev/max/min of cell value | Input: matrix
Output: (n x 1) matrix | rowSums(X)
rowMeans(X)
rowVars(X)
rowSds(X)
rowMaxs(X)
rowMins(X)
-cumsum() | Column prefix-sum (For row-prefix sum, use cumsum(t(X)) | Input: matrix
Output: matrix of the same dimensions | A = matrix("1 2 3 4 5 6", rows=3, cols=2)
B = cumsum(A)
The output matrix B = [[1, 2], [4, 6], [9, 12]]
-cumprod() | Column prefix-prod (For row-prefix prod, use cumprod(t(X)) | Input: matrix
Output: matrix of the same dimensions | A = matrix("1 2 3 4 5 6", rows=3, cols=2)
B = cumprod(A)
The output matrix B = [[1, 2], [3, 8], [15, 48]]
-cummin() | Column prefix-min (For row-prefix min, use cummin(t(X)) | Input: matrix
Output: matrix of the same dimensions | A = matrix("3 4 1 6 5 2", rows=3, cols=2)
B = cummin(A)
The output matrix B = [[3, 4], [1, 4], [1, 2]]
-cummax() | Column prefix-max (For row-prefix min, use cummax(t(X)) | Input: matrix
Output: matrix of the same dimensions | A = matrix("3 4 1 6 5 2", rows=3, cols=2)
B = cummax(A)
The output matrix B = [[3, 4], [3, 6], [5, 6]]
-sample(range, size, replacement, seed) | Sample returns a column vector of length size, containing uniform random numbers from [1, range] | Input:
range: integer
size: integer
replacement: boolean (Optional, default: FALSE)
seed: integer (Optional)
Output: Matrix dimensions are size x 1 | sample(100, 5)
sample(100, 5, TRUE)
sample(100, 120, TRUE)
sample(100, 5, 1234) # 1234 is the seed
sample(100, 5, TRUE, 1234)
-outer(vector1, vector2, "op") | Applies element wise binary operation "op" (for example: "<", "==", ">=", "*", "min") on the all combination of vector.
Note: Using "*", we get outer product of two vectors. | Input: vectors of same size d, string
Output: matrix of size d X d | A = matrix("1 4", rows = 2, cols = 1)
B = matrix("3 6", rows = 1, cols = 2)
C = outer(A, B, "<")
D = outer(A, B, "*")
The output matrix C = [[1, 1], [0, 1]]
The output matrix D = [[3, 6], [12, 24]]
-toOneHot(X, num_classes)| Converts a vector containing integers to a one-hot-encoded matrix | Input: vector with N integer entries between 1 and num_classes, number of columns (must be >= largest value in X)
Output: one-hot-encoded matrix with shape (N, num_classes) | X = round(rand(rows=10, cols=1, min=2, max=10));
num_classes = 12;
Y = toOneHot(X, num_classes);
+interQuartileMean() | Returns the mean of all x in X such that x>quantile(X, 0.25) and x<=quantile(X, 0.75). X, W are column matrices (vectors) of the same size. W contains the weights for data in X. | Input: (X <(n x 1) matrix> [, W <(n x 1) matrix>)])
Output: <scalar> | interQuartileMean(X)
interQuartileMean(X, W)
+quantile () | The p-quantile for a random variable X is the value x such that Pr[X<x] <= p and Pr[X<= x] >= p
let n=nrow(X), i=ceiling(p*n), quantile() will return X[i]. p is a scalar (0<p<1) that specifies the quantile to be computed. Optionally, a weight vector may be provided for X. | Input: (X <(n x 1) matrix>, [W <(n x 1) matrix>),] p <scalar>)
Output: <scalar> | quantile(X, p)
quantile(X, W, p)
+quantile () | Returns a column matrix with list of all quantiles requested in P. | Input: (X <(n x 1) matrix>, [W <(n x 1) matrix>),] P <(q x 1) matrix>)
Output: matrix | quantile(X, P)
quantile(X, W, P)
+median() | Computes the median in a given column matrix of values | Input: (X <(n x 1) matrix>, [W <(n x 1) matrix>),])
Output: <scalar> | median(X)
median(X,W)
+rowSums()
rowMeans()
rowVars()
rowSds()
rowMaxs()
rowMins() | Row-wise computations -- for each row, compute the sum/mean/variance/stdDev/max/min of cell value | Input: matrix
Output: (n x 1) matrix | rowSums(X)
rowMeans(X)
rowVars(X)
rowSds(X)
rowMaxs(X)
rowMins(X)
+cumsum() | Column prefix-sum | Input: matrix
Output: matrix of the same dimensions | A = matrix("1 2 3 4 5 6", rows=3, cols=2)
B = cumsum(A)
The output matrix B = [[1, 2], [4, 6], [9, 12]]
+rowcumsum() | Row prefix-sum | Input: matrix
Output: matrix of the same dimensions | A = matrix("1 2 3 4 5 6", rows=2, cols=3)
B = rowcumsum(A)
The output matrix B = [[1, 3, 6], [4, 9, 15]]
+cumprod() | Column prefix-prod (For row-prefix prod, use cumprod(t(X)) | Input: matrix
Output: matrix of the same dimensions | A = matrix("1 2 3 4 5 6", rows=3, cols=2)
B = cumprod(A)
The output matrix B = [[1, 2], [3, 8], [15, 48]]
+cummin() | Column prefix-min (For row-prefix min, use cummin(t(X)) | Input: matrix
Output: matrix of the same dimensions | A = matrix("3 4 1 6 5 2", rows=3, cols=2)
B = cummin(A)
The output matrix B = [[3, 4], [1, 4], [1, 2]]
+cummax() | Column prefix-max (For row-prefix min, use cummax(t(X)) | Input: matrix
Output: matrix of the same dimensions | A = matrix("3 4 1 6 5 2", rows=3, cols=2)
B = cummax(A)
The output matrix B = [[3, 4], [3, 6], [5, 6]]
+sample(range, size, replacement, seed) | Sample returns a column vector of length size, containing uniform random numbers from [1, range] | Input:
range: integer
size: integer
replacement: boolean (Optional, default: FALSE)
seed: integer (Optional)
Output: Matrix dimensions are size x 1 | sample(100, 5)
sample(100, 5, TRUE)
sample(100, 120, TRUE)
sample(100, 5, 1234) # 1234 is the seed
sample(100, 5, TRUE, 1234)
+outer(vector1, vector2, "op") | Applies element wise binary operation "op" (for example: "<", "==", ">=", "*", "min") on the all combination of vector.
Note: Using "*", we get outer product of two vectors. | Input: vectors of same size d, string
Output: matrix of size d X d | A = matrix("1 4", rows = 2, cols = 1)
B = matrix("3 6", rows = 1, cols = 2)
C = outer(A, B, "<")
D = outer(A, B, "*")
The output matrix C = [[1, 1], [0, 1]]
The output matrix D = [[3, 6], [12, 24]]
+toOneHot(X, num_classes)| Converts a vector containing integers to a one-hot-encoded matrix | Input: vector with N integer entries between 1 and num_classes, number of columns (must be >= largest value in X)
Output: one-hot-encoded matrix with shape (N, num_classes) | X = round(rand(rows=10, cols=1, min=2, max=10));
num_classes = 12;
Y = toOneHot(X, num_classes);
#### Alternative forms of table()
diff --git a/src/main/java/org/apache/sysds/common/Builtins.java b/src/main/java/org/apache/sysds/common/Builtins.java
index 423679d038c..fe75aec6a05 100644
--- a/src/main/java/org/apache/sysds/common/Builtins.java
+++ b/src/main/java/org/apache/sysds/common/Builtins.java
@@ -291,6 +291,7 @@ public enum Builtins {
ROLL("roll", false),
ROUND("round", false),
ROW_COUNT_DISTINCT("rowCountDistinct",false),
+ ROWCUMSUM("rowcumsum", false),
ROWINDEXMAX("rowIndexMax", false),
ROWINDEXMIN("rowIndexMin", false),
ROWMAX("rowMaxs", false),
diff --git a/src/main/java/org/apache/sysds/common/Opcodes.java b/src/main/java/org/apache/sysds/common/Opcodes.java
index 28c5a7a6a8e..623271bf12a 100644
--- a/src/main/java/org/apache/sysds/common/Opcodes.java
+++ b/src/main/java/org/apache/sysds/common/Opcodes.java
@@ -36,6 +36,7 @@ public enum Opcodes {
UAKP("uak+", InstructionType.AggregateUnary),
UARKP("uark+", InstructionType.AggregateUnary),
UACKP("uack+", InstructionType.AggregateUnary),
+ UARCKP("uarck+", InstructionType.AggregateUnary),
UASQKP("uasqk+", InstructionType.AggregateUnary),
UARSQKP("uarsqk+", InstructionType.AggregateUnary),
UACSQKP("uacsqk+", InstructionType.AggregateUnary),
@@ -151,6 +152,7 @@ public enum Opcodes {
CEIL("ceil", InstructionType.Unary),
FLOOR("floor", InstructionType.Unary),
UCUMKP("ucumk+", InstructionType.Unary),
+ UROWCUMKP("urowcumk+", InstructionType.Unary),
UCUMM("ucum*", InstructionType.Unary),
UCUMKPM("ucumk+*", InstructionType.Unary),
UCUMMIN("ucummin", InstructionType.Unary),
@@ -383,6 +385,7 @@ public enum Opcodes {
UCUMACMIN("ucumacmin", InstructionType.CumsumAggregate),
UCUMACMAX("ucumacmax", InstructionType.CumsumAggregate),
BCUMOFFKP("bcumoffk+", InstructionType.CumsumOffset),
+ BROWCUMOFFKP("browcumoffk+", InstructionType.CumsumOffset),
BCUMOFFM("bcumoff*", InstructionType.CumsumOffset),
BCUMOFFPM("bcumoff+*", InstructionType.CumsumOffset),
BCUMOFFMIN("bcumoffmin", InstructionType.CumsumOffset),
diff --git a/src/main/java/org/apache/sysds/common/Types.java b/src/main/java/org/apache/sysds/common/Types.java
index e69ad375b20..239cdfbd37d 100644
--- a/src/main/java/org/apache/sysds/common/Types.java
+++ b/src/main/java/org/apache/sysds/common/Types.java
@@ -545,7 +545,7 @@ public enum OpOp1 {
CEIL, CHOLESKY, COS, COSH, CUMMAX, CUMMIN, CUMPROD, CUMSUM,
CUMSUMPROD, DET, DETECTSCHEMA, COLNAMES, EIGEN, EXISTS, EXP, FLOOR, INVERSE,
IQM, ISNA, ISNAN, ISINF, LENGTH, LINEAGE, LOG, NCOL, NOT, NROW,
- MEDIAN, PREFETCH, PRINT, ROUND, SIN, SINH, SIGN, SOFTMAX, SQRT, STOP, _EVICT,
+ MEDIAN, PREFETCH, PRINT, ROUND, ROWCUMSUM, SIN, SINH, SIGN, SOFTMAX, SQRT, STOP, _EVICT,
SVD, TAN, TANH, TYPEOF, TRIGREMOTE, SQRT_MATRIX_JAVA,
//fused ML-specific operators for performance
SPROP, //sample proportion: P * (1 - P)
@@ -589,6 +589,7 @@ public String toString() {
case MULT2: return Opcodes.MULT2.toString();
case NOT: return Opcodes.NOT.toString();
case POW2: return Opcodes.POW2.toString();
+ case ROWCUMSUM: return Opcodes.UROWCUMKP.toString();
case TYPEOF: return Opcodes.TYPEOF.toString();
default: return name().toLowerCase();
}
@@ -608,6 +609,7 @@ public static OpOp1 valueOfByOpcode(String opcode) {
case "ucummin": return CUMMIN;
case "ucum*": return CUMPROD;
case "ucumk+": return CUMSUM;
+ case "urowcumk+": return ROWCUMSUM;
case "ucumk+*": return CUMSUMPROD;
case "detectSchema": return DETECTSCHEMA;
case "*2": return MULT2;
diff --git a/src/main/java/org/apache/sysds/hops/rewrite/RewriteAlgebraicSimplificationStatic.java b/src/main/java/org/apache/sysds/hops/rewrite/RewriteAlgebraicSimplificationStatic.java
index ef5670dda80..4f47fa101ff 100644
--- a/src/main/java/org/apache/sysds/hops/rewrite/RewriteAlgebraicSimplificationStatic.java
+++ b/src/main/java/org/apache/sysds/hops/rewrite/RewriteAlgebraicSimplificationStatic.java
@@ -204,6 +204,7 @@ private void rule_AlgebraicSimplification(Hop hop, boolean descendFirst)
hi = simplifyNotOverComparisons(hop, hi, i); //e.g., !(A>B) -> (A<=B)
hi = simplifyMatrixScalarPMOperation(hop, hi, i); //e.g., a-A-b -> (a-b)-A; a+A-b -> (a-b)+A
//hi = removeUnecessaryPPred(hop, hi, i); //e.g., ppred(X,X,"==")->matrix(1,rows=nrow(X),cols=ncol(X))
+ hi = simplifyTransposedCumsum(hop, hi, i); //e.g., t(cumsum(t(X))) -> rowcumsum(X)
//process childs recursively after rewrites (to investigate pattern newly created by rewrites)
if( !descendFirst )
@@ -213,6 +214,28 @@ private void rule_AlgebraicSimplification(Hop hop, boolean descendFirst)
hop.setVisited();
}
+ private static Hop simplifyTransposedCumsum( Hop parent, Hop hi, int pos )
+ {
+ //e.g., t(cumsum(t(X))) -> rowcumsum(X)
+ if( HopRewriteUtils.isTransposeOperation(hi)
+ && hi.getInput(0) instanceof UnaryOp
+ && ((UnaryOp)hi.getInput(0)).getOp() == OpOp1.CUMSUM
+ && hi.getInput(0).getParent().size() == 1
+ && HopRewriteUtils.isTransposeOperation(hi.getInput(0).getInput(0), 1)) //inner transpose with single consumer
+ {
+ UnaryOp cumsum=(UnaryOp)hi.getInput(0);
+ Hop innerMatrix = cumsum.getInput(0).getInput(0);
+
+ UnaryOp rowcumsumOp = HopRewriteUtils.createUnary(innerMatrix, OpOp1.ROWCUMSUM);
+ HopRewriteUtils.replaceChildReference(parent,hi, rowcumsumOp, pos);
+
+ hi = rowcumsumOp;
+ LOG.debug("Applied simplifyTransposedCumsum (line "+hi.getBeginLine()+").");
+ }
+
+ return hi;
+ }
+
private Hop simplifyMatrixScalarPMOperation(Hop parent, Hop hi, int pos) {
if (!(hi instanceof BinaryOp))
return hi;
diff --git a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
index ae582b052b2..540b522a8bb 100644
--- a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
+++ b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
@@ -1034,6 +1034,7 @@ else if( getAllExpr().length == 2 ) { //binary
break;
case CUMSUM:
+ case ROWCUMSUM:
case CUMPROD:
case CUMSUMPROD:
case CUMMIN:
diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
index c4f7f672abe..6827bcc4bfa 100644
--- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
@@ -2616,6 +2616,7 @@ else if ( sop.equalsIgnoreCase(Opcodes.NOTEQUAL.toString()) )
case CEIL:
case FLOOR:
case CUMSUM:
+ case ROWCUMSUM:
case CUMPROD:
case CUMSUMPROD:
case CUMMIN:
diff --git a/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java b/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java
index 6b196489eac..8e9aef94660 100644
--- a/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java
+++ b/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java
@@ -49,7 +49,7 @@ public class Builtin extends ValueFunction
public enum BuiltinCode { AUTODIFF, SIN, COS, TAN, SINH, COSH, TANH, ASIN, ACOS, ATAN, LOG, LOG_NZ, MIN,
MAX, ABS, SIGN, SQRT, EXP, PLOGP, PRINT, PRINTF, NROW, NCOL, LENGTH, LINEAGE, ROUND, MAXINDEX, MININDEX,
- STOP, CEIL, FLOOR, CUMSUM, CUMPROD, CUMMIN, CUMMAX, CUMSUMPROD, INVERSE, SPROP, SIGMOID, EVAL, LIST,
+ STOP, CEIL, FLOOR, CUMSUM, ROWCUMSUM, CUMPROD, CUMMIN, CUMMAX, CUMSUMPROD, INVERSE, SPROP, SIGMOID, EVAL, LIST,
TYPEOF, APPLY_SCHEMA, DETECTSCHEMA, ISNA, ISNAN, ISINF, DROP_INVALID_TYPE,
DROP_INVALID_LENGTH, VALUE_SWAP, FRAME_ROW_REPLICATE,
MAP, COUNT_DISTINCT, COUNT_DISTINCT_APPROX, UNIQUE}
@@ -95,6 +95,7 @@ public enum BuiltinCode { AUTODIFF, SIN, COS, TAN, SINH, COSH, TANH, ASIN, ACOS,
String2BuiltinCode.put( "ceil" , BuiltinCode.CEIL);
String2BuiltinCode.put( "floor" , BuiltinCode.FLOOR);
String2BuiltinCode.put( "ucumk+" , BuiltinCode.CUMSUM);
+ String2BuiltinCode.put( "urowcumk+" , BuiltinCode.ROWCUMSUM);
String2BuiltinCode.put( "ucum*" , BuiltinCode.CUMPROD);
String2BuiltinCode.put( "ucumk+*", BuiltinCode.CUMSUMPROD);
String2BuiltinCode.put( "ucummin", BuiltinCode.CUMMIN);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
index 3c1cf9d7758..391ec1cd1d2 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
@@ -552,6 +552,8 @@ public static AggregateUnaryOperator parseBasicCumulativeAggregateUnaryOperator(
Builtin f = (Builtin)uop.fn;
if( f.getBuiltinCode()==BuiltinCode.CUMSUM )
return parseBasicAggregateUnaryOperator(Opcodes.UACKP.toString()) ;
+ else if( f.getBuiltinCode()==BuiltinCode.ROWCUMSUM )
+ return parseBasicAggregateUnaryOperator(Opcodes.UARCKP.toString()) ;
else if( f.getBuiltinCode()==BuiltinCode.CUMPROD )
return parseBasicAggregateUnaryOperator(Opcodes.UACM.toString()) ;
else if( f.getBuiltinCode()==BuiltinCode.CUMMIN )
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
index 9f469922adb..61b61b15332 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
@@ -56,6 +56,8 @@ private CumulativeOffsetSPInstruction(Operator op, CPOperand in1, CPOperand in2,
if (Opcodes.BCUMOFFKP.toString().equals(opcode))
_uop = new UnaryOperator(Builtin.getBuiltinFnObject("ucumk+"));
+ else if (Opcodes.BROWCUMOFFKP.toString().equals(opcode))
+ _uop = new UnaryOperator(Builtin.getBuiltinFnObject("urowcumk+"));
else if (Opcodes.BCUMOFFM.toString().equals(opcode))
_uop = new UnaryOperator(Builtin.getBuiltinFnObject("ucum*"));
else if (Opcodes.BCUMOFFPM.toString().equals(opcode)) {
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java
index e2653f4310d..d4cfc9defac 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/UnaryMatrixSPInstruction.java
@@ -21,16 +21,22 @@
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.functionobjects.KahanPlus;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.cp.KahanObject;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
+import scala.Serializable;
+import scala.Tuple2;
+import java.util.*;
public class UnaryMatrixSPInstruction extends UnarySPInstruction {
@@ -61,6 +67,210 @@ public void processInstruction(ExecutionContext ec) {
updateUnaryOutputDataCharacteristics(sec);
sec.setRDDHandleForVariable(output.getName(), out);
sec.addLineageRDD(output.getName(), input1.getName());
+
+ if ( "urowcumk+".equals(getOpcode()) ) {
+
+ JavaPairRDD< MatrixIndexes, Tuple2 > localRowcumsum = in.mapToPair( new LocalRowCumsumFunction() );
+
+
+ // Collect end-values of every block of every row for offset calc by grouping by global row index
+ JavaPairRDD< Long, Iterable> > rowEndValues = localRowcumsum
+ .mapToPair( tuple2 -> {
+
+ // get index of block
+ MatrixIndexes indexes = tuple2._1;
+ // get cum matrix block
+ MatrixBlock localRowcumsumBlock = tuple2._2._2;
+
+ // get row and column block index
+ long rowBlockIndex = indexes.getRowIndex();
+ long colBlockIndex = indexes.getColumnIndex();
+
+ // Save end value of every row of every block (if block is empty save 0)
+ double[] endValues = new double[ localRowcumsumBlock.getNumRows() ];
+
+ for ( int i = 0; i < localRowcumsumBlock.getNumRows(); i ++ ) {
+ if (localRowcumsumBlock.getNumColumns() > 0) {
+ endValues[i] = localRowcumsumBlock.get(i, localRowcumsumBlock.getNumColumns() - 1);
+ } else {
+ endValues[i] = 0.0 ;
+ }
+ }
+
+ return new Tuple2<>(rowBlockIndex, new Tuple3<>(rowBlockIndex, colBlockIndex, endValues));
+ }
+
+ ).groupByKey();
+
+
+
+
+ // compute offset for every block
+ List< Tuple2 , double[]> > offsetList = rowEndValues
+ .flatMapToPair(tuple2 -> {
+
+ Long rowBlockIndex = tuple2._1;
+
+ List< Tuple3 > colValues = new ArrayList<>();
+ for ( Tuple3 cv : tuple2._2 ) {
+ colValues.add(cv);
+ }
+
+ // sort blocks from one row by column index
+ colValues.sort(Comparator.comparing(Tuple3::_2));
+
+ // get number of rows of a block by counting amount of end (row) values of said block
+ int numberOfRows = 0;
+ if ( !colValues.isEmpty() ) {
+ Tuple3 firstTuple = colValues.get(0);
+ double[] lastValuesArray = firstTuple._3();
+ numberOfRows = lastValuesArray.length;
+ }
+
+
+ List, double[]>> blockOffsets = new ArrayList<>();
+
+ double[] cumulativeOffsets = new double[numberOfRows];
+
+ for (Tuple3 colValue : colValues) {
+
+ Long colBlockIndex = colValue._2();
+ double[] endValues = colValue._3();
+
+ // copy current offsets
+ double[] currentOffsets = cumulativeOffsets.clone();
+
+ // and save block indexes with its offsets
+ blockOffsets.add( new Tuple2<>(new Tuple2<>(rowBlockIndex, colBlockIndex), currentOffsets) );
+
+ for ( int i = 0; i < numberOfRows && i < endValues.length; i++ ) {
+ cumulativeOffsets[i] += endValues[i];
+ }
+
+ }
+ return blockOffsets.iterator();
+ }
+ ).collect();
+
+
+ // convert list to map for easier access to offsets
+ Map< Tuple2, double[] > offsetMap = new HashMap<>();
+ for (Tuple2, double[]> offset : offsetList) {
+ offsetMap.put(offset._1, offset._2);
+ }
+
+
+ out = localRowcumsum.mapToPair( new FinalRowCumsumFunction(offsetMap)) ;
+
+ updateUnaryOutputDataCharacteristics(sec);
+ sec.setRDDHandleForVariable(output.getName(), out);
+ sec.addLineageRDD(output.getName(), input1.getName());
+ }
+ }
+
+
+
+ private static class LocalRowCumsumFunction implements PairFunction< Tuple2, MatrixIndexes, Tuple2 > {
+
+ @Override
+ public Tuple2< MatrixIndexes, Tuple2 > call(Tuple2 tuple2) {
+
+
+ MatrixBlock inputBlock = tuple2._2;
+ MatrixBlock cumsumBlock = new MatrixBlock( inputBlock.getNumRows(), inputBlock.getNumColumns(), false );
+
+
+ for ( int i = 0; i < inputBlock.getNumRows(); i++ ) {
+
+ KahanObject kbuff = new KahanObject(0, 0);
+ KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+
+ for ( int j = 0; j < inputBlock.getNumColumns(); j++ ) {
+
+ double val = inputBlock.get(i, j);
+ kplus.execute2(kbuff, val);
+ cumsumBlock.set(i, j, kbuff._sum);
+ }
+ }
+ // original index, original matrix and local cumsum block
+ return new Tuple2<>( tuple2._1, new Tuple2<>(inputBlock, cumsumBlock) );
+ }
+ }
+
+
+
+
+ private static class FinalRowCumsumFunction implements PairFunction >, MatrixIndexes, MatrixBlock> {
+
+
+ // map block indexes to the row offsets
+ private final Map< Tuple2, double[] > offsetMap;
+
+ public FinalRowCumsumFunction(Map, double[]> offsetMap) {
+ this.offsetMap = offsetMap;
+ }
+
+
+ @Override
+ public Tuple2 call( Tuple2< MatrixIndexes, Tuple2 > tuple ) {
+
+ MatrixIndexes indexes = tuple._1;
+ MatrixBlock inputBlock = tuple._2._1;
+ MatrixBlock localRowCumsumBlock = tuple._2._2;
+
+ // key to get the row offset for this block
+ Tuple2 blockKey = new Tuple2<>( indexes.getRowIndex(), indexes.getColumnIndex()) ;
+ double[] offsets = offsetMap.get(blockKey);
+
+ MatrixBlock cumsumBlock = new MatrixBlock( inputBlock.getNumRows(), inputBlock.getNumColumns(), false );
+
+
+ for ( int i = 0; i < inputBlock.getNumRows(); i++ ) {
+
+ double rowOffset = 0.0;
+ if ( offsets != null && i < offsets.length ) {
+ rowOffset = offsets[i];
+ }
+
+ for ( int j = 0; j < inputBlock.getNumColumns(); j++ ) {
+ double cumsumValue = localRowCumsumBlock.get(i, j);
+ cumsumBlock.set(i, j, cumsumValue + rowOffset);
+ }
+ }
+
+ // block index and final cumsum block
+ return new Tuple2<>(indexes, cumsumBlock);
+ }
+ }
+
+
+
+ // helper class
+ private static class Tuple3 implements Serializable {
+
+ private static final long serialVersionUID = 123;
+ private final Type1 _1;
+ private final Type2 _2;
+ private final Type3 _3;
+
+
+ public Tuple3( Type1 _1, Type2 _2, Type3 _3 ) {
+ this._1 = _1;
+ this._2 = _2;
+ this._3 = _3;
+ }
+
+ public Type1 _1() {
+ return _1;
+ }
+
+ public Type2 _2() {
+ return _2;
+ }
+
+ public Type3 _3() {
+ return _3;
+ }
}
private static class RDDMatrixBuiltinUnaryOp implements Function
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java
index 3cba9fb8c57..59301db7ece 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java
@@ -112,6 +112,7 @@ private enum AggType {
SUM,
SUM_SQ,
CUM_KAHAN_SUM,
+ ROW_CUM_SUM,
CUM_MIN,
CUM_MAX,
CUM_PROD,
@@ -783,6 +784,7 @@ private static AggType getAggType( UnaryOperator op ) {
BuiltinCode bfunc = ((Builtin) vfn).bFunc;
switch( bfunc ) {
case CUMSUM: return AggType.CUM_KAHAN_SUM;
+ case ROWCUMSUM: return AggType.ROW_CUM_SUM;
case CUMPROD: return AggType.CUM_PROD;
case CUMMIN: return AggType.CUM_MIN;
case CUMMAX: return AggType.CUM_MAX;
@@ -1548,6 +1550,12 @@ else if( ixFn instanceof ReduceRow ) //COLSUM_SQ
d_ucumkp(in.getDenseBlock(), null, out.getDenseBlock(), n, kbuff, kplus, rl, ru);
break;
}
+ case ROW_CUM_SUM: { //ROWCUMSUM
+ KahanObject kbuff = new KahanObject(0, 0);
+ KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+ d_urowcumkp(in.getDenseBlock(), null, out.getDenseBlock(), n, kbuff, kplus, rl, ru);
+ break;
+ }
case CUM_PROD: { //CUMPROD
d_ucumm(in.getDenseBlockValues(), null, out.getDenseBlockValues(), n, rl, ru);
break;
@@ -1666,6 +1674,12 @@ else if( ixFn instanceof ReduceRow ) //COLSUM_SQ
s_ucumkp(a, null, out.getDenseBlock(), m, n, kbuff, kplus, rl, ru);
break;
}
+ case ROW_CUM_SUM: { //ROWCUMSUM
+ KahanObject kbuff = new KahanObject(0, 0);
+ KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+ s_urowcumkp(a, null, out.getDenseBlock(), m, n, kbuff, kplus, rl, ru);
+ break;
+ }
case CUM_PROD: { //CUMPROD
s_ucumm(a, null, out.getDenseBlockValues(), n, rl, ru);
break;
@@ -1747,6 +1761,12 @@ private static void cumaggregateUnaryMatrixDense(MatrixBlock in, MatrixBlock out
d_ucumkp(da, agg, dc, n, kbuff, kplus, rl, ru);
break;
}
+ case ROW_CUM_SUM: { //ROWCUMSUM
+ KahanObject kbuff = new KahanObject(0, 0);
+ KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+ d_urowcumkp(da, agg, dc, n, kbuff, kplus, rl, ru);
+ break;
+ }
case CUM_SUM_PROD: { //CUMSUMPROD
if( n != 2 )
throw new DMLRuntimeException("Cumsumprod expects two-column input (n="+n+").");
@@ -1791,6 +1811,12 @@ private static void cumaggregateUnaryMatrixSparse(MatrixBlock in, MatrixBlock ou
s_ucumkp(a, agg, dc, m, n, kbuff, kplus, rl, ru);
break;
}
+ case ROW_CUM_SUM: { //ROWCUMSUM
+ KahanObject kbuff = new KahanObject(0, 0);
+ KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+ s_urowcumkp(a, agg, dc, m, n, kbuff, kplus, rl, ru);
+ break;
+ }
case CUM_SUM_PROD: { //CUMSUMPROD
if( n != 2 )
throw new DMLRuntimeException("Cumsumprod expects two-column input (n="+n+").");
@@ -1821,6 +1847,7 @@ private static MatrixBlock aggregateUnaryMatrixEmpty(MatrixBlock in, MatrixBlock
case SUM:
case SUM_SQ:
case KAHAN_SUM:
+ case ROW_CUM_SUM:
case KAHAN_SUM_SQ: val = 0; break;
case MIN: val = Double.POSITIVE_INFINITY; break;
case MAX: val = Double.NEGATIVE_INFINITY; break;
@@ -1838,7 +1865,7 @@ private static MatrixBlock aggregateUnaryMatrixEmpty(MatrixBlock in, MatrixBlock
if(optype == AggType.KAHAN_SUM || optype == AggType.KAHAN_SUM_SQ
|| optype == AggType.SUM || optype == AggType.SUM_SQ
|| optype == AggType.MIN || optype == AggType.MAX || optype == AggType.PROD
- || optype == AggType.CUM_KAHAN_SUM || optype == AggType.CUM_PROD
+ || optype == AggType.CUM_KAHAN_SUM || optype == AggType.ROW_CUM_SUM || optype == AggType.CUM_PROD
|| optype == AggType.CUM_MIN || optype == AggType.CUM_MAX)
{
return out;
@@ -2099,6 +2126,39 @@ private static void d_ucumkp( DenseBlock a, double[] agg, DenseBlock c, int n, K
c.set(i, csums.values(0));
}
}
+
+ /**
+ * ROWCUMSUM, opcode: urowcumk+, dense input.
+ *
+ * @param a input matrix
+ * @param agg initial array
+ * @param c output matrix
+ * @param n number of rows
+ * @param kbuff collects sum
+ * @param kplus sums up
+ * @param rl row lower index
+ * @param ru row upper index
+ */
+ private static void d_urowcumkp( DenseBlock a, double[] agg, DenseBlock c, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru ) {
+ //row-wise cumulative sum w/ optional row offsets
+ for (int i = rl; i < ru; i++) {
+ double start = 0.0;
+ int localRow = i - rl;
+ if (agg != null) {
+ if (localRow >= 0 && localRow < agg.length) {
+ start = agg[localRow];
+ }
+ }
+ kbuff.set(start, 0);
+ //compute cumulative sum over row
+ for (int j = 0; j < n; j++) {
+ double val = a.get(i, j);
+ kplus.execute2(kbuff, val);
+ c.set(i, j, kbuff._sum);
+ }
+
+ }
+ }
/**
* CUMSUMPROD, opcode: ucumk+*, dense input.
@@ -2750,6 +2810,51 @@ private static void s_ucumkp( SparseBlock a, double[] agg, DenseBlock c, int m,
c.set(i, csums.values(0));
}
}
+
+ /**
+ * ROWCUMSUM, opcode: urowcumk+, sparse input.
+ *
+ * @param a input matrix
+ * @param agg intial array
+ * @param c output matrix
+ * @param m number of columns
+ * @param n number of rows
+ * @param kbuff collects sum
+ * @param kplus sums up
+ * @param rl row lower index
+ * @param ru row upper index
+ */
+ private static void s_urowcumkp(SparseBlock a, double[] agg, DenseBlock c, int m, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru) {
+ //scan rows and compute row-wise prefix sums
+ for (int i = rl; i < ru; i++) {
+ double start = 0.0;
+ int localRow = i - rl;
+ if (agg != null && localRow >= 0 && localRow < agg.length)
+ start = agg[localRow];
+ if (!a.isEmpty(i)) {
+ double[] ain = a.values(i);
+ int[] aix = a.indexes(i);
+ int apos = a.pos(i);
+ int alen = a.size(i);
+ kbuff.set(start, 0);
+ int sparseIdx = 0;
+ //prefix sum over sparse row
+ for (int j = 0; j < n; j++) {
+ if (sparseIdx < alen && aix[apos + sparseIdx] == j) {
+ kplus.execute2(kbuff, ain[apos + sparseIdx]);
+ start = kbuff._sum;
+ sparseIdx++;
+ }
+ c.set(i, j, start);
+ }
+ }
+ else {
+ //fill empty row with start value
+ for (int j = 0; j < n; j++)
+ c.set(i, j, start);
+ }
+ }
+ }
/**
diff --git a/src/test/java/org/apache/sysds/test/functions/rewrite/RewriteSimplifyTransposedCumsumTest.java b/src/test/java/org/apache/sysds/test/functions/rewrite/RewriteSimplifyTransposedCumsumTest.java
new file mode 100644
index 00000000000..45842437d35
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/rewrite/RewriteSimplifyTransposedCumsumTest.java
@@ -0,0 +1,172 @@
+package org.apache.sysds.test.functions.rewrite;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Opcodes;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.common.Types.ExecType;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.matrix.data.MatrixValue;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.apache.sysds.utils.Statistics;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+
+public class RewriteSimplifyTransposedCumsumTest extends AutomatedTestBase{
+ private static final String TEST_NAME = "RewriteSimplifyTransposedCumsum";
+ private static final String TEST_DIR = "functions/rewrite/";
+ private static final String TEST_CLASS_DIR = TEST_DIR + RewriteSimplifyTransposedCumsumTest.class.getSimpleName() + "/";
+
+ private static final double eps = 1e-10;
+
+ private static final int rowsMatrix = 1201;
+ private static final int colsMatrix = 1103;
+ private static final double spSparse = 0.1;
+ private static final double spDense = 0.9;
+
+ private enum InputType {
+ COL_VECTOR,
+ ROW_VECTOR,
+ MATRIX
+ }
+
+ @Override
+ public void setUp() {
+ addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"}));
+ if (TEST_CACHE_ENABLED) {
+ setOutAndExpectedDeletionDisabled(true);
+ }
+ }
+
+ @BeforeClass
+ public static void init() {
+ TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR);
+ }
+
+ @AfterClass
+ public static void cleanUp() {
+ if (TEST_CACHE_ENABLED) {
+ TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR);
+ }
+ }
+
+ // dense cp
+ @Test public void testRewriteMatrixDenseCPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.MATRIX, false, ExecType.CP, false); }
+ @Test public void testRewriteMatrixDenseCP() { testRewriteSimplifyRowcumsum(InputType.MATRIX, false, ExecType.CP, true); }
+
+ @Test public void testRewriteColVectorDenseCPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.COL_VECTOR, false, ExecType.CP, false); }
+ @Test public void testRewriteColVectorDenseCP() { testRewriteSimplifyRowcumsum(InputType.COL_VECTOR, false, ExecType.CP, true); }
+
+ @Test public void testRewriteRowVectorDenseCPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.ROW_VECTOR, false, ExecType.CP, false); }
+ @Test public void testRewriteRowVectorDenseCP() { testRewriteSimplifyRowcumsum(InputType.ROW_VECTOR, false, ExecType.CP, true); }
+
+ // sparse cp
+ @Test public void testRewriteMatrixSparseCPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.MATRIX, true, ExecType.CP, false); }
+ @Test public void testRewriteMatrixSparseCP() { testRewriteSimplifyRowcumsum(InputType.MATRIX, true, ExecType.CP, true); }
+
+ @Test public void testRewriteColVectorSparseCPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.COL_VECTOR, true, ExecType.CP, false); }
+ @Test public void testRewriteColVectorSparseCP() { testRewriteSimplifyRowcumsum(InputType.COL_VECTOR, true, ExecType.CP, true); }
+
+ @Test public void testRewriteRowVectorSparseCPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.ROW_VECTOR, true, ExecType.CP, false); }
+ @Test public void testRewriteRowVectorSparseCP() { testRewriteSimplifyRowcumsum(InputType.ROW_VECTOR, true, ExecType.CP, true); }
+
+
+ // dense sp
+ @Test public void testRewriteMatrixDenseSPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.MATRIX, false, ExecType.SPARK, false); }
+ @Test public void testRewriteMatrixDenseSP() { testRewriteSimplifyRowcumsum(InputType.MATRIX, false, ExecType.SPARK, true); }
+
+ @Test public void testRewriteColVectorDenseSPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.COL_VECTOR, false, ExecType.SPARK, false); }
+ @Test public void testRewriteColVectorDenseSP() { testRewriteSimplifyRowcumsum(InputType.COL_VECTOR, false, ExecType.SPARK, true); }
+
+ @Test public void testRewriteRowVectorDenseSPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.ROW_VECTOR, false, ExecType.SPARK, false); }
+ @Test public void testRewriteRowVectorDenseSP() { testRewriteSimplifyRowcumsum(InputType.ROW_VECTOR, false, ExecType.SPARK, true); }
+
+ // sparse sp
+ @Test public void testRewriteMatrixSparseSPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.MATRIX, true, ExecType.SPARK, false); }
+ @Test public void testRewriteMatrixSparseSP() { testRewriteSimplifyRowcumsum(InputType.MATRIX, true, ExecType.SPARK, true); }
+
+ @Test public void testRewriteColVectorSparseSPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.COL_VECTOR, true, ExecType.SPARK, false); }
+ @Test public void testRewriteColVectorSparseSP() { testRewriteSimplifyRowcumsum(InputType.COL_VECTOR, true, ExecType.SPARK, true); }
+
+ @Test public void testRewriteRowVectorSparseSPNoRewrite() { testRewriteSimplifyRowcumsum(InputType.ROW_VECTOR, true, ExecType.SPARK, false); }
+ @Test public void testRewriteRowVectorSparseSP() { testRewriteSimplifyRowcumsum(InputType.ROW_VECTOR, true, ExecType.SPARK, true); }
+
+
+ private void testRewriteSimplifyRowcumsum(InputType type, boolean sparse, ExecType instType, boolean rewrites) {
+
+ ExecMode platformOld = rtplatform;
+ switch( instType ){
+ case SPARK: rtplatform = ExecMode.SPARK; break;
+ default: rtplatform = ExecMode.HYBRID; break;
+ }
+
+ boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+ if( rtplatform == ExecMode.SPARK )
+ DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+ //rewrites
+ boolean oldFlagRewrites = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
+ OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = rewrites;
+
+
+ try {
+ // Determine matrix dimensions based on InputType
+ int rows = (type == InputType.ROW_VECTOR) ? 1 : rowsMatrix;
+ int cols = (type == InputType.COL_VECTOR) ? 1 : colsMatrix;
+ double sparsity = (sparse) ? spSparse : spDense;
+
+ String TEST_CACHE_DIR = !TEST_CACHE_ENABLED ? "" :
+ type.ordinal() + "_" + sparsity + "/";
+
+ TestConfiguration config = getTestConfiguration(TEST_NAME);
+ loadTestConfiguration(config, TEST_CACHE_DIR);
+
+ String HOME = SCRIPT_DIR + TEST_DIR;
+ fullDMLScriptName = HOME + TEST_NAME + ".dml";
+ programArgs = new String[] {"-stats", "-args", input("A"), output("B")};
+
+ fullRScriptName = HOME + TEST_NAME + ".R";
+ rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() + " " + expectedDir();
+
+ // create and write matrix
+ double[][] A = getRandomMatrix(rows, cols, -0.05, 1, sparsity, 7);
+ writeInputMatrixWithMTD("A", A, true);
+
+ runTest(true, false, null, -1);
+ if( instType == ExecType.CP ) {
+ Assert.assertEquals("Unexpected number of executed Spark jobs.", 0, Statistics.getNoOfExecutedSPInst());
+ }
+
+ runRScript(true);
+
+ //compare matrices
+ HashMap dmlfile = readDMLMatrixFromOutputDir("B");
+ HashMap rfile = readRMatrixFromExpectedDir("B");
+ TestUtils.compareMatrices(dmlfile, rfile, eps, "Stat-DML", "Stat-R");
+
+ // Assertions for opcodes
+ if(rewrites) {
+ // rewrite is enabled: double transposed CUMSUM and CUMSUM is not found, ROWCUMSUM operation is found
+ Assert.assertFalse(heavyHittersContainsString(Opcodes.TRANSPOSE.toString()) || heavyHittersContainsString("sp_r'"));
+ Assert.assertFalse(heavyHittersContainsString(Opcodes.UCUMKP.toString()) || heavyHittersContainsString("sp_bcumoffk+"));
+ Assert.assertTrue(heavyHittersContainsString(Opcodes.UROWCUMKP.toString()) || heavyHittersContainsString("sp_urowcumk+"));
+ } else {
+ // rewrite is disabled: double transposed CUMSUM and CUMSUM is found, ROWCUMSUM operation is not found
+ Assert.assertTrue(heavyHittersContainsString(Opcodes.TRANSPOSE.toString()) || heavyHittersContainsString("sp_r'"));
+ Assert.assertTrue(heavyHittersContainsString(Opcodes.UCUMKP.toString()) || heavyHittersContainsString("sp_bcumoffk+"));
+ Assert.assertFalse(heavyHittersContainsString(Opcodes.UROWCUMKP.toString()) || heavyHittersContainsString("sp_urowcumk+"));
+ }
+ }
+ finally {
+ rtplatform = platformOld;
+ DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+ OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = oldFlagRewrites;
+ }
+ }
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/unary/matrix/FullRowcumsumTest.java b/src/test/java/org/apache/sysds/test/functions/unary/matrix/FullRowcumsumTest.java
new file mode 100644
index 00000000000..297f39d6778
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/unary/matrix/FullRowcumsumTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.unary.matrix;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.common.Types.ExecType;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.apache.sysds.utils.Statistics;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class FullRowcumsumTest extends AutomatedTestBase
+{
+ private final static String TEST_NAME = "Rowcumsum";
+ private final static String TEST_DIR = "functions/unary/matrix/";
+ private static final String TEST_CLASS_DIR = TEST_DIR + FullRowcumsumTest.class.getSimpleName() + "/";
+
+ private final static double eps = 1e-10;
+
+ private final static int rowsMatrix = 1201;
+ private final static int colsMatrix = 1103;
+ private final static double spSparse = 0.1;
+ private final static double spDense = 0.9;
+
+ private enum InputType {
+ COL_VECTOR,
+ ROW_VECTOR,
+ MATRIX
+ }
+
+ @Override
+ public void setUp()
+ {
+ addTestConfiguration(TEST_NAME,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME,new String[]{"B"}));
+
+ if (TEST_CACHE_ENABLED) {
+ setOutAndExpectedDeletionDisabled(true);
+ }
+ }
+
+ @BeforeClass
+ public static void init() {
+ TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR);
+ }
+
+ @AfterClass
+ public static void cleanUp() {
+ if (TEST_CACHE_ENABLED) {
+ TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR);
+ }
+ }
+
+ @Test
+ public void testRowcumsumColVectorDenseCP() {
+ runColAggregateOperationTest(InputType.COL_VECTOR, false, ExecType.CP);
+ }
+
+ @Test
+ public void testRowcumsumRowVectorDenseCP() {
+ runColAggregateOperationTest(InputType.ROW_VECTOR, false, ExecType.CP);
+ }
+
+ @Test
+ public void testRowcumsumRowVectorDenseNoRewritesCP() {
+ runColAggregateOperationTest(InputType.ROW_VECTOR, false, ExecType.CP, false);
+ }
+
+ @Test
+ public void testRowcumsumColVectorDenseNoRewritesCP() {
+ runColAggregateOperationTest(InputType.COL_VECTOR, false, ExecType.CP, false);
+ }
+
+ @Test
+ public void testRowcumsumMatrixDenseCP() {
+ runColAggregateOperationTest(InputType.MATRIX, false, ExecType.CP);
+ }
+
+ @Test
+ public void testRowcumsumColVectorSparseCP() {
+ runColAggregateOperationTest(InputType.COL_VECTOR, true, ExecType.CP);
+ }
+
+ @Test
+ public void testRowcumsumRowVectorSparseCP() {
+ runColAggregateOperationTest(InputType.ROW_VECTOR, true, ExecType.CP);
+ }
+
+ @Test
+ public void testRowcumsumRowVectorSparseNoRewritesCP() {
+ runColAggregateOperationTest(InputType.ROW_VECTOR, true, ExecType.CP, false);
+ }
+
+ @Test
+ public void testRowcumsumColVectorSparseNoRewritesCP() {
+ runColAggregateOperationTest(InputType.COL_VECTOR, true, ExecType.CP, false);
+ }
+
+ @Test
+ public void testRowcumsumMatrixSparseCP() {
+ runColAggregateOperationTest(InputType.MATRIX, true, ExecType.CP);
+ }
+
+ @Test
+ public void testRowcumsumColVectorDenseSP() {
+ runColAggregateOperationTest(InputType.COL_VECTOR, false, ExecType.SPARK);
+ }
+
+ @Test
+ public void testRowcumsumRowVectorDenseSP() {
+ runColAggregateOperationTest(InputType.ROW_VECTOR, false, ExecType.SPARK);
+ }
+
+ @Test
+ public void testRowcumsumRowVectorDenseNoRewritesSP() {
+ runColAggregateOperationTest(InputType.ROW_VECTOR, false, ExecType.SPARK, false);
+ }
+
+ @Test
+ public void testRowcumsumColVectorDenseNoRewritesSP() {
+ runColAggregateOperationTest(InputType.COL_VECTOR, false, ExecType.SPARK, false);
+ }
+
+ @Test
+ public void testRowcumsumMatrixDenseSP() {
+ runColAggregateOperationTest(InputType.MATRIX, false, ExecType.SPARK);
+ }
+
+ @Test
+ public void testRowcumsumColVectorSparseSP() {
+ runColAggregateOperationTest(InputType.COL_VECTOR, true, ExecType.SPARK);
+ }
+
+ @Test
+ public void testRowcumsumRowVectorSparseSP() {
+ runColAggregateOperationTest(InputType.ROW_VECTOR, true, ExecType.SPARK);
+ }
+
+ @Test
+ public void testRowcumsumRowVectorSparseNoRewritesSP() {
+ runColAggregateOperationTest(InputType.ROW_VECTOR, true, ExecType.SPARK, false);
+ }
+
+ @Test
+ public void testRowcumsumColVectorSparseNoRewritesSP() {
+ runColAggregateOperationTest(InputType.COL_VECTOR, true, ExecType.SPARK, false);
+ }
+
+ @Test
+ public void testRowcumsumMatrixSparseSP() {
+ runColAggregateOperationTest(InputType.MATRIX, true, ExecType.SPARK);
+ }
+
+ private void runColAggregateOperationTest( InputType type, boolean sparse, ExecType instType) {
+ //by default we apply algebraic simplification rewrites
+ runColAggregateOperationTest(type, sparse, instType, true);
+ }
+
+ private void runColAggregateOperationTest( InputType type, boolean sparse, ExecType instType, boolean rewrites)
+ {
+ ExecMode platformOld = rtplatform;
+ switch( instType ){
+ case SPARK: rtplatform = ExecMode.SPARK; break;
+ default: rtplatform = ExecMode.HYBRID; break;
+ }
+
+ boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+ if( rtplatform == ExecMode.SPARK )
+ DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+ //rewrites
+ boolean oldFlagRewrites = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
+ OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = rewrites;
+
+ try
+ {
+ int cols = (type== InputType.COL_VECTOR) ? 1 : colsMatrix;
+ int rows = (type== InputType.ROW_VECTOR) ? 1 : rowsMatrix;
+ double sparsity = (sparse) ? spSparse : spDense;
+
+ String TEST_CACHE_DIR = !TEST_CACHE_ENABLED ? "" :
+ type.ordinal() + "_" + sparsity + "/";
+
+ TestConfiguration config = getTestConfiguration(TEST_NAME);
+ loadTestConfiguration(config, TEST_CACHE_DIR);
+
+ // This is for running the junit test the new way, i.e., construct the arguments directly
+ String HOME = SCRIPT_DIR + TEST_DIR;
+ fullDMLScriptName = HOME + TEST_NAME + ".dml";
+ programArgs = new String[]{"-explain", "-args", input("A"), output("B") };
+
+ fullRScriptName = HOME + TEST_NAME + ".R";
+ rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() + " " + expectedDir();
+
+ //generate actual dataset
+ double[][] A = getRandomMatrix(rows, cols, -0.05, 1, sparsity, 7);
+ writeInputMatrixWithMTD("A", A, true);
+
+ runTest(true, false, null, -1);
+ if( instType==ExecType.CP ) //in CP no spark jobs should be executed
+ Assert.assertEquals("Unexpected number of executed MR jobs.", 0, Statistics.getNoOfExecutedSPInst());
+
+ runRScript(true);
+
+ //compare matrices
+ HashMap dmlfile = readDMLMatrixFromOutputDir("B");
+ HashMap rfile = readRMatrixFromExpectedDir("B");
+ TestUtils.compareMatrices(dmlfile, rfile, eps, "Stat-DML", "Stat-R");
+ }
+ finally
+ {
+ rtplatform = platformOld;
+ DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+ OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = oldFlagRewrites;
+ }
+ }
+}
diff --git a/src/test/scripts/functions/rewrite/RewriteSimplifyTransposedCumsum.R b/src/test/scripts/functions/rewrite/RewriteSimplifyTransposedCumsum.R
new file mode 100644
index 00000000000..7c80e304f63
--- /dev/null
+++ b/src/test/scripts/functions/rewrite/RewriteSimplifyTransposedCumsum.R
@@ -0,0 +1,36 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+args <- commandArgs(TRUE)
+options(digits=22)
+
+library("Matrix")
+
+A = as.matrix(readMM(paste(args[1], "A.mtx", sep="")))
+
+if( ncol(A)>1 ){
+ B = t(apply(A, 1, cumsum));
+} else {
+ B = A;
+}
+
+writeMM(as(B, "CsparseMatrix"), paste(args[2], "B", sep=""));
\ No newline at end of file
diff --git a/src/test/scripts/functions/rewrite/RewriteSimplifyTransposedCumsum.dml b/src/test/scripts/functions/rewrite/RewriteSimplifyTransposedCumsum.dml
new file mode 100644
index 00000000000..c2dff2fdec0
--- /dev/null
+++ b/src/test/scripts/functions/rewrite/RewriteSimplifyTransposedCumsum.dml
@@ -0,0 +1,25 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+A = read($1);
+B = t(cumsum(t(A))); #this should trigger the rewrite: t(cumsum(t(A))) -> rowcumsum(A)
+write(B, $2);
\ No newline at end of file
diff --git a/src/test/scripts/functions/unary/matrix/Rowcumsum.R b/src/test/scripts/functions/unary/matrix/Rowcumsum.R
new file mode 100644
index 00000000000..7c80e304f63
--- /dev/null
+++ b/src/test/scripts/functions/unary/matrix/Rowcumsum.R
@@ -0,0 +1,36 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+args <- commandArgs(TRUE)
+options(digits=22)
+
+library("Matrix")
+
+A = as.matrix(readMM(paste(args[1], "A.mtx", sep="")))
+
+if( ncol(A)>1 ){
+ B = t(apply(A, 1, cumsum));
+} else {
+ B = A;
+}
+
+writeMM(as(B, "CsparseMatrix"), paste(args[2], "B", sep=""));
\ No newline at end of file
diff --git a/src/test/scripts/functions/unary/matrix/Rowcumsum.dml b/src/test/scripts/functions/unary/matrix/Rowcumsum.dml
new file mode 100644
index 00000000000..8b4b4fb0a90
--- /dev/null
+++ b/src/test/scripts/functions/unary/matrix/Rowcumsum.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+A = read($1);
+B = rowcumsum(A);
+write(B, $2);
+