Skip to content

Commit

Permalink
V03 issue693 improve mqtt performance on restart_server flow test. (#747
Browse files Browse the repository at this point in the history
)

* improved restart_server score from 29 to 33. hmm...

* getting rid of subtopic in published messages.

---------

Co-authored-by: petersilva <[email protected]>
  • Loading branch information
petersilva and petersilva authored Aug 14, 2023
1 parent 0e03372 commit 59dbc7d
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 14 deletions.
2 changes: 1 addition & 1 deletion sarracenia/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ def updatePaths(msg, options, new_dir=None, new_file=None):
msg[k] = options.fixed_headers[k]

msg['_deleteOnPost'] |= set([
'new_dir', 'new_file', 'new_relPath', 'new_baseUrl', 'new_subtopic', 'post_format'
'new_dir', 'new_file', 'new_relPath', 'new_baseUrl', 'new_subtopic', 'subtopic', 'post_format'
])
if new_dir:
msg['new_dir'] = new_dir
Expand Down
2 changes: 1 addition & 1 deletion sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ def run(self):
m['_deleteOnPost'] |= set(['old_relPath'])
m['relPath'] = m['new_relPath']
m['old_subtopic'] = m['subtopic']
m['_deleteOnPost'] |= set(['old_subtopic'])
m['_deleteOnPost'] |= set(['old_subtopic','subtopic'])
m['subtopic'] = m['new_subtopic']

if '_format' in m:
Expand Down
24 changes: 12 additions & 12 deletions sarracenia/moth/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,13 +375,15 @@ def getSetup(self):
self.client.connect( self.o['broker'].url.hostname, port=self.__sslClientSetup(), \
clean_start=False, properties=props )
self.client.enable_logger(logger)
self.client.loop_start()
ebo=1
while (self.connect_in_progress) or (self.subscribe_in_progress > 0):
self.client.loop()
time.sleep(0.1)
logger.info( f"waiting for subscription to be set up. (ebo={ebo})")
time.sleep(0.1*ebo)
if self.please_stop:
break
logger.info("waiting for subscription to be set up.")
self.client.loop_start()
if ebo < 512 :
ebo *= 2
self.connected=True
break
else: # either 'declare' or 'foreground'
Expand Down Expand Up @@ -471,12 +473,15 @@ def putSetup(self):

self.client.loop_start()

ebo=1
while self.connect_in_progress:
time.sleep(0.1)
logger.info( f"waiting for connection to {self.o['broker']} ebo={ebo}")
time.sleep(0.1*ebo)
if self.please_stop:
break
logger.info( f"waiting for connection to {self.o['broker']}")
self.client.loop()
if ebo < 512:
ebo *= 2


if not self.connect_in_progress:
self.connected=True
Expand Down Expand Up @@ -672,9 +677,6 @@ def putNewMessage(self,
# The caller probably doesn't expect the message to get modified by this method, so use a copy of the message
body = copy.deepcopy(body)

# The caller probably doesn't expect the message to get modified by this method, so use a copy of the message
body = copy.deepcopy(body)

postFormat = body['_format']

if '_deleteOnPost' in body:
Expand All @@ -683,7 +685,6 @@ def putNewMessage(self,
# method to build json and _deleteOnPost would be a guide of what to skip.
# library for that is jsonfile, but not present in repos so far.
for k in body['_deleteOnPost']:
if k == 'subtopic': continue
if k in body:
del body[k]
del body['_deleteOnPost']
Expand Down Expand Up @@ -723,7 +724,6 @@ def putNewMessage(self,
topic = topic.replace('+', '%2B')

del headers['topic']
del body['subtopic']

if headers:
props.UserProperty=list(map( lambda x : (x,headers[x]) , headers ))
Expand Down
2 changes: 2 additions & 0 deletions sarracenia/postformat/v03.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ def importMine(body, headers, options) -> sarracenia.Message:
"""
msg = sarracenia.Message()
msg["_format"] = __name__.split('.')[-1].lower()


try:
msg.copyDict(json.loads(body))
except Exception as ex:
Expand Down
1 change: 1 addition & 0 deletions sarracenia/transfer/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ def file_truncate(options, msg):
fp.close()

msg['subtopic'] = msg['relPath'].split(os.sep)[1:-1]
msg['_deleteOnPost'] |= set(['subtopic'])
#msg.set_topic(options.post_topicPrefix,msg.target_relpath)

except:
Expand Down

0 comments on commit 59dbc7d

Please sign in to comment.