From 7aa4e8dc34fadcf2b4962d17fe3387a0748f55c8 Mon Sep 17 00:00:00 2001 From: Etienne Perot Date: Sat, 14 Dec 2024 01:29:45 -0800 Subject: [PATCH] HTTPBenchmark: Tolerate probe failures when initially waiting for server. Prior to this change, the benchmark would fail if the HTTP backend server failed to respond as soon as the Kubernetes service was available. PiperOrigin-RevId: 706155955 --- .../benchmarks/httpbench/httpbench.go | 66 ++++++++++++++----- test/kubernetes/testcluster/testcluster.go | 37 +++++++---- 2 files changed, 72 insertions(+), 31 deletions(-) diff --git a/test/kubernetes/benchmarks/httpbench/httpbench.go b/test/kubernetes/benchmarks/httpbench/httpbench.go index a698f08cc2..34f3dce87e 100644 --- a/test/kubernetes/benchmarks/httpbench/httpbench.go +++ b/test/kubernetes/benchmarks/httpbench/httpbench.go @@ -105,16 +105,18 @@ type HTTPBenchmark struct { // Run runs the HTTP-based benchmark. func (h *HTTPBenchmark) Run(ctx context.Context, t *testing.T) { t.Helper() - if err := h.Cluster.WaitForServiceReady(ctx, h.Service); err != nil { + serverWaitCtx, serverWaitCancel := context.WithTimeout(ctx, 10*time.Minute) + if err := h.Cluster.WaitForServiceReady(serverWaitCtx, h.Service); err != nil { t.Fatalf("Failed to wait for service: %v", err) } ip := testcluster.GetIPFromService(h.Service) if ip == "" { t.Fatalf("did not get valid ip: %s", ip) } - if err := h.waitForServer(ctx, ip); err != nil { + if err := h.waitForServer(serverWaitCtx, ip); err != nil { t.Fatalf("Failed to wait for server: %v", err) } + serverWaitCancel() for _, round := range h.Rounds { qpsText := fmt.Sprintf("%d", round.TargetQPS) if round.TargetQPS == InfiniteQPS { @@ -146,7 +148,10 @@ func (h *HTTPBenchmark) runRound(ctx context.Context, t *testing.T, round Round, } defer h.Cluster.DeletePod(ctx, client) - if err := h.Cluster.WaitForPodCompleted(ctx, client); err != nil { + waitCtx, waitCancel := context.WithTimeout(ctx, round.Duration+2*time.Minute) + err = h.Cluster.WaitForPodCompleted(waitCtx, client) + waitCancel() + if err != nil { t.Fatalf("failed to wait for wrk2 pod: %v", err) } @@ -243,21 +248,48 @@ func (h *HTTPBenchmark) getWgetPod(ip string) *v13.Pod { // waitForServer waits for an HTTP server to start responding on the given // IP and port. func (h *HTTPBenchmark) waitForServer(ctx context.Context, ip string) error { - wget, err := h.Cluster.ConfigurePodForClientNodepool(ctx, h.getWgetPod(ip)) - if err != nil { - return fmt.Errorf("failed to configure wget pod for client nodepool: %v", err) - } - wget, err = h.Cluster.CreatePod(ctx, wget) - if err != nil { - return fmt.Errorf("failed to create wget pod: %v", err) - } - defer h.Cluster.DeletePod(ctx, wget) - waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Minute) - defer waitCancel() - if err := h.Cluster.WaitForPodCompleted(waitCtx, wget); err != nil { - return fmt.Errorf("failed to wait for HTTP server %s:%d%s: %v", ip, h.Port, h.Path, err) + lastPhase := v13.PodUnknown + var lastLogs string + for ctx.Err() == nil { + wget, err := h.Cluster.ConfigurePodForClientNodepool(ctx, h.getWgetPod(ip)) + if err != nil { + return fmt.Errorf("failed to configure wget pod for client nodepool: %w", err) + } + wget, err = h.Cluster.CreatePod(ctx, wget) + if err != nil { + return fmt.Errorf("failed to create wget pod: %w", err) + } + waitCtx, waitCancel := context.WithTimeout(ctx, 2*time.Minute) + phase, waitErr := h.Cluster.WaitForPodTerminated(waitCtx, wget) + waitCtxErr := waitCtx.Err() + waitCancel() + if waitErr == nil && phase != v13.PodSucceeded { + logs, err := h.Cluster.ReadPodLogs(ctx, wget) + if err != nil { + _ = h.Cluster.DeletePod(ctx, wget) // Best-effort delete. + return fmt.Errorf("failed to read wget pod logs: %w", err) + } + lastLogs = logs + } + deleteErr := h.Cluster.DeletePod(ctx, wget) + if ctx.Err() != nil { + break + } + if waitCtxErr != nil { + continue + } + if waitErr != nil { + return fmt.Errorf("failed to wait for wget pod: %w", waitErr) + } + if deleteErr != nil { + return fmt.Errorf("failed to delete wget pod: %w", deleteErr) + } + if phase == v13.PodSucceeded { + return nil + } + lastPhase = phase } - return nil + return fmt.Errorf("wget pod still fails after context expiry (last phase: %v; last logs: %q)", lastPhase, lastLogs) } /* diff --git a/test/kubernetes/testcluster/testcluster.go b/test/kubernetes/testcluster/testcluster.go index 123a97b107..8d615a6c7e 100644 --- a/test/kubernetes/testcluster/testcluster.go +++ b/test/kubernetes/testcluster/testcluster.go @@ -451,18 +451,27 @@ func (t *TestCluster) ReadPodLogs(ctx context.Context, pod *v13.Pod) (string, er // WaitForPodRunning is a helper method to wait for a pod to be running. func (t *TestCluster) WaitForPodRunning(ctx context.Context, pod *v13.Pod) error { - return t.doWaitForPod(ctx, pod, v13.PodRunning) + _, err := t.doWaitForPod(ctx, pod, func(p v13.PodPhase) bool { return p == v13.PodRunning }) + return err } // WaitForPodCompleted is a helper method to wait for a pod to be completed. func (t *TestCluster) WaitForPodCompleted(ctx context.Context, pod *v13.Pod) error { - return t.doWaitForPod(ctx, pod, v13.PodSucceeded) + _, err := t.doWaitForPod(ctx, pod, func(p v13.PodPhase) bool { return p == v13.PodSucceeded }) + return err +} + +// WaitForPodTerminated is a helper method to wait for a pod to exit, +// whether it succeeded or failed. +func (t *TestCluster) WaitForPodTerminated(ctx context.Context, pod *v13.Pod) (v13.PodPhase, error) { + return t.doWaitForPod(ctx, pod, func(p v13.PodPhase) bool { return p == v13.PodSucceeded || p == v13.PodFailed }) } // doWaitForPod waits for a pod to complete based on a given v13.PodPhase. -func (t *TestCluster) doWaitForPod(ctx context.Context, pod *v13.Pod, phase v13.PodPhase) error { +func (t *TestCluster) doWaitForPod(ctx context.Context, pod *v13.Pod, phasePredicate func(v13.PodPhase) bool) (v13.PodPhase, error) { podLogger := log.BasicRateLimitedLogger(5 * time.Minute) - startLogTime := time.Now().Add(3 * time.Minute) + startTime := time.Now() + startLogTime := startTime.Add(3 * time.Minute) var p *v13.Pod var err error @@ -472,29 +481,29 @@ func (t *TestCluster) doWaitForPod(ctx context.Context, pod *v13.Pod, phase v13. select { case <-pollCh.C: if p, err = t.GetPod(ctx, pod); err != nil { - return fmt.Errorf("failed to poll pod: %w", err) + return v13.PodUnknown, fmt.Errorf("failed to poll pod: %w", err) } case <-ctx.Done(): - return fmt.Errorf("context expired waiting for pod %q: %w", pod.GetName(), ctx.Err()) + return v13.PodUnknown, fmt.Errorf("context expired waiting for pod %q: %w", pod.GetName(), ctx.Err()) } if p.Status.Reason == v13.PodReasonUnschedulable { - return fmt.Errorf("pod %q failed: reason: %q message: %q", pod.GetName(), p.Status.Reason, p.Status.Message) + return v13.PodPending, fmt.Errorf("pod %q cannot be scheduled: reason: %q message: %q", p.GetName(), p.Status.Reason, p.Status.Message) } for _, c := range p.Status.Conditions { if strings.Contains(c.Reason, v13.PodReasonUnschedulable) { - return fmt.Errorf("pod %q failed: reason: %q message: %q", p.GetName(), c.Reason, c.Message) + return v13.PodPending, fmt.Errorf("pod %q cannot be scheduled: reason: %q message: %q", p.GetName(), c.Reason, c.Message) } } - switch p.Status.Phase { - case v13.PodFailed: - return fmt.Errorf("pod %q failed: %s", pod.GetName(), p.Status.Message) - case phase: - return nil + if phasePredicate(p.Status.Phase) { + return p.Status.Phase, nil + } + if p.Status.Phase == v13.PodFailed { + return v13.PodFailed, fmt.Errorf("pod %q failed: %s", p.GetName(), p.Status.Message) } if time.Now().After(startLogTime) { - podLogger.Infof("Still waiting for pod %q after %v; pod status: %v", pod.GetName(), time.Since(startLogTime), p.Status) + podLogger.Infof("Still waiting for pod %q after %v; pod status: %v", p.GetName(), time.Since(startTime), p.Status) } } }