Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manage Max In-Flux Data During Uploads #60

Merged
merged 7 commits into from
Apr 9, 2024
Merged
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.3.5</version> <!-- logback >1.3.x does not support JDK8 -->
<version>1.3.12</version> <!-- logback >1.3.x does not support JDK8 -->
<scope>test</scope>
</dependency>

Expand All @@ -96,7 +96,7 @@
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8</artifactId>
<version>2.35.0</version>
<version>2.35.1</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ public class FusionConfiguration {
@Builder.Default
int uploadPartSize = 8;

/**
* Max in flux data to be read at a given time. Defaults to 500MB.
* If a value such as 1gb is required, then client would set this value to 1000;
*/
@Builder.Default
long maxInFluxDataSize = 500;

/**
* Size of Thread-Pool to be used for uploading chunks of a multipart file
* Defaults to number of available processors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CompletableFuture;
Expand All @@ -32,11 +33,16 @@
import java.util.concurrent.Executors;
import lombok.Builder;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Builder
@Getter
public class FusionAPIUploadOperations implements APIUploadOperations {

private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private static final String UPLOAD_FAILED_EXCEPTION_MSG =
"Exception encountered while attempting to upload part, please try again";

Expand Down Expand Up @@ -76,6 +82,12 @@ public class FusionAPIUploadOperations implements APIUploadOperations {
*/
int uploadThreadPoolSize;

/**
* Max size of in-flux data that can be read at a given time.
* See {@link FusionConfiguration} for default values.
*/
long maxInFluxDataSize;

/**
* Call the API upload endpoint to load a distribution
*
Expand Down Expand Up @@ -206,28 +218,39 @@ protected MultipartTransferContext callAPIToInitiateMultiPartUpload(UploadReques
protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext mtx, UploadRequest ur) {

int chunkSize = uploadPartSize * (1024 * 1024);
long maxInFluxBytes = maxInFluxDataSize * (1024L * 1024L);

byte[] buffer = new byte[chunkSize];
int partCnt = 1;
int totalBytes = 0;
int inFluxBytes = 0;

ExecutorService executor = Executors.newFixedThreadPool(uploadThreadPoolSize);
try {
List<CompletableFuture<Void>> futures = new ArrayList<>();

int bytesRead;
while ((bytesRead = ur.getData().read(buffer)) != -1) {

logger.debug(
"Creating upload task for part number {}, bytes read for this part {}", partCnt, bytesRead);

final int currentPartCnt = partCnt;
final int currentBytesRead = bytesRead;
byte[] taskBuffer = Arrays.copyOf(buffer, bytesRead);

if (inFluxBytes > maxInFluxBytes) {
inFluxBytes = easeDataPressure(futures);
}

futures.add(CompletableFuture.runAsync(
() -> mtx.partUploaded(
callAPIToUploadPart(mtx, ur, taskBuffer, currentBytesRead, currentPartCnt)),
executor));

partCnt++;
totalBytes += bytesRead;
inFluxBytes += bytesRead;
}

for (CompletableFuture<Void> future : futures) {
Expand All @@ -243,6 +266,18 @@ protected MultipartTransferContext callAPIToUploadParts(MultipartTransferContext
return mtx.transferred(chunkSize, totalBytes, partCnt);
}

private int easeDataPressure(List<CompletableFuture<Void>> futures)
throws InterruptedException, ExecutionException {

logger.debug("Reached max in-flux bytes - easing pressure");
for (CompletableFuture<Void> future : futures) {
future.get();
}
logger.debug("Max in-flux bytes handled - pressure eased");
futures.clear();
return 0;
}

protected UploadedPartContext callAPIToUploadPart(
MultipartTransferContext mtx, UploadRequest ur, byte[] part, int read, int partNo) {

Expand Down Expand Up @@ -348,6 +383,7 @@ public static class FusionAPIUploadOperationsBuilder {
int singlePartUploadSizeLimit;
int uploadPartSize;
int uploadThreadPoolSize;
long maxInFluxDataSize;

public FusionAPIUploadOperationsBuilder configuration(FusionConfiguration configuration) {
this.configuration = configuration;
Expand All @@ -371,6 +407,12 @@ private FusionAPIUploadOperationsBuilder uploadThreadPoolSize(int uploadThreadPo
this.uploadThreadPoolSize = uploadThreadPoolSize;
return this;
}

@SuppressWarnings("PIT")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this suppression for?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed with Ian, false positives from PIT. Needs to be addressed in a future PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be addressing this in next batch of build work. False metric being provided here.

private FusionAPIUploadOperationsBuilder maxInFluxDataSize(long maxInFluxDataSize) {
this.maxInFluxDataSize = maxInFluxDataSize;
return this;
}
}

private static class CustomFusionAPIUploadOperationsBuilder extends FusionAPIUploadOperationsBuilder {
Expand All @@ -379,6 +421,7 @@ public FusionAPIUploadOperations build() {
this.singlePartUploadSizeLimit = configuration.getSinglePartUploadSizeLimit();
this.uploadPartSize = configuration.getUploadPartSize();
this.uploadThreadPoolSize = configuration.getUploadThreadPoolSize();
this.maxInFluxDataSize = configuration.getMaxInFluxDataSize();

if (Objects.isNull(digestProducer)) {
this.digestProducer = AlgoSpecificDigestProducer.builder()
Expand Down
Loading