Skip to content

Commit

Permalink
Paho mqtt v2 support (#1119)
Browse files Browse the repository at this point in the history
* passes static flow with v2 API

* make mqtt always use maual acknowledgements

* limit minimum paho-mqtt version required

* flakey pretty much passes now, dynamic is very bad though

* make mqtt binding version need more explicit

* more v2 error handling... reason_codes

* forcing Log level to warning is confusing, removed

* better error code handling on subscription

* use f-strings everywhere

* typo in string format mqtt

* add component/config marker to finalize error messages

* correct error messages from previous commit

* add mqtt sourceFromExchange/Message topicOverride support

over the winter the exchangeFromSource and exchangeFromMirror
and some topic settings were added on the amqp side. Support for these
was absent in mqtt, and it's pretty much identical, so moved
the implementation to the sarracenia.Message class, and can now
call deriveSource() and deriveTopics() from both amqp and mqtt
implementations.

* missing space in debug print
  • Loading branch information
petersilva authored Jul 15, 2024
1 parent a6c2070 commit 805dc1e
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 146 deletions.
48 changes: 48 additions & 0 deletions sarracenia/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,54 @@ def copyDict(msg, d):
for h in d:
msg[h] = d[h]

def deriveSource(msg,o):
"""
set msg['source'] field as appropriate for given message and options (o)
"""
source=None
if 'source' in o:
source = o['source']
elif 'sourceFromExchange' in o and o['sourceFromExchange'] and 'exchange' in msg:
itisthere = re.match( "xs_([^_]+)_.*", msg['exchange'] )
if itisthere:
source = itisthere[1]
else:
itisthere = re.match( "xs_([^_]+)", msg['exchange'] )
if itisthere:
source = itisthere[1]
if 'source' in msg and 'sourceFromMessage' in o and o['sourceFromMessage']:
pass
elif source:
msg['source'] = source
msg['_deleteOnPost'] |= set(['source'])

def deriveTopics(msg,o,topic,separator='.'):
"""
derive subtopic, topicPrefix, and topic fields based on message and options.
"""
msg_topic = topic.split(separator)
# topic validation... deal with DMS topic scheme. https://github.com/MetPX/sarracenia/issues/1017
if 'topicCopy' in o and o['topicCopy']:
topicOverride=True
else:
topicOverride=False
if 'relPath' in msg:
path_topic = o['topicPrefix'] + os.path.dirname(msg['relPath']).split('/')

if msg_topic != path_topic:
topicOverride=True

# set subtopic if possible.
if msg_topic[0:len(o['topicPrefix'])] == o['topicPrefix']:
msg['subtopic'] = msg_topic[len(o['topicPrefix']):]
else:
topicOverride=True

if topicOverride:
msg['topic'] = topic
msg['_deleteOnPost'] |= set( ['topic'] )


def dumps(msg) -> str:
"""
FIXME: used to be msg_dumps.
Expand Down
16 changes: 7 additions & 9 deletions sarracenia/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,6 @@ def __repr__(self) -> str:
'sourceFromExchange': False,
'sourceFromMessage': False,
'sundew_compat_regex_first_match_is_zero': False,
'sourceFromExchange': False,
'sourceFromMessage': False,
'topicCopy': False,
'v2compatRenameDoublePost': False,
'varTimeOffset': 0,
Expand Down Expand Up @@ -1982,7 +1980,7 @@ def finalize(self, component=None, config=None):

valid_inlineEncodings = [ 'guess', 'text', 'binary' ]
if hasattr(self, 'inlineEncoding') and self.inlineEncoding not in valid_inlineEncodings:
logger.error( f"invalid inlineEncoding: {self.inlineEncoding} must be one of: {','.join(valid_inlineEncodings)}" )
logger.error( f"{component}/{config} invalid inlineEncoding: {self.inlineEncoding} must be one of: {','.join(valid_inlineEncodings)}" )

if hasattr(self, 'no'):
if self.statehost:
Expand All @@ -2004,7 +2002,7 @@ def finalize(self, component=None, config=None):
path = os.path.realpath(path)

if sys.platform == 'win32' and words0.find('\\'):
logger.warning("%s %s" % (words0, words1))
logger.warning("{component}/{config} %s %s" % (words0, words1))
logger.warning(
"use of backslash ( \\ ) is an escape character. For a path separator use forward slash ( / )."
)
Expand All @@ -2017,7 +2015,7 @@ def finalize(self, component=None, config=None):

if hasattr(self, 'pollUrl'):
if not hasattr(self,'post_baseUrl') or not self.post_baseUrl :
logger.debug( f"defaulting post_baseUrl to match pollURl, since it isn't specified." )
logger.debug( f"{component}/{config} defaulting post_baseUrl to match pollURl, since it isn't specified." )
self.post_baseUrl = self.pollUrl

# verify post_baseDir
Expand All @@ -2036,13 +2034,13 @@ def finalize(self, component=None, config=None):
self.post_baseDir = u.path
elif self.baseDir is not None:
self.post_baseDir = os.path.expanduser(self.baseDir)
logger.debug("defaulting post_baseDir to same as baseDir")
logger.debug("{component}/{config} defaulting post_baseDir to same as baseDir")


if self.messageCountMax > 0:
if self.batch > self.messageCountMax:
self.batch = self.messageCountMax
logger.info( f'overriding batch for consistency with messageCountMax: {self.batch}' )
logger.info( f'{component}/{config} overriding batch for consistency with messageCountMax: {self.batch}' )

if (component not in ['poll' ]):
self.path = list(map( os.path.expanduser, self.path ))
Expand All @@ -2053,10 +2051,10 @@ def finalize(self, component=None, config=None):
self.sleep=1

if self.runStateThreshold_hung < self.housekeeping:
logger.warning( f"runStateThreshold_hung {self.runStateThreshold_hung} set lower than housekeeping {self.housekeeping}. sr3 sanity might think this flow is hung kill it too quickly.")
logger.warning( f"{component}/{config} runStateThreshold_hung {self.runStateThreshold_hung} set lower than housekeeping {self.housekeeping}. sr3 sanity might think this flow is hung kill it too quickly.")

if self.vip and not features['vip']['present']:
logger.critical( f"vip feature requested, but missing library: {' '.join(features['vip']['modules_needed'])} " )
logger.critical( f"{component}/{config} vip feature requested, but missing library: {' '.join(features['vip']['modules_needed'])} " )
sys.exit(1)

def check_undeclared_options(self):
Expand Down
9 changes: 8 additions & 1 deletion sarracenia/featuredetection.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
'lament': 'humans will have to read larger, uglier numbers',
'rejoice': 'humans numbers that are easier to read.' },
'mqtt' : { 'modules_needed': ['paho.mqtt.client'], 'present': False,
'lament': 'cannot connect to mqtt brokers' ,
'lament': 'cannot connect to mqtt brokers (need >= 2.1.0)' ,
'rejoice': 'can connect to mqtt brokers' },
'process' : { 'modules_needed': ['psutil'], 'present': False,
'lament': 'cannot monitor running processes, sr3 CLI basically does not work.',
Expand Down Expand Up @@ -131,3 +131,10 @@
features['filetypes']['present'] = False
logger.debug( f'redhat magic bindings not supported.')

if features['mqtt']['present']:
import paho.mqtt
if not paho.mqtt.__version__ >= '2.1.0' :
features['mqtt']['present'] = False
logger.debug( f'paho-mqtt minimum version needed is 2.1.0')


6 changes: 4 additions & 2 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,9 +616,11 @@ def run(self):
if (last_gather_len == 0) and (self.o.sleep < 0):
if (self.o.retryEmptyBeforeExit and "retry" in self.metrics
and self.metrics['retry']['msgs_in_post_retry'] > 0):
logger.debug("Not exiting because there are still messages in the post retry queue.")
logger.info( f"retryEmptyBeforeExit=True and there are still "
f"{self.metrics['retry']['msgs_in_post_retry']} messages in the post retry queue.")
# Sleep for a while. Messages can't be retried before housekeeping has run...
current_sleep = 60
# how long to sleep is unclear... if there are a lot of retries, and a low batch... could take a long time.
current_sleep = self.o.batch if self.o.batch < self.o.housekeeping else self.o.housekeeping // 2
else:
self.runCallbacksTime('please_stop')

Expand Down
1 change: 0 additions & 1 deletion sarracenia/flowcb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,5 @@ def load_library(factory_path, options):
setattr(opt, s, options.settings[factory_path][s])
else:
opt = options

plugin = class_(opt)
return plugin
16 changes: 12 additions & 4 deletions sarracenia/flowcb/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,25 @@ def after_work(self, worklist) -> None:
return

if len(worklist.failed) != 0:
#logger.debug("putting %d messages into %s" % (len(worklist.failed),self.download_retry_name) )
logger.debug( f"putting {len(worklist.failed)} messages into {self.download_retry_name}" )
self.download_retry.put(worklist.failed)
worklist.failed = []

# retry posting...
qty = (self.o.batch / 2) - len(worklist.ok)
if qty <= 0: return
if (self.o.batch > 2):
qty = self.o.batch // 2 - len(worklist.ok)
elif len(worklist.ok) < self.o.batch :
qty=self.o.batch - len(worklist.ok)
else:
qty=0

if qty <= 0:
logger.info( f"{len(worklist.ok)} messages to process, too busy to retry" )
return

mlist = self.post_retry.get(qty)

#logger.debug("loading from %s: qty=%d ... got: %d " % (self.post_retry_name, qty, len(mlist)))
logger.debug( f"loading from {self.post_retry_name}: qty={qty} ... got: {len(mlist)}" )
if len(mlist) > 0:
worklist.ok.extend(mlist)

Expand Down
40 changes: 2 additions & 38 deletions sarracenia/moth/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,45 +138,9 @@ def _msgRawToDict(self, raw_msg) -> sarracenia.Message:
topic = raw_msg.delivery_info['routing_key'].replace(
'%23', '#').replace('%22', '*')
msg['exchange'] = raw_msg.delivery_info['exchange']
source=None
if 'source' in self.o:
source = self.o['source']
elif 'sourceFromExchange' in self.o and self.o['sourceFromExchange']:
itisthere = re.match( "xs_([^_]+)_.*", msg['exchange'] )
if itisthere:
source = itisthere[1]
else:
itisthere = re.match( "xs_([^_]+)", msg['exchange'] )
if itisthere:
source = itisthere[1]
if 'source' in msg and 'sourceFromMessage' in self.o and self.o['sourceFromMessage']:
pass
elif source:
msg['source'] = source
msg['_deleteOnPost'] |= set(['source'])

msg_topic = topic.split('.')

# topic validation... deal with DMS topic scheme. https://github.com/MetPX/sarracenia/issues/1017
if 'topicCopy' in self.o and self.o['topicCopy']:
topicOverride=True
else:
topicOverride=False
if 'relPath' in msg:
path_topic = self.o['topicPrefix'] + os.path.dirname(msg['relPath']).split('/')

if msg_topic != path_topic:
topicOverride=True

# set subtopic if possible.
if msg_topic[0:len(self.o['topicPrefix'])] == self.o['topicPrefix']:
msg['subtopic'] = msg_topic[len(self.o['topicPrefix']):]
else:
topicOverride=True

if topicOverride:
msg['topic'] = topic
msg['_deleteOnPost'] |= set( ['topic'] )
msg.deriveSource( self.o )
msg.deriveTopics( self.o, topic )

msg['ack_id'] = raw_msg.delivery_info['delivery_tag']
msg['local_offset'] = 0
Expand Down
Loading

0 comments on commit 805dc1e

Please sign in to comment.