Skip to content

Commit

Permalink
Add support for ES index aliases / rollover to the dependency store (R…
Browse files Browse the repository at this point in the history
…esolves #2143) (#2144)

* Add support for ES index aliases / rollover to the dependency store

 * Give DependencyStore a params struct like the SpanStore to carry its configuration parameters
 * Adapt and extend the tests accordingly

Signed-off-by: Christian Rohmann <[email protected]>

* Extend es-rollover and es-index-cleaner to support rolling dependencies indices

Signed-off-by: Christian Rohmann <[email protected]>

Co-authored-by: Christian Rohmann <[email protected]>
Co-authored-by: Albert <[email protected]>
  • Loading branch information
3 people authored Feb 3, 2022
1 parent 95bc026 commit 18e9d5b
Show file tree
Hide file tree
Showing 12 changed files with 193 additions and 53 deletions.
7 changes: 5 additions & 2 deletions cmd/es-index-cleaner/app/index_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (i *IndexFilter) filter(indices []client.Index) []client.Index {
// archive works only for rollover
reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-span-archive-\\d{6}", i.IndexPrefix))
} else if i.Rollover {
reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-(span|service)-\\d{6}", i.IndexPrefix))
reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-(span|service|dependencies)-\\d{6}", i.IndexPrefix))
} else {
reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-(span|service|dependencies)-\\d{4}%s\\d{2}%s\\d{2}", i.IndexPrefix, i.IndexDateSeparator, i.IndexDateSeparator))
}
Expand All @@ -58,7 +58,10 @@ func (i *IndexFilter) filter(indices []client.Index) []client.Index {
for _, in := range indices {
if reg.MatchString(in.Index) {
// index in write alias cannot be removed
if in.Aliases[i.IndexPrefix+"jaeger-span-write"] || in.Aliases[i.IndexPrefix+"jaeger-service-write"] || in.Aliases[i.IndexPrefix+"jaeger-span-archive-write"] {
if in.Aliases[i.IndexPrefix+"jaeger-span-write"] ||
in.Aliases[i.IndexPrefix+"jaeger-service-write"] ||
in.Aliases[i.IndexPrefix+"jaeger-span-archive-write"] ||
in.Aliases[i.IndexPrefix+"jaeger-dependencies-write"] {
continue
}
filtered = append(filtered, in)
Expand Down
5 changes: 5 additions & 0 deletions cmd/es-rollover/app/index_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ func RolloverIndices(archive bool, prefix string) []IndexOption {
Mapping: "jaeger-service",
indexType: "jaeger-service",
},
{
prefix: prefix,
Mapping: "jaeger-dependencies",
indexType: "jaeger-dependencies",
},
}
}

Expand Down
14 changes: 14 additions & 0 deletions cmd/es-rollover/app/index_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ func TestRolloverIndices(t *testing.T) {
writeAliasName: "jaeger-service-write",
initialRolloverIndex: "jaeger-service-000001",
},
{
templateName: "jaeger-dependencies",
mapping: "jaeger-dependencies",
readAliasName: "jaeger-dependencies-read",
writeAliasName: "jaeger-dependencies-write",
initialRolloverIndex: "jaeger-dependencies-000001",
},
},
},
{
Expand Down Expand Up @@ -99,6 +106,13 @@ func TestRolloverIndices(t *testing.T) {
writeAliasName: "mytenant-jaeger-service-write",
initialRolloverIndex: "mytenant-jaeger-service-000001",
},
{
mapping: "jaeger-dependencies",
templateName: "mytenant-jaeger-dependencies",
readAliasName: "mytenant-jaeger-dependencies-read",
writeAliasName: "mytenant-jaeger-dependencies-write",
initialRolloverIndex: "mytenant-jaeger-dependencies-000001",
},
},
},
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/esmapping-generator/app/renderer/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
)

