KAFKA-12895: Drop support for Scala 2.12 in Kafka 4.0
frankvicky committed Sep 29, 2024
1 parent 0569603 commit b17213c
Showing 54 changed files with 156 additions and 356 deletions.
7 changes: 0 additions & 7 deletions LICENSE-binary
@@ -226,7 +226,6 @@ jackson-jaxrs-json-provider-2.16.2
 jackson-module-afterburner-2.16.2
 jackson-module-jaxb-annotations-2.16.2
 jackson-module-scala_2.13-2.16.2
-jackson-module-scala_2.12-2.16.2
 jakarta.validation-api-2.0.2
 javassist-3.29.2-GA
 jetty-client-9.4.54.v20240208
@@ -257,15 +256,9 @@ opentelemetry-proto-1.0.0-alpha
 plexus-utils-3.5.1
 reload4j-1.2.25
 rocksdbjni-7.9.2
-scala-collection-compat_2.12-2.10.0
-scala-collection-compat_2.13-2.10.0
-scala-library-2.12.19
 scala-library-2.13.14
-scala-logging_2.12-3.9.5
 scala-logging_2.13-3.9.5
-scala-reflect-2.12.19
 scala-reflect-2.13.14
-scala-java8-compat_2.12-1.0.2
 scala-java8-compat_2.13-1.0.2
 snappy-java-1.1.10.5
 swagger-annotations-2.2.8
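The `_2.12`/`_2.13` suffixes above are Scala binary-version suffixes: a library compiled with Scala is published once per supported Scala version, which is why several dependencies appeared twice before this change. A minimal sbt sketch of how such suffixed artifacts are normally resolved — the coordinates and version below are illustrative, not part of this commit:

```scala
// build.sbt — illustrative only.
// `%%` appends the project's Scala binary version to the artifact name,
// so with scalaVersion 2.13.x this resolves kafka-streams-scala_2.13.
scalaVersion := "2.13.14"

libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "3.8.0"
```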
15 changes: 7 additions & 8 deletions README.md
@@ -11,8 +11,7 @@ the broker and tools has been deprecated since Apache Kafka 3.7 and removal of b
 see [KIP-750](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223) and
 [KIP-1013](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789510) for more details).
 
-Scala 2.12 and 2.13 are supported and 2.13 is used by default. Scala 2.12 support has been deprecated since
-Apache Kafka 3.0 and will be removed in Apache Kafka 4.0 (see [KIP-751](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218)
+Scala 2.13 is used by default. Scala 2.12 support has been removed in Apache Kafka 4.0 (see [KIP-751](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218)
 for more details). See below for how to use a specific Scala version or all of the supported Scala versions.
 
 ### Build a jar and run it ###
@@ -114,14 +113,14 @@ Using compiled files:
 ### Cleaning the build ###
     ./gradlew clean
 
-### Running a task with one of the Scala versions available (2.12.x or 2.13.x) ###
+### Running a task with Scala 2.13.x ###
 *Note that if building the jars with a version other than 2.13.x, you need to set the `SCALA_VERSION` variable or change it in `bin/kafka-run-class.sh` to run the quick start.*
 
-You can pass either the major version (eg 2.12) or the full version (eg 2.12.7):
+You can pass either the major version (eg 2.13) or the full version (eg 2.13.14):
 
-    ./gradlew -PscalaVersion=2.12 jar
-    ./gradlew -PscalaVersion=2.12 test
-    ./gradlew -PscalaVersion=2.12 releaseTarGz
+    ./gradlew -PscalaVersion=2.13 jar
+    ./gradlew -PscalaVersion=2.13 test
+    ./gradlew -PscalaVersion=2.13 releaseTarGz
 
 ### Running a task with all the scala versions enabled by default ###
 
@@ -203,7 +202,7 @@ Please note for this to work you should create/update user maven settings (typic
 
 
 ### Installing ALL the jars to the local Maven repository ###
-The recommended command to build for both Scala 2.12 and 2.13 is:
+The recommended command to build for Scala 2.13 is:
 
     ./gradlewAll publishToMavenLocal
 
31 changes: 4 additions & 27 deletions build.gradle
@@ -708,25 +708,11 @@ subprojects {
       scalaCompileOptions.additionalParameters += inlineFrom
     }
 
-    if (versions.baseScala != '2.12') {
-      scalaCompileOptions.additionalParameters += ["-opt-warnings", "-Xlint:strict-unsealed-patmat"]
-      // Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings
-      scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"]
-    }
-
-    // these options are valid for Scala versions < 2.13 only
-    // Scala 2.13 removes them, see https://github.com/scala/scala/pull/6502 and https://github.com/scala/scala/pull/5969
-    if (versions.baseScala == '2.12') {
-      scalaCompileOptions.additionalParameters += [
-        "-Xlint:by-name-right-associative",
-        "-Xlint:nullary-override",
-        "-Xlint:unsound-match"
-      ]
-    }
+    scalaCompileOptions.additionalParameters += ["-opt-warnings", "-Xlint:strict-unsealed-patmat"]
+    // Scala 2.13.2 introduces compiler warnings suppression, which is a pre-requisite for -Xfatal-warnings
+    scalaCompileOptions.additionalParameters += ["-Xfatal-warnings"]
 
-    // Scalac 2.12 `-release` requires Java 9 or higher, but Scala 2.13 doesn't have that restriction
-    if (versions.baseScala == "2.13" || JavaVersion.current().isJava9Compatible())
-      scalaCompileOptions.additionalParameters += ["-release", String.valueOf(minJavaVersion)]
+    scalaCompileOptions.additionalParameters += ["-release", String.valueOf(minJavaVersion)]
 
     addParametersForTests(name, options)
 
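With the 2.12 branches gone, `-Xfatal-warnings` now applies unconditionally. The comment retained above refers to the per-site warning suppression added in Scala 2.13.2, which is what keeps a fatal-warnings build workable. A minimal sketch of that mechanism — the deprecated API here is hypothetical:

```scala
import scala.annotation.nowarn

object FatalWarningsExample {
  // Hypothetical deprecated API, for illustration only.
  @deprecated("use newApi instead", since = "4.0")
  def oldApi(): Int = 1

  // Under -Xfatal-warnings this deprecation warning would fail the build;
  // @nowarn suppresses exactly this category at exactly this call site.
  @nowarn("cat=deprecation")
  def stillNeeded(): Int = oldApi()
}
```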
@@ -1031,7 +1017,6 @@ project(':core') {
     implementation libs.joptSimple
     implementation libs.jose4j
     implementation libs.metrics
-    implementation libs.scalaCollectionCompat
     implementation libs.scalaJava8Compat
     // only needed transitively, but set it explicitly to ensure it has the same version as scala-library
     implementation libs.scalaReflect
@@ -2682,14 +2667,6 @@ project(':streams:streams-scala') {
     api project(':streams')
 
     api libs.scalaLibrary
-    if ( versions.baseScala == '2.12' ) {
-      // Scala-Collection-Compat isn't required when compiling with Scala 2.13 or later,
-      // and having it in the dependencies could lead to classpath conflicts in Scala 3
-      // projects that use kafka-streams-kafka_2.13 (because we don't have a Scala 3 version yet)
-      // but also pull in scala-collection-compat_3 via another dependency.
-      // So we make sure to not include it in the dependencies.
-      api libs.scalaCollectionCompat
-    }
     testImplementation project(':group-coordinator')
     testImplementation project(':core')
     testImplementation project(':core').sourceSets.test.output
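For context on why `scala-collection-compat` could be dropped outright: on Scala 2.12 it backported the 2.13 collections API, while on 2.13 it is essentially an empty shim. A sketch of the kind of cross-version code it enabled, assuming scala-collection-compat 2.x on a 2.12 classpath:

```scala
// Needed on Scala 2.12 to get the 2.13-style collection API;
// on 2.13 this import resolves to a no-op compatibility shim.
import scala.collection.compat._

object CompatExample {
  // `to(List)` is the 2.13 conversion style; the compat library made the
  // same code compile on 2.12, which is why cross-built code pulled it in.
  val xs: List[Int] = Iterator(1, 2, 3).to(List)
}
```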
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -628,7 +628,7 @@ object ConfigCommand extends Logging {
 
   private def describeQuotaConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String]): Unit = {
     val quotaConfigs = getAllClientQuotasConfigs(adminClient, entityTypes, entityNames)
-    quotaConfigs.forKeyValue { (entity, entries) =>
+    quotaConfigs.foreachEntry { (entity, entries) =>
       val entityEntries = entity.entries.asScala
 
       def entitySubstr(entityType: String): Option[String] =
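The recurring `forKeyValue` → `foreachEntry` substitution in this commit retires a shim: `kafka.utils.Implicits._` provided `forKeyValue` as a cross-version extension (on 2.13 it simply delegated to `foreachEntry`, which Scala 2.12 lacked). With 2.13-only builds, the standard-library method is called directly. A minimal sketch of the replacement pattern, with illustrative data:

```scala
import scala.collection.mutable

object ForeachEntryExample extends App {
  val quotas = mutable.Map("producer_byte_rate" -> 1024L, "consumer_byte_rate" -> 2048L)

  // foreachEntry (Scala 2.13+) takes a two-argument function, so no
  // intermediate (key, value) tuple is allocated per entry, unlike
  // `foreach { case (k, v) => ... }`.
  quotas.foreachEntry { (key, value) =>
    println(s"$key = $value")
  }
}
```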
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -19,7 +19,6 @@ package kafka.admin
 
 import joptsimple.{OptionSet, OptionSpec, OptionSpecBuilder}
 import kafka.server.KafkaConfig
-import kafka.utils.Implicits._
 import kafka.utils.{Logging, ToolsUtils}
 import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils}
 import org.apache.kafka.common.security.JaasUtils
@@ -130,7 +129,7 @@ object ZkSecurityMigrator extends Logging {
     // Now override any set system properties with explicitly-provided values from the config file
     // Emit INFO logs due to camel-case property names encouraging mistakes -- help people see mistakes they make
     info(s"Found ${zkTlsConfigFileProps.size()} ZooKeeper client configuration properties in file $filename")
-    zkTlsConfigFileProps.asScala.forKeyValue { (key, value) =>
+    zkTlsConfigFileProps.asScala.foreachEntry { (key, value) =>
       info(s"Setting $key")
       KafkaConfig.setZooKeeperClientProperty(zkClientConfig, key, value)
     }
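In the `ZkSecurityMigrator` hunk, `foreachEntry` is applied to a `java.util.Properties` through `asScala`, which wraps it as a Scala `mutable.Map[String, String]`. A self-contained sketch of that interop pattern, with illustrative property names:

```scala
import java.util.Properties
import scala.jdk.CollectionConverters._

object PropertiesExample extends App {
  val props = new Properties()
  props.setProperty("zookeeper.ssl.client.enable", "true")
  props.setProperty("zookeeper.ssl.protocol", "TLSv1.2")

  // asScala wraps (not copies) the Properties as mutable.Map[String, String],
  // so 2.13 collection methods such as foreachEntry apply directly.
  props.asScala.foreachEntry { (key, value) =>
    println(s"$key=$value")
  }
}
```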
13 changes: 6 additions & 7 deletions core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -19,7 +19,6 @@ package kafka.controller
 import com.yammer.metrics.core.{Gauge, Timer}
 import kafka.cluster.Broker
 import kafka.server.KafkaConfig
-import kafka.utils.Implicits._
 import kafka.utils._
 import org.apache.kafka.clients._
 import org.apache.kafka.common._
@@ -524,11 +523,11 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
       else if (metadataVersion.isAtLeast(IBP_1_0_IV0)) 1
       else 0
 
-    leaderAndIsrRequestMap.forKeyValue { (broker, leaderAndIsrPartitionStates) =>
+    leaderAndIsrRequestMap.foreachEntry { (broker, leaderAndIsrPartitionStates) =>
       if (metadataInstance.liveOrShuttingDownBrokerIds.contains(broker)) {
         val leaderIds = mutable.Set.empty[Int]
         var numBecomeLeaders = 0
-        leaderAndIsrPartitionStates.forKeyValue { (topicPartition, state) =>
+        leaderAndIsrPartitionStates.foreachEntry { (topicPartition, state) =>
           leaderIds += state.leader
           val typeOfRequest = if (broker == state.leader) {
             numBecomeLeaders += 1
@@ -669,18 +668,18 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
       handleStopReplicaResponse(stopReplicaResponse, brokerId, partitionErrorsForDeletingTopics.toMap)
     }
 
-    stopReplicaRequestMap.forKeyValue { (brokerId, partitionStates) =>
+    stopReplicaRequestMap.foreachEntry { (brokerId, partitionStates) =>
       if (metadataInstance.liveOrShuttingDownBrokerIds.contains(brokerId)) {
         if (traceEnabled)
-          partitionStates.forKeyValue { (topicPartition, partitionState) =>
+          partitionStates.foreachEntry { (topicPartition, partitionState) =>
             stateChangeLog.trace(s"Sending StopReplica request $partitionState to " +
               s"broker $brokerId for partition $topicPartition")
           }
 
         val brokerEpoch = metadataInstance.liveBrokerIdAndEpochs(brokerId)
         if (stopReplicaRequestVersion >= 3) {
           val stopReplicaTopicState = mutable.Map.empty[String, StopReplicaTopicState]
-          partitionStates.forKeyValue { (topicPartition, partitionState) =>
+          partitionStates.foreachEntry { (topicPartition, partitionState) =>
             val topicState = stopReplicaTopicState.getOrElseUpdate(topicPartition.topic,
               new StopReplicaTopicState().setTopicName(topicPartition.topic))
             topicState.partitionStates().add(partitionState)
@@ -699,7 +698,7 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
           val topicStatesWithDelete = mutable.Map.empty[String, StopReplicaTopicState]
           val topicStatesWithoutDelete = mutable.Map.empty[String, StopReplicaTopicState]
 
-          partitionStates.forKeyValue { (topicPartition, partitionState) =>
+          partitionStates.foreachEntry { (topicPartition, partitionState) =>
             val topicStates = if (partitionState.deletePartition()) {
               numPartitionStateWithDelete += 1
               topicStatesWithDelete
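The StopReplica hunks above use a common bucketing idiom: iterate a map with `foreachEntry` and group values into per-topic accumulators with `getOrElseUpdate`. A stripped-down sketch of the same idiom, with types simplified from the controller code:

```scala
import scala.collection.mutable

object BucketingExample extends App {
  // (topic, partition) -> deletePartition flag, standing in for PartitionState.
  val partitionStates = mutable.Map(
    ("topic-a", 0) -> true,
    ("topic-a", 1) -> false,
    ("topic-b", 0) -> true
  )

  val byTopic = mutable.Map.empty[String, mutable.Buffer[Int]]

  partitionStates.foreachEntry { (topicPartition, _) =>
    val (topic, partition) = topicPartition
    // getOrElseUpdate creates the per-topic bucket on first sight of a topic.
    byTopic.getOrElseUpdate(topic, mutable.Buffer.empty) += partition
  }

  println(byTopic) // e.g. HashMap(topic-a -> ArrayBuffer(0, 1), topic-b -> ArrayBuffer(0))
}
```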
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -18,7 +18,6 @@
 package kafka.controller
 
 import kafka.cluster.Broker
-import kafka.utils.Implicits._
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderAndIsr
 
@@ -522,7 +521,7 @@ class ControllerContext extends ControllerChannelContext {
   }
 
   private def cleanPreferredReplicaImbalanceMetric(topic: String): Unit = {
-    partitionAssignments.getOrElse(topic, mutable.Map.empty).forKeyValue { (partition, replicaAssignment) =>
+    partitionAssignments.getOrElse(topic, mutable.Map.empty).foreachEntry { (partition, replicaAssignment) =>
       partitionLeadershipInfo.get(new TopicPartition(topic, partition)).foreach { leadershipInfo =>
         if (!hasPreferredLeader(replicaAssignment, leadershipInfo))
           preferredReplicaImbalanceCount -= 1
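The metric-cleanup hunk relies on `getOrElse(topic, mutable.Map.empty)` so that iteration silently does nothing for a topic with no recorded assignments. A minimal sketch of that fall-through behaviour, with simplified types:

```scala
import scala.collection.mutable

object GetOrElseExample extends App {
  val partitionAssignments = mutable.Map(
    "topic-a" -> mutable.Map(0 -> Seq(1, 2, 3), 1 -> Seq(2, 3, 4))
  )

  // Unknown topic: getOrElse yields an empty map, so foreachEntry is a no-op.
  partitionAssignments.getOrElse("topic-b", mutable.Map.empty).foreachEntry {
    (partition, replicas) => println(s"$partition -> $replicas")
  }
}
```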
17 changes: 8 additions & 9 deletions core/src/main/scala/kafka/controller/KafkaController.scala
@@ -26,7 +26,6 @@ import kafka.coordinator.transaction.ZkProducerIdManager
 import kafka.server._
 import kafka.server.metadata.ZkFinalizedFeatureCache
 import kafka.utils._
-import kafka.utils.Implicits._
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import kafka.zk.TopicZNode.TopicIdReplicaAssignment
 import kafka.zk.{FeatureZNodeStatus, _}
@@ -1030,7 +1029,7 @@ class KafkaController(val config: KafkaConfig,
 
   private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.allPartitions.toSeq): Unit = {
     val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
-    leaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) =>
+    leaderIsrAndControllerEpochs.foreachEntry { (partition, leaderIsrAndControllerEpoch) =>
       controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
     }
   }
@@ -1297,7 +1296,7 @@ class KafkaController(val config: KafkaConfig,
     }.toMap.groupBy { case (_, assignedReplicas) => assignedReplicas.head }
 
     // for each broker, check if a preferred replica election needs to be triggered
-    preferredReplicasForTopicsByBrokers.forKeyValue { (leaderBroker, topicPartitionsForBroker) =>
+    preferredReplicasForTopicsByBrokers.foreachEntry { (leaderBroker, topicPartitionsForBroker) =>
       val topicsNotInPreferredReplica = topicPartitionsForBroker.filter { case (topicPartition, _) =>
         val leadershipInfo = controllerContext.partitionLeadershipInfo(topicPartition)
         leadershipInfo.exists(_.leaderAndIsr.leader != leaderBroker)
@@ -1776,7 +1775,7 @@ class KafkaController(val config: KafkaConfig,
       }
     } else if (partitionsToBeAdded.nonEmpty) {
       info(s"New partitions to be added $partitionsToBeAdded")
-      partitionsToBeAdded.forKeyValue { (topicPartition, assignedReplicas) =>
+      partitionsToBeAdded.foreachEntry { (topicPartition, assignedReplicas) =>
         controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas)
       }
       onNewPartitionCreation(partitionsToBeAdded.keySet)
@@ -1821,7 +1820,7 @@ class KafkaController(val config: KafkaConfig,
     val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
     val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]
 
-    zkClient.getPartitionReassignment.forKeyValue { (tp, targetReplicas) =>
+    zkClient.getPartitionReassignment.foreachEntry { (tp, targetReplicas) =>
       maybeBuildReassignment(tp, Some(targetReplicas)) match {
         case Some(context) => partitionsToReassign.put(tp, context)
         case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
@@ -1858,7 +1857,7 @@ class KafkaController(val config: KafkaConfig,
     val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
     val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]
 
-    reassignments.forKeyValue { (tp, targetReplicas) =>
+    reassignments.foreachEntry { (tp, targetReplicas) =>
       val maybeApiError = targetReplicas.flatMap(validateReplicas(tp, _))
       maybeApiError match {
         case None =>
@@ -2304,7 +2303,7 @@ class KafkaController(val config: KafkaConfig,
 
     // After we have returned the result of the `AlterPartition` request, we should check whether
    // there are any reassignments which can be completed by a successful ISR expansion.
-    partitionResponses.forKeyValue { (topicPartition, partitionResponse) =>
+    partitionResponses.foreachEntry { (topicPartition, partitionResponse) =>
       if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
         val isSuccessfulUpdate = partitionResponse.isRight
         if (isSuccessfulUpdate) {
@@ -2480,7 +2479,7 @@ class KafkaController(val config: KafkaConfig,
       partitionsToAlter.keySet
     )
 
-    partitionResponses.groupBy(_._1.topic).forKeyValue { (topicName, partitionResponses) =>
+    partitionResponses.groupBy(_._1.topic).foreachEntry { (topicName, partitionResponses) =>
       // Add each topic part to the response
       val topicResponse = if (useTopicsIds) {
         new AlterPartitionResponseData.TopicData()
@@ -2491,7 +2490,7 @@ class KafkaController(val config: KafkaConfig,
       }
      alterPartitionResponse.topics.add(topicResponse)
 
-      partitionResponses.forKeyValue { (tp, errorOrIsr) =>
+      partitionResponses.foreachEntry { (tp, errorOrIsr) =>
        // Add each partition part to the response (new ISR or error)
        errorOrIsr match {
          case Left(error) =>
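The AlterPartition response hunk chains `groupBy` into `foreachEntry`: `groupBy` returns an immutable `Map`, and because `foreachEntry` is defined on the base `collection.Map` trait in 2.13, it works on immutable and mutable maps alike. A compact sketch of the per-topic grouping shape, with `TopicPartition` simplified to a case class:

```scala
object GroupByExample extends App {
  final case class TopicPartition(topic: String, partition: Int)

  val partitionResponses: Map[TopicPartition, String] = Map(
    TopicPartition("topic-a", 0) -> "ok",
    TopicPartition("topic-a", 1) -> "ok",
    TopicPartition("topic-b", 0) -> "INVALID_UPDATE_VERSION"
  )

  // groupBy keys the map by topic; foreachEntry then visits each
  // (topic, responses-for-that-topic) pair to build one response each.
  partitionResponses.groupBy(_._1.topic).foreachEntry { (topic, byPartition) =>
    println(s"$topic: ${byPartition.size} partition response(s)")
  }
}
```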
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -19,7 +19,6 @@ package kafka.controller
 import kafka.common.StateChangeFailedException
 import kafka.controller.Election._
 import kafka.server.KafkaConfig
-import kafka.utils.Implicits._
 import kafka.utils.Logging
 import kafka.zk.KafkaZkClient
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
@@ -437,7 +436,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
     val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap
     val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(
       adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
-    finishedUpdates.forKeyValue { (partition, result) =>
+    finishedUpdates.foreachEntry { (partition, result) =>
       result.foreach { leaderAndIsr =>
         val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition)
         val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
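In the hunk above, each value in `finishedUpdates` is (in the controller code) an `Either` of failure or new leader state, and `result.foreach` runs only for successful updates. A minimal sketch of that selective iteration, with types simplified from `Map[TopicPartition, Either[Exception, LeaderAndIsr]]`:

```scala
object EitherForeachExample extends App {
  // partition -> Either[failure, new leader epoch] (simplified).
  val finishedUpdates: Map[String, Either[Exception, Int]] = Map(
    "topic-a-0" -> Right(7),
    "topic-a-1" -> Left(new IllegalStateException("zk epoch mismatch"))
  )

  finishedUpdates.foreachEntry { (partition, result) =>
    // Either is right-biased: foreach is a no-op for Left values,
    // so failed updates are simply skipped here.
    result.foreach(epoch => println(s"$partition -> leader epoch $epoch"))
  }
}
```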
5 changes: 2 additions & 3 deletions core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -18,7 +18,6 @@ package kafka.controller
 
 import kafka.common.StateChangeFailedException
 import kafka.server.KafkaConfig
-import kafka.utils.Implicits._
 import kafka.utils.Logging
 import kafka.zk.KafkaZkClient
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
@@ -110,7 +109,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
     if (replicas.nonEmpty) {
       try {
         controllerBrokerRequestBatch.newBatch()
-        replicas.groupBy(_.replica).forKeyValue { (replicaId, replicas) =>
+        replicas.groupBy(_.replica).foreachEntry { (replicaId, replicas) =>
           doHandleStateChanges(replicaId, replicas, targetState)
         }
         controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
@@ -227,7 +226,7 @@ class ZkReplicaStateMachine(config: KafkaConfig,
       controllerContext.partitionLeadershipInfo(replica.topicPartition).isDefined
     }
     val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition))
-    updatedLeaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) =>
+    updatedLeaderIsrAndControllerEpochs.foreachEntry { (partition, leaderIsrAndControllerEpoch) =>
       stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")
       if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
         val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)