Skip to content

Commit cc2a52d

Browse files
committed
Add support for ES index aliases / rollover to the dependency store and give DependencyStore a params struct like the SpanStore to carry its configuration parameters
Signed-off-by: Christian Rohmann <[email protected]>
1 parent 6929183 commit cc2a52d

File tree

4 files changed

+100
-29
lines changed

4 files changed

+100
-29
lines changed

plugin/storage/es/dependencystore/storage.go

Lines changed: 49 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,36 +31,50 @@ import (
3131
)
3232

3333
const (
34-
dependencyType = "dependencies"
35-
dependencyIndex = "jaeger-dependencies-"
34+
dependencyType = "dependencies"
35+
dependencyIndex = "jaeger-dependencies-"
36+
indexPrefixSeparator = "-"
3637
)
3738

3839
// DependencyStore handles all queries and insertions to ElasticSearch dependencies
3940
type DependencyStore struct {
40-
ctx context.Context
41-
client es.Client
42-
logger *zap.Logger
43-
indexPrefix string
41+
ctx context.Context
42+
client es.Client
43+
logger *zap.Logger
44+
dependencyIndexPrefix string
45+
useReadWriteAliases bool
46+
}
47+
48+
// DSParams holds constructor parameters for NewDependencyStore
49+
type DSParams struct {
50+
Client es.Client
51+
Logger *zap.Logger
52+
IndexPrefix string
53+
UseReadWriteAliases bool
4454
}
4555

4656
// NewDependencyStore returns a DependencyStore
47-
func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string) *DependencyStore {
48-
var prefix string
49-
if indexPrefix != "" {
50-
prefix = indexPrefix + "-"
51-
}
57+
func NewDependencyStore(p DSParams) *DependencyStore {
5258
return &DependencyStore{
53-
ctx: context.Background(),
54-
client: client,
55-
logger: logger,
56-
indexPrefix: prefix + dependencyIndex,
59+
ctx: context.Background(),
60+
client: p.Client,
61+
logger: p.Logger,
62+
dependencyIndexPrefix: prefixIndexName(p.IndexPrefix, dependencyIndex),
63+
useReadWriteAliases: p.UseReadWriteAliases,
5764
}
5865
}
5966

67+
func prefixIndexName(prefix, index string) string {
68+
if prefix != "" {
69+
return prefix + indexPrefixSeparator + index
70+
}
71+
return index
72+
}
73+
6074
// WriteDependencies implements dependencystore.Writer#WriteDependencies.
6175
func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error {
62-
indexName := indexWithDate(s.indexPrefix, ts)
63-
s.writeDependencies(indexName, ts, dependencies)
76+
writeIndexName := s.getWriteIndex(ts)
77+
s.writeDependencies(writeIndexName, ts, dependencies)
6478
return nil
6579
}
6680

@@ -82,7 +96,8 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe
8296

8397
// GetDependencies returns all interservice dependencies
8498
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
85-
indices := getIndices(s.indexPrefix, endTs, lookback)
99+
indices := s.getReadIndices(endTs, lookback)
100+
86101
searchResult, err := s.client.Search(indices...).
87102
Size(10000). // the default elasticsearch allowed limit
88103
Query(buildTSQuery(endTs, lookback)).
@@ -109,18 +124,30 @@ func buildTSQuery(endTs time.Time, lookback time.Duration) elastic.Query {
109124
return elastic.NewRangeQuery("timestamp").Gte(endTs.Add(-lookback)).Lte(endTs)
110125
}
111126

112-
func getIndices(prefix string, ts time.Time, lookback time.Duration) []string {
127+
func (s *DependencyStore) getReadIndices(ts time.Time, lookback time.Duration) []string {
113128
var indices []string
114-
firstIndex := indexWithDate(prefix, ts.Add(-lookback))
115-
currentIndex := indexWithDate(prefix, ts)
129+
130+
if s.useReadWriteAliases {
131+
return append(indices, s.dependencyIndexPrefix+"read")
132+
}
133+
134+
firstIndex := indexWithDate(s.dependencyIndexPrefix, ts.Add(-lookback))
135+
currentIndex := indexWithDate(s.dependencyIndexPrefix, ts)
116136
for currentIndex != firstIndex {
117137
indices = append(indices, currentIndex)
118138
ts = ts.Add(-24 * time.Hour)
119-
currentIndex = indexWithDate(prefix, ts)
139+
currentIndex = indexWithDate(s.dependencyIndexPrefix, ts)
120140
}
121141
return append(indices, firstIndex)
122142
}
123143

144+
func (s *DependencyStore) getWriteIndex(ts time.Time) string {
145+
if s.useReadWriteAliases {
146+
return s.dependencyIndexPrefix + "write"
147+
}
148+
return indexWithDate(s.dependencyIndexPrefix, ts)
149+
}
150+
124151
func indexWithDate(indexNamePrefix string, date time.Time) string {
125152
return indexNamePrefix + date.UTC().Format("2006-01-02")
126153
}

plugin/storage/es/dependencystore/storage_test.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,12 @@ func withDepStorage(indexPrefix string, fn func(r *depStorageTest)) {
4747
client: client,
4848
logger: logger,
4949
logBuffer: logBuffer,
50-
storage: NewDependencyStore(client, logger, indexPrefix),
50+
storage: NewDependencyStore(DSParams{
51+
Client: client,
52+
Logger: logger,
53+
IndexPrefix: indexPrefix,
54+
UseReadWriteAliases: false,
55+
}),
5156
}
5257
fn(r)
5358
}
@@ -66,8 +71,13 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) {
6671
}
6772
for _, testCase := range testCases {
6873
client := &mocks.Client{}
69-
r := NewDependencyStore(client, zap.NewNop(), testCase.prefix)
70-
assert.Equal(t, testCase.expected+dependencyIndex, r.indexPrefix)
74+
r := NewDependencyStore(DSParams{
75+
Client: client,
76+
Logger: zap.NewNop(),
77+
IndexPrefix: testCase.prefix,
78+
UseReadWriteAliases: false,
79+
})
80+
assert.Equal(t, testCase.expected+dependencyIndex, r.dependencyIndexPrefix)
7181
}
7282
}
7383