var supportedMappings = map[string]struct{}{
"jaeger-span": {},
"jaeger-service": {},
"jaeger-span": {},
"jaeger-service": {},
"jaeger-dependencies": {},
}

// GetMappingAsString returns rendered index templates as string
Expand Down
75 changes: 50 additions & 25 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"
"time"

"github.com/olivere/elastic"
Expand All @@ -32,38 +31,54 @@ import (
)

const (
dependencyType = "dependencies"
dependencyIndex = "jaeger-dependencies-"
dependencyType = "dependencies"
dependencyIndex = "jaeger-dependencies-"
indexPrefixSeparator = "-"
)

// DependencyStore handles all queries and insertions to ElasticSearch dependencies
type DependencyStore struct {
client es.Client
logger *zap.Logger
indexPrefix string
indexDateLayout string
maxDocCount int
client es.Client
logger *zap.Logger
dependencyIndexPrefix string
indexDateLayout string
maxDocCount int
useReadWriteAliases bool
}

// DependencyStoreParams holds constructor parameters for NewDependencyStore
type DependencyStoreParams struct {
Client es.Client
Logger *zap.Logger
IndexPrefix string
IndexDateLayout string
MaxDocCount int
UseReadWriteAliases bool
}

// NewDependencyStore returns a DependencyStore
func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix, indexDateLayout string, maxDocCount int) *DependencyStore {
var prefix string
if indexPrefix != "" && !strings.HasSuffix(indexPrefix, "-") {
prefix = indexPrefix + "-"
}
func NewDependencyStore(p DependencyStoreParams) *DependencyStore {
return &DependencyStore{
client: client,
logger: logger,
indexPrefix: prefix + dependencyIndex,
indexDateLayout: indexDateLayout,
maxDocCount: maxDocCount,
client: p.Client,
logger: p.Logger,
dependencyIndexPrefix: prefixIndexName(p.IndexPrefix, dependencyIndex),
indexDateLayout: p.IndexDateLayout,
maxDocCount: p.MaxDocCount,
useReadWriteAliases: p.UseReadWriteAliases,
}
}

func prefixIndexName(prefix, index string) string {
if prefix != "" {
return prefix + indexPrefixSeparator + index
}
return index
}

// WriteDependencies implements dependencystore.Writer#WriteDependencies.
func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error {
indexName := indexWithDate(s.indexPrefix, s.indexDateLayout, ts)
s.writeDependencies(indexName, ts, dependencies)
writeIndexName := s.getWriteIndex(ts)
s.writeDependencies(writeIndexName, ts, dependencies)
return nil
}

Expand All @@ -85,7 +100,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
indices := getIndices(s.indexPrefix, s.indexDateLayout, endTs, lookback)
indices := s.getReadIndices(endTs, lookback)
searchResult, err := s.client.Search(indices...).
Size(s.maxDocCount).
Query(buildTSQuery(endTs, lookback)).
Expand All @@ -112,18 +127,28 @@ func buildTSQuery(endTs time.Time, lookback time.Duration) elastic.Query {
return elastic.NewRangeQuery("timestamp").Gte(endTs.Add(-lookback)).Lte(endTs)
}

func getIndices(prefix, dateLayout string, ts time.Time, lookback time.Duration) []string {
func (s *DependencyStore) getReadIndices(ts time.Time, lookback time.Duration) []string {
if s.useReadWriteAliases {
return []string{s.dependencyIndexPrefix + "read"}
}
var indices []string
firstIndex := indexWithDate(prefix, dateLayout, ts.Add(-lookback))
currentIndex := indexWithDate(prefix, dateLayout, ts)
firstIndex := indexWithDate(s.dependencyIndexPrefix, s.indexDateLayout, ts.Add(-lookback))
currentIndex := indexWithDate(s.dependencyIndexPrefix, s.indexDateLayout, ts)
for currentIndex != firstIndex {
indices = append(indices, currentIndex)
ts = ts.Add(-24 * time.Hour)
currentIndex = indexWithDate(prefix, dateLayout, ts)
currentIndex = indexWithDate(s.dependencyIndexPrefix, s.indexDateLayout, ts)
}
return append(indices, firstIndex)
}

func indexWithDate(indexNamePrefix, indexDateLayout string, date time.Time) string {
return indexNamePrefix + date.UTC().Format(indexDateLayout)
}

func (s *DependencyStore) getWriteIndex(ts time.Time) string {
if s.useReadWriteAliases {
return s.dependencyIndexPrefix + "write"
}
return indexWithDate(s.dependencyIndexPrefix, s.indexDateLayout, ts)
}
83 changes: 68 additions & 15 deletions plugin/storage/es/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ func withDepStorage(indexPrefix, indexDateLayout string, maxDocCount int, fn fun
client: client,
logger: logger,
logBuffer: logBuffer,
storage: NewDependencyStore(client, logger, indexPrefix, indexDateLayout, maxDocCount),
storage: NewDependencyStore(DependencyStoreParams{
Client: client,
Logger: logger,
IndexPrefix: indexPrefix,
IndexDateLayout: indexDateLayout,
MaxDocCount: maxDocCount,
}),
}
fn(r)
}
Expand All @@ -69,8 +75,15 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) {
}
for _, testCase := range testCases {
client := &mocks.Client{}
r := NewDependencyStore(client, zap.NewNop(), testCase.prefix, "2006-01-02", defaultMaxDocCount)
assert.Equal(t, testCase.expected+dependencyIndex, r.indexPrefix)
r := NewDependencyStore(DependencyStoreParams{
Client: client,
Logger: zap.NewNop(),
IndexPrefix: testCase.prefix,
IndexDateLayout: "2006-01-02",
MaxDocCount: defaultMaxDocCount,
})

assert.Equal(t, testCase.expected+dependencyIndex, r.dependencyIndexPrefix)
}
}

Expand Down Expand Up @@ -200,36 +213,76 @@ func createSearchResult(dependencyLink string) *elastic.SearchResult {
return searchResult
}

func TestGetIndices(t *testing.T) {
func TestGetReadIndices(t *testing.T) {
fixedTime := time.Date(1995, time.April, 21, 4, 12, 19, 95, time.UTC)
testCases := []struct {
expected []string
indices []string
lookback time.Duration
prefix string
params DependencyStoreParams
}{
{
expected: []string{indexWithDate("", "2006-01-02", fixedTime), indexWithDate("", "2006-01-02", fixedTime.Add(-24*time.Hour))},
params: DependencyStoreParams{IndexPrefix: "", IndexDateLayout: "2006-01-02", UseReadWriteAliases: true},
lookback: 23 * time.Hour,
indices: []string{
dependencyIndex + "read",
},
},
{
params: DependencyStoreParams{IndexPrefix: "", IndexDateLayout: "2006-01-02"},
lookback: 23 * time.Hour,
prefix: "",
indices: []string{
dependencyIndex + fixedTime.Format("2006-01-02"),
dependencyIndex + fixedTime.Add(-23*time.Hour).Format("2006-01-02"),
},
},
{
expected: []string{indexWithDate("", "2006-01-02", fixedTime), indexWithDate("", "2006-01-02", fixedTime.Add(-24*time.Hour))},
params: DependencyStoreParams{IndexPrefix: "", IndexDateLayout: "2006-01-02"},
lookback: 13 * time.Hour,
prefix: "",
indices: []string{
dependencyIndex + fixedTime.UTC().Format("2006-01-02"),
dependencyIndex + fixedTime.Add(-13*time.Hour).Format("2006-01-02"),
},
},
{
expected: []string{indexWithDate("foo:", "2006-01-02", fixedTime)},
params: DependencyStoreParams{IndexPrefix: "foo:", IndexDateLayout: "2006-01-02"},
lookback: 1 * time.Hour,
prefix: "foo:",
indices: []string{
"foo:" + indexPrefixSeparator + dependencyIndex + fixedTime.Format("2006-01-02"),
},
},
{
expected: []string{indexWithDate("foo-", "2006-01-02", fixedTime)},
params: DependencyStoreParams{IndexPrefix: "foo-", IndexDateLayout: "2006-01-02"},
lookback: 0,
prefix: "foo-",
indices: []string{
"foo-" + indexPrefixSeparator + dependencyIndex + fixedTime.Format("2006-01-02"),
},
},
}
for _, testCase := range testCases {
s := NewDependencyStore(testCase.params)
assert.EqualValues(t, testCase.indices, s.getReadIndices(fixedTime, testCase.lookback))
}
}

func TestGetWriteIndex(t *testing.T) {
fixedTime := time.Date(1995, time.April, 21, 4, 12, 19, 95, time.UTC)
testCases := []struct {
writeIndex string
lookback time.Duration
params DependencyStoreParams
}{
{
params: DependencyStoreParams{IndexPrefix: "", IndexDateLayout: "2006-01-02", UseReadWriteAliases: true},
writeIndex: dependencyIndex + "write",
},
{
params: DependencyStoreParams{IndexPrefix: "", IndexDateLayout: "2006-01-02", UseReadWriteAliases: false},
writeIndex: dependencyIndex + fixedTime.Format("2006-01-02"),
},
}
for _, testCase := range testCases {
assert.EqualValues(t, testCase.expected, getIndices(testCase.prefix, "2006-01-02", fixedTime, testCase.lookback))
s := NewDependencyStore(testCase.params)
assert.EqualValues(t, testCase.writeIndex, s.getWriteIndex(fixedTime))
}
}

Expand Down
20 changes: 17 additions & 3 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,7 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
reader := esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix(),
f.primaryConfig.GetIndexDateLayoutDependencies(), f.primaryConfig.GetMaxDocCount())
return reader, nil
return createDependencyReader(f.logger, f.primaryClient, f.primaryConfig)
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
Expand Down Expand Up @@ -210,6 +208,22 @@ func createSpanWriter(
return writer, nil
}

