diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 7bc7c43e9..d02677956 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -477,7 +477,12 @@ def __init__(self, conf, url, purpose): login_method=self.login_method, failover_strategy="shuffle", heartbeat=self.heartbeat_timeout_threshold, - transport_options={'confirm_publish': True}) + transport_options={ + 'confirm_publish': True, + 'on_blocked': self._on_connection_blocked, + 'on_unblocked': self._on_connection_unblocked, + }, + ) LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)s'), self.connection.info()) @@ -581,6 +586,14 @@ def _fetch_ssl_params(self): return ssl_params or True return False + @staticmethod + def _on_connection_blocked(reason): + LOG.error(_LE("The broker has blocked the connection: %s"), reason) + + @staticmethod + def _on_connection_unblocked(): + LOG.info(_LI("The broker has unblocked the connection")) + def ensure_connection(self): self.ensure(method=lambda: True) @@ -829,7 +842,7 @@ def declare_consumer(self, consumer): def _connect_error(exc): log_info = {'topic': consumer.routing_key, 'err_str': exc} LOG.error(_("Failed to declare consumer for topic '%(topic)s': " - "%(err_str)s"), log_info) + "%(err_str)s"), log_info) def _declare_consumer(): consumer.declare(self) diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 660d861ef..9b22a1a80 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -169,7 +169,9 @@ def test_driver_load(self, connection_klass, fake_ensure): transport._driver._get_connection() connection_klass.assert_called_once_with( - 'memory:///', transport_options={'confirm_publish': True}, + 'memory:///', transport_options={'confirm_publish': True, + 'on_blocked': mock.ANY, + 'on_unblocked': mock.ANY}, ssl=self.expected, login_method='AMQPLAIN', heartbeat=0, failover_strategy="shuffle")