Skip to content

Commit

Permalink
Upload with multiple zip files working
Browse files Browse the repository at this point in the history
  • Loading branch information
janvanmansum committed Nov 10, 2024
1 parent 8cce606 commit 8c748ea
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 4 deletions.
9 changes: 9 additions & 0 deletions src/main/java/nl/knaw/dans/dvingest/core/ImportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,22 @@
@RequiredArgsConstructor
public class ImportJob implements Runnable {
@NonNull
@Getter
private final ImportCommandDto importCommand;
@NonNull
private final Path outputDir;
@NonNull
private final DataverseClient dataverseClient;
@NonNull
private final CompletionHandler completionHandler;

@Getter
private final ImportJobStatusDto status = new ImportJobStatusDto();

public static interface CompletionHandler {
void handle(ImportJob job);
}

@Override
public void run() {
try {
Expand Down Expand Up @@ -74,6 +81,8 @@ public void run() {
catch (Exception e) {
log.error("Failed to process import job", e);
status.setStatus(StatusEnum.FAILED);
} finally {
completionHandler.handle(this);
}
}

Expand Down
9 changes: 8 additions & 1 deletion src/main/java/nl/knaw/dans/dvingest/core/IngestArea.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import lombok.extern.slf4j.Slf4j;
import nl.knaw.dans.dvingest.api.ImportCommandDto;
import nl.knaw.dans.dvingest.api.ImportJobStatusDto;
import nl.knaw.dans.dvingest.core.ImportJob.CompletionHandler;
import nl.knaw.dans.lib.dataverse.DataverseClient;

import java.io.IOException;
Expand Down Expand Up @@ -83,7 +84,13 @@ private ImportJob createImportJob(ImportCommandDto importCommand) {
else {
relativePath = inbox.relativize(Path.of(importCommand.getPath()));
}
return new ImportJob(importCommand, outbox.resolve(relativePath).toAbsolutePath(), dataverseClient);
var jobOutputDir = outbox.resolve(relativePath).toAbsolutePath();
// When the job hands itself to the completion handler, its entry (keyed by the import command's path) is removed from importJobs.
return new ImportJob(importCommand, jobOutputDir, dataverseClient, job -> importJobs.remove(job.getImportCommand().getPath()));
}

private void validatePath(String path) {
Expand Down
58 changes: 58 additions & 0 deletions src/main/java/nl/knaw/dans/dvingest/core/IngestTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import nl.knaw.dans.lib.dataverse.DataverseClient;
import nl.knaw.dans.lib.dataverse.DataverseException;
import nl.knaw.dans.lib.dataverse.model.file.FileMeta;
import org.apache.commons.io.FileUtils;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

@Slf4j
Expand All @@ -33,13 +37,29 @@ public class IngestTask implements Runnable {
public void run() {
try {
var result = dataverseClient.dataverse("root").createDataset(deposit.getDatasetMetadata());
var pid = result.getData().getPersistentId();
log.debug(result.getEnvelopeAsString());

// Upload files
var iterator = new PathIterator(FileUtils.iterateFiles(deposit.getFilesDir().toFile(), null, true));
int uploadBatchSize = 1000;
while (iterator.hasNext()) {
    // Every batch is zipped into its own temporary file. That file is removed again as soon as the upload of the batch
    // has finished or failed, so that the zips of a large deposit do not accumulate in the temp directory.
    var tempZipFile = Files.createTempFile("dvingest", ".zip");
    try {
        var zipFile = PathIteratorZipper.builder()
            .rootDir(deposit.getFilesDir())
            .sourceIterator(iterator)
            .targetZipFile(tempZipFile)
            .maxNumberOfFiles(uploadBatchSize)
            .build()
            .zip();
        dataverseClient.dataset(pid).addFile(zipFile, new FileMeta());
        log.debug("Uploaded {} files (cumulative)", iterator.getIteratedCount());
    }
    finally {
        Files.deleteIfExists(tempZipFile);
    }
}

// Publish dataset
dataverseClient.dataset(pid).publish();

// Wait for publish to complete
waitForState(pid, "RELEASED");

deposit.moveTo(outputDir.resolve("processed"));
}
Expand All @@ -53,4 +73,42 @@ public void run() {
}
}
}

/**
 * Polls the version state of a dataset until it equals the expected state or the maximum number of retries is reached.
 *
 * @param datasetId     the identifier of the dataset to check
 * @param expectedState the version state to wait for
 * @throws IllegalStateException if the dataset is still not in the expected state after the last retry
 * @throws RuntimeException      if the waiting thread is interrupted or if retrieving the state fails
 */
private void waitForState(String datasetId, String expectedState) {
    // TODO: make configurable again
    final int maxNumberOfRetries = 10;
    final long waitTimeInMilliseconds = 3000;

    var numberOfTimesTried = 0;
    var state = "";

    try {
        state = getDatasetState(datasetId);

        log.debug("Initial state for dataset {} is {}", datasetId, state);

        while (!expectedState.equals(state) && numberOfTimesTried < maxNumberOfRetries) {
            Thread.sleep(waitTimeInMilliseconds);

            state = getDatasetState(datasetId);
            numberOfTimesTried += 1;
            log.trace("Current state for dataset {} is {}, numberOfTimesTried = {}", datasetId, state, numberOfTimesTried);
        }

        if (!expectedState.equals(state)) {
            throw new IllegalStateException(String.format(
                "Dataset did not become %s within the wait period; current state is %s", expectedState, state
            ));
        }
    }
    catch (InterruptedException e) {
        // Restore the interrupt flag so that code further up the stack can still see that the thread was interrupted.
        Thread.currentThread().interrupt();
        throw new RuntimeException("Dataset state check was interrupted; last know state is " + state, e);
    }
    catch (IOException | DataverseException e) {
        throw new RuntimeException(e);
    }
}

/**
 * Looks up the latest version of a dataset and returns the version state that it reports.
 *
 * @param datasetId the identifier of the dataset to look up
 * @return the version state of the latest version
 * @throws IOException        if the call to retrieve the latest version fails with an I/O error
 * @throws DataverseException if the call to retrieve the latest version fails with a Dataverse error
 */
private String getDatasetState(String datasetId) throws IOException, DataverseException {
    return dataverseClient.dataset(datasetId)
        .getLatestVersion()
        .getData()
        .getLatestVersion()
        .getVersionState();
}
}
11 changes: 9 additions & 2 deletions src/main/java/nl/knaw/dans/dvingest/core/PathIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,31 @@
*/
package nl.knaw.dans.dvingest.core;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

import java.io.File;
import java.nio.file.Path;
import java.util.Iterator;

// TODO: move to dans-java-utils
/**
 * Presents an {@code Iterator<File>} as an {@code Iterator<Path>} and keeps count of the number of elements that were returned so far.
 * The count is available through {@code getIteratedCount()}.
 */
// Only the required-args constructor is generated: an all-args constructor would allow callers to preset the internal counter.
@RequiredArgsConstructor
public class PathIterator implements Iterator<Path> {
    @NonNull
    private final Iterator<File> fileIterator;

    @Getter
    private long iteratedCount = 0;

    @Override
    public boolean hasNext() {
        return fileIterator.hasNext();
    }

    @Override
    public Path next() {
        // Fetch the element first: if the wrapped iterator throws (e.g. because it is exhausted) the count must stay unchanged.
        Path path = fileIterator.next().toPath();
        iteratedCount++;
        return path;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
package nl.knaw.dans.dvingest.core;

import lombok.AccessLevel;
import lombok.Builder;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
import org.apache.commons.io.IOUtils;
Expand All @@ -31,6 +33,7 @@

// TODO: move to dans-java-utils
@Builder
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class PathIteratorZipper {
@NonNull
private final Path rootDir;
Expand All @@ -45,7 +48,7 @@ public class PathIteratorZipper {
@Builder.Default
private final int maxNumberOfFiles = Integer.MAX_VALUE;

public void zip() throws IOException {
public Path zip() throws IOException {
if (overwrite && Files.exists(targetZipFile)) {
Files.delete(targetZipFile);
} else {
Expand All @@ -70,6 +73,7 @@ public void zip() throws IOException {
}
}
}
return targetZipFile;
}
}

Expand Down

0 comments on commit 8c748ea

Please sign in to comment.