diff --git a/src/a2a/client/client.py b/src/a2a/client/client.py index 2f032707..4703691e 100644 --- a/src/a2a/client/client.py +++ b/src/a2a/client/client.py @@ -24,6 +24,7 @@ SetTaskPushNotificationConfigRequest, SetTaskPushNotificationConfigResponse, ) +from a2a.utils.telemetry import SpanKind, trace_class class A2ACardResolver: @@ -59,6 +60,7 @@ async def get_agent_card( ) from e +@trace_class(kind=SpanKind.CLIENT) class A2AClient: """A2A Client.""" @@ -111,7 +113,7 @@ async def send_message_streaming( request: SendStreamingMessageRequest, *, http_kwargs: dict[str, Any] | None = None, - ) -> AsyncGenerator[SendStreamingMessageResponse, None]: + ) -> AsyncGenerator[SendStreamingMessageResponse]: if not request.id: request.id = str(uuid4()) diff --git a/src/a2a/server/events/event_consumer.py b/src/a2a/server/events/event_consumer.py index 73cc9c75..05f1dde4 100644 --- a/src/a2a/server/events/event_consumer.py +++ b/src/a2a/server/events/event_consumer.py @@ -12,11 +12,13 @@ TaskStatusUpdateEvent, ) from a2a.utils.errors import ServerError +from a2a.utils.telemetry import SpanKind, trace_class logger = logging.getLogger(__name__) +@trace_class(kind=SpanKind.SERVER) class EventConsumer: """Consumer to read events from the agent event queue.""" diff --git a/src/a2a/server/events/event_queue.py b/src/a2a/server/events/event_queue.py index ec651761..fbe63822 100644 --- a/src/a2a/server/events/event_queue.py +++ b/src/a2a/server/events/event_queue.py @@ -11,6 +11,7 @@ TaskArtifactUpdateEvent, TaskStatusUpdateEvent, ) +from a2a.utils.telemetry import SpanKind, trace_class logger = logging.getLogger(__name__) @@ -26,6 +27,7 @@ ) +@trace_class(kind=SpanKind.SERVER) class EventQueue: """Event queue for A2A responses from agent.""" diff --git a/src/a2a/server/events/in_memory_queue_manager.py b/src/a2a/server/events/in_memory_queue_manager.py index 9d4a135b..c2b07937 100644 --- a/src/a2a/server/events/in_memory_queue_manager.py +++ b/src/a2a/server/events/in_memory_queue_manager.py @@ -6,8 +6,10 @@ QueueManager, TaskQueueExists, ) +from a2a.utils.telemetry import SpanKind, trace_class +@trace_class(kind=SpanKind.SERVER) class InMemoryQueueManager(QueueManager): """InMemoryQueueManager is used for a single binary management. diff --git a/src/a2a/server/request_handlers/default_request_handler.py b/src/a2a/server/request_handlers/default_request_handler.py index bf616c0c..7c3b2de5 100644 --- a/src/a2a/server/request_handlers/default_request_handler.py +++ b/src/a2a/server/request_handlers/default_request_handler.py @@ -27,11 +27,13 @@ UnsupportedOperationError, ) from a2a.utils.errors import ServerError +from a2a.utils.telemetry import SpanKind, trace_class logger = logging.getLogger(__name__) +@trace_class(kind=SpanKind.SERVER) class DefaultRequestHandler(RequestHandler): """Default request handler for all incoming requests.""" @@ -128,7 +130,7 @@ async def on_message_send( # agents. queue = await self._queue_manager.create_or_tap(task_id) result_aggregator = ResultAggregator(task_manager) - # TODO to manage the non-blocking flows. + # TODO: to manage the non-blocking flows. producer_task = asyncio.create_task( self._run_event_stream( request_context, diff --git a/src/a2a/server/request_handlers/jsonrpc_handler.py b/src/a2a/server/request_handlers/jsonrpc_handler.py index d3192f59..0475ab12 100644 --- a/src/a2a/server/request_handlers/jsonrpc_handler.py +++ b/src/a2a/server/request_handlers/jsonrpc_handler.py @@ -36,13 +36,15 @@ ) from a2a.utils.errors import ServerError from a2a.utils.helpers import validate +from a2a.utils.telemetry import SpanKind, trace_class logger = logging.getLogger(__name__) +@trace_class(kind=SpanKind.SERVER) class JSONRPCHandler: - """A handler that maps the JSONRPC Objects to the request handler and back.""" + """Maps the JSONRPC Objects to the request handler and back.""" def __init__( self, @@ -53,7 +55,7 @@ def __init__( Args: agent_card: The AgentCard describing the agent's capabilities. - request_handler: The handler instance responsible for processing A2A requests. + request_handler: The handler instance to process A2A requests. """ self.agent_card = agent_card self.request_handler = request_handler diff --git a/src/a2a/utils/helpers.py b/src/a2a/utils/helpers.py index 6d838d98..651c37ea 100644 --- a/src/a2a/utils/helpers.py +++ b/src/a2a/utils/helpers.py @@ -13,11 +13,13 @@ TextPart, ) from a2a.utils.errors import ServerError, UnsupportedOperationError +from a2a.utils.telemetry import trace_function logger = logging.getLogger(__name__) +@trace_function() def create_task_obj(message_send_params: MessageSendParams) -> Task: """Create a new task object from message send params.""" if not message_send_params.message.contextId: @@ -31,6 +33,7 @@ def create_task_obj(message_send_params: MessageSendParams) -> Task: ) +@trace_function() def append_artifact_to_task(task: Task, event: TaskArtifactUpdateEvent) -> None: """Helper method for updating Task with new artifact data.""" if not task.artifacts: diff --git a/src/a2a/utils/telemetry.py b/src/a2a/utils/telemetry.py index 926e3771..29843f74 100644 --- a/src/a2a/utils/telemetry.py +++ b/src/a2a/utils/telemetry.py @@ -1,4 +1,3 @@ -# type: ignore """OpenTelemetry Tracing Utilities for A2A Python SDK. This module provides decorators to simplify the integration of OpenTelemetry @@ -59,9 +58,12 @@ def internal_method(self): import logging from opentelemetry import trace -from opentelemetry.trace import SpanKind, StatusCode +from opentelemetry.trace import SpanKind as _SpanKind +from opentelemetry.trace import StatusCode +SpanKind = _SpanKind +__all__ = ['SpanKind'] INSTRUMENTING_MODULE_NAME = 'a2a-python-sdk' INSTRUMENTING_MODULE_VERSION = '1.0.0' @@ -210,7 +212,11 @@ def sync_wrapper(*args, **kwargs): return async_wrapper if is_async_func else sync_wrapper -def trace_class(include_list: list[str] = None, exclude_list: list[str] = None): +def trace_class( + include_list: list[str] | None = None, + exclude_list: list[str] | None = None, + kind=SpanKind.INTERNAL, +): """A class decorator to automatically trace specified methods of a class. This decorator iterates over the methods of a class and applies the @@ -278,7 +284,11 @@ def decorator(cls): all_methods[name] = method span_name = f'{cls.__module__}.{cls.__name__}.{name}' # Set the decorator on the method. - setattr(cls, name, trace_function(span_name=span_name)(method)) + setattr( + cls, + name, + trace_function(span_name=span_name, kind=kind)(method), + ) return cls return decorator diff --git a/uv.lock b/uv.lock index 07b8c11a..ec7e65b3 100644 --- a/uv.lock +++ b/uv.lock @@ -2003,4 +2003,5 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/02/90/2633473864f67a15526324b007a9f96c96f56d5f32ef2a56cc12f9548723/zstandard-0.23.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:fa6ce8b52c5987b3e34d5674b0ab529a4602b632ebab0a93b07bfb4dfc8f8a33", size = 5191299 }, { url = "https://files.pythonhosted.org/packages/b0/4c/315ca5c32da7e2dc3455f3b2caee5c8c2246074a61aac6ec3378a97b7136/zstandard-0.23.0-cp313-cp313-win32.whl", hash = "sha256:a9b07268d0c3ca5c170a385a0ab9fb7fdd9f5fd866be004c4ea39e44edce47dd", size = 430862 }, { url = "https://files.pythonhosted.org/packages/a2/bf/c6aaba098e2d04781e8f4f7c0ba3c7aa73d00e4c436bcc0cf059a66691d1/zstandard-0.23.0-cp313-cp313-win_amd64.whl", hash = "sha256:f3513916e8c645d0610815c257cbfd3242adfd5c4cfa78be514e5a3ebb42a41b", size = 495578 }, + ]