Commit c8bca4c

feat: upgrade to Kafka 4.0.0 (#562)
feat: upgrade for Kafka 4.0
1 parent b79d0ec commit c8bca4c

File tree

22 files changed: +394 −388 lines

.github/workflows/coverage-report.yml

Lines changed: 1 addition & 1 deletion

@@ -13,7 +13,7 @@ jobs:
       - uses: actions/setup-java@v4
         with:
           distribution: 'temurin'
-          java-version: 8
+          java-version: 17
           cache: 'sbt'
       - uses: sbt/setup-sbt@v1
       - name: Compile

.github/workflows/release.yml

Lines changed: 1 addition & 1 deletion

@@ -14,7 +14,7 @@ jobs:
       - uses: actions/setup-java@v4
         with:
          distribution: 'temurin'
-          java-version: 8
+          java-version: 17
           cache: 'sbt'
       - uses: sbt/setup-sbt@v1
      - name: Publish artifacts

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion

@@ -13,7 +13,7 @@ jobs:
       - uses: actions/setup-java@v4
         with:
           distribution: 'temurin'
-          java-version: 8
+          java-version: 17
           cache: 'sbt'
       - uses: sbt/setup-sbt@v1
       - name: Check formatting

README.md

Lines changed: 54 additions & 22 deletions
@@ -11,25 +11,57 @@ A library that provides an in-memory Kafka instance to run your tests against.
 
 Inspired by [kafka-unit](https://github.com/chbatey/kafka-unit).
 
+1. [Version compatibility matrix](#version-compatibility-matrix)
+2. [Upgrade notes](#upgrade-notes)
+3. [Usage](#usage)
+   1. [embedded-kafka](#embedded-kafka-1)
+   2. [embedded-kafka-streams](#embedded-kafka-streams)
+   3. [embedded-kafka-connect](#embedded-kafka-connect)
+
+---
+
 ## Version compatibility matrix
 
-embedded-kafka is available on Maven Central, compiled for Scala 2.12, 2.13 and Scala 3 (since v3.4.0.1).
+The library is available on Maven Central.
 
 Versions match the version of Kafka they're built against.
 
-## Important known limitation (prior to v2.8.0)
+| embedded-kafka version | Kafka version | Scala versions  | Java version |
+|------------------------|---------------|-----------------|--------------|
+| 4.0.0                  | 4.0.0         | 2.13, 3.3       | 17+          |
+| 3.4.0.1 - 3.9.0        | 3.4.0 - 3.9.0 | 2.12, 2.13, 3.3 | 8+           |
+
+_Note that [prior to v2.8.0](https://github.com/apache/kafka/pull/10174) Kafka core was inlining the Scala library, so you couldn't use a different Scala **patch** version than [what Kafka used to compile its jars](https://github.com/apache/kafka/blob/trunk/gradle/dependencies.gradle#L30)._
 
-[Prior to v2.8.0](https://github.com/apache/kafka/pull/10174) Kafka core was inlining the Scala library, so you couldn't use a different Scala **patch** version than [what Kafka used to compile its jars](https://github.com/apache/kafka/blob/trunk/gradle/dependencies.gradle#L30)!
+---
 
-## Breaking change: new package name
+## Upgrade notes
 
-From v2.8.0 onwards package name has been updated to reflect the library group id (i.e. `io.github.embeddedkafka`).
+### 4.0.0
 
+Major changes:
+- **Java 17+:** since [Kafka Server 4.x requires Java 17+](https://kafka.apache.org/40/documentation/compatibility.html), embedded-kafka now requires it too, even though Kafka Clients/Streams still support Java 11+.
+- **Scala 2.13+:** Kafka is no longer compiled against Scala 2.12, so neither is embedded-kafka.
+- embedded-kafka 4.0.0 starts a Kafka server in combined mode (broker and controller) and no longer uses ZooKeeper.
+
+As a user, you'll have to change your code to use `controllerPort` instead of `zooKeeperPort` wherever you were doing so:
+```diff
+- EmbeddedKafkaConfig(kafkaPort = 12345, zooKeeperPort = 54321)
++ EmbeddedKafkaConfig(kafkaPort = 12345, controllerPort = 54321)
+```
+
+### 2.8.0
+
+**Package name change:** from v2.8.0 onwards the package name has been updated to reflect the library group id (i.e. `io.github.embeddedkafka`).
 Aliases to the old package name have been added, along with a one-time [Scalafix rule](https://github.com/embeddedkafka/embedded-kafka-scalafix) to ensure the smoothest migration.
 
-## embedded-kafka
+---
+
+## Usage
+
+### embedded-kafka
 
-### How to use
+#### How to use
 
 * In your `build.sbt` file add the following dependency (replace `x.x.x` with the appropriate version): `"io.github.embeddedkafka" %% "embedded-kafka" % "x.x.x" % Test`
 * Have your class extend the `EmbeddedKafka` trait.
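To tie the upgrade notes and the usage bullets above together, here is a minimal sketch (not taken from the repository) assuming ScalaTest's `AnyWordSpecLike`/`Matchers`; `createCustomTopic` and `consumeFirstMessageFrom` are the utility methods quoted later in this README diff, while `publishStringMessageTo` is assumed as their string-publishing counterpart.

```scala
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka {

  // With 4.0.0 the second port is the KRaft controller, not ZooKeeper.
  implicit val config: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(kafkaPort = 7000, controllerPort = 7001)

  "embedded-kafka 4.0.0" should {
    "round-trip a message through a custom topic" in {
      withRunningKafka {
        // Precondition: a compacted topic with 3 partitions.
        createCustomTopic(
          topic = "users",
          topicConfig = Map("cleanup.policy" -> "compact"),
          partitions = 3,
          replicationFactor = 1
        )

        // publishStringMessageTo is assumed here; consumeFirstMessageFrom is
        // among the trait's utility methods listed further down.
        publishStringMessageTo("users", "alice")
        consumeFirstMessageFrom("users") shouldBe "alice"
      }
    }
  }
}
```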
@@ -52,11 +84,11 @@ class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka {
 }
 ```
 
-* In-memory Zookeeper and Kafka will be instantiated respectively on port 6000 and 6001 and automatically shutdown at the end of the test.
+* An in-memory Kafka broker and controller (combined mode) will be instantiated on ports 6000 and 6001 respectively and automatically shut down at the end of the test.
 
-### Use without the `withRunningKafka` method
+#### Use without the `withRunningKafka` method
 
-A `EmbeddedKafka` companion object is provided for usage without extending the `EmbeddedKafka` trait. Zookeeper and Kafka can be started and stopped in a programmatic way. This is the recommended usage if you have more than one test in your file and you don't want to start and stop Kafka and Zookeeper on every test.
+An `EmbeddedKafka` companion object is provided for usage without extending the `EmbeddedKafka` trait. Kafka can be started and stopped programmatically. This is the recommended usage if you have more than one test in your file and you don't want to start and stop Kafka on every test.
 
 ```scala
 class MySpec extends AnyWordSpecLike with Matchers {
@@ -76,9 +108,9 @@ class MySpec extends AnyWordSpecLike with Matchers {
 
 Please note that in order to avoid Kafka instances not shutting down properly, it's recommended to call `EmbeddedKafka.stop()` in an `after` block or in similar teardown logic.
 
-### Configuration
+#### Configuration
 
-It's possible to change the ports on which Zookeeper and Kafka are started by providing an implicit `EmbeddedKafkaConfig`
+It's possible to change the ports on which the Kafka broker and controller are started by providing an implicit `EmbeddedKafkaConfig`:
 
 ```scala
 class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka {
@@ -96,7 +128,7 @@ class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka {
 }
 ```
 
-If you want to run ZooKeeper and Kafka on arbitrary available ports, you can
+If you want to run the Kafka broker and controller on arbitrary available ports, you can
 use the `withRunningKafkaOnFoundPort` method. This is useful to make tests more
 reliable, especially when running tests in parallel or on machines where other
 tests or services may be running with port numbers you can't control.
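For the companion-object style described a few hunks above (one Kafka instance per suite rather than per test), here is a hedged sketch assuming ScalaTest's `BeforeAndAfterAll`; `EmbeddedKafka.start()`, `EmbeddedKafka.stop()` and `isRunning` all appear elsewhere in this commit, the rest is illustrative.

```scala
import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

class MySuiteSpec extends AnyWordSpecLike with Matchers with BeforeAndAfterAll {

  implicit val config: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(kafkaPort = 7002, controllerPort = 7003)

  // Start the combined broker/controller once for the whole suite...
  override protected def beforeAll(): Unit =
    EmbeddedKafka.start()

  // ...and always stop it in teardown, as the README recommends.
  override protected def afterAll(): Unit =
    EmbeddedKafka.stop()

  "the suite" should {
    "see a running embedded Kafka" in {
      EmbeddedKafka.isRunning shouldBe true
    }
  }
}
```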
@@ -107,7 +139,7 @@ class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka {
   "runs with embedded kafka on arbitrary available ports" should {
 
     "work" in {
-      val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)
+      val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, controllerPort = 0)
 
       withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
         // now a kafka broker is listening on actualConfig.kafkaPort
@@ -153,7 +185,7 @@ Those properties will be added to the broker configuration, be careful some prop
 in case of conflict the `customBrokerProperties` values will take precedence. Please look at the source code to see what these properties
 are.
 
-### Utility methods
+#### Utility methods
 
 The `EmbeddedKafka` trait also provides some utility methods to interact with the embedded Kafka, in order to set preconditions or verifications in your specs:
 
@@ -165,17 +197,17 @@ def consumeFirstMessageFrom(topic: String): String
 def createCustomTopic(topic: String, topicConfig: Map[String,String], partitions: Int, replicationFactor: Int): Unit
 ```
 
-### Custom producers
+#### Custom producers
 
 Given implicit `Serializer`s for each type and an `EmbeddedKafkaConfig`, it is possible to use `withProducer[A, B, R] { your code here }` where R is the code return type.
 
 For more information about how to use the utility methods, you can either look at the Scaladocs or at the tests of this project.
 
-### Custom consumers
+#### Custom consumers
 
 Given implicit `Deserializer`s for each type and an `EmbeddedKafkaConfig`, it is possible to use `withConsumer[A, B, R] { your code here }` where R is the code return type.
 
-### Loan methods example
+#### Loan methods example
 
 A simple test using loan methods can be as simple as this:
 
@@ -201,26 +233,26 @@ A simple test using loan methods can be as simple as this:
 })
 ```
 
-## embedded-kafka-streams
+### embedded-kafka-streams
 
 A library that builds on top of `embedded-kafka` to offer easy testing of [Kafka Streams](https://kafka.apache.org/documentation/streams).
 
 It takes care of instantiating and starting your streams as well as closing them after running your test-case code.
 
-### How to use
+#### How to use
 
 * In your `build.sbt` file add the following dependency (replace `x.x.x` with the appropriate version): `"io.github.embeddedkafka" %% "embedded-kafka-streams" % "x.x.x" % Test`
 * Have a look at the [example test](kafka-streams/src/test/scala/io/github/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala)
 * For most cases, have your class extend the `EmbeddedKafkaStreams` trait. This offers both streams management and easy loaning of producers and consumers for asserting resulting messages in output/sink topics.
 * Use `EmbeddedKafkaStreams.runStreams` and `EmbeddedKafka.withConsumer` and `EmbeddedKafka.withProducer`. This allows you to create your own consumers of custom types as seen in the [example test](kafka-streams/src/test/scala/io/github/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala).
 
-## embedded-kafka-connect
+### embedded-kafka-connect
 
 A library that builds on top of `embedded-kafka` to offer easy testing of [Kafka Connect](https://kafka.apache.org/documentation/#connect).
 
 It takes care of instantiating and starting a Kafka Connect server as well as closing it after running your test-case code.
 
-### How to use
+#### How to use
 
 * In your `build.sbt` file add the following dependency (replace `x.x.x` with the appropriate version): `"io.github.embeddedkafka" %% "embedded-kafka-connect" % "x.x.x" % Test`
 * Have a look at the [example test](kafka-connect/src/test/scala/io/github/embeddedkafka/connect/ExampleKafkaConnectSpec.scala)
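Returning to the `withProducer`/`withConsumer` loans described in the "Custom producers" and "Custom consumers" paragraphs above, here is a hedged sketch. The exact implicit parameters of the loan methods, and whether `customConsumerProperties` is honoured by the loaned consumer, are assumptions on my part; the implicit `Serializer`/`Deserializer` instances come from plain kafka-clients.

```scala
import java.time.Duration

import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringDeserializer, StringSerializer}

import scala.jdk.CollectionConverters._

object LoanMethodsSketch extends EmbeddedKafka with App {

  implicit val config: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(
      kafkaPort = 7006,
      controllerPort = 7007,
      // Assumption: the loaned consumer picks up customConsumerProperties,
      // so it reads the topic from the beginning.
      customConsumerProperties = Map(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")
    )

  // Implicit (de)serializers required by the withProducer/withConsumer loans.
  implicit val stringSerializer: Serializer[String]     = new StringSerializer
  implicit val stringDeserializer: Deserializer[String] = new StringDeserializer

  withRunningKafka {
    // Loan a producer of (String, String) and publish one record.
    withProducer[String, String, Unit] { producer =>
      producer.send(new ProducerRecord("loan-topic", "key", "value"))
    }

    // Loan a consumer of (String, String) and read the record back,
    // polling a few times while the consumer group settles.
    val values: Seq[String] = withConsumer[String, String, Seq[String]] { consumer =>
      consumer.subscribe(java.util.Collections.singletonList("loan-topic"))
      Iterator
        .continually(consumer.poll(Duration.ofSeconds(1)).asScala.map(_.value()).toSeq)
        .take(10)
        .find(_.nonEmpty)
        .getOrElse(Seq.empty)
    }

    println(values)
  }
}
```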

build.sbt

Lines changed: 1 addition & 1 deletion
@@ -75,7 +75,6 @@ lazy val commonSettings = Seq(
   organization := "io.github.embeddedkafka",
   scalaVersion := Versions.Scala213,
   crossScalaVersions := Seq(
-    Versions.Scala212,
     Versions.Scala213,
     Versions.Scala3
   )
@@ -92,6 +91,7 @@ lazy val embeddedKafka = (project in file("embedded-kafka"))
   .settings(name := "embedded-kafka")
   .settings(commonSettings: _*)
   .settings(libraryDependencies ++= EmbeddedKafka.prodDeps)
+  .settings(libraryDependencies ++= EmbeddedKafka.testDeps)
 
 lazy val kafkaStreams = (project in file("kafka-streams"))
   .settings(name := "embedded-kafka-streams")
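The practical consequence of these two build changes for downstream projects: 4.0.0 artifacts are published for Scala 2.13 and 3 only, and tests now need a JDK 17+ runtime. A hedged build.sbt sketch for a consuming project (the Scala patch version is an arbitrary choice):

```scala
// build.sbt of a project testing with embedded-kafka 4.0.0 (illustrative).
// Scala 2.12 is no longer an option; run sbt on JDK 17+.
ThisBuild / scalaVersion := "2.13.16"

libraryDependencies += "io.github.embeddedkafka" %% "embedded-kafka" % "4.0.0" % Test
```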

embedded-kafka/src/main/scala/io/github/embeddedkafka/EmbeddedKafka.scala

Lines changed: 21 additions & 50 deletions
@@ -21,20 +21,19 @@ trait EmbeddedKafka
 
   override private[embeddedkafka] def withRunningServers[T](
       config: EmbeddedKafkaConfig,
-      actualZkPort: Int,
       kafkaLogsDir: Path
   )(body: EmbeddedKafkaConfig => T): T = {
-    val broker =
+    val (broker, controller) =
       startKafka(
         config.kafkaPort,
-        actualZkPort,
+        config.controllerPort,
         config.customBrokerProperties,
         kafkaLogsDir
       )
 
     val configWithUsedPorts = EmbeddedKafkaConfig(
       EmbeddedKafka.kafkaPort(broker),
-      actualZkPort,
+      EmbeddedKafka.controllerPort(controller),
       config.customBrokerProperties,
       config.customProducerProperties,
       config.customConsumerProperties
@@ -43,8 +42,13 @@ trait EmbeddedKafka
     try {
       body(configWithUsedPorts)
     } finally {
+      // In combined mode, we want to shut down the broker first, since the controller may be
+      // needed for controlled shutdown. Additionally, the controller shutdown process currently
+      // stops the raft client early on, which would disrupt broker shutdown.
       broker.shutdown()
+      controller.shutdown()
       broker.awaitShutdown()
+      controller.awaitShutdown()
     }
   }
 }
@@ -53,21 +57,8 @@ object EmbeddedKafka
     extends EmbeddedKafka
     with RunningEmbeddedKafkaOps[EmbeddedKafkaConfig, EmbeddedK] {
   override def start()(implicit config: EmbeddedKafkaConfig): EmbeddedK = {
-    val zkLogsDir = Files.createTempDirectory("zookeeper-logs")
     val kafkaLogsDir = Files.createTempDirectory("kafka-logs")
-
-    val factory =
-      EmbeddedZ(startZooKeeper(config.zooKeeperPort, zkLogsDir), zkLogsDir)
-
-    val configWithUsedPorts = EmbeddedKafkaConfig(
-      config.kafkaPort,
-      zookeeperPort(factory),
-      config.customBrokerProperties,
-      config.customProducerProperties,
-      config.customConsumerProperties
-    )
-
-    startKafka(kafkaLogsDir, Option(factory))(configWithUsedPorts)
+    startKafka(kafkaLogsDir)(config)
   }
 
   override def isRunning: Boolean =
@@ -77,50 +68,45 @@ object EmbeddedKafka
 }
 
 private[embeddedkafka] trait EmbeddedKafkaSupport[C <: EmbeddedKafkaConfig] {
-  this: ZooKeeperOps with KafkaOps =>
+  this: KafkaOps =>
 
   /**
    * Starts a Kafka broker (and performs additional logic, if any), then
    * executes the body passed as a parameter.
    *
    * @param config
    *   the user-defined [[EmbeddedKafkaConfig]]
-   * @param actualZkPort
-   *   the actual ZooKeeper port
    * @param kafkaLogsDir
    *   the path for the Kafka logs
    * @param body
    *   the function to execute
    */
   private[embeddedkafka] def withRunningServers[T](
       config: C,
-      actualZkPort: Int,
       kafkaLogsDir: Path
   )(body: C => T): T
 
   /**
-   * Starts a ZooKeeper instance and a Kafka broker (and performs additional
-   * logic, if any), then executes the body passed as a parameter.
+   * Starts a Kafka broker and controller (and performs additional logic, if
+   * any), then executes the body passed as a parameter.
    *
    * @param body
    *   the function to execute
    * @param config
    *   an implicit [[EmbeddedKafkaConfig]]
    */
   def withRunningKafka[T](body: => T)(implicit config: C): T = {
-    withRunningZooKeeper(config.zooKeeperPort) { actualZkPort =>
-      withTempDir("kafka") { kafkaLogsDir =>
-        withRunningServers(config, actualZkPort, kafkaLogsDir)(_ => body)
-      }
+    withTempDir("kafka") { kafkaLogsDir =>
+      withRunningServers(config, kafkaLogsDir)(_ => body)
     }
   }
 
   /**
-   * Starts a ZooKeeper instance and a Kafka broker (and performs additional
-   * logic, if any), then executes the body passed as a parameter. The actual
-   * ports of the servers will be detected and inserted into a copied version
-   * of the [[EmbeddedKafkaConfig]] that gets passed to body. This is useful if
-   * you set any port to `0`, which will listen on an arbitrary available port.
+   * Starts a Kafka broker and controller (and performs additional logic, if
+   * any), then executes the body passed as a parameter. The actual ports of
+   * the servers will be detected and inserted into a copied version of the
+   * [[EmbeddedKafkaConfig]] that gets passed to body. This is useful if you
+   * set any port to `0`, which will listen on an arbitrary available port.
    *
    * @param config
    *   the user-defined [[EmbeddedKafkaConfig]]
@@ -129,23 +115,8 @@ private[embeddedkafka] trait EmbeddedKafkaSupport[C <: EmbeddedKafkaConfig] {
    *   actual ports the servers are running on
    */
   def withRunningKafkaOnFoundPort[T](config: C)(body: C => T): T = {
-    withRunningZooKeeper(config.zooKeeperPort) { actualZkPort =>
-      withTempDir("kafka") { kafkaLogsDir =>
-        withRunningServers(config, actualZkPort, kafkaLogsDir)(body)
-      }
-    }
-  }
-
-  private[embeddedkafka] def withRunningZooKeeper[T](
-      port: Int
-  )(body: Int => T): T = {
-    withTempDir("zookeeper-logs") { zkLogsDir =>
-      val factory = startZooKeeper(port, zkLogsDir)
-      try {
-        body(factory.getLocalPort)
-      } finally {
-        factory.shutdown()
-      }
+    withTempDir("kafka") { kafkaLogsDir =>
+      withRunningServers(config, kafkaLogsDir)(body)
     }
   }
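To show what the detected-ports behaviour documented above looks like from a test, here is a hedged sketch (not part of this commit) that starts Kafka on arbitrary free ports and points a plain kafka-clients producer at the resulting `actualConfig.kafkaPort`; the object name, topic, and properties are illustrative.

```scala
import java.util.Properties

import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object FoundPortSketch extends EmbeddedKafka with App {

  // Port 0 means "pick any free port"; the actual ports end up in actualConfig.
  val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, controllerPort = 0)

  withRunningKafkaOnFoundPort(userDefinedConfig) { actualConfig =>
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${actualConfig.kafkaPort}")

    val producer = new KafkaProducer[String, String](props, new StringSerializer, new StringSerializer)
    try {
      // Block until the record is acknowledged by the embedded broker.
      producer.send(new ProducerRecord("found-port-topic", "key", "value")).get()
    } finally {
      producer.close()
    }
  }
}
```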
