Skip to content

Commit 833262b

Browse files
authored
Add custom metrics in AbstractThrottledPunctuator to track total events in eventStore and if the yield condition is hit (#108)
* ENGTAI-65929: Add custom metrics in AbstractThrottledPunctuator to track total events in eventStore and if the yield condition is hit * update metric * update method name * add default constructor * separate yielded metric from tags to gauge * pass punctuator name as constructor argument * upgrade commons-lang version * upgrade grpc-utils and apache avro versions * upgrade dependency check version * add dependency constraint
1 parent f5cdc04 commit 833262b

File tree

5 files changed

+42
-6
lines changed

5 files changed

+42
-6
lines changed

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ plugins {
88
id("org.hypertrace.publish-plugin") version "1.1.1" apply false
99
id("org.hypertrace.jacoco-report-plugin") version "0.3.0" apply false
1010
id("org.hypertrace.code-style-plugin") version "2.1.2" apply false
11-
id("org.owasp.dependencycheck") version "12.1.0"
11+
id("org.owasp.dependencycheck") version "12.1.3"
1212
}
1313

1414
subprojects {

kafka-bom/build.gradle.kts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ dependencies {
2222
api("org.apache.commons:commons-compress:1.26.0") {
2323
because("https://www.tenable.com/cve/CVE-2024-25710")
2424
}
25+
api("org.apache.commons:commons-lang3:3.18.0") {
26+
because("CVE-2025-48924 is fixed in 3.18.0")
27+
}
28+
2529

2630
api("io.confluent:kafka-streams-avro-serde:$confluentVersion")
2731
api("io.confluent:kafka-protobuf-serializer:$confluentVersion")
@@ -30,6 +34,6 @@ dependencies {
3034
api("org.apache.kafka:kafka-clients:$confluentCcsVersion")
3135
api("org.apache.kafka:kafka-streams:$confluentCcsVersion")
3236
api("org.apache.kafka:kafka-streams-test-utils:$confluentCcsVersion")
33-
api("org.apache.avro:avro:1.11.4")
37+
api("org.apache.avro:avro:1.12.0")
3438
}
3539
}

kafka-streams-framework/build.gradle.kts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ dependencies {
1818
api(platform(project(":kafka-bom")))
1919
api("org.apache.kafka:kafka-streams")
2020
api("io.confluent:kafka-streams-avro-serde")
21-
api("org.hypertrace.core.grpcutils:grpc-client-utils:0.13.14")
21+
api("org.hypertrace.core.grpcutils:grpc-client-utils:0.13.16")
2222

2323
implementation("org.apache.avro:avro")
2424
implementation("org.apache.kafka:kafka-clients")
2525
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.89")
2626
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.89")
27-
implementation("org.apache.commons:commons-lang3:3.12.0")
27+
implementation("org.apache.commons:commons-lang3:3.18.0")
2828

2929
testCompileOnly("org.projectlombok:lombok:1.18.26")
3030
testAnnotationProcessor("org.projectlombok:lombok:1.18.26")

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.hypertrace.core.kafkastreams.framework.punctuators;
22

3+
import io.micrometer.core.instrument.MeterRegistry;
4+
import io.micrometer.core.instrument.Tags;
35
import java.time.Clock;
46
import java.util.ArrayList;
57
import java.util.Collections;
@@ -20,12 +22,29 @@ public abstract class AbstractThrottledPunctuator<T> implements Punctuator {
2022
private final Clock clock;
2123
private final KeyValueStore<Long, List<T>> eventStore;
2224
private final ThrottledPunctuatorConfig config;
25+
private final MeterRegistry meterRegistry;
26+
private final String punctuatorName;
2327

2428
public AbstractThrottledPunctuator(
25-
Clock clock, ThrottledPunctuatorConfig config, KeyValueStore<Long, List<T>> eventStore) {
29+
Clock clock,
30+
ThrottledPunctuatorConfig config,
31+
KeyValueStore<Long, List<T>> eventStore,
32+
MeterRegistry meterRegistry,
33+
String punctuatorName) {
2634
this.clock = clock;
2735
this.config = config;
2836
this.eventStore = eventStore;
37+
this.meterRegistry = meterRegistry;
38+
this.punctuatorName = resolvePunctuatorName(punctuatorName);
39+
}
40+
41+
public AbstractThrottledPunctuator(
42+
Clock clock, ThrottledPunctuatorConfig config, KeyValueStore<Long, List<T>> eventStore) {
43+
this(clock, config, eventStore, null, null);
44+
}
45+
46+
private String resolvePunctuatorName(String name) {
47+
return (name != null && !name.isBlank()) ? name : this.getClass().getSimpleName();
2948
}
3049

3150
public void scheduleTask(long scheduleMs, T event) {
@@ -124,6 +143,8 @@ public final void punctuate(long timestamp) {
124143
}
125144
}
126145
}
146+
boolean yielded = shouldYieldNow(startTime);
147+
publishMetrics(totalProcessedTasks, yielded);
127148
log.debug(
128149
"processed windows: {}, processed tasks: {}, time taken: {}",
129150
totalProcessedWindows,
@@ -148,4 +169,15 @@ private boolean shouldYieldNow(long startTimestamp) {
148169
private long normalize(long timestamp) {
149170
return timestamp - (timestamp % config.getWindowMs());
150171
}
172+
173+
private void publishMetrics(int totalProcessedTasks, boolean yielded) {
174+
if (meterRegistry != null) {
175+
meterRegistry
176+
.counter(
177+
"throttled.punctuator.processed.task.count", Tags.of("punctuator", punctuatorName))
178+
.increment(totalProcessedTasks);
179+
meterRegistry.gauge(
180+
"throttled.punctuator.yielded", Tags.of("punctuator", punctuatorName), yielded ? 1 : 0);
181+
}
182+
}
151183
}

kafka-streams-partitioners/weighted-group-partitioner/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ dependencies {
1515

1616
api(platform(project(":kafka-bom")))
1717
api("org.apache.kafka:kafka-streams")
18-
api("org.hypertrace.core.grpcutils:grpc-client-utils:0.13.14")
18+
api("org.hypertrace.core.grpcutils:grpc-client-utils:0.13.16")
1919
api("com.typesafe:config:1.4.2")
2020
implementation("com.google.guava:guava:32.0.1-jre")
2121
implementation("org.hypertrace.core.grpcutils:grpc-context-utils:0.13.14")

0 commit comments

Comments
 (0)