27 changes: 24 additions & 3 deletions python/packages/devui/README.md
@@ -102,12 +102,32 @@ agents/
└── .env # Optional: shared environment variables
```

### Importing from External Modules

If your agents import tools or utilities from sibling directories (e.g., `from tools.helpers import my_tool`), set `PYTHONPATH` to include their common parent directory (the project root):

```bash
# Project structure:
# backend/
# ├── agents/
# │ └── my_agent/
# │ └── agent.py # contains: from tools.helpers import my_tool
# └── tools/
# └── helpers.py

# Run from project root with PYTHONPATH
cd backend
PYTHONPATH=. devui ./agents --port 8080
```

Without `PYTHONPATH`, Python cannot find modules in sibling directories and DevUI will report an import error.
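
If setting `PYTHONPATH` externally is not an option, a common alternative (shown here only as a hedged sketch, not something this package requires) is to prepend the project root to `sys.path` at the top of `agent.py`:

```python
# Sketch only: resolve the sibling `tools` package without PYTHONPATH.
# Paths assume the backend/agents/my_agent/agent.py layout shown above.
import sys
from pathlib import Path

PROJECT_ROOT = Path(__file__).resolve().parents[2]  # .../backend
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from tools.helpers import my_tool  # now importable when DevUI loads the agent
```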

## Viewing Telemetry (Otel Traces) in DevUI

Agent Framework emits OpenTelemetry (Otel) traces for various operations. You can view these traces in DevUI by enabling tracing when starting the server.

```bash
devui ./agents --tracing framework
devui ./agents --tracing
```

## OpenAI-Compatible API
@@ -196,11 +216,12 @@ Options:
--port, -p Port (default: 8080)
--host Host (default: 127.0.0.1)
--headless API only, no UI
--config YAML config file
--tracing none|framework|workflow|all
--no-open Don't automatically open browser
--tracing Enable OpenTelemetry tracing
--reload Enable auto-reload
--mode developer|user (default: developer)
--auth Enable Bearer token authentication
--auth-token Custom authentication token
```

### UI Modes
7 changes: 4 additions & 3 deletions python/packages/devui/agent_framework_devui/__init__.py
@@ -185,9 +185,10 @@ def serve(
os.environ["ENABLE_SENSITIVE_DATA"] = "true"
logger.info("Set ENABLE_SENSITIVE_DATA=true for tracing")

if not os.environ.get("OTLP_ENDPOINT"):
os.environ["OTLP_ENDPOINT"] = "http://localhost:4317"
logger.info("Set OTLP_ENDPOINT=http://localhost:4317 for tracing")
# Note: We intentionally do NOT set a default OTLP_ENDPOINT here.
# If the user wants to export traces to an external collector, they
# should set OTLP_ENDPOINT explicitly. Otherwise, DevUI uses NoOp
# exporters to enable local trace capture without connection errors.

# Create server with direct parameters
server = DevServer(
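With no default endpoint set here, traces stay local unless the caller opts in to an external collector. A hedged sketch of that opt-in follows; the environment variable names come from this diff (`OTEL_EXPORTER_OTLP_ENDPOINT` is read in `_executor.py`, and `ENABLE_INSTRUMENTATION` gates the tracing setup), while `serve()`'s keyword arguments are assumptions because its signature is not shown.

```python
# Sketch, not a verified invocation: opt in to external OTLP export before starting DevUI.
import os

os.environ["ENABLE_INSTRUMENTATION"] = "true"  # assumption: truthy value expected by the tracing setup
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4317"  # read by _setup_agent_framework_tracing

from agent_framework_devui import serve

serve(entities_dir="./agents", port=8080)  # parameter names are assumptions
```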
72 changes: 68 additions & 4 deletions python/packages/devui/agent_framework_devui/_conversations.py
@@ -176,6 +176,31 @@ async def list_conversations_by_metadata(self, metadata_filter: dict[str, str])
"""
pass

@abstractmethod
def add_trace(self, conversation_id: str, trace_event: dict[str, Any]) -> None:
"""Add a trace event to the conversation for context inspection.

Traces capture execution metadata like token usage, timing, and LLM context
that isn't stored in the AgentThread but is useful for debugging.

Args:
conversation_id: Conversation ID
trace_event: Trace event data (from ResponseTraceEvent.data)
"""
pass

@abstractmethod
def get_traces(self, conversation_id: str) -> list[dict[str, Any]]:
"""Get all trace events for a conversation.

Args:
conversation_id: Conversation ID

Returns:
List of trace event dicts, or empty list if not found
"""
pass


class InMemoryConversationStore(ConversationStore):
"""In-memory conversation storage wrapping AgentThread.
@@ -215,6 +240,7 @@ def create_conversation(
"metadata": metadata or {},
"created_at": created_at,
"items": [],
"traces": [], # Trace events for context inspection (token usage, timing, etc.)
}

# Initialize item index for this conversation
@@ -407,10 +433,20 @@ async def list_items(
elif content_type == "function_result":
# Function result - create separate ConversationItem
call_id = getattr(content, "call_id", None)
# Output is stored in additional_properties
output = ""
if hasattr(content, "additional_properties"):
output = content.additional_properties.get("output", "")
# Output is stored in the 'result' field of FunctionResultContent
result_value = getattr(content, "result", None)
# Convert result to string (it could be dict, list, or other types)
if result_value is None:
output = ""
elif isinstance(result_value, str):
output = result_value
else:
import json

try:
output = json.dumps(result_value)
except (TypeError, ValueError):
output = str(result_value)

if call_id:
function_results.append(
@@ -556,6 +592,34 @@ def get_thread(self, conversation_id: str) -> AgentThread | None:
conv_data = self._conversations.get(conversation_id)
return conv_data["thread"] if conv_data else None

def add_trace(self, conversation_id: str, trace_event: dict[str, Any]) -> None:
"""Add a trace event to the conversation for context inspection.

Traces capture execution metadata like token usage, timing, and LLM context
that isn't stored in the AgentThread but is useful for debugging.

Args:
conversation_id: Conversation ID
trace_event: Trace event data (from ResponseTraceEvent.data)
"""
conv_data = self._conversations.get(conversation_id)
if conv_data:
traces = conv_data.get("traces", [])
traces.append(trace_event)
conv_data["traces"] = traces

def get_traces(self, conversation_id: str) -> list[dict[str, Any]]:
"""Get all trace events for a conversation.

Args:
conversation_id: Conversation ID

Returns:
List of trace event dicts, or empty list if not found
"""
conv_data = self._conversations.get(conversation_id)
return conv_data.get("traces", []) if conv_data else []

async def list_conversations_by_metadata(self, metadata_filter: dict[str, str]) -> list[Conversation]:
"""Filter conversations by metadata (e.g., agent_id)."""
results = []
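A minimal sketch of the new trace API on `InMemoryConversationStore` follows; the `create_conversation()` arguments and return value are assumptions (only `metadata` appears in the hunk above), and the trace payload is a hypothetical stand-in for `ResponseTraceEvent.data`.

```python
# Sketch only: record a trace event for a conversation and read it back.
from agent_framework_devui._conversations import InMemoryConversationStore

store = InMemoryConversationStore()
conv = store.create_conversation(metadata={"agent_id": "my_agent"})  # signature/return type assumed
conversation_id = getattr(conv, "id", conv)  # tolerate either a Conversation object or a bare id

store.add_trace(conversation_id, {"kind": "usage", "input_tokens": 512, "output_tokens": 64})
for trace in store.get_traces(conversation_id):  # returns [] for unknown conversation ids
    print(trace.get("kind"), trace.get("input_tokens"))
```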
11 changes: 10 additions & 1 deletion python/packages/devui/agent_framework_devui/_discovery.py
@@ -666,7 +666,16 @@ def _load_module_from_pattern(self, pattern: str) -> tuple[Any | None, Exception
logger.debug(f"Successfully imported {pattern}")
return module, None

except ModuleNotFoundError:
except ModuleNotFoundError as e:
# Distinguish between "module pattern doesn't exist" vs "module has import errors"
# If the missing module is the pattern itself, it's just not found (try next pattern)
# If the missing module is something else (a dependency), capture the error
missing_module = getattr(e, "name", None)
if missing_module and missing_module != pattern and not pattern.endswith(f".{missing_module}"):
# The module exists but has an import error (missing dependency)
logger.warning(f"Error importing {pattern}: {e}")
return None, e
# The module pattern itself doesn't exist - this is expected, try next pattern
logger.debug(f"Import pattern {pattern} not found")
return None, None
except Exception as e:
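The `ModuleNotFoundError` classification above reads as a small standalone predicate; a sketch follows, with hypothetical module names (`agents.my_agent`, `numpy`) used purely for illustration.

```python
# Sketch of the check above: does the error mean "pattern not found" or "dependency missing"?
def is_dependency_error(pattern: str, err: ModuleNotFoundError) -> bool:
    missing = getattr(err, "name", None)
    return bool(missing) and missing != pattern and not pattern.endswith(f".{missing}")

# The pattern itself (or its last segment) is missing: try the next pattern quietly.
assert not is_dependency_error("agents.my_agent", ModuleNotFoundError("nf", name="agents.my_agent"))
assert not is_dependency_error("agents.my_agent", ModuleNotFoundError("nf", name="my_agent"))
# A third-party dependency is missing: surface the error to the user.
assert is_dependency_error("agents.my_agent", ModuleNotFoundError("nf", name="numpy"))
```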
94 changes: 87 additions & 7 deletions python/packages/devui/agent_framework_devui/_executor.py
@@ -89,17 +89,91 @@ def _setup_agent_framework_tracing(self) -> None:

# Only configure if not already executed
if not OBSERVABILITY_SETTINGS._executed_setup:
# Run the configure_otel_providers
# This ensures OTLP exporters are created even if env vars were set late
configure_otel_providers(enable_sensitive_data=True)
logger.info("Enabled Agent Framework observability")
# Get OTLP endpoint from standard env vars
otlp_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT")

if otlp_endpoint:
# User provided an OTLP endpoint - use it
configure_otel_providers(enable_sensitive_data=True)
logger.info(f"Enabled Agent Framework observability with OTLP endpoint: {otlp_endpoint}")
else:
# No OTLP endpoint - use NoOp exporters to enable tracing without
# console spam or failed connection attempts.
# DevUI's SimpleTraceCollector will still capture spans via the
# TracerProvider's span processors.
from ._tracing import NoOpLogExporter, NoOpMetricExporter, NoOpSpanExporter

configure_otel_providers(
enable_sensitive_data=True,
exporters=[NoOpSpanExporter(), NoOpLogExporter(), NoOpMetricExporter()], # type: ignore[list-item]
)
logger.info("Enabled Agent Framework observability with local-only tracing")
else:
logger.debug("Agent Framework observability already configured")
except Exception as e:
logger.warning(f"Failed to enable Agent Framework observability: {e}")
else:
logger.debug("ENABLE_INSTRUMENTATION not set, skipping observability setup")

async def _ensure_mcp_connections(self, agent: Any) -> None:
"""Ensure MCP tool connections are healthy before agent execution.

This is a workaround for an Agent Framework bug where MCP tool connections
can become stale (underlying streams closed) but is_connected remains True.
This happens when HTTP streaming responses end and GeneratorExit propagates.

This method detects stale connections and reconnects them. It's designed to
be a no-op once the Agent Framework fixes this issue upstream.

Args:
agent: Agent object that may have MCP tools
"""
if not hasattr(agent, "_local_mcp_tools"):
return

for mcp_tool in agent._local_mcp_tools:
if not getattr(mcp_tool, "is_connected", False):
continue

tool_name = getattr(mcp_tool, "name", "unknown")

try:
# Check if underlying write stream is closed
session = getattr(mcp_tool, "session", None)
if session is None:
continue

write_stream = getattr(session, "_write_stream", None)
if write_stream is None:
continue

# Detect stale connection: is_connected=True but stream is closed
is_closed = getattr(write_stream, "_closed", False)
if not is_closed:
continue # Connection is healthy

# Stale connection detected - reconnect
logger.warning(f"MCP tool '{tool_name}' has stale connection (stream closed), reconnecting...")

# Clean up old connection
try:
if hasattr(mcp_tool, "close"):
await mcp_tool.close()
except Exception as close_err:
logger.debug(f"Error closing stale MCP tool '{tool_name}': {close_err}")
# Force reset state
mcp_tool.is_connected = False
mcp_tool.session = None

# Reconnect
if hasattr(mcp_tool, "connect"):
await mcp_tool.connect()
logger.info(f"MCP tool '{tool_name}' reconnected successfully")

except Exception as e:
# If detection fails, log and continue - let it fail naturally during execution
logger.debug(f"Error checking MCP tool '{tool_name}' connection: {e}")

async def discover_entities(self) -> list[EntityInfo]:
"""Discover all available entities.

@@ -192,11 +266,11 @@ async def execute_entity(self, entity_id: str, request: AgentFrameworkRequest) -

logger.info(f"Executing {entity_info.type}: {entity_id}")

# Extract session_id from request for trace context
session_id = getattr(request.extra_body, "session_id", None) if request.extra_body else None
# Extract response_id from request for trace context (added by _server.py)
response_id = request.extra_body.get("response_id") if request.extra_body else None

# Use simplified trace capture
with capture_traces(session_id=session_id, entity_id=entity_id) as trace_collector:
with capture_traces(response_id=response_id, entity_id=entity_id) as trace_collector:
if entity_info.type == "agent":
async for event in self._execute_agent(entity_obj, request, trace_collector):
yield event
@@ -260,6 +334,12 @@ async def _execute_agent(
logger.debug(f"Executing agent with text input: {user_message[:100]}...")
else:
logger.debug(f"Executing agent with multimodal ChatMessage: {type(user_message)}")

# Workaround for MCP tool stale connection bug (GitHub issue pending)
# When HTTP streaming ends, GeneratorExit can close MCP stdio streams
# but is_connected stays True. Detect and reconnect before execution.
await self._ensure_mcp_connections(agent)

# Check if agent supports streaming
if hasattr(agent, "run_stream") and callable(agent.run_stream):
# Use Agent Framework's native streaming with optional thread
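`NoOpSpanExporter`, `NoOpLogExporter`, and `NoOpMetricExporter` are imported from `._tracing` but not shown in this diff. One plausible shape for the span variant, assuming the standard OpenTelemetry SDK exporter interface, is sketched below; the real implementation may differ.

```python
# Hedged sketch; agent_framework_devui._tracing may implement this differently.
from typing import Sequence

from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult


class NoOpSpanExporter(SpanExporter):
    """Accepts spans so the TracerProvider's span processors still run, but exports nothing."""

    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        return SpanExportResult.SUCCESS

    def shutdown(self) -> None:
        return None

    def force_flush(self, timeout_millis: int = 30_000) -> bool:
        return True
```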