Skip to content

Commit

Permalink
Merge pull request #1178 from MetPX/issue1173
Browse files Browse the repository at this point in the history
Fix #1173: problems stopping instances
  • Loading branch information
petersilva authored Aug 22, 2024
2 parents 8ce8619 + 56d2be7 commit a9334a7
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 68 deletions.
20 changes: 16 additions & 4 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,12 +460,21 @@ def reject(self, m, code, reason) -> None:
self.worklist.rejected.append(m)
m.setReport(code, reason)

def stop_request(self) -> None:
""" called by the signal handler to tell self and FlowCB classes to stop. Without this,
calling runCallbacksTime('please_stop') from inside self.please_stop causes an infinite loop.
Note: if this is called from outside of a signal handler, the interruptible_sleep function
won't work.
"""
logger.info(f'telling {len(self.plugins["please_stop"])} callbacks to please_stop.')
# this will call the please_stop method below, and other classes' please_stop methods
self.runCallbacksTime('please_stop')

def please_stop(self) -> None:
logger.info( f'ok, telling {len(self.plugins["please_stop"])} callbacks about it.')
logger.info(f'asked to stop')
self._stop_requested = True
self.metrics["flow"]['stop_requested'] = True



def close(self) -> None:

self.runCallbacksTime('on_stop')
Expand Down Expand Up @@ -562,7 +571,7 @@ def run(self):
self.close()
break
else:
logger.debug( 'starting last pass (without gather) through loop for cleanup.')
logger.debug('starting last pass (without gather) through loop for cleanup.')
stopping = True

self._run_vip_update()
Expand Down Expand Up @@ -604,7 +613,9 @@ def run(self):
run_time = now - start_time
total_messages += last_gather_len

# trigger shutdown when messageCountMax is reached
if (self.o.messageCountMax > 0) and (total_messages > self.o.messageCountMax):
logger.info(f'{total_messages} messages processed > messageCountMax {self.o.messageCountMax}')
self.runCallbacksTime('please_stop')

current_rate = total_messages / run_time
Expand All @@ -613,6 +624,7 @@ def run(self):
self.metrics['flow']['msgRate'] = current_rate
self.metrics['flow']['msgRateCpu'] = total_messages / (self.metrics['flow']['cpuTime']+self.metrics['flow']['last_housekeeping_cpuTime'] )

# trigger shutdown once gather is finished, where sleep < 0 (e.g. a post)
if (last_gather_len == 0) and (self.o.sleep < 0):
if (self.o.retryEmptyBeforeExit and "retry" in self.metrics
and self.metrics['retry']['msgs_in_post_retry'] > 0):
Expand Down
8 changes: 8 additions & 0 deletions sarracenia/flowcb/gather/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,11 @@ def on_stop(self) -> None:
if hasattr(self,'consumer') and hasattr(self.consumer, 'close'):
self.consumer.close()
logger.info('closing')

def please_stop(self) -> None:
""" pass stop request along to consumer Moth instance(s)
"""
super().please_stop()
if hasattr(self,'consumer') and self.consumer:
logger.debug("asking Moth consumer to please_stop")
self.consumer.please_stop()
8 changes: 8 additions & 0 deletions sarracenia/flowcb/post/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,11 @@ def on_stop(self):
if hasattr(self,'poster') and self.poster:
self.poster.close()
logger.debug('closing')

def please_stop(self) -> None:
""" pass stop request along to publisher Moth instance(s)
"""
super().please_stop()
if hasattr(self, 'poster') and self.poster:
logger.debug("asking Moth publisher to please_stop")
self.poster.please_stop()
7 changes: 7 additions & 0 deletions sarracenia/flowcb/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,10 @@ def on_stop(self):
self.poster.close()
logger.info('closing')

def please_stop(self) -> None:
""" pass stop request along to publisher Moth instance(s)
"""
super().please_stop()
if hasattr(self, 'poster') and self.poster:
logger.debug("asking Moth publisher to please_stop")
self.poster.please_stop()
2 changes: 1 addition & 1 deletion sarracenia/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def stop_signal(self, signum, stack):
if line:
code.append(" %s" % (line.strip()))
logging.debug('\n'.join(code))
self.running_instance.please_stop()
self.running_instance.stop_request()

def start(self):
"""
Expand Down
32 changes: 32 additions & 0 deletions sarracenia/interruptible_sleep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
Long running sleeps prevent Sarracenia from being shutdown cleanly in a
reasonable amount of time.
This implements a reusable sleep function that can be called from other parts
of the code to sleep for a long time, but can still be interrupted.
"""