func createDependencyReader(
logger *zap.Logger,
client es.Client,
cfg config.ClientBuilder,
) (dependencystore.Reader, error) {

reader := esDepStore.NewDependencyStore(esDepStore.DependencyStoreParams{
Client: client,
Logger: logger,
IndexPrefix: cfg.GetIndexPrefix(),
MaxDocCount: cfg.GetMaxDocCount(),
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
})
return reader, nil
}

var _ io.Closer = (*Factory)(nil)

// Close closes the resources held by the factory
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
{
"index_patterns": "*jaeger-dependencies-*",
"aliases": {
"test-jaeger-dependencies-read" : {}
},
"settings":{
"index.number_of_shards": 3,
"index.number_of_replicas": 3,
"index.mapping.nested_fields.limit":50,
"index.requests.cache.enable":true
,"lifecycle": {
"name": "jaeger-test-policy",
"rollover_alias": "test-jaeger-dependencies-write"
}
},
"mappings":{}
}
11 changes: 11 additions & 0 deletions plugin/storage/es/mappings/jaeger-dependencies-7.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
{
"index_patterns": "*jaeger-dependencies-*",
{{- if .UseILM }}
"aliases": {
"{{ .IndexPrefix }}jaeger-dependencies-read" : {}
},
{{- end }}
"settings":{
"index.number_of_shards": {{ .Shards }},
"index.number_of_replicas": {{ .Replicas }},
"index.mapping.nested_fields.limit":50,
"index.requests.cache.enable":true
{{- if .UseILM }}
,"lifecycle": {
"name": "{{ .ILMPolicyName }}",
"rollover_alias": "{{ .IndexPrefix }}jaeger-dependencies-write"
}
{{- end }}
},
"mappings":{}
}
Loading

0 comments on commit 18e9d5b

Please sign in to comment.