1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -226,3 +226,4 @@ local.settings.json

# Database files
*.db
python/dotnet-ref
63 changes: 61 additions & 2 deletions python/packages/core/agent_framework/_workflows/_edge.py
Original file line number Diff line number Diff line change
@@ -71,6 +71,11 @@ class Edge(DictConvertible):
serialising the edge down to primitives we can reconstruct the topology of
a workflow irrespective of the original Python process.

Edges support two types of conditions:
- Sync conditions: `condition(data) -> bool` - evaluated against message data only
- Async conditions: `async_condition(data, shared_state) -> bool` - evaluated against
message data AND shared state, allowing state-aware routing decisions

Examples:
.. code-block:: python

@@ -85,6 +90,7 @@ class Edge(DictConvertible):
target_id: str
condition_name: str | None
_condition: Callable[[Any], bool] | None = field(default=None, repr=False, compare=False)
_async_condition: Callable[[Any, Any], Any] | None = field(default=None, repr=False, compare=False)

def __init__(
self,
@@ -93,6 +99,7 @@ def __init__(
condition: Callable[[Any], bool] | None = None,
*,
condition_name: str | None = None,
async_condition: Callable[[Any, Any], Any] | None = None,
) -> None:
"""Initialize a fully-specified edge between two workflow executors.

@@ -110,6 +117,10 @@ def __init__(
Optional override that pins a human-friendly name for the condition
when the callable cannot be introspected (for example after
deserialization).
async_condition:
Optional async predicate that receives (message_data, shared_state) and
returns `True` when the edge should be traversed. This is evaluated
instead of `condition` when present, allowing state-aware routing.

Examples:
.. code-block:: python
@@ -125,7 +136,14 @@ def __init__(
self.source_id = source_id
self.target_id = target_id
self._condition = condition
self.condition_name = _extract_function_name(condition) if condition is not None else condition_name
self._async_condition = async_condition
# Use async_condition name if provided, otherwise sync condition name
if async_condition is not None:
self.condition_name = _extract_function_name(async_condition) if condition_name is None else condition_name
elif condition is not None:
self.condition_name = _extract_function_name(condition) if condition_name is None else condition_name
else:
self.condition_name = condition_name

@property
def id(self) -> str:
@@ -144,6 +162,15 @@ def id(self) -> str:
"""
return f"{self.source_id}{self.ID_SEPARATOR}{self.target_id}"

@property
def has_async_condition(self) -> bool:
"""Check if this edge has an async state-aware condition.

Returns True if the edge was configured with an async_condition that
requires shared state for evaluation.
"""
return self._async_condition is not None

def should_route(self, data: Any) -> bool:
"""Evaluate the edge predicate against an incoming payload.

@@ -153,17 +180,41 @@ def should_route(self, data: Any) -> bool:
this edge. Any exception raised by the callable is deliberately allowed
to surface to the caller to avoid masking logic bugs.

Note: If the edge has an async_condition, this method returns True and
the actual routing decision must be made via should_route_async().

Examples:
.. code-block:: python

edge = Edge("stage1", "stage2", condition=lambda payload: payload["score"] > 0.8)
assert edge.should_route({"score": 0.9}) is True
assert edge.should_route({"score": 0.4}) is False
"""
# If there's an async condition, defer to should_route_async
if self._async_condition is not None:
return True # Let the async check handle it
if self._condition is None:
return True
return self._condition(data)

async def should_route_async(self, data: Any, shared_state: Any) -> bool:
"""Evaluate the async edge predicate against payload and shared state.

This method is used for edges with async_condition that need access to
shared state for routing decisions (e.g., declarative workflow conditions).

Args:
data: The message payload.
shared_state: The workflow's shared state.

Returns:
True if the edge should be traversed, False otherwise.
"""
if self._async_condition is not None:
return bool(await self._async_condition(data, shared_state))
# Fall back to sync condition
return self.should_route(data)

def to_dict(self) -> dict[str, Any]:
"""Produce a JSON-serialisable view of the edge metadata.

@@ -445,17 +496,25 @@ def __init__(
target_id: str,
condition: Callable[[Any], bool] | None = None,
*,
async_condition: Callable[[Any, Any], Any] | None = None,
id: str | None = None,
) -> None:
"""Create a one-to-one edge group between two executors.

Args:
source_id: The source executor ID.
target_id: The target executor ID.
condition: Optional sync condition function (data) -> bool.
async_condition: Optional async condition function (data, shared_state) -> bool.
id: Optional explicit ID for the edge group.

Examples:
.. code-block:: python

group = SingleEdgeGroup("ingest", "validate")
assert group.edges[0].source_id == "ingest"
"""
edge = Edge(source_id=source_id, target_id=target_id, condition=condition)
edge = Edge(source_id=source_id, target_id=target_id, condition=condition, async_condition=async_condition)
super().__init__([edge], id=id, type=self.__class__.__name__)


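Taken together, the `_edge.py` changes above introduce a second, state-aware predicate type alongside the existing sync condition. Below is a minimal sketch (not part of this PR) of how the new surface behaves; the import path is inferred from the file path shown in the diff, and a plain dict stands in for the runtime's shared-state object, since `should_route_async` types it as `Any`:

    import asyncio

    from agent_framework._workflows._edge import Edge  # path inferred from the diff


    async def score_above_threshold(data, shared_state) -> bool:
        # State-aware predicate: route only when the payload beats a threshold
        # recorded in shared state (a plain dict is used here for illustration).
        return data["score"] > shared_state.get("threshold", 0.5)


    edge = Edge("stage1", "stage2", async_condition=score_above_threshold)
    assert edge.has_async_condition

    # With an async condition present, should_route() always defers (returns True);
    # the real decision is made by should_route_async(), which also sees shared state.
    assert edge.should_route({"score": 0.1}) is True
    assert asyncio.run(edge.should_route_async({"score": 0.9}, {"threshold": 0.8})) is True
    assert asyncio.run(edge.should_route_async({"score": 0.7}, {"threshold": 0.8})) is False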
30 changes: 24 additions & 6 deletions python/packages/core/agent_framework/_workflows/_edge_runner.py
Original file line number Diff line number Diff line change
@@ -112,7 +112,13 @@ async def send_message(self, message: Message, shared_state: SharedState, ctx: R
return False

if self._can_handle(self._edge.target_id, message):
if self._edge.should_route(message.data):
# Use async routing if edge has async condition, otherwise sync
if self._edge.has_async_condition:
route_result = await self._edge.should_route_async(message.data, shared_state)
else:
route_result = self._edge.should_route(message.data)

if route_result:
span.set_attributes({
OtelAttr.EDGE_GROUP_DELIVERED: True,
OtelAttr.EDGE_GROUP_DELIVERY_STATUS: EdgeGroupDeliveryStatus.DELIVERED.value,
@@ -162,8 +168,8 @@ def __init__(self, edge_group: FanOutEdgeGroup, executors: dict[str, Executor])

async def send_message(self, message: Message, shared_state: SharedState, ctx: RunnerContext) -> bool:
"""Send a message through all edges in the fan-out edge group."""
deliverable_edges = []
single_target_edge = None
deliverable_edges: list[Edge] = []
single_target_edge: Edge | None = None
# Process routing logic within span
with create_edge_group_processing_span(
self._edge_group.__class__.__name__,
@@ -192,7 +198,13 @@ async def send_message(self, message: Message, shared_state: SharedState, ctx: R
if message.target_id in selection_results:
edge = self._target_map.get(message.target_id)
if edge and self._can_handle(edge.target_id, message):
if edge.should_route(message.data):
# Use async routing if edge has async condition
if edge.has_async_condition:
route_result = await edge.should_route_async(message.data, shared_state)
else:
route_result = edge.should_route(message.data)

if route_result:
span.set_attributes({
OtelAttr.EDGE_GROUP_DELIVERED: True,
OtelAttr.EDGE_GROUP_DELIVERY_STATUS: EdgeGroupDeliveryStatus.DELIVERED.value,
@@ -223,8 +235,14 @@ async def send_message(self, message: Message, shared_state: SharedState, ctx: R
# If no target ID, send the message to the selected targets
for target_id in selection_results:
edge = self._target_map[target_id]
if self._can_handle(edge.target_id, message) and edge.should_route(message.data):
deliverable_edges.append(edge)
if self._can_handle(edge.target_id, message):
# Use async routing if edge has async condition
if edge.has_async_condition:
route_result = await edge.should_route_async(message.data, shared_state)
else:
route_result = edge.should_route(message.data)
if route_result:
deliverable_edges.append(edge)

if len(deliverable_edges) > 0:
span.set_attributes({
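The same sync-versus-async dispatch is repeated at each delivery site in the runner changes above. Purely as a reading aid (this helper is hypothetical and not part of the PR), the pattern being applied is:

    from typing import Any


    async def evaluate_route(edge: Any, data: Any, shared_state: Any) -> bool:
        # Mirrors the runner logic: edges carrying an async_condition are awaited
        # with shared state; all other edges fall back to the sync should_route().
        if edge.has_async_condition:
            return await edge.should_route_async(data, shared_state)
        return edge.should_route(data)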
Original file line number Diff line number Diff line change
@@ -447,6 +447,7 @@ def add_edge(
source: Executor | AgentProtocol | str,
target: Executor | AgentProtocol | str,
condition: Callable[[Any], bool] | None = None,
async_condition: Callable[[Any, Any], Any] | None = None,
) -> Self:
"""Add a directed edge between two executors.

@@ -457,12 +458,15 @@ def add_edge(
source: The source executor or registered name of the source factory for the edge.
target: The target executor or registered name of the target factory for the edge.
condition: An optional condition function that determines whether the edge
should be traversed based on the message.
should be traversed based on the message type.
async_condition: An optional async condition function that receives
(message_data, shared_state) and determines whether the edge
should be traversed. Takes precedence over condition when present.

Note: If instances are provided for both source and target, they will be shared across
all workflow instances created from the built Workflow. To avoid this, consider
registering the executors and agents using `register_executor` and `register_agent`
and referencing them by factory name for lazy initialization instead.

Returns:
Self: The WorkflowBuilder instance for method chaining.
@@ -529,7 +533,7 @@ def only_large_numbers(msg: int) -> bool:
target_exec = self._maybe_wrap_agent(target) # type: ignore[arg-type]
source_id = self._add_executor(source_exec)
target_id = self._add_executor(target_exec)
self._edge_groups.append(SingleEdgeGroup(source_id, target_id, condition)) # type: ignore[call-arg]
self._edge_groups.append(SingleEdgeGroup(source_id, target_id, condition, async_condition=async_condition)) # type: ignore[call-arg]
return self

def add_fan_out_edges(
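At the builder level, the new parameter threads straight through to `SingleEdgeGroup`. A hedged usage sketch, assuming a `WorkflowBuilder` instance named `builder` and two executors (`triage_executor`, `escalation_executor`) whose names are illustrative placeholders rather than anything in this PR:

    async def needs_escalation(data, shared_state) -> bool:
        # The predicate receives (message_data, shared_state); the shared-state
        # API is not shown in this diff, so only the payload is inspected here.
        return bool(data.get("urgent"))


    # async_condition takes precedence over condition when both are supplied.
    builder.add_edge(triage_executor, escalation_executor, async_condition=needs_escalation)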
6 changes: 4 additions & 2 deletions python/packages/core/agent_framework/openai/_chat_client.py
Original file line number Diff line number Diff line change
@@ -378,8 +378,10 @@ def _openai_chat_message_parser(self, message: ChatMessage) -> list[dict[str, An
args["tool_calls"] = [self._openai_content_parser(content)] # type: ignore
case FunctionResultContent():
args["tool_call_id"] = content.call_id
if content.result is not None:
args["content"] = prepare_function_call_results(content.result)
# Always include content for tool results, even if None (API requires it)
args["content"] = (
prepare_function_call_results(content.result) if content.result is not None else ""
)
case _:
if "content" not in args:
args["content"] = []
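For the `_chat_client.py` change, a brief sketch of the tool message the parser now produces when a function returns nothing (the call id is made up, and role handling happens outside this hunk, so it is assumed here):

    # content.result is None: the parser previously omitted "content" entirely,
    # which the API rejects for tool results. It now always sends a value, falling
    # back to an empty string.
    tool_message = {
        "role": "tool",              # assumed; set elsewhere in the parser
        "tool_call_id": "call_abc",  # hypothetical id
        "content": "",
    }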
Original file line number Diff line number Diff line change
@@ -3,10 +3,24 @@
from importlib import metadata

from ._loader import AgentFactory, DeclarativeLoaderError, ProviderLookupError, ProviderTypeMapping
from ._workflows import DeclarativeWorkflowError, WorkflowFactory, WorkflowState
from ._workflows._graph import AgentInvocationError, ExternalInputRequest, ExternalInputResponse

try:
__version__ = metadata.version(__name__)
except metadata.PackageNotFoundError:
__version__ = "0.0.0" # Fallback for development mode

__all__ = ["AgentFactory", "DeclarativeLoaderError", "ProviderLookupError", "ProviderTypeMapping", "__version__"]
__all__ = [
"AgentFactory",
"AgentInvocationError",
"DeclarativeLoaderError",
"DeclarativeWorkflowError",
"ExternalInputRequest",
"ExternalInputResponse",
"ProviderLookupError",
"ProviderTypeMapping",
"WorkflowFactory",
"WorkflowState",
"__version__",
]