Skip to content

Commit

Permalink
Fix spotbugs and checkstyle errors
Browse files Browse the repository at this point in the history
  • Loading branch information
bbejeck committed Aug 28, 2024
1 parent 74c2964 commit 4e8ffa7
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ static KafkaAdminClient createInternal(AdminClientConfig config,
metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time);
LogContext logContext = createLogContext(clientId);
return new KafkaAdminClient(config, clientId, time, metadataManager, metrics,
client, null, logContext, null);
client, null, logContext, Optional.empty());
} catch (Throwable exc) {
closeQuietly(metrics, "Metrics");
throw new KafkaException("Failed to create new KafkaAdminClient", exc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1547,7 +1547,7 @@ public void registerMetric(KafkaMetric metric) {
*/
@Override
public void unregisterMetric(KafkaMetric metric) {
delegate.unregisterMetric(metric);
delegate.unregisterMetric(metric);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1326,10 +1326,10 @@ public void registerMetricForSubscription(KafkaMetric metric) {
*/
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricRemoval(metric);
}
if (clientTelemetryReporter.isPresent()) {
ClientTelemetryReporter reporter = clientTelemetryReporter.get();
reporter.metricRemoval(metric);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ public class MockProducer<K, V> implements Producer<K, V> {
/**
* Create a mock producer
*
* @param cluster The cluster holding metadata for this producer
* @param autoComplete If true automatically complete all requests successfully and execute the callback. Otherwise
* the user must call {@link #completeNext()} or {@link #errorNext(RuntimeException)} after
* {@link #send(ProducerRecord) send()} to complete the call and unblock the {@link
* java.util.concurrent.Future Future&lt;RecordMetadata&gt;} that is returned.
* @param partitioner The partition strategy
* @param keySerializer The serializer for key that implements {@link Serializer}.
* @param cluster The cluster holding metadata for this producer
* @param autoComplete If true automatically complete all requests successfully and execute the callback. Otherwise
* the user must call {@link #completeNext()} or {@link #errorNext(RuntimeException)} after
* {@link #send(ProducerRecord) send()} to complete the call and unblock the {@link
* java.util.concurrent.Future Future&lt;RecordMetadata&gt;} that is returned.
* @param partitioner The partition strategy
* @param keySerializer The serializer for key that implements {@link Serializer}.
* @param valueSerializer The serializer for value that implements {@link Serializer}.
*/
public MockProducer(final Cluster cluster,
Expand All @@ -120,7 +120,7 @@ public MockProducer(final Cluster cluster,

/**
* Create a new mock producer with invented metadata the given autoComplete setting and key\value serializers.
*
* <p>
* Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer) new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
*/
@SuppressWarnings("deprecation")
Expand All @@ -132,7 +132,7 @@ public MockProducer(final boolean autoComplete,

/**
* Create a new mock producer with invented metadata the given autoComplete setting and key\value serializers.
*
* <p>
* Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer) new MockProducer(cluster, autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
*/
@SuppressWarnings("deprecation")
Expand All @@ -145,7 +145,7 @@ public MockProducer(final Cluster cluster,

/**
* Create a new mock producer with invented metadata the given autoComplete setting, partitioner and key\value serializers.
*
* <p>
* Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer) new MockProducer(Cluster.empty(), autoComplete, partitioner, keySerializer, valueSerializer)}
*/
public MockProducer(final boolean autoComplete,
Expand All @@ -157,7 +157,7 @@ public MockProducer(final boolean autoComplete,

/**
* Create a new mock producer with invented metadata.
*
* <p>
* Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer) new MockProducer(Cluster.empty(), false, null, null, null)}
*/
public MockProducer() {
Expand Down Expand Up @@ -226,7 +226,7 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
return;
}
Map<TopicPartition, OffsetAndMetadata> uncommittedOffsets =
this.uncommittedConsumerGroupOffsets.computeIfAbsent(groupMetadata.groupId(), k -> new HashMap<>());
this.uncommittedConsumerGroupOffsets.computeIfAbsent(groupMetadata.groupId(), k -> new HashMap<>());
uncommittedOffsets.putAll(offsets);
this.sentOffsets = true;
}
Expand Down Expand Up @@ -336,7 +336,7 @@ public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record, Cal
keySerializer.serialize(record.topic(), record.key());
valueSerializer.serialize(record.topic(), record.value());
}

TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
ProduceRequestResult result = new ProduceRequestResult(topicPartition);
FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP,
Expand Down Expand Up @@ -504,7 +504,6 @@ public synchronized List<ProducerRecord<K, V>> uncommittedRecords() {
}

/**
*
* Get the list of committed consumer group offsets since the last call to {@link #clear()}
*/
public synchronized List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> consumerGroupOffsetsHistory() {
Expand Down Expand Up @@ -611,11 +610,11 @@ public void complete(RuntimeException e) {

@Override
public void registerMetricForSubscription(KafkaMetric metric) {
addedMetrics.add(metric);
addedMetrics.add(metric);
}

@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
addedMetrics.remove(metric);
addedMetrics.remove(metric);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
Expand Down Expand Up @@ -66,14 +65,11 @@
import org.slf4j.Logger;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
Expand Down

0 comments on commit 4e8ffa7

Please sign in to comment.