Skip to content

Commit 0ba37ee

Browse files
author
Justin Yip
committed
First commit
0 parents  commit 0ba37ee

File tree

10 files changed

+437
-0
lines changed

10 files changed

+437
-0
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
manifest.json
2+
target/
3+
pio.log
4+
/pio.sbt

README.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Event Distribution Checker
2+
3+
It prints out the event landscape (the distribution of events) for a given appId.
4+
5+
## To run
6+
7+
Edit engine.json. There are two key fields:
8+
9+
- appId: the appId
10+
- sample: the number of events to be printed for each key. Can be empty.
11+
12+
```
13+
$ pio build
14+
$ pio train
15+
...
16+
[INFO] [BasicChecker] Event (Time, Name, EntityType, TargetEntityType) Distribution
17+
[INFO] [AggEvent$] [1 x 4]
18+
Count Freq CumCount CumFreq
19+
-------- ------ -------- -------
20+
(2015-03,$set,user,None) -> 153.0000 1.0000 153.0000 1.0000
21+
...
22+
[INFO] [CoreWorkflow$] Training interrupted by io.prediction.workflow.StopAfterReadInterruption.
23+
```
24+

build.sbt

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import AssemblyKeys._

assemblySettings

name := "template-scala-parallel-vanilla"

organization := "io.prediction"

libraryDependencies ++= Seq(
  "io.prediction"    %% "core"        % pioVersion.value % "provided",
  "org.apache.spark" %% "spark-core"  % "1.2.0"          % "provided",
  "org.apache.spark" %% "spark-mllib" % "1.2.0"          % "provided",
  // Saddle provides the tabular Frame pretty-printing used by the checkers.
  "org.scala-saddle" %% "saddle-core" % "1.3.+")

// Use HTTPS: Sonatype redirects/rejects plain-HTTP repository access, and
// recent sbt versions refuse insecure resolver URLs outright.
resolvers ++= Seq(
  "Sonatype Snapshots" at
    "https://oss.sonatype.org/content/repositories/snapshots",
  "Sonatype Releases" at
    "https://oss.sonatype.org/content/repositories/releases"
)
21+

engine.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"id": "default",
3+
"description": "Default settings",
4+
"engineFactory": "io.prediction.e2.eventdistributionchecker.VanillaEngine",
5+
"datasource": {
6+
"params" : {
7+
"appId": 11,
8+
"sample": 0
9+
}
10+
}
11+
}

src/main/scala/Algorithm.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package io.prediction.e2.eventdistributionchecker
2+
3+
import io.prediction.controller.P2LAlgorithm
4+
import io.prediction.controller.Params
5+
6+
import org.apache.spark.SparkContext
7+
import org.apache.spark.SparkContext._
8+
import org.apache.spark.rdd.RDD
9+
10+
import grizzled.slf4j.Logger
11+
12+
//case class AlgorithmParams(mult: Int) extends Params
13+
14+
//class Algorithm(val ap: AlgorithmParams)
15+
class Algorithm
  // extends PAlgorithm if Model contains RDD[]
  extends P2LAlgorithm[PreparedData, Model, Query, PredictedResult] {

  @transient lazy val logger = Logger[this.type]

  /** Builds the model by simply counting the events in the prepared data. */
  def train(sc: SparkContext, data: PreparedData): Model = {
    val eventCount = data.events.count().toInt
    new Model(mc = eventCount)
  }

  /** Returns the query string prefixed with the model's event count. */
  def predict(model: Model, query: Query): PredictedResult = {
    PredictedResult(p = s"${model.mc}-${query.q}")
  }
}
35+
36+
/** Trivial model holding only the total event count. */
class Model(val mc: Int) extends Serializable {
  // Render as "mc=<count>" for logging.
  override def toString = s"mc=$mc"
}

src/main/scala/DataSource.scala

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package io.prediction.e2.eventdistributionchecker
2+
3+
import io.prediction.controller.PDataSource
4+
import io.prediction.controller.EmptyEvaluationInfo
5+
import io.prediction.controller.EmptyActualResult
6+
import io.prediction.controller.Params
7+
import io.prediction.data.storage.Event
8+
import io.prediction.data.storage.Storage
9+
import io.prediction.workflow.StopAfterReadInterruption
10+
11+
import org.apache.spark.SparkContext
12+
import org.apache.spark.SparkContext._
13+
import org.apache.spark.rdd.RDD
14+
import org.apache.spark.FutureAction
15+
import org.apache.spark.rdd.AsyncRDDActions
16+
import org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER
17+
import org.apache.spark.storage.StorageLevel.MEMORY_ONLY
18+
import org.apache.spark.Accumulator
19+
import org.apache.spark.Accumulable
20+
21+
import grizzled.slf4j.Logger
22+
import scala.concurrent.ExecutionContext.Implicits.global
23+
import scala.concurrent.Await
24+
import scala.concurrent.duration._
25+
26+
import com.github.nscala_time.time.Imports._
27+
import org.apache.spark.mllib.linalg.Vector
28+
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
29+
import org.apache.spark.mllib.linalg.Vectors
30+
import org.joda.time.Days
31+
import org.joda.time.Hours
32+
33+
34+
/** Parameters read from the "datasource" section of engine.json.
  *
  * @param appId  id of the event app to inspect
  * @param sample number of sample events to print per key; None disables sampling
  */
