Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add dependency processor using Apache Beam #6560

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/jaeger/internal/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (b builders) build() (otelcol.Factories, error) {
attributesprocessor.NewFactory(),
// add-ons
adaptivesampling.NewFactory(),
dependencyprocessor.NewFactory(),
)
if err != nil {
return otelcol.Factories{}, err
Expand Down
205 changes: 205 additions & 0 deletions cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package dependencyprocessor
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"sync"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
)

type dependencyAggregator struct {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
config *Config
telset component.TelemetrySettings
dependencyWriter *memory.Store
traces map[model.TraceID]*TraceState
tracesLock sync.RWMutex
closeChan chan struct{}
beamPipeline *beam.Pipeline
beamScope beam.Scope
}

type TraceState struct {
spans []*model.Span
spanMap map[model.SpanID]*model.Span
lastUpdateTime time.Time
timer *time.Timer
}

func newDependencyAggregator(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store) *dependencyAggregator {
beam.Init()
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
p, s := beam.NewPipelineWithRoot()
return &dependencyAggregator{
config: &cfg,
telset: telset,
dependencyWriter: dependencyWriter,
traces: make(map[model.TraceID]*TraceState),
beamPipeline: p,
beamScope: s,
}
}

func (agg *dependencyAggregator) Start(closeChan chan struct{}) {
agg.closeChan = closeChan
Comment on lines +57 to +58
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (agg *dependencyAggregator) Start(closeChan chan struct{}) {
agg.closeChan = closeChan
func (agg *dependencyAggregator) Start() {

go func() {
ticker := time.NewTicker(agg.config.AggregationInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
agg.processTraces(context.Background()) // Pass context
case <-agg.closeChan:
agg.processTraces(context.Background()) // Pass context
return
}
}
}()
}

func (agg *dependencyAggregator) Close() error {
agg.tracesLock.Lock()
defer agg.tracesLock.Unlock()
for _, traceState := range agg.traces {
if traceState.timer != nil {
traceState.timer.Stop()
}
}
return nil
}

func (agg *dependencyAggregator) HandleSpan(span *model.Span) {
agg.tracesLock.Lock()
defer agg.tracesLock.Unlock()

traceState, ok := agg.traces[span.TraceID]
if !ok {
traceState = &TraceState{
spans: []*model.Span{},
spanMap: make(map[model.SpanID]*model.Span),
lastUpdateTime: time.Now(),
}
agg.traces[span.TraceID] = traceState
}

traceState.spans = append(traceState.spans, span)
traceState.spanMap[span.SpanID] = span
traceState.lastUpdateTime = time.Now()

if traceState.timer != nil {
traceState.timer.Stop()
}
traceState.timer = time.AfterFunc(agg.config.InactivityTimeout, func() {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
agg.processTraces(context.Background()) // Pass context
})
}

func (agg *dependencyAggregator) processTraces(ctx context.Context) {
agg.tracesLock.Lock()
if len(agg.traces) == 0 {
agg.tracesLock.Unlock()
return
}
traces := agg.traces
agg.traces = make(map[model.TraceID]*TraceState)
agg.tracesLock.Unlock()
for _, traceState := range traces {
if traceState.timer != nil {
traceState.timer.Stop()
}
}

beamInput := agg.createBeamInput(traces)
if beamInput.IsValid() {
agg.calculateDependencies(ctx, beamInput)
}
}

func (agg *dependencyAggregator) createBeamInput(traces map[model.TraceID]*TraceState) beam.PCollection {
var allSpans []*model.Span
for _, traceState := range traces {
allSpans = append(allSpans, traceState.spans...)
}
if len(allSpans) == 0 {
return beam.PCollection{}
}
return beam.CreateList(agg.beamScope, allSpans)
}

func (agg *dependencyAggregator) calculateDependencies(ctx context.Context, input beam.PCollection) {
keyedSpans := beam.ParDo(agg.beamScope, func(s *model.Span) (model.TraceID, *model.Span) {
return s.TraceID, s
}, input)

groupedSpans := beam.GroupByKey(agg.beamScope, keyedSpans)
depLinks := beam.ParDo(agg.beamScope, func(_ model.TraceID, spansIter func(*model.Span) bool) []*model.DependencyLink {
deps := map[string]*model.DependencyLink{}
var span *model.Span
for spansIter(span) {
processSpan(deps, span, agg.traces)
}
return depMapToSlice(deps)
}, groupedSpans)
flattened := beam.Flatten(agg.beamScope, depLinks)

beam.ParDo0(agg.beamScope, func(deps []*model.DependencyLink) {
err := agg.dependencyWriter.WriteDependencies(ctx, time.Now(), deps)
if err != nil {
agg.telset.Logger.Error("Error writing dependencies", zap.Error(err))
}
}, flattened)

go func() {
err := beamx.Run(ctx, agg.beamPipeline)
if err != nil {
agg.telset.Logger.Error("Error running beam pipeline", zap.Error(err))
}
agg.beamPipeline = beam.NewPipeline()
agg.beamScope = beam.Scope{Parent: beam.PipelineScope{ID: "dependency"}, Name: "dependency"}
}()
}

// processSpan is a copy from the memory storage plugin
func processSpan(deps map[string]*model.DependencyLink, s *model.Span, traces map[model.TraceID]*TraceState) {
parentSpan := seekToSpan(s, traces)
if parentSpan != nil {
if parentSpan.Process.ServiceName == s.Process.ServiceName {
return
}
depKey := parentSpan.Process.ServiceName + "&&&" + s.Process.ServiceName
if _, ok := deps[depKey]; !ok {
deps[depKey] = &model.DependencyLink{
Parent: parentSpan.Process.ServiceName,
Child: s.Process.ServiceName,
CallCount: 1,
}
} else {
deps[depKey].CallCount++
}
}
}

func seekToSpan(span *model.Span, traces map[model.TraceID]*TraceState) *model.Span {
traceState, ok := traces[span.TraceID]
if !ok {
return nil
}
return traceState.spanMap[span.ParentSpanID()]
}

// depMapToSlice modifies the spans to DependencyLink in the same way as the memory storage plugin
func depMapToSlice(deps map[string]*model.DependencyLink) []*model.DependencyLink {
retMe := make([]*model.DependencyLink, 0, len(deps))
for _, dep := range deps {
retMe = append(retMe, dep)
}
return retMe
}
24 changes: 24 additions & 0 deletions cmd/jaeger/internal/processors/dependencyprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package dependencyprocessor

import (
"time"

"github.com/jaegertracing/jaeger/plugin/storage/memory"
)

type Config struct {
AggregationInterval time.Duration `yaml:"aggregation_interval"`
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
InactivityTimeout time.Duration `yaml:"inactivity_timeout"`
Store *memory.Store `yaml:"-"`
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
}

func DefaultConfig() Config {
return Config{
AggregationInterval: 5 * time.Second, // Default dependency aggregation interval: 5 seconds
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
InactivityTimeout: 2 * time.Second, // Default trace completion timeout: 2 seconds of inactivity
Store: memory.NewStore(),
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add Validate method and use valid: notations in the field tags.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

103 changes: 103 additions & 0 deletions cmd/jaeger/internal/processors/dependencyprocessor/e2e_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package dependencyprocessor

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor/processortest"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
)

func TestDependencyProcessorEndToEnd(t *testing.T) {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
// Create a mock receiver, processor, and exporter
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

// Create a mock next consumer (exporter)
sink := new(consumertest.TracesSink)

// Create a memory store to store dependency links
store := memory.NewStore()

// Create the processor
processor, err := factory.CreateTraces(
context.Background(),
processortest.NewNopSettings(),
cfg,
sink,
)
require.NoError(t, err)

// Start the processor
err = processor.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
defer func() {
assert.NoError(t, processor.Shutdown(context.Background()))
}()

// Create a test trace
trace := createTestTrace()

// Send the trace to the processor
err = processor.ConsumeTraces(context.Background(), trace)
require.NoError(t, err)

// Wait for the processor to process the trace
time.Sleep(cfg.AggregationInterval + 100*time.Millisecond)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

// Verify dependency links
deps, err := store.GetDependencies(context.Background(), time.Now(), cfg.AggregationInterval)
require.NoError(t, err)

// Expected dependency links
expectedDeps := []model.DependencyLink{
{
Parent: "service1",
Child: "service2",
CallCount: 1,
},
}
assert.Equal(t, expectedDeps, deps, "dependency links do not match expected output")
}

// createTestTrace creates a test trace with two spans from different services.
func createTestTrace() ptrace.Traces {
traces := ptrace.NewTraces()

// Create a resource span for the parent span (service1)
rs1 := traces.ResourceSpans().AppendEmpty()
rs1.Resource().Attributes().PutStr("service.name", "service1")
ils1 := rs1.ScopeSpans().AppendEmpty()
parentSpan := ils1.Spans().AppendEmpty()
parentSpan.SetTraceID([16]byte{1, 2, 3, 4})
parentSpan.SetSpanID([8]byte{5, 6, 7, 8})
parentSpan.SetName("span2")
parentSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now()))
parentSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(time.Second)))

