Skip to content

Commit

Permalink
Merge branch refs/heads/master into feature/scopes
Browse files Browse the repository at this point in the history
  • Loading branch information
spiralbot committed May 30, 2024
1 parent 3e0a4d3 commit 949d533
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 43 deletions.
16 changes: 8 additions & 8 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
"require": {
"php": ">=8.1",
"ext-json": "*",
"spiral/core": "^3.13",
"spiral/hmvc": "^3.13",
"spiral/serializer": "^3.13",
"spiral/snapshots": "^3.13",
"spiral/telemetry": "^3.13",
"spiral/tokenizer": "^3.13",
"spiral/core": "^3.14",
"spiral/hmvc": "^3.14",
"spiral/serializer": "^3.14",
"spiral/snapshots": "^3.14",
"spiral/telemetry": "^3.14",
"spiral/tokenizer": "^3.14",
"spiral/attributes": "^2.8|^3.0",
"doctrine/inflector": "^1.4|^2.0",
"ramsey/uuid": "^4.2.3",
Expand All @@ -53,12 +53,12 @@
"require-dev": {
"phpunit/phpunit": "^10.1",
"mockery/mockery": "^1.5",
"spiral/boot": "^3.13",
"spiral/boot": "^3.14",
"vimeo/psalm": "^5.9"
},
"extra": {
"branch-alias": {
"dev-master": "3.13.x-dev"
"dev-master": "3.14.x-dev"
}
},
"config": {
Expand Down
16 changes: 10 additions & 6 deletions src/Bootloader/QueueBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use Spiral\Boot\Bootloader\Bootloader;
use Spiral\Config\ConfiguratorInterface;
use Spiral\Config\Patch\Append;
use Spiral\Core\{BinderInterface, FactoryInterface, InterceptableCore};
use Spiral\Core\{BinderInterface, FactoryInterface, InterceptableCore, InterceptorPipeline};
use Spiral\Core\Container\Autowire;
use Spiral\Core\CoreInterceptorInterface;
use Spiral\Queue\{JobHandlerLocatorListener,
Expand All @@ -20,13 +20,17 @@
QueueRegistry,
SerializerLocatorListener,
SerializerRegistryInterface};
use Spiral\Interceptors\InterceptorInterface;
use Spiral\Queue\Config\QueueConfig;
use Spiral\Queue\ContainerRegistry;
use Spiral\Queue\Core\QueueInjector;
use Spiral\Queue\Driver\{NullDriver, SyncDriver};
use Spiral\Queue\Failed\{FailedJobHandlerInterface, LogFailedJobHandler};
use Spiral\Queue\HandlerRegistryInterface;
use Spiral\Queue\Interceptor\Consume\{Core as ConsumeCore, ErrorHandlerInterceptor, Handler, RetryPolicyInterceptor};
use Spiral\Queue\Interceptor\Consume\Core as ConsumeCore;
use Spiral\Queue\Interceptor\Consume\ErrorHandlerInterceptor;
use Spiral\Queue\Interceptor\Consume\Handler;
use Spiral\Queue\Interceptor\Consume\RetryPolicyInterceptor;
use Spiral\Telemetry\Bootloader\TelemetryBootloader;
use Spiral\Telemetry\TracerFactoryInterface;
use Spiral\Tokenizer\Bootloader\TokenizerListenerBootloader;
Expand Down Expand Up @@ -139,7 +143,7 @@ protected function initHandler(
TracerFactoryInterface $tracerFactory,
?EventDispatcherInterface $dispatcher = null,
): Handler {
$core = new InterceptableCore($core, $dispatcher);
$pipeline = (new InterceptorPipeline($dispatcher))->withHandler($core);

foreach ($config->getConsumeInterceptors() as $interceptor) {
if (\is_string($interceptor)) {
Expand All @@ -148,11 +152,11 @@ protected function initHandler(
$interceptor = $interceptor->resolve($factory);
}

\assert($interceptor instanceof CoreInterceptorInterface);
$core->addInterceptor($interceptor);
\assert($interceptor instanceof CoreInterceptorInterface || $interceptor instanceof InterceptorInterface);
$pipeline->addInterceptor($interceptor);
}

return new Handler($core, $tracerFactory);
return new Handler($pipeline, $tracerFactory);
}

private function initQueueConfig(EnvironmentInterface $env): void
Expand Down
12 changes: 9 additions & 3 deletions src/Config/QueueConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@
namespace Spiral\Queue\Config;

use Spiral\Core\Container\Autowire;
use Spiral\Core\CoreInterceptorInterface;
use Spiral\Core\CoreInterceptorInterface as LegacyInterceptor;
use Spiral\Core\InjectableConfig;
use Spiral\Interceptors\InterceptorInterface;
use Spiral\Queue\Exception\InvalidArgumentException;
use Spiral\Serializer\SerializerInterface;

/**
* @psalm-type TLegacyInterceptors = array<class-string<LegacyInterceptor>|LegacyInterceptor|Autowire>
* @psalm-type TNewInterceptors = array<class-string<InterceptorInterface>|InterceptorInterface|Autowire>
* @psalm-type TInterceptors = TNewInterceptors|TLegacyInterceptors
*/
final class QueueConfig extends InjectableConfig
{
public const CONFIG = 'queue';
Expand Down Expand Up @@ -41,7 +47,7 @@ public function getAliases(): array
/**
* Get consumer interceptors
*
* @return array<class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire>
* @return TInterceptors
*/
public function getConsumeInterceptors(): array
{
Expand All @@ -51,7 +57,7 @@ public function getConsumeInterceptors(): array
/**
* Get pusher interceptors
*
* @return array<class-string<CoreInterceptorInterface>|CoreInterceptorInterface|Autowire>
* @return TInterceptors
*/
public function getPushInterceptors(): array
{
Expand Down
14 changes: 13 additions & 1 deletion src/Interceptor/Consume/Core.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

use Psr\EventDispatcher\EventDispatcherInterface;
use Spiral\Core\CoreInterface;
use Spiral\Interceptors\Context\CallContext;
use Spiral\Interceptors\HandlerInterface;
use Spiral\Queue\Event\JobProcessed;
use Spiral\Queue\Event\JobProcessing;
use Spiral\Queue\HandlerRegistryInterface;
Expand All @@ -20,7 +22,7 @@
* headers: array
* }
*/
final class Core implements CoreInterface
final class Core implements CoreInterface, HandlerInterface
{
public function __construct(
private readonly HandlerRegistryInterface $registry,
Expand All @@ -30,6 +32,7 @@ public function __construct(

/**
* @param-assert TParameters $parameters
* @deprecated
*/
public function callAction(string $controller, string $action, array $parameters = []): mixed
{
Expand All @@ -49,6 +52,15 @@ public function callAction(string $controller, string $action, array $parameters
return null;
}

public function handle(CallContext $context): mixed
{
$args = $context->getArguments();
$controller = $context->getTarget()->getPath()[0];
$action = $context->getTarget()->getPath()[1];

return $this->callAction($controller, $action, $args);
}

/**
* @param class-string $event
* @param-assert TParameters $parameters
Expand Down
27 changes: 25 additions & 2 deletions src/Interceptor/Consume/ErrorHandlerInterceptor.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

namespace Spiral\Queue\Interceptor\Consume;

use Spiral\Core\CoreInterceptorInterface;
use Spiral\Core\CoreInterceptorInterface as LegacyInterceptor;
use Spiral\Core\CoreInterface;
use Spiral\Interceptors\Context\CallContextInterface;
use Spiral\Interceptors\HandlerInterface;
use Spiral\Interceptors\InterceptorInterface;
use Spiral\Queue\Exception\StateException;
use Spiral\Queue\Failed\FailedJobHandlerInterface;

final class ErrorHandlerInterceptor implements CoreInterceptorInterface
final class ErrorHandlerInterceptor implements LegacyInterceptor, InterceptorInterface
{
public function __construct(
private readonly FailedJobHandlerInterface $handler
Expand All @@ -35,4 +38,24 @@ public function process(string $name, string $action, array $parameters, CoreInt
throw $e;
}
}

public function intercept(CallContextInterface $context, HandlerInterface $handler): mixed
{
try {
return $handler->handle($context);
} catch (\Throwable $e) {
$args = $context->getArguments();
if (!$e instanceof StateException) {
$this->handler->handle(
$args['driver'],
$args['queue'],
$context->getTarget()->getPath()[0],
$args['payload'],
$e,
);
}

throw $e;
}
}
}
25 changes: 17 additions & 8 deletions src/Interceptor/Consume/Handler.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

use Spiral\Core\Container;
use Spiral\Core\CoreInterface;
use Spiral\Interceptors\Context\CallContext;
use Spiral\Interceptors\Context\Target;
use Spiral\Interceptors\HandlerInterface;
use Spiral\Telemetry\NullTracerFactory;
use Spiral\Telemetry\TraceKind;
use Spiral\Telemetry\TracerFactoryInterface;
Expand All @@ -17,12 +20,14 @@
final class Handler
{
private readonly TracerFactoryInterface $tracerFactory;
private readonly bool $isLegacy;

public function __construct(
private readonly CoreInterface $core,
private readonly HandlerInterface|CoreInterface $core,
?TracerFactoryInterface $tracerFactory = null,
) {
$this->tracerFactory = $tracerFactory ?? new NullTracerFactory(new Container());
$this->isLegacy = !$core instanceof HandlerInterface;
}

public function handle(
Expand All @@ -35,15 +40,19 @@ public function handle(
): mixed {
$tracer = $this->tracerFactory->make($headers);

$arguments = [
'driver' => $driver,
'queue' => $queue,
'id' => $id,
'payload' => $payload,
'headers' => $headers,
];

return $tracer->trace(
name: \sprintf('Job handling [%s:%s]', $name, $id),
callback: fn (): mixed => $this->core->callAction($name, 'handle', [
'driver' => $driver,
'queue' => $queue,
'id' => $id,
'payload' => $payload,
'headers' => $headers,
]),
callback: $this->isLegacy
? fn (): mixed => $this->core->callAction($name, 'handle', $arguments)
: fn (): mixed => $this->core->handle(new CallContext(Target::fromPair($name, 'handle'), $arguments)),
attributes: [
'queue.driver' => $driver,
'queue.name' => $queue,
Expand Down
14 changes: 13 additions & 1 deletion src/Interceptor/Push/Core.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

use Spiral\Core\ContainerScope;
use Spiral\Core\CoreInterface;
use Spiral\Interceptors\Context\CallContext;
use Spiral\Interceptors\HandlerInterface;
use Spiral\Queue\Options;
use Spiral\Queue\OptionsInterface;
use Spiral\Queue\QueueInterface;
Expand All @@ -16,7 +18,7 @@
* @internal
* @psalm-type TParameters = array{options: ?OptionsInterface, payload: mixed}
*/
final class Core implements CoreInterface
final class Core implements CoreInterface, HandlerInterface
{
public function __construct(
private readonly QueueInterface $connection,
Expand All @@ -25,6 +27,7 @@ public function __construct(

/**
* @param-assert TParameters $parameters
* @deprecated
*/
public function callAction(
string $controller,
Expand Down Expand Up @@ -56,6 +59,15 @@ public function callAction(
);
}

public function handle(CallContext $context): mixed
{
$args = $context->getArguments();
$controller = $context->getTarget()->getPath()[0];
$action = $context->getTarget()->getPath()[1];

return $this->callAction($controller, $action, $args);
}

private function getTracer(): TracerInterface
{
try {
Expand Down
3 changes: 2 additions & 1 deletion src/JobHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Spiral\Queue;

use Spiral\Core\Attribute\Proxy;
use Spiral\Core\InvokerInterface;
use Spiral\Queue\Exception\JobException;

Expand All @@ -18,7 +19,7 @@ abstract class JobHandler implements HandlerInterface
protected const HANDLE_FUNCTION = 'invoke';

public function __construct(
protected InvokerInterface $invoker,
#[Proxy] protected InvokerInterface $invoker,
) {
}

Expand Down
16 changes: 13 additions & 3 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
namespace Spiral\Queue;

use Spiral\Core\CoreInterface;
use Spiral\Interceptors\Context\CallContext;
use Spiral\Interceptors\Context\Target;
use Spiral\Interceptors\HandlerInterface as InterceptorHandler;

/**
* This class is used to push jobs into the queue and pass them through the interceptor chain
Expand All @@ -15,16 +18,23 @@
*/
final class Queue implements QueueInterface
{
private readonly bool $isLegacy;

public function __construct(
private readonly CoreInterface $core,
private readonly CoreInterface|InterceptorHandler $core,
) {
$this->isLegacy = !$core instanceof HandlerInterface;
}

public function push(string $name, mixed $payload = [], mixed $options = null): string
{
return $this->core->callAction($name, 'push', [
$arguments = [
'payload' => $payload,
'options' => $options,
]);
];

return $this->isLegacy
? $this->core->callAction($name, 'push', $arguments)
: $this->core->handle(new CallContext(Target::fromPair($name, 'push'), $arguments));
}
}
17 changes: 7 additions & 10 deletions src/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
use Spiral\Core\Container\Autowire;
use Spiral\Core\CoreInterceptorInterface;
use Spiral\Core\CoreInterceptorInterface as LegacyInterceptor;
use Spiral\Core\Exception\Container\ContainerException;
use Spiral\Core\FactoryInterface;
use Spiral\Core\InterceptableCore;
use Spiral\Core\InterceptorPipeline;
use Spiral\Interceptors\InterceptorInterface;
use Spiral\Queue\Config\QueueConfig;
use Spiral\Queue\Interceptor\Push\Core as PushCore;

Expand Down Expand Up @@ -50,22 +51,18 @@ private function resolveConnection(string $name): QueueInterface

try {
$driver = $this->factory->make($config['driver'], $config);

$core = new InterceptableCore(
new PushCore($driver),
$this->dispatcher
);
$pipeline = (new InterceptorPipeline($this->dispatcher))->withHandler(new PushCore($driver));

foreach ($this->config->getPushInterceptors() as $interceptor) {
if (\is_string($interceptor) || $interceptor instanceof Autowire) {
$interceptor = $this->container->get($interceptor);
}

\assert($interceptor instanceof CoreInterceptorInterface);
$core->addInterceptor($interceptor);
\assert($interceptor instanceof LegacyInterceptor || $interceptor instanceof InterceptorInterface);
$pipeline->addInterceptor($interceptor);
}

return new Queue($core);
return new Queue($pipeline);
} catch (ContainerException $e) {
throw new Exception\NotSupportedDriverException(
\sprintf(
Expand Down

0 comments on commit 949d533

Please sign in to comment.