Skip to content

Commit 48dc639

Browse files
author
dmsuehir
committed
Adding ARX model from pull #51 with fixed predict().
1 parent a52f443 commit 48dc639

File tree

4 files changed

+296
-0
lines changed

4 files changed

+296
-0
lines changed

src/main/scala/com/cloudera/sparkts/Lag.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,28 @@ private[sparkts] object Lag {
9797
}
9898
}
9999
}
100+
101+
/**
102+
* Creates a lagged matrix from a current matrix (represented in row-array form). Lags each column
103+
* the appropriate amount of times and then concatenates the columns.
104+
* So given a matrix [a b c], where a/b/c are column vectors, and calling with lag of 2, becomes a
105+
* matrix of the form [a_-1 a_-2 b_-1 b_-2 c_-1 c_-2]
106+
*/
107+
def lagMatTrimBoth(
108+
x: Array[Array[Double]],
109+
maxLag: Int,
110+
includeOriginal: Boolean): Array[Array[Double]] = {
111+
val xt = x.transpose
112+
// one matrix per column, consisting of all its lags
113+
val matrices = for (col <- xt) yield {
114+
Lag.lagMatTrimBoth(col, maxLag, includeOriginal)
115+
}
116+
// merge the matrices into 1 matrix by concatenating col-wise
117+
matrices.transpose.map(_.reduceLeft(_ ++ _))
118+
}
119+
120+
def lagMatTrimBoth(x: Array[Array[Double]], maxLag: Int)
121+
: Array[Array[Double]] = {
122+
lagMatTrimBoth(x, maxLag, false)
123+
}
100124
}

src/main/scala/com/cloudera/sparkts/MatrixUtil.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,14 @@ private[sparkts] object MatrixUtil {
3636
arrs
3737
}
3838

