Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #1173: problems stopping instances #1178

Merged
merged 6 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,12 +460,21 @@ def reject(self, m, code, reason) -> None:
self.worklist.rejected.append(m)
m.setReport(code, reason)

def stop_request(self) -> None:
    """Entry point used by the signal handler to start a clean shutdown.

    Runs every registered 'please_stop' callback, which covers this flow's
    own please_stop method as well as the please_stop methods of the FlowCB
    classes. It is a separate method because calling
    runCallbacksTime('please_stop') from inside self.please_stop causes an
    infinite loop.

    Note: if this is called from outside of a signal handler, the
    interruptible_sleep function won't work.
    """
    stop_callbacks = self.plugins["please_stop"]
    logger.info(f'telling {len(stop_callbacks)} callbacks to please_stop.')
    # dispatches to please_stop on this flow and on the other registered classes
    self.runCallbacksTime('please_stop')

def please_stop(self) -> None:
logger.info( f'ok, telling {len(self.plugins["please_stop"])} callbacks about it.')
logger.info(f'asked to stop')
self._stop_requested = True
self.metrics["flow"]['stop_requested'] = True



def close(self) -> None:

self.runCallbacksTime('on_stop')
Expand Down Expand Up @@ -562,7 +571,7 @@ def run(self):
self.close()
break
else:
logger.debug( 'starting last pass (without gather) through loop for cleanup.')
logger.debug('starting last pass (without gather) through loop for cleanup.')
stopping = True

self._run_vip_update()
Expand Down Expand Up @@ -604,7 +613,9 @@ def run(self):
run_time = now - start_time
total_messages += last_gather_len

# trigger shutdown when messageCountMax is reached
if (self.o.messageCountMax > 0) and (total_messages > self.o.messageCountMax):
logger.info(f'{total_messages} messages processed > messageCountMax {self.o.messageCountMax}')
self.runCallbacksTime('please_stop')

current_rate = total_messages / run_time
Expand All @@ -613,6 +624,7 @@ def run(self):
self.metrics['flow']['msgRate'] = current_rate
self.metrics['flow']['msgRateCpu'] = total_messages / (self.metrics['flow']['cpuTime']+self.metrics['flow']['last_housekeeping_cpuTime'] )

# trigger shutdown once gather is finished, where sleep < 0 (e.g. a post)
if (last_gather_len == 0) and (self.o.sleep < 0):
if (self.o.retryEmptyBeforeExit and "retry" in self.metrics
and self.metrics['retry']['msgs_in_post_retry'] > 0):
Expand Down Expand Up @@ -999,7 +1011,7 @@ def filter(self) -> None:

if self.o.fileAgeMin > 0 and age < self.o.fileAgeMin:
logger.warning( f"file too young: queueing for retry.")
self.worklist.failed.append(msg)
self.worklist.failed.append(m)
continue

if 'fileOp' in m and 'rename' in m['fileOp']:
Expand Down
8 changes: 8 additions & 0 deletions sarracenia/flowcb/gather/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,11 @@ def on_stop(self) -> None:
if hasattr(self,'consumer') and hasattr(self.consumer, 'close'):
self.consumer.close()
logger.info('closing')

def please_stop(self) -> None:
    """Relay the stop request to the consumer Moth instance(s).

    The parent class is notified first; the consumer is only asked to stop
    when the attribute exists and is truthy.
    """
    super().please_stop()
    consumer = getattr(self, 'consumer', None)
    if not consumer:
        return
    logger.debug("asking Moth consumer to please_stop")
    consumer.please_stop()
8 changes: 8 additions & 0 deletions sarracenia/flowcb/post/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,11 @@ def on_stop(self):
if hasattr(self,'poster') and self.poster:
self.poster.close()
logger.debug('closing')

def please_stop(self) -> None:
    """Relay the stop request to the publisher Moth instance(s).

    The parent class is notified first; the poster is only asked to stop
    when the attribute exists and is truthy.
    """
    super().please_stop()
    poster = getattr(self, 'poster', None)
    if not poster:
        return
    logger.debug("asking Moth publisher to please_stop")
    poster.please_stop()
7 changes: 7 additions & 0 deletions sarracenia/flowcb/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,10 @@ def on_stop(self):
self.poster.close()
logger.info('closing')

def please_stop(self) -> None:
    """Relay the stop request to the publisher Moth instance(s).

    The parent class is notified first; the poster is only asked to stop
    when the attribute exists and is truthy.
    """
    super().please_stop()
    poster = getattr(self, 'poster', None)
    if not poster:
        return
    logger.debug("asking Moth publisher to please_stop")
    poster.please_stop()
2 changes: 1 addition & 1 deletion sarracenia/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def stop_signal(self, signum, stack):
if line:
code.append(" %s" % (line.strip()))
logging.debug('\n'.join(code))
self.running_instance.please_stop()
self.running_instance.stop_request()

