Skip to content

Commit 3c405b7

Browse files
committed
On branch edburns/dd-2758695-virtual-threads Add **Shared ScheduledExecutorService** for timeouts
## CopilotSession.java - Added `ScheduledExecutorService` import. - New field `timeoutScheduler`: shared single-thread scheduler, daemon thread named `sendAndWait-timeout`. - Initialized in 3-arg constructor. - `sendAndWait()`: replaced per-call `Executors.newSingleThreadScheduledExecutor()` with `timeoutScheduler.schedule()`. Cleanup calls `timeoutTask.cancel(false)` instead of `scheduler.shutdown()`. - `close()`: added `timeoutScheduler.shutdownNow()` before the blocking `session.destroy` RPC call so stale timeouts cannot fire after close. ## TimeoutEdgeCaseTest.java (new) - `testTimeoutDoesNotFireAfterSessionClose`: proves close() cancels pending timeouts (future not completed by stale TimeoutException). - `testSendAndWaitReusesTimeoutThread`: proves two sendAndWait calls share one scheduler thread instead of spawning two. - Uses reflection to construct a hanging `JsonRpcClient` (blocking InputStream, sink OutputStream). Signed-off-by: Ed Burns <edburns@microsoft.com>
1 parent 80b1392 commit 3c405b7

File tree

2 files changed

+160
-9
lines changed

2 files changed

+160
-9
lines changed

src/main/java/com/github/copilot/sdk/CopilotSession.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.concurrent.CompletableFuture;
1515
import java.util.concurrent.ConcurrentHashMap;
1616
import java.util.concurrent.Executors;
17+
import java.util.concurrent.ScheduledExecutorService;
1718
import java.util.concurrent.TimeUnit;
1819
import java.util.concurrent.TimeoutException;
1920
import java.util.concurrent.atomic.AtomicReference;
@@ -121,6 +122,7 @@ public final class CopilotSession implements AutoCloseable {
121122
private volatile EventErrorHandler eventErrorHandler;
122123
private volatile EventErrorPolicy eventErrorPolicy = EventErrorPolicy.PROPAGATE_AND_LOG_ERRORS;
123124
private volatile Map<String, java.util.function.Function<String, CompletableFuture<String>>> transformCallbacks;
125+
private final ScheduledExecutorService timeoutScheduler;
124126

125127
/** Tracks whether this session instance has been terminated via close(). */
126128
private volatile boolean isTerminated = false;
@@ -157,6 +159,11 @@ public final class CopilotSession implements AutoCloseable {
157159
this.sessionId = sessionId;
158160
this.rpc = rpc;
159161
this.workspacePath = workspacePath;
162+
this.timeoutScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
163+
var t = new Thread(r, "sendAndWait-timeout");
164+
t.setDaemon(true);
165+
return t;
166+
});
160167
}
161168

