diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index 6036449b3..0fa27b051 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -460,12 +460,21 @@ def reject(self, m, code, reason) -> None: self.worklist.rejected.append(m) m.setReport(code, reason) + def stop_request(self) -> None: + """ called by the signal handler to tell self and FlowCB classes to stop. Without this, + calling runCallbacksTime('please_stop') from inside self.please_stop causes an infinite loop. + Note: if this is called from outside of a signal handler, the interruptible_sleep function + won't work. + """ + logger.info(f'telling {len(self.plugins["please_stop"])} callbacks to please_stop.') + # this will call the please_stop method below, and other classes' please_stop methods + self.runCallbacksTime('please_stop') + def please_stop(self) -> None: - logger.info( f'ok, telling {len(self.plugins["please_stop"])} callbacks about it.') + logger.info(f'asked to stop') self._stop_requested = True self.metrics["flow"]['stop_requested'] = True - - + def close(self) -> None: self.runCallbacksTime('on_stop') @@ -562,7 +571,7 @@ def run(self): self.close() break else: - logger.debug( 'starting last pass (without gather) through loop for cleanup.') + logger.debug('starting last pass (without gather) through loop for cleanup.') stopping = True self._run_vip_update() @@ -604,7 +613,9 @@ def run(self): run_time = now - start_time total_messages += last_gather_len + # trigger shutdown when messageCountMax is reached if (self.o.messageCountMax > 0) and (total_messages > self.o.messageCountMax): + logger.info(f'{total_messages} messages processed > messageCountMax {self.o.messageCountMax}') self.runCallbacksTime('please_stop') current_rate = total_messages / run_time @@ -613,6 +624,7 @@ def run(self): self.metrics['flow']['msgRate'] = current_rate self.metrics['flow']['msgRateCpu'] = total_messages / 
(self.metrics['flow']['cpuTime']+self.metrics['flow']['last_housekeeping_cpuTime'] ) + # trigger shutdown once gather is finished, where sleep < 0 (e.g. a post) if (last_gather_len == 0) and (self.o.sleep < 0): if (self.o.retryEmptyBeforeExit and "retry" in self.metrics and self.metrics['retry']['msgs_in_post_retry'] > 0): @@ -999,7 +1011,7 @@ def filter(self) -> None: if self.o.fileAgeMin > 0 and age < self.o.fileAgeMin: logger.warning( f"file too young: queueing for retry.") - self.worklist.failed.append(msg) + self.worklist.failed.append(m) continue if 'fileOp' in m and 'rename' in m['fileOp']: diff --git a/sarracenia/flowcb/gather/message.py b/sarracenia/flowcb/gather/message.py index f6fcea4bb..5adcec4f1 100755 --- a/sarracenia/flowcb/gather/message.py +++ b/sarracenia/flowcb/gather/message.py @@ -74,3 +74,11 @@ def on_stop(self) -> None: if hasattr(self,'consumer') and hasattr(self.consumer, 'close'): self.consumer.close() logger.info('closing') + + def please_stop(self) -> None: + """ pass stop request along to consumer Moth instance(s) + """ + super().please_stop() + if hasattr(self,'consumer') and self.consumer: + logger.debug("asking Moth consumer to please_stop") + self.consumer.please_stop() diff --git a/sarracenia/flowcb/post/message.py b/sarracenia/flowcb/post/message.py index 07ee0dcfb..f30d78d5d 100755 --- a/sarracenia/flowcb/post/message.py +++ b/sarracenia/flowcb/post/message.py @@ -69,3 +69,11 @@ def on_stop(self): if hasattr(self,'poster') and self.poster: self.poster.close() logger.debug('closing') + + def please_stop(self) -> None: + """ pass stop request along to publisher Moth instance(s) + """ + super().please_stop() + if hasattr(self, 'poster') and self.poster: + logger.debug("asking Moth publisher to please_stop") + self.poster.please_stop() diff --git a/sarracenia/flowcb/report.py b/sarracenia/flowcb/report.py index dcea4b7bc..cc9402305 100755 --- a/sarracenia/flowcb/report.py +++ b/sarracenia/flowcb/report.py @@ -166,3 +166,10 @@ def 
def interruptible_sleep(sleep_time:float, obj: object, stop_flag_name: str='_stop_requested', nap_time: float=5.0) -> bool:
    """ Sleep for sleep_time, divided up into shorter nap_time intervals.
        Pass a reference to an object that contains a boolean attribute named stop_flag_name.
        After each nap, the function checks whether obj.stop_flag_name has become truthy.
        If the flag is not set, it continues sleeping; if it is set, it aborts the sleep.

        The flag is only examined *after* a nap, never before the first one, so a call with
        sleep_time > 0 always sleeps for at least ``min(nap_time, sleep_time)`` seconds, even
        when the flag is already set on entry.
        NOTE(review): callers appear to use this as the back-off inside retry loops, so
        returning immediately could turn those loops into busy loops -- confirm before
        changing this ordering.

        Args:
            sleep_time (float): total amount of time to sleep, if not interrupted. Values <= 0
                                return ``False`` immediately without sleeping.
            obj (object): the object containing the boolean attribute named stop_flag_name.
                          An object without that attribute is treated as "flag not set".
            stop_flag_name (str): the name of the boolean attribute in obj
            nap_time (float): default = 5.0, sleep in intervals of nap_time. Must be > 0.

        Returns:
            bool: ``True`` if the sleep **was** interrupted, ``False`` if it slept for the entire ``sleep_time``
            time without interruption.

        Raises:
            ValueError: if nap_time is not greater than zero. A zero nap_time would otherwise
                        never make progress, and a negative one fails obscurely in time.sleep.
    """
    if nap_time <= 0:
        raise ValueError(f"nap_time must be greater than zero, got {nap_time!r}")

    remaining = sleep_time
    while remaining > 0:
        # the final nap is shortened so the total never exceeds sleep_time
        nap = min(nap_time, remaining)
        time.sleep(nap)
        remaining -= nap
        # a missing attribute counts as "not requested"; coerce so callers always get a real bool
        if bool(getattr(obj, stop_flag_name, False)):
            return True
    return False
