Skip to content

Commit

Permalink
V03 issue720 -- fixing crash when a flag option is entered twice crea…
Browse files Browse the repository at this point in the history
…ting a list. (#729)

* fixes #720 for AMQP, invoking plugins so the declaration happens.

* starting on #415... syntax works, but connects fail.

* issue #415 websocket connection takes more time... need to wait for
establishment to complete.

* for #415 exchangeSplit implement lost (regreesion)
corrected with new topicDerive in postformat parent class, same for v02, and v03.

* tmate always hangs manually invoked tests in spite of not being
selected. disabling.

* for #720... do not instantiate flow for C components!

* reflecting the amqp changes to mqtt for #720

* stray debug message suppressed.

---------

Co-authored-by: petersilva <[email protected]>
  • Loading branch information
petersilva and petersilva authored Jul 27, 2023
1 parent 103e94e commit 4fffe9c
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 22 deletions.
9 changes: 8 additions & 1 deletion sarracenia/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,11 @@ def add_option(self, option, kind='list', default_value=None, all_values=None ):
# Retreive the 'new' option & enforce the correct type.
v = getattr(self, option)

if kind not in [ 'list', 'set' ] and type(v) == list:
v=v[-1]
logger.warning( f"multiple declarations of {option}={getattr(self,option)} choosing last one: {v}" )


if kind == 'count':
count_options.append(option)
if type(v) is not int:
Expand Down Expand Up @@ -1700,7 +1705,9 @@ def finalize(self, component=None, config=None):
if hasattr(self, 'post_exchangeSuffix'):
self.post_exchange += '_%s' % self.post_exchangeSuffix

if hasattr(self, 'post_exchangeSplit') and self.post_exchangeSplit > 1:
if hasattr(self,'post_exchange') and (type(self.post_exchange) is list ):
pass
elif hasattr(self, 'post_exchangeSplit') and self.post_exchangeSplit > 1:
l = []
for i in range(0, int(self.post_exchangeSplit)):
y = self.post_exchange + '%02d' % i
Expand Down
9 changes: 7 additions & 2 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ def __init__(self, cfg=None):

self.plugins['load'].extend(self.o.plugins_late)

self.plugins['load'].extend(self.o.destfn_scripts)

# metrics - dictionary with names of plugins as the keys
self.metricsFlowReset()

Expand All @@ -206,7 +208,10 @@ def metricsFlowReset(self) -> None:
'transferConnected': False, 'transferConnectStart': 0, 'transferConnectTime':0,
'transferRxBytes': 0, 'transferTxBytes': 0, 'transferRxFiles': 0, 'transferTxFiles': 0 } }

def loadCallbacks(self, plugins_to_load):
def loadCallbacks(self, plugins_to_load=None):

if not plugins_to_load:
plugins_to_load=self.plugins['load']

for m in self.o.imports:
try:
Expand Down Expand Up @@ -369,7 +374,7 @@ def run(self):
check if stop_requested once in a while, but never return otherwise.
"""

if not self.loadCallbacks(self.plugins['load']+self.o.destfn_scripts):
if not self.loadCallbacks(self.plugins['load']):
return

logger.debug( f"working directory: {os.getpid()}" )
Expand Down
18 changes: 7 additions & 11 deletions sarracenia/moth/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,7 @@ def __init__(self, props, is_subscriber) -> None:
if 'logLevel' in self.o['settings'][me]:
logger.setLevel(self.o['logLevel'].upper())

if self.is_subscriber: #build_consumer
self.__getSetup()
return
else: # publisher...
self.__putSetup()
self.connection = None

def __connect(self, broker) -> bool:
"""
Expand Down Expand Up @@ -226,7 +222,7 @@ def _amqp_setup_signal_handler(self, signum, stack):
logger.info("ok, asked to stop")
self.please_stop=True

