Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue1104 msg rate cpu #1105

Merged
merged 5 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/source/Reference/sr3_options.7.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1591,6 +1591,27 @@ a file before it is aged out of a the queue. Default is two days. If a file ha
been transferred after two days of attempts, it is discarded.


runStateThreshold_cpuSlow <count> (default: 0)
----------------------------------------------

The *runStateThreshold_cpuSlow* setting sets the minimum rate of transfer expected for flow
processing messages. If the messages processed per cpu second rate drops below this threshold,
then the flow will be identified as "cpuSlow." (shown as cpuS on the *sr3 status* display.)
This test will only apply if a flow is actually transferring messages.
The rate is only visible in *sr3 --full status*

This may indicate that the routing is inordinately expensive or the transfers inordinately slow.
Examples that could contribute to this:

* one hundred regular expressions must be evaluated per message received. Regex's, when cumulated, can get expensive.

* a complex plugin that does heavy transformations on data in route.

* repeating an operation for each message, when doing it once per batch would do.


It defaults to inactive, but may be set to identify transient issues.

runStateThreshold_hung <interval> (default: 450)
------------------------------------------------

Expand Down
20 changes: 20 additions & 0 deletions docs/source/fr/Reference/sr3_options.7.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1571,6 +1571,26 @@ L’option **retry_ttl** (nouvelle tentative de durée de vie) indique combien d
un fichier avant qu’il ne soit rejeté de la fil d’attente. Le défaut est de deux jours. Si un fichier n’a pas
été transféré après deux jours de tentatives, il est jeté.

runStateThreshold_cpuSlow <count> (par défaut : 0)
---------------------------------------------------

Le paramètre *runStateThreshold_cpuSlow* définit le taux de transfert minimum attendu pour le flux
de messages. Si le débit des messages traités par seconde CPU tombe en dessous de ce seuil,
alors le flux sera identifié comme « cpuSlow ». (affiché comme cpuS sur l'écran *sr3 status*.)
Ce test ne s'appliquera que si un flux transfère réellement des messages.
Le taux n'est visible que dans *sr3 --full status*

Cela peut indiquer que l'acheminement est excessivement coûteux ou que les transferts sont excessivement lents.
Exemples qui pourraient y contribuer :

* une centaine d'expressions régulières doivent être évaluées par message reçu. les expressions régulières, une fois cumulées, peuvent coûter cher.

* un plugin complexe qui effectue de lourdes transformations sur les données en cours de route.

* répéter une opération pour chaque message, alors qu'il suffirait de la faire une fois par lot.

Par défaut, il est inactif, mais peut être défini pour identifier des problèmes temporaires.


runStateThreshold_hung <intervalle> (défaut: 450s)
--------------------------------------------------
Expand Down
9 changes: 5 additions & 4 deletions sarracenia/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ def __repr__(self) -> str:
'dry_run': False,
'filename': None,
'flowMain': None,
'runStateThreshold_idle': 900,
'inflight': None,
'inline': False,
'inlineOnly': False,
Expand All @@ -101,7 +100,6 @@ def __repr__(self) -> str:
'logMetrics': False,
'logStdout': False,
'metrics_writeInterval': 5,
'runStateThreshold_lag': 30,
'nodupe_driver': 'disk',
'nodupe_ttl': 0,
'overwrite': True,
Expand All @@ -119,8 +117,11 @@ def __repr__(self) -> str:
'report': False,
'retryEmptyBeforeExit': False,
'retry_refilter': False,
'runStateThreshold_retry': 1000,
'runStateThreshold_cpuSlow': 0,
'runStateThreshold_hung': 450,
'runStateThreshold_idle': 900,
'runStateThreshold_lag': 30,
'runStateThreshold_retry': 1000,
'runStateThreshold_slow': 0,
'sourceFromExchange': False,
'sourceFromMessage': False,
Expand All @@ -136,7 +137,7 @@ def __repr__(self) -> str:
count_options = [
'batch', 'count', 'exchangeSplit', 'instances', 'logRotateCount', 'no',
'post_exchangeSplit', 'prefetch', 'messageCountMax', 'messageRateMax',
'messageRateMin', 'runStateThreshold_reject', 'runStateThreshold_retry', 'runStateThreshold_slow'
'messageRateMin', 'runStateThreshold_cpuSlow', 'runStateThreshold_reject', 'runStateThreshold_retry', 'runStateThreshold_slow',
]


Expand Down
11 changes: 10 additions & 1 deletion sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ def metricsFlowReset(self) -> None:

self.new_metrics = { 'flow': { 'stop_requested': False, 'last_housekeeping': 0,
'transferConnected': False, 'transferConnectStart': 0, 'transferConnectTime':0,
'transferRxBytes': 0, 'transferTxBytes': 0, 'transferRxFiles': 0, 'transferTxFiles': 0 } }
'transferRxBytes': 0, 'transferTxBytes': 0, 'transferRxFiles': 0, 'transferTxFiles': 0,
'last_housekeeping_cpuTime': 0, 'cpuTime' : 0, } }

