Skip to content

Commit 46a5fcb

Browse files
authored
Merge pull request skydive-project#2369 from lebauce/peering
Add cluster peering support
2 parents 645135a + 842d08a commit 46a5fcb

25 files changed

+1187
-301
lines changed

analyzer/server.go

+32-3
Original file line numberDiff line numberDiff line change
@@ -251,16 +251,43 @@ func NewServerFromConfig() (*Server, error) {
251251
return nil, err
252252
}
253253

254-
peers, err := config.GetAnalyzerServiceAddresses()
254+
replicationPeers, err := config.GetAnalyzerServiceAddresses()
255255
if err != nil {
256-
return nil, fmt.Errorf("Unable to get the analyzers list: %s", err)
256+
return nil, fmt.Errorf("unable to get the analyzers list: %s", err)
257257
}
258258

259259
wsClientOpts, err := config.NewWSClientOpts(ClusterAuthenticationOpts())
260260
if err != nil {
261261
return nil, err
262262
}
263263

264+
clusterPeers := make(map[string]*hub.PeeringOpts)
265+
peeredClusters := config.GetStringMapString("analyzer.peers")
266+
for clusterName := range peeredClusters {
267+
peerConfig := "analyzer.peers." + clusterName
268+
endpoints, err := config.GetServiceAddresses(peerConfig + ".endpoints")
269+
if err != nil {
270+
return nil, fmt.Errorf("unable to get peers endpoints for cluster '%s': %w", clusterName, err)
271+
}
272+
peeringOpts := &hub.PeeringOpts{
273+
Endpoints: endpoints,
274+
PublisherFilter: config.GetString(peerConfig + ".publish_filter"),
275+
SubscriptionFilter: config.GetString(peerConfig + ".subscribe_filter"),
276+
}
277+
278+
if wsClientOpts, err := config.NewWSClientOpts(
279+
&shttp.AuthenticationOpts{
280+
Username: config.GetString(peerConfig + ".username"),
281+
Password: config.GetString(peerConfig + ".password"),
282+
},
283+
); err == nil {
284+
peeringOpts.WebsocketClientOpts = *wsClientOpts
285+
}
286+
287+
clusterPeers[clusterName] = peeringOpts
288+
289+
}
290+
264291
probeBundle, err := NewTopologyProbeBundleFromConfig(g)
265292
if err != nil {
266293
return nil, err
@@ -281,11 +308,13 @@ func NewServerFromConfig() (*Server, error) {
281308
GraphValidator: topology.SchemaValidator,
282309
StatusReporter: s,
283310
TLSConfig: tlsConfig,
284-
Peers: peers,
311+
ReplicationPeers: replicationPeers,
312+
ClusterPeers: clusterPeers,
285313
EtcdClient: etcdClient,
286314
TopologyMarshallers: api.TopologyMarshallers,
287315
Assets: &statics.Assets,
288316
Version: version.Version,
317+
ClusterName: config.GetString("analyzer.cluster"),
289318
}
290319

291320
if config.GetBool("etcd.embedded") {

cmd/client/edgerule.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"os"
2323

2424
api "github.com/skydive-project/skydive/api/types"
25-
gclient "github.com/skydive-project/skydive/graffiti/cmd/client"
2625
"github.com/skydive-project/skydive/graffiti/graph"
2726
"github.com/skydive-project/skydive/graffiti/logging"
2827
"github.com/skydive-project/skydive/validator"
@@ -57,7 +56,7 @@ var EdgeRuleCreate = &cobra.Command{
5756
}
5857
},
5958
Run: func(cmd *cobra.Command, args []string) {
60-
m, err := gclient.DefToMetadata(metadata, graph.Metadata{})
59+
m, err := graph.DefToMetadata(metadata, graph.Metadata{})
6160
if err != nil {
6261
exitOnError(err)
6362
}

cmd/client/noderule.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"os"
2424

2525
api "github.com/skydive-project/skydive/api/types"
26-
gclient "github.com/skydive-project/skydive/graffiti/cmd/client"
2726
"github.com/skydive-project/skydive/graffiti/graph"
2827
"github.com/skydive-project/skydive/graffiti/logging"
2928
"github.com/skydive-project/skydive/validator"
@@ -57,7 +56,7 @@ var NodeRuleCreate = &cobra.Command{
5756
SilenceUsage: false,
5857

5958
Run: func(cmd *cobra.Command, args []string) {
60-
m, err := gclient.DefToMetadata(metadata, graph.Metadata{})
59+
m, err := graph.DefToMetadata(metadata, graph.Metadata{})
6160
if err != nil {
6261
exitOnError(err)
6362
}

cmd/seed/seed.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ import (
2626

2727
"github.com/skydive-project/skydive/agent"
2828
"github.com/skydive-project/skydive/config"
29+
"github.com/skydive-project/skydive/graffiti/clients"
2930
"github.com/skydive-project/skydive/graffiti/graph"
3031
"github.com/skydive-project/skydive/graffiti/http"
3132
"github.com/skydive-project/skydive/graffiti/logging"
32-
"github.com/skydive-project/skydive/graffiti/seed"
3333
"github.com/skydive-project/skydive/graffiti/websocket"
3434
"github.com/skydive-project/skydive/probe"
3535
tp "github.com/skydive-project/skydive/topology/probes"
@@ -104,7 +104,7 @@ var SeedCmd = &cobra.Command{
104104
os.Exit(1)
105105
}
106106

107-
origin := graph.Origin(hostID, seed.Service)
107+
origin := graph.Origin(hostID, clients.SeedService)
108108
g := graph.NewGraph(hostID, memory, origin)
109109

110110
probeBundle = probe.NewBundle()
@@ -121,13 +121,12 @@ var SeedCmd = &cobra.Command{
121121
TLSConfig: tlsConfig,
122122
}
123123

124-
seed, err := seed.NewSeed(g, seed.Service, agentAddr, subscriberFilter, *wsOpts, logging.GetLogger())
124+
seed, err := clients.NewSeed(g, clients.SeedService, agentAddr, subscriberFilter, "", *wsOpts, logging.GetLogger())
125125
if err != nil {
126126
logging.GetLogger().Errorf("Failed to start seed: %s", err)
127127
os.Exit(1)
128128
}
129129

130-
seed.AddEventHandler(&seedHandler{g: g, probes: args})
131130
seed.Start()
132131

133132
logging.GetLogger().Notice("Skydive seed started")

config/config.go

+26-6
Original file line numberDiff line numberDiff line change
@@ -365,15 +365,15 @@ func (c *SkydiveConfig) SetDefault(key string, value interface{}) {
365365
c.Viper.SetDefault(key, value)
366366
}
367367

368-
// GetAnalyzerServiceAddresses returns a list of connectable Analyzers
369-
func GetAnalyzerServiceAddresses() ([]service.Address, error) {
370-
return cfg.GetAnalyzerServiceAddresses()
368+
// GetServiceAddresses returns a list of services
369+
func GetServiceAddresses(section string) ([]service.Address, error) {
370+
return cfg.GetServiceAddresses(section)
371371
}
372372

373-
// GetAnalyzerServiceAddresses returns a list of connectable Analyzers
374-
func (c *SkydiveConfig) GetAnalyzerServiceAddresses() ([]service.Address, error) {
373+
// GetServiceAddresses returns a list of services
374+
func (c *SkydiveConfig) GetServiceAddresses(section string) ([]service.Address, error) {
375375
var addresses []service.Address
376-
for _, a := range c.GetStringSlice("analyzers") {
376+
for _, a := range c.GetStringSlice(section) {
377377
sa, err := service.AddressFromString(a)
378378
if err != nil {
379379
return nil, err
@@ -390,6 +390,16 @@ func (c *SkydiveConfig) GetAnalyzerServiceAddresses() ([]service.Address, error)
390390
return addresses, nil
391391
}
392392

393+
// GetAnalyzerServiceAddresses returns a list of connectable Analyzers
394+
func GetAnalyzerServiceAddresses() ([]service.Address, error) {
395+
return cfg.GetAnalyzerServiceAddresses()
396+
}
397+
398+
// GetAnalyzerServiceAddresses returns a list of analyzers to connect to
399+
func (c *SkydiveConfig) GetAnalyzerServiceAddresses() ([]service.Address, error) {
400+
return c.GetServiceAddresses("analyzers")
401+
}
402+
393403
// GetOneAnalyzerServiceAddress returns a random connectable Analyzer
394404
func GetOneAnalyzerServiceAddress() (service.Address, error) {
395405
return cfg.GetOneAnalyzerServiceAddress()
@@ -557,6 +567,16 @@ func (c *SkydiveConfig) GetStringMapString(key string) map[string]string {
557567
return c.Viper.GetStringMapString(realKey(key))
558568
}
559569

570+
// GetStringMap returns a map of strings from the configuration
571+
func GetStringMap(key string) map[string]interface{} {
572+
return cfg.GetStringMap(key)
573+
}
574+
575+
// GetStringMapString returns a map of strings from the configuration
576+
func (c *SkydiveConfig) GetStringMap(key string) map[string]interface{} {
577+
return c.Viper.GetStringMap(realKey(key))
578+
}
579+
560580
// BindPFlag binds a command line flag to a configuration value
561581
func BindPFlag(key string, flag *pflag.Flag) error {
562582
return cfg.BindPFlag(key, flag)

config/websocket.go

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package config
1919

2020
import (
21+
"net/http"
2122
"net/url"
2223
"time"
2324

@@ -39,6 +40,7 @@ func NewWSClientOpts(authOpts *shttp.AuthenticationOpts) (*websocket.ClientOpts,
3940
WriteCompression: GetBool("http.ws.enable_write_compression"),
4041
TLSConfig: tlsConfig,
4142
AuthOpts: authOpts,
43+
Headers: http.Header{},
4244
}, nil
4345
}
4446

graffiti/forwarder/forwarder.go renamed to graffiti/clients/forwarder.go

+43-2
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
*
1616
*/
1717

18-
package forwarder
18+
package clients
1919

2020
import (
21+
"sync/atomic"
22+
23+
"github.com/skydive-project/skydive/graffiti/filters"
2124
"github.com/skydive-project/skydive/graffiti/graph"
2225
"github.com/skydive-project/skydive/graffiti/logging"
2326
"github.com/skydive-project/skydive/graffiti/messages"
@@ -31,6 +34,8 @@ type Forwarder struct {
3134
masterElection *ws.MasterElection
3235
graph *graph.Graph
3336
logger logging.Logger
37+
nodeFilter *filters.Filter
38+
inhibit atomic.Value
3439
}
3540

3641
func (t *Forwarder) triggerResync() {
@@ -68,6 +73,10 @@ func (t *Forwarder) OnNewMaster(c ws.Speaker) {
6873

6974
// OnNodeUpdated graph node updated event. Implements the EventListener interface.
7075
func (t *Forwarder) OnNodeUpdated(n *graph.Node, ops []graph.PartiallyUpdatedOp) {
76+
if t.inhibit.Load() == true || (t.nodeFilter != nil && !t.nodeFilter.Eval(n)) {
77+
return
78+
}
79+
7180
t.masterElection.SendMessageToMaster(
7281
messages.NewStructMessage(
7382
messages.NodePartiallyUpdatedMsgType,
@@ -83,20 +92,34 @@ func (t *Forwarder) OnNodeUpdated(n *graph.Node, ops []graph.PartiallyUpdatedOp)
8392

8493
// OnNodeAdded graph node added event. Implements the EventListener interface.
8594
func (t *Forwarder) OnNodeAdded(n *graph.Node) {
95+
if t.inhibit.Load() == true || (t.nodeFilter != nil && !t.nodeFilter.Eval(n)) {
96+
return
97+
}
98+
8699
t.masterElection.SendMessageToMaster(
87100
messages.NewStructMessage(messages.NodeAddedMsgType, n),
88101
)
89102
}
90103

91104
// OnNodeDeleted graph node deleted event. Implements the EventListener interface.
92105
func (t *Forwarder) OnNodeDeleted(n *graph.Node) {
106+
if t.inhibit.Load() == true || (t.nodeFilter != nil && !t.nodeFilter.Eval(n)) {
107+
return
108+
}
109+
93110
t.masterElection.SendMessageToMaster(
94111
messages.NewStructMessage(messages.NodeDeletedMsgType, n),
95112
)
96113
}
97114

98115
// OnEdgeUpdated graph edge updated event. Implements the EventListener interface.
99116
func (t *Forwarder) OnEdgeUpdated(e *graph.Edge, ops []graph.PartiallyUpdatedOp) {
117+
if t.inhibit.Load() == true || (t.nodeFilter != nil &&
118+
!t.nodeFilter.Eval(t.graph.GetEdge(e.Parent)) &&
119+
!t.nodeFilter.Eval(t.graph.GetEdge(e.Child))) {
120+
return
121+
}
122+
100123
t.masterElection.SendMessageToMaster(
101124
messages.NewStructMessage(
102125
messages.EdgePartiallyUpdatedMsgType,
@@ -112,26 +135,43 @@ func (t *Forwarder) OnEdgeUpdated(e *graph.Edge, ops []graph.PartiallyUpdatedOp)
112135

113136
// OnEdgeAdded graph edge added event. Implements the EventListener interface.
114137
func (t *Forwarder) OnEdgeAdded(e *graph.Edge) {
138+
if t.inhibit.Load() == true || (t.nodeFilter != nil &&
139+
!t.nodeFilter.Eval(t.graph.GetEdge(e.Parent)) &&
140+
!t.nodeFilter.Eval(t.graph.GetEdge(e.Child))) {
141+
return
142+
}
143+
115144
t.masterElection.SendMessageToMaster(
116145
messages.NewStructMessage(messages.EdgeAddedMsgType, e),
117146
)
118147
}
119148

120149
// OnEdgeDeleted graph edge deleted event. Implements the EventListener interface.
121150
func (t *Forwarder) OnEdgeDeleted(e *graph.Edge) {
151+
if t.inhibit.Load() == true || (t.nodeFilter != nil &&
152+
!t.nodeFilter.Eval(t.graph.GetEdge(e.Parent)) &&
153+
!t.nodeFilter.Eval(t.graph.GetEdge(e.Child))) {
154+
return
155+
}
156+
122157
t.masterElection.SendMessageToMaster(
123158
messages.NewStructMessage(messages.EdgeDeletedMsgType, e),
124159
)
125160
}
126161

162+
// Inhib node and edge forwarding
163+
func (t *Forwarder) Inhib(c ws.Speaker) {
164+
t.inhibit.Store(c != nil)
165+
}
166+
127167
// GetMaster returns the current analyzer the agent is sending its events to
128168
func (t *Forwarder) GetMaster() ws.Speaker {
129169
return t.masterElection.GetMaster()
130170
}
131171

132172
// NewForwarder returns a new Graph forwarder which forwards event of the given graph
133173
// to the given WebSocket JSON speakers.
134-
func NewForwarder(g *graph.Graph, pool ws.StructSpeakerPool, logger logging.Logger) *Forwarder {
174+
func NewForwarder(g *graph.Graph, pool ws.SpeakerPool, nodeFilter *filters.Filter, logger logging.Logger) *Forwarder {
135175
if logger == nil {
136176
logger = logging.GetLogger()
137177
}
@@ -142,6 +182,7 @@ func NewForwarder(g *graph.Graph, pool ws.StructSpeakerPool, logger logging.Logg
142182
masterElection: masterElection,
143183
graph: g,
144184
logger: logger,
185+
nodeFilter: nodeFilter,
145186
}
146187

147188
masterElection.AddEventHandler(t)

0 commit comments

Comments
 (0)