A Clojure RabbitMQ client.
Leiningen (via Clojars)
Bunnicula is framework for building asynchronous workflows with RabbitMQ.
It defines 4 components (based on Stuart Sierra's component lib)
- connection - representing a per-service instance RabbitMQ connection
- publisher - for publishing jobs to queues, usually 1 per service instance
- consumer - for consuming jobs from a queue, a service usually defines multiple ones, consuming jobs from multiple queues. Consumers can also run concurrently, e.g. spin up N consumer threads to pull jobs of the queue
- monitoring - a consumer monitoring component which reports all aspects of the consumer lifecycle (in basic form, it will log all consumer operations)
Bunnicula follows the RabbitMQ best practices, inspired by following blog posts.
Relevant practices and patterns implemented in Bunnicula:
- Channels and connections are opened/closed only at application startup time (or during auto-recovery)
- Publisher component maintains long running channel
- Channels are not shared between threads (as channels are not thread safe)
- Consuming channel is not used for publishing
- Consumers don't have an unlimited prefetch value (default value is set to 10)
- An ACK is invoked on the same channel on which the delivery was received
- All messages are persistent, all queues are durable
Bunnicula uses official RabbitMQ Java client for creating connections, consumers etc. As of version 4.0.0 of the Java client, automatic recovery is enabled by default. See more details here
By default we ensure your queue data will survive RabbitMQ server restart.
All queues defined by consumer component are durable. Publisher component publishes persistent message by default.
Connection component represents RabbitMQ connection. When component is started, new RabbitMQ connection is opened. When component is stopped, connection is closed!
Connection component is a required dependency by the publisher & consumer components, under the rmq-connection
key in your system.
- server params can be specified either via map full
url
key or by map withhost
,port
,username
andpassword
keys vhost
is always required to be present in configuration, default is "/"- optionally user can specify
connection-name
- this is a recommended setting if you have multiple applications connecting to the same RabbitMQ server - optionally you can set
secure?
flag to enable connecting to RabbitMQ server with TLS/SSL enabled
(require '[bunnicula.component.connection :as connection]
'[com.stuartsierra.component :as component])
(def connection (connection/create {:username "rabbit"
:password "password"
:host "127.0.0.1"
:port 5672
:vhost "/main"
:secure? true
:connection-name "connection-test"}))
;; connection specified by url
;; (def connection (connection/create {:url "amqp://rabbit:[email protected]:5672"
;; :vhost "/main"}))
(component/start connection)
Publisher component is used for publishing messages to the broker. When component is started, new channel is opened. When component is stopped, channel is closed.
The connection component is a required dependency, it has to be present under the rmq-connection
key in the system map.
exchange-name
the default exchange messages will be published toserializer
(optional) function used to serialize messages to RabbitMQ message body (bytes). Default function uses json-serialization. Seebunnicula.utils/json-serializer
for an example.
Publisher component implements publish method, which has multiple arity
(publish publisher routing-key message)
will publish message to default exchange with given routing-key (queue name)(publish publisher routing-key message options)
will publish message to default exchange with given routing-key and options(publish publisher exchange routing-key message options)
will publish message to specified exchange with given routing-key and options
Following options are supported for publishing
mandatory
flag to ensure message delivery, default falseexpiration
set message expirationpersistent
whether to persist message on disk, default true!
(require '[bunnicula.component.connection :as connection]
'[bunnicula.component.publisher :as publisher]
'[bunnicula.protocol :as protocol]
'[com.stuartsierra.component :as component])
(def connection (connection/create {:url "amqp://rabbit:[email protected]:5672"
:vhost "/main"}))
(def publisher (publisher/create {:exchange-name "my-exchange"}))
(def system (-> (component/system-map
:publisher (component/using
publisher
[:rmq-connection])
:rmq-connection connection)
component/start-system))
;; publish to 'my-exchange' exchange with routing-key 'some.queue'
(protocol/publish (:publisher system)
"some.queue"
{:integration_id 1 :message_id "123"})
;; publish to 'another-exchange' exchange with routing-key 'some.queue' and options map
(protocol/publish (:publisher system)
"another-exchange"
"some.queue"
{:integration_id 1 :message_id "123"}
{:mandatory true :persistent true})
For testing purposes or in development mode you can use the Mock Component.
Mock publisher component contains queues
atom which represents the RabbitMQ.
It is initiated as empty map and any time you publish message,
its value is added to queues
atom
(require '[bunnicula.component.publisher.mock :as mock]
'[bunnicula.protocol :as protocol]
'[com.stuartsierra.component :as component])
(def p (-> (mock/create) component/start))
(protocol/publish p
"some.queue"
{:integration_id 1 :message_id "123"})
(protocol/publish p
"some.queue"
{:integration_id 1 :message_id "456"})
(-> p :queues deref)
;; => {"some.queue" [{:integration_id 1, :message_id "456"}
;; {:integration_id 1, :message_id "123"}]}
Defines consumer with auto retry functionality!
The component is composed of a message handler (a Clojure function), consumer threads and channels. When processing fails it can be retried automatically with a fixed amount of times with a delay between each attempt. If processing still fails - messages will be pushed to an error queue for further inspection and manual retrying (dead lettering).
When the component starts it will create necessary exchanges and queues if they do not exist.
When component stops consumers are destroyed and channels are closed. All exchanges, queues and their messages will remain intact. Messages will be fetched again when the consumer reconnects as per the pre-fetch setting.
Processing message on consumer can result in one of following results
- success => ACK message on original queue
- hard failure => ACK message on original queue and push message to error queue
- recoverable failure => ACK message on original queue and push message to retry queue to be processed later
⚠️ If using non-default exchange, the main exchange used by regular work queues (not retry/error) has to be created before starting the consumer. Only retry and error exchanges are created by the consumer component.
Assume configured queue-name is some.queue
.
Following exchanges are declared when component is started
- retry exchange with name
some.queue-retry
- error exchange with name
some.queue-error
- dead letter exchange with name
some.queue-requeue
Following queues are declared when component is started
- main queue with name
some.queue
which is bound to the pre-configured exchange via 'some.queue' routing key (routing-key is usually the queue name) - retry queue with name
some.queue-retry
which is bound to retry exchange via '#' routing key, expired messages are sent to dead letter exchangesome.queue-requeue
- error queue with name
some.queue-error
which is bound to error exchange via '#' routing key
Note all queues are durable and publishing messages between queues ensures they are persisted on disk. This also means that all messages will survive the server and consumer restarts.
⚠️ You have to take care of garbage collecting failed messages - either reprocessing them again by moving the messages back to the main queue by using the RabbitMQ shovel plugin or purging the queue. Accumulating failed messages will offload them to the disk, but will eventually bloat RabbitMQ's memory!
The connection component and monitoring component are required dependencies. Both have to be present under :rmq-connection
and :monitoring
key in the component dependency list. You can use aliases to avoid name clashes.
handler
handler fn for consumer, holds the application logic, see more heredeserializer
(optional, default is json deserialization) function to be used to deserializer messagesoptions
queue-name
queue for consuming messagesexchange-name
exchange which queue will be bind to using queue-name as routing-key (exchange needs to be already created, you can use default RabbitMQ exchange""
)max-retries
(optional, default 3) how many times should be message retriedtimeout-seconds
(optional, default 60s) timeout for handler, if handler execution exceeds the timeout it will be terminated and treated as a retrybackoff-interval-seconds
(optional, default 60s) how long should message wait on retry queueconsumer-threads
(optional, default 4) how many consumers should be created, allows parallel processingprefetch-count
(optional, default 10) how many messages should be prefetched by each consumer
handler function takes 4 arguments
raw-body
raw message data, as received by the consumer - always sent asbytes
(a byte array)deserialized-body
parsed message, parsing is defined by the deserializer function - by default it's JSON. This is the payload used by the handlerenvelope
message envelopecomponents
- components which are specified as dependencies for the consumer
handler-fn is required to return one of following values
:bunnicula.consumer/ack -
message was processed successfully:bunnicula.consumer/retry
- recoverable failure => retry message automatically:bunnicula.consumer/error
- hard failure => no retry, send to dead-letter queue:bunnicula.consumer/timeout
- consumption timed out, will be retried but also logs/records timeout specific metrics
Note that you can use non-namespaced versions of these keywords - these will be the removed in v3.
:ack
:retry
:error
:timeout
⚠️ If it returns anything else than the keywords listed above it will be considered a failure and the job will be retried!
A simpler handler looks like this:
(defn handler-fn [raw-body deserialized-body envelope components]
;; ... some domain specific code ...
;; return supported response value
:bunnicula.consumer/ack)
The envelope is a map of:
{:routing-key "QUEUE-NAME",
:exchange "",
:redelivered? false,
:delivery-tag 1}
(see the JavaDoc for details)
In typical scenario you can ignore the raw body and envelope arguments, as focus on the business logic. Example handler for sending email notifications:
(defn send-email [_ {:keys [to subject content]}
_ {:keys [email-sender]}]
(email/send email-sender {:to to
:subject subject
:html content
:plain-text (util/make-plain-test content)})
:bunnicula.consumer/ack)
(require '[bunnicula.component.connection :as connection]
'[bunnicula.component.monitoring :as monitoring]
'[bunnicula.component.consumer-with-retry :as consumer]
'[com.stuartsierra.component :as component])
(defn import-conversation-handler
[body parsed envelope components]
(let [{:keys [integration_id message_id]} parsed]
;; ... import intercom conversation for given integration_id & message_id ...
;; need to return :bunnicula.consumer/ack, :bunnicula.consumer/error, :bunnicula.consumer/retry
:bunnicula.consumer/ack))
(def connection (connection/create {:url "amqp://rabbit:[email protected]:5672"
:vhost "/main"}))
(def consumer (consumer/create {:handler import-conversation-handler
:options {:queue-name "some.queue"
:exchange-name "my-exchange"
:timeout-seconds 120
:backoff-interval-seconds 60
:consumer-threads 4
:max-retries 3}}))
(def system (-> (component/system-map
:rmq-connection connection
:publisher (component/using
(publisher/create)
[:rmq-connection])
:monitoring (monitoring/create)
:consumer (component/using
consumer
[:rmq-connection :monitoring]))
component/start-system))
;; publish
(bunnicula.protocol/publish (:publisher system) "some.queue" {:integration_id 123 :message_id 2334})
Monitoring component is a required dependency for the consumer component (it has to be present under the monitoring key in the system map.)
Bunnicula provides a basic monitoring component. If you require more advanced monitoring functionality you can also implement your own.
The component needs to implement all methods from Monitoring protocol and support component lifecycle
You can also use a production-ready bunnicula.monitoring component, which will track consumer metrics and send those to StatsD and report exceptions to Rollbar.
Provides basic monitoring functionality for consumer component
It logs the result of consumer's handler
using clojure.tools.logging
.
You can completely override metrics and error reporting backends and call their APIs directly:
(ns bunnicula.monitoring.custom
(:require [com.stuartsierra.component :as component]
[clojure.tools.logging :as log]
[bunnicula.protocol :as protocol]
[xyz.component.raygun :as raygun]
[xyz.component.graphite :as graphite]))
(defrecord CustomMonitoring [consumer-name raygun graphite]
component/Lifecycle
(start [c]
(log/infof "start consumer-name=%s" consumer-name)
c)
(stop [c]
(log/infof "stop consumer-name=%s" consumer-name)
c)
protocol/Monitoring
(on-success [this args]
(log/infof "consumer=%s success" consumer-name)
(graphite/count graphite :success consumer-name))
(on-error [this args]
(log/errorf "consumer=%s error payload=%s"
consumer-name (log-fn (:message args)))
(graphite/count graphite :error consumer-name))
(on-timeout [this args]
(log/errorf "consumer=%s timeout payload=%s"
consumer-name (log-fn (:message args)))
(graphite/count graphite :timeout consumer-name))
(on-retry [this args]
(log/errorf "consumer=%s retry-attempts=%d payload=%s"
consumer-name (:retry-attempts args) (log-fn (:message args)))
(graphite/count graphite :retry consumer-name))
(on-exception [this args]
(let [{:keys [exception message]} args]
(log/errorf exception "consumer=%s exception payload=%s"
consumer-name (log-fn message))
(when exception-tracker
(tracker/report raygun exception)))
(graphite/count graphite :fail consumer-name)))
See a full example of a component system with a publisher, monitoring and a consumer.
- updated dependencies
- support for
amqps://
scheme andsecure?
flag to allow connecting to RabbitMQ servers with TLS enabled
- yanked, do not use
- more flexible configuration, with somewhat better option names
- updated dependencies
- support for namespaced keywords as return values of consumer handlers
- fixed reflection warnings
- documentation improvements
- reformatted & linted code
- update all dependencies
- make it work with Clojure 1.10
- potentially breaking change consumer config is now more strict and will throw exceptions if invalid configuration is passed
- Open source nomnom/bunnicula library