Skip to content

Commit

Permalink
Merge branch '2.0' into feature/interceptors
Browse files Browse the repository at this point in the history
# Conflicts:
#	composer.json
#	src/Bootloader/TemporalBridgeBootloader.php
#	src/Workflow/Workflow.php
#	tests/src/Bootloader/TemporalBridgeBootloaderTest.php
  • Loading branch information
butschster committed Dec 25, 2023
2 parents 66f8754 + dd13567 commit e56900a
Show file tree
Hide file tree
Showing 13 changed files with 432 additions and 32 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/psalm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ on:
name: static analysis

jobs:
psalm:
uses: spiral/gh-actions/.github/workflows/psalm.yml@master
with:
os: >-
['ubuntu-latest']
psalm:
uses: spiral/gh-actions/.github/workflows/psalm.yml@master
with:
os: >-
['ubuntu-latest']
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ docs
vendor
node_modules
.php-cs-fixer.cache
tests/runtime
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
"spiral/tokenizer": "^3.0",
"spiral/scaffolder": "^3.0",
"spiral/roadrunner-bridge": "^2.0 || ^3.0",
"temporal/sdk": "^1.3 || ^2.0"
"temporal/sdk": "^2.7"
},
"require-dev": {
"spiral/framework": "^3.0",
Expand Down
2 changes: 1 addition & 1 deletion src/Bootloader/PrototypeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use Spiral\Prototype\Bootloader\PrototypeBootloader as BasePrototypeBootloader;
use Temporal\Client\WorkflowClientInterface;

class PrototypeBootloader extends Bootloader
final class PrototypeBootloader extends Bootloader
{
public function defineDependencies(): array
{
Expand Down
38 changes: 28 additions & 10 deletions src/Bootloader/TemporalBridgeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,18 @@
use Spiral\Config\ConfiguratorInterface;
use Spiral\Config\Patch\Append;
use Spiral\Console\Bootloader\ConsoleBootloader;
use Spiral\Core\Container\Autowire;
use Spiral\Core\FactoryInterface;
use Spiral\RoadRunnerBridge\Bootloader\RoadRunnerBootloader;
use Spiral\TemporalBridge\Commands;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Spiral\TemporalBridge\DeclarationLocator;
use Spiral\TemporalBridge\DeclarationLocatorInterface;
use Spiral\TemporalBridge\Dispatcher;
use Spiral\TemporalBridge\Preset\PresetRegistry;
use Spiral\TemporalBridge\Preset\PresetRegistryInterface;
use Spiral\TemporalBridge\WorkerFactory;
use Spiral\TemporalBridge\WorkerFactoryInterface;
use Spiral\TemporalBridge\WorkersRegistry;
use Spiral\TemporalBridge\WorkersRegistryInterface;
use Spiral\Tokenizer\ClassesInterface;
Expand All @@ -28,10 +33,13 @@
use Temporal\Client\WorkflowClientInterface;
use Temporal\DataConverter\DataConverter;
use Temporal\DataConverter\DataConverterInterface;
use Temporal\Interceptor\PipelineProvider;
use Temporal\Interceptor\SimplePipelineProvider;
use Temporal\Internal\Interceptor\Interceptor;
use Temporal\Worker\Transport\Goridge;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;
use Temporal\WorkerFactory;
use Temporal\WorkerFactory as TemporalWorkerFactory;

class TemporalBridgeBootloader extends Bootloader
{
Expand All @@ -50,8 +58,9 @@ public function defineSingletons(): array
WorkerFactoryInterface::class => [self::class, 'initWorkerFactory'],
DeclarationLocatorInterface::class => [self::class, 'initDeclarationLocator'],
WorkflowClientInterface::class => [self::class, 'initWorkflowClient'],
WorkersRegistryInterface::class => [self::class, 'initWorkersRegistry'],
WorkersRegistryInterface::class => WorkersRegistry::class,
DataConverterInterface::class => [self::class, 'initDataConverter'],
PipelineProvider::class => [self::class, 'initPipelineProvider'],
];
}

Expand Down Expand Up @@ -91,11 +100,13 @@ protected function initConfig(EnvironmentInterface $env): void
protected function initWorkflowClient(
TemporalConfig $config,
DataConverterInterface $dataConverter,
PipelineProvider $pipelineProvider,
): WorkflowClientInterface {
return WorkflowClient::create(
return new WorkflowClient(
serviceClient: ServiceClient::create($config->getAddress()),
options: (new ClientOptions())->withNamespace($config->getTemporalNamespace()),
converter: $dataConverter,
interceptorProvider: $pipelineProvider,
);
}

Expand All @@ -122,11 +133,18 @@ classes: $classes,
);
}

protected function initWorkersRegistry(
WorkerFactoryInterface $workerFactory,
FinalizerInterface $finalizer,
TemporalConfig $config,
): WorkersRegistryInterface {
return new WorkersRegistry($workerFactory, $finalizer, $config);
protected function initPipelineProvider(TemporalConfig $config, FactoryInterface $factory): PipelineProvider
{
/** @var Interceptor[] $interceptors */
$interceptors = \array_map(
static fn (mixed $interceptor) => match (true) {
\is_string($interceptor) => $factory->make($interceptor),
$interceptor instanceof Autowire => $interceptor->resolve($factory),
default => $interceptor
},
$config->getInterceptors(),
);

return new SimplePipelineProvider($interceptors);
}
}
47 changes: 45 additions & 2 deletions src/Config/TemporalConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,30 @@

namespace Spiral\TemporalBridge\Config;

use Spiral\Core\Container\Autowire;
use Spiral\Core\InjectableConfig;
use Temporal\Exception\ExceptionInterceptorInterface;
use Temporal\Internal\Interceptor\Interceptor;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;

/**
* @psalm-type TInterceptor = Interceptor|class-string<Interceptor>|Autowire<Interceptor>
* @psalm-type TExceptionInterceptor = ExceptionInterceptorInterface|class-string<ExceptionInterceptorInterface>|Autowire<ExceptionInterceptorInterface>
* @psalm-type TWorker = array{
* options?: WorkerOptions,
* exception_interceptor?: TExceptionInterceptor
* }
*
* @property array{
* address: non-empty-string,
* namespace: non-empty-string,
* temporalNamespace: non-empty-string,
* defaultWorker: non-empty-string,
* workers: array<non-empty-string, WorkerOptions|TWorker>,
* interceptors?: TInterceptor[]
* } $config
*/
final class TemporalConfig extends InjectableConfig
{
public const CONFIG = 'temporal';
Expand All @@ -18,31 +38,54 @@ final class TemporalConfig extends InjectableConfig
'temporalNamespace' => 'default',
'defaultWorker' => WorkerFactoryInterface::DEFAULT_TASK_QUEUE,
'workers' => [],
'interceptors' => [],
];

/**
* @return non-empty-string
*/
public function getDefaultNamespace(): string
{
return $this->config['namespace'];
}

/**
* @return non-empty-string
*/
public function getTemporalNamespace(): string
{
return $this->config['temporalNamespace'];
}

/**
* @return non-empty-string
*/
public function getAddress(): string
{
return $this->config['address'];
}

/**
* @return non-empty-string
*/
public function getDefaultWorker(): string
{
return $this->config['defaultWorker'];
}

/** @psalm-return array<non-empty-string, WorkerOptions> */
/**
* @return array<non-empty-string, WorkerOptions|TWorker>
*/
public function getWorkers(): array
{
return (array) $this->config['workers'];
return $this->config['workers'] ?? [];
}

/**
* @return TInterceptor[]
*/
public function getInterceptors(): array
{
return $this->config['interceptors'] ?? [];
}
}
90 changes: 90 additions & 0 deletions src/WorkerFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge;

