Skip to content

Commit

Permalink
Working on #407 - fixed a bug where putSetup doesn't close the connec…
Browse files Browse the repository at this point in the history
…tion, added retry to ack
  • Loading branch information
reidsunderland authored and petersilva committed Feb 23, 2022
1 parent 8bace1a commit bee974e
Showing 1 changed file with 21 additions and 7 deletions.
28 changes: 21 additions & 7 deletions sarracenia/moth/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ def __putSetup(self):

if ebo < 60: ebo *= 2

self.close()
logger.info("Sleeping {} seconds ...".format(ebo))
time.sleep(ebo)

Expand Down Expand Up @@ -427,14 +428,27 @@ def ack(self, m):
if not 'ack_id' in m:
return

try:
self.channel.basic_ack(m['ack_id'])
del m['ack_id']
m['_deleteOnPost'].remove('ack_id')
ebo = 1
while True:
try:
self.channel.basic_ack(m['ack_id'])
del m['ack_id']
m['_deleteOnPost'].remove('ack_id')
# Break loop if no exceptions encountered
return

except Exception as err:
logger.warning("failed for tag: %s: %s" % (m['ack_id'], err))
logger.debug('Exception details: ', exc_info=True)
except Exception as err:
logger.warning("failed for tag: %s: %s" % (m['ack_id'], err))
logger.debug('Exception details: ', exc_info=True)

# Cleanly close partially broken connection and restablish
self.close()
self.__putSetup()

if ebo < 60: ebo *= 2

logger.warning("WIP Sleeping {} seconds before re-trying ack...".format(ebo))
time.sleep(ebo)


def putNewMessage(self,
Expand Down

0 comments on commit bee974e

Please sign in to comment.