Skip to content

Commit

Permalink
[v2][storage] Move span reader decorator to storage factories (#6280)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Towards #6219 

## Description of the changes
- This PR moves the decoration of the span readers with the metrics
factory to inside the storage factory itself rather than handling it a
higher level (ex. all-in-one, query server/extension).
- For v2, the namespacing of the metrics has been moved from the query
extension to the storage extension.
- For v1, the namespacing of the metrics has been moved from the various
binaries to the storage meta-factory.
- This PR contains a few breaking changes as it changes the namespace
under which the following metrics are published:
- Storage specific metrics were that were being pushed under
`jaeger_query` to will now be pushed undder `jaeger_storage`
- Cassandra specific metrics were pushed under `jaeger_cassandra` and
`jaeger_cassandra-archive` will now be pushed under `jaeger_storage`
with the following tags:
    - `kind=cassandra`
    - `role=primary` or `role=archive`
    - `name` with the name of the storage (in jaeger-v2 only)
- ElasticSearch/OpenSearch specific metrics were being pushed under
`jaeger_index_create` will now be pushed under
`jaeger_storage_index_create` with the following tags:
      - `kind=elasticsearch`/kind=`opensearch`
      - `role=primary` or `role=archive`
      - `name` with the name of the storage (in jaeger-v2 only)

## How was this change tested?
- CI
- We've booked #6278 to
implement a framework that will allow for validation of metrics

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
mahadzaryab1 and yurishkuro authored Nov 30, 2024
1 parent e9fac05 commit 557f149
Show file tree
Hide file tree
Showing 14 changed files with 160 additions and 50 deletions.
2 changes: 0 additions & 2 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/metricsstore/metricstoremetrics"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore/spanstoremetrics"
)

// all-in-one/main is a standalone full-stack jaeger backend, backed by a memory store
Expand Down Expand Up @@ -223,7 +222,6 @@ func startQuery(
tm *tenancy.Manager,
telset telemetry.Settings,
) *queryApp.Server {
spanReader = spanstoremetrics.NewReaderDecorator(spanReader, telset.Metrics)
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)

server, err := queryApp.NewServer(context.Background(), qs, metricsQueryService, qOpts, tm, telset)
Expand Down
3 changes: 0 additions & 3 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/jaegertracing/jaeger/plugin/metrics/disabled"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/metricsstore/metricstoremetrics"
"github.com/jaegertracing/jaeger/storage/spanstore/spanstoremetrics"
)

var (
Expand Down Expand Up @@ -85,8 +84,6 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
return fmt.Errorf("cannot create span reader: %w", err)
}

spanReader = spanstoremetrics.NewReaderDecorator(spanReader, telset.Metrics)

