Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V03 issue643 - on_cleanup now works (needed by redisqueue ) #733

Merged
merged 11 commits into from
Jul 27, 2023
22 changes: 13 additions & 9 deletions AUTHORS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ Peter Silva <[email protected]>
wrote none of the code in v2, except most (all?) plugins included with package.
added the 'plugin' model to v2. Main blame for sr3.


Reid Sunderland
Added SASL authentication and vhost support.
Extensive work getting flow tests working with sr3.
Expand All @@ -15,6 +14,10 @@ Reid Sunderland
André Leblanc
Amserver, am_sender

Greg Linton
Redis drivers for retry and nodupe plugins.
added unit testing framework for sr3, and many test clases.

Cléa Aumont
refactoring of FTP processing to fix timezone support.
porting many plugins from v2 to sr3. Resolved issued with xattr python modules
Expand All @@ -31,30 +34,31 @@ Michel Grenier <[email protected]> (Retired)
Retired now...
none of the code since sr3, but a lot was copy/pasted.

Jun Hu <[email protected]>
Jun Hu
Documentation Diagrams, lead on some deployments (head tester!)
plugin work.

Noureddine Habili <[email protected]>
Noureddine Habili
lead on other deployments ( project lead on Sarrasemina )
plugin work.

Benoit Lapointe <[email protected]>
Benoit Lapointe
Improving testing frameworks/ Design documentation / code hygiene.
Some MQTT interop work.
testing work was limited to v2, does not apply to sr3.
Some early MQTT interop work.

Khosrow Ebrahimpour <[email protected]> (no long involved.)
Khosrow Ebrahimpour
Packaging & Process (Debian, Launchpad, some pypi, the vagrant self-test)
originally proposed migration to github.

Daluma Sen <[email protected]>
Daluma Sen
some work on sr_watch, and worked on sr_post as well for caching.

Murray Rennie <[email protected]>
Murray Rennie
sr_winnow, worked on that with Michel.
lots of work on plugins and LDM bridge for UNIDATA and NOAAPORT ingest.

Dominic.Racette <[email protected]>
Dominic.Racette
actually a client, not really an author, but participation was crucial.
for five years of patient testing in a very large deployment scenario.

Expand Down
10 changes: 5 additions & 5 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def _runCallbacksWorklist(self, entry_point):
logger.error( f'flowCallback plugin {p}/{entry_point} crashed: {ex}' )
logger.debug( "details:", exc_info=True )

def _runCallbacksTime(self, entry_point):
def runCallbacksTime(self, entry_point):
for p in self.plugins[entry_point]:
if self.o.logLevel.lower() == 'debug' :
p()
Expand Down Expand Up @@ -341,14 +341,14 @@ def please_stop(self) -> None:
logger.info(
f'ok, telling {len(self.plugins["please_stop"])} callbacks about it.'
)
self._runCallbacksTime('please_stop')
self.runCallbacksTime('please_stop')
self._stop_requested = True
self.metrics["flow"]['stop_requested'] = True


def close(self) -> None:

