fix(s3stream): fix compaction block on upload exception #2265

Merged · 1 commit · Jan 10, 2025
@@ -32,6 +32,7 @@
 import com.automq.stream.s3.objects.StreamObject;
 import com.automq.stream.s3.operator.ObjectStorage;
 import com.automq.stream.s3.streams.StreamManager;
+import com.automq.stream.utils.FutureUtil;
 import com.automq.stream.utils.LogContext;
 import com.automq.stream.utils.ThreadUtils;
 import com.automq.stream.utils.Threads;
@@ -755,28 +756,29 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List<Compactio
                 if (uploadException != null) {
                     logger.error("Error while uploading compaction objects", uploadException);
                 }
-                uploader.forceUploadStreamSetObject().whenComplete((vv, forceUploadException) -> {
-                    if (forceUploadException != null) {
-                        logger.error("Error while force uploading stream set object", uploadException);
-                    }
-                    if (uploadException != null || forceUploadException != null) {
-                        uploader.release().whenComplete((vvv, releaseException) -> {
-                            if (releaseException != null) {
-                                logger.error("Unexpected exception while release uploader");
-                            }
-                            for (CompactedObject compactedObject : compactionPlan.compactedObjects()) {
-                                compactedObject.streamDataBlocks().forEach(StreamDataBlock::release);
-                            }
-                            if (uploadException != null) {
-                                compactionCf.completeExceptionally(new CompletionException("Uploading failed", uploadException));
-                            } else {
-                                compactionCf.completeExceptionally(new CompletionException("Force uploading sso failed", forceUploadException));
-                            }
-                        });
-                    } else {
-                        compactionCf.complete(null);
-                    }
-                });
+                FutureUtil.exec(uploader::forceUploadStreamSetObject, logger, "force upload sso")
+                    .whenComplete((vv, forceUploadException) -> {
+                        if (forceUploadException != null) {
+                            logger.error("Error while force uploading stream set object", forceUploadException);
+                        }
+                        if (uploadException != null || forceUploadException != null) {
+                            FutureUtil.exec(uploader::release, logger, "release uploader").whenComplete((vvv, releaseException) -> {
+                                if (releaseException != null) {
+                                    logger.error("Unexpected exception while releasing uploader", releaseException);
+                                }
+                                for (CompactedObject compactedObject : compactionPlan.compactedObjects()) {
+                                    compactedObject.streamDataBlocks().forEach(StreamDataBlock::release);
+                                }
+                                if (uploadException != null) {
+                                    compactionCf.completeExceptionally(new CompletionException("Uploading failed", uploadException));
+                                } else {
+                                    compactionCf.completeExceptionally(new CompletionException("Force uploading sso failed", forceUploadException));
+                                }
+                            });
+                        } else {
+                            compactionCf.complete(null);
+                        }
+                    });
             });
         }
         try {
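Why the wrapper matters: in the replaced code, a synchronous throw from uploader.forceUploadStreamSetObject() (rather than a returned failed future) meant the whenComplete callback was never registered, so compactionCf was never completed and compaction blocked indefinitely; the same applied to uploader.release(). FutureUtil.exec converts a synchronous throw into a failed future so the completion chain always runs. A minimal sketch of that wrapper pattern, assuming FutureUtil.exec in com.automq.stream.utils behaves roughly like this (the real implementation may differ):

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.slf4j.Logger;

final class FutureUtilSketch {
    // Run the supplier; if it throws synchronously, return a failed future
    // instead of propagating the throw, so downstream whenComplete callbacks
    // still fire and nothing waits on a future that never completes.
    static <T> CompletableFuture<T> exec(Supplier<CompletableFuture<T>> run, Logger logger, String name) {
        try {
            return run.get();
        } catch (Throwable t) {
            logger.error("{} failed", name, t);
            return CompletableFuture.failedFuture(t);
        }
    }
}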
@@ -83,10 +83,14 @@ public CompletableFuture<Void> forceUpload() {
     private void uploadWaitingList() {
         CompositeByteBuf buf = groupWaitingBlocks();
         List<StreamDataBlock> blocks = new LinkedList<>(waitingUploadBlocks);
-        writer.write(buf).thenAccept(v -> {
+        writer.write(buf).whenComplete((v, ex) -> {
             for (StreamDataBlock block : blocks) {
                 waitingUploadBlockCfs.computeIfPresent(block, (k, cf) -> {
-                    cf.complete(null);
+                    if (ex != null) {
+                        cf.completeExceptionally(ex);
+                    } else {
+                        cf.complete(null);
+                    }
                     return null;
                 });
             }
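This hunk fixes the same class of hang one level down: thenAccept callbacks run only on success, so a failed writer.write(buf) left every future in waitingUploadBlockCfs pending forever, while whenComplete runs on both paths and can propagate the failure. A self-contained JDK demo of the difference (the names here are illustrative):

import java.util.concurrent.CompletableFuture;

public class WhenCompleteDemo {
    public static void main(String[] args) {
        CompletableFuture<Void> waiter = new CompletableFuture<>();
        CompletableFuture<Void> write = CompletableFuture.failedFuture(new RuntimeException("upload failed"));

        // write.thenAccept(v -> waiter.complete(null)) would never run here,
        // leaving 'waiter' pending forever. whenComplete runs on both paths:
        write.whenComplete((v, ex) -> {
            if (ex != null) {
                waiter.completeExceptionally(ex);
            } else {
                waiter.complete(null);
            }
        });

        // Prints: done=true failed=true
        System.out.println("done=" + waiter.isDone() + " failed=" + waiter.isCompletedExceptionally());
    }
}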
@@ -57,8 +57,13 @@
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.http.HttpStatusCode;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;

 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -534,6 +539,70 @@ public void testCompactNoneExistObjects2() {
         }
     }

+    @Test
+    public void testCompactWithUploadException() {
+        when(config.streamSetObjectCompactionStreamSplitSize()).thenReturn(100 * 1024 * 1024L);
+        when(config.streamSetObjectCompactionCacheSize()).thenReturn(1024 * 1024 * 1024L);
+        when(config.objectPartSize()).thenReturn(100 * 1024 * 1024);
+        Map<Long, List<StreamDataBlock>> streamDataBlockMap = getStreamDataBlockMapLarge();
+        S3ObjectMetadata objectMetadata0 = new S3ObjectMetadata(OBJECT_0, 0, S3ObjectType.STREAM_SET);
+        S3ObjectMetadata objectMetadata1 = new S3ObjectMetadata(OBJECT_1, 0, S3ObjectType.STREAM_SET);
+        S3ObjectMetadata objectMetadata2 = new S3ObjectMetadata(OBJECT_2, 0, S3ObjectType.STREAM_SET);
+        List<S3ObjectMetadata> s3ObjectMetadata = List.of(objectMetadata0, objectMetadata1, objectMetadata2);
+        this.compactionAnalyzer = new CompactionAnalyzer(config.streamSetObjectCompactionCacheSize(), config.streamSetObjectCompactionStreamSplitSize(),
+            config.maxStreamNumPerStreamSetObject(), config.maxStreamObjectNumPerCommit());
+        List<CompactionPlan> compactionPlans = this.compactionAnalyzer.analyze(streamDataBlockMap, new HashSet<>());
+        CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest();
+
+        S3AsyncClient s3AsyncClient = Mockito.mock(S3AsyncClient.class);
+        doAnswer(invocation -> CompletableFuture.failedFuture(S3Exception.builder().statusCode(HttpStatusCode.NOT_FOUND).build())).when(s3AsyncClient).putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class));
+        doAnswer(invocation -> CompletableFuture.completedFuture(CreateMultipartUploadResponse.builder().uploadId("123").build())).when(s3AsyncClient).createMultipartUpload(any(CreateMultipartUploadRequest.class));
+        doAnswer(invocation -> CompletableFuture.failedFuture(S3Exception.builder().statusCode(HttpStatusCode.NOT_FOUND).build())).when(s3AsyncClient).uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class));
+
+        AwsObjectStorage objectStorage = Mockito.spy(new AwsObjectStorage(s3AsyncClient, ""));
+        doAnswer(invocation -> CompletableFuture.completedFuture(TestUtils.randomPooled(65 * 1024 * 1024))).when(objectStorage).rangeRead(any(), eq(objectMetadata0.key()), anyLong(), anyLong());
+        doAnswer(invocation -> CompletableFuture.completedFuture(TestUtils.randomPooled(80 * 1024 * 1024))).when(objectStorage).rangeRead(any(), eq(objectMetadata1.key()), anyLong(), anyLong());
+        doAnswer(invocation -> CompletableFuture.completedFuture(TestUtils.randomPooled(50 * 1024 * 1024))).when(objectStorage).rangeRead(any(), eq(objectMetadata2.key()), anyLong(), anyLong());
+
+        CompactionManager compactionManager = new CompactionManager(config, objectManager, streamManager, objectStorage);
+        Assertions.assertThrowsExactly(CompletionException.class,
+            () -> compactionManager.executeCompactionPlans(request, compactionPlans, s3ObjectMetadata));
+        for (CompactionPlan plan : compactionPlans) {
+            plan.streamDataBlocksMap().forEach((streamId, blocks) -> blocks.forEach(block -> {
+                if (block.getObjectId() != OBJECT_1) {
+                    block.getDataCf().thenAccept(data -> {
+                        Assertions.assertEquals(0, data.refCnt());
+                    }).join();
+                }
+            }));
+        }
+    }
+
+    private static Map<Long, List<StreamDataBlock>> getStreamDataBlockMapLarge() {
+        StreamDataBlock block1 = new StreamDataBlock(OBJECT_0, new DataBlockIndex(0, 0, 15, 15, 0, 15 * 1024 * 1024));
+        StreamDataBlock block2 = new StreamDataBlock(OBJECT_0, new DataBlockIndex(1, 0, 20, 20, 15, 50 * 1024 * 1024));
+
+        StreamDataBlock block3 = new StreamDataBlock(OBJECT_1, new DataBlockIndex(0, 15, 12, 12, 0, 20 * 1024 * 1024));
+        StreamDataBlock block4 = new StreamDataBlock(OBJECT_1, new DataBlockIndex(1, 20, 25, 25, 20, 60 * 1024 * 1024));
+
+        StreamDataBlock block5 = new StreamDataBlock(OBJECT_2, new DataBlockIndex(0, 27, 13, 20, 0, 20 * 1024 * 1024));
+        StreamDataBlock block6 = new StreamDataBlock(OBJECT_2, new DataBlockIndex(3, 0, 30, 30, 20, 30 * 1024 * 1024));
+        return Map.of(
+            OBJECT_0, List.of(
+                block1,
+                block2
+            ),
+            OBJECT_1, List.of(
+                block3,
+                block4
+            ),
+            OBJECT_2, List.of(
+                block5,
+                block6
+            )
+        );
+    }
+
     @Test
     public void testCompactWithLimit() {
         when(config.streamSetObjectCompactionStreamSplitSize()).thenReturn(70L);
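The new test forces every upload path to fail (putObject and uploadPart both return failed futures) and then checks two things: executeCompactionPlans surfaces a CompletionException instead of blocking, and the data buffers that were read are released (refCnt() == 0), so the failure path leaks nothing. For reference, a small standalone demo of the Netty reference counting that the refCnt assertion relies on:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class RefCntDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(16);  // freshly allocated: refCnt == 1
        System.out.println(buf.refCnt());   // 1
        buf.release();                      // drops the last reference
        System.out.println(buf.refCnt());   // 0 -- the buffer is freed
    }
}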