Skip to content

Commit

Permalink
Merge branch 'issue339_part2_queueNames' into issue339_feature_multib…
Browse files Browse the repository at this point in the history
…roker_subscriptions
  • Loading branch information
petersilva committed Aug 7, 2024
2 parents fdf7d2a + 8ddeeae commit 490fb1b
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 71 deletions.
40 changes: 27 additions & 13 deletions sarracenia/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def __repr__(self) -> str:
'post_baseDir': None,
'post_baseUrl': None,
'post_format': 'v03',
'qos': 1,
'realpathPost': False,
'recursive' : True,
'runStateThreshold_reject': 80,
Expand All @@ -133,7 +134,7 @@ def __repr__(self) -> str:

count_options = [
'batch', 'count', 'exchangeSplit', 'instances', 'logRotateCount', 'no',
'post_exchangeSplit', 'prefetch', 'messageCountMax', 'runStateThreshold_cpuSlow',
'post_exchangeSplit', 'prefetch', 'qos', 'messageCountMax', 'runStateThreshold_cpuSlow',
'runStateThreshold_reject', 'runStateThreshold_retry', 'runStateThreshold_slow',
]

Expand Down Expand Up @@ -650,7 +651,7 @@ class Config:
cfg.component = 'subscribe'
cfg.config = 'flow_demo'
cfg.action = 'start'
cfg.bindings = [ ('xpublic', ['v02', 'post'], ['*', 'WXO-DD', 'observations', 'swob-ml', '#' ]) ]
cfg.bindings = [ {'exchange':'xpublic', 'topicPrefix':['v02', 'post'], 'subtopic': ['*', 'WXO-DD', 'observations', 'swob-ml', '#'] } ]
cfg.queueName='q_anonymous.subscriber_test2'
cfg.download=True
cfg.batch=1
Expand Down Expand Up @@ -1346,21 +1347,31 @@ def _parse_binding(self, subtopic_string):
also should sqwawk about error if no exchange or topicPrefix defined.
also None to reset to empty, not done.
"""
if hasattr(self, 'broker') and self.broker is not None and self.broker.url is not None:
self._resolve_exchange()
if not hasattr(self, 'broker') or not self.broker or not self.broker.url:
logger.critical( f"need broker setting before subtopic" )
return

self._resolve_exchange()
self._resolveQueueName(self.component,self.config)

if type(subtopic_string) is str:
if not hasattr(self, 'broker') or self.broker is None or self.broker.url is None:
logger.error( f"{','.join(self.files)}:{self.lineno} broker needed before subtopic" )
return

if self.broker.url.scheme == 'amq' :
subtopic = subtopic_string.split('.')
else:
subtopic = subtopic_string.split('/')
if self.broker.url.scheme == 'amq' :
subtopic = subtopic_string.split('.')
else:
subtopic = subtopic_string.split('/')

if hasattr(self, 'exchange') and hasattr(self, 'topicPrefix'):
self.bindings.append((self.broker, self.exchange, self.topicPrefix, subtopic))
new_binding={}
for i in [ 'auto_delete', 'broker', 'durable', 'exchange', 'expire', 'message_ttl', 'prefetch', \
'qos', 'queueBind', 'queueDeclare', 'queueName', 'topicPrefix' ]:
new_binding[i] = getattr(self,i)
new_binding['subtopic'] = subtopic

self.bindings.append(new_binding)

def _parse_v2plugin(self, entryPoint, value):
"""
Expand Down Expand Up @@ -1802,9 +1813,7 @@ def _resolveQueueName(self,component,cfg):
else:
# only lead instance (0-foreground, 1-start, or none in the case of 'declare')
# should write the state file.


# lead instance shou
if os.path.isfile(queuefile):
f = open(queuefile, 'r')
self.queueName = f.read()
Expand All @@ -1823,6 +1832,9 @@ def _resolveQueueName(self,component,cfg):
if not os.path.isdir(os.path.dirname(queuefile)):
pathlib.Path(os.path.dirname(queuefile)).mkdir(parents=True, exist_ok=True)

#disable queue name saving, need to replace with .binding files.
#return

if not os.path.isfile(queuefile) and (self.queueName is not None):
tmpQfile=queuefile+'.tmp'
if not os.path.isfile(tmpQfile):
Expand Down Expand Up @@ -1998,7 +2010,7 @@ def finalize(self, component=None, config=None):


