Skip to content

Commit

Permalink
kill the fuse process if the app container has exited long time (#1113)
Browse files Browse the repository at this point in the history
* kill the fuse process if the app container has exited long time
  • Loading branch information
zxh326 authored Sep 19, 2024
1 parent f09818a commit ad57e95
Showing 1 changed file with 54 additions and 2 deletions.
56 changes: 54 additions & 2 deletions pkg/controller/app_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"context"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -49,7 +50,7 @@ func NewAppController(client *k8sclient.K8sClient) *AppController {
}

func (a *AppController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
appCtrlLog.V(1).Info("Receive pod %s %s", "name", request.Name, "namespace", request.Namespace)
appCtrlLog.V(1).Info("Receive pod", "name", request.Name, "namespace", request.Namespace)
pod, err := a.K8sClient.GetPod(ctx, request.Name, request.Namespace)
if err != nil && !k8serrors.IsNotFound(err) {
appCtrlLog.Error(err, "get pod error", "name", request.Name)
Expand All @@ -64,12 +65,36 @@ func (a *AppController) Reconcile(ctx context.Context, request reconcile.Request
appCtrlLog.V(1).Info("pod should not in queue", "name", request.Name)
return reconcile.Result{}, nil
}

// get a last terminated container finsh time
// if the time is more than 5 minutes, kill the fuse process
var appContainerExitedTime time.Time
for _, containerStatus := range pod.Status.ContainerStatuses {
if !strings.Contains(containerStatus.Name, config.MountContainerName) {
if containerStatus.State.Terminated != nil {
if containerStatus.State.Terminated.FinishedAt.After(appContainerExitedTime) {
appContainerExitedTime = containerStatus.State.Terminated.FinishedAt.Time
}
}
}
}

if !appContainerExitedTime.IsZero() && time.Since(appContainerExitedTime) > 5*time.Minute {
appCtrlLog.V(1).Info("app container exited more than 5 minutes, kill the mount process, app pod will enter an error phase")
err = a.killFuseProcesss(pod)
if err != nil {
appCtrlLog.Error(err, "kill fuse process error", "name", request.Name)
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}

// umount fuse sidecars
err = a.umountFuseSidecars(pod)
if err != nil {
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
return reconcile.Result{RequeueAfter: 5 * time.Minute}, nil
}

func (a *AppController) umountFuseSidecars(pod *corev1.Pod) (err error) {
Expand Down Expand Up @@ -113,6 +138,33 @@ func (a *AppController) umountFuseSidecar(pod *corev1.Pod, fuseContainer corev1.
}
return
}

func (a *AppController) killFuseProcesss(pod *corev1.Pod) error {
for _, cn := range pod.Spec.Containers {
if strings.Contains(cn.Name, config.MountContainerName) {
if e := a.killFuseProcess(pod, cn); e != nil {
return e
}
}
}
return nil
}

func (a AppController) killFuseProcess(pod *corev1.Pod, fuseContainer corev1.Container) error {
if fuseContainer.Name == "" {
return nil
}
log := klog.NewKlogr().WithName("app-ctrl").WithValues("pod", pod.Name, "namespace", pod.Namespace)
cmd := []string{"sh", "-c", "pkill mount.juicefs"}
log.Info("exec cmd in container of pod", "command", cmd, "cnName", config.MountContainerName)
stdout, stderr, err := a.K8sClient.ExecuteInContainer(pod.Name, pod.Namespace, fuseContainer.Name, cmd)
if err != nil {
return err
}
log.Info("exec cmd result", "stdout", stdout, "stderr", stderr)
return err
}

func (a *AppController) SetupWithManager(mgr ctrl.Manager) error {
c, err := controller.New("app", mgr, controller.Options{Reconciler: a})
if err != nil {
Expand Down

0 comments on commit ad57e95

Please sign in to comment.