@@ -222,7 +232,14 @@ func TestGetIndices(t *testing.T) {
222232
},
223233
}
224234
for _, testCase := range testCases {
225-
assert.EqualValues(t, testCase.expected, getIndices(testCase.prefix, fixedTime, testCase.lookback))
235+
client := &mocks.Client{}
236+
r := NewDependencyStore(DSParams{
237+
Client: client,
238+
Logger: zap.NewNop(),
239+
IndexPrefix: testCase.prefix,
240+
UseReadWriteAliases: false,
241+
})
242+
assert.EqualValues(t, testCase.expected, r.getReadIndices(fixedTime, testCase.lookback))
226243
}
227244
}
228245

plugin/storage/es/factory.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,7 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
113113

114114
// CreateDependencyReader implements storage.Factory
115115
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
116-
reader := esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix())
117-
return reader, nil
116+
return createDependencyReader(f.logger, f.primaryClient, f.primaryConfig)
118117
}
119118

120119
func loadTagsFromFile(filePath string) ([]string, error) {
@@ -209,6 +208,29 @@ func createSpanWriter(
209208
return writer, nil
210209
}
211210

211+
func createDependencyReader(
212+
logger *zap.Logger,
213+
client es.Client,
214+
cfg config.ClientBuilder,
215+
) (dependencystore.Reader, error) {
216+
217+
dependenciesMappings := GetDependenciesMappings(cfg.GetNumShards(), cfg.GetNumReplicas(), client.GetVersion())
218+
reader := esDepStore.NewDependencyStore(esDepStore.DSParams{
219+
Client: client,
220+
Logger: logger,
221+
IndexPrefix: cfg.GetIndexPrefix(),
222+
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
223+
})
224+
if cfg.IsCreateIndexTemplates() {
225+
err := reader.CreateTemplates(dependenciesMappings)
226+
if err != nil {
227+
return nil, err
228+
}
229+
}
230+
231+
return reader, nil
232+
}
233+
212234
// GetSpanServiceMappings returns span and service mappings
213235
func GetSpanServiceMappings(shards, replicas int64, esVersion uint) (string, string) {
214236
if esVersion == 7 {

plugin/storage/integration/elasticsearch_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,12 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro
130130
TagDotReplacement: tagKeyDeDotChar,
131131
Archive: archive,
132132
})
133-
dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix)
133+
dependencyStore := dependencystore.NewDependencyStore(dependencystore.DSParams{
134+
Client: client,
135+
Logger: s.logger,
136+
IndexPrefix: indexPrefix,
137+
UseReadWriteAliases: false,
138+
})
134139
depMapping := es.GetDependenciesMappings(5, 1, client.GetVersion())
135140
err = dependencyStore.CreateTemplates(depMapping)
136141
if err != nil {

0 commit comments

Comments
 (0)