Skip to content

Commit

Permalink
625 re enable reporting left aside for sr3 (#762)
Browse files Browse the repository at this point in the history
* more correct python parameter typing in moth classes.

* reports implemented and working again as per #625.

* move timeCompleted within report field. update sr_post.7 man page
to reflect effective report format.

* setting uninitialized lists to None has side effected.

* unit tests have more output when admin.conf is present.
  • Loading branch information
petersilva authored Oct 10, 2023
1 parent 633c181 commit e7f7027
Show file tree
Hide file tree
Showing 17 changed files with 210 additions and 39 deletions.
2 changes: 1 addition & 1 deletion AUTHORS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Cléa Aumont
how about issues: #447, #438, #435, #433, #433, #381, #378, #366, #350

Cody Au
fixes #413, #387, #348. working on sr process management.
fixes #413, #387, #348. working on sr process management.

Michel Grenier <[email protected]> (Retired)
dd_subscribe, sr_subscribe, sr_sarra, sr_post,
Expand Down
1 change: 1 addition & 0 deletions docs/source/Reference/sr_post.7.rst
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ The headers are an array of name:value pairs::
headers will be present:
"report" { "code": 999 - HTTP style response code.
"timeCompleted": "YYYYMMDDTHHMMSS.ss" - UTC date/timestamp.
"message" : - status report message documented in `Report Messages`_
}

Expand Down
3 changes: 2 additions & 1 deletion docs/source/fr/Reference/sr_post.7.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Les en-têtes sont un tableau de paires nom:valeur::

OBLIGATOIRE:

"pubTime" - YYYYMMDDTHHMMSS.ss - UTC date/horodatage.
"pubTime" - "YYYYMMDDTHHMMSS.ss" - UTC date/horodatage.
"baseUrl" - racine de l’URL à télécharger.
"relPath" - Le chemin relatif peut être concaténé à <base_url>

Expand Down Expand Up @@ -124,6 +124,7 @@ Les en-têtes sont un tableau de paires nom:valeur::
Pour le messages de thème "v03.report", les en-têtes additionnelles qui suivent seront présents:

"report" { "code": 999 - Code de réponse de style HTTP.
"timeCompleted": "YYYYMMDDTHHMMSS.ss" - UTC date/horodatage.
"message" : - message de rapport d’état documenté dans `Report Messages`_
}

Expand Down
11 changes: 5 additions & 6 deletions sarracenia/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class TimeConversions:
* beginning of epoch is platform dependent, and conversion to actual date is fraught (leap seconds, etc...)
* Entire SR\_* formats are text, no floats are sent over the protocol
* Entire SR_* formats are text, no floats are sent over the protocol
(avoids byte order issues, null byte / encoding issues, and enhances readability.)
* str format: YYYYMMDDHHMMSS.msec goal of this representation is that a naive
Expand Down Expand Up @@ -717,14 +717,13 @@ def setReport(msg, code, text=None):
logger.warning('overriding initial report: %d: %s' %
(msg['report']['code'], msg['report']['message']))

msg['timeCompleted'] = nowstr()
msg['report'] = {'code': code, 'message': text}
msg['_deleteOnPost'] |= set(['report','timeCompleted'])
msg['report'] = {'code': code, 'timeCompleted': nowstr(), 'message': text}
msg['_deleteOnPost'] |= set(['report'])

def updatePaths(msg, options, new_dir=None, new_file=None):
"""
set the new\\\_ fields in the message based on changed file placement.
if new_ options are ommitted updaste the rest of the fields in
set the new_* fields in the message based on changed file placement.
if new_* options are ommitted updaste the rest of the fields in
the message based on their current values.
If you change file placement in a flow callback, for example.
Expand Down
8 changes: 7 additions & 1 deletion sarracenia/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ def add_option(self, option, kind='list', default_value=None, all_values=None ):
* 'str' an arbitrary string value, as will all of the above types, each
succeeding occurrence overrides the previous one.
If a value is set to None, that could mean that it has not been set.
"""
#Blindly add the option to the list if it doesn't already exist
if not hasattr(self, option):
Expand Down Expand Up @@ -1016,7 +1017,12 @@ def add_option(self, option, kind='list', default_value=None, all_values=None ):
elif kind == 'list':
list_options.append( option )
if type(v) is not list:
setattr(self, option, [v])
#subtlety... None means: has not been set,
# where an empty list to be an explicit setting.
if v is None:
setattr(self, option, None)
else:
setattr(self, option, [v])
elif kind == 'set':
set_options.append(option)
sv=set()
Expand Down
8 changes: 1 addition & 7 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,7 @@ def run(self):
self.post()
self._runCallbacksWorklist('after_post')

self.report()

self._runCallbacksWorklist('report')
self._runCallbackMetrics()

if hasattr(self.o, 'metricsFilename' ):
Expand Down Expand Up @@ -1058,11 +1057,6 @@ def post(self) -> None:
logger.error( f'flowCallback plugin {p} crashed: {ex}' )
logger.debug( "details:", exc_info=True )

def report(self) -> None:
# post reports
# apply on_report plugins
pass

def write_inline_file(self, msg) -> bool:
"""
write local file based on a message with inlined content.
Expand Down
4 changes: 2 additions & 2 deletions sarracenia/flowcb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

'ack', 'after_accept', 'after_post', 'after_work', 'destfn', 'do_poll',
'download', 'gather', 'metricsReport', 'on_cleanup', 'on_declare', 'on_features',
'on_housekeeping', 'on_report', 'on_sanity', 'on_start', 'on_stop',
'please_stop', 'poll', 'post', 'send',
'on_housekeeping', 'on_sanity', 'on_start', 'on_stop',
'please_stop', 'poll', 'post', 'report', 'send',

]

Expand Down
4 changes: 2 additions & 2 deletions sarracenia/flowcb/accept/pathreplace.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ def __init__(self, options):
self.o.add_option( 'pathReplaceFields', 'set', all_fields, all_fields )

def on_start(self):
if self.o.pathReplace == [None]:
if self.o.pathReplace == None:
logger.error("pathReplace setting mandatory")
return

logger.debug("pathReplace is %s " % self.o.pathReplace )

def after_accept(self, worklist):
if self.o.pathReplace == [None]:
if self.o.pathReplace == None:
return

for msg in worklist.incoming:
Expand Down
8 changes: 4 additions & 4 deletions sarracenia/flowcb/accept/postoverride.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,21 @@ def __init__(self, options):
self.o.add_option('postOverride', 'list')
self.o.add_option('postOverrideDel', 'list')

if self.o.postOverride != [None]:
if self.o.postOverride != None:
logger.info('postOverride settings: %s' % self.o.postOverride)
if self.o.postOverrideDel != [None]:
if self.o.postOverrideDel != None:
logger.info('postOverrideDel settings: %s' % self.o.postOverrideDel)

def after_accept(self, worklist):
for message in worklist.incoming:

if self.o.postOverride != [None]:
if self.o.postOverride != None:
for o in self.o.postOverride:
(osetting, ovalue) = o.split()
logger.debug('postOverride applying key:%s value:%s' % (osetting, ovalue))
message[osetting] = ovalue

if self.o.postOverrideDel != [None]:
if self.o.postOverrideDel != None:
for od in self.o.postOverrideDel:
if od in message:
logger.debug('postOverride deleting key:%s ' % od)
Expand Down
2 changes: 1 addition & 1 deletion sarracenia/flowcb/accept/toclusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(self, options):

self.o.add_option('msgToClusters', 'list')

if self.o.msgToClusters == [None]:
if self.o.msgToClusters == None:
logger.info("msgToClusters setting mandatory")
return

Expand Down
168 changes: 168 additions & 0 deletions sarracenia/flowcb/report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@

import logging
import sarracenia
from sarracenia.flowcb import FlowCB

logger = logging.getLogger(__name__)


class Report(FlowCB):
"""
The reporting flow callback class. reports are messages meant to be sent by
consumers back to publishers to provide publishers with telemetry data about
how many consumers are downloading, and how it went.
minimally it can be invoked with:
callback report
and default settings might work.
report_broker -- the broker to send reports to, defaults to the same as broker.
report_exchange -- exchange to publish reports to, default depends on broker user.
if the broker user has a feeder or admin role, then defaults to xreport.
otherwise, default is xr_<username}.
can override with this setting.
repot_topicPrefix, report_topic, report_exchangeSplit ... same as for post_broker.
"""
def __init__(self, options):

super().__init__(options,logger)
self.o.add_option( 'report_exchangeSplit', 'count', 0 )
self.o.add_option( 'report_topicPrefix', 'list', ['v03'] )
self.o.add_option( 'report_topic', 'str', None )
self.o.add_option( 'report_broker', 'str', None )
self.o.add_option( 'report_exchange', 'str', None )

if not hasattr(self.o, 'report_broker') and getattr(self.o,'broker'):
self.o.report_broker = self.o.broker

if hasattr(self.o, 'report_broker') and self.o.report_broker:
if type(self.o.report_broker) == str:
ok, cred_details = self.o._validate_urlstr(self.o.report_broker)
if ok:
self.o.report_broker = cred_details

if not hasattr( self.o, 'report_exchange' ) or not getattr( self.o, 'report_exchange'):
# guess default report exchange.
if hasattr(self.o.report_broker,'url') and hasattr(self.o.report_broker.url,'username'):
user = self.o.report_broker.url.username
exchange = 'xr_' + user
if user in self.o.declared_users:
role=self.o.declared_users[user]
if role in 'feeder' in [ 'feeder', 'admin' ]:
exchange = 'xreport'
self.o.report_exchange = exchange

logger.info( f" defaulting reporting to {self.o.report_broker}/{self.o.report_exchange} " )

self.__reset()

if hasattr(self.o, 'report_broker'):
props = sarracenia.moth.default_options
props.update(self.o.dictify())
logger.info( f" in props... report_broker: {props['report_broker']}" )

if hasattr(self.o, 'topic' ):
del self.o['topic']

# adjust settings post_xxx to be xxx, as Moth does not use post_ ones.
for k in [ 'broker', 'exchange', 'topicPrefix', 'exchangeSplit', 'topic' ]:
post_one='report_'+k
if hasattr( self.o, post_one ) and getattr( self.o, post_one):
props[ k ] = getattr(self.o,post_one)

logger.info( f" in props... report_broker: {props['broker']}, exchange: {props['exchange']}" )
self.poster = sarracenia.moth.Moth.pubFactory(props)
logger.info( f" poster: {self.poster} " )
else:
self.poster = None

def __reset(self):
self.last_housekeeping = sarracenia.nowflt()
self.reportCount = 0
self.reportRate = 0

def metricsReport(self):
return { 'reportRate':self.reportRate, 'reportCount':self.reportCount }

def reportPost(self,m):

if not hasattr(self.poster,'putNewMessage'):
return

if 'report' not in m:
logger.error( f"not reporting because no disposition defined for {m}" )

if 'content' in m:
del m['content']

m['_deleteOnPost'] = m['_deleteOnPost'].difference(set(['report']))

self.poster.putNewMessage(m)

def after_accept(self, worklist):
if not hasattr(self.poster,'putNewMessage'):
return

for m in worklist.rejected:
self.reportPost(m)

def after_work(self, worklist):
if not hasattr(self.poster,'putNewMessage'):
return

for m in worklist.rejected:
self.reportPost(m)

def report(self, worklist):
if not hasattr(self.poster,'putNewMessage'):
return

for m in worklist.ok:
self.reportPost(m)

for m in worklist.failed:
mm=copy.deepcopy(m) # copy because might be retried, so no modification is allowed.
self.reportPost(mm)

for m in worklist.rejected:
self.reportPost(m)

def stats(self):
tot = self.reportCount
how_long = sarracenia.nowflt() - self.last_housekeeping
if tot > 0:
apc = 100 * self.reportCount / tot
rate = self.reportCount / how_long
else:
apc = 0
rate = 0

self.reportRate = rate
logger.info( "reports %d, rate %3.1f reports/s" % (self.reportCount , rate))


def on_declare(self):
logger.info("hello")

def on_housekeeping(self):
if hasattr(self,'poster') and self.poster:
m = self.poster.metricsReport()
logger.info(
f"reports: good: {m['txGoodCount']} bad: {m['txBadCount']} bytes: {m['txByteCount']}"
)
self.poster.metricsReset()

if set(['on_housekeeping']) & self.o.logEvents:
self.stats()
logger.info("housekeeping")
self.__reset()

def on_stop(self):
if hasattr(self,'poster') and self.poster:
self.poster.close()
logger.info('closing')

2 changes: 1 addition & 1 deletion sarracenia/flowcb/rootchown.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def __init__(self, options):
def on_start(self):

if not hasattr(self.o, "rootChownMappingFile"):
self.o.mapping_file = [None]
self.o.mapping_file = None
return True

mf_path = self.o.mapping_file[0]
Expand Down
2 changes: 1 addition & 1 deletion sarracenia/flowcb/work/age.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def after_work(self, worklist) -> None:
for m in worklist.ok:
if not 'mtime' in m:
return None
completed = sarracenia.timestr2flt(m['timeCompleted'])
completed = sarracenia.timestr2flt(m['report']['timeCompleted'])
mtime = sarracenia.timestr2flt(m['mtime'])
pubtime = sarracenia.timestr2flt(m['pubTime'])
age = completed - mtime
Expand Down
4 changes: 2 additions & 2 deletions sarracenia/moth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ def __init__(self, props=None, is_subscriber=True) -> None:
logging.basicConfig(format=self.o['logFormat'],
level=getattr(logging, self.o['logLevel'].upper()))

def ack(self, message) -> None:
def ack(self, message: sarracenia.Message ) -> None:
"""
tell broker that a given message has been received.
Expand Down Expand Up @@ -334,7 +334,7 @@ def newMessages(self) -> list:
logger.error("NewMessages unimplemented")
return []

def putNewMessage(self, message, content_type='application/json') -> bool:
def putNewMessage(self, message:sarracenia.Message, content_type: str ='application/json') -> bool:
"""
publish a message as set up to the given topic.
Expand Down
Loading

0 comments on commit e7f7027

Please sign in to comment.