# carry over some metrics... that don't reset.
if hasattr(self,'metrics'):
Expand Down Expand Up @@ -364,6 +365,8 @@ def _runCallbackMetrics(self):
except Exception as ex:
logger.error( f'flowCallback plugin {p}/metricsReport crashed: {ex}' )
logger.debug( "details:", exc_info=True )
ost = os.times()
self.metrics['flow']['cpuTime'] = ost.user+ost.system-self.metrics['flow']['last_housekeeping_cpuTime']

def _runCallbackPoll(self):
if hasattr(self, "Poll"):
Expand Down Expand Up @@ -412,6 +415,9 @@ def _runHousekeeping(self, now) -> float:
self.runCallbacksTime('on_housekeeping')
self.metricsFlowReset()
self.metrics['flow']['last_housekeeping'] = now
ost = os.times()
self.metrics['flow']['last_housekeeping_cpuTime'] = ost.user+ost.system
self.metrics['flow']['cpuTime'] = ost.user+ost.system

next_housekeeping = now + self.o.housekeeping
self.metrics['flow']['next_housekeeping'] = next_housekeeping
Expand Down Expand Up @@ -529,6 +535,8 @@ def run(self):
current_sleep = self.o.sleep
last_time = start_time
self.metrics['flow']['last_housekeeping'] = start_time
ost=os.times()
self.metrics['flow']['last_housekeeping_cpuTime'] = ost.user+ost.system

if self.o.logLevel == 'debug':
logger.debug("options:")
Expand Down Expand Up @@ -600,6 +608,7 @@ def run(self):
elapsed = now - last_time

self.metrics['flow']['msgRate'] = current_rate
self.metrics['flow']['msgRateCpu'] = total_messages / (self.metrics['flow']['cpuTime']+self.metrics['flow']['last_housekeeping_cpuTime'] )

if (last_gather_len == 0) and (self.o.sleep < 0):
if (self.o.retryEmptyBeforeExit and "retry" in self.metrics
Expand Down
30 changes: 21 additions & 9 deletions sarracenia/sr.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@

logger = logging.getLogger(__name__)

empty_metrics={ "byteRate":0, "rejectCount":0, "last_housekeeping":0, "messagesQueued": 0,
empty_metrics={ "byteRate":0, "cpuTime":0, "rejectCount":0, "last_housekeeping":0, "messagesQueued": 0,
"lagMean": 0, "latestTransfer": 0, "rejectPercent":0, "transferRxByteRate":0, "transferTxByteRate": 0,
"rxByteCount":0, "rxGoodCount":0, "rxBadCount":0, "txByteCount":0, "txGoodCount":0, "txBadCount":0,
"lagMax":0, "lagTotal":0, "lagMessageCount":0, "disconnectTime":0, "transferConnectTime":0,
"transferRxLast": 0, "transferTxLast": 0, "rxLast":0, "txLast":0,
"transferRxBytes":0, "transferRxFiles":0, "transferTxBytes": 0, "transferTxFiles": 0,
"msgs_in_post_retry": 0, "msgs_in_download_retry":0, "brokerQueuedMessageCount": 0,
'time_base': 0, 'byteTotal': 0, 'byteRate': 0, 'msgRate': 0, 'retry': 0, 'transferLast': 0,
'time_base': 0, 'byteTotal': 0, 'byteRate': 0, 'msgRate': 0, 'msgRateCpu': 0, 'retry': 0, 'transferLast': 0,
'connectPercent': 0, 'byteConnectPercent': 0
}

Expand Down Expand Up @@ -894,7 +894,7 @@ def _resolve(self):
'rxLagTime':0, 'rxLagCount':0,
'rxMessageQueued':0, 'rxMessageRetry':0,
'txMessageQueued':0, 'txMessageRetry':0,
'rxMessageRate':0, 'rxDataRate':0, 'rxFileRate':0, 'rxMessageByteRate':0,
'rxMessageRate':0, 'rxMessageRateCpu':0, 'rxDataRate':0, 'rxFileRate':0, 'rxMessageByteRate':0,
'txMessageRate':0, 'txDataRate':0, 'txFileRate':0, 'txMessageByteRate':0
}
for c in self.components:
Expand Down Expand Up @@ -953,6 +953,8 @@ def _resolve(self):
metrics['txLast'] = newval
if 'messageLast' not in metrics or (newval > metrics['messageLast']):
metrics['messageLast'] = newval
elif k in [ "cpuTime" ]:
metrics['cpuTime'] += newval
else:
metrics[k] += newval
#else:
Expand Down Expand Up @@ -999,9 +1001,14 @@ def _resolve(self):

m['byteRate'] = byteTotal/time_base
m['msgRate'] = (m["rxGoodCount"]+m["rxBadCount"])/time_base
if m['cpuTime'] > 0:
m['msgRateCpu'] = (m["rxGoodCount"]+m["rxBadCount"])/m['cpuTime']
else:
m['msgRateCpu'] = 0

self.cumulative_stats['rxMessageByteRate'] += m['byteRate']
self.cumulative_stats['rxMessageRate'] += m['msgRate']
self.cumulative_stats['rxMessageRateCpu'] += m['msgRateCpu']

m['transferRxByteRate'] = m['transferRxBytes']/time_base
m['transferRxFileRate'] = m['transferRxFiles']/time_base
Expand Down Expand Up @@ -1110,6 +1117,10 @@ def _resolve(self):
else:
flow_status = 'running'

if self.states[c][cfg]['metrics']['msgRate'] > 0 and \
self.states[c][cfg]['metrics']['msgRateCpu'] < self.configs[c][cfg]['options'].runStateThreshold_cpuSlow:
flow_status = 'cpuSlow'

self.states[c][cfg]['resource_usage'] = copy.deepcopy(resource_usage)
self.configs[c][cfg]['status'] = flow_status

Expand Down Expand Up @@ -1285,7 +1296,7 @@ def __init__(self, opt, config_fnmatches=None):
'sender', 'shovel', 'subscribe', 'watch', 'winnow'
]
# active means >= 1 process exists on the node.
self.status_active = ['hung', 'idle', 'lagging', 'partial', 'reject', 'retry', 'running', 'slow', 'waitVip' ]
self.status_active = ['cpuSlow', 'hung', 'idle', 'lagging', 'partial', 'reject', 'retry', 'running', 'slow', 'waitVip' ]
self.status_values = self.status_active + [ 'disabled', 'include', 'missing', 'stopped', 'unknown' ]

