Skip to content

Commit

Permalink
Add support for ES index aliases / rollover to the dependency store a…
Browse files Browse the repository at this point in the history
…nd give DependencyStore a params struct like the SpanStore to carry its configuration parameters

Signed-off-by: Christian Rohmann <[email protected]>
  • Loading branch information
frittentheke committed Jun 11, 2020
1 parent 6929183 commit 1f314b1
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 32 deletions.
71 changes: 49 additions & 22 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,36 +31,50 @@ import (
)

const (
dependencyType = "dependencies"
dependencyIndex = "jaeger-dependencies-"
dependencyType = "dependencies"
dependencyIndex = "jaeger-dependencies-"
indexPrefixSeparator = "-"
)

// DependencyStore handles all queries and insertions to ElasticSearch dependencies
type DependencyStore struct {
ctx context.Context
client es.Client
logger *zap.Logger
indexPrefix string
ctx context.Context
client es.Client
logger *zap.Logger
dependencyIndexPrefix string
useReadWriteAliases bool
}

// DSParams holds constructor parameters for NewDependencyStore
type DSParams struct {
Client es.Client
Logger *zap.Logger
IndexPrefix string
UseReadWriteAliases bool
}

// NewDependencyStore returns a DependencyStore
func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string) *DependencyStore {
var prefix string
if indexPrefix != "" {
prefix = indexPrefix + "-"
}
func NewDependencyStore(p DSParams) *DependencyStore {
return &DependencyStore{
ctx: context.Background(),
client: client,
logger: logger,
indexPrefix: prefix + dependencyIndex,
ctx: context.Background(),
client: p.Client,
logger: p.Logger,
dependencyIndexPrefix: prefixIndexName(p.IndexPrefix, dependencyIndex),
useReadWriteAliases: p.UseReadWriteAliases,
}
}

func prefixIndexName(prefix, index string) string {
if prefix != "" {
return prefix + indexPrefixSeparator + index
}
return index
}

// WriteDependencies implements dependencystore.Writer#WriteDependencies.
func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error {
indexName := indexWithDate(s.indexPrefix, ts)
s.writeDependencies(indexName, ts, dependencies)
writeIndexName := s.getWriteIndex(ts)
s.writeDependencies(writeIndexName, ts, dependencies)
return nil
}

Expand All @@ -82,7 +96,8 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
indices := getIndices(s.indexPrefix, endTs, lookback)
indices := s.getReadIndices(endTs, lookback)

searchResult, err := s.client.Search(indices...).
Size(10000). // the default elasticsearch allowed limit
Query(buildTSQuery(endTs, lookback)).
Expand All @@ -109,18 +124,30 @@ func buildTSQuery(endTs time.Time, lookback time.Duration) elastic.Query {
return elastic.NewRangeQuery("timestamp").Gte(endTs.Add(-lookback)).Lte(endTs)
}

func getIndices(prefix string, ts time.Time, lookback time.Duration) []string {
func (s *DependencyStore) getReadIndices(ts time.Time, lookback time.Duration) []string {
var indices []string
firstIndex := indexWithDate(prefix, ts.Add(-lookback))
currentIndex := indexWithDate(prefix, ts)

if s.useReadWriteAliases {
return append(indices, s.dependencyIndexPrefix+"read")
}

firstIndex := indexWithDate(s.dependencyIndexPrefix, ts.Add(-lookback))
currentIndex := indexWithDate(s.dependencyIndexPrefix, ts)
for currentIndex != firstIndex {
indices = append(indices, currentIndex)
ts = ts.Add(-24 * time.Hour)
currentIndex = indexWithDate(prefix, ts)
currentIndex = indexWithDate(s.dependencyIndexPrefix, ts)
}
return append(indices, firstIndex)
}

func (s *DependencyStore) getWriteIndex(ts time.Time) string {
if s.useReadWriteAliases {
return s.dependencyIndexPrefix + "write"
}
return indexWithDate(s.dependencyIndexPrefix, ts)
}

func indexWithDate(indexNamePrefix string, date time.Time) string {
return indexNamePrefix + date.UTC().Format("2006-01-02")
}
31 changes: 24 additions & 7 deletions plugin/storage/es/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ func withDepStorage(indexPrefix string, fn func(r *depStorageTest)) {
client: client,
logger: logger,
logBuffer: logBuffer,
storage: NewDependencyStore(client, logger, indexPrefix),
storage: NewDependencyStore(DSParams{
Client: client,
Logger: logger,
IndexPrefix: indexPrefix,
UseReadWriteAliases: false,
}),
}
fn(r)
}
Expand All @@ -60,14 +65,19 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) {
prefix string
expected string
}{
{prefix: "", expected: ""},
{prefix: "foo", expected: "foo-"},
{prefix: ":", expected: ":-"},
{prefix: "", expected: "jaeger-dependencies-"},
{prefix: "foo", expected: "foo-jaeger-dependencies-"},
{prefix: ":", expected: ":-jaeger-dependencies-"},
}
for _, testCase := range testCases {
client := &mocks.Client{}
r := NewDependencyStore(client, zap.NewNop(), testCase.prefix)
assert.Equal(t, testCase.expected+dependencyIndex, r.indexPrefix)
r := NewDependencyStore(DSParams{
Client: client,
Logger: zap.NewNop(),
IndexPrefix: testCase.prefix,
UseReadWriteAliases: false,
})
assert.Equal(t, testCase.expected+dependencyIndex, r.dependencyIndexPrefix)
}
}

