AI-310: streaming support for @temporalio/openai-agents #2147
Wire the @temporalio/workflow-streams primitive into the OpenAI Agents runner
so an agent running inside a Workflow can stream model events to an external
subscriber via a Workflow Stream topic, reaching parity with the Python SDK.
Streaming is exposed through run(agent, input, { stream: true }). The streaming
model Activity publishes each event live to the configured topic and returns the
full event list, which the Workflow yields deterministically on replay (it never
polls the live stream). The user hosts the WorkflowStream; the topic is plugin
config via modelParams.streamingTopic.
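The publish-and-return pattern described above can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: `StreamEvent`, `Publish`, `invokeModelStream`, and `replayEvents` are hypothetical stand-ins, and the real Activity publishes through a Workflow Stream topic rather than a plain callback.

```typescript
// Hypothetical stand-in types; the package's real event and publisher types may differ.
type StreamEvent = { type: string; delta?: string };
type Publish = (topic: string, event: StreamEvent) => void;

// Activity side (assumed shape): forward each event live and also collect it.
async function invokeModelStream(
  source: AsyncIterable<StreamEvent>,
  topic: string,
  publish: Publish
): Promise<StreamEvent[]> {
  const events: StreamEvent[] = [];
  for await (const event of source) {
    publish(topic, event); // live path for external subscribers
    events.push(event); // durable path: becomes the Activity result
  }
  return events;
}

// Workflow side (assumed shape): yield only from the recorded Activity result,
// never from the live stream, so replay sees the same sequence every time.
async function* replayEvents(recorded: StreamEvent[]): AsyncGenerator<StreamEvent> {
  for (const event of recorded) {
    yield event;
  }
}
```

The Workflow only ever reads the Activity's return value, which lives in history, so the live topic can be lossy or late without affecting determinism.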
export const STREAMING_TOPIC_NOT_CONFIGURED = {
  type: 'StreamingTopicNotConfigured',
  message:
    'Streaming requires modelParams.streamingTopic to be set on OpenAIAgentsPlugin. ' +
Can we update the README to document that clients need to set modelParams (specifically modelParams.streamingTopic) on the plugin? And/or add a README snippet showing how people need to update their client code to use streaming, not just the Workflow side?
Also, Workflows started via Schedules or the Temporal UI/CLI don't get a header at all, so streaming can never be enabled for them, right? (IIUC, modelParams reaches the Workflow exclusively through the __openai_agents_config header that the client-side interceptor injects at Workflow start.) When you're calling configureBundler, could you bake in a default modelParams to cover that use case when no header is present? Or would that force streaming to be enabled all the time, which is also not what we want? Maybe this is just a limitation to document?
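One way to picture the fallback being asked about is the sketch below. It assumes the header has already been decoded into a plain object, and `ModelParams`, `resolveModelParams`, and `bundledDefault` are hypothetical names, not part of the plugin.

```typescript
// Hypothetical shapes for illustration; the plugin's real types may differ.
interface ModelParams {
  streamingTopic?: string;
}
type DecodedHeaders = Record<string, unknown>;

// Header name as quoted in the review comment.
const CONFIG_HEADER = '__openai_agents_config';

// Prefer the client-injected header; fall back to a default baked in at bundle time.
function resolveModelParams(headers: DecodedHeaders, bundledDefault?: ModelParams): ModelParams {
  const config = headers[CONFIG_HEADER] as { modelParams?: ModelParams } | undefined;
  return config?.modelParams ?? bundledDefault ?? {};
}
```

Under this sketch a Schedule-started Workflow with no header would pick up the bundled default. Whether a default topic alone would turn streaming on depends on how the runner gates it, which this sketch does not verify.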
The agent passed to `RunState.fromString` must define the same tool names, handoff graph, and MCP servers as the run that produced the serialized state.

### Streaming
Is it worth noting somewhere (here?) that the hosted WorkflowStream log also holds every event on the Workflow heap until the user calls truncate(), so it can grow unbounded until it hits some limit? (It looks like we enforce a 50K-event/50MB history limit and apparently hard-terminate the Workflow if it exceeds those limits.) Should we add guidance that users of Workflow Streams should be conscious of calling truncate() periodically?
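A rough sketch of what "truncate periodically" could look like is below. `EventLog` is a hypothetical in-memory stand-in, not the WorkflowStream API; its `truncate` signature and the `maxRetained` policy are assumptions made for illustration.

```typescript
// Hypothetical append-only log with offset-based truncation (not the real WorkflowStream).
class EventLog<T> {
  private events: T[] = [];
  private base = 0; // offset of events[0]

  append(event: T): number {
    this.events.push(event);
    return this.base + this.events.length - 1; // offset of the appended event
  }

  // Drop every event whose offset is below `beforeOffset`.
  truncate(beforeOffset: number): void {
    const drop = Math.min(Math.max(beforeOffset - this.base, 0), this.events.length);
    this.events.splice(0, drop);
    this.base += drop;
  }

  get size(): number {
    return this.events.length;
  }

  get baseOffset(): number {
    return this.base;
  }
}

// Assumed policy: after each append, keep only the most recent `maxRetained` events.
function appendBounded<T>(log: EventLog<T>, event: T, maxRetained: number): number {
  const offset = log.append(event);
  log.truncate(offset + 1 - maxRetained);
  return offset;
}
```

A retained-count policy is only one option; truncating up to the lowest offset that subscribers have acknowledged would avoid dropping events a slow consumer has not read yet.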
throw new Error(
  'Streaming is not supported in Temporal workflows. ' + 'Use non-streaming mode with TemporalOpenAIRunner.'
);
async *getStreamedResponse(request: ModelRequest): AsyncIterable<StreamEvent> {
So getResponse wraps the call in withGenerationSpan, but getStreamedResponse does not. What are your thoughts on this, and how intentional was it? If you wanted the streaming path wrapped in withGenerationSpan as well, for better parity of semantics, you could do something like:
    async *getStreamedResponse(request: ModelRequest): AsyncIterable<StreamEvent> {
      const streamingTopic = this.modelParams.streamingTopic;
      if (streamingTopic === undefined) {
        throw ApplicationFailure.create({ /* unchanged guard */ });
      }
      const events = await withGenerationSpan(async (span) => {
        span.spanData.model = this.modelName;
        const input: InvokeModelStreamActivityInput = { /* unchanged construction */ };
        return this.activities.invokeModelStreamActivity(input);
      });
      for (const event of events) {
        yield fromSerializedStreamEvent(event);
      }
    }
const modelSummary = this.modelParams.summary;
if (modelSummary && typeof modelSummary !== 'string') {
  const provider = modelSummary as ModelSummaryProvider;
So getResponse supports this dynamic ModelSummaryProvider branch (the executeWithOptions({ summary }) call below), but getStreamedResponse ignores modelParams.summary, so streamed model Activities always show the default summary in the UI. Do we want to fix that for parity?
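If parity is the goal, one option is a small helper that both paths call to turn modelParams.summary into a string. The sketch below is an assumption-heavy illustration: `SummaryProvider`, its `provide` method, `ModelRequestLike`, and `resolveSummary` are hypothetical, and the real ModelSummaryProvider interface may look different.

```typescript
// Hypothetical stand-ins; the real ModelSummaryProvider and ModelRequest types may differ.
interface ModelRequestLike {
  input: string;
}
interface SummaryProvider {
  provide(request: ModelRequestLike): string;
}
type SummaryOption = string | SummaryProvider | undefined;

// Resolve a static or dynamic summary the same way for streaming and non-streaming calls.
function resolveSummary(option: SummaryOption, request: ModelRequestLike): string | undefined {
  if (option === undefined || typeof option === 'string') {
    return option; // static summary, or none configured
  }
  return option.provide(request); // dynamic summary computed per request
}
```

Both getResponse and getStreamedResponse could then pass the resolved value into their Activity options, so the UI shows the same summary regardless of mode.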
Adds streaming to the OpenAI Agents integration by wiring in the @temporalio/workflow-streams primitive, mirroring the Python SDK. An agent running inside a Workflow can stream model events to an external subscriber via a Workflow Stream topic, exposed through run(agent, input, { stream: true }). The streaming model Activity publishes each event live to the configured topic and returns the full event list, which the Workflow yields deterministically on replay rather than polling the live stream. The user hosts the WorkflowStream; the topic name is plugin config via modelParams.streamingTopic.