Skip to content

Commit e2e34ed

Browse files
author
Kunal Kanojia
committed
Added persistence query
1 parent 71bb69c commit e2e34ed

File tree

11 files changed

+82
-68
lines changed

11 files changed

+82
-68
lines changed

app/com/kkanojia/example/actors/TaggingEventAdapter.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ class TaggingEventAdapter extends WriteEventAdapter {
88

99
override def toJournal(event: Any): Any = event match {
1010
case e: UserCreated => Tagged(event, Set("user-events"))
11-
case e: TradeCreated => Tagged(event, Set("trade-events"))
12-
case e: TradeUpdated => Tagged(event, Set("trade-events"))
11+
case TradeCreated(userId, _) => Tagged(event, Set(userId, "trade-events"))
12+
case TradeUpdated(userId, _) => Tagged(event, Set(userId, "trade-events"))
1313
case _ => event
1414
}
1515

app/com/kkanojia/example/actors/TradeActor.scala

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,13 @@ object TradeActor {
2222
case class UpdateTradeFailure(cause: Throwable)
2323

2424
//Events
25-
case class TradeCreated(trade: Trade) extends TaggedEvent
25+
case class TradeCreated(userId: String, trade: Trade) extends TaggedEvent
2626

27-
case class TradeUpdated(trade: Trade) extends TaggedEvent
27+
case class TradeUpdated(userId: String, trade: Trade) extends TaggedEvent
2828

2929
}
3030

31-
class TradeActor(val id: String
32-
) extends PersistentActor {
31+
class TradeActor(val id: String, userId: String) extends PersistentActor {
3332

3433
override def persistenceId = id
3534

@@ -40,21 +39,27 @@ class TradeActor(val id: String
4039
override def receiveCommand: Receive = {
4140

4241
case CreateTrade(trade) =>
43-
persist(TradeCreated(trade)) {
44-
evt => sender() ! CreateTradeSuccess(trade)
42+
persist(TradeCreated(userId, trade)) {
43+
evt => {
44+
tradeOpt = Some(trade)
45+
sender() ! CreateTradeSuccess(trade)
46+
}
4547
}
4648

4749
case UpdateTrade(trade) =>
48-
persist(TradeUpdated(trade)) {
49-
evt => sender() ! UpdateTradeSuccess(trade)
50+
persist(TradeUpdated(userId, trade)) {
51+
evt => {
52+
tradeOpt = Some(trade)
53+
sender() ! UpdateTradeSuccess(trade)
54+
}
5055
}
5156
}
5257

5358
override def receiveRecover: Receive = {
54-
case TradeCreated(trade) =>
59+
case TradeCreated(usrId, trade) =>
5560
tradeOpt = Some(trade)
5661

57-
case TradeUpdated(trade) =>
62+
case TradeUpdated(usrId, trade) =>
5863
tradeOpt = Some(trade)
5964
}
6065
}

app/com/kkanojia/example/actors/TradeAggregateViewActor.scala

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@ package com.kkanojia.example.actors
33
import scala.collection.immutable.HashSet
44
import scala.collection.mutable
55

6-
import akka.actor.{Actor, ActorRef, Stash}
6+
import akka.NotUsed
7+
import akka.actor.{Actor, ActorLogging, ActorRef, Stash}
8+
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
9+
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
10+
import akka.stream.ActorMaterializer
11+
import akka.stream.scaladsl.Source
712
import com.kkanojia.example.actors.TradeActor.{TradeCreated, TradeUpdated}
813
import com.kkanojia.example.actors.TradeAggregateViewActor.{UnWatchTrades, WatchTrades}
914
import com.kkanojia.example.models.Trade
@@ -19,13 +24,29 @@ object TradeAggregateViewActor {
1924

2025
}
2126

22-
class TradeAggregateViewActor(val id: String
23-
) extends Actor {
27+
class TradeAggregateViewActor(val id: String) extends Actor {
2428

2529
protected[this] var watchers = HashSet.empty[ActorRef]
26-
2730
private val trades = mutable.Map[String, Trade]()
2831

32+
val queries = PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
33+
val src: Source[EventEnvelope, NotUsed] = queries.eventsByTag("trade-events", 0L)
34+
implicit val mat = ActorMaterializer()
35+
36+
src.runForeach { env =>
37+
env.event match {
38+
case TradeCreated(userId, trade) =>
39+
trades(trade.id) = trade
40+
watchers.foreach(_ ! TradeCreated(userId, trade))
41+
42+
case TradeUpdated(userId, trade) =>
43+
trades(trade.id) = trade
44+
watchers.foreach(_ ! TradeUpdated(userId, trade))
45+
46+
case _ => println(s"Unknown event $env")
47+
}
48+
}
49+
2950
override def receive: Receive = {
3051

3152
case WatchTrades =>
@@ -36,14 +57,4 @@ class TradeAggregateViewActor(val id: String
3657

3758
}
3859

39-
def onEvent: Receive = {
40-
case TradeCreated(trade) =>
41-
trades(trade.id) = trade
42-
watchers.foreach(_ ! TradeCreated(trade))
43-
44-
case TradeUpdated(trade) =>
45-
trades(trade.id) = trade
46-
watchers.foreach(_ ! TradeUpdated(trade))
47-
48-
}
4960
}

app/com/kkanojia/example/actors/TradeManager.scala

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import akka.persistence.query.{EventEnvelope, PersistenceQuery}
88
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
99
import akka.stream.ActorMaterializer
1010
import akka.stream.scaladsl.Source
11-
import com.kkanojia.example.actors.UserActor.UserCreated
1211
import com.kkanojia.example.models.Trade
1312

1413
object TradeManager {
@@ -25,25 +24,24 @@ object TradeManager {
2524

2625
}
2726

28-
class TradeManager(val id: String) extends Actor {
27+
class TradeManager(val id: String, userId: String) extends Actor {
2928

3029
import TradeActor._
3130
import TradeManager._
3231

3332
private val userTrades = mutable.Map[String, Trade]()
3433

35-
val queries = PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](
36-
LeveldbReadJournal.Identifier)
34+
val queries = PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
3735

38-
val src: Source[EventEnvelope, NotUsed] = queries.eventsByTag("trade-events", 0L)
36+
val src: Source[EventEnvelope, NotUsed] = queries.eventsByTag(userId, 0L)
3937

4038
implicit val mat = ActorMaterializer()
4139
src.runForeach { env =>
4240
env.event match {
43-
case TradeCreated(trade) =>
41+
case TradeCreated(_, trade) =>
4442
userTrades(trade.id) = trade
4543

46-
case TradeUpdated(trade) =>
44+
case TradeUpdated(_, trade) =>
4745
userTrades(trade.id) = trade
4846

4947
case _ => println(s"Unknown event $env")
@@ -69,7 +67,7 @@ class TradeManager(val id: String) extends Actor {
6967
val name = s"trade_$tradeId"
7068
context.child(name) match {
7169
case Some(actorRef) => actorRef
72-
case None => context.actorOf(Props(new TradeActor(tradeId)), name)
70+
case None => context.actorOf(Props(new TradeActor(tradeId, userId)), name)
7371
}
7472
}
7573
}

app/com/kkanojia/example/actors/UserActor.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ class UserActor(override val persistenceId: String) extends PersistentActor {
4444

4545
case CreateUser(user) =>
4646
persist(UserCreated(user)) {
47-
event => sender() ! UserCreationSuccess(user)
47+
event =>
48+
updateState(user)
49+
sender() ! UserCreationSuccess(user)
4850
}
4951

5052
case GetUser =>
@@ -56,9 +58,12 @@ class UserActor(override val persistenceId: String) extends PersistentActor {
5658

5759
override def receiveRecover: Receive = {
5860
case UserCreated(user) =>
59-
userOpt = Some(user)
60-
val name = s"tm_${user.id}"
61-
context.actorOf(Props(new TradeManager(name)), name)
61+
updateState(user)
6262
}
6363

64+
private def updateState(user: User): Unit = {
65+
userOpt = Some(user)
66+
val name = s"tm_${user.id}"
67+
context.actorOf(Props(new TradeManager(name, user.id)), name)
68+
}
6469
}

app/com/kkanojia/example/actors/UserManager.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,28 +27,26 @@ class UserManager(val id: String) extends Actor with ActorLogging {
2727

2828
private val usersInSystem = mutable.Map[String, String]() //email -> UUID
2929

30-
val queries = PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](
31-
LeveldbReadJournal.Identifier)
32-
30+
val queries = PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
3331
val src: Source[EventEnvelope, NotUsed] = queries.eventsByTag("user-events", 0L)
34-
3532
implicit val mat = ActorMaterializer()
33+
3634
src.runForeach { env =>
3735
env.event match {
3836
case UserCreated(user) => {
39-
println("received" + user)
4037
usersInSystem(user.email) = user.id
4138
}
4239
case _ => println(s"Unknown event $env")
4340
}}
4441

45-
override def receive: Receive = LoggingReceive {
42+
override def receive: Receive =
43+
LoggingReceive {
4644

4745
case CreateUser(user) =>
4846
if (usersInSystem.contains(user.email))
4947
sender() ! UserCreationFailed(UserPresentException)
5048
else
51-
getUserActor(user.id.toString) forward CreateUser(user)
49+
getUserActor(user.id) forward CreateUser(user)
5250

5351
case RetrieveUser(email: String) =>
5452
usersInSystem.get(email) match {

app/com/kkanojia/example/actors/WSUserActor.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@ class WSUserActor @Inject()(@Assisted out: ActorRef,
3030

3131
override def receive: Receive = LoggingReceive {
3232

33-
case TradeCreated(trade) =>
34-
val tradeCreateMessage = write(trade)
33+
case TradeCreated(userId, trade) =>
34+
val tradeCreateMessage = write(TradeCreated(userId, trade))
3535
out ! tradeCreateMessage
3636

37-
case TradeUpdated(trade) =>
38-
val tradeUpdateMessage = write(trade)
37+
case TradeUpdated(userId, trade) =>
38+
val tradeUpdateMessage = write(TradeUpdated(userId, trade))
3939
out ! tradeUpdateMessage
4040

4141
}

app/controllers/TradeWSController.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class TradeWSController @Inject()(@Named("userParentActor") userParentActor: Act
3636
* @return a fully realized websocket.
3737
*/
3838
def ws: WebSocket = WebSocket.acceptOrResult[String, String] {
39-
case rh if sameOriginCheck(rh) =>
39+
case rh => //if sameOriginCheck(rh) =>
4040
wsFutureFlow(rh).map { flow =>
4141
Right(flow)
4242
}.recover {

test/com/kkanojia/example/actors/TradeActorSpec.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.kkanojia.example.actors
22

3+
import java.util.UUID
4+
35
import akka.actor.{ActorSystem, Props}
46
import akka.testkit.{ImplicitSender, TestKit}
57
import com.kkanojia.example.actors.TradeActor._
@@ -18,7 +20,7 @@ class TradeActorSpec(_system: ActorSystem) extends TestKit(_system) with Implici
1820
"be able to create a trade when called with `CreateTrade`" in {
1921
//Arrange
2022
val trade = Trade(tradeDate = DateTime.now, buySell = "B", assetId = 1, quantity = 100, price = 20.2)
21-
val tradeActor = system.actorOf(Props(new TradeActor(trade.id)))
23+
val tradeActor = system.actorOf(Props(new TradeActor(trade.id, UUID.randomUUID().toString)))
2224

2325
//Act
2426
tradeActor ! CreateTrade(trade)
@@ -35,7 +37,7 @@ class TradeActorSpec(_system: ActorSystem) extends TestKit(_system) with Implici
3537
"be able to update a trade when called with `UpdateTrade`" in {
3638
//Arrange
3739
val trade = Trade(tradeDate = DateTime.now, buySell = "B", assetId = 1, quantity = 100, price = 20.2)
38-
val tradeActor = system.actorOf(Props(new TradeActor(trade.id)))
40+
val tradeActor = system.actorOf(Props(new TradeActor(trade.id, UUID.randomUUID().toString)))
3941
tradeActor ! CreateTrade(trade);
4042
expectMsgType[CreateTradeSuccess]
4143

test/com/kkanojia/example/actors/TradeAggregateViewActorSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class TradeAggregateViewActorSpec(_system: ActorSystem) extends TestKit(_system)
2929
aggregateActor.tell(WatchTrades, wSUserActor)
3030

3131
val trade = Trade(tradeDate = DateTime.now, buySell = "B", assetId = 1, quantity = 100, price = 20.2)
32-
val tradeActor = system.actorOf(Props(new TradeActor(trade.id)))
32+
val tradeActor = system.actorOf(Props(new TradeActor(trade.id, UUID.randomUUID().toString)))
3333

3434
//Act
3535
tradeActor ! CreateTrade(trade);
@@ -48,7 +48,7 @@ class TradeAggregateViewActorSpec(_system: ActorSystem) extends TestKit(_system)
4848
val wSUserActor = system.actorOf(Props(new ProbeWrapper(probe)))
4949
aggregateActor.tell(WatchTrades, wSUserActor)
5050
val trade = Trade(tradeDate = DateTime.now, buySell = "B", assetId = 1, quantity = 100, price = 20.2)
51-
val tradeActor = system.actorOf(Props(new TradeActor(trade.id)))
51+
val tradeActor = system.actorOf(Props(new TradeActor(trade.id, UUID.randomUUID().toString)))
5252

5353
//Act
5454
tradeActor ! UpdateTrade(trade);

0 commit comments

Comments
 (0)