From f0bd290fff747ece9e8801ec6832d11c744bfcaf Mon Sep 17 00:00:00 2001 From: Alex Box Date: Mon, 15 May 2023 17:48:46 +0100 Subject: [PATCH] Add exclusion rules to filter unwanted events (#259) * Add exclusion rules to filter unwanted events * Add json logic error handling * Fix format/imports with goimport * Rework tests to cater for out of order events Looks like the ordering for events received from the channel is non-deterministic. I had changed the tests in this PR to expect events in an exact order, which passed reliably on my local machine but failed when run by the GitHub Actions workflow. In this commit I've reverted the existing big picture test (the one with no exclusion rules) to not check the payload, and modified the new test (including an exclusion rule) to: a) wait 1 second for all events to be received on the channel b) verify that the excluded event is not received regardless of order c) verify that the expected number of events is received * Fix format --- README.md | 58 ++++++++++++++++ go.mod | 3 + go.sum | 6 ++ pkg/sloop/common/utilities.go | 24 +++++++ pkg/sloop/common/utilities_test.go | 28 ++++++++ pkg/sloop/ingress/kubewatcher.go | 54 ++++++++++++++- pkg/sloop/ingress/kubewatcher_test.go | 78 +++++++++++++++++++++- pkg/sloop/server/internal/config/config.go | 6 +- pkg/sloop/server/server.go | 2 +- 9 files changed, 254 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 0534285a..5a451c6e 100644 --- a/README.md +++ b/README.md @@ -165,6 +165,64 @@ Open your browser to http://localhost:9090. An example of a useful query is [rate(kubewatch_event_count[5m])]() +## Event filtering + +Events can be excluded from Sloop by adding `exclusionRules` to the config file: + +``` +{ + "defaultNamespace": "default", + "defaultKind": "Pod", + "defaultLookback": "1h", + [...] + "exclusionRules": { + "_all": [ + {"==": [ { "var": "metadata.namespace" }, "kube-system" ]} + ], + "Pod": [ + {"==": [ { "var": "metadata.name" }, "sloop-0" ]} + ], + "Job": [ + {"in": [ { "var": "metadata.name" }, [ "cron1", "cron3" ] ]} + ] + } +}` + +``` + +Adding rules can help to reduce resources consumed by Sloop and remove unwanted noise from the UI for events that are of no interest. + +### Limiting rules to specific kinds + + * Rules under the special key `_all` are evaluated against events for objects of any kind + * Rules under any other key are evaluated only against objects whose kind matches the key, e.g. `Pod` only applies to pods, `Job` only applies to jobs etc. + +### Rule format and supported operations + +Rules should follow the [JsonLogic](https://jsonlogic.com) format and are evaluated against the json representation of the Kubernetes API object related to the event (see below). + +Available operators, such as `==` and `in` shown above, are documented [here](https://jsonlogic.com/operations.html). + +### Data available to rule logic + +Kubernetes API conventions for [objects](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#objects) require the following keys to exist in the json data for all resources, all of which can be referenced in rules: + + * `metadata` + * `spec` + * `status` + +Some commonly useful fields under the `metadata` [object](https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#ObjectMeta) are: + + * `name` + * `namespace` + * `labels` + +#### Type specific data + +Some resources contain additional type-specific fields, for example `PersistentVolumeClaimSpec` objects have fields named `selector` and `storageClassName`. + +Type specific fields for each object and their corresponding keys in the object json representation are documented in the [core API](https://pkg.go.dev/k8s.io/api@v0.27.1/core/v1), e.g. for `PersistentVolumeClaimSpec` objects the documentation is [here](https://pkg.go.dev/k8s.io/api@v0.27.1/core/v1#PersistentVolumeClaimSpec). + ## Contributing Refer to [CONTRIBUTING.md](CONTRIBUTING.md)
diff --git a/go.mod b/go.mod index e2caedd6..16d7c40d 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( github.com/Jeffail/gabs/v2 v2.2.0 github.com/dgraph-io/badger/v2 v2.0.3 + github.com/diegoholiveira/jsonlogic/v3 v3.2.7 github.com/ghodss/yaml v1.0.0 github.com/golang/glog v1.0.0 github.com/golang/protobuf v1.5.2 @@ -46,6 +47,8 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/mailru/easyjson v0.7.6 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect + github.com/mitchellh/copystructure v1.0.0 // indirect + github.com/mitchellh/reflectwalk v1.0.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect diff --git a/go.sum b/go.sum index 6015f07c..9405a870 100644 --- a/go.sum +++ b/go.sum @@ -77,6 +77,8 @@ github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70d github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y= github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/diegoholiveira/jsonlogic/v3 v3.2.7 h1:awX07pFPnlntZzRNBcO4a2Ivxa77NMt+narq/6xcS0E= +github.com/diegoholiveira/jsonlogic/v3 v3.2.7/go.mod h1:9oE8z9G+0OMxOoLHF3fhek3KuqD5CBqM0B6XFL08MSg= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -220,8 +222,12 @@ github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.2 h1:hAHbPm5IJGijwng3PWk09JkG9WeqChjprR5s9bBZ+OM= github.com/matttproud/golang_protobuf_extensions v1.0.2/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/mitchellh/copystructure v1.0.0 h1:Laisrj+bAB6b/yJwB5Bt3ITZhGJdqmxquMKeZ+mmkFQ= +github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/mitchellh/reflectwalk v1.0.0 h1:9D+8oIskB4VJBN5SFlmc27fSlIBZaov1Wpk/IfikLNY= +github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= diff --git a/pkg/sloop/common/utilities.go b/pkg/sloop/common/utilities.go index 2e0402ef..166152d7 100644 --- a/pkg/sloop/common/utilities.go +++ b/pkg/sloop/common/utilities.go @@ -37,3 +37,27 @@ func Contains(stringList []string, elem string) bool { func GetFilePath(filePath string, fileName string) string { return path.Join(filePath, fileName) } + +func Max(x int, y int) int { + if x < y { + return y + } + return x +} + +func Truncate(text string, width int, delimiter ...string) (string, error) { + d := "..." + if len(delimiter) > 0 { + d = delimiter[0] + } + d_len := len(d) + if width < 0 { + return "", fmt.Errorf("invalid width") + } + if len(text) <= width { + return text, nil + } + r := []rune(text) + truncated := r[:(Max(width, d_len) - d_len)] + return string(truncated) + d, nil +} diff --git a/pkg/sloop/common/utilities_test.go b/pkg/sloop/common/utilities_test.go index f30afbbd..b7d8a2f9 100644 --- a/pkg/sloop/common/utilities_test.go +++ b/pkg/sloop/common/utilities_test.go @@ -60,3 +60,31 @@ func Test_GetFilePath(t *testing.T) { actualOutput := GetFilePath(filePrefix, fileName) assert.Equal(t, expectedOutput, actualOutput) } + +func Test_Truncate_StringLongerThanWidth(t *testing.T) { + stringLong := "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec eget odio quis felis laoreet dictum." + expectedOutput := "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec eget odio quis..." + actualOutput, _ := Truncate(stringLong, 80) + assert.Equal(t, expectedOutput, actualOutput) +} + +func Test_Truncate_StringShorterThanWidth(t *testing.T) { + stringMedium := "Lorem ipsum dolor" + expectedOutput := "Lorem ipsum dolor" + actualOutput, _ := Truncate(stringMedium, 80) + assert.Equal(t, expectedOutput, actualOutput) +} + +func Test_Truncate_WidthShorterThanDelimiter(t *testing.T) { + stringShort := "Lorem" + expectedOutput := "..." + actualOutput, _ := Truncate(stringShort, 1) + assert.Equal(t, expectedOutput, actualOutput) +} + +func Test_Truncate_StringEmpty(t *testing.T) { + stringEmpty := "" + expectedOutput := "" + actualOutput, _ := Truncate(stringEmpty, 1) + assert.Equal(t, expectedOutput, actualOutput) +} diff --git a/pkg/sloop/ingress/kubewatcher.go b/pkg/sloop/ingress/kubewatcher.go index 2598473f..f9c8dfa9 100644 --- a/pkg/sloop/ingress/kubewatcher.go +++ b/pkg/sloop/ingress/kubewatcher.go @@ -8,13 +8,17 @@ package ingress import ( + "bytes" "context" "encoding/json" "fmt" + "reflect" + "strings" "sync" "sync/atomic" "time" + "github.com/diegoholiveira/jsonlogic/v3" "github.com/golang/glog" "github.com/golang/protobuf/ptypes" "github.com/pkg/errors" @@ -67,6 +71,7 @@ type kubeWatcherImpl struct { stopped bool refreshCrd *time.Ticker currentContext string + exclusionRules map[string][]any } var ( @@ -79,11 +84,12 @@ var ( ) // Todo: Add additional parameters for filtering -func NewKubeWatcherSource(kubeClient kubernetes.Interface, outChan chan typed.KubeWatchResult, resync time.Duration, includeCrds bool, crdRefreshInterval time.Duration, masterURL string, kubeContext string, enableGranularMetrics bool) (KubeWatcher, error) { +func NewKubeWatcherSource(kubeClient kubernetes.Interface, outChan chan typed.KubeWatchResult, resync time.Duration, includeCrds bool, crdRefreshInterval time.Duration, masterURL string, kubeContext string, enableGranularMetrics bool, exclusionRules map[string][]any) (KubeWatcher, error) { kw := &kubeWatcherImpl{resync: resync, protection: &sync.Mutex{}} kw.stopChan = make(chan struct{}) kw.crdInformers = make(map[crdGroupVersionResourceKind]*crdInformerInfo) kw.outchan = outChan + kw.exclusionRules = exclusionRules kw.startWellKnownInformers(kubeClient, enableGranularMetrics) if includeCrds { @@ -304,6 +310,13 @@ func (i *kubeWatcherImpl) processUpdate(kind string, obj interface{}, watchResul } glog.V(99).Infof("processUpdate: obj json: %v", resourceJson) + eventExcluded := i.eventExcluded(kind, resourceJson) + if eventExcluded { + objName := reflect.ValueOf(obj).Elem().FieldByName("ObjectMeta").FieldByName("Name") + glog.V(2).Infof("Event for object excluded: %s/%s", kind, objName) + return + } + kubeMetadata, err := kubeextractor.ExtractMetadata(resourceJson) if err != nil || kubeMetadata.Namespace == "" { // We are only grabbing namespace here for a prometheus metric, so if metadata extract fails we just log and continue @@ -360,6 +373,45 @@ func (i *kubeWatcherImpl) refreshCrdInformers(masterURL string, kubeContext stri } } +func (i *kubeWatcherImpl) getExclusionRules(kind string) []any { + kindRules, _ := i.exclusionRules[kind] + globalRules, _ := i.exclusionRules["_all"] + combinedRules := append( + kindRules, + globalRules..., + ) + glog.V(common.GlogVerbose).Infof("Fetched rules: %s", combinedRules) + return combinedRules +} + +func (i *kubeWatcherImpl) eventExcluded(kind string, resourceJson string) bool { + filters := i.getExclusionRules(kind) + for _, logic := range filters { + logicJson, err := json.Marshal(logic) + if err != nil { + glog.Errorf(`Failed to parse event filtering rule "%s": %s`, string(logicJson), err) + return false + } + var result bytes.Buffer + err = jsonlogic.Apply( + strings.NewReader(string(logicJson)), + strings.NewReader(resourceJson), + &result, + ) + if err != nil { + glog.Errorf(`Failed to apply event filtering rule "%s": %s`, string(logicJson), err) + return false + } + resultBool := strings.Contains(result.String(), "true") + if resultBool { + truncated, _ := common.Truncate(resourceJson, 40) + glog.V(2).Infof(`Event matched logic: logic="%s" resource="%s"`, string(logicJson), truncated) + return true + } + } + return false +} + func (i *kubeWatcherImpl) Stop() { glog.Infof("Stopping kubeWatcher") diff --git a/pkg/sloop/ingress/kubewatcher_test.go b/pkg/sloop/ingress/kubewatcher_test.go index 2ca40ea1..d0fe6e08 100644 --- a/pkg/sloop/ingress/kubewatcher_test.go +++ b/pkg/sloop/ingress/kubewatcher_test.go @@ -81,7 +81,8 @@ func Test_bigPicture(t *testing.T) { masterURL := "url" kubeContext := "" // empty string makes things work enableGranularMetrics := true - kw, err := NewKubeWatcherSource(kubeClient, outChan, resync, includeCrds, time.Duration(10*time.Second), masterURL, kubeContext, enableGranularMetrics) + exclusionRules := map[string][]any{} + kw, err := NewKubeWatcherSource(kubeClient, outChan, resync, includeCrds, time.Duration(10*time.Second), masterURL, kubeContext, enableGranularMetrics, exclusionRules) assert.NoError(t, err) // create service and await corresponding event @@ -100,6 +101,81 @@ func Test_bigPicture(t *testing.T) { kw.Stop() } +// As above but specify non-default exclusion rules to exclude events for service named s2 +func Test_bigPictureWithExclusionRules(t *testing.T) { + newCrdClient = newTestCrdClient(reactionListOfOne) // force startCustomInformers() to use a fake clientset + + kubeClient := kubernetesFake.NewSimpleClientset() + outChan := make(chan typed.KubeWatchResult, 5) + resync := 30 * time.Minute + includeCrds := true + masterURL := "url" + kubeContext := "" // empty string makes things work + enableGranularMetrics := true + exclusionRules := map[string][]any{ + "_all": []any{ + map[string]any{ + "==": []any{ + map[string]any{ + "var": "metadata.name", + }, + "s2", + }, + }, + }, + } + + kw, err := NewKubeWatcherSource(kubeClient, outChan, resync, includeCrds, time.Duration(10*time.Second), masterURL, kubeContext, enableGranularMetrics, exclusionRules) + assert.NoError(t, err) + + // create namespace + ns := "ns" + _, err = kubeClient.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}}, metav1.CreateOptions{}) + if err != nil { + t.FailNow() + } + + // create first service + svc := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "s1"}} + _, err = kubeClient.CoreV1().Services(ns).Create(context.TODO(), svc, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating service: %v\n", err) + } + + // create second service, corresponding event should be excluded by exclusion rule + svc = &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "s2"}} + _, err = kubeClient.CoreV1().Services(ns).Create(context.TODO(), svc, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating service: %v\n", err) + } + + // create third service + svc = &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "s3"}} + _, err = kubeClient.CoreV1().Services(ns).Create(context.TODO(), svc, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating service: %v\n", err) + } + + eventCount := 0 +loop: + for { + select { + case <-time.After(1 * time.Second): + break loop + case result, ok := <-outChan: + if ok { + eventCount++ + assert.NotContains(t, result.Payload, `"name":"s2"`) + } else { + t.Fatalf("Channel closed unexpectedly: %v\n", ok) + } + } + } + assert.Equal(t, 3, eventCount) // assert no event for service named s2 + + kw.Stop() +} + func Test_getCrdList(t *testing.T) { crdClient, _ := newTestCrdClient(reactionError)(&rest.Config{}) crdList, err := getCrdList(crdClient) diff --git a/pkg/sloop/server/internal/config/config.go b/pkg/sloop/server/internal/config/config.go index 2e2e76ef..2933299f 100644 --- a/pkg/sloop/server/internal/config/config.go +++ b/pkg/sloop/server/internal/config/config.go @@ -29,8 +29,9 @@ type SloopConfig struct { // These fields can only come from command line ConfigFile string // These fields can only come from file because they use complex types - LeftBarLinks []webserver.LinkTemplate `json:"leftBarLinks"` - ResourceLinks []webserver.ResourceLinkTemplate `json:"resourceLinks"` + LeftBarLinks []webserver.LinkTemplate `json:"leftBarLinks"` + ResourceLinks []webserver.ResourceLinkTemplate `json:"resourceLinks"` + ExclusionRules map[string][]any `json:"exclusionRules"` // Normal fields that can come from file or cmd line DisableKubeWatcher bool `json:"disableKubeWatch"` KubeWatchResyncInterval time.Duration `json:"kubeWatchResyncInterval"` @@ -177,6 +178,7 @@ func getDefaultConfig() *SloopConfig { EnableGranularMetrics: false, PrivilegedAccess: true, BadgerDetailLogEnabled: false, + ExclusionRules: map[string][]any{}, } return &defaultConfig } diff --git a/pkg/sloop/server/server.go b/pkg/sloop/server/server.go index 5abf04a8..ea7401bb 100644 --- a/pkg/sloop/server/server.go +++ b/pkg/sloop/server/server.go @@ -103,7 +103,7 @@ func RealMain() error { return errors.Wrap(err, "failed to create kubernetes client") } - kubeWatcherSource, err = ingress.NewKubeWatcherSource(kubeClient, kubeWatchChan, conf.KubeWatchResyncInterval, conf.WatchCrds, conf.CrdRefreshInterval, conf.ApiServerHost, kubeContext, conf.EnableGranularMetrics) + kubeWatcherSource, err = ingress.NewKubeWatcherSource(kubeClient, kubeWatchChan, conf.KubeWatchResyncInterval, conf.WatchCrds, conf.CrdRefreshInterval, conf.ApiServerHost, kubeContext, conf.EnableGranularMetrics, conf.ExclusionRules) if err != nil { return errors.Wrap(err, "failed to initialize kubeWatcher") }