@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Method that was removed in kafka-clients 4.0
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.kafka.RestrictedConsumer.committed")
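
The exclusion above is a MiMa (Migration Manager) filter: kafka-clients 4.0 removed the single-partition `committed` overloads from its `Consumer` interface, so the wrapping method `RestrictedConsumer.committed` had to go as well, and the binary-compatibility check is told that this breakage is intentional. A minimal migration sketch for affected callers, assuming they move to the `Set`-based overload that survives in 4.x (the helper name is hypothetical):

```scala
// Hypothetical helper, not part of this PR: look up a committed offset
// through the Set-based API that kafka-clients 4.x retains.
import org.apache.kafka.clients.consumer.{ Consumer, OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters._

def committedOffset[K, V](consumer: Consumer[K, V], tp: TopicPartition): Option[OffsetAndMetadata] =
  // the returned map can contain null values for partitions with no committed offset
  consumer.committed(java.util.Set.of(tp)).asScala.get(tp).flatMap(Option(_))
```
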
@@ -68,7 +68,7 @@ object ConsumerResetProtection {
.toMap
.asJava

-new ConsumerRecords[K, V](safe)
+new ConsumerRecords[K, V](safe, java.util.Collections.emptyMap())
}

/**
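
The change above tracks the kafka-clients 4.x `ConsumerRecords` constructor, which takes a second map carrying the next fetch offset per partition; passing an empty map preserves the old behaviour when those offsets are not tracked. A self-contained sketch of the two shapes (topic and records are illustrative):

```scala
// Sketch: constructing ConsumerRecords against kafka-clients 4.x.
import java.util.Collections
import org.apache.kafka.clients.consumer.{ ConsumerRecord, ConsumerRecords, OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition

val tp = new TopicPartition("topic", 0) // illustrative partition
val byPartition: java.util.Map[TopicPartition, java.util.List[ConsumerRecord[String, String]]] =
  Collections.singletonMap(tp, Collections.emptyList())

// 3.x: new ConsumerRecords(byPartition)
// 4.x: a map of next offsets is also required; empty is fine when they are unknown
val records = new ConsumerRecords[String, String](
  byPartition, Collections.emptyMap[TopicPartition, OffsetAndMetadata]())
```
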
@@ -445,6 +445,7 @@ import scala.util.control.NonFatal
progressTracker
}

@nowarn("msg=deprecated")
override def postStop(): Unit = {
// reply to outstanding requests is important if the actor is restarted
requests.foreach {
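
The new annotation suppresses a deprecation warning that the upgrade surfaces inside `postStop`, keeping warning-as-error builds green. A self-contained sketch of the mechanism (the deprecated method here is made up):

```scala
// Sketch: silencing one deprecation warning with scala.annotation.nowarn,
// matched on the warning message so unrelated warnings still fail the build.
import scala.annotation.nowarn

@deprecated("use newShutdownHook", "4.0")
def oldShutdownHook(): Unit = ()

@nowarn("msg=deprecated")
def postStopLike(): Unit =
  oldShutdownHook() // deliberate call; suppression is scoped to this method only
```
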
docs/src/main/paradox/serialization.md (10 changes: 5 additions & 5 deletions)
@@ -66,7 +66,7 @@ Maven
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
-<version>confluent.version (eg. 7.9.2)</version>
+<version>confluent.version (eg. 8.0.0)</version>
</dependency>
...
</dependencies>
@@ -84,14 +84,14 @@ Maven

sbt
: ```scala
-libraryDependencies += "io.confluent" % "kafka-avro-serializer" % confluentAvroVersion, // eg. 7.9.2
+libraryDependencies += "io.confluent" % "kafka-avro-serializer" % confluentAvroVersion, // eg. 8.0.0
resolvers += "Confluent Maven Repository" at "https://packages.confluent.io/maven/",
```

Gradle
: ```gradle
dependencies {
-compile group: 'io.confluent', name: 'kafka-avro-serializer', version: confluentAvroVersion // eg. 7.9.2
+compile group: 'io.confluent', name: 'kafka-avro-serializer', version: confluentAvroVersion // eg. 8.0.0
}
repositories {
maven {
@@ -103,7 +103,7 @@ Gradle

### Producer

-To create serializers that use the Schema Registry, its URL needs to be provided as configuration `AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` to the serializer and that serializer is used in the @apidoc[ProducerSettings$].
+To create serializers that use the Schema Registry, its URL needs to be provided as configuration `AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` to the serializer and that serializer is used in the @apidoc[ProducerSettings$].

Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala) { #imports #serializer }
@@ -115,7 +115,7 @@ Java

### Consumer

-To create deserializers that use the Schema Registry, its URL needs to be provided as configuration `AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` to the deserializer and that deserializer is used in the @apidoc[ConsumerSettings$].
+To create deserializers that use the Schema Registry, its URL needs to be provided as configuration `AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` to the deserializer and that deserializer is used in the @apidoc[ConsumerSettings$].

Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala) { #imports #de-serializer }
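
Besides the Confluent version bump, the docs now name `AbstractKafkaSchemaSerDeConfig`: Confluent renamed the config base class from `AbstractKafkaAvroSerDeConfig` several releases ago, and the old alias appears to be gone in the 8.x line, which is why the rename ships together with the version bump. A hedged sketch of the configuration pattern the docs describe (the registry URL is a placeholder):

```scala
// Sketch: configuring the Confluent Avro serializer against a Schema Registry
// using the surviving AbstractKafkaSchemaSerDeConfig key.
import io.confluent.kafka.serializers.{ AbstractKafkaSchemaSerDeConfig, KafkaAvroSerializer }
import scala.jdk.CollectionConverters._

val schemaRegistryUrl = "http://localhost:8081" // placeholder URL
val serializer = new KafkaAvroSerializer()
serializer.configure(
  Map[String, Any](
    AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
  false) // isKey = false: this instance handles record values
```
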
project/Versions.scala (4 changes: 2 additions & 2 deletions)
@@ -22,7 +22,7 @@ object Versions {
val pekkoConnectorsKafkaVersionForDocs = "current"
val pekkoManagementVersionForDocs = "current"

-val kafkaVersion = "3.9.1"
+val kafkaVersion = "4.1.0"
val KafkaVersionForDocs = "37"

val scalaTestVersion = "3.2.19"
@@ -33,7 +33,7 @@ object Versions {
// this depends on Kafka, and should be upgraded to such latest version
// that depends on the same Kafka version, as is defined above
// See https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer?repo=confluent-packages
-val confluentAvroSerializerVersion = "7.9.2"
+val confluentAvroSerializerVersion = "8.0.0"
val confluentLibsExclusionRules = Seq(
ExclusionRule("log4j", "log4j"),
ExclusionRule("org.slf4j", "slf4j-log4j12"),
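
The two pins move together because, as the comment notes, the Confluent serializer must be built against the same Kafka line as `kafkaVersion`; `kafka-avro-serializer` 8.0.0 is the release that tracks Kafka 4.x. A sketch of how the pin and the exclusion rules are typically consumed in sbt (assuming this project's usual wiring):

```scala
// Sketch: applying the pinned Confluent artifact with its exclusion rules so
// the transitive log4j/slf4j bindings stay off the classpath.
libraryDependencies += ("io.confluent" % "kafka-avro-serializer" % Versions.confluentAvroSerializerVersion)
  .excludeAll(Versions.confluentLibsExclusionRules: _*)
```
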
@@ -36,6 +36,7 @@ import org.apache.kafka.clients.producer.{ Producer => KProducer, ProducerRecord
import org.apache.kafka.common.ConsumerGroupState
import org.slf4j.{ Logger, LoggerFactory }

+import scala.annotation.nowarn
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future }
@@ -119,6 +120,7 @@ abstract class KafkaSpec(_kafkaPort: Int, val zooKeeperPort: Int, actorSystem: A
*
* If the predicate does not hold after configured amount of time, throws an exception.
*/
@nowarn("cat=deprecation")
def waitUntilConsumerSummary(groupId: String)(predicate: PartialFunction[List[MemberDescription], Boolean]): Unit =
waitUntilConsumerGroup(groupId) { group =>
group.state() == ConsumerGroupState.STABLE &&
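
Here the suppression is by category rather than by message: `waitUntilConsumerSummary` still matches on `ConsumerGroupState`, which kafka-clients 4.x deprecates in favour of the more general `GroupState`. A minimal sketch of the same pattern:

```scala
// Sketch: keeping a ConsumerGroupState check compiling under fatal warnings
// by suppressing the deprecation category for this one method.
import scala.annotation.nowarn
import org.apache.kafka.common.ConsumerGroupState

@nowarn("cat=deprecation")
def isStable(state: ConsumerGroupState): Boolean =
  state == ConsumerGroupState.STABLE
```
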
@@ -35,7 +35,7 @@ import scala.collection.immutable
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
// #imports
-import io.confluent.kafka.serializers.{ AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer, KafkaAvroSerializer }
+import io.confluent.kafka.serializers.{ AbstractKafkaSchemaSerDeConfig, KafkaAvroDeserializer, KafkaAvroSerializer }
import org.apache.avro.specific.SpecificRecord
// #imports
import org.apache.kafka.clients.consumer.ConsumerConfig
@@ -59,7 +59,7 @@ class SchemaRegistrySerializationSpec extends DocsSpecBase with TestcontainersKa
// #serializer #de-serializer

val kafkaAvroSerDeConfig = Map[String, Any](
-AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
+AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> true.toString)
// #serializer #de-serializer

@@ -217,7 +217,7 @@ class SchemaRegistrySerializationSpec extends DocsSpecBase with TestcontainersKa

private def specificRecordConsumerSettings(group: String): ConsumerSettings[String, SpecificRecord] = {
val kafkaAvroSerDeConfig = Map[String, Any] {
-AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl
+AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl
}
val kafkaAvroDeserializer = new KafkaAvroDeserializer()
kafkaAvroDeserializer.configure(kafkaAvroSerDeConfig.asJava, false)
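
The deserializer side of the spec mirrors the rename. A companion sketch with a placeholder registry URL:

```scala
// Sketch: configuring the Avro deserializer for specific records with the
// renamed AbstractKafkaSchemaSerDeConfig key.
import io.confluent.kafka.serializers.{
  AbstractKafkaSchemaSerDeConfig,
  KafkaAvroDeserializer,
  KafkaAvroDeserializerConfig
}
import scala.jdk.CollectionConverters._

val config = Map[String, Any](
  AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081", // placeholder
  KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> true.toString)
val deserializer = new KafkaAvroDeserializer()
deserializer.configure(config.asJava, false) // isKey = false: value deserializer
```
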
@@ -81,7 +81,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))

-val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val commitInterval = 200.millis
@@ -117,7 +117,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "skip"),
consumer.message(partition, "send"))

-val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val commitInterval = 200.millis
@@ -157,7 +157,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))

-val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(2L).withMaxInterval(10.seconds)
@@ -191,7 +191,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))

-val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(2L).withMaxInterval(10.seconds)
@@ -226,7 +226,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
val producerRecordsPerInput = 2
val totalProducerRecords = elements.size * producerRecordsPerInput

-val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(elements.size.longValue())
@@ -259,7 +259,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
val consumer = FakeConsumer(groupId, topic, startOffset = 1616L)
val message = consumer.message(partition, "increment the offset")

-val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(1)
@@ -292,7 +292,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))

-val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system)
@@ -328,7 +328,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))

-val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
// choose a large commit interval so that completion happens before
@@ -364,7 +364,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))

-val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val commitInterval = 5.seconds
@@ -404,7 +404,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))

-val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val commitInterval = 5.seconds
@@ -441,7 +441,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 2"))

// this producer does not auto complete messages
-val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer)
+val producer = new MockProducer[String, String](false, None.orNull, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(1L)
@@ -480,7 +480,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))

-val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer)
+val producer = new MockProducer[String, String](false, None.orNull, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(1L)
@@ -523,7 +523,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))

-val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer)
+val producer = new MockProducer[String, String](false, None.orNull, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
// choose a large commit interval so that completion happens before
@@ -569,7 +569,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))

-val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(2L)
@@ -602,7 +602,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))

-val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(2L)
@@ -640,7 +640,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 1"),
consumer.message(partition, "value 2"))

-val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system)
@@ -674,7 +674,7 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
}

it should "shut down without elements" in assertAllStagesStopped {
-val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
+val producer = new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxInterval(1.second)
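
Every `MockProducer` in this spec gains an explicit `Partitioner` argument because kafka-clients 4.x no longer offers the three-argument constructor. `None.orNull` is a Scala idiom for a typed `null` (here: use default partitioning) that avoids null-literal lint warnings. A sketch of the resulting behaviour:

```scala
// Sketch: the 4.x-compatible MockProducer construction used throughout the spec.
import org.apache.kafka.clients.producer.{ MockProducer, ProducerRecord }
import org.apache.kafka.common.serialization.StringSerializer

val producer =
  new MockProducer[String, String](true, None.orNull, new StringSerializer, new StringSerializer)
producer.send(new ProducerRecord("topic", "key", "value"))
assert(producer.history().size == 1) // autoComplete = true completes sends immediately
```
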
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.pekko.Done
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.{ Metric, MetricName, PartitionInfo, TopicPartition }
+import org.apache.kafka.common.metrics.KafkaMetric
import org.slf4j.{ Logger, LoggerFactory }

import scala.concurrent.Promise
@@ -50,7 +51,6 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
override def subscribe(pattern: java.util.regex.Pattern, callback: ConsumerRebalanceListener): Unit = ???
override def subscribe(pattern: java.util.regex.Pattern): Unit = ???
override def unsubscribe(): Unit = ???
-override def poll(timeout: Long): ConsumerRecords[K, V] = ???
override def commitSync(): Unit = ???
override def commitSync(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit = ???
override def commitAsync(): Unit = ???
@@ -63,7 +63,6 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
override def seekToEnd(partitions: java.util.Collection[TopicPartition]): Unit = ???
override def position(partition: TopicPartition): Long = ???
override def position(partition: TopicPartition, timeout: java.time.Duration): Long = ???
-override def committed(partition: TopicPartition): OffsetAndMetadata = ???
override def metrics(): java.util.Map[MetricName, _ <: Metric] = ???
override def partitionsFor(topic: String): java.util.List[PartitionInfo] = ???
override def listTopics(): java.util.Map[String, java.util.List[PartitionInfo]] = ???
@@ -81,13 +80,13 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
override def endOffsets(
partitions: java.util.Collection[TopicPartition]): java.util.Map[TopicPartition, java.lang.Long] = ???
override def close(): Unit = {}
+override def close(options: CloseOptions): Unit = {}
override def close(timeout: java.time.Duration): Unit = {}
override def wakeup(): Unit = ???

override def commitSync(timeout: java.time.Duration): Unit = ???
override def commitSync(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
timeout: java.time.Duration): Unit = ???
-override def committed(partition: TopicPartition, timeout: java.time.Duration): OffsetAndMetadata = ???
override def committed(partitions: util.Set[TopicPartition]): util.Map[TopicPartition, OffsetAndMetadata] = ???
override def committed(partitions: util.Set[TopicPartition],
timeout: Duration): util.Map[TopicPartition, OffsetAndMetadata] = ???
@@ -102,4 +101,11 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
override def groupMetadata(): ConsumerGroupMetadata = ???
override def enforceRebalance(): Unit = ???
override def currentLag(partition: TopicPartition): java.util.OptionalLong = ???

+override def subscribe(sp: SubscriptionPattern): Unit = ???
+override def subscribe(sp: SubscriptionPattern, listener: ConsumerRebalanceListener): Unit = ???
+
+override def registerMetricForSubscription(metric: KafkaMetric): Unit = ???
+override def unregisterMetricFromSubscription(metric: KafkaMetric): Unit = ???

}
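
The stub now matches the reshaped 4.x `Consumer` interface: the long-deprecated `poll(long)` and the single-partition `committed` overloads are gone, while `close(CloseOptions)`, pattern subscription via `SubscriptionPattern`, and the metric (un)registration hooks are new. A usage sketch for the new subscription entry point (the pattern is illustrative, and the broker-side evaluation is per the KIP-848 consumer protocol):

```scala
// Sketch: subscribing with the 4.x SubscriptionPattern, which is resolved on
// the broker rather than client-side like the java.util.regex.Pattern overloads.
import org.apache.kafka.clients.consumer.{ Consumer, SubscriptionPattern }

def subscribeToEvents[K, V](consumer: Consumer[K, V]): Unit =
  consumer.subscribe(new SubscriptionPattern("events-.*")) // illustrative pattern
```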