Skip to content

Commit

Permalink
issue #415 websocket connection takes more time... need to wait for
Browse files Browse the repository at this point in the history
establishment to complete.
  • Loading branch information
petersilva committed Jul 11, 2023
1 parent a9c3a9c commit 2c757e0
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions sarracenia/moth/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def __init__(self, options, is_subscriber):

if 'logLevel' in self.o['settings'][me]:
logger.setLevel(self.o['logLevel'].upper())

self.proto_version = paho.mqtt.client.MQTTv5

if 'receiveMaximum' in self.o and type(self.o['receiveMaximum']) is not int:
Expand Down Expand Up @@ -300,11 +300,9 @@ def __clientSetup(self, cid) -> paho.mqtt.client.Client:

if (self.o['broker'].url.scheme[-2:] == 'ws' ) or \
(self.o['broker'].url.scheme[-1] == 'w' ) :
logger.critical("yo! FIXME. I have websockets")
client = paho.mqtt.client.Client( userdata=self, transport="websockets", \
client_id=cid, protocol=paho.mqtt.client.MQTTv5 )
else:
logger.critical("yo! FIXME. darnwebnot")
client = paho.mqtt.client.Client( userdata=self, \
client_id=cid, protocol=paho.mqtt.client.MQTTv5 )

Expand Down Expand Up @@ -441,11 +439,9 @@ def __putSetup(self):

if (self.o['broker'].url.scheme[-2:] == 'ws' ) or \
(self.o['broker'].url.scheme[-1] == 'w' ) :
logger.critical("yo! FIXME. I have websockets")
self.client = paho.mqtt.client.Client( userdata=self, transport="websockets", \
protocol=self.proto_version )
else:
logger.critical("yo! FIXME. darnwebnot")
self.client = paho.mqtt.client.Client( userdata=self, \
protocol=self.proto_version )

Expand All @@ -465,10 +461,15 @@ def __putSetup(self):
res = self.client.connect_async(self.o['broker'].url.hostname,
port=self.__sslClientSetup(),
properties=props)
logger.info('connecting to %s, res=%s' %
(self.o['broker'].url.hostname, res))
logger.info('connecting to %s, res=%s' % (self.o['broker'].url.hostname, res))

self.client.loop_start()

while self.connect_in_progress:
time.sleep(0.1)
logger.info( f"waiting for connection to {self.o['broker']}")
self.client.loop()

self.connected=True
break

Expand Down Expand Up @@ -652,6 +653,9 @@ def putNewMessage(self,
if not self.connected:
self.__putSetup()

# The caller probably doesn't expect the message to get modified by this method, so use a copy of the message
body = copy.deepcopy(body)

postFormat = body['_format']

if '_deleteOnPost' in body:
Expand Down Expand Up @@ -727,6 +731,7 @@ def putNewMessage(self,
logger.info("published mid={} ack_pending={} {} to under: {} ".format(
info.mid, ack_pending, body, topic))
return True #success...
logger.error( f"publish failed {paho.mqtt.client.error_string(info.rc)} ")

except Exception as ex:
logger.error('Exception details: ', exc_info=True)
Expand Down

0 comments on commit 2c757e0

Please sign in to comment.