From ce4fe0990d235ad8a46f58060dd868ba6cfcffb0 Mon Sep 17 00:00:00 2001 From: Martin Gencur Date: Wed, 20 Apr 2022 16:08:29 +0200 Subject: [PATCH] [release-1.2] Backport upgrade event tracing (#6342) * Wathola Tracing for upgrade tests (#6219) * wathola exposing trace information * Run update-deps.sh * Fix license * Fix import * Ensure backwards compatibility * Assert ParentID not nil in test * Separate old and new events sender APIs * Make loggingCfg in client private * Wait only 1 second for flushing tracing info The Reporter is created with a default batch interval 1 second. So, it should be enough to wait just 1 second because the data is flushed every 1 second. * Increase the sleep time to 1.5 seconds to be safe * The ticker runs every 100ms so it could be 1100 ms until the buffer really flushes. * Use Log.Fatal when tracing is not set up properly * Increase the sleep time to 5 seconds and reference knative/pkg issue * Process empty tracing config in test images (#6289) * Print traces for missed events in upgrade tests (#6249) * Upgrade tests reporting Trace information for missed events * TMP: Induce missed event * Revert "TMP: Induce missed event" This reverts commit 2fec7c747accab25141bb297b40ec03d8aa90082. * Report trace also for Duplicated events * TMP: Induce missed event * TMP: Simulate duplicate events * Fix readme * Unify path for duplicate and missed events * Revert "TMP: Simulate duplicate events" This reverts commit c126521778c6af6bcc324436a5b36d55ab166d9c. * Revert "TMP: Induce missed event" This reverts commit fcd918596c8c4d1cf8cad8787db74ceead391275. * Do not fail upgrade tests if tracing is not configured (#6299) * Do not fail upgrade tests if tracing is not configured * TMP: Do not deploy Knative Monitoring * Revert "TMP: Do not deploy Knative Monitoring" This reverts commit 086a8f9e15ddd5af12158b965e4f2401e6673222. 
* Limit the number of exported traces (#6329) Exporting traces for a large number of events can exceed the timeout of the whole test suite, leading to all upgrade tests being reported as failed. * Cleanup Zipkin tracing only once in upgrade test suite (#6331) * NPE fix (#6343) Co-authored-by: Chris Suszynski --- test/config/monitoring/monitoring.yaml | 6 +- test/lib/client.go | 29 ++-- test/lib/creation.go | 7 +- test/test_images/utils.go | 12 +- test/upgrade/README.md | 21 +++ test/upgrade/prober/config.toml | 3 +- test/upgrade/prober/configuration.go | 24 +++- test/upgrade/prober/forwarder.go | 25 ++-- test/upgrade/prober/receiver.go | 28 ++-- test/upgrade/prober/sender.go | 23 ++-- test/upgrade/prober/verify.go | 125 +++++++++++++++--- .../upgrade/prober/wathola/client/receiver.go | 16 ++- .../upgrade/prober/wathola/config/defaults.go | 3 +- .../prober/wathola/config/structure.go | 11 +- test/upgrade/prober/wathola/config/tracing.go | 33 +++++ test/upgrade/prober/wathola/event/services.go | 5 +- test/upgrade/prober/wathola/event/tracing.go | 80 +++++++++++ .../prober/wathola/event/tracing_test.go | 75 +++++++++++ .../prober/wathola/forwarder/services.go | 12 +- .../prober/wathola/receiver/services.go | 10 +- .../prober/wathola/receiver/services_test.go | 3 +- .../prober/wathola/sender/operations.go | 1 + .../upgrade/prober/wathola/sender/services.go | 89 ++++++++++--- .../prober/wathola/sender/services_test.go | 81 +++++++++++- test/upgrade/prober/wathola/sender/types.go | 22 ++- test/upgrade/upgrade.go | 10 +- .../zipkin-go/reporter/recorder/recorder.go | 57 ++++++++ .../knative.dev/pkg/tracing/testing/zipkin.go | 47 +++++++ vendor/modules.txt | 2 + 29 files changed, 720 insertions(+), 140 deletions(-) create mode 100644 test/upgrade/prober/wathola/config/tracing.go create mode 100644 test/upgrade/prober/wathola/event/tracing.go create mode 100644 test/upgrade/prober/wathola/event/tracing_test.go create mode 100644 
vendor/github.com/openzipkin/zipkin-go/reporter/recorder/recorder.go create mode 100644 vendor/knative.dev/pkg/tracing/testing/zipkin.go diff --git a/test/config/monitoring/monitoring.yaml b/test/config/monitoring/monitoring.yaml index ccc3518773c..9d9ec3dbdda 100644 --- a/test/config/monitoring/monitoring.yaml +++ b/test/config/monitoring/monitoring.yaml @@ -53,9 +53,13 @@ spec: fieldRef: apiVersion: v1 fieldPath: metadata.namespace + - name: JAVA_OPTS + value: '-Xms128m -Xmx5G -XX:+ExitOnOutOfMemoryError' + - name: MEM_MAX_SPANS + value: '10000000' resources: limits: - memory: 1000Mi + memory: 6Gi requests: memory: 256Mi diff --git a/test/lib/client.go b/test/lib/client.go index 271298b5b3b..778adafe9b1 100644 --- a/test/lib/client.go +++ b/test/lib/client.go @@ -42,7 +42,6 @@ import ( eventing "knative.dev/eventing/pkg/client/clientset/versioned" "knative.dev/eventing/test/lib/duck" - ti "knative.dev/eventing/test/test_images" ) // Client holds instances of interfaces for making requests to Knative. 
@@ -61,8 +60,8 @@ type Client struct { podsCreated []string - tracingEnv corev1.EnvVar - loggingEnv *corev1.EnvVar + TracingCfg string + loggingCfg string cleanup func() } @@ -105,12 +104,12 @@ func NewClient(namespace string, t *testing.T) (*Client, error) { client.EventListener = NewEventListener(client.Kube, client.Namespace, client.T.Logf) client.Cleanup(client.EventListener.Stop) - client.tracingEnv, err = getTracingConfig(client.Kube) + client.TracingCfg, err = getTracingConfig(client.Kube) if err != nil { return nil, err } - client.loggingEnv, err = getLoggingConfig(client.Kube) + client.loggingCfg, err = getLoggingConfig(client.Kube) if err != nil { t.Log("Cannot retrieve the logging config map: ", err) } @@ -161,40 +160,40 @@ func getGenericResource(tm metav1.TypeMeta) runtime.Object { return &duckv1.KResource{} } -func getTracingConfig(c kubernetes.Interface) (corev1.EnvVar, error) { +func getTracingConfig(c kubernetes.Interface) (string, error) { cm, err := c.CoreV1().ConfigMaps(system.Namespace()).Get(context.Background(), configtracing.ConfigName, metav1.GetOptions{}) if err != nil { - return corev1.EnvVar{}, fmt.Errorf("error while retrieving the %s config map: %+v", configtracing.ConfigName, errors.WithStack(err)) + return "", fmt.Errorf("error while retrieving the %s config map: %+v", configtracing.ConfigName, errors.WithStack(err)) } config, err := configtracing.NewTracingConfigFromConfigMap(cm) if err != nil { - return corev1.EnvVar{}, fmt.Errorf("error while parsing the %s config map: %+v", configtracing.ConfigName, errors.WithStack(err)) + return "", fmt.Errorf("error while parsing the %s config map: %+v", configtracing.ConfigName, errors.WithStack(err)) } configSerialized, err := configtracing.TracingConfigToJSON(config) if err != nil { - return corev1.EnvVar{}, fmt.Errorf("error while serializing the %s config map: %+v", configtracing.ConfigName, errors.WithStack(err)) + return "", fmt.Errorf("error while serializing the %s config map: %+v", 
configtracing.ConfigName, errors.WithStack(err)) } - return corev1.EnvVar{Name: ti.ConfigTracingEnv, Value: configSerialized}, nil + return configSerialized, nil } -func getLoggingConfig(c kubernetes.Interface) (*corev1.EnvVar, error) { +func getLoggingConfig(c kubernetes.Interface) (string, error) { cm, err := c.CoreV1().ConfigMaps(system.Namespace()).Get(context.Background(), logging.ConfigMapName(), metav1.GetOptions{}) if err != nil { - return nil, fmt.Errorf("error while retrieving the %s config map: %+v", logging.ConfigMapName(), errors.WithStack(err)) + return "", fmt.Errorf("error while retrieving the %s config map: %+v", logging.ConfigMapName(), errors.WithStack(err)) } config, err := logging.NewConfigFromMap(cm.Data) if err != nil { - return nil, fmt.Errorf("error while parsing the %s config map: %+v", logging.ConfigMapName(), errors.WithStack(err)) + return "", fmt.Errorf("error while parsing the %s config map: %+v", logging.ConfigMapName(), errors.WithStack(err)) } configSerialized, err := logging.ConfigToJSON(config) if err != nil { - return nil, fmt.Errorf("error while serializing the %s config map: %+v", logging.ConfigMapName(), errors.WithStack(err)) + return "", fmt.Errorf("error while serializing the %s config map: %+v", logging.ConfigMapName(), errors.WithStack(err)) } - return &corev1.EnvVar{Name: ti.ConfigLoggingEnv, Value: configSerialized}, nil + return configSerialized, nil } diff --git a/test/lib/creation.go b/test/lib/creation.go index c502ca4d727..cb28f16e212 100644 --- a/test/lib/creation.go +++ b/test/lib/creation.go @@ -41,6 +41,7 @@ import ( "knative.dev/eventing/pkg/utils" "knative.dev/eventing/test/lib/duck" "knative.dev/eventing/test/lib/resources" + ti "knative.dev/eventing/test/test_images" ) // TODO(chizhg): break this file into multiple files when it grows too large. 
@@ -566,9 +567,9 @@ func (c *Client) CreateClusterRoleBindingOrFail(saName, crName, crbName string) func (c *Client) applyAdditionalEnv(pod *corev1.PodSpec) { for i := 0; i < len(pod.Containers); i++ { - pod.Containers[i].Env = append(pod.Containers[i].Env, c.tracingEnv) - if c.loggingEnv != nil { - pod.Containers[i].Env = append(pod.Containers[i].Env, *c.loggingEnv) + pod.Containers[i].Env = append(pod.Containers[i].Env, corev1.EnvVar{Name: ti.ConfigTracingEnv, Value: c.TracingCfg}) + if c.loggingCfg != "" { + pod.Containers[i].Env = append(pod.Containers[i].Env, corev1.EnvVar{Name: ti.ConfigLoggingEnv, Value: c.loggingCfg}) } } } diff --git a/test/test_images/utils.go b/test/test_images/utils.go index cafde81201d..2771d2ce4bc 100644 --- a/test/test_images/utils.go +++ b/test/test_images/utils.go @@ -56,23 +56,17 @@ const ( ConfigLoggingEnv = "K_CONFIG_LOGGING" ) -// ConfigureTracing can be used in test-images to configure tracing +// ConfigureTracing can be used in test-images to configure tracing. func ConfigureTracing(logger *zap.SugaredLogger, serviceName string) error { tracingEnv := os.Getenv(ConfigTracingEnv) - - if tracingEnv == "" { - return tracing.SetupStaticPublishing(logger, serviceName, config.NoopConfig()) - } - conf, err := config.JSONToTracingConfig(tracingEnv) if err != nil { - return err + logger.Warn("Error while trying to read the tracing config, using NoopConfig: ", err) } - return tracing.SetupStaticPublishing(logger, serviceName, conf) } -// ConfigureTracing can be used in test-images to configure tracing +// ConfigureLogging can be used in test-images to configure logging. 
func ConfigureLogging(ctx context.Context, name string) context.Context { loggingEnv := os.Getenv(ConfigLoggingEnv) conf, err := logging.JSONToConfig(loggingEnv) diff --git a/test/upgrade/README.md b/test/upgrade/README.md index f8354a90337..b5398886a89 100644 --- a/test/upgrade/README.md +++ b/test/upgrade/README.md @@ -119,3 +119,24 @@ struct can be influenced, by using `EVENTING_UPGRADE_TESTS_XXXXX` environmental variable prefix (using [kelseyhightower/envconfig](https://github.com/kelseyhightower/envconfig#usage) usage). + +#### Inspecting Zipkin traces for undelivered events + +When tracing is enabled in the `config-tracing` config map in the system namespace, +the prober collects traces for missed and duplicated events. The traces are exported as JSON files +under the artifacts dir. Traces for each event are stored in a separate file. +Step event traces are stored as `$ARTIFACTS/traces/events/step-$STEP_NUMBER.json`. +The finished event trace is stored as `$ARTIFACTS/traces/events/finished.json`. + +Traces can be viewed as follows: +- Start a Zipkin container on localhost: + ``` + $ docker run -d -p 9411:9411 ghcr.io/openzipkin/zipkin:2 + ``` +- Send traces to the Zipkin endpoint: + ``` + $ curl -v -X POST localhost:9411/api/v2/spans \ + -H 'Content-Type: application/json' \ + -d @$ARTIFACTS/traces/events/step-$STEP_NUMBER.json + ``` +- View traces in Zipkin UI at `http://localhost:9411/zipkin` diff --git a/test/upgrade/prober/config.toml b/test/upgrade/prober/config.toml index 20afa87d6fa..7f7ca9f872e 100644 --- a/test/upgrade/prober/config.toml +++ b/test/upgrade/prober/config.toml @@ -1,6 +1,7 @@ # logLevel = 'DEBUG' +tracingConfig = '{{- .TracingConfig -}}' [sender] address = '{{- .Endpoint -}}' interval = {{ .Config.Interval.Nanoseconds }} [forwarder] -target = 'http://wathola-receiver.{{- .Namespace -}}.svc.cluster.local' +target = '{{- .ForwarderTarget -}}' diff --git a/test/upgrade/prober/configuration.go b/test/upgrade/prober/configuration.go index 
963963c45db..94259d78700 100644 --- a/test/upgrade/prober/configuration.go +++ b/test/upgrade/prober/configuration.go @@ -18,7 +18,6 @@ package prober import ( "bytes" "context" - "errors" "fmt" "io/ioutil" "path" @@ -27,8 +26,11 @@ import ( "time" "github.com/kelseyhightower/envconfig" + "github.com/pkg/errors" "knative.dev/eventing/test/lib/resources" "knative.dev/eventing/test/upgrade/prober/sut" + "knative.dev/eventing/test/upgrade/prober/wathola/forwarder" + "knative.dev/eventing/test/upgrade/prober/wathola/receiver" duckv1 "knative.dev/pkg/apis/duck/v1" pkgTest "knative.dev/pkg/test" pkgupgrade "knative.dev/pkg/test/upgrade" @@ -49,6 +51,8 @@ const ( Error DuplicateAction = "error" prefix = "eventing_upgrade_tests" + + forwarderTargetFmt = "http://" + receiver.Name + ".%s.svc.cluster.local" ) var ( @@ -141,9 +145,9 @@ func (p *prober) deployConfiguration() { Log: p.log, Client: p.client, } - ref := resources.KnativeRefForService(receiverName, p.client.Namespace) + ref := resources.KnativeRefForService(receiver.Name, p.client.Namespace) if p.config.Serving.Use { - ref = resources.KnativeRefForKservice(forwarderName, p.client.Namespace) + ref = resources.KnativeRefForKservice(forwarder.Name, p.client.Namespace) } dest := duckv1.Destination{Ref: ref} s := p.config.SystemUnderTest @@ -153,19 +157,20 @@ func (p *prober) deployConfiguration() { tr.Teardown(sc) } }) + p.deployConfigToml(endpoint) } func (p *prober) deployConfigToml(endpoint interface{}) { name := p.config.ConfigMapName p.log.Infof("Deploying config map: \"%s/%s\"", p.client.Namespace, name) - configData := p.compileTemplate(p.config.ConfigTemplate, endpoint) + configData := p.compileTemplate(p.config.ConfigTemplate, endpoint, p.client.TracingCfg) p.client.CreateConfigMapOrFail(name, p.client.Namespace, map[string]string{ p.config.ConfigFilename: configData, }) } -func (p *prober) compileTemplate(templateName string, endpoint interface{}) string { +func (p *prober) compileTemplate(templateName 
string, endpoint interface{}, tracingConfig string) string { _, filename, _, _ := runtime.Caller(0) templateFilepath := path.Join(path.Dir(filename), templateName) templateBytes, err := ioutil.ReadFile(templateFilepath) @@ -175,15 +180,20 @@ func (p *prober) compileTemplate(templateName string, endpoint interface{}) stri var buff bytes.Buffer data := struct { *Config + // Deprecated: use ForwarderTarget Namespace string // Deprecated: use Endpoint - BrokerURL string - Endpoint interface{} + BrokerURL string + Endpoint interface{} + TracingConfig string + ForwarderTarget string }{ p.config, p.client.Namespace, fmt.Sprintf("%v", endpoint), endpoint, + tracingConfig, + fmt.Sprintf(forwarderTargetFmt, p.client.Namespace), } p.ensureNoError(tmpl.Execute(&buff, data)) return buff.String() diff --git a/test/upgrade/prober/forwarder.go b/test/upgrade/prober/forwarder.go index 14965fafb88..77e5628a366 100644 --- a/test/upgrade/prober/forwarder.go +++ b/test/upgrade/prober/forwarder.go @@ -23,28 +23,25 @@ import ( testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/duck" "knative.dev/eventing/test/lib/resources" -) - -var ( - forwarderName = "wathola-forwarder" + "knative.dev/eventing/test/upgrade/prober/wathola/forwarder" ) func (p *prober) deployForwarder() { - p.log.Infof("Deploy forwarder knative service: %v", forwarderName) + p.log.Infof("Deploy forwarder knative service: %v", forwarder.Name) serving := p.client.Dynamic.Resource(resources.KServicesGVR).Namespace(p.client.Namespace) - service := p.forwarderKService(forwarderName, p.client.Namespace) + service := p.forwarderKService(forwarder.Name, p.client.Namespace) if _, err := serving.Create(p.config.Ctx, service, metav1.CreateOptions{}); err != nil { p.client.T.Fatal(err) } sc := p.servingClient() - testlib.WaitFor(fmt.Sprintf("forwarder ksvc be ready: %v", forwarderName), func() error { - return duck.WaitForKServiceReady(sc, forwarderName, p.client.Namespace) + 
testlib.WaitFor(fmt.Sprintf("forwarder ksvc be ready: %v", forwarder.Name), func() error { + return duck.WaitForKServiceReady(sc, forwarder.Name, p.client.Namespace) }) if p.config.Serving.ScaleToZero { - testlib.WaitFor(fmt.Sprintf("forwarder scales to zero: %v", forwarderName), func() error { - return duck.WaitForKServiceScales(p.config.Ctx, sc, forwarderName, p.client.Namespace, func(scale int) bool { + testlib.WaitFor(fmt.Sprintf("forwarder scales to zero: %v", forwarder.Name), func() error { + return duck.WaitForKServiceScales(p.config.Ctx, sc, forwarder.Name, p.client.Namespace, func(scale int) bool { return scale == 0 }) }) @@ -52,9 +49,9 @@ func (p *prober) deployForwarder() { } func (p *prober) removeForwarder() { - p.log.Infof("Remove forwarder knative service: %v", forwarderName) + p.log.Infof("Remove forwarder knative service: %v", forwarder.Name) serving := p.client.Dynamic.Resource(resources.KServicesGVR).Namespace(p.client.Namespace) - err := serving.Delete(p.config.Ctx, forwarderName, metav1.DeleteOptions{}) + err := serving.Delete(p.config.Ctx, forwarder.Name, metav1.DeleteOptions{}) p.ensureNoError(err) } @@ -73,8 +70,8 @@ func (p *prober) forwarderKService(name, namespace string) *unstructured.Unstruc "template": map[string]interface{}{ "spec": map[string]interface{}{ "containers": []map[string]interface{}{{ - "name": forwarderName, - "image": p.config.ImageResolver(forwarderName), + "name": forwarder.Name, + "image": p.config.ImageResolver(forwarder.Name), "volumeMounts": []map[string]interface{}{{ "name": p.config.ConfigMapName, "mountPath": p.config.ConfigMountPoint, diff --git a/test/upgrade/prober/receiver.go b/test/upgrade/prober/receiver.go index d5537988191..d610c28d185 100644 --- a/test/upgrade/prober/receiver.go +++ b/test/upgrade/prober/receiver.go @@ -22,13 +22,11 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" - testlib "knative.dev/eventing/test/lib" - 
watholaconfig "knative.dev/eventing/test/upgrade/prober/wathola/config" pkgTest "knative.dev/pkg/test" -) -var ( - receiverName = "wathola-receiver" + testlib "knative.dev/eventing/test/lib" + watholaconfig "knative.dev/eventing/test/upgrade/prober/wathola/config" + "knative.dev/eventing/test/upgrade/prober/wathola/receiver" ) func (p *prober) deployReceiver() { @@ -37,22 +35,22 @@ func (p *prober) deployReceiver() { } func (p *prober) deployReceiverDeployment() { - p.log.Info("Deploy of receiver deployment: ", receiverName) + p.log.Info("Deploy of receiver deployment: ", receiver.Name) deployment := p.createReceiverDeployment() p.client.CreateDeploymentOrFail(deployment) - testlib.WaitFor(fmt.Sprint("receiver deployment be ready: ", receiverName), func() error { + testlib.WaitFor(fmt.Sprint("receiver deployment be ready: ", receiver.Name), func() error { return pkgTest.WaitForDeploymentScale( - p.config.Ctx, p.client.Kube, receiverName, p.client.Namespace, 1, + p.config.Ctx, p.client.Kube, receiver.Name, p.client.Namespace, 1, ) }) } func (p *prober) deployReceiverService() { - p.log.Infof("Deploy of receiver service: %v", receiverName) + p.log.Infof("Deploy of receiver service: %v", receiver.Name) service := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: receiverName, + Name: receiver.Name, Namespace: p.client.Namespace, }, Spec: corev1.ServiceSpec{ @@ -68,7 +66,7 @@ func (p *prober) deployReceiverService() { }, }, Selector: map[string]string{ - "app": receiverName, + "app": receiver.Name, }, Type: corev1.ServiceTypeClusterIP, }, @@ -80,20 +78,20 @@ func (p *prober) createReceiverDeployment() *appsv1.Deployment { var replicas int32 = 1 return &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Name: receiverName, + Name: receiver.Name, Namespace: p.client.Namespace, }, Spec: appsv1.DeploymentSpec{ Replicas: &replicas, Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ - "app": receiverName, + "app": receiver.Name, }, }, Template: 
corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ - "app": receiverName, + "app": receiver.Name, }, }, Spec: corev1.PodSpec{ @@ -109,7 +107,7 @@ func (p *prober) createReceiverDeployment() *appsv1.Deployment { }}, Containers: []corev1.Container{{ Name: "receiver", - Image: p.config.ImageResolver(receiverName), + Image: p.config.ImageResolver(receiver.Name), VolumeMounts: []corev1.VolumeMount{{ Name: p.config.ConfigMapName, ReadOnly: true, diff --git a/test/upgrade/prober/sender.go b/test/upgrade/prober/sender.go index 6c65a378169..511b4a66522 100644 --- a/test/upgrade/prober/sender.go +++ b/test/upgrade/prober/sender.go @@ -25,33 +25,32 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "knative.dev/eventing/test/upgrade/prober/wathola/sender" pkgTest "knative.dev/pkg/test" testlib "knative.dev/eventing/test/lib" ) -var senderName = "wathola-sender" - func (p *prober) deploySender() { - p.log.Info("Deploy sender deployment: ", senderName) + p.log.Info("Deploy sender deployment: ", sender.Name) var replicas int32 = 1 var gracePeriodSeconds int64 = 300 deployment := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Name: senderName, + Name: sender.Name, Namespace: p.client.Namespace, }, Spec: appsv1.DeploymentSpec{ Replicas: &replicas, Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ - "app": senderName, + "app": sender.Name, }, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ - "app": senderName, + "app": sender.Name, }, }, Spec: corev1.PodSpec{ @@ -67,7 +66,7 @@ func (p *prober) deploySender() { }}, Containers: []corev1.Container{{ Name: "sender", - Image: p.config.ImageResolver(senderName), + Image: p.config.ImageResolver(sender.Name), VolumeMounts: []corev1.VolumeMount{{ Name: p.config.ConfigMapName, ReadOnly: true, @@ -85,21 +84,21 @@ func (p *prober) deploySender() { 
Create(p.config.Ctx, deployment, metav1.CreateOptions{}) p.ensureNoError(err) - testlib.WaitFor(fmt.Sprint("sender deployment be ready: ", senderName), func() error { + testlib.WaitFor(fmt.Sprint("sender deployment be ready: ", sender.Name), func() error { return pkgTest.WaitForDeploymentScale( - p.config.Ctx, p.client.Kube, senderName, p.client.Namespace, int(replicas), + p.config.Ctx, p.client.Kube, sender.Name, p.client.Namespace, int(replicas), ) }) } func (p *prober) removeSender() { - p.log.Info("Remove of sender deployment: ", senderName) + p.log.Info("Remove of sender deployment: ", sender.Name) foreground := metav1.DeletePropagationForeground dOpts := metav1.DeleteOptions{PropagationPolicy: &foreground} err := p.client.Kube.AppsV1(). Deployments(p.client.Namespace). - Delete(p.config.Ctx, senderName, dOpts) + Delete(p.config.Ctx, sender.Name, dOpts) p.ensureNoError(err) var d *appsv1.Deployment @@ -107,7 +106,7 @@ func (p *prober) removeSender() { // Save err and deployment for error reporting. d, err = p.client.Kube.AppsV1(). Deployments(p.client.Namespace). 
- Get(p.config.Ctx, senderName, metav1.GetOptions{}) + Get(p.config.Ctx, sender.Name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { return true, nil } diff --git a/test/upgrade/prober/verify.go b/test/upgrade/prober/verify.go index 0b1e5eda389..a3f2577a805 100644 --- a/test/upgrade/prober/verify.go +++ b/test/upgrade/prober/verify.go @@ -20,6 +20,9 @@ import ( "encoding/json" "errors" "fmt" + "os" + "path/filepath" + "regexp" "time" "go.uber.org/zap" @@ -28,27 +31,49 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" + "knative.dev/eventing/test/upgrade/prober/wathola/event" "knative.dev/eventing/test/upgrade/prober/wathola/fetcher" "knative.dev/eventing/test/upgrade/prober/wathola/receiver" + "knative.dev/pkg/system" pkgTest "knative.dev/pkg/test" + "knative.dev/pkg/test/helpers" "knative.dev/pkg/test/logging" + "knative.dev/pkg/test/prow" + "knative.dev/pkg/test/zipkin" ) const ( - fetcherName = "wathola-fetcher" - jobWaitInterval = time.Second - jobWaitTimeout = 10 * time.Minute + fetcherName = "wathola-fetcher" + jobWaitInterval = time.Second + jobWaitTimeout = 10 * time.Minute + stepEventMsgPattern = "event #([0-9]+).*" + exportTraceLimit = 1000 ) // Verify will verify prober state after finished has been sent. func (p *prober) Verify() (eventErrs []error, eventsSent int) { var report *receiver.Report + // Enable port-forwarding for Zipkin endpoint. + if err := zipkin.SetupZipkinTracingFromConfigTracing(p.config.Ctx, + p.client.Kube, p.client.T.Logf, system.Namespace()); err != nil { + p.log.Warnf("Failed to setup Zipkin tracing. Traces for events won't be available.") + } else { + // Required for proper cleanup. 
+ zipkin.ZipkinTracingEnabled = true + } p.log.Info("Waiting for complete report from receiver...") start := time.Now() if err := wait.PollImmediate(jobWaitInterval, jobWaitTimeout, func() (bool, error) { - report = p.fetchReport() - return report.State != "active", nil + var err error + report, err = p.fetchReport() + if err != nil { + return false, err + } + return report != nil && report.State != "active", nil }); err != nil { + if err := p.exportTrace(p.getTraceForFinishedEvent(), "finished.json"); err != nil { + p.log.Warnf("Failed to export trace for Finished event: %v", err) + } p.client.T.Fatalf("Error fetching complete/inactive report: %v\nReport: %+v", err, report) } elapsed := time.Since(start) @@ -60,8 +85,15 @@ func (p *prober) Verify() (eventErrs []error, eventsSent int) { elapsed, report.EventsSent, report.State) p.log.Infof("Availability: %.3f%%, Requests sent: %d.", availRate, report.TotalRequests) - for _, t := range report.Thrown.Missing { + for i, t := range report.Thrown.Missing { eventErrs = append(eventErrs, errors.New(t)) + if i >= exportTraceLimit { // export traces for at most exportTraceLimit missed events + continue + } + stepNo := p.getStepNoFromMsg(t) + if err := p.exportTrace(p.getTraceForStepEvent(stepNo), fmt.Sprintf("step-%s.json", stepNo)); err != nil { + p.log.Warnf("Failed to export trace for Step event #%s: %v", stepNo, err) + } } for _, t := range report.Thrown.Unexpected { eventErrs = append(eventErrs, errors.New(t)) @@ -69,12 +101,19 @@ func (p *prober) Verify() (eventErrs []error, eventsSent int) { for _, t := range report.Thrown.Unavailable { eventErrs = append(eventErrs, errors.New(t)) } - for _, t := range report.Thrown.Duplicated { + for i, t := range report.Thrown.Duplicated { if p.config.OnDuplicate == Warn { p.log.Warn("Duplicate events: ", t) } else if p.config.OnDuplicate == Error { eventErrs = append(eventErrs, errors.New(t)) } + if i >= exportTraceLimit { // export traces for at most exportTraceLimit duplicated events + continue + } + stepNo := p.getStepNoFromMsg(t) + if err := p.exportTrace(p.getTraceForStepEvent(stepNo), 
fmt.Sprintf("step-%s.json", stepNo)); err != nil { + p.log.Warnf("Failed to export trace for Step event #%s: %v", stepNo, err) + } } return eventErrs, report.EventsSent } @@ -84,10 +123,63 @@ func (p *prober) Finish() { p.removeSender() } -func (p *prober) fetchReport() *receiver.Report { - exec := p.fetchExecution() +func (p *prober) getStepNoFromMsg(message string) string { + matches := regexp.MustCompile(stepEventMsgPattern).FindStringSubmatch(message) + if len(matches) != 2 { + p.log.Warnf("message does not match pattern %s: %s", stepEventMsgPattern, message) + return "" // no step number found; indexing the nil match slice below would panic + } + return matches[1] +} + +func (p *prober) getTraceForStepEvent(eventNo string) []byte { + p.log.Infof("Fetching trace for Step event #%s", eventNo) + query := fmt.Sprintf("step=%s and cloudevents.type=%s and target=%s", + eventNo, event.StepType, fmt.Sprintf(forwarderTargetFmt, p.client.Namespace)) + trace, err := event.FindTrace(query) + if err != nil { + p.log.Warn(err) + } + return trace +} + +func (p *prober) getTraceForFinishedEvent() []byte { + p.log.Info("Fetching trace for Finished event") + query := fmt.Sprintf("cloudevents.type=%s and target=%s", + event.FinishedType, fmt.Sprintf(forwarderTargetFmt, p.client.Namespace)) + trace, err := event.FindTrace(query) + if err != nil { + p.log.Warn(err) + } + return trace +} + +func (p *prober) exportTrace(trace []byte, fileName string) error { + tracesDir := filepath.Join(prow.GetLocalArtifactsDir(), "traces", "events") + if err := helpers.CreateDir(tracesDir); err != nil { + return fmt.Errorf("error creating directory %q: %w", tracesDir, err) + } + fp := filepath.Join(tracesDir, fileName) + p.log.Infof("Exporting trace into %s", fp) + f, err := os.Create(fp) + if err != nil { + return fmt.Errorf("error creating file %q: %w", fp, err) + } + defer f.Close() + _, err = f.Write(trace) + if err != nil { + return fmt.Errorf("error writing trace into file %q: %w", fp, err) + } + return nil +} + +func (p *prober) fetchReport() (*receiver.Report, 
error) { + exec, err := p.fetchExecution() + if err != nil { + return nil, err + } replayLogs(p.log, exec) - return exec.Report + return exec.Report, nil } func replayLogs(log *zap.SugaredLogger, exec *fetcher.Execution) { @@ -105,14 +197,18 @@ func replayLogs(log *zap.SugaredLogger, exec *fetcher.Execution) { } } -func (p *prober) fetchExecution() *fetcher.Execution { +func (p *prober) fetchExecution() (*fetcher.Execution, error) { ns := p.client.Namespace job := p.deployFetcher() defer p.deleteFetcher(job.Name) pod, err := p.findSucceededPod(job) - p.ensureNoError(err) + if err != nil { + return nil, err + } bytes, err := pkgTest.PodLogs(p.config.Ctx, p.client.Kube, pod.Name, fetcherName, ns) - p.ensureNoError(err) + if err != nil { + return nil, err + } ex := &fetcher.Execution{ Logs: []fetcher.LogEntry{}, Report: &receiver.Report{ @@ -128,8 +224,7 @@ func (p *prober) fetchExecution() *fetcher.Execution { }, } err = json.Unmarshal(bytes, ex) - p.ensureNoError(err) - return ex + return ex, err } func (p *prober) deployFetcher() *batchv1.Job { diff --git a/test/upgrade/prober/wathola/client/receiver.go b/test/upgrade/prober/wathola/client/receiver.go index ed73e67794b..487bc093003 100644 --- a/test/upgrade/prober/wathola/client/receiver.go +++ b/test/upgrade/prober/wathola/client/receiver.go @@ -31,7 +31,7 @@ import ( var log = config.Log // ReceiveEvent represents a function that receive event -type ReceiveEvent func(e cloudevents.Event) +type ReceiveEvent func(ctx context.Context, e cloudevents.Event) // Receive events and push then to passed fn func Receive( @@ -45,6 +45,7 @@ func Receive( opts = append(opts, cloudevents.WithRoundTripper(&ochttp.Transport{ Propagation: tracecontextb3.TraceContextEgress, })) + opts = append(opts, cloudevents.WithMiddleware(tracingMiddleware)) if config.Instance.Readiness.Enabled { readyOpt := cloudevents.WithMiddleware(readinessMiddleware) opts = append(opts, readyOpt) @@ -53,11 +54,7 @@ func Receive( opt := 
cloudevents.WithMiddleware(m) opts = append(opts, opt) } - http, err := cloudevents.NewHTTP(opts...) - if err != nil { - log.Fatalf("failed to create http transport, %v", err) - } - c, err := cloudevents.NewClient(http) + c, err := cloudevents.NewClientHTTP(opts...) if err != nil { log.Fatalf("failed to create client, %v", err) } @@ -106,3 +103,10 @@ func headersOf(req *nethttp.Request) string { headers := b.String() return strings.ReplaceAll(headers, "\r\n", "; ") } + +func tracingMiddleware(h nethttp.Handler) nethttp.Handler { + return &ochttp.Handler{ + Propagation: tracecontextb3.TraceContextEgress, + Handler: h, + } +} diff --git a/test/upgrade/prober/wathola/config/defaults.go b/test/upgrade/prober/wathola/config/defaults.go index 77f300a61ef..58f8751fce8 100644 --- a/test/upgrade/prober/wathola/config/defaults.go +++ b/test/upgrade/prober/wathola/config/defaults.go @@ -79,6 +79,7 @@ func defaultValues() *Config { Message: "OK", Status: nethttp.StatusOK, }, - LogLevel: zap.InfoLevel.String(), + LogLevel: zap.InfoLevel.String(), + TracingConfig: `{"backend":"none","debug":"false","sample-rate":"0.1"}`, } } diff --git a/test/upgrade/prober/wathola/config/structure.go b/test/upgrade/prober/wathola/config/structure.go index dfbf73f64fe..b0fffa0e238 100644 --- a/test/upgrade/prober/wathola/config/structure.go +++ b/test/upgrade/prober/wathola/config/structure.go @@ -65,9 +65,10 @@ type ReadinessConfig struct { // Config hold complete configuration type Config struct { - Sender SenderConfig - Forwarder ForwarderConfig - Receiver ReceiverConfig - Readiness ReadinessConfig - LogLevel string + Sender SenderConfig + Forwarder ForwarderConfig + Receiver ReceiverConfig + Readiness ReadinessConfig + LogLevel string + TracingConfig string } diff --git a/test/upgrade/prober/wathola/config/tracing.go b/test/upgrade/prober/wathola/config/tracing.go new file mode 100644 index 00000000000..79f5adbc5c1 --- /dev/null +++ b/test/upgrade/prober/wathola/config/tracing.go @@ -0,0 
+1,33 @@ +/* +Copyright 2022 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "go.uber.org/zap" + "knative.dev/pkg/tracing" + tracingconfig "knative.dev/pkg/tracing/config" +) + +func SetupTracing() { + config, err := tracingconfig.JSONToTracingConfig(Instance.TracingConfig) + if err != nil { + Log.Warn("Tracing configuration is invalid, using the no-op default", zap.Error(err)) + } + if err = tracing.SetupStaticPublishing(Log, "", config); err != nil { + Log.Fatal("Error setting up trace publishing", zap.Error(err)) + } +} diff --git a/test/upgrade/prober/wathola/event/services.go b/test/upgrade/prober/wathola/event/services.go index 9e805e6b09d..7149dff35a2 100644 --- a/test/upgrade/prober/wathola/event/services.go +++ b/test/upgrade/prober/wathola/event/services.go @@ -159,10 +159,7 @@ func asStrings(errThrown []thrown) []string { func (f *finishedStore) reportViolations(finished *Finished) { steps := f.steps.(*stepStore) for eventNo := 1; eventNo <= finished.EventsSent; eventNo++ { - times, ok := steps.store[eventNo] - if !ok { - times = 0 - } + times := steps.store[eventNo] if times != 1 { throwMethod := f.errors.throwMissing if times > 1 { diff --git a/test/upgrade/prober/wathola/event/tracing.go b/test/upgrade/prober/wathola/event/tracing.go new file mode 100644 index 00000000000..86ecf59d1dc --- /dev/null +++ b/test/upgrade/prober/wathola/event/tracing.go @@ -0,0 +1,80 @@ +/* +Copyright 2022 The Knative Authors + +Licensed 
under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package event + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + + "github.com/openzipkin/zipkin-go/model" +) + +const ZipkinTracesEndpoint = "http://localhost:9411/api/v2/traces" + +// FindTrace fetches tracing endpoint and retrieves Zipkin traces matching the annotation query. +func FindTrace(annotationQuery string) ([]byte, error) { + trace := []byte("") + spanModelsAll, err := SendTraceQuery(ZipkinTracesEndpoint, annotationQuery) + if err != nil { + return trace, fmt.Errorf("failed to send trace query %q: %w", annotationQuery, err) + } + if len(spanModelsAll) == 0 { + return trace, fmt.Errorf("no traces found for query %q", annotationQuery) + } + var models []model.SpanModel + for _, m := range spanModelsAll { + models = append(models, m...) + } + b, err := json.MarshalIndent(models, "", " ") + if err != nil { + return trace, fmt.Errorf("failed to marshall span models: %w", err) + } + return b, nil +} + +// SendTraceQuery sends the query to the tracing endpoint and returns all spans matching the query. 
+func SendTraceQuery(endpoint string, annotationQuery string) ([][]model.SpanModel, error) { + var empty [][]model.SpanModel + req, err := http.NewRequest("GET", endpoint, nil) + if err != nil { + return empty, err + } + q := req.URL.Query() + q.Add("annotationQuery", annotationQuery) + req.URL.RawQuery = q.Encode() + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return empty, err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return empty, err + } + + var models [][]model.SpanModel + err = json.Unmarshal(body, &models) + if err != nil { + return empty, fmt.Errorf("got an error in unmarshalling JSON %q: %w", body, err) + } + + return models, nil +} diff --git a/test/upgrade/prober/wathola/event/tracing_test.go b/test/upgrade/prober/wathola/event/tracing_test.go new file mode 100644 index 00000000000..31a381ca4bd --- /dev/null +++ b/test/upgrade/prober/wathola/event/tracing_test.go @@ -0,0 +1,75 @@ +/* +Copyright 2022 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package event + +import ( + "io" + "net/http" + "net/http/httptest" + + "testing" + + "github.com/openzipkin/zipkin-go/model" + "github.com/stretchr/testify/assert" +) + +func TestTraceParsing(t *testing.T) { + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + // Using a reduced variant of the response at https://zipkin.io/zipkin-api/#/default/get_traces + io.WriteString(w, + `[ + [ + { + "id": "352bff9a74ca9ad2", + "traceId": "5af7183fb1d4cf5f", + "name": "get /api", + "timestamp": 1556604172355737, + "duration": 1431, + "kind": "SERVER", + "tags": { + "http.method": "GET", + "http.path": "/api" + } + }, + { + "id": "352bff9a74ca9ad3", + "traceId": "5af7183fb1d4cf60", + "parentId": "352bff9a74ca9ad2", + "name": "get /api", + "timestamp": 1556604172355737, + "duration": 1431, + "kind": "SERVER", + "tags": { + "http.method": "GET", + "http.path": "/api" + } + } + ] +]`) + })) + defer ts.Close() + trace, err := SendTraceQuery(ts.URL, "") + if err != nil { + t.Fatal(err) + } + assert.Len(t, trace, 1) + assert.Len(t, trace[0], 2) + assert.Equal(t, model.Kind("SERVER"), trace[0][0].Kind) +} diff --git a/test/upgrade/prober/wathola/forwarder/services.go b/test/upgrade/prober/wathola/forwarder/services.go index 97f47f82ab8..582154234c0 100644 --- a/test/upgrade/prober/wathola/forwarder/services.go +++ b/test/upgrade/prober/wathola/forwarder/services.go @@ -19,6 +19,7 @@ import ( "context" cloudevents "github.com/cloudevents/sdk-go/v2" + "go.opencensus.io/trace" "knative.dev/eventing/test/upgrade/prober/wathola/client" "knative.dev/eventing/test/upgrade/prober/wathola/config" "knative.dev/eventing/test/upgrade/prober/wathola/sender" @@ -26,6 +27,10 @@ import ( "time" ) +const ( + Name = "wathola-forwarder" +) + var ( log = config.Log lastProgressReport = time.Now() @@ -35,6 +40,7 @@ var ( // New creates new forwarder func New() Forwarder { 
config.ReadIfPresent() + config.SetupTracing() f := &forwarder{ count: 0, } @@ -46,10 +52,12 @@ func (f *forwarder) Forward() { client.Receive(port, Canceling, f.forwardEvent) } -func (f *forwarder) forwardEvent(e cloudevents.Event) { +func (f *forwarder) forwardEvent(ctx context.Context, e cloudevents.Event) { target := config.Instance.Forwarder.Target log.Debugf("Forwarding event %v to %v", e.ID(), target) - err := sender.SendEvent(e, target) + ctx, span := trace.StartSpan(ctx, Name) + defer span.End() + err := sender.SendEvent(ctx, e, target) if err != nil { log.Error(err) } diff --git a/test/upgrade/prober/wathola/receiver/services.go b/test/upgrade/prober/wathola/receiver/services.go index f437d6e163c..eec2a0e257a 100644 --- a/test/upgrade/prober/wathola/receiver/services.go +++ b/test/upgrade/prober/wathola/receiver/services.go @@ -22,6 +22,7 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/wavesoftware/go-ensure" + "go.opencensus.io/trace" "knative.dev/eventing/test/upgrade/prober/wathola/client" "knative.dev/eventing/test/upgrade/prober/wathola/config" "knative.dev/eventing/test/upgrade/prober/wathola/event" @@ -29,6 +30,10 @@ import ( "net/http" ) +const ( + Name = "wathola-receiver" +) + var ( log = config.Log Canceling = make(chan context.CancelFunc, 1) @@ -37,6 +42,7 @@ var ( // New creates new Receiver func New() Receiver { config.ReadIfPresent() + config.SetupTracing() errors := event.NewErrorStore() stepsStore := event.NewStepsStore(errors) finishedStore := event.NewFinishedStore(stepsStore, errors) @@ -49,8 +55,10 @@ func (r receiver) Receive() { client.Receive(port, Canceling, r.receiveEvent, r.reportMiddleware) } -func (r receiver) receiveEvent(e cloudevents.Event) { +func (r receiver) receiveEvent(ctx context.Context, e cloudevents.Event) { log.Debug("Event received: ", e) + _, span := trace.StartSpan(ctx, Name) + defer span.End() t := e.Context.GetType() if t == event.StepType { step := &event.Step{} diff --git 
a/test/upgrade/prober/wathola/receiver/services_test.go b/test/upgrade/prober/wathola/receiver/services_test.go index 2b17db58ef7..5267eaaf6a8 100644 --- a/test/upgrade/prober/wathola/receiver/services_test.go +++ b/test/upgrade/prober/wathola/receiver/services_test.go @@ -16,6 +16,7 @@ package receiver import ( + "context" "fmt" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -75,6 +76,6 @@ func waitUntilFinished(r *receiver) error { func sendEvent(t *testing.T, e cloudevents.Event, port int) { url := fmt.Sprintf("http://localhost:%v/", port) - err := sender.SendEvent(e, url) + err := sender.SendEvent(context.Background(), e, url) assert.NoError(t, err) } diff --git a/test/upgrade/prober/wathola/sender/operations.go b/test/upgrade/prober/wathola/sender/operations.go index 1594de5d47f..259d375ddcb 100644 --- a/test/upgrade/prober/wathola/sender/operations.go +++ b/test/upgrade/prober/wathola/sender/operations.go @@ -25,6 +25,7 @@ import ( // New creates new Sender func New() Sender { config.ReadIfPresent() + config.SetupTracing() return &sender{ eventsSent: 0, } diff --git a/test/upgrade/prober/wathola/sender/services.go b/test/upgrade/prober/wathola/sender/services.go index 89a5ed70ad8..b5cbaecdf8d 100644 --- a/test/upgrade/prober/wathola/sender/services.go +++ b/test/upgrade/prober/wathola/sender/services.go @@ -22,15 +22,23 @@ import ( "fmt" "strings" + "os" + "os/signal" + "syscall" + "time" + cloudevents "github.com/cloudevents/sdk-go/v2" + cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/wavesoftware/go-ensure" + "go.opencensus.io/plugin/ochttp" + "go.opencensus.io/trace" "knative.dev/eventing/test/upgrade/prober/wathola/config" "knative.dev/eventing/test/upgrade/prober/wathola/event" + "knative.dev/pkg/tracing/propagation/tracecontextb3" +) - "os" - "os/signal" - "syscall" - "time" +const ( + Name = "wathola-sender" ) var ( @@ -38,9 +46,10 @@ var ( // supported by any of the event senders that are registered. 
ErrEndpointTypeNotSupported = errors.New("given endpoint isn't " + "supported by any registered event sender") - log = config.Log - senderConfig = &config.Instance.Sender - eventSenders = make([]EventSender, 0, 1) + log = config.Log + senderConfig = &config.Instance.Sender + eventSenders = make([]EventSender, 0, 1) + eventSendersWithContext = make([]EventSenderWithContext, 0, 1) ) type sender struct { @@ -54,7 +63,12 @@ type sender struct { func (s *sender) SendContinually() { var shutdownCh = make(chan struct{}) - defer s.sendFinished() + defer func() { + s.sendFinished() + // Give time to send tracing information. + // https://github.com/knative/pkg/issues/2475 + time.Sleep(5 * time.Second) + }() go func() { c := make(chan os.Signal, 1) @@ -119,20 +133,36 @@ func NewCloudEvent(data interface{}, typ string) cloudevents.Event { // ResetEventSenders will reset configured event senders to defaults. func ResetEventSenders() { eventSenders = make([]EventSender, 0, 1) + eventSendersWithContext = make([]EventSenderWithContext, 0, 1) } // RegisterEventSender will register a EventSender to be used. +// Deprecated. Use RegisterEventSenderWithContext. func RegisterEventSender(es EventSender) { eventSenders = append(eventSenders, es) } +// RegisterEventSenderWithContext will register EventSenderWithContext to be used. +func RegisterEventSenderWithContext(es EventSenderWithContext) { + eventSendersWithContext = append(eventSendersWithContext, es) +} + // SendEvent will send cloud event to given url -func SendEvent(ce cloudevents.Event, endpoint interface{}) error { +func SendEvent(ctx context.Context, ce cloudevents.Event, endpoint interface{}) error { + sendersWithCtx := make([]EventSenderWithContext, 0, len(eventSendersWithContext)+1) + sendersWithCtx = append(sendersWithCtx, eventSendersWithContext...) 
+ if len(eventSendersWithContext) == 0 && len(eventSenders) == 0 { + sendersWithCtx = append(sendersWithCtx, httpSender{}) + } + for _, eventSender := range sendersWithCtx { + if eventSender.Supports(endpoint) { + return eventSender.SendEventWithContext(ctx, ce, endpoint) + } + } + // Backwards compatibility. + // TODO: Remove when downstream repositories start using EventSenderWithContext. senders := make([]EventSender, 0, len(eventSenders)+1) senders = append(senders, eventSenders...) - if len(senders) == 0 { - senders = append(senders, httpSender{}) - } for _, eventSender := range senders { if eventSender.Supports(endpoint) { return eventSender.SendEvent(ce, endpoint) @@ -154,14 +184,22 @@ func (h httpSender) Supports(endpoint interface{}) bool { } func (h httpSender) SendEvent(ce cloudevents.Event, endpoint interface{}) error { + return h.SendEventWithContext(context.Background(), ce, endpoint) +} + +func (h httpSender) SendEventWithContext(ctx context.Context, ce cloudevents.Event, endpoint interface{}) error { url := endpoint.(string) - c, err := cloudevents.NewClientHTTP() + opts := []cloudeventshttp.Option{ + cloudevents.WithRoundTripper(&ochttp.Transport{ + Propagation: tracecontextb3.TraceContextEgress, + }), + } + c, err := cloudevents.NewClientHTTP(opts...) 
if err != nil { return err } - ctx := cloudevents.ContextWithTarget(context.Background(), url) - - result := c.Send(ctx, ce) + ctxWithTarget := cloudevents.ContextWithTarget(ctx, url) + result := c.Send(ctxWithTarget, ce) if cloudevents.IsACK(result) { return nil } @@ -171,9 +209,12 @@ func (h httpSender) SendEvent(ce cloudevents.Event, endpoint interface{}) error func (s *sender) sendStep() error { step := event.Step{Number: s.eventsSent + 1} ce := NewCloudEvent(step, event.StepType) + ctx, span := PopulateSpanWithEvent(context.Background(), ce, Name) + defer span.End() + span.AddAttributes(trace.Int64Attribute("step", int64(step.Number))) endpoint := senderConfig.Address log.Infof("Sending step event #%v to %#v", step.Number, endpoint) - err := SendEvent(ce, endpoint) + err := SendEvent(ctx, ce, endpoint) // Record every request regardless of the result s.totalRequests++ if err != nil { @@ -190,6 +231,18 @@ func (s *sender) sendFinished() { finished := event.Finished{EventsSent: s.eventsSent, TotalRequests: s.totalRequests, UnavailablePeriods: s.unavailablePeriods} endpoint := senderConfig.Address ce := NewCloudEvent(finished, event.FinishedType) + ctx, span := PopulateSpanWithEvent(context.Background(), ce, Name) + defer span.End() log.Infof("Sending finished event (count: %v) to %#v", finished.EventsSent, endpoint) - ensure.NoError(SendEvent(ce, endpoint)) + ensure.NoError(SendEvent(ctx, ce, endpoint)) +} + +func PopulateSpanWithEvent(ctx context.Context, ce cloudevents.Event, spanName string) (context.Context, *trace.Span) { + ctxWithSpan, span := trace.StartSpan(ctx, spanName) + span.AddAttributes( + trace.StringAttribute("cloudevents.type", ce.Type()), + trace.StringAttribute("cloudevents.id", ce.ID()), + trace.StringAttribute("target", config.Instance.Forwarder.Target), + ) + return ctxWithSpan, span } diff --git a/test/upgrade/prober/wathola/sender/services_test.go b/test/upgrade/prober/wathola/sender/services_test.go index 1c16f0895d2..7d2ee8b8dd2 100644 
--- a/test/upgrade/prober/wathola/sender/services_test.go +++ b/test/upgrade/prober/wathola/sender/services_test.go @@ -25,11 +25,19 @@ import ( "time" cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/openzipkin/zipkin-go/model" "github.com/phayes/freeport" "github.com/stretchr/testify/assert" + "go.opencensus.io/trace" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "knative.dev/eventing/test/upgrade/prober/wathola/client" + "knative.dev/eventing/test/upgrade/prober/wathola/receiver" "knative.dev/eventing/test/upgrade/prober/wathola/sender" + "knative.dev/pkg/tracing" + "knative.dev/pkg/tracing/config" + tracetesting "knative.dev/pkg/tracing/testing" ) func TestHTTPEventSender(t *testing.T) { @@ -38,20 +46,77 @@ func TestHTTPEventSender(t *testing.T) { port := freeport.GetPort() canceling := make(chan context.CancelFunc, 1) events := make([]cloudevents.Event, 0, 1) - go client.Receive(port, canceling, func(e cloudevents.Event) { + go client.Receive(port, canceling, func(ctx context.Context, e cloudevents.Event) { events = append(events, e) }) cancel := <-canceling waitForPort(t, port) - err := sender.SendEvent(ce, fmt.Sprintf("http://localhost:%d", port)) + err := sender.SendEvent(context.Background(), ce, fmt.Sprintf("http://localhost:%d", port)) cancel() assert.NoError(t, err) assert.Len(t, events, 1) assert.Equal(t, ce, events[0]) } +func TestTracePropagation(t *testing.T) { + sender.ResetEventSenders() + reporter, co := tracetesting.FakeZipkinExporter() + oct := tracing.NewOpenCensusTracer(co) + t.Cleanup(func() { + reporter.Close() + oct.Finish() + }) + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: config.ConfigName, + }, + Data: map[string]string{ + "backend": "zipkin", + "zipkin-endpoint": "foo.bar", + "debug": "true", + }, + } + cfg, err := config.NewTracingConfigFromConfigMap(cm) + if err != nil { + t.Fatal("Failed to parse tracing config", err) + } + if err 
:= oct.ApplyConfig(cfg); err != nil { + t.Fatal("Failed to apply tracer config:", err) + } + ce := sender.NewCloudEvent(nil, "cetype") + port := freeport.GetPort() + canceling := make(chan context.CancelFunc, 1) + go client.Receive(port, canceling, func(ctx context.Context, e cloudevents.Event) { + _, span := trace.StartSpan(ctx, receiver.Name) + span.End() + }) + + cancel := <-canceling + waitForPort(t, port) + ctx, span := sender.PopulateSpanWithEvent(context.Background(), ce, sender.Name) + err = sender.SendEvent(ctx, ce, fmt.Sprintf("http://localhost:%d", port)) + cancel() + span.End() + spans := reporter.Flush() + assert.NoError(t, err) + assert.Len(t, spans, 4) + assert.Equal(t, receiver.Name, spans[0].Name) + // Generated by tracing Middleware in Receiver. + assert.Equal(t, model.Server, spans[1].Kind) + // Generated by tracing RoundTripper in Sender. + assert.Equal(t, model.Client, spans[2].Kind) + assert.Equal(t, sender.Name, spans[3].Name) + // Verify the trace is uninterrupted. 
+ for i := 0; i != len(spans)-1; i++ { + assert.NotNil(t, spans[i].ParentID) + if spans[i].ParentID != nil { + assert.Equal(t, *spans[i].ParentID, spans[i+1].ID) + } + } +} + func TestRegisterEventSender(t *testing.T) { - sender.RegisterEventSender(testEventSender{}) + sender.RegisterEventSenderWithContext(testEventSender{}) defer sender.ResetEventSenders() c := testConfig{ topic: "sample", @@ -59,16 +124,16 @@ func TestRegisterEventSender(t *testing.T) { valid: true, } ce := sender.NewCloudEvent(nil, "cetype") - err := sender.SendEvent(ce, c) + err := sender.SendEvent(context.Background(), ce, c) assert.NoError(t, err) } func TestUnsupportedEventSender(t *testing.T) { - sender.RegisterEventSender(testEventSender{}) + sender.RegisterEventSenderWithContext(testEventSender{}) defer sender.ResetEventSenders() ce := sender.NewCloudEvent(nil, "cetype") - err := sender.SendEvent(ce, "https://example.org/") + err := sender.SendEvent(context.Background(), ce, "https://example.org/") assert.Error(t, err) assert.True(t, errors.Is(err, sender.ErrEndpointTypeNotSupported)) @@ -109,6 +174,10 @@ func (t testEventSender) Supports(endpoint interface{}) bool { } func (t testEventSender) SendEvent(ce cloudevents.Event, endpoint interface{}) error { + return t.SendEventWithContext(context.Background(), ce, endpoint) +} + +func (t testEventSender) SendEventWithContext(ctx context.Context, ce cloudevents.Event, endpoint interface{}) error { cfg := endpoint.(testConfig) if cfg.valid { return nil diff --git a/test/upgrade/prober/wathola/sender/types.go b/test/upgrade/prober/wathola/sender/types.go index 26bafd473ca..bdf94eb7535 100644 --- a/test/upgrade/prober/wathola/sender/types.go +++ b/test/upgrade/prober/wathola/sender/types.go @@ -16,18 +16,34 @@ limitations under the License. 
package sender -import cloudevents "github.com/cloudevents/sdk-go/v2" +import ( + "context" + + cloudevents "github.com/cloudevents/sdk-go/v2" +) // Sender will send messages continuously until process receives a SIGINT type Sender interface { SendContinually() } -// EventSender will be used to send events to configured endpoint. -type EventSender interface { +type EndpointSupporter interface { // Supports will check given endpoint definition and decide if it's valid for // this sender. Supports(endpoint interface{}) bool +} + +// EventSender will be used to send events to configured endpoint. +// Deprecated. Use EventSenderWithContext. +type EventSender interface { + EndpointSupporter // SendEvent will send event to given endpoint. SendEvent(ce cloudevents.Event, endpoint interface{}) error } + +// EventSenderWithContext will be used to send events to configured endpoint, passing a context. +type EventSenderWithContext interface { + EndpointSupporter + // SendEventWithContext will send event to the given endpoint and pass context. + SendEventWithContext(ctx context.Context, ce cloudevents.Event, endpoint interface{}) error +} diff --git a/test/upgrade/upgrade.go b/test/upgrade/upgrade.go index 700ba114c9e..72b55f7810c 100644 --- a/test/upgrade/upgrade.go +++ b/test/upgrade/upgrade.go @@ -17,11 +17,13 @@ limitations under the License. package upgrade import ( + "log" "os" "testing" "knative.dev/eventing/test" testlib "knative.dev/eventing/test/lib" + "knative.dev/pkg/test/zipkin" ) // RunMainTest initializes the flags to run the eventing upgrade tests, and runs the channel tests. @@ -33,5 +35,11 @@ func RunMainTest(m *testing.M) { ComponentFeatureMap: testlib.ChannelFeatureMap, ComponentsToTest: test.EventingFlags.Channels, } - os.Exit(m.Run()) + os.Exit(func() int { + // Any tests may SetupZipkinTracing, it will only actually be done once. This should be the ONLY + // place that cleans it up. 
If an individual test calls this instead, then it will break other + // tests that need the tracing in place. + defer zipkin.CleanupZipkinTracingSetup(log.Printf) + return m.Run() + }()) } diff --git a/vendor/github.com/openzipkin/zipkin-go/reporter/recorder/recorder.go b/vendor/github.com/openzipkin/zipkin-go/reporter/recorder/recorder.go new file mode 100644 index 00000000000..d1b91bed058 --- /dev/null +++ b/vendor/github.com/openzipkin/zipkin-go/reporter/recorder/recorder.go @@ -0,0 +1,57 @@ +// Copyright 2021 The OpenZipkin Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Package recorder implements a reporter to record spans in v2 format. +*/ +package recorder + +import ( + "sync" + + "github.com/openzipkin/zipkin-go/model" +) + +// ReporterRecorder records Zipkin spans. +type ReporterRecorder struct { + mtx sync.Mutex + spans []model.SpanModel +} + +// NewReporter returns a new recording reporter. +func NewReporter() *ReporterRecorder { + return &ReporterRecorder{} +} + +// Send adds the provided span to the span list held by the recorder. 
+func (r *ReporterRecorder) Send(span model.SpanModel) { + r.mtx.Lock() + r.spans = append(r.spans, span) + r.mtx.Unlock() +} + +// Flush returns all recorded spans and clears its internal span storage +func (r *ReporterRecorder) Flush() []model.SpanModel { + r.mtx.Lock() + spans := r.spans + r.spans = nil + r.mtx.Unlock() + return spans +} + +// Close flushes the reporter +func (r *ReporterRecorder) Close() error { + r.Flush() + return nil +} diff --git a/vendor/knative.dev/pkg/tracing/testing/zipkin.go b/vendor/knative.dev/pkg/tracing/testing/zipkin.go new file mode 100644 index 00000000000..70827738879 --- /dev/null +++ b/vendor/knative.dev/pkg/tracing/testing/zipkin.go @@ -0,0 +1,47 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + openzipkin "github.com/openzipkin/zipkin-go" + zipkinreporter "github.com/openzipkin/zipkin-go/reporter" + "github.com/openzipkin/zipkin-go/reporter/recorder" + "knative.dev/pkg/tracing" + "knative.dev/pkg/tracing/config" +) + +// FakeZipkinExporter is intended to capture the testing boilerplate of building +// up the ConfigOption to pass NewOpenCensusTracer and expose a mechanism for examining +// the traces it would have reported. To set it up, use something like: +// reporter, co := FakeZipkinExporter() +// defer reporter.Close() +// oct := NewOpenCensusTracer(co) +// defer oct.Close() +// // Do stuff. +// spans := reporter.Flush() +// // Check reported spans. 
+func FakeZipkinExporter() (*recorder.ReporterRecorder, tracing.ConfigOption) { + // Create tracer with reporter recorder + reporter := recorder.NewReporter() + endpoint, _ := openzipkin.NewEndpoint("test", "localhost:1234") + //nolint:staticcheck // This is the new endpoint we're asking clients to use. + exp := tracing.WithZipkinExporter(func(cfg *config.Config) (zipkinreporter.Reporter, error) { + return reporter, nil + }, endpoint) + + return reporter, exp +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 69ca9f1f732..87993c4cb00 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -268,6 +268,7 @@ github.com/openzipkin/zipkin-go/model github.com/openzipkin/zipkin-go/propagation github.com/openzipkin/zipkin-go/reporter github.com/openzipkin/zipkin-go/reporter/http +github.com/openzipkin/zipkin-go/reporter/recorder # github.com/pelletier/go-toml/v2 v2.0.0-beta.2 ## explicit github.com/pelletier/go-toml/v2 @@ -1219,6 +1220,7 @@ knative.dev/pkg/tracing knative.dev/pkg/tracing/config knative.dev/pkg/tracing/propagation knative.dev/pkg/tracing/propagation/tracecontextb3 +knative.dev/pkg/tracing/testing knative.dev/pkg/tracker knative.dev/pkg/version knative.dev/pkg/webhook