+ """ + logger.info("asked to stop") + self._stop_requested = True def putNewMessage(self, message:sarracenia.Message, content_type: str ='application/json', exchange: str = None) -> bool: """ diff --git a/sarracenia/moth/amqp.py b/sarracenia/moth/amqp.py index bd3da8b85..9b23d11d2 100755 --- a/sarracenia/moth/amqp.py +++ b/sarracenia/moth/amqp.py @@ -34,7 +34,7 @@ import sarracenia from sarracenia.postformat import PostFormat from sarracenia.moth import Moth -import signal +from sarracenia.interruptible_sleep import interruptible_sleep import os import time @@ -182,7 +182,7 @@ def __init__(self, props, is_subscriber) -> None: self.o.update(props) self.first_setup = True - self.please_stop = False + self._stop_requested = False me = "%s.%s" % (__class__.__module__, __class__.__name__) @@ -242,10 +242,6 @@ def __connect(self, broker) -> bool: self.channel = self.connection.channel(2) return True - def _amqp_setup_signal_handler(self, signum, stack): - logger.info("ok, asked to stop") - self.please_stop=True - def metricsReport(self): if 'no' in self.o and self.o['no'] < 2 and self.is_subscriber and self.connection and self.connection.connected: @@ -327,14 +323,10 @@ def getSetup(self) -> None: connect, declare queue, apply bindings. 
""" ebo = 1 - original_sigint = signal.getsignal(signal.SIGINT) - original_sigterm = signal.getsignal(signal.SIGINT) - signal.signal(signal.SIGINT, self._amqp_setup_signal_handler) - signal.signal(signal.SIGTERM, self._amqp_setup_signal_handler) while True: - if self.please_stop: + if self._stop_requested: break if 'broker' not in self.o or self.o['broker'] is None: @@ -400,27 +392,18 @@ def getSetup(self) -> None: if ebo < 60: ebo *= 2 logger.info("Sleeping {} seconds ...".format(ebo)) - time.sleep(ebo) - - signal.signal(signal.SIGINT, original_sigint) - signal.signal(signal.SIGTERM, original_sigterm) - if self.please_stop: - os.kill(os.getpid(), signal.SIGINT) + interruptible_sleep(ebo, obj=self) def putSetup(self) -> None: ebo = 1 - original_sigint = signal.getsignal(signal.SIGINT) - original_sigterm = signal.getsignal(signal.SIGINT) - signal.signal(signal.SIGINT, self._amqp_setup_signal_handler) - signal.signal(signal.SIGTERM, self._amqp_setup_signal_handler) while True: # It does not really matter how it fails, the recovery approach is always the same: # tear the whole thing down, and start over. try: - if self.please_stop: + if self._stop_requested: break if self.o['broker'] is None: @@ -474,13 +457,7 @@ def putSetup(self) -> None: self.close() logger.info("Sleeping {} seconds ...".format(ebo)) - time.sleep(ebo) - - signal.signal(signal.SIGINT, original_sigint) - signal.signal(signal.SIGTERM, original_sigterm) - if self.please_stop: - os.kill(os.getpid(), signal.SIGINT) - + interruptible_sleep(ebo, obj=self) def putCleanUp(self) -> None: @@ -636,7 +613,7 @@ def ack(self, m: sarracenia.Message) -> None: if ebo < 60: ebo *= 2 logger.info("Sleeping {} seconds before re-trying ack...".format(ebo)) - time.sleep(ebo) + interruptible_sleep(ebo, obj=self) # TODO maybe implement message strategy stubborn here and give up after retrying? 
def putNewMessage(self, diff --git a/sarracenia/moth/mqtt.py b/sarracenia/moth/mqtt.py index 2e4555055..37a807a02 100755 --- a/sarracenia/moth/mqtt.py +++ b/sarracenia/moth/mqtt.py @@ -33,7 +33,6 @@ import sarracenia from sarracenia.postformat import PostFormat from sarracenia.moth import Moth -import signal import os import ssl import threading @@ -342,25 +341,17 @@ def __clientSetup(self, cid) -> paho.mqtt.client.Client: unquote(self.o['broker'].url.password)) return client - def _mqtt_setup_signal_handler(self, signum, stack): - logger.info("ok, asked to stop") - self.please_stop=True - def getSetup(self): """ Establish a connection to consume messages with. """ ebo = 1 - original_sigint = signal.getsignal(signal.SIGINT) - original_sigterm = signal.getsignal(signal.SIGINT) - signal.signal(signal.SIGINT, self._mqtt_setup_signal_handler) - signal.signal(signal.SIGTERM, self._mqtt_setup_signal_handler) something_broke = True self.connected=False while True: - if self.please_stop: + if self._stop_requested: break try: @@ -390,7 +381,7 @@ def getSetup(self): while (self.connect_in_progress) or (self.subscribe_in_progress > 0): logger.info( f"waiting for subscription to be set up. 
(ebo={ebo})") time.sleep(0.1*ebo) - if self.please_stop: + if self._stop_requested: break if ebo < 512 : ebo *= 2 @@ -411,7 +402,7 @@ def getSetup(self): while (self.connect_in_progress) or (self.subscribe_in_progress > 0): logger.info( f"waiting ({ebo} seconds) for broker to confirm subscription is set up.") logger.info( f"for {icid} connect_in_progress={self.connect_in_progress} subscribe_in_progress={self.subscribe_in_progress}" ) - if self.please_stop: + if self._stop_requested: break if ebo < 60: ebo *= 2 decl_client.loop(ebo) @@ -427,27 +418,16 @@ def getSetup(self): if ebo < 60: ebo *= 2 time.sleep(ebo) - signal.signal(signal.SIGINT, original_sigint) - signal.signal(signal.SIGTERM, original_sigterm) - if self.please_stop: - os.kill(os.getpid(), signal.SIGINT) - - - def putSetup(self): """ establish a connection to allow publishing. """ ebo = 1 - original_sigint = signal.getsignal(signal.SIGINT) - original_sigterm = signal.getsignal(signal.SIGINT) - signal.signal(signal.SIGINT, self._mqtt_setup_signal_handler) - signal.signal(signal.SIGTERM, self._mqtt_setup_signal_handler) self.connected=False while True: - if self.please_stop: + if self._stop_requested: break try: @@ -489,7 +469,7 @@ def putSetup(self): while self.connect_in_progress: logger.info( f"waiting for connection to {self.o['broker']} ebo={ebo}") time.sleep(0.1*ebo) - if self.please_stop: + if self._stop_requested: break if ebo < 512: ebo *= 2 @@ -504,12 +484,6 @@ def putSetup(self): if ebo < 60: ebo *= 2 time.sleep(ebo) - signal.signal(signal.SIGINT, original_sigint) - signal.signal(signal.SIGTERM, original_sigterm) - if self.please_stop: - os.kill(os.getpid(), signal.SIGINT) - - def __sub_on_message(client, userdata, msg): """ callback to append messages received to new queue. 
class SleepThing():
    """ Minimal object owning the stop flags that interruptible_sleep polls.

        Creating an instance installs ``signal_handler`` for SIGTERM and SIGINT, replacing
        whatever handlers were active before. The handlers are NOT restored by this class;
        the test that creates instances is responsible for putting the previous ones back.
    """
    def __init__(self):
        self._stop_requested = False
        self.other_name = False
        signal.signal(signal.SIGTERM, self.signal_handler)
        signal.signal(signal.SIGINT, self.signal_handler)

    def signal_handler(self, signum, stack):
        # both flags are set, so either attribute name can serve as the stop flag
        self._stop_requested = True
        self.other_name = True


