Skip to content

Commit

Permalink
reflecting the amqp changes to mqtt for #720
Browse files Browse the repository at this point in the history
  • Loading branch information
petersilva committed Jul 25, 2023
1 parent a253e99 commit 9441178
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions sarracenia/moth/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ def _mqtt_setup_signal_handler(self, signum, stack):
logger.info("ok, asked to stop")
self.please_stop=True

def __getSetup(self):
def getSetup(self):
"""
Establish a connection to consume messages with.
"""
Expand Down Expand Up @@ -419,7 +419,7 @@ def __getSetup(self):



def __putSetup(self):
def putSetup(self):
"""
establish a connection to allow publishing.
"""
Expand Down Expand Up @@ -613,6 +613,8 @@ def newMessages(self) -> list:

#logger.debug( f"rx_msg queue before: indices: {self.rx_msg_iToApp} {self.rx_msg_iFromBroker} " )
#logger.debug( f"rx_msg queue before: {len(self.rx_msg[self.rx_msg_iToApp])} indices: {self.rx_msg_iToApp} {self.rx_msg_iFromBroker} " )
if not self.connected:
self.getSetup()

if len(self.rx_msg[self.rx_msg_iToApp]) > self.o['batch']:
mqttml = self.rx_msg[self.rx_msg_iToApp][0:self.o['batch']]
Expand All @@ -631,6 +633,9 @@ def newMessages(self) -> list:

def getNewMessage(self) -> sarracenia.Message:

if not self.connected:
self.getSetup()

if len(self.rx_msg) > 0:
m = self.rx_msg[self.rx_msg_iToApp][0]
self.rx_msg[self.rx_msg_iToApp] = self.rx_msg[self.rx_msg_iToApp][1:]
Expand Down Expand Up @@ -662,7 +667,7 @@ def putNewMessage(self,
return False

if not self.connected:
self.__putSetup()
self.putSetup()

# The caller probably doesn't expect the message to get modified by this method, so use a copy of the message
body = copy.deepcopy(body)
Expand Down

0 comments on commit 9441178

Please sign in to comment.