use Spiral\Boot\FinalizerInterface;
use Spiral\Core\Container\Autowire;
use Spiral\Core\FactoryInterface;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Temporal\Exception\ExceptionInterceptorInterface;
use Temporal\Interceptor\PipelineProvider;
use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactory;
use Temporal\Worker\WorkerInterface;
use Temporal\Worker\WorkerOptions;

/**
* @psalm-import-type TWorker from TemporalConfig
*/
final class WorkerFactory implements WorkerFactoryInterface
{
/** @var array<non-empty-string, WorkerOptions|TWorker> */
private array $workers = [];

public function __construct(
private readonly TemporalWorkerFactory $workerFactory,
private readonly FinalizerInterface $finalizer,
private readonly FactoryInterface $factory,
private readonly PipelineProvider $pipelineProvider,
private readonly TemporalConfig $config,
) {
$this->workers = $this->config->getWorkers();
}

/**
* @param non-empty-string $name
*/
public function create(string $name): WorkerInterface
{
/** @psalm-suppress TooManyArguments */
$worker = $this->workerFactory->newWorker(
$name,
$this->getWorkerOptions($name),
$this->getExceptionInterceptor($name),
$this->pipelineProvider,
);
$worker->registerActivityFinalizer(fn () => $this->finalizer->finalize());

return $worker;
}

/**
* @param non-empty-string $name
*/
private function getWorkerOptions(string $name): ?WorkerOptions
{
$worker = $this->workers[$name] ?? null;

return match (true) {
$worker instanceof WorkerOptions => $worker,
isset($worker['options']) && $worker['options'] instanceof WorkerOptions => $worker['options'],
default => null
};
}

/**
* @param non-empty-string $name
*/
private function getExceptionInterceptor(string $name): ?ExceptionInterceptorInterface
{
$worker = $this->workers[$name] ?? null;
if (!\is_array($worker) || !isset($worker['exception_interceptor'])) {
return null;
}

$exceptionInterceptor = $this->wire($worker['exception_interceptor']);
\assert($exceptionInterceptor instanceof ExceptionInterceptorInterface);

return $exceptionInterceptor;
}

private function wire(mixed $alias): object
{
return match (true) {
\is_string($alias) => $this->factory->make($alias),
$alias instanceof Autowire => $alias->resolve($this->factory),
default => $alias
};
}
}
17 changes: 17 additions & 0 deletions src/WorkerFactoryInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge;

