diff --git a/sarracenia/__init__.py b/sarracenia/__init__.py index 6dcefd7a5..3d455a2d3 100755 --- a/sarracenia/__init__.py +++ b/sarracenia/__init__.py @@ -742,7 +742,7 @@ def updatePaths(msg, options, new_dir=None, new_file=None): msg[k] = options.fixed_headers[k] msg['_deleteOnPost'] |= set([ - 'new_dir', 'new_file', 'new_relPath', 'new_baseUrl', 'new_subtopic', 'post_format' + 'new_dir', 'new_file', 'new_relPath', 'new_baseUrl', 'new_subtopic', 'subtopic', 'post_format' ]) if new_dir: msg['new_dir'] = new_dir diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index 7d8c1a670..ad6f79613 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -524,7 +524,7 @@ def run(self): m['_deleteOnPost'] |= set(['old_relPath']) m['relPath'] = m['new_relPath'] m['old_subtopic'] = m['subtopic'] - m['_deleteOnPost'] |= set(['old_subtopic']) + m['_deleteOnPost'] |= set(['old_subtopic','subtopic']) m['subtopic'] = m['new_subtopic'] if '_format' in m: diff --git a/sarracenia/moth/mqtt.py b/sarracenia/moth/mqtt.py index d0851b87b..22025f1e3 100755 --- a/sarracenia/moth/mqtt.py +++ b/sarracenia/moth/mqtt.py @@ -375,13 +375,15 @@ def getSetup(self): self.client.connect( self.o['broker'].url.hostname, port=self.__sslClientSetup(), \ clean_start=False, properties=props ) self.client.enable_logger(logger) + self.client.loop_start() + ebo=1 while (self.connect_in_progress) or (self.subscribe_in_progress > 0): - self.client.loop() - time.sleep(0.1) + logger.info( f"waiting for subscription to be set up. (ebo={ebo})") + time.sleep(0.1*ebo) if self.please_stop: break - logger.info("waiting for subscription to be set up.") - self.client.loop_start() + if ebo < 512 : + ebo *= 2 self.connected=True break else: # either 'declare' or 'foreground' @@ -471,12 +473,15 @@ def putSetup(self): self.client.loop_start() + ebo=1 while self.connect_in_progress: - time.sleep(0.1) + logger.info( f"waiting for connection to {self.o['broker']} ebo={ebo}") + time.sleep(0.1*ebo) if self.please_stop: break - logger.info( f"waiting for connection to {self.o['broker']}") - self.client.loop() + if ebo < 512: + ebo *= 2 + if not self.connect_in_progress: self.connected=True @@ -672,9 +677,6 @@ def putNewMessage(self, # The caller probably doesn't expect the message to get modified by this method, so use a copy of the message body = copy.deepcopy(body) - # The caller probably doesn't expect the message to get modified by this method, so use a copy of the message - body = copy.deepcopy(body) - postFormat = body['_format'] if '_deleteOnPost' in body: @@ -683,7 +685,6 @@ def putNewMessage(self, # method to build json and _deleteOnPost would be a guide of what to skip. # library for that is jsonfile, but not present in repos so far. for k in body['_deleteOnPost']: - if k == 'subtopic': continue if k in body: del body[k] del body['_deleteOnPost'] @@ -723,7 +724,6 @@ def putNewMessage(self, topic = topic.replace('+', '%2B') del headers['topic'] - del body['subtopic'] if headers: props.UserProperty=list(map( lambda x : (x,headers[x]) , headers )) diff --git a/sarracenia/postformat/v03.py b/sarracenia/postformat/v03.py index 80dcbd63c..fe6697220 100644 --- a/sarracenia/postformat/v03.py +++ b/sarracenia/postformat/v03.py @@ -35,6 +35,8 @@ def importMine(body, headers, options) -> sarracenia.Message: """ msg = sarracenia.Message() msg["_format"] = __name__.split('.')[-1].lower() + + try: msg.copyDict(json.loads(body)) except Exception as ex: diff --git a/sarracenia/transfer/file.py b/sarracenia/transfer/file.py index e534570a8..bc05c49c4 100755 --- a/sarracenia/transfer/file.py +++ b/sarracenia/transfer/file.py @@ -376,6 +376,7 @@ def file_truncate(options, msg): fp.close() msg['subtopic'] = msg['relPath'].split(os.sep)[1:-1] + msg['_deleteOnPost'] |= set(['subtopic']) #msg.set_topic(options.post_topicPrefix,msg.target_relpath) except: