Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V03 issue617 (part of it.) "topic" depends more on payload format, than message protocol. #685

Merged
merged 4 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion sarracenia/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,9 @@ def updatePaths(msg, options, new_dir=None, new_file=None):
logger.error('missing post_baseUrl setting')
return

if options.post_topicPrefix:
if options.post_format:
msg['post_format'] = options.post_format
elif options.post_topicPrefix:
msg['post_format'] = options.post_topicPrefix[0]
elif options.topicPrefix != msg['_format']:
logger.warning( f"received message in {msg['_format']} format, expected {options.post_topicPrefix} " )
Expand Down
2 changes: 1 addition & 1 deletion sarracenia/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def __call__(self, parser, namespace, values, option_string=None):
'post_documentRoot': None,
'post_baseDir': None,
'post_baseUrl': None,
'post_format': None,
'post_format': 'v03',
'realpathPost': False,
'recursive' : True,
'report': False,
Expand Down
5 changes: 3 additions & 2 deletions sarracenia/examples/moth_api_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@
m = h.getNewMessage()
if m is not None:
print("message: %s" % m)
content = m.getContent()
print("corresponding file: %s" % content)
#content = m.getContent()
#print("corresponding file: %s" % content)
h.ack(m)
time.sleep(0.1)
count += 1

print(' got %d messages' % count)
h.cleanup()
h.close()
exit( count )
17 changes: 14 additions & 3 deletions sarracenia/examples/moth_api_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,25 @@
import os
import time
import socket
import sys

"""
supply broker argument as a command line argument.

"""
if len(sys.argv) > 1:
broker = sys.argv[1]
else:
broker = 'amqp://tfeed:HungryCat@localhost'

cfg = default_config()
#cfg.logLevel = 'debug'
cfg.broker = sarracenia.credentials.Credential(
'amqp://tfeed:HungryCat@localhost')
cfg.exchange = 'xpublic'
cfg.broker = sarracenia.credentials.Credential( broker )
cfg.exchange = 'xsarra'
cfg.post_baseUrl = 'http://host'
cfg.post_baseDir = '/tmp'
cfg.topicPrefix = [ 'v03', 'post' ]
cfg.logLevel = 'debug'

print('cfg: %s' % cfg)

Expand Down
2 changes: 2 additions & 0 deletions sarracenia/flowcb/v2wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ def __init__(self, h):
h['parts'] = '%s,%d,%d,%d,%d' % (m, p['size'], p['count'],
p['remainder'], p['number'])

h['topic'] = [ 'v02', 'post' ] + self.relpath.split('/')[0:-1]

