Skip to content

Commit

Permalink
Add exclusion rules to filter unwanted events (#259)
Browse files Browse the repository at this point in the history
* Add exclusion rules to filter unwanted events

* Add json logic error handling

* Fix format/imports with goimport

* Rework tests to cater for out of order events

Looks like the ordering for events received from the channel is
non-deterministic. I had changed the tests in this PR to expect events
in an exact order, which passed reliably on my local machine but failed
when run by the GitHub Actions workflow. In this commit I've reverted
the existing big picture test (the one with no exclusion rules) to not
check the payload, and modified the new test (including an exclusion
rule) to:

a) wait 1 second for all events to be received on the channel
b) verify that the excluded event is not received regardless of order
c) verify that the expected number of events is received

* Fix format
  • Loading branch information
acbox committed May 15, 2023
1 parent fc7dbd3 commit f0bd290
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 5 deletions.
58 changes: 58 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,64 @@ Open your browser to http://localhost:9090.

An example of a useful query is [rate(kubewatch_event_count[5m])](<http://localhost:9090/graph?g0.range_input=1h&g0.expr=rate(kubewatch_event_count%5B1m%5D)&g0.tab=0>)

## Event filtering

Events can be excluded from Sloop by adding `exclusionRules` to the config file:

```
{
"defaultNamespace": "default",
"defaultKind": "Pod",
"defaultLookback": "1h",
[...]
"exclusionRules": {
"_all": [
{"==": [ { "var": "metadata.namespace" }, "kube-system" ]}
],
"Pod": [
{"==": [ { "var": "metadata.name" }, "sloop-0" ]}
],
"Job": [
{"in": [ { "var": "metadata.name" }, [ "cron1", "cron3" ] ]}
]
}
}`
```

Adding rules can help to reduce resources consumed by Sloop and remove unwanted noise from the UI for events that are of no interest.

### Limiting rules to specific kinds

* Rules under the special key `_all` are evaluated against events for objects of any kind
* Rules under any other key are evaluated only against objects whose kind matches the key, e.g. `Pod` only applies to pods, `Job` only applies to jobs etc.

### Rule format and supported operations

Rules should follow the [JsonLogic](https://jsonlogic.com) format and are evaluated against the json representation of the Kubernetes API object related to the event (see below).

Available operators, such as `==` and `in` shown above, are documented [here](https://jsonlogic.com/operations.html).

### Data available to rule logic

Kubernetes API conventions for [objects](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#objects) require the following keys to exist in the json data for all resources, all of which can be referenced in rules:

* `metadata`
* `spec`
* `status`

Some commonly useful fields under the `metadata` [object](https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#ObjectMeta) are:

* `name`
* `namespace`
* `labels`

#### Type specific data

Some resources contain additional type-specific fields, for example `PersistentVolumeClaimSpec` objects have fields named `selector` and `storageClassName`.

Type specific fields for each object and their corresponding keys in the object json representation are documented in the [core API](https://pkg.go.dev/k8s.io/[email protected]/core/v1), e.g. for `PersistentVolumeClaimSpec` objects the documentation is [here](https://pkg.go.dev/k8s.io/[email protected]/core/v1#PersistentVolumeClaimSpec).

## Contributing

Refer to [CONTRIBUTING.md](CONTRIBUTING.md)<br>
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
github.com/Jeffail/gabs/v2 v2.2.0
github.com/dgraph-io/badger/v2 v2.0.3
github.com/diegoholiveira/jsonlogic/v3 v3.2.7
github.com/ghodss/yaml v1.0.0
github.com/golang/glog v1.0.0
github.com/golang/protobuf v1.5.2
Expand Down Expand Up @@ -46,6 +47,8 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect
github.com/mitchellh/copystructure v1.0.0 // indirect
github.com/mitchellh/reflectwalk v1.0.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70d
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/diegoholiveira/jsonlogic/v3 v3.2.7 h1:awX07pFPnlntZzRNBcO4a2Ivxa77NMt+narq/6xcS0E=
github.com/diegoholiveira/jsonlogic/v3 v3.2.7/go.mod h1:9oE8z9G+0OMxOoLHF3fhek3KuqD5CBqM0B6XFL08MSg=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
Expand Down Expand Up @@ -220,8 +222,12 @@ github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJ
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.2 h1:hAHbPm5IJGijwng3PWk09JkG9WeqChjprR5s9bBZ+OM=
github.com/matttproud/golang_protobuf_extensions v1.0.2/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/mitchellh/copystructure v1.0.0 h1:Laisrj+bAB6b/yJwB5Bt3ITZhGJdqmxquMKeZ+mmkFQ=
github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/reflectwalk v1.0.0 h1:9D+8oIskB4VJBN5SFlmc27fSlIBZaov1Wpk/IfikLNY=
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand Down
24 changes: 24 additions & 0 deletions pkg/sloop/common/utilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,27 @@ func Contains(stringList []string, elem string) bool {
func GetFilePath(filePath string, fileName string) string {
return path.Join(filePath, fileName)
}

func Max(x int, y int) int {
if x < y {
return y
}
return x
}

func Truncate(text string, width int, delimiter ...string) (string, error) {
d := "..."
if len(delimiter) > 0 {
d = delimiter[0]
}
d_len := len(d)
if width < 0 {
return "", fmt.Errorf("invalid width")
}
if len(text) <= width {
return text, nil
}
r := []rune(text)
truncated := r[:(Max(width, d_len) - d_len)]
return string(truncated) + d, nil
}
28 changes: 28 additions & 0 deletions pkg/sloop/common/utilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,31 @@ func Test_GetFilePath(t *testing.T) {
actualOutput := GetFilePath(filePrefix, fileName)
assert.Equal(t, expectedOutput, actualOutput)
}

func Test_Truncate_StringLongerThanWidth(t *testing.T) {
stringLong := "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec eget odio quis felis laoreet dictum."
expectedOutput := "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec eget odio quis..."
actualOutput, _ := Truncate(stringLong, 80)
assert.Equal(t, expectedOutput, actualOutput)
}

func Test_Truncate_StringShorterThanWidth(t *testing.T) {
stringMedium := "Lorem ipsum dolor"
expectedOutput := "Lorem ipsum dolor"
actualOutput, _ := Truncate(stringMedium, 80)
assert.Equal(t, expectedOutput, actualOutput)
}

func Test_Truncate_WidthShorterThanDelimiter(t *testing.T) {
stringShort := "Lorem"
expectedOutput := "..."
actualOutput, _ := Truncate(stringShort, 1)
assert.Equal(t, expectedOutput, actualOutput)
}

func Test_Truncate_StringEmpty(t *testing.T) {
stringEmpty := ""
expectedOutput := ""
actualOutput, _ := Truncate(stringEmpty, 1)
assert.Equal(t, expectedOutput, actualOutput)
}
54 changes: 53 additions & 1 deletion pkg/sloop/ingress/kubewatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@
package ingress

import (
"bytes"
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/diegoholiveira/jsonlogic/v3"
"github.com/golang/glog"
"github.com/golang/protobuf/ptypes"
"github.com/pkg/errors"
Expand Down Expand Up @@ -67,6 +71,7 @@ type kubeWatcherImpl struct {
stopped bool
refreshCrd *time.Ticker
currentContext string
exclusionRules map[string][]any
}

var (
Expand All @@ -79,11 +84,12 @@ var (
)

// Todo: Add additional parameters for filtering
func NewKubeWatcherSource(kubeClient kubernetes.Interface, outChan chan typed.KubeWatchResult, resync time.Duration, includeCrds bool, crdRefreshInterval time.Duration, masterURL string, kubeContext string, enableGranularMetrics bool) (KubeWatcher, error) {
func NewKubeWatcherSource(kubeClient kubernetes.Interface, outChan chan typed.KubeWatchResult, resync time.Duration, includeCrds bool, crdRefreshInterval time.Duration, masterURL string, kubeContext string, enableGranularMetrics bool, exclusionRules map[string][]any) (KubeWatcher, error) {
kw := &kubeWatcherImpl{resync: resync, protection: &sync.Mutex{}}
kw.stopChan = make(chan struct{})
kw.crdInformers = make(map[crdGroupVersionResourceKind]*crdInformerInfo)
kw.outchan = outChan
kw.exclusionRules = exclusionRules

kw.startWellKnownInformers(kubeClient, enableGranularMetrics)
if includeCrds {
Expand Down Expand Up @@ -304,6 +310,13 @@ func (i *kubeWatcherImpl) processUpdate(kind string, obj interface{}, watchResul
}
glog.V(99).Infof("processUpdate: obj json: %v", resourceJson)

eventExcluded := i.eventExcluded(kind, resourceJson)
if eventExcluded {
objName := reflect.ValueOf(obj).Elem().FieldByName("ObjectMeta").FieldByName("Name")
glog.V(2).Infof("Event for object excluded: %s/%s", kind, objName)
return
}

kubeMetadata, err := kubeextractor.ExtractMetadata(resourceJson)
if err != nil || kubeMetadata.Namespace == "" {
// We are only grabbing namespace here for a prometheus metric, so if metadata extract fails we just log and continue
Expand Down Expand Up @@ -360,6 +373,45 @@ func (i *kubeWatcherImpl) refreshCrdInformers(masterURL string, kubeContext stri
}
}

func (i *kubeWatcherImpl) getExclusionRules(kind string) []any {
kindRules, _ := i.exclusionRules[kind]
globalRules, _ := i.exclusionRules["_all"]
combinedRules := append(
kindRules,
globalRules...,
)
glog.V(common.GlogVerbose).Infof("Fetched rules: %s", combinedRules)
return combinedRules
}

func (i *kubeWatcherImpl) eventExcluded(kind string, resourceJson string) bool {
filters := i.getExclusionRules(kind)
for _, logic := range filters {
logicJson, err := json.Marshal(logic)
if err != nil {
glog.Errorf(`Failed to parse event filtering rule "%s": %s`, string(logicJson), err)
return false
}
var result bytes.Buffer
err = jsonlogic.Apply(
strings.NewReader(string(logicJson)),
strings.NewReader(resourceJson),
&result,
)
if err != nil {
glog.Errorf(`Failed to apply event filtering rule "%s": %s`, string(logicJson), err)
return false
}
resultBool := strings.Contains(result.String(), "true")
if resultBool {
truncated, _ := common.Truncate(resourceJson, 40)
glog.V(2).Infof(`Event matched logic: logic="%s" resource="%s"`, string(logicJson), truncated)
return true
}
}
return false
}

func (i *kubeWatcherImpl) Stop() {
glog.Infof("Stopping kubeWatcher")

Expand Down
78 changes: 77 additions & 1 deletion pkg/sloop/ingress/kubewatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func Test_bigPicture(t *testing.T) {
masterURL := "url"
kubeContext := "" // empty string makes things work
enableGranularMetrics := true
kw, err := NewKubeWatcherSource(kubeClient, outChan, resync, includeCrds, time.Duration(10*time.Second), masterURL, kubeContext, enableGranularMetrics)
exclusionRules := map[string][]any{}
kw, err := NewKubeWatcherSource(kubeClient, outChan, resync, includeCrds, time.Duration(10*time.Second), masterURL, kubeContext, enableGranularMetrics, exclusionRules)
assert.NoError(t, err)

// create service and await corresponding event
Expand All @@ -100,6 +101,81 @@ func Test_bigPicture(t *testing.T) {
kw.Stop()
}

// As above but specify non-default exclusion rules to exclude events for service named s2
func Test_bigPictureWithExclusionRules(t *testing.T) {
newCrdClient = newTestCrdClient(reactionListOfOne) // force startCustomInformers() to use a fake clientset

kubeClient := kubernetesFake.NewSimpleClientset()
outChan := make(chan typed.KubeWatchResult, 5)
resync := 30 * time.Minute
includeCrds := true
masterURL := "url"
kubeContext := "" // empty string makes things work
enableGranularMetrics := true
exclusionRules := map[string][]any{
"_all": []any{
map[string]any{
"==": []any{
map[string]any{
"var": "metadata.name",
},
"s2",
},
},
},
}

kw, err := NewKubeWatcherSource(kubeClient, outChan, resync, includeCrds, time.Duration(10*time.Second), masterURL, kubeContext, enableGranularMetrics, exclusionRules)
assert.NoError(t, err)

// create namespace
ns := "ns"
_, err = kubeClient.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}}, metav1.CreateOptions{})
if err != nil {
t.FailNow()
}

// create first service
svc := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "s1"}}
_, err = kubeClient.CoreV1().Services(ns).Create(context.TODO(), svc, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating service: %v\n", err)
}

// create second service, corresponding event should be excluded by exclusion rule
svc = &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "s2"}}
_, err = kubeClient.CoreV1().Services(ns).Create(context.TODO(), svc, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating service: %v\n", err)
}

// create third service
svc = &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "s3"}}
_, err = kubeClient.CoreV1().Services(ns).Create(context.TODO(), svc, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating service: %v\n", err)
}

eventCount := 0
loop:
for {
select {
case <-time.After(1 * time.Second):
break loop
case result, ok := <-outChan:
if ok {
eventCount++
assert.NotContains(t, result.Payload, `"name":"s2"`)
} else {
t.Fatalf("Channel closed unexpectedly: %v\n", ok)
}
}
}
assert.Equal(t, 3, eventCount) // assert no event for service named s2

kw.Stop()
}

func Test_getCrdList(t *testing.T) {
crdClient, _ := newTestCrdClient(reactionError)(&rest.Config{})
crdList, err := getCrdList(crdClient)
Expand Down
6 changes: 4 additions & 2 deletions pkg/sloop/server/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ type SloopConfig struct {
// These fields can only come from command line
ConfigFile string
// These fields can only come from file because they use complex types
LeftBarLinks []webserver.LinkTemplate `json:"leftBarLinks"`
ResourceLinks []webserver.ResourceLinkTemplate `json:"resourceLinks"`
LeftBarLinks []webserver.LinkTemplate `json:"leftBarLinks"`
ResourceLinks []webserver.ResourceLinkTemplate `json:"resourceLinks"`
ExclusionRules map[string][]any `json:"exclusionRules"`
// Normal fields that can come from file or cmd line
DisableKubeWatcher bool `json:"disableKubeWatch"`
KubeWatchResyncInterval time.Duration `json:"kubeWatchResyncInterval"`
Expand Down Expand Up @@ -177,6 +178,7 @@ func getDefaultConfig() *SloopConfig {
EnableGranularMetrics: false,
PrivilegedAccess: true,
BadgerDetailLogEnabled: false,
ExclusionRules: map[string][]any{},
}
return &defaultConfig
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sloop/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func RealMain() error {
return errors.Wrap(err, "failed to create kubernetes client")
}

kubeWatcherSource, err = ingress.NewKubeWatcherSource(kubeClient, kubeWatchChan, conf.KubeWatchResyncInterval, conf.WatchCrds, conf.CrdRefreshInterval, conf.ApiServerHost, kubeContext, conf.EnableGranularMetrics)
kubeWatcherSource, err = ingress.NewKubeWatcherSource(kubeClient, kubeWatchChan, conf.KubeWatchResyncInterval, conf.WatchCrds, conf.CrdRefreshInterval, conf.ApiServerHost, kubeContext, conf.EnableGranularMetrics, conf.ExclusionRules)
if err != nil {
return errors.Wrap(err, "failed to initialize kubeWatcher")
}
Expand Down

0 comments on commit f0bd290

Please sign in to comment.