Issue1035 retry too much, reduce cpu to some extent... #1086

Merged
merged 7 commits on Jun 4, 2024
5 changes: 2 additions & 3 deletions sarracenia/config.py
@@ -99,6 +99,7 @@ def __repr__(self) -> str:
     'identity_method': 'sha512',
     'logMetrics': False,
     'logStdout': False,
+    'metrics_writeInterval': 5,
     'runStateThreshold_lag': 30,
     'nodupe_driver': 'disk',
     'nodupe_ttl': 0,
@@ -152,7 +153,7 @@ def __repr__(self) -> str:
 float_options = [ ]

 duration_options = [
-    'expire', 'housekeeping', 'logRotateInterval', 'message_ttl', 'fileAgeMax', 'fileAgeMin', \
+    'expire', 'housekeeping', 'logRotateInterval', 'message_ttl', 'fileAgeMax', 'fileAgeMin', 'metrics_writeInterval', \
     'runStateThreshold_idle', 'runStateThreshold_lag', 'retry_ttl', 'runStateThreshold_hung', 'sleep', 'timeout', 'varTimeOffset'
 ]

@@ -264,8 +265,6 @@ def __repr__(self) -> str:
     'msg_rename_dmf': [ 'callback', 'accept.renamedmf.RenameDMF'],
     'msg_hour_tree': [ 'callback', 'accept.hourtree.HourTree'],
     'msg_renamer': [ 'callback', 'accept.renamer.Renamer'],
-    'msg_2http': [ 'callback', 'accept.tohttp.ToHttp'],
-    'msg_2local': [ 'callback', 'accept.tolocal.ToLocal'],
     'msg_http_to_https': [ 'callback', 'accept.httptohttps.HttpToHttps'],
     'msg_speedo': [ 'callback', 'accept.speedo.Speedo'],
     'msg_WMO_type_suffix': [ 'callback', 'accept.wmotypesuffix.WmoTypeSuffix'],
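The new metrics_writeInterval option defaults to 5 (seconds) and is registered as a duration option, so it can be tuned per flow. A hypothetical override, assuming the duration parser accepts a bare number of seconds; only the option name comes from this PR, the surrounding lines are illustrative:

    # illustrative subscriber config: write the metrics file at most every 30s
    logMetrics True
    metrics_writeInterval 30
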
43 changes: 29 additions & 14 deletions sarracenia/diskqueue.py
@@ -109,7 +109,11 @@ def __init__(self, options, name):

         # initialize ages and message counts

+        # msg_count is the number of messages available for retry in this interval
         self.msg_count = 0
+
+        # msg_count_new is the number of messages added for retry in this interval
+        # ... new messages will only be available in the next interval.
         self.msg_count_new = 0

         if not os.path.isfile(self.queue_file):
@@ -204,7 +208,7 @@ def __len__(self) -> int:
         Returns:
             int: number of messages in the DiskQueue.
         """
-        return self.msg_count + self.msg_count_new
+        return max(0,self.msg_count) + self.msg_count_new

     def msgFromJSON(self, line):
         try:
@@ -223,27 +227,39 @@ def get(self, maximum_messages_to_get=1):
"""
qty number of messages to retrieve from the queue.

no housekeeping in get ...
if no message (and new or state file there)
we wait for housekeeping to present retry messages
"""

if self.msg_count < 0:
return []
elif self.msg_count == 0:
try:
os.unlink(self.queue_file)
self.queue_fp.close()
except Exception as ex:
pass

self.queue_fp=None
self.msg_count=-1
return []

ml = []
count = 0
while count < maximum_messages_to_get:

# if the retry queue is empty, no sense looping.
mx = self.msg_count if self.msg_count < maximum_messages_to_get else maximum_messages_to_get

while count < mx:
self.queue_fp, message = self.msg_get_from_file(
self.queue_fp, self.queue_file)

# FIXME MG as discussed with Peter
# no housekeeping in get ...
# if no message (and new or state file there)
# we wait for housekeeping to present retry messages
if not message:
try:
os.unlink(self.queue_file)
except:
pass
self.queue_fp = None
#logger.debug("MG DEBUG retry get return None")
break
self.msg_count=0
return

count += 1
if self.is_expired(message):
#logger.error("MG invalid %s" % message)
continue
@@ -253,7 +269,6 @@ def get(self, maximum_messages_to_get=1):
             message['_deleteOnPost'].remove('ack_id')

             ml.append(message)
-            count += 1

         self.msg_count -= count

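The counter protocol above is easy to misread: msg_count is what get() may hand out in the current interval, msg_count_new accumulates arrivals that only become retryable later, and a negative msg_count marks a consumed-and-unlinked queue file. A minimal standalone sketch of that life cycle, with stand-in methods rather than the real DiskQueue file handling (promote_new is hypothetical; in sarracenia the promotion happens around housekeeping):

    # sketch of the DiskQueue counter life cycle (simplified, not the real class)
    class CounterSketch:
        def __init__(self):
            self.msg_count = 0       # retryable in the current interval
            self.msg_count_new = 0   # queued now, retryable in a later interval

        def put(self, messages):
            self.msg_count_new += len(messages)

        def get(self, maximum_messages_to_get=1):
            if self.msg_count <= 0:
                return []            # nothing retryable yet; wait for a refill
            mx = min(self.msg_count, maximum_messages_to_get)
            got = ['msg'] * mx       # stand-in for msg_get_from_file()
            self.msg_count -= mx
            return got

        def promote_new(self):       # hypothetical name for the refill step
            self.msg_count += self.msg_count_new
            self.msg_count_new = 0

        def __len__(self):
            return max(0, self.msg_count) + self.msg_count_new

Bounding the loop with mx is where the CPU saving in the PR title comes from: get() no longer spins through file reads when the queue has nothing available.
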
25 changes: 16 additions & 9 deletions sarracenia/flow/__init__.py
@@ -211,6 +211,7 @@ def __init__(self, cfg=None):
         self.plugins['load'].extend(self.o.destfn_scripts)

         # metrics - dictionary with names of plugins as the keys
+        self.metrics_lastWrite=0
         self.metricsFlowReset()

         self.had_vip = not os.path.exists( self.o.novipFilename )
@@ -571,7 +572,7 @@ def run(self):

             else: # normal processing, when you are active.
                 self.work()
-                self.post()
+                self.post(now)

             now = nowflt()
             run_time = now - start_time
@@ -1185,7 +1186,7 @@ def work(self) -> None:



-    def post(self) -> None:
+    def post(self,now) -> None:

         if len(self.plugins["post"]) > 0:

@@ -1207,10 +1208,16 @@ def post(self) -> None:
             self._runCallbacksWorklist('report')
             self._runCallbackMetrics()

-            if hasattr(self.o, 'metricsFilename' ) and os.path.isdir(os.path.dirname(self.o.metricsFilename)):
+            if hasattr(self.o, 'metricsFilename' ) \
+                and now > self.metrics_lastWrite+self.o.metrics_writeInterval:
+
+                # assume dir always exist... should check on startup, not here.
+                # if os.path.isdir(os.path.dirname(self.o.metricsFilename)):
                 metrics=json.dumps(self.metrics)
                 with open(self.o.metricsFilename, 'w') as mfn:
                     mfn.write(metrics+"\n")
+                self.metrics_lastWrite=now

             if self.o.logMetrics:
                 if self.o.logRotateInterval >= 24*60*60:
                     tslen=8
@@ -1222,12 +1229,12 @@ def post(self) -> None:
                 with open(self.o.metricsFilename + '.' + timestamp[0:tslen], 'a') as mfn:
                     mfn.write( f'\"{timestamp}\" : {metrics},\n')

-                # removing old metrics files
-                logger.info( f"looking for old metrics for {self.o.metricsFilename}" )
-                old_metrics=sorted(glob.glob(self.o.metricsFilename+'.*'))[0:-self.o.logRotateCount]
-                for o in old_metrics:
-                    logger.info( f"removing old metrics file: {o} " )
-                    os.unlink(o)
+                    # removing old metrics files
+                    logger.info( f"looking for old metrics for {self.o.metricsFilename}" )
+                    old_metrics=sorted(glob.glob(self.o.metricsFilename+'.*'))[0:-self.o.logRotateCount]
+                    for o in old_metrics:
+                        logger.info( f"removing old metrics file: {o} " )
+                        os.unlink(o)

         self.worklist.ok = []
         self.worklist.directories_ok = []
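The metrics change is a plain last-write timestamp throttle: post() now skips the JSON dump unless metrics_writeInterval seconds have passed since the previous write. The same pattern in isolation (names simplified, not the Flow class itself):

    import json
    import time

    class ThrottledMetricsWriter:
        def __init__(self, filename, write_interval=5):
            self.filename = filename
            self.write_interval = write_interval
            self.last_write = 0

        def maybe_write(self, metrics, now=None):
            # write only when the interval has elapsed; otherwise skip the disk I/O
            now = time.time() if now is None else now
            if now <= self.last_write + self.write_interval:
                return False
            with open(self.filename, 'w') as f:
                f.write(json.dumps(metrics) + '\n')
            self.last_write = now
            return True

Note that the isdir() check that previously ran on every post() is gone; the commented-out line records the assumption that the directory exists and should be verified once at startup instead.
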
49 changes: 0 additions & 49 deletions sarracenia/flowcb/accept/tohttp.py

This file was deleted.

82 changes: 0 additions & 82 deletions sarracenia/flowcb/accept/tolocal.py

This file was deleted.

2 changes: 1 addition & 1 deletion sarracenia/flowcb/block_reassembly.py
@@ -36,7 +36,7 @@ class Block_reassembly(FlowCB):
     equivalent to:


-    reject .*\.flufl_lock.*
+    reject .*\\.flufl_lock.*
     reject .*§block_manifest§.*

     callback_prepend block_reassembly
2 changes: 1 addition & 1 deletion sarracenia/flowcb/destfn/sample.py
@@ -1,7 +1,7 @@
"""

a destfn plugin script is used by senders or subscribers to do complex file naming.
this is an API demonstrator that prefixes the name delivered with 'renamed\_'::
this is an API demonstrator that prefixes the name delivered with 'renamed\\_'::

filename DESTFNSCRIPT=sarracenia.flowcb.destfn.sample.Sample

5 changes: 4 additions & 1 deletion sarracenia/flowcb/filter/pclean_f90.py
@@ -74,7 +74,10 @@ def after_accept(self, worklist):
                          if d[0] != ' '] # Diffs without context
             logger.info("a: len(%s) = %d" % (f20_path, len(f20_lines)))
             logger.info("b: len(%s) = %d" % (path, len(f_lines)))
-            logger.info("diffs found:\n{}".format("".join(diff)))
+            if len(f20_lines) > 10 or len(f_lines) > 10:
+                logger.info(" long diff omitted ")
+            else:
+                logger.info("diffs found:\n{}".format("".join(diff)))

             if not result:
                 logger.info('queued for retry because propagation not done yet.')
2 changes: 1 addition & 1 deletion sarracenia/flowcb/poll/rate_limit.py
@@ -6,7 +6,7 @@

 Limits how frequenty a poll accesses a remote server.

-\*This limits the number of lsdir requests made to the server. If the poll that you want to rate limit doesn't
+\\*This limits the number of lsdir requests made to the server. If the poll that you want to rate limit doesn't
 call ``sarracenia.flowcb.poll.Poll.poll_directory``, then it won't work.

 Configurable Options:
2 changes: 1 addition & 1 deletion sarracenia/flowcb/poll/usgs.py
@@ -105,7 +105,7 @@ def poll(self):
             gathered_messages.append(m)
         elif status_code == 403:
             logger.error(
-                '''poll_usgs: USGS has determined your usage is excessive and \
+                '''poll_usgs: USGS has determined your usage is excessive and \
                 blocked your IP. Use the contact form on their site to be \
                 unblocked.''')
         else:
2 changes: 1 addition & 1 deletion sarracenia/flowcb/scheduled/wiski.py
@@ -210,7 +210,7 @@ def gather(self,messageCountMax): # placeholder
         flow.o.scheduled_interval= 5
         flow.o.pollUrl = "https://kiwis.opg.com"
         if sys.platform.startswith( "win" ):
-            flow.o.directory = "C:\\temp\wiski"
+            flow.o.directory = "C:\\temp\\wiski"
         else:
             flow.o.directory = "/tmp/wiski/${%Y%m%d}"
         logging.basicConfig(level=logging.DEBUG)
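The fix doubles the backslash so "\w" is no longer an invalid escape sequence (Python leaves unknown escapes literal but warns about them). A raw string would sidestep the escaping entirely; this is an equivalent alternative, not what the PR chose:

    flow.o.directory = r"C:\temp\wiski"  # raw string: backslashes taken literally
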
4 changes: 3 additions & 1 deletion tests/sarracenia/diskqueue_test.py
@@ -209,6 +209,7 @@ def test_get__Single(tmp_path):
     fp.write(line)
     fp.flush()
     fp.close()
+    download_retry.msg_count = 1

     gotten = download_retry.get()

@@ -227,6 +228,7 @@ def test_get__Multi(tmp_path):
     fp.write(line + line)
     fp.flush()
     fp.close()
+    download_retry.msg_count = 2

     gotten = download_retry.get(2)

@@ -287,4 +289,4 @@ def test_on_housekeeping(tmp_path, caplog):

     assert log_found_HasQueue == True
     assert log_found_NumMessages == True
-    assert log_found_Elapsed == True
\ No newline at end of file
+    assert log_found_Elapsed == True
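Because get() now consults msg_count before touching the file, any test (or caller) that writes the queue file directly must prime the counter itself, as the two hunks above do. A sketch of the failure mode they guard against, where make_options and message_line stand in for the existing test fixtures:

    def test_get__requires_primed_count(tmp_path):
        # sketch: msg_count is still 0, so get() returns nothing even though
        # a valid message line is sitting in the queue file on disk.
        download_retry = DiskQueue(make_options(tmp_path), 'work_retry_00')
        with open(download_retry.queue_file, 'w') as fp:
            fp.write(message_line)
        assert download_retry.get() == []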