From 8f89c7ad794d607f23bfef02973f2655dcb26b05 Mon Sep 17 00:00:00 2001 From: Peyton Montei Date: Fri, 15 May 2026 15:19:38 -0400 Subject: [PATCH 01/12] feat(agent): add StreamingGeneratorTemplate + progress-writer field degradation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundation for non-Claude streaming summary generation. Mirrors the ReviewerTemplate pattern (review/types/template.go, from PR #1192): - StreamingGeneratorTemplate owns subprocess lifecycle (Start, scanner, drain, Wait, stderr capture, fallback detection, error wrapping into *TextGenerationError). Per-agent code only provides BuildCmd (argv) and Parser (stdout -> progress events). - testutil.FakeStreamCmd extracted from claudecode/generate_streaming_test.go so all 5 streaming agents (claudecode + 4 upcoming) reuse one fake. Drops the in-line itoa reinvention (carry-forward from PR #964 review) in favor of strconv.Itoa. - summaryProgressWriter renders PhaseFirstToken with graceful degradation: omits the (TTFT, cached input tokens) clause when both fields are zero; omits one or the other when only one is present. Lets non-Claude agents (some of which emit neither field) render cleanly under the same template. No agent uses the new template yet — wiring lands in subsequent commits (one per agent). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../claudecode/generate_streaming_test.go | 69 +------- cmd/entire/cli/agent/streaming_template.go | 150 ++++++++++++++++++ .../cli/agent/streaming_template_test.go | 135 ++++++++++++++++ cmd/entire/cli/agent/testutil/streaming.go | 50 ++++++ cmd/entire/cli/explain.go | 15 +- cmd/entire/cli/explain_test.go | 36 +++++ 6 files changed, 389 insertions(+), 66 deletions(-) create mode 100644 cmd/entire/cli/agent/streaming_template.go create mode 100644 cmd/entire/cli/agent/streaming_template_test.go create mode 100644 cmd/entire/cli/agent/testutil/streaming.go diff --git a/cmd/entire/cli/agent/claudecode/generate_streaming_test.go b/cmd/entire/cli/agent/claudecode/generate_streaming_test.go index c45d28d62c..1faa960ca8 100644 --- a/cmd/entire/cli/agent/claudecode/generate_streaming_test.go +++ b/cmd/entire/cli/agent/claudecode/generate_streaming_test.go @@ -6,10 +6,10 @@ import ( "os" "os/exec" "path/filepath" - "strings" "testing" "github.com/entireio/cli/cmd/entire/cli/agent" + "github.com/entireio/cli/cmd/entire/cli/agent/testutil" ) func TestGenerateTextStreaming_Success(t *testing.T) { @@ -21,7 +21,7 @@ func TestGenerateTextStreaming_Success(t *testing.T) { } agentInst := &ClaudeCodeAgent{ - CommandRunner: fakeStreamCmd(string(fixture), "", 0), + CommandRunner: testutil.FakeStreamCmd(string(fixture), "", 0), } var phases []agent.ProgressPhase @@ -57,8 +57,8 @@ func TestGenerateTextStreaming_FallbackOnUnrecognizedFlag(t *testing.T) { // Old CLI: exit non-zero with stderr complaining about --output-format=stream-json. // Fallback path is exercised by routing the *second* call (GenerateText) to a // canned non-streaming envelope. 
- streamCall := fakeStreamCmd("", "error: unknown flag: --output-format=stream-json", 1) - nonStreamCall := fakeStreamCmd(`{"is_error":false,"result":"fallback ok","subtype":"success"}`, "", 0) + streamCall := testutil.FakeStreamCmd("", "error: unknown flag: --output-format=stream-json", 1) + nonStreamCall := testutil.FakeStreamCmd(`{"is_error":false,"result":"fallback ok","subtype":"success"}`, "", 0) calls := 0 agentInst := &ClaudeCodeAgent{ CommandRunner: func(ctx context.Context, name string, args ...string) *exec.Cmd { @@ -100,7 +100,7 @@ func TestGenerateTextStreaming_EnvelopeErrorSurfaced(t *testing.T) { } agentInst := &ClaudeCodeAgent{ - CommandRunner: fakeStreamCmd(string(fixture), "", 0), + CommandRunner: testutil.FakeStreamCmd(string(fixture), "", 0), } _, err = agentInst.GenerateTextStreaming(context.Background(), "test", "haiku", nil) if err == nil { @@ -135,62 +135,3 @@ func equalPhases(a, b []agent.ProgressPhase) bool { } return true } - -// fakeStreamCmd returns a CommandRunner factory whose *exec.Cmd, when Start()'d -// and Wait()'d, produces stdout/stderr/exit-code as configured. Implementation -// uses 'sh -c' to write fixture data; on Windows runners we'd need PowerShell, -// but our CI is Linux/macOS. 
-func fakeStreamCmd(stdout, stderr string, exitCode int) func(ctx context.Context, name string, args ...string) *exec.Cmd { - return func(ctx context.Context, _ string, _ ...string) *exec.Cmd { - script := buildFakeShellScript(stdout, stderr, exitCode) - return exec.CommandContext(ctx, "sh", "-c", script) - } -} - -func buildFakeShellScript(stdout, stderr string, exitCode int) string { - var sb strings.Builder - if stdout != "" { - sb.WriteString("cat <<'__EOF__'\n") - sb.WriteString(stdout) - if !strings.HasSuffix(stdout, "\n") { - sb.WriteString("\n") - } - sb.WriteString("__EOF__\n") - } - if stderr != "" { - sb.WriteString("cat <<'__EOF__' 1>&2\n") - sb.WriteString(stderr) - if !strings.HasSuffix(stderr, "\n") { - sb.WriteString("\n") - } - sb.WriteString("__EOF__\n") - } - if exitCode != 0 { - sb.WriteString("exit ") - sb.WriteString(itoa(exitCode)) - sb.WriteString("\n") - } - return sb.String() -} - -func itoa(n int) string { - if n == 0 { - return "0" - } - neg := n < 0 - if neg { - n = -n - } - var buf [12]byte - i := len(buf) - for n > 0 { - i-- - buf[i] = byte('0' + n%10) - n /= 10 - } - if neg { - i-- - buf[i] = '-' - } - return string(buf[i:]) -} diff --git a/cmd/entire/cli/agent/streaming_template.go b/cmd/entire/cli/agent/streaming_template.go new file mode 100644 index 0000000000..2742504996 --- /dev/null +++ b/cmd/entire/cli/agent/streaming_template.go @@ -0,0 +1,150 @@ +package agent + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "log/slog" + "os" + "os/exec" + "strings" + + "github.com/entireio/cli/cmd/entire/cli/logging" +) + +// StreamingGeneratorTemplate is the shared subprocess-lifecycle wrapper for +// streaming text generators. Per-agent code provides BuildCmd (argv) and +// Parser (stdout → progress events); the template handles Start, drain, +// Wait, stderr capture, and error wrapping. +// +// Parallels review/types/template.ReviewerTemplate (established in PR #1192). 
+// +// BuildCmd and Parser must be non-nil before Generate is called; nil values +// cause Generate to return ErrTemplateMisconfigured. +type StreamingGeneratorTemplate struct { + // AgentName is an identifier used in log entries and error messages (e.g., "codex"). + AgentName string + + // DisplayName is the user-facing CLI binary name (e.g., "codex"). Often the + // same as AgentName but kept separate for cases where they differ (e.g., + // Cursor's agent is named "cursor" but its binary is "agent"). + // NOTE(review): Generate does not read this field; it wraps errors with AgentName. + DisplayName string + + // BuildCmd constructs the *exec.Cmd for one streaming call. Implementations + // MUST set cmd.Stdin to the prompt and cmd.Args to the agent's + // streaming-mode invocation. The template will set cmd.Dir = os.TempDir() + // and cmd.Env = StripGitEnv(os.Environ()) before Start. + BuildCmd func(ctx context.Context, prompt, model string) *exec.Cmd + + // Parser consumes the agent's stdout stream and dispatches progress + // callbacks. Returns the final result text on success. Should read until + // stdout EOF; if it returns early, Generate drains the rest before Wait. + // progress may be nil; Parser must handle that. + Parser func(stdout io.Reader, progress ProgressFn) (result string, err error) + + // LooksLikeUnrecognizedFlag is optional. When non-nil and the subprocess + // fails with stderr matching the predicate, the caller can fall back to + // the agent's non-streaming GenerateText path. The template surfaces + // this signal via ErrUnrecognizedStreamingFlag so the caller decides. + LooksLikeUnrecognizedFlag func(stderr string) bool +} + +// ErrTemplateMisconfigured is returned when BuildCmd or Parser is nil. +var ErrTemplateMisconfigured = errors.New("streaming template misconfigured") + +// ErrUnrecognizedStreamingFlag is returned when LooksLikeUnrecognizedFlag +// indicates the CLI rejected a streaming-specific flag. Callers that +// implement a fallback should detect it with errors.Is. 
+var ErrUnrecognizedStreamingFlag = errors.New("CLI rejected streaming flag") + +// Generate runs one streaming generation and returns the final result text. +// On error, returns *TextGenerationError carrying captured stderr and the +// stdout byte count, matching RunIsolatedTextGeneratorCLI's error shape. +func (t *StreamingGeneratorTemplate) Generate( + ctx context.Context, + prompt, model string, + progress ProgressFn, +) (string, error) { + if t.BuildCmd == nil || t.Parser == nil { + return "", ErrTemplateMisconfigured + } + + cmd := t.BuildCmd(ctx, prompt, model) + cmd.Dir = os.TempDir() + cmd.Env = StripGitEnv(os.Environ()) + + stdout, err := cmd.StdoutPipe() + if err != nil { + return "", fmt.Errorf("%s stream stdout pipe: %w", t.AgentName, err) + } + var stderr bytes.Buffer + cmd.Stderr = &stderr + + if err := cmd.Start(); err != nil { + if ctx.Err() != nil { + return "", &TextGenerationError{ + Err: ctx.Err(), + Stderr: strings.TrimSpace(stderr.String()), + StdoutBytes: 0, + } + } + return "", fmt.Errorf("%s stream start: %w", t.AgentName, err) + } + + counter := &countingReader{r: stdout} + result, parseErr := t.Parser(counter, progress) + + if _, drainErr := io.Copy(io.Discard, stdout); drainErr != nil { + logging.Debug(ctx, "draining stream stdout", + slog.String("agent", t.AgentName), + slog.String("error", drainErr.Error())) + } + waitErr := cmd.Wait() + + if ctx.Err() != nil { + stderrStr := strings.TrimSpace(stderr.String()) + return "", &TextGenerationError{ + Err: ctx.Err(), + Stderr: stderrStr, + StdoutBytes: counter.n, + } + } + + if parseErr == nil && waitErr == nil { + return result, nil + } + + stderrStr := strings.TrimSpace(stderr.String()) + if waitErr != nil && t.LooksLikeUnrecognizedFlag != nil && t.LooksLikeUnrecognizedFlag(stderrStr) { + logging.Warn(ctx, "CLI rejected streaming flags; caller should fall back to non-streaming", + slog.String("agent", t.AgentName), + slog.String("stderr", stderrStr)) + return "", 
ErrUnrecognizedStreamingFlag + } + + wrappedErr := waitErr + if wrappedErr == nil { + wrappedErr = parseErr + } + return "", &TextGenerationError{ + Err: fmt.Errorf("%s stream failed: %w", t.AgentName, wrappedErr), + Stderr: stderrStr, + StdoutBytes: counter.n, + } +} + +// countingReader passes bytes through and counts them. Used by the template +// so the diagnostic path can ask "did the subprocess produce any output?". +type countingReader struct { + r io.Reader + n int +} + +func (c *countingReader) Read(p []byte) (int, error) { + n, err := c.r.Read(p) + c.n += n + return n, err //nolint:wrapcheck // io.Reader contract requires passthrough (including io.EOF) without wrapping +} diff --git a/cmd/entire/cli/agent/streaming_template_test.go b/cmd/entire/cli/agent/streaming_template_test.go new file mode 100644 index 0000000000..354d626e77 --- /dev/null +++ b/cmd/entire/cli/agent/streaming_template_test.go @@ -0,0 +1,135 @@ +package agent_test + +import ( + "context" + "errors" + "io" + "os/exec" + "strings" + "testing" + + "github.com/entireio/cli/cmd/entire/cli/agent" + "github.com/entireio/cli/cmd/entire/cli/agent/testutil" +) + +func TestStreamingGeneratorTemplate_Generate_Success(t *testing.T) { + t.Parallel() + + parsed := false + tmpl := &agent.StreamingGeneratorTemplate{ + AgentName: "fake", + DisplayName: "fake", + BuildCmd: func(ctx context.Context, _, _ string) *exec.Cmd { + return testutil.FakeStreamCmd("hello\nworld\n", "", 0)(ctx, "fake", []string{}...) 
+ }, + Parser: func(stdout io.Reader, _ agent.ProgressFn) (string, error) { + b, err := io.ReadAll(stdout) + if err != nil { + return "", err + } + parsed = true + return strings.TrimSpace(string(b)), nil + }, + } + + result, err := tmpl.Generate(context.Background(), "prompt", "model", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result != "hello\nworld" { + t.Errorf("result = %q, want %q", result, "hello\nworld") + } + if !parsed { + t.Error("expected parser to have been called") + } +} + +func TestStreamingGeneratorTemplate_Generate_NilFieldsReturnError(t *testing.T) { + t.Parallel() + + tmpl := &agent.StreamingGeneratorTemplate{} + _, err := tmpl.Generate(context.Background(), "prompt", "model", nil) + if !errors.Is(err, agent.ErrTemplateMisconfigured) { + t.Errorf("err = %v, want ErrTemplateMisconfigured", err) + } +} + +func TestStreamingGeneratorTemplate_Generate_UnrecognizedFlagFallback(t *testing.T) { + t.Parallel() + + tmpl := &agent.StreamingGeneratorTemplate{ + AgentName: "fake", + DisplayName: "fake", + BuildCmd: func(ctx context.Context, _, _ string) *exec.Cmd { + return testutil.FakeStreamCmd("", "error: unknown flag: --stream-json", 1)(ctx, "fake", []string{}...) 
+ }, + Parser: func(stdout io.Reader, _ agent.ProgressFn) (string, error) { + _, _ = io.Copy(io.Discard, stdout) //nolint:errcheck // best-effort drain in test fake; failure here is irrelevant + return "", nil + }, + LooksLikeUnrecognizedFlag: func(stderr string) bool { + return strings.Contains(stderr, "unknown flag") && strings.Contains(stderr, "stream-json") + }, + } + + _, err := tmpl.Generate(context.Background(), "prompt", "model", nil) + if !errors.Is(err, agent.ErrUnrecognizedStreamingFlag) { + t.Errorf("err = %v, want ErrUnrecognizedStreamingFlag", err) + } +} + +func TestStreamingGeneratorTemplate_Generate_NonZeroExitWrapsError(t *testing.T) { + t.Parallel() + + tmpl := &agent.StreamingGeneratorTemplate{ + AgentName: "fake", + DisplayName: "fake", + BuildCmd: func(ctx context.Context, _, _ string) *exec.Cmd { + return testutil.FakeStreamCmd("partial\n", "boom\n", 1)(ctx, "fake", []string{}...) + }, + Parser: func(stdout io.Reader, _ agent.ProgressFn) (string, error) { + _, _ = io.Copy(io.Discard, stdout) //nolint:errcheck // best-effort drain in test fake; failure here is irrelevant + return "", nil + }, + } + + _, err := tmpl.Generate(context.Background(), "prompt", "model", nil) + var failure *agent.TextGenerationError + if !errors.As(err, &failure) { + t.Fatalf("err = %v, want *TextGenerationError", err) + } + if !strings.Contains(failure.Stderr, "boom") { + t.Errorf("stderr captured = %q, want substring 'boom'", failure.Stderr) + } + if failure.StdoutBytes == 0 { + t.Errorf("stdoutBytes = 0, want > 0 (subprocess emitted 'partial\\n')") + } +} + +func TestStreamingGeneratorTemplate_Generate_ContextCancelled(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + tmpl := &agent.StreamingGeneratorTemplate{ + AgentName: "fake", + DisplayName: "fake", + BuildCmd: func(ctx context.Context, _, _ string) *exec.Cmd { + return testutil.FakeStreamCmd("ok\n", "", 0)(ctx, "fake", []string{}...) 
+ }, + Parser: func(stdout io.Reader, _ agent.ProgressFn) (string, error) { + _, _ = io.Copy(io.Discard, stdout) //nolint:errcheck // best-effort drain in test fake; failure here is irrelevant + return "", nil + }, + } + + _, err := tmpl.Generate(ctx, "prompt", "model", nil) + var failure *agent.TextGenerationError + if !errors.As(err, &failure) { + t.Fatalf("err = %v, want *TextGenerationError wrapping context error", err) + } + if !errors.Is(failure.Err, context.Canceled) { + t.Errorf("inner err = %v, want context.Canceled", failure.Err) + } +} diff --git a/cmd/entire/cli/agent/testutil/streaming.go b/cmd/entire/cli/agent/testutil/streaming.go new file mode 100644 index 0000000000..874c8c94fd --- /dev/null +++ b/cmd/entire/cli/agent/testutil/streaming.go @@ -0,0 +1,50 @@ +// Package testutil holds shared test helpers for the agent package and +// its sub-packages. Not for production use. +package testutil + +import ( + "context" + "os/exec" + "strconv" + "strings" +) + +// FakeStreamCmd returns a CommandRunner factory whose *exec.Cmd, when +// Start()'d and Wait()'d, produces stdout/stderr/exit-code as configured. +// Implementation uses `sh -c` to write fixture data; on Windows runners +// we would need PowerShell, but the project's CI is Linux/macOS only. +func FakeStreamCmd(stdout, stderr string, exitCode int) func(ctx context.Context, name string, args ...string) *exec.Cmd { + return func(ctx context.Context, _ string, _ ...string) *exec.Cmd { + script := BuildFakeShellScript(stdout, stderr, exitCode) + return exec.CommandContext(ctx, "sh", "-c", script) + } +} + +// BuildFakeShellScript renders a shell snippet that emits the given stdout, +// stderr, and exit code. Exported so callers that need a customized fake +// (e.g. multi-stage stdout) can compose against it. 
+func BuildFakeShellScript(stdout, stderr string, exitCode int) string { + var sb strings.Builder + if stdout != "" { + sb.WriteString("cat <<'__EOF__'\n") + sb.WriteString(stdout) + if !strings.HasSuffix(stdout, "\n") { + sb.WriteString("\n") + } + sb.WriteString("__EOF__\n") + } + if stderr != "" { + sb.WriteString("cat <<'__EOF__' 1>&2\n") + sb.WriteString(stderr) + if !strings.HasSuffix(stderr, "\n") { + sb.WriteString("\n") + } + sb.WriteString("__EOF__\n") + } + if exitCode != 0 { + sb.WriteString("exit ") + sb.WriteString(strconv.Itoa(exitCode)) + sb.WriteString("\n") + } + return sb.String() +} diff --git a/cmd/entire/cli/explain.go b/cmd/entire/cli/explain.go index aabdbc89a2..b818c502f8 100644 --- a/cmd/entire/cli/explain.go +++ b/cmd/entire/cli/explain.go @@ -1447,9 +1447,20 @@ func (s *summaryProgressWriter) handle(p agent.GenerationProgress) { case agent.PhaseConnecting: s.printLine(s.arrow + " Sending request to provider...") case agent.PhaseFirstToken: + var clauses []string + if p.TTFTms > 0 { + clauses = append(clauses, "TTFT "+formatMs(p.TTFTms)) + } + if p.CachedInputTokens > 0 { + clauses = append(clauses, formatTokenCount(p.CachedInputTokens)+" cached input tokens") + } + detail := "" + if len(clauses) > 0 { + detail = " (" + strings.Join(clauses, ", ") + ")" + } s.printLine(fmt.Sprintf( - "%s Provider responded (TTFT %s, %s cached input tokens) -- generating...", - s.arrow, formatMs(p.TTFTms), formatTokenCount(p.CachedInputTokens))) + "%s Provider responded%s -- generating...", + s.arrow, detail)) case agent.PhaseGenerating: if !s.shouldEmitGenerating(p) { return diff --git a/cmd/entire/cli/explain_test.go b/cmd/entire/cli/explain_test.go index 76db9a13ff..d016c00533 100644 --- a/cmd/entire/cli/explain_test.go +++ b/cmd/entire/cli/explain_test.go @@ -6675,6 +6675,42 @@ func TestSummaryProgressWriter_NonTTYGenerateThrottle(t *testing.T) { } } +func TestSummaryProgressWriter_FirstTokenWithoutOptionalFields(t *testing.T) { + t.Parallel() + + var 
buf bytes.Buffer + attempt := newSummaryAttempt("gemini-cli", 0) + pw := newSummaryProgressWriter(&buf, attempt) + + pw.handle(agent.GenerationProgress{Phase: agent.PhaseFirstToken}) + + out := buf.String() + if !strings.Contains(out, "Provider responded -- generating...") { + t.Errorf("output = %q, want 'Provider responded -- generating...' (no parens)", out) + } + if strings.Contains(out, "(") { + t.Errorf("output = %q, should not contain '(' when both fields are zero", out) + } +} + +func TestSummaryProgressWriter_FirstTokenWithOnlyCachedTokens(t *testing.T) { + t.Parallel() + + var buf bytes.Buffer + attempt := newSummaryAttempt("codex", 0) + pw := newSummaryProgressWriter(&buf, attempt) + + pw.handle(agent.GenerationProgress{Phase: agent.PhaseFirstToken, CachedInputTokens: 23808}) + + out := buf.String() + if !strings.Contains(out, "Provider responded (23.8k cached input tokens) -- generating...") { + t.Errorf("output = %q, want '(23.8k cached input tokens)' clause", out) + } + if strings.Contains(out, "TTFT") { + t.Errorf("output = %q, should not mention TTFT when TTFTms == 0", out) + } +} + func TestSummaryProgressWriter_Accessible(t *testing.T) { // Cannot use t.Parallel() — t.Setenv mutates process-global state. // (Strictly, t.Setenv IS compatible with t.Parallel() in Go 1.17+ when From 8843e71b8a8eb759d36e657a3ba9a36e93d589c4 Mon Sep 17 00:00:00 2001 From: Peyton Montei Date: Fri, 15 May 2026 16:26:42 -0400 Subject: [PATCH 02/12] feat(codex): implement StreamingTextGenerator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex now emits live progress events through the explain --generate UI. 
parseCodexStream consumes `codex exec --skip-git-repo-check --json -` NDJSON output and dispatches three of our four phases (PhaseGenerating is never emitted): - turn.started -> PhaseConnecting - turn.completed -> PhaseFirstToken (carries cached_input_tokens from usage; no TTFT — Codex doesn't emit one; progress writer omits the clause gracefully) then PhaseDone (with usage.output_tokens and an observed turn duration) PhaseFirstToken is intentionally deferred until turn.completed: Codex's --json CLI buffers each item and emits agent_message in one chunk, so there is no incremental "first token" signal to surface, and the usage data (cached_input_tokens needed for the progress banner) is only available on the terminal turn.completed event. Tool-call events (item.{started,completed} with type=command_execution) are intentionally ignored — summary generation doesn't authorize tools, so they shouldn't appear in this code path, but the parser skips them silently if they do. Verified end-to-end against the real codex CLI on a 734k-token transcript; output sequence matches `Sending request -> Provider responded (Xk cached input tokens) -> Summary generated (Ys, Z output tokens)`. Falls back to non-streaming GenerateText if the CLI rejects --json (older codex versions). Fallback path is unit-tested via the shared FakeStreamCmd. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../cli/agent/codex/generate_streaming.go | 159 ++++++++++++++++++ .../agent/codex/generate_streaming_test.go | 132 +++++++++++++++ cmd/entire/cli/agent/codex/stream_response.go | 27 +++ .../agent/codex/testdata/stream_error.jsonl | 4 + .../agent/codex/testdata/stream_success.jsonl | 4 + 5 files changed, 326 insertions(+) create mode 100644 cmd/entire/cli/agent/codex/generate_streaming.go create mode 100644 cmd/entire/cli/agent/codex/generate_streaming_test.go create mode 100644 cmd/entire/cli/agent/codex/stream_response.go create mode 100644 cmd/entire/cli/agent/codex/testdata/stream_error.jsonl create mode 100644 cmd/entire/cli/agent/codex/testdata/stream_success.jsonl diff --git a/cmd/entire/cli/agent/codex/generate_streaming.go b/cmd/entire/cli/agent/codex/generate_streaming.go new file mode 100644 index 0000000000..2b4d9e4afd --- /dev/null +++ b/cmd/entire/cli/agent/codex/generate_streaming.go @@ -0,0 +1,159 @@ +package codex + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os/exec" + "strings" + "time" + + "github.com/entireio/cli/cmd/entire/cli/agent" +) + +const streamBufferMax = 4 * 1024 * 1024 // 4 MiB + +// parseCodexStream consumes `codex exec --json` NDJSON output, dispatches +// progress callbacks, and returns the final agent_message text. 
+func parseCodexStream(stdout io.Reader, progress agent.ProgressFn) (string, error) { + scanner := bufio.NewScanner(stdout) + scanner.Buffer(make([]byte, 64*1024), streamBufferMax) + + var ( + resultText string + sawTurnComplete bool + usage *codexStreamUsage + turnStartedAt time.Time + turnDuration time.Duration + ) + + dispatch := func(p agent.GenerationProgress) { + if progress != nil { + progress(p) + } + } + + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + var ev codexStreamEvent + if err := json.Unmarshal(line, &ev); err != nil { + continue + } + + switch ev.Type { + case "turn.started": + turnStartedAt = time.Now() + dispatch(agent.GenerationProgress{Phase: agent.PhaseConnecting}) + + case "item.completed": + // Codex emits the full agent_message in one item; we capture + // the text but defer PhaseFirstToken until turn.completed so + // the cached_input_tokens usage clause can be attached. The + // CLI buffers and emits items in one chunk per turn, so there + // is no incremental "first token" signal to surface anyway. 
+ if ev.Item != nil && ev.Item.Type == "agent_message" { + resultText = ev.Item.Text + } + + case "turn.completed": + sawTurnComplete = true + usage = ev.Usage + if !turnStartedAt.IsZero() { + turnDuration = time.Since(turnStartedAt) + } + + case "turn.failed", "error": + detail := "unspecified error" + if len(line) > 0 { + detail = string(line) + } + return "", fmt.Errorf("codex turn failed: %s", detail) + } + } + if err := scanner.Err(); err != nil { + return "", fmt.Errorf("reading codex stream: %w", err) + } + if !sawTurnComplete { + return "", errors.New("codex stream ended without a turn.completed event") + } + if resultText == "" { + return "", errors.New("codex stream produced no agent_message") + } + if progress != nil { + firstToken := agent.GenerationProgress{Phase: agent.PhaseFirstToken} + if usage != nil { + firstToken.InputTokens = usage.InputTokens + firstToken.CachedInputTokens = usage.CachedInputTokens + } + dispatch(firstToken) + + done := agent.GenerationProgress{Phase: agent.PhaseDone} + if usage != nil { + done.OutputTokens = usage.OutputTokens + done.InputTokens = usage.InputTokens + done.CachedInputTokens = usage.CachedInputTokens + } + if turnDuration > 0 { + done.DurationMs = int(turnDuration.Milliseconds()) + } + dispatch(done) + } + return resultText, nil +} + +// GenerateTextStreaming implements agent.StreamingTextGenerator. 
+func (c *CodexAgent) GenerateTextStreaming( + ctx context.Context, + prompt, model string, + progress agent.ProgressFn, +) (string, error) { + tmpl := &agent.StreamingGeneratorTemplate{ + AgentName: "codex", + DisplayName: "codex", + BuildCmd: c.buildStreamCmd, + Parser: parseCodexStream, + LooksLikeUnrecognizedFlag: looksLikeCodexUnrecognizedFlag, + } + + result, err := tmpl.Generate(ctx, prompt, model, progress) + if err != nil { + if errors.Is(err, agent.ErrUnrecognizedStreamingFlag) { + return c.GenerateText(ctx, prompt, model) + } + return "", fmt.Errorf("codex streaming generate: %w", err) + } + return result, nil +} + +func (c *CodexAgent) buildStreamCmd(ctx context.Context, prompt, model string) *exec.Cmd { + commandRunner := c.CommandRunner + if commandRunner == nil { + commandRunner = exec.CommandContext + } + args := []string{"exec", "--skip-git-repo-check", "--json"} + if model != "" { + args = append(args, "--model", model) + } + args = append(args, "-") + cmd := commandRunner(ctx, "codex", args...) 
+ cmd.Stdin = strings.NewReader(prompt) + return cmd +} + +func looksLikeCodexUnrecognizedFlag(stderr string) bool { + lower := strings.ToLower(stderr) + hasRejectPhrase := strings.Contains(lower, "unrecognized option") || + strings.Contains(lower, "unknown flag") || + strings.Contains(lower, "unknown option") || + strings.Contains(lower, "invalid option") + if !hasRejectPhrase { + return false + } + return strings.Contains(lower, "json") || strings.Contains(lower, "exec") +} diff --git a/cmd/entire/cli/agent/codex/generate_streaming_test.go b/cmd/entire/cli/agent/codex/generate_streaming_test.go new file mode 100644 index 0000000000..a2add02644 --- /dev/null +++ b/cmd/entire/cli/agent/codex/generate_streaming_test.go @@ -0,0 +1,132 @@ +package codex + +import ( + "bytes" + "context" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + + "github.com/entireio/cli/cmd/entire/cli/agent" + "github.com/entireio/cli/cmd/entire/cli/agent/testutil" +) + +func TestParseCodexStream_Success(t *testing.T) { + t.Parallel() + + data, err := os.ReadFile(filepath.Join("testdata", "stream_success.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + var phases []agent.ProgressPhase + result, err := parseCodexStream(bytes.NewReader(data), func(p agent.GenerationProgress) { + phases = append(phases, p.Phase) + }) + if err != nil { + t.Fatalf("parse: %v", err) + } + if result != "Hello, world." 
{ + t.Errorf("result = %q, want %q", result, "Hello, world.") + } + + counts := map[agent.ProgressPhase]int{} + for _, p := range phases { + counts[p]++ + } + if counts[agent.PhaseConnecting] != 1 { + t.Errorf("PhaseConnecting count = %d, want 1", counts[agent.PhaseConnecting]) + } + if counts[agent.PhaseFirstToken] != 1 { + t.Errorf("PhaseFirstToken count = %d, want 1", counts[agent.PhaseFirstToken]) + } + if counts[agent.PhaseDone] != 1 { + t.Errorf("PhaseDone count = %d, want 1", counts[agent.PhaseDone]) + } +} + +func TestParseCodexStream_ErrorEnvelope(t *testing.T) { + t.Parallel() + + data, err := os.ReadFile(filepath.Join("testdata", "stream_error.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + _, err = parseCodexStream(bytes.NewReader(data), nil) + if err == nil { + t.Fatal("expected error from error envelope") + } + if !strings.Contains(err.Error(), "model not found") { + t.Errorf("error %q should mention 'model not found'", err) + } +} + +func TestParseCodexStream_MissingTurnCompleted(t *testing.T) { + t.Parallel() + + stream := `{"type":"thread.started","thread_id":"t"} +{"type":"item.completed","item":{"id":"i","type":"agent_message","text":"partial"}} +` + _, err := parseCodexStream(strings.NewReader(stream), nil) + if err == nil { + t.Fatal("expected error when stream lacks turn.completed") + } +} + +func TestCodexGenerateTextStreaming_Success(t *testing.T) { + t.Parallel() + + fixture, err := os.ReadFile(filepath.Join("testdata", "stream_success.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + c := &CodexAgent{ + CommandRunner: testutil.FakeStreamCmd(string(fixture), "", 0), + } + + var phases []agent.ProgressPhase + result, err := c.GenerateTextStreaming(context.Background(), "test prompt", "haiku", func(p agent.GenerationProgress) { + phases = append(phases, p.Phase) + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result != "Hello, world." 
{ + t.Errorf("result = %q, want %q", result, "Hello, world.") + } + if len(phases) != 3 { + t.Errorf("phases = %v (count %d), want 3 (Connecting, FirstToken, Done)", phases, len(phases)) + } +} + +func TestCodexGenerateTextStreaming_FallbackOnUnrecognizedFlag(t *testing.T) { + t.Parallel() + + streamCall := testutil.FakeStreamCmd("", "error: unknown flag: --json", 1) + nonStreamCall := testutil.FakeStreamCmd("fallback response", "", 0) + calls := 0 + c := &CodexAgent{ + CommandRunner: func(ctx context.Context, name string, args ...string) *exec.Cmd { + calls++ + if calls == 1 { + return streamCall(ctx, name, args...) + } + return nonStreamCall(ctx, name, args...) + }, + } + + result, err := c.GenerateTextStreaming(context.Background(), "test", "haiku", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(result, "fallback") { + t.Errorf("result = %q, want substring 'fallback'", result) + } + if calls != 2 { + t.Errorf("calls = %d, want 2 (streaming + fallback)", calls) + } +} diff --git a/cmd/entire/cli/agent/codex/stream_response.go b/cmd/entire/cli/agent/codex/stream_response.go new file mode 100644 index 0000000000..1a84198b2b --- /dev/null +++ b/cmd/entire/cli/agent/codex/stream_response.go @@ -0,0 +1,27 @@ +package codex + +// codexStreamEvent is one decoded line of `codex exec --json` output. +// Fields are populated based on the event Type/Item.Type. +type codexStreamEvent struct { + Type string `json:"type"` + ThreadID string `json:"thread_id,omitempty"` + Item *codexStreamItem `json:"item,omitempty"` + Usage *codexStreamUsage `json:"usage,omitempty"` +} + +// codexStreamItem appears inside type=item.completed events. For summary +// generation we care only about Type="agent_message" items, which carry +// the model's response in Text. +type codexStreamItem struct { + ID string `json:"id"` + Type string `json:"type"` // "agent_message" | "command_execution" | ... 
+ Text string `json:"text"` // populated for agent_message +} + +// codexStreamUsage appears inside the terminal type=turn.completed event. +type codexStreamUsage struct { + InputTokens int `json:"input_tokens,omitempty"` + CachedInputTokens int `json:"cached_input_tokens,omitempty"` + OutputTokens int `json:"output_tokens,omitempty"` + ReasoningOutputTokens int `json:"reasoning_output_tokens,omitempty"` +} diff --git a/cmd/entire/cli/agent/codex/testdata/stream_error.jsonl b/cmd/entire/cli/agent/codex/testdata/stream_error.jsonl new file mode 100644 index 0000000000..e2799e9b06 --- /dev/null +++ b/cmd/entire/cli/agent/codex/testdata/stream_error.jsonl @@ -0,0 +1,4 @@ +{"type":"thread.started","thread_id":"test-thread"} +{"type":"turn.started"} +{"type":"error","message":"invalid_request_error: model not found"} +{"type":"turn.failed","error":{"message":"model not found"}} diff --git a/cmd/entire/cli/agent/codex/testdata/stream_success.jsonl b/cmd/entire/cli/agent/codex/testdata/stream_success.jsonl new file mode 100644 index 0000000000..a13b4004b4 --- /dev/null +++ b/cmd/entire/cli/agent/codex/testdata/stream_success.jsonl @@ -0,0 +1,4 @@ +{"type":"thread.started","thread_id":"test-thread"} +{"type":"turn.started"} +{"type":"item.completed","item":{"id":"item_0","type":"agent_message","text":"Hello, world."}} +{"type":"turn.completed","usage":{"input_tokens":9,"cached_input_tokens":1234,"output_tokens":3,"reasoning_output_tokens":0}} From 1db240c8084a1b41227921c1ae5269f44b4c5ed5 Mon Sep 17 00:00:00 2001 From: Peyton Montei Date: Fri, 15 May 2026 16:37:30 -0400 Subject: [PATCH 03/12] fix(codex): redact raw error envelopes + surface skipped malformed lines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two findings from code-quality review on commit 3d0a38c01: 1. `turn.failed`/`error` event handler used to include `string(line)` — the full raw JSON envelope — in the returned error. 
That string flows through *TextGenerationError.Err into logs, telemetry, and surfaces in user-facing error output. The raw line can carry echoed user content or model-message fragments, violating the CLAUDE.md privacy rule. Decode just the operational `message` / `error.message` fields instead; fall through to "unspecified error" when neither is present. 2. Malformed JSON lines were silently `continue`'d. A Codex protocol regression would manifest only as the downstream "produced no agent_message" error. Count the skipped lines and include the count in the error message so protocol drift becomes visible without logging the raw lines themselves. No behavior change on the happy path. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../cli/agent/codex/generate_streaming.go | 29 +++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/cmd/entire/cli/agent/codex/generate_streaming.go b/cmd/entire/cli/agent/codex/generate_streaming.go index 2b4d9e4afd..ef92feee97 100644 --- a/cmd/entire/cli/agent/codex/generate_streaming.go +++ b/cmd/entire/cli/agent/codex/generate_streaming.go @@ -28,6 +28,7 @@ func parseCodexStream(stdout io.Reader, progress agent.ProgressFn) (string, erro usage *codexStreamUsage turnStartedAt time.Time turnDuration time.Duration + malformed int ) dispatch := func(p agent.GenerationProgress) { @@ -43,6 +44,11 @@ func parseCodexStream(stdout io.Reader, progress agent.ProgressFn) (string, erro } var ev codexStreamEvent if err := json.Unmarshal(line, &ev); err != nil { + // Codex may emit transient noise (blank lines, partial flushes); a + // schema-incompatible line is recoverable per-event but tracked so + // protocol regressions surface in the "no agent_message" error + // instead of disappearing silently. 
+ malformed++ continue } @@ -69,9 +75,23 @@ func parseCodexStream(stdout io.Reader, progress agent.ProgressFn) (string, erro } case "turn.failed", "error": - detail := "unspecified error" - if len(line) > 0 { - detail = string(line) + var details struct { + Message string `json:"message"` + Error struct { + Message string `json:"message"` + } `json:"error"` + } + // Partial decode tolerated: if the schema changes we fall through + // to "unspecified error" rather than leaking the raw line (which + // may carry echoed user content or model-message fragments) into + // logs, telemetry, and *TextGenerationError.Stderr. + _ = json.Unmarshal(line, &details) //nolint:errcheck // see comment above + detail := details.Message + if detail == "" { + detail = details.Error.Message + } + if detail == "" { + detail = "unspecified error" } return "", fmt.Errorf("codex turn failed: %s", detail) } @@ -83,6 +103,9 @@ func parseCodexStream(stdout io.Reader, progress agent.ProgressFn) (string, erro return "", errors.New("codex stream ended without a turn.completed event") } if resultText == "" { + if malformed > 0 { + return "", fmt.Errorf("codex stream produced no agent_message (%d malformed lines skipped)", malformed) + } return "", errors.New("codex stream produced no agent_message") } if progress != nil { From 864cf79b37631970e4a1ff35057c548e9132bf2b Mon Sep 17 00:00:00 2001 From: Peyton Montei Date: Fri, 15 May 2026 17:05:42 -0400 Subject: [PATCH 04/12] feat(geminicli): implement StreamingTextGenerator Gemini now emits live progress events through the explain --generate UI. 
parseGeminiStream consumes `gemini --output-format stream-json -p " " --model <model>` NDJSON output and dispatches our 4 phases: - init -> PhaseConnecting - first message(role:assistant) -> PhaseFirstToken (no TTFT/cached -- Gemini's init/message events don't carry them; the progress writer omits the clause gracefully) - subsequent assistant messages -> PhaseGenerating with a running OutputTokens estimate from totalChars/4 - result(status:success, stats={...}) -> PhaseDone with stats-reported output/input/cached tokens and DurationMs (falling back to local clock if absent) Real-CLI capture (gemini-cli 0.38.2) deviated from the planned wire shape in two ways that this implementation accommodates: * There IS a terminal `result` event carrying stats.{total_tokens, input_tokens, output_tokens, cached, duration_ms}; the plan assumed the stream ended at EOF with no usage data. The parser uses the real usage instead of zeros. * Error envelopes ride inside the `result` event with status="error" and error={type,message}, not a separate "error" event type. The decoder reads error.message (privacy-preserving -- raw JSON lines never enter logs or *TextGenerationError.Stderr). Malformed lines are counted and surfaced in the no-content error so protocol drift doesn't disappear silently (matches the codex parser hardening in 47583e9c2). Falls back to non-streaming GenerateText if the CLI rejects the streaming flags. dupl lint did not fire on the per-agent flag-rejection heuristic at threshold 75, so no nolint directive was added; consolidation across all 4 agents is deferred to after Chunk 5 per plan. Verified end-to-end against the real gemini CLI on the same smoke checkpoint Chunk 2 used.
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../cli/agent/geminicli/generate_streaming.go | 194 ++++++++++++++++++ .../geminicli/generate_streaming_test.go | 133 ++++++++++++ .../cli/agent/geminicli/stream_response.go | 38 ++++ .../geminicli/testdata/stream_error.jsonl | 3 + .../geminicli/testdata/stream_success.jsonl | 5 + 5 files changed, 373 insertions(+) create mode 100644 cmd/entire/cli/agent/geminicli/generate_streaming.go create mode 100644 cmd/entire/cli/agent/geminicli/generate_streaming_test.go create mode 100644 cmd/entire/cli/agent/geminicli/stream_response.go create mode 100644 cmd/entire/cli/agent/geminicli/testdata/stream_error.jsonl create mode 100644 cmd/entire/cli/agent/geminicli/testdata/stream_success.jsonl diff --git a/cmd/entire/cli/agent/geminicli/generate_streaming.go b/cmd/entire/cli/agent/geminicli/generate_streaming.go new file mode 100644 index 0000000000..43f541a0ed --- /dev/null +++ b/cmd/entire/cli/agent/geminicli/generate_streaming.go @@ -0,0 +1,194 @@ +package geminicli + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os/exec" + "strings" + "time" + + "github.com/entireio/cli/cmd/entire/cli/agent" +) + +const streamBufferMax = 4 * 1024 * 1024 // 4 MiB + +// parseGeminiStream consumes `gemini --output-format stream-json` NDJSON +// output, dispatches progress callbacks, and returns the concatenated +// assistant content. 
+// +// Real-CLI shape (gemini-cli 0.38.2): +// - init -> PhaseConnecting +// - message(role=user) -> ignored (echo) +// - message(role=assistant, delta=true) -> PhaseFirstToken (1st) +// / PhaseGenerating (subsequent) +// content concatenated into result +// - result(status=success, stats={...}) -> PhaseDone (terminal) +// - result(status=error, error={type,message}, stats) -> returns error +// +// Note: Gemini typically emits ONE assistant message per turn rather than +// chunking deltas, so PhaseGenerating may not fire on real captures even +// though it works for multi-message fixtures. +func parseGeminiStream(stdout io.Reader, progress agent.ProgressFn) (string, error) { + scanner := bufio.NewScanner(stdout) + scanner.Buffer(make([]byte, 64*1024), streamBufferMax) + + var ( + result strings.Builder + sawInit bool + firstTokenFired bool + stats *geminiStreamStats + malformed int + start = time.Now() + totalChars int + ) + + dispatch := func(p agent.GenerationProgress) { + if progress != nil { + progress(p) + } + } + + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + var ev geminiStreamEvent + if err := json.Unmarshal(line, &ev); err != nil { + // Gemini may emit transient noise (blank lines, partial flushes); + // a schema-incompatible line is recoverable per-event but tracked + // so protocol regressions surface in the no-content error instead + // of disappearing silently. + malformed++ + continue + } + + switch ev.Type { + case "init": + sawInit = true + dispatch(agent.GenerationProgress{Phase: agent.PhaseConnecting}) + + case "message": + if ev.Role != "assistant" { + continue + } + result.WriteString(ev.Content) + totalChars += len(ev.Content) + if !firstTokenFired { + firstTokenFired = true + // Gemini's init/message events carry no TTFT, input-token, + // or cached-token data; usage is deferred to the terminal + // result event. 
PhaseFirstToken is dispatched without those + // fields so the progress writer's graceful degradation + // renders "Provider responded -- generating..." without + // parens. + dispatch(agent.GenerationProgress{Phase: agent.PhaseFirstToken}) + } else { + dispatch(agent.GenerationProgress{ + Phase: agent.PhaseGenerating, + OutputTokens: totalChars / 4, + }) + } + + case "result": + stats = ev.Stats + if ev.Status == "error" { + detail := "" + if ev.Error != nil { + detail = ev.Error.Message + } + if detail == "" { + // Partial decode tolerated: schema drift falls through to + // "unspecified error" rather than leaking raw lines (which + // may carry echoed user content or model fragments) into + // logs and *TextGenerationError.Stderr. + detail = "unspecified error" + } + return "", fmt.Errorf("gemini stream error: %s", detail) + } + } + } + if err := scanner.Err(); err != nil { + return "", fmt.Errorf("reading gemini stream: %w", err) + } + if !sawInit { + return "", errors.New("gemini stream ended without an init event") + } + if !firstTokenFired { + if malformed > 0 { + return "", fmt.Errorf("gemini stream produced no assistant content (%d malformed lines skipped)", malformed) + } + return "", errors.New("gemini stream produced no assistant content") + } + if progress != nil { + done := agent.GenerationProgress{Phase: agent.PhaseDone} + if stats != nil { + done.OutputTokens = stats.OutputTokens + done.InputTokens = stats.InputTokens + done.CachedInputTokens = stats.Cached + if stats.DurationMs > 0 { + done.DurationMs = stats.DurationMs + } + } + if done.DurationMs == 0 { + done.DurationMs = int(time.Since(start).Milliseconds()) + } + dispatch(done) + } + return result.String(), nil +} + +// GenerateTextStreaming implements agent.StreamingTextGenerator. 
+func (g *GeminiCLIAgent) GenerateTextStreaming( + ctx context.Context, + prompt, model string, + progress agent.ProgressFn, +) (string, error) { + tmpl := &agent.StreamingGeneratorTemplate{ + AgentName: "gemini", + DisplayName: "gemini", + BuildCmd: g.buildStreamCmd, + Parser: parseGeminiStream, + LooksLikeUnrecognizedFlag: looksLikeGeminiUnrecognizedFlag, + } + + result, err := tmpl.Generate(ctx, prompt, model, progress) + if err != nil { + if errors.Is(err, agent.ErrUnrecognizedStreamingFlag) { + return g.GenerateText(ctx, prompt, model) + } + return "", fmt.Errorf("gemini streaming generate: %w", err) + } + return result, nil +} + +func (g *GeminiCLIAgent) buildStreamCmd(ctx context.Context, prompt, model string) *exec.Cmd { + commandRunner := g.CommandRunner + if commandRunner == nil { + commandRunner = exec.CommandContext + } + args := []string{"--output-format", "stream-json", "-p", " "} + if model != "" { + args = append(args, "--model", model) + } + cmd := commandRunner(ctx, "gemini", args...) + cmd.Stdin = strings.NewReader(prompt) + return cmd +} + +// looksLikeGeminiUnrecognizedFlag is the second of 4 per-agent flag-rejection +// heuristics. The controller will consolidate the 4 instances after Chunk 5. 
// looksLikeGeminiUnrecognizedFlag reports whether stderr reads like the CLI
// rejecting the streaming flags. The match is case-insensitive and requires
// both of the following to appear somewhere in the text:
//
//   - a rejection word: "unknown", "unrecognized", or "invalid"
//   - a streaming-flag mention: "output-format" or "stream-json"
func looksLikeGeminiUnrecognizedFlag(stderr string) bool {
	text := strings.ToLower(stderr)

	// mentionsAny reports whether the lower-cased stderr contains at least
	// one of the given substrings.
	mentionsAny := func(needles ...string) bool {
		for _, needle := range needles {
			if strings.Contains(text, needle) {
				return true
			}
		}
		return false
	}

	return mentionsAny("unknown", "unrecognized", "invalid") &&
		mentionsAny("output-format", "stream-json")
}
count = %d, want 1 (emitted at result event)", counts[agent.PhaseDone]) + } +} + +func TestParseGeminiStream_ErrorEnvelope(t *testing.T) { + t.Parallel() + + data, err := os.ReadFile(filepath.Join("testdata", "stream_error.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + _, err = parseGeminiStream(bytes.NewReader(data), nil) + if err == nil { + t.Fatal("expected error from error envelope") + } + if !strings.Contains(err.Error(), "invalid request") { + t.Errorf("error %q should mention 'invalid request' from error.message", err) + } +} + +func TestParseGeminiStream_EmptyStream(t *testing.T) { + t.Parallel() + + _, err := parseGeminiStream(strings.NewReader(""), nil) + if err == nil { + t.Fatal("expected error from empty stream (no init, no assistant content)") + } +} + +func TestGeminiCLIGenerateTextStreaming_Success(t *testing.T) { + t.Parallel() + + fixture, err := os.ReadFile(filepath.Join("testdata", "stream_success.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + g := &GeminiCLIAgent{ + CommandRunner: testutil.FakeStreamCmd(string(fixture), "", 0), + } + + var phases []agent.ProgressPhase + result, err := g.GenerateTextStreaming(context.Background(), "test prompt", "haiku", func(p agent.GenerationProgress) { + phases = append(phases, p.Phase) + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(result, "Hello") || !strings.Contains(result, "world") { + t.Errorf("result = %q, want concatenated assistant content", result) + } + // Expect Connecting, FirstToken, Generating (at least once), Done. 
+ if len(phases) < 4 { + t.Errorf("phases = %v (count %d), want >= 4 (Connecting, FirstToken, Generating+, Done)", phases, len(phases)) + } +} + +func TestGeminiCLIGenerateTextStreaming_FallbackOnUnrecognizedFlag(t *testing.T) { + t.Parallel() + + streamCall := testutil.FakeStreamCmd("", "error: unknown flag: --output-format", 1) + nonStreamCall := testutil.FakeStreamCmd("fallback response", "", 0) + calls := 0 + g := &GeminiCLIAgent{ + CommandRunner: func(ctx context.Context, name string, args ...string) *exec.Cmd { + calls++ + if calls == 1 { + return streamCall(ctx, name, args...) + } + return nonStreamCall(ctx, name, args...) + }, + } + + result, err := g.GenerateTextStreaming(context.Background(), "test", "haiku", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(result, "fallback") { + t.Errorf("result = %q, want substring 'fallback'", result) + } + if calls != 2 { + t.Errorf("calls = %d, want 2 (streaming + fallback)", calls) + } +} diff --git a/cmd/entire/cli/agent/geminicli/stream_response.go b/cmd/entire/cli/agent/geminicli/stream_response.go new file mode 100644 index 0000000000..88ac2e116f --- /dev/null +++ b/cmd/entire/cli/agent/geminicli/stream_response.go @@ -0,0 +1,38 @@ +package geminicli + +import "time" + +// geminiStreamEvent is one decoded line of `gemini --output-format stream-json` +// NDJSON output. 
// geminiStreamEvent is one decoded line of `gemini --output-format stream-json`
// NDJSON output. Fields are populated based on Type:
//
//   - "init": SessionID, Model
//   - "message": Role ("user" or "assistant"), Content, Delta (true on incremental)
//   - "result": Status ("success" | "error"), Error (when status=error), Stats
//
// Fields that a given event does not carry keep their zero value after
// decoding (nil for Error and Stats).
type geminiStreamEvent struct {
	// Type is the event discriminator the parser switches on.
	Type string `json:"type"`
	// Timestamp is the event time; the captured fixtures carry it as an
	// RFC 3339 string.
	Timestamp time.Time `json:"timestamp,omitempty"`
	// SessionID and Model accompany the "init" event.
	SessionID string `json:"session_id,omitempty"`
	Model     string `json:"model,omitempty"`
	// Role, Content and Delta accompany "message" events.
	Role    string `json:"role,omitempty"`
	Content string `json:"content,omitempty"`
	Delta   bool   `json:"delta,omitempty"`
	// Status, Error and Stats accompany the "result" event.
	Status string             `json:"status,omitempty"`
	Error  *geminiStreamError `json:"error,omitempty"`
	Stats  *geminiStreamStats `json:"stats,omitempty"`
}

// geminiStreamError is the body of a result event with status="error".
type geminiStreamError struct {
	// Type is the error category string reported by the CLI.
	Type string `json:"type,omitempty"`
	// Message is the operational error text; it is the only part of an error
	// result that the parser surfaces to callers.
	Message string `json:"message,omitempty"`
}
// geminiStreamStats appears inside the terminal type=result event.
// Token field names mirror gemini-cli's stats schema.
type geminiStreamStats struct {
	// TotalTokens is decoded from "total_tokens".
	TotalTokens int `json:"total_tokens,omitempty"`
	// InputTokens is decoded from "input_tokens".
	InputTokens int `json:"input_tokens,omitempty"`
	// OutputTokens is decoded from "output_tokens".
	OutputTokens int `json:"output_tokens,omitempty"`
	// Cached is decoded from "cached"; the parser reports it as the cached
	// input-token count.
	Cached int `json:"cached,omitempty"`
	// DurationMs is decoded from "duration_ms" (milliseconds); when it is not
	// positive the parser measures the duration locally instead.
	DurationMs int `json:"duration_ms,omitempty"`
}
[PATCH 05/12] feat(copilotcli): implement StreamingTextGenerator Copilot now emits live progress events through the explain --generate UI. parseCopilotStream consumes `copilot --output-format json --stream on --allow-all-tools --disable-builtin-mcps` NDJSON output and dispatches our 4 phases: - assistant.turn_start (first turn) -> PhaseConnecting - first assistant.message_delta -> PhaseFirstToken - subsequent assistant.message_delta -> PhaseGenerating (deltaContent concatenated) - assistant.message (non-ephemeral) -> captures outputTokens (summed across turns) - result (terminal) -> PhaseDone with result.usage.totalApiDurationMs as DurationMs; non-zero exitCode treated as error Session bootstrap (session.mcp_servers_loaded / skills_loaded / tools_updated), user-message echoes, ephemeral message_start markers, and reasoning/tool-execution events are ignored -- only assistant.message_delta contributes to the concatenated result. Copilot's stream carries no TTFT, input-token, or cached-token data, so the progress writer renders \"Provider responded -- generating...\" (no parens) by graceful degradation. Real output tokens come from the non-ephemeral assistant.message events; multi-turn (tool-using) generations sum across turns. Error envelope handling decodes message / error.message / data.message fields rather than echoing the raw JSON line. The real CLI (1.0.49) currently signals errors via non-zero result.exitCode plus a stderr message (no in-stream type:\"error\" event observed), but the handler is kept defensive for forward-compat. Malformed lines are counted and surfaced in the no-content error so protocol drift does not disappear silently. Matches the privacy fix applied to codex (47583e9c2) and gemini (1e0e93f18) parsers. Falls back to non-streaming GenerateText if the CLI rejects streaming flags. 
The looksLikeCopilotUnrecognizedFlag heuristic uses the strict phrase set (\"unknown flag\" / \"unrecognized option\" / \"unknown option\" / \"invalid option\") matching Codex; the //nolint:dupl directive flags it as the 3rd of 4 per-agent heuristics pending post-Chunk-5 consolidation. Verified end-to-end against the real copilot CLI (1.0.49) on the same smoke checkpoint d86ffe2e995d Chunks 2-3 used: produced 750 output tokens in 15.9s; all four phases fired in order. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../agent/copilotcli/generate_streaming.go | 232 ++++++++++++++++++ .../copilotcli/generate_streaming_test.go | 148 +++++++++++ .../cli/agent/copilotcli/stream_response.go | 58 +++++ .../copilotcli/testdata/stream_error.jsonl | 5 + .../copilotcli/testdata/stream_success.jsonl | 11 + 5 files changed, 454 insertions(+) create mode 100644 cmd/entire/cli/agent/copilotcli/generate_streaming.go create mode 100644 cmd/entire/cli/agent/copilotcli/generate_streaming_test.go create mode 100644 cmd/entire/cli/agent/copilotcli/stream_response.go create mode 100644 cmd/entire/cli/agent/copilotcli/testdata/stream_error.jsonl create mode 100644 cmd/entire/cli/agent/copilotcli/testdata/stream_success.jsonl diff --git a/cmd/entire/cli/agent/copilotcli/generate_streaming.go b/cmd/entire/cli/agent/copilotcli/generate_streaming.go new file mode 100644 index 0000000000..2c30cc89da --- /dev/null +++ b/cmd/entire/cli/agent/copilotcli/generate_streaming.go @@ -0,0 +1,232 @@ +package copilotcli + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os/exec" + "strings" + "time" + + "github.com/entireio/cli/cmd/entire/cli/agent" +) + +const streamBufferMax = 4 * 1024 * 1024 // 4 MiB + +// parseCopilotStream consumes `copilot --output-format json --stream on` NDJSON +// output, dispatches progress callbacks, and returns the concatenated assistant +// content. 
+// +// Real-CLI shape (GitHub Copilot CLI 1.0.48): +// - session.* / user.message → ignored (bootstrap + echo) +// - assistant.turn_start → PhaseConnecting +// - assistant.message_start → ignored (ephemeral marker) +// - assistant.message_delta (deltaContent) → PhaseFirstToken (1st) / +// PhaseGenerating (subsequent), +// content concatenated +// - assistant.message (non-ephemeral, outputTokens) → captures token count +// - assistant.turn_end → turn marker, no-op +// - result (exitCode, usage.totalApiDurationMs) → PhaseDone (terminal); +// non-zero exitCode is +// treated as a stream error +// +// The "error" event shape is defensive: Copilot's current public stream +// typically reports failures via non-zero result.exitCode plus a stderr +// message, not an in-stream JSON envelope. The error-event handler is kept +// for forward-compat with potential future CLI versions that adopt one. +func parseCopilotStream(stdout io.Reader, progress agent.ProgressFn) (string, error) { + scanner := bufio.NewScanner(stdout) + scanner.Buffer(make([]byte, 64*1024), streamBufferMax) + + var ( + result strings.Builder + connectingFired bool + firstTokenFired bool + sawTerminal bool + usage *copilotStreamUsage + outputTokens int + malformed int + start = time.Now() + ) + + dispatch := func(p agent.GenerationProgress) { + if progress != nil { + progress(p) + } + } + + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + var ev copilotStreamEvent + if err := json.Unmarshal(line, &ev); err != nil { + // Copilot may emit transient noise (blank lines, partial flushes); a + // schema-incompatible line is recoverable per-event but tracked so + // protocol regressions surface in the no-content error instead of + // disappearing silently. + malformed++ + continue + } + + switch ev.Type { + case "assistant.turn_start": + // Copilot emits one assistant.turn_start per tool-using turn. 
The + // progress UI expects PhaseConnecting once per generation, so we + // fire only on the first turn. + if !connectingFired { + connectingFired = true + dispatch(agent.GenerationProgress{Phase: agent.PhaseConnecting}) + } + + case "assistant.message_delta": + if ev.Data == nil || ev.Data.DeltaContent == "" { + continue + } + if !firstTokenFired { + firstTokenFired = true + // Copilot's delta events carry no TTFT, input-token, or + // cached-token data; the progress writer's graceful + // degradation will render "Provider responded -- generating..." + // without parens. + dispatch(agent.GenerationProgress{Phase: agent.PhaseFirstToken}) + } else { + dispatch(agent.GenerationProgress{Phase: agent.PhaseGenerating}) + } + result.WriteString(ev.Data.DeltaContent) + + case "assistant.message": + // Each non-ephemeral assistant.message carries the outputTokens for + // one turn. Multi-turn generations (tool-using runs) emit several; + // sum them so PhaseDone reports the total output tokens for the + // whole generation. + if ev.Data != nil && ev.Data.OutputTokens > 0 { + outputTokens += ev.Data.OutputTokens + } + + case "result": + sawTerminal = true + usage = ev.Usage + if ev.ExitCode != 0 { + // Partial decode tolerated: schema drift falls through to + // "unspecified error" rather than leaking the raw line (which + // may carry echoed user content or model fragments) into logs + // and *TextGenerationError.Stderr. + return "", fmt.Errorf("copilot stream error: non-zero exit code %d", ev.ExitCode) + } + + case "error": + var details struct { + Message string `json:"message"` + Error struct { + Message string `json:"message"` + } `json:"error"` + Data struct { + Message string `json:"message"` + } `json:"data"` + } + // Partial decode tolerated: schema drift falls through to + // "unspecified error" rather than leaking the raw line (which + // may carry echoed user content or model fragments) into logs + // and *TextGenerationError.Stderr. 
+ _ = json.Unmarshal(line, &details) //nolint:errcheck // see comment above + detail := details.Message + if detail == "" { + detail = details.Error.Message + } + if detail == "" { + detail = details.Data.Message + } + if detail == "" { + detail = "unspecified error" + } + return "", fmt.Errorf("copilot stream error: %s", detail) + } + } + if err := scanner.Err(); err != nil { + return "", fmt.Errorf("reading copilot stream: %w", err) + } + if !firstTokenFired { + if malformed > 0 { + return "", fmt.Errorf("copilot stream produced no assistant content (%d malformed lines skipped)", malformed) + } + return "", errors.New("copilot stream produced no assistant content") + } + if progress != nil { + done := agent.GenerationProgress{Phase: agent.PhaseDone} + if outputTokens > 0 { + done.OutputTokens = outputTokens + } + if usage != nil && usage.TotalAPIDurationMs > 0 { + done.DurationMs = usage.TotalAPIDurationMs + } + if done.DurationMs == 0 { + // EOF / no-terminal fallback: compute locally so the progress + // writer always has a duration to render. + done.DurationMs = int(time.Since(start).Milliseconds()) + } + dispatch(done) + } + _ = sawTerminal // kept for diagnostic clarity; absence is acceptable (EOF fallback) + return result.String(), nil +} + +// GenerateTextStreaming implements agent.StreamingTextGenerator. 
+func (c *CopilotCLIAgent) GenerateTextStreaming( + ctx context.Context, + prompt, model string, + progress agent.ProgressFn, +) (string, error) { + tmpl := &agent.StreamingGeneratorTemplate{ + AgentName: "copilot-cli", + DisplayName: "copilot", + BuildCmd: c.buildStreamCmd, + Parser: parseCopilotStream, + LooksLikeUnrecognizedFlag: looksLikeCopilotUnrecognizedFlag, + } + + result, err := tmpl.Generate(ctx, prompt, model, progress) + if err != nil { + if errors.Is(err, agent.ErrUnrecognizedStreamingFlag) { + return c.GenerateText(ctx, prompt, model) + } + return "", fmt.Errorf("copilot streaming generate: %w", err) + } + return result, nil +} + +func (c *CopilotCLIAgent) buildStreamCmd(ctx context.Context, prompt, model string) *exec.Cmd { + commandRunner := c.CommandRunner + if commandRunner == nil { + commandRunner = exec.CommandContext + } + args := []string{"--output-format", "json", "--stream", "on", "--allow-all-tools", "--disable-builtin-mcps"} + if model != "" { + args = append(args, "--model", model) + } + cmd := commandRunner(ctx, "copilot", args...) + cmd.Stdin = strings.NewReader(prompt) + return cmd +} + +// looksLikeCopilotUnrecognizedFlag is the 3rd of 4 per-agent flag-rejection +// heuristics. Consolidation deferred to a post-chunk-5 cleanup commit. Phrase +// set matches Codex's (strict — "unknown flag" rather than bare "unknown") to +// avoid the over-broad matches a Chunk-3 code review flagged on Gemini. 
// looksLikeCopilotUnrecognizedFlag reports whether stderr reads like the CLI
// rejecting the streaming flags. The match is case-insensitive and requires
// both of the following to appear somewhere in the text:
//
//   - a rejection phrase: "unknown flag", "unrecognized option",
//     "unknown option", or "invalid option" (full phrases, so a bare
//     "unknown" elsewhere in the output does not count)
//   - a streaming-flag mention: "stream" or "output-format"
//
//nolint:dupl // 3rd of 4 per-agent flag-rejection heuristics; consolidation deferred to a post-chunk-5 cleanup commit
func looksLikeCopilotUnrecognizedFlag(stderr string) bool {
	text := strings.ToLower(stderr)

	// mentionsAny reports whether the lower-cased stderr contains at least
	// one of the given substrings.
	mentionsAny := func(needles ...string) bool {
		for _, needle := range needles {
			if strings.Contains(text, needle) {
				return true
			}
		}
		return false
	}

	return mentionsAny("unknown flag", "unrecognized option", "unknown option", "invalid option") &&
		mentionsAny("stream", "output-format")
}
!= 1 { + t.Errorf("PhaseFirstToken count = %d, want 1", counts[agent.PhaseFirstToken]) + } + if counts[agent.PhaseGenerating] < 1 { + t.Errorf("PhaseGenerating count = %d, want at least 1", counts[agent.PhaseGenerating]) + } + if counts[agent.PhaseDone] != 1 { + t.Errorf("PhaseDone count = %d, want 1 (emitted at terminal result event)", counts[agent.PhaseDone]) + } + + // Verify that token + duration data from the real CLI's terminal events is + // carried into the PhaseDone progress event (assistant.message.outputTokens + // and result.usage.totalApiDurationMs). + if doneEvent.OutputTokens != 7 { + t.Errorf("Done.OutputTokens = %d, want 7 (from assistant.message)", doneEvent.OutputTokens) + } + if doneEvent.DurationMs != 2546 { + t.Errorf("Done.DurationMs = %d, want 2546 (from result.usage.totalApiDurationMs)", doneEvent.DurationMs) + } +} + +func TestParseCopilotStream_ErrorEnvelope(t *testing.T) { + t.Parallel() + + data, err := os.ReadFile(filepath.Join("testdata", "stream_error.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + _, err = parseCopilotStream(bytes.NewReader(data), nil) + if err == nil { + t.Fatal("expected error from error envelope") + } + // Fix A: the error must surface the operational message, not the raw JSON line. 
+ if !strings.Contains(err.Error(), "rate limited") { + t.Errorf("error %q should mention 'rate limited' from the operational message field", err) + } +} + +func TestParseCopilotStream_EmptyStream(t *testing.T) { + t.Parallel() + + _, err := parseCopilotStream(strings.NewReader(""), nil) + if err == nil { + t.Fatal("expected error from empty stream (no assistant content)") + } +} + +func TestCopilotCLIGenerateTextStreaming_Success(t *testing.T) { + t.Parallel() + + fixture, err := os.ReadFile(filepath.Join("testdata", "stream_success.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + c := &CopilotCLIAgent{ + CommandRunner: testutil.FakeStreamCmd(string(fixture), "", 0), + } + + var phases []agent.ProgressPhase + result, err := c.GenerateTextStreaming(context.Background(), "test prompt", "haiku", func(p agent.GenerationProgress) { + phases = append(phases, p.Phase) + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(result, "Hello") || !strings.Contains(result, "world") { + t.Errorf("result = %q, want concatenated assistant content", result) + } + // Expect Connecting, FirstToken, Generating (>=1), Done. + if len(phases) < 4 { + t.Errorf("phases = %v (count %d), want >= 4 (Connecting, FirstToken, Generating+, Done)", phases, len(phases)) + } +} + +func TestCopilotCLIGenerateTextStreaming_FallbackOnUnrecognizedFlag(t *testing.T) { + t.Parallel() + + streamCall := testutil.FakeStreamCmd("", "error: unknown flag: --stream", 1) + nonStreamCall := testutil.FakeStreamCmd("fallback response", "", 0) + calls := 0 + c := &CopilotCLIAgent{ + CommandRunner: func(ctx context.Context, name string, args ...string) *exec.Cmd { + calls++ + if calls == 1 { + return streamCall(ctx, name, args...) + } + return nonStreamCall(ctx, name, args...) 
+ }, + } + + result, err := c.GenerateTextStreaming(context.Background(), "test", "haiku", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(result, "fallback") { + t.Errorf("result = %q, want substring 'fallback'", result) + } + if calls != 2 { + t.Errorf("calls = %d, want 2 (streaming + fallback)", calls) + } +} diff --git a/cmd/entire/cli/agent/copilotcli/stream_response.go b/cmd/entire/cli/agent/copilotcli/stream_response.go new file mode 100644 index 0000000000..253b840ad1 --- /dev/null +++ b/cmd/entire/cli/agent/copilotcli/stream_response.go @@ -0,0 +1,58 @@ +package copilotcli + +import "time" + +// copilotStreamEvent is one decoded line of `copilot --output-format json +// --stream on` NDJSON output. Fields are populated based on Type. +// +// Real-CLI shape (GitHub Copilot CLI 1.0.48): +// - session.mcp_servers_loaded / session.skills_loaded / session.tools_updated +// → session bootstrap, ignored +// - user.message → echoed input, ignored +// - assistant.turn_start → turn begins +// - assistant.message_start (ephemeral) → announces messageId +// - assistant.message_delta (ephemeral, deltaContent) → incremental tokens +// - assistant.message (non-ephemeral, outputTokens) → consolidated message +// with final outputTokens count +// - assistant.turn_end → turn marker +// - result (terminal, usage.totalApiDurationMs etc.) → exitCode + duration +type copilotStreamEvent struct { + Type string `json:"type"` + Timestamp time.Time `json:"timestamp,omitempty"` + Data *copilotStreamData `json:"data,omitempty"` + ID string `json:"id,omitempty"` + ParentID string `json:"parentId,omitempty"` + Ephemeral bool `json:"ephemeral,omitempty"` + + // SessionID is populated on the terminal type=result event. + SessionID string `json:"sessionId,omitempty"` + + // ExitCode is populated on the terminal type=result event. 0=success. + ExitCode int `json:"exitCode,omitempty"` + + // Usage is populated on the terminal type=result event. 
+ Usage *copilotStreamUsage `json:"usage,omitempty"` +} + +// copilotStreamData is the per-event payload. Fields are sparsely populated +// based on the parent event's Type. +type copilotStreamData struct { + MessageID string `json:"messageId,omitempty"` + TurnID string `json:"turnId,omitempty"` + DeltaContent string `json:"deltaContent,omitempty"` + Model string `json:"model,omitempty"` + Content string `json:"content,omitempty"` + + // OutputTokens appears on the consolidated assistant.message event. + // Copilot does not expose input/cached tokens in its public stream. + OutputTokens int `json:"outputTokens,omitempty"` +} + +// copilotStreamUsage appears inside the terminal type=result event. Copilot +// reports timing data and "premium request" counts but does not surface input +// or cached-input token counts in its public stream. +type copilotStreamUsage struct { + PremiumRequests int `json:"premiumRequests,omitempty"` + TotalAPIDurationMs int `json:"totalApiDurationMs,omitempty"` + SessionDurationMs int `json:"sessionDurationMs,omitempty"` +} diff --git a/cmd/entire/cli/agent/copilotcli/testdata/stream_error.jsonl b/cmd/entire/cli/agent/copilotcli/testdata/stream_error.jsonl new file mode 100644 index 0000000000..c428b6c233 --- /dev/null +++ b/cmd/entire/cli/agent/copilotcli/testdata/stream_error.jsonl @@ -0,0 +1,5 @@ +{"type":"session.tools_updated","data":{"model":"claude-sonnet-4.6"},"id":"c","timestamp":"2026-05-15T21:35:30.958Z","ephemeral":true} +{"type":"user.message","data":{"content":"hi","interactionId":"i0"},"id":"u0","timestamp":"2026-05-15T21:35:30.959Z"} +{"type":"assistant.turn_start","data":{"turnId":"0","interactionId":"i0"},"id":"t0","timestamp":"2026-05-15T21:35:31.071Z"} +{"type":"error","data":{},"message":"rate limited: too many requests","timestamp":"2026-05-15T21:35:31.100Z"} 
+{"type":"result","timestamp":"2026-05-15T21:35:31.110Z","sessionId":"sess-1","exitCode":2,"usage":{"premiumRequests":0,"totalApiDurationMs":0,"sessionDurationMs":42}} diff --git a/cmd/entire/cli/agent/copilotcli/testdata/stream_success.jsonl b/cmd/entire/cli/agent/copilotcli/testdata/stream_success.jsonl new file mode 100644 index 0000000000..74ec3883fd --- /dev/null +++ b/cmd/entire/cli/agent/copilotcli/testdata/stream_success.jsonl @@ -0,0 +1,11 @@ +{"type":"session.mcp_servers_loaded","data":{"servers":[]},"id":"a","timestamp":"2026-05-15T21:35:30.934Z","ephemeral":true} +{"type":"session.skills_loaded","data":{"skills":[]},"id":"b","timestamp":"2026-05-15T21:35:30.934Z","ephemeral":true} +{"type":"session.tools_updated","data":{"model":"claude-sonnet-4.6"},"id":"c","timestamp":"2026-05-15T21:35:30.958Z","ephemeral":true} +{"type":"user.message","data":{"content":"say hi","interactionId":"i0"},"id":"u0","timestamp":"2026-05-15T21:35:30.959Z"} +{"type":"assistant.turn_start","data":{"turnId":"0","interactionId":"i0"},"id":"t0","timestamp":"2026-05-15T21:35:31.071Z"} +{"type":"assistant.message_start","data":{"messageId":"m0"},"id":"ms0","timestamp":"2026-05-15T21:35:33.283Z","ephemeral":true} +{"type":"assistant.message_delta","data":{"messageId":"m0","deltaContent":"Hello"},"id":"d1","timestamp":"2026-05-15T21:35:33.283Z","ephemeral":true} +{"type":"assistant.message_delta","data":{"messageId":"m0","deltaContent":", world."},"id":"d2","timestamp":"2026-05-15T21:35:33.317Z","ephemeral":true} +{"type":"assistant.message","data":{"messageId":"m0","model":"claude-sonnet-4.6","content":"Hello, world.","outputTokens":7,"turnId":"0","interactionId":"i0"},"id":"am0","timestamp":"2026-05-15T21:35:33.668Z"} +{"type":"assistant.turn_end","data":{"turnId":"0"},"id":"te0","timestamp":"2026-05-15T21:35:33.669Z"} 
+{"type":"result","timestamp":"2026-05-15T21:35:33.679Z","sessionId":"sess-1","exitCode":0,"usage":{"premiumRequests":1,"totalApiDurationMs":2546,"sessionDurationMs":4443}} From e10cbfd0c0dafe83869fa004731c792c7cd478fd Mon Sep 17 00:00:00 2001 From: Peyton Montei Date: Tue, 19 May 2026 15:13:48 -0400 Subject: [PATCH 06/12] fix(copilotcli): tick output-token counter during streaming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PhaseGenerating dispatches used to send OutputTokens=0, leaving the progress writer stuck at `→ Writing summary... (~0 tokens)` until the final assistant.message event landed. Estimate a running count from accumulated deltaContent character length (totalChars/4) so the counter ticks during the stream, matching Claude's pattern from PR #964. The authoritative output_tokens from assistant.message events still overrides on PhaseDone, so the final summary line is unaffected. Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/entire/cli/agent/copilotcli/generate_streaming.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/cmd/entire/cli/agent/copilotcli/generate_streaming.go b/cmd/entire/cli/agent/copilotcli/generate_streaming.go index 2c30cc89da..1e2039d47b 100644 --- a/cmd/entire/cli/agent/copilotcli/generate_streaming.go +++ b/cmd/entire/cli/agent/copilotcli/generate_streaming.go @@ -48,6 +48,7 @@ func parseCopilotStream(stdout io.Reader, progress agent.ProgressFn) (string, er sawTerminal bool usage *copilotStreamUsage outputTokens int + streamedChars int malformed int start = time.Now() ) @@ -95,7 +96,13 @@ func parseCopilotStream(stdout io.Reader, progress agent.ProgressFn) (string, er // without parens. dispatch(agent.GenerationProgress{Phase: agent.PhaseFirstToken}) } else { - dispatch(agent.GenerationProgress{Phase: agent.PhaseGenerating}) + // Estimate running output tokens from accumulated character + // count so the writer's "Writing summary... 
(~Nk tokens)" + // counter ticks during streaming (matches Claude's pattern + // from PR #964). Authoritative output_tokens still arrives + // later via assistant.message events and overrides on Done. + streamedChars += len(ev.Data.DeltaContent) + dispatch(agent.GenerationProgress{Phase: agent.PhaseGenerating, OutputTokens: streamedChars / 4}) } result.WriteString(ev.Data.DeltaContent) From 422ea998baf0fd103fe210ea1ec075846dae30a8 Mon Sep 17 00:00:00 2001 From: Peyton Montei Date: Tue, 19 May 2026 17:54:55 -0400 Subject: [PATCH 07/12] feat(cursor): implement StreamingTextGenerator Cursor now emits live progress events through the explain --generate UI. parseCursorStream consumes `agent --print --output-format stream-json --stream-partial-output --force --trust --workspace <tmpdir>` NDJSON output and dispatches our 4 phases: - system,subtype:init -> PhaseConnecting - thinking,* -> ignored (internal reasoning; the Connecting phase persists during this window) - first assistant delta (timestamp_ms present) -> PhaseFirstToken (no parens -- cacheReadTokens unknown until result) - subsequent assistant deltas -> PhaseGenerating (running token estimate from accumulated text length) - assistant w/o timestamp_ms -> ignored (aggregated final message; use result.result as canonical) - result,subtype:success -> PhaseDone (real OutputTokens, real CachedInputTokens, real DurationMs from result.usage and result.duration_ms) - result,is_error:true -> typed error (privacy decode: surfaces result.result text only, never the raw line) Error envelopes ride inside the result event with is_error:true; the parser checks this flag before extracting usage. Malformed lines are counted and surfaced in the missing-result error so protocol drift doesn't disappear silently. Falls back to non-streaming GenerateText if the CLI rejects streaming flags.
Cursor's stream-json mode requires authentication (`agent login` or CURSOR_API_KEY) -- without auth, the binary exits non-zero before the stream begins; the template surfaces this as a generic TextGenerationError. Verified end-to-end against the real `agent` CLI (Composer 2.5 Fast) on the same smoke checkpoint Chunks 2-3, 5 used. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../cli/agent/cursor/generate_streaming.go | 220 ++++++++++++++++++ .../agent/cursor/generate_streaming_test.go | 149 ++++++++++++ .../cli/agent/cursor/stream_response.go | 46 ++++ .../agent/cursor/testdata/stream_error.jsonl | 2 + .../cursor/testdata/stream_success.jsonl | 8 + 5 files changed, 425 insertions(+) create mode 100644 cmd/entire/cli/agent/cursor/generate_streaming.go create mode 100644 cmd/entire/cli/agent/cursor/generate_streaming_test.go create mode 100644 cmd/entire/cli/agent/cursor/stream_response.go create mode 100644 cmd/entire/cli/agent/cursor/testdata/stream_error.jsonl create mode 100644 cmd/entire/cli/agent/cursor/testdata/stream_success.jsonl diff --git a/cmd/entire/cli/agent/cursor/generate_streaming.go b/cmd/entire/cli/agent/cursor/generate_streaming.go new file mode 100644 index 0000000000..c657be6514 --- /dev/null +++ b/cmd/entire/cli/agent/cursor/generate_streaming.go @@ -0,0 +1,220 @@ +package cursor + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "os/exec" + "strings" + + "github.com/entireio/cli/cmd/entire/cli/agent" +) + +const streamBufferMax = 4 * 1024 * 1024 // 4 MiB + +// parseCursorStream consumes `agent --print --output-format stream-json +// --stream-partial-output` NDJSON output, dispatches progress callbacks, and +// returns the canonical result text (from the terminal result event, not +// from concatenated assistant deltas — see note on aggregated final message +// below). 
+// +// Real-CLI shape (Cursor agent, Composer 2.5 Fast): +// - system,subtype:init → PhaseConnecting (once) +// - user → ignored (echoed input) +// - thinking,subtype:delta / completed → ignored (internal +// reasoning; PhaseConnecting persists during this window so users see +// "Sending request to provider..." for longer on reasoning-heavy turns) +// - assistant w/ timestamp_ms (1st) → PhaseFirstToken (no parens — +// cacheReadTokens isn't known until result event) +// - assistant w/ timestamp_ms (subsequent) → PhaseGenerating (running +// token estimate from streamed character length) +// - assistant w/o timestamp_ms → ignored (aggregated final +// message; result.result is canonical) +// - result,subtype:success → PhaseDone (real +// OutputTokens / CachedInputTokens / DurationMs from result.usage and +// result.duration_ms) +// - result,is_error:true → typed error (privacy +// decode: surfaces result.result only, never the raw line) +func parseCursorStream(stdout io.Reader, progress agent.ProgressFn) (string, error) { + scanner := bufio.NewScanner(stdout) + scanner.Buffer(make([]byte, 64*1024), streamBufferMax) + + var ( + connectingFired bool + firstTokenFired bool + sawResult bool + resultText string + streamedChars int + malformed int + usage *cursorStreamUsage + durationMs int + ) + + dispatch := func(p agent.GenerationProgress) { + if progress != nil { + progress(p) + } + } + + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + var ev cursorStreamEvent + if err := json.Unmarshal(line, &ev); err != nil { + // Cursor may emit transient noise (blank lines, partial flushes); + // a schema-incompatible line is recoverable per-event but tracked + // so protocol regressions surface in the missing-result error + // instead of disappearing silently.
+ malformed++ + continue + } + + switch ev.Type { + case "system": + if ev.Subtype == "init" && !connectingFired { + connectingFired = true + dispatch(agent.GenerationProgress{Phase: agent.PhaseConnecting}) + } + + case "assistant": + // Skip the aggregated final assistant message (no timestamp_ms); + // only delta events (with timestamp_ms) drive PhaseGenerating. + if ev.TimestampMs == 0 { + continue + } + var textBuilder strings.Builder + if ev.Message != nil { + for _, c := range ev.Message.Content { + if c.Type == "text" { + textBuilder.WriteString(c.Text) + } + } + } + text := textBuilder.String() + if text == "" { + continue + } + if !firstTokenFired { + firstTokenFired = true + // CacheReadTokens isn't known until the result event arrives; + // FirstToken renders without parens here, and Done will + // expose cached/output/duration from result.usage. + dispatch(agent.GenerationProgress{Phase: agent.PhaseFirstToken}) + } else { + streamedChars += len(text) + dispatch(agent.GenerationProgress{Phase: agent.PhaseGenerating, OutputTokens: streamedChars / 4}) + } + + case "result": + sawResult = true + if ev.IsError { + // Privacy decode: surface only result text (CLI-authored, + // user-safe), never the raw line. Cursor's error envelope + // has not been observed in the wild beyond synthesized + // fixtures, so fall through to a generic message rather + // than leaking anything we haven't audited. + detail := ev.Result + if detail == "" { + detail = "unspecified error" + } + return "", fmt.Errorf("cursor stream error: %s", detail) + } + resultText = ev.Result + usage = ev.Usage + durationMs = ev.DurationMs + + // "thinking" and "user" events are intentionally ignored. 
+ } + } + if err := scanner.Err(); err != nil { + return "", fmt.Errorf("reading cursor stream: %w", err) + } + if !sawResult { + if malformed > 0 { + return "", fmt.Errorf("cursor stream ended without a result event (%d malformed lines skipped)", malformed) + } + return "", errors.New("cursor stream ended without a result event") + } + if progress != nil { + done := agent.GenerationProgress{Phase: agent.PhaseDone, DurationMs: durationMs} + if usage != nil { + done.OutputTokens = usage.OutputTokens + done.InputTokens = usage.InputTokens + done.CachedInputTokens = usage.CacheReadTokens + } + dispatch(done) + } + return resultText, nil +} + +// GenerateTextStreaming implements agent.StreamingTextGenerator for *CursorAgent. +func (c *CursorAgent) GenerateTextStreaming( + ctx context.Context, + prompt, model string, + progress agent.ProgressFn, +) (string, error) { + tmpl := &agent.StreamingGeneratorTemplate{ + AgentName: "cursor", + DisplayName: "agent", + BuildCmd: c.buildStreamCmd, + Parser: parseCursorStream, + LooksLikeUnrecognizedFlag: looksLikeCursorUnrecognizedFlag, + } + + result, err := tmpl.Generate(ctx, prompt, model, progress) + if err != nil { + if errors.Is(err, agent.ErrUnrecognizedStreamingFlag) { + return c.GenerateText(ctx, prompt, model) + } + return "", fmt.Errorf("cursor streaming generate: %w", err) + } + return result, nil +} + +func (c *CursorAgent) buildStreamCmd(ctx context.Context, prompt, model string) *exec.Cmd { + commandRunner := c.CommandRunner + if commandRunner == nil { + commandRunner = exec.CommandContext + } + args := []string{ + "--print", + "--force", + "--trust", + "--workspace", os.TempDir(), + "--output-format", "stream-json", + "--stream-partial-output", + } + if model != "" { + args = append(args, "--model", model) + } + cmd := commandRunner(ctx, "agent", args...) + cmd.Stdin = strings.NewReader(prompt) + return cmd +} + +// looksLikeCursorUnrecognizedFlag is the 4th of 4 per-agent flag-rejection +// heuristics. 
Consolidation deferred to a post-chunk-5 cleanup commit. Phrase +// set matches Codex/Copilot's (strict — "unknown flag" rather than bare +// "unknown") to avoid the over-broad matches a Chunk-3 code review flagged on +// Gemini. +// +//nolint:dupl // 4th of 4 per-agent flag-rejection heuristics; consolidation deferred to a post-chunk-5 cleanup commit +func looksLikeCursorUnrecognizedFlag(stderr string) bool { + lower := strings.ToLower(stderr) + hasRejectPhrase := strings.Contains(lower, "unknown flag") || + strings.Contains(lower, "unrecognized option") || + strings.Contains(lower, "unknown option") || + strings.Contains(lower, "invalid option") + if !hasRejectPhrase { + return false + } + return strings.Contains(lower, "stream-json") || + strings.Contains(lower, "stream-partial-output") || + strings.Contains(lower, "output-format") +} diff --git a/cmd/entire/cli/agent/cursor/generate_streaming_test.go b/cmd/entire/cli/agent/cursor/generate_streaming_test.go new file mode 100644 index 0000000000..bf4a79b153 --- /dev/null +++ b/cmd/entire/cli/agent/cursor/generate_streaming_test.go @@ -0,0 +1,149 @@ +package cursor + +import ( + "bytes" + "context" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + + "github.com/entireio/cli/cmd/entire/cli/agent" + "github.com/entireio/cli/cmd/entire/cli/agent/testutil" +) + +func TestParseCursorStream_Success(t *testing.T) { + t.Parallel() + + data, err := os.ReadFile(filepath.Join("testdata", "stream_success.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + var phases []agent.ProgressPhase + var doneProgress agent.GenerationProgress + result, err := parseCursorStream(bytes.NewReader(data), func(p agent.GenerationProgress) { + phases = append(phases, p.Phase) + if p.Phase == agent.PhaseDone { + doneProgress = p + } + }) + if err != nil { + t.Fatalf("parse: %v", err) + } + if result != "Hello, world." 
{ + t.Errorf("result = %q, want %q (from result.result, not delta concat)", result, "Hello, world.") + } + + counts := map[agent.ProgressPhase]int{} + for _, p := range phases { + counts[p]++ + } + if counts[agent.PhaseConnecting] != 1 { + t.Errorf("PhaseConnecting count = %d, want 1", counts[agent.PhaseConnecting]) + } + if counts[agent.PhaseFirstToken] != 1 { + t.Errorf("PhaseFirstToken count = %d, want 1", counts[agent.PhaseFirstToken]) + } + if counts[agent.PhaseGenerating] < 1 { + t.Errorf("PhaseGenerating count = %d, want >= 1", counts[agent.PhaseGenerating]) + } + if counts[agent.PhaseDone] != 1 { + t.Errorf("PhaseDone count = %d, want 1", counts[agent.PhaseDone]) + } + if doneProgress.OutputTokens != 268 { + t.Errorf("Done.OutputTokens = %d, want 268 (from result.usage.outputTokens)", doneProgress.OutputTokens) + } + if doneProgress.CachedInputTokens != 4608 { + t.Errorf("Done.CachedInputTokens = %d, want 4608 (from result.usage.cacheReadTokens)", doneProgress.CachedInputTokens) + } + if doneProgress.DurationMs != 3372 { + t.Errorf("Done.DurationMs = %d, want 3372 (from result.duration_ms)", doneProgress.DurationMs) + } +} + +func TestParseCursorStream_ErrorEnvelope(t *testing.T) { + t.Parallel() + + data, err := os.ReadFile(filepath.Join("testdata", "stream_error.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + _, err = parseCursorStream(bytes.NewReader(data), nil) + if err == nil { + t.Fatal("expected error from result envelope with is_error=true") + } + if !strings.Contains(err.Error(), "Max turns") { + t.Errorf("error %q should mention 'Max turns'", err) + } +} + +func TestParseCursorStream_MissingResult(t *testing.T) { + t.Parallel() + + stream := `{"type":"system","subtype":"init","session_id":"t"} +{"type":"assistant","message":{"role":"assistant","content":[{"type":"text","text":"partial"}]},"session_id":"t","timestamp_ms":1} +` + _, err := parseCursorStream(strings.NewReader(stream), nil) + if err == nil { + t.Fatal("expected 
error when stream lacks result event") + } +} + +func TestCursorGenerateTextStreaming_Success(t *testing.T) { + t.Parallel() + + fixture, err := os.ReadFile(filepath.Join("testdata", "stream_success.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + c := &CursorAgent{ + CommandRunner: testutil.FakeStreamCmd(string(fixture), "", 0), + } + + var phases []agent.ProgressPhase + result, err := c.GenerateTextStreaming(context.Background(), "test prompt", "", func(p agent.GenerationProgress) { + phases = append(phases, p.Phase) + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result != "Hello, world." { + t.Errorf("result = %q, want %q (canonical from result.result)", result, "Hello, world.") + } + // Expect Connecting, FirstToken, Generating (>=1), Done. + if len(phases) < 4 { + t.Errorf("phases = %v (count %d), want >= 4 (Connecting, FirstToken, Generating+, Done)", phases, len(phases)) + } +} + +func TestCursorGenerateTextStreaming_FallbackOnUnrecognizedFlag(t *testing.T) { + t.Parallel() + + streamCall := testutil.FakeStreamCmd("", "error: unknown flag: --stream-json", 1) + nonStreamCall := testutil.FakeStreamCmd("fallback response", "", 0) + calls := 0 + c := &CursorAgent{ + CommandRunner: func(ctx context.Context, name string, args ...string) *exec.Cmd { + calls++ + if calls == 1 { + return streamCall(ctx, name, args...) + } + return nonStreamCall(ctx, name, args...) 
+ }, + } + + result, err := c.GenerateTextStreaming(context.Background(), "test", "", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(result, "fallback") { + t.Errorf("result = %q, want substring 'fallback'", result) + } + if calls != 2 { + t.Errorf("calls = %d, want 2 (streaming + fallback)", calls) + } +} diff --git a/cmd/entire/cli/agent/cursor/stream_response.go b/cmd/entire/cli/agent/cursor/stream_response.go new file mode 100644 index 0000000000..178af54e86 --- /dev/null +++ b/cmd/entire/cli/agent/cursor/stream_response.go @@ -0,0 +1,46 @@ +package cursor + +// cursorStreamEvent is one decoded line of `agent --print --output-format +// stream-json --stream-partial-output` NDJSON output. Fields are populated +// based on Type. +// +// Real-CLI shape (Cursor agent, Composer 2.5 Fast): +// - system,subtype:init → session bootstrap (model, cwd) +// - user → echoed input, ignored +// - thinking,subtype:delta / completed → internal reasoning, ignored +// - assistant (with timestamp_ms) delta → incremental text token +// - assistant (no timestamp_ms) aggregated → final consolidated message, +// ignored (use result.result) +// - result,subtype:success → terminal: usage + duration +// - result,is_error:true → terminal error envelope +type cursorStreamEvent struct { + Type string `json:"type"` + Subtype string `json:"subtype,omitempty"` + IsError bool `json:"is_error,omitempty"` + Result string `json:"result,omitempty"` + DurationMs int `json:"duration_ms,omitempty"` + TimestampMs int64 `json:"timestamp_ms,omitempty"` + Message *cursorStreamMessage `json:"message,omitempty"` + Usage *cursorStreamUsage `json:"usage,omitempty"` +} + +// cursorStreamMessage is the assistant/user message payload. +type cursorStreamMessage struct { + Role string `json:"role,omitempty"` + Content []cursorStreamMessageContent `json:"content,omitempty"` +} + +// cursorStreamMessageContent is one content block within a message.
+type cursorStreamMessageContent struct { + Type string `json:"type,omitempty"` + Text string `json:"text,omitempty"` +} + +// cursorStreamUsage appears inside the terminal type=result event. Cursor +// reports input, output, and cache-read token counts. +type cursorStreamUsage struct { + InputTokens int `json:"inputTokens,omitempty"` + OutputTokens int `json:"outputTokens,omitempty"` + CacheReadTokens int `json:"cacheReadTokens,omitempty"` + CacheWriteTokens int `json:"cacheWriteTokens,omitempty"` +} diff --git a/cmd/entire/cli/agent/cursor/testdata/stream_error.jsonl b/cmd/entire/cli/agent/cursor/testdata/stream_error.jsonl new file mode 100644 index 0000000000..224edbfb65 --- /dev/null +++ b/cmd/entire/cli/agent/cursor/testdata/stream_error.jsonl @@ -0,0 +1,2 @@ +{"type":"system","subtype":"init","apiKeySource":"login","cwd":"/private/tmp","session_id":"test","model":"Composer 2.5 Fast","permissionMode":"default"} +{"type":"result","subtype":"error_max_turns","duration_ms":100,"is_error":true,"result":"Max turns exceeded","session_id":"test"} diff --git a/cmd/entire/cli/agent/cursor/testdata/stream_success.jsonl b/cmd/entire/cli/agent/cursor/testdata/stream_success.jsonl new file mode 100644 index 0000000000..84c708af60 --- /dev/null +++ b/cmd/entire/cli/agent/cursor/testdata/stream_success.jsonl @@ -0,0 +1,8 @@ +{"type":"system","subtype":"init","apiKeySource":"login","cwd":"/private/tmp","session_id":"test","model":"Composer 2.5 Fast","permissionMode":"default"} +{"type":"user","message":{"role":"user","content":[{"type":"text","text":"say hi"}]},"session_id":"test"} +{"type":"thinking","subtype":"delta","text":"Hi","session_id":"test","timestamp_ms":1} +{"type":"thinking","subtype":"completed","session_id":"test","timestamp_ms":2} +{"type":"assistant","message":{"role":"assistant","content":[{"type":"text","text":"Hello"}]},"session_id":"test","timestamp_ms":3} +{"type":"assistant","message":{"role":"assistant","content":[{"type":"text","text":", 
world."}]},"session_id":"test","timestamp_ms":4} +{"type":"assistant","message":{"role":"assistant","content":[{"type":"text","text":"Hello, world."}]},"session_id":"test"} +{"type":"result","subtype":"success","duration_ms":3372,"is_error":false,"result":"Hello, world.","session_id":"test","usage":{"inputTokens":9141,"outputTokens":268,"cacheReadTokens":4608,"cacheWriteTokens":0}} From a15fba6f2be130b6bc9aaeec93bcf46993bd98e0 Mon Sep 17 00:00:00 2001 From: Peyton Montei Date: Wed, 20 May 2026 10:48:12 -0400 Subject: [PATCH 08/12] refactor(agent): extract LooksLikeUnrecognizedFlag shared helper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces four near-identical per-agent flag-rejection heuristics with one shared helper that takes the agent's distinguishing flag keywords as variadic arguments. Side-effects of the consolidation: - Codex drops the `"exec"` keyword (Chunk 2 code-quality nit) — `codex exec --json` has "exec" in its argv so an unrelated stderr mentioning "exec" would have false-positive'd into a fallback. The shared helper takes only flag-name keywords, not subcommand names. - Gemini upgrades from loose phrases (`"unknown"`/`"invalid"` alone) to the canonical strict set (`"unknown flag"`, etc.) the shared helper enforces. This was a Chunk 3 code-quality review nit — generic one-word phrases could match unrelated stderr. Drops 4 `//nolint:dupl` annotations (`dupl` never actually fired at threshold 75; the markers were prophylactic insurance). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../cli/agent/codex/generate_streaming.go | 24 +++------- .../agent/copilotcli/generate_streaming.go | 30 +++---------- .../cli/agent/cursor/generate_streaming.go | 33 +++----------- .../cli/agent/geminicli/generate_streaming.go | 25 +++-------- .../cli/agent/streaming_unrecognized_flag.go | 33 ++++++++++++++ .../agent/streaming_unrecognized_flag_test.go | 44 +++++++++++++++++++ 6 files changed, 105 insertions(+), 84 deletions(-) create mode 100644 cmd/entire/cli/agent/streaming_unrecognized_flag.go create mode 100644 cmd/entire/cli/agent/streaming_unrecognized_flag_test.go diff --git a/cmd/entire/cli/agent/codex/generate_streaming.go b/cmd/entire/cli/agent/codex/generate_streaming.go index ef92feee97..b70fa9a51d 100644 --- a/cmd/entire/cli/agent/codex/generate_streaming.go +++ b/cmd/entire/cli/agent/codex/generate_streaming.go @@ -137,11 +137,13 @@ func (c *CodexAgent) GenerateTextStreaming( progress agent.ProgressFn, ) (string, error) { tmpl := &agent.StreamingGeneratorTemplate{ - AgentName: "codex", - DisplayName: "codex", - BuildCmd: c.buildStreamCmd, - Parser: parseCodexStream, - LooksLikeUnrecognizedFlag: looksLikeCodexUnrecognizedFlag, + AgentName: "codex", + DisplayName: "codex", + BuildCmd: c.buildStreamCmd, + Parser: parseCodexStream, + LooksLikeUnrecognizedFlag: func(stderr string) bool { + return agent.LooksLikeUnrecognizedFlag(stderr, "json") + }, } result, err := tmpl.Generate(ctx, prompt, model, progress) @@ -168,15 +170,3 @@ func (c *CodexAgent) buildStreamCmd(ctx context.Context, prompt, model string) * cmd.Stdin = strings.NewReader(prompt) return cmd } - -func looksLikeCodexUnrecognizedFlag(stderr string) bool { - lower := strings.ToLower(stderr) - hasRejectPhrase := strings.Contains(lower, "unrecognized option") || - strings.Contains(lower, "unknown flag") || - strings.Contains(lower, "unknown option") || - strings.Contains(lower, "invalid option") - if !hasRejectPhrase { - return false - } - 
return strings.Contains(lower, "json") || strings.Contains(lower, "exec") -} diff --git a/cmd/entire/cli/agent/copilotcli/generate_streaming.go b/cmd/entire/cli/agent/copilotcli/generate_streaming.go index 1e2039d47b..428dcce83e 100644 --- a/cmd/entire/cli/agent/copilotcli/generate_streaming.go +++ b/cmd/entire/cli/agent/copilotcli/generate_streaming.go @@ -189,11 +189,13 @@ func (c *CopilotCLIAgent) GenerateTextStreaming( progress agent.ProgressFn, ) (string, error) { tmpl := &agent.StreamingGeneratorTemplate{ - AgentName: "copilot-cli", - DisplayName: "copilot", - BuildCmd: c.buildStreamCmd, - Parser: parseCopilotStream, - LooksLikeUnrecognizedFlag: looksLikeCopilotUnrecognizedFlag, + AgentName: "copilot-cli", + DisplayName: "copilot", + BuildCmd: c.buildStreamCmd, + Parser: parseCopilotStream, + LooksLikeUnrecognizedFlag: func(stderr string) bool { + return agent.LooksLikeUnrecognizedFlag(stderr, "stream", "output-format") + }, } result, err := tmpl.Generate(ctx, prompt, model, progress) @@ -219,21 +221,3 @@ func (c *CopilotCLIAgent) buildStreamCmd(ctx context.Context, prompt, model stri cmd.Stdin = strings.NewReader(prompt) return cmd } - -// looksLikeCopilotUnrecognizedFlag is the 3rd of 4 per-agent flag-rejection -// heuristics. Consolidation deferred to a post-chunk-5 cleanup commit. Phrase -// set matches Codex's (strict — "unknown flag" rather than bare "unknown") to -// avoid the over-broad matches a Chunk-3 code review flagged on Gemini. 
-// -//nolint:dupl // 3rd of 4 per-agent flag-rejection heuristics; consolidation deferred to a post-chunk-5 cleanup commit -func looksLikeCopilotUnrecognizedFlag(stderr string) bool { - lower := strings.ToLower(stderr) - hasRejectPhrase := strings.Contains(lower, "unknown flag") || - strings.Contains(lower, "unrecognized option") || - strings.Contains(lower, "unknown option") || - strings.Contains(lower, "invalid option") - if !hasRejectPhrase { - return false - } - return strings.Contains(lower, "stream") || strings.Contains(lower, "output-format") -} diff --git a/cmd/entire/cli/agent/cursor/generate_streaming.go b/cmd/entire/cli/agent/cursor/generate_streaming.go index c657be6514..48d756cfb5 100644 --- a/cmd/entire/cli/agent/cursor/generate_streaming.go +++ b/cmd/entire/cli/agent/cursor/generate_streaming.go @@ -160,11 +160,13 @@ func (c *CursorAgent) GenerateTextStreaming( progress agent.ProgressFn, ) (string, error) { tmpl := &agent.StreamingGeneratorTemplate{ - AgentName: "cursor", - DisplayName: "agent", - BuildCmd: c.buildStreamCmd, - Parser: parseCursorStream, - LooksLikeUnrecognizedFlag: looksLikeCursorUnrecognizedFlag, + AgentName: "cursor", + DisplayName: "agent", + BuildCmd: c.buildStreamCmd, + Parser: parseCursorStream, + LooksLikeUnrecognizedFlag: func(stderr string) bool { + return agent.LooksLikeUnrecognizedFlag(stderr, "stream-json", "stream-partial-output", "output-format") + }, } result, err := tmpl.Generate(ctx, prompt, model, progress) @@ -197,24 +199,3 @@ func (c *CursorAgent) buildStreamCmd(ctx context.Context, prompt, model string) cmd.Stdin = strings.NewReader(prompt) return cmd } - -// looksLikeCursorUnrecognizedFlag is the 4th of 4 per-agent flag-rejection -// heuristics. Consolidation deferred to a post-chunk-5 cleanup commit. Phrase -// set matches Codex/Copilot's (strict — "unknown flag" rather than bare -// "unknown") to avoid the over-broad matches a Chunk-3 code review flagged on -// Gemini. 
-// -//nolint:dupl // 4th of 4 per-agent flag-rejection heuristics; consolidation deferred to a post-chunk-5 cleanup commit -func looksLikeCursorUnrecognizedFlag(stderr string) bool { - lower := strings.ToLower(stderr) - hasRejectPhrase := strings.Contains(lower, "unknown flag") || - strings.Contains(lower, "unrecognized option") || - strings.Contains(lower, "unknown option") || - strings.Contains(lower, "invalid option") - if !hasRejectPhrase { - return false - } - return strings.Contains(lower, "stream-json") || - strings.Contains(lower, "stream-partial-output") || - strings.Contains(lower, "output-format") -} diff --git a/cmd/entire/cli/agent/geminicli/generate_streaming.go b/cmd/entire/cli/agent/geminicli/generate_streaming.go index 43f541a0ed..5b7113aff4 100644 --- a/cmd/entire/cli/agent/geminicli/generate_streaming.go +++ b/cmd/entire/cli/agent/geminicli/generate_streaming.go @@ -149,11 +149,13 @@ func (g *GeminiCLIAgent) GenerateTextStreaming( progress agent.ProgressFn, ) (string, error) { tmpl := &agent.StreamingGeneratorTemplate{ - AgentName: "gemini", - DisplayName: "gemini", - BuildCmd: g.buildStreamCmd, - Parser: parseGeminiStream, - LooksLikeUnrecognizedFlag: looksLikeGeminiUnrecognizedFlag, + AgentName: "gemini", + DisplayName: "gemini", + BuildCmd: g.buildStreamCmd, + Parser: parseGeminiStream, + LooksLikeUnrecognizedFlag: func(stderr string) bool { + return agent.LooksLikeUnrecognizedFlag(stderr, "output-format", "stream-json") + }, } result, err := tmpl.Generate(ctx, prompt, model, progress) @@ -179,16 +181,3 @@ func (g *GeminiCLIAgent) buildStreamCmd(ctx context.Context, prompt, model strin cmd.Stdin = strings.NewReader(prompt) return cmd } - -// looksLikeGeminiUnrecognizedFlag is the second of 4 per-agent flag-rejection -// heuristics. The controller will consolidate the 4 instances after Chunk 5. 
-func looksLikeGeminiUnrecognizedFlag(stderr string) bool { - lower := strings.ToLower(stderr) - hasRejectPhrase := strings.Contains(lower, "unknown") || - strings.Contains(lower, "unrecognized") || - strings.Contains(lower, "invalid") - if !hasRejectPhrase { - return false - } - return strings.Contains(lower, "output-format") || strings.Contains(lower, "stream-json") -} diff --git a/cmd/entire/cli/agent/streaming_unrecognized_flag.go b/cmd/entire/cli/agent/streaming_unrecognized_flag.go new file mode 100644 index 0000000000..10992f6e55 --- /dev/null +++ b/cmd/entire/cli/agent/streaming_unrecognized_flag.go @@ -0,0 +1,33 @@ +package agent + +import "strings" + +// LooksLikeUnrecognizedFlag reports whether stderr matches the canonical +// CLI-rejected-a-flag pattern: a rejection phrase ("unknown flag", +// "unrecognized option", etc.) combined with at least one of the +// caller-specified flag-name keywords. +// +// Used by streaming text generators to detect when an older CLI version +// rejects a streaming-mode argv so the caller can fall back to a +// non-streaming path. Returns false if no rejection phrase is present or +// no keyword matches. +// +// Keyword matching is case-insensitive and substring-based; pass the +// distinguishing words from your streaming flags (e.g. "stream-json", +// "output-format") rather than the leading "--". 
+func LooksLikeUnrecognizedFlag(stderr string, flagKeywords ...string) bool { + lower := strings.ToLower(stderr) + rejection := strings.Contains(lower, "unknown flag") || + strings.Contains(lower, "unrecognized option") || + strings.Contains(lower, "unknown option") || + strings.Contains(lower, "invalid option") + if !rejection { + return false + } + for _, kw := range flagKeywords { + if strings.Contains(lower, strings.ToLower(kw)) { + return true + } + } + return false +} diff --git a/cmd/entire/cli/agent/streaming_unrecognized_flag_test.go b/cmd/entire/cli/agent/streaming_unrecognized_flag_test.go new file mode 100644 index 0000000000..61946327c8 --- /dev/null +++ b/cmd/entire/cli/agent/streaming_unrecognized_flag_test.go @@ -0,0 +1,44 @@ +package agent_test + +import ( + "testing" + + "github.com/entireio/cli/cmd/entire/cli/agent" +) + +func TestLooksLikeUnrecognizedFlag(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + stderr string + keywords []string + want bool + }{ + {"unknown_flag_with_matching_keyword", "error: unknown flag: --stream-json", []string{"stream-json"}, true}, + {"unrecognized_option_with_matching_keyword", "unrecognized option '--json'", []string{"json"}, true}, + {"invalid_option_with_matching_keyword", "invalid option: --output-format", []string{"output-format"}, true}, + {"unknown_option_with_matching_keyword", "unknown option: --stream", []string{"stream"}, true}, + + {"case_insensitive_stderr", "ERROR: UNKNOWN FLAG: --JSON", []string{"json"}, true}, + {"case_insensitive_keyword", "unknown flag: --json", []string{"JSON"}, true}, + + {"rejection_present_but_no_matching_keyword", "unknown flag: --foo", []string{"json", "stream"}, false}, + {"matching_keyword_but_no_rejection", "json: parse error", []string{"json"}, false}, + + {"empty_stderr", "", []string{"json"}, false}, + {"empty_keywords", "unknown flag: --json", nil, false}, + + {"weak_rejection_words_alone_dont_match", "command unknown", []string{"unknown"}, 
false}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + got := agent.LooksLikeUnrecognizedFlag(tc.stderr, tc.keywords...) + if got != tc.want { + t.Errorf("LooksLikeUnrecognizedFlag(%q, %v) = %v, want %v", tc.stderr, tc.keywords, got, tc.want) + } + }) + } +} From ebff9698193ce7158f42f3d8690db23e027cac79 Mon Sep 17 00:00:00 2001 From: Peyton Montei Date: Wed, 20 May 2026 11:46:10 -0400 Subject: [PATCH 09/12] refactor(agent): extract SafeErrorMessage shared helper Three of the four streaming parsers (codex, geminicli, copilotcli) had near-identical decode-with-fallback blocks that extracted operational error text from a raw NDJSON line without leaking the raw line itself. Consolidates to one shared helper that tries three commonly-observed message paths (`message`, `error.message`, `data.message`) and falls through to "unspecified error" on no match. Drops 2 `//nolint:errcheck` annotations along with the local decode blocks (codex, copilot). Gemini's branch was already using typed-event extraction (`ev.Error.Message`) so its conversion is a privacy-equivalent substitution to keep the codebase consistent. Cursor's parser does not use this pattern (its error envelope carries the message verbatim in `result.result`), so it is unchanged. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../cli/agent/codex/generate_streaming.go | 20 +--------- .../agent/copilotcli/generate_streaming.go | 26 +------------ .../cli/agent/geminicli/generate_streaming.go | 13 +------ cmd/entire/cli/agent/streaming_safe_error.go | 39 +++++++++++++++++++ .../cli/agent/streaming_safe_error_test.go | 37 ++++++++++++++++++ 5 files changed, 79 insertions(+), 56 deletions(-) create mode 100644 cmd/entire/cli/agent/streaming_safe_error.go create mode 100644 cmd/entire/cli/agent/streaming_safe_error_test.go diff --git a/cmd/entire/cli/agent/codex/generate_streaming.go b/cmd/entire/cli/agent/codex/generate_streaming.go index b70fa9a51d..7d5baac93c 100644 --- a/cmd/entire/cli/agent/codex/generate_streaming.go +++ b/cmd/entire/cli/agent/codex/generate_streaming.go @@ -75,25 +75,7 @@ func parseCodexStream(stdout io.Reader, progress agent.ProgressFn) (string, erro } case "turn.failed", "error": - var details struct { - Message string `json:"message"` - Error struct { - Message string `json:"message"` - } `json:"error"` - } - // Partial decode tolerated: if the schema changes we fall through - // to "unspecified error" rather than leaking the raw line (which - // may carry echoed user content or model-message fragments) into - // logs, telemetry, and *TextGenerationError.Stderr. 
- _ = json.Unmarshal(line, &details) //nolint:errcheck // see comment above - detail := details.Message - if detail == "" { - detail = details.Error.Message - } - if detail == "" { - detail = "unspecified error" - } - return "", fmt.Errorf("codex turn failed: %s", detail) + return "", fmt.Errorf("codex turn failed: %s", agent.SafeErrorMessage(line)) } } if err := scanner.Err(); err != nil { diff --git a/cmd/entire/cli/agent/copilotcli/generate_streaming.go b/cmd/entire/cli/agent/copilotcli/generate_streaming.go index 428dcce83e..22fbecd9da 100644 --- a/cmd/entire/cli/agent/copilotcli/generate_streaming.go +++ b/cmd/entire/cli/agent/copilotcli/generate_streaming.go @@ -127,31 +127,7 @@ func parseCopilotStream(stdout io.Reader, progress agent.ProgressFn) (string, er } case "error": - var details struct { - Message string `json:"message"` - Error struct { - Message string `json:"message"` - } `json:"error"` - Data struct { - Message string `json:"message"` - } `json:"data"` - } - // Partial decode tolerated: schema drift falls through to - // "unspecified error" rather than leaking the raw line (which - // may carry echoed user content or model fragments) into logs - // and *TextGenerationError.Stderr. 
- _ = json.Unmarshal(line, &details) //nolint:errcheck // see comment above - detail := details.Message - if detail == "" { - detail = details.Error.Message - } - if detail == "" { - detail = details.Data.Message - } - if detail == "" { - detail = "unspecified error" - } - return "", fmt.Errorf("copilot stream error: %s", detail) + return "", fmt.Errorf("copilot stream error: %s", agent.SafeErrorMessage(line)) } } if err := scanner.Err(); err != nil { diff --git a/cmd/entire/cli/agent/geminicli/generate_streaming.go b/cmd/entire/cli/agent/geminicli/generate_streaming.go index 5b7113aff4..b75eb56765 100644 --- a/cmd/entire/cli/agent/geminicli/generate_streaming.go +++ b/cmd/entire/cli/agent/geminicli/generate_streaming.go @@ -97,18 +97,7 @@ func parseGeminiStream(stdout io.Reader, progress agent.ProgressFn) (string, err case "result": stats = ev.Stats if ev.Status == "error" { - detail := "" - if ev.Error != nil { - detail = ev.Error.Message - } - if detail == "" { - // Partial decode tolerated: schema drift falls through to - // "unspecified error" rather than leaking raw lines (which - // may carry echoed user content or model fragments) into - // logs and *TextGenerationError.Stderr. - detail = "unspecified error" - } - return "", fmt.Errorf("gemini stream error: %s", detail) + return "", fmt.Errorf("gemini stream error: %s", agent.SafeErrorMessage(line)) } } } diff --git a/cmd/entire/cli/agent/streaming_safe_error.go b/cmd/entire/cli/agent/streaming_safe_error.go new file mode 100644 index 0000000000..cd13c7b16e --- /dev/null +++ b/cmd/entire/cli/agent/streaming_safe_error.go @@ -0,0 +1,39 @@ +package agent + +import "encoding/json" + +// SafeErrorMessage decodes operational error text from a raw NDJSON line +// without leaking the raw line itself. 
+// +// Stream parsers MUST NOT propagate raw protocol lines into error messages: +// CLI stderr/stdout can carry echoed user content or model-message +// fragments, which would then surface in logs, telemetry, and user-facing +// error output. This helper partially decodes the line against the three +// commonly-observed message paths (`message`, `error.message`, +// `data.message`) and falls back to "unspecified error" if none match. +// +// Callers pass the raw scanner line. Decode errors are tolerated: if the +// line is malformed or has none of these paths, the result is the safe +// sentinel rather than the raw bytes. +func SafeErrorMessage(line []byte) string { + var details struct { + Message string `json:"message"` + Error struct { + Message string `json:"message"` + } `json:"error"` + Data struct { + Message string `json:"message"` + } `json:"data"` + } + _ = json.Unmarshal(line, &details) //nolint:errcheck // partial decode tolerated; raw line MUST NOT leak + switch { + case details.Message != "": + return details.Message + case details.Error.Message != "": + return details.Error.Message + case details.Data.Message != "": + return details.Data.Message + default: + return "unspecified error" + } +} diff --git a/cmd/entire/cli/agent/streaming_safe_error_test.go b/cmd/entire/cli/agent/streaming_safe_error_test.go new file mode 100644 index 0000000000..919d9f1242 --- /dev/null +++ b/cmd/entire/cli/agent/streaming_safe_error_test.go @@ -0,0 +1,37 @@ +package agent_test + +import ( + "testing" + + "github.com/entireio/cli/cmd/entire/cli/agent" +) + +func TestSafeErrorMessage(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + line string + want string + }{ + {"top_level_message", `{"type":"error","message":"model not found"}`, "model not found"}, + {"nested_error_message", `{"type":"turn.failed","error":{"message":"timed out"}}`, "timed out"}, + {"nested_data_message", `{"type":"error","data":{"message":"rate limited"}}`, "rate limited"}, + 
{"prefers_top_level_over_nested", `{"message":"top","error":{"message":"nested"}}`, "top"}, + {"prefers_error_over_data", `{"error":{"message":"e"},"data":{"message":"d"}}`, "e"}, + {"empty_strings_fall_through", `{"message":"","error":{"message":""},"data":{"message":""}}`, "unspecified error"}, + {"no_message_paths", `{"type":"thread.started","thread_id":"t"}`, "unspecified error"}, + {"malformed_json", `{not json at all`, "unspecified error"}, + {"empty_line", ``, "unspecified error"}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + got := agent.SafeErrorMessage([]byte(tc.line)) + if got != tc.want { + t.Errorf("SafeErrorMessage(%q) = %q, want %q", tc.line, got, tc.want) + } + }) + } +} From 373e472b010c9da232222936f2393a057cd2db3a Mon Sep 17 00:00:00 2001 From: Peyton Montei Date: Wed, 20 May 2026 12:01:27 -0400 Subject: [PATCH 10/12] feat(explain): show cached-input-tokens on PhaseDone line MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the streaming parser populates GenerationProgress.CachedInputTokens (currently Cursor's result.usage.cacheReadTokens; potentially future agents), surface it in the "✓ Summary generated" line: Before: ✓ Summary generated (10.7s, 1.6k output tokens) After: ✓ Summary generated (10.7s, 1.6k output tokens, 4.6k cached input tokens) The cached-tokens clause is omitted when CachedInputTokens is zero, so agents that don't expose cache-read counters (Gemini, Copilot) keep the existing 2-clause format. Matches the graceful-degradation pattern already in place for PhaseFirstToken's parens block. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/entire/cli/explain.go | 13 +++++++--- cmd/entire/cli/explain_test.go | 46 ++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/cmd/entire/cli/explain.go b/cmd/entire/cli/explain.go index b818c502f8..b1df65cef6 100644 --- a/cmd/entire/cli/explain.go +++ b/cmd/entire/cli/explain.go @@ -1144,7 +1144,7 @@ func formatCheckpointSummaryError(err error, attempt *summaryAttempt) (string, [ var claudeErr *claudecode.ClaudeError switch { case errors.As(err, &claudeErr): - switch claudeErr.Kind { //nolint:exhaustive // ClaudeErrorUnknown handled by default + switch claudeErr.Kind { case claudecode.ClaudeErrorAuth: label := "Claude authentication failed" rows := []explainRow{ @@ -1469,9 +1469,16 @@ func (s *summaryProgressWriter) handle(p agent.GenerationProgress) { "%s Writing summary... (~%s tokens)", s.arrow, formatTokenCount(p.OutputTokens))) case agent.PhaseDone: + clauses := []string{ + formatMs(p.DurationMs), + formatTokenCount(p.OutputTokens) + " output tokens", + } + if p.CachedInputTokens > 0 { + clauses = append(clauses, formatTokenCount(p.CachedInputTokens)+" cached input tokens") + } s.printLine(fmt.Sprintf( - "%s Summary generated (%s, %s output tokens)", - s.check, formatMs(p.DurationMs), formatTokenCount(p.OutputTokens))) + "%s Summary generated (%s)", + s.check, strings.Join(clauses, ", "))) } } diff --git a/cmd/entire/cli/explain_test.go b/cmd/entire/cli/explain_test.go index d016c00533..a427cda7b6 100644 --- a/cmd/entire/cli/explain_test.go +++ b/cmd/entire/cli/explain_test.go @@ -6711,6 +6711,52 @@ func TestSummaryProgressWriter_FirstTokenWithOnlyCachedTokens(t *testing.T) { } } +func TestSummaryProgressWriter_DoneWithCachedTokens(t *testing.T) { + t.Parallel() + + var buf bytes.Buffer + attempt := newSummaryAttempt("cursor", 0) + pw := newSummaryProgressWriter(&buf, attempt) + + pw.handle(agent.GenerationProgress{ + Phase: agent.PhaseDone, + DurationMs: 10700, 
+ OutputTokens: 1600, + CachedInputTokens: 4608, + }) + + out := buf.String() + if !strings.Contains(out, "1.6k output tokens") { + t.Errorf("output = %q, want '1.6k output tokens' clause", out) + } + if !strings.Contains(out, "4.6k cached input tokens") { + t.Errorf("output = %q, want '4.6k cached input tokens' clause when CachedInputTokens > 0", out) + } +} + +func TestSummaryProgressWriter_DoneWithoutCachedTokens(t *testing.T) { + t.Parallel() + + var buf bytes.Buffer + attempt := newSummaryAttempt("gemini", 0) + pw := newSummaryProgressWriter(&buf, attempt) + + pw.handle(agent.GenerationProgress{ + Phase: agent.PhaseDone, + DurationMs: 10700, + OutputTokens: 1600, + // CachedInputTokens: 0 — omitted + }) + + out := buf.String() + if !strings.Contains(out, "1.6k output tokens") { + t.Errorf("output = %q, want '1.6k output tokens' clause", out) + } + if strings.Contains(out, "cached input tokens") { + t.Errorf("output = %q, should not include cached tokens clause when CachedInputTokens == 0", out) + } +} + func TestSummaryProgressWriter_Accessible(t *testing.T) { // Cannot use t.Parallel() — t.Setenv mutates process-global state. // (Strictly, t.Setenv IS compatible with t.Parallel() in Go 1.17+ when From 3aa62ea077e0dc064333369bb409e99b7a39d1bd Mon Sep 17 00:00:00 2001 From: Peyton Montei Date: Wed, 20 May 2026 14:56:09 -0400 Subject: [PATCH 11/12] fix(explain): restore //nolint:exhaustive on ClaudeErrorKind switch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Gap 3 commit (8a247a9e1) accidentally dropped a load-bearing `//nolint:exhaustive // ClaudeErrorUnknown handled by default` annotation on the inner switch in formatCheckpointSummaryError. The exhaustive linter does not treat `default:` as covering an enum case — it requires every ClaudeErrorKind value to be enumerated explicitly. 
The default arm continues to handle ClaudeErrorUnknown plus any future-added kinds; the annotation just documents that intent and silences the lint. Caught by CI. Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/entire/cli/explain.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/entire/cli/explain.go b/cmd/entire/cli/explain.go index b1df65cef6..05804e38d4 100644 --- a/cmd/entire/cli/explain.go +++ b/cmd/entire/cli/explain.go @@ -1144,7 +1144,7 @@ func formatCheckpointSummaryError(err error, attempt *summaryAttempt) (string, [ var claudeErr *claudecode.ClaudeError switch { case errors.As(err, &claudeErr): - switch claudeErr.Kind { + switch claudeErr.Kind { //nolint:exhaustive // ClaudeErrorUnknown handled by default case claudecode.ClaudeErrorAuth: label := "Claude authentication failed" rows := []explainRow{ From ed9925cc927832179063f224b3dade2f77eda843 Mon Sep 17 00:00:00 2001 From: Peyton Montei Date: Wed, 20 May 2026 17:02:27 -0400 Subject: [PATCH 12/12] fix(agent): address PR #1230 review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five findings from cursor[bot] and Copilot reviewers on commit 3aa62ea07 (plus one defensive cleanup the existing tests didn't cover). 1. cursor parser silently returned ("", nil) on a successful result event with an empty result text — could happen if the CLI emits is_error:false without populating the result. Guard with an explicit `resultText == "" → "cursor stream produced no result text"` check (mirrors Codex's no-agent_message guard and RunIsolatedTextGeneratorCLI's empty-output rejection on the non-streaming path). Adds TestParseCursorStream_EmptyResultText so the guard is bisectable. 2. StreamingGeneratorTemplate.DisplayName was declared and set by all 4 agents but never read by Generate (all error messages and log attrs use AgentName). Drop the field and its 5 set-sites. 
The doc-comment rationale ("Cursor's agent is named 'cursor' but its binary is 'agent'") never materialized — Cursor's commit uses AgentName="cursor" because nothing in this code path consumes the binary name. Removing the dead configuration is cleaner than wiring it through retrospectively. 3. Generate's doc-comment claimed all errors are *TextGenerationError, but the pre-subprocess paths (StdoutPipe failure, non-cancelled cmd.Start failure) return plain wrapped errors and therefore drop the diagnostic fields. Tighten the doc to describe each branch's error shape explicitly — wrapping those paths in TextGenerationError would add empty Stderr/StdoutBytes anyway since the subprocess hadn't produced output yet. 4. countingReader was bypassed on the post-parse drain (io.Copy(io.Discard, stdout) instead of io.Copy(io.Discard, counter)), making StdoutBytes under-count when the parser bailed early on an in-stream error. Drain through the counter so the byte total reflects the full subprocess output for the explain layer's diagnostic. 5. testutil/streaming.go comment said "the project's CI is Linux/macOS only" but the repo includes e2e-windows.yml. Reword to describe the actual constraint (POSIX shell for the fake's heredoc generation) and note that the streaming agents these helpers exercise are not currently part of the Windows E2E workflow. cursor[bot]'s sixth finding (Gemini flag-rejection heuristic too broad) was already addressed by Gap 1 (a15fba6f2 → 5b4d9e... after rebase) — the shared LooksLikeUnrecognizedFlag helper enforces the canonical strict phrase set, eliminating the false-positive risk on bare "unknown"/"invalid". 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../cli/agent/codex/generate_streaming.go | 7 ++--- .../agent/copilotcli/generate_streaming.go | 7 ++--- .../cli/agent/cursor/generate_streaming.go | 10 ++++--- .../agent/cursor/generate_streaming_test.go | 19 +++++++++++++ .../cli/agent/geminicli/generate_streaming.go | 7 ++--- cmd/entire/cli/agent/streaming_template.go | 28 ++++++++++++------- .../cli/agent/streaming_template_test.go | 12 +++----- cmd/entire/cli/agent/testutil/streaming.go | 6 ++-- 8 files changed, 60 insertions(+), 36 deletions(-) diff --git a/cmd/entire/cli/agent/codex/generate_streaming.go b/cmd/entire/cli/agent/codex/generate_streaming.go index 7d5baac93c..9736cd16f6 100644 --- a/cmd/entire/cli/agent/codex/generate_streaming.go +++ b/cmd/entire/cli/agent/codex/generate_streaming.go @@ -119,10 +119,9 @@ func (c *CodexAgent) GenerateTextStreaming( progress agent.ProgressFn, ) (string, error) { tmpl := &agent.StreamingGeneratorTemplate{ - AgentName: "codex", - DisplayName: "codex", - BuildCmd: c.buildStreamCmd, - Parser: parseCodexStream, + AgentName: "codex", + BuildCmd: c.buildStreamCmd, + Parser: parseCodexStream, LooksLikeUnrecognizedFlag: func(stderr string) bool { return agent.LooksLikeUnrecognizedFlag(stderr, "json") }, diff --git a/cmd/entire/cli/agent/copilotcli/generate_streaming.go b/cmd/entire/cli/agent/copilotcli/generate_streaming.go index 22fbecd9da..ea7733fba1 100644 --- a/cmd/entire/cli/agent/copilotcli/generate_streaming.go +++ b/cmd/entire/cli/agent/copilotcli/generate_streaming.go @@ -165,10 +165,9 @@ func (c *CopilotCLIAgent) GenerateTextStreaming( progress agent.ProgressFn, ) (string, error) { tmpl := &agent.StreamingGeneratorTemplate{ - AgentName: "copilot-cli", - DisplayName: "copilot", - BuildCmd: c.buildStreamCmd, - Parser: parseCopilotStream, + AgentName: "copilot-cli", + BuildCmd: c.buildStreamCmd, + Parser: parseCopilotStream, LooksLikeUnrecognizedFlag: func(stderr string) bool { return 
agent.LooksLikeUnrecognizedFlag(stderr, "stream", "output-format") }, diff --git a/cmd/entire/cli/agent/cursor/generate_streaming.go b/cmd/entire/cli/agent/cursor/generate_streaming.go index 48d756cfb5..550e10dfc6 100644 --- a/cmd/entire/cli/agent/cursor/generate_streaming.go +++ b/cmd/entire/cli/agent/cursor/generate_streaming.go @@ -141,6 +141,9 @@ func parseCursorStream(stdout io.Reader, progress agent.ProgressFn) (string, err } return "", errors.New("cursor stream ended without a result event") } + if resultText == "" { + return "", errors.New("cursor stream produced no result text") + } if progress != nil { done := agent.GenerationProgress{Phase: agent.PhaseDone, DurationMs: durationMs} if usage != nil { @@ -160,10 +163,9 @@ func (c *CursorAgent) GenerateTextStreaming( progress agent.ProgressFn, ) (string, error) { tmpl := &agent.StreamingGeneratorTemplate{ - AgentName: "cursor", - DisplayName: "agent", - BuildCmd: c.buildStreamCmd, - Parser: parseCursorStream, + AgentName: "cursor", + BuildCmd: c.buildStreamCmd, + Parser: parseCursorStream, LooksLikeUnrecognizedFlag: func(stderr string) bool { return agent.LooksLikeUnrecognizedFlag(stderr, "stream-json", "stream-partial-output", "output-format") }, diff --git a/cmd/entire/cli/agent/cursor/generate_streaming_test.go b/cmd/entire/cli/agent/cursor/generate_streaming_test.go index bf4a79b153..998b02fe14 100644 --- a/cmd/entire/cli/agent/cursor/generate_streaming_test.go +++ b/cmd/entire/cli/agent/cursor/generate_streaming_test.go @@ -92,6 +92,25 @@ func TestParseCursorStream_MissingResult(t *testing.T) { } } +func TestParseCursorStream_EmptyResultText(t *testing.T) { + t.Parallel() + + // A successful (is_error:false) result event with an empty `result` + // field must error rather than returning ("", nil) — mirrors the + // guarantee Codex's parser makes and that RunIsolatedTextGeneratorCLI + // makes on the non-streaming path. 
+ stream := `{"type":"system","subtype":"init","session_id":"t"} +{"type":"result","subtype":"success","duration_ms":1,"is_error":false,"result":"","session_id":"t"} +` + _, err := parseCursorStream(strings.NewReader(stream), nil) + if err == nil { + t.Fatal("expected error when result event carries empty result text") + } + if !strings.Contains(err.Error(), "no result text") { + t.Errorf("error %q should mention 'no result text'", err) + } +} + func TestCursorGenerateTextStreaming_Success(t *testing.T) { t.Parallel() diff --git a/cmd/entire/cli/agent/geminicli/generate_streaming.go b/cmd/entire/cli/agent/geminicli/generate_streaming.go index b75eb56765..5ee352c75a 100644 --- a/cmd/entire/cli/agent/geminicli/generate_streaming.go +++ b/cmd/entire/cli/agent/geminicli/generate_streaming.go @@ -138,10 +138,9 @@ func (g *GeminiCLIAgent) GenerateTextStreaming( progress agent.ProgressFn, ) (string, error) { tmpl := &agent.StreamingGeneratorTemplate{ - AgentName: "gemini", - DisplayName: "gemini", - BuildCmd: g.buildStreamCmd, - Parser: parseGeminiStream, + AgentName: "gemini", + BuildCmd: g.buildStreamCmd, + Parser: parseGeminiStream, LooksLikeUnrecognizedFlag: func(stderr string) bool { return agent.LooksLikeUnrecognizedFlag(stderr, "output-format", "stream-json") }, diff --git a/cmd/entire/cli/agent/streaming_template.go b/cmd/entire/cli/agent/streaming_template.go index 2742504996..3d740c81a0 100644 --- a/cmd/entire/cli/agent/streaming_template.go +++ b/cmd/entire/cli/agent/streaming_template.go @@ -24,15 +24,10 @@ import ( // Fields must be non-nil before Generate is called; nil values cause // Generate to return ErrTemplateMisconfigured. type StreamingGeneratorTemplate struct { - // AgentName is an identifier used in log entries (e.g., "codex"). + // AgentName is an identifier used in log entries and the error-message + // prefix wrapped into *TextGenerationError (e.g., "codex"). 
AgentName string - // DisplayName is the user-facing CLI binary name used in *TextGenerationError - // wrapping (e.g., "codex"). Often the same as AgentName but kept separate - // for cases where they differ (e.g., Cursor's agent is named "cursor" - // but its binary is "agent"). - DisplayName string - // BuildCmd constructs the *exec.Cmd for one streaming call. Implementations // MUST set cmd.Stdin to the prompt and cmd.Args to the agent's // streaming-mode invocation. The template will set cmd.Dir = os.TempDir() @@ -61,8 +56,18 @@ var ErrTemplateMisconfigured = errors.New("streaming template misconfigured") var ErrUnrecognizedStreamingFlag = errors.New("CLI rejected streaming flag") // Generate runs one streaming generation and returns the final result text. -// On error, returns *TextGenerationError carrying captured stderr and the -// stdout byte count, matching RunIsolatedTextGeneratorCLI's error shape. +// +// Error shapes by failure point: +// - Pre-subprocess (StdoutPipe failure): plain wrapped error, since no +// stderr/stdout exists yet to diagnose with. +// - cmd.Start failure: wrapped error, or *TextGenerationError when ctx is +// already cancelled at that point. +// - Anything after Start (parse error, non-zero exit, ctx cancellation): +// *TextGenerationError carrying captured stderr and the stdout byte +// count from countingReader, matching RunIsolatedTextGeneratorCLI's +// error shape so the explain layer's diagnostic path can read both. +// - LooksLikeUnrecognizedFlag predicate match: ErrUnrecognizedStreamingFlag +// sentinel so the caller can fall back to non-streaming. 
func (t *StreamingGeneratorTemplate) Generate( ctx context.Context, prompt, model string, @@ -97,7 +102,10 @@ func (t *StreamingGeneratorTemplate) Generate( counter := &countingReader{r: stdout} result, parseErr := t.Parser(counter, progress) - if _, drainErr := io.Copy(io.Discard, stdout); drainErr != nil { + // Drain through the counter so StdoutBytes reflects the full subprocess + // output even when the parser exited early (e.g. on a recognized + // in-stream error). Reading from stdout directly would bypass counter.n. + if _, drainErr := io.Copy(io.Discard, counter); drainErr != nil { logging.Debug(ctx, "draining stream stdout", slog.String("agent", t.AgentName), slog.String("error", drainErr.Error())) diff --git a/cmd/entire/cli/agent/streaming_template_test.go b/cmd/entire/cli/agent/streaming_template_test.go index 354d626e77..62b5af9e6c 100644 --- a/cmd/entire/cli/agent/streaming_template_test.go +++ b/cmd/entire/cli/agent/streaming_template_test.go @@ -17,8 +17,7 @@ func TestStreamingGeneratorTemplate_Generate_Success(t *testing.T) { parsed := false tmpl := &agent.StreamingGeneratorTemplate{ - AgentName: "fake", - DisplayName: "fake", + AgentName: "fake", BuildCmd: func(ctx context.Context, _, _ string) *exec.Cmd { return testutil.FakeStreamCmd("hello\nworld\n", "", 0)(ctx, "fake", []string{}...) }, @@ -58,8 +57,7 @@ func TestStreamingGeneratorTemplate_Generate_UnrecognizedFlagFallback(t *testing t.Parallel() tmpl := &agent.StreamingGeneratorTemplate{ - AgentName: "fake", - DisplayName: "fake", + AgentName: "fake", BuildCmd: func(ctx context.Context, _, _ string) *exec.Cmd { return testutil.FakeStreamCmd("", "error: unknown flag: --stream-json", 1)(ctx, "fake", []string{}...) 
}, @@ -82,8 +80,7 @@ func TestStreamingGeneratorTemplate_Generate_NonZeroExitWrapsError(t *testing.T) t.Parallel() tmpl := &agent.StreamingGeneratorTemplate{ - AgentName: "fake", - DisplayName: "fake", + AgentName: "fake", BuildCmd: func(ctx context.Context, _, _ string) *exec.Cmd { return testutil.FakeStreamCmd("partial\n", "boom\n", 1)(ctx, "fake", []string{}...) }, @@ -113,8 +110,7 @@ func TestStreamingGeneratorTemplate_Generate_ContextCancelled(t *testing.T) { cancel() tmpl := &agent.StreamingGeneratorTemplate{ - AgentName: "fake", - DisplayName: "fake", + AgentName: "fake", BuildCmd: func(ctx context.Context, _, _ string) *exec.Cmd { return testutil.FakeStreamCmd("ok\n", "", 0)(ctx, "fake", []string{}...) }, diff --git a/cmd/entire/cli/agent/testutil/streaming.go b/cmd/entire/cli/agent/testutil/streaming.go index 874c8c94fd..9e3d0d483e 100644 --- a/cmd/entire/cli/agent/testutil/streaming.go +++ b/cmd/entire/cli/agent/testutil/streaming.go @@ -11,8 +11,10 @@ import ( // FakeStreamCmd returns a CommandRunner factory whose *exec.Cmd, when // Start()'d and Wait()'d, produces stdout/stderr/exit-code as configured. -// Implementation uses `sh -c` to write fixture data; on Windows runners -// we would need PowerShell, but the project's CI is Linux/macOS only. +// Implementation assumes a POSIX shell (`sh -c`) to write fixture data; +// it is not usable from a Windows runner. The streaming agents these +// helpers test are not currently exercised by the Windows E2E workflow +// (e2e-windows.yml), so a POSIX-only fake is sufficient. func FakeStreamCmd(stdout, stderr string, exitCode int) func(ctx context.Context, name string, args ...string) *exec.Cmd { return func(ctx context.Context, _ string, _ ...string) *exec.Cmd { script := BuildFakeShellScript(stdout, stderr, exitCode)