Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .claude/settings.local.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"permissions": {
"allow": [
"WebFetch(domain:github.com)",
"WebFetch(domain:symfony.com)",
"WebFetch(domain:nette.org)",
"Bash(make:*)",
"Bash(make:*:*)",
"Bash(grep:*)",
"Bash(cat:*)",
"Bash(ls:*)",
"Bash(gh pr:*)",
"Bash(git add:*)",
"Bash(git commit:*)",
"Bash(git fetch:*)"
]
}
}
61 changes: 57 additions & 4 deletions .docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,22 @@ messenger:
debug:
panel: %debugMode%

# Worker limits for production environments.
# Workers will gracefully stop when configured limit is reached.
worker:
memoryLimit: 134217728 # int|null (bytes), e.g. 128 MB
timeLimit: 3600 # int|null (seconds), e.g. 1 hour
messageLimit: 1000 # int|null, stop after N messages
failureLimit: 5 # int|null, stop after N failures

# PSR-6 cache pool for messenger:stop-workers command and worker restart signal.
# When configured, registers StopWorkersCommand and StopWorkerOnRestartSignalListener.
cache: @cache.pool

# Fallback bus for RoutableMessageBus. Used when no bus stamp is present on the envelope.
# Defaults to null (no fallback). Set to a bus name to enable.
fallbackBus: messageBus

# Defines buses, default one are messageBus, queryBus and commandBus.
bus:
messageBus:
Expand Down Expand Up @@ -119,12 +135,14 @@ messenger:
# consoleLogger: @specialLogger

# Defines transport factories.
# Built-in factories (sync, inMemory, amqp, redis) are auto-registered when their classes exist.
# Doctrine factory is auto-registered when ConnectionRegistry is available in the container.
transportFactory:
# redis: Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory
# sync: Symfony\Component\Messenger\Transport\Sync\SyncTransportFactorya
# sync: Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory
# amqp: Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory
# doctrine: Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransportFactory
# inMemory: Symfony\Component\Messenger\Transport\InMemoryTransportFactory
# inMemory: Symfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory
# inMemory: @customMemoryTransportFactory

# Defines global failure transport. Default is none.
Expand Down Expand Up @@ -180,6 +198,8 @@ messenger:

# Defines routing (message -> transport)
# If the routing for message is missing, the message will be handled by handler immediately when dispatched
# Routing can also be defined via #[AsMessage] attribute on message classes (see below).
# NEON config takes precedence over attributes.
routing:
App\Domain\NewUserEmail: [redis]
App\Domain\ForgotPasswordEmail: [db, redis]
Expand Down Expand Up @@ -220,6 +240,40 @@ final class SimpleMessage
}
```

#### Routing via `#[AsMessage]` attribute

Instead of configuring routing in NEON, you can use the `#[AsMessage]` attribute directly on your message class.
The attribute-based routing is auto-discovered from handler method parameters. NEON config takes precedence over attributes.

```php
<?php declare(strict_types = 1);

namespace App\Domain;

use Symfony\Component\Messenger\Attribute\AsMessage;

#[AsMessage(transport: 'async')]
final class NewUserEmail
{

public function __construct(
public readonly string $email,
)
{
}

}
```

Multiple transports are also supported:

```php
#[AsMessage(transport: ['async', 'audit'])]
final class ImportantMessage
{
}
```

### Handlers

