Skip to content

Commit

Permalink
Merge pull request #65 from spiral/interceptors
Browse files Browse the repository at this point in the history
Adding support for Temporal Interceptors
  • Loading branch information
butschster committed Dec 25, 2023
2 parents 05c7410 + d8135c8 commit dd13567
Show file tree
Hide file tree
Showing 14 changed files with 459 additions and 50 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/psalm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ on:
name: static analysis

jobs:
psalm:
uses: spiral/gh-actions/.github/workflows/psalm.yml@master
with:
os: >-
['ubuntu-latest']
psalm:
uses: spiral/gh-actions/.github/workflows/psalm.yml@master
with:
os: >-
['ubuntu-latest']
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ docs
vendor
node_modules
.php-cs-fixer.cache
tests/runtime
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"spiral/tokenizer": "^3.0",
"spiral/roadrunner-bridge": "^2.0 || ^3.0",
"nette/php-generator": "^4.0",
"temporal/sdk": "^1.3 || ^2.0"
"temporal/sdk": "^2.7"
},
"require-dev": {
"spiral/framework": "^3.0",
Expand Down
2 changes: 1 addition & 1 deletion src/Bootloader/PrototypeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use Spiral\TemporalBridge\WorkflowManagerInterface;
use Temporal\Client\WorkflowClientInterface;

class PrototypeBootloader extends Bootloader
final class PrototypeBootloader extends Bootloader
{
protected const DEPENDENCIES = [
BasePrototypeBootloader::class,
Expand Down
65 changes: 41 additions & 24 deletions src/Bootloader/TemporalBridgeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@
use Spiral\Boot\AbstractKernel;
use Spiral\Boot\Bootloader\Bootloader;
use Spiral\Boot\EnvironmentInterface;
use Spiral\Boot\FinalizerInterface;
use Spiral\Config\ConfiguratorInterface;
use Spiral\Config\Patch\Append;
use Spiral\Console\Bootloader\ConsoleBootloader;
use Spiral\Core\Container\Autowire;
use Spiral\Core\FactoryInterface;
use Spiral\RoadRunnerBridge\Bootloader\RoadRunnerBootloader;
use Spiral\TemporalBridge\Commands;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Spiral\TemporalBridge\DeclarationLocator;
use Spiral\TemporalBridge\DeclarationLocatorInterface;
use Spiral\TemporalBridge\Dispatcher;
use Spiral\TemporalBridge\Preset\PresetRegistry;
use Spiral\TemporalBridge\Preset\PresetRegistryInterface;
use Spiral\TemporalBridge\WorkerFactory;
use Spiral\TemporalBridge\WorkerFactoryInterface;
use Spiral\TemporalBridge\WorkersRegistry;
use Spiral\TemporalBridge\WorkersRegistryInterface;
use Spiral\TemporalBridge\Workflow\WorkflowManager;
Expand All @@ -33,22 +36,27 @@
use Temporal\Client\WorkflowClientInterface;
use Temporal\DataConverter\DataConverter;
use Temporal\DataConverter\DataConverterInterface;
use Temporal\Interceptor\PipelineProvider;
use Temporal\Interceptor\SimplePipelineProvider;
use Temporal\Internal\Interceptor\Interceptor;
use Temporal\Worker\Transport\Goridge;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;
use Temporal\WorkerFactory;
use Temporal\WorkerFactory as TemporalWorkerFactory;

class TemporalBridgeBootloader extends Bootloader
{
protected const SINGLETONS = [
WorkflowPresetLocatorInterface::class => [self::class, 'initWorkflowPresetLocator'],
WorkflowManagerInterface::class => WorkflowManager::class,
WorkerFactoryInterface::class => [self::class, 'initWorkerFactory'],
TemporalWorkerFactoryInterface::class => [self::class, 'initWorkerFactory'],
DeclarationLocatorInterface::class => [self::class, 'initDeclarationLocator'],
WorkflowClientInterface::class => [self::class, 'initWorkflowClient'],
WorkersRegistryInterface::class => [self::class, 'initWorkersRegistry'],
WorkersRegistryInterface::class => WorkersRegistry::class,
PresetRegistryInterface::class => PresetRegistry::class,
DataConverterInterface::class => [self::class, 'initDataConverter'],
WorkerFactoryInterface::class => WorkerFactory::class,
PipelineProvider::class => [self::class, 'initPipelineProvider'],
];

protected const DEPENDENCIES = [
Expand All @@ -57,15 +65,15 @@ class TemporalBridgeBootloader extends Bootloader
];

public function __construct(
private readonly ConfiguratorInterface $config
private readonly ConfiguratorInterface $config,
) {
}

public function init(
AbstractKernel $kernel,
EnvironmentInterface $env,
ConsoleBootloader $console,
FactoryInterface $factory
FactoryInterface $factory,
): void {
$this->initConfig($env);

Expand All @@ -83,7 +91,7 @@ public function addWorkerOptions(string $worker, WorkerOptions $options): void

protected function initWorkflowPresetLocator(
FactoryInterface $factory,
ClassesInterface $classes
ClassesInterface $classes,
): WorkflowPresetLocatorInterface {
return new WorkflowPresetLocator(
factory: $factory,
Expand All @@ -99,20 +107,22 @@ protected function initConfig(EnvironmentInterface $env): void
[
'address' => $env->get('TEMPORAL_ADDRESS', '127.0.0.1:7233'),
'namespace' => 'App\\Workflow',
'defaultWorker' => (string)$env->get('TEMPORAL_TASK_QUEUE', WorkerFactoryInterface::DEFAULT_TASK_QUEUE),
'defaultWorker' => (string)$env->get('TEMPORAL_TASK_QUEUE', TemporalWorkerFactory::DEFAULT_TASK_QUEUE),
'workers' => [],
]
],
);
}

protected function initWorkflowClient(
TemporalConfig $config,
DataConverterInterface $dataConverter
DataConverterInterface $dataConverter,
PipelineProvider $pipelineProvider,
): WorkflowClientInterface {
return WorkflowClient::create(
return new WorkflowClient(
serviceClient: ServiceClient::create($config->getAddress()),
options: (new ClientOptions())->withNamespace($config->getTemporalNamespace()),
converter: $dataConverter
converter: $dataConverter,
interceptorProvider: $pipelineProvider,
);
}

Expand All @@ -122,28 +132,35 @@ protected function initDataConverter(): DataConverterInterface
}

protected function initWorkerFactory(
DataConverterInterface $dataConverter
): WorkerFactoryInterface {
return new WorkerFactory(
DataConverterInterface $dataConverter,
): TemporalWorkerFactoryInterface {
return new TemporalWorkerFactory(
dataConverter: $dataConverter,
rpc: Goridge::create()
);
}

protected function initDeclarationLocator(
ClassesInterface $classes
ClassesInterface $classes,
): DeclarationLocatorInterface {
return new \Spiral\TemporalBridge\DeclarationLocator(
return new DeclarationLocator(
classes: $classes,
reader: new AttributeReader()
);
}

protected function initWorkersRegistry(
WorkerFactoryInterface $workerFactory,
FinalizerInterface $finalizer,
TemporalConfig $config
): WorkersRegistryInterface {
return new WorkersRegistry($workerFactory, $finalizer, $config);
protected function initPipelineProvider(TemporalConfig $config, FactoryInterface $factory): PipelineProvider
{
/** @var Interceptor[] $interceptors */
$interceptors = \array_map(
static fn (mixed $interceptor) => match (true) {
\is_string($interceptor) => $factory->make($interceptor),
$interceptor instanceof Autowire => $interceptor->resolve($factory),
default => $interceptor
},
$config->getInterceptors(),
);

return new SimplePipelineProvider($interceptors);
}
}
47 changes: 45 additions & 2 deletions src/Config/TemporalConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,30 @@

namespace Spiral\TemporalBridge\Config;

use Spiral\Core\Container\Autowire;
use Spiral\Core\InjectableConfig;
use Temporal\Exception\ExceptionInterceptorInterface;
use Temporal\Internal\Interceptor\Interceptor;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;

/**
* @psalm-type TInterceptor = Interceptor|class-string<Interceptor>|Autowire<Interceptor>
* @psalm-type TExceptionInterceptor = ExceptionInterceptorInterface|class-string<ExceptionInterceptorInterface>|Autowire<ExceptionInterceptorInterface>
* @psalm-type TWorker = array{
* options?: WorkerOptions,
* exception_interceptor?: TExceptionInterceptor
* }
*
* @property array{
* address: non-empty-string,
* namespace: non-empty-string,
* temporalNamespace: non-empty-string,
* defaultWorker: non-empty-string,
* workers: array<non-empty-string, WorkerOptions|TWorker>,
* interceptors?: TInterceptor[]
* } $config
*/
final class TemporalConfig extends InjectableConfig
{
public const CONFIG = 'temporal';
Expand All @@ -18,31 +38,54 @@ final class TemporalConfig extends InjectableConfig
'temporalNamespace' => 'default',
'defaultWorker' => WorkerFactoryInterface::DEFAULT_TASK_QUEUE,
'workers' => [],
'interceptors' => [],
];

/**
* @return non-empty-string
*/
public function getDefaultNamespace(): string
{
return $this->config['namespace'];
}

/**
* @return non-empty-string
*/
public function getTemporalNamespace(): string
{
return $this->config['temporalNamespace'];
}

/**
* @return non-empty-string
*/
public function getAddress(): string
{
return $this->config['address'];
}

/**
* @return non-empty-string
*/
public function getDefaultWorker(): string
{
return $this->config['defaultWorker'];
}

/** @psalm-return array<non-empty-string, WorkerOptions> */
/**
* @return array<non-empty-string, WorkerOptions|TWorker>
*/
public function getWorkers(): array
{
return (array) $this->config['workers'];
return $this->config['workers'] ?? [];
}

/**
* @return TInterceptor[]
*/
public function getInterceptors(): array
{
return $this->config['interceptors'] ?? [];
}
}
90 changes: 90 additions & 0 deletions src/WorkerFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge;

use Spiral\Boot\FinalizerInterface;
use Spiral\Core\Container\Autowire;
use Spiral\Core\FactoryInterface;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Temporal\Exception\ExceptionInterceptorInterface;
use Temporal\Interceptor\PipelineProvider;
use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactory;
use Temporal\Worker\WorkerInterface;
use Temporal\Worker\WorkerOptions;

/**
* @psalm-import-type TWorker from TemporalConfig
*/
final class WorkerFactory implements WorkerFactoryInterface
{
/** @var array<non-empty-string, WorkerOptions|TWorker> */
private array $workers = [];

public function __construct(
private readonly TemporalWorkerFactory $workerFactory,
private readonly FinalizerInterface $finalizer,
private readonly FactoryInterface $factory,
private readonly PipelineProvider $pipelineProvider,
private readonly TemporalConfig $config,
) {
$this->workers = $this->config->getWorkers();
}

/**
* @param non-empty-string $name
*/
public function create(string $name): WorkerInterface
{
/** @psalm-suppress TooManyArguments */
$worker = $this->workerFactory->newWorker(
$name,
$this->getWorkerOptions($name),
$this->getExceptionInterceptor($name),
$this->pipelineProvider,
);
$worker->registerActivityFinalizer(fn () => $this->finalizer->finalize());

return $worker;
}

/**
* @param non-empty-string $name
*/
private function getWorkerOptions(string $name): ?WorkerOptions
{
$worker = $this->workers[$name] ?? null;

return match (true) {
$worker instanceof WorkerOptions => $worker,
isset($worker['options']) && $worker['options'] instanceof WorkerOptions => $worker['options'],
default => null
};
}

/**
* @param non-empty-string $name
*/
private function getExceptionInterceptor(string $name): ?ExceptionInterceptorInterface
{
$worker = $this->workers[$name] ?? null;
if (!\is_array($worker) || !isset($worker['exception_interceptor'])) {
return null;
}

$exceptionInterceptor = $this->wire($worker['exception_interceptor']);
\assert($exceptionInterceptor instanceof ExceptionInterceptorInterface);

return $exceptionInterceptor;
}

private function wire(mixed $alias): object
{
return match (true) {
\is_string($alias) => $this->factory->make($alias),
$alias instanceof Autowire => $alias->resolve($this->factory),
default => $alias
};
}
}
17 changes: 17 additions & 0 deletions src/WorkerFactoryInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge;

use Temporal\Worker\WorkerInterface;

interface WorkerFactoryInterface
{
/**
* Creates a new Temporal worker.
*
* @param non-empty-string $name
*/
public function create(string $name): WorkerInterface;
}
Loading

0 comments on commit dd13567

Please sign in to comment.