diff --git a/docs/source/Reference/sr3_options.7.rst b/docs/source/Reference/sr3_options.7.rst index 7e5e948b0..f390e4b24 100644 --- a/docs/source/Reference/sr3_options.7.rst +++ b/docs/source/Reference/sr3_options.7.rst @@ -1826,6 +1826,15 @@ When set in a posting component, it has the effect of eliding the *atime* and *m headers from the messages. +topicCopy (default: off) +------------------------ + +Setting *topicCopy* to true tells sarracenia pass topics through unaltered. +Sarracenia has a convention for how topics for products should be organized. There is +a topicPrefix, followed by subtopics derived from the *relPath* field of the message. +Some networks may choose to use different topic conventions, external to sarracenia. + + timeout (default: 0) ------------------------------- diff --git a/docs/source/fr/Reference/sr3_options.7.rst b/docs/source/fr/Reference/sr3_options.7.rst index 260e96a68..c0941c127 100644 --- a/docs/source/fr/Reference/sr3_options.7.rst +++ b/docs/source/fr/Reference/sr3_options.7.rst @@ -1840,6 +1840,14 @@ rajouté au subtopic pour former une hiérarchie complète de thèmes (topics). Cette option s’applique aux liaisons d’abonnement. Indique la version des messages d'annonce reçus dans les subtopics. (V03 fait référence à ``_) +topicCopy (défaut: False) +------------------------- + +Définir *topicCopy* à *true* indique à sarracenia de transmettre les *topic* des messages sans modification. +Sarracenia a une convention sur la manière dont les *topic* des produits sont organisés. Il y a +un *topicPrefix*, suivi de *subtopic* (sous-thèmes) dérivés du champ *relPath* du message. +Certains réseaux peuvent choisir d'utiliser des conventions thématiques différentes, externes à la sarracenia. + users (défaut: false) ---------------------------- diff --git a/sarracenia/config.py b/sarracenia/config.py index 34f7a6896..edbb9cdaf 100755 --- a/sarracenia/config.py +++ b/sarracenia/config.py @@ -118,6 +118,7 @@ def __repr__(self) -> str: 'sourceFromExchange': False, 'sundew_compat_regex_first_match_is_zero': False, 'sourceFromExchange': False, + 'topicCopy': False, 'v2compatRenameDoublePost': False, 'varTimeOffset': 0 } @@ -136,7 +137,7 @@ def __repr__(self) -> str: 'messageDebugDump', 'mirror', 'timeCopy', 'notify_only', 'overwrite', 'post_on_start', \ 'permCopy', 'persistent', 'queueBind', 'queueDeclare', 'randomize', 'recursive', 'realpathPost', \ 'reconnect', 'report', 'reset', 'retry_refilter', 'retryEmptyBeforeExit', 'save', 'sundew_compat_regex_first_match_is_zero', \ - 'sourceFromExchange', 'statehost', 'users', 'v2compatRenameDoublePost' + 'sourceFromExchange', 'statehost', 'topicCopy', 'users', 'v2compatRenameDoublePost' ] float_options = [ ] diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index e732ba2fc..6ac0d4ddb 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -61,6 +61,7 @@ 'messageRateMin': 0, 'sleep': 0.1, 'topicPrefix': ['v03'], + 'topicCopy': False, 'vip': [] } diff --git a/sarracenia/flow/shovel.py b/sarracenia/flow/shovel.py index db592dd6c..8b33b37ab 100644 --- a/sarracenia/flow/shovel.py +++ b/sarracenia/flow/shovel.py @@ -3,7 +3,10 @@ logger = logging.getLogger(__name__) -default_options = {'acceptUnmatched': True, 'nodupe_ttl': 0} +default_options = { + 'acceptUnmatched': True, + 'nodupe_ttl': 0, +} class Shovel(Flow): diff --git a/sarracenia/flow/winnow.py b/sarracenia/flow/winnow.py index ff0fcfbe9..70489dc12 100644 --- a/sarracenia/flow/winnow.py +++ b/sarracenia/flow/winnow.py @@ -3,7 +3,10 @@ logger = logging.getLogger(__name__) -default_options = {'acceptUnmatched': True, 'nodupe_ttl': 300} +default_options = { + 'acceptUnmatched': True, + 'nodupe_ttl': 300, +} class Winnow(Flow): diff --git a/sarracenia/moth/amqp.py b/sarracenia/moth/amqp.py index a3a784351..d9ba98704 100755 --- a/sarracenia/moth/amqp.py +++ b/sarracenia/moth/amqp.py @@ -154,7 +154,29 @@ def _msgRawToDict(self, raw_msg) -> sarracenia.Message: msg['source'] = source msg['_deleteOnPost'] |= set(['source']) - msg['subtopic'] = topic.split('.')[len(self.o['topicPrefix']):] + msg_topic = topic.split('.') + + # topic validation... deal with DMS topic scheme. https://github.com/MetPX/sarracenia/issues/1017 + if 'topicCopy' in self.o and self.o['topicCopy']: + topicOverride=True + else: + topicOverride=False + if 'relPath' in msg: + path_topic = self.o['topicPrefix'] + os.path.dirname(msg['relPath']).split('/') + + if msg_topic != path_topic: + topicOverride=True + + # set subtopic if possible. + if msg_topic[0:len(self.o['topicPrefix'])] == self.o['topicPrefix']: + msg['subtopic'] = msg_topic[len(self.o['topicPrefix']):] + else: + topicOverride=True + + if topicOverride: + msg['topic'] = topic + msg['_deleteOnPost'] |= set( ['topic'] ) + msg['ack_id'] = raw_msg.delivery_info['delivery_tag'] msg['local_offset'] = 0 msg['_deleteOnPost'] |= set( ['ack_id', 'exchange', 'local_offset', 'subtopic']) diff --git a/sarracenia/sr.py b/sarracenia/sr.py index 890010f9c..6f759212f 100755 --- a/sarracenia/sr.py +++ b/sarracenia/sr.py @@ -2656,6 +2656,11 @@ def convert(self): pos_args_present=False with open(v3_config_path, 'w') as v3_cfg: v3_cfg.write( f'# created by: sr3 convert {cfg}\n') + + if component in [ 'shovel', 'winnow' ]: + v3_cfg.write('# topicCopy on is only there for bug-for-bug compat with v2. turn it off if you can.\n') + v3_cfg.write('topicCopy on\n') + with open(v2_config_path, 'r') as v2_cfg: for line in v2_cfg.readlines(): if len(line.strip()) < 1: