Skip to content

Commit 39a674d

Browse files
authored
feat(snapshot_read): snapshot-read cache (#2453) (#2454)
feat(snapshot_read): snapshot read cache

Signed-off-by: Robin Han <[email protected]>
1 parent 92e837d commit 39a674d

File tree

20 files changed: +518 -40 lines changed

20 files changed: +518 -40 lines changed

bin/kafka-server-start.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
4747
fi
4848

4949
if [ "x$KAFKA_OPTS" = "x" ]; then
50-
export KAFKA_OPTS="-Dio.netty.allocator.maxOrder=11"
50+
export KAFKA_OPTS="-XX:+ExitOnOutOfMemoryError -XX:+HeapDumpOnOutOfMemoryError -Dio.netty.allocator.maxOrder=11"
5151
fi
5252

5353
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

core/src/main/scala/kafka/server/MetadataCache.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.apache.kafka.admin.BrokerMetadata
2222
import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
2323
import org.apache.kafka.common.network.ListenerName
2424
import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
25+
import org.apache.kafka.image.MetadataImage
2526
import org.apache.kafka.metadata.BrokerRegistration
2627
import org.apache.kafka.server.common.automq.AutoMQVersion
2728
import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion}
@@ -126,6 +127,8 @@ trait MetadataCache {
126127
def getValue(key: String): ByteBuffer
127128

128129
def getStreamEndOffset(streamId: Long): OptionalLong
130+
131+
def safeRun[T](func: Function[MetadataImage, T]): T
129132
// AutoMQ inject end
130133
}
131134

core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,7 @@ class KRaftMetadataCache(
623623
_currentImage.features().autoMQVersion()
624624
}
625625

