Merged
Commits
54 commits
All commits authored by bbejeck.

2ba2522  Aug 27, 2024  Fix merge conflicts from breaking into smaller PR
2701efe  Aug 27, 2024  Resolving merge conflicts
56b912d  Aug 27, 2024  Fixing rebase
3eca4eb  Aug 27, 2024  Fix producer javadoc comment
d1ed133  Aug 28, 2024  Fix spotbugs and checkstyle errors
17f7808  Aug 28, 2024  Simplify filtering for adding metrics
852fbfa  Aug 29, 2024  Clean up the creation of the reporter such that it relies on the metri…
84b201e  Aug 30, 2024  Resolving merge conflict from this previous commit Added parameterize…
2c9d548  Sep 2, 2024   Added test for passing streams metrics
ef5286b  Sep 3, 2024   Added integration tests, set logging level in reporter to debug
87f58f1  Sep 3, 2024   Added test to confirm stream metrics don't bleed into consumer metrics
abcc6a7  Sep 4, 2024   Rebasing work
6057978  Sep 4, 2024   Add some description to large test method
bfcb3dd  Sep 4, 2024   checkstyle fixes
eed0827  Sep 4, 2024   Remove unintentional formatting
5a380a4  Sep 5, 2024   Changes per comments, resolving merge conflicts from PR breakdown
71f4ed5  Sep 5, 2024   Close telemetry reporter on admin client close.
ef83d32  Sep 6, 2024   Cleanup, revert closing telemetry reporter pending investigation, rem…
57961ca  Sep 6, 2024   Checkstyle errors
c1c7bd3  Sep 9, 2024   Address review comments: More descriptive javadoc
67bd91b  Sep 9, 2024   Address review comments: indentation, consumer generic types
1a772de  Sep 9, 2024   Address review comments: update test to account for correct state met…
caaa53c  Sep 9, 2024   Merge Admin delegate for testing sending metrics via admin client
98b28e9  Sep 10, 2024  Rebasing work
732a203  Sep 11, 2024  Refactor and add telemetry support in integration test
39145e5  Sep 13, 2024  Updates per review comments
5f92c6d  Sep 25, 2024  Add the telemetry reporter to the reporters list after creation
8a65d28  Sep 26, 2024  Fix merge conflict from Fix broken tests in KafkaAdminClientTest
87e4c67  Sep 26, 2024  Minor cleanup
4201cb5  Sep 27, 2024  Fix merge conflict from Disable metrics push in AdminClient by defaul…
631b720  Sep 28, 2024  Fix merge conflict from fix tests, start of end-to-end metrics test
637ad35  Sep 30, 2024  Call initiate close on ClientTelemetryReporter during admin close
5400f62  Oct 1, 2024   Updates per comments
3091485  Oct 5, 2024   Fix missed items during rebase
69f9cc9  Oct 7, 2024   Fix constructor that got mangled during rebase
a88edc2  Oct 16, 2024  Fixes for resolve conflict merge mistakes during rebase
9b3e223  Oct 16, 2024  Address review comments
8389576  Oct 17, 2024  Checkstyle fixes
6d64484  Oct 17, 2024  Address comments
4eec2b1  Oct 17, 2024  Completed integration test
1c5884c  Oct 18, 2024  Get initial end-to-end integration test running
427c65e  Oct 18, 2024  Respond to comments
78cef95  Oct 18, 2024  More descriptive name
f61fb20  Oct 21, 2024  Resolve conflict from this commit Add open telemetry protobuf as test…
d5c5a7b  Oct 23, 2024  Address comments, fix a potential bug and side cleanup
3f64e7d  Oct 23, 2024  Store clientInstanceId before returning
6704891  Oct 24, 2024  Address review comments
ec48119  Oct 28, 2024  Update end-to-end test
52d389f  Oct 29, 2024  Update end-to-end test to validate metrics for INFO, DEBUG and TRACE
03de445  Oct 30, 2024  commit f30f2d03e9064cb9a167fe35b0c5d63470405ce6 puts
6a40bba  Oct 30, 2024  Standardize on lower-case if names ever change
26ae0b5  Oct 30, 2024  Standardize on lower-case if names ever change
21667fe  Nov 2, 2024   Rebased trunk and updated test
8b5682d  Nov 4, 2024   switch back to combined mode - getting not enough resources error
3 changes: 2 additions & 1 deletion build.gradle
@@ -2639,7 +2639,7 @@ project(':streams') {
}

dependencies {
api project(':clients')
api project(path: ':clients', configuration: 'shadow')
// `org.rocksdb.Options` is part of Kafka Streams public api via `RocksDBConfigSetter`
api libs.rocksDBJni

@@ -2659,6 +2659,7 @@ project(':streams') {
testImplementation libs.mockitoCore
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
testImplementation libs.junitPlatformSuiteEngine // supports suite test
testImplementation project(':group-coordinator')

testRuntimeOnly project(':streams:test-utils')
testRuntimeOnly runtimeTestLibs
5 changes: 5 additions & 0 deletions checkstyle/import-control.xml
@@ -418,6 +418,11 @@
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.tools" />
<allow pkg="org.apache.kafka.server.config" />
<allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
<allow class="org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.MetricsData"/>
<allow class="org.apache.kafka.server.telemetry.ClientTelemetry" />
<allow class="org.apache.kafka.server.telemetry.ClientTelemetryPayload" />
<allow class="org.apache.kafka.server.telemetry.ClientTelemetryReceiver" />
<allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
<allow class="org.apache.kafka.coordinator.transaction.TransactionLogConfig" />
<allow pkg="org.apache.kafka.coordinator.group" />
KafkaAdminClient.java
@@ -153,7 +153,6 @@
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.ListClientMetricsResourcesRequestData;
import org.apache.kafka.common.message.ListGroupsRequestData;
@@ -228,8 +227,6 @@
import org.apache.kafka.common.requests.ElectLeadersResponse;
import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -254,6 +251,8 @@
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
import org.apache.kafka.common.security.token.delegation.DelegationToken;
import org.apache.kafka.common.security.token.delegation.TokenInformation;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.KafkaThread;
@@ -409,6 +408,7 @@ public class KafkaAdminClient extends AdminClient {
private final boolean clientTelemetryEnabled;
private final MetadataRecoveryStrategy metadataRecoveryStrategy;
private final AdminFetchMetricsManager adminFetchMetricsManager;
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;

/**
* The telemetry requests client instance id.
@@ -529,6 +529,7 @@ static KafkaAdminClient createInternal(
String clientId = generateClientId(config);
ApiVersions apiVersions = new ApiVersions();
LogContext logContext = createLogContext(clientId);
Optional<ClientTelemetryReporter> clientTelemetryReporter;

try {
// Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it
@@ -540,6 +541,8 @@ static KafkaAdminClient createInternal(
adminAddresses.usingBootstrapControllers());
metadataManager.update(Cluster.bootstrap(adminAddresses.addresses()), time.milliseconds());
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
clientTelemetryReporter.ifPresent(reporters::add);
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
@@ -557,10 +560,13 @@ static KafkaAdminClient createInternal(
time,
1,
(int) TimeUnit.HOURS.toMillis(1),
null,
metadataManager.updater(),
(hostResolver == null) ? new DefaultHostResolver() : hostResolver);
(hostResolver == null) ? new DefaultHostResolver() : hostResolver,
null,
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));
return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, networkClient,
timeoutProcessorFactory, logContext);
timeoutProcessorFactory, logContext, clientTelemetryReporter);
} catch (Throwable exc) {
closeQuietly(metrics, "Metrics");
closeQuietly(networkClient, "NetworkClient");
@@ -575,12 +581,13 @@ static KafkaAdminClient createInternal(AdminClientConfig config,
Time time) {
Metrics metrics = null;
String clientId = generateClientId(config);

Optional<ClientTelemetryReporter> clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);

try {
metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time);
LogContext logContext = createLogContext(clientId);
return new KafkaAdminClient(config, clientId, time, metadataManager, metrics,
client, null, logContext);
client, null, logContext, clientTelemetryReporter);
} catch (Throwable exc) {
closeQuietly(metrics, "Metrics");
throw new KafkaException("Failed to create new KafkaAdminClient", exc);
@@ -598,7 +605,8 @@ private KafkaAdminClient(AdminClientConfig config,
Metrics metrics,
KafkaClient client,
TimeoutProcessorFactory timeoutProcessorFactory,
LogContext logContext) {
LogContext logContext,
Optional<ClientTelemetryReporter> clientTelemetryReporter) {
this.clientId = clientId;
this.log = logContext.logger(KafkaAdminClient.class);
this.logContext = logContext;
@@ -622,6 +630,9 @@ private KafkaAdminClient(AdminClientConfig config,
retryBackoffMaxMs,
CommonClientConfigs.RETRY_BACKOFF_JITTER);
this.clientTelemetryEnabled = config.getBoolean(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG);
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(this.clientId, config);
this.clientTelemetryReporter = clientTelemetryReporter;
this.clientTelemetryReporter.ifPresent(reporters::add);
this.metadataRecoveryStrategy = MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);
config.logUnused();
@@ -664,6 +675,8 @@ public void close(Duration timeout) {
long now = time.milliseconds();
long newHardShutdownTimeMs = now + waitTimeMs;
long prev = INVALID_SHUTDOWN_TIME;
clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
metrics.close();
while (true) {
if (hardShutdownTimeMs.compareAndSet(prev, newHardShutdownTimeMs)) {
if (prev == INVALID_SHUTDOWN_TIME) {
@@ -4272,12 +4285,18 @@ private KafkaFutureImpl<List<MemberIdentity>> getMembersFromGroup(String groupId

@Override
public void registerMetricForSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricChange(metric);
}
}

@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricRemoval(metric);
}
}
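The two overrides above replace `UnsupportedOperationException` stubs with delegation through an `Optional`-wrapped reporter, so the calls become silent no-ops when telemetry is disabled. A minimal, self-contained sketch of that pattern; `Metric` and `Reporter` here are hypothetical stand-ins for `KafkaMetric` and `ClientTelemetryReporter`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Sketch of the Optional-based delegation used by the new overrides.
// `Metric` and `Reporter` are stand-ins; only the dispatch pattern is the point.
public class TelemetryDelegationSketch {

    public static class Metric {
        public final String name;
        public Metric(String name) { this.name = name; }
    }

    public static class Reporter {
        public final List<String> registered = new ArrayList<>();
        public void metricChange(Metric m)  { if (!registered.contains(m.name)) registered.add(m.name); }
        public void metricRemoval(Metric m) { registered.remove(m.name); }
    }

    // An empty Optional models "telemetry disabled": both calls become
    // silent no-ops instead of throwing UnsupportedOperationException.
    private final Optional<Reporter> reporter;

    public TelemetryDelegationSketch(Optional<Reporter> reporter) { this.reporter = reporter; }

    public void registerMetricForSubscription(Metric metric) {
        reporter.ifPresent(r -> r.metricChange(metric));
    }

    public void unregisterMetricFromSubscription(Metric metric) {
        reporter.ifPresent(r -> r.metricRemoval(metric));
    }

    public static void main(String[] args) {
        Reporter r = new Reporter();
        TelemetryDelegationSketch enabled = new TelemetryDelegationSketch(Optional.of(r));
        enabled.registerMetricForSubscription(new Metric("app.queue.size"));
        System.out.println(r.registered);   // [app.queue.size]

        // Disabled client: registering is a harmless no-op, nothing throws.
        new TelemetryDelegationSketch(Optional.<Reporter>empty())
                .registerMetricForSubscription(new Metric("ignored"));
        enabled.unregisterMetricFromSubscription(new Metric("app.queue.size"));
        System.out.println(r.registered);   // []
    }
}
```

The same shape appears verbatim in the admin, consumer, and producer hunks of this PR, which keeps the enabled/disabled behavior identical across all three clients.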

@Override
@@ -5050,47 +5069,16 @@ public Uuid clientInstanceId(Duration timeout) {
throw new IllegalArgumentException("The timeout cannot be negative.");
}

if (!clientTelemetryEnabled) {
if (clientTelemetryReporter.isEmpty()) {
throw new IllegalStateException("Telemetry is not enabled. Set config `" + AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG + "` to `true`.");

}

if (clientInstanceId != null) {
return clientInstanceId;
}

final long now = time.milliseconds();
final KafkaFutureImpl<Uuid> future = new KafkaFutureImpl<>();
runnable.call(new Call("getTelemetrySubscriptions", calcDeadlineMs(now, (int) timeout.toMillis()),
new LeastLoadedNodeProvider()) {

@Override
GetTelemetrySubscriptionsRequest.Builder createRequest(int timeoutMs) {
return new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData(), true);
}

@Override
void handleResponse(AbstractResponse abstractResponse) {
GetTelemetrySubscriptionsResponse response = (GetTelemetrySubscriptionsResponse) abstractResponse;
if (response.error() != Errors.NONE) {
future.completeExceptionally(response.error().exception());
} else {
future.complete(response.data().clientInstanceId());
}
}

@Override
void handleFailure(Throwable throwable) {
future.completeExceptionally(throwable);
}
}, now);

try {
clientInstanceId = future.get();
} catch (Exception e) {
log.error("Error occurred while fetching client instance id", e);
throw new KafkaException("Error occurred while fetching client instance id", e);
}

clientInstanceId = ClientTelemetryUtils.fetchClientInstanceId(clientTelemetryReporter.get(), timeout);
return clientInstanceId;
}
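The diff above collapses a hand-rolled `getTelemetrySubscriptions` request/response `Call` into a single delegation to `ClientTelemetryUtils.fetchClientInstanceId`, while keeping the surrounding contract: validate the timeout, fail fast when telemetry is disabled, fetch once, and serve the cached id afterwards. A self-contained sketch of that contract, with a `Supplier` as a hypothetical stand-in for the telemetry fetch:

```java
import java.time.Duration;
import java.util.function.Supplier;

// Sketch of the clientInstanceId contract preserved by the refactor:
// timeout validation, fail-fast when disabled, fetch-once-then-cache.
public class ClientInstanceIdSketch {
    private final boolean telemetryEnabled;
    private final Supplier<String> fetcher;  // stand-in for ClientTelemetryUtils.fetchClientInstanceId
    private String clientInstanceId;
    public int fetchCount;                   // exposed only so the caching is observable

    public ClientInstanceIdSketch(boolean telemetryEnabled, Supplier<String> fetcher) {
        this.telemetryEnabled = telemetryEnabled;
        this.fetcher = fetcher;
    }

    public String clientInstanceId(Duration timeout) {
        if (timeout.isNegative())
            throw new IllegalArgumentException("The timeout cannot be negative.");
        if (!telemetryEnabled)
            throw new IllegalStateException("Telemetry is not enabled.");
        if (clientInstanceId == null) {      // first call: fetch and cache
            fetchCount++;
            clientInstanceId = fetcher.get();
        }
        return clientInstanceId;             // later calls: no network round trip
    }

    public static void main(String[] args) {
        ClientInstanceIdSketch sketch =
                new ClientInstanceIdSketch(true, () -> "b3c5e2f0-instance-id");
        System.out.println(sketch.clientInstanceId(Duration.ofSeconds(5)));
        sketch.clientInstanceId(Duration.ofSeconds(5));
        System.out.println(sketch.fetchCount);  // 1 -- second call hit the cache
    }
}
```

Note the related commit "Store clientInstanceId before returning" in the log above: caching before the return is exactly what makes the second call cheap.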

KafkaConsumer.java
@@ -1427,6 +1427,46 @@ public void resume(Collection<TopicPartition> partitions) {
delegate.resume(partitions);
}

/**
* Add the provided application metric for subscription.
* This metric will be added to this client's metrics
* that are available for subscription and sent as
* telemetry data to the broker.
* The provided metric must map to an OTLP metric data point
* type in the OpenTelemetry v1 metrics protobuf message types.
* Specifically, the metric should be one of the following:
* <ul>
* <li>
* `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
* </li>
* <li>
* `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
* </li>
* </ul>
* Metrics not matching these types are silently ignored.
* Executing this method for a previously registered metric is a benign operation and results in updating that metric's entry.
*
* @param metric The application metric to register
*/
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
delegate.registerMetricForSubscription(metric);
}

/**
* Remove the provided application metric from subscription.
* This metric is removed from this client's metrics
* and will not be available for subscription any longer.
* Executing this method with a metric that has not been registered is a
* benign operation and does not result in any action taken (no-op).
*
* @param metric The application metric to remove
*/
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
delegate.unregisterMetricFromSubscription(metric);
}
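The javadoc above makes two behavioral guarantees: re-registering a known metric updates its existing entry, and unregistering a metric that was never registered is a no-op. Those semantics reduce to plain map operations; `SubscriptionContractSketch` below is hypothetical and only illustrates the documented contract:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Models the two javadoc guarantees with a map keyed by metric name:
// re-registration updates the entry; unknown unregistration is harmless.
public class SubscriptionContractSketch {
    private final Map<String, String> subscribed = new LinkedHashMap<>(); // name -> OTLP point type

    public void register(String name, String otlpType) {
        subscribed.put(name, otlpType);   // add, or update the existing entry
    }

    public void unregister(String name) {
        subscribed.remove(name);          // no-op when the name is absent
    }

    public int size() { return subscribed.size(); }
    public String typeOf(String name) { return subscribed.get(name); }

    public static void main(String[] args) {
        SubscriptionContractSketch store = new SubscriptionContractSketch();
        store.register("bytes.sent", "Sum");     // monotonic counter
        store.register("bytes.sent", "Gauge");   // benign re-registration: entry updated
        store.unregister("never.registered");    // benign no-op
        System.out.println(store.size() + " " + store.typeOf("bytes.sent"));  // 1 Gauge
    }
}
```

In the real client the accepted point types are restricted to the OTLP `Sum` and `Gauge` shapes listed in the javadoc; anything else is silently ignored rather than rejected.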

/**
* Get the set of partitions that were previously paused by a call to {@link #pause(Collection)}.
*
@@ -1706,14 +1746,4 @@ KafkaConsumerMetrics kafkaConsumerMetrics() {
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
return delegate.updateAssignmentMetadataIfNeeded(timer);
}

@Override
public void registerMetricForSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
}
}
AsyncKafkaConsumer.java
@@ -647,12 +647,18 @@ private void updateGroupMetadata(final Optional<Integer> memberEpoch, final Stri…

@Override
public void registerMetricForSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricChange(metric);
}
}

@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricRemoval(metric);
}
}

/**
ClassicKafkaConsumer.java
@@ -430,12 +430,18 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener liste…

@Override
public void registerMetricForSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricChange(metric);
}
}

@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricRemoval(metric);
}
}

@Override
KafkaProducer.java
@@ -57,6 +57,7 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -1300,6 +1301,53 @@ public List<PartitionInfo> partitionsFor(String topic) {
return Collections.unmodifiableMap(this.metrics.metrics());
}


/**
* Add the provided application metric for subscription.
* This metric will be added to this client's metrics
* that are available for subscription and sent as
* telemetry data to the broker.
* The provided metric must map to an OTLP metric data point
* type in the OpenTelemetry v1 metrics protobuf message types.
* Specifically, the metric should be one of the following:
* <ul>
* <li>
* `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
* </li>
* <li>
* `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
* </li>
* </ul>
* Metrics not matching these types are silently ignored.
* Executing this method for a previously registered metric is a benign operation and results in updating that metric's entry.
*
* @param metric The application metric to register
*/
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricChange(metric);
}
}

/**
* Remove the provided application metric from subscription.
* This metric is removed from this client's metrics
* and will not be available for subscription any longer.
* Executing this method with a metric that has not been registered is a
* benign operation and does not result in any action taken (no-op).
*
* @param metric The application metric to remove
*/
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricRemoval(metric);
}
}

/**
* Determines the client's unique client instance ID used for telemetry. This ID is unique to
* this specific client instance and will not change after it is initially generated.