diff --git a/pom.xml b/pom.xml
index b067e15..b4ae966 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,7 @@
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
-            <version>1.3.5</version>
+            <version>1.3.12</version>
             <scope>test</scope>
@@ -96,7 +96,7 @@
             <groupId>com.github.tomakehurst</groupId>
             <artifactId>wiremock-jre8</artifactId>
-            <version>2.35.0</version>
+            <version>2.35.1</version>
             <scope>test</scope>
diff --git a/src/main/java/io/github/jpmorganchase/fusion/FusionConfiguration.java b/src/main/java/io/github/jpmorganchase/fusion/FusionConfiguration.java
index 635d33e..b596a12 100644
--- a/src/main/java/io/github/jpmorganchase/fusion/FusionConfiguration.java
+++ b/src/main/java/io/github/jpmorganchase/fusion/FusionConfiguration.java
@@ -51,6 +51,13 @@ public class FusionConfiguration {
     @Builder.Default
     int uploadPartSize = 8;
 
+    /**
+     * Maximum amount of in-flux data, in MB, that may be read at a given time. Defaults to 500 (500MB).
+     * If a value such as 1GB is required, the client should set this value to 1000.
+     */
+    @Builder.Default
+    long maxInFluxDataSize = 500;
+
     /**
      * Size of Thread-Pool to be used for uploading chunks of a multipart file
      * Defaults to number of available processors.
diff --git a/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java b/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java
index 501ce7c..8b84457 100644
--- a/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java
+++ b/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java
@@ -24,6 +24,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
@@ -32,11 +33,16 @@
 import java.util.concurrent.Executors;
 
 import lombok.Builder;
 import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Builder
 @Getter
 public class FusionAPIUploadOperations implements APIUploadOperations {
 
+    private static final Logger logger =
+            LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
     private static final String UPLOAD_FAILED_EXCEPTION_MSG =
             "Exception encountered while attempting to upload part, please try again";
@@ -76,6 +82,12 @@ public class FusionAPIUploadOperations implements APIUploadOperations {
      */
     int uploadThreadPoolSize;
 
+    /**
+     * Max size of in-flux data, in MB, that can be read at a given time.
+     * See {@link FusionConfiguration} for the default value.
+     */
+    long maxInFluxDataSize;
+
     /**
      * Call the API upload endpoint to load a distribution
      *
@@ -206,10 +218,12 @@ protected MultipartTransferContext callAPIToInitiateMultiPartUpload(UploadReques
     protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext mtx, UploadRequest ur) {
 
         int chunkSize = uploadPartSize * (1024 * 1024);
+        long maxInFluxBytes = maxInFluxDataSize * (1024L * 1024L);
         byte[] buffer = new byte[chunkSize];
         int partCnt = 1;
         int totalBytes = 0;
+        long inFluxBytes = 0;
 
         ExecutorService executor = Executors.newFixedThreadPool(uploadThreadPoolSize);
         try {
@@ -217,10 +231,18 @@ protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext
             int bytesRead;
             while ((bytesRead = ur.getData().read(buffer)) != -1) {
+
+                logger.debug(
+                        "Creating upload task for part number {}, bytes read for this part {}", partCnt, bytesRead);
+
                 final int currentPartCnt = partCnt;
                 final int currentBytesRead = bytesRead;
                 byte[] taskBuffer = Arrays.copyOf(buffer, bytesRead);
 
+                if (inFluxBytes > maxInFluxBytes) {
+                    inFluxBytes = easeDataPressure(futures);
+                }
+
                 futures.add(CompletableFuture.runAsync(
                         () -> mtx.partUploaded(
                                 callAPIToUploadPart(mtx, ur, taskBuffer, currentBytesRead, currentPartCnt)),
@@ -228,6 +250,7 @@ protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext
                 partCnt++;
                 totalBytes += bytesRead;
+                inFluxBytes += bytesRead;
             }
 
             for (CompletableFuture<Void> future : futures) {
@@ -243,6 +266,18 @@ protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext
         return mtx.transferred(chunkSize, totalBytes, partCnt);
     }
 
+    private long easeDataPressure(List<CompletableFuture<Void>> futures)
+            throws InterruptedException, ExecutionException {
+
+        logger.debug("Reached max in-flux bytes - easing pressure");
+        for (CompletableFuture<Void> future : futures) {
+            future.get();
+        }
+        logger.debug("Max in-flux bytes handled - pressure eased");
+        futures.clear();
+        return 0;
+    }
+
     protected UploadedPartContext callAPIToUploadPart(
             MultipartTransferContext mtx, UploadRequest ur, byte[] part, int read, int partNo) {
 
@@ -348,6 +383,7 @@ public static class FusionAPIUploadOperationsBuilder {
         int singlePartUploadSizeLimit;
         int uploadPartSize;
         int uploadThreadPoolSize;
+        long maxInFluxDataSize;
 
         public FusionAPIUploadOperationsBuilder configuration(FusionConfiguration configuration) {
             this.configuration = configuration;
@@ -371,6 +407,12 @@ private FusionAPIUploadOperationsBuilder uploadThreadPoolSize(int uploadThreadPo
             this.uploadThreadPoolSize = uploadThreadPoolSize;
             return this;
         }
+
+        @SuppressWarnings("PIT")
+        private FusionAPIUploadOperationsBuilder maxInFluxDataSize(long maxInFluxDataSize) {
+            this.maxInFluxDataSize = maxInFluxDataSize;
+            return this;
+        }
     }
 
     private static class CustomFusionAPIUploadOperationsBuilder extends FusionAPIUploadOperationsBuilder {
@@ -379,6 +421,7 @@ public FusionAPIUploadOperations build() {
             this.singlePartUploadSizeLimit = configuration.getSinglePartUploadSizeLimit();
             this.uploadPartSize = configuration.getUploadPartSize();
             this.uploadThreadPoolSize = configuration.getUploadThreadPoolSize();
+            this.maxInFluxDataSize = configuration.getMaxInFluxDataSize();
             if (Objects.isNull(digestProducer)) {
                 this.digestProducer = AlgoSpecificDigestProducer.builder()
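Note (outside the diff): a minimal sketch of how a client might opt into the new setting via the Lombok-generated builder on FusionConfiguration shown above. The value is interpreted in MB, so 1000 mirrors the 1GB example in the Javadoc; the surrounding client wiring is an assumption, not part of this change.

    import io.github.jpmorganchase.fusion.FusionConfiguration;

    // Hypothetical client-side configuration: caps in-flux upload data at roughly 1GB (value in MB).
    // Once the cap is exceeded, parts already handed to the upload executor are drained before
    // any further data is read from the stream.
    FusionConfiguration configuration = FusionConfiguration.builder()
            .maxInFluxDataSize(1000)
            .build();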