diff --git a/core/src/main/mima-filters/1.1.x.backwards.excludes/kafka-clients-4.0-upgrade.excludes b/core/src/main/mima-filters/1.1.x.backwards.excludes/kafka-clients-4.0-upgrade.excludes
new file mode 100644
index 00000000..451d90a3
--- /dev/null
+++ b/core/src/main/mima-filters/1.1.x.backwards.excludes/kafka-clients-4.0-upgrade.excludes
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Method that was removed from kafka-clients 4.0
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.RestrictedConsumer.committed")
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
index 55006009..8387fa95 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
@@ -68,7 +68,7 @@ object ConsumerResetProtection {
         .toMap
         .asJava

-      new ConsumerRecords[K, V](safe)
+      new ConsumerRecords[K, V](safe, java.util.Collections.emptyMap())
     }

   /**
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
index b837dd42..848f5b36 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala
@@ -445,6 +445,7 @@ import scala.util.control.NonFatal
     progressTracker
   }

+  @nowarn("msg=deprecated")
   override def postStop(): Unit = {
     // reply to outstanding requests is important if the actor is restarted
     requests.foreach {
diff --git a/docs/src/main/paradox/serialization.md b/docs/src/main/paradox/serialization.md
index 02b5b696..f6ca3079 100644
--- a/docs/src/main/paradox/serialization.md
+++ b/docs/src/main/paradox/serialization.md
@@ -66,7 +66,7 @@ Maven
   <dependency>
     <groupId>io.confluent</groupId>
     <artifactId>kafka-avro-serializer</artifactId>
-    <version>confluent.version (eg. 7.9.2)</version>
+    <version>confluent.version (eg. 8.0.0)</version>
   </dependency>
   ...
@@ -84,14 +84,14 @@ Maven
 sbt
 : ```scala
-    libraryDependencies += "io.confluent" % "kafka-avro-serializer" % confluentAvroVersion, // eg. 7.9.2
+    libraryDependencies += "io.confluent" % "kafka-avro-serializer" % confluentAvroVersion, // eg. 8.0.0
     resolvers += "Confluent Maven Repository" at "https://packages.confluent.io/maven/",
     ```

 Gradle
 : ```gradle
     dependencies {
-      compile group: 'io.confluent', name: 'kafka-avro-serializer', version: confluentAvroVersion // eg. 7.9.2
+      compile group: 'io.confluent', name: 'kafka-avro-serializer', version: confluentAvroVersion // eg. 8.0.0
     }
     repositories {
       maven {
@@ -103,7 +103,7 @@ Gradle

 ### Producer

-To create serializers that use the Schema Registry, its URL needs to be provided as configuration `AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` to the serializer and that serializer is used in the @apidoc[ProducerSettings$].
+To create serializers that use the Schema Registry, its URL needs to be provided as configuration `AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` to the serializer and that serializer is used in the @apidoc[ProducerSettings$].

 Scala
 : @@ snip [snip](/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala) { #imports #serializer }

@@ -115,7 +115,7 @@ Java

 ### Consumer

-To create deserializers that use the Schema Registry, its URL needs to be provided as configuration `AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` to the deserializer and that deserializer is used in the @apidoc[ConsumerSettings$].
+To create deserializers that use the Schema Registry, its URL needs to be provided as configuration `AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` to the deserializer and that deserializer is used in the @apidoc[ConsumerSettings$].

 Scala
 : @@ snip [snip](/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala) { #imports #de-serializer }
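The `serialization.md` hunks above track Confluent's rename of `AbstractKafkaAvroSerDeConfig` to `AbstractKafkaSchemaSerDeConfig`. A minimal sketch of the documented wiring with the renamed class — the method name is hypothetical, and the cast mirrors the one used in `SchemaRegistrySerializationSpec` further down in this diff:

```scala
import io.confluent.kafka.serializers.{ AbstractKafkaSchemaSerDeConfig, KafkaAvroSerializer }
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.common.serialization.{ Serializer, StringSerializer }
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.ProducerSettings

import scala.jdk.CollectionConverters._

// Hypothetical helper (not part of this PR): configure a Schema Registry backed
// value serializer and hand it to ProducerSettings.
def avroProducerSettings(system: ActorSystem, schemaRegistryUrl: String): ProducerSettings[String, SpecificRecord] = {
  val serDeConfig = Map[String, Any](
    AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl)
  val kafkaAvroSerializer = new KafkaAvroSerializer()
  kafkaAvroSerializer.configure(serDeConfig.asJava, false) // isKey = false: value serializer
  val serializer = kafkaAvroSerializer.asInstanceOf[Serializer[SpecificRecord]]
  ProducerSettings(system, new StringSerializer, serializer)
}
```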
diff --git a/project/Versions.scala b/project/Versions.scala
index afd691f9..c56d586a 100644
--- a/project/Versions.scala
+++ b/project/Versions.scala
@@ -22,7 +22,7 @@ object Versions {
   val pekkoConnectorsKafkaVersionForDocs = "current"
   val pekkoManagementVersionForDocs = "current"

-  val kafkaVersion = "3.9.1"
+  val kafkaVersion = "4.1.0"
   val KafkaVersionForDocs = "37"

   val scalaTestVersion = "3.2.19"
@@ -33,7 +33,7 @@ object Versions {
   // this depends on Kafka, and should be upgraded to such latest version
   // that depends on the same Kafka version, as is defined above
   // See https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer?repo=confluent-packages
-  val confluentAvroSerializerVersion = "7.9.2"
+  val confluentAvroSerializerVersion = "8.0.0"
   val confluentLibsExclusionRules = Seq(
     ExclusionRule("log4j", "log4j"),
     ExclusionRule("org.slf4j", "slf4j-log4j12"),
diff --git a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
index 3ae23870..1f77271b 100644
--- a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
+++ b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/scaladsl/KafkaSpec.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.clients.producer.{ Producer => KProducer, ProducerRecord
 import org.apache.kafka.common.ConsumerGroupState
 import org.slf4j.{ Logger, LoggerFactory }

+import scala.annotation.nowarn
 import scala.collection.immutable
 import scala.concurrent.duration._
 import scala.concurrent.{ Await, ExecutionContext, Future }
@@ -119,6 +120,7 @@ abstract class KafkaSpec(_kafkaPort: Int, val zooKeeperPort: Int, actorSystem: A
    *
    * If the predicate does not hold after configured amount of time, throws an exception.
    */
+  @nowarn("cat=deprecation")
   def waitUntilConsumerSummary(groupId: String)(predicate: PartialFunction[List[MemberDescription], Boolean]): Unit =
     waitUntilConsumerGroup(groupId) { group =>
       group.state() == ConsumerGroupState.STABLE &&
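`ConsumerGroupState` appears to be deprecated in kafka-clients 4.x (superseded by `GroupState`), so rather than migrating the testkit API, the hunk above keeps the old type and silences the warning. Note the two suppression styles in this PR: `@nowarn("cat=deprecation")` filters by warning category, while `@nowarn("msg=deprecated")` (used in `KafkaConsumerActor` and `ConsumerMock`) filters by message text. A standalone sketch of the pattern, with a hypothetical method name:

```scala
import scala.annotation.nowarn

import org.apache.kafka.common.ConsumerGroupState

// Keep using the deprecated enum, but suppress the deprecation warning
// at the narrowest possible scope: the single method that touches it.
@nowarn("cat=deprecation")
def isStable(state: ConsumerGroupState): Boolean =
  state == ConsumerGroupState.STABLE
```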
diff --git a/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala b/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
index e43ea558..5f643fb1 100644
--- a/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
+++ b/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala
@@ -35,7 +35,7 @@ import scala.collection.immutable
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
 // #imports
-import io.confluent.kafka.serializers.{ AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer, KafkaAvroSerializer }
+import io.confluent.kafka.serializers.{ AbstractKafkaSchemaSerDeConfig, KafkaAvroDeserializer, KafkaAvroSerializer }
 import org.apache.avro.specific.SpecificRecord
 // #imports
 import org.apache.kafka.clients.consumer.ConsumerConfig
@@ -59,7 +59,7 @@ class SchemaRegistrySerializationSpec extends DocsSpecBase with TestcontainersKa

   // #serializer #de-serializer
   val kafkaAvroSerDeConfig = Map[String, Any](
-    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
+    AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
     KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> true.toString)
   // #serializer #de-serializer
@@ -217,7 +217,7 @@ class SchemaRegistrySerializationSpec extends DocsSpecBase with TestcontainersKa
   private def specificRecordConsumerSettings(group: String): ConsumerSettings[String, SpecificRecord] = {
     val kafkaAvroSerDeConfig = Map[String, Any] {
-      AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl
+      AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl
     }
     val kafkaAvroDeserializer = new KafkaAvroDeserializer()
     kafkaAvroDeserializer.configure(kafkaAvroSerDeConfig.asJava, false)
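The next file touches every `MockProducer` construction: the three-argument constructor appears to have been dropped in kafka-clients 4.x, and the closest remaining overload also takes a `Partitioner`. Passing `None.orNull` yields a typed `null`, which keeps the mock's default partitioning. A self-contained sketch of the new call shape:

```scala
import org.apache.kafka.clients.producer.{ MockProducer, ProducerRecord }
import org.apache.kafka.common.serialization.StringSerializer

// autoComplete = true: every send() completes immediately;
// None.orNull supplies a null Partitioner, i.e. default partitioning.
val producer = new MockProducer[String, String](
  true,
  None.orNull,
  new StringSerializer,
  new StringSerializer)

producer.send(new ProducerRecord("topic", "key", "value"))
assert(producer.history().size == 1) // the mock records what was sent
```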
consumer.message(partition, "value 1"), consumer.message(partition, "value 2")) - val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val committerSettings = CommitterSettings(system).withMaxBatch(2L).withMaxInterval(10.seconds) @@ -191,7 +191,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) consumer.message(partition, "value 1"), consumer.message(partition, "value 2")) - val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val committerSettings = CommitterSettings(system).withMaxBatch(2L).withMaxInterval(10.seconds) @@ -226,7 +226,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) val producerRecordsPerInput = 2 val totalProducerRecords = elements.size * producerRecordsPerInput - val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val committerSettings = CommitterSettings(system).withMaxBatch(elements.size.longValue()) @@ -259,7 +259,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) val consumer = FakeConsumer(groupId, topic, startOffset = 1616L) val message = consumer.message(partition, "increment the offset") - val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val committerSettings = CommitterSettings(system).withMaxBatch(1) @@ -292,7 +292,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) consumer.message(partition, "value 1"), consumer.message(partition, "value 2")) - val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) val committerSettings = CommitterSettings(system) @@ -328,7 +328,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) consumer.message(partition, "value 1"), consumer.message(partition, "value 2")) - val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer) .withProducer(producer) // choose a large commit interval so that completion happens before @@ -364,7 +364,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem) consumer.message(partition, "value 1"), consumer.message(partition, "value 2")) - val producer = new 
@@ -404,7 +404,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))

-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val commitInterval = 5.seconds
@@ -441,7 +441,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 2"))

     // this producer does not auto complete messages
-    val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](false, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxBatch(1L)
@@ -480,7 +480,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))

-    val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](false, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxBatch(1L)
@@ -523,7 +523,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))

-    val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](false, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     // choose a large commit interval so that completion happens before
@@ -569,7 +569,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))

-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxBatch(2L)
@@ -602,7 +602,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))

-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxBatch(2L)
@@ -640,7 +640,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
       consumer.message(partition, "value 1"),
       consumer.message(partition, "value 2"))

-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system)
@@ -674,7 +674,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
   }

   it should "shut down without elements" in assertAllStagesStopped {
-    val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+    val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
     val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
       .withProducer(producer)
     val committerSettings = CommitterSettings(system).withMaxInterval(1.second)
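The `ConsumerDummy` changes below mirror the 4.x `Consumer` interface: the long-deprecated `poll(long)` and single-partition `committed(TopicPartition)` overloads are gone (hence the MiMa exclude for `RestrictedConsumer.committed` at the top of this diff), while `close(CloseOptions)`, the `SubscriptionPattern` subscribe overloads, and the metric (un)registration hooks are new. Call sites of the removed `committed` overload migrate to the `Set`-based variant; roughly, with a hypothetical helper name:

```scala
import java.util.Collections

import org.apache.kafka.clients.consumer.{ Consumer, OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition

// The pre-4.0 call `consumer.committed(tp)` becomes a single-element Set
// lookup; the returned map value may be null if nothing was committed.
def committedOffset[K, V](consumer: Consumer[K, V], tp: TopicPartition): Option[OffsetAndMetadata] =
  Option(consumer.committed(Collections.singleton(tp)).get(tp))
```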
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
index 93073673..9d38fa7f 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerDummy.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.pekko.Done
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo, TopicPartition }
+import org.apache.kafka.common.metrics.KafkaMetric
 import org.slf4j.{ Logger, LoggerFactory }

 import scala.concurrent.Promise
@@ -50,7 +51,6 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
   override def subscribe(pattern: java.util.regex.Pattern, callback: ConsumerRebalanceListener): Unit = ???
   override def subscribe(pattern: java.util.regex.Pattern): Unit = ???
   override def unsubscribe(): Unit = ???
-  override def poll(timeout: Long): ConsumerRecords[K, V] = ???
   override def commitSync(): Unit = ???
   override def commitSync(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit = ???
   override def commitAsync(): Unit = ???
@@ -63,7 +63,6 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
   override def seekToEnd(partitions: java.util.Collection[TopicPartition]): Unit = ???
   override def position(partition: TopicPartition): Long = ???
   override def position(partition: TopicPartition, timeout: java.time.Duration): Long = ???
-  override def committed(partition: TopicPartition): OffsetAndMetadata = ???
   override def metrics(): java.util.Map[MetricName, _ <: Metric] = ???
   override def partitionsFor(topic: String): java.util.List[PartitionInfo] = ???
   override def listTopics(): java.util.Map[String, java.util.List[PartitionInfo]] = ???
@@ -81,13 +80,13 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
   override def endOffsets(
       partitions: java.util.Collection[TopicPartition]): java.util.Map[TopicPartition, java.lang.Long] = ???
   override def close(): Unit = {}
+  override def close(options: CloseOptions): Unit = {}
   override def close(timeout: java.time.Duration): Unit = {}
   override def wakeup(): Unit = ???
   override def commitSync(timeout: java.time.Duration): Unit = ???
   override def commitSync(offsets: java.util.Map[TopicPartition, OffsetAndMetadata], timeout: java.time.Duration): Unit = ???
-  override def committed(partition: TopicPartition, timeout: java.time.Duration): OffsetAndMetadata = ???
   override def committed(partitions: util.Set[TopicPartition]): util.Map[TopicPartition, OffsetAndMetadata] = ???
   override def committed(partitions: util.Set[TopicPartition],
       timeout: Duration): util.Map[TopicPartition, OffsetAndMetadata] = ???
@@ -102,4 +101,11 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
   override def groupMetadata(): ConsumerGroupMetadata = ???
   override def enforceRebalance(): Unit = ???
   override def currentLag(partition: TopicPartition): java.util.OptionalLong = ???
+
+  override def subscribe(sp: SubscriptionPattern): Unit = ???
+  override def subscribe(sp: SubscriptionPattern, listener: ConsumerRebalanceListener): Unit = ???
+
+  override def registerMetricForSubscription(metric: KafkaMetric): Unit = ???
+  override def unregisterMetricFromSubscription(metric: KafkaMetric): Unit = ???
+
 }
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
index e922a8cd..05fa441e 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerMock.scala
@@ -26,6 +26,7 @@ import org.mockito.stubbing.Answer
 import org.mockito.verification.VerificationMode
 import org.mockito.{ ArgumentMatchers, Mockito }

+import scala.annotation.nowarn
 import scala.collection.immutable.Seq
 import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
@@ -113,7 +114,7 @@ class ConsumerMock[K, V](handler: ConsumerMock.CommitHandler = new ConsumerMock.
           if (releaseCommitCallbacks.get()) {
             handler.onComplete()
           }
-          new ConsumerRecords[K, V](records.asJava)
+          new ConsumerRecords[K, V](records.asJava, java.util.Collections.emptyMap())
         }
       })
     Mockito
@@ -168,6 +169,7 @@ class ConsumerMock[K, V](handler: ConsumerMock.CommitHandler = new ConsumerMock.
     responses :+= records
   }

+  @nowarn("msg=deprecated")
   def verifyClosed(mode: VerificationMode = Mockito.times(1)) =
     verify(mock, mode).close(ConsumerMock.closeTimeout.toJava)

@@ -207,7 +209,9 @@ class FailingConsumerMock[K, V](throwable: Throwable, failOnCallNumber: Int*) ex
         callNumber = callNumber + 1
         if (failOnCallNumber.contains(callNumber))
           throw throwable
-        else new ConsumerRecords[K, V](Map.empty[TopicPartition, java.util.List[ConsumerRecord[K, V]]].asJava)
+        else new ConsumerRecords[K, V](
+          Map.empty[TopicPartition, java.util.List[ConsumerRecord[K, V]]].asJava,
+          java.util.Collections.emptyMap())
       }
     })
 }
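Both mock consumers above now use the two-argument `ConsumerRecords` constructor: the single-map constructor appears to be retired in kafka-clients 4.x, and the second argument carries per-partition next-offset metadata that these tests never read, so an empty map suffices. The same inline pattern repeats in the remaining specs below; a small helper could centralise it — a sketch, with a hypothetical helper name that is not part of this PR:

```scala
import java.util.Collections

import org.apache.kafka.clients.consumer.{ ConsumerRecord, ConsumerRecords }
import org.apache.kafka.common.TopicPartition

import scala.jdk.CollectionConverters._

// Build ConsumerRecords from Scala collections, passing an empty
// next-offsets map since the tests never inspect that metadata.
def consumerRecords[K, V](data: Map[TopicPartition, Seq[ConsumerRecord[K, V]]]): ConsumerRecords[K, V] =
  new ConsumerRecords[K, V](
    data.map { case (tp, recs) => tp -> recs.asJava }.asJava,
    Collections.emptyMap())
```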
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
index eca227ad..48a68b92 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerProgressTrackingSpec.scala
@@ -30,7 +30,7 @@ class ConsumerProgressTrackingSpec extends AnyFlatSpecLike with Matchers with Lo
   private val tp = new TopicPartition("t", 0)
   private val m1 = new ConsumerRecord[String, String](tp.topic(), tp.partition(), 10L, "k1", "kv")
   def asConsumerRecords[K, V](tp: TopicPartition, records: ConsumerRecord[K, V]*): ConsumerRecords[K, V] = {
-    new ConsumerRecords[K, V](Map(tp -> records.asJava).asJava)
+    new ConsumerRecords[K, V](Map(tp -> records.asJava).asJava, java.util.Collections.emptyMap())
   }
   private val records = asConsumerRecords(tp, m1)
@@ -86,7 +86,7 @@ class ConsumerProgressTrackingSpec extends AnyFlatSpecLike with Matchers with Lo
       new ConsumerRecords[String, String](
         Map(
           tp2 -> List(new ConsumerRecord[String, String](tp2.topic(), tp2.partition(), 10L, "k1",
-            "kv")).asJava).asJava))
+            "kv")).asJava).asJava, java.util.Collections.emptyMap()))
     tracker.receivedMessages.map(extractOffsetFromSafe) should be(Map(tp -> 10L))
     // no change to the committing
     tracker.commitRequested.map(extractOffset) should be(Map(tp -> 0L))
"kv")).asJava).asJava)) + new ConsumerRecord(tp.topic(), tp.partition(), 102L, "k1", "kv")).asJava).asJava, + java.util.Collections.emptyMap())) records.count() should be(3) records.records(tp).asScala.map(_.offset()) should be(Seq(101L, 1L, 102L)) } diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala index 36158400..24d014f9 100644 --- a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala +++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala @@ -815,7 +815,7 @@ object PartitionedSourceSpec { if (data2.nonEmpty) { log.debug(s"poll result $data2") } - new ConsumerRecords[K, V](data2.asJava) + new ConsumerRecords[K, V](data2.asJava, java.util.Collections.emptyMap()) } override def position(partition: TopicPartition): Long = 0 override def position(partition: TopicPartition, timeout: java.time.Duration): Long = 0 diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala index d6a0ec04..12e93360 100644 --- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala +++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala @@ -158,7 +158,7 @@ class ProducerSpec(_system: ActorSystem) assertAllStagesStopped { val input = (1 to 10).map { recordAndMetadata(_)._1 } - val mockProducer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer) + val mockProducer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer) val fut: Future[Done] = Source(input).runWith(Producer.plainSink(settings.withProducer(mockProducer)))