Skip to content

Commit c8ab0f9

Browse files
google-genai-bot authored and
copybara-github committed
feat: Implement basic version of BigQuery Agent Analytics Plugin
This change introduces a new plugin for the Agent Development Kit (ADK) that logs agent execution events to BigQuery. It includes: - `BigQueryAgentAnalyticsPlugin`: A plugin that captures various agent lifecycle events (user messages, tool calls, model invocations) and sends them to BigQuery. - `BigQueryLoggerConfig`: Configuration options for the plugin, including project/dataset/table IDs, batching, and retry settings. - `BigQuerySchema`: Defines the BigQuery and Arrow schemas used for the event table. - `BatchProcessor`: Handles batching of events and writing them to BigQuery using the Storage Write API with Arrow format. - `JsonFormatter`: Utility for safely formatting JSON content for BigQuery. PiperOrigin-RevId: 885133967
1 parent 4eb3613 commit c8ab0f9

File tree

9 files changed

+2180
-0
lines changed

9 files changed

+2180
-0
lines changed

core/pom.xml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,26 @@
197197
<artifactId>opentelemetry-sdk-testing</artifactId>
198198
<scope>test</scope>
199199
</dependency>
200+
<dependency>
201+
<groupId>com.google.cloud</groupId>
202+
<artifactId>google-cloud-bigquery</artifactId>
203+
<version>2.40.0</version>
204+
</dependency>
205+
<dependency>
206+
<groupId>org.apache.arrow</groupId>
207+
<artifactId>arrow-vector</artifactId>
208+
<version>17.0.0</version>
209+
</dependency>
210+
<dependency>
211+
<groupId>org.apache.arrow</groupId>
212+
<artifactId>arrow-memory-core</artifactId>
213+
<version>17.0.0</version>
214+
</dependency>
215+
<dependency>
216+
<groupId>org.apache.arrow</groupId>
217+
<artifactId>arrow-memory-netty</artifactId>
218+
<version>17.0.0</version>
219+
</dependency>
200220
</dependencies>
201221
<build>
202222
<resources>
Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.adk.plugins.agentanalytics;
18+
19+
import static java.nio.charset.StandardCharsets.UTF_8;
20+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
21+
import static java.util.concurrent.TimeUnit.NANOSECONDS;
22+
import static java.util.concurrent.TimeUnit.SECONDS;
23+
24+
import com.fasterxml.jackson.databind.JsonNode;
25+
import com.fasterxml.jackson.databind.node.ArrayNode;
26+
import com.fasterxml.jackson.databind.node.ObjectNode;
27+
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
28+
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
29+
import com.google.cloud.bigquery.storage.v1.StreamWriter;
30+
import com.google.common.annotations.VisibleForTesting;
31+
import java.time.Duration;
32+
import java.time.Instant;
33+
import java.util.ArrayList;
34+
import java.util.List;
35+
import java.util.Map;
36+
import java.util.concurrent.BlockingQueue;
37+
import java.util.concurrent.LinkedBlockingQueue;
38+
import java.util.concurrent.ScheduledExecutorService;
39+
import java.util.concurrent.atomic.AtomicBoolean;
40+
import java.util.logging.Level;
41+
import java.util.logging.Logger;
42+
import org.apache.arrow.memory.BufferAllocator;
43+
import org.apache.arrow.memory.RootAllocator;
44+
import org.apache.arrow.vector.BigIntVector;
45+
import org.apache.arrow.vector.BitVector;
46+
import org.apache.arrow.vector.FieldVector;
47+
import org.apache.arrow.vector.TimeStampVector;
48+
import org.apache.arrow.vector.VarCharVector;
49+
import org.apache.arrow.vector.VectorSchemaRoot;
50+
import org.apache.arrow.vector.VectorUnloader;
51+
import org.apache.arrow.vector.complex.ListVector;
52+
import org.apache.arrow.vector.complex.StructVector;
53+
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
54+
import org.apache.arrow.vector.types.pojo.Field;
55+
import org.apache.arrow.vector.types.pojo.Schema;
56+
57+
/** Handles asynchronous batching and writing of events to BigQuery. */
58+
class BatchProcessor implements AutoCloseable {
59+
private static final Logger logger = Logger.getLogger(BatchProcessor.class.getName());
60+
61+
private final StreamWriter writer;
62+
private final int batchSize;
63+
private final Duration flushInterval;
64+
@VisibleForTesting final BlockingQueue<Map<String, Object>> queue;
65+
private final ScheduledExecutorService executor;
66+
@VisibleForTesting final BufferAllocator allocator;
67+
final AtomicBoolean flushLock = new AtomicBoolean(false);
68+
private final Schema arrowSchema;
69+
private final VectorSchemaRoot root;
70+
71+
public BatchProcessor(
72+
StreamWriter writer,
73+
int batchSize,
74+
Duration flushInterval,
75+
int queueMaxSize,
76+
ScheduledExecutorService executor) {
77+
this.writer = writer;
78+
this.batchSize = batchSize;
79+
this.flushInterval = flushInterval;
80+
this.queue = new LinkedBlockingQueue<>(queueMaxSize);
81+
this.executor = executor;
82+
// It's safe to use Long.MAX_VALUE here as this is a top-level RootAllocator,
83+
// and memory is properly managed via try-with-resources in the flush() method.
84+
// The actual memory usage is bounded by the batchSize and individual row sizes.
85+
this.allocator = new RootAllocator(Long.MAX_VALUE);
86+
this.arrowSchema = BigQuerySchema.getArrowSchema();
87+
this.root = VectorSchemaRoot.create(arrowSchema, allocator);
88+
}
89+
90+
public void start() {
91+
@SuppressWarnings("unused")
92+
var unused =
93+
executor.scheduleWithFixedDelay(
94+
() -> {
95+
try {
96+
flush();
97+
} catch (RuntimeException e) {
98+
logger.log(Level.SEVERE, "Error in background flush", e);
99+
}
100+
},
101+
flushInterval.toMillis(),
102+
flushInterval.toMillis(),
103+
MILLISECONDS);
104+
}
105+
106+
public void append(Map<String, Object> row) {
107+
if (!queue.offer(row)) {
108+
logger.warning("BigQuery event queue is full, dropping event.");
109+
return;
110+
}
111+
if (queue.size() >= batchSize && !flushLock.get()) {
112+
executor.execute(this::flush);
113+
}
114+
}
115+
116+
public void flush() {
117+
// Acquire the flushLock. If another flush is already in progress, return immediately.
118+
if (!flushLock.compareAndSet(false, true)) {
119+
return;
120+
}
121+
try {
122+
if (queue.isEmpty()) {
123+
return;
124+
}
125+
List<Map<String, Object>> batch = new ArrayList<>();
126+
queue.drainTo(batch, batchSize);
127+
if (batch.isEmpty()) {
128+
return;
129+
}
130+
try {
131+
root.allocateNew();
132+
for (int i = 0; i < batch.size(); i++) {
133+
Map<String, Object> row = batch.get(i);
134+
for (Field field : arrowSchema.getFields()) {
135+
populateVector(root.getVector(field.getName()), i, row.get(field.getName()));
136+
}
137+
}
138+
root.setRowCount(batch.size());
139+
try (ArrowRecordBatch recordBatch = new VectorUnloader(root).getRecordBatch()) {
140+
AppendRowsResponse result = writer.append(recordBatch).get();
141+
if (result.hasError()) {
142+
logger.severe("BigQuery append error: " + result.getError().getMessage());
143+
for (var error : result.getRowErrorsList()) {
144+
logger.severe(
145+
String.format("Row error at index %d: %s", error.getIndex(), error.getMessage()));
146+
}
147+
} else {
148+
logger.fine("Successfully wrote " + batch.size() + " rows to BigQuery.");
149+
}
150+
} catch (AppendSerializationError ase) {
151+
logger.log(
152+
Level.SEVERE, "Failed to write batch to BigQuery due to serialization error", ase);
153+
Map<Integer, String> rowIndexToErrorMessage = ase.getRowIndexToErrorMessage();
154+
if (rowIndexToErrorMessage != null && !rowIndexToErrorMessage.isEmpty()) {
155+
logger.severe("Row-level errors found:");
156+
for (Map.Entry<Integer, String> entry : rowIndexToErrorMessage.entrySet()) {
157+
logger.severe(
158+
String.format("Row error at index %d: %s", entry.getKey(), entry.getValue()));
159+
}
160+
} else {
161+
logger.severe(
162+
"AppendSerializationError occurred, but no row-specific errors were provided.");
163+
}
164+
}
165+
} catch (Exception e) {
166+
if (e instanceof InterruptedException) {
167+
Thread.currentThread().interrupt();
168+
}
169+
logger.log(Level.SEVERE, "Failed to write batch to BigQuery", e);
170+
} finally {
171+
// Clear the vectors to release the memory.
172+
root.clear();
173+
}
174+
} finally {
175+
flushLock.set(false);
176+
if (queue.size() >= batchSize && !flushLock.get()) {
177+
executor.execute(this::flush);
178+
}
179+
}
180+
}
181+
182+
private void populateVector(FieldVector vector, int index, Object value) {
183+
if (value == null || (value instanceof JsonNode jsonNode && jsonNode.isNull())) {
184+
vector.setNull(index);
185+
return;
186+
}
187+
if (vector instanceof VarCharVector varCharVector) {
188+
String strValue = (value instanceof JsonNode jsonNode) ? jsonNode.asText() : value.toString();
189+
varCharVector.setSafe(index, strValue.getBytes(UTF_8));
190+
} else if (vector instanceof BigIntVector bigIntVector) {
191+
long longValue;
192+
if (value instanceof JsonNode jsonNode) {
193+
longValue = jsonNode.asLong();
194+
} else if (value instanceof Number number) {
195+
longValue = number.longValue();
196+
} else {
197+
longValue = Long.parseLong(value.toString());
198+
}
199+
bigIntVector.setSafe(index, longValue);
200+
} else if (vector instanceof BitVector bitVector) {
201+
boolean boolValue =
202+
(value instanceof JsonNode jsonNode) ? jsonNode.asBoolean() : (Boolean) value;
203+
bitVector.setSafe(index, boolValue ? 1 : 0);
204+
} else if (vector instanceof TimeStampVector timeStampVector) {
205+
if (value instanceof Instant instant) {
206+
long micros =
207+
SECONDS.toMicros(instant.getEpochSecond()) + NANOSECONDS.toMicros(instant.getNano());
208+
timeStampVector.setSafe(index, micros);
209+
} else if (value instanceof JsonNode jsonNode) {
210+
timeStampVector.setSafe(index, jsonNode.asLong());
211+
} else if (value instanceof Long longValue) {
212+
timeStampVector.setSafe(index, longValue);
213+
}
214+
} else if (vector instanceof ListVector listVector) {
215+
int start = listVector.startNewValue(index);
216+
if (value instanceof ArrayNode arrayNode) {
217+
for (int i = 0; i < arrayNode.size(); i++) {
218+
populateVector(listVector.getDataVector(), start + i, arrayNode.get(i));
219+
}
220+
listVector.endValue(index, arrayNode.size());
221+
} else if (value instanceof List) {
222+
List<?> list = (List<?>) value;
223+
for (int i = 0; i < list.size(); i++) {
224+
populateVector(listVector.getDataVector(), start + i, list.get(i));
225+
}
226+
listVector.endValue(index, list.size());
227+
}
228+
} else if (vector instanceof StructVector structVector) {
229+
structVector.setIndexDefined(index);
230+
if (value instanceof ObjectNode objectNode) {
231+
for (FieldVector child : structVector.getChildrenFromFields()) {
232+
populateVector(child, index, objectNode.get(child.getName()));
233+
}
234+
} else if (value instanceof Map) {
235+
Map<?, ?> map = (Map<?, ?>) value;
236+
for (FieldVector child : structVector.getChildrenFromFields()) {
237+
populateVector(child, index, map.get(child.getName()));
238+
}
239+
}
240+
}
241+
}
242+
243+
@Override
244+
public void close() {
245+
if (this.queue != null && !this.queue.isEmpty()) {
246+
this.flush();
247+
}
248+
if (this.allocator != null) {
249+
try {
250+
this.allocator.close();
251+
} catch (RuntimeException e) {
252+
logger.log(Level.SEVERE, "Failed to close Buffer allocator", e);
253+
}
254+
}
255+
if (this.root != null) {
256+
try {
257+
this.root.close();
258+
} catch (RuntimeException e) {
259+
logger.log(Level.SEVERE, "Failed to close VectorSchemaRoot", e);
260+
}
261+
}
262+
if (this.writer != null) {
263+
try {
264+
this.writer.close();
265+
} catch (RuntimeException e) {
266+
logger.log(Level.SEVERE, "Failed to close BigQuery writer", e);
267+
}
268+
}
269+
}
270+
}

0 commit comments

Comments
 (0)