From e6da9fff6204337e72f6092bb0c3fe346befa321 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 7 Jun 2024 11:28:19 -0400 Subject: [PATCH 1/5] implement message rate per cpu second as a metric --- sarracenia/flow/__init__.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index f1deeb6c5..0f1c8a580 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -218,7 +218,8 @@ def metricsFlowReset(self) -> None: self.new_metrics = { 'flow': { 'stop_requested': False, 'last_housekeeping': 0, 'transferConnected': False, 'transferConnectStart': 0, 'transferConnectTime':0, - 'transferRxBytes': 0, 'transferTxBytes': 0, 'transferRxFiles': 0, 'transferTxFiles': 0 } } + 'transferRxBytes': 0, 'transferTxBytes': 0, 'transferRxFiles': 0, 'transferTxFiles': 0, + 'last_housekeeping_cpuTime': 0, 'cpuTime' : 0, } } # carry over some metrics... that don't reset. if hasattr(self,'metrics'): @@ -364,6 +365,8 @@ def _runCallbackMetrics(self): except Exception as ex: logger.error( f'flowCallback plugin {p}/metricsReport crashed: {ex}' ) logger.debug( "details:", exc_info=True ) + ost = os.times() + self.metrics['flow']['cpuTime'] = ost.user+ost.system-self.metrics['flow']['last_housekeeping_cpuTime'] def _runCallbackPoll(self): if hasattr(self, "Poll"): @@ -412,6 +415,9 @@ def _runHousekeeping(self, now) -> float: self.runCallbacksTime('on_housekeeping') self.metricsFlowReset() self.metrics['flow']['last_housekeeping'] = now + ost = os.times() + self.metrics['flow']['last_housekeeping_cpuTime'] = ost.user+ost.system + self.metrics['flow']['cpuTime'] = ost.user+ost.system next_housekeeping = now + self.o.housekeeping self.metrics['flow']['next_housekeeping'] = next_housekeeping @@ -529,6 +535,8 @@ def run(self): current_sleep = self.o.sleep last_time = start_time self.metrics['flow']['last_housekeeping'] = start_time + ost=os.times() + self.metrics['flow']['last_housekeeping_cpuTime'] = ost.user+ost.system if self.o.logLevel == 'debug': logger.debug("options:") @@ -600,6 +608,7 @@ def run(self): elapsed = now - last_time self.metrics['flow']['msgRate'] = current_rate + self.metrics['flow']['msgRateCpu'] = total_messages / (self.metrics['flow']['cpuTime']+self.metrics['flow']['last_housekeeping_cpuTime'] ) if (last_gather_len == 0) and (self.o.sleep < 0): if (self.o.retryEmptyBeforeExit and "retry" in self.metrics From 8d851ad556959fc2aad0f62336aa85c4a7eff2a3 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 7 Jun 2024 09:31:05 -0400 Subject: [PATCH 2/5] sr3 --full status crash --- sarracenia/sr.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sarracenia/sr.py b/sarracenia/sr.py index 302fc0252..32f7dc7c3 100755 --- a/sarracenia/sr.py +++ b/sarracenia/sr.py @@ -2600,7 +2600,7 @@ def status(self): naturalSize(m["transferRxFiles"]).replace("B","F").replace("Fyte","File"), \ naturalSize(m["transferTxBytes"]), \ naturalSize(m["transferTxFiles"]).replace("B","F").replace("Fyte","File"), \ - time_base ) + m["time_base"] ) else: line += " %10s %10s %9s %5s %5s %10s %8s" % ( "-", "-", "-", "-", "-", "-", "-" ) if self.options.displayFull: From 15442949979dd0fb724ae801be142c1844fd509d Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 7 Jun 2024 12:12:42 -0400 Subject: [PATCH 3/5] add cpuSlow metric and threshold to sr status --- sarracenia/config.py | 9 +++++---- sarracenia/sr.py | 28 ++++++++++++++++++++-------- 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/sarracenia/config.py b/sarracenia/config.py index 6d3c636ee..f69a0b474 100755 --- a/sarracenia/config.py +++ b/sarracenia/config.py @@ -92,7 +92,6 @@ def __repr__(self) -> str: 'dry_run': False, 'filename': None, 'flowMain': None, - 'runStateThreshold_idle': 900, 'inflight': None, 'inline': False, 'inlineOnly': False, @@ -101,7 +100,6 @@ def __repr__(self) -> str: 'logMetrics': False, 'logStdout': False, 'metrics_writeInterval': 5, - 'runStateThreshold_lag': 30, 'nodupe_driver': 'disk', 'nodupe_ttl': 0, 'overwrite': True, @@ -119,8 +117,11 @@ def __repr__(self) -> str: 'report': False, 'retryEmptyBeforeExit': False, 'retry_refilter': False, - 'runStateThreshold_retry': 1000, + 'runStateThreshold_cpuSlow': 0, 'runStateThreshold_hung': 450, + 'runStateThreshold_idle': 900, + 'runStateThreshold_lag': 30, + 'runStateThreshold_retry': 1000, 'runStateThreshold_slow': 0, 'sourceFromExchange': False, 'sourceFromMessage': False, @@ -136,7 +137,7 @@ def __repr__(self) -> str: count_options = [ 'batch', 'count', 'exchangeSplit', 'instances', 'logRotateCount', 'no', 'post_exchangeSplit', 'prefetch', 'messageCountMax', 'messageRateMax', - 'messageRateMin', 'runStateThreshold_reject', 'runStateThreshold_retry', 'runStateThreshold_slow' + 'messageRateMin', 'runStateThreshold_cpuSlow', 'runStateThreshold_reject', 'runStateThreshold_retry', 'runStateThreshold_slow', ] diff --git a/sarracenia/sr.py b/sarracenia/sr.py index 32f7dc7c3..838b85394 100755 --- a/sarracenia/sr.py +++ b/sarracenia/sr.py @@ -55,14 +55,14 @@ logger = logging.getLogger(__name__) -empty_metrics={ "byteRate":0, "rejectCount":0, "last_housekeeping":0, "messagesQueued": 0, +empty_metrics={ "byteRate":0, "cpuTime":0, "rejectCount":0, "last_housekeeping":0, "messagesQueued": 0, "lagMean": 0, "latestTransfer": 0, "rejectPercent":0, "transferRxByteRate":0, "transferTxByteRate": 0, "rxByteCount":0, "rxGoodCount":0, "rxBadCount":0, "txByteCount":0, "txGoodCount":0, "txBadCount":0, "lagMax":0, "lagTotal":0, "lagMessageCount":0, "disconnectTime":0, "transferConnectTime":0, "transferRxLast": 0, "transferTxLast": 0, "rxLast":0, "txLast":0, "transferRxBytes":0, "transferRxFiles":0, "transferTxBytes": 0, "transferTxFiles": 0, "msgs_in_post_retry": 0, "msgs_in_download_retry":0, "brokerQueuedMessageCount": 0, - 'time_base': 0, 'byteTotal': 0, 'byteRate': 0, 'msgRate': 0, 'retry': 0, 'transferLast': 0, + 'time_base': 0, 'byteTotal': 0, 'byteRate': 0, 'msgRate': 0, 'msgRateCpu': 0, 'retry': 0, 'transferLast': 0, 'connectPercent': 0, 'byteConnectPercent': 0 } @@ -894,7 +894,7 @@ def _resolve(self): 'rxLagTime':0, 'rxLagCount':0, 'rxMessageQueued':0, 'rxMessageRetry':0, 'txMessageQueued':0, 'txMessageRetry':0, - 'rxMessageRate':0, 'rxDataRate':0, 'rxFileRate':0, 'rxMessageByteRate':0, + 'rxMessageRate':0, 'rxMessageRateCpu':0, 'rxDataRate':0, 'rxFileRate':0, 'rxMessageByteRate':0, 'txMessageRate':0, 'txDataRate':0, 'txFileRate':0, 'txMessageByteRate':0 } for c in self.components: @@ -953,6 +953,8 @@ def _resolve(self): metrics['txLast'] = newval if 'messageLast' not in metrics or (newval > metrics['messageLast']): metrics['messageLast'] = newval + elif k in [ "cpuTime" ]: + metrics['cpuTime'] += newval else: metrics[k] += newval #else: @@ -999,9 +1001,14 @@ def _resolve(self): m['byteRate'] = byteTotal/time_base m['msgRate'] = (m["rxGoodCount"]+m["rxBadCount"])/time_base + if m['cpuTime'] > 0: + m['msgRateCpu'] = (m["rxGoodCount"]+m["rxBadCount"])/m['cpuTime'] + else: + m['msgRateCpu'] = 0 self.cumulative_stats['rxMessageByteRate'] += m['byteRate'] self.cumulative_stats['rxMessageRate'] += m['msgRate'] + self.cumulative_stats['rxMessageRateCpu'] += m['msgRateCpu'] m['transferRxByteRate'] = m['transferRxBytes']/time_base m['transferRxFileRate'] = m['transferRxFiles']/time_base @@ -1110,6 +1117,10 @@ def _resolve(self): else: flow_status = 'running' + if self.states[c][cfg]['metrics']['msgRate'] > 0 and \ + self.states[c][cfg]['metrics']['msgRateCpu'] < self.configs[c][cfg]['options'].runStateThreshold_cpuSlow: + flow_status = 'cpuSlow' + self.states[c][cfg]['resource_usage'] = copy.deepcopy(resource_usage) self.configs[c][cfg]['status'] = flow_status @@ -1285,7 +1296,7 @@ def __init__(self, opt, config_fnmatches=None): 'sender', 'shovel', 'subscribe', 'watch', 'winnow' ] # active means >= 1 process exists on the node. - self.status_active = ['hung', 'idle', 'lagging', 'partial', 'reject', 'retry', 'running', 'slow', 'waitVip' ] + self.status_active = ['cpuSlow', 'hung', 'idle', 'lagging', 'partial', 'reject', 'retry', 'running', 'slow', 'waitVip' ] self.status_values = self.status_active + [ 'disabled', 'include', 'missing', 'stopped', 'unknown' ] self.bin_dir = os.path.dirname(os.path.realpath(__file__)) @@ -2505,7 +2516,7 @@ def status(self): line = "%-40s %-11s %7s %10s %19s %14s %38s " % ("Component/Config", "Processes", "Connection", "Lag", "", "Rates", "" ) if self.options.displayFull: - line += "%-40s %17s %33s %40s" % ("Counters (per housekeeping)", "", "Data Counters", "" ) + line += "%10s %-40s %17s %33s %40s" % ("", "Counters (per housekeeping)", "", "Data Counters", "" ) line += "%s %-21s " % (" ", "Memory" ) if self.options.displayFull: @@ -2518,8 +2529,8 @@ def status(self): underline = "%-40s %-5s %5s %5s %4s %4s %8s %7s %5s %5s %5s %10s %-10s %-10s %-10s " % ("", "-----", "---", "-----", "---", "----", "------", "------", "------", "----", "----", "------", "--------", "------", "------" ) if self.options.displayFull: - line += "%10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %8s" % \ - ( "subBytes", "Accepted", "Rejected", "Malformed", "pubBytes", "pubMsgs", "pubMal", "rxData", "rxFiles", "txData", "txFiles", "Since" ) + line += "%10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %8s" % \ + ( "Msg/scpu", "subBytes", "Accepted", "Rejected", "Malformed", "pubBytes", "pubMsgs", "pubMal", "rxData", "rxFiles", "txData", "txFiles", "Since" ) underline += "%10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %8s %10s " % \ ( "-------", "--------", "--------", "---------", "-------", "------", "-----", "-----", "-------", "------", "-------", "-----" ) @@ -2588,7 +2599,8 @@ def status(self): ) if self.options.displayFull : - line += " %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %7.2fs" % ( \ + line += " %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %7.2fs" % ( \ + naturalSize(m['msgRateCpu']).replace("B","m").replace("mytes","msgs"), \ naturalSize(m['rxByteCount']), \ naturalSize(m['rxGoodCount']).replace("B","m").replace("myte","msg"), \ naturalSize(m["rejectCount"]).replace("B","m").replace("myte","msg"), \ From 6cefd7f18384ff488bb787d8ccff0aa70a6e9e7a Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 7 Jun 2024 12:20:59 -0400 Subject: [PATCH 4/5] document runStateThreshold_cpuSlow in reference bilingual --- docs/source/Reference/sr3_options.7.rst | 12 ++++++++++++ docs/source/fr/Reference/sr3_options.7.rst | 12 ++++++++++++ 2 files changed, 24 insertions(+) diff --git a/docs/source/Reference/sr3_options.7.rst b/docs/source/Reference/sr3_options.7.rst index 88df2c07c..a864ab2a2 100644 --- a/docs/source/Reference/sr3_options.7.rst +++ b/docs/source/Reference/sr3_options.7.rst @@ -1591,6 +1591,18 @@ a file before it is aged out of a the queue. Default is two days. If a file ha been transferred after two days of attempts, it is discarded. +runStateThreshold_cpuSlow (default: 0) +---------------------------------------------- + +The *runStateThreshold_cpuSlow* setting sets the minimum rate of transfer expected for flow +processing messages. If the messages processed per cpu second rate drops below this threshold, +then the flow will be identified as "cpuSlow." (shown as cpuS on the *sr3 status* display.) +This test will only apply if a flow is actually transferring messages. +The rate is only visible in *sr3 --full status* + +This may indicate that the routing is inordinately expensive or the transfers inordinately slow. +It defaults to inactive, but may be set to identify transient issues. + runStateThreshold_hung (default: 450) ------------------------------------------------ diff --git a/docs/source/fr/Reference/sr3_options.7.rst b/docs/source/fr/Reference/sr3_options.7.rst index 4ba782392..2bc563409 100644 --- a/docs/source/fr/Reference/sr3_options.7.rst +++ b/docs/source/fr/Reference/sr3_options.7.rst @@ -1571,6 +1571,18 @@ L’option **retry_ttl** (nouvelle tentative de durée de vie) indique combien d un fichier avant qu’il ne soit rejeté de la fil d’attente. Le défaut est de deux jours. Si un fichier n’a pas été transféré après deux jours de tentatives, il est jeté. +runStateThreshold_cpuSlow (par défaut : 0) +--------------------------------------------------- + +Le paramètre *runStateThreshold_cpuSlow* définit le taux de transfert minimum attendu pour le flux +de messages. Si le débit des messages traités par seconde CPU tombe en dessous de ce seuil, +alors le flux sera identifié comme « cpuSlow ». (affiché comme cpuS sur l'écran *sr3 status*.) +Ce test ne s'appliquera que si un flux transfère réellement des messages. +Le taux n'est visible que dans *sr3 --full status* + +Cela peut indiquer que l'acheminement est excessivement coûteux ou que les transferts sont excessivement lents. +Par défaut, il est inactif, mais peut être défini pour identifier des problèmes temporaires. + runStateThreshold_hung (défaut: 450s) -------------------------------------------------- From 01b8a465c6cb0179480bddf8e23dcb8519bb33b9 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Tue, 11 Jun 2024 11:19:35 -0400 Subject: [PATCH 5/5] more elaborate advice when cpuS encountered --- docs/source/Reference/sr3_options.7.rst | 9 +++++++++ docs/source/fr/Reference/sr3_options.7.rst | 8 ++++++++ 2 files changed, 17 insertions(+) diff --git a/docs/source/Reference/sr3_options.7.rst b/docs/source/Reference/sr3_options.7.rst index a864ab2a2..effc57daf 100644 --- a/docs/source/Reference/sr3_options.7.rst +++ b/docs/source/Reference/sr3_options.7.rst @@ -1601,6 +1601,15 @@ This test will only apply if a flow is actually transferring messages. The rate is only visible in *sr3 --full status* This may indicate that the routing is inordinately expensive or the transfers inordinately slow. +Examples that could contribute to this: + +* one hundred regular expressions must be evaluated per message received. Regex's, when cumulated, can get expensive. + +* a complex plugin that does heavy transformations on data in route. + +* repeating an operation for each message, when doing it once per batch would do. + + It defaults to inactive, but may be set to identify transient issues. runStateThreshold_hung (default: 450) diff --git a/docs/source/fr/Reference/sr3_options.7.rst b/docs/source/fr/Reference/sr3_options.7.rst index 2bc563409..35c5dbb97 100644 --- a/docs/source/fr/Reference/sr3_options.7.rst +++ b/docs/source/fr/Reference/sr3_options.7.rst @@ -1581,6 +1581,14 @@ Ce test ne s'appliquera que si un flux transfère réellement des messages. Le taux n'est visible que dans *sr3 --full status* Cela peut indiquer que l'acheminement est excessivement coûteux ou que les transferts sont excessivement lents. +Exemples qui pourraient y contribuer : + +* une centaine d'expressions régulières doivent être évaluées par message reçu. les expressions régulières, une fois cumulées, peuvent coûter cher. + +* un plugin complexe qui effectue de lourdes transformations sur les données en cours de route. + +* répéter une opération pour chaque message, alors qu'il suffirait de la faire une fois par lot. + Par défaut, il est inactif, mais peut être défini pour identifier des problèmes temporaires.