self._runCallbacksTime('on_stop')
self.runCallbacksTime('on_stop')
if os.path.exists( self.o.novipFilename ):
os.unlink( self.o.novipFilename )
logger.info(
Expand Down Expand Up @@ -405,7 +405,7 @@ def run(self):
if os.path.exists( self.o.novipFilename ):
os.unlink( self.o.novipFilename )

self._runCallbacksTime(f'on_start')
self.runCallbacksTime(f'on_start')

spamming = True
last_gather_len = 0
Expand Down Expand Up @@ -587,7 +587,7 @@ def run(self):
logger.info(
f'on_housekeeping pid: {os.getpid()} {self.o.component}/{self.o.config} instance: {self.o.no}'
)
self._runCallbacksTime('on_housekeeping')
self.runCallbacksTime('on_housekeeping')
self.metricsFlowReset()
self.metrics['flow']['last_housekeeping'] = now

Expand Down
12 changes: 10 additions & 2 deletions sarracenia/flowcb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
entry_points = [

'ack', 'after_accept', 'after_post', 'after_work', 'destfn', 'do_poll',
'download', 'gather', 'on_housekeeping', 'on_report', 'on_start', 'on_stop',
'poll', 'post', 'send', 'please_stop', 'metricsReport',
'download', 'gather', 'metricsReport', 'on_cleanup', 'on_declare', 'on_housekeeping', 'on_report',
'on_sanity', 'on_start', 'on_stop', 'please_stop', 'poll', 'post', 'send',

]

Expand Down Expand Up @@ -129,6 +129,14 @@ def metricsReport(self) -> dict:

Return a dictionary of metrics. Example: number of messages remaining in retry queues.

def on_cleanup(self) -> None::
allow plugins to perform additional work after broker resources are eliminated.
local state files are still present when this runs.

def on_declare(self) -> None::
local state files are still already present when this runs.
allow plugins to perform additional work besides broker resource setup.

def on_housekeeping(self) -> None::

do periodic processing.
Expand Down
6 changes: 6 additions & 0 deletions sarracenia/flowcb/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ def stats(self):
logger.info("lag: average: %.2f, maximum: %.2f " %
(self.lagTotal / self.msgCount, self.lagMax))

def on_cleanup(self):
logger.info("hello")

def on_declare(self):
logger.info("hello")

def on_stop(self):
if set(['on_stop']) & self.o.logEvents:
self.stats()
Expand Down
10 changes: 5 additions & 5 deletions sarracenia/flowcb/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,6 @@ def __init__(self, options) -> None:

logger.debug('logLevel=%s' % self.o.logLevel)

def cleanup(self) -> None:
logger.debug('starting retry cleanup')
self.download_retry.cleanup()
self.post_retry.cleanup()

def after_accept(self, worklist) -> None:
"""
If there are only a few new messages, get some from the download retry queue and put them into
Expand Down Expand Up @@ -121,6 +116,11 @@ def metricsReport(self) -> dict:
"""
return {'msgs_in_download_retry': len(self.download_retry), 'msgs_in_post_retry': len(self.post_retry)}

def on_cleanup(self) -> None:
logger.debug('starting retry cleanup')
self.download_retry.cleanup()
self.post_retry.cleanup()

def on_housekeeping(self) -> None:
logger.info("on_housekeeping")

Expand Down
2 changes: 1 addition & 1 deletion sarracenia/redisqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def put(self, message_list):
logger.debug("rpush to list %s %s" % (self.key_name_new, message))
self.redis.rpush(self.key_name_new, self._msgToJSON(message))

def cleanup(self):
def on_cleanup(self):
"""
remove statefiles.
"""
Expand Down
67 changes: 66 additions & 1 deletion sarracenia/sr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,28 @@ def declare(self):
qdc.getSetup()
qdc.close()

# run on_declare plugins.
for f in self.filtered_configurations:
if f == 'audit': continue
if self.please_stop:
break

(c, cfg) = f.split(os.sep)

if not 'options' in self.configs[c][cfg]:
continue

o = self.configs[c][cfg]['options']
o.no=0
o.finalize()
if c not in [ 'cpost', 'cpump' ]:
flow = sarracenia.flow.Flow.factory(o)
flow.loadCallbacks()
flow.runCallbacksTime('on_declare')
del flow
flow=None


def disable(self):
if len(self.filtered_configurations) == 0:
logging.error('%s configuration not found', self.leftovers)
Expand Down Expand Up @@ -1469,6 +1491,7 @@ def cleanup(self):
'queueName': o.resolved_qname,
'message_strategy': { 'stubborn':True }
})
qdc.getSetup()
qdc.getCleanUp()
qdc.close()
queues_to_delete.append((o.broker, o.resolved_qname))
Expand Down Expand Up @@ -1500,10 +1523,31 @@ def cleanup(self):
'message_strategy': { 'stubborn':True }
})
if qdc:
qdc.putSetup()
qdc.putCleanUp()
qdc.close()

self.user_cache_dir
# run on_cleanup plugins.
for f in self.filtered_configurations:
if f == 'audit': continue
if self.please_stop:
break

(c, cfg) = f.split(os.sep)

if not 'options' in self.configs[c][cfg]:
continue

o = self.configs[c][cfg]['options']
o.no=0
o.finalize()
if c not in [ 'cpost', 'cpump' ]:
flow = sarracenia.flow.Flow.factory(o)
flow.loadCallbacks()
flow.runCallbacksTime('on_cleanup')
del flow
flow=None

for f in self.filtered_configurations:
if self.please_stop:
break
Expand Down Expand Up @@ -1806,6 +1850,27 @@ def sanity(self):
if not sarracenia.extras[l]['present']:
print( f"notice: python module {l} is missing: {sarracenia.extras[l]['lament']}" )

# run on_sanity plugins.
for f in self.filtered_configurations:
if f == 'audit': continue
if self.please_stop:
break

(c, cfg) = f.split(os.sep)

if not 'options' in self.configs[c][cfg]:
continue

o = self.configs[c][cfg]['options']
o.no=0
o.finalize()
if c not in [ 'cpost', 'cpump' ]:
flow = sarracenia.flow.Flow.factory(o)
flow.loadCallbacks()
flow.runCallbacksTime('on_sanity')
del flow
flow=None

def start(self):
""" Starting all components

Expand Down
Loading