use Temporal\Worker\WorkerInterface;

interface WorkerFactoryInterface
{
/**
* Creates a new Temporal worker.
*
* @param non-empty-string $name
*/
public function create(string $name): WorkerInterface;
}
16 changes: 10 additions & 6 deletions src/WorkersRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use Spiral\Boot\FinalizerInterface;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Spiral\TemporalBridge\Exception\WorkersRegistryException;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactory;
use Temporal\Worker\WorkerInterface;
use Temporal\Worker\WorkerOptions;

Expand All @@ -18,7 +18,7 @@ final class WorkersRegistry implements WorkersRegistryInterface

/** @psalm-param array<non-empty-string, WorkerOptions> $options */
public function __construct(
private readonly WorkerFactoryInterface $workerFactory,
private readonly WorkerFactoryInterface|TemporalWorkerFactory $workerFactory,
private readonly FinalizerInterface $finalizer,
private readonly TemporalConfig $config
) {
Expand All @@ -34,18 +34,22 @@ public function register(string $name, ?WorkerOptions $options): void
);
}

$this->workers[$name] = $this->workerFactory->newWorker($name, $options);
$this->workers[$name]->registerActivityFinalizer(fn() => $this->finalizer->finalize());
if ($this->workerFactory instanceof WorkerFactoryInterface) {
$this->workers[$name] = $this->workerFactory->create($name);
} else {
$this->workers[$name] = $this->workerFactory->newWorker($name, $options);
$this->workers[$name]->registerActivityFinalizer(fn() => $this->finalizer->finalize());
}
}

public function get(string $name): WorkerInterface
{
\assert($name !== '');

$options = $this->config->getWorkers();
$options = $this->config->getWorkers()[$name] ?? null;

if (! $this->has($name)) {
$this->register($name, $options[$name] ?? null);
$this->register($name, $options instanceof WorkerOptions ? $options : null);
}

return $this->workers[$name];
Expand Down
15 changes: 15 additions & 0 deletions tests/app/src/SomeInterceptor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge\Tests\App;

use Temporal\Interceptor\ActivityInbound\ActivityInput;
use Temporal\Interceptor\ActivityInboundInterceptor;

final class SomeInterceptor implements ActivityInboundInterceptor
{
public function handleActivityInbound(ActivityInput $input, callable $next): mixed
{
}
}
Loading

0 comments on commit e56900a

Please sign in to comment.