Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-12895: Drop support for Scala 2.12 in Kafka 4.0 #17313

Open
wants to merge 8 commits into
base: trunk
Choose a base branch
from

Conversation

frankvicky
Copy link
Collaborator

@frankvicky frankvicky commented Sep 29, 2024

JIRA: KAFKA-12895

We propose to deprecate Scala 2.12 support n Apache Kafka 3.0 and to drop it in Apache Kafka 4.0.

Implement KIP-751

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@github-actions github-actions bot added core Kafka Broker build Related to the Github or Jenkins builds labels Sep 29, 2024
@frankvicky frankvicky marked this pull request as draft September 29, 2024 09:09
@frankvicky frankvicky marked this pull request as ready for review September 29, 2024 11:33
@frankvicky
Copy link
Collaborator Author

Hi @jolshan,
You mentioned earlier that you were interested in this ticket. Could you please take a look?
Thank you ! 😺

@m1a2st
Copy link
Contributor

m1a2st commented Sep 29, 2024

Thanks for your PR, left some comments, PTAL

@m1a2st
Copy link
Contributor

m1a2st commented Sep 29, 2024

Please also update the LICENSE-binary, there are some scala 2.12.19 version, and should also update the gradlewAll file.
Some html document also need to update.

@frankvicky
Copy link
Collaborator Author

Currently there are some html metiond about Scala 2.12 are located in Streams module. I'm not sure they should be change in this PR or not.

Copy link
Contributor

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky Could you please add this change to upgrade.html?

// a higher minimum Java requirement than Kafka. This was previously the case for Scala 2.12 and Java 7.
availableScalaVersions = [ '2.12', '2.13' ]
// a higher minimum Java requirement than Kafka.
availableScalaVersions = [ '2.13' ]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it still used?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, I will remove it

Copy link
Contributor

@mumrah mumrah left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the classes where we're actually change code (LogCleanerManager and LogCleaner), can you double check that we have unit test code coverage for these methods? I just want to be sure we're not changing the logic.

@@ -121,7 +120,7 @@ class TxnMarkerQueue(@volatile var destination: Node) extends Logging {
}

