diff --git a/build.sbt b/build.sbt index 00d80e7..a73c860 100644 --- a/build.sbt +++ b/build.sbt @@ -31,7 +31,7 @@ ThisBuild / organization := "app.softnetwork" name := "payment" -ThisBuild / version := "0.1.5.1" +ThisBuild / version := "0.2.0" ThisBuild / scalaVersion := "2.12.15" diff --git a/common/src/main/protobuf/message/payment/payment.proto b/common/src/main/protobuf/message/payment/payment.proto index 2e0358b..0f9a04e 100644 --- a/common/src/main/protobuf/message/payment/payment.proto +++ b/common/src/main/protobuf/message/payment/payment.proto @@ -5,7 +5,6 @@ import "google/protobuf/timestamp.proto"; import "model/payment/card.proto"; import "model/payment/document.proto"; import "model/payment/paymentUser.proto"; -import "message/schedule.proto"; package app.softnetwork.payment.message.PaymentEvents; @@ -16,14 +15,15 @@ option (scalapb.options) = { import: "app.softnetwork.persistence.model._" import: "app.softnetwork.protobuf.ScalaPBTypeMappers._" import: "app.softnetwork.serialization._" + import: "app.softnetwork.payment.message.PaymentMessages._" import: "app.softnetwork.payment.model._" import: "app.softnetwork.payment.serialization._" - import: "org.softnetwork.akka.message.SchedulerEvents.SchedulerEventWithCommand" - preamble: "trait PaymentEvent extends Event" - preamble: "trait PaymentAccountEvent extends PaymentEvent" - preamble: "trait PaymentCommandEvent extends PaymentEvent" - preamble: "trait WrapPaymentCommandEvent extends PaymentCommandEvent {def event: PaymentCommandEvent}" - preamble: "trait PaymentToSchedulerEvent extends SchedulerEventWithCommand with PaymentEvent" + import: "app.softnetwork.scheduler.message.SchedulerEvents.ExternalSchedulerEvent" + preamble: "trait PaymentEvent extends ExternalSchedulerEvent" + preamble: "sealed trait PaymentAccountEvent extends PaymentEvent" + preamble: "sealed trait PaymentCommandEvent extends PaymentEvent" + preamble: "trait ExternalPaymentEvent extends PaymentEvent" + preamble: "trait PaymentEventWithCommand extends ExternalPaymentEvent {def command: Option[PaymentCommandEvent]}" preserve_unknown_fields: false }; @@ -153,6 +153,24 @@ message BankAccountDeletedEvent { required google.protobuf.Timestamp lastUpdated = 2 [(scalapb.field).type = "java.util.Date"]; } +message ExternalEntityToPaymentEvent{ + option (scalapb.message).extends = "ProtobufEvent"; + option (scalapb.message).extends = "PaymentEventWithCommand"; + option (scalapb.message).extends = "ExternalEntityToPaymentEventDecorator"; + oneof wrapped { + PayInWithCardPreAuthorizedCommandEvent payInWithCardPreAuthorized = 1; + RefundCommandEvent refund = 2; + PayOutCommandEvent payOut = 3; + TransferCommandEvent transfer = 4; + DirectDebitCommandEvent directDebit = 5; + CreateOrUpdatePaymentAccountCommandEvent createOrUpdatePaymentAccount = 6; + RegisterRecurringPaymentCommandEvent registerRecurringPayment = 7; + CancelPreAuthorizationCommandEvent cancelPreAuthorization = 8; + LoadDirectDebitTransactionCommandEvent loadDirectDebitTransaction = 9; + CancelMandateCommandEvent cancelMandate = 10; + } +} + message PayInWithCardPreAuthorizedCommandEvent{ option (scalapb.message).extends = "ProtobufEvent"; option (scalapb.message).extends = "PaymentCommandEvent"; @@ -211,18 +229,6 @@ message CreateOrUpdatePaymentAccountCommandEvent{ required app.softnetwork.payment.model.PaymentAccount paymentAccount = 1; } -message ScheduleForPaymentAdded { - option (scalapb.message).extends = "ProtobufEvent"; - option (scalapb.message).extends = "PaymentToSchedulerEvent"; - required app.softnetwork.scheduler.message.AddSchedule command = 1; -} - -message ScheduleForPaymentRemoved { - option (scalapb.message).extends = "ProtobufEvent"; - option (scalapb.message).extends = "PaymentToSchedulerEvent"; - required app.softnetwork.scheduler.message.RemoveSchedule command = 1; -} - message RegisterRecurringPaymentCommandEvent { option (scalapb.message).extends = "ProtobufEvent"; option (scalapb.message).extends = "PaymentCommandEvent"; @@ -241,7 +247,7 @@ message RegisterRecurringPaymentCommandEvent { message RecurringPaymentRegisteredEvent { option (scalapb.message).extends = "ProtobufEvent"; - option (scalapb.message).extends = "PaymentCommandEvent"; + option (scalapb.message).extends = "PaymentAccountEvent"; option (scalapb.message).extends = "BroadcastEvent"; required string externalUuid = 1; required app.softnetwork.payment.model.RecurringPayment recurringPayment = 2; diff --git a/common/src/main/scala/app/softnetwork/payment/message/PaymentMessages.scala b/common/src/main/scala/app/softnetwork/payment/message/PaymentMessages.scala index 2a59c4d..850a86a 100644 --- a/common/src/main/scala/app/softnetwork/payment/message/PaymentMessages.scala +++ b/common/src/main/scala/app/softnetwork/payment/message/PaymentMessages.scala @@ -1,9 +1,14 @@ package app.softnetwork.payment.message import app.softnetwork.payment.annotation.InternalApi +import app.softnetwork.payment.message.PaymentEvents.{ + ExternalEntityToPaymentEvent, + PaymentCommandEvent, + PaymentEventWithCommand +} import app.softnetwork.payment.model._ import app.softnetwork.persistence.message.{Command, CommandResult, EntityCommand, ErrorMessage} -import org.softnetwork.akka.model.Schedule +import app.softnetwork.scheduler.model.Schedule import java.util.Date @@ -814,4 +819,22 @@ object PaymentMessages { ) extends PaymentError(s"NextRecurringPaymentFailed: $reason") case object Schedule4PaymentNotTriggered extends PaymentError("Schedule4PaymentNotTriggered") + + trait ExternalEntityToPaymentEventDecorator extends PaymentEventWithCommand { + _: ExternalEntityToPaymentEvent => + override def command: Option[PaymentCommandEvent] = + wrapped match { + case r: ExternalEntityToPaymentEvent.Wrapped.CreateOrUpdatePaymentAccount => Some(r.value) + case r: ExternalEntityToPaymentEvent.Wrapped.PayInWithCardPreAuthorized => Some(r.value) + case r: ExternalEntityToPaymentEvent.Wrapped.Refund => Some(r.value) + case r: ExternalEntityToPaymentEvent.Wrapped.PayOut => Some(r.value) + case r: ExternalEntityToPaymentEvent.Wrapped.Transfer => Some(r.value) + case r: ExternalEntityToPaymentEvent.Wrapped.DirectDebit => Some(r.value) + case r: ExternalEntityToPaymentEvent.Wrapped.LoadDirectDebitTransaction => Some(r.value) + case r: ExternalEntityToPaymentEvent.Wrapped.RegisterRecurringPayment => Some(r.value) + case r: ExternalEntityToPaymentEvent.Wrapped.CancelPreAuthorization => Some(r.value) + case r: ExternalEntityToPaymentEvent.Wrapped.CancelMandate => Some(r.value) + case _ => None + } + } } diff --git a/core/build.sbt b/core/build.sbt index 1241e52..d1bc97c 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -5,7 +5,6 @@ organization := "app.softnetwork.payment" name := "payment-core" libraryDependencies ++= Seq( - "app.softnetwork.scheduler" %% "scheduler-core" % Versions.scheduler, "app.softnetwork.persistence" %% "persistence-kv" % Versions.genericPersistence, "app.softnetwork.session" %% "session-core" % Versions.session ) diff --git a/core/src/main/scala/app/softnetwork/payment/launch/PaymentApplication.scala b/core/src/main/scala/app/softnetwork/payment/launch/PaymentApplication.scala index 26f47ba..0e00604 100644 --- a/core/src/main/scala/app/softnetwork/payment/launch/PaymentApplication.scala +++ b/core/src/main/scala/app/softnetwork/payment/launch/PaymentApplication.scala @@ -1,11 +1,6 @@ package app.softnetwork.payment.launch -import akka.actor.typed.ActorSystem import app.softnetwork.api.server.launch.Application import app.softnetwork.persistence.query.SchemaProvider -trait PaymentApplication extends Application with PaymentRoutes { _: SchemaProvider => - override def initSystem: ActorSystem[_] => Unit = system => { - initSchedulerSystem(system) - } -} +trait PaymentApplication extends Application with PaymentRoutes { _: SchemaProvider => } diff --git a/core/src/main/scala/app/softnetwork/payment/launch/PaymentGuardian.scala b/core/src/main/scala/app/softnetwork/payment/launch/PaymentGuardian.scala index 9de26d0..447baf3 100644 --- a/core/src/main/scala/app/softnetwork/payment/launch/PaymentGuardian.scala +++ b/core/src/main/scala/app/softnetwork/payment/launch/PaymentGuardian.scala @@ -10,11 +10,9 @@ import app.softnetwork.payment.persistence.typed.GenericPaymentBehavior import app.softnetwork.persistence.launch.PersistentEntity import app.softnetwork.persistence.query.{EventProcessorStream, SchemaProvider} import app.softnetwork.persistence.typed.Singleton -import app.softnetwork.scheduler.launch.SchedulerGuardian -import app.softnetwork.scheduler.persistence.query.Scheduler2EntityProcessorStream import app.softnetwork.session.launch.SessionGuardian -trait PaymentGuardian extends SchedulerGuardian with SessionGuardian { _: SchemaProvider => +trait PaymentGuardian extends SessionGuardian { _: SchemaProvider => import app.softnetwork.persistence.launch.PersistenceGuardian._ @@ -28,7 +26,7 @@ trait PaymentGuardian extends SchedulerGuardian with SessionGuardian { _: Schema /** initialize all entities */ override def entities: ActorSystem[_] => Seq[PersistentEntity[_, _, _, _]] = sys => - schedulerEntities(sys) ++ sessionEntities(sys) ++ paymentEntities(sys) + sessionEntities(sys) ++ paymentEntities(sys) /** initialize all singletons */ @@ -36,18 +34,14 @@ trait PaymentGuardian extends SchedulerGuardian with SessionGuardian { _: Schema def paymentCommandProcessorStream: ActorSystem[_] => GenericPaymentCommandProcessorStream - def paymentEventProcessorStreams: ActorSystem[_] => Seq[EventProcessorStream[_]] = sys => - Seq(paymentCommandProcessorStream(sys)) - def scheduler2PaymentProcessorStream: ActorSystem[_] => Scheduler2PaymentProcessorStream - override def scheduler2EntityProcessorStreams - : ActorSystem[_] => Seq[Scheduler2EntityProcessorStream[_, _]] = sys => - Seq(scheduler2PaymentProcessorStream(sys)) + def paymentEventProcessorStreams: ActorSystem[_] => Seq[EventProcessorStream[_]] = sys => + Seq(paymentCommandProcessorStream(sys)) :+ scheduler2PaymentProcessorStream(sys) /** initialize all event processor streams */ override def eventProcessorStreams: ActorSystem[_] => Seq[EventProcessorStream[_]] = sys => - schedulerEventProcessorStreams(sys) ++ paymentEventProcessorStreams(sys) + paymentEventProcessorStreams(sys) } diff --git a/core/src/main/scala/app/softnetwork/payment/persistence/query/GenericPaymentCommandProcessorStream.scala b/core/src/main/scala/app/softnetwork/payment/persistence/query/GenericPaymentCommandProcessorStream.scala index a92fb1f..7580155 100644 --- a/core/src/main/scala/app/softnetwork/payment/persistence/query/GenericPaymentCommandProcessorStream.scala +++ b/core/src/main/scala/app/softnetwork/payment/persistence/query/GenericPaymentCommandProcessorStream.scala @@ -11,7 +11,7 @@ import app.softnetwork.persistence.query.{EventProcessorStream, JournalProvider} import scala.concurrent.Future -trait GenericPaymentCommandProcessorStream extends EventProcessorStream[PaymentCommandEvent] { +trait GenericPaymentCommandProcessorStream extends EventProcessorStream[PaymentEventWithCommand] { _: JournalProvider with GenericPaymentHandler => override lazy val tag: String = PaymentSettings.ExternalToPaymentAccountTag @@ -33,181 +33,186 @@ trait GenericPaymentCommandProcessorStream extends EventProcessorStream[PaymentC * @return */ override protected def processEvent( - event: PaymentCommandEvent, + event: PaymentEventWithCommand, persistenceId: PersistenceId, sequenceNr: Long ): Future[Done] = { - event match { - case evt: WrapPaymentCommandEvent => processEvent(evt.event, persistenceId, sequenceNr) - case evt: CreateOrUpdatePaymentAccountCommandEvent => - val command = CreateOrUpdatePaymentAccount(evt.paymentAccount) - !?(command) map { - case PaymentAccountCreated => - if (forTests) system.eventStream.tell(Publish(event)) - Done - case PaymentAccountUpdated => - if (forTests) system.eventStream.tell(Publish(event)) - Done - case other => - logger.error( - s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" - ) - Done - } - case evt: PayInWithCardPreAuthorizedCommandEvent => - import evt._ - val command = PayInWithCardPreAuthorized(preAuthorizationId, creditedAccount) - !?(command) map { - case _: PaidInResult => - if (forTests) system.eventStream.tell(Publish(event)) - Done - case other => - logger.error( - s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" - ) - Done - } - case evt: RefundCommandEvent => - import evt._ - val command = Refund( - orderUuid, - payInTransactionId, - refundAmount, - currency, - reasonMessage, - initializedByClient - ) - !?(command) map { - case _: Refunded => - if (forTests) system.eventStream.tell(Publish(event)) - Done - case other => - logger.error( - s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" - ) - Done - } - case evt: PayOutCommandEvent => - import evt._ - val command = PayOut(orderUuid, creditedAccount, creditedAmount, feesAmount, currency) - !?(command) map { - case _: PaidOut => - if (forTests) system.eventStream.tell(Publish(event)) - Done - case other => - logger.error( - s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" - ) - Done - } - case evt: TransferCommandEvent => - import evt._ - val command = Transfer( - orderUuid, - debitedAccount, - creditedAccount, - debitedAmount, - feesAmount, - currency, - payOutRequired, - externalReference - ) - !?(command) map { - case _: Transferred => - if (forTests) system.eventStream.tell(Publish(event)) - Done - case other => - logger.error( - s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" - ) - Done - } - case evt: DirectDebitCommandEvent => - import evt._ - val command = DirectDebit( - creditedAccount, - debitedAmount, - feesAmount, - currency, - statementDescriptor, - externalReference - ) - !?(command) map { - case _: DirectDebited => - if (forTests) system.eventStream.tell(Publish(event)) - Done - case other => - logger.error( - s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" + event.command match { + case Some(commandEvent) => + commandEvent match { + case evt: CreateOrUpdatePaymentAccountCommandEvent => + val command = CreateOrUpdatePaymentAccount(evt.paymentAccount) + !?(command) map { + case PaymentAccountCreated => + if (forTests) system.eventStream.tell(Publish(event)) + Done + case PaymentAccountUpdated => + if (forTests) system.eventStream.tell(Publish(event)) + Done + case other => + logger.error( + s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" + ) + Done + } + case evt: PayInWithCardPreAuthorizedCommandEvent => + import evt._ + val command = PayInWithCardPreAuthorized(preAuthorizationId, creditedAccount) + !?(command) map { + case _: PaidInResult => + if (forTests) system.eventStream.tell(Publish(event)) + Done + case other => + logger.error( + s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" + ) + Done + } + case evt: RefundCommandEvent => + import evt._ + val command = Refund( + orderUuid, + payInTransactionId, + refundAmount, + currency, + reasonMessage, + initializedByClient ) - Done - } - case evt: LoadDirectDebitTransactionCommandEvent => - import evt._ - val command = LoadDirectDebitTransaction(directDebitTransactionId) - !?(command) map { - case _: DirectDebited => - if (forTests) system.eventStream.tell(Publish(event)) - Done - case other => - logger.error( - s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" + !?(command) map { + case _: Refunded => + if (forTests) system.eventStream.tell(Publish(event)) + Done + case other => + logger.error( + s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" + ) + Done + } + case evt: PayOutCommandEvent => + import evt._ + val command = PayOut(orderUuid, creditedAccount, creditedAmount, feesAmount, currency) + !?(command) map { + case _: PaidOut => + if (forTests) system.eventStream.tell(Publish(event)) + Done + case other => + logger.error( + s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" + ) + Done + } + case evt: TransferCommandEvent => + import evt._ + val command = Transfer( + orderUuid, + debitedAccount, + creditedAccount, + debitedAmount, + feesAmount, + currency, + payOutRequired, + externalReference ) - Done - } - case evt: RegisterRecurringPaymentCommandEvent => - import evt._ - val command = RegisterRecurringPayment( - debitedAccount, - firstDebitedAmount, - firstFeesAmount, - currency, - `type`, - startDate, - endDate, - frequency, - fixedNextAmount, - nextDebitedAmount, - nextFeesAmount - ) - !?(command) map { - case _: RecurringPaymentRegistered => - if (forTests) system.eventStream.tell(Publish(event)) - Done - case other => - logger.error( - s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" + !?(command) map { + case _: Transferred => + if (forTests) system.eventStream.tell(Publish(event)) + Done + case other => + logger.error( + s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" + ) + Done + } + case evt: DirectDebitCommandEvent => + import evt._ + val command = DirectDebit( + creditedAccount, + debitedAmount, + feesAmount, + currency, + statementDescriptor, + externalReference ) - Done - } - case evt: CancelPreAuthorizationCommandEvent => - import evt._ - val command = CancelPreAuthorization(orderUuid, cardPreAuthorizedTransactionId) - !?(command) map { - case _: PreAuthorizationCanceled => - if (forTests) system.eventStream.tell(Publish(event)) - Done - case other => - logger.error( - s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" + !?(command) map { + case _: DirectDebited => + if (forTests) system.eventStream.tell(Publish(event)) + Done + case other => + logger.error( + s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" + ) + Done + } + case evt: LoadDirectDebitTransactionCommandEvent => + import evt._ + val command = LoadDirectDebitTransaction(directDebitTransactionId) + !?(command) map { + case _: DirectDebited => + if (forTests) system.eventStream.tell(Publish(event)) + Done + case other => + logger.error( + s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" + ) + Done + } + case evt: RegisterRecurringPaymentCommandEvent => + import evt._ + val command = RegisterRecurringPayment( + debitedAccount, + firstDebitedAmount, + firstFeesAmount, + currency, + `type`, + startDate, + endDate, + frequency, + fixedNextAmount, + nextDebitedAmount, + nextFeesAmount ) - Done - } - case evt: CancelMandateCommandEvent => - import evt._ - val command = CancelMandate(externalUuid) - !?(command) map { - case MandateCanceled => - if (forTests) system.eventStream.tell(Publish(event)) - Done + !?(command) map { + case _: RecurringPaymentRegistered => + if (forTests) system.eventStream.tell(Publish(event)) + Done + case other => + logger.error( + s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" + ) + Done + } + case evt: CancelPreAuthorizationCommandEvent => + import evt._ + val command = CancelPreAuthorization(orderUuid, cardPreAuthorizedTransactionId) + !?(command) map { + case _: PreAuthorizationCanceled => + if (forTests) system.eventStream.tell(Publish(event)) + Done + case other => + logger.error( + s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" + ) + Done + } + case evt: CancelMandateCommandEvent => + import evt._ + val command = CancelMandate(externalUuid) + !?(command) map { + case MandateCanceled => + if (forTests) system.eventStream.tell(Publish(event)) + Done + case other => + logger.error( + s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" + ) + Done + } case other => - logger.error( - s"$platformEventProcessorId - command $command returns unexpectedly ${other.getClass}" - ) - Done + logger.warn(s"$platformEventProcessorId does not support event [${other.getClass}]") + Future.successful(Done) } - case other => - logger.warn(s"$platformEventProcessorId does not support event [${other.getClass}]") + case None => + logger.warn(s"$platformEventProcessorId does not support event without command") Future.successful(Done) } } diff --git a/core/src/main/scala/app/softnetwork/payment/persistence/query/Scheduler2PaymentProcessorStream.scala b/core/src/main/scala/app/softnetwork/payment/persistence/query/Scheduler2PaymentProcessorStream.scala index 3d8d868..d9ef2d7 100644 --- a/core/src/main/scala/app/softnetwork/payment/persistence/query/Scheduler2PaymentProcessorStream.scala +++ b/core/src/main/scala/app/softnetwork/payment/persistence/query/Scheduler2PaymentProcessorStream.scala @@ -10,7 +10,7 @@ import app.softnetwork.payment.message.PaymentMessages.{ } import app.softnetwork.persistence.query.JournalProvider import app.softnetwork.scheduler.persistence.query.Scheduler2EntityProcessorStream -import org.softnetwork.akka.model.Schedule +import app.softnetwork.scheduler.model.Schedule import scala.concurrent.Future diff --git a/core/src/main/scala/app/softnetwork/payment/persistence/typed/GenericPaymentBehavior.scala b/core/src/main/scala/app/softnetwork/payment/persistence/typed/GenericPaymentBehavior.scala index b9540fa..b9cc257 100644 --- a/core/src/main/scala/app/softnetwork/payment/persistence/typed/GenericPaymentBehavior.scala +++ b/core/src/main/scala/app/softnetwork/payment/persistence/typed/GenericPaymentBehavior.scala @@ -20,9 +20,13 @@ import app.softnetwork.scheduler.config.SchedulerSettings import app.softnetwork.serialization.asJson import app.softnetwork.time.{now => _, _} import org.slf4j.Logger -import org.softnetwork.akka.message.SchedulerEvents.SchedulerEventWithCommand +import app.softnetwork.scheduler.message.SchedulerEvents.{ + ExternalEntityToSchedulerEvent, + ExternalSchedulerEvent, + SchedulerEventWithCommand +} import app.softnetwork.scheduler.message.{AddSchedule, RemoveSchedule} -import org.softnetwork.akka.model.Schedule +import app.softnetwork.scheduler.model.Schedule import scala.reflect.ClassTag import scala.util.{Failure, Success} @@ -30,7 +34,12 @@ import scala.util.{Failure, Success} /** Created by smanciot on 22/04/2022. */ trait GenericPaymentBehavior - extends TimeStampedBehavior[PaymentCommand, PaymentAccount, PaymentEvent, PaymentResult] + extends TimeStampedBehavior[ + PaymentCommand, + PaymentAccount, + ExternalSchedulerEvent, + PaymentResult + ] with ManifestWrapper[PaymentAccount] { _: PaymentProvider => override protected val manifestWrapper: ManifestW = ManifestW() @@ -63,15 +72,12 @@ trait GenericPaymentBehavior * @return * event tags */ - override protected def tagEvent(entityId: String, event: PaymentEvent): Set[String] = + override protected def tagEvent(entityId: String, event: ExternalSchedulerEvent): Set[String] = event match { case _: BroadcastEvent => Set(s"${persistenceId.toLowerCase}-to-external") case _: CrudEvent => Set(s"${persistenceId.toLowerCase}-to-elastic") case _: SchedulerEventWithCommand => - Set( - s"$persistenceId-to-scheduler", - SchedulerSettings.SchedulerConfig.eventStreams.entityToSchedulerTag - ) + Set(SchedulerSettings.SchedulerConfig.eventStreams.entityToSchedulerTag) case _ => Set(persistenceId) } @@ -94,7 +100,7 @@ trait GenericPaymentBehavior timers: TimerScheduler[PaymentCommand] )(implicit context: ActorContext[PaymentCommand] - ): Effect[PaymentEvent, Option[PaymentAccount]] = { + ): Effect[ExternalSchedulerEvent, Option[PaymentAccount]] = { implicit val system: ActorSystem[_] = context.system implicit val log: Logger = context.log command match { @@ -226,7 +232,7 @@ trait GenericPaymentBehavior preRegisterCard(Some(userId), currency, user.externalUuid) match { case Some(cardPreRegistration) => keyValueDao.addKeyValue(cardPreRegistration.id, entityId) - val walletEvents: List[PaymentEvent] = + val walletEvents: List[ExternalSchedulerEvent] = if (registerWallet) { broadcastEvent( WalletRegisteredEvent.defaultInstance @@ -1861,7 +1867,7 @@ trait GenericPaymentBehavior updatedPaymentAccount = updatedPaymentAccount.resetBankAccountId(Some(bankAccountId)) - var events: List[PaymentEvent] = List.empty + var events: List[ExternalSchedulerEvent] = List.empty if (shouldCreateBankAccount) { updatedPaymentAccount = updatedPaymentAccount @@ -2185,7 +2191,7 @@ trait GenericPaymentBehavior }) match { case Some(declaration) => import cmd._ - var events: List[PaymentEvent] = List.empty + var events: List[ExternalSchedulerEvent] = List.empty val lastUpdated = now() @@ -2296,7 +2302,7 @@ trait GenericPaymentBehavior declaration.status } } - var events: List[PaymentEvent] = List.empty + var events: List[ExternalSchedulerEvent] = List.empty val lastUpdated = now() var updatedDeclaration = declaration.withStatus(internalStatus) @@ -2367,7 +2373,7 @@ trait GenericPaymentBehavior case Some(paymentAccount) => val lastUpdated = now() - var events: List[PaymentEvent] = + var events: List[ExternalSchedulerEvent] = broadcastEvent( PaymentAccountStatusUpdatedEvent.defaultInstance .withExternalUuid(paymentAccount.externalUuid) @@ -2468,7 +2474,7 @@ trait GenericPaymentBehavior var updatedPaymentAccount = paymentAccount .copy(bankAccount = None) .withLastUpdated(lastUpdated) - var events: List[PaymentEvent] = { + var events: List[ExternalSchedulerEvent] = { broadcastEvent( BankAccountDeletedEvent.defaultInstance .withExternalUuid(paymentAccount.externalUuid) @@ -2681,21 +2687,23 @@ trait GenericPaymentBehavior nextFeesAmount = cmd.nextFeesAmount ) import app.softnetwork.time._ - val nextDirectDebit: List[ScheduleForPaymentAdded] = + val nextDirectDebit: List[ExternalEntityToSchedulerEvent] = recurringPayment.nextPaymentDate.map(_.toDate) match { case Some(value) => recurringPayment = recurringPayment.withNextRecurringPaymentDate(value) List( - ScheduleForPaymentAdded( - AddSchedule( - Schedule( - persistenceId, - entityId, - s"$nextRecurringPayment#${recurringPayment.getId}", - 1, - Some(false), - Some(value), - None + ExternalEntityToSchedulerEvent( + ExternalEntityToSchedulerEvent.Wrapped.AddSchedule( + AddSchedule( + Schedule( + persistenceId, + entityId, + s"$nextRecurringPayment#${recurringPayment.getId}", + 1, + Some(false), + Some(value), + None + ) ) ) ) @@ -2768,11 +2776,13 @@ trait GenericPaymentBehavior ) ++ { if (result.status.isEnded) { // cancel scheduled payIn for recurring card payment List( - ScheduleForPaymentRemoved( - RemoveSchedule( - persistenceId, - entityId, - s"$nextRecurringPayment#${cmd.recurringPayInRegistrationId}" + ExternalEntityToSchedulerEvent( + ExternalEntityToSchedulerEvent.Wrapped.RemoveSchedule( + RemoveSchedule( + persistenceId, + entityId, + s"$nextRecurringPayment#${cmd.recurringPayInRegistrationId}" + ) ) ) ) @@ -2820,7 +2830,7 @@ trait GenericPaymentBehavior feesAmount: Int, currency: String, reason: String - )(implicit context: ActorContext[_]): Effect[PaymentEvent, Option[PaymentAccount]] = { + )(implicit context: ActorContext[_]): Effect[ExternalSchedulerEvent, Option[PaymentAccount]] = { Effect .persist( broadcastEvent( @@ -2838,25 +2848,29 @@ trait GenericPaymentBehavior ) :+ { recurringPayment.nextRecurringPaymentDate match { case Some(value) => - ScheduleForPaymentAdded( - AddSchedule( - Schedule( - persistenceId, - entityId, - s"$nextRecurringPayment#${recurringPayment.getId}", - 1, - Some(false), - Some(value), - None + ExternalEntityToSchedulerEvent( + ExternalEntityToSchedulerEvent.Wrapped.AddSchedule( + AddSchedule( + Schedule( + persistenceId, + entityId, + s"$nextRecurringPayment#${recurringPayment.getId}", + 1, + Some(false), + Some(value), + None + ) ) ) ) case _ => - ScheduleForPaymentRemoved( - RemoveSchedule( - persistenceId, - entityId, - s"$nextRecurringPayment#${recurringPayment.getId}" + ExternalEntityToSchedulerEvent( + ExternalEntityToSchedulerEvent.Wrapped.RemoveSchedule( + RemoveSchedule( + persistenceId, + entityId, + s"$nextRecurringPayment#${recurringPayment.getId}" + ) ) ) } @@ -2878,7 +2892,7 @@ trait GenericPaymentBehavior paymentAccount: PaymentAccount, creditedUserId: String, bankAccountId: String - )(implicit context: ActorContext[_]): Effect[PaymentEvent, Option[PaymentAccount]] = { + )(implicit context: ActorContext[_]): Effect[ExternalSchedulerEvent, Option[PaymentAccount]] = { implicit val system: ActorSystem[_] = context.system mandate(creditedAccount, creditedUserId, bankAccountId) match { case Some(mandateResult) => @@ -2932,7 +2946,7 @@ trait GenericPaymentBehavior * @return * new state */ - override def handleEvent(state: Option[PaymentAccount], event: PaymentEvent)(implicit + override def handleEvent(state: Option[PaymentAccount], event: ExternalSchedulerEvent)(implicit context: ActorContext[_] ): Option[PaymentAccount] = event match { @@ -2983,7 +2997,10 @@ trait GenericPaymentBehavior paymentAccount: PaymentAccount, recurringPayment: RecurringPayment, transaction: Transaction - )(implicit system: ActorSystem[_], log: Logger): Effect[PaymentEvent, Option[PaymentAccount]] = { + )(implicit + system: ActorSystem[_], + log: Logger + ): Effect[ExternalSchedulerEvent, Option[PaymentAccount]] = { keyValueDao.addKeyValue( transaction.id, entityId @@ -3077,25 +3094,29 @@ trait GenericPaymentBehavior ) :+ { updatedRecurringPayment.nextRecurringPaymentDate match { case Some(value) => - ScheduleForPaymentAdded( - AddSchedule( - Schedule( - persistenceId, - entityId, - s"$nextRecurringPayment#${recurringPayment.getId}", - 1, - Some(false), - Some(value), - None + ExternalEntityToSchedulerEvent( + ExternalEntityToSchedulerEvent.Wrapped.AddSchedule( + AddSchedule( + Schedule( + persistenceId, + entityId, + s"$nextRecurringPayment#${recurringPayment.getId}", + 1, + Some(false), + Some(value), + None + ) ) ) ) case _ => - ScheduleForPaymentRemoved( - RemoveSchedule( - persistenceId, - entityId, - s"$nextRecurringPayment#${recurringPayment.getId}" + ExternalEntityToSchedulerEvent( + ExternalEntityToSchedulerEvent.Wrapped.RemoveSchedule( + RemoveSchedule( + persistenceId, + entityId, + s"$nextRecurringPayment#${recurringPayment.getId}" + ) ) ) } @@ -3145,25 +3166,29 @@ trait GenericPaymentBehavior ) :+ { recurringPayment.nextRecurringPaymentDate match { case Some(value) => - ScheduleForPaymentAdded( - AddSchedule( - Schedule( - persistenceId, - entityId, - s"$nextRecurringPayment#${recurringPayment.getId}", - 1, - Some(false), - Some(value), - None + ExternalEntityToSchedulerEvent( + ExternalEntityToSchedulerEvent.Wrapped.AddSchedule( + AddSchedule( + Schedule( + persistenceId, + entityId, + s"$nextRecurringPayment#${recurringPayment.getId}", + 1, + Some(false), + Some(value), + None + ) ) ) ) case _ => - ScheduleForPaymentRemoved( - RemoveSchedule( - persistenceId, - entityId, - s"$nextRecurringPayment#${recurringPayment.getId}" + ExternalEntityToSchedulerEvent( + ExternalEntityToSchedulerEvent.Wrapped.RemoveSchedule( + RemoveSchedule( + persistenceId, + entityId, + s"$nextRecurringPayment#${recurringPayment.getId}" + ) ) ) } @@ -3199,7 +3224,10 @@ trait GenericPaymentBehavior paymentAccount: PaymentAccount, registerCard: Boolean, transaction: Transaction - )(implicit system: ActorSystem[_], log: Logger): Effect[PaymentEvent, Option[PaymentAccount]] = { + )(implicit + system: ActorSystem[_], + log: Logger + ): Effect[ExternalSchedulerEvent, Option[PaymentAccount]] = { keyValueDao.addKeyValue( transaction.id, entityId @@ -3225,7 +3253,7 @@ trait GenericPaymentBehavior case _ => if (transaction.status.isTransactionSucceeded || transaction.status.isTransactionCreated) { log.debug("Order-{} paid in: {} -> {}", orderUuid, transaction.id, asJson(transaction)) - val registerCardEvents: List[PaymentEvent] = + val registerCardEvents: List[ExternalSchedulerEvent] = if (registerCard) { transaction.cardId match { case Some(cardId) => @@ -3307,7 +3335,10 @@ trait GenericPaymentBehavior paymentAccount: PaymentAccount, registerCard: Boolean, transaction: Transaction - )(implicit system: ActorSystem[_], log: Logger): Effect[PaymentEvent, Option[PaymentAccount]] = { + )(implicit + system: ActorSystem[_], + log: Logger + ): Effect[ExternalSchedulerEvent, Option[PaymentAccount]] = { keyValueDao.addKeyValue( transaction.id, entityId @@ -3342,7 +3373,7 @@ trait GenericPaymentBehavior transaction.id, asJson(transaction) ) - val registerCardEvents: List[PaymentEvent] = + val registerCardEvents: List[ExternalSchedulerEvent] = if (registerCard) { transaction.cardId match { case Some(cardId) => @@ -3419,8 +3450,10 @@ trait GenericPaymentBehavior document: KycDocument, documentId: String, maybeStatus: Option[KycDocument.KycDocumentStatus] = None - )(implicit system: ActorSystem[_]): (KycDocumentValidationReport, List[PaymentEvent]) = { - var events: List[PaymentEvent] = List.empty + )(implicit + system: ActorSystem[_] + ): (KycDocumentValidationReport, List[ExternalSchedulerEvent]) = { + var events: List[ExternalSchedulerEvent] = List.empty val lastUpdated = now() val userId = paymentAccount.userId.getOrElse("") diff --git a/mangopay/api/build.sbt b/mangopay/api/build.sbt index 3dea5b9..517ed58 100644 --- a/mangopay/api/build.sbt +++ b/mangopay/api/build.sbt @@ -28,5 +28,6 @@ organization := "app.softnetwork.payment" name := "mangopay-api" libraryDependencies ++= Seq( - "app.softnetwork.persistence" %% "persistence-jdbc" % Versions.genericPersistence + "app.softnetwork.scheduler" %% "scheduler-api" % Versions.scheduler, + "app.softnetwork.persistence" %% "akka-persistence-jdbc" % Versions.genericPersistence ) diff --git a/mangopay/api/src/main/scala/app/softnetwork/payment/api/MangoPayApi.scala b/mangopay/api/src/main/scala/app/softnetwork/payment/api/MangoPayApi.scala index 9182f0e..ff3055b 100644 --- a/mangopay/api/src/main/scala/app/softnetwork/payment/api/MangoPayApi.scala +++ b/mangopay/api/src/main/scala/app/softnetwork/payment/api/MangoPayApi.scala @@ -13,14 +13,13 @@ import app.softnetwork.payment.persistence.typed.{GenericPaymentBehavior, MangoP import app.softnetwork.payment.service.{GenericPaymentService, MangoPayPaymentService} import app.softnetwork.persistence.jdbc.query.JdbcSchema.SchemaType import app.softnetwork.persistence.jdbc.query.{JdbcJournalProvider, JdbcSchema, JdbcSchemaProvider} -import app.softnetwork.scheduler.handlers.SchedulerHandler -import app.softnetwork.scheduler.persistence.query.Entity2SchedulerProcessorStream +import app.softnetwork.scheduler.config.SchedulerSettings import scala.concurrent.Future trait MangoPayApi extends PaymentApplication with JdbcSchemaProvider { - def jdbcSchemaType: SchemaType = this.schemaType + def internalSchemaType: SchemaType = this.schemaType override def paymentAccountBehavior: ActorSystem[_] => GenericPaymentBehavior = _ => MangoPayPaymentBehavior @@ -32,28 +31,18 @@ trait MangoPayApi extends PaymentApplication with JdbcSchemaProvider { with JdbcJournalProvider with JdbcSchemaProvider { override implicit def system: ActorSystem[_] = sys - override def schemaType: JdbcSchema.SchemaType = jdbcSchemaType + override def schemaType: JdbcSchema.SchemaType = internalSchemaType } - override def entity2SchedulerProcessorStream: ActorSystem[_] => Entity2SchedulerProcessorStream = - sys => - new Entity2SchedulerProcessorStream() - with SchedulerHandler - with JdbcJournalProvider - with JdbcSchemaProvider { - override lazy val schemaType: JdbcSchema.SchemaType = jdbcSchemaType - override implicit def system: ActorSystem[_] = sys - } - override def scheduler2PaymentProcessorStream : ActorSystem[_] => Scheduler2PaymentProcessorStream = sys => new Scheduler2PaymentProcessorStream with MangoPayPaymentHandler with JdbcJournalProvider with JdbcSchemaProvider { - override val tag: String = s"${MangoPayPaymentBehavior.persistenceId}-scheduler" + override val tag: String = SchedulerSettings.tag(MangoPayPaymentBehavior.persistenceId) override implicit def system: ActorSystem[_] = sys - override def schemaType: JdbcSchema.SchemaType = jdbcSchemaType + override def schemaType: JdbcSchema.SchemaType = internalSchemaType } override def paymentService: ActorSystem[_] => GenericPaymentService = sys => diff --git a/mangopay/api/src/main/scala/app/softnetwork/payment/api/MangoPayWithSchedulerApi.scala b/mangopay/api/src/main/scala/app/softnetwork/payment/api/MangoPayWithSchedulerApi.scala new file mode 100644 index 0000000..7a610fa --- /dev/null +++ b/mangopay/api/src/main/scala/app/softnetwork/payment/api/MangoPayWithSchedulerApi.scala @@ -0,0 +1,42 @@ +package app.softnetwork.payment.api + +import akka.actor.typed.ActorSystem +import akka.http.scaladsl.model.{HttpRequest, HttpResponse} +import app.softnetwork.persistence.jdbc.query.{JdbcJournalProvider, JdbcSchema, JdbcSchemaProvider} +import app.softnetwork.persistence.launch.PersistentEntity +import app.softnetwork.persistence.query.EventProcessorStream +import app.softnetwork.scheduler.api.{SchedulerApi, SchedulerServiceApiHandler} +import app.softnetwork.scheduler.handlers.SchedulerHandler +import app.softnetwork.scheduler.persistence.query.Entity2SchedulerProcessorStream + +import scala.concurrent.Future + +trait MangoPayWithSchedulerApi extends MangoPayApi with SchedulerApi { + + override def entity2SchedulerProcessorStream: ActorSystem[_] => Entity2SchedulerProcessorStream = + sys => + new Entity2SchedulerProcessorStream() + with SchedulerHandler + with JdbcJournalProvider + with JdbcSchemaProvider { + override lazy val schemaType: JdbcSchema.SchemaType = jdbcSchemaType + override implicit def system: ActorSystem[_] = sys + } + + override def entities: ActorSystem[_] => Seq[PersistentEntity[_, _, _, _]] = sys => + schedulerEntities(sys) ++ sessionEntities(sys) ++ paymentEntities(sys) + + override def eventProcessorStreams: ActorSystem[_] => Seq[EventProcessorStream[_]] = sys => + schedulerEventProcessorStreams(sys) ++ + paymentEventProcessorStreams(sys) + + /*override def initSystem: ActorSystem[_] => Unit = system => { + initSchedulerSystem(system) + }*/ + + override def grpcServices + : ActorSystem[_] => Seq[PartialFunction[HttpRequest, Future[HttpResponse]]] = system => + Seq( + PaymentServiceApiHandler.partial(MangoPayServer(system))(system) + ) :+ SchedulerServiceApiHandler.partial(schedulerServer(system))(system) +} diff --git a/mangopay/api/src/main/scala/app/softnetwork/payment/api/MangoPayWithSchedulerPostgresLauncher.scala b/mangopay/api/src/main/scala/app/softnetwork/payment/api/MangoPayWithSchedulerPostgresLauncher.scala new file mode 100644 index 0000000..2565535 --- /dev/null +++ b/mangopay/api/src/main/scala/app/softnetwork/payment/api/MangoPayWithSchedulerPostgresLauncher.scala @@ -0,0 +1,7 @@ +package app.softnetwork.payment.api + +import app.softnetwork.persistence.jdbc.query.PostgresSchemaProvider + +object MangoPayWithSchedulerPostgresLauncher + extends MangoPayWithSchedulerApi + with PostgresSchemaProvider diff --git a/project/src/main/scala/app/softnetwork/sbt/build/Versions.scala b/project/src/main/scala/app/softnetwork/sbt/build/Versions.scala index c6af51f..2c8a4a1 100644 --- a/project/src/main/scala/app/softnetwork/sbt/build/Versions.scala +++ b/project/src/main/scala/app/softnetwork/sbt/build/Versions.scala @@ -4,7 +4,7 @@ object Versions { val genericPersistence = "0.2.5.15" - val scheduler = "0.1.4" + val scheduler = "0.2.1" val server = "0.2.6.2" diff --git a/testkit/src/main/scala/app/softnetwork/payment/api/PaymentGrpcServer.scala b/testkit/src/main/scala/app/softnetwork/payment/api/PaymentGrpcServer.scala index eff2446..1f9d51d 100644 --- a/testkit/src/main/scala/app/softnetwork/payment/api/PaymentGrpcServer.scala +++ b/testkit/src/main/scala/app/softnetwork/payment/api/PaymentGrpcServer.scala @@ -3,11 +3,12 @@ package app.softnetwork.payment.api import akka.http.scaladsl.testkit.PersistenceScalatestGrpcTest import app.softnetwork.payment.launch.PaymentGuardian import app.softnetwork.persistence.scalatest.InMemoryPersistenceTestKit +import app.softnetwork.scheduler.launch.SchedulerGuardian import org.scalatest.Suite trait PaymentGrpcServer extends PersistenceScalatestGrpcTest with PaymentGrpcServices - with InMemoryPersistenceTestKit { _: Suite with PaymentGuardian => + with InMemoryPersistenceTestKit { _: Suite with PaymentGuardian with SchedulerGuardian => override lazy val additionalConfig: String = paymentGrpcConfig } diff --git a/testkit/src/main/scala/app/softnetwork/payment/api/PaymentGrpcServices.scala b/testkit/src/main/scala/app/softnetwork/payment/api/PaymentGrpcServices.scala index c91d243..8b2e363 100644 --- a/testkit/src/main/scala/app/softnetwork/payment/api/PaymentGrpcServices.scala +++ b/testkit/src/main/scala/app/softnetwork/payment/api/PaymentGrpcServices.scala @@ -5,22 +5,23 @@ import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import app.softnetwork.api.server.scalatest.ServerTestKit import app.softnetwork.payment.launch.PaymentGuardian import app.softnetwork.scheduler.api.SchedulerGrpcServices +import app.softnetwork.scheduler.launch.SchedulerGuardian import scala.concurrent.Future trait PaymentGrpcServices extends SchedulerGrpcServices { - _: PaymentGuardian with ServerTestKit => + _: PaymentGuardian with SchedulerGuardian with ServerTestKit => override def grpcServices - : ActorSystem[_] => Seq[PartialFunction[HttpRequest, Future[HttpResponse]]] = - paymentGrpcServices + : ActorSystem[_] => Seq[PartialFunction[HttpRequest, Future[HttpResponse]]] = system => + paymentGrpcServices(system) ++ schedulerGrpcServices(system) def paymentGrpcServices : ActorSystem[_] => Seq[PartialFunction[HttpRequest, Future[HttpResponse]]] = system => Seq( PaymentServiceApiHandler.partial(MockPaymentServer(system))(system) - ) ++ schedulerGrpcServices(system) + ) def paymentGrpcConfig: String = schedulerGrpcConfig + s""" |# Important: enable HTTP/2 in ActorSystem's config diff --git a/testkit/src/main/scala/app/softnetwork/payment/scalatest/PaymentTestKit.scala b/testkit/src/main/scala/app/softnetwork/payment/scalatest/PaymentTestKit.scala index be574c9..18b19ab 100644 --- a/testkit/src/main/scala/app/softnetwork/payment/scalatest/PaymentTestKit.scala +++ b/testkit/src/main/scala/app/softnetwork/payment/scalatest/PaymentTestKit.scala @@ -21,7 +21,9 @@ import app.softnetwork.payment.persistence.query.{ } import app.softnetwork.payment.persistence.typed.{GenericPaymentBehavior, MockPaymentBehavior} import app.softnetwork.payment.service.{GenericPaymentService, MockPaymentService} -import app.softnetwork.persistence.query.InMemoryJournalProvider +import app.softnetwork.persistence.launch.PersistentEntity +import app.softnetwork.persistence.query.{EventProcessorStream, InMemoryJournalProvider} +import app.softnetwork.scheduler.config.SchedulerSettings import app.softnetwork.scheduler.scalatest.SchedulerTestKit import app.softnetwork.session.scalatest.{SessionServiceRoute, SessionTestKit} import org.scalatest.Suite @@ -49,7 +51,7 @@ trait PaymentTestKit extends SchedulerTestKit with PaymentGuardian { _: Suite => override def scheduler2PaymentProcessorStream : ActorSystem[_] => Scheduler2PaymentProcessorStream = sys => new Scheduler2PaymentProcessorStream with MockPaymentHandler with InMemoryJournalProvider { - override val tag: String = s"${MockPaymentBehavior.persistenceId}-scheduler" + override val tag: String = SchedulerSettings.tag(MockPaymentBehavior.persistenceId) override val forTests: Boolean = true override implicit def system: ActorSystem[_] = sys } @@ -149,6 +151,16 @@ trait PaymentTestKit extends SchedulerTestKit with PaymentGuardian { _: Suite => } } + override def entities: ActorSystem[_] => Seq[PersistentEntity[_, _, _, _]] = sys => + schedulerEntities(sys) ++ sessionEntities(sys) ++ paymentEntities(sys) + + override def eventProcessorStreams: ActorSystem[_] => Seq[EventProcessorStream[_]] = sys => + schedulerEventProcessorStreams(sys) ++ + paymentEventProcessorStreams(sys) + + /*override def initSystem: ActorSystem[_] => Unit = system => { + initSchedulerSystem(system) + }*/ } trait PaymentRouteTestKit