Skip to content

Commit

Permalink
Adding the ability to define default options in the pipeline config (#65
Browse files Browse the repository at this point in the history
)
  • Loading branch information
msmakouz authored Feb 24, 2023
1 parent 78d3418 commit 2d025b3
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 14 deletions.
16 changes: 16 additions & 0 deletions src/Queue/OptionsFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@

use Spiral\Queue\ExtendedOptionsInterface;
use Spiral\Queue\OptionsInterface;
use Spiral\RoadRunner\Jobs\KafkaOptions;
use Spiral\RoadRunner\Jobs\Options;
use Spiral\RoadRunner\Jobs\OptionsInterface as JobsOptionsInterface;
use Spiral\RoadRunner\Jobs\Queue\CreateInfoInterface;
use Spiral\RoadRunner\Jobs\Queue\Driver;

/**
* @internal
Expand All @@ -34,4 +37,17 @@ public static function fromQueueOptions(OptionsInterface $from): JobsOptionsInte

return $options;
}

/**
* Creates specified options for the concrete driver, if needed.
*/
public static function fromCreateInfo(CreateInfoInterface $connector): ?JobsOptionsInterface
{
$config = $connector->toArray();

return match ($connector->getDriver()) {
Driver::KAFKA => new KafkaOptions($config['topic'] ?? 'default'),
default => null
};
}
}
27 changes: 19 additions & 8 deletions src/Queue/RPCPipelineRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
namespace Spiral\RoadRunnerBridge\Queue;

use Spiral\Queue\Exception\InvalidArgumentException;
use Spiral\RoadRunner\Jobs\Jobs;
use Spiral\RoadRunner\Jobs\JobsInterface;
use Spiral\RoadRunner\Jobs\OptionsInterface;
use Spiral\RoadRunner\Jobs\Queue\CreateInfoInterface;
use Spiral\RoadRunner\Jobs\QueueInterface;
use Spiral\RoadRunner\Jobs\Serializer\SerializerAwareInterface;
Expand All @@ -19,6 +21,7 @@ final class RPCPipelineRegistry implements PipelineRegistryInterface
private array $existPipelines = [];

/**
* @param Jobs|JobsInterface $jobs
* @param array<non-empty-string, array{connector: CreateInfoInterface, consume: bool}> $pipelines
* @param array<non-empty-string,non-empty-string> $aliases
* @param int $ttl Time to cache existing RoadRunner pipelines
Expand Down Expand Up @@ -61,12 +64,17 @@ public function getPipeline(string $name, string $jobType): QueueInterface
/** @var CreateInfoInterface $connector */
$connector = $this->pipelines[$name]['connector'];

if (! $this->isExists($connector)) {
/** @var ?OptionsInterface $options */
$options = OptionsFactory::create($this->pipelines[$name]['options'] ?? null)
?? OptionsFactory::fromCreateInfo($connector);
\assert($options instanceof OptionsInterface);

if (!$this->isExists($connector)) {
$consume = (bool)($this->pipelines[$name]['consume'] ?? true);
return $this->create($connector, $consume);
return $this->create($connector, $consume, $options);
}

return $this->connect($connector);
return $this->connect($connector, $options);
}

/**
Expand All @@ -87,10 +95,13 @@ private function isExists(CreateInfoInterface $connector): bool
/**
* Create a new RoadRunner jobs pipeline
*/
private function create(CreateInfoInterface $connector, bool $shouldBeConsumed = true): QueueInterface
{
private function create(
CreateInfoInterface $connector,
bool $shouldBeConsumed = true,
?OptionsInterface $options = null
): QueueInterface {
$this->expiresAt = 0;
$queue = $this->jobs->create($connector);
$queue = $this->jobs->create($connector, $options);
if ($shouldBeConsumed) {
$queue->resume();
}
Expand All @@ -101,8 +112,8 @@ private function create(CreateInfoInterface $connector, bool $shouldBeConsumed =
/**
* Connect to the RoadRunner jobs pipeline
*/
private function connect(CreateInfoInterface $connector): QueueInterface
private function connect(CreateInfoInterface $connector, ?OptionsInterface $options = null): QueueInterface
{
return $this->jobs->connect($connector->getName());
return $this->jobs->connect($connector->getName(), $options);
}
}
18 changes: 18 additions & 0 deletions tests/src/Queue/OptionsFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
namespace Spiral\Tests\Queue;

use Spiral\Queue\Options;
use Spiral\RoadRunner\Jobs\KafkaOptions;
use Spiral\RoadRunner\Jobs\Options as JobsOptions;
use Spiral\RoadRunner\Jobs\OptionsInterface as JobsOptionsInterface;
use PHPUnit\Framework\TestCase;
use Spiral\RoadRunner\Jobs\Queue\CreateInfoInterface;
use Spiral\RoadRunner\Jobs\Queue\KafkaCreateInfo;
use Spiral\RoadRunner\Jobs\Queue\MemoryCreateInfo;
use Spiral\RoadRunnerBridge\Queue\OptionsFactory;

final class OptionsFactoryTest extends TestCase
Expand All @@ -20,6 +24,14 @@ public function testCreate(?JobsOptionsInterface $expected, mixed $from): void
$this->assertEquals($expected, OptionsFactory::create($from));
}

/**
* @dataProvider fromCreateInfoDataProvider
*/
public function testFromCreateInfo(mixed $expected, CreateInfoInterface $createInfo): void
{
$this->assertEquals($expected, OptionsFactory::fromCreateInfo($createInfo));
}

public function createDataProvider(): \Traversable
{
yield [null, null];
Expand All @@ -34,4 +46,10 @@ public function createDataProvider(): \Traversable
(new JobsOptions())->withPriority(4)->withDelay(6)->withAutoAck(true),
];
}