Expand Down Expand Up @@ -222,7 +232,14 @@ func TestGetIndices(t *testing.T) {
},
}
for _, testCase := range testCases {
assert.EqualValues(t, testCase.expected, getIndices(testCase.prefix, fixedTime, testCase.lookback))
client := &mocks.Client{}
r := NewDependencyStore(DSParams{
Client: client,
Logger: zap.NewNop(),
IndexPrefix: testCase.prefix,
UseReadWriteAliases: false,
})
assert.EqualValues(t, testCase.expected, r.getReadIndices(fixedTime, testCase.lookback))
}
}

Expand Down
26 changes: 24 additions & 2 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,7 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
reader := esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix())
return reader, nil
return createDependencyReader(f.logger, f.primaryClient, f.primaryConfig)
}

func loadTagsFromFile(filePath string) ([]string, error) {
Expand Down Expand Up @@ -209,6 +208,29 @@ func createSpanWriter(
return writer, nil
}

func createDependencyReader(
logger *zap.Logger,
client es.Client,
cfg config.ClientBuilder,
) (dependencystore.Reader, error) {

dependenciesMappings := GetDependenciesMappings(cfg.GetNumShards(), cfg.GetNumReplicas(), client.GetVersion())
reader := esDepStore.NewDependencyStore(esDepStore.DSParams{
Client: client,
Logger: logger,
IndexPrefix: cfg.GetIndexPrefix(),
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
})
if cfg.IsCreateIndexTemplates() {
err := reader.CreateTemplates(dependenciesMappings)
if err != nil {
return nil, err
}
}

return reader, nil
}

// GetSpanServiceMappings returns span and service mappings
func GetSpanServiceMappings(shards, replicas int64, esVersion uint) (string, string) {
if esVersion == 7 {
Expand Down
7 changes: 6 additions & 1 deletion plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,12 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro
TagDotReplacement: tagKeyDeDotChar,
Archive: archive,
})
dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix)
dependencyStore := dependencystore.NewDependencyStore(dependencystore.DSParams{
Client: client,
Logger: s.logger,
IndexPrefix: indexPrefix,
UseReadWriteAliases: false,
})
depMapping := es.GetDependenciesMappings(5, 1, client.GetVersion())
err = dependencyStore.CreateTemplates(depMapping)
if err != nil {
Expand Down

0 comments on commit 1f314b1

Please sign in to comment.