Skip to content

Commit

Permalink
Merge pull request #1111: [Interceptors] Add PipelineBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
spiralbot committed Jun 26, 2024
1 parent 949d533 commit 6392642
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 27 deletions.
30 changes: 15 additions & 15 deletions src/Bootloader/QueueBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@
use Spiral\Boot\Bootloader\Bootloader;
use Spiral\Config\ConfiguratorInterface;
use Spiral\Config\Patch\Append;
use Spiral\Core\{BinderInterface, FactoryInterface, InterceptableCore, InterceptorPipeline};
use Spiral\Core\{BinderInterface, CompatiblePipelineBuilder, FactoryInterface, InterceptableCore, InterceptorPipeline};
use Spiral\Core\Container\Autowire;
use Spiral\Core\CoreInterceptorInterface;
use Spiral\Queue\{JobHandlerLocatorListener,
QueueConnectionProviderInterface,
QueueInterface,
QueueManager,
QueueRegistry,
SerializerLocatorListener,
SerializerRegistryInterface};
use Spiral\Interceptors\InterceptorInterface;
use Spiral\Queue\JobHandlerLocatorListener;
use Spiral\Queue\QueueConnectionProviderInterface;
use Spiral\Queue\QueueInterface;
use Spiral\Queue\QueueManager;
use Spiral\Queue\QueueRegistry;
use Spiral\Queue\SerializerLocatorListener;
use Spiral\Queue\SerializerRegistryInterface;
use Spiral\Interceptors\PipelineBuilderInterface;
use Spiral\Queue\Config\QueueConfig;
use Spiral\Queue\ContainerRegistry;
use Spiral\Queue\Core\QueueInjector;
Expand Down Expand Up @@ -142,20 +142,20 @@ protected function initHandler(
FactoryInterface $factory,
TracerFactoryInterface $tracerFactory,
?EventDispatcherInterface $dispatcher = null,
?PipelineBuilderInterface $builder = null,
): Handler {
$pipeline = (new InterceptorPipeline($dispatcher))->withHandler($core);
$builder ??= new CompatiblePipelineBuilder($dispatcher);

$list = [];
foreach ($config->getConsumeInterceptors() as $interceptor) {
if (\is_string($interceptor)) {
$interceptor = $container->get($interceptor);
$list[] = $container->get($interceptor);
} elseif ($interceptor instanceof Autowire) {
$interceptor = $interceptor->resolve($factory);
$list[] = $interceptor->resolve($factory);
}

\assert($interceptor instanceof CoreInterceptorInterface || $interceptor instanceof InterceptorInterface);
$pipeline->addInterceptor($interceptor);
}

$pipeline = $builder->withInterceptors(...$list)->build($core);
return new Handler($pipeline, $tracerFactory);
}

Expand Down
23 changes: 11 additions & 12 deletions src/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,28 @@

use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
use Spiral\Core\CompatiblePipelineBuilder;
use Spiral\Core\Container\Autowire;
use Spiral\Core\CoreInterceptorInterface as LegacyInterceptor;
use Spiral\Core\Exception\Container\ContainerException;
use Spiral\Core\FactoryInterface;
use Spiral\Core\InterceptorPipeline;
use Spiral\Interceptors\InterceptorInterface;
use Spiral\Interceptors\PipelineBuilderInterface;
use Spiral\Queue\Config\QueueConfig;
use Spiral\Queue\Interceptor\Push\Core as PushCore;

final class QueueManager implements QueueConnectionProviderInterface
{
/** @var QueueInterface[] */
private array $pipelines = [];
private PipelineBuilderInterface $builder;

public function __construct(
private readonly QueueConfig $config,
private readonly ContainerInterface $container,
private readonly FactoryInterface $factory,
private readonly ?EventDispatcherInterface $dispatcher = null
?EventDispatcherInterface $dispatcher = null,
?PipelineBuilderInterface $builder = null,
) {
$this->builder = $builder ?? new CompatiblePipelineBuilder($dispatcher);
}

public function getConnection(?string $name = null): QueueInterface
Expand All @@ -51,18 +53,15 @@ private function resolveConnection(string $name): QueueInterface

try {
$driver = $this->factory->make($config['driver'], $config);
$pipeline = (new InterceptorPipeline($this->dispatcher))->withHandler(new PushCore($driver));

$list = [];
foreach ($this->config->getPushInterceptors() as $interceptor) {
if (\is_string($interceptor) || $interceptor instanceof Autowire) {
$interceptor = $this->container->get($interceptor);
}

\assert($interceptor instanceof LegacyInterceptor || $interceptor instanceof InterceptorInterface);
$pipeline->addInterceptor($interceptor);
$list[] = \is_string($interceptor) || $interceptor instanceof Autowire
? $this->container->get($interceptor)
: $interceptor;
}

return new Queue($pipeline);
return new Queue($this->builder->withInterceptors(...$list)->build(new PushCore($driver)));
} catch (ContainerException $e) {
throw new Exception\NotSupportedDriverException(
\sprintf(
Expand Down

0 comments on commit 6392642

Please sign in to comment.