
Commit 14249c5

perf: add log level checks to optimize AutoMQ logging performance (#2378)
* perf: add log level checks to optimize AutoMQ logging performance
* fix: add @SuppressWarnings("NPathComplexity") to fix checkstyle issues
1 parent 0302285 commit 14249c5

14 files changed: 115 additions & 41 deletions
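Every hunk in this commit applies the same guard pattern: a trace/debug call is wrapped in an explicit level check (LOGGER.isTraceEnabled() / LOGGER.isDebugEnabled() in the Java files, isTraceEnabled / isDebugEnabled in the Scala files) so the log statement's argument expressions are only evaluated when the level is actually on. Below is a minimal, self-contained Java sketch of the pattern; the class, field, and method names are illustrative, not taken from the diff.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GuardedLoggingExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(GuardedLoggingExample.class);

    // Stand-in for the kind of work the hunks below guard against: building a
    // map or metric summary, or chained getters such as getKey().getScope().
    private String expensiveSummary() {
        StringBuilder sb = new StringBuilder("scores=");
        for (int i = 0; i < 1_000; i++) {
            sb.append(i).append(',');
        }
        return sb.toString();
    }

    void reportUnguarded() {
        // Parameterized logging defers message formatting, but the argument
        // expression below still runs before debug() can check the level.
        LOGGER.debug("state: {}", expensiveSummary());
    }

    void reportGuarded() {
        // The explicit check skips argument evaluation entirely when DEBUG
        // is disabled; this is the pattern applied throughout this commit.
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("state: {}", expensiveSummary());
        }
    }

    public static void main(String[] args) {
        GuardedLoggingExample example = new GuardedLoggingExample();
        example.reportUnguarded(); // pays for expensiveSummary() even with DEBUG off
        example.reportGuarded();   // pays only when DEBUG is enabled
    }
}

The guard matters here because many of the original call sites pass computed arguments (parameters.replica.getTopicPartition(), context.getMetricMap().size(), whole response objects), and in hot paths such as ElasticLogSegment.append that evaluation cost would otherwise be paid on every call even when the level is off.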

automq-shell/src/main/java/com/automq/shell/stream/ClientKVClient.java

Lines changed: 10 additions & 3 deletions

@@ -66,7 +66,9 @@ private void connect(Node node) throws IOException {
     public KeyValue.Value getKV(String key) throws IOException {
         long now = Time.SYSTEM.milliseconds();

-        LOGGER.trace("[ClientKVClient]: Get KV: {}", key);
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("[ClientKVClient]: Get KV: {}", key);
+        }

         GetKVsRequestData data = new GetKVsRequestData()
             .setGetKeyRequests(List.of(new GetKVsRequestData.GetKVRequest().setKey(key)));
@@ -88,7 +90,9 @@ public KeyValue.Value getKV(String key) throws IOException {
     public KeyValue.Value putKV(String key, byte[] value) throws IOException {
         long now = Time.SYSTEM.milliseconds();

-        LOGGER.trace("[ClientKVClient]: put KV: {}", key);
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("[ClientKVClient]: put KV: {}", key);
+        }

         PutKVsRequestData data = new PutKVsRequestData()
             .setPutKVRequests(List.of(new PutKVsRequestData.PutKVRequest().setKey(key).setValue(value)));
@@ -110,7 +114,10 @@ public KeyValue.Value putKV(String key, byte[] value) throws IOException {
     public KeyValue.Value deleteKV(String key) throws IOException {
         long now = Time.SYSTEM.milliseconds();

-        LOGGER.trace("[ClientKVClient]: Delete KV: {}", key);
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("[ClientKVClient]: Delete KV: {}", key);
+        }
+
         DeleteKVsRequestData data = new DeleteKVsRequestData()
             .setDeleteKVRequests(List.of(new DeleteKVsRequestData.DeleteKVRequest().setKey(key)));

core/src/main/java/kafka/autobalancer/goals/AbstractGoal.java

Lines changed: 12 additions & 4 deletions

@@ -53,8 +53,10 @@ protected Optional<Action> tryMovePartitionOut(ActionParameters parameters) {
                 candidateActionScores.add(Map.entry(action, score));
             }
         }
-        LOGGER.debug("try move partition {} out for broker {}, all possible action score: {} on goal {}", parameters.replica.getTopicPartition(),
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("try move partition {} out for broker {}, all possible action score: {} on goal {}", parameters.replica.getTopicPartition(),
                 parameters.srcBroker.getBrokerId(), candidateActionScores, name());
+        }
         return getAcceptableAction(candidateActionScores);
     }

@@ -76,8 +78,10 @@ protected Optional<Action> trySwapPartitionOut(ActionParameters parameters,
                     candidate.getBrokerId(), candidateReplica.getTopicPartition());
                 double score = calculateCandidateActionScores(parameters.goalsByPriority, action, parameters.cluster, parameters.optimizedGoals, parameters.goalsByGroup);
                 if (score > POSITIVE_ACTION_SCORE_THRESHOLD) {
-                    LOGGER.debug("try swap partition {} out for broker {} with {}, action score: {}", parameters.replica.getTopicPartition(),
+                    if (LOGGER.isDebugEnabled()) {
+                        LOGGER.debug("try swap partition {} out for broker {} with {}, action score: {}", parameters.replica.getTopicPartition(),
                             parameters.srcBroker.getBrokerId(), candidateReplica.getTopicPartition(), score);
+                    }
                     return Optional.of(action);
                 }
             }
@@ -92,13 +96,17 @@ protected double calculateCandidateActionScores(Collection<Goal> goalsByPriority
         for (Goal goal : goalsByPriority) {
             double score = goal.actionAcceptanceScore(action, cluster);
             if (score == NOT_ACCEPTABLE) {
-                LOGGER.debug("action {} is not acceptable for goal {}", action, goal);
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("action {} is not acceptable for goal {}", action, goal);
+                }
                 return NOT_ACCEPTABLE;
             }
             goalScoreMapByGroup.compute(goal.group(), (k, v) -> v == null ? new HashMap<>() : v).put(goal, score);
         }

-        LOGGER.debug("action {} scores on each goal: {}", action, goalScoreMapByGroup);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("action {} scores on each goal: {}", action, goalScoreMapByGroup);
+        }
         Map<String, Double> groupScoreMap = weightedGoalsScoreByGroup(goalScoreMapByGroup);
         for (Map.Entry<String, Double> entry : groupScoreMap.entrySet()) {
             String group = entry.getKey();

core/src/main/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporter.java

Lines changed: 19 additions & 6 deletions

@@ -285,14 +285,17 @@ protected void createAutoBalancerMetricsProducer(Properties producerProps) throw
         }, metricsReporterCreateRetries);
     }

+    @SuppressWarnings("NPathComplexity")
     @Override
     public void run() {
         LOGGER.info("Starting auto balancer metrics reporter with reporting interval of {} ms.", reportingIntervalMs);

         try {
             while (!shutdown) {
                 long now = System.currentTimeMillis();
-                LOGGER.debug("Reporting metrics for time {}.", now);
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("Reporting metrics for time {}.", now);
+                }
                 try {
                     if (now > lastReportingTime + reportingIntervalMs) {
                         numMetricSendFailure = 0;
@@ -343,7 +346,9 @@ public void run() {
     public void sendAutoBalancerMetric(AutoBalancerMetrics ccm) {
         ProducerRecord<String, AutoBalancerMetrics> producerRecord =
             new ProducerRecord<>(Topic.AUTO_BALANCER_METRICS_TOPIC_NAME, null, ccm.time(), ccm.key(), ccm);
-        LOGGER.debug("Sending auto balancer metric {}.", ccm);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Sending auto balancer metric {}.", ccm);
+        }
         producer.send(producerRecord, (recordMetadata, e) -> {
             if (e != null) {
                 numMetricSendFailure++;
@@ -356,7 +361,9 @@ public void sendAutoBalancerMetric(AutoBalancerMetrics ccm) {
     }

     private void reportMetrics(long now) throws Exception {
-        LOGGER.debug("Reporting metrics.");
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Reporting metrics.");
+        }

         YammerMetricProcessor.Context context = new YammerMetricProcessor.Context(now, brokerId, brokerRack, reportingIntervalMs);
         processMetrics(context);
@@ -366,7 +373,9 @@ private void reportMetrics(long now) throws Exception {
             sendAutoBalancerMetric(entry.getValue());
         }

-        LOGGER.debug("Finished reporting metrics, total metrics size: {}, merged size: {}.", interestedMetrics.size(), context.getMetricMap().size());
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Finished reporting metrics, total metrics size: {}, merged size: {}.", interestedMetrics.size(), context.getMetricMap().size());
+        }
     }

     protected void checkMetricCompleteness(YammerMetricProcessor.Context context) {
@@ -403,7 +412,9 @@ protected void processBrokerMetrics(YammerMetricProcessor.Context context) {

     protected void processYammerMetrics(YammerMetricProcessor.Context context) throws Exception {
         for (Map.Entry<MetricName, Metric> entry : interestedMetrics.entrySet()) {
-            LOGGER.trace("Processing yammer metric {}, scope = {}", entry.getKey(), entry.getKey().getScope());
+            if (LOGGER.isTraceEnabled()) {
+                LOGGER.trace("Processing yammer metric {}, scope = {}", entry.getKey(), entry.getKey().getScope());
+            }
             entry.getValue().processWith(yammerMetricProcessor, entry.getKey(), context);
         }
         Iterator<Map.Entry<String, AutoBalancerMetrics>> iterator = context.getMetricMap().entrySet().iterator();
@@ -430,7 +441,9 @@ protected void addMissingMetrics(YammerMetricProcessor.Context context) {

     protected void addMetricIfInterested(MetricName name, Metric metric) {
         if (isInterestedMetric(name)) {
-            LOGGER.debug("Added new metric {} to auto balancer metrics reporter.", name);
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Added new metric {} to auto balancer metrics reporter.", name);
+            }
             interestedMetrics.put(name, metric);
         }
     }

core/src/main/java/kafka/autobalancer/metricsreporter/metric/YammerMetricProcessor.java

Lines changed: 12 additions & 4 deletions

@@ -41,7 +41,9 @@ public class YammerMetricProcessor implements MetricProcessor<YammerMetricProces
     @Override
     public void processMeter(MetricName metricName, Metered metered, Context context) {
         if (MetricsUtils.isInterested(metricName)) {
-            LOG.trace("Processing metric {} of type Meter.", metricName);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Processing metric {} of type Meter.", metricName);
+            }
             double value;
             if (context.reportingInterval().toMillis() <= TimeUnit.MINUTES.toMillis(1)) {
                 value = metered.oneMinuteRate();
@@ -69,7 +71,9 @@ public void processCounter(MetricName metricName, Counter counter, Context conte
     @Override
     public void processHistogram(MetricName metricName, Histogram histogram, Context context) {
         if (MetricsUtils.isInterested(metricName)) {
-            LOG.trace("Processing metric {} of type Histogram.", metricName);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Processing metric {} of type Histogram.", metricName);
+            }
             // Get max metric value
             AutoBalancerMetrics ccm = MetricsUtils.toAutoBalancerMetric(context.time(),
                 context.brokerId(),
@@ -112,7 +116,9 @@ public void processHistogram(MetricName metricName, Histogram histogram, Context
     @Override
     public void processTimer(MetricName metricName, Timer timer, Context context) {
         if (MetricsUtils.isInterested(metricName)) {
-            LOG.trace("Processing metric {} of type Timer.", metricName);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Processing metric {} of type Timer.", metricName);
+            }

             AutoBalancerMetrics ccm = MetricsUtils.toAutoBalancerMetric(context.time(),
                 context.brokerId(),
@@ -161,7 +167,9 @@ public void processTimer(MetricName metricName, Timer timer, Context context) {
     @Override
     public void processGauge(MetricName metricName, Gauge<?> gauge, Context context) {
         if (MetricsUtils.isInterested(metricName)) {
-            LOG.trace("Processing metric {} of type Gauge.", metricName);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Processing metric {} of type Gauge.", metricName);
+            }
             if (!(gauge.value() instanceof Number)) {
                 throw new IllegalStateException(String.format("The value of yammer metric %s is %s, which is not a number",
                     metricName, gauge.value()));

core/src/main/scala/kafka/log/LogManager.scala

Lines changed: 9 additions & 3 deletions

@@ -726,10 +726,14 @@ class LogManager(logDirs: Seq[File],
       val logs = logsInDir(localLogsByDir, dir)

       // update the last flush point
-      debug(s"Updating recovery points at $dir")
+      if (isDebugEnabled) {
+        debug(s"Updating recovery points at $dir")
+      }
       checkpointRecoveryOffsetsInDir(dir, logs)

-      debug(s"Updating log start offsets at $dir")
+      if (isDebugEnabled) {
+        debug(s"Updating log start offsets at $dir")
+      }
       checkpointLogStartOffsetsInDir(dir, logs)

       // mark that the shutdown was clean by creating marker file for log dirs that:
@@ -739,7 +743,9 @@
       if (hadCleanShutdownFlags.getOrDefault(logDirAbsolutePath, false) ||
           loadLogsCompletedFlags.getOrDefault(logDirAbsolutePath, false)) {
         val cleanShutdownFileHandler = new CleanShutdownFileHandler(dir.getPath)
-        debug(s"Writing clean shutdown marker at $dir with broker epoch=$brokerEpoch")
+        if (isDebugEnabled) {
+          debug(s"Writing clean shutdown marker at $dir with broker epoch=$brokerEpoch")
+        }
         CoreUtils.swallow(cleanShutdownFileHandler.write(brokerEpoch), this)
       }
     }

core/src/main/scala/kafka/log/stream/s3/ControllerKVClient.java

Lines changed: 24 additions & 8 deletions

@@ -55,7 +55,9 @@ public ControllerKVClient(ControllerRequestSender requestSender) {

     @Override
     public CompletableFuture<Value> putKVIfAbsent(KeyValue keyValue) {
-        LOGGER.trace("[ControllerKVClient]: Put KV if absent: {}", keyValue);
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("[ControllerKVClient]: Put KV if absent: {}", keyValue);
+        }
         PutKVRequest request = new PutKVRequest()
             .setKey(keyValue.key().get())
             .setValue(keyValue.value().get().array());
@@ -83,7 +85,9 @@ public Builder toRequestBuilder() {
             Errors code = Errors.forCode(response.errorCode());
             switch (code) {
                 case NONE:
-                    LOGGER.trace("[ControllerKVClient]: Put KV if absent: {}, result: {}", keyValue, response);
+                    if (LOGGER.isTraceEnabled()) {
+                        LOGGER.trace("[ControllerKVClient]: Put KV if absent: {}, result: {}", keyValue, response);
+                    }
                     return ResponseHandleResult.withSuccess(Value.of(response.value()));
                 case KEY_EXIST:
                     LOGGER.warn("[ControllerKVClient]: Failed to Put KV if absent: {}, code: {}, key already exist", keyValue, code);
@@ -99,7 +103,9 @@ public Builder toRequestBuilder() {

     @Override
     public CompletableFuture<Value> putKV(KeyValue keyValue) {
-        LOGGER.trace("[ControllerKVClient]: Put KV: {}", keyValue);
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("[ControllerKVClient]: Put KV: {}", keyValue);
+        }
         PutKVRequest request = new PutKVRequest()
             .setKey(keyValue.key().get())
             .setValue(keyValue.value().get().array())
@@ -128,7 +134,9 @@ public Builder toRequestBuilder() {
             Errors code = Errors.forCode(response.errorCode());
             switch (code) {
                 case NONE:
-                    LOGGER.trace("[ControllerKVClient]: Put KV: {}, result: {}", keyValue, response);
+                    if (LOGGER.isTraceEnabled()) {
+                        LOGGER.trace("[ControllerKVClient]: Put KV: {}, result: {}", keyValue, response);
+                    }
                     return ResponseHandleResult.withSuccess(Value.of(response.value()));
                 default:
                     LOGGER.error("[ControllerKVClient]: Failed to Put KV: {}, code: {}, retry later", keyValue, code);
@@ -141,7 +149,9 @@ public Builder toRequestBuilder() {

     @Override
     public CompletableFuture<Value> getKV(Key key) {
-        LOGGER.trace("[ControllerKVClient]: Get KV: {}", key);
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("[ControllerKVClient]: Get KV: {}", key);
+        }
         GetKVRequest request = new GetKVRequest()
             .setKey(key.get());
         WrapRequest req = new BatchRequest() {
@@ -169,7 +179,9 @@ public Builder toRequestBuilder() {
             switch (code) {
                 case NONE:
                     Value val = Value.of(response.value());
-                    LOGGER.trace("[ControllerKVClient]: Get KV: {}, result: {}", key, response);
+                    if (LOGGER.isTraceEnabled()) {
+                        LOGGER.trace("[ControllerKVClient]: Get KV: {}, result: {}", key, response);
+                    }
                     return ResponseHandleResult.withSuccess(val);
                 default:
                     LOGGER.error("[ControllerKVClient]: Failed to Get KV: {}, code: {}, retry later", key, code);
@@ -182,7 +194,9 @@ public Builder toRequestBuilder() {

     @Override
     public CompletableFuture<Value> delKV(Key key) {
-        LOGGER.trace("[ControllerKVClient]: Delete KV: {}", key);
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("[ControllerKVClient]: Delete KV: {}", key);
+        }
         DeleteKVRequest request = new DeleteKVRequest()
             .setKey(key.get());
         WrapRequest req = new BatchRequest() {
@@ -210,7 +224,9 @@ public Builder toRequestBuilder() {
             Errors code = Errors.forCode(response.errorCode());
             switch (code) {
                 case NONE:
-                    LOGGER.trace("[ControllerKVClient]: Delete KV: {}, result: {}", key, response);
+                    if (LOGGER.isTraceEnabled()) {
+                        LOGGER.trace("[ControllerKVClient]: Delete KV: {}, result: {}", key, response);
+                    }
                     return ResponseHandleResult.withSuccess(Value.of(response.value()));
                 case KEY_NOT_EXIST:
                     LOGGER.info("[ControllerKVClient]: Delete KV: {}, result: KEY_NOT_EXIST", key);

core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala

Lines changed: 5 additions & 3 deletions

@@ -298,9 +298,11 @@ class ElasticLog(val metaStream: MetaStream,
                    maxOffsetMetadata: LogOffsetMetadata,
                    includeAbortedTxns: Boolean): CompletableFuture[FetchDataInfo] = {
     maybeHandleIOExceptionAsync(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
-      trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +
-        s"total length ${segments.sizeInBytes} bytes")
-      // get LEO from super class
+      if (isTraceEnabled) {
+        trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +
+          s"total length ${segments.sizeInBytes} bytes")
+      }
+      // get LEO from super class
       val endOffsetMetadata = nextOffsetMetadata
       val endOffset = endOffsetMetadata.messageOffset
       val segmentOpt = segments.lastSegment

core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.java

Lines changed: 3 additions & 1 deletion

@@ -210,7 +210,9 @@ public void append(

         // append the messages
         long appendedBytes = log.append(records, largestOffset + 1);
-        LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log, largestOffset);
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log, largestOffset);
+        }
         // Update the in memory max timestamp and corresponding offset.
         if (largestTimestampMs > maxTimestampSoFar()) {
             maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, offsetOfMaxTimestamp);

core/src/main/scala/kafka/server/AlterPartitionManager.scala

Lines changed: 6 additions & 2 deletions

@@ -403,7 +403,9 @@ class DefaultAlterPartitionManager(
        return
      }
      val request = new ElectLeadersRequest.Builder(ElectionType.PREFERRED, topicPartitions, 1000)
-     debug(s"sending elect leader to controller $request")
+     if (isDebugEnabled) {
+       debug(s"sending elect leader to controller $request")
+     }
      controllerChannelManager.sendRequest(request, new ControllerRequestCompletionHandler {
        override def onTimeout(): Unit = {
          inflightElectLeadersRequest.set(false)
@@ -413,7 +415,9 @@
        }

        override def onComplete(response: ClientResponse): Unit = {
-         debug(s"Received elect leader response $response")
+         if (isDebugEnabled) {
+           debug(s"Received elect leader response $response")
+         }
          // no need retry, controller have backup logic to elect leader when timeout
          // In the normal case, check for pending updates to send immediately
          inflightElectLeadersRequest.set(false)

core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala

Lines changed: 1 addition & 1 deletion

@@ -119,7 +119,7 @@ class KRaftMetadataCache(
       streamMetadata.state == StreamState.OPENED &&
         streamMetadata.lastRange().nodeId() == tpRegistration.leader

-    if (!result) {
+    if (!result && isDebugEnabled) {
       debug(s"Failover failed for topicPartition $topicPartition, tpEpoch $tpRegistration, streamMetadata ${streamMetadata}")
     }
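Note that the KRaftMetadataCache.scala change takes a slightly different shape from the rest of the commit: rather than nesting the debug call in a new block, it folds the level check into the existing condition. A hedged Java rendering of that combined-condition form follows; the names are illustrative, not taken from the diff.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CombinedGuardExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(CombinedGuardExample.class);

    void checkFailover(boolean result, String topicPartition) {
        // One condition covers both "the check failed" and "DEBUG is on",
        // so the log-only branch adds no extra nesting to the method body.
        if (!result && LOGGER.isDebugEnabled()) {
            LOGGER.debug("Failover failed for topicPartition {}", topicPartition);
        }
    }
}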
