Skip to content

Commit 71bb69c

Browse files
author
Kunal Kanojia
committed
Forked source from eventuate sample
0 parents  commit 71bb69c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+2585
-0
lines changed

.gitignore

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
logs
2+
target
3+
/.idea
4+
/.idea_modules
5+
/.classpath
6+
/.project
7+
/.settings
8+
/RUNNING_PID
9+
/.sbtserver

.travis.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
language: scala
2+
jdk: oraclejdk8
3+
scala:
4+
- 2.11.8
5+
6+
script: sbt ++$TRAVIS_SCALA_VERSION clean test

README.md

Whitespace-only changes.

app/Filters.scala

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import javax.inject._
2+
import play.api._
3+
import play.api.http.HttpFilters
4+
import play.api.mvc._
5+
6+
import filters.ExampleFilter
7+
8+
/**
9+
* This class configures filters that run on every request. This
10+
* class is queried by Play to get a list of filters.
11+
*
12+
* Play will automatically use filters from any class called
13+
* `Filters` that is placed the root package. You can load filters
14+
* from a different class by adding a `play.http.filters` setting to
15+
* the `application.conf` configuration file.
16+
*
17+
* @param env Basic environment settings for the current application.
18+
* @param exampleFilter A demonstration filter that adds a header to
19+
* each response.
20+
*/
21+
@Singleton
22+
class Filters @Inject() (
23+
env: Environment,
24+
exampleFilter: ExampleFilter) extends HttpFilters {
25+
26+
override val filters = {
27+
// Use the example filter if we're running development mode. If
28+
// we're running in production or test mode then don't use any
29+
// filters at all.
30+
if (env.mode == Mode.Dev) Seq(exampleFilter) else Seq.empty
31+
}
32+
33+
}

app/Module.scala

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import java.time.Clock
2+
3+
import com.google.inject.AbstractModule
4+
import com.kkanojia.example.actors.{WSUserActor, WSUserParentActor}
5+
import com.kkanojia.example.modules.ActorSystemInitializer
6+
7+
import play.api.libs.concurrent.AkkaGuiceSupport
8+
9+
import services.ApplicationTimer
10+
11+
/**
12+
* This class is a Guice module that tells Guice how to bind several
13+
* different types. This Guice module is created when the Play
14+
* application starts.
15+
*
16+
* Play will automatically use any class called `Module` that is in
17+
* the root package. You can create modules in other locations by
18+
* adding `play.modules.enabled` settings to the `application.conf`
19+
* configuration file.
20+
*/
21+
class Module extends AbstractModule with AkkaGuiceSupport {
22+
23+
override def configure() = {
24+
bind(classOf[ActorSystemInitializer]).asEagerSingleton()
25+
26+
bind(classOf[Clock]).toInstance(Clock.systemDefaultZone)
27+
bind(classOf[ApplicationTimer]).asEagerSingleton()
28+
29+
bindActor[WSUserParentActor]("userParentActor")
30+
bindActorFactory[WSUserActor, WSUserActor.Factory]
31+
}
32+
33+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package com.kkanojia.example.actors
2+
3+
trait TaggedEvent
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.kkanojia.example.actors
2+
3+
import akka.persistence.journal.{Tagged, WriteEventAdapter}
4+
import com.kkanojia.example.actors.TradeActor.{TradeCreated, TradeUpdated}
5+
import com.kkanojia.example.actors.UserActor.UserCreated
6+
7+
class TaggingEventAdapter extends WriteEventAdapter {
8+
9+
override def toJournal(event: Any): Any = event match {
10+
case e: UserCreated => Tagged(event, Set("user-events"))
11+
case e: TradeCreated => Tagged(event, Set("trade-events"))
12+
case e: TradeUpdated => Tagged(event, Set("trade-events"))
13+
case _ => event
14+
}
15+
16+
override def manifest(event: Any): String = ""
17+
18+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package com.kkanojia.example.actors
2+
3+
import akka.persistence.PersistentActor
4+
import akka.persistence.journal.leveldb.SharedLeveldbJournal
5+
import com.kkanojia.example.actors.TradeActor._
6+
import com.kkanojia.example.models.Trade
7+
8+
object TradeActor {
9+
10+
//Commands
11+
case class CreateTrade(trade: Trade)
12+
13+
case class UpdateTrade(trade: Trade)
14+
15+
//Replies
16+
case class CreateTradeSuccess(trade: Trade)
17+
18+
case class CreateTradeFailure(cause: Throwable)
19+
20+
case class UpdateTradeSuccess(trade: Trade)
21+
22+
case class UpdateTradeFailure(cause: Throwable)
23+
24+
//Events
25+
case class TradeCreated(trade: Trade) extends TaggedEvent
26+
27+
case class TradeUpdated(trade: Trade) extends TaggedEvent
28+
29+
}
30+
31+
class TradeActor(val id: String
32+
) extends PersistentActor {
33+
34+
override def persistenceId = id
35+
36+
private var tradeOpt: Option[Trade] = None
37+
38+
SharedLeveldbJournal.setStore(context.self, context.system)
39+
40+
override def receiveCommand: Receive = {
41+
42+
case CreateTrade(trade) =>
43+
persist(TradeCreated(trade)) {
44+
evt => sender() ! CreateTradeSuccess(trade)
45+
}
46+
47+
case UpdateTrade(trade) =>
48+
persist(TradeUpdated(trade)) {
49+
evt => sender() ! UpdateTradeSuccess(trade)
50+
}
51+
}
52+
53+
override def receiveRecover: Receive = {
54+
case TradeCreated(trade) =>
55+
tradeOpt = Some(trade)
56+
57+
case TradeUpdated(trade) =>
58+
tradeOpt = Some(trade)
59+
}
60+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.kkanojia.example.actors
2+
3+
import scala.collection.immutable.HashSet
4+
import scala.collection.mutable
5+
6+
import akka.actor.{Actor, ActorRef, Stash}
7+
import com.kkanojia.example.actors.TradeActor.{TradeCreated, TradeUpdated}
8+
import com.kkanojia.example.actors.TradeAggregateViewActor.{UnWatchTrades, WatchTrades}
9+
import com.kkanojia.example.models.Trade
10+
11+
object TradeAggregateViewActor {
12+
13+
val ID = "464788cb-58aa-4dc6-8dce-703a456c238a"
14+
val NAME = "trade_view_aggregate"
15+
16+
case object WatchTrades
17+
18+
case object UnWatchTrades
19+
20+
}
21+
22+
class TradeAggregateViewActor(val id: String
23+
) extends Actor {
24+
25+
protected[this] var watchers = HashSet.empty[ActorRef]
26+
27+
private val trades = mutable.Map[String, Trade]()
28+
29+
override def receive: Receive = {
30+
31+
case WatchTrades =>
32+
watchers = watchers + sender
33+
34+
case UnWatchTrades =>
35+
watchers = watchers - sender
36+
37+
}
38+
39+
def onEvent: Receive = {
40+
case TradeCreated(trade) =>
41+
trades(trade.id) = trade
42+
watchers.foreach(_ ! TradeCreated(trade))
43+
44+
case TradeUpdated(trade) =>
45+
trades(trade.id) = trade
46+
watchers.foreach(_ ! TradeUpdated(trade))
47+
48+
}
49+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package com.kkanojia.example.actors
2+
3+
import scala.collection.mutable
4+
5+
import akka.NotUsed
6+
import akka.actor.{Actor, ActorRef, Props}
7+
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
8+
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
9+
import akka.stream.ActorMaterializer
10+
import akka.stream.scaladsl.Source
11+
import com.kkanojia.example.actors.UserActor.UserCreated
12+
import com.kkanojia.example.models.Trade
13+
14+
object TradeManager {
15+
16+
//Command
17+
case object RetrieveTrades
18+
19+
case class FindTrade(tradeId: String)
20+
21+
//Replies
22+
case class RetrieveTradesSuccess(trades: Seq[Trade])
23+
24+
case class FindTradeSuccess(tradeOpt: Option[Trade])
25+
26+
}
27+
28+
class TradeManager(val id: String) extends Actor {
29+
30+
import TradeActor._
31+
import TradeManager._
32+
33+
private val userTrades = mutable.Map[String, Trade]()
34+
35+
val queries = PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](
36+
LeveldbReadJournal.Identifier)
37+
38+
val src: Source[EventEnvelope, NotUsed] = queries.eventsByTag("trade-events", 0L)
39+
40+
implicit val mat = ActorMaterializer()
41+
src.runForeach { env =>
42+
env.event match {
43+
case TradeCreated(trade) =>
44+
userTrades(trade.id) = trade
45+
46+
case TradeUpdated(trade) =>
47+
userTrades(trade.id) = trade
48+
49+
case _ => println(s"Unknown event $env")
50+
}}
51+
52+
override def receive: Receive = {
53+
54+
case CreateTrade(trade) =>
55+
getTradeActor(trade.id) forward CreateTrade(trade)
56+
57+
case RetrieveTrades =>
58+
sender() ! RetrieveTradesSuccess(userTrades.values.toSeq)
59+
60+
case FindTrade(tradeId: String) =>
61+
sender() ! FindTradeSuccess(userTrades.get(tradeId))
62+
63+
case UpdateTrade(trade) =>
64+
getTradeActor(trade.id) forward UpdateTrade(trade)
65+
66+
}
67+
68+
private def getTradeActor(tradeId: String): ActorRef = {
69+
val name = s"trade_$tradeId"
70+
context.child(name) match {
71+
case Some(actorRef) => actorRef
72+
case None => context.actorOf(Props(new TradeActor(tradeId)), name)
73+
}
74+
}
75+
}

0 commit comments

Comments
 (0)