Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor] Decouple storage factory creation and initialization #6272

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 12 additions & 18 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
_ "go.uber.org/automaxprocs"
"go.uber.org/zap"

Expand Down Expand Up @@ -55,10 +52,7 @@ func main() {
if os.Getenv(storage.SpanStorageTypeEnvVar) == "" {
os.Setenv(storage.SpanStorageTypeEnvVar, "memory") // other storage types default to SpanStorage
}
storageFactory, err := storage.NewFactory(storage.FactoryConfigFromEnvAndCLI(os.Args, os.Stderr))
if err != nil {
log.Fatalf("Cannot initialize storage factory: %v", err)
}
storageFactory := storage.NewFactory(storage.FactoryConfigFromEnvAndCLI(os.Args, os.Stderr))
samplingStrategyFactoryConfig, err := ss.FactoryConfigFromEnv()
if err != nil {
log.Fatalf("Cannot initialize sampling strategy factory config: %v", err)
Expand Down Expand Up @@ -95,8 +89,15 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to initialize tracer", zap.Error(err))
}

baseTelset := telemetry.Setting{
Logger: svc.Logger,
TracerProvider: tracer.OTEL,
Metrics: baseFactory,
ReportStatus: telemetry.HCAdapter(svc.HC()),
}

storageFactory.InitFromViper(v, logger)
if err := storageFactory.Initialize(baseFactory, logger); err != nil {
if err := storageFactory.Initialize(baseTelset); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
}

Expand Down Expand Up @@ -159,20 +160,13 @@ by default uses only in-memory database.`,
log.Fatal(err)
}

telset := telemetry.Setting{
Logger: svc.Logger,
TracerProvider: tracer.OTEL,
Metrics: queryMetricsFactory,
ReportStatus: telemetry.HCAdapter(svc.HC()),
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return noop.NewMeterProvider()
},
}
// query
qyeryTelset := baseTelset // copy
qyeryTelset.Metrics = queryMetricsFactory
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
spanReader, dependencyReader, metricsQueryService,
tm, telset,
tm, qyeryTelset,
)

svc.RunAndThen(func() {
Expand Down
13 changes: 8 additions & 5 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/internal/status"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/version"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider"
Expand All @@ -36,10 +37,7 @@ const serviceName = "jaeger-collector"
func main() {
svc := cmdFlags.NewService(ports.CollectorAdminHTTP)

storageFactory, err := storage.NewFactory(storage.FactoryConfigFromEnvAndCLI(os.Args, os.Stderr))
if err != nil {
log.Fatalf("Cannot initialize storage factory: %v", err)
}
storageFactory := storage.NewFactory(storage.FactoryConfigFromEnvAndCLI(os.Args, os.Stderr))
samplingStrategyFactoryConfig, err := ss.FactoryConfigFromEnv()
if err != nil {
log.Fatalf("Cannot initialize sampling strategy store factory config: %v", err)
Expand All @@ -63,8 +61,13 @@ func main() {
metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "collector"})
version.NewInfoMetrics(metricsFactory)

baseTelset := telemetry.NoopSettings()
baseTelset.Logger = svc.Logger
baseTelset.Metrics = baseFactory
baseTelset.ReportStatus = telemetry.HCAdapter(svc.HC())

storageFactory.InitFromViper(v, logger)
if err := storageFactory.Initialize(baseFactory, logger); err != nil {
if err := storageFactory.Initialize(baseTelset); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
}
spanWriter, err := storageFactory.CreateSpanWriter()
Expand Down
14 changes: 8 additions & 6 deletions cmd/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package main
import (
"fmt"
"io"
"log"
"os"

"github.com/spf13/cobra"
Expand All @@ -23,6 +22,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/internal/status"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/pkg/version"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
Expand All @@ -31,10 +31,7 @@ import (
func main() {
svc := flags.NewService(ports.IngesterAdminHTTP)

storageFactory, err := storage.NewFactory(storage.FactoryConfigFromEnvAndCLI(os.Args, os.Stderr))
if err != nil {
log.Fatalf("Cannot initialize storage factory: %v", err)
}
storageFactory := storage.NewFactory(storage.FactoryConfigFromEnvAndCLI(os.Args, os.Stderr))

v := viper.New()
command := &cobra.Command{
Expand All @@ -50,8 +47,13 @@ func main() {
metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "ingester"})
version.NewInfoMetrics(metricsFactory)

baseTelset := telemetry.NoopSettings()
baseTelset.Logger = svc.Logger
baseTelset.Metrics = baseFactory
baseTelset.ReportStatus = telemetry.HCAdapter(svc.HC())

storageFactory.InitFromViper(v, logger)
if err := storageFactory.Initialize(baseFactory, logger); err != nil {
if err := storageFactory.Initialize(baseTelset); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
}
spanWriter, err := storageFactory.CreateSpanWriter()
Expand Down
3 changes: 1 addition & 2 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
ReportStatus: func(event *componentstatus.Event) {
componentstatus.ReportStatus(host, event)
},
LeveledMeterProvider: s.telset.LeveledMeterProvider,
Host: host,
Host: host,
}

s.server, err = queryApp.NewServer(
Expand Down
3 changes: 1 addition & 2 deletions cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/internal/grpctest"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
Expand Down Expand Up @@ -62,7 +61,7 @@ func (ff fakeFactory) CreateSpanWriter() (spanstore.Writer, error) {
return &spanstoremocks.Writer{}, nil
}

func (ff fakeFactory) Initialize(metrics.Factory, *zap.Logger) error {
func (ff fakeFactory) Initialize() error {
if ff.name == "need-initialize-error" {
return errors.New("test-error")
}
Expand Down
25 changes: 7 additions & 18 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@
"io"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/otel/metric"

"github.com/jaegertracing/jaeger/internal/metrics/otelmetrics"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/plugin/metrics/prometheus"
Expand Down Expand Up @@ -118,34 +115,26 @@
}

func (s *storageExt) Start(_ context.Context, host component.Host) error {
baseFactory := otelmetrics.NewFactory(s.telset.MeterProvider)
mf := baseFactory.Namespace(metrics.NSOptions{Name: "jaeger"})
telset := telemetry.FromOtelComponent(s.telset, host)
telset.Metrics = telset.Metrics.Namespace(metrics.NSOptions{Name: "jaeger"})

Check warning on line 119 in cmd/jaeger/internal/extension/jaegerstorage/extension.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/extension.go#L118-L119

Added lines #L118 - L119 were not covered by tests
for storageName, cfg := range s.config.TraceBackends {
s.telset.Logger.Sugar().Infof("Initializing storage '%s'", storageName)
var factory storage.Factory
var err error = errors.New("empty configuration")
switch {
case cfg.Memory != nil:
factory, err = memory.NewFactoryWithConfig(*cfg.Memory, mf, s.telset.Logger), nil
factory, err = memory.NewFactoryWithConfig(*cfg.Memory, telset), nil

Check warning on line 126 in cmd/jaeger/internal/extension/jaegerstorage/extension.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/extension.go#L126

Added line #L126 was not covered by tests
case cfg.Badger != nil:
factory, err = badger.NewFactoryWithConfig(*cfg.Badger, mf, s.telset.Logger)
factory, err = badger.NewFactoryWithConfig(*cfg.Badger, telset)

Check warning on line 128 in cmd/jaeger/internal/extension/jaegerstorage/extension.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/extension.go#L128

Added line #L128 was not covered by tests
case cfg.GRPC != nil:
telset := telemetry.Setting{
Logger: s.telset.Logger,
Host: host,
Metrics: mf,
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return s.telset.MeterProvider
},
}
//nolint: contextcheck
factory, err = grpc.NewFactoryWithConfig(*cfg.GRPC, telset)
case cfg.Cassandra != nil:
factory, err = cassandra.NewFactoryWithConfig(*cfg.Cassandra, mf, s.telset.Logger)
factory, err = cassandra.NewFactoryWithConfig(*cfg.Cassandra, telset)

Check warning on line 133 in cmd/jaeger/internal/extension/jaegerstorage/extension.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/extension.go#L133

Added line #L133 was not covered by tests
case cfg.Elasticsearch != nil:
factory, err = es.NewFactoryWithConfig(*cfg.Elasticsearch, mf, s.telset.Logger)
factory, err = es.NewFactoryWithConfig(*cfg.Elasticsearch, telset)

Check warning on line 135 in cmd/jaeger/internal/extension/jaegerstorage/extension.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/extension.go#L135

Added line #L135 was not covered by tests
case cfg.Opensearch != nil:
factory, err = es.NewFactoryWithConfig(*cfg.Opensearch, mf, s.telset.Logger)
factory, err = es.NewFactoryWithConfig(*cfg.Opensearch, telset)

Check warning on line 137 in cmd/jaeger/internal/extension/jaegerstorage/extension.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/extension.go#L137

Added line #L137 was not covered by tests
}
if err != nil {
return fmt.Errorf("failed to initialize storage '%s': %w", storageName, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"go.uber.org/zap"

esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
promCfg "github.com/jaegertracing/jaeger/pkg/prometheus/config"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
Expand All @@ -39,7 +38,7 @@ type errorFactory struct {
closeErr error
}

func (errorFactory) Initialize(metrics.Factory, *zap.Logger) error {
func (errorFactory) Initialize() error {
panic("not implemented")
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*Q
}

// BuildQueryServiceOptions creates a QueryServiceOptions struct with appropriate adjusters and archive config
func (qOpts *QueryOptions) BuildQueryServiceOptions(storageFactory storage.Factory, logger *zap.Logger) *querysvc.QueryServiceOptions {
func (qOpts *QueryOptions) BuildQueryServiceOptions(storageFactory storage.BaseFactory, logger *zap.Logger) *querysvc.QueryServiceOptions {
opts := &querysvc.QueryServiceOptions{}
if !opts.InitArchiveStorage(storageFactory, logger) {
logger.Info("Archive storage not initialized")
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (qs QueryService) GetCapabilities() StorageCapabilities {
}

// InitArchiveStorage tries to initialize archive storage reader/writer if storage factory supports them.
func (opts *QueryServiceOptions) InitArchiveStorage(storageFactory storage.Factory, logger *zap.Logger) bool {
func (opts *QueryServiceOptions) InitArchiveStorage(storageFactory storage.BaseFactory, logger *zap.Logger) bool {
archiveFactory, ok := storageFactory.(storage.ArchiveFactory)
if !ok {
logger.Info("Archive storage not supported by the factory")
Expand Down
3 changes: 1 addition & 2 deletions cmd/query/app/querysvc/query_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/model/adjuster"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
Expand Down Expand Up @@ -308,7 +307,7 @@ type fakeStorageFactory2 struct {
wErr error
}

func (*fakeStorageFactory1) Initialize(metrics.Factory, *zap.Logger) error {
func (*fakeStorageFactory1) Initialize() error {
return nil
}
func (*fakeStorageFactory1) CreateSpanReader() (spanstore.Reader, error) { return nil, nil }
Expand Down
12 changes: 6 additions & 6 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ func createGRPCServer(
ctx,
telset.Host,
component.TelemetrySettings{
Logger: telset.Logger,
TracerProvider: telset.TracerProvider,
LeveledMeterProvider: telset.LeveledMeterProvider,
Logger: telset.Logger,
TracerProvider: telset.TracerProvider,
MeterProvider: telset.MeterProvider,
},
grpcOpts...)
}
Expand Down Expand Up @@ -214,9 +214,9 @@ func createHTTPServer(
ctx,
telset.Host,
component.TelemetrySettings{
Logger: telset.Logger,
TracerProvider: telset.TracerProvider,
LeveledMeterProvider: telset.LeveledMeterProvider,
Logger: telset.Logger,
TracerProvider: telset.TracerProvider,
MeterProvider: telset.MeterProvider,
},
handler,
)
Expand Down
18 changes: 5 additions & 13 deletions cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,10 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.uber.org/zap"
Expand All @@ -50,15 +46,11 @@ import (
var testCertKeyLocation = "../../../pkg/config/tlscfg/testdata"

func initTelSet(logger *zap.Logger, tracerProvider *jtracer.JTracer, hc *healthcheck.HealthCheck) telemetry.Setting {
return telemetry.Setting{
Logger: logger,
TracerProvider: tracerProvider.OTEL,
ReportStatus: telemetry.HCAdapter(hc),
Host: componenttest.NewNopHost(),
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return noop.NewMeterProvider()
},
}
telset := telemetry.NoopSettings()
telset.Logger = logger
telset.TracerProvider = tracerProvider.OTEL
telset.ReportStatus = telemetry.HCAdapter(hc)
return telset
}

func TestServerError(t *testing.T) {
Expand Down
21 changes: 6 additions & 15 deletions cmd/query/app/token_propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,12 @@ import (
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.uber.org/zap/zaptest"

"github.com/jaegertracing/jaeger/cmd/internal/flags"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/pkg/bearertoken"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/storage/es"
Expand Down Expand Up @@ -71,7 +66,11 @@ func runQueryService(t *testing.T, esURL string) *Server {
flagsSvc := flags.NewService(ports.QueryAdminHTTP)
flagsSvc.Logger = zaptest.NewLogger(t)

f := es.NewFactory()
telset := telemetry.NoopSettings()
telset.Logger = flagsSvc.Logger
telset.ReportStatus = telemetry.HCAdapter(flagsSvc.HC())

f := es.NewFactory(telset)
v, command := config.Viperize(f.AddFlags)
require.NoError(t, command.ParseFlags([]string{
"--es.tls.enabled=false",
Expand All @@ -81,21 +80,13 @@ func runQueryService(t *testing.T, esURL string) *Server {
f.InitFromViper(v, flagsSvc.Logger)
// set AllowTokenFromContext manually because we don't register the respective CLI flag from query svc
f.Options.Primary.Authentication.BearerTokenAuthentication.AllowFromContext = true
require.NoError(t, f.Initialize(metrics.NullFactory, flagsSvc.Logger))
require.NoError(t, f.Initialize())
defer f.Close()

spanReader, err := f.CreateSpanReader()
require.NoError(t, err)

querySvc := querysvc.NewQueryService(spanReader, nil, querysvc.QueryServiceOptions{})
telset := telemetry.Setting{
Logger: flagsSvc.Logger,
TracerProvider: jtracer.NoOp().OTEL,
ReportStatus: telemetry.HCAdapter(flagsSvc.HC()),
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return noop.NewMeterProvider()
},
}
server, err := NewServer(context.Background(), querySvc, nil,
&QueryOptions{
BearerTokenPropagation: true,
Expand Down
Loading
Loading