From 99e0b8e6021435f9b5ce672c9b258ba2b18534b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Sevilla?= Date: Tue, 30 Jan 2024 12:35:37 +0100 Subject: [PATCH] Service latency measurement (#516) ## Type of change - [ ] Refactor - [x] New feature - [ ] Bug fix - [ ] Optimization - [x] Documentation Update ## Description Service ready latency measurement, please take a look at the updated docs to get more information about this feature ![image](https://github.com/cloud-bulldozer/kube-burner/assets/4614641/566df440-8961-4de1-a41e-faee4c40074e) ## Related Tickets & Documents - Closes #467 ## Checklist before requesting a review - [x] I have performed a self-review of my code. - [x] If it is a core feature, I have added thorough tests. --------- Signed-off-by: Raul Sevilla --- cmd/kube-burner/kube-burner.go | 4 +- docs/measurements.md | 118 +++++++- docs/observability/metrics.md | 2 +- go.mod | 2 +- go.sum | 4 +- hack/build_service_checker.sh | 4 + pkg/burner/create.go | 8 +- pkg/burner/delete.go | 3 +- pkg/burner/job.go | 3 +- pkg/burner/namespaces.go | 117 +------- pkg/burner/patch.go | 2 +- pkg/burner/pre_load.go | 9 +- pkg/burner/utils.go | 3 +- pkg/measurements/factory.go | 23 +- pkg/measurements/metrics/watcher.go | 6 +- pkg/measurements/pod_latency.go | 58 ++-- pkg/measurements/pprof.go | 9 +- pkg/measurements/service_latency.go | 397 +++++++++++++++++++++++++++ pkg/measurements/types/types.go | 40 +++ pkg/measurements/util/svc_checker.go | 74 +++++ pkg/measurements/vmi_latency.go | 30 +- pkg/util/namespaces.go | 139 ++++++++++ pkg/util/utils.go | 34 +++ test/k8s/kube-burner-delete.yml | 2 +- test/k8s/kube-burner.yml | 7 +- test/k8s/objectTemplates/pod.yml | 2 +- test/k8s/objectTemplates/service.yml | 13 + test/test-k8s.bats | 20 +- 28 files changed, 913 insertions(+), 220 deletions(-) create mode 100755 hack/build_service_checker.sh create mode 100644 pkg/measurements/service_latency.go create mode 100644 pkg/measurements/util/svc_checker.go create mode 100644 pkg/util/namespaces.go create mode 100644 pkg/util/utils.go create mode 100644 test/k8s/objectTemplates/service.yml diff --git a/cmd/kube-burner/kube-burner.go b/cmd/kube-burner/kube-burner.go index efce22ff1..ebd75044d 100644 --- a/cmd/kube-burner/kube-burner.go +++ b/cmd/kube-burner/kube-burner.go @@ -169,8 +169,8 @@ func destroyCmd() *cobra.Command { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() labelSelector := fmt.Sprintf("kube-burner-uuid=%s", uuid) - burner.CleanupNamespaces(ctx, labelSelector) - burner.CleanupNonNamespacedResources(ctx, labelSelector) + util.CleanupNamespaces(ctx, clientSet, labelSelector) + util.CleanupNonNamespacedResources(ctx, clientSet, burner.DynamicClient, labelSelector) }, } cmd.Flags().StringVar(&uuid, "uuid", "", "UUID") diff --git a/docs/measurements.md b/docs/measurements.md index 30fc2c4c6..ab013ee83 100644 --- a/docs/measurements.md +++ b/docs/measurements.md @@ -2,11 +2,7 @@ Kube-burner allows you to get further metrics using other mechanisms or data sources, such as the Kubernetes API. These mechanisms are called measurements. -Measurements are enabled in the measurements section of the configuration file. This section contains a list of measurements with their options. -'kube-burner' supports the following measurements so far: - -!!! Warning - `podLatency`, as any other measurement, is only captured during a benchmark runtime. 
It does not work with the `index` subcommand of kube-burner
+Measurements are enabled in the `measurements` object of the configuration file. This object contains a list of measurements with their options.
 
 ## Pod latency
 
@@ -17,7 +13,9 @@ Collects latencies from the different pod startup phases, these **latency metric
 - name: podLatency
 ```
 
-This measurement sends its metrics to a configured indexer. The metrics collected are pod latency histograms (`podLatencyMeasurement`) and four documents holding a summary with different pod latency quantiles of each pod condition (`podLatencyQuantilesMeasurement`). It is possible to skip indexing the `podLatencyMeasurement` metric by configuring the field `podLatencyMetrics` of this measurement to `quantiles`.
+### Metrics
+
+The metrics collected are pod latency timeseries (`podLatencyMeasurement`) and four documents holding a summary with different pod latency quantiles of each pod condition (`podLatencyQuantilesMeasurement`). It's possible to skip indexing the `podLatencyMeasurement` metric by configuring the field `podLatencyMetrics` of this measurement to `quantiles`.
 
 One document, such as the following, is indexed per each pod created by the workload that enters in `Running` condition during the workload:
 
@@ -29,12 +27,11 @@ One document, such as the following, is indexed per each pod created by the work
   "containersReadyLatency": 2997,
   "podReadyLatency": 2997,
   "metricName": "podLatencyMeasurement",
-  "jobName": "kubelet-density",
   "uuid": "c40b4346-7af7-4c63-9ab4-aae7ccdd0616",
   "namespace": "kubelet-density",
   "podName": "kubelet-density-13",
-  "jobConfig": {},
-  "nodeName": "worker-001"
+  "nodeName": "worker-001",
+  "jobConfig": {"config": "params"}
 }
 ```
 
@@ -53,7 +50,9 @@ Pod latency quantile sample:
   "avg": 2876.3,
   "timestamp": "2020-11-15T22:26:51.553221077+01:00",
   "metricName": "podLatencyQuantilesMeasurement",
-  "jobConfig": {}
+  "jobConfig": {
+    "config": "params"
+  }
 },
 {
   "quantileName": "PodScheduled",
@@ -65,7 +64,9 @@ Pod latency quantile sample:
   "avg": 5.38,
   "timestamp": "2020-11-15T22:26:51.553225151+01:00",
   "metricName": "podLatencyQuantilesMeasurement",
-  "jobConfig": {}
+  "jobConfig": {
+    "config": "params"
+  }
 }
 ```
 
@@ -134,6 +135,101 @@ time="2023-11-19 17:46:08" level=info msg="Pod latencies error rate was: 0.00" f
 time="2023-11-19 17:46:08" level=info msg="👋 Exiting kube-burner vchalla" file="kube-burner.go:209"
 ```
 
+## Service latency
+
+Calculates the time taken by the services to serve requests once their endpoints are ready. This measurement works as follows.
+
+```mermaid
+graph LR
+    A[Service created] --> C{active endpoints?}
+    C -->|No| C
+    C -->|Yes| D[Save timestamp]
+    D --> G{TCP connectivity?}
+    G-->|Yes| F(Generate metric)
+    G -->|No| G
+```
+
+Where the service latency is the time elapsed since the service has at least one endpoint ready until the connectivity is verified.
+
+The connectivity check is done through a pod running in the `kube-burner-service-latency` namespace: kube-burner connects to this pod and uses `netcat` to verify connectivity.
+
+This measurement is enabled with:
+
+```yaml
+  measurements:
+  - name: serviceLatency
+    svcTimeout: 5s
+```
+
+Where `svcTimeout` (`5s` by default) defines the maximum amount of time the measurement will wait for a service to be ready. When this timeout is met, the metric from that service is **discarded**.
+
+!!! warning "Considerations"
+    - Only TCP is supported.
+    - Supported services are `ClusterIP`, `NodePort` and `LoadBalancer`.
+    - kube-burner starts checking service connectivity when its endpoints object has at least one address.
+    - Make sure the endpoints of the service are correct and reachable from the pod running in the `kube-burner-service-latency` namespace.
+    - When the service is `NodePort`, the connectivity check is done against the node where the connectivity check pod runs.
+    - By default, all services created by the benchmark are tracked by this measurement. It's possible to exclude a service from tracking by annotating it with `kube-burner.io/service-latency=false`.
+    - Keep in mind that when the service is of `LoadBalancer` type, the provider needs to set up the load balancer, which adds some extra delay.
+    - Endpoints are pinged one after another; this can add some delay when the service has a large number of endpoints.
+
+### Metrics
+
+The metrics collected are service latency timeseries (`svcLatencyMeasurement`) and another document that holds a summary with the different service latency quantiles (`svcLatencyQuantilesMeasurement`). It is possible to skip indexing the `svcLatencyMeasurement` metric by configuring the field `svcLatencyMetrics` of this measurement to `quantiles`. Metric documents have the following structure:
+
+```json
+{
+  "timestamp": "2023-11-19T00:41:51Z",
+  "ready": 1631880721,
+  "metricName": "svcLatencyMeasurement",
+  "jobConfig": {
+    "config": "params"
+  },
+  "uuid": "c4558ba8-1e29-4660-9b31-02b9f01c29bf",
+  "namespace": "cluster-density-v2-2",
+  "service": "cluster-density-1",
+  "type": "ClusterIP"
+}
+```
+
+!!! note
+    When the service type is `LoadBalancer`, the document includes an extra field, `ipAssigned`, which reports the IP assignment latency of the service.
+
+And the quantiles document has the following structure:
+
+```json
+{
+  "quantileName": "Ready",
+  "uuid": "c4558ba8-1e29-4660-9b31-02b9f01c29bf",
+  "P99": 1867593282,
+  "P95": 1856488440,
+  "P50": 1723817691,
+  "max": 1868307027,
+  "avg": 1722308938,
+  "timestamp": "2023-11-19T00:42:26.663991359Z",
+  "metricName": "svcLatencyQuantilesMeasurement",
+  "jobConfig": {
+    "config": "params"
+  }
+},
+{
+  "quantileName": "LoadBalancer",
+  "uuid": "c4558ba8-1e29-4660-9b31-02b9f01c29bf",
+  "P99": 1467593282,
+  "P95": 1356488440,
+  "P50": 1323817691,
+  "max": 2168307027,
+  "avg": 1822308938,
+  "timestamp": "2023-11-19T00:42:26.663991359Z",
+  "metricName": "svcLatencyQuantilesMeasurement",
+  "jobConfig": {
+    "config": "params"
+  }
+}
+```
+
+When there are `LoadBalancer` services, an extra document with `quantileName` set to `LoadBalancer` is also generated, as shown above.
+
 ## pprof collection
 
 This measurement can be used to collect Golang profiling information from processes running in pods from the cluster. To do so, kube-burner connects to pods labeled with `labelSelector` and running in `namespace`. This measurement uses an implementation similar to `kubectl exec`, and as soon as it connects to one pod it executes the command `curl ` to get the pprof data. pprof files are collected in a regular basis configured by the parameter `pprofInterval`, the collected pprof files are downloaded from the pods to the local directory configured by the parameter `pprofDirectory` which by default is `pprof`.
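Complementing the service latency documentation above: the test assets touched by this patch (`test/k8s/kube-burner.yml` and the new `test/k8s/objectTemplates/service.yml`) show the measurement wired against a templated `ClusterIP` service. The snippet below is a minimal illustrative sketch along the same lines, not a file shipped by this patch; the job name and namespace are placeholder values, and the object templates mirror the ones used under `test/k8s`.

```yaml
global:
  measurements:
    - name: serviceLatency
      svcTimeout: 10s              # overrides the default 5s readiness timeout
jobs:
  - name: service-latency-example  # placeholder job name
    jobIterations: 5
    namespace: svc-latency-example # placeholder namespace
    objects:
      # Backend pods; their labels must match the Service selector
      - objectTemplate: objectTemplates/deployment.yml
        replicas: 1
      # One ClusterIP Service per iteration, tracked by the serviceLatency measurement
      - objectTemplate: objectTemplates/service.yml
        replicas: 1
```

Services that should not be tracked can be annotated with `kube-burner.io/service-latency=false`, as described in the considerations above.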
diff --git a/docs/observability/metrics.md b/docs/observability/metrics.md index 0db133886..414226042 100644 --- a/docs/observability/metrics.md +++ b/docs/observability/metrics.md @@ -66,7 +66,7 @@ The collected metrics have the following shape: ] ``` -Notice that kube-burner enriches the query results by adding some extra fields like `uuid`, `query`, `metricName` and `jobName`. +Notice that kube-burner enriches the query results by adding some extra fields like `uuid`, `query`, `metricName` and `jobConfig`. !!! info These extra fields are especially useful at the time of identifying and representing the collected metrics. diff --git a/go.mod b/go.mod index 63f159f96..cb8b6dc21 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( k8s.io/apimachinery v0.27.2 k8s.io/client-go v0.27.2 k8s.io/kubectl v0.27.2 - k8s.io/utils v0.0.0-20230505201702-9f6742963106 + k8s.io/utils v0.0.0-20240102154912-e7106e64919e kubevirt.io/api v0.58.0 ) diff --git a/go.sum b/go.sum index 88a915394..1bace9b5c 100644 --- a/go.sum +++ b/go.sum @@ -1042,8 +1042,8 @@ k8s.io/kubectl v0.27.2 h1:sSBM2j94MHBFRWfHIWtEXWCicViQzZsb177rNsKBhZg= k8s.io/kubectl v0.27.2/go.mod h1:GCOODtxPcrjh+EC611MqREkU8RjYBh10ldQCQ6zpFKw= k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20211116205334-6203023598ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= -k8s.io/utils v0.0.0-20230505201702-9f6742963106 h1:EObNQ3TW2D+WptiYXlApGNLVy0zm/JIBVY9i+M4wpAU= -k8s.io/utils v0.0.0-20230505201702-9f6742963106/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ= +k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= kubevirt.io/api v0.58.0 h1:qeNeRtD6AIJ5WVJuRXajmmXtnrO5dYchy+hpCm6QwhE= kubevirt.io/api v0.58.0/go.mod h1:U0CQlZR0JoJCaC+Va0wz4dMOtYDdVywJ98OT1KmOkzI= kubevirt.io/containerized-data-importer-api v1.50.0 h1:O01F8L5K8qRLnkYICIfmAu0dU0P48jdO42uFPElht38= diff --git a/hack/build_service_checker.sh b/hack/build_service_checker.sh new file mode 100755 index 000000000..eb6f19fd3 --- /dev/null +++ b/hack/build_service_checker.sh @@ -0,0 +1,4 @@ +#!/bin/bash +# +echo -e "FROM registry.fedoraproject.org/fedora-minimal:latest\nRUN microdnf install -y nmap-ncat procps-ng" | podman build --jobs=4 --platform=linux/amd64,linux/arm64,linux/ppc64le,linux/s390x --manifest=quay.io/cloud-bulldozer/fedora-nc:latest -f - . 
+podman manifest push quay.io/cloud-bulldozer/fedora-nc:latest diff --git a/pkg/burner/create.go b/pkg/burner/create.go index 65289ebda..da2a17dcd 100644 --- a/pkg/burner/create.go +++ b/pkg/burner/create.go @@ -118,7 +118,7 @@ func (ex *Executor) RunCreateJob(iterationStart, iterationEnd int, waitListNames } if ex.nsRequired && !ex.NamespacedIterations { ns = ex.Namespace - if err = createNamespace(ns, nsLabels, nsAnnotations); err != nil { + if err = util.CreateNamespace(ClientSet, ns, nsLabels, nsAnnotations); err != nil { log.Fatal(err.Error()) } *waitListNamespaces = append(*waitListNamespaces, ns) @@ -137,7 +137,7 @@ func (ex *Executor) RunCreateJob(iterationStart, iterationEnd int, waitListNames if ex.nsRequired && ex.NamespacedIterations { ns = ex.generateNamespace(i) if !namespacesCreated[ns] { - if err = createNamespace(ns, nsLabels, nsAnnotations); err != nil { + if err = util.CreateNamespace(ClientSet, ns, nsLabels, nsAnnotations); err != nil { log.Error(err.Error()) continue } @@ -272,7 +272,7 @@ func (ex *Executor) replicaHandler(labels map[string]string, obj object, ns stri func createRequest(gvr schema.GroupVersionResource, ns string, obj *unstructured.Unstructured, timeout time.Duration) { var uns *unstructured.Unstructured var err error - RetryWithExponentialBackOff(func() (bool, error) { + util.RetryWithExponentialBackOff(func() (bool, error) { // When the object has a namespace already specified, use it if objNs := obj.GetNamespace(); objNs != "" { ns = objNs @@ -368,7 +368,7 @@ func (ex *Executor) RunCreateJobWithChurn() { if ex.ChurnDeletionStrategy == "gvr" { CleanupNamespacesUsingGVR(ctx, *ex, namespacesToDelete) } - CleanupNamespaces(ctx, "churndelete=delete") + util.CleanupNamespaces(ctx, ClientSet, "churndelete=delete") log.Info("Re-creating deleted objects") // Re-create objects that were deleted ex.RunCreateJob(randStart, numToChurn+randStart, &[]string{}) diff --git a/pkg/burner/delete.go b/pkg/burner/delete.go index 53ba5e401..cc1c5c304 100644 --- a/pkg/burner/delete.go +++ b/pkg/burner/delete.go @@ -20,6 +20,7 @@ import ( "time" "github.com/kube-burner/kube-burner/pkg/config" + "github.com/kube-burner/kube-burner/pkg/util" log "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -65,7 +66,7 @@ func (ex *Executor) RunDeleteJob() { listOptions := metav1.ListOptions{ LabelSelector: labelSelector, } - err := RetryWithExponentialBackOff(func() (done bool, err error) { + err := util.RetryWithExponentialBackOff(func() (done bool, err error) { itemList, err = DynamicClient.Resource(obj.gvr).List(context.TODO(), listOptions) if err != nil { log.Errorf("Error found listing %s labeled with %s: %s", obj.gvr.Resource, labelSelector, err) diff --git a/pkg/burner/job.go b/pkg/burner/job.go index c78e851f2..f5ac1f0a8 100644 --- a/pkg/burner/job.go +++ b/pkg/burner/job.go @@ -31,6 +31,7 @@ import ( "github.com/kube-burner/kube-burner/pkg/config" "github.com/kube-burner/kube-burner/pkg/measurements" "github.com/kube-burner/kube-burner/pkg/prometheus" + "github.com/kube-burner/kube-burner/pkg/util" "github.com/kube-burner/kube-burner/pkg/util/metrics" log "github.com/sirupsen/logrus" "golang.org/x/time/rate" @@ -334,7 +335,7 @@ func garbageCollectJob(ctx context.Context, jobExecutor Executor, labelSelector if wg != nil { defer wg.Done() } - CleanupNamespaces(ctx, labelSelector) + util.CleanupNamespaces(ctx, ClientSet, labelSelector) for _, obj := range jobExecutor.objects { jobExecutor.limiter.Wait(ctx) if 
!obj.Namespaced { diff --git a/pkg/burner/namespaces.go b/pkg/burner/namespaces.go index 324f8755b..f424414cb 100644 --- a/pkg/burner/namespaces.go +++ b/pkg/burner/namespaces.go @@ -19,68 +19,16 @@ import ( "fmt" "time" + "github.com/kube-burner/kube-burner/pkg/util" log "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/dynamic" ) -func createNamespace(namespaceName string, nsLabels map[string]string, nsAnnotations map[string]string) error { - ns := corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{Name: namespaceName, Labels: nsLabels, Annotations: nsAnnotations}, - } - return RetryWithExponentialBackOff(func() (done bool, err error) { - _, err = ClientSet.CoreV1().Namespaces().Create(context.TODO(), &ns, metav1.CreateOptions{}) - if errors.IsForbidden(err) { - log.Fatalf("authorization error creating namespace %s: %s", ns.Name, err) - return false, err - } - if errors.IsAlreadyExists(err) { - log.Infof("Namespace %s already exists", ns.Name) - nsSpec, _ := ClientSet.CoreV1().Namespaces().Get(context.TODO(), namespaceName, metav1.GetOptions{}) - if nsSpec.Status.Phase == corev1.NamespaceTerminating { - log.Warnf("Namespace %s is in %v state, retrying", namespaceName, corev1.NamespaceTerminating) - return false, nil - } - return true, nil - } else if err != nil { - log.Errorf("unexpected error creating namespace %s: %v", namespaceName, err) - return false, nil - } - log.Debugf("Created namespace: %s", ns.Name) - return true, err - }, 5*time.Second, 3, 0, 5*time.Hour) -} - -// CleanupNamespaces deletes namespaces with the given selector -func CleanupNamespaces(ctx context.Context, labelSelector string) { - ns, err := ClientSet.CoreV1().Namespaces().List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) - if err != nil { - log.Errorf("Error cleaning up namespaces: %v", err.Error()) - return - } - if len(ns.Items) > 0 { - log.Infof("Deleting namespaces with label %s", labelSelector) - for _, ns := range ns.Items { - err := ClientSet.CoreV1().Namespaces().Delete(ctx, ns.Name, metav1.DeleteOptions{}) - if err != nil { - if !errors.IsNotFound(err) { - log.Errorf("Error deleting namespace %s: %v", ns.Name, err) - } - } - } - waitForDeleteNamespaces(ctx, labelSelector) - } -} - // Cleanup resources specific to kube-burner with in a given list of namespaces func CleanupNamespacesUsingGVR(ctx context.Context, ex Executor, namespacesToDelete []string) { for _, namespace := range namespacesToDelete { - log.Infof("Deleting resources in namespace %s", namespace) labelSelector := fmt.Sprintf("kube-burner-job=%s", ex.Name) for _, obj := range ex.objects { CleanupNamespaceResourcesUsingGVR(ctx, obj, namespace, labelSelector) @@ -106,33 +54,6 @@ func CleanupNamespaceResourcesUsingGVR(ctx context.Context, obj object, namespac } } -// Cleanup non-namespaced resources with the given selector -func CleanupNonNamespacedResources(ctx context.Context, labelSelector string) { - serverResources, _ := ClientSet.Discovery().ServerPreferredResources() - log.Infof("Deleting non-namespace resources with label %s", labelSelector) - for _, resourceList := range serverResources { - for _, resource := range resourceList.APIResources { - if !resource.Namespaced { - gv, err := schema.ParseGroupVersion(resourceList.GroupVersion) - if err != nil { - log.Errorf("Unable to scan the 
resource group version: %v", err) - } - resourceInterface := DynamicClient.Resource(schema.GroupVersionResource{ - Group: gv.Group, - Version: gv.Version, - Resource: resource.Name, - }) - resources, err := resourceInterface.List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) - if err != nil { - log.Debugf("Unable to list resource: %s error: %v. Hence skipping it", resource.Name, err) - continue - } - deleteNonNamespacedResources(ctx, resources, resourceInterface) - } - } - } -} - // Cleanup non-namespaced resources using executor list func CleanupNonNamespacedResourcesUsingGVR(ctx context.Context, object object, labelSelector string) { log.Infof("Deleting non-namespace %v with selector %v", object.kind, labelSelector) @@ -142,41 +63,7 @@ func CleanupNonNamespacedResourcesUsingGVR(ctx context.Context, object object, l log.Debugf("Unable to list resources for object: %v error: %v. Hence skipping it", object.Object, err) return } - deleteNonNamespacedResources(ctx, resources, resourceInterface) -} - -func deleteNonNamespacedResources(ctx context.Context, resources *unstructured.UnstructuredList, resourceInterface dynamic.NamespaceableResourceInterface) { - if len(resources.Items) > 0 { - for _, item := range resources.Items { - go func(item unstructured.Unstructured) { - err := resourceInterface.Delete(ctx, item.GetName(), metav1.DeleteOptions{}) - if err != nil { - log.Errorf("Error deleting non-namespaced resources: %v", err) - } - }(item) - } - } -} - -func waitForDeleteNamespaces(ctx context.Context, labelSelector string) { - log.Info("Waiting for namespaces to be definitely deleted") - err := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) { - ns, err := ClientSet.CoreV1().Namespaces().List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) - if err != nil { - return false, err - } - if len(ns.Items) == 0 { - return true, nil - } - log.Debugf("Waiting for %d namespaces labeled with %s to be deleted", len(ns.Items), labelSelector) - return false, nil - }) - if err != nil { - if ctx.Err() == context.DeadlineExceeded { - log.Fatalf("Timeout cleaning up namespaces: %v", err) - } - log.Errorf("Error cleaning up namespaces: %v", err) - } + util.DeleteNonNamespacedResources(ctx, resources, resourceInterface) } func waitForDeleteNamespacedResources(ctx context.Context, namespace string, objects []object, labelSelector string) { diff --git a/pkg/burner/patch.go b/pkg/burner/patch.go index 0d14dbad6..08fe849d4 100644 --- a/pkg/burner/patch.go +++ b/pkg/burner/patch.go @@ -94,7 +94,7 @@ func (ex *Executor) RunPatchJob() { } // Try to find the list of resources by GroupVersionResource. 
- err := RetryWithExponentialBackOff(func() (done bool, err error) { + err := util.RetryWithExponentialBackOff(func() (done bool, err error) { itemList, err = DynamicClient.Resource(obj.gvr).List(context.TODO(), listOptions) if err != nil { log.Errorf("Error found listing %s labeled with %s: %s", obj.gvr.Resource, labelSelector, err) diff --git a/pkg/burner/pre_load.go b/pkg/burner/pre_load.go index 83a8da4fe..04a21c45d 100644 --- a/pkg/burner/pre_load.go +++ b/pkg/burner/pre_load.go @@ -26,7 +26,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/utils/pointer" + "k8s.io/utils/ptr" ) const preLoadNs = "preload-kube-burner" @@ -57,11 +57,10 @@ func preLoadImages(job Executor) error { } log.Infof("Pre-load: Sleeping for %v", job.PreLoadPeriod) time.Sleep(job.PreLoadPeriod) - log.Infof("Pre-load: Deleting namespace %s", preLoadNs) // 5 minutes should be more than enough to cleanup this namespace ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() - CleanupNamespaces(ctx, "kube-burner-preload=true") + util.CleanupNamespaces(ctx, ClientSet, "kube-burner-preload=true") return nil } @@ -105,7 +104,7 @@ func createDSs(imageList []string, namespaceLabels map[string]string, namespaceA for annotation, value := range namespaceAnnotations { nsAnnotations[annotation] = value } - if err := createNamespace(preLoadNs, nsLabels, nsAnnotations); err != nil { + if err := util.CreateNamespace(ClientSet, preLoadNs, nsLabels, nsAnnotations); err != nil { log.Fatal(err) } dsName := "preload" @@ -126,7 +125,7 @@ func createDSs(imageList []string, namespaceLabels map[string]string, namespaceA Labels: map[string]string{"app": dsName}, }, Spec: corev1.PodSpec{ - TerminationGracePeriodSeconds: pointer.Int64(0), + TerminationGracePeriodSeconds: ptr.To[int64](0), InitContainers: []corev1.Container{}, // Only Always restart policy is supported Containers: []corev1.Container{ diff --git a/pkg/burner/utils.go b/pkg/burner/utils.go index a4e27b989..ac43b4b6f 100644 --- a/pkg/burner/utils.go +++ b/pkg/burner/utils.go @@ -22,6 +22,7 @@ import ( "strings" "time" + "github.com/kube-burner/kube-burner/pkg/util" log "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -87,7 +88,7 @@ func (ex *Executor) Verify() bool { LabelSelector: fmt.Sprintf("kube-burner-uuid=%s,kube-burner-job=%s,kube-burner-index=%d", ex.uuid, ex.Name, objectIndex), Limit: objectLimit, } - err := RetryWithExponentialBackOff(func() (done bool, err error) { + err := util.RetryWithExponentialBackOff(func() (done bool, err error) { replicas = 0 for { objList, err = DynamicClient.Resource(obj.gvr).Namespace(metav1.NamespaceAll).List(context.TODO(), listOptions) diff --git a/pkg/measurements/factory.go b/pkg/measurements/factory.go index dd9275c21..276f7dc65 100644 --- a/pkg/measurements/factory.go +++ b/pkg/measurements/factory.go @@ -38,10 +38,11 @@ type measurementFactory struct { } type measurement interface { - start(*sync.WaitGroup) + start(*sync.WaitGroup) error stop() error collect(*sync.WaitGroup) - setConfig(types.Measurement) error + setConfig(types.Measurement) + validateConfig() error } var factory measurementFactory @@ -51,14 +52,7 @@ var globalCfg config.GlobalConfig // NewMeasurementFactory initializes the measurement facture func NewMeasurementFactory(configSpec config.Spec, indexer *indexers.Indexer, metadata map[string]interface{}) { globalCfg = 
configSpec.GlobalConfig - _, restConfig, err := config.GetClientSet(0, 0) - if err != nil { - log.Fatalf("Error creating clientSet: %s", err) - } - clientSet := kubernetes.NewForConfigOrDie(restConfig) factory = measurementFactory{ - clientSet: clientSet, - restConfig: restConfig, createFuncs: make(map[string]measurement), indexer: indexer, metadata: metadata, @@ -78,8 +72,9 @@ func (mf *measurementFactory) register(measurement types.Measurement, measuremen if _, exists := mf.createFuncs[measurement.Name]; exists { log.Warnf("Measurement already registered: %s", measurement.Name) } else { - if err := measurementFunc.setConfig(measurement); err != nil { - return fmt.Errorf("Config validation error: %s", err) + measurementFunc.setConfig(measurement) + if err := measurementFunc.validateConfig(); err != nil { + return fmt.Errorf("%s config error: %s", measurement.Name, err) } mf.createFuncs[measurement.Name] = measurementFunc log.Infof("📈 Registered measurement: %s", measurement.Name) @@ -89,6 +84,12 @@ func (mf *measurementFactory) register(measurement types.Measurement, measuremen func SetJobConfig(jobConfig *config.Job) { factory.jobConfig = jobConfig + _, restConfig, err := config.GetClientSet(factory.jobConfig.QPS, factory.jobConfig.Burst) + if err != nil { + log.Fatalf("Error creating clientSet: %s", err) + } + factory.clientSet = kubernetes.NewForConfigOrDie(restConfig) + factory.restConfig = restConfig } // Start starts registered measurements diff --git a/pkg/measurements/metrics/watcher.go b/pkg/measurements/metrics/watcher.go index d10ea6ad0..7cee9c345 100644 --- a/pkg/measurements/metrics/watcher.go +++ b/pkg/measurements/metrics/watcher.go @@ -28,11 +28,11 @@ const informerTimeout = time.Minute type Watcher struct { name string stopChannel chan struct{} - Informer cache.SharedInformer + Informer cache.SharedIndexInformer } // NewWatcher return a new ListWatcher of the specified resource and namespace -func NewWatcher(restClient *rest.RESTClient, name string, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *Watcher { +func NewWatcher(restClient *rest.RESTClient, name string, resource string, namespace string, optionsModifier func(options *metav1.ListOptions), indexers cache.Indexers) *Watcher { lw := cache.NewFilteredListWatchFromClient( restClient, resource, @@ -42,7 +42,7 @@ func NewWatcher(restClient *rest.RESTClient, name string, resource string, names return &Watcher{ name: name, stopChannel: make(chan struct{}), - Informer: cache.NewSharedInformer(lw, nil, 0), + Informer: cache.NewSharedIndexInformer(lw, nil, 0, indexers), } } diff --git a/pkg/measurements/pod_latency.go b/pkg/measurements/pod_latency.go index 59728ec9e..bf8ebf701 100644 --- a/pkg/measurements/pod_latency.go +++ b/pkg/measurements/pod_latency.go @@ -67,7 +67,9 @@ type podLatency struct { } func init() { - measurementMap["podLatency"] = &podLatency{} + measurementMap["podLatency"] = &podLatency{ + metrics: make(map[string]podMetric), + } } func (p *podLatency) handleCreatePod(obj interface{}) { @@ -120,20 +122,37 @@ func (p *podLatency) handleUpdatePod(obj interface{}) { } } -func (p *podLatency) setConfig(cfg types.Measurement) error { +func (p *podLatency) setConfig(cfg types.Measurement) { p.config = cfg - if err := p.validateConfig(); err != nil { - return err +} + +func (p *podLatency) validateConfig() error { + var metricFound bool + var latencyMetrics = []string{"P99", "P95", "P50", "Avg", "Max"} + for _, th := range p.config.LatencyThresholds { + if 
th.ConditionType == string(corev1.ContainersReady) || th.ConditionType == string(corev1.PodInitialized) || th.ConditionType == string(corev1.PodReady) || th.ConditionType == string(corev1.PodScheduled) { + for _, lm := range latencyMetrics { + if th.Metric == lm { + metricFound = true + break + } + } + if !metricFound { + return fmt.Errorf("unsupported metric %s in podLatency measurement, supported are: %s", th.Metric, strings.Join(latencyMetrics, ", ")) + } + } else { + return fmt.Errorf("unsupported pod condition type in podLatency measurement: %s", th.ConditionType) + } } return nil } -// start starts podLatency measurement -func (p *podLatency) start(measurementWg *sync.WaitGroup) { +// start podLatency measurement +func (p *podLatency) start(measurementWg *sync.WaitGroup) error { defer measurementWg.Done() if factory.jobConfig.JobType == config.DeletionJob { log.Info("Pod latency measurement not compatible with delete jobs, skipping") - return + return nil } p.metrics = make(map[string]podMetric) log.Infof("Creating Pod latency watcher for %s", factory.jobConfig.Name) @@ -143,8 +162,9 @@ func (p *podLatency) start(measurementWg *sync.WaitGroup) { "pods", corev1.NamespaceAll, func(options *metav1.ListOptions) { - options.LabelSelector = fmt.Sprintf("kube-burner-runid=%s", globalCfg.RUNID) + options.LabelSelector = fmt.Sprintf("kube-burner-runid=%v", globalCfg.RUNID) }, + nil, ) p.watcher.Informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: p.handleCreatePod, @@ -155,6 +175,7 @@ func (p *podLatency) start(measurementWg *sync.WaitGroup) { if err := p.watcher.StartAndCacheSync(); err != nil { log.Errorf("Pod Latency measurement error: %s", err) } + return nil } // collects pod measurements triggered in the past @@ -340,24 +361,3 @@ func (p *podLatency) calcQuantiles() { p.latencyQuantiles = append(p.latencyQuantiles, calcSummary(string(podCondition), latencies)) } } - -func (p *podLatency) validateConfig() error { - var metricFound bool - var latencyMetrics = []string{"P99", "P95", "P50", "Avg", "Max"} - for _, th := range p.config.LatencyThresholds { - if th.ConditionType == string(corev1.ContainersReady) || th.ConditionType == string(corev1.PodInitialized) || th.ConditionType == string(corev1.PodReady) || th.ConditionType == string(corev1.PodScheduled) { - for _, lm := range latencyMetrics { - if th.Metric == lm { - metricFound = true - break - } - } - if !metricFound { - return fmt.Errorf("unsupported metric %s in podLatency measurement, supported are: %s", th.Metric, strings.Join(latencyMetrics, ", ")) - } - } else { - return fmt.Errorf("unsupported pod condition type in podLatency measurement: %s", th.ConditionType) - } - } - return nil -} diff --git a/pkg/measurements/pprof.go b/pkg/measurements/pprof.go index 9f57d806e..a3110a988 100644 --- a/pkg/measurements/pprof.go +++ b/pkg/measurements/pprof.go @@ -45,15 +45,11 @@ func init() { measurementMap["pprof"] = &pprof{} } -func (p *pprof) setConfig(cfg types.Measurement) error { +func (p *pprof) setConfig(cfg types.Measurement) { p.config = cfg - if err := p.validateConfig(); err != nil { - return err - } - return nil } -func (p *pprof) start(measurementWg *sync.WaitGroup) { +func (p *pprof) start(measurementWg *sync.WaitGroup) error { defer measurementWg.Done() var wg sync.WaitGroup err := os.MkdirAll(p.config.PProfDirectory, 0744) @@ -79,6 +75,7 @@ func (p *pprof) start(measurementWg *sync.WaitGroup) { } } }() + return nil } func getPods(target types.PProftarget) []corev1.Pod { diff --git 
a/pkg/measurements/service_latency.go b/pkg/measurements/service_latency.go new file mode 100644 index 000000000..88fcd4532 --- /dev/null +++ b/pkg/measurements/service_latency.go @@ -0,0 +1,397 @@ +// Copyright 2023 The Kube-burner Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package measurements + +import ( + "bytes" + "context" + "fmt" + "sync" + "time" + + "github.com/cloud-bulldozer/go-commons/indexers" + "github.com/kube-burner/kube-burner/pkg/config" + "github.com/kube-burner/kube-burner/pkg/measurements/metrics" + "github.com/kube-burner/kube-burner/pkg/measurements/types" + kutil "github.com/kube-burner/kube-burner/pkg/util" + log "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + lcorev1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/remotecommand" + "k8s.io/kubectl/pkg/scheme" +) + +const ( + svcLatencyMetric = "svcLatencyMeasurement" + svcLatencyQuantilesMeasurement = "svcLatencyQuantilesMeasurement" +) + +type serviceLatency struct { + config types.Measurement + svcWatcher *metrics.Watcher + epWatcher *metrics.Watcher + epLister lcorev1.EndpointsLister + svcLister lcorev1.ServiceLister + metrics map[string]svcMetric + latencyQuantiles []interface{} + normLatencies []interface{} + metricLock sync.RWMutex +} + +type svcMetric struct { + Timestamp time.Time `json:"timestamp"` + IPAssignedLatency time.Duration `json:"ipAssigned,omitempty"` + ReadyLatency time.Duration `json:"ready"` + MetricName string `json:"metricName"` + JobConfig config.Job `json:"jobConfig"` + UUID string `json:"uuid"` + Namespace string `json:"namespace"` + Name string `json:"service"` + Metadata interface{} `json:"metadata,omitempty"` + ServiceType corev1.ServiceType `json:"type"` +} + +type SvcLatencyChecker struct { + pod *corev1.Pod + clientSet kubernetes.Clientset + restConfig rest.Config +} + +func init() { + measurementMap["serviceLatency"] = &serviceLatency{ + metrics: map[string]svcMetric{}, + } +} + +func deployAssets() error { + var err error + if err = kutil.CreateNamespace(factory.clientSet, types.SvcLatencyNs, nil, nil); err != nil { + return err + } + if _, err = factory.clientSet.CoreV1().Pods(types.SvcLatencyNs).Create(context.TODO(), types.SvcLatencyCheckerPod, metav1.CreateOptions{}); err != nil { + if errors.IsAlreadyExists(err) { + log.Warn(err) + } else { + return err + } + } + err = wait.PollUntilContextCancel(context.TODO(), 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) { + pod, err := factory.clientSet.CoreV1().Pods(types.SvcLatencyNs).Get(context.TODO(), types.SvcLatencyCheckerPod.Name, metav1.GetOptions{}) + if err != nil { + return true, err + } + if pod.Status.Phase != corev1.PodRunning { + return false, nil + } + return true, nil + }) + return err +} + +func (s *serviceLatency) 
handleCreateSvc(obj interface{}) { + // TODO Magic annotation to skip service + svc := obj.(*corev1.Service) + if annotation, ok := svc.Annotations["kube-burner.io/service-latency"]; ok { + if annotation == "false" { + log.Debugf("Annotation found, discarding service %v/%v", svc.Namespace, svc.Name) + } + } + log.Debugf("Handling service: %v/%v", svc.Namespace, svc.Name) + go func(svc *corev1.Service) { + var ips []string + var port int32 + var ipAssignedLatency time.Duration + now := time.Now() + // If service is loadbalancer first wait for the IP assignment + if svc.Spec.Type == corev1.ServiceTypeLoadBalancer { + if err := s.waitForIngress(svc); err != nil { + log.Fatal(err) + } + ipAssignedLatency = time.Since(now) + } + if err := s.waitForEndpoints(svc); err != nil { + log.Fatal(err) + } + endpointsReadyTs := time.Now().UTC() + log.Debugf("Endpoints %v/%v ready", svc.Namespace, svc.Name) + svcLatencyChecker, err := newSvcLatencyChecker(*factory.clientSet, *factory.restConfig) + if err != nil { + log.Error(err) + } + for _, specPort := range svc.Spec.Ports { + if specPort.Protocol == corev1.ProtocolTCP { // Support TCP protocol + switch svc.Spec.Type { + case corev1.ServiceTypeClusterIP: + ips = svc.Spec.ClusterIPs + port = specPort.Port + case corev1.ServiceTypeNodePort: + ips = []string{svcLatencyChecker.pod.Status.HostIP} + port = specPort.NodePort + case corev1.ServiceTypeLoadBalancer: + for _, ingress := range svc.Status.LoadBalancer.Ingress { + if ingress.IP != "" { + ips = append(ips, ingress.Hostname) + } else { + ips = append(ips, ingress.IP) + } + } + port = specPort.Port + default: + log.Warnf("Service type %v not supported, skipping", svc.Spec.Type) + return + } + for _, ip := range ips { + err = svcLatencyChecker.ping(ip, port, s.config.ServiceTimeout) + if err != nil { + log.Error(err) + return + } + } + } + } + svcLatency := time.Since(endpointsReadyTs) + log.Debugf("Service %v/%v latency was: %vms", svc.Namespace, svc.Name, svcLatency.Milliseconds()) + s.metricLock.Lock() + s.metrics[string(svc.UID)] = svcMetric{ + Name: svc.Name, + Namespace: svc.Namespace, + Timestamp: svc.CreationTimestamp.Time.UTC(), + MetricName: svcLatencyMetric, + ServiceType: svc.Spec.Type, + ReadyLatency: svcLatency, + JobConfig: *factory.jobConfig, + UUID: globalCfg.UUID, + Metadata: factory.metadata, + IPAssignedLatency: ipAssignedLatency, + } + s.metricLock.Unlock() + }(svc) +} + +func (s *serviceLatency) setConfig(cfg types.Measurement) { + s.config = cfg +} + +func (s *serviceLatency) validateConfig() error { + if s.config.ServiceTimeout == 0 { + return fmt.Errorf("svcTimeout cannot be 0") + } + return nil +} + +// start service latency measurement +func (s *serviceLatency) start(measurementWg *sync.WaitGroup) error { + defer measurementWg.Done() + if err := deployAssets(); err != nil { + log.Fatal(err) + return err + } + log.Infof("Creating service latency watcher for %s", factory.jobConfig.Name) + s.svcWatcher = metrics.NewWatcher( + factory.clientSet.CoreV1().RESTClient().(*rest.RESTClient), + "svcWatcher", + "services", + corev1.NamespaceAll, + func(options *metav1.ListOptions) { + options.LabelSelector = fmt.Sprintf("kube-burner-runid=%v", globalCfg.RUNID) + }, + cache.Indexers{}, + ) + s.svcWatcher.Informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: s.handleCreateSvc, + }) + s.epWatcher = metrics.NewWatcher( + factory.clientSet.CoreV1().RESTClient().(*rest.RESTClient), + "epWatcher", + "endpoints", + corev1.NamespaceAll, + func(options *metav1.ListOptions) {}, + 
cache.Indexers{}, + ) + // Use an endpoints lister to reduce and optimize API interactions in waitForEndpoints + s.svcLister = lcorev1.NewServiceLister(s.svcWatcher.Informer.GetIndexer()) + s.epLister = lcorev1.NewEndpointsLister(s.epWatcher.Informer.GetIndexer()) + if err := s.svcWatcher.StartAndCacheSync(); err != nil { + log.Errorf("Service Latency measurement error: %s", err) + } + if err := s.epWatcher.StartAndCacheSync(); err != nil { + log.Errorf("Service Latency measurement error: %s", err) + } + return nil +} + +func (s *serviceLatency) stop() error { + s.svcWatcher.StopWatcher() + s.epWatcher.StopWatcher() + // 5 minutes should be more than enough to cleanup this namespace + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + kutil.CleanupNamespaces(ctx, factory.clientSet, fmt.Sprintf("kubernetes.io/metadata.name=%s", types.SvcLatencyNs)) + s.normalizeMetrics() + if globalCfg.IndexerConfig.Type != "" { + if factory.jobConfig.SkipIndexing { + log.Infof("Skipping service latency data indexing in job: %s", factory.jobConfig.Name) + } else { + log.Infof("Indexing service latency data for job: %s", factory.jobConfig.Name) + s.index() + } + } + for _, q := range s.latencyQuantiles { + pq := q.(metrics.LatencyQuantiles) + // Divide nanoseconds by 1e6 to get milliseconds + log.Infof("%s: %s 50th: %dms 99th: %dms max: %dms avg: %dms", factory.jobConfig.Name, pq.QuantileName, pq.P50/1e6, pq.P99/1e6, pq.Max/1e6, pq.Avg/1e6) + } + return nil +} + +func (s *serviceLatency) normalizeMetrics() { + var latencies []float64 + var ipAssignedLatencies []float64 + for _, metric := range s.metrics { + latencies = append(latencies, float64(metric.ReadyLatency)) + s.normLatencies = append(s.normLatencies, metric) + if metric.IPAssignedLatency != 0 { + ipAssignedLatencies = append(ipAssignedLatencies, float64(metric.IPAssignedLatency)) + } + } + calcSummary := func(name string, inputLatencies []float64) metrics.LatencyQuantiles { + latencySummary := metrics.NewLatencySummary(inputLatencies, name) + latencySummary.UUID = globalCfg.UUID + latencySummary.JobConfig = *factory.jobConfig + latencySummary.Timestamp = time.Now().UTC() + latencySummary.Metadata = factory.metadata + latencySummary.MetricName = svcLatencyQuantilesMeasurement + return latencySummary + } + if len(s.metrics) > 0 { + s.latencyQuantiles = append(s.latencyQuantiles, calcSummary("Ready", latencies)) + } + if len(ipAssignedLatencies) > 0 { + s.latencyQuantiles = append(s.latencyQuantiles, calcSummary("IPAssigned", ipAssignedLatencies)) + } +} + +func (s *serviceLatency) index() { + metricMap := map[string][]interface{}{ + svcLatencyMetric: s.normLatencies, + svcLatencyQuantilesMeasurement: s.latencyQuantiles, + } + if s.config.ServiceLatencyMetrics == types.Quantiles { + delete(metricMap, svcLatencyMetric) + } + for metricName, documents := range metricMap { + indexingOpts := indexers.IndexingOpts{ + MetricName: fmt.Sprintf("%s-%s", metricName, factory.jobConfig.Name), + } + log.Debugf("Indexing [%d] documents: %s", len(documents), metricName) + resp, err := (*factory.indexer).Index(documents, indexingOpts) + if err != nil { + log.Error(err.Error()) + } else { + log.Info(resp) + } + } +} + +func (s *serviceLatency) waitForEndpoints(svc *corev1.Service) error { + err := wait.PollUntilContextCancel(context.TODO(), 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) { + endpoints, err := s.epLister.Endpoints(svc.Namespace).Get(svc.Name) + if err != nil { + return false, nil + } + 
for _, subset := range endpoints.Subsets { + if len(subset.Addresses) > 0 { + return true, nil + } + } + return false, nil + }) + return err +} + +func (s *serviceLatency) waitForIngress(svc *corev1.Service) error { + err := wait.PollUntilContextCancel(context.TODO(), 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) { + svc, err := s.svcLister.Services(svc.Namespace).Get(svc.Name) + if err != nil { + return false, nil + } + if len(svc.Status.LoadBalancer.Ingress) > 0 { + return true, nil + } + return false, nil + }) + return err +} + +func (s *serviceLatency) collect(measurementWg *sync.WaitGroup) { + defer measurementWg.Done() +} + +func newSvcLatencyChecker(clientSet kubernetes.Clientset, restConfig rest.Config) (SvcLatencyChecker, error) { + pod, err := clientSet.CoreV1().Pods(types.SvcLatencyNs).Get(context.TODO(), types.SvcLatencyCheckerName, metav1.GetOptions{}) + if err != nil { + return SvcLatencyChecker{}, err + } + return SvcLatencyChecker{ + pod: pod, + clientSet: clientSet, + restConfig: restConfig, + }, nil +} + +func (lc *SvcLatencyChecker) ping(address string, port int32, timeout time.Duration) error { + var stdout, stderr bytes.Buffer + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + // We use 50ms precision thanks to sleep 0.05 + cmd := []string{"bash", "-c", fmt.Sprintf("while true; do nc -w 0.05s -z %s %d && break; sleep 0.05; done", address, port)} + req := lc.clientSet.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(lc.pod.Name). + Namespace(lc.pod.Namespace). + SubResource("exec") + req.VersionedParams(&corev1.PodExecOptions{ + Container: types.SvcLatencyCheckerName, + Stdin: false, + Stdout: true, + Stderr: true, + Command: cmd, + TTY: false, + }, scheme.ParameterCodec) + err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) { + exec, err := remotecommand.NewSPDYExecutor(&lc.restConfig, "POST", req.URL()) + if err != nil { + return false, err + } + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdout: &stdout, + Stderr: &stderr, + }) + if err != nil { + return false, err + } + return true, nil + }) + if ctx.Err() == context.DeadlineExceeded { + return fmt.Errorf("timeout waiting for endpoint %s:%d to be ready", address, port) + } + return err +} diff --git a/pkg/measurements/types/types.go b/pkg/measurements/types/types.go index 6a87fbeb6..18706ac0d 100644 --- a/pkg/measurements/types/types.go +++ b/pkg/measurements/types/types.go @@ -16,6 +16,11 @@ package types import ( "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" + "k8s.io/utils/ptr" ) type latencyMetric string @@ -32,6 +37,7 @@ func (m *Measurement) UnmarshalMeasurement(unmarshal func(interface{}) error) er measurement := rawMeasurement{ PProfDirectory: pprofDirectory, PodLatencyMetrics: All, + ServiceTimeout: 5 * time.Second, } if err := unmarshal(&measurement); err != nil { return err @@ -54,6 +60,10 @@ type Measurement struct { PProfDirectory string `yaml:"pprofDirectory"` // Pod latency metrics to index PodLatencyMetrics latencyMetric `yaml:"podLatencyMetrics"` + // Service latency metrics to index + ServiceLatencyMetrics latencyMetric `yaml:"svcLatencyMetrics"` + // Service latency endpoint timeout + ServiceTimeout time.Duration `yaml:"svcTimeout"` } // LatencyThreshold holds the thresholds configuration @@ -87,3 +97,33 @@ type PProftarget struct { // Key Private key content Key string 
`yaml:"key"` } + +const ( + SvcLatencyNs = "kube-burner-service-latency" + SvcLatencyCheckerName = "svc-checker" +) + +var SvcLatencyCheckerPod = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: SvcLatencyCheckerName, + Namespace: SvcLatencyNs, + }, + Spec: corev1.PodSpec{ + TerminationGracePeriodSeconds: ptr.To[int64](0), + Containers: []corev1.Container{ + { + Image: "quay.io/cloud-bulldozer/fedora-nc:latest", + Command: []string{"sleep", "inf"}, + Name: SvcLatencyCheckerName, + ImagePullPolicy: corev1.PullAlways, + SecurityContext: &corev1.SecurityContext{ + AllowPrivilegeEscalation: pointer.Bool(false), + Capabilities: &corev1.Capabilities{Drop: []corev1.Capability{"ALL"}}, + RunAsNonRoot: pointer.Bool(true), + SeccompProfile: &corev1.SeccompProfile{Type: corev1.SeccompProfileTypeRuntimeDefault}, + RunAsUser: pointer.Int64(1000), + }, + }, + }, + }, +} diff --git a/pkg/measurements/util/svc_checker.go b/pkg/measurements/util/svc_checker.go new file mode 100644 index 000000000..c33990d51 --- /dev/null +++ b/pkg/measurements/util/svc_checker.go @@ -0,0 +1,74 @@ +package util + +import ( + "bytes" + "context" + "fmt" + "time" + + "github.com/kube-burner/kube-burner/pkg/measurements/types" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/remotecommand" + "k8s.io/kubectl/pkg/scheme" +) + +type SvcLatencyChecker struct { + Pod *corev1.Pod + clientSet kubernetes.Clientset + restConfig rest.Config +} + +func NewSvcLatencyChecker(clientSet kubernetes.Clientset, restConfig rest.Config) (SvcLatencyChecker, error) { + pod, err := clientSet.CoreV1().Pods(types.SvcLatencyNs).Get(context.TODO(), types.SvcLatencyCheckerName, metav1.GetOptions{}) + if err != nil { + return SvcLatencyChecker{}, err + } + return SvcLatencyChecker{ + Pod: pod, + clientSet: clientSet, + restConfig: restConfig, + }, nil +} + +func (lc *SvcLatencyChecker) Ping(address string, port int32, timeout time.Duration) error { + var stdout, stderr bytes.Buffer + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + // We use 50ms precision thanks to sleep 0.1 + cmd := []string{"bash", "-c", fmt.Sprintf("while true; do nc -w 0.1s -z %s %d && break; sleep 0.05; done", address, port)} + req := lc.clientSet.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(lc.Pod.Name). + Namespace(lc.Pod.Namespace). 
+ SubResource("exec") + req.VersionedParams(&corev1.PodExecOptions{ + Container: types.SvcLatencyCheckerName, + Stdin: false, + Stdout: true, + Stderr: true, + Command: cmd, + TTY: false, + }, scheme.ParameterCodec) + err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) { + exec, err := remotecommand.NewSPDYExecutor(&lc.restConfig, "POST", req.URL()) + if err != nil { + return false, err + } + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdout: &stdout, + Stderr: &stderr, + }) + if err != nil { + return false, err + } + return true, nil + }) + if ctx.Err() == context.DeadlineExceeded { + return fmt.Errorf("timeout waiting for endpoint %s:%d to be ready", address, port) + } + return err +} diff --git a/pkg/measurements/vmi_latency.go b/pkg/measurements/vmi_latency.go index d6554e9db..ae9ebfeaf 100644 --- a/pkg/measurements/vmi_latency.go +++ b/pkg/measurements/vmi_latency.go @@ -272,23 +272,21 @@ func (p *vmiLatency) handleUpdateVMIPod(obj interface{}) { p.mu.Unlock() } -func (p *vmiLatency) setConfig(cfg types.Measurement) error { +func (p *vmiLatency) setConfig(cfg types.Measurement) { p.config = cfg - if err := p.validateConfig(); err != nil { - return err - } - return nil } // Start starts vmiLatency measurement -func (p *vmiLatency) start(measurementWg *sync.WaitGroup) { +func (p *vmiLatency) start(measurementWg *sync.WaitGroup) error { if factory.jobConfig.JobType == config.DeletionJob { log.Info("VMI latency measurement not compatible with delete jobs, skipping") - return + return nil } defer measurementWg.Done() + if err := p.validateConfig(); err != nil { + return err + } p.metrics = make(map[string]*vmiMetric) - log.Infof("Creating VM latency watcher for %s", factory.jobConfig.Name) restClient := newRESTClientWithRegisteredKubevirtResource() p.vmWatcher = metrics.NewWatcher( @@ -297,8 +295,9 @@ func (p *vmiLatency) start(measurementWg *sync.WaitGroup) { "virtualmachines", corev1.NamespaceAll, func(options *metav1.ListOptions) { - options.LabelSelector = fmt.Sprintf("kube-burner-runid=%s", globalCfg.RUNID) + options.LabelSelector = fmt.Sprintf("kube-burner-runid=%v", globalCfg.RUNID) }, + nil, ) p.vmWatcher.Informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: p.handleCreateVM, @@ -307,7 +306,7 @@ func (p *vmiLatency) start(measurementWg *sync.WaitGroup) { }, }) if err := p.vmWatcher.StartAndCacheSync(); err != nil { - log.Errorf("VMI Latency measurement error: %s", err) + return fmt.Errorf("VMI Latency measurement error: %s", err) } log.Infof("Creating VMI latency watcher for %s", factory.jobConfig.Name) @@ -317,8 +316,9 @@ func (p *vmiLatency) start(measurementWg *sync.WaitGroup) { "virtualmachineinstances", corev1.NamespaceAll, func(options *metav1.ListOptions) { - options.LabelSelector = fmt.Sprintf("kube-burner-runid=%s", globalCfg.RUNID) + options.LabelSelector = fmt.Sprintf("kube-burner-runid=%v", globalCfg.RUNID) }, + nil, ) p.vmiWatcher.Informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: p.handleCreateVMI, @@ -327,7 +327,7 @@ func (p *vmiLatency) start(measurementWg *sync.WaitGroup) { }, }) if err := p.vmiWatcher.StartAndCacheSync(); err != nil { - log.Errorf("VMI Latency measurement error: %s", err) + return fmt.Errorf("VMI Latency measurement error: %s", err) } log.Infof("Creating VMI Pod latency watcher for %s", factory.jobConfig.Name) @@ -337,8 +337,9 @@ func (p *vmiLatency) start(measurementWg *sync.WaitGroup) { "pods", corev1.NamespaceAll, func(options 
*metav1.ListOptions) { - options.LabelSelector = fmt.Sprintf("kube-burner-runid=%s", globalCfg.RUNID) + options.LabelSelector = fmt.Sprintf("kube-burner-runid=%v", globalCfg.RUNID) }, + nil, ) p.vmiPodWatcher.Informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: p.handleCreateVMIPod, @@ -347,8 +348,9 @@ func (p *vmiLatency) start(measurementWg *sync.WaitGroup) { }, }) if err := p.vmiPodWatcher.StartAndCacheSync(); err != nil { - log.Errorf("VMI Pod Latency measurement error: %s", err) + return fmt.Errorf("VMI Pod Latency measurement error: %s", err) } + return nil } func newRESTClientWithRegisteredKubevirtResource() *rest.RESTClient { diff --git a/pkg/util/namespaces.go b/pkg/util/namespaces.go new file mode 100644 index 000000000..d145ff3bf --- /dev/null +++ b/pkg/util/namespaces.go @@ -0,0 +1,139 @@ +// Copyright 2023 The Kube-burner Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "context" + "time" + + log "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" +) + +func CreateNamespace(clientSet *kubernetes.Clientset, name string, nsLabels map[string]string, nsAnnotations map[string]string) error { + ns := corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: name, Labels: nsLabels, Annotations: nsAnnotations}, + } + return RetryWithExponentialBackOff(func() (done bool, err error) { + _, err = clientSet.CoreV1().Namespaces().Create(context.TODO(), &ns, metav1.CreateOptions{}) + if errors.IsForbidden(err) { + log.Fatalf("authorization error creating namespace %s: %s", ns.Name, err) + return false, err + } + if errors.IsAlreadyExists(err) { + log.Infof("Namespace %s already exists", ns.Name) + nsSpec, _ := clientSet.CoreV1().Namespaces().Get(context.TODO(), name, metav1.GetOptions{}) + if nsSpec.Status.Phase == corev1.NamespaceTerminating { + log.Warnf("Namespace %s is in %v state, retrying", name, corev1.NamespaceTerminating) + return false, nil + } + return true, nil + } else if err != nil { + log.Errorf("unexpected error creating namespace %s: %v", name, err) + return false, nil + } + log.Debugf("Created namespace: %s", ns.Name) + return true, err + }, 5*time.Second, 3, 0, 5*time.Hour) +} + +// CleanupNamespaces deletes namespaces with the given selector +func CleanupNamespaces(ctx context.Context, clientSet *kubernetes.Clientset, labelSelector string) { + ns, err := clientSet.CoreV1().Namespaces().List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) + if err != nil { + log.Errorf("Error listing namespaces: %v", err.Error()) + return + } + if len(ns.Items) > 0 { + log.Infof("Deleting %d namespaces with label: %s", len(ns.Items), labelSelector) + for _, ns := range ns.Items { + err := clientSet.CoreV1().Namespaces().Delete(ctx, ns.Name, 
metav1.DeleteOptions{}) + if err != nil { + if !errors.IsNotFound(err) { + log.Errorf("Error deleting namespace %s: %v", ns.Name, err) + } + } + } + waitForDeleteNamespaces(ctx, clientSet, labelSelector) + } +} + +func waitForDeleteNamespaces(ctx context.Context, clientSet *kubernetes.Clientset, labelSelector string) { + log.Info("Waiting for namespaces to be definitely deleted") + err := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) { + ns, err := clientSet.CoreV1().Namespaces().List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) + if err != nil { + return false, err + } + if len(ns.Items) == 0 { + return true, nil + } + log.Debugf("Waiting for %d namespaces labeled with %s to be deleted", len(ns.Items), labelSelector) + return false, nil + }) + if err != nil { + if ctx.Err() == context.DeadlineExceeded { + log.Fatalf("Timeout cleaning up namespaces: %v", err) + } + log.Errorf("Error cleaning up namespaces: %v", err) + } +} + +// Cleanup non-namespaced resources with the given selector +func CleanupNonNamespacedResources(ctx context.Context, clientSet *kubernetes.Clientset, dynamicClient dynamic.Interface, labelSelector string) { + serverResources, _ := clientSet.Discovery().ServerPreferredResources() + log.Infof("Deleting non-namespace resources with label: %s", labelSelector) + for _, resourceList := range serverResources { + for _, resource := range resourceList.APIResources { + if !resource.Namespaced { + gv, err := schema.ParseGroupVersion(resourceList.GroupVersion) + if err != nil { + log.Errorf("Unable to scan the resource group version: %v", err) + } + resourceInterface := dynamicClient.Resource(schema.GroupVersionResource{ + Group: gv.Group, + Version: gv.Version, + Resource: resource.Name, + }) + resources, err := resourceInterface.List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) + if err != nil { + log.Debugf("Unable to list resource %s: %v", resource.Name, err) + continue + } + DeleteNonNamespacedResources(ctx, resources, resourceInterface) + } + } + } +} + +func DeleteNonNamespacedResources(ctx context.Context, resources *unstructured.UnstructuredList, resourceInterface dynamic.NamespaceableResourceInterface) { + if len(resources.Items) > 0 { + for _, item := range resources.Items { + go func(item unstructured.Unstructured) { + err := resourceInterface.Delete(ctx, item.GetName(), metav1.DeleteOptions{}) + if err != nil { + log.Errorf("Error deleting %v/%v: %v", item.GetKind(), item.GetName(), err) + } + }(item) + } + } +} diff --git a/pkg/util/utils.go b/pkg/util/utils.go new file mode 100644 index 000000000..8277e8398 --- /dev/null +++ b/pkg/util/utils.go @@ -0,0 +1,34 @@ +// Copyright 2024 The Kube-burner Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "math" + "time" + + "k8s.io/apimachinery/pkg/util/wait" +) + +// RetryWithExponentialBackOff a utility for retrying the given function with exponential backoff. 
+func RetryWithExponentialBackOff(fn wait.ConditionFunc, duration time.Duration, factor, jitter float64, timeout time.Duration) error {
+	steps := int(math.Ceil(math.Log(float64(timeout)/(float64(duration)*(1+jitter))) / math.Log(factor)))
+	backoff := wait.Backoff{
+		Duration: duration,
+		Factor:   factor,
+		Jitter:   jitter,
+		Steps:    steps,
+	}
+	return wait.ExponentialBackoff(backoff, fn)
+}
diff --git a/test/k8s/kube-burner-delete.yml b/test/k8s/kube-burner-delete.yml
index 9a410d587..85a26ed66 100644
--- a/test/k8s/kube-burner-delete.yml
+++ b/test/k8s/kube-burner-delete.yml
@@ -30,7 +30,7 @@ jobs:
       - objectTemplate: objectTemplates/deployment.yml
         replicas: 1
         inputVars:
-          containerImage: gcr.io/google_containers/pause-amd64:3.0
+          containerImage: gcr.io/google_containers/pause-amd64:3.2
         wait: true
 
diff --git a/test/k8s/kube-burner.yml b/test/k8s/kube-burner.yml
index eba725ec8..ca108500d 100644
--- a/test/k8s/kube-burner.yml
+++ b/test/k8s/kube-burner.yml
@@ -4,6 +4,8 @@ global:
   gc: {{env "GC"}}
   measurements:
     - name: podLatency
+    - name: serviceLatency
+      svcTimeout: 5s
 
   indexerConfig:
     type: {{ .INDEXING_TYPE }}
@@ -40,7 +42,10 @@ jobs:
       - objectTemplate: objectTemplates/deployment.yml
         replicas: 1
         inputVars:
-          containerImage: gcr.io/google_containers/pause-amd64:3.0
+          containerImage: quay.io/cloud-bulldozer/sampleapp:latest
       - objectTemplate: objectTemplates/pod.yml
         replicas: 1
+
+      - objectTemplate: objectTemplates/service.yml
+        replicas: 1
 
diff --git a/test/k8s/objectTemplates/pod.yml b/test/k8s/objectTemplates/pod.yml
index 684eec7c4..f2f32c84f 100644
--- a/test/k8s/objectTemplates/pod.yml
+++ b/test/k8s/objectTemplates/pod.yml
@@ -7,6 +7,6 @@ metadata:
   namespace: default
 spec:
   containers:
-  - image: gcr.io/google_containers/pause:3.1
+  - image: gcr.io/google_containers/pause:3.2
     name: pause
     imagePullPolicy: IfNotPresent
diff --git a/test/k8s/objectTemplates/service.yml b/test/k8s/objectTemplates/service.yml
new file mode 100644
index 000000000..5c48e33ff
--- /dev/null
+++ b/test/k8s/objectTemplates/service.yml
@@ -0,0 +1,13 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: mysvc-{{ .Iteration }}-{{ .Replica }}
+spec:
+  ports:
+  - name: "8080"
+    port: 8080
+    protocol: TCP
+    targetPort: 8080
+  selector:
+    app: sleep-app-{{ .Iteration }}-{{ .Replica }}
+  type: ClusterIP
diff --git a/test/test-k8s.bats b/test/test-k8s.bats
index c3924d9cd..bae3dc3ca 100755
--- a/test/test-k8s.bats
+++ b/test/test-k8s.bats
@@ -7,9 +7,9 @@ load helpers.bash
 setup_file() {
   cd k8s
   export BATS_TEST_TIMEOUT=600
-  export JOB_ITERATIONS=5
-  export QPS=2
-  export BURST=2
+  export JOB_ITERATIONS=4
+  export QPS=3
+  export BURST=3
   export GC=true
   export CHURN=false
   setup-kind
@@ -43,9 +43,9 @@ teardown_file() {
 @test "kube-burner init: gc=false" {
   export GC=false
   run_cmd kube-burner init -c kube-burner.yml --uuid="${UUID}" --log-level=debug
-  check_ns kube-burner-job=namespaced,kube-burner-uuid="${UUID}" 6
-  check_running_pods kube-burner-job=namespaced,kube-burner-uuid="${UUID}" 12
-  check_running_pods_in_ns default 6
+  check_ns kube-burner-job=namespaced,kube-burner-uuid="${UUID}" 5
+  check_running_pods kube-burner-job=namespaced,kube-burner-uuid="${UUID}" 10
+  check_running_pods_in_ns default 5
   kube-burner destroy --uuid "${UUID}"
   check_destroyed_ns kube-burner-job=namespaced,kube-burner-uuid="${UUID}"
   check_destroyed_ns kube-burner-job=not-namespaced,kube-burner-uuid="${UUID}"
@@ -54,7 +54,7 @@ teardown_file() {
 @test "kube-burner init: local-indexing=true; pod-latency-metrics-indexing=true" {
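+  # serviceLatency is enabled in kube-burner.yml, so svcLatency documents are expected alongside the podLatency ones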
   export INDEXING_TYPE=local
   run_cmd kube-burner init -c kube-burner.yml --uuid="${UUID}" --log-level=debug
-  check_file_list ${METRICS_FOLDER}/podLatencyMeasurement-namespaced.json ${METRICS_FOLDER}/podLatencyQuantilesMeasurement-namespaced.json
+  check_file_list ${METRICS_FOLDER}/podLatencyMeasurement-namespaced.json ${METRICS_FOLDER}/podLatencyQuantilesMeasurement-namespaced.json ${METRICS_FOLDER}/svcLatencyMeasurement-namespaced.json ${METRICS_FOLDER}/svcLatencyQuantilesMeasurement-namespaced.json
   check_destroyed_ns kube-burner-job=not-namespaced,kube-burner-uuid="${UUID}"
   check_destroyed_pods default kube-burner-job=not-namespaced,kube-burner-uuid="${UUID}"
 }
@@ -70,17 +70,19 @@ teardown_file() {
 
 @test "kube-burner init: os-indexing=true; metrics-endpoint=true" {
   export INDEXING_TYPE=opensearch
   run_cmd kube-burner init -c kube-burner.yml --uuid="${UUID}" --log-level=debug -e metrics-endpoints.yaml
-  check_metric_value jobSummary top2PrometheusCPU prometheusRSS podLatencyMeasurement podLatencyQuantilesMeasurement
+  check_metric_value jobSummary top2PrometheusCPU prometheusRSS podLatencyMeasurement podLatencyQuantilesMeasurement svcLatencyMeasurement svcLatencyQuantilesMeasurement
   check_destroyed_ns kube-burner-job=not-namespaced,kube-burner-uuid="${UUID}"
   check_destroyed_pods default kube-burner-job=not-namespaced,kube-burner-uuid="${UUID}"
 }
 
 @test "kube-burner index: local-indexing=true" {
-  run_cmd kube-burner index --uuid="${UUID}" -u http://localhost:9090 -m metrics-profile.yaml
+  run_cmd kube-burner index --uuid="${UUID}" -u http://localhost:9090 -m metrics-profile.yaml
+  check_file_list collected-metrics/top2PrometheusCPU.json collected-metrics/prometheusRSS.json
 }
 
 @test "kube-burner index: metrics-endpoint=true; os-indexing=true" {
   run_cmd kube-burner index --uuid="${UUID}" -e metrics-endpoints.yaml --es-server=${ES_SERVER} --es-index=${ES_INDEX}
+  check_metric_value top2PrometheusCPU prometheusRSS
 }
 
 @test "kube-burner init: crd" {