From 2321c7682cb22b7c4b2c3dba76336edeb152f4ca Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sat, 16 Nov 2024 11:39:51 -0500 Subject: [PATCH] Hold TraceReader In Query Service Signed-off-by: Mahad Zaryab --- cmd/all-in-one/main.go | 4 +- .../internal/extension/jaegerquery/server.go | 4 +- cmd/query/app/querysvc/query_service.go | 44 +++++++++++++------ cmd/query/main.go | 4 +- 4 files changed, 40 insertions(+), 16 deletions(-) diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index e5718a2afff..1673ffde66e 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -41,6 +41,7 @@ import ( metricsstoreMetrics "github.com/jaegertracing/jaeger/storage/metricsstore/metrics" "github.com/jaegertracing/jaeger/storage/spanstore" storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics" + "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" ) // all-in-one/main is a standalone full-stack jaeger backend, backed by a memory store @@ -219,7 +220,8 @@ func startQuery( telset telemetery.Setting, ) *queryApp.Server { spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, telset.Metrics) - qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts) + traceReader := factoryadapter.NewTraceReader(spanReader) + qs := querysvc.NewQueryService(traceReader, depReader, *queryOpts) server, err := queryApp.NewServer(context.Background(), qs, metricsQueryService, qOpts, tm, telset) if err != nil { diff --git a/cmd/jaeger/internal/extension/jaegerquery/server.go b/cmd/jaeger/internal/extension/jaegerquery/server.go index 46bcfc4cc87..01f1f36e657 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server.go @@ -24,6 +24,7 @@ import ( "github.com/jaegertracing/jaeger/plugin/metrics/disabled" "github.com/jaegertracing/jaeger/storage/metricsstore" storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics" + "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" ) var ( @@ -66,6 +67,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error { } spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, queryMetricsFactory) + traceReader := factoryadapter.NewTraceReader(spanReader) depReader, err := f.CreateDependencyReader() if err != nil { @@ -76,7 +78,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error { if err := s.addArchiveStorage(&opts, host); err != nil { return err } - qs := querysvc.NewQueryService(spanReader, depReader, opts) + qs := querysvc.NewQueryService(traceReader, depReader, opts) mqs, err := s.createMetricReader(host) if err != nil { diff --git a/cmd/query/app/querysvc/query_service.go b/cmd/query/app/querysvc/query_service.go index 135ecc60bbe..5e23de8f499 100644 --- a/cmd/query/app/querysvc/query_service.go +++ b/cmd/query/app/querysvc/query_service.go @@ -14,7 +14,9 @@ import ( "github.com/jaegertracing/jaeger/model/adjuster" "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/dependencystore" - "github.com/jaegertracing/jaeger/storage/spanstore" + spanstore_v1 "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" + "github.com/jaegertracing/jaeger/storage_v2/spanstore" ) var errNoArchiveSpanStorage = errors.New("archive span storage was not configured") @@ -25,8 +27,8 @@ const ( // QueryServiceOptions has optional members of QueryService type QueryServiceOptions struct { - ArchiveSpanReader spanstore.Reader - ArchiveSpanWriter spanstore.Writer + ArchiveSpanReader spanstore_v1.Reader + ArchiveSpanWriter spanstore_v1.Writer Adjuster adjuster.Adjuster } @@ -40,15 +42,15 @@ type StorageCapabilities struct { // QueryService contains span utils required by the query-service. type QueryService struct { - spanReader spanstore.Reader + traceReader spanstore.Reader dependencyReader dependencystore.Reader options QueryServiceOptions } // NewQueryService returns a new QueryService. -func NewQueryService(spanReader spanstore.Reader, dependencyReader dependencystore.Reader, options QueryServiceOptions) *QueryService { +func NewQueryService(traceReader spanstore.Reader, dependencyReader dependencystore.Reader, options QueryServiceOptions) *QueryService { qsvc := &QueryService{ - spanReader: spanReader, + traceReader: traceReader, dependencyReader: dependencyReader, options: options, } @@ -61,7 +63,11 @@ func NewQueryService(spanReader spanstore.Reader, dependencyReader dependencysto // GetTrace is the queryService implementation of spanstore.Reader.GetTrace func (qs QueryService) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { - trace, err := qs.spanReader.GetTrace(ctx, traceID) + spanReader, err := factoryadapter.GetV1Reader(qs.traceReader) + if err != nil { + return nil, err + } + trace, err := spanReader.GetTrace(ctx, traceID) if errors.Is(err, spanstore.ErrTraceNotFound) { if qs.options.ArchiveSpanReader == nil { return nil, err @@ -73,20 +79,32 @@ func (qs QueryService) GetTrace(ctx context.Context, traceID model.TraceID) (*mo // GetServices is the queryService implementation of spanstore.Reader.GetServices func (qs QueryService) GetServices(ctx context.Context) ([]string, error) { - return qs.spanReader.GetServices(ctx) + spanReader, err := factoryadapter.GetV1Reader(qs.traceReader) + if err != nil { + return nil, err + } + return spanReader.GetServices(ctx) } // GetOperations is the queryService implementation of spanstore.Reader.GetOperations func (qs QueryService) GetOperations( ctx context.Context, - query spanstore.OperationQueryParameters, -) ([]spanstore.Operation, error) { - return qs.spanReader.GetOperations(ctx, query) + query spanstore_v1.OperationQueryParameters, +) ([]spanstore_v1.Operation, error) { + spanReader, err := factoryadapter.GetV1Reader(qs.traceReader) + if err != nil { + return nil, err + } + return spanReader.GetOperations(ctx, query) } // FindTraces is the queryService implementation of spanstore.Reader.FindTraces -func (qs QueryService) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { - return qs.spanReader.FindTraces(ctx, query) +func (qs QueryService) FindTraces(ctx context.Context, query *spanstore_v1.TraceQueryParameters) ([]*model.Trace, error) { + spanReader, err := factoryadapter.GetV1Reader(qs.traceReader) + if err != nil { + return nil, err + } + return spanReader.FindTraces(ctx, query) } // ArchiveTrace is the queryService utility to archive traces. diff --git a/cmd/query/main.go b/cmd/query/main.go index 1e8197d3e3f..09a5c13ddbd 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -34,6 +34,7 @@ import ( "github.com/jaegertracing/jaeger/ports" metricsstoreMetrics "github.com/jaegertracing/jaeger/storage/metricsstore/metrics" spanstoreMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics" + "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" ) func main() { @@ -89,6 +90,7 @@ func main() { logger.Fatal("Failed to create span reader", zap.Error(err)) } spanReader = spanstoreMetrics.NewReadMetricsDecorator(spanReader, metricsFactory) + traceReader := factoryadapter.NewTraceReader(spanReader) dependencyReader, err := storageFactory.CreateDependencyReader() if err != nil { logger.Fatal("Failed to create dependency reader", zap.Error(err)) @@ -100,7 +102,7 @@ func main() { } queryServiceOptions := queryOpts.BuildQueryServiceOptions(storageFactory, logger) queryService := querysvc.NewQueryService( - spanReader, + traceReader, dependencyReader, *queryServiceOptions) tm := tenancy.NewManager(&queryOpts.Tenancy)