Exchange split override #1041

Merged
merged 2 commits into from
May 6, 2024
86 changes: 86 additions & 0 deletions docs/source/How2Guides/FlowCallbacks.rst
@@ -286,6 +286,92 @@ of the one from the message, as the original is necessary for successful upstrea
* msg['new_subtopic'] ... the subtopic hierarchy that will be encoded in the notification message for downstream consumers.


Override Fields
---------------

To change how messages are processed, one can set override fields that alter how the built-in algorithms work.
For example:

* msg['nodupe_override'] = { 'key': ..., 'path': ... } changes how duplicate detection operates.
* msg['topic'] ... defines the topic of a published message (instead of it being calculated from other fields).
* msg['exchangeSplitOverride'] = int ... changes how post_exchangeSplit chooses among multiple postExchanges.



Customizing Duplicate Suppression
---------------------------------

The built-in duplicate processing uses the identity field as a key and stores the path as the value.
If a file is received with the same key and its path is already present, it is considered a duplicate
and dropped.
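
The key and path rule can be illustrated with a minimal sketch. This is not Sarracenia's nodupe implementation: the ``seen`` cache and the ``is_duplicate`` helper are hypothetical names used only for illustration, and falling back to the identity value and relPath when no override is present is an assumption.

```python
# Hypothetical illustration of the key -> path rule; not Sarracenia's actual code.
seen = {}  # key -> set of paths already received under that key


def is_duplicate(msg):
    """Return True if this (key, path) pair was already seen, otherwise record it."""
    override = msg.get('nodupe_override', {})
    # Assumption: without an override, the identity value and relPath are used.
    key = override.get('key', msg['identity']['value'])
    path = override.get('path', msg['relPath'])
    if path in seen.get(key, set()):
        return True
    seen.setdefault(key, set()).add(path)
    return False


first = {'identity': {'value': 'abc123'}, 'relPath': 'a/b/file.txt'}
again = {'identity': {'value': 'abc123'}, 'relPath': 'a/b/file.txt'}
other = {'identity': {'value': 'abc123'}, 'relPath': 'c/d/file.txt'}

print(is_duplicate(first))  # False: first time this key and path are seen
print(is_duplicate(again))  # True: same key, path already present
print(is_duplicate(other))  # False: same key, but a new path
```

Setting ``nodupe_override`` on a message replaces the key and path used in this comparison, which is what the callback in the next part of this section relies on.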

In some cases, one may want only the file name to be used, so that any file with the same name received twice,
regardless of content, is considered a duplicate and dropped. This is useful when multiple systems
produce the same products, but the copies are not bitwise identical. The built-in flowcb that implements
that functionality is below::


  import logging
  from sarracenia.flowcb import FlowCB

  logger = logging.getLogger(__name__)


  class Name(FlowCB):
      """
      Override the comparison so that files with the same name,
      regardless of what directory they are in, are considered the same.
      This is useful when receiving data from two different sources (two different trees)
      and winnowing between them.
      """
      def after_accept(self, worklist):
          for m in worklist.incoming:
              if 'nodupe_override' not in m:
                  # list the override among the fields named in _deleteOnPost.
                  m['_deleteOnPost'] |= set(['nodupe_override'])
                  m['nodupe_override'] = {}

              # use only the file name as both the path and the key.
              m['nodupe_override']['path'] = m['relPath'].split('/')[-1]
              m['nodupe_override']['key'] = m['relPath'].split('/')[-1]
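
To see what this callback does to a message, the sketch below applies the same loop body to plain dictionaries. The ``apply_name_override`` helper and the sample paths are hypothetical, and ``setdefault`` is used here only so the example runs without a real Sarracenia message (which is assumed to already carry ``_deleteOnPost``).

```python
def apply_name_override(msg):
    """Hypothetical helper mirroring the loop body of Name.after_accept for one message."""
    if 'nodupe_override' not in msg:
        # A plain dict has no _deleteOnPost yet, so create it for this illustration.
        msg.setdefault('_deleteOnPost', set()).add('nodupe_override')
        msg['nodupe_override'] = {}
    name = msg['relPath'].split('/')[-1]
    msg['nodupe_override']['path'] = name
    msg['nodupe_override']['key'] = name
    return msg


# The same file name arriving from two different trees (sample paths are made up).
from_tree_a = apply_name_override({'relPath': 'sourceA/20240506/obs.txt'})
from_tree_b = apply_name_override({'relPath': 'sourceB/archive/obs.txt'})

print(from_tree_a['nodupe_override'])  # {'path': 'obs.txt', 'key': 'obs.txt'}
print(from_tree_a['nodupe_override'] == from_tree_b['nodupe_override'])  # True
```

Because both messages end up with the same key and path, the second one would be treated as a duplicate even though the directories differ.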


Customizing post_exchangeSplit
------------------------------

The exchangeSplit function allows a single flow to send outputs to different exchanges,
numbered 1...n, to provide load distribution. The built-in processing does this in a
fixed way, based on a hash of the identity field. The purpose of exchangeSplit is to
allow a common set of downstream paths to each receive a subset of the total flow, and for
products with similar "routing" to land on the same downstream node. For example, for
winnowing to work, every file with a given checksum has to land on the same downstream node.

Rather than using a checksum, one may prefer to use some other
method to decide which exchange is used::

  import logging
  from sarracenia.flowcb import FlowCB
  import hashlib
  logger = logging.getLogger(__name__)


  class Distbydir(FlowCB):
      """
      Override the use of the identity field so that products can be grouped by directory in the relPath.
      This ensures that all products received from the same directory get posted to the same
      exchange when post_exchangeSplit is active.
      """
      def after_accept(self, worklist):
          for m in worklist.incoming:
              m['_deleteOnPost'] |= set(['exchangeSplitOverride'])
              # relPath uses '/' separators; [-2] is the directory containing the file.
              directory = m['relPath'].split('/')[-2]
              # md5 needs bytes, and the first digest character is a hexadecimal digit.
              m['exchangeSplitOverride'] = int(hashlib.md5(directory.encode('utf-8')).hexdigest()[0], 16)


This routine sets the exchangeSplitOverride field, which must be an integer.
That integer, taken modulo n, picks one of the n exchanges defined when
post_exchangeSplit is active. The routine calculates a checksum of the directory
containing the file and then converts the first hexadecimal character of that checksum
to an integer. If the directory is the same, the exchange chosen will be the same.
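
The selection can be sketched as a standalone function. The modulo arithmetic follows the putNewMessage change in this pull request (the override when present, otherwise the sum of the ASCII bytes of the identity value); the ``pick_exchange`` and ``dir_override`` helper names, the exchange names, and the sample paths are hypothetical.

```python
import hashlib


def pick_exchange(exchanges, identity_value, override=None):
    """Choose an exchange: the override modulo n if given, else the identity byte-sum modulo n."""
    if override is not None:
        idx = int(override) % len(exchanges)
    else:
        idx = sum(bytearray(identity_value, 'ascii')) % len(exchanges)
    return exchanges[idx]


def dir_override(rel_path, offset=-2):
    """Hash one element of the path and return its first hexadecimal digit as an integer (0 to 15)."""
    element = rel_path.split('/')[offset]
    digest = hashlib.md5(element.encode('utf-8')).hexdigest()
    return int(digest[0], 16)


exchanges = ['xs_demo_00', 'xs_demo_01', 'xs_demo_02']  # hypothetical exchange names

# Two files with different identity values, but in the same directory.
a = pick_exchange(exchanges, 'abc', dir_override('obs/surface/file_a.txt'))
b = pick_exchange(exchanges, 'zzz', dir_override('obs/surface/file_b.txt'))
print(a == b)  # True: the directory alone decides the exchange
```

Since the override is a single hexadecimal digit, this approach distinguishes at most 16 values before the modulo is applied, which is a consequence of the design shown above rather than a tuning recommendation.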


Sample Flowcb Sub-Class
-----------------------

101 changes: 101 additions & 0 deletions docs/source/fr/CommentFaire/FlowCallbacks.rst
@@ -231,6 +231,107 @@ Other entry_points, extracted from sarracenia/flowcb/__init__.py ::

def on_stop(self):

new_* Fields
------------

During processing of notification messages, the values of the original standard fields are generally left unchanged (as read in).
To change fields of notification messages forwarded to downstream consumers, one modifies new_field instead
of the one from the message, as the original is necessary for successful upstream retrieval:

* msg['new_baseUrl'] ... baseUrl to pass to downstream consumers.

* msg['new_dir'] ... the directory into which a file will be downloaded or sent.

* msg['new_file'] ... final name of the file to write.

* msg['new_inflight_path'] ... calculated name of the temporary file to be written before renaming to msg['new_file'] ... do not set manually.

* msg['new_relPath'] ... calculated from 'new_baseUrl', 'post_baseDir', 'new_dir', 'new_file' ... do not set manually.

* msg['post_version'] ... the encoding format of the message to be posted (from settings).

* msg['new_subtopic'] ... the subtopic hierarchy that will be encoded in the notification message for downstream consumers.

Override Fields
---------------

To change how messages are processed, one can set override fields that alter how the built-in algorithms work.
For example:

* msg['nodupe_override'] = { 'key': ..., 'path': ... } changes how duplicate detection operates.
* msg['topic'] ... defines the topic of a published message (instead of it being calculated from other fields).
* msg['exchangeSplitOverride'] = int ... changes how post_exchangeSplit chooses among multiple postExchanges.


Customizing Duplicate Suppression
---------------------------------

The built-in duplicate processing uses the identity field as a key and stores the path as the value.
If a file is received with the same key and its path is already present, it is considered a duplicate
and dropped.

In some cases, one may want only the file name to be used, so that any file with the same name received twice,
regardless of content, is considered a duplicate and dropped. This is useful when multiple systems
produce the same products, but the copies are not bitwise identical. The built-in flowcb that implements
that functionality is below::


  import logging
  from sarracenia.flowcb import FlowCB

  logger = logging.getLogger(__name__)


  class Name(FlowCB):
      """
      Override the comparison so that files with the same name, regardless of
      the directory they are in, are considered the same.
      This is useful when receiving data from two different sources
      (two different trees) and winnowing between them.
      """
      def after_accept(self, worklist):
          for m in worklist.incoming:
              if 'nodupe_override' not in m:
                  # list the override among the fields named in _deleteOnPost.
                  m['_deleteOnPost'] |= set(['nodupe_override'])
                  m['nodupe_override'] = {}

              # use only the file name as both the path and the key.
              m['nodupe_override']['path'] = m['relPath'].split('/')[-1]
              m['nodupe_override']['key'] = m['relPath'].split('/')[-1]



Customizing post_exchangeSplit
------------------------------

The exchangeSplit function allows a single flow to send outputs to different exchanges,
numbered 1...n, to provide load distribution. The built-in processing does this in a
fixed way, based on a hash of the identity field. The purpose of exchangeSplit is to
allow a common set of downstream paths to each receive a subset of the total flow, and for
products with similar "routing" to land on the same downstream node. For example, for
winnowing to work, every file with a given checksum has to land on the same downstream node.

Rather than using a checksum, one may prefer to use some other
method to decide which exchange is used::


  import logging
  from sarracenia.flowcb import FlowCB
  import hashlib
  logger = logging.getLogger(__name__)


  class Distbydir(FlowCB):
      """
      Override the use of the identity field so that products can be
      grouped by directory in the relPath. This ensures that all products
      received from the same directory get posted to the same exchange when post_exchangeSplit is active.
      """
      def after_accept(self, worklist):
          for m in worklist.incoming:
              m['_deleteOnPost'] |= set(['exchangeSplitOverride'])
              # relPath uses '/' separators; [-2] is the directory containing the file.
              directory = m['relPath'].split('/')[-2]
              # md5 needs bytes, and the first digest character is a hexadecimal digit.
              m['exchangeSplitOverride'] = int(hashlib.md5(directory.encode('utf-8')).hexdigest()[0], 16)



Sample Flowcb Sub-Class
34 changes: 34 additions & 0 deletions sarracenia/flowcb/accept/distbydir.py
@@ -0,0 +1,34 @@
import logging
from sarracenia.flowcb import FlowCB
import hashlib
logger = logging.getLogger(__name__)


class Distbydir(FlowCB):
    """
    Override the hash of the identity field so that products can be grouped by directory in the relPath.
    This ensures that all products received from the same directory get posted to the same
    exchange when post_exchangeSplit is active.

    settings:

      python path index      0            1        2         3
      msg['relPath'] = my_favourite/second44/third33/thefile.txt

      distbydir_offset = -2 ... hashes "third33"
      distbydir_offset = 1  ... hashes "second44"

    Any element of the path can be picked, using Python indexing.
    """
    def __init__(self, options):
        super().__init__(options, logger)

        # the default of -2 selects the last directory in a path.
        self.o.add_option('distbydir_offset', 'count', -2)

    def after_accept(self, worklist):
        for m in worklist.incoming:
            m['_deleteOnPost'] |= set(['exchangeSplitOverride'])
            element = m['relPath'].split('/')[self.o.distbydir_offset]
            # md5 needs bytes, and the first digest character is a hexadecimal digit.
            m['exchangeSplitOverride'] = int(hashlib.md5(element.encode('utf-8')).hexdigest()[0], 16)


2 changes: 1 addition & 1 deletion sarracenia/moth/__init__.py
@@ -349,7 +349,7 @@ def newMessages(self) -> list:
         logger.error("NewMessages unimplemented")
         return []

-    def putNewMessage(self, message:sarracenia.Message, content_type: str ='application/json') -> bool:
+    def putNewMessage(self, message:sarracenia.Message, content_type: str ='application/json', exchange: str = None) -> bool:
         """
            publish a message as set up to the given topic.

7 changes: 5 additions & 2 deletions sarracenia/moth/amqp.py
@@ -689,9 +689,12 @@ def putNewMessage(self,
         if ( 'exchangeSplit' in self.o) and self.o['exchangeSplit'] > 1:
             # FIXME: assert ( len(self.o['exchange']) == self.o['post_exchangeSplit'] )
             # if that isn't true... then there is something wrong... should we check ?
-            idx = sum(
-                bytearray(body['identity']['value'],
+            if 'exchangeSplitOverride' in message:
+                idx = int(message['exchangeSplitOverride'])%len(self.o['exchange'])
+            else:
+                idx = sum( bytearray(body['identity']['value'],
                           'ascii')) % len(self.o['exchange'])
+
             exchange = self.o['exchange'][idx]
         else:
             logger.error(
6 changes: 4 additions & 2 deletions sarracenia/moth/mqtt.py
@@ -699,8 +699,10 @@ def putNewMessage(self,
         if 'post_exchangeSplit' in self.o:
             # FIXME: assert ( len(self.o['exchange']) == self.o['post_exchangeSplit'] )
             # if that isn't true... then there is something wrong... should we check ?
-            idx = sum(
-                bytearray(body['identity']['value'],
+            if 'exchangeSplitOverride' in message:
+                idx = int(message['exchangeSplitOverride'])%len(self.o['exchange'])
+            else:
+                idx = sum(bytearray(body['identity']['value'],
                           'ascii')) % len(self.o['exchange'])
             exchange = self.o['exchange'][idx]
         else: