diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..5f7ce83 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,18 @@ +{ + "permissions": { + "allow": [ + "WebFetch(domain:github.com)", + "WebFetch(domain:symfony.com)", + "WebFetch(domain:nette.org)", + "Bash(make:*)", + "Bash(make:*:*)", + "Bash(grep:*)", + "Bash(cat:*)", + "Bash(ls:*)", + "Bash(gh pr:*)", + "Bash(git add:*)", + "Bash(git commit:*)", + "Bash(git fetch:*)" + ] + } +} diff --git a/.docs/README.md b/.docs/README.md index 5d78ae8..a01e040 100644 --- a/.docs/README.md +++ b/.docs/README.md @@ -81,6 +81,22 @@ messenger: debug: panel: %debugMode% + # Worker limits for production environments. + # Workers will gracefully stop when configured limit is reached. + worker: + memoryLimit: 134217728 # int|null (bytes), e.g. 128 MB + timeLimit: 3600 # int|null (seconds), e.g. 1 hour + messageLimit: 1000 # int|null, stop after N messages + failureLimit: 5 # int|null, stop after N failures + + # PSR-6 cache pool for messenger:stop-workers command and worker restart signal. + # When configured, registers StopWorkersCommand and StopWorkerOnRestartSignalListener. + cache: @cache.pool + + # Fallback bus for RoutableMessageBus. Used when no bus stamp is present on the envelope. + # Defaults to null (no fallback). Set to a bus name to enable. + fallbackBus: messageBus + # Defines buses, default one are messageBus, queryBus and commandBus. bus: messageBus: @@ -119,12 +135,14 @@ messenger: # consoleLogger: @specialLogger # Defines transport factories. + # Built-in factories (sync, inMemory, amqp, redis) are auto-registered when their classes exist. + # Doctrine factory is auto-registered when ConnectionRegistry is available in the container. transportFactory: # redis: Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory - # sync: Symfony\Component\Messenger\Transport\Sync\SyncTransportFactorya + # sync: Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory # amqp: Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory # doctrine: Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransportFactory - # inMemory: Symfony\Component\Messenger\Transport\InMemoryTransportFactory + # inMemory: Symfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory # inMemory: @customMemoryTransportFactory # Defines global failure transport. Default is none. @@ -180,6 +198,8 @@ messenger: # Defines routing (message -> transport) # If the routing for message is missing, the message will be handled by handler immediately when dispatched + # Routing can also be defined via #[AsMessage] attribute on message classes (see below). + # NEON config takes precedence over attributes. routing: App\Domain\NewUserEmail: [redis] App\Domain\ForgotPasswordEmail: [db, redis] @@ -220,6 +240,40 @@ final class SimpleMessage } ``` +#### Routing via `#[AsMessage]` attribute + +Instead of configuring routing in NEON, you can use the `#[AsMessage]` attribute directly on your message class. +The attribute-based routing is auto-discovered from handler method parameters. NEON config takes precedence over attributes. + +```php + @@ -341,8 +395,7 @@ extensions: **Roadmap** -- No fallbackBus in RoutableMessageBus. -- No debug console commands. +- No Tracy debug panel integration (`TraceableMessageBus`). ## Examples diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..da4741e --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,81 @@ +# AGENTS.md + +Nette DI integration for Symfony Messenger. Library and DI extension, not an application. Provides message buses, async transports, handler auto-discovery, retry strategies, and failure transport routing — all configured via Nette DI extension and compiled through ordered passes. + +## Stack + +- PHP >=8.2, Symfony 7.x/8.x, Nette DI ^3.1 +- PHPStan level 9 (phpVersion 80200) +- Nette Tester (`.phpt` files) +- Contributte code style (`ruleset.xml`) + +## Codebase + +- `src/DI/` — main entry point; extension delegates to ordered passes in `Pass/`, each handling one concern (serializers, transports, routing, handlers, events, logging, console, buses, debug) +- `src/Bus/` — bus wrappers (message, command, query) and registry +- `src/Container/` — PSR-11 adapters for Nette DI +- `src/Handler/` — runtime handler locator with wildcard/interface/parent matching +- `src/Logger/` — dual HTTP/console logger bridge +- `tests/Cases/` — tests grouped by concern (DI/, Bus/, E2E/) +- `tests/Mocks/` — simple DTOs and handlers used as test doubles +- `tests/Toolkit/` — container builder and test helpers + +## Architecture + +- Extension delegates to ordered passes via load -> beforeCompile -> afterCompile hooks, each handling one concern +- Pass priority: serializers/transports -> routing/handlers -> events/logging/console -> buses/debug +- Modify the responsible pass, not the extension +- Config under `messenger:` key — schema defined in extension class, update schema first, then wire into the pass +- Config values can be class names, `@service` references, or DI statements; routing/failure transports validated at compile time +- Tag names defined as extension constants; service names follow `messenger.bus..*`, `messenger.transport.`, `messenger.serializer.` — preserve these +- Handlers registered via DI tag or `#[AsMessageHandler]`, message type inferred from first parameter type-hint (`__invoke`) +- Union/intersection types rejected; handlers grouped per bus, sorted by priority +- Runtime handler matching: concrete class, parents, interfaces, namespace wildcards, `*` +- Default middleware order: bus name stamp -> dispatch after current bus -> failed message processing -> [custom] -> send -> handle — preserve this order +- Transport factories registered only when corresponding Symfony bridge class exists +- Retry defaults to multiplier; event pass wires retry/failure listeners, reuses existing event dispatcher + +## Code Style + +- `withDefaults()->withCompiler(...)->build()` +- Inline NEON config via `Helpers::neon(<<<'NEON' ... NEON)` +- Assertions: `Assert::type()`, `Assert::count()`, `Assert::equal()`, `Assert::exception()` +- DI tests verify service registration, tags, and config validation errors +- E2E tests extend `TestCase` for full dispatch->handle workflows +- File naming: `MessengerExtension.{feature}.phpt` +- Mocks are simple DTOs/handlers with public properties for assertions +- Compiled containers written to `tests/tmp` for debugging + +## Upstream + +- Docs: https://symfony.com/doc/current/messenger.html +- Source: https://github.com/symfony/messenger +- Changelog: https://github.com/symfony/messenger/blob/8.1/CHANGELOG.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..43c994c --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1 @@ +@AGENTS.md diff --git a/composer.json b/composer.json index d8687ad..3c29e54 100644 --- a/composer.json +++ b/composer.json @@ -20,27 +20,29 @@ ], "require": { "ext-json": "*", - "php": ">=8.0", + "php": ">=8.2", "nette/di": "^3.1.2", "psr/container": "^2.0.2", "psr/cache": "^2.0.0 || ^3.0.0", "psr/log": "^2.0.0 || ^3.0.0", - "symfony/messenger": "^6.0.19 || ^6.2.8", - "symfony/event-dispatcher": "^6.0.19 || ^6.2.8", - "symfony/console": "^6.0.19 || ^6.2.10", - "symfony/var-dumper": "^6.0.19 || ^6.2.10" + "symfony/messenger": "^7.4.7 || ^8.0.7", + "symfony/event-dispatcher": "^7.4.7 || ^8.0.7", + "symfony/console": "^7.4.7 || ^8.0.7", + "symfony/var-dumper": "^7.4.7 || ^8.0.7" }, "require-dev": { - "psr/container": "^2.0.2", "mockery/mockery": "^1.3.3", - "symfony/redis-messenger": "^6.0.19 || ^6.2.10", - "symfony/amqp-messenger": "^6.0.19 || ^6.2.8", - "symfony/doctrine-messenger": "^6.0.19 || ^6.2.10", + "symfony/redis-messenger": "^7.4.7 || ^8.0.7", + "symfony/amqp-messenger": "^7.4.7 || ^8.0.7", + "symfony/doctrine-messenger": "^7.4.7 || ^8.0.7", + "symfony/cache": "^7.4.7 || ^8.0.7", + "doctrine/dbal": "^4.4.2", + "doctrine/orm": "^3.6.2", "contributte/console": "^0.10.0", "contributte/event-dispatcher": "^0.9.0", "contributte/qa": "^0.4.0", - "contributte/tester": "^0.3.0", - "contributte/phpstan": "^0.1.0", + "contributte/tester": "^0.4.0", + "contributte/phpstan": "^0.3.0", "tracy/tracy": "^2.10.2" }, "autoload": { diff --git a/phpstan.neon b/phpstan.neon index 485bc1c..517b83c 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -3,7 +3,7 @@ includes: parameters: level: 9 - phpVersion: 80000 + phpVersion: 80200 scanDirectories: - src @@ -21,25 +21,7 @@ parameters: count: 1 path: src/DI/Pass/HandlerPass.php - - - message: """ - #^Fetching class constant class of deprecated class Symfony\\\\Component\\\\Messenger\\\\Handler\\\\MessageHandlerInterface\\: - since Symfony 6\\.2, use the \\{@see AsMessageHandler\\} attribute instead$# - """ - count: 1 - path: src/DI/Pass/HandlerPass.php - - - - message: "#^Class ReflectionIntersectionType not found\\.$#" - count: 1 - path: src/DI/Utils/Reflector.php - - message: "#^Dead catch \\- ReflectionException is never thrown in the try block\\.$#" count: 1 - path: src/DI/Utils/Reflector.php - - - - message: "#^PHPDoc tag @var for variable \\$type contains unknown class ReflectionIntersectionType\\.$#" - count: 1 - path: src/DI/Utils/Reflector.php + path: src/DI/Utils/Reflector.php \ No newline at end of file diff --git a/ruleset.xml b/ruleset.xml index db36657..eb05e7a 100644 --- a/ruleset.xml +++ b/ruleset.xml @@ -1,7 +1,7 @@ - + diff --git a/src/DI/MessengerExtension.php b/src/DI/MessengerExtension.php index f144706..ce92058 100644 --- a/src/DI/MessengerExtension.php +++ b/src/DI/MessengerExtension.php @@ -13,8 +13,8 @@ use Contributte\Messenger\DI\Pass\SerializerPass; use Contributte\Messenger\DI\Pass\TransportFactoryPass; use Contributte\Messenger\DI\Pass\TransportPass; -use Nette\DI\Definitions\Statement; use Nette\DI\CompilerExtension; +use Nette\DI\Definitions\Statement; use Nette\PhpGenerator\ClassType; use Nette\Schema\Expect; use Nette\Schema\Schema; @@ -61,9 +61,9 @@ public function __construct() public function getConfigSchema(): Schema { - $expectClass = Expect::string()->required()->assert(fn ($input) => class_exists($input) || interface_exists($input)); + $expectClass = Expect::string()->required()->assert(fn ($input) => is_string($input) && (class_exists($input) || interface_exists($input))); $expectService = Expect::anyOf( - Expect::string()->required()->assert(fn ($input) => str_starts_with($input, '@') || class_exists($input) || interface_exists($input)), + Expect::string()->required()->assert(fn ($input) => is_string($input) && (str_starts_with($input, '@') || class_exists($input) || interface_exists($input))), Expect::type(Statement::class)->required(), ); $expectLoosyService = Expect::anyOf( @@ -75,14 +75,24 @@ public function getConfigSchema(): Schema 'debug' => Expect::structure([ 'panel' => Expect::bool(false), ]), + 'worker' => Expect::structure([ + 'memoryLimit' => Expect::int()->nullable(), + 'timeLimit' => Expect::int()->nullable(), + 'messageLimit' => Expect::int()->nullable(), + 'failureLimit' => Expect::int()->nullable(), + ]), + 'cache' => Expect::anyOf( + Expect::string()->required(), + Expect::type(Statement::class)->required(), + )->nullable(), + 'fallbackBus' => Expect::string()->nullable(), 'bus' => Expect::arrayOf( Expect::structure([ 'defaultMiddlewares' => Expect::bool(true), 'middlewares' => Expect::arrayOf((clone $expectService)), 'allowNoHandlers' => Expect::bool(false), 'allowNoSenders' => Expect::bool(true), - 'autowired' => Expect::bool(), - 'class' => (clone $expectClass)->required(false)->assert(fn ($input) => is_subclass_of($input, MessageBusInterface::class), 'Specified bus class must implements "MessageBusInterface"'), + 'class' => (clone $expectClass)->required(false)->assert(fn ($input) => is_string($input) && is_subclass_of($input, MessageBusInterface::class), 'Specified bus class must implements "MessageBusInterface"'), 'wrapper' => (clone $expectClass)->required(false), ]), Expect::string()->required(), @@ -91,7 +101,6 @@ public function getConfigSchema(): Schema 'defaultMiddlewares' => true, 'middlewares' => [], 'class' => null, - 'autowired' => true, 'allowNoHandlers' => false, 'allowNoSenders' => true, ], diff --git a/src/DI/Pass/AbstractPass.php b/src/DI/Pass/AbstractPass.php index 610c00d..966f64f 100644 --- a/src/DI/Pass/AbstractPass.php +++ b/src/DI/Pass/AbstractPass.php @@ -51,7 +51,7 @@ public function getContainerBuilder(): ContainerBuilder return $this->extension->getContainerBuilder(); } - public function getConfig(): stdclass + public function getConfig(): stdClass { /** @var stdclass $ret */ $ret = (object) $this->extension->getConfig(); diff --git a/src/DI/Pass/BusPass.php b/src/DI/Pass/BusPass.php index f848096..0cc9c74 100644 --- a/src/DI/Pass/BusPass.php +++ b/src/DI/Pass/BusPass.php @@ -77,10 +77,10 @@ public function loadPassConfiguration(): void ->addSetup('setLogger', [$this->prefix('@logger.logger')]); } - // Register message bus + // Register message bus (not autowired — RoutableMessageBus is the autowired MessageBusInterface) $builder->addDefinition($this->prefix(sprintf('bus.%s.bus', $name))) ->setFactory($busConfig->class ?? SymfonyMessageBus::class, [$middlewares]) - ->setAutowired($busConfig->autowired ?? count($builder->findByTag(MessengerExtension::BUS_TAG)) === 0) + ->setAutowired(false) ->setTags([MessengerExtension::BUS_TAG => $name]); // Register bus wrapper @@ -95,10 +95,16 @@ public function loadPassConfiguration(): void ->setFactory(NetteContainer::class) ->setAutowired(false); - // Register routable bus (for CLI) + // Register routable bus — autowired as MessageBusInterface so that + // SyncTransport (and any other autowired consumer) routes through + // the correct bus based on BusNameStamp. + $fallbackBusName = $config->fallbackBus ?? null; $builder->addDefinition($this->prefix('bus.routable')) - ->setFactory(RoutableMessageBus::class, [$this->prefix('@bus.container')]) // @TODO fallbackBus - ->setAutowired(false); + ->setFactory(RoutableMessageBus::class, [ + $this->prefix('@bus.container'), + $fallbackBusName !== null ? $this->prefix(sprintf('@bus.%s.bus', $fallbackBusName)) : null, + ]) + ->setAutowired(true); // Register bus registry $builder->addDefinition($this->prefix('busRegistry')) diff --git a/src/DI/Pass/ConsolePass.php b/src/DI/Pass/ConsolePass.php index a3e6fcd..ee8f275 100644 --- a/src/DI/Pass/ConsolePass.php +++ b/src/DI/Pass/ConsolePass.php @@ -33,15 +33,13 @@ public function loadPassConfiguration(): void ]); $builder->addDefinition($this->extension->prefix('console.debugCommand')) - ->setFactory(DebugCommand::class, [[]]); // @TODO mapping + ->setFactory(DebugCommand::class, [[]]); $builder->addDefinition($this->extension->prefix('console.setupTransportsCommand')) - ->setFactory(SetupTransportsCommand::class, [$this->prefix('@transport.container'), []]); // @TODO transportNames + ->setFactory(SetupTransportsCommand::class, [$this->prefix('@transport.container'), []]); - if (class_exists(StatsCommand::class)) { - $builder->addDefinition($this->extension->prefix('console.statsCommand')) - ->setFactory(StatsCommand::class, [$this->prefix('@transport.container'), []]); // @TODO transportNames - } + $builder->addDefinition($this->extension->prefix('console.statsCommand')) + ->setFactory(StatsCommand::class, [$this->prefix('@transport.container'), []]); $builder->addDefinition($this->extension->prefix('console.failedMessageRemoveCommand')) ->setFactory(FailedMessagesRemoveCommand::class, [ @@ -67,10 +65,9 @@ public function loadPassConfiguration(): void $this->prefix('@serializer.default'), ]); - if (PHP_VERSION === 'fake') { - // TODO + if ($config->cache !== null) { $builder->addDefinition($this->extension->prefix('console.stopWorkersCommand')) - ->setFactory(StopWorkersCommand::class); + ->setFactory(StopWorkersCommand::class, [$config->cache]); } } @@ -80,10 +77,23 @@ public function loadPassConfiguration(): void public function beforePassCompile(): void { $builder = $this->getContainerBuilder(); + $builderMan = BuilderMan::of($this); + + // Transport names + $transportNames = array_keys($builderMan->getTransports()); /** @var ServiceDefinition $setupTransportsCommandDef */ $setupTransportsCommandDef = $builder->getDefinition($this->prefix('console.setupTransportsCommand')); - $setupTransportsCommandDef->setArgument(1, array_keys(BuilderMan::of($this)->getTransports())); + $setupTransportsCommandDef->setArgument(1, $transportNames); + + /** @var ServiceDefinition $statsCommandDef */ + $statsCommandDef = $builder->getDefinition($this->prefix('console.statsCommand')); + $statsCommandDef->setArgument(1, $transportNames); + + // Handler mapping + /** @var ServiceDefinition $debugCommandDef */ + $debugCommandDef = $builder->getDefinition($this->prefix('console.debugCommand')); + $debugCommandDef->setArgument(0, $builderMan->getHandlerMapping()); } } diff --git a/src/DI/Pass/EventPass.php b/src/DI/Pass/EventPass.php index c56926f..76419c1 100644 --- a/src/DI/Pass/EventPass.php +++ b/src/DI/Pass/EventPass.php @@ -10,10 +10,15 @@ use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\Messenger\EventListener\AddErrorDetailsStampListener; use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener; +use Symfony\Component\Messenger\EventListener\ResetMemoryUsageListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener; -use Symfony\Component\Messenger\EventListener\StopWorkerOnSignalsListener; -use Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnCustomStopExceptionListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener; class EventPass extends AbstractPass { @@ -96,14 +101,62 @@ public function beforePassCompile(): void ]), ]); - // Stop on signal - if (class_exists(StopWorkerOnSignalsListener::class)) { + // Custom stop exception + $dispatcher->addSetup('addSubscriber', [ + new Statement(StopWorkerOnCustomStopExceptionListener::class), + ]); + + // Reset memory usage + $dispatcher->addSetup('addSubscriber', [ + new Statement(ResetMemoryUsageListener::class), + ]); + + // Worker limits + $config = $this->getConfig(); + + if ($config->worker->memoryLimit !== null) { + $dispatcher->addSetup('addSubscriber', [ + new Statement(StopWorkerOnMemoryLimitListener::class, [ + $config->worker->memoryLimit, + $this->prefix('@logger.logger'), + ]), + ]); + } + + if ($config->worker->timeLimit !== null) { $dispatcher->addSetup('addSubscriber', [ - new Statement(StopWorkerOnSignalsListener::class, [null, $this->prefix('@logger.logger')]), + new Statement(StopWorkerOnTimeLimitListener::class, [ + $config->worker->timeLimit, + $this->prefix('@logger.logger'), + ]), ]); - } elseif (class_exists(StopWorkerOnSigtermSignalListener::class)) { + } + + if ($config->worker->messageLimit !== null) { + $dispatcher->addSetup('addSubscriber', [ + new Statement(StopWorkerOnMessageLimitListener::class, [ + $config->worker->messageLimit, + $this->prefix('@logger.logger'), + ]), + ]); + } + + if ($config->worker->failureLimit !== null) { + $dispatcher->addSetup('addSubscriber', [ + new Statement(StopWorkerOnFailureLimitListener::class, [ + $config->worker->failureLimit, + $this->prefix('@logger.logger'), + ]), + ]); + } + + // Restart signal (requires cache) + if ($config->cache !== null) { $dispatcher->addSetup('addSubscriber', [ - new Statement(StopWorkerOnSigtermSignalListener::class, [$this->prefix('@logger.logger')]), // @phpstan-ignore-line + new Statement(StopWorkerOnRestartSignalListener::class, [ + $config->cache, + $this->prefix('@logger.logger'), + ]), ]); } } diff --git a/src/DI/Pass/HandlerPass.php b/src/DI/Pass/HandlerPass.php index 379fcbb..564d1d1 100644 --- a/src/DI/Pass/HandlerPass.php +++ b/src/DI/Pass/HandlerPass.php @@ -3,13 +3,13 @@ namespace Contributte\Messenger\DI\Pass; use Contributte\Messenger\DI\MessengerExtension; +use Contributte\Messenger\DI\Utils\BuilderMan; use Contributte\Messenger\DI\Utils\Reflector; use Contributte\Messenger\Exception\LogicalException; use Nette\DI\Definitions\Definition; use Nette\DI\Definitions\ServiceDefinition; use ReflectionClass; use ReflectionException; -use Symfony\Component\Messenger\Handler\MessageHandlerInterface; use function array_merge; use function is_numeric; use function is_string; @@ -41,7 +41,7 @@ public function beforePassCompile(): void $handlers = []; // Collect all message handlers from DIC - $serviceHandlers = $this->getMessageHandlers(); + $serviceHandlers = BuilderMan::of($this)->getHandlerServiceNames(); // Iterate all found handlers foreach ($serviceHandlers as $serviceName) { @@ -87,44 +87,6 @@ public function beforePassCompile(): void } } - /** - * @return array - */ - private function getMessageHandlers(): array - { - $builder = $this->getContainerBuilder(); - - // Find all handlers - $serviceHandlers = []; - $serviceHandlers = array_merge($serviceHandlers, array_keys($builder->findByTag(MessengerExtension::HANDLER_TAG))); - $serviceHandlers = array_merge($serviceHandlers, array_keys($builder->findByType(MessageHandlerInterface::class))); - - foreach ($builder->getDefinitions() as $definition) { - /** @var class-string $class */ - $class = $definition->getType(); - - // Skip definitions without type - if ($definition->getType() === null) { - continue; - } - - // Skip definitions without name - if ($definition->getName() === null) { - continue; - } - - // Skip services without attribute - if (Reflector::getMessageHandlers($class) === []) { - continue; - } - - $serviceHandlers[] = $definition->getName(); - } - - // Clean duplicates - return array_unique($serviceHandlers); - } - /** * @return listgetContainerBuilder(); - $config = $this->getConfig(); $builder->addDefinition($this->prefix('routing.locator')) - ->setFactory(SendersLocator::class, [$config->routing, $this->prefix('@transport.container')]); + ->setFactory(SendersLocator::class, [[], $this->prefix('@transport.container')]); } /** @@ -26,14 +28,66 @@ public function loadPassConfiguration(): void */ public function beforePassCompile(): void { + $builder = $this->getContainerBuilder(); $config = $this->getConfig(); - $transports = array_values($this->getContainerBuilder()->findByTag(MessengerExtension::TRANSPORT_TAG)); + $transports = array_values($builder->findByTag(MessengerExtension::TRANSPORT_TAG)); + + // Scan message classes for #[AsMessage] attribute routing + $attributeRouting = $this->discoverAttributeRouting(); - foreach ($config->routing as $routingEntity => $routingTransports) { + // Merge: NEON config takes precedence over attributes + $routing = array_merge($attributeRouting, (array) $config->routing); + + // Validate + foreach ($routing as $routingEntity => $routingTransports) { if (($diff = array_diff($routingTransports, $transports)) !== []) { throw new LogicalException(sprintf('Invalid transport "%s" defined for "%s". Available transports "%s".', implode(',', $diff), $routingEntity, implode(',', $transports))); } } + + // Update SendersLocator with merged routing + /** @var ServiceDefinition $locatorDef */ + $locatorDef = $builder->getDefinition($this->prefix('routing.locator')); + $locatorDef->setArgument(0, $routing); + } + + /** + * Discover routing from #[AsMessage] attributes on message classes + * by scanning handler method parameters. + * + * @return array> + */ + private function discoverAttributeRouting(): array + { + $builder = $this->getContainerBuilder(); + $routing = []; + + foreach (BuilderMan::of($this)->getHandlerServiceNames() as $serviceName) { + $definition = $builder->getDefinition($serviceName); + /** @var class-string|null $handlerClass */ + $handlerClass = $definition->getType(); + + if ($handlerClass === null || !class_exists($handlerClass)) { + continue; + } + + foreach (Reflector::getHandlerMessageClasses($handlerClass) as $messageClass) { + if (isset($routing[$messageClass]) || !class_exists($messageClass)) { + continue; + } + + foreach (Reflector::getMessageRouting($messageClass) as $attribute) { + if ($attribute->transport === null) { + continue; + } + + $transports = is_array($attribute->transport) ? $attribute->transport : [$attribute->transport]; + $routing[$messageClass] = array_merge($routing[$messageClass] ?? [], $transports); + } + } + } + + return $routing; } } diff --git a/src/DI/Pass/TransportFactoryPass.php b/src/DI/Pass/TransportFactoryPass.php index 0dcb35e..e613742 100644 --- a/src/DI/Pass/TransportFactoryPass.php +++ b/src/DI/Pass/TransportFactoryPass.php @@ -4,7 +4,10 @@ use Contributte\Messenger\DI\MessengerExtension; use Contributte\Messenger\DI\Utils\BuilderMan; +use Doctrine\Persistence\ConnectionRegistry; +use Nette\DI\Definitions\ServiceDefinition; use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory; +use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransportFactory; use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory; use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory; use Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory; @@ -18,6 +21,7 @@ class TransportFactoryPass extends AbstractPass 'inMemory' => InMemoryTransportFactory::class, 'amqp' => AmqpTransportFactory::class, 'redis' => RedisTransportFactory::class, + 'doctrine' => DoctrineTransportFactory::class, ]; /** @@ -28,8 +32,12 @@ public function loadPassConfiguration(): void $builder = $this->getContainerBuilder(); $config = $this->getConfig(); - // Filter out class factory that cannot be found - $defaultFactories = array_filter(self::DEFAULT_TRANSPORT_FACTORY, static fn ($class, $name) => class_exists($class), ARRAY_FILTER_USE_BOTH); + // Filter out class factory that cannot be found, exclude doctrine (requires ConnectionRegistry) + $defaultFactories = array_filter( + self::DEFAULT_TRANSPORT_FACTORY, + static fn ($class, $name) => class_exists($class) && $name !== 'doctrine', + ARRAY_FILTER_USE_BOTH, + ); // Merge default + user defined factories $transportFactories = array_merge($defaultFactories, (array) $config->transportFactory); @@ -41,8 +49,32 @@ public function loadPassConfiguration(): void ->addTag(MessengerExtension::TRANSPORT_FACTORY_TAG, $name); } + // Placeholder: TransportFactory will be finalized in beforePassCompile $builder->addDefinition($this->prefix('transportFactory')) - ->setFactory(TransportFactory::class, [BuilderMan::of($this)->getTransportFactories()]); + ->setFactory(TransportFactory::class, [[]]); + } + + /** + * Decorate services + */ + public function beforePassCompile(): void + { + $builder = $this->getContainerBuilder(); + + // Register Doctrine transport factory when ConnectionRegistry is available + if (class_exists(DoctrineTransportFactory::class) && interface_exists(ConnectionRegistry::class)) { + if ($builder->getByType(ConnectionRegistry::class, false) !== null) { + $builder->addDefinition($this->prefix('transportFactory.doctrine')) + ->setFactory(DoctrineTransportFactory::class) + ->setAutowired(false) + ->addTag(MessengerExtension::TRANSPORT_FACTORY_TAG, 'doctrine'); + } + } + + // Finalize TransportFactory with all registered factories + /** @var ServiceDefinition $transportFactoryDef */ + $transportFactoryDef = $builder->getDefinition($this->prefix('transportFactory')); + $transportFactoryDef->setArgument(0, BuilderMan::of($this)->getTransportFactories()); } } diff --git a/src/DI/Utils/BuilderMan.php b/src/DI/Utils/BuilderMan.php index 6f5e892..3621261 100644 --- a/src/DI/Utils/BuilderMan.php +++ b/src/DI/Utils/BuilderMan.php @@ -6,6 +6,7 @@ use Contributte\Messenger\DI\Pass\AbstractPass; use Contributte\Messenger\Exception\LogicalException; use Nette\DI\Definitions\Definition; +use Nette\DI\Definitions\ServiceDefinition; use Nette\DI\Definitions\Statement; final class BuilderMan @@ -109,6 +110,89 @@ public function getFailedTransports(): array return $transportsMapping; } + /** + * @return list + */ + public function getHandlerServiceNames(): array + { + $builder = $this->pass->getContainerBuilder(); + + $serviceHandlers = array_keys($builder->findByTag(MessengerExtension::HANDLER_TAG)); + + foreach ($builder->getDefinitions() as $definition) { + /** @var class-string|null $class */ + $class = $definition->getType(); + + if ($class === null || !class_exists($class)) { + continue; + } + + $name = $definition->getName(); + + if ($name === null) { + continue; + } + + if (Reflector::getMessageHandlers($class) !== []) { + $serviceHandlers[] = $name; + } + } + + return array_values(array_unique($serviceHandlers)); + } + + /** + * @return array}>>> + */ + public function getHandlerMapping(): array + { + $builder = $this->pass->getContainerBuilder(); + $config = $this->pass->getConfig(); + $mapping = []; + + foreach ($config->bus as $busName => $busConfig) { + /** @var ServiceDefinition $locator */ + $locator = $builder->getDefinition($this->pass->prefix(sprintf('bus.%s.locator', $busName))); + + /** @var array> $handlers */ + $handlers = $locator->getFactory()->arguments[0] ?? []; + + $busMapping = []; + + foreach ($handlers as $messageClass => $handlerDescriptors) { + $descriptions = []; + + foreach ($handlerDescriptors as $descriptor) { + $serviceDef = $builder->getDefinition($descriptor['service']); + $description = $serviceDef->getType() ?? $descriptor['service']; + + $options = []; + + if ($descriptor['method'] !== '__invoke') { + $description .= '::' . $descriptor['method']; + $options['method'] = $descriptor['method']; + } + + if (isset($descriptor['from_transport'])) { + $options['from_transport'] = $descriptor['from_transport']; + } + + if (isset($descriptor['alias'])) { + $options['alias'] = $descriptor['alias']; + } + + $descriptions[] = [$description, $options]; + } + + $busMapping[$messageClass] = $descriptions; + } + + $mapping[$busName] = $busMapping; + } + + return $mapping; + } + public function getSerializer(string|Statement|null $serializer): Statement|string { if ($serializer === null) { diff --git a/src/DI/Utils/Reflector.php b/src/DI/Utils/Reflector.php index 543814f..fa5bba0 100644 --- a/src/DI/Utils/Reflector.php +++ b/src/DI/Utils/Reflector.php @@ -9,6 +9,7 @@ use ReflectionIntersectionType; use ReflectionNamedType; use ReflectionUnionType; +use Symfony\Component\Messenger\Attribute\AsMessage; use Symfony\Component\Messenger\Attribute\AsMessageHandler; use Symfony\Component\Messenger\Handler\Acknowledger; use Symfony\Component\Messenger\Handler\BatchHandlerInterface; @@ -66,11 +67,11 @@ public static function getMessageHandlerMessage(string $class, array $options): throw new LogicalException(sprintf('Handler must have "%s::%s()" method.', $class, $options['method'])); } - if ($rcMethod->getNumberOfParameters() !== 1 && !$rc->implementsInterface( BatchHandlerInterface::class)) { + if ($rcMethod->getNumberOfParameters() !== 1 && !$rc->implementsInterface(BatchHandlerInterface::class)) { throw new LogicalException(sprintf('Only one parameter is allowed in "%s::%s()."', $class, $options['method'])); } - if ($rc->implementsInterface( BatchHandlerInterface::class)) { + if ($rc->implementsInterface(BatchHandlerInterface::class)) { if ($rcMethod->getNumberOfParameters() !== 2) { throw new LogicalException(sprintf('Exactly two parameters are required in "%s::%s()."', $class, $options['method'])); } @@ -105,4 +106,50 @@ public static function getMessageHandlerMessage(string $class, array $options): return $type->getName(); } + /** + * @param class-string $handlerClass + * @return list + */ + public static function getHandlerMessageClasses(string $handlerClass): array + { + $messages = []; + + $handlers = self::getMessageHandlers($handlerClass); + + foreach ($handlers as $handler) { + $method = $handler->method ?? '__invoke'; + + try { + $messages[] = self::getMessageHandlerMessage($handlerClass, ['method' => $method]); + } catch (\Throwable) { + // Skip handlers that can't be resolved + } + } + + // Try default __invoke method only if no handlers were found from attributes + if ($handlers === []) { + try { + $messages[] = self::getMessageHandlerMessage($handlerClass, ['method' => '__invoke']); + } catch (\Throwable) { + // Skip + } + } + + return $messages; + } + + /** + * @param class-string $class + * @return list + */ + public static function getMessageRouting(string $class): array + { + $rc = new ReflectionClass($class); + + return array_map( + static fn (ReflectionAttribute $attribute): AsMessage => $attribute->newInstance(), + $rc->getAttributes(AsMessage::class), + ); + } + } diff --git a/src/Handler/ContainerServiceHandlersLocator.php b/src/Handler/ContainerServiceHandlersLocator.php index 199ad5d..76de92e 100644 --- a/src/Handler/ContainerServiceHandlersLocator.php +++ b/src/Handler/ContainerServiceHandlersLocator.php @@ -26,7 +26,7 @@ public function __construct(array $map, Container $context) } /** - * @return array + * @return iterable */ public function getHandlers(Envelope $envelope): iterable { diff --git a/tests/Cases/DI/MessengerExtension.bus.phpt b/tests/Cases/DI/MessengerExtension.bus.phpt index 2182206..5b09719 100644 --- a/tests/Cases/DI/MessengerExtension.bus.phpt +++ b/tests/Cases/DI/MessengerExtension.bus.phpt @@ -6,8 +6,11 @@ use Contributte\Tester\Toolkit; use Nette\DI\Compiler; use Nette\DI\InvalidConfigurationException; use Psr\Container\ContainerInterface; +use ReflectionClass; use Symfony\Component\Messenger\MessageBus; +use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Middleware\MiddlewareInterface; +use Symfony\Component\Messenger\RoutableMessageBus; use Tester\Assert; use Tests\Mocks\Bus\BusWrapper; use Tests\Toolkit\Container; @@ -166,3 +169,46 @@ Toolkit::test(function (): void { Assert::type(BusWrapper::class, $container->getByType(BusWrapper::class)); }); + +// RoutableMessageBus is autowired as MessageBusInterface, no fallback by default +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->build(); + + /** @var RoutableMessageBus $routableBus */ + $routableBus = $container->getService('messenger.bus.routable'); + Assert::type(RoutableMessageBus::class, $routableBus); + + // RoutableMessageBus is autowired as MessageBusInterface + Assert::type(RoutableMessageBus::class, $container->getByType(MessageBusInterface::class)); + + $rc = new ReflectionClass($routableBus); + $prop = $rc->getProperty('fallbackBus'); + Assert::null($prop->getValue($routableBus)); +}); + +// RoutableMessageBus with explicit fallback bus +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + fallbackBus: messageBus + bus: + messageBus: + commandBus: + allowNoHandlers: true + NEON + )); + }) + ->build(); + + /** @var RoutableMessageBus $routableBus */ + $routableBus = $container->getService('messenger.bus.routable'); + + $rc = new ReflectionClass($routableBus); + $prop = $rc->getProperty('fallbackBus'); + Assert::type(MessageBus::class, $prop->getValue($routableBus)); +}); diff --git a/tests/Cases/DI/MessengerExtension.cache.phpt b/tests/Cases/DI/MessengerExtension.cache.phpt new file mode 100644 index 0000000..e926670 --- /dev/null +++ b/tests/Cases/DI/MessengerExtension.cache.phpt @@ -0,0 +1,99 @@ +withDefaults() + ->build(); + + Assert::count(0, $container->findByType(StopWorkersCommand::class)); +}); + +// StopWorkersCommand registered with cache +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + cache: @cachePool + + services: + cachePool: Tests\Mocks\Cache\DummyCachePool + NEON + )); + }) + ->build(); + + Assert::count(1, $container->findByType(StopWorkersCommand::class)); +}); + +// StopWorkerOnRestartSignalListener registered with cache +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addExtension('events', new EventDispatcherExtension()); + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + cache: @cachePool + + services: + cachePool: Tests\Mocks\Cache\DummyCachePool + NEON + )); + }) + ->build(); + + /** @var EventDispatcher $dispatcher */ + $dispatcher = $container->getService('messenger.event.dispatcher'); + $listeners = []; + + foreach ($dispatcher->getListeners() as $listenersForEvent) { + if (is_array($listenersForEvent)) { + foreach ($listenersForEvent as $listenerForEvent) { + $listeners[] = $listenerForEvent[0]::class; + } + } + } + + Assert::true(in_array(StopWorkerOnRestartSignalListener::class, $listeners, true)); +}); + +// StopWorkerOnRestartSignalListener not registered without cache +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addExtension('events', new EventDispatcherExtension()); + }) + ->build(); + + /** @var EventDispatcher $dispatcher */ + $dispatcher = $container->getService('messenger.event.dispatcher'); + $listeners = []; + + foreach ($dispatcher->getListeners() as $listenersForEvent) { + if (is_array($listenersForEvent)) { + foreach ($listenersForEvent as $listenerForEvent) { + $listeners[] = $listenerForEvent[0]::class; + } + } + } + + Assert::false(in_array(StopWorkerOnRestartSignalListener::class, $listeners, true)); +}); diff --git a/tests/Cases/DI/MessengerExtension.console.phpt b/tests/Cases/DI/MessengerExtension.console.phpt new file mode 100644 index 0000000..ff0acce --- /dev/null +++ b/tests/Cases/DI/MessengerExtension.console.phpt @@ -0,0 +1,215 @@ +withDefaults() + ->build(); + + Assert::count(1, $container->findByType(ConsumeMessagesCommand::class)); + Assert::count(1, $container->findByType(DebugCommand::class)); + Assert::count(1, $container->findByType(SetupTransportsCommand::class)); + Assert::count(1, $container->findByType(FailedMessagesRemoveCommand::class)); + Assert::count(1, $container->findByType(FailedMessagesRetryCommand::class)); + Assert::count(1, $container->findByType(FailedMessagesShowCommand::class)); + Assert::count(1, $container->findByType(StatsCommand::class)); +}); + +// DebugCommand receives handler mapping +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + transport: + memory: + dsn: in-memory:// + + routing: + Tests\Mocks\Message\SimpleMessage: [memory] + + services: + - Tests\Mocks\Handler\SimpleHandler + NEON + )); + }) + ->build(); + + /** @var DebugCommand $debugCommand */ + $debugCommand = $container->getByType(DebugCommand::class); + + $rc = new ReflectionClass($debugCommand); + $prop = $rc->getProperty('mapping'); + /** @var array>> $mapping */ + $mapping = $prop->getValue($debugCommand); + + Assert::true(isset($mapping['messageBus'])); + Assert::true(isset($mapping['messageBus'][SimpleMessage::class])); + Assert::count(1, $mapping['messageBus'][SimpleMessage::class]); +}); + +// DebugCommand with multiple buses +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + bus: + messageBus: + commandBus: + allowNoHandlers: true + + services: + - Tests\Mocks\Handler\SimpleHandler + NEON + )); + }) + ->build(); + + /** @var DebugCommand $debugCommand */ + $debugCommand = $container->getByType(DebugCommand::class); + + $rc = new ReflectionClass($debugCommand); + $prop = $rc->getProperty('mapping'); + /** @var array>> $mapping */ + $mapping = $prop->getValue($debugCommand); + + Assert::true(isset($mapping['messageBus'])); + Assert::true(isset($mapping['commandBus'])); + // SimpleHandler has #[AsMessageHandler] without bus restriction, so it registers on all buses + Assert::true(isset($mapping['messageBus'][SimpleMessage::class])); + Assert::true(isset($mapping['commandBus'][SimpleMessage::class])); +}); + +// DebugCommand executes successfully and shows handler mapping +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + transport: + memory: + dsn: in-memory:// + + routing: + Tests\Mocks\Message\SimpleMessage: [memory] + + services: + - Tests\Mocks\Handler\SimpleHandler + NEON + )); + }) + ->build(); + + /** @var DebugCommand $debugCommand */ + $debugCommand = $container->getByType(DebugCommand::class); + + $tester = new CommandTester($debugCommand); + $tester->execute([]); + + $output = $tester->getDisplay(true); + + $expected = <<<'TEXT' + Messenger + ========= + + messageBus + ---------- + + The following messages can be dispatched: + + -------------------------------------------------- + Tests\Mocks\Message\SimpleMessage + handled by Tests\Mocks\Handler\SimpleHandler + + -------------------------------------------------- + TEXT; + + Assert::equal( + Console::normalize($expected), + Console::normalize($output) + ); +}); + +// SetupTransportsCommand receives transport names +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + transport: + async: + dsn: in-memory:// + failed: + dsn: in-memory:// + NEON + )); + }) + ->build(); + + /** @var SetupTransportsCommand $setupCommand */ + $setupCommand = $container->getByType(SetupTransportsCommand::class); + + $tester = new CommandTester($setupCommand); + $tester->execute([]); + + $output = $tester->getDisplay(true); + + Assert::match('~.*The "async" transport does not support setup.*~s', $output); + Assert::match('~.*The "failed" transport does not support setup.*~s', $output); + Assert::same(0, $tester->getStatusCode()); +}); + +// StatsCommand receives transport names +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + transport: + async: + dsn: in-memory:// + failed: + dsn: in-memory:// + NEON + )); + }) + ->build(); + + /** @var StatsCommand $statsCommand */ + $statsCommand = $container->getByType(StatsCommand::class); + + $tester = new CommandTester($statsCommand); + $tester->execute([]); + + $output = $tester->getDisplay(true); + + Assert::match('~.*Unable to get message count for the following transports: "async",.*"failed".*~s', $output); + Assert::same(0, $tester->getStatusCode()); +}); diff --git a/tests/Cases/DI/MessengerExtension.event.phpt b/tests/Cases/DI/MessengerExtension.event.phpt index 36b7f4b..e64931c 100644 --- a/tests/Cases/DI/MessengerExtension.event.phpt +++ b/tests/Cases/DI/MessengerExtension.event.phpt @@ -9,9 +9,10 @@ use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\Messenger\EventListener\AddErrorDetailsStampListener; use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener; +use Symfony\Component\Messenger\EventListener\ResetMemoryUsageListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener; -use Symfony\Component\Messenger\EventListener\StopWorkerOnSignalsListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnCustomStopExceptionListener; use Tester\Assert; use Tests\Toolkit\Container; @@ -74,7 +75,8 @@ Toolkit::test(static function (): void { AddErrorDetailsStampListener::class, SendFailedMessageForRetryListener::class, SendFailedMessageToFailureTransportListener::class, - StopWorkerOnSignalsListener::class, + StopWorkerOnCustomStopExceptionListener::class, + ResetMemoryUsageListener::class, ]; foreach ($expectedRegisteredListeners as $expectedRegisteredListener) { diff --git a/tests/Cases/DI/MessengerExtension.failureTransport.phpt b/tests/Cases/DI/MessengerExtension.failureTransport.phpt index 82eba33..90f91ad 100644 --- a/tests/Cases/DI/MessengerExtension.failureTransport.phpt +++ b/tests/Cases/DI/MessengerExtension.failureTransport.phpt @@ -87,7 +87,7 @@ Toolkit::test(function (): void { NEON )); }) - ->build(), + ->build(), LogicalException::class, 'Invalid failure transport "transport3" defined for "transport2" transport. Available transports "transport1, transport2".', ); diff --git a/tests/Cases/DI/MessengerExtension.handler.phpt b/tests/Cases/DI/MessengerExtension.handler.phpt index 3dc62c5..22c6edd 100644 --- a/tests/Cases/DI/MessengerExtension.handler.phpt +++ b/tests/Cases/DI/MessengerExtension.handler.phpt @@ -281,7 +281,7 @@ Toolkit::test(function (): void { NEON )); }) - ->build(); + ->build(); }, LogicalException::class, 'Handler must have "Tests\Mocks\Handler\NoMethodHandler::__invoke()" method.' @@ -313,8 +313,10 @@ function getHandlerDescriptor(NetteContainer $container, object $message, string { /** @var HandlersLocatorInterface $handlerLocator */ $handlerLocator = $container->getByName(sprintf('messenger.bus.%s.locator', $busName)); - /** @var HandlerDescriptor $handlerDescriptor */ - $handlerDescriptor = $handlerLocator->getHandlers(new Envelope($message))[0] ?? null; + + /** @var HandlerDescriptor[] $handlerDescriptors */ + $handlerDescriptors = iterator_to_array($handlerLocator->getHandlers(new Envelope($message))); + $handlerDescriptor = $handlerDescriptors[0] ?? null; Assert::notNull($handlerDescriptor); return $handlerDescriptor; diff --git a/tests/Cases/DI/MessengerExtension.routing.phpt b/tests/Cases/DI/MessengerExtension.routing.phpt index 2c30027..aee7c8a 100644 --- a/tests/Cases/DI/MessengerExtension.routing.phpt +++ b/tests/Cases/DI/MessengerExtension.routing.phpt @@ -8,14 +8,17 @@ use Contributte\Tester\Toolkit; use Nette\DI\Compiler; use Symfony\Component\Messenger\Exception\NoHandlerForMessageException; use Symfony\Component\Messenger\MessageBus; +use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport; use Tester\Assert; use Tests\Mocks\Handler\InterfaceHandler; +use Tests\Mocks\Handler\RoutedMessageHandler; use Tests\Mocks\Handler\SameHandler1; use Tests\Mocks\Handler\SameHandler2; use Tests\Mocks\Handler\SimpleHandler; use Tests\Mocks\Handler\WildcardHandler; use Tests\Mocks\Message\MessageImpl1; use Tests\Mocks\Message\MessageImpl2; +use Tests\Mocks\Message\RoutedMessage; use Tests\Mocks\Message\SameMessage; use Tests\Mocks\Message\SimpleMessage; use Tests\Toolkit\Container; @@ -100,7 +103,7 @@ Toolkit::test(function (): void { ->build(); /** @var MessageBus $messageBus */ - $messageBus = $container->getByType(MessageBus::class); + $messageBus = $container->getService('messenger.bus.messageBus.bus'); Assert::exception( static fn () => $messageBus->dispatch(new SimpleMessage('foobar')), @@ -132,7 +135,7 @@ Toolkit::test(function (): void { ->build(); /** @var MessageBus $messageBus */ - $messageBus = $container->getByType(MessageBus::class); + $messageBus = $container->getService('messenger.bus.messageBus.bus'); $messageBus->dispatch(new SameMessage('foobar')); /** @var SameHandler2 $handler1 */ @@ -263,3 +266,98 @@ Toolkit::test(function: function (): void { $messageBus->dispatch(new MessageImpl1('1')); Assert::type(MessageImpl1::class, $handler->message); }); + +// Routing via #[AsMessage] attribute +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + transport: + memory: + dsn: in-memory:// + + services: + - Tests\Mocks\Handler\RoutedMessageHandler + NEON + )); + }) + ->build(); + + /** @var BusRegistry $busRegistry */ + $busRegistry = $container->getByType(BusRegistry::class); + $messageBus = $busRegistry->get('messageBus'); + + $messageBus->dispatch(new RoutedMessage('routed')); + + // Message was routed to async transport (not handled synchronously) + /** @var InMemoryTransport $transport */ + $transport = $container->getService('messenger.transport.memory'); + $sent = $transport->getSent(); + Assert::count(1, $sent); + Assert::type(RoutedMessage::class, $sent[0]->getMessage()); +}); + +// NEON routing takes precedence over #[AsMessage] attribute +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + transport: + memory: + dsn: in-memory:// + sync: + dsn: sync:// + + routing: + Tests\Mocks\Message\RoutedMessage: [sync] + + services: + - Tests\Mocks\Handler\RoutedMessageHandler + NEON + )); + }) + ->build(); + + /** @var BusRegistry $busRegistry */ + $busRegistry = $container->getByType(BusRegistry::class); + $messageBus = $busRegistry->get('messageBus'); + + /** @var RoutedMessageHandler $handler */ + $handler = $container->getByType(RoutedMessageHandler::class); + + // NEON routes to sync, so handler is called synchronously (overriding #[AsMessage(transport: 'memory')]) + $messageBus->dispatch(new RoutedMessage('overridden')); + Assert::type(RoutedMessage::class, $handler->message); + Assert::equal('overridden', $handler->message->text); + + // Memory transport should be empty (NEON routing to sync took precedence) + /** @var InMemoryTransport $transport */ + $transport = $container->getService('messenger.transport.memory'); + Assert::count(0, $transport->getSent()); +}); + +// #[AsMessage] with invalid transport fails validation +Toolkit::test(static function (): void { + Assert::exception( + static function (): void { + Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + + services: + - Tests\Mocks\Handler\RoutedMessageHandler + NEON + )); + }) + ->build(); + }, + LogicalException::class, + 'Invalid transport "memory"%a%' + ); +}); diff --git a/tests/Cases/DI/MessengerExtension.transport.phpt b/tests/Cases/DI/MessengerExtension.transport.phpt index eb13c24..5f64543 100644 --- a/tests/Cases/DI/MessengerExtension.transport.phpt +++ b/tests/Cases/DI/MessengerExtension.transport.phpt @@ -128,6 +128,17 @@ Toolkit::test(function (): void { Assert::count(1, $container->findByTag(MessengerExtension::FAILURE_TRANSPORT_TAG)); }); +// Doctrine transport factory not registered without ConnectionRegistry +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->build(); + + // 5 = sync + inMemory + amqp + redis + main TransportFactory (no doctrine without ConnectionRegistry) + Assert::count(5, $container->findByType(TransportFactoryInterface::class)); + Assert::false($container->hasService('messenger.transportFactory.doctrine')); +}); + // Dynamic parameters Toolkit::test(function (): void { $container = Container::of() diff --git a/tests/Cases/DI/MessengerExtension.worker.phpt b/tests/Cases/DI/MessengerExtension.worker.phpt new file mode 100644 index 0000000..d2aad57 --- /dev/null +++ b/tests/Cases/DI/MessengerExtension.worker.phpt @@ -0,0 +1,126 @@ +withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addExtension('events', new EventDispatcherExtension()); + }) + ->build(); + + /** @var EventDispatcher $dispatcher */ + $dispatcher = $container->getService('messenger.event.dispatcher'); + $listeners = getListenerClasses($dispatcher); + + Assert::true(in_array(StopWorkerOnCustomStopExceptionListener::class, $listeners, true)); + Assert::true(in_array(ResetMemoryUsageListener::class, $listeners, true)); +}); + +// Worker limit listeners not registered by default +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addExtension('events', new EventDispatcherExtension()); + }) + ->build(); + + /** @var EventDispatcher $dispatcher */ + $dispatcher = $container->getService('messenger.event.dispatcher'); + $listeners = getListenerClasses($dispatcher); + + Assert::false(in_array(StopWorkerOnMemoryLimitListener::class, $listeners, true)); + Assert::false(in_array(StopWorkerOnTimeLimitListener::class, $listeners, true)); + Assert::false(in_array(StopWorkerOnMessageLimitListener::class, $listeners, true)); + Assert::false(in_array(StopWorkerOnFailureLimitListener::class, $listeners, true)); +}); + +// Worker limit listeners registered when configured +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addExtension('events', new EventDispatcherExtension()); + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + worker: + memoryLimit: 134217728 + timeLimit: 3600 + messageLimit: 1000 + failureLimit: 5 + NEON + )); + }) + ->build(); + + /** @var EventDispatcher $dispatcher */ + $dispatcher = $container->getService('messenger.event.dispatcher'); + $listeners = getListenerClasses($dispatcher); + + Assert::true(in_array(StopWorkerOnMemoryLimitListener::class, $listeners, true)); + Assert::true(in_array(StopWorkerOnTimeLimitListener::class, $listeners, true)); + Assert::true(in_array(StopWorkerOnMessageLimitListener::class, $listeners, true)); + Assert::true(in_array(StopWorkerOnFailureLimitListener::class, $listeners, true)); +}); + +// Partial worker limits +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addExtension('events', new EventDispatcherExtension()); + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + worker: + memoryLimit: 134217728 + NEON + )); + }) + ->build(); + + /** @var EventDispatcher $dispatcher */ + $dispatcher = $container->getService('messenger.event.dispatcher'); + $listeners = getListenerClasses($dispatcher); + + Assert::true(in_array(StopWorkerOnMemoryLimitListener::class, $listeners, true)); + Assert::false(in_array(StopWorkerOnTimeLimitListener::class, $listeners, true)); + Assert::false(in_array(StopWorkerOnMessageLimitListener::class, $listeners, true)); + Assert::false(in_array(StopWorkerOnFailureLimitListener::class, $listeners, true)); +}); + +/** + * @return list + */ +function getListenerClasses(EventDispatcher $dispatcher): array +{ + $listeners = []; + + foreach ($dispatcher->getListeners() as $listenersForEvent) { + if (is_array($listenersForEvent)) { + foreach ($listenersForEvent as $listenerForEvent) { + $listeners[] = $listenerForEvent[0]::class; + } + } + } + + return $listeners; +} diff --git a/tests/Cases/E2E/SyncTransportBusRoutingTest.phpt b/tests/Cases/E2E/SyncTransportBusRoutingTest.phpt new file mode 100644 index 0000000..7f40333 --- /dev/null +++ b/tests/Cases/E2E/SyncTransportBusRoutingTest.phpt @@ -0,0 +1,53 @@ +withDefaults() + ->withCompiler(function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + transport: + sync: + dsn: sync:// + + routing: + Tests\Mocks\Message\EventMessage: [sync] + + bus: + messageBus: + eventBus: + allowNoHandlers: true + NEON + )); + }) + ->build(); + + // Dispatching through eventBus (allowNoHandlers: true) — no exception + /** @var MessageBusInterface $eventBus */ + $eventBus = $container->getService('messenger.bus.eventBus.bus'); + $eventBus->dispatch(new EventMessage('hello')); + + // Dispatching through messageBus (allowNoHandlers: false) — throws + /** @var MessageBusInterface $messageBus */ + $messageBus = $container->getService('messenger.bus.messageBus.bus'); + Assert::exception( + static fn () => $messageBus->dispatch(new EventMessage('hello')), + NoHandlerForMessageException::class, + 'No handler for message "Tests\Mocks\Message\EventMessage".', + ); +}); diff --git a/tests/Cases/E2E/Vendor/MessengerTest.php b/tests/Cases/E2E/Vendor/MessengerTest.php index 711abad..a97ee41 100644 --- a/tests/Cases/E2E/Vendor/MessengerTest.php +++ b/tests/Cases/E2E/Vendor/MessengerTest.php @@ -16,6 +16,7 @@ use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; use Symfony\Component\Messenger\Middleware\SendMessageMiddleware; use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy; +use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport; use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory; use Symfony\Component\Messenger\Transport\Sender\SendersLocator; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; @@ -243,22 +244,22 @@ public function testFailureListener(): void 'level' => 'info', 'message' => 'Sending message {class} with {alias} sender using {sender}', 'context' => [ - 'class' => 'Tests\Mocks\Vendor\DummyRetryFailureMessage', + 'class' => DummyRetryFailureMessage::class, 'alias' => 'transport1', - 'sender' => 'Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport', + 'sender' => InMemoryTransport::class, ], ], [ 'level' => 'info', 'message' => 'Received message {class}', - 'context' => ['class' => 'Tests\Mocks\Vendor\DummyRetryFailureMessage'], + 'context' => ['class' => DummyRetryFailureMessage::class], ], [ 'level' => 'info', 'message' => 'Rejected message {class} will be sent to the failure transport {transport}.', 'context' => [ - 'class' => 'Tests\Mocks\Vendor\DummyRetryFailureMessage', - 'transport' => 'Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport', + 'class' => DummyRetryFailureMessage::class, + 'transport' => InMemoryTransport::class, ], ], [ @@ -349,37 +350,40 @@ public function testRealRetryListener(): void 'level' => 'warning', 'message' => 'Error thrown while handling message {class}. Sending for retry #{retryCount} using {delay} ms delay. Error: "{error}"', 'context' => [ - 'class' => 'Tests\Mocks\Vendor\DummyFailureMessage', + 'class' => DummyFailureMessage::class, + 'message_id' => 1, 'retryCount' => 1, 'delay' => 1, - 'error' => 'Handling "Tests\Mocks\Vendor\DummyFailureMessage" failed: Foo', + 'error' => 'Handling "' . DummyFailureMessage::class . '" failed: Foo', ], ], [ 'level' => 'warning', 'message' => 'Error thrown while handling message {class}. Sending for retry #{retryCount} using {delay} ms delay. Error: "{error}"', 'context' => [ - 'class' => 'Tests\Mocks\Vendor\DummyFailureMessage', + 'class' => DummyFailureMessage::class, + 'message_id' => 2, 'retryCount' => 2, 'delay' => 2, - 'error' => 'Handling "Tests\Mocks\Vendor\DummyFailureMessage" failed: Foo', + 'error' => 'Handling "' . DummyFailureMessage::class . '" failed: Foo', ], ], [ 'level' => 'critical', 'message' => 'Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', 'context' => [ - 'class' => 'Tests\Mocks\Vendor\DummyFailureMessage', + 'class' => DummyFailureMessage::class, + 'message_id' => 3, 'retryCount' => 2, - 'error' => 'Handling "Tests\Mocks\Vendor\DummyFailureMessage" failed: Foo', + 'error' => 'Handling "' . DummyFailureMessage::class . '" failed: Foo', ], ], [ 'level' => 'info', 'message' => 'Rejected message {class} will be sent to the failure transport {transport}.', 'context' => [ - 'class' => 'Tests\Mocks\Vendor\DummyFailureMessage', - 'transport' => 'Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport', + 'class' => DummyFailureMessage::class, + 'transport' => InMemoryTransport::class, ], ], [ diff --git a/tests/Mocks/Cache/DummyCacheItem.php b/tests/Mocks/Cache/DummyCacheItem.php new file mode 100644 index 0000000..50c9f2f --- /dev/null +++ b/tests/Mocks/Cache/DummyCacheItem.php @@ -0,0 +1,46 @@ +key; + } + + public function get(): mixed + { + return null; + } + + public function isHit(): bool + { + return false; + } + + public function set(mixed $value): static + { + return $this; + } + + public function expiresAt(?\DateTimeInterface $expiration): static + { + return $this; + } + + public function expiresAfter(\DateInterval|int|null $time): static + { + return $this; + } + +} diff --git a/tests/Mocks/Cache/DummyCachePool.php b/tests/Mocks/Cache/DummyCachePool.php new file mode 100644 index 0000000..c99d040 --- /dev/null +++ b/tests/Mocks/Cache/DummyCachePool.php @@ -0,0 +1,63 @@ + $keys + * @return iterable + */ + public function getItems(array $keys = []): iterable + { + return []; + } + + public function hasItem(string $key): bool + { + return false; + } + + public function clear(): bool + { + return true; + } + + public function deleteItem(string $key): bool + { + return true; + } + + /** + * @param list $keys + */ + public function deleteItems(array $keys): bool + { + return true; + } + + public function save(CacheItemInterface $item): bool + { + return true; + } + + public function saveDeferred(CacheItemInterface $item): bool + { + return true; + } + + public function commit(): bool + { + return true; + } + +} diff --git a/tests/Mocks/Handler/MultiRoutedMessageHandler.php b/tests/Mocks/Handler/MultiRoutedMessageHandler.php new file mode 100644 index 0000000..1fe56ef --- /dev/null +++ b/tests/Mocks/Handler/MultiRoutedMessageHandler.php @@ -0,0 +1,19 @@ +message = $message; + } + +} diff --git a/tests/Mocks/Handler/RoutedMessageHandler.php b/tests/Mocks/Handler/RoutedMessageHandler.php new file mode 100644 index 0000000..1d8dd3f --- /dev/null +++ b/tests/Mocks/Handler/RoutedMessageHandler.php @@ -0,0 +1,19 @@ +message = $message; + } + +} diff --git a/tests/Mocks/Message/EventMessage.php b/tests/Mocks/Message/EventMessage.php new file mode 100644 index 0000000..7859c84 --- /dev/null +++ b/tests/Mocks/Message/EventMessage.php @@ -0,0 +1,15 @@ +text = $text; + } + +} diff --git a/tests/Mocks/Message/MultiRoutedMessage.php b/tests/Mocks/Message/MultiRoutedMessage.php new file mode 100644 index 0000000..9a5da9f --- /dev/null +++ b/tests/Mocks/Message/MultiRoutedMessage.php @@ -0,0 +1,17 @@ + rtrim(ltrim($line, "\t")), + explode("\n", trim($s)), + )); + } + +}