diff --git a/sarracenia/config.py b/sarracenia/config.py index eb17d35cf..a588805f5 100755 --- a/sarracenia/config.py +++ b/sarracenia/config.py @@ -991,6 +991,11 @@ def add_option(self, option, kind='list', default_value=None, all_values=None ): # Retreive the 'new' option & enforce the correct type. v = getattr(self, option) + if kind not in [ 'list', 'set' ] and type(v) == list: + v=v[-1] + logger.warning( f"multiple declarations of {option}={getattr(self,option)} choosing last one: {v}" ) + + if kind == 'count': count_options.append(option) if type(v) is not int: @@ -1700,7 +1705,9 @@ def finalize(self, component=None, config=None): if hasattr(self, 'post_exchangeSuffix'): self.post_exchange += '_%s' % self.post_exchangeSuffix - if hasattr(self, 'post_exchangeSplit') and self.post_exchangeSplit > 1: + if hasattr(self,'post_exchange') and (type(self.post_exchange) is list ): + pass + elif hasattr(self, 'post_exchangeSplit') and self.post_exchangeSplit > 1: l = [] for i in range(0, int(self.post_exchangeSplit)): y = self.post_exchange + '%02d' % i diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index f3acbdc76..a6708d6c0 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -198,6 +198,8 @@ def __init__(self, cfg=None): self.plugins['load'].extend(self.o.plugins_late) + self.plugins['load'].extend(self.o.destfn_scripts) + # metrics - dictionary with names of plugins as the keys self.metricsFlowReset() @@ -206,7 +208,10 @@ def metricsFlowReset(self) -> None: 'transferConnected': False, 'transferConnectStart': 0, 'transferConnectTime':0, 'transferRxBytes': 0, 'transferTxBytes': 0, 'transferRxFiles': 0, 'transferTxFiles': 0 } } - def loadCallbacks(self, plugins_to_load): + def loadCallbacks(self, plugins_to_load=None): + + if not plugins_to_load: + plugins_to_load=self.plugins['load'] for m in self.o.imports: try: @@ -369,7 +374,7 @@ def run(self): check if stop_requested once in a while, but never return otherwise. """ - if not self.loadCallbacks(self.plugins['load']+self.o.destfn_scripts): + if not self.loadCallbacks(self.plugins['load']): return logger.debug( f"working directory: {os.getpid()}" ) diff --git a/sarracenia/moth/amqp.py b/sarracenia/moth/amqp.py index 94325eac9..125583bd4 100755 --- a/sarracenia/moth/amqp.py +++ b/sarracenia/moth/amqp.py @@ -174,11 +174,7 @@ def __init__(self, props, is_subscriber) -> None: if 'logLevel' in self.o['settings'][me]: logger.setLevel(self.o['logLevel'].upper()) - if self.is_subscriber: #build_consumer - self.__getSetup() - return - else: # publisher... - self.__putSetup() + self.connection = None def __connect(self, broker) -> bool: """ @@ -226,7 +222,7 @@ def _amqp_setup_signal_handler(self, signum, stack): logger.info("ok, asked to stop") self.please_stop=True - def __getSetup(self) -> None: + def getSetup(self) -> None: """ Setup so we can get messages. @@ -332,7 +328,7 @@ def __getSetup(self) -> None: if self.please_stop: signal.raise_signal(signal.SIGINT) - def __putSetup(self) -> None: + def putSetup(self) -> None: ebo = 1 original_sigint = signal.getsignal(signal.SIGINT) @@ -466,7 +462,7 @@ def getNewMessage(self) -> sarracenia.Message: try: if not self.connection: - self.__getSetup() + self.getSetup() raw_msg = self.channel.basic_get(self.o['queueName']) if (raw_msg is None) and (self.connection.connected): @@ -533,7 +529,7 @@ def ack(self, m) -> None: # Cleanly close partially broken connection and restablish self.close() - self.__getSetup() + self.getSetup() if ebo < 60: ebo *= 2 @@ -554,10 +550,10 @@ def putNewMessage(self, return False # Check connection and channel status, try to reconnect if not connected - if (self.connection is None) or (not self.connection.connected) or (not self.channel.is_open): + if (not self.connection) or (not self.connection.connected) or (not self.channel.is_open): try: self.close() - self.__putSetup() + self.putSetup() except Exception as err: logger.warning(f"failed, connection was closed/broken and could not be re-opened {exchange}: {err}") logger.debug('Exception details: ', exc_info=True) diff --git a/sarracenia/moth/mqtt.py b/sarracenia/moth/mqtt.py index 963ef8ae6..0c1618c24 100755 --- a/sarracenia/moth/mqtt.py +++ b/sarracenia/moth/mqtt.py @@ -175,10 +175,8 @@ def __init__(self, options, is_subscriber): self.rx_msg[3]=[] self.rx_msg[4]=[] self.rx_msg_mutex.release() - self.__getSetup() - else: - self.__putSetup() + logger.warning("note: mqtt support is newish, not very well tested") def __sub_on_disconnect(client, userdata, rc, properties=None): @@ -325,7 +323,7 @@ def _mqtt_setup_signal_handler(self, signum, stack): logger.info("ok, asked to stop") self.please_stop=True - def __getSetup(self): + def getSetup(self): """ Establish a connection to consume messages with. """ @@ -421,7 +419,7 @@ def __getSetup(self): - def __putSetup(self): + def putSetup(self): """ establish a connection to allow publishing. """ @@ -615,6 +613,8 @@ def newMessages(self) -> list: #logger.debug( f"rx_msg queue before: indices: {self.rx_msg_iToApp} {self.rx_msg_iFromBroker} " ) #logger.debug( f"rx_msg queue before: {len(self.rx_msg[self.rx_msg_iToApp])} indices: {self.rx_msg_iToApp} {self.rx_msg_iFromBroker} " ) + if not self.connected: + self.getSetup() if len(self.rx_msg[self.rx_msg_iToApp]) > self.o['batch']: mqttml = self.rx_msg[self.rx_msg_iToApp][0:self.o['batch']] @@ -633,6 +633,9 @@ def newMessages(self) -> list: def getNewMessage(self) -> sarracenia.Message: + if not self.connected: + self.getSetup() + if len(self.rx_msg) > 0: m = self.rx_msg[self.rx_msg_iToApp][0] self.rx_msg[self.rx_msg_iToApp] = self.rx_msg[self.rx_msg_iToApp][1:] @@ -664,7 +667,10 @@ def putNewMessage(self, return False if not self.connected: - self.__putSetup() + self.putSetup() + + # The caller probably doesn't expect the message to get modified by this method, so use a copy of the message + body = copy.deepcopy(body) # The caller probably doesn't expect the message to get modified by this method, so use a copy of the message body = copy.deepcopy(body) diff --git a/sarracenia/sr.py b/sarracenia/sr.py index b88f54a7e..a2a2b0211 100755 --- a/sarracenia/sr.py +++ b/sarracenia/sr.py @@ -1276,6 +1276,7 @@ def declare(self): 'exchange': o.resolved_exchanges, 'message_strategy': { 'stubborn':True } }) + xdc.putSetup() xdc.close() # then declare and bind queues.... @@ -1296,6 +1297,7 @@ def declare(self): od['queueName'] = o.resolved_qname od['dry_run'] = self.options.dry_run qdc = sarracenia.moth.Moth.subFactory(od) + qdc.getSetup() qdc.close() def disable(self): @@ -1649,8 +1651,18 @@ def config_show(self): continue o = self.configs[c][cfg]['options'] - print('\nConfig of %s/%s: ' % (c, cfg)) - o.dump() + o.no=0 + o.finalize() + if c not in [ 'cpost', 'cpump' ]: + flow = sarracenia.flow.Flow.factory(o) + flow.loadCallbacks() + print('\nConfig of %s/%s: (with callbacks)' % (c, cfg)) + flow.o.dump() + del flow + flow=None + else: + print('\nConfig of %s/%s: ' % (c, cfg)) + o.dump() def remove(self):