diff --git a/docs/source/Reference/sr3_options.7.rst b/docs/source/Reference/sr3_options.7.rst index efa23e9d2..095e8be4f 100644 --- a/docs/source/Reference/sr3_options.7.rst +++ b/docs/source/Reference/sr3_options.7.rst @@ -1120,7 +1120,7 @@ If **messageCountMax** is greater than zero, the flow will exit after processing number of messages. This is normally used only for debugging. messageRateMax (default: 0) -------------------------------------- +----------------------------------- if **messageRateMax** is greater than zero, the flow attempts to respect this delivery speed in terms of messages per second. Note that the throttle is on messages obtained or generated @@ -1128,17 +1128,19 @@ per second, prior to accept/reject filtering. the flow will sleep to limit the p messageRateMin (default: 0) -------------------------------------- +----------------------------------- if **messageRateMin** is greater than zero, and the flow detected is lower than this rate, a warning message will be produced: -message_ttl (default: None) ---------------------------------------- +messageAgeMax (default: 0) +-------------------------------------- + +The messageAgeMax option sets the maximum age of a message to not +be rejected when consuming. Messages older than Max value are discarded +by the subscriber. (0 means no maximum age) -The **message_ttl** option set the time a message can live in the queue. -Past that time, the message is taken out of the queue by the broker. mirror (default: off) ---------------------------- @@ -1405,6 +1407,16 @@ Sets the message format for posted messages. the currently included values are: When provided, this value overrides whatever can be deduced from the post_topicPrefix. +post_messageAgeMax (default: 0) +------------------------------------------- + +The post_messageAgeMax (aka **message_ttl**) option is used when publishing a message, +as advice to the broker. Brokers discard messages which have exceeded their intended lifespan +without delivering them. (0 means no maximum lifespan is given.) +When posting to AMQP, this option sets the *x-message-ttl* property (but the latter is in milliseconds.) +When posting to MQTT, this option sets the *MessageExpiryInterval* property. + + post_on_start ------------- diff --git a/docs/source/fr/Reference/sr3_options.7.rst b/docs/source/fr/Reference/sr3_options.7.rst index 4bdf09409..944427846 100644 --- a/docs/source/fr/Reference/sr3_options.7.rst +++ b/docs/source/fr/Reference/sr3_options.7.rst @@ -1116,11 +1116,12 @@ messageRateMin (défaut: 0) Si **messageRateMin** est supérieur à zéro et que le flux détecté est inférieur à ce taux, un message d´annonce sera produit : -message_ttl (défaut: None) --------------------------------------- +messageAgeMax (défaut: None) +---------------------------------------- -L’option **message_ttl** définit un temps pour lequel un message d´annonce peut vivre dans la fil d’attente. -Après ce temps, le message d´annonce est retiré de la fil d’attente par le courtier. +L’option **AgeMax** définit un temps pour lequel un message d´annonce peut attendres du +côté consommateur. Après ce temps, le message d´annonce est rejeté par le flot. +(0 indique un age infini sera accepté.) mirror (défaut: off) --------------------------- @@ -1390,6 +1391,15 @@ Définit le format de message pour les messages publiés. les valeurs actuelleme Lorsqu'elle est fournie, cette valeur remplace tout ce qui peut être déduit de post_topicPrefix. +post_messageAgeMax (défaut: 0) +----------------------------------------- + +L'option post_messageAgeMax (alias **message_ttl**) est utilisée lors de la publication d'un message, +comme conseil au courtier. ttl est un abbréviation de *time to live* (*durée de vie maximale*) Les courtiers +rejettent les messages qui ont dépassé leur durée de vie prévue sans les livrer. (0 signifie qu'aucune durée de vie maximale n'est donnée.) +Lors de la publication avec AMQP, cette option définit la propriété *x-message-ttl* (mais cette dernière est en millisecondes). +Lors de la publication avec MQTT, cette option définit la propriété *MessageExpiryInterval*. + post_on_start ------------- diff --git a/sarracenia/config.py b/sarracenia/config.py index ab9a62837..bfd3d2a4a 100755 --- a/sarracenia/config.py +++ b/sarracenia/config.py @@ -152,7 +152,8 @@ def __repr__(self) -> str: float_options = [ 'messageRateMax', 'messageRateMin' ] duration_options = [ - 'expire', 'housekeeping', 'logRotateInterval', 'message_ttl', 'fileAgeMax', 'fileAgeMin', 'metrics_writeInterval', \ + 'expire', 'housekeeping', 'logRotateInterval', 'fileAgeMax', 'fileAgeMin', + 'messageAgeMax', 'post_messageAgeMax', 'metrics_writeInterval', \ 'runStateThreshold_idle', 'runStateThreshold_lag', 'retry_ttl', 'runStateThreshold_hung', 'sleep', 'timeout', 'varTimeOffset' ] @@ -753,7 +754,8 @@ class Config: 'logRotate': 'logRotateCount', 'logRotate': 'logRotateCount', 'logRotate_interval': 'logRotateInterval', - 'message-ttl': 'message_ttl', + 'message-ttl': 'post_messageAgeMax', + 'message_ttl': 'post_messageAgeMax', 'msg_replace_new_dir' : 'pathReplace', 'msg_filter_wmo2msc_replace_dir': 'filter_wmo2msc_replace_dir', 'msg_filter_wmo2msc_uniquify': 'filter_wmo2msc_uniquify', @@ -876,6 +878,7 @@ def __init__(self, parent=None) -> 'Config': self.mirror = False self.messageAgeMax = 0 self.post_exchanges = [] + self.post_messageAgeMax = 0 #self.post_topicPrefix = None self.pstrip = False self.queueName = None diff --git a/sarracenia/flowcb/post/message.py b/sarracenia/flowcb/post/message.py index 260021941..07ee0dcfb 100755 --- a/sarracenia/flowcb/post/message.py +++ b/sarracenia/flowcb/post/message.py @@ -24,7 +24,7 @@ def __init__(self, options): props.update(self.o.dictify()) # adjust settings post_xxx to be xxx, as Moth does not use post_ ones. - for k in [ 'broker', 'exchange', 'topicPrefix', 'exchangeSplit', 'topic' ]: + for k in [ 'broker', 'exchange', 'topicPrefix', 'exchangeSplit', 'topic', 'messageAgeMax' ]: post_one='post_'+k if hasattr( self.o, post_one ): #props.update({ k: getattr(self.o,post_one) } ) diff --git a/sarracenia/moth/amqp.py b/sarracenia/moth/amqp.py index e05d3953c..22ce97484 100755 --- a/sarracenia/moth/amqp.py +++ b/sarracenia/moth/amqp.py @@ -270,8 +270,8 @@ def _queueDeclare(self,passive=False) -> int: if self.o['expire']: x = int(self.o['expire'] * 1000) if x > 0: args['x-expires'] = x - if self.o['message_ttl']: - x = int(self.o['message_ttl'] * 1000) + if self.o['messageAgeMax']: + x = int(self.o['messageAgeMax'] * 1000) if x > 0: args['x-message-ttl'] = x #FIXME: conver expire, message_ttl to proper units. @@ -673,7 +673,7 @@ def putNewMessage(self, if self.o['message_ttl']: ttl = "%d" * int( - sarracenia.durationToSeconds(self.o['message_ttl']) * 1000) + sarracenia.durationToSeconds(self.o['messageAgeMax']) * 1000) else: ttl = "0" diff --git a/sarracenia/moth/mqtt.py b/sarracenia/moth/mqtt.py index d107a46a3..2d570822a 100755 --- a/sarracenia/moth/mqtt.py +++ b/sarracenia/moth/mqtt.py @@ -455,8 +455,8 @@ def putSetup(self): self.unexpected_publishes = collections.deque() props = Properties(PacketTypes.CONNECT) - if self.o['message_ttl'] > 0: - props.MessageExpiryInterval = int(self.o['message_ttl']) + if self.o['messageAgeMax'] > 0: + props.MessageExpiryInterval = int(self.o['messageAgeMax']) self.transport = 'websockets' if (self.o['broker'].url.scheme[-2:] == 'ws' ) or \ (self.o['broker'].url.scheme[-1] == 'w' ) else 'tcp'