Skip to content

Commit

Permalink
Merge pull request #18 from spiral/feature/dynamic-pipelines
Browse files Browse the repository at this point in the history
Adds an ability to connect to a pipeline without configuration.
  • Loading branch information
butschster authored Jul 21, 2022
2 parents 2fce041 + 798d6cd commit fc1371c
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 17 deletions.
5 changes: 4 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@
}
},
"config": {
"sort-packages": true
"sort-packages": true,
"allow-plugins": {
"spiral/composer-publish-plugin": true
}
},
"minimum-stability": "dev",
"prefer-stable": true
Expand Down
12 changes: 4 additions & 8 deletions src/Queue/RPCPipelineRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ public function getPipeline(string $name): QueueInterface
}

if (! isset($this->pipelines[$name])) {
throw new InvalidArgumentException(
sprintf('Queue pipeline with given name `%s` is not found.', $name)
);
return $this->jobs->connect($name);
}

if (! isset($this->pipelines[$name]['connector'])) {
Expand All @@ -63,15 +61,13 @@ public function getPipeline(string $name): QueueInterface

/** @var CreateInfoInterface $connector */
$connector = $this->pipelines[$name]['connector'];
$consume = (bool)($this->pipelines[$name]['consume'] ?? true);

if (! $this->isExists($connector)) {
$queue = $this->create($connector, $consume);
} else {
$queue = $this->connect($connector);
$consume = (bool)($this->pipelines[$name]['consume'] ?? true);
return $this->create($connector, $consume);
}

return $queue;
return $this->connect($connector);
}

/**
Expand Down
20 changes: 12 additions & 8 deletions tests/src/Queue/RPCPipelineRegistryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -119,20 +119,24 @@ public function testGetsExistsPipelineByAliasShouldReturnQueue(): void
);
}

public function testGetsNonExistsPipelineShouldThrowAnException(): void
public function testGetsNonExistsPipelineShouldReturnQueue(): void
{
$this->expectException(InvalidArgumentException::class);
$this->expectErrorMessage('Queue pipeline with given name `test` is not found.');
$this->jobs->shouldReceive('connect')
->once()
->with('test')
->andReturn($queue = m::mock(QueueInterface::class));

$this->registry->getPipeline('test');
$this->assertSame($queue, $this->registry->getPipeline('test'));
}

public function testGetsNonExistsAliasPipelineShouldThrowAnException(): void
public function testGetsNonExistsAliasPipelineShouldReturnQueue(): void
{
$this->expectException(InvalidArgumentException::class);
$this->expectErrorMessage('Queue pipeline with given name `test` is not found.');
$this->jobs->shouldReceive('connect')
->once()
->with('test')
->andReturn($queue = m::mock(QueueInterface::class));

$this->registry->getPipeline('bad-alias');
$this->assertSame($queue, $this->registry->getPipeline('bad-alias'));
}

public function testGetsPipelineWithoutConnectorShouldThrowAnException(): void
Expand Down

0 comments on commit fc1371c

Please sign in to comment.