diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java index 1e31473709..f88cfee7a5 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java @@ -29,6 +29,8 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -41,6 +43,8 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.io.file.Counters; +import org.apache.commons.io.file.PathUtils; import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream; import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream; import org.slf4j.Logger; @@ -77,10 +81,18 @@ public class PipesClient implements Closeable { private DataOutputStream output; private DataInputStream input; private int filesProcessed = 0; + //this is the client-specific subdirectory of the pipesConfig's getPipesTmpDir + final Path tmpDir; public PipesClient(PipesConfigBase pipesConfig) { this.pipesConfig = pipesConfig; this.pipesClientId = CLIENT_COUNTER.getAndIncrement(); + try { + tmpDir = Files.createTempDirectory(pipesConfig.getPipesTmpDir(), + "client-" + this.pipesClientId + "-"); + } catch (IOException e) { + throw new RuntimeException("Couldn't create temp dir?!", e); + } } public int getFilesProcessed() { @@ -277,6 +289,19 @@ private void destroyForcibly() throws InterruptedException { if (process.isAlive()) { LOG.error("Process still alive after {}ms", WAIT_ON_DESTROY_MS); } + try { + if (Files.isDirectory(tmpDir)) { + LOG.debug("about to delete the full async temp directory: {}", + pipesConfig.getPipesTmpDir().toAbsolutePath()); + Counters.PathCounters pathCounters = + PathUtils.deleteDirectory(pipesConfig.getPipesTmpDir()); + LOG.debug("Successfully deleted {} temporary files in {} directories", + pathCounters.getFileCounter().get(), + pathCounters.getDirectoryCounter().get()); + } + } catch (IllegalArgumentException | IOException e) { + LOG.warn("Failed to delete temporary directory: " + tmpDir.toAbsolutePath(), e); + } } private PipesResult readResults(FetchEmitTuple t, long start) throws IOException { @@ -420,6 +445,9 @@ private void restart() throws IOException, InterruptedException, TimeoutExceptio } executorService = Executors.newFixedThreadPool(1); } + if (! Files.isDirectory(tmpDir)) { + Files.createDirectories(tmpDir); + } LOG.info("pipesClientId={}: restarting process", pipesClientId); } else { LOG.info("pipesClientId={}: starting process", pipesClientId); @@ -524,6 +552,11 @@ private String[] getCommandline() { origGCString = arg; newGCLogString = arg.replace("${pipesClientId}", "id-" + pipesClientId); } + if (arg.startsWith("-Djava.io.tmpdir=")) { + throw new IllegalArgumentException("Can't specify java.io.tmpdir in jvmargs. Set " + + "the overall tmpdir for all async process and its forked processes in the" + + " attribute."); + } } if (origGCString != null && newGCLogString != null) { @@ -552,6 +585,7 @@ private String[] getCommandline() { "-Dlog4j.configurationFile=classpath:pipes-fork-server-default-log4j2.xml"); } commandLine.add("-DpipesClientId=" + pipesClientId); + commandLine.add("-Djava.io.tmpdir=" + ProcessUtils.escapeCommandLine(tmpDir.toAbsolutePath().toString())); commandLine.addAll(configArgs); commandLine.add("org.apache.tika.pipes.PipesServer"); commandLine.add(ProcessUtils.escapeCommandLine( diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java index bf6a6bb696..ab7a98b88c 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java @@ -16,6 +16,8 @@ */ package org.apache.tika.pipes; +import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; @@ -59,6 +61,8 @@ public class PipesConfigBase extends ConfigBase { private Path tikaConfig; private String javaPath = "java"; + private Path pipesTmpDir = null; + public long getTimeoutMillis() { return timeoutMillis; } @@ -171,4 +175,23 @@ public long getSleepOnStartupTimeoutMillis() { public void setSleepOnStartupTimeoutMillis(long sleepOnStartupTimeoutMillis) { this.sleepOnStartupTimeoutMillis = sleepOnStartupTimeoutMillis; } + + public void setPipesTmpDir(String pipesTmpDir) { + setPipesTmpDirPath(Paths.get(pipesTmpDir)); + } + + public void setPipesTmpDirPath(Path pipesTmpDir) { + this.pipesTmpDir = pipesTmpDir; + } + + public Path getPipesTmpDir() throws IOException { + if (pipesTmpDir == null) { + pipesTmpDir = Files.createTempDirectory("tika-pipes-tmp-dir-"); + } else { + if (! Files.isDirectory(pipesTmpDir)) { + Files.createDirectories(pipesTmpDir); + } + } + return pipesTmpDir; + } } diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java index 3a6751f4ff..03aac7eb2c 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java @@ -18,7 +18,9 @@ import java.io.Closeable; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; @@ -30,6 +32,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.io.file.Counters; +import org.apache.commons.io.file.PathUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,6 +86,26 @@ public AsyncProcessor(Path tikaConfigPath, PipesIterator pipesIterator) throws T asyncConfig.getNumClients() + asyncConfig.getNumEmitters() + 1); this.executorCompletionService = new ExecutorCompletionService<>(executorService); + + final Path tmpDir = asyncConfig.getPipesTmpDir(); + final List workers = new ArrayList<>(); + + for (int i = 0; i < asyncConfig.getNumClients(); i++) { + workers.add(new FetchEmitWorker(asyncConfig, fetchEmitTuples, emitData)); + } + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + executorService.shutdownNow(); + for (FetchEmitWorker worker : workers) { + try { + worker.close(); + } catch (IOException e) { + LOG.warn("Exception closing worker", e); + } + } + cleanTmpDir(tmpDir); + })); + try { if (!tikaConfigPath.toAbsolutePath().equals(asyncConfig.getTikaConfig().toAbsolutePath())) { LOG.warn("TikaConfig for AsyncProcessor ({}) is different " + @@ -105,9 +129,8 @@ public AsyncProcessor(Path tikaConfigPath, PipesIterator pipesIterator) throws T startCounter((TotalCounter) pipesIterator); } - for (int i = 0; i < asyncConfig.getNumClients(); i++) { - executorCompletionService.submit( - new FetchEmitWorker(asyncConfig, fetchEmitTuples, emitData)); + for (FetchEmitWorker worker : workers) { + executorCompletionService.submit(worker); } EmitterManager emitterManager = EmitterManager.load(asyncConfig.getTikaConfig()); @@ -252,17 +275,39 @@ public synchronized boolean checkActive() throws InterruptedException { public void close() throws IOException { executorService.shutdownNow(); asyncConfig.getPipesReporter().close(); + cleanTmpDir(asyncConfig.getPipesTmpDir()); + } + + private static void cleanTmpDir(Path tmpDir) { + if (tmpDir == null) { + return; + } + if (Files.isDirectory(tmpDir)) { + try { + LOG.debug("about to delete the full async temp directory: {}", + tmpDir); + Counters.PathCounters pathCounters = PathUtils.deleteDirectory(tmpDir); + LOG.debug("Successfully deleted {} temporary files in {} directories", + pathCounters.getFileCounter().get(), + pathCounters.getDirectoryCounter().get()); + } catch (IllegalArgumentException e) { + LOG.debug("null tmpDir? " + tmpDir, e); + } catch (IOException e) { + LOG.warn("Couldn't delete tmpDir: " + tmpDir, e); + } + } } public long getTotalProcessed() { return totalProcessed.get(); } - private class FetchEmitWorker implements Callable { + private class FetchEmitWorker implements Callable, Closeable { private final AsyncConfig asyncConfig; private final ArrayBlockingQueue fetchEmitTuples; private final ArrayBlockingQueue emitDataQueue; + private final PipesClient pipesClient; private FetchEmitWorker(AsyncConfig asyncConfig, ArrayBlockingQueue fetchEmitTuples, @@ -270,12 +315,13 @@ private FetchEmitWorker(AsyncConfig asyncConfig, this.asyncConfig = asyncConfig; this.fetchEmitTuples = fetchEmitTuples; this.emitDataQueue = emitDataQueue; + this.pipesClient = new PipesClient(asyncConfig); } @Override public Integer call() throws Exception { - try (PipesClient pipesClient = new PipesClient(asyncConfig)) { + try { while (true) { FetchEmitTuple t = fetchEmitTuples.poll(1, TimeUnit.SECONDS); if (t == null) { @@ -322,9 +368,15 @@ public Integer call() throws Exception { totalProcessed.incrementAndGet(); } } + } finally { + close(); } } + public void close() throws IOException { + pipesClient.close(); + } + private boolean shouldEmit(PipesResult result) { if (result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS ||