From 0cb26473efd0c94968776789e40d7fbad5adb5df Mon Sep 17 00:00:00 2001 From: Lars Grammel Date: Tue, 4 Mar 2025 15:48:09 +0100 Subject: [PATCH] feat (ai/core): add streamText sendStart & sendFinish data stream options (#5047) Co-authored-by: Nico Albanese <49612682+nicoalbanese@users.noreply.github.com> --- .changeset/proud-cougars-suffer.md | 5 + .../01-next/24-stream-text-multistep.mdx | 105 ++++++++++++++++++ .../01-ai-sdk-core/02-stream-text.mdx | 56 ++++++++++ .../use-chat-streamdata-multistep/route.ts | 48 ++++++++ .../use-chat-streamdata-multistep/page.tsx | 61 ++++++++++ .../__snapshots__/stream-text.test.ts.snap | 22 ++++ .../core/generate-text/stream-text-result.ts | 23 +++- .../ai/core/generate-text/stream-text.test.ts | 52 +++++++++ packages/ai/core/generate-text/stream-text.ts | 32 ++++-- packages/ui-utils/src/data-stream-parts.ts | 1 + 10 files changed, 393 insertions(+), 12 deletions(-) create mode 100644 .changeset/proud-cougars-suffer.md create mode 100644 content/cookbook/01-next/24-stream-text-multistep.mdx create mode 100644 examples/next-openai/app/api/use-chat-streamdata-multistep/route.ts create mode 100644 examples/next-openai/app/use-chat-streamdata-multistep/page.tsx diff --git a/.changeset/proud-cougars-suffer.md b/.changeset/proud-cougars-suffer.md new file mode 100644 index 000000000000..6fa86b22c137 --- /dev/null +++ b/.changeset/proud-cougars-suffer.md @@ -0,0 +1,5 @@ +--- +'ai': patch +--- + +feat (ai/core): add streamText sendStart & sendFinish data stream options diff --git a/content/cookbook/01-next/24-stream-text-multistep.mdx b/content/cookbook/01-next/24-stream-text-multistep.mdx new file mode 100644 index 000000000000..fec93204aa78 --- /dev/null +++ b/content/cookbook/01-next/24-stream-text-multistep.mdx @@ -0,0 +1,105 @@ +--- +title: streamText Multi-Step Cookbook +description: Learn how to create several streamText steps with different settings +tags: ['next', 'streaming'] +--- + +# Stream Text Multi-Step + +You may want to have different steps in your stream where each step has different settings, +e.g. models, tools, or system prompts. + +With `createDataStreamResponse` and `sendFinish` / `sendStart` options when merging +into the data stream, you can control when the finish and start events are sent to the client, +allowing you to have different steps in a single assistant UI message. + +## Server + +```typescript filename='app/api/chat/route.ts' +import { openai } from '@ai-sdk/openai'; +import { createDataStreamResponse, streamText, tool } from 'ai'; +import { z } from 'zod'; + +export async function POST(req: Request) { + const { messages } = await req.json(); + + return createDataStreamResponse({ + execute: async dataStream => { + // step 1 example: forced tool call + const result1 = streamText({ + model: openai('gpt-4o-mini', { structuredOutputs: true }), + system: 'Extract the user goal from the conversation.', + messages, + toolChoice: 'required', // force the model to call a tool + tools: { + extractGoal: tool({ + parameters: z.object({ goal: z.string() }), + execute: async ({ goal }) => goal, // no-op extract tool + }), + }, + }); + + // forward the initial result to the client without the finish event: + result1.mergeIntoDataStream(dataStream, { + experimental_sendFinish: false, // omit the finish event + }); + + // note: you can use any programming construct here, e.g. if-else, loops, etc. + // workflow programming is normal programming with this approach. 
+
+      // example: continue stream with forced tool call from previous step
+      const result2 = streamText({
+        // different system prompt, different model, no tools:
+        model: openai('gpt-4o'),
+        system:
+          'You are a helpful assistant with a different system prompt. Repeat the extract user goal in your answer.',
+        // continue the workflow stream with the messages from the previous step:
+        messages: [...messages, ...(await result1.response).messages],
+      });
+
+      // forward the 2nd result to the client (incl. the finish event):
+      result2.mergeIntoDataStream(dataStream, {
+        experimental_sendStart: false, // omit the start event
+      });
+    },
+  });
+}
+```
+
+## Client
+
+```tsx filename="app/page.tsx"
+'use client';
+
+import { useChat } from '@ai-sdk/react';
+
+export default function Chat() {
+  const { messages, input, handleInputChange, handleSubmit } = useChat();
+
+  return (
+    <div>
+      {messages?.map(message => (
+        <div key={message.id}>
+          {`${message.role}: `}
+          {message.parts.map((part, index) => {
+            switch (part.type) {
+              case 'text':
+                return <span key={index}>{part.text}</span>;
+              case 'tool-invocation': {
+                return (
+                  <pre key={index}>
+                    {JSON.stringify(part.toolInvocation, null, 2)}
+                  </pre>
+                );
+              }
+            }
+          })}
+        </div>
+      ))}
+      <form onSubmit={handleSubmit}>
+        <input value={input} onChange={handleInputChange} />
+      </form>
+    </div>
+ ); +} +``` diff --git a/content/docs/07-reference/01-ai-sdk-core/02-stream-text.mdx b/content/docs/07-reference/01-ai-sdk-core/02-stream-text.mdx index 8cda1fe6201f..0ee0df930616 100644 --- a/content/docs/07-reference/01-ai-sdk-core/02-stream-text.mdx +++ b/content/docs/07-reference/01-ai-sdk-core/02-stream-text.mdx @@ -2205,6 +2205,20 @@ To see `streamText` in action, check out [these examples](#examples). description: 'Whether to send the sources information in the stream. Defaults to false.', }, + { + name: 'experimental_sendFinish', + type: 'boolean', + isOptional: true, + description: + 'Send the finish event to the client. Set to false if you are using additional streamText calls that send additional data. Default to true.', + }, + { + name: 'experimental_sendStart', + type: 'boolean', + isOptional: true, + description: + 'Send the message start event to the client. Set to false if you are using additional streamText calls and the message start event has already been sent. Default to true.', + }, ], }, ], @@ -2282,6 +2296,20 @@ To see `streamText` in action, check out [these examples](#examples). description: 'Whether to send the sources information in the stream. Defaults to false.', }, + { + name: 'experimental_sendFinish', + type: 'boolean', + isOptional: true, + description: + 'Send the finish event to the client. Set to false if you are using additional streamText calls that send additional data. Default to true.', + }, + { + name: 'experimental_sendStart', + type: 'boolean', + isOptional: true, + description: + 'Send the message start event to the client. Set to false if you are using additional streamText calls and the message start event has already been sent. Default to true.', + }, ], }, ], @@ -2347,6 +2375,20 @@ To see `streamText` in action, check out [these examples](#examples). description: 'Whether to send the sources information in the stream. Defaults to false.', }, + { + name: 'experimental_sendFinish', + type: 'boolean', + isOptional: true, + description: + 'Send the finish event to the client. Set to false if you are using additional streamText calls that send additional data. Default to true.', + }, + { + name: 'experimental_sendStart', + type: 'boolean', + isOptional: true, + description: + 'Send the message start event to the client. Set to false if you are using additional streamText calls and the message start event has already been sent. Default to true.', + }, ], }, ], @@ -2412,6 +2454,20 @@ To see `streamText` in action, check out [these examples](#examples). description: 'Whether to send the sources information in the stream. Defaults to false.', }, + { + name: 'experimental_sendFinish', + type: 'boolean', + isOptional: true, + description: + 'Send the finish event to the client. Set to false if you are using additional streamText calls that send additional data. Default to true.', + }, + { + name: 'experimental_sendStart', + type: 'boolean', + isOptional: true, + description: + 'Send the message start event to the client. Set to false if you are using additional streamText calls and the message start event has already been sent. 
Default to true.', + }, ], }, ], diff --git a/examples/next-openai/app/api/use-chat-streamdata-multistep/route.ts b/examples/next-openai/app/api/use-chat-streamdata-multistep/route.ts new file mode 100644 index 000000000000..a00791049146 --- /dev/null +++ b/examples/next-openai/app/api/use-chat-streamdata-multistep/route.ts @@ -0,0 +1,48 @@ +import { openai } from '@ai-sdk/openai'; +import { createDataStreamResponse, streamText, tool } from 'ai'; +import { z } from 'zod'; + +export async function POST(req: Request) { + const { messages } = await req.json(); + + return createDataStreamResponse({ + execute: async dataStream => { + // step 1 example: forced tool call + const result1 = streamText({ + model: openai('gpt-4o-mini', { structuredOutputs: true }), + system: 'Extract the user goal from the conversation.', + messages, + toolChoice: 'required', // force the model to call a tool + tools: { + extractGoal: tool({ + parameters: z.object({ goal: z.string() }), + execute: async ({ goal }) => goal, // no-op extract tool + }), + }, + }); + + // forward the initial result to the client without the finish event: + result1.mergeIntoDataStream(dataStream, { + experimental_sendFinish: false, // omit the finish event + }); + + // note: you can use any programming construct here, e.g. if-else, loops, etc. + // workflow programming is normal programming with this approach. + + // example: continue stream with forced tool call from previous step + const result2 = streamText({ + // different system prompt, different model, no tools: + model: openai('gpt-4o'), + system: + 'You are a helpful assistant with a different system prompt. Repeat the extract user goal in your answer.', + // continue the workflow stream with the messages from the previous step: + messages: [...messages, ...(await result1.response).messages], + }); + + // forward the 2nd result to the client (incl. the finish event): + result2.mergeIntoDataStream(dataStream, { + experimental_sendStart: false, // omit the start event + }); + }, + }); +} diff --git a/examples/next-openai/app/use-chat-streamdata-multistep/page.tsx b/examples/next-openai/app/use-chat-streamdata-multistep/page.tsx new file mode 100644 index 000000000000..e588593b0ba1 --- /dev/null +++ b/examples/next-openai/app/use-chat-streamdata-multistep/page.tsx @@ -0,0 +1,61 @@ +'use client'; + +import { useChat } from '@ai-sdk/react'; + +export default function Chat() { + const { messages, input, handleInputChange, handleSubmit, data, setData } = + useChat({ api: '/api/use-chat-streamdata-multistep' }); + + return ( +
+      {data && (
+        <>
+          <pre>
+            {JSON.stringify(data, null, 2)}
+          </pre>
+
+        </>
+      )}
+
+      {messages?.map(message => (
+        <div key={message.id}>
+          {`${message.role}: `}
+          {message.parts.map((part, index) => {
+            switch (part.type) {
+              case 'text':
+                return <span key={index}>{part.text}</span>;
+              case 'tool-invocation': {
+                return (
+                  <pre key={index}>
+                    {JSON.stringify(part.toolInvocation, null, 2)}
+                  </pre>
+ ); + } + } + })} +
+
+        </div>
+      ))}
+
+      <form
{ + setData(undefined); // clear stream data + handleSubmit(e); + }} + > + +
+      </form>
+ ); +} diff --git a/packages/ai/core/generate-text/__snapshots__/stream-text.test.ts.snap b/packages/ai/core/generate-text/__snapshots__/stream-text.test.ts.snap index 0b3146009b07..0a9e5e56ca60 100644 --- a/packages/ai/core/generate-text/__snapshots__/stream-text.test.ts.snap +++ b/packages/ai/core/generate-text/__snapshots__/stream-text.test.ts.snap @@ -3781,6 +3781,17 @@ exports[`streamText > result.pipeDataStreamToResponse > should mask error messag ] `; +exports[`streamText > result.pipeDataStreamToResponse > should omit message finish event (d:) when sendFinish is false 1`] = ` +[ + "f:{"messageId":"msg-0"} +", + "0:"Hello, World!" +", + "e:{"finishReason":"stop","usage":{"promptTokens":3,"completionTokens":10},"isContinued":false} +", +] +`; + exports[`streamText > result.pipeDataStreamToResponse > should support custom error messages 1`] = ` [ "f:{"messageId":"msg-0"} @@ -4228,6 +4239,17 @@ exports[`streamText > result.toDataStream > should mask error messages by defaul ] `; +exports[`streamText > result.toDataStream > should omit message finish event (d:) when sendFinish is false 1`] = ` +[ + "f:{"messageId":"msg-0"} +", + "0:"Hello, World!" +", + "e:{"finishReason":"stop","usage":{"promptTokens":3,"completionTokens":10},"isContinued":false} +", +] +`; + exports[`streamText > result.toDataStream > should send reasoning content when sendReasoning is true 1`] = ` [ "f:{"messageId":"msg-0"} diff --git a/packages/ai/core/generate-text/stream-text-result.ts b/packages/ai/core/generate-text/stream-text-result.ts index 29917a6a9812..34082fb45136 100644 --- a/packages/ai/core/generate-text/stream-text-result.ts +++ b/packages/ai/core/generate-text/stream-text-result.ts @@ -13,11 +13,11 @@ import { Source } from '../types/language-model'; import { LanguageModelResponseMetadata } from '../types/language-model-response-metadata'; import { LanguageModelUsage } from '../types/usage'; import { AsyncIterableStream } from '../util/async-iterable-stream'; +import { ReasoningDetail } from './reasoning-detail'; import { StepResult } from './step-result'; import { ToolCallUnion } from './tool-call'; import { ToolResultUnion } from './tool-result'; import { ToolSet } from './tool-set'; -import { ReasoningDetail } from './reasoning-detail'; export type DataStreamOptions = { /** @@ -38,6 +38,27 @@ export type DataStreamOptions = { * Default to false. */ sendSources?: boolean; + + /** + * Send the finish event to the client. + * Set to false if you are using additional streamText calls + * that send additional data. + * Default to true. + */ + experimental_sendFinish?: boolean; + + /** + * Send the message start event to the client. + * Set to false if you are using additional streamText calls + * and the message start event has already been sent. + * Default to true. + * + * Note: this setting is currently not used, but you should + * already set it to false if you are using additional + * streamText calls that send additional data to prevent + * the message start event from being sent multiple times. 
+ */ + experimental_sendStart?: boolean; }; /** diff --git a/packages/ai/core/generate-text/stream-text.test.ts b/packages/ai/core/generate-text/stream-text.test.ts index 748e0c958e92..aca041b0936b 100644 --- a/packages/ai/core/generate-text/stream-text.test.ts +++ b/packages/ai/core/generate-text/stream-text.test.ts @@ -937,6 +937,32 @@ describe('streamText', () => { expect(mockResponse.getDecodedChunks()).toMatchSnapshot(); }); + it('should omit message finish event (d:) when sendFinish is false', async () => { + const mockResponse = createMockServerResponse(); + + const result = streamText({ + model: createTestModel({ + stream: convertArrayToReadableStream([ + { type: 'text-delta', textDelta: 'Hello, World!' }, + { + type: 'finish', + finishReason: 'stop', + usage: { promptTokens: 3, completionTokens: 10 }, + }, + ]), + }), + ...defaultSettings(), + }); + + result.pipeDataStreamToResponse(mockResponse, { + experimental_sendFinish: false, + }); + + await mockResponse.waitForEnd(); + + expect(mockResponse.getDecodedChunks()).toMatchSnapshot(); + }); + it('should write reasoning content to a Node.js response-like object', async () => { const mockResponse = createMockServerResponse(); @@ -1211,6 +1237,32 @@ describe('streamText', () => { ).toMatchSnapshot(); }); + it('should omit message finish event (d:) when sendFinish is false', async () => { + const result = streamText({ + model: createTestModel({ + stream: convertArrayToReadableStream([ + { type: 'text-delta', textDelta: 'Hello, World!' }, + { + type: 'finish', + finishReason: 'stop', + usage: { promptTokens: 3, completionTokens: 10 }, + }, + ]), + }), + ...defaultSettings(), + }); + + const dataStream = result.toDataStream({ + experimental_sendFinish: false, + }); + + expect( + await convertReadableStreamToArray( + dataStream.pipeThrough(new TextDecoderStream()), + ), + ).toMatchSnapshot(); + }); + it('should send reasoning content when sendReasoning is true', async () => { const result = streamText({ model: modelWithReasoning, diff --git a/packages/ai/core/generate-text/stream-text.ts b/packages/ai/core/generate-text/stream-text.ts index 08519a9b4f6a..aa62b5a24755 100644 --- a/packages/ai/core/generate-text/stream-text.ts +++ b/packages/ai/core/generate-text/stream-text.ts @@ -1596,11 +1596,13 @@ However, the LLM results are expected to be small enough to not cause issues. sendUsage = true, sendReasoning = false, sendSources = false, + experimental_sendFinish = true, }: { getErrorMessage: ((error: unknown) => string) | undefined; sendUsage: boolean | undefined; sendReasoning: boolean | undefined; sendSources: boolean | undefined; + experimental_sendFinish: boolean | undefined; }): ReadableStream { return this.fullStream.pipeThrough( new TransformStream, DataStreamString>({ @@ -1726,17 +1728,19 @@ However, the LLM results are expected to be small enough to not cause issues. } case 'finish': { - controller.enqueue( - formatDataStreamPart('finish_message', { - finishReason: chunk.finishReason, - usage: sendUsage - ? { - promptTokens: chunk.usage.promptTokens, - completionTokens: chunk.usage.completionTokens, - } - : undefined, - }), - ); + if (experimental_sendFinish) { + controller.enqueue( + formatDataStreamPart('finish_message', { + finishReason: chunk.finishReason, + usage: sendUsage + ? { + promptTokens: chunk.usage.promptTokens, + completionTokens: chunk.usage.completionTokens, + } + : undefined, + }), + ); + } break; } @@ -1761,6 +1765,7 @@ However, the LLM results are expected to be small enough to not cause issues. 
sendUsage, sendReasoning, sendSources, + experimental_sendFinish, }: ResponseInit & DataStreamOptions & { data?: StreamData; @@ -1781,6 +1786,7 @@ However, the LLM results are expected to be small enough to not cause issues. sendUsage, sendReasoning, sendSources, + experimental_sendFinish, }), }); } @@ -1809,6 +1815,7 @@ However, the LLM results are expected to be small enough to not cause issues. sendUsage: options?.sendUsage, sendReasoning: options?.sendReasoning, sendSources: options?.sendSources, + experimental_sendFinish: options?.experimental_sendFinish, }).pipeThrough(new TextEncoderStream()); return options?.data ? mergeStreams(options?.data.stream, stream) : stream; @@ -1821,6 +1828,7 @@ However, the LLM results are expected to be small enough to not cause issues. sendUsage: options?.sendUsage, sendReasoning: options?.sendReasoning, sendSources: options?.sendSources, + experimental_sendFinish: options?.experimental_sendFinish, }), ); } @@ -1834,6 +1842,7 @@ However, the LLM results are expected to be small enough to not cause issues. sendUsage, sendReasoning, sendSources, + experimental_sendFinish, }: ResponseInit & DataStreamOptions & { data?: StreamData; @@ -1846,6 +1855,7 @@ However, the LLM results are expected to be small enough to not cause issues. sendUsage, sendReasoning, sendSources, + experimental_sendFinish, }), { status, diff --git a/packages/ui-utils/src/data-stream-parts.ts b/packages/ui-utils/src/data-stream-parts.ts index fdf309b73d6c..06dc8401b2c3 100644 --- a/packages/ui-utils/src/data-stream-parts.ts +++ b/packages/ui-utils/src/data-stream-parts.ts @@ -192,6 +192,7 @@ const finishMessageStreamPart: DataStreamPart< 'finish_message', { finishReason: LanguageModelV1FinishReason; + // TODO v5 remove usage from finish event (only on step-finish) usage?: { promptTokens: number; completionTokens: number;