Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ PATHINSTSERVERLESS = $(DEST_DIR)/serverless
PATHINSTDOCKER = $(DEST_DIR)/docker
DOCKER_IMAGE ?= ghcr.io/warpstreamlabs/bento

VERSION := $(shell git describe --tags || echo "v0.0.0")
VERSION := $(shell git describe --tags 2>/dev/null || echo "v0.0.0")
VER_CUT := $(shell echo $(VERSION) | cut -c2-)
VER_MAJOR := $(shell echo $(VER_CUT) | cut -f1 -d.)
VER_MINOR := $(shell echo $(VER_CUT) | cut -f2 -d.)
Expand Down
15 changes: 15 additions & 0 deletions internal/bloblang/query/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,21 @@ var _ = registerSimpleFunction(
},
)

// flow_id exposes the flow ID held on the context of the message part that is
// currently being mapped, as reported by tracing.GetFlowID.
var _ = registerSimpleFunction(
	NewFunctionSpec(
		FunctionCategoryMessage, "flow_id",
		"Returns the message flow ID used for tracing the journey of a message through the pipeline. Flow IDs are automatically assigned at the input layer.",
		NewExampleSpec("",
			`meta flow_id = flow_id()`,
		),
	).Experimental(),
	func(fCtx FunctionContext) (any, error) {
		// Resolve the part addressed by this invocation and read the flow ID
		// straight from its context.
		msgCtx := fCtx.MsgBatch.Get(fCtx.Index).GetContext()
		return tracing.GetFlowID(msgCtx), nil
	},
)

//------------------------------------------------------------------------------

var _ = registerFunction(
Expand Down
20 changes: 20 additions & 0 deletions internal/bundle/tracing/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,23 @@ func TracedBundle(b *bundle.Environment) (*bundle.Environment, *Summary) {

return tracedEnv, summary
}

// FlowIDBundle returns a clone of b in which every input documented by b is
// re-registered with a constructor that builds the input through b and then
// passes it through wrapWithFlowID. The original environment is not modified.
//
// It does not depend on TracedBundle and is intended to be applied to all
// builds.
func FlowIDBundle(b *bundle.Environment) *bundle.Environment {
	env := b.Clone()

	for _, docSpec := range b.InputDocs() {
		ctor := func(conf input.Config, nm bundle.NewManagement) (input.Streamed, error) {
			inner, err := b.InputInit(conf, nm)
			if err != nil {
				return nil, err
			}
			return wrapWithFlowID(inner), nil
		}

		// The registration error is discarded here.
		_ = env.InputAdd(ctor, docSpec)
	}

	return env
}
27 changes: 25 additions & 2 deletions internal/bundle/tracing/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/warpstreamlabs/bento/internal/component/testutil"
"github.com/warpstreamlabs/bento/internal/manager"
"github.com/warpstreamlabs/bento/internal/message"
internaltracing "github.com/warpstreamlabs/bento/internal/tracing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -520,6 +521,7 @@ meta bar = "new bar value"
for i := 0; i < 10; i++ {
part := message.NewPart([]byte(strconv.Itoa(i)))
part.MetaSetMut("foo", fmt.Sprintf("foo value %v", i))
part = internaltracing.EnsureFlowID(part)
batch, res := proc.ProcessBatch(tCtx, message.Batch{part})
require.NoError(t, res)
require.Len(t, batch, 1)
Expand All @@ -540,7 +542,14 @@ meta bar = "new bar value"

type tMap = map[string]any

assert.Equal(t, []tracing.NodeEvent{
// Check that all events have flow IDs and timestamps
for i, event := range events {
assert.NotEmpty(t, event.FlowID, "Event %d should have a flow ID", i)
assert.False(t, event.Timestamp.IsZero(), "Event %d should have a timestamp", i)
}

// Check the event types and content (ignoring FlowID and Timestamp for exact comparison)
expectedEvents := []tracing.NodeEvent{
{Type: "CONSUME", Content: "0", Meta: tMap{"foo": "foo value 0"}},
{Type: "PRODUCE", Content: "0", Meta: tMap{"foo": "foo value 0"}},
{Type: "ERROR", Content: "failed assignment (line 3): nah 0"},
Expand All @@ -566,7 +575,21 @@ meta bar = "new bar value"
{Type: "ERROR", Content: "failed assignment (line 3): nah 8"},
{Type: "CONSUME", Content: "9", Meta: tMap{"foo": "foo value 9"}},
{Type: "PRODUCE", Content: "{\"count\":9}", Meta: tMap{"bar": "new bar value", "foo": "foo value 9"}},
}, events)
}

require.Len(t, expectedEvents, len(events))
for i, expected := range expectedEvents {
actual := events[i]
assert.Equal(t, expected.Type, actual.Type, "Event %d type mismatch", i)
assert.Equal(t, expected.Content, actual.Content, "Event %d content mismatch", i)

// For metadata comparison, we need to account for the flow ID that gets added
if expected.Meta != nil {
for key, value := range expected.Meta {
assert.Equal(t, value, actual.Meta[key], "Event %d meta[%s] mismatch", i, key)
}
}
}
}

func TestBundleProcessorTracingError(t *testing.T) {
Expand Down
65 changes: 53 additions & 12 deletions internal/bundle/tracing/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package tracing
import (
"sync"
"sync/atomic"
"time"

"github.com/warpstreamlabs/bento/internal/message"
"github.com/warpstreamlabs/bento/internal/tracing"
)

// EventType describes the type of event a component might experience during
Expand All @@ -21,9 +23,11 @@ var (

// NodeEvent represents a single event that occurred within the stream.
//
// NOTE(review): the Type/Content/Meta fields appear twice in this view,
// presumably because old and new diff lines are rendered together — confirm
// against the real file before relying on this listing.
type NodeEvent struct {
Type EventType
Content string
Meta map[string]any
Type EventType
Content string
Meta map[string]any
// FlowID is the flow ID read from the context of the message part the event
// was built from; constructors that take no part leave it empty.
FlowID string
// Timestamp is set to time.Now() by the event constructors in this file.
Timestamp time.Time
}

// EventProduceOf creates a produce event from a message part.
Expand All @@ -35,9 +39,11 @@ func EventProduceOf(part *message.Part) NodeEvent {
})

return NodeEvent{
Type: EventProduce,
Content: string(part.AsBytes()),
Meta: meta,
Type: EventProduce,
Content: string(part.AsBytes()),
Meta: meta,
FlowID: getFlowID(part),
Timestamp: time.Now(),
}
}

Expand All @@ -50,27 +56,62 @@ func EventConsumeOf(part *message.Part) NodeEvent {
})