// Create a resource span for the child span (service2)
rs2 := traces.ResourceSpans().AppendEmpty()
rs2.Resource().Attributes().PutStr("service.name", "service2")
ils2 := rs2.ScopeSpans().AppendEmpty()
span := ils2.Spans().AppendEmpty()
span.SetTraceID([16]byte{1, 2, 3, 4})
span.SetSpanID([8]byte{1, 2, 3, 4})
span.SetName("span1")
span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now()))
span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(time.Second)))
span.SetParentSpanID(parentSpan.SpanID()) // Set parent-child relationship

return traces
}
50 changes: 50 additions & 0 deletions cmd/jaeger/internal/processors/dependencyprocessor/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package dependencyprocessor

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"

"github.com/jaegertracing/jaeger/plugin/storage/memory"
)

// componentType is the name of this processor in configuration.
var componentType = component.MustNewType("dependencyprocessor")

// NewFactory creates a factory for the dependency processor.
func NewFactory() processor.Factory {
return processor.NewFactory(
componentType,
createDefaultConfig,
processor.WithTraces(createTracesProcessor, component.StabilityLevelAlpha),
)
}

// createDefaultConfig returns the default configuration for the dependency processor.
func createDefaultConfig() component.Config {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
return &Config{
AggregationInterval: 5 * time.Second,
InactivityTimeout: 2 * time.Second,
Store: memory.NewStore(),
}
}

// createTracesProcessor creates a new instance of the dependency processor.
func createTracesProcessor(
ctx context.Context,
set processor.Settings,
cfg component.Config,
nextConsumer consumer.Traces,
) (processor.Traces, error) {
oCfg := cfg.(*Config)

dp := newDependencyProcessor(*oCfg, set.TelemetrySettings, oCfg.Store, nextConsumer)

return dp, nil
}
Loading