626-
def safeRun[T](func: Function[MetadataImage, T]): T = {
626+
override def safeRun[T](func: Function[MetadataImage, T]): T = {
627627
val image = retainedImage()
628628
try {
629629
func.apply(image)

core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import org.apache.kafka.common.network.ListenerName
4040
import org.apache.kafka.common.protocol.Errors
4141
import org.apache.kafka.common.requests.{AbstractControlRequest, ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest}
4242
import org.apache.kafka.common.security.auth.SecurityProtocol
43+
import org.apache.kafka.image.MetadataImage
4344
import org.apache.kafka.metadata.BrokerRegistration
4445
import org.apache.kafka.server.common.automq.AutoMQVersion
4546
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
@@ -736,6 +737,10 @@ class ZkMetadataCache(
736737
override def getStreamEndOffset(streamId: Long): OptionalLong = {
737738
throw new UnsupportedOperationException()
738739
}
740+
741+
override def safeRun[T](func: Function[MetadataImage, T]): T = {
742+
throw new UnsupportedOperationException()
743+
}
739744
// AutoMQ inject end
740745

741746
}

s3stream/src/main/java/com/automq/stream/api/ReadOptions.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class ReadOptions {
2727
private boolean fastRead;
2828
private boolean pooledBuf;
2929
private boolean prioritizedRead;
30+
private boolean snapshotRead;
3031

3132
public static Builder builder() {
3233
return new Builder();
@@ -44,6 +45,15 @@ public boolean prioritizedRead() {
4445
return prioritizedRead;
4546
}
4647

48+
public boolean snapshotRead() {
49+
return snapshotRead;
50+
}
51+
52+
public ReadOptions snapshotRead(boolean snapshotRead) {
53+
this.snapshotRead = snapshotRead;
54+
return this;
55+
}
56+
4757
public static class Builder {
4858
private final ReadOptions options = new ReadOptions();
4959

@@ -68,6 +78,11 @@ public Builder prioritizedRead(boolean prioritizedRead) {
6878
return this;
6979
}
7080

81+
public Builder snapshotRead(boolean snapshotRead) {
82+
options.snapshotRead = snapshotRead;
83+
return this;
84+
}
85+
7186
public ReadOptions build() {
7287
return options;
7388
}

s3stream/src/main/java/com/automq/stream/s3/Config.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public class Config {
5959
private int refillPeriodMs = 10;
6060
private long objectRetentionTimeInSecond = 10 * 60; // 10min
6161
private boolean failoverEnable = false;
62+
private boolean snapshotReadEnable = false;
6263
private Supplier<Version> version = () -> {
6364
throw new UnsupportedOperationException();
6465
};
@@ -324,6 +325,15 @@ public boolean failoverEnable() {
324325
return failoverEnable;
325326
}
326327

328+
public Config snapshotReadEnable(boolean snapshotReadEnable) {
329+
this.snapshotReadEnable = snapshotReadEnable;
330+
return this;
331+
}
332+
333+
public boolean snapshotReadEnable() {
334+
return snapshotReadEnable;
335+
}
336+
327337
public Config version(Supplier<Version> version) {
328338
this.version = version;
329339
return this;

s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public CompletableFuture<DataBlockGroup> read(ReadOptions readOptions, DataBlock
159159
}
160160

161161
void asyncGetBasicObjectInfo() {
162-
int guessIndexBlockSize = 1024 + (int) (metadata.objectSize() / (1024 * 1024 /* 1MB */) * 36 /* index unit size*/);
162+
int guessIndexBlockSize = 8192 + (int) (metadata.objectSize() / (1024 * 1024 /* 1MB */) * 36 /* index unit size*/);
163163
asyncGetBasicObjectInfo0(Math.max(0, metadata.objectSize() - guessIndexBlockSize), true);
164164
}
165165

@@ -578,6 +578,10 @@ private static int check(ByteBuf buf) {
578578
}
579579

580580
public CloseableIterator<StreamRecordBatch> iterator() {
581+
return iterator(true);
582+
}
583+
584+
public CloseableIterator<StreamRecordBatch> iterator(boolean copy) {
581585
ByteBuf buf = this.buf.duplicate();
582586
AtomicInteger currentBlockRecordCount = new AtomicInteger(0);
583587
AtomicInteger remainingRecordCount = new AtomicInteger(recordCount);
@@ -599,7 +603,7 @@ public StreamRecordBatch next() {
599603
buf.skipBytes(4);
600604
}
601605
currentBlockRecordCount.decrementAndGet();
602-
return StreamRecordBatchCodec.duplicateDecode(buf);
606+
return copy ? StreamRecordBatchCodec.duplicateDecode(buf) : StreamRecordBatchCodec.sliceRetainDecode(buf);
603607
}
604608

605609
@Override

s3stream/src/main/java/com/automq/stream/s3/S3Storage.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.automq.stream.s3.cache.LogCache;
2525
import com.automq.stream.s3.cache.ReadDataBlock;
2626
import com.automq.stream.s3.cache.S3BlockCache;
27+
import com.automq.stream.s3.cache.SnapshotReadCache;
2728
import com.automq.stream.s3.context.AppendContext;
2829
import com.automq.stream.s3.context.FetchContext;
2930
import com.automq.stream.s3.failover.Failover;
@@ -99,6 +100,7 @@ public class S3Storage implements Storage {
99100
* WAL log cache
100101
*/
101102
private final LogCache deltaWALCache;
103+
private final LogCache snapshotReadCache;
102104
/**
103105
* WAL out of order callback sequencer. {@link #streamCallbackLocks} will ensure the memory safety.
104106
*/
@@ -150,7 +152,16 @@ public S3Storage(Config config, WriteAheadLog deltaWAL, StreamManager streamMana
150152
this.maxDeltaWALCacheSize = config.walCacheSize();
151153
this.deltaWAL = deltaWAL;
152154
this.blockCache = blockCache;
153-
this.deltaWALCache = new LogCache(config.walCacheSize(), config.walUploadThreshold(), config.maxStreamNumPerStreamSetObject());
155+
long deltaWALCacheSize = config.walCacheSize();
156+
long snapshotReadCacheSize = 0;
157+
if (config.snapshotReadEnable()) {
158+
deltaWALCacheSize = Math.max(config.walCacheSize() / 3, 10L * 1024 * 1024);
159+
snapshotReadCacheSize = Math.max(config.walCacheSize() / 3 * 2, 10L * 1024 * 1024);
160+
}
161+
this.deltaWALCache = new LogCache(deltaWALCacheSize, config.walUploadThreshold(), config.maxStreamNumPerStreamSetObject());
162+
this.snapshotReadCache = new LogCache(snapshotReadCacheSize, Math.max(snapshotReadCacheSize / 6, 1));
163+
S3StreamMetricsManager.registerDeltaWalCacheSizeSupplier(() -> deltaWALCache.size() + snapshotReadCache.size());
164+
SnapshotReadCache.instance().setup(this.snapshotReadCache, objectStorage);
154165
this.streamManager = streamManager;
155166
this.objectManager = objectManager;
156167
this.objectStorage = objectStorage;
@@ -547,13 +558,15 @@ public CompletableFuture<ReadDataBlock> read(FetchContext context,
547558
return cf;
548559
}
549560

561+
@SuppressWarnings({"checkstyle:npathcomplexity"})
550562
@WithSpan
551563
private CompletableFuture<ReadDataBlock> read0(FetchContext context,
552564
@SpanAttribute long streamId,
553565
@SpanAttribute long startOffset,
554566
@SpanAttribute long endOffset,
555567
@SpanAttribute int maxBytes) {
556-
List<StreamRecordBatch> logCacheRecords = deltaWALCache.get(context, streamId, startOffset, endOffset, maxBytes);
568+
LogCache firstCache = context.readOptions().snapshotRead() ? snapshotReadCache : deltaWALCache;
569+
List<StreamRecordBatch> logCacheRecords = firstCache.get(context, streamId, startOffset, endOffset, maxBytes);
557570
if (!logCacheRecords.isEmpty() && logCacheRecords.get(0).getBaseOffset() <= startOffset) {
558571
return CompletableFuture.completedFuture(new ReadDataBlock(logCacheRecords, CacheAccessType.DELTA_WAL_CACHE_HIT));
559572
}

s3stream/src/main/java/com/automq/stream/s3/S3Stream.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,12 +224,16 @@ private CompletableFuture<AppendResult> append0(AppendContext context, RecordBat
224224
});
225225
}
226226

227+
@SuppressWarnings({"checkstyle:npathcomplexity"})
227228
@Override
228229
@WithSpan
229230
public CompletableFuture<FetchResult> fetch(FetchContext context,
230231
@SpanAttribute long startOffset,
231232
@SpanAttribute long endOffset,
232233
@SpanAttribute int maxBytes) {
234+
if (snapshotRead()) {
235+
context.readOptions().snapshotRead(true);
236+
}
233237
TimerUtil timerUtil = new TimerUtil();
234238
readLock.lock();
235239
try {

s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,23 @@ public static StreamRecordBatch duplicateDecode(ByteBuf buf) {
7474
* Decode a stream record batch from a byte buffer and move the reader index.
7575
* The returned stream record batch shares the payload buffer with the input buffer.
7676
*/
77-
public static StreamRecordBatch decode(ByteBuf buf) {
77+
public static StreamRecordBatch decode(ByteBuf buf, boolean retain) {
7878
buf.readByte(); // magic
7979
long streamId = buf.readLong();
8080
long epoch = buf.readLong();
8181
long baseOffset = buf.readLong();
8282
int lastOffsetDelta = buf.readInt();
8383
int payloadLength = buf.readInt();
84-
ByteBuf payload = buf.slice(buf.readerIndex(), payloadLength);
84+
ByteBuf payload = retain ? buf.retainedSlice(buf.readerIndex(), payloadLength) : buf.slice(buf.readerIndex(), payloadLength);
8585
buf.skipBytes(payloadLength);
8686
return new StreamRecordBatch(streamId, epoch, baseOffset, lastOffsetDelta, payload);
8787
}
88+
89+
public static StreamRecordBatch decode(ByteBuf buf) {
90+
return decode(buf, false);
91+
}
92+
93+
public static StreamRecordBatch sliceRetainDecode(ByteBuf buf) {
94+
return decode(buf, true);
95+
}
8896
}

0 commit comments

Comments
 (0)