diff --git a/bun.lock b/bun.lock index 12e53065df2..ada1a57699e 100644 --- a/bun.lock +++ b/bun.lock @@ -366,6 +366,7 @@ "hono-openapi": "catalog:", "ignore": "7.0.5", "jsonc-parser": "3.3.1", + "langfuse": "3.38.6", "mime-types": "3.0.2", "minimatch": "10.0.3", "open": "10.1.2", @@ -3407,6 +3408,10 @@ "lang-map": ["lang-map@0.4.0", "", { "dependencies": { "language-map": "^1.1.0" } }, "sha512-oiSqZIEUnWdFeDNsp4HId4tAxdFbx5iMBOwA3666Fn2L8Khj8NiD9xRvMsGmKXopPVkaDFtSv3CJOmXFUB0Hcg=="], + "langfuse": ["langfuse@3.38.6", "", { "dependencies": { "langfuse-core": "^3.38.6" } }, "sha512-mtwfsNGIYvObRh+NYNGlJQJDiBN+Wr3Hnr++wN25mxuOpSTdXX+JQqVCyAqGL5GD2TAXRZ7COsN42Vmp9krYmg=="], + + "langfuse-core": ["langfuse-core@3.38.6", "", { "dependencies": { "mustache": "^4.2.0" } }, "sha512-EcZXa+DK9FJdi1I30+u19eKjuBJ04du6j2Nybk19KKCuraLczg/ppkTQcGvc4QOk//OAi3qUHrajUuV74RXsBQ=="], + "language-map": ["language-map@1.5.0", "", {}, "sha512-n7gFZpe+DwEAX9cXVTw43i3wiudWDDtSn28RmdnS/HCPr284dQI/SztsamWanRr75oSlKSaGbV2nmWCTzGCoVg=="], "lazy-val": ["lazy-val@1.0.5", "", {}, "sha512-0/BnGCCfyUMkBpeDgWihanIAF9JmZhHBgUhEqzvf+adhNGLoP6TaiI5oF8oyb3I45P+PcnrqihSf01M0l0G5+Q=="], diff --git a/packages/opencode/package.json b/packages/opencode/package.json index 47948acb228..5521f2881a8 100644 --- a/packages/opencode/package.json +++ b/packages/opencode/package.json @@ -130,6 +130,7 @@ "hono-openapi": "catalog:", "ignore": "7.0.5", "jsonc-parser": "3.3.1", + "langfuse": "3.38.6", "mime-types": "3.0.2", "minimatch": "10.0.3", "open": "10.1.2", diff --git a/packages/opencode/src/provider/transform.ts b/packages/opencode/src/provider/transform.ts index f651a5b91aa..ad608629c58 100644 --- a/packages/opencode/src/provider/transform.ts +++ b/packages/opencode/src/provider/transform.ts @@ -329,7 +329,7 @@ export namespace ProviderTransform { if (id.includes("glm-4.7")) return 1.0 if (id.includes("minimax-m2")) return 1.0 if (id.includes("kimi-k2")) { - // kimi-k2-thinking & kimi-k2.5 && kimi-k2p5 && 
kimi-k2-5 + // kimi-k2-thinking, kimi-k2.5, kimi-k2p5, kimi-k2-5 if (["thinking", "k2.", "k2p", "k2-5"].some((s) => id.includes(s))) { return 1.0 } @@ -373,9 +373,7 @@ export namespace ProviderTransform { id.includes("minimax") || id.includes("glm") || id.includes("mistral") || - id.includes("kimi") || - // TODO: Remove this after models.dev data is fixed to use "kimi-k2.5" instead of "k2p5" - id.includes("k2p5") + id.includes("kimi") ) return {} diff --git a/packages/opencode/src/session/langfuse.ts b/packages/opencode/src/session/langfuse.ts new file mode 100644 index 00000000000..c7bd6d11c6a --- /dev/null +++ b/packages/opencode/src/session/langfuse.ts @@ -0,0 +1,42 @@ +import Langfuse from "langfuse" +import { Log } from "../util/log" + +const log = Log.create({ service: "langfuse" }) + +let client: Langfuse | undefined +let initFailed = false + +export function getLangfuse(): Langfuse | undefined { + if (client) return client + if (initFailed) return undefined + + const secretKey = process.env.LANGFUSE_SECRET_KEY + const publicKey = process.env.LANGFUSE_PUBLIC_KEY + const baseUrl = process.env.LANGFUSE_BASE_URL + + if (!secretKey || !publicKey) { + log.info("langfuse disabled: missing LANGFUSE_SECRET_KEY or LANGFUSE_PUBLIC_KEY") + initFailed = true + return undefined + } + + try { + client = new Langfuse({ + secretKey, + publicKey, + baseUrl: baseUrl ?? 
"https://cloud.langfuse.com", + }) + log.info("langfuse initialized", { baseUrl }) + return client + } catch (e) { + log.error("langfuse init failed", { error: e }) + initFailed = true + return undefined + } +} + +export async function flushLangfuse() { + if (client) { + await client.flushAsync() + } +} diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index a9edf838ca8..3e66b1f8990 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -49,6 +49,7 @@ import { Shell } from "@/shell/shell" import { Truncate } from "@/tool/truncate" import { decodeDataUrl } from "@/util/data-url" import { Process } from "@/util/process" +import { getLangfuse, flushLangfuse } from "./langfuse" // @ts-ignore globalThis.AI_SDK_LOG_WARNINGS = false @@ -293,6 +294,14 @@ export namespace SessionPrompt { // on the user message and will be retrieved from lastUser below let structuredOutput: unknown | undefined + // Langfuse tracing: one trace per loop invocation (coding session turn) + const langfuse = getLangfuse() + const trace = langfuse?.trace({ + name: "opencode.loop", + sessionId: sessionID, + metadata: { resume_existing }, + }) + let step = 0 const session = await Session.get(sessionID) while (true) { @@ -335,13 +344,29 @@ export namespace SessionPrompt { } step++ - if (step === 1) + if (step === 1) { + // Langfuse: capture user input on first step + if (trace) { + const userMsg = msgs.findLast((m) => m.info.role === "user") + const userText = userMsg?.parts + .filter((p): p is MessageV2.TextPart => p.type === "text") + .map((p) => p.text) + .join("\n") + trace.update({ + input: userText, + metadata: { + agent: lastUser.agent, + model: `${lastUser.model.providerID}/${lastUser.model.modelID}`, + }, + }) + } ensureTitle({ session, modelID: lastUser.model.modelID, providerID: lastUser.model.providerID, history: msgs, }) + } const model = await Provider.getModel(lastUser.model.providerID, 
lastUser.model.modelID).catch((e) => { if (Provider.ModelNotFoundError.isInstance(e)) { @@ -690,6 +715,16 @@ export namespace SessionPrompt { system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT) } + // Langfuse: create a span for this loop iteration + const iterationSpan = trace?.span({ + name: `loop.step-${step}`, + input: { + agent: agent.name, + model: `${model.providerID}/${model.id}`, + toolCount: Object.keys(tools).length, + }, + }) + const result = await processor.process({ user: lastUser, agent, @@ -713,6 +748,151 @@ export namespace SessionPrompt { toolChoice: format.type === "json_schema" ? "required" : undefined, }) + // Langfuse: record the LLM generation and tool calls from this iteration + // The parts array for an assistant message flows in order: + // step-start → reasoning → text → tool (calls+results) → step-finish + // → step-start (next internal step) → reasoning → text → ... → step-finish + // We reconstruct each LLM call as a generation with its full context. + if (iterationSpan) { + const parts = await MessageV2.parts(processor.message.id) + + let stepIndex = 0 + // Accumulate parts for the current LLM step + let currentReasoning: string[] = [] + let currentText: string[] = [] + let currentToolCalls: { tool: string; input: any; output?: string; error?: string }[] = [] + // Track tool results from the PREVIOUS step (they become input to the next LLM call) + let prevToolResults: { tool: string; input: any; output?: string; error?: string }[] = [] + + const flushStep = (stepPart: MessageV2.StepFinishPart) => { + const reasoning = currentReasoning.join("") + const text = currentText.join("") + const toolCalls = currentToolCalls.map((tc) => ({ + tool: tc.tool, + input: tc.input, + ...(tc.output ? { output: tc.output.slice(0, 500) } : {}), + ...(tc.error ? { error: tc.error } : {}), + })) + + // Input: first step gets the user message; subsequent steps get tool results + const input = stepIndex === 0 + ? 
msgs + .filter((m) => m.info.role === "user") + .slice(-1) + .map((m) => + m.parts + .filter((p): p is MessageV2.TextPart => p.type === "text") + .map((p) => p.text) + .join("\n"), + ) + .join("\n") + : prevToolResults + + // Output: structured to show reasoning + text + tool calls (mirrors the TUI) + const outputParts: Record<string, unknown> = {} + if (reasoning) outputParts.thinking = reasoning + if (text) outputParts.text = text + if (toolCalls.length > 0) outputParts.toolCalls = toolCalls + const output = Object.keys(outputParts).length > 0 ? outputParts : undefined + + iterationSpan.generation({ + name: `llm-call-${stepIndex}`, + model: `${model.providerID}/${model.id}`, + modelParameters: { + ...(agent.temperature != null ? { temperature: String(agent.temperature) } : {}), + }, + input, + output, + usage: { + input: stepPart.tokens.input, + output: stepPart.tokens.output, + total: stepPart.tokens.total ?? stepPart.tokens.input + stepPart.tokens.output, + }, + metadata: { + finishReason: stepPart.reason, + cost: stepPart.cost, + reasoning_tokens: stepPart.tokens.reasoning, + cache_read: stepPart.tokens.cache.read, + cache_write: stepPart.tokens.cache.write, + }, + }) + + // Carry tool calls forward as input context for the next step + prevToolResults = currentToolCalls.map((tc) => ({ + tool: tc.tool, + input: tc.input, + ...(tc.output ? { output: tc.output.slice(0, 500) } : {}), + ...(tc.error ? 
{ error: tc.error } : {}), + })) + + stepIndex++ + currentReasoning = [] + currentText = [] + currentToolCalls = [] + } + + for (const part of parts) { + // Accumulate reasoning (thinking) for the current step + if (part.type === "reasoning") { + currentReasoning.push(part.text) + } + + // Accumulate text output for the current step + if (part.type === "text") { + currentText.push(part.text) + } + + // Record each tool call as a span and accumulate for generation context + if (part.type === "tool" && part.state.status === "completed") { + currentToolCalls.push({ + tool: part.tool, + input: part.state.input, + output: part.state.output, + }) + iterationSpan.span({ + name: `tool.${part.tool}`, + input: part.state.input, + output: part.state.output, + startTime: new Date(part.state.time.start), + endTime: new Date(part.state.time.end), + metadata: { + title: part.state.title, + ...part.state.metadata, + }, + }) + } + if (part.type === "tool" && part.state.status === "error") { + currentToolCalls.push({ + tool: part.tool, + input: part.state.input, + error: part.state.error, + }) + iterationSpan.span({ + name: `tool.${part.tool}`, + input: part.state.input, + output: part.state.error, + startTime: new Date(part.state.time.start), + endTime: new Date(part.state.time.end), + level: "ERROR", + }) + } + + // Flush accumulated content when a step finishes + if (part.type === "step-finish") { + flushStep(part) + } + } + + iterationSpan.end({ + output: { + finish: processor.message.finish, + result, + tokens: processor.message.tokens, + cost: processor.message.cost, + }, + }) + } + // If structured output was captured, save it and exit immediately // This takes priority because the StructuredOutput tool was called successfully if (structuredOutput !== undefined) { @@ -749,6 +929,14 @@ export namespace SessionPrompt { } continue } + // Langfuse: finalize trace + if (trace) { + trace.update({ + output: { steps: step }, + }) + flushLangfuse().catch(() => {}) + } + 
SessionCompaction.prune({ sessionID }) for await (const item of MessageV2.stream(sessionID)) { if (item.info.role === "user") continue diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts index 270950fd4b8..b40e8d2acd4 100644 --- a/packages/opencode/src/sync/index.ts +++ b/packages/opencode/src/sync/index.ts @@ -159,8 +159,6 @@ export namespace SyncEvent { }) } - // TODO: - // // * Support applying multiple events at one time. One transaction, // and it validets all the sequence ids // * when loading events from db, apply zod validation to ensure shape