return NodeEvent{
Type: EventConsume,
Content: string(part.AsBytes()),
Meta: meta,
Type: EventConsume,
Content: string(part.AsBytes()),
Meta: meta,
FlowID: getFlowID(part),
Timestamp: time.Now(),
}
}

// EventDeleteOf creates a delete event that is not tied to any message part:
// FlowID is left empty and Timestamp is the time of the call.
//
// Deprecated: Use EventDeleteOfPart instead. This function cannot track flow IDs.
func EventDeleteOf() NodeEvent {
return NodeEvent{
Type: EventDelete,
Type: EventDelete,
FlowID: "",
Timestamp: time.Now(),
}
}

// EventDeleteOfPart builds a delete event for part. The event carries the flow
// ID looked up via getFlowID and is stamped with the time of the call; Content
// and Meta are left at their zero values.
func EventDeleteOfPart(part *message.Part) NodeEvent {
	ev := NodeEvent{Type: EventDelete}
	ev.FlowID = getFlowID(part)
	ev.Timestamp = time.Now()
	return ev
}

// EventErrorOf creates an error event whose Content is err.Error(). The event
// is not tied to any message part: FlowID is left empty and Timestamp is the
// time of the call.
//
// Deprecated: Use EventErrorOfPart instead. This function cannot track flow IDs.
func EventErrorOf(err error) NodeEvent {
return NodeEvent{
Type: EventError,
Content: err.Error(),
Type: EventError,
Content: err.Error(),
FlowID: "",
Timestamp: time.Now(),
}
}

// EventErrorOfPart builds an error event for part. Content is err.Error(), the
// flow ID is looked up via getFlowID, and the event is stamped with the time of
// the call. Meta is left nil.
func EventErrorOfPart(part *message.Part, err error) NodeEvent {
	ev := NodeEvent{Type: EventError, Content: err.Error()}
	ev.FlowID = getFlowID(part)
	ev.Timestamp = time.Now()
	return ev
}

// getFlowID returns whatever tracing.GetFlowID reports for the context attached
// to part.
//
// NOTE(review): flow IDs are expected to be assigned at the input layer;
// presumably the result is empty for a part that never had one attached —
// confirm against tracing.GetFlowID.
func getFlowID(part *message.Part) string {
	partCtx := message.GetContext(part)
	return tracing.GetFlowID(partCtx)
}

type control struct {
isEnabled int32
eventLimit int64
Expand Down
112 changes: 112 additions & 0 deletions internal/bundle/tracing/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package tracing

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/warpstreamlabs/bento/internal/message"
"github.com/warpstreamlabs/bento/internal/tracing"
)

func TestFlowIDGeneration(t *testing.T) {
part := message.NewPart([]byte("test message"))

part = tracing.EnsureFlowID(part)
flowID1 := getFlowID(part)
assert.NotEmpty(t, flowID1)
// UUID V7 format: xxxxxxxx-xxxx-7xxx-xxxx-xxxxxxxxxxxx
assert.Regexp(t, `^[0-9a-f]{8}-[0-9a-f]{4}-7[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$`, flowID1)

flowID2 := getFlowID(part)
assert.Equal(t, flowID1, flowID2)

ctx := message.GetContext(part)
storedFlowID := tracing.GetFlowID(ctx)
assert.NotEmpty(t, storedFlowID)
assert.Equal(t, flowID1, storedFlowID)
}

// TestFlowIDFromExistingContext checks that getFlowID returns a flow ID that
// was placed on the part's context with tracing.WithFlowID.
func TestFlowIDFromExistingContext(t *testing.T) {
	const want = "existing_flow_123"

	base := message.NewPart([]byte("test message"))
	seeded := base.WithContext(tracing.WithFlowID(message.GetContext(base), want))

	got := getFlowID(seeded)
	assert.Equal(t, want, got)
}

// TestEnsureFlowIDWithExisting checks that EnsureFlowID keeps a flow ID that is
// already present on the part's context instead of replacing it.
func TestEnsureFlowIDWithExisting(t *testing.T) {
	const want = "existing_flow_123"

	base := message.NewPart([]byte("test message"))
	seeded := base.WithContext(tracing.WithFlowID(message.GetContext(base), want))

	ensured := tracing.EnsureFlowID(seeded)
	got := getFlowID(ensured)
	assert.Equal(t, want, got)
}

// TestEventCreationWithFlowID checks that the part-based event constructors
// copy type, content and metadata as expected, share the part's flow ID, and
// stamp the event with a time inside the window in which it was created.
func TestEventCreationWithFlowID(t *testing.T) {
	msg := message.NewPart([]byte("test content"))
	msg.MetaSetMut("test_meta", "test_value")
	msg = tracing.EnsureFlowID(msg)

	// Bracket the constructor call so the timestamp can be range-checked.
	windowStart := time.Now()
	produced := EventProduceOf(msg)
	windowEnd := time.Now()

	assert.Equal(t, EventProduce, produced.Type)
	assert.Equal(t, "test content", produced.Content)
	assert.Equal(t, "test_value", produced.Meta["test_meta"])
	assert.NotEmpty(t, produced.FlowID)
	assert.True(t, produced.Timestamp.After(windowStart) || produced.Timestamp.Equal(windowStart))
	assert.True(t, produced.Timestamp.Before(windowEnd) || produced.Timestamp.Equal(windowEnd))

	consumed := EventConsumeOf(msg)
	assert.Equal(t, EventConsume, consumed.Type)
	assert.Equal(t, produced.FlowID, consumed.FlowID)

	deleted := EventDeleteOfPart(msg)
	assert.Equal(t, EventDelete, deleted.Type)
	assert.Equal(t, produced.FlowID, deleted.FlowID)
	assert.Empty(t, deleted.Content)

	sampleErr := assert.AnError
	failed := EventErrorOfPart(msg, sampleErr)
	assert.Equal(t, EventError, failed.Type)
	assert.Equal(t, sampleErr.Error(), failed.Content)
	assert.Equal(t, produced.FlowID, failed.FlowID)
}

// TestEventCreationWithoutPart covers the deprecated constructors that take no
// message part and therefore produce events with an empty flow ID. They are
// retained for backward compatibility.
func TestEventCreationWithoutPart(t *testing.T) {
	// Deprecated: use EventDeleteOfPart
	deleted := EventDeleteOf()
	assert.Equal(t, EventDelete, deleted.Type)
	assert.Empty(t, deleted.FlowID)
	assert.Empty(t, deleted.Content)

	sampleErr := assert.AnError
	// Deprecated: use EventErrorOfPart
	failed := EventErrorOf(sampleErr)
	assert.Equal(t, EventError, failed.Type)
	assert.Equal(t, sampleErr.Error(), failed.Content)
	assert.Empty(t, failed.FlowID)
}

// TestUniqueFlowIDs checks that two independently created parts receive
// distinct, non-empty flow IDs from EnsureFlowID.
func TestUniqueFlowIDs(t *testing.T) {
	msgA := message.NewPart([]byte("message 1"))
	msgB := message.NewPart([]byte("message 2"))

	msgA = tracing.EnsureFlowID(msgA)
	msgB = tracing.EnsureFlowID(msgB)

	idA, idB := getFlowID(msgA), getFlowID(msgB)

	assert.NotEqual(t, idA, idB)
	assert.NotEmpty(t, idA)
	assert.NotEmpty(t, idB)
}
Loading