Skip to content

Commit

Permalink
Updates per review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bbejeck committed Sep 13, 2024
1 parent 8da50ae commit 965fd23
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 19 deletions.
16 changes: 11 additions & 5 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -1826,15 +1826,21 @@ default ListShareGroupsResult listShareGroups() {


/**
* An application metric provided for subscription.
* Add the provided application metric for subscription.
* This metric will be added to this client's metrics
* that are available for subscription and sent as
* telemetry data to the broker.
* The provided metric must map to an OTLP metric data point
* type in the OpenTelemetry v1 metrics protobuf message types.
* Specifically, the metric should be one of the following:
* - `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
* - `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
* <ul>
* <li>
* `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
* </li>
* <li>
* `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
* </li>
* </ul>
* Metrics not matching these types are silently ignored.
* Executing this method for a previously registered metric is a benign operation and results in updating that metrics entry.
*
Expand All @@ -1843,11 +1849,11 @@ default ListShareGroupsResult listShareGroups() {
void registerMetricForSubscription(KafkaMetric metric);

/**
* An application to be removed from subscription.
* Remove the provided application metric for subscription.
* This metric is removed from this client's metrics
* and will not be available for subscription any longer.
* Executing this method with a metric that has not been registered is a
* benign operation and does not result in any action taken (no-op)
* benign operation and does not result in any action taken (no-op).
*
* @param metric The application metric to remove
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,10 +562,10 @@ static KafkaAdminClient createInternal(
time,
1,
(int) TimeUnit.HOURS.toMillis(1),
null,
null,
metadataManager.updater(),
(hostResolver == null) ? new DefaultHostResolver() : hostResolver,
null,
null,
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));
return new KafkaAdminClient(config, clientId, time, metadataManager, metrics, networkClient,
timeoutProcessorFactory, logContext, clientTelemetryReporter);
Expand Down Expand Up @@ -704,6 +704,7 @@ public void close(Duration timeout) {
// Wait for the thread to be joined.
thread.join(waitTimeMs);
}
metrics.close();
log.debug("Kafka admin client closed.");
} catch (InterruptedException e) {
log.debug("Interrupted while joining I/O thread", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1526,15 +1526,21 @@ public void resume(Collection<TopicPartition> partitions) {
}

/**
* An application metric provided for subscription.
* Add the provided application metric for subscription.
* This metric will be added to this client's metrics
* that are available for subscription and sent as
* telemetry data to the broker.
* The provided metric must map to an OTLP metric data point
* type in the OpenTelemetry v1 metrics protobuf message types.
* Specifically, the metric should be one of the following:
* - `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
* - `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
* <ul>
* <li>
* `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
* </li>
* <li>
* `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
* </li>
* </ul>
* Metrics not matching these types are silently ignored.
* Executing this method for a previously registered metric is a benign operation and results in updating that metrics entry.
*
Expand All @@ -1546,11 +1552,11 @@ public void registerMetricForSubscription(KafkaMetric metric) {
}

/**
* An application to be removed from subscription.
* Remove the provided application metric for subscription.
* This metric is removed from this client's metrics
* and will not be available for subscription any longer.
* Executing this method with a metric that has not been registered is a
* benign operation and does not result in any action taken (no-op)
* benign operation and does not result in any action taken (no-op).
*
* @param metric The application metric to remove
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1303,15 +1303,21 @@ public List<PartitionInfo> partitionsFor(String topic) {


/**
* An application metric provided for subscription.
* Add the provided application metric for subscription.
* This metric will be added to this client's metrics
* that are available for subscription and sent as
* telemetry data to the broker.
* The provided metric must map to an OTLP metric data point
* type in the OpenTelemetry v1 metrics protobuf message types.
* Specifically, the metric should be one of the following:
* - `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
* - `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
* <ul>
* <li>
* `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
* </li>
* <li>
* `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
* </li>
* </ul>
* Metrics not matching these types are silently ignored.
* Executing this method for a previously registered metric is a benign operation and results in updating that metrics entry.
*
Expand All @@ -1326,11 +1332,11 @@ public void registerMetricForSubscription(KafkaMetric metric) {
}

/**
* An application to be removed from subscription.
* Remove the provided application metric for subscription.
* This metric is removed from this client's metrics
* and will not be available for subscription any longer.
* Executing this method with a metric that has not been registered is a
* benign operation and does not result in any action taken (no-op)
* benign operation and does not result in any action taken (no-op).
*
* @param metric The application metric to remove
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void metricChange(final KafkaMetric metric) {
}
}

boolean isStreamsClientMetric(final KafkaMetric metric) {
private boolean isStreamsClientMetric(final KafkaMetric metric) {
final boolean shouldInclude = metric.metricName().group().equals("stream-metrics");
if (!shouldInclude) {
log.debug("Rejecting thread metric {}", metric.metricName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void metricChange(final KafkaMetric metric) {
}
}

boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric metric) {
private boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric metric) {
final Map<String, String> tags = metric.metricName().tags();
final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && (tags.get(THREAD_ID_TAG).equals(threadId) || tags.get(THREAD_ID_TAG).equals(stateUpdaterThreadId));
if (!shouldInclude) {
Expand Down

0 comments on commit 965fd23

Please sign in to comment.