Skip to content

feat: Telemetry tracing #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
May 16, 2025
4 changes: 3 additions & 1 deletion src/a2a/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
SetTaskPushNotificationConfigRequest,
SetTaskPushNotificationConfigResponse,
)
from a2a.utils.telemetry import SpanKind, trace_class


class A2ACardResolver:
Expand Down Expand Up @@ -59,6 +60,7 @@ async def get_agent_card(
) from e


@trace_class(kind=SpanKind.CLIENT)
class A2AClient:
"""A2A Client."""

Expand Down Expand Up @@ -111,7 +113,7 @@ async def send_message_streaming(
request: SendStreamingMessageRequest,
*,
http_kwargs: dict[str, Any] | None = None,
) -> AsyncGenerator[SendStreamingMessageResponse, None]:
) -> AsyncGenerator[SendStreamingMessageResponse]:
if not request.id:
request.id = str(uuid4())

Expand Down
2 changes: 2 additions & 0 deletions src/a2a/server/events/event_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
TaskStatusUpdateEvent,
)
from a2a.utils.errors import ServerError
from a2a.utils.telemetry import SpanKind, trace_class


logger = logging.getLogger(__name__)


@trace_class(kind=SpanKind.SERVER)
class EventConsumer:
"""Consumer to read events from the agent event queue."""

Expand Down
2 changes: 2 additions & 0 deletions src/a2a/server/events/event_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
TaskArtifactUpdateEvent,
TaskStatusUpdateEvent,
)
from a2a.utils.telemetry import SpanKind, trace_class


logger = logging.getLogger(__name__)
Expand All @@ -26,6 +27,7 @@
)


@trace_class(kind=SpanKind.SERVER)
class EventQueue:
"""Event queue for A2A responses from agent."""

Expand Down
2 changes: 2 additions & 0 deletions src/a2a/server/events/in_memory_queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
QueueManager,
TaskQueueExists,
)
from a2a.utils.telemetry import SpanKind, trace_class


@trace_class(kind=SpanKind.SERVER)
class InMemoryQueueManager(QueueManager):
"""InMemoryQueueManager is used for a single binary management.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
UnsupportedOperationError,
)
from a2a.utils.errors import ServerError
from a2a.utils.telemetry import SpanKind, trace_class


logger = logging.getLogger(__name__)


@trace_class(kind=SpanKind.SERVER)
class DefaultRequestHandler(RequestHandler):
"""Default request handler for all incoming requests."""

Expand Down Expand Up @@ -128,7 +130,7 @@ async def on_message_send(
# agents.
queue = await self._queue_manager.create_or_tap(task_id)
result_aggregator = ResultAggregator(task_manager)
# TODO to manage the non-blocking flows.
# TODO: to manage the non-blocking flows.
producer_task = asyncio.create_task(
self._run_event_stream(
request_context,
Expand Down
6 changes: 4 additions & 2 deletions src/a2a/server/request_handlers/jsonrpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@
)
from a2a.utils.errors import ServerError
from a2a.utils.helpers import validate
from a2a.utils.telemetry import SpanKind, trace_class


logger = logging.getLogger(__name__)


@trace_class(kind=SpanKind.SERVER)
class JSONRPCHandler:
"""A handler that maps the JSONRPC Objects to the request handler and back."""
"""Maps the JSONRPC Objects to the request handler and back."""

def __init__(
self,
Expand All @@ -53,7 +55,7 @@ def __init__(

Args:
agent_card: The AgentCard describing the agent's capabilities.
request_handler: The handler instance responsible for processing A2A requests.
request_handler: The handler instance to process A2A requests.
"""
self.agent_card = agent_card
self.request_handler = request_handler
Expand Down
3 changes: 3 additions & 0 deletions src/a2a/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
TextPart,
)
from a2a.utils.errors import ServerError, UnsupportedOperationError
from a2a.utils.telemetry import trace_function


logger = logging.getLogger(__name__)


@trace_function()
def create_task_obj(message_send_params: MessageSendParams) -> Task:
"""Create a new task object from message send params."""
if not message_send_params.message.contextId:
Expand All @@ -31,6 +33,7 @@ def create_task_obj(message_send_params: MessageSendParams) -> Task:
)


@trace_function()
def append_artifact_to_task(task: Task, event: TaskArtifactUpdateEvent) -> None:
"""Helper method for updating Task with new artifact data."""
if not task.artifacts:
Expand Down
18 changes: 14 additions & 4 deletions src/a2a/utils/telemetry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# type: ignore
"""OpenTelemetry Tracing Utilities for A2A Python SDK.

This module provides decorators to simplify the integration of OpenTelemetry
Expand Down Expand Up @@ -59,9 +58,12 @@ def internal_method(self):
import logging

from opentelemetry import trace
from opentelemetry.trace import SpanKind, StatusCode
from opentelemetry.trace import SpanKind as _SpanKind
from opentelemetry.trace import StatusCode


SpanKind = _SpanKind
__all__ = ['SpanKind']
INSTRUMENTING_MODULE_NAME = 'a2a-python-sdk'
INSTRUMENTING_MODULE_VERSION = '1.0.0'

Expand Down Expand Up @@ -210,7 +212,11 @@ def sync_wrapper(*args, **kwargs):
return async_wrapper if is_async_func else sync_wrapper


def trace_class(include_list: list[str] = None, exclude_list: list[str] = None):
def trace_class(
include_list: list[str] | None = None,
exclude_list: list[str] | None = None,
kind=SpanKind.INTERNAL,
):
"""A class decorator to automatically trace specified methods of a class.

This decorator iterates over the methods of a class and applies the
Expand Down Expand Up @@ -278,7 +284,11 @@ def decorator(cls):
all_methods[name] = method
span_name = f'{cls.__module__}.{cls.__name__}.{name}'
# Set the decorator on the method.
setattr(cls, name, trace_function(span_name=span_name)(method))
setattr(
cls,
name,
trace_function(span_name=span_name, kind=kind)(method),
)
return cls

return decorator
1 change: 1 addition & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.