Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2][storage] Create v2 query service to operate on otlp data model #6343

Merged
merged 42 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
ad0de07
Create Query Service V2 To Operate on OTEL Data Model
mahadzaryab1 Dec 11, 2024
24278bb
Merge branch 'main' into query-service-v2
mahadzaryab1 Dec 24, 2024
41f1d51
Merge branch 'main' into query-service-v2
mahadzaryab1 Dec 24, 2024
a33ab6c
Address Build Failures And Feedback
mahadzaryab1 Dec 24, 2024
ffad629
Fix ArchiveTrace And GetTraces To Operate On Iterators
mahadzaryab1 Dec 24, 2024
1d721ac
Change Adjuster To Work On Seq
mahadzaryab1 Dec 24, 2024
a6df503
Fix Error Capture
mahadzaryab1 Dec 24, 2024
60a875a
Address Feedback From PR Review
mahadzaryab1 Dec 25, 2024
d31a5e5
Merge branch 'main' into query-service-v2
mahadzaryab1 Dec 25, 2024
2c7a848
Merge branch 'main' into query-service-v2
mahadzaryab1 Dec 29, 2024
d32ba48
Run Linter
mahadzaryab1 Dec 29, 2024
ecab080
Update Query Service To Perform Adjustments
mahadzaryab1 Dec 30, 2024
9ee2d38
Fix Signature of ArchiveTrace Function
mahadzaryab1 Dec 30, 2024
57ef9e7
Add Some Unit Tests
mahadzaryab1 Dec 30, 2024
40d9a68
Add Missing License
mahadzaryab1 Dec 30, 2024
32491f5
Adjust Archive Trace
mahadzaryab1 Dec 30, 2024
3d6af5f
Aggregate Traces
mahadzaryab1 Dec 30, 2024
ab95ab7
Remove TODO Comment
mahadzaryab1 Dec 30, 2024
6737ca9
Merge branch 'main' into query-service-v2
mahadzaryab1 Dec 30, 2024
005d8d7
Use SpanIter
mahadzaryab1 Dec 30, 2024
7171785
Add Proceed Check
mahadzaryab1 Dec 30, 2024
e5a4b1b
Address Feedback From PR Review
mahadzaryab1 Dec 30, 2024
f887d09
Create Receive Traces Helper Function
mahadzaryab1 Dec 30, 2024
959d2d4
Add Unit Tests For Get Traces
mahadzaryab1 Dec 30, 2024
d468249
Fix Lint
mahadzaryab1 Dec 30, 2024
f6d157e
Add Unit Tests For Find Traces
mahadzaryab1 Dec 30, 2024
d2f1c95
Add Test For Archive Trace Writer Error
mahadzaryab1 Dec 31, 2024
6f69d63
Add Test For Archive Trace Writer Success
mahadzaryab1 Dec 31, 2024
9250285
Write Test For Get Traces In Archive Storage
mahadzaryab1 Dec 31, 2024
1e0c5b3
Move Tests To Separate Package To Simplify Naming
mahadzaryab1 Dec 31, 2024
c14fc4f
Fix Comment
mahadzaryab1 Dec 31, 2024
73fbb19
Add Test For Error In Get Trace
mahadzaryab1 Dec 31, 2024
5020f89
Use Flatten With Errors
mahadzaryab1 Dec 31, 2024
e29eafe
Add Test For Error In Get Trace Reader
mahadzaryab1 Dec 31, 2024
2c716bf
Cleanup Tests
mahadzaryab1 Dec 31, 2024
28e5fd0
Combine Tests For Brevity
mahadzaryab1 Dec 31, 2024
5ae23f2
Fix Lint
mahadzaryab1 Dec 31, 2024
a79bcac
Merge branch 'main' into query-service-v2
mahadzaryab1 Dec 31, 2024
5636427
Move Query Service To V2 Package
mahadzaryab1 Dec 31, 2024
529bcda
Fix Lint
mahadzaryab1 Dec 31, 2024
806a265
Drop V2 Suffix
mahadzaryab1 Dec 31, 2024
f163d55
Merge branch 'main' into query-service-v2
mahadzaryab1 Dec 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/integration/trace_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (r *traceReader) GetServices(ctx context.Context) ([]string, error) {
return res.Services, nil
}