self.bin_dir = os.path.dirname(os.path.realpath(__file__))
Expand Down Expand Up @@ -2505,7 +2516,7 @@ def status(self):
line = "%-40s %-11s %7s %10s %19s %14s %38s " % ("Component/Config", "Processes", "Connection", "Lag", "", "Rates", "" )

if self.options.displayFull:
line += "%-40s %17s %33s %40s" % ("Counters (per housekeeping)", "", "Data Counters", "" )
line += "%10s %-40s %17s %33s %40s" % ("", "Counters (per housekeeping)", "", "Data Counters", "" )
line += "%s %-21s " % (" ", "Memory" )

if self.options.displayFull:
Expand All @@ -2518,8 +2529,8 @@ def status(self):
underline = "%-40s %-5s %5s %5s %4s %4s %8s %7s %5s %5s %5s %10s %-10s %-10s %-10s " % ("", "-----", "---", "-----", "---", "----", "------", "------", "------", "----", "----", "------", "--------", "------", "------" )

if self.options.displayFull:
line += "%10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %8s" % \
( "subBytes", "Accepted", "Rejected", "Malformed", "pubBytes", "pubMsgs", "pubMal", "rxData", "rxFiles", "txData", "txFiles", "Since" )
line += "%10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %8s" % \
( "Msg/scpu", "subBytes", "Accepted", "Rejected", "Malformed", "pubBytes", "pubMsgs", "pubMal", "rxData", "rxFiles", "txData", "txFiles", "Since" )
underline += "%10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %8s %10s " % \
( "-------", "--------", "--------", "---------", "-------", "------", "-----", "-----", "-------", "------", "-------", "-----" )

Expand Down Expand Up @@ -2588,7 +2599,8 @@ def status(self):
)

if self.options.displayFull :
line += " %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %7.2fs" % ( \
line += " %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %10s %7.2fs" % ( \
naturalSize(m['msgRateCpu']).replace("B","m").replace("mytes","msgs"), \
naturalSize(m['rxByteCount']), \
naturalSize(m['rxGoodCount']).replace("B","m").replace("myte","msg"), \
naturalSize(m["rejectCount"]).replace("B","m").replace("myte","msg"), \
Expand All @@ -2600,7 +2612,7 @@ def status(self):
naturalSize(m["transferRxFiles"]).replace("B","F").replace("Fyte","File"), \
naturalSize(m["transferTxBytes"]), \
naturalSize(m["transferTxFiles"]).replace("B","F").replace("Fyte","File"), \
time_base )
m["time_base"] )
else:
line += " %10s %10s %9s %5s %5s %10s %8s" % ( "-", "-", "-", "-", "-", "-", "-" )
if self.options.displayFull:
Expand Down
Loading