diff --git a/build.gradle b/build.gradle index 55d670209e..0eec952889 100644 --- a/build.gradle +++ b/build.gradle @@ -48,7 +48,7 @@ allprojects { targetCompatibility = JavaVersion.VERSION_17 project.ext.versions = [ - kafka : '2.8.2', + kafka : '3.6.2', guava : '23.0', jackson : '2.15.2', jersey : '3.1.2', diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FailFastLocalKafkaProducerProperties.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FailFastLocalKafkaProducerProperties.java index d54456e0d7..7d5b24ed02 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FailFastLocalKafkaProducerProperties.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FailFastLocalKafkaProducerProperties.java @@ -33,6 +33,8 @@ public class FailFastLocalKafkaProducerProperties implements KafkaProducerParame private boolean reportNodeMetricsEnabled = false; + private boolean idempotenceEnabled = false; + @Override public Duration getMaxBlock() { return maxBlock; @@ -158,4 +160,12 @@ public Duration getDeliveryTimeout() { public void setDeliveryTimeout(Duration deliveryTimeout) { this.deliveryTimeout = deliveryTimeout; } + + public boolean isIdempotenceEnabled() { + return idempotenceEnabled; + } + + public void setIdempotenceEnabled(boolean idempotenceEnabled) { + this.idempotenceEnabled = idempotenceEnabled; + } } diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FailFastRemoteKafkaProducerProperties.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FailFastRemoteKafkaProducerProperties.java index a35722f5d4..1265f343bb 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FailFastRemoteKafkaProducerProperties.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/FailFastRemoteKafkaProducerProperties.java @@ -33,6 +33,8 @@ public class FailFastRemoteKafkaProducerProperties implements KafkaProducerParam private boolean reportNodeMetricsEnabled = false; + private boolean idempotenceEnabled = false; + @Override public Duration getMaxBlock() { return maxBlock; @@ -158,4 +160,13 @@ public Duration getDeliveryTimeout() { public void setDeliveryTimeout(Duration deliveryTimeout) { this.deliveryTimeout = deliveryTimeout; } + + @Override + public boolean isIdempotenceEnabled() { + return idempotenceEnabled; + } + + public void setIdempotenceEnabled(boolean idempotenceEnabled) { + this.idempotenceEnabled = idempotenceEnabled; + } } diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/KafkaProducerProperties.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/KafkaProducerProperties.java index 8bdb084e47..34e97143e3 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/KafkaProducerProperties.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/config/KafkaProducerProperties.java @@ -36,6 +36,8 @@ public class KafkaProducerProperties implements KafkaProducerParameters { private boolean reportNodeMetricsEnabled = false; + private boolean idempotenceEnabled = false; + @Override public Duration getMaxBlock() { return maxBlock; @@ -161,4 +163,12 @@ public Duration getDeliveryTimeout() { public void setDeliveryTimeout(Duration deliveryTimeout) { this.deliveryTimeout = deliveryTimeout; } + + public boolean isIdempotenceEnabled() { + return idempotenceEnabled; + } + + public void setIdempotenceEnabled(boolean idempotenceEnabled) { + this.idempotenceEnabled = idempotenceEnabled; + } } diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSender.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSender.java index 934b7ef66d..b9c53dc048 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSender.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSender.java @@ -142,8 +142,12 @@ private double findProducerMetric(Producer producer, Predicate> predicate) { Optional> first = producer.metrics().entrySet().stream().filter(predicate).findFirst(); - double value = first.map(metricNameEntry -> metricNameEntry.getValue().value()).orElse(0.0); - return value < 0 ? 0.0 : value; + Object value = first.map(metricNameEntry -> metricNameEntry.getValue().metricValue()).orElse(0.0d); + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } else { + return 0.0; + } } private ToDoubleFunction> producerMetric(MetricName producerMetricName) { diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSendersFactory.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSendersFactory.java index 7636d4ecb5..b03b8bc387 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSendersFactory.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaMessageSendersFactory.java @@ -18,6 +18,7 @@ import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG; @@ -114,6 +115,7 @@ private KafkaMessageSender sender(KafkaParameters kafkaParameter props.put(LINGER_MS_CONFIG, (int) kafkaProducerParameters.getLinger().toMillis()); props.put(METRICS_SAMPLE_WINDOW_MS_CONFIG, (int) kafkaProducerParameters.getMetricsSampleWindow().toMillis()); props.put(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, kafkaProducerParameters.getMaxInflightRequestsPerConnection()); + props.put(ENABLE_IDEMPOTENCE_CONFIG, kafkaProducerParameters.isIdempotenceEnabled()); props.put(ACKS_CONFIG, acks); if (kafkaParameters.isAuthenticationEnabled()) { diff --git a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaProducerParameters.java b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaProducerParameters.java index 79454b522f..c0caa3913c 100644 --- a/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaProducerParameters.java +++ b/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/KafkaProducerParameters.java @@ -31,4 +31,6 @@ public interface KafkaProducerParameters { int getMaxInflightRequestsPerConnection(); boolean isReportNodeMetricsEnabled(); + + boolean isIdempotenceEnabled(); } diff --git a/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/producer/kafka/LocalDatacenterMessageProducerIntegrationTest.groovy b/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/producer/kafka/LocalDatacenterMessageProducerIntegrationTest.groovy index f7b9d91c1b..edbab24478 100644 --- a/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/producer/kafka/LocalDatacenterMessageProducerIntegrationTest.groovy +++ b/hermes-frontend/src/test/groovy/pl/allegro/tech/hermes/frontend/producer/kafka/LocalDatacenterMessageProducerIntegrationTest.groovy @@ -13,11 +13,7 @@ import org.apache.kafka.common.TopicPartition import org.testcontainers.containers.KafkaContainer import org.testcontainers.containers.wait.strategy.Wait import org.testcontainers.spock.Testcontainers -import pl.allegro.tech.hermes.api.ContentType -import pl.allegro.tech.hermes.api.DeliveryType -import pl.allegro.tech.hermes.api.Subscription -import pl.allegro.tech.hermes.api.SubscriptionMode -import pl.allegro.tech.hermes.api.Topic +import pl.allegro.tech.hermes.api.* import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId import pl.allegro.tech.hermes.common.kafka.JsonToAvroMigrationKafkaNamesMapper import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper @@ -33,6 +29,7 @@ import pl.allegro.tech.hermes.frontend.server.CachedTopicsTestHelper import pl.allegro.tech.hermes.metrics.PathsCompiler import pl.allegro.tech.hermes.test.helper.avro.AvroUser import pl.allegro.tech.hermes.test.helper.builder.TopicBuilder +import pl.allegro.tech.hermes.test.helper.containers.ImageTags import spock.lang.Shared import spock.lang.Specification @@ -42,10 +39,7 @@ import java.util.stream.Collectors import static java.util.Collections.emptyList import static java.util.Collections.emptyMap import static java.util.concurrent.TimeUnit.MILLISECONDS -import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG -import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG +import static org.apache.kafka.clients.consumer.ConsumerConfig.* import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG @@ -58,7 +52,7 @@ class LocalDatacenterMessageProducerIntegrationTest extends Specification { BrokerLatencyReporter brokerLatencyReporter = new BrokerLatencyReporter(false, null, null, null) @Shared - KafkaContainer kafkaContainer = new KafkaContainer() + KafkaContainer kafkaContainer = new KafkaContainer(ImageTags.confluentImagesTag()) @Shared KafkaProducer leaderConfirms @@ -161,34 +155,6 @@ class LocalDatacenterMessageProducerIntegrationTest extends Specification { partitionsWithMessagesData.get(0).offset() == 10 } - def "should publish messages with random distribiution when pratition-key is not present"() { - Topic topic = createAvroTopic("pl.allegro.test.randomFoo") - Subscription subscription = createTestSubscription(topic, "test-subscription") - String kafkaTopicName = topic.getName().toString() - ConsumerGroupId consumerGroupId = kafkaNamesMapper.toConsumerGroupId(subscription.qualifiedName) - createTopicInKafka(kafkaTopicName, NUMBER_OF_PARTITION) - CachedTopic cachedTopic = CachedTopicsTestHelper.cachedTopic(topic) - KafkaConsumer consumer = createConsumer(consumerGroupId, kafkaTopicName) - - when: - 1.upto(10) { - brokerMessageProducer.send(generateAvroMessage(null), cachedTopic, null) - waitForRecordPublishing(consumer) - } - - then: - consumer.close() - - List partitionsWithMessagesData = adminClient - .listConsumerGroupOffsets(consumerGroupId.asString()) - .partitionsToOffsetAndMetadata() - .get().values().stream() - .filter { metadata -> metadata.offset() != 0 } - .collect(Collectors.toList()) - - partitionsWithMessagesData.size() == NUMBER_OF_PARTITION - } - private static AvroMessage generateAvroMessage(String partitionKey) { def avroUser = new AvroUser() return new AvroMessage(UUID.randomUUID().toString(), avroUser.asBytes(), 0L, avroUser.compiledSchema, diff --git a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/containers/ImageTags.java b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/containers/ImageTags.java index 82d59230a5..d140544320 100644 --- a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/containers/ImageTags.java +++ b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/containers/ImageTags.java @@ -1,7 +1,7 @@ package pl.allegro.tech.hermes.test.helper.containers; public class ImageTags { - static String confluentImagesTag() { + public static String confluentImagesTag() { return System.getProperty("confluentImagesTag", "6.1.0"); } } diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java index f5ffb06549..883e408a87 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import java.util.function.Predicate; +import java.util.stream.Collectors; import static com.jayway.awaitility.Awaitility.waitAtMost; import static java.util.stream.IntStream.range; @@ -84,7 +85,7 @@ public void shouldMoveOffsetInDryRunMode() throws InterruptedException { TestSubscriber subscriber = subscribers.createSubscriber(); Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); final Subscription subscription = hermes.initHelper().createSubscription(subscriptionWithRandomName(topic.getName(), subscriber.getEndpoint()).build()); - // we have 2 partitions, thus 4 messages to get 2 per partition + // 4 messages publishAndConsumeMessages(messages, topic, subscriber); Thread.sleep(2000); final OffsetRetransmissionDate retransmissionDate = new OffsetRetransmissionDate(OffsetDateTime.now()); @@ -100,8 +101,9 @@ public void shouldMoveOffsetInDryRunMode() throws InterruptedException { MultiDCOffsetChangeSummary summary = response.expectBody(MultiDCOffsetChangeSummary.class).returnResult().getResponseBody(); assert summary != null; - assertThat(summary.getPartitionOffsetListPerBrokerName().get(PRIMARY_KAFKA_CLUSTER_NAME).get(0).getOffset()) - .isEqualTo(2); + Long offsetSum = summary.getPartitionOffsetListPerBrokerName().get(PRIMARY_KAFKA_CLUSTER_NAME).stream() + .collect(Collectors.summarizingLong(PartitionOffset::getOffset)).getSum(); + assertThat(offsetSum).isEqualTo(4); subscriber.noMessagesReceived(); }