162169
/**
@@ -407,17 +414,11 @@ public CompletableFuture<AssistantMessageEvent> sendAndWait(MessageOptions optio
407414
return null;
408415
});
409416

410-
// Set up timeout with daemon thread so it doesn't prevent JVM exit
411-
var scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
412-
var t = new Thread(r, "sendAndWait-timeout");
413-
t.setDaemon(true);
414-
return t;
415-
});
416-
scheduler.schedule(() -> {
417+
// Schedule timeout on the shared session-level scheduler
418+
var timeoutTask = timeoutScheduler.schedule(() -> {
417419
if (!future.isDone()) {
418420
future.completeExceptionally(new TimeoutException("sendAndWait timed out after " + timeoutMs + "ms"));
419421
}
420-
scheduler.shutdown();
421422
}, timeoutMs, TimeUnit.MILLISECONDS);
422423

423424
var result = new CompletableFuture<AssistantMessageEvent>();
@@ -429,7 +430,7 @@ public CompletableFuture<AssistantMessageEvent> sendAndWait(MessageOptions optio
429430
} catch (IOException e) {
430431
LOG.log(Level.SEVERE, "Error closing subscription", e);
431432
}
432-
scheduler.shutdown();
433+
timeoutTask.cancel(false);
433434
if (!result.isDone()) {
434435
if (ex != null) {
435436
result.completeExceptionally(ex);
@@ -1303,6 +1304,8 @@ public void close() {
13031304
isTerminated = true;
13041305
}
13051306

1307+
timeoutScheduler.shutdownNow();
1308+
13061309
try {
13071310
rpc.invoke("session.destroy", Map.of("sessionId", sessionId), Void.class).get(5, TimeUnit.SECONDS);
13081311
} catch (Exception e) {
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*---------------------------------------------------------------------------------------------
2+
* Copyright (c) Microsoft Corporation. All rights reserved.
3+
*--------------------------------------------------------------------------------------------*/
4+
5+
package com.github.copilot.sdk;
6+
7+
import static org.junit.jupiter.api.Assertions.assertFalse;
8+
import static org.junit.jupiter.api.Assertions.assertTrue;
9+
10+
import java.io.ByteArrayOutputStream;
11+
import java.io.IOException;
12+
import java.io.InputStream;
13+
import java.net.Socket;
14+
import java.util.concurrent.CompletableFuture;
15+
16+
import org.junit.jupiter.api.Test;
17+
18+
import com.github.copilot.sdk.events.AssistantMessageEvent;
19+
import com.github.copilot.sdk.json.MessageOptions;
20+
21+
/**
22+
* Tests for timeout edge cases in {@link CopilotSession#sendAndWait}.
23+
* <p>
24+
* These tests prove two defects in the current per-call
25+
* {@code ScheduledExecutorService} approach:
26+
* <ol>
27+
* <li>A timeout fires after {@code close()}, leaking a {@code TimeoutException}
28+
* onto the returned future.</li>
29+
* <li>Each {@code sendAndWait} call spawns a new OS thread (~1 MB stack),
30+
* instead of reusing a shared scheduler thread.</li>
31+
* </ol>
32+
*/
33+
public class TimeoutEdgeCaseTest {
34+
35+
/**
36+
* Creates a {@link JsonRpcClient} whose {@code invoke()} returns futures that
37+
* never complete. The reader thread blocks forever on the input stream, and
38+
* writes go to a no-op output stream.
39+
*/
40+
private JsonRpcClient createHangingRpcClient() throws Exception {
41+
InputStream blockingInput = new InputStream() {
42+
@Override
43+
public int read() throws IOException {
44+
try {
45+
Thread.sleep(Long.MAX_VALUE);
46+
} catch (InterruptedException e) {
47+
Thread.currentThread().interrupt();
48+
return -1;
49+
}
50+
return -1;
51+
}
52+
};
53+
ByteArrayOutputStream sinkOutput = new ByteArrayOutputStream();
54+
55+
var ctor = JsonRpcClient.class.getDeclaredConstructor(
56+
InputStream.class, java.io.OutputStream.class, Socket.class, Process.class);
57+
ctor.setAccessible(true);
58+
return (JsonRpcClient) ctor.newInstance(blockingInput, sinkOutput, null, null);
59+
}
60+
61+
/**
62+
* After {@code close()}, the future returned by {@code sendAndWait} must NOT be
63+
* completed by a stale timeout.
64+
* <p>
65+
* Current buggy behavior: the per-call scheduler is not cancelled by
66+
* {@code close()}, so its 2-second timeout fires during the 5-second
67+
* {@code session.destroy} RPC wait, completing the future with
68+
* {@code TimeoutException}.
69+
* <p>
70+
* Expected behavior after fix: {@code close()} cancels pending timeouts before
71+
* the blocking RPC call, so the future remains incomplete.
72+
*/
73+
@Test
74+
void testTimeoutDoesNotFireAfterSessionClose() throws Exception {
75+
JsonRpcClient rpc = createHangingRpcClient();
76+
try {
77+
CopilotSession session = new CopilotSession("test-timeout-id", rpc);
78+
79+
CompletableFuture<AssistantMessageEvent> result = session.sendAndWait(
80+
new MessageOptions().setPrompt("hello"), 2000);
81+
82+
assertFalse(result.isDone(), "Future should be pending before timeout fires");
83+
84+
// close() blocks up to 5s on session.destroy RPC. The 2s timeout
85+
// fires during that window with the current per-call scheduler.
86+
session.close();
87+
88+
assertFalse(result.isDone(),
89+
"Future should not be completed by a timeout after session is closed. "
90+
+ "The per-call ScheduledExecutorService leaked a TimeoutException.");
91+
} finally {
92+
rpc.close();
93+
}
94+
}
95+
96+
/**
97+
* A shared scheduler should reuse a single thread across multiple
98+
* {@code sendAndWait} calls, rather than spawning a new OS thread per call.
99+
* <p>
100+
* Current buggy behavior: two calls create two {@code sendAndWait-timeout}
101+
* threads.
102+
* <p>
103+
* Expected behavior after fix: two calls still use only one scheduler thread.
104+
*/
105+
@Test
106+
void testSendAndWaitReusesTimeoutThread() throws Exception {
107+
JsonRpcClient rpc = createHangingRpcClient();
108+
try {
109+
CopilotSession session = new CopilotSession("test-thread-count-id", rpc);
110+
111+
long baselineCount = countTimeoutThreads();
112+
113+
CompletableFuture<AssistantMessageEvent> result1 = session.sendAndWait(
114+
new MessageOptions().setPrompt("hello1"), 30000);
115+
116+
Thread.sleep(100);
117+
long afterFirst = countTimeoutThreads();
118+
assertTrue(afterFirst >= baselineCount + 1,
119+
"Expected at least one new sendAndWait-timeout thread after first call. "
120+
+ "Baseline: " + baselineCount + ", after: " + afterFirst);
121+
122+
CompletableFuture<AssistantMessageEvent> result2 = session.sendAndWait(
123+
new MessageOptions().setPrompt("hello2"), 30000);
124+
125+
Thread.sleep(100);
126+
long afterSecond = countTimeoutThreads();
127+
assertTrue(afterSecond == afterFirst,
128+
"Shared scheduler should reuse the same thread — no new threads after second call. "
129+
+ "After first: " + afterFirst + ", after second: " + afterSecond);
130+
131+
result1.cancel(true);
132+
result2.cancel(true);
133+
session.close();
134+
} finally {
135+
rpc.close();
136+
}
137+
}
138+
139+
/**
140+
* Counts the number of live threads whose name contains "sendAndWait-timeout".
141+
*/
142+
private long countTimeoutThreads() {
143+
return Thread.getAllStackTraces().keySet().stream()
144+
.filter(t -> t.getName().contains("sendAndWait-timeout"))
145+
.filter(Thread::isAlive)
146+
.count();
147+
}
148+
}

0 commit comments

Comments
 (0)