From 9251a77d3c670ac992bb9f968252b64bc84cac43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 11:54:57 +0100 Subject: [PATCH 01/21] AI: init --- .claude/settings.local.json | 14 +++++++++ AGENTS.md | 58 +++++++++++++++++++++++++++++++++++++ CLAUDE.md | 1 + 3 files changed, 73 insertions(+) create mode 100644 .claude/settings.local.json create mode 100644 AGENTS.md create mode 100644 CLAUDE.md diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..4654dbb --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,14 @@ +{ + "permissions": { + "allow": [ + "WebFetch(domain:github.com)", + "WebFetch(domain:symfony.com)", + "WebFetch(domain:nette.org)", + "Bash(make:*)", + "Bash(make:*:*)", + "Bash(grep:*)", + "Bash(cat:*)", + "Bash(ls:*)" + ] + } +} diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..e1a611c --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,58 @@ +# AGENTS.md + +Nette DI integration for Symfony Messenger. Provides message buses (command/query/message), async transports (AMQP, Redis, Doctrine), handler auto-discovery, retry strategies, and failure transport routing — all configured via Nette DI extension and compiled through ordered passes. + +## Upstream + +- Changelog: https://github.com/symfony/messenger/blob/8.1/CHANGELOG.md +- Source: https://github.com/symfony/messenger +- Doc: https://symfony.com/doc/current/messenger.html + +## Stack + +- PHP >=8.2, Symfony 7.x/8.x, Nette DI ^3.1 +- PHPStan level 9 (phpVersion 80200) +- Nette Tester (`.phpt` files) +- Contributte code style (`ruleset.xml`) + +## Codebase + +- `src/DI/` — Main entry point. `MessengerExtension` delegates work to 10 passes in `Pass/`, each handling one concern (serializers, transports, routing, handlers, events, logging, console commands, buses, debug). Passes run in priority order (10->20->30->40) through three lifecycle hooks: load -> beforeCompile -> afterCompile. +- `src/Bus/` — MessageBus, CommandBus, QueryBus, BusRegistry +- `src/Container/` — PSR-11 wrappers for Nette DI +- `src/Handler/` — Handler locator +- `src/Logger/` — Dual HTTP/console logger +- `tests/Cases/` — Tests grouped by concern (DI/, Bus/, E2E/) +- `tests/Mocks/` — Test doubles +- `tests/Toolkit/` — Container builder, helpers + +## Key conventions + +- **Service tags**: `contributte.messenger.handler`, `contributte.messenger.transport`, `contributte.messenger.bus`, etc. (constants on `MessengerExtension`) +- **Service naming**: `messenger.bus.`, `messenger.transport.` +- **Handler discovery**: DI tag `contributte.messenger.handler` or PHP attribute `#[AsMessageHandler]` +- **Handler method**: defaults to `__invoke`, configurable via tag/attribute +- **Message type detection**: auto-detected from handler method's first parameter type-hint +- **Default middleware chain**: AddBusNameStamp -> DispatchAfterCurrentBus -> FailedMessageProcessing -> [custom] -> SendMessage -> HandleMessage + +## Development + +```bash +make install # composer update +make tests # run Nette Tester +make phpstan # static analysis +make cs # check code style +make csf # fix code style +make qa # phpstan + cs +``` + +## Tests + +- Nette Tester with `.phpt` files, multiple test cases per file via `Toolkit::test(function (): void { ... })` +- Container built with `Container::of()->withDefaults()->withCompiler(fn ($compiler) => ...)->build()` +- NEON config snippets via `Helpers::neon(<<<'NEON' ... NEON)` +- Assertions: `Assert::type()`, `Assert::count()`, `Assert::equal()`, `Assert::exception()` +- DI tests (`tests/Cases/DI/`) verify service registration, tags, and config validation +- E2E tests (`tests/Cases/E2E/`) extend `TestCase`, test full dispatch->handle workflows +- File naming: `MessengerExtension.{feature}.phpt` +- Mocks are simple DTOs/handlers in `tests/Mocks/` with public properties for assertions diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..43c994c --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1 @@ +@AGENTS.md From ec86cdf7e270e422949547c48efd433cb29cd373 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 11:42:41 +0100 Subject: [PATCH 02/21] Composer: upgrade symfony components to 7.x/8.x --- composer.json | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/composer.json b/composer.json index d8687ad..df2fabb 100644 --- a/composer.json +++ b/composer.json @@ -25,22 +25,21 @@ "psr/container": "^2.0.2", "psr/cache": "^2.0.0 || ^3.0.0", "psr/log": "^2.0.0 || ^3.0.0", - "symfony/messenger": "^6.0.19 || ^6.2.8", - "symfony/event-dispatcher": "^6.0.19 || ^6.2.8", - "symfony/console": "^6.0.19 || ^6.2.10", - "symfony/var-dumper": "^6.0.19 || ^6.2.10" + "symfony/messenger": "^7.4.7 || ^8.0.7", + "symfony/event-dispatcher": "^7.4.7 || ^8.0.7", + "symfony/console": "^7.4.7 || ^8.0.7", + "symfony/var-dumper": "^7.4.7 || ^8.0.7" }, "require-dev": { - "psr/container": "^2.0.2", "mockery/mockery": "^1.3.3", - "symfony/redis-messenger": "^6.0.19 || ^6.2.10", - "symfony/amqp-messenger": "^6.0.19 || ^6.2.8", - "symfony/doctrine-messenger": "^6.0.19 || ^6.2.10", + "symfony/redis-messenger": "^7.4.7 || ^8.0.7", + "symfony/amqp-messenger": "^7.4.7 || ^8.0.7", + "symfony/doctrine-messenger": "^7.4.7 || ^8.0.7", "contributte/console": "^0.10.0", "contributte/event-dispatcher": "^0.9.0", "contributte/qa": "^0.4.0", - "contributte/tester": "^0.3.0", - "contributte/phpstan": "^0.1.0", + "contributte/tester": "^0.4.0", + "contributte/phpstan": "^0.3.0", "tracy/tracy": "^2.10.2" }, "autoload": { From 98490bc5b30e6a43a2f055ec3e7eeed251bb4b82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 11:42:51 +0100 Subject: [PATCH 03/21] Codesniffer: use PHP 8.2 --- ruleset.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ruleset.xml b/ruleset.xml index db36657..eb05e7a 100644 --- a/ruleset.xml +++ b/ruleset.xml @@ -1,7 +1,7 @@ - + From 8798c1b1a89483f7246cd6d9d8f854a31393c037 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 11:46:16 +0100 Subject: [PATCH 04/21] Tests: fix tests after upgrade to newer version --- tests/Cases/DI/MessengerExtension.event.phpt | 2 -- tests/Cases/DI/MessengerExtension.failureTransport.phpt | 2 +- tests/Cases/DI/MessengerExtension.handler.phpt | 8 +++++--- tests/Cases/E2E/Vendor/MessengerTest.php | 3 +++ 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/tests/Cases/DI/MessengerExtension.event.phpt b/tests/Cases/DI/MessengerExtension.event.phpt index 36b7f4b..b225570 100644 --- a/tests/Cases/DI/MessengerExtension.event.phpt +++ b/tests/Cases/DI/MessengerExtension.event.phpt @@ -11,7 +11,6 @@ use Symfony\Component\Messenger\EventListener\AddErrorDetailsStampListener; use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener; -use Symfony\Component\Messenger\EventListener\StopWorkerOnSignalsListener; use Tester\Assert; use Tests\Toolkit\Container; @@ -74,7 +73,6 @@ Toolkit::test(static function (): void { AddErrorDetailsStampListener::class, SendFailedMessageForRetryListener::class, SendFailedMessageToFailureTransportListener::class, - StopWorkerOnSignalsListener::class, ]; foreach ($expectedRegisteredListeners as $expectedRegisteredListener) { diff --git a/tests/Cases/DI/MessengerExtension.failureTransport.phpt b/tests/Cases/DI/MessengerExtension.failureTransport.phpt index 82eba33..90f91ad 100644 --- a/tests/Cases/DI/MessengerExtension.failureTransport.phpt +++ b/tests/Cases/DI/MessengerExtension.failureTransport.phpt @@ -87,7 +87,7 @@ Toolkit::test(function (): void { NEON )); }) - ->build(), + ->build(), LogicalException::class, 'Invalid failure transport "transport3" defined for "transport2" transport. Available transports "transport1, transport2".', ); diff --git a/tests/Cases/DI/MessengerExtension.handler.phpt b/tests/Cases/DI/MessengerExtension.handler.phpt index 3dc62c5..22c6edd 100644 --- a/tests/Cases/DI/MessengerExtension.handler.phpt +++ b/tests/Cases/DI/MessengerExtension.handler.phpt @@ -281,7 +281,7 @@ Toolkit::test(function (): void { NEON )); }) - ->build(); + ->build(); }, LogicalException::class, 'Handler must have "Tests\Mocks\Handler\NoMethodHandler::__invoke()" method.' @@ -313,8 +313,10 @@ function getHandlerDescriptor(NetteContainer $container, object $message, string { /** @var HandlersLocatorInterface $handlerLocator */ $handlerLocator = $container->getByName(sprintf('messenger.bus.%s.locator', $busName)); - /** @var HandlerDescriptor $handlerDescriptor */ - $handlerDescriptor = $handlerLocator->getHandlers(new Envelope($message))[0] ?? null; + + /** @var HandlerDescriptor[] $handlerDescriptors */ + $handlerDescriptors = iterator_to_array($handlerLocator->getHandlers(new Envelope($message))); + $handlerDescriptor = $handlerDescriptors[0] ?? null; Assert::notNull($handlerDescriptor); return $handlerDescriptor; diff --git a/tests/Cases/E2E/Vendor/MessengerTest.php b/tests/Cases/E2E/Vendor/MessengerTest.php index 711abad..cadee93 100644 --- a/tests/Cases/E2E/Vendor/MessengerTest.php +++ b/tests/Cases/E2E/Vendor/MessengerTest.php @@ -350,6 +350,7 @@ public function testRealRetryListener(): void 'message' => 'Error thrown while handling message {class}. Sending for retry #{retryCount} using {delay} ms delay. Error: "{error}"', 'context' => [ 'class' => 'Tests\Mocks\Vendor\DummyFailureMessage', + 'message_id' => 1, 'retryCount' => 1, 'delay' => 1, 'error' => 'Handling "Tests\Mocks\Vendor\DummyFailureMessage" failed: Foo', @@ -360,6 +361,7 @@ public function testRealRetryListener(): void 'message' => 'Error thrown while handling message {class}. Sending for retry #{retryCount} using {delay} ms delay. Error: "{error}"', 'context' => [ 'class' => 'Tests\Mocks\Vendor\DummyFailureMessage', + 'message_id' => 2, 'retryCount' => 2, 'delay' => 2, 'error' => 'Handling "Tests\Mocks\Vendor\DummyFailureMessage" failed: Foo', @@ -370,6 +372,7 @@ public function testRealRetryListener(): void 'message' => 'Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', 'context' => [ 'class' => 'Tests\Mocks\Vendor\DummyFailureMessage', + 'message_id' => 3, 'retryCount' => 2, 'error' => 'Handling "Tests\Mocks\Vendor\DummyFailureMessage" failed: Foo', ], From 162dde7c00f0dbea901eacf27772feee77c18ba5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 11:46:33 +0100 Subject: [PATCH 05/21] Code: fix after upgrade to newer version --- phpstan.neon | 22 ++----------------- src/DI/MessengerExtension.php | 8 +++---- src/DI/Pass/AbstractPass.php | 2 +- src/DI/Pass/EventPass.php | 13 ----------- src/DI/Pass/HandlerPass.php | 2 -- src/DI/Utils/Reflector.php | 4 ++-- .../ContainerServiceHandlersLocator.php | 2 +- 7 files changed, 10 insertions(+), 43 deletions(-) diff --git a/phpstan.neon b/phpstan.neon index 485bc1c..517b83c 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -3,7 +3,7 @@ includes: parameters: level: 9 - phpVersion: 80000 + phpVersion: 80200 scanDirectories: - src @@ -21,25 +21,7 @@ parameters: count: 1 path: src/DI/Pass/HandlerPass.php - - - message: """ - #^Fetching class constant class of deprecated class Symfony\\\\Component\\\\Messenger\\\\Handler\\\\MessageHandlerInterface\\: - since Symfony 6\\.2, use the \\{@see AsMessageHandler\\} attribute instead$# - """ - count: 1 - path: src/DI/Pass/HandlerPass.php - - - - message: "#^Class ReflectionIntersectionType not found\\.$#" - count: 1 - path: src/DI/Utils/Reflector.php - - message: "#^Dead catch \\- ReflectionException is never thrown in the try block\\.$#" count: 1 - path: src/DI/Utils/Reflector.php - - - - message: "#^PHPDoc tag @var for variable \\$type contains unknown class ReflectionIntersectionType\\.$#" - count: 1 - path: src/DI/Utils/Reflector.php + path: src/DI/Utils/Reflector.php \ No newline at end of file diff --git a/src/DI/MessengerExtension.php b/src/DI/MessengerExtension.php index f144706..661f14e 100644 --- a/src/DI/MessengerExtension.php +++ b/src/DI/MessengerExtension.php @@ -13,8 +13,8 @@ use Contributte\Messenger\DI\Pass\SerializerPass; use Contributte\Messenger\DI\Pass\TransportFactoryPass; use Contributte\Messenger\DI\Pass\TransportPass; -use Nette\DI\Definitions\Statement; use Nette\DI\CompilerExtension; +use Nette\DI\Definitions\Statement; use Nette\PhpGenerator\ClassType; use Nette\Schema\Expect; use Nette\Schema\Schema; @@ -61,9 +61,9 @@ public function __construct() public function getConfigSchema(): Schema { - $expectClass = Expect::string()->required()->assert(fn ($input) => class_exists($input) || interface_exists($input)); + $expectClass = Expect::string()->required()->assert(fn ($input) => is_string($input) && (class_exists($input) || interface_exists($input))); $expectService = Expect::anyOf( - Expect::string()->required()->assert(fn ($input) => str_starts_with($input, '@') || class_exists($input) || interface_exists($input)), + Expect::string()->required()->assert(fn ($input) => is_string($input) && (str_starts_with($input, '@') || class_exists($input) || interface_exists($input))), Expect::type(Statement::class)->required(), ); $expectLoosyService = Expect::anyOf( @@ -82,7 +82,7 @@ public function getConfigSchema(): Schema 'allowNoHandlers' => Expect::bool(false), 'allowNoSenders' => Expect::bool(true), 'autowired' => Expect::bool(), - 'class' => (clone $expectClass)->required(false)->assert(fn ($input) => is_subclass_of($input, MessageBusInterface::class), 'Specified bus class must implements "MessageBusInterface"'), + 'class' => (clone $expectClass)->required(false)->assert(fn ($input) => is_string($input) && is_subclass_of($input, MessageBusInterface::class), 'Specified bus class must implements "MessageBusInterface"'), 'wrapper' => (clone $expectClass)->required(false), ]), Expect::string()->required(), diff --git a/src/DI/Pass/AbstractPass.php b/src/DI/Pass/AbstractPass.php index 610c00d..966f64f 100644 --- a/src/DI/Pass/AbstractPass.php +++ b/src/DI/Pass/AbstractPass.php @@ -51,7 +51,7 @@ public function getContainerBuilder(): ContainerBuilder return $this->extension->getContainerBuilder(); } - public function getConfig(): stdclass + public function getConfig(): stdClass { /** @var stdclass $ret */ $ret = (object) $this->extension->getConfig(); diff --git a/src/DI/Pass/EventPass.php b/src/DI/Pass/EventPass.php index c56926f..ea895af 100644 --- a/src/DI/Pass/EventPass.php +++ b/src/DI/Pass/EventPass.php @@ -12,8 +12,6 @@ use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener; -use Symfony\Component\Messenger\EventListener\StopWorkerOnSignalsListener; -use Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener; class EventPass extends AbstractPass { @@ -95,17 +93,6 @@ public function beforePassCompile(): void $this->prefix('@logger.logger'), ]), ]); - - // Stop on signal - if (class_exists(StopWorkerOnSignalsListener::class)) { - $dispatcher->addSetup('addSubscriber', [ - new Statement(StopWorkerOnSignalsListener::class, [null, $this->prefix('@logger.logger')]), - ]); - } elseif (class_exists(StopWorkerOnSigtermSignalListener::class)) { - $dispatcher->addSetup('addSubscriber', [ - new Statement(StopWorkerOnSigtermSignalListener::class, [$this->prefix('@logger.logger')]), // @phpstan-ignore-line - ]); - } } } diff --git a/src/DI/Pass/HandlerPass.php b/src/DI/Pass/HandlerPass.php index 379fcbb..4455587 100644 --- a/src/DI/Pass/HandlerPass.php +++ b/src/DI/Pass/HandlerPass.php @@ -9,7 +9,6 @@ use Nette\DI\Definitions\ServiceDefinition; use ReflectionClass; use ReflectionException; -use Symfony\Component\Messenger\Handler\MessageHandlerInterface; use function array_merge; use function is_numeric; use function is_string; @@ -97,7 +96,6 @@ private function getMessageHandlers(): array // Find all handlers $serviceHandlers = []; $serviceHandlers = array_merge($serviceHandlers, array_keys($builder->findByTag(MessengerExtension::HANDLER_TAG))); - $serviceHandlers = array_merge($serviceHandlers, array_keys($builder->findByType(MessageHandlerInterface::class))); foreach ($builder->getDefinitions() as $definition) { /** @var class-string $class */ diff --git a/src/DI/Utils/Reflector.php b/src/DI/Utils/Reflector.php index 543814f..b58030a 100644 --- a/src/DI/Utils/Reflector.php +++ b/src/DI/Utils/Reflector.php @@ -66,11 +66,11 @@ public static function getMessageHandlerMessage(string $class, array $options): throw new LogicalException(sprintf('Handler must have "%s::%s()" method.', $class, $options['method'])); } - if ($rcMethod->getNumberOfParameters() !== 1 && !$rc->implementsInterface( BatchHandlerInterface::class)) { + if ($rcMethod->getNumberOfParameters() !== 1 && !$rc->implementsInterface(BatchHandlerInterface::class)) { throw new LogicalException(sprintf('Only one parameter is allowed in "%s::%s()."', $class, $options['method'])); } - if ($rc->implementsInterface( BatchHandlerInterface::class)) { + if ($rc->implementsInterface(BatchHandlerInterface::class)) { if ($rcMethod->getNumberOfParameters() !== 2) { throw new LogicalException(sprintf('Exactly two parameters are required in "%s::%s()."', $class, $options['method'])); } diff --git a/src/Handler/ContainerServiceHandlersLocator.php b/src/Handler/ContainerServiceHandlersLocator.php index 199ad5d..76de92e 100644 --- a/src/Handler/ContainerServiceHandlersLocator.php +++ b/src/Handler/ContainerServiceHandlersLocator.php @@ -26,7 +26,7 @@ public function __construct(array $map, Container $context) } /** - * @return array + * @return iterable */ public function getHandlers(Envelope $envelope): iterable { From ac02a8c49d71c1db512a64279e5364dc0fcfb898 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 11:54:53 +0100 Subject: [PATCH 06/21] Composer: require PHP 8.2 --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index df2fabb..8a42794 100644 --- a/composer.json +++ b/composer.json @@ -20,7 +20,7 @@ ], "require": { "ext-json": "*", - "php": ">=8.0", + "php": ">=8.2", "nette/di": "^3.1.2", "psr/container": "^2.0.2", "psr/cache": "^2.0.0 || ^3.0.0", From 1b8abb61795545387194d94231964724ccce52c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 15:18:12 +0100 Subject: [PATCH 07/21] Refactor: extract handler discovery to BuilderMan Move getMessageHandlers() from HandlerPass to BuilderMan::getHandlerServiceNames(). Add getHandlerMapping() to BuilderMan. Both reused by HandlerPass, RoutingPass, and ConsolePass. --- src/DI/Pass/HandlerPass.php | 40 ++------------------- src/DI/Utils/BuilderMan.php | 72 +++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 38 deletions(-) diff --git a/src/DI/Pass/HandlerPass.php b/src/DI/Pass/HandlerPass.php index 4455587..564d1d1 100644 --- a/src/DI/Pass/HandlerPass.php +++ b/src/DI/Pass/HandlerPass.php @@ -3,6 +3,7 @@ namespace Contributte\Messenger\DI\Pass; use Contributte\Messenger\DI\MessengerExtension; +use Contributte\Messenger\DI\Utils\BuilderMan; use Contributte\Messenger\DI\Utils\Reflector; use Contributte\Messenger\Exception\LogicalException; use Nette\DI\Definitions\Definition; @@ -40,7 +41,7 @@ public function beforePassCompile(): void $handlers = []; // Collect all message handlers from DIC - $serviceHandlers = $this->getMessageHandlers(); + $serviceHandlers = BuilderMan::of($this)->getHandlerServiceNames(); // Iterate all found handlers foreach ($serviceHandlers as $serviceName) { @@ -86,43 +87,6 @@ public function beforePassCompile(): void } } - /** - * @return array - */ - private function getMessageHandlers(): array - { - $builder = $this->getContainerBuilder(); - - // Find all handlers - $serviceHandlers = []; - $serviceHandlers = array_merge($serviceHandlers, array_keys($builder->findByTag(MessengerExtension::HANDLER_TAG))); - - foreach ($builder->getDefinitions() as $definition) { - /** @var class-string $class */ - $class = $definition->getType(); - - // Skip definitions without type - if ($definition->getType() === null) { - continue; - } - - // Skip definitions without name - if ($definition->getName() === null) { - continue; - } - - // Skip services without attribute - if (Reflector::getMessageHandlers($class) === []) { - continue; - } - - $serviceHandlers[] = $definition->getName(); - } - - // Clean duplicates - return array_unique($serviceHandlers); - } - /** * @return list + */ + public function getHandlerServiceNames(): array + { + $builder = $this->pass->getContainerBuilder(); + + $serviceHandlers = array_keys($builder->findByTag(MessengerExtension::HANDLER_TAG)); + + foreach ($builder->getDefinitions() as $definition) { + /** @var class-string|null $class */ + $class = $definition->getType(); + + if ($class === null || !class_exists($class)) { + continue; + } + + $name = $definition->getName(); + + if ($name === null) { + continue; + } + + if (Reflector::getMessageHandlers($class) !== []) { + $serviceHandlers[] = $name; + } + } + + return array_values(array_unique($serviceHandlers)); + } + + /** + * @return array>> + */ + public function getHandlerMapping(): array + { + $builder = $this->pass->getContainerBuilder(); + $config = $this->pass->getConfig(); + $mapping = []; + + foreach ($config->bus as $busName => $busConfig) { + /** @var ServiceDefinition $locator */ + $locator = $builder->getDefinition($this->pass->prefix(sprintf('bus.%s.locator', $busName))); + + /** @var array> $handlers */ + $handlers = $locator->getFactory()->arguments[0] ?? []; + + $busMapping = []; + + foreach ($handlers as $messageClass => $handlerDescriptors) { + $descriptions = []; + + foreach ($handlerDescriptors as $descriptor) { + $description = $descriptor['service']; + + if ($descriptor['method'] !== '__invoke') { + $description .= '::' . $descriptor['method']; + } + + $descriptions[] = $description; + } + + $busMapping[$messageClass] = $descriptions; + } + + $mapping[$busName] = $busMapping; + } + + return $mapping; + } + public function getSerializer(string|Statement|null $serializer): Statement|string { if ($serializer === null) { From c2b2f4dee028a0a315783f43ff771d6d518a0cca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 15:38:02 +0100 Subject: [PATCH 08/21] Feature: worker limits, cache support, and always-on event listeners Add worker config schema (memoryLimit, timeLimit, messageLimit, failureLimit), cache config for StopWorkersCommand/restart signal, and fallbackBus schema. Register StopWorkerOnCustomStopExceptionListener and ResetMemoryUsageListener as always-on. Conditionally register worker limit and restart signal listeners. --- src/DI/MessengerExtension.php | 11 ++ src/DI/Pass/ConsolePass.php | 5 +- src/DI/Pass/EventPass.php | 66 +++++++++ tests/Cases/DI/MessengerExtension.cache.phpt | 99 ++++++++++++++ tests/Cases/DI/MessengerExtension.event.phpt | 4 + tests/Cases/DI/MessengerExtension.worker.phpt | 126 ++++++++++++++++++ tests/Mocks/Cache/DummyCacheItem.php | 46 +++++++ tests/Mocks/Cache/DummyCachePool.php | 63 +++++++++ 8 files changed, 417 insertions(+), 3 deletions(-) create mode 100644 tests/Cases/DI/MessengerExtension.cache.phpt create mode 100644 tests/Cases/DI/MessengerExtension.worker.phpt create mode 100644 tests/Mocks/Cache/DummyCacheItem.php create mode 100644 tests/Mocks/Cache/DummyCachePool.php diff --git a/src/DI/MessengerExtension.php b/src/DI/MessengerExtension.php index 661f14e..6d34c03 100644 --- a/src/DI/MessengerExtension.php +++ b/src/DI/MessengerExtension.php @@ -75,6 +75,17 @@ public function getConfigSchema(): Schema 'debug' => Expect::structure([ 'panel' => Expect::bool(false), ]), + 'worker' => Expect::structure([ + 'memoryLimit' => Expect::int()->nullable(), + 'timeLimit' => Expect::int()->nullable(), + 'messageLimit' => Expect::int()->nullable(), + 'failureLimit' => Expect::int()->nullable(), + ]), + 'cache' => Expect::anyOf( + Expect::string()->required(), + Expect::type(Statement::class)->required(), + )->nullable(), + 'fallbackBus' => Expect::string()->nullable(), 'bus' => Expect::arrayOf( Expect::structure([ 'defaultMiddlewares' => Expect::bool(true), diff --git a/src/DI/Pass/ConsolePass.php b/src/DI/Pass/ConsolePass.php index a3e6fcd..024854a 100644 --- a/src/DI/Pass/ConsolePass.php +++ b/src/DI/Pass/ConsolePass.php @@ -67,10 +67,9 @@ public function loadPassConfiguration(): void $this->prefix('@serializer.default'), ]); - if (PHP_VERSION === 'fake') { - // TODO + if ($config->cache !== null) { $builder->addDefinition($this->extension->prefix('console.stopWorkersCommand')) - ->setFactory(StopWorkersCommand::class); + ->setFactory(StopWorkersCommand::class, [$config->cache]); } } diff --git a/src/DI/Pass/EventPass.php b/src/DI/Pass/EventPass.php index ea895af..76419c1 100644 --- a/src/DI/Pass/EventPass.php +++ b/src/DI/Pass/EventPass.php @@ -10,8 +10,15 @@ use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\Messenger\EventListener\AddErrorDetailsStampListener; use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener; +use Symfony\Component\Messenger\EventListener\ResetMemoryUsageListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnCustomStopExceptionListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener; class EventPass extends AbstractPass { @@ -93,6 +100,65 @@ public function beforePassCompile(): void $this->prefix('@logger.logger'), ]), ]); + + // Custom stop exception + $dispatcher->addSetup('addSubscriber', [ + new Statement(StopWorkerOnCustomStopExceptionListener::class), + ]); + + // Reset memory usage + $dispatcher->addSetup('addSubscriber', [ + new Statement(ResetMemoryUsageListener::class), + ]); + + // Worker limits + $config = $this->getConfig(); + + if ($config->worker->memoryLimit !== null) { + $dispatcher->addSetup('addSubscriber', [ + new Statement(StopWorkerOnMemoryLimitListener::class, [ + $config->worker->memoryLimit, + $this->prefix('@logger.logger'), + ]), + ]); + } + + if ($config->worker->timeLimit !== null) { + $dispatcher->addSetup('addSubscriber', [ + new Statement(StopWorkerOnTimeLimitListener::class, [ + $config->worker->timeLimit, + $this->prefix('@logger.logger'), + ]), + ]); + } + + if ($config->worker->messageLimit !== null) { + $dispatcher->addSetup('addSubscriber', [ + new Statement(StopWorkerOnMessageLimitListener::class, [ + $config->worker->messageLimit, + $this->prefix('@logger.logger'), + ]), + ]); + } + + if ($config->worker->failureLimit !== null) { + $dispatcher->addSetup('addSubscriber', [ + new Statement(StopWorkerOnFailureLimitListener::class, [ + $config->worker->failureLimit, + $this->prefix('@logger.logger'), + ]), + ]); + } + + // Restart signal (requires cache) + if ($config->cache !== null) { + $dispatcher->addSetup('addSubscriber', [ + new Statement(StopWorkerOnRestartSignalListener::class, [ + $config->cache, + $this->prefix('@logger.logger'), + ]), + ]); + } } } diff --git a/tests/Cases/DI/MessengerExtension.cache.phpt b/tests/Cases/DI/MessengerExtension.cache.phpt new file mode 100644 index 0000000..e926670 --- /dev/null +++ b/tests/Cases/DI/MessengerExtension.cache.phpt @@ -0,0 +1,99 @@ +withDefaults() + ->build(); + + Assert::count(0, $container->findByType(StopWorkersCommand::class)); +}); + +// StopWorkersCommand registered with cache +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + cache: @cachePool + + services: + cachePool: Tests\Mocks\Cache\DummyCachePool + NEON + )); + }) + ->build(); + + Assert::count(1, $container->findByType(StopWorkersCommand::class)); +}); + +// StopWorkerOnRestartSignalListener registered with cache +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addExtension('events', new EventDispatcherExtension()); + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + cache: @cachePool + + services: + cachePool: Tests\Mocks\Cache\DummyCachePool + NEON + )); + }) + ->build(); + + /** @var EventDispatcher $dispatcher */ + $dispatcher = $container->getService('messenger.event.dispatcher'); + $listeners = []; + + foreach ($dispatcher->getListeners() as $listenersForEvent) { + if (is_array($listenersForEvent)) { + foreach ($listenersForEvent as $listenerForEvent) { + $listeners[] = $listenerForEvent[0]::class; + } + } + } + + Assert::true(in_array(StopWorkerOnRestartSignalListener::class, $listeners, true)); +}); + +// StopWorkerOnRestartSignalListener not registered without cache +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addExtension('events', new EventDispatcherExtension()); + }) + ->build(); + + /** @var EventDispatcher $dispatcher */ + $dispatcher = $container->getService('messenger.event.dispatcher'); + $listeners = []; + + foreach ($dispatcher->getListeners() as $listenersForEvent) { + if (is_array($listenersForEvent)) { + foreach ($listenersForEvent as $listenerForEvent) { + $listeners[] = $listenerForEvent[0]::class; + } + } + } + + Assert::false(in_array(StopWorkerOnRestartSignalListener::class, $listeners, true)); +}); diff --git a/tests/Cases/DI/MessengerExtension.event.phpt b/tests/Cases/DI/MessengerExtension.event.phpt index b225570..e64931c 100644 --- a/tests/Cases/DI/MessengerExtension.event.phpt +++ b/tests/Cases/DI/MessengerExtension.event.phpt @@ -9,8 +9,10 @@ use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\Messenger\EventListener\AddErrorDetailsStampListener; use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener; +use Symfony\Component\Messenger\EventListener\ResetMemoryUsageListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener; +use Symfony\Component\Messenger\EventListener\StopWorkerOnCustomStopExceptionListener; use Tester\Assert; use Tests\Toolkit\Container; @@ -73,6 +75,8 @@ Toolkit::test(static function (): void { AddErrorDetailsStampListener::class, SendFailedMessageForRetryListener::class, SendFailedMessageToFailureTransportListener::class, + StopWorkerOnCustomStopExceptionListener::class, + ResetMemoryUsageListener::class, ]; foreach ($expectedRegisteredListeners as $expectedRegisteredListener) { diff --git a/tests/Cases/DI/MessengerExtension.worker.phpt b/tests/Cases/DI/MessengerExtension.worker.phpt new file mode 100644 index 0000000..d2aad57 --- /dev/null +++ b/tests/Cases/DI/MessengerExtension.worker.phpt @@ -0,0 +1,126 @@ +withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addExtension('events', new EventDispatcherExtension()); + }) + ->build(); + + /** @var EventDispatcher $dispatcher */ + $dispatcher = $container->getService('messenger.event.dispatcher'); + $listeners = getListenerClasses($dispatcher); + + Assert::true(in_array(StopWorkerOnCustomStopExceptionListener::class, $listeners, true)); + Assert::true(in_array(ResetMemoryUsageListener::class, $listeners, true)); +}); + +// Worker limit listeners not registered by default +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addExtension('events', new EventDispatcherExtension()); + }) + ->build(); + + /** @var EventDispatcher $dispatcher */ + $dispatcher = $container->getService('messenger.event.dispatcher'); + $listeners = getListenerClasses($dispatcher); + + Assert::false(in_array(StopWorkerOnMemoryLimitListener::class, $listeners, true)); + Assert::false(in_array(StopWorkerOnTimeLimitListener::class, $listeners, true)); + Assert::false(in_array(StopWorkerOnMessageLimitListener::class, $listeners, true)); + Assert::false(in_array(StopWorkerOnFailureLimitListener::class, $listeners, true)); +}); + +// Worker limit listeners registered when configured +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addExtension('events', new EventDispatcherExtension()); + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + worker: + memoryLimit: 134217728 + timeLimit: 3600 + messageLimit: 1000 + failureLimit: 5 + NEON + )); + }) + ->build(); + + /** @var EventDispatcher $dispatcher */ + $dispatcher = $container->getService('messenger.event.dispatcher'); + $listeners = getListenerClasses($dispatcher); + + Assert::true(in_array(StopWorkerOnMemoryLimitListener::class, $listeners, true)); + Assert::true(in_array(StopWorkerOnTimeLimitListener::class, $listeners, true)); + Assert::true(in_array(StopWorkerOnMessageLimitListener::class, $listeners, true)); + Assert::true(in_array(StopWorkerOnFailureLimitListener::class, $listeners, true)); +}); + +// Partial worker limits +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addExtension('events', new EventDispatcherExtension()); + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + worker: + memoryLimit: 134217728 + NEON + )); + }) + ->build(); + + /** @var EventDispatcher $dispatcher */ + $dispatcher = $container->getService('messenger.event.dispatcher'); + $listeners = getListenerClasses($dispatcher); + + Assert::true(in_array(StopWorkerOnMemoryLimitListener::class, $listeners, true)); + Assert::false(in_array(StopWorkerOnTimeLimitListener::class, $listeners, true)); + Assert::false(in_array(StopWorkerOnMessageLimitListener::class, $listeners, true)); + Assert::false(in_array(StopWorkerOnFailureLimitListener::class, $listeners, true)); +}); + +/** + * @return list + */ +function getListenerClasses(EventDispatcher $dispatcher): array +{ + $listeners = []; + + foreach ($dispatcher->getListeners() as $listenersForEvent) { + if (is_array($listenersForEvent)) { + foreach ($listenersForEvent as $listenerForEvent) { + $listeners[] = $listenerForEvent[0]::class; + } + } + } + + return $listeners; +} diff --git a/tests/Mocks/Cache/DummyCacheItem.php b/tests/Mocks/Cache/DummyCacheItem.php new file mode 100644 index 0000000..50c9f2f --- /dev/null +++ b/tests/Mocks/Cache/DummyCacheItem.php @@ -0,0 +1,46 @@ +key; + } + + public function get(): mixed + { + return null; + } + + public function isHit(): bool + { + return false; + } + + public function set(mixed $value): static + { + return $this; + } + + public function expiresAt(?\DateTimeInterface $expiration): static + { + return $this; + } + + public function expiresAfter(\DateInterval|int|null $time): static + { + return $this; + } + +} diff --git a/tests/Mocks/Cache/DummyCachePool.php b/tests/Mocks/Cache/DummyCachePool.php new file mode 100644 index 0000000..c99d040 --- /dev/null +++ b/tests/Mocks/Cache/DummyCachePool.php @@ -0,0 +1,63 @@ + $keys + * @return iterable + */ + public function getItems(array $keys = []): iterable + { + return []; + } + + public function hasItem(string $key): bool + { + return false; + } + + public function clear(): bool + { + return true; + } + + public function deleteItem(string $key): bool + { + return true; + } + + /** + * @param list $keys + */ + public function deleteItems(array $keys): bool + { + return true; + } + + public function save(CacheItemInterface $item): bool + { + return true; + } + + public function saveDeferred(CacheItemInterface $item): bool + { + return true; + } + + public function commit(): bool + { + return true; + } + +} From b6839e7ba5913df14b45ec140a3cdc3e9b5cf3d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 15:38:23 +0100 Subject: [PATCH 09/21] Feature: Doctrine transport factory auto-registration Register DoctrineTransportFactory automatically when ConnectionRegistry is available in the container. Move TransportFactory finalization to beforePassCompile to support late-registered factories. --- composer.json | 3 ++ src/DI/Pass/TransportFactoryPass.php | 38 +++++++++++++++++-- .../DI/MessengerExtension.transport.phpt | 11 ++++++ 3 files changed, 49 insertions(+), 3 deletions(-) diff --git a/composer.json b/composer.json index 8a42794..3c29e54 100644 --- a/composer.json +++ b/composer.json @@ -35,6 +35,9 @@ "symfony/redis-messenger": "^7.4.7 || ^8.0.7", "symfony/amqp-messenger": "^7.4.7 || ^8.0.7", "symfony/doctrine-messenger": "^7.4.7 || ^8.0.7", + "symfony/cache": "^7.4.7 || ^8.0.7", + "doctrine/dbal": "^4.4.2", + "doctrine/orm": "^3.6.2", "contributte/console": "^0.10.0", "contributte/event-dispatcher": "^0.9.0", "contributte/qa": "^0.4.0", diff --git a/src/DI/Pass/TransportFactoryPass.php b/src/DI/Pass/TransportFactoryPass.php index 0dcb35e..e613742 100644 --- a/src/DI/Pass/TransportFactoryPass.php +++ b/src/DI/Pass/TransportFactoryPass.php @@ -4,7 +4,10 @@ use Contributte\Messenger\DI\MessengerExtension; use Contributte\Messenger\DI\Utils\BuilderMan; +use Doctrine\Persistence\ConnectionRegistry; +use Nette\DI\Definitions\ServiceDefinition; use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory; +use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransportFactory; use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory; use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory; use Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory; @@ -18,6 +21,7 @@ class TransportFactoryPass extends AbstractPass 'inMemory' => InMemoryTransportFactory::class, 'amqp' => AmqpTransportFactory::class, 'redis' => RedisTransportFactory::class, + 'doctrine' => DoctrineTransportFactory::class, ]; /** @@ -28,8 +32,12 @@ public function loadPassConfiguration(): void $builder = $this->getContainerBuilder(); $config = $this->getConfig(); - // Filter out class factory that cannot be found - $defaultFactories = array_filter(self::DEFAULT_TRANSPORT_FACTORY, static fn ($class, $name) => class_exists($class), ARRAY_FILTER_USE_BOTH); + // Filter out class factory that cannot be found, exclude doctrine (requires ConnectionRegistry) + $defaultFactories = array_filter( + self::DEFAULT_TRANSPORT_FACTORY, + static fn ($class, $name) => class_exists($class) && $name !== 'doctrine', + ARRAY_FILTER_USE_BOTH, + ); // Merge default + user defined factories $transportFactories = array_merge($defaultFactories, (array) $config->transportFactory); @@ -41,8 +49,32 @@ public function loadPassConfiguration(): void ->addTag(MessengerExtension::TRANSPORT_FACTORY_TAG, $name); } + // Placeholder: TransportFactory will be finalized in beforePassCompile $builder->addDefinition($this->prefix('transportFactory')) - ->setFactory(TransportFactory::class, [BuilderMan::of($this)->getTransportFactories()]); + ->setFactory(TransportFactory::class, [[]]); + } + + /** + * Decorate services + */ + public function beforePassCompile(): void + { + $builder = $this->getContainerBuilder(); + + // Register Doctrine transport factory when ConnectionRegistry is available + if (class_exists(DoctrineTransportFactory::class) && interface_exists(ConnectionRegistry::class)) { + if ($builder->getByType(ConnectionRegistry::class, false) !== null) { + $builder->addDefinition($this->prefix('transportFactory.doctrine')) + ->setFactory(DoctrineTransportFactory::class) + ->setAutowired(false) + ->addTag(MessengerExtension::TRANSPORT_FACTORY_TAG, 'doctrine'); + } + } + + // Finalize TransportFactory with all registered factories + /** @var ServiceDefinition $transportFactoryDef */ + $transportFactoryDef = $builder->getDefinition($this->prefix('transportFactory')); + $transportFactoryDef->setArgument(0, BuilderMan::of($this)->getTransportFactories()); } } diff --git a/tests/Cases/DI/MessengerExtension.transport.phpt b/tests/Cases/DI/MessengerExtension.transport.phpt index eb13c24..5f64543 100644 --- a/tests/Cases/DI/MessengerExtension.transport.phpt +++ b/tests/Cases/DI/MessengerExtension.transport.phpt @@ -128,6 +128,17 @@ Toolkit::test(function (): void { Assert::count(1, $container->findByTag(MessengerExtension::FAILURE_TRANSPORT_TAG)); }); +// Doctrine transport factory not registered without ConnectionRegistry +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->build(); + + // 5 = sync + inMemory + amqp + redis + main TransportFactory (no doctrine without ConnectionRegistry) + Assert::count(5, $container->findByType(TransportFactoryInterface::class)); + Assert::false($container->hasService('messenger.transportFactory.doctrine')); +}); + // Dynamic parameters Toolkit::test(function (): void { $container = Container::of() From d6bf7c2c56662b1673ddf35193951c6dd414282f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 15:41:27 +0100 Subject: [PATCH 10/21] Feature: fix DebugCommand mapping and StatsCommand transport names Wire handler mapping into DebugCommand and transport names into StatsCommand via ConsolePass::beforePassCompile. --- src/DI/Pass/ConsolePass.php | 17 +- .../Cases/DI/MessengerExtension.console.phpt | 146 ++++++++++++++++++ 2 files changed, 162 insertions(+), 1 deletion(-) create mode 100644 tests/Cases/DI/MessengerExtension.console.phpt diff --git a/src/DI/Pass/ConsolePass.php b/src/DI/Pass/ConsolePass.php index 024854a..add3b86 100644 --- a/src/DI/Pass/ConsolePass.php +++ b/src/DI/Pass/ConsolePass.php @@ -79,10 +79,25 @@ public function loadPassConfiguration(): void public function beforePassCompile(): void { $builder = $this->getContainerBuilder(); + $builderMan = BuilderMan::of($this); + + // Transport names + $transportNames = array_keys($builderMan->getTransports()); /** @var ServiceDefinition $setupTransportsCommandDef */ $setupTransportsCommandDef = $builder->getDefinition($this->prefix('console.setupTransportsCommand')); - $setupTransportsCommandDef->setArgument(1, array_keys(BuilderMan::of($this)->getTransports())); + $setupTransportsCommandDef->setArgument(1, $transportNames); + + if (class_exists(StatsCommand::class)) { + /** @var ServiceDefinition $statsCommandDef */ + $statsCommandDef = $builder->getDefinition($this->prefix('console.statsCommand')); + $statsCommandDef->setArgument(1, $transportNames); + } + + // Handler mapping + /** @var ServiceDefinition $debugCommandDef */ + $debugCommandDef = $builder->getDefinition($this->prefix('console.debugCommand')); + $debugCommandDef->setArgument(0, $builderMan->getHandlerMapping()); } } diff --git a/tests/Cases/DI/MessengerExtension.console.phpt b/tests/Cases/DI/MessengerExtension.console.phpt new file mode 100644 index 0000000..7fca57f --- /dev/null +++ b/tests/Cases/DI/MessengerExtension.console.phpt @@ -0,0 +1,146 @@ +withDefaults() + ->build(); + + Assert::count(1, $container->findByType(ConsumeMessagesCommand::class)); + Assert::count(1, $container->findByType(DebugCommand::class)); + Assert::count(1, $container->findByType(SetupTransportsCommand::class)); + Assert::count(1, $container->findByType(FailedMessagesRemoveCommand::class)); + Assert::count(1, $container->findByType(FailedMessagesRetryCommand::class)); + Assert::count(1, $container->findByType(FailedMessagesShowCommand::class)); + + if (class_exists(StatsCommand::class)) { + Assert::count(1, $container->findByType(StatsCommand::class)); + } +}); + +// DebugCommand receives handler mapping +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + transport: + memory: + dsn: in-memory:// + + routing: + Tests\Mocks\Message\SimpleMessage: [memory] + + services: + - Tests\Mocks\Handler\SimpleHandler + NEON + )); + }) + ->build(); + + /** @var DebugCommand $debugCommand */ + $debugCommand = $container->getByType(DebugCommand::class); + + $rc = new \ReflectionClass($debugCommand); + $prop = $rc->getProperty('mapping'); + /** @var array>> $mapping */ + $mapping = $prop->getValue($debugCommand); + + Assert::true(isset($mapping['messageBus'])); + Assert::true(isset($mapping['messageBus']['Tests\Mocks\Message\SimpleMessage'])); + Assert::count(1, $mapping['messageBus']['Tests\Mocks\Message\SimpleMessage']); +}); + +// DebugCommand with multiple buses +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + bus: + messageBus: + commandBus: + allowNoHandlers: true + + services: + - Tests\Mocks\Handler\SimpleHandler + NEON + )); + }) + ->build(); + + /** @var DebugCommand $debugCommand */ + $debugCommand = $container->getByType(DebugCommand::class); + + $rc = new \ReflectionClass($debugCommand); + $prop = $rc->getProperty('mapping'); + /** @var array>> $mapping */ + $mapping = $prop->getValue($debugCommand); + + Assert::true(isset($mapping['messageBus'])); + Assert::true(isset($mapping['commandBus'])); + // SimpleHandler has #[AsMessageHandler] without bus restriction, so it registers on all buses + Assert::true(isset($mapping['messageBus']['Tests\Mocks\Message\SimpleMessage'])); + Assert::true(isset($mapping['commandBus']['Tests\Mocks\Message\SimpleMessage'])); +}); + +// SetupTransportsCommand and StatsCommand receive transport names +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + transport: + async: + dsn: in-memory:// + failed: + dsn: in-memory:// + NEON + )); + }) + ->build(); + + /** @var SetupTransportsCommand $setupCommand */ + $setupCommand = $container->getByType(SetupTransportsCommand::class); + + $rc = new \ReflectionClass($setupCommand); + $prop = $rc->getProperty('transportNames'); + /** @var list $transportNames */ + $transportNames = $prop->getValue($setupCommand); + + Assert::contains('async', $transportNames); + Assert::contains('failed', $transportNames); + + if (class_exists(StatsCommand::class)) { + /** @var StatsCommand $statsCommand */ + $statsCommand = $container->getByType(StatsCommand::class); + + $rc = new \ReflectionClass($statsCommand); + $prop = $rc->getProperty('transportNames'); + /** @var list $statsTransportNames */ + $statsTransportNames = $prop->getValue($statsCommand); + + Assert::contains('async', $statsTransportNames); + Assert::contains('failed', $statsTransportNames); + } +}); From 4b3ad9602f83d58f4307a75e35f38c3a5ccc0ac9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 15:41:53 +0100 Subject: [PATCH 11/21] Feature: RoutableMessageBus fallback bus Support fallbackBus config option for RoutableMessageBus, used when no bus stamp is present on the envelope. --- src/DI/Pass/BusPass.php | 6 +++- tests/Cases/DI/MessengerExtension.bus.phpt | 41 ++++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/src/DI/Pass/BusPass.php b/src/DI/Pass/BusPass.php index f848096..df3746f 100644 --- a/src/DI/Pass/BusPass.php +++ b/src/DI/Pass/BusPass.php @@ -96,8 +96,12 @@ public function loadPassConfiguration(): void ->setAutowired(false); // Register routable bus (for CLI) + $fallbackBusName = $config->fallbackBus ?? null; $builder->addDefinition($this->prefix('bus.routable')) - ->setFactory(RoutableMessageBus::class, [$this->prefix('@bus.container')]) // @TODO fallbackBus + ->setFactory(RoutableMessageBus::class, [ + $this->prefix('@bus.container'), + $fallbackBusName !== null ? $this->prefix(sprintf('@bus.%s.bus', $fallbackBusName)) : null, + ]) ->setAutowired(false); // Register bus registry diff --git a/tests/Cases/DI/MessengerExtension.bus.phpt b/tests/Cases/DI/MessengerExtension.bus.phpt index 2182206..73ecfff 100644 --- a/tests/Cases/DI/MessengerExtension.bus.phpt +++ b/tests/Cases/DI/MessengerExtension.bus.phpt @@ -8,6 +8,7 @@ use Nette\DI\InvalidConfigurationException; use Psr\Container\ContainerInterface; use Symfony\Component\Messenger\MessageBus; use Symfony\Component\Messenger\Middleware\MiddlewareInterface; +use Symfony\Component\Messenger\RoutableMessageBus; use Tester\Assert; use Tests\Mocks\Bus\BusWrapper; use Tests\Toolkit\Container; @@ -166,3 +167,43 @@ Toolkit::test(function (): void { Assert::type(BusWrapper::class, $container->getByType(BusWrapper::class)); }); + +// RoutableMessageBus without fallback bus (default) +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->build(); + + /** @var RoutableMessageBus $routableBus */ + $routableBus = $container->getService('messenger.bus.routable'); + Assert::type(RoutableMessageBus::class, $routableBus); + + $rc = new \ReflectionClass($routableBus); + $prop = $rc->getProperty('fallbackBus'); + Assert::null($prop->getValue($routableBus)); +}); + +// RoutableMessageBus with explicit fallback bus +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + fallbackBus: messageBus + bus: + messageBus: + commandBus: + allowNoHandlers: true + NEON + )); + }) + ->build(); + + /** @var RoutableMessageBus $routableBus */ + $routableBus = $container->getService('messenger.bus.routable'); + + $rc = new \ReflectionClass($routableBus); + $prop = $rc->getProperty('fallbackBus'); + Assert::type(MessageBus::class, $prop->getValue($routableBus)); +}); From c5e79e549cbb2c940343ecee7d991b82431a2d58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 15:42:12 +0100 Subject: [PATCH 12/21] Feature: #[AsMessage] attribute routing Auto-discover routing from #[AsMessage] attributes on message classes by scanning handler method parameters. NEON config takes precedence over attribute-based routing. --- src/DI/Pass/RoutingPass.php | 62 +++++++++++- src/DI/Utils/Reflector.php | 49 ++++++++++ .../Cases/DI/MessengerExtension.routing.phpt | 98 +++++++++++++++++++ .../Handler/MultiRoutedMessageHandler.php | 19 ++++ tests/Mocks/Handler/RoutedMessageHandler.php | 19 ++++ tests/Mocks/Message/MultiRoutedMessage.php | 17 ++++ tests/Mocks/Message/RoutedMessage.php | 17 ++++ 7 files changed, 277 insertions(+), 4 deletions(-) create mode 100644 tests/Mocks/Handler/MultiRoutedMessageHandler.php create mode 100644 tests/Mocks/Handler/RoutedMessageHandler.php create mode 100644 tests/Mocks/Message/MultiRoutedMessage.php create mode 100644 tests/Mocks/Message/RoutedMessage.php diff --git a/src/DI/Pass/RoutingPass.php b/src/DI/Pass/RoutingPass.php index 04cd20d..03a8376 100644 --- a/src/DI/Pass/RoutingPass.php +++ b/src/DI/Pass/RoutingPass.php @@ -3,7 +3,10 @@ namespace Contributte\Messenger\DI\Pass; use Contributte\Messenger\DI\MessengerExtension; +use Contributte\Messenger\DI\Utils\BuilderMan; +use Contributte\Messenger\DI\Utils\Reflector; use Contributte\Messenger\Exception\LogicalException; +use Nette\DI\Definitions\ServiceDefinition; use Symfony\Component\Messenger\Transport\Sender\SendersLocator; class RoutingPass extends AbstractPass @@ -15,10 +18,9 @@ class RoutingPass extends AbstractPass public function loadPassConfiguration(): void { $builder = $this->getContainerBuilder(); - $config = $this->getConfig(); $builder->addDefinition($this->prefix('routing.locator')) - ->setFactory(SendersLocator::class, [$config->routing, $this->prefix('@transport.container')]); + ->setFactory(SendersLocator::class, [[], $this->prefix('@transport.container')]); } /** @@ -26,14 +28,66 @@ public function loadPassConfiguration(): void */ public function beforePassCompile(): void { + $builder = $this->getContainerBuilder(); $config = $this->getConfig(); - $transports = array_values($this->getContainerBuilder()->findByTag(MessengerExtension::TRANSPORT_TAG)); + $transports = array_values($builder->findByTag(MessengerExtension::TRANSPORT_TAG)); + + // Scan message classes for #[AsMessage] attribute routing + $attributeRouting = $this->discoverAttributeRouting(); - foreach ($config->routing as $routingEntity => $routingTransports) { + // Merge: NEON config takes precedence over attributes + $routing = array_merge($attributeRouting, (array) $config->routing); + + // Validate + foreach ($routing as $routingEntity => $routingTransports) { if (($diff = array_diff($routingTransports, $transports)) !== []) { throw new LogicalException(sprintf('Invalid transport "%s" defined for "%s". Available transports "%s".', implode(',', $diff), $routingEntity, implode(',', $transports))); } } + + // Update SendersLocator with merged routing + /** @var ServiceDefinition $locatorDef */ + $locatorDef = $builder->getDefinition($this->prefix('routing.locator')); + $locatorDef->setArgument(0, $routing); + } + + /** + * Discover routing from #[AsMessage] attributes on message classes + * by scanning handler method parameters. + * + * @return array> + */ + private function discoverAttributeRouting(): array + { + $builder = $this->getContainerBuilder(); + $routing = []; + + foreach (BuilderMan::of($this)->getHandlerServiceNames() as $serviceName) { + $definition = $builder->getDefinition($serviceName); + /** @var class-string|null $handlerClass */ + $handlerClass = $definition->getType(); + + if ($handlerClass === null || !class_exists($handlerClass)) { + continue; + } + + foreach (Reflector::getHandlerMessageClasses($handlerClass) as $messageClass) { + if (isset($routing[$messageClass]) || !class_exists($messageClass)) { + continue; + } + + foreach (Reflector::getMessageRouting($messageClass) as $attribute) { + if ($attribute->transport === null) { + continue; + } + + $transports = is_array($attribute->transport) ? $attribute->transport : [$attribute->transport]; + $routing[$messageClass] = array_merge($routing[$messageClass] ?? [], $transports); + } + } + } + + return $routing; } } diff --git a/src/DI/Utils/Reflector.php b/src/DI/Utils/Reflector.php index b58030a..dde6cab 100644 --- a/src/DI/Utils/Reflector.php +++ b/src/DI/Utils/Reflector.php @@ -9,6 +9,7 @@ use ReflectionIntersectionType; use ReflectionNamedType; use ReflectionUnionType; +use Symfony\Component\Messenger\Attribute\AsMessage; use Symfony\Component\Messenger\Attribute\AsMessageHandler; use Symfony\Component\Messenger\Handler\Acknowledger; use Symfony\Component\Messenger\Handler\BatchHandlerInterface; @@ -105,4 +106,52 @@ public static function getMessageHandlerMessage(string $class, array $options): return $type->getName(); } + /** + * @param class-string $handlerClass + * @return list + */ + public static function getHandlerMessageClasses(string $handlerClass): array + { + $messages = []; + + $handlers = self::getMessageHandlers($handlerClass); + + foreach ($handlers as $handler) { + $method = $handler->method ?? '__invoke'; + + try { + $messages[] = self::getMessageHandlerMessage($handlerClass, ['method' => $method]); + } catch (\Throwable) { + // Skip handlers that can't be resolved + } + } + + // Also try default __invoke method if not already covered + try { + $message = self::getMessageHandlerMessage($handlerClass, ['method' => '__invoke']); + + if (!in_array($message, $messages, true)) { + $messages[] = $message; + } + } catch (\Throwable) { + // Skip + } + + return $messages; + } + + /** + * @param class-string $class + * @return list + */ + public static function getMessageRouting(string $class): array + { + $rc = new ReflectionClass($class); + + return array_map( + static fn (ReflectionAttribute $attribute): AsMessage => $attribute->newInstance(), + $rc->getAttributes(AsMessage::class), + ); + } + } diff --git a/tests/Cases/DI/MessengerExtension.routing.phpt b/tests/Cases/DI/MessengerExtension.routing.phpt index 2c30027..239c050 100644 --- a/tests/Cases/DI/MessengerExtension.routing.phpt +++ b/tests/Cases/DI/MessengerExtension.routing.phpt @@ -8,14 +8,17 @@ use Contributte\Tester\Toolkit; use Nette\DI\Compiler; use Symfony\Component\Messenger\Exception\NoHandlerForMessageException; use Symfony\Component\Messenger\MessageBus; +use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport; use Tester\Assert; use Tests\Mocks\Handler\InterfaceHandler; +use Tests\Mocks\Handler\RoutedMessageHandler; use Tests\Mocks\Handler\SameHandler1; use Tests\Mocks\Handler\SameHandler2; use Tests\Mocks\Handler\SimpleHandler; use Tests\Mocks\Handler\WildcardHandler; use Tests\Mocks\Message\MessageImpl1; use Tests\Mocks\Message\MessageImpl2; +use Tests\Mocks\Message\RoutedMessage; use Tests\Mocks\Message\SameMessage; use Tests\Mocks\Message\SimpleMessage; use Tests\Toolkit\Container; @@ -263,3 +266,98 @@ Toolkit::test(function: function (): void { $messageBus->dispatch(new MessageImpl1('1')); Assert::type(MessageImpl1::class, $handler->message); }); + +// Routing via #[AsMessage] attribute +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + transport: + memory: + dsn: in-memory:// + + services: + - Tests\Mocks\Handler\RoutedMessageHandler + NEON + )); + }) + ->build(); + + /** @var BusRegistry $busRegistry */ + $busRegistry = $container->getByType(BusRegistry::class); + $messageBus = $busRegistry->get('messageBus'); + + $messageBus->dispatch(new RoutedMessage('routed')); + + // Message was routed to async transport (not handled synchronously) + /** @var InMemoryTransport $transport */ + $transport = $container->getService('messenger.transport.memory'); + $sent = $transport->getSent(); + Assert::count(1, $sent); + Assert::type(RoutedMessage::class, $sent[0]->getMessage()); +}); + +// NEON routing takes precedence over #[AsMessage] attribute +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + transport: + memory: + dsn: in-memory:// + sync: + dsn: sync:// + + routing: + Tests\Mocks\Message\RoutedMessage: [sync] + + services: + - Tests\Mocks\Handler\RoutedMessageHandler + NEON + )); + }) + ->build(); + + /** @var BusRegistry $busRegistry */ + $busRegistry = $container->getByType(BusRegistry::class); + $messageBus = $busRegistry->get('messageBus'); + + /** @var RoutedMessageHandler $handler */ + $handler = $container->getByType(RoutedMessageHandler::class); + + // NEON routes to sync, so handler is called synchronously (overriding #[AsMessage(transport: 'memory')]) + $messageBus->dispatch(new RoutedMessage('overridden')); + Assert::type(RoutedMessage::class, $handler->message); + Assert::equal('overridden', $handler->message->text); + + // Memory transport should be empty (NEON routing to sync took precedence) + /** @var InMemoryTransport $transport */ + $transport = $container->getService('messenger.transport.memory'); + Assert::count(0, $transport->getSent()); +}); + +// #[AsMessage] with invalid transport fails validation +Toolkit::test(static function (): void { + Assert::exception( + static function (): void { + Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + + services: + - Tests\Mocks\Handler\RoutedMessageHandler + NEON + )); + }) + ->build(); + }, + LogicalException::class, + 'Invalid transport "memory"%a%' + ); +}); diff --git a/tests/Mocks/Handler/MultiRoutedMessageHandler.php b/tests/Mocks/Handler/MultiRoutedMessageHandler.php new file mode 100644 index 0000000..1fe56ef --- /dev/null +++ b/tests/Mocks/Handler/MultiRoutedMessageHandler.php @@ -0,0 +1,19 @@ +message = $message; + } + +} diff --git a/tests/Mocks/Handler/RoutedMessageHandler.php b/tests/Mocks/Handler/RoutedMessageHandler.php new file mode 100644 index 0000000..1d8dd3f --- /dev/null +++ b/tests/Mocks/Handler/RoutedMessageHandler.php @@ -0,0 +1,19 @@ +message = $message; + } + +} diff --git a/tests/Mocks/Message/MultiRoutedMessage.php b/tests/Mocks/Message/MultiRoutedMessage.php new file mode 100644 index 0000000..9a5da9f --- /dev/null +++ b/tests/Mocks/Message/MultiRoutedMessage.php @@ -0,0 +1,17 @@ + Date: Mon, 9 Mar 2026 15:42:26 +0100 Subject: [PATCH 13/21] Docs: update README with new config options Document worker limits, cache support, fallbackBus, Doctrine auto-registration, and #[AsMessage] attribute routing. --- .docs/README.md | 61 +++++++++++++++++++++++-- AGENTS.md | 115 +++++++++++++++++++++++++++++++++--------------- 2 files changed, 136 insertions(+), 40 deletions(-) diff --git a/.docs/README.md b/.docs/README.md index 5d78ae8..a01e040 100644 --- a/.docs/README.md +++ b/.docs/README.md @@ -81,6 +81,22 @@ messenger: debug: panel: %debugMode% + # Worker limits for production environments. + # Workers will gracefully stop when configured limit is reached. + worker: + memoryLimit: 134217728 # int|null (bytes), e.g. 128 MB + timeLimit: 3600 # int|null (seconds), e.g. 1 hour + messageLimit: 1000 # int|null, stop after N messages + failureLimit: 5 # int|null, stop after N failures + + # PSR-6 cache pool for messenger:stop-workers command and worker restart signal. + # When configured, registers StopWorkersCommand and StopWorkerOnRestartSignalListener. + cache: @cache.pool + + # Fallback bus for RoutableMessageBus. Used when no bus stamp is present on the envelope. + # Defaults to null (no fallback). Set to a bus name to enable. + fallbackBus: messageBus + # Defines buses, default one are messageBus, queryBus and commandBus. bus: messageBus: @@ -119,12 +135,14 @@ messenger: # consoleLogger: @specialLogger # Defines transport factories. + # Built-in factories (sync, inMemory, amqp, redis) are auto-registered when their classes exist. + # Doctrine factory is auto-registered when ConnectionRegistry is available in the container. transportFactory: # redis: Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory - # sync: Symfony\Component\Messenger\Transport\Sync\SyncTransportFactorya + # sync: Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory # amqp: Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory # doctrine: Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransportFactory - # inMemory: Symfony\Component\Messenger\Transport\InMemoryTransportFactory + # inMemory: Symfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory # inMemory: @customMemoryTransportFactory # Defines global failure transport. Default is none. @@ -180,6 +198,8 @@ messenger: # Defines routing (message -> transport) # If the routing for message is missing, the message will be handled by handler immediately when dispatched + # Routing can also be defined via #[AsMessage] attribute on message classes (see below). + # NEON config takes precedence over attributes. routing: App\Domain\NewUserEmail: [redis] App\Domain\ForgotPasswordEmail: [db, redis] @@ -220,6 +240,40 @@ final class SimpleMessage } ``` +#### Routing via `#[AsMessage]` attribute + +Instead of configuring routing in NEON, you can use the `#[AsMessage]` attribute directly on your message class. +The attribute-based routing is auto-discovered from handler method parameters. NEON config takes precedence over attributes. + +```php + @@ -341,8 +395,7 @@ extensions: **Roadmap** -- No fallbackBus in RoutableMessageBus. -- No debug console commands. +- No Tracy debug panel integration (`TraceableMessageBus`). ## Examples diff --git a/AGENTS.md b/AGENTS.md index e1a611c..a3a38f1 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,12 +1,6 @@ # AGENTS.md -Nette DI integration for Symfony Messenger. Provides message buses (command/query/message), async transports (AMQP, Redis, Doctrine), handler auto-discovery, retry strategies, and failure transport routing — all configured via Nette DI extension and compiled through ordered passes. - -## Upstream - -- Changelog: https://github.com/symfony/messenger/blob/8.1/CHANGELOG.md -- Source: https://github.com/symfony/messenger -- Doc: https://symfony.com/doc/current/messenger.html +Nette DI integration for Symfony Messenger. Library and DI extension, not an application. Provides message buses, async transports, handler auto-discovery, retry strategies, and failure transport routing — all configured via Nette DI extension and compiled through ordered passes. ## Stack @@ -17,42 +11,91 @@ Nette DI integration for Symfony Messenger. Provides message buses (command/quer ## Codebase -- `src/DI/` — Main entry point. `MessengerExtension` delegates work to 10 passes in `Pass/`, each handling one concern (serializers, transports, routing, handlers, events, logging, console commands, buses, debug). Passes run in priority order (10->20->30->40) through three lifecycle hooks: load -> beforeCompile -> afterCompile. -- `src/Bus/` — MessageBus, CommandBus, QueryBus, BusRegistry -- `src/Container/` — PSR-11 wrappers for Nette DI -- `src/Handler/` — Handler locator -- `src/Logger/` — Dual HTTP/console logger -- `tests/Cases/` — Tests grouped by concern (DI/, Bus/, E2E/) -- `tests/Mocks/` — Test doubles -- `tests/Toolkit/` — Container builder, helpers +- `src/DI/` — main entry point; extension delegates to ordered passes in `Pass/`, each handling one concern (serializers, transports, routing, handlers, events, logging, console, buses, debug) +- `src/Bus/` — bus wrappers (message, command, query) and registry +- `src/Container/` — PSR-11 adapters for Nette DI +- `src/Handler/` — runtime handler locator with wildcard/interface/parent matching +- `src/Logger/` — dual HTTP/console logger bridge +- `tests/Cases/` — tests grouped by concern (DI/, Bus/, E2E/) +- `tests/Mocks/` — simple DTOs and handlers used as test doubles +- `tests/Toolkit/` — container builder and test helpers + +## Architecture + +### Compilation lifecycle + +The extension delegates work to passes through 3 lifecycle hooks: load -> beforeCompile -> afterCompile. Each pass handles exactly one concern. Pass order matters — later passes depend on services registered by earlier ones. + +Priority order: serializers and transports first, then routing and handlers, then events/logging/console, and finally buses and debug. + +When changing behavior, modify the responsible pass rather than the extension itself. + +### Configuration + +Config lives under `messenger:` key with sub-keys for debug, buses, serializers, transport factories, failure transports, transports, routing, and logging. The schema is defined in the extension class. When adding or changing config, update the schema first, then wire the option into the appropriate pass. + +Config values may be class names, `@service` references, or DI statements depending on the key. Routing and failure transports are validated against defined transport names at compile time. + +### Service naming and tags + +Extension constants define all tag names (transport factory, transport, failure transport, bus, handler, retry strategy). Service names follow patterns like `messenger.bus..*`, `messenger.transport.`, `messenger.serializer.`. Preserve these conventions — tests and utilities depend on them. -## Key conventions +### Handler discovery -- **Service tags**: `contributte.messenger.handler`, `contributte.messenger.transport`, `contributte.messenger.bus`, etc. (constants on `MessengerExtension`) -- **Service naming**: `messenger.bus.`, `messenger.transport.` -- **Handler discovery**: DI tag `contributte.messenger.handler` or PHP attribute `#[AsMessageHandler]` -- **Handler method**: defaults to `__invoke`, configurable via tag/attribute -- **Message type detection**: auto-detected from handler method's first parameter type-hint -- **Default middleware chain**: AddBusNameStamp -> DispatchAfterCurrentBus -> FailedMessageProcessing -> [custom] -> SendMessage -> HandleMessage +Handlers are registered via DI tag or `#[AsMessageHandler]` attribute. The handled message type is inferred from the handler method's first parameter type-hint (default method `__invoke`). Union/intersection types are rejected. Handlers are grouped per bus and sorted by priority. -## Development +At runtime, the handler locator matches messages by concrete class, parent classes, implemented interfaces, namespace wildcards, and catch-all `*`. + +### Buses and middleware + +Each bus gets a handler locator, an optional default middleware stack, custom middlewares, and optionally a typed wrapper (message/command/query bus). Default middleware order: bus name stamp -> dispatch after current bus -> failed message processing -> [custom] -> send message -> handle message. Preserve this ordering. + +### Transports and events + +Built-in transport factories are registered only when the corresponding Symfony bridge class exists. Retry strategies default to multiplier unless a custom service is configured. The event pass wires retry and failure listeners. If an event dispatcher already exists in the container, it is reused. + +## Code Style + +- `withDefaults()->withCompiler(fn ($compiler) => ...)->build()` -- NEON config snippets via `Helpers::neon(<<<'NEON' ... NEON)` +- `.phpt` files with multiple test cases per file +- Containers built via toolkit: `Container::of()->withDefaults()->withCompiler(...)->build()` +- Inline NEON config via `Helpers::neon(<<<'NEON' ... NEON)` - Assertions: `Assert::type()`, `Assert::count()`, `Assert::equal()`, `Assert::exception()` -- DI tests (`tests/Cases/DI/`) verify service registration, tags, and config validation -- E2E tests (`tests/Cases/E2E/`) extend `TestCase`, test full dispatch->handle workflows +- DI tests verify service registration, tags, and config validation errors +- E2E tests extend `TestCase` for full dispatch->handle workflows - File naming: `MessengerExtension.{feature}.phpt` -- Mocks are simple DTOs/handlers in `tests/Mocks/` with public properties for assertions +- Mocks are simple DTOs/handlers with public properties for assertions +- Compiled containers written to `tests/tmp` for debugging + +## Upstream + +- Docs: https://symfony.com/doc/current/messenger.html +- Source: https://github.com/symfony/messenger +- Changelog: https://github.com/symfony/messenger/blob/8.1/CHANGELOG.md From f2c1c5b1fef857f76c1d8451e9888ed596a5fc99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 15:47:26 +0100 Subject: [PATCH 14/21] AI: init --- .claude/settings.local.json | 4 +++- AGENTS.md | 43 +++++++++++-------------------------- 2 files changed, 15 insertions(+), 32 deletions(-) diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 4654dbb..923e417 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -8,7 +8,9 @@ "Bash(make:*:*)", "Bash(grep:*)", "Bash(cat:*)", - "Bash(ls:*)" + "Bash(ls:*)", + "Bash(git add:*)", + "Bash(git commit:*)" ] } } diff --git a/AGENTS.md b/AGENTS.md index a3a38f1..ba4c7c7 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -22,37 +22,18 @@ Nette DI integration for Symfony Messenger. Library and DI extension, not an app ## Architecture -### Compilation lifecycle - -The extension delegates work to passes through 3 lifecycle hooks: load -> beforeCompile -> afterCompile. Each pass handles exactly one concern. Pass order matters — later passes depend on services registered by earlier ones. - -Priority order: serializers and transports first, then routing and handlers, then events/logging/console, and finally buses and debug. - -When changing behavior, modify the responsible pass rather than the extension itself. - -### Configuration - -Config lives under `messenger:` key with sub-keys for debug, buses, serializers, transport factories, failure transports, transports, routing, and logging. The schema is defined in the extension class. When adding or changing config, update the schema first, then wire the option into the appropriate pass. - -Config values may be class names, `@service` references, or DI statements depending on the key. Routing and failure transports are validated against defined transport names at compile time. - -### Service naming and tags - -Extension constants define all tag names (transport factory, transport, failure transport, bus, handler, retry strategy). Service names follow patterns like `messenger.bus..*`, `messenger.transport.`, `messenger.serializer.`. Preserve these conventions — tests and utilities depend on them. - -### Handler discovery - -Handlers are registered via DI tag or `#[AsMessageHandler]` attribute. The handled message type is inferred from the handler method's first parameter type-hint (default method `__invoke`). Union/intersection types are rejected. Handlers are grouped per bus and sorted by priority. - -At runtime, the handler locator matches messages by concrete class, parent classes, implemented interfaces, namespace wildcards, and catch-all `*`. - -### Buses and middleware - -Each bus gets a handler locator, an optional default middleware stack, custom middlewares, and optionally a typed wrapper (message/command/query bus). Default middleware order: bus name stamp -> dispatch after current bus -> failed message processing -> [custom] -> send message -> handle message. Preserve this ordering. - -### Transports and events - -Built-in transport factories are registered only when the corresponding Symfony bridge class exists. Retry strategies default to multiplier unless a custom service is configured. The event pass wires retry and failure listeners. If an event dispatcher already exists in the container, it is reused. +- Extension delegates to ordered passes via load -> beforeCompile -> afterCompile hooks, each handling one concern +- Pass priority: serializers/transports -> routing/handlers -> events/logging/console -> buses/debug +- Modify the responsible pass, not the extension +- Config under `messenger:` key — schema defined in extension class, update schema first, then wire into the pass +- Config values can be class names, `@service` references, or DI statements; routing/failure transports validated at compile time +- Tag names defined as extension constants; service names follow `messenger.bus..*`, `messenger.transport.`, `messenger.serializer.` — preserve these +- Handlers registered via DI tag or `#[AsMessageHandler]`, message type inferred from first parameter type-hint (`__invoke`) +- Union/intersection types rejected; handlers grouped per bus, sorted by priority +- Runtime handler matching: concrete class, parents, interfaces, namespace wildcards, `*` +- Default middleware order: bus name stamp -> dispatch after current bus -> failed message processing -> [custom] -> send -> handle — preserve this order +- Transport factories registered only when corresponding Symfony bridge class exists +- Retry defaults to multiplier; event pass wires retry/failure listeners, reuses existing event dispatcher ## Code Style From 9220dd952aac7350a5cc3dc4259fdac9954a3cad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 21:00:48 +0100 Subject: [PATCH 15/21] Feature: autowire RoutableMessageBus as MessageBusInterface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Individual buses are no longer autowired — RoutableMessageBus is the single autowired MessageBusInterface, ensuring SyncTransport (and any other autowired consumer) routes through the correct bus via BusNameStamp. Remove the now-unused `autowired` bus config option from the schema. --- src/DI/MessengerExtension.php | 4 +- src/DI/Pass/BusPass.php | 10 ++-- tests/Cases/DI/MessengerExtension.bus.phpt | 11 ++-- .../Cases/DI/MessengerExtension.routing.phpt | 4 +- .../E2E/SyncTransportBusRoutingTest.phpt | 53 +++++++++++++++++++ tests/Mocks/Message/EventMessage.php | 15 ++++++ 6 files changed, 86 insertions(+), 11 deletions(-) create mode 100644 tests/Cases/E2E/SyncTransportBusRoutingTest.phpt create mode 100644 tests/Mocks/Message/EventMessage.php diff --git a/src/DI/MessengerExtension.php b/src/DI/MessengerExtension.php index 6d34c03..fc84e70 100644 --- a/src/DI/MessengerExtension.php +++ b/src/DI/MessengerExtension.php @@ -92,7 +92,7 @@ public function getConfigSchema(): Schema 'middlewares' => Expect::arrayOf((clone $expectService)), 'allowNoHandlers' => Expect::bool(false), 'allowNoSenders' => Expect::bool(true), - 'autowired' => Expect::bool(), + 'class' => (clone $expectClass)->required(false)->assert(fn ($input) => is_string($input) && is_subclass_of($input, MessageBusInterface::class), 'Specified bus class must implements "MessageBusInterface"'), 'wrapper' => (clone $expectClass)->required(false), ]), @@ -102,7 +102,7 @@ public function getConfigSchema(): Schema 'defaultMiddlewares' => true, 'middlewares' => [], 'class' => null, - 'autowired' => true, + 'allowNoHandlers' => false, 'allowNoSenders' => true, ], diff --git a/src/DI/Pass/BusPass.php b/src/DI/Pass/BusPass.php index df3746f..0cc9c74 100644 --- a/src/DI/Pass/BusPass.php +++ b/src/DI/Pass/BusPass.php @@ -77,10 +77,10 @@ public function loadPassConfiguration(): void ->addSetup('setLogger', [$this->prefix('@logger.logger')]); } - // Register message bus + // Register message bus (not autowired — RoutableMessageBus is the autowired MessageBusInterface) $builder->addDefinition($this->prefix(sprintf('bus.%s.bus', $name))) ->setFactory($busConfig->class ?? SymfonyMessageBus::class, [$middlewares]) - ->setAutowired($busConfig->autowired ?? count($builder->findByTag(MessengerExtension::BUS_TAG)) === 0) + ->setAutowired(false) ->setTags([MessengerExtension::BUS_TAG => $name]); // Register bus wrapper @@ -95,14 +95,16 @@ public function loadPassConfiguration(): void ->setFactory(NetteContainer::class) ->setAutowired(false); - // Register routable bus (for CLI) + // Register routable bus — autowired as MessageBusInterface so that + // SyncTransport (and any other autowired consumer) routes through + // the correct bus based on BusNameStamp. $fallbackBusName = $config->fallbackBus ?? null; $builder->addDefinition($this->prefix('bus.routable')) ->setFactory(RoutableMessageBus::class, [ $this->prefix('@bus.container'), $fallbackBusName !== null ? $this->prefix(sprintf('@bus.%s.bus', $fallbackBusName)) : null, ]) - ->setAutowired(false); + ->setAutowired(true); // Register bus registry $builder->addDefinition($this->prefix('busRegistry')) diff --git a/tests/Cases/DI/MessengerExtension.bus.phpt b/tests/Cases/DI/MessengerExtension.bus.phpt index 73ecfff..5b09719 100644 --- a/tests/Cases/DI/MessengerExtension.bus.phpt +++ b/tests/Cases/DI/MessengerExtension.bus.phpt @@ -6,7 +6,9 @@ use Contributte\Tester\Toolkit; use Nette\DI\Compiler; use Nette\DI\InvalidConfigurationException; use Psr\Container\ContainerInterface; +use ReflectionClass; use Symfony\Component\Messenger\MessageBus; +use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Middleware\MiddlewareInterface; use Symfony\Component\Messenger\RoutableMessageBus; use Tester\Assert; @@ -168,7 +170,7 @@ Toolkit::test(function (): void { Assert::type(BusWrapper::class, $container->getByType(BusWrapper::class)); }); -// RoutableMessageBus without fallback bus (default) +// RoutableMessageBus is autowired as MessageBusInterface, no fallback by default Toolkit::test(static function (): void { $container = Container::of() ->withDefaults() @@ -178,7 +180,10 @@ Toolkit::test(static function (): void { $routableBus = $container->getService('messenger.bus.routable'); Assert::type(RoutableMessageBus::class, $routableBus); - $rc = new \ReflectionClass($routableBus); + // RoutableMessageBus is autowired as MessageBusInterface + Assert::type(RoutableMessageBus::class, $container->getByType(MessageBusInterface::class)); + + $rc = new ReflectionClass($routableBus); $prop = $rc->getProperty('fallbackBus'); Assert::null($prop->getValue($routableBus)); }); @@ -203,7 +208,7 @@ Toolkit::test(static function (): void { /** @var RoutableMessageBus $routableBus */ $routableBus = $container->getService('messenger.bus.routable'); - $rc = new \ReflectionClass($routableBus); + $rc = new ReflectionClass($routableBus); $prop = $rc->getProperty('fallbackBus'); Assert::type(MessageBus::class, $prop->getValue($routableBus)); }); diff --git a/tests/Cases/DI/MessengerExtension.routing.phpt b/tests/Cases/DI/MessengerExtension.routing.phpt index 239c050..aee7c8a 100644 --- a/tests/Cases/DI/MessengerExtension.routing.phpt +++ b/tests/Cases/DI/MessengerExtension.routing.phpt @@ -103,7 +103,7 @@ Toolkit::test(function (): void { ->build(); /** @var MessageBus $messageBus */ - $messageBus = $container->getByType(MessageBus::class); + $messageBus = $container->getService('messenger.bus.messageBus.bus'); Assert::exception( static fn () => $messageBus->dispatch(new SimpleMessage('foobar')), @@ -135,7 +135,7 @@ Toolkit::test(function (): void { ->build(); /** @var MessageBus $messageBus */ - $messageBus = $container->getByType(MessageBus::class); + $messageBus = $container->getService('messenger.bus.messageBus.bus'); $messageBus->dispatch(new SameMessage('foobar')); /** @var SameHandler2 $handler1 */ diff --git a/tests/Cases/E2E/SyncTransportBusRoutingTest.phpt b/tests/Cases/E2E/SyncTransportBusRoutingTest.phpt new file mode 100644 index 0000000..7f40333 --- /dev/null +++ b/tests/Cases/E2E/SyncTransportBusRoutingTest.phpt @@ -0,0 +1,53 @@ +withDefaults() + ->withCompiler(function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + transport: + sync: + dsn: sync:// + + routing: + Tests\Mocks\Message\EventMessage: [sync] + + bus: + messageBus: + eventBus: + allowNoHandlers: true + NEON + )); + }) + ->build(); + + // Dispatching through eventBus (allowNoHandlers: true) — no exception + /** @var MessageBusInterface $eventBus */ + $eventBus = $container->getService('messenger.bus.eventBus.bus'); + $eventBus->dispatch(new EventMessage('hello')); + + // Dispatching through messageBus (allowNoHandlers: false) — throws + /** @var MessageBusInterface $messageBus */ + $messageBus = $container->getService('messenger.bus.messageBus.bus'); + Assert::exception( + static fn () => $messageBus->dispatch(new EventMessage('hello')), + NoHandlerForMessageException::class, + 'No handler for message "Tests\Mocks\Message\EventMessage".', + ); +}); diff --git a/tests/Mocks/Message/EventMessage.php b/tests/Mocks/Message/EventMessage.php new file mode 100644 index 0000000..7859c84 --- /dev/null +++ b/tests/Mocks/Message/EventMessage.php @@ -0,0 +1,15 @@ +text = $text; + } + +} From 1b3eaf9ee5abbeaaf70b1bf4f41f68169cf60715 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 21:00:56 +0100 Subject: [PATCH 16/21] Feature: improve console commands and DebugCommand handler mapping Remove class_exists guard for StatsCommand (always available in Symfony 7+). Fix DebugCommand mapping to show handler class names instead of service IDs. Add execution-based tests using CommandTester for DebugCommand, SetupTransportsCommand, and StatsCommand. --- src/DI/Pass/ConsolePass.php | 18 +-- src/DI/Utils/BuilderMan.php | 23 +++- .../Cases/DI/MessengerExtension.console.phpt | 123 ++++++++++++++---- tests/Toolkit/Console.php | 16 +++ 4 files changed, 138 insertions(+), 42 deletions(-) create mode 100644 tests/Toolkit/Console.php diff --git a/src/DI/Pass/ConsolePass.php b/src/DI/Pass/ConsolePass.php index add3b86..ee8f275 100644 --- a/src/DI/Pass/ConsolePass.php +++ b/src/DI/Pass/ConsolePass.php @@ -33,15 +33,13 @@ public function loadPassConfiguration(): void ]); $builder->addDefinition($this->extension->prefix('console.debugCommand')) - ->setFactory(DebugCommand::class, [[]]); // @TODO mapping + ->setFactory(DebugCommand::class, [[]]); $builder->addDefinition($this->extension->prefix('console.setupTransportsCommand')) - ->setFactory(SetupTransportsCommand::class, [$this->prefix('@transport.container'), []]); // @TODO transportNames + ->setFactory(SetupTransportsCommand::class, [$this->prefix('@transport.container'), []]); - if (class_exists(StatsCommand::class)) { - $builder->addDefinition($this->extension->prefix('console.statsCommand')) - ->setFactory(StatsCommand::class, [$this->prefix('@transport.container'), []]); // @TODO transportNames - } + $builder->addDefinition($this->extension->prefix('console.statsCommand')) + ->setFactory(StatsCommand::class, [$this->prefix('@transport.container'), []]); $builder->addDefinition($this->extension->prefix('console.failedMessageRemoveCommand')) ->setFactory(FailedMessagesRemoveCommand::class, [ @@ -88,11 +86,9 @@ public function beforePassCompile(): void $setupTransportsCommandDef = $builder->getDefinition($this->prefix('console.setupTransportsCommand')); $setupTransportsCommandDef->setArgument(1, $transportNames); - if (class_exists(StatsCommand::class)) { - /** @var ServiceDefinition $statsCommandDef */ - $statsCommandDef = $builder->getDefinition($this->prefix('console.statsCommand')); - $statsCommandDef->setArgument(1, $transportNames); - } + /** @var ServiceDefinition $statsCommandDef */ + $statsCommandDef = $builder->getDefinition($this->prefix('console.statsCommand')); + $statsCommandDef->setArgument(1, $transportNames); // Handler mapping /** @var ServiceDefinition $debugCommandDef */ diff --git a/src/DI/Utils/BuilderMan.php b/src/DI/Utils/BuilderMan.php index ab566ed..a6f765f 100644 --- a/src/DI/Utils/BuilderMan.php +++ b/src/DI/Utils/BuilderMan.php @@ -142,7 +142,7 @@ public function getHandlerServiceNames(): array } /** - * @return array>> + * @return array}>>> */ public function getHandlerMapping(): array { @@ -154,7 +154,7 @@ public function getHandlerMapping(): array /** @var ServiceDefinition $locator */ $locator = $builder->getDefinition($this->pass->prefix(sprintf('bus.%s.locator', $busName))); - /** @var array> $handlers */ + /** @var array> $handlers */ $handlers = $locator->getFactory()->arguments[0] ?? []; $busMapping = []; @@ -163,13 +163,28 @@ public function getHandlerMapping(): array $descriptions = []; foreach ($handlerDescriptors as $descriptor) { - $description = $descriptor['service']; + $serviceDef = $builder->getDefinition($descriptor['service']); + $description = $serviceDef->getType(); if ($descriptor['method'] !== '__invoke') { $description .= '::' . $descriptor['method']; } - $descriptions[] = $description; + $options = []; + + if ($descriptor['method'] !== '__invoke') { + $options['method'] = $descriptor['method']; + } + + if (isset($descriptor['from_transport'])) { + $options['from_transport'] = $descriptor['from_transport']; + } + + if (isset($descriptor['alias'])) { + $options['alias'] = $descriptor['alias']; + } + + $descriptions[] = [$description, $options]; } $busMapping[$messageClass] = $descriptions; diff --git a/tests/Cases/DI/MessengerExtension.console.phpt b/tests/Cases/DI/MessengerExtension.console.phpt index 7fca57f..8df394c 100644 --- a/tests/Cases/DI/MessengerExtension.console.phpt +++ b/tests/Cases/DI/MessengerExtension.console.phpt @@ -4,6 +4,8 @@ namespace Tests\Cases\DI; use Contributte\Tester\Toolkit; use Nette\DI\Compiler; +use ReflectionClass; +use Symfony\Component\Console\Tester\CommandTester; use Symfony\Component\Messenger\Command\ConsumeMessagesCommand; use Symfony\Component\Messenger\Command\DebugCommand; use Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand; @@ -12,6 +14,8 @@ use Symfony\Component\Messenger\Command\FailedMessagesShowCommand; use Symfony\Component\Messenger\Command\SetupTransportsCommand; use Symfony\Component\Messenger\Command\StatsCommand; use Tester\Assert; +use Tests\Mocks\Message\SimpleMessage; +use Tests\Toolkit\Console; use Tests\Toolkit\Container; use Tests\Toolkit\Helpers; @@ -29,10 +33,7 @@ Toolkit::test(static function (): void { Assert::count(1, $container->findByType(FailedMessagesRemoveCommand::class)); Assert::count(1, $container->findByType(FailedMessagesRetryCommand::class)); Assert::count(1, $container->findByType(FailedMessagesShowCommand::class)); - - if (class_exists(StatsCommand::class)) { - Assert::count(1, $container->findByType(StatsCommand::class)); - } + Assert::count(1, $container->findByType(StatsCommand::class)); }); // DebugCommand receives handler mapping @@ -59,14 +60,14 @@ Toolkit::test(static function (): void { /** @var DebugCommand $debugCommand */ $debugCommand = $container->getByType(DebugCommand::class); - $rc = new \ReflectionClass($debugCommand); + $rc = new ReflectionClass($debugCommand); $prop = $rc->getProperty('mapping'); /** @var array>> $mapping */ $mapping = $prop->getValue($debugCommand); Assert::true(isset($mapping['messageBus'])); - Assert::true(isset($mapping['messageBus']['Tests\Mocks\Message\SimpleMessage'])); - Assert::count(1, $mapping['messageBus']['Tests\Mocks\Message\SimpleMessage']); + Assert::true(isset($mapping['messageBus'][SimpleMessage::class])); + Assert::count(1, $mapping['messageBus'][SimpleMessage::class]); }); // DebugCommand with multiple buses @@ -91,7 +92,7 @@ Toolkit::test(static function (): void { /** @var DebugCommand $debugCommand */ $debugCommand = $container->getByType(DebugCommand::class); - $rc = new \ReflectionClass($debugCommand); + $rc = new ReflectionClass($debugCommand); $prop = $rc->getProperty('mapping'); /** @var array>> $mapping */ $mapping = $prop->getValue($debugCommand); @@ -99,11 +100,62 @@ Toolkit::test(static function (): void { Assert::true(isset($mapping['messageBus'])); Assert::true(isset($mapping['commandBus'])); // SimpleHandler has #[AsMessageHandler] without bus restriction, so it registers on all buses - Assert::true(isset($mapping['messageBus']['Tests\Mocks\Message\SimpleMessage'])); - Assert::true(isset($mapping['commandBus']['Tests\Mocks\Message\SimpleMessage'])); + Assert::true(isset($mapping['messageBus'][SimpleMessage::class])); + Assert::true(isset($mapping['commandBus'][SimpleMessage::class])); }); -// SetupTransportsCommand and StatsCommand receive transport names +// DebugCommand executes successfully and shows handler mapping +Toolkit::test(function: static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + transport: + memory: + dsn: in-memory:// + + routing: + Tests\Mocks\Message\SimpleMessage: [memory] + + services: + - Tests\Mocks\Handler\SimpleHandler + NEON + )); + }) + ->build(); + + /** @var DebugCommand $debugCommand */ + $debugCommand = $container->getByType(DebugCommand::class); + + $tester = new CommandTester($debugCommand); + $tester->execute([]); + + $output = $tester->getDisplay(true); + + $expected = <<<'TEXT' + Messenger + ========= + + messageBus + ---------- + + The following messages can be dispatched: + + -------------------------------------------------- + Tests\Mocks\Message\SimpleMessage + handled by Tests\Mocks\Handler\SimpleHandler + + -------------------------------------------------- + TEXT; + + Assert::equal( + Console::normalize($expected), + Console::normalize($output) + ); +}); + +// SetupTransportsCommand receives transport names Toolkit::test(static function (): void { $container = Container::of() ->withDefaults() @@ -123,24 +175,41 @@ Toolkit::test(static function (): void { /** @var SetupTransportsCommand $setupCommand */ $setupCommand = $container->getByType(SetupTransportsCommand::class); - $rc = new \ReflectionClass($setupCommand); - $prop = $rc->getProperty('transportNames'); - /** @var list $transportNames */ - $transportNames = $prop->getValue($setupCommand); + $tester = new CommandTester($setupCommand); + $tester->execute([]); + + $output = $tester->getDisplay(true); + + Assert::match('~.*The "async" transport does not support setup.*~s', $output); + Assert::match('~.*The "failed" transport does not support setup.*~s', $output); + Assert::same(0, $tester->getStatusCode()); +}); + +// StatsCommand receives transport names +Toolkit::test(static function (): void { + $container = Container::of() + ->withDefaults() + ->withCompiler(static function (Compiler $compiler): void { + $compiler->addConfig(Helpers::neon(<<<'NEON' + messenger: + transport: + async: + dsn: in-memory:// + failed: + dsn: in-memory:// + NEON + )); + }) + ->build(); - Assert::contains('async', $transportNames); - Assert::contains('failed', $transportNames); + /** @var StatsCommand $statsCommand */ + $statsCommand = $container->getByType(StatsCommand::class); - if (class_exists(StatsCommand::class)) { - /** @var StatsCommand $statsCommand */ - $statsCommand = $container->getByType(StatsCommand::class); + $tester = new CommandTester($statsCommand); + $tester->execute([]); - $rc = new \ReflectionClass($statsCommand); - $prop = $rc->getProperty('transportNames'); - /** @var list $statsTransportNames */ - $statsTransportNames = $prop->getValue($statsCommand); + $output = $tester->getDisplay(true); - Assert::contains('async', $statsTransportNames); - Assert::contains('failed', $statsTransportNames); - } + Assert::match('~.*Unable to get message count for the following transports: "async",.*"failed".*~s', $output); + Assert::same(0, $tester->getStatusCode()); }); diff --git a/tests/Toolkit/Console.php b/tests/Toolkit/Console.php new file mode 100644 index 0000000..af7e659 --- /dev/null +++ b/tests/Toolkit/Console.php @@ -0,0 +1,16 @@ + rtrim(ltrim($line, "\t")), + explode("\n", trim($s)), + )); + } + +} From d34a0fb1e0dc15607dc6ef2b81ff7822545ea165 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 21:01:01 +0100 Subject: [PATCH 17/21] Refactor: use ::class constants instead of string literals in E2E tests --- tests/Cases/E2E/Vendor/MessengerTest.php | 27 ++++++++++++------------ 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/tests/Cases/E2E/Vendor/MessengerTest.php b/tests/Cases/E2E/Vendor/MessengerTest.php index cadee93..a97ee41 100644 --- a/tests/Cases/E2E/Vendor/MessengerTest.php +++ b/tests/Cases/E2E/Vendor/MessengerTest.php @@ -16,6 +16,7 @@ use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; use Symfony\Component\Messenger\Middleware\SendMessageMiddleware; use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy; +use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport; use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory; use Symfony\Component\Messenger\Transport\Sender\SendersLocator; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; @@ -243,22 +244,22 @@ public function testFailureListener(): void 'level' => 'info', 'message' => 'Sending message {class} with {alias} sender using {sender}', 'context' => [ - 'class' => 'Tests\Mocks\Vendor\DummyRetryFailureMessage', + 'class' => DummyRetryFailureMessage::class, 'alias' => 'transport1', - 'sender' => 'Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport', + 'sender' => InMemoryTransport::class, ], ], [ 'level' => 'info', 'message' => 'Received message {class}', - 'context' => ['class' => 'Tests\Mocks\Vendor\DummyRetryFailureMessage'], + 'context' => ['class' => DummyRetryFailureMessage::class], ], [ 'level' => 'info', 'message' => 'Rejected message {class} will be sent to the failure transport {transport}.', 'context' => [ - 'class' => 'Tests\Mocks\Vendor\DummyRetryFailureMessage', - 'transport' => 'Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport', + 'class' => DummyRetryFailureMessage::class, + 'transport' => InMemoryTransport::class, ], ], [ @@ -349,40 +350,40 @@ public function testRealRetryListener(): void 'level' => 'warning', 'message' => 'Error thrown while handling message {class}. Sending for retry #{retryCount} using {delay} ms delay. Error: "{error}"', 'context' => [ - 'class' => 'Tests\Mocks\Vendor\DummyFailureMessage', + 'class' => DummyFailureMessage::class, 'message_id' => 1, 'retryCount' => 1, 'delay' => 1, - 'error' => 'Handling "Tests\Mocks\Vendor\DummyFailureMessage" failed: Foo', + 'error' => 'Handling "' . DummyFailureMessage::class . '" failed: Foo', ], ], [ 'level' => 'warning', 'message' => 'Error thrown while handling message {class}. Sending for retry #{retryCount} using {delay} ms delay. Error: "{error}"', 'context' => [ - 'class' => 'Tests\Mocks\Vendor\DummyFailureMessage', + 'class' => DummyFailureMessage::class, 'message_id' => 2, 'retryCount' => 2, 'delay' => 2, - 'error' => 'Handling "Tests\Mocks\Vendor\DummyFailureMessage" failed: Foo', + 'error' => 'Handling "' . DummyFailureMessage::class . '" failed: Foo', ], ], [ 'level' => 'critical', 'message' => 'Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', 'context' => [ - 'class' => 'Tests\Mocks\Vendor\DummyFailureMessage', + 'class' => DummyFailureMessage::class, 'message_id' => 3, 'retryCount' => 2, - 'error' => 'Handling "Tests\Mocks\Vendor\DummyFailureMessage" failed: Foo', + 'error' => 'Handling "' . DummyFailureMessage::class . '" failed: Foo', ], ], [ 'level' => 'info', 'message' => 'Rejected message {class} will be sent to the failure transport {transport}.', 'context' => [ - 'class' => 'Tests\Mocks\Vendor\DummyFailureMessage', - 'transport' => 'Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport', + 'class' => DummyFailureMessage::class, + 'transport' => InMemoryTransport::class, ], ], [ From f1ca09c7cef083333701d85b893133554c2e2d6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 21:01:07 +0100 Subject: [PATCH 18/21] Docs: simplify AGENTS.md testing section --- AGENTS.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index ba4c7c7..da4741e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -58,11 +58,10 @@ make tests # run all tests make phpstan # static analysis make csf # fix code style make qa # phpstan + cs - -# Single test file -vendor/bin/tester -s -p php --colors 1 -C tests/Cases/DI/MessengerExtension.handler.phpt ``` +**Always run `make cs phpstan tests` and fix all errors.** + ### Conventions - `.phpt` files with multiple test cases per file From 3297529e2a26a23dac6cdb9f7abb717739b674fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 21:01:09 +0100 Subject: [PATCH 19/21] AI: update local settings --- .claude/settings.local.json | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 923e417..5f7ce83 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -9,8 +9,10 @@ "Bash(grep:*)", "Bash(cat:*)", "Bash(ls:*)", + "Bash(gh pr:*)", "Bash(git add:*)", - "Bash(git commit:*)" + "Bash(git commit:*)", + "Bash(git fetch:*)" ] } } From e88c352fe1905c12dde522f2fcd4ec0994c464ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 21:06:18 +0100 Subject: [PATCH 20/21] Fix: syntax error in console test, duplicate check in BuilderMan, cosmetic blank lines --- src/DI/MessengerExtension.php | 2 -- src/DI/Utils/BuilderMan.php | 5 +---- tests/Cases/DI/MessengerExtension.console.phpt | 2 +- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/src/DI/MessengerExtension.php b/src/DI/MessengerExtension.php index fc84e70..ce92058 100644 --- a/src/DI/MessengerExtension.php +++ b/src/DI/MessengerExtension.php @@ -92,7 +92,6 @@ public function getConfigSchema(): Schema 'middlewares' => Expect::arrayOf((clone $expectService)), 'allowNoHandlers' => Expect::bool(false), 'allowNoSenders' => Expect::bool(true), - 'class' => (clone $expectClass)->required(false)->assert(fn ($input) => is_string($input) && is_subclass_of($input, MessageBusInterface::class), 'Specified bus class must implements "MessageBusInterface"'), 'wrapper' => (clone $expectClass)->required(false), ]), @@ -102,7 +101,6 @@ public function getConfigSchema(): Schema 'defaultMiddlewares' => true, 'middlewares' => [], 'class' => null, - 'allowNoHandlers' => false, 'allowNoSenders' => true, ], diff --git a/src/DI/Utils/BuilderMan.php b/src/DI/Utils/BuilderMan.php index a6f765f..17e2c59 100644 --- a/src/DI/Utils/BuilderMan.php +++ b/src/DI/Utils/BuilderMan.php @@ -166,13 +166,10 @@ public function getHandlerMapping(): array $serviceDef = $builder->getDefinition($descriptor['service']); $description = $serviceDef->getType(); - if ($descriptor['method'] !== '__invoke') { - $description .= '::' . $descriptor['method']; - } - $options = []; if ($descriptor['method'] !== '__invoke') { + $description .= '::' . $descriptor['method']; $options['method'] = $descriptor['method']; } diff --git a/tests/Cases/DI/MessengerExtension.console.phpt b/tests/Cases/DI/MessengerExtension.console.phpt index 8df394c..ff0acce 100644 --- a/tests/Cases/DI/MessengerExtension.console.phpt +++ b/tests/Cases/DI/MessengerExtension.console.phpt @@ -105,7 +105,7 @@ Toolkit::test(static function (): void { }); // DebugCommand executes successfully and shows handler mapping -Toolkit::test(function: static function (): void { +Toolkit::test(static function (): void { $container = Container::of() ->withDefaults() ->withCompiler(static function (Compiler $compiler): void { From 35f5cb82b0618efa843c4b2a26f1b166030f6008 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milan=20Felix=20=C5=A0ulc?= Date: Mon, 9 Mar 2026 21:15:58 +0100 Subject: [PATCH 21/21] Code: micro optimized --- src/DI/Utils/BuilderMan.php | 2 +- src/DI/Utils/Reflector.php | 14 ++++++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/DI/Utils/BuilderMan.php b/src/DI/Utils/BuilderMan.php index 17e2c59..3621261 100644 --- a/src/DI/Utils/BuilderMan.php +++ b/src/DI/Utils/BuilderMan.php @@ -164,7 +164,7 @@ public function getHandlerMapping(): array foreach ($handlerDescriptors as $descriptor) { $serviceDef = $builder->getDefinition($descriptor['service']); - $description = $serviceDef->getType(); + $description = $serviceDef->getType() ?? $descriptor['service']; $options = []; diff --git a/src/DI/Utils/Reflector.php b/src/DI/Utils/Reflector.php index dde6cab..fa5bba0 100644 --- a/src/DI/Utils/Reflector.php +++ b/src/DI/Utils/Reflector.php @@ -126,15 +126,13 @@ public static function getHandlerMessageClasses(string $handlerClass): array } } - // Also try default __invoke method if not already covered - try { - $message = self::getMessageHandlerMessage($handlerClass, ['method' => '__invoke']); - - if (!in_array($message, $messages, true)) { - $messages[] = $message; + // Try default __invoke method only if no handlers were found from attributes + if ($handlers === []) { + try { + $messages[] = self::getMessageHandlerMessage($handlerClass, ['method' => '__invoke']); + } catch (\Throwable) { + // Skip } - } catch (\Throwable) { - // Skip } return $messages;