Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue1189 SFTP FSETATTTR fix + timeout 0 support #1190

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion docs/source/Reference/sr3_options.7.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,18 @@ off and output with *post_exchangeSplit*, which route notification with the same
the same member of a **second layer of subscribers (winnow) whose duplicate suppression caches
are active.**

nofsetstat <off|on> (default: off)
----------------------------------

Error messages such as this::

[ERROR] sarracenia.flow send could not send /source/filename to inflight=None sftp://[email protected]/ /destination/filename: FSETSTAT unsupported

Some SFTP servers will restrict permissions in ways which limit functionality. Cannot set modification times, or
truncate files (when new file is shorter than earlier version.), cannot set mode/permission bits.
To avoid warnings or errors being generated about FSETSTAT, one can accept the limited functionality
by setting **nofsetstat on**


outlet post|json|url (default: post)
------------------------------------
Expand Down Expand Up @@ -2023,11 +2035,12 @@ a topicPrefix, followed by subtopics derived from the *relPath* field of the mes
Some networks may choose to use different topic conventions, external to sarracenia.


timeout <interval> (default: 0)
timeout <interval> (default: 300)
-------------------------------

The **timeout** option, sets the number of seconds to wait before aborting a
connection or download transfer (applied per buffer during transfer).
Setting to zero disables timeouts.


timezone <string> (default: UTC)
Expand Down
16 changes: 15 additions & 1 deletion docs/source/fr/Reference/sr3_options.7.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,19 @@ utiliser *post_exchangeSplit* pour la sortie. Cela achemine les publications en
une **deuxième couche d’abonnés (winnow) dont les caches de suppression des doublons sont actives.**


nofsetstat on|off (défaut: faux)
--------------------------------

Certains serveurs limitent le permissions aux répertoires::

[ERROR] sarracenia.flow send could not send /source/filename to inflight=None sftp://[email protected]/ /destination/filename: FSETSTAT unsupported

Les autorisations restreintes sur le serveur signifient des fonctionnalités limitées. Impossible de corriger les
heures de modification ou de tronquer les fichiers (lorsque le nouveau fichier est plus court que la version
précédente), impossible de définir les bits de mode/autorisation. On allume cette option pour supprimer
des avertissements pour vous informer de tels difficultés.


outlet post|json|url (défaut: post)
-----------------------------------

Expand Down Expand Up @@ -1994,11 +2007,12 @@ source et la destination sont comparés.

Lorsqu’il est défini dans un composant de publication, les en-têtes *atime* et *mtime* des messages d'annonce sont éliminés.

timeout <intervalle> (défaut: 0)
timeout <intervalle> (défaut: 300)
--------------------------------

L’option **timeout** définit le nombre de secondes à attendre avant d’interrompre un
transfert de connexion ou de téléchargement (appliqué pendant le transfert).
On peut désactiver les **timeout** avec 0 comme intervalle.

