From 0ec03105092a52077402210c6b51b3d6ed3070a5 Mon Sep 17 00:00:00 2001 From: spiralbot Date: Wed, 15 May 2024 20:34:16 +0000 Subject: [PATCH] Merge commit b3d9511b716c7b8631b59796b83c60767596a6ba into new-master --- src/Bootloader/QueueBootloader.php | 16 +++++------ src/Config/QueueConfig.php | 12 +++------ src/Interceptor/Consume/Core.php | 14 +--------- .../Consume/ErrorHandlerInterceptor.php | 27 ++----------------- src/Interceptor/Consume/Handler.php | 25 ++++++----------- src/Interceptor/Push/Core.php | 14 +--------- src/Queue.php | 16 +++-------- src/QueueManager.php | 17 +++++++----- 8 files changed, 34 insertions(+), 107 deletions(-) diff --git a/src/Bootloader/QueueBootloader.php b/src/Bootloader/QueueBootloader.php index d78c904..4014e8a 100644 --- a/src/Bootloader/QueueBootloader.php +++ b/src/Bootloader/QueueBootloader.php @@ -10,7 +10,7 @@ use Spiral\Boot\Bootloader\Bootloader; use Spiral\Config\ConfiguratorInterface; use Spiral\Config\Patch\Append; -use Spiral\Core\{BinderInterface, FactoryInterface, InterceptableCore, InterceptorPipeline}; +use Spiral\Core\{BinderInterface, FactoryInterface, InterceptableCore}; use Spiral\Core\Container\Autowire; use Spiral\Core\CoreInterceptorInterface; use Spiral\Queue\{JobHandlerLocatorListener, @@ -20,17 +20,13 @@ QueueRegistry, SerializerLocatorListener, SerializerRegistryInterface}; -use Spiral\Interceptors\InterceptorInterface; use Spiral\Queue\Config\QueueConfig; use Spiral\Queue\ContainerRegistry; use Spiral\Queue\Core\QueueInjector; use Spiral\Queue\Driver\{NullDriver, SyncDriver}; use Spiral\Queue\Failed\{FailedJobHandlerInterface, LogFailedJobHandler}; use Spiral\Queue\HandlerRegistryInterface; -use Spiral\Queue\Interceptor\Consume\Core as ConsumeCore; -use Spiral\Queue\Interceptor\Consume\ErrorHandlerInterceptor; -use Spiral\Queue\Interceptor\Consume\Handler; -use Spiral\Queue\Interceptor\Consume\RetryPolicyInterceptor; +use Spiral\Queue\Interceptor\Consume\{Core as ConsumeCore, ErrorHandlerInterceptor, Handler, RetryPolicyInterceptor}; use Spiral\Telemetry\Bootloader\TelemetryBootloader; use Spiral\Telemetry\TracerFactoryInterface; use Spiral\Tokenizer\Bootloader\TokenizerListenerBootloader; @@ -143,7 +139,7 @@ protected function initHandler( TracerFactoryInterface $tracerFactory, ?EventDispatcherInterface $dispatcher = null, ): Handler { - $pipeline = (new InterceptorPipeline($dispatcher))->withHandler($core); + $core = new InterceptableCore($core, $dispatcher); foreach ($config->getConsumeInterceptors() as $interceptor) { if (\is_string($interceptor)) { @@ -152,11 +148,11 @@ protected function initHandler( $interceptor = $interceptor->resolve($factory); } - \assert($interceptor instanceof CoreInterceptorInterface || $interceptor instanceof InterceptorInterface); - $pipeline->addInterceptor($interceptor); + \assert($interceptor instanceof CoreInterceptorInterface); + $core->addInterceptor($interceptor); } - return new Handler($pipeline, $tracerFactory); + return new Handler($core, $tracerFactory); } private function initQueueConfig(EnvironmentInterface $env): void diff --git a/src/Config/QueueConfig.php b/src/Config/QueueConfig.php index 30b930d..4a06591 100644 --- a/src/Config/QueueConfig.php +++ b/src/Config/QueueConfig.php @@ -5,17 +5,11 @@ namespace Spiral\Queue\Config; use Spiral\Core\Container\Autowire; -use Spiral\Core\CoreInterceptorInterface as LegacyInterceptor; +use Spiral\Core\CoreInterceptorInterface; use Spiral\Core\InjectableConfig; -use Spiral\Interceptors\InterceptorInterface; use Spiral\Queue\Exception\InvalidArgumentException; use Spiral\Serializer\SerializerInterface; -/** - * @psalm-type TLegacyInterceptors = array|LegacyInterceptor|Autowire> - * @psalm-type TNewInterceptors = array|InterceptorInterface|Autowire> - * @psalm-type TInterceptors = TNewInterceptors|TLegacyInterceptors - */ final class QueueConfig extends InjectableConfig { public const CONFIG = 'queue'; @@ -47,7 +41,7 @@ public function getAliases(): array /** * Get consumer interceptors * - * @return TInterceptors + * @return array|CoreInterceptorInterface|Autowire> */ public function getConsumeInterceptors(): array { @@ -57,7 +51,7 @@ public function getConsumeInterceptors(): array /** * Get pusher interceptors * - * @return TInterceptors + * @return array|CoreInterceptorInterface|Autowire> */ public function getPushInterceptors(): array { diff --git a/src/Interceptor/Consume/Core.php b/src/Interceptor/Consume/Core.php index ce0c124..670a3de 100644 --- a/src/Interceptor/Consume/Core.php +++ b/src/Interceptor/Consume/Core.php @@ -6,8 +6,6 @@ use Psr\EventDispatcher\EventDispatcherInterface; use Spiral\Core\CoreInterface; -use Spiral\Interceptors\Context\CallContext; -use Spiral\Interceptors\HandlerInterface; use Spiral\Queue\Event\JobProcessed; use Spiral\Queue\Event\JobProcessing; use Spiral\Queue\HandlerRegistryInterface; @@ -22,7 +20,7 @@ * headers: array * } */ -final class Core implements CoreInterface, HandlerInterface +final class Core implements CoreInterface { public function __construct( private readonly HandlerRegistryInterface $registry, @@ -32,7 +30,6 @@ public function __construct( /** * @param-assert TParameters $parameters - * @deprecated */ public function callAction(string $controller, string $action, array $parameters = []): mixed { @@ -52,15 +49,6 @@ public function callAction(string $controller, string $action, array $parameters return null; } - public function handle(CallContext $context): mixed - { - $args = $context->getArguments(); - $controller = $context->getTarget()->getPath()[0]; - $action = $context->getTarget()->getPath()[1]; - - return $this->callAction($controller, $action, $args); - } - /** * @param class-string $event * @param-assert TParameters $parameters diff --git a/src/Interceptor/Consume/ErrorHandlerInterceptor.php b/src/Interceptor/Consume/ErrorHandlerInterceptor.php index 967a92b..0443d1e 100644 --- a/src/Interceptor/Consume/ErrorHandlerInterceptor.php +++ b/src/Interceptor/Consume/ErrorHandlerInterceptor.php @@ -4,15 +4,12 @@ namespace Spiral\Queue\Interceptor\Consume; -use Spiral\Core\CoreInterceptorInterface as LegacyInterceptor; +use Spiral\Core\CoreInterceptorInterface; use Spiral\Core\CoreInterface; -use Spiral\Interceptors\Context\CallContextInterface; -use Spiral\Interceptors\HandlerInterface; -use Spiral\Interceptors\InterceptorInterface; use Spiral\Queue\Exception\StateException; use Spiral\Queue\Failed\FailedJobHandlerInterface; -final class ErrorHandlerInterceptor implements LegacyInterceptor, InterceptorInterface +final class ErrorHandlerInterceptor implements CoreInterceptorInterface { public function __construct( private readonly FailedJobHandlerInterface $handler @@ -38,24 +35,4 @@ public function process(string $name, string $action, array $parameters, CoreInt throw $e; } } - - public function intercept(CallContextInterface $context, HandlerInterface $handler): mixed - { - try { - return $handler->handle($context); - } catch (\Throwable $e) { - $args = $context->getArguments(); - if (!$e instanceof StateException) { - $this->handler->handle( - $args['driver'], - $args['queue'], - $context->getTarget()->getPath()[0], - $args['payload'], - $e, - ); - } - - throw $e; - } - } } diff --git a/src/Interceptor/Consume/Handler.php b/src/Interceptor/Consume/Handler.php index b09a94a..92c74a4 100644 --- a/src/Interceptor/Consume/Handler.php +++ b/src/Interceptor/Consume/Handler.php @@ -6,9 +6,6 @@ use Spiral\Core\Container; use Spiral\Core\CoreInterface; -use Spiral\Interceptors\Context\CallContext; -use Spiral\Interceptors\Context\Target; -use Spiral\Interceptors\HandlerInterface; use Spiral\Telemetry\NullTracerFactory; use Spiral\Telemetry\TraceKind; use Spiral\Telemetry\TracerFactoryInterface; @@ -20,14 +17,12 @@ final class Handler { private readonly TracerFactoryInterface $tracerFactory; - private readonly bool $isLegacy; public function __construct( - private readonly HandlerInterface|CoreInterface $core, + private readonly CoreInterface $core, ?TracerFactoryInterface $tracerFactory = null, ) { $this->tracerFactory = $tracerFactory ?? new NullTracerFactory(new Container()); - $this->isLegacy = !$core instanceof HandlerInterface; } public function handle( @@ -40,19 +35,15 @@ public function handle( ): mixed { $tracer = $this->tracerFactory->make($headers); - $arguments = [ - 'driver' => $driver, - 'queue' => $queue, - 'id' => $id, - 'payload' => $payload, - 'headers' => $headers, - ]; - return $tracer->trace( name: \sprintf('Job handling [%s:%s]', $name, $id), - callback: $this->isLegacy - ? fn (): mixed => $this->core->callAction($name, 'handle', $arguments) - : fn (): mixed => $this->core->handle(new CallContext(Target::fromPair($name, 'handle'), $arguments)), + callback: fn (): mixed => $this->core->callAction($name, 'handle', [ + 'driver' => $driver, + 'queue' => $queue, + 'id' => $id, + 'payload' => $payload, + 'headers' => $headers, + ]), attributes: [ 'queue.driver' => $driver, 'queue.name' => $queue, diff --git a/src/Interceptor/Push/Core.php b/src/Interceptor/Push/Core.php index dada254..63f2bbe 100644 --- a/src/Interceptor/Push/Core.php +++ b/src/Interceptor/Push/Core.php @@ -6,8 +6,6 @@ use Spiral\Core\ContainerScope; use Spiral\Core\CoreInterface; -use Spiral\Interceptors\Context\CallContext; -use Spiral\Interceptors\HandlerInterface; use Spiral\Queue\Options; use Spiral\Queue\OptionsInterface; use Spiral\Queue\QueueInterface; @@ -18,7 +16,7 @@ * @internal * @psalm-type TParameters = array{options: ?OptionsInterface, payload: mixed} */ -final class Core implements CoreInterface, HandlerInterface +final class Core implements CoreInterface { public function __construct( private readonly QueueInterface $connection, @@ -27,7 +25,6 @@ public function __construct( /** * @param-assert TParameters $parameters - * @deprecated */ public function callAction( string $controller, @@ -59,15 +56,6 @@ public function callAction( ); } - public function handle(CallContext $context): mixed - { - $args = $context->getArguments(); - $controller = $context->getTarget()->getPath()[0]; - $action = $context->getTarget()->getPath()[1]; - - return $this->callAction($controller, $action, $args); - } - private function getTracer(): TracerInterface { try { diff --git a/src/Queue.php b/src/Queue.php index 2978e7f..ee7583d 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -5,9 +5,6 @@ namespace Spiral\Queue; use Spiral\Core\CoreInterface; -use Spiral\Interceptors\Context\CallContext; -use Spiral\Interceptors\Context\Target; -use Spiral\Interceptors\HandlerInterface as InterceptorHandler; /** * This class is used to push jobs into the queue and pass them through the interceptor chain @@ -18,23 +15,16 @@ */ final class Queue implements QueueInterface { - private readonly bool $isLegacy; - public function __construct( - private readonly CoreInterface|InterceptorHandler $core, + private readonly CoreInterface $core, ) { - $this->isLegacy = !$core instanceof HandlerInterface; } public function push(string $name, mixed $payload = [], mixed $options = null): string { - $arguments = [ + return $this->core->callAction($name, 'push', [ 'payload' => $payload, 'options' => $options, - ]; - - return $this->isLegacy - ? $this->core->callAction($name, 'push', $arguments) - : $this->core->handle(new CallContext(Target::fromPair($name, 'push'), $arguments)); + ]); } } diff --git a/src/QueueManager.php b/src/QueueManager.php index 97b1be5..5a686f6 100644 --- a/src/QueueManager.php +++ b/src/QueueManager.php @@ -7,11 +7,10 @@ use Psr\Container\ContainerInterface; use Psr\EventDispatcher\EventDispatcherInterface; use Spiral\Core\Container\Autowire; -use Spiral\Core\CoreInterceptorInterface as LegacyInterceptor; +use Spiral\Core\CoreInterceptorInterface; use Spiral\Core\Exception\Container\ContainerException; use Spiral\Core\FactoryInterface; -use Spiral\Core\InterceptorPipeline; -use Spiral\Interceptors\InterceptorInterface; +use Spiral\Core\InterceptableCore; use Spiral\Queue\Config\QueueConfig; use Spiral\Queue\Interceptor\Push\Core as PushCore; @@ -51,18 +50,22 @@ private function resolveConnection(string $name): QueueInterface try { $driver = $this->factory->make($config['driver'], $config); - $pipeline = (new InterceptorPipeline($this->dispatcher))->withHandler(new PushCore($driver)); + + $core = new InterceptableCore( + new PushCore($driver), + $this->dispatcher + ); foreach ($this->config->getPushInterceptors() as $interceptor) { if (\is_string($interceptor) || $interceptor instanceof Autowire) { $interceptor = $this->container->get($interceptor); } - \assert($interceptor instanceof LegacyInterceptor || $interceptor instanceof InterceptorInterface); - $pipeline->addInterceptor($interceptor); + \assert($interceptor instanceof CoreInterceptorInterface); + $core->addInterceptor($interceptor); } - return new Queue($pipeline); + return new Queue($core); } catch (ContainerException $e) { throw new Exception\NotSupportedDriverException( \sprintf(