diff --git a/cmd/entire/cli/agent/claudecode/generate_streaming_test.go b/cmd/entire/cli/agent/claudecode/generate_streaming_test.go index c45d28d62c..1faa960ca8 100644 --- a/cmd/entire/cli/agent/claudecode/generate_streaming_test.go +++ b/cmd/entire/cli/agent/claudecode/generate_streaming_test.go @@ -6,10 +6,10 @@ import ( "os" "os/exec" "path/filepath" - "strings" "testing" "github.com/entireio/cli/cmd/entire/cli/agent" + "github.com/entireio/cli/cmd/entire/cli/agent/testutil" ) func TestGenerateTextStreaming_Success(t *testing.T) { @@ -21,7 +21,7 @@ func TestGenerateTextStreaming_Success(t *testing.T) { } agentInst := &ClaudeCodeAgent{ - CommandRunner: fakeStreamCmd(string(fixture), "", 0), + CommandRunner: testutil.FakeStreamCmd(string(fixture), "", 0), } var phases []agent.ProgressPhase @@ -57,8 +57,8 @@ func TestGenerateTextStreaming_FallbackOnUnrecognizedFlag(t *testing.T) { // Old CLI: exit non-zero with stderr complaining about --output-format=stream-json. // Fallback path is exercised by routing the *second* call (GenerateText) to a // canned non-streaming envelope. - streamCall := fakeStreamCmd("", "error: unknown flag: --output-format=stream-json", 1) - nonStreamCall := fakeStreamCmd(`{"is_error":false,"result":"fallback ok","subtype":"success"}`, "", 0) + streamCall := testutil.FakeStreamCmd("", "error: unknown flag: --output-format=stream-json", 1) + nonStreamCall := testutil.FakeStreamCmd(`{"is_error":false,"result":"fallback ok","subtype":"success"}`, "", 0) calls := 0 agentInst := &ClaudeCodeAgent{ CommandRunner: func(ctx context.Context, name string, args ...string) *exec.Cmd { @@ -100,7 +100,7 @@ func TestGenerateTextStreaming_EnvelopeErrorSurfaced(t *testing.T) { } agentInst := &ClaudeCodeAgent{ - CommandRunner: fakeStreamCmd(string(fixture), "", 0), + CommandRunner: testutil.FakeStreamCmd(string(fixture), "", 0), } _, err = agentInst.GenerateTextStreaming(context.Background(), "test", "haiku", nil) if err == nil { @@ -135,62 +135,3 @@ func equalPhases(a, b []agent.ProgressPhase) bool { } return true } - -// fakeStreamCmd returns a CommandRunner factory whose *exec.Cmd, when Start()'d -// and Wait()'d, produces stdout/stderr/exit-code as configured. Implementation -// uses 'sh -c' to write fixture data; on Windows runners we'd need PowerShell, -// but our CI is Linux/macOS. -func fakeStreamCmd(stdout, stderr string, exitCode int) func(ctx context.Context, name string, args ...string) *exec.Cmd { - return func(ctx context.Context, _ string, _ ...string) *exec.Cmd { - script := buildFakeShellScript(stdout, stderr, exitCode) - return exec.CommandContext(ctx, "sh", "-c", script) - } -} - -func buildFakeShellScript(stdout, stderr string, exitCode int) string { - var sb strings.Builder - if stdout != "" { - sb.WriteString("cat <<'__EOF__'\n") - sb.WriteString(stdout) - if !strings.HasSuffix(stdout, "\n") { - sb.WriteString("\n") - } - sb.WriteString("__EOF__\n") - } - if stderr != "" { - sb.WriteString("cat <<'__EOF__' 1>&2\n") - sb.WriteString(stderr) - if !strings.HasSuffix(stderr, "\n") { - sb.WriteString("\n") - } - sb.WriteString("__EOF__\n") - } - if exitCode != 0 { - sb.WriteString("exit ") - sb.WriteString(itoa(exitCode)) - sb.WriteString("\n") - } - return sb.String() -} - -func itoa(n int) string { - if n == 0 { - return "0" - } - neg := n < 0 - if neg { - n = -n - } - var buf [12]byte - i := len(buf) - for n > 0 { - i-- - buf[i] = byte('0' + n%10) - n /= 10 - } - if neg { - i-- - buf[i] = '-' - } - return string(buf[i:]) -} diff --git a/cmd/entire/cli/agent/codex/generate_streaming.go b/cmd/entire/cli/agent/codex/generate_streaming.go new file mode 100644 index 0000000000..9736cd16f6 --- /dev/null +++ b/cmd/entire/cli/agent/codex/generate_streaming.go @@ -0,0 +1,153 @@ +package codex + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os/exec" + "strings" + "time" + + "github.com/entireio/cli/cmd/entire/cli/agent" +) + +const streamBufferMax = 4 * 1024 * 1024 // 4 MiB + +// parseCodexStream consumes `codex exec --json` NDJSON output, dispatches +// progress callbacks, and returns the final agent_message text. +func parseCodexStream(stdout io.Reader, progress agent.ProgressFn) (string, error) { + scanner := bufio.NewScanner(stdout) + scanner.Buffer(make([]byte, 64*1024), streamBufferMax) + + var ( + resultText string + sawTurnComplete bool + usage *codexStreamUsage + turnStartedAt time.Time + turnDuration time.Duration + malformed int + ) + + dispatch := func(p agent.GenerationProgress) { + if progress != nil { + progress(p) + } + } + + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + var ev codexStreamEvent + if err := json.Unmarshal(line, &ev); err != nil { + // Codex may emit transient noise (blank lines, partial flushes); a + // schema-incompatible line is recoverable per-event but tracked so + // protocol regressions surface in the "no agent_message" error + // instead of disappearing silently. + malformed++ + continue + } + + switch ev.Type { + case "turn.started": + turnStartedAt = time.Now() + dispatch(agent.GenerationProgress{Phase: agent.PhaseConnecting}) + + case "item.completed": + // Codex emits the full agent_message in one item; we capture + // the text but defer PhaseFirstToken until turn.completed so + // the cached_input_tokens usage clause can be attached. The + // CLI buffers and emits items in one chunk per turn, so there + // is no incremental "first token" signal to surface anyway. + if ev.Item != nil && ev.Item.Type == "agent_message" { + resultText = ev.Item.Text + } + + case "turn.completed": + sawTurnComplete = true + usage = ev.Usage + if !turnStartedAt.IsZero() { + turnDuration = time.Since(turnStartedAt) + } + + case "turn.failed", "error": + return "", fmt.Errorf("codex turn failed: %s", agent.SafeErrorMessage(line)) + } + } + if err := scanner.Err(); err != nil { + return "", fmt.Errorf("reading codex stream: %w", err) + } + if !sawTurnComplete { + return "", errors.New("codex stream ended without a turn.completed event") + } + if resultText == "" { + if malformed > 0 { + return "", fmt.Errorf("codex stream produced no agent_message (%d malformed lines skipped)", malformed) + } + return "", errors.New("codex stream produced no agent_message") + } + if progress != nil { + firstToken := agent.GenerationProgress{Phase: agent.PhaseFirstToken} + if usage != nil { + firstToken.InputTokens = usage.InputTokens + firstToken.CachedInputTokens = usage.CachedInputTokens + } + dispatch(firstToken) + + done := agent.GenerationProgress{Phase: agent.PhaseDone} + if usage != nil { + done.OutputTokens = usage.OutputTokens + done.InputTokens = usage.InputTokens + done.CachedInputTokens = usage.CachedInputTokens + } + if turnDuration > 0 { + done.DurationMs = int(turnDuration.Milliseconds()) + } + dispatch(done) + } + return resultText, nil +} + +// GenerateTextStreaming implements agent.StreamingTextGenerator. +func (c *CodexAgent) GenerateTextStreaming( + ctx context.Context, + prompt, model string, + progress agent.ProgressFn, +) (string, error) { + tmpl := &agent.StreamingGeneratorTemplate{ + AgentName: "codex", + BuildCmd: c.buildStreamCmd, + Parser: parseCodexStream, + LooksLikeUnrecognizedFlag: func(stderr string) bool { + return agent.LooksLikeUnrecognizedFlag(stderr, "json") + }, + } + + result, err := tmpl.Generate(ctx, prompt, model, progress) + if err != nil { + if errors.Is(err, agent.ErrUnrecognizedStreamingFlag) { + return c.GenerateText(ctx, prompt, model) + } + return "", fmt.Errorf("codex streaming generate: %w", err) + } + return result, nil +} + +func (c *CodexAgent) buildStreamCmd(ctx context.Context, prompt, model string) *exec.Cmd { + commandRunner := c.CommandRunner + if commandRunner == nil { + commandRunner = exec.CommandContext + } + args := []string{"exec", "--skip-git-repo-check", "--json"} + if model != "" { + args = append(args, "--model", model) + } + args = append(args, "-") + cmd := commandRunner(ctx, "codex", args...) + cmd.Stdin = strings.NewReader(prompt) + return cmd +} diff --git a/cmd/entire/cli/agent/codex/generate_streaming_test.go b/cmd/entire/cli/agent/codex/generate_streaming_test.go new file mode 100644 index 0000000000..a2add02644 --- /dev/null +++ b/cmd/entire/cli/agent/codex/generate_streaming_test.go @@ -0,0 +1,132 @@ +package codex + +import ( + "bytes" + "context" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + + "github.com/entireio/cli/cmd/entire/cli/agent" + "github.com/entireio/cli/cmd/entire/cli/agent/testutil" +) + +func TestParseCodexStream_Success(t *testing.T) { + t.Parallel() + + data, err := os.ReadFile(filepath.Join("testdata", "stream_success.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + var phases []agent.ProgressPhase + result, err := parseCodexStream(bytes.NewReader(data), func(p agent.GenerationProgress) { + phases = append(phases, p.Phase) + }) + if err != nil { + t.Fatalf("parse: %v", err) + } + if result != "Hello, world." { + t.Errorf("result = %q, want %q", result, "Hello, world.") + } + + counts := map[agent.ProgressPhase]int{} + for _, p := range phases { + counts[p]++ + } + if counts[agent.PhaseConnecting] != 1 { + t.Errorf("PhaseConnecting count = %d, want 1", counts[agent.PhaseConnecting]) + } + if counts[agent.PhaseFirstToken] != 1 { + t.Errorf("PhaseFirstToken count = %d, want 1", counts[agent.PhaseFirstToken]) + } + if counts[agent.PhaseDone] != 1 { + t.Errorf("PhaseDone count = %d, want 1", counts[agent.PhaseDone]) + } +} + +func TestParseCodexStream_ErrorEnvelope(t *testing.T) { + t.Parallel() + + data, err := os.ReadFile(filepath.Join("testdata", "stream_error.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + _, err = parseCodexStream(bytes.NewReader(data), nil) + if err == nil { + t.Fatal("expected error from error envelope") + } + if !strings.Contains(err.Error(), "model not found") { + t.Errorf("error %q should mention 'model not found'", err) + } +} + +func TestParseCodexStream_MissingTurnCompleted(t *testing.T) { + t.Parallel() + + stream := `{"type":"thread.started","thread_id":"t"} +{"type":"item.completed","item":{"id":"i","type":"agent_message","text":"partial"}} +` + _, err := parseCodexStream(strings.NewReader(stream), nil) + if err == nil { + t.Fatal("expected error when stream lacks turn.completed") + } +} + +func TestCodexGenerateTextStreaming_Success(t *testing.T) { + t.Parallel() + + fixture, err := os.ReadFile(filepath.Join("testdata", "stream_success.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + c := &CodexAgent{ + CommandRunner: testutil.FakeStreamCmd(string(fixture), "", 0), + } + + var phases []agent.ProgressPhase + result, err := c.GenerateTextStreaming(context.Background(), "test prompt", "haiku", func(p agent.GenerationProgress) { + phases = append(phases, p.Phase) + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result != "Hello, world." { + t.Errorf("result = %q, want %q", result, "Hello, world.") + } + if len(phases) != 3 { + t.Errorf("phases = %v (count %d), want 3 (Connecting, FirstToken, Done)", phases, len(phases)) + } +} + +func TestCodexGenerateTextStreaming_FallbackOnUnrecognizedFlag(t *testing.T) { + t.Parallel() + + streamCall := testutil.FakeStreamCmd("", "error: unknown flag: --json", 1) + nonStreamCall := testutil.FakeStreamCmd("fallback response", "", 0) + calls := 0 + c := &CodexAgent{ + CommandRunner: func(ctx context.Context, name string, args ...string) *exec.Cmd { + calls++ + if calls == 1 { + return streamCall(ctx, name, args...) + } + return nonStreamCall(ctx, name, args...) + }, + } + + result, err := c.GenerateTextStreaming(context.Background(), "test", "haiku", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(result, "fallback") { + t.Errorf("result = %q, want substring 'fallback'", result) + } + if calls != 2 { + t.Errorf("calls = %d, want 2 (streaming + fallback)", calls) + } +} diff --git a/cmd/entire/cli/agent/codex/stream_response.go b/cmd/entire/cli/agent/codex/stream_response.go new file mode 100644 index 0000000000..1a84198b2b --- /dev/null +++ b/cmd/entire/cli/agent/codex/stream_response.go @@ -0,0 +1,27 @@ +package codex + +// codexStreamEvent is one decoded line of `codex exec --json` output. +// Fields are populated based on the event Type/Item.Type. +type codexStreamEvent struct { + Type string `json:"type"` + ThreadID string `json:"thread_id,omitempty"` + Item *codexStreamItem `json:"item,omitempty"` + Usage *codexStreamUsage `json:"usage,omitempty"` +} + +// codexStreamItem appears inside type=item.completed events. For summary +// generation we care only about Type="agent_message" items, which carry +// the model's response in Text. +type codexStreamItem struct { + ID string `json:"id"` + Type string `json:"type"` // "agent_message" | "command_execution" | ... + Text string `json:"text"` // populated for agent_message +} + +// codexStreamUsage appears inside the terminal type=turn.completed event. +type codexStreamUsage struct { + InputTokens int `json:"input_tokens,omitempty"` + CachedInputTokens int `json:"cached_input_tokens,omitempty"` + OutputTokens int `json:"output_tokens,omitempty"` + ReasoningOutputTokens int `json:"reasoning_output_tokens,omitempty"` +} diff --git a/cmd/entire/cli/agent/codex/testdata/stream_error.jsonl b/cmd/entire/cli/agent/codex/testdata/stream_error.jsonl new file mode 100644 index 0000000000..e2799e9b06 --- /dev/null +++ b/cmd/entire/cli/agent/codex/testdata/stream_error.jsonl @@ -0,0 +1,4 @@ +{"type":"thread.started","thread_id":"test-thread"} +{"type":"turn.started"} +{"type":"error","message":"invalid_request_error: model not found"} +{"type":"turn.failed","error":{"message":"model not found"}} diff --git a/cmd/entire/cli/agent/codex/testdata/stream_success.jsonl b/cmd/entire/cli/agent/codex/testdata/stream_success.jsonl new file mode 100644 index 0000000000..a13b4004b4 --- /dev/null +++ b/cmd/entire/cli/agent/codex/testdata/stream_success.jsonl @@ -0,0 +1,4 @@ +{"type":"thread.started","thread_id":"test-thread"} +{"type":"turn.started"} +{"type":"item.completed","item":{"id":"item_0","type":"agent_message","text":"Hello, world."}} +{"type":"turn.completed","usage":{"input_tokens":9,"cached_input_tokens":1234,"output_tokens":3,"reasoning_output_tokens":0}} diff --git a/cmd/entire/cli/agent/copilotcli/generate_streaming.go b/cmd/entire/cli/agent/copilotcli/generate_streaming.go new file mode 100644 index 0000000000..ea7733fba1 --- /dev/null +++ b/cmd/entire/cli/agent/copilotcli/generate_streaming.go @@ -0,0 +1,198 @@ +package copilotcli + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os/exec" + "strings" + "time" + + "github.com/entireio/cli/cmd/entire/cli/agent" +) + +const streamBufferMax = 4 * 1024 * 1024 // 4 MiB + +// parseCopilotStream consumes `copilot --output-format json --stream on` NDJSON +// output, dispatches progress callbacks, and returns the concatenated assistant +// content. +// +// Real-CLI shape (GitHub Copilot CLI 1.0.48): +// - session.* / user.message → ignored (bootstrap + echo) +// - assistant.turn_start → PhaseConnecting +// - assistant.message_start → ignored (ephemeral marker) +// - assistant.message_delta (deltaContent) → PhaseFirstToken (1st) / +// PhaseGenerating (subsequent), +// content concatenated +// - assistant.message (non-ephemeral, outputTokens) → captures token count +// - assistant.turn_end → turn marker, no-op +// - result (exitCode, usage.totalApiDurationMs) → PhaseDone (terminal); +// non-zero exitCode is +// treated as a stream error +// +// The "error" event shape is defensive: Copilot's current public stream +// typically reports failures via non-zero result.exitCode plus a stderr +// message, not an in-stream JSON envelope. The error-event handler is kept +// for forward-compat with potential future CLI versions that adopt one. +func parseCopilotStream(stdout io.Reader, progress agent.ProgressFn) (string, error) { + scanner := bufio.NewScanner(stdout) + scanner.Buffer(make([]byte, 64*1024), streamBufferMax) + + var ( + result strings.Builder + connectingFired bool + firstTokenFired bool + sawTerminal bool + usage *copilotStreamUsage + outputTokens int + streamedChars int + malformed int + start = time.Now() + ) + + dispatch := func(p agent.GenerationProgress) { + if progress != nil { + progress(p) + } + } + + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + var ev copilotStreamEvent + if err := json.Unmarshal(line, &ev); err != nil { + // Copilot may emit transient noise (blank lines, partial flushes); a + // schema-incompatible line is recoverable per-event but tracked so + // protocol regressions surface in the no-content error instead of + // disappearing silently. + malformed++ + continue + } + + switch ev.Type { + case "assistant.turn_start": + // Copilot emits one assistant.turn_start per tool-using turn. The + // progress UI expects PhaseConnecting once per generation, so we + // fire only on the first turn. + if !connectingFired { + connectingFired = true + dispatch(agent.GenerationProgress{Phase: agent.PhaseConnecting}) + } + + case "assistant.message_delta": + if ev.Data == nil || ev.Data.DeltaContent == "" { + continue + } + if !firstTokenFired { + firstTokenFired = true + // Copilot's delta events carry no TTFT, input-token, or + // cached-token data; the progress writer's graceful + // degradation will render "Provider responded -- generating..." + // without parens. + dispatch(agent.GenerationProgress{Phase: agent.PhaseFirstToken}) + } else { + // Estimate running output tokens from accumulated character + // count so the writer's "Writing summary... (~Nk tokens)" + // counter ticks during streaming (matches Claude's pattern + // from PR #964). Authoritative output_tokens still arrives + // later via assistant.message events and overrides on Done. + streamedChars += len(ev.Data.DeltaContent) + dispatch(agent.GenerationProgress{Phase: agent.PhaseGenerating, OutputTokens: streamedChars / 4}) + } + result.WriteString(ev.Data.DeltaContent) + + case "assistant.message": + // Each non-ephemeral assistant.message carries the outputTokens for + // one turn. Multi-turn generations (tool-using runs) emit several; + // sum them so PhaseDone reports the total output tokens for the + // whole generation. + if ev.Data != nil && ev.Data.OutputTokens > 0 { + outputTokens += ev.Data.OutputTokens + } + + case "result": + sawTerminal = true + usage = ev.Usage + if ev.ExitCode != 0 { + // Partial decode tolerated: schema drift falls through to + // "unspecified error" rather than leaking the raw line (which + // may carry echoed user content or model fragments) into logs + // and *TextGenerationError.Stderr. + return "", fmt.Errorf("copilot stream error: non-zero exit code %d", ev.ExitCode) + } + + case "error": + return "", fmt.Errorf("copilot stream error: %s", agent.SafeErrorMessage(line)) + } + } + if err := scanner.Err(); err != nil { + return "", fmt.Errorf("reading copilot stream: %w", err) + } + if !firstTokenFired { + if malformed > 0 { + return "", fmt.Errorf("copilot stream produced no assistant content (%d malformed lines skipped)", malformed) + } + return "", errors.New("copilot stream produced no assistant content") + } + if progress != nil { + done := agent.GenerationProgress{Phase: agent.PhaseDone} + if outputTokens > 0 { + done.OutputTokens = outputTokens + } + if usage != nil && usage.TotalAPIDurationMs > 0 { + done.DurationMs = usage.TotalAPIDurationMs + } + if done.DurationMs == 0 { + // EOF / no-terminal fallback: compute locally so the progress + // writer always has a duration to render. + done.DurationMs = int(time.Since(start).Milliseconds()) + } + dispatch(done) + } + _ = sawTerminal // kept for diagnostic clarity; absence is acceptable (EOF fallback) + return result.String(), nil +} + +// GenerateTextStreaming implements agent.StreamingTextGenerator. +func (c *CopilotCLIAgent) GenerateTextStreaming( + ctx context.Context, + prompt, model string, + progress agent.ProgressFn, +) (string, error) { + tmpl := &agent.StreamingGeneratorTemplate{ + AgentName: "copilot-cli", + BuildCmd: c.buildStreamCmd, + Parser: parseCopilotStream, + LooksLikeUnrecognizedFlag: func(stderr string) bool { + return agent.LooksLikeUnrecognizedFlag(stderr, "stream", "output-format") + }, + } + + result, err := tmpl.Generate(ctx, prompt, model, progress) + if err != nil { + if errors.Is(err, agent.ErrUnrecognizedStreamingFlag) { + return c.GenerateText(ctx, prompt, model) + } + return "", fmt.Errorf("copilot streaming generate: %w", err) + } + return result, nil +} + +func (c *CopilotCLIAgent) buildStreamCmd(ctx context.Context, prompt, model string) *exec.Cmd { + commandRunner := c.CommandRunner + if commandRunner == nil { + commandRunner = exec.CommandContext + } + args := []string{"--output-format", "json", "--stream", "on", "--allow-all-tools", "--disable-builtin-mcps"} + if model != "" { + args = append(args, "--model", model) + } + cmd := commandRunner(ctx, "copilot", args...) + cmd.Stdin = strings.NewReader(prompt) + return cmd +} diff --git a/cmd/entire/cli/agent/copilotcli/generate_streaming_test.go b/cmd/entire/cli/agent/copilotcli/generate_streaming_test.go new file mode 100644 index 0000000000..14a3cf58ef --- /dev/null +++ b/cmd/entire/cli/agent/copilotcli/generate_streaming_test.go @@ -0,0 +1,148 @@ +package copilotcli + +import ( + "bytes" + "context" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + + "github.com/entireio/cli/cmd/entire/cli/agent" + "github.com/entireio/cli/cmd/entire/cli/agent/testutil" +) + +func TestParseCopilotStream_Success(t *testing.T) { + t.Parallel() + + data, err := os.ReadFile(filepath.Join("testdata", "stream_success.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + var phases []agent.ProgressPhase + var doneEvent agent.GenerationProgress + result, err := parseCopilotStream(bytes.NewReader(data), func(p agent.GenerationProgress) { + phases = append(phases, p.Phase) + if p.Phase == agent.PhaseDone { + doneEvent = p + } + }) + if err != nil { + t.Fatalf("parse: %v", err) + } + if !strings.Contains(result, "Hello") || !strings.Contains(result, "world") { + t.Errorf("result = %q, want concatenated assistant content", result) + } + + counts := map[agent.ProgressPhase]int{} + for _, p := range phases { + counts[p]++ + } + if counts[agent.PhaseConnecting] != 1 { + t.Errorf("PhaseConnecting count = %d, want 1", counts[agent.PhaseConnecting]) + } + if counts[agent.PhaseFirstToken] != 1 { + t.Errorf("PhaseFirstToken count = %d, want 1", counts[agent.PhaseFirstToken]) + } + if counts[agent.PhaseGenerating] < 1 { + t.Errorf("PhaseGenerating count = %d, want at least 1", counts[agent.PhaseGenerating]) + } + if counts[agent.PhaseDone] != 1 { + t.Errorf("PhaseDone count = %d, want 1 (emitted at terminal result event)", counts[agent.PhaseDone]) + } + + // Verify that token + duration data from the real CLI's terminal events is + // carried into the PhaseDone progress event (assistant.message.outputTokens + // and result.usage.totalApiDurationMs). + if doneEvent.OutputTokens != 7 { + t.Errorf("Done.OutputTokens = %d, want 7 (from assistant.message)", doneEvent.OutputTokens) + } + if doneEvent.DurationMs != 2546 { + t.Errorf("Done.DurationMs = %d, want 2546 (from result.usage.totalApiDurationMs)", doneEvent.DurationMs) + } +} + +func TestParseCopilotStream_ErrorEnvelope(t *testing.T) { + t.Parallel() + + data, err := os.ReadFile(filepath.Join("testdata", "stream_error.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + _, err = parseCopilotStream(bytes.NewReader(data), nil) + if err == nil { + t.Fatal("expected error from error envelope") + } + // Fix A: the error must surface the operational message, not the raw JSON line. + if !strings.Contains(err.Error(), "rate limited") { + t.Errorf("error %q should mention 'rate limited' from the operational message field", err) + } +} + +func TestParseCopilotStream_EmptyStream(t *testing.T) { + t.Parallel() + + _, err := parseCopilotStream(strings.NewReader(""), nil) + if err == nil { + t.Fatal("expected error from empty stream (no assistant content)") + } +} + +func TestCopilotCLIGenerateTextStreaming_Success(t *testing.T) { + t.Parallel() + + fixture, err := os.ReadFile(filepath.Join("testdata", "stream_success.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + c := &CopilotCLIAgent{ + CommandRunner: testutil.FakeStreamCmd(string(fixture), "", 0), + } + + var phases []agent.ProgressPhase + result, err := c.GenerateTextStreaming(context.Background(), "test prompt", "haiku", func(p agent.GenerationProgress) { + phases = append(phases, p.Phase) + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(result, "Hello") || !strings.Contains(result, "world") { + t.Errorf("result = %q, want concatenated assistant content", result) + } + // Expect Connecting, FirstToken, Generating (>=1), Done. + if len(phases) < 4 { + t.Errorf("phases = %v (count %d), want >= 4 (Connecting, FirstToken, Generating+, Done)", phases, len(phases)) + } +} + +func TestCopilotCLIGenerateTextStreaming_FallbackOnUnrecognizedFlag(t *testing.T) { + t.Parallel() + + streamCall := testutil.FakeStreamCmd("", "error: unknown flag: --stream", 1) + nonStreamCall := testutil.FakeStreamCmd("fallback response", "", 0) + calls := 0 + c := &CopilotCLIAgent{ + CommandRunner: func(ctx context.Context, name string, args ...string) *exec.Cmd { + calls++ + if calls == 1 { + return streamCall(ctx, name, args...) + } + return nonStreamCall(ctx, name, args...) + }, + } + + result, err := c.GenerateTextStreaming(context.Background(), "test", "haiku", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(result, "fallback") { + t.Errorf("result = %q, want substring 'fallback'", result) + } + if calls != 2 { + t.Errorf("calls = %d, want 2 (streaming + fallback)", calls) + } +} diff --git a/cmd/entire/cli/agent/copilotcli/stream_response.go b/cmd/entire/cli/agent/copilotcli/stream_response.go new file mode 100644 index 0000000000..253b840ad1 --- /dev/null +++ b/cmd/entire/cli/agent/copilotcli/stream_response.go @@ -0,0 +1,58 @@ +package copilotcli + +import "time" + +// copilotStreamEvent is one decoded line of `copilot --output-format json +// --stream on` NDJSON output. Fields are populated based on Type. +// +// Real-CLI shape (GitHub Copilot CLI 1.0.48): +// - session.mcp_servers_loaded / session.skills_loaded / session.tools_updated +// → session bootstrap, ignored +// - user.message → echoed input, ignored +// - assistant.turn_start → turn begins +// - assistant.message_start (ephemeral) → announces messageId +// - assistant.message_delta (ephemeral, deltaContent) → incremental tokens +// - assistant.message (non-ephemeral, outputTokens) → consolidated message +// with final outputTokens count +// - assistant.turn_end → turn marker +// - result (terminal, usage.totalApiDurationMs etc.) → exitCode + duration +type copilotStreamEvent struct { + Type string `json:"type"` + Timestamp time.Time `json:"timestamp,omitempty"` + Data *copilotStreamData `json:"data,omitempty"` + ID string `json:"id,omitempty"` + ParentID string `json:"parentId,omitempty"` + Ephemeral bool `json:"ephemeral,omitempty"` + + // SessionID is populated on the terminal type=result event. + SessionID string `json:"sessionId,omitempty"` + + // ExitCode is populated on the terminal type=result event. 0=success. + ExitCode int `json:"exitCode,omitempty"` + + // Usage is populated on the terminal type=result event. + Usage *copilotStreamUsage `json:"usage,omitempty"` +} + +// copilotStreamData is the per-event payload. Fields are sparsely populated +// based on the parent event's Type. +type copilotStreamData struct { + MessageID string `json:"messageId,omitempty"` + TurnID string `json:"turnId,omitempty"` + DeltaContent string `json:"deltaContent,omitempty"` + Model string `json:"model,omitempty"` + Content string `json:"content,omitempty"` + + // OutputTokens appears on the consolidated assistant.message event. + // Copilot does not expose input/cached tokens in its public stream. + OutputTokens int `json:"outputTokens,omitempty"` +} + +// copilotStreamUsage appears inside the terminal type=result event. Copilot +// reports timing data and "premium request" counts but does not surface input +// or cached-input token counts in its public stream. +type copilotStreamUsage struct { + PremiumRequests int `json:"premiumRequests,omitempty"` + TotalAPIDurationMs int `json:"totalApiDurationMs,omitempty"` + SessionDurationMs int `json:"sessionDurationMs,omitempty"` +} diff --git a/cmd/entire/cli/agent/copilotcli/testdata/stream_error.jsonl b/cmd/entire/cli/agent/copilotcli/testdata/stream_error.jsonl new file mode 100644 index 0000000000..c428b6c233 --- /dev/null +++ b/cmd/entire/cli/agent/copilotcli/testdata/stream_error.jsonl @@ -0,0 +1,5 @@ +{"type":"session.tools_updated","data":{"model":"claude-sonnet-4.6"},"id":"c","timestamp":"2026-05-15T21:35:30.958Z","ephemeral":true} +{"type":"user.message","data":{"content":"hi","interactionId":"i0"},"id":"u0","timestamp":"2026-05-15T21:35:30.959Z"} +{"type":"assistant.turn_start","data":{"turnId":"0","interactionId":"i0"},"id":"t0","timestamp":"2026-05-15T21:35:31.071Z"} +{"type":"error","data":{},"message":"rate limited: too many requests","timestamp":"2026-05-15T21:35:31.100Z"} +{"type":"result","timestamp":"2026-05-15T21:35:31.110Z","sessionId":"sess-1","exitCode":2,"usage":{"premiumRequests":0,"totalApiDurationMs":0,"sessionDurationMs":42}} diff --git a/cmd/entire/cli/agent/copilotcli/testdata/stream_success.jsonl b/cmd/entire/cli/agent/copilotcli/testdata/stream_success.jsonl new file mode 100644 index 0000000000..74ec3883fd --- /dev/null +++ b/cmd/entire/cli/agent/copilotcli/testdata/stream_success.jsonl @@ -0,0 +1,11 @@ +{"type":"session.mcp_servers_loaded","data":{"servers":[]},"id":"a","timestamp":"2026-05-15T21:35:30.934Z","ephemeral":true} +{"type":"session.skills_loaded","data":{"skills":[]},"id":"b","timestamp":"2026-05-15T21:35:30.934Z","ephemeral":true} +{"type":"session.tools_updated","data":{"model":"claude-sonnet-4.6"},"id":"c","timestamp":"2026-05-15T21:35:30.958Z","ephemeral":true} +{"type":"user.message","data":{"content":"say hi","interactionId":"i0"},"id":"u0","timestamp":"2026-05-15T21:35:30.959Z"} +{"type":"assistant.turn_start","data":{"turnId":"0","interactionId":"i0"},"id":"t0","timestamp":"2026-05-15T21:35:31.071Z"} +{"type":"assistant.message_start","data":{"messageId":"m0"},"id":"ms0","timestamp":"2026-05-15T21:35:33.283Z","ephemeral":true} +{"type":"assistant.message_delta","data":{"messageId":"m0","deltaContent":"Hello"},"id":"d1","timestamp":"2026-05-15T21:35:33.283Z","ephemeral":true} +{"type":"assistant.message_delta","data":{"messageId":"m0","deltaContent":", world."},"id":"d2","timestamp":"2026-05-15T21:35:33.317Z","ephemeral":true} +{"type":"assistant.message","data":{"messageId":"m0","model":"claude-sonnet-4.6","content":"Hello, world.","outputTokens":7,"turnId":"0","interactionId":"i0"},"id":"am0","timestamp":"2026-05-15T21:35:33.668Z"} +{"type":"assistant.turn_end","data":{"turnId":"0"},"id":"te0","timestamp":"2026-05-15T21:35:33.669Z"} +{"type":"result","timestamp":"2026-05-15T21:35:33.679Z","sessionId":"sess-1","exitCode":0,"usage":{"premiumRequests":1,"totalApiDurationMs":2546,"sessionDurationMs":4443}} diff --git a/cmd/entire/cli/agent/cursor/generate_streaming.go b/cmd/entire/cli/agent/cursor/generate_streaming.go new file mode 100644 index 0000000000..550e10dfc6 --- /dev/null +++ b/cmd/entire/cli/agent/cursor/generate_streaming.go @@ -0,0 +1,203 @@ +package cursor + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "os/exec" + "strings" + + "github.com/entireio/cli/cmd/entire/cli/agent" +) + +const streamBufferMax = 4 * 1024 * 1024 // 4 MiB + +// parseCursorStream consumes `agent --print --output-format stream-json +// --stream-partial-output` NDJSON output, dispatches progress callbacks, and +// returns the canonical result text (from the terminal result event, not +// from concatenated assistant deltas — see note on aggregated final message +// below). +// +// Real-CLI shape (Cursor agent, Composer 2.5 Fast): +// - system,subtype:init → PhaseConnecting (once) +// - user.message → ignored (echoed input) +// - thinking,subtype:delta / completed → ignored (internal +// reasoning; PhaseConnecting persists during this window so users see +// "Sending request to provider..." for longer on reasoning-heavy turns) +// - assistant w/ timestamp_ms (1st) → PhaseFirstToken (no parens — +// cacheReadTokens isn't known until result event) +// - assistant w/ timestamp_ms (subsequent) → PhaseGenerating (running +// token estimate from streamed character length) +// - assistant w/o timestamp_ms → ignored (aggregated final +// message; result.result is canonical) +// - result,subtype:success → PhaseDone (real +// OutputTokens / CachedInputTokens / DurationMs from result.usage and +// result.duration_ms) +// - result,is_error:true → typed error (privacy +// decode: surfaces result.result only, never the raw line) +func parseCursorStream(stdout io.Reader, progress agent.ProgressFn) (string, error) { + scanner := bufio.NewScanner(stdout) + scanner.Buffer(make([]byte, 64*1024), streamBufferMax) + + var ( + connectingFired bool + firstTokenFired bool + sawResult bool + resultText string + streamedChars int + malformed int + usage *cursorStreamUsage + durationMs int + ) + + dispatch := func(p agent.GenerationProgress) { + if progress != nil { + progress(p) + } + } + + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + var ev cursorStreamEvent + if err := json.Unmarshal(line, &ev); err != nil { + // Cursor may emit transient noise (blank lines, partial flushes); + // a schema-incompatible line is recoverable per-event but tracked + // so protocol regressions surface in the missing-result error + // instead of disappearing silently. + malformed++ + continue + } + + switch ev.Type { + case "system": + if ev.Subtype == "init" && !connectingFired { + connectingFired = true + dispatch(agent.GenerationProgress{Phase: agent.PhaseConnecting}) + } + + case "assistant": + // Skip the aggregated final assistant message (no timestamp_ms); + // only delta events (with timestamp_ms) drive PhaseGenerating. + if ev.TimestampMs == 0 { + continue + } + var textBuilder strings.Builder + if ev.Message != nil { + for _, c := range ev.Message.Content { + if c.Type == "text" { + textBuilder.WriteString(c.Text) + } + } + } + text := textBuilder.String() + if text == "" { + continue + } + if !firstTokenFired { + firstTokenFired = true + // CacheReadTokens isn't known until the result event arrives; + // FirstToken renders without parens here, and Done will + // expose cached/output/duration from result.usage. + dispatch(agent.GenerationProgress{Phase: agent.PhaseFirstToken}) + } else { + streamedChars += len(text) + dispatch(agent.GenerationProgress{Phase: agent.PhaseGenerating, OutputTokens: streamedChars / 4}) + } + + case "result": + sawResult = true + if ev.IsError { + // Privacy decode: surface only result text (CLI-authored, + // user-safe), never the raw line. Cursor's error envelope + // has not been observed in the wild beyond synthesized + // fixtures, so fall through to a generic message rather + // than leaking anything we haven't audited. + detail := ev.Result + if detail == "" { + detail = "unspecified error" + } + return "", fmt.Errorf("cursor stream error: %s", detail) + } + resultText = ev.Result + usage = ev.Usage + durationMs = ev.DurationMs + + // "thinking" and "user" events are intentionally ignored. + } + } + if err := scanner.Err(); err != nil { + return "", fmt.Errorf("reading cursor stream: %w", err) + } + if !sawResult { + if malformed > 0 { + return "", fmt.Errorf("cursor stream ended without a result event (%d malformed lines skipped)", malformed) + } + return "", errors.New("cursor stream ended without a result event") + } + if resultText == "" { + return "", errors.New("cursor stream produced no result text") + } + if progress != nil { + done := agent.GenerationProgress{Phase: agent.PhaseDone, DurationMs: durationMs} + if usage != nil { + done.OutputTokens = usage.OutputTokens + done.InputTokens = usage.InputTokens + done.CachedInputTokens = usage.CacheReadTokens + } + dispatch(done) + } + return resultText, nil +} + +// GenerateTextStreaming implements agent.StreamingTextGenerator for *CursorAgent. +func (c *CursorAgent) GenerateTextStreaming( + ctx context.Context, + prompt, model string, + progress agent.ProgressFn, +) (string, error) { + tmpl := &agent.StreamingGeneratorTemplate{ + AgentName: "cursor", + BuildCmd: c.buildStreamCmd, + Parser: parseCursorStream, + LooksLikeUnrecognizedFlag: func(stderr string) bool { + return agent.LooksLikeUnrecognizedFlag(stderr, "stream-json", "stream-partial-output", "output-format") + }, + } + + result, err := tmpl.Generate(ctx, prompt, model, progress) + if err != nil { + if errors.Is(err, agent.ErrUnrecognizedStreamingFlag) { + return c.GenerateText(ctx, prompt, model) + } + return "", fmt.Errorf("cursor streaming generate: %w", err) + } + return result, nil +} + +func (c *CursorAgent) buildStreamCmd(ctx context.Context, prompt, model string) *exec.Cmd { + commandRunner := c.CommandRunner + if commandRunner == nil { + commandRunner = exec.CommandContext + } + args := []string{ + "--print", + "--force", + "--trust", + "--workspace", os.TempDir(), + "--output-format", "stream-json", + "--stream-partial-output", + } + if model != "" { + args = append(args, "--model", model) + } + cmd := commandRunner(ctx, "agent", args...) + cmd.Stdin = strings.NewReader(prompt) + return cmd +} diff --git a/cmd/entire/cli/agent/cursor/generate_streaming_test.go b/cmd/entire/cli/agent/cursor/generate_streaming_test.go new file mode 100644 index 0000000000..998b02fe14 --- /dev/null +++ b/cmd/entire/cli/agent/cursor/generate_streaming_test.go @@ -0,0 +1,168 @@ +package cursor + +import ( + "bytes" + "context" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + + "github.com/entireio/cli/cmd/entire/cli/agent" + "github.com/entireio/cli/cmd/entire/cli/agent/testutil" +) + +func TestParseCursorStream_Success(t *testing.T) { + t.Parallel() + + data, err := os.ReadFile(filepath.Join("testdata", "stream_success.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + var phases []agent.ProgressPhase + var doneProgress agent.GenerationProgress + result, err := parseCursorStream(bytes.NewReader(data), func(p agent.GenerationProgress) { + phases = append(phases, p.Phase) + if p.Phase == agent.PhaseDone { + doneProgress = p + } + }) + if err != nil { + t.Fatalf("parse: %v", err) + } + if result != "Hello, world." { + t.Errorf("result = %q, want %q (from result.result, not delta concat)", result, "Hello, world.") + } + + counts := map[agent.ProgressPhase]int{} + for _, p := range phases { + counts[p]++ + } + if counts[agent.PhaseConnecting] != 1 { + t.Errorf("PhaseConnecting count = %d, want 1", counts[agent.PhaseConnecting]) + } + if counts[agent.PhaseFirstToken] != 1 { + t.Errorf("PhaseFirstToken count = %d, want 1", counts[agent.PhaseFirstToken]) + } + if counts[agent.PhaseGenerating] < 1 { + t.Errorf("PhaseGenerating count = %d, want >= 1", counts[agent.PhaseGenerating]) + } + if counts[agent.PhaseDone] != 1 { + t.Errorf("PhaseDone count = %d, want 1", counts[agent.PhaseDone]) + } + if doneProgress.OutputTokens != 268 { + t.Errorf("Done.OutputTokens = %d, want 268 (from result.usage.outputTokens)", doneProgress.OutputTokens) + } + if doneProgress.CachedInputTokens != 4608 { + t.Errorf("Done.CachedInputTokens = %d, want 4608 (from result.usage.cacheReadTokens)", doneProgress.CachedInputTokens) + } + if doneProgress.DurationMs != 3372 { + t.Errorf("Done.DurationMs = %d, want 3372 (from result.duration_ms)", doneProgress.DurationMs) + } +} + +func TestParseCursorStream_ErrorEnvelope(t *testing.T) { + t.Parallel() + + data, err := os.ReadFile(filepath.Join("testdata", "stream_error.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + _, err = parseCursorStream(bytes.NewReader(data), nil) + if err == nil { + t.Fatal("expected error from result envelope with is_error=true") + } + if !strings.Contains(err.Error(), "Max turns") { + t.Errorf("error %q should mention 'Max turns'", err) + } +} + +func TestParseCursorStream_MissingResult(t *testing.T) { + t.Parallel() + + stream := `{"type":"system","subtype":"init","session_id":"t"} +{"type":"assistant","message":{"role":"assistant","content":[{"type":"text","text":"partial"}]},"session_id":"t","timestamp_ms":1} +` + _, err := parseCursorStream(strings.NewReader(stream), nil) + if err == nil { + t.Fatal("expected error when stream lacks result event") + } +} + +func TestParseCursorStream_EmptyResultText(t *testing.T) { + t.Parallel() + + // A successful (is_error:false) result event with an empty `result` + // field must error rather than returning ("", nil) — mirrors the + // guarantee Codex's parser makes and that RunIsolatedTextGeneratorCLI + // makes on the non-streaming path. + stream := `{"type":"system","subtype":"init","session_id":"t"} +{"type":"result","subtype":"success","duration_ms":1,"is_error":false,"result":"","session_id":"t"} +` + _, err := parseCursorStream(strings.NewReader(stream), nil) + if err == nil { + t.Fatal("expected error when result event carries empty result text") + } + if !strings.Contains(err.Error(), "no result text") { + t.Errorf("error %q should mention 'no result text'", err) + } +} + +func TestCursorGenerateTextStreaming_Success(t *testing.T) { + t.Parallel() + + fixture, err := os.ReadFile(filepath.Join("testdata", "stream_success.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + c := &CursorAgent{ + CommandRunner: testutil.FakeStreamCmd(string(fixture), "", 0), + } + + var phases []agent.ProgressPhase + result, err := c.GenerateTextStreaming(context.Background(), "test prompt", "", func(p agent.GenerationProgress) { + phases = append(phases, p.Phase) + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result != "Hello, world." { + t.Errorf("result = %q, want %q (canonical from result.result)", result, "Hello, world.") + } + // Expect Connecting, FirstToken, Generating (>=1), Done. + if len(phases) < 4 { + t.Errorf("phases = %v (count %d), want >= 4 (Connecting, FirstToken, Generating+, Done)", phases, len(phases)) + } +} + +func TestCursorGenerateTextStreaming_FallbackOnUnrecognizedFlag(t *testing.T) { + t.Parallel() + + streamCall := testutil.FakeStreamCmd("", "error: unknown flag: --stream-json", 1) + nonStreamCall := testutil.FakeStreamCmd("fallback response", "", 0) + calls := 0 + c := &CursorAgent{ + CommandRunner: func(ctx context.Context, name string, args ...string) *exec.Cmd { + calls++ + if calls == 1 { + return streamCall(ctx, name, args...) + } + return nonStreamCall(ctx, name, args...) + }, + } + + result, err := c.GenerateTextStreaming(context.Background(), "test", "", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(result, "fallback") { + t.Errorf("result = %q, want substring 'fallback'", result) + } + if calls != 2 { + t.Errorf("calls = %d, want 2 (streaming + fallback)", calls) + } +} diff --git a/cmd/entire/cli/agent/cursor/stream_response.go b/cmd/entire/cli/agent/cursor/stream_response.go new file mode 100644 index 0000000000..178af54e86 --- /dev/null +++ b/cmd/entire/cli/agent/cursor/stream_response.go @@ -0,0 +1,46 @@ +package cursor + +// cursorStreamEvent is one decoded line of `agent --print --output-format +// stream-json --stream-partial-output` NDJSON output. Fields are populated +// based on Type. +// +// Real-CLI shape (Cursor agent, Composer 2.5 Fast): +// - system,subtype:init → session bootstrap (model, cwd) +// - user.message → echoed input, ignored +// - thinking,subtype:delta / completed → internal reasoning, ignored +// - assistant (with timestamp_ms) delta → incremental text token +// - assistant (no timestamp_ms) aggregated → final consolidated message, +// ignored (use result.result) +// - result,subtype:success → terminal: usage + duration +// - result,is_error:true → terminal error envelope +type cursorStreamEvent struct { + Type string `json:"type"` + Subtype string `json:"subtype,omitempty"` + IsError bool `json:"is_error,omitempty"` + Result string `json:"result,omitempty"` + DurationMs int `json:"duration_ms,omitempty"` + TimestampMs int64 `json:"timestamp_ms,omitempty"` + Message *cursorStreamMessage `json:"message,omitempty"` + Usage *cursorStreamUsage `json:"usage,omitempty"` +} + +// cursorStreamMessage is the assistant/user message payload. +type cursorStreamMessage struct { + Role string `json:"role,omitempty"` + Content []cursorStreamMessageContent `json:"content,omitempty"` +} + +// cursorStreamMessageContent is one content block within a message. +type cursorStreamMessageContent struct { + Type string `json:"type,omitempty"` + Text string `json:"text,omitempty"` +} + +// cursorStreamUsage appears inside the terminal type=result event. Cursor +// reports input, output, and cache-read token counts. +type cursorStreamUsage struct { + InputTokens int `json:"inputTokens,omitempty"` + OutputTokens int `json:"outputTokens,omitempty"` + CacheReadTokens int `json:"cacheReadTokens,omitempty"` + CacheWriteTokens int `json:"cacheWriteTokens,omitempty"` +} diff --git a/cmd/entire/cli/agent/cursor/testdata/stream_error.jsonl b/cmd/entire/cli/agent/cursor/testdata/stream_error.jsonl new file mode 100644 index 0000000000..224edbfb65 --- /dev/null +++ b/cmd/entire/cli/agent/cursor/testdata/stream_error.jsonl @@ -0,0 +1,2 @@ +{"type":"system","subtype":"init","apiKeySource":"login","cwd":"/private/tmp","session_id":"test","model":"Composer 2.5 Fast","permissionMode":"default"} +{"type":"result","subtype":"error_max_turns","duration_ms":100,"is_error":true,"result":"Max turns exceeded","session_id":"test"} diff --git a/cmd/entire/cli/agent/cursor/testdata/stream_success.jsonl b/cmd/entire/cli/agent/cursor/testdata/stream_success.jsonl new file mode 100644 index 0000000000..84c708af60 --- /dev/null +++ b/cmd/entire/cli/agent/cursor/testdata/stream_success.jsonl @@ -0,0 +1,8 @@ +{"type":"system","subtype":"init","apiKeySource":"login","cwd":"/private/tmp","session_id":"test","model":"Composer 2.5 Fast","permissionMode":"default"} +{"type":"user","message":{"role":"user","content":[{"type":"text","text":"say hi"}]},"session_id":"test"} +{"type":"thinking","subtype":"delta","text":"Hi","session_id":"test","timestamp_ms":1} +{"type":"thinking","subtype":"completed","session_id":"test","timestamp_ms":2} +{"type":"assistant","message":{"role":"assistant","content":[{"type":"text","text":"Hello"}]},"session_id":"test","timestamp_ms":3} +{"type":"assistant","message":{"role":"assistant","content":[{"type":"text","text":", world."}]},"session_id":"test","timestamp_ms":4} +{"type":"assistant","message":{"role":"assistant","content":[{"type":"text","text":"Hello, world."}]},"session_id":"test"} +{"type":"result","subtype":"success","duration_ms":3372,"is_error":false,"result":"Hello, world.","session_id":"test","usage":{"inputTokens":9141,"outputTokens":268,"cacheReadTokens":4608,"cacheWriteTokens":0}} diff --git a/cmd/entire/cli/agent/geminicli/generate_streaming.go b/cmd/entire/cli/agent/geminicli/generate_streaming.go new file mode 100644 index 0000000000..5ee352c75a --- /dev/null +++ b/cmd/entire/cli/agent/geminicli/generate_streaming.go @@ -0,0 +1,171 @@ +package geminicli + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os/exec" + "strings" + "time" + + "github.com/entireio/cli/cmd/entire/cli/agent" +) + +const streamBufferMax = 4 * 1024 * 1024 // 4 MiB + +// parseGeminiStream consumes `gemini --output-format stream-json` NDJSON +// output, dispatches progress callbacks, and returns the concatenated +// assistant content. +// +// Real-CLI shape (gemini-cli 0.38.2): +// - init -> PhaseConnecting +// - message(role=user) -> ignored (echo) +// - message(role=assistant, delta=true) -> PhaseFirstToken (1st) +// / PhaseGenerating (subsequent) +// content concatenated into result +// - result(status=success, stats={...}) -> PhaseDone (terminal) +// - result(status=error, error={type,message}, stats) -> returns error +// +// Note: Gemini typically emits ONE assistant message per turn rather than +// chunking deltas, so PhaseGenerating may not fire on real captures even +// though it works for multi-message fixtures. +func parseGeminiStream(stdout io.Reader, progress agent.ProgressFn) (string, error) { + scanner := bufio.NewScanner(stdout) + scanner.Buffer(make([]byte, 64*1024), streamBufferMax) + + var ( + result strings.Builder + sawInit bool + firstTokenFired bool + stats *geminiStreamStats + malformed int + start = time.Now() + totalChars int + ) + + dispatch := func(p agent.GenerationProgress) { + if progress != nil { + progress(p) + } + } + + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + var ev geminiStreamEvent + if err := json.Unmarshal(line, &ev); err != nil { + // Gemini may emit transient noise (blank lines, partial flushes); + // a schema-incompatible line is recoverable per-event but tracked + // so protocol regressions surface in the no-content error instead + // of disappearing silently. + malformed++ + continue + } + + switch ev.Type { + case "init": + sawInit = true + dispatch(agent.GenerationProgress{Phase: agent.PhaseConnecting}) + + case "message": + if ev.Role != "assistant" { + continue + } + result.WriteString(ev.Content) + totalChars += len(ev.Content) + if !firstTokenFired { + firstTokenFired = true + // Gemini's init/message events carry no TTFT, input-token, + // or cached-token data; usage is deferred to the terminal + // result event. PhaseFirstToken is dispatched without those + // fields so the progress writer's graceful degradation + // renders "Provider responded -- generating..." without + // parens. + dispatch(agent.GenerationProgress{Phase: agent.PhaseFirstToken}) + } else { + dispatch(agent.GenerationProgress{ + Phase: agent.PhaseGenerating, + OutputTokens: totalChars / 4, + }) + } + + case "result": + stats = ev.Stats + if ev.Status == "error" { + return "", fmt.Errorf("gemini stream error: %s", agent.SafeErrorMessage(line)) + } + } + } + if err := scanner.Err(); err != nil { + return "", fmt.Errorf("reading gemini stream: %w", err) + } + if !sawInit { + return "", errors.New("gemini stream ended without an init event") + } + if !firstTokenFired { + if malformed > 0 { + return "", fmt.Errorf("gemini stream produced no assistant content (%d malformed lines skipped)", malformed) + } + return "", errors.New("gemini stream produced no assistant content") + } + if progress != nil { + done := agent.GenerationProgress{Phase: agent.PhaseDone} + if stats != nil { + done.OutputTokens = stats.OutputTokens + done.InputTokens = stats.InputTokens + done.CachedInputTokens = stats.Cached + if stats.DurationMs > 0 { + done.DurationMs = stats.DurationMs + } + } + if done.DurationMs == 0 { + done.DurationMs = int(time.Since(start).Milliseconds()) + } + dispatch(done) + } + return result.String(), nil +} + +// GenerateTextStreaming implements agent.StreamingTextGenerator. +func (g *GeminiCLIAgent) GenerateTextStreaming( + ctx context.Context, + prompt, model string, + progress agent.ProgressFn, +) (string, error) { + tmpl := &agent.StreamingGeneratorTemplate{ + AgentName: "gemini", + BuildCmd: g.buildStreamCmd, + Parser: parseGeminiStream, + LooksLikeUnrecognizedFlag: func(stderr string) bool { + return agent.LooksLikeUnrecognizedFlag(stderr, "output-format", "stream-json") + }, + } + + result, err := tmpl.Generate(ctx, prompt, model, progress) + if err != nil { + if errors.Is(err, agent.ErrUnrecognizedStreamingFlag) { + return g.GenerateText(ctx, prompt, model) + } + return "", fmt.Errorf("gemini streaming generate: %w", err) + } + return result, nil +} + +func (g *GeminiCLIAgent) buildStreamCmd(ctx context.Context, prompt, model string) *exec.Cmd { + commandRunner := g.CommandRunner + if commandRunner == nil { + commandRunner = exec.CommandContext + } + args := []string{"--output-format", "stream-json", "-p", " "} + if model != "" { + args = append(args, "--model", model) + } + cmd := commandRunner(ctx, "gemini", args...) + cmd.Stdin = strings.NewReader(prompt) + return cmd +} diff --git a/cmd/entire/cli/agent/geminicli/generate_streaming_test.go b/cmd/entire/cli/agent/geminicli/generate_streaming_test.go new file mode 100644 index 0000000000..9d3da0003d --- /dev/null +++ b/cmd/entire/cli/agent/geminicli/generate_streaming_test.go @@ -0,0 +1,133 @@ +package geminicli + +import ( + "bytes" + "context" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + + "github.com/entireio/cli/cmd/entire/cli/agent" + "github.com/entireio/cli/cmd/entire/cli/agent/testutil" +) + +func TestParseGeminiStream_Success(t *testing.T) { + t.Parallel() + + data, err := os.ReadFile(filepath.Join("testdata", "stream_success.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + var phases []agent.ProgressPhase + result, err := parseGeminiStream(bytes.NewReader(data), func(p agent.GenerationProgress) { + phases = append(phases, p.Phase) + }) + if err != nil { + t.Fatalf("parse: %v", err) + } + if !strings.Contains(result, "Hello") || !strings.Contains(result, "world") { + t.Errorf("result = %q, want concatenated assistant content", result) + } + + counts := map[agent.ProgressPhase]int{} + for _, p := range phases { + counts[p]++ + } + if counts[agent.PhaseConnecting] != 1 { + t.Errorf("PhaseConnecting count = %d, want 1", counts[agent.PhaseConnecting]) + } + if counts[agent.PhaseFirstToken] != 1 { + t.Errorf("PhaseFirstToken count = %d, want 1", counts[agent.PhaseFirstToken]) + } + if counts[agent.PhaseGenerating] < 1 { + t.Errorf("PhaseGenerating count = %d, want at least 1", counts[agent.PhaseGenerating]) + } + if counts[agent.PhaseDone] != 1 { + t.Errorf("PhaseDone count = %d, want 1 (emitted at result event)", counts[agent.PhaseDone]) + } +} + +func TestParseGeminiStream_ErrorEnvelope(t *testing.T) { + t.Parallel() + + data, err := os.ReadFile(filepath.Join("testdata", "stream_error.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + _, err = parseGeminiStream(bytes.NewReader(data), nil) + if err == nil { + t.Fatal("expected error from error envelope") + } + if !strings.Contains(err.Error(), "invalid request") { + t.Errorf("error %q should mention 'invalid request' from error.message", err) + } +} + +func TestParseGeminiStream_EmptyStream(t *testing.T) { + t.Parallel() + + _, err := parseGeminiStream(strings.NewReader(""), nil) + if err == nil { + t.Fatal("expected error from empty stream (no init, no assistant content)") + } +} + +func TestGeminiCLIGenerateTextStreaming_Success(t *testing.T) { + t.Parallel() + + fixture, err := os.ReadFile(filepath.Join("testdata", "stream_success.jsonl")) + if err != nil { + t.Fatalf("read fixture: %v", err) + } + + g := &GeminiCLIAgent{ + CommandRunner: testutil.FakeStreamCmd(string(fixture), "", 0), + } + + var phases []agent.ProgressPhase + result, err := g.GenerateTextStreaming(context.Background(), "test prompt", "haiku", func(p agent.GenerationProgress) { + phases = append(phases, p.Phase) + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(result, "Hello") || !strings.Contains(result, "world") { + t.Errorf("result = %q, want concatenated assistant content", result) + } + // Expect Connecting, FirstToken, Generating (at least once), Done. + if len(phases) < 4 { + t.Errorf("phases = %v (count %d), want >= 4 (Connecting, FirstToken, Generating+, Done)", phases, len(phases)) + } +} + +func TestGeminiCLIGenerateTextStreaming_FallbackOnUnrecognizedFlag(t *testing.T) { + t.Parallel() + + streamCall := testutil.FakeStreamCmd("", "error: unknown flag: --output-format", 1) + nonStreamCall := testutil.FakeStreamCmd("fallback response", "", 0) + calls := 0 + g := &GeminiCLIAgent{ + CommandRunner: func(ctx context.Context, name string, args ...string) *exec.Cmd { + calls++ + if calls == 1 { + return streamCall(ctx, name, args...) + } + return nonStreamCall(ctx, name, args...) + }, + } + + result, err := g.GenerateTextStreaming(context.Background(), "test", "haiku", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(result, "fallback") { + t.Errorf("result = %q, want substring 'fallback'", result) + } + if calls != 2 { + t.Errorf("calls = %d, want 2 (streaming + fallback)", calls) + } +} diff --git a/cmd/entire/cli/agent/geminicli/stream_response.go b/cmd/entire/cli/agent/geminicli/stream_response.go new file mode 100644 index 0000000000..88ac2e116f --- /dev/null +++ b/cmd/entire/cli/agent/geminicli/stream_response.go @@ -0,0 +1,38 @@ +package geminicli + +import "time" + +// geminiStreamEvent is one decoded line of `gemini --output-format stream-json` +// NDJSON output. Fields are populated based on Type: +// +// - "init": SessionID, Model +// - "message": Role ("user" or "assistant"), Content, Delta (true on incremental) +// - "result": Status ("success" | "error"), Error (when status=error), Stats +type geminiStreamEvent struct { + Type string `json:"type"` + Timestamp time.Time `json:"timestamp,omitempty"` + SessionID string `json:"session_id,omitempty"` + Model string `json:"model,omitempty"` + Role string `json:"role,omitempty"` + Content string `json:"content,omitempty"` + Delta bool `json:"delta,omitempty"` + Status string `json:"status,omitempty"` + Error *geminiStreamError `json:"error,omitempty"` + Stats *geminiStreamStats `json:"stats,omitempty"` +} + +// geminiStreamError is the body of a result event with status="error". +type geminiStreamError struct { + Type string `json:"type,omitempty"` + Message string `json:"message,omitempty"` +} + +// geminiStreamStats appears inside the terminal type=result event. +// Token field names mirror gemini-cli's stats schema. +type geminiStreamStats struct { + TotalTokens int `json:"total_tokens,omitempty"` + InputTokens int `json:"input_tokens,omitempty"` + OutputTokens int `json:"output_tokens,omitempty"` + Cached int `json:"cached,omitempty"` + DurationMs int `json:"duration_ms,omitempty"` +} diff --git a/cmd/entire/cli/agent/geminicli/testdata/stream_error.jsonl b/cmd/entire/cli/agent/geminicli/testdata/stream_error.jsonl new file mode 100644 index 0000000000..9f659be64f --- /dev/null +++ b/cmd/entire/cli/agent/geminicli/testdata/stream_error.jsonl @@ -0,0 +1,3 @@ +{"type":"init","timestamp":"2026-05-15T16:00:46.999Z","session_id":"test-session","model":"nonexistent-model"} +{"type":"message","timestamp":"2026-05-15T16:00:47.000Z","role":"user","content":"hi"} +{"type":"result","timestamp":"2026-05-15T16:00:47.100Z","status":"error","error":{"type":"unknown","message":"invalid request"},"stats":{"total_tokens":0,"input_tokens":0,"output_tokens":0,"cached":0,"duration_ms":0}} diff --git a/cmd/entire/cli/agent/geminicli/testdata/stream_success.jsonl b/cmd/entire/cli/agent/geminicli/testdata/stream_success.jsonl new file mode 100644 index 0000000000..3777856675 --- /dev/null +++ b/cmd/entire/cli/agent/geminicli/testdata/stream_success.jsonl @@ -0,0 +1,5 @@ +{"type":"init","timestamp":"2026-05-15T16:00:46.999Z","session_id":"test-session","model":"gemini-3-flash-preview"} +{"type":"message","timestamp":"2026-05-15T16:00:46.999Z","role":"user","content":"say hi"} +{"type":"message","timestamp":"2026-05-15T16:00:48.211Z","role":"assistant","content":"Hello","delta":true} +{"type":"message","timestamp":"2026-05-15T16:00:48.311Z","role":"assistant","content":", world.","delta":true} +{"type":"result","timestamp":"2026-05-15T16:00:48.500Z","status":"success","stats":{"total_tokens":42,"input_tokens":35,"output_tokens":7,"cached":0,"duration_ms":1501}} diff --git a/cmd/entire/cli/agent/streaming_safe_error.go b/cmd/entire/cli/agent/streaming_safe_error.go new file mode 100644 index 0000000000..cd13c7b16e --- /dev/null +++ b/cmd/entire/cli/agent/streaming_safe_error.go @@ -0,0 +1,39 @@ +package agent + +import "encoding/json" + +// SafeErrorMessage decodes operational error text from a raw NDJSON line +// without leaking the raw line itself. +// +// Stream parsers MUST NOT propagate raw protocol lines into error messages: +// CLI stderr/stdout can carry echoed user content or model-message +// fragments, which would then surface in logs, telemetry, and user-facing +// error output. This helper partially decodes the line against the three +// commonly-observed message paths (`message`, `error.message`, +// `data.message`) and falls back to "unspecified error" if none match. +// +// Callers pass the raw scanner line. Decode errors are tolerated: if the +// line is malformed or has none of these paths, the result is the safe +// sentinel rather than the raw bytes. +func SafeErrorMessage(line []byte) string { + var details struct { + Message string `json:"message"` + Error struct { + Message string `json:"message"` + } `json:"error"` + Data struct { + Message string `json:"message"` + } `json:"data"` + } + _ = json.Unmarshal(line, &details) //nolint:errcheck // partial decode tolerated; raw line MUST NOT leak + switch { + case details.Message != "": + return details.Message + case details.Error.Message != "": + return details.Error.Message + case details.Data.Message != "": + return details.Data.Message + default: + return "unspecified error" + } +} diff --git a/cmd/entire/cli/agent/streaming_safe_error_test.go b/cmd/entire/cli/agent/streaming_safe_error_test.go new file mode 100644 index 0000000000..919d9f1242 --- /dev/null +++ b/cmd/entire/cli/agent/streaming_safe_error_test.go @@ -0,0 +1,37 @@ +package agent_test + +import ( + "testing" + + "github.com/entireio/cli/cmd/entire/cli/agent" +) + +func TestSafeErrorMessage(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + line string + want string + }{ + {"top_level_message", `{"type":"error","message":"model not found"}`, "model not found"}, + {"nested_error_message", `{"type":"turn.failed","error":{"message":"timed out"}}`, "timed out"}, + {"nested_data_message", `{"type":"error","data":{"message":"rate limited"}}`, "rate limited"}, + {"prefers_top_level_over_nested", `{"message":"top","error":{"message":"nested"}}`, "top"}, + {"prefers_error_over_data", `{"error":{"message":"e"},"data":{"message":"d"}}`, "e"}, + {"empty_strings_fall_through", `{"message":"","error":{"message":""},"data":{"message":""}}`, "unspecified error"}, + {"no_message_paths", `{"type":"thread.started","thread_id":"t"}`, "unspecified error"}, + {"malformed_json", `{not json at all`, "unspecified error"}, + {"empty_line", ``, "unspecified error"}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + got := agent.SafeErrorMessage([]byte(tc.line)) + if got != tc.want { + t.Errorf("SafeErrorMessage(%q) = %q, want %q", tc.line, got, tc.want) + } + }) + } +} diff --git a/cmd/entire/cli/agent/streaming_template.go b/cmd/entire/cli/agent/streaming_template.go new file mode 100644 index 0000000000..3d740c81a0 --- /dev/null +++ b/cmd/entire/cli/agent/streaming_template.go @@ -0,0 +1,158 @@ +package agent + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "log/slog" + "os" + "os/exec" + "strings" + + "github.com/entireio/cli/cmd/entire/cli/logging" +) + +// StreamingGeneratorTemplate is the shared subprocess-lifecycle wrapper for +// streaming text generators. Per-agent code provides BuildCmd (argv) and +// Parser (stdout → progress events); the template handles Start, drain, +// Wait, stderr capture, and error wrapping. +// +// Parallels review/types/template.ReviewerTemplate (established in PR #1192). +// +// Fields must be non-nil before Generate is called; nil values cause +// Generate to return ErrTemplateMisconfigured. +type StreamingGeneratorTemplate struct { + // AgentName is an identifier used in log entries and the error-message + // prefix wrapped into *TextGenerationError (e.g., "codex"). + AgentName string + + // BuildCmd constructs the *exec.Cmd for one streaming call. Implementations + // MUST set cmd.Stdin to the prompt and cmd.Args to the agent's + // streaming-mode invocation. The template will set cmd.Dir = os.TempDir() + // and cmd.Env = StripGitEnv(os.Environ()) before Start. + BuildCmd func(ctx context.Context, prompt, model string) *exec.Cmd + + // Parser consumes the agent's stdout stream and dispatches progress + // callbacks. Returns the final result text on success. Must read until + // stdout EOF before returning so the template can call Wait cleanly. + // progress may be nil; Parser must handle that. + Parser func(stdout io.Reader, progress ProgressFn) (result string, err error) + + // LooksLikeUnrecognizedFlag is optional. When non-nil and the subprocess + // fails with stderr matching the predicate, the caller can fall back to + // the agent's non-streaming GenerateText path. The template surfaces + // this signal via ErrUnrecognizedStreamingFlag so the caller decides. + LooksLikeUnrecognizedFlag func(stderr string) bool +} + +// ErrTemplateMisconfigured is returned when required template fields are nil. +var ErrTemplateMisconfigured = errors.New("streaming template misconfigured") + +// ErrUnrecognizedStreamingFlag is returned when LooksLikeUnrecognizedFlag +// indicates the CLI rejected a streaming-specific flag. Callers that +// implement a fallback should errors.Is this to detect. +var ErrUnrecognizedStreamingFlag = errors.New("CLI rejected streaming flag") + +// Generate runs one streaming generation and returns the final result text. +// +// Error shapes by failure point: +// - Pre-subprocess (StdoutPipe failure): plain wrapped error, since no +// stderr/stdout exists yet to diagnose with. +// - cmd.Start failure: wrapped error, or *TextGenerationError when ctx is +// already cancelled at that point. +// - Anything after Start (parse error, non-zero exit, ctx cancellation): +// *TextGenerationError carrying captured stderr and the stdout byte +// count from countingReader, matching RunIsolatedTextGeneratorCLI's +// error shape so the explain layer's diagnostic path can read both. +// - LooksLikeUnrecognizedFlag predicate match: ErrUnrecognizedStreamingFlag +// sentinel so the caller can fall back to non-streaming. +func (t *StreamingGeneratorTemplate) Generate( + ctx context.Context, + prompt, model string, + progress ProgressFn, +) (string, error) { + if t.BuildCmd == nil || t.Parser == nil { + return "", ErrTemplateMisconfigured + } + + cmd := t.BuildCmd(ctx, prompt, model) + cmd.Dir = os.TempDir() + cmd.Env = StripGitEnv(os.Environ()) + + stdout, err := cmd.StdoutPipe() + if err != nil { + return "", fmt.Errorf("%s stream stdout pipe: %w", t.AgentName, err) + } + var stderr bytes.Buffer + cmd.Stderr = &stderr + + if err := cmd.Start(); err != nil { + if ctx.Err() != nil { + return "", &TextGenerationError{ + Err: ctx.Err(), + Stderr: strings.TrimSpace(stderr.String()), + StdoutBytes: 0, + } + } + return "", fmt.Errorf("%s stream start: %w", t.AgentName, err) + } + + counter := &countingReader{r: stdout} + result, parseErr := t.Parser(counter, progress) + + // Drain through the counter so StdoutBytes reflects the full subprocess + // output even when the parser exited early (e.g. on a recognized + // in-stream error). Reading from stdout directly would bypass counter.n. + if _, drainErr := io.Copy(io.Discard, counter); drainErr != nil { + logging.Debug(ctx, "draining stream stdout", + slog.String("agent", t.AgentName), + slog.String("error", drainErr.Error())) + } + waitErr := cmd.Wait() + + if ctx.Err() != nil { + stderrStr := strings.TrimSpace(stderr.String()) + return "", &TextGenerationError{ + Err: ctx.Err(), + Stderr: stderrStr, + StdoutBytes: counter.n, + } + } + + if parseErr == nil && waitErr == nil { + return result, nil + } + + stderrStr := strings.TrimSpace(stderr.String()) + if waitErr != nil && t.LooksLikeUnrecognizedFlag != nil && t.LooksLikeUnrecognizedFlag(stderrStr) { + logging.Warn(ctx, "CLI rejected streaming flags; caller should fall back to non-streaming", + slog.String("agent", t.AgentName), + slog.String("stderr", stderrStr)) + return "", ErrUnrecognizedStreamingFlag + } + + wrappedErr := waitErr + if wrappedErr == nil { + wrappedErr = parseErr + } + return "", &TextGenerationError{ + Err: fmt.Errorf("%s stream failed: %w", t.AgentName, wrappedErr), + Stderr: stderrStr, + StdoutBytes: counter.n, + } +} + +// countingReader passes bytes through and counts them. Used by the template +// so the diagnostic path can ask "did the subprocess produce any output?". +type countingReader struct { + r io.Reader + n int +} + +func (c *countingReader) Read(p []byte) (int, error) { + n, err := c.r.Read(p) + c.n += n + return n, err //nolint:wrapcheck // io.Reader contract requires passthrough (including io.EOF) without wrapping +} diff --git a/cmd/entire/cli/agent/streaming_template_test.go b/cmd/entire/cli/agent/streaming_template_test.go new file mode 100644 index 0000000000..62b5af9e6c --- /dev/null +++ b/cmd/entire/cli/agent/streaming_template_test.go @@ -0,0 +1,131 @@ +package agent_test + +import ( + "context" + "errors" + "io" + "os/exec" + "strings" + "testing" + + "github.com/entireio/cli/cmd/entire/cli/agent" + "github.com/entireio/cli/cmd/entire/cli/agent/testutil" +) + +func TestStreamingGeneratorTemplate_Generate_Success(t *testing.T) { + t.Parallel() + + parsed := false + tmpl := &agent.StreamingGeneratorTemplate{ + AgentName: "fake", + BuildCmd: func(ctx context.Context, _, _ string) *exec.Cmd { + return testutil.FakeStreamCmd("hello\nworld\n", "", 0)(ctx, "fake", []string{}...) + }, + Parser: func(stdout io.Reader, _ agent.ProgressFn) (string, error) { + b, err := io.ReadAll(stdout) + if err != nil { + return "", err + } + parsed = true + return strings.TrimSpace(string(b)), nil + }, + } + + result, err := tmpl.Generate(context.Background(), "prompt", "model", nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result != "hello\nworld" { + t.Errorf("result = %q, want %q", result, "hello\nworld") + } + if !parsed { + t.Error("expected parser to have been called") + } +} + +func TestStreamingGeneratorTemplate_Generate_NilFieldsReturnError(t *testing.T) { + t.Parallel() + + tmpl := &agent.StreamingGeneratorTemplate{} + _, err := tmpl.Generate(context.Background(), "prompt", "model", nil) + if !errors.Is(err, agent.ErrTemplateMisconfigured) { + t.Errorf("err = %v, want ErrTemplateMisconfigured", err) + } +} + +func TestStreamingGeneratorTemplate_Generate_UnrecognizedFlagFallback(t *testing.T) { + t.Parallel() + + tmpl := &agent.StreamingGeneratorTemplate{ + AgentName: "fake", + BuildCmd: func(ctx context.Context, _, _ string) *exec.Cmd { + return testutil.FakeStreamCmd("", "error: unknown flag: --stream-json", 1)(ctx, "fake", []string{}...) + }, + Parser: func(stdout io.Reader, _ agent.ProgressFn) (string, error) { + _, _ = io.Copy(io.Discard, stdout) //nolint:errcheck // best-effort drain in test fake; failure here is irrelevant + return "", nil + }, + LooksLikeUnrecognizedFlag: func(stderr string) bool { + return strings.Contains(stderr, "unknown flag") && strings.Contains(stderr, "stream-json") + }, + } + + _, err := tmpl.Generate(context.Background(), "prompt", "model", nil) + if !errors.Is(err, agent.ErrUnrecognizedStreamingFlag) { + t.Errorf("err = %v, want ErrUnrecognizedStreamingFlag", err) + } +} + +func TestStreamingGeneratorTemplate_Generate_NonZeroExitWrapsError(t *testing.T) { + t.Parallel() + + tmpl := &agent.StreamingGeneratorTemplate{ + AgentName: "fake", + BuildCmd: func(ctx context.Context, _, _ string) *exec.Cmd { + return testutil.FakeStreamCmd("partial\n", "boom\n", 1)(ctx, "fake", []string{}...) + }, + Parser: func(stdout io.Reader, _ agent.ProgressFn) (string, error) { + _, _ = io.Copy(io.Discard, stdout) //nolint:errcheck // best-effort drain in test fake; failure here is irrelevant + return "", nil + }, + } + + _, err := tmpl.Generate(context.Background(), "prompt", "model", nil) + var failure *agent.TextGenerationError + if !errors.As(err, &failure) { + t.Fatalf("err = %v, want *TextGenerationError", err) + } + if !strings.Contains(failure.Stderr, "boom") { + t.Errorf("stderr captured = %q, want substring 'boom'", failure.Stderr) + } + if failure.StdoutBytes == 0 { + t.Errorf("stdoutBytes = 0, want > 0 (subprocess emitted 'partial\\n')") + } +} + +func TestStreamingGeneratorTemplate_Generate_ContextCancelled(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + tmpl := &agent.StreamingGeneratorTemplate{ + AgentName: "fake", + BuildCmd: func(ctx context.Context, _, _ string) *exec.Cmd { + return testutil.FakeStreamCmd("ok\n", "", 0)(ctx, "fake", []string{}...) + }, + Parser: func(stdout io.Reader, _ agent.ProgressFn) (string, error) { + _, _ = io.Copy(io.Discard, stdout) //nolint:errcheck // best-effort drain in test fake; failure here is irrelevant + return "", nil + }, + } + + _, err := tmpl.Generate(ctx, "prompt", "model", nil) + var failure *agent.TextGenerationError + if !errors.As(err, &failure) { + t.Fatalf("err = %v, want *TextGenerationError wrapping context error", err) + } + if !errors.Is(failure.Err, context.Canceled) { + t.Errorf("inner err = %v, want context.Canceled", failure.Err) + } +} diff --git a/cmd/entire/cli/agent/streaming_unrecognized_flag.go b/cmd/entire/cli/agent/streaming_unrecognized_flag.go new file mode 100644 index 0000000000..10992f6e55 --- /dev/null +++ b/cmd/entire/cli/agent/streaming_unrecognized_flag.go @@ -0,0 +1,33 @@ +package agent + +import "strings" + +// LooksLikeUnrecognizedFlag reports whether stderr matches the canonical +// CLI-rejected-a-flag pattern: a rejection phrase ("unknown flag", +// "unrecognized option", etc.) combined with at least one of the +// caller-specified flag-name keywords. +// +// Used by streaming text generators to detect when an older CLI version +// rejects a streaming-mode argv so the caller can fall back to a +// non-streaming path. Returns false if no rejection phrase is present or +// no keyword matches. +// +// Keyword matching is case-insensitive and substring-based; pass the +// distinguishing words from your streaming flags (e.g. "stream-json", +// "output-format") rather than the leading "--". +func LooksLikeUnrecognizedFlag(stderr string, flagKeywords ...string) bool { + lower := strings.ToLower(stderr) + rejection := strings.Contains(lower, "unknown flag") || + strings.Contains(lower, "unrecognized option") || + strings.Contains(lower, "unknown option") || + strings.Contains(lower, "invalid option") + if !rejection { + return false + } + for _, kw := range flagKeywords { + if strings.Contains(lower, strings.ToLower(kw)) { + return true + } + } + return false +} diff --git a/cmd/entire/cli/agent/streaming_unrecognized_flag_test.go b/cmd/entire/cli/agent/streaming_unrecognized_flag_test.go new file mode 100644 index 0000000000..61946327c8 --- /dev/null +++ b/cmd/entire/cli/agent/streaming_unrecognized_flag_test.go @@ -0,0 +1,44 @@ +package agent_test + +import ( + "testing" + + "github.com/entireio/cli/cmd/entire/cli/agent" +) + +func TestLooksLikeUnrecognizedFlag(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + stderr string + keywords []string + want bool + }{ + {"unknown_flag_with_matching_keyword", "error: unknown flag: --stream-json", []string{"stream-json"}, true}, + {"unrecognized_option_with_matching_keyword", "unrecognized option '--json'", []string{"json"}, true}, + {"invalid_option_with_matching_keyword", "invalid option: --output-format", []string{"output-format"}, true}, + {"unknown_option_with_matching_keyword", "unknown option: --stream", []string{"stream"}, true}, + + {"case_insensitive_stderr", "ERROR: UNKNOWN FLAG: --JSON", []string{"json"}, true}, + {"case_insensitive_keyword", "unknown flag: --json", []string{"JSON"}, true}, + + {"rejection_present_but_no_matching_keyword", "unknown flag: --foo", []string{"json", "stream"}, false}, + {"matching_keyword_but_no_rejection", "json: parse error", []string{"json"}, false}, + + {"empty_stderr", "", []string{"json"}, false}, + {"empty_keywords", "unknown flag: --json", nil, false}, + + {"weak_rejection_words_alone_dont_match", "command unknown", []string{"unknown"}, false}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + got := agent.LooksLikeUnrecognizedFlag(tc.stderr, tc.keywords...) + if got != tc.want { + t.Errorf("LooksLikeUnrecognizedFlag(%q, %v) = %v, want %v", tc.stderr, tc.keywords, got, tc.want) + } + }) + } +} diff --git a/cmd/entire/cli/agent/testutil/streaming.go b/cmd/entire/cli/agent/testutil/streaming.go new file mode 100644 index 0000000000..9e3d0d483e --- /dev/null +++ b/cmd/entire/cli/agent/testutil/streaming.go @@ -0,0 +1,52 @@ +// Package testutil holds shared test helpers for the agent package and +// its sub-packages. Not for production use. +package testutil + +import ( + "context" + "os/exec" + "strconv" + "strings" +) + +// FakeStreamCmd returns a CommandRunner factory whose *exec.Cmd, when +// Start()'d and Wait()'d, produces stdout/stderr/exit-code as configured. +// Implementation assumes a POSIX shell (`sh -c`) to write fixture data; +// it is not usable from a Windows runner. The streaming agents these +// helpers test are not currently exercised by the Windows E2E workflow +// (e2e-windows.yml), so a POSIX-only fake is sufficient. +func FakeStreamCmd(stdout, stderr string, exitCode int) func(ctx context.Context, name string, args ...string) *exec.Cmd { + return func(ctx context.Context, _ string, _ ...string) *exec.Cmd { + script := BuildFakeShellScript(stdout, stderr, exitCode) + return exec.CommandContext(ctx, "sh", "-c", script) + } +} + +// BuildFakeShellScript renders a shell snippet that emits the given stdout, +// stderr, and exit code. Exported so callers that need a customized fake +// (e.g. multi-stage stdout) can compose against it. +func BuildFakeShellScript(stdout, stderr string, exitCode int) string { + var sb strings.Builder + if stdout != "" { + sb.WriteString("cat <<'__EOF__'\n") + sb.WriteString(stdout) + if !strings.HasSuffix(stdout, "\n") { + sb.WriteString("\n") + } + sb.WriteString("__EOF__\n") + } + if stderr != "" { + sb.WriteString("cat <<'__EOF__' 1>&2\n") + sb.WriteString(stderr) + if !strings.HasSuffix(stderr, "\n") { + sb.WriteString("\n") + } + sb.WriteString("__EOF__\n") + } + if exitCode != 0 { + sb.WriteString("exit ") + sb.WriteString(strconv.Itoa(exitCode)) + sb.WriteString("\n") + } + return sb.String() +} diff --git a/cmd/entire/cli/explain.go b/cmd/entire/cli/explain.go index aabdbc89a2..05804e38d4 100644 --- a/cmd/entire/cli/explain.go +++ b/cmd/entire/cli/explain.go @@ -1447,9 +1447,20 @@ func (s *summaryProgressWriter) handle(p agent.GenerationProgress) { case agent.PhaseConnecting: s.printLine(s.arrow + " Sending request to provider...") case agent.PhaseFirstToken: + var clauses []string + if p.TTFTms > 0 { + clauses = append(clauses, "TTFT "+formatMs(p.TTFTms)) + } + if p.CachedInputTokens > 0 { + clauses = append(clauses, formatTokenCount(p.CachedInputTokens)+" cached input tokens") + } + detail := "" + if len(clauses) > 0 { + detail = " (" + strings.Join(clauses, ", ") + ")" + } s.printLine(fmt.Sprintf( - "%s Provider responded (TTFT %s, %s cached input tokens) -- generating...", - s.arrow, formatMs(p.TTFTms), formatTokenCount(p.CachedInputTokens))) + "%s Provider responded%s -- generating...", + s.arrow, detail)) case agent.PhaseGenerating: if !s.shouldEmitGenerating(p) { return @@ -1458,9 +1469,16 @@ func (s *summaryProgressWriter) handle(p agent.GenerationProgress) { "%s Writing summary... (~%s tokens)", s.arrow, formatTokenCount(p.OutputTokens))) case agent.PhaseDone: + clauses := []string{ + formatMs(p.DurationMs), + formatTokenCount(p.OutputTokens) + " output tokens", + } + if p.CachedInputTokens > 0 { + clauses = append(clauses, formatTokenCount(p.CachedInputTokens)+" cached input tokens") + } s.printLine(fmt.Sprintf( - "%s Summary generated (%s, %s output tokens)", - s.check, formatMs(p.DurationMs), formatTokenCount(p.OutputTokens))) + "%s Summary generated (%s)", + s.check, strings.Join(clauses, ", "))) } } diff --git a/cmd/entire/cli/explain_test.go b/cmd/entire/cli/explain_test.go index 76db9a13ff..a427cda7b6 100644 --- a/cmd/entire/cli/explain_test.go +++ b/cmd/entire/cli/explain_test.go @@ -6675,6 +6675,88 @@ func TestSummaryProgressWriter_NonTTYGenerateThrottle(t *testing.T) { } } +func TestSummaryProgressWriter_FirstTokenWithoutOptionalFields(t *testing.T) { + t.Parallel() + + var buf bytes.Buffer + attempt := newSummaryAttempt("gemini-cli", 0) + pw := newSummaryProgressWriter(&buf, attempt) + + pw.handle(agent.GenerationProgress{Phase: agent.PhaseFirstToken}) + + out := buf.String() + if !strings.Contains(out, "Provider responded -- generating...") { + t.Errorf("output = %q, want 'Provider responded -- generating...' (no parens)", out) + } + if strings.Contains(out, "(") { + t.Errorf("output = %q, should not contain '(' when both fields are zero", out) + } +} + +func TestSummaryProgressWriter_FirstTokenWithOnlyCachedTokens(t *testing.T) { + t.Parallel() + + var buf bytes.Buffer + attempt := newSummaryAttempt("codex", 0) + pw := newSummaryProgressWriter(&buf, attempt) + + pw.handle(agent.GenerationProgress{Phase: agent.PhaseFirstToken, CachedInputTokens: 23808}) + + out := buf.String() + if !strings.Contains(out, "Provider responded (23.8k cached input tokens) -- generating...") { + t.Errorf("output = %q, want '(23.8k cached input tokens)' clause", out) + } + if strings.Contains(out, "TTFT") { + t.Errorf("output = %q, should not mention TTFT when TTFTms == 0", out) + } +} + +func TestSummaryProgressWriter_DoneWithCachedTokens(t *testing.T) { + t.Parallel() + + var buf bytes.Buffer + attempt := newSummaryAttempt("cursor", 0) + pw := newSummaryProgressWriter(&buf, attempt) + + pw.handle(agent.GenerationProgress{ + Phase: agent.PhaseDone, + DurationMs: 10700, + OutputTokens: 1600, + CachedInputTokens: 4608, + }) + + out := buf.String() + if !strings.Contains(out, "1.6k output tokens") { + t.Errorf("output = %q, want '1.6k output tokens' clause", out) + } + if !strings.Contains(out, "4.6k cached input tokens") { + t.Errorf("output = %q, want '4.6k cached input tokens' clause when CachedInputTokens > 0", out) + } +} + +func TestSummaryProgressWriter_DoneWithoutCachedTokens(t *testing.T) { + t.Parallel() + + var buf bytes.Buffer + attempt := newSummaryAttempt("gemini", 0) + pw := newSummaryProgressWriter(&buf, attempt) + + pw.handle(agent.GenerationProgress{ + Phase: agent.PhaseDone, + DurationMs: 10700, + OutputTokens: 1600, + // CachedInputTokens: 0 — omitted + }) + + out := buf.String() + if !strings.Contains(out, "1.6k output tokens") { + t.Errorf("output = %q, want '1.6k output tokens' clause", out) + } + if strings.Contains(out, "cached input tokens") { + t.Errorf("output = %q, should not include cached tokens clause when CachedInputTokens == 0", out) + } +} + func TestSummaryProgressWriter_Accessible(t *testing.T) { // Cannot use t.Parallel() — t.Setenv mutates process-global state. // (Strictly, t.Setenv IS compatible with t.Parallel() in Go 1.17+ when