diff --git a/Makefile b/Makefile index ba2e79d18..b0743dcd9 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,7 @@ PATHINSTSERVERLESS = $(DEST_DIR)/serverless PATHINSTDOCKER = $(DEST_DIR)/docker DOCKER_IMAGE ?= ghcr.io/warpstreamlabs/bento -VERSION := $(shell git describe --tags || echo "v0.0.0") +VERSION := $(shell git describe --tags 2>/dev/null || echo "v0.0.0") VER_CUT := $(shell echo $(VERSION) | cut -c2-) VER_MAJOR := $(shell echo $(VER_CUT) | cut -f1 -d.) VER_MINOR := $(shell echo $(VER_CUT) | cut -f2 -d.) diff --git a/internal/bloblang/query/functions.go b/internal/bloblang/query/functions.go index d8c28f7c4..578b59748 100644 --- a/internal/bloblang/query/functions.go +++ b/internal/bloblang/query/functions.go @@ -263,6 +263,21 @@ var _ = registerSimpleFunction( }, ) +var _ = registerSimpleFunction( + NewFunctionSpec( + FunctionCategoryMessage, "flow_id", + "Returns the message flow ID used for tracing the journey of a message through the pipeline. Flow IDs are automatically assigned at the input layer.", + NewExampleSpec("", + `meta flow_id = flow_id()`, + ), + ).Experimental(), + func(fCtx FunctionContext) (any, error) { + part := fCtx.MsgBatch.Get(fCtx.Index) + ctx := part.GetContext() + return tracing.GetFlowID(ctx), nil + }, +) + //------------------------------------------------------------------------------ var _ = registerFunction( diff --git a/internal/bundle/tracing/bundle.go b/internal/bundle/tracing/bundle.go index 1503f8db3..07fadcef7 100644 --- a/internal/bundle/tracing/bundle.go +++ b/internal/bundle/tracing/bundle.go @@ -71,3 +71,23 @@ func TracedBundle(b *bundle.Environment) (*bundle.Environment, *Summary) { return tracedEnv, summary } + +// FlowIDBundle modifies a provided bundle environment so that all inputs +// are wrapped to ensure flow IDs are initialized on all messages. +// This is independent of tracing and should be applied to all builds. +func FlowIDBundle(b *bundle.Environment) *bundle.Environment { + flowEnv := b.Clone() + + for _, spec := range b.InputDocs() { + _ = flowEnv.InputAdd(func(conf input.Config, nm bundle.NewManagement) (input.Streamed, error) { + i, err := b.InputInit(conf, nm) + if err != nil { + return nil, err + } + i = wrapWithFlowID(i) + return i, err + }, spec) + } + + return flowEnv +} diff --git a/internal/bundle/tracing/bundle_test.go b/internal/bundle/tracing/bundle_test.go index f732418a1..66611c321 100644 --- a/internal/bundle/tracing/bundle_test.go +++ b/internal/bundle/tracing/bundle_test.go @@ -14,6 +14,7 @@ import ( "github.com/warpstreamlabs/bento/internal/component/testutil" "github.com/warpstreamlabs/bento/internal/manager" "github.com/warpstreamlabs/bento/internal/message" + internaltracing "github.com/warpstreamlabs/bento/internal/tracing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -520,6 +521,7 @@ meta bar = "new bar value" for i := 0; i < 10; i++ { part := message.NewPart([]byte(strconv.Itoa(i))) part.MetaSetMut("foo", fmt.Sprintf("foo value %v", i)) + part = internaltracing.EnsureFlowID(part) batch, res := proc.ProcessBatch(tCtx, message.Batch{part}) require.NoError(t, res) require.Len(t, batch, 1) @@ -540,7 +542,14 @@ meta bar = "new bar value" type tMap = map[string]any - assert.Equal(t, []tracing.NodeEvent{ + // Check that all events have flow IDs and timestamps + for i, event := range events { + assert.NotEmpty(t, event.FlowID, "Event %d should have a flow ID", i) + assert.False(t, event.Timestamp.IsZero(), "Event %d should have a timestamp", i) + } + + // Check the event types and content (ignoring FlowID and Timestamp for exact comparison) + expectedEvents := []tracing.NodeEvent{ {Type: "CONSUME", Content: "0", Meta: tMap{"foo": "foo value 0"}}, {Type: "PRODUCE", Content: "0", Meta: tMap{"foo": "foo value 0"}}, {Type: "ERROR", Content: "failed assignment (line 3): nah 0"}, @@ -566,7 +575,21 @@ meta bar = "new bar value" {Type: "ERROR", Content: "failed assignment (line 3): nah 8"}, {Type: "CONSUME", Content: "9", Meta: tMap{"foo": "foo value 9"}}, {Type: "PRODUCE", Content: "{\"count\":9}", Meta: tMap{"bar": "new bar value", "foo": "foo value 9"}}, - }, events) + } + + require.Len(t, expectedEvents, len(events)) + for i, expected := range expectedEvents { + actual := events[i] + assert.Equal(t, expected.Type, actual.Type, "Event %d type mismatch", i) + assert.Equal(t, expected.Content, actual.Content, "Event %d content mismatch", i) + + // For metadata comparison, we need to account for the flow ID that gets added + if expected.Meta != nil { + for key, value := range expected.Meta { + assert.Equal(t, value, actual.Meta[key], "Event %d meta[%s] mismatch", i, key) + } + } + } } func TestBundleProcessorTracingError(t *testing.T) { diff --git a/internal/bundle/tracing/events.go b/internal/bundle/tracing/events.go index 043519cb9..974b7b246 100644 --- a/internal/bundle/tracing/events.go +++ b/internal/bundle/tracing/events.go @@ -3,8 +3,10 @@ package tracing import ( "sync" "sync/atomic" + "time" "github.com/warpstreamlabs/bento/internal/message" + "github.com/warpstreamlabs/bento/internal/tracing" ) // EventType describes the type of event a component might experience during @@ -21,9 +23,11 @@ var ( // NodeEvent represents a single event that occurred within the stream. type NodeEvent struct { - Type EventType - Content string - Meta map[string]any + Type EventType + Content string + Meta map[string]any + FlowID string + Timestamp time.Time } // EventProduceOf creates a produce event from a message part. @@ -35,9 +39,11 @@ func EventProduceOf(part *message.Part) NodeEvent { }) return NodeEvent{ - Type: EventProduce, - Content: string(part.AsBytes()), - Meta: meta, + Type: EventProduce, + Content: string(part.AsBytes()), + Meta: meta, + FlowID: getFlowID(part), + Timestamp: time.Now(), } } @@ -50,27 +56,62 @@ func EventConsumeOf(part *message.Part) NodeEvent { }) return NodeEvent{ - Type: EventConsume, - Content: string(part.AsBytes()), - Meta: meta, + Type: EventConsume, + Content: string(part.AsBytes()), + Meta: meta, + FlowID: getFlowID(part), + Timestamp: time.Now(), } } // EventDeleteOf creates a deleted event from a message part. +// +// Deprecated: Use EventDeleteOfPart instead. This function cannot track flow IDs. func EventDeleteOf() NodeEvent { return NodeEvent{ - Type: EventDelete, + Type: EventDelete, + FlowID: "", + Timestamp: time.Now(), + } +} + +// EventDeleteOfPart creates a deleted event with flow ID from a message part. +func EventDeleteOfPart(part *message.Part) NodeEvent { + return NodeEvent{ + Type: EventDelete, + FlowID: getFlowID(part), + Timestamp: time.Now(), } } // EventErrorOf creates an error event from a message part. +// +// Deprecated: Use EventErrorOfPart instead. This function cannot track flow IDs. func EventErrorOf(err error) NodeEvent { return NodeEvent{ - Type: EventError, - Content: err.Error(), + Type: EventError, + Content: err.Error(), + FlowID: "", + Timestamp: time.Now(), + } +} + +// EventErrorOfPart creates an error event with flow ID from a message part. +func EventErrorOfPart(part *message.Part, err error) NodeEvent { + return NodeEvent{ + Type: EventError, + Content: err.Error(), + FlowID: getFlowID(part), + Timestamp: time.Now(), } } +// getFlowID retrieves the flow ID from a message part. +// Flow IDs are initialized at the input layer, so this should always return a value. +func getFlowID(part *message.Part) string { + return tracing.GetFlowID(message.GetContext(part)) +} + type control struct { isEnabled int32 eventLimit int64 diff --git a/internal/bundle/tracing/events_test.go b/internal/bundle/tracing/events_test.go new file mode 100644 index 000000000..5ee92f7e0 --- /dev/null +++ b/internal/bundle/tracing/events_test.go @@ -0,0 +1,112 @@ +package tracing + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/warpstreamlabs/bento/internal/message" + "github.com/warpstreamlabs/bento/internal/tracing" +) + +func TestFlowIDGeneration(t *testing.T) { + part := message.NewPart([]byte("test message")) + + part = tracing.EnsureFlowID(part) + flowID1 := getFlowID(part) + assert.NotEmpty(t, flowID1) + // UUID V7 format: xxxxxxxx-xxxx-7xxx-xxxx-xxxxxxxxxxxx + assert.Regexp(t, `^[0-9a-f]{8}-[0-9a-f]{4}-7[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$`, flowID1) + + flowID2 := getFlowID(part) + assert.Equal(t, flowID1, flowID2) + + ctx := message.GetContext(part) + storedFlowID := tracing.GetFlowID(ctx) + assert.NotEmpty(t, storedFlowID) + assert.Equal(t, flowID1, storedFlowID) +} + +func TestFlowIDFromExistingContext(t *testing.T) { + part := message.NewPart([]byte("test message")) + expectedFlowID := "existing_flow_123" + ctx := tracing.WithFlowID(message.GetContext(part), expectedFlowID) + part = part.WithContext(ctx) + + flowID := getFlowID(part) + assert.Equal(t, expectedFlowID, flowID) +} + +func TestEnsureFlowIDWithExisting(t *testing.T) { + part := message.NewPart([]byte("test message")) + expectedFlowID := "existing_flow_123" + ctx := tracing.WithFlowID(message.GetContext(part), expectedFlowID) + part = part.WithContext(ctx) + + part = tracing.EnsureFlowID(part) + flowID := getFlowID(part) + assert.Equal(t, expectedFlowID, flowID) +} + +func TestEventCreationWithFlowID(t *testing.T) { + part := message.NewPart([]byte("test content")) + part.MetaSetMut("test_meta", "test_value") + part = tracing.EnsureFlowID(part) + + before := time.Now() + produceEvent := EventProduceOf(part) + after := time.Now() + + assert.Equal(t, EventProduce, produceEvent.Type) + assert.Equal(t, "test content", produceEvent.Content) + assert.Equal(t, "test_value", produceEvent.Meta["test_meta"]) + assert.NotEmpty(t, produceEvent.FlowID) + assert.True(t, produceEvent.Timestamp.After(before) || produceEvent.Timestamp.Equal(before)) + assert.True(t, produceEvent.Timestamp.Before(after) || produceEvent.Timestamp.Equal(after)) + + consumeEvent := EventConsumeOf(part) + assert.Equal(t, EventConsume, consumeEvent.Type) + assert.Equal(t, produceEvent.FlowID, consumeEvent.FlowID) + + deleteEvent := EventDeleteOfPart(part) + assert.Equal(t, EventDelete, deleteEvent.Type) + assert.Equal(t, produceEvent.FlowID, deleteEvent.FlowID) + assert.Empty(t, deleteEvent.Content) + + testErr := assert.AnError + errorEvent := EventErrorOfPart(part, testErr) + assert.Equal(t, EventError, errorEvent.Type) + assert.Equal(t, testErr.Error(), errorEvent.Content) + assert.Equal(t, produceEvent.FlowID, errorEvent.FlowID) +} + +// TestEventCreationWithoutPart tests deprecated functions that create events without flow IDs. +// These functions are kept for backward compatibility. +func TestEventCreationWithoutPart(t *testing.T) { + deleteEvent := EventDeleteOf() // Deprecated: use EventDeleteOfPart + assert.Equal(t, EventDelete, deleteEvent.Type) + assert.Empty(t, deleteEvent.FlowID) + assert.Empty(t, deleteEvent.Content) + + testErr := assert.AnError + errorEvent := EventErrorOf(testErr) // Deprecated: use EventErrorOfPart + assert.Equal(t, EventError, errorEvent.Type) + assert.Equal(t, testErr.Error(), errorEvent.Content) + assert.Empty(t, errorEvent.FlowID) +} + +func TestUniqueFlowIDs(t *testing.T) { + part1 := message.NewPart([]byte("message 1")) + part2 := message.NewPart([]byte("message 2")) + + part1 = tracing.EnsureFlowID(part1) + part2 = tracing.EnsureFlowID(part2) + + flowID1 := getFlowID(part1) + flowID2 := getFlowID(part2) + + assert.NotEqual(t, flowID1, flowID2) + assert.NotEmpty(t, flowID1) + assert.NotEmpty(t, flowID2) +} diff --git a/internal/bundle/tracing/flow_wrapper.go b/internal/bundle/tracing/flow_wrapper.go new file mode 100644 index 000000000..39ce6d0f3 --- /dev/null +++ b/internal/bundle/tracing/flow_wrapper.go @@ -0,0 +1,85 @@ +package tracing + +import ( + "context" + + "github.com/Jeffail/shutdown" + + "github.com/warpstreamlabs/bento/internal/component" + "github.com/warpstreamlabs/bento/internal/component/input" + "github.com/warpstreamlabs/bento/internal/message" + internaltracing "github.com/warpstreamlabs/bento/internal/tracing" +) + +// flowIDInput wraps an input to ensure all messages have flow IDs initialized. +// This is independent of tracing and ensures flow IDs are always available. +type flowIDInput struct { + wrapped input.Streamed + tChan chan message.Transaction + shutSig *shutdown.Signaller +} + +func wrapWithFlowID(i input.Streamed) input.Streamed { + f := &flowIDInput{ + wrapped: i, + tChan: make(chan message.Transaction), + shutSig: shutdown.NewSignaller(), + } + go f.loop() + return f +} + +func (f *flowIDInput) UnwrapInput() input.Streamed { + return f.wrapped +} + +func (f *flowIDInput) loop() { + defer close(f.tChan) + readChan := f.wrapped.TransactionChan() + for { + var tran message.Transaction + var open bool + select { + case tran, open = <-readChan: + if !open { + return + } + case <-f.shutSig.HardStopChan(): + return + } + + _ = tran.Payload.Iter(func(i int, part *message.Part) error { + tran.Payload[i] = internaltracing.EnsureFlowID(part) + return nil + }) + + select { + case f.tChan <- tran: + case <-f.shutSig.HardStopChan(): + return + } + } +} + +func (f *flowIDInput) TransactionChan() <-chan message.Transaction { + return f.tChan +} + +func (f *flowIDInput) ConnectionStatus() component.ConnectionStatuses { + return f.wrapped.ConnectionStatus() +} + +func (f *flowIDInput) TriggerStopConsuming() { + f.wrapped.TriggerStopConsuming() +} + +func (f *flowIDInput) TriggerCloseNow() { + f.wrapped.TriggerCloseNow() + f.shutSig.TriggerHardStop() +} + +func (f *flowIDInput) WaitForClose(ctx context.Context) error { + err := f.wrapped.WaitForClose(ctx) + f.shutSig.TriggerHardStop() + return err +} diff --git a/internal/bundle/tracing/processor.go b/internal/bundle/tracing/processor.go index ff5af621f..8602362d7 100644 --- a/internal/bundle/tracing/processor.go +++ b/internal/bundle/tracing/processor.go @@ -32,14 +32,19 @@ func (t *tracedProcessor) ProcessBatch(ctx context.Context, m message.Batch) ([] return t.wrapped.ProcessBatch(ctx, m) } + inputParts := make([]*message.Part, m.Len()) prevErrs := make([]error, m.Len()) _ = m.Iter(func(i int, part *message.Part) error { + inputParts[i] = part t.e.Add(EventConsumeOf(part)) prevErrs[i] = part.ErrorGet() return nil }) outMsgs, res := t.wrapped.ProcessBatch(ctx, m) + + hasOutput := make([]bool, len(inputParts)) + for _, outMsg := range outMsgs { _ = outMsg.Iter(func(i int, part *message.Part) error { t.e.Add(EventProduceOf(part)) @@ -52,13 +57,23 @@ func (t *tracedProcessor) ProcessBatch(ctx context.Context, m message.Batch) ([] return nil } _ = atomic.AddUint64(t.errCtr, 1) - t.e.Add(EventErrorOf(fail)) + t.e.Add(EventErrorOfPart(part, fail)) return nil }) + + if outMsg.Len() > 0 { + for i := 0; i < len(hasOutput) && i < outMsg.Len(); i++ { + hasOutput[i] = true + } + } } + if len(outMsgs) == 0 { - // TODO: Find a better way of locating deletes (using batch index tracking). - t.e.Add(EventDeleteOf()) + for _, part := range inputParts { + if part != nil { + t.e.Add(EventDeleteOfPart(part)) + } + } } return outMsgs, res diff --git a/internal/impl/pure/processor_workflow_test.go b/internal/impl/pure/processor_workflow_test.go index 162058b4d..5c3be9f1d 100644 --- a/internal/impl/pure/processor_workflow_test.go +++ b/internal/impl/pure/processor_workflow_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "maps" "sort" "strconv" "strings" @@ -1038,6 +1039,31 @@ workflow: require.NoError(t, strm.Stop(tCtx)) assert.Equal(t, `{"content":"waddup","id":"HELLO WORLD","meta":{"workflow":{"succeeded":["fooproc"]}}}`, outValue) + + normalizeEvents := func(events map[string][]service.TracingEvent) map[string][]service.TracingEvent { + normalized := make(map[string][]service.TracingEvent) + for k, evs := range events { + normalizedEvs := make([]service.TracingEvent, len(evs)) + for i, ev := range evs { + normalizedEvs[i] = service.TracingEvent{ + Type: ev.Type, + Content: ev.Content, + FlowID: "", + Timestamp: time.Time{}, + } + + if ev.Meta == nil { + normalizedEvs[i].Meta = map[string]any{} + } else { + normalizedEvs[i].Meta = make(map[string]any) + maps.Copy(normalizedEvs[i].Meta, ev.Meta) + } + } + normalized[k] = normalizedEvs + } + return normalized + } + assert.Equal(t, map[string][]service.TracingEvent{ "barproc": { {Type: "CONSUME", Content: "{\"id\":\"hello world\",\"content\":\"waddup\"}", Meta: map[string]interface{}{}}, @@ -1048,5 +1074,5 @@ workflow: {Type: "CONSUME", Content: "hello world", Meta: map[string]interface{}{}}, {Type: "PRODUCE", Content: "{\"id\":\"HELLO WORLD\"}", Meta: map[string]interface{}{}}, }, - }, tracer.ProcessorEvents(false)) + }, normalizeEvents(tracer.ProcessorEvents(false))) } diff --git a/internal/tracing/flow.go b/internal/tracing/flow.go new file mode 100644 index 000000000..74cf82d2e --- /dev/null +++ b/internal/tracing/flow.go @@ -0,0 +1,45 @@ +package tracing + +import ( + "context" + + "github.com/google/uuid" + "github.com/warpstreamlabs/bento/internal/message" +) + +const EmptyTraceID = "00000000000000000000000000000000" + +type flowTraceKey struct{} + +// WithFlowID returns a context with the flow ID stored in it +func WithFlowID(ctx context.Context, flowID string) context.Context { + return context.WithValue(ctx, flowTraceKey{}, flowID) +} + +// GetFlowID retrieves the flow ID from a context +func GetFlowID(ctx context.Context) string { + if id, ok := ctx.Value(flowTraceKey{}).(string); ok { + return id + } + return "" +} + +// EnsureFlowID ensures a flow ID exists on the message part, creating one if necessary. +// This should be called at the input layer to initialize flow IDs for all messages. +func EnsureFlowID(part *message.Part) *message.Part { + ctx := message.GetContext(part) + + if flowID := GetFlowID(ctx); flowID != "" { + return part + } + + if traceID := GetTraceID(part); traceID != "" && traceID != EmptyTraceID { + ctx = WithFlowID(ctx, traceID) + return part.WithContext(ctx) + } + + flowID, _ := uuid.NewV7() + flowIDStr := flowID.String() + ctx = WithFlowID(ctx, flowIDStr) + return part.WithContext(ctx) +} diff --git a/public/service/flow_id_test.go b/public/service/flow_id_test.go new file mode 100644 index 000000000..f81b33631 --- /dev/null +++ b/public/service/flow_id_test.go @@ -0,0 +1,174 @@ +package service_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/warpstreamlabs/bento/public/service" + + _ "github.com/warpstreamlabs/bento/public/components/pure" +) + +func TestFlowIDInNonTracedBuild(t *testing.T) { + config := ` +input: + generate: + count: 3 + interval: 1us + mapping: | + root.id = count("test_flow") + meta captured_flow_id = flow_id() + +pipeline: + threads: 1 + processors: + - bloblang: | + root.message = "processed" + root.original_id = this.id + meta flow_check = flow_id() + +output: + drop: {} + +logger: + level: OFF +` + + strmBuilder := service.NewStreamBuilder() + require.NoError(t, strmBuilder.SetYAML(config)) + + strm, err := strmBuilder.Build() + require.NoError(t, err) + + require.NoError(t, strm.Run(context.Background())) +} + +func TestFlowIDConsistencyInTracedBuild(t *testing.T) { + config := ` +input: + generate: + count: 2 + interval: 1us + mapping: | + root.id = count("consistency_test") + meta flow_at_input = flow_id() + +pipeline: + threads: 1 + processors: + - bloblang: | + root.message = "step1" + root.original_id = this.id + meta flow_at_proc1 = flow_id() + - bloblang: | + root.message = "step2" + meta flow_at_proc2 = flow_id() + +output: + drop: {} + +logger: + level: OFF +` + + strmBuilder := service.NewStreamBuilder() + require.NoError(t, strmBuilder.SetYAML(config)) + + strm, trace, err := strmBuilder.BuildTraced() + require.NoError(t, err) + + require.NoError(t, strm.Run(context.Background())) + + assert.Equal(t, 2, int(trace.TotalInput())) + assert.Equal(t, 2, int(trace.TotalOutput())) + + inputEvents := trace.InputEvents(false) + require.Contains(t, inputEvents, "root.input") + + for _, event := range inputEvents["root.input"] { + assert.NotEmpty(t, event.FlowID, "Input event should have flow ID") + assert.False(t, event.Timestamp.IsZero(), "Input event should have timestamp") + } + + processorEvents := trace.ProcessorEvents(false) + for procName, events := range processorEvents { + for _, event := range events { + assert.NotEmpty(t, event.FlowID, "Processor %s event should have flow ID", procName) + assert.False(t, event.Timestamp.IsZero(), "Processor %s event should have timestamp", procName) + } + } + + outputEvents := trace.OutputEvents(false) + require.Contains(t, outputEvents, "root.output") + + for _, event := range outputEvents["root.output"] { + assert.NotEmpty(t, event.FlowID, "Output event should have flow ID") + assert.False(t, event.Timestamp.IsZero(), "Output event should have timestamp") + } +} + +func TestFlowIDsByFlowID(t *testing.T) { + config := ` +input: + generate: + count: 3 + interval: 1us + mapping: | + root.id = count("flow_grouping_test") + +pipeline: + threads: 1 + processors: + - bloblang: | + root.processed = true + +output: + drop: {} + +logger: + level: OFF +` + + strmBuilder := service.NewStreamBuilder() + require.NoError(t, strmBuilder.SetYAML(config)) + + strm, trace, err := strmBuilder.BuildTraced() + require.NoError(t, err) + + require.NoError(t, strm.Run(context.Background())) + + flowEvents := trace.EventsByFlowID(false) + + assert.NotEmpty(t, flowEvents, "Should have events grouped by flow ID") + + for flowID, events := range flowEvents { + assert.NotEmpty(t, flowID, "Flow ID should not be empty") + assert.NotEmpty(t, events, "Should have events for flow %s", flowID) + + var hasInput, hasOutput bool + for _, event := range events { + assert.Equal(t, flowID, event.FlowID, "All events should have same flow ID") + if event.Type == service.TracingEventProduce { + hasInput = true + } + if event.Type == service.TracingEventConsume { + hasOutput = true + } + } + + assert.True(t, hasInput, "Flow %s should have input events", flowID) + assert.True(t, hasOutput, "Flow %s should have output events", flowID) + + for i := 1; i < len(events); i++ { + assert.True(t, + events[i-1].Timestamp.Before(events[i].Timestamp) || + events[i-1].Timestamp.Equal(events[i].Timestamp), + "Events should be sorted by timestamp for flow %s", flowID) + } + } + + assert.Len(t, flowEvents, 3, "Should have 3 distinct flows for 3 messages") +} diff --git a/public/service/stream_builder.go b/public/service/stream_builder.go index 4c3863609..5ae751d0d 100644 --- a/public/service/stream_builder.go +++ b/public/service/stream_builder.go @@ -828,7 +828,8 @@ func (s *StreamBuilder) runConsumerFunc(mgr *manager.Type) error { // Build a Bento stream pipeline according to the components specified by this // stream builder. func (s *StreamBuilder) Build() (*Stream, error) { - return s.buildWithEnv(s.env.internal, false) + flowEnv := tracing.FlowIDBundle(s.env.internal) + return s.buildWithEnv(flowEnv, false) } // BuildTraced creates a Bento stream pipeline according to the components @@ -841,7 +842,8 @@ func (s *StreamBuilder) Build() (*Stream, error) { // Experimental: The behaviour of this method could change outside of major // version releases. func (s *StreamBuilder) BuildTraced() (*Stream, *TracingSummary, error) { - tenv, summary := tracing.TracedBundle(s.env.internal) + flowEnv := tracing.FlowIDBundle(s.env.internal) + tenv, summary := tracing.TracedBundle(flowEnv) strm, err := s.buildWithEnv(tenv, false) return strm, &TracingSummary{summary}, err } diff --git a/public/service/tracing.go b/public/service/tracing.go index 62a1ee3e5..9297bc046 100644 --- a/public/service/tracing.go +++ b/public/service/tracing.go @@ -1,7 +1,9 @@ package service import ( + "sort" "sync/atomic" + "time" "github.com/warpstreamlabs/bento/internal/bundle/tracing" ) @@ -42,9 +44,11 @@ func convertTracingEventType(t tracing.EventType) TracingEventType { // // Experimental: This type may change outside of major version releases. type TracingEvent struct { - Type TracingEventType - Content string - Meta map[string]any + Type TracingEventType + Content string + Meta map[string]any + FlowID string + Timestamp time.Time } // TracingSummary is a high level description of all traced events. When tracing @@ -86,9 +90,11 @@ func (s *TracingSummary) InputEvents(flush bool) map[string][]TracingEvent { events := make([]TracingEvent, len(v)) for i, e := range v { events[i] = TracingEvent{ - Type: convertTracingEventType(e.Type), - Content: e.Content, - Meta: e.Meta, + Type: convertTracingEventType(e.Type), + Content: e.Content, + Meta: e.Meta, + FlowID: e.FlowID, + Timestamp: e.Timestamp, } } m[k] = events @@ -106,9 +112,11 @@ func (s *TracingSummary) ProcessorEvents(flush bool) map[string][]TracingEvent { events := make([]TracingEvent, len(v)) for i, e := range v { events[i] = TracingEvent{ - Type: convertTracingEventType(e.Type), - Content: e.Content, - Meta: e.Meta, + Type: convertTracingEventType(e.Type), + Content: e.Content, + Meta: e.Meta, + FlowID: e.FlowID, + Timestamp: e.Timestamp, } } m[k] = events @@ -126,12 +134,56 @@ func (s *TracingSummary) OutputEvents(flush bool) map[string][]TracingEvent { events := make([]TracingEvent, len(v)) for i, e := range v { events[i] = TracingEvent{ - Type: convertTracingEventType(e.Type), - Content: e.Content, - Meta: e.Meta, + Type: convertTracingEventType(e.Type), + Content: e.Content, + Meta: e.Meta, + FlowID: e.FlowID, + Timestamp: e.Timestamp, } } m[k] = events } return m } + +// EventsByFlowID returns all events grouped by flow ID, allowing you to trace +// the complete journey of each message through the pipeline. Events are sorted +// by timestamp within each flow. +// +// Experimental: This method may change outside of major version releases. +func (s *TracingSummary) EventsByFlowID(flush bool) map[string][]TracingEvent { + flowEvents := map[string][]TracingEvent{} + + for _, events := range s.InputEvents(flush) { + for _, event := range events { + if event.FlowID != "" { + flowEvents[event.FlowID] = append(flowEvents[event.FlowID], event) + } + } + } + + for _, events := range s.ProcessorEvents(flush) { + for _, event := range events { + if event.FlowID != "" { + flowEvents[event.FlowID] = append(flowEvents[event.FlowID], event) + } + } + } + + for _, events := range s.OutputEvents(flush) { + for _, event := range events { + if event.FlowID != "" { + flowEvents[event.FlowID] = append(flowEvents[event.FlowID], event) + } + } + } + + for flowID, events := range flowEvents { + sort.Slice(events, func(i, j int) bool { + return events[i].Timestamp.Before(events[j].Timestamp) + }) + flowEvents[flowID] = events + } + + return flowEvents +} diff --git a/public/service/tracing_test.go b/public/service/tracing_test.go index 8abcca88d..0fa7888f8 100644 --- a/public/service/tracing_test.go +++ b/public/service/tracing_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "maps" "testing" "time" @@ -104,6 +105,33 @@ logger: type tMap = map[string]any + // Helper function to normalize events for testing + normalizeEvents := func(events map[string][]service.TracingEvent) map[string][]service.TracingEvent { + normalized := make(map[string][]service.TracingEvent) + for k, evs := range events { + normalizedEvs := make([]service.TracingEvent, len(evs)) + for i, ev := range evs { + normalizedEvs[i] = service.TracingEvent{ + Type: ev.Type, + Content: ev.Content, + Meta: func() map[string]any { + if ev.Meta == nil { + return nil + } + meta := make(map[string]any) + maps.Copy(meta, ev.Meta) + if len(meta) == 0 { + return tMap{} + } + return meta + }(), + } + } + normalized[k] = normalizedEvs + } + return normalized + } + assert.Equal(t, map[string][]service.TracingEvent{ "root.input": { {Type: service.TracingEventProduce, Content: `{"id":1}`, Meta: tMap{}}, @@ -112,7 +140,7 @@ logger: {Type: service.TracingEventProduce, Content: `{"id":4}`, Meta: tMap{}}, {Type: service.TracingEventProduce, Content: `{"id":5}`, Meta: tMap{}}, }, - }, trace.InputEvents(false)) + }, normalizeEvents(trace.InputEvents(false))) assert.Equal(t, map[string][]service.TracingEvent{ "root.pipeline.processors.0": { @@ -129,7 +157,7 @@ logger: {Type: service.TracingEventConsume, Content: `{"id":5}`, Meta: tMap{}}, {Type: service.TracingEventProduce, Content: `{"count":5}`, Meta: tMap{"foo": int64(5)}}, }, - }, trace.ProcessorEvents(false)) + }, normalizeEvents(trace.ProcessorEvents(false))) assert.Equal(t, map[string][]service.TracingEvent{ "root.output": { @@ -139,7 +167,7 @@ logger: {Type: service.TracingEventConsume, Content: `{"id":4}`, Meta: tMap{}}, {Type: service.TracingEventConsume, Content: `{"count":5}`, Meta: tMap{"foo": int64(5)}}, }, - }, trace.OutputEvents(false)) + }, normalizeEvents(trace.OutputEvents(false))) } func BenchmarkStreamTracing(b *testing.B) { diff --git a/website/docs/guides/bloblang/functions.md b/website/docs/guides/bloblang/functions.md index e56f348ff..ff6f314d6 100644 --- a/website/docs/guides/bloblang/functions.md +++ b/website/docs/guides/bloblang/functions.md @@ -402,6 +402,20 @@ Returns a boolean value indicating whether an error has occurred during the proc root.doc.status = if errored() { 400 } else { 200 } ``` +### `flow_id` + +:::caution EXPERIMENTAL +This function is experimental and therefore breaking changes could be made to it outside of major version releases. +::: +Returns the message flow ID used for tracing the journey of a message through the pipeline. Flow IDs are automatically assigned at the input layer. + +#### Examples + + +```coffee +meta flow_id = flow_id() +``` + ### `json` Returns the value of a field within a JSON message located by a [dot path][field_paths] argument. This function always targets the entire source JSON document regardless of the mapping context.