forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoutput_elasticsearch.go
152 lines (125 loc) · 4.1 KB
/
output_elasticsearch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package main
import (
"encoding/json"
"fmt"
"strings"
"github.com/packetbeat/elastigo/api"
"github.com/packetbeat/elastigo/core"
)
// ElasticsearchOutputType is the output that publishes events and agent
// topology information to Elasticsearch.
type ElasticsearchOutputType struct {
// OutputInterface is embedded; it is declared outside this file.
OutputInterface
// Index is the prefix of the per-day event index name
// ("<Index>-YYYY.MM.DD"). Init takes it from the configuration and falls
// back to "packetbeat" when none is configured.
Index string
// TopologyExpire is the expiration time, in milliseconds, that PublishIPs
// passes as the ttl of the topology document it writes.
TopologyExpire int
// TopologyMap maps an IP address to the name published for it. It is
// rebuilt by UpdateLocalTopologyMap and read by GetNameByIP.
TopologyMap map[string]string
}
// PublishedTopology is the document written to, and read back from, the
// "packetbeat-topology" index (type "server-ip").
type PublishedTopology struct {
// Name is the name the addresses are published under; PublishIPs also uses
// it as the document id.
Name string
// IPs holds the published addresses joined with "," into a single string.
IPs string
}
// ElasticsearchOutput is the package-level Elasticsearch output value. It is
// declared with its zero value; Init fills in its settings.
var ElasticsearchOutput ElasticsearchOutputType
// Init configures the Elasticsearch output from config.
//
// It points the elastigo api package at the configured server, selects the
// index prefix (default "packetbeat") and the topology expiration time
// (default 15000 ms, or the agent's Topology_expire setting converted from
// seconds), and then enables the _ttl mapping through EnableTTL.
//
// It returns the error reported by EnableTTL, or nil on success.
func (out *ElasticsearchOutputType) Init(config tomlMothership) error {
	// Connection settings are package-level variables of elastigo's api package.
	api.Domain = config.Host
	api.Port = fmt.Sprintf("%d", config.Port)
	api.BasePath = config.Path
	api.Username, api.Password = config.Username, config.Password
	if config.Protocol != "" {
		// Only override the protocol when one is configured explicitly.
		api.Protocol = config.Protocol
	}

	// Index prefix: start from the default and override it when configured.
	out.Index = "packetbeat"
	if config.Index != "" {
		out.Index = config.Index
	}

	// The expiration is configured in seconds but stored in milliseconds.
	if expire := _Config.Agent.Topology_expire; expire != 0 {
		out.TopologyExpire = expire * 1000
	} else {
		out.TopologyExpire = 15000
	}

	if err := out.EnableTTL(); err != nil {
		ERR("Fail to set _ttl mapping: %s", err)
		return err
	}

	INFO("[ElasticsearchOutput] Using Elasticsearch %s://%s:%s%s", api.Protocol, api.Domain, api.Port, api.BasePath)
	INFO("[ElasticsearchOutput] Using index pattern [%s-]YYYY.MM.DD", out.Index)
	INFO("[ElasticsearchOutput] Topology expires after %ds", out.TopologyExpire/1000)

	return nil
}
// EnableTTL enables the _ttl field, with a default of "15000", in the mapping
// of the "server-ip" type of the "packetbeat-topology" index.
//
// It returns the first error reported by the Elasticsearch client, or nil
// when both requests succeed.
func (out *ElasticsearchOutputType) EnableTTL() error {
	// Issue an empty index request first so the index exists before its
	// mapping is changed.
	if _, err := core.Index("packetbeat-topology", "", "", nil, nil); err != nil {
		return err
	}

	ttl := map[string]string{"enabled": "true", "default": "15000"}
	mapping := map[string]interface{}{
		"server-ip": map[string]interface{}{
			"_ttl": ttl,
		},
	}

	_, err := core.Index("packetbeat-topology", "server-ip", "_mapping", nil, mapping)
	return err
}
// GetNameByIP returns the name recorded for ip in the local topology map, or
// the empty string when ip has no entry.
func (out *ElasticsearchOutputType) GetNameByIP(ip string) string {
	// Indexing a map with a missing key yields the zero value, which for a
	// string is already "".
	return out.TopologyMap[ip]
}
// PublishIPs stores localAddrs under name in the "packetbeat-topology" index
// (type "server-ip"), using name as the document id and out.TopologyExpire as
// the document ttl, and then refreshes the local topology map.
//
// It returns the error from the index request; in that case the local map is
// not refreshed.
func (out *ElasticsearchOutputType) PublishIPs(name string, localAddrs []string) error {
	DEBUG("output_elasticsearch", "Publish IPs %s with expiration time %d", localAddrs, out.TopologyExpire)

	// The addresses are stored as one comma-separated string.
	doc := PublishedTopology{
		Name: name,
		IPs:  strings.Join(localAddrs, ","),
	}

	if _, err := core.IndexWithParameters(
		"packetbeat-topology", // index
		"server-ip",           // type
		name,                  // id
		"",                    // parent id
		0,                     // version
		"",                    // op_type
		"",                    // routing
		"",                    // timestamp
		out.TopologyExpire,    // ttl
		"",                    // percolate
		"",                    // timeout
		false,                 // refresh
		nil,                   // args
		doc,                   // data
	); err != nil {
		ERR("Fail to publish IP addresses: %s", err)
		return err
	}

	out.UpdateLocalTopologyMap()
	return nil
}
// UpdateLocalTopologyMap rebuilds out.TopologyMap from the "server-ip"
// documents found in the "packetbeat-topology" index.
//
// Each document is decoded as a PublishedTopology and every address in its
// comma-separated IPs field is mapped to the document's Name.
//
// Failures are logged and never returned:
//   - if the search itself fails, the previous map is kept as is, so that a
//     failed query does not erase the topology learned so far;
//   - a hit with no source, or one that cannot be decoded, is skipped
//     instead of contributing an empty "" -> "" entry;
//   - empty address fragments (for example from an empty IPs field) are
//     ignored.
func (out *ElasticsearchOutputType) UpdateLocalTopologyMap() {
	// Get all agents' IPs from Elasticsearch.
	res, err := core.SearchUri("packetbeat-topology", "server-ip", nil)
	if err != nil {
		ERR("Getting topology map fails with: %s", err)
		return
	}

	topology := make(map[string]string)
	for _, server := range res.Hits.Hits {
		if server.Source == nil {
			// Nothing to decode; dereferencing a nil source would panic.
			continue
		}

		var pub PublishedTopology
		if err := json.Unmarshal([]byte(*server.Source), &pub); err != nil {
			ERR("json.Unmarshal fails with: %s", err)
			continue
		}

		for _, addr := range strings.Split(pub.IPs, ",") {
			// strings.Split yields [""] for an empty input; do not record it.
			if addr == "" {
				continue
			}
			topology[addr] = pub.Name
		}
	}

	// Swap in the freshly built map.
	out.TopologyMap = topology
	DEBUG("output_elasticsearch", "Topology map %s", out.TopologyMap)
}
// PublishEvent indexes event into the per-day index "<Index>-YYYY.MM.DD",
// where the date is taken from event.Timestamp, using event.Type as the
// document type and letting Elasticsearch assign the id.
//
// It returns the error from the index request, if any.
func (out *ElasticsearchOutputType) PublishEvent(event *Event) error {
	ts := event.Timestamp
	dailyIndex := fmt.Sprintf("%s-%d.%02d.%02d", out.Index, ts.Year(), ts.Month(), ts.Day())

	_, err := core.Index(dailyIndex, event.Type, "", nil, event)
	// Logged whether or not the request succeeded; the caller gets the error.
	DEBUG("output_elasticsearch", "Publish event")
	return err
}