Skip to content

Commit c19b265

Browse files
google-genai-bot authored and copybara-github committed
feat: Implement basic version of BigQuery Agent Analytics Plugin
This change introduces a new plugin for the Agent Development Kit (ADK) that logs agent execution events to BigQuery. It includes: - `BigQueryAgentAnalyticsPlugin`: A plugin that captures various agent lifecycle events (user messages, tool calls, model invocations) and sends them to BigQuery. - `BigQueryLoggerConfig`: Configuration options for the plugin, including project/dataset/table IDs, batching, and retry settings. - `BigQuerySchema`: Defines the BigQuery and Arrow schemas used for the event table. - `BatchProcessor`: Handles batching of events and writing them to BigQuery using the Storage Write API with Arrow format. - `JsonFormatter`: Utility for safely formatting JSON content for BigQuery. PiperOrigin-RevId: 877497234
1 parent 444e0f0 commit c19b265

File tree

9 files changed

+2081
-0
lines changed

9 files changed

+2081
-0
lines changed

core/pom.xml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,26 @@
193193
<artifactId>opentelemetry-sdk-testing</artifactId>
194194
<scope>test</scope>
195195
</dependency>
196+
<dependency>
197+
<groupId>com.google.cloud</groupId>
198+
<artifactId>google-cloud-bigquery</artifactId>
199+
<version>2.40.0</version>
200+
</dependency>
201+
<dependency>
202+
<groupId>org.apache.arrow</groupId>
203+
<artifactId>arrow-vector</artifactId>
204+
<version>17.0.0</version>
205+
</dependency>
206+
<dependency>
207+
<groupId>org.apache.arrow</groupId>
208+
<artifactId>arrow-memory-core</artifactId>
209+
<version>17.0.0</version>
210+
</dependency>
211+
<dependency>
212+
<groupId>org.apache.arrow</groupId>
213+
<artifactId>arrow-memory-netty</artifactId>
214+
<version>17.0.0</version>
215+
</dependency>
196216
</dependencies>
197217
<build>
198218
<resources>
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.adk.plugins.agentanalytics;
18+
19+
import static java.nio.charset.StandardCharsets.UTF_8;
20+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
21+
import static java.util.concurrent.TimeUnit.NANOSECONDS;
22+
import static java.util.concurrent.TimeUnit.SECONDS;
23+
24+
import com.fasterxml.jackson.databind.JsonNode;
25+
import com.fasterxml.jackson.databind.node.ArrayNode;
26+
import com.fasterxml.jackson.databind.node.ObjectNode;
27+
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
28+
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
29+
import com.google.cloud.bigquery.storage.v1.StreamWriter;
30+
import com.google.common.annotations.VisibleForTesting;
31+
import java.time.Duration;
32+
import java.time.Instant;
33+
import java.util.ArrayList;
34+
import java.util.List;
35+
import java.util.Map;
36+
import java.util.concurrent.BlockingQueue;
37+
import java.util.concurrent.LinkedBlockingQueue;
38+
import java.util.concurrent.ScheduledExecutorService;
39+
import java.util.concurrent.atomic.AtomicBoolean;
40+
import java.util.logging.Level;
41+
import java.util.logging.Logger;
42+
import org.apache.arrow.memory.BufferAllocator;
43+
import org.apache.arrow.memory.RootAllocator;
44+
import org.apache.arrow.vector.BigIntVector;
45+
import org.apache.arrow.vector.BitVector;
46+
import org.apache.arrow.vector.FieldVector;
47+
import org.apache.arrow.vector.TimeStampVector;
48+
import org.apache.arrow.vector.VarCharVector;
49+
import org.apache.arrow.vector.VectorSchemaRoot;
50+
import org.apache.arrow.vector.VectorUnloader;
51+
import org.apache.arrow.vector.complex.ListVector;
52+
import org.apache.arrow.vector.complex.StructVector;
53+
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
54+
import org.apache.arrow.vector.types.pojo.Field;
55+
import org.apache.arrow.vector.types.pojo.Schema;
56+
57+
/** Handles asynchronous batching and writing of events to BigQuery. */
58+
class BatchProcessor implements AutoCloseable {
59+
private static final Logger logger = Logger.getLogger(BatchProcessor.class.getName());
60+
61+
private final StreamWriter writer;
62+
private final int batchSize;
63+
private final Duration flushInterval;
64+
private final BlockingQueue<Map<String, Object>> queue;
65+
private final ScheduledExecutorService executor;
66+
@VisibleForTesting final BufferAllocator allocator;
67+
final AtomicBoolean flushLock = new AtomicBoolean(false);
68+
private final Schema arrowSchema;
69+
private final VectorSchemaRoot root;
70+
71+
public BatchProcessor(
72+
StreamWriter writer,
73+
int batchSize,
74+
Duration flushInterval,
75+
int queueMaxSize,
76+
ScheduledExecutorService executor) {
77+
this.writer = writer;
78+
this.batchSize = batchSize;
79+
this.flushInterval = flushInterval;
80+
this.queue = new LinkedBlockingQueue<>(queueMaxSize);
81+
this.executor = executor;
82+
// It's safe to use Long.MAX_VALUE here as this is a top-level RootAllocator,
83+
// and memory is properly managed via try-with-resources in the flush() method.
84+
// The actual memory usage is bounded by the batchSize and individual row sizes.
85+
this.allocator = new RootAllocator(Long.MAX_VALUE);
86+
this.arrowSchema = BigQuerySchema.getArrowSchema();
87+
this.root = VectorSchemaRoot.create(arrowSchema, allocator);
88+
}
89+
90+
public void start() {
91+
@SuppressWarnings("unused")
92+
var unused =
93+
executor.scheduleWithFixedDelay(
94+
() -> {
95+
try {
96+
flush();
97+
} catch (RuntimeException e) {
98+
logger.log(Level.SEVERE, "Error in background flush", e);
99+
}
100+
},
101+
flushInterval.toMillis(),
102+
flushInterval.toMillis(),
103+
MILLISECONDS);
104+
}
105+
106+
public void append(Map<String, Object> row) {
107+
if (!queue.offer(row)) {
108+
logger.warning("BigQuery event queue is full, dropping event.");
109+
return;
110+
}
111+
if (queue.size() >= batchSize && !flushLock.get()) {
112+
executor.execute(this::flush);
113+
}
114+
}
115+
116+
public void flush() {
117+
// Acquire the flushLock. If another flush is already in progress, return immediately.
118+
if (!flushLock.compareAndSet(false, true)) {
119+
return;
120+
}
121+
try {
122+
if (queue.isEmpty()) {
123+
return;
124+
}
125+
List<Map<String, Object>> batch = new ArrayList<>();
126+
queue.drainTo(batch, batchSize);
127+
if (batch.isEmpty()) {
128+
return;
129+
}
130+
try {
131+
root.allocateNew();
132+
for (int i = 0; i < batch.size(); i++) {
133+
Map<String, Object> row = batch.get(i);
134+
for (Field field : arrowSchema.getFields()) {
135+
populateVector(root.getVector(field.getName()), i, row.get(field.getName()));
136+
}
137+
}
138+
root.setRowCount(batch.size());
139+
try (ArrowRecordBatch recordBatch = new VectorUnloader(root).getRecordBatch()) {
140+
AppendRowsResponse result = writer.append(recordBatch).get();
141+
if (result.hasError()) {
142+
logger.severe("BigQuery append error: " + result.getError().getMessage());
143+
for (var error : result.getRowErrorsList()) {
144+
logger.severe(
145+
String.format("Row error at index %d: %s", error.getIndex(), error.getMessage()));
146+
}
147+
} else {
148+
logger.fine("Successfully wrote " + batch.size() + " rows to BigQuery.");
149+
}
150+
} catch (AppendSerializationError ase) {
151+
logger.log(
152+
Level.SEVERE, "Failed to write batch to BigQuery due to serialization error", ase);
153+
Map<Integer, String> rowIndexToErrorMessage = ase.getRowIndexToErrorMessage();
154+
if (rowIndexToErrorMessage != null && !rowIndexToErrorMessage.isEmpty()) {
155+
logger.severe("Row-level errors found:");
156+
for (Map.Entry<Integer, String> entry : rowIndexToErrorMessage.entrySet()) {
157+
logger.severe(
158+
String.format("Row error at index %d: %s", entry.getKey(), entry.getValue()));
159+
}
160+
} else {
161+
logger.severe(
162+
"AppendSerializationError occurred, but no row-specific errors were provided.");
163+
}
164+
}
165+
} catch (Exception e) {
166+
logger.log(Level.SEVERE, "Failed to write batch to BigQuery", e);
167+
} finally {
168+
// Clear the vectors to release the memory.
169+
root.clear();
170+
}
171+
} finally {
172+
flushLock.set(false);
173+
if (queue.size() >= batchSize && !flushLock.get()) {
174+
executor.execute(this::flush);
175+
}
176+
}
177+
}
178+
179+
private void populateVector(FieldVector vector, int index, Object value) {
180+
if (value == null || (value instanceof JsonNode jsonNode && jsonNode.isNull())) {
181+
vector.setNull(index);
182+
return;
183+
}
184+
if (vector instanceof VarCharVector varCharVector) {
185+
String strValue = (value instanceof JsonNode jsonNode) ? jsonNode.asText() : value.toString();
186+
varCharVector.setSafe(index, strValue.getBytes(UTF_8));
187+
} else if (vector instanceof BigIntVector bigIntVector) {
188+
long longValue;
189+
if (value instanceof JsonNode jsonNode) {
190+
longValue = jsonNode.asLong();
191+
} else if (value instanceof Number number) {
192+
longValue = number.longValue();
193+
} else {
194+
longValue = Long.parseLong(value.toString());
195+
}
196+
bigIntVector.setSafe(index, longValue);
197+
} else if (vector instanceof BitVector bitVector) {
198+
boolean boolValue =
199+
(value instanceof JsonNode jsonNode) ? jsonNode.asBoolean() : (Boolean) value;
200+
bitVector.setSafe(index, boolValue ? 1 : 0);
201+
} else if (vector instanceof TimeStampVector timeStampVector) {
202+
if (value instanceof Instant instant) {
203+
long micros =
204+
SECONDS.toMicros(instant.getEpochSecond()) + NANOSECONDS.toMicros(instant.getNano());
205+
timeStampVector.setSafe(index, micros);
206+
} else if (value instanceof JsonNode jsonNode) {
207+
timeStampVector.setSafe(index, jsonNode.asLong());
208+
} else if (value instanceof Long longValue) {
209+
timeStampVector.setSafe(index, longValue);
210+
}
211+
} else if (vector instanceof ListVector listVector) {
212+
int start = listVector.startNewValue(index);
213+
if (value instanceof ArrayNode arrayNode) {
214+
for (int i = 0; i < arrayNode.size(); i++) {
215+
populateVector(listVector.getDataVector(), start + i, arrayNode.get(i));
216+
}
217+
listVector.endValue(index, arrayNode.size());
218+
} else if (value instanceof List) {
219+
List<?> list = (List<?>) value;
220+
for (int i = 0; i < list.size(); i++) {
221+
populateVector(listVector.getDataVector(), start + i, list.get(i));
222+
}
223+
listVector.endValue(index, list.size());
224+
}
225+
} else if (vector instanceof StructVector structVector) {
226+
structVector.setIndexDefined(index);
227+
if (value instanceof ObjectNode objectNode) {
228+
for (FieldVector child : structVector.getChildrenFromFields()) {
229+
populateVector(child, index, objectNode.get(child.getName()));
230+
}
231+
} else if (value instanceof Map) {
232+
Map<?, ?> map = (Map<?, ?>) value;
233+
for (FieldVector child : structVector.getChildrenFromFields()) {
234+
populateVector(child, index, map.get(child.getName()));
235+
}
236+
}
237+
}
238+
}
239+
240+
@Override
241+
public void close() {
242+
flush();
243+
root.close();
244+
if (writer != null) {
245+
writer.close();
246+
}
247+
if (allocator != null) {
248+
allocator.close();
249+
}
250+
}
251+
}

0 commit comments

Comments
 (0)