Skip to content

Commit e11b90c

Browse files
Steven Karisrghetia
authored andcommitted
Implement W3C Correlation Context propagator (#179)
* Implement W3C Correlation Context propagator * PR comments * PR comments * Update test to inject context properly * Fix merge
1 parent 0025ffc commit e11b90c

File tree

9 files changed

+308
-29
lines changed

9 files changed

+308
-29
lines changed

api/propagation/noop_propagator.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919

2020
"go.opentelemetry.io/api/core"
21+
dctx "go.opentelemetry.io/api/distributedcontext"
2122
)
2223

2324
// NoopTextFormatPropagator implements TextFormatPropagator that does nothing.
@@ -30,8 +31,8 @@ func (np NoopTextFormatPropagator) Inject(ctx context.Context, supplier Supplier
3031
}
3132

3233
// Extract does nothing and returns an empty SpanContext
33-
func (np NoopTextFormatPropagator) Extract(ctx context.Context, supplier Supplier) core.SpanContext {
34-
return core.EmptySpanContext()
34+
func (np NoopTextFormatPropagator) Extract(ctx context.Context, supplier Supplier) (core.SpanContext, dctx.Map) {
35+
return core.EmptySpanContext(), dctx.NewEmptyMap()
3536
}
3637

3738
// GetAllKeys returns empty list of strings.

api/propagation/propagator.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,25 @@ import (
1818
"context"
1919

2020
"go.opentelemetry.io/api/core"
21+
dctx "go.opentelemetry.io/api/distributedcontext"
2122
)
2223

2324
// TextFormatPropagator is an interface that specifies methods to inject and extract SpanContext
24-
// into/from a carrier using Supplier interface.
25+
// and distributed context into/from a carrier using Supplier interface.
2526
// For example, HTTP Trace Context propagator would encode SpanContext into W3C Trace
2627
// Context Header and set the header into HttpRequest.
2728
type TextFormatPropagator interface {
2829
// Inject method retrieves current SpanContext from the ctx, encodes it into propagator
2930
// specific format and then injects the encoded SpanContext using supplier into a carrier
30-
// associated with the supplier.
31+
// associated with the supplier. It also takes a correlationCtx whose values will be
32+
// injected into a carrier using the supplier.
3133
Inject(ctx context.Context, supplier Supplier)
3234

3335
// Extract method retrieves encoded SpanContext using supplier from the associated carrier.
34-
// It decodes the SpanContext and returns it. If no SpanContext was retrieved OR
35-
// if the retrieved SpanContext is invalid then an empty SpanContext is returned.
36-
Extract(ctx context.Context, supplier Supplier) core.SpanContext
36+
// It decodes the SpanContext and returns it and a dctx of correlated context.
37+
// If no SpanContext was retrieved OR if the retrieved SpanContext is invalid then
38+
// an empty SpanContext is returned.
39+
Extract(ctx context.Context, supplier Supplier) (core.SpanContext, dctx.Map)
3740

3841
// GetAllKeys returns all the keys that this propagator injects/extracts into/from a
3942
// carrier. The use cases for this are

plugin/httptrace/httptrace.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,20 @@ var (
3636

3737
// Returns the Attributes, Context Entries, and SpanContext that were encoded by Inject.
3838
func Extract(ctx context.Context, req *http.Request) ([]core.KeyValue, []core.KeyValue, core.SpanContext) {
39-
sc := propagator.Extract(ctx, req.Header)
39+
sc, correlationCtx := propagator.Extract(ctx, req.Header)
4040

4141
attrs := []core.KeyValue{
4242
URLKey.String(req.URL.String()),
4343
// Etc.
4444
}
4545

46-
return attrs, nil, sc
46+
var correlationCtxKVs []core.KeyValue
47+
correlationCtx.Foreach(func(kv core.KeyValue) bool {
48+
correlationCtxKVs = append(correlationCtxKVs, kv)
49+
return true
50+
})
51+
52+
return attrs, correlationCtxKVs, sc
4753
}
4854

4955
func Inject(ctx context.Context, req *http.Request) {

plugin/othttp/handler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,8 @@ func NewHandler(handler http.Handler, operation string, opts ...Option) http.Han
143143
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
144144
opts := append([]trace.SpanOption{}, h.spanOptions...) // start with the configured options
145145

146-
sc := h.prop.Extract(r.Context(), r.Header)
146+
// TODO: do something with the correlation context
147+
sc, _ := h.prop.Extract(r.Context(), r.Header)
147148
if sc.IsValid() { // not a valid span context, so no link / parent relationship to establish
148149
var opt trace.SpanOption
149150
if h.public {

propagation/http_b3_propagator.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"go.opentelemetry.io/api/trace"
2525

2626
"go.opentelemetry.io/api/core"
27+
dctx "go.opentelemetry.io/api/distributedcontext"
2728
apipropagation "go.opentelemetry.io/api/propagation"
2829
)
2930

@@ -84,11 +85,11 @@ func (b3 HTTPB3Propagator) Inject(ctx context.Context, supplier apipropagation.S
8485
}
8586

8687
// Extract retrieves B3 Headers from the supplier
87-
func (b3 HTTPB3Propagator) Extract(ctx context.Context, supplier apipropagation.Supplier) core.SpanContext {
88+
func (b3 HTTPB3Propagator) Extract(ctx context.Context, supplier apipropagation.Supplier) (core.SpanContext, dctx.Map) {
8889
if b3.SingleHeader {
89-
return b3.extractSingleHeader(supplier)
90+
return b3.extractSingleHeader(supplier), dctx.NewEmptyMap()
9091
}
91-
return b3.extract(supplier)
92+
return b3.extract(supplier), dctx.NewEmptyMap()
9293
}
9394

9495
func (b3 HTTPB3Propagator) GetAllKeys() []string {

propagation/http_b3_propagator_benchmark_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func BenchmarkExtractB3(b *testing.B) {
6565
b.ReportAllocs()
6666
b.ResetTimer()
6767
for i := 0; i < b.N; i++ {
68-
_ = propagator.Extract(ctx, req.Header)
68+
_, _ = propagator.Extract(ctx, req.Header)
6969
}
7070
})
7171
}

propagation/http_b3_propagator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func TestExtractB3(t *testing.T) {
6565
}
6666

6767
ctx := context.Background()
68-
gotSc := propagator.Extract(ctx, req.Header)
68+
gotSc, _ := propagator.Extract(ctx, req.Header)
6969
if diff := cmp.Diff(gotSc, tt.wantSc); diff != "" {
7070
t.Errorf("%s: %s: -got +want %s", tg.name, tt.name, diff)
7171
}

propagation/http_trace_context_propagator.go

Lines changed: 80 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,23 @@ import (
1818
"context"
1919
"encoding/hex"
2020
"fmt"
21+
"net/url"
2122
"regexp"
2223
"strconv"
2324
"strings"
2425

25-
"go.opentelemetry.io/api/trace"
26-
2726
"go.opentelemetry.io/api/core"
27+
dctx "go.opentelemetry.io/api/distributedcontext"
28+
"go.opentelemetry.io/api/key"
2829
apipropagation "go.opentelemetry.io/api/propagation"
30+
"go.opentelemetry.io/api/trace"
2931
)
3032

3133
const (
32-
supportedVersion = 0
33-
maxVersion = 254
34-
TraceparentHeader = "Traceparent"
34+
supportedVersion = 0
35+
maxVersion = 254
36+
TraceparentHeader = "Traceparent"
37+
CorrelationContextHeader = "Correlation-Context"
3538
)
3639

3740
// HTTPTraceContextPropagator propagates SpanContext in W3C TraceContext format.
@@ -51,9 +54,35 @@ func (hp HTTPTraceContextPropagator) Inject(ctx context.Context, supplier apipro
5154
sc.TraceFlags&core.TraceFlagsSampled)
5255
supplier.Set(TraceparentHeader, h)
5356
}
57+
58+
correlationCtx := dctx.FromContext(ctx)
59+
firstIter := true
60+
var headerValueBuilder strings.Builder
61+
correlationCtx.Foreach(func(kv core.KeyValue) bool {
62+
if !firstIter {
63+
headerValueBuilder.WriteRune(',')
64+
}
65+
firstIter = false
66+
headerValueBuilder.WriteString(url.QueryEscape(strings.TrimSpace((string)(kv.Key))))
67+
headerValueBuilder.WriteRune('=')
68+
headerValueBuilder.WriteString(url.QueryEscape(strings.TrimSpace(kv.Value.Emit())))
69+
return true
70+
})
71+
if headerValueBuilder.Len() > 0 {
72+
headerString := headerValueBuilder.String()
73+
supplier.Set(CorrelationContextHeader, headerString)
74+
}
5475
}
5576

56-
func (hp HTTPTraceContextPropagator) Extract(ctx context.Context, supplier apipropagation.Supplier) core.SpanContext {
77+
func (hp HTTPTraceContextPropagator) Extract(
78+
ctx context.Context, supplier apipropagation.Supplier,
79+
) (core.SpanContext, dctx.Map) {
80+
return hp.extractSpanContext(ctx, supplier), hp.extractCorrelationCtx(ctx, supplier)
81+
}
82+
83+
func (hp HTTPTraceContextPropagator) extractSpanContext(
84+
ctx context.Context, supplier apipropagation.Supplier,
85+
) core.SpanContext {
5786
h := supplier.Get(TraceparentHeader)
5887
if h == "" {
5988
return core.EmptySpanContext()
@@ -128,6 +157,50 @@ func (hp HTTPTraceContextPropagator) Extract(ctx context.Context, supplier apipr
128157
return sc
129158
}
130159

160+
func (hp HTTPTraceContextPropagator) extractCorrelationCtx(ctx context.Context, supplier apipropagation.Supplier) dctx.Map {
161+
correlationContext := supplier.Get(CorrelationContextHeader)
162+
if correlationContext == "" {
163+
return dctx.NewEmptyMap()
164+
}
165+
166+
contextValues := strings.Split(correlationContext, ",")
167+
keyValues := make([]core.KeyValue, 0, len(contextValues))
168+
for _, contextValue := range contextValues {
169+
valueAndProps := strings.Split(contextValue, ";")
170+
if len(valueAndProps) < 1 {
171+
continue
172+
}
173+
nameValue := strings.Split(valueAndProps[0], "=")
174+
if len(nameValue) < 2 {
175+
continue
176+
}
177+
name, err := url.QueryUnescape(nameValue[0])
178+
if err != nil {
179+
continue
180+
}
181+
trimmedName := strings.TrimSpace(name)
182+
value, err := url.QueryUnescape(nameValue[1])
183+
if err != nil {
184+
continue
185+
}
186+
trimmedValue := strings.TrimSpace(value)
187+
188+
// TODO (skaris): properties defiend https://w3c.github.io/correlation-context/, are currently
189+
// just put as part of the value.
190+
var trimmedValueWithProps strings.Builder
191+
trimmedValueWithProps.WriteString(trimmedValue)
192+
for _, prop := range valueAndProps[1:] {
193+
trimmedValueWithProps.WriteRune(';')
194+
trimmedValueWithProps.WriteString(prop)
195+
}
196+
197+
keyValues = append(keyValues, key.New(trimmedName).String(trimmedValueWithProps.String()))
198+
}
199+
return dctx.NewMap(dctx.MapUpdate{
200+
MultiKV: keyValues,
201+
})
202+
}
203+
131204
func (hp HTTPTraceContextPropagator) GetAllKeys() []string {
132-
return []string{TraceparentHeader}
205+
return []string{TraceparentHeader, CorrelationContextHeader}
133206
}

0 commit comments

Comments
 (0)