Skip to content

Commit

Permalink
Merge "rabbit: Add logging on blocked connection"
Browse files Browse the repository at this point in the history
  • Loading branch information
Jenkins authored and openstack-gerrit committed Jun 10, 2015
2 parents 2e5ba45 + 1f8ccd3 commit 62e7280
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 3 deletions.
17 changes: 15 additions & 2 deletions oslo_messaging/_drivers/impl_rabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,12 @@ def __init__(self, conf, url, purpose):
login_method=self.login_method,
failover_strategy="shuffle",
heartbeat=self.heartbeat_timeout_threshold,
transport_options={'confirm_publish': True})
transport_options={
'confirm_publish': True,
'on_blocked': self._on_connection_blocked,
'on_unblocked': self._on_connection_unblocked,
},
)

LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)s'),
self.connection.info())
Expand Down Expand Up @@ -581,6 +586,14 @@ def _fetch_ssl_params(self):
return ssl_params or True
return False

@staticmethod
def _on_connection_blocked(reason):
LOG.error(_LE("The broker has blocked the connection: %s"), reason)

@staticmethod
def _on_connection_unblocked():
LOG.info(_LI("The broker has unblocked the connection"))

def ensure_connection(self):
self.ensure(method=lambda: True)

Expand Down Expand Up @@ -829,7 +842,7 @@ def declare_consumer(self, consumer):
def _connect_error(exc):
log_info = {'topic': consumer.routing_key, 'err_str': exc}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s"), log_info)
"%(err_str)s"), log_info)

def _declare_consumer():
consumer.declare(self)
Expand Down
4 changes: 3 additions & 1 deletion oslo_messaging/tests/drivers/test_impl_rabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,9 @@ def test_driver_load(self, connection_klass, fake_ensure):

transport._driver._get_connection()
connection_klass.assert_called_once_with(
'memory:///', transport_options={'confirm_publish': True},
'memory:///', transport_options={'confirm_publish': True,
'on_blocked': mock.ANY,
'on_unblocked': mock.ANY},
ssl=self.expected, login_method='AMQPLAIN',
heartbeat=0, failover_strategy="shuffle")

Expand Down

0 comments on commit 62e7280

Please sign in to comment.