Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2][query] Add interface for adjuster to operate on OTLP data format #6346

Merged
merged 5 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions cmd/query/app/querysvc/adjuster/adjuster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"errors"

"go.opentelemetry.io/collector/pdata/ptrace"
)

// Adjuster defines an interface for modifying a trace object.
// It returns the adjusted trace object, which is also updated in place.
// If the adjuster encounters an issue that prevents it from applying
// modifications, it should return the original trace object along with an error.
type Adjuster interface {
Adjust(ptrace.Traces) (ptrace.Traces, error)
}

// Func is a type alias that wraps a function and makes an Adjuster from it.
type Func func(trace ptrace.Traces) (ptrace.Traces, error)

// Adjust implements Adjuster interface for the Func alias.
func (f Func) Adjust(trace ptrace.Traces) (ptrace.Traces, error) {
return f(trace)
}

// Sequence creates an adjuster that combines a series of adjusters
// applied in order. Errors from each step are accumulated and returned
// in the end as a single wrapper error. Errors do not interrupt the
// sequence of adapters.
func Sequence(adjusters ...Adjuster) Adjuster {
return sequence{adjusters: adjusters}
}

// FailFastSequence is similar to Sequence() but returns immediately
// if any adjuster returns an error.
func FailFastSequence(adjusters ...Adjuster) Adjuster {
return sequence{adjusters: adjusters, failFast: true}
}

type sequence struct {
adjusters []Adjuster
failFast bool
}

func (c sequence) Adjust(trace ptrace.Traces) (ptrace.Traces, error) {
var errs []error
for _, adjuster := range c.adjusters {
var err error
trace, err = adjuster.Adjust(trace)
if err != nil {
if c.failFast {
return trace, err
}
errs = append(errs, err)
}
}
return trace, errors.Join(errs...)
}
68 changes: 68 additions & 0 deletions cmd/query/app/querysvc/adjuster/adjuster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package adjuster_test

import (
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc/adjuster"
)

func TestSequences(t *testing.T) {
// mock adjuster that increments last byte of span ID
adj := adjuster.Func(func(trace ptrace.Traces) (ptrace.Traces, error) {
span := trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
spanId := span.SpanID()
spanId[7]++
span.SetSpanID(spanId)
return trace, nil
})

adjErr := errors.New("mock adjuster error")
failingAdj := adjuster.Func(func(trace ptrace.Traces) (ptrace.Traces, error) {
return trace, adjErr
})

tests := []struct {
name string
adjuster adjuster.Adjuster
err string
lastSpanID pcommon.SpanID
}{
{
name: "normal sequence",
adjuster: adjuster.Sequence(adj, failingAdj, adj, failingAdj),
err: fmt.Sprintf("%s\n%s", adjErr, adjErr),
lastSpanID: [8]byte{0, 0, 0, 0, 0, 0, 0, 2},
},
{
name: "fail fast sequence",
adjuster: adjuster.FailFastSequence(adj, failingAdj, adj, failingAdj),
err: adjErr.Error(),
lastSpanID: [8]byte{0, 0, 0, 0, 0, 0, 0, 1},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
trace := ptrace.NewTraces()
span := trace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
span.SetSpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 0})

adjTrace, err := test.adjuster.Adjust(trace)
adjTraceSpan := adjTrace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)

assert.Equal(t, span, adjTraceSpan)
assert.EqualValues(t, test.lastSpanID, span.SpanID())
require.EqualError(t, err, test.err)
})
}
}
14 changes: 14 additions & 0 deletions cmd/query/app/querysvc/adjuster/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"testing"

"github.com/jaegertracing/jaeger/pkg/testutils"
)

func TestMain(m *testing.M) {
testutils.VerifyGoLeaks(m)
}
Loading