Skip to content

Commit

Permalink
Adds an ability to assign multiple task queue on a workflow and activity
Browse files Browse the repository at this point in the history
  • Loading branch information
butschster committed Dec 25, 2023
1 parent c8672f2 commit 3e6a9d0
Show file tree
Hide file tree
Showing 15 changed files with 193 additions and 62 deletions.
7 changes: 5 additions & 2 deletions src/Attribute/AssignWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
/**
* @psalm-suppress DeprecatedClass
*/
#[\Attribute(\Attribute::TARGET_CLASS), NamedArgumentConstructor]
#[\Attribute(\Attribute::TARGET_CLASS | \Attribute::IS_REPEATABLE), NamedArgumentConstructor]
final class AssignWorker
{
/**
* @param string $taskQueue Task queue name.
*/
public function __construct(
public readonly string $name,
public readonly string $taskQueue,
) {
}
}
15 changes: 13 additions & 2 deletions src/Commands/InfoCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Spiral\Boot\DirectoriesInterface;
use Spiral\Console\Attribute\AsCommand;
use Spiral\Console\Attribute\Option;
use Spiral\Console\Command;
use Spiral\TemporalBridge\DeclarationLocatorInterface;
use Spiral\TemporalBridge\DeclarationWorkerResolver;
Expand All @@ -20,6 +21,9 @@
)]
final class InfoCommand extends Command
{
#[Option(name: 'show-activities', shortcut: 'a', description: 'Show activities.')]
private bool $showActivities = false;

public function perform(
DeclarationLocatorInterface $locator,
DeclarationWorkerResolver $workerResolver,
Expand All @@ -39,16 +43,20 @@ public function perform(
'class' => $declaration->getName(),
'file' => $declaration->getFileName(),
'name' => $prototype->getID(),
'task_queue' => $taskQueue,
'task_queue' => \implode(', ', $taskQueue),
];
} else {
$taskQueueShown = false;

foreach ($activityReader->fromClass($declaration->getName()) as $prototype) {
$activities[$declaration->getName()][$prototype->getID()] = [
'file' => $declaration->getFileName(),
'name' => $prototype->getID(),
'handler' => $declaration->getShortName() . '::' . $prototype->getHandler()->getName(),
'task_queue' => $taskQueue,
'task_queue' => !$taskQueueShown ? \implode(', ', $taskQueue) : '',
];

$taskQueueShown = true;
}
}
}
Expand All @@ -66,6 +74,9 @@ public function perform(
}
$table->render();

if (!$this->showActivities) {
return self::SUCCESS;
}

$this->output->title('Activities');
$table = $this->table(['Name', 'Class', 'Task Queue']);
Expand Down
34 changes: 23 additions & 11 deletions src/DeclarationWorkerResolver.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,42 @@ public function __construct(
}

/**
* Find the worker name for the given workflow or class declaration. If no worker is assigned, the default worker
* name is returned.
* Find the worker name for the given workflow or class declaration. If no worker is assigned, the default task
* queue name is returned.
*/
public function resolve(\ReflectionClass $declaration): string
public function resolve(\ReflectionClass $declaration): array
{
return $this->resolveQueueName($declaration) ?? $this->config->getDefaultWorker();
$queue = $this->resolveTaskQueues($declaration);

if ($queue !== []) {
return $queue;
}

return [$this->config->getDefaultWorker()];
}