depReader, err := f.CreateDependencyReader()
if err != nil {
return fmt.Errorf("cannot create dependencies reader: %w", err)
Expand Down
42 changes: 36 additions & 6 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,24 +117,54 @@ func newStorageExt(config *Config, telset component.TelemetrySettings) *storageE
func (s *storageExt) Start(_ context.Context, host component.Host) error {
telset := telemetry.FromOtelComponent(s.telset, host)
telset.Metrics = telset.Metrics.Namespace(metrics.NSOptions{Name: "jaeger"})
getMetricsFactory := func(name, kind string) metrics.Factory {
return telset.Metrics.Namespace(metrics.NSOptions{
Name: "storage",
Tags: map[string]string{
"name": name,
"kind": kind,
},
})
}
for storageName, cfg := range s.config.TraceBackends {
s.telset.Logger.Sugar().Infof("Initializing storage '%s'", storageName)
var factory storage.Factory
var err error = errors.New("empty configuration")
switch {
case cfg.Memory != nil:
factory, err = memory.NewFactoryWithConfig(*cfg.Memory, telset.Metrics, s.telset.Logger), nil
factory, err = memory.NewFactoryWithConfig(
*cfg.Memory,
getMetricsFactory(storageName, "memory"),
s.telset.Logger,
), nil
case cfg.Badger != nil:
factory, err = badger.NewFactoryWithConfig(*cfg.Badger, telset.Metrics, s.telset.Logger)
factory, err = badger.NewFactoryWithConfig(
*cfg.Badger,
getMetricsFactory(storageName, "badger"),
s.telset.Logger)
case cfg.GRPC != nil:
grpcTelset := telset
grpcTelset.Metrics = getMetricsFactory(storageName, "grpc")
//nolint: contextcheck
factory, err = grpc.NewFactoryWithConfig(*cfg.GRPC, telset)
factory, err = grpc.NewFactoryWithConfig(*cfg.GRPC, grpcTelset)
case cfg.Cassandra != nil:
factory, err = cassandra.NewFactoryWithConfig(*cfg.Cassandra, telset.Metrics, s.telset.Logger)
factory, err = cassandra.NewFactoryWithConfig(
*cfg.Cassandra,
getMetricsFactory(storageName, "cassandra"),
s.telset.Logger,
)
case cfg.Elasticsearch != nil:
factory, err = es.NewFactoryWithConfig(*cfg.Elasticsearch, telset.Metrics, s.telset.Logger)
factory, err = es.NewFactoryWithConfig(
*cfg.Elasticsearch,
getMetricsFactory(storageName, "elasticsearch"),
s.telset.Logger,
)
case cfg.Opensearch != nil:
factory, err = es.NewFactoryWithConfig(*cfg.Opensearch, telset.Metrics, s.telset.Logger)
factory, err = es.NewFactoryWithConfig(
*cfg.Opensearch,
getMetricsFactory(storageName, "opensearch"),
s.telset.Logger,
)
}
if err != nil {
return fmt.Errorf("failed to initialize storage '%s': %w", storageName, err)
Expand Down
2 changes: 0 additions & 2 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/metricsstore/metricstoremetrics"
"github.com/jaegertracing/jaeger/storage/spanstore/spanstoremetrics"
)

func main() {
Expand Down Expand Up @@ -95,7 +94,6 @@ func main() {
if err != nil {
logger.Fatal("Failed to create span reader", zap.Error(err))
}
spanReader = spanstoremetrics.NewReaderDecorator(spanReader, metricsFactory)
dependencyReader, err := storageFactory.CreateDependencyReader()
if err != nil {
logger.Fatal("Failed to create dependency reader", zap.Error(err))
Expand Down
14 changes: 9 additions & 5 deletions plugin/storage/badger/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore/spanstoremetrics"
)

const (
Expand All @@ -50,10 +51,11 @@ var ( // interface comformance checks

// Factory implements storage.Factory for Badger backend.
type Factory struct {
Config *Config
store *badger.DB
cache *badgerStore.CacheStore
logger *zap.Logger
Config *Config
store *badger.DB
cache *badgerStore.CacheStore
logger *zap.Logger
metricsFactory metrics.Factory

tmpDir string
maintenanceDone chan bool
Expand Down Expand Up @@ -115,6 +117,7 @@ func (f *Factory) configure(config *Config) {
// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.logger = logger
f.metricsFactory = metricsFactory

opts := badger.DefaultOptions("")

Expand Down Expand Up @@ -173,7 +176,8 @@ func initializeDir(path string) {

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return badgerStore.NewTraceReader(f.store, f.cache), nil
tr := badgerStore.NewTraceReader(f.store, f.cache)
return spanstoremetrics.NewReaderDecorator(tr, f.metricsFactory), nil
}

// CreateSpanWriter implements storage.Factory
Expand Down
40 changes: 35 additions & 5 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore/spanstoremetrics"
)

const (
Expand All @@ -52,6 +53,7 @@ var ( // interface comformance checks
type Factory struct {
Options *Options

metricsFactory metrics.Factory
primaryMetricsFactory metrics.Factory
archiveMetricsFactory metrics.Factory
logger *zap.Logger
Expand Down Expand Up @@ -138,8 +140,21 @@ func (f *Factory) configureFromOptions(o *Options) {

// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.primaryMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra", Tags: nil})
f.archiveMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra-archive", Tags: nil})
f.metricsFactory = metricsFactory
f.primaryMetricsFactory = metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "primary",
},
},
)
f.archiveMetricsFactory = metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "archive",
},
},
)
f.logger = logger

primarySession, err := f.sessionBuilderFn(&f.primaryConfig)
Expand Down Expand Up @@ -204,7 +219,11 @@ func NewSession(c *config.Configuration) (cassandra.Session, error) {

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
sr, err := cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
if err != nil {
return sr, err
}
return spanstoremetrics.NewReaderDecorator(sr, f.primaryMetricsFactory), nil
}

// CreateSpanWriter implements storage.Factory
Expand All @@ -227,7 +246,11 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if f.archiveSession == nil {
return nil, storage.ErrArchiveStorageNotConfigured
}
return cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
sr, err := cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
if err != nil {
return sr, err
}
return spanstoremetrics.NewReaderDecorator(sr, f.archiveMetricsFactory), nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
Expand Down Expand Up @@ -255,7 +278,14 @@ func (f *Factory) CreateLock() (distributedlock.Lock, error) {

// CreateSamplingStore implements storage.SamplingStoreFactory
func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) {
return cSamplingStore.New(f.primarySession, f.primaryMetricsFactory, f.logger), nil
samplingMetricsFactory := f.metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "sampling",
},
},
)
return cSamplingStore.New(f.primarySession, samplingMetricsFactory, f.logger), nil
}

func writerOptions(opts *Options) ([]cSpanStore.Option, error) {
Expand Down
50 changes: 36 additions & 14 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore/spanstoremetrics"
)

const (
Expand All @@ -52,9 +53,10 @@ var ( // interface comformance checks
type Factory struct {
Options *Options

metricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider
primaryMetricsFactory metrics.Factory
archiveMetricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider

newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)

Expand Down Expand Up @@ -129,7 +131,21 @@ func (f *Factory) configureFromOptions(o *Options) {

// Initialize implements storage.Factory.
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger
f.primaryMetricsFactory = metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "primary",
},
},
)
f.archiveMetricsFactory = metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "archive",
},
},
)
f.logger = logger

primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory)
if err != nil {
Expand Down Expand Up @@ -180,12 +196,16 @@ func (f *Factory) getArchiveClient() es.Client {

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return createSpanReader(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger, f.tracer)
sr, err := createSpanReader(f.getPrimaryClient, f.primaryConfig, false, f.logger, f.tracer)
if err != nil {
return sr, err
}
return spanstoremetrics.NewReaderDecorator(sr, f.primaryMetricsFactory), nil
}

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger)
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.primaryMetricsFactory, f.logger)
}

// CreateDependencyReader implements storage.Factory
Expand All @@ -198,22 +218,25 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanReader(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger, f.tracer)
sr, err := createSpanReader(f.getArchiveClient, f.archiveConfig, true, f.logger, f.tracer)
if err != nil {
return sr, err
}
return spanstoremetrics.NewReaderDecorator(sr, f.archiveMetricsFactory), nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanWriter(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger)
return createSpanWriter(f.getArchiveClient, f.archiveConfig, true, f.archiveMetricsFactory, f.logger)
}

func createSpanReader(
clientFn func() es.Client,
cfg *config.Configuration,
archive bool,
mFactory metrics.Factory,
logger *zap.Logger,
tp trace.TracerProvider,
) (spanstore.Reader, error) {
Expand All @@ -232,7 +255,6 @@ func createSpanReader(
Archive: archive,
RemoteReadClusters: cfg.RemoteReadClusters,
Logger: logger,
MetricsFactory: mFactory,
Tracer: tp.Tracer("esSpanStore.SpanReader"),
}), nil
}
Expand Down Expand Up @@ -352,14 +374,14 @@ func (f *Factory) Close() error {
}

func (f *Factory) onPrimaryPasswordChange() {
f.onClientPasswordChange(f.primaryConfig, &f.primaryClient)
f.onClientPasswordChange(f.primaryConfig, &f.primaryClient, f.primaryMetricsFactory)
}

func (f *Factory) onArchivePasswordChange() {
f.onClientPasswordChange(f.archiveConfig, &f.archiveClient)
f.onClientPasswordChange(f.archiveConfig, &f.archiveClient, f.archiveMetricsFactory)
}

func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atomic.Pointer[es.Client]) {
func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atomic.Pointer[es.Client], mf metrics.Factory) {
newPassword, err := loadTokenFromFile(cfg.Authentication.BasicAuthentication.PasswordFilePath)
if err != nil {
f.logger.Error("failed to reload password for Elasticsearch client", zap.Error(err))
Expand All @@ -370,7 +392,7 @@ func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atom
newCfg.Authentication.BasicAuthentication.Password = newPassword
newCfg.Authentication.BasicAuthentication.PasswordFilePath = "" // avoid error that both are set

newClient, err := f.newClientFn(&newCfg, f.logger, f.metricsFactory)
newClient, err := f.newClientFn(&newCfg, f.logger, mf)
if err != nil {
f.logger.Error("failed to recreate Elasticsearch client with new password", zap.Error(err))
return
Expand Down
2 changes: 0 additions & 2 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/es"
cfg "github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
Expand Down Expand Up @@ -113,7 +112,6 @@ type SpanReaderParams struct {
Archive bool
UseReadWriteAliases bool
RemoteReadClusters []string
MetricsFactory metrics.Factory
Logger *zap.Logger
Tracer trace.Tracer
}
Expand Down
Loading

0 comments on commit 557f149

Please sign in to comment.