Skip to content

Commit

Permalink
wrap protocol calls with try/except
Browse files Browse the repository at this point in the history
  • Loading branch information
petersilva committed May 2, 2024
1 parent c9761fc commit b1b7cf8
Showing 1 changed file with 117 additions and 41 deletions.
158 changes: 117 additions & 41 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2230,13 +2230,21 @@ def send(self, msg, options):
cwd = None
if hasattr(self.proto[self.scheme], 'getcwd'):
if not self.o.dry_run:
cwd = self.proto[self.scheme].getcwd()
try:
cwd = self.proto[self.scheme].getcwd()
except Exception as ex:
logger.error( f"could not getcwd: {ex}" )
return False

if cwd != new_dir:
logger.debug("%s_transport send cd to %s" %
(self.scheme, new_dir))
if not self.o.dry_run:
self.proto[self.scheme].cd_forced(775, new_dir)
try:
self.proto[self.scheme].cd_forced(775, new_dir)
except Exception as ex:
logger.error( f"could not chdir to {new_dir}: {ex}" )
return False

#=================================
# delete event
Expand All @@ -2248,9 +2256,18 @@ def send(self, msg, options):
logger.debug("message is to remove %s" % new_file)
if not self.o.dry_run:
if 'directory' in msg['fileOp']:
self.proto[self.scheme].rmdir(new_file)
try:
self.proto[self.scheme].rmdir(new_file)
except Exception as ex:
logger.error( f"could not rmdir {new_file}: {ex}" )
return False
else:
self.proto[self.scheme].delete(new_file)
try:
self.proto[self.scheme].delete(new_file)
except Exception as ex:
logger.error( f"could not delete {new_file}: {ex}" )
return False

msg.setReport(201, f'file or directory removed')
self.metrics['flow']['transferTxFiles'] += 1
self.metrics['flow']['transferTxLast'] = msg['report']['timeCompleted']
Expand All @@ -2262,7 +2279,12 @@ def send(self, msg, options):
if hasattr(self.proto[self.scheme], 'delete'):
logger.debug( f"message is to rename {msg['fileOp']['rename']} to {new_file}" )
if not self.o.dry_run:
self.proto[self.scheme].rename(msg['fileOp']['rename'], new_file)
try:
self.proto[self.scheme].rename(msg['fileOp']['rename'], new_file)
except Exception as ex:
logger.error( f"could not rename {new_file}: {ex}" )
return False

msg.setReport(201, f'file renamed')
self.metrics['flow']['transferTxFiles'] += 1
self.metrics['flow']['transferTxLast'] = msg['report']['timeCompleted']
Expand All @@ -2276,7 +2298,11 @@ def send(self, msg, options):
if hasattr(self.proto[self.scheme], 'mkdir'):
logger.debug( f"message is to mkdir {new_file}")
if not self.o.dry_run:
self.proto[self.scheme].mkdir(new_file)
try:
self.proto[self.scheme].mkdir(new_file)
except Exception as ex:
logger.error( f"could not mkdir {new_file}: {ex}" )
return False
msg.setReport(201, f'directory created')
self.metrics['flow']['transferTxFiles'] += 1
self.metrics['flow']['transferTxLast'] = msg['report']['timeCompleted']
Expand All @@ -2295,7 +2321,11 @@ def send(self, msg, options):
if hasattr(self.proto[self.scheme], 'link'):
logger.debug("message is to link %s to: %s" % (new_file, msg['fileOp']['hlink']))
if not self.o.dry_run:
self.proto[self.scheme].link(msg['fileOp']['hlink'], new_file)
try:
self.proto[self.scheme].link(msg['fileOp']['hlink'], new_file)
except Exception as ex:
logger.error( f"could not link {new_file}: {ex}" )
return False
return True
logger.error("%s, hardlinks not supported" % self.scheme)
return False
Expand All @@ -2305,7 +2335,11 @@ def send(self, msg, options):
if hasattr(self.proto[self.scheme], 'symlink'):
logger.debug("message is to link %s to: %s" % (new_file, msg['fileOp']['link']))
if not self.o.dry_run:
self.proto[self.scheme].symlink(msg['fileOp']['link'], new_file)
try:
self.proto[self.scheme].symlink(msg['fileOp']['link'], new_file)
except Exception as ex:
logger.error( f"could not symlink {new_file}: {ex}" )
return False
msg.setReport(201, f'file linked')
self.metrics['flow']['transferTxFiles'] += 1
self.metrics['flow']['transferTxLast'] = msg['report']['timeCompleted']
Expand Down Expand Up @@ -2356,38 +2390,64 @@ def send(self, msg, options):

if inflight == None or (('blocks' in msg) and
(msg['blocks']['method'] != 'inplace')):
if not self.o.dry_run:
if accelerated:
len_written = self.proto[self.scheme].putAccelerated( msg, local_file, new_file)
else:
len_written = self.proto[self.scheme].put( msg, local_file, new_file)
try:
if not self.o.dry_run:
if accelerated:
len_written = self.proto[self.scheme].putAccelerated( msg, local_file, new_file)
else:
len_written = self.proto[self.scheme].put( msg, local_file, new_file)
except Exception as ex:
logger.error( f"could not send inflight=None {new_file}: {ex}" )
return False

elif (('blocks' in msg)
and (msg['blocks']['method'] == 'inplace')):
if not self.o.dry_run:
self.proto[self.scheme].put(msg, local_file, new_file, offset,
try:
self.proto[self.scheme].put(msg, local_file, new_file, offset,
new_offset, msg['size'])
except Exception as ex:
logger.error( f"could not send inplace {new_file}: {ex}" )
return False

elif inflight == '.':
new_inflight_path = '.' + new_file
if not self.o.dry_run:
if accelerated:
len_written = self.proto[self.scheme].putAccelerated(
msg, local_file, new_inflight_path)
else:
len_written = self.proto[self.scheme].put(
msg, local_file, new_inflight_path)
self.proto[self.scheme].rename(new_inflight_path, new_file)
try:
if accelerated:
len_written = self.proto[self.scheme].putAccelerated(
msg, local_file, new_inflight_path)
else:
len_written = self.proto[self.scheme].put(
msg, local_file, new_inflight_path)
except Exception as ex:
logger.error( f"could not send inflight={inflight} {new_file}: {ex}" )
return False
try:
self.proto[self.scheme].rename(new_inflight_path, new_file)
except Exception as ex:
logger.error( f"could not rename inflight={inflight} {new_file}: {ex}" )
return False
else:
len_written = msg['size']

elif inflight[0] == '.':
new_inflight_path = new_file + inflight
if not self.o.dry_run:
if accelerated:
len_written = self.proto[self.scheme].putAccelerated(
msg, local_file, new_inflight_path)
else:
len_written = self.proto[self.scheme].put(msg, local_file, new_inflight_path)
self.proto[self.scheme].rename(new_inflight_path, new_file)
try:
if accelerated:
len_written = self.proto[self.scheme].putAccelerated(
msg, local_file, new_inflight_path)
else:
len_written = self.proto[self.scheme].put(msg, local_file, new_inflight_path)
except Exception as ex:
logger.error( f"could not send inflight={inflight} {new_file}: {ex}" )
return False
try:
self.proto[self.scheme].rename(new_inflight_path, new_file)
except Exception as ex:
logger.error( f"could not rename inflight={inflight} {new_file}: {ex}" )
return False
elif options.inflight[-1] == '/':
if not self.o.dry_run:
try:
Expand All @@ -2398,25 +2458,41 @@ def send(self, msg, options):
pass
new_inflight_path = options.inflight + new_file
if not self.o.dry_run:
if accelerated:
len_written = self.proto[self.scheme].putAccelerated(
msg, local_file, new_inflight_path)
else:
len_written = self.proto[self.scheme].put(
msg, local_file, new_inflight_path)
self.proto[self.scheme].rename(new_inflight_path, new_file)
try:
if accelerated:
len_written = self.proto[self.scheme].putAccelerated(
msg, local_file, new_inflight_path)
else:
len_written = self.proto[self.scheme].put(
msg, local_file, new_inflight_path)
except Exception as ex:
logger.error( f"could not send inflight={inflight} {new_file}: {ex}" )
return False
try:
self.proto[self.scheme].rename(new_inflight_path, new_file)
except Exception as ex:
logger.error( f"could not rename inflight={inflight} {new_file}: {ex}" )
return False
else:
len_written = msg['size']
elif inflight == 'umask':
if not self.o.dry_run:
self.proto[self.scheme].umask()
if accelerated:
len_written = self.proto[self.scheme].putAccelerated(
msg, local_file, new_file)
else:
len_written = self.proto[self.scheme].put(
msg, local_file, new_file)
self.proto[self.scheme].put(msg, local_file, new_file)
try:
if accelerated:
len_written = self.proto[self.scheme].putAccelerated(
msg, local_file, new_file)
else:
len_written = self.proto[self.scheme].put(
msg, local_file, new_file)
except Exception as ex:
logger.error( f"could not send inflight={inflight} {new_file}: {ex}" )
return False
try:
self.proto[self.scheme].put(msg, local_file, new_file)
except Exception as ex:
logger.error( f"could not rename inflight={inflight} {new_file}: {ex}" )
return False
else:
len_written = msg['size']

Expand Down

0 comments on commit b1b7cf8

Please sign in to comment.