def __getSetup(self) -> None:
def getSetup(self) -> None:
"""
Setup so we can get messages.
Expand Down Expand Up @@ -332,7 +328,7 @@ def __getSetup(self) -> None:
if self.please_stop:
signal.raise_signal(signal.SIGINT)

def __putSetup(self) -> None:
def putSetup(self) -> None:

ebo = 1
original_sigint = signal.getsignal(signal.SIGINT)
Expand Down Expand Up @@ -466,7 +462,7 @@ def getNewMessage(self) -> sarracenia.Message:

try:
if not self.connection:
self.__getSetup()
self.getSetup()

raw_msg = self.channel.basic_get(self.o['queueName'])
if (raw_msg is None) and (self.connection.connected):
Expand Down Expand Up @@ -533,7 +529,7 @@ def ack(self, m) -> None:

# Cleanly close partially broken connection and restablish
self.close()
self.__getSetup()
self.getSetup()

if ebo < 60: ebo *= 2

Expand All @@ -554,10 +550,10 @@ def putNewMessage(self,
return False

# Check connection and channel status, try to reconnect if not connected
if (self.connection is None) or (not self.connection.connected) or (not self.channel.is_open):
if (not self.connection) or (not self.connection.connected) or (not self.channel.is_open):
try:
self.close()
self.__putSetup()
self.putSetup()
except Exception as err:
logger.warning(f"failed, connection was closed/broken and could not be re-opened {exchange}: {err}")
logger.debug('Exception details: ', exc_info=True)
Expand Down
18 changes: 12 additions & 6 deletions sarracenia/moth/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,8 @@ def __init__(self, options, is_subscriber):
self.rx_msg[3]=[]
self.rx_msg[4]=[]
self.rx_msg_mutex.release()
self.__getSetup()
else:
self.__putSetup()


logger.warning("note: mqtt support is newish, not very well tested")

def __sub_on_disconnect(client, userdata, rc, properties=None):
Expand Down Expand Up @@ -325,7 +323,7 @@ def _mqtt_setup_signal_handler(self, signum, stack):
logger.info("ok, asked to stop")
self.please_stop=True

def __getSetup(self):
def getSetup(self):
"""
Establish a connection to consume messages with.
"""
Expand Down Expand Up @@ -421,7 +419,7 @@ def __getSetup(self):



def __putSetup(self):
def putSetup(self):
"""
establish a connection to allow publishing.
"""
Expand Down Expand Up @@ -615,6 +613,8 @@ def newMessages(self) -> list:

#logger.debug( f"rx_msg queue before: indices: {self.rx_msg_iToApp} {self.rx_msg_iFromBroker} " )
#logger.debug( f"rx_msg queue before: {len(self.rx_msg[self.rx_msg_iToApp])} indices: {self.rx_msg_iToApp} {self.rx_msg_iFromBroker} " )
if not self.connected:
self.getSetup()

if len(self.rx_msg[self.rx_msg_iToApp]) > self.o['batch']:
mqttml = self.rx_msg[self.rx_msg_iToApp][0:self.o['batch']]
Expand All @@ -633,6 +633,9 @@ def newMessages(self) -> list:

def getNewMessage(self) -> sarracenia.Message:

if not self.connected:
self.getSetup()

if len(self.rx_msg) > 0:
m = self.rx_msg[self.rx_msg_iToApp][0]
self.rx_msg[self.rx_msg_iToApp] = self.rx_msg[self.rx_msg_iToApp][1:]
Expand Down Expand Up @@ -664,7 +667,10 @@ def putNewMessage(self,
return False

if not self.connected:
self.__putSetup()
self.putSetup()

# The caller probably doesn't expect the message to get modified by this method, so use a copy of the message
body = copy.deepcopy(body)

# The caller probably doesn't expect the message to get modified by this method, so use a copy of the message
body = copy.deepcopy(body)
Expand Down
16 changes: 14 additions & 2 deletions sarracenia/sr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1276,6 +1276,7 @@ def declare(self):
'exchange': o.resolved_exchanges,
'message_strategy': { 'stubborn':True }
})
xdc.putSetup()
xdc.close()

# then declare and bind queues....
Expand All @@ -1296,6 +1297,7 @@ def declare(self):
od['queueName'] = o.resolved_qname
od['dry_run'] = self.options.dry_run
qdc = sarracenia.moth.Moth.subFactory(od)
qdc.getSetup()
qdc.close()

def disable(self):
Expand Down Expand Up @@ -1649,8 +1651,18 @@ def config_show(self):
continue

o = self.configs[c][cfg]['options']
print('\nConfig of %s/%s: ' % (c, cfg))
o.dump()
o.no=0
o.finalize()
if c not in [ 'cpost', 'cpump' ]:
flow = sarracenia.flow.Flow.factory(o)
flow.loadCallbacks()
print('\nConfig of %s/%s: (with callbacks)' % (c, cfg))
flow.o.dump()
del flow
flow=None
else:
print('\nConfig of %s/%s: ' % (c, cfg))
o.dump()

def remove(self):

Expand Down

0 comments on commit 4fffe9c

Please sign in to comment.