Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions .github/workflows/python-merge-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,13 @@ jobs:
-m "not integration"
--timeout=120 --session-timeout=900 --timeout_method thread
--retries 2 --retry-delay 5
--junitxml=pytest.xml
working-directory: ./python
- name: Surface failing tests
if: always()
uses: pmeier/pytest-results-action@v0.7.2
with:
path: ./python/**.xml
path: ./python/pytest.xml
summary: true
display-options: fEX
fail-on-empty: false
Expand Down Expand Up @@ -163,6 +164,7 @@ jobs:
-n logical --dist worksteal
--timeout=120 --session-timeout=900 --timeout_method thread
--retries 2 --retry-delay 5
--junitxml=pytest.xml
working-directory: ./python
- name: Test OpenAI samples
timeout-minutes: 10
Expand All @@ -173,7 +175,7 @@ jobs:
if: always()
uses: pmeier/pytest-results-action@v0.7.2
with:
path: ./python/**.xml
path: ./python/pytest.xml
summary: true
display-options: fEX
fail-on-empty: false
Expand Down Expand Up @@ -225,6 +227,7 @@ jobs:
-n logical --dist worksteal
--timeout=120 --session-timeout=900 --timeout_method thread
--retries 2 --retry-delay 5
--junitxml=pytest.xml
working-directory: ./python
- name: Test Azure samples
timeout-minutes: 10
Expand All @@ -235,7 +238,7 @@ jobs:
if: always()
uses: pmeier/pytest-results-action@v0.7.2
with:
path: ./python/**.xml
path: ./python/pytest.xml
summary: true
display-options: fEX
fail-on-empty: false
Expand Down Expand Up @@ -285,6 +288,7 @@ jobs:
-n logical --dist worksteal
--timeout=120 --session-timeout=900 --timeout_method thread
--retries 2 --retry-delay 5
--junitxml=pytest.xml
working-directory: ./python
- name: Stop local MCP server
if: always()
Expand All @@ -310,7 +314,7 @@ jobs:
if: always()
uses: pmeier/pytest-results-action@v0.7.2
with:
path: ./python/**.xml
path: ./python/pytest.xml
summary: true
display-options: fEX
fail-on-empty: false
Expand Down Expand Up @@ -375,12 +379,13 @@ jobs:
-x
--timeout=360 --session-timeout=900 --timeout_method thread
--retries 2 --retry-delay 5
--junitxml=pytest.xml
working-directory: ./python
- name: Surface failing tests
if: always()
uses: pmeier/pytest-results-action@v0.7.2
with:
path: ./python/**.xml
path: ./python/pytest.xml
summary: true
display-options: fEX
fail-on-empty: false
Expand Down Expand Up @@ -430,12 +435,13 @@ jobs:
-n logical --dist worksteal
--timeout=120 --session-timeout=900 --timeout_method thread
--retries 2 --retry-delay 5
--junitxml=pytest.xml
working-directory: ./python
- name: Surface failing tests
if: always()
uses: pmeier/pytest-results-action@v0.7.2
with:
path: ./python/**.xml
path: ./python/pytest.xml
summary: true
display-options: fEX
fail-on-empty: false
Expand Down Expand Up @@ -489,13 +495,13 @@ jobs:
echo "Cosmos DB emulator did not become ready in time." >&2
exit 1
- name: Test with pytest (Cosmos integration)
run: uv run --directory packages/azure-cosmos poe integration-tests -n logical --dist worksteal --timeout=120 --session-timeout=900 --timeout_method thread --retries 2 --retry-delay 5
run: uv run --directory packages/azure-cosmos poe integration-tests -n logical --dist worksteal --timeout=120 --session-timeout=900 --timeout_method thread --retries 2 --retry-delay 5 --junitxml=pytest.xml
working-directory: ./python
- name: Surface failing tests
if: always()
uses: pmeier/pytest-results-action@v0.7.2
with:
path: ./python/**.xml
path: ./python/pytest.xml
summary: true
display-options: fEX
fail-on-empty: false
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/python-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ jobs:
UV_CACHE_DIR: /tmp/.uv-cache
# Unit tests
- name: Run all tests
run: uv run poe test -A
run: uv run poe test -A --junitxml=pytest.xml
working-directory: ./python

# Surface failing tests
- name: Surface failing tests
if: always()
uses: pmeier/pytest-results-action@v0.7.2
with:
path: ./python/**.xml
path: ./python/pytest.xml
summary: true
display-options: fEX
fail-on-empty: false
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ htmlcov/
.cache
nosetests.xml
coverage.xml
pytest.xml
python-coverage.xml
*.cover
*.py,cover
.hypothesis/
Expand Down
5 changes: 5 additions & 0 deletions python/packages/ag-ui/agent_framework_ag_ui/_agent_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from ._run_common import (
FlowState,
_build_run_finished_event, # type: ignore
_close_reasoning_block, # type: ignore
_emit_content, # type: ignore
_extract_resume_payload, # type: ignore
_has_only_tool_calls, # type: ignore
Expand Down Expand Up @@ -1058,6 +1059,10 @@ async def run_agent_stream(
}
)

# Close any open reasoning block
for event in _close_reasoning_block(flow):
yield event

# Close any open message
if flow.message_id:
logger.debug(f"End of run: closing text message message_id={flow.message_id}")
Expand Down
106 changes: 82 additions & 24 deletions python/packages/ag-ui/agent_framework_ag_ui/_run_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class FlowState:
interrupts: list[dict[str, Any]] = field(default_factory=list) # pyright: ignore[reportUnknownVariableType]
reasoning_messages: list[dict[str, Any]] = field(default_factory=list) # pyright: ignore[reportUnknownVariableType]
accumulated_reasoning: dict[str, str] = field(default_factory=dict) # pyright: ignore[reportUnknownVariableType]
reasoning_message_id: str | None = None

def get_tool_name(self, call_id: str | None) -> str | None:
"""Get tool name by call ID."""
Expand Down Expand Up @@ -462,12 +463,39 @@ def _emit_mcp_tool_result(
return _emit_tool_result_common(content.call_id, raw_output, flow, predictive_handler)


def _close_reasoning_block(flow: FlowState) -> list[BaseEvent]:
    """Emit the end events for the reasoning block currently open on *flow*.

    Intended to be invoked once a reasoning block is finished, for example
    when content of a non-reasoning type shows up or when the run ends.

    Args:
        flow: Flow state whose ``reasoning_message_id`` identifies the open
            reasoning block, if any.

    Returns:
        An empty list when no reasoning block is open; otherwise a
        ``ReasoningMessageEndEvent`` followed by a ``ReasoningEndEvent``,
        both carrying the id of the block that was open.
    """
    open_id = flow.reasoning_message_id
    if not open_id:
        return []

    # Clear the marker before building the events so the block is recorded
    # as closed and a later call returns nothing.
    flow.reasoning_message_id = None

    closing_events: list[BaseEvent] = [ReasoningMessageEndEvent(message_id=open_id)]
    closing_events.append(ReasoningEndEvent(message_id=open_id))
    return closing_events


def _emit_text_reasoning(content: Content, flow: FlowState | None = None) -> list[BaseEvent]:
"""Emit AG-UI reasoning events for text_reasoning content.

Uses the protocol-defined reasoning event types so that AG-UI consumers
such as CopilotKit can render reasoning natively.

When *flow* is provided the function follows the streaming pattern: it
emits ``ReasoningStartEvent`` / ``ReasoningMessageStartEvent`` only on
the first delta for a given ``message_id`` and just
``ReasoningMessageContentEvent`` for subsequent deltas. The matching
``ReasoningMessageEndEvent`` / ``ReasoningEndEvent`` are deferred until
``_close_reasoning_block`` is called (e.g. when non-reasoning content
arrives or at end-of-run).

Without *flow* (backward-compat) the full Start→Content→End sequence is
emitted for every call.

Only ``content.text`` is used for the visible reasoning message. If
``content.protected_data`` is present it is emitted as a
``ReasoningEncryptedValueEvent`` so that consumers can persist encrypted
Expand All @@ -483,26 +511,49 @@ def _emit_text_reasoning(content: Content, flow: FlowState | None = None) -> lis

message_id = content.id or generate_event_id()

events: list[BaseEvent] = [
ReasoningStartEvent(message_id=message_id),
ReasoningMessageStartEvent(message_id=message_id, role="assistant"),
]
events: list[BaseEvent] = []

if text:
events.append(ReasoningMessageContentEvent(message_id=message_id, delta=text))
if flow is not None:
# Streaming mode: track open reasoning block in flow state.
if flow.reasoning_message_id != message_id:
# Close any previously open reasoning block (different message_id).
events.extend(_close_reasoning_block(flow))
# Open new reasoning block.
events.append(ReasoningStartEvent(message_id=message_id))
events.append(ReasoningMessageStartEvent(message_id=message_id, role="assistant"))
flow.reasoning_message_id = message_id

events.append(ReasoningMessageEndEvent(message_id=message_id))
if text:
events.append(ReasoningMessageContentEvent(message_id=message_id, delta=text))

if content.protected_data is not None:
events.append(
ReasoningEncryptedValueEvent(
subtype="message",
entity_id=message_id,
encrypted_value=content.protected_data,
)
)
else:
# No flow -- backward-compatible full sequence per call.
events.append(ReasoningStartEvent(message_id=message_id))
events.append(ReasoningMessageStartEvent(message_id=message_id, role="assistant"))

if content.protected_data is not None:
events.append(
ReasoningEncryptedValueEvent(
subtype="message",
entity_id=message_id,
encrypted_value=content.protected_data,
if text:
events.append(ReasoningMessageContentEvent(message_id=message_id, delta=text))

events.append(ReasoningMessageEndEvent(message_id=message_id))

if content.protected_data is not None:
events.append(
ReasoningEncryptedValueEvent(
subtype="message",
entity_id=message_id,
encrypted_value=content.protected_data,
)
)
)

events.append(ReasoningEndEvent(message_id=message_id))
events.append(ReasoningEndEvent(message_id=message_id))

# Persist reasoning into flow state for MESSAGES_SNAPSHOT.
# Accumulate reasoning text per message_id, similar to flow.accumulated_text,
Expand Down Expand Up @@ -546,23 +597,30 @@ def _emit_content(
) -> list[BaseEvent]:
"""Emit appropriate events for any content type."""
content_type = getattr(content, "type", None)

# Close open reasoning block when switching to non-reasoning content.
if content_type != "text_reasoning":
events = _close_reasoning_block(flow)
else:
events = []

if content_type == "text":
return _emit_text(content, flow, skip_text)
return events + _emit_text(content, flow, skip_text)
if content_type == "function_call":
return _emit_tool_call(content, flow, predictive_handler)
return events + _emit_tool_call(content, flow, predictive_handler)
if content_type == "function_result":
return _emit_tool_result(content, flow, predictive_handler)
return events + _emit_tool_result(content, flow, predictive_handler)
if content_type == "function_approval_request":
return _emit_approval_request(content, flow, predictive_handler, require_confirmation)
return events + _emit_approval_request(content, flow, predictive_handler, require_confirmation)
if content_type == "usage":
return _emit_usage(content)
return events + _emit_usage(content)
if content_type == "oauth_consent_request":
return _emit_oauth_consent(content)
return events + _emit_oauth_consent(content)
if content_type == "mcp_server_tool_call":
return _emit_mcp_tool_call(content, flow)
return events + _emit_mcp_tool_call(content, flow)
if content_type == "mcp_server_tool_result":
return _emit_mcp_tool_result(content, flow, predictive_handler)
return events + _emit_mcp_tool_result(content, flow, predictive_handler)
if content_type == "text_reasoning":
return _emit_text_reasoning(content, flow)
logger.debug("Skipping unsupported content type in AG-UI emitter: %s", content_type)
return []
return events
4 changes: 4 additions & 0 deletions python/packages/ag-ui/agent_framework_ag_ui/_workflow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from ._run_common import (
FlowState,
_build_run_finished_event,
_close_reasoning_block,
_emit_content,
_extract_resume_payload,
_normalize_resume_interrupts,
Expand Down Expand Up @@ -729,6 +730,9 @@ def _drain_open_message() -> list[TextMessageEndEvent]:
run_error_emitted = True
terminal_emitted = True

for reasoning_evt in _close_reasoning_block(flow):
yield reasoning_evt

for end_event in _drain_open_message():
yield end_event

Expand Down
Loading
Loading