def start(self):
"""
Expand Down
32 changes: 32 additions & 0 deletions sarracenia/interruptible_sleep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
Long-running sleeps prevent Sarracenia from being shut down cleanly in a
reasonable amount of time.
This implements a reusable sleep function that can be called from other parts
of the code to sleep for a long time, but can still be interrupted.
"""

import time

def interruptible_sleep(sleep_time:float, obj: object, stop_flag_name: str='_stop_requested', nap_time: float=5.0) -> bool:
    """ Sleep for sleep_time, divided up into shorter nap_time intervals.

        Pass a reference to an object that contains a boolean attribute named stop_flag_name.
        After each nap, the function checks whether obj.<stop_flag_name> has become truthy.
        If the flag is falsy (or the attribute does not exist), it keeps sleeping; if truthy,
        it aborts the sleep. The flag is only checked after a nap, so at least one nap is
        always taken when sleep_time > 0.

        Args:
            sleep_time (float): total amount of time to sleep, if not interrupted
            obj (object): the object containing the boolean attribute named stop_flag_name
            stop_flag_name (str): the name of the boolean attribute in obj
            nap_time (float): default = 5.0, sleep in intervals of nap_time

        Returns:
            bool: ``True`` if the sleep **was** interrupted, ``False`` if it slept for the entire ``sleep_time``
            time without interruption.

        Raises:
            ValueError: if ``nap_time`` is not positive while there is time left to sleep.
    """
    # A zero nap_time would never decrease the remaining time (endless busy loop) and a
    # negative one is rejected by time.sleep anyway, so fail early with a clear message.
    if sleep_time > 0 and nap_time <= 0:
        raise ValueError(f"nap_time must be > 0, got {nap_time}")

    interrupted = False
    remaining = sleep_time
    while remaining > 0 and not interrupted:
        # never nap longer than what is left of the total sleep
        time.sleep(min(nap_time, remaining))
        remaining -= nap_time
        # coerce to a real bool so the declared return type holds for any truthy flag value
        interrupted = bool(getattr(obj, stop_flag_name, False))
    return interrupted
9 changes: 8 additions & 1 deletion sarracenia/moth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def __init__(self, props=None, is_subscriber=True) -> None:

self.is_subscriber = is_subscriber
self.connected=False
self.please_stop = False
self._stop_requested = False
self.metrics = { 'connected': False }
self.metricsReset()

Expand Down Expand Up @@ -349,6 +349,13 @@ def newMessages(self) -> list:
"""
logger.error("NewMessages unimplemented")
return []

def please_stop(self) -> None:
    """Record a request to stop cleanly.

    Sets the ``_stop_requested`` flag; any long running process should
    check that flag and stop once it becomes True.
    """
    logger.info("asked to stop")
    self._stop_requested = True

def putNewMessage(self, message:sarracenia.Message, content_type: str ='application/json', exchange: str = None) -> bool:
"""
Expand Down
37 changes: 7 additions & 30 deletions sarracenia/moth/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import sarracenia
from sarracenia.postformat import PostFormat
from sarracenia.moth import Moth
import signal
from sarracenia.interruptible_sleep import interruptible_sleep
import os

import time
Expand Down Expand Up @@ -182,7 +182,7 @@ def __init__(self, props, is_subscriber) -> None:
self.o.update(props)

self.first_setup = True
self.please_stop = False
self._stop_requested = False

me = "%s.%s" % (__class__.__module__, __class__.__name__)

Expand Down Expand Up @@ -242,10 +242,6 @@ def __connect(self, broker) -> bool:
self.channel = self.connection.channel(2)
return True

def _amqp_setup_signal_handler(self, signum, stack):
logger.info("ok, asked to stop")
self.please_stop=True

def metricsReport(self):

