From 949d533afe4d1e2f4ae1891fb706991439546866 Mon Sep 17 00:00:00 2001 From: spiralbot Date: Thu, 30 May 2024 20:42:04 +0000 Subject: [PATCH] Merge branch refs/heads/master into feature/scopes --- composer.json | 16 +++++------ src/Bootloader/QueueBootloader.php | 16 ++++++----- src/Config/QueueConfig.php | 12 ++++++--- src/Interceptor/Consume/Core.php | 14 +++++++++- .../Consume/ErrorHandlerInterceptor.php | 27 +++++++++++++++++-- src/Interceptor/Consume/Handler.php | 25 +++++++++++------ src/Interceptor/Push/Core.php | 14 +++++++++- src/JobHandler.php | 3 ++- src/Queue.php | 16 ++++++++--- src/QueueManager.php | 17 +++++------- 10 files changed, 117 insertions(+), 43 deletions(-) diff --git a/composer.json b/composer.json index 337f1f1..c81c25c 100644 --- a/composer.json +++ b/composer.json @@ -29,12 +29,12 @@ "require": { "php": ">=8.1", "ext-json": "*", - "spiral/core": "^3.13", - "spiral/hmvc": "^3.13", - "spiral/serializer": "^3.13", - "spiral/snapshots": "^3.13", - "spiral/telemetry": "^3.13", - "spiral/tokenizer": "^3.13", + "spiral/core": "^3.14", + "spiral/hmvc": "^3.14", + "spiral/serializer": "^3.14", + "spiral/snapshots": "^3.14", + "spiral/telemetry": "^3.14", + "spiral/tokenizer": "^3.14", "spiral/attributes": "^2.8|^3.0", "doctrine/inflector": "^1.4|^2.0", "ramsey/uuid": "^4.2.3", @@ -53,12 +53,12 @@ "require-dev": { "phpunit/phpunit": "^10.1", "mockery/mockery": "^1.5", - "spiral/boot": "^3.13", + "spiral/boot": "^3.14", "vimeo/psalm": "^5.9" }, "extra": { "branch-alias": { - "dev-master": "3.13.x-dev" + "dev-master": "3.14.x-dev" } }, "config": { diff --git a/src/Bootloader/QueueBootloader.php b/src/Bootloader/QueueBootloader.php index 4014e8a..d78c904 100644 --- a/src/Bootloader/QueueBootloader.php +++ b/src/Bootloader/QueueBootloader.php @@ -10,7 +10,7 @@ use Spiral\Boot\Bootloader\Bootloader; use Spiral\Config\ConfiguratorInterface; use Spiral\Config\Patch\Append; -use Spiral\Core\{BinderInterface, FactoryInterface, InterceptableCore}; +use Spiral\Core\{BinderInterface, FactoryInterface, InterceptableCore, InterceptorPipeline}; use Spiral\Core\Container\Autowire; use Spiral\Core\CoreInterceptorInterface; use Spiral\Queue\{JobHandlerLocatorListener, @@ -20,13 +20,17 @@ QueueRegistry, SerializerLocatorListener, SerializerRegistryInterface}; +use Spiral\Interceptors\InterceptorInterface; use Spiral\Queue\Config\QueueConfig; use Spiral\Queue\ContainerRegistry; use Spiral\Queue\Core\QueueInjector; use Spiral\Queue\Driver\{NullDriver, SyncDriver}; use Spiral\Queue\Failed\{FailedJobHandlerInterface, LogFailedJobHandler}; use Spiral\Queue\HandlerRegistryInterface; -use Spiral\Queue\Interceptor\Consume\{Core as ConsumeCore, ErrorHandlerInterceptor, Handler, RetryPolicyInterceptor}; +use Spiral\Queue\Interceptor\Consume\Core as ConsumeCore; +use Spiral\Queue\Interceptor\Consume\ErrorHandlerInterceptor; +use Spiral\Queue\Interceptor\Consume\Handler; +use Spiral\Queue\Interceptor\Consume\RetryPolicyInterceptor; use Spiral\Telemetry\Bootloader\TelemetryBootloader; use Spiral\Telemetry\TracerFactoryInterface; use Spiral\Tokenizer\Bootloader\TokenizerListenerBootloader; @@ -139,7 +143,7 @@ protected function initHandler( TracerFactoryInterface $tracerFactory, ?EventDispatcherInterface $dispatcher = null, ): Handler { - $core = new InterceptableCore($core, $dispatcher); + $pipeline = (new InterceptorPipeline($dispatcher))->withHandler($core); foreach ($config->getConsumeInterceptors() as $interceptor) { if (\is_string($interceptor)) { @@ -148,11 +152,11 @@ protected function initHandler( $interceptor = $interceptor->resolve($factory); } - \assert($interceptor instanceof CoreInterceptorInterface); - $core->addInterceptor($interceptor); + \assert($interceptor instanceof CoreInterceptorInterface || $interceptor instanceof InterceptorInterface); + $pipeline->addInterceptor($interceptor); } - return new Handler($core, $tracerFactory); + return new Handler($pipeline, $tracerFactory); } private function initQueueConfig(EnvironmentInterface $env): void diff --git a/src/Config/QueueConfig.php b/src/Config/QueueConfig.php index 4a06591..30b930d 100644 --- a/src/Config/QueueConfig.php +++ b/src/Config/QueueConfig.php @@ -5,11 +5,17 @@ namespace Spiral\Queue\Config; use Spiral\Core\Container\Autowire; -use Spiral\Core\CoreInterceptorInterface; +use Spiral\Core\CoreInterceptorInterface as LegacyInterceptor; use Spiral\Core\InjectableConfig; +use Spiral\Interceptors\InterceptorInterface; use Spiral\Queue\Exception\InvalidArgumentException; use Spiral\Serializer\SerializerInterface; +/** + * @psalm-type TLegacyInterceptors = array|LegacyInterceptor|Autowire> + * @psalm-type TNewInterceptors = array|InterceptorInterface|Autowire> + * @psalm-type TInterceptors = TNewInterceptors|TLegacyInterceptors + */ final class QueueConfig extends InjectableConfig { public const CONFIG = 'queue'; @@ -41,7 +47,7 @@ public function getAliases(): array /** * Get consumer interceptors * - * @return array|CoreInterceptorInterface|Autowire> + * @return TInterceptors */ public function getConsumeInterceptors(): array { @@ -51,7 +57,7 @@ public function getConsumeInterceptors(): array /** * Get pusher interceptors * - * @return array|CoreInterceptorInterface|Autowire> + * @return TInterceptors */ public function getPushInterceptors(): array { diff --git a/src/Interceptor/Consume/Core.php b/src/Interceptor/Consume/Core.php index 670a3de..ce0c124 100644 --- a/src/Interceptor/Consume/Core.php +++ b/src/Interceptor/Consume/Core.php @@ -6,6 +6,8 @@ use Psr\EventDispatcher\EventDispatcherInterface; use Spiral\Core\CoreInterface; +use Spiral\Interceptors\Context\CallContext; +use Spiral\Interceptors\HandlerInterface; use Spiral\Queue\Event\JobProcessed; use Spiral\Queue\Event\JobProcessing; use Spiral\Queue\HandlerRegistryInterface; @@ -20,7 +22,7 @@ * headers: array * } */ -final class Core implements CoreInterface +final class Core implements CoreInterface, HandlerInterface { public function __construct( private readonly HandlerRegistryInterface $registry, @@ -30,6 +32,7 @@ public function __construct( /** * @param-assert TParameters $parameters + * @deprecated */ public function callAction(string $controller, string $action, array $parameters = []): mixed { @@ -49,6 +52,15 @@ public function callAction(string $controller, string $action, array $parameters return null; } + public function handle(CallContext $context): mixed + { + $args = $context->getArguments(); + $controller = $context->getTarget()->getPath()[0]; + $action = $context->getTarget()->getPath()[1]; + + return $this->callAction($controller, $action, $args); + } + /** * @param class-string $event * @param-assert TParameters $parameters diff --git a/src/Interceptor/Consume/ErrorHandlerInterceptor.php b/src/Interceptor/Consume/ErrorHandlerInterceptor.php index 0443d1e..967a92b 100644 --- a/src/Interceptor/Consume/ErrorHandlerInterceptor.php +++ b/src/Interceptor/Consume/ErrorHandlerInterceptor.php @@ -4,12 +4,15 @@ namespace Spiral\Queue\Interceptor\Consume; -use Spiral\Core\CoreInterceptorInterface; +use Spiral\Core\CoreInterceptorInterface as LegacyInterceptor; use Spiral\Core\CoreInterface; +use Spiral\Interceptors\Context\CallContextInterface; +use Spiral\Interceptors\HandlerInterface; +use Spiral\Interceptors\InterceptorInterface; use Spiral\Queue\Exception\StateException; use Spiral\Queue\Failed\FailedJobHandlerInterface; -final class ErrorHandlerInterceptor implements CoreInterceptorInterface +final class ErrorHandlerInterceptor implements LegacyInterceptor, InterceptorInterface { public function __construct( private readonly FailedJobHandlerInterface $handler @@ -35,4 +38,24 @@ public function process(string $name, string $action, array $parameters, CoreInt throw $e; } } + + public function intercept(CallContextInterface $context, HandlerInterface $handler): mixed + { + try { + return $handler->handle($context); + } catch (\Throwable $e) { + $args = $context->getArguments(); + if (!$e instanceof StateException) { + $this->handler->handle( + $args['driver'], + $args['queue'], + $context->getTarget()->getPath()[0], + $args['payload'], + $e, + ); + } + + throw $e; + } + } } diff --git a/src/Interceptor/Consume/Handler.php b/src/Interceptor/Consume/Handler.php index 92c74a4..b09a94a 100644 --- a/src/Interceptor/Consume/Handler.php +++ b/src/Interceptor/Consume/Handler.php @@ -6,6 +6,9 @@ use Spiral\Core\Container; use Spiral\Core\CoreInterface; +use Spiral\Interceptors\Context\CallContext; +use Spiral\Interceptors\Context\Target; +use Spiral\Interceptors\HandlerInterface; use Spiral\Telemetry\NullTracerFactory; use Spiral\Telemetry\TraceKind; use Spiral\Telemetry\TracerFactoryInterface; @@ -17,12 +20,14 @@ final class Handler { private readonly TracerFactoryInterface $tracerFactory; + private readonly bool $isLegacy; public function __construct( - private readonly CoreInterface $core, + private readonly HandlerInterface|CoreInterface $core, ?TracerFactoryInterface $tracerFactory = null, ) { $this->tracerFactory = $tracerFactory ?? new NullTracerFactory(new Container()); + $this->isLegacy = !$core instanceof HandlerInterface; } public function handle( @@ -35,15 +40,19 @@ public function handle( ): mixed { $tracer = $this->tracerFactory->make($headers); + $arguments = [ + 'driver' => $driver, + 'queue' => $queue, + 'id' => $id, + 'payload' => $payload, + 'headers' => $headers, + ]; + return $tracer->trace( name: \sprintf('Job handling [%s:%s]', $name, $id), - callback: fn (): mixed => $this->core->callAction($name, 'handle', [ - 'driver' => $driver, - 'queue' => $queue, - 'id' => $id, - 'payload' => $payload, - 'headers' => $headers, - ]), + callback: $this->isLegacy + ? fn (): mixed => $this->core->callAction($name, 'handle', $arguments) + : fn (): mixed => $this->core->handle(new CallContext(Target::fromPair($name, 'handle'), $arguments)), attributes: [ 'queue.driver' => $driver, 'queue.name' => $queue, diff --git a/src/Interceptor/Push/Core.php b/src/Interceptor/Push/Core.php index 63f2bbe..dada254 100644 --- a/src/Interceptor/Push/Core.php +++ b/src/Interceptor/Push/Core.php @@ -6,6 +6,8 @@ use Spiral\Core\ContainerScope; use Spiral\Core\CoreInterface; +use Spiral\Interceptors\Context\CallContext; +use Spiral\Interceptors\HandlerInterface; use Spiral\Queue\Options; use Spiral\Queue\OptionsInterface; use Spiral\Queue\QueueInterface; @@ -16,7 +18,7 @@ * @internal * @psalm-type TParameters = array{options: ?OptionsInterface, payload: mixed} */ -final class Core implements CoreInterface +final class Core implements CoreInterface, HandlerInterface { public function __construct( private readonly QueueInterface $connection, @@ -25,6 +27,7 @@ public function __construct( /** * @param-assert TParameters $parameters + * @deprecated */ public function callAction( string $controller, @@ -56,6 +59,15 @@ public function callAction( ); } + public function handle(CallContext $context): mixed + { + $args = $context->getArguments(); + $controller = $context->getTarget()->getPath()[0]; + $action = $context->getTarget()->getPath()[1]; + + return $this->callAction($controller, $action, $args); + } + private function getTracer(): TracerInterface { try { diff --git a/src/JobHandler.php b/src/JobHandler.php index 29d10f9..a89bced 100644 --- a/src/JobHandler.php +++ b/src/JobHandler.php @@ -4,6 +4,7 @@ namespace Spiral\Queue; +use Spiral\Core\Attribute\Proxy; use Spiral\Core\InvokerInterface; use Spiral\Queue\Exception\JobException; @@ -18,7 +19,7 @@ abstract class JobHandler implements HandlerInterface protected const HANDLE_FUNCTION = 'invoke'; public function __construct( - protected InvokerInterface $invoker, + #[Proxy] protected InvokerInterface $invoker, ) { } diff --git a/src/Queue.php b/src/Queue.php index ee7583d..2978e7f 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -5,6 +5,9 @@ namespace Spiral\Queue; use Spiral\Core\CoreInterface; +use Spiral\Interceptors\Context\CallContext; +use Spiral\Interceptors\Context\Target; +use Spiral\Interceptors\HandlerInterface as InterceptorHandler; /** * This class is used to push jobs into the queue and pass them through the interceptor chain @@ -15,16 +18,23 @@ */ final class Queue implements QueueInterface { + private readonly bool $isLegacy; + public function __construct( - private readonly CoreInterface $core, + private readonly CoreInterface|InterceptorHandler $core, ) { + $this->isLegacy = !$core instanceof HandlerInterface; } public function push(string $name, mixed $payload = [], mixed $options = null): string { - return $this->core->callAction($name, 'push', [ + $arguments = [ 'payload' => $payload, 'options' => $options, - ]); + ]; + + return $this->isLegacy + ? $this->core->callAction($name, 'push', $arguments) + : $this->core->handle(new CallContext(Target::fromPair($name, 'push'), $arguments)); } } diff --git a/src/QueueManager.php b/src/QueueManager.php index 5a686f6..97b1be5 100644 --- a/src/QueueManager.php +++ b/src/QueueManager.php @@ -7,10 +7,11 @@ use Psr\Container\ContainerInterface; use Psr\EventDispatcher\EventDispatcherInterface; use Spiral\Core\Container\Autowire; -use Spiral\Core\CoreInterceptorInterface; +use Spiral\Core\CoreInterceptorInterface as LegacyInterceptor; use Spiral\Core\Exception\Container\ContainerException; use Spiral\Core\FactoryInterface; -use Spiral\Core\InterceptableCore; +use Spiral\Core\InterceptorPipeline; +use Spiral\Interceptors\InterceptorInterface; use Spiral\Queue\Config\QueueConfig; use Spiral\Queue\Interceptor\Push\Core as PushCore; @@ -50,22 +51,18 @@ private function resolveConnection(string $name): QueueInterface try { $driver = $this->factory->make($config['driver'], $config); - - $core = new InterceptableCore( - new PushCore($driver), - $this->dispatcher - ); + $pipeline = (new InterceptorPipeline($this->dispatcher))->withHandler(new PushCore($driver)); foreach ($this->config->getPushInterceptors() as $interceptor) { if (\is_string($interceptor) || $interceptor instanceof Autowire) { $interceptor = $this->container->get($interceptor); } - \assert($interceptor instanceof CoreInterceptorInterface); - $core->addInterceptor($interceptor); + \assert($interceptor instanceof LegacyInterceptor || $interceptor instanceof InterceptorInterface); + $pipeline->addInterceptor($interceptor); } - return new Queue($core); + return new Queue($pipeline); } catch (ContainerException $e) { throw new Exception\NotSupportedDriverException( \sprintf(