From 8f98ce82380f8a8bdcb67fd1104ef4d704342ea2 Mon Sep 17 00:00:00 2001 From: Shane Neuville Date: Mon, 6 Apr 2026 08:27:32 -0500 Subject: [PATCH 01/12] fix: abort stuck SDK tool state on force-complete; add reflect loop worker timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two bugs observed when worker-2 got stuck on a bash tool call (wc -l that hung) during a Reflect loop run, causing the orchestrator to be idle for 33+ minutes until the server's session idle-timeout killed it: **Bug 1 — ForceCompleteProcessingAsync doesn't clear SDK pending tool state** When the pre-dispatch 150s timeout fires on a worker with an active tool call, ForceCompleteProcessingAsync resets PolyPilot's tracking state but never calls AbortAsync on the CLI session. The CLI is still blocked waiting for tool results that will never arrive. The next SendAsync succeeds at the transport level but the CLI silently drops it — the message is queued behind the stuck tool and never processed. Fix: capture hadActiveTool before clearing ActiveToolCallCount, then call session.AbortAsync() after the UI-thread cleanup when true. This is the same approach the RESUME-QUIESCE path in CopilotService.Persistence.cs uses. AbortAsync failure is non-fatal and logged. **Bug 2 — Reflect loop has no collection timeout on worker wait** SendViaOrchestratorAsync (non-reflect) wraps Task.WhenAll(workerTasks) with OrchestratorCollectionTimeout (15 min) and force-completes stuck workers. SendViaOrchestratorReflectAsync had no equivalent — a single stuck worker blocked the entire loop indefinitely, starving the orchestrator's CLI session of activity until the server killed it (~35 min idle timeout), leaving the reflect loop hanging with no recovery for hours. Fix: apply the same bounded-wait + force-complete pattern to the reflect loop's worker dispatch. Mirrors the non-reflect path exactly. 
Co-Authored-By: Claude Sonnet 4.6 --- .../Services/CopilotService.Organization.cs | 77 ++++++++++++++++++- 1 file changed, 75 insertions(+), 2 deletions(-) diff --git a/PolyPilot/Services/CopilotService.Organization.cs b/PolyPilot/Services/CopilotService.Organization.cs index 6c7991204..c32ad0ff6 100644 --- a/PolyPilot/Services/CopilotService.Organization.cs +++ b/PolyPilot/Services/CopilotService.Organization.cs @@ -2195,6 +2195,14 @@ private async Task ForceCompleteProcessingAsync(string sessionName, SessionState CancelTurnEndFallback(state); CancelToolHealthCheck(state); + // Capture whether a tool call is in-flight BEFORE we reset ActiveToolCallCount below. + // If true we must abort the SDK session after the UI-thread cleanup so the CLI clears + // its pending tool expectations. Without this, the next SendAsync is silently dropped — + // the SDK queues the message but the CLI is blocked waiting for a tool result that will + // never arrive (e.g., a bash tool whose process was killed externally). + // This mirrors the RESUME-QUIESCE abort in CopilotService.Persistence.cs. + var hadActiveTool = Volatile.Read(ref state.ActiveToolCallCount) > 0; + var tcs = new TaskCompletionSource(); InvokeOnUI(() => { @@ -2243,6 +2251,28 @@ private async Task ForceCompleteProcessingAsync(string sessionName, SessionState catch (Exception ex) { tcs.TrySetException(ex); } }); try { await tcs.Task; } catch { } + + // If a tool call was in-flight when we force-completed, abort the SDK session to clear + // the CLI's pending tool expectations. Without this, the next SendAsync succeeds at the + // transport level but the CLI never processes it — it's stuck waiting for tool results + // that will never arrive. Non-fatal: if AbortAsync fails the session will still accept + // new messages on the next reconnect (lazy-resume clears SDK state independently). 
+ if (hadActiveTool) + { + var session = state.Session; + if (session != null) + { + try + { + await session.AbortAsync(CancellationToken.None); + Debug($"[DISPATCH] ForceCompleteProcessing '{sessionName}': abort sent to clear pending tool state"); + } + catch (Exception abortEx) + { + Debug($"[DISPATCH] ForceCompleteProcessing '{sessionName}': abort failed (non-fatal): {abortEx.Message}"); + } + } + } } /// @@ -3837,8 +3867,51 @@ private async Task SendViaOrchestratorReflectAsync(string groupId, List InvokeOnUI(() => OnOrchestratorPhaseChanged?.Invoke(groupId, OrchestratorPhase.WaitingForWorkers, iterDetail)); - var workerTasks = assignments.Select(a => ExecuteWorkerAsync(a.WorkerName, a.Task, prompt, ct)); - var results = await Task.WhenAll(workerTasks); + var workerTasks = assignments.Select(a => ExecuteWorkerAsync(a.WorkerName, a.Task, prompt, ct)).ToList(); + + // Bounded wait — mirrors the non-reflect path (SendViaOrchestratorAsync). + // Without a timeout a stuck worker (e.g., a bash tool whose process hangs) + // blocks the reflect loop indefinitely, starves the orchestrator's CLI session + // of activity until the server's ~35-min idle timer kills it, and leaves + // everything stuck with no recovery path for hours. + var allDone = Task.WhenAll(workerTasks); + // Use CancellationToken.None for the delay — if the caller's token is cancelled, + // Task.WhenAny returns the cancelled allDone and the OCE propagates cleanly. 
+ var timeoutTask = Task.Delay(OrchestratorCollectionTimeout, CancellationToken.None); + WorkerResult[] results; + if (await Task.WhenAny(allDone, timeoutTask) != allDone) + { + Debug($"[DISPATCH] Reflect iteration {reflectState.CurrentIteration}: collection timeout ({OrchestratorCollectionTimeout.TotalMinutes}m) — force-completing stuck workers"); + foreach (var a in assignments) + { + if (_sessions.TryGetValue(a.WorkerName, out var ws)) + { + if (ws.Info.IsProcessing) + { + Debug($"[DISPATCH] Reflect: force-completing stuck worker '{a.WorkerName}'"); + AddOrchestratorSystemMessage(a.WorkerName, + "⚠️ Worker timed out — orchestrator is proceeding with partial results."); + await ForceCompleteProcessingAsync(a.WorkerName, ws, $"reflect collection timeout ({OrchestratorCollectionTimeout.TotalMinutes}m)"); + } + else if (ws.ResponseCompletion?.Task.IsCompleted == false) + { + Debug($"[DISPATCH] Reflect: resolving TCS for non-processing worker '{a.WorkerName}'"); + ws.ResponseCompletion?.TrySetResult("(worker timed out — never started processing)"); + } + } + } + var partialResults = new List(); + foreach (var t in workerTasks) + { + try { partialResults.Add(await t); } + catch (Exception ex) { partialResults.Add(new WorkerResult("unknown", null, false, $"Error: {ex.Message}", TimeSpan.Zero)); } + } + results = partialResults.ToArray(); + } + else + { + results = await allDone; + } // Track both attempted and successful workers across all iterations foreach (var a in assignments) From dbbedfb8ca3947adf085442ae7a32da812050305 Mon Sep 17 00:00:00 2001 From: Shane Neuville Date: Mon, 6 Apr 2026 09:46:31 -0500 Subject: [PATCH 02/12] fix: drop superseded sessions on group move instead of creating (previous) duplicates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When LoadOrganization's scattered team reconstruction moves sessions to a new multi-agent group, the sessions get recreated with new SessionIds. 
On the next save+restore, MergeSessionEntries saw a name collision between the old persisted entry (old GroupId) and the new active entry (new GroupId) and renamed the old one to '(previous)' — creating confusing phantom workers. Fix: when a name collision occurs and the persisted entry has a DIFFERENT GroupId than the active entry, silently drop the persisted entry. The session was moved between groups and its history was already migrated via CopyEventsToNewSession. The '(previous)' rename is preserved for same-group collisions (reconnect, lazy-resume) where the old entry may have unique history. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- PolyPilot.Tests/SessionPersistenceTests.cs | 70 +++++++++++++++++++ .../Services/CopilotService.Persistence.cs | 15 ++++ 2 files changed, 85 insertions(+) diff --git a/PolyPilot.Tests/SessionPersistenceTests.cs b/PolyPilot.Tests/SessionPersistenceTests.cs index 991942ed8..33eb5280c 100644 --- a/PolyPilot.Tests/SessionPersistenceTests.cs +++ b/PolyPilot.Tests/SessionPersistenceTests.cs @@ -1671,4 +1671,74 @@ public void Merge_NameCollision_MissingDirStillExcluded() Assert.Single(result); Assert.Equal("new-id", result[0].SessionId); } + + [Fact] + public void Merge_NameCollision_DifferentGroupId_DropsPersistedEntry() + { + // When a session is moved to a new group during scattered team reconstruction + // and then recreated with a new SessionId, the persisted entry from the old group + // should be silently dropped — not renamed to "(previous)". 
+ var active = new List + { + new() { SessionId = "new-id", DisplayName = "Copilot Cli-worker-1", Model = "m", + WorkingDirectory = "/w", GroupId = "new-group-id" } + }; + var persisted = new List + { + new() { SessionId = "old-id", DisplayName = "Copilot Cli-worker-1", Model = "m", + WorkingDirectory = "/w", GroupId = "old-group-id" } + }; + + var result = CopilotService.MergeSessionEntries(active, persisted, new HashSet(), new HashSet(), _ => true); + + // Only the active entry should remain — no "(previous)" duplicate + Assert.Single(result); + Assert.Equal("new-id", result[0].SessionId); + Assert.Equal("Copilot Cli-worker-1", result[0].DisplayName); + } + + [Fact] + public void Merge_NameCollision_SameGroupId_StillCreatesPrevious() + { + // When the collision happens within the same group (e.g., reconnect replaced + // the session), the old entry should still be preserved as "(previous)". + var active = new List + { + new() { SessionId = "new-id", DisplayName = "MyWorker", Model = "m", + WorkingDirectory = "/w", GroupId = "same-group" } + }; + var persisted = new List + { + new() { SessionId = "old-id", DisplayName = "MyWorker", Model = "m", + WorkingDirectory = "/w", GroupId = "same-group" } + }; + + var result = CopilotService.MergeSessionEntries(active, persisted, new HashSet(), new HashSet(), _ => true); + + Assert.Equal(2, result.Count); + Assert.Equal("MyWorker", result[0].DisplayName); + Assert.Equal("MyWorker (previous)", result[1].DisplayName); + } + + [Fact] + public void Merge_NameCollision_NullGroupIds_StillCreatesPrevious() + { + // When GroupId is null on either side (legacy entries), fall back to + // the existing "(previous)" behavior — don't silently drop. 
+ var active = new List + { + new() { SessionId = "new-id", DisplayName = "MyWorker", Model = "m", + WorkingDirectory = "/w", GroupId = null } + }; + var persisted = new List + { + new() { SessionId = "old-id", DisplayName = "MyWorker", Model = "m", + WorkingDirectory = "/w", GroupId = null } + }; + + var result = CopilotService.MergeSessionEntries(active, persisted, new HashSet(), new HashSet(), _ => true); + + Assert.Equal(2, result.Count); + Assert.Equal("MyWorker (previous)", result[1].DisplayName); + } } diff --git a/PolyPilot/Services/CopilotService.Persistence.cs b/PolyPilot/Services/CopilotService.Persistence.cs index 5eff2f5f0..c320c1a66 100644 --- a/PolyPilot/Services/CopilotService.Persistence.cs +++ b/PolyPilot/Services/CopilotService.Persistence.cs @@ -190,6 +190,21 @@ internal static List MergeSessionEntries( var entryToAdd = existing; if (activeNames.Contains(existing.DisplayName)) { + // If the active session moved to a different group (e.g., scattered team + // reconstruction created a new multi-agent group and the session was recreated + // there), the persisted entry is the obsolete predecessor — its history was + // already recovered via CopyEventsToNewSession. Drop it silently instead of + // creating a confusing "(previous)" duplicate. 
+ var activeCounterpart = active.FirstOrDefault(a => + string.Equals(a.DisplayName, existing.DisplayName, StringComparison.OrdinalIgnoreCase)); + if (activeCounterpart != null + && !string.IsNullOrEmpty(existing.GroupId) + && !string.IsNullOrEmpty(activeCounterpart.GroupId) + && !string.Equals(existing.GroupId, activeCounterpart.GroupId, StringComparison.OrdinalIgnoreCase)) + { + continue; // Superseded by group move — history already migrated + } + entryToAdd = new ActiveSessionEntry { SessionId = existing.SessionId, From 07e6a6f1f8f495161bd15b97ab05bdcd0427b3f4 Mon Sep 17 00:00:00 2001 From: Shane Neuville Date: Mon, 6 Apr 2026 09:55:29 -0500 Subject: [PATCH 03/12] fix: apply zombie timeout to stale shells in IDLE-DEFER, not just agents Stale/orphaned shells reported by the CLI in session.idle backgroundTasks were never expired, causing sessions to stay stuck in IDLE-DEFER until the user manually hit Stop. This happened when the CLI reported shells as 'active' that were actually dead or detached. Apply the same SubagentZombieTimeoutMinutes (20 min) expiry to shells that already applies to agents. After the threshold, both are treated as zombies and the session is allowed to complete. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- PolyPilot.Tests/ZombieSubagentExpiryTests.cs | 17 ++++++----- PolyPilot/Services/CopilotService.Events.cs | 31 ++++++++++++-------- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/PolyPilot.Tests/ZombieSubagentExpiryTests.cs b/PolyPilot.Tests/ZombieSubagentExpiryTests.cs index bbe065671..b82896f9d 100644 --- a/PolyPilot.Tests/ZombieSubagentExpiryTests.cs +++ b/PolyPilot.Tests/ZombieSubagentExpiryTests.cs @@ -122,13 +122,13 @@ public void ZombieThresholdExactlyMet_ReturnsFalse() idle, TicksAgo(CopilotService.SubagentZombieTimeoutMinutes))); } - // --- Shells are never expired --- + // --- Shells are expired after zombie timeout (same as agents) --- [Fact] - public void ZombieThresholdExceeded_WithShells_ReturnsTrue() + public void ZombieThresholdExceeded_WithShells_ReturnsFalse() { - // Even if all background agents are expired, an active shell keeps IDLE-DEFER alive. - // Shells are managed at the OS level — PolyPilot never force-expires them. + // When both agents and shells are present but the zombie threshold is exceeded, + // everything is treated as expired — stale shells should not block completion. var idle = new SessionIdleEvent { Data = new SessionIdleData @@ -152,15 +152,16 @@ public void ZombieThresholdExceeded_WithShells_ReturnsTrue() } } }; - Assert.True(CopilotService.HasActiveBackgroundTasks(idle, TicksAgo(30))); + Assert.False(CopilotService.HasActiveBackgroundTasks(idle, TicksAgo(30))); } [Fact] - public void ZombieThresholdExceeded_ShellsOnly_ReturnsTrue() + public void ZombieThresholdExceeded_ShellsOnly_ReturnsFalse() { - // Shells alone always block completion — they are never zombie-expired. + // Shells alone should also be expired after the zombie timeout — + // stale/orphaned shells reported by the CLI should not block completion forever. 
var idle = MakeIdleWithShells(); - Assert.True(CopilotService.HasActiveBackgroundTasks(idle, TicksAgo(60))); + Assert.False(CopilotService.HasActiveBackgroundTasks(idle, TicksAgo(60))); } [Fact] diff --git a/PolyPilot/Services/CopilotService.Events.cs b/PolyPilot/Services/CopilotService.Events.cs index a3f232e52..e98314294 100644 --- a/PolyPilot/Services/CopilotService.Events.cs +++ b/PolyPilot/Services/CopilotService.Events.cs @@ -704,11 +704,14 @@ void Invoke(Action action) var hasActiveTasks = HasActiveBackgroundTasks(idle, deferTicks); // Log zombie expiry here where Debug() is available (HasActiveBackgroundTasks is static) - if (!hasActiveTasks && deferTicks != 0 && (idle.Data?.BackgroundTasks?.Agents?.Length ?? 0) > 0) + var zombieBt = idle.Data?.BackgroundTasks; + var zombieAgentCount = zombieBt?.Agents?.Length ?? 0; + var zombieShellCount = zombieBt?.Shells?.Length ?? 0; + if (!hasActiveTasks && deferTicks != 0 && (zombieAgentCount > 0 || zombieShellCount > 0)) { var expiredMinutes = TimeSpan.FromTicks(DateTime.UtcNow.Ticks - deferTicks).TotalMinutes; - Debug($"[IDLE-DEFER-ZOMBIE] '{sessionName}' {idle.Data!.BackgroundTasks!.Agents!.Length} " + - $"background agent(s) expired after {expiredMinutes:F0}min " + + Debug($"[IDLE-DEFER-ZOMBIE] '{sessionName}' {zombieAgentCount} agent(s) + {zombieShellCount} shell(s) " + + $"expired after {expiredMinutes:F0}min " + $"(threshold={SubagentZombieTimeoutMinutes}min) — allowing session to complete"); } @@ -2230,7 +2233,8 @@ internal static bool IsMcpError(string? text) /// before all background agents are treated as zombies and the session is allowed to complete. /// The Copilot CLI has no per-agent timeout, so a crashed or orphaned subagent blocks /// IDLE-DEFER indefinitely. After this threshold PolyPilot expires the stale block. - /// Shells are never expired — they are managed at the OS level. 
+ /// The same timeout applies to shells — stale/detached shells that the CLI never + /// reports as completed should not block the session indefinitely. /// internal const int SubagentZombieTimeoutMinutes = 20; @@ -2239,12 +2243,11 @@ internal static bool IsMcpError(string? text) /// When background tasks are active, session.idle means "foreground quiesced, background /// still running" — NOT true completion. /// - /// When is non-zero, background agents are treated - /// as zombies if the session has been in IDLE-DEFER longer than - /// . This allows the session to complete even if - /// the CLI never fires SubagentCompleted for a crashed or orphaned subagent. - /// The caller is responsible for logging the zombie expiry via Debug(). - /// Shells are never expired — their lifecycle is managed by the OS. + /// When is non-zero, background tasks (both + /// agents AND shells) are treated as zombies if the session has been in IDLE-DEFER longer + /// than . This allows the session to complete + /// even if the CLI never fires SubagentCompleted/ShellCompleted for crashed or orphaned + /// background tasks. The caller is responsible for logging the zombie expiry via Debug(). 
/// internal static bool HasActiveBackgroundTasks( SessionIdleEvent idle, @@ -2254,15 +2257,19 @@ internal static bool HasActiveBackgroundTasks( if (bt == null) return false; bool hasAgents = bt.Agents is { Length: > 0 }; + bool hasShells = bt.Shells is { Length: > 0 }; - if (hasAgents && idleDeferStartedAtTicks != 0) + if ((hasAgents || hasShells) && idleDeferStartedAtTicks != 0) { var elapsed = TimeSpan.FromTicks(DateTime.UtcNow.Ticks - idleDeferStartedAtTicks); if (elapsed.TotalMinutes >= SubagentZombieTimeoutMinutes) + { hasAgents = false; + hasShells = false; + } } - return hasAgents || (bt.Shells is { Length: > 0 }); + return hasAgents || hasShells; } private void StartProcessingWatchdog(SessionState state, string sessionName) From 67e69a8eaf9179e6c37ae0c985d01eadc412fbfa Mon Sep 17 00:00:00 2001 From: Shane Neuville Date: Mon, 6 Apr 2026 10:02:20 -0500 Subject: [PATCH 04/12] fix: add dedup guard in CompleteResponse to prevent duplicate messages FlushCurrentResponse (called on TurnEnd and IDLE-DEFER) already has a dedup guard, but CompleteResponse did not. When IDLE-DEFER flushes text to History and then the session receives the same content again (SDK replay after reconnect, IDLE-DEFER re-arm), CompleteResponse would add an identical second message. Now it checks against the last assistant message before adding, matching the same pattern used in FlushCurrentResponse. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- PolyPilot/Services/CopilotService.Events.cs | 37 ++++++++++++++------- 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/PolyPilot/Services/CopilotService.Events.cs b/PolyPilot/Services/CopilotService.Events.cs index e98314294..73c221181 100644 --- a/PolyPilot/Services/CopilotService.Events.cs +++ b/PolyPilot/Services/CopilotService.Events.cs @@ -1344,19 +1344,32 @@ private void CompleteResponse(SessionState state, long? 
expectedGeneration = nul var response = state.CurrentResponse.ToString(); if (!string.IsNullOrWhiteSpace(response)) { - var msg = new ChatMessage("assistant", response, DateTime.Now) { Model = state.Info.Model }; - state.Info.History.Add(msg); - state.Info.MessageCount = state.Info.History.Count; - // If user is viewing this session, keep it read - if (state.Info.Name == _activeSessionName) - state.Info.LastReadMessageCount = state.Info.History.Count; + // Dedup guard: FlushCurrentResponse (called on TurnEnd and IDLE-DEFER) may have + // already added this exact text to History. If the SDK replayed deltas after a + // flush (e.g., IDLE-DEFER re-arm, reconnect replay), CurrentResponse can accumulate + // the same content again. Without this check, the same message appears twice. + var lastAssistant = state.Info.History.LastOrDefault(m => + m.Role == "assistant" && m.MessageType != ChatMessageType.ToolCall); + if (lastAssistant?.Content != response) + { + var msg = new ChatMessage("assistant", response, DateTime.Now) { Model = state.Info.Model }; + state.Info.History.Add(msg); + state.Info.MessageCount = state.Info.History.Count; + // If user is viewing this session, keep it read + if (state.Info.Name == _activeSessionName) + state.Info.LastReadMessageCount = state.Info.History.Count; - // Write-through to DB - if (!string.IsNullOrEmpty(state.Info.SessionId)) - SafeFireAndForget(_chatDb.AddMessageAsync(state.Info.SessionId, msg), "AddMessageAsync"); - - // Track code suggestions from final response segment - _usageStats?.TrackCodeSuggestion(response); + // Write-through to DB + if (!string.IsNullOrEmpty(state.Info.SessionId)) + SafeFireAndForget(_chatDb.AddMessageAsync(state.Info.SessionId, msg), "AddMessageAsync"); + + // Track code suggestions from final response segment + _usageStats?.TrackCodeSuggestion(response); + } + else + { + Debug($"[DEDUP] CompleteResponse skipped duplicate content ({response.Length} chars) for '{state.Info.Name}'"); + } } // Build full 
turn response for TCS: include text flushed mid-turn (e.g., on TurnEnd) // plus any remaining text in CurrentResponse. Without this, orchestrator dispatch From 1aaca135a20f876ad76e1140fd3d64b67b2b2d6d Mon Sep 17 00:00:00 2001 From: Shane Neuville Date: Mon, 6 Apr 2026 10:24:16 -0500 Subject: [PATCH 05/12] fix: stop resetting HasReceivedDeltasThisTurn in FlushCurrentResponse MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit FlushCurrentResponse was resetting HasReceivedDeltasThisTurn=false after each mid-turn flush. This allowed the next AssistantMessageEvent (which contains the full turn content) to slip through the delta guard and re-append content already flushed to History — producing visible duplicate messages in the chat. The flag should only be reset at turn boundaries (AssistantTurnStartEvent), not on every flush. Removing the mid-turn reset prevents the full-message event from re-adding already-flushed content. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- PolyPilot/Services/CopilotService.Events.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/PolyPilot/Services/CopilotService.Events.cs b/PolyPilot/Services/CopilotService.Events.cs index 73c221181..f1c4b83a2 100644 --- a/PolyPilot/Services/CopilotService.Events.cs +++ b/PolyPilot/Services/CopilotService.Events.cs @@ -1229,7 +1229,6 @@ private void FlushCurrentResponse(SessionState state) { Debug($"[DEDUP] FlushCurrentResponse skipped duplicate content ({text.Length} chars) for session '{state.Info.Name}'"); state.CurrentResponse.Clear(); - state.HasReceivedDeltasThisTurn = false; return; } @@ -1251,7 +1250,10 @@ private void FlushCurrentResponse(SessionState state) state.FlushedResponse.Append(text); state.CurrentResponse.Clear(); - state.HasReceivedDeltasThisTurn = false; + // NOTE: Do NOT reset HasReceivedDeltasThisTurn here — that flag gates whether + // AssistantMessageEvent (full content) is accepted. 
Resetting it mid-turn causes + // the next full-message event to re-add content already flushed to History. + // HasReceivedDeltasThisTurn is only reset at turn boundaries (AssistantTurnStartEvent). // Early dispatch: if the orchestrator wrote @worker blocks in an intermediate sub-turn, // resolve the TCS now so ParseTaskAssignments can run immediately. Without this, the From c90f091e3b490d0c7b55afe09f2b3d26baa66e42 Mon Sep 17 00:00:00 2001 From: Shane Neuville Date: Mon, 6 Apr 2026 10:55:49 -0500 Subject: [PATCH 06/12] fix: suppress stale streaming duplicates after flush Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- PolyPilot.Tests/ChatExperienceSafetyTests.cs | 16 +++++++++++++++ PolyPilot/Components/ChatMessageList.razor | 21 +++++++++++++++++++- PolyPilot/Services/CopilotService.Events.cs | 1 + 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/PolyPilot.Tests/ChatExperienceSafetyTests.cs b/PolyPilot.Tests/ChatExperienceSafetyTests.cs index b4f84fab0..e8cd2bcb0 100644 --- a/PolyPilot.Tests/ChatExperienceSafetyTests.cs +++ b/PolyPilot.Tests/ChatExperienceSafetyTests.cs @@ -846,6 +846,22 @@ public void CompleteResponse_Source_ClearsSendingFlag() Assert.Contains("SendingFlag", afterCR); } + /// + /// The UI must suppress the live streaming bubble once that exact assistant text has + /// already been flushed into History. Otherwise IDLE-DEFER sessions render the same + /// answer twice until the next prompt clears the streaming cache. 
+ /// + [Fact] + public void ChatMessageList_Source_SuppressesStreamingDuplicateAfterFlush() + { + var source = File.ReadAllText( + Path.Combine(GetRepoRoot(), "PolyPilot", "Components", "ChatMessageList.razor")); + + Assert.Contains("private bool ShouldShowStreamingContent()", source); + Assert.Contains("NormalizeStreamingText(lastAssistant?.Content)", source); + Assert.Contains("NormalizeStreamingText(StreamingContent)", source); + } + /// /// The "Session not found" reconnect path must include McpServers and SkillDirectories /// in the fresh session config (PR #330 regression guard). diff --git a/PolyPilot/Components/ChatMessageList.razor b/PolyPilot/Components/ChatMessageList.razor index c63c1a8ad..ec4581d92 100644 --- a/PolyPilot/Components/ChatMessageList.razor +++ b/PolyPilot/Components/ChatMessageList.razor @@ -68,7 +68,7 @@ } @* Streaming content — show while actively processing, or while remote streaming guard is active *@ - @if (!string.IsNullOrEmpty(StreamingContent) && (IsProcessing || IsRemoteStreaming)) + @if (ShouldShowStreamingContent()) {
@if (!Compact) @@ -193,6 +193,25 @@ return ToolActivities.Where(a => !historyCallIds.Contains(a.CallId)); } + private bool ShouldShowStreamingContent() + { + if (string.IsNullOrWhiteSpace(StreamingContent) || (!IsProcessing && !IsRemoteStreaming)) + return false; + + // TurnEnd / IDLE-DEFER can flush the current assistant text into History while the + // session stays "processing" for background tasks. Suppress the live bubble once the + // same text is already present in History to avoid showing it twice. + var lastAssistant = Messages.LastOrDefault(m => + m.Role == "assistant" && + m.MessageType != ChatMessageType.ToolCall && + m.MessageType != ChatMessageType.Reasoning && + m.MessageType != ChatMessageType.Image); + + return !string.Equals(NormalizeStreamingText(lastAssistant?.Content), NormalizeStreamingText(StreamingContent), StringComparison.Ordinal); + } + + private static string NormalizeStreamingText(string? text) => string.IsNullOrWhiteSpace(text) ? "" : text.Trim(); + private static readonly Dictionary _imageCache = new(); // LRU markdown cache: uses content string as key (no hash collisions), with eviction at 1000 entries private static readonly Dictionary _markdownCache = new(); diff --git a/PolyPilot/Services/CopilotService.Events.cs b/PolyPilot/Services/CopilotService.Events.cs index f1c4b83a2..bd0ff50de 100644 --- a/PolyPilot/Services/CopilotService.Events.cs +++ b/PolyPilot/Services/CopilotService.Events.cs @@ -1333,6 +1333,7 @@ private void CompleteResponse(SessionState state, long? 
expectedGeneration = nul CancelTurnEndFallback(state); CancelToolHealthCheck(state); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); + Interlocked.Exchange(ref state.SendingFlag, 0); state.HasUsedToolsThisTurn = false; state.HasDeferredIdle = false; Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, 0L); From dc097f0d176244facc9a56aa23e76ae97863cd3c Mon Sep 17 00:00:00 2001 From: Shane Neuville Date: Mon, 6 Apr 2026 11:14:50 -0500 Subject: [PATCH 07/12] =?UTF-8?q?fix:=20strengthen=20worker=20validation?= =?UTF-8?q?=20prompts=20=E2=80=94=20require=20test=20authorship=20and=20ru?= =?UTF-8?q?ntime=20evidence?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implementer Step 3: explicitly require writing/updating tests for new behavior (not just running existing ones), and verify observable runtime output directly rather than trusting tests alone. Challenger Step 4: remove "when possible" hedge on runtime validation, add explicit prohibition on approving UI features based on unit tests alone, and require workers to state when they cannot verify something at runtime rather than silently omitting the gap. Co-Authored-By: Claude Sonnet 4.6 --- PolyPilot/Models/ModelCapabilities.cs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/PolyPilot/Models/ModelCapabilities.cs b/PolyPilot/Models/ModelCapabilities.cs index c483116cf..5dcc66474 100644 --- a/PolyPilot/Models/ModelCapabilities.cs +++ b/PolyPilot/Models/ModelCapabilities.cs @@ -363,7 +363,9 @@ public record GroupPreset(string Name, string Description, string Emoji, MultiAg - Commit your changes with descriptive messages as you complete sections. ## Step 3: Validate everything -- Run the build and tests to verify correctness. +- The build must pass and existing tests must not regress. +- Write or update tests that directly exercise the new behavior — do not rely solely on pre-existing tests that may not cover what you just added. 
+- If the change has observable runtime output (UI rendering, CLI output, API responses, etc.), verify that output directly — do not assume passing tests are sufficient evidence that the behavior is correct. - If the task involves a runnable app (MAUI, web, console, etc.), launch it and verify it works at runtime when a runtime environment is available. Building alone is NOT sufficient — many bugs (DI failures, runtime crashes, locale issues, missing UI) only surface when you actually run the app. - If the prompt specifies validation steps (e.g., "validate with MauiDevFlow", "verify the API works", "test in the browser"), you MUST perform those exact validation steps. Do not skip them. - Use any available tools and skills to validate. @@ -397,11 +399,13 @@ You are the Challenger. Your job is to find real problems in the Implementer's w - This is the most important step — the Implementer may have built something that compiles but doesn't cover all requirements. ## Step 4: Runtime Validation -- Run the build and tests yourself to verify correctness. -- If the task involves a runnable app, launch it and verify it works at runtime when possible. Many bugs only surface when you actually run the app. -- If the prompt specifies validation steps (e.g., "validate with MauiDevFlow"), perform those same validation steps yourself. +- Do not approve based on trust. Run the build and tests yourself — independently, not just by reading the Implementer's report. +- For changes with observable runtime behavior (UI rendering, CLI output, API responses), verify that behavior at runtime. Do not approve a UI feature because unit tests pass — verify it actually renders correctly. +- If the task involves a runnable app, launch it and verify it works. Many bugs only surface at runtime. +- If the prompt specifies validation steps (e.g., "validate with MauiDevFlow"), perform those same steps yourself. - Use any available tools and skills for runtime verification. 
- For every validation claim, cite the specific command you ran and its output as evidence (e.g., "ran `dotnet test` — 23 passed, 0 failed"). Do NOT claim something works without showing proof. +- If you cannot verify something at runtime (no device, no environment), say so explicitly — do not approve blindly or omit the gap. ## Verdict - If EVERY checklist item is implemented, correct, and validated, say so clearly and emit [[GROUP_REFLECT_COMPLETE]]. From 999ebd7334c05ceb4c52818bff1bed6a56c01bbf Mon Sep 17 00:00:00 2001 From: Shane Neuville Date: Mon, 6 Apr 2026 11:29:06 -0500 Subject: [PATCH 08/12] fix: preserve live drafts during UI rerenders Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- PolyPilot.Tests/ChatExperienceSafetyTests.cs | 19 ++++++++++++++++ PolyPilot/Components/Pages/Dashboard.razor | 5 ++++ PolyPilot/wwwroot/index.html | 24 +++++++++++++++++++- 3 files changed, 47 insertions(+), 1 deletion(-) diff --git a/PolyPilot.Tests/ChatExperienceSafetyTests.cs b/PolyPilot.Tests/ChatExperienceSafetyTests.cs index e8cd2bcb0..d10c713c4 100644 --- a/PolyPilot.Tests/ChatExperienceSafetyTests.cs +++ b/PolyPilot.Tests/ChatExperienceSafetyTests.cs @@ -862,6 +862,25 @@ public void ChatMessageList_Source_SuppressesStreamingDuplicateAfterFlush() Assert.Contains("NormalizeStreamingText(StreamingContent)", source); } + /// + /// Draft restore must not clobber newer user typing with a stale cached draft during + /// normal render cycles. The browser keeps a live draft map and restore logic skips + /// overwriting text that diverged from the last restored value. 
+ /// + [Fact] + public void DraftRestore_Source_PreservesLiveTyping() + { + var indexHtml = File.ReadAllText( + Path.Combine(GetRepoRoot(), "PolyPilot", "wwwroot", "index.html")); + var dashboard = File.ReadAllText( + Path.Combine(GetRepoRoot(), "PolyPilot", "Components", "Pages", "Dashboard.razor")); + + Assert.Contains("window.__liveDrafts", dashboard); + Assert.Contains("hasDivergedUserText", indexHtml); + Assert.Contains("current !== desired && current !== lastRestored", indexHtml); + Assert.Contains("delete window.__liveDrafts[elementId]", indexHtml); + } + /// /// The "Session not found" reconnect path must include McpServers and SkillDirectories /// in the fresh session config (PR #330 regression guard). diff --git a/PolyPilot/Components/Pages/Dashboard.razor b/PolyPilot/Components/Pages/Dashboard.razor index b9c5f1498..1aa86f298 100644 --- a/PolyPilot/Components/Pages/Dashboard.razor +++ b/PolyPilot/Components/Pages/Dashboard.razor @@ -1168,6 +1168,11 @@ // overwrite their changes with a history entry. document.addEventListener('input', function(e) { if (e.target.matches && e.target.matches('.card-input input, .card-input textarea, .input-row textarea')) { + if (!window.__liveDrafts) window.__liveDrafts = {}; + if (e.target.id) { + if (e.target.value) window.__liveDrafts[e.target.id] = e.target.value; + else delete window.__liveDrafts[e.target.id]; + } var card = e.target.closest('[data-session]'); var sn = card ? 
card.dataset.session : ''; if (sn && window.__histNavActive) window.__histNavActive[sn] = false; diff --git a/PolyPilot/wwwroot/index.html b/PolyPilot/wwwroot/index.html index 8f786deb5..9d1d2a86e 100644 --- a/PolyPilot/wwwroot/index.html +++ b/PolyPilot/wwwroot/index.html @@ -375,6 +375,8 @@ var el = document.getElementById(elementId); if (el) { el.value = ''; + if (window.__liveDrafts) delete window.__liveDrafts[elementId]; + el.__lastRestoredDraft = ''; el.style.height = 'auto'; // Remove slash command ghost overlay var ghost = document.getElementById(elementId + '--slash-ghost'); @@ -761,9 +763,29 @@ // Single rAF follow-up is sufficient — content is rendered synchronously by Blazor requestAnimationFrame(doScroll); var drafts = JSON.parse(draftsJson || '{}'); + if (window.__liveDrafts) { + for (var liveId in window.__liveDrafts) { + if (Object.prototype.hasOwnProperty.call(window.__liveDrafts, liveId)) { + drafts[liveId] = window.__liveDrafts[liveId]; + } + } + } for (var id in drafts) { var el = document.getElementById(id); - if (el) el.value = drafts[id]; + if (!el) continue; + var desired = drafts[id] || ''; + var current = (typeof el.value === 'string') ? el.value : ''; + var lastRestored = (typeof el.__lastRestoredDraft === 'string') ? 
el.__lastRestoredDraft : ''; + var hasDivergedUserText = current.length > 0 && current !== desired && current !== lastRestored; + if (hasDivergedUserText) continue; + if (current !== desired) { + el.value = desired; + if (el.tagName === 'TEXTAREA' && el.closest('.input-row')) { + el.style.height = 'auto'; + el.style.height = Math.min(el.scrollHeight, 150) + 'px'; + } + } + el.__lastRestoredDraft = desired; } if (focusId) { var fel = document.getElementById(focusId); From 9e618efc3baa1c2cdd3ec0f78e2766333d227805 Mon Sep 17 00:00:00 2001 From: Shane Neuville Date: Mon, 6 Apr 2026 12:04:59 -0500 Subject: [PATCH 09/12] fix: tighten replay dedup and shell liveness Scope assistant replay dedup to the current turn so legitimate repeated replies still persist, and give background shells a longer zombie window with regression coverage. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- PolyPilot.Tests/ChatExperienceSafetyTests.cs | 94 +++++++++++++++++++- PolyPilot.Tests/ZombieSubagentExpiryTests.cs | 27 ++++-- PolyPilot/Services/CopilotService.Events.cs | 84 ++++++++++------- 3 files changed, 161 insertions(+), 44 deletions(-) diff --git a/PolyPilot.Tests/ChatExperienceSafetyTests.cs b/PolyPilot.Tests/ChatExperienceSafetyTests.cs index d10c713c4..28fd81190 100644 --- a/PolyPilot.Tests/ChatExperienceSafetyTests.cs +++ b/PolyPilot.Tests/ChatExperienceSafetyTests.cs @@ -318,6 +318,38 @@ public async Task CompleteResponse_FlushesContentToHistory() Assert.Contains("model's response text", lastMessage.Content); } + /// + /// Identical assistant text across DIFFERENT turns must still be persisted. + /// The replay dedup guard should only suppress content already flushed in the + /// current turn, not a legitimate repeated reply like "Done." in a later turn. 
+ /// + [Fact] + public async Task CompleteResponse_IdenticalCrossTurnReply_IsStillPersisted() + { + var svc = CreateService(); + await svc.ReconnectAsync(new ConnectionSettings { Mode = ConnectionMode.Demo }); + var session = await svc.CreateSessionAsync("cross-turn-complete-test"); + + var state = GetSessionState(svc, "cross-turn-complete-test"); + session.IsProcessing = true; + SetField(state, "SendingFlag", 1); + + session.History.Add(ChatMessage.AssistantMessage("Done.")); + var historyBefore = session.History.Count; + + GetCurrentResponse(state).Append("Done."); + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + SetResponseCompletion(state, tcs); + + InvokeCompleteResponse(svc, state, null); + + Assert.Equal(historyBefore + 1, session.History.Count); + Assert.Equal("Done.", session.History.Last().Content); + Assert.True(tcs.Task.IsCompleted); + Assert.Equal("Done.", tcs.Task.Result); + } + /// /// CompleteResponse must include FlushedResponse (from mid-turn flushes on TurnEnd) /// in the TCS result. Without this, orchestrator dispatch gets empty string. @@ -355,6 +387,37 @@ public async Task CompleteResponse_IncludesFlushedResponseInTcsResult() Assert.Contains("Second sub-turn continuation", result); } + /// + /// If the SDK replays the exact text that was already flushed earlier in the SAME turn, + /// CompleteResponse must not duplicate it in either History or the TCS result. 
+ /// + [Fact] + public async Task CompleteResponse_SameTurnReplay_DoesNotDuplicateHistoryOrTcs() + { + var svc = CreateService(); + await svc.ReconnectAsync(new ConnectionSettings { Mode = ConnectionMode.Demo }); + var session = await svc.CreateSessionAsync("same-turn-replay-test"); + + var state = GetSessionState(svc, "same-turn-replay-test"); + session.IsProcessing = true; + SetField(state, "SendingFlag", 1); + + GetFlushedResponse(state).Append("Already flushed content"); + session.History.Add(ChatMessage.AssistantMessage("Already flushed content")); + var historyBefore = session.History.Count; + + GetCurrentResponse(state).Append("Already flushed content"); + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + SetResponseCompletion(state, tcs); + + InvokeCompleteResponse(svc, state, null); + + Assert.Equal(historyBefore, session.History.Count); + Assert.True(tcs.Task.IsCompleted); + Assert.Equal("Already flushed content", tcs.Task.Result); + } + /// /// CompleteResponse fires OnSessionComplete so orchestrator loops can unblock. /// Without this (INV-O4), multi-agent workers hang forever waiting for completion. @@ -712,8 +775,8 @@ public async Task FlushCurrentResponse_AddsToHistory() } /// - /// FlushCurrentResponse dedup guard: if the last assistant message has identical content, - /// the flush is skipped to prevent duplicates on session resume. + /// FlushCurrentResponse dedup guard: if the exact same segment was already flushed in + /// the CURRENT turn, the replay is skipped to prevent duplicates on resume/IDLE-DEFER. /// [Fact] public async Task FlushCurrentResponse_DedupGuard_SkipsDuplicate() @@ -724,11 +787,12 @@ public async Task FlushCurrentResponse_DedupGuard_SkipsDuplicate() var state = GetSessionState(svc, "dedup-test"); - // Add a message that looks like it was already flushed + // Simulate the current turn already flushing this exact segment once. 
+ GetFlushedResponse(state).Append("Already flushed content"); session.History.Add(ChatMessage.AssistantMessage("Already flushed content")); var historyCountAfterFirst = session.History.Count; - // Simulate the same content appearing in CurrentResponse (SDK replay on resume) + // Simulate the same content appearing in CurrentResponse again (SDK replay) GetCurrentResponse(state).Append("Already flushed content"); // Act @@ -738,6 +802,28 @@ public async Task FlushCurrentResponse_DedupGuard_SkipsDuplicate() Assert.Equal(historyCountAfterFirst, session.History.Count); } + /// + /// A brand-new turn that happens to produce the same assistant text as the prior turn + /// must still be preserved. Dedup is same-turn only. + /// + [Fact] + public async Task FlushCurrentResponse_IdenticalCrossTurnReply_IsStillPersisted() + { + var svc = CreateService(); + await svc.ReconnectAsync(new ConnectionSettings { Mode = ConnectionMode.Demo }); + var session = await svc.CreateSessionAsync("cross-turn-flush-test"); + + var state = GetSessionState(svc, "cross-turn-flush-test"); + session.History.Add(ChatMessage.AssistantMessage("Done.")); + var historyBefore = session.History.Count; + + GetCurrentResponse(state).Append("Done."); + InvokeFlushCurrentResponse(svc, state); + + Assert.Equal(historyBefore + 1, session.History.Count); + Assert.Equal("Done.", session.History.Last().Content); + } + /// /// FlushCurrentResponse accumulates text in FlushedResponse so CompleteResponse /// can include it in the TCS result for orchestrator dispatch. 
diff --git a/PolyPilot.Tests/ZombieSubagentExpiryTests.cs b/PolyPilot.Tests/ZombieSubagentExpiryTests.cs index b82896f9d..390940097 100644 --- a/PolyPilot.Tests/ZombieSubagentExpiryTests.cs +++ b/PolyPilot.Tests/ZombieSubagentExpiryTests.cs @@ -122,13 +122,13 @@ public void ZombieThresholdExactlyMet_ReturnsFalse() idle, TicksAgo(CopilotService.SubagentZombieTimeoutMinutes))); } - // --- Shells are expired after zombie timeout (same as agents) --- + // --- Shells get a longer zombie timeout than agents --- [Fact] - public void ZombieThresholdExceeded_WithShells_ReturnsFalse() + public void AfterAgentThreshold_MixedShellsStillKeepSessionActive() { - // When both agents and shells are present but the zombie threshold is exceeded, - // everything is treated as expired — stale shells should not block completion. + // At 30 minutes, agents should have expired but shells should still keep the + // session deferred. This protects legitimate long-running build/test shells. var idle = new SessionIdleEvent { Data = new SessionIdleData @@ -152,16 +152,17 @@ public void ZombieThresholdExceeded_WithShells_ReturnsFalse() } } }; - Assert.False(CopilotService.HasActiveBackgroundTasks(idle, TicksAgo(30))); + Assert.True(CopilotService.HasActiveBackgroundTasks( + idle, TicksAgo(CopilotService.SubagentZombieTimeoutMinutes + 10))); } [Fact] public void ZombieThresholdExceeded_ShellsOnly_ReturnsFalse() { - // Shells alone should also be expired after the zombie timeout — - // stale/orphaned shells reported by the CLI should not block completion forever. + // Shells alone should eventually expire too — just with a longer threshold than agents. 
var idle = MakeIdleWithShells(); - Assert.False(CopilotService.HasActiveBackgroundTasks(idle, TicksAgo(60))); + Assert.False(CopilotService.HasActiveBackgroundTasks( + idle, TicksAgo(CopilotService.ShellZombieTimeoutMinutes + 1))); } [Fact] @@ -171,6 +172,16 @@ public void FreshDeferStart_ShellsOnly_ReturnsTrue() Assert.True(CopilotService.HasActiveBackgroundTasks(idle, TicksAgo(1))); } + [Fact] + public void AfterAgentThreshold_ShellsOnly_StillReturnsTrue() + { + // Shells should survive past the 20-minute agent timeout so legitimate long-running + // commands do not get truncated just because they're shell-backed instead of subagents. + var idle = MakeIdleWithShells(); + Assert.True(CopilotService.HasActiveBackgroundTasks( + idle, TicksAgo(CopilotService.SubagentZombieTimeoutMinutes + 5))); + } + // --- Cross-turn stale timestamp: the critical lifecycle bug this PR fixes --- [Fact] diff --git a/PolyPilot/Services/CopilotService.Events.cs b/PolyPilot/Services/CopilotService.Events.cs index bd0ff50de..36530e665 100644 --- a/PolyPilot/Services/CopilotService.Events.cs +++ b/PolyPilot/Services/CopilotService.Events.cs @@ -1215,33 +1215,50 @@ private void TryAttachImages(MessageOptions options, List imagePaths) } } + private static string? GetLastFlushedSegmentThisTurn(SessionState state) + { + if (state.FlushedResponse.Length == 0) return null; + + var flushed = state.FlushedResponse.ToString(); + var separatorIndex = flushed.LastIndexOf("\n\n", StringComparison.Ordinal); + return separatorIndex >= 0 ? flushed[(separatorIndex + 2)..] : flushed; + } + + private static bool WasResponseAlreadyFlushedThisTurn(SessionState state, string text) + { + if (string.IsNullOrWhiteSpace(text)) return false; + + var lastSegment = GetLastFlushedSegmentThisTurn(state); + return !string.IsNullOrEmpty(lastSegment) && + string.Equals(lastSegment, text, StringComparison.Ordinal); + } + /// Flush accumulated assistant text to history without ending the turn. 
private void FlushCurrentResponse(SessionState state) { var text = state.CurrentResponse.ToString(); if (string.IsNullOrWhiteSpace(text)) return; - - // Dedup guard: if this exact text was already flushed (e.g., SDK replayed events - // after resume and content was re-appended to CurrentResponse), don't duplicate. - var lastAssistant = state.Info.History.LastOrDefault(m => - m.Role == "assistant" && m.MessageType != ChatMessageType.ToolCall); - if (lastAssistant?.Content == text) + + // Dedup only within the CURRENT turn. SDK replay after a flush can re-append the + // same sub-turn text to CurrentResponse, but identical replies across different + // turns are legitimate and must still be preserved in History. + if (WasResponseAlreadyFlushedThisTurn(state, text)) { - Debug($"[DEDUP] FlushCurrentResponse skipped duplicate content ({text.Length} chars) for session '{state.Info.Name}'"); + Debug($"[DEDUP] FlushCurrentResponse skipped same-turn replay ({text.Length} chars) for session '{state.Info.Name}'"); state.CurrentResponse.Clear(); return; } - + var msg = new ChatMessage("assistant", text, DateTime.Now) { Model = state.Info.Model }; state.Info.History.Add(msg); state.Info.MessageCount = state.Info.History.Count; - + if (!string.IsNullOrEmpty(state.Info.SessionId)) SafeFireAndForget(_chatDb.AddMessageAsync(state.Info.SessionId, msg), "AddMessageAsync"); - + // Track code suggestions from accumulated response segment _usageStats?.TrackCodeSuggestion(text); - + // Accumulate flushed text so CompleteResponse can include it in the TCS result. // Without this, orchestrator dispatch gets "" because TurnEnd flush clears // CurrentResponse before SessionIdle fires CompleteResponse. @@ -1345,15 +1362,13 @@ private void CompleteResponse(SessionState state, long? 
expectedGeneration = nul Interlocked.Exchange(ref state.TurnEndReceivedAtTicks, 0); state.Info.IsResumed = false; // Clear after first successful turn var response = state.CurrentResponse.ToString(); + var responseAlreadyFlushedThisTurn = WasResponseAlreadyFlushedThisTurn(state, response); if (!string.IsNullOrWhiteSpace(response)) { - // Dedup guard: FlushCurrentResponse (called on TurnEnd and IDLE-DEFER) may have - // already added this exact text to History. If the SDK replayed deltas after a - // flush (e.g., IDLE-DEFER re-arm, reconnect replay), CurrentResponse can accumulate - // the same content again. Without this check, the same message appears twice. - var lastAssistant = state.Info.History.LastOrDefault(m => - m.Role == "assistant" && m.MessageType != ChatMessageType.ToolCall); - if (lastAssistant?.Content != response) + // Dedup only within the current turn. FlushCurrentResponse may have already + // committed this exact segment when SessionIdle replays after IDLE-DEFER, but + // identical assistant replies on different turns are legitimate and must persist. + if (!responseAlreadyFlushedThisTurn) { var msg = new ChatMessage("assistant", response, DateTime.Now) { Model = state.Info.Model }; state.Info.History.Add(msg); @@ -1371,18 +1386,19 @@ private void CompleteResponse(SessionState state, long? expectedGeneration = nul } else { - Debug($"[DEDUP] CompleteResponse skipped duplicate content ({response.Length} chars) for '{state.Info.Name}'"); + Debug($"[DEDUP] CompleteResponse skipped same-turn replay ({response.Length} chars) for '{state.Info.Name}'"); } } // Build full turn response for TCS: include text flushed mid-turn (e.g., on TurnEnd) // plus any remaining text in CurrentResponse. Without this, orchestrator dispatch // gets "" because FlushCurrentResponse on TurnEnd clears CurrentResponse before // SessionIdle fires CompleteResponse. + var responseForCompletion = responseAlreadyFlushedThisTurn ? 
string.Empty : response; var fullResponse = state.FlushedResponse.Length > 0 - ? (string.IsNullOrEmpty(response) + ? (string.IsNullOrEmpty(responseForCompletion) ? state.FlushedResponse.ToString() - : state.FlushedResponse + "\n\n" + response) - : response; + : state.FlushedResponse + "\n\n" + responseForCompletion) + : responseForCompletion; // Track one message per completed turn regardless of trailing text _usageStats?.TrackMessage(); // Reset permission recovery attempts on successful turn completion @@ -2249,21 +2265,25 @@ internal static bool IsMcpError(string? text) /// before all background agents are treated as zombies and the session is allowed to complete. /// The Copilot CLI has no per-agent timeout, so a crashed or orphaned subagent blocks /// IDLE-DEFER indefinitely. After this threshold PolyPilot expires the stale block. - /// The same timeout applies to shells — stale/detached shells that the CLI never - /// reports as completed should not block the session indefinitely. /// internal const int SubagentZombieTimeoutMinutes = 20; + /// + /// Shells get a longer zombie window than subagents because legitimate build/test + /// processes can run for much longer than the agent orchestration rounds that spawned them. + /// This still bounds leaked shells, but avoids truncating healthy long-running work. + /// + internal const int ShellZombieTimeoutMinutes = 60; /// /// Check if a SessionIdleEvent reports active background tasks (agents or shells). /// When background tasks are active, session.idle means "foreground quiesced, background /// still running" — NOT true completion. /// - /// When <paramref name="idleDeferStartedAtTicks"/> is non-zero, background tasks (both - /// agents AND shells) are treated as zombies if the session has been in IDLE-DEFER longer - /// than <see cref="SubagentZombieTimeoutMinutes"/>. This allows the session to complete - /// even if the CLI never fires SubagentCompleted/ShellCompleted for crashed or orphaned - /// background tasks. The caller is responsible for logging the zombie expiry via Debug().
+ /// When <paramref name="idleDeferStartedAtTicks"/> is non-zero, agents and shells get + /// separate zombie windows. Agents expire after <see cref="SubagentZombieTimeoutMinutes"/>; + /// shells expire after the longer <see cref="ShellZombieTimeoutMinutes"/>. This allows + /// the session to recover from leaked background tasks without prematurely truncating + /// legitimate long-running shell work. /// internal static bool HasActiveBackgroundTasks( SessionIdleEvent idle, @@ -2278,11 +2298,11 @@ internal static bool HasActiveBackgroundTasks( if ((hasAgents || hasShells) && idleDeferStartedAtTicks != 0) { var elapsed = TimeSpan.FromTicks(DateTime.UtcNow.Ticks - idleDeferStartedAtTicks); - if (elapsed.TotalMinutes >= SubagentZombieTimeoutMinutes) - { + if (hasAgents && elapsed.TotalMinutes >= SubagentZombieTimeoutMinutes) hasAgents = false; + + if (hasShells && elapsed.TotalMinutes >= ShellZombieTimeoutMinutes) hasShells = false; - } } return hasAgents || hasShells; From fba541edc08913ca9c59e0fd0e264b02dc4e4652 Mon Sep 17 00:00:00 2001 From: Shane Neuville Date: Mon, 6 Apr 2026 13:05:17 -0500 Subject: [PATCH 10/12] fix: bound forced abort and preserve recovered sessions Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- PolyPilot.Tests/SessionPersistenceTests.cs | 29 ++++++++++++++-- PolyPilot.Tests/SessionStabilityTests.cs | 12 +++++++ PolyPilot/Models/AgentSessionInfo.cs | 7 ++++ .../Services/CopilotService.Organization.cs | 13 +++++++- .../Services/CopilotService.Persistence.cs | 33 ++++++++++++++----- PolyPilot/Services/CopilotService.cs | 1 + 6 files changed, 83 insertions(+), 12 deletions(-) diff --git a/PolyPilot.Tests/SessionPersistenceTests.cs b/PolyPilot.Tests/SessionPersistenceTests.cs index 33eb5280c..d57c3e022 100644 --- a/PolyPilot.Tests/SessionPersistenceTests.cs +++ b/PolyPilot.Tests/SessionPersistenceTests.cs @@ -1673,15 +1673,16 @@ public void Merge_NameCollision_MissingDirStillExcluded() } [Fact] - public void Merge_NameCollision_DifferentGroupId_DropsPersistedEntry() + public void
Merge_NameCollision_DifferentGroupId_WithExplicitRecovery_DropsPersistedEntry() { // When a session is moved to a new group during scattered team reconstruction // and then recreated with a new SessionId, the persisted entry from the old group - // should be silently dropped — not renamed to "(previous)". + // should be silently dropped only if the replacement explicitly records that it + // recovered history from the old session. var active = new List { new() { SessionId = "new-id", DisplayName = "Copilot Cli-worker-1", Model = "m", - WorkingDirectory = "/w", GroupId = "new-group-id" } + WorkingDirectory = "/w", GroupId = "new-group-id", RecoveredFromSessionId = "old-id" } }; var persisted = new List { @@ -1697,6 +1698,28 @@ public void Merge_NameCollision_DifferentGroupId_DropsPersistedEntry() Assert.Equal("Copilot Cli-worker-1", result[0].DisplayName); } + [Fact] + public void Merge_NameCollision_DifferentGroupId_WithoutRecoveryMarker_KeepsPrevious() + { + var active = new List + { + new() { SessionId = "new-id", DisplayName = "Copilot Cli-worker-1", Model = "m", + WorkingDirectory = "/w", GroupId = "new-group-id" } + }; + var persisted = new List + { + new() { SessionId = "old-id", DisplayName = "Copilot Cli-worker-1", Model = "m", + WorkingDirectory = "/w", GroupId = "old-group-id" } + }; + + var result = CopilotService.MergeSessionEntries(active, persisted, new HashSet(), new HashSet(), _ => true); + + Assert.Equal(2, result.Count); + Assert.Equal("Copilot Cli-worker-1", result[0].DisplayName); + Assert.Equal("Copilot Cli-worker-1 (previous)", result[1].DisplayName); + Assert.Equal("old-id", result[1].SessionId); + } + [Fact] public void Merge_NameCollision_SameGroupId_StillCreatesPrevious() { diff --git a/PolyPilot.Tests/SessionStabilityTests.cs b/PolyPilot.Tests/SessionStabilityTests.cs index cbcdabce6..a76684d8c 100644 --- a/PolyPilot.Tests/SessionStabilityTests.cs +++ b/PolyPilot.Tests/SessionStabilityTests.cs @@ -128,6 +128,18 @@ public void 
ForceCompleteProcessing_SkipsIfNotProcessing() Assert.Contains("!state.Info.IsProcessing", method); } + [Fact] + public void ForceCompleteProcessing_BoundsAbortAsyncTimeout() + { + var source = File.ReadAllText(TestPaths.OrganizationCs); + var method = ExtractMethod(source, "Task ForceCompleteProcessingAsync"); + + Assert.Contains("ForceCompleteAbortTimeoutSeconds", source); + Assert.Contains("new CancellationTokenSource(TimeSpan.FromSeconds(ForceCompleteAbortTimeoutSeconds))", method); + Assert.Contains("await session.AbortAsync(abortCts.Token);", method); + Assert.Contains("OperationCanceledException", method); + } + // ─── Mixed Worker Success/Failure Synthesis Tests ─── [Fact] diff --git a/PolyPilot/Models/AgentSessionInfo.cs b/PolyPilot/Models/AgentSessionInfo.cs index 204348f51..15861e1c4 100644 --- a/PolyPilot/Models/AgentSessionInfo.cs +++ b/PolyPilot/Models/AgentSessionInfo.cs @@ -36,6 +36,13 @@ public class AgentSessionInfo // For resumed sessions public string? SessionId { get; set; } public bool IsResumed { get; set; } + /// + /// When this session was recreated from an older session during restore/recovery, + /// records the source session ID whose history was explicitly injected into this one. + /// Persisted to active-sessions.json so merge logic can safely suppress the obsolete + /// predecessor only when recovery actually happened. + /// + public string? RecoveredFromSessionId { get; set; } // Timestamp of last state change (message received, turn end, etc.) // Uses Interlocked ticks pattern for thread safety (updated from background SDK event threads). 
diff --git a/PolyPilot/Services/CopilotService.Organization.cs b/PolyPilot/Services/CopilotService.Organization.cs index c32ad0ff6..1e3e0b23b 100644 --- a/PolyPilot/Services/CopilotService.Organization.cs +++ b/PolyPilot/Services/CopilotService.Organization.cs @@ -2182,6 +2182,12 @@ internal static List TryParseJsonAssignments(string response, Li private record WorkerResult(string WorkerName, string? Response, bool Success, string? Error, TimeSpan Duration); + /// + /// Upper bound for AbortAsync during force-complete cleanup. The cleanup path exists to + /// recover from stuck workers; it must stay bounded even if the SDK RPC is half-open. + /// + internal const int ForceCompleteAbortTimeoutSeconds = 15; + /// /// Full INV-1-compliant force-completion of a session's processing state. /// Clears all 9+ companion fields, resolves the ResponseCompletion TCS, @@ -2262,11 +2268,16 @@ private async Task ForceCompleteProcessingAsync(string sessionName, SessionState var session = state.Session; if (session != null) { + using var abortCts = new CancellationTokenSource(TimeSpan.FromSeconds(ForceCompleteAbortTimeoutSeconds)); try { - await session.AbortAsync(CancellationToken.None); + await session.AbortAsync(abortCts.Token); Debug($"[DISPATCH] ForceCompleteProcessing '{sessionName}': abort sent to clear pending tool state"); } + catch (OperationCanceledException) when (abortCts.IsCancellationRequested) + { + Debug($"[DISPATCH] ForceCompleteProcessing '{sessionName}': abort timed out after {ForceCompleteAbortTimeoutSeconds}s — proceeding anyway"); + } catch (Exception abortEx) { Debug($"[DISPATCH] ForceCompleteProcessing '{sessionName}': abort failed (non-fatal): {abortEx.Message}"); diff --git a/PolyPilot/Services/CopilotService.Persistence.cs b/PolyPilot/Services/CopilotService.Persistence.cs index c320c1a66..5b8de3589 100644 --- a/PolyPilot/Services/CopilotService.Persistence.cs +++ b/PolyPilot/Services/CopilotService.Persistence.cs @@ -30,6 +30,7 @@ private void 
SaveActiveSessionsToDisk() ReasoningEffort = s.Info.ReasoningEffort, WorkingDirectory = s.Info.WorkingDirectory, GroupId = sessionMetas.FirstOrDefault(m => m.SessionName == s.Info.Name)?.GroupId, + RecoveredFromSessionId = s.Info.RecoveredFromSessionId, LastPrompt = s.Info.IsProcessing ? s.Info.History.LastOrDefault(m => m.IsUser)?.Content : null, @@ -77,6 +78,7 @@ private void SaveActiveSessionsToDiskCore() WorkingDirectory = s.Info.WorkingDirectory, ReasoningEffort = s.Info.ReasoningEffort, GroupId = sessionMetas.FirstOrDefault(m => m.SessionName == s.Info.Name)?.GroupId, + RecoveredFromSessionId = s.Info.RecoveredFromSessionId, LastPrompt = s.Info.IsProcessing ? s.Info.History.LastOrDefault(m => m.IsUser)?.Content : null, @@ -192,17 +194,14 @@ internal static List MergeSessionEntries( { // If the active session moved to a different group (e.g., scattered team // reconstruction created a new multi-agent group and the session was recreated - // there), the persisted entry is the obsolete predecessor — its history was - // already recovered via CopyEventsToNewSession. Drop it silently instead of - // creating a confusing "(previous)" duplicate. + // there), the persisted entry may be an obsolete predecessor. Drop it only + // when the replacement explicitly records that it recovered history from this + // exact session ID; otherwise keep a "(previous)" entry so recoverability wins. 
var activeCounterpart = active.FirstOrDefault(a => string.Equals(a.DisplayName, existing.DisplayName, StringComparison.OrdinalIgnoreCase)); - if (activeCounterpart != null - && !string.IsNullOrEmpty(existing.GroupId) - && !string.IsNullOrEmpty(activeCounterpart.GroupId) - && !string.Equals(existing.GroupId, activeCounterpart.GroupId, StringComparison.OrdinalIgnoreCase)) + if (activeCounterpart != null && CanSafelyDropSupersededGroupMoveEntry(existing, activeCounterpart)) { - continue; // Superseded by group move — history already migrated + continue; // Explicitly recovered into the active group-moved replacement } entryToAdd = new ActiveSessionEntry @@ -214,6 +213,7 @@ internal static List MergeSessionEntries( WorkingDirectory = existing.WorkingDirectory, LastPrompt = existing.LastPrompt, GroupId = existing.GroupId, + RecoveredFromSessionId = existing.RecoveredFromSessionId, TotalInputTokens = existing.TotalInputTokens, TotalOutputTokens = existing.TotalOutputTokens, ContextCurrentTokens = existing.ContextCurrentTokens, @@ -238,6 +238,21 @@ internal static List MergeSessionEntries( return merged; } + private static bool CanSafelyDropSupersededGroupMoveEntry( + ActiveSessionEntry existing, + ActiveSessionEntry activeCounterpart) + { + if (string.IsNullOrEmpty(existing.GroupId) || + string.IsNullOrEmpty(activeCounterpart.GroupId) || + string.Equals(existing.GroupId, activeCounterpart.GroupId, StringComparison.OrdinalIgnoreCase)) + { + return false; + } + + return !string.IsNullOrEmpty(activeCounterpart.RecoveredFromSessionId) && + string.Equals(activeCounterpart.RecoveredFromSessionId, existing.SessionId, StringComparison.OrdinalIgnoreCase); + } + /// /// Restore persisted usage stats onto the in-memory session after resume. 
/// Uses Math.Max for accumulative fields to avoid overwriting values the SDK @@ -263,6 +278,7 @@ private void RestoreUsageStats(ActiveSessionEntry entry) // Restore real LastUpdatedAt so focus detection uses actual activity time, not restore time if (entry.LastUpdatedAt.HasValue) state.Info.LastUpdatedAt = entry.LastUpdatedAt.Value; + state.Info.RecoveredFromSessionId ??= entry.RecoveredFromSessionId; // Backfill from events.jsonl only when ALL tracked fields are zero (indicating "never tracked") if (entry.PremiumRequestsUsed == 0 && entry.TotalApiTimeSeconds == 0 && !entry.CreatedAt.HasValue) @@ -944,6 +960,7 @@ public async Task RestorePreviousSessionsAsync(CancellationToken cancellationTok recreatedState.Info.History.Add(ChatMessage.SystemMessage("🔄 Session recreated — conversation history recovered from previous session.")); recreatedState.Info.MessageCount = recreatedState.Info.History.Count; recreatedState.Info.LastReadMessageCount = recreatedState.Info.History.Count; + recreatedState.Info.RecoveredFromSessionId = bestSourceId; } // Restore usage stats (token counts, CreatedAt, etc.) diff --git a/PolyPilot/Services/CopilotService.cs b/PolyPilot/Services/CopilotService.cs index f9199fafe..dc69bc082 100644 --- a/PolyPilot/Services/CopilotService.cs +++ b/PolyPilot/Services/CopilotService.cs @@ -5060,6 +5060,7 @@ public class ActiveSessionEntry public string? WorkingDirectory { get; set; } public string? LastPrompt { get; set; } public string? GroupId { get; set; } + public string? 
RecoveredFromSessionId { get; set; } // Usage stats persisted across reconnects public int TotalInputTokens { get; set; } public int TotalOutputTokens { get; set; } From 12e0823a3d15162bcca7ea008b192305fd42c95f Mon Sep 17 00:00:00 2001 From: Shane Neuville Date: Mon, 6 Apr 2026 13:19:50 -0500 Subject: [PATCH 11/12] fix: refine replay dedup and worker failure labels Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- PolyPilot.Tests/ChatExperienceSafetyTests.cs | 100 +++++++++++++++++- PolyPilot.Tests/SessionStabilityTests.cs | 9 ++ PolyPilot/Services/CopilotService.Events.cs | 29 +++-- .../Services/CopilotService.Organization.cs | 15 +-- PolyPilot/Services/CopilotService.cs | 7 ++ 5 files changed, 140 insertions(+), 20 deletions(-) diff --git a/PolyPilot.Tests/ChatExperienceSafetyTests.cs b/PolyPilot.Tests/ChatExperienceSafetyTests.cs index 28fd81190..0e64495a1 100644 --- a/PolyPilot.Tests/ChatExperienceSafetyTests.cs +++ b/PolyPilot.Tests/ChatExperienceSafetyTests.cs @@ -72,6 +72,14 @@ private static void InvokeFlushCurrentResponse(CopilotService svc, object sessio method.Invoke(svc, new object?[] { sessionState }); } + /// Invokes the private ClearFlushedReplayDedup helper to simulate a tool/sub-turn boundary. + private static void InvokeClearFlushedReplayDedup(object sessionState) + { + var method = typeof(CopilotService).GetMethod("ClearFlushedReplayDedup", + BindingFlags.NonPublic | BindingFlags.Static)!; + method.Invoke(null, new[] { sessionState }); + } + /// Gets a field from SessionState by name. 
private static T GetField(object state, string fieldName) { @@ -402,8 +410,8 @@ public async Task CompleteResponse_SameTurnReplay_DoesNotDuplicateHistoryOrTcs() session.IsProcessing = true; SetField(state, "SendingFlag", 1); - GetFlushedResponse(state).Append("Already flushed content"); - session.History.Add(ChatMessage.AssistantMessage("Already flushed content")); + GetCurrentResponse(state).Append("Already flushed content"); + InvokeFlushCurrentResponse(svc, state); var historyBefore = session.History.Count; GetCurrentResponse(state).Append("Already flushed content"); @@ -418,6 +426,38 @@ public async Task CompleteResponse_SameTurnReplay_DoesNotDuplicateHistoryOrTcs() Assert.Equal("Already flushed content", tcs.Task.Result); } + /// + /// Same-turn replay dedup must still work for ordinary multi-paragraph/model-formatted + /// responses that contain "\n\n" inside the content body. + /// + [Fact] + public async Task CompleteResponse_SameTurnReplay_MultiParagraphContent_DoesNotDuplicate() + { + var svc = CreateService(); + await svc.ReconnectAsync(new ConnectionSettings { Mode = ConnectionMode.Demo }); + var session = await svc.CreateSessionAsync("same-turn-replay-multipara"); + + var state = GetSessionState(svc, "same-turn-replay-multipara"); + session.IsProcessing = true; + SetField(state, "SendingFlag", 1); + + const string content = "First paragraph.\n\n```csharp\nConsole.WriteLine(\"hi\");\n```\n\nFinal paragraph."; + GetCurrentResponse(state).Append(content); + InvokeFlushCurrentResponse(svc, state); + var historyBefore = session.History.Count; + + GetCurrentResponse(state).Append(content); + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + SetResponseCompletion(state, tcs); + + InvokeCompleteResponse(svc, state, null); + + Assert.Equal(historyBefore, session.History.Count); + Assert.True(tcs.Task.IsCompleted); + Assert.Equal(content, tcs.Task.Result); + } + /// /// CompleteResponse fires OnSessionComplete so 
orchestrator loops can unblock. /// Without this (INV-O4), multi-agent workers hang forever waiting for completion. @@ -788,8 +828,8 @@ public async Task FlushCurrentResponse_DedupGuard_SkipsDuplicate() var state = GetSessionState(svc, "dedup-test"); // Simulate the current turn already flushing this exact segment once. - GetFlushedResponse(state).Append("Already flushed content"); - session.History.Add(ChatMessage.AssistantMessage("Already flushed content")); + GetCurrentResponse(state).Append("Already flushed content"); + InvokeFlushCurrentResponse(svc, state); var historyCountAfterFirst = session.History.Count; // Simulate the same content appearing in CurrentResponse again (SDK replay) @@ -802,6 +842,31 @@ public async Task FlushCurrentResponse_DedupGuard_SkipsDuplicate() Assert.Equal(historyCountAfterFirst, session.History.Count); } + /// + /// Same-turn flush dedup must treat embedded paragraph breaks as normal content, not as + /// separators between separately flushed segments. + /// + [Fact] + public async Task FlushCurrentResponse_DedupGuard_MultiParagraphContent_SkipsDuplicate() + { + var svc = CreateService(); + await svc.ReconnectAsync(new ConnectionSettings { Mode = ConnectionMode.Demo }); + var session = await svc.CreateSessionAsync("dedup-multipara-test"); + + var state = GetSessionState(svc, "dedup-multipara-test"); + + const string content = "Overview:\n\n- first item\n- second item\n\nDone."; + GetCurrentResponse(state).Append(content); + InvokeFlushCurrentResponse(svc, state); + var historyCountAfterFirst = session.History.Count; + + GetCurrentResponse(state).Append(content); + + InvokeFlushCurrentResponse(svc, state); + + Assert.Equal(historyCountAfterFirst, session.History.Count); + } + /// /// A brand-new turn that happens to produce the same assistant text as the prior turn /// must still be preserved. Dedup is same-turn only. 
@@ -824,6 +889,33 @@ public async Task FlushCurrentResponse_IdenticalCrossTurnReply_IsStillPersisted( Assert.Equal("Done.", session.History.Last().Content); } + /// + /// A later same-turn sub-turn may legitimately produce the same short text again after a + /// tool/sub-turn boundary. That follow-up response must not be mistaken for an SDK replay. + /// + [Fact] + public async Task FlushCurrentResponse_IdenticalSameTurnAfterBoundary_IsStillPersisted() + { + var svc = CreateService(); + await svc.ReconnectAsync(new ConnectionSettings { Mode = ConnectionMode.Demo }); + var session = await svc.CreateSessionAsync("same-turn-after-boundary"); + + var state = GetSessionState(svc, "same-turn-after-boundary"); + + GetCurrentResponse(state).Append("Done."); + InvokeFlushCurrentResponse(svc, state); + var historyAfterFirst = session.History.Count; + + // Simulate a tool/sub-turn boundary before the assistant emits the same text again. + InvokeClearFlushedReplayDedup(state); + + GetCurrentResponse(state).Append("Done."); + InvokeFlushCurrentResponse(svc, state); + + Assert.Equal(historyAfterFirst + 1, session.History.Count); + Assert.Equal("Done.", session.History.Last().Content); + } + /// /// FlushCurrentResponse accumulates text in FlushedResponse so CompleteResponse /// can include it in the TCS result for orchestrator dispatch. diff --git a/PolyPilot.Tests/SessionStabilityTests.cs b/PolyPilot.Tests/SessionStabilityTests.cs index a76684d8c..b503d3d4d 100644 --- a/PolyPilot.Tests/SessionStabilityTests.cs +++ b/PolyPilot.Tests/SessionStabilityTests.cs @@ -140,6 +140,15 @@ public void ForceCompleteProcessing_BoundsAbortAsyncTimeout() Assert.Contains("OperationCanceledException", method); } + [Fact] + public void OrchestratorTimeout_ResultCollection_PreservesWorkerNames() + { + var source = File.ReadAllText(TestPaths.OrganizationCs); + + Assert.Contains("var workerName = i < assignments.Count ? 
assignments[i].WorkerName : \"unknown\";", source); + Assert.DoesNotContain("new WorkerResult(\"unknown\", null, false", source); + } + // ─── Mixed Worker Success/Failure Synthesis Tests ─── [Fact] diff --git a/PolyPilot/Services/CopilotService.Events.cs b/PolyPilot/Services/CopilotService.Events.cs index 36530e665..cd787a7cd 100644 --- a/PolyPilot/Services/CopilotService.Events.cs +++ b/PolyPilot/Services/CopilotService.Events.cs @@ -355,6 +355,7 @@ void Invoke(Action action) if (toolStart.Data == null) break; Interlocked.Increment(ref state.ActiveToolCallCount); state.HasUsedToolsThisTurn = true; // volatile field — no explicit barrier needed + ClearFlushedReplayDedup(state); // Record tool start time and schedule health check Interlocked.Exchange(ref state.ToolStartedAtTicks, DateTime.UtcNow.Ticks); ScheduleToolHealthCheck(state, sessionName); @@ -386,6 +387,7 @@ void Invoke(Action action) Invoke(() => { FlushCurrentResponse(state); + ClearFlushedReplayDedup(state); state.Info.History.Add(imgPlaceholder); OnToolStarted?.Invoke(sessionName, startToolName, startCallId, toolInput); }); @@ -396,6 +398,7 @@ void Invoke(Action action) Invoke(() => { FlushCurrentResponse(state); + ClearFlushedReplayDedup(state); state.Info.History.Add(toolMsg); OnToolStarted?.Invoke(sessionName, startToolName, startCallId, toolInput); OnActivity?.Invoke(sessionName, $"🔧 Running {startToolName}..."); @@ -561,6 +564,7 @@ void Invoke(Action action) CancelTurnEndFallback(state); state.FallbackCanceledByTurnStart = true; state.HasReceivedDeltasThisTurn = false; + ClearFlushedReplayDedup(state); var phaseAdvancedToThinking = state.Info.ProcessingPhase < 2; if (phaseAdvancedToThinking) state.Info.ProcessingPhase = 2; // Thinking Interlocked.Exchange(ref state.ActiveToolCallCount, 0); @@ -1215,22 +1219,23 @@ private void TryAttachImages(MessageOptions options, List imagePaths) } } - private static string? 
GetLastFlushedSegmentThisTurn(SessionState state) + private static void ClearFlushedReplayDedup(SessionState state) { - if (state.FlushedResponse.Length == 0) return null; - - var flushed = state.FlushedResponse.ToString(); - var separatorIndex = flushed.LastIndexOf("\n\n", StringComparison.Ordinal); - return separatorIndex >= 0 ? flushed[(separatorIndex + 2)..] : flushed; + state.LastFlushedResponseSegment = null; + state.FlushedReplayDedupArmed = false; } private static bool WasResponseAlreadyFlushedThisTurn(SessionState state, string text) { - if (string.IsNullOrWhiteSpace(text)) return false; + if (string.IsNullOrWhiteSpace(text) || + state.FlushedResponse.Length == 0 || + !state.FlushedReplayDedupArmed || + string.IsNullOrEmpty(state.LastFlushedResponseSegment)) + { + return false; + } - var lastSegment = GetLastFlushedSegmentThisTurn(state); - return !string.IsNullOrEmpty(lastSegment) && - string.Equals(lastSegment, text, StringComparison.Ordinal); + return string.Equals(state.LastFlushedResponseSegment, text, StringComparison.Ordinal); } /// Flush accumulated assistant text to history without ending the turn. @@ -1265,6 +1270,8 @@ private void FlushCurrentResponse(SessionState state) if (state.FlushedResponse.Length > 0) state.FlushedResponse.Append("\n\n"); state.FlushedResponse.Append(text); + state.LastFlushedResponseSegment = text; + state.FlushedReplayDedupArmed = true; state.CurrentResponse.Clear(); // NOTE: Do NOT reset HasReceivedDeltasThisTurn here — that flag gates whether @@ -1286,6 +1293,7 @@ private void FlushCurrentResponse(SessionState state) var remaining = state.CurrentResponse.ToString(); var fullResponse = string.IsNullOrEmpty(remaining) ? flushed : flushed + "\n\n" + remaining; state.FlushedResponse.Clear(); + ClearFlushedReplayDedup(state); state.CurrentResponse.Clear(); state.ResponseCompletion.TrySetResult(fullResponse); } @@ -1408,6 +1416,7 @@ private void CompleteResponse(SessionState state, long? 
expectedGeneration = nul // call must see IsProcessing=false or it throws "already processing". state.CurrentResponse.Clear(); state.FlushedResponse.Clear(); + ClearFlushedReplayDedup(state); state.PendingReasoningMessages.Clear(); // Accumulate API time before clearing ProcessingStartedAt if (state.Info.ProcessingStartedAt is { } started) diff --git a/PolyPilot/Services/CopilotService.Organization.cs b/PolyPilot/Services/CopilotService.Organization.cs index 1e3e0b23b..ca068c557 100644 --- a/PolyPilot/Services/CopilotService.Organization.cs +++ b/PolyPilot/Services/CopilotService.Organization.cs @@ -1966,10 +1966,11 @@ private async Task SendViaOrchestratorAsync(string groupId, List members // Collect results — all tasks should now be completed (force-completed or already done). // Use try/catch since force-completed tasks may fault. var partialResults = new List(); - foreach (var t in workerTasks) + for (var i = 0; i < workerTasks.Count; i++) { - try { partialResults.Add(await t); } - catch (Exception ex) { partialResults.Add(new WorkerResult("unknown", null, false, $"Error: {ex.Message}", TimeSpan.Zero)); } + var workerName = i < assignments.Count ? 
assignments[i].WorkerName : "unknown"; + try { partialResults.Add(await workerTasks[i]); } + catch (Exception ex) { partialResults.Add(new WorkerResult(workerName, null, false, $"Error: {ex.Message}", TimeSpan.Zero)); } } results = partialResults.ToArray(); } @@ -2244,6 +2245,7 @@ private async Task ForceCompleteProcessingAsync(string sessionName, SessionState state.CurrentResponse.Clear(); state.FlushedResponse.Clear(); + ClearFlushedReplayDedup(state); state.PendingReasoningMessages.Clear(); state.Info.IsProcessing = false; @@ -3912,10 +3914,11 @@ private async Task SendViaOrchestratorReflectAsync(string groupId, List } } var partialResults = new List(); - foreach (var t in workerTasks) + for (var i = 0; i < workerTasks.Count; i++) { - try { partialResults.Add(await t); } - catch (Exception ex) { partialResults.Add(new WorkerResult("unknown", null, false, $"Error: {ex.Message}", TimeSpan.Zero)); } + var workerName = i < assignments.Count ? assignments[i].WorkerName : "unknown"; + try { partialResults.Add(await workerTasks[i]); } + catch (Exception ex) { partialResults.Add(new WorkerResult(workerName, null, false, $"Error: {ex.Message}", TimeSpan.Zero)); } } results = partialResults.ToArray(); } diff --git a/PolyPilot/Services/CopilotService.cs b/PolyPilot/Services/CopilotService.cs index dc69bc082..bcbe0f025 100644 --- a/PolyPilot/Services/CopilotService.cs +++ b/PolyPilot/Services/CopilotService.cs @@ -568,6 +568,13 @@ private class SessionState /// CompleteResponse combines this with CurrentResponse for the TCS result so /// orchestrator dispatch gets the full response text. public StringBuilder FlushedResponse { get; } = new(); + /// The last response segment flushed during the current turn. Used only + /// for immediate same-subturn replay suppression when the SDK replays events. + public string? LastFlushedResponseSegment { get; set; } + /// True only until a new tool/sub-turn boundary is observed. 
This keeps + /// replay dedup scoped to the just-flushed segment instead of suppressing later + /// identical content from a legitimate follow-up sub-turn. + public bool FlushedReplayDedupArmed { get; set; } public bool HasReceivedDeltasThisTurn { get; set; } public bool HasReceivedEventsSinceResume; public string? LastMessageId { get; set; } From d662403f6a1c3fb6f2f58aac822395e51474a292 Mon Sep 17 00:00:00 2001 From: Shane Neuville Date: Mon, 6 Apr 2026 15:06:17 -0500 Subject: [PATCH 12/12] fix: make replay dedup fields volatile for ARM64 thread safety ClearFlushedReplayDedup is called on the SDK background thread (ToolExecutionStartEvent, AssistantTurnStartEvent) while WasResponseAlreadyFlushedThisTurn reads the same fields on the UI thread. On ARM64 (Apple Silicon), the weak memory model can cause stale reads that silently drop legitimate assistant content. Convert LastFlushedResponseSegment and FlushedReplayDedupArmed from plain auto-properties to volatile fields, matching the existing HasUsedToolsThisTurn pattern. Clear armed before segment in ClearFlushedReplayDedup so concurrent readers short-circuit safely. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- PolyPilot/Services/CopilotService.Events.cs | 250 ++++++++++++++++---- PolyPilot/Services/CopilotService.cs | 71 ++++-- 2 files changed, 254 insertions(+), 67 deletions(-) diff --git a/PolyPilot/Services/CopilotService.Events.cs b/PolyPilot/Services/CopilotService.Events.cs index cd787a7cd..0115be0c5 100644 --- a/PolyPilot/Services/CopilotService.Events.cs +++ b/PolyPilot/Services/CopilotService.Events.cs @@ -1,3 +1,4 @@ +using System.Collections; using System.Text; using System.Collections.Concurrent; using System.Text.Json; @@ -83,6 +84,150 @@ private static EventVisibility ClassifySessionEvent(SessionEvent evt) : EventVisibility.TimelineOnly; } + internal readonly record struct BackgroundTaskSnapshot(int AgentCount, int ShellCount, string Fingerprint, bool IsKnown) + { + public bool HasAny => AgentCount > 0 || ShellCount > 0; + } + + private static object? UnwrapBackgroundTasksPayload(object? backgroundTasksOrEventData) + { + if (backgroundTasksOrEventData == null) + return null; + + var nested = backgroundTasksOrEventData.GetType().GetProperty("BackgroundTasks")?.GetValue(backgroundTasksOrEventData); + return nested ?? backgroundTasksOrEventData; + } + + private static List GetBackgroundTaskKeys(object? backgroundTasks, string collectionName, params string[] keyProperties) + { + var keys = new List(); + var items = backgroundTasks?.GetType().GetProperty(collectionName)?.GetValue(backgroundTasks) as IEnumerable; + if (items == null) + return keys; + + var index = 0; + foreach (var item in items) + { + if (item == null) + { + index++; + continue; + } + + string? key = null; + foreach (var propertyName in keyProperties) + { + if (item.GetType().GetProperty(propertyName)?.GetValue(item) is string value && + !string.IsNullOrWhiteSpace(value)) + { + key = value.Trim(); + break; + } + } + + keys.Add(key ?? 
$"#{index}"); + index++; + } + + keys.Sort(StringComparer.Ordinal); + return keys; + } + + internal static BackgroundTaskSnapshot GetBackgroundTaskSnapshot(object? backgroundTasksOrEventData) + { + if (backgroundTasksOrEventData == null) + return new BackgroundTaskSnapshot(0, 0, string.Empty, IsKnown: true); + + var outerType = backgroundTasksOrEventData.GetType(); + var backgroundTasks = UnwrapBackgroundTasksPayload(backgroundTasksOrEventData); + if (backgroundTasks == null) + return new BackgroundTaskSnapshot(0, 0, string.Empty, IsKnown: true); + + var type = backgroundTasks.GetType(); + if (type.GetProperty("Agents") == null && type.GetProperty("Shells") == null) + { + return outerType.GetProperty("BackgroundTasks") != null + ? new BackgroundTaskSnapshot(0, 0, string.Empty, IsKnown: true) + : default; + } + + var agentKeys = GetBackgroundTaskKeys(backgroundTasks, "Agents", "AgentId", "AgentName", "AgentType", "Description"); + var shellKeys = GetBackgroundTaskKeys(backgroundTasks, "Shells", "ShellId", "Description"); + + var fingerprintParts = new List(agentKeys.Count + shellKeys.Count); + fingerprintParts.AddRange(agentKeys.Select(static key => $"agent:{key}")); + fingerprintParts.AddRange(shellKeys.Select(static key => $"shell:{key}")); + + return new BackgroundTaskSnapshot( + agentKeys.Count, + shellKeys.Count, + string.Join("|", fingerprintParts), + IsKnown: true); + } + + internal static long GetBackgroundTaskFirstSeenTicks( + object? backgroundTasksOrEventData, + string? previousFingerprint, + long previousTicks, + DateTime utcNow) + { + var snapshot = GetBackgroundTaskSnapshot(backgroundTasksOrEventData); + if (!snapshot.IsKnown) + return previousTicks; + if (!snapshot.HasAny) + return 0L; + + return previousTicks != 0 && + string.Equals(previousFingerprint, snapshot.Fingerprint, StringComparison.Ordinal) + ? 
previousTicks + : utcNow.Ticks; + } + + private static (BackgroundTaskSnapshot Snapshot, long FirstSeenTicks) RefreshDeferredBackgroundTaskTracking( + SessionState state, + object? backgroundTasksOrEventData) + { + var snapshot = GetBackgroundTaskSnapshot(backgroundTasksOrEventData); + var previousTicks = Interlocked.Read(ref state.DeferredBackgroundTasksFirstSeenAtTicks); + var firstSeenTicks = GetBackgroundTaskFirstSeenTicks( + backgroundTasksOrEventData, + state.DeferredBackgroundTaskFingerprint, + previousTicks, + DateTime.UtcNow); + + if (!snapshot.IsKnown) + return (snapshot, previousTicks); + + if (!snapshot.HasAny) + { + state.DeferredBackgroundTaskFingerprint = null; + Interlocked.Exchange(ref state.DeferredBackgroundTasksFirstSeenAtTicks, 0L); + Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, 0L); + return (snapshot, 0L); + } + + state.DeferredBackgroundTaskFingerprint = snapshot.Fingerprint; + Interlocked.Exchange(ref state.DeferredBackgroundTasksFirstSeenAtTicks, firstSeenTicks); + Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, firstSeenTicks); + return (snapshot, firstSeenTicks); + } + + private void TryResolveDeferredIdleAfterBackgroundTaskChange(SessionState state, string sessionName, object? backgroundTasksOrEventData) + { + var tracking = RefreshDeferredBackgroundTaskTracking(state, backgroundTasksOrEventData); + if (!state.HasDeferredIdle || !state.Info.IsProcessing || !tracking.Snapshot.IsKnown || tracking.Snapshot.HasAny) + return; + + Debug($"[IDLE-DEFER-RESOLVE] '{sessionName}' background task set is now empty — completing deferred turn"); + InvokeOnUI(() => + { + if (state.IsOrphaned || !state.HasDeferredIdle || !state.Info.IsProcessing) + return; + + CompleteResponse(state); + }); + } + private void LogUnhandledSessionEvent(string sessionName, SessionEvent evt) { var eventTypeName = evt.GetType().Name; @@ -575,10 +720,16 @@ void Invoke(Action action) // via the normal CompleteResponse path on the next session.idle. 
// WasUserAborted guard: skip re-arm if the user explicitly clicked Stop — // in-flight TurnStart events from before the abort must not restart processing. - if (!state.Info.IsProcessing && isCurrentState && !state.IsOrphaned && !state.WasUserAborted) + if (ShouldRearmOnTurnStart( + state.Info.IsProcessing, + isCurrentState, + state.IsOrphaned, + state.WasUserAborted, + state.AllowTurnStartRearm)) { Debug($"[EVT-REARM] '{sessionName}' TurnStartEvent arrived after premature session.idle — re-arming IsProcessing"); state.PrematureIdleSignal.Set(); // Signal to ExecuteWorkerAsync that TCS result was truncated + state.AllowTurnStartRearm = false; // One-shot guard for this completion cycle Invoke(() => { if (state.IsOrphaned) return; @@ -591,6 +742,10 @@ void Invoke(Action action) NotifyStateChangedCoalesced(); }); } + else if (!state.Info.IsProcessing && isCurrentState && !state.IsOrphaned && !state.WasUserAborted) + { + Debug($"[EVT-REARM-SKIP] '{sessionName}' TurnStartEvent arrived after explicit completion — ignoring stale replay"); + } else { Invoke(() => @@ -693,24 +848,16 @@ void Invoke(Action action) Debug($"[IDLE-DIAG] '{sessionName}' session.idle payload: backgroundTasks={{agents={agentCount}, shells={shellCount}, null={bt == null}}}"); } - // KEY FIX: Check if the server reports active background tasks (sub-agents, shells). - // session.idle with background tasks means "foreground quiesced, background still running." - // Do NOT treat this as terminal — flush text and wait for the real idle. - // Record when we first entered IDLE-DEFER for this turn (used for zombie expiry). - // CompareExchange(0 → now): sets only on the first IDLE-DEFER; subsequent ones - // for the same turn preserve the original timestamp so elapsed time is cumulative. 
- Interlocked.CompareExchange( - ref state.SubagentDeferStartedAtTicks, - DateTime.UtcNow.Ticks, - 0L); - - var deferTicks = Interlocked.Read(ref state.SubagentDeferStartedAtTicks); + // KEY FIX: age background tasks by stable fingerprint (agent/shell IDs), not just + // by "current turn." Without this, the same orphaned shell IDs get their timer + // reset on every new prompt and sessions like PROMPT can appear busy forever. + var tracking = RefreshDeferredBackgroundTaskTracking(state, idle.Data?.BackgroundTasks); + var deferTicks = tracking.FirstSeenTicks; var hasActiveTasks = HasActiveBackgroundTasks(idle, deferTicks); // Log zombie expiry here where Debug() is available (HasActiveBackgroundTasks is static) - var zombieBt = idle.Data?.BackgroundTasks; - var zombieAgentCount = zombieBt?.Agents?.Length ?? 0; - var zombieShellCount = zombieBt?.Shells?.Length ?? 0; + var zombieAgentCount = tracking.Snapshot.AgentCount; + var zombieShellCount = tracking.Snapshot.ShellCount; if (!hasActiveTasks && deferTicks != 0 && (zombieAgentCount > 0 || zombieShellCount > 0)) { var expiredMinutes = TimeSpan.FromTicks(DateTime.UtcNow.Ticks - deferTicks).TotalMinutes; @@ -915,8 +1062,7 @@ await notifService.SendNotificationAsync( CancelToolHealthCheck(state); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; - state.HasDeferredIdle = false; - Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, 0L); + ClearDeferredIdleTracking(state); Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0); Interlocked.Exchange(ref state.EventCountThisTurn, 0); @@ -941,6 +1087,7 @@ await notifService.SendNotificationAsync( // call must see IsProcessing=false or it throws "already processing". 
// (Matches CompleteResponse ordering per INV-O3) state.Info.IsProcessing = false; + state.AllowTurnStartRearm = false; // Errors are terminal for this turn; late TurnStart events are stale state.Info.IsResumed = false; state.IsReconnectedSend = false; // INV-1: clear all per-turn flags on termination Interlocked.Exchange(ref state.SendingFlag, 0); // Release atomic send lock (INV-1) @@ -1072,20 +1219,16 @@ await notifService.SendNotificationAsync( break; } - case SessionBackgroundTasksChangedEvent: + case SessionBackgroundTasksChangedEvent backgroundTasksChanged: { // Real-time background task status update — fires when agents/shells start or stop. - // Provides proactive awareness without waiting for session.idle. - // Proactively stamp SubagentDeferStartedAtTicks so the zombie expiry timer - // starts as early as possible — don't wait for the next session.idle to learn - // that background tasks are active. CompareExchange(0 → now) preserves any - // existing timestamp from an earlier IDLE-DEFER for the same turn. - Interlocked.CompareExchange( - ref state.SubagentDeferStartedAtTicks, - DateTime.UtcNow.Ticks, - 0L); + // Provides proactive awareness without waiting for session.idle and preserves + // age across turns for the same shell/agent IDs so orphaned tasks still expire. 
+ var bgTracking = RefreshDeferredBackgroundTaskTracking(state, backgroundTasksChanged.Data); Debug($"[BG-TASKS] '{sessionName}' background tasks changed " + - $"(SubagentDeferStartedAtTicks={Interlocked.Read(ref state.SubagentDeferStartedAtTicks)})"); + $"(agents={bgTracking.Snapshot.AgentCount}, shells={bgTracking.Snapshot.ShellCount}, " + + $"SubagentDeferStartedAtTicks={Interlocked.Read(ref state.SubagentDeferStartedAtTicks)})"); + TryResolveDeferredIdleAfterBackgroundTaskChange(state, sessionName, backgroundTasksChanged.Data); Invoke(() => { state.Info.LastUpdatedAt = DateTime.Now; @@ -1221,8 +1364,11 @@ private void TryAttachImages(MessageOptions options, List imagePaths) private static void ClearFlushedReplayDedup(SessionState state) { - state.LastFlushedResponseSegment = null; + // Clear armed FIRST: concurrent UI-thread readers check armed before segment, + // so seeing armed=false short-circuits even if segment hasn't been cleared yet. + // Both fields are volatile — safe for cross-thread access on ARM64. state.FlushedReplayDedupArmed = false; + state.LastFlushedResponseSegment = null; } private static bool WasResponseAlreadyFlushedThisTurn(SessionState state, string text) @@ -1238,6 +1384,20 @@ private static bool WasResponseAlreadyFlushedThisTurn(SessionState state, string return string.Equals(state.LastFlushedResponseSegment, text, StringComparison.Ordinal); } + internal static bool ShouldRearmOnTurnStart( + bool isProcessing, + bool isCurrentState, + bool isOrphaned, + bool wasUserAborted, + bool allowTurnStartRearm) + { + return !isProcessing + && isCurrentState + && !isOrphaned + && !wasUserAborted + && allowTurnStartRearm; + } + /// Flush accumulated assistant text to history without ending the turn. private void FlushCurrentResponse(SessionState state) { @@ -1360,8 +1520,7 @@ private void CompleteResponse(SessionState state, long? 
expectedGeneration = nul Interlocked.Exchange(ref state.ActiveToolCallCount, 0); Interlocked.Exchange(ref state.SendingFlag, 0); state.HasUsedToolsThisTurn = false; - state.HasDeferredIdle = false; - Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, 0L); + ClearDeferredIdleTracking(state); state.IsReconnectedSend = false; // Clear reconnect flag on turn completion (defense-in-depth) state.FallbackCanceledByTurnStart = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); @@ -1424,6 +1583,7 @@ private void CompleteResponse(SessionState state, long? expectedGeneration = nul state.Info.TotalApiTimeSeconds += (DateTime.UtcNow - started).TotalSeconds; state.Info.PremiumRequestsUsed++; } + state.AllowTurnStartRearm = true; // session.idle/turn-end completion can be premature; allow one late TurnStart recovery state.Info.IsProcessing = false; state.Info.IsResumed = false; Interlocked.Exchange(ref state.SendingFlag, 0); // Release atomic send lock @@ -2128,8 +2288,7 @@ private void TriggerToolHealthRecovery(SessionState state, string sessionName, s // Full cleanup mirroring CompleteResponse — missing fields here caused stuck sessions Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; - state.HasDeferredIdle = false; - Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, 0L); + ClearDeferredIdleTracking(state); state.FallbackCanceledByTurnStart = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); Interlocked.Exchange(ref state.WatchdogCaseAResets, 0); @@ -2150,6 +2309,7 @@ private void TriggerToolHealthRecovery(SessionState state, string sessionName, s state.PendingReasoningMessages.Clear(); state.Info.IsProcessing = false; + state.AllowTurnStartRearm = false; // Explicit tool-health recovery should stay completed state.Info.IsResumed = false; Interlocked.Exchange(ref state.SendingFlag, 0); state.Info.ProcessingStartedAt = null; @@ -2298,11 +2458,9 @@ internal static bool 
HasActiveBackgroundTasks( SessionIdleEvent idle, long idleDeferStartedAtTicks = 0) { - var bt = idle.Data?.BackgroundTasks; - if (bt == null) return false; - - bool hasAgents = bt.Agents is { Length: > 0 }; - bool hasShells = bt.Shells is { Length: > 0 }; + var snapshot = GetBackgroundTaskSnapshot(idle.Data?.BackgroundTasks); + bool hasAgents = snapshot.AgentCount > 0; + bool hasShells = snapshot.ShellCount > 0; if ((hasAgents || hasShells) && idleDeferStartedAtTicks != 0) { @@ -2739,8 +2897,7 @@ private async Task RunProcessingWatchdogAsync(SessionState state, string session CancelToolHealthCheck(state); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; - state.HasDeferredIdle = false; - Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, 0L); + ClearDeferredIdleTracking(state); Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0); Interlocked.Exchange(ref state.EventCountThisTurn, 0); @@ -2756,6 +2913,7 @@ private async Task RunProcessingWatchdogAsync(SessionState state, string session catch (Exception flushEx) { Debug($"[WATCHDOG] '{sessionName}' flush failed during kill: {flushEx.Message}"); } Debug($"[WATCHDOG] '{sessionName}' IsProcessing=false — watchdog timeout after {totalProcessingSeconds:F0}s total, elapsed={elapsed:F0}s, exceededMaxTime={exceededMaxTime}"); state.Info.IsProcessing = false; + state.AllowTurnStartRearm = false; // Watchdog timeout is an explicit forced stop Interlocked.Exchange(ref state.SendingFlag, 0); if (state.Info.ProcessingStartedAt is { } wdStarted) state.Info.TotalApiTimeSeconds += (DateTime.UtcNow - wdStarted).TotalSeconds; @@ -2835,12 +2993,12 @@ private async Task RunProcessingWatchdogAsync(SessionState state, string session catch { /* Flush failure must not prevent IsProcessing cleanup */ } // INV-1: clear IsProcessing and all 9 companion fields state.Info.IsProcessing = false; + state.AllowTurnStartRearm = 
false; // Watchdog crash cleanup is terminal for this turn state.Info.IsResumed = false; Interlocked.Exchange(ref state.SendingFlag, 0); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; - state.HasDeferredIdle = false; - Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, 0L); + ClearDeferredIdleTracking(state); Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0); Interlocked.Exchange(ref state.EventCountThisTurn, 0); @@ -2896,8 +3054,7 @@ private void ClearProcessingStateForRecoveryFailure(SessionState state, string s state.Info.IsProcessing = false; state.Info.IsResumed = false; state.HasUsedToolsThisTurn = false; - state.HasDeferredIdle = false; - Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, 0L); + ClearDeferredIdleTracking(state); Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0); @@ -3036,8 +3193,7 @@ private async Task TryRecoverPermissionAsync(SessionState state, string sessionN state.Info.IsProcessing = false; state.Info.IsResumed = false; state.HasUsedToolsThisTurn = false; - state.HasDeferredIdle = false; - Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, 0L); + ClearDeferredIdleTracking(state); Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0); diff --git a/PolyPilot/Services/CopilotService.cs b/PolyPilot/Services/CopilotService.cs index bcbe0f025..d89b23bcf 100644 --- a/PolyPilot/Services/CopilotService.cs +++ b/PolyPilot/Services/CopilotService.cs @@ -569,12 +569,15 @@ private class SessionState /// orchestrator dispatch gets the full response text. 
public StringBuilder FlushedResponse { get; } = new(); /// The last response segment flushed during the current turn. Used only - /// for immediate same-subturn replay suppression when the SDK replays events. - public string? LastFlushedResponseSegment { get; set; } + /// for immediate same-subturn replay suppression when the SDK replays events. + /// Volatile: written on SDK background thread (ClearFlushedReplayDedup at tool/turn + /// boundaries), read on UI thread (WasResponseAlreadyFlushedThisTurn). + public volatile string? LastFlushedResponseSegment; /// True only until a new tool/sub-turn boundary is observed. This keeps /// replay dedup scoped to the just-flushed segment instead of suppressing later - /// identical content from a legitimate follow-up sub-turn. - public bool FlushedReplayDedupArmed { get; set; } + /// identical content from a legitimate follow-up sub-turn. + /// Volatile: written on SDK background thread, read on UI thread. + public volatile bool FlushedReplayDedupArmed; public bool HasReceivedDeltasThisTurn { get; set; } public bool HasReceivedEventsSinceResume; public string? LastMessageId { get; set; } @@ -707,6 +710,26 @@ private class SessionState /// Cleared by SendPromptAsync at the start of each new turn. public volatile bool WasUserAborted; /// + /// One-shot guard for EVT-REARM. Set only by speculative auto-completion paths + /// (CompleteResponse) so a late AssistantTurnStartEvent can revive a turn that + /// was completed too early. Explicit abort/recovery paths leave this false so + /// stale SDK TurnStart replays do not resurrect intentionally-cleared sessions. + /// Cleared on each new SendPromptAsync turn. + /// + public volatile bool AllowTurnStartRearm; + /// + /// Stable identity of the most recently reported background task set (agent IDs + shell IDs). 
+ /// Preserved across SendPromptAsync so the same orphaned background shells keep aging instead + /// of resetting the zombie timeout every time the user sends another prompt. + /// + public volatile string? DeferredBackgroundTaskFingerprint; + /// + /// UTC ticks when the current DeferredBackgroundTaskFingerprint was first observed. Unlike the + /// per-turn SubagentDeferStartedAtTicks, this can intentionally survive into the next prompt + /// so repeated reports of the SAME shell IDs still expire after the real wall-clock timeout. + /// + public long DeferredBackgroundTasksFirstSeenAtTicks; + /// /// UTC ticks when IDLE-DEFER was first entered for the current turn (first /// SessionIdleEvent with active background tasks). 0 = not set. /// Uses Interlocked for thread safety: the IDLE-DEFER section (SDK event thread) sets via @@ -722,6 +745,18 @@ private static void DisposePrematureIdleSignal(SessionState? state) try { state?.PrematureIdleSignal?.Dispose(); } catch { } } + private static void ClearDeferredIdleTracking(SessionState state, bool preserveCarryOver = false) + { + state.HasDeferredIdle = false; + Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, 0L); + + if (!preserveCarryOver) + { + state.DeferredBackgroundTaskFingerprint = null; + Interlocked.Exchange(ref state.DeferredBackgroundTasksFirstSeenAtTicks, 0L); + } + } + /// Ping interval to prevent the headless server from killing idle sessions. /// The server has a ~35 minute idle timeout; pinging every 15 minutes keeps sessions alive. 
internal const int KeepalivePingIntervalSeconds = 15 * 60; // 15 minutes @@ -3299,10 +3334,10 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis state.Info.ClearPermissionDenials(); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); // Reset stale tool count from previous turn state.HasUsedToolsThisTurn = false; // Reset stale tool flag from previous turn - state.HasDeferredIdle = false; // Reset deferred idle flag from previous turn - Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, 0L); // Companion pair — must clear with HasDeferredIdle + ClearDeferredIdleTracking(state, preserveCarryOver: true); // Keep stale shell age across turns so orphaned IDs can still expire state.IsReconnectedSend = false; // Clear reconnect flag — new turn starts fresh (see watchdog reconnect timeout) state.WasUserAborted = false; // Clear abort flag — new turn starts fresh (re-enables EVT-REARM) + state.AllowTurnStartRearm = false; // New user send is authoritative; ignore stale turn-start replays from the prior turn state.PrematureIdleSignal.Reset(); // Clear premature idle detection from previous turn state.FallbackCanceledByTurnStart = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); @@ -3598,8 +3633,7 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis }; // Mirror primary reconnect: reset tool tracking for new connection siblingState.HasUsedToolsThisTurn = false; - siblingState.HasDeferredIdle = false; - Interlocked.Exchange(ref siblingState.SubagentDeferStartedAtTicks, 0L); + ClearDeferredIdleTracking(siblingState); Interlocked.Exchange(ref siblingState.ActiveToolCallCount, 0); Interlocked.Exchange(ref siblingState.SuccessfulToolCountThisTurn, 0); Interlocked.Exchange(ref siblingState.ToolHealthStaleChecks, 0); @@ -3829,8 +3863,7 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis // Reset HasUsedToolsThisTurn so the retried turn starts with the default // 120s watchdog 
tier instead of the inflated 600s from stale tool state. state.HasUsedToolsThisTurn = false; - state.HasDeferredIdle = false; - Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, 0L); + ClearDeferredIdleTracking(state); // Schedule persistence of the new session ID so it survives app restart. // Without this, the debounced save captures the pre-reconnect snapshot @@ -3902,10 +3935,10 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis Debug($"[ERROR] '{sessionName}' reconnect+retry failed, clearing IsProcessing"); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; - state.HasDeferredIdle = false; - Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, 0L); + ClearDeferredIdleTracking(state); Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); state.Info.IsResumed = false; + state.AllowTurnStartRearm = false; // Explicit reconnect failure is terminal for this turn state.Info.IsProcessing = false; if (state.Info.ProcessingStartedAt is { } rcStarted) state.Info.TotalApiTimeSeconds += (DateTime.UtcNow - rcStarted).TotalSeconds; @@ -3926,10 +3959,10 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis Debug($"[ERROR] '{sessionName}' SendAsync failed, clearing IsProcessing (error={ex.Message})"); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; - state.HasDeferredIdle = false; - Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, 0L); + ClearDeferredIdleTracking(state); Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); state.Info.IsResumed = false; + state.AllowTurnStartRearm = false; // Explicit send failure is terminal for this turn state.Info.IsProcessing = false; if (state.Info.ProcessingStartedAt is { } saStarted) state.Info.TotalApiTimeSeconds += (DateTime.UtcNow - saStarted).TotalSeconds; @@ -4072,6 +4105,7 @@ public async Task AbortSessionAsync(string sessionName, bool markAsInterrupted 
= Debug($"[ABORT] '{sessionName}' user abort, clearing IsProcessing"); state.Info.IsProcessing = false; state.WasUserAborted = true; // Suppress EVT-REARM for in-flight TurnStart events after abort + state.AllowTurnStartRearm = false; // Abort is explicit terminal intent — do not revive on late TurnStart replays state.Info.IsResumed = false; if (state.Info.ProcessingStartedAt is { } abortStarted) state.Info.TotalApiTimeSeconds += (DateTime.UtcNow - abortStarted).TotalSeconds; @@ -4080,8 +4114,7 @@ public async Task AbortSessionAsync(string sessionName, bool markAsInterrupted = state.Info.ProcessingPhase = 0; Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; - state.HasDeferredIdle = false; - Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, 0L); // INV-1: companion pair with HasDeferredIdle + ClearDeferredIdleTracking(state); state.IsReconnectedSend = false; // INV-1: clear all per-turn flags on abort Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); // Release send lock — allows a subsequent SteerSessionAsync to acquire it immediately @@ -4207,8 +4240,7 @@ await InvokeOnUIAsync(() => Debug($"[STEER-ERROR] '{sessionName}' soft steer SendAsync failed, clearing IsProcessing (error={ex.Message})"); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; - state.HasDeferredIdle = false; - Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, 0L); + ClearDeferredIdleTracking(state); Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); state.Info.IsResumed = false; state.IsReconnectedSend = false; // INV-1 item 8: prevent stale 35s timeout on next watchdog start @@ -4496,8 +4528,7 @@ await InvokeOnUIAsync(() => state.Info.IsProcessing = false; state.Info.IsResumed = false; state.HasUsedToolsThisTurn = false; - state.HasDeferredIdle = false; - Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, 0L); + ClearDeferredIdleTracking(state); 
Interlocked.Exchange(ref state.ActiveToolCallCount, 0); Interlocked.Exchange(ref state.SendingFlag, 0); state.Info.ProcessingStartedAt = null;