Skip to content

Add integration test for concurrent multipart uploads on Azure #128503

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits (source and target branch names omitted in this capture)
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,21 @@
import org.elasticsearch.rest.RestStatus;
import org.junit.ClassRule;

import java.io.ByteArrayInputStream;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.net.HttpURLConnection;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.util.Collection;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;

import static org.elasticsearch.common.io.Streams.limitStream;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.hamcrest.Matchers.blankOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;

/**
Expand All @@ -58,6 +66,27 @@ public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyReposi

private static final String AZURE_ACCOUNT = System.getProperty("test.azure.account");

/**
 * Test variant of {@link AzureRepositoryPlugin} whose {@link AzureStorageService}
 * reports a deliberately small upload block size, forcing blobs larger than a few
 * hundred kilobytes to be uploaded as multiple blocks (multipart upload).
 */
public static class TestAzureRepositoryPlugin extends AzureRepositoryPlugin {

    public TestAzureRepositoryPlugin(Settings settings) {
        super(settings);
    }

    @Override
    AzureStorageService createAzureStorageService(Settings settings, AzureClientProvider azureClientProvider) {
        // Randomize the block size per service instance: 1..15 multiples of 64kb,
        // i.e. always well below the default and below the 1mb large-blob threshold.
        final long smallBlockSize = randomIntBetween(1, 15) * ByteSizeValue.ofKb(64L).getBytes();
        return new AzureStorageService(settings, azureClientProvider) {
            @Override
            long getUploadBlockSize() {
                return smallBlockSize;
            }
        };
    }
}

@ClassRule
public static AzureHttpFixture fixture = new AzureHttpFixture(
USE_FIXTURE ? AzureHttpFixture.Protocol.HTTP : AzureHttpFixture.Protocol.NONE,
Expand All @@ -71,7 +100,7 @@ public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyReposi

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(AzureRepositoryPlugin.class);
return pluginList(TestAzureRepositoryPlugin.class);
}

@Override
Expand Down Expand Up @@ -158,19 +187,67 @@ private void ensureSasTokenPermissions() {

// NOTE(review): this block is a unified-diff capture with the +/- markers stripped —
// it interleaves removed (pre-change) lines with their replacements and is NOT
// compilable as-is. Comments below mark which lines belong to which version;
// confirm against the actual PR diff before acting on this text.
public void testMultiBlockUpload() throws Exception {
final BlobStoreRepository repo = getRepository();
// REMOVED (old version): asserted a fixed 1mb large-blob threshold.
assertThat(
asInstanceOf(AzureBlobStore.class, repo.blobStore()).getLargeBlobThresholdInBytes(),
equalTo(ByteSizeUnit.MB.toBytes(1L))
);
// ADDED (new version): the randomized block size from TestAzureRepositoryPlugin
// must be below 1mb so the blob written below spans multiple blocks.
assertThat(asInstanceOf(AzureBlobStore.class, repo.blobStore()).getUploadBlockSize(), lessThan(ByteSizeUnit.MB.toBytes(1L)));

// The configured threshold for this test suite is 1mb
// REMOVED (old version): fixed 2mb int-sized blob.
final int blobSize = ByteSizeUnit.MB.toIntBytes(2);
// ADDED (new version): randomized 2mb..4mb long-sized blob.
final long blobSize = randomLongBetween(ByteSizeUnit.MB.toBytes(2), ByteSizeUnit.MB.toBytes(4));
final int bufferSize = 8192;

// ADDED: stage random blob content in a temp file, computing its CRC32 so the
// readback below can verify content integrity, not just byte count.
final var file = createTempFile();
final long expectedChecksum;
try (var output = new CheckedOutputStream(new BufferedOutputStream(Files.newOutputStream(file)), new CRC32())) {
long remaining = blobSize;
while (remaining > 0L) {
final var buffer = randomByteArrayOfLength(Math.toIntExact(Math.min(bufferSize, remaining)));
output.write(buffer);
remaining -= buffer.length;
}
output.flush();
expectedChecksum = output.getChecksum().getValue();
}

PlainActionFuture<Void> future = new PlainActionFuture<>();
repo.threadPool().generic().execute(ActionRunnable.run(future, () -> {
final BlobContainer blobContainer = repo.blobStore().blobContainer(repo.basePath().add("large_write"));
// REMOVED (old version): single in-memory write followed by unconditional delete.
blobContainer.writeBlob(
randomPurpose(),
UUIDs.base64UUID(),
new ByteArrayInputStream(randomByteArrayOfLength(blobSize)),
blobSize,
false
);
blobContainer.delete(randomPurpose());
// ADDED (new version): randomly exercise either the streaming writeBlob path or
// the concurrent multipart writeBlobAtomic path, then verify the readback.
try {
final var blobName = UUIDs.base64UUID();
if (randomBoolean()) {
try (var input = new BufferedInputStream(Files.newInputStream(file))) {
blobContainer.writeBlob(randomPurpose(), blobName, input, blobSize, false);
}
} else {
assertThat(blobContainer.supportsConcurrentMultipartUploads(), equalTo(true));
// Part supplier: each part re-opens the file and seeks to its own offset so
// parts can be read concurrently and independently.
blobContainer.writeBlobAtomic(randomPurpose(), blobName, blobSize, (offset, length) -> {
var channel = Files.newByteChannel(file);
if (offset > 0L) {
if (channel.size() <= offset) {
throw new AssertionError();
}
channel.position(offset);
}
assert channel.position() == offset;
return new BufferedInputStream(limitStream(Channels.newInputStream(channel), length));
}, false);
}

// Read the blob back and verify both its length and its CRC32 checksum.
long bytesCount = 0L;
try (var input = new CheckedInputStream(blobContainer.readBlob(OperationPurpose.INDICES, blobName), new CRC32())) {
var buffer = new byte[bufferSize];
int bytesRead;
while ((bytesRead = input.read(buffer)) != -1) {
bytesCount += bytesRead;
}

assertThat(bytesCount, equalTo(blobSize));
assertThat(input.getChecksum().getValue(), equalTo(expectedChecksum));
}
} finally {
// Clean up the container whether or not the verification above succeeded.
blobContainer.delete(randomPurpose());
}
}));
future.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,12 +504,10 @@ void writeBlobAtomic(
.collect(Collectors.toList())
.flatMap(blockIds -> {
logger.debug("{}: all {} parts uploaded, now committing", blobName, multiParts.size());
var response = asyncClient.commitBlockList(
return asyncClient.commitBlockList(
multiParts.stream().map(MultiPart::blockId).toList(),
failIfAlreadyExists == false
);
logger.debug("{}: all {} parts committed", blobName, multiParts.size());
return response;
).doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size()));
})
.block();
}
Expand Down