- It abstracts all difficulties associated with asynchronous messages publishing and processing.
- It guarantees reliable publication of messages - in case of failure in publication even one of the messages exception is thrown.
- It helps with passing unsuccessfully processed messages back to queue.
MQ_BROKER_HOST={rabbit host}
MQ_BROKER_PORT={rabbit port}
MQ_BROKER_USER={rabbit user}
MQ_BROKER_PASSWORD={rabbit password}
$asynchronousMessageChannel = AsynchronousMessageChannelFactory::withLogger($implementationOfPsrLoggerInterface);
Logger is instance of class implementing Psr\Log\LoggerInterface
, it will be used to log error which
can occur during message processing.
Message have to be instance of class implementing PublishableMessage
. Currently implementation is aligned
with RabbitMQ requirements so PublishableMessage
defines three methods:
body
- returns body of a message as stringroutingKey
- returns routing key which will be used by RabbitMQ to route message into proper queuesexchangeName
- returns RabbitMQ's exchange name to use for message publishing
You can use default implementation:
$asynchronousMessageChannel = AsynchronousMessageChannelFactory::withLogger($implementationOfPsrLoggerInterface);
$publishableMessage = BasicMessage::publishable($routingKeyForMessage, $exchangeNameForMessage, $messageBody);
$asynchronousMessageChannel->add([$publishableMessage]);
AsynchronousMessageChannel::add
receives array of PublishableMessage
s and publish them reliably in batch manner.
If it receives information about publishing failure from RabbitMQ MessagePublishingFailed
is thrown.
For messages processing implementation of MessageHandler
is needed, this interface defines only one method:
interface MessageHandler
{
/**
* @param ProcessableMessage $message
* @throws Throwable
* @throws MessageConstantlyUnprocessable
* @throws MessageTemporaryUnprocessable
*/
public function handle(ProcessableMessage $message): void;
}
As You can see handle
receives ProcessableMessage
as an only argument.
interface ProcessableMessage
{
public function body(): string;
}
Client code can decide what to do with processed message thorough implementation of handle
method :
- if
handle
method method throws any exception/throwable(excludingMessageConstantlyUnprocessable
)AsynchronousMessageChannel
willreject
message from RabbitMQ. When message is rejected it will be deleted from queue but You can configure RabbitMQ to usefallback queue
for that purposes and instruct broker to pass messages back fromfallback queue
to any other queue(probably the one to which message was originally added) with some delay. - if
handle
throwsMessageConstantlyUnprocessable
thenAsynchronousMessageChannel
informs RabbitMQ that message was processed successfully then RabbitMQ simply deletes the message.
- All exception thrown by
MessageHandler::handle
will be logged. - If message has been processed successfully
MessageHandler::handle
should not throw any exception. - If message processing failed but You don't want to receive that message again and log exception then throw
MessageConstantlyUnprocessable
insideMessageHandler::handle
. - If message processing failed but You want to receive that message again and log exception throw any
exception inside
MessageHandler::handle
(You can be more explicit and throwMessageTemporaryUnprocessable
). To receive message again You also have to configurefallback queue
for RabbitMQ.
To start processing messages You need and instance of AsynchronousMessageChannel
:
$asynchronousMessageChannel = AsynchronousMessageChannelFactory::withLogger($implementationOfPsrLoggerInterface);
And then You should use AsynchronousMessageChannel::startProcessingQueue
:
$asynchronousMessageChannel->startProcessingQueue($myImplementationOfMessageHandler, $nameOfRabbitMQQueueFromWhichMessagesWillBeProcessed);