timezone <chaine> (défaut: UTC)
--------------------------------
Expand Down
3 changes: 2 additions & 1 deletion sarracenia/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def __repr__(self) -> str:
'metrics_writeInterval': 5,
'nodupe_driver': 'disk',
'nodupe_ttl': 0,
'nofsetstat': True,
'overwrite': True,
'path': [],
'permDefault' : octal_number(0),
Expand Down Expand Up @@ -142,7 +143,7 @@ def __repr__(self) -> str:
flag_options = [ 'acceptSizeWrong', 'acceptUnmatched', 'amqp_consumer', 'baseUrl_relPath', 'debug', \
'delete', 'discard', 'download', 'dry_run', 'durable', 'exchangeDeclare', 'exchangeSplit', 'logReject', 'realpathFilter', \
'follow_symlinks', 'force_polling', 'inline', 'inlineOnly', 'inplace', 'logMetrics', 'logStdout', 'logReject', 'restore', \
'messageDebugDump', 'mirror', 'timeCopy', 'notify_only', 'overwrite', 'post_on_start', \
'messageDebugDump', 'mirror', 'timeCopy', 'nofsetstat', 'notify_only', 'overwrite', 'post_on_start', \
'permCopy', 'persistent', 'queueBind', 'queueDeclare', 'randomize', 'recursive', 'realpathPost', \
'reconnect', 'report', 'reset', 'retry_refilter', 'retryEmptyBeforeExit', 'save',
'sundew_compat_regex_first_match_is_zero', 'sourceFromExchange', 'sourceFromMessage', 'topicCopy',
Expand Down
31 changes: 20 additions & 11 deletions sarracenia/transfer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ class TimeoutException(Exception):


# alarm_cancel
def alarm_cancel():
def alarm_cancel(time):

if time <= 0:
return

if sys.platform != 'win32':
signal.alarm(0)

Expand All @@ -68,6 +72,10 @@ def alarm_set(time):
FIXME: replace with set itimer for > 1 second resolution... currently rouding to nearest second.
"""

# shortcut to disable alarms if timeout set to zero.
if time <= 0:
return

if sys.platform != 'win32':
signal.signal(signal.SIGALRM, alarm_raise)
signal.alarm(int(time + 0.5))
Expand Down Expand Up @@ -133,6 +141,7 @@ class Transfer():

uses options (on Sarracenia.config data structure passed to constructor/factory.)
* credentials - used to authentication information.
* nofsetstat - used for SFTP to deal with limited server side permissions.
* sendTo - server to connect to.
* batch - how many files to transfer before a connection is torn down and re-established.
* permDefault - what permissions to set on files transferred.
Expand Down Expand Up @@ -289,14 +298,14 @@ def read_write(self, src, dst, length=0):

if length == 0:
while True:
if self.o.timeout: alarm_set(self.o.timeout)
alarm_set(self.o.timeout)
chunk = src.read(self.o.bufsize)
if chunk:
new_chunk = self.on_data(chunk)
rw_length += len(new_chunk)
dst.write(new_chunk)
self.logProgress(rw_length)
alarm_cancel()
alarm_cancel(self.o.timeout)
if not chunk: break
if self.sumalgo: self.sumalgo.update(chunk)
self.throttle(chunk)
Expand All @@ -311,14 +320,14 @@ def read_write(self, src, dst, length=0):

i = 0
while i < nc:
if self.o.timeout: alarm_set(self.o.timeout)
alarm_set(self.o.timeout)
chunk = src.read(self.o.bufsize)
if chunk:
new_chunk = self.on_data(chunk)
rw_length += len(new_chunk)
dst.write(new_chunk)
self.logProgress(rw_length)
alarm_cancel()
alarm_cancel(self.o.timeout)
if not chunk: break
if self.sumalgo: self.sumalgo.update(chunk)
self.throttle(chunk)
Expand All @@ -327,14 +336,14 @@ def read_write(self, src, dst, length=0):
# remaining

if r > 0:
if self.o.timeout: alarm_set(self.o.timeout)
alarm_set(self.o.timeout)
chunk = src.read(r)
if chunk:
new_chunk = self.on_data(chunk)
rw_length += len(new_chunk)
dst.write(new_chunk)
self.logProgress(rw_length)
alarm_cancel()
alarm_cancel(self.o.timeout)
if self.sumalgo: self.sumalgo.update(chunk)
self.throttle(chunk)

Expand Down Expand Up @@ -455,15 +464,15 @@ def throttle(self, buf):
def write_chunk(self, chunk):
if self.chunk_iow: self.chunk_iow.write(chunk)
self.rw_length += len(chunk)
alarm_cancel()
alarm_cancel(self.o.timeout)
self.logProgress(self.rw_length)
if self.sumalgo: self.sumalgo.update(chunk)
self.throttle(chunk)
if self.o.timeout: alarm_set(self.o.timeout)
alarm_set(self.o.timeout)

# write_chunk_end
def write_chunk_end(self):
alarm_cancel()
alarm_cancel(self.o.timeout)
self.chunk_iow = None
return self.rw_length

Expand All @@ -474,7 +483,7 @@ def write_chunk_init(self, proto):
self.tbegin = nowflt()
self.lastLog = self.tbegin
self.rw_length = 0
if self.o.timeout: alarm_set(self.o.timeout)
alarm_set(self.o.timeout)

def gethttpsUrl(self, path):
return None
Expand Down
36 changes: 18 additions & 18 deletions sarracenia/transfer/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def cd(self, path):
self.ftp.cwd(self.originalDir)
self.ftp.cwd(path)
self.pwd = path
alarm_cancel()
alarm_cancel(self.o.timeout)

def cd_forced(self, perm, path):
logger.debug("sr_ftp cd_forced %d %s" % (perm, path))
Expand All @@ -96,11 +96,11 @@ def cd_forced(self, perm, path):
self.ftp.cwd(self.originalDir)
try:
self.ftp.cwd(path)
alarm_cancel()
alarm_cancel(self.o.timeout)
return
except:
pass
alarm_cancel()
alarm_cancel(self.o.timeout)

# need to create subdir

Expand All @@ -113,25 +113,25 @@ def cd_forced(self, perm, path):
try:
alarm_set(self.o.timeout)
self.ftp.cwd(d)
alarm_cancel()
alarm_cancel(self.o.timeout)
continue
except:
pass

# create
alarm_set(self.o.timeout)
self.ftp.mkd(d)
alarm_cancel()
alarm_cancel(self.o.timeout)

# chmod
alarm_set(self.o.timeout)
self.ftp.voidcmd('SITE CHMOD ' + "{0:o}".format(perm) + ' ' + d)
alarm_cancel()
alarm_cancel(self.o.timeout)

# cd
alarm_set(self.o.timeout)
self.ftp.cwd(d)
alarm_cancel()
alarm_cancel(self.o.timeout)

# check_is_connected

Expand Down Expand Up @@ -164,7 +164,7 @@ def chmod(self, perm, path):
logger.debug("sr_ftp chmod %s %s" % (str(perm), path))
alarm_set(self.o.timeout)
self.ftp.voidcmd('SITE CHMOD ' + "{0:o}".format(perm) + ' ' + path)
alarm_cancel()
alarm_cancel(self.o.timeout)

# close
def close(self):
Expand All @@ -179,7 +179,7 @@ def close(self):
old_ftp.quit()
except:
pass
alarm_cancel()
alarm_cancel(self.o.timeout)

# connect...
def connect(self):
Expand Down Expand Up @@ -247,7 +247,7 @@ def connect(self):
(self.host, self.user))
logger.debug('Exception details: ', exc_info=True)

alarm_cancel()
alarm_cancel(self.o.timeout)
return self.connected

# credentials...
Expand Down Expand Up @@ -288,7 +288,7 @@ def delete(self, path):
self.ftp.delete(path)
except:
d = self.ftp.pwd()
alarm_cancel()
alarm_cancel(self.o.timeout)

# get
def get(self,
Expand Down Expand Up @@ -350,7 +350,7 @@ def getAccelerated(self, msg, remote_file, local_file, length=0, remote_offset=0
def getcwd(self):
alarm_set(self.o.timeout)
pwd = self.ftp.pwd()
alarm_cancel()
alarm_cancel(self.o.timeout)
return pwd

# ls
Expand All @@ -359,7 +359,7 @@ def ls(self):
self.entries = {}
alarm_set(self.o.timeout)
self.ftp.retrlines('LIST', self.line_callback)
alarm_cancel()
alarm_cancel(self.o.timeout)
logger.debug("sr_ftp ls = (size: %d) %s ..." % (len(self.entries), str(self.entries)[0:255]))
return self.entries

Expand Down Expand Up @@ -400,12 +400,12 @@ def mkdir(self, remote_dir):
logger.debug("sr_ftp mkdir %s" % remote_dir)
alarm_set(self.o.timeout)
self.ftp.mkd(remote_dir)
alarm_cancel()
alarm_cancel(self.o.timeout)
alarm_set(self.o.timeout)
self.ftp.voidcmd('SITE CHMOD ' +
"{0:o}".format(self.o.permDirDefault) + ' ' +
remote_dir)
alarm_cancel()
alarm_cancel(self.o.timeout)

# put
def put(self,
Expand Down Expand Up @@ -464,18 +464,18 @@ def rename(self, remote_old, remote_new):
logger.debug("sr_ftp rename %s %s" % (remote_old, remote_new))
alarm_set(self.o.timeout)
self.ftp.rename(remote_old, remote_new)
alarm_cancel()
alarm_cancel(self.o.timeout)

# rmdir
def rmdir(self, path):
logger.debug("sr_ftp rmdir %s" % path)
alarm_set(self.o.timeout)
self.ftp.rmd(path)
alarm_cancel()
alarm_cancel(self.o.timeout)

# umask
def umask(self):
logger.debug("sr_ftp umask")
alarm_set(self.o.timeout)
self.ftp.voidcmd('SITE UMASK 777')
alarm_cancel()
alarm_cancel(self.o.timeout)
12 changes: 6 additions & 6 deletions sarracenia/transfer/https.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def ls(self):
while True:
alarm_set(self.o.timeout)
chunk = self.http.read(self.o.bufsize)
alarm_cancel()
alarm_cancel(self.o.timeout)
if not chunk: break
if dbuf: dbuf += chunk
else: dbuf = chunk
Expand Down Expand Up @@ -339,7 +339,7 @@ def __open__(self, path, remote_offset=0, length=0):

self.connected = True

alarm_cancel()
alarm_cancel(self.o.timeout)

return True

Expand All @@ -348,21 +348,21 @@ def __open__(self, path, remote_offset=0, length=0):
logger.error(
'Server couldn\'t fulfill the request. Error code: %s, %s' %
(e.code, e.reason))
alarm_cancel()
alarm_cancel(self.o.timeout)
self.connected = False
raise
except urllib.error.URLError as e:
logger.error('Download failed 5 %s ' % self.urlstr)
logger.error('Failed to reach server. Reason: %s' % e.reason)
alarm_cancel()
alarm_cancel(self.o.timeout)
self.connected = False
raise
except:
logger.warning("unable to open %s" % self.urlstr)
logger.debug('Exception details: ', exc_info=True)
self.connected = False
alarm_cancel()
alarm_cancel(self.o.timeout)
raise

alarm_cancel()
alarm_cancel(self.o.timeout)
return False
1 change: 0 additions & 1 deletion sarracenia/transfer/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import json

from sarracenia.transfer import Transfer
from sarracenia.transfer import alarm_cancel, alarm_set, alarm_raise

import boto3, botocore
from boto3.s3.transfer import TransferConfig
Expand Down
Loading
Loading