Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Paho mqtt v2 support #1119

Merged
merged 16 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions sarracenia/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,54 @@ def copyDict(msg, d):
for h in d:
msg[h] = d[h]

def deriveSource(msg,o):
"""
set msg['source'] field as appropriate for given message and options (o)
"""
source=None
if 'source' in o:
source = o['source']
elif 'sourceFromExchange' in o and o['sourceFromExchange'] and 'exchange' in msg:
itisthere = re.match( "xs_([^_]+)_.*", msg['exchange'] )
if itisthere:
source = itisthere[1]
else:
itisthere = re.match( "xs_([^_]+)", msg['exchange'] )
if itisthere:
source = itisthere[1]
if 'source' in msg and 'sourceFromMessage' in o and o['sourceFromMessage']:
pass
elif source:
msg['source'] = source
msg['_deleteOnPost'] |= set(['source'])

def deriveTopics(msg,o,topic,separator='.'):
"""
derive subtopic, topicPrefix, and topic fields based on message and options.
"""
msg_topic = topic.split(separator)
# topic validation... deal with DMS topic scheme. https://github.com/MetPX/sarracenia/issues/1017
if 'topicCopy' in o and o['topicCopy']:
topicOverride=True
else:
topicOverride=False
if 'relPath' in msg:
path_topic = o['topicPrefix'] + os.path.dirname(msg['relPath']).split('/')

if msg_topic != path_topic:
topicOverride=True

# set subtopic if possible.
if msg_topic[0:len(o['topicPrefix'])] == o['topicPrefix']:
msg['subtopic'] = msg_topic[len(o['topicPrefix']):]
else:
topicOverride=True

if topicOverride:
msg['topic'] = topic
msg['_deleteOnPost'] |= set( ['topic'] )


def dumps(msg) -> str:
"""
FIXME: used to be msg_dumps.
Expand Down
16 changes: 7 additions & 9 deletions sarracenia/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,6 @@ def __repr__(self) -> str:
'sourceFromExchange': False,
'sourceFromMessage': False,
'sundew_compat_regex_first_match_is_zero': False,
'sourceFromExchange': False,
'sourceFromMessage': False,
'topicCopy': False,
'v2compatRenameDoublePost': False,
'varTimeOffset': 0,
Expand Down Expand Up @@ -1982,7 +1980,7 @@ def finalize(self, component=None, config=None):

valid_inlineEncodings = [ 'guess', 'text', 'binary' ]
if hasattr(self, 'inlineEncoding') and self.inlineEncoding not in valid_inlineEncodings:
logger.error( f"invalid inlineEncoding: {self.inlineEncoding} must be one of: {','.join(valid_inlineEncodings)}" )
logger.error( f"{component}/{config} invalid inlineEncoding: {self.inlineEncoding} must be one of: {','.join(valid_inlineEncodings)}" )

if hasattr(self, 'no'):
if self.statehost:
Expand All @@ -2004,7 +2002,7 @@ def finalize(self, component=None, config=None):
path = os.path.realpath(path)

if sys.platform == 'win32' and words0.find('\\'):
logger.warning("%s %s" % (words0, words1))
logger.warning("{component}/{config} %s %s" % (words0, words1))
logger.warning(
"use of backslash ( \\ ) is an escape character. For a path separator use forward slash ( / )."
)
Expand All @@ -2017,7 +2015,7 @@ def finalize(self, component=None, config=None):

if hasattr(self, 'pollUrl'):
if not hasattr(self,'post_baseUrl') or not self.post_baseUrl :
logger.debug( f"defaulting post_baseUrl to match pollURl, since it isn't specified." )
logger.debug( f"{component}/{config} defaulting post_baseUrl to match pollURl, since it isn't specified." )
self.post_baseUrl = self.pollUrl

# verify post_baseDir
Expand All @@ -2036,13 +2034,13 @@ def finalize(self, component=None, config=None):
self.post_baseDir = u.path
elif self.baseDir is not None:
self.post_baseDir = os.path.expanduser(self.baseDir)
logger.debug("defaulting post_baseDir to same as baseDir")
logger.debug("{component}/{config} defaulting post_baseDir to same as baseDir")


if self.messageCountMax > 0:
if self.batch > self.messageCountMax:
self.batch = self.messageCountMax
logger.info( f'overriding batch for consistency with messageCountMax: {self.batch}' )
logger.info( f'{component}/{config} overriding batch for consistency with messageCountMax: {self.batch}' )

if (component not in ['poll' ]):
self.path = list(map( os.path.expanduser, self.path ))
Expand All @@ -2053,10 +2051,10 @@ def finalize(self, component=None, config=None):
self.sleep=1

if self.runStateThreshold_hung < self.housekeeping:
logger.warning( f"runStateThreshold_hung {self.runStateThreshold_hung} set lower than housekeeping {self.housekeeping}. sr3 sanity might think this flow is hung kill it too quickly.")
logger.warning( f"{component}/{config} runStateThreshold_hung {self.runStateThreshold_hung} set lower than housekeeping {self.housekeeping}. sr3 sanity might think this flow is hung kill it too quickly.")

if self.vip and not features['vip']['present']:
logger.critical( f"vip feature requested, but missing library: {' '.join(features['vip']['modules_needed'])} " )
logger.critical( f"{component}/{config} vip feature requested, but missing library: {' '.join(features['vip']['modules_needed'])} " )
sys.exit(1)

def check_undeclared_options(self):
Expand Down
9 changes: 8 additions & 1 deletion sarracenia/featuredetection.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
'lament': 'humans will have to read larger, uglier numbers',
'rejoice': 'humans numbers that are easier to read.' },
'mqtt' : { 'modules_needed': ['paho.mqtt.client'], 'present': False,
'lament': 'cannot connect to mqtt brokers' ,
'lament': 'cannot connect to mqtt brokers (need >= 2.1.0)' ,
'rejoice': 'can connect to mqtt brokers' },
'process' : { 'modules_needed': ['psutil'], 'present': False,
'lament': 'cannot monitor running processes, sr3 CLI basically does not work.',
Expand Down Expand Up @@ -131,3 +131,10 @@
features['filetypes']['present'] = False
logger.debug( f'redhat magic bindings not supported.')

if features['mqtt']['present']:
import paho.mqtt
if not paho.mqtt.__version__ >= '2.1.0' :
features['mqtt']['present'] = False
logger.debug( f'paho-mqtt minimum version needed is 2.1.0')


6 changes: 4 additions & 2 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,9 +616,11 @@ def run(self):
if (last_gather_len == 0) and (self.o.sleep < 0):
if (self.o.retryEmptyBeforeExit and "retry" in self.metrics
and self.metrics['retry']['msgs_in_post_retry'] > 0):
logger.debug("Not exiting because there are still messages in the post retry queue.")
logger.info( f"retryEmptyBeforeExit=True and there are still "
f"{self.metrics['retry']['msgs_in_post_retry']} messages in the post retry queue.")
# Sleep for a while. Messages can't be retried before housekeeping has run...
current_sleep = 60
# how long to sleep is unclear... if there are a lot of retries, and a low batch... could take a long time.
current_sleep = self.o.batch if self.o.batch < self.o.housekeeping else self.o.housekeeping // 2
else:
self.runCallbacksTime('please_stop')

Expand Down
1 change: 0 additions & 1 deletion sarracenia/flowcb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,5 @@ def load_library(factory_path, options):
setattr(opt, s, options.settings[factory_path][s])
else:
opt = options

plugin = class_(opt)
return plugin
16 changes: 12 additions & 4 deletions sarracenia/flowcb/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,25 @@ def after_work(self, worklist) -> None:
return

if len(worklist.failed) != 0:
#logger.debug("putting %d messages into %s" % (len(worklist.failed),self.download_retry_name) )
logger.debug( f"putting {len(worklist.failed)} messages into {self.download_retry_name}" )
self.download_retry.put(worklist.failed)
worklist.failed = []

# retry posting...
qty = (self.o.batch / 2) - len(worklist.ok)
if qty <= 0: return
if (self.o.batch > 2):
qty = self.o.batch // 2 - len(worklist.ok)
elif len(worklist.ok) < self.o.batch :
qty=self.o.batch - len(worklist.ok)
else:
qty=0

if qty <= 0:
logger.info( f"{len(worklist.ok)} messages to process, too busy to retry" )
return

mlist = self.post_retry.get(qty)

#logger.debug("loading from %s: qty=%d ... got: %d " % (self.post_retry_name, qty, len(mlist)))
logger.debug( f"loading from {self.post_retry_name}: qty={qty} ... got: {len(mlist)}" )
if len(mlist) > 0:
worklist.ok.extend(mlist)

Expand Down
40 changes: 2 additions & 38 deletions sarracenia/moth/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,45 +138,9 @@ def _msgRawToDict(self, raw_msg) -> sarracenia.Message:
topic = raw_msg.delivery_info['routing_key'].replace(
'%23', '#').replace('%22', '*')
msg['exchange'] = raw_msg.delivery_info['exchange']
source=None
if 'source' in self.o:
source = self.o['source']
elif 'sourceFromExchange' in self.o and self.o['sourceFromExchange']:
itisthere = re.match( "xs_([^_]+)_.*", msg['exchange'] )
if itisthere:
source = itisthere[1]
else:
itisthere = re.match( "xs_([^_]+)", msg['exchange'] )
if itisthere:
source = itisthere[1]
if 'source' in msg and 'sourceFromMessage' in self.o and self.o['sourceFromMessage']:
pass
elif source:
msg['source'] = source
msg['_deleteOnPost'] |= set(['source'])

msg_topic = topic.split('.')

# topic validation... deal with DMS topic scheme. https://github.com/MetPX/sarracenia/issues/1017
if 'topicCopy' in self.o and self.o['topicCopy']:
topicOverride=True
else:
topicOverride=False
if 'relPath' in msg:
path_topic = self.o['topicPrefix'] + os.path.dirname(msg['relPath']).split('/')

if msg_topic != path_topic:
topicOverride=True

# set subtopic if possible.
if msg_topic[0:len(self.o['topicPrefix'])] == self.o['topicPrefix']:
msg['subtopic'] = msg_topic[len(self.o['topicPrefix']):]
else:
topicOverride=True

if topicOverride:
msg['topic'] = topic
msg['_deleteOnPost'] |= set( ['topic'] )
msg.deriveSource( self.o )
msg.deriveTopics( self.o, topic )

msg['ack_id'] = raw_msg.delivery_info['delivery_tag']
msg['local_offset'] = 0
Expand Down
Loading
Loading