def addMarkers(txnTopicPartition: Int, pendingCompleteTxnAndMarker: PendingCompleteTxnAndMarkerEntry): Unit = {
val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition, {
val queue =markersPerTxnTopicPartition.getOrElseUpdate(txnTopicPartition, {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: whitespace

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, fixed it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the classes where we're actually change code (LogCleanerManager and LogCleaner), can you double check that we have unit test code coverage for these methods? I just want to be sure we're not changing the logic.

Regarding to comment from @chia7712 , I have reverted the changes in LogCleanerManager and LogCleaner

@mumrah mumrah requested a review from ijuma September 29, 2024 22:54
@mumrah mumrah added the dependencies Pull requests that update a dependency file label Sep 29, 2024
@jolshan
Copy link
Contributor

jolshan commented Sep 30, 2024

Scanned through some of the files here: https://github.com/search?q=repo%3Aapache%2Fkafka%202.12&type=code

I noticed the vagrant file also has 2.12 references. Is this just for building old versions?

# tests with Java 7. We have since switched to Java 8, so 2.0.0 and later use Scala 2.12.

What is the concern about the streams changes?

@chia7712
Copy link
Contributor

I noticed the vagrant file also has 2.12 references. Is this just for building old versions?

yes, the dockerfile (e2e) also has 2.12 references.

RUN mkdir -p "/opt/kafka-3.3.2" && chmod a+rw /opt/kafka-3.3.2 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.3.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.3.2"

// in the case where there's only a single Arguments in the list. The following commented-out
// method works in Scala 2.13, but not 2.12. For this reason, tests which run against just a
// single combination are written using @CsvSource rather than the more elegant @MethodSource.
// def getTestQuorumAndGroupProtocolParametersZkOnly() : java.util.stream.Stream[Arguments] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be used by testAssignAndConsumeWithLeaderChangeValidatingPositions. Since we are phasing out ZooKeeper, it's fine to remove it for now.

def testAssignAndConsumeWithLeaderChangeValidatingPositions(quorum: String, groupProtocol: String): Unit = {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have uncommented the getTestQuorumAndGroupProtocolParametersZkOnly method and replaced @CsvSource with @MethodSource.

@chia7712
Copy link
Contributor

For the classes where we're actually change code (LogCleanerManager and LogCleaner), can you double check that we have unit test code coverage for these methods? I just want to be sure we're not changing the logic.

This PR should primarily focus on removing Scala 2.12. The changes made to LogCleanerManager and LogCleaner seem more like cosmetic modifications. It might be better to revert them to their original state.

@@ -17,5 +17,5 @@

# Convenient way to invoke a gradle command with all Scala versions supported
# by default
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably add a comment here that this is no longer needed, but we are keeping it for backwards compatibility.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should print deprecated information to users to remind that this gradlewAll is unnecessary and will be removed later.

Copy link
Contributor

@ijuma ijuma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, I left a few comments.

README.md Outdated
@@ -11,8 +11,7 @@ the broker and tools has been deprecated since Apache Kafka 3.7 and removal of b
see [KIP-750](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223) and
[KIP-1013](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=284789510) for more details).

Scala 2.12 and 2.13 are supported and 2.13 is used by default. Scala 2.12 support has been deprecated since
Apache Kafka 3.0 and will be removed in Apache Kafka 4.0 (see [KIP-751](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218)
Scala 2.13 is used by default. Scala 2.12 support has been removed in Apache Kafka 4.0 (see [KIP-751](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should simply say that Scala 2.13 is the only version we support. The readme is not a changelog. We have a separate file for the changelog.

README.md Outdated
@@ -114,14 +113,14 @@ Using compiled files:
### Cleaning the build ###
./gradlew clean

### Running a task with one of the Scala versions available (2.12.x or 2.13.x) ###
### Running a task with Scala 2.13.x ###
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should just delete this section and all the other sections related to multiple Scala versions, etc.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ve kept the section related to ./gradlewAll since we haven’t removed it yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would remove that too. We can simply add a comment to gradlewAll explaining that it has been kept for compatibility reasons, but there is no reason to use it anymore.

* In Scala 2.12, `ConcurrentMap.getOrElse` has the same behaviour as this method, but JConcurrentMapWrapper that
* wraps Java maps does not.
*/
def atomicGetOrUpdate[K, V](map: concurrent.Map[K, V], key: K, createValue: => V): V = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we double check that this comment is accurate for the various concurrent.Map implementations?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have reviewed the implementation in the Scala source code.

We could remove atomicGetOrUpdate() in Scala 2.13, as Scala now provides a very similar implementation for getOrElseUpdate().

In Scala 2.12, if you convert a ConcurrentHashMap using scala.collection.JavaConverters.asScala, you will indeed get a less reliable version of getOrElseUpdate that does not handle race conditions or atomic operations properly. This is the less robust version, as it doesn't ensure atomic updates.

2.13

  /**
    * Replaces the entry for the given key only if it was previously mapped
    * to some value.
    *
    * $atomicop
    *
    * @param k   key for which the entry should be replaced
    * @param v   value to be associated with the specified key
    * @return    `Some(v)` if the given key was previously mapped to some value `v`, or `None` otherwise
    */
  def replace(k: K, v: V): Option[V]

  override def getOrElseUpdate(key: K, @deprecatedName("op", since="2.13.13") defaultValue: => V): V = get(key) match {
    case Some(v) => v
    case None =>
      val v = defaultValue
      putIfAbsent(key, v) match {
        case Some(ov) => ov
        case None => v
      }
  }

2.12

  /** If given key is already in this map, returns associated value.
   *
   *  Otherwise, computes value from given expression `op`, stores with key
   *  in map and returns that value.
   *
   *  Concurrent map implementations may evaluate the expression `op`
   *  multiple times, or may evaluate `op` without inserting the result.
   *  
   *  @param  key the key to test
   *  @param  op  the computation yielding the value to associate with `key`, if
   *              `key` is previously unbound.
   *  @return     the value associated with key (either previously or as a result
   *              of executing the method).
   */
  def getOrElseUpdate(key: K, op: => V): V =
    get(key) match {
      case Some(v) => v
      case None => val d = op; this(key) = d; d
    }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankvicky we should check the JavaCollectionWrappers for 2.13

https://github.com/scala/scala/blob/2.13.x/src/library/scala/collection/convert/JavaCollectionWrappers.scala#L482

    override def getOrElseUpdate(key: K, op: => V): V =
      underlying.computeIfAbsent(key, _ => op) match {
        case null => super/*[concurrent.Map]*/.getOrElseUpdate(key, op)
        case v    => v
      }

Hence, the wrapper actually leverage computeIfAbsent for normal case. The getOrElseUpdate is used to put the null to map since computeIfAbsent does not. That is for following getOrElseUpdate semantics

The getOrElseUpdate of current scala 2.13 is good, and we can do a bit cleanup (in the follow-up) as current usages (markersQueuePerBroker and markersPerTxnTopicPartition) are not necessary to use scala wrapper ..

Copy link
Contributor

@ijuma ijuma Oct 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Java implementation of computeIfAbsent acquires locks, so the behavior is different.

@ijuma
Copy link
Contributor

ijuma commented Sep 30, 2024

This PR should primarily focus on removing Scala 2.12. The changes made to LogCleanerManager and LogCleaner seem more like cosmetic modifications. It might be better to revert them to their original state.

These are not just cosmetic, they're fixing unnecessary perf overhead as well. I don't think we should have any code remaining that has a "Scala 2.12" comment. So, the change was fine, in my opinion. If we want to do it via a separate PR, let's make sure we have a JIRA and we don't consider the work done until this is also completed.

@chia7712
Copy link
Contributor

These are not just cosmetic, they're fixing unnecessary perf overhead as well. I don't think we should have any code remaining that has a "Scala 2.12" comment. So, the change was fine, in my opinion. If we want to do it via a separate PR, let's make sure we have a JIRA and we don't consider the work done until this is also completed.

It seems to me that both "cosmetic" changes and "unnecessary performance overhead" should be handled in a follow-up to avoid prolonged discussions. I believe there are several improvements, such as KAFKA-14615, that can be addressed after dropping Scala 2.12.

@chia7712
Copy link
Contributor

If we want to do it via a separate PR, let's make sure we have a JIRA and we don't consider the work done until this is also completed.

https://issues.apache.org/jira/browse/KAFKA-17667

@ijuma
Copy link
Contributor

ijuma commented Sep 30, 2024

My point is a bit different. It's one thing to have optimizations, but it's another to have leftover code like:

// Note: we don't use retain or filterInPlace method in this function because retain is deprecated in
// scala 2.13 while filterInPlace is not available in scala 2.12.

Doing what it takes to clean up such comments should be part of removing 2.12 support.

@chia7712
Copy link
Contributor

Doing what it takes to clean up such comments should be part of removing 2.12 support.

I agree with cleaning up the comments and straightforwardly rewriting the code using Scala 2.13 APIs. However, any tasks requiring further discussion or additional testing can be deferred to a follow-up. This approach helps prevent infinite conflicts in this large patch (with 54 files changed)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
build Related to the Github or Jenkins builds core Kafka Broker dependencies Pull requests that update a dependency file
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants