Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add dependency processor using Apache Beam #6560

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/jaeger/internal/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (b builders) build() (otelcol.Factories, error) {
attributesprocessor.NewFactory(),
// add-ons
adaptivesampling.NewFactory(),
dependencyprocessor.NewFactory(),
)
if err != nil {
return otelcol.Factories{}, err
Expand Down
228 changes: 228 additions & 0 deletions cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package dependencyprocessor
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"sync"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

var beamInitOnce sync.Once

type dependencyAggregator struct {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
config *Config
telset component.TelemetrySettings
dependencyWriter spanstore.Writer
traces map[pcommon.TraceID]*TraceState
tracesLock sync.RWMutex
closeChan chan struct{}
beamPipeline *beam.Pipeline
beamScope beam.Scope
}

type TraceState struct {
spans []*ptrace.Span
spanMap map[pcommon.SpanID]*ptrace.Span
lastUpdateTime time.Time
timer *time.Timer
serviceName string
}

func newDependencyAggregator(cfg Config, telset component.TelemetrySettings, dependencyWriter spanstore.Writer) *dependencyAggregator {
beamInitOnce.Do(func() {
beam.Init()
})
p, s := beam.NewPipelineWithRoot()
return &dependencyAggregator{
config: &cfg,
telset: telset,
dependencyWriter: dependencyWriter,
traces: make(map[pcommon.TraceID]*TraceState),
beamPipeline: p,
beamScope: s,
}
}

func (agg *dependencyAggregator) Start(closeChan chan struct{}) {
agg.closeChan = closeChan
Comment on lines +57 to +58
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (agg *dependencyAggregator) Start(closeChan chan struct{}) {
agg.closeChan = closeChan
func (agg *dependencyAggregator) Start() {

go func() {
ticker := time.NewTicker(agg.config.AggregationInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
agg.processTraces(context.Background())
case <-agg.closeChan:
agg.processTraces(context.Background())
return
}
}
}()
}

func (agg *dependencyAggregator) Close() error {
agg.tracesLock.Lock()
defer agg.tracesLock.Unlock()
for _, traceState := range agg.traces {
if traceState.timer != nil {
traceState.timer.Stop()
}
}
return nil
}

func (agg *dependencyAggregator) HandleSpan(ctx context.Context, span ptrace.Span, serviceName string) {
agg.tracesLock.Lock()
defer agg.tracesLock.Unlock()

traceID := span.TraceID()
spanID := span.SpanID()

traceState, ok := agg.traces[traceID]
if !ok {
traceState = &TraceState{
spans: []*ptrace.Span{},
spanMap: make(map[pcommon.SpanID]*ptrace.Span),
lastUpdateTime: time.Now(),
serviceName: serviceName,
}
agg.traces[traceID] = traceState
}

spanCopy := span
traceState.spans = append(traceState.spans, &spanCopy)
traceState.spanMap[spanID] = &spanCopy
traceState.lastUpdateTime = time.Now()

if traceState.timer != nil {
traceState.timer.Stop()
}
traceState.timer = time.AfterFunc(agg.config.InactivityTimeout, func() {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
agg.processTraces(ctx)
})
}

func (agg *dependencyAggregator) processTraces(ctx context.Context) {
agg.tracesLock.Lock()
if len(agg.traces) == 0 {
agg.tracesLock.Unlock()
return
}
traces := agg.traces
agg.traces = make(map[pcommon.TraceID]*TraceState)
agg.tracesLock.Unlock()

for _, traceState := range traces {
if traceState.timer != nil {
traceState.timer.Stop()
}
}

beamInput := agg.createBeamInput(traces)
if beamInput.IsValid() {
agg.calculateDependencies(ctx, beamInput)
}
}

func (agg *dependencyAggregator) createBeamInput(traces map[pcommon.TraceID]*TraceState) beam.PCollection {
var allSpans []*ptrace.Span
for _, traceState := range traces {
allSpans = append(allSpans, traceState.spans...)
}
if len(allSpans) == 0 {
return beam.PCollection{}
}
return beam.CreateList(agg.beamScope, allSpans)
}

func (agg *dependencyAggregator) calculateDependencies(ctx context.Context, input beam.PCollection) {
keyedSpans := beam.ParDo(agg.beamScope, func(s *ptrace.Span) (pcommon.TraceID, *ptrace.Span) {
return s.TraceID(), s
}, input)

groupedSpans := beam.GroupByKey(agg.beamScope, keyedSpans)
depLinks := beam.ParDo(agg.beamScope, func(_ pcommon.TraceID, spansIter func(*ptrace.Span) bool) []*model.DependencyLink {
deps := map[string]*model.DependencyLink{}
var span *ptrace.Span
for spansIter(span) {
processSpanOtel(deps, span, agg.traces)
}
return depMapToSlice(deps)
}, groupedSpans)
flattened := beam.Flatten(agg.beamScope, depLinks)

beam.ParDo0(agg.beamScope, func(deps []*model.DependencyLink) {
err := agg.dependencyWriter.WriteDependencies(ctx, time.Now(), deps)
if err != nil {
agg.telset.Logger.Error("Error writing dependencies", zap.Error(err))
}
}, flattened)

go func() {
err := beamx.Run(ctx, agg.beamPipeline)
if err != nil {
agg.telset.Logger.Error("Error running beam pipeline", zap.Error(err))
}
agg.beamPipeline = beam.NewPipeline()
agg.beamScope = beam.Scope{Parent: beam.PipelineScope{ID: "dependency"}, Name: "dependency"}
}()
}

func processSpanOtel(deps map[string]*model.DependencyLink, span *ptrace.Span, traces map[pcommon.TraceID]*TraceState) {
traceID := span.TraceID()
parentSpanID := span.ParentSpanID()

traceState, ok := traces[traceID]
if !ok {
return
}

parentSpan := traceState.spanMap[parentSpanID]
if parentSpan == nil {
return
}

parentTraceState := traces[traceID]
if parentTraceState == nil {
return
}

parentService := parentTraceState.serviceName
currentService := traceState.serviceName

if parentService == currentService || parentService == "" || currentService == "" {
return
}

depKey := parentService + "&&&" + currentService
if _, ok := deps[depKey]; !ok {
deps[depKey] = &model.DependencyLink{
Parent: parentService,
Child: currentService,
CallCount: 1,
}
} else {
deps[depKey].CallCount++
}
}

// depMapToSlice converts dependency map to slice
func depMapToSlice(deps map[string]*model.DependencyLink) []*model.DependencyLink {
retMe := make([]*model.DependencyLink, 0, len(deps))
for _, dep := range deps {
retMe = append(retMe, dep)
}
return retMe
}
Loading