add fileSizeMax and bufsize and blocksize get S
Add fileSizeMax implementation, with documentation in both languages.
Change bufsize to bufSize (with a synonym so the old setting still works).
Change blocksize to blockSize (with a synonym so the old setting still works).
petersilva committed Aug 13, 2024
1 parent b4eb8ba commit 05bab59
Showing 15 changed files with 82 additions and 53 deletions.
20 changes: 14 additions & 6 deletions docs/source/Reference/sr3_options.7.rst
@@ -475,16 +475,16 @@ lowered to 1. For most usual situations the default is fine. For higher volume
cases, one could raise it to reduce transfer overhead. It is only used for file
transfer protocols, not HTTP ones at the moment.

blocksize <size> default: 0 (auto)
blockSize <size> default: 0 (auto)
-----------------------------------

NOTE: **EXPERIMENTAL** sr3, expected to return in future version**
This **blocksize** option controls the partitioning strategy used to post files.
This **blockSize** option controls the partitioning strategy used to post files.
The value should be one of::

0 - autocompute an appropriate partitioning strategy (default)
1 - always send entire files in a single part.
<blocksize> - used a fixed partition size (example size: 1M )
<blockSize> - used a fixed partition size (example size: 1M )

Files can be announced as multiple parts. Each part has a separate checksum.
The parts and their checksums are stored in the cache. Partitions can traverse
@@ -522,10 +522,10 @@ Once connected to an AMQP broker, the user needs to bind a queue
to exchanges and topics to determine the notification messages of interest.


bufsize <size> (default: 1MB)
bufSize <size> (default: 1MB)
-----------------------------

Files will be copied in *bufsize*-byte blocks. for use by transfer protocols.
Files will be copied in *bufSize*-byte blocks. for use by transfer protocols.


byteRateMax <size> (default: 0)
@@ -1225,6 +1225,14 @@ fileAgeMin
If files are newer than this setting (default: 0), then ignore them, they are too
new to post. 0 deactivates the setting.

fileSizeMax (size: default 0)
-----------------------------

The default value of *fileSizeMax* is 0, meaning there is no limit. However one may
wish to prevent downloads of very large files in some situations. Setting a maximum
file size with the *fileSizeMax* option can be used to prevent unintentional
downloading of large data files.

nodupe_ttl <off|on|999[smhdw]>
------------------------------

@@ -1532,7 +1540,7 @@ randomize <flag>

Active if *-r|--randomize* appears in the command line... or *randomize* is set
to True in the configuration file used. If there are several notification messages because the
file is posted by block (the *blocksize* option was set), the block notification messages
file is posted by block (the *blockSize* option was set), the block notification messages
are randomized meaning that they will not be posted

realpathAdjust <count> (Experimental) (default: 0)
22 changes: 16 additions & 6 deletions docs/source/fr/Reference/sr3_options.7.rst
@@ -476,16 +476,16 @@ ajuster à 1. Pour la plupart des situations, le défaut est bien. Pour un volu
on pourrait l’augmenter pour réduire les frais généraux de transfert. Cette option est seulement utilisé pour les
protocoles de transfert de fichiers, et non HTTP pour le moment.

blocksize <size> défaut: 0 (auto)
blockSize <size> défaut: 0 (auto)
-----------------------------------

REMARQUE: **EXPERIMENTAL pour sr3, devrait revenir dans la version future**
Cette option **blocksize** contrôle la stratégie de partitionnement utilisée pour publier des fichiers.
Cette option **blockSize** contrôle la stratégie de partitionnement utilisée pour publier des fichiers.
La valeur doit être l’une des suivantes ::

0 - calcul automatiquement une stratégie de partitionnement appropriée (défaut).
1 - envoyez toujours des fichiers entiers en une seule partie.
<blocksize> - utiliser une taille de partition fixe (taille d’exemple : 1M ).
<blockSize> - utiliser une taille de partition fixe (taille d’exemple : 1M ).

Les fichiers peuvent être annoncés en plusieurs parties. Chaque partie à un somme de contrôle (checksum) distinct.
Les parties et leurs somme de contrôle sont stockées dans la cache. Les partitions peuvent traverser
@@ -522,10 +522,10 @@ L’option broker indique à chaque composant quel courtier contacter.
Une fois connecté à un courtier AMQP, l’utilisateur doit lier une fil d’attente
aux échanges et aux thèmes pour déterminer le messages d'annonce en question.

bufsize <size> (défaut: 1m)
bufSize <size> (défaut: 1m)
---------------------------

Les fichiers seront copiés en tranches de *bufsize* octets. Utilisé par les protocoles de transfert.
Les fichiers seront copiés en tranches de *bufSize* octets. Utilisé par les protocoles de transfert.

byteRateMax <size> (défaut: 0)
------------------------------
@@ -1217,6 +1217,16 @@ fileAgeMin
Si les fichiers sont plus neuf que ce paramètre (défaut: 0 ... désactivé), ignorez-les, ils sont trop
neufs pour qu'ils puissent être postés.


fileSizeMax (size: default 0)
-----------------------------

La valeur par défaut de *fileSizeMax* est 0, ce qui signifie qu'il n'y a pas de limite. Cependant, on peut
souhaiter empêcher le téléchargement de fichiers très volumineux dans certaines situations. La définition d'une
taille de fichier maximale avec l'option *fileSizeMax* peut être utilisée pour empêcher le
téléchargement involontaire de fichiers de données volumineux.


nodupe_ttl <off|on|999[smhdw]>
------------------------------

@@ -1525,7 +1535,7 @@ randomize <flag>

Actif si *-r|--randomize* apparaît dans la ligne de commande... ou *randomize* est défini
à True dans le fichier de configuration utilisé. S’il y a plusieurs postes parce que
le fichier est publié par bloc (l’option *blocksize* a été définie), les messages d'annonce de bloc
le fichier est publié par bloc (l’option *blockSize* a été définie), les messages d'annonce de bloc
sont randomisés, ce qui signifie qu’ils ne seront pas affichés.

realpathAdjust <compte> (Experimental) (défaut: 0)
16 changes: 9 additions & 7 deletions sarracenia/__init__.py
@@ -475,7 +475,7 @@ def computeIdentity(msg, path, o, offset=0, data=None) -> None:
fp.seek( offset )

while i < offset+msg['size']:
buf = fp.read(o.bufsize)
buf = fp.read(o.bufSize)
if not buf: break
sumalgo.update(buf)
i += len(buf)
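The loop in this hunk feeds the checksum algorithm one buffer at a time rather than loading the whole file. A minimal standalone sketch of that pattern follows. The function name `chunked_digest`, its parameters, and the choice of SHA-512 are illustrative assumptions, not taken from the diff. Unlike the lines shown, the sketch caps the final read so it never consumes bytes past the requested range.

```python
import hashlib
import io


def chunked_digest(fp, offset, size, buf_size=1024 * 1024, algo="sha512"):
    """Hash `size` bytes of `fp` starting at `offset`, `buf_size` bytes at a time.

    Hypothetical helper: the name and signature are assumptions for illustration.
    """
    h = hashlib.new(algo)
    fp.seek(offset)
    remaining = size
    while remaining > 0:
        # Cap the read so the last chunk stops at offset + size.
        buf = fp.read(min(buf_size, remaining))
        if not buf:
            break  # file is shorter than expected
        h.update(buf)
        remaining -= len(buf)
    return h.hexdigest()


data = b"sarracenia" * 1000
whole = hashlib.sha512(data[100:5100]).hexdigest()
chunked = chunked_digest(io.BytesIO(data), offset=100, size=5000, buf_size=64)
```

Hashing in chunks gives the same digest as hashing the slice in one call, so `bufSize` only affects memory use and the number of reads.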
@@ -661,10 +661,10 @@ def fromFileInfo(path, o, lstat=None):
elif hasattr(o, 'exchange'):
msg['exchange'] = o.exchange

if hasattr(o, 'blocksize') and (o.blocksize > 1) and lstat and \
if hasattr(o, 'blockSize') and (o.blockSize > 1) and lstat and \
(os_stat.S_IFMT(lstat.st_mode) == os_stat.S_IFREG) and \
(lstat.st_size > o.blocksize):
msg['blocks'] = { 'method': 'inplace', 'number':-1, 'size': o.blocksize, 'manifest': {} }
(lstat.st_size > o.blockSize):
msg['blocks'] = { 'method': 'inplace', 'number':-1, 'size': o.blockSize, 'manifest': {} }

msg['local_offset'] = 0
msg['_deleteOnPost'] = set(['exchange', 'local_offset', 'subtopic', '_format'])
@@ -783,11 +783,13 @@ def getIDStr(msg) -> str:
s=""
if 'baseUrl' in msg:
s+=msg['baseUrl']+' '
if 'relPath' in msg:
else:
s+="baseUrl missing "
if 'relPath' in msg and len(msg['relPath']) > 0:
if msg['relPath'][0] != '/' and s and s[-1] != '/':
s+='/'
s+=msg['relPath']
elif 'retrievePath' in msg:
elif 'retrievePath' in msg and len(msg['retrievePath']) > 0 :
if msg['retrievePath'][0] != '/' and s and s[-1] != '/':
s+='/'
s+= msg['retrievePath']
@@ -1025,7 +1027,7 @@ def new_pathWrite(msg,options,data):
# ide
#if isinstance(data, io.IOBase ):
# with open(opath, 'wb') as f:
# while buf = data.read(self.o.bufsize) > 0 :
# while buf = data.read(self.o.bufSize) > 0 :
# sz=f.write(buf)

if 'content' in msg:
9 changes: 6 additions & 3 deletions sarracenia/config.py
@@ -170,7 +170,7 @@ def __repr__(self) -> str:

perm_options = [ 'permDefault', 'permDirDefault','permLog']

size_options = ['accelThreshold', 'blocksize', 'bufsize', 'byteRateMax', 'inlineByteMax']
size_options = ['accelThreshold', 'blockSize', 'bufSize', 'byteRateMax', 'fileSizeMax', 'inlineByteMax']

str_options = [
'action', 'admin', 'baseDir', 'broker', 'cluster', 'directory', 'exchange',
@@ -702,6 +702,8 @@ class Config:
'base_dir': 'baseDir',
'baseurl': 'baseUrl',
'bind_queue': 'queueBind',
'blocksize': 'blockSize',
'bufsize': 'bufSize',
'cache': 'nodupe_ttl',
'c': 'include',
'cb': 'nodupe_basis',
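The two new entries map the old lowercase spellings onto the renamed options, which is what keeps existing configuration files working. How `Config` applies this table is not visible in the diff, so the following is only a sketch of the general idea. `SYNONYMS`, `canonical_option`, and `parse_line` are hypothetical names.

```python
# Hypothetical subset of the synonym table, limited to the entries added in this commit.
SYNONYMS = {
    "blocksize": "blockSize",
    "bufsize": "bufSize",
}


def canonical_option(name, synonyms=SYNONYMS):
    """Return the current spelling of an option, or the name unchanged if it has no synonym."""
    return synonyms.get(name, name)


def parse_line(line):
    """Split an 'option value' configuration line and normalize the option name."""
    name, _, value = line.strip().partition(" ")
    return canonical_option(name), value.strip()


old_style = parse_line("bufsize 1M")
new_style = parse_line("bufSize 1M")
```

Resolving the name once at parse time means the rest of the code only ever needs to refer to `bufSize` and `blockSize`.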
@@ -830,7 +832,7 @@ def __init__(self, parent=None) -> 'Config':
for i in parent:
setattr(self, i, parent[i])

self.bufsize = 1024 * 1024
self.bufSize = 1024 * 1024
self.byteRateMax = 0

self.fileAgeMax = 0 # disabled.
@@ -852,6 +854,7 @@ def __init__(self, parent=None) -> 'Config':
self.plugins_late = []
self.plugins_early = []
self.exchange = None
self.fileSizeMax = 0
self.filename = None
self.fixed_headers = {}
self.flatten = '/'
@@ -2498,7 +2501,7 @@ def parse_args(self, isPost=False):
nargs='?',
help='how many transfers per each connection')
parser.add_argument(
'--blocksize',
'--blockSize',
type=int,
nargs='?',
help=
6 changes: 6 additions & 0 deletions sarracenia/flow/__init__.py
@@ -1782,6 +1782,12 @@ def do_download(self) -> None:
self.reject(msg, 500, "link %s failed" % msg['fileOp'])
continue

# all non-files taken care of above... rest of routine is normal file download.

if self.o.fileSizeMax > 0 and msg['size'] > self.o.fileSizeMax:
self.reject(msg, 413, f"Payload Too Large {msg.getIDStr()}")
continue

# establish new_inflight_path which is the file to download into initially.
if self.o.inflight == None or (
('blocks' in msg) and (msg['blocks']['method'] == 'inplace')):
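The new check rejects a message with status 413 when `fileSizeMax` is positive and the announced size exceeds it; a value of 0 disables the limit. The predicate can be sketched in isolation as below. `exceeds_size_limit` is a hypothetical helper, not part of the commit. The diff indexes `msg['size']` directly, and whether every message reaching this point carries a size is not shown, so the sketch assumes a missing size should be let through.

```python
def exceeds_size_limit(msg, file_size_max):
    """Return True when the message announces a file larger than the limit.

    Hypothetical helper mirroring the condition added to do_download.
    """
    if file_size_max <= 0:
        return False  # 0 means no limit, as documented for fileSizeMax
    size = msg.get("size")
    if size is None:
        return False  # assumption: an unknown size is not rejected
    return size > file_size_max


limit = 10 * 1024 * 1024  # 10 MiB
small = {"relPath": "a.grib", "size": 4096}
large = {"relPath": "b.grib", "size": 50 * 1024 * 1024}
unsized = {"relPath": "c.grib"}
```

A file whose size equals the limit exactly is still downloaded, because the comparison is strictly greater-than.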
4 changes: 2 additions & 2 deletions sarracenia/flow/poll.py
@@ -16,8 +16,8 @@

default_options = {
'acceptUnmatched': True,
'blocksize': 1,
'bufsize': 1024 * 1024,
'blockSize': 1,
'bufSize': 1024 * 1024,
'chmod': 0o400,
'pollUrl': None,
'follow_symlinks': False,
4 changes: 2 additions & 2 deletions sarracenia/flow/post.py
@@ -5,8 +5,8 @@

default_options = {
'acceptUnmatched': True,
'blocksize': 1,
'bufsize': 1024 * 1024,
'blockSize': 1,
'bufSize': 1024 * 1024,
'follow_symlinks': False,
'force_polling': False,
'inflight': None,
4 changes: 2 additions & 2 deletions sarracenia/flow/watch.py
@@ -5,8 +5,8 @@

default_options = {
'acceptUnmatched': True,
'blocksize': 1,
'bufsize': 1024 * 1024,
'blockSize': 1,
'bufSize': 1024 * 1024,
'follow_symlinks': False,
'force_polling': False,
'inflight': None,
6 changes: 3 additions & 3 deletions sarracenia/flowcb/block_reassembly.py
@@ -56,7 +56,7 @@ class Block_reassembly(FlowCB):
given the reassemble setting is on, and have received a block file,
then:
* partition file is whose name ends in §block_<blokno>,<blocksize>_§
* partition file is whose name ends in §block_<blokno>,<blockSize>_§
* determing the inflight_file name for the entire file.
* lock the inflight_file.
* place the block in the working file.
@@ -206,14 +206,14 @@ def after_work(self, worklist) -> None:
rf.seek(offset)

# copy data from block partition file into final destination.
sz=self.o.bufsize if self.o.bufsize > byteCount else byteCount
sz=self.o.bufSize if self.o.bufSize > byteCount else byteCount
bytesTransferred=0
while bytesTransferred < byteCount:
b = pf.read(sz)
rf.write(b)
bytesTransferred += len(b)
bytesLeft = byteCount - bytesTransferred
sz=self.o.bufsize if self.o.bufsize > bytesLeft else bytesLeft
sz=self.o.bufSize if self.o.bufSize > bytesLeft else bytesLeft

rf.close()
pf.close()
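In the lines shown, `sz` is set to `bufSize` when `bufSize` is greater than the bytes outstanding, and to the outstanding count otherwise, so it is the larger of the two values. That may be harmless if a partition file holds exactly one block, since a read simply stops at end of file. A copy loop that must never read past `byteCount` would usually take the smaller value instead. The sketch below shows that variant. `copy_exact` is a hypothetical name and not the commit's code.

```python
import io


def copy_exact(src, dst, byte_count, buf_size=1024 * 1024):
    """Copy at most `byte_count` bytes from src to dst, never reading past that count."""
    transferred = 0
    while transferred < byte_count:
        # Read the smaller of the buffer size and the bytes still outstanding.
        sz = min(buf_size, byte_count - transferred)
        b = src.read(sz)
        if not b:
            break  # source ended early
        dst.write(b)
        transferred += len(b)
    return transferred


src = io.BytesIO(b"0123456789")
dst = io.BytesIO()
copied = copy_exact(src, dst, 7, buf_size=3)
```

With this form, the three bytes after the requested range stay unread in `src`, which matters when the source stream contains more than one block.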
14 changes: 7 additions & 7 deletions sarracenia/flowcb/gather/file.py
@@ -114,7 +114,7 @@ def __init__(self, options):
self.new_events = OrderedDict()
self.left_events = OrderedDict()

#self.o.blocksize = 200 * 1024 * 1024
#self.o.blockSize = 200 * 1024 * 1024
self.o.create_modify = ('create' in self.o.fileEvents) or (
'modify' in self.o.fileEvents)

@@ -144,10 +144,10 @@ def post_delete(self, path, key=None, value=None,is_directory=False):
def post_file(self, path, lstat, key=None, value=None):
#logger.debug("start %s" % path)

# check the value of blocksize
# check the value of blockSize

fsiz = lstat.st_size
blksz = self.set_blocksize(self.o.blocksize, fsiz)
blksz = self.set_blockSize(self.o.blockSize, fsiz)

# if we should send the file in parts

@@ -210,10 +210,10 @@ def post_file_in_parts(self, path, lstat):
msg = sarracenia.Message.fromFileInfo(path, self.o, lstat)

logger.debug( f"initial msg:{msg}" )
# check the value of blocksize
# check the value of blockSize

fsiz = lstat.st_size
chunksize = self.set_blocksize(self.o.blocksize, fsiz)
chunksize = self.set_blockSize(self.o.blockSize, fsiz)

# count blocks and remainder

@@ -482,11 +482,11 @@ def process_event(self, event, src, dst):
return (True, self.post1file(src, lstat))
return (True, [])

def set_blocksize(self, bssetting, fsiz):
def set_blockSize(self, bssetting, fsiz):

tfactor = 50 * 1024 * 1024

if bssetting == 0: ## default blocksize
if bssetting == 0: ## default blockSize
return tfactor

elif bssetting == 1: ## send file as one piece.
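The hunk is cut off after the `bssetting == 1` branch, so only the default case (a 50 MiB block) is fully visible. Combined with the option documentation (0 for automatic, 1 for whole files, anything else a fixed size), the selection could be sketched as follows. The return values for the last two branches are assumptions, and `choose_block_size` and `count_blocks` are hypothetical names.

```python
DEFAULT_BLOCK = 50 * 1024 * 1024  # the tfactor value visible in the hunk


def choose_block_size(setting, file_size):
    """Pick a partition size from the blockSize setting."""
    if setting == 0:
        return DEFAULT_BLOCK  # default block size, as in the hunk
    if setting == 1:
        return file_size  # assumption: one block covering the whole file
    return setting  # assumption: a fixed, user-chosen partition size


def count_blocks(file_size, block_size):
    """Return (number of blocks, size of the final partial block, or 0 if none)."""
    if block_size <= 0:
        return (0, 0)
    full, remainder = divmod(file_size, block_size)
    return (full + (1 if remainder else 0), remainder)


fixed = choose_block_size(1024, 5000)
blocks = count_blocks(5000, fixed)
```

For a 5000-byte file with a 1024-byte block size, this yields four full blocks plus a final block of 904 bytes, reported as `(5, 904)`.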
12 changes: 6 additions & 6 deletions sarracenia/transfer/__init__.py
@@ -139,7 +139,7 @@ class Transfer():
* permDirDefault - what permission to set on directories created.
* timeout - how long to wait for operations to complete.
* byteRateMax - maximum transfer rate (throttle to avoid exceeding)
* bufsize - size of buffers for file transfers.
* bufSize - size of buffers for file transfers.
"""
@staticmethod
@@ -290,7 +290,7 @@ def read_write(self, src, dst, length=0):
if length == 0:
while True:
if self.o.timeout: alarm_set(self.o.timeout)
chunk = src.read(self.o.bufsize)
chunk = src.read(self.o.bufSize)
if chunk:
new_chunk = self.on_data(chunk)
rw_length += len(new_chunk)
@@ -304,15 +304,15 @@ def read_write(self, src, dst, length=0):

# exact length to be transfered

nc = int(length / self.o.bufsize)
r = length % self.o.bufsize
nc = int(length / self.o.bufSize)
r = length % self.o.bufSize

# read/write bufsize "nc" times
# read/write bufSize "nc" times

i = 0
while i < nc:
if self.o.timeout: alarm_set(self.o.timeout)
chunk = src.read(self.o.bufsize)
chunk = src.read(self.o.bufSize)
if chunk:
new_chunk = self.on_data(chunk)
rw_length += len(new_chunk)
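For an exact-length transfer, the hunk splits `length` into `nc` full buffers and a remainder `r`. The sketch below illustrates that arithmetic with a hypothetical `copy_length` function. It leaves out the `on_data` hook and the timeout alarm seen in the diff, and the final short read for the remainder is an assumption because that part of the method is not shown. It uses integer `divmod` rather than `int(length / bufSize)`, which avoids a float intermediate.

```python
import io


def copy_length(src, dst, length, buf_size):
    """Copy `length` bytes as nc full buffers followed by one read of r bytes."""
    nc, r = divmod(length, buf_size)  # nc full buffers, r leftover bytes
    written = 0
    # One read per full buffer, then a single shorter read if r > 0.
    for want in [buf_size] * nc + ([r] if r else []):
        chunk = src.read(want)
        if not chunk:
            break  # source ended before `length` bytes were available
        dst.write(chunk)
        written += len(chunk)
    return written


data = bytes(range(256)) * 10
out = io.BytesIO()
total = copy_length(io.BytesIO(data), out, length=1000, buf_size=256)
```

Here 1000 bytes with a 256-byte buffer become three full reads and one read of 232 bytes.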
4 changes: 2 additions & 2 deletions sarracenia/transfer/file.py
@@ -49,7 +49,7 @@
# options.permDefault
# options.permDirDefault
# opt options.byteRateMax
# opt options.bufsize
# opt options.bufSize


class File(Transfer):
@@ -197,7 +197,7 @@ def file_insert(options, msg):
fp = open(msg['relPath'], 'rb')
if msg.partflg == 'i': fp.seek(msg['offset'], 0)

ok = file_write_length(fp, msg, options.bufsize, msg.filesize, options)
ok = file_write_length(fp, msg, options.bufSize, msg.filesize, options)

fp.close()
