Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds interceptors support #77

Merged
merged 15 commits into from
Dec 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/psalm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ on:
name: static analysis

jobs:
psalm:
uses: spiral/gh-actions/.github/workflows/psalm.yml@master
with:
os: >-
['ubuntu-latest']
psalm:
uses: spiral/gh-actions/.github/workflows/psalm.yml@master
with:
os: >-
['ubuntu-latest']
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ docs
vendor
node_modules
.php-cs-fixer.cache
tests/runtime
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
"spiral/tokenizer": "^3.0",
"spiral/scaffolder": "^3.0",
"spiral/roadrunner-bridge": "^2.0 || ^3.0",
"temporal/sdk": "^1.3 || ^2.0"
"temporal/sdk": "^2.7"
},
"require-dev": {
"spiral/framework": "^3.0",
Expand Down
2 changes: 1 addition & 1 deletion src/Bootloader/PrototypeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use Spiral\Prototype\Bootloader\PrototypeBootloader as BasePrototypeBootloader;
use Temporal\Client\WorkflowClientInterface;

class PrototypeBootloader extends Bootloader
final class PrototypeBootloader extends Bootloader
{
public function defineDependencies(): array
{
Expand Down
54 changes: 34 additions & 20 deletions src/Bootloader/TemporalBridgeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@
use Spiral\Boot\AbstractKernel;
use Spiral\Boot\Bootloader\Bootloader;
use Spiral\Boot\EnvironmentInterface;
use Spiral\Boot\FinalizerInterface;
use Spiral\Config\ConfiguratorInterface;
use Spiral\Config\Patch\Append;
use Spiral\Console\Bootloader\ConsoleBootloader;
use Spiral\Core\Container\Autowire;
use Spiral\Core\FactoryInterface;
use Spiral\RoadRunnerBridge\Bootloader\RoadRunnerBootloader;
use Spiral\TemporalBridge\Commands;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Spiral\TemporalBridge\DeclarationLocator;
use Spiral\TemporalBridge\DeclarationLocatorInterface;
use Spiral\TemporalBridge\Dispatcher;
use Spiral\TemporalBridge\WorkerFactory;
use Spiral\TemporalBridge\WorkerFactoryInterface;
use Spiral\TemporalBridge\WorkersRegistry;
use Spiral\TemporalBridge\WorkersRegistryInterface;
use Spiral\Tokenizer\ClassesInterface;
Expand All @@ -28,10 +30,13 @@
use Temporal\Client\WorkflowClientInterface;
use Temporal\DataConverter\DataConverter;
use Temporal\DataConverter\DataConverterInterface;
use Temporal\Interceptor\PipelineProvider;
use Temporal\Interceptor\SimplePipelineProvider;
use Temporal\Internal\Interceptor\Interceptor;
use Temporal\Worker\Transport\Goridge;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;
use Temporal\WorkerFactory;
use Temporal\WorkerFactory as TemporalWorkerFactory;

class TemporalBridgeBootloader extends Bootloader
{
Expand All @@ -47,11 +52,13 @@ public function defineDependencies(): array
public function defineSingletons(): array
{
return [
WorkerFactoryInterface::class => [self::class, 'initWorkerFactory'],
TemporalWorkerFactoryInterface::class => [self::class, 'initWorkerFactory'],
WorkerFactoryInterface::class => WorkerFactory::class,
DeclarationLocatorInterface::class => [self::class, 'initDeclarationLocator'],
WorkflowClientInterface::class => [self::class, 'initWorkflowClient'],
WorkersRegistryInterface::class => [self::class, 'initWorkersRegistry'],
WorkersRegistryInterface::class => WorkersRegistry::class,
DataConverterInterface::class => [self::class, 'initDataConverter'],
PipelineProvider::class => [self::class, 'initPipelineProvider'],
];
}

Expand Down Expand Up @@ -82,7 +89,7 @@ protected function initConfig(EnvironmentInterface $env): void
[
'address' => $env->get('TEMPORAL_ADDRESS', '127.0.0.1:7233'),
'namespace' => 'App\\Endpoint\\Temporal\\Workflow',
'defaultWorker' => (string)$env->get('TEMPORAL_TASK_QUEUE', WorkerFactoryInterface::DEFAULT_TASK_QUEUE),
'defaultWorker' => (string)$env->get('TEMPORAL_TASK_QUEUE', TemporalWorkerFactoryInterface::DEFAULT_TASK_QUEUE),
'workers' => [],
],
);
Expand All @@ -91,11 +98,13 @@ protected function initConfig(EnvironmentInterface $env): void
protected function initWorkflowClient(
TemporalConfig $config,
DataConverterInterface $dataConverter,
PipelineProvider $pipelineProvider,
): WorkflowClientInterface {
return WorkflowClient::create(
return new WorkflowClient(
serviceClient: ServiceClient::create($config->getAddress()),
options: (new ClientOptions())->withNamespace($config->getTemporalNamespace()),
converter: $dataConverter,
interceptorProvider: $pipelineProvider,
);
}

Expand All @@ -104,29 +113,34 @@ protected function initDataConverter(): DataConverterInterface
return DataConverter::createDefault();
}

protected function initWorkerFactory(
DataConverterInterface $dataConverter,
): WorkerFactoryInterface {
return new WorkerFactory(
protected function initWorkerFactory(DataConverterInterface $dataConverter,): TemporalWorkerFactoryInterface
{
return new TemporalWorkerFactory(
dataConverter: $dataConverter,
rpc: Goridge::create(),
);
}

protected function initDeclarationLocator(
ClassesInterface $classes,
): DeclarationLocatorInterface {
protected function initDeclarationLocator(ClassesInterface $classes,): DeclarationLocatorInterface
{
return new DeclarationLocator(
classes: $classes,
reader: new AttributeReader(),
);
}

protected function initWorkersRegistry(
WorkerFactoryInterface $workerFactory,
FinalizerInterface $finalizer,
TemporalConfig $config,
): WorkersRegistryInterface {
return new WorkersRegistry($workerFactory, $finalizer, $config);
protected function initPipelineProvider(TemporalConfig $config, FactoryInterface $factory): PipelineProvider
{
/** @var Interceptor[] $interceptors */
$interceptors = \array_map(
static fn(mixed $interceptor) => match (true) {
\is_string($interceptor) => $factory->make($interceptor),
$interceptor instanceof Autowire => $interceptor->resolve($factory),
default => $interceptor
},
$config->getInterceptors(),
);

return new SimplePipelineProvider($interceptors);
}
}
47 changes: 45 additions & 2 deletions src/Config/TemporalConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,30 @@

namespace Spiral\TemporalBridge\Config;

use Spiral\Core\Container\Autowire;
use Spiral\Core\InjectableConfig;
use Temporal\Exception\ExceptionInterceptorInterface;
use Temporal\Internal\Interceptor\Interceptor;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;

/**
* @psalm-type TInterceptor = Interceptor|class-string<Interceptor>|Autowire<Interceptor>
* @psalm-type TExceptionInterceptor = ExceptionInterceptorInterface|class-string<ExceptionInterceptorInterface>|Autowire<ExceptionInterceptorInterface>
* @psalm-type TWorker = array{
* options?: WorkerOptions,
* exception_interceptor?: TExceptionInterceptor
* }
*
* @property array{
* address: non-empty-string,
* namespace: non-empty-string,
* temporalNamespace: non-empty-string,
* defaultWorker: non-empty-string,
* workers: array<non-empty-string, WorkerOptions|TWorker>,
* interceptors?: TInterceptor[]
* } $config
*/
final class TemporalConfig extends InjectableConfig
{
public const CONFIG = 'temporal';
Expand All @@ -18,31 +38,54 @@ final class TemporalConfig extends InjectableConfig
'temporalNamespace' => 'default',
'defaultWorker' => WorkerFactoryInterface::DEFAULT_TASK_QUEUE,
'workers' => [],
'interceptors' => [],
];

/**
* @return non-empty-string
*/
public function getDefaultNamespace(): string
{
return $this->config['namespace'];
}

/**
* @return non-empty-string
*/
public function getTemporalNamespace(): string
{
return $this->config['temporalNamespace'];
}

/**
* @return non-empty-string
*/
public function getAddress(): string
{
return $this->config['address'];
}

/**
* @return non-empty-string
*/
public function getDefaultWorker(): string
{
return $this->config['defaultWorker'];
}

/** @psalm-return array<non-empty-string, WorkerOptions> */
/**
* @return array<non-empty-string, WorkerOptions|TWorker>
*/
public function getWorkers(): array
{
return (array) $this->config['workers'];
return $this->config['workers'] ?? [];
}

/**
* @return TInterceptor[]
*/
public function getInterceptors(): array
{
return $this->config['interceptors'] ?? [];
}
}
90 changes: 90 additions & 0 deletions src/WorkerFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge;

use Spiral\Boot\FinalizerInterface;
use Spiral\Core\Container\Autowire;
use Spiral\Core\FactoryInterface;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Temporal\Exception\ExceptionInterceptorInterface;
use Temporal\Interceptor\PipelineProvider;
use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactory;
use Temporal\Worker\WorkerInterface;
use Temporal\Worker\WorkerOptions;

/**
* @psalm-import-type TWorker from TemporalConfig
*/
final class WorkerFactory implements WorkerFactoryInterface
{
/** @var array<non-empty-string, WorkerOptions|TWorker> */
private array $workers = [];

public function __construct(
private readonly TemporalWorkerFactory $workerFactory,
private readonly FinalizerInterface $finalizer,
private readonly FactoryInterface $factory,
private readonly PipelineProvider $pipelineProvider,
private readonly TemporalConfig $config,
) {
$this->workers = $this->config->getWorkers();
}

/**
* @param non-empty-string $name
*/
public function create(string $name): WorkerInterface
{
/** @psalm-suppress TooManyArguments */
$worker = $this->workerFactory->newWorker(
$name,
$this->getWorkerOptions($name),
$this->getExceptionInterceptor($name),
$this->pipelineProvider,
);
$worker->registerActivityFinalizer(fn () => $this->finalizer->finalize());

return $worker;
}

/**
* @param non-empty-string $name
*/
private function getWorkerOptions(string $name): ?WorkerOptions
{
$worker = $this->workers[$name] ?? null;

return match (true) {
$worker instanceof WorkerOptions => $worker,
isset($worker['options']) && $worker['options'] instanceof WorkerOptions => $worker['options'],
default => null
};
}

/**
* @param non-empty-string $name
*/
private function getExceptionInterceptor(string $name): ?ExceptionInterceptorInterface
{
$worker = $this->workers[$name] ?? null;
if (!\is_array($worker) || !isset($worker['exception_interceptor'])) {
return null;
}

$exceptionInterceptor = $this->wire($worker['exception_interceptor']);
\assert($exceptionInterceptor instanceof ExceptionInterceptorInterface);

return $exceptionInterceptor;
}

private function wire(mixed $alias): object
{
return match (true) {
\is_string($alias) => $this->factory->make($alias),
$alias instanceof Autowire => $alias->resolve($this->factory),
default => $alias
};
}
}
17 changes: 17 additions & 0 deletions src/WorkerFactoryInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge;

use Temporal\Worker\WorkerInterface;

interface WorkerFactoryInterface
{
/**
* Creates a new Temporal worker.
*
* @param non-empty-string $name
*/
public function create(string $name): WorkerInterface;
}
Loading
Loading