From bee974e50b2236b883ed3449dd82bee1d0b5cf47 Mon Sep 17 00:00:00 2001 From: Reid Sunderland Date: Mon, 21 Feb 2022 12:55:52 -0600 Subject: [PATCH] Working on #407 - fixed a bug where putSetup doesn't close the connection, added retry to ack --- sarracenia/moth/amqp.py | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/sarracenia/moth/amqp.py b/sarracenia/moth/amqp.py index f353fb95d..9f816689a 100755 --- a/sarracenia/moth/amqp.py +++ b/sarracenia/moth/amqp.py @@ -342,6 +342,7 @@ def __putSetup(self): if ebo < 60: ebo *= 2 + self.close() logger.info("Sleeping {} seconds ...".format(ebo)) time.sleep(ebo) @@ -427,14 +428,27 @@ def ack(self, m): if not 'ack_id' in m: return - try: - self.channel.basic_ack(m['ack_id']) - del m['ack_id'] - m['_deleteOnPost'].remove('ack_id') + ebo = 1 + while True: + try: + self.channel.basic_ack(m['ack_id']) + del m['ack_id'] + m['_deleteOnPost'].remove('ack_id') + # Break loop if no exceptions encountered + return - except Exception as err: - logger.warning("failed for tag: %s: %s" % (m['ack_id'], err)) - logger.debug('Exception details: ', exc_info=True) + except Exception as err: + logger.warning("failed for tag: %s: %s" % (m['ack_id'], err)) + logger.debug('Exception details: ', exc_info=True) + + # Cleanly close partially broken connection and restablish + self.close() + self.__putSetup() + + if ebo < 60: ebo *= 2 + + logger.warning("WIP Sleeping {} seconds before re-trying ack...".format(ebo)) + time.sleep(ebo) def putNewMessage(self,