if (self.bindings == [] and hasattr(self, 'exchange')):
self.bindings = [(self.broker, self.exchange, self.topicPrefix, [ '#' ])]
self._parse_binding('#')

if hasattr(self, 'documentRoot') and (self.documentRoot is not None):
path = os.path.expanduser(os.path.abspath(self.documentRoot))
Expand Down Expand Up @@ -2413,6 +2425,7 @@ def __call__(self, parser, namespace, values, option_string):
namespace.bindings = []

namespace._resolve_exchange()
namespace._resolveQueueName(self.component,self.config)

if not hasattr(namespace, 'broker'):
raise Exception('broker needed before subtopic')
Expand All @@ -2433,7 +2446,7 @@ def __call__(self, parser, namespace, values, option_string):
topicPrefix = namespace.topicPrefix.split('/')

namespace.bindings.append(
(namespace.broker, namespace.exchange, topicPrefix, values))
{'broker':namespace.broker, 'exchange':namespace.exchange, 'topicPrefix':topicPrefix, 'subtopic':values})

def parse_args(self, isPost=False):
"""
Expand Down Expand Up @@ -2755,6 +2768,7 @@ def one_config(component, config, action, isPost=False):

cfg.applyComponentDefaults( component )

cfg.action = action
store_pwd = os.getcwd()

os.chdir(get_user_config_dir())
Expand Down
4 changes: 2 additions & 2 deletions sarracenia/flowcb/gather/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def __init__(self, options) -> None:

self.brokers=[]
for binding in self.o.bindings:
if len(binding) >= 4 and binding[0] not in self.brokers:
self.brokers.append(binding[0])
if type(binding) is dict:
self.brokers.append(binding['broker'])

if len(self.brokers) == 0:
self.brokers=[ self.o.broker ]
Expand Down
86 changes: 42 additions & 44 deletions sarracenia/moth/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,77 +320,75 @@ def getSetup(self) -> None:
signal.signal(signal.SIGINT, self._amqp_setup_signal_handler)
signal.signal(signal.SIGTERM, self._amqp_setup_signal_handler)

while True:
if 'bindings' not in self.o or self.o['bindings'] is []:
logger.critical( f"no bindings given" )
return

for binding in self.o['bindings']:

if self.please_stop:
break

if 'broker' not in self.o or self.o['broker'] is None:
logger.critical( f"no broker given" )
break

# It does not really matter how it fails, the recovery approach is always the same:
# tear the whole thing down, and start over.
try:
if type(binding) is dict:
pass
elif type(binding) is tuple and len(binding) == 3:
new_binding = { 'exchange': binding[0], 'topicPrefix': binding[1], 'subtopic': binding[2] }
for i in [ 'auto_delete', 'broker', 'durable', 'exchange', 'expire', 'message_ttl', \
'prefetch', 'queueBind', 'queueDeclare', 'queueName', 'topicPrefix' ]:
new_binding[i] = self.o[i]
binding=new_binding
else:
logger.critical( f"binding \"{binding}\" should be a list of dictionaries ( broker, exchange, topicPrefix, subtopic )" )
continue

if binding['broker'] != self.o['broker']:
continue

# from sr_consumer.build_connection...
if not self.__connect(self.o['broker']):
if not self.__connect(binding['broker']):
logger.critical('could not connect')
break
continue

# only first/lead instance needs to declare a queue and bindings.
if 'no' in self.o and self.o['no'] >= 2:
self.metricsConnect()
return
continue

#logger.info('getSetup connected to {}'.format(self.o['broker'].url.hostname) )

if self.o['prefetch'] != 0:
self.channel.basic_qos(0, self.o['prefetch'], True)
if binding['prefetch'] != 0:
self.channel.basic_qos(0, binding['prefetch'], True)

#FIXME: test self.first_setup and props['reset']... delete queue...
broker_str = self.o['broker'].url.geturl().replace(
':' + self.o['broker'].url.password + '@', '@')
broker_str = binding['broker'].geturl()

# from Queue declare
msg_count = self._queueDeclare()

if msg_count == -2: break

if self.o['queueBind'] and self.o['queueName']:
for tup in self.o['bindings']:
if len(tup) == 4:
broker, exchange, prefix, subtopic = tup
elif len(tup) == 3:
exchange, prefix, subtopic = tup
broker = self.o['broker']
else:
logger.critical( f"binding \"{tup}\" should be a list of tuples ( broker, exchange, prefix, subtopic )" )
continue
logger.critical( f"{broker=} {self.o['broker']=} ")
if broker != self.o['broker']:
continue
topic = '.'.join(prefix + subtopic)
if self.o['dry_run']:
logger.info('binding (dry run) %s with %s to %s (as: %s)' % \
( self.o['queueName'], topic, exchange, broker_str ) )
else:
logger.info('binding %s with %s to %s (as: %s)' % \
( self.o['queueName'], topic, exchange, broker_str ) )
if exchange:
self.management_channel.queue_bind(self.o['queueName'], exchange,
topic)
if msg_count == -2: continue

if binding['queueBind'] and binding['queueName']:
topic = '.'.join(binding['topicPrefix'] + binding['subtopic'])
if self.o['dry_run']:
logger.info('binding (dry run) %s with %s to %s (as: %s)' % \
( binding['queueName'], topic, binding['exchange'], broker_str ) )
else:
logger.info('binding %s with %s to %s (as: %s)' % \
( binding['queueName'], topic, binding['exchange'], broker_str ) )
if binding['exchange']:
self.management_channel.queue_bind(binding['queueName'],
binding['exchange'], topic)

# Setup Successfully Complete!
self.metricsConnect()
logger.debug('getSetup ... Done!')
break
continue

except Exception as err:
logger.error(
f'connecting to: {self.o["queueName"]}, durable: {self.o["durable"]}, expire: {self.o["expire"]}, auto_delete={self.o["auto_delete"]}'
f'connecting to: {binding["queueName"]}, durable: {binding["durable"]}, expire: {binding["expire"]}, auto_delete={binding["auto_delete"]}'
)
logger.error("AMQP getSetup failed to {} with {}".format(
self.o['broker'].url.hostname, err))
binding['broker'].url.hostname, err))
logger.debug('Exception details: ', exc_info=True)

if not self.o['message_strategy']['stubborn']: return
Expand Down
29 changes: 17 additions & 12 deletions sarracenia/moth/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,29 +208,34 @@ def __sub_on_connect(client, userdata, flags, reason_code, properties=None):
# FIXME: enhancement could subscribe accepts multiple (subj, qos) tuples so, could do this in one RTT.
userdata.connected=True
userdata.subscribe_mutex.acquire()
for binding_tuple in userdata.o['bindings']:
for binding in userdata.o['bindings']:

if 'topic' in userdata.o:
subj=userdata.o['topic']
else:
if len(binding_tuple) == 4:
broker, exchange, prefix, subtopic = binding_tuple
elif len(binding_tuple) == 3:
exchange, prefix, subtopic = binding_tuple
broker = userdata.o['broker']
if type(binding) is dict:
pass
elif type(binding) is tuple and len(binding) == 3: # old API.
new_binding = { prefix:binding[0], 'subtopic':binding[2] }
for i in [ 'auto_delete', 'broker', 'durable', 'exchange', 'expire', 'message_ttl', 'prefetch', 'qos', 'queueBind', 'queueDeclare', 'topicPrefix' ]:
new_binding[i] = userdata.o[i]
binding = new_binding
else:
logger.critical( f"invalid binding: \"{binding_tuple}\" should be a tuple containing ( broker, exchange, topicPrefix, subtopic )" )
logger.critical( f"invalid binding: \"{binding}\" should be a dictionary containing ( broker, exchange, topicPrefix, subtopic )" )
continue

logger.info( f"tuple: {broker} {exchange} {prefix} {subtopic}")
if binding['broker'] != userdata.o['broker']:
continue

logger.info( f"tuple: {binding['broker']} {binding['exchange']} {binding['topicPrefix']} {binding['subtopic']}")

subj = '/'.join(['$share', userdata.o['queueName'], exchange] +
prefix + subtopic)
subj = '/'.join(['$share', userdata.o['queueName'], binding['exchange']] +
binding['topicPrefix'] + binding['subtopic'] )

(res, mid) = client.subscribe(subj, qos=userdata.o['qos'])
(res, mid) = client.subscribe(subj, qos=binding['qos'])
userdata.subscribe_in_progress += 1
logger.info( f"request to subscribe to: {subj}, mid={mid} "
f"qos={userdata.o['qos']} sent: {paho.mqtt.client.error_string(res)}" )
f"qos={binding['qos']} sent: {paho.mqtt.client.error_string(res)}" )
userdata.subscribe_mutex.release()
userdata.metricsConnect()

Expand Down

0 comments on commit 490fb1b

Please sign in to comment.