Issue1022 #1024

Merged
merged 11 commits into from
Apr 26, 2024
17 changes: 16 additions & 1 deletion docs/source/Reference/sr3_options.7.rst
@@ -1696,7 +1696,7 @@ NOTE::
whereas the expression: .*GIF matches the entire name.

sourceFromExchange <flag> (default: off)
------------------------------------------
----------------------------------------

The **sourceFromExchange** option is mainly for use by administrators.
If messages received are posted directly from a source, the exchange used
@@ -1720,6 +1720,21 @@ It is commonly combined with::
To have data arrive in the standard format tree.


sourceFromMessage <flag> (default: off)
---------------------------------------

The **sourceFromMessage** option is mainly for use by administrators.
Normally the *source* field from an inbound message is ignored.
When this option is set, the field in the message is accepted and used
for processing. (It overrides *source* and *sourceFromExchange*.)

It defaults to off because malicious messages can misrepresent the origin
of data. Use it only with responsibly curated, trustworthy message flows.




subtopic <amqp pattern> (default: #)
------------------------------------

12 changes: 12 additions & 0 deletions docs/source/fr/Reference/sr3_options.7.rst
@@ -1693,6 +1693,18 @@ les rapports à l’origine des données injectées. Cela est généralement com

Pour que les données arrivent dans l’arborescence de format standard.

sourceFromMessage <flag> (défaut: off)
--------------------------------------

L'option **sourceFromMessage** est principalement destinée aux administrateurs.
Normalement, le champ *source* d'un message entrant est ignoré.
Lorsque cette option est définie, le champ du message est accepté et utilisé
pour le traitement. (remplace *source* et *sourceFromExchange* )

Il est désactivé par défaut car des messages malveillants peuvent falsifier
l'origine des données. À utiliser uniquement avec des flux de messages fiables et organisés de manière responsable.


subtopic <modèle amqp> (défaut: #)
-----------------------------------

13 changes: 7 additions & 6 deletions sarracenia/config.py
@@ -116,9 +116,9 @@ def __repr__(self) -> str:
'retry_refilter': False,
'sanity_log_dead': 9999,
'sourceFromExchange': False,
'sourceFromMessage': False,
'sundew_compat_regex_first_match_is_zero': False,
'sourceFromExchange': False,
'topicCopy': False,
'topicCopy': False,
'v2compatRenameDoublePost': False,
'varTimeOffset': 0
}
@@ -137,7 +137,7 @@ def __repr__(self) -> str:
'messageDebugDump', 'mirror', 'timeCopy', 'notify_only', 'overwrite', 'post_on_start', \
'permCopy', 'persistent', 'queueBind', 'queueDeclare', 'randomize', 'recursive', 'realpathPost', \
'reconnect', 'report', 'reset', 'retry_refilter', 'retryEmptyBeforeExit', 'save', 'sundew_compat_regex_first_match_is_zero', \
'sourceFromExchange', 'statehost', 'topicCopy', 'users', 'v2compatRenameDoublePost'
'sourceFromExchange', 'sourceFromMessage', 'statehost', 'topicCopy', 'users', 'v2compatRenameDoublePost' \
]

float_options = [ ]
@@ -1847,9 +1847,10 @@ def finalize(self, component=None, config=None):
if hasattr(self, 'nodupe_basis'):
if self.nodupe_basis == 'data':
self.plugins_early.append( 'nodupe.data' )
delattr( self, 'nodupe_basis' )
elif self.nodupe_basis == 'name':
self.plugins_early.append( 'nodupe.name' )
delattr( self, 'nodupe_basis' )
delattr( self, 'nodupe_basis' )

# FIXME: note that v2 *user_cache_dir* is, v3 called: cfg_run_dir
if config[-5:] == '.conf':
@@ -2757,7 +2758,7 @@ def cfglogs(cfg_preparse, component, config, logLevel, child_inst):
except Exception as ex:
logging.error( "makedirs {} failed err={}".format(os.path.dirname(metricsfilename),ex))
logging.debug("Exception details:", exc_info=True)
os.sleep(1)
time.sleep(0.1)

cfg_preparse.metricsFilename = metricsfilename

@@ -2773,7 +2774,7 @@ def cfglogs(cfg_preparse, component, config, logLevel, child_inst):
except Exception as ex:
logging.error( "makedirs {} failed err={}".format(os.path.dirname(logfilename),ex))
logging.debug("Exception details:", exc_info=True)
os.sleep(1)
time.sleep(0.1)
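
Note on the `os.sleep` to `time.sleep` change: Python's `os` module has no `sleep` function, so the old call would itself raise `AttributeError` inside the exception handler. The following is a minimal sketch of the log-then-pause pattern, not the project's code; `ensure_parent_dir` and its `pause` parameter are hypothetical names used only for illustration.

```python
import logging
import os
import time


def ensure_parent_dir(filename: str, pause: float = 0.1) -> bool:
    """Create the directory that will hold `filename` (hypothetical helper).

    Returns True when the directory exists afterwards. On failure the error
    is logged, the caller is slowed down briefly, and False is returned.
    """
    dirname = os.path.dirname(filename)
    try:
        os.makedirs(dirname, exist_ok=True)
        return True
    except Exception as ex:
        logging.error("makedirs {} failed err={}".format(dirname, ex))
        logging.debug("Exception details:", exc_info=True)
        # time.sleep is the standard-library call; os.sleep does not exist.
        time.sleep(pause)
        return False
```

A caller could retry a few times, for example `any(ensure_parent_dir(path) for _ in range(3))`, so that a transient failure does not stall start-up for long.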

log_format = '%(asctime)s [%(levelname)s] %(name)s %(funcName)s %(message)s'
if logging.getLogger().hasHandlers():
2 changes: 1 addition & 1 deletion sarracenia/flow/__init__.py
@@ -2081,7 +2081,7 @@ def download(self, msg, options) -> bool:
remote_file)
logger.debug('Exception details: ', exc_info=True)

if (block_length == 0) and (len_written > 0):
if (self.o.acceptSizeWrong or (block_length == 0)) and (len_written > 0):
return True
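
The new condition can be read as a small predicate. The sketch below is illustrative only: `length_check_skipped` is a hypothetical name, and its plain arguments stand in for `self.o.acceptSizeWrong`, `block_length`, and `len_written` as they appear in `download`.

```python
def length_check_skipped(accept_size_wrong: bool, block_length: int, len_written: int) -> bool:
    """Return True when a download counts as successful without comparing lengths.

    Mirrors the changed condition: the length comparison is bypassed when the
    announced length is unknown (0) or when the acceptSizeWrong option is set,
    provided that at least some bytes were written.
    """
    return (accept_size_wrong or block_length == 0) and len_written > 0
```

With `accept_size_wrong` off and a non-zero announced length, the predicate is False and the flow falls through to the `len_written != block_length` comparison, as before the change.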

if (len_written != block_length):
46 changes: 46 additions & 0 deletions sarracenia/flowcb/accept/sftp_absolute.py
@@ -0,0 +1,46 @@
#!/usr/bin/python3
"""
Description:
SR3 plugin that appends / to SFTP baseUrls that have no directory at the end.

In v2 all paths were absolute; in sr3 they are relative. Adding this plugin makes
sr3 behave like v2 when processing sftp URLs.

The value of msg['baseUrl'] in messages is changed:

sftp://user@host --> sftp://user@host/

so a subscriber that receives a message with relPath=a/b/c will download from /a/b/c, as it would in v2.
Without this modification, sr3 would download from ~/a/b/c on the remote.

Usage:
flowcb accept.sftp_absolute

"""

import logging
from sarracenia.flowcb import FlowCB
import urllib.parse

logger = logging.getLogger(__name__)

class Sftp_absolute(FlowCB):

def __init__(self, options) :
super().__init__(options,logger)

def after_accept(self, worklist):

for msg in worklist.incoming:

if msg['baseUrl'].startswith('sftp:'):
u = urllib.parse.urlparse(msg['baseUrl'])
if u.path == '':
msg['baseUrl'] += '/'
logger.info( f"appended / {msg['baseUrl']}")
elif u.path[0] != '/':
msg['baseUrl'] = u.scheme + '://' + u.netloc + '/' + u.path
logger.info( f"prepended / {msg['baseUrl']}")
continue
logger.info( "no baseUrl change")
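
The URL rewrite performed in `after_accept` can be tried in isolation. This is a standalone sketch of the same rule, assuming a hypothetical function name `make_sftp_base_absolute`; it takes a plain string rather than a sarracenia message and does no logging.

```python
import urllib.parse


def make_sftp_base_absolute(base_url: str) -> str:
    """Return base_url with an absolute path if it is an sftp URL (sketch).

    Non-sftp URLs, and sftp URLs whose path already starts with '/',
    are returned unchanged.
    """
    if not base_url.startswith('sftp:'):
        return base_url
    u = urllib.parse.urlparse(base_url)
    if u.path == '':
        # sftp://user@host has an empty path: anchor it at the root.
        return base_url + '/'
    if u.path[0] != '/':
        # Rebuild the URL with a leading slash in front of the path.
        return u.scheme + '://' + u.netloc + '/' + u.path
    return base_url


print(make_sftp_base_absolute('sftp://user@host'))
```

Combined with a message whose relPath is `a/b/c`, the rewritten base makes the download path `/a/b/c` rather than a path relative to the remote home directory.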

2 changes: 1 addition & 1 deletion sarracenia/instance.py
@@ -176,7 +176,7 @@ def start(self):
except Exception as ex:
logging.error( "makedirs {} failed err={}".format(os.path.dirname(logfilename),ex))
logging.debug("Exception details:", exc_info=True)
os.sleep(1)
time.sleep(0.1)

log_format = '%(asctime)s [%(levelname)s] %(name)s %(funcName)s %(message)s'
if logging.getLogger().hasHandlers():
7 changes: 4 additions & 3 deletions sarracenia/moth/amqp.py
@@ -141,16 +141,17 @@ def _msgRawToDict(self, raw_msg) -> sarracenia.Message:
source=None
if 'source' in self.o:
source = self.o['source']
elif self.o['sourceFromExchange']:
elif 'sourceFromExchange' in self.o and self.o['sourceFromExchange']:
itisthere = re.match( "xs_([^_]+)_.*", msg['exchange'] )
if itisthere:
source = itisthere[1]
else:
itisthere = re.match( "xs_([^_]+)", msg['exchange'] )
if itisthere:
source = itisthere[1]

if source:
if 'source' in msg and 'sourceFromMessage' in self.o and self.o['sourceFromMessage']:
pass
elif source:
msg['source'] = source
msg['_deleteOnPost'] |= set(['source'])
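
The resulting precedence among *source*, *sourceFromExchange*, and *sourceFromMessage* is easier to see outside the class. The sketch below assumes plain dicts in place of `self.o` and the message object; `resolve_source` is a hypothetical name, and `setdefault` is used only so that the example runs without a pre-populated `_deleteOnPost` set.

```python
import re


def resolve_source(options: dict, msg: dict) -> dict:
    """Apply the source-selection rules of the hunk above to msg (sketch)."""
    source = None
    if 'source' in options:
        source = options['source']
    elif options.get('sourceFromExchange'):
        # Exchanges are expected to look like xs_<user> or xs_<user>_<suffix>.
        found = (re.match(r"xs_([^_]+)_.*", msg['exchange'])
                 or re.match(r"xs_([^_]+)", msg['exchange']))
        if found:
            source = found[1]

    if 'source' in msg and options.get('sourceFromMessage'):
        # Trust the field that arrived with the message; leave it untouched.
        pass
    elif source:
        msg['source'] = source
        msg.setdefault('_deleteOnPost', set()).add('source')
    return msg
```

For example, with `sourceFromMessage` set, a message that already carries `source` keeps it even when the *source* option is configured; without it, the configured or exchange-derived value replaces whatever the message claimed.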

8 changes: 7 additions & 1 deletion sarracenia/sr.py
@@ -148,7 +148,7 @@ def _launch_instance(self, component_path, c, cfg, i):
except Exception as ex:
logging.error( "makedirs {} failed err={}".format(os.path.dirname(lfn),ex))
logging.debug("Exception details:", exc_info=True)
os.sleep(1)
time.sleep(0.1)

if c in [ 'flow',
'poll', 'post', 'report', 'sarra', 'sender', 'shovel',
@@ -2656,6 +2656,12 @@ def convert(self):
with open(v3_config_path, 'w') as v3_cfg:
v3_cfg.write( f'# created by: sr3 convert {cfg}\n')

if component in [ 'sarra', 'subscribe' ]:
v3_cfg.write('#v2 sftp handling is always absolute, sr3 is relative. This plugin helps during conversion, remove when all sr3:\n')
v3_cfg.write('flowcb accept.sftp_absolute\n')
if component in [ 'sender' ]:
v3_cfg.write('#v2 sftp handling is always absolute, sr3 is relative. might need this, remove when all sr3:\n')
v3_cfg.write('#flowcb accept.sftp_absolute\n')
if component in [ 'shovel', 'winnow' ]:
v3_cfg.write('# topicCopy on is only there for bug-for-bug compat with v2. turn it off if you can.\n')
v3_cfg.write('topicCopy on\n')
3 changes: 2 additions & 1 deletion sarracenia/transfer/__init__.py
@@ -345,7 +345,8 @@ def read_writelocal(self,

# warn if length mismatch without transformation.
# 2022/12/02 - pas should see a lot of these messages in HPC case from now on...
if length != 0 and rw_length != length:

if not self.o.acceptSizeWrong and length != 0 and rw_length != length:
logger.warning(
"util/writelocal mismatched file length writing %s. Message said to expect %d bytes. Got %d bytes."
% (local_file, length, rw_length))