Skip to content

Commit

Permalink
Merge pull request #1110 from MetPX/restoring_blockmanifest_transfers
Browse files Browse the repository at this point in the history
Restoring blockmanifest transfers
  • Loading branch information
petersilva authored Jun 17, 2024
2 parents afd88d8 + 79d685e commit b0277c4
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 31 deletions.
14 changes: 9 additions & 5 deletions sarracenia/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,16 +356,20 @@ def durationToSeconds(str_value, default=None) -> float:

return duration

"""
Report codes are cribbed from HTTP. When a new situation arises, just peruse the list linked below
and pick one that fits. This should also be easier for others to use:
https://en.wikipedia.org/wiki/List_of_HTTP_status_codes
"""
known_report_codes = {
201: "Download successful. (variations: Downloaded, Inserted, Published, Copied, or Linked)",
202: "Accepted. mkdir skipped as it already exists",
203: "Non-Authoritative Information: transformed during download.",
205:
"Reset Content: truncated. File is shorter than originally expected (changed length during transfer) This only arises during multi-part transfers.",
205: "Reset Content: checksum recalculated on receipt.",
304:
"Not modified (Checksum validated, unchanged, so no download resulted.)",
206: "Partial Content: received and inserted.",
304: "Not modified (Checksum validated, unchanged, so no download resulted.)",
307: "Insertion deferred (writing to temporary part file for the moment.)",
417: "Expectation Failed: invalid notification message (corrupt headers)",
422: "Unprocessable Content: could not determine path to transfer to",
Expand Down Expand Up @@ -744,7 +748,7 @@ def setReport(msg, code, text=None):
text = 'unknown disposition'

if 'report' in msg:
logger.warning('overriding initial report: %d: %s' %
logger.debug('overriding initial report: %d: %s' %
(msg['report']['code'], msg['report']['message']))

msg['report'] = {'code': code, 'timeCompleted': nowstr(), 'message': text}
Expand Down
12 changes: 6 additions & 6 deletions sarracenia/blockmanifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,16 @@ def __init__(self,path):

self.lock.lock()

self.x = None
self.new_x = None
self.x = {}
self.new_x = {}

if os.path.exists(self.path):
self.fd = open(self.path,"r+")
s=self.fd.read()
self.x = json.loads(s)
try:
self.x = json.loads(s)
except Exception as ex:
pass

for k in ['manifest', 'waiting' ]:
if k in self.x:
Expand Down Expand Up @@ -127,12 +130,9 @@ def persist(self):
return

if self.new_x and (self.new_x != self.x):
logger.info( f"overwriting" )
self.fd.seek(0)
self.fd.write(json.dumps(self.new_x,sort_keys=True,indent=4))
self.fd.truncate()
else:
logger.info( f"closing unchanged" )

self.fd.close()
self.lock.unlock()
Expand Down
5 changes: 1 addition & 4 deletions sarracenia/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ def __repr__(self) -> str:
'batch' : 100,
'baseDir': None,
'baseUrl_relPath': False,
'block_reassemble': True,
'delete': False,
'documentRoot': None,
'download': False,
Expand Down Expand Up @@ -142,7 +141,7 @@ def __repr__(self) -> str:


# all the boolean settings.
flag_options = [ 'acceptSizeWrong', 'acceptUnmatched', 'amqp_consumer', 'baseUrl_relPath', 'block_reassemble', 'debug', \
flag_options = [ 'acceptSizeWrong', 'acceptUnmatched', 'amqp_consumer', 'baseUrl_relPath', 'debug', \
'delete', 'discard', 'download', 'dry_run', 'durable', 'exchangeDeclare', 'exchangeSplit', 'logReject', 'realpathFilter', \
'follow_symlinks', 'force_polling', 'inline', 'inlineOnly', 'inplace', 'logMetrics', 'logStdout', 'logReject', 'restore', \
'messageDebugDump', 'mirror', 'timeCopy', 'notify_only', 'overwrite', 'post_on_start', \
Expand Down Expand Up @@ -1502,8 +1501,6 @@ def parse_line(self, component, cfg, cfname, lineno, l ):
if k == 'continue':
return

#FIXME: note for Clea, line conversion to v3 complete here.

line = list(map(lambda x: self._varsub(x), line))

if len(line) == 1:
Expand Down
13 changes: 8 additions & 5 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ def __init__(self, cfg=None):

self.plugins['load'].extend(self.o.destfn_scripts)

self.block_reassembly_active = 'block_reassembly' in self.plugins['load'] or \
'sarracenia.flowcb.block_reassembly' in self.plugins['load']

# metrics - dictionary with names of plugins as the keys
self.metrics_lastWrite=0
self.metricsFlowReset()
Expand Down Expand Up @@ -2689,11 +2692,11 @@ def set_local_file_attributes(self, local_file, msg):
logger.debug("%s" % local_file)

# if the file is not partitioned, the onfly_checksum is for the whole file.
# cache it here, along with the mtime.

if ('blocks' in msg) and sarracenia.features['reassembly']['present']:
with sarracenia.blockmanifest.BlockManifest(local_file) as y:
y.set( msg['blocks'] )
# cache it here, along with the mtime, unless block_reassembly plugin is active...
if ('blocks' in msg) and sarracenia.features['reassembly']['present'] and not self.block_reassembly_active:
with sarracenia.blockmanifest.BlockManifest(local_file) as bm:
bm.set( msg['blocks'] )

x = sarracenia.filemetadata.FileMetadata(local_file)
# FIXME ... what to do when checksums don't match?
Expand Down
23 changes: 14 additions & 9 deletions sarracenia/flowcb/block_reassembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def after_work(self, worklist) -> None:
blksz=humanfriendly.parse_size(blk_suffix[1],binary=True)

if blkno != m['blocks']['number']:
logger.warning(" mismatch {m['relPath']} name says {blkno} but message says {m['block']['number']}" )
logger.warning( f"mismatch {m['relPath']} name says {blkno} but message says {m['block']['number']}" )
blkno = m['blocks']['number']

#determine root file name.
Expand All @@ -131,7 +131,7 @@ def after_work(self, worklist) -> None:
flck = flufl.lock.Lock(lock_file)

flck.lock()
logger.info( f"10 locked {flck} lock_file: {lock_file}" )
#logger.debug( f"10 locked {flck} lock_file: {lock_file}" )

pf=open(part_file,'rb')

Expand All @@ -148,7 +148,7 @@ def after_work(self, worklist) -> None:
old_blocks = rfm.get()

if old_blocks and not 'waiting' in old_blocks:
old_blocks['waiting'] = {}
old_blocks['waiting'] = m['blocks']['manifest'].copy()

# calculate old file size.
if old_blocks and 'manifest' in old_blocks:
Expand All @@ -165,8 +165,9 @@ def after_work(self, worklist) -> None:

# update old_blocks to reflect receipt of this block.
if old_blocks and 'manifest' in old_blocks:
logger.info( f" read old block manifest from attributes: {old_blocks['manifest']}" )
logger.info( f" also show waiting: {old_blocks['waiting']}" )
logger.debug( f" read {len(old_blocks['manifest'])} blocks in manifest, waiting for {len(old_blocks['waiting'])} " )
logger.debug( f" read old block manifest from attributes: {old_blocks['manifest']}" )
logger.debug( f" also show waiting: {old_blocks['waiting']}" )
found=False
sz=0
# add
Expand All @@ -181,7 +182,6 @@ def after_work(self, worklist) -> None:

if blkno in old_blocks['waiting']:
del old_blocks['waiting'][blkno]
logger.info( f"deleted block {blkno} from waiting: {len(old_blocks['waiting'])} left. ")

# calculate where to seek to...
offset=0
Expand All @@ -195,7 +195,11 @@ def after_work(self, worklist) -> None:
byteCount = m['blocks']['manifest'][blkno]['size']

logger.info( f" blocks: adding block {blkno} by seeking to: {offset} to write {byteCount} bytes in {root_file}" )
logger.info( f" still waiting for: {len(old_blocks['waiting'])} " )
#if len(old_blocks['waiting']) > 0 :
# logger.info( f" still waiting for: {len(old_blocks['waiting'])} " )
#else:
# logger.info( f" we have received every block now." )

#- {old_blocks['waiting']} " )

# FIXME: can seek ever fail? how do we check?
Expand Down Expand Up @@ -231,7 +235,7 @@ def after_work(self, worklist) -> None:
"""
with sarracenia.blockmanifest.BlockManifest(root_file) as rfm:
rfm.set(old_blocks)
m.setReport( 206, f"file block subset {m['blocks']['number']} received and reassembled ok. waiting for {(len(old_blocks['waiting']))} more blocks." )
m.setReport( 206, f"file block subset {m['blocks']['number']} received and written ok. waiting for {(len(old_blocks['waiting']))} more blocks." )
worklist.rejected.append(m)
else:
# FIXME: for inflight. now rename the file to the real name.
Expand All @@ -244,8 +248,9 @@ def after_work(self, worklist) -> None:
logger.info( f"completed reassembly of {m['relPath']}" )
new_ok.append(m)
if hasattr(self.o, 'block_manifest_delete') and self.o.block_manifest_delete:
manifest = msg['new_file'] + "§block_manifest§"
manifest = m['new_dir'] + os.sep + m['new_file'] + "§block_manifest§"
if os.path.exists(manifest):
logger.info( f"deleting {manifest}")
os.unlink(manifest)
else:
del old_blocks['waiting']
Expand Down
4 changes: 2 additions & 2 deletions sarracenia/flowcb/gather/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ def post_file_in_parts(self, path, lstat):

if features['reassembly']['present'] and \
(not hasattr(self.o, 'block_manifest_delete') or not self.o.block_manifest_delete):
with sarracenia.blockmanifest.BlockManifest( path ) as x:
x.set(msg['blocks'])
with sarracenia.blockmanifest.BlockManifest( path ) as bm:
bm.set(msg['blocks'])

messages = []
for current_block in blocks:
Expand Down

0 comments on commit b0277c4

Please sign in to comment.