Skip to content

Commit

Permalink
fix for #643, adding functional on_cleanup, on_declare, and on_sanity
Browse files Browse the repository at this point in the history
events. also fixes a problem introduced by previous branch where cleanup
was broken because xxxSetup routines were no longer called by init
  • Loading branch information
petersilva committed Jul 27, 2023
1 parent 2bf28d2 commit affe018
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 14 deletions.
10 changes: 5 additions & 5 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def _runCallbacksWorklist(self, entry_point):
logger.error( f'flowCallback plugin {p}/{entry_point} crashed: {ex}' )
logger.debug( "details:", exc_info=True )

def _runCallbacksTime(self, entry_point):
def runCallbacksTime(self, entry_point):
for p in self.plugins[entry_point]:
if self.o.logLevel.lower() == 'debug' :
p()
Expand Down Expand Up @@ -341,14 +341,14 @@ def please_stop(self) -> None:
logger.info(
f'ok, telling {len(self.plugins["please_stop"])} callbacks about it.'
)
self._runCallbacksTime('please_stop')
self.runCallbacksTime('please_stop')
self._stop_requested = True
self.metrics["flow"]['stop_requested'] = True


def close(self) -> None:

self._runCallbacksTime('on_stop')
self.runCallbacksTime('on_stop')
if os.path.exists( self.o.novipFilename ):
os.unlink( self.o.novipFilename )
logger.info(
Expand Down Expand Up @@ -405,7 +405,7 @@ def run(self):
if os.path.exists( self.o.novipFilename ):
os.unlink( self.o.novipFilename )

self._runCallbacksTime(f'on_start')
self.runCallbacksTime(f'on_start')

spamming = True
last_gather_len = 0
Expand Down Expand Up @@ -587,7 +587,7 @@ def run(self):
logger.info(
f'on_housekeeping pid: {os.getpid()} {self.o.component}/{self.o.config} instance: {self.o.no}'
)
self._runCallbacksTime('on_housekeeping')
self.runCallbacksTime('on_housekeeping')
self.metricsFlowReset()
self.metrics['flow']['last_housekeeping'] = now

Expand Down
12 changes: 10 additions & 2 deletions sarracenia/flowcb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
entry_points = [

'ack', 'after_accept', 'after_post', 'after_work', 'destfn', 'do_poll',
'download', 'gather', 'on_housekeeping', 'on_report', 'on_start', 'on_stop',
'poll', 'post', 'send', 'please_stop', 'metricsReport',
'download', 'gather', 'metricsReport', 'on_cleanup', 'on_declare', 'on_housekeeping', 'on_report',
'on_sanity', 'on_start', 'on_stop', 'please_stop', 'poll', 'post', 'send',

]

Expand Down Expand Up @@ -129,6 +129,14 @@ def metricsReport(self) -> dict:
Return a dictionary of metrics. Example: number of messages remaining in retry queues.
def on_cleanup(self) -> None::
allow plugins to perform additional work after broker resources are eliminated.
local state files are still present when this runs.
def on_declare(self) -> None::
local state files are still already present when this runs.
allow plugins to perform additional work besides broker resource setup.
def on_housekeeping(self) -> None::
do periodic processing.
Expand Down
6 changes: 6 additions & 0 deletions sarracenia/flowcb/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ def stats(self):
logger.info("lag: average: %.2f, maximum: %.2f " %
(self.lagTotal / self.msgCount, self.lagMax))

def on_cleanup(self):
logger.info("hello")

def on_declare(self):
logger.info("hello")

def on_stop(self):
if set(['on_stop']) & self.o.logEvents:
self.stats()
Expand Down
10 changes: 5 additions & 5 deletions sarracenia/flowcb/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,6 @@ def __init__(self, options) -> None:

logger.debug('logLevel=%s' % self.o.logLevel)

def cleanup(self) -> None:
logger.debug('starting retry cleanup')
self.download_retry.cleanup()
self.post_retry.cleanup()

def after_accept(self, worklist) -> None:
"""
If there are only a few new messages, get some from the download retry queue and put them into
Expand Down Expand Up @@ -121,6 +116,11 @@ def metricsReport(self) -> dict:
"""
return {'msgs_in_download_retry': len(self.download_retry), 'msgs_in_post_retry': len(self.post_retry)}

def on_cleanup(self) -> None:
logger.debug('starting retry cleanup')
self.download_retry.cleanup()
self.post_retry.cleanup()

def on_housekeeping(self) -> None:
logger.info("on_housekeeping")

Expand Down
2 changes: 1 addition & 1 deletion sarracenia/redisqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def put(self, message_list):
logger.debug("rpush to list %s %s" % (self.key_name_new, message))
self.redis.rpush(self.key_name_new, self._msgToJSON(message))

def cleanup(self):
def on_cleanup(self):
"""
remove statefiles.
"""
Expand Down
67 changes: 66 additions & 1 deletion sarracenia/sr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,28 @@ def declare(self):
qdc.getSetup()
qdc.close()

# run on_declare plugins.
for f in self.filtered_configurations:
if f == 'audit': continue
if self.please_stop:
break

(c, cfg) = f.split(os.sep)

if not 'options' in self.configs[c][cfg]:
continue

o = self.configs[c][cfg]['options']
o.no=0
o.finalize()
if c not in [ 'cpost', 'cpump' ]:
flow = sarracenia.flow.Flow.factory(o)
flow.loadCallbacks()
flow.runCallbacksTime('on_declare')
del flow
flow=None


def disable(self):
if len(self.filtered_configurations) == 0:
logging.error('%s configuration not found', self.leftovers)
Expand Down Expand Up @@ -1469,6 +1491,7 @@ def cleanup(self):
'queueName': o.resolved_qname,
'message_strategy': { 'stubborn':True }
})
qdc.getSetup()
qdc.getCleanUp()
qdc.close()
queues_to_delete.append((o.broker, o.resolved_qname))
Expand Down Expand Up @@ -1500,10 +1523,31 @@ def cleanup(self):
'message_strategy': { 'stubborn':True }
})
if qdc:
qdc.putSetup()
qdc.putCleanUp()
qdc.close()

self.user_cache_dir
# run on_cleanup plugins.
for f in self.filtered_configurations:
if f == 'audit': continue
if self.please_stop:
break

(c, cfg) = f.split(os.sep)

if not 'options' in self.configs[c][cfg]:
continue

o = self.configs[c][cfg]['options']
o.no=0
o.finalize()
if c not in [ 'cpost', 'cpump' ]:
flow = sarracenia.flow.Flow.factory(o)
flow.loadCallbacks()
flow.runCallbacksTime('on_cleanup')
del flow
flow=None

for f in self.filtered_configurations:
if self.please_stop:
break
Expand Down Expand Up @@ -1806,6 +1850,27 @@ def sanity(self):
if not sarracenia.extras[l]['present']:
print( f"notice: python module {l} is missing: {sarracenia.extras[l]['lament']}" )

# run on_sanity plugins.
for f in self.filtered_configurations:
if f == 'audit': continue
if self.please_stop:
break

(c, cfg) = f.split(os.sep)

if not 'options' in self.configs[c][cfg]:
continue

o = self.configs[c][cfg]['options']
o.no=0
o.finalize()
if c not in [ 'cpost', 'cpump' ]:
flow = sarracenia.flow.Flow.factory(o)
flow.loadCallbacks()
flow.runCallbacksTime('on_sanity')
del flow
flow=None

def start(self):
""" Starting all components
Expand Down

0 comments on commit affe018

Please sign in to comment.