From 9da4569bc014c919e26587954a634c3d5e3445af Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Thu, 13 Jun 2024 09:49:42 -0400 Subject: [PATCH 01/14] passes static flow with v2 API --- sarracenia/moth/mqtt.py | 63 +++++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/sarracenia/moth/mqtt.py b/sarracenia/moth/mqtt.py index d6f63b584..af9effb1e 100755 --- a/sarracenia/moth/mqtt.py +++ b/sarracenia/moth/mqtt.py @@ -180,9 +180,9 @@ def __init__(self, options, is_subscriber): logger.warning("note: mqtt support is newish, not very well tested") - def __sub_on_disconnect(client, userdata, rc, properties=None): + def __sub_on_disconnect(client, userdata, mid, reason_code, properties=None): userdata.metricsDisconnect() - logger.debug(paho.mqtt.client.connack_string(rc)) + logger.debug(reason_code) if hasattr(userdata, 'pending_publishes'): lost = len(userdata.pending_publishes) if lost > 0: @@ -194,9 +194,9 @@ def __sub_on_disconnect(client, userdata, rc, properties=None): def __sub_on_connect(client, userdata, flags, rc, properties=None): logger.debug("client=%s rc=%s, flags=%s" % - (client, paho.mqtt.client.connack_string(rc), flags)) + (client, rc, flags)) - if flags['session present'] != 1: + if not flags.session_present: logger.debug( 'no existing session, no recovery of inflight messages from previous connection' ) @@ -230,24 +230,31 @@ def __sub_on_connect(client, userdata, flags, rc, properties=None): def __sub_on_subscribe(client, userdata, mid, - granted_qos, + reason_codes, properties=None): + + userdata.subscribe_mutex.acquire() - logger.info("client: {} subscribe completed mid={} granted_qos={}".format( - client._client_id, mid, list(map(lambda x: x.getName(), granted_qos)))) + logger.info("client: {} subscribe completed mid={} reason_codes={}".format( + client._client_id, mid, reason_codes)) userdata.subscribe_in_progress -= 1 userdata.subscribe_mutex.release() - def __pub_on_disconnect(client, userdata, rc, 
properties=None): + def __pub_on_disconnect(client, userdata, mid, reason_code, properties=None): userdata.metricsDisconnect() - logger.info(paho.mqtt.client.connack_string(rc)) + logger.info(reason_code) - def __pub_on_connect(client, userdata, flags, rc, properties=None): + def __pub_on_connect(client, userdata, flags, reason_code, properties=None): userdata.connect_in_progress = False - userdata.metricsConnect() - logger.info(paho.mqtt.client.connack_string(rc)) + logger.info(reason_code) + + if reason_code == 0: + userdata.metricsConnect() + elif reason_code > 0: + pass # error processing... + - def __pub_on_publish(client, userdata, mid): + def __pub_on_publish(client, userdata, mid, reason_codes, properties=None): if mid in userdata.pending_publishes: logger.info('publish complete. mid={}'.format(mid)) @@ -299,13 +306,16 @@ def __clientSetup(self, cid) -> paho.mqtt.client.Client: self.connect_in_progress = True - if (self.o['broker'].url.scheme[-2:] == 'ws' ) or \ - (self.o['broker'].url.scheme[-1] == 'w' ) : - client = paho.mqtt.client.Client( userdata=self, transport="websockets", \ - client_id=cid, protocol=paho.mqtt.client.MQTTv5 ) - else: - client = paho.mqtt.client.Client( userdata=self, \ - client_id=cid, protocol=paho.mqtt.client.MQTTv5 ) + self.transport= 'websocket' if (self.o['broker'].url.scheme[-2:] == 'ws' ) or \ + (self.o['broker'].url.scheme[-1] == 'w' ) else 'tcp' + + logger.info( "FIXME: {self.transport=} ") + client = paho.mqtt.client.Client( \ + callback_api_version = paho.mqtt.client.CallbackAPIVersion.VERSION2, \ + client_id=cid, userdata=self, protocol=paho.mqtt.client.MQTTv5, \ + transport = self.transport , manual_ack = self.manual_ack ) + + # FIXME: transport = 'websockets', 'unix' client.connected = False client.on_connect = MQTT.__sub_on_connect @@ -446,13 +456,12 @@ def putSetup(self): if self.o['message_ttl'] > 0: props.MessageExpiryInterval = int(self.o['message_ttl']) - if (self.o['broker'].url.scheme[-2:] == 'ws' ) or \ - 
(self.o['broker'].url.scheme[-1] == 'w' ) : - self.client = paho.mqtt.client.Client( userdata=self, transport="websockets", \ - protocol=self.proto_version ) - else: - self.client = paho.mqtt.client.Client( userdata=self, \ - protocol=self.proto_version ) + self.transport = 'websockets' if (self.o['broker'].url.scheme[-2:] == 'ws' ) or \ + (self.o['broker'].url.scheme[-1] == 'w' ) else 'tcp' + + self.client = paho.mqtt.client.Client( + callback_api_version = paho.mqtt.client.CallbackAPIVersion.VERSION2, \ + userdata=self, transport=self.transport, protocol=self.proto_version ) #self.client = paho.mqtt.client.Client( # protocol=self.proto_version, userdata=self) From d793e1e52a66e1617ddac5c6447fa314f397e773 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Thu, 13 Jun 2024 10:27:22 -0400 Subject: [PATCH 02/14] make mqtt always use manual acknowledgements --- sarracenia/moth/mqtt.py | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/sarracenia/moth/mqtt.py b/sarracenia/moth/mqtt.py index af9effb1e..478c4bb74 100755 --- a/sarracenia/moth/mqtt.py +++ b/sarracenia/moth/mqtt.py @@ -313,7 +313,7 @@ def __clientSetup(self, cid) -> paho.mqtt.client.Client: client = paho.mqtt.client.Client( \ callback_api_version = paho.mqtt.client.CallbackAPIVersion.VERSION2, \ client_id=cid, userdata=self, protocol=paho.mqtt.client.MQTTv5, \ - transport = self.transport , manual_ack = self.manual_ack ) + transport = self.transport , manual_ack = True ) # FIXME: transport = 'websockets', 'unix' @@ -346,7 +346,6 @@ def getSetup(self): something_broke = True self.connected=False - self.manual_ack = False while True: if self.please_stop: @@ -372,17 +371,6 @@ def getSetup(self): logger.info( f"is no around? {self.o['no']} " ) if ('no' in self.o) and self.o['no'] > 0: # instances 'started' self.client = self.__clientSetup(cid) - if hasattr(self, 'client') and hasattr(self.client, 'manual_ack_set'): # FIXME breaking this...
- self.client.manual_ack_set(True) - logger.debug( - "Switching on manual_acks for higher reliability via explicit acknowledgements." - ) - self.manual_ack = True - else: - logger.warning( - "paho library automatically acknowledges receipt. may lose data every crash or restart." - ) - self.client.connect( self.o['broker'].url.hostname, port=self.__sslClientSetup(), \ clean_start=False, properties=props ) self.client.enable_logger(logger) @@ -667,7 +655,7 @@ def getNewMessage(self) -> sarracenia.Message: def ack(self, m: sarracenia.Message ) -> None: - if self.manual_ack and ('ack_id' in m): + if 'ack_id' in m: logger.info('mid=%d' % m['ack_id']) self.client.ack( m['ack_id'], m['qos'] ) del m['ack_id'] From ace30c04b02cc8acb24fa6271d349d786d03664c Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Thu, 13 Jun 2024 14:08:07 -0400 Subject: [PATCH 03/14] limit minimum paho-mqtt version required --- sarracenia/featuredetection.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sarracenia/featuredetection.py b/sarracenia/featuredetection.py index b367d70a7..d1b975156 100755 --- a/sarracenia/featuredetection.py +++ b/sarracenia/featuredetection.py @@ -131,3 +131,10 @@ features['filetypes']['present'] = False logger.debug( f'redhat magic bindings not supported.') +if features['mqtt']['present']: + import paho.mqtt + if not paho.mqtt.__version__ >= '2.1.0' : + features['mqtt']['present'] = False + logger.debug( f'paho-mqtt minimum version needed is 2.1.0') + + From 2e7ee5a24c964e5b577859a2f75c75a9583158dc Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Thu, 13 Jun 2024 17:30:21 -0400 Subject: [PATCH 04/14] flakey pretty much passes now, dynamic is very bad though --- sarracenia/flow/__init__.py | 3 ++- sarracenia/flowcb/retry.py | 16 ++++++++++++---- sarracenia/moth/mqtt.py | 2 +- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index f1deeb6c5..8d9604ad8 100644 --- a/sarracenia/flow/__init__.py 
+++ b/sarracenia/flow/__init__.py @@ -606,7 +606,8 @@ def run(self): and self.metrics['retry']['msgs_in_post_retry'] > 0): logger.debug("Not exiting because there are still messages in the post retry queue.") # Sleep for a while. Messages can't be retried before housekeeping has run... - current_sleep = 60 + # how long to sleep is unclear... if there are a lot of retries, and a low batch... could take a long time. + current_sleep = self.o.batch if self.o.batch < self.o.housekeeping else self.o.housekeeping // 2 else: self.runCallbacksTime('please_stop') diff --git a/sarracenia/flowcb/retry.py b/sarracenia/flowcb/retry.py index d5086d2e7..9cb3f3eb4 100755 --- a/sarracenia/flowcb/retry.py +++ b/sarracenia/flowcb/retry.py @@ -122,17 +122,25 @@ def after_work(self, worklist) -> None: return if len(worklist.failed) != 0: - #logger.debug("putting %d messages into %s" % (len(worklist.failed),self.download_retry_name) ) + logger.debug( f"putting {len(worklist.failed)} messages into {self.download_retry_name}" ) self.download_retry.put(worklist.failed) worklist.failed = [] # retry posting... - qty = (self.o.batch / 2) - len(worklist.ok) - if qty <= 0: return + if (self.o.batch > 2): + qty = self.o.batch // 2 - len(worklist.ok) + elif len(worklist.ok) < self.o.batch : + qty=self.o.batch - len(worklist.ok) + else: + qty=0 + + if qty <= 0: + logger.info( f"{len(worklist.ok)} messages to process, too busy to retry" ) + return mlist = self.post_retry.get(qty) - #logger.debug("loading from %s: qty=%d ... got: %d " % (self.post_retry_name, qty, len(mlist))) + logger.debug( f"loading from {self.post_retry_name}: qty={qty} ... 
got: {len(mlist)}" ) if len(mlist) > 0: worklist.ok.extend(mlist) diff --git a/sarracenia/moth/mqtt.py b/sarracenia/moth/mqtt.py index 478c4bb74..1aa988a68 100755 --- a/sarracenia/moth/mqtt.py +++ b/sarracenia/moth/mqtt.py @@ -773,7 +773,7 @@ def close(self): if hasattr(self, 'pending_publishes'): ebo=0.1 - while (len(self.pending_publishes)+len(self.unexpected_publishes)) >0: + while len(self.pending_publishes) >0: logger.info( f'waiting {ebo} seconds for last {len(self.pending_publishes)} messages to publish') if len(self.unexpected_publishes) < 10: logger.info( f'messages acknowledged before publish?: {self.unexpected_publishes}') From 12966ba9e5787423a211624a1c8045a7b02cc89f Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Thu, 13 Jun 2024 17:50:15 -0400 Subject: [PATCH 05/14] make mqtt binding version need more explicit --- sarracenia/featuredetection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sarracenia/featuredetection.py b/sarracenia/featuredetection.py index d1b975156..9d48c0304 100755 --- a/sarracenia/featuredetection.py +++ b/sarracenia/featuredetection.py @@ -70,7 +70,7 @@ 'lament': 'humans will have to read larger, uglier numbers', 'rejoice': 'humans numbers that are easier to read.' }, 'mqtt' : { 'modules_needed': ['paho.mqtt.client'], 'present': False, - 'lament': 'cannot connect to mqtt brokers' , + 'lament': 'cannot connect to mqtt brokers (need >= 2.1.0)' , 'rejoice': 'can connect to mqtt brokers' }, 'process' : { 'modules_needed': ['psutil'], 'present': False, 'lament': 'cannot monitor running processes, sr3 CLI basically does not work.', From 8513ad540402991b486e2df4fd5724e0e8a3fe02 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Mon, 17 Jun 2024 08:38:29 -0400 Subject: [PATCH 06/14] more v2 error handling... 
reason_codes --- sarracenia/moth/mqtt.py | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/sarracenia/moth/mqtt.py b/sarracenia/moth/mqtt.py index 1aa988a68..86cb77db7 100755 --- a/sarracenia/moth/mqtt.py +++ b/sarracenia/moth/mqtt.py @@ -192,21 +192,23 @@ def __sub_on_disconnect(client, userdata, mid, reason_code, properties=None): else: logger.info('clean. no published messages lost.') - def __sub_on_connect(client, userdata, flags, rc, properties=None): - logger.debug("client=%s rc=%s, flags=%s" % - (client, rc, flags)) + def __sub_on_connect(client, userdata, flags, reason_code, properties=None): + + userdata.connect_in_progress = False + + if reason_code != 0 : + logger.error( f"failed to establish connection: {reason_code}") + return if not flags.session_present: logger.debug( 'no existing session, no recovery of inflight messages from previous connection' ) + logger.info('connection succeeded') - userdata.connect_in_progress = False - if rc != paho.mqtt.client.MQTT_ERR_SUCCESS: - logger.error("failed to establish connection") - return - + # else reason_code == 0 ... success. # FIXME: enhancement could subscribe accepts multiple (subj, qos) tuples so, could do this in one RTT. + userdata.connected=True userdata.subscribe_mutex.acquire() for binding_tuple in userdata.o['bindings']: @@ -246,23 +248,24 @@ def __pub_on_disconnect(client, userdata, mid, reason_code, properties=None): def __pub_on_connect(client, userdata, flags, reason_code, properties=None): userdata.connect_in_progress = False - logger.info(reason_code) if reason_code == 0: + logger.info(reason_code) userdata.metricsConnect() - elif reason_code > 0: - pass # error processing... + userdata.connected=True + else: + logger.error( f"connect failed: {reason_code}" ) def __pub_on_publish(client, userdata, mid, reason_codes, properties=None): if mid in userdata.pending_publishes: - logger.info('publish complete. 
mid={}'.format(mid)) + logger.info( f"publish complete. mid={mid}" ) # FIXME: worried... not clear if dequeue remove is thread safe. userdata.pending_publishes.remove(mid) else: userdata.unexpected_publishes.append(mid) - logger.info( 'BUG: ack for message we do not know we published. mid={}'. format(mid)) + logger.info( f"BUG: ack for message we do not know we published. mid={mid}" ) def __sslClientSetup(self) -> int: """ @@ -272,7 +275,7 @@ def __sslClientSetup(self) -> int: """ if self.o['broker'].url.scheme[-1] == 's': port = 8883 - logger.info('tlsRigour: %s' % self.o['tlsRigour']) + logger.info( f"tlsRigour: {self.o['tlsRigour']}" ) self.o['tlsRigour'] = self.o['tlsRigour'].lower() if self.o['tlsRigour'] == 'lax': self.tlsctx = ssl.create_default_context() @@ -352,7 +355,6 @@ def getSetup(self): break try: - cs = self.o['clean_session'] if ('queueName' in self.o) and ('no' in self.o): cid = self.o['queueName'] + "_i%02d" % self.o['no'] @@ -383,7 +385,6 @@ def getSetup(self): break if ebo < 512 : ebo *= 2 - self.connected=True break else: # either 'declare' or 'foreground' if 'instances' in self.o: @@ -480,9 +481,7 @@ def putSetup(self): if ebo < 512: ebo *= 2 - if not self.connect_in_progress: - self.connected=True break except Exception as err: From 40a20bd1a93ff685a5c3232b4377b6cddf961ac0 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Mon, 17 Jun 2024 08:39:11 -0400 Subject: [PATCH 07/14] forcing Log level to warning is confusing, removed --- sarracenia/moth/mqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sarracenia/moth/mqtt.py b/sarracenia/moth/mqtt.py index 86cb77db7..b7c1d3d6f 100755 --- a/sarracenia/moth/mqtt.py +++ b/sarracenia/moth/mqtt.py @@ -138,7 +138,7 @@ def __init__(self, options, is_subscriber): me = "%s.%s" % (__class__.__module__, __class__.__name__) - logger.setLevel('WARNING') + #logger.setLevel('WARNING') if ('settings' in self.o) and (me in self.o['settings']): for s in self.o['settings'][me]: From 
9d70838465bba2f2c094a01f14621a8da9f8beff Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Mon, 17 Jun 2024 17:31:43 -0400 Subject: [PATCH 08/14] better error code handling on subscription --- sarracenia/flow/__init__.py | 3 ++- sarracenia/flowcb/__init__.py | 1 - sarracenia/moth/mqtt.py | 22 +++++++++++++++------- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index 8d9604ad8..58c5f6f42 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -604,7 +604,8 @@ def run(self): if (last_gather_len == 0) and (self.o.sleep < 0): if (self.o.retryEmptyBeforeExit and "retry" in self.metrics and self.metrics['retry']['msgs_in_post_retry'] > 0): - logger.debug("Not exiting because there are still messages in the post retry queue.") + logger.info( f"retryEmptyBeforeExit=True and there are still " + f"{self.metrics['retry']['msgs_in_post_retry']} messages in the post retry queue.") # Sleep for a while. Messages can't be retried before housekeeping has run... # how long to sleep is unclear... if there are a lot of retries, and a low batch... could take a long time. 
current_sleep = self.o.batch if self.o.batch < self.o.housekeeping else self.o.housekeeping // 2 diff --git a/sarracenia/flowcb/__init__.py b/sarracenia/flowcb/__init__.py index a3db82329..e6c2f88c8 100755 --- a/sarracenia/flowcb/__init__.py +++ b/sarracenia/flowcb/__init__.py @@ -271,6 +271,5 @@ def load_library(factory_path, options): setattr(opt, s, options.settings[factory_path][s]) else: opt = options - plugin = class_(opt) return plugin diff --git a/sarracenia/moth/mqtt.py b/sarracenia/moth/mqtt.py index b7c1d3d6f..0f3408e39 100755 --- a/sarracenia/moth/mqtt.py +++ b/sarracenia/moth/mqtt.py @@ -235,12 +235,21 @@ def __sub_on_subscribe(client, reason_codes, properties=None): + - userdata.subscribe_mutex.acquire() - logger.info("client: {} subscribe completed mid={} reason_codes={}".format( - client._client_id, mid, reason_codes)) - userdata.subscribe_in_progress -= 1 - userdata.subscribe_mutex.release() + for sub_result in reason_codes: + if sub_result == 1: + userdata.subscribe_mutex.acquire() + logger.info( f"client: {client._client_id} subscribe " + f"completed mid={mid} reason_codes={reason_codes}" ) + userdata.subscribe_in_progress -= 1 + userdata.subscribe_mutex.release() + elif sub_result >= 128: + logger.error( f"client: {client._client_id} subscribe " + f"failed mid={mid} reason_codes={reason_codes}" ) + else: + logger.warning( f"client: {client._client_id} subscribe " + f"unsure mid={mid} reason_codes={reason_codes}" ) def __pub_on_disconnect(client, userdata, mid, reason_code, properties=None): userdata.metricsDisconnect() @@ -312,7 +321,6 @@ def __clientSetup(self, cid) -> paho.mqtt.client.Client: self.transport= 'websocket' if (self.o['broker'].url.scheme[-2:] == 'ws' ) or \ (self.o['broker'].url.scheme[-1] == 'w' ) else 'tcp' - logger.info( "FIXME: {self.transport=} ") client = paho.mqtt.client.Client( \ callback_api_version = paho.mqtt.client.CallbackAPIVersion.VERSION2, \ client_id=cid, userdata=self, protocol=paho.mqtt.client.MQTTv5, \ @@ 
-756,7 +764,7 @@ def putNewMessage(self, logger.info("published mid={} ack_pending={} {} to under: {} ".format( info.mid, ack_pending, body, topic)) return True #success... - logger.error( f"publish failed {paho.mqtt.client.error_string(info.rc)} ") + logger.error( f"publish failed {paho.mqtt.client.error_string(info.rc)} {info}") except Exception as ex: logger.error('Exception details: ', exc_info=True) From b7c686523ba32b6790dc8f6e1fc5e7f58ba76c95 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Mon, 17 Jun 2024 17:54:05 -0400 Subject: [PATCH 09/14] use f-strings everywhere --- sarracenia/moth/mqtt.py | 60 ++++++++++++++++++----------------------- 1 file changed, 26 insertions(+), 34 deletions(-) diff --git a/sarracenia/moth/mqtt.py b/sarracenia/moth/mqtt.py index 0f3408e39..81f50f71e 100755 --- a/sarracenia/moth/mqtt.py +++ b/sarracenia/moth/mqtt.py @@ -138,7 +138,6 @@ def __init__(self, options, is_subscriber): me = "%s.%s" % (__class__.__module__, __class__.__name__) - #logger.setLevel('WARNING') if ('settings' in self.o) and (me in self.o['settings']): for s in self.o['settings'][me]: @@ -186,11 +185,10 @@ def __sub_on_disconnect(client, userdata, mid, reason_code, properties=None): if hasattr(userdata, 'pending_publishes'): lost = len(userdata.pending_publishes) if lost > 0: - logger.error( - 'message loss! cannot confirm %d messages were published: mids=%s' - % (lost, userdata.pending_publishes)) + logger.error( f"message loss! cannot confirm {lost}" + f"messages were published: mids={userdata.pending_publishes}" ) else: - logger.info('clean. no published messages lost.') + logger.info( f"clean. 
no published messages lost.") def __sub_on_connect(client, userdata, flags, reason_code, properties=None): @@ -202,9 +200,9 @@ def __sub_on_connect(client, userdata, flags, reason_code, properties=None): if not flags.session_present: logger.debug( - 'no existing session, no recovery of inflight messages from previous connection' + f"no existing session, no recovery of inflight messages from previous connection" ) - logger.info('connection succeeded') + logger.info( f"connection succeeded" ) # else reason_code == 0 ... success. # FIXME: enhancement could subscribe accepts multiple (subj, qos) tuples so, could do this in one RTT. @@ -216,15 +214,15 @@ def __sub_on_connect(client, userdata, flags, reason_code, properties=None): subj=userdata.o['topic'] else: exchange, prefix, subtopic = binding_tuple - logger.info("tuple: %s %s %s" % (exchange, prefix, subtopic)) + logger.info( f"tuple: {exchange} {prefix} {subtopic}") subj = '/'.join(['$share', userdata.o['queueName'], exchange] + prefix + subtopic) (res, mid) = client.subscribe(subj, qos=userdata.o['qos']) userdata.subscribe_in_progress += 1 - logger.info( "asked to subscribe to: %s, mid=%d qos=%s result: %s" % (subj, mid, \ - userdata.o['qos'], paho.mqtt.client.error_string(res)) ) + logger.info( f"asked to subscribe to: {subj}, mid={mid}" + f"qos={userdata.o['qos']} result: {paho.mqtt.client.error_string(res)}" ) userdata.subscribe_mutex.release() userdata.metricsConnect() @@ -284,7 +282,6 @@ def __sslClientSetup(self) -> int: """ if self.o['broker'].url.scheme[-1] == 's': port = 8883 - logger.info( f"tlsRigour: {self.o['tlsRigour']}" ) self.o['tlsRigour'] = self.o['tlsRigour'].lower() if self.o['tlsRigour'] == 'lax': self.tlsctx = ssl.create_default_context() @@ -305,7 +302,7 @@ def __sslClientSetup(self) -> int: self.tlsctx = ssl.create_default_context() else: self.logger.warning( - "option tlsRigour must be one of: lax, normal, strict") + f"option tlsRigour must be one of: lax, normal, strict") 
self.client.tls_set_context(self.tlsctx) else: port = 1883 @@ -402,7 +399,7 @@ def getSetup(self): for i in range(1,session_mxi ): icid = self.o['queueName'] + "_i%02d" % i - logger.info('declare session for instances %s' %icid) + logger.info( f"declare session for instances {icid}" ) decl_client = self.__clientSetup(icid) decl_client.on_connect = MQTT.__sub_on_connect decl_client.connect( self.o['broker'].url.hostname, port=self.__sslClientSetup(), \ @@ -411,12 +408,11 @@ def getSetup(self): decl_client.loop(1) decl_client.disconnect() decl_client.loop_stop() - logger.info('instance declaration for %s done' % icid) + logger.info( f"instance declaration for {icid} done" ) break except Exception as err: - logger.error("failed to {} with {}".format( - self.o['broker'].url.hostname, err)) + logger.error( f"failed to {self.o['broker'].url.hostname} with {err}" ) logger.error('Exception details: ', exc_info=True) if ebo < 60: ebo *= 2 @@ -476,7 +472,7 @@ def putSetup(self): res = self.client.connect_async(self.o['broker'].url.hostname, port=self.__sslClientSetup(), properties=props) - logger.info('connecting to %s, res=%s' % (self.o['broker'].url.hostname, res)) + logger.info( f"connecting to {self.o['broker'].url.hostname}, res={res}" ) self.client.loop_start() @@ -493,8 +489,7 @@ def putSetup(self): break except Exception as err: - logger.error("failed to {} with {}".format( - self.o['broker'].url.hostname, err)) + logger.error("failed to {self.o['broker'].url.hostname} with {err}" ) logger.error('Exception details: ', exc_info=True) if ebo < 60: ebo *= 2 @@ -517,8 +512,7 @@ def __sub_on_message(client, userdata, msg): """ if userdata.o['messageDebugDump']: - logger.info("Message received: id:%d, topic:%s payload:%s" % - (msg.mid, msg.topic, msg.payload)) + logger.info( f"Message received: id:{msg.mid}, topic:{msg.topic} payload:{msg.payload}" ) m = userdata._msgDecode(msg) userdata.rx_msg[userdata.rx_msg_iFromBroker].append(m) @@ -535,7 +529,7 @@ def 
getCleanUp(self): props.SessionExpiryInterval = 1 for i in range(1,self.o['instances']+1): icid= self.o['queueName'] + "_i%02d" % i - logger.info('cleanup session %s' % icid ) + logger.info( f"cleanup session {icid}" ) myclient = self.__clientSetup( icid ) myclient.connect( self.o['broker'].url.hostname, port=self.__sslClientSetup(), \ clean_start=False, properties=props ) @@ -544,7 +538,7 @@ def getCleanUp(self): if self.please_stop: break myclient.disconnect() - logger.info('instance deletion for %02d done' % i) + logger.info( f"instance deletion for {i:02d} done" ) if hasattr(self, 'client'): self.client.disconnect() @@ -564,11 +558,11 @@ def _msgDecode(self, mqttMessage) -> sarracenia.Message: logger.warning('message is missing content-type header') if hasattr(mqttMessage, 'payload'): - logger.info('payload: type: %s (%d bytes) %s' % - (type(mqttMessage.payload), len(mqttMessage.payload), mqttMessage.payload)) + logger.info( f"payload: type: {type(mqttMessage.payload)}" + f"(len: {len(mqttMessage.payload)%d} bytes) body:{mqttMessage.payload}" ) if hasattr(mqttMessage.properties, 'UserProperty'): - logger.info( f"User Property: {mqttMessage.properties.UserProperty} ") + logger.info( f"User Property: {mqttMessage.properties.UserProperty}") self.metrics['rxByteCount'] += len(mqttMessage.payload) try: @@ -577,8 +571,8 @@ def _msgDecode(self, mqttMessage) -> sarracenia.Message: message = PostFormat.importAny( mqttMessage.payload.decode('utf-8'), headers, mqttMessage.properties.ContentType, self.o) except Exception as ex: - logger.error("ignored malformed message: %s" % mqttMessage.payload) - logger.error("decode error: %s" % ex) + logger.error( f"ignored malformed message: {mqttMessage.payload}" ) + logger.error( f"decode error: {ex}" ) logger.error('Exception details: ', exc_info=True) self.metrics['rxBadCount'] += 1 return None @@ -595,7 +589,7 @@ def _msgDecode(self, mqttMessage) -> sarracenia.Message: else: self.metrics['rxBadCount'] += 1 self.ack(message) - 
logger.error('message acknowledged and discarded: %s' % message) + logger.error( f"message acknowledged and discarded: {message}" ) return None def _rotateInputBuffers(self) -> None: @@ -710,9 +704,7 @@ def putNewMessage(self, 'ascii')) % len(self.o['exchange']) exchange = self.o['exchange'][idx] else: - logger.error( - 'do not know which exchange to publish to: %s' % - self.o['exchange']) + logger.error( f"do not know which exchange to publish to: {self.o['exchange']}" ) return else: exchange = self.o['exchange'][0] @@ -761,8 +753,8 @@ def putNewMessage(self, self.metrics['txByteCount'] += len(raw_body) self.metrics['txGoodCount'] += 1 self.metrics['txLast'] = sarracenia.nowstr() - logger.info("published mid={} ack_pending={} {} to under: {} ".format( - info.mid, ack_pending, body, topic)) + logger.info( f"published mid={info.mid} ack_pending={ack_pending} " + f"{body} to under: {topic} " ) return True #success... logger.error( f"publish failed {paho.mqtt.client.error_string(info.rc)} {info}") From 8525317a174f2dbee1213ea67c95c7bd05ac394b Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 21 Jun 2024 08:05:30 -0400 Subject: [PATCH 10/14] typo in string format mqtt --- sarracenia/moth/mqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sarracenia/moth/mqtt.py b/sarracenia/moth/mqtt.py index 81f50f71e..d87204ba7 100755 --- a/sarracenia/moth/mqtt.py +++ b/sarracenia/moth/mqtt.py @@ -559,7 +559,7 @@ def _msgDecode(self, mqttMessage) -> sarracenia.Message: if hasattr(mqttMessage, 'payload'): logger.info( f"payload: type: {type(mqttMessage.payload)}" - f"(len: {len(mqttMessage.payload)%d} bytes) body:{mqttMessage.payload}" ) + f"(len: {len(mqttMessage.payload):d} bytes) body:{mqttMessage.payload}" ) if hasattr(mqttMessage.properties, 'UserProperty'): logger.info( f"User Property: {mqttMessage.properties.UserProperty}") From 71671140c877a50f4bb15cb4ff88ddaae663daa1 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 21 Jun 2024 08:33:32 -0400 
Subject: [PATCH 11/14] add component/config marker to finalize error messages --- sarracenia/config.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/sarracenia/config.py b/sarracenia/config.py index e1e663bf5..167d7eefa 100755 --- a/sarracenia/config.py +++ b/sarracenia/config.py @@ -125,8 +125,6 @@ def __repr__(self) -> str: 'sourceFromExchange': False, 'sourceFromMessage': False, 'sundew_compat_regex_first_match_is_zero': False, - 'sourceFromExchange': False, - 'sourceFromMessage': False, 'topicCopy': False, 'v2compatRenameDoublePost': False, 'varTimeOffset': 0, @@ -1982,7 +1980,7 @@ def finalize(self, component=None, config=None): valid_inlineEncodings = [ 'guess', 'text', 'binary' ] if hasattr(self, 'inlineEncoding') and self.inlineEncoding not in valid_inlineEncodings: - logger.error( f"invalid inlineEncoding: {self.inlineEncoding} must be one of: {','.join(valid_inlineEncodings)}" ) + logger.error( f"{component}/{config} invalid inlineEncoding: {self.inlineEncoding} must be one of: {','.join(valid_inlineEncodings)}" ) if hasattr(self, 'no'): if self.statehost: @@ -2004,7 +2002,7 @@ def finalize(self, component=None, config=None): path = os.path.realpath(path) if sys.platform == 'win32' and words0.find('\\'): - logger.warning("%s %s" % (words0, words1)) + logger.warning("{component}/{config} %s %s" % (words0, words1)) logger.warning( "use of backslash ( \\ ) is an escape character. For a path separator use forward slash ( / )." ) @@ -2017,7 +2015,7 @@ def finalize(self, component=None, config=None): if hasattr(self, 'pollUrl'): if not hasattr(self,'post_baseUrl') or not self.post_baseUrl : - logger.debug( f"defaulting post_baseUrl to match pollURl, since it isn't specified." ) + logger.debug( f"{component}/{config} defaulting post_baseUrl to match pollURl, since it isn't specified." 
) self.post_baseUrl = self.pollUrl # verify post_baseDir @@ -2036,13 +2034,13 @@ def finalize(self, component=None, config=None): self.post_baseDir = u.path elif self.baseDir is not None: self.post_baseDir = os.path.expanduser(self.baseDir) - logger.debug("defaulting post_baseDir to same as baseDir") + logger.debug("{component}/{config} defaulting post_baseDir to same as baseDir") if self.messageCountMax > 0: if self.batch > self.messageCountMax: self.batch = self.messageCountMax - logger.info( f'overriding batch for consistency with messageCountMax: {self.batch}' ) + logger.info( f'{component}/{config} overriding batch for consistency with messageCountMax: {self.batch}' ) if (component not in ['poll' ]): self.path = list(map( os.path.expanduser, self.path )) @@ -2053,10 +2051,10 @@ def finalize(self, component=None, config=None): self.sleep=1 if self.runStateThreshold_hung < self.housekeeping: - logger.warning( f"runStateThreshold_hung {self.runStateThreshold_hung} set lower than housekeeping {self.housekeeping}. sr3 sanity might think this flow is hung kill it too quickly.") + logger.warning( f"{component}/{config} runStateThreshold_hung {self.runStateThreshold_hung} set lower than housekeeping {self.housekeeping}. sr3 sanity might think this flow is hung kill it too quickly.") if self.vip and not features['vip']['present']: - logger.critical( f"vip feature requested, but missing library: {' '.join(features['vip']['modules_needed'])} " ) + logger.critical( f"{component}/{config} vip feature requested, but missing library: {' '.join(features['vip']['modules_needed'])} " ) sys.exit(1) def check_undeclared_options(self): @@ -2065,7 +2063,7 @@ def check_undeclared_options(self): # FIXME: confused about this... commenting out for now... 
for f,l,u in self.undeclared: if u not in alloptions: - logger.error( f"{f}:{l} undeclared option: {u}") + logger.error( f"{component}/{config} {f}:{l} undeclared option: {u}") elif u in flag_options: if type( getattr(self,u) ) is not bool: setattr(self,u,isTrue(getattr(self,u))) @@ -2094,7 +2092,7 @@ def check_undeclared_options(self): if not hasattr(self,u): no_defaults.add( u ) - logger.debug("missing defaults: %s" % no_defaults) + logger.debug("{component}/{config} missing defaults: %s" % no_defaults) """ 2020/05/26 FIXME here begins sheer terror. From 8bcb6c49f0fb2d23651dac4235034fca532e3e33 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 21 Jun 2024 09:18:27 -0400 Subject: [PATCH 12/14] correct error messages from previous commit --- sarracenia/config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sarracenia/config.py b/sarracenia/config.py index 167d7eefa..cdf9c5e6d 100755 --- a/sarracenia/config.py +++ b/sarracenia/config.py @@ -2063,7 +2063,7 @@ def check_undeclared_options(self): # FIXME: confused about this... commenting out for now... for f,l,u in self.undeclared: if u not in alloptions: - logger.error( f"{component}/{config} {f}:{l} undeclared option: {u}") + logger.error( f"{f}:{l} undeclared option: {u}") elif u in flag_options: if type( getattr(self,u) ) is not bool: setattr(self,u,isTrue(getattr(self,u))) @@ -2092,7 +2092,7 @@ def check_undeclared_options(self): if not hasattr(self,u): no_defaults.add( u ) - logger.debug("{component}/{config} missing defaults: %s" % no_defaults) + logger.debug("missing defaults: %s" % no_defaults) """ 2020/05/26 FIXME here begins sheer terror. From fd974539e56e94554941f773268d907a33774061 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 21 Jun 2024 09:52:05 -0400 Subject: [PATCH 13/14] add mqtt sourceFromExchange/Message topicOverride support Over the winter, the sourceFromExchange and sourceFromMessage options and some topic settings were added on the amqp side.
Support for these was absent in mqtt, and it's pretty much identical, so moved the implementation to the sarracenia.Message class, and can now call deriveSource() and deriveTopics() from both amqp and mqtt implementations. --- sarracenia/__init__.py | 48 +++++++++++++++++++++++++++++++++++++++++ sarracenia/moth/amqp.py | 40 ++-------------------------------- sarracenia/moth/mqtt.py | 4 ++++ 3 files changed, 54 insertions(+), 38 deletions(-) diff --git a/sarracenia/__init__.py b/sarracenia/__init__.py index 91e53acac..b7b059c4a 100755 --- a/sarracenia/__init__.py +++ b/sarracenia/__init__.py @@ -497,6 +497,54 @@ def copyDict(msg, d): for h in d: msg[h] = d[h] + def deriveSource(msg,o): + """ + set msg['source'] field as appropriate for given message and options (o) + """ + source=None + if 'source' in o: + source = o['source'] + elif 'sourceFromExchange' in o and o['sourceFromExchange'] and 'exchange' in msg: + itisthere = re.match( "xs_([^_]+)_.*", msg['exchange'] ) + if itisthere: + source = itisthere[1] + else: + itisthere = re.match( "xs_([^_]+)", msg['exchange'] ) + if itisthere: + source = itisthere[1] + if 'source' in msg and 'sourceFromMessage' in o and o['sourceFromMessage']: + pass + elif source: + msg['source'] = source + msg['_deleteOnPost'] |= set(['source']) + + def deriveTopics(msg,o,topic,separator='.'): + """ + derive subtopic, topicPrefix, and topic fields based on message and options. + """ + msg_topic = topic.split(separator) + # topic validation... deal with DMS topic scheme. https://github.com/MetPX/sarracenia/issues/1017 + if 'topicCopy' in o and o['topicCopy']: + topicOverride=True + else: + topicOverride=False + if 'relPath' in msg: + path_topic = o['topicPrefix'] + os.path.dirname(msg['relPath']).split('/') + + if msg_topic != path_topic: + topicOverride=True + + # set subtopic if possible. 
+ if msg_topic[0:len(o['topicPrefix'])] == o['topicPrefix']: + msg['subtopic'] = msg_topic[len(o['topicPrefix']):] + else: + topicOverride=True + + if topicOverride: + msg['topic'] = topic + msg['_deleteOnPost'] |= set( ['topic'] ) + + def dumps(msg) -> str: """ FIXME: used to be msg_dumps. diff --git a/sarracenia/moth/amqp.py b/sarracenia/moth/amqp.py index aaaa37a50..2c9429a67 100755 --- a/sarracenia/moth/amqp.py +++ b/sarracenia/moth/amqp.py @@ -138,45 +138,9 @@ def _msgRawToDict(self, raw_msg) -> sarracenia.Message: topic = raw_msg.delivery_info['routing_key'].replace( '%23', '#').replace('%22', '*') msg['exchange'] = raw_msg.delivery_info['exchange'] - source=None - if 'source' in self.o: - source = self.o['source'] - elif 'sourceFromExchange' in self.o and self.o['sourceFromExchange']: - itisthere = re.match( "xs_([^_]+)_.*", msg['exchange'] ) - if itisthere: - source = itisthere[1] - else: - itisthere = re.match( "xs_([^_]+)", msg['exchange'] ) - if itisthere: - source = itisthere[1] - if 'source' in msg and 'sourceFromMessage' in self.o and self.o['sourceFromMessage']: - pass - elif source: - msg['source'] = source - msg['_deleteOnPost'] |= set(['source']) - - msg_topic = topic.split('.') - - # topic validation... deal with DMS topic scheme. https://github.com/MetPX/sarracenia/issues/1017 - if 'topicCopy' in self.o and self.o['topicCopy']: - topicOverride=True - else: - topicOverride=False - if 'relPath' in msg: - path_topic = self.o['topicPrefix'] + os.path.dirname(msg['relPath']).split('/') - - if msg_topic != path_topic: - topicOverride=True - - # set subtopic if possible. 
- if msg_topic[0:len(self.o['topicPrefix'])] == self.o['topicPrefix']: - msg['subtopic'] = msg_topic[len(self.o['topicPrefix']):] - else: - topicOverride=True - if topicOverride: - msg['topic'] = topic - msg['_deleteOnPost'] |= set( ['topic'] ) + msg.deriveSource( self.o ) + msg.deriveTopics( self.o, topic ) msg['ack_id'] = raw_msg.delivery_info['delivery_tag'] msg['local_offset'] = 0 diff --git a/sarracenia/moth/mqtt.py b/sarracenia/moth/mqtt.py index d87204ba7..4dcf658d7 100755 --- a/sarracenia/moth/mqtt.py +++ b/sarracenia/moth/mqtt.py @@ -577,6 +577,10 @@ def _msgDecode(self, mqttMessage) -> sarracenia.Message: self.metrics['rxBadCount'] += 1 return None + message['exchange'] = mqttMessage.topic.split('/')[0] + message.deriveSource( self.o ) + message.deriveTopics( self.o, topic=mqttMessage.topic, separator='/' ) + message['ack_id'] = mqttMessage.mid message['qos'] = mqttMessage.qos message['local_offset'] = 0 From 97385695afd7badfa006f9bf248508076d3ba065 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 21 Jun 2024 16:29:15 -0400 Subject: [PATCH 14/14] missing space in debug print --- sarracenia/moth/mqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sarracenia/moth/mqtt.py b/sarracenia/moth/mqtt.py index 4dcf658d7..959776a01 100755 --- a/sarracenia/moth/mqtt.py +++ b/sarracenia/moth/mqtt.py @@ -221,7 +221,7 @@ def __sub_on_connect(client, userdata, flags, reason_code, properties=None): (res, mid) = client.subscribe(subj, qos=userdata.o['qos']) userdata.subscribe_in_progress += 1 - logger.info( f"asked to subscribe to: {subj}, mid={mid}" + logger.info( f"asked to subscribe to: {subj}, mid={mid} " f"qos={userdata.o['qos']} result: {paho.mqtt.client.error_string(res)}" ) userdata.subscribe_mutex.release() userdata.metricsConnect()