diff --git a/cmd/jobsink/main.go b/cmd/jobsink/main.go index a79cf5d7655..b0d66298fec 100644 --- a/cmd/jobsink/main.go +++ b/cmd/jobsink/main.go @@ -20,6 +20,7 @@ import ( "context" "crypto/md5" //nolint:gosec "crypto/tls" + "encoding/hex" "fmt" "log" "net/http" @@ -231,11 +232,11 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - id := toIdHashLabelValue(event.Source(), event.ID()) - logger.Debug("Getting job for event", zap.String("URI", r.RequestURI), zap.String("id", id)) + jobName := toJobName(ref.Name, event.Source(), event.ID()) + logger.Debug("Getting job for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName)) jobs, err := h.k8s.BatchV1().Jobs(js.GetNamespace()).List(r.Context(), metav1.ListOptions{ - LabelSelector: jobLabelSelector(ref, id), + LabelSelector: jobLabelSelector(ref, jobName), Limit: 1, }) if err != nil { @@ -256,56 +257,21 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - jobName := kmeta.ChildName(ref.Name, id) - - logger.Debug("Creating secret for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName)) - - jobSinkUID := js.GetUID() - - or := metav1.OwnerReference{ - APIVersion: sinksv.SchemeGroupVersion.String(), - Kind: sinks.JobSinkResource.Resource, - Name: js.GetName(), - UID: jobSinkUID, - Controller: ptr.Bool(true), - BlockOwnerDeletion: ptr.Bool(false), - } - - secret := &corev1.Secret{ - TypeMeta: metav1.TypeMeta{}, - ObjectMeta: metav1.ObjectMeta{ - Name: jobName, - Namespace: ref.Namespace, - Labels: map[string]string{ - sinks.JobSinkIDLabel: id, - sinks.JobSinkNameLabel: ref.Name, - }, - OwnerReferences: []metav1.OwnerReference{or}, - }, - Immutable: ptr.Bool(true), - Data: map[string][]byte{"event": eventBytes}, - Type: corev1.SecretTypeOpaque, - } - - _, err = h.k8s.CoreV1().Secrets(ref.Namespace).Create(r.Context(), secret, metav1.CreateOptions{}) - if err != nil && !apierrors.IsAlreadyExists(err) { - logger.Warn("Failed to create secret", zap.Error(err)) - - w.Header().Add("Reason", err.Error()) - w.WriteHeader(http.StatusInternalServerError) - return - } - - logger.Debug("Creating job for event", zap.String("URI", r.RequestURI), zap.String("jobName", jobName)) - job := js.Spec.Job.DeepCopy() job.Name = jobName if job.Labels == nil { job.Labels = make(map[string]string, 4) } - job.Labels[sinks.JobSinkIDLabel] = id + job.Labels[sinks.JobSinkIDLabel] = jobName job.Labels[sinks.JobSinkNameLabel] = ref.Name - job.OwnerReferences = append(job.OwnerReferences, or) + job.OwnerReferences = append(job.OwnerReferences, metav1.OwnerReference{ + APIVersion: sinksv.SchemeGroupVersion.String(), + Kind: sinks.JobSinkResource.Resource, + Name: js.GetName(), + UID: js.GetUID(), + Controller: ptr.Bool(true), + BlockOwnerDeletion: ptr.Bool(false), + }) var mountPathName string for i := range job.Spec.Template.Spec.Containers { found := false @@ -346,14 +312,66 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { }) } - _, err = h.k8s.BatchV1().Jobs(ref.Namespace).Create(r.Context(), job, metav1.CreateOptions{}) - if err != nil { + logger.Debug("Creating job for event", + zap.String("URI", r.RequestURI), + zap.String("jobName", jobName), + zap.Any("job", job), + ) + + createdJob, err := h.k8s.BatchV1().Jobs(ref.Namespace).Create(r.Context(), job, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { logger.Warn("Failed to create job", zap.Error(err)) w.Header().Add("Reason", err.Error()) w.WriteHeader(http.StatusInternalServerError) return } + if apierrors.IsAlreadyExists(err) { + logger.Debug("Job already exists", zap.String("URI", r.RequestURI), zap.String("jobName", jobName)) + } + + secret := &corev1.Secret{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: jobName, + Namespace: ref.Namespace, + Labels: map[string]string{ + sinks.JobSinkIDLabel: jobName, + sinks.JobSinkNameLabel: ref.Name, + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "batch/v1", + Kind: "Job", + Name: createdJob.Name, + UID: createdJob.UID, + Controller: ptr.Bool(true), + BlockOwnerDeletion: ptr.Bool(false), + }, + }, + }, + Immutable: ptr.Bool(true), + Data: map[string][]byte{"event": eventBytes}, + Type: corev1.SecretTypeOpaque, + } + + logger.Debug("Creating secret for event", + zap.String("URI", r.RequestURI), + zap.String("jobName", jobName), + zap.Any("secret.metadata", secret.ObjectMeta), + ) + + _, err = h.k8s.CoreV1().Secrets(ref.Namespace).Create(r.Context(), secret, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + logger.Warn("Failed to create secret", zap.Error(err)) + + w.Header().Add("Reason", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + return + } + if apierrors.IsAlreadyExists(err) { + logger.Debug("Secret already exists", zap.String("URI", r.RequestURI), zap.String("jobName", jobName)) + } w.Header().Add("Location", locationHeader(ref, event.Source(), event.ID())) w.WriteHeader(http.StatusAccepted) @@ -391,8 +409,7 @@ func (h *Handler) handleGet(ctx context.Context, w http.ResponseWriter, r *http. eventSource := parts[6] eventID := parts[8] - id := toIdHashLabelValue(eventSource, eventID) - jobName := kmeta.ChildName(ref.Name, id) + jobName := toJobName(ref.Name, eventSource, eventID) job, err := h.k8s.BatchV1().Jobs(ref.Namespace).Get(r.Context(), jobName, metav1.GetOptions{}) if err != nil { @@ -445,6 +462,7 @@ func jobLabelSelector(ref types.NamespacedName, id string) string { return fmt.Sprintf("%s=%s,%s=%s", sinks.JobSinkIDLabel, id, sinks.JobSinkNameLabel, ref.Name) } -func toIdHashLabelValue(source, id string) string { - return utils.ToDNS1123Subdomain(fmt.Sprintf("%s", md5.Sum([]byte(fmt.Sprintf("%s-%s", source, id))))) //nolint:gosec +func toJobName(js string, source, id string) string { + h := md5.Sum([]byte(source + id)) //nolint:gosec + return kmeta.ChildName(js+"-", utils.ToDNS1123Subdomain(hex.EncodeToString(h[:]))) } diff --git a/cmd/jobsink/main_test.go b/cmd/jobsink/main_test.go new file mode 100644 index 00000000000..1c1e5741eee --- /dev/null +++ b/cmd/jobsink/main_test.go @@ -0,0 +1,106 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "testing" + + "k8s.io/apimachinery/pkg/api/validation" + + "knative.dev/eventing/pkg/utils" +) + +type testCase struct { + JobSinkName string + Source string + Id string +} + +func TestToJobName(t *testing.T) { + testcases := []testCase{ + { + JobSinkName: "job-sink-success", + Source: "mysource3/myservice", + Id: "2234-5678", + }, + { + JobSinkName: "a", + Source: "0", + Id: "0", + }, + } + + for _, tc := range testcases { + t.Run(tc.JobSinkName+"_"+tc.Source+"_"+tc.Id, func(t *testing.T) { + if errs := validation.NameIsDNS1035Label(tc.JobSinkName, false); len(errs) != 0 { + t.Errorf("Invalid JobSinkName: %v", errs) + } + + name := toJobName(tc.JobSinkName, tc.Source, tc.Id) + doubleName := toJobName(tc.JobSinkName, tc.Source, tc.Id) + if name != doubleName { + t.Errorf("Before: %q, after: %q", name, doubleName) + } + + if got := utils.ToDNS1123Subdomain(name); got != name { + t.Errorf("ToDNS1123Subdomain(Want) returns a different result, Want: %q, Got: %q", name, got) + } + + if errs := validation.NameIsDNS1035Label(name, false); len(errs) != 0 { + t.Errorf("toJobName produced invalid name %q given %q, %q, %q: errors: %#v", name, tc.JobSinkName, tc.Source, tc.Id, errs) + } + }) + } +} + +func FuzzToJobName(f *testing.F) { + testcases := []testCase{ + { + JobSinkName: "job-sink-success", + Source: "mysource3/myservice", + Id: "2234-5678", + }, + { + JobSinkName: "a", + Source: "0", + Id: "0", + }, + } + + for _, tc := range testcases { + f.Add(tc.JobSinkName, tc.Source, tc.Id) // Use f.Add to provide a seed corpus + } + f.Fuzz(func(t *testing.T, js, source, id string) { + if errs := validation.NameIsDNSLabel(js, false); len(errs) != 0 { + t.Skip("Prerequisite: invalid jobsink name") + } + + name := toJobName(js, source, id) + doubleName := toJobName(js, source, id) + if name != doubleName { + t.Errorf("Before: %q, after: %q", name, doubleName) + } + + if got := utils.ToDNS1123Subdomain(name); got != name { + t.Errorf("ToDNS1123Subdomain(Want) returns a different result, Want: %q, Got: %q", name, got) + } + + if errs := validation.NameIsDNSLabel(name, false); len(errs) != 0 { + t.Errorf("toJobName produced invalid name %q given %q, %q, %q: errors: %#v", name, js, source, id, errs) + } + }) +} diff --git a/hack/e2e-debug.sh b/hack/e2e-debug.sh index b9650e467b3..b2bc01bccd6 100755 --- a/hack/e2e-debug.sh +++ b/hack/e2e-debug.sh @@ -35,4 +35,4 @@ wait_until_pods_running knative-eventing || fail_test "Pods in knative-eventing header "Running tests" -go test -tags=e2e -v -timeout=30m -run="${test_name}" "${test_dir}" || fail_test "Test(s) failed" +go test -tags=e2e -v -timeout=30m -parallel=12 -run="${test_name}" "${test_dir}" || fail_test "Test(s) failed" diff --git a/test/rekt/features/jobsink/jobsink.go b/test/rekt/features/jobsink/jobsink.go index c872dc64f46..4ad54779160 100644 --- a/test/rekt/features/jobsink/jobsink.go +++ b/test/rekt/features/jobsink/jobsink.go @@ -24,6 +24,7 @@ import ( cetest "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" @@ -43,11 +44,14 @@ import ( "knative.dev/eventing/test/rekt/resources/jobsink" ) -func Success() *feature.Feature { +func Success(jobSinkName string) *feature.Feature { f := feature.NewFeature() sink := feature.MakeRandomK8sName("sink") jobSink := feature.MakeRandomK8sName("jobsink") + if jobSinkName != "" { + jobSink = jobSinkName + } source := feature.MakeRandomK8sName("source") event := cetest.FullEvent() @@ -83,6 +87,32 @@ func Success() *feature.Feature { return f } +func DeleteJobsCascadeSecretsDeletion(jobSink string) *feature.Feature { + f := feature.NewFeature() + + f.Setup("Prerequisite: At least one secret for jobsink present", verifySecretsForJobSink(jobSink, func(secrets *corev1.SecretList) bool { + return len(secrets.Items) > 0 + })) + + f.Requirement("delete jobs for jobsink", func(ctx context.Context, t feature.T) { + policy := metav1.DeletePropagationBackground + err := kubeclient.Get(ctx).BatchV1(). + Jobs(environment.FromContext(ctx).Namespace()). + DeleteCollection(ctx, metav1.DeleteOptions{PropagationPolicy: &policy}, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", sinks.JobSinkNameLabel, jobSink), + }) + if err != nil { + t.Error(err) + } + }) + + f.Assert("No secrets for jobsink are present", verifySecretsForJobSink(jobSink, func(secrets *corev1.SecretList) bool { + return len(secrets.Items) == 0 + })) + + return f +} + func SuccessTLS() *feature.Feature { f := feature.NewFeature() @@ -239,3 +269,27 @@ func AtLeastOneJobIsComplete(jobSinkName string) feature.StepFn { t.Errorf("No job is complete:\n%v", string(bytes)) } } + +func verifySecretsForJobSink(jobSink string, verify func(secrets *corev1.SecretList) bool) feature.StepFn { + return func(ctx context.Context, t feature.T) { + + interval, timeout := environment.PollTimingsFromContext(ctx) + lastSecretList := &corev1.SecretList{} + err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) { + var err error + lastSecretList, err = kubeclient.Get(ctx).CoreV1(). + Secrets(environment.FromContext(ctx).Namespace()). + List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", sinks.JobSinkNameLabel, jobSink), + }) + if err != nil { + return false, fmt.Errorf("failed to list secrets: %w", err) + } + return verify(lastSecretList), nil + }) + if err != nil { + bytes, _ := json.Marshal(lastSecretList) + t.Errorf("failed to wait for no secrets: %v\nSecret list:\n%s", err, string(bytes)) + } + } +} diff --git a/test/rekt/job_sink_test.go b/test/rekt/job_sink_test.go index 3263c450a9a..f933bdc65c1 100644 --- a/test/rekt/job_sink_test.go +++ b/test/rekt/job_sink_test.go @@ -46,7 +46,23 @@ func TestJobSinkSuccess(t *testing.T) { environment.Managed(t), ) - env.Test(ctx, t, jobsink.Success()) + env.Test(ctx, t, jobsink.Success("")) +} + +func TestJobSinkDeleteJobCascadeSecretDeletion(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + jobSinkName := feature.MakeRandomK8sName("jobsink") + env.Test(ctx, t, jobsink.Success(jobSinkName)) + env.Test(ctx, t, jobsink.DeleteJobsCascadeSecretsDeletion(jobSinkName)) } func TestJobSinkSuccessTLS(t *testing.T) {