diff --git a/Gopkg.lock b/Gopkg.lock index 1ca09c857b8..cbdf41f0478 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -655,6 +655,18 @@ revision = "4dadeb3030eda0273a12382bb2348ffc7c9d1a39" version = "v1.0.0" +[[projects]] + digest = "1:c2d643e8b2c25cf18d6e1237233df6457729d9491bbad6fa9598cad8d01466a6" + name = "github.com/olivere/elastic" + packages = [ + ".", + "config", + "uritemplates", + ] + pruneopts = "UT" + revision = "8ebe6a0fc23d9d53fbd8890da8ae7ee7cea78dbe" + version = "v6.2.22" + [[projects]] branch = "master" digest = "1:bccaead3121ab7964fd80fab704f612e5893fb5a2c581d520ec847ed8cfac27e" @@ -1221,18 +1233,6 @@ revision = "99a8ce2fbf8b8087b6ed12a37c61b10f04070043" version = "v1.1.0" -[[projects]] - digest = "1:9a1d716749c77399bfa71792d77eef3278586423947f3431dbac6d6049c24787" - name = "gopkg.in/olivere/elastic.v5" - packages = [ - ".", - "config", - "uritemplates", - ] - pruneopts = "UT" - revision = "f72acaba629a7ec879103d17b7426a31bc38e199" - version = "v5.0.80" - [[projects]] digest = "1:4d2e5a73dc1500038e504a8d78b986630e3626dc027bc030ba5c75da257cdb96" name = "gopkg.in/yaml.v2" @@ -1309,6 +1309,7 @@ "github.com/hashicorp/go-hclog", "github.com/hashicorp/go-plugin", "github.com/kr/pretty", + "github.com/olivere/elastic", "github.com/opentracing-contrib/go-stdlib/nethttp", "github.com/opentracing/opentracing-go", "github.com/opentracing/opentracing-go/ext", @@ -1361,7 +1362,6 @@ "google.golang.org/grpc/status", "google.golang.org/grpc/test/bufconn", "google.golang.org/grpc/test/grpc_testing", - "gopkg.in/olivere/elastic.v5", "gopkg.in/yaml.v2", "honnef.co/go/tools/cmd/staticcheck", ] diff --git a/Gopkg.toml b/Gopkg.toml index 390299e7c60..018b0cb5e6f 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -116,8 +116,8 @@ required = [ version = "=1.20.1" [[constraint]] - name = "gopkg.in/olivere/elastic.v5" - version = "^5.0.53" + name = "github.com/olivere/elastic" + version = "^6.2.22" [[constraint]] name = "github.com/gocql/gocql" diff --git a/pkg/es/client.go b/pkg/es/client.go index 1a2eff51023..ec9cf45fa01 100644 --- a/pkg/es/client.go +++ b/pkg/es/client.go @@ -19,7 +19,7 @@ import ( "context" "io" - "gopkg.in/olivere/elastic.v5" + "github.com/olivere/elastic" ) // Client is an abstraction for elastic.Client @@ -31,6 +31,7 @@ type Client interface { Search(indices ...string) SearchService MultiSearch() MultiSearchService io.Closer + GetVersion() int } // IndicesExistsService is an abstraction for elastic.IndicesExistsService @@ -61,7 +62,6 @@ type IndexService interface { // SearchService is an abstraction for elastic.SearchService type SearchService interface { - Type(typ string) SearchService Size(size int) SearchService Aggregation(name string, aggregation elastic.Aggregation) SearchService IgnoreUnavailable(ignoreUnavailable bool) SearchService diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index eb445323075..a7012446f0c 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -22,14 +22,15 @@ import ( "io/ioutil" "net/http" "path/filepath" + "strconv" "strings" "sync" "time" + "github.com/olivere/elastic" "github.com/pkg/errors" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" - "gopkg.in/olivere/elastic.v5" "github.com/jaegertracing/jaeger/pkg/es" "github.com/jaegertracing/jaeger/pkg/es/wrapper" @@ -155,7 +156,19 @@ func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Fac if err != nil { return nil, err } - return eswrapper.WrapESClient(rawClient, service), nil + + // Determine ElasticSearch Version + pingResult, _, err := rawClient.Ping(c.Servers[0]).Do(context.Background()) + if err != nil { + return nil, err + } + esVersion, err := strconv.Atoi(string(pingResult.Version.Number[0])) + if err != nil { + return nil, err + } + logger.Info("Elasticsearch detected", zap.Int("version", esVersion)) + + return eswrapper.WrapESClient(rawClient, service, esVersion), nil } // ApplyDefaults copies settings from source unless its own value is non-zero. diff --git a/pkg/es/mocks/Client.go b/pkg/es/mocks/Client.go index ff0a1febbcb..58be0f1b095 100644 --- a/pkg/es/mocks/Client.go +++ b/pkg/es/mocks/Client.go @@ -71,6 +71,20 @@ func (_m *Client) CreateTemplate(id string) es.TemplateCreateService { return r0 } +// GetVersion provides a mock function with given fields: +func (_m *Client) GetVersion() int { + ret := _m.Called() + + var r0 int + if rf, ok := ret.Get(0).(func() int); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int) + } + + return r0 +} + // Index provides a mock function with given fields: func (_m *Client) Index() es.IndexService { ret := _m.Called() diff --git a/pkg/es/mocks/IndicesCreateService.go b/pkg/es/mocks/IndicesCreateService.go index 79dc25fc436..eef1aa50b7f 100644 --- a/pkg/es/mocks/IndicesCreateService.go +++ b/pkg/es/mocks/IndicesCreateService.go @@ -18,7 +18,7 @@ package mocks import context "context" -import elastic "gopkg.in/olivere/elastic.v5" +import elastic "github.com/olivere/elastic" import es "github.com/jaegertracing/jaeger/pkg/es" import mock "github.com/stretchr/testify/mock" diff --git a/pkg/es/mocks/MultiSearchService.go b/pkg/es/mocks/MultiSearchService.go index b7bab92ddae..e204734343e 100644 --- a/pkg/es/mocks/MultiSearchService.go +++ b/pkg/es/mocks/MultiSearchService.go @@ -18,7 +18,7 @@ package mocks import context "context" -import elastic "gopkg.in/olivere/elastic.v5" +import elastic "github.com/olivere/elastic" import es "github.com/jaegertracing/jaeger/pkg/es" import mock "github.com/stretchr/testify/mock" diff --git a/pkg/es/mocks/SearchService.go b/pkg/es/mocks/SearchService.go index 83fdb63c6f0..6cc725521b4 100644 --- a/pkg/es/mocks/SearchService.go +++ b/pkg/es/mocks/SearchService.go @@ -18,7 +18,7 @@ package mocks import context "context" -import elastic "gopkg.in/olivere/elastic.v5" +import elastic "github.com/olivere/elastic" import es "github.com/jaegertracing/jaeger/pkg/es" import mock "github.com/stretchr/testify/mock" @@ -113,19 +113,3 @@ func (_m *SearchService) Size(size int) es.SearchService { return r0 } - -// Type provides a mock function with given fields: typ -func (_m *SearchService) Type(typ string) es.SearchService { - ret := _m.Called(typ) - - var r0 es.SearchService - if rf, ok := ret.Get(0).(func(string) es.SearchService); ok { - r0 = rf(typ) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(es.SearchService) - } - } - - return r0 -} diff --git a/pkg/es/mocks/TemplateCreateService.go b/pkg/es/mocks/TemplateCreateService.go index b140c4d3451..77db1ece9d1 100644 --- a/pkg/es/mocks/TemplateCreateService.go +++ b/pkg/es/mocks/TemplateCreateService.go @@ -18,7 +18,7 @@ package mocks import context "context" -import elastic "gopkg.in/olivere/elastic.v5" +import elastic "github.com/olivere/elastic" import es "github.com/jaegertracing/jaeger/pkg/es" import mock "github.com/stretchr/testify/mock" diff --git a/pkg/es/wrapper/wrapper.go b/pkg/es/wrapper/wrapper.go index bf05de6a798..07d2b15c6e5 100644 --- a/pkg/es/wrapper/wrapper.go +++ b/pkg/es/wrapper/wrapper.go @@ -18,7 +18,7 @@ package eswrapper import ( "context" - "gopkg.in/olivere/elastic.v5" + "github.com/olivere/elastic" "github.com/jaegertracing/jaeger/pkg/es" ) @@ -29,11 +29,17 @@ import ( type ClientWrapper struct { client *elastic.Client bulkService *elastic.BulkProcessor + esVersion int +} + +// GetVersion returns the ElasticSearch Version +func (c ClientWrapper) GetVersion() int { + return c.esVersion } // WrapESClient creates a ESClient out of *elastic.Client. -func WrapESClient(client *elastic.Client, s *elastic.BulkProcessor) ClientWrapper { - return ClientWrapper{client: client, bulkService: s} +func WrapESClient(client *elastic.Client, s *elastic.BulkProcessor, esVersion int) ClientWrapper { + return ClientWrapper{client: client, bulkService: s, esVersion: esVersion} } // IndexExists calls this function to internal client. @@ -54,17 +60,25 @@ func (c ClientWrapper) CreateTemplate(ttype string) es.TemplateCreateService { // Index calls this function to internal client. func (c ClientWrapper) Index() es.IndexService { r := elastic.NewBulkIndexRequest() - return WrapESIndexService(r, c.bulkService) + return WrapESIndexService(r, c.bulkService, c.esVersion) } // Search calls this function to internal client. func (c ClientWrapper) Search(indices ...string) es.SearchService { - return WrapESSearchService(c.client.Search(indices...)) + searchService := c.client.Search(indices...) + if c.esVersion == 7 { + searchService = searchService.RestTotalHitsAsInt(true) + } + return WrapESSearchService(searchService) } // MultiSearch calls this function to internal client. func (c ClientWrapper) MultiSearch() es.MultiSearchService { - return WrapESMultiSearchService(c.client.MultiSearch()) + multiSearchService := c.client.MultiSearch() + if c.esVersion == 7 { + multiSearchService = multiSearchService.RestTotalHitsAsInt(true) + } + return WrapESMultiSearchService(multiSearchService) } // Close closes ESClient and flushes all data to the storage. @@ -138,21 +152,25 @@ func (c TemplateCreateServiceWrapper) Do(ctx context.Context) (*elastic.IndicesP type IndexServiceWrapper struct { bulkIndexReq *elastic.BulkIndexRequest bulkService *elastic.BulkProcessor + esVersion int } // WrapESIndexService creates an ESIndexService out of *elastic.ESIndexService. -func WrapESIndexService(indexService *elastic.BulkIndexRequest, bulkService *elastic.BulkProcessor) IndexServiceWrapper { - return IndexServiceWrapper{bulkIndexReq: indexService, bulkService: bulkService} +func WrapESIndexService(indexService *elastic.BulkIndexRequest, bulkService *elastic.BulkProcessor, esVersion int) IndexServiceWrapper { + return IndexServiceWrapper{bulkIndexReq: indexService, bulkService: bulkService, esVersion: esVersion} } // Index calls this function to internal service. func (i IndexServiceWrapper) Index(index string) es.IndexService { - return WrapESIndexService(i.bulkIndexReq.Index(index), i.bulkService) + return WrapESIndexService(i.bulkIndexReq.Index(index), i.bulkService, i.esVersion) } // Type calls this function to internal service. func (i IndexServiceWrapper) Type(typ string) es.IndexService { - return WrapESIndexService(i.bulkIndexReq.Type(typ), i.bulkService) + if i.esVersion == 7 { + return WrapESIndexService(i.bulkIndexReq, i.bulkService, i.esVersion) + } + return WrapESIndexService(i.bulkIndexReq.Type(typ), i.bulkService, i.esVersion) } // Add adds the request to bulk service @@ -172,11 +190,6 @@ func WrapESSearchService(searchService *elastic.SearchService) SearchServiceWrap return SearchServiceWrapper{searchService: searchService} } -// Type calls this function to internal service. -func (s SearchServiceWrapper) Type(typ string) es.SearchService { - return WrapESSearchService(s.searchService.Type(typ)) -} - // Size calls this function to internal service. func (s SearchServiceWrapper) Size(size int) es.SearchService { return WrapESSearchService(s.searchService.Size(size)) diff --git a/pkg/es/wrapper/wrapper_nolint.go b/pkg/es/wrapper/wrapper_nolint.go index 8f22fd4619e..c28830e1dfb 100644 --- a/pkg/es/wrapper/wrapper_nolint.go +++ b/pkg/es/wrapper/wrapper_nolint.go @@ -21,10 +21,10 @@ import "github.com/jaegertracing/jaeger/pkg/es" // Id calls this function to internal service. func (i IndexServiceWrapper) Id(id string) es.IndexService { - return WrapESIndexService(i.bulkIndexReq.Id(id), i.bulkService) + return WrapESIndexService(i.bulkIndexReq.Id(id), i.bulkService, i.esVersion) } // BodyJson calls this function to internal service. func (i IndexServiceWrapper) BodyJson(body interface{}) es.IndexService { - return WrapESIndexService(i.bulkIndexReq.Doc(body), i.bulkService) + return WrapESIndexService(i.bulkIndexReq.Doc(body), i.bulkService, i.esVersion) } diff --git a/plugin/storage/es/dependencystore/schema.go b/plugin/storage/es/dependencystore/schema.go index e901d1b240f..bb87bdb1438 100644 --- a/plugin/storage/es/dependencystore/schema.go +++ b/plugin/storage/es/dependencystore/schema.go @@ -23,3 +23,17 @@ const dependenciesMapping = `{ "` + dependencyType + `":{} } }` + +const dependenciesMapping7 = `{ + "settings":{ + "index.requests.cache.enable":true + }, + "mappings":{} +}` + +func getMapping(version int) string { + if version == 7 { + return dependenciesMapping7 + } + return dependenciesMapping +} diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index 464de9393cb..90d0eea1901 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -20,9 +20,9 @@ import ( "encoding/json" "time" + "github.com/olivere/elastic" "github.com/pkg/errors" "go.uber.org/zap" - "gopkg.in/olivere/elastic.v5" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es" @@ -67,7 +67,7 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D } func (s *DependencyStore) createIndex(indexName string) error { - _, err := s.client.CreateIndex(indexName).Body(dependenciesMapping).Do(s.ctx) + _, err := s.client.CreateIndex(indexName).Body(getMapping(s.client.GetVersion())).Do(s.ctx) if err != nil { return errors.Wrap(err, "Failed to create index") } @@ -85,7 +85,6 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { indices := getIndices(s.indexPrefix, endTs, lookback) searchResult, err := s.client.Search(indices...). - Type(dependencyType). Size(10000). // the default elasticsearch allowed limit Query(buildTSQuery(endTs, lookback)). IgnoreUnavailable(true). diff --git a/plugin/storage/es/dependencystore/storage_test.go b/plugin/storage/es/dependencystore/storage_test.go index 784edcca7e5..a9badc93e2d 100644 --- a/plugin/storage/es/dependencystore/storage_test.go +++ b/plugin/storage/es/dependencystore/storage_test.go @@ -21,11 +21,11 @@ import ( "testing" "time" + "github.com/olivere/elastic" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "go.uber.org/zap" - "gopkg.in/olivere/elastic.v5" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es/mocks" @@ -76,12 +76,18 @@ func TestWriteDependencies(t *testing.T) { createIndexError error writeError error expectedError string + esVersion int }{ { createIndexError: errors.New("index not created"), expectedError: "Failed to create index: index not created", + esVersion: 6, + }, + { + createIndexError: errors.New("index not created"), + expectedError: "Failed to create index: index not created", + esVersion: 7, }, - {}, } for _, testCase := range testCases { withDepStorage("", func(r *depStorageTest) { @@ -91,9 +97,14 @@ func TestWriteDependencies(t *testing.T) { indexService := &mocks.IndicesCreateService{} writeService := &mocks.IndexService{} r.client.On("Index").Return(writeService) + r.client.On("GetVersion").Return(testCase.esVersion) r.client.On("CreateIndex", stringMatcher(indexName)).Return(indexService) - indexService.On("Body", stringMatcher(dependenciesMapping)).Return(indexService) + if testCase.esVersion == 7 { + indexService.On("Body", stringMatcher(dependenciesMapping7)).Return(indexService) + } else { + indexService.On("Body", stringMatcher(dependenciesMapping)).Return(indexService) + } indexService.On("Do", mock.Anything).Return(nil, testCase.createIndexError) writeService.On("Index", stringMatcher(indexName)).Return(writeService) @@ -168,7 +179,6 @@ func TestGetDependencies(t *testing.T) { searchService := &mocks.SearchService{} r.client.On("Search", testCase.indices...).Return(searchService) - searchService.On("Type", stringMatcher(dependencyType)).Return(searchService) searchService.On("Size", mock.Anything).Return(searchService) searchService.On("Query", mock.Anything).Return(searchService) searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService) diff --git a/plugin/storage/es/esRollover.py b/plugin/storage/es/esRollover.py index eef7a687c6b..662d3584db7 100755 --- a/plugin/storage/es/esRollover.py +++ b/plugin/storage/es/esRollover.py @@ -80,8 +80,12 @@ def perform_action(action, client, write_alias, read_alias, index_to_rollover, t if action == 'init': shards = os.getenv('SHARDS', SHARDS) replicas = os.getenv('REPLICAS', REPLICAS) - mapping = Path('./mappings/'+template_name+'.json').read_text() - create_index_template(fix_mapping(mapping, shards, replicas), template_name) + esVersion = get_version(client) + if esVersion == 7: + mapping = Path('./mappings/'+template_name+'-7.json').read_text() + else: + mapping = Path('./mappings/'+template_name+'.json').read_text() + create_index_template(fix_mapping(mapping, shards, replicas), template_name, esVersion) index = index_to_rollover + '-000001' create_index(client, index) @@ -99,7 +103,7 @@ def perform_action(action, client, write_alias, read_alias, index_to_rollover, t sys.exit(1) -def create_index_template(template, template_name): +def create_index_template(template, template_name, esVersion): print('Creating index template {}'.format(template_name)) headers = {'Content-Type': 'application/json'} s = get_request_session(os.getenv("ES_USERNAME"), os.getenv("ES_PASSWORD"), str2bool(os.getenv("ES_TLS", 'false')), os.getenv("ES_TLS_CA"), os.getenv("ES_TLS_CERT"), os.getenv("ES_TLS_KEY")) @@ -207,6 +211,12 @@ def get_request_session(username, password, tls, ca, cert, key): return session +def get_version(client): + esVersion = client.info()['version']['number'][0] + print('Detected ElasticSearch Version {}'.format(esVersion)) + return int(esVersion) + + if __name__ == "__main__": logging.getLogger().setLevel(logging.DEBUG) main() diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index a4ca1f645cf..4961ff3d637 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -177,7 +177,7 @@ func createSpanWriter( } } - spanMapping, serviceMapping := GetMappings(cfg.GetNumShards(), cfg.GetNumReplicas()) + spanMapping, serviceMapping := GetMappings(cfg.GetNumShards(), cfg.GetNumReplicas(), client.GetVersion()) writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{ Client: client, Logger: logger, @@ -199,7 +199,11 @@ func createSpanWriter( } // GetMappings returns span and service mappings -func GetMappings(shards, replicas int64) (string, string) { +func GetMappings(shards, replicas int64, esVersion int) (string, string) { + if esVersion == 7 { + return fixMapping(loadMapping("/jaeger-span-7.json"), shards, replicas), + fixMapping(loadMapping("/jaeger-service-7.json"), shards, replicas) + } return fixMapping(loadMapping("/jaeger-span.json"), shards, replicas), fixMapping(loadMapping("/jaeger-service.json"), shards, replicas) } diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 960652c2c2e..7b619436ace 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -52,6 +52,7 @@ func (m *mockClientBuilder) NewClient(logger *zap.Logger, metricsFactory metrics tService.On("Body", mock.Anything).Return(tService) tService.On("Do", context.Background()).Return(nil, m.createTemplateError) c.On("CreateTemplate", mock.Anything).Return(tService) + c.On("GetVersion").Return(6) return c, nil } return nil, m.err @@ -135,13 +136,19 @@ func TestLoadTagsFromFile(t *testing.T) { } func TestFactory_LoadMapping(t *testing.T) { - spanMapping, serviceMapping := GetMappings(10, 0) + spanMapping5, serviceMapping5 := GetMappings(10, 0, 5) + spanMapping6, serviceMapping6 := GetMappings(10, 0, 6) + spanMapping7, serviceMapping7 := GetMappings(10, 0, 7) tests := []struct { name string toTest string }{ - {name: "/jaeger-span.json", toTest: spanMapping}, - {name: "/jaeger-service.json", toTest: serviceMapping}, + {name: "/jaeger-span.json", toTest: spanMapping5}, + {name: "/jaeger-service.json", toTest: serviceMapping5}, + {name: "/jaeger-span.json", toTest: spanMapping6}, + {name: "/jaeger-service.json", toTest: serviceMapping6}, + {name: "/jaeger-span-7.json", toTest: spanMapping7}, + {name: "/jaeger-service-7.json", toTest: serviceMapping7}, } for _, test := range tests { mapping := loadMapping(test.name) diff --git a/plugin/storage/es/mappings/gen_assets.go b/plugin/storage/es/mappings/gen_assets.go index b4b8ac6072b..2fdbd15c75c 100644 --- a/plugin/storage/es/mappings/gen_assets.go +++ b/plugin/storage/es/mappings/gen_assets.go @@ -213,18 +213,33 @@ var _escData = map[string]*_escFile{ name: ".nocover", local: "plugin/storage/es/mappings/.nocover", size: 43, - modtime: 1551270965, + modtime: 1563813000, compressed: ` H4sIAAAAAAAC/youSSzJzFYoSEzOTkxPVcjILy4pVkgsLcnXTU/NSy1KLElNUUjLzEkt1uMCBAAA//8y IKK1KwAAAA== `, }, + "/jaeger-service-7.json": { + name: "jaeger-service-7.json", + local: "plugin/storage/es/mappings/jaeger-service-7.json", + size: 878, + modtime: 1566212432, + compressed: ` +H4sIAAAAAAAC/8ySwW7aQBCG736K1agnBFZViR72RluqVmppBcopikaDPdibeNeb3YEEIb97ZGTAhHDL +IReP5H++35/t3SVKgXE5P6MnEQ4uglYwuCcuOIwih43JeDSAYbsYWcS4IoJuuQOZurVdcsB6hbGkkLcN +n3aIs5u/36Zz/PcTF78m8x8LxGb4NhbYVyajS3A+/f/n9/fJBWrJe+OK1HEUznFluMpjWhlrBPT489lu +4Mc1R4lpRlnJKTtaVgxawpoTpfa90PWdXizfOrImQ2HrKxKOoG/3iVK7brbfw5NDoSKiJX9gu6yrPL+r +FMjWM2h44O1THXIYnqemcHVgpGW9YdBfxl97cdPfBU9SoiXJStAgVKQDOMZN8oroOftQZxzjh9DuXNJr ++vt51/1NH2rPQQzHkxx0B3RGlvvK13Wvqh41oX0Miande7Qmh2uTNMlLAAAA//8YcMrbbgMAAA== +`, + }, + "/jaeger-service.json": { name: "jaeger-service.json", local: "plugin/storage/es/mappings/jaeger-service.json", size: 1060, - modtime: 1551270965, + modtime: 1565794261, compressed: ` H4sIAAAAAAAC/8yTT2/UMBDF7/kU1ojTamshpHLwrUARSFDQVpwQGs3Gs1mD7RjbKayqfHfk4pKkW+2J Q3PIn/F78/xz7NtGCMjsgqXMoASsvhN3HM8SxxvT8tkK1kWSOGfjuwSqOIQA4zX/ln5wW47Y7zDtKeoE @@ -236,11 +251,30 @@ TmGdQJrhQAmkbHr//7o382e5j83Y/AkAAP//qd2MzCQEAAA= `, }, + "/jaeger-span-7.json": { + name: "jaeger-span-7.json", + local: "plugin/storage/es/mappings/jaeger-span-7.json", + size: 3420, + modtime: 1566212432, + compressed: ` +H4sIAAAAAAAC/+xWXW+UQBR951eQG5+aLTEm9YG3amtsYqtp65Mxk7twYaedL2fuVjcN/93A0hYKbE2k +xhhflix3zuHcmXsO3EZxDNLk9EM4ZCZvAqQx7F0hleT3g0OzvweLuF4WiFmaMkBao+5wiVnrJXlhCxFW +6PMa/+JWiLPPp2+Oz8XHd+Li/eH50YUQ1WIc5skpmeEQeH786cPJ28MBVKNz0pSJocCUi0KSykOipJYM +6cHL3lpP39YUOCQZZitKyOBSEaTs1xTFccMLLd9DY/nGoJaZYNJOIVOA9EtTiePb9lrvh0MjGMsgNLo7 +bFtrKft34xh44whSuKbNd+tzWPSrsjTWk8ClvSFIXx287pSr7lpwyCuhkbMVpMBYJntwX66iR4iOZudt +RiH8FbJbLcmU/Ob6tT1N560jz5LCgzhgjxmdHHXlTkudlHkvERx6Mnzh0MxIGualq7cBWVpzhprmE8no ++VKOMyprSpiGnEqlZBgD5sjU1VFYr5EhBXI2Wwm9BQ6Y8/W2w1/XUigsRxVIw3WQDRHKjgO2mdIV3YYB +pAWqQIuelwYjuaWSmgKjdlM+6jYxNMk2z6awA4E7Re4U2hSvaTO8+5Tln7T9oKsGcYNqTX/saYzlZUP7 +PM+Lpv5V00l8l3m9yZuco0D+Rmb02OizJjLjZNrb5RVlDLug/6f0n5xSTwV5Mhk9W0R6Ksa6nm+sh18G +s/IPX+q/S7/jOB55dNyfveXPdm4jPpxtT0d9N2fQzT1xE5+s9W8VVdHPAAAA//+SuQbQXA0AAA== +`, + }, + "/jaeger-span.json": { name: "jaeger-span.json", local: "plugin/storage/es/mappings/jaeger-span.json", size: 3830, - modtime: 1561117834, + modtime: 1565794261, compressed: ` H4sIAAAAAAAC/+xW0W/TPhB+z18RnX5PUxf9hDQe8jbYEJPYQNt4Qsi6JpfUm2Mb+zqopv7vKE1Lm9ZJ QGoQEvShbWx/391n333xcxTHwFRZhUyQxnDygFSSO/UW9ekJTOp5T8xSlx7Senkcg9Q5fUv0vJqSE6YQ @@ -267,7 +301,9 @@ var _escDirs = map[string][]os.FileInfo{ "plugin/storage/es/mappings": { _escData["/.nocover"], + _escData["/jaeger-service-7.json"], _escData["/jaeger-service.json"], + _escData["/jaeger-span-7.json"], _escData["/jaeger-span.json"], }, } diff --git a/plugin/storage/es/mappings/jaeger-service-7.json b/plugin/storage/es/mappings/jaeger-service-7.json new file mode 100644 index 00000000000..d27627e26ce --- /dev/null +++ b/plugin/storage/es/mappings/jaeger-service-7.json @@ -0,0 +1,41 @@ +{ + "index_patterns": "*jaeger-service-*", + "settings":{ + "index.number_of_shards": ${__NUMBER_OF_SHARDS__}, + "index.number_of_replicas": ${__NUMBER_OF_REPLICAS__}, + "index.mapping.nested_fields.limit":50, + "index.requests.cache.enable":true + }, + "mappings":{ + "dynamic_templates":[ + { + "span_tags_map":{ + "mapping":{ + "type":"keyword", + "ignore_above":256 + }, + "path_match":"tag.*" + } + }, + { + "process_tags_map":{ + "mapping":{ + "type":"keyword", + "ignore_above":256 + }, + "path_match":"process.tag.*" + } + } + ], + "properties":{ + "serviceName":{ + "type":"keyword", + "ignore_above":256 + }, + "operationName":{ + "type":"keyword", + "ignore_above":256 + } + } + } +} diff --git a/plugin/storage/es/mappings/jaeger-span-7.json b/plugin/storage/es/mappings/jaeger-span-7.json new file mode 100644 index 00000000000..ad8b96c2727 --- /dev/null +++ b/plugin/storage/es/mappings/jaeger-span-7.json @@ -0,0 +1,157 @@ +{ + "index_patterns": "*jaeger-span-*", + "settings":{ + "index.number_of_shards": ${__NUMBER_OF_SHARDS__}, + "index.number_of_replicas": ${__NUMBER_OF_REPLICAS__}, + "index.mapping.nested_fields.limit":50, + "index.requests.cache.enable":true + }, + "mappings":{ + "dynamic_templates":[ + { + "span_tags_map":{ + "mapping":{ + "type":"keyword", + "ignore_above":256 + }, + "path_match":"tag.*" + } + }, + { + "process_tags_map":{ + "mapping":{ + "type":"keyword", + "ignore_above":256 + }, + "path_match":"process.tag.*" + } + } + ], + "properties":{ + "traceID":{ + "type":"keyword", + "ignore_above":256 + }, + "parentSpanID":{ + "type":"keyword", + "ignore_above":256 + }, + "spanID":{ + "type":"keyword", + "ignore_above":256 + }, + "operationName":{ + "type":"keyword", + "ignore_above":256 + }, + "startTime":{ + "type":"long" + }, + "startTimeMillis":{ + "type":"date", + "format":"epoch_millis" + }, + "duration":{ + "type":"long" + }, + "flags":{ + "type":"integer" + }, + "logs":{ + "type":"nested", + "dynamic":false, + "properties":{ + "timestamp":{ + "type":"long" + }, + "fields":{ + "type":"nested", + "dynamic":false, + "properties":{ + "key":{ + "type":"keyword", + "ignore_above":256 + }, + "value":{ + "type":"keyword", + "ignore_above":256 + }, + "tagType":{ + "type":"keyword", + "ignore_above":256 + } + } + } + } + }, + "process":{ + "properties":{ + "serviceName":{ + "type":"keyword", + "ignore_above":256 + }, + "tag":{ + "type":"object" + }, + "tags":{ + "type":"nested", + "dynamic":false, + "properties":{ + "key":{ + "type":"keyword", + "ignore_above":256 + }, + "value":{ + "type":"keyword", + "ignore_above":256 + }, + "tagType":{ + "type":"keyword", + "ignore_above":256 + } + } + } + } + }, + "references":{ + "type":"nested", + "dynamic":false, + "properties":{ + "refType":{ + "type":"keyword", + "ignore_above":256 + }, + "traceID":{ + "type":"keyword", + "ignore_above":256 + }, + "spanID":{ + "type":"keyword", + "ignore_above":256 + } + } + }, + "tag":{ + "type":"object" + }, + "tags":{ + "type":"nested", + "dynamic":false, + "properties":{ + "key":{ + "type":"keyword", + "ignore_above":256 + }, + "value":{ + "type":"keyword", + "ignore_above":256 + }, + "tagType":{ + "type":"keyword", + "ignore_above":256 + } + } + } + } + } +} diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 0989f59fb06..5b0caa50fbd 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -22,13 +22,13 @@ import ( "fmt" "time" + "github.com/olivere/elastic" "github.com/opentracing/opentracing-go" ottag "github.com/opentracing/opentracing-go/ext" otlog "github.com/opentracing/opentracing-go/log" "github.com/pkg/errors" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" - "gopkg.in/olivere/elastic.v5" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es" @@ -333,7 +333,6 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st searchRequests[i] = elastic.NewSearchRequest(). IgnoreUnavailable(true). - Type(spanType). Source(s) } // set traceIDs to empty @@ -476,7 +475,6 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra jaegerIndices := s.timeRangeIndices(s.spanIndexPrefix, traceQuery.StartTimeMin, traceQuery.StartTimeMax) searchService := s.client.Search(jaegerIndices...). - Type(spanType). Size(0). // set to 0 because we don't want actual documents. Aggregation(traceIDAggregation, aggregation). IgnoreUnavailable(true). diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 65caff3c1a8..3268db9a9af 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -25,13 +25,13 @@ import ( "testing" "time" + "github.com/olivere/elastic" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/uber/jaeger-lib/metrics" "github.com/uber/jaeger-lib/metrics/metricstest" "go.uber.org/zap" - "gopkg.in/olivere/elastic.v5" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es/mocks" @@ -219,16 +219,13 @@ func TestSpanReader_multiRead_followUp_query(t *testing.T) { id1Query := elastic.NewTermQuery("traceID", model.TraceID{High: 0, Low: 1}.String()) id1Search := elastic.NewSearchRequest(). IgnoreUnavailable(true). - Type(spanType). Source(r.reader.sourceFn(id1Query, model.TimeAsEpochMicroseconds(now.Add(-time.Hour)))) id2Query := elastic.NewTermQuery("traceID", model.TraceID{High: 0, Low: 2}.String()) id2Search := elastic.NewSearchRequest(). IgnoreUnavailable(true). - Type(spanType). Source(r.reader.sourceFn(id2Query, model.TimeAsEpochMicroseconds(now.Add(-time.Hour)))) id1SearchSpanTime := elastic.NewSearchRequest(). IgnoreUnavailable(true). - Type(spanType). Source(r.reader.sourceFn(id1Query, spanID1.StartTime)) multiSearchService := &mocks.MultiSearchService{} @@ -819,8 +816,6 @@ func mockArchiveMultiSearchService(r *spanReaderTest, indexName string) *mock.Ca func mockSearchService(r *spanReaderTest) *mock.Call { searchService := &mocks.SearchService{} - searchService.On("Type", stringMatcher(serviceType)).Return(searchService) - searchService.On("Type", stringMatcher(spanType)).Return(searchService) searchService.On("Query", mock.Anything).Return(searchService) searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService) searchService.On("Size", mock.MatchedBy(func(i int) bool { diff --git a/plugin/storage/es/spanstore/service_operation.go b/plugin/storage/es/spanstore/service_operation.go index 704a2f3d82a..e394a0a20eb 100644 --- a/plugin/storage/es/spanstore/service_operation.go +++ b/plugin/storage/es/spanstore/service_operation.go @@ -21,9 +21,9 @@ import ( "hash/fnv" "time" + "github.com/olivere/elastic" "github.com/pkg/errors" "go.uber.org/zap" - "gopkg.in/olivere/elastic.v5" "github.com/jaegertracing/jaeger/pkg/cache" "github.com/jaegertracing/jaeger/pkg/es" @@ -81,7 +81,6 @@ func (s *ServiceOperationStorage) getServices(context context.Context, indices [ serviceAggregation := getServicesAggregation() searchService := s.client.Search(indices...). - Type(serviceType). Size(0). // set to 0 because we don't want actual documents. IgnoreUnavailable(true). Aggregation(servicesAggregation, serviceAggregation) @@ -112,7 +111,6 @@ func (s *ServiceOperationStorage) getOperations(context context.Context, indices serviceFilter := getOperationsAggregation() searchService := s.client.Search(indices...). - Type(serviceType). Size(0). Query(serviceQuery). IgnoreUnavailable(true). diff --git a/plugin/storage/es/spanstore/service_operation_test.go b/plugin/storage/es/spanstore/service_operation_test.go index 9427834f356..6ebbf194502 100644 --- a/plugin/storage/es/spanstore/service_operation_test.go +++ b/plugin/storage/es/spanstore/service_operation_test.go @@ -19,10 +19,10 @@ import ( "context" "testing" + "github.com/olivere/elastic" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "gopkg.in/olivere/elastic.v5" "github.com/jaegertracing/jaeger/pkg/es/mocks" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index c29ee698f6b..22bd3eb2cb5 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -19,15 +19,16 @@ import ( "context" "net/http" "os" + "strconv" "testing" "time" + "github.com/olivere/elastic" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" - "gopkg.in/olivere/elastic.v5" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es/wrapper" @@ -55,6 +56,18 @@ type ESStorageIntegration struct { logger *zap.Logger } +func (s *ESStorageIntegration) getVersion() (int, error) { + pingResult, _, err := s.client.Ping(queryURL).Do(context.Background()) + if err != nil { + return 0, err + } + esVersion, err := strconv.Atoi(string(pingResult.Version.Number[0])) + if err != nil { + return 0, err + } + return esVersion, nil +} + func (s *ESStorageIntegration) initializeES(allTagsAsFields, archive bool) error { rawClient, err := elastic.NewClient( elastic.SetURL(queryURL), @@ -66,8 +79,12 @@ func (s *ESStorageIntegration) initializeES(allTagsAsFields, archive bool) error s.client = rawClient + esVersion, err := s.getVersion() + if err != nil { + return err + } s.bulkProcessor, _ = s.client.BulkProcessor().Do(context.Background()) - client := eswrapper.WrapESClient(s.client, s.bulkProcessor) + client := eswrapper.WrapESClient(s.client, s.bulkProcessor, esVersion) dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix) s.DependencyReader = dependencyStore s.DependencyWriter = dependencyStore @@ -90,8 +107,12 @@ func (s *ESStorageIntegration) esCleanUp(allTagsAsFields, archive bool) error { func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) error { bp, _ := s.client.BulkProcessor().BulkActions(1).FlushInterval(time.Nanosecond).Do(context.Background()) - client := eswrapper.WrapESClient(s.client, bp) - spanMapping, serviceMapping := es.GetMappings(5, 1) + esVersion, err := s.getVersion() + if err != nil { + return err + } + client := eswrapper.WrapESClient(s.client, bp, esVersion) + spanMapping, serviceMapping := es.GetMappings(5, 1, client.GetVersion()) w := spanstore.NewSpanWriter( spanstore.SpanWriterParams{ Client: client, @@ -102,7 +123,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro TagDotReplacement: tagKeyDeDotChar, Archive: archive, }) - err := w.CreateTemplates(spanMapping, serviceMapping) + err = w.CreateTemplates(spanMapping, serviceMapping) if err != nil { return err } diff --git a/plugin/storage/integration/es_index_cleaner_test.go b/plugin/storage/integration/es_index_cleaner_test.go index a5e10a38d96..d70adec0ae9 100644 --- a/plugin/storage/integration/es_index_cleaner_test.go +++ b/plugin/storage/integration/es_index_cleaner_test.go @@ -22,9 +22,9 @@ import ( "os/exec" "testing" + "github.com/olivere/elastic" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gopkg.in/olivere/elastic.v5" ) const ( @@ -192,7 +192,7 @@ func runEsCleaner(days int, envs []string) error { for _, e := range envs { dockerEnv += fmt.Sprintf(" -e %s", e) } - args := fmt.Sprintf("sudo docker run %s --net=host %s %d http://%s", dockerEnv, indexCleanerImage, days, queryHostPort) + args := fmt.Sprintf("docker run %s --rm --net=host %s %d http://%s", dockerEnv, indexCleanerImage, days, queryHostPort) cmd := exec.Command("/bin/sh", "-c", args) out, err := cmd.CombinedOutput() fmt.Println(string(out)) @@ -204,7 +204,7 @@ func runEsRollover(action string, envs []string) error { for _, e := range envs { dockerEnv += fmt.Sprintf(" -e %s", e) } - args := fmt.Sprintf("sudo docker run %s --rm --net=host %s %s http://%s", dockerEnv, rolloverImage, action, queryHostPort) + args := fmt.Sprintf("docker run %s --rm --net=host %s %s http://%s", dockerEnv, rolloverImage, action, queryHostPort) cmd := exec.Command("/bin/sh", "-c", args) out, err := cmd.CombinedOutput() fmt.Println(string(out)) diff --git a/plugin/storage/integration/token_propagation_test.go b/plugin/storage/integration/token_propagation_test.go index f71045362ec..8f82a988947 100644 --- a/plugin/storage/integration/token_propagation_test.go +++ b/plugin/storage/integration/token_propagation_test.go @@ -20,12 +20,15 @@ import ( "context" "encoding/json" "net/http" + "os" + "os/exec" "strings" "testing" + "time" + "github.com/olivere/elastic" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gopkg.in/olivere/elastic.v5" ) const bearerToken = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsIm5hbWUiOiJKb2huIERvZSIsImlhdCI" @@ -37,6 +40,16 @@ type esTokenPropagationTestHandler struct { } func (h *esTokenPropagationTestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Return the elasticsearch version + if r.URL.Path == "/" { + ret := new(elastic.PingResult) + ret.Version.Number = "7.3.0" + json_ret, _ := json.Marshal(ret) + w.Header().Add("Content-Type", "application/json; charset=UTF-8") + w.Write(json_ret) + return + } + authValue := r.Header.Get("Authorization") if authValue != "" { headerValue := strings.Split(authValue, " ") @@ -89,6 +102,19 @@ func TestBearTokenPropagation(t *testing.T) { defer srv.Shutdown(context.Background()) go createElasticSearchMock(srv, t) + // Wait for http server to start + time.Sleep(100 * time.Millisecond) + + // Path relative to plugin/storage/integration/token_propagation_test.go + cmd := exec.Command("../../../cmd/query/query-linux", "--es.server-urls=http://127.0.0.1:9200", "--es.tls=false", "--query.bearer-token-propagation=true") + cmd.Env = []string{"SPAN_STORAGE_TYPE=elasticsearch"} + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Start() + assert.NoError(t, err) + + // Wait for query service to start + time.Sleep(100 * time.Millisecond) // Test cases. for _, testCase := range testCases { @@ -104,4 +130,6 @@ func TestBearTokenPropagation(t *testing.T) { assert.Equal(t, resp.StatusCode, http.StatusOK) } } + + cmd.Process.Kill() } diff --git a/scripts/travis/es-integration-test.sh b/scripts/travis/es-integration-test.sh index 4798e181f31..a85ece18840 100755 --- a/scripts/travis/es-integration-test.sh +++ b/scripts/travis/es-integration-test.sh @@ -2,24 +2,22 @@ set -e -docker pull docker.elastic.co/elasticsearch/elasticsearch:5.6.12 -CID=$(docker run -d -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" -e "xpack.security.enabled=false" -e "xpack.monitoring.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:5.6.12) - -STORAGE=elasticsearch make storage-integration-test -make index-cleaner-integration-test - -docker kill $CID +run_integration_test() { + ES_VERSION=$1 + docker pull docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION} + CID=$(docker run --rm -d -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" -e "xpack.security.enabled=false" -e "xpack.monitoring.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION}) + STORAGE=elasticsearch make storage-integration-test + make index-cleaner-integration-test + docker kill $CID +} + +run_integration_test "5.6.16" +run_integration_test "6.8.2" +run_integration_test "7.3.0" echo "Executing token propatagion test" # Mock UI, needed only for build query service. make build-crossdock-ui-placeholder GOOS=linux make build-query - -SPAN_STORAGE_TYPE=elasticsearch ./cmd/query/query-linux --es.server-urls=http://127.0.0.1:9200 --es.tls=false --query.bearer-token-propagation=true & - -PID=$(echo $!) - make token-propagation-integration-test - -kill -9 ${PID} \ No newline at end of file