Skip to content

Commit 5860a39

Browse files
authored
fix(s3stream): fix compaction block on upload exception (#2264)
Signed-off-by: Shichao Nie <[email protected]>
1 parent 273c134 commit 5860a39

File tree

3 files changed

+99
-24
lines changed

3 files changed

+99
-24
lines changed

s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.automq.stream.s3.objects.StreamObject;
3333
import com.automq.stream.s3.operator.ObjectStorage;
3434
import com.automq.stream.s3.streams.StreamManager;
35+
import com.automq.stream.utils.FutureUtil;
3536
import com.automq.stream.utils.LogContext;
3637
import com.automq.stream.utils.ThreadUtils;
3738
import com.automq.stream.utils.Threads;
@@ -750,28 +751,29 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List<Compactio
750751
if (uploadException != null) {
751752
logger.error("Error while uploading compaction objects", uploadException);
752753
}
753-
uploader.forceUploadStreamSetObject().whenComplete((vv, forceUploadException) -> {
754-
if (forceUploadException != null) {
755-
logger.error("Error while force uploading stream set object", uploadException);
756-
}
757-
if (uploadException != null || forceUploadException != null) {
758-
uploader.release().whenComplete((vvv, releaseException) -> {
759-
if (releaseException != null) {
760-
logger.error("Unexpected exception while release uploader");
761-
}
762-
for (CompactedObject compactedObject : compactionPlan.compactedObjects()) {
763-
compactedObject.streamDataBlocks().forEach(StreamDataBlock::release);
764-
}
765-
if (uploadException != null) {
766-
compactionCf.completeExceptionally(new CompletionException("Uploading failed", uploadException));
767-
} else {
768-
compactionCf.completeExceptionally(new CompletionException("Force uploading sso failed", forceUploadException));
769-
}
770-
});
771-
} else {
772-
compactionCf.complete(null);
773-
}
774-
});
754+
FutureUtil.exec(uploader::forceUploadStreamSetObject, logger, "force upload sso")
755+
.whenComplete((vv, forceUploadException) -> {
756+
if (forceUploadException != null) {
757+
logger.error("Error while force uploading stream set object", uploadException);
758+
}
759+
if (uploadException != null || forceUploadException != null) {
760+
FutureUtil.exec(uploader::release, logger, "release uploader").whenComplete((vvv, releaseException) -> {
761+
if (releaseException != null) {
762+
logger.error("Unexpected exception while release uploader");
763+
}
764+
for (CompactedObject compactedObject : compactionPlan.compactedObjects()) {
765+
compactedObject.streamDataBlocks().forEach(StreamDataBlock::release);
766+
}
767+
if (uploadException != null) {
768+
compactionCf.completeExceptionally(new CompletionException("Uploading failed", uploadException));
769+
} else {
770+
compactionCf.completeExceptionally(new CompletionException("Force uploading sso failed", forceUploadException));
771+
}
772+
});
773+
} else {
774+
compactionCf.complete(null);
775+
}
776+
});
775777
});
776778
}
777779
try {

s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,14 @@ public CompletableFuture<Void> forceUpload() {
8585
private void uploadWaitingList() {
8686
CompositeByteBuf buf = groupWaitingBlocks();
8787
List<StreamDataBlock> blocks = new LinkedList<>(waitingUploadBlocks);
88-
writer.write(buf).thenAccept(v -> {
88+
writer.write(buf).whenComplete((v, ex) -> {
8989
for (StreamDataBlock block : blocks) {
9090
waitingUploadBlockCfs.computeIfPresent(block, (k, cf) -> {
91-
cf.complete(null);
91+
if (ex != null) {
92+
cf.completeExceptionally(ex);
93+
} else {
94+
cf.complete(null);
95+
}
9296
return null;
9397
});
9498
}

s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,13 @@
6060

6161
import io.netty.buffer.ByteBuf;
6262
import software.amazon.awssdk.core.async.AsyncRequestBody;
63+
import software.amazon.awssdk.http.HttpStatusCode;
6364
import software.amazon.awssdk.services.s3.S3AsyncClient;
65+
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
66+
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
6467
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
68+
import software.amazon.awssdk.services.s3.model.S3Exception;
69+
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
6570

6671
import static org.junit.jupiter.api.Assertions.assertEquals;
6772
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -618,6 +623,70 @@ public void testCompactNoneExistObjects2() {
618623
}
619624
}
620625

626+
@Test
627+
public void testCompactWithUploadException() {
628+
when(config.streamSetObjectCompactionStreamSplitSize()).thenReturn(100 * 1024 * 1024L);
629+
when(config.streamSetObjectCompactionCacheSize()).thenReturn(1024 * 1024 * 1024L);
630+
when(config.objectPartSize()).thenReturn(100 * 1024 * 1024);
631+
Map<Long, List<StreamDataBlock>> streamDataBlockMap = getStreamDataBlockMapLarge();
632+
S3ObjectMetadata objectMetadata0 = new S3ObjectMetadata(OBJECT_0, 0, S3ObjectType.STREAM_SET);
633+
S3ObjectMetadata objectMetadata1 = new S3ObjectMetadata(OBJECT_1, 0, S3ObjectType.STREAM_SET);
634+
S3ObjectMetadata objectMetadata2 = new S3ObjectMetadata(OBJECT_2, 0, S3ObjectType.STREAM_SET);
635+
List<S3ObjectMetadata> s3ObjectMetadata = List.of(objectMetadata0, objectMetadata1, objectMetadata2);
636+
this.compactionAnalyzer = new CompactionAnalyzer(config.streamSetObjectCompactionCacheSize(), config.streamSetObjectCompactionStreamSplitSize(),
637+
config.maxStreamNumPerStreamSetObject(), config.maxStreamObjectNumPerCommit());
638+
List<CompactionPlan> compactionPlans = this.compactionAnalyzer.analyze(streamDataBlockMap, new HashSet<>());
639+
CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest();
640+
641+
S3AsyncClient s3AsyncClient = Mockito.mock(S3AsyncClient.class);
642+
doAnswer(invocation -> CompletableFuture.failedFuture(S3Exception.builder().statusCode(HttpStatusCode.NOT_FOUND).build())).when(s3AsyncClient).putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class));
643+
doAnswer(invocation -> CompletableFuture.completedFuture(CreateMultipartUploadResponse.builder().uploadId("123").build())).when(s3AsyncClient).createMultipartUpload(any(CreateMultipartUploadRequest.class));
644+
doAnswer(invocation -> CompletableFuture.failedFuture(S3Exception.builder().statusCode(HttpStatusCode.NOT_FOUND).build())).when(s3AsyncClient).uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class));
645+
646+
AwsObjectStorage objectStorage = Mockito.spy(new AwsObjectStorage(s3AsyncClient, ""));
647+
doAnswer(invocation -> CompletableFuture.completedFuture(TestUtils.randomPooled(65 * 1024 * 1024))).when(objectStorage).rangeRead(any(), eq(objectMetadata0.key()), anyLong(), anyLong());
648+
doAnswer(invocation -> CompletableFuture.completedFuture(TestUtils.randomPooled(80 * 1024 * 1024))).when(objectStorage).rangeRead(any(), eq(objectMetadata1.key()), anyLong(), anyLong());
649+
doAnswer(invocation -> CompletableFuture.completedFuture(TestUtils.randomPooled(50 * 1024 * 1024))).when(objectStorage).rangeRead(any(), eq(objectMetadata2.key()), anyLong(), anyLong());
650+
651+
CompactionManager compactionManager = new CompactionManager(config, objectManager, streamManager, objectStorage);
652+
Assertions.assertThrowsExactly(CompletionException.class,
653+
() -> compactionManager.executeCompactionPlans(request, compactionPlans, s3ObjectMetadata));
654+
for (CompactionPlan plan : compactionPlans) {
655+
plan.streamDataBlocksMap().forEach((streamId, blocks) -> blocks.forEach(block -> {
656+
if (block.getObjectId() != OBJECT_1) {
657+
block.getDataCf().thenAccept(data -> {
658+
Assertions.assertEquals(0, data.refCnt());
659+
}).join();
660+
}
661+
}));
662+
}
663+
}
664+
665+
private static Map<Long, List<StreamDataBlock>> getStreamDataBlockMapLarge() {
666+
StreamDataBlock block1 = new StreamDataBlock(OBJECT_0, new DataBlockIndex(0, 0, 15, 15, 0, 15 * 1024 * 1024));
667+
StreamDataBlock block2 = new StreamDataBlock(OBJECT_0, new DataBlockIndex(1, 0, 20, 20, 15, 50 * 1024 * 1024));
668+
669+
StreamDataBlock block3 = new StreamDataBlock(OBJECT_1, new DataBlockIndex(0, 15, 12, 12, 0, 20 * 1024 * 1024));
670+
StreamDataBlock block4 = new StreamDataBlock(OBJECT_1, new DataBlockIndex(1, 20, 25, 25, 20, 60 * 1024 * 1024));
671+
672+
StreamDataBlock block5 = new StreamDataBlock(OBJECT_2, new DataBlockIndex(0, 27, 13, 20, 0, 20 * 1024 * 1024));
673+
StreamDataBlock block6 = new StreamDataBlock(OBJECT_2, new DataBlockIndex(3, 0, 30, 30, 20, 30 * 1024 * 1024));
674+
return Map.of(
675+
OBJECT_0, List.of(
676+
block1,
677+
block2
678+
),
679+
OBJECT_1, List.of(
680+
block3,
681+
block4
682+
),
683+
OBJECT_2, List.of(
684+
block5,
685+
block6
686+
)
687+
);
688+
}
689+
621690
@Test
622691
public void testCompactWithLimit() {
623692
when(config.streamSetObjectCompactionStreamSplitSize()).thenReturn(70L);

0 commit comments

Comments
 (0)