private function resolveQueueName(\ReflectionClass $declaration): ?string
private function resolveTaskQueues(\ReflectionClass $declaration): array
{
$assignWorker = $this->reader->firstClassMetadata($declaration, AssignWorker::class);
$assignWorker = $this->reader->getClassMetadata($declaration, AssignWorker::class);

$workers = [];

foreach ($assignWorker as $worker) {
$workers[] = $worker->taskQueue;
}

if ($assignWorker !== null) {
return $assignWorker->name;
if ($workers !== []) {
return $workers;
}

$parents = $declaration->getInterfaceNames();
foreach ($parents as $parent) {
$queueName = $this->resolveQueueName(new \ReflectionClass($parent));
if ($queueName !== null) {
$queueName = $this->resolveTaskQueues(new \ReflectionClass($parent));
if ($queueName !== []) {
return $queueName;
}
}

return null;
return [];
}
}
29 changes: 16 additions & 13 deletions src/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,26 @@ public function serve(): void
$hasDeclarations = false;
foreach ($declarations as $type => $declaration) {
// Worker that listens on a task queue and hosts both workflow and activity implementations.
$queueName = $this->workerResolver->resolve($declaration);
$taskQueues = $this->workerResolver->resolve($declaration);

$worker = $registry->get($queueName);
foreach ($taskQueues as $taskQueue) {
$worker = $registry->get($taskQueue);

if ($type === WorkflowInterface::class) {
// Workflows are stateful. So you need a type to create instances.
$worker->registerWorkflowTypes($declaration->getName());
}
if ($type === WorkflowInterface::class) {
// Workflows are stateful. So you need a type to create instances.
$worker->registerWorkflowTypes($declaration->getName());
}

if ($type === ActivityInterface::class) {
// Workflows are stateful. So you need a type to create instances.
$worker->registerActivity(
$declaration->getName(),
fn(ReflectionClass $class): object => $this->container->make($class->getName()),
);
}

if ($type === ActivityInterface::class) {
// Workflows are stateful. So you need a type to create instances.
$worker->registerActivity(
$declaration->getName(),
fn(ReflectionClass $class): object => $this->container->make($class->getName()),
);
$hasDeclarations = true;
}
$hasDeclarations = true;
}

if (!$hasDeclarations) {
Expand Down
2 changes: 1 addition & 1 deletion src/Scaffolder/Declaration/ActivityDeclaration.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public function declare(): void
public function assignWorker(string $worker): void
{
$this->namespace->addUse(AssignWorker::class);
$this->class->addAttribute(AssignWorker::class, ['name' => $worker]);
$this->class->addAttribute(AssignWorker::class, ['taskQueue' => $worker]);
}

public function addMethod(string $name, string $returnType): void
Expand Down
2 changes: 1 addition & 1 deletion src/Scaffolder/Declaration/WorkflowDeclaration.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public function declare(): void
public function assignWorker(string $worker): void
{
$this->namespace->addUse(AssignWorker::class);
$this->class->addAttribute(AssignWorker::class, ['name' => $worker]);
$this->class->addAttribute(AssignWorker::class, ['taskQueue' => $worker]);
}

public function addQueryMethod(string $name, string $returnType): void
Expand Down
2 changes: 1 addition & 1 deletion tests/app/src/SomeActivity.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use Spiral\TemporalBridge\Attribute\AssignWorker;

#[AssignWorker(name: 'worker1')]
#[AssignWorker(taskQueue: 'worker1')]
class SomeActivity
{
}
9 changes: 9 additions & 0 deletions tests/app/src/SomeActivityWithDefaultWorker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge\Tests\App;

class SomeActivityWithDefaultWorker
{
}
2 changes: 1 addition & 1 deletion tests/app/src/SomeWorkflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use Spiral\TemporalBridge\Attribute\AssignWorker;

#[AssignWorker(name: 'worker2')]
#[AssignWorker(taskQueue: 'worker2')]
class SomeWorkflow
{
}
13 changes: 13 additions & 0 deletions tests/app/src/SomeWorkflowWithMultipleWorkers.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge\Tests\App;

use Spiral\TemporalBridge\Attribute\AssignWorker;

#[AssignWorker(taskQueue: 'worker1')]
#[AssignWorker(taskQueue: 'worker2')]
class SomeWorkflowWithMultipleWorkers
{
}
73 changes: 54 additions & 19 deletions tests/src/Commands/InfoCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,61 @@ public function testInfo(): void
Workflows
=========
+-----------------+------------------------------------------------------+------------+
| Name | Class | Task Queue |
+-----------------+------------------------------------------------------+------------+
| fooWorkflow | Spiral\TemporalBridge\Tests\Commands\Workflow | worker2 |
| | src/Commands/InfoCommandTest.php | |
| AnotherWorkflow | Spiral\TemporalBridge\Tests\Commands\AnotherWorkflow | default |
| | src/Commands/InfoCommandTest.php | |
+-----------------+------------------------------------------------------+------------+
+-----------------+------------------------------------------------------+------------------+
| Name | Class | Task Queue |
+-----------------+------------------------------------------------------+------------------+
| fooWorkflow | Spiral\TemporalBridge\Tests\Commands\Workflow | worker2 |
| | src/Commands/InfoCommandTest.php | |
| AnotherWorkflow | Spiral\TemporalBridge\Tests\Commands\AnotherWorkflow | default, worker2 |
| | src/Commands/InfoCommandTest.php | |
+-----------------+------------------------------------------------------+------------------+

OUTPUT,
$result,
);
}

public function testInfoWithActivities(): void
{
$result = $this->runCommand('temporal:info', [
'--show-activities' => true,
]);

$this->assertSame(
<<<'OUTPUT'
Workflows
=========
+-----------------+------------------------------------------------------+------------------+
| Name | Class | Task Queue |
+-----------------+------------------------------------------------------+------------------+
| fooWorkflow | Spiral\TemporalBridge\Tests\Commands\Workflow | worker2 |
| | src/Commands/InfoCommandTest.php | |
| AnotherWorkflow | Spiral\TemporalBridge\Tests\Commands\AnotherWorkflow | default, worker2 |
| | src/Commands/InfoCommandTest.php | |
+-----------------+------------------------------------------------------+------------------+
Activities
==========
+----------------+-------------------------------------+------------+
| Name | Class | Task Queue |
+----------------+-------------------------------------+------------+
| fooActivity | ActivityInterfaceWithWorker::foo | worker1 |
| bar | ActivityInterfaceWithWorker::bar | worker1 |
+----------------+-------------------------------------+------------+
| fooActivitybaz | ActivityInterfaceWithoutWorker::baz | default |
+----------------+-------------------------------------+------------+
+------------------------+---------------------------------------------+------------+
| Name | Class | Task Queue |
+------------------------+---------------------------------------------+------------+
| fooActivity | ActivityInterfaceWithWorker::foo | worker1 |
| bar | ActivityInterfaceWithWorker::bar | |
+------------------------+---------------------------------------------+------------+
| fooActivity__construct | ActivityInterfaceWithoutWorker::__construct | default |
| fooActivitybaz | ActivityInterfaceWithoutWorker::baz | |
+------------------------+---------------------------------------------+------------+

OUTPUT,
$result,
);
}
}

