diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index 1e31473709..f88cfee7a5 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -29,6 +29,8 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -41,6 +43,8 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.io.file.Counters;
+import org.apache.commons.io.file.PathUtils;
import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream;
import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
import org.slf4j.Logger;
@@ -77,10 +81,18 @@ public class PipesClient implements Closeable {
private DataOutputStream output;
private DataInputStream input;
private int filesProcessed = 0;
+ //this is the client-specific subdirectory of the pipesConfig's getPipesTmpDir
+ final Path tmpDir;
public PipesClient(PipesConfigBase pipesConfig) {
this.pipesConfig = pipesConfig;
this.pipesClientId = CLIENT_COUNTER.getAndIncrement();
+ try {
+ tmpDir = Files.createTempDirectory(pipesConfig.getPipesTmpDir(),
+ "client-" + this.pipesClientId + "-");
+ } catch (IOException e) {
+ throw new RuntimeException("Couldn't create temp dir?!", e);
+ }
}
public int getFilesProcessed() {
@@ -277,6 +289,19 @@ private void destroyForcibly() throws InterruptedException {
if (process.isAlive()) {
LOG.error("Process still alive after {}ms", WAIT_ON_DESTROY_MS);
}
+ try {
+ if (Files.isDirectory(tmpDir)) {
+ LOG.debug("about to delete the full async temp directory: {}",
+ pipesConfig.getPipesTmpDir().toAbsolutePath());
+ Counters.PathCounters pathCounters =
+ PathUtils.deleteDirectory(pipesConfig.getPipesTmpDir());
+ LOG.debug("Successfully deleted {} temporary files in {} directories",
+ pathCounters.getFileCounter().get(),
+ pathCounters.getDirectoryCounter().get());
+ }
+ } catch (IllegalArgumentException | IOException e) {
+ LOG.warn("Failed to delete temporary directory: " + tmpDir.toAbsolutePath(), e);
+ }
}
private PipesResult readResults(FetchEmitTuple t, long start) throws IOException {
@@ -420,6 +445,9 @@ private void restart() throws IOException, InterruptedException, TimeoutExceptio
}
executorService = Executors.newFixedThreadPool(1);
}
+ if (! Files.isDirectory(tmpDir)) {
+ Files.createDirectories(tmpDir);
+ }
LOG.info("pipesClientId={}: restarting process", pipesClientId);
} else {
LOG.info("pipesClientId={}: starting process", pipesClientId);
@@ -524,6 +552,11 @@ private String[] getCommandline() {
origGCString = arg;
newGCLogString = arg.replace("${pipesClientId}", "id-" + pipesClientId);
}
+ if (arg.startsWith("-Djava.io.tmpdir=")) {
+ throw new IllegalArgumentException("Can't specify java.io.tmpdir in jvmargs. Set " +
+ "the overall tmpdir for all async process and its forked processes in the" +
+ " attribute.");
+ }
}
if (origGCString != null && newGCLogString != null) {
@@ -552,6 +585,7 @@ private String[] getCommandline() {
"-Dlog4j.configurationFile=classpath:pipes-fork-server-default-log4j2.xml");
}
commandLine.add("-DpipesClientId=" + pipesClientId);
+ commandLine.add("-Djava.io.tmpdir=" + ProcessUtils.escapeCommandLine(tmpDir.toAbsolutePath().toString()));
commandLine.addAll(configArgs);
commandLine.add("org.apache.tika.pipes.PipesServer");
commandLine.add(ProcessUtils.escapeCommandLine(
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
index bf6a6bb696..ab7a98b88c 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
@@ -16,6 +16,8 @@
*/
package org.apache.tika.pipes;
+import java.io.IOException;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -59,6 +61,8 @@ public class PipesConfigBase extends ConfigBase {
private Path tikaConfig;
private String javaPath = "java";
+ private Path pipesTmpDir = null;
+
public long getTimeoutMillis() {
return timeoutMillis;
}
@@ -171,4 +175,23 @@ public long getSleepOnStartupTimeoutMillis() {
public void setSleepOnStartupTimeoutMillis(long sleepOnStartupTimeoutMillis) {
this.sleepOnStartupTimeoutMillis = sleepOnStartupTimeoutMillis;
}
+
+ public void setPipesTmpDir(String pipesTmpDir) {
+ setPipesTmpDirPath(Paths.get(pipesTmpDir));
+ }
+
+ public void setPipesTmpDirPath(Path pipesTmpDir) {
+ this.pipesTmpDir = pipesTmpDir;
+ }
+
+ public Path getPipesTmpDir() throws IOException {
+ if (pipesTmpDir == null) {
+ pipesTmpDir = Files.createTempDirectory("tika-pipes-tmp-dir-");
+ } else {
+ if (! Files.isDirectory(pipesTmpDir)) {
+ Files.createDirectories(pipesTmpDir);
+ }
+ }
+ return pipesTmpDir;
+ }
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 3a6751f4ff..03aac7eb2c 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -18,7 +18,9 @@
import java.io.Closeable;
import java.io.IOException;
+import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
@@ -30,6 +32,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.io.file.Counters;
+import org.apache.commons.io.file.PathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +86,26 @@ public AsyncProcessor(Path tikaConfigPath, PipesIterator pipesIterator) throws T
asyncConfig.getNumClients() + asyncConfig.getNumEmitters() + 1);
this.executorCompletionService =
new ExecutorCompletionService<>(executorService);
+
+ final Path tmpDir = asyncConfig.getPipesTmpDir();
+ final List workers = new ArrayList<>();
+
+ for (int i = 0; i < asyncConfig.getNumClients(); i++) {
+ workers.add(new FetchEmitWorker(asyncConfig, fetchEmitTuples, emitData));
+ }
+
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ executorService.shutdownNow();
+ for (FetchEmitWorker worker : workers) {
+ try {
+ worker.close();
+ } catch (IOException e) {
+ LOG.warn("Exception closing worker", e);
+ }
+ }
+ cleanTmpDir(tmpDir);
+ }));
+
try {
if (!tikaConfigPath.toAbsolutePath().equals(asyncConfig.getTikaConfig().toAbsolutePath())) {
LOG.warn("TikaConfig for AsyncProcessor ({}) is different " +
@@ -105,9 +129,8 @@ public AsyncProcessor(Path tikaConfigPath, PipesIterator pipesIterator) throws T
startCounter((TotalCounter) pipesIterator);
}
- for (int i = 0; i < asyncConfig.getNumClients(); i++) {
- executorCompletionService.submit(
- new FetchEmitWorker(asyncConfig, fetchEmitTuples, emitData));
+ for (FetchEmitWorker worker : workers) {
+ executorCompletionService.submit(worker);
}
EmitterManager emitterManager = EmitterManager.load(asyncConfig.getTikaConfig());
@@ -252,17 +275,39 @@ public synchronized boolean checkActive() throws InterruptedException {
public void close() throws IOException {
executorService.shutdownNow();
asyncConfig.getPipesReporter().close();
+ cleanTmpDir(asyncConfig.getPipesTmpDir());
+ }
+
+ private static void cleanTmpDir(Path tmpDir) {
+ if (tmpDir == null) {
+ return;
+ }
+ if (Files.isDirectory(tmpDir)) {
+ try {
+ LOG.debug("about to delete the full async temp directory: {}",
+ tmpDir);
+ Counters.PathCounters pathCounters = PathUtils.deleteDirectory(tmpDir);
+ LOG.debug("Successfully deleted {} temporary files in {} directories",
+ pathCounters.getFileCounter().get(),
+ pathCounters.getDirectoryCounter().get());
+ } catch (IllegalArgumentException e) {
+ LOG.debug("null tmpDir? " + tmpDir, e);
+ } catch (IOException e) {
+ LOG.warn("Couldn't delete tmpDir: " + tmpDir, e);
+ }
+ }
}
public long getTotalProcessed() {
return totalProcessed.get();
}
- private class FetchEmitWorker implements Callable {
+ private class FetchEmitWorker implements Callable, Closeable {
private final AsyncConfig asyncConfig;
private final ArrayBlockingQueue fetchEmitTuples;
private final ArrayBlockingQueue emitDataQueue;
+ private final PipesClient pipesClient;
private FetchEmitWorker(AsyncConfig asyncConfig,
ArrayBlockingQueue fetchEmitTuples,
@@ -270,12 +315,13 @@ private FetchEmitWorker(AsyncConfig asyncConfig,
this.asyncConfig = asyncConfig;
this.fetchEmitTuples = fetchEmitTuples;
this.emitDataQueue = emitDataQueue;
+ this.pipesClient = new PipesClient(asyncConfig);
}
@Override
public Integer call() throws Exception {
- try (PipesClient pipesClient = new PipesClient(asyncConfig)) {
+ try {
while (true) {
FetchEmitTuple t = fetchEmitTuples.poll(1, TimeUnit.SECONDS);
if (t == null) {
@@ -322,9 +368,15 @@ public Integer call() throws Exception {
totalProcessed.incrementAndGet();
}
}
+ } finally {
+ close();
}
}
+ public void close() throws IOException {
+ pipesClient.close();
+ }
+
private boolean shouldEmit(PipesResult result) {
if (result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS ||