Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.nio.file.Path;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.logging.log4j.Level;
import org.assertj.core.api.Assertions;
Expand Down Expand Up @@ -151,8 +152,11 @@ void pauseAndResume_ObjectNotChanged_shouldResumeDownload(S3TransferManager tm)
assertEqualsBySdkFields(resumableFileDownload.downloadFileRequest(), request);
assertThat(testDownloadListener.getObjectResponse).isNotNull();

// wait to give CRT client time to write last bytes to file if any
long actualInProgressFileSize = waitUntilFileSizeStable(path);

// Skip the test if everything has been downloaded.
Assumptions.assumeTrue(resumableFileDownload.bytesTransferred() < sourceFile.length());
Assumptions.assumeTrue(actualInProgressFileSize < sourceFile.length());

assertThat(resumableFileDownload.s3ObjectLastModified()).hasValue(testDownloadListener.getObjectResponse.lastModified());
// Request may not be cancelled right away when pause is invoked, so there may be more bytes written to the file
Expand All @@ -162,7 +166,7 @@ void pauseAndResume_ObjectNotChanged_shouldResumeDownload(S3TransferManager tm)
assertThat(download.completionFuture()).isCancelled();

log.debug(() -> "Resuming download ");
verifyFileDownload(path, resumableFileDownload, OBJ_SIZE - bytesTransferred, tm);
verifyFileDownload(path, resumableFileDownload, OBJ_SIZE - actualInProgressFileSize, tm);
}

private void assertEqualsBySdkFields(DownloadFileRequest actual, DownloadFileRequest expected) {
Expand Down Expand Up @@ -252,6 +256,25 @@ private static void waitUntilFirstByteBufferDelivered(FileDownload download) {
waiter.run(() -> download.progress().snapshot());
}

/**
* CRT File downloads can continue to flush bytes to the file after pause. Wait until the file size is stable.
*
* @param file
* @return the final file size
*/
private static long waitUntilFileSizeStable(Path file) {
final AtomicLong fileSize = new AtomicLong(0L);
Waiter<Path> waiter = Waiter.builder(Path.class)
.addAcceptor(WaiterAcceptor.successOnResponseAcceptor(p -> file.toFile().length() == fileSize.getAndSet(file.toFile().length())))
.addAcceptor(WaiterAcceptor.retryOnResponseAcceptor(r -> true))
.overrideConfiguration(o -> o.waitTimeout(Duration.ofMinutes(1))
.maxAttempts(Integer.MAX_VALUE)
.backoffStrategy(FixedDelayBackoffStrategy.create(Duration.ofMillis(500))))
.build();
waiter.run(() -> file);
return file.toFile().length();
}

private static final class TestDownloadListener implements TransferListener {
private int transferInitiatedCount = 0;
private GetObjectResponse getObjectResponse;
Expand Down
Loading