def _timed_interruptible_sleep(signal_delay, *args, **kwargs):
    """ Call interruptible_sleep(*args, **kwargs) and measure how long it took.

        Args:
            signal_delay: seconds after which SIGTERM is sent to this process, or None to
                          send no signal at all.

        Returns:
            tuple: (return value of interruptible_sleep, elapsed seconds on the monotonic clock)
    """
    import threading
    import time

    timer = None
    if signal_delay is not None:
        # a timer thread replaces the former "bash -c 'sleep N; kill ...' &" shell pipeline
        timer = threading.Timer(signal_delay, os.kill, args=(os.getpid(), signal.SIGTERM))
        timer.daemon = True
        timer.start()

    started = time.monotonic()
    try:
        result = interruptible_sleep(*args, **kwargs)
    finally:
        if timer is not None:
            # harmless if it already fired; prevents a stray signal if the sleep raised
            timer.cancel()
    return result, time.monotonic() - started


def test_interruptible_sleep():
    """ Check the uninterrupted and interrupted paths of interruptible_sleep.

        Signals are always scheduled for the *middle* of a nap, never on a nap boundary:
        the flag is only polled when a nap ends, so a signal landing right at a boundary
        may be seen by either the current poll or the next one, which makes the elapsed
        time unpredictable.
    """
    early = 0.1   # tolerance for clock granularity
    late = 1.0    # tolerance for scheduling delays on a busy machine

    # SleepThing replaces the process-wide handlers; remember them so other tests
    # (and pytest's own Ctrl-C handling) are not affected once this test finishes.
    original_handlers = {signum: signal.getsignal(signum)
                         for signum in (signal.SIGTERM, signal.SIGINT)}
    try:
        # Test that sleep sleeps for the right amount of time when not interrupted
        st = SleepThing()
        stime = 2
        result, elapsed = _timed_interruptible_sleep(None, stime, st)
        assert (result == False)
        assert (stime - early <= elapsed < stime + late)

        # Test that the sleep behaves correctly when interrupted, using the default nap_time.
        # SIGTERM arrives after 2.5 seconds, it is noticed when the first 5 second nap ends.
        st = SleepThing()
        result, elapsed = _timed_interruptible_sleep(2.5, 10, st)
        assert result
        assert (5 - early <= elapsed < 5 + late)

        # Test using a different nap_time
        # SIGTERM arrives after 1.5 seconds, it is noticed when the second 1 second nap ends.
        st = SleepThing()
        result, elapsed = _timed_interruptible_sleep(1.5, 10, st, nap_time=1)
        assert result
        assert (2 - early <= elapsed < 2 + late)

        # Test using a different attribute name
        # SIGTERM arrives after 1.5 seconds, it is noticed when the second 1 second nap ends.
        st = SleepThing()
        result, elapsed = _timed_interruptible_sleep(1.5, 10, st, stop_flag_name='other_name', nap_time=1)
        assert result
        assert (2 - early <= elapsed < 2 + late)
    finally:
        for signum, handler in original_handlers.items():
            # getsignal() returns None for handlers not installed from Python; those cannot be re-installed
            if handler is not None:
                signal.signal(signum, handler)