Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2][storage] Implement read path for v2 storage interface #6170

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
3 changes: 3 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,6 @@ packages:
github.com/jaegertracing/jaeger/storage/spanstore:
config:
all: true
github.com/jaegertracing/jaeger/storage_v2/spanstore:
config:
all: true
12 changes: 7 additions & 5 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ import (
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/metricsstore/metricstoremetrics"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
"github.com/jaegertracing/jaeger/storage_v2/spanstore"
)

// all-in-one/main is a standalone full-stack jaeger backend, backed by a memory store
Expand Down Expand Up @@ -105,7 +106,8 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to init storage factory", zap.Error(err))
}

spanReader, err := storageFactory.CreateSpanReader()
v2Factory := factoryadapter.NewFactory(storageFactory)
traceReader, err := v2Factory.CreateTraceReader()
if err != nil {
logger.Fatal("Failed to create span reader", zap.Error(err))
}
Expand Down Expand Up @@ -169,7 +171,7 @@ by default uses only in-memory database.`,
queryTelset.Metrics = queryMetricsFactory
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
spanReader, dependencyReader, metricsQueryService,
traceReader, dependencyReader, metricsQueryService,
tm, queryTelset,
)

Expand Down Expand Up @@ -216,13 +218,13 @@ func startQuery(
svc *flags.Service,
qOpts *queryApp.QueryOptions,
queryOpts *querysvc.QueryServiceOptions,
spanReader spanstore.Reader,
traceReader spanstore.Reader,
depReader dependencystore.Reader,
metricsQueryService querysvc.MetricsQueryService,
tm *tenancy.Manager,
telset telemetry.Settings,
) *queryApp.Server {
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
qs := querysvc.NewQueryService(traceReader, depReader, *queryOpts)

server, err := queryApp.NewServer(context.Background(), qs, metricsQueryService, qOpts, tm, telset)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion cmd/anonymizer/app/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

var (
Expand Down Expand Up @@ -55,11 +56,12 @@ type testServer struct {

func newTestServer(t *testing.T) *testServer {
spanReader := &spanstoremocks.Reader{}
traceReader := factoryadapter.NewTraceReader(spanReader)
metricsReader, err := disabled.NewMetricsReader()
require.NoError(t, err)

q := querysvc.NewQueryService(
spanReader,
traceReader,
&dependencyStoreMocks.Reader{},
querysvc.QueryServiceOptions{},
)
Expand Down
16 changes: 10 additions & 6 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,21 @@
Namespace(metrics.NSOptions{Name: "jaeger"}).
Namespace(metrics.NSOptions{Name: "query"})

f, err := jaegerstorage.GetStorageFactory(s.config.Storage.TracesPrimary, host)
v1Factory, err := jaegerstorage.GetStorageFactory(s.config.Storage.TracesPrimary, host)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment why v1 is still needed (because of dependencies)

if err != nil {
return fmt.Errorf("cannot find primary storage %s: %w", s.config.Storage.TracesPrimary, err)
return fmt.Errorf("cannot find primary storage from v1 factory %s: %w", s.config.Storage.TracesPrimary, err)
}
f, err := jaegerstorage.GetStorageFactoryV2(s.config.Storage.TracesPrimary, host)
if err != nil {
return fmt.Errorf("cannot find primary storage from v2 factory %s: %w", s.config.Storage.TracesPrimary, err)

Check warning on line 83 in cmd/jaeger/internal/extension/jaegerquery/server.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerquery/server.go#L83

Added line #L83 was not covered by tests
}

spanReader, err := f.CreateSpanReader()
traceReader, err := f.CreateTraceReader()
if err != nil {
return fmt.Errorf("cannot create span reader: %w", err)
return fmt.Errorf("cannot create trace reader: %w", err)
}

depReader, err := f.CreateDependencyReader()
depReader, err := v1Factory.CreateDependencyReader()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to create storage_v2/depstore/ before continuing, so that we don't have to keep this bifurcation?

if err != nil {
return fmt.Errorf("cannot create dependencies reader: %w", err)
}
Expand All @@ -93,7 +97,7 @@
if err := s.addArchiveStorage(&opts, host); err != nil {
return err
}
qs := querysvc.NewQueryService(spanReader, depReader, opts)
qs := querysvc.NewQueryService(traceReader, depReader, opts)

mqs, err := s.createMetricReader(host)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestServerStart(t *testing.T) {
TracesPrimary: "need-span-reader-error",
},
},
expectedErr: "cannot create span reader",
expectedErr: "cannot create trace reader",
},
{
name: "dependency error",
Expand Down
3 changes: 2 additions & 1 deletion cmd/query/app/apiv3/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

var (
Expand Down Expand Up @@ -57,7 +58,7 @@ func newTestServerClient(t *testing.T) *testServerClient {
}

q := querysvc.NewQueryService(
tsc.reader,
factoryadapter.NewTraceReader(tsc.reader),
&dependencyStoreMocks.Reader{},
querysvc.QueryServiceOptions{},
)
Expand Down
3 changes: 2 additions & 1 deletion cmd/query/app/apiv3/http_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

func setupHTTPGatewayNoServer(
Expand All @@ -35,7 +36,7 @@ func setupHTTPGatewayNoServer(
reader: &spanstoremocks.Reader{},
}

q := querysvc.NewQueryService(gw.reader,
q := querysvc.NewQueryService(factoryadapter.NewTraceReader(gw.reader),
&dependencyStoreMocks.Reader{},
querysvc.QueryServiceOptions{},
)
Expand Down
5 changes: 3 additions & 2 deletions cmd/query/app/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
metricsmocks "github.com/jaegertracing/jaeger/storage/metricsstore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

var (
Expand Down Expand Up @@ -901,7 +902,7 @@ func initializeTenantedTestServerGRPCWithOptions(t *testing.T, tm *tenancy.Manag
require.NoError(t, err)

q := querysvc.NewQueryService(
spanReader,
factoryadapter.NewTraceReader(spanReader),
dependencyReader,
querysvc.QueryServiceOptions{
ArchiveSpanReader: archiveSpanReader,
Expand Down Expand Up @@ -1165,7 +1166,7 @@ func TestNewGRPCHandlerWithEmptyOptions(t *testing.T) {
require.NoError(t, err)

q := querysvc.NewQueryService(
&spanstoremocks.Reader{},
factoryadapter.NewTraceReader(&spanstoremocks.Reader{}),
&depsmocks.Reader{},
querysvc.QueryServiceOptions{})

Expand Down
7 changes: 5 additions & 2 deletions cmd/query/app/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
metricsmocks "github.com/jaegertracing/jaeger/storage/metricsstore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

const millisToNanosMultiplier = int64(time.Millisecond / time.Nanosecond)
Expand Down Expand Up @@ -119,7 +120,8 @@ func initializeTestServerWithOptions(
options = append(options, HandlerOptions.Logger(zaptest.NewLogger(t)))
readStorage := &spanstoremocks.Reader{}
dependencyStorage := &depsmocks.Reader{}
qs := querysvc.NewQueryService(readStorage, dependencyStorage, queryOptions)
traceReader := factoryadapter.NewTraceReader(readStorage)
qs := querysvc.NewQueryService(traceReader, dependencyStorage, queryOptions)
r := NewRouter()
apiHandler := NewAPIHandler(qs, options...)
apiHandler.RegisterRoutes(r)
Expand Down Expand Up @@ -198,8 +200,9 @@ func TestLogOnServerError(t *testing.T) {
zapCore, logs := observer.New(zap.InfoLevel)
logger := zap.New(zapCore)
readStorage := &spanstoremocks.Reader{}
traceReader := factoryadapter.NewTraceReader(readStorage)
dependencyStorage := &depsmocks.Reader{}
qs := querysvc.NewQueryService(readStorage, dependencyStorage, querysvc.QueryServiceOptions{})
qs := querysvc.NewQueryService(traceReader, dependencyStorage, querysvc.QueryServiceOptions{})
h := NewAPIHandler(qs, HandlerOptions.Logger(logger))
e := errors.New("test error")
h.handleError(&httptest.ResponseRecorder{}, e, http.StatusInternalServerError)
Expand Down
44 changes: 31 additions & 13 deletions cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"github.com/jaegertracing/jaeger/model/adjuster"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstore_v1 "github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
"github.com/jaegertracing/jaeger/storage_v2/spanstore"
)

var errNoArchiveSpanStorage = errors.New("archive span storage was not configured")
Expand All @@ -25,8 +27,8 @@ const (

// QueryServiceOptions has optional members of QueryService
type QueryServiceOptions struct {
ArchiveSpanReader spanstore.Reader
ArchiveSpanWriter spanstore.Writer
ArchiveSpanReader spanstore_v1.Reader
ArchiveSpanWriter spanstore_v1.Writer
Adjuster adjuster.Adjuster
}

Expand All @@ -40,15 +42,15 @@ type StorageCapabilities struct {

// QueryService contains span utils required by the query-service.
type QueryService struct {
spanReader spanstore.Reader
traceReader spanstore.Reader
dependencyReader dependencystore.Reader
options QueryServiceOptions
}

// NewQueryService returns a new QueryService.
func NewQueryService(spanReader spanstore.Reader, dependencyReader dependencystore.Reader, options QueryServiceOptions) *QueryService {
func NewQueryService(traceReader spanstore.Reader, dependencyReader dependencystore.Reader, options QueryServiceOptions) *QueryService {
qsvc := &QueryService{
spanReader: spanReader,
traceReader: traceReader,
dependencyReader: dependencyReader,
options: options,
}
Expand All @@ -61,7 +63,11 @@ func NewQueryService(spanReader spanstore.Reader, dependencyReader dependencysto

// GetTrace is the queryService implementation of spanstore.Reader.GetTrace
func (qs QueryService) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
trace, err := qs.spanReader.GetTrace(ctx, traceID)
spanReader, err := factoryadapter.GetV1Reader(qs.traceReader)
if err != nil {
return nil, err
}
trace, err := spanReader.GetTrace(ctx, traceID)
if errors.Is(err, spanstore.ErrTraceNotFound) {
if qs.options.ArchiveSpanReader == nil {
return nil, err
Expand All @@ -73,20 +79,32 @@ func (qs QueryService) GetTrace(ctx context.Context, traceID model.TraceID) (*mo

// GetServices is the queryService implementation of spanstore.Reader.GetServices
func (qs QueryService) GetServices(ctx context.Context) ([]string, error) {
return qs.spanReader.GetServices(ctx)
spanReader, err := factoryadapter.GetV1Reader(qs.traceReader)
if err != nil {
return nil, err
}
return spanReader.GetServices(ctx)
}

// GetOperations is the queryService implementation of spanstore.Reader.GetOperations
func (qs QueryService) GetOperations(
ctx context.Context,
query spanstore.OperationQueryParameters,
) ([]spanstore.Operation, error) {
return qs.spanReader.GetOperations(ctx, query)
query spanstore_v1.OperationQueryParameters,
) ([]spanstore_v1.Operation, error) {
spanReader, err := factoryadapter.GetV1Reader(qs.traceReader)
if err != nil {
return nil, err
}
return spanReader.GetOperations(ctx, query)
}

// FindTraces is the queryService implementation of spanstore.Reader.FindTraces
func (qs QueryService) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
return qs.spanReader.FindTraces(ctx, query)
func (qs QueryService) FindTraces(ctx context.Context, query *spanstore_v1.TraceQueryParameters) ([]*model.Trace, error) {
spanReader, err := factoryadapter.GetV1Reader(qs.traceReader)
if err != nil {
return nil, err
}
return spanReader.FindTraces(ctx, query)
}

// ArchiveTrace is the queryService utility to archive traces.
Expand Down
Loading
Loading