func (r *traceReader) GetOperations(ctx context.Context, query tracestore.OperationQueryParameters) ([]tracestore.Operation, error) {
func (r *traceReader) GetOperations(ctx context.Context, query tracestore.OperationQueryParams) ([]tracestore.Operation, error) {
var operations []tracestore.Operation
res, err := r.client.GetOperations(ctx, &api_v3.GetOperationsRequest{
Service: query.ServiceName,
Expand Down
14 changes: 14 additions & 0 deletions cmd/query/app/querysvc/v2/querysvc/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package querysvc
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro is this file path good? currently the adjusters are in querysvc/v2/adjuster/ - should we also move it under querysvc/v2/querysvc/adjuster?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

V2/adjuster is good path. But now that you mentioned, I think we could move the whole thing out of query/app into query/internal, as part of the overall cleanup I'm pushing for. But we can do it in a follow up PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good - will do in a follow-up PR


import (
"testing"

"github.com/jaegertracing/jaeger/pkg/testutils"
)

func TestMain(m *testing.M) {
testutils.VerifyGoLeaks(m)
}
203 changes: 203 additions & 0 deletions cmd/query/app/querysvc/v2/querysvc/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package querysvc

import (
"context"
"errors"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/adjuster"
"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/iter"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)

var errNoArchiveSpanStorage = errors.New("archive span storage was not configured")

const (
defaultMaxClockSkewAdjust = time.Second
)

// QueryServiceOptions holds the configuration options for the query service.
type QueryServiceOptions struct {
// ArchiveTraceReader is used to read archived traces from the storage.
ArchiveTraceReader tracestore.Reader
// ArchiveTraceWriter is used to write traces to the archive storage.
ArchiveTraceWriter tracestore.Writer
// Adjuster is used to adjust traces before they are returned to the client.
// If not set, the default adjuster will be used.
Adjuster adjuster.Adjuster
}

// StorageCapabilities is a feature flag for query service
type StorageCapabilities struct {
ArchiveStorage bool `json:"archiveStorage"`
// TODO: Maybe add metrics Storage here
// SupportRegex bool
// SupportTagFilter bool
}

// QueryService provides methods to query data from the storage.
type QueryService struct {
traceReader tracestore.Reader
dependencyReader depstore.Reader
options QueryServiceOptions
}

// GetTraceParams defines the parameters for retrieving traces using the GetTraces function.
type GetTraceParams struct {
// TraceIDs is a slice of trace identifiers to fetch.
TraceIDs []tracestore.GetTraceParams
// RawTraces indicates whether to retrieve raw traces.
// If set to false, the traces will be adjusted using QueryServiceOptions.Adjuster.
RawTraces bool
}

// TraceQueryParams represents the parameters for querying a batch of traces.
type TraceQueryParams struct {
tracestore.TraceQueryParams
// RawTraces indicates whether to retrieve raw traces.
// If set to false, the traces will be adjusted using QueryServiceOptions.Adjuster.
RawTraces bool
}

func NewQueryService(
traceReader tracestore.Reader,
dependencyReader depstore.Reader,
options QueryServiceOptions,
) *QueryService {
qsvc := &QueryService{
traceReader: traceReader,
dependencyReader: dependencyReader,
options: options,
}

if qsvc.options.Adjuster == nil {
qsvc.options.Adjuster = adjuster.Sequence(
adjuster.StandardAdjusters(defaultMaxClockSkewAdjust)...)
}
return qsvc
}

// GetTraces retrieves traces with given trace IDs from the primary reader,
// and if any of them are not found it then queries the archive reader.
// The iterator is single-use: once consumed, it cannot be used again.
func (qs QueryService) GetTraces(
ctx context.Context,
params GetTraceParams,
) iter.Seq2[[]ptrace.Traces, error] {
getTracesIter := qs.traceReader.GetTraces(ctx, params.TraceIDs...)
return func(yield func([]ptrace.Traces, error) bool) {
foundTraceIDs, proceed := qs.receiveTraces(getTracesIter, yield, params.RawTraces)
if proceed && qs.options.ArchiveTraceReader != nil {
var missingTraceIDs []tracestore.GetTraceParams
for _, id := range params.TraceIDs {
if _, found := foundTraceIDs[id.TraceID]; !found {
missingTraceIDs = append(missingTraceIDs, id)
}
}
if len(missingTraceIDs) > 0 {
getArchiveTracesIter := qs.options.ArchiveTraceReader.GetTraces(ctx, missingTraceIDs...)
qs.receiveTraces(getArchiveTracesIter, yield, params.RawTraces)
}
}
}
}

func (qs QueryService) GetServices(ctx context.Context) ([]string, error) {
return qs.traceReader.GetServices(ctx)
}

func (qs QueryService) GetOperations(
ctx context.Context,
query tracestore.OperationQueryParams,
) ([]tracestore.Operation, error) {
return qs.traceReader.GetOperations(ctx, query)
}

func (qs QueryService) FindTraces(
ctx context.Context,
query TraceQueryParams,
) iter.Seq2[[]ptrace.Traces, error] {
return func(yield func([]ptrace.Traces, error) bool) {
tracesIter := qs.traceReader.FindTraces(ctx, query.TraceQueryParams)
qs.receiveTraces(tracesIter, yield, query.RawTraces)
}
}

// ArchiveTrace archives a trace specified by the given query parameters.
// If the ArchiveTraceWriter is not configured, it returns
// an error indicating that there is no archive span storage available.
func (qs QueryService) ArchiveTrace(ctx context.Context, query tracestore.GetTraceParams) error {
if qs.options.ArchiveTraceWriter == nil {
return errNoArchiveSpanStorage
}
getTracesIter := qs.GetTraces(
ctx, GetTraceParams{TraceIDs: []tracestore.GetTraceParams{query}},
)
var archiveErr error
getTracesIter(func(traces []ptrace.Traces, err error) bool {
if err != nil {
archiveErr = err
return false
}
for _, trace := range traces {
err = qs.options.ArchiveTraceWriter.WriteTraces(ctx, trace)
if err != nil {
archiveErr = errors.Join(archiveErr, err)
}
}
return true
})
return archiveErr
}

func (qs QueryService) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
return qs.dependencyReader.GetDependencies(ctx, depstore.QueryParameters{
StartTime: endTs.Add(-lookback),
EndTime: endTs,
})
}

func (qs QueryService) GetCapabilities() StorageCapabilities {
return StorageCapabilities{
ArchiveStorage: qs.options.hasArchiveStorage(),
}
}

func (opts *QueryServiceOptions) hasArchiveStorage() bool {
return opts.ArchiveTraceReader != nil && opts.ArchiveTraceWriter != nil
}

func (qs QueryService) receiveTraces(
seq iter.Seq2[[]ptrace.Traces, error],
yield func([]ptrace.Traces, error) bool,
rawTraces bool,
) (map[pcommon.TraceID]struct{}, bool) {
aggregatedTraces := jptrace.AggregateTraces(seq)
foundTraceIDs := make(map[pcommon.TraceID]struct{})
proceed := true
aggregatedTraces(func(trace ptrace.Traces, err error) bool {
if err != nil {
proceed = yield(nil, err)
return proceed
}
if !rawTraces {
qs.options.Adjuster.Adjust(trace)
}
jptrace.SpanIter(trace)(func(_ jptrace.SpanIterPos, span ptrace.Span) bool {
foundTraceIDs[span.TraceID()] = struct{}{}
return true
})
proceed = yield([]ptrace.Traces{trace}, nil)
return proceed
})
return foundTraceIDs, proceed
}
Loading
Loading