Skip to content

Commit 284aff6

Browse files
google-genai-botcopybara-github
authored andcommitted
refactor: Simplifiying Tracing code
Refactor the tracing implementation within the Google ADK to simplify how OpenTelemetry spans are managed, especially within RxJava streams. The key changes include: 1. **Introducing `Tracing.TracerProvider`**: A new set of RxJava transformers (`FlowableTransformer`, `SingleTransformer`, `MaybeTransformer`, `CompletableTransformer`) is added in `Tracing.java`. These transformers, created via `Tracing.trace()` methods, handle the lifecycle of OpenTelemetry spans, including span creation, making the span current, and ending the span upon stream completion or error. 2. **Simplifying Tracing Calls**: Instead of manually creating and managing spans with `Tracer`, `Span`, and `Scope`, various parts of the codebase now use the `.compose(Tracing.trace(...))` operator on RxJava streams. This is applied in: * `BaseAgent.java`: For agent invocations using `Tracing.traceAgent`. * `BaseLlmFlow.java`: For LLM calls (`call_llm`) and sending data (`send_data`). * `Functions.java`: For tool responses and tool calls. * `Runner.java`: For overall invocation spans in `runAsync` and `runLive`. 3. **Centralized Attribute Setting**: Helper methods like `getValidCurrentSpan`, `setInvocationAttributes`, `setToolExecutionAttributes`, and `setJsonAttribute` are added to `Tracing.java` to encapsulate and standardize how attributes are set on spans, including handling JSON serialization and checks for valid spans. These changes aim to reduce tracing-related boilerplate, improve consistency, and make the tracing logic more robust by tying span lifetimes to RxJava stream lifecycles. PiperOrigin-RevId: 869714739
1 parent bd1b82e commit 284aff6

File tree

6 files changed

+657
-573
lines changed

6 files changed

+657
-573
lines changed

core/src/main/java/com/google/adk/agents/BaseAgent.java

