From 8c748ea0eda89e11f0aad4d95250dcb96d0f63c4 Mon Sep 17 00:00:00 2001
From: Jan van Mansum
Date: Sun, 10 Nov 2024 10:21:10 +0100
Subject: [PATCH] Upload with multiple zip files working

---
Review notes (git am ignores everything between the "---" line and the first "diff --git" line):

 * Line breaks, indentation and blank context lines were reconstructed from the hunk headers;
   run `git apply --check` (with --ignore-whitespace if needed) before applying.
 * The type arguments in the PathIterator context lines (Iterator<Path>, Iterator<File>) are inferred
   from next() returning Path and from the java.io.File import; confirm against the tree.
 * IngestTask: every batch zip is now deleted after the upload attempt (the temp files were never
   removed before); this presumes addFile() returns only after the upload has finished.
 * IngestTask.waitForState(): the interrupt flag is restored and the InterruptedException is kept as
   the cause of the RuntimeException.
 * PathIterator.next(): iteratedCount is incremented only after the delegate returned an element.
 * ImportJob.CompletionHandler: dropped the redundant "static" modifier; Javadoc added.
 * The "index" lines are carried over unchanged; the post-image ids of the modified files were not
   recomputed.

 .../nl/knaw/dans/dvingest/core/ImportJob.java | 12 +++
 .../knaw/dans/dvingest/core/IngestArea.java   |  9 ++-
 .../knaw/dans/dvingest/core/IngestTask.java   | 78 +++++++++++++++++++
 .../knaw/dans/dvingest/core/PathIterator.java | 15 +++-
 .../dvingest/core/PathIteratorZipper.java     |  6 +-
 5 files changed, 115 insertions(+), 5 deletions(-)

diff --git a/src/main/java/nl/knaw/dans/dvingest/core/ImportJob.java b/src/main/java/nl/knaw/dans/dvingest/core/ImportJob.java
index b5bb902..4e25a0d 100644
--- a/src/main/java/nl/knaw/dans/dvingest/core/ImportJob.java
+++ b/src/main/java/nl/knaw/dans/dvingest/core/ImportJob.java
@@ -34,15 +34,25 @@
 @RequiredArgsConstructor
 public class ImportJob implements Runnable {
     @NonNull
+    @Getter
     private final ImportCommandDto importCommand;
     @NonNull
     private final Path outputDir;
     @NonNull
     private final DataverseClient dataverseClient;
+    @NonNull
+    private final CompletionHandler completionHandler;

     @Getter
     private final ImportJobStatusDto status = new ImportJobStatusDto();

+    /**
+     * Callback invoked at the end of {@link #run()}, regardless of whether the job succeeded or failed.
+     */
+    public interface CompletionHandler {
+        void handle(ImportJob job);
+    }
+
     @Override
     public void run() {
         try {
@@ -74,6 +84,8 @@ public void run() {
         catch (Exception e) {
             log.error("Failed to process import job", e);
             status.setStatus(StatusEnum.FAILED);
+        } finally {
+            completionHandler.handle(this);
         }
     }

diff --git a/src/main/java/nl/knaw/dans/dvingest/core/IngestArea.java b/src/main/java/nl/knaw/dans/dvingest/core/IngestArea.java
index 162372e..d003b42 100644
--- a/src/main/java/nl/knaw/dans/dvingest/core/IngestArea.java
+++ b/src/main/java/nl/knaw/dans/dvingest/core/IngestArea.java
@@ -20,6 +20,7 @@
 import lombok.extern.slf4j.Slf4j;
 import nl.knaw.dans.dvingest.api.ImportCommandDto;
 import nl.knaw.dans.dvingest.api.ImportJobStatusDto;
+import nl.knaw.dans.dvingest.core.ImportJob.CompletionHandler;
 import nl.knaw.dans.lib.dataverse.DataverseClient;

 import java.io.IOException;
@@ -83,7 +84,13 @@ private ImportJob createImportJob(ImportCommandDto importCommand) {
         else {
             relativePath = inbox.relativize(Path.of(importCommand.getPath()));
         }
-        return new ImportJob(importCommand, outbox.resolve(relativePath).toAbsolutePath(), dataverseClient);
+        return new ImportJob(importCommand, outbox.resolve(relativePath).toAbsolutePath(), dataverseClient, new CompletionHandler() {
+
+            @Override
+            public void handle(ImportJob job) {
+                importJobs.remove(job.getImportCommand().getPath());
+            }
+        });
     }

     private void validatePath(String path) {
diff --git a/src/main/java/nl/knaw/dans/dvingest/core/IngestTask.java b/src/main/java/nl/knaw/dans/dvingest/core/IngestTask.java
index c3718da..49df54e 100644
--- a/src/main/java/nl/knaw/dans/dvingest/core/IngestTask.java
+++ b/src/main/java/nl/knaw/dans/dvingest/core/IngestTask.java
@@ -18,8 +18,12 @@
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import nl.knaw.dans.lib.dataverse.DataverseClient;
+import nl.knaw.dans.lib.dataverse.DataverseException;
+import nl.knaw.dans.lib.dataverse.model.file.FileMeta;
+import org.apache.commons.io.FileUtils;

 import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.Path;

 @Slf4j
@@ -33,13 +37,36 @@ public class IngestTask implements Runnable {
     public void run() {
         try {
             var result = dataverseClient.dataverse("root").createDataset(deposit.getDatasetMetadata());
+            var pid = result.getData().getPersistentId();
             log.debug(result.getEnvelopeAsString());

             // Upload files
+            var iterator = new PathIterator(FileUtils.iterateFiles(deposit.getFilesDir().toFile(), null, true));
+            int uploadBatchSize = 1000;
+            while (iterator.hasNext()) {
+                var tempZipFile = Files.createTempFile("dvingest", ".zip");
+                try {
+                    var zipFile = PathIteratorZipper.builder()
+                        .rootDir(deposit.getFilesDir())
+                        .sourceIterator(iterator)
+                        .targetZipFile(tempZipFile)
+                        .maxNumberOfFiles(uploadBatchSize)
+                        .build()
+                        .zip();
+                    dataverseClient.dataset(pid).addFile(zipFile, new FileMeta());
+                    log.debug("Uploaded {} files (cumulative)", iterator.getIteratedCount());
+                }
+                finally {
+                    // The batch zip is only needed for the upload; remove it whether or not the upload succeeded
+                    Files.deleteIfExists(tempZipFile);
+                }
+            }

             // Publish dataset
+            dataverseClient.dataset(pid).publish();

             // Wait for publish to complete
+            waitForState(pid, "RELEASED");

             deposit.moveTo(outputDir.resolve("processed"));
         }
@@ -53,4 +80,55 @@ public void run() {
             }
         }
     }
+
+    /**
+     * Polls the version state of the dataset until it equals expectedState, sleeping 3 seconds between
+     * attempts and giving up after 10 retries.
+     *
+     * @param datasetId     the identifier of the dataset to poll
+     * @param expectedState the version state to wait for
+     * @throws IllegalStateException if the dataset did not reach expectedState within the wait period
+     * @throws RuntimeException      if the wait was interrupted or the state could not be retrieved
+     */
+    private void waitForState(String datasetId, String expectedState) {
+        var numberOfTimesTried = 0;
+        var state = "";
+
+        try {
+            state = getDatasetState(datasetId);
+
+            log.debug("Initial state for dataset {} is {}", datasetId, state);
+
+            // TODO: make configurable again
+            while (!expectedState.equals(state) && numberOfTimesTried < 10) {
+                Thread.sleep(3000);
+
+                state = getDatasetState(datasetId);
+                numberOfTimesTried += 1;
+                log.trace("Current state for dataset {} is {}, numberOfTimesTried = {}", datasetId, state, numberOfTimesTried);
+            }
+
+            if (!expectedState.equals(state)) {
+                throw new IllegalStateException(String.format(
+                    "Dataset did not become %s within the wait period; current state is %s", expectedState, state
+                ));
+            }
+        }
+        catch (InterruptedException e) {
+            // Restore the interrupt flag so that code further up the call stack can still observe the interruption
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Dataset state check was interrupted; last know state is " + state, e);
+        }
+        catch (IOException | DataverseException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Retrieves the version state of the latest version of the dataset.
+     */
+    private String getDatasetState(String datasetId) throws IOException, DataverseException {
+        var version = dataverseClient.dataset(datasetId).getLatestVersion();
+        return version.getData().getLatestVersion().getVersionState();
+    }
 }
diff --git a/src/main/java/nl/knaw/dans/dvingest/core/PathIterator.java b/src/main/java/nl/knaw/dans/dvingest/core/PathIterator.java
index 2ddd97e..dee351e 100644
--- a/src/main/java/nl/knaw/dans/dvingest/core/PathIterator.java
+++ b/src/main/java/nl/knaw/dans/dvingest/core/PathIterator.java
@@ -15,17 +15,23 @@
  */
 package nl.knaw.dans.dvingest.core;

-import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;

 import java.io.File;
 import java.nio.file.Path;
 import java.util.Iterator;

 // TODO: move to dans-java-utils
-@AllArgsConstructor
+@RequiredArgsConstructor
 public class PathIterator implements Iterator<Path> {
+    @NonNull
     private final Iterator<File> fileIterator;

+    @Getter
+    private long iteratedCount = 0;
+
     @Override
     public boolean hasNext() {
         return fileIterator.hasNext();
@@ -33,6 +39,9 @@ public boolean hasNext() {

     @Override
     public Path next() {
-        return fileIterator.next().toPath();
+        // Count only after the delegate has actually produced an element
+        Path path = fileIterator.next().toPath();
+        iteratedCount++;
+        return path;
     }
 }
\ No newline at end of file
diff --git a/src/main/java/nl/knaw/dans/dvingest/core/PathIteratorZipper.java b/src/main/java/nl/knaw/dans/dvingest/core/PathIteratorZipper.java
index d404e7f..9f3997a 100644
--- a/src/main/java/nl/knaw/dans/dvingest/core/PathIteratorZipper.java
+++ b/src/main/java/nl/knaw/dans/dvingest/core/PathIteratorZipper.java
@@ -15,8 +15,10 @@
  */
 package nl.knaw.dans.dvingest.core;

+import lombok.AccessLevel;
 import lombok.Builder;
 import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
 import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
 import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
 import org.apache.commons.io.IOUtils;
@@ -31,6 +33,7 @@

 // TODO: move to dans-java-utils
 @Builder
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
 public class PathIteratorZipper {
     @NonNull
     private final Path rootDir;
@@ -45,7 +48,7 @@ public class PathIteratorZipper {
     @Builder.Default
     private final int maxNumberOfFiles = Integer.MAX_VALUE;

-    public void zip() throws IOException {
+    public Path zip() throws IOException {
         if (overwrite && Files.exists(targetZipFile)) {
             Files.delete(targetZipFile);
         } else {
@@ -70,6 +73,7 @@ public void zip() throws IOException {
                 }
             }
         }
+        return targetZipFile;
     }

 }