Skip to content

Commit

Permalink
rename confing bindings to subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
petersilva committed Aug 7, 2024
1 parent 07d4892 commit 61ad1b9
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 70 deletions.
49 changes: 24 additions & 25 deletions sarracenia/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ class Config:
cfg.component = 'subscribe'
cfg.config = 'flow_demo'
cfg.action = 'start'
cfg.bindings = [ {'exchange':'xpublic', 'topicPrefix':['v02', 'post'], 'subtopic': ['*', 'WXO-DD', 'observations', 'swob-ml', '#'] } ]
cfg.subscriptions = [ {'exchange':'xpublic', 'topicPrefix':['v02', 'post'], 'subtopic': ['*', 'WXO-DD', 'observations', 'swob-ml', '#'] } ]
cfg.queueName='q_anonymous.subscriber_test2'
cfg.download=True
cfg.batch=1
Expand Down Expand Up @@ -806,7 +806,7 @@ def __init__(self, parent=None) -> 'Config':
"""
instantiate an empty Configuration
"""
self.bindings = []
self.subscriptions = []
self.__admin = None
self.__broker = None
self.__post_broker = None
Expand Down Expand Up @@ -1340,16 +1340,16 @@ def _resolve_exchange(self):
if hasattr(self, 'exchangeSplit') and hasattr(self, 'no') and (self.no > 0):
self.exchange += "%02d" % self.no

def _empty_binding(self) -> dict:
new_binding={}
def _empty_subscription(self) -> dict:
new_subscriptions={}
for i in [ 'auto_delete', 'broker', 'durable', 'exchange', 'expire', 'message_ttl', 'prefetch', \
'qos', 'queueBind', 'queueDeclare', 'queueName', 'topicPrefix' ]:
new_binding[i] = getattr(self,i)
return new_binding
new_subscriptions[i] = getattr(self,i)
return new_subscriptions



def _parse_binding(self, subtopic_string):
def _parse_subscription(self, subtopic_string):
"""
FIXME: see original parse, with substitions for url encoding.
also should sqwawk about error if no exchange or topicPrefix defined.
Expand All @@ -1373,10 +1373,10 @@ def _parse_binding(self, subtopic_string):
subtopic = subtopic_string.split('/')

if hasattr(self, 'exchange') and hasattr(self, 'topicPrefix'):
new_binding=self._empty_binding()
new_binding['subtopic'] = subtopic
new_subscription=self._empty_subscription()
new_subscription['subtopic'] = subtopic

self.bindings.append(new_binding)
self.subscriptions.append(new_subscription)

def _parse_v2plugin(self, entryPoint, value):
"""
Expand Down Expand Up @@ -1616,8 +1616,8 @@ def parse_line(self, component, cfg, cfname, lineno, l ):
except Exception as ex:
logger.error( f"{','.join(self.files)}:{self.lineno} file {v} failed to parse: {ex}" )
logger.debug('Exception details: ', exc_info=True)
elif k in ['subtopic']:
self._parse_binding(v)
elif k in [ 'subscription', 'subscribe', 'subtopic' ]:
self._parse_subscription(v)
elif k in ['topicPrefix']:
if '/' in v :
self.topicPrefix = v.split('/')
Expand Down Expand Up @@ -1837,7 +1837,7 @@ def _resolveQueueName(self,component,cfg):
if not os.path.isdir(os.path.dirname(queuefile)):
pathlib.Path(os.path.dirname(queuefile)).mkdir(parents=True, exist_ok=True)

#disable queue name saving, need to replace with .binding files.
#disable queue name saving, need to replace with .subscription files.
#return

if not os.path.isfile(queuefile) and (self.queueName is not None):
Expand Down Expand Up @@ -2014,8 +2014,8 @@ def finalize(self, component=None, config=None):
self.novipFilename = self.pid_filename.replace('.pid', '.noVip')


if (self.bindings == [] and hasattr(self, 'exchange')):
self._parse_binding('#')
if (self.subscriptions == [] and hasattr(self, 'exchange')):
self._parse_subscription('#')

if hasattr(self, 'documentRoot') and (self.documentRoot is not None):
path = os.path.expanduser(os.path.abspath(self.documentRoot))
Expand Down Expand Up @@ -2422,12 +2422,12 @@ def variableExpansion(self, cdir, message=None ) -> str:

class addBinding(argparse.Action):
"""
called by argparse to deal with queue bindings.
called by argparse to deal with queue subscriptions.
"""
def __call__(self, parser, namespace, values, option_string):

if values == 'None':
namespace.bindings = []
namespace.subscriptions = []

namespace._resolve_exchange()
namespace._resolveQueueName(self.component,self.config)
Expand All @@ -2450,11 +2450,11 @@ def __call__(self, parser, namespace, values, option_string):
else:
topicPrefix = namespace.topicPrefix.split('/')

new_binding = namespace._empty_binding()
new_binding['topicPrefix'] = topicPrefix
new_binding['subtopic'] = values
new_subscription = namespace._empty_subscription()
new_subscription['topicPrefix'] = topicPrefix
new_subscription['subtopic'] = values

namespace.bindings.append( new_binding )
namespace.subscriptions.append( new_subscription )

def parse_args(self, isPost=False):
"""
Expand Down Expand Up @@ -2572,8 +2572,7 @@ def parse_args(self, isPost=False):
"""
"""
FIXME: in previous parser, exchange is a modifier for bindings, can have several different values for different subtopic bindings.
as currently coded, just a single value that over-writes previous setting, so only binding to a single exchange is possible.
FIXME: in previous parser, exchange is a modifier for subscriptions, can have several different values for different subtopic subscriptions.
"""

parser.add_argument('--inline',
Expand All @@ -2599,8 +2598,8 @@ def parse_args(self, isPost=False):
nargs='?',
default=self.identity_method,
help='choose a different checksumming method for the files posted')
if hasattr(self, 'bindings'):
parser.set_defaults(bindings=self.bindings)
if hasattr(self, 'subscriptions'):
parser.set_defaults(subscriptions=self.subscriptions)


parser.add_argument(
Expand Down
10 changes: 5 additions & 5 deletions sarracenia/flowcb/gather/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ def __init__(self, options) -> None:
logger.critical('missing required broker specification')
return

if not hasattr(self.o, 'bindings') or not self.o.bindings:
logger.critical('missing required bindings (exchange,subtopic) for broker')
if not hasattr(self.o, 'subscriptions') or not self.o.subscriptions:
logger.critical('missing required subscriptions (exchange,subtopic) for broker')
return

self.brokers=[]
for binding in self.o.bindings:
if type(binding) is dict:
self.brokers.append(binding['broker'])
for subscriptions in self.o.subscriptions:
if type(subscriptions) is dict:
self.brokers.append(subscriptions['broker'])

if len(self.brokers) == 0:
self.brokers=[ self.o.broker ]
Expand Down
55 changes: 28 additions & 27 deletions sarracenia/moth/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ def getSetup(self) -> None:
Setup so we can get messages.
if message_strategy is stubborn, will loop here forever.
connect, declare queue, apply bindings.
connect, declare queue, apply subscriptions.
"""

ebo = 1
Expand All @@ -320,63 +320,64 @@ def getSetup(self) -> None:
signal.signal(signal.SIGINT, self._amqp_setup_signal_handler)
signal.signal(signal.SIGTERM, self._amqp_setup_signal_handler)

if 'bindings' not in self.o or self.o['bindings'] is []:
logger.critical( f"no bindings given" )
if 'subscriptions' not in self.o or self.o['subscriptions'] is []:
logger.critical( f"no subscriptions given" )
return

for binding in self.o['bindings']:
for subscription in self.o['subscriptions']:

if self.please_stop:
break

try:
if type(binding) is dict:
if type(subscription) is dict:
pass
elif type(binding) is tuple and len(binding) == 3:
new_binding = { 'exchange': binding[0], 'topicPrefix': binding[1], 'subtopic': binding[2] }
elif type(subscription) is tuple and len(subscription) == 3:
new_subscription = { 'exchange': subscription[0], \
'topicPrefix': subscription[1], 'subtopic': subscription[2] }
for i in [ 'auto_delete', 'broker', 'durable', 'exchange', 'expire', 'message_ttl', \
'prefetch', 'queueBind', 'queueDeclare', 'queueName', 'topicPrefix' ]:
new_binding[i] = self.o[i]
binding=new_binding
new_subscription[i] = self.o[i]
subscription=new_subscription
else:
logger.critical( f"binding \"{binding}\" should be a list of dictionaries ( broker, exchange, topicPrefix, subtopic )" )
logger.critical( f"subscription \"{subscription}\" should be a list of dictionaries ( broker, exchange, topicPrefix, subtopic )" )
continue

if binding['broker'] != self.o['broker']:
if subscription['broker'] != self.o['broker']:
continue

# from sr_consumer.build_connection...
if not self.__connect(binding['broker']):
if not self.__connect(subscription['broker']):
logger.critical('could not connect')
continue

# only first/lead instance needs to declare a queue and bindings.
# only first/lead instance needs to declare a queue and subscription.
if 'no' in self.o and self.o['no'] >= 2:
self.metricsConnect()
continue

if binding['prefetch'] != 0:
self.channel.basic_qos(0, binding['prefetch'], True)
if subscription['prefetch'] != 0:
self.channel.basic_qos(0, subscription['prefetch'], True)

#FIXME: test self.first_setup and props['reset']... delete queue...
broker_str = binding['broker'].geturl()
broker_str = subscription['broker'].geturl()

# from Queue declare
msg_count = self._queueDeclare()

if msg_count == -2: continue

if binding['queueBind'] and binding['queueName']:
topic = '.'.join(binding['topicPrefix'] + binding['subtopic'])
if subscription['queueBind'] and subscription['queueName']:
topic = '.'.join(subscription['topicPrefix'] + subscription['subtopic'])
if self.o['dry_run']:
logger.info('binding (dry run) %s with %s to %s (as: %s)' % \
( binding['queueName'], topic, binding['exchange'], broker_str ) )
logger.info('subscription (dry run) binding %s with %s to %s (as: %s)' % \
( subscription['queueName'], topic, subscription['exchange'], broker_str ) )
else:
logger.info('binding %s with %s to %s (as: %s)' % \
( binding['queueName'], topic, binding['exchange'], broker_str ) )
if binding['exchange']:
self.management_channel.queue_bind(binding['queueName'],
binding['exchange'], topic)
logger.info('subscription binding %s with %s to %s (as: %s)' % \
( subscription['queueName'], topic, subscription['exchange'], broker_str ) )
if subscription['exchange']:
self.management_channel.queue_bind(subscription['queueName'],
subscription['exchange'], topic)

# Setup Successfully Complete!
self.metricsConnect()
Expand All @@ -385,10 +386,10 @@ def getSetup(self) -> None:

except Exception as err:
logger.error(
f'connecting to: {binding["queueName"]}, durable: {binding["durable"]}, expire: {binding["expire"]}, auto_delete={binding["auto_delete"]}'
f'connecting to: {subscription["queueName"]}, durable: {subscription["durable"]}, expire: {subscription["expire"]}, auto_delete={subscription["auto_delete"]}'
)
logger.error("AMQP getSetup failed to {} with {}".format(
binding['broker'].url.hostname, err))
subscription['broker'].url.hostname, err))
logger.debug('Exception details: ', exc_info=True)

if not self.o['message_strategy']['stubborn']: return
Expand Down
26 changes: 13 additions & 13 deletions sarracenia/moth/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,34 +208,34 @@ def __sub_on_connect(client, userdata, flags, reason_code, properties=None):
# FIXME: enhancement could subscribe accepts multiple (subj, qos) tuples so, could do this in one RTT.
userdata.connected=True
userdata.subscribe_mutex.acquire()
for binding in userdata.o['bindings']:
for subscription in userdata.o['subscriptions']:

if 'topic' in userdata.o:
subj=userdata.o['topic']
else:
if type(binding) is dict:
if type(subscription) is dict:
pass
elif type(binding) is tuple and len(binding) == 3: # old API.
new_binding = { prefix:binding[0], 'subtopic':binding[2] }
elif type(subscription) is tuple and len(subscription) == 3: # old API.
new_subscription = { prefix:subscription[0], 'subtopic':subscription[2] }
for i in [ 'auto_delete', 'broker', 'durable', 'exchange', 'expire', 'message_ttl', 'prefetch', 'qos', 'queueBind', 'queueDeclare', 'topicPrefix' ]:
new_binding[i] = userdata.o[i]
binding = new_binding
new_subscription[i] = userdata.o[i]
subscription = new_subscription
else:
logger.critical( f"invalid binding: \"{binding}\" should be a dictionary containing ( broker, exchange, topicPrefix, subtopic )" )
logger.critical( f"invalid subscription: \"{subscription}\" should be a dictionary containing ( broker, exchange, topicPrefix, subtopic )" )
continue

if binding['broker'] != userdata.o['broker']:
if subscription['broker'] != userdata.o['broker']:
continue

logger.info( f"tuple: {binding['broker']} {binding['exchange']} {binding['topicPrefix']} {binding['subtopic']}")
logger.info( f"tuple: {subscription['broker']} {subscription['exchange']} {subscription['topicPrefix']} {subscription['subtopic']}")

subj = '/'.join(['$share', userdata.o['queueName'], binding['exchange']] +
binding['topicPrefix'] + binding['subtopic'] )
subj = '/'.join(['$share', userdata.o['queueName'], subscription['exchange']] +
subscription['topicPrefix'] + subscription['subtopic'] )

(res, mid) = client.subscribe(subj, qos=binding['qos'])
(res, mid) = client.subscribe(subj, qos=subscription['qos'])
userdata.subscribe_in_progress += 1
logger.info( f"request to subscribe to: {subj}, mid={mid} "
f"qos={binding['qos']} sent: {paho.mqtt.client.error_string(res)}" )
f"qos={subscription['qos']} sent: {paho.mqtt.client.error_string(res)}" )
userdata.subscribe_mutex.release()
userdata.metricsConnect()

Expand Down

0 comments on commit 61ad1b9

Please sign in to comment.