public function fromCreateInfoDataProvider(): \Traversable
{
yield [null, new MemoryCreateInfo('')];
yield [new KafkaOptions('foo'), new KafkaCreateInfo('', 'foo')];
}
}
66 changes: 60 additions & 6 deletions tests/src/Queue/RPCPipelineRegistryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@

namespace Spiral\Tests\Queue;

use Hamcrest\Matchers;
use Mockery as m;
use Spiral\Queue\Exception\InvalidArgumentException;
use Spiral\Queue\Options;
use Spiral\RoadRunner\Jobs\Options as JobsOptions;
use Spiral\RoadRunner\Jobs\JobsInterface;
use Spiral\RoadRunner\Jobs\KafkaOptions;
use Spiral\RoadRunner\Jobs\Queue\CreateInfoInterface;
use Spiral\RoadRunner\Jobs\QueueInterface;
use Spiral\RoadRunnerBridge\Queue\JobsAdapterSerializer;
Expand All @@ -25,16 +29,24 @@ protected function setUp(): void
{
parent::setUp();

$this->memoryConnector = m::mock(CreateInfoInterface::class);
$this->memoryConnector->shouldReceive('toArray')->andReturn([]);
$this->memoryConnector->shouldReceive('getDriver')->andReturn('foo');

$this->localConnector = m::mock(CreateInfoInterface::class);
$this->localConnector->shouldReceive('toArray')->andReturn([]);
$this->localConnector->shouldReceive('getDriver')->andReturn('foo');

$this->registry = (new RPCPipelineRegistry(
$this->jobs = m::mock(JobsInterface::class),
$this->getContainer()->get(JobsAdapterSerializer::class),
[
'memory' => [
'connector' => $this->memoryConnector = m::mock(CreateInfoInterface::class),
'connector' => $this->memoryConnector,
'cunsume' => true,
],
'local' => [
'connector' => $this->localConnector = m::mock(CreateInfoInterface::class),
'connector' => $this->localConnector,
'consume' => false,
],
'without-connector' => [
Expand All @@ -44,6 +56,16 @@ protected function setUp(): void
'connector' => 'test',
'cunsume' => true,
],
'with-queue-options' => [
'connector' => $this->localConnector,
'cunsume' => true,
'options' => (new Options())->withDelay(5),
],
'with-jobs-options' => [
'connector' => $this->localConnector,
'cunsume' => true,
'options' => new KafkaOptions('foo', 100, 14),
],
],
[
'user-data' => 'memory',
Expand All @@ -60,7 +82,7 @@ public function testGetsExistsPipelineByNameShouldReturnQueue(): void
$this->jobs->shouldReceive('getIterator')->once()->andReturn(new \ArrayIterator(['local' => '']));
$this->jobs->shouldReceive('connect')
->once()
->with('local')
->with('local', null)
->andReturn($queue = m::mock(QueueInterface::class));

$this->assertInstanceOf(
Expand All @@ -69,14 +91,46 @@ public function testGetsExistsPipelineByNameShouldReturnQueue(): void
);
}

public function testDefaultQueueOptionsShouldBePassedAsJobsOptions(): void
{
$this->localConnector->shouldReceive('getName')->andReturn('with-queue-options');

$this->jobs->shouldReceive('getIterator')->once()->andReturn(new \ArrayIterator(['with-queue-options' => '']));
$this->jobs->shouldReceive('connect')
->once()
->with('with-queue-options', Matchers::equalTo(new JobsOptions(5)))
->andReturn(m::mock(QueueInterface::class));

$this->assertInstanceOf(
QueueInterface::class,
$this->registry->getPipeline('with-queue-options', 'some')
);
}

public function testDefaultJobsOptionsShouldBePassed(): void
{
$this->localConnector->shouldReceive('getName')->andReturn('with-jobs-options');

$this->jobs->shouldReceive('getIterator')->once()->andReturn(new \ArrayIterator(['with-jobs-options' => '']));
$this->jobs->shouldReceive('connect')
->once()
->with('with-jobs-options', Matchers::equalTo(new KafkaOptions('foo', 100, 14)))
->andReturn(m::mock(QueueInterface::class));

$this->assertInstanceOf(
QueueInterface::class,
$this->registry->getPipeline('with-jobs-options', 'some')
);
}

public function testGetsNonExistsPipelineByNameShouldCreateItAndReturnQueue(): void
{
$this->memoryConnector->shouldReceive('getName')->once()->andReturn('local');

$this->jobs->shouldReceive('getIterator')->once()->andReturn(new \ArrayIterator(['memory']));
$this->jobs->shouldReceive('create')
->once()
->with($this->memoryConnector)
->with($this->memoryConnector, null)
->andReturn($queue = m::mock(QueueInterface::class));

$queue->shouldReceive('resume')->once();
Expand All @@ -94,7 +148,7 @@ public function testGetsNonExistsPipelineByNameWithoutConsumingShouldCreateItAnd
$this->jobs->shouldReceive('getIterator')->once()->andReturn(new \ArrayIterator(['memory']));
$this->jobs->shouldReceive('create')
->once()
->with($this->localConnector)
->with($this->localConnector, null)
->andReturn($queue = m::mock(QueueInterface::class));

$this->assertInstanceOf(
Expand All @@ -110,7 +164,7 @@ public function testGetsExistsPipelineByAliasShouldReturnQueue(): void
$this->jobs->shouldReceive('getIterator')->once()->andReturn(new \ArrayIterator(['memory']));
$this->jobs->shouldReceive('create')
->once()
->with($this->memoryConnector)
->with($this->memoryConnector, null)
->andReturn($queue = m::mock(QueueInterface::class));

$queue->shouldReceive('resume')->once();
Expand Down

0 comments on commit 2d025b3

Please sign in to comment.