Skip to content

Commit

Permalink
Merge commit b3d9511b716c7b8631b59796b83c60767596a6ba into new-master
Browse files Browse the repository at this point in the history
  • Loading branch information
spiralbot committed May 15, 2024
1 parent 45f712b commit 0ec0310
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 107 deletions.
16 changes: 6 additions & 10 deletions src/Bootloader/QueueBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use Spiral\Boot\Bootloader\Bootloader;
use Spiral\Config\ConfiguratorInterface;
use Spiral\Config\Patch\Append;
use Spiral\Core\{BinderInterface, FactoryInterface, InterceptableCore, InterceptorPipeline};
use Spiral\Core\{BinderInterface, FactoryInterface, InterceptableCore};
use Spiral\Core\Container\Autowire;
use Spiral\Core\CoreInterceptorInterface;
use Spiral\Queue\{JobHandlerLocatorListener,
Expand All @@ -20,17 +20,13 @@
QueueRegistry,
SerializerLocatorListener,
SerializerRegistryInterface};
use Spiral\Interceptors\InterceptorInterface;
use Spiral\Queue\Config\QueueConfig;
use Spiral\Queue\ContainerRegistry;
use Spiral\Queue\Core\QueueInjector;
use Spiral\Queue\Driver\{NullDriver, SyncDriver};
use Spiral\Queue\Failed\{FailedJobHandlerInterface, LogFailedJobHandler};
use Spiral\Queue\HandlerRegistryInterface;
use Spiral\Queue\Interceptor\Consume\Core as ConsumeCore;
use Spiral\Queue\Interceptor\Consume\ErrorHandlerInterceptor;
use Spiral\Queue\Interceptor\Consume\Handler;
use Spiral\Queue\Interceptor\Consume\RetryPolicyInterceptor;
use Spiral\Queue\Interceptor\Consume\{Core as ConsumeCore, ErrorHandlerInterceptor, Handler, RetryPolicyInterceptor};
use Spiral\Telemetry\Bootloader\TelemetryBootloader;
use Spiral\Telemetry\TracerFactoryInterface;
use Spiral\Tokenizer\Bootloader\TokenizerListenerBootloader;
Expand Down Expand Up @@ -143,7 +139,7 @@ protected function initHandler(
TracerFactoryInterface $tracerFactory,
?EventDispatcherInterface $dispatcher = null,
): Handler {
$pipeline = (new InterceptorPipeline($dispatcher))->withHandler($core);
$core = new InterceptableCore($core, $dispatcher);

foreach ($config->getConsumeInterceptors() as $interceptor) {
if (\is_string($interceptor)) {
Expand All @@ -152,11 +148,11 @@ protected function initHandler(
$interceptor = $interceptor->resolve($factory);
}

\assert($interceptor instanceof CoreInterceptorInterface || $interceptor instanceof InterceptorInterface);
$pipeline->addInterceptor($interceptor);
\assert($interceptor instanceof CoreInterceptorInterface);
$core->addInterceptor($interceptor);
}

return new Handler($pipeline, $tracerFactory);
return new Handler($core, $tracerFactory);
}

private function initQueueConfig(EnvironmentInterface $env): void
Expand Down
12 changes: 3 additions & 9 deletions src/Config/QueueConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,11 @@
namespace Spiral\Queue\Config;

use Spiral\Core\Container\Autowire;
use Spiral\Core\CoreInterceptorInterface as LegacyInterceptor;
use Spiral\Core\CoreInterceptorInterface;
use Spiral\Core\InjectableConfig;
use Spiral\Interceptors\InterceptorInterface;
use Spiral\Queue\Exception\InvalidArgumentException;
use Spiral\Serializer\SerializerInterface;

/**
* @psalm-type TLegacyInterceptors = array<class-string<LegacyInterceptor>|LegacyInterceptor|Autowire>
* @psalm-type TNewInterceptors = array<class-string<InterceptorInterface>|InterceptorInterface|Autowire>
* @psalm-type TInterceptors = TNewInterceptors|TLegacyInterceptors
*/
final class QueueConfig extends InjectableConfig
{
public const CONFIG = 'queue';
Expand Down Expand Up @@ -47,7 +41,7 @@ public function getAliases(): array
/**
* Get consumer interceptors
*
* @return TInterceptors
* @return array<class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire>
*/
public function getConsumeInterceptors(): array
{
Expand All @@ -57,7 +51,7 @@ public function getConsumeInterceptors(): array
/**
* Get pusher interceptors
*
* @return TInterceptors
* @return array<class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire>
*/
public function getPushInterceptors(): array
{
Expand Down
14 changes: 1 addition & 13 deletions src/Interceptor/Consume/Core.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

use Psr\EventDispatcher\EventDispatcherInterface;
use Spiral\Core\CoreInterface;
use Spiral\Interceptors\Context\CallContext;
use Spiral\Interceptors\HandlerInterface;
use Spiral\Queue\Event\JobProcessed;
use Spiral\Queue\Event\JobProcessing;
use Spiral\Queue\HandlerRegistryInterface;
Expand All @@ -22,7 +20,7 @@
* headers: array
* }
*/
final class Core implements CoreInterface, HandlerInterface
final class Core implements CoreInterface
{
public function __construct(
private readonly HandlerRegistryInterface $registry,
Expand All @@ -32,7 +30,6 @@ public function __construct(

/**
* @param-assert TParameters $parameters
* @deprecated
*/
public function callAction(string $controller, string $action, array $parameters = []): mixed
{
Expand All @@ -52,15 +49,6 @@ public function callAction(string $controller, string $action, array $parameters
return null;
}

public function handle(CallContext $context): mixed
{
$args = $context->getArguments();
$controller = $context->getTarget()->getPath()[0];
$action = $context->getTarget()->getPath()[1];

return $this->callAction($controller, $action, $args);
}

/**
* @param class-string $event
* @param-assert TParameters $parameters
Expand Down
27 changes: 2 additions & 25 deletions src/Interceptor/Consume/ErrorHandlerInterceptor.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@

namespace Spiral\Queue\Interceptor\Consume;

use Spiral\Core\CoreInterceptorInterface as LegacyInterceptor;
use Spiral\Core\CoreInterceptorInterface;
use Spiral\Core\CoreInterface;
use Spiral\Interceptors\Context\CallContextInterface;
use Spiral\Interceptors\HandlerInterface;
use Spiral\Interceptors\InterceptorInterface;
use Spiral\Queue\Exception\StateException;
use Spiral\Queue\Failed\FailedJobHandlerInterface;

final class ErrorHandlerInterceptor implements LegacyInterceptor, InterceptorInterface
final class ErrorHandlerInterceptor implements CoreInterceptorInterface
{
public function __construct(
private readonly FailedJobHandlerInterface $handler
Expand All @@ -38,24 +35,4 @@ public function process(string $name, string $action, array $parameters, CoreInt
throw $e;
}
}

public function intercept(CallContextInterface $context, HandlerInterface $handler): mixed
{
try {
return $handler->handle($context);
} catch (\Throwable $e) {
$args = $context->getArguments();
if (!$e instanceof StateException) {
$this->handler->handle(
$args['driver'],
$args['queue'],
$context->getTarget()->getPath()[0],
$args['payload'],
$e,
);
}

throw $e;
}
}
}
25 changes: 8 additions & 17 deletions src/Interceptor/Consume/Handler.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@

use Spiral\Core\Container;
use Spiral\Core\CoreInterface;
use Spiral\Interceptors\Context\CallContext;
use Spiral\Interceptors\Context\Target;
use Spiral\Interceptors\HandlerInterface;
use Spiral\Telemetry\NullTracerFactory;
use Spiral\Telemetry\TraceKind;
use Spiral\Telemetry\TracerFactoryInterface;
Expand All @@ -20,14 +17,12 @@
final class Handler
{
private readonly TracerFactoryInterface $tracerFactory;
private readonly bool $isLegacy;

public function __construct(
private readonly HandlerInterface|CoreInterface $core,
private readonly CoreInterface $core,
?TracerFactoryInterface $tracerFactory = null,
) {
$this->tracerFactory = $tracerFactory ?? new NullTracerFactory(new Container());
$this->isLegacy = !$core instanceof HandlerInterface;
}

public function handle(
Expand All @@ -40,19 +35,15 @@ public function handle(
): mixed {
$tracer = $this->tracerFactory->make($headers);

$arguments = [
'driver' => $driver,
'queue' => $queue,
'id' => $id,
'payload' => $payload,
'headers' => $headers,
];

return $tracer->trace(
name: \sprintf('Job handling [%s:%s]', $name, $id),
callback: $this->isLegacy
? fn (): mixed => $this->core->callAction($name, 'handle', $arguments)
: fn (): mixed => $this->core->handle(new CallContext(Target::fromPair($name, 'handle'), $arguments)),
callback: fn (): mixed => $this->core->callAction($name, 'handle', [
'driver' => $driver,
'queue' => $queue,
'id' => $id,
'payload' => $payload,
'headers' => $headers,
]),
attributes: [
'queue.driver' => $driver,
'queue.name' => $queue,
Expand Down
14 changes: 1 addition & 13 deletions src/Interceptor/Push/Core.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

use Spiral\Core\ContainerScope;
use Spiral\Core\CoreInterface;
use Spiral\Interceptors\Context\CallContext;
use Spiral\Interceptors\HandlerInterface;
use Spiral\Queue\Options;
use Spiral\Queue\OptionsInterface;
use Spiral\Queue\QueueInterface;
Expand All @@ -18,7 +16,7 @@
* @internal
* @psalm-type TParameters = array{options: ?OptionsInterface, payload: mixed}
*/
final class Core implements CoreInterface, HandlerInterface
final class Core implements CoreInterface
{
public function __construct(
private readonly QueueInterface $connection,
Expand All @@ -27,7 +25,6 @@ public function __construct(

/**
* @param-assert TParameters $parameters
* @deprecated
*/
public function callAction(
string $controller,
Expand Down Expand Up @@ -59,15 +56,6 @@ public function callAction(
);
}

public function handle(CallContext $context): mixed
{
$args = $context->getArguments();
$controller = $context->getTarget()->getPath()[0];
$action = $context->getTarget()->getPath()[1];

return $this->callAction($controller, $action, $args);
}

private function getTracer(): TracerInterface
{
try {
Expand Down
16 changes: 3 additions & 13 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@
namespace Spiral\Queue;

use Spiral\Core\CoreInterface;
use Spiral\Interceptors\Context\CallContext;
use Spiral\Interceptors\Context\Target;
use Spiral\Interceptors\HandlerInterface as InterceptorHandler;

/**
* This class is used to push jobs into the queue and pass them through the interceptor chain
Expand All @@ -18,23 +15,16 @@
*/
final class Queue implements QueueInterface
{
private readonly bool $isLegacy;

public function __construct(
private readonly CoreInterface|InterceptorHandler $core,
private readonly CoreInterface $core,
) {
$this->isLegacy = !$core instanceof HandlerInterface;
}

public function push(string $name, mixed $payload = [], mixed $options = null): string
{
$arguments = [
return $this->core->callAction($name, 'push', [
'payload' => $payload,
'options' => $options,
];

return $this->isLegacy
? $this->core->callAction($name, 'push', $arguments)
: $this->core->handle(new CallContext(Target::fromPair($name, 'push'), $arguments));
]);
}
}
17 changes: 10 additions & 7 deletions src/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
use Spiral\Core\Container\Autowire;
use Spiral\Core\CoreInterceptorInterface as LegacyInterceptor;
use Spiral\Core\CoreInterceptorInterface;
use Spiral\Core\Exception\Container\ContainerException;
use Spiral\Core\FactoryInterface;
use Spiral\Core\InterceptorPipeline;
use Spiral\Interceptors\InterceptorInterface;
use Spiral\Core\InterceptableCore;
use Spiral\Queue\Config\QueueConfig;
use Spiral\Queue\Interceptor\Push\Core as PushCore;

Expand Down Expand Up @@ -51,18 +50,22 @@ private function resolveConnection(string $name): QueueInterface

try {
$driver = $this->factory->make($config['driver'], $config);
$pipeline = (new InterceptorPipeline($this->dispatcher))->withHandler(new PushCore($driver));

$core = new InterceptableCore(
new PushCore($driver),
$this->dispatcher
);

foreach ($this->config->getPushInterceptors() as $interceptor) {
if (\is_string($interceptor) || $interceptor instanceof Autowire) {
$interceptor = $this->container->get($interceptor);
}

\assert($interceptor instanceof LegacyInterceptor || $interceptor instanceof InterceptorInterface);
$pipeline->addInterceptor($interceptor);
\assert($interceptor instanceof CoreInterceptorInterface);
$core->addInterceptor($interceptor);
}

return new Queue($pipeline);
return new Queue($core);
} catch (ContainerException $e) {
throw new Exception\NotSupportedDriverException(
\sprintf(
Expand Down

0 comments on commit 0ec0310

Please sign in to comment.