Skip to content

Latest commit

 

History

History
68 lines (50 loc) · 2.67 KB

Consuming.md

File metadata and controls

68 lines (50 loc) · 2.67 KB

Consuming

Your consumer service should implement Consumer interface so the bundle can register it as a consumer.

Acknowledgement is done through AcknowledgeOperation. You can acknowledge one message with its handle($message). In some cases, you might need to acknowledge multiple messages and once, you can use handleAll($lastMessage) for that.

There's RejectOperation for message rejection as well with handle($message) and handleAll($lastMessage).

use Bunny\Message;
use Cdn77\RabbitMQBundle\RabbitMQ\Consumer\Consumer;
use Cdn77\RabbitMQBundle\RabbitMQ\Consumer\Configuration;
use Cdn77\RabbitMQBundle\RabbitMQ\Operation\AcknowledgeOperation;
use Cdn77\RabbitMQBundle\RabbitMQ\Operation\RejectOperation;

final class ExampleConsumer implements Consumer
{
    private const QUEUE_NAME = 'some_queue';

    /** @var AcknowledgeOperation */
    private $acknowledgeOperation;

    /** @var RejectOperation */
    private $rejectOperation;

    public function __construct(AcknowledgeOperation $acknowledgeOperation, RejectOperation $rejectOperation)
    {
        $this->acknowledgeOperation = $acknowledgeOperation;
        $this->rejectOperation = $rejectOperation;
    }

    public function consume(Message $bunnyMessage) : void
    {
        $data = json_decode($bunnyMessage->content, true);
        
        try {
            // Do something with the message
        } catch(\Throwable $throwable) {
            $this->rejectOperation->handle($bunnyMessage);

            return;
        }

        $this->acknowledgeOperation->handle($bunnyMessage);
    }

    public function getConfiguration() : Configuration
    {
        $prefetchCount = 1;
        $prefetchSize = 0;
        $maxMessages = 100;
        $maxSeconds = 1000;

        return new ConsumerConfiguration(self::QUEUE_NAME, $prefetchCount, $prefetchSize, $maxMessages, $maxSeconds);
    }

    public function getName() : string
    {
        return 'example_consumer';
    }
}

Consumer configuration is done via getConfiguration() that returns instance of Configuration.

maxMessages & maxSeconds parameters are recommended to set to something else than null so consumer will shutdown gracefully after specified number of messages is consumed or seconds elapsed so it can start with fresh memory again.

Consumer is registered under the name specified in getName() method. You can check whether it is successfully registered through debug:rabbitmq:consumers command. It can be run with rabbitmq:consumer:run example_consumer