Skip to content

AI-310: streaming support for @temporalio/openai-agents#2147

Open
xumaple wants to merge 1 commit into
mainfrom
maplexu/AI-310-openai-agents-streaming
Open

AI-310: streaming support for @temporalio/openai-agents#2147
xumaple wants to merge 1 commit into
mainfrom
maplexu/AI-310-openai-agents-streaming

Conversation

@xumaple

@xumaple xumaple commented Jun 29, 2026

Copy link
Copy Markdown
Contributor

Adds streaming to the OpenAI Agents integration by wiring in the @temporalio/workflow-streams primitive, mirroring the Python SDK. An agent running inside a Workflow can stream model events to an external subscriber via a Workflow Stream topic, exposed through run(agent, input, { stream: true }).

The streaming model Activity publishes each event live to the configured topic and returns the full event list, which the Workflow yields deterministically on replay rather than polling the live stream. The user hosts the WorkflowStream; the topic name is plugin config via modelParams.streamingTopic.

Wire the @temporalio/workflow-streams primitive into the OpenAI Agents runner
so an agent running inside a Workflow can stream model events to an external
subscriber via a Workflow Stream topic, reaching parity with the Python SDK.

Streaming is exposed through run(agent, input, { stream: true }). The streaming
model Activity publishes each event live to the configured topic and returns the
full event list, which the Workflow yields deterministically on replay (it never
polls the live stream). The user hosts the WorkflowStream; the topic is plugin
config via modelParams.streamingTopic.
@xumaple xumaple requested review from a team as code owners June 29, 2026 18:23
export const STREAMING_TOPIC_NOT_CONFIGURED = {
type: 'StreamingTopicNotConfigured',
message:
'Streaming requires modelParams.streamingTopic to be set on OpenAIAgentsPlugin. ' +

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we update the readme to ensure we document that clients need to set modelParams / modelParams.streamingTopic on the plugin? And/or have a snippet in the readme showing how people need to update their client code to use streaming, not just the workflow side?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also - workflows started via schedules or the Temporal UI/CLI don't get a header at all so streaming can never be enabled for them, right? (IIUC modelParams reaches the Workflow exclusively through the __openai_agents_config header injected at workflow start by the client-side interceptor...) When you're calling configureBundler could you bake in a default modelParams to enable that use case when no header is present? Or would that force streaming to be enabled all the time, which is also not what we want? Maybe just a limitation to document?


The agent passed to `RunState.fromString` must define the same tool names, handoff graph, and MCP servers as the run that produced the serialized state.

### Streaming

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth noting somewhere (here?) that the hosted WorkflowStream log also holds every event on the workflow heap until the user calls truncate(), so can basically grow unbounded till it hits some limits (looks like we enforce a 50K-event/50MB history limit - and apparently hard-terminate the workflow if it exceeds those limits)? Should we add some guidance that users using workflow streams should be conscious of periodically calling truncate()?

throw new Error(
'Streaming is not supported in Temporal workflows. ' + 'Use non-streaming mode with TemporalOpenAIRunner.'
);
async *getStreamedResponse(request: ModelRequest): AsyncIterable<StreamEvent> {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So getResponse wraps with withGenerationSpan but getStreamedResponse does not. What are your thoughts on this / how intentional was this? You could do like

async *getStreamedResponse(request: ModelRequest): AsyncIterable<StreamEvent> {
  const streamingTopic = this.modelParams.streamingTopic;
  if (streamingTopic === undefined) {
    throw ApplicationFailure.create({ /* unchanged guard */ });
  }

  const events = await withGenerationSpan(async (span) => {
    span.spanData.model = this.modelName;
    const input: InvokeModelStreamActivityInput = { /* unchanged construction */ };
    return this.activities.invokeModelStreamActivity(input);
  });

  for (const event of events) {
    yield fromSerializedStreamEvent(event);
  }
}

if you wanted this to wrap with withGenerationSpan as well, and then you'd have better parity of semantics?


const modelSummary = this.modelParams.summary;
if (modelSummary && typeof modelSummary !== 'string') {
const provider = modelSummary as ModelSummaryProvider;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So getResponse supports this dynamic ModelSummaryProvider branch (the executeWithOptions({ summary } below), but getStreamedResponse ignores modelParams.summary, so streamed model Activities always show the default summary in the UI. Do we want to fix that for parity?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants