Skip to content

Commit c04ec63

Browse files
adinauerclaude
andcommitted
test(samples): Add Kafka queue system tests for Spring Boot 3
Add KafkaQueueSystemTest with e2e tests for: - Producer endpoint creates queue.publish span - Consumer creates queue.process transaction - Distributed tracing (producer and consumer share same trace) - Messaging attributes on publish span and process transaction Also add produceKafkaMessage to RestTestClient and enable sentry.enable-queue-tracing in the kafka profile properties. Requires a running Kafka broker at localhost:9092 and the sample app started with --spring.profiles.active=kafka. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 04a4689 commit c04ec63

File tree

4 files changed

+121
-0
lines changed

4 files changed

+121
-0
lines changed

sentry-samples/sentry-samples-spring-boot-jakarta/src/main/resources/application-kafka.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
# Kafka — activate with: --spring.profiles.active=kafka
2+
sentry.enable-queue-tracing=true
3+
24
spring.autoconfigure.exclude=
35
spring.kafka.bootstrap-servers=localhost:9092
46
spring.kafka.consumer.group-id=sentry-sample-group
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package io.sentry.systemtest
2+
3+
import io.sentry.systemtest.util.TestHelper
4+
import kotlin.test.Test
5+
import kotlin.test.assertEquals
6+
import org.junit.Before
7+
8+
/**
9+
* System tests for Kafka queue instrumentation.
10+
*
11+
* Requires:
12+
* - The sample app running with `--spring.profiles.active=kafka`
13+
* - A Kafka broker at localhost:9092
14+
* - The mock Sentry server at localhost:8000
15+
*/
16+
class KafkaQueueSystemTest {
17+
lateinit var testHelper: TestHelper
18+
19+
@Before
20+
fun setup() {
21+
testHelper = TestHelper("http://localhost:8080")
22+
testHelper.reset()
23+
}
24+
25+
@Test
26+
fun `producer endpoint creates queue publish span`() {
27+
val restClient = testHelper.restClient
28+
29+
restClient.produceKafkaMessage("test-message")
30+
assertEquals(200, restClient.lastKnownStatusCode)
31+
32+
testHelper.ensureTransactionReceived { transaction, _ ->
33+
testHelper.doesTransactionContainSpanWithOp(transaction, "queue.publish")
34+
}
35+
}
36+
37+
@Test
38+
fun `consumer creates queue process transaction`() {
39+
val restClient = testHelper.restClient
40+
41+
restClient.produceKafkaMessage("test-consumer-message")
42+
assertEquals(200, restClient.lastKnownStatusCode)
43+
44+
// The consumer runs asynchronously, so wait for the queue.process transaction
45+
testHelper.ensureTransactionReceived { transaction, _ ->
46+
testHelper.doesTransactionHaveOp(transaction, "queue.process")
47+
}
48+
}
49+
50+
@Test
51+
fun `producer and consumer share same trace`() {
52+
val restClient = testHelper.restClient
53+
54+
restClient.produceKafkaMessage("trace-test-message")
55+
assertEquals(200, restClient.lastKnownStatusCode)
56+
57+
// Capture the trace ID from the producer transaction
58+
var producerTraceId: String? = null
59+
testHelper.ensureTransactionReceived { transaction, _ ->
60+
if (testHelper.doesTransactionContainSpanWithOp(transaction, "queue.publish")) {
61+
producerTraceId = transaction.contexts.trace?.traceId?.toString()
62+
true
63+
} else {
64+
false
65+
}
66+
}
67+
68+
// Verify the consumer transaction has the same trace ID
69+
testHelper.ensureTransactionReceived { transaction, _ ->
70+
if (testHelper.doesTransactionHaveOp(transaction, "queue.process")) {
71+
val consumerTraceId = transaction.contexts.trace?.traceId?.toString()
72+
producerTraceId != null && consumerTraceId == producerTraceId
73+
} else {
74+
false
75+
}
76+
}
77+
}
78+
79+
@Test
80+
fun `queue publish span has messaging attributes`() {
81+
val restClient = testHelper.restClient
82+
83+
restClient.produceKafkaMessage("attrs-test")
84+
assertEquals(200, restClient.lastKnownStatusCode)
85+
86+
testHelper.ensureTransactionReceived { transaction, _ ->
87+
val span = transaction.spans.firstOrNull { it.op == "queue.publish" }
88+
if (span == null) return@ensureTransactionReceived false
89+
90+
val data = span.data ?: return@ensureTransactionReceived false
91+
data["messaging.system"] == "kafka" && data["messaging.destination.name"] == "sentry-topic"
92+
}
93+
}
94+
95+
@Test
96+
fun `queue process transaction has messaging attributes`() {
97+
val restClient = testHelper.restClient
98+
99+
restClient.produceKafkaMessage("process-attrs-test")
100+
assertEquals(200, restClient.lastKnownStatusCode)
101+
102+
testHelper.ensureTransactionReceived { transaction, _ ->
103+
if (!testHelper.doesTransactionHaveOp(transaction, "queue.process")) {
104+
return@ensureTransactionReceived false
105+
}
106+
107+
val data = transaction.contexts.trace?.data ?: return@ensureTransactionReceived false
108+
data["messaging.system"] == "kafka" && data["messaging.destination.name"] == "sentry-topic"
109+
}
110+
}
111+
}

sentry-system-test-support/api/sentry-system-test-support.api

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,8 @@ public final class io/sentry/systemtest/util/RestTestClient : io/sentry/systemte
560560
public final fun getTodo (J)Lio/sentry/systemtest/Todo;
561561
public final fun getTodoRestClient (J)Lio/sentry/systemtest/Todo;
562562
public final fun getTodoWebclient (J)Lio/sentry/systemtest/Todo;
563+
public final fun produceKafkaMessage (Ljava/lang/String;)Ljava/lang/String;
564+
public static synthetic fun produceKafkaMessage$default (Lio/sentry/systemtest/util/RestTestClient;Ljava/lang/String;ILjava/lang/Object;)Ljava/lang/String;
563565
public final fun saveCachedTodo (Lio/sentry/systemtest/Todo;)Lio/sentry/systemtest/Todo;
564566
}
565567

sentry-system-test-support/src/main/kotlin/io/sentry/systemtest/util/RestTestClient.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,12 @@ class RestTestClient(private val backendBaseUrl: String) : LoggingInsecureRestCl
8181
return response?.body?.string()
8282
}
8383

84+
fun produceKafkaMessage(message: String = "hello from sentry!"): String? {
85+
val request = Request.Builder().url("$backendBaseUrl/kafka/produce?message=$message")
86+
87+
return callTyped(request, true)
88+
}
89+
8490
fun getCountMetric(): String? {
8591
val request = Request.Builder().url("$backendBaseUrl/metric/count")
8692

0 commit comments

Comments
 (0)