import time

def interruptible_sleep(sleep_time:float, obj: object, stop_flag_name: str='_stop_requested', nap_time: float=5.0) -> bool:
""" Sleep for sleep_time, divided up into shorter nap_time intervals.
Pass a reference to an object that contains a boolean attribute named stop_flag_name.
Between each nap_time, the function will check if obj.stop_flag_name has become True.
If the flag is False, it will continue sleeping, if True, it will abort the sleep.
Args:
sleep_time (float): total amount of time to sleep, if not interrupted
obj (object): the object containing the boolean attribute named stop_flag_name
stop_flag_name (str): the name of the boolean attribute in obj
nap_time (float): default = 5.0, sleep in intervals of nap_time
Returns:
bool: ``True`` if the sleep **was** interrupted, ``False`` if it slept for the entire ``sleep_time``
time without interruption.
"""
interrupted = False
while sleep_time > 0 and not interrupted:
time.sleep(min(nap_time, sleep_time))
sleep_time -= nap_time
interrupted = (hasattr(obj, stop_flag_name) and getattr(obj, stop_flag_name))
return interrupted
9 changes: 8 additions & 1 deletion sarracenia/moth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def __init__(self, props=None, is_subscriber=True) -> None:

self.is_subscriber = is_subscriber
self.connected=False
self.please_stop = False
self._stop_requested = False
self.metrics = { 'connected': False }
self.metricsReset()

Expand Down Expand Up @@ -349,6 +349,13 @@ def newMessages(self) -> list:
"""
logger.error("NewMessages unimplemented")
return []

def please_stop(self) -> None:
""" register a request to cleanly stop. Any long running processes should check for _stop_requested and
stop if it becomes True.
"""
logger.info("asked to stop")
self._stop_requested = True

def putNewMessage(self, message:sarracenia.Message, content_type: str ='application/json', exchange: str = None) -> bool:
"""
Expand Down
37 changes: 7 additions & 30 deletions sarracenia/moth/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import sarracenia
from sarracenia.postformat import PostFormat
from sarracenia.moth import Moth
import signal
from sarracenia.interruptible_sleep import interruptible_sleep
import os

import time
Expand Down Expand Up @@ -182,7 +182,7 @@ def __init__(self, props, is_subscriber) -> None:
self.o.update(props)

self.first_setup = True
self.please_stop = False
self._stop_requested = False

me = "%s.%s" % (__class__.__module__, __class__.__name__)

Expand Down Expand Up @@ -242,10 +242,6 @@ def __connect(self, broker) -> bool:
self.channel = self.connection.channel(2)
return True

def _amqp_setup_signal_handler(self, signum, stack):
logger.info("ok, asked to stop")
self.please_stop=True

def metricsReport(self):

