From 42bc519f85fe164e689d6bfe28ebc54536ae9fd1 Mon Sep 17 00:00:00 2001
From: CLFutureX <775523362@qq.com>
Date: Tue, 6 Aug 2024 13:28:58 +0800
Subject: [PATCH 1/3] feat(s3stream): optimization handler chain

---
 .../kafka/log/stream/s3/DefaultS3Client.java    |  9 +++++----
 .../java/com/automq/stream/s3/S3Storage.java    | 10 +++++-----
 .../stream/s3/failover/StorageHandlerChain.java | 17 +++++++++++++++++
 .../com/automq/stream/s3/S3StorageTest.java     |  4 ++--
 4 files changed, 29 insertions(+), 11 deletions(-)
 create mode 100644 s3stream/src/main/java/com/automq/stream/s3/failover/StorageHandlerChain.java

diff --git a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
index b057741fe7..c6d8d19371 100644
--- a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
+++ b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
@@ -29,6 +29,7 @@
 import com.automq.stream.s3.failover.ForceCloseStorageFailureHandler;
 import com.automq.stream.s3.failover.HaltStorageFailureHandler;
 import com.automq.stream.s3.failover.StorageFailureHandlerChain;
+import com.automq.stream.s3.failover.StorageHandlerChain;
 import com.automq.stream.s3.index.LocalStreamRangeIndexCache;
 import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
 import com.automq.stream.s3.objects.ObjectManager;
@@ -125,12 +126,12 @@ public void start() {
         this.blockCache = new StreamReaders(this.config.blockCacheSize(), objectManager, objectStorage, objectReaderFactory);
         this.compactionManager = new CompactionManager(this.config, this.objectManager, this.streamManager, compactionobjectStorage);
         this.writeAheadLog = buildWAL();
-        StorageFailureHandlerChain storageFailureHandler = new StorageFailureHandlerChain();
-        this.storage = new S3Storage(this.config, writeAheadLog, streamManager, objectManager, blockCache, objectStorage, storageFailureHandler);
+        StorageHandlerChain storageHandlerChain = new StorageFailureHandlerChain();
+        this.storage = new S3Storage(this.config, writeAheadLog, streamManager, objectManager, blockCache, objectStorage, storageHandlerChain);
         // stream object compactions share the same object storage with stream set object compactions
         this.streamClient = new S3StreamClient(this.streamManager, this.storage, this.objectManager, compactionobjectStorage, this.config, networkInboundLimiter, networkOutboundLimiter);
-        storageFailureHandler.addHandler(new ForceCloseStorageFailureHandler(streamClient));
-        storageFailureHandler.addHandler(new HaltStorageFailureHandler());
+        storageHandlerChain.addHandler(new ForceCloseStorageFailureHandler(streamClient));
+        storageHandlerChain.addHandler(new HaltStorageFailureHandler());
         this.streamClient.registerStreamLifeCycleListener(localIndexCache);
         this.kvClient = new ControllerKVClient(this.requestSender);
         this.failover = failover();
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
index 76373b37dd..fa76583cd4 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -19,7 +19,7 @@
 import com.automq.stream.s3.context.AppendContext;
 import com.automq.stream.s3.context.FetchContext;
 import com.automq.stream.s3.failover.Failover;
-import com.automq.stream.s3.failover.StorageFailureHandler;
+import com.automq.stream.s3.failover.StorageHandlerChain;
 import com.automq.stream.s3.metadata.StreamMetadata;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
@@ -112,7 +112,7 @@ public class S3Storage implements Storage {
     private final ObjectManager objectManager;
     private final ObjectStorage objectStorage;
     private final S3BlockCache blockCache;
-    private final StorageFailureHandler storageFailureHandler;
+    private final StorageHandlerChain storageHandlerChain;
     /**
      * Stream callback locks. Used to ensure the stream callbacks will not be called concurrently.
      *
@@ -128,7 +128,7 @@ public class S3Storage implements Storage {
 
     @SuppressWarnings("this-escape")
     public S3Storage(Config config, WriteAheadLog deltaWAL, StreamManager streamManager, ObjectManager objectManager,
-        S3BlockCache blockCache, ObjectStorage objectStorage, StorageFailureHandler storageFailureHandler) {
+        S3BlockCache blockCache, ObjectStorage objectStorage, StorageHandlerChain storageHandlerChain) {
         this.config = config;
         this.maxDeltaWALCacheSize = config.walCacheSize();
         this.deltaWAL = deltaWAL;
@@ -137,7 +137,7 @@ public S3Storage(Config config, WriteAheadLog deltaWAL, StreamManager streamMana
         this.streamManager = streamManager;
         this.objectManager = objectManager;
         this.objectStorage = objectStorage;
-        this.storageFailureHandler = storageFailureHandler;
+        this.storageHandlerChain = storageHandlerChain;
         this.drainBackoffTask = this.backgroundExecutor.scheduleWithFixedDelay(this::tryDrainBackoffRecords, 100, 100, TimeUnit.MILLISECONDS);
         S3StreamMetricsManager.registerInflightWALUploadTasksCountSupplier(this.inflightWALUploadTasks::size);
         S3StreamMetricsManager.registerDeltaWalPendingUploadBytesSupplier(this.pendingUploadBytes::get);
@@ -468,7 +468,7 @@ public boolean append0(AppendContext context, WalWriteRequest request, boolean f
         appendResult.future().whenComplete((nil, ex) -> {
             if (ex != null) {
                 LOGGER.error("append WAL fail, request {}", request, ex);
-                storageFailureHandler.handle(ex);
+                storageHandlerChain.handle(ex);
                 return;
             }
             handleAppendCallback(request);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/failover/StorageHandlerChain.java b/s3stream/src/main/java/com/automq/stream/s3/failover/StorageHandlerChain.java
new file mode 100644
index 0000000000..66dbfb01d2
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/s3/failover/StorageHandlerChain.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2024, AutoMQ HK Limited.
+ *
+ * The use of this file is governed by the Business Source License,
+ * as detailed in the file "/LICENSE.S3Stream" included in this repository.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package com.automq.stream.s3.failover;
+
+public interface StorageHandlerChain extends StorageFailureHandler{
+    void addHandler(StorageFailureHandler handler);
+
+}
diff --git a/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java b/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java
index b3ae931517..c00d6eac6f 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java
@@ -15,7 +15,7 @@
 import com.automq.stream.s3.cache.ReadDataBlock;
 import com.automq.stream.s3.cache.blockcache.DefaultObjectReaderFactory;
 import com.automq.stream.s3.cache.blockcache.StreamReaders;
-import com.automq.stream.s3.failover.StorageFailureHandler;
+import com.automq.stream.s3.failover.StorageHandlerChain;
 import com.automq.stream.s3.metadata.StreamMetadata;
 import com.automq.stream.s3.metadata.StreamState;
 import com.automq.stream.s3.model.StreamRecordBatch;
@@ -81,7 +81,7 @@ public void setup() {
         objectStorage = new MemoryObjectStorage();
         storage = new S3Storage(config, wal,
             streamManager, objectManager, new StreamReaders(config.blockCacheSize(), objectManager, objectStorage,
-            new DefaultObjectReaderFactory(objectStorage)), objectStorage, mock(StorageFailureHandler.class));
+            new DefaultObjectReaderFactory(objectStorage)), objectStorage, mock(StorageHandlerChain.class));
     }
 
     @Test

From 6c47f74f50589e82daa145a2245679845af128e1 Mon Sep 17 00:00:00 2001
From: CLFutureX <775523362@qq.com>
Date: Tue, 6 Aug 2024 13:33:28 +0800
Subject: [PATCH 2/3] feat(s3stream): optimization handler chain

---
 .../automq/stream/s3/failover/StorageFailureHandlerChain.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/s3stream/src/main/java/com/automq/stream/s3/failover/StorageFailureHandlerChain.java b/s3stream/src/main/java/com/automq/stream/s3/failover/StorageFailureHandlerChain.java
index 1c1798cbf4..e712517255 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/failover/StorageFailureHandlerChain.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/failover/StorageFailureHandlerChain.java
@@ -16,7 +16,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StorageFailureHandlerChain implements StorageFailureHandler {
+public class StorageFailureHandlerChain implements StorageHandlerChain {
     private static final Logger LOGGER = LoggerFactory.getLogger(StorageFailureHandlerChain.class);
     private final List<StorageFailureHandler> handlers = new ArrayList<>();
 
@@ -31,6 +31,7 @@ public void handle(Throwable ex) {
         }
     }
 
+    @Override
     public void addHandler(StorageFailureHandler handler) {
         handlers.add(handler);
     }

From bffb264992f85add1824ae3f1c08da70d80df7d6 Mon Sep 17 00:00:00 2001
From: CLFutureX <775523362@qq.com>
Date: Wed, 7 Aug 2024 15:13:04 +0800
Subject: [PATCH 3/3] feat(s3stream): optimization handler chain

---
 .../com/automq/stream/s3/failover/StorageHandlerChain.java     | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/s3stream/src/main/java/com/automq/stream/s3/failover/StorageHandlerChain.java b/s3stream/src/main/java/com/automq/stream/s3/failover/StorageHandlerChain.java
index 66dbfb01d2..1a2f4bf697 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/failover/StorageHandlerChain.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/failover/StorageHandlerChain.java
@@ -11,7 +11,6 @@
 
 package com.automq.stream.s3.failover;
 
-public interface StorageHandlerChain extends StorageFailureHandler{
+public interface StorageHandlerChain extends StorageFailureHandler {
     void addHandler(StorageFailureHandler handler);
-
 }