diff --git a/pom.xml b/pom.xml
index b067e15..b4ae966 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,7 @@
ch.qos.logback
logback-classic
- 1.3.5
+ 1.3.12
test
@@ -96,7 +96,7 @@
com.github.tomakehurst
wiremock-jre8
- 2.35.0
+ 2.35.1
test
diff --git a/src/main/java/io/github/jpmorganchase/fusion/FusionConfiguration.java b/src/main/java/io/github/jpmorganchase/fusion/FusionConfiguration.java
index 635d33e..b596a12 100644
--- a/src/main/java/io/github/jpmorganchase/fusion/FusionConfiguration.java
+++ b/src/main/java/io/github/jpmorganchase/fusion/FusionConfiguration.java
@@ -51,6 +51,13 @@ public class FusionConfiguration {
@Builder.Default
int uploadPartSize = 8;
+ /**
+ * Max in flux data to be read at a given time. Defaults to 500MB.
+ * If a value such as 1gb is required, then client would set this value to 1000;
+ */
+ @Builder.Default
+ long maxInFluxDataSize = 500;
+
/**
* Size of Thread-Pool to be used for uploading chunks of a multipart file
* Defaults to number of available processors.
diff --git a/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java b/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java
index 501ce7c..8b84457 100644
--- a/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java
+++ b/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java
@@ -24,6 +24,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CompletableFuture;
@@ -32,11 +33,16 @@
import java.util.concurrent.Executors;
import lombok.Builder;
import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Builder
@Getter
public class FusionAPIUploadOperations implements APIUploadOperations {
+ private static final Logger logger =
+ LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
private static final String UPLOAD_FAILED_EXCEPTION_MSG =
"Exception encountered while attempting to upload part, please try again";
@@ -76,6 +82,12 @@ public class FusionAPIUploadOperations implements APIUploadOperations {
*/
int uploadThreadPoolSize;
+ /**
+ * Max size of in-flux data that can be read at a given time.
+ * See {@link FusionConfiguration} for default values.
+ */
+ long maxInFluxDataSize;
+
/**
* Call the API upload endpoint to load a distribution
*
@@ -206,10 +218,12 @@ protected MultipartTransferContext callAPIToInitiateMultiPartUpload(UploadReques
protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext mtx, UploadRequest ur) {
int chunkSize = uploadPartSize * (1024 * 1024);
+ long maxInFluxBytes = maxInFluxDataSize * (1024L * 1024L);
byte[] buffer = new byte[chunkSize];
int partCnt = 1;
int totalBytes = 0;
+ int inFluxBytes = 0;
ExecutorService executor = Executors.newFixedThreadPool(uploadThreadPoolSize);
try {
@@ -217,10 +231,18 @@ protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext
int bytesRead;
while ((bytesRead = ur.getData().read(buffer)) != -1) {
+
+ logger.debug(
+ "Creating upload task for part number {}, bytes read for this part {}", partCnt, bytesRead);
+
final int currentPartCnt = partCnt;
final int currentBytesRead = bytesRead;
byte[] taskBuffer = Arrays.copyOf(buffer, bytesRead);
+ if (inFluxBytes > maxInFluxBytes) {
+ inFluxBytes = easeDataPressure(futures);
+ }
+
futures.add(CompletableFuture.runAsync(
() -> mtx.partUploaded(
callAPIToUploadPart(mtx, ur, taskBuffer, currentBytesRead, currentPartCnt)),
@@ -228,6 +250,7 @@ protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext
partCnt++;
totalBytes += bytesRead;
+ inFluxBytes += bytesRead;
}
for (CompletableFuture future : futures) {
@@ -243,6 +266,18 @@ protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext
return mtx.transferred(chunkSize, totalBytes, partCnt);
}
+ private int easeDataPressure(List> futures)
+ throws InterruptedException, ExecutionException {
+
+ logger.debug("Reached max in-flux bytes - easing pressure");
+ for (CompletableFuture future : futures) {
+ future.get();
+ }
+ logger.debug("Max in-flux bytes handled - pressure eased");
+ futures.clear();
+ return 0;
+ }
+
protected UploadedPartContext callAPIToUploadPart(
MultipartTransferContext mtx, UploadRequest ur, byte[] part, int read, int partNo) {
@@ -348,6 +383,7 @@ public static class FusionAPIUploadOperationsBuilder {
int singlePartUploadSizeLimit;
int uploadPartSize;
int uploadThreadPoolSize;
+ long maxInFluxDataSize;
public FusionAPIUploadOperationsBuilder configuration(FusionConfiguration configuration) {
this.configuration = configuration;
@@ -371,6 +407,12 @@ private FusionAPIUploadOperationsBuilder uploadThreadPoolSize(int uploadThreadPo
this.uploadThreadPoolSize = uploadThreadPoolSize;
return this;
}
+
+ @SuppressWarnings("PIT")
+ private FusionAPIUploadOperationsBuilder maxInFluxDataSize(long maxInFluxDataSize) {
+ this.maxInFluxDataSize = maxInFluxDataSize;
+ return this;
+ }
}
private static class CustomFusionAPIUploadOperationsBuilder extends FusionAPIUploadOperationsBuilder {
@@ -379,6 +421,7 @@ public FusionAPIUploadOperations build() {
this.singlePartUploadSizeLimit = configuration.getSinglePartUploadSizeLimit();
this.uploadPartSize = configuration.getUploadPartSize();
this.uploadThreadPoolSize = configuration.getUploadThreadPoolSize();
+ this.maxInFluxDataSize = configuration.getMaxInFluxDataSize();
if (Objects.isNull(digestProducer)) {
this.digestProducer = AlgoSpecificDigestProducer.builder()