Lines changed: 40 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@
2929
import com.google.errorprone.annotations.CanIgnoreReturnValue;
3030
import com.google.errorprone.annotations.DoNotCall;
3131
import com.google.genai.types.Content;
32-
import io.opentelemetry.api.trace.Span;
33-
import io.opentelemetry.api.trace.Tracer;
34-
import io.opentelemetry.context.Context;
3532
import io.reactivex.rxjava3.core.Completable;
3633
import io.reactivex.rxjava3.core.Flowable;
3734
import io.reactivex.rxjava3.core.Maybe;
@@ -315,44 +312,37 @@ public Flowable<Event> runAsync(InvocationContext parentContext) {
315312
private Flowable<Event> run(
316313
InvocationContext parentContext,
317314
Function<InvocationContext, Flowable<Event>> runImplementation) {
318-
Tracer tracer = Tracing.getTracer();
319315
return Flowable.defer(
320316
() -> {
321317
InvocationContext invocationContext = createInvocationContext(parentContext);
322-
Span span =
323-
tracer.spanBuilder("invoke_agent " + name()).setParent(Context.current()).startSpan();
324-
Tracing.traceAgentInvocation(span, name(), description(), invocationContext);
325-
Context spanContext = Context.current().with(span);
326-
327-
return Tracing.traceFlowable(
328-
spanContext,
329-
span,
330-
() ->
331-
callCallback(
332-
beforeCallbacksToFunctions(
333-
invocationContext.pluginManager(), beforeAgentCallback),
334-
invocationContext)
335-
.flatMapPublisher(
336-
beforeEventOpt -> {
337-
if (invocationContext.endInvocation()) {
338-
return Flowable.fromOptional(beforeEventOpt);
339-
}
340-
341-
Flowable<Event> beforeEvents = Flowable.fromOptional(beforeEventOpt);
342-
Flowable<Event> mainEvents =
343-
Flowable.defer(() -> runImplementation.apply(invocationContext));
344-
Flowable<Event> afterEvents =
345-
Flowable.defer(
346-
() ->
347-
callCallback(
348-
afterCallbacksToFunctions(
349-
invocationContext.pluginManager(),
350-
afterAgentCallback),
351-
invocationContext)
352-
.flatMapPublisher(Flowable::fromOptional));
353-
354-
return Flowable.concat(beforeEvents, mainEvents, afterEvents);
355-
}));
318+
319+
return callCallback(
320+
beforeCallbacksToFunctions(
321+
invocationContext.pluginManager(), beforeAgentCallback),
322+
invocationContext)
323+
.flatMapPublisher(
324+
beforeEventOpt -> {
325+
if (invocationContext.endInvocation()) {
326+
return Flowable.fromOptional(beforeEventOpt);
327+
}
328+
329+
Flowable<Event> beforeEvents = Flowable.fromOptional(beforeEventOpt);
330+
Flowable<Event> mainEvents =
331+
Flowable.defer(() -> runImplementation.apply(invocationContext));
332+
Flowable<Event> afterEvents =
333+
Flowable.defer(
334+
() ->
335+
callCallback(
336+
afterCallbacksToFunctions(
337+
invocationContext.pluginManager(), afterAgentCallback),
338+
invocationContext)
339+
.flatMapPublisher(Flowable::fromOptional));
340+
341+
return Flowable.concat(beforeEvents, mainEvents, afterEvents);
342+
})
343+
.compose(
344+
Tracing.traceAgent(
345+
"invoke_agent " + name(), name(), description(), invocationContext));
356346
});
357347
}
358348

@@ -364,11 +354,8 @@ private Flowable<Event> run(
364354
*/
365355
private ImmutableList<Function<CallbackContext, Maybe<Content>>> beforeCallbacksToFunctions(
366356
Plugin pluginManager, List<? extends BeforeAgentCallback> callbacks) {
367-
return Stream.concat(
368-
Stream.of(ctx -> pluginManager.beforeAgentCallback(this, ctx)),
369-
callbacks.stream()
370-
.map(callback -> (Function<CallbackContext, Maybe<Content>>) callback::call))
371-
.collect(toImmutableList());
357+
return callbacksToFunctions(
358+
ctx -> pluginManager.beforeAgentCallback(this, ctx), callbacks, c -> c::call);
372359
}
373360

374361
/**
@@ -379,10 +366,15 @@ private ImmutableList<Function<CallbackContext, Maybe<Content>>> beforeCallbacks
379366
*/
380367
private ImmutableList<Function<CallbackContext, Maybe<Content>>> afterCallbacksToFunctions(
381368
Plugin pluginManager, List<? extends AfterAgentCallback> callbacks) {
382-
return Stream.concat(
383-
Stream.of(ctx -> pluginManager.afterAgentCallback(this, ctx)),
384-
callbacks.stream()
385-
.map(callback -> (Function<CallbackContext, Maybe<Content>>) callback::call))
369+
return callbacksToFunctions(
370+
ctx -> pluginManager.afterAgentCallback(this, ctx), callbacks, c -> c::call);
371+
}
372+
373+
private <T> ImmutableList<Function<CallbackContext, Maybe<Content>>> callbacksToFunctions(
374+
Function<CallbackContext, Maybe<Content>> pluginCallback,
375+
List<T> callbacks,
376+
Function<T, Function<CallbackContext, Maybe<Content>>> mapper) {
377+
return Stream.concat(Stream.of(pluginCallback), callbacks.stream().map(mapper))
386378
.collect(toImmutableList());
387379
}
388380

@@ -523,8 +515,7 @@ public B subAgents(List<? extends BaseAgent> subAgents) {
523515

524516
@CanIgnoreReturnValue
525517
public B subAgents(BaseAgent... subAgents) {
526-
this.subAgents = ImmutableList.copyOf(subAgents);
527-
return self();
518+
return subAgents(ImmutableList.copyOf(subAgents));
528519
}
529520

530521
@CanIgnoreReturnValue

0 commit comments

Comments
 (0)