Skip to content

Commit cd85cc5

Browse files
committed
Add option to start new root span with link to remote context
1 parent 9d484c8 commit cd85cc5

File tree

10 files changed

+95
-10
lines changed

10 files changed

+95
-10
lines changed

internal/impl/kafka/input_sarama_kafka.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ Unfortunately this error message will appear for a wide range of connection prob
126126
Description("A maximum estimate for the time taken to process a message, this is used for tuning consumer group synchronization.").
127127
Advanced().Default("100ms"),
128128
service.NewExtractTracingSpanMappingField(),
129+
service.NewRootSpanWithLinkField(),
129130
service.NewObjectField(iskFieldGroup,
130131
service.NewDurationField(iskFieldGroupSessionTimeout).
131132
Description("A period after which a consumer of the group is kicked after no heartbeats.").

internal/impl/nats/docs.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,13 @@ NATS component, so that monitoring tools between NATS and bento can stay in sync
2424
`
2525
}
2626

27-
func inputTracingDocs() *service.ConfigField {
28-
return service.NewExtractTracingSpanMappingField().Version(tracingVersion)
27+
func inputTracingDocs() []*service.ConfigField {
28+
return []*service.ConfigField{
29+
service.NewExtractTracingSpanMappingField().Version(tracingVersion),
30+
service.NewRootSpanWithLinkField().Version(tracingVersion),
31+
}
2932
}
33+
3034
func outputTracingDocs() *service.ConfigField {
3135
return service.NewInjectTracingSpanMappingField().Version(tracingVersion)
3236
}

internal/impl/nats/input.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ You can access these metadata fields using [function interpolation](/docs/config
4949
Default(nats.DefaultSubPendingMsgsLimit).
5050
LintRule(`root = if this < 0 { ["prefetch count must be greater than or equal to zero"] }`)).
5151
Fields(connectionTailFields()...).
52-
Field(inputTracingDocs())
52+
Fields(inputTracingDocs()...)
5353
}
5454

5555
func init() {

internal/impl/nats/input_jetstream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ You can access these metadata fields using
8080
Advanced().
8181
Default(1024)).
8282
Fields(connectionTailFields()...).
83-
Field(inputTracingDocs())
83+
Fields(inputTracingDocs()...)
8484
}
8585

8686
func init() {

internal/impl/nats/input_stream.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ You can access these metadata fields using [function interpolation](/docs/config
135135
Default("30s"),
136136
).
137137
Fields(connectionTailFields()...).
138-
Field(inputTracingDocs())
138+
Fields(inputTracingDocs()...)
139139
}
140140

141141
func init() {

public/service/config_extract_tracing.go

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@ import (
77

88
"go.opentelemetry.io/otel"
99
"go.opentelemetry.io/otel/propagation"
10+
"go.opentelemetry.io/otel/trace"
1011

1112
"github.com/warpstreamlabs/bento/internal/bundle"
1213
"github.com/warpstreamlabs/bento/public/bloblang"
1314
)
1415

1516
const (
16-
etsField = "extract_tracing_map"
17+
etsField = "extract_tracing_map"
18+
nrswlField = "new_root_span_with_link"
1719
)
1820

1921
// NewExtractTracingSpanMappingField returns a config field for mapping messages
@@ -27,18 +29,33 @@ func NewExtractTracingSpanMappingField() *ConfigField {
2729
Advanced()
2830
}
2931

32+
func NewRootSpanWithLinkField() *ConfigField {
33+
return NewBoolField(nrswlField).
34+
Description("EXPERIMENTAL: Starts a new root span with link to parent.").
35+
Version("1.0.0").
36+
Optional().
37+
Advanced()
38+
}
39+
3040
// WrapBatchInputExtractTracingSpanMapping wraps a BatchInput with a mechanism
3141
// for extracting tracing spans using a bloblang mapping.
3242
func (p *ParsedConfig) WrapBatchInputExtractTracingSpanMapping(inputName string, i BatchInput) (
3343
BatchInput, error) {
3444
if str, _ := p.FieldString(etsField); str == "" {
3545
return i, nil
3646
}
47+
3748
exe, err := p.FieldBloblang(etsField)
3849
if err != nil {
3950
return nil, err
4051
}
41-
return &spanInjectBatchInput{inputName: inputName, mgr: p.mgr, mapping: exe, rdr: i}, nil
52+
53+
newSpan, err := p.FieldBool(nrswlField)
54+
if err != nil && !strings.Contains(err.Error(), "was not found in the config") {
55+
return nil, err
56+
}
57+
58+
return &spanInjectBatchInput{inputName: inputName, mgr: p.mgr, mapping: exe, rdr: i, newSpan: newSpan}, nil
4259
}
4360

4461
// WrapInputExtractTracingSpanMapping wraps a Input with a mechanism for
@@ -51,7 +68,13 @@ func (p *ParsedConfig) WrapInputExtractTracingSpanMapping(inputName string, i In
5168
if err != nil {
5269
return nil, err
5370
}
54-
return &spanInjectInput{inputName: inputName, mgr: p.mgr, mapping: exe, rdr: i}, nil
71+
72+
newSpan, err := p.FieldBool(nrswlField)
73+
if err != nil && !strings.Contains(err.Error(), "was not found in the config") {
74+
return nil, err
75+
}
76+
77+
return &spanInjectInput{inputName: inputName, mgr: p.mgr, mapping: exe, rdr: i, newSpan: newSpan}, nil
5578
}
5679

5780
func getPropMapCarrier(spanPart *Message) (propagation.MapCarrier, error) {
@@ -82,6 +105,7 @@ type spanInjectBatchInput struct {
82105

83106
mapping *bloblang.Executor
84107
rdr BatchInput
108+
newSpan bool
85109
}
86110

87111
func (s *spanInjectBatchInput) Connect(ctx context.Context) error {
@@ -112,7 +136,16 @@ func (s *spanInjectBatchInput) ReadBatch(ctx context.Context) (MessageBatch, Ack
112136
textProp := otel.GetTextMapPropagator()
113137
for i, p := range m {
114138
ctx := textProp.Extract(p.Context(), c)
115-
pCtx, _ := prov.Tracer("bento").Start(ctx, operationName)
139+
140+
var opts []trace.SpanStartOption
141+
if s.newSpan {
142+
opts = []trace.SpanStartOption{
143+
trace.WithNewRoot(),
144+
trace.WithLinks(trace.LinkFromContext(ctx)),
145+
}
146+
}
147+
148+
pCtx, _ := prov.Tracer("bento").Start(ctx, operationName, opts...)
116149
m[i] = p.WithContext(pCtx)
117150
}
118151
return m, afn, nil
@@ -130,6 +163,7 @@ type spanInjectInput struct {
130163

131164
mapping *bloblang.Executor
132165
rdr Input
166+
newSpan bool
133167
}
134168

135169
func (s *spanInjectInput) Connect(ctx context.Context) error {
@@ -159,7 +193,17 @@ func (s *spanInjectInput) Read(ctx context.Context) (*Message, AckFunc, error) {
159193

160194
textProp := otel.GetTextMapPropagator()
161195

162-
pCtx, _ := prov.Tracer("bento").Start(textProp.Extract(m.Context(), c), operationName)
196+
ctx = textProp.Extract(m.Context(), c)
197+
198+
var opts []trace.SpanStartOption
199+
if s.newSpan {
200+
opts = []trace.SpanStartOption{
201+
trace.WithNewRoot(),
202+
trace.WithLinks(trace.LinkFromContext(ctx)),
203+
}
204+
}
205+
206+
pCtx, _ := prov.Tracer("bento").Start(ctx, operationName, opts...)
163207
m = m.WithContext(pCtx)
164208

165209
return m, afn, nil

website/docs/components/inputs/kafka.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ input:
8383
commit_period: 1s
8484
max_processing_period: 100ms
8585
extract_tracing_map: root = @ # No default (optional)
86+
new_root_span_with_link: false # No default (optional)
8687
group:
8788
session_timeout: 10s
8889
heartbeat_interval: 3s
@@ -599,6 +600,14 @@ extract_tracing_map: root = @
599600
extract_tracing_map: root = this.meta.span
600601
```
601602

603+
### `new_root_span_with_link`
604+
605+
EXPERIMENTAL: Starts a new root span with link to parent.
606+
607+
608+
Type: `bool`
609+
Requires version 1.0.0 or newer
610+
602611
### `group`
603612

604613
Tuning parameters for consumer group synchronization.

website/docs/components/inputs/nats.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ input:
6363
user_jwt: "" # No default (optional)
6464
user_nkey_seed: "" # No default (optional)
6565
extract_tracing_map: root = @ # No default (optional)
66+
new_root_span_with_link: false # No default (optional)
6667
```
6768
6869
</TabItem>
@@ -401,4 +402,12 @@ extract_tracing_map: root = @
401402
extract_tracing_map: root = this.meta.span
402403
```
403404

405+
### `new_root_span_with_link`
406+
407+
EXPERIMENTAL: Starts a new root span with link to parent.
408+
409+
410+
Type: `bool`
411+
Requires version 4.23.0 or newer
412+
404413

website/docs/components/inputs/nats_jetstream.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ input:
7171
user_jwt: "" # No default (optional)
7272
user_nkey_seed: "" # No default (optional)
7373
extract_tracing_map: root = @ # No default (optional)
74+
new_root_span_with_link: false # No default (optional)
7475
```
7576
7677
</TabItem>
@@ -450,4 +451,12 @@ extract_tracing_map: root = @
450451
extract_tracing_map: root = this.meta.span
451452
```
452453

454+
### `new_root_span_with_link`
455+
456+
EXPERIMENTAL: Starts a new root span with link to parent.
457+
458+
459+
Type: `bool`
460+
Requires version 4.23.0 or newer
461+
453462

website/docs/components/inputs/nats_stream.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ input:
7070
user_jwt: "" # No default (optional)
7171
user_nkey_seed: "" # No default (optional)
7272
extract_tracing_map: root = @ # No default (optional)
73+
new_root_span_with_link: false # No default (optional)
7374
```
7475
7576
</TabItem>
@@ -423,4 +424,12 @@ extract_tracing_map: root = @
423424
extract_tracing_map: root = this.meta.span
424425
```
425426

427+
### `new_root_span_with_link`
428+
429+
EXPERIMENTAL: Starts a new root span with link to parent.
430+
431+
432+
Type: `bool`
433+
Requires version 4.23.0 or newer
434+
426435

0 commit comments

Comments
 (0)