Skip to content

Commit 4e95787

Browse files
rajeshvelichetiholtskinnerkthota-g
authored
feat: Telemetry tracing (#17)
* Open Telemetry for A2A-Python-SDK * Open Telemetry for A2A-python-sdk, enable trace * Telemetry-tracing with conflict resolution * Formatting * feat: Telemetry in A2A Python SDK * feat: Telemetry in A2A Python SDK * feat: Telemetry in A2A Python SDK --------- Co-authored-by: Holt Skinner <[email protected]> Co-authored-by: Holt Skinner <[email protected]> Co-authored-by: kthota-g <[email protected]>
1 parent 16dbf22 commit 4e95787

File tree

9 files changed

+34
-8
lines changed

9 files changed

+34
-8
lines changed

src/a2a/client/client.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
SetTaskPushNotificationConfigRequest,
2525
SetTaskPushNotificationConfigResponse,
2626
)
27+
from a2a.utils.telemetry import SpanKind, trace_class
2728

2829

2930
class A2ACardResolver:
@@ -59,6 +60,7 @@ async def get_agent_card(
5960
) from e
6061

6162

63+
@trace_class(kind=SpanKind.CLIENT)
6264
class A2AClient:
6365
"""A2A Client."""
6466

@@ -111,7 +113,7 @@ async def send_message_streaming(
111113
request: SendStreamingMessageRequest,
112114
*,
113115
http_kwargs: dict[str, Any] | None = None,
114-
) -> AsyncGenerator[SendStreamingMessageResponse, None]:
116+
) -> AsyncGenerator[SendStreamingMessageResponse]:
115117
if not request.id:
116118
request.id = str(uuid4())
117119

src/a2a/server/events/event_consumer.py

+2
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
TaskStatusUpdateEvent,
1313
)
1414
from a2a.utils.errors import ServerError
15+
from a2a.utils.telemetry import SpanKind, trace_class
1516

1617

1718
logger = logging.getLogger(__name__)
1819

1920

21+
@trace_class(kind=SpanKind.SERVER)
2022
class EventConsumer:
2123
"""Consumer to read events from the agent event queue."""
2224

src/a2a/server/events/event_queue.py

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
TaskArtifactUpdateEvent,
1212
TaskStatusUpdateEvent,
1313
)
14+
from a2a.utils.telemetry import SpanKind, trace_class
1415

1516

1617
logger = logging.getLogger(__name__)
@@ -26,6 +27,7 @@
2627
)
2728

2829

30+
@trace_class(kind=SpanKind.SERVER)
2931
class EventQueue:
3032
"""Event queue for A2A responses from agent."""
3133

src/a2a/server/events/in_memory_queue_manager.py

+2
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
QueueManager,
77
TaskQueueExists,
88
)
9+
from a2a.utils.telemetry import SpanKind, trace_class
910

1011

12+
@trace_class(kind=SpanKind.SERVER)
1113
class InMemoryQueueManager(QueueManager):
1214
"""InMemoryQueueManager is used for a single binary management.
1315

src/a2a/server/request_handlers/default_request_handler.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727
UnsupportedOperationError,
2828
)
2929
from a2a.utils.errors import ServerError
30+
from a2a.utils.telemetry import SpanKind, trace_class
3031

3132

3233
logger = logging.getLogger(__name__)
3334

3435

36+
@trace_class(kind=SpanKind.SERVER)
3537
class DefaultRequestHandler(RequestHandler):
3638
"""Default request handler for all incoming requests."""
3739

@@ -128,7 +130,7 @@ async def on_message_send(
128130
# agents.
129131
queue = await self._queue_manager.create_or_tap(task_id)
130132
result_aggregator = ResultAggregator(task_manager)
131-
# TODO to manage the non-blocking flows.
133+
# TODO: to manage the non-blocking flows.
132134
producer_task = asyncio.create_task(
133135
self._run_event_stream(
134136
request_context,

src/a2a/server/request_handlers/jsonrpc_handler.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,15 @@
3636
)
3737
from a2a.utils.errors import ServerError
3838
from a2a.utils.helpers import validate
39+
from a2a.utils.telemetry import SpanKind, trace_class
3940

4041

4142
logger = logging.getLogger(__name__)
4243

4344

45+
@trace_class(kind=SpanKind.SERVER)
4446
class JSONRPCHandler:
45-
"""A handler that maps the JSONRPC Objects to the request handler and back."""
47+
"""Maps the JSONRPC Objects to the request handler and back."""
4648

4749
def __init__(
4850
self,
@@ -53,7 +55,7 @@ def __init__(
5355
5456
Args:
5557
agent_card: The AgentCard describing the agent's capabilities.
56-
request_handler: The handler instance responsible for processing A2A requests.
58+
request_handler: The handler instance to process A2A requests.
5759
"""
5860
self.agent_card = agent_card
5961
self.request_handler = request_handler

src/a2a/utils/helpers.py

+3
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313
TextPart,
1414
)
1515
from a2a.utils.errors import ServerError, UnsupportedOperationError
16+
from a2a.utils.telemetry import trace_function
1617

1718

1819
logger = logging.getLogger(__name__)
1920

2021

22+
@trace_function()
2123
def create_task_obj(message_send_params: MessageSendParams) -> Task:
2224
"""Create a new task object from message send params."""
2325
if not message_send_params.message.contextId:
@@ -31,6 +33,7 @@ def create_task_obj(message_send_params: MessageSendParams) -> Task:
3133
)
3234

3335

36+
@trace_function()
3437
def append_artifact_to_task(task: Task, event: TaskArtifactUpdateEvent) -> None:
3538
"""Helper method for updating Task with new artifact data."""
3639
if not task.artifacts:

src/a2a/utils/telemetry.py

+14-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
# type: ignore
21
"""OpenTelemetry Tracing Utilities for A2A Python SDK.
32
43
This module provides decorators to simplify the integration of OpenTelemetry
@@ -59,9 +58,12 @@ def internal_method(self):
5958
import logging
6059

6160
from opentelemetry import trace
62-
from opentelemetry.trace import SpanKind, StatusCode
61+
from opentelemetry.trace import SpanKind as _SpanKind
62+
from opentelemetry.trace import StatusCode
6363

6464

65+
SpanKind = _SpanKind
66+
__all__ = ['SpanKind']
6567
INSTRUMENTING_MODULE_NAME = 'a2a-python-sdk'
6668
INSTRUMENTING_MODULE_VERSION = '1.0.0'
6769

@@ -210,7 +212,11 @@ def sync_wrapper(*args, **kwargs):
210212
return async_wrapper if is_async_func else sync_wrapper
211213

212214

213-
def trace_class(include_list: list[str] = None, exclude_list: list[str] = None):
215+
def trace_class(
216+
include_list: list[str] | None = None,
217+
exclude_list: list[str] | None = None,
218+
kind=SpanKind.INTERNAL,
219+
):
214220
"""A class decorator to automatically trace specified methods of a class.
215221
216222
This decorator iterates over the methods of a class and applies the
@@ -278,7 +284,11 @@ def decorator(cls):
278284
all_methods[name] = method
279285
span_name = f'{cls.__module__}.{cls.__name__}.{name}'
280286
# Set the decorator on the method.
281-
setattr(cls, name, trace_function(span_name=span_name)(method))
287+
setattr(
288+
cls,
289+
name,
290+
trace_function(span_name=span_name, kind=kind)(method),
291+
)
282292
return cls
283293

284294
return decorator

uv.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)