MINOR: Cleanup Core Module - Scala Modules (4/n) #19805

Merged · 2 commits · Jun 6, 2025
Changes from all commits
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -116,12 +116,12 @@ class KafkaConfigTest {
// We should be also able to set completely new property
val config3 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact")))
assertEquals(1, config3.nodeId)
-assertEquals(util.Arrays.asList("compact"), config3.logCleanupPolicy)
+assertEquals(util.List.of("compact"), config3.logCleanupPolicy)

// We should be also able to set several properties
val config4 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact,delete", "--override", "node.id=2")))
assertEquals(2, config4.nodeId)
-assertEquals(util.Arrays.asList("compact","delete"), config4.logCleanupPolicy)
+assertEquals(util.List.of("compact","delete"), config4.logCleanupPolicy)
}

@Test
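
The change above swaps `util.Arrays.asList` for `util.List.of`. The two factories are not interchangeable in general; here is a minimal sketch (not part of the PR) of the behavioral difference:

```scala
import java.util

// Arrays.asList returns a fixed-size view backed by the argument array:
// set() writes through, but add()/remove() throw UnsupportedOperationException.
val asList = util.Arrays.asList("compact", "delete")
asList.set(0, "delete") // allowed

// List.of returns a fully immutable list and rejects null elements.
val listOf = util.List.of("compact", "delete")
// listOf.set(0, "delete")        // UnsupportedOperationException
// util.List.of("compact", null)  // NullPointerException
```

For these assertions either form compares equal, since `assertEquals` relies on `List.equals`, which checks only elements and order.
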
45 changes: 28 additions & 17 deletions core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -37,8 +37,7 @@ import org.junit.jupiter.api.{AfterEach, Test}
import java.lang.{Long => JLong}
import java.util
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
+import java.util.stream.Collectors
import scala.jdk.OptionConverters.RichOptional

/**
@@ -61,13 +60,13 @@ class LogCleanerManagerTest extends Logging {
val offset = 999
val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false)

-val cleanerCheckpoints: mutable.Map[TopicPartition, JLong] = mutable.Map[TopicPartition, JLong]()
+val cleanerCheckpoints: util.HashMap[TopicPartition, JLong] = new util.HashMap[TopicPartition, JLong]()

class LogCleanerManagerMock(logDirs: util.List[File],
logs: util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog],
logDirFailureChannel: LogDirFailureChannel) extends LogCleanerManager(logDirs, logs, logDirFailureChannel) {
override def allCleanerCheckpoints: util.Map[TopicPartition, JLong] = {
-cleanerCheckpoints.toMap.asJava
+cleanerCheckpoints
}

override def updateCheckpoints(dataDir: File, partitionToUpdateOrAdd: Optional[util.Map.Entry[TopicPartition, JLong]],
@@ -382,7 +381,11 @@
assertEquals(0, cleanable.size, "should have 0 logs ready to be compacted")

// log cleanup finished, and log can be picked up for compaction
-cleanerManager.resumeCleaning(deletableLog.asScala.map(_.getKey).toSet.asJava)
+cleanerManager.resumeCleaning(
+  deletableLog.stream()
+    .map[TopicPartition](entry => entry.getKey)
+    .collect(Collectors.toSet[TopicPartition]())
+)
val cleanable2 = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).toScala
assertEquals(1, cleanable2.size, "should have 1 logs ready to be compacted")

@@ -396,7 +399,7 @@
assertEquals(0, deletableLog2.size, "should have 0 logs ready to be deleted")

// compaction done, should have 1 log eligible for log cleanup
-cleanerManager.doneDeleting(Seq(cleanable2.get.topicPartition).asJava)
+cleanerManager.doneDeleting(util.List.of(cleanable2.get.topicPartition))
val deletableLog3 = cleanerManager.pauseCleaningForNonCompactedPartitions()
assertEquals(1, deletableLog3.size, "should have 1 logs ready to be deleted")
}
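
The `resumeCleaning` call above replaces a Scala round-trip (`asScala.map(_.getKey).toSet.asJava`) with a pure Java stream pipeline. A self-contained sketch of the pattern, with illustrative names:

```scala
import java.util
import java.util.stream.Collectors

import org.apache.kafka.common.TopicPartition

// Collect the keys of a java.util.List of Map entries into the java.util.Set
// that resumeCleaning expects, without scala.jdk.CollectionConverters.
def partitionsOf(entries: util.List[util.Map.Entry[TopicPartition, java.lang.Long]]): util.Set[TopicPartition] =
  entries.stream()
    .map[TopicPartition](entry => entry.getKey)
    .collect(Collectors.toSet[TopicPartition]())
```

Note that `Collectors.toSet` makes no guarantee about the mutability of the returned set, which is sufficient here as long as the callee only iterates it.
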
@@ -501,9 +504,13 @@
val pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions()
// Log truncation happens due to unclean leader election
cleanerManager.abortAndPauseCleaning(log.topicPartition)
-cleanerManager.resumeCleaning(Set(log.topicPartition).asJava)
+cleanerManager.resumeCleaning(util.Set.of(log.topicPartition))
// log cleanup finishes and pausedPartitions are resumed
-cleanerManager.resumeCleaning(pausedPartitions.asScala.map(_.getKey).toSet.asJava)
+cleanerManager.resumeCleaning(
+  pausedPartitions.stream()
+    .map[TopicPartition](entry => entry.getKey)
+    .collect(Collectors.toSet[TopicPartition]())
+)

assertEquals(Optional.empty(), cleanerManager.cleaningState(log.topicPartition))
}
@@ -522,7 +529,11 @@
// Broker processes StopReplicaRequest with delete=true
cleanerManager.abortCleaning(log.topicPartition)
// log cleanup finishes and pausedPartitions are resumed
-cleanerManager.resumeCleaning(pausedPartitions.asScala.map(_.getKey).toSet.asJava)
+cleanerManager.resumeCleaning(
+  pausedPartitions.stream()
+    .map[TopicPartition](entry => entry.getKey)
+    .collect(Collectors.toSet[TopicPartition]())
+)

assertEquals(Optional.empty(), cleanerManager.cleaningState(log.topicPartition))
}
@@ -743,17 +754,17 @@
val cleanerManager: LogCleanerManager = createCleanerManager(log)
val tp = new TopicPartition("log", 0)

-assertThrows(classOf[IllegalStateException], () => cleanerManager.doneDeleting(Seq(tp).asJava))
+assertThrows(classOf[IllegalStateException], () => cleanerManager.doneDeleting(util.List.of(tp)))

cleanerManager.setCleaningState(tp, LogCleaningState.logCleaningPaused(1))
-assertThrows(classOf[IllegalStateException], () => cleanerManager.doneDeleting(Seq(tp).asJava))
+assertThrows(classOf[IllegalStateException], () => cleanerManager.doneDeleting(util.List.of(tp)))

cleanerManager.setCleaningState(tp, LOG_CLEANING_IN_PROGRESS)
-cleanerManager.doneDeleting(Seq(tp).asJava)
+cleanerManager.doneDeleting(util.List.of(tp))
assertTrue(cleanerManager.cleaningState(tp).isEmpty)

cleanerManager.setCleaningState(tp, LOG_CLEANING_ABORTED)
-cleanerManager.doneDeleting(Seq(tp).asJava)
+cleanerManager.doneDeleting(util.List.of(tp))
assertEquals(LogCleaningState.logCleaningPaused(1), cleanerManager.cleaningState(tp).get)
}

@@ -771,7 +782,7 @@

val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats())
assertEquals(Optional.empty(), filthiestLog, "Log should not be selected for cleaning")
-assertEquals(20L, cleanerCheckpoints(tp), "Unselected log should have checkpoint offset updated")
+assertEquals(20L, cleanerCheckpoints.get(tp), "Unselected log should have checkpoint offset updated")
}
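
Since `cleanerCheckpoints` is now a `util.HashMap`, the Scala `apply` sugar (`cleanerCheckpoints(tp)`) gives way to `get`, and the failure mode on a missing key changes. A small illustrative sketch:

```scala
import java.util
import scala.collection.mutable

val scalaMap = mutable.Map[String, Long]("tp-0" -> 20L)
scalaMap("tp-0")      // 20
// scalaMap("absent") // throws NoSuchElementException

val javaMap = new util.HashMap[String, java.lang.Long]()
javaMap.put("tp-0", 20L)
javaMap.get("tp-0")   // 20
javaMap.get("absent") // returns null rather than throwing
```

In these assertions a missing key now surfaces as a failed comparison (or an unboxing error) rather than a `NoSuchElementException` at the lookup site.
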

/**
@@ -793,17 +804,17 @@

val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
assertEquals(tp1, filthiestLog.topicPartition, "Dirtier log should be selected")
-assertEquals(15L, cleanerCheckpoints(tp0), "Unselected log should have checkpoint offset updated")
+assertEquals(15L, cleanerCheckpoints.get(tp0), "Unselected log should have checkpoint offset updated")
}

private def createCleanerManager(log: UnifiedLog): LogCleanerManager = {
val logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
logs.put(topicPartition, log)
-new LogCleanerManager(Seq(logDir, logDir2).asJava, logs, null)
+new LogCleanerManager(util.List.of(logDir, logDir2), logs, null)
}

private def createCleanerManagerMock(pool: util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog]): LogCleanerManagerMock = {
-new LogCleanerManagerMock(Seq(logDir).asJava, pool, null)
+new LogCleanerManagerMock(util.List.of(logDir), pool, null)
}

private def createLog(segmentSize: Int,
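
The helper constructors above now build their `java.util.List` arguments directly. As a sketch (paths are illustrative, not from the PR) of the difference between the adapter-based and factory-based forms:

```scala
import java.io.File
import java.util
import scala.jdk.CollectionConverters._

val logDir  = new File("/tmp/kafka-logs-0") // illustrative
val logDir2 = new File("/tmp/kafka-logs-1") // illustrative

// Before: an adapter object wrapping the Scala Seq; mutability follows the Seq.
val wrapped: util.List[File] = Seq(logDir, logDir2).asJava

// After: a plain immutable Java list, no Scala collection at the call site.
val direct: util.List[File] = util.List.of(logDir, logDir2)
```

Both satisfy the `util.List[File]` parameter of `LogCleanerManager`; dropping the adapters is what lets this file remove its `scala.jdk.CollectionConverters` import entirely.
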
12 changes: 6 additions & 6 deletions core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -913,7 +913,7 @@ class LogCleanerTest extends Logging {

// clean the log
val stats = new CleanerStats(Time.SYSTEM)
-cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head).asJava, map, 0L, stats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
+cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), map, 0L, stats, new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
val shouldRemain = LogTestUtils.keysInLog(log).filterNot(keys.contains)
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
}
@@ -926,7 +926,7 @@
val (log, offsetMap) = createLogWithMessagesLargerThanMaxSize(largeMessageSize = 1024 * 1024)

val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
-cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head).asJava, offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
+cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
val shouldRemain = LogTestUtils.keysInLog(log).filter(k => !offsetMap.map.containsKey(k.toString))
assertEquals(shouldRemain, LogTestUtils.keysInLog(log))
}
@@ -945,7 +945,7 @@

val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
assertThrows(classOf[CorruptRecordException], () =>
-cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head).asJava, offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
+cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
)
}

@@ -962,7 +962,7 @@

val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
assertThrows(classOf[CorruptRecordException], () =>
-cleaner.cleanSegments(log, Seq(log.logSegments.asScala.head).asJava, offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
+cleaner.cleanSegments(log, util.List.of(log.logSegments.asScala.head), offsetMap, 0L, new CleanerStats(Time.SYSTEM), new CleanedTransactionMetadata, -1, log.logSegments.asScala.head.readNextOffset)
)
}

@@ -1636,7 +1636,7 @@

// Try to clean segment with offset overflow. This will trigger log split and the cleaning itself must abort.
assertThrows(classOf[LogCleaningAbortedException], () =>
-cleaner.cleanSegments(log, Seq(segmentWithOverflow).asJava, offsetMap, 0L, new CleanerStats(Time.SYSTEM),
+cleaner.cleanSegments(log, util.List.of(segmentWithOverflow), offsetMap, 0L, new CleanerStats(Time.SYSTEM),
new CleanedTransactionMetadata, -1, segmentWithOverflow.readNextOffset)
)
assertEquals(numSegmentsInitial + 1, log.logSegments.size)
@@ -1646,7 +1646,7 @@
// Clean each segment now that split is complete.
val upperBoundOffset = log.logSegments.asScala.last.readNextOffset
for (segmentToClean <- log.logSegments.asScala)
-cleaner.cleanSegments(log, List(segmentToClean).asJava, offsetMap, 0L, new CleanerStats(Time.SYSTEM),
+cleaner.cleanSegments(log, util.List.of(segmentToClean), offsetMap, 0L, new CleanerStats(Time.SYSTEM),
new CleanedTransactionMetadata, -1, upperBoundOffset)
assertEquals(expectedKeysAfterCleaning, LogTestUtils.keysInLog(log))
assertFalse(LogTestUtils.hasOffsetOverflow(log))
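
Unlike LogCleanerManagerTest, this file keeps `scala.jdk.CollectionConverters` for the `log.logSegments.asScala.head` reads. A hypothetical rewrite (not part of the PR, names mirror the test above, and assuming `logSegments` returns a `java.util.Collection[LogSegment]`) that would avoid the conversion entirely:

```scala
// Take the first segment via the Java iterator instead of converting
// the whole collection to a Scala view first.
val firstSegment = log.logSegments.iterator().next()
cleaner.cleanSegments(log, util.List.of(firstSegment), map, 0L, stats,
  new CleanedTransactionMetadata, -1, firstSegment.readNextOffset)
```
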
47 changes: 24 additions & 23 deletions core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -27,7 +27,8 @@ import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test

-import java.util.{Collections, Properties}
+import java.util
+import java.util.Properties
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.storage.internals.log.{LogConfig, ThrottledReplicaListValidator}
@@ -142,7 +143,7 @@ class LogConfigTest {
/* Sanity check that toHtml produces one of the expected configs */
@Test
def testToHtml(): Unit = {
-val html = LogConfig.configDefCopy.toHtml(4, (key: String) => "prefix_" + key, Collections.emptyMap())
+val html = LogConfig.configDefCopy.toHtml(4, (key: String) => "prefix_" + key, util.Map.of)
val expectedConfig = "<h4><a id=\"file.delete.delay.ms\"></a><a id=\"prefix_file.delete.delay.ms\" href=\"#prefix_file.delete.delay.ms\">file.delete.delay.ms</a></h4>"
assertTrue(html.contains(expectedConfig), s"Could not find `$expectedConfig` in:\n $html")
}
@@ -293,7 +294,7 @@
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs.toString)
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes.toString)
assertThrows(classOf[ConfigException],
-() => LogConfig.validate(Collections.emptyMap(), props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+() => LogConfig.validate(util.Map.of, props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
}

@Test
@@ -303,7 +304,7 @@
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
val logProps = new Properties()
def validateCleanupPolicy(): Unit = {
-LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
}
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
@@ -330,10 +331,10 @@
val logProps = new Properties()
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
if (sysRemoteStorageEnabled) {
-LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
} else {
val message = assertThrows(classOf[ConfigException],
-() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+() => LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker"))
}
}
@@ -349,21 +350,21 @@
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false")
if (wasRemoteStorageEnabled) {
val message = assertThrows(classOf[InvalidConfigurationException],
-() => LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
-  logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+() => LogConfig.validate(util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
+  logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
assertTrue(message.getMessage.contains("It is invalid to disable remote storage without deleting remote data. " +
"If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " +
"If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`."))


// It should be able to disable the remote log storage when delete on disable is set to true
logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true")
-LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
-  logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+LogConfig.validate(util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
+  logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
} else {
-LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
-LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), logProps,
-  kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
+LogConfig.validate(util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), logProps,
+  kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
}
}

@@ -382,12 +383,12 @@
logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500")
if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
-() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
-  kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+() => LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap,
+  kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG))
} else {
-LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
-  kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap,
+  kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
}
}

@@ -406,12 +407,12 @@
logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128")
if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
-() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
-  kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+() => LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap,
+  kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
} else {
-LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
-  kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+LogConfig.validate(util.Map.of, logProps, kafkaConfig.extractLogConfigMap,
+  kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
}
}

@@ -426,10 +427,10 @@

if (sysRemoteStorageEnabled) {
val message = assertThrows(classOf[ConfigException],
-() => LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
+() => LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled))
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
} else {
-LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
+LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled)
}
}

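
A compact sketch (not from the PR) of the `java.util.Map` factory replacements applied throughout this file; both factories require Java 9+ and return immutable, null-hostile maps:

```scala
import java.util
import org.apache.kafka.common.config.TopicConfig

// Replaces Collections.emptyMap():
val empty: util.Map[String, String] = util.Map.of()

// Replaces Collections.singletonMap(key, value):
val single: util.Map[String, String] =
  util.Map.of(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
```

The same sweep drops the redundant empty parentheses on `isRemoteStorageSystemEnabled`, which Scala permits for zero-argument Java methods.
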