diff --git a/knip.json b/knip.json index 034f186a2..0a2351b79 100644 --- a/knip.json +++ b/knip.json @@ -17,16 +17,17 @@ "**/*.css", "packages/appkit/src/plugins/vector-search/**", "packages/appkit/src/plugin/index.ts", + "packages/appkit/src/plugin/to-plugin.ts", "packages/appkit/src/plugins/agents/index.ts", - "template/**", - "tools/**", - "docs/**", - ".github/scripts/**", + "packages/appkit/src/core/agent/index.ts", "packages/appkit/src/core/agent/tools/index.ts", "packages/appkit/src/core/agent/from-plugin.ts", "packages/appkit/src/core/agent/load-agents.ts", "packages/appkit/src/connectors/mcp/index.ts", - "packages/appkit/src/plugin/to-plugin.ts" + "template/**", + "tools/**", + "docs/**", + ".github/scripts/**" ], "ignoreDependencies": ["json-schema-to-typescript"], "ignoreBinaries": ["tarball"] diff --git a/packages/appkit/src/beta.ts b/packages/appkit/src/beta.ts index 562cfd43a..9295a43ed 100644 --- a/packages/appkit/src/beta.ts +++ b/packages/appkit/src/beta.ts @@ -52,8 +52,10 @@ export type { AgentDefinition, AgentsPluginConfig, AgentTool, + AgentTools, AutoInheritToolsConfig, BaseSystemPromptOption, + FromPluginMarker, PromptContext, RegisteredAgent, ResolvedToolEntry, @@ -62,6 +64,8 @@ export type { } from "./plugins/agents"; export { agentIdFromMarkdownPath, + fromPlugin, + isFromPluginMarker, isToolkitEntry, loadAgentFromFile, loadAgentsFromDir, diff --git a/packages/appkit/src/connectors/mcp/types.ts b/packages/appkit/src/connectors/mcp/types.ts index d74f0a46c..aeb61788e 100644 --- a/packages/appkit/src/connectors/mcp/types.ts +++ b/packages/appkit/src/connectors/mcp/types.ts @@ -1,7 +1,7 @@ /** * Input shape consumed by {@link AppKitMcpClient.connect}. Produced by the * agents plugin from user-facing `HostedTool` declarations (see - * `plugins/agents/tools/hosted-tools.ts`) and accepted directly by the + * `core/agent/tools/hosted-tools.ts`) and accepted directly by the * connector to keep its surface free of agent-layer concepts. 
*/ export interface McpEndpointConfig { diff --git a/packages/appkit/src/core/agent/from-plugin.ts b/packages/appkit/src/core/agent/from-plugin.ts new file mode 100644 index 000000000..b11285941 --- /dev/null +++ b/packages/appkit/src/core/agent/from-plugin.ts @@ -0,0 +1,97 @@ +import type { NamedPluginFactory } from "../../plugin/to-plugin"; +import type { ToolkitOptions } from "./types"; + +/** + * Symbol brand for the `fromPlugin` marker. Using a globally-interned symbol + * (`Symbol.for`) keeps the brand stable across module boundaries / bundle + * duplicates so `isFromPluginMarker` stays reliable. + */ +export const FROM_PLUGIN_MARKER = Symbol.for( + "@databricks/appkit.fromPluginMarker", +); + +/** + * A lazy reference to a plugin's tools, produced by {@link fromPlugin} and + * resolved to concrete `ToolkitEntry`s at `AgentsPlugin.setup()` time. + * + * The marker is spread under a unique symbol key so multiple calls to + * `fromPlugin` (even for the same plugin) coexist in an `AgentDefinition.tools` + * record without colliding. + */ +export interface FromPluginMarker { + readonly [FROM_PLUGIN_MARKER]: true; + readonly pluginName: string; + readonly opts: ToolkitOptions | undefined; +} + +/** + * Record shape returned by {@link fromPlugin} — a single symbol-keyed entry + * suitable for spreading into `AgentDefinition.tools`. + */ +export type FromPluginSpread = { readonly [key: symbol]: FromPluginMarker }; + +/** + * Reference a plugin's tools inside an `AgentDefinition.tools` record without + * naming the plugin instance. The returned spread-friendly object carries a + * symbol-keyed marker that the agents plugin resolves against registered + * `ToolProvider`s at setup time. + * + * The factory argument must come from `toPlugin` (or any function that + * carries a `pluginName` field). `fromPlugin` reads `factory.pluginName` + * synchronously — it does not construct an instance. 
+ * + * If the referenced plugin is also registered in `createApp({ plugins })`, the + * same runtime instance is used for dispatch. If the plugin is missing, + * `AgentsPlugin.setup()` throws with a clear `Available: …` listing. + * + * @example + * ```ts + * import { analytics, createAgent, files, fromPlugin, tool } from "@databricks/appkit"; + * + * const support = createAgent({ + * instructions: "You help customers.", + * tools: { + * ...fromPlugin(analytics), + * ...fromPlugin(files, { only: ["uploads.read"] }), + * get_weather: tool({ ... }), + * }, + * }); + * ``` + * + * @param factory A plugin factory produced by `toPlugin`. Must expose a + * `pluginName` field. + * @param opts Optional toolkit scoping — `prefix`, `only`, `except`, `rename`. + * Same shape as the `.toolkit()` method. + */ +export function fromPlugin( + factory: F, + opts?: ToolkitOptions, +): FromPluginSpread { + if ( + !factory || + typeof factory.pluginName !== "string" || + !factory.pluginName + ) { + throw new Error( + "fromPlugin(): factory is missing pluginName. Pass a factory created by toPlugin().", + ); + } + const pluginName = factory.pluginName; + const marker: FromPluginMarker = { + [FROM_PLUGIN_MARKER]: true, + pluginName, + opts, + }; + return { [Symbol(`fromPlugin:${pluginName}`)]: marker }; +} + +/** + * Type guard for {@link FromPluginMarker}. + */ +export function isFromPluginMarker(value: unknown): value is FromPluginMarker { + return ( + typeof value === "object" && + value !== null && + (value as Record)[FROM_PLUGIN_MARKER] === true + ); +} diff --git a/packages/appkit/src/core/agent/index.ts b/packages/appkit/src/core/agent/index.ts new file mode 100644 index 000000000..c11c48282 --- /dev/null +++ b/packages/appkit/src/core/agent/index.ts @@ -0,0 +1,64 @@ +/** + * Agent runtime primitives. All framework-level agent types, tool helpers, + * and the standalone runner live here. 
The HTTP-facing `agents()` plugin in + * `plugins/agents/` consumes these but does not own them — peer plugins + * (analytics, files, genie, lakebase) can depend on this module without + * reaching across the sibling boundary. + */ +export { buildToolkitEntries } from "./build-toolkit"; +export { consumeAdapterStream } from "./consume-adapter-stream"; +export { createAgent } from "./create-agent"; +export { + FROM_PLUGIN_MARKER, + type FromPluginMarker, + type FromPluginSpread, + fromPlugin, + isFromPluginMarker, +} from "./from-plugin"; +export { + agentIdFromMarkdownPath, + type LoadContext, + type LoadResult, + loadAgentFromFile, + loadAgentsFromDir, + parseFrontmatter, +} from "./load-agents"; +export { normalizeToolResult } from "./normalize-result"; +export { + type RunAgentInput, + type RunAgentResult, + runAgent, +} from "./run-agent"; +export { buildBaseSystemPrompt, composeSystemPrompt } from "./system-prompt"; +export { dispatchToolCall } from "./tool-dispatch"; +export { resolveToolkitFromProvider } from "./toolkit-resolver"; +export { + defineTool, + executeFromRegistry, + type FunctionTool, + functionToolToDefinition, + type HostedTool, + isFunctionTool, + isHostedTool, + mcpServer, + resolveHostedTools, + type ToolConfig, + type ToolEntry, + type ToolRegistry, + tool, + toolsFromRegistry, +} from "./tools"; +export { + type AgentDefinition, + type AgentsPluginConfig, + type AgentTool, + type AgentTools, + type AutoInheritToolsConfig, + type BaseSystemPromptOption, + isToolkitEntry, + type PromptContext, + type RegisteredAgent, + type ResolvedToolEntry, + type ToolkitEntry, + type ToolkitOptions, +} from "./types"; diff --git a/packages/appkit/src/core/agent/run-agent.ts b/packages/appkit/src/core/agent/run-agent.ts index 5ef4109cf..2c7e60f07 100644 --- a/packages/appkit/src/core/agent/run-agent.ts +++ b/packages/appkit/src/core/agent/run-agent.ts @@ -4,18 +4,23 @@ import type { AgentEvent, AgentToolDefinition, Message, + PluginConstructor, + 
PluginData, + ToolProvider, } from "shared"; +import { isFromPluginMarker } from "./from-plugin"; +import { AgentRunner } from "./runner"; +import { + type StandaloneEntry, + StandaloneToolExecutor, +} from "./standalone-tool-executor"; +import { resolveToolkitFromProvider } from "./toolkit-resolver"; import { - type FunctionTool, functionToolToDefinition, isFunctionTool, } from "./tools/function-tool"; import { isHostedTool } from "./tools/hosted-tools"; -import type { - AgentDefinition, - AgentTool, - ToolkitEntry, -} from "./types"; +import type { AgentDefinition, AgentTool, ToolkitEntry } from "./types"; import { isToolkitEntry } from "./types"; export interface RunAgentInput { @@ -23,6 +28,14 @@ export interface RunAgentInput { messages: string | Message[]; /** Abort signal for cancellation. */ signal?: AbortSignal; + /** + * Optional plugin list used to resolve `fromPlugin` markers in `def.tools`. + * Required when the def contains any `...fromPlugin(factory)` spreads; + * ignored otherwise. `runAgent` constructs a fresh instance per plugin + * and dispatches tool calls against it as the service principal (no + * OBO — there is no HTTP request in standalone mode). + */ + plugins?: PluginData[]; } export interface RunAgentResult { @@ -39,11 +52,12 @@ export interface RunAgentResult { * Limitations vs. running through the agents() plugin: * - No OBO: there is no HTTP request, so plugin tools run as the service * principal (when they work at all). - * - Plugin tools (`ToolkitEntry`) are not supported — they require a live - * `PluginContext` that only exists when registered in a `createApp` - * instance. This function throws a clear error if encountered. + * - Hosted tools (MCP) are not supported — they require a live MCP client + * that only exists inside the agents plugin. * - Sub-agents (`agents: { ... }` on the def) are executed as nested * `runAgent` calls with no shared thread state. 
+ * - Plugin tools must be spread in via `fromPlugin(factory)` with the plugin + * listed in `RunAgentInput.plugins`; raw `ToolkitEntry` spreads cannot be dispatched. */ export async function runAgent( def: AgentDefinition, @@ -51,59 +65,36 @@ export async function runAgent( ): Promise { const adapter = await resolveAdapter(def); const messages = normalizeMessages(input.messages, def.instructions); - const toolIndex = buildStandaloneToolIndex(def); + const toolIndex = buildStandaloneToolIndex(def, input.plugins ?? []); const tools = Array.from(toolIndex.values()).map((e) => e.def); - const signal = input.signal; - - const executeTool = async (name: string, args: unknown): Promise => { - const entry = toolIndex.get(name); - if (!entry) throw new Error(`Unknown tool: ${name}`); - if (entry.kind === "function") { - return entry.tool.execute(args as Record); - } - if (entry.kind === "subagent") { - const subInput: RunAgentInput = { - messages: - typeof args === "object" && - args !== null && - typeof (args as { input?: unknown }).input === "string" - ? (args as { input: string }).input - : JSON.stringify(args), - signal, - }; - const res = await runAgent(entry.agentDef, subInput); - return res.text; - } - throw new Error( - `runAgent: tool "${name}" is a ${entry.kind} tool. ` + - "Plugin toolkits and MCP tools are only usable via createApp({ plugins: [..., agents(...)] }).", - ); - }; + // `runAgent` historically allowed callers to omit a signal. Synthesize one + // so AgentRunner / ToolExecutor can rely on always having a real signal. + const signal = input.signal ?? 
new AbortController().signal; const events: AgentEvent[] = []; - let text = ""; - - const stream = adapter.run( - { - messages, - tools, - threadId: randomUUID(), - signal, + + const executor = new StandaloneToolExecutor( + toolIndex, + async (subDef, subInput, subSignal) => { + const res = await runAgent(subDef, { + messages: subInput, + signal: subSignal, + plugins: input.plugins, + }); + return res.text; }, - { executeTool, signal }, ); - for await (const event of stream) { - if (signal?.aborted) break; - events.push(event); - if (event.type === "message_delta") { - text += event.content; - } else if (event.type === "message") { - text = event.content; - } - } + const runner = new AgentRunner({ + adapter, + tools, + executeTool: executor, + signal, + onEvent: (event) => events.push(event), + }); + const text = await runner.run({ messages, threadId: randomUUID() }); return { text, events }; } @@ -144,34 +135,52 @@ function normalizeMessages( return [systemMessage, ...input]; } -type StandaloneEntry = - | { - kind: "function"; - def: AgentToolDefinition; - tool: FunctionTool; - } - | { - kind: "subagent"; - def: AgentToolDefinition; - agentDef: AgentDefinition; - } - | { - kind: "toolkit"; - def: AgentToolDefinition; - entry: ToolkitEntry; - } - | { - kind: "hosted"; - def: AgentToolDefinition; - }; - +/** + * Resolves `def.tools` (string-keyed entries + symbol-keyed `fromPlugin` + * markers) and `def.agents` (sub-agents) into a flat dispatch index. + * Symbol-keyed markers are resolved against `plugins`; missing references + * throw with an `Available: …` listing. + */ function buildStandaloneToolIndex( def: AgentDefinition, + plugins: PluginData[], ): Map { const index = new Map(); + const tools = def.tools; + + const symbolKeys = tools ? 
Object.getOwnPropertySymbols(tools) : []; + if (symbolKeys.length > 0) { + const providerCache = new Map(); + for (const sym of symbolKeys) { + const marker = (tools as Record)[sym]; + if (!isFromPluginMarker(marker)) continue; - for (const [key, tool] of Object.entries(def.tools ?? {})) { - index.set(key, classifyTool(key, tool)); + const provider = resolveStandaloneProvider( + marker.pluginName, + plugins, + providerCache, + ); + const entries = resolveToolkitFromProvider( + marker.pluginName, + provider, + marker.opts, + ); + for (const [key, entry] of Object.entries(entries)) { + index.set(key, { + kind: "toolkit", + provider, + pluginName: entry.pluginName, + localName: entry.localName, + def: { ...entry.def, name: key }, + }); + } + } + } + + if (tools) { + for (const [key, tool] of Object.entries(tools)) { + index.set(key, classifyTool(key, tool)); + } } for (const [childKey, child] of Object.entries(def.agents ?? {})) { @@ -203,7 +212,7 @@ function buildStandaloneToolIndex( function classifyTool(key: string, tool: AgentTool): StandaloneEntry { if (isToolkitEntry(tool)) { - return { kind: "toolkit", def: { ...tool.def, name: key }, entry: tool }; + return toolkitEntryToStandalone(key, tool); } if (isFunctionTool(tool)) { return { @@ -224,3 +233,67 @@ function classifyTool(key: string, tool: AgentTool): StandaloneEntry { } throw new Error(`runAgent: unrecognized tool shape at key "${key}"`); } + +/** + * Pre-`fromPlugin` code could reach a `ToolkitEntry` by calling `.toolkit()` + * at module scope (which requires an instance). Without a provider to + * dispatch against, runAgent indexes such entries as non-executable and + * appends a hint to the description (the whole string is trimmed). + */ +function toolkitEntryToStandalone( + key: string, + entry: ToolkitEntry, +): StandaloneEntry { + const def: AgentToolDefinition = { ...entry.def, name: key }; + return { + kind: "hosted", + def: { + ...def, + description: + (`${def.description ?? 
""} ` + + `[runAgent: this ToolkitEntry refers to plugin '${entry.pluginName}' but ` + + "runAgent cannot dispatch it without the plugin instance. Pass the " + + "plugin via plugins: [...] and use fromPlugin(factory) instead of " + + ".toolkit() spreads.]").trim(), + }, + }; +} + +function resolveStandaloneProvider( + pluginName: string, + plugins: PluginData[], + cache: Map, +): ToolProvider { + const cached = cache.get(pluginName); + if (cached) return cached; + + const match = plugins.find((p) => p.name === pluginName); + if (!match) { + const available = plugins.map((p) => p.name).join(", ") || "(none)"; + throw new Error( + `runAgent: agent references plugin '${pluginName}' via fromPlugin(), but ` + + "that plugin is missing from RunAgentInput.plugins. " + + `Available: ${available}.`, + ); + } + + const instance = new match.plugin({ + ...(match.config ?? {}), + name: pluginName, + }); + const provider = instance as unknown as ToolProvider; + if ( + typeof (provider as { getAgentTools?: unknown }).getAgentTools !== + "function" || + typeof (provider as { executeAgentTool?: unknown }).executeAgentTool !== + "function" + ) { + throw new Error( + `runAgent: plugin '${pluginName}' is not a ToolProvider ` + + "(missing getAgentTools/executeAgentTool). Only ToolProvider plugins " + + "are supported via fromPlugin() in runAgent.", + ); + } + cache.set(pluginName, provider); + return provider; +} diff --git a/packages/appkit/src/core/agent/runner.ts b/packages/appkit/src/core/agent/runner.ts new file mode 100644 index 000000000..ac6f2ed3f --- /dev/null +++ b/packages/appkit/src/core/agent/runner.ts @@ -0,0 +1,65 @@ +import type { + AgentAdapter, + AgentEvent, + AgentToolDefinition, + Message, +} from "shared"; +import { consumeAdapterStream } from "./consume-adapter-stream"; + +/** + * Execution strategy for a tool call. 
Lives behind {@link AgentRunner} so + * the runner doesn't care whether the tool ends up dispatched via HTTP + * plumbing (approval gate, OBO, MCP client) or by direct in-process call. + * + * The runner injects the adapter's per-invocation {@link AbortSignal} so + * implementations can wire it through to long-running work. + */ +export interface ToolExecutor { + execute(name: string, args: unknown, signal: AbortSignal): Promise; +} + +interface AgentRunnerInput { + messages: Message[]; + threadId: string; +} + +interface AgentRunnerDeps { + adapter: AgentAdapter; + tools: AgentToolDefinition[]; + executeTool: ToolExecutor; + signal: AbortSignal; + /** Called for every event the adapter emits, in order. */ + onEvent?: (event: AgentEvent) => void; +} + +/** + * Single execution loop for an AgentDefinition. Intentionally thin — its + * only job is to drive the adapter to completion and surface events. + * + * Tool-dispatch policy (approval gating, per-user budget, OBO, MCP) + * is owned by the injected {@link ToolExecutor}. The plugin layer wires an + * `HttpToolExecutor` for the streaming chat path; `runAgent()` wires a + * `StandaloneToolExecutor` for in-process scripts. 
+ */ +export class AgentRunner { + constructor(private deps: AgentRunnerDeps) {} + + async run(input: AgentRunnerInput): Promise { + const { adapter, tools, executeTool, signal, onEvent } = this.deps; + return consumeAdapterStream( + adapter.run( + { + messages: input.messages, + tools, + threadId: input.threadId, + signal, + }, + { + executeTool: (name, args) => executeTool.execute(name, args, signal), + signal, + }, + ), + { signal, onEvent }, + ); + } +} diff --git a/packages/appkit/src/core/agent/standalone-tool-executor.ts b/packages/appkit/src/core/agent/standalone-tool-executor.ts new file mode 100644 index 000000000..d78e0e6bc --- /dev/null +++ b/packages/appkit/src/core/agent/standalone-tool-executor.ts @@ -0,0 +1,78 @@ +import type { AgentToolDefinition, ToolProvider } from "shared"; +import type { ToolExecutor } from "./runner"; +import type { FunctionTool } from "./tools/function-tool"; +import type { AgentDefinition } from "./types"; + +/** + * Tool entry shape used by `runAgent`'s in-process dispatcher. Distinct + * from {@link import("./types").ResolvedToolEntry} because the standalone + * path holds live `provider`/`agentDef` references at index-build time + * (no PluginContext to resolve from at dispatch time). + */ +export type StandaloneEntry = + | { kind: "function"; def: AgentToolDefinition; tool: FunctionTool } + | { kind: "subagent"; def: AgentToolDefinition; agentDef: AgentDefinition } + | { + kind: "toolkit"; + def: AgentToolDefinition; + provider: ToolProvider; + pluginName: string; + localName: string; + } + | { kind: "hosted"; def: AgentToolDefinition }; + +/** + * In-process tool executor for {@link import("./run-agent").runAgent}. + * + * No approval gate, no per-user budget, no OBO — there is no HTTP + * request in standalone mode. Hosted/MCP tools error with a clear + * message because they require a live MCP client owned by the + * `agents()` plugin. 
+ * + * Sub-agent recursion delegates back to the caller (which is `runAgent` + * itself); this keeps the executor free of a circular import on the + * top-level entry point. + */ +export class StandaloneToolExecutor implements ToolExecutor { + constructor( + private readonly toolIndex: Map, + private readonly subAgentRunner: ( + def: AgentDefinition, + input: string, + signal: AbortSignal, + ) => Promise, + ) {} + + async execute( + name: string, + args: unknown, + signal: AbortSignal, + ): Promise { + const entry = this.toolIndex.get(name); + if (!entry) throw new Error(`Unknown tool: ${name}`); + + if (entry.kind === "function") { + return entry.tool.execute(args as Record); + } + if (entry.kind === "toolkit") { + return entry.provider.executeAgentTool( + entry.localName, + args as Record, + signal, + ); + } + if (entry.kind === "subagent") { + const sub = + typeof args === "object" && + args !== null && + typeof (args as { input?: unknown }).input === "string" + ? (args as { input: string }).input + : JSON.stringify(args); + return this.subAgentRunner(entry.agentDef, sub, signal); + } + throw new Error( + `runAgent: tool "${name}" is a ${entry.kind} tool. 
` + + "Hosted/MCP tools are only usable via createApp({ plugins: [..., agents(...)] }).", + ); + } +} diff --git a/packages/appkit/src/core/agent/tests/create-agent.test.ts b/packages/appkit/src/core/agent/tests/create-agent.test.ts index 30bcc98e9..df920369e 100644 --- a/packages/appkit/src/core/agent/tests/create-agent.test.ts +++ b/packages/appkit/src/core/agent/tests/create-agent.test.ts @@ -1,8 +1,8 @@ import { describe, expect, test } from "vitest"; import { z } from "zod"; import { createAgent } from "../create-agent"; -import { tool } from "../../../core/agent/tools/tool"; -import type { AgentDefinition } from "../../../core/agent/types"; +import { tool } from "../tools/tool"; +import type { AgentDefinition } from "../types"; describe("createAgent", () => { test("returns the definition unchanged for a simple agent", () => { diff --git a/packages/appkit/src/core/agent/tests/from-plugin.test.ts b/packages/appkit/src/core/agent/tests/from-plugin.test.ts new file mode 100644 index 000000000..eb31d0f7d --- /dev/null +++ b/packages/appkit/src/core/agent/tests/from-plugin.test.ts @@ -0,0 +1,80 @@ +import { describe, expect, test } from "vitest"; +import { + FROM_PLUGIN_MARKER, + fromPlugin, + isFromPluginMarker, +} from "../../../core/agent/from-plugin"; + +function fakeFactory(name: string) { + const f = () => ({ name }); + Object.defineProperty(f, "pluginName", { value: name, enumerable: true }); + return f as typeof f & { readonly pluginName: string }; +} + +describe("fromPlugin", () => { + test("returns a spread-friendly object with a single symbol-keyed marker", () => { + const spread = fromPlugin(fakeFactory("analytics")); + + expect(Object.keys(spread)).toHaveLength(0); + const syms = Object.getOwnPropertySymbols(spread); + expect(syms).toHaveLength(1); + + const marker = (spread as Record)[syms[0]!]; + expect(isFromPluginMarker(marker)).toBe(true); + expect((marker as { pluginName: string }).pluginName).toBe("analytics"); + }); + + test("multiple calls 
produce distinct symbol keys (spreads coexist)", () => { + const spread = { + ...fromPlugin(fakeFactory("analytics")), + ...fromPlugin(fakeFactory("analytics")), + ...fromPlugin(fakeFactory("files")), + }; + + const syms = Object.getOwnPropertySymbols(spread); + expect(syms).toHaveLength(3); + }); + + test("passes opts through to the marker", () => { + const spread = fromPlugin(fakeFactory("analytics"), { + only: ["query"], + prefix: "q_", + }); + const sym = Object.getOwnPropertySymbols(spread)[0]!; + const marker = (spread as Record)[sym] as { + opts: { only: string[]; prefix: string }; + }; + expect(marker.opts.only).toEqual(["query"]); + expect(marker.opts.prefix).toBe("q_"); + }); + + test("throws when factory has no pluginName", () => { + const missing = () => ({ name: "nope" }); + expect(() => + fromPlugin(missing as unknown as { readonly pluginName: string }), + ).toThrow(/missing pluginName/); + }); + + test("FROM_PLUGIN_MARKER is a globally-interned symbol", () => { + expect(FROM_PLUGIN_MARKER).toBe( + Symbol.for("@databricks/appkit.fromPluginMarker"), + ); + }); +}); + +describe("isFromPluginMarker", () => { + test("returns true for real markers", () => { + const spread = fromPlugin(fakeFactory("analytics")); + const sym = Object.getOwnPropertySymbols(spread)[0]!; + expect(isFromPluginMarker((spread as Record)[sym])).toBe( + true, + ); + }); + + test("returns false for objects without the brand", () => { + expect(isFromPluginMarker({ pluginName: "x" })).toBe(false); + expect(isFromPluginMarker(null)).toBe(false); + expect(isFromPluginMarker(undefined)).toBe(false); + expect(isFromPluginMarker("string")).toBe(false); + }); +}); diff --git a/packages/appkit/src/plugins/agents/tests/mcp-server-helper.test.ts b/packages/appkit/src/core/agent/tests/mcp-server-helper.test.ts similarity index 100% rename from packages/appkit/src/plugins/agents/tests/mcp-server-helper.test.ts rename to packages/appkit/src/core/agent/tests/mcp-server-helper.test.ts diff --git 
a/packages/appkit/src/core/agent/tests/run-agent.test.ts b/packages/appkit/src/core/agent/tests/run-agent.test.ts index 55b3e7b39..9ead2c177 100644 --- a/packages/appkit/src/core/agent/tests/run-agent.test.ts +++ b/packages/appkit/src/core/agent/tests/run-agent.test.ts @@ -3,13 +3,18 @@ import type { AgentEvent, AgentInput, AgentRunContext, + AgentToolDefinition, + PluginConstructor, + PluginData, + ToolProvider, } from "shared"; import { describe, expect, test, vi } from "vitest"; import { z } from "zod"; import { createAgent } from "../create-agent"; +import { fromPlugin } from "../from-plugin"; import { runAgent } from "../run-agent"; -import { tool } from "../../../core/agent/tools/tool"; -import type { ToolkitEntry } from "../../../core/agent/types"; +import { tool } from "../tools/tool"; +import type { ToolkitEntry } from "../types"; function scriptedAdapter(events: AgentEvent[]): AgentAdapter { return { @@ -84,6 +89,97 @@ describe("runAgent", () => { expect(weatherFn).toHaveBeenCalledWith({ city: "NYC" }); }); + test("resolves fromPlugin markers against RunAgentInput.plugins", async () => { + const pingExec = vi.fn(async () => "pong"); + class FakePlugin implements ToolProvider { + static manifest = { name: "ping" }; + static DEFAULT_CONFIG = {}; + name = "ping"; + constructor(public config: unknown) {} + async setup() {} + injectRoutes() {} + getEndpoints() { + return {}; + } + getAgentTools(): AgentToolDefinition[] { + return [ + { + name: "ping", + description: "ping", + parameters: { type: "object", properties: {} }, + }, + ]; + } + executeAgentTool = pingExec; + } + + const factory = () => ({ + plugin: FakePlugin as unknown as PluginConstructor, + config: {}, + name: "ping" as const, + }); + Object.defineProperty(factory, "pluginName", { + value: "ping", + enumerable: true, + }); + + let capturedCtx: AgentRunContext | null = null; + const adapter: AgentAdapter = { + async *run(_input, context) { + capturedCtx = context; + yield { type: "message_delta", 
content: "" }; + }, + }; + + const def = createAgent({ + instructions: "x", + model: adapter, + tools: { + ...fromPlugin(factory as unknown as { readonly pluginName: string }), + }, + }); + + const pluginData = factory() as PluginData< + PluginConstructor, + unknown, + string + >; + + await runAgent(def, { messages: "hi", plugins: [pluginData] }); + expect(capturedCtx).not.toBeNull(); + // biome-ignore lint/style/noNonNullAssertion: asserted above + const result = await capturedCtx!.executeTool("ping.ping", {}); + expect(result).toBe("pong"); + expect(pingExec).toHaveBeenCalled(); + }); + + test("throws with guidance when fromPlugin marker has no matching plugin", async () => { + const factory = () => ({ name: "absent" as const }); + Object.defineProperty(factory, "pluginName", { + value: "absent", + enumerable: true, + }); + + const adapter: AgentAdapter = { + async *run(_input, _context) { + yield { type: "message_delta", content: "" }; + }, + }; + + const def = createAgent({ + instructions: "x", + model: adapter, + tools: { + ...fromPlugin(factory as unknown as { readonly pluginName: string }), + }, + }); + + await expect(runAgent(def, { messages: "hi" })).rejects.toThrow(/absent/); + await expect(runAgent(def, { messages: "hi" })).rejects.toThrow( + /Available:/, + ); + }); + test("throws a clear error when a ToolkitEntry is invoked", async () => { const toolkitEntry: ToolkitEntry = { __toolkitRef: true, diff --git a/packages/appkit/src/core/agent/tool-dispatch.ts b/packages/appkit/src/core/agent/tool-dispatch.ts new file mode 100644 index 000000000..728f57990 --- /dev/null +++ b/packages/appkit/src/core/agent/tool-dispatch.ts @@ -0,0 +1,97 @@ +import type express from "express"; +import type { AppKitMcpClient } from "../../connectors/mcp"; +import type { PluginContext } from "../plugin-context"; +import type { ResolvedToolEntry } from "./types"; + +interface ToolDispatchContext { + /** + * The originating HTTP request. 
Used by `toolkit` entries to scope execution + * to the caller's user context (`asUser(req)`) and by `mcp` entries to pick + * up the OBO bearer token from `x-forwarded-access-token`. + */ + req: express.Request; + /** Cancellation signal, forwarded to the tool implementation. */ + signal: AbortSignal; + /** + * PluginContext mediator — required to dispatch `toolkit` entries. Absent in + * unit tests that construct `AgentsPlugin` directly; callers may pass + * `null` / `undefined`, in which case toolkit calls throw a clear error. + */ + pluginContext?: PluginContext | null; + /** Live MCP client. Required for `mcp` entries. */ + mcpClient?: AppKitMcpClient | null; + /** + * Delegates a sub-agent invocation. The closure owns the recursion depth so + * the dispatcher itself remains depth-agnostic — the top-level caller + * passes `depth = 1`, and a sub-agent's inner dispatcher passes `depth + 1`. + */ + runSubAgent: (agentName: string, args: unknown) => Promise; +} + +/** + * Fan-out a resolved tool entry to the correct executor. One place to add a + * fifth `source` variant; `never`-typed default forces every caller to + * update in lockstep. + * + * This only handles dispatch — result normalisation (`normalizeToolResult`), + * budget counting, and approval gating remain at the call site, where each + * stream has different policies. 
+ */ +export async function dispatchToolCall( + entry: ResolvedToolEntry, + args: unknown, + ctx: ToolDispatchContext, +): Promise { + switch (entry.source) { + case "toolkit": { + if (!ctx.pluginContext) { + throw new Error( + "Plugin tool execution requires PluginContext; " + + "this should never happen through createApp.", + ); + } + return ctx.pluginContext.executeTool( + ctx.req, + entry.pluginName, + entry.localName, + args, + ctx.signal, + ); + } + case "function": + return entry.functionTool.execute(args as Record); + case "mcp": { + if (!ctx.mcpClient) throw new Error("MCP client not connected"); + return ctx.mcpClient.callTool( + entry.mcpToolName, + args, + extractOboMcpAuth(ctx.req), + ); + } + case "subagent": + return ctx.runSubAgent(entry.agentName, args); + default: { + // Exhaustiveness guard: adding a new `source` to ResolvedToolEntry + // without teaching this switch breaks the build here. + const _exhaustive: never = entry; + throw new Error( + `Unsupported tool source: ${(_exhaustive as ResolvedToolEntry).source}`, + ); + } + } +} + +/** + * Extracts the caller's OBO bearer token from the standard Databricks Apps + * forwarded-auth header. MCP destinations that `forwardWorkspaceAuth` admits + * as same-origin will receive this header; non-workspace destinations drop + * it inside {@link AppKitMcpClient.callTool}. + */ +function extractOboMcpAuth( + req: express.Request, +): Record | undefined { + const oboToken = req.headers["x-forwarded-access-token"]; + return typeof oboToken === "string" + ? 
{ Authorization: `Bearer ${oboToken}` } + : undefined; +} diff --git a/packages/appkit/src/core/agent/toolkit-resolver.ts b/packages/appkit/src/core/agent/toolkit-resolver.ts new file mode 100644 index 000000000..8ec8cf1f7 --- /dev/null +++ b/packages/appkit/src/core/agent/toolkit-resolver.ts @@ -0,0 +1,62 @@ +import type { ToolProvider } from "shared"; +import type { ToolkitEntry, ToolkitOptions } from "./types"; + +/** + * Internal interface: a `ToolProvider` that optionally exposes a typed + * `.toolkit(opts)` method. Core plugins (analytics, files, genie, lakebase) + * implement this; third-party `ToolProvider`s may not. + */ +type MaybeToolkitProvider = ToolProvider & { + toolkit?: (opts?: ToolkitOptions) => Record; +}; + +/** + * Resolve a plugin's tools into a keyed record of {@link ToolkitEntry} markers + * ready to be merged into an agent's tool index. + * + * Preferred path: call the plugin's own `.toolkit(opts)` method, which + * typically delegates to `buildToolkitEntries` with full `ToolkitOptions` + * support (prefix, only, except, rename). + * + * Fallback path: when the plugin doesn't expose `.toolkit()` (e.g. a + * third-party `ToolProvider` built with plain `toPlugin`), walk + * `getAgentTools()` and synthesize namespaced keys (`${pluginName}.${name}`) + * while still honoring `only` / `except` / `rename` / `prefix`. + * + * This helper is the single source of truth for "turn a provider into a + * toolkit entry record" and is used by `AgentsPlugin.buildToolIndex` + * (both the `fromPlugin` resolution pass and auto-inherit) and by the + * standalone `runAgent` executor. + */ +export function resolveToolkitFromProvider( + pluginName: string, + provider: ToolProvider, + opts?: ToolkitOptions, +): Record { + const withToolkit = provider as MaybeToolkitProvider; + if (typeof withToolkit.toolkit === "function") { + return withToolkit.toolkit(opts); + } + + const only = opts?.only ? new Set(opts.only) : null; + const except = opts?.except ? 
new Set(opts.except) : null; + const rename = opts?.rename ?? {}; + const prefix = opts?.prefix ?? `${pluginName}.`; + + const out: Record = {}; + for (const tool of provider.getAgentTools()) { + if (only && !only.has(tool.name)) continue; + if (except?.has(tool.name)) continue; + + const keyAfterPrefix = `${prefix}${tool.name}`; + const key = rename[tool.name] ?? keyAfterPrefix; + + out[key] = { + __toolkitRef: true, + pluginName, + localName: tool.name, + def: { ...tool, name: key }, + }; + } + return out; +} diff --git a/packages/appkit/src/core/agent/types.ts b/packages/appkit/src/core/agent/types.ts index 14366e9ab..ce3d0463f 100644 --- a/packages/appkit/src/core/agent/types.ts +++ b/packages/appkit/src/core/agent/types.ts @@ -6,6 +6,7 @@ import type { ToolAnnotations, } from "shared"; import type { McpHostPolicyConfig } from "../../connectors/mcp"; +import type { FromPluginMarker } from "./from-plugin"; import type { FunctionTool } from "./tools/function-tool"; import type { HostedTool } from "./tools/hosted-tools"; @@ -62,6 +63,16 @@ export type BaseSystemPromptOption = | string | ((ctx: PromptContext) => string); +/** + * Per-agent tool record. String keys map to inline tools, toolkit entries, + * hosted tools, etc. Symbol keys hold `FromPluginMarker` references produced + * by `fromPlugin(factory)` spreads — these are resolved at + * `AgentsPlugin.setup()` time against registered `ToolProvider` plugins. + */ +export type AgentTools = { [key: string]: AgentTool } & { + [key: symbol]: FromPluginMarker; +}; + export interface AgentDefinition { /** Filled in from the enclosing key when used in `agents: { foo: def }`. */ name?: string; @@ -74,7 +85,7 @@ export interface AgentDefinition { */ model?: AgentAdapter | Promise | string; /** Per-agent tool record. Key is the LLM-visible tool-call name. */ - tools?: Record; + tools?: AgentTools; /** Sub-agents, exposed as `agent-` tools on this agent. 
*/ agents?: Record; /** Override the plugin's baseSystemPrompt for this agent only. */ diff --git a/packages/appkit/src/plugins/agents/agents.ts b/packages/appkit/src/plugins/agents/agents.ts index 3bfe45cbf..80cda55e4 100644 --- a/packages/appkit/src/plugins/agents/agents.ts +++ b/packages/appkit/src/plugins/agents/agents.ts @@ -15,11 +15,13 @@ import type { ToolProvider, } from "shared"; import { AppKitMcpClient, buildMcpHostPolicy } from "../../connectors/mcp"; +import { getWorkspaceClient } from "../../context"; +import { consumeAdapterStream } from "../../core/agent/consume-adapter-stream"; +import { isFromPluginMarker } from "../../core/agent/from-plugin"; import { loadAgentsFromDir } from "../../core/agent/load-agents"; -import { - buildBaseSystemPrompt, - composeSystemPrompt, -} from "../../core/agent/system-prompt"; +import { AgentRunner } from "../../core/agent/runner"; +import { dispatchToolCall } from "../../core/agent/tool-dispatch"; +import { resolveToolkitFromProvider } from "../../core/agent/toolkit-resolver"; import { functionToolToDefinition, isFunctionTool, @@ -29,8 +31,6 @@ import { import type { AgentDefinition, AgentsPluginConfig, - BaseSystemPromptOption, - PromptContext, RegisteredAgent, ResolvedToolEntry, } from "../../core/agent/types"; @@ -41,7 +41,14 @@ import type { PluginManifest } from "../../registry"; import { agentStreamDefaults } from "./defaults"; import { EventChannel } from "./event-channel"; import { AgentEventTranslator } from "./event-translator"; +import { + type ApprovalCheck, + HttpToolExecutor, + type ToolBudget, +} from "./http-tool-executor"; import manifest from "./manifest.json"; +import { composePromptForAgent, normalizeAutoInherit } from "./prompt"; +import { printRegistry } from "./registry-printer"; import { approvalRequestSchema, chatRequestSchema, @@ -69,15 +76,21 @@ export class AgentsPlugin extends Plugin implements ToolProvider { protected declare config: AgentsPluginConfig; - private agents = new Map(); - 
private defaultAgentName: string | null = null; - private activeStreams = new Map< + /** @internal - Mutated by route handlers and tool-execution helpers. */ + agents = new Map(); + /** @internal */ + defaultAgentName: string | null = null; + /** @internal */ + activeStreams = new Map< string, { controller: AbortController; userId: string } >(); - private mcpClient: AppKitMcpClient | null = null; - private threadStore; - private approvalGate = new ToolApprovalGate(); + /** @internal */ + mcpClient: AppKitMcpClient | null = null; + /** @internal */ + threadStore; + /** @internal */ + approvalGate = new ToolApprovalGate(); constructor(config: AgentsPluginConfig) { super(config); @@ -100,8 +113,8 @@ export class AgentsPlugin extends Plugin implements ToolProvider { } } - /** Effective approval policy with defaults applied. */ - private get resolvedApprovalPolicy(): { + /** Effective approval policy with defaults applied. @internal */ + get resolvedApprovalPolicy(): { requireForDestructive: boolean; timeoutMs: number; } { @@ -112,8 +125,8 @@ export class AgentsPlugin extends Plugin implements ToolProvider { }; } - /** Effective DoS limits with defaults applied. */ - private get resolvedLimits(): { + /** Effective DoS limits with defaults applied. @internal */ + get resolvedLimits(): { maxConcurrentStreamsPerUser: number; maxToolCalls: number; maxSubAgentDepth: number; @@ -126,8 +139,8 @@ export class AgentsPlugin extends Plugin implements ToolProvider { }; } - /** Count active streams owned by a given user. */ - private countUserStreams(userId: string): number { + /** Count active streams owned by a given user. 
@internal */ + countUserStreams(userId: string): number { let n = 0; for (const entry of this.activeStreams.values()) { if (entry.userId === userId) n++; @@ -138,7 +151,7 @@ export class AgentsPlugin extends Plugin implements ToolProvider { async setup() { await this.loadAgents(); this.mountInvocationsRoute(); - this.printRegistry(); + printRegistry(this.agents, this.defaultAgentName); } /** @@ -304,7 +317,7 @@ export class AgentsPlugin extends Plugin implements ToolProvider { ); } catch (err) { throw new Error( - `Agent '${name}' has no model configured and no DATABRICKS_AGENT_ENDPOINT default available`, + `Agent '${name}' has no model configured and no DATABRICKS_SERVING_ENDPOINT_NAME default available`, { cause: err instanceof Error ? err : undefined }, ); } @@ -327,7 +340,11 @@ export class AgentsPlugin extends Plugin implements ToolProvider { src: AgentSource, ): Promise> { const index = new Map(); - const hasExplicitTools = def.tools && Object.keys(def.tools).length > 0; + const toolsRecord = def.tools ?? {}; + const hasExplicitTools = + def.tools !== undefined && + (Object.keys(toolsRecord).length > 0 || + Object.getOwnPropertySymbols(toolsRecord).length > 0); const hasExplicitSubAgents = def.agents && Object.keys(def.agents).length > 0; @@ -366,10 +383,14 @@ export class AgentsPlugin extends Plugin implements ToolProvider { }); } - // 2. Explicit tools (toolkit entries, function tools, hosted tools) + // 2. fromPlugin markers — resolve against registered ToolProviders first so + // explicit string-keyed tools can still overwrite on the same key. + this.resolveFromPluginMarkers(agentName, toolsRecord, index); + + // 3. Explicit tools (toolkit entries, function tools, hosted tools) const hostedToCollect: import("../../core/agent/tools/hosted-tools").HostedTool[] = []; - for (const [key, tool] of Object.entries(def.tools ?? 
{})) { + for (const [key, tool] of Object.entries(toolsRecord)) { if (isToolkitEntry(tool)) { index.set(key, { source: "toolkit", @@ -421,32 +442,19 @@ export class AgentsPlugin extends Plugin implements ToolProvider { provider, } of this.context.getToolProviders()) { if (pluginName === this.name) continue; - const withToolkit = provider as ToolProvider & { - toolkit?: (opts?: unknown) => Record; - }; - if (typeof withToolkit.toolkit === "function") { - const entries = withToolkit.toolkit() as Record; - for (const [key, maybeEntry] of Object.entries(entries)) { - if (!isToolkitEntry(maybeEntry)) continue; - if (maybeEntry.autoInheritable !== true) { - recordSkip(maybeEntry.pluginName, maybeEntry.localName); - continue; - } - index.set(key, { - source: "toolkit", - pluginName: maybeEntry.pluginName, - localName: maybeEntry.localName, - def: { ...maybeEntry.def, name: key }, - }); - inherited.push(key); + const entries = resolveToolkitFromProvider(pluginName, provider); + for (const [key, entry] of Object.entries(entries)) { + if (entry.autoInheritable !== true) { + recordSkip(entry.pluginName, entry.localName); + continue; } - continue; - } - // Fallback: providers without a toolkit() still expose getAgentTools(). - // These cannot be selectively opted in per tool, so we conservatively - // skip them during auto-inherit and require explicit `tools:` wiring. - for (const tool of provider.getAgentTools()) { - recordSkip(pluginName, tool.name); + index.set(key, { + source: "toolkit", + pluginName: entry.pluginName, + localName: entry.localName, + def: { ...entry.def, name: key }, + }); + inherited.push(key); } } @@ -474,39 +482,74 @@ export class AgentsPlugin extends Plugin implements ToolProvider { } } + /** + * Walks the symbol-keyed `fromPlugin` markers in an agent's `tools` record + * and resolves each one against a registered `ToolProvider`. Throws with a + * helpful `Available: …` listing if a referenced plugin isn't registered. 
+ */ + private resolveFromPluginMarkers( + agentName: string, + toolsRecord: Record, + index: Map, + ): void { + const symbolKeys = Object.getOwnPropertySymbols(toolsRecord); + if (symbolKeys.length === 0) return; + + const providers = this.context?.getToolProviders() ?? []; + + for (const sym of symbolKeys) { + const marker = (toolsRecord as Record)[sym]; + if (!isFromPluginMarker(marker)) continue; + + const providerEntry = providers.find((p) => p.name === marker.pluginName); + if (!providerEntry) { + const available = providers.map((p) => p.name).join(", ") || "(none)"; + throw new Error( + `Agent '${agentName}' references plugin '${marker.pluginName}' via ` + + `fromPlugin(), but that plugin is not registered in createApp. ` + + `Available: ${available}.`, + ); + } + + const entries = resolveToolkitFromProvider( + marker.pluginName, + providerEntry.provider, + marker.opts, + ); + for (const [key, entry] of Object.entries(entries)) { + index.set(key, { + source: "toolkit", + pluginName: entry.pluginName, + localName: entry.localName, + def: { ...entry.def, name: key }, + }); + } + } + } + private async connectHostedTools( hostedTools: import("../../core/agent/tools/hosted-tools").HostedTool[], index: Map, ): Promise { - let host: string | undefined; - let authenticate: () => Promise>; - - try { - const { getWorkspaceClient } = await import("../../context"); - const wsClient = getWorkspaceClient(); - await wsClient.config.ensureResolved(); - host = wsClient.config.host; - authenticate = async () => { - const headers = new Headers(); - await wsClient.config.authenticate(headers); - return Object.fromEntries(headers.entries()); - }; - } catch { - host = process.env.DATABRICKS_HOST; - authenticate = async (): Promise> => { - const token = process.env.DATABRICKS_TOKEN; - return token ? 
{ Authorization: `Bearer ${token}` } : {}; - }; - } + const wsClient = await this.resolveWorkspaceClient(); + await wsClient.config.ensureResolved(); + const host = wsClient.config.host; if (!host) { logger.warn( - "No Databricks host available — skipping %d hosted tool(s)", + "No Databricks host available — skipping %d hosted tool(s). " + + "Set DATABRICKS_HOST or configure a profile in ~/.databrickscfg.", hostedTools.length, ); return; } + const authenticate = async (): Promise> => { + const headers = new Headers(); + await wsClient.config.authenticate(headers); + return Object.fromEntries(headers.entries()); + }; + if (!this.mcpClient) { const policy = buildMcpHostPolicy(this.config.mcp, host); this.mcpClient = new AppKitMcpClient(host, authenticate, policy); @@ -524,6 +567,23 @@ export class AgentsPlugin extends Plugin implements ToolProvider { } } + /** + * Return the ambient workspace client from {@link getWorkspaceClient} when + * `ServiceContext` is initialized (the normal `createApp` path). Fall back + * to a fresh `WorkspaceClient()` that walks the SDK's credential chain — + * `DATABRICKS_HOST` / `DATABRICKS_TOKEN`, `~/.databrickscfg` profiles, + * DAB auth, OAuth, metadata service — for test rigs and manual embeds + * that never ran through `createApp`. 
+ */ + private async resolveWorkspaceClient() { + try { + return getWorkspaceClient(); + } catch { + const { WorkspaceClient } = await import("@databricks/sdk-experimental"); + return new WorkspaceClient({}); + } + } + // ----------------- ToolProvider (no tools of our own) -------------------- getAgentTools(): AgentToolDefinition[] { @@ -719,112 +779,53 @@ export class AgentsPlugin extends Plugin implements ToolProvider { this.activeStreams.set(requestId, { controller: abortController, userId }); const tools = Array.from(registered.toolIndex.values()).map((e) => e.def); - const approvalPolicy = this.resolvedApprovalPolicy; const limits = this.resolvedLimits; const outboundEvents = new EventChannel(); const translator = new AgentEventTranslator(); - // Per-run tool-call budget (shared across the top-level adapter and any - // sub-agents it delegates to). Counted pre-dispatch so a prompt-injected + // Per-run tool-call budget shared across the top-level adapter and any + // sub-agents it delegates to. Counted pre-dispatch so a prompt-injected // agent cannot drain the budget silently via denied calls. - let toolCallsUsed = 0; - - const executeTool = async ( - name: string, - args: unknown, - ): Promise => { - if (toolCallsUsed >= limits.maxToolCalls) { - abortController.abort( - new Error( - `Tool-call budget exhausted (limit ${limits.maxToolCalls}).`, - ), - ); - throw new Error( - `Tool-call budget exhausted (limit ${limits.maxToolCalls}). 
Raise agents({ limits: { maxToolCalls } }) or review the agent's tool-selection logic.`, - ); - } - toolCallsUsed++; - - const entry = registered.toolIndex.get(name); - if (!entry) throw new Error(`Unknown tool: ${name}`); - - if ( - approvalPolicy.requireForDestructive && - entry.def.annotations?.destructive === true - ) { - const approvalId = randomUUID(); - for (const ev of translator.translate({ - type: "approval_pending", - approvalId, - streamId: requestId, - toolName: name, - args, - annotations: entry.def.annotations, - })) { - outboundEvents.push(ev); - } - const decision = await this.approvalGate.wait({ - approvalId, - streamId: requestId, - userId, - timeoutMs: approvalPolicy.timeoutMs, - }); - if (decision === "deny") { - return `Tool execution denied by user approval gate (tool: ${name}).`; - } - } - - let result: unknown; - if (entry.source === "toolkit") { - if (!this.context) { - throw new Error( - "Plugin tool execution requires PluginContext; this should never happen through createApp", - ); - } - result = await this.context.executeTool( + const budget: ToolBudget = { used: 0, limit: limits.maxToolCalls }; + + const executor = new HttpToolExecutor({ + toolIndex: registered.toolIndex, + approvalPolicy: this.resolvedApprovalPolicy, + approvalGate: this.approvalGate, + translator, + outboundEvents, + abortController, + budget, + req, + streamId: requestId, + userId, + pluginContext: this.context, + mcpClient: this.mcpClient, + runSubAgent: (agentName, subArgs, subSignal, forwardEvent, check) => { + const childAgent = this.agents.get(agentName); + if (!childAgent) throw new Error(`Sub-agent not found: ${agentName}`); + return this.runSubAgent( req, - entry.pluginName, - entry.localName, - args, - signal, - ); - } else if (entry.source === "function") { - result = await entry.functionTool.execute( - args as Record, - ); - } else if (entry.source === "mcp") { - if (!this.mcpClient) throw new Error("MCP client not connected"); - const oboToken = 
req.headers["x-forwarded-access-token"]; - const mcpAuth = - typeof oboToken === "string" - ? { Authorization: `Bearer ${oboToken}` } - : undefined; - result = await this.mcpClient.callTool( - entry.mcpToolName, - args, - mcpAuth, + childAgent, + subArgs, + subSignal, + 1, + forwardEvent, + check, ); - } else if (entry.source === "subagent") { - const childAgent = this.agents.get(entry.agentName); - if (!childAgent) - throw new Error(`Sub-agent not found: ${entry.agentName}`); - result = await this.runSubAgent(req, childAgent, args, signal, 1); - } + }, + }); - // A `void` / `undefined` return is a legitimate tool outcome (e.g., a - // "send notification" side-effecting tool). Return an empty string so - // the LLM sees a successful-but-empty result rather than a bogus - // "execution failed" error. - if (result === undefined) { - return ""; - } - const MAX = 50_000; - const serialized = - typeof result === "string" ? result : JSON.stringify(result); - if (serialized.length > MAX) { - return `${serialized.slice(0, MAX)}\n\n[Result truncated: ${serialized.length} chars exceeds ${MAX} limit]`; - } - return result; - }; + const runner = new AgentRunner({ + adapter: registered.adapter, + tools, + executeTool: executor, + signal, + onEvent: (event) => { + for (const translated of translator.translate(event)) { + outboundEvents.push(translated); + } + }, + }); // Drive the adapter and the approval-event side-channel concurrently. // Outbound events from both sources flow through `outboundEvents`; the @@ -864,36 +865,10 @@ export class AgentsPlugin extends Plugin implements ToolProvider { ...thread.messages, ]; - const stream = registered.adapter.run( - { - messages: messagesWithSystem, - tools, - threadId: thread.id, - signal, - }, - { executeTool, signal }, - ); - - // Accumulate assistant output from BOTH streaming and non-streaming - // adapters. 
Delta-based adapters (Databricks, Vercel AI) emit - // `message_delta` chunks that we concatenate; adapters that yield a - // single final assistant message (e.g. LangChain's `on_chain_end` - // path) emit a `message` event whose content replaces whatever - // deltas already arrived. Without the `message` branch, multi-turn - // LangChain conversations silently dropped the assistant turn from - // thread history. - let fullContent = ""; - for await (const event of stream) { - if (signal.aborted) break; - if (event.type === "message_delta") { - fullContent += event.content; - } else if (event.type === "message") { - fullContent = event.content; - } - for (const translated of translator.translate(event)) { - outboundEvents.push(translated); - } - } + const fullContent = await runner.run({ + messages: messagesWithSystem, + threadId: thread.id, + }); if (fullContent) { await this.threadStore.addMessage(thread.id, userId, { @@ -971,6 +946,23 @@ export class AgentsPlugin extends Plugin implements ToolProvider { args: unknown, signal: AbortSignal, depth: number, + /** + * Optional per-event sink installed by the parent `_streamAgent`. When + * supplied, each adapter event the child yields is passed through — + * the parent's closure forwards everything except `metadata` so the + * sub-agent's streaming text, tool invocations, and thinking blocks + * all surface to the client's SSE stream in real time. + */ + onEvent?: (event: AgentEvent) => void, + /** + * Optional approval gate injected by the parent `_streamAgent`. When + * present, sub-agent tool calls annotated `destructive: true` fire + * `appkit.approval_pending` through the parent's outbound channel and + * await the user's decision, exactly like the parent's own executeTool. + * Absent (or returning `null`) means no gate — non-destructive tools + * or approval disabled policy-wide. 
+ */ + checkApproval?: ApprovalCheck, ): Promise { const limits = this.resolvedLimits; if (depth > limits.maxSubAgentDepth) { @@ -988,39 +980,43 @@ export class AgentsPlugin extends Plugin implements ToolProvider { : JSON.stringify(args); const childTools = Array.from(child.toolIndex.values()).map((e) => e.def); + // Sub-agent dispatch reuses the parent's approval check so a destructive + // tool fires `approval_pending` on the parent's SSE stream. Sub-agents + // do not enforce their own budget — the parent already counted the + // `agent-` invocation. const childExecute = async ( name: string, childArgs: unknown, ): Promise => { const entry = child.toolIndex.get(name); if (!entry) throw new Error(`Unknown tool in sub-agent: ${name}`); - if (entry.source === "toolkit" && this.context) { - return this.context.executeTool( - req, - entry.pluginName, - entry.localName, - childArgs, - signal, - ); - } - if (entry.source === "function") { - return entry.functionTool.execute(childArgs as Record); - } - if (entry.source === "subagent") { - const grandchild = this.agents.get(entry.agentName); - if (!grandchild) - throw new Error(`Sub-agent not found: ${entry.agentName}`); - return this.runSubAgent(req, grandchild, childArgs, signal, depth + 1); - } - if (entry.source === "mcp" && this.mcpClient) { - const oboToken = req.headers["x-forwarded-access-token"]; - const mcpAuth = - typeof oboToken === "string" - ? 
{ Authorization: `Bearer ${oboToken}` } - : undefined; - return this.mcpClient.callTool(entry.mcpToolName, childArgs, mcpAuth); + + if (checkApproval) { + const decision = await checkApproval(entry, childArgs); + if (decision === "deny") { + return `Tool execution denied by user approval gate (tool: ${name}).`; + } } - throw new Error(`Unsupported sub-agent tool source: ${entry.source}`); + + return dispatchToolCall(entry, childArgs, { + req, + signal, + pluginContext: this.context, + mcpClient: this.mcpClient, + runSubAgent: (agentName, args) => { + const grandchild = this.agents.get(agentName); + if (!grandchild) throw new Error(`Sub-agent not found: ${agentName}`); + return this.runSubAgent( + req, + grandchild, + args, + signal, + depth + 1, + onEvent, + checkApproval, + ); + }, + }); }; const runContext: AgentRunContext = { executeTool: childExecute, signal }; @@ -1055,17 +1051,13 @@ export class AgentsPlugin extends Plugin implements ToolProvider { }, ]; - let output = ""; - const events: AgentEvent[] = []; - for await (const event of child.adapter.run( - { messages, tools: childTools, threadId: randomUUID(), signal }, - runContext, - )) { - events.push(event); - if (event.type === "message_delta") output += event.content; - else if (event.type === "message") output = event.content; - } - return output; + return consumeAdapterStream( + child.adapter.run( + { messages, tools: childTools, threadId: randomUUID(), signal }, + runContext, + ), + { signal, onEvent }, + ); } private async _handleCancel(req: express.Request, res: express.Response) { @@ -1171,22 +1163,6 @@ export class AgentsPlugin extends Plugin implements ToolProvider { return first.done ? 
null : first.value; } - private printRegistry(): void { - if (this.agents.size === 0) return; - console.log(""); - console.log(` ${pc.bold("Agents")} ${pc.dim(`(${this.agents.size})`)}`); - console.log(` ${pc.dim("─".repeat(60))}`); - for (const [name, reg] of this.agents) { - const tools = reg.toolIndex.size; - const marker = name === this.defaultAgentName ? pc.green("●") : " "; - console.log( - ` ${marker} ${pc.bold(name.padEnd(24))} ${pc.dim(`${tools} tools`)}`, - ); - } - console.log(` ${pc.dim("─".repeat(60))}`); - console.log(""); - } - async shutdown(): Promise { this.approvalGate.abortAll(); if (this.mcpClient) { @@ -1219,44 +1195,8 @@ export class AgentsPlugin extends Plugin implements ToolProvider { } } -function normalizeAutoInherit(value: AgentsPluginConfig["autoInheritTools"]): { - file: boolean; - code: boolean; -} { - // Default is opt-out for both origins. A markdown agent or code-defined - // agent with no declared `tools:` gets an empty tool index unless the - // developer explicitly flips `autoInheritTools` on. Even then, only tools - // whose plugin author marked `autoInheritable: true` are spread — see - // `applyAutoInherit` for the filter. - if (value === undefined) return { file: false, code: false }; - if (typeof value === "boolean") return { file: value, code: value }; - return { file: value.file ?? false, code: value.code ?? false }; -} - -function composePromptForAgent( - registered: RegisteredAgent, - pluginLevel: BaseSystemPromptOption | undefined, - ctx: PromptContext, -): string { - const perAgent = registered.baseSystemPrompt; - const resolved = perAgent !== undefined ? 
perAgent : pluginLevel; - - let base = ""; - if (resolved === false) { - base = ""; - } else if (typeof resolved === "string") { - base = resolved; - } else if (typeof resolved === "function") { - base = resolved(ctx); - } else { - base = buildBaseSystemPrompt(ctx); - } - - return composeSystemPrompt(base, registered.instructions); -} - /** - * Plugin factory for the agents plugin. Reads `config/agents/*.md` by default, + * Plugin factory for the agents plugin. Reads `config/agents//agent.md` by default, * resolves toolkits/tools from registered plugins, exposes `appkit.agents.*` * runtime API and mounts `/invocations`. * diff --git a/packages/appkit/src/plugins/agents/http-tool-executor.ts b/packages/appkit/src/plugins/agents/http-tool-executor.ts new file mode 100644 index 000000000..5217ab414 --- /dev/null +++ b/packages/appkit/src/plugins/agents/http-tool-executor.ts @@ -0,0 +1,243 @@ +import { randomUUID } from "node:crypto"; +import type express from "express"; +import type { AgentEvent, ResponseStreamEvent, ToolAnnotations } from "shared"; +import type { AppKitMcpClient } from "../../connectors/mcp"; +import { normalizeToolResult } from "../../core/agent/normalize-result"; +import type { ToolExecutor } from "../../core/agent/runner"; +import { dispatchToolCall } from "../../core/agent/tool-dispatch"; +import type { ResolvedToolEntry } from "../../core/agent/types"; +import type { PluginContext } from "../../core/plugin-context"; +import type { EventChannel } from "./event-channel"; +import type { AgentEventTranslator } from "./event-translator"; +import type { ToolApprovalGate } from "./tool-approval-gate"; + +/** + * Decision returned by the approval check. `null` means "no gate fires" + * (tool isn't gated, or policy disabled gating). `"approve"` / `"deny"` + * mirror the user's submission via `POST /approve`. 
+ */ +export type ApprovalDecision = "approve" | "deny" | null; + +/** + * Approval-check function reused by both the parent stream's executor and + * any sub-agent executors it spawns. Lifted to a callable so sub-agents + * can share the parent's translator + outboundEvents + approvalGate. + */ +export type ApprovalCheck = ( + entry: ResolvedToolEntry, + args: unknown, +) => Promise; + +/** + * Sub-agent runner injected by the plugin. Returns the sub-agent's + * concatenated text output to hand back to the parent adapter as the + * tool result. Hidden behind a callback so the executor doesn't need to + * import the plugin class (cycle). + */ +type RunSubAgentFn = ( + agentName: string, + args: unknown, + signal: AbortSignal, + forwardEvent: (e: AgentEvent) => void, + checkApproval: ApprovalCheck, +) => Promise; + +/** + * Mutable per-run tool-call budget. Shared by reference between the + * top-level executor and any sub-agent executors so `maxToolCalls` is + * enforced across the whole run, not per-agent. + */ +export interface ToolBudget { + used: number; + limit: number; +} + +interface HttpToolExecutorDeps { + toolIndex: Map; + /** Approval policy as resolved from `agents({ approval: ... })`. */ + approvalPolicy: { requireForDestructive: boolean; timeoutMs: number }; + approvalGate: ToolApprovalGate; + /** Translator used to emit `approval_pending` to the SSE stream. */ + translator: AgentEventTranslator; + /** Channel the SSE stream drains. Approval events are pushed here. */ + outboundEvents: EventChannel; + /** Aborted on budget exhaustion to unwind the adapter promptly. */ + abortController: AbortController; + /** + * Shared tool-call budget. Pass the same object to every executor in the + * run (top-level + sub-agents) so the cap is global. Pass `null` for + * sub-agent executors that should not count against the budget — only + * the parent enforces, mirroring the original closure's behaviour. 
+ */ + budget: ToolBudget | null; + /** OBO source: forwarded to dispatchToolCall for plugin-tool dispatch. */ + req: express.Request; + /** SSE stream id (used for approval gate scoping + telemetry). */ + streamId: string; + /** Authenticated user id, scoped per-stream by `_handleApprove`. */ + userId: string; + /** PluginContext for OBO tool dispatch. May be undefined in tests. */ + pluginContext: PluginContext | undefined; + /** MCP client for hosted-tool dispatch. May be null pre-connect. */ + mcpClient: AppKitMcpClient | null; + /** Plugin-supplied factory that runs a sub-agent. */ + runSubAgent: RunSubAgentFn; +} + +/** + * HTTP-path tool executor for the streaming chat surface. + * + * Wraps the same logic that used to live as a closure inside + * `_streamAgent`: per-run budget, the approval gate, OBO dispatch via + * {@link dispatchToolCall}, sub-agent recursion, and event forwarding. + * + * Sub-agents share the parent's `translator`, `outboundEvents`, + * `approvalGate`, and `abortController` (so a sub-agent's destructive + * tool surfaces an `approval_pending` event on the parent's SSE stream + * and a sub-agent's budget exhaustion aborts the whole run). The + * `budget` is null for sub-agents so they don't double-count against + * the top-level cap — the parent already incremented when it dispatched + * the `agent-` call. + */ +export class HttpToolExecutor implements ToolExecutor { + constructor(private deps: HttpToolExecutorDeps) {} + + async execute( + name: string, + args: unknown, + signal: AbortSignal, + ): Promise { + const { budget, abortController } = this.deps; + + if (budget) { + if (budget.used >= budget.limit) { + abortController.abort( + new Error(`Tool-call budget exhausted (limit ${budget.limit}).`), + ); + throw new Error( + `Tool-call budget exhausted (limit ${budget.limit}). 
` + + "Raise agents({ limits: { maxToolCalls } }) or review the agent's tool-selection logic.", + ); + } + budget.used++; + } + + const entry = this.deps.toolIndex.get(name); + if (!entry) throw new Error(`Unknown tool: ${name}`); + + const decision = await this.checkApproval(entry, args); + if (decision === "deny") { + return `Tool execution denied by user approval gate (tool: ${name}).`; + } + + // Forward events from nested sub-agents into the parent's outbound SSE + // stream so the client sees inner tool calls AND the sub-agent's + // streaming text as it's generated. Without this the user stares at + // "thinking…" for the full duration of the sub-agent run. + // + // The one exception is `metadata`: sub-agents have their own threadId, + // and forwarding it would overwrite the parent's thread state on the + // client and break multi-turn continuity. + // + // `approval_pending` is not emitted by adapters directly — it comes + // through `checkApproval()` which already pushes to the parent's + // outboundEvents — so sub-agent destructive approvals surface + // independently of this forwarder. + const forwardSubAgentEvent = (ev: AgentEvent): void => { + if (ev.type === "metadata") return; + for (const translated of this.deps.translator.translate(ev)) { + this.deps.outboundEvents.push(translated); + } + }; + + const raw = await dispatchToolCall(entry, args, { + req: this.deps.req, + signal, + pluginContext: this.deps.pluginContext, + mcpClient: this.deps.mcpClient, + runSubAgent: (agentName, subArgs) => + this.deps.runSubAgent( + agentName, + subArgs, + signal, + forwardSubAgentEvent, + this.checkApproval, + ), + }); + return normalizeToolResult(raw); + } + + /** + * Approval gate hook. Bound as an arrow so sub-agent executors can pass + * it through to {@link RunSubAgentFn} and the gate fires using the + * parent's translator + outboundEvents + approvalGate. Public so tests + * can drive it directly. 
+ */ + readonly checkApproval: ApprovalCheck = async (entry, args) => { + const { + approvalPolicy, + approvalGate, + translator, + outboundEvents, + streamId, + userId, + } = this.deps; + if (!approvalPolicy.requireForDestructive) return null; + if (!isDestructiveToolEntry(entry)) return null; + const approvalId = randomUUID(); + for (const ev of translator.translate({ + type: "approval_pending", + approvalId, + streamId, + toolName: entry.def.name, + args, + annotations: combinedToolAnnotations(entry), + })) { + outboundEvents.push(ev); + } + return approvalGate.wait({ + approvalId, + streamId, + userId, + timeoutMs: approvalPolicy.timeoutMs, + }); + }; +} + +/** + * True when the tool should go through the approval gate. Historically + * scoped to `destructive: true` — hence the name — but now also fires for + * the semantic `effect` enum on {@link ToolAnnotations}. Any effect that + * mutates the world (`write` | `update` | `destructive`) gates; `read` and + * unannotated tools do not. `def.annotations` is the normal path; for + * `function` tools we also read `functionTool.annotations` so a mismatch + * between the spread def and the original {@link FunctionTool} cannot drop + * the hint. + */ +function isDestructiveToolEntry(entry: ResolvedToolEntry): boolean { + const defAnn = entry.def.annotations; + const fnAnn = + entry.source === "function" ? entry.functionTool.annotations : undefined; + + const effect = defAnn?.effect ?? fnAnn?.effect; + if (effect === "write" || effect === "update" || effect === "destructive") { + return true; + } + if (defAnn?.destructive === true) return true; + if (fnAnn?.destructive === true) return true; + return false; +} + +/** Merged annotations for the approval SSE payload (client UI + debugging). 
*/ +function combinedToolAnnotations( + entry: ResolvedToolEntry, +): ToolAnnotations | undefined { + if (entry.source === "function") { + const merged: ToolAnnotations = { + ...entry.functionTool.annotations, + ...entry.def.annotations, + }; + return Object.keys(merged).length > 0 ? merged : undefined; + } + return entry.def.annotations; +} diff --git a/packages/appkit/src/plugins/agents/index.ts b/packages/appkit/src/plugins/agents/index.ts index c8a31aac3..f630cc681 100644 --- a/packages/appkit/src/plugins/agents/index.ts +++ b/packages/appkit/src/plugins/agents/index.ts @@ -1,5 +1,14 @@ -export { AgentsPlugin, agents } from "./agents"; +// Re-exports of agent primitives that now live in core/agent/. Kept here so +// the public package barrel (`@databricks/appkit`) and any callers that +// already imported via `./plugins/agents` continue to resolve unchanged. export { buildToolkitEntries } from "../../core/agent/build-toolkit"; +export { + FROM_PLUGIN_MARKER, + type FromPluginMarker, + type FromPluginSpread, + fromPlugin, + isFromPluginMarker, +} from "../../core/agent/from-plugin"; export { agentIdFromMarkdownPath, type LoadContext, @@ -12,6 +21,7 @@ export { type AgentDefinition, type AgentsPluginConfig, type AgentTool, + type AgentTools, type AutoInheritToolsConfig, type BaseSystemPromptOption, isToolkitEntry, @@ -21,3 +31,4 @@ export { type ToolkitEntry, type ToolkitOptions, } from "../../core/agent/types"; +export { AgentsPlugin, agents } from "./agents"; diff --git a/packages/appkit/src/plugins/agents/manifest.json b/packages/appkit/src/plugins/agents/manifest.json index 4571031a8..01e843a85 100644 --- a/packages/appkit/src/plugins/agents/manifest.json +++ b/packages/appkit/src/plugins/agents/manifest.json @@ -11,12 +11,12 @@ "type": "serving_endpoint", "alias": "Model Serving (agents)", "resourceKey": "agents-serving-endpoint", - "description": "Databricks Model Serving endpoint for agents using workspace-hosted models 
(`DatabricksAdapter.fromModelServing`). Wire the same endpoint name AppKit reads from `DATABRICKS_AGENT_ENDPOINT` when no per-agent model is configured. Omit when agents use only external adapters.", + "description": "Databricks Model Serving endpoint for agents using workspace-hosted models (`DatabricksAdapter.fromModelServing`). Wire the same endpoint name AppKit reads from `DATABRICKS_SERVING_ENDPOINT_NAME` when no per-agent model is configured. The same env var the `serving` plugin reads — one value covers both. Omit when agents use only external adapters.", "permission": "CAN_QUERY", "fields": { "name": { - "env": "DATABRICKS_AGENT_ENDPOINT", - "description": "Endpoint name passed to Model Serving when agents default to `DatabricksAdapter.fromModelServing()`" + "env": "DATABRICKS_SERVING_ENDPOINT_NAME", + "description": "Endpoint name passed to Model Serving when agents default to `DatabricksAdapter.fromModelServing()`. Shared with the `serving` plugin." } } } diff --git a/packages/appkit/src/plugins/agents/prompt.ts b/packages/appkit/src/plugins/agents/prompt.ts new file mode 100644 index 000000000..d38c6e645 --- /dev/null +++ b/packages/appkit/src/plugins/agents/prompt.ts @@ -0,0 +1,57 @@ +import { + buildBaseSystemPrompt, + composeSystemPrompt, +} from "../../core/agent/system-prompt"; +import type { + AgentsPluginConfig, + BaseSystemPromptOption, + PromptContext, + RegisteredAgent, +} from "../../core/agent/types"; + +/** + * Resolves the per-agent and plugin-level base prompt options into the + * final system prompt sent to the adapter. Per-agent setting wins over + * plugin-level; `false` opts out entirely; functions receive the same + * `PromptContext` that the default builder uses. + */ +export function composePromptForAgent( + registered: RegisteredAgent, + pluginLevel: BaseSystemPromptOption | undefined, + ctx: PromptContext, +): string { + const perAgent = registered.baseSystemPrompt; + const resolved = perAgent !== undefined ? 
perAgent : pluginLevel; + + let base = ""; + if (resolved === false) { + base = ""; + } else if (typeof resolved === "string") { + base = resolved; + } else if (typeof resolved === "function") { + base = resolved(ctx); + } else { + base = buildBaseSystemPrompt(ctx); + } + + return composeSystemPrompt(base, registered.instructions); +} + +/** + * Resolves the plugin-level `autoInheritTools` config into a per-origin + * decision. Default is opt-out for both origins. A markdown agent or + * code-defined agent with no declared `tools:` gets an empty tool index + * unless the developer explicitly flips `autoInheritTools` on. Even then, + * only tools whose plugin author marked `autoInheritable: true` are + * spread — see `applyAutoInherit` for the filter. + */ +export function normalizeAutoInherit( + value: AgentsPluginConfig["autoInheritTools"], +): { + file: boolean; + code: boolean; +} { + if (value === undefined) return { file: false, code: false }; + if (typeof value === "boolean") return { file: value, code: value }; + return { file: value.file ?? false, code: value.code ?? false }; +} diff --git a/packages/appkit/src/plugins/agents/registry-printer.ts b/packages/appkit/src/plugins/agents/registry-printer.ts new file mode 100644 index 000000000..9231ee077 --- /dev/null +++ b/packages/appkit/src/plugins/agents/registry-printer.ts @@ -0,0 +1,25 @@ +import pc from "picocolors"; +import type { RegisteredAgent } from "../../core/agent/types"; + +/** + * Pretty-prints the registered agent set during plugin setup. Decorative — + * no behaviour change if it's skipped (e.g., from tests). + */ +export function printRegistry( + agents: Map, + defaultAgentName: string | null, +): void { + if (agents.size === 0) return; + console.log(""); + console.log(` ${pc.bold("Agents")} ${pc.dim(`(${agents.size})`)}`); + console.log(` ${pc.dim("─".repeat(60))}`); + for (const [name, reg] of agents) { + const tools = reg.toolIndex.size; + const marker = name === defaultAgentName ? 
pc.green("●") : " "; + console.log( + ` ${marker} ${pc.bold(name.padEnd(24))} ${pc.dim(`${tools} tools`)}`, + ); + } + console.log(` ${pc.dim("─".repeat(60))}`); + console.log(""); +} diff --git a/packages/appkit/src/plugins/agents/tests/agents-plugin.test.ts b/packages/appkit/src/plugins/agents/tests/agents-plugin.test.ts index 9c043d56f..42cb6b127 100644 --- a/packages/appkit/src/plugins/agents/tests/agents-plugin.test.ts +++ b/packages/appkit/src/plugins/agents/tests/agents-plugin.test.ts @@ -11,12 +11,26 @@ import type { import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; import { z } from "zod"; import { CacheManager } from "../../../cache"; -// Import the class directly so we can construct it without a createApp -import { AgentsPlugin } from "../agents"; import { buildToolkitEntries } from "../../../core/agent/build-toolkit"; -import { defineTool, type ToolRegistry } from "../../../core/agent/tools/define-tool"; -import type { AgentsPluginConfig, ToolkitEntry } from "../../../core/agent/types"; +import { fromPlugin } from "../../../core/agent/from-plugin"; +import { + defineTool, + type ToolRegistry, +} from "../../../core/agent/tools/define-tool"; +import { tool } from "../../../core/agent/tools/tool"; +import type { + AgentsPluginConfig, + ToolkitEntry, +} from "../../../core/agent/types"; import { isToolkitEntry } from "../../../core/agent/types"; +// Import the class directly so we can construct it without a createApp +import { AgentsPlugin } from "../agents"; + +function namedFactory(name: string) { + const f = () => ({ name }); + Object.defineProperty(f, "pluginName", { value: name, enumerable: true }); + return f as typeof f & { readonly pluginName: string }; +} interface FakeContext { providers: Array<{ name: string; provider: ToolProvider }>; @@ -364,4 +378,202 @@ describe("AgentsPlugin", () => { expect(isToolkitEntry({ foo: 1 })).toBe(false); expect(isToolkitEntry(null)).toBe(false); }); + + describe("fromPlugin markers", () 
=> { + test("spreading fromPlugin registers all tools from the referenced plugin", async () => { + const registry: ToolRegistry = { + query: defineTool({ + description: "q", + schema: z.object({ sql: z.string() }), + handler: () => "ok", + }), + }; + const ctx = fakeContext([ + { + name: "analytics", + provider: makeToolProvider("analytics", registry), + }, + ]); + + const plugin = instantiate( + { + dir: false, + agents: { + support: { + instructions: "...", + model: stubAdapter(), + tools: { ...fromPlugin(namedFactory("analytics")) }, + }, + }, + }, + ctx, + ); + await plugin.setup(); + + const api = plugin.exports() as { + get: (name: string) => { toolIndex: Map } | null; + }; + const agent = api.get("support"); + expect(agent?.toolIndex.has("analytics.query")).toBe(true); + }); + + test("mixed inline + fromPlugin tools coexist", async () => { + const registry: ToolRegistry = { + query: defineTool({ + description: "q", + schema: z.object({ sql: z.string() }), + handler: () => "ok", + }), + }; + const ctx = fakeContext([ + { + name: "analytics", + provider: makeToolProvider("analytics", registry), + }, + ]); + + const plugin = instantiate( + { + dir: false, + agents: { + support: { + instructions: "...", + model: stubAdapter(), + tools: { + ...fromPlugin(namedFactory("analytics")), + get_weather: tool({ + name: "get_weather", + description: "Weather", + schema: z.object({ city: z.string() }), + execute: async ({ city }) => `Sunny in ${city}`, + }), + }, + }, + }, + }, + ctx, + ); + await plugin.setup(); + + const api = plugin.exports() as { + get: (name: string) => { toolIndex: Map } | null; + }; + const agent = api.get("support"); + expect(agent?.toolIndex.has("analytics.query")).toBe(true); + expect(agent?.toolIndex.has("get_weather")).toBe(true); + }); + + test("missing plugin throws at setup with Available: listing", async () => { + const ctx = fakeContext([ + { + name: "files", + provider: makeToolProvider("files", {}), + }, + ]); + + const plugin = 
instantiate( + { + dir: false, + agents: { + support: { + instructions: "...", + model: stubAdapter(), + tools: { ...fromPlugin(namedFactory("analytics")) }, + }, + }, + }, + ctx, + ); + await expect(plugin.setup()).rejects.toThrow(/analytics/); + await expect(plugin.setup()).rejects.toThrow(/Available:/); + await expect(plugin.setup()).rejects.toThrow(/files/); + }); + + test("symbol-only tools record disables auto-inherit", async () => { + const analyticsReg: ToolRegistry = { + query: defineTool({ + description: "q", + schema: z.object({ sql: z.string() }), + handler: () => "ok", + }), + }; + const filesReg: ToolRegistry = { + list: defineTool({ + description: "l", + schema: z.object({}), + handler: () => [], + }), + }; + const ctx = fakeContext([ + { + name: "analytics", + provider: makeToolProvider("analytics", analyticsReg), + }, + { + name: "files", + provider: makeToolProvider("files", filesReg), + }, + ]); + + const plugin = instantiate( + { + dir: false, + autoInheritTools: { code: true }, + agents: { + support: { + instructions: "...", + model: stubAdapter(), + tools: { ...fromPlugin(namedFactory("analytics")) }, + }, + }, + }, + ctx, + ); + await plugin.setup(); + + const api = plugin.exports() as { + get: (name: string) => { toolIndex: Map } | null; + }; + const agent = api.get("support"); + const toolNames = Array.from(agent?.toolIndex.keys() ?? []); + expect(toolNames.some((n) => n.startsWith("analytics."))).toBe(true); + expect(toolNames.some((n) => n.startsWith("files."))).toBe(false); + }); + + test("falls back to getAgentTools() for providers without toolkit()", async () => { + // Provider lacks .toolkit() — only getAgentTools/executeAgentTool. 
+ const bareProvider: ToolProvider = { + getAgentTools: () => [ + { + name: "ping", + description: "ping", + parameters: { type: "object", properties: {} }, + }, + ], + executeAgentTool: vi.fn(async () => "pong"), + }; + const ctx = fakeContext([{ name: "bare", provider: bareProvider }]); + + const plugin = instantiate( + { + dir: false, + agents: { + support: { + instructions: "...", + model: stubAdapter(), + tools: { ...fromPlugin(namedFactory("bare")) }, + }, + }, + }, + ctx, + ); + await plugin.setup(); + + const api = plugin.exports() as { + get: (name: string) => { toolIndex: Map } | null; + }; + const agent = api.get("support"); + expect(agent?.toolIndex.has("bare.ping")).toBe(true); + }); + }); }); diff --git a/packages/appkit/src/plugins/agents/tests/http-tool-executor.test.ts b/packages/appkit/src/plugins/agents/tests/http-tool-executor.test.ts new file mode 100644 index 000000000..90b6d0ff4 --- /dev/null +++ b/packages/appkit/src/plugins/agents/tests/http-tool-executor.test.ts @@ -0,0 +1,357 @@ +import type express from "express"; +import type { ResponseStreamEvent } from "shared"; +import { describe, expect, test, vi } from "vitest"; +import type { ResolvedToolEntry } from "../../../core/agent/types"; +import { EventChannel } from "../event-channel"; +import { AgentEventTranslator } from "../event-translator"; +import { HttpToolExecutor, type ToolBudget } from "../http-tool-executor"; +import { ToolApprovalGate } from "../tool-approval-gate"; + +/** + * Focused tests for HttpToolExecutor — particularly the sub-agent approval + * forwarding path. 
The runner-level abstraction makes it tractable to drive + * the executor without spinning up a full HTTP stream: + * + * - Per-run budget gating (top-level enforces, sub-agents skip) + * - approval_pending emission to the parent's outbound channel + * - approve / deny decision flow + * - Sub-agent dispatch reuses the parent's checkApproval (the bit that + * used to be a private nested closure inside `_streamAgent` and was + * hard to test pre-refactor) + */ + +function functionEntry( + name: string, + opts?: { effect?: "write" | "destructive" }, +) { + const ann = opts?.effect ? { effect: opts.effect } : undefined; + return { + source: "function", + def: { + name, + description: `${name} tool`, + parameters: { type: "object", properties: {} }, + annotations: ann, + }, + functionTool: { + name, + description: `${name} tool`, + schema: { type: "object", properties: {} }, + annotations: ann, + execute: vi.fn(async () => `${name}-result`), + }, + } as unknown as ResolvedToolEntry; +} + +function subAgentEntry(name: string) { + return { + source: "subagent", + agentName: name, + def: { + name: `agent-${name}`, + description: `Delegate to ${name}`, + parameters: { type: "object", properties: {} }, + }, + } as unknown as ResolvedToolEntry; +} + +function mockReq(): express.Request { + return { + headers: { "x-forwarded-user": "alice" }, + header: () => "alice", + } as unknown as express.Request; +} + +function fixture(opts?: { + budget?: ToolBudget | null; + requireForDestructive?: boolean; + toolIndex?: Map; + runSubAgent?: HttpToolExecutorDepsRunSubAgent; +}) { + const outboundEvents = new EventChannel(); + const translator = new AgentEventTranslator(); + const approvalGate = new ToolApprovalGate(); + const abortController = new AbortController(); + const toolIndex = + opts?.toolIndex ?? 
+ new Map([ + ["safe", functionEntry("safe")], + ["risky", functionEntry("risky", { effect: "write" })], + ]); + + const executor = new HttpToolExecutor({ + toolIndex, + approvalPolicy: { + requireForDestructive: opts?.requireForDestructive ?? true, + timeoutMs: 5_000, + }, + approvalGate, + translator, + outboundEvents, + abortController, + budget: opts?.budget === undefined ? { used: 0, limit: 50 } : opts.budget, + req: mockReq(), + streamId: "stream-1", + userId: "alice", + pluginContext: undefined, + mcpClient: null, + runSubAgent: + opts?.runSubAgent ?? + ((_n, _a, _s, _f, _c) => Promise.resolve("(no sub-agent)")), + }); + + return { + executor, + outboundEvents, + approvalGate, + abortController, + toolIndex, + drainEvents: async () => { + const events: ResponseStreamEvent[] = []; + // Cap reads so a hang here surfaces as a test timeout, not a leak. + for (let i = 0; i < 100; i++) { + const next = await Promise.race([ + (async () => { + for await (const ev of outboundEvents) return ev; + return null; + })(), + new Promise((r) => setTimeout(() => r(null), 10)), + ]); + if (!next) break; + events.push(next); + } + return events; + }, + }; +} + +type HttpToolExecutorDepsRunSubAgent = ConstructorParameters< + typeof HttpToolExecutor +>[0]["runSubAgent"]; + +describe("HttpToolExecutor", () => { + describe("budget", () => { + test("rejects + aborts when top-level budget is exhausted", async () => { + const { executor, abortController } = fixture({ + budget: { used: 50, limit: 50 }, + }); + + await expect( + executor.execute("safe", {}, abortController.signal), + ).rejects.toThrow(/Tool-call budget exhausted/); + + expect(abortController.signal.aborted).toBe(true); + }); + + test("budget=null skips counting (sub-agent semantics)", async () => { + const { executor, abortController } = fixture({ budget: null }); + + const r1 = await executor.execute("safe", {}, abortController.signal); + const r2 = await executor.execute("safe", {}, abortController.signal); + 
expect(r1).toBe("safe-result"); + expect(r2).toBe("safe-result"); + expect(abortController.signal.aborted).toBe(false); + }); + }); + + describe("approval gate", () => { + test("non-destructive tools bypass the gate", async () => { + const { executor, abortController, outboundEvents } = fixture(); + + const result = await executor.execute("safe", {}, abortController.signal); + + expect(result).toBe("safe-result"); + // Drain — there should be no approval_pending event in the channel. + outboundEvents.close(); + const events: ResponseStreamEvent[] = []; + for await (const ev of outboundEvents) events.push(ev); + const approvals = events.filter( + (e) => e.type === "appkit.approval_pending", + ); + expect(approvals).toEqual([]); + }); + + test("write-effect tool emits approval_pending and waits for decision", async () => { + const { executor, abortController, approvalGate, outboundEvents } = + fixture(); + + const promise = executor.execute("risky", {}, abortController.signal); + + // The executor pushes approval_pending synchronously and then awaits + // the gate. Settle the gate by reading the approvalId from the SSE + // payload — this mirrors what `POST /approve` does in production. 
+ const approvalId = await readApprovalId(outboundEvents); + expect(approvalId).toBeDefined(); + + approvalGate.submit({ + approvalId, + userId: "alice", + decision: "approve", + }); + + const result = await promise; + expect(result).toBe("risky-result"); + }); + + test("denied destructive tool returns a deny string instead of dispatching", async () => { + const { executor, abortController, approvalGate, outboundEvents } = + fixture(); + + const promise = executor.execute("risky", {}, abortController.signal); + + const approvalId = await readApprovalId(outboundEvents); + + approvalGate.submit({ + approvalId, + userId: "alice", + decision: "deny", + }); + + const result = await promise; + expect(result).toMatch(/denied by user approval gate/); + }); + + test("requireForDestructive=false short-circuits the gate even on write tools", async () => { + const { executor, abortController, outboundEvents } = fixture({ + requireForDestructive: false, + }); + + const result = await executor.execute( + "risky", + {}, + abortController.signal, + ); + expect(result).toBe("risky-result"); + + outboundEvents.close(); + const seen: ResponseStreamEvent[] = []; + for await (const ev of outboundEvents) seen.push(ev); + expect(seen.some((e) => e.type === "appkit.approval_pending")).toBe( + false, + ); + }); + }); + + describe("sub-agent approval forwarding", () => { + test("destructive tool inside a sub-agent surfaces approval_pending on the parent's stream", async () => { + // This is the bit that used to be a private nested closure inside + // `_streamAgent` and was effectively untestable pre-refactor: the + // parent's `checkApproval` is passed *into* the sub-agent's runner + // so the SSE payload lands on the parent's outbound channel. + const childIndex = new Map([ + ["destroy", functionEntry("destroy", { effect: "destructive" })], + ]); + + const parentIndex = new Map([ + ["agent-worker", subAgentEntry("worker")], + ]); + + // Spy on what the runSubAgent factory receives. 
+ const runSubAgentSpy = vi.fn( + async (_name, _args, signal, _forwardEvent, checkApproval) => { + // Sub-agent invokes its destructive tool through the parent's + // approval check, exactly as `runSubAgent.childExecute` does. + const childEntry = childIndex.get("destroy"); + if (!childEntry) throw new Error("destroy missing from child index"); + const decision = await checkApproval(childEntry, { x: 1 }); + if (decision === "deny") return "denied"; + if (signal.aborted) throw new Error("aborted"); + return "destroyed"; + }, + ); + + const { executor, approvalGate, outboundEvents, abortController } = + fixture({ + toolIndex: parentIndex, + runSubAgent: runSubAgentSpy, + }); + + const promise = executor.execute( + "agent-worker", + { input: "do it" }, + abortController.signal, + ); + + const { approvalId, toolName } = + await readApprovalDetails(outboundEvents); + + expect(toolName, "approval_pending must surface on parent stream").toBe( + "destroy", + ); + + approvalGate.submit({ + approvalId, + userId: "alice", + decision: "approve", + }); + + await expect(promise).resolves.toBe("destroyed"); + expect(runSubAgentSpy).toHaveBeenCalledTimes(1); + }); + + test("denied sub-agent tool yields a deny string handled inside the sub-agent", async () => { + const childIndex = new Map([ + ["destroy", functionEntry("destroy", { effect: "destructive" })], + ]); + const parentIndex = new Map([ + ["agent-worker", subAgentEntry("worker")], + ]); + + const runSubAgentSpy = vi.fn( + async (_name, _args, _signal, _forward, checkApproval) => { + const childEntry = childIndex.get("destroy"); + if (!childEntry) throw new Error("destroy missing from child index"); + const decision = await checkApproval(childEntry, {}); + if (decision === "deny") return "child-saw-deny"; + return "should-not-happen"; + }, + ); + + const { executor, approvalGate, outboundEvents, abortController } = + fixture({ + toolIndex: parentIndex, + runSubAgent: runSubAgentSpy, + }); + + const promise = 
executor.execute( + "agent-worker", + { input: "x" }, + abortController.signal, + ); + + const approvalId = await readApprovalId(outboundEvents); + + approvalGate.submit({ + approvalId, + userId: "alice", + decision: "deny", + }); + + await expect(promise).resolves.toBe("child-saw-deny"); + }); + }); +}); + +interface ApprovalEvent { + type: "appkit.approval_pending"; + approval_id: string; + tool_name: string; +} + +async function readApprovalDetails( + channel: EventChannel, +): Promise<{ approvalId: string; toolName: string }> { + for await (const ev of channel) { + if (ev.type === "appkit.approval_pending") { + const a = ev as unknown as ApprovalEvent; + return { approvalId: a.approval_id, toolName: a.tool_name }; + } + } + throw new Error("Channel closed before approval_pending arrived"); +} + +async function readApprovalId( + channel: EventChannel, +): Promise { + return (await readApprovalDetails(channel)).approvalId; +} diff --git a/template/appkit.plugins.json b/template/appkit.plugins.json index 2f22a50c1..87e4304ac 100644 --- a/template/appkit.plugins.json +++ b/template/appkit.plugins.json @@ -14,12 +14,12 @@ "type": "serving_endpoint", "alias": "Model Serving (agents)", "resourceKey": "agents-serving-endpoint", - "description": "Databricks Model Serving endpoint for agents using workspace-hosted models (`DatabricksAdapter.fromModelServing`). Wire the same endpoint name AppKit reads from `DATABRICKS_AGENT_ENDPOINT` when no per-agent model is configured. Omit when agents use only external adapters.", + "description": "Databricks Model Serving endpoint for agents using workspace-hosted models (`DatabricksAdapter.fromModelServing`). Wire the same endpoint name AppKit reads from `DATABRICKS_SERVING_ENDPOINT_NAME` when no per-agent model is configured. The same env var the `serving` plugin reads — one value covers both. 
Omit when agents use only external adapters.", "permission": "CAN_QUERY", "fields": { "name": { - "env": "DATABRICKS_AGENT_ENDPOINT", - "description": "Endpoint name passed to Model Serving when agents default to `DatabricksAdapter.fromModelServing()`" + "env": "DATABRICKS_SERVING_ENDPOINT_NAME", + "description": "Endpoint name passed to Model Serving when agents default to `DatabricksAdapter.fromModelServing()`. Shared with the `serving` plugin." } } }