Merge pull request #1105 from MetPX/issue1104_msgRateCpu
Issue1104 msg rate cpu
petersilva authored Jun 13, 2024
2 parents f5b8f18 + 01b8a46 commit 5b66b38
Showing 5 changed files with 77 additions and 14 deletions.
21 changes: 21 additions & 0 deletions docs/source/Reference/sr3_options.7.rst
@@ -1591,6 +1591,27 @@ a file before it is aged out of a the queue. Default is two days. If a file ha
been transferred after two days of attempts, it is discarded.


runStateThreshold_cpuSlow <count> (default: 0)
----------------------------------------------

The *runStateThreshold_cpuSlow* setting sets the minimum rate, in messages processed per second
of CPU time, expected of a flow. If the rate drops below this threshold,
the flow is identified as "cpuSlow" (shown as cpuS on the *sr3 status* display).
The test applies only when a flow is actually transferring messages.
The rate itself is visible only in *sr3 --full status*.

This may indicate that the routing is inordinately expensive or the transfers inordinately slow.
Examples that could contribute to this:

* one hundred regular expressions must be evaluated for each message received. Regular expressions, in aggregate, can get expensive.

* a complex plugin that does heavy transformations on data in route.

* repeating an operation for each message, when doing it once per batch would do.


The test is inactive by default (threshold of 0), but may be set to help identify transient issues.
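
As a rough illustration of the test described in this section (a sketch only, not the sr3 implementation; the function name ``is_cpu_slow`` and its arguments are hypothetical), the comparison amounts to dividing the number of messages by the CPU seconds consumed and checking the result against the threshold:

```python
def is_cpu_slow(msg_count: int, cpu_seconds: float, threshold: float) -> bool:
    """Return True when messages per CPU second falls below the threshold.

    Hypothetical helper for illustration; names are not taken from sr3.
    """
    if threshold <= 0:
        # a threshold of 0 (the default) leaves the test inactive
        return False
    if msg_count <= 0 or cpu_seconds <= 0:
        # the test only applies when messages are actually being processed
        return False
    return (msg_count / cpu_seconds) < threshold


# 200 messages in 10 CPU seconds is 20 messages per CPU second
print(is_cpu_slow(200, 10.0, threshold=50))   # below the threshold
print(is_cpu_slow(200, 10.0, threshold=0))    # test inactive
```

With a threshold of 50, a flow handling 20 messages per CPU second would be flagged, while the same flow with the default threshold of 0 would not.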

runStateThreshold_hung <interval> (default: 450)
------------------------------------------------

20 changes: 20 additions & 0 deletions docs/source/fr/Reference/sr3_options.7.rst
@@ -1571,6 +1571,26 @@ L’option **retry_ttl** (nouvelle tentative de durée de vie) indique combien d
un fichier avant qu’il ne soit rejeté de la fil d’attente. Le défaut est de deux jours. Si un fichier n’a pas
été transféré après deux jours de tentatives, il est jeté.

runStateThreshold_cpuSlow <count> (par défaut : 0)
---------------------------------------------------

Le paramètre *runStateThreshold_cpuSlow* définit le taux de transfert minimum attendu pour le flux
de messages. Si le débit des messages traités par seconde CPU tombe en dessous de ce seuil,
alors le flux sera identifié comme « cpuSlow ». (affiché comme cpuS sur l'écran *sr3 status*.)
Ce test ne s'appliquera que si un flux transfère réellement des messages.
Le taux n'est visible que dans *sr3 --full status*

Cela peut indiquer que l'acheminement est excessivement coûteux ou que les transferts sont excessivement lents.
Exemples qui pourraient y contribuer :

* une centaine d'expressions régulières doivent être évaluées par message reçu. les expressions régulières, une fois cumulées, peuvent coûter cher.

* un plugin complexe qui effectue de lourdes transformations sur les données en cours de route.

* répéter une opération pour chaque message, alors qu'il suffirait de la faire une fois par lot.

Par défaut, il est inactif, mais peut être défini pour identifier des problèmes temporaires.


runStateThreshold_hung <intervalle> (défaut: 450s)
--------------------------------------------------
9 changes: 5 additions & 4 deletions sarracenia/config.py
@@ -92,7 +92,6 @@ def __repr__(self) -> str:
'dry_run': False,
'filename': None,
'flowMain': None,
'runStateThreshold_idle': 900,
'inflight': None,
'inline': False,
'inlineOnly': False,
@@ -101,7 +100,6 @@ def __repr__(self) -> str:
'logMetrics': False,
'logStdout': False,
'metrics_writeInterval': 5,
'runStateThreshold_lag': 30,
'nodupe_driver': 'disk',
'nodupe_ttl': 0,
'overwrite': True,
@@ -119,8 +117,11 @@ def __repr__(self) -> str:
'report': False,
'retryEmptyBeforeExit': False,
'retry_refilter': False,
'runStateThreshold_retry': 1000,
'runStateThreshold_cpuSlow': 0,
'runStateThreshold_hung': 450,
'runStateThreshold_idle': 900,
'runStateThreshold_lag': 30,
'runStateThreshold_retry': 1000,
'runStateThreshold_slow': 0,
'sourceFromExchange': False,
'sourceFromMessage': False,
@@ -136,7 +137,7 @@ def __repr__(self) -> str:
count_options = [
'batch', 'count', 'exchangeSplit', 'instances', 'logRotateCount', 'no',
'post_exchangeSplit', 'prefetch', 'messageCountMax', 'messageRateMax',
'messageRateMin', 'runStateThreshold_reject', 'runStateThreshold_retry', 'runStateThreshold_slow'
'messageRateMin', 'runStateThreshold_cpuSlow', 'runStateThreshold_reject', 'runStateThreshold_retry', 'runStateThreshold_slow',
]


11 changes: 10 additions & 1 deletion sarracenia/flow/__init__.py
@@ -218,7 +218,8 @@ def metricsFlowReset(self) -> None:

self.new_metrics = { 'flow': { 'stop_requested': False, 'last_housekeeping': 0,
'transferConnected': False, 'transferConnectStart': 0, 'transferConnectTime':0,
'transferRxBytes': 0, 'transferTxBytes': 0, 'transferRxFiles': 0, 'transferTxFiles': 0 } }
'transferRxBytes': 0, 'transferTxBytes': 0, 'transferRxFiles': 0, 'transferTxFiles': 0,
'last_housekeeping_cpuTime': 0, 'cpuTime' : 0, } }

# carry over some metrics... that don't reset.
if hasattr(self,'metrics'):
@@ -364,6 +365,8 @@ def _runCallbackMetrics(self):
except Exception as ex:
logger.error( f'flowCallback plugin {p}/metricsReport crashed: {ex}' )
logger.debug( "details:", exc_info=True )
ost = os.times()
self.metrics['flow']['cpuTime'] = ost.user+ost.system-self.metrics['flow']['last_housekeeping_cpuTime']

def _runCallbackPoll(self):
if hasattr(self, "Poll"):
@@ -412,6 +415,9 @@ def _runHousekeeping(self, now) -> float:
self.runCallbacksTime('on_housekeeping')
self.metricsFlowReset()
self.metrics['flow']['last_housekeeping'] = now
ost = os.times()
self.metrics['flow']['last_housekeeping_cpuTime'] = ost.user+ost.system
self.metrics['flow']['cpuTime'] = ost.user+ost.system

next_housekeeping = now + self.o.housekeeping
self.metrics['flow']['next_housekeeping'] = next_housekeeping
@@ -529,6 +535,8 @@ def run(self):
current_sleep = self.o.sleep
last_time = start_time
self.metrics['flow']['last_housekeeping'] = start_time
ost=os.times()
self.metrics['flow']['last_housekeeping_cpuTime'] = ost.user+ost.system

if self.o.logLevel == 'debug':
logger.debug("options:")
@@ -600,6 +608,7 @@ def run(self):
elapsed = now - last_time

self.metrics['flow']['msgRate'] = current_rate
# guard against a zero CPU-time denominator
cpu_total = self.metrics['flow']['cpuTime']+self.metrics['flow']['last_housekeeping_cpuTime']
self.metrics['flow']['msgRateCpu'] = total_messages / cpu_total if cpu_total > 0 else 0

if (last_gather_len == 0) and (self.o.sleep < 0):
if (self.o.retryEmptyBeforeExit and "retry" in self.metrics
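
The hunks above sample `os.times()` and combine user and system time to obtain the CPU seconds used by the process. A minimal standalone sketch of that measurement follows; the helper names `cpu_seconds` and `msgs_per_cpu_second` are hypothetical and not part of sarracenia, and the zero guard is an assumption about how one might want a rate over a very short interval to behave:

```python
import os


def cpu_seconds() -> float:
    """CPU time (user + system) consumed so far by this process."""
    t = os.times()
    return t.user + t.system


def msgs_per_cpu_second(messages: int, cpu_start: float, cpu_now: float) -> float:
    """Messages handled per CPU second between two samples (0.0 if no CPU time elapsed)."""
    used = cpu_now - cpu_start
    return messages / used if used > 0 else 0.0


start = cpu_seconds()
# stand-in workload: count the "messages" that pass a trivial filter
processed = sum(1 for i in range(200_000) if i % 3 == 0)
rate = msgs_per_cpu_second(processed, start, cpu_seconds())
print(f"{processed} items, {rate:.0f} items per CPU second")
```

Because CPU time rather than wall-clock time is the denominator, a flow that spends most of its time sleeping or waiting on the network is not penalized by this measure.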
30 changes: 21 additions & 9 deletions sarracenia/sr.py
@@ -55,14 +55,14 @@

logger = logging.getLogger(__name__)

empty_metrics={ "byteRate":0, "rejectCount":0, "last_housekeeping":0, "messagesQueued": 0,
empty_metrics={ "byteRate":0, "cpuTime":0, "rejectCount":0, "last_housekeeping":0, "messagesQueued": 0,
"lagMean": 0, "latestTransfer": 0, "rejectPercent":0, "transferRxByteRate":0, "transferTxByteRate": 0,
"rxByteCount":0, "rxGoodCount":0, "rxBadCount":0, "txByteCount":0, "txGoodCount":0, "txBadCount":0,
"lagMax":0, "lagTotal":0, "lagMessageCount":0, "disconnectTime":0, "transferConnectTime":0,
"transferRxLast": 0, "transferTxLast": 0, "rxLast":0, "txLast":0,
"transferRxBytes":0, "transferRxFiles":0, "transferTxBytes": 0, "transferTxFiles": 0,
"msgs_in_post_retry": 0, "msgs_in_download_retry":0, "brokerQueuedMessageCount": 0,
'time_base': 0, 'byteTotal': 0, 'byteRate': 0, 'msgRate': 0, 'retry': 0, 'transferLast': 0,
'time_base': 0, 'byteTotal': 0, 'byteRate': 0, 'msgRate': 0, 'msgRateCpu': 0, 'retry': 0, 'transferLast': 0,
'connectPercent': 0, 'byteConnectPercent': 0
}

@@ -894,7 +894,7 @@ def _resolve(self):
'rxLagTime':0, 'rxLagCount':0,
'rxMessageQueued':0, 'rxMessageRetry':0,
'txMessageQueued':0, 'txMessageRetry':0,
'rxMessageRate':0, 'rxDataRate':0, 'rxFileRate':0, 'rxMessageByteRate':0,
'rxMessageRate':0, 'rxMessageRateCpu':0, 'rxDataRate':0, 'rxFileRate':0, 'rxMessageByteRate':0,
'txMessageRate':0, 'txDataRate':0, 'txFileRate':0, 'txMessageByteRate':0
}
for c in self.components:
@@ -953,6 +953,8 @@ def _resolve(self):
metrics['txLast'] = newval
if 'messageLast' not in metrics or (newval > metrics['messageLast']):
metrics['messageLast'] = newval
elif k in [ "cpuTime" ]:
metrics['cpuTime'] += newval
else:
metrics[k] += newval
#else:
@@ -999,9 +1001,14 @@ def _resolve(self):

m['byteRate'] = byteTotal/time_base
m['msgRate'] = (m["rxGoodCount"]+m["rxBadCount"])/time_base
if m['cpuTime'] > 0:
m['msgRateCpu'] = (m["rxGoodCount"]+m["rxBadCount"])/m['cpuTime']
else:
m['msgRateCpu'] = 0

self.cumulative_stats['rxMessageByteRate'] += m['byteRate']
self.cumulative_stats['rxMessageRate'] += m['msgRate']
self.cumulative_stats['rxMessageRateCpu'] += m['msgRateCpu']

m['transferRxByteRate'] = m['transferRxBytes']/time_base
m['transferRxFileRate'] = m['transferRxFiles']/time_base
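
The two `_resolve` hunks above sum `cpuTime` over the instances of a configuration and then divide the received message count by that total, falling back to 0 when no CPU time was recorded. A self-contained sketch of that aggregation, under the assumption that each instance reports a plain dict of counters (the function name `aggregate_msg_rate_cpu` is hypothetical):

```python
def aggregate_msg_rate_cpu(instances):
    """Sum per-instance counters, then derive messages per CPU second."""
    total = {"rxGoodCount": 0, "rxBadCount": 0, "cpuTime": 0.0}
    for m in instances:
        for key in total:
            # counters missing from an instance are treated as zero
            total[key] += m.get(key, 0)

    msgs = total["rxGoodCount"] + total["rxBadCount"]
    # avoid dividing by zero when no CPU time has been reported yet
    total["msgRateCpu"] = msgs / total["cpuTime"] if total["cpuTime"] > 0 else 0
    return total


instances = [
    {"rxGoodCount": 90, "rxBadCount": 10, "cpuTime": 2.0},
    {"rxGoodCount": 50, "rxBadCount": 0, "cpuTime": 1.0},
]
print(aggregate_msg_rate_cpu(instances)["msgRateCpu"])  # 150 messages / 3.0 CPU seconds
```

Dividing the summed counts by the summed CPU time gives a rate weighted by how much work each instance did, which is not the same as averaging the per-instance rates.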
@@ -1110,6 +1117,10 @@ def _resolve(self):
else:
flow_status = 'running'

if self.states[c][cfg]['metrics']['msgRate'] > 0 and \
self.states[c][cfg]['metrics']['msgRateCpu'] < self.configs[c][cfg]['options'].runStateThreshold_cpuSlow:
flow_status = 'cpuSlow'

self.states[c][cfg]['resource_usage'] = copy.deepcopy(resource_usage)
self.configs[c][cfg]['status'] = flow_status

@@ -1285,7 +1296,7 @@ def __init__(self, opt, config_fnmatches=None):
'sender', 'shovel', 'subscribe', 'watch', 'winnow'
]
# active means >= 1 process exists on the node.
self.status_active = ['hung', 'idle', 'lagging', 'partial', 'reject', 'retry', 'running', 'slow', 'waitVip' ]
self.status_active = ['cpuSlow', 'hung', 'idle', 'lagging', 'partial', 'reject', 'retry', 'running', 'slow', 'waitVip' ]
self.status_values = self.status_active + [ 'disabled', 'include', 'missing', 'stopped', 'unknown' ]

self.bin_dir = os.path.dirname(os.path.realpath(__file__))
@@ -2505,7 +2516,7 @@ def status(self):
line = "%-40s %-11s %7s %10s %19s %14s %38s " % ("Component/Config", "Processes", "Connection", "Lag", "", "Rates", "" )

if self.options.displayFull:
line += "%-40s %17s %33s %40s" % ("Counters (per housekeeping)", "", "Data Counters", "" )
line += "%10s %-40s %17s %33s %40s" % ("", "Counters (per housekeeping)", "", "Data Counters", "" )
line += "%s %-21s " % (" ", "Memory" )

if self.options.displayFull:
@@ -2518,8 +2529,8 @@ def status(self):
underline = "%-40s %-5s %5s %5s %4s %4s %8s %7s %5s %5s %5s %10s %-10s %-10s %-10s " % ("", "-----", "---", "-----", "---", "----", "------", "------", "------", "----", "----", "------", "--------", "------", "------" )

if self.options.displayFull:
line += "%10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %8s" % \
( "subBytes", "Accepted", "Rejected", "Malformed", "pubBytes", "pubMsgs", "pubMal", "rxData", "rxFiles", "txData", "txFiles", "Since" )
line += "%10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %8s" % \
( "Msg/scpu", "subBytes", "Accepted", "Rejected", "Malformed", "pubBytes", "pubMsgs", "pubMal", "rxData", "rxFiles", "txData", "txFiles", "Since" )
underline += "%10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %8s %10s " % \
( "-------", "--------", "--------", "---------", "-------", "------", "-----", "-----", "-------", "------", "-------", "-----" )

@@ -2588,7 +2599,8 @@ def status(self):
)

if self.options.displayFull :
line += " %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %7.2fs" % ( \
line += " %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %7.2fs" % ( \
naturalSize(m['msgRateCpu']).replace("B","m").replace("myte","msg"), \
naturalSize(m['rxByteCount']), \
naturalSize(m['rxGoodCount']).replace("B","m").replace("myte","msg"), \
naturalSize(m["rejectCount"]).replace("B","m").replace("myte","msg"), \
@@ -2600,7 +2612,7 @@ def status(self):
naturalSize(m["transferRxFiles"]).replace("B","F").replace("Fyte","File"), \
naturalSize(m["transferTxBytes"]), \
naturalSize(m["transferTxFiles"]).replace("B","F").replace("Fyte","File"), \
time_base )
m["time_base"] )
else:
line += " %10s %10s %9s %5s %5s %10s %8s" % ( "-", "-", "-", "-", "-", "-", "-" )
if self.options.displayFull: