Skip to content

Commit

Permalink
Adds temporal:info command
Browse files Browse the repository at this point in the history
  • Loading branch information
butschster committed Dec 22, 2023
1 parent 66f8754 commit c8672f2
Show file tree
Hide file tree
Showing 11 changed files with 340 additions and 100 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ docs
vendor
node_modules
.php-cs-fixer.cache
tests/runtime
3 changes: 2 additions & 1 deletion src/Bootloader/TemporalBridgeBootloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class TemporalBridgeBootloader extends Bootloader
public function defineDependencies(): array
{
return [
ConsoleBootloader::class,
RoadRunnerBootloader::class,
ScaffolderBootloader::class,
];
Expand All @@ -64,10 +63,12 @@ public function init(
AbstractKernel $kernel,
EnvironmentInterface $env,
FactoryInterface $factory,
ConsoleBootloader $console,
): void {
$this->initConfig($env);

$kernel->addDispatcher($factory->make(Dispatcher::class));
$console->addCommand(Commands\InfoCommand::class);
}

public function addWorkerOptions(string $worker, WorkerOptions $options): void
Expand Down
88 changes: 88 additions & 0 deletions src/Commands/InfoCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge\Commands;

use Spiral\Boot\DirectoriesInterface;
use Spiral\Console\Attribute\AsCommand;
use Spiral\Console\Command;
use Spiral\TemporalBridge\DeclarationLocatorInterface;
use Spiral\TemporalBridge\DeclarationWorkerResolver;
use Symfony\Component\Console\Helper\TableSeparator;
use Temporal\Internal\Declaration\Reader\ActivityReader;
use Temporal\Internal\Declaration\Reader\WorkflowReader;
use Temporal\Workflow\WorkflowInterface;

#[AsCommand(
name: 'temporal:info',
description: 'Show information about registered temporal workflows and activities.',
)]
final class InfoCommand extends Command
{
public function perform(
DeclarationLocatorInterface $locator,
DeclarationWorkerResolver $workerResolver,
WorkflowReader $workflowReader,
ActivityReader $activityReader,
DirectoriesInterface $dir,
): int {
$workflows = [];
$activities = [];

foreach ($locator->getDeclarations() as $type => $declaration) {
$taskQueue = $workerResolver->resolve($declaration);

if ($type === WorkflowInterface::class) {
$prototype = $workflowReader->fromClass($declaration->getName());
$workflows[$prototype->getID()] = [
'class' => $declaration->getName(),
'file' => $declaration->getFileName(),
'name' => $prototype->getID(),
'task_queue' => $taskQueue,
];
} else {
foreach ($activityReader->fromClass($declaration->getName()) as $prototype) {
$activities[$declaration->getName()][$prototype->getID()] = [
'file' => $declaration->getFileName(),
'name' => $prototype->getID(),
'handler' => $declaration->getShortName() . '::' . $prototype->getHandler()->getName(),
'task_queue' => $taskQueue,
];
}
}
}

$rootDir = \realpath($dir->get('root')) . '/';

$this->output->title('Workflows');
$table = $this->table(['Name', 'Class', 'Task Queue']);
foreach ($workflows as $workflow) {
$table->addRow([
\sprintf('<fg=green>%s</>', $workflow['name']),
$workflow['class'] . "\n" . \sprintf('<fg=blue>%s</>', \str_replace($rootDir, '', $workflow['file'])),
$workflow['task_queue'],
]);
}
$table->render();


$this->output->title('Activities');
$table = $this->table(['Name', 'Class', 'Task Queue']);
foreach ($activities as $class => $prototypes) {
foreach ($prototypes as $prototype) {
$table->addRow([
$prototype['name'],
$prototype['handler'],
$prototype['task_queue'],
]);
}
if (\end($activities) !== $prototypes) {
$table->addRow(new TableSeparator());
}
}
$table->render();

return self::SUCCESS;
}
}
8 changes: 8 additions & 0 deletions src/DeclarationLocatorInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@

namespace Spiral\TemporalBridge;

use Temporal\Activity\ActivityInterface;
use Temporal\Workflow\WorkflowInterface;

interface DeclarationLocatorInterface
{
/**
* List of all declarations for workflows and activities.
*
* @return iterable<class-string<WorkflowInterface>|class-string<ActivityInterface>, \ReflectionClass>
*/
public function getDeclarations(): iterable;
}
46 changes: 46 additions & 0 deletions src/DeclarationWorkerResolver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

declare(strict_types=1);

namespace Spiral\TemporalBridge;

use Spiral\Attributes\ReaderInterface;
use Spiral\TemporalBridge\Attribute\AssignWorker;
use Spiral\TemporalBridge\Config\TemporalConfig;

final class DeclarationWorkerResolver
{
public function __construct(
private readonly ReaderInterface $reader,
private readonly TemporalConfig $config,
) {
}

/**
* Find the worker name for the given workflow or class declaration. If no worker is assigned, the default worker
* name is returned.
*/
public function resolve(\ReflectionClass $declaration): string
{
return $this->resolveQueueName($declaration) ?? $this->config->getDefaultWorker();
}

private function resolveQueueName(\ReflectionClass $declaration): ?string
{
$assignWorker = $this->reader->firstClassMetadata($declaration, AssignWorker::class);

if ($assignWorker !== null) {
return $assignWorker->name;
}

$parents = $declaration->getInterfaceNames();
foreach ($parents as $parent) {
$queueName = $this->resolveQueueName(new \ReflectionClass($parent));
if ($queueName !== null) {
return $queueName;
}
}

return null;
}
}
29 changes: 4 additions & 25 deletions src/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@
namespace Spiral\TemporalBridge;

use ReflectionClass;
use Spiral\Attributes\ReaderInterface;
use Spiral\Boot\DispatcherInterface;
use Spiral\Core\Container;
use Spiral\RoadRunnerBridge\RoadRunnerMode;
use Spiral\TemporalBridge\Attribute\AssignWorker;
use Spiral\TemporalBridge\Config\TemporalConfig;
use Temporal\Activity\ActivityInterface;
use Temporal\Worker\WorkerFactoryInterface;
use Temporal\Workflow\WorkflowInterface;
Expand All @@ -19,9 +16,8 @@ final class Dispatcher implements DispatcherInterface
{
public function __construct(
private readonly RoadRunnerMode $mode,
private readonly ReaderInterface $reader,
private readonly TemporalConfig $config,
private readonly Container $container,
private readonly DeclarationWorkerResolver $workerResolver,
) {
}

Expand All @@ -39,13 +35,15 @@ public function serve(): void
$declarations = $this->container->get(DeclarationLocatorInterface::class)->getDeclarations();

// factory initiates and runs task queue specific activity and workflow workers
/** @var WorkerFactoryInterface $factory */
$factory = $this->container->get(WorkerFactoryInterface::class);
/** @var WorkersRegistryInterface $registry */
$registry = $this->container->get(WorkersRegistryInterface::class);

$hasDeclarations = false;
foreach ($declarations as $type => $declaration) {
// Worker that listens on a task queue and hosts both workflow and activity implementations.
$queueName = $this->resolveQueueName($declaration) ?? $this->config->getDefaultWorker();
$queueName = $this->workerResolver->resolve($declaration);

$worker = $registry->get($queueName);

Expand All @@ -71,23 +69,4 @@ public function serve(): void
// start primary loop
$factory->run();
}

private function resolveQueueName(\ReflectionClass $declaration): ?string
{
$assignWorker = $this->reader->firstClassMetadata($declaration, AssignWorker::class);

if ($assignWorker !== null) {
return $assignWorker->name;
}

$parents = $declaration->getInterfaceNames();
foreach ($parents as $parent) {
$queueName = $this->resolveQueueName(new \ReflectionClass($parent));
if ($queueName !== null) {
return $queueName;
}
}

return null;
}
}
6 changes: 3 additions & 3 deletions src/WorkersRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ final class WorkersRegistry implements WorkersRegistryInterface
public function __construct(
private readonly WorkerFactoryInterface $workerFactory,
private readonly FinalizerInterface $finalizer,
private readonly TemporalConfig $config
private readonly TemporalConfig $config,
) {
}

Expand All @@ -30,7 +30,7 @@ public function register(string $name, ?WorkerOptions $options): void

if ($this->has($name)) {
throw new WorkersRegistryException(
\sprintf('Temporal worker with given name `%s` has already been registered.', $name)
\sprintf('Temporal worker with given name `%s` has already been registered.', $name),
);
}

Expand All @@ -44,7 +44,7 @@ public function get(string $name): WorkerInterface

$options = $this->config->getWorkers();

if (! $this->has($name)) {
if (!$this->has($name)) {
$this->register($name, $options[$name] ?? null);
}

Expand Down
7 changes: 4 additions & 3 deletions src/WorkersRegistryInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,19 @@
interface WorkersRegistryInterface
{
/**
* Register a new temporal worker with given task queue and options
* Register a new temporal worker with given task queue and options.
*
* @throws WorkersRegistryException
*/
public function register(string $name, ?WorkerOptions $options): void;

/**
* Get or create worker by task queue name
* Get or create worker by task queue name.
*/
public function get(string $name): WorkerInterface;

/**
* Check if a worker with given task queue name registered
* Check if a worker with given task queue name registered.
*/
public function has(string $name): bool;
}
Loading

0 comments on commit c8672f2

Please sign in to comment.