Skip to content
This repository has been archived by the owner on Jul 15, 2021. It is now read-only.

HUB-406 RouterPublisher #6

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
## [Unreleased]
### Added
- `RouterPublisher` added for using `direct` and `topic` RabbitMQ exchanges with routing key.

## [2.1.0] - 2021-05-06
### Added
Expand Down
41 changes: 41 additions & 0 deletions Command/UpdateDefinitionCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
use Symfony\Component\Console\Output\OutputInterface;
use Wakeapp\Bundle\RabbitQueueBundle\Definition\DefinitionInterface;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\ExchangeEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Exception\RouteStructureException;

class UpdateDefinitionCommand extends Command
{
Expand Down Expand Up @@ -54,10 +56,49 @@ protected function configure(): void

/**
* {@inheritDoc}
*
* @throws RouteStructureException
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$routersToInit = [];
$initializedRouters = [];

foreach ($this->definitionList as $definition) {
if ($definition->getQueueType() & QueueTypeEnum::ROUTER) {
if (method_exists($definition, 'dependsOn') && !empty($definition->dependsOn())) {
$routersToInit[$definition::getQueueName()] = $definition;
} else {
$definition->init($this->connection);
$initializedRouters[] = $definition::getQueueName();
}
}
}

$successLoop = true;
while ($successLoop && !empty($routersToInit)) {
$successLoop = false;

foreach ($routersToInit as $router) {
if (empty(array_diff($router->dependsOn(), $initializedRouters))) {
$successLoop = true;
$router->init($this->connection);
unset($routersToInit[$router::getQueueName()]);
$initializedRouters[] = $router::getQueueName();
}
}
}

if (!$successLoop) {
throw new RouteStructureException('Router definitions have cyclic dependencies');
}


foreach ($this->definitionList as $definition) {
if ($definition->getQueueType() & QueueTypeEnum::ROUTER) {
continue;
}

$definition->init($this->connection);

$this->bindRetryExchange($definition);
Expand Down
1 change: 1 addition & 0 deletions Enum/QueueTypeEnum.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ class QueueTypeEnum
public const DELAY = 2;
public const REPLACE = 4;
public const DEDUPLICATE = 8;
public const ROUTER = 16;
}
9 changes: 9 additions & 0 deletions Exception/RouteStructureException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace Wakeapp\Bundle\RabbitQueueBundle\Exception;

class RouteStructureException extends RabbitQueueException
{
}
4 changes: 2 additions & 2 deletions Producer/RabbitMqProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public function __construct(
* @throws DefinitionNotFoundException
* @throws HydratorNotFoundException
*/
public function put(string $queueName, $data, array $options = []): void
public function put(string $queueName, $data, array $options = [], string $routingKey = ''): void
{
$dataString = $this->hydratorRegistry->getHydrator($this->hydratorName)->dehydrate($data);

Expand All @@ -44,6 +44,6 @@ public function put(string $queueName, $data, array $options = []): void

$publisher = $this->publisherRegistry->getPublisher($queueType);

$publisher->publish($definition, $dataString, $options);
$publisher->publish($definition, $dataString, $options, $routingKey);
}
}
2 changes: 1 addition & 1 deletion Producer/RabbitMqProducerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@

interface RabbitMqProducerInterface
{
public function put(string $queueName, $data, array $options = []);
public function put(string $queueName, $data, array $options = [], string $routingKey = '');
}
26 changes: 9 additions & 17 deletions Publisher/AbstractPublisher.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ public function __construct(RabbitMqClient $client, HydratorRegistry $hydratorRe

abstract protected function prepareOptions(DefinitionInterface $definition, array $options): array;

public function publish(DefinitionInterface $definition, string $dataString, array $options = []): void
public function publish(DefinitionInterface $definition, string $dataString, array $options = [], string $routingKey = ''): void
{
$exchangeName = $this->getDefinitionExchangeName($definition);
$queueName = $this->getDefinitionQueueName($definition);
$route = $routingKey !== '' ? $routingKey : $this->getDefinitionQueueName($definition);

$message = new AMQPMessage($dataString, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
Expand All @@ -45,32 +45,24 @@ public function publish(DefinitionInterface $definition, string $dataString, arr
$message->set('application_headers', new AMQPTable($amqpTableOptions));
}

$this->client->publish($message, $exchangeName, $queueName);
$this->client->publish($message, $exchangeName, $route);
}

abstract public static function getQueueType(): string;

protected function getDefinitionExchangeName(DefinitionInterface $definition): string
{
if ($definition->getQueueType() === (QueueTypeEnum::FIFO | QueueTypeEnum::DEDUPLICATE)) {
return self::DEFAULT_NAME;
}

return $definition->getQueueType() === QueueTypeEnum::FIFO
? self::DEFAULT_NAME
: $definition->getEntryPointName()
return $definition->getQueueType() & (QueueTypeEnum::ROUTER | QueueTypeEnum::DELAY)
? $definition->getEntryPointName()
: self::DEFAULT_NAME
;
}

protected function getDefinitionQueueName(DefinitionInterface $definition): string
{
if ($definition->getQueueType() === (QueueTypeEnum::FIFO | QueueTypeEnum::DEDUPLICATE)) {
return $definition::getQueueName();
}

return $definition->getQueueType() === QueueTypeEnum::FIFO
? $definition::getQueueName()
: self::DEFAULT_NAME
return $definition->getQueueType() & QueueTypeEnum::DELAY
? self::DEFAULT_NAME
: $definition::getQueueName()
;
}
}
2 changes: 1 addition & 1 deletion Publisher/PublisherInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ interface PublisherInterface
{
public const TAG = 'wakeapp_rabbit_queue.publisher';

public function publish(DefinitionInterface $definition, string $dataString, array $options = []): void;
public function publish(DefinitionInterface $definition, string $dataString, array $options = [], string $routingKey = ''): void;

public static function getQueueType(): string;
}
37 changes: 37 additions & 0 deletions Publisher/RouterPublisher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

declare(strict_types=1);

namespace Wakeapp\Bundle\RabbitQueueBundle\Publisher;

use Wakeapp\Bundle\RabbitQueueBundle\Definition\DefinitionInterface;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueHeaderOptionEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueOptionEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Exception\RabbitQueueException;

class RouterPublisher extends AbstractPublisher
{
public const QUEUE_TYPE = QueueTypeEnum::ROUTER;

protected function prepareOptions(DefinitionInterface $definition, array $options): array
{
$amqpTableOption = [];

if (isset($options[QueueOptionEnum::KEY])) {
$amqpTableOption[QueueHeaderOptionEnum::X_DEDUPLICATION_HEADER] = $options[QueueOptionEnum::KEY];
}

if (isset($options[QueueOptionEnum::DELAY])) {
$amqpTableOption[QueueHeaderOptionEnum::X_DELAY] = $options[QueueOptionEnum::DELAY] * 1000;
$amqpTableOption[QueueHeaderOptionEnum::X_CACHE_TTL] = $options[QueueOptionEnum::DELAY] * 1000;
}

return array_merge($amqpTableOption, $options);
}

public static function getQueueType(): string
{
return (string) self::QUEUE_TYPE;
}
}
160 changes: 158 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ Rabbit Queue Bundle
- [Шаг 2: Создание consumer'а](#шаг-2-создание-consumerа)
- [Шаг 3: Загрузка схем очередей RabbitMQ](#шаг-3-загрузка-схем-очередей-rabbitmq)
- [Шаг 4: Запуск consumer'а](#шаг-4-запуск-consumerа)
7. [Лицензия](#лицензия)
7. [Использование `RouterPublisher`](#использование-routerpublisher)
8. [Лицензия](#лицензия)

Требования
---------
Expand Down Expand Up @@ -118,11 +119,14 @@ $producer->put('queue_name', $data, $options);

Соответственно на каждый новый тип очереди требуется свой класс `Publisher` с кастомной логикой обработки/валидации и публикации сообщений в канал.

Бандл поддерживает следующие типы очередей:
Бандл поддерживает следующие типы очередей и обменников:
- FIFO
- Delay
- Deduplicate
- Deduplicate + Delay
- Router

Router используется для создания разветвленной топологии как описано [тут](https://www.rabbitmq.com/tutorials/tutorial-four-php.html) и [тут](https://www.rabbitmq.com/tutorials/tutorial-five-php.html)

При желании добавить собственный тип очереди, необходимо создать класс `Publisher` наследующий [AbstractPublisher](Publisher/AbstractPublisher.php) или реализующий [PublisherInterface](Publisher/PublisherInterface.php).

Expand Down Expand Up @@ -495,6 +499,158 @@ php bin/console rabbit:consumer:run example

Для просмотра списка всех зарегистрированных `consumer`'ов достаточно выполнить команду `rabbit:consumer:list`.

Использование `RouterPublisher`
--------

`RouterPublisher` следует использовать в случаях, когда нужно множество очередей, а каждое сообщение должно попадать
сразу в некоторое их подмножество, определяемое по `routingKey` сообщения. Для таких целей нужно создать `Definition`,
в котором будет определена только `exchange` типа `direct`, `topic` или `fanout`. Эта `Definition` будет использоваться
в качестве точки входя для сообщений. После этого нужно создать по одной `Definition` на каждую очередь, и все их
биндить на первую `Definition`. Можно создать сложную маршрутизацию, если вместо очередей создавать и биндить
`Definition` типа первой.

### Пример `Definition` с `exchange`:
```php
<?php

declare(strict_types=1);

namespace Wakeapp\Bundle\RabbitQueueBundle\Definition;

use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ExampleTopicExchangeDefinition implements DefinitionInterface
{
public const QUEUE_NAME = QueueEnum::EXAMPLE_TOPIC_EXCHENGE;
public const ENTRY_POINT = self::QUEUE_NAME;

/**
* {@inheritDoc}
*/
public function init(AMQPStreamConnection $connection): void
{
$channel = $connection->channel();

$channel->exchange_declare(
self::QUEUE_NAME,
'topic',
false,
true,
);
}

/**
* {@inheritDoc}
*/
public function getEntryPointName(): string
{
return self::ENTRY_POINT;
}

/**
* {@inheritDoc}
*/
public function getQueueType(): int
{
return QueueTypeEnum::ROUTER;
}

/**
* {@inheritDoc}
*/
public static function getQueueName(): string
{
return self::QUEUE_NAME;
}
}
```

### Пример `Definition` для очереди
```php
<?php

declare(strict_types=1);

namespace Wakeapp\Bundle\RabbitQueueBundle\Definition;

use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ExampleRoutedQueryDefinition implements DefinitionInterface
{
public const QUEUE_NAME = QueueEnum::EXAMPLE_ROUTED_FIFO;
public const ENTRY_POINT = QueueEnum::EXAMPLE_TOPIC_EXCHENGE; // это QUEUE_NAME из примера выше
public const ROUTING = [
'*.orange.*',
'big.#',
'*.black.car'
];

/**
* {@inheritDoc}
*/
public function init(AMQPStreamConnection $connection): void
{
$channel = $connection->channel();

$channel->queue_declare(
self::QUEUE_NAME,
false,
true,
false,
false
);

foreach (self::ROUTING as $route) {
$channel->queue_bind(self::QUEUE_NAME, self::ENTRY_POINT, $route); // биндим на exchange из первой Definition
}
}

/**
* {@inheritDoc}
*/
public function getEntryPointName(): string
{
return self::ENTRY_POINT;
}

/**
* {@inheritDoc}
*/
public function getQueueType(): int
{
return QueueTypeEnum::FIFO;
}

/**
* {@inheritDoc}
*/
public static function getQueueName(): string
{
return self::QUEUE_NAME;
}
}
```

После определения биржи и очередей отправка сообщений будет выглядеть как и раньше, но сообщения будут попадать в
очереди только при подходящем routingKey (четвертый параметр в методе put()).

```php
<?php
$data = ['message' => 'example']; # Сообщение
$options = [];

/** @var \Wakeapp\Bundle\RabbitQueueBundle\Producer\RabbitMqProducer $producer */
$producer->put('queue_name', $data, $options, 'small.orange.bicycle'); // попадет в очередь по роуту '*.orange.*'
$producer->put('queue_name', $data, $options, 'big.aaa.bbb.and.more.words'); // попадет в очередь по роуту 'big.#'
$producer->put('queue_name', $data, $options, 'small.black.bicycle'); // НЕ попадет в очередь из примера
```

**Важно!!! Длина routeKey не должна превышать 255 символов**

Лицензия
--------

Expand Down
Loading