@@ -7,13 +7,15 @@ import (
77
88 "go.opentelemetry.io/otel"
99 "go.opentelemetry.io/otel/propagation"
10+ "go.opentelemetry.io/otel/trace"
1011
1112 "github.com/warpstreamlabs/bento/internal/bundle"
1213 "github.com/warpstreamlabs/bento/public/bloblang"
1314)
1415
1516const (
16- etsField = "extract_tracing_map"
17+ etsField = "extract_tracing_map"
18+ nrswlField = "new_root_span_with_link"
1719)
1820
1921// NewExtractTracingSpanMappingField returns a config field for mapping messages
@@ -27,6 +29,14 @@ func NewExtractTracingSpanMappingField() *ConfigField {
2729 Advanced ()
2830}
2931
32+ func NewRootSpanWithLinkField () * ConfigField {
33+ return NewBoolField (nrswlField ).
34+ Description ("EXPERIMENTAL: Starts a new root span with link to parent." ).
35+ Version ("1.0.0" ).
36+ Optional ().
37+ Advanced ()
38+ }
39+
3040// WrapBatchInputExtractTracingSpanMapping wraps a BatchInput with a mechanism
3141// for extracting tracing spans using a bloblang mapping.
3242func (p * ParsedConfig ) WrapBatchInputExtractTracingSpanMapping (inputName string , i BatchInput ) (
@@ -38,7 +48,13 @@ func (p *ParsedConfig) WrapBatchInputExtractTracingSpanMapping(inputName string,
3848 if err != nil {
3949 return nil , err
4050 }
41- return & spanInjectBatchInput {inputName : inputName , mgr : p .mgr , mapping : exe , rdr : i }, nil
51+
52+ newSpan , err := p .FieldBool (nrswlField )
53+ if err != nil {
54+ return nil , err
55+ }
56+
57+ return & spanInjectBatchInput {inputName : inputName , mgr : p .mgr , mapping : exe , rdr : i , newSpan : newSpan }, nil
4258}
4359
4460// WrapInputExtractTracingSpanMapping wraps a Input with a mechanism for
@@ -51,7 +67,13 @@ func (p *ParsedConfig) WrapInputExtractTracingSpanMapping(inputName string, i In
5167 if err != nil {
5268 return nil , err
5369 }
54- return & spanInjectInput {inputName : inputName , mgr : p .mgr , mapping : exe , rdr : i }, nil
70+
71+ newSpan , err := p .FieldBool (nrswlField )
72+ if err != nil {
73+ return nil , err
74+ }
75+
76+ return & spanInjectInput {inputName : inputName , mgr : p .mgr , mapping : exe , rdr : i , newSpan : newSpan }, nil
5577}
5678
5779func getPropMapCarrier (spanPart * Message ) (propagation.MapCarrier , error ) {
@@ -82,6 +104,7 @@ type spanInjectBatchInput struct {
82104
83105 mapping * bloblang.Executor
84106 rdr BatchInput
107+ newSpan bool
85108}
86109
87110func (s * spanInjectBatchInput ) Connect (ctx context.Context ) error {
@@ -112,7 +135,16 @@ func (s *spanInjectBatchInput) ReadBatch(ctx context.Context) (MessageBatch, Ack
112135 textProp := otel .GetTextMapPropagator ()
113136 for i , p := range m {
114137 ctx := textProp .Extract (p .Context (), c )
115- pCtx , _ := prov .Tracer ("bento" ).Start (ctx , operationName )
138+
139+ var opts []trace.SpanStartOption
140+ if s .newSpan {
141+ opts = []trace.SpanStartOption {
142+ trace .WithNewRoot (),
143+ trace .WithLinks (trace .LinkFromContext (ctx )),
144+ }
145+ }
146+
147+ pCtx , _ := prov .Tracer ("bento" ).Start (ctx , operationName , opts ... )
116148 m [i ] = p .WithContext (pCtx )
117149 }
118150 return m , afn , nil
@@ -130,6 +162,7 @@ type spanInjectInput struct {
130162
131163 mapping * bloblang.Executor
132164 rdr Input
165+ newSpan bool
133166}
134167
135168func (s * spanInjectInput ) Connect (ctx context.Context ) error {
@@ -159,7 +192,17 @@ func (s *spanInjectInput) Read(ctx context.Context) (*Message, AckFunc, error) {
159192
160193 textProp := otel .GetTextMapPropagator ()
161194
162- pCtx , _ := prov .Tracer ("bento" ).Start (textProp .Extract (m .Context (), c ), operationName )
195+ ctx = textProp .Extract (m .Context (), c )
196+
197+ var opts []trace.SpanStartOption
198+ if s .newSpan {
199+ opts = []trace.SpanStartOption {
200+ trace .WithNewRoot (),
201+ trace .WithLinks (trace .LinkFromContext (ctx )),
202+ }
203+ }
204+
205+ pCtx , _ := prov .Tracer ("bento" ).Start (ctx , operationName , opts ... )
163206 m = m .WithContext (pCtx )
164207
165208 return m , afn , nil
0 commit comments