39+
def matToRowArrs(mat: Matrix[Double]): Array[Array[Double]] = {
40+
val arrs = new Array[Array[Double]](mat.rows)
41+
for (r <- 0 until mat.rows) {
42+
arrs(r) = mat(r to r, 0 to mat.cols - 1).toDenseMatrix.toArray
43+
}
44+
arrs
45+
}
46+
3947
def arrsToMat(arrs: Iterator[Array[Double]]): DenseMatrix[Double] = {
4048
vecArrsToMats(arrs, arrs.length).next()
4149
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/**
2+
* Copyright (c) 2016, Cloudera, Inc. All Rights Reserved.
3+
*
4+
* Cloudera, Inc. licenses this file to you under the Apache License,
5+
* Version 2.0 (the "License"). You may not use this file except in
6+
* compliance with the License. You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11+
* CONDITIONS OF ANY KIND, either express or implied. See the License for
12+
* the specific language governing permissions and limitations under the
13+
* License.
14+
*/
15+
16+
package com.cloudera.sparkts.models
17+
18+
import breeze.linalg._
19+
import com.cloudera.sparkts.Lag
20+
import com.cloudera.sparkts.MatrixUtil.{matToRowArrs, toBreeze}
21+
import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression
22+
23+
/**
24+
* Models a timeseries as a function of itself (autoregressive terms) and exogenous variables, which
25+
* are lagged up to degree xMaxLag.
26+
*/
27+
object AutoregressionX {
28+
/**
29+
* Fit an autoregressive model with additional exogenous variables. The model predicts a value
30+
* at time t of a dependent variable, Y, as a function of previous values of Y, and a combination
31+
* of previous values of exogenous regressors X_i, and current values of exogenous regressors X_i.
32+
* This is a generalization of an AR model, which is simple an ARX with no exogenous regressors.
33+
* The fitting procedure here is the same, using least squares. Note that all lags up to the
34+
* maxlag are included. In the case of the dependent variable the max lag is 'yMaxLag', while
35+
* for the exogenous variables the max lag is 'xMaxLag', with which each column in the original
36+
* matrix provided is lagged accordingly.
37+
* @param y the dependent variable, time series
38+
* @param x a matrix of exogenous variables
39+
* @param yMaxLag the maximum lag order for the dependent variable
40+
* @param xMaxLag the maximum lag order for exogenous variables
41+
* @param includeOriginalX a boolean flag indicating if the non-lagged exogenous variables should
42+
* be included. Default is true
43+
* @param noIntercept a boolean flag indicating if the intercept should be dropped. Default is
44+
* false
45+
* @return an ARXModel, which is an autoregressive model with exogenous variables
46+
*/
47+
def fitModel(
48+
y: Vector[Double],
49+
x: Matrix[Double],
50+
yMaxLag: Int,
51+
xMaxLag: Int,
52+
includeOriginalX: Boolean = true,
53+
noIntercept: Boolean = false): ARXModel = {
54+
val maxLag = max(yMaxLag, xMaxLag)
55+
val arrY = y.toArray
56+
// Make left hand side, note that we must drop the first maxLag terms
57+
val trimY = arrY.drop(maxLag)
58+
// Create predictors
59+
val predictors = assemblePredictors(arrY, matToRowArrs(x), yMaxLag, xMaxLag, includeOriginalX)
60+
val regression = new OLSMultipleLinearRegression()
61+
regression.setNoIntercept(noIntercept) // drop intercept in regression
62+
regression.newSampleData(trimY, predictors)
63+
val params = regression.estimateRegressionParameters()
64+
val (c, coeffs) = if (noIntercept) (0.0, params) else (params.head, params.tail)
65+
66+
new ARXModel(c, coeffs, yMaxLag, xMaxLag, includeOriginalX)
67+
}
68+
69+
70+
private[sparkts] def assemblePredictors(
71+
y: Array[Double],
72+
x: Array[Array[Double]],
73+
yMaxLag: Int,
74+
xMaxLag: Int,
75+
includeOriginalX: Boolean = true): Array[Array[Double]] = {
76+
val maxLag = max(yMaxLag, xMaxLag)
77+
// AR terms from dependent variable (autoregressive portion)
78+
val arY = Lag.lagMatTrimBoth(y, yMaxLag)
79+
// exogenous variables lagged as appropriate
80+
val laggedX = Lag.lagMatTrimBoth(x, xMaxLag)
81+
82+
// adjust difference in size for arY and laggedX so that they match up
83+
val arYAdj = arY.drop(maxLag - yMaxLag)
84+
85+
val laggedXAdj = laggedX.drop(maxLag - xMaxLag)
86+
87+
val trimmedX = if (includeOriginalX) x.drop(maxLag) else Array[Array[Double]]()
88+
89+
// combine matrices by concatenating column-wise
90+
Array(arYAdj, laggedXAdj, trimmedX).transpose.map(_.reduceLeft(_ ++_))
91+
}
92+
}
93+
94+
// Jose note: not extending timeseries model, since seems to me to be a different type of model
95+
// addingTimeDpendent...etc wouldn't apply here with the original signature, since we need
96+
// exogenous variables provided
97+
/**
98+
* An autoregressive model with exogenous variables
99+
* @param c an intercept term, zero if none desired
100+
* @param coefficients the coefficients for the various terms. The order of coefficients is as
101+
* follows:
102+
* - Autoregressive terms for the dependent variable, in increasing order of lag
103+
* - For each column in the exogenous matrix (in their original order), the
104+
* lagged terms in increasing order of lag (excluding the non-lagged versions).
105+
* - The coefficients associated with the non-lagged exogenous matrix
106+
* @param yMaxLag the maximum lag order for the dependent variable
107+
* @param xMaxLag the maximum lag order for exogenous variables
108+
* @param includesOriginalX a boolean flag indicating if the non-lagged exogenous variables should
109+
* be included
110+
*/
111+
class ARXModel(
112+
val c: Double,
113+
val coefficients: Array[Double],
114+
val yMaxLag: Int,
115+
val xMaxLag: Int,
116+
includesOriginalX: Boolean) {
117+
118+
119+
def predict(y: Vector[Double], x: Matrix[Double]): Vector[Double] = {
120+
val predictors = AutoregressionX.assemblePredictors(y.toArray, matToRowArrs(x), yMaxLag, xMaxLag, includesOriginalX)
121+
val results = DenseVector.zeros[Double](predictors.length)
122+
123+
for ((rowArray, rowIndex) <- predictors.zipWithIndex) {
124+
for ((value, colIndex) <- rowArray.zipWithIndex) {
125+
results(rowIndex) += value * coefficients(colIndex)
126+
}
127+
}
128+
129+
results
130+
}
131+
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/**
2+
* Copyright (c) 2016, Cloudera, Inc. All Rights Reserved.
3+
*
4+
* Cloudera, Inc. licenses this file to you under the Apache License,
5+
* Version 2.0 (the "License"). You may not use this file except in
6+
* compliance with the License. You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11+
* CONDITIONS OF ANY KIND, either express or implied. See the License for
12+
* the specific language governing permissions and limitations under the
13+
* License.
14+
*/
15+
16+
package com.cloudera.sparkts.models
17+
18+
import breeze.linalg._
19+
20+
import org.apache.commons.math3.random.MersenneTwister
21+
import com.cloudera.sparkts.Lag
22+
import org.scalatest.FunSuite
23+
import org.scalatest.Matchers._
24+
25+
class AutoregressionXSuite extends FunSuite {
26+
val rand = new MersenneTwister(10L)
27+
val nRows = 1000
28+
val nCols = 2
29+
val X = Array.fill(nRows, nCols)(rand.nextGaussian())
30+
val intercept = rand.nextGaussian * 10
31+
32+
// tests an autoregressive model where the exogenous variables are not lagged
33+
test("fit ARX(1, 0, true)") {
34+
val xCoeffs = Array(0.8, 0.2)
35+
val rawY = X.map(_.zip(xCoeffs).map { case (b, v) => b * v }.sum + intercept)
36+
val arCoeff = 0.4
37+
val y = rawY.scanLeft(0.0) { case (priorY, currY) => currY + priorY * arCoeff }.tail
38+
val dy = new DenseVector(y)
39+
val dx = new DenseMatrix(rows = X.length, cols = X.head.length, data = X.transpose.flatten)
40+
val model = AutoregressionX.fitModel(dy, dx, 1, 0, includeOriginalX = true)
41+
val combinedCoeffs = Array(arCoeff) ++ xCoeffs
42+
43+
model.c should be (intercept +- 1e-4)
44+
for (i <- combinedCoeffs.indices) {
45+
model.coefficients(i) should be (combinedCoeffs(i) +- 1e-4)
46+
}
47+
}
48+
49+
// tests a model with no autoregressive term but with lagged exogenous variables
50+
test("fit ARX(0, 1, false) model") {
51+
val xCoeffs = Array(0.4, 0.15)
52+
val xLagged = Lag.lagMatTrimBoth(X, 1)
53+
val y = xLagged.map(_.zip(xCoeffs).map { case (b, v) => b * v }.sum + intercept)
54+
val dy = new DenseVector(Array(0.0) ++ y)
55+
// note that we provide the original X matrix to the fitting functiond
56+
val dx = new DenseMatrix(rows = X.length, cols = X.head.length, data = X.transpose.flatten)
57+
val model = AutoregressionX.fitModel(dy, dx, 0, 1, includeOriginalX = false)
58+
59+
model.c should be (intercept +- 1e-4)
60+
for (i <- xCoeffs.indices) {
61+
model.coefficients(i) should be (xCoeffs(i) +- 1e-4)
62+
}
63+
}
64+
65+
// this test simply reduces to a normal regression model
66+
test("fit ARX(0, 0, true) model") {
67+
// note that
68+
val xCoeffs = Array(0.8, 0.2)
69+
val y = X.map(_.zip(xCoeffs).map { case (b, v) => b * v }.sum + intercept)
70+
val dy = new DenseVector(y)
71+
val dx = new DenseMatrix(rows = X.length, cols = X.head.length, data = X.transpose.flatten)
72+
val model = AutoregressionX.fitModel(dy, dx, 0, 0, includeOriginalX = true)
73+
74+
model.c should be (intercept +- 1e-4)
75+
for (i <- xCoeffs.indices) {
76+
model.coefficients(i) should be (xCoeffs(i) +- 1e-4)
77+
}
78+
}
79+
80+
// tests a model with no autoregressive term but with lagged exogenous variables
81+
// of order 2 and inclusive of the original X values
82+
test("fit ARX(0, 2, true) model") {
83+
val xLagCoeffs = Array(0.4, 0.15, 0.2, 0.7)
84+
val xLagged = Lag.lagMatTrimBoth(X, 2)
85+
val yLaggedPart = xLagged.map(_.zip(xLagCoeffs).map { case (b, v) => b * v }.sum )
86+
val xNormalCoeffs = Array(0.3, 0.5)
87+
val yNormalPart = X.map(_.zip(xNormalCoeffs).map { case (b, v) => b * v }.sum )
88+
val y = yLaggedPart.zip(yNormalPart.drop(2)).map { case (l, n) => l + n + intercept }
89+
90+
val dy = new DenseVector(Array(0.0, 0.0) ++ y)
91+
val dx = new DenseMatrix(rows = X.length, cols = X.head.length, data = X.transpose.flatten)
92+
val model = AutoregressionX.fitModel(dy, dx, 0, 2, includeOriginalX = true)
93+
val combinedCoeffs = xLagCoeffs ++ xNormalCoeffs
94+
95+
model.c should be (intercept +- 1e-4)
96+
for (i <- combinedCoeffs.indices) {
97+
model.coefficients(i) should be (combinedCoeffs(i) +- 1e-4)
98+
}
99+
}
100+
101+
test("fit ARX(1, 1, false) model") {
102+
val xCoeffs = Array(0.8, 0.2)
103+
val xLagged = Lag.lagMatTrimBoth(X, 1)
104+
val rawY = xLagged.map(_.zip(xCoeffs).map { case (b, v) => b * v }.sum + intercept)
105+
val arCoeff = 0.4
106+
val y = rawY.scanLeft(0.0) { case (priorY, currY) => currY + priorY * arCoeff }.tail
107+
val dy = new DenseVector(Array(0.0) ++ y)
108+
val dx = new DenseMatrix(rows = X.length, cols = X.head.length, data = X.transpose.flatten)
109+
val model = AutoregressionX.fitModel(dy, dx, 1, 1, includeOriginalX = false)
110+
val combinedCoeffs = Array(arCoeff) ++ xCoeffs
111+
112+
model.c should be (intercept +- 1e-4)
113+
for (i <- combinedCoeffs.indices) {
114+
model.coefficients(i) should be (combinedCoeffs(i) +- 1e-4)
115+
}
116+
}
117+
118+
test("predict ARX model") {
119+
val c = 0
120+
val xCoeffs = Array(-1.136026484226831e-08, 8.637677568908233e-07, 15238.143039368977, -7.993535860373772e-09, -5.198597570089805e-07, 1.5691547009557947e-08, 7.409621376205488e-08)
121+
val yMaxLag = 0
122+
val xMaxLag = 0
123+
val arxModel = new ARXModel(c, xCoeffs, yMaxLag, xMaxLag, includesOriginalX = true)
124+
125+
val y = new DenseVector(Array(100.0))
126+
val x = new DenseMatrix(rows = 1, cols = 7, data = Array(465,1,0.006562479,24,1,0,51))
127+
128+
val results = arxModel.predict(y, x)
129+
results.length should be (1)
130+
results(0) should be (y(0) +- 1e-4)
131+
}
132+
}
133+

0 commit comments

Comments
 (0)