if 'no' in self.o and self.o['no'] < 2 and self.is_subscriber and self.connection and self.connection.connected:
Expand Down Expand Up @@ -327,14 +323,10 @@ def getSetup(self) -> None:
connect, declare queue, apply bindings.
"""
ebo = 1
original_sigint = signal.getsignal(signal.SIGINT)
original_sigterm = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, self._amqp_setup_signal_handler)
signal.signal(signal.SIGTERM, self._amqp_setup_signal_handler)

while True:

if self.please_stop:
if self._stop_requested:
break

if 'broker' not in self.o or self.o['broker'] is None:
Expand Down Expand Up @@ -400,27 +392,18 @@ def getSetup(self) -> None:
if ebo < 60: ebo *= 2

logger.info("Sleeping {} seconds ...".format(ebo))
time.sleep(ebo)

signal.signal(signal.SIGINT, original_sigint)
signal.signal(signal.SIGTERM, original_sigterm)
if self.please_stop:
os.kill(os.getpid(), signal.SIGINT)
interruptible_sleep(ebo, obj=self)

def putSetup(self) -> None:

ebo = 1
original_sigint = signal.getsignal(signal.SIGINT)
original_sigterm = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, self._amqp_setup_signal_handler)
signal.signal(signal.SIGTERM, self._amqp_setup_signal_handler)

while True:

# It does not really matter how it fails, the recovery approach is always the same:
# tear the whole thing down, and start over.
try:
if self.please_stop:
if self._stop_requested:
break

if self.o['broker'] is None:
Expand Down Expand Up @@ -474,13 +457,7 @@ def putSetup(self) -> None:

self.close()
logger.info("Sleeping {} seconds ...".format(ebo))
time.sleep(ebo)

signal.signal(signal.SIGINT, original_sigint)
signal.signal(signal.SIGTERM, original_sigterm)
if self.please_stop:
os.kill(os.getpid(), signal.SIGINT)

interruptible_sleep(ebo, obj=self)

def putCleanUp(self) -> None:

Expand Down Expand Up @@ -636,7 +613,7 @@ def ack(self, m: sarracenia.Message) -> None:
if ebo < 60:
ebo *= 2
logger.info("Sleeping {} seconds before re-trying ack...".format(ebo))
time.sleep(ebo)
interruptible_sleep(ebo, obj=self)
# TODO maybe implement message strategy stubborn here and give up after retrying?

def putNewMessage(self,
Expand Down
38 changes: 6 additions & 32 deletions sarracenia/moth/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import sarracenia
from sarracenia.postformat import PostFormat
from sarracenia.moth import Moth
import signal
import os
import ssl
import threading
Expand Down Expand Up @@ -342,25 +341,17 @@ def __clientSetup(self, cid) -> paho.mqtt.client.Client:
unquote(self.o['broker'].url.password))
return client

def _mqtt_setup_signal_handler(self, signum, stack):
logger.info("ok, asked to stop")
self.please_stop=True

def getSetup(self):
"""
Establish a connection to consume messages with.
"""
ebo = 1
original_sigint = signal.getsignal(signal.SIGINT)
original_sigterm = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, self._mqtt_setup_signal_handler)
signal.signal(signal.SIGTERM, self._mqtt_setup_signal_handler)

something_broke = True
self.connected=False
while True:

if self.please_stop:
if self._stop_requested:
break

try:
Expand Down Expand Up @@ -390,7 +381,7 @@ def getSetup(self):
while (self.connect_in_progress) or (self.subscribe_in_progress > 0):
logger.info( f"waiting for subscription to be set up. (ebo={ebo})")
time.sleep(0.1*ebo)
if self.please_stop:
if self._stop_requested:
break
if ebo < 512 :
ebo *= 2
Expand All @@ -411,7 +402,7 @@ def getSetup(self):
while (self.connect_in_progress) or (self.subscribe_in_progress > 0):
logger.info( f"waiting ({ebo} seconds) for broker to confirm subscription is set up.")
logger.info( f"for {icid} connect_in_progress={self.connect_in_progress} subscribe_in_progress={self.subscribe_in_progress}" )
if self.please_stop:
if self._stop_requested:
break
if ebo < 60: ebo *= 2
decl_client.loop(ebo)
Expand All @@ -427,27 +418,16 @@ def getSetup(self):
if ebo < 60: ebo *= 2
time.sleep(ebo)

signal.signal(signal.SIGINT, original_sigint)
signal.signal(signal.SIGTERM, original_sigterm)
if self.please_stop:
os.kill(os.getpid(), signal.SIGINT)



def putSetup(self):
"""
establish a connection to allow publishing.
"""
ebo = 1
original_sigint = signal.getsignal(signal.SIGINT)
original_sigterm = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, self._mqtt_setup_signal_handler)
signal.signal(signal.SIGTERM, self._mqtt_setup_signal_handler)

self.connected=False
while True:

if self.please_stop:
if self._stop_requested:
break

try:
Expand Down Expand Up @@ -489,7 +469,7 @@ def putSetup(self):
while self.connect_in_progress:
logger.info( f"waiting for connection to {self.o['broker']} ebo={ebo}")
time.sleep(0.1*ebo)
if self.please_stop:
if self._stop_requested:
break
if ebo < 512:
ebo *= 2
Expand All @@ -504,12 +484,6 @@ def putSetup(self):
if ebo < 60: ebo *= 2
time.sleep(ebo)

signal.signal(signal.SIGINT, original_sigint)
signal.signal(signal.SIGTERM, original_sigterm)
if self.please_stop:
os.kill(os.getpid(), signal.SIGINT)


def __sub_on_message(client, userdata, msg):
"""
callback to append messages received to new queue.
Expand Down Expand Up @@ -544,7 +518,7 @@ def getCleanUp(self):
clean_start=False, properties=props )
while self.connect_in_progress:
myclient.loop(0.1)
if self.please_stop:
if self._stop_requested:
break
myclient.disconnect()
logger.info( f"instance deletion for {i:02d} done" )
Expand Down
Loading

0 comments on commit a9334a7

Please sign in to comment.