diff --git a/sarracenia/moth/mqtt.py b/sarracenia/moth/mqtt.py index 179aecaa8..552d07a3f 100755 --- a/sarracenia/moth/mqtt.py +++ b/sarracenia/moth/mqtt.py @@ -143,7 +143,7 @@ def __init__(self, options, is_subscriber): if 'logLevel' in self.o['settings'][me]: logger.setLevel(self.o['logLevel'].upper()) - + self.proto_version = paho.mqtt.client.MQTTv5 if 'receiveMaximum' in self.o and type(self.o['receiveMaximum']) is not int: @@ -300,11 +300,9 @@ def __clientSetup(self, cid) -> paho.mqtt.client.Client: if (self.o['broker'].url.scheme[-2:] == 'ws' ) or \ (self.o['broker'].url.scheme[-1] == 'w' ) : - logger.critical("yo! FIXME. I have websockets") client = paho.mqtt.client.Client( userdata=self, transport="websockets", \ client_id=cid, protocol=paho.mqtt.client.MQTTv5 ) else: - logger.critical("yo! FIXME. darnwebnot") client = paho.mqtt.client.Client( userdata=self, \ client_id=cid, protocol=paho.mqtt.client.MQTTv5 ) @@ -441,11 +439,9 @@ def __putSetup(self): if (self.o['broker'].url.scheme[-2:] == 'ws' ) or \ (self.o['broker'].url.scheme[-1] == 'w' ) : - logger.critical("yo! FIXME. I have websockets") self.client = paho.mqtt.client.Client( userdata=self, transport="websockets", \ protocol=self.proto_version ) else: - logger.critical("yo! FIXME. darnwebnot") self.client = paho.mqtt.client.Client( userdata=self, \ protocol=self.proto_version ) @@ -465,10 +461,15 @@ def __putSetup(self): res = self.client.connect_async(self.o['broker'].url.hostname, port=self.__sslClientSetup(), properties=props) - logger.info('connecting to %s, res=%s' % - (self.o['broker'].url.hostname, res)) + logger.info('connecting to %s, res=%s' % (self.o['broker'].url.hostname, res)) self.client.loop_start() + + while self.connect_in_progress: + time.sleep(0.1) + logger.info( f"waiting for connection to {self.o['broker']}") + self.client.loop() + self.connected=True break @@ -652,6 +653,9 @@ def putNewMessage(self, if not self.connected: self.__putSetup() + # The caller probably doesn't expect the message to get modified by this method, so use a copy of the message + body = copy.deepcopy(body) + postFormat = body['_format'] if '_deleteOnPost' in body: @@ -727,6 +731,7 @@ def putNewMessage(self, logger.info("published mid={} ack_pending={} {} to under: {} ".format( info.mid, ack_pending, body, topic)) return True #success... + logger.error( f"publish failed {paho.mqtt.client.error_string(info.rc)} ") except Exception as ex: logger.error('Exception details: ', exc_info=True)