Skip to content

Commit b95f1df

Browse files
Support transactions, better listener names in config (#564)
Transactions are now possible; by changing the config a single broker is now enough. The listener names in the broker config can be anything. What most people do (use the protocol name) is very confusing. Instead, we use the role of the listener.
1 parent c8bca4c commit b95f1df

File tree

1 file changed

+10
-6
lines changed
  • embedded-kafka/src/main/scala/io/github/embeddedkafka/ops

1 file changed

+10
-6
lines changed

embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/kafkaOps.scala

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ package io.github.embeddedkafka.ops
33
import io.github.embeddedkafka.{EmbeddedK, EmbeddedKafkaConfig, EmbeddedServer}
44
import kafka.server._
55
import org.apache.kafka.common.Uuid
6-
import org.apache.kafka.common.security.auth.SecurityProtocol
76
import org.apache.kafka.common.utils.Time
87
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
8+
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
99
import org.apache.kafka.metadata.properties.{
1010
MetaProperties,
1111
MetaPropertiesEnsemble,
@@ -17,6 +17,7 @@ import org.apache.kafka.raft.QuorumConfig
1717
import org.apache.kafka.server.ServerSocketFactory
1818
import org.apache.kafka.server.config.{
1919
KRaftConfigs,
20+
ReplicationConfigs,
2021
ServerConfigs,
2122
ServerLogConfigs
2223
}
@@ -52,23 +53,26 @@ trait KafkaOps {
5253
// Without this the controller starts correctly on a random port but it's too late to use this port in the configs for the broker
5354
val actualControllerPort = findPortForControllerOrFail(controllerPort)
5455

55-
val brokerListener = s"${SecurityProtocol.PLAINTEXT}://localhost:$kafkaPort"
56+
val brokerListener = s"BROKER://localhost:$kafkaPort"
5657
val controllerListener = s"CONTROLLER://localhost:$actualControllerPort"
5758

5859
val configProperties = Map[String, Object](
59-
KRaftConfigs.PROCESS_ROLES_CONFIG -> "broker,controller",
60-
KRaftConfigs.NODE_ID_CONFIG -> nodeId.toString,
61-
KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG -> "CONTROLLER",
60+
KRaftConfigs.PROCESS_ROLES_CONFIG -> "broker,controller",
61+
KRaftConfigs.NODE_ID_CONFIG -> nodeId.toString,
62+
ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG -> "BROKER",
63+
KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG -> "CONTROLLER",
6264
QuorumConfig.QUORUM_VOTERS_CONFIG -> s"$nodeId@localhost:$actualControllerPort",
6365
ServerConfigs.BROKER_ID_CONFIG -> nodeId.toString,
6466
SocketServerConfigs.LISTENERS_CONFIG -> s"$brokerListener,$controllerListener",
6567
SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG -> brokerListener,
66-
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG -> "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL",
68+
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG -> "BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT",
6769
ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG -> autoCreateTopics.toString,
6870
ServerLogConfigs.LOG_DIRS_CONFIG -> kafkaLogDir.toAbsolutePath.toString,
6971
ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG -> 1.toString,
7072
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG -> 1.toString,
7173
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG -> 1.toString,
74+
TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG -> 1.toString,
75+
TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG -> 1.toString,
7276
// The total memory used for log deduplication across all cleaner threads, keep it small to not exhaust suite memory
7377
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP -> logCleanerDedupeBufferSize.toString
7478
) ++ customBrokerProperties

0 commit comments

Comments
 (0)