-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathelasticsearch_connector.go
90 lines (76 loc) · 2.55 KB
/
elasticsearch_connector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package connect
import (
"crypto/tls"
"net/http"
"time"
"github.com/olivere/elastic/v7"
log "github.com/sirupsen/logrus"
)
// ElasticsearchConnectionOptions holds the settings NewElasticsearchClient
// applies to the HTTP transport and to the elastic client it constructs.
type ElasticsearchConnectionOptions struct {
// TLSHandshakeTimeout is copied to http.Transport.TLSHandshakeTimeout.
TLSHandshakeTimeout time.Duration
// TLSInsecureSkipVerify is declared here but NewElasticsearchClient does not
// read it; that function always sets tls.Config.InsecureSkipVerify to true.
TLSInsecureSkipVerify bool
// MaxIdleConnections is copied to http.Transport.MaxIdleConnsPerHost.
MaxIdleConnections int
// MaxConnsPerHost is copied to http.Transport.MaxConnsPerHost.
MaxConnsPerHost int
// SetSniff is forwarded to elastic.SetSniff.
SetSniff bool
// SetHealthcheck is forwarded to elastic.SetHealthcheck.
SetHealthcheck bool
// UseOpenTelemetry, when true, makes NewElasticsearchClient wrap the HTTP
// transport with NewTransport(WithRoundTripper(...)).
UseOpenTelemetry bool
}
// defaultElasticsearchConnectionOptions is the option set that
// applyElasticsearchConnectionOptions returns when the caller passes nil.
// It is a shared pointer, so the same instance is handed to every such caller.
var defaultElasticsearchConnectionOptions = &ElasticsearchConnectionOptions{
TLSHandshakeTimeout: 5 * time.Second,
// NOTE(review): NewElasticsearchClient never reads this field and always
// skips certificate verification — confirm whether this default is meant to apply.
TLSInsecureSkipVerify: false,
MaxIdleConnections: 2,
MaxConnsPerHost: 10,
SetSniff: false,
SetHealthcheck: false,
UseOpenTelemetry: false,
}
// Stateless logger adapters. NewElasticsearchClient hands one of each to the
// elastic client, and their Printf methods forward messages to logrus.
type (
	// ESInfoLogger is the adapter registered through elastic.SetInfoLog.
	ESInfoLogger struct{}

	// ESErrorLogger is the adapter registered through elastic.SetErrorLog.
	ESErrorLogger struct{}

	// ESTraceLogger is the adapter registered through elastic.SetTraceLog.
	ESTraceLogger struct{}
)
// NewElasticsearchClient builds an *elastic.Client that talks to url over HTTPS.
//
// Parameters:
//   - url: passed to elastic.SetURL.
//   - httpClient: the HTTP client the elastic client will use. Its Transport
//     field is overwritten with a transport built from opt. A nil httpClient is
//     replaced with a fresh http.Client instead of causing a nil dereference.
//   - opt: connection options; nil selects the package defaults via
//     applyElasticsearchConnectionOptions.
//
// It returns the client and error produced by elastic.NewClient.
func NewElasticsearchClient(url string, httpClient *http.Client, opt *ElasticsearchConnectionOptions) (*elastic.Client, error) {
	options := applyElasticsearchConnectionOptions(opt)

	// Assigning to httpClient.Transport below would panic on a nil client, so
	// fall back to an empty one.
	if httpClient == nil {
		httpClient = &http.Client{}
	}

	httpTransport := &http.Transport{
		TLSHandshakeTimeout: options.TLSHandshakeTimeout,
		MaxIdleConnsPerHost: options.MaxIdleConnections,
		MaxConnsPerHost:     options.MaxConnsPerHost,
		// Set true on purpose.
		// NOTE(review): options.TLSInsecureSkipVerify is not consulted, so
		// certificate verification is always skipped — confirm this is still intended.
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, //nolint:gosec
	}

	httpClient.Transport = httpTransport
	if options.UseOpenTelemetry {
		// Wrap the base transport so requests pass through the tracing round tripper.
		httpClient.Transport = NewTransport(WithRoundTripper(httpTransport))
	}

	return elastic.NewClient(
		elastic.SetURL(url),
		elastic.SetScheme("https"),
		elastic.SetSniff(options.SetSniff),
		elastic.SetHealthcheck(options.SetHealthcheck),
		elastic.SetErrorLog(&ESErrorLogger{}),
		elastic.SetInfoLog(&ESInfoLogger{}),
		elastic.SetTraceLog(&ESTraceLogger{}),
		elastic.SetHttpClient(httpClient),
	)
}
// Printf writes the formatted message to logrus at debug level, tagged with
// the field type=elasticsearch-log.
func (*ESTraceLogger) Printf(format string, values ...interface{}) {
	entry := log.WithFields(log.Fields{"type": "elasticsearch-log"})
	entry.Debugf(format, values...)
}
// Printf writes the formatted message to logrus at info level, tagged with
// the field type=elasticsearch-log.
func (*ESInfoLogger) Printf(format string, values ...interface{}) {
	entry := log.WithFields(log.Fields{"type": "elasticsearch-log"})
	entry.Infof(format, values...)
}
// Printf writes the formatted message to logrus at error level, tagged with
// the field type=elasticsearch-log.
func (*ESErrorLogger) Printf(format string, values ...interface{}) {
	entry := log.WithFields(log.Fields{"type": "elasticsearch-log"})
	entry.Errorf(format, values...)
}
// applyElasticsearchConnectionOptions picks the options to use: the caller's
// opt when it is non-nil, otherwise defaultElasticsearchConnectionOptions.
// The chosen pointer is returned as-is; fields are not merged and no copy is made.
func applyElasticsearchConnectionOptions(opt *ElasticsearchConnectionOptions) *ElasticsearchConnectionOptions {
	if opt == nil {
		return defaultElasticsearchConnectionOptions
	}
	return opt
}