diff --git a/docs/source/Reference/sr3_options.7.rst b/docs/source/Reference/sr3_options.7.rst index 88df2c07c..effc57daf 100644 --- a/docs/source/Reference/sr3_options.7.rst +++ b/docs/source/Reference/sr3_options.7.rst @@ -1591,6 +1591,27 @@ a file before it is aged out of a the queue. Default is two days. If a file ha been transferred after two days of attempts, it is discarded. +runStateThreshold_cpuSlow (default: 0) +---------------------------------------------- + +The *runStateThreshold_cpuSlow* setting sets the minimum rate of transfer expected for flow +processing messages. If the messages processed per cpu second rate drops below this threshold, +then the flow will be identified as "cpuSlow." (shown as cpuS on the *sr3 status* display.) +This test will only apply if a flow is actually transferring messages. +The rate is only visible in *sr3 --full status* + +This may indicate that the routing is inordinately expensive or the transfers inordinately slow. +Examples that could contribute to this: + +* one hundred regular expressions must be evaluated per message received. Regex's, when cumulated, can get expensive. + +* a complex plugin that does heavy transformations on data in route. + +* repeating an operation for each message, when doing it once per batch would do. + + +It defaults to inactive, but may be set to identify transient issues. + runStateThreshold_hung (default: 450) ------------------------------------------------ diff --git a/docs/source/fr/Reference/sr3_options.7.rst b/docs/source/fr/Reference/sr3_options.7.rst index 4ba782392..35c5dbb97 100644 --- a/docs/source/fr/Reference/sr3_options.7.rst +++ b/docs/source/fr/Reference/sr3_options.7.rst @@ -1571,6 +1571,26 @@ L’option **retry_ttl** (nouvelle tentative de durée de vie) indique combien d un fichier avant qu’il ne soit rejeté de la fil d’attente. Le défaut est de deux jours. Si un fichier n’a pas été transféré après deux jours de tentatives, il est jeté. +runStateThreshold_cpuSlow (par défaut : 0) +--------------------------------------------------- + +Le paramètre *runStateThreshold_cpuSlow* définit le taux de transfert minimum attendu pour le flux +de messages. Si le débit des messages traités par seconde CPU tombe en dessous de ce seuil, +alors le flux sera identifié comme « cpuSlow ». (affiché comme cpuS sur l'écran *sr3 status*.) +Ce test ne s'appliquera que si un flux transfère réellement des messages. +Le taux n'est visible que dans *sr3 --full status* + +Cela peut indiquer que l'acheminement est excessivement coûteux ou que les transferts sont excessivement lents. +Exemples qui pourraient y contribuer : + +* une centaine d'expressions régulières doivent être évaluées par message reçu. les expressions régulières, une fois cumulées, peuvent coûter cher. + +* un plugin complexe qui effectue de lourdes transformations sur les données en cours de route. + +* répéter une opération pour chaque message, alors qu'il suffirait de la faire une fois par lot. + +Par défaut, il est inactif, mais peut être défini pour identifier des problèmes temporaires. + runStateThreshold_hung (défaut: 450s) -------------------------------------------------- diff --git a/sarracenia/config.py b/sarracenia/config.py index 6d3c636ee..f69a0b474 100755 --- a/sarracenia/config.py +++ b/sarracenia/config.py @@ -92,7 +92,6 @@ def __repr__(self) -> str: 'dry_run': False, 'filename': None, 'flowMain': None, - 'runStateThreshold_idle': 900, 'inflight': None, 'inline': False, 'inlineOnly': False, @@ -101,7 +100,6 @@ def __repr__(self) -> str: 'logMetrics': False, 'logStdout': False, 'metrics_writeInterval': 5, - 'runStateThreshold_lag': 30, 'nodupe_driver': 'disk', 'nodupe_ttl': 0, 'overwrite': True, @@ -119,8 +117,11 @@ def __repr__(self) -> str: 'report': False, 'retryEmptyBeforeExit': False, 'retry_refilter': False, - 'runStateThreshold_retry': 1000, + 'runStateThreshold_cpuSlow': 0, 'runStateThreshold_hung': 450, + 'runStateThreshold_idle': 900, + 'runStateThreshold_lag': 30, + 'runStateThreshold_retry': 1000, 'runStateThreshold_slow': 0, 'sourceFromExchange': False, 'sourceFromMessage': False, @@ -136,7 +137,7 @@ def __repr__(self) -> str: count_options = [ 'batch', 'count', 'exchangeSplit', 'instances', 'logRotateCount', 'no', 'post_exchangeSplit', 'prefetch', 'messageCountMax', 'messageRateMax', - 'messageRateMin', 'runStateThreshold_reject', 'runStateThreshold_retry', 'runStateThreshold_slow' + 'messageRateMin', 'runStateThreshold_cpuSlow', 'runStateThreshold_reject', 'runStateThreshold_retry', 'runStateThreshold_slow', ] diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index f1deeb6c5..0f1c8a580 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -218,7 +218,8 @@ def metricsFlowReset(self) -> None: self.new_metrics = { 'flow': { 'stop_requested': False, 'last_housekeeping': 0, 'transferConnected': False, 'transferConnectStart': 0, 'transferConnectTime':0, - 'transferRxBytes': 0, 'transferTxBytes': 0, 'transferRxFiles': 0, 'transferTxFiles': 0 } } + 'transferRxBytes': 0, 'transferTxBytes': 0, 'transferRxFiles': 0, 'transferTxFiles': 0, + 'last_housekeeping_cpuTime': 0, 'cpuTime' : 0, } } # carry over some metrics... that don't reset. if hasattr(self,'metrics'): @@ -364,6 +365,8 @@ def _runCallbackMetrics(self): except Exception as ex: logger.error( f'flowCallback plugin {p}/metricsReport crashed: {ex}' ) logger.debug( "details:", exc_info=True ) + ost = os.times() + self.metrics['flow']['cpuTime'] = ost.user+ost.system-self.metrics['flow']['last_housekeeping_cpuTime'] def _runCallbackPoll(self): if hasattr(self, "Poll"): @@ -412,6 +415,9 @@ def _runHousekeeping(self, now) -> float: self.runCallbacksTime('on_housekeeping') self.metricsFlowReset() self.metrics['flow']['last_housekeeping'] = now + ost = os.times() + self.metrics['flow']['last_housekeeping_cpuTime'] = ost.user+ost.system + self.metrics['flow']['cpuTime'] = ost.user+ost.system next_housekeeping = now + self.o.housekeeping self.metrics['flow']['next_housekeeping'] = next_housekeeping @@ -529,6 +535,8 @@ def run(self): current_sleep = self.o.sleep last_time = start_time self.metrics['flow']['last_housekeeping'] = start_time + ost=os.times() + self.metrics['flow']['last_housekeeping_cpuTime'] = ost.user+ost.system if self.o.logLevel == 'debug': logger.debug("options:") @@ -600,6 +608,7 @@ def run(self): elapsed = now - last_time self.metrics['flow']['msgRate'] = current_rate + self.metrics['flow']['msgRateCpu'] = total_messages / (self.metrics['flow']['cpuTime']+self.metrics['flow']['last_housekeeping_cpuTime'] ) if (last_gather_len == 0) and (self.o.sleep < 0): if (self.o.retryEmptyBeforeExit and "retry" in self.metrics diff --git a/sarracenia/sr.py b/sarracenia/sr.py index 302fc0252..838b85394 100755 --- a/sarracenia/sr.py +++ b/sarracenia/sr.py @@ -55,14 +55,14 @@ logger = logging.getLogger(__name__) -empty_metrics={ "byteRate":0, "rejectCount":0, "last_housekeeping":0, "messagesQueued": 0, +empty_metrics={ "byteRate":0, "cpuTime":0, "rejectCount":0, "last_housekeeping":0, "messagesQueued": 0, "lagMean": 0, "latestTransfer": 0, "rejectPercent":0, "transferRxByteRate":0, "transferTxByteRate": 0, "rxByteCount":0, "rxGoodCount":0, "rxBadCount":0, "txByteCount":0, "txGoodCount":0, "txBadCount":0, "lagMax":0, "lagTotal":0, "lagMessageCount":0, "disconnectTime":0, "transferConnectTime":0, "transferRxLast": 0, "transferTxLast": 0, "rxLast":0, "txLast":0, "transferRxBytes":0, "transferRxFiles":0, "transferTxBytes": 0, "transferTxFiles": 0, "msgs_in_post_retry": 0, "msgs_in_download_retry":0, "brokerQueuedMessageCount": 0, - 'time_base': 0, 'byteTotal': 0, 'byteRate': 0, 'msgRate': 0, 'retry': 0, 'transferLast': 0, + 'time_base': 0, 'byteTotal': 0, 'byteRate': 0, 'msgRate': 0, 'msgRateCpu': 0, 'retry': 0, 'transferLast': 0, 'connectPercent': 0, 'byteConnectPercent': 0 } @@ -894,7 +894,7 @@ def _resolve(self): 'rxLagTime':0, 'rxLagCount':0, 'rxMessageQueued':0, 'rxMessageRetry':0, 'txMessageQueued':0, 'txMessageRetry':0, - 'rxMessageRate':0, 'rxDataRate':0, 'rxFileRate':0, 'rxMessageByteRate':0, + 'rxMessageRate':0, 'rxMessageRateCpu':0, 'rxDataRate':0, 'rxFileRate':0, 'rxMessageByteRate':0, 'txMessageRate':0, 'txDataRate':0, 'txFileRate':0, 'txMessageByteRate':0 } for c in self.components: @@ -953,6 +953,8 @@ def _resolve(self): metrics['txLast'] = newval if 'messageLast' not in metrics or (newval > metrics['messageLast']): metrics['messageLast'] = newval + elif k in [ "cpuTime" ]: + metrics['cpuTime'] += newval else: metrics[k] += newval #else: @@ -999,9 +1001,14 @@ def _resolve(self): m['byteRate'] = byteTotal/time_base m['msgRate'] = (m["rxGoodCount"]+m["rxBadCount"])/time_base + if m['cpuTime'] > 0: + m['msgRateCpu'] = (m["rxGoodCount"]+m["rxBadCount"])/m['cpuTime'] + else: + m['msgRateCpu'] = 0 self.cumulative_stats['rxMessageByteRate'] += m['byteRate'] self.cumulative_stats['rxMessageRate'] += m['msgRate'] + self.cumulative_stats['rxMessageRateCpu'] += m['msgRateCpu'] m['transferRxByteRate'] = m['transferRxBytes']/time_base m['transferRxFileRate'] = m['transferRxFiles']/time_base @@ -1110,6 +1117,10 @@ def _resolve(self): else: flow_status = 'running' + if self.states[c][cfg]['metrics']['msgRate'] > 0 and \ + self.states[c][cfg]['metrics']['msgRateCpu'] < self.configs[c][cfg]['options'].runStateThreshold_cpuSlow: + flow_status = 'cpuSlow' + self.states[c][cfg]['resource_usage'] = copy.deepcopy(resource_usage) self.configs[c][cfg]['status'] = flow_status @@ -1285,7 +1296,7 @@ def __init__(self, opt, config_fnmatches=None): 'sender', 'shovel', 'subscribe', 'watch', 'winnow' ] # active means >= 1 process exists on the node. - self.status_active = ['hung', 'idle', 'lagging', 'partial', 'reject', 'retry', 'running', 'slow', 'waitVip' ] + self.status_active = ['cpuSlow', 'hung', 'idle', 'lagging', 'partial', 'reject', 'retry', 'running', 'slow', 'waitVip' ] self.status_values = self.status_active + [ 'disabled', 'include', 'missing', 'stopped', 'unknown' ] self.bin_dir = os.path.dirname(os.path.realpath(__file__)) @@ -2505,7 +2516,7 @@ def status(self): line = "%-40s %-11s %7s %10s %19s %14s %38s " % ("Component/Config", "Processes", "Connection", "Lag", "", "Rates", "" ) if self.options.displayFull: - line += "%-40s %17s %33s %40s" % ("Counters (per housekeeping)", "", "Data Counters", "" ) + line += "%10s %-40s %17s %33s %40s" % ("", "Counters (per housekeeping)", "", "Data Counters", "" ) line += "%s %-21s " % (" ", "Memory" ) if self.options.displayFull: @@ -2518,8 +2529,8 @@ def status(self): underline = "%-40s %-5s %5s %5s %4s %4s %8s %7s %5s %5s %5s %10s %-10s %-10s %-10s " % ("", "-----", "---", "-----", "---", "----", "------", "------", "------", "----", "----", "------", "--------", "------", "------" ) if self.options.displayFull: - line += "%10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %8s" % \ - ( "subBytes", "Accepted", "Rejected", "Malformed", "pubBytes", "pubMsgs", "pubMal", "rxData", "rxFiles", "txData", "txFiles", "Since" ) + line += "%10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %8s" % \ + ( "Msg/scpu", "subBytes", "Accepted", "Rejected", "Malformed", "pubBytes", "pubMsgs", "pubMal", "rxData", "rxFiles", "txData", "txFiles", "Since" ) underline += "%10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %8s %10s " % \ ( "-------", "--------", "--------", "---------", "-------", "------", "-----", "-----", "-------", "------", "-------", "-----" ) @@ -2588,7 +2599,8 @@ def status(self): ) if self.options.displayFull : - line += " %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %7.2fs" % ( \ + line += " %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %7.2fs" % ( \ + naturalSize(m['msgRateCpu']).replace("B","m").replace("mytes","msgs"), \ naturalSize(m['rxByteCount']), \ naturalSize(m['rxGoodCount']).replace("B","m").replace("myte","msg"), \ naturalSize(m["rejectCount"]).replace("B","m").replace("myte","msg"), \ @@ -2600,7 +2612,7 @@ def status(self): naturalSize(m["transferRxFiles"]).replace("B","F").replace("Fyte","File"), \ naturalSize(m["transferTxBytes"]), \ naturalSize(m["transferTxFiles"]).replace("B","F").replace("Fyte","File"), \ - time_base ) + m["time_base"] ) else: line += " %10s %10s %9s %5s %5s %10s %8s" % ( "-", "-", "-", "-", "-", "-", "-" ) if self.options.displayFull: