From 1b340a6c365934809a6f4a1bb2343d0cfa49e7c2 Mon Sep 17 00:00:00 2001 From: tballison Date: Wed, 5 Jul 2023 16:44:27 -0400 Subject: [PATCH 1/2] TIKA-4097 -- handle orphaned temp files more robustly --- .../org/apache/tika/pipes/PipesClient.java | 31 +++++++++++++++++++ .../apache/tika/pipes/PipesConfigBase.java | 19 ++++++++++++ .../tika/pipes/async/AsyncProcessor.java | 15 +++++++++ 3 files changed, 65 insertions(+) diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java index 1e31473709..9552ba1f6b 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java @@ -29,6 +29,8 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -41,6 +43,8 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.io.file.Counters; +import org.apache.commons.io.file.PathUtils; import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream; import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream; import org.slf4j.Logger; @@ -77,10 +81,18 @@ public class PipesClient implements Closeable { private DataOutputStream output; private DataInputStream input; private int filesProcessed = 0; + //this is the client-specific subdirectory of the pipesConfig's getPipesTmpDir + final Path tmpDir; public PipesClient(PipesConfigBase pipesConfig) { this.pipesConfig = pipesConfig; this.pipesClientId = CLIENT_COUNTER.getAndIncrement(); + try { + tmpDir = Files.createTempDirectory(pipesConfig.getPipesTmpDir(), + "client-" + this.pipesClientId); + } catch (IOException e) { + throw new RuntimeException("Couldn't create temp dir?!", e); + } } public int getFilesProcessed() { @@ -277,6 +289,19 @@ private void destroyForcibly() throws InterruptedException { if (process.isAlive()) { LOG.error("Process still alive after {}ms", WAIT_ON_DESTROY_MS); } + try { + if (Files.isDirectory(tmpDir)) { + LOG.debug("about to delete the full async temp directory: {}", + pipesConfig.getPipesTmpDir().toAbsolutePath()); + Counters.PathCounters pathCounters = + PathUtils.deleteDirectory(pipesConfig.getPipesTmpDir()); + LOG.debug("Successfully deleted {} temporary files in {} directories", + pathCounters.getFileCounter().get(), + pathCounters.getDirectoryCounter().get()); + } + } catch (IllegalArgumentException | IOException e) { + LOG.warn("Failed to delete temporary directory: " + tmpDir.toAbsolutePath(), e); + } } private PipesResult readResults(FetchEmitTuple t, long start) throws IOException { @@ -524,6 +549,11 @@ private String[] getCommandline() { origGCString = arg; newGCLogString = arg.replace("${pipesClientId}", "id-" + pipesClientId); } + if (arg.startsWith("-Djava.io.tmpdir=")) { + throw new IllegalArgumentException("Can't specify java.io.tmpdir in jvmargs. Set " + + "the overall tmpdir for all async process and its forked processes in the" + + " attribute."); + } } if (origGCString != null && newGCLogString != null) { @@ -552,6 +582,7 @@ private String[] getCommandline() { "-Dlog4j.configurationFile=classpath:pipes-fork-server-default-log4j2.xml"); } commandLine.add("-DpipesClientId=" + pipesClientId); + commandLine.add("-Djava.io.tmpdir=" + ProcessUtils.escapeCommandLine(tmpDir.toAbsolutePath().toString())); commandLine.addAll(configArgs); commandLine.add("org.apache.tika.pipes.PipesServer"); commandLine.add(ProcessUtils.escapeCommandLine( diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java index bf6a6bb696..6fd6cc9b46 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java @@ -16,6 +16,8 @@ */ package org.apache.tika.pipes; +import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; @@ -59,6 +61,8 @@ public class PipesConfigBase extends ConfigBase { private Path tikaConfig; private String javaPath = "java"; + private Path pipesTmpDir = null; + public long getTimeoutMillis() { return timeoutMillis; } @@ -171,4 +175,19 @@ public long getSleepOnStartupTimeoutMillis() { public void setSleepOnStartupTimeoutMillis(long sleepOnStartupTimeoutMillis) { this.sleepOnStartupTimeoutMillis = sleepOnStartupTimeoutMillis; } + + public void setPipesTmpDir(Path pipesTmpDir) { + this.pipesTmpDir = pipesTmpDir; + } + + public Path getPipesTmpDir() throws IOException { + if (pipesTmpDir == null) { + pipesTmpDir = Files.createTempDirectory("tika-pipes-tmp-dir"); + } else { + if (! Files.isDirectory(pipesTmpDir)) { + Files.createDirectories(pipesTmpDir); + } + } + return pipesTmpDir; + } } diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java index 3a6751f4ff..5b602a94c9 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java @@ -18,6 +18,7 @@ import java.io.Closeable; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; @@ -30,6 +31,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.io.file.Counters; +import org.apache.commons.io.file.PathUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -252,6 +255,18 @@ public synchronized boolean checkActive() throws InterruptedException { public void close() throws IOException { executorService.shutdownNow(); asyncConfig.getPipesReporter().close(); + if (Files.isDirectory(asyncConfig.getPipesTmpDir())) { + try { + LOG.debug("about to delete the full async temp directory: {}", + asyncConfig.getPipesTmpDir().toAbsolutePath()); + Counters.PathCounters pathCounters = PathUtils.deleteDirectory(asyncConfig.getPipesTmpDir()); + LOG.debug("Successfully deleted {} temporary files in {} directories", + pathCounters.getFileCounter().get(), + pathCounters.getDirectoryCounter().get()); + } catch (IllegalArgumentException e) { + throw new IOException(e); + } + } } public long getTotalProcessed() { From b8f04fc54f8f37975097eec65f019a62214d657b Mon Sep 17 00:00:00 2001 From: tballison Date: Wed, 19 Jul 2023 15:06:56 -0400 Subject: [PATCH 2/2] TIKA-4097 -- handle orphaned temp files more robustly -- checkpoint commit -- WIP --- .../org/apache/tika/pipes/PipesClient.java | 5 +- .../apache/tika/pipes/PipesConfigBase.java | 8 ++- .../tika/pipes/async/AsyncProcessor.java | 55 ++++++++++++++++--- 3 files changed, 56 insertions(+), 12 deletions(-) diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java index 9552ba1f6b..f88cfee7a5 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java @@ -89,7 +89,7 @@ public PipesClient(PipesConfigBase pipesConfig) { this.pipesClientId = CLIENT_COUNTER.getAndIncrement(); try { tmpDir = Files.createTempDirectory(pipesConfig.getPipesTmpDir(), - "client-" + this.pipesClientId); + "client-" + this.pipesClientId + "-"); } catch (IOException e) { throw new RuntimeException("Couldn't create temp dir?!", e); } @@ -445,6 +445,9 @@ private void restart() throws IOException, InterruptedException, TimeoutExceptio } executorService = Executors.newFixedThreadPool(1); } + if (! Files.isDirectory(tmpDir)) { + Files.createDirectories(tmpDir); + } LOG.info("pipesClientId={}: restarting process", pipesClientId); } else { LOG.info("pipesClientId={}: starting process", pipesClientId); diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java index 6fd6cc9b46..ab7a98b88c 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java @@ -176,13 +176,17 @@ public void setSleepOnStartupTimeoutMillis(long sleepOnStartupTimeoutMillis) { this.sleepOnStartupTimeoutMillis = sleepOnStartupTimeoutMillis; } - public void setPipesTmpDir(Path pipesTmpDir) { + public void setPipesTmpDir(String pipesTmpDir) { + setPipesTmpDirPath(Paths.get(pipesTmpDir)); + } + + public void setPipesTmpDirPath(Path pipesTmpDir) { this.pipesTmpDir = pipesTmpDir; } public Path getPipesTmpDir() throws IOException { if (pipesTmpDir == null) { - pipesTmpDir = Files.createTempDirectory("tika-pipes-tmp-dir"); + pipesTmpDir = Files.createTempDirectory("tika-pipes-tmp-dir-"); } else { if (! Files.isDirectory(pipesTmpDir)) { Files.createDirectories(pipesTmpDir); diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java index 5b602a94c9..03aac7eb2c 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; @@ -85,6 +86,26 @@ public AsyncProcessor(Path tikaConfigPath, PipesIterator pipesIterator) throws T asyncConfig.getNumClients() + asyncConfig.getNumEmitters() + 1); this.executorCompletionService = new ExecutorCompletionService<>(executorService); + + final Path tmpDir = asyncConfig.getPipesTmpDir(); + final List workers = new ArrayList<>(); + + for (int i = 0; i < asyncConfig.getNumClients(); i++) { + workers.add(new FetchEmitWorker(asyncConfig, fetchEmitTuples, emitData)); + } + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + executorService.shutdownNow(); + for (FetchEmitWorker worker : workers) { + try { + worker.close(); + } catch (IOException e) { + LOG.warn("Exception closing worker", e); + } + } + cleanTmpDir(tmpDir); + })); + try { if (!tikaConfigPath.toAbsolutePath().equals(asyncConfig.getTikaConfig().toAbsolutePath())) { LOG.warn("TikaConfig for AsyncProcessor ({}) is different " + @@ -108,9 +129,8 @@ public AsyncProcessor(Path tikaConfigPath, PipesIterator pipesIterator) throws T startCounter((TotalCounter) pipesIterator); } - for (int i = 0; i < asyncConfig.getNumClients(); i++) { - executorCompletionService.submit( - new FetchEmitWorker(asyncConfig, fetchEmitTuples, emitData)); + for (FetchEmitWorker worker : workers) { + executorCompletionService.submit(worker); } EmitterManager emitterManager = EmitterManager.load(asyncConfig.getTikaConfig()); @@ -255,16 +275,25 @@ public synchronized boolean checkActive() throws InterruptedException { public void close() throws IOException { executorService.shutdownNow(); asyncConfig.getPipesReporter().close(); - if (Files.isDirectory(asyncConfig.getPipesTmpDir())) { + cleanTmpDir(asyncConfig.getPipesTmpDir()); + } + + private static void cleanTmpDir(Path tmpDir) { + if (tmpDir == null) { + return; + } + if (Files.isDirectory(tmpDir)) { try { LOG.debug("about to delete the full async temp directory: {}", - asyncConfig.getPipesTmpDir().toAbsolutePath()); - Counters.PathCounters pathCounters = PathUtils.deleteDirectory(asyncConfig.getPipesTmpDir()); + tmpDir); + Counters.PathCounters pathCounters = PathUtils.deleteDirectory(tmpDir); LOG.debug("Successfully deleted {} temporary files in {} directories", pathCounters.getFileCounter().get(), pathCounters.getDirectoryCounter().get()); } catch (IllegalArgumentException e) { - throw new IOException(e); + LOG.debug("null tmpDir? " + tmpDir, e); + } catch (IOException e) { + LOG.warn("Couldn't delete tmpDir: " + tmpDir, e); } } } @@ -273,11 +302,12 @@ public long getTotalProcessed() { return totalProcessed.get(); } - private class FetchEmitWorker implements Callable { + private class FetchEmitWorker implements Callable, Closeable { private final AsyncConfig asyncConfig; private final ArrayBlockingQueue fetchEmitTuples; private final ArrayBlockingQueue emitDataQueue; + private final PipesClient pipesClient; private FetchEmitWorker(AsyncConfig asyncConfig, ArrayBlockingQueue fetchEmitTuples, @@ -285,12 +315,13 @@ private FetchEmitWorker(AsyncConfig asyncConfig, this.asyncConfig = asyncConfig; this.fetchEmitTuples = fetchEmitTuples; this.emitDataQueue = emitDataQueue; + this.pipesClient = new PipesClient(asyncConfig); } @Override public Integer call() throws Exception { - try (PipesClient pipesClient = new PipesClient(asyncConfig)) { + try { while (true) { FetchEmitTuple t = fetchEmitTuples.poll(1, TimeUnit.SECONDS); if (t == null) { @@ -337,9 +368,15 @@ public Integer call() throws Exception { totalProcessed.incrementAndGet(); } } + } finally { + close(); } } + public void close() throws IOException { + pipesClient.close(); + } + private boolean shouldEmit(PipesResult result) { if (result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS ||