From 88e848a5a2c0b5b5f324db7d562a648798b788b1 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 3 Oct 2025 01:38:18 +0800 Subject: [PATCH 01/17] add some --- .../storage/internals/log/LogTestUtils.java | 186 ++++++ .../storage/internals/log/UnifiedLogTest.java | 590 +++++++++++++++++- .../apache/kafka/common/test/TestUtils.java | 281 +++++++-- 3 files changed, 1019 insertions(+), 38 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java index 50b666a0fb1d5..ca6a857a08dd8 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java @@ -16,11 +16,22 @@ */ package org.apache.kafka.storage.internals.log; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.transaction.TransactionLogConfig; +import org.apache.kafka.server.config.ServerLogConfigs; +import org.apache.kafka.server.log.remote.storage.RemoteLogManager; +import org.apache.kafka.server.util.Scheduler; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import java.io.File; import java.io.IOException; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; public class LogTestUtils { public static LogSegment createSegment(long offset, File logDir, int indexIntervalBytes, Time time) throws IOException { @@ -33,4 +44,179 @@ public static LogSegment createSegment(long offset, File logDir, int indexInterv // Create and return the LogSegment instance return new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time); } + + public static UnifiedLog createLog(File dir, + LogConfig 
config, + BrokerTopicStats brokerTopicStats, + Scheduler scheduler, + Time time) throws IOException { + return createLog( + dir, + config, + brokerTopicStats, + scheduler, + time, + 0L, // logStartOffset + 0L, // recoveryPoint + 5 * 60 * 1000, // maxTransactionTimeoutMs + new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), + TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, true, // lastShutdownClean + Optional.empty(), // topicId + new ConcurrentHashMap<>(), // numRemainingSegments + false, // remoteStorageSystemEnable + Optional.empty(), // remoteLogManager + LogOffsetsListener.NO_OP_OFFSETS_LISTENER + ); + } + + public static UnifiedLog createLog(File dir, + LogConfig config, + BrokerTopicStats brokerTopicStats, + Scheduler scheduler, + Time time, + long logStartOffset, + long recoveryPoint, + int maxTransactionTimeoutMs, + ProducerStateManagerConfig producerStateManagerConfig, + int producerIdExpirationCheckIntervalMs, + boolean lastShutdownClean, + Optional topicId, + ConcurrentMap numRemainingSegments, + boolean remoteStorageSystemEnable, + Optional remoteLogManager, + LogOffsetsListener logOffsetsListener) throws IOException { + return UnifiedLog.create( + dir, + config, + logStartOffset, + recoveryPoint, + scheduler, + brokerTopicStats, + time, + maxTransactionTimeoutMs, + producerStateManagerConfig, + producerIdExpirationCheckIntervalMs, + new LogDirFailureChannel(10), + lastShutdownClean, + topicId, // 直接傳入 Java Optional + numRemainingSegments, + remoteStorageSystemEnable, + logOffsetsListener + ); + } + + public static class LogConfigBuilder { + private long segmentMs = LogConfig.DEFAULT_SEGMENT_MS; + private int segmentBytes = LogConfig.DEFAULT_SEGMENT_BYTES; + private long retentionMs = LogConfig.DEFAULT_RETENTION_MS; + private long localRetentionMs = LogConfig.DEFAULT_LOCAL_RETENTION_MS; + private long retentionBytes = ServerLogConfigs.LOG_RETENTION_BYTES_DEFAULT; + private long 
localRetentionBytes = LogConfig.DEFAULT_LOCAL_RETENTION_BYTES; + private long segmentJitterMs = LogConfig.DEFAULT_SEGMENT_JITTER_MS; + private String cleanupPolicy = ServerLogConfigs.LOG_CLEANUP_POLICY_DEFAULT; + private int maxMessageBytes = ServerLogConfigs.MAX_MESSAGE_BYTES_DEFAULT; + private int indexIntervalBytes = ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_DEFAULT; + private int segmentIndexBytes = ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT; + private long fileDeleteDelayMs = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT; + private boolean remoteLogStorageEnable = LogConfig.DEFAULT_REMOTE_STORAGE_ENABLE; + private boolean remoteLogCopyDisable = false; + private boolean remoteLogDeleteOnDisable = false; + + // 2. 為每個參數建立一個 "with" 方法,用於設定值並回傳 builder 本身 (fluent interface) + public LogConfigBuilder withSegmentMs(long segmentMs) { + this.segmentMs = segmentMs; + return this; + } + + public LogConfigBuilder withSegmentBytes(int segmentBytes) { + this.segmentBytes = segmentBytes; + return this; + } + + public LogConfigBuilder withRetentionMs(long retentionMs) { + this.retentionMs = retentionMs; + return this; + } + + public LogConfigBuilder withLocalRetentionMs(long localRetentionMs) { + this.localRetentionMs = localRetentionMs; + return this; + } + + public LogConfigBuilder withRetentionBytes(long retentionBytes) { + this.retentionBytes = retentionBytes; + return this; + } + + public LogConfigBuilder withLocalRetentionBytes(long localRetentionBytes) { + this.localRetentionBytes = localRetentionBytes; + return this; + } + + public LogConfigBuilder withSegmentJitterMs(long segmentJitterMs) { + this.segmentJitterMs = segmentJitterMs; + return this; + } + + public LogConfigBuilder withCleanupPolicy(String cleanupPolicy) { + this.cleanupPolicy = cleanupPolicy; + return this; + } + + public LogConfigBuilder withMaxMessageBytes(int maxMessageBytes) { + this.maxMessageBytes = maxMessageBytes; + return this; + } + + public LogConfigBuilder withIndexIntervalBytes(int 
indexIntervalBytes) { + this.indexIntervalBytes = indexIntervalBytes; + return this; + } + + public LogConfigBuilder withSegmentIndexBytes(int segmentIndexBytes) { + this.segmentIndexBytes = segmentIndexBytes; + return this; + } + + public LogConfigBuilder withFileDeleteDelayMs(long fileDeleteDelayMs) { + this.fileDeleteDelayMs = fileDeleteDelayMs; + return this; + } + + public LogConfigBuilder withRemoteLogStorageEnable(boolean remoteLogStorageEnable) { + this.remoteLogStorageEnable = remoteLogStorageEnable; + return this; + } + + public LogConfigBuilder withRemoteLogCopyDisable(boolean remoteLogCopyDisable) { + this.remoteLogCopyDisable = remoteLogCopyDisable; + return this; + } + + public LogConfigBuilder withRemoteLogDeleteOnDisable(boolean remoteLogDeleteOnDisable) { + this.remoteLogDeleteOnDisable = remoteLogDeleteOnDisable; + return this; + } + + // 3. 建立一個 build() 方法,它使用 builder 中設定的值來建立最終的 LogConfig 物件 + public LogConfig build() { + Properties logProps = new Properties(); + logProps.put(TopicConfig.SEGMENT_MS_CONFIG, String.valueOf(segmentMs)); + logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, String.valueOf(segmentBytes)); + logProps.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionMs)); + logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, String.valueOf(localRetentionMs)); + logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, String.valueOf(retentionBytes)); + logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, String.valueOf(localRetentionBytes)); + logProps.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, String.valueOf(segmentJitterMs)); + logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanupPolicy); + logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(maxMessageBytes)); + logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, String.valueOf(indexIntervalBytes)); + logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, String.valueOf(segmentIndexBytes)); + logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, 
String.valueOf(fileDeleteDelayMs)); + logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, String.valueOf(remoteLogStorageEnable)); + logProps.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, String.valueOf(remoteLogCopyDisable)); + logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, String.valueOf(remoteLogDeleteOnDisable)); + return new LogConfig(logProps); + } + } } diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java index ea14932ff203d..828ac31616282 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java @@ -16,17 +16,66 @@ */ package org.apache.kafka.storage.internals.log; -import org.apache.kafka.test.TestUtils; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.transaction.TransactionLogConfig; +import org.apache.kafka.server.log.remote.storage.RemoteLogManager; +import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.server.util.Scheduler; +import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import 
java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class UnifiedLogTest { private final File tmpDir = TestUtils.tempDirectory(); + private final File logDir = TestUtils.randomPartitionLogDir(tmpDir); + private final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false); + private final MockTime mockTime = new MockTime(); + private final int maxTransactionTimeoutMs = 60 * 60 * 1000; + private final ProducerStateManagerConfig producerStateManagerConfig = new ProducerStateManagerConfig(maxTransactionTimeoutMs, true); + private final List logsToClose = new ArrayList<>(); + + private LogConfig logConfig; + private UnifiedLog log; + + @BeforeEach + public void setup() throws IOException { + Properties props = TestUtils.createBrokerConfig(0, -1); + this.logConfig = new LogConfig(props); + } + + @AfterEach + public void tearDown() throws IOException { + brokerTopicStats.close(); + for (UnifiedLog log : logsToClose) { + log.close(); + } + Utils.delete(tmpDir); + } @Test public void testOffsetFromProducerSnapshotFile() { @@ -34,4 +83,543 @@ public void testOffsetFromProducerSnapshotFile() { File snapshotFile = LogFileUtils.producerSnapshotFile(tmpDir, offset); assertEquals(offset, UnifiedLog.offsetFromFile(snapshotFile)); } + + @Test + public void shouldApplyEpochToMessageOnAppendIfLeader() throws IOException { + SimpleRecord[] records = java.util.stream.IntStream.range(0, 50) + .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes())) + .toArray(SimpleRecord[]::new); + + // Given this partition is on leader epoch 72 + int epoch = 72; + try (UnifiedLog log = createLog(logDir, new LogConfig(new Properties()))) { + log.assignEpochStartOffset(epoch, records.length); + + // When appending messages as a leader (i.e. 
assignOffsets = true) + for (SimpleRecord record : records) { + log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, record), epoch); + } + + // Then leader epoch should be set on messages + for (int i = 0; i < records.length; i++) { + FetchDataInfo read = log.read(i, 1, FetchIsolation.LOG_END, true); + RecordBatch batch = read.records.batches().iterator().next(); + assertEquals(epoch, batch.partitionLeaderEpoch(), "Should have set leader epoch"); + } + } + } + + @Test + public void followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCache() throws IOException { + int[] messageIds = java.util.stream.IntStream.range(0, 50).toArray(); + SimpleRecord[] records = Arrays.stream(messageIds) + .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes())) + .toArray(SimpleRecord[]::new); + + try (UnifiedLog log = createLog(logDir, new LogConfig(new Properties()))) { + // Given each message has an offset & epoch, as msgs from leader would + for (int i = 0; i < records.length; i++) { + long finalI = i; + MemoryRecords recordsForEpoch = MemoryRecords.withRecords(messageIds[i], Compression.NONE, records[i]); + recordsForEpoch.batches().forEach(batch -> { + batch.setPartitionLeaderEpoch(42); + batch.setLastOffset(finalI); + }); + appendAsFollower(log, recordsForEpoch, i); + } + + assertEquals(Optional.of(42), log.latestEpoch()); + } + } + + @Test + public void shouldTruncateLeaderEpochsWhenDeletingSegments() throws IOException { + MemoryRecords records = TestUtils.singletonRecords("test".getBytes()); + LogConfig config = new LogTestUtils.LogConfigBuilder() + .withSegmentBytes(records.sizeInBytes() * 5) + .withRetentionBytes(records.sizeInBytes() * 10L) + .build(); + + log = createLog(tmpDir, config); + LeaderEpochFileCache cache = epochCache(log); + + for (int i = 0; i < 15; i++) { + log.appendAsLeader(records, 0); + } + + // Given epochs + cache.assign(0, 0); + cache.assign(1, 5); + cache.assign(2, 10); + + // When first segment is removed + 
log.updateHighWatermark(log.logEndOffset()); + log.deleteOldSegments(); + + assertEquals(List.of(new EpochEntry(1, 5), new EpochEntry(2, 10)), cache.epochEntries()); + } + + @Test + public void shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() throws IOException { + MemoryRecords records = TestUtils.singletonRecords("test".getBytes()); + LogConfig config = new LogTestUtils.LogConfigBuilder() + .withSegmentBytes(records.sizeInBytes() * 5) + .withRetentionBytes(records.sizeInBytes() * 10L) + .build(); + + log = createLog(tmpDir, config); + LeaderEpochFileCache cache = epochCache(log); + + for (int i = 0; i < 15; i++) { + log.appendAsLeader(records, 0); + } + + // Given epochs + cache.assign(0, 0); + cache.assign(1, 7); + cache.assign(2, 10); + + // When first segment removed (up to offset 5) + log.updateHighWatermark(log.logEndOffset()); + log.deleteOldSegments(); + + assertEquals(List.of(new EpochEntry(0, 5), new EpochEntry(1, 7), new EpochEntry(2, 10)), cache.epochEntries()); + } + + @Test + public void shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog() throws IOException { + LogConfig config = new LogTestUtils.LogConfigBuilder() + .withSegmentBytes(10 * createRecords(0, 0).sizeInBytes()) + .build(); + log = createLog(tmpDir, config); + LeaderEpochFileCache cache = epochCache(log); + + append(0, 0, 10); + append(1, 10, 6); + append(2, 16, 4); + + assertEquals(2, log.numberOfSegments()); + assertEquals(20, log.logEndOffset()); + + // When truncate to LEO (no op) + log.truncateTo(log.logEndOffset()); + // Then no change + assertEquals(3, cache.epochEntries().size()); + + // When truncate + log.truncateTo(11); + // Then no change + assertEquals(2, cache.epochEntries().size()); + + // When truncate + log.truncateTo(10); + assertEquals(1, cache.epochEntries().size()); + // When truncate all + log.truncateTo(0); + assertEquals(0, cache.epochEntries().size()); + } + + @Test + public void shouldDeleteSizeBasedSegments() throws IOException { + MemoryRecords 
records = TestUtils.singletonRecords("test".getBytes()); + LogConfig config = new LogTestUtils.LogConfigBuilder() + .withSegmentBytes(1024 * 1024 * 5) + .build(); + log = createLog(tmpDir, config); + + // append some messages to create some segments + for (int i = 0; i < 15; i++) { + log.appendAsLeader(records, 0); + } + + log.updateHighWatermark(log.logEndOffset()); + log.deleteOldSegments(); + assertEquals(2, log.numberOfSegments(), "should have 2 segments"); + } + + @Test + public void shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() throws IOException { + MemoryRecords records = TestUtils.singletonRecords("test".getBytes()); + LogConfig config = new LogTestUtils.LogConfigBuilder() + .withSegmentBytes(records.sizeInBytes() * 5) + .withRetentionBytes(records.sizeInBytes() * 15L) + .build(); + + log = createLog(tmpDir, config); + + // append some messages to create some segments + for (int i = 0; i < 15; i++) { + log.appendAsLeader(records, 0); + } + + log.updateHighWatermark(log.logEndOffset()); + log.deleteOldSegments(); + assertEquals(3, log.numberOfSegments(), "should have 3 segments"); + } + + @Test + public void shouldDeleteTimeBasedSegmentsReadyToBeDeleted() throws IOException { + MemoryRecords records = TestUtils.singletonRecords("test".getBytes(), 10L); + LogConfig config = new LogTestUtils.LogConfigBuilder() + .withSegmentBytes(records.sizeInBytes() * 15) + .withRetentionMs(10000L) + .build(); + log = createLog(tmpDir, config); + + for (int i = 0; i < 15; i++) { + log.appendAsLeader(records, 0); + } + + log.updateHighWatermark(log.logEndOffset()); + log.deleteOldSegments(); + assertEquals(1, log.numberOfSegments(), "There should be 1 segment remaining"); + } + + @Test + public void shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() throws IOException { + MemoryRecords records = TestUtils.singletonRecords("test".getBytes(), mockTime.milliseconds()); + LogConfig config = new LogTestUtils.LogConfigBuilder() + 
.withSegmentBytes(records.sizeInBytes() * 5) + .withRetentionMs(10000000) + .build(); + log = createLog(tmpDir, config); + + for (int i = 0; i < 15; i++) { + log.appendAsLeader(records, 0); + } + + log.updateHighWatermark(log.logEndOffset()); + log.deleteOldSegments(); + assertEquals(3, log.numberOfSegments(), "There should be 3 segments remaining"); + } + + @Test + public void shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() throws IOException { + MemoryRecords records = TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), 10L); + LogConfig config = new LogTestUtils.LogConfigBuilder() + .withSegmentBytes(records.sizeInBytes() * 5) + .withRetentionMs(10000) + .withCleanupPolicy("compact") + .build(); + log = createLog(tmpDir, config); + + for (int i = 0; i < 15; i++) { + log.appendAsLeader(records, 0); + } + + log.logSegments().iterator().next().setLastModified(mockTime.milliseconds() - 20000); + + int segments = log.numberOfSegments(); + log.updateHighWatermark(log.logEndOffset()); + log.deleteOldSegments(); + assertEquals(segments, log.numberOfSegments(), "There should be 3 segments remaining"); + } + + @Test + public void shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() throws IOException { + MemoryRecords records = TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), 10L); + LogConfig config = new LogTestUtils.LogConfigBuilder() + .withSegmentBytes(records.sizeInBytes() * 5) + .withRetentionBytes(records.sizeInBytes() * 10L) + .withCleanupPolicy("compact, delete") + .build(); + + log = createLog(tmpDir, config); + + for (int i = 0; i < 15; i++) { + log.appendAsLeader(records, 0); + } + + log.updateHighWatermark(log.logEndOffset()); + log.deleteOldSegments(); + assertEquals(1, log.numberOfSegments(), "There should be 1 segment remaining"); + } + + @Test + public void shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithSizeRetention() throws IOException { + MemoryRecords records = 
TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), 10L); + int recordSize = records.sizeInBytes(); + LogConfig config = new LogTestUtils.LogConfigBuilder() + .withSegmentBytes(recordSize * 2) + .withRetentionBytes(recordSize / 2) + .withCleanupPolicy("") + .withRemoteLogStorageEnable(true) + .build(); + log = createLog(tmpDir, config, true); + + for (int i = 0; i < 10; i++) { + log.appendAsLeader(records, 0); + } + + int segmentsBefore = log.numberOfSegments(); + log.updateHighWatermark(log.logEndOffset()); + log.updateHighestOffsetInRemoteStorage(log.logEndOffset() - 1); + int deletedSegments = log.deleteOldSegments(); + + assertTrue(log.numberOfSegments() < segmentsBefore, "Some segments should be deleted due to size retention"); + assertTrue(deletedSegments > 0, "At least one segment should be deleted"); + } + + @Test + public void shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithMsRetention() throws IOException { + long oldTimestamp = mockTime.milliseconds() - 20000; + MemoryRecords oldRecords = TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), oldTimestamp); + int recordSize = oldRecords.sizeInBytes(); + LogConfig logConfig = new LogTestUtils.LogConfigBuilder() + .withSegmentBytes(recordSize * 2) + .withLocalRetentionMs(5000) + .withCleanupPolicy("") + .withRemoteLogStorageEnable(true) + .build(); + log = createLog(tmpDir, logConfig, true); + + for (int i = 0; i < 10; i++) { + log.appendAsLeader(oldRecords, 0); + } + + MemoryRecords newRecords = TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), mockTime.milliseconds()); + for (int i = 0; i < 5; i++) { + log.appendAsLeader(newRecords, 0); + } + + int segmentsBefore = log.numberOfSegments(); + + log.updateHighWatermark(log.logEndOffset()); + log.updateHighestOffsetInRemoteStorage(log.logEndOffset() - 1); + int deletedSegments = log.deleteOldSegments(); + + assertTrue(log.numberOfSegments() < segmentsBefore, "Some segments should be deleted due to time retention"); + 
assertTrue(deletedSegments > 0, "At least one segment should be deleted"); + } + + @Test + public void testLogDeletionAfterDeleteRecords() throws IOException { + MemoryRecords records = TestUtils.singletonRecords("test".getBytes()); + LogConfig logConfig = new LogTestUtils.LogConfigBuilder() + .withSegmentBytes(records.sizeInBytes() * 5) + .build(); + log = createLog(tmpDir, logConfig); + + for (int i = 0; i < 15; i++) { + log.appendAsLeader(records, 0); + } + assertEquals(3, log.numberOfSegments()); + assertEquals(0, log.logStartOffset()); + log.updateHighWatermark(log.logEndOffset()); + + log.maybeIncrementLogStartOffset(1, LogStartOffsetIncrementReason.ClientRecordDeletion); + log.deleteOldSegments(); + assertEquals(3, log.numberOfSegments()); + assertEquals(1, log.logStartOffset()); + + log.maybeIncrementLogStartOffset(6, LogStartOffsetIncrementReason.ClientRecordDeletion); + log.deleteOldSegments(); + assertEquals(2, log.numberOfSegments()); + assertEquals(6, log.logStartOffset()); + + log.maybeIncrementLogStartOffset(15, LogStartOffsetIncrementReason.ClientRecordDeletion); + log.deleteOldSegments(); + assertEquals(1, log.numberOfSegments()); + assertEquals(15, log.logStartOffset()); + } + + @Test + public void testLogDeletionAfterClose() throws IOException { + MemoryRecords records = TestUtils.singletonRecords("test".getBytes(), mockTime.milliseconds() - 1000); + LogConfig logConfig = new LogTestUtils.LogConfigBuilder() + .withSegmentBytes(records.sizeInBytes() * 5) + .withSegmentIndexBytes(1000) + .withRetentionMs(999) + .build(); + log = createLog(tmpDir, logConfig); + + log.appendAsLeader(records, 0); + + assertEquals(1, log.numberOfSegments()); + assertEquals(1, epochCache(log).epochEntries().size()); + + log.close(); + log.delete(); + assertEquals(0, log.numberOfSegments()); + assertEquals(0, epochCache(log).epochEntries().size()); + } + + @Test + public void testDeleteOldSegments() throws IOException { + MemoryRecords records = 
TestUtils.singletonRecords("test".getBytes(), mockTime.milliseconds() - 1000); + LogConfig logConfig = new LogTestUtils.LogConfigBuilder() + .withSegmentBytes(records.sizeInBytes() * 5) + .withSegmentIndexBytes(1000) + .withRetentionMs(999) + .build(); + log = createLog(tmpDir, logConfig); + + for (int i = 0; i < 100; i++) { + log.appendAsLeader(records, 0); + } + + log.assignEpochStartOffset(0, 40); + log.assignEpochStartOffset(1, 90); + + int numSegments = log.numberOfSegments(); + log.deleteOldSegments(); + assertEquals(numSegments, log.numberOfSegments()); + assertEquals(0L, log.logStartOffset()); + + for (long hw = 25; hw <= 30; hw++) { + log.updateHighWatermark(hw); + log.deleteOldSegments(); + assertTrue(log.logStartOffset() <= hw); + long finalHw = hw; + log.logSegments().forEach(segment -> { + FetchDataInfo segmentFetchInfo; + try { + segmentFetchInfo = segment.read(segment.baseOffset(), Integer.MAX_VALUE); + } catch (IOException e) { + throw new RuntimeException(e); + } + // FIXME: think + Optional lastBatch = Optional.empty(); + for (RecordBatch batch : segmentFetchInfo.records.batches()) { + lastBatch = Optional.of(batch); + } + lastBatch.ifPresent(batch -> assertTrue(batch.lastOffset() >= finalHw)); + }); + } + + log.updateHighWatermark(log.logEndOffset()); + log.deleteOldSegments(); + assertEquals(1, log.numberOfSegments(), "The deleted segments should be gone."); + assertEquals(1, epochCache(log).epochEntries().size(), "Epoch entries should have gone."); + assertEquals(new EpochEntry(1, 100), epochCache(log).epochEntries().get(0), "Epoch entry should be the latest epoch and the leo."); + + for (int i = 0; i < 100; i++) { + log.appendAsLeader(records, 0); + } + + log.delete(); + assertEquals(0, log.numberOfSegments(), "The number of segments should be 0"); + assertEquals(0, log.deleteOldSegments(), "The number of deleted segments should be zero."); + assertEquals(0, epochCache(log).epochEntries().size(), "Epoch entries should have gone."); + } + + 
@Test + public void shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete() throws IOException { + MemoryRecords records = TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), 10L); + int recordsPerSegment = 5; + LogConfig logConfig = new LogTestUtils.LogConfigBuilder() + .withSegmentBytes(records.sizeInBytes() * recordsPerSegment) + .withSegmentIndexBytes(1000) + .withCleanupPolicy("compact") + .build(); + log = createLog(tmpDir, logConfig); + + for (int i = 0; i < 15; i++) { + log.appendAsLeader(records, 0); + } + + assertEquals(3, log.numberOfSegments()); + log.updateHighWatermark(log.logEndOffset()); + log.maybeIncrementLogStartOffset(recordsPerSegment, LogStartOffsetIncrementReason.ClientRecordDeletion); + + // The first segment, which is entirely before the log start offset, should be deleted + // Of the remaining the segments, the first can overlap the log start offset and the rest must have a base offset + // greater than the start offset. + log.updateHighWatermark(log.logEndOffset()); + log.deleteOldSegments(); + assertEquals(2, log.numberOfSegments(), "There should be 2 segments remaining"); + assertTrue(log.logSegments().iterator().next().baseOffset() <= log.logStartOffset()); + log.logSegments().forEach(segment -> { + if (log.logSegments().iterator().next() != segment) { + assertTrue(segment.baseOffset() > log.logStartOffset()); + } + }); + } + + @Test + public void testFirstUnstableOffsetNoTransactionalData() throws IOException { + MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, + new SimpleRecord("foo".getBytes()), + new SimpleRecord("bar".getBytes()), + new SimpleRecord("baz".getBytes())); + + log.appendAsLeader(records, 0); + assertEquals(Optional.empty(), log.firstUnstableOffset()); + } + + @Test + public void testFirstUnstableOffsetWithTransactionalData() throws IOException { + long pid = 137L; + short epoch = 5; + int seq = 0; + + MemoryRecords records = 
MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq, + new SimpleRecord("foo".getBytes()), + new SimpleRecord("bar".getBytes()), + new SimpleRecord("baz".getBytes())); + + LogAppendInfo firstAppendInfo = log.appendAsLeader(records, 0); + assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset()); + + seq += 3; + log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq, + new SimpleRecord("blah".getBytes())), 0); + + assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset()); + + LogAppendInfo commitAppendInfo = appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.COMMIT, mockTime.milliseconds()); + + assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset()); + log.updateHighWatermark(commitAppendInfo.lastOffset() + 1); + + assertEquals(Optional.empty(), log.firstUnstableOffset()); + } + + private void append(int epoch, long startOffset, int count) { + for (int i = 0; i < count; i++) { + log.appendAsFollower(createRecords(startOffset + i, epoch), epoch); + } + } + + private LeaderEpochFileCache epochCache(UnifiedLog log) { + return log.leaderEpochCache(); + } + + private void appendAsFollower(UnifiedLog log, MemoryRecords records, int leaderEpoch) { + records.batches().forEach(batch -> batch.setPartitionLeaderEpoch(leaderEpoch)); + log.appendAsFollower(records, leaderEpoch); + } + + private LogAppendInfo appendEndTxnMarkerAsLeader(UnifiedLog log, long producerId, short producerEpoch, ControlRecordType controlType, long timestamp) throws IOException { + MemoryRecords records = MemoryRecords.withEndTransactionMarker(producerId, producerEpoch, new EndTransactionMarker(controlType, 0)); + return log.appendAsLeader(records, 0); + } + + private UnifiedLog createLog(File dir, LogConfig config) throws IOException { + return createLog(dir, config, false); + } + + private UnifiedLog createLog(File dir, LogConfig config, boolean 
remoteStorageSystemEnable) throws IOException { + return createLog(dir, config, this.brokerTopicStats, mockTime.scheduler, this.mockTime, 0L, 0L, maxTransactionTimeoutMs, + this.producerStateManagerConfig, TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, true, + Optional.empty(), remoteStorageSystemEnable, Optional.empty(), LogOffsetsListener.NO_OP_OFFSETS_LISTENER); + } + + private UnifiedLog createLog(File dir, LogConfig config, BrokerTopicStats brokerTopicStats, Scheduler scheduler, MockTime time, + long logStartOffset, long recoveryPoint, int maxTransactionTimeoutMs, ProducerStateManagerConfig producerStateManagerConfig, + int producerIdExpirationCheckIntervalMs, boolean lastShutdownClean, Optional topicId, boolean remoteStorageSystemEnable, + Optional remoteLogManager, LogOffsetsListener logOffsetsListener) throws IOException { + + UnifiedLog log = LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint, + maxTransactionTimeoutMs, producerStateManagerConfig, producerIdExpirationCheckIntervalMs, lastShutdownClean, topicId, + new ConcurrentHashMap<>(), remoteStorageSystemEnable, remoteLogManager, logOffsetsListener); + + this.logsToClose.add(log); + return log; + } + + private MemoryRecords createRecords(long startOffset, int epoch) { + return TestUtils.records(List.of(new SimpleRecord("value".getBytes())), startOffset, epoch); + } + } diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java index 4c75272edd4af..1f16fcfc2c031 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java @@ -16,30 +16,49 @@ */ package org.apache.kafka.common.test; +import org.apache.kafka.common.compress.Compression; import 
org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.security.test.JaasTestUtils; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.network.SocketServerConfigs; +import org.apache.kafka.raft.QuorumConfig; +import org.apache.kafka.server.config.KRaftConfigs; +import org.apache.kafka.server.config.ReplicationConfigs; +import org.apache.kafka.server.config.ServerConfigs; +import org.apache.kafka.server.config.ServerLogConfigs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Files; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; import java.util.Random; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static java.lang.String.format; -/** - * Helper functions for writing unit tests. - *

- * Package-private: Not intended for use outside {@code org.apache.kafka.common.test}. - */ -class TestUtils { +public class TestUtils { private static final Logger log = LoggerFactory.getLogger(TestUtils.class); - /* A consistent random number generator to make tests repeatable */ public static final Random SEEDED_RANDOM = new Random(192348092834L); public static final String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; @@ -48,23 +67,14 @@ class TestUtils { private static final long DEFAULT_POLL_INTERVAL_MS = 100; private static final long DEFAULT_MAX_WAIT_MS = 15_000; + private static final Random RANDOM = new Random(); - /** - * Create an empty file in the default temporary-file directory, using `kafka` as the prefix and `tmp` as the - * suffix to generate its name. - */ public static File tempFile() throws IOException { final File file = Files.createTempFile("kafka", ".tmp").toFile(); file.deleteOnExit(); return file; } - /** - * Generate a random string of letters and digits of the given length - * - * @param len The length of the string - * @return The random string - */ public static String randomString(final int len) { final StringBuilder b = new StringBuilder(); for (int i = 0; i < len; i++) @@ -72,11 +82,7 @@ public static String randomString(final int len) { return b.toString(); } - /** - * Create a temporary relative directory in the specified parent directory with the given prefix. 
- * - */ - static File tempDirectory() { + public static File tempDirectory() { final File file; String prefix = "kafka-"; try { @@ -96,19 +102,17 @@ static File tempDirectory() { return file; } - /** - * uses default value of 15 seconds for timeout - */ + public static File tempRelativeDir(String parent) { + File file = new File(parent, "kafka-" + SEEDED_RANDOM.nextInt(1000000)); + file.mkdirs(); + file.deleteOnExit(); + return file; + } + public static void waitForCondition(final Supplier testCondition, final String conditionDetails) throws InterruptedException { waitForCondition(testCondition, DEFAULT_MAX_WAIT_MS, () -> conditionDetails); } - /** - * Wait for condition to be met for at most {@code maxWaitMs} and throw assertion failure otherwise. - * This should be used instead of {@code Thread.sleep} whenever possible as it allows a longer timeout to be used - * without unnecessarily increasing test time (as the condition is checked frequently). The longer timeout is needed to - * avoid transient failures due to slow or overloaded machines. - */ public static void waitForCondition(final Supplier testCondition, final long maxWaitMs, final Supplier conditionDetails) throws InterruptedException { @@ -134,15 +138,218 @@ public static void waitForCondition(final Supplier testCondition, } } - /** - * Wait for condition to be met for at most {@code maxWaitMs} and throw assertion failure otherwise. - * This should be used instead of {@code Thread.sleep} whenever possible as it allows a longer timeout to be used - * without unnecessarily increasing test time (as the condition is checked frequently). The longer timeout is needed to - * avoid transient failures due to slow or overloaded machines. 
- */ public static void waitForCondition(final Supplier testCondition, final long maxWaitMs, String conditionDetails) throws InterruptedException { waitForCondition(testCondition, maxWaitMs, () -> conditionDetails); } + + public static File randomPartitionLogDir(File parentDir) { + int attempts = 1000; + while (attempts > 0) { + File f = new File(parentDir, "kafka-" + RANDOM.nextInt(1000000)); + if (f.mkdir()) { + f.deleteOnExit(); + return f; + } + attempts--; + } + throw new RuntimeException("Failed to create directory after 1000 attempts"); + } + + public static Properties createBrokerConfig(int nodeId, int port) { + return new BrokerConfigBuilder(nodeId).withPort(port).build(); + } + + public static MemoryRecords singletonRecords(byte[] value, byte[] key) { + return singletonRecords(value, key, Compression.NONE, RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE); + } + + public static MemoryRecords singletonRecords(byte[] value, long timestamp) { + return singletonRecords(value, null, Compression.NONE, timestamp, RecordBatch.CURRENT_MAGIC_VALUE); + } + + public static MemoryRecords singletonRecords( + byte[] value + ) { + return records(List.of(new SimpleRecord(RecordBatch.NO_TIMESTAMP, null, value)), + RecordBatch.CURRENT_MAGIC_VALUE, + Compression.NONE, + RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, + RecordBatch.NO_SEQUENCE, + 0, + RecordBatch.NO_PARTITION_LEADER_EPOCH + ); + } + + public static MemoryRecords singletonRecords( + byte[] value, + byte[] key, + Compression codec, + long timestamp, + byte magicValue + ) { + return records(List.of(new SimpleRecord(timestamp, key, value)), + magicValue, codec, + RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, + RecordBatch.NO_SEQUENCE, + 0, + RecordBatch.NO_PARTITION_LEADER_EPOCH + ); + } + + public static MemoryRecords singletonRecords(byte[] value, byte[] key, long timestamp) { + return singletonRecords(value, key, Compression.NONE, timestamp, 
RecordBatch.CURRENT_MAGIC_VALUE); + } + + public static MemoryRecords records(List records) { + return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH); + } + + public static MemoryRecords records(List records, long baseOffset) { + return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, baseOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH); + } + + public static MemoryRecords records(List records, long baseOffset, int partitionLeaderEpoch) { + return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, baseOffset, partitionLeaderEpoch); + } + + public static MemoryRecords records(List records, byte magicValue, Compression compression) { + return records(records, magicValue, compression, RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH); + } + + public static MemoryRecords records(List records, + byte magicValue, + Compression compression, + long producerId, + short producerEpoch, + int sequence, + long baseOffset, + int partitionLeaderEpoch) { + ByteBuffer buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records)); + MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, compression, TimestampType.CREATE_TIME, baseOffset, + System.currentTimeMillis(), producerId, producerEpoch, sequence, false, partitionLeaderEpoch); + for (SimpleRecord record : records) { + builder.append(record); + } + return builder.build(); + } + + public static class BrokerConfigBuilder { + private final int nodeId; + private boolean enableControlledShutdown = true; + private boolean enableDeleteTopic = true; + private int port = -1; + private 
Optional interBrokerSecurityProtocol = Optional.empty(); + private Optional trustStoreFile = Optional.empty(); + private Optional saslProperties = Optional.empty(); + private boolean enablePlaintext = true; + private boolean enableSaslPlaintext = false; + private int saslPlaintextPort = -1; + private boolean enableSsl = false; + private int sslPort = -1; + private boolean enableSaslSsl = false; + private int saslSslPort = -1; + private Optional rack = Optional.empty(); + private int logDirCount = 1; + private int numPartitions = 1; + private short defaultReplicationFactor = 1; + private boolean enableFetchFromFollower = false; + + public BrokerConfigBuilder(int nodeId) { + this.nodeId = nodeId; + } + + public BrokerConfigBuilder withPort(int port) { + this.enablePlaintext = true; + this.port = port; + return this; + } + + public BrokerConfigBuilder withSsl(int port, File trustStoreFile) { + this.enableSsl = true; + this.sslPort = port; + this.trustStoreFile = Optional.of(trustStoreFile); + return this; + } + + public Properties build() { + List> protocolAndPorts = new ArrayList<>(); + + if (enablePlaintext || (interBrokerSecurityProtocol.isPresent() && interBrokerSecurityProtocol.get() == SecurityProtocol.PLAINTEXT)) + protocolAndPorts.add(new AbstractMap.SimpleEntry<>(SecurityProtocol.PLAINTEXT, port)); + if (enableSsl || (interBrokerSecurityProtocol.isPresent() && interBrokerSecurityProtocol.get() == SecurityProtocol.SSL)) + protocolAndPorts.add(new AbstractMap.SimpleEntry<>(SecurityProtocol.SSL, sslPort)); + if (enableSaslPlaintext || (interBrokerSecurityProtocol.isPresent() && interBrokerSecurityProtocol.get() == SecurityProtocol.SASL_PLAINTEXT)) + protocolAndPorts.add(new AbstractMap.SimpleEntry<>(SecurityProtocol.SASL_PLAINTEXT, saslPlaintextPort)); + if (enableSaslSsl || (interBrokerSecurityProtocol.isPresent() && interBrokerSecurityProtocol.get() == SecurityProtocol.SASL_SSL)) + protocolAndPorts.add(new AbstractMap.SimpleEntry<>(SecurityProtocol.SASL_SSL, 
saslSslPort)); + + String listeners = protocolAndPorts.stream() + .map(entry -> String.format("%s://localhost:%d", entry.getKey().name(), entry.getValue())) + .collect(Collectors.joining(",")); + + Properties props = new Properties(); + props.put(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, "true"); + props.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true"); + props.put(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG, String.valueOf(TimeUnit.MINUTES.toMillis(10))); + props.put(KRaftConfigs.NODE_ID_CONFIG, String.valueOf(nodeId)); + props.put(ServerConfigs.BROKER_ID_CONFIG, String.valueOf(nodeId)); + props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners); + props.put(SocketServerConfigs.LISTENERS_CONFIG, listeners); + props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER"); + + String securityProtocolMap = protocolAndPorts.stream() + .map(entry -> String.format("%s:%s", entry.getKey().name(), entry.getKey().name())) + .collect(Collectors.joining(",")) + ",CONTROLLER:PLAINTEXT"; + props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, securityProtocolMap); + + if (logDirCount > 1) { + String logDirs = IntStream.range(0, logDirCount) + .mapToObj(i -> tempDirectory().getAbsolutePath()) + .collect(Collectors.joining(",")); + props.put(ServerLogConfigs.LOG_DIRS_CONFIG, logDirs); + } else { + props.put(ServerLogConfigs.LOG_DIR_CONFIG, tempDirectory().getAbsolutePath()); + } + + props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker"); + props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "1000@localhost:0"); + props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, String.valueOf(enableControlledShutdown)); + props.put(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, String.valueOf(enableDeleteTopic)); + + rack.ifPresent(r -> props.put(ServerConfigs.BROKER_RACK_CONFIG, r)); + + try { + if (protocolAndPorts.stream().anyMatch(entry -> JaasTestUtils.usesSslTransportLayer(entry.getKey()))) { + 
props.putAll(JaasTestUtils.sslConfigs(org.apache.kafka.common.network.ConnectionMode.SERVER, false, trustStoreFile, "server" + nodeId)); + } + + if (protocolAndPorts.stream().anyMatch(entry -> JaasTestUtils.usesSaslAuthentication(entry.getKey()))) { + saslProperties.ifPresent(p -> props.putAll(JaasTestUtils.saslConfigs(Optional.of(p)))); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + + interBrokerSecurityProtocol.ifPresent(protocol -> + props.put(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG, protocol.name())); + + props.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, String.valueOf(numPartitions)); + props.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, String.valueOf(defaultReplicationFactor)); + + if (enableFetchFromFollower) { + props.put(ServerConfigs.BROKER_RACK_CONFIG, String.valueOf(nodeId)); + props.put(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG, "org.apache.kafka.common.replica.RackAwareReplicaSelector"); + } + + return props; + } + } } From 78862d5c259f419655c3f8f9bbc704448e97134d Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 3 Oct 2025 11:39:49 +0800 Subject: [PATCH 02/17] remove jaasTestUtils --- .../main/java/org/apache/kafka/common/test/TestUtils.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java index 1f16fcfc2c031..5003dc7629d39 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.security.auth.SecurityProtocol; -import org.apache.kafka.common.security.test.JaasTestUtils; import org.apache.kafka.common.utils.Exit; 
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.network.SocketServerConfigs; @@ -327,13 +326,6 @@ public Properties build() { rack.ifPresent(r -> props.put(ServerConfigs.BROKER_RACK_CONFIG, r)); try { - if (protocolAndPorts.stream().anyMatch(entry -> JaasTestUtils.usesSslTransportLayer(entry.getKey()))) { - props.putAll(JaasTestUtils.sslConfigs(org.apache.kafka.common.network.ConnectionMode.SERVER, false, trustStoreFile, "server" + nodeId)); - } - - if (protocolAndPorts.stream().anyMatch(entry -> JaasTestUtils.usesSaslAuthentication(entry.getKey()))) { - saslProperties.ifPresent(p -> props.putAll(JaasTestUtils.saslConfigs(Optional.of(p)))); - } } catch (Exception e) { throw new RuntimeException(e); } From 04313d742f6e5fcecfaf8508054f4dddd98e7f07 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 3 Oct 2025 12:54:10 +0800 Subject: [PATCH 03/17] remove brokerConfigBuilder --- .../apache/kafka/common/test/TestUtils.java | 109 ------------------ 1 file changed, 109 deletions(-) diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java index 5003dc7629d39..fbba8b6ef93d1 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java @@ -156,10 +156,6 @@ public static File randomPartitionLogDir(File parentDir) { throw new RuntimeException("Failed to create directory after 1000 attempts"); } - public static Properties createBrokerConfig(int nodeId, int port) { - return new BrokerConfigBuilder(nodeId).withPort(port).build(); - } - public static MemoryRecords singletonRecords(byte[] value, byte[] key) { return singletonRecords(value, key, Compression.NONE, RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE); } @@ -239,109 +235,4 @@ public static MemoryRecords records(List records, 
} return builder.build(); } - - public static class BrokerConfigBuilder { - private final int nodeId; - private boolean enableControlledShutdown = true; - private boolean enableDeleteTopic = true; - private int port = -1; - private Optional interBrokerSecurityProtocol = Optional.empty(); - private Optional trustStoreFile = Optional.empty(); - private Optional saslProperties = Optional.empty(); - private boolean enablePlaintext = true; - private boolean enableSaslPlaintext = false; - private int saslPlaintextPort = -1; - private boolean enableSsl = false; - private int sslPort = -1; - private boolean enableSaslSsl = false; - private int saslSslPort = -1; - private Optional rack = Optional.empty(); - private int logDirCount = 1; - private int numPartitions = 1; - private short defaultReplicationFactor = 1; - private boolean enableFetchFromFollower = false; - - public BrokerConfigBuilder(int nodeId) { - this.nodeId = nodeId; - } - - public BrokerConfigBuilder withPort(int port) { - this.enablePlaintext = true; - this.port = port; - return this; - } - - public BrokerConfigBuilder withSsl(int port, File trustStoreFile) { - this.enableSsl = true; - this.sslPort = port; - this.trustStoreFile = Optional.of(trustStoreFile); - return this; - } - - public Properties build() { - List> protocolAndPorts = new ArrayList<>(); - - if (enablePlaintext || (interBrokerSecurityProtocol.isPresent() && interBrokerSecurityProtocol.get() == SecurityProtocol.PLAINTEXT)) - protocolAndPorts.add(new AbstractMap.SimpleEntry<>(SecurityProtocol.PLAINTEXT, port)); - if (enableSsl || (interBrokerSecurityProtocol.isPresent() && interBrokerSecurityProtocol.get() == SecurityProtocol.SSL)) - protocolAndPorts.add(new AbstractMap.SimpleEntry<>(SecurityProtocol.SSL, sslPort)); - if (enableSaslPlaintext || (interBrokerSecurityProtocol.isPresent() && interBrokerSecurityProtocol.get() == SecurityProtocol.SASL_PLAINTEXT)) - protocolAndPorts.add(new AbstractMap.SimpleEntry<>(SecurityProtocol.SASL_PLAINTEXT, 
saslPlaintextPort)); - if (enableSaslSsl || (interBrokerSecurityProtocol.isPresent() && interBrokerSecurityProtocol.get() == SecurityProtocol.SASL_SSL)) - protocolAndPorts.add(new AbstractMap.SimpleEntry<>(SecurityProtocol.SASL_SSL, saslSslPort)); - - String listeners = protocolAndPorts.stream() - .map(entry -> String.format("%s://localhost:%d", entry.getKey().name(), entry.getValue())) - .collect(Collectors.joining(",")); - - Properties props = new Properties(); - props.put(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, "true"); - props.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true"); - props.put(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG, String.valueOf(TimeUnit.MINUTES.toMillis(10))); - props.put(KRaftConfigs.NODE_ID_CONFIG, String.valueOf(nodeId)); - props.put(ServerConfigs.BROKER_ID_CONFIG, String.valueOf(nodeId)); - props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, listeners); - props.put(SocketServerConfigs.LISTENERS_CONFIG, listeners); - props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER"); - - String securityProtocolMap = protocolAndPorts.stream() - .map(entry -> String.format("%s:%s", entry.getKey().name(), entry.getKey().name())) - .collect(Collectors.joining(",")) + ",CONTROLLER:PLAINTEXT"; - props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, securityProtocolMap); - - if (logDirCount > 1) { - String logDirs = IntStream.range(0, logDirCount) - .mapToObj(i -> tempDirectory().getAbsolutePath()) - .collect(Collectors.joining(",")); - props.put(ServerLogConfigs.LOG_DIRS_CONFIG, logDirs); - } else { - props.put(ServerLogConfigs.LOG_DIR_CONFIG, tempDirectory().getAbsolutePath()); - } - - props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker"); - props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "1000@localhost:0"); - props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, String.valueOf(enableControlledShutdown)); - props.put(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, 
String.valueOf(enableDeleteTopic)); - - rack.ifPresent(r -> props.put(ServerConfigs.BROKER_RACK_CONFIG, r)); - - try { - } catch (Exception e) { - throw new RuntimeException(e); - } - - interBrokerSecurityProtocol.ifPresent(protocol -> - props.put(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG, protocol.name())); - - props.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, String.valueOf(numPartitions)); - props.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, String.valueOf(defaultReplicationFactor)); - - if (enableFetchFromFollower) { - props.put(ServerConfigs.BROKER_RACK_CONFIG, String.valueOf(nodeId)); - props.put(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG, "org.apache.kafka.common.replica.RackAwareReplicaSelector"); - } - - return props; - } - } } From a0c0bb71d46194d6303c2de50257e3ac092a4be4 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 3 Oct 2025 13:22:21 +0800 Subject: [PATCH 04/17] fix some --- .../storage/internals/log/LogTestUtils.java | 11 +- .../storage/internals/log/UnifiedLogTest.java | 163 +++++++++--------- .../src/main/resources/log4j2.yaml | 4 +- 3 files changed, 87 insertions(+), 91 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java index ca6a857a08dd8..4c1b06602c209 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.coordinator.transaction.TransactionLogConfig; import org.apache.kafka.server.config.ServerLogConfigs; -import org.apache.kafka.server.log.remote.storage.RemoteLogManager; import org.apache.kafka.server.util.Scheduler; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; @@ -64,11 +63,11 @@ public static UnifiedLog createLog(File dir, Optional.empty(), // topicId 
new ConcurrentHashMap<>(), // numRemainingSegments false, // remoteStorageSystemEnable - Optional.empty(), // remoteLogManager LogOffsetsListener.NO_OP_OFFSETS_LISTENER ); } + @SuppressWarnings("ParameterNumber") public static UnifiedLog createLog(File dir, LogConfig config, BrokerTopicStats brokerTopicStats, @@ -83,7 +82,6 @@ public static UnifiedLog createLog(File dir, Optional topicId, ConcurrentMap numRemainingSegments, boolean remoteStorageSystemEnable, - Optional remoteLogManager, LogOffsetsListener logOffsetsListener) throws IOException { return UnifiedLog.create( dir, @@ -98,7 +96,7 @@ public static UnifiedLog createLog(File dir, producerIdExpirationCheckIntervalMs, new LogDirFailureChannel(10), lastShutdownClean, - topicId, // 直接傳入 Java Optional + topicId, numRemainingSegments, remoteStorageSystemEnable, logOffsetsListener @@ -119,10 +117,9 @@ public static class LogConfigBuilder { private int segmentIndexBytes = ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT; private long fileDeleteDelayMs = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT; private boolean remoteLogStorageEnable = LogConfig.DEFAULT_REMOTE_STORAGE_ENABLE; - private boolean remoteLogCopyDisable = false; - private boolean remoteLogDeleteOnDisable = false; + private boolean remoteLogCopyDisable = LogConfig.DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG; + private boolean remoteLogDeleteOnDisable = LogConfig.DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG; - // 2. 
為每個參數建立一個 "with" 方法,用於設定值並回傳 builder 本身 (fluent interface) public LogConfigBuilder withSegmentMs(long segmentMs) { this.segmentMs = segmentMs; return this; diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java index 828ac31616282..e7e0273e76559 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java @@ -24,9 +24,9 @@ import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.utils.PrimitiveRef; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.transaction.TransactionLogConfig; -import org.apache.kafka.server.log.remote.storage.RemoteLogManager; import org.apache.kafka.server.storage.log.FetchIsolation; import org.apache.kafka.server.util.MockTime; import org.apache.kafka.server.util.Scheduler; @@ -34,7 +34,6 @@ import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.File; @@ -45,6 +44,7 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -59,15 +59,8 @@ public class UnifiedLogTest { private final ProducerStateManagerConfig producerStateManagerConfig = new ProducerStateManagerConfig(maxTransactionTimeoutMs, true); private final List logsToClose = new ArrayList<>(); - private LogConfig logConfig; private UnifiedLog log; - @BeforeEach - public void setup() throws IOException { - Properties props = TestUtils.createBrokerConfig(0, 
-1); - this.logConfig = new LogConfig(props); - } - @AfterEach public void tearDown() throws IOException { brokerTopicStats.close(); @@ -134,17 +127,17 @@ public void followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCa @Test public void shouldTruncateLeaderEpochsWhenDeletingSegments() throws IOException { - MemoryRecords records = TestUtils.singletonRecords("test".getBytes()); + Supplier records = () -> TestUtils.singletonRecords("test".getBytes()); LogConfig config = new LogTestUtils.LogConfigBuilder() - .withSegmentBytes(records.sizeInBytes() * 5) - .withRetentionBytes(records.sizeInBytes() * 10L) + .withSegmentBytes(records.get().sizeInBytes() * 5) + .withRetentionBytes(records.get().sizeInBytes() * 10L) .build(); - log = createLog(tmpDir, config); + log = createLog(logDir, config); LeaderEpochFileCache cache = epochCache(log); for (int i = 0; i < 15; i++) { - log.appendAsLeader(records, 0); + log.appendAsLeader(records.get(), 0); } // Given epochs @@ -167,7 +160,7 @@ public void shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() throws IOExc .withRetentionBytes(records.sizeInBytes() * 10L) .build(); - log = createLog(tmpDir, config); + log = createLog(logDir, config); LeaderEpochFileCache cache = epochCache(log); for (int i = 0; i < 15; i++) { @@ -191,7 +184,7 @@ public void shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog() throws IO LogConfig config = new LogTestUtils.LogConfigBuilder() .withSegmentBytes(10 * createRecords(0, 0).sizeInBytes()) .build(); - log = createLog(tmpDir, config); + log = createLog(logDir, config); LeaderEpochFileCache cache = epochCache(log); append(0, 0, 10); @@ -225,7 +218,7 @@ public void shouldDeleteSizeBasedSegments() throws IOException { LogConfig config = new LogTestUtils.LogConfigBuilder() .withSegmentBytes(1024 * 1024 * 5) .build(); - log = createLog(tmpDir, config); + log = createLog(logDir, config); // append some messages to create some segments for (int i = 0; i < 15; i++) { @@ -245,7 +238,7 
@@ public void shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() throws IOEx .withRetentionBytes(records.sizeInBytes() * 15L) .build(); - log = createLog(tmpDir, config); + log = createLog(logDir, config); // append some messages to create some segments for (int i = 0; i < 15; i++) { @@ -264,7 +257,7 @@ public void shouldDeleteTimeBasedSegmentsReadyToBeDeleted() throws IOException { .withSegmentBytes(records.sizeInBytes() * 15) .withRetentionMs(10000L) .build(); - log = createLog(tmpDir, config); + log = createLog(logDir, config); for (int i = 0; i < 15; i++) { log.appendAsLeader(records, 0); @@ -277,15 +270,15 @@ public void shouldDeleteTimeBasedSegmentsReadyToBeDeleted() throws IOException { @Test public void shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() throws IOException { - MemoryRecords records = TestUtils.singletonRecords("test".getBytes(), mockTime.milliseconds()); - LogConfig config = new LogTestUtils.LogConfigBuilder() - .withSegmentBytes(records.sizeInBytes() * 5) + Supplier records = () -> TestUtils.singletonRecords("test".getBytes(), mockTime.milliseconds()); + LogConfig logConfig = new LogTestUtils.LogConfigBuilder() + .withSegmentBytes(records.get().sizeInBytes() * 5) .withRetentionMs(10000000) .build(); - log = createLog(tmpDir, config); + log = createLog(logDir, logConfig); for (int i = 0; i < 15; i++) { - log.appendAsLeader(records, 0); + log.appendAsLeader(records.get(), 0); } log.updateHighWatermark(log.logEndOffset()); @@ -295,16 +288,16 @@ public void shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() throws IO @Test public void shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() throws IOException { - MemoryRecords records = TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), 10L); + Supplier records = () -> TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), 10L); LogConfig config = new LogTestUtils.LogConfigBuilder() - .withSegmentBytes(records.sizeInBytes() * 5) + 
.withSegmentBytes(records.get().sizeInBytes() * 5) .withRetentionMs(10000) .withCleanupPolicy("compact") .build(); - log = createLog(tmpDir, config); + log = createLog(logDir, config); for (int i = 0; i < 15; i++) { - log.appendAsLeader(records, 0); + log.appendAsLeader(records.get(), 0); } log.logSegments().iterator().next().setLastModified(mockTime.milliseconds() - 20000); @@ -317,17 +310,17 @@ public void shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() throws IOExc @Test public void shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() throws IOException { - MemoryRecords records = TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), 10L); + Supplier records = () -> TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), 10L); LogConfig config = new LogTestUtils.LogConfigBuilder() - .withSegmentBytes(records.sizeInBytes() * 5) - .withRetentionBytes(records.sizeInBytes() * 10L) + .withSegmentBytes(records.get().sizeInBytes() * 5) + .withRetentionBytes(records.get().sizeInBytes() * 10L) .withCleanupPolicy("compact, delete") .build(); - log = createLog(tmpDir, config); + log = createLog(logDir, config); for (int i = 0; i < 15; i++) { - log.appendAsLeader(records, 0); + log.appendAsLeader(records.get(), 0); } log.updateHighWatermark(log.logEndOffset()); @@ -337,18 +330,18 @@ public void shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDel @Test public void shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithSizeRetention() throws IOException { - MemoryRecords records = TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), 10L); - int recordSize = records.sizeInBytes(); + Supplier records = () -> TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), 10L); + int recordSize = records.get().sizeInBytes(); LogConfig config = new LogTestUtils.LogConfigBuilder() .withSegmentBytes(recordSize * 2) .withRetentionBytes(recordSize / 2) .withCleanupPolicy("") .withRemoteLogStorageEnable(true) 
.build(); - log = createLog(tmpDir, config, true); + log = createLog(logDir, config, true); for (int i = 0; i < 10; i++) { - log.appendAsLeader(records, 0); + log.appendAsLeader(records.get(), 0); } int segmentsBefore = log.numberOfSegments(); @@ -363,23 +356,23 @@ public void shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithSizeRetention() thr @Test public void shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithMsRetention() throws IOException { long oldTimestamp = mockTime.milliseconds() - 20000; - MemoryRecords oldRecords = TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), oldTimestamp); - int recordSize = oldRecords.sizeInBytes(); + Supplier oldRecords = () -> TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), oldTimestamp); + int recordSize = oldRecords.get().sizeInBytes(); LogConfig logConfig = new LogTestUtils.LogConfigBuilder() .withSegmentBytes(recordSize * 2) .withLocalRetentionMs(5000) .withCleanupPolicy("") .withRemoteLogStorageEnable(true) .build(); - log = createLog(tmpDir, logConfig, true); + log = createLog(logDir, logConfig, true); for (int i = 0; i < 10; i++) { - log.appendAsLeader(oldRecords, 0); + log.appendAsLeader(oldRecords.get(), 0); } - MemoryRecords newRecords = TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), mockTime.milliseconds()); + Supplier newRecords = () -> TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), mockTime.milliseconds()); for (int i = 0; i < 5; i++) { - log.appendAsLeader(newRecords, 0); + log.appendAsLeader(newRecords.get(), 0); } int segmentsBefore = log.numberOfSegments(); @@ -394,14 +387,14 @@ public void shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithMsRetention() throw @Test public void testLogDeletionAfterDeleteRecords() throws IOException { - MemoryRecords records = TestUtils.singletonRecords("test".getBytes()); + Supplier records = () -> TestUtils.singletonRecords("test".getBytes()); LogConfig logConfig = new LogTestUtils.LogConfigBuilder() - 
.withSegmentBytes(records.sizeInBytes() * 5) + .withSegmentBytes(records.get().sizeInBytes() * 5) .build(); - log = createLog(tmpDir, logConfig); + log = createLog(logDir, logConfig); for (int i = 0; i < 15; i++) { - log.appendAsLeader(records, 0); + log.appendAsLeader(records.get(), 0); } assertEquals(3, log.numberOfSegments()); assertEquals(0, log.logStartOffset()); @@ -425,15 +418,15 @@ public void testLogDeletionAfterDeleteRecords() throws IOException { @Test public void testLogDeletionAfterClose() throws IOException { - MemoryRecords records = TestUtils.singletonRecords("test".getBytes(), mockTime.milliseconds() - 1000); + Supplier records = () -> TestUtils.singletonRecords("test".getBytes(), mockTime.milliseconds() - 1000); LogConfig logConfig = new LogTestUtils.LogConfigBuilder() - .withSegmentBytes(records.sizeInBytes() * 5) + .withSegmentBytes(records.get().sizeInBytes() * 5) .withSegmentIndexBytes(1000) .withRetentionMs(999) .build(); - log = createLog(tmpDir, logConfig); + log = createLog(logDir, logConfig); - log.appendAsLeader(records, 0); + log.appendAsLeader(records.get(), 0); assertEquals(1, log.numberOfSegments()); assertEquals(1, epochCache(log).epochEntries().size()); @@ -446,16 +439,16 @@ public void testLogDeletionAfterClose() throws IOException { @Test public void testDeleteOldSegments() throws IOException { - MemoryRecords records = TestUtils.singletonRecords("test".getBytes(), mockTime.milliseconds() - 1000); + Supplier records = () -> TestUtils.singletonRecords("test".getBytes(), mockTime.milliseconds() - 1000); LogConfig logConfig = new LogTestUtils.LogConfigBuilder() - .withSegmentBytes(records.sizeInBytes() * 5) + .withSegmentBytes(records.get().sizeInBytes() * 5) .withSegmentIndexBytes(1000) .withRetentionMs(999) .build(); - log = createLog(tmpDir, logConfig); + log = createLog(logDir, logConfig); for (int i = 0; i < 100; i++) { - log.appendAsLeader(records, 0); + log.appendAsLeader(records.get(), 0); } log.assignEpochStartOffset(0, 
40); @@ -494,7 +487,7 @@ public void testDeleteOldSegments() throws IOException { assertEquals(new EpochEntry(1, 100), epochCache(log).epochEntries().get(0), "Epoch entry should be the latest epoch and the leo."); for (int i = 0; i < 100; i++) { - log.appendAsLeader(records, 0); + log.appendAsLeader(records.get(), 0); } log.delete(); @@ -505,17 +498,17 @@ public void testDeleteOldSegments() throws IOException { @Test public void shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete() throws IOException { - MemoryRecords records = TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), 10L); + Supplier records = () -> TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), 10L); int recordsPerSegment = 5; LogConfig logConfig = new LogTestUtils.LogConfigBuilder() - .withSegmentBytes(records.sizeInBytes() * recordsPerSegment) + .withSegmentBytes(records.get().sizeInBytes() * recordsPerSegment) .withSegmentIndexBytes(1000) .withCleanupPolicy("compact") .build(); - log = createLog(tmpDir, logConfig); + log = createLog(logDir, logConfig); for (int i = 0; i < 15; i++) { - log.appendAsLeader(records, 0); + log.appendAsLeader(records.get(), 0); } assertEquals(3, log.numberOfSegments()); @@ -538,12 +531,12 @@ public void shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelet @Test public void testFirstUnstableOffsetNoTransactionalData() throws IOException { - MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, + Supplier records = () -> MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("foo".getBytes()), new SimpleRecord("bar".getBytes()), new SimpleRecord("baz".getBytes())); - log.appendAsLeader(records, 0); + log.appendAsLeader(records.get(), 0); assertEquals(Optional.empty(), log.firstUnstableOffset()); } @@ -551,18 +544,19 @@ public void testFirstUnstableOffsetNoTransactionalData() throws IOException { public void testFirstUnstableOffsetWithTransactionalData() throws IOException { long pid = 137L; 
short epoch = 5; - int seq = 0; + PrimitiveRef.IntRef seq = PrimitiveRef.ofInt(0); - MemoryRecords records = MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq, - new SimpleRecord("foo".getBytes()), - new SimpleRecord("bar".getBytes()), - new SimpleRecord("baz".getBytes())); + Supplier records = () -> MemoryRecords.withTransactionalRecords( + Compression.NONE, pid, epoch, seq.value, + new SimpleRecord("foo".getBytes()), + new SimpleRecord("bar".getBytes()), + new SimpleRecord("baz".getBytes())); - LogAppendInfo firstAppendInfo = log.appendAsLeader(records, 0); + LogAppendInfo firstAppendInfo = log.appendAsLeader(records.get(), 0); assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset()); - seq += 3; - log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq, + seq.value += 3; + log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq.value, new SimpleRecord("blah".getBytes())), 0); assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset()); @@ -600,26 +594,31 @@ private UnifiedLog createLog(File dir, LogConfig config) throws IOException { } private UnifiedLog createLog(File dir, LogConfig config, boolean remoteStorageSystemEnable) throws IOException { - return createLog(dir, config, this.brokerTopicStats, mockTime.scheduler, this.mockTime, 0L, 0L, maxTransactionTimeoutMs, - this.producerStateManagerConfig, TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, true, - Optional.empty(), remoteStorageSystemEnable, Optional.empty(), LogOffsetsListener.NO_OP_OFFSETS_LISTENER); + return createLog(dir, config, this.brokerTopicStats, mockTime.scheduler, this.mockTime, + this.producerStateManagerConfig, Optional.empty(), remoteStorageSystemEnable); } - private UnifiedLog createLog(File dir, LogConfig config, BrokerTopicStats brokerTopicStats, Scheduler scheduler, MockTime time, - long logStartOffset, long 
recoveryPoint, int maxTransactionTimeoutMs, ProducerStateManagerConfig producerStateManagerConfig, - int producerIdExpirationCheckIntervalMs, boolean lastShutdownClean, Optional topicId, boolean remoteStorageSystemEnable, - Optional remoteLogManager, LogOffsetsListener logOffsetsListener) throws IOException { - - UnifiedLog log = LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint, - maxTransactionTimeoutMs, producerStateManagerConfig, producerIdExpirationCheckIntervalMs, lastShutdownClean, topicId, - new ConcurrentHashMap<>(), remoteStorageSystemEnable, remoteLogManager, logOffsetsListener); + private UnifiedLog createLog( + File dir, + LogConfig config, + BrokerTopicStats brokerTopicStats, + Scheduler scheduler, + MockTime time, + ProducerStateManagerConfig producerStateManagerConfig, + Optional topicId, + boolean remoteStorageSystemEnable) throws IOException { + + UnifiedLog log = LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, 0L, 0L, + 3600000, producerStateManagerConfig, + TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, true, topicId, + new ConcurrentHashMap<>(), remoteStorageSystemEnable, LogOffsetsListener.NO_OP_OFFSETS_LISTENER); this.logsToClose.add(log); return log; } + // FIXME: remove private MemoryRecords createRecords(long startOffset, int epoch) { return TestUtils.records(List.of(new SimpleRecord("value".getBytes())), startOffset, epoch); } - } diff --git a/test-common/test-common-runtime/src/main/resources/log4j2.yaml b/test-common/test-common-runtime/src/main/resources/log4j2.yaml index be546a18b55e6..5e7efabb143cc 100644 --- a/test-common/test-common-runtime/src/main/resources/log4j2.yaml +++ b/test-common/test-common-runtime/src/main/resources/log4j2.yaml @@ -27,9 +27,9 @@ Configuration: Loggers: Root: - level: INFO + level: DEBUG AppenderRef: - ref: STDOUT Logger: - name: org.apache.kafka - level: INFO + level: DEBUG From 
a51bcde9a3bcbb2e561101d27a6de7e6cf6a81f0 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 3 Oct 2025 16:00:40 +0800 Subject: [PATCH 05/17] fix some --- .../storage/internals/log/UnifiedLogTest.java | 88 ++++++++++++------- 1 file changed, 54 insertions(+), 34 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java index e7e0273e76559..e7e59b39a3043 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.test.TestUtils; -import org.apache.kafka.common.utils.PrimitiveRef; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.transaction.TransactionLogConfig; import org.apache.kafka.server.storage.log.FetchIsolation; @@ -44,6 +43,7 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import java.util.function.Supplier; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -109,16 +109,20 @@ public void followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCa .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes())) .toArray(SimpleRecord[]::new); + Function recordsForEpoch = i -> { + MemoryRecords recs = MemoryRecords.withRecords(messageIds[i], Compression.NONE, records[i]); + recs.batches().forEach(record -> { + record.setPartitionLeaderEpoch(42); + record.setLastOffset(i); + }); + return recs; + }; + + // Given each message has an offset & epoch, as msgs from leader would try (UnifiedLog log = createLog(logDir, new LogConfig(new Properties()))) { // Given each message has an offset & epoch, as msgs from leader would for 
(int i = 0; i < records.length; i++) { - long finalI = i; - MemoryRecords recordsForEpoch = MemoryRecords.withRecords(messageIds[i], Compression.NONE, records[i]); - recordsForEpoch.batches().forEach(batch -> { - batch.setPartitionLeaderEpoch(42); - batch.setLastOffset(finalI); - }); - appendAsFollower(log, recordsForEpoch, i); + log.appendAsFollower(recordsForEpoch.apply(i), i); } assertEquals(Optional.of(42), log.latestEpoch()); @@ -154,17 +158,17 @@ public void shouldTruncateLeaderEpochsWhenDeletingSegments() throws IOException @Test public void shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() throws IOException { - MemoryRecords records = TestUtils.singletonRecords("test".getBytes()); + Supplier records = () -> TestUtils.singletonRecords("test".getBytes()); LogConfig config = new LogTestUtils.LogConfigBuilder() - .withSegmentBytes(records.sizeInBytes() * 5) - .withRetentionBytes(records.sizeInBytes() * 10L) + .withSegmentBytes(records.get().sizeInBytes() * 5) + .withRetentionBytes(records.get().sizeInBytes() * 10L) .build(); log = createLog(logDir, config); LeaderEpochFileCache cache = epochCache(log); for (int i = 0; i < 15; i++) { - log.appendAsLeader(records, 0); + log.appendAsLeader(records.get(), 0); } // Given epochs @@ -214,15 +218,16 @@ public void shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog() throws IO @Test public void shouldDeleteSizeBasedSegments() throws IOException { - MemoryRecords records = TestUtils.singletonRecords("test".getBytes()); + Supplier records = () -> TestUtils.singletonRecords("test".getBytes()); LogConfig config = new LogTestUtils.LogConfigBuilder() - .withSegmentBytes(1024 * 1024 * 5) + .withSegmentBytes(records.get().sizeInBytes() * 5) + .withRetentionBytes(records.get().sizeInBytes() * 10L) .build(); log = createLog(logDir, config); // append some messages to create some segments for (int i = 0; i < 15; i++) { - log.appendAsLeader(records, 0); + log.appendAsLeader(records.get(), 0); } 
log.updateHighWatermark(log.logEndOffset()); @@ -232,17 +237,17 @@ public void shouldDeleteSizeBasedSegments() throws IOException { @Test public void shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() throws IOException { - MemoryRecords records = TestUtils.singletonRecords("test".getBytes()); + Supplier records = () -> TestUtils.singletonRecords("test".getBytes()); LogConfig config = new LogTestUtils.LogConfigBuilder() - .withSegmentBytes(records.sizeInBytes() * 5) - .withRetentionBytes(records.sizeInBytes() * 15L) + .withSegmentBytes(records.get().sizeInBytes() * 5) + .withRetentionBytes(records.get().sizeInBytes() * 15L) .build(); log = createLog(logDir, config); // append some messages to create some segments for (int i = 0; i < 15; i++) { - log.appendAsLeader(records, 0); + log.appendAsLeader(records.get(), 0); } log.updateHighWatermark(log.logEndOffset()); @@ -252,15 +257,15 @@ public void shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() throws IOEx @Test public void shouldDeleteTimeBasedSegmentsReadyToBeDeleted() throws IOException { - MemoryRecords records = TestUtils.singletonRecords("test".getBytes(), 10L); + Supplier records = () -> TestUtils.singletonRecords("test".getBytes(), 10L); LogConfig config = new LogTestUtils.LogConfigBuilder() - .withSegmentBytes(records.sizeInBytes() * 15) + .withSegmentBytes(records.get().sizeInBytes() * 15) .withRetentionMs(10000L) .build(); log = createLog(logDir, config); for (int i = 0; i < 15; i++) { - log.appendAsLeader(records, 0); + log.appendAsLeader(records.get(), 0); } log.updateHighWatermark(log.logEndOffset()); @@ -426,15 +431,16 @@ public void testLogDeletionAfterClose() throws IOException { .build(); log = createLog(logDir, logConfig); + // append some messages to create some segments log.appendAsLeader(records.get(), 0); - assertEquals(1, log.numberOfSegments()); - assertEquals(1, epochCache(log).epochEntries().size()); + assertEquals(1, log.numberOfSegments(), "The deleted segments should be 
gone."); + assertEquals(1, epochCache(log).epochEntries().size(), "Epoch entries should have gone."); log.close(); log.delete(); assertEquals(0, log.numberOfSegments()); - assertEquals(0, epochCache(log).epochEntries().size()); + assertEquals(0, epochCache(log).epochEntries().size(), "Epoch entries should have gone."); } @Test @@ -459,6 +465,7 @@ public void testDeleteOldSegments() throws IOException { assertEquals(numSegments, log.numberOfSegments()); assertEquals(0L, log.logStartOffset()); + // only segments with offset before the current high watermark are eligible for deletion for (long hw = 25; hw <= 30; hw++) { log.updateHighWatermark(hw); log.deleteOldSegments(); @@ -531,41 +538,54 @@ public void shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelet @Test public void testFirstUnstableOffsetNoTransactionalData() throws IOException { - Supplier records = () -> MemoryRecords.withRecords(Compression.NONE, + LogConfig logConfig = new LogTestUtils.LogConfigBuilder() + .withSegmentBytes(1024 * 1024 * 5) + .build(); + log = createLog(logDir, logConfig); + + MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("foo".getBytes()), new SimpleRecord("bar".getBytes()), new SimpleRecord("baz".getBytes())); - log.appendAsLeader(records.get(), 0); + log.appendAsLeader(records, 0); assertEquals(Optional.empty(), log.firstUnstableOffset()); } @Test public void testFirstUnstableOffsetWithTransactionalData() throws IOException { + LogConfig logConfig = new LogTestUtils.LogConfigBuilder() + .withSegmentBytes(1024 * 1024 * 5).build(); + log = createLog(logDir, logConfig); + long pid = 137L; short epoch = 5; - PrimitiveRef.IntRef seq = PrimitiveRef.ofInt(0); + int seq = 0; - Supplier records = () -> MemoryRecords.withTransactionalRecords( - Compression.NONE, pid, epoch, seq.value, + // add some transactional records + MemoryRecords records = MemoryRecords.withTransactionalRecords( + Compression.NONE, pid, epoch, seq, new 
SimpleRecord("foo".getBytes()), new SimpleRecord("bar".getBytes()), new SimpleRecord("baz".getBytes())); - LogAppendInfo firstAppendInfo = log.appendAsLeader(records.get(), 0); + LogAppendInfo firstAppendInfo = log.appendAsLeader(records, 0); assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset()); - seq.value += 3; - log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq.value, + // add more transactional records + seq += 3; + log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq, new SimpleRecord("blah".getBytes())), 0); - assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset()); + // now transaction is committed LogAppendInfo commitAppendInfo = appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.COMMIT, mockTime.milliseconds()); + // first unstable offset is not updated until the high watermark is advanced assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset()); log.updateHighWatermark(commitAppendInfo.lastOffset() + 1); + // now there should be no first unstable offset assertEquals(Optional.empty(), log.firstUnstableOffset()); } From 9b3b5c3d16bc42de52e9b0c00d71e87632b940cb Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 3 Oct 2025 18:05:08 +0800 Subject: [PATCH 06/17] fix closing issue --- .../kafka/storage/internals/log/UnifiedLogTest.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java index e7e59b39a3043..2e7e14141e0ca 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.Uuid; import 
org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.errors.KafkaStorageException; import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.record.EndTransactionMarker; import org.apache.kafka.common.record.MemoryRecords; @@ -65,7 +66,13 @@ public class UnifiedLogTest { public void tearDown() throws IOException { brokerTopicStats.close(); for (UnifiedLog log : logsToClose) { - log.close(); + try { + // some test like testLogDeletionAfterClose and testLogDeletionAfterClose + // they are closed from test so KafkaStorageException is expected. + log.close(); + } catch (KafkaStorageException ignore) { + // ignore + } } Utils.delete(tmpDir); } @@ -460,6 +467,7 @@ public void testDeleteOldSegments() throws IOException { log.assignEpochStartOffset(0, 40); log.assignEpochStartOffset(1, 90); + // segments are not eligible for deletion if no high watermark has been set int numSegments = log.numberOfSegments(); log.deleteOldSegments(); assertEquals(numSegments, log.numberOfSegments()); @@ -579,7 +587,7 @@ public void testFirstUnstableOffsetWithTransactionalData() throws IOException { assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset()); // now transaction is committed - LogAppendInfo commitAppendInfo = appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.COMMIT, mockTime.milliseconds()); + LogAppendInfo commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.COMMIT, mockTime.milliseconds(), 0, 0); // first unstable offset is not updated until the high watermark is advanced assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset()); From d3da6ce01e216c2b09ae72a98768e89b78dd62d9 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 3 Oct 2025 18:32:25 +0800 Subject: [PATCH 07/17] refactor --- .../storage/internals/log/UnifiedLogTest.java | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java index 2e7e14141e0ca..cfb8814ad4304 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java @@ -192,8 +192,9 @@ public void shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() throws IOExc @Test public void shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog() throws IOException { + Supplier records = () -> TestUtils.records(List.of(new SimpleRecord("value".getBytes())), 0, 0); LogConfig config = new LogTestUtils.LogConfigBuilder() - .withSegmentBytes(10 * createRecords(0, 0).sizeInBytes()) + .withSegmentBytes(10 * records.get().sizeInBytes()) .build(); log = createLog(logDir, config); LeaderEpochFileCache cache = epochCache(log); @@ -587,7 +588,7 @@ public void testFirstUnstableOffsetWithTransactionalData() throws IOException { assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset()); // now transaction is committed - LogAppendInfo commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.COMMIT, mockTime.milliseconds(), 0, 0); + LogAppendInfo commitAppendInfo = appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.COMMIT, mockTime.milliseconds()); // first unstable offset is not updated until the high watermark is advanced assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset()); @@ -598,8 +599,10 @@ public void testFirstUnstableOffsetWithTransactionalData() throws IOException { } private void append(int epoch, long startOffset, int count) { + Function records = i -> + TestUtils.records(List.of(new SimpleRecord("value".getBytes())), startOffset + i, epoch); for (int i = 0; i < count; i++) { - log.appendAsFollower(createRecords(startOffset + i, epoch), epoch); + 
log.appendAsFollower(records.apply(i), epoch); } } @@ -607,11 +610,6 @@ private LeaderEpochFileCache epochCache(UnifiedLog log) { return log.leaderEpochCache(); } - private void appendAsFollower(UnifiedLog log, MemoryRecords records, int leaderEpoch) { - records.batches().forEach(batch -> batch.setPartitionLeaderEpoch(leaderEpoch)); - log.appendAsFollower(records, leaderEpoch); - } - private LogAppendInfo appendEndTxnMarkerAsLeader(UnifiedLog log, long producerId, short producerEpoch, ControlRecordType controlType, long timestamp) throws IOException { MemoryRecords records = MemoryRecords.withEndTransactionMarker(producerId, producerEpoch, new EndTransactionMarker(controlType, 0)); return log.appendAsLeader(records, 0); @@ -644,9 +642,4 @@ private UnifiedLog createLog( this.logsToClose.add(log); return log; } - - // FIXME: remove - private MemoryRecords createRecords(long startOffset, int epoch) { - return TestUtils.records(List.of(new SimpleRecord("value".getBytes())), startOffset, epoch); - } } From 6774954bc7f71f321dd03c238da42a5e3e798d1a Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 3 Oct 2025 19:09:00 +0800 Subject: [PATCH 08/17] all test pass --- .../storage/internals/log/LogTestUtils.java | 49 ++++++++++--------- .../storage/internals/log/UnifiedLogTest.java | 8 +-- .../apache/kafka/common/test/TestUtils.java | 15 ------ 3 files changed, 30 insertions(+), 42 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java index 4c1b06602c209..f433fbd5bd394 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java @@ -18,9 +18,12 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.record.ControlRecordType; +import 
org.apache.kafka.common.record.EndTransactionMarker; import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.coordinator.transaction.TransactionLogConfig; +import org.apache.kafka.server.common.RequestLocal; import org.apache.kafka.server.config.ServerLogConfigs; import org.apache.kafka.server.util.Scheduler; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; @@ -29,7 +32,6 @@ import java.io.IOException; import java.util.Optional; import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; public class LogTestUtils { @@ -44,27 +46,27 @@ public static LogSegment createSegment(long offset, File logDir, int indexInterv return new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time); } - public static UnifiedLog createLog(File dir, - LogConfig config, - BrokerTopicStats brokerTopicStats, - Scheduler scheduler, - Time time) throws IOException { - return createLog( - dir, - config, - brokerTopicStats, - scheduler, - time, - 0L, // logStartOffset - 0L, // recoveryPoint - 5 * 60 * 1000, // maxTransactionTimeoutMs - new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), - TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, true, // lastShutdownClean - Optional.empty(), // topicId - new ConcurrentHashMap<>(), // numRemainingSegments - false, // remoteStorageSystemEnable - LogOffsetsListener.NO_OP_OFFSETS_LISTENER - ); + public static LogAppendInfo appendEndTxnMarkerAsLeader(UnifiedLog log, + long producerId, + short producerEpoch, + ControlRecordType controlType, + long timestamp, + int coordinatorEpoch, + int leaderEpoch) { + MemoryRecords records = endTxnRecords(controlType, producerId, producerEpoch, 0L, coordinatorEpoch, leaderEpoch, timestamp); + + return log.appendAsLeader(records, leaderEpoch, 
AppendOrigin.COORDINATOR, RequestLocal.noCaching(), VerificationGuard.SENTINEL); + } + + public static MemoryRecords endTxnRecords(ControlRecordType controlRecordType, + long producerId, + short epoch, + long offset, + int coordinatorEpoch, + int partitionLeaderEpoch, + long timestamp) { + EndTransactionMarker marker = new EndTransactionMarker(controlRecordType, coordinatorEpoch); + return MemoryRecords.withEndTransactionMarker(offset, timestamp, partitionLeaderEpoch, producerId, epoch, marker); } @SuppressWarnings("ParameterNumber") @@ -195,7 +197,6 @@ public LogConfigBuilder withRemoteLogDeleteOnDisable(boolean remoteLogDeleteOnDi return this; } - // 3. 建立一個 build() 方法,它使用 builder 中設定的值來建立最終的 LogConfig 物件 public LogConfig build() { Properties logProps = new Properties(); logProps.put(TopicConfig.SEGMENT_MS_CONFIG, String.valueOf(segmentMs)); diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java index cfb8814ad4304..b8b9b081ac7f9 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java @@ -57,7 +57,7 @@ public class UnifiedLogTest { private final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false); private final MockTime mockTime = new MockTime(); private final int maxTransactionTimeoutMs = 60 * 60 * 1000; - private final ProducerStateManagerConfig producerStateManagerConfig = new ProducerStateManagerConfig(maxTransactionTimeoutMs, true); + private final ProducerStateManagerConfig producerStateManagerConfig = new ProducerStateManagerConfig(maxTransactionTimeoutMs, false); private final List logsToClose = new ArrayList<>(); private UnifiedLog log; @@ -564,7 +564,8 @@ public void testFirstUnstableOffsetNoTransactionalData() throws IOException { @Test public void testFirstUnstableOffsetWithTransactionalData() 
throws IOException { LogConfig logConfig = new LogTestUtils.LogConfigBuilder() - .withSegmentBytes(1024 * 1024 * 5).build(); + .withSegmentBytes(1024 * 1024 * 5) + .build(); log = createLog(logDir, logConfig); long pid = 137L; @@ -588,7 +589,8 @@ public void testFirstUnstableOffsetWithTransactionalData() throws IOException { assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset()); // now transaction is committed - LogAppendInfo commitAppendInfo = appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.COMMIT, mockTime.milliseconds()); + LogAppendInfo commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, + ControlRecordType.COMMIT, mockTime.milliseconds(), 0, 0); // first unstable offset is not updated until the high watermark is advanced assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset()); diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java index fbba8b6ef93d1..46c234ee5779c 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java @@ -24,15 +24,8 @@ import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.network.SocketServerConfigs; -import org.apache.kafka.raft.QuorumConfig; -import org.apache.kafka.server.config.KRaftConfigs; -import org.apache.kafka.server.config.ReplicationConfigs; -import org.apache.kafka.server.config.ServerConfigs; -import org.apache.kafka.server.config.ServerLogConfigs; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -41,17 +34,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Files; -import java.util.AbstractMap; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; import java.util.Random; -import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import static java.lang.String.format; From 6a8b0bb314986e30c0c85bb1da2e7ed1c587afaf Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 3 Oct 2025 19:18:48 +0800 Subject: [PATCH 09/17] revert some --- .../apache/kafka/common/test/TestUtils.java | 46 +++++++++++++++---- .../src/main/resources/log4j2.yaml | 4 +- 2 files changed, 39 insertions(+), 11 deletions(-) diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java index 46c234ee5779c..55d3527b302b5 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java @@ -40,9 +40,15 @@ import static java.lang.String.format; -public class TestUtils { +/** + * Helper functions for writing unit tests. + *

+ * Package-private: Not intended for use outside {@code org.apache.kafka.common.test}. + */ +class TestUtils { private static final Logger log = LoggerFactory.getLogger(TestUtils.class); + /* A consistent random number generator to make tests repeatable */ public static final Random SEEDED_RANDOM = new Random(192348092834L); public static final String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; @@ -53,12 +59,22 @@ public class TestUtils { private static final long DEFAULT_MAX_WAIT_MS = 15_000; private static final Random RANDOM = new Random(); + /** + * Create an empty file in the default temporary-file directory, using `kafka` as the prefix and `tmp` as the + * suffix to generate its name. + */ public static File tempFile() throws IOException { final File file = Files.createTempFile("kafka", ".tmp").toFile(); file.deleteOnExit(); return file; } + /** + * Generate a random string of letters and digits of the given length + * + * @param len The length of the string + * @return The random string + */ public static String randomString(final int len) { final StringBuilder b = new StringBuilder(); for (int i = 0; i < len; i++) @@ -66,7 +82,11 @@ public static String randomString(final int len) { return b.toString(); } - public static File tempDirectory() { + /** + * Create a temporary directory in the default temporary-file directory, using `kafka-` as the prefix to generate its name. 
+ * + */ + static File tempDirectory() { final File file; String prefix = "kafka-"; try { @@ -86,17 +106,19 @@ public static File tempDirectory() { return file; } - public static File tempRelativeDir(String parent) { - File file = new File(parent, "kafka-" + SEEDED_RANDOM.nextInt(1000000)); - file.mkdirs(); - file.deleteOnExit(); - return file; - } - + /** + * uses default value of 15 seconds for timeout + */ public static void waitForCondition(final Supplier testCondition, final String conditionDetails) throws InterruptedException { waitForCondition(testCondition, DEFAULT_MAX_WAIT_MS, () -> conditionDetails); } + /** + * Wait for condition to be met for at most {@code maxWaitMs} and throw assertion failure otherwise. + * This should be used instead of {@code Thread.sleep} whenever possible as it allows a longer timeout to be used + * without unnecessarily increasing test time (as the condition is checked frequently). The longer timeout is needed to + * avoid transient failures due to slow or overloaded machines. + */ public static void waitForCondition(final Supplier testCondition, final long maxWaitMs, final Supplier conditionDetails) throws InterruptedException { @@ -122,6 +144,12 @@ public static void waitForCondition(final Supplier testCondition, } } + /** + * Wait for condition to be met for at most {@code maxWaitMs} and throw assertion failure otherwise. + * This should be used instead of {@code Thread.sleep} whenever possible as it allows a longer timeout to be used + * without unnecessarily increasing test time (as the condition is checked frequently). The longer timeout is needed to + * avoid transient failures due to slow or overloaded machines. 
+ */ public static void waitForCondition(final Supplier testCondition, final long maxWaitMs, String conditionDetails) throws InterruptedException { diff --git a/test-common/test-common-runtime/src/main/resources/log4j2.yaml b/test-common/test-common-runtime/src/main/resources/log4j2.yaml index 5e7efabb143cc..be546a18b55e6 100644 --- a/test-common/test-common-runtime/src/main/resources/log4j2.yaml +++ b/test-common/test-common-runtime/src/main/resources/log4j2.yaml @@ -27,9 +27,9 @@ Configuration: Loggers: Root: - level: DEBUG + level: INFO AppenderRef: - ref: STDOUT Logger: - name: org.apache.kafka - level: DEBUG + level: INFO From 8b749cc55f9da938425028b64aa01d81b522bf0e Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 3 Oct 2025 19:23:57 +0800 Subject: [PATCH 10/17] remove common test dependency --- .../java/org/apache/kafka/test/TestUtils.java | 87 +++++++++++++++ .../storage/internals/log/UnifiedLogTest.java | 2 +- .../apache/kafka/common/test/TestUtils.java | 103 ------------------ 3 files changed, 88 insertions(+), 104 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 078d006e37a37..523e7fb365ad0 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.feature.Features; import org.apache.kafka.common.feature.SupportedVersionRange; import org.apache.kafka.common.message.ApiMessageType; @@ -31,6 +32,12 @@ import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.DefaultRecordBatch; +import 
org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.record.UnalignedRecords; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.ByteBufferChannel; @@ -742,4 +749,84 @@ public static ApiVersionsResponse createApiVersionsResponse( setZkMigrationEnabled(zkMigrationEnabled). build(); } + + public static MemoryRecords singletonRecords(byte[] value, byte[] key) { + return singletonRecords(value, key, Compression.NONE, RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE); + } + + public static MemoryRecords singletonRecords(byte[] value, long timestamp) { + return singletonRecords(value, null, Compression.NONE, timestamp, RecordBatch.CURRENT_MAGIC_VALUE); + } + + public static MemoryRecords singletonRecords( + byte[] value + ) { + return records(List.of(new SimpleRecord(RecordBatch.NO_TIMESTAMP, null, value)), + RecordBatch.CURRENT_MAGIC_VALUE, + Compression.NONE, + RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, + RecordBatch.NO_SEQUENCE, + 0, + RecordBatch.NO_PARTITION_LEADER_EPOCH + ); + } + + public static MemoryRecords singletonRecords( + byte[] value, + byte[] key, + Compression codec, + long timestamp, + byte magicValue + ) { + return records(List.of(new SimpleRecord(timestamp, key, value)), + magicValue, codec, + RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, + RecordBatch.NO_SEQUENCE, + 0, + RecordBatch.NO_PARTITION_LEADER_EPOCH + ); + } + + public static MemoryRecords singletonRecords(byte[] value, byte[] key, long timestamp) { + return singletonRecords(value, key, Compression.NONE, timestamp, RecordBatch.CURRENT_MAGIC_VALUE); + } + + public static MemoryRecords records(List records) { + return records(records, 
RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH); + } + + public static MemoryRecords records(List records, long baseOffset) { + return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, baseOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH); + } + + public static MemoryRecords records(List records, long baseOffset, int partitionLeaderEpoch) { + return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, baseOffset, partitionLeaderEpoch); + } + + public static MemoryRecords records(List records, byte magicValue, Compression compression) { + return records(records, magicValue, compression, RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH); + } + + public static MemoryRecords records(List records, + byte magicValue, + Compression compression, + long producerId, + short producerEpoch, + int sequence, + long baseOffset, + int partitionLeaderEpoch) { + ByteBuffer buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records)); + MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, compression, TimestampType.CREATE_TIME, baseOffset, + System.currentTimeMillis(), producerId, producerEpoch, sequence, false, partitionLeaderEpoch); + for (SimpleRecord record : records) { + builder.append(record); + } + return builder.build(); + } } diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java index b8b9b081ac7f9..b4c9e354ff45e 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java +++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.SimpleRecord; -import org.apache.kafka.common.test.TestUtils; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.transaction.TransactionLogConfig; import org.apache.kafka.server.storage.log.FetchIsolation; @@ -32,6 +31,7 @@ import org.apache.kafka.server.util.Scheduler; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; +import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java index 55d3527b302b5..4c75272edd4af 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java @@ -16,14 +16,7 @@ */ package org.apache.kafka.common.test; -import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.record.DefaultRecordBatch; -import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.record.MemoryRecordsBuilder; -import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.record.SimpleRecord; -import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; @@ -32,9 +25,7 @@ import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.file.Files; -import java.util.List; import java.util.Random; import 
java.util.function.Supplier; @@ -57,7 +48,6 @@ class TestUtils { private static final long DEFAULT_POLL_INTERVAL_MS = 100; private static final long DEFAULT_MAX_WAIT_MS = 15_000; - private static final Random RANDOM = new Random(); /** * Create an empty file in the default temporary-file directory, using `kafka` as the prefix and `tmp` as the @@ -155,97 +145,4 @@ public static void waitForCondition(final Supplier testCondition, String conditionDetails) throws InterruptedException { waitForCondition(testCondition, maxWaitMs, () -> conditionDetails); } - - public static File randomPartitionLogDir(File parentDir) { - int attempts = 1000; - while (attempts > 0) { - File f = new File(parentDir, "kafka-" + RANDOM.nextInt(1000000)); - if (f.mkdir()) { - f.deleteOnExit(); - return f; - } - attempts--; - } - throw new RuntimeException("Failed to create directory after 1000 attempts"); - } - - public static MemoryRecords singletonRecords(byte[] value, byte[] key) { - return singletonRecords(value, key, Compression.NONE, RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE); - } - - public static MemoryRecords singletonRecords(byte[] value, long timestamp) { - return singletonRecords(value, null, Compression.NONE, timestamp, RecordBatch.CURRENT_MAGIC_VALUE); - } - - public static MemoryRecords singletonRecords( - byte[] value - ) { - return records(List.of(new SimpleRecord(RecordBatch.NO_TIMESTAMP, null, value)), - RecordBatch.CURRENT_MAGIC_VALUE, - Compression.NONE, - RecordBatch.NO_PRODUCER_ID, - RecordBatch.NO_PRODUCER_EPOCH, - RecordBatch.NO_SEQUENCE, - 0, - RecordBatch.NO_PARTITION_LEADER_EPOCH - ); - } - - public static MemoryRecords singletonRecords( - byte[] value, - byte[] key, - Compression codec, - long timestamp, - byte magicValue - ) { - return records(List.of(new SimpleRecord(timestamp, key, value)), - magicValue, codec, - RecordBatch.NO_PRODUCER_ID, - RecordBatch.NO_PRODUCER_EPOCH, - RecordBatch.NO_SEQUENCE, - 0, - RecordBatch.NO_PARTITION_LEADER_EPOCH - 
); - } - - public static MemoryRecords singletonRecords(byte[] value, byte[] key, long timestamp) { - return singletonRecords(value, key, Compression.NONE, timestamp, RecordBatch.CURRENT_MAGIC_VALUE); - } - - public static MemoryRecords records(List records) { - return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID, - RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH); - } - - public static MemoryRecords records(List records, long baseOffset) { - return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID, - RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, baseOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH); - } - - public static MemoryRecords records(List records, long baseOffset, int partitionLeaderEpoch) { - return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID, - RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, baseOffset, partitionLeaderEpoch); - } - - public static MemoryRecords records(List records, byte magicValue, Compression compression) { - return records(records, magicValue, compression, RecordBatch.NO_PRODUCER_ID, - RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH); - } - - public static MemoryRecords records(List records, - byte magicValue, - Compression compression, - long producerId, - short producerEpoch, - int sequence, - long baseOffset, - int partitionLeaderEpoch) { - ByteBuffer buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records)); - MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, compression, TimestampType.CREATE_TIME, baseOffset, - System.currentTimeMillis(), producerId, producerEpoch, sequence, false, partitionLeaderEpoch); - for (SimpleRecord record : records) { - builder.append(record); - } - return builder.build(); - } } From 
c9b16a3481ecd983e1776ad7ca9f5bec786773b0 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 3 Oct 2025 19:29:28 +0800 Subject: [PATCH 11/17] add note --- .../src/main/java/org/apache/kafka/common/test/TestUtils.java | 1 + 1 file changed, 1 insertion(+) diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java index 4c75272edd4af..a9e49dd04925e 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java @@ -35,6 +35,7 @@ * Helper functions for writing unit tests. *

* Package-private: Not intended for use outside {@code org.apache.kafka.common.test}. + * Use {@code org/apache/kafka/test/TestUtils} instead. */ class TestUtils { private static final Logger log = LoggerFactory.getLogger(TestUtils.class); From eaababedb9990b56a5be19ec22d160c8bbb4ac5d Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Sat, 4 Oct 2025 09:50:27 +0800 Subject: [PATCH 12/17] add comment and remove old test --- .../scala/unit/kafka/log/UnifiedLogTest.scala | 467 ------------------ .../storage/internals/log/UnifiedLogTest.java | 16 +- 2 files changed, 14 insertions(+), 469 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index e6fdf09331bfc..209a292928535 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -3010,475 +3010,8 @@ class UnifiedLogTest { assertFalse(LogTestUtils.hasOffsetOverflow(log)) } - @Test - def testDeleteOldSegments(): Unit = { - def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999) - val log = createLog(logDir, logConfig) - - // append some messages to create some segments - for (_ <- 0 until 100) - log.appendAsLeader(createRecords, 0) - - log.assignEpochStartOffset(0, 40) - log.assignEpochStartOffset(1, 90) - - // segments are not eligible for deletion if no high watermark has been set - val numSegments = log.numberOfSegments - log.deleteOldSegments() - assertEquals(numSegments, log.numberOfSegments) - assertEquals(0L, log.logStartOffset) - - // only segments with offset before the current high watermark are eligible for deletion - for (hw <- 25 to 30) { - log.updateHighWatermark(hw) - log.deleteOldSegments() - assertTrue(log.logStartOffset <= hw) - log.logSegments.forEach { 
segment => - val segmentFetchInfo = segment.read(segment.baseOffset, Int.MaxValue) - val segmentLastOffsetOpt = segmentFetchInfo.records.records.asScala.lastOption.map(_.offset) - segmentLastOffsetOpt.foreach { lastOffset => - assertTrue(lastOffset >= hw) - } - } - } - - // expire all segments - log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - assertEquals(1, log.numberOfSegments, "The deleted segments should be gone.") - assertEquals(1, epochCache(log).epochEntries.size, "Epoch entries should have gone.") - assertEquals(new EpochEntry(1, 100), epochCache(log).epochEntries.get(0), "Epoch entry should be the latest epoch and the leo.") - - // append some messages to create some segments - for (_ <- 0 until 100) - log.appendAsLeader(createRecords, 0) - - log.delete() - assertEquals(0, log.numberOfSegments, "The number of segments should be 0") - assertEquals(0, log.deleteOldSegments(), "The number of deleted segments should be zero.") - assertEquals(0, epochCache(log).epochEntries.size, "Epoch entries should have gone.") - } - - @Test - def testLogDeletionAfterClose(): Unit = { - def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999) - val log = createLog(logDir, logConfig) - - // append some messages to create some segments - log.appendAsLeader(createRecords, 0) - - assertEquals(1, log.numberOfSegments, "The deleted segments should be gone.") - assertEquals(1, epochCache(log).epochEntries.size, "Epoch entries should have gone.") - - log.close() - log.delete() - assertEquals(0, log.numberOfSegments, "The number of segments should be 0") - assertEquals(0, epochCache(log).epochEntries.size, "Epoch entries should have gone.") - } - - @Test - def testLogDeletionAfterDeleteRecords(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes) 
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5) - val log = createLog(logDir, logConfig) - - for (_ <- 0 until 15) - log.appendAsLeader(createRecords, 0) - assertEquals(3, log.numberOfSegments, "should have 3 segments") - assertEquals(log.logStartOffset, 0) - log.updateHighWatermark(log.logEndOffset) - - log.maybeIncrementLogStartOffset(1, LogStartOffsetIncrementReason.ClientRecordDeletion) - log.deleteOldSegments() - assertEquals(3, log.numberOfSegments, "should have 3 segments") - assertEquals(log.logStartOffset, 1) - - log.maybeIncrementLogStartOffset(6, LogStartOffsetIncrementReason.ClientRecordDeletion) - log.deleteOldSegments() - assertEquals(2, log.numberOfSegments, "should have 2 segments") - assertEquals(log.logStartOffset, 6) - - log.maybeIncrementLogStartOffset(15, LogStartOffsetIncrementReason.ClientRecordDeletion) - log.deleteOldSegments() - assertEquals(1, log.numberOfSegments, "should have 1 segments") - assertEquals(log.logStartOffset, 15) - } - def epochCache(log: UnifiedLog): LeaderEpochFileCache = log.leaderEpochCache - @Test - def shouldDeleteSizeBasedSegments(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10) - val log = createLog(logDir, logConfig) - - // append some messages to create some segments - for (_ <- 0 until 15) - log.appendAsLeader(createRecords, 0) - - log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - assertEquals(2,log.numberOfSegments, "should have 2 segments") - } - - @Test - def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 15) - val log = createLog(logDir, logConfig) - - // 
append some messages to create some segments - for (_ <- 0 until 15) - log.appendAsLeader(createRecords, 0) - - log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - assertEquals(3,log.numberOfSegments, "should have 3 segments") - } - - @Test - def shouldDeleteTimeBasedSegmentsReadyToBeDeleted(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = 10) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000) - val log = createLog(logDir, logConfig) - - // append some messages to create some segments - for (_ <- 0 until 15) - log.appendAsLeader(createRecords, 0) - - log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - assertEquals(1, log.numberOfSegments, "There should be 1 segment remaining") - } - - @Test - def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = mockTime.milliseconds) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000000) - val log = createLog(logDir, logConfig) - - // append some messages to create some segments - for (_ <- 0 until 15) - log.appendAsLeader(createRecords, 0) - - log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - assertEquals(3, log.numberOfSegments, "There should be 3 segments remaining") - } - - @Test - def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact") - val log = createLog(logDir, logConfig) - - // append some messages to create some segments - for (_ <- 0 until 15) - log.appendAsLeader(createRecords, 0) - - // mark the oldest segment as older the retention.ms - 
log.logSegments.asScala.head.setLastModified(mockTime.milliseconds - 20000) - - val segments = log.numberOfSegments - log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - assertEquals(segments, log.numberOfSegments, "There should be 3 segments remaining") - } - - @Test - def shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithSizeRetention(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L) - val recordSize = createRecords.sizeInBytes - val logConfig = LogTestUtils.createLogConfig( - segmentBytes = recordSize * 2, - localRetentionBytes = recordSize / 2, - cleanupPolicy = "", - remoteLogStorageEnable = true - ) - val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true) - - for (_ <- 0 until 10) - log.appendAsLeader(createRecords, 0) - - val segmentsBefore = log.numberOfSegments - log.updateHighWatermark(log.logEndOffset) - log.updateHighestOffsetInRemoteStorage(log.logEndOffset - 1) - val deleteOldSegments = log.deleteOldSegments() - - assertTrue(log.numberOfSegments < segmentsBefore, "Some segments should be deleted due to size retention") - assertTrue(deleteOldSegments > 0, "At least one segment should be deleted") - } - - @Test - def shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithMsRetention(): Unit = { - val oldTimestamp = mockTime.milliseconds - 20000 - def oldRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = oldTimestamp) - val recordSize = oldRecords.sizeInBytes - val logConfig = LogTestUtils.createLogConfig( - segmentBytes = recordSize * 2, - localRetentionMs = 5000, - cleanupPolicy = "", - remoteLogStorageEnable = true - ) - val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true) - - for (_ <- 0 until 10) - log.appendAsLeader(oldRecords, 0) - - def newRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = mockTime.milliseconds) - for (_ <- 0 until 5) - 
log.appendAsLeader(newRecords, 0) - - val segmentsBefore = log.numberOfSegments - - log.updateHighWatermark(log.logEndOffset) - log.updateHighestOffsetInRemoteStorage(log.logEndOffset - 1) - val deleteOldSegments = log.deleteOldSegments() - - assertTrue(log.numberOfSegments < segmentsBefore, "Some segments should be deleted due to time retention") - assertTrue(deleteOldSegments > 0, "At least one segment should be deleted") - } - - @Test - def shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact,delete") - val log = createLog(logDir, logConfig) - - // append some messages to create some segments - for (_ <- 0 until 15) - log.appendAsLeader(createRecords, 0) - - log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - assertEquals(1, log.numberOfSegments, "There should be 1 segment remaining") - } - - @Test - def shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L) - val recordsPerSegment = 5 - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * recordsPerSegment, retentionMs = 10000, cleanupPolicy = "compact") - val log = createLog(logDir, logConfig, brokerTopicStats) - - // append some messages to create some segments - for (_ <- 0 until 15) - log.appendAsLeader(createRecords, 0) - - // Three segments should be created - assertEquals(3, log.logSegments.asScala.count(_ => true)) - log.updateHighWatermark(log.logEndOffset) - log.maybeIncrementLogStartOffset(recordsPerSegment, LogStartOffsetIncrementReason.ClientRecordDeletion) - - // The first segment, which is entirely before the log start offset, should be 
deleted - // Of the remaining the segments, the first can overlap the log start offset and the rest must have a base offset - // greater than the start offset - log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - assertEquals(2, log.numberOfSegments, "There should be 2 segments remaining") - assertTrue(log.logSegments.asScala.head.baseOffset <= log.logStartOffset) - assertTrue(log.logSegments.asScala.tail.forall(s => s.baseOffset > log.logStartOffset)) - } - - @Test - def shouldApplyEpochToMessageOnAppendIfLeader(): Unit = { - val records = (0 until 50).toArray.map(id => new SimpleRecord(id.toString.getBytes)) - - //Given this partition is on leader epoch 72 - val epoch = 72 - val log = createLog(logDir, new LogConfig(new Properties)) - log.assignEpochStartOffset(epoch, records.length) - - //When appending messages as a leader (i.e. assignOffsets = true) - for (record <- records) - log.appendAsLeader( - MemoryRecords.withRecords(Compression.NONE, record), - epoch - ) - - //Then leader epoch should be set on messages - for (i <- records.indices) { - val read = LogTestUtils.readLog(log, i, 1).records.batches.iterator.next() - assertEquals(72, read.partitionLeaderEpoch, "Should have set leader epoch") - } - } - - @Test - def followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCache(): Unit = { - val messageIds = (0 until 50).toArray - val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) - - //Given each message has an offset & epoch, as msgs from leader would - def recordsForEpoch(i: Int): MemoryRecords = { - val recs = MemoryRecords.withRecords(messageIds(i), Compression.NONE, records(i)) - recs.batches.forEach{record => - record.setPartitionLeaderEpoch(42) - record.setLastOffset(i) - } - recs - } - - val log = createLog(logDir, new LogConfig(new Properties)) - - //When appending as follower (assignOffsets = false) - for (i <- records.indices) - log.appendAsFollower(recordsForEpoch(i), i) - - 
assertEquals(Optional.of(42), log.latestEpoch) - } - - @Test - def shouldTruncateLeaderEpochsWhenDeletingSegments(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10) - val log = createLog(logDir, logConfig) - val cache = epochCache(log) - - // Given three segments of 5 messages each - for (_ <- 0 until 15) { - log.appendAsLeader(createRecords, 0) - } - - //Given epochs - cache.assign(0, 0) - cache.assign(1, 5) - cache.assign(2, 10) - - //When first segment is removed - log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - - //The oldest epoch entry should have been removed - assertEquals(util.List.of(new EpochEntry(1, 5), new EpochEntry(2, 10)), cache.epochEntries) - } - - @Test - def shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10) - val log = createLog(logDir, logConfig) - val cache = epochCache(log) - - // Given three segments of 5 messages each - for (_ <- 0 until 15) { - log.appendAsLeader(createRecords, 0) - } - - //Given epochs - cache.assign(0, 0) - cache.assign(1, 7) - cache.assign(2, 10) - - //When first segment removed (up to offset 5) - log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - - //The first entry should have gone from (0,0) => (0,5) - assertEquals(util.List.of(new EpochEntry(0, 5), new EpochEntry(1, 7), new EpochEntry(2, 10)), cache.epochEntries) - } - - @Test - def shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog(): Unit = { - def createRecords(startOffset: Long, epoch: Int): MemoryRecords = { - TestUtils.records(Seq(new SimpleRecord("value".getBytes)), - baseOffset = startOffset, partitionLeaderEpoch = epoch) - } 
- - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 10 * createRecords(0, 0).sizeInBytes) - val log = createLog(logDir, logConfig) - val cache = epochCache(log) - - def append(epoch: Int, startOffset: Long, count: Int): Unit = { - for (i <- 0 until count) - log.appendAsFollower(createRecords(startOffset + i, epoch), epoch) - } - - //Given 2 segments, 10 messages per segment - append(epoch = 0, startOffset = 0, count = 10) - append(epoch = 1, startOffset = 10, count = 6) - append(epoch = 2, startOffset = 16, count = 4) - - assertEquals(2, log.numberOfSegments) - assertEquals(20, log.logEndOffset) - - //When truncate to LEO (no op) - log.truncateTo(log.logEndOffset) - - //Then no change - assertEquals(3, cache.epochEntries.size) - - //When truncate - log.truncateTo(11) - - //Then no change - assertEquals(2, cache.epochEntries.size) - - //When truncate - log.truncateTo(10) - - //Then - assertEquals(1, cache.epochEntries.size) - - //When truncate all - log.truncateTo(0) - - //Then - assertEquals(0, cache.epochEntries.size) - } - - @Test - def testFirstUnstableOffsetNoTransactionalData(): Unit = { - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig) - - val records = MemoryRecords.withRecords(Compression.NONE, - new SimpleRecord("foo".getBytes), - new SimpleRecord("bar".getBytes), - new SimpleRecord("baz".getBytes)) - - log.appendAsLeader(records, 0) - assertEquals(Optional.empty, log.firstUnstableOffset) - } - - @Test - def testFirstUnstableOffsetWithTransactionalData(): Unit = { - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig) - - val pid = 137L - val epoch = 5.toShort - var seq = 0 - - // add some transactional records - val records = MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq, - new SimpleRecord("foo".getBytes), - new SimpleRecord("bar".getBytes), - new SimpleRecord("baz".getBytes)) - - 
val firstAppendInfo = log.appendAsLeader(records, 0) - assertEquals(Optional.of(firstAppendInfo.firstOffset), log.firstUnstableOffset) - - // add more transactional records - seq += 3 - log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq, - new SimpleRecord("blah".getBytes)), 0) - - // LSO should not have changed - assertEquals(Optional.of(firstAppendInfo.firstOffset), log.firstUnstableOffset) - - // now transaction is committed - val commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.COMMIT, mockTime.milliseconds()) - - // first unstable offset is not updated until the high watermark is advanced - assertEquals(Optional.of(firstAppendInfo.firstOffset), log.firstUnstableOffset) - log.updateHighWatermark(commitAppendInfo.lastOffset + 1) - - // now there should be no first unstable offset - assertEquals(Optional.empty, log.firstUnstableOffset) - } - @Test def testReadCommittedWithConcurrentHighWatermarkUpdates(): Unit = { val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5) diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java index b4c9e354ff45e..af43b929b2dd5 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java @@ -116,6 +116,7 @@ public void followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCa .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes())) .toArray(SimpleRecord[]::new); + //Given each message has an offset & epoch, as msgs from leader would Function recordsForEpoch = i -> { MemoryRecords recs = MemoryRecords.withRecords(messageIds[i], Compression.NONE, records[i]); recs.batches().forEach(record -> { @@ -125,7 +126,6 @@ public void 
followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCa return recs; }; - // Given each message has an offset & epoch, as msgs from leader would try (UnifiedLog log = createLog(logDir, new LogConfig(new Properties()))) { // Given each message has an offset & epoch, as msgs from leader would for (int i = 0; i < records.length; i++) { @@ -147,6 +147,7 @@ public void shouldTruncateLeaderEpochsWhenDeletingSegments() throws IOException log = createLog(logDir, config); LeaderEpochFileCache cache = epochCache(log); + // Given three segments of 5 messages each for (int i = 0; i < 15; i++) { log.appendAsLeader(records.get(), 0); } @@ -160,6 +161,7 @@ public void shouldTruncateLeaderEpochsWhenDeletingSegments() throws IOException log.updateHighWatermark(log.logEndOffset()); log.deleteOldSegments(); + //The oldest epoch entry should have been removed assertEquals(List.of(new EpochEntry(1, 5), new EpochEntry(2, 10)), cache.epochEntries()); } @@ -174,6 +176,7 @@ public void shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() throws IOExc log = createLog(logDir, config); LeaderEpochFileCache cache = epochCache(log); + // Given three segments of 5 messages each for (int i = 0; i < 15; i++) { log.appendAsLeader(records.get(), 0); } @@ -187,6 +190,7 @@ public void shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() throws IOExc log.updateHighWatermark(log.logEndOffset()); log.deleteOldSegments(); + //The first entry should have gone from (0,0) => (0,5) assertEquals(List.of(new EpochEntry(0, 5), new EpochEntry(1, 7), new EpochEntry(2, 10)), cache.epochEntries()); } @@ -199,6 +203,7 @@ public void shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog() throws IO log = createLog(logDir, config); LeaderEpochFileCache cache = epochCache(log); + //Given 2 segments, 10 messages per segment append(0, 0, 10); append(1, 10, 6); append(2, 16, 4); @@ -219,6 +224,7 @@ public void shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog() throws IO // When truncate 
log.truncateTo(10); assertEquals(1, cache.epochEntries().size()); + // When truncate all log.truncateTo(0); assertEquals(0, cache.epochEntries().size()); @@ -272,6 +278,7 @@ public void shouldDeleteTimeBasedSegmentsReadyToBeDeleted() throws IOException { .build(); log = createLog(logDir, config); + // append some messages to create some segments for (int i = 0; i < 15; i++) { log.appendAsLeader(records.get(), 0); } @@ -290,6 +297,7 @@ public void shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() throws IO .build(); log = createLog(logDir, logConfig); + // append some messages to create some segments for (int i = 0; i < 15; i++) { log.appendAsLeader(records.get(), 0); } @@ -309,10 +317,12 @@ public void shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() throws IOExc .build(); log = createLog(logDir, config); + // append some messages to create some segments for (int i = 0; i < 15; i++) { log.appendAsLeader(records.get(), 0); } + // mark the oldest segment as older the retention.ms log.logSegments().iterator().next().setLastModified(mockTime.milliseconds() - 20000); int segments = log.numberOfSegments(); @@ -332,6 +342,7 @@ public void shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDel log = createLog(logDir, config); + // append some messages to create some segments for (int i = 0; i < 15; i++) { log.appendAsLeader(records.get(), 0); } @@ -461,6 +472,7 @@ public void testDeleteOldSegments() throws IOException { .build(); log = createLog(logDir, logConfig); + // append some messages to create some segments for (int i = 0; i < 100; i++) { log.appendAsLeader(records.get(), 0); } @@ -487,7 +499,6 @@ public void testDeleteOldSegments() throws IOException { } catch (IOException e) { throw new RuntimeException(e); } - // FIXME: think Optional lastBatch = Optional.empty(); for (RecordBatch batch : segmentFetchInfo.records.batches()) { lastBatch = Optional.of(batch); @@ -523,6 +534,7 @@ public void 
shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelet .build(); log = createLog(logDir, logConfig); + // append some messages to create some segments for (int i = 0; i < 15; i++) { log.appendAsLeader(records.get(), 0); } From 8509b833c0dbbdfb8569e86cb57e17ca6b3cd812 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Sat, 4 Oct 2025 09:51:55 +0800 Subject: [PATCH 13/17] remove unused code --- .../apache/kafka/storage/internals/log/UnifiedLogTest.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java index af43b929b2dd5..e1f0a242f9dda 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java @@ -624,11 +624,6 @@ private LeaderEpochFileCache epochCache(UnifiedLog log) { return log.leaderEpochCache(); } - private LogAppendInfo appendEndTxnMarkerAsLeader(UnifiedLog log, long producerId, short producerEpoch, ControlRecordType controlType, long timestamp) throws IOException { - MemoryRecords records = MemoryRecords.withEndTransactionMarker(producerId, producerEpoch, new EndTransactionMarker(controlType, 0)); - return log.appendAsLeader(records, 0); - } - private UnifiedLog createLog(File dir, LogConfig config) throws IOException { return createLog(dir, config, false); } From da9905fc0691373b8d73274a65faee793f540c70 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Sat, 4 Oct 2025 09:55:40 +0800 Subject: [PATCH 14/17] fix build --- .../org/apache/kafka/storage/internals/log/UnifiedLogTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java index e1f0a242f9dda..6a5c338acc5e8 100644 --- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.KafkaStorageException; import org.apache.kafka.common.record.ControlRecordType; -import org.apache.kafka.common.record.EndTransactionMarker; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.SimpleRecord; From 99bddaf24fa17e1b3f178fd4f365bafadcea3344 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Sat, 4 Oct 2025 10:08:21 +0800 Subject: [PATCH 15/17] improve test --- .../kafka/storage/internals/log/UnifiedLogTest.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java index 6a5c338acc5e8..e8cbeeba36d66 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java @@ -18,7 +18,6 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; -import org.apache.kafka.common.errors.KafkaStorageException; import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.RecordBatch; @@ -65,13 +64,7 @@ public class UnifiedLogTest { public void tearDown() throws IOException { brokerTopicStats.close(); for (UnifiedLog log : logsToClose) { - try { - // some test like testLogDeletionAfterClose and testLogDeletionAfterClose - // they are closed from test so KafkaStorageException is expected. 
- log.close(); - } catch (KafkaStorageException ignore) { - // ignore - } + log.close(); } Utils.delete(tmpDir); } @@ -448,6 +441,8 @@ public void testLogDeletionAfterClose() throws IOException { .withRetentionMs(999) .build(); log = createLog(logDir, logConfig); + // avoid close after test because it is closed in this test + logsToClose.remove(log); // append some messages to create some segments log.appendAsLeader(records.get(), 0); @@ -470,6 +465,8 @@ public void testDeleteOldSegments() throws IOException { .withRetentionMs(999) .build(); log = createLog(logDir, logConfig); + // avoid close after test because it is closed in this test + logsToClose.remove(log); // append some messages to create some segments for (int i = 0; i < 100; i++) { From 272cefeb1dbbc531dc579076b84ad8debabc0ac4 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Sat, 4 Oct 2025 10:32:23 +0800 Subject: [PATCH 16/17] issue? --- .../apache/kafka/storage/internals/log/UnifiedLogTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java index e8cbeeba36d66..db67cac6bd9c7 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java @@ -465,7 +465,7 @@ public void testDeleteOldSegments() throws IOException { .withRetentionMs(999) .build(); log = createLog(logDir, logConfig); - // avoid close after test because it is closed in this test + // Avoid close after test because it is deleted in this test. See line 517. logsToClose.remove(log); // append some messages to create some segments @@ -513,6 +513,10 @@ public void testDeleteOldSegments() throws IOException { log.appendAsLeader(records.get(), 0); } + // Bug? 
Due to log.delete() we can't close the unifiedLog + // because we delete log and the memory map is closed. + // When we close UnifiedLog, it will check that memory map again and + // will throw KafkaStorageException. log.delete(); assertEquals(0, log.numberOfSegments(), "The number of segments should be 0"); assertEquals(0, log.deleteOldSegments(), "The number of deleted segments should be zero."); From a8d0bbaf0f60cd9f7d5b58a407a149ab03576749 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Sat, 4 Oct 2025 12:28:36 +0800 Subject: [PATCH 17/17] Revert "issue?" This reverts commit 272cefeb1dbbc531dc579076b84ad8debabc0ac4. --- .../apache/kafka/storage/internals/log/UnifiedLogTest.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java index db67cac6bd9c7..e8cbeeba36d66 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java @@ -465,7 +465,7 @@ public void testDeleteOldSegments() throws IOException { .withRetentionMs(999) .build(); log = createLog(logDir, logConfig); - // Avoid close after test because it is deleted in this test. See line 517. + // avoid close after test because it is closed in this test logsToClose.remove(log); // append some messages to create some segments @@ -513,10 +513,6 @@ public void testDeleteOldSegments() throws IOException { log.appendAsLeader(records.get(), 0); } - // Bug? Due to log.delete() we can't close the unifiedLog - // because we delete log and the memory map is closed. - // When we close UnifiedLog, it will check that memory map again and - // will throw KafkaStorageException. 
log.delete(); assertEquals(0, log.numberOfSegments(), "The number of segments should be 0"); assertEquals(0, log.deleteOldSegments(), "The number of deleted segments should be zero.");