if 'parts' in h:
self.partstr = h['parts']
#else:
Expand Down
26 changes: 16 additions & 10 deletions sarracenia/moth/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,16 +547,7 @@ def putNewMessage(self,
body = copy.deepcopy(body)

version = body['_format']
topic = '.'.join(self.o['topicPrefix'] + body['subtopic'])
topic = topic.replace('#', '%23')
topic = topic.replace('*', '%22')

if len(topic) >= 255: # ensure topic is <= 255 characters
logger.error("message topic too long, truncating")
mxlen = amqp_ss_maxlen
while (topic.encode("utf8")[mxlen - 1] & 0xc0 == 0xc0):
mxlen -= 1
topic = topic.encode("utf8")[0:mxlen].decode("utf8")

if '_deleteOnPost' in body:
# FIXME: need to delete because building entire JSON object at once.
Expand Down Expand Up @@ -594,11 +585,26 @@ def putNewMessage(self,
else:
ttl = "0"

raw_body, headers, content_type = PostFormat.exportAny( body, version )
raw_body, headers, content_type = PostFormat.exportAny( body, version, self.o['topicPrefix'] )

topic = '.'.join(headers['topic'])
topic = topic.replace('#', '%23')
topic = topic.replace('*', '%22')

if len(topic) >= 255: # ensure topic is <= 255 characters
logger.error("message topic too long, truncating")
mxlen = amqp_ss_maxlen
while (topic.encode("utf8")[mxlen - 1] & 0xc0 == 0xc0):
mxlen -= 1
topic = topic.encode("utf8")[0:mxlen].decode("utf8")

if self.o['messageDebugDump']:
logger.info('raw message body: version: %s type: %s %s' %
(version, type(raw_body), raw_body))
logger.info('raw message headers: type: %s value: %s' % (type(headers), headers))

del headers['topic']

if headers :
for k in headers:
if (type(headers[k]) is str) and (len(headers[k]) >=
Expand Down
22 changes: 12 additions & 10 deletions sarracenia/moth/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ def putNewMessage(self,
self.__putSetup(self.o)

postFormat = body['_format']

if '_deleteOnPost' in body:
# FIXME: need to delete because building entire JSON object at once.
# makes this routine alter the message. Ideally, would use incremental
Expand Down Expand Up @@ -661,22 +662,23 @@ def putNewMessage(self,
else:
exchange = self.o['exchange']

# FIXME: might
topic = '/'.join([exchange] + self.o['topicPrefix'] + body['subtopic'])

# url-quote wildcard characters in topics.
topic = topic.replace('#', '%23')
topic = topic.replace('+', '%2B')

del body['subtopic']
props = Properties(PacketTypes.PUBLISH)
# https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901111
props = Properties(PacketTypes.PUBLISH)
props.PayloadFormatIndicator = 1 # designates UTF-8

props.ContentType = PostFormat.content_type( postFormat )

try:
raw_body, headers, content_type = PostFormat.exportAny( body, postFormat )
raw_body, headers, content_type = PostFormat.exportAny( body, postFormat, [exchange]+self.o['topicPrefix'] )
# FIXME: might
topic = '/'.headers['topic']

# url-quote wildcard characters in topics.
topic = topic.replace('#', '%23')
topic = topic.replace('+', '%2B')

del headers['topic']
del body['subtopic']

if headers:
props.UserProperty=list(map( lambda x : (x,headers[x]) , headers ))
Expand Down
7 changes: 4 additions & 3 deletions sarracenia/postformat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,16 @@ def importAny(payload, headers, content_type ) -> sarracenia.Message:
pass

@staticmethod
def exportAny(msg, post_format='v03') -> (str, dict, str):
def exportAny(msg, post_format='v03', topicPrefix=[ 'v03' ]) -> (str, dict, str):
"""
return a tuple of the encoded message body, a headers dict, and content_type
and a completed topic as a list as one header.
"""
for sc in PostFormat.__subclasses__():
if post_format == sc.__name__.lower():
return sc.exportMine( msg )
return sc.exportMine( msg, topicPrefix )

return None, None, self.mimetype
return None, None, None

# test for v04 first, because v03 may claim all other JSON.
import sarracenia.postformat.wis
Expand Down
2 changes: 1 addition & 1 deletion sarracenia/postformat/v02.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def importMine(body, headers) -> sarracenia.Message:
return msg

@staticmethod
def exportMine(body) -> (str, dict, str):
def exportMine(body,topic_prefix) -> (str, dict, str):
"""
given a v03 (internal) message, produce an encoded version.
"""
Expand Down
10 changes: 8 additions & 2 deletions sarracenia/postformat/v03.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,15 @@ def importMine(body, headers) -> sarracenia.Message:
return msg

@staticmethod
def exportMine(body) -> (str, dict, str):
def exportMine(body,topic_prefix) -> (str, dict, str):
"""
given a v03 (internal) message, produce an encoded version.
"""
raw_body = json.dumps(body)
return raw_body, None, V03.content_type()

if 'relPath' in body:
headers = { 'topic': topic_prefix + body['relPath'].split('/')[0:-1] }
else:
headers = { 'topic': topic_prefix }

return raw_body, headers, V03.content_type()
9 changes: 5 additions & 4 deletions sarracenia/sr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1066,10 +1066,11 @@ def __init__(self, opt, config_fnmatches=None):

if self.appname is None:
self.appname = 'sr3'
else:
print(
'DEVELOPMENT using alternate application name: %s, bindir=%s' %
(self.appname, self.bin_dir))
#else:
# print(
# 'DEVELOPMENT using alternate application name: %s, bindir=%s' %
# (self.appname, self.bin_dir))


if not os.path.isdir(self.user_config_dir):
print( f'INFO: No {self.appname} configuration found. creating an empty one {self.user_config_dir}' )
Expand Down