Skip to content

Commit

Permalink
Keep track of rogue output lines from the worker
Browse files Browse the repository at this point in the history
And if the worker process crashes, aggregate any error lines that
transpired during the lifetime of the service, reporting them as a
crash message.
  • Loading branch information
ctrueden committed Sep 25, 2024
1 parent 34c77b1 commit 177bb42
Show file tree
Hide file tree
Showing 2 changed files with 222 additions and 21 deletions.
148 changes: 128 additions & 20 deletions src/main/java/org/apposed/appose/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -58,6 +59,20 @@ public class Service implements AutoCloseable {
private final Map<String, Task> tasks = new ConcurrentHashMap<>();
private final int serviceID;

/**
* List of unparseable (non-JSON) lines seen since the service started.
* If the worker process crashes, we use these lines as the content
* of an error message to any still pending tasks when reporting the crash.
*/
private final List<String> invalidLines = new ArrayList<>();

/**
* List of lines emitted to the standard error stream seen since the service
* started. If the worker process crashes, we use these lines as the content
* of an error message to any still pending tasks when reporting the crash.
*/
private final List<String> errorLines = new ArrayList<>();

private Process process;
private PrintWriter stdin;
private Thread stdoutThread;
Expand Down Expand Up @@ -131,12 +146,84 @@ public Task task(String script, Map<String, Object> inputs) throws IOException {
return new Task(script, inputs);
}

/** Closes the worker process's input stream, in order to shut it down. */
/**
* Closes the worker process's input stream, in order to shut it down.
* Pending tasks will run to completion before the worker process terminates.
* <p>
* To shut down the service more forcibly, interrupting any pending tasks,
* use {@link #kill()} instead.
* </p>
* <p>
* To wait until the service's worker process has completely shut down
* and all output has been reported, call {@link #waitFor()} afterward.
* </p>
*/
@Override
public void close() {
stdin.close();
}

/**
* Forces the service's worker process to begin shutting down. Any tasks still
* pending completion will be interrupted, reporting {@link TaskStatus#CRASHED}.
* <p>
* To shut down the service more gently, allowing any pending tasks to run to
* completion, use {@link #close()} instead.
* </p>
* <p>
* To wait until the service's worker process has completely shut down
* and all output has been reported, call {@link #waitFor()} afterward.
* </p>
*/
public void kill() {
process.destroyForcibly();
}

/**
* Waits for the service's worker process to terminate.
*
* @return Exit value of the worker process.
* @throws InterruptedException If any of the worker process's monitoring
* threads are interrupted before shutting down.
*/
public int waitFor() throws InterruptedException {
process.waitFor();

// Wait for worker output processing threads to finish up.
stdoutThread.join();
stderrThread.join();
monitorThread.join();

return process.exitValue();
}

/**
* Returns true if the service's worker process is currently running,
* or false if it has not yet started or has already shut down or crashed.
*
* @return Whether the service's worker process is currently running.
*/
public boolean isAlive() {
return process != null && process.isAlive();
}

/**
* Unparseable lines emitted by the worker process on its stdout stream,
* collected over the lifetime of the service.
* Can be useful for analyzing why a worker process has crashed.
*/
public List<String> invalidLines() {
return Collections.unmodifiableList(invalidLines);
}

/**
* Lines emitted by the worker process on its stderr stream,
* collected over the lifetime of the service.
* Can be useful for analyzing why a worker process has crashed.
*/
public List<String> errorLines() {
return Collections.unmodifiableList(errorLines);
}
/** Input loop processing lines from the worker stdout stream. */
private void stdoutLoop() {
BufferedReader stdout = new BufferedReader(new InputStreamReader(process.getInputStream()));
Expand All @@ -153,7 +240,7 @@ private void stdoutLoop() {

if (line == null) {
debugService("<worker stdout closed>");
return;
break;
}
try {
Map<String, Object> response = Types.decode(line);
Expand All @@ -174,47 +261,68 @@ private void stdoutLoop() {
// Something went wrong decoding the line of JSON.
// Skip it and keep going, but log it first.
debugService(String.format("<INVALID> %s", line));
invalidLines.add(line);
}
}
}

/** Input loop processing lines from the worker stderr stream. */
private void stderrLoop() {
BufferedReader stderr = new BufferedReader(new InputStreamReader(process.getErrorStream()));
try {
while (true) {
String line = stderr.readLine();
if (line == null) {
debugService("<worker stderr closed>");
return;
}
debugWorker(line);
while (true) {
String line;
try {
line = stderr.readLine();
}
}
catch (IOException exc) {
debugWorker(Types.stackTrace(exc));
catch (IOException exc) {
// Something went wrong reading the line. Panic!
debugService(Types.stackTrace(exc));
break;
}
if (line == null) {
debugService("<worker stderr closed>");
break;
}
debugWorker(line);
errorLines.add(line);
}
}

@SuppressWarnings("BusyWait")
private void monitorLoop() {
// Wait until the worker process terminates.
while (process.isAlive()) {
while (process.isAlive() || stdoutThread.isAlive() || stderrThread.isAlive()) {
try {
Thread.sleep(50);
Thread.sleep(10);
}
catch (InterruptedException exc) {
debugService(Types.stackTrace(exc));
}
}
debugService("<worker process termination detected>");

// Do some sanity checks.
int exitCode = process.exitValue();
if (exitCode != 0) debugService("<worker process terminated with exit code " + exitCode + ">");
int taskCount = tasks.size();
if (taskCount > 0) debugService("<worker process terminated with " + taskCount + " pending tasks>");
if (taskCount > 0) {
debugService("<worker process terminated with " +
taskCount + " pending task" + (taskCount == 1 ? "" : "s") + ">");
}

// Notify any remaining tasks about the process crash.
tasks.values().forEach(Task::crash);
Collection<Task> remainingTasks = tasks.values();
if (!remainingTasks.isEmpty()) {
// Notify any remaining tasks about the process crash.
StringBuilder sb = new StringBuilder();
String nl = System.lineSeparator();
sb.append("Worker crashed with exit code ").append(exitCode).append(".").append(nl);
String stdout = invalidLines.isEmpty() ? "<none>" : String.join(nl, invalidLines);
String stderr = errorLines.isEmpty() ? "<none>" : String.join(nl, errorLines);
sb.append(nl).append("[stdout]").append(nl).append(stdout).append(nl);
sb.append(nl).append("[stderr]").append(nl).append(stderr).append(nl);
String error = sb.toString();
remainingTasks.forEach(task -> task.crash(error));
}
tasks.clear();
}

Expand Down Expand Up @@ -325,7 +433,6 @@ private void request(RequestType requestType, Map<String, Object> args) {
debugService(encoded);
}

@SuppressWarnings("hiding")
private void handle(Map<String, Object> response) {
String maybeResponseType = (String) response.get("responseType");
if (maybeResponseType == null) {
Expand Down Expand Up @@ -377,9 +484,10 @@ private void handle(Map<String, Object> response) {
}
}

private void crash() {
private void crash(String error) {
TaskEvent event = new TaskEvent(this, ResponseType.CRASH);
status = TaskStatus.CRASHED;
this.error = error;
listeners.forEach(l -> l.accept(event));
synchronized (this) {
notifyAll();
Expand Down
95 changes: 94 additions & 1 deletion src/test/java/org/apposed/appose/ApposeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
package org.apposed.appose;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -38,6 +40,7 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apposed.appose.Service.ResponseType;
Expand Down Expand Up @@ -152,7 +155,7 @@ public void testServiceStartupFailure() throws IOException, InterruptedException
public void testTaskFailurePython() throws InterruptedException, IOException {
Environment env = Appose.system();
try (Service service = env.python()) {
service.debug(System.out::println);
maybeDebug(service);
String script = "whee\n";
Task task = service.task(script);
task.waitFor();
Expand All @@ -168,6 +171,96 @@ public void testTaskFailurePython() throws InterruptedException, IOException {
}
}

@Test
public void testStartupCrash() throws InterruptedException, IOException {
Environment env = Appose.system();
List<String> pythonExes = Arrays.asList("python", "python3", "python.exe");
Service service = env.service(pythonExes, "-c", "import nonexistentpackage").start();
// Wait up to 500ms for the crash.
for (int i = 0; i < 100; i++) {
if (!service.isAlive()) break;
Thread.sleep(5);
}
assertFalse(service.isAlive());
assertEquals(3, service.errorLines().size());
// Check that the crash happened and was recorded correctly.
List<String> expectedLines = Arrays.asList(
"Traceback (most recent call last):",
" File \"<string>\", line 1, in <module>",
"ModuleNotFoundError: No module named 'nonexistentpackage'"
);
assertEquals(expectedLines, service.errorLines());
}

@Test
public void testCrashWithActiveTask() throws InterruptedException, IOException {
Environment env = Appose.system();
try (Service service = env.python()) {
maybeDebug(service);
// Create a "long-running" task.
String script =
"import sys\n" +
"sys.stderr.write('one\\n')\n" +
"sys.stderr.flush()\n" +
"print('two')\n" +
"sys.stdout.flush()\n" +
"sys.stderr.write('three\\n')\n" +
"sys.stderr.flush()\n" +
"task.update('halfway')\n" +
"print('four')\n" +
"sys.stdout.flush()\n" +
"sys.stderr.write('five\\n')\n" +
"sys.stderr.flush()\n" +
"print('six')\n" +
"sys.stdout.flush()\n" +
"sys.stderr.write('seven\\n')\n" +
"import time; time.sleep(999)\n";
Task task = service.task(script);

// Record any crash reported in the task notifications.
String[] reportedError = {null};
task.listen(event -> {
if (event.responseType == ResponseType.CRASH) {
reportedError[0] = task.error;
}
});
// Launch the task.
task.start();
// Simulate a crash after 500ms has gone by.
Thread.sleep(500);
service.kill();

// Wait for the service to fully shut down after the crash.
int exitCode = service.waitFor();
assertTrue(exitCode != 0);

// Is the tag flagged as crashed?
assertSame(TaskStatus.CRASHED, task.status);

// Was the crash error successfully and consistently recorded?
assertNotNull(reportedError[0]);
List<String> lines = Arrays.asList(task.error.split("\\n"));
String nl = System.lineSeparator();
assertEquals(Arrays.asList("two", "four", "six"), service.invalidLines());
assertEquals(Arrays.asList("one", "three", "five", "seven"), service.errorLines());
String expected =
"Worker crashed with exit code ###." + nl +
nl +
"[stdout]" + nl +
"two" + nl +
"four" + nl +
"six" + nl +
nl +
"[stderr]" + nl +
"one" + nl +
"three" + nl +
"five" + nl +
"seven" + nl;
String generalizedError = task.error.replaceFirst("exit code [0-9]+", "exit code ###");
assertEquals(expected, generalizedError);
}
}

public void executeAndAssert(Service service, String script)
throws IOException, InterruptedException
{
Expand Down

0 comments on commit 177bb42

Please sign in to comment.