diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 000000000..59aac7e10 Binary files /dev/null and b/.DS_Store differ diff --git a/api/src/main/java/io/minio/MinioAsyncClient.java b/api/src/main/java/io/minio/MinioAsyncClient.java index 715559b09..677708d06 100644 --- a/api/src/main/java/io/minio/MinioAsyncClient.java +++ b/api/src/main/java/io/minio/MinioAsyncClient.java @@ -76,6 +76,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; import java.util.regex.Matcher; import okhttp3.HttpUrl; import okhttp3.OkHttpClient; @@ -140,7 +142,8 @@ private MinioAsyncClient( String region, Provider provider, OkHttpClient httpClient, - boolean closeHttpClient) { + boolean closeHttpClient, + ExecutorService executorService) { super( baseUrl, awsS3Prefix, @@ -150,7 +153,8 @@ private MinioAsyncClient( region, provider, httpClient, - closeHttpClient); + closeHttpClient, + executorService); } protected MinioAsyncClient(MinioAsyncClient client) { @@ -453,7 +457,7 @@ public CompletableFuture copyObject(CopyObjectArgs args) args.validateSse(this.baseUrl); return CompletableFuture.supplyAsync( - () -> args.source().offset() != null && args.source().length() != null) + () -> args.source().offset() != null && args.source().length() != null, executorService) .thenCompose( condition -> { if (condition) { @@ -668,7 +672,7 @@ public CompletableFuture composeObject(ComposeObjectArgs ar Multimap headers = newMultimap(args.extraHeaders()); headers.putAll(args.genHeaders()); return headers; - }) + }, executorService) .thenCompose( headers -> { try { @@ -706,7 +710,7 @@ public CompletableFuture composeObject(ComposeObjectArgs ar CompletableFuture.supplyAsync( () -> { return new Part[partCount[0]]; - }); + }, executorService); for (ComposeSource src : sources) { long size = 0; try { @@ -3156,7 +3160,7 @@ public CompletableFuture uploadSnowballObjects( } } return baos; - }) + }, executorService) .thenCompose( baos -> { Multimap headers = newMultimap(args.extraHeaders()); @@ -3225,6 +3229,7 @@ public static final class Builder { private Provider provider; private OkHttpClient httpClient; private boolean closeHttpClient; + private ExecutorService executorService = ForkJoinPool.commonPool(); private void setAwsInfo(String host, boolean https) { this.awsS3Prefix = null; @@ -3338,6 +3343,11 @@ public Builder httpClient(OkHttpClient httpClient, boolean close) { return this; } + public Builder executorService(ExecutorService executorService) { + this.executorService = executorService; + return this; + } + public MinioAsyncClient build() { HttpUtils.validateNotNull(this.baseUrl, "endpoint"); @@ -3365,7 +3375,8 @@ public MinioAsyncClient build() { region, provider, httpClient, - closeHttpClient); + closeHttpClient, + executorService()); } } } diff --git a/api/src/main/java/io/minio/S3Base.java b/api/src/main/java/io/minio/S3Base.java index 5580d5745..b86eb563b 100644 --- a/api/src/main/java/io/minio/S3Base.java +++ b/api/src/main/java/io/minio/S3Base.java @@ -84,6 +84,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -137,6 +138,7 @@ public abstract class S3Base implements AutoCloseable { protected Provider provider; protected OkHttpClient httpClient; protected boolean closeHttpClient; + protected final ExecutorService executorService; /** @deprecated This method is no longer supported. */ @Deprecated @@ -161,6 +163,20 @@ protected S3Base( false); } + protected S3Base( + HttpUrl baseUrl, + String awsS3Prefix, + String awsDomainSuffix, + boolean awsDualstack, + boolean useVirtualStyle, + String region, + Provider provider, + OkHttpClient httpClient, + boolean closeHttpClient) { + this(baseUrl, awsS3Prefix, awsDomainSuffix, awsDualstack, useVirtualStyle, region, provider, httpClient, closeHttpClient); + this.executorService = ForkJoinPool.commonPool(); + } + protected S3Base( HttpUrl baseUrl, String awsS3Prefix, @@ -170,7 +186,8 @@ protected S3Base( String region, Provider provider, OkHttpClient httpClient, - boolean closeHttpClient) { + boolean closeHttpClient, + ExecutorService executorService) { this.baseUrl = baseUrl; this.awsS3Prefix = awsS3Prefix; this.awsDomainSuffix = awsDomainSuffix; @@ -180,6 +197,7 @@ protected S3Base( this.provider = provider; this.httpClient = httpClient; this.closeHttpClient = closeHttpClient; + this.executorService = executorService; } /** @deprecated This method is no longer supported. */ @@ -221,6 +239,7 @@ protected S3Base(S3Base client) { this.provider = client.provider; this.httpClient = client.httpClient; this.closeHttpClient = client.closeHttpClient; + this.executorService = client.executorService; } /** Check whether argument is valid or not. */ @@ -1171,7 +1190,8 @@ protected CompletableFuture calculatePartCountAsync(List long[] objectSize = {0}; int index = 0; - CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> 0); + CompletableFuture completableFuture = + CompletableFuture.supplyAsync(() -> 0, executorService); for (ComposeSource src : sources) { index++; final int i = index; @@ -2891,7 +2911,8 @@ private CompletableFuture putMultipartObjectAsync( throw new CompletionException(throwable); } return response; - }); + }, + executorService); } /** @@ -2937,7 +2958,8 @@ protected CompletableFuture putObjectAsync( } catch (NoSuchAlgorithmException | IOException e) { throw new CompletionException(e); } - }) + }, + executorService) .thenCompose( partSource -> { try {