3 changes: 3 additions & 0 deletions .github/workflows/build.yml
@@ -44,6 +44,9 @@ jobs:
- name: Run Compile for Kafka 1.x.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }}
run: sbt -J-Xmx6144m ";++${{ matrix.scala }} ;clean ;kafka1x/compile"

- name: Run Compile for Kafka 4.x.x, Java ${{ matrix.java }} and Scala ${{ matrix.scala }}
run: sbt -J-Xmx6144m ";++${{ matrix.scala }} ;clean ;kafka4x/compile"

tests:
name: scala-${{ matrix.scala }} jdk-${{ matrix.java }} tests
runs-on: ubuntu-latest
14 changes: 14 additions & 0 deletions README.md
@@ -21,6 +21,20 @@ Work in progress!
8. [Maintainers](#maintainers)
9. [License](#license)

## Getting Started with Kafka 4.0.0 or above

In SBT:

```scala
libraryDependencies += "io.monix" %% "monix-kafka-4x" % "<tbd>"
```

Also add a dependency override to pin `kafka-clients` to 4.0.0:

```scala
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "4.0.0"
```
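
Once the artifact is published, usage should mirror the 1.x module. Below is a minimal producer sketch, assuming `monix-kafka-4x` keeps the same `KafkaProducerConfig`/`KafkaProducer` API as the existing modules:

```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.kafka._
import org.apache.kafka.clients.producer.RecordMetadata

implicit val scheduler: Scheduler = Scheduler.global

val producerCfg = KafkaProducerConfig.default.copy(
  bootstrapServers = List("localhost:9092")
)

val producer = KafkaProducer[String, String](producerCfg, scheduler)

// The returned Task is lazy; nothing is sent until it is executed.
val send: Task[Option[RecordMetadata]] =
  producer.send("my-topic", "my-value")
```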

## Getting Started with Kafka 1.0.x or above

In SBT:
14 changes: 11 additions & 3 deletions build.sbt
@@ -151,13 +151,21 @@ lazy val commonDependencies = Seq(
"org.scalacheck" %% "scalacheck" % "1.15.2" % "test")
)

-ThisBuild / scalaVersion := "2.13.8"
-ThisBuild / crossScalaVersions := List("2.12.15", "2.13.8")
+ThisBuild / scalaVersion := "2.13.15"
+ThisBuild / crossScalaVersions := List("2.12.15", "2.13.15")

lazy val monixKafka = project.in(file("."))
.settings(sharedSettings)
.settings(doNotPublishArtifact)
-  .aggregate(kafka1x, kafka11, kafka10)
+  .aggregate(kafka4x, kafka1x, kafka11, kafka10)

lazy val kafka4x = project.in(file("kafka-4.0.x"))
.settings(commonDependencies)
.settings(mimaSettings("monix-kafka-4x"))
.settings(
name := "monix-kafka-4x",
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "4.0.0" exclude("org.slf4j","slf4j-log4j12") exclude("log4j", "log4j")
)

lazy val kafka1x = project.in(file("kafka-1.0.x"))
.settings(commonDependencies)
81 changes: 81 additions & 0 deletions kafka-4.0.x/src/main/resources/monix/kafka/default.conf
@@ -0,0 +1,81 @@
kafka {
bootstrap.servers = "localhost:9092"
client.id = ""

# E.g. "org.apache.kafka.clients.producer.RoundRobinPartitioner"
# (org.apache.kafka.clients.producer.internals.DefaultPartitioner was removed in Kafka 4.0)
partitioner.class = null

acks = "1"
buffer.memory = 33554432
compression.type = "none"
retries = 0
max.in.flight.requests.per.connection = 5

ssl.key.password = null
ssl.keystore.password = null
ssl.keystore.location = null
ssl.truststore.password = null
ssl.truststore.location = null

batch.size = 16384
connections.max.idle.ms = 540000
linger.ms = 0
max.block.ms = 60000
max.request.size = 1048576

receive.buffer.bytes = 32768
request.timeout.ms = 40000

sasl.kerberos.service.name = null
sasl.mechanism = "GSSAPI"

security.protocol = "PLAINTEXT"
send.buffer.bytes = 131072
# Aligned with the kafka-clients 4.0 default; TLSv1 and TLSv1.1 are no longer enabled
ssl.enabled.protocols = "TLSv1.2,TLSv1.3"
ssl.keystore.type = "JKS"
ssl.protocol = "TLS"
ssl.provider = null
ssl.truststore.type = "JKS"

reconnect.backoff.ms = 50
retry.backoff.ms = 100

metadata.max.age.ms = 300000

metric.reporters = ""
metrics.num.samples = 2
metrics.sample.window.ms = 30000

# Consumer specific settings
client.rack = ""
fetch.min.bytes = 1
fetch.max.bytes = 52428800
group.id = ""
heartbeat.interval.ms = 3000
max.partition.fetch.bytes = 1048576
auto.offset.reset = "latest"
# Disabled to use back-pressure or manual commits instead
enable.auto.commit = false
exclude.internal.topics = true
receive.buffer.bytes = 65536
check.crcs = true
fetch.max.wait.ms = 500
# Default values for polling
# See https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
session.timeout.ms = 10000
max.poll.records = 500
max.poll.interval.ms = 300000

# Monix specific settings

# Number of requests that KafkaProducerSink
# can push in parallel
monix.producer.sink.parallelism = 100
# Triggers either seekToEnd or seekToBeginning when the observable starts
# Possible values: end, beginning, no-seek
monix.observable.seek.onStart = "no-seek"
# Possible values: sync, async
monix.observable.commit.type = "sync"
# Possible values: before-ack, after-ack or no-ack
monix.observable.commit.order = "after-ack"
}
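
The Monix-specific keys above can also be overridden in code. A sketch, assuming the 4.x module keeps the 1.x-style `KafkaConsumerConfig` API (field names are the camelCase equivalents of the keys in `default.conf`):

```scala
import monix.kafka.KafkaConsumerConfig
import monix.kafka.config.ObservableCommitType

val consumerCfg = KafkaConsumerConfig.default.copy(
  bootstrapServers = List("localhost:9092"),
  groupId = "my-consumer-group",
  // enable.auto.commit stays false so commits are driven by back-pressure
  enableAutoCommit = false,
  // maps to monix.observable.commit.type
  observableCommitType = ObservableCommitType.Async
)
```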
35 changes: 35 additions & 0 deletions kafka-4.0.x/src/main/scala/monix/kafka/Commit.scala
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2014-2022 by The Monix Project Developers.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.kafka

import monix.eval.Task
import org.apache.kafka.common.TopicPartition

/** Callback for batched commits, realized as a closure in the
  * [[KafkaConsumerObservable]] context.
  */
trait Commit {
def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit]
def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit]
}

private[kafka] object Commit {

val empty: Commit = new Commit {
override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit
override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit
}
}
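
For context, a hypothetical implementation of `Commit` backed by a raw Kafka consumer could look like the sketch below. It is not part of this PR, assumes Scala 2.13 (`scala.jdk.CollectionConverters`), and assumes all access to the consumer happens on a single thread:

```scala
import monix.eval.Task
import monix.kafka.Commit
import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters._

// Hypothetical: wraps a consumer and commits the given offsets through it.
final class ConsumerCommit[K, V](consumer: Consumer[K, V]) extends Commit {

  private def asJavaOffsets(batch: Map[TopicPartition, Long]) =
    batch.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) }.asJava

  override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] =
    Task.eval(consumer.commitSync(asJavaOffsets(batch)))

  override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] =
    Task.eval(consumer.commitAsync(asJavaOffsets(batch), null))
}
```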
23 changes: 23 additions & 0 deletions kafka-4.0.x/src/main/scala/monix/kafka/CommittableMessage.scala
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2014-2022 by The Monix Project Developers.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.kafka

import org.apache.kafka.clients.consumer.ConsumerRecord

/** Represents data consumed from Kafka and the [[CommittableOffset]] built from it.
  */
final case class CommittableMessage[K, V](record: ConsumerRecord[K, V], committableOffset: CommittableOffset)
54 changes: 54 additions & 0 deletions kafka-4.0.x/src/main/scala/monix/kafka/CommittableOffset.scala
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2014-2022 by The Monix Project Developers.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.kafka

import monix.eval.Task
import org.apache.kafka.common.TopicPartition

/** Represents an offset for a specified topic and partition that can be
  * committed synchronously with the [[commitSync]] method or asynchronously
  * with the [[commitAsync]] method. To achieve good performance it is
  * recommended to use batched commits with the [[CommittableOffsetBatch]]
  * class.
  *
  * @param topicPartition is the topic and partition identifier
  *
  * @param offset is the offset to be committed
  *
  * @param commitCallback is the callback for batched commits, realized as a
  *        closure in the [[KafkaConsumerObservable]] context.
  */
final class CommittableOffset private[kafka] (
val topicPartition: TopicPartition,
val offset: Long,
private[kafka] val commitCallback: Commit) {

/** Synchronously commits [[offset]] for the [[topicPartition]] to Kafka.
  * It is recommended to use batched commits with the
  * [[CommittableOffsetBatch]] class instead.
  */
def commitSync(): Task[Unit] = commitCallback.commitBatchSync(Map(topicPartition -> offset))

/** Asynchronously commits [[offset]] to Kafka. It is recommended to use
  * batched commits with the [[CommittableOffsetBatch]] class instead.
  */
def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset))
}

object CommittableOffset {

private[kafka] def apply(topicPartition: TopicPartition, offset: Long, commitCallback: Commit): CommittableOffset =
new CommittableOffset(topicPartition, offset, commitCallback)
}
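
A usage sketch (not part of this PR), assuming the 4.x module exposes the same `KafkaConsumerObservable.manualCommit` entry point as the 1.x one; `processRecord` is a placeholder for user logic:

```scala
import monix.eval.Task
import monix.kafka._

def processRecord(value: String): Task[Unit] = Task(println(value))

val consumerCfg = KafkaConsumerConfig.default.copy(
  groupId = "my-consumer-group",
  enableAutoCommit = false
)

// Process each record, then commit its offset asynchronously.
val stream =
  KafkaConsumerObservable
    .manualCommit[String, String](consumerCfg, List("my-topic"))
    .mapEval { msg =>
      processRecord(msg.record.value())
        .flatMap(_ => msg.committableOffset.commitAsync())
    }
```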
98 changes: 98 additions & 0 deletions kafka-4.0.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2014-2022 by The Monix Project Developers.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.kafka

import monix.eval.Task
import org.apache.kafka.common.TopicPartition

/** A batch of Kafka offsets that can be committed together.
  * Can be built from a sequence of offsets with the [[CommittableOffsetBatch#apply]]
  * method. You can also use [[CommittableOffsetBatch#empty]] to create an empty
  * batch and add offsets to it with the [[updated]] method.
  *
  * WARNING: the order of the offsets is important. Only the last offset added
  * for a given topic and partition will be committed to Kafka.
  *
  * @param offsets is the batch of offsets for the given topics and partitions.
  *        Make sure that each of them was received from the same [[KafkaConsumerObservable]].
  *
  * @param commitCallback is the callback for batched commits, realized as a closure
  *        in the [[KafkaConsumerObservable]] context. This parameter is obtained from
  *        the last [[CommittableOffset]] added to the batch.
  */
final class CommittableOffsetBatch private[kafka] (val offsets: Map[TopicPartition, Long], commitCallback: Commit) {

/** Synchronously commits [[offsets]] to Kafka.
  */
def commitSync(): Task[Unit] = commitCallback.commitBatchSync(offsets)

/** Asynchronously commits [[offsets]] to Kafka.
  */
def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(offsets)

/** Adds a new [[CommittableOffset]] to the batch. The added offset replaces
  * any previous one for the same topic and partition.
  */
def updated(committableOffset: CommittableOffset): CommittableOffsetBatch =
new CommittableOffsetBatch(
offsets.updated(committableOffset.topicPartition, committableOffset.offset),
committableOffset.commitCallback
)
}

object CommittableOffsetBatch {

/** Creates an empty [[CommittableOffsetBatch]]. Can be used as the neutral element in a fold:
* {{{
* offsets.foldLeft(CommittableOffsetBatch.empty)(_ updated _)
* }}}
*/
val empty: CommittableOffsetBatch = new CommittableOffsetBatch(Map.empty, Commit.empty)

/** Builds a [[CommittableOffsetBatch]] from a sequence of offsets. Be careful
  * with the sequence order: if there is more than one offset for a topic and
  * partition in the sequence, only the last one will remain.
  */
def apply(offsets: Seq[CommittableOffset]): CommittableOffsetBatch =
if (offsets.nonEmpty) {
val aggregatedOffsets = offsets.foldLeft(Map.empty[TopicPartition, Long]) { (acc, o) =>
acc.updated(o.topicPartition, o.offset)
}
new CommittableOffsetBatch(aggregatedOffsets, offsets.head.commitCallback)
} else {
empty
}

/** Builds a list of [[CommittableOffsetBatch]] from a sequence of offsets by
  * merging the offsets that share the same commit callback. This is useful
  * when the committable offsets come from different consumers.
* {{{
* CommittableOffsetBatch.mergeByCommitCallback(offsets)
* }}}
*/
def mergeByCommitCallback(committableOffsets: Seq[CommittableOffset]): List[CommittableOffsetBatch] = {
if (committableOffsets.nonEmpty) {
committableOffsets
.groupBy(_.commitCallback)
.values
.map(CommittableOffsetBatch(_))
.toList
} else {
List.empty
}
}
}
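
A batched-commit sketch (not part of this PR), again assuming the 1.x-style `KafkaConsumerObservable.manualCommit` API; buffering amortizes the commit cost, as the docs above recommend:

```scala
import monix.kafka._
import scala.concurrent.duration._

val cfg = KafkaConsumerConfig.default.copy(
  groupId = "my-consumer-group",
  enableAutoCommit = false
)

val batched =
  KafkaConsumerObservable
    .manualCommit[String, String](cfg, List("my-topic"))
    .map(_.committableOffset)
    // Commit every 100 records or every 5 seconds, whichever comes first.
    .bufferTimedAndCounted(5.seconds, 100)
    .mapEval(offsets => CommittableOffsetBatch(offsets).commitSync())
```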