From 3e6a9d0d670e3aa7f97d1f4cbf2f38c51f479e74 Mon Sep 17 00:00:00 2001 From: butschster Date: Mon, 25 Dec 2023 10:45:26 +0400 Subject: [PATCH] Adds an ability to assign multiple task queue on a workflow and activity --- src/Attribute/AssignWorker.php | 7 +- src/Commands/InfoCommand.php | 15 +++- src/DeclarationWorkerResolver.php | 34 ++++++--- src/Dispatcher.php | 29 ++++---- .../Declaration/ActivityDeclaration.php | 2 +- .../Declaration/WorkflowDeclaration.php | 2 +- tests/app/src/SomeActivity.php | 2 +- .../app/src/SomeActivityWithDefaultWorker.php | 9 +++ tests/app/src/SomeWorkflow.php | 2 +- .../src/SomeWorkflowWithMultipleWorkers.php | 13 ++++ tests/src/Commands/InfoCommandTest.php | 73 ++++++++++++++----- .../Scaffolder/ActivityCommandTest.php | 2 +- .../Scaffolder/WorkflowCommandTest.php | 2 +- tests/src/DeclarationWorkerResolverTest.php | 23 +++++- tests/src/DispatcherTest.php | 40 ++++++++-- 15 files changed, 193 insertions(+), 62 deletions(-) create mode 100644 tests/app/src/SomeActivityWithDefaultWorker.php create mode 100644 tests/app/src/SomeWorkflowWithMultipleWorkers.php diff --git a/src/Attribute/AssignWorker.php b/src/Attribute/AssignWorker.php index b7f8028..ee38ccc 100644 --- a/src/Attribute/AssignWorker.php +++ b/src/Attribute/AssignWorker.php @@ -9,11 +9,14 @@ /** * @psalm-suppress DeprecatedClass */ -#[\Attribute(\Attribute::TARGET_CLASS), NamedArgumentConstructor] +#[\Attribute(\Attribute::TARGET_CLASS | \Attribute::IS_REPEATABLE), NamedArgumentConstructor] final class AssignWorker { + /** + * @param string $taskQueue Task queue name. + */ public function __construct( - public readonly string $name, + public readonly string $taskQueue, ) { } } diff --git a/src/Commands/InfoCommand.php b/src/Commands/InfoCommand.php index e0174cc..a4eb4e5 100644 --- a/src/Commands/InfoCommand.php +++ b/src/Commands/InfoCommand.php @@ -6,6 +6,7 @@ use Spiral\Boot\DirectoriesInterface; use Spiral\Console\Attribute\AsCommand; +use Spiral\Console\Attribute\Option; use Spiral\Console\Command; use Spiral\TemporalBridge\DeclarationLocatorInterface; use Spiral\TemporalBridge\DeclarationWorkerResolver; @@ -20,6 +21,9 @@ )] final class InfoCommand extends Command { + #[Option(name: 'show-activities', shortcut: 'a', description: 'Show activities.')] + private bool $showActivities = false; + public function perform( DeclarationLocatorInterface $locator, DeclarationWorkerResolver $workerResolver, @@ -39,16 +43,20 @@ public function perform( 'class' => $declaration->getName(), 'file' => $declaration->getFileName(), 'name' => $prototype->getID(), - 'task_queue' => $taskQueue, + 'task_queue' => \implode(', ', $taskQueue), ]; } else { + $taskQueueShown = false; + foreach ($activityReader->fromClass($declaration->getName()) as $prototype) { $activities[$declaration->getName()][$prototype->getID()] = [ 'file' => $declaration->getFileName(), 'name' => $prototype->getID(), 'handler' => $declaration->getShortName() . '::' . $prototype->getHandler()->getName(), - 'task_queue' => $taskQueue, + 'task_queue' => !$taskQueueShown ? \implode(', ', $taskQueue) : '', ]; + + $taskQueueShown = true; } } } @@ -66,6 +74,9 @@ public function perform( } $table->render(); + if (!$this->showActivities) { + return self::SUCCESS; + } $this->output->title('Activities'); $table = $this->table(['Name', 'Class', 'Task Queue']); diff --git a/src/DeclarationWorkerResolver.php b/src/DeclarationWorkerResolver.php index e680c1f..f0cf6ad 100644 --- a/src/DeclarationWorkerResolver.php +++ b/src/DeclarationWorkerResolver.php @@ -17,30 +17,42 @@ public function __construct( } /** - * Find the worker name for the given workflow or class declaration. If no worker is assigned, the default worker - * name is returned. + * Find the worker name for the given workflow or class declaration. If no worker is assigned, the default task + * queue name is returned. */ - public function resolve(\ReflectionClass $declaration): string + public function resolve(\ReflectionClass $declaration): array { - return $this->resolveQueueName($declaration) ?? $this->config->getDefaultWorker(); + $queue = $this->resolveTaskQueues($declaration); + + if ($queue !== []) { + return $queue; + } + + return [$this->config->getDefaultWorker()]; } - private function resolveQueueName(\ReflectionClass $declaration): ?string + private function resolveTaskQueues(\ReflectionClass $declaration): array { - $assignWorker = $this->reader->firstClassMetadata($declaration, AssignWorker::class); + $assignWorker = $this->reader->getClassMetadata($declaration, AssignWorker::class); + + $workers = []; + + foreach ($assignWorker as $worker) { + $workers[] = $worker->taskQueue; + } - if ($assignWorker !== null) { - return $assignWorker->name; + if ($workers !== []) { + return $workers; } $parents = $declaration->getInterfaceNames(); foreach ($parents as $parent) { - $queueName = $this->resolveQueueName(new \ReflectionClass($parent)); - if ($queueName !== null) { + $queueName = $this->resolveTaskQueues(new \ReflectionClass($parent)); + if ($queueName !== []) { return $queueName; } } - return null; + return []; } } diff --git a/src/Dispatcher.php b/src/Dispatcher.php index b7b55e1..bc643b4 100644 --- a/src/Dispatcher.php +++ b/src/Dispatcher.php @@ -43,23 +43,26 @@ public function serve(): void $hasDeclarations = false; foreach ($declarations as $type => $declaration) { // Worker that listens on a task queue and hosts both workflow and activity implementations. - $queueName = $this->workerResolver->resolve($declaration); + $taskQueues = $this->workerResolver->resolve($declaration); - $worker = $registry->get($queueName); + foreach ($taskQueues as $taskQueue) { + $worker = $registry->get($taskQueue); - if ($type === WorkflowInterface::class) { - // Workflows are stateful. So you need a type to create instances. - $worker->registerWorkflowTypes($declaration->getName()); - } + if ($type === WorkflowInterface::class) { + // Workflows are stateful. So you need a type to create instances. + $worker->registerWorkflowTypes($declaration->getName()); + } + + if ($type === ActivityInterface::class) { + // Workflows are stateful. So you need a type to create instances. + $worker->registerActivity( + $declaration->getName(), + fn(ReflectionClass $class): object => $this->container->make($class->getName()), + ); + } - if ($type === ActivityInterface::class) { - // Workflows are stateful. So you need a type to create instances. - $worker->registerActivity( - $declaration->getName(), - fn(ReflectionClass $class): object => $this->container->make($class->getName()), - ); + $hasDeclarations = true; } - $hasDeclarations = true; } if (!$hasDeclarations) { diff --git a/src/Scaffolder/Declaration/ActivityDeclaration.php b/src/Scaffolder/Declaration/ActivityDeclaration.php index 5fb09bc..8dd65bc 100644 --- a/src/Scaffolder/Declaration/ActivityDeclaration.php +++ b/src/Scaffolder/Declaration/ActivityDeclaration.php @@ -42,7 +42,7 @@ public function declare(): void public function assignWorker(string $worker): void { $this->namespace->addUse(AssignWorker::class); - $this->class->addAttribute(AssignWorker::class, ['name' => $worker]); + $this->class->addAttribute(AssignWorker::class, ['taskQueue' => $worker]); } public function addMethod(string $name, string $returnType): void diff --git a/src/Scaffolder/Declaration/WorkflowDeclaration.php b/src/Scaffolder/Declaration/WorkflowDeclaration.php index 93b7784..f572aa9 100644 --- a/src/Scaffolder/Declaration/WorkflowDeclaration.php +++ b/src/Scaffolder/Declaration/WorkflowDeclaration.php @@ -49,7 +49,7 @@ public function declare(): void public function assignWorker(string $worker): void { $this->namespace->addUse(AssignWorker::class); - $this->class->addAttribute(AssignWorker::class, ['name' => $worker]); + $this->class->addAttribute(AssignWorker::class, ['taskQueue' => $worker]); } public function addQueryMethod(string $name, string $returnType): void diff --git a/tests/app/src/SomeActivity.php b/tests/app/src/SomeActivity.php index d17fbba..c68a049 100644 --- a/tests/app/src/SomeActivity.php +++ b/tests/app/src/SomeActivity.php @@ -6,7 +6,7 @@ use Spiral\TemporalBridge\Attribute\AssignWorker; -#[AssignWorker(name: 'worker1')] +#[AssignWorker(taskQueue: 'worker1')] class SomeActivity { } diff --git a/tests/app/src/SomeActivityWithDefaultWorker.php b/tests/app/src/SomeActivityWithDefaultWorker.php new file mode 100644 index 0000000..dcb6f01 --- /dev/null +++ b/tests/app/src/SomeActivityWithDefaultWorker.php @@ -0,0 +1,9 @@ +runCommand('temporal:info', [ + '--show-activities' => true, + ]); + + $this->assertSame( + <<<'OUTPUT' + +Workflows +========= + ++-----------------+------------------------------------------------------+------------------+ +| Name | Class | Task Queue | ++-----------------+------------------------------------------------------+------------------+ +| fooWorkflow | Spiral\TemporalBridge\Tests\Commands\Workflow | worker2 | +| | src/Commands/InfoCommandTest.php | | +| AnotherWorkflow | Spiral\TemporalBridge\Tests\Commands\AnotherWorkflow | default, worker2 | +| | src/Commands/InfoCommandTest.php | | ++-----------------+------------------------------------------------------+------------------+ Activities ========== -+----------------+-------------------------------------+------------+ -| Name | Class | Task Queue | -+----------------+-------------------------------------+------------+ -| fooActivity | ActivityInterfaceWithWorker::foo | worker1 | -| bar | ActivityInterfaceWithWorker::bar | worker1 | -+----------------+-------------------------------------+------------+ -| fooActivitybaz | ActivityInterfaceWithoutWorker::baz | default | -+----------------+-------------------------------------+------------+ ++------------------------+---------------------------------------------+------------+ +| Name | Class | Task Queue | ++------------------------+---------------------------------------------+------------+ +| fooActivity | ActivityInterfaceWithWorker::foo | worker1 | +| bar | ActivityInterfaceWithWorker::bar | | ++------------------------+---------------------------------------------+------------+ +| fooActivity__construct | ActivityInterfaceWithoutWorker::__construct | default | +| fooActivitybaz | ActivityInterfaceWithoutWorker::baz | | ++------------------------+---------------------------------------------+------------+ OUTPUT, $result, @@ -64,7 +91,7 @@ public function testInfo(): void } } -#[AssignWorker(name: 'worker1')] +#[AssignWorker(taskQueue: 'worker1')] #[ActivityInterface] class ActivityInterfaceWithWorker { @@ -83,14 +110,21 @@ public function bar(): void #[ActivityInterface('fooActivity')] class ActivityInterfaceWithoutWorker { + public function __construct() + { + } #[ActivityMethod] public function baz(): void { } + + private function baf(): void + { + } } -#[AssignWorker(name: 'worker2')] +#[AssignWorker(taskQueue: 'worker2')] #[WorkflowInterface] class Workflow { @@ -100,7 +134,8 @@ public function handle() } } -#[AssignWorker(name: 'default')] +#[AssignWorker(taskQueue: 'default')] +#[AssignWorker(taskQueue: 'worker2')] #[WorkflowInterface] class AnotherWorkflow { diff --git a/tests/src/Commands/Scaffolder/ActivityCommandTest.php b/tests/src/Commands/Scaffolder/ActivityCommandTest.php index 280b91b..143406d 100644 --- a/tests/src/Commands/Scaffolder/ActivityCommandTest.php +++ b/tests/src/Commands/Scaffolder/ActivityCommandTest.php @@ -70,7 +70,7 @@ public function testGenerateWithAssignedWorker(): void use Temporal\Activity\ActivityMethod; #[ActivityInterface] -#[AssignWorker(name: 'scanner_service')] +#[AssignWorker(taskQueue: 'scanner_service')] class PaymentActivity { } diff --git a/tests/src/Commands/Scaffolder/WorkflowCommandTest.php b/tests/src/Commands/Scaffolder/WorkflowCommandTest.php index 1a86f08..f1aa4e4 100644 --- a/tests/src/Commands/Scaffolder/WorkflowCommandTest.php +++ b/tests/src/Commands/Scaffolder/WorkflowCommandTest.php @@ -117,7 +117,7 @@ public function testGenerateWithWorker(): void use Temporal\Workflow\WorkflowMethod; #[WorkflowInterface] -#[AssignWorker(name: 'test')] +#[AssignWorker(taskQueue: 'test')] class PaymentWorkflow { /** diff --git a/tests/src/DeclarationWorkerResolverTest.php b/tests/src/DeclarationWorkerResolverTest.php index 38ffd1e..73be3de 100644 --- a/tests/src/DeclarationWorkerResolverTest.php +++ b/tests/src/DeclarationWorkerResolverTest.php @@ -29,7 +29,16 @@ public function testResolvingQueueNameWithAttributeOnClass(): void new \ReflectionClass(ActivityInterfaceWithAttribute::class), ); - $this->assertSame('worker1', $queue); + $this->assertSame(['worker1'], $queue); + } + + public function testResolvingQueueNameWithMultipleAttributeOnClass(): void + { + $queue = $this->resolver->resolve( + new \ReflectionClass(ActivityInterfaceWithMultipleAttributes::class), + ); + + $this->assertSame(['worker1', 'worker2'], $queue); } public function testResolvingQueueNameWithAttributeOnParentClass(): void @@ -38,7 +47,7 @@ public function testResolvingQueueNameWithAttributeOnParentClass(): void new \ReflectionClass(ActivityClass::class), ); - $this->assertSame('worker1', $queue); + $this->assertSame(['worker1'], $queue); } public function testResolvingQueueNameWithoutAttribute(): void @@ -47,15 +56,21 @@ public function testResolvingQueueNameWithoutAttribute(): void new \ReflectionClass(ActivityInterfaceWithoutAttribute::class), ); - $this->assertSame('foo', $queue); + $this->assertSame(['foo'], $queue); } } -#[AssignWorker(name: 'worker1')] +#[AssignWorker(taskQueue: 'worker1')] interface ActivityInterfaceWithAttribute { } +#[AssignWorker(taskQueue: 'worker1')] +#[AssignWorker(taskQueue: 'worker2')] +interface ActivityInterfaceWithMultipleAttributes +{ +} + interface ActivityInterfaceWithoutAttribute { diff --git a/tests/src/DispatcherTest.php b/tests/src/DispatcherTest.php index ccfdfab..1addc89 100644 --- a/tests/src/DispatcherTest.php +++ b/tests/src/DispatcherTest.php @@ -4,14 +4,19 @@ namespace Spiral\TemporalBridge\Tests; +use Mockery as m; use Spiral\Attributes\AttributeReader; use Spiral\RoadRunnerBridge\RoadRunnerMode; use Spiral\TemporalBridge\Config\TemporalConfig; use Spiral\TemporalBridge\DeclarationLocatorInterface; use Spiral\TemporalBridge\DeclarationWorkerResolver; use Spiral\TemporalBridge\Dispatcher; +use Spiral\TemporalBridge\Tests\App\SomeActivity; +use Spiral\TemporalBridge\Tests\App\SomeActivityWithDefaultWorker; use Spiral\TemporalBridge\Tests\App\SomeWorkflow; +use Spiral\TemporalBridge\Tests\App\SomeWorkflowWithMultipleWorkers; use Spiral\TemporalBridge\WorkersRegistryInterface; +use Temporal\Activity\ActivityInterface; use Temporal\Worker\WorkerFactoryInterface; use Temporal\Worker\WorkerInterface; use Temporal\Workflow\WorkflowInterface; @@ -30,7 +35,7 @@ protected function setUp(): void new DeclarationWorkerResolver( new AttributeReader(), new TemporalConfig(['defaultWorker' => 'foo']), - ) + ), ); } @@ -55,16 +60,41 @@ public function testServeWithoutDeclarations(): void public function testServeWithDeclarations(): void { $locator = $this->mockContainer(DeclarationLocatorInterface::class); - $locator->shouldReceive('getDeclarations')->once()->andReturn([ - WorkflowInterface::class => new \ReflectionClass(SomeWorkflow::class), - ]); + $locator->shouldReceive('getDeclarations')->once()->andReturnUsing(function () { + yield WorkflowInterface::class => new \ReflectionClass(SomeWorkflow::class); + yield WorkflowInterface::class => new \ReflectionClass(SomeWorkflowWithMultipleWorkers::class); + yield ActivityInterface::class => new \ReflectionClass(SomeActivity::class); + yield ActivityInterface::class => new \ReflectionClass(SomeActivityWithDefaultWorker::class); + }); $registry = $this->mockContainer(WorkersRegistryInterface::class); + $registry ->shouldReceive('get') ->once() + ->with('foo') + ->andReturn($worker = m::mock(WorkerInterface::class)); + + $worker->shouldReceive('registerActivity')->once()->withSomeOfArgs(SomeActivityWithDefaultWorker::class); + + $registry + ->shouldReceive('get') + ->twice() ->with('worker2') - ->andReturn($this->createMock(WorkerInterface::class)); + ->andReturn($worker = m::mock(WorkerInterface::class)); + + $worker->shouldReceive('registerWorkflowTypes')->once()->with(SomeWorkflow::class); + $worker->shouldReceive('registerWorkflowTypes')->once()->with(SomeWorkflowWithMultipleWorkers::class); + + $registry + ->shouldReceive('get') + ->twice() + ->with('worker1') + ->andReturn($worker = m::mock(WorkerInterface::class)); + + $worker->shouldReceive('registerWorkflowTypes')->once()->with(SomeWorkflowWithMultipleWorkers::class); + $worker->shouldReceive('registerActivity')->once()->withSomeOfArgs(SomeActivity::class); + $factory = $this->mockContainer(WorkerFactoryInterface::class); $factory->shouldReceive('run')->once();