From 59d59e77a8f825813e46188b043abef3c1fbdc0f Mon Sep 17 00:00:00 2001 From: Marc Morera Date: Fri, 8 Oct 2021 08:45:42 +0200 Subject: [PATCH] Buses are now built inside static methods - Useful for buses creation without installing the bundle - Added an Alias for the async bus called AsyncCommandBus to force the async one. Alias for the main one. --- Bus/AsyncCommandBus.php | 23 ++ .../CompilerPass/BusCompilerPass.php | 211 ++++++++++++------ 2 files changed, 160 insertions(+), 74 deletions(-) create mode 100644 Bus/AsyncCommandBus.php diff --git a/Bus/AsyncCommandBus.php b/Bus/AsyncCommandBus.php new file mode 100644 index 0000000..d7ee6c2 --- /dev/null +++ b/Bus/AsyncCommandBus.php @@ -0,0 +1,23 @@ + + */ + +declare(strict_types=1); + +namespace Drift\CommandBus\Bus; + +/** + * Class AsyncCommandBus. + */ +class AsyncCommandBus extends CommandBus +{ +} diff --git a/DependencyInjection/CompilerPass/BusCompilerPass.php b/DependencyInjection/CompilerPass/BusCompilerPass.php index ad8d97f..2facfac 100644 --- a/DependencyInjection/CompilerPass/BusCompilerPass.php +++ b/DependencyInjection/CompilerPass/BusCompilerPass.php @@ -21,6 +21,7 @@ use Drift\CommandBus\Async\InMemoryAdapter; use Drift\CommandBus\Async\PostgreSQLAdapter; use Drift\CommandBus\Async\RedisAdapter; +use Drift\CommandBus\Bus\AsyncCommandBus; use Drift\CommandBus\Bus\CommandBus; use Drift\CommandBus\Bus\InlineCommandBus; use Drift\CommandBus\Bus\QueryBus; @@ -49,65 +50,106 @@ */ class BusCompilerPass implements CompilerPassInterface { + const BUS_TYPE_COMMAND = 'command'; + const BUS_TYPE_QUERY = 'query'; + /** * {@inheritdoc} */ public function process(ContainerBuilder $container) { - $asyncBus = $this->createAsyncMiddleware($container); - - $this->createQueryHandlerMiddleware($container); - $this->createCommandHandlerMiddleware($container); - $this->createQueryBus($container); - $this->createCommandBus($container, $asyncBus); - $this->createInlineCommandBus($container); - $this->createBusDebugger($container); + $asyncEnabled = static::createAsyncMiddleware($container); + static:: createBuses( + $container, + $asyncEnabled, + $container->getParameter('bus.query_bus.distribution'), + $container->getParameter('bus.query_bus.middlewares'), + $container->getParameter('bus.command_bus.distribution'), + $container->getParameter('bus.command_bus.middlewares') + ); - if ($asyncBus) { - $this->createCommandConsumer($container); - $this->createInfrastructureCreateCommand($container); - $this->createInfrastructureDropCommand($container); - $this->createInfrastructureCheckCommand($container); + if ($asyncEnabled) { + static::createAsyncBuses($container); } } /** - * Check for async middleware needs and return if has been created. - * * @param ContainerBuilder $container + * @param bool $asyncBus + * @param string $queryBusDistribution + * @param array $queryMiddlewares + * @param string $commandBusDistribution + * @param array $commandMiddlewares + */ + public static function createBuses( + ContainerBuilder $container, + bool $asyncBus, + string $queryBusDistribution, + array $queryMiddlewares, + string $commandBusDistribution, + array $commandMiddlewares + ) { + static::createQueryHandlerMiddleware($container); + static::createCommandHandlerMiddleware($container); + static::createQueryBus($container, $queryBusDistribution, $queryMiddlewares); + static::createCommandBus($container, $commandBusDistribution, $commandMiddlewares, $asyncBus); + static::createInlineCommandBus($container, $commandBusDistribution, $commandMiddlewares); + static::createBusDebugger($container); + } + + /** + * @param ContainerBuilder $container + */ + public static function createAsyncBuses(ContainerBuilder $container) + { + static::createCommandConsumer($container); + static::createInfrastructureCreateCommand($container); + static::createInfrastructureDropCommand($container); + static::createInfrastructureCheckCommand($container); + } + + /** + * @param ContainerBuilder $container + * @param array|null $adapter * * @return bool */ - public function createAsyncMiddleware(ContainerBuilder $container): bool - { - $asyncAdapters = $container->getParameter('bus.command_bus.async_adapter'); + public static function createAsyncMiddleware( + ContainerBuilder $container, + array $adapter = null + ): bool { + if (is_array($adapter)) { + $adapterType = $adapter['type']; + } else { + $asyncAdapters = $container->getParameter('bus.command_bus.async_adapter'); + + if ( + empty($asyncAdapters) || + ( + isset($asyncAdapters['adapter']) && + !isset($asyncAdapters[$asyncAdapters['adapter']]) + ) + ) { + return false; + } - if ( - empty($asyncAdapters) || - ( - isset($asyncAdapters['adapter']) && - !isset($asyncAdapters[$asyncAdapters['adapter']]) - ) - ) { - return false; + $adapterType = $asyncAdapters['adapter'] ?? array_key_first($asyncAdapters); + $adapterType = $container->resolveEnvPlaceholders($adapterType, true); + $adapter = $asyncAdapters[$adapterType]; } - $adapterType = $asyncAdapters['adapter'] ?? array_key_first($asyncAdapters); - $adapterType = $container->resolveEnvPlaceholders($adapterType, true); - $adapter = $asyncAdapters[$adapterType]; - switch ($adapterType) { case 'amqp': - $this->createAMQPAsyncAdapter($container, $adapter); + static::createAMQPAsyncAdapter($container, $adapter); break; case 'in_memory': - $this->createInMemoryAsyncAdapter($container); + static::createInMemoryAsyncAdapter($container); break; case 'redis': - $this->createRedisAsyncAdapter($container, $adapter); + static::createRedisAsyncAdapter($container, $adapter); break; case 'postgresql': - $this->createPostgreSQLAsyncAdapter($container, $adapter); + static::createPostgreSQLAsyncAdapter($container, $adapter); break; default: throw new Exception('Wrong adapter. Please use one of this list: amqp, in_memory, redis, postgresql.'); @@ -132,11 +174,11 @@ public function createAsyncMiddleware(ContainerBuilder $container): bool * @throws InvalidMiddlewareException * @throws ReflectionException */ - private function createQueryHandlerMiddleware(ContainerBuilder $container) + private static function createQueryHandlerMiddleware(ContainerBuilder $container) { $handlerMiddlewareId = 'bus.query_bus.handler_middleware'; $handlerMiddleware = new Definition(HandlerMiddleware::class); - $handlerMap = $this->createHandlersMap($container, 'query_handler'); + $handlerMap = static::createHandlersMap($container, 'query_handler'); foreach ($handlerMap as $command => list($reference, $method)) { $handlerMiddleware->addMethodCall('addHandler', [ @@ -155,11 +197,11 @@ private function createQueryHandlerMiddleware(ContainerBuilder $container) * @throws InvalidMiddlewareException * @throws ReflectionException */ - private function createCommandHandlerMiddleware(ContainerBuilder $container) + private static function createCommandHandlerMiddleware(ContainerBuilder $container) { $handlerMiddlewareId = 'bus.command_bus.handler_middleware'; $handlerMiddleware = new Definition(HandlerMiddleware::class); - $handlerMap = $this->createHandlersMap($container, 'command_handler'); + $handlerMap = static::createHandlersMap($container, 'command_handler'); foreach ($handlerMap as $command => list($reference, $method)) { $handlerMiddleware->addMethodCall('addHandler', [ @@ -174,18 +216,24 @@ private function createCommandHandlerMiddleware(ContainerBuilder $container) * Create query bus. * * @param ContainerBuilder $container + * @param string $queryBusDistribution + * @param array $queryMiddlewares */ - private function createQueryBus(ContainerBuilder $container) - { + private static function createQueryBus( + ContainerBuilder $container, + string $queryBusDistribution, + array $queryMiddlewares + ) { $container->setDefinition('drift.query_bus', (new Definition( QueryBus::class, [ new Reference(LoopInterface::class), - $this->createMiddlewaresArray( + static::createMiddlewaresArray( $container, - 'query', - false + $queryMiddlewares, + false, + self::BUS_TYPE_QUERY ), - $container->getParameter('bus.query_bus.distribution'), + $queryBusDistribution, ] )) ->addTag('preload') @@ -199,21 +247,27 @@ private function createQueryBus(ContainerBuilder $container) * Create command bus. * * @param ContainerBuilder $container + * @param string $commandBusDistribution + * @param array $commandMiddlewares * @param bool $asyncBus */ - private function createCommandBus( + private static function createCommandBus( ContainerBuilder $container, + string $commandBusDistribution, + array $commandMiddlewares, bool $asyncBus ) { + $class = $asyncBus ? AsyncCommandBus::class : CommandBus::class; $container->setDefinition('drift.command_bus', (new Definition( - CommandBus::class, [ + $class, [ new Reference(LoopInterface::class), - $this->createMiddlewaresArray( + static::createMiddlewaresArray( $container, - 'command', - $asyncBus + $commandMiddlewares, + $asyncBus, + self::BUS_TYPE_COMMAND ), - $container->getParameter('bus.command_bus.distribution'), + $commandBusDistribution, ] )) ->addTag('preload') @@ -221,24 +275,34 @@ private function createCommandBus( ); $container->setAlias(CommandBus::class, 'drift.command_bus'); + + if ($asyncBus) { + $container->setAlias(AsyncCommandBus::class, 'drift.command_bus'); + } } /** * Create command bus. * * @param ContainerBuilder $container + * @param string $commandBusDistribution + * @param array $commandMiddlewares */ - private function createInlineCommandBus(ContainerBuilder $container) - { + private static function createInlineCommandBus( + ContainerBuilder $container, + string $commandBusDistribution, + array $commandMiddlewares + ) { $container->setDefinition('drift.inline_command_bus', (new Definition( InlineCommandBus::class, [ new Reference(LoopInterface::class), - $this->createMiddlewaresArray( + static::createMiddlewaresArray( $container, - 'command', - false + $commandMiddlewares, + false, + self::BUS_TYPE_COMMAND ), - $container->getParameter('bus.command_bus.distribution'), + $commandBusDistribution, ] )) ->addTag('preload') @@ -249,20 +313,19 @@ private function createInlineCommandBus(ContainerBuilder $container) } /** - * Create array of middlewares by configuration. - * * @param ContainerBuilder $container - * @param string $type + * @param array $definedMiddlewares * @param bool $isAsync + * @param string $type * * @return array */ - private function createMiddlewaresArray( + private static function createMiddlewaresArray( ContainerBuilder $container, - string $type, - bool $isAsync = false + array $definedMiddlewares, + bool $isAsync, + string $type ) { - $definedMiddlewares = $container->getParameter("bus.{$type}_bus.middlewares"); $asyncFound = array_search('@async', $definedMiddlewares); $middlewares = []; @@ -276,7 +339,7 @@ private function createMiddlewaresArray( if ('@async' === $middleware) { if ( true === $isAsync && - 'command' === $type + self::BUS_TYPE_COMMAND === $type ) { $middlewares[] = new Reference(AsyncMiddleware::class); @@ -324,7 +387,7 @@ private function createMiddlewaresArray( * @throws InvalidMiddlewareException * @throws ReflectionException */ - private function createHandlersMap( + private static function createHandlersMap( ContainerBuilder $container, string $tag ): array { @@ -356,7 +419,7 @@ private function createHandlersMap( * * @param ContainerBuilder $container */ - private function createCommandConsumer(ContainerBuilder $container) + private static function createCommandConsumer(ContainerBuilder $container) { $consumer = new Definition(CommandConsumerCommand::class, [ new Reference(AsyncAdapter::class), @@ -376,7 +439,7 @@ private function createCommandConsumer(ContainerBuilder $container) * * @param ContainerBuilder $container */ - private function createBusDebugger(ContainerBuilder $container) + private static function createBusDebugger(ContainerBuilder $container) { $consumer = new Definition(DebugCommandBusCommand::class, [ new Reference('drift.command_bus'), @@ -396,7 +459,7 @@ private function createBusDebugger(ContainerBuilder $container) * * @param ContainerBuilder $container */ - private function createInfrastructureCreateCommand(ContainerBuilder $container) + private static function createInfrastructureCreateCommand(ContainerBuilder $container) { $consumer = new Definition(InfrastructureCreateCommand::class, [ new Reference(AsyncAdapter::class), @@ -415,7 +478,7 @@ private function createInfrastructureCreateCommand(ContainerBuilder $container) * * @param ContainerBuilder $container */ - private function createInfrastructureDropCommand(ContainerBuilder $container) + private static function createInfrastructureDropCommand(ContainerBuilder $container) { $consumer = new Definition(InfrastructureDropCommand::class, [ new Reference(AsyncAdapter::class), @@ -434,7 +497,7 @@ private function createInfrastructureDropCommand(ContainerBuilder $container) * * @param ContainerBuilder $container */ - private function createInfrastructureCheckCommand(ContainerBuilder $container) + private static function createInfrastructureCheckCommand(ContainerBuilder $container) { $consumer = new Definition(InfrastructureCheckCommand::class, [ new Reference(AsyncAdapter::class), @@ -457,7 +520,7 @@ private function createInfrastructureCheckCommand(ContainerBuilder $container) * * @param ContainerBuilder $container */ - private function createInMemoryAsyncAdapter(ContainerBuilder $container) + private static function createInMemoryAsyncAdapter(ContainerBuilder $container) { $container->setDefinition( AsyncAdapter::class, @@ -475,7 +538,7 @@ private function createInMemoryAsyncAdapter(ContainerBuilder $container) * @param ContainerBuilder $container * @param array $adapter */ - private function createRedisAsyncAdapter( + private static function createRedisAsyncAdapter( ContainerBuilder $container, array $adapter ) { @@ -500,7 +563,7 @@ private function createRedisAsyncAdapter( * @param ContainerBuilder $container * @param array $adapter */ - private function createPostgreSQLAsyncAdapter( + private static function createPostgreSQLAsyncAdapter( ContainerBuilder $container, array $adapter ) { @@ -527,7 +590,7 @@ private function createPostgreSQLAsyncAdapter( * @param ContainerBuilder $container * @param array $adapter */ - private function createAMQPAsyncAdapter( + private static function createAMQPAsyncAdapter( ContainerBuilder $container, array $adapter ) {