Skip to content

Commit

Permalink
backoff when err in reconcile mount pod (#720)
Browse files Browse the repository at this point in the history
Signed-off-by: zwwhdls <[email protected]>
  • Loading branch information
zwwhdls authored Aug 21, 2023
1 parent b94c1ab commit 22bfb0f
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var (
MountLabels = ""
HostIp = ""
KubeletPort = ""
ReconcileTimeout = 1 * time.Minute
ReconcileTimeout = 5 * time.Minute
ReconcilerInterval = 5

CSIPod = corev1.Pod{}
Expand Down
24 changes: 20 additions & 4 deletions pkg/controller/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package controller

import (
"context"
"fmt"
"strconv"
"time"

"golang.org/x/sync/errgroup"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog"
k8sexec "k8s.io/utils/exec"
"k8s.io/utils/mount"
Expand All @@ -30,6 +32,11 @@ import (
"github.com/juicedata/juicefs-csi-driver/pkg/k8sclient"
)

const (
retryPeriod = 5 * time.Second
maxRetryPeriod = 300 * time.Second
)

type PodReconciler struct {
mount.SafeFormatAndMount
*k8sclient.K8sClient
Expand Down Expand Up @@ -64,6 +71,7 @@ func StartReconciler() error {
}

func doReconcile(ks *k8sclient.K8sClient, kc *k8sclient.KubeletClient) {
backOff := flowcontrol.NewBackOff(retryPeriod, maxRetryPeriod)
for {
ctx := context.TODO()
timeoutCtx, cancel := context.WithTimeout(context.Background(), config.ReconcileTimeout)
Expand All @@ -89,6 +97,8 @@ func doReconcile(ks *k8sclient.K8sClient, kc *k8sclient.KubeletClient) {
if value, ok := pod.Labels[config.PodTypeKey]; !ok || value != config.PodTypeValue {
continue
}
backOffID := fmt.Sprintf("mountpod/%s", pod.Name)

g.Go(func() error {
mounter := mount.SafeFormatAndMount{
Interface: mount.New(""),
Expand All @@ -104,14 +114,20 @@ func doReconcile(ks *k8sclient.K8sClient, kc *k8sclient.KubeletClient) {
klog.Infof("goroutine of pod %s cancel", pod.Name)
return nil
default:
err := podDriver.Run(ctx, pod)
if err != nil {
klog.Errorf("Driver check pod %s error: %v", pod.Name, err)
if !backOff.IsInBackOffSinceUpdate(backOffID, backOff.Clock.Now()) {
err = podDriver.Run(ctx, pod)
if err != nil {
klog.Errorf("Driver check pod %s error, will retry: %v", pod.Name, err)
backOff.Next(backOffID, time.Now())
return err
}
backOff.Reset(backOffID)
}
return err
}
return nil
})
}
backOff.GC()
g.Wait()
finish:
cancel()
Expand Down
17 changes: 17 additions & 0 deletions pkg/k8sclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io"
"net/url"
"os"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -84,6 +85,22 @@ func NewClient() (*K8sClient, error) {
return nil, status.Error(codes.NotFound, "Can't get kube InClusterConfig")
}
config.Timeout = timeout

if os.Getenv("KUBE_QPS") != "" {
kubeQpsInt, err := strconv.Atoi(os.Getenv("KUBE_QPS"))
if err != nil {
return nil, err
}
config.QPS = float32(kubeQpsInt)
}
if os.Getenv("KUBE_BURST") != "" {
kubeBurstInt, err := strconv.Atoi(os.Getenv("KUBE_BURST"))
if err != nil {
return nil, err
}
config.Burst = kubeBurstInt
}

client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
Expand Down

0 comments on commit 22bfb0f

Please sign in to comment.