All handlers must be registered to your [DIC container](https://doc.nette.org/en/dependency-injection) via [Neon files](https://doc.nette.org/en/neon/format).<br>
Expand Down Expand Up @@ -341,8 +395,7 @@ extensions:

**Roadmap**

- No fallbackBus in RoutableMessageBus.
- No debug console commands.
- No Tracy debug panel integration (`TraceableMessageBus`).

## Examples

Expand Down
81 changes: 81 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# AGENTS.md

Nette DI integration for Symfony Messenger. Library and DI extension, not an application. Provides message buses, async transports, handler auto-discovery, retry strategies, and failure transport routing — all configured via Nette DI extension and compiled through ordered passes.

## Stack

- PHP >=8.2, Symfony 7.x/8.x, Nette DI ^3.1
- PHPStan level 9 (phpVersion 80200)
- Nette Tester (`.phpt` files)
- Contributte code style (`ruleset.xml`)

## Codebase

- `src/DI/` — main entry point; extension delegates to ordered passes in `Pass/`, each handling one concern (serializers, transports, routing, handlers, events, logging, console, buses, debug)
- `src/Bus/` — bus wrappers (message, command, query) and registry
- `src/Container/` — PSR-11 adapters for Nette DI
- `src/Handler/` — runtime handler locator with wildcard/interface/parent matching
- `src/Logger/` — dual HTTP/console logger bridge
- `tests/Cases/` — tests grouped by concern (DI/, Bus/, E2E/)
- `tests/Mocks/` — simple DTOs and handlers used as test doubles
- `tests/Toolkit/` — container builder and test helpers

## Architecture

- Extension delegates to ordered passes via load -> beforeCompile -> afterCompile hooks, each handling one concern
- Pass priority: serializers/transports -> routing/handlers -> events/logging/console -> buses/debug
- Modify the responsible pass, not the extension
- Config under `messenger:` key — schema defined in extension class, update schema first, then wire into the pass
- Config values can be class names, `@service` references, or DI statements; routing/failure transports validated at compile time
- Tag names defined as extension constants; service names follow `messenger.bus.<name>.*`, `messenger.transport.<name>`, `messenger.serializer.<name>` — preserve these
- Handlers registered via DI tag or `#[AsMessageHandler]`, message type inferred from first parameter type-hint (`__invoke`)
- Union/intersection types rejected; handlers grouped per bus, sorted by priority
- Runtime handler matching: concrete class, parents, interfaces, namespace wildcards, `*`
- Default middleware order: bus name stamp -> dispatch after current bus -> failed message processing -> [custom] -> send -> handle — preserve this order
- Transport factories registered only when corresponding Symfony bridge class exists
- Retry defaults to multiplier; event pass wires retry/failure listeners, reuses existing event dispatcher

## Code Style

- `<?php declare(strict_types = 1);` on one line in every file
- Indentation with tabs
- Contributte/Nette formatting enforced via `contributte/qa` ruleset (Slevomat Coding Standard)
- Type name must match file name
- Root namespaces: `Contributte\Messenger` for `src/`, `Tests` for `tests/`
- One `use` per line, alphabetically ordered, no unused imports
- No superfluous `Interface` suffix in production code (allowed in tests)
- Exception messages must be explicit — tests assert on them
- Avoid comments unless the logic is genuinely non-obvious
- Prefer small, focused changes inside the responsible pass or utility
- Run `make csf` to auto-fix before committing

## Testing

Nette Tester, not PHPUnit.

```bash
make tests # run all tests
make phpstan # static analysis
make csf # fix code style
make qa # phpstan + cs
```

**Always run `make cs phpstan tests` and fix all errors.**

### Conventions

- `.phpt` files with multiple test cases per file
- Containers built via toolkit: `Container::of()->withDefaults()->withCompiler(...)->build()`
- Inline NEON config via `Helpers::neon(<<<'NEON' ... NEON)`
- Assertions: `Assert::type()`, `Assert::count()`, `Assert::equal()`, `Assert::exception()`
- DI tests verify service registration, tags, and config validation errors
- E2E tests extend `TestCase` for full dispatch->handle workflows
- File naming: `MessengerExtension.{feature}.phpt`
- Mocks are simple DTOs/handlers with public properties for assertions
- Compiled containers written to `tests/tmp` for debugging

## Upstream

- Docs: https://symfony.com/doc/current/messenger.html
- Source: https://github.com/symfony/messenger
- Changelog: https://github.com/symfony/messenger/blob/8.1/CHANGELOG.md
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
@AGENTS.md
24 changes: 13 additions & 11 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,29 @@
],
"require": {
"ext-json": "*",
"php": ">=8.0",
"php": ">=8.2",
"nette/di": "^3.1.2",
"psr/container": "^2.0.2",
"psr/cache": "^2.0.0 || ^3.0.0",
"psr/log": "^2.0.0 || ^3.0.0",
"symfony/messenger": "^6.0.19 || ^6.2.8",
"symfony/event-dispatcher": "^6.0.19 || ^6.2.8",
"symfony/console": "^6.0.19 || ^6.2.10",
"symfony/var-dumper": "^6.0.19 || ^6.2.10"
"symfony/messenger": "^7.4.7 || ^8.0.7",
"symfony/event-dispatcher": "^7.4.7 || ^8.0.7",
"symfony/console": "^7.4.7 || ^8.0.7",
"symfony/var-dumper": "^7.4.7 || ^8.0.7"
},
"require-dev": {
"psr/container": "^2.0.2",
"mockery/mockery": "^1.3.3",
"symfony/redis-messenger": "^6.0.19 || ^6.2.10",
"symfony/amqp-messenger": "^6.0.19 || ^6.2.8",
"symfony/doctrine-messenger": "^6.0.19 || ^6.2.10",
"symfony/redis-messenger": "^7.4.7 || ^8.0.7",
"symfony/amqp-messenger": "^7.4.7 || ^8.0.7",
"symfony/doctrine-messenger": "^7.4.7 || ^8.0.7",
"symfony/cache": "^7.4.7 || ^8.0.7",
"doctrine/dbal": "^4.4.2",
"doctrine/orm": "^3.6.2",
"contributte/console": "^0.10.0",
"contributte/event-dispatcher": "^0.9.0",
"contributte/qa": "^0.4.0",
"contributte/tester": "^0.3.0",
"contributte/phpstan": "^0.1.0",
"contributte/tester": "^0.4.0",
"contributte/phpstan": "^0.3.0",
"tracy/tracy": "^2.10.2"
},
"autoload": {
Expand Down
22 changes: 2 additions & 20 deletions phpstan.neon
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ includes:

parameters:
level: 9
phpVersion: 80000
phpVersion: 80200

scanDirectories:
- src
Expand All @@ -21,25 +21,7 @@ parameters:
count: 1
path: src/DI/Pass/HandlerPass.php

-
message: """
#^Fetching class constant class of deprecated class Symfony\\\\Component\\\\Messenger\\\\Handler\\\\MessageHandlerInterface\\:
since Symfony 6\\.2, use the \\{@see AsMessageHandler\\} attribute instead$#
"""
count: 1
path: src/DI/Pass/HandlerPass.php

-
message: "#^Class ReflectionIntersectionType not found\\.$#"
count: 1
path: src/DI/Utils/Reflector.php

-
message: "#^Dead catch \\- ReflectionException is never thrown in the try block\\.$#"
count: 1
path: src/DI/Utils/Reflector.php

-
message: "#^PHPDoc tag @var for variable \\$type contains unknown class ReflectionIntersectionType\\.$#"
count: 1
path: src/DI/Utils/Reflector.php
path: src/DI/Utils/Reflector.php
2 changes: 1 addition & 1 deletion ruleset.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<ruleset name="Contributte" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="vendor/squizlabs/php_codesniffer/phpcs.xsd">
<!-- Rulesets -->
<rule ref="./vendor/contributte/qa/ruleset-8.0.xml"/>
<rule ref="./vendor/contributte/qa/ruleset-8.2.xml"/>

<!-- Rules -->
<rule ref="SlevomatCodingStandard.Files.TypeNameMatchesFileName">
Expand Down
21 changes: 15 additions & 6 deletions src/DI/MessengerExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
use Contributte\Messenger\DI\Pass\SerializerPass;
use Contributte\Messenger\DI\Pass\TransportFactoryPass;
use Contributte\Messenger\DI\Pass\TransportPass;
use Nette\DI\Definitions\Statement;
use Nette\DI\CompilerExtension;
use Nette\DI\Definitions\Statement;
use Nette\PhpGenerator\ClassType;
use Nette\Schema\Expect;
use Nette\Schema\Schema;
Expand Down Expand Up @@ -61,9 +61,9 @@ public function __construct()

public function getConfigSchema(): Schema
{
$expectClass = Expect::string()->required()->assert(fn ($input) => class_exists($input) || interface_exists($input));
$expectClass = Expect::string()->required()->assert(fn ($input) => is_string($input) && (class_exists($input) || interface_exists($input)));
$expectService = Expect::anyOf(
Expect::string()->required()->assert(fn ($input) => str_starts_with($input, '@') || class_exists($input) || interface_exists($input)),
Expect::string()->required()->assert(fn ($input) => is_string($input) && (str_starts_with($input, '@') || class_exists($input) || interface_exists($input))),
Expect::type(Statement::class)->required(),
);
$expectLoosyService = Expect::anyOf(
Expand All @@ -75,14 +75,24 @@ public function getConfigSchema(): Schema
'debug' => Expect::structure([
'panel' => Expect::bool(false),
]),
'worker' => Expect::structure([
'memoryLimit' => Expect::int()->nullable(),
'timeLimit' => Expect::int()->nullable(),
'messageLimit' => Expect::int()->nullable(),
'failureLimit' => Expect::int()->nullable(),
]),
'cache' => Expect::anyOf(
Expect::string()->required(),
Expect::type(Statement::class)->required(),
)->nullable(),
'fallbackBus' => Expect::string()->nullable(),
'bus' => Expect::arrayOf(
Expect::structure([
'defaultMiddlewares' => Expect::bool(true),
'middlewares' => Expect::arrayOf((clone $expectService)),
'allowNoHandlers' => Expect::bool(false),
'allowNoSenders' => Expect::bool(true),
'autowired' => Expect::bool(),
'class' => (clone $expectClass)->required(false)->assert(fn ($input) => is_subclass_of($input, MessageBusInterface::class), 'Specified bus class must implements "MessageBusInterface"'),
'class' => (clone $expectClass)->required(false)->assert(fn ($input) => is_string($input) && is_subclass_of($input, MessageBusInterface::class), 'Specified bus class must implements "MessageBusInterface"'),
'wrapper' => (clone $expectClass)->required(false),
]),
Expect::string()->required(),
Expand All @@ -91,7 +101,6 @@ public function getConfigSchema(): Schema
'defaultMiddlewares' => true,
'middlewares' => [],
'class' => null,
'autowired' => true,
'allowNoHandlers' => false,
'allowNoSenders' => true,
],
Expand Down
2 changes: 1 addition & 1 deletion src/DI/Pass/AbstractPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public function getContainerBuilder(): ContainerBuilder
return $this->extension->getContainerBuilder();
}

public function getConfig(): stdclass
public function getConfig(): stdClass
{
/** @var stdclass $ret */
$ret = (object) $this->extension->getConfig();
Expand Down
16 changes: 11 additions & 5 deletions src/DI/Pass/BusPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ public function loadPassConfiguration(): void
->addSetup('setLogger', [$this->prefix('@logger.logger')]);
}

// Register message bus
// Register message bus (not autowired — RoutableMessageBus is the autowired MessageBusInterface)
$builder->addDefinition($this->prefix(sprintf('bus.%s.bus', $name)))
->setFactory($busConfig->class ?? SymfonyMessageBus::class, [$middlewares])
->setAutowired($busConfig->autowired ?? count($builder->findByTag(MessengerExtension::BUS_TAG)) === 0)
->setAutowired(false)
->setTags([MessengerExtension::BUS_TAG => $name]);

// Register bus wrapper
Expand All @@ -95,10 +95,16 @@ public function loadPassConfiguration(): void
->setFactory(NetteContainer::class)
->setAutowired(false);

// Register routable bus (for CLI)
// Register routable bus — autowired as MessageBusInterface so that
// SyncTransport (and any other autowired consumer) routes through
// the correct bus based on BusNameStamp.
$fallbackBusName = $config->fallbackBus ?? null;
$builder->addDefinition($this->prefix('bus.routable'))
->setFactory(RoutableMessageBus::class, [$this->prefix('@bus.container')]) // @TODO fallbackBus
->setAutowired(false);
->setFactory(RoutableMessageBus::class, [
$this->prefix('@bus.container'),
$fallbackBusName !== null ? $this->prefix(sprintf('@bus.%s.bus', $fallbackBusName)) : null,
])
->setAutowired(true);

// Register bus registry
$builder->addDefinition($this->prefix('busRegistry'))
Expand Down
Loading
Loading