Commit
Merge branch 'main' into v03_wip
petersilva committed Jun 19, 2023
2 parents c4d3207 + 74ac84a commit 2ca1853
Showing 25 changed files with 368 additions and 78 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow_basic.yml
@@ -20,7 +20,7 @@ jobs:
runs-on: ${{ matrix.osver }}

name: Maintenance test on ${{ matrix.osver }}
- timeout-minutes: 30
+ timeout-minutes: 20

steps:
- uses: actions/checkout@v3
2 changes: 1 addition & 1 deletion .github/workflows/flow_redis.yml
@@ -29,7 +29,7 @@ jobs:
runs-on: ${{ matrix.osver }}

name: ${{ matrix.which_test }} test on ${{ matrix.osver }}
- timeout-minutes: 30
+ timeout-minutes: 40

steps:
- uses: actions/checkout@v3
2 changes: 1 addition & 1 deletion .github/workflows/unit-test.yml
@@ -16,7 +16,7 @@ jobs:
runs-on: ${{ matrix.osver }}

name: Unit test on ${{ matrix.osver }}
- timeout-minutes: 30
+ timeout-minutes: 10

steps:
- uses: actions/checkout@v3
10 changes: 10 additions & 0 deletions docs/README.rst
@@ -0,0 +1,10 @@


run sphinx::

    make html


prerequisites::

    apt install python3-sphinx python3-nbsphinx python3-sphinx-rtd-theme
1 change: 1 addition & 0 deletions docs/requirements.txt
@@ -1,4 +1,5 @@
Sphinx
magic
nbsphinx
jupyter
docutils
31 changes: 31 additions & 0 deletions docs/source/Reference/sr3_options.7.rst
@@ -1328,12 +1328,34 @@ will result in posting messages to five exchanges named: xwinnow00, xwinnow01,
xwinnow02, xwinnow03 and xwinnow04, where each exchange will receive only one fifth
of the total flow.

post_format <name> (default: v03)
---------------------------------

Sets the message format for posted messages. The currently included values are:

* v02 ... used by all existing data pumps for most cases.
* v03 ... the default in sr3; a JSON format that is easier to work with.
* wis ... an experimental geoJSON format, still in flux, for the World Meteorological Organization.

When provided, this value overrides whatever can be deduced from the post_topicPrefix.
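
Reviewer's sketch of the override rule stated above, not the Sarracenia implementation. `resolve_post_format` is a hypothetical helper, and deducing the format from the first element of `post_topicPrefix` is an assumption made only for illustration.

```python
KNOWN_FORMATS = ("v02", "v03", "wis")


def resolve_post_format(post_format=None, post_topic_prefix=("v03",)):
    """Return the format to post with (hypothetical helper)."""
    if post_format:
        # An explicit post_format always wins over anything deduced.
        return post_format
    # Assumption: otherwise look at the leading element of the prefix.
    head = post_topic_prefix[0] if post_topic_prefix else ""
    # Fall back to the documented default when nothing can be deduced.
    return head if head in KNOWN_FORMATS else "v03"
```

With these assumptions, `resolve_post_format("wis", ["v02", "post"])` selects `wis`, while omitting `post_format` leaves the choice to the prefix.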


post_on_start
-------------

When watch starts, the program can either post all the files already present in the watched
directories, or not. (Not implemented in sr3_cpost.)

post_topic <string>
---------------------

Explicitly set a posting topic string, overriding the value usually derived from a
group of settings. For Sarracenia data pumps, this should never be needed,
as the use of *post_exchange*, *post_topicPrefix*, and *relPath* normally builds the right
value for topics for both posting and binding.
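
Reviewer's sketch of how an explicit topic could short-circuit the usual derivation. `build_post_topic` is a hypothetical name, and the rule that the directory part of `relPath` is joined to the prefix with dots is a simplifying assumption rather than a statement of what Sarracenia does internally.

```python
def build_post_topic(rel_path, post_topic_prefix=("v03",), post_topic=None):
    """Return the topic a message would be posted under (hypothetical helper)."""
    if post_topic:
        # post_topic, when set, replaces the derived value entirely.
        return post_topic
    # Assumption: the directories of relPath (file name dropped) form the subtopic.
    subtopic = [part for part in rel_path.split("/")[:-1] if part]
    return ".".join(list(post_topic_prefix) + subtopic)
```

Under these assumptions a file at `observations/swob/file.xml` is posted under `v03.observations.swob` unless `post_topic` says otherwise.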



post_topicPrefix (default: topicPrefix)
---------------------------------------

@@ -1721,6 +1743,15 @@ it is nevertheless necessary to use it, then set tlsRigour to *lax* and
the connection should succeed regardless.


topic <string>
--------------

Explicitly set a subscribing topic string, overriding the value usually
derived from a group of settings. For sarracenia data pumps, this should never be needed,
as the use of *exchange*, *topicPrefix*, and *subtopic* normally builds the right
value.


topicPrefix (default: v03)
--------------------------

1 change: 1 addition & 0 deletions docs/source/Reference/sr_post.7.rst
@@ -124,6 +124,7 @@ The headers are an array of name:value pairs::
"encoding" : "utf-8" | "base64" ,
"value" : "encoded file content"
}
"contentType" : "string" - MIME-type information referring to the data.

For "v03.report" topic notification messages the following additional
headers will be present:
29 changes: 29 additions & 0 deletions docs/source/fr/Reference/sr3_options.7.rst
@@ -1312,12 +1312,33 @@ entraînera la publication de messages d'annonce sur cinq échanges nommés : xw
xwinnow02, xwinnow03 et xwinnow04, où chaque échange ne recevra qu’un cinquième
du flux total.

post_format <name> (défaut: v03)
--------------------------------

Définit le format de message pour les messages publiés. Les valeurs actuellement incluses sont :

* v02 ... utilisé par toutes les pompes de données existantes dans la plupart des cas.
* v03 ... par défaut au format sr3 JSON plus facile à utiliser.
* wis ... un format expérimental geoJSON en flux pour l'Organisation météorologique mondiale

Lorsqu'elle est fournie, cette valeur remplace tout ce qui peut être déduit de post_topicPrefix.


post_on_start
-------------

Lors du démarrage de watch, on peut soit demander au programme de publier tous les fichiers dans les répertoires
surveillés, ou pas. (pas implanté en sr3_cpost)

post_topic <chaine>
-------------------

Définissez explicitement une chaîne de sujet de publication, en remplaçant l'habituel
groupe de paramètres. Pour les pompes de données Sarracenia, cela ne devrait jamais être nécessaire,
car l'utilisation de *post_exchange*, *post_topicPrefix* et *relPath* construit normalement la bonne
valeur pour les sujets à la fois pour la publication et la liaison.


post_topicPrefix (défaut: topicPrefix)
--------------------------------------

@@ -1688,6 +1709,14 @@ Par exemple, si on se connecte à un site et que son certificat est expiré, et
qu'il est quand même nécessaire de l’utiliser, alors définir tlsRigour a *lax* pourra
permettre la connexion de réussir.

topic <chaine>
--------------

Définissez explicitement une chaîne de sujet d'abonnement ou de publication, en remplaçant la valeur
dérivée à partir de l'habituel groupe de paramètres. Pour les pompes de données Sarracenia, cela ne
devrait jamais être nécessaire, car l'utilisation de l'*exchange*, *topicPrefix* et *subtopic*
construit normalement la bonne valeur.

topicPrefix (défaut: v03)
-------------------------

1 change: 1 addition & 0 deletions docs/source/fr/Reference/sr_post.7.rst
@@ -127,6 +127,7 @@ Les en-têtes sont un tableau de paires nom:valeur::
"message" : - message de rapport d’état documenté dans `Report Messages`_
}

"contentType" : "chaine mime-type" - indique le format des données.
"type": "Feature" - utilisé pour la compatibilité geoJSON.
"geometry" : ... selon la compatibilité geoJSON RFC7946.

1 change: 1 addition & 0 deletions requirements-dev.txt
@@ -1,4 +1,5 @@
Sphinx
python-magic
paramiko
nbsphinx
jupyter
1 change: 1 addition & 0 deletions requirements.txt
@@ -2,6 +2,7 @@ appdirs
humanfriendly
humanize
jsonpickle
python-magic
paramiko
psutil>=5.3.0
watchdog
15 changes: 13 additions & 2 deletions sarracenia/__init__.py
@@ -31,6 +31,7 @@
import datetime
import importlib.util
import logging
import magic
import os
import os.path
import paramiko
@@ -467,8 +468,18 @@ def fromFileData(path, o, lstat=None):
returns a well-formed message, or none.
"""
m = sarracenia.Message.fromFileInfo(path, o, lstat)
- if lstat and os_stat.S_ISREG(lstat.st_mode):
-     m.__computeIntegrity(path, o)
+ if lstat :
+     if os_stat.S_ISREG(lstat.st_mode):
+         m.__computeIntegrity(path, o)
+         try:
+             t = magic.from_file(path,mime=True)
+             m['contentType'] = t
+         except Exception as ex:
+             logging.info("trying to determine mime-type. Exception details:", exc_info=True)
+     elif os_stat.S_ISDIR(lstat.st_mode):
+         m['contentType'] = 'text/directory' # source: https://www.w3.org/2002/12/cal/rfc2425.html
+     elif os_stat.S_ISLNK(lstat.st_mode):
+         m['contentType'] = 'text/link' # I invented this one, could not find any reference
return m

@staticmethod
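
Reviewer's sketch of the content-type logic in this hunk as a standalone function. `guess_content_type` is a hypothetical name. The fallback to the standard-library `mimetypes` module when python-magic (or libmagic) is unavailable is an addition of this sketch and is not part of the commit.

```python
import mimetypes
import os
import stat

try:
    import magic  # python-magic; needs libmagic installed on the host
except ImportError:
    magic = None


def guess_content_type(path):
    """Return a MIME-type string for path, or None if it cannot be determined."""
    st = os.lstat(path)  # lstat, so symbolic links are not followed
    if stat.S_ISLNK(st.st_mode):
        return "text/link"       # non-standard label, as chosen in the commit
    if stat.S_ISDIR(st.st_mode):
        return "text/directory"  # label used in the commit for directories
    if not stat.S_ISREG(st.st_mode):
        return None
    if magic is not None:
        try:
            return magic.from_file(path, mime=True)
        except Exception:
            pass  # fall through to the extension-based guess
    # Assumption of this sketch: guess from the file name when magic is unusable.
    return mimetypes.guess_type(path)[0]
```

The extension-based fallback is weaker than content sniffing, so it is only a stand-in for environments where libmagic cannot be installed.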
6 changes: 3 additions & 3 deletions sarracenia/config.py
@@ -124,7 +124,7 @@ def __call__(self, parser, namespace, values, option_string=None):
'sanity_log_dead', 'sleep', 'timeout', 'varTimeOffset'
]

- list_options = ['path']
+ list_options = ['path' ]

# set, valid values of the set.
set_options = [ 'logEvents', 'fileEvents' ]
@@ -144,9 +144,9 @@ def __call__(self, parser, namespace, values, option_string=None):
'action', 'admin', 'baseDir', 'broker', 'cluster', 'directory', 'exchange',
'exchange_suffix', 'feeder', 'filename', 'flatten', 'flowMain', 'header', 'integrity', 'logLevel',
'pollUrl', 'post_baseUrl', 'post_baseDir', 'post_broker', 'post_exchange',
- 'post_exchangeSuffix', 'post_format', 'queueName', 'sendTo', 'rename',
+ 'post_exchangeSuffix', 'post_format', 'post_topic', 'queueName', 'sendTo', 'rename',
  'report_exchange', 'source', 'strip', 'timezone', 'nodupe_ttl',
- 'nodupe_basis', 'tlsRigour', 'vip'
+ 'nodupe_basis', 'tlsRigour', 'topic', 'vip'
]
"""
for backward compatibility,
30 changes: 25 additions & 5 deletions sarracenia/flow/__init__.py
@@ -1,6 +1,7 @@
import copy
import importlib
import logging
import magic
import os
import re

@@ -25,6 +26,7 @@

from base64 import b64decode, b64encode
from mimetypes import guess_type

# end v2 subscriber

from sarracenia import nowflt
@@ -178,8 +180,8 @@ def __init__(self, cfg=None):

if (( hasattr(self.o, 'delete_source') and self.o.delete_source ) or \
( hasattr(self.o, 'delete_destination') and self.o.delete_destination )) and \
- ('sarracenia.flowcb.work.delete.Delete' not in self.plugins_late['load']):
-     self.plugins_late['load'].append('sarracenia.flowcb.work.delete.Delete')
+ ('sarracenia.flowcb.work.delete.Delete' not in self.o.plugins_late):
+     self.o.plugins_late.append('sarracenia.flowcb.work.delete.Delete')

# transport stuff.. for download, get, put, etc...
self.scheme = None
@@ -484,9 +486,13 @@ def run(self):
for m in self.worklist.ok:
if ('new_baseUrl' in m) and (m['baseUrl'] !=
m['new_baseUrl']):
m['old_baseUrl'] = m['baseUrl']
m['_deleteOnPost'] |= set(['old_baseUrl'])
m['baseUrl'] = m['new_baseUrl']
if ('new_retrievePath' in m) :
m['old_retrievePath'] = m['retrievePath']
m['retrievePath'] = m['new_retrievePath']
m['_deleteOnPost'] |= set(['old_retrievePath'])

# if new_file does not match relPath, then adjust relPath so it does.
if 'relPath' in m and m['new_file'] != m['relPath'].split('/')[-1]:
@@ -502,11 +508,18 @@ def run(self):
m['new_relPath'] = m['new_file']

if ('new_relPath' in m) and (m['relPath'] != m['new_relPath']):
m['old_relPath'] = m['relPath']
m['_deleteOnPost'] |= set(['old_relPath'])
m['relPath'] = m['new_relPath']
m['old_subtopic'] = m['subtopic']
m['_deleteOnPost'] |= set(['old_subtopic'])
m['subtopic'] = m['new_subtopic']
- if ('_format' in m) and ( m['_format'] !=
-         m['post_format']):
-     m['_format'] = m['post_format']

+ if '_format' in m:
+     m['old_format'] = m['_format']
+     m['_deleteOnPost'] |= set(['old_format'])
+     m['_format'] = m['post_format']


self._runCallbacksWorklist('after_work')
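
Reviewer's sketch of the pattern this hunk repeats for `baseUrl`, `retrievePath`, `relPath`, `subtopic` and `_format`: keep the previous value under an `old_` key and register that key in `_deleteOnPost`. Both function names are hypothetical, and the assumption that fields listed in `_deleteOnPost` are stripped just before publishing is inferred from the field's name.

```python
def rename_field(msg, field, new_value):
    """Replace msg[field], keeping the previous value under old_<field>."""
    old_key = "old_" + field
    msg[old_key] = msg[field]
    # Register the bookkeeping key so it is not published downstream.
    msg.setdefault("_deleteOnPost", set()).add(old_key)
    msg[field] = new_value


def strip_for_post(msg):
    """Return a copy of msg without the fields named in _deleteOnPost."""
    # Assumption: the set itself is also local bookkeeping and is dropped.
    drop = set(msg.get("_deleteOnPost", ())) | {"_deleteOnPost"}
    return {k: v for k, v in msg.items() if k not in drop}
```

Callbacks that run between the rename and the post can still read the `old_` values, which appears to be the point of keeping them.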

@@ -1823,6 +1836,9 @@ def download(self, msg, options) -> bool:
if os.path.isfile(new_file):
os.remove(new_file)
os.rename(new_inflight_path, new_file)
# older versions don't include the contentType, so patch it here.
if 'contentType' not in msg:
msg['contentType'] = magic.from_file(new_file,mime=True)
elif len_written < 0:
logger.error("failed to download %s" % new_file)
return False
@@ -1934,6 +1950,10 @@ def send(self, msg, options):
else:
local_path = '/' + msg['relPath']

# older versions don't include the contentType, so patch it here.
# (uses msg and local_path: neither m nor new_file is defined at this point in send())
if 'contentType' not in msg:
    msg['contentType'] = magic.from_file(local_path, mime=True)

local_dir = os.path.dirname(local_path).replace('\\', '/')
local_file = os.path.basename(local_path).replace('\\', '/')
new_dir = msg['new_dir'].replace('\\', '/')
24 changes: 24 additions & 0 deletions sarracenia/flowcb/accept/trim_legacy_fields.py
@@ -0,0 +1,24 @@
"""
Plugin :
This plugin is cosmetic: some users want less cluttered messages, so it removes fields
that are not needed in the non-file replication case, and where sundew_compatibility
is irrelevant.
Usage:
callback accept.trim_legacy_fields
"""

import logging
import os, stat, time
from sarracenia.flowcb import FlowCB

logger = logging.getLogger(__name__)


class Trim_legacy_fields(FlowCB):

def after_accept(self, worklist):
for message in worklist.incoming:
for h in [ 'atime', 'filename', 'from_cluster', 'mtime', 'source', 'sundew_extension', 'to_clusters' ]:
if h in message:
del message[h]
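
Reviewer's sketch showing the effect of the callback on one message, without importing Sarracenia. The `SimpleNamespace` worklist and the sample message are stand-ins invented for illustration. Only the list of field names comes from the plugin above.

```python
from types import SimpleNamespace

LEGACY_FIELDS = ['atime', 'filename', 'from_cluster', 'mtime',
                 'source', 'sundew_extension', 'to_clusters']


def trim_legacy_fields(worklist):
    """Remove legacy headers from every incoming message, in place."""
    for message in worklist.incoming:
        for h in LEGACY_FIELDS:
            message.pop(h, None)  # same effect as the guarded del above


# Stand-in for the worklist a flow would pass to after_accept.
worklist = SimpleNamespace(incoming=[
    {'relPath': 'a/b.txt', 'mtime': '20230619T120000', 'source': 'anonymous'},
])
trim_legacy_fields(worklist)
```

After the call, the sample message keeps only `relPath`.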
20 changes: 11 additions & 9 deletions sarracenia/flowcb/post/message.py
@@ -22,15 +22,17 @@ def __init__(self, options):
if hasattr(self.o, 'post_broker'):
props = sarracenia.moth.default_options
props.update(self.o.dictify())
- props.update({
-     'broker': self.o.post_broker,
-     'exchange': self.o.post_exchange,
-     'topicPrefix': self.o.post_topicPrefix,
- })
- if hasattr(self.o, 'post_exchangeSplit'):
-     props.update({
-         'exchangeSplit': self.o.post_exchangeSplit,
-     })
+
+ if hasattr(self.o, 'topic' ):
+     del self.o['topic']
+
+ # adjust settings post_xxx to be xxx, as Moth does not use post_ ones.
+ for k in [ 'broker', 'exchange', 'topicPrefix', 'exchangeSplit', 'topic' ]:
+     post_one='post_'+k
+     if hasattr( self.o, post_one ):
+         #props.update({ k: getattr(self.o,post_one) } )
+         props[ k ] = getattr(self.o,post_one)

self.poster = sarracenia.moth.Moth.pubFactory(props)

def post(self, worklist):
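
Reviewer's sketch of the `post_xxx` to `xxx` mapping using plain dictionaries. `to_publisher_props` is a hypothetical helper. Copying the defaults and dropping the subscriber's `topic` from the result are this sketch's reading of the intent, not behaviour confirmed by the commit.

```python
POST_KEYS = ('broker', 'exchange', 'topicPrefix', 'exchangeSplit', 'topic')


def to_publisher_props(options, defaults=None):
    """Build publisher settings from a dict of options (hypothetical helper)."""
    props = dict(defaults or {})  # copy, so shared defaults are not mutated
    props.update(options)
    # Assumption: a subscriber-side topic must not leak into the publisher.
    props.pop('topic', None)
    for k in POST_KEYS:
        post_key = 'post_' + k
        if post_key in options:
            # The publisher reads the unprefixed name.
            props[k] = options[post_key]
    return props
```

Looping over the key names also means a newly supported `post_` setting only needs one more entry in `POST_KEYS`.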