diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index a2a04aa98da..840466554f2 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -31,36 +31,50 @@ import ( ) const ( - dependencyType = "dependencies" - dependencyIndex = "jaeger-dependencies-" + dependencyType = "dependencies" + dependencyIndex = "jaeger-dependencies-" + indexPrefixSeparator = "-" ) // DependencyStore handles all queries and insertions to ElasticSearch dependencies type DependencyStore struct { - ctx context.Context - client es.Client - logger *zap.Logger - indexPrefix string + ctx context.Context + client es.Client + logger *zap.Logger + dependencyIndexPrefix string + useReadWriteAliases bool +} + +// DSParams holds constructor parameters for NewDependencyStore +type DSParams struct { + Client es.Client + Logger *zap.Logger + IndexPrefix string + UseReadWriteAliases bool } // NewDependencyStore returns a DependencyStore -func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string) *DependencyStore { - var prefix string - if indexPrefix != "" { - prefix = indexPrefix + "-" - } +func NewDependencyStore(p DSParams) *DependencyStore { return &DependencyStore{ - ctx: context.Background(), - client: client, - logger: logger, - indexPrefix: prefix + dependencyIndex, + ctx: context.Background(), + client: p.Client, + logger: p.Logger, + dependencyIndexPrefix: prefixIndexName(p.IndexPrefix, dependencyIndex), + useReadWriteAliases: p.UseReadWriteAliases, } } +func prefixIndexName(prefix, index string) string { + if prefix != "" { + return prefix + indexPrefixSeparator + index + } + return index +} + // WriteDependencies implements dependencystore.Writer#WriteDependencies. func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error { - indexName := indexWithDate(s.indexPrefix, ts) - s.writeDependencies(indexName, ts, dependencies) + writeIndexName := s.getWriteIndex(ts) + s.writeDependencies(writeIndexName, ts, dependencies) return nil } @@ -82,7 +96,8 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe // GetDependencies returns all interservice dependencies func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { - indices := getIndices(s.indexPrefix, endTs, lookback) + indices := s.getReadIndices(endTs, lookback) + searchResult, err := s.client.Search(indices...). Size(10000). // the default elasticsearch allowed limit Query(buildTSQuery(endTs, lookback)). @@ -109,18 +124,30 @@ func buildTSQuery(endTs time.Time, lookback time.Duration) elastic.Query { return elastic.NewRangeQuery("timestamp").Gte(endTs.Add(-lookback)).Lte(endTs) } -func getIndices(prefix string, ts time.Time, lookback time.Duration) []string { +func (s *DependencyStore) getReadIndices(ts time.Time, lookback time.Duration) []string { var indices []string - firstIndex := indexWithDate(prefix, ts.Add(-lookback)) - currentIndex := indexWithDate(prefix, ts) + + if s.useReadWriteAliases { + return append(indices, s.dependencyIndexPrefix+"read") + } + + firstIndex := indexWithDate(s.dependencyIndexPrefix, ts.Add(-lookback)) + currentIndex := indexWithDate(s.dependencyIndexPrefix, ts) for currentIndex != firstIndex { indices = append(indices, currentIndex) ts = ts.Add(-24 * time.Hour) - currentIndex = indexWithDate(prefix, ts) + currentIndex = indexWithDate(s.dependencyIndexPrefix, ts) } return append(indices, firstIndex) } +func (s *DependencyStore) getWriteIndex(ts time.Time) string { + if s.useReadWriteAliases { + return s.dependencyIndexPrefix + "write" + } + return indexWithDate(s.dependencyIndexPrefix, ts) +} + func indexWithDate(indexNamePrefix string, date time.Time) string { return indexNamePrefix + date.UTC().Format("2006-01-02") } diff --git a/plugin/storage/es/dependencystore/storage_test.go b/plugin/storage/es/dependencystore/storage_test.go index 1a001cbd802..be585080390 100644 --- a/plugin/storage/es/dependencystore/storage_test.go +++ b/plugin/storage/es/dependencystore/storage_test.go @@ -47,7 +47,12 @@ func withDepStorage(indexPrefix string, fn func(r *depStorageTest)) { client: client, logger: logger, logBuffer: logBuffer, - storage: NewDependencyStore(client, logger, indexPrefix), + storage: NewDependencyStore(DSParams{ + Client: client, + Logger: logger, + IndexPrefix: indexPrefix, + UseReadWriteAliases: false, + }), } fn(r) } @@ -60,14 +65,19 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) { prefix string expected string }{ - {prefix: "", expected: ""}, - {prefix: "foo", expected: "foo-"}, - {prefix: ":", expected: ":-"}, + {prefix: "", expected: "jaeger-dependencies-"}, + {prefix: "foo", expected: "foo-jaeger-dependencies-"}, + {prefix: ":", expected: ":-jaeger-dependencies-"}, } for _, testCase := range testCases { client := &mocks.Client{} - r := NewDependencyStore(client, zap.NewNop(), testCase.prefix) - assert.Equal(t, testCase.expected+dependencyIndex, r.indexPrefix) + r := NewDependencyStore(DSParams{ + Client: client, + Logger: zap.NewNop(), + IndexPrefix: testCase.prefix, + UseReadWriteAliases: false, + }) + assert.Equal(t, testCase.expected+dependencyIndex, r.dependencyIndexPrefix) } } @@ -222,7 +232,14 @@ func TestGetIndices(t *testing.T) { }, } for _, testCase := range testCases { - assert.EqualValues(t, testCase.expected, getIndices(testCase.prefix, fixedTime, testCase.lookback)) + client := &mocks.Client{} + r := NewDependencyStore(DSParams{ + Client: client, + Logger: zap.NewNop(), + IndexPrefix: testCase.prefix, + UseReadWriteAliases: false, + }) + assert.EqualValues(t, testCase.expected, r.getReadIndices(fixedTime, testCase.lookback)) } } diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 232697a41e9..de4076cedfc 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -113,8 +113,7 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - reader := esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix()) - return reader, nil + return createDependencyReader(f.logger, f.primaryClient, f.primaryConfig) } func loadTagsFromFile(filePath string) ([]string, error) { @@ -209,6 +208,29 @@ func createSpanWriter( return writer, nil } +func createDependencyReader( + logger *zap.Logger, + client es.Client, + cfg config.ClientBuilder, +) (dependencystore.Reader, error) { + + dependenciesMappings := GetDependenciesMappings(cfg.GetNumShards(), cfg.GetNumReplicas(), client.GetVersion()) + reader := esDepStore.NewDependencyStore(esDepStore.DSParams{ + Client: client, + Logger: logger, + IndexPrefix: cfg.GetIndexPrefix(), + UseReadWriteAliases: cfg.GetUseReadWriteAliases(), + }) + if cfg.IsCreateIndexTemplates() { + err := reader.CreateTemplates(dependenciesMappings) + if err != nil { + return nil, err + } + } + + return reader, nil +} + // GetSpanServiceMappings returns span and service mappings func GetSpanServiceMappings(shards, replicas int64, esVersion uint) (string, string) { if esVersion == 7 { diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index d899577f4b8..002da11ccfe 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -130,7 +130,12 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro TagDotReplacement: tagKeyDeDotChar, Archive: archive, }) - dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix) + dependencyStore := dependencystore.NewDependencyStore(dependencystore.DSParams{ + Client: client, + Logger: s.logger, + IndexPrefix: indexPrefix, + UseReadWriteAliases: false, + }) depMapping := es.GetDependenciesMappings(5, 1, client.GetVersion()) err = dependencyStore.CreateTemplates(depMapping) if err != nil {