Skip to content

Commit

Permalink
Merge branch '3.x' into feature/info-command
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/Bootloader/TemporalBridgeBootloader.php
#	src/Dispatcher.php
#	src/WorkersRegistry.php
#	tests/src/DispatcherTest.php
  • Loading branch information
butschster committed Dec 27, 2023
2 parents ca3e557 + a55d7b2 commit 6cc12a9
Show file tree
Hide file tree
Showing 14 changed files with 631 additions and 85 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/psalm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ on:
name: static analysis

jobs:
psalm:
uses: spiral/gh-actions/.github/workflows/psalm.yml@master
with:
os: >-
['ubuntu-latest']
psalm:
uses: spiral/gh-actions/.github/workflows/psalm.yml@master
with:
os: >-
['ubuntu-latest']
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
"spiral/tokenizer": "^3.0",
"spiral/scaffolder": "^3.0",
"spiral/roadrunner-bridge": "^2.0 || ^3.0",
"temporal/sdk": "^1.3 || ^2.0"
"temporal/sdk": "^2.7"
},
"require-dev": {
"spiral/framework": "^3.0",
Expand Down
2 changes: 1 addition & 1 deletion src/Bootloader/PrototypeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use Spiral\Prototype\Bootloader\PrototypeBootloader as BasePrototypeBootloader;
use Temporal\Client\WorkflowClientInterface;

class PrototypeBootloader extends Bootloader
final class PrototypeBootloader extends Bootloader
{
public function defineDependencies(): array
{
Expand Down
117 changes: 93 additions & 24 deletions src/Bootloader/TemporalBridgeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,41 @@
use Spiral\Boot\AbstractKernel;
use Spiral\Boot\Bootloader\Bootloader;
use Spiral\Boot\EnvironmentInterface;
use Spiral\Boot\FinalizerInterface;
use Spiral\Config\ConfiguratorInterface;
use Spiral\Config\Patch\Append;
use Spiral\Console\Bootloader\ConsoleBootloader;
use Spiral\Core\Container\Autowire;
use Spiral\Core\FactoryInterface;
use Spiral\RoadRunnerBridge\Bootloader\RoadRunnerBootloader;
use Spiral\TemporalBridge\Commands;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Spiral\TemporalBridge\DeclarationLocator;
use Spiral\TemporalBridge\DeclarationLocatorInterface;
use Spiral\TemporalBridge\Dispatcher;
use Spiral\TemporalBridge\WorkerFactory;
use Spiral\TemporalBridge\WorkerFactoryInterface;
use Spiral\TemporalBridge\WorkersRegistry;
use Spiral\TemporalBridge\WorkersRegistryInterface;
use Spiral\Tokenizer\ClassesInterface;
use Temporal\Client\ClientOptions;
use Temporal\Client\GRPC\ServiceClient;
use Temporal\Client\WorkflowClient;
use Temporal\Client\WorkflowClientInterface;
use Temporal\DataConverter\DataConverter;
use Temporal\DataConverter\DataConverterInterface;
use Temporal\Interceptor\PipelineProvider;
use Temporal\Interceptor\SimplePipelineProvider;
use Temporal\Internal\Interceptor\Interceptor;
use Temporal\Worker\Transport\Goridge;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerFactoryInterface as TemporalWorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;
use Temporal\WorkerFactory;
use Temporal\WorkerFactory as TemporalWorkerFactory;
use Temporal\Client\ScheduleClient;
use Temporal\Client\ScheduleClientInterface;
use Temporal\Client\GRPC\ServiceClientInterface;

/**
* @psalm-import-type TInterceptor from TemporalConfig
*/
class TemporalBridgeBootloader extends Bootloader
{
public function defineDependencies(): array
Expand All @@ -46,57 +56,94 @@ public function defineDependencies(): array
public function defineSingletons(): array
{
return [
WorkerFactoryInterface::class => [self::class, 'initWorkerFactory'],
TemporalWorkerFactoryInterface::class => [self::class, 'initWorkerFactory'],
WorkerFactoryInterface::class => WorkerFactory::class,
DeclarationLocatorInterface::class => [self::class, 'initDeclarationLocator'],
WorkflowClientInterface::class => [self::class, 'initWorkflowClient'],
WorkersRegistryInterface::class => [self::class, 'initWorkersRegistry'],
WorkersRegistryInterface::class => WorkersRegistry::class,
ScheduleClientInterface::class => [self::class, 'initScheduleClient'],
DataConverterInterface::class => [self::class, 'initDataConverter'],
PipelineProvider::class => [self::class, 'initPipelineProvider'],
ServiceClientInterface::class => [self::class, 'initServiceClient'],
];
}

public function __construct(
private readonly ConfiguratorInterface $config,
private readonly FactoryInterface $factory,
) {
}

public function init(
AbstractKernel $kernel,
EnvironmentInterface $env,
FactoryInterface $factory,
ConsoleBootloader $console,
): void {
$this->initConfig($env);

$kernel->addDispatcher($factory->make(Dispatcher::class));
$console->addCommand(Commands\InfoCommand::class);
$kernel->addDispatcher($this->factory->make(Dispatcher::class));
}

public function addWorkerOptions(string $worker, WorkerOptions $options): void
{
$this->config->modify(TemporalConfig::CONFIG, new Append('workers', $worker, $options));
}

/**
* Register a new Temporal interceptor.
*
* @param TInterceptor $interceptor
*/
public function addInterceptor(string|Interceptor|Autowire $interceptor): void
{
if (\is_string($interceptor)) {
$interceptor = $this->factory->make($interceptor);
} elseif ($interceptor instanceof Autowire) {
$interceptor = $interceptor->resolve($this->factory);
}

if (!$interceptor instanceof Interceptor) {
throw new \InvalidArgumentException(
\sprintf(
'Interceptor must be an instance of `%s`, `%s` given.',
Interceptor::class,
\get_class($interceptor),
),
);
}

$this->config->modify(TemporalConfig::CONFIG, new Append('interceptors', null, $interceptor));
}

protected function initConfig(EnvironmentInterface $env): void
{
$this->config->setDefaults(
TemporalConfig::CONFIG,
[
'address' => $env->get('TEMPORAL_ADDRESS', '127.0.0.1:7233'),
'namespace' => 'App\\Endpoint\\Temporal\\Workflow',
'defaultWorker' => (string)$env->get('TEMPORAL_TASK_QUEUE', WorkerFactoryInterface::DEFAULT_TASK_QUEUE),
'defaultWorker' => (string)$env->get(
'TEMPORAL_TASK_QUEUE',
TemporalWorkerFactoryInterface::DEFAULT_TASK_QUEUE,
),
'workers' => [],
'clientOptions' => null,
],
);
}

protected function initWorkflowClient(
TemporalConfig $config,
DataConverterInterface $dataConverter,
PipelineProvider $pipelineProvider,
ServiceClientInterface $serviceClient,
): WorkflowClientInterface {
return WorkflowClient::create(
serviceClient: ServiceClient::create($config->getAddress()),
options: (new ClientOptions())->withNamespace($config->getTemporalNamespace()),
return new WorkflowClient(
serviceClient: $serviceClient,
options: $config->getClientOptions(),
converter: $dataConverter,
interceptorProvider: $pipelineProvider,
);
}

Expand All @@ -105,29 +152,51 @@ protected function initDataConverter(): DataConverterInterface
return DataConverter::createDefault();
}

protected function initWorkerFactory(
DataConverterInterface $dataConverter,
): WorkerFactoryInterface {
return new WorkerFactory(
protected function initWorkerFactory(DataConverterInterface $dataConverter,): TemporalWorkerFactoryInterface
{
return new TemporalWorkerFactory(
dataConverter: $dataConverter,
rpc: Goridge::create(),
);
}

protected function initDeclarationLocator(
ClassesInterface $classes,
): DeclarationLocatorInterface {
protected function initDeclarationLocator(ClassesInterface $classes,): DeclarationLocatorInterface
{
return new DeclarationLocator(
classes: $classes,
reader: new AttributeReader(),
);
}

protected function initWorkersRegistry(
WorkerFactoryInterface $workerFactory,
FinalizerInterface $finalizer,
protected function initPipelineProvider(TemporalConfig $config, FactoryInterface $factory): PipelineProvider
{
/** @var Interceptor[] $interceptors */
$interceptors = \array_map(
static fn(mixed $interceptor) => match (true) {
\is_string($interceptor) => $factory->make($interceptor),
$interceptor instanceof Autowire => $interceptor->resolve($factory),
default => $interceptor
},
$config->getInterceptors(),
);

return new SimplePipelineProvider($interceptors);
}

protected function initServiceClient(TemporalConfig $config): ServiceClientInterface
{
return ServiceClient::create($config->getAddress());
}

protected function initScheduleClient(
TemporalConfig $config,
): WorkersRegistryInterface {
return new WorkersRegistry($workerFactory, $finalizer, $config);
DataConverterInterface $dataConverter,
ServiceClientInterface $serviceClient,
): ScheduleClientInterface {
return new ScheduleClient(
serviceClient: $serviceClient,
options: $config->getClientOptions(),
converter: $dataConverter,
);
}
}
55 changes: 53 additions & 2 deletions src/Config/TemporalConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,32 @@

namespace Spiral\TemporalBridge\Config;

use Spiral\Core\Container\Autowire;
use Spiral\Core\InjectableConfig;
use Temporal\Client\ClientOptions;
use Temporal\Exception\ExceptionInterceptorInterface;
use Temporal\Internal\Interceptor\Interceptor;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Worker\WorkerOptions;

/**
* @psalm-type TInterceptor = Interceptor|class-string<Interceptor>|Autowire<Interceptor>
* @psalm-type TExceptionInterceptor = ExceptionInterceptorInterface|class-string<ExceptionInterceptorInterface>|Autowire<ExceptionInterceptorInterface>
* @psalm-type TWorker = array{
* options?: WorkerOptions,
* exception_interceptor?: TExceptionInterceptor
* }
*
* @property array{
* address: non-empty-string,
* namespace: non-empty-string,
* temporalNamespace: non-empty-string,
* defaultWorker: non-empty-string,
* workers: array<non-empty-string, WorkerOptions|TWorker>,
* interceptors?: TInterceptor[],
* clientOptions?: ClientOptions
* } $config
*/
final class TemporalConfig extends InjectableConfig
{
public const CONFIG = 'temporal';
Expand All @@ -18,31 +40,60 @@ final class TemporalConfig extends InjectableConfig
'temporalNamespace' => 'default',
'defaultWorker' => WorkerFactoryInterface::DEFAULT_TASK_QUEUE,
'workers' => [],
'interceptors' => [],
'clientOptions' => null,
];

/**
* @return non-empty-string
*/
public function getDefaultNamespace(): string
{
return $this->config['namespace'];
}

/**
* @return non-empty-string
*/
public function getTemporalNamespace(): string
{
return $this->config['temporalNamespace'];
}

/**
* @return non-empty-string
*/
public function getAddress(): string
{
return $this->config['address'];
}

/**
* @return non-empty-string
*/
public function getDefaultWorker(): string
{
return $this->config['defaultWorker'];
}

/** @psalm-return array<non-empty-string, WorkerOptions> */
/**
* @return array<non-empty-string, WorkerOptions|TWorker>
*/
public function getWorkers(): array
{
return (array) $this->config['workers'];
return $this->config['workers'] ?? [];
}

/**
* @return TInterceptor[]
*/
public function getInterceptors(): array
{
return $this->config['interceptors'] ?? [];
}

public function getClientOptions(): ClientOptions
{
return $this->config['clientOptions'] ?? (new ClientOptions())->withNamespace($this->getTemporalNamespace());
}
}
Loading

0 comments on commit 6cc12a9

Please sign in to comment.