Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for Temporal Interceptors #65

Merged
merged 12 commits into from
Dec 25, 2023
10 changes: 5 additions & 5 deletions .github/workflows/psalm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ on:
name: static analysis

jobs:
psalm:
uses: spiral/gh-actions/.github/workflows/psalm.yml@master
with:
os: >-
['ubuntu-latest']
psalm:
uses: spiral/gh-actions/.github/workflows/psalm.yml@master
with:
os: >-
['ubuntu-latest']
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ docs
vendor
node_modules
.php-cs-fixer.cache
tests/runtime
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"spiral/tokenizer": "^3.0",
"spiral/roadrunner-bridge": "^2.0 || ^3.0",
"nette/php-generator": "^4.0",
"temporal/sdk": "^1.3 || ^2.0"
"temporal/sdk": "^2.7"
},
"require-dev": {
"spiral/framework": "^3.0",
Expand Down
2 changes: 1 addition & 1 deletion src/Bootloader/PrototypeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use Spiral\TemporalBridge\WorkflowManagerInterface;
use Temporal\Client\WorkflowClientInterface;

class PrototypeBootloader extends Bootloader
final class PrototypeBootloader extends Bootloader
{
protected const DEPENDENCIES = [
BasePrototypeBootloader::class,
Expand Down
65 changes: 41 additions & 24 deletions src/Bootloader/TemporalBridgeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@
use Spiral\Boot\AbstractKernel;
use Spiral\Boot\Bootloader\Bootloader;
use Spiral\Boot\EnvironmentInterface;
use Spiral\Boot\FinalizerInterface;
use Spiral\Config\ConfiguratorInterface;
use Spiral\Config\Patch\Append;
use Spiral\Console\Bootloader\ConsoleBootloader;
use Spiral\Core\Container\Autowire;
use Spiral\Core\FactoryInterface;
use Spiral\RoadRunnerBridge\Bootloader\RoadRunnerBootloader;
use Spiral\TemporalBridge\Commands;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Spiral\TemporalBridge\DeclarationLocator;
use Spiral\TemporalBridge\DeclarationLocatorInterface;
use Spiral\TemporalBridge\Dispatcher;
use Spiral\TemporalBridge\Preset\PresetRegistry;
use Spiral\TemporalBridge\Preset\PresetRegistryInterface;
use Spiral\TemporalBridge\WorkerFactory;
use Spiral\TemporalBridge\WorkerFactoryInterface;
use Spiral\TemporalBridge\WorkersRegistry;
use Spiral\TemporalBridge\WorkersRegistryInterface;
use Spiral\TemporalBridge\Workflow\WorkflowManager;
Expand All @@ -33,22 +36,27 @@
use Temporal\Client\WorkflowClientInterface;
use Temporal\DataConverter\DataConverter;
use Temporal\DataConverter\DataConverterInterface;
use Temporal\Interceptor\PipelineProvider;
use Temporal\Interceptor\SimplePipelineProvider;
use Temporal\Internal\Interceptor\Interceptor;
use Temporal\Worker\Transport\Goridge;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;
use Temporal\WorkerFactory;
use Temporal\WorkerFactory as TemporalWorkerFactory;

class TemporalBridgeBootloader extends Bootloader
{
protected const SINGLETONS = [
WorkflowPresetLocatorInterface::class => [self::class, 'initWorkflowPresetLocator'],
WorkflowManagerInterface::class => WorkflowManager::class,
WorkerFactoryInterface::class => [self::class, 'initWorkerFactory'],
TemporalWorkerFactoryInterface::class => [self::class, 'initWorkerFactory'],
DeclarationLocatorInterface::class => [self::class, 'initDeclarationLocator'],
WorkflowClientInterface::class => [self::class, 'initWorkflowClient'],
WorkersRegistryInterface::class => [self::class, 'initWorkersRegistry'],
WorkersRegistryInterface::class => WorkersRegistry::class,
PresetRegistryInterface::class => PresetRegistry::class,
DataConverterInterface::class => [self::class, 'initDataConverter'],
WorkerFactoryInterface::class => WorkerFactory::class,
PipelineProvider::class => [self::class, 'initPipelineProvider'],
];

protected const DEPENDENCIES = [
Expand All @@ -57,15 +65,15 @@ class TemporalBridgeBootloader extends Bootloader
];

public function __construct(
private readonly ConfiguratorInterface $config
private readonly ConfiguratorInterface $config,
) {
}

public function init(
AbstractKernel $kernel,
EnvironmentInterface $env,
ConsoleBootloader $console,
FactoryInterface $factory
FactoryInterface $factory,
): void {
$this->initConfig($env);

Expand All @@ -83,7 +91,7 @@ public function addWorkerOptions(string $worker, WorkerOptions $options): void

protected function initWorkflowPresetLocator(
FactoryInterface $factory,
ClassesInterface $classes
ClassesInterface $classes,
): WorkflowPresetLocatorInterface {
return new WorkflowPresetLocator(
factory: $factory,
Expand All @@ -99,20 +107,22 @@ protected function initConfig(EnvironmentInterface $env): void
[
'address' => $env->get('TEMPORAL_ADDRESS', '127.0.0.1:7233'),
'namespace' => 'App\\Workflow',
'defaultWorker' => (string)$env->get('TEMPORAL_TASK_QUEUE', WorkerFactoryInterface::DEFAULT_TASK_QUEUE),
'defaultWorker' => (string)$env->get('TEMPORAL_TASK_QUEUE', TemporalWorkerFactory::DEFAULT_TASK_QUEUE),
'workers' => [],
]
],
);
}

protected function initWorkflowClient(
TemporalConfig $config,
DataConverterInterface $dataConverter
DataConverterInterface $dataConverter,
PipelineProvider $pipelineProvider,
): WorkflowClientInterface {
return WorkflowClient::create(
return new WorkflowClient(
serviceClient: ServiceClient::create($config->getAddress()),
options: (new ClientOptions())->withNamespace($config->getTemporalNamespace()),
converter: $dataConverter
converter: $dataConverter,
interceptorProvider: $pipelineProvider,
);
}

Expand All @@ -122,28 +132,35 @@ protected function initDataConverter(): DataConverterInterface
}

protected function initWorkerFactory(
DataConverterInterface $dataConverter
): WorkerFactoryInterface {
return new WorkerFactory(
DataConverterInterface $dataConverter,
): TemporalWorkerFactoryInterface {
return new TemporalWorkerFactory(
dataConverter: $dataConverter,
rpc: Goridge::create()
);
}

protected function initDeclarationLocator(
ClassesInterface $classes
ClassesInterface $classes,
): DeclarationLocatorInterface {
return new \Spiral\TemporalBridge\DeclarationLocator(
return new DeclarationLocator(
classes: $classes,
reader: new AttributeReader()
);
}

protected function initWorkersRegistry(
WorkerFactoryInterface $workerFactory,
FinalizerInterface $finalizer,
TemporalConfig $config
): WorkersRegistryInterface {
return new WorkersRegistry($workerFactory, $finalizer, $config);
protected function initPipelineProvider(TemporalConfig $config, FactoryInterface $factory): PipelineProvider
{
/** @var Interceptor[] $interceptors */
$interceptors = \array_map(
static fn (mixed $interceptor) => match (true) {
\is_string($interceptor) => $factory->make($interceptor),
$interceptor instanceof Autowire => $interceptor->resolve($factory),
default => $interceptor
},
$config->getInterceptors(),
);

return new SimplePipelineProvider($interceptors);
}
}
47 changes: 45 additions & 2 deletions src/Config/TemporalConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,30 @@

namespace Spiral\TemporalBridge\Config;

use Spiral\Core\Container\Autowire;
use Spiral\Core\InjectableConfig;
use Temporal\Exception\ExceptionInterceptorInterface;
use Temporal\Internal\Interceptor\Interceptor;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;

/**
* @psalm-type TInterceptor = Interceptor|class-string<Interceptor>|Autowire<Interceptor>
* @psalm-type TExceptionInterceptor = ExceptionInterceptorInterface|class-string<ExceptionInterceptorInterface>|Autowire<ExceptionInterceptorInterface>
* @psalm-type TWorker = array{
* options?: WorkerOptions,
* exception_interceptor?: TExceptionInterceptor
* }
*
* @property array{
* address: non-empty-string,
* namespace: non-empty-string,
* temporalNamespace: non-empty-string,
* defaultWorker: non-empty-string,
* workers: array<non-empty-string, WorkerOptions|TWorker>,
* interceptors?: TInterceptor[]
* } $config
*/
final class TemporalConfig extends InjectableConfig
{
public const CONFIG = 'temporal';
Expand All @@ -18,31 +38,54 @@ final class TemporalConfig extends InjectableConfig
'temporalNamespace' => 'default',
'defaultWorker' => WorkerFactoryInterface::DEFAULT_TASK_QUEUE,
'workers' => [],
'interceptors' => [],
];

/**
* @return non-empty-string
*/
public function getDefaultNamespace(): string
{
return $this->config['namespace'];
}

/**
* @return non-empty-string
*/
public function getTemporalNamespace(): string
{
return $this->config['temporalNamespace'];
}

/**
* @return non-empty-string
*/
public function getAddress(): string
{
return $this->config['address'];
}

/**
* @return non-empty-string
*/
public function getDefaultWorker(): string
{
return $this->config['defaultWorker'];
}

/** @psalm-return array<non-empty-string, WorkerOptions> */
/**
* @return array<non-empty-string, WorkerOptions|TWorker>
*/
public function getWorkers(): array
{
return (array) $this->config['workers'];
return $this->config['workers'] ?? [];
}

/**
* @return TInterceptor[]
*/
public function getInterceptors(): array
{
return $this->config['interceptors'] ?? [];
}
}
90 changes: 90 additions & 0 deletions src/WorkerFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge;

use Spiral\Boot\FinalizerInterface;
use Spiral\Core\Container\Autowire;
use Spiral\Core\FactoryInterface;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Temporal\Exception\ExceptionInterceptorInterface;
use Temporal\Interceptor\PipelineProvider;
use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactory;
use Temporal\Worker\WorkerInterface;
use Temporal\Worker\WorkerOptions;

/**
* @psalm-import-type TWorker from TemporalConfig
*/
final class WorkerFactory implements WorkerFactoryInterface
{
/** @var array<non-empty-string, WorkerOptions|TWorker> */
private array $workers = [];

public function __construct(
private readonly TemporalWorkerFactory $workerFactory,
private readonly FinalizerInterface $finalizer,
private readonly FactoryInterface $factory,
private readonly PipelineProvider $pipelineProvider,
private readonly TemporalConfig $config,
) {
$this->workers = $this->config->getWorkers();
}

/**
* @param non-empty-string $name
*/
public function create(string $name): WorkerInterface
{
/** @psalm-suppress TooManyArguments */
$worker = $this->workerFactory->newWorker(
$name,
$this->getWorkerOptions($name),
$this->getExceptionInterceptor($name),
$this->pipelineProvider,
);
$worker->registerActivityFinalizer(fn () => $this->finalizer->finalize());

return $worker;
}

/**
* @param non-empty-string $name
*/
private function getWorkerOptions(string $name): ?WorkerOptions
{
$worker = $this->workers[$name] ?? null;

return match (true) {
$worker instanceof WorkerOptions => $worker,
isset($worker['options']) && $worker['options'] instanceof WorkerOptions => $worker['options'],
default => null
};
}

/**
* @param non-empty-string $name
*/
private function getExceptionInterceptor(string $name): ?ExceptionInterceptorInterface
{
$worker = $this->workers[$name] ?? null;
if (!\is_array($worker) || !isset($worker['exception_interceptor'])) {
return null;
}

$exceptionInterceptor = $this->wire($worker['exception_interceptor']);
\assert($exceptionInterceptor instanceof ExceptionInterceptorInterface);

return $exceptionInterceptor;
}

private function wire(mixed $alias): object
{
return match (true) {
\is_string($alias) => $this->factory->make($alias),
$alias instanceof Autowire => $alias->resolve($this->factory),
default => $alias
};
}
}
17 changes: 17 additions & 0 deletions src/WorkerFactoryInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge;

use Temporal\Worker\WorkerInterface;

interface WorkerFactoryInterface
{
/**
* Creates a new Temporal worker.
*
* @param non-empty-string $name
*/
public function create(string $name): WorkerInterface;
}
Loading
Loading