
Commit

Issue339 making a new feature branch that will contain stuff as it matures... (#1153)

* adding broker to bindings

* basic multi-broker consumer working

* passing static flow now.

Needed a string index for the url. The json encoder was barfing on credentials,
so I created a string routine for it to use instead.

I think I might have a similar problem in flakey with retry queues and
messages.

Also, acks were not working (self.consumers instead of self.consumer),
and I got rid of some debug output.

* now passing flakey

* now passing dynamic

* make compatible with previous binding, existing python code still works

* flow_maint passes

* no debug in flow_api example
petersilva authored Aug 7, 2024
1 parent cbedb30 commit fdf7d2a
Showing 7 changed files with 128 additions and 51 deletions.
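
The change that threads through all seven files is the shape of a binding: it grows from (exchange, topicPrefix, subtopic) to (broker, exchange, topicPrefix, subtopic), so one subscriber flow can consume from more than one broker while the older 3-element form keeps working. A rough sketch of the two shapes, independent of sarracenia, with invented broker URLs:

# Minimal sketch of the old and new binding shapes; the broker URLs are
# placeholders, not real endpoints.
old_binding = ('xpublic', ['v02', 'post'], ['*', 'WXO-DD', '#'])

broker_a = 'amqps://anonymous@broker-a.example.org'
broker_b = 'amqps://anonymous@broker-b.example.org'
new_bindings = [
    (broker_a, 'xpublic', ['v02', 'post'], ['*', 'WXO-DD', '#']),
    (broker_b, 'xs_demo', ['v03'], ['observations', '#']),
]

# Code that still receives the 3-element form can normalize it by falling
# back to a default broker, which is what the consumers in this commit do.
def normalize(binding, default_broker):
    if len(binding) == 4:
        return binding
    exchange, prefix, subtopic = binding
    return (default_broker, exchange, prefix, subtopic)

print(normalize(old_binding, broker_a))
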
6 changes: 3 additions & 3 deletions sarracenia/config.py
@@ -1360,7 +1360,7 @@ def _parse_binding(self, subtopic_string):
subtopic = subtopic_string.split('/')

if hasattr(self, 'exchange') and hasattr(self, 'topicPrefix'):
self.bindings.append((self.exchange, self.topicPrefix, subtopic))
self.bindings.append((self.broker, self.exchange, self.topicPrefix, subtopic))

def _parse_v2plugin(self, entryPoint, value):
"""
@@ -1998,7 +1998,7 @@ def finalize(self, component=None, config=None):


if (self.bindings == [] and hasattr(self, 'exchange')):
self.bindings = [(self.exchange, self.topicPrefix, [ '#' ])]
self.bindings = [(self.broker, self.exchange, self.topicPrefix, [ '#' ])]

if hasattr(self, 'documentRoot') and (self.documentRoot is not None):
path = os.path.expanduser(os.path.abspath(self.documentRoot))
@@ -2433,7 +2433,7 @@ def __call__(self, parser, namespace, values, option_string):
topicPrefix = namespace.topicPrefix.split('/')

namespace.bindings.append(
(namespace.exchange, topicPrefix, values))
(namespace.broker, namespace.exchange, topicPrefix, values))

def parse_args(self, isPost=False):
"""
39 changes: 22 additions & 17 deletions sarracenia/credentials.py
@@ -104,25 +104,10 @@ def __init__(self, urlstr=None):
self.azure_credentials = None
self.implicit_ftps = False

def __str__(self):
def __str__(self) -> str:
"""Returns attributes of the Credential object as a readable string.
"""

s = ''
if False:
s += self.url.geturl()
else:
s += self.url.scheme + '://'
if self.url.username:
s += self.url.username
#if self.url.password:
# s += ':' + self.url.password
if self.url.hostname:
s += '@' + self.url.hostname
if self.url.port:
s += ':' + str(self.url.port)
if self.url.path:
s += self.url.path
s = self.geturl()

s += " %s" % self.ssh_keyfile
s += " %s" % self.passive
@@ -139,6 +124,24 @@ def __str__(self):

return s

def geturl(self) -> str:
if not hasattr(self,'url'):
return ''

s=''
s += self.url.scheme + '://'
if self.url.username:
s += self.url.username
#if self.url.password:
# s += ':' + self.url.password
if self.url.hostname:
s += '@' + self.url.hostname
if self.url.port:
s += ':' + str(self.url.port)
if self.url.path:
s += self.url.path
return s


class CredentialDB:
"""Parses, stores and manages Credential objects.
@@ -242,6 +245,8 @@ def get(self, urlstr):
k=urlstr
return False, self.credentials[k]



def has(self, urlstr):
"""Return ``True`` if the Credential matching the urlstr is already in the CredentialDB.
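
The new Credential.geturl() above renders a credential as scheme://user@host[:port][/path] with the password deliberately left out, which gives __str__ something safe to log and gives the rest of the code a plain string where the json encoder previously choked on the Credential object itself. A rough standalone equivalent built on urllib.parse, using an invented URL:

# Standalone sketch of the same idea: render a parsed URL without its password.
from urllib.parse import urlparse

def credential_str(u) -> str:
    s = u.scheme + '://'
    if u.username:
        s += u.username            # password intentionally omitted
    if u.hostname:
        s += '@' + u.hostname
    if u.port:
        s += ':' + str(u.port)
    if u.path:
        s += u.path
    return s

u = urlparse('amqps://alice:secret@broker.example.org:5671/vhost')
print(credential_str(u))           # amqps://alice@broker.example.org:5671/vhost
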
3 changes: 2 additions & 1 deletion sarracenia/examples/flow_api_consumer.py
@@ -12,9 +12,10 @@
cfg.component = 'subscribe'
cfg.config = 'flow_demo'
cfg.action = 'hoho'
cfg.bindings = [('xpublic', ['v02', 'post'],
cfg.bindings = [( 'xpublic', ['v02', 'post'],
['*', 'WXO-DD', 'observations', 'swob-ml', '#'])]
cfg.queueName = 'q_anonymous.subscriber_test2'
#cfg.debug = True
cfg.download = True
cfg.batch = 1
cfg.messageCountMax = 5
88 changes: 65 additions & 23 deletions sarracenia/flowcb/gather/message.py
@@ -23,54 +23,96 @@ def __init__(self, options) -> None:
self.od = sarracenia.moth.default_options
self.od.update(self.o.dictify())

if hasattr(self.o, 'broker') and self.o.broker:
self.consumer = sarracenia.moth.Moth.subFactory(self.od)
else:
if not hasattr(self.o, 'broker') or not self.o.broker:
logger.critical('missing required broker specification')
return

if not hasattr(self.o, 'bindings') or not self.o.bindings:
logger.critical('missing required bindings (exchange,subtopic) for broker')
return

self.brokers=[]
for binding in self.o.bindings:
if len(binding) >= 4 and binding[0] not in self.brokers:
self.brokers.append(binding[0])

if len(self.brokers) == 0:
self.brokers=[ self.o.broker ]

self.consumers={}
for broker in self.brokers:
self.od['broker']=broker
self.consumers[broker] = sarracenia.moth.Moth.subFactory(self.od)

def gather(self, messageCountMax) -> list:
"""
return:
True ... you can gather from other sources. and:
a list of messages obtained from this source.
"""
if hasattr(self,'consumer') and hasattr(self.consumer,'newMessages'):
return (True, self.consumer.newMessages())
else:
logger.warning( f'not connected. Trying to connect to {self.o.broker}')
self.consumer = sarracenia.moth.Moth.subFactory(self.od)
new_messages=[]
if not hasattr(self,'consumers'):
return (True, [])

for broker in self.brokers:
if broker in self.consumers and hasattr(self.consumers[broker],'newMessages'):
new_messages.extend(self.consumers[broker].newMessages())
else:
logger.warning( f'not connected. Trying to connect to {broker}')
self.od['broker']=broker
self.consumers[broker] = sarracenia.moth.Moth.subFactory(self.od)

return (True, new_messages)

def ack(self, mlist) -> None:

if not hasattr(self,'consumer'):
if not hasattr(self,'consumers'):
return

for m in mlist:
if not 'broker' in m:
logger.error( f"cannot ack, missing broker in {m}" )
continue

if not m['broker'] in self.consumers:
logger.error( f"cannot ack, no consumer for {m['broker'].geturl()}" )
continue

# messages being re-downloaded should not be re-acked, but they won't have an ack_id (see issue #466)
self.consumer.ack(m)
if hasattr(self.consumers[m['broker']],'ack'):
self.consumers[m['broker']].ack(m)
else:
logger.error( f"cannot ack" )

def metricsReport(self) -> dict:
if hasattr(self,'consumer') and hasattr(self.consumer,'metricsReport'):
return self.consumer.metricsReport()
else:

if not hasattr(self,'consumers'):
return {}

metrics={}
for broker in self.brokers:
if hasattr(self.consumers[broker],'metricsReport'):
metrics[broker.geturl()] = self.consumers[broker].metricsReport()
return metrics

def on_housekeeping(self) -> None:

if not hasattr(self,'consumer'):
if not hasattr(self,'consumers'):
return

if hasattr(self.consumer, 'metricsReport'):
m = self.consumer.metricsReport()
average = (m['rxByteCount'] /
m['rxGoodCount'] if m['rxGoodCount'] != 0 else 0)
logger.info( f"messages: good: {m['rxGoodCount']} bad: {m['rxBadCount']} " +\
f"bytes: {naturalSize(m['rxByteCount'])} " +\
m = self.metricsReport()
for broker in self.brokers:
burl=broker.geturl()
average = (m[burl]['rxByteCount'] /
m[burl]['rxGoodCount'] if m[burl]['rxGoodCount'] != 0 else 0)
logger.info( f"{burl} messages: good: {m[burl]['rxGoodCount']} bad: {m[burl]['rxBadCount']} " +\
f"bytes: {naturalSize(m[burl]['rxByteCount'])} " +\
f"average: {naturalSize(average)}" )
self.consumer.metricsReset()
self.consumers[broker].metricsReset()

def on_stop(self) -> None:
if hasattr(self,'consumer') and hasattr(self.consumer, 'close'):
self.consumer.close()
if hasattr(self,'consumers'):
for broker in self.brokers:
if hasattr(self.consumers[broker], 'close'):
self.consumers[broker].close()
logger.info('closing')
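
gather/message.py now keeps one Moth consumer per distinct broker named in the bindings instead of a single self.consumer, falling back to the flow-level broker when no binding names one. Stripped of logging and reconnection handling, the bookkeeping looks roughly like this; the subscribe() stub stands in for sarracenia.moth.Moth.subFactory so the sketch runs on its own, and the broker strings are invented:

# Schematic of the per-broker consumer bookkeeping added above.
def subscribe(options):
    return {'broker': options['broker'], 'messages': []}   # dummy consumer

def build_consumers(options):
    # distinct brokers named in 4-element bindings, with the flow-level
    # broker as the fallback when none are named
    brokers = []
    for binding in options.get('bindings', []):
        if len(binding) >= 4 and binding[0] not in brokers:
            brokers.append(binding[0])
    if not brokers:
        brokers = [options['broker']]

    consumers = {}
    for broker in brokers:
        per_broker = dict(options, broker=broker)   # one option set per broker
        consumers[broker] = subscribe(per_broker)
    return brokers, consumers

opts = {'broker': 'amqps://a@x.example',
        'bindings': [('amqps://a@x.example', 'xpublic', ['v02', 'post'], ['#']),
                     ('amqps://b@y.example', 'xpublic', ['v02', 'post'], ['#'])]}
brokers, consumers = build_consumers(opts)
print(brokers)   # each broker gets its own consumer, keyed by broker
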
11 changes: 11 additions & 0 deletions sarracenia/flowcb/retry.py
@@ -113,6 +113,15 @@ def after_accept(self, worklist) -> None:
if len(mlist) > 0:
worklist.incoming.extend(mlist)

def clean_messages(self,worklist):
for m in worklist.failed:
# json cannot serialize Credential... so just remove it prior to storing it.
if 'broker' in m:
del m['broker']
if 'ack_id' in m:
logger.error( f"putting unacked message {m.getIDStr()} in retry, will never be acked." )
del m['ack_id']

def after_work(self, worklist) -> None:
"""
Messages in `worklist.failed` should be put in the download retry queue. If there are only a few new
@@ -123,6 +132,7 @@ def after_work(self, worklist) -> None:

if len(worklist.failed) != 0:
logger.debug( f"putting {len(worklist.failed)} messages into {self.download_retry_name}" )
self.clean_messages(worklist)
self.download_retry.put(worklist.failed)
worklist.failed = []

@@ -151,6 +161,7 @@ def after_post(self, worklist) -> None:
if not features['retry']['present'] :
return

self.clean_messages(worklist)
self.post_retry.put(worklist.failed)
worklist.failed=[]

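
clean_messages() exists because a failed message that still carries its Credential object under 'broker' cannot be serialized by the retry queue's json encoding, and a persisted 'ack_id' refers to a delivery tag that will never be acknowledged anyway. A small illustration of the failure mode and the fix, with a stand-in Credential class:

import json

class Credential:                  # stand-in for sarracenia.credentials.Credential
    def __init__(self, url):
        self.url = url

msg = {'relPath': 'a/b/c',
       'broker': Credential('amqps://a@x.example'),
       'ack_id': 42}

try:
    json.dumps(msg)
except TypeError as e:
    print('cannot persist as-is:', e)

# what clean_messages() does before handing the message to the retry queue
msg.pop('broker', None)
msg.pop('ack_id', None)
print(json.dumps(msg))             # now serializable
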
17 changes: 13 additions & 4 deletions sarracenia/moth/amqp.py
@@ -112,8 +112,6 @@ def _msgRawToDict(self, raw_msg) -> sarracenia.Message:
logger.info('had no delivery info')
logger.info('raw message end')



if type(body) is bytes:
try:
body = raw_msg.body.decode("utf8")
@@ -142,9 +140,10 @@ def _msgRawToDict(self, raw_msg) -> sarracenia.Message:
msg.deriveSource( self.o )
msg.deriveTopics( self.o, topic )

msg['broker'] = self.o['broker']
msg['ack_id'] = raw_msg.delivery_info['delivery_tag']
msg['local_offset'] = 0
msg['_deleteOnPost'] |= set( ['ack_id', 'exchange', 'local_offset', 'subtopic'])
msg['_deleteOnPost'] |= set( ['ack_id', 'broker', 'exchange', 'local_offset', 'subtopic'])
if not msg.validate():
if hasattr(self,'channel'):
self.channel.basic_ack(msg['ack_id'])
@@ -359,7 +358,17 @@ def getSetup(self) -> None:

if self.o['queueBind'] and self.o['queueName']:
for tup in self.o['bindings']:
exchange, prefix, subtopic = tup
if len(tup) == 4:
broker, exchange, prefix, subtopic = tup
elif len(tup) == 3:
exchange, prefix, subtopic = tup
broker = self.o['broker']
else:
logger.critical( f"binding \"{tup}\" should be a list of tuples ( broker, exchange, prefix, subtopic )" )
continue
logger.critical( f"{broker=} {self.o['broker']=} ")
if broker != self.o['broker']:
continue
topic = '.'.join(prefix + subtopic)
if self.o['dry_run']:
logger.info('binding (dry run) %s with %s to %s (as: %s)' % \
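
getSetup() now accepts both binding shapes and only binds the queue for entries that name this connection's broker. The same normalize-then-filter step, reduced to plain Python with string brokers purely for illustration (in the diff the broker values are Credential objects compared with !=):

# Condensed version of the binding loop in getSetup(): accept 3- or 4-element
# tuples, default the broker, and skip bindings meant for another broker.
def topics_for(bindings, this_broker, default_broker):
    topics = []
    for tup in bindings:
        if len(tup) == 4:
            broker, exchange, prefix, subtopic = tup
        elif len(tup) == 3:
            exchange, prefix, subtopic = tup
            broker = default_broker
        else:
            continue               # malformed binding, logged in the diff
        if broker != this_broker:
            continue               # belongs to another consumer
        topics.append((exchange, '.'.join(prefix + subtopic)))
    return topics

bindings = [('amqps://a@x.example', 'xpublic', ['v02', 'post'], ['#']),
            ('xpublic', ['v02', 'post'], ['alerts', '#'])]   # old 3-element form
print(topics_for(bindings, 'amqps://a@x.example', 'amqps://a@x.example'))
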
15 changes: 12 additions & 3 deletions sarracenia/moth/mqtt.py
@@ -213,8 +213,16 @@ def __sub_on_connect(client, userdata, flags, reason_code, properties=None):
if 'topic' in userdata.o:
subj=userdata.o['topic']
else:
exchange, prefix, subtopic = binding_tuple
logger.info( f"tuple: {exchange} {prefix} {subtopic}")
if len(binding_tuple) == 4:
broker, exchange, prefix, subtopic = binding_tuple
elif len(binding_tuple) == 3:
exchange, prefix, subtopic = binding_tuple
broker = userdata.o['broker']
else:
logger.critical( f"invalid binding: \"{binding_tuple}\" should be a tuple containing ( broker, exchange, topicPrefix, subtopic )" )
continue

logger.info( f"tuple: {broker} {exchange} {prefix} {subtopic}")

subj = '/'.join(['$share', userdata.o['queueName'], exchange] +
prefix + subtopic)
@@ -590,10 +598,11 @@ def _msgDecode(self, mqttMessage) -> sarracenia.Message:
message.deriveSource( self.o )
message.deriveTopics( self.o, topic=mqttMessage.topic, separator='/' )

message['broker'] = self.o['broker']
message['ack_id'] = mqttMessage.mid
message['qos'] = mqttMessage.qos
message['local_offset'] = 0
message['_deleteOnPost'] |= set( ['exchange', 'local_offset', 'ack_id', 'qos' ])
message['_deleteOnPost'] |= set( ['ack_id', 'broker', 'exchange', 'local_offset', 'qos' ])

self.metrics['rxLast'] = sarracenia.nowstr()
if message.validate():
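
On the MQTT side the same 3-or-4 element unpacking feeds the shared-subscription topic, which joins '$share', the queue name, the exchange, the topic prefix and the subtopic with '/'. A small sketch of that construction, with made-up queue and broker names:

# How the subscription topic is assembled in __sub_on_connect.
def share_topic(queue_name, binding):
    if len(binding) == 4:
        _broker, exchange, prefix, subtopic = binding
    else:
        exchange, prefix, subtopic = binding
    return '/'.join(['$share', queue_name, exchange] + prefix + subtopic)

binding = ('mqtts://a@x.example', 'xpublic', ['v03'], ['observations', '#'])
print(share_topic('q_anonymous_demo', binding))
# -> $share/q_anonymous_demo/xpublic/v03/observations/#
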