case class DataSourceParams(
  appId: Int,
  sample: Option[Int]
) extends Params
38+
39+
40+
/** A serializable diagnostic pass over the event data. */
abstract class AbstractChecker extends Serializable {
  /** Runs the check, logging its findings as a side effect. */
  def check(): Unit
}
43+
44+
/** Computes several event-count distributions over the events RDD and prints
  * them via the logger.
  *
  * All distributions are kicked off asynchronously at construction time so
  * the Spark jobs can overlap; check() then blocks on each result in turn.
  */
class BasicChecker(ds: DataSource, eventsRDD: RDD[Event], sample: Option[Int])
    extends AbstractChecker {
  @transient lazy val logger = Logger[this.type]

  import io.prediction.e2.eventdistributionchecker.AggEvent.rddToEventRDD
  import io.prediction.e2.eventdistributionchecker.AggEvent.rddToKeyedEventRDD

  // Number of sample events to keep per key; 0 means no sampling.
  val _sample = sample.getOrElse(0)

  // Distribution keyed by (entityType, targetEntityType).
  val fTypeDist: FutureAction[Seq[((String, Option[String]), AggEvent.Result)]] =
    eventsRDD
      .map(e => ((e.entityType, e.targetEntityType), e))
      .countAndSample(_sample)
      .collectAsync()

  // Distribution keyed by event month ("yyyy-MM").
  val fEventTimeDist = eventsRDD
    .map{ e => (e.eventTime.toString("yyyy-MM"), e) }
    .countAndSample(_sample)
    .collectAsync()

  // Distribution keyed by (month, event name).
  val fEventTimeNameDist = eventsRDD
    .map{ e => ((e.eventTime.toString("yyyy-MM"), e.event), e) }
    .countAndSample(_sample)
    .collectAsync()

  // Distribution keyed by (month, event name, entityType, targetEntityType).
  val fEventTimeNameTypeDist = eventsRDD
    .map{ e =>
      ((e.eventTime.toString("yyyy-MM"), e.event, e.entityType, e.targetEntityType), e)
    }
    .countAndSample(_sample)
    .collectAsync()

  // Distinct entity ids grouped by entityType.
  val fTypeDistinctId = EventUtils.distinctEntityId(
    eventsRDD, _.entityType).collectAsync()

  def check(): Unit = {
    logger.info("Entity Type Distribution")
    AggEvent.print(fTypeDist.get(), true)


    logger.info("Event Time Distribution")
    AggEvent.print(fEventTimeDist.get(), true)

    // Fix: this distribution was collected asynchronously above but its
    // result was never printed (dead cluster work). Print it like the rest.
    logger.info("Event (Time, Name) Distribution")
    AggEvent.print(fEventTimeNameDist.get(), true)

    logger.info("Event (Time, Name, EntityType, TargetEntityType) Distribution")
    AggEvent.print(fEventTimeNameTypeDist.get(), true)

    logger.info("Event (EntityType) Distinct Id")
    val typeDistinctIdStats = EventUtils.distStats(fTypeDistinctId.get(), true)
    logger.info(typeDistinctIdStats)
  }
}
95+
96+
97+
/** Data source that never actually produces training data: it reads all
  * events for the configured appId, runs the registered checkers over them,
  * and then aborts training with StopAfterReadInterruption.
  */
class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]
  @transient lazy val eventsDb = Storage.getPEvents()

  override
  def readTraining(sc: SparkContext): TrainingData = {
    // Cache: every checker re-traverses the same RDD several times.
    val allEvents: RDD[Event] = eventsDb.find(appId = dsp.appId)(sc)
      .setName("EventsRDD")
      .cache

    logger.info(s"EventsCount: ${allEvents.count}")

    Seq[AbstractChecker](
      new BasicChecker(this, allEvents, sample = dsp.sample)
    ).foreach(_.check())

    // Deliberate: this engine only inspects data, it never trains a model.
    throw new StopAfterReadInterruption()
  }
}
121+
122+
/** Wrapper around the raw events RDD handed to the preparator. */
class TrainingData(
  val events: RDD[Event]
) extends Serializable {
  // Show the count plus the first two events for quick inspection in logs.
  override def toString =
    s"events: [${events.count()}] (${events.take(2).toList}...)"
}

src/main/scala/Engine.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.prediction.e2.eventdistributionchecker
2+
3+
import io.prediction.controller.IEngineFactory
4+
import io.prediction.controller.Engine
5+
6+
/** Query sent to the engine; `q` is echoed back inside the prediction. */
case class Query(q: String) extends Serializable
7+
8+
/** Prediction output: the model's event count prefixed to the query string. */
case class PredictedResult(p: String) extends Serializable
9+
10+
/** Engine factory referenced by engine.json's "engineFactory" field; wires
  * the checker DataSource into a standard engine pipeline.
  */
object VanillaEngine extends IEngineFactory {
  def apply() =
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map("" -> classOf[Algorithm]),
      classOf[Serving])
}

0 commit comments

Comments
 (0)