From 689e2cabbb35197acd6a43828752e917a7ce73aa Mon Sep 17 00:00:00 2001
From: tlrx
Date: Tue, 27 May 2025 13:19:30 +0200
Subject: [PATCH 1/2] Add integration test for concurrent multipart uploads on Azure

Relates ES-11815
---
Review notes (this section is not part of the commit message):

* TestAzureRepositoryPlugin overrides getUploadBlockSize() with a random
  multiple of 64kb (1..15 blocks, i.e. always below 1mb), so the 2-4mb blob
  written by testMultiBlockUpload is split into multiple blocks.
* testMultiBlockUpload writes random data to a temp file while computing a
  CRC32, uploads it through either writeBlob or writeBlobAtomic, then reads
  the blob back and compares both length and checksum.
* NOTE(review): the (offset, length) supplier passed to writeBlobAtomic opens
  a channel with Files.newByteChannel(file). If channel.size() or
  channel.position(offset) throws, or the AssertionError branch is taken, the
  channel is never closed. The returned stream is presumably closed by the
  consumer - confirm.
* NOTE(review): `assert channel.position() == offset` is only evaluated when
  assertions are enabled (-ea).
* AzureBlobStore: the "parts committed" debug message is moved into
  doOnSuccess(...) on the publisher returned by commitBlockList; previously it
  was logged as soon as that publisher was created.

 .../AzureStorageCleanupThirdPartyTests.java   | 102 ++++++++++++++++--
 .../repositories/azure/AzureBlobStore.java    |   6 +-
 2 files changed, 93 insertions(+), 15 deletions(-)

diff --git a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java
index fb4de42e57e4d..ae425d6d06036 100644
--- a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java
+++ b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java
@@ -39,13 +39,21 @@
 import org.elasticsearch.rest.RestStatus;
 import org.junit.ClassRule;
 
-import java.io.ByteArrayInputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.net.HttpURLConnection;
+import java.nio.channels.Channels;
+import java.nio.file.Files;
 import java.util.Collection;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
 
+import static org.elasticsearch.common.io.Streams.limitStream;
 import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
 import static org.hamcrest.Matchers.blankOrNullString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
 
 /**
@@ -58,6 +66,27 @@ public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyReposi
 
     private static final String AZURE_ACCOUNT = System.getProperty("test.azure.account");
 
+    /**
+     * AzureRepositoryPlugin that sets a low value for getUploadBlockSize()
+     */
+    public static class TestAzureRepositoryPlugin extends AzureRepositoryPlugin {
+
+        public TestAzureRepositoryPlugin(Settings settings) {
+            super(settings);
+        }
+
+        @Override
+        AzureStorageService createAzureStorageService(Settings settings, AzureClientProvider azureClientProvider) {
+            final long blockSize = ByteSizeValue.ofKb(64L).getBytes() * randomIntBetween(1, 15);
+            return new AzureStorageService(settings, azureClientProvider) {
+                @Override
+                long getUploadBlockSize() {
+                    return blockSize;
+                }
+            };
+        }
+    }
+
     @ClassRule
     public static AzureHttpFixture fixture = new AzureHttpFixture(
         USE_FIXTURE ? AzureHttpFixture.Protocol.HTTP : AzureHttpFixture.Protocol.NONE,
@@ -71,7 +100,7 @@ public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyReposi
 
     @Override
     protected Collection<Class<? extends Plugin>> getPlugins() {
-        return pluginList(AzureRepositoryPlugin.class);
+        return pluginList(TestAzureRepositoryPlugin.class);
     }
 
     @Override
@@ -158,19 +187,70 @@ private void ensureSasTokenPermissions() {
 
     public void testMultiBlockUpload() throws Exception {
         final BlobStoreRepository repo = getRepository();
+        assertThat(
+            asInstanceOf(AzureBlobStore.class, repo.blobStore()).getLargeBlobThresholdInBytes(),
+            equalTo(ByteSizeUnit.MB.toBytes(1L))
+        );
+        assertThat(
+            asInstanceOf(AzureBlobStore.class, repo.blobStore()).getUploadBlockSize(),
+            lessThan(ByteSizeUnit.MB.toBytes(1L))
+        );
+
         // The configured threshold for this test suite is 1mb
-        final int blobSize = ByteSizeUnit.MB.toIntBytes(2);
+        final long blobSize = randomLongBetween(ByteSizeUnit.MB.toBytes(2), ByteSizeUnit.MB.toBytes(4));
+        final int bufferSize = 8192;
+
+        final var file = createTempFile();
+        final long expectedChecksum;
+        try (var output = new CheckedOutputStream(new BufferedOutputStream(Files.newOutputStream(file)), new CRC32())) {
+            long remaining = blobSize;
+            while (remaining > 0L) {
+                final var buffer = randomByteArrayOfLength(Math.toIntExact(Math.min(bufferSize, remaining)));
+                output.write(buffer);
+                remaining -= buffer.length;
+            }
+            output.flush();
+            expectedChecksum = output.getChecksum().getValue();
+        }
+
         PlainActionFuture<Void> future = new PlainActionFuture<>();
         repo.threadPool().generic().execute(ActionRunnable.run(future, () -> {
             final BlobContainer blobContainer = repo.blobStore().blobContainer(repo.basePath().add("large_write"));
-            blobContainer.writeBlob(
-                randomPurpose(),
-                UUIDs.base64UUID(),
-                new ByteArrayInputStream(randomByteArrayOfLength(blobSize)),
-                blobSize,
-                false
-            );
-            blobContainer.delete(randomPurpose());
+            try {
+                final var blobName = UUIDs.base64UUID();
+                if (randomBoolean()) {
+                    try (var input = new BufferedInputStream(Files.newInputStream(file))) {
+                        blobContainer.writeBlob(randomPurpose(), blobName, input, blobSize, false);
+                    }
+                } else {
+                    assertThat(blobContainer.supportsConcurrentMultipartUploads(), equalTo(true));
+                    blobContainer.writeBlobAtomic(randomPurpose(), blobName, blobSize, (offset, length) -> {
+                        var channel = Files.newByteChannel(file);
+                        if (offset > 0L) {
+                            if (channel.size() <= offset) {
+                                throw new AssertionError();
+                            }
+                            channel.position(offset);
+                        }
+                        assert channel.position() == offset;
+                        return new BufferedInputStream(limitStream(Channels.newInputStream(channel), length));
+                    }, false);
+                }
+
+                long bytesCount = 0L;
+                try (var input = new CheckedInputStream(blobContainer.readBlob(OperationPurpose.INDICES, blobName), new CRC32())) {
+                    var buffer = new byte[bufferSize];
+                    int bytesRead;
+                    while ((bytesRead = input.read(buffer)) != -1) {
+                        bytesCount += bytesRead;
+                    }
+
+                    assertThat(bytesCount, equalTo(blobSize));
+                    assertThat(input.getChecksum().getValue(), equalTo(expectedChecksum));
+                }
+            } finally {
+                blobContainer.delete(randomPurpose());
+            }
         }));
         future.get();
     }
diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
index 430459f47ddfb..64c0478c529c3 100644
--- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
+++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
@@ -504,12 +504,10 @@ void writeBlobAtomic(
                 .collect(Collectors.toList())
                 .flatMap(blockIds -> {
                     logger.debug("{}: all {} parts uploaded, now committing", blobName, multiParts.size());
-                    var response = asyncClient.commitBlockList(
+                    return asyncClient.commitBlockList(
                         multiParts.stream().map(MultiPart::blockId).toList(),
                         failIfAlreadyExists == false
-                    );
-                    logger.debug("{}: all {} parts committed", blobName, multiParts.size());
-                    return response;
+                    ).doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size()));
                 })
                 .block();
         }

From b2f4ffaa4d21060a78d823bb35f63e1518541cbd Mon Sep 17 00:00:00 2001
From: elasticsearchmachine
Date: Tue, 27 May 2025 11:29:10 +0000
Subject: [PATCH 2/2] [CI] Auto commit changes from spotless

---
 .../azure/AzureStorageCleanupThirdPartyTests.java            | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java
index ae425d6d06036..2f63fb36613ee 100644
--- a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java
+++ b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java
@@ -191,10 +191,7 @@ public void testMultiBlockUpload() throws Exception {
             asInstanceOf(AzureBlobStore.class, repo.blobStore()).getLargeBlobThresholdInBytes(),
             equalTo(ByteSizeUnit.MB.toBytes(1L))
         );
-        assertThat(
-            asInstanceOf(AzureBlobStore.class, repo.blobStore()).getUploadBlockSize(),
-            lessThan(ByteSizeUnit.MB.toBytes(1L))
-        );
+        assertThat(asInstanceOf(AzureBlobStore.class, repo.blobStore()).getUploadBlockSize(), lessThan(ByteSizeUnit.MB.toBytes(1L)));
 
         // The configured threshold for this test suite is 1mb
         final long blobSize = randomLongBetween(ByteSizeUnit.MB.toBytes(2), ByteSizeUnit.MB.toBytes(4));