#[AssignWorker(name: 'worker1')]
#[AssignWorker(taskQueue: 'worker1')]
#[ActivityInterface]
class ActivityInterfaceWithWorker
{
Expand All @@ -83,14 +110,21 @@ public function bar(): void
#[ActivityInterface('fooActivity')]
class ActivityInterfaceWithoutWorker
{
public function __construct()
{
}

#[ActivityMethod]
public function baz(): void
{
}

private function baf(): void
{
}
}

#[AssignWorker(name: 'worker2')]
#[AssignWorker(taskQueue: 'worker2')]
#[WorkflowInterface]
class Workflow
{
Expand All @@ -100,7 +134,8 @@ public function handle()
}
}

#[AssignWorker(name: 'default')]
#[AssignWorker(taskQueue: 'default')]
#[AssignWorker(taskQueue: 'worker2')]
#[WorkflowInterface]
class AnotherWorkflow
{
Expand Down
2 changes: 1 addition & 1 deletion tests/src/Commands/Scaffolder/ActivityCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public function testGenerateWithAssignedWorker(): void
use Temporal\Activity\ActivityMethod;
#[ActivityInterface]
#[AssignWorker(name: 'scanner_service')]
#[AssignWorker(taskQueue: 'scanner_service')]
class PaymentActivity
{
}
Expand Down
2 changes: 1 addition & 1 deletion tests/src/Commands/Scaffolder/WorkflowCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public function testGenerateWithWorker(): void
use Temporal\Workflow\WorkflowMethod;
#[WorkflowInterface]
#[AssignWorker(name: 'test')]
#[AssignWorker(taskQueue: 'test')]
class PaymentWorkflow
{
/**
Expand Down
Loading

0 comments on commit 3e6a9d0

Please sign in to comment.