From 91ceb68496a596d4af8ec050e3107eec75f73872 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 8 Sep 2023 12:25:57 +0000 Subject: [PATCH 1/5] Bump com.github.tomakehurst:wiremock-jre8 from 2.35.0 to 2.35.1 Bumps [com.github.tomakehurst:wiremock-jre8](https://github.com/wiremock/wiremock) from 2.35.0 to 2.35.1. - [Release notes](https://github.com/wiremock/wiremock/releases) - [Commits](https://github.com/wiremock/wiremock/compare/2.35.0...2.35.1) --- updated-dependencies: - dependency-name: com.github.tomakehurst:wiremock-jre8 dependency-type: direct:development ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index a81774f..a32a198 100644 --- a/pom.xml +++ b/pom.xml @@ -96,7 +96,7 @@ com.github.tomakehurst wiremock-jre8 - 2.35.0 + 2.35.1 test From 48c639c9e9cd5b6f0b1154798911981b332beff1 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 29 Nov 2023 22:47:23 +0000 Subject: [PATCH 2/5] Bump ch.qos.logback:logback-classic from 1.3.5 to 1.3.12 Bumps [ch.qos.logback:logback-classic](https://github.com/qos-ch/logback) from 1.3.5 to 1.3.12. - [Commits](https://github.com/qos-ch/logback/compare/v_1.3.5...v_1.3.12) --- updated-dependencies: - dependency-name: ch.qos.logback:logback-classic dependency-type: direct:development ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index ad3ff0d..1a5cc6d 100644 --- a/pom.xml +++ b/pom.xml @@ -75,7 +75,7 @@ ch.qos.logback logback-classic - 1.3.5 + 1.3.12 test From 030cbe238ba569c28ae09217599acf09a10b6660 Mon Sep 17 00:00:00 2001 From: Ian Knight <128476114+knighto82@users.noreply.github.com> Date: Tue, 9 Apr 2024 13:43:30 +0100 Subject: [PATCH 3/5] Added configuration to manage max allowed in-flux data during uploads - protecting the heap --- .../fusion/FusionConfiguration.java | 7 ++++ .../operations/FusionAPIUploadOperations.java | 40 +++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/src/main/java/io/github/jpmorganchase/fusion/FusionConfiguration.java b/src/main/java/io/github/jpmorganchase/fusion/FusionConfiguration.java index 635d33e..b596a12 100644 --- a/src/main/java/io/github/jpmorganchase/fusion/FusionConfiguration.java +++ b/src/main/java/io/github/jpmorganchase/fusion/FusionConfiguration.java @@ -51,6 +51,13 @@ public class FusionConfiguration { @Builder.Default int uploadPartSize = 8; + /** + * Max in flux data to be read at a given time. Defaults to 500MB. + * If a value such as 1gb is required, then client would set this value to 1000; + */ + @Builder.Default + long maxInFluxDataSize = 500; + /** * Size of Thread-Pool to be used for uploading chunks of a multipart file * Defaults to number of available processors. diff --git a/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java b/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java index 501ce7c..20acba6 100644 --- a/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java +++ b/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java @@ -24,6 +24,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.lang.invoke.MethodHandles; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.CompletableFuture; @@ -32,11 +33,15 @@ import java.util.concurrent.Executors; import lombok.Builder; import lombok.Getter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Builder @Getter public class FusionAPIUploadOperations implements APIUploadOperations { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final String UPLOAD_FAILED_EXCEPTION_MSG = "Exception encountered while attempting to upload part, please try again"; @@ -76,6 +81,12 @@ public class FusionAPIUploadOperations implements APIUploadOperations { */ int uploadThreadPoolSize; + /** + * Max size of in-flux data that can be read at a given time. + * See {@link FusionConfiguration} for default values. + */ + long maxInFluxDataSize; + /** * Call the API upload endpoint to load a distribution * @@ -206,10 +217,13 @@ protected MultipartTransferContext callAPIToInitiateMultiPartUpload(UploadReques protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext mtx, UploadRequest ur) { int chunkSize = uploadPartSize * (1024 * 1024); + long maxInFluxBytes = maxInFluxDataSize * (1024L * 1024L); + byte[] buffer = new byte[chunkSize]; int partCnt = 1; int totalBytes = 0; + int inFluxBytes = 0; ExecutorService executor = Executors.newFixedThreadPool(uploadThreadPoolSize); try { @@ -217,10 +231,17 @@ protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext int bytesRead; while ((bytesRead = ur.getData().read(buffer)) != -1) { + + logger.debug("Creating upload task for part number {}, bytes read for this part {}", partCnt, bytesRead); + final int currentPartCnt = partCnt; final int currentBytesRead = bytesRead; byte[] taskBuffer = Arrays.copyOf(buffer, bytesRead); + if (inFluxBytes > maxInFluxBytes) { + inFluxBytes = easeDataPressure(futures); + } + futures.add(CompletableFuture.runAsync( () -> mtx.partUploaded( callAPIToUploadPart(mtx, ur, taskBuffer, currentBytesRead, currentPartCnt)), @@ -243,6 +264,17 @@ protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext return mtx.transferred(chunkSize, totalBytes, partCnt); } + private int easeDataPressure(List> futures) throws InterruptedException, ExecutionException { + + logger.debug("Reached max in-flux bytes - easing pressure"); + for (CompletableFuture future : futures) { + future.get(); + } + logger.debug("Max in-flux bytes handled - pressure eased"); + futures.clear(); + return 0; + } + protected UploadedPartContext callAPIToUploadPart( MultipartTransferContext mtx, UploadRequest ur, byte[] part, int read, int partNo) { @@ -348,6 +380,7 @@ public static class FusionAPIUploadOperationsBuilder { int singlePartUploadSizeLimit; int uploadPartSize; int uploadThreadPoolSize; + long maxInFluxDataSize; public FusionAPIUploadOperationsBuilder configuration(FusionConfiguration configuration) { this.configuration = configuration; @@ -371,6 +404,12 @@ private FusionAPIUploadOperationsBuilder uploadThreadPoolSize(int uploadThreadPo this.uploadThreadPoolSize = uploadThreadPoolSize; return this; } + + @SuppressWarnings("PIT") + private FusionAPIUploadOperationsBuilder maxInFluxDataSize(long maxInFluxDataSize) { + this.maxInFluxDataSize = maxInFluxDataSize; + return this; + } } private static class CustomFusionAPIUploadOperationsBuilder extends FusionAPIUploadOperationsBuilder { @@ -379,6 +418,7 @@ public FusionAPIUploadOperations build() { this.singlePartUploadSizeLimit = configuration.getSinglePartUploadSizeLimit(); this.uploadPartSize = configuration.getUploadPartSize(); this.uploadThreadPoolSize = configuration.getUploadThreadPoolSize(); + this.maxInFluxDataSize = configuration.getMaxInFluxDataSize(); if (Objects.isNull(digestProducer)) { this.digestProducer = AlgoSpecificDigestProducer.builder() From dd4aa19f556600dfeb2a799f94082c9cd3e848c6 Mon Sep 17 00:00:00 2001 From: Ian Knight <128476114+knighto82@users.noreply.github.com> Date: Tue, 9 Apr 2024 14:05:50 +0100 Subject: [PATCH 4/5] Spotless adjustments --- .../api/operations/FusionAPIUploadOperations.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java b/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java index 20acba6..5d0f543 100644 --- a/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java +++ b/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java @@ -40,7 +40,8 @@ @Getter public class FusionAPIUploadOperations implements APIUploadOperations { - private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final Logger logger = + LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final String UPLOAD_FAILED_EXCEPTION_MSG = "Exception encountered while attempting to upload part, please try again"; @@ -219,7 +220,6 @@ protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext int chunkSize = uploadPartSize * (1024 * 1024); long maxInFluxBytes = maxInFluxDataSize * (1024L * 1024L); - byte[] buffer = new byte[chunkSize]; int partCnt = 1; int totalBytes = 0; @@ -232,7 +232,8 @@ protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext int bytesRead; while ((bytesRead = ur.getData().read(buffer)) != -1) { - logger.debug("Creating upload task for part number {}, bytes read for this part {}", partCnt, bytesRead); + logger.debug( + "Creating upload task for part number {}, bytes read for this part {}", partCnt, bytesRead); final int currentPartCnt = partCnt; final int currentBytesRead = bytesRead; @@ -264,7 +265,8 @@ protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext return mtx.transferred(chunkSize, totalBytes, partCnt); } - private int easeDataPressure(List> futures) throws InterruptedException, ExecutionException { + private int easeDataPressure(List> futures) + throws InterruptedException, ExecutionException { logger.debug("Reached max in-flux bytes - easing pressure"); for (CompletableFuture future : futures) { From d8156c8c00b1e2798e3209581cca2d2588acf4f3 Mon Sep 17 00:00:00 2001 From: Ian Knight <128476114+knighto82@users.noreply.github.com> Date: Tue, 9 Apr 2024 14:32:43 +0100 Subject: [PATCH 5/5] Dont forget to incrment current count of influx bytes --- .../fusion/api/operations/FusionAPIUploadOperations.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java b/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java index 5d0f543..8b84457 100644 --- a/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java +++ b/src/main/java/io/github/jpmorganchase/fusion/api/operations/FusionAPIUploadOperations.java @@ -250,6 +250,7 @@ protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext partCnt++; totalBytes += bytesRead; + inFluxBytes += bytesRead; } for (CompletableFuture future : futures) {