@@ -32,6 +32,113 @@ You can access the jaeger dashboard as [jaeger dashboard](http://localhost:16686
32
32
33
33
You can run the demo as ` go run main.go `
34
34
35
+ ``` go
36
+ package main
37
+
38
+ import (
39
+ " context"
40
+ " fmt"
41
+ " github.com/Trendyol/kafka-konsumer/v2"
42
+ " go.opentelemetry.io/otel"
43
+ " go.opentelemetry.io/otel/attribute"
44
+ " go.opentelemetry.io/otel/exporters/jaeger"
45
+ " go.opentelemetry.io/otel/propagation"
46
+ " go.opentelemetry.io/otel/sdk/resource"
47
+ " go.opentelemetry.io/otel/sdk/trace"
48
+ semconv " go.opentelemetry.io/otel/semconv/v1.19.0"
49
+ " os"
50
+ " os/signal"
51
+ " time"
52
+ )
53
+
54
+ func main () {
55
+ jaegerUrl := " http://localhost:14268/api/traces"
56
+ tp := initJaegerTracer (jaegerUrl)
57
+ defer tp.Shutdown (context.Background ())
58
+
59
+ otel.SetTracerProvider (tp)
60
+ otel.SetTextMapPropagator (propagation.TraceContext {})
61
+
62
+ // ===============SIMULATE PRODUCER===============
63
+ producer , _ := kafka.NewProducer (&kafka.ProducerConfig {
64
+ Writer: kafka.WriterConfig {
65
+ Brokers: []string {" localhost:29092" },
66
+ },
67
+ DistributedTracingEnabled: true ,
68
+ })
69
+
70
+ const topicName = " standart-topic"
71
+ producedMessage := kafka.Message {
72
+ Topic: topicName,
73
+ Key: []byte (" 1" ),
74
+ Value: []byte (` { "foo": "bar" }` ),
75
+ }
76
+
77
+ tr := otel.Tracer (" after producing" )
78
+ parentCtx , span := tr.Start (context.Background (), " before producing work" )
79
+ time.Sleep (100 * time.Millisecond )
80
+ span.End ()
81
+
82
+ _ = producer.Produce (parentCtx, producedMessage)
83
+
84
+ // ===============SIMULATE CONSUMER===============
85
+ consumerCfg := &kafka.ConsumerConfig {
86
+ Reader: kafka.ReaderConfig {
87
+ Brokers: []string {" localhost:29092" },
88
+ Topic: topicName,
89
+ GroupID: " standart-cg" ,
90
+ },
91
+ ConsumeFn: consumeFn,
92
+ DistributedTracingEnabled: true ,
93
+ }
94
+
95
+ consumer , _ := kafka.NewConsumer (consumerCfg)
96
+ defer consumer.Stop ()
97
+
98
+ consumer.Consume ()
99
+
100
+ fmt.Println (" Consumer started...!" )
101
+
102
+ c := make (chan os.Signal , 1 )
103
+ signal.Notify (c, os.Interrupt )
104
+ <- c
105
+ }
106
+
107
+ func consumeFn (message *kafka .Message ) error {
108
+ fmt.Printf (" Message From %s with value %s " , message.Topic , string (message.Value ))
109
+
110
+ tr := otel.Tracer (" consumer" )
111
+ parentCtx , span := tr.Start (message.Context , " work" )
112
+ time.Sleep (100 * time.Millisecond )
113
+ span.End ()
114
+
115
+ _, span = tr.Start (parentCtx, " another work" )
116
+ time.Sleep (50 * time.Millisecond )
117
+ span.End ()
118
+
119
+ return nil
120
+ }
121
+
122
+ func initJaegerTracer (url string ) *trace .TracerProvider {
123
+ // Create the Jaeger exporter
124
+ exp , err := jaeger.New (jaeger.WithCollectorEndpoint (jaeger.WithEndpoint (url)))
125
+ if err != nil {
126
+ panic (" Err initializing jaeger instance" + err.Error ())
127
+ }
128
+
129
+ tp := trace.NewTracerProvider (
130
+ trace.WithBatcher (exp),
131
+ trace.WithResource (resource.NewWithAttributes (
132
+ semconv.SchemaURL ,
133
+ semconv.ServiceName (" kafka-konsumer-demo" ),
134
+ attribute.String (" environment" , " prod" ),
135
+ )),
136
+ )
137
+
138
+ return tp
139
+ }
140
+ ```
141
+
35
142
In the producing step, we open only two spans. In the consuming step, we open three spans. You can see their relationship via the jeager dashboard, as shown below.
36
143
37
144
![ Demo Jeager] ( ../../.github/images/jaeger-dashboard-example.png )
0 commit comments