Restoring blockmanifest transfers #1110

Merged Jun 17, 2024 · 21 commits
Commits
f768412
modernizing old plugin
petersilva Jun 7, 2024
c236c58
adding definitions of new report codes so they are not unknown
petersilva Jun 7, 2024
82da104
adding documentation for logFormat
petersilva Jun 7, 2024
5e52145
sr3 --full status crash
petersilva Jun 7, 2024
88116ec
updating with recent changes
petersilva Jun 7, 2024
cbcd6ec
update changelog
petersilva Jun 7, 2024
7144f24
restore old fashioned rename header processing to wmotypesuffice, wil…
petersilva Jun 7, 2024
975bc5f
adjust set string parsing to deal with +- ... result of conversion
petersilva Jun 8, 2024
906aa04
get convert not to prepend a + to fileEvents if a sign is already pre…
petersilva Jun 8, 2024
ece3c99
adding a note about the existence,importance, and use of unit tests f…
petersilva Jun 11, 2024
dd7b75e
create full missing list when reading manifest for first time
petersilva Jun 11, 2024
6e050b7
separate block manifest management in block_reassembly vs not case
petersilva Jun 12, 2024
c23cc66
alternate name for block_reassembly
petersilva Jun 12, 2024
a4bdc7d
replace block_reassemble option with flow state var
petersilva Jun 12, 2024
4d3eaa1
aesthetic change of working variable name
petersilva Jun 12, 2024
b2e0bca
demoting some debug prints an obsolete comment.
petersilva Jun 12, 2024
ef14b70
fix crash removing manifest files
petersilva Jun 12, 2024
7ea2b29
adding 206 report code
petersilva Jun 12, 2024
8e22567
can only have one string per report code
petersilva Jun 12, 2024
d75ee14
set directory for removal of block manifest file
petersilva Jun 12, 2024
79d685e
reducing logging to reasonable level
petersilva Jun 12, 2024
18 changes: 14 additions & 4 deletions debian/changelog
@@ -1,17 +1,27 @@
metpx-sr3 (3.00.54) UNRELEASED; urgency=medium

* merge PR #1067 (closing #824 )
* messages reviewed & consolidated: #1094, #1099 (de-cluttering.)
* add http metadata to scheduled flows. #1084
* only run after_post when actually posted #1101
* when plugins go bad, report better, recover better: #1085, #1091,
* performance improvements #1083, #1086
* crashes/problems with statehost #1076, #1087, #1096
* fix #1104 message rate per cpu second and cpuS state.
* fix #1097 better parsing of low fractional rates.
* added logFormat option.
* sr3 status flow state detail improved.
* adds lag,rtry,slow,reje states to status display.
* adds checks for running process to cleanup, fail if running.
* adds cleanup to remove, don't remove if cleanup fails.
* adds progressive logs to transfers (closing #966)
* several fixes for sender crashes resulting from changes in try/except scope.
* several fixes for sender crashes resulting from changes in try/except
scope. #1091, #1095,
* add #1054 can now convert multiple configs, and overwrite (with --wololo)
* fixed #1064 poll crash.
* fixed #927 sanity not restarting crashed polls.
* bug fixes and unit tests for AM
* bug fixes and unit tests for AM ( #1036, #1074, #1078, #1079 )
* many unit tests added, improved coverage (though still poor.)
* many other plugin improvements.
* many other improvements and fixes in core and plugins.
* search function restored on web-site documentation.

-- SSC-5CD2310S60 <[email protected]> Fri, 17 May 2024 12:29:22 -0400
17 changes: 17 additions & 0 deletions docs/source/Contribution/Development.rst
@@ -142,6 +142,15 @@ Both v2 and v3 are supported on the stable branch of sr_insects. That branch sh
used to support all development in both versions....


Unit tests
~~~~~~~~~~

The tests/ sub-directory contains a woefully incomplete but growing set of unit tests
using the *pytest* framework. These tests are only exercised on Ubuntu 22.04 at the moment.
Consult tests/README.md for how to run them. Passing unit tests (or a very good explanation
of why they *temporarily* fail) should be another gate before merging to the main *development* branch.


Local Installation
------------------

@@ -291,6 +300,14 @@ multipass launch -m 8G bionic
'''

Developer tests can be run with multipass as described above.
The following files need to be edited first:

* requirements.txt (remove paramiko and watchdog)
* setup.py (remove install_requires for paramiko and watchdog)

These packages have comedically complex dependencies. Install them separately
and they will be usable by sr3, but listing them as a requirement breaks things.


Python Wheel
~~~~~~~~~~~~
9 changes: 9 additions & 0 deletions docs/source/Reference/sr3_options.7.rst
@@ -1054,6 +1054,15 @@ other values: on_start, on_stop, post, gather, ... etc... It is comma separated,
if the list starts with a plus sign (+) then the selected events are appended to current value.
A minus sign (-) can be used to remove events from the set.
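
The sign-prefix semantics described above can be sketched as follows. This is a minimal illustration rather than sr3's actual parser: the function name `parse_event_set` is hypothetical, and it assumes a comma-separated value with optional leading signs, where the last sign seen decides the operation.

```python
def parse_event_set(value: str, current: set) -> set:
    """Hypothetical helper illustrating +/- set options (not the sr3 API)."""
    if value == 'None':
        return set()
    op = 'r'  # 'r' means: replace the current set outright
    # consume leading signs; the last one wins (e.g. "+-link" removes)
    while value and value[0] in ('+', '-'):
        op = value[0]
        value = value[1:]
    selected = set(value.split(',')) if value else set()
    if op == '+':
        return current | selected
    if op == '-':
        return current - selected
    return selected


events = {'create', 'modify'}
added = parse_event_set('+delete', events)        # append to current value
removed = parse_event_set('-modify', events)      # remove from current value
replaced = parse_event_set('link,mkdir', events)  # no sign: replace
```

Guarding the loop with `while value and ...` is a choice made for this sketch so that a bare sign does not index into an empty string.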

logFormat ( default: %(asctime)s [%(levelname)s] %(name)s %(funcName)s %(message)s )
------------------------------------------------------------------------------------

The *logFormat* option is passed directly to python logging mechanisms and can be used
to control what is written to log files. The format is documented here:

* https://docs.python.org/3/library/logging.html#logrecord-attributes
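
As an illustration of what such a format string produces, the sketch below feeds the default value quoted in the heading to Python's standard `logging.Formatter`. The logger name `logformat_demo` and the function `announce` are made up for the example; how sr3 itself wires the option into its handlers is not shown here.

```python
import io
import logging

# default format string quoted in the option heading above
LOG_FORMAT = '%(asctime)s [%(levelname)s] %(name)s %(funcName)s %(message)s'

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter(LOG_FORMAT))

demo_logger = logging.getLogger('logformat_demo')  # hypothetical logger name
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False  # keep the demo output out of the root logger
demo_logger.addHandler(handler)


def announce():
    # %(funcName)s will expand to "announce" for this call
    demo_logger.info('hello')


announce()
line = stream.getvalue()  # timestamp, then "[INFO] logformat_demo announce hello"
```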


logLevel ( default: info )
--------------------------

9 changes: 9 additions & 0 deletions docs/source/fr/Contribution/Développement.rst
@@ -127,6 +127,15 @@ Le référentiel sr_insects a sa propre base de données de problèmes, et le tr
Les versions 2 et 3 sont prises en charge sur la branche principale de sr_insects. Cette branche devrait être
utilisé pour prendre en charge tout le développement dans les deux versions...

Tests unitaires
~~~~~~~~~~~~~~~

Le sous-répertoire tests/ contient un ensemble malheureusement incomplet mais croissant de tests unitaires
qui utilisent le framework *pytest*. Ces tests requiert Ubuntu 22.04 pour le moment.
Consultez tests/README.md pour savoir comment les exécuter. Réussite des tests unitaires (ou une très bonne explication
pourquoi ils échouent *temporairement*) devrait être une autre porte avant de fusionner avec la branche *developpement* principale.


Installation locale
-------------------

10 changes: 10 additions & 0 deletions docs/source/fr/Reference/sr3_options.7.rst
@@ -1037,8 +1037,18 @@ messages de journal. Autres valeurs : on_start, on_stop, post, gather, ... etc..
On peut débuter la valeur avec un plus (+) pour signifier un ajout au valeurs actuels.
la valeur moins (-) signifie la soustraction des valeurs de l´ensemble actuel.

LogFormat ( default: %(asctime)s [%(levelname)s] %(name)s %(funcName)s %(message)s )
------------------------------------------------------------------------------------

L'option *LogFormat* est passée directement au mécanismes de contrôle des journalisation
de python. Le format est documenté ici:

* https://docs.python.org/fr/3/library/logging.html#logrecord-attributes


logLevel ( défaut: info )
-------------------------

Niveau de journalisation exprimé par la journalisation de python. Les valeurs possibles sont :
critical, error, info, warning, debug.

19 changes: 12 additions & 7 deletions sarracenia/__init__.py
@@ -356,18 +356,23 @@ def durationToSeconds(str_value, default=None) -> float:

return duration

"""
report codes are cribbed from HTTP, when a new situation arises, just peruse a list,
and pick one that fits. Should also be easier for others to use:

https://en.wikipedia.org/wiki/List_of_HTTP_status_codes

"""
known_report_codes = {
- 201:
- "Download successful. (variations: Downloaded, Inserted, Published, Copied, or Linked)",
+ 201: "Download successful. (variations: Downloaded, Inserted, Published, Copied, or Linked)",
202: "Accepted. mkdir skipped as it already exists",
203: "Non-Authoritative Information: transformed during download.",
- 205:
- "Reset Content: truncated. File is shorter than originally expected (changed length during transfer) This only arises during multi-part transfers.",
205: "Reset Content: checksum recalculated on receipt.",
- 304:
- "Not modified (Checksum validated, unchanged, so no download resulted.)",
+ 206: "Partial Content: received and inserted.",
+ 304: "Not modified (Checksum validated, unchanged, so no download resulted.)",
307: "Insertion deferred (writing to temporary part file for the moment.)",
417: "Expectation Failed: invalid notification message (corrupt headers)",
422: "Unprocessable Content: could not determine path to transfer to",
499: "Failure: Not Copied. SFTP/FTP/HTTP download problem",
#FIXME : should not have 503 error code 3 times in a row
# 503: "Service unavailable. delete (File removal not currently supported.)",
@@ -743,7 +748,7 @@ def setReport(msg, code, text=None):
text = 'unknown disposition'

if 'report' in msg:
- logger.warning('overriding initial report: %d: %s' %
+ logger.debug('overriding initial report: %d: %s' %
(msg['report']['code'], msg['report']['message']))

msg['report'] = {'code': code, 'timeCompleted': nowstr(), 'message': text}
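
The idea of borrowing report codes from HTTP can be sketched with the standard library's `http.HTTPStatus`. This is an illustrative lookup only: `describe` is a hypothetical helper, the table is a three-entry excerpt of the one in this diff, and the fallback to HTTP's own phrase is an assumption of the sketch, not something `setReport` is shown to do.

```python
from http import HTTPStatus

# excerpt of the table in this diff; one string per code
known_report_codes = {
    205: "Reset Content: checksum recalculated on receipt.",
    206: "Partial Content: received and inserted.",
    417: "Expectation Failed: invalid notification message (corrupt headers)",
}


def describe(code: int) -> str:
    """Hypothetical lookup: local table first, then HTTP's standard phrase."""
    if code in known_report_codes:
        return known_report_codes[code]
    try:
        return HTTPStatus(code).phrase
    except ValueError:
        # same fallback text the diff uses for unrecognized codes
        return 'unknown disposition'


# each local description starts with the standard HTTP phrase for its code
prefixes_match = all(
    text.startswith(HTTPStatus(code).phrase)
    for code, text in known_report_codes.items()
)
```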
12 changes: 6 additions & 6 deletions sarracenia/blockmanifest.py
@@ -74,13 +74,16 @@ def __init__(self,path):

self.lock.lock()

- self.x = None
- self.new_x = None
+ self.x = {}
+ self.new_x = {}

if os.path.exists(self.path):
self.fd = open(self.path,"r+")
s=self.fd.read()
- self.x = json.loads(s)
+ try:
+     self.x = json.loads(s)
+ except Exception as ex:
+     pass

for k in ['manifest', 'waiting' ]:
if k in self.x:
@@ -127,12 +130,9 @@ def persist(self):
return

if self.new_x and (self.new_x != self.x):
- logger.info( f"overwriting" )
self.fd.seek(0)
self.fd.write(json.dumps(self.new_x,sort_keys=True,indent=4))
self.fd.truncate()
- else:
- logger.info( f"closing unchanged" )

self.fd.close()
self.lock.unlock()
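
The first hunk of this file makes manifest loading tolerant: the state starts as an empty dict and a parse failure leaves it empty instead of raising. A standalone sketch of that pattern follows. `load_manifest` is a hypothetical helper, not part of `BlockManifest`; it omits the file locking the class performs and catches the narrower `json.JSONDecodeError` rather than `Exception`.

```python
import json
import os


def load_manifest(path: str) -> dict:
    """Hypothetical helper: return {} when the file is missing or unparseable."""
    if not os.path.exists(path):
        return {}
    with open(path, 'r') as fd:
        try:
            data = json.load(fd)
        except json.JSONDecodeError:
            # corrupt or empty manifest: start over with an empty state
            return {}
    # a manifest is expected to be a JSON object; anything else is ignored
    return data if isinstance(data, dict) else {}
```

Starting from `{}` rather than `None` keeps later membership tests such as `'waiting' in manifest` safe without extra checks.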
11 changes: 4 additions & 7 deletions sarracenia/config.py
@@ -85,7 +85,6 @@ def __repr__(self) -> str:
'batch' : 100,
'baseDir': None,
'baseUrl_relPath': False,
- 'block_reassemble': True,
'delete': False,
'documentRoot': None,
'download': False,
@@ -141,7 +140,7 @@ def __repr__(self) -> str:


# all the boolean settings.
- flag_options = [ 'acceptSizeWrong', 'acceptUnmatched', 'amqp_consumer', 'baseUrl_relPath', 'block_reassemble', 'debug', \
+ flag_options = [ 'acceptSizeWrong', 'acceptUnmatched', 'amqp_consumer', 'baseUrl_relPath', 'debug', \
'delete', 'discard', 'download', 'dry_run', 'durable', 'exchangeDeclare', 'exchangeSplit', 'logReject', 'realpathFilter', \
'follow_symlinks', 'force_polling', 'inline', 'inlineOnly', 'inplace', 'logMetrics', 'logStdout', 'logReject', 'restore', \
'messageDebugDump', 'mirror', 'timeCopy', 'notify_only', 'overwrite', 'post_on_start', \
@@ -1042,11 +1041,10 @@ def _parse_set_string( self, v:str, old_value: set ) -> set:
if v == 'None':
sv=set([])
else:
- if v[0] in [ '+', '-']:
+ op='r'
+ while v[0] in [ '+', '-']:
op=v[0]
v=v[1:]
- else:
- op='r'

if ',' in v:
sv=set(v.split(','))
@@ -1057,6 +1055,7 @@ def _parse_set_string( self, v:str, old_value: set ) -> set:
sv= old_value | sv
elif op == '-' :
sv= old_value - sv

return sv

def add_option(self, option, kind='list', default_value=None, all_values=None ):
@@ -1501,8 +1500,6 @@ def parse_line(self, component, cfg, cfname, lineno, l ):
if k == 'continue':
return

- #FIXME: note for Clea, line conversion to v3 complete here.

line = list(map(lambda x: self._varsub(x), line))

if len(line) == 1:
13 changes: 8 additions & 5 deletions sarracenia/flow/__init__.py
@@ -208,6 +208,9 @@ def __init__(self, cfg=None):

self.plugins['load'].extend(self.o.destfn_scripts)

self.block_reassembly_active = 'block_reassembly' in self.plugins['load'] or \
'sarracenia.flowcb.block_reassembly' in self.plugins['load']

# metrics - dictionary with names of plugins as the keys
self.metrics_lastWrite=0
self.metricsFlowReset()
@@ -2680,11 +2683,11 @@ def set_local_file_attributes(self, local_file, msg):
logger.debug("%s" % local_file)

# if the file is not partitioned, the the onfly_checksum is for the whole file.
- # cache it here, along with the mtime.
-
- if ('blocks' in msg) and sarracenia.features['reassembly']['present']:
-     with sarracenia.blockmanifest.BlockManifest(local_file) as y:
-         y.set( msg['blocks'] )
+ # cache it here, along with the mtime, unless block_reassembly plugin is active...
+ if ('blocks' in msg) and sarracenia.features['reassembly']['present'] and not self.block_reassembly_active:
+     with sarracenia.blockmanifest.BlockManifest(local_file) as bm:
+         bm.set( msg['blocks'] )

x = sarracenia.filemetadata.FileMetadata(local_file)
# FIXME ... what to do when checksums don't match?
13 changes: 12 additions & 1 deletion sarracenia/flowcb/accept/wmotypesuffix.py
@@ -43,6 +43,10 @@ def __find_type(self, TT):

def after_accept(self, worklist):
for message in worklist.incoming:

if 'fileOp' in message and 'directory' in message['fileOp']:
continue

type_suffix = self.__find_type(message['new_file'][0:2])
## FIXME confused as to how this could ever be true since find_type never returns "UNKNOWN"
#if type_suffix == 'UNKNOWN':
@@ -53,6 +57,13 @@ def after_accept(self, worklist):
continue

message['new_file'] = message['new_file'] + type_suffix

if 'rename' in message:
- message['rename'] = message['rename'] + type_suffix
+ message['rename'] += type_suffix
+
+ if 'fileOp' in message and 'rename' in message['fileOp']:
+     message['fileOp']['rename'] += type_suffix

# TODO else -> worklist.rejected.append(message) ?? should this be happening at any point?
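
Taken together, the two hunks above skip directory operations and then append the suffix to every name the message carries. A self-contained sketch of that logic on a plain dict follows. `append_type_suffix` is a hypothetical helper, and the sample values (`'SACN31_CWAO'`, `'.metar'`) are made up for illustration; the real plugin derives the suffix from the first two characters of `new_file`.

```python
def append_type_suffix(message: dict, type_suffix: str) -> dict:
    """Hypothetical helper mirroring the after_accept changes in this diff."""
    # directory operations carry no file content, so leave them alone
    if 'fileOp' in message and 'directory' in message['fileOp']:
        return message

    message['new_file'] += type_suffix

    # old-style rename header
    if 'rename' in message:
        message['rename'] += type_suffix

    # rename expressed as a file operation
    if 'fileOp' in message and 'rename' in message['fileOp']:
        message['fileOp']['rename'] += type_suffix

    return message


plain = append_type_suffix({'new_file': 'SACN31_CWAO', 'rename': 'SACN31_CWAO'}, '.metar')
mkdir = append_type_suffix({'new_file': 'subdir', 'fileOp': {'directory': ''}}, '.metar')
```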


23 changes: 14 additions & 9 deletions sarracenia/flowcb/block_reassembly.py
@@ -117,7 +117,7 @@ def after_work(self, worklist) -> None:
blksz=humanfriendly.parse_size(blk_suffix[1],binary=True)

if blkno != m['blocks']['number']:
- logger.warning(" mismatch {m['relPath']} name says {blkno} but message says {m['block']['number']}" )
+ logger.warning( f"mismatch {m['relPath']} name says {blkno} but message says {m['block']['number']}" )
blkno = m['blocks']['number']

#determine root file name.
@@ -131,7 +131,7 @@ def after_work(self, worklist) -> None:
flck = flufl.lock.Lock(lock_file)

flck.lock()
- logger.info( f"10 locked {flck} lock_file: {lock_file}" )
+ #logger.debug( f"10 locked {flck} lock_file: {lock_file}" )

pf=open(part_file,'rb')

@@ -148,7 +148,7 @@ def after_work(self, worklist) -> None:
old_blocks = rfm.get()

if old_blocks and not 'waiting' in old_blocks:
- old_blocks['waiting'] = {}
+ old_blocks['waiting'] = m['blocks']['manifest'].copy()

# calculate old file size.
if old_blocks and 'manifest' in old_blocks:
@@ -165,8 +165,9 @@ def after_work(self, worklist) -> None:

# update old_blocks to reflect receipt of this block.
if old_blocks and 'manifest' in old_blocks:
- logger.info( f" read old block manifest from attributes: {old_blocks['manifest']}" )
- logger.info( f" also show waiting: {old_blocks['waiting']}" )
+ logger.debug( f" read {len(old_blocks['manifest'])} blocks in manifest, waiting for {len(old_blocks['waiting'])} " )
+ logger.debug( f" read old block manifest from attributes: {old_blocks['manifest']}" )
+ logger.debug( f" also show waiting: {old_blocks['waiting']}" )
found=False
sz=0
# add
@@ -181,7 +182,6 @@ def after_work(self, worklist) -> None:

if blkno in old_blocks['waiting']:
del old_blocks['waiting'][blkno]
- logger.info( f"deleted block {blkno} from waiting: {len(old_blocks['waiting'])} left. ")

# calculate where to seek to...
offset=0
@@ -195,7 +195,11 @@ def after_work(self, worklist) -> None:
byteCount = m['blocks']['manifest'][blkno]['size']

logger.info( f" blocks: adding block {blkno} by seeking to: {offset} to write {byteCount} bytes in {root_file}" )
- logger.info( f" still waiting for: {len(old_blocks['waiting'])} " )
+ #if len(old_blocks['waiting']) > 0 :
+ #    logger.info( f" still waiting for: {len(old_blocks['waiting'])} " )
+ #else:
+ #    logger.info( f" we have received every block now." )

#- {old_blocks['waiting']} " )

# FIXME: can seek ever fail? how do we check?
@@ -231,7 +235,7 @@ def after_work(self, worklist) -> None:
"""
with sarracenia.blockmanifest.BlockManifest(root_file) as rfm:
rfm.set(old_blocks)
- m.setReport( 206, f"file block subset {m['blocks']['number']} received and reassembled ok. waiting for {(len(old_blocks['waiting']))} more blocks." )
+ m.setReport( 206, f"file block subset {m['blocks']['number']} received and written ok. waiting for {(len(old_blocks['waiting']))} more blocks." )
worklist.rejected.append(m)
else:
# FIXME: for inflight. now rename the file to the real name.
@@ -244,8 +248,9 @@ def after_work(self, worklist) -> None:
logger.info( f"completed reassembly of {m['relPath']}" )
new_ok.append(m)
if hasattr(self.o, 'block_manifest_delete') and self.o.block_manifest_delete:
- manifest = msg['new_file'] + "§block_manifest§"
+ manifest = m['new_dir'] + os.sep + m['new_file'] + "§block_manifest§"
if os.path.exists(manifest):
+ logger.info( f"deleting {manifest}")
os.unlink(manifest)
else:
del old_blocks['waiting']
4 changes: 2 additions & 2 deletions sarracenia/flowcb/gather/file.py
@@ -260,8 +260,8 @@ def post_file_in_parts(self, path, lstat):

if features['reassembly']['present'] and \
(not hasattr(self.o, 'block_manifest_delete') or not self.o.block_manifest_delete):
- with sarracenia.blockmanifest.BlockManifest( path ) as x:
-     x.set(msg['blocks'])
+ with sarracenia.blockmanifest.BlockManifest( path ) as bm:
+     bm.set(msg['blocks'])

messages = []
for current_block in blocks:
5 changes: 3 additions & 2 deletions sarracenia/sr.py
@@ -2600,7 +2600,7 @@ def status(self):
naturalSize(m["transferRxFiles"]).replace("B","F").replace("Fyte","File"), \
naturalSize(m["transferTxBytes"]), \
naturalSize(m["transferTxFiles"]).replace("B","F").replace("Fyte","File"), \
- time_base )
+ m["time_base"] )
else:
line += " %10s %10s %9s %5s %5s %10s %8s" % ( "-", "-", "-", "-", "-", "-", "-" )
if self.options.displayFull:
@@ -2805,7 +2805,8 @@ def convert1(self,cfg):
if 'none' in line[1].lower():
v=line[1]
else:
- line[1]= '+' + line[1]
+ if line[1][0] not in ['+','-']:
+     line[1]= '+' + line[1]
v=line[1]

if k == 'continue':