Skip to content

Commit

Permalink
The Hearty eXpert Issue1138 - documenting messageAgeMax (#1147)
Browse files Browse the repository at this point in the history
* implement messageAgeMax and post_ properly

* messageAgeMax documentation English

* French documentation of messageAgeMax settings

* clarifying message propoerties set by post_messageAgeMax for each message protocol
  • Loading branch information
petersilva committed Aug 16, 2024
1 parent ef6676e commit 292a5f8
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 18 deletions.
24 changes: 18 additions & 6 deletions docs/source/Reference/sr3_options.7.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1137,25 +1137,27 @@ If **messageCountMax** is greater than zero, the flow will exit after processing
number of messages. This is normally used only for debugging.

messageRateMax <float> (default: 0)
-------------------------------------
-----------------------------------

if **messageRateMax** is greater than zero, the flow attempts to respect this delivery
speed in terms of messages per second. Note that the throttle is on messages obtained or generated
per second, prior to accept/reject filtering. the flow will sleep to limit the processing rate.


messageRateMin <float> (default: 0)
-------------------------------------
-----------------------------------

if **messageRateMin** is greater than zero, and the flow detected is lower than this rate,
a warning message will be produced:


message_ttl <duration> (default: None)
---------------------------------------
messageAgeMax <duration> (default: 0)
--------------------------------------

The messageAgeMax option sets the maximum age of a message to not
be rejected when consuming. Messages older than Max value are discarded
by the subscriber. (0 means no maximum age)

The **message_ttl** option set the time a message can live in the queue.
Past that time, the message is taken out of the queue by the broker.

mirror <flag> (default: off)
----------------------------
Expand Down Expand Up @@ -1430,6 +1432,16 @@ Sets the message format for posted messages. the currently included values are:
When provided, this value overrides whatever can be deduced from the post_topicPrefix.


post_messageAgeMax <duration> (default: 0)
-------------------------------------------

The post_messageAgeMax (aka **message_ttl**) option is used when publishing a message,
as advice to the broker. Brokers discard messages which have exceeded their intended lifespan
without delivering them. (0 means no maximum lifespan is given.)
When posting to AMQP, this option sets the *x-message-ttl* property (but the latter is in milliseconds.)
When posting to MQTT, this option sets the *MessageExpiryInterval* property.


post_on_start
-------------

Expand Down
18 changes: 14 additions & 4 deletions docs/source/fr/Reference/sr3_options.7.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1144,11 +1144,12 @@ messageRateMin <float> (défaut: 0)
Si **messageRateMin** est supérieur à zéro et que le flux détecté est inférieur à ce taux,
un message d´annonce sera produit :

message_ttl <duration> (défaut: None)
--------------------------------------
messageAgeMax <duration> (défaut: None)
----------------------------------------

L’option **message_ttl** définit un temps pour lequel un message d´annonce peut vivre dans la fil d’attente.
Après ce temps, le message d´annonce est retiré de la fil d’attente par le courtier.
L’option **AgeMax** définit un temps pour lequel un message d´annonce peut attendres du
côté consommateur. Après ce temps, le message d´annonce est rejeté par le flot.
(0 indique un age infini sera accepté.)

mirror <flag> (défaut: off)
---------------------------
Expand Down Expand Up @@ -1428,6 +1429,15 @@ Définit le format de message pour les messages publiés. les valeurs actuelleme

Lorsqu'elle est fournie, cette valeur remplace tout ce qui peut être déduit de post_topicPrefix.

post_messageAgeMax <duration> (défaut: 0)
-----------------------------------------

L'option post_messageAgeMax (alias **message_ttl**) est utilisée lors de la publication d'un message,
comme conseil au courtier. ttl est un abbréviation de *time to live* (*durée de vie maximale*) Les courtiers
rejettent les messages qui ont dépassé leur durée de vie prévue sans les livrer. (0 signifie qu'aucune durée de vie maximale n'est donnée.)
Lors de la publication avec AMQP, cette option définit la propriété *x-message-ttl* (mais cette dernière est en millisecondes).
Lors de la publication avec MQTT, cette option définit la propriété *MessageExpiryInterval*.


post_on_start
-------------
Expand Down
7 changes: 5 additions & 2 deletions sarracenia/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ def __repr__(self) -> str:
float_options = [ 'messageRateMax', 'messageRateMin' ]

duration_options = [
'expire', 'housekeeping', 'logRotateInterval', 'message_ttl', 'fileAgeMax', 'fileAgeMin', 'metrics_writeInterval', \
'expire', 'housekeeping', 'logRotateInterval', 'fileAgeMax', 'fileAgeMin',
'messageAgeMax', 'post_messageAgeMax', 'metrics_writeInterval', \
'runStateThreshold_idle', 'runStateThreshold_lag', 'retry_ttl', 'runStateThreshold_hung', 'sleep', 'timeout', 'varTimeOffset'
]

Expand Down Expand Up @@ -755,7 +756,8 @@ class Config:
'logRotate': 'logRotateCount',
'logRotate': 'logRotateCount',
'logRotate_interval': 'logRotateInterval',
'message-ttl': 'message_ttl',
'message-ttl': 'post_messageAgeMax',
'message_ttl': 'post_messageAgeMax',
'msg_replace_new_dir' : 'pathReplace',
'msg_filter_wmo2msc_replace_dir': 'filter_wmo2msc_replace_dir',
'msg_filter_wmo2msc_uniquify': 'filter_wmo2msc_uniquify',
Expand Down Expand Up @@ -879,6 +881,7 @@ def __init__(self, parent=None) -> 'Config':
self.mirror = False
self.messageAgeMax = 0
self.post_exchanges = []
self.post_messageAgeMax = 0
#self.post_topicPrefix = None
self.pstrip = False
self.queueName = None
Expand Down
2 changes: 1 addition & 1 deletion sarracenia/flowcb/post/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(self, options):
props.update(self.o.dictify())

# adjust settings post_xxx to be xxx, as Moth does not use post_ ones.
for k in [ 'broker', 'exchange', 'topicPrefix', 'exchangeSplit', 'topic' ]:
for k in [ 'broker', 'exchange', 'topicPrefix', 'exchangeSplit', 'topic', 'messageAgeMax' ]:
post_one='post_'+k
if hasattr( self.o, post_one ):
#props.update({ k: getattr(self.o,post_one) } )
Expand Down
6 changes: 3 additions & 3 deletions sarracenia/moth/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ def _queueDeclare(self,passive=False) -> int:
if self.o['expire']:
x = int(self.o['expire'] * 1000)
if x > 0: args['x-expires'] = x
if self.o['message_ttl']:
x = int(self.o['message_ttl'] * 1000)
if self.o['messageAgeMax']:
x = int(self.o['messageAgeMax'] * 1000)
if x > 0: args['x-message-ttl'] = x

#FIXME: conver expire, message_ttl to proper units.
Expand Down Expand Up @@ -673,7 +673,7 @@ def putNewMessage(self,

if self.o['message_ttl']:
ttl = "%d" * int(
sarracenia.durationToSeconds(self.o['message_ttl']) * 1000)
sarracenia.durationToSeconds(self.o['messageAgeMax']) * 1000)
else:
ttl = "0"

Expand Down
4 changes: 2 additions & 2 deletions sarracenia/moth/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,8 @@ def putSetup(self):
self.unexpected_publishes = collections.deque()

props = Properties(PacketTypes.CONNECT)
if self.o['message_ttl'] > 0:
props.MessageExpiryInterval = int(self.o['message_ttl'])
if self.o['messageAgeMax'] > 0:
props.MessageExpiryInterval = int(self.o['messageAgeMax'])

self.transport = 'websockets' if (self.o['broker'].url.scheme[-2:] == 'ws' ) or \
(self.o['broker'].url.scheme[-1] == 'w' ) else 'tcp'
Expand Down

0 comments on commit 292a5f8

Please sign in to comment.