This bundle is inspired by oldSoundRabbitMqBundle and helps you build asynchronous services using the RabbitMq message broker.
- Install [Composer](http://symfony.com/doc/current/cookbook/composer.html)
- Run `composer require revinate/rabbitmq-bundle` and then `composer update revinate/rabbitmq-bundle`
- It is wise to anchor the version to the latest tag rather than "dev-master"
- RabbitMq Concepts
- [Some other resources](https://www.rabbitmq.com/how.html#quickstart) (Optional)
This bundle depends on php-amqplib/php-amqplib. Versions between 2.4 and 2.6.3 currently crash on shutdown because of an underlying PHP bug. We have submitted a pull request to php-amqplib with a workaround.
Until that pull request is merged, you will need to add the following to your composer file:

```json
"repositories": [
    {
        "type": "vcs",
        "url": "https://github.com/revinate/php-amqplib.git"
    }
]
```
- Run `brew install rabbitmq` to install RabbitMq
- Open `app/AppKernel.php` and add the following line to the `registerBundles()` method:

  ```php
  new Revinate\RabbitMqBundle\RevinateRabbitMqBundle()
  ```
- Configure the RabbitMq connection in `config_dev.yml`, `config_prod.yml` and `config_test.yml`:

  ```yaml
  revinate_rabbit_mq:
      connections:
          myConnection:
              user: 'myuser'
              password: 'mypassword'
              host: '127.0.0.1'
              vhost: '/default'
  ```
- Create a new config file called `rabbitmq.yml` in the `app/config` directory and add the import to `app/config/config.yml`:

  ```yaml
  imports:
      - { resource: rabbitmq.yml }
  ```
- Now set up Exchanges, Queues, Consumers and Producers. If you are not sure what those terms mean, please go back to the reading material given above 😃

  Here is a sample `rabbitmq.yml` file:

  ```yaml
  revinate_rabbit_mq:
      exchanges:
          log.tx: { connection: myConnection }
          events.tx: { connection: myConnection }
      queues:
          error_log_q: { exchange: log.tx, routing_keys: ['error.log'] }
      producers:
          error_log: { exchange: log.tx }
      consumers:
          error_log: { queue: error_log_q, callback: revinate.rabbitmq.LogEventConsumer, batch_size: 10, buffer_wait: 400, idle_timeout: 2 }
  ```

  Setup: Run the `revinate:rabbitmq:setup` command to create the exchanges and queues in the RabbitMq cluster. You can check `127.0.0.1:15672` to view the created exchanges and queues.
- Exchanges: The exchanges array defines all exchanges to be used. An exchange can be defined with the following properties:
  - connection: Connection to use. (Required)
  - type: Can be `direct`, `fanout` or `topic`. Default: `topic`
  - passive: If set, the server raises an error if the exchange has not already been declared. Default: `false`.
  - managed: If false, the exchange is not managed by this bundle. Default: `true`. Set this option to `false` if you are not responsible for creating/deleting this exchange.
  - durable: Durable exchanges remain active when a server restarts. Default: `true`
  - auto_delete: If set, the exchange is deleted when all queues have finished using it. Default: `false`
  - internal: If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Default: `false`
  - nowait: If set, the server will not respond to the declare method. Default: `false`
- Queues: The queues array defines all queues to be used. A queue can be defined with the following properties:
  - exchange: Exchange to which this queue is bound. (Required)
  - routing_keys: Routing keys to which this queue should be bound. Default: `#`.
  - passive: If set, the server raises an error if the queue has not already been declared. Default: `false`.
  - exclusive: Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Default: `false`
  - managed: If false, the queue is not managed by this bundle. Default: `true`. Set this option to `false` if you are not responsible for creating/deleting this queue.
  - durable: Durable queues remain active when a server restarts. Default: `true`
  - auto_delete: If set, the queue is deleted when all consumers have finished using it. Default: `false`
  - nowait: If set, the server will not respond to the declare method. Default: `false`
  - arguments: Queue-specific arguments. Read up on Per Queue TTL and Dead Letter Exchanges to check out possible arguments.
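  Since exchanges default to the `topic` type and `routing_keys` defaults to `#`, it may help to see how topic patterns match routing keys. The sketch below illustrates the general AMQP topic rules (`*` matches exactly one dot-separated word, `#` matches zero or more words). It is not code from this bundle or from the broker, and the function names `topicMatches` and `matchWords` are hypothetical.

  ```php
  <?php
  // Illustration only: AMQP topic matching, not part of this bundle.
  // Both function names are hypothetical.
  function topicMatches(string $pattern, string $routingKey): bool
  {
      $patternWords = $pattern === '' ? [] : explode('.', $pattern);
      $keyWords = $routingKey === '' ? [] : explode('.', $routingKey);
      return matchWords($patternWords, $keyWords);
  }

  function matchWords(array $pattern, array $key): bool
  {
      if ($pattern === []) {
          // Pattern exhausted: match only if the key is exhausted too
          return $key === [];
      }
      $head = $pattern[0];
      $rest = array_slice($pattern, 1);
      if ($head === '#') {
          // '#' may absorb zero or more words of the key
          for ($i = 0; $i <= count($key); $i++) {
              if (matchWords($rest, array_slice($key, $i))) {
                  return true;
              }
          }
          return false;
      }
      if ($key === []) {
          return false;
      }
      if ($head === '*' || $head === $key[0]) {
          // '*' or a literal word consumes exactly one word of the key
          return matchWords($rest, array_slice($key, 1));
      }
      return false;
  }

  var_dump(topicMatches('#', 'error.log'));            // default binding
  var_dump(topicMatches('error.*', 'error.log'));
  var_dump(topicMatches('error.*', 'error.log.disk'));
  ```

  With the sample config above, `error_log_q` is bound with the literal key `error.log`, so only messages published with exactly that routing key reach it.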
- Producers: The producers array defines all producers. A producer can be defined with the following properties:
  - exchange: Exchange to which this producer will send messages. (Required)
  - encoder: Encoder service to use to encode messages. Default: `revinate.rabbit_mq.encoder.json`
    - You can define your own encoder by creating a service that implements `EncoderInterface`
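  To give a feel for what a JSON encoder and its matching decoder do to a message payload, here is a minimal round-trip sketch. The method signatures of `EncoderInterface` and `DecoderInterface` are not shown in this document, so the example uses plain functions with hypothetical names (`encodeJson`, `decodeJson`). Decoding to associative arrays is also an assumption, not a statement about the bundle's default decoder.

  ```php
  <?php
  // Hypothetical helpers; they do not implement the bundle's interfaces.
  function encodeJson($data): string
  {
      // Throw instead of silently returning false on unencodable data
      return json_encode($data, JSON_THROW_ON_ERROR);
  }

  function decodeJson(string $payload)
  {
      // Assumption: decode JSON objects to associative arrays
      return json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
  }

  $payload = encodeJson(['level' => 'error', 'text' => 'This is a log']);
  echo $payload, PHP_EOL;
  print_r(decodeJson($payload));
  ```

  A custom encoder would wrap similar logic in a service class implementing `EncoderInterface`.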
- Consumers: The consumers array defines all message consumers. A consumer can be defined with the following properties:
  - queue: Queue from which this consumer should consume. (Required)
  - queues: Instead of a single queue, a consumer can consume from multiple queues. This can be an array of queues. Either `queue` or `queues` needs to be set.
  - callback: Callback service to call when a message arrives. (Required)
  - callbacks: If you have set `queues`, you need to set up a callback for each of the queues. This can be an array of callbacks.
    - The callback service should implement either `ConsumerInterface` or `BatchConsumerInterface`.
    - The callback service can implement `ContainerAwareInterface` in order to get access to the Container.
  - idle_timeout: If there is nothing in the queue, the consumer will quit after this many seconds. Default: `0`, which means never.
  - qos_options: The following QOS options are supported. [Great post on QOS Options](http://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/)
    - prefetch_size: Size limit of messages to prefetch. Default: `0` or no limit
    - prefetch_count: Number of messages to prefetch. Default: `0` or no limit
      - Note that for a `batch consumer`, `prefetch_count` is overridden to the consumer `target` size in order to avoid certain unexpected batch processing behavior.
    - global: If set to true, these settings are shared by all consumers. Default: `false`
  - batch_size: Number of messages to process in bulk. If you use this option, your consumer callback should implement `BatchConsumerInterface`, which accepts a batch of messages at a time. You can ack/nack these messages in one go. Useful if you want to do bulk operations on a set of messages. Default: `null` or 1.
    - If `batch_size` is set, `qos:prefetch_count` is set to `batch_size`
    - When a batch consumer starts, the batch size starts from 1 and doubles every tick until it reaches `batch_size`. This is done to handle cases where the queue has fewer messages than the `batch_size`.
  - buffer_wait: Number of milliseconds to wait for the buffer to fill before flushing. This should be set carefully. It should be small enough that your consumer is not left waiting for the buffer to fill, and large enough that you are processing `batch_size` messages at a time. Default: `1000ms`.
  - message_class: Custom Message class. You can extend the `Message` class to define your own message class. Default: `Message`
  - decoder: Decoder service to use to decode messages. Default: `revinate.rabbit_mq.decoder.json`
    - You can define your own decoder by creating a service that implements `DecoderInterface`
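  The interplay of `batch_size` and `buffer_wait` described above can be sketched as follows. This is an illustration of the described behavior under stated assumptions, not the bundle's implementation: the function names are hypothetical, the ramp-up is assumed to be capped at `batch_size`, and a flush is assumed to happen when either the current target is reached or `buffer_wait` has elapsed with messages pending.

  ```php
  <?php
  // Hypothetical helpers illustrating the documented behavior.

  // Batch size targets on successive ticks: 1, 2, 4, ... capped at $batchSize.
  function rampUpBatchSizes(int $batchSize): array
  {
      $batchSize = max(1, $batchSize);
      $sizes = [];
      for ($target = 1; $target < $batchSize; $target *= 2) {
          $sizes[] = $target;
      }
      $sizes[] = $batchSize;
      return $sizes;
  }

  // Assumed flush rule: never flush an empty buffer; otherwise flush when
  // the buffer is full or buffer_wait milliseconds have elapsed.
  function shouldFlush(int $buffered, int $target, int $elapsedMs, int $bufferWaitMs): bool
  {
      if ($buffered === 0) {
          return false;
      }
      return $buffered >= $target || $elapsedMs >= $bufferWaitMs;
  }

  print_r(rampUpBatchSizes(10));
  var_dump(shouldFlush(3, 10, 400, 400));
  ```

  With the sample consumer (`batch_size: 10`, `buffer_wait: 400`), a queue holding only three messages would still be drained once the 400 ms wait expires instead of blocking until ten messages arrive.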
- Produce Messages
  - You can access your defined producers using the `revinate_rabbit_mq.producer.<producer_name>` service syntax.
  - Example: the `error_log` producer defined above can be accessed as follows:

    ```php
    $producer = $this->getContainer()->get('revinate_rabbit_mq.producer.error_log');
    ```
  - Publish Messages:

    ```php
    $producer->publish("This is a log", "error.log");
    ```
  - Republish Messages:

    ```php
    // $message is a Revinate\RabbitMqBundle\Message\Message
    $producer->rePublish($message);
    ```
  - Dynamic Producer:
    - You can get a dynamic producer which can publish to multiple exchanges like this:

      ```php
      $producer = $this->getContainer()->get('revinate.rabbit_mq.base_producer');
      $exchange1 = $this->getContainer()->get('revinate.rabbit_mq.exchange.log.tx');
      $exchange2 = $this->getContainer()->get('revinate.rabbit_mq.exchange.events.tx');

      // Set Exchange
      $producer->setExchange($exchange1);
      $producer->publish("log", "error.log");

      // Switch Exchange
      $producer->setExchange($exchange2);
      $producer->publish("event", "error.event");
      ```
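- Consume Messages
  - The callback service should implement either `ConsumerInterface` or `BatchConsumerInterface`.
  - The callback service can implement `ContainerAwareInterface` in order to get access to the Container.
  - Consumer Example:

    ```php
    use Symfony\Component\DependencyInjection\ContainerAwareInterface;
    use Symfony\Component\DependencyInjection\ContainerInterface;
    // ConsumerInterface and DeliveryResponse come from this bundle; import them as well.

    class LogEventConsumer implements ConsumerInterface, ContainerAwareInterface
    {
        /** @var ContainerInterface */
        protected $container;

        /**
         * @param \Revinate\RabbitMqBundle\Message\Message $message
         * @return int
         */
        public function execute($message)
        {
            print_r($message->getData());
            return DeliveryResponse::MSG_ACK;
        }

        /**
         * @param ContainerInterface $container
         */
        public function setContainer(ContainerInterface $container = null)
        {
            $this->container = $container;
        }
    }
    ```
  - Batch Consumer Example:

    ```php
    use Symfony\Component\DependencyInjection\ContainerAwareInterface;
    use Symfony\Component\DependencyInjection\ContainerInterface;
    // BatchConsumerInterface and DeliveryResponse come from this bundle; import them as well.

    class LogEventConsumer implements BatchConsumerInterface, ContainerAwareInterface
    {
        /** @var ContainerInterface */
        protected $container;

        /**
         * @param \Revinate\RabbitMqBundle\Message\Message[] $messages
         * @return int[]
         */
        public function execute($messages)
        {
            // Return one delivery status per message, in the same order
            $statuses = array();
            foreach ($messages as $message) {
                print_r($message->getData());
                $statuses[] = DeliveryResponse::MSG_ACK;
            }
            return $statuses;
        }

        /**
         * @param ContainerInterface $container
         */
        public function setContainer(ContainerInterface $container = null)
        {
            $this->container = $container;
        }
    }
    ```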
- RPC

  ```php
  // Producer that publishes to the exchange the server is listening on
  $producer = $this->getContainer()->get("revinate_rabbit_mq.producer.test_producer");
  $rpcConsumer = new RPCConsumer($producer);

  // Client request with message data and the routing key that the server listens on
  $queue = $rpcConsumer->call("RPC Message", "rpc.message");

  // Server reply (done by the server)

  // Client consume, which blocks until the timeout to receive the response
  $rpcConsumer->consume($queue, 5 /* timeout */, function (Message $message) {
      // Process response
      var_dump($message->getData());
  });
  ```
- Create a `rabbitmq.yml` file anywhere in your codebase. Here is a sample file:

  ```yaml
  revinate_rabbit_mq:
      connections:
          app:
              user: 'myuser'
              port: 5672
              vhost: '/myhost'
              lazy: true
      environment:
          prod:
              connections:
                  app:
                      password: 'mypassword'
                      host: 'prod.myrabbithost.net'
          test:
              connections:
                  app:
                      password: 'mypassword'
                      host: 'test.myrabbithost.net'
          dev-local:
              connections:
                  app:
                      password: 'mypassword'
                      host: '127.0.0.1'
      exchanges:
          log.tx: { connection: app }
      queues:
          error_log_q: { exchange: log.tx, routing_keys: ['error.log'] }
      producers:
          error_log: { exchange: log.tx }
      consumers:
          error_log: { queue: error_log_q, callback: LogEventConsumer, batch_size: 10, buffer_wait: 400, idle_timeout: 2 }
  ```
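  The sample file keeps shared connection settings at the top level and environment-specific ones under `environment`. One plausible reading is that the values for the selected environment are layered over the shared ones. The sketch below shows that idea with a plain recursive merge. It is an assumption about the intended effect, not the bundle's actual loading code, and `resolveConnections` is a hypothetical name.

  ```php
  <?php
  // Hypothetical helper: overlay environment-specific connection settings
  // on top of the shared ones. Not taken from the bundle.
  function resolveConnections(array $config, string $environment): array
  {
      $base = $config['connections'] ?? [];
      $overrides = $config['environment'][$environment]['connections'] ?? [];
      return array_replace_recursive($base, $overrides);
  }

  // The relevant part of the sample file, written as a PHP array
  $config = [
      'connections' => [
          'app' => ['user' => 'myuser', 'port' => 5672, 'vhost' => '/myhost', 'lazy' => true],
      ],
      'environment' => [
          'dev-local' => [
              'connections' => [
                  'app' => ['password' => 'mypassword', 'host' => '127.0.0.1'],
              ],
          ],
      ],
  ];

  print_r(resolveConnections($config, 'dev-local'));
  ```

  Under this reading, the `app` connection for `dev-local` ends up with the shared `user`, `port`, `vhost` and `lazy` values plus the environment's `password` and `host`.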
- Set up exchanges and queues:

  ```shell
  php setup.php dev-local
  ```
- Create a service container class called `RabbitMqServiceContainer` which will act like a `Container` from Symfony2:

  ```php
  class RabbitMqServiceContainer
  {
      public static function getInstance()
      {
          return ServiceContainer::getInstance(sfConfig::get('sf_environment'), __DIR__ . "/rabbitmq.yml");
      }
  }
  ```
- Producing Messages

  ```php
  $producer = RabbitMqServiceContainer::getInstance()->getProducer("error_log");
  $producer->publish("This is a log", 'error.log');
  ```
- Consuming Messages
  - You can write a single CLI script which takes a consumer name and works for all consumers in your config.
  - Consumer command line script:

    ```php
    // $consumerName and $prefetchCount can be taken as command line parameters
    $consumer = RabbitMqServiceContainer::getInstance()->getConsumer($consumerName);
    try {
        $consumer->consume($prefetchCount);
    } catch (PhpAmqpLib\Exception\AMQPTimeoutException $e) {
        // Intentionally ignored
    } catch (PhpAmqpLib\Exception\AMQPIOWaitException $e) {
        // Intentionally ignored
    }
    ```
  - A consumer callback is implemented the same way as for Symfony2.
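The consumer script takes `$consumerName` and `$prefetchCount` from the command line. A minimal sketch of that parsing step follows. The script name `consume.php`, the function name `parseConsumerArgs` and the choice to require both arguments are assumptions made for illustration.

```php
<?php
// Hypothetical helper: read the consumer name and prefetch count from $argv.
function parseConsumerArgs(array $argv): array
{
    if (count($argv) < 3) {
        throw new InvalidArgumentException(
            'Usage: php consume.php <consumer_name> <prefetch_count>'
        );
    }
    $consumerName = $argv[1];
    // Accept only positive integers for the prefetch count
    $prefetchCount = filter_var(
        $argv[2],
        FILTER_VALIDATE_INT,
        ['options' => ['min_range' => 1]]
    );
    if ($prefetchCount === false) {
        throw new InvalidArgumentException('prefetch_count must be a positive integer');
    }
    return [$consumerName, $prefetchCount];
}

// Example with a fixed argument list instead of the real $argv
list($consumerName, $prefetchCount) = parseConsumerArgs(['consume.php', 'error_log', '10']);
var_dump($consumerName, $prefetchCount);
```

In a real script you would pass the global `$argv` and hand the two values to `getConsumer()` and `consume()` as shown in the consumer command line script.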