if 'no' in self.o and self.o['no'] < 2 and self.is_subscriber and self.connection and self.connection.connected:
Expand Down Expand Up @@ -327,14 +323,10 @@ def getSetup(self) -> None:
connect, declare queue, apply bindings.
"""
ebo = 1
original_sigint = signal.getsignal(signal.SIGINT)
original_sigterm = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, self._amqp_setup_signal_handler)
signal.signal(signal.SIGTERM, self._amqp_setup_signal_handler)

while True:

if self.please_stop:
if self._stop_requested:
break

if 'broker' not in self.o or self.o['broker'] is None:
Expand Down Expand Up @@ -400,27 +392,18 @@ def getSetup(self) -> None:
if ebo < 60: ebo *= 2

logger.info("Sleeping {} seconds ...".format(ebo))
time.sleep(ebo)

signal.signal(signal.SIGINT, original_sigint)
signal.signal(signal.SIGTERM, original_sigterm)
if self.please_stop:
os.kill(os.getpid(), signal.SIGINT)
interruptible_sleep(ebo, obj=self)

def putSetup(self) -> None:

ebo = 1
original_sigint = signal.getsignal(signal.SIGINT)
original_sigterm = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, self._amqp_setup_signal_handler)
signal.signal(signal.SIGTERM, self._amqp_setup_signal_handler)

while True:

# It does not really matter how it fails, the recovery approach is always the same:
# tear the whole thing down, and start over.
try:
if self.please_stop:
if self._stop_requested:
break

if self.o['broker'] is None:
Expand Down Expand Up @@ -474,13 +457,7 @@ def putSetup(self) -> None:

self.close()
logger.info("Sleeping {} seconds ...".format(ebo))
time.sleep(ebo)

signal.signal(signal.SIGINT, original_sigint)
signal.signal(signal.SIGTERM, original_sigterm)
if self.please_stop:
os.kill(os.getpid(), signal.SIGINT)

interruptible_sleep(ebo, obj=self)

def putCleanUp(self) -> None:

Expand Down Expand Up @@ -636,7 +613,7 @@ def ack(self, m: sarracenia.Message) -> None:
if ebo < 60:
ebo *= 2
logger.info("Sleeping {} seconds before re-trying ack...".format(ebo))
time.sleep(ebo)
interruptible_sleep(ebo, obj=self)
# TODO maybe implement message strategy stubborn here and give up after retrying?

def putNewMessage(self,
Expand Down
38 changes: 6 additions & 32 deletions sarracenia/moth/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import sarracenia
from sarracenia.postformat import PostFormat
from sarracenia.moth import Moth
import signal
import os
import ssl
import threading
Expand Down Expand Up @@ -342,25 +341,17 @@ def __clientSetup(self, cid) -> paho.mqtt.client.Client:
unquote(self.o['broker'].url.password))
return client

def _mqtt_setup_signal_handler(self, signum, stack):
logger.info("ok, asked to stop")
self.please_stop=True

def getSetup(self):
"""
Establish a connection to consume messages with.
"""
ebo = 1
original_sigint = signal.getsignal(signal.SIGINT)
original_sigterm = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, self._mqtt_setup_signal_handler)
signal.signal(signal.SIGTERM, self._mqtt_setup_signal_handler)

something_broke = True
self.connected=False
while True:

if self.please_stop:
if self._stop_requested:
break

try:
Expand Down Expand Up @@ -390,7 +381,7 @@ def getSetup(self):
while (self.connect_in_progress) or (self.subscribe_in_progress > 0):
logger.info( f"waiting for subscription to be set up. (ebo={ebo})")
time.sleep(0.1*ebo)
if self.please_stop:
if self._stop_requested:
break
if ebo < 512 :
ebo *= 2
Expand All @@ -411,7 +402,7 @@ def getSetup(self):
while (self.connect_in_progress) or (self.subscribe_in_progress > 0):
logger.info( f"waiting ({ebo} seconds) for broker to confirm subscription is set up.")
logger.info( f"for {icid} connect_in_progress={self.connect_in_progress} subscribe_in_progress={self.subscribe_in_progress}" )
if self.please_stop:
if self._stop_requested:
break
if ebo < 60: ebo *= 2
decl_client.loop(ebo)
Expand All @@ -427,27 +418,16 @@ def getSetup(self):
if ebo < 60: ebo *= 2
time.sleep(ebo)

signal.signal(signal.SIGINT, original_sigint)
signal.signal(signal.SIGTERM, original_sigterm)
if self.please_stop:
os.kill(os.getpid(), signal.SIGINT)



def putSetup(self):
"""
establish a connection to allow publishing.
"""
ebo = 1
original_sigint = signal.getsignal(signal.SIGINT)
original_sigterm = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, self._mqtt_setup_signal_handler)
signal.signal(signal.SIGTERM, self._mqtt_setup_signal_handler)

self.connected=False
while True:

if self.please_stop:
if self._stop_requested:
break

try:
Expand Down Expand Up @@ -489,7 +469,7 @@ def putSetup(self):
while self.connect_in_progress:
logger.info( f"waiting for connection to {self.o['broker']} ebo={ebo}")
time.sleep(0.1*ebo)
if self.please_stop:
if self._stop_requested:
break
if ebo < 512:
ebo *= 2
Expand All @@ -504,12 +484,6 @@ def putSetup(self):
if ebo < 60: ebo *= 2
time.sleep(ebo)

signal.signal(signal.SIGINT, original_sigint)
signal.signal(signal.SIGTERM, original_sigterm)
if self.please_stop:
os.kill(os.getpid(), signal.SIGINT)


def __sub_on_message(client, userdata, msg):
"""
callback to append messages received to new queue.
Expand Down Expand Up @@ -544,7 +518,7 @@ def getCleanUp(self):
clean_start=False, properties=props )
while self.connect_in_progress:
myclient.loop(0.1)
if self.please_stop:
if self._stop_requested:
break
myclient.disconnect()
logger.info( f"instance deletion for {i:02d} done" )
Expand Down
Loading
Loading