Skip to content

Commit

Permalink
This PR introduces graceful shutdown functionality to the Multus daem…
Browse files Browse the repository at this point in the history
…on by adding a /readyz endpoint

That is added alongside the existing /healthz. The /readyz endpoint starts returning 500 once a SIGTERM is received, indicating the daemon is in shutdown mode. During this time, CNI requests can still be processed for a short window. The daemonset configs have been updated to increase terminationGracePeriodSeconds from 10 to 30 seconds, ensuring we have a bit more time for these clean shutdowns.

This addresses a race condition during pod transitions where the readiness check might return true, but a subsequent CNI request could fail if the daemon shuts down too quickly. By introducing the /readyz endpoint and delaying the shutdown, we can handle ongoing CNI requests more gracefully, reducing the risk of disruptions during critical transitions.

Major thanks to @deads2k for the find, identification, fix, and of course, the explanations. Appreciate it.
  • Loading branch information
deads2k authored and dougbtv committed Sep 19, 2024
1 parent 100766d commit 531dec1
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 11 deletions.
20 changes: 17 additions & 3 deletions cmd/multus-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"path/filepath"
"sync"
"syscall"
"time"

utilwait "k8s.io/apimachinery/pkg/util/wait"

Expand All @@ -41,6 +42,10 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// SigTermCancelAfter sets the wait time to cancel after sig term

Check warning on line 45 in cmd/multus-daemon/main.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

comment on exported const SigTermCancelAfter should be of the form "SigTermCancelAfter ..."

Check warning on line 45 in cmd/multus-daemon/main.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

comment on exported const SigTermCancelAfter should be of the form "SigTermCancelAfter ..."
// TODO: This could be a configuration option
const SigTermCancelAfter = 10 * time.Second

func main() {
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)

Expand All @@ -58,6 +63,13 @@ func main() {

ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
sigTermCtx, sigTermCancel := context.WithCancel(ctx)
// isInGracefulShutdownMode reports whether sigTermCtx has been cancelled,
// i.e. whether sigTermCtx.Err() is non-nil.
isInGracefulShutdownMode := func() bool {
	return sigTermCtx.Err() != nil
}

daemonConf, err := cniServerConfig(*configFilePath)
if err != nil {
Expand Down Expand Up @@ -105,7 +117,7 @@ func main() {
}
}

if err := startMultusDaemon(ctx, daemonConf, ignoreReadinessIndicator); err != nil {
if err := startMultusDaemon(ctx, daemonConf, ignoreReadinessIndicator, isInGracefulShutdownMode); err != nil {
logging.Panicf("failed start the multus thick-plugin listener: %v", err)
os.Exit(3)
}
Expand All @@ -123,6 +135,8 @@ func main() {
// Signal-handling goroutine: for every signal read from signalCh, cancel
// sigTermCtx first (so isInGracefulShutdownMode starts returning true), wait
// SigTermCancelAfter, and only then cancel ctx.
go func() {
for sig := range signalCh {
logging.Verbosef("caught %v, stopping...", sig)
// Enter graceful-shutdown mode: sigTermCtx.Err() is non-nil from here on.
sigTermCancel()
// Block for the grace window before cancelling ctx.
// NOTE(review): a further signal arriving during this wait is not read
// from signalCh until the wait has finished.
<-time.After(SigTermCancelAfter)
cancel()
}
}()
Expand All @@ -139,7 +153,7 @@ func main() {
logging.Verbosef("multus daemon is exited")
}

func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf, ignoreReadinessIndicator bool) error {
func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf, ignoreReadinessIndicator bool, isInGracefulShutdownMode func() bool) error {
if user, err := user.Current(); err != nil || user.Uid != "0" {
return fmt.Errorf("failed to run multus-daemon with root: %v, now running in uid: %s", err, user.Uid)
}
Expand All @@ -148,7 +162,7 @@ func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf,
return fmt.Errorf("failed to prepare the cni-socket for communicating with the shim: %w", err)
}

server, err := srv.NewCNIServer(daemonConfig, daemonConfig.ConfigFileContents, ignoreReadinessIndicator)
server, err := srv.NewCNIServer(daemonConfig, daemonConfig.ConfigFileContents, ignoreReadinessIndicator, isInGracefulShutdownMode)
if err != nil {
return fmt.Errorf("failed to create the server: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion deployments/multus-daemonset-crio.yml
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ spec:
mountPath: /host/usr/libexec/cni
- name: multus-cfg
mountPath: /tmp/multus-conf
terminationGracePeriodSeconds: 10
terminationGracePeriodSeconds: 30
volumes:
- name: run
hostPath:
Expand Down
2 changes: 1 addition & 1 deletion deployments/multus-daemonset-thick.yml
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ spec:
- name: cnibin
mountPath: /host/opt/cni/bin
mountPropagation: Bidirectional
terminationGracePeriodSeconds: 10
terminationGracePeriodSeconds: 30
volumes:
- name: cni
hostPath:
Expand Down
2 changes: 1 addition & 1 deletion deployments/multus-daemonset.yml
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ spec:
- name: cnibin
mountPath: /host/opt/cni/bin
mountPropagation: Bidirectional
terminationGracePeriodSeconds: 10
terminationGracePeriodSeconds: 30
volumes:
- name: cni
hostPath:
Expand Down
5 changes: 4 additions & 1 deletion pkg/server/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ const (

// MultusHealthAPIEndpoint is an endpoint API clients can query to know if they can communicate w/ multus server
MultusHealthAPIEndpoint = "/healthz"

// MultusReadyAPIEndpoint is like health, but starts returning status 500 once a sig-term is received.
MultusReadyAPIEndpoint = "/readyz"
)

// DoCNI sends a CNI request to the CNI server via JSON + HTTP over a root-owned unix socket,
Expand Down Expand Up @@ -100,7 +103,7 @@ func CreateDelegateRequest(cniCommand, cniContainerID, cniNetNS, cniIFName, podN
// WaitUntilAPIReady checks API readiness
func WaitUntilAPIReady(socketPath string) error {
return utilwait.PollImmediate(APIReadyPollDuration, APIReadyPollTimeout, func() (bool, error) {
_, err := DoCNI(GetAPIEndpoint(MultusHealthAPIEndpoint), nil, SocketPath(socketPath))
_, err := DoCNI(GetAPIEndpoint(MultusReadyAPIEndpoint), nil, SocketPath(socketPath))
return err == nil, nil
})
}
Expand Down
23 changes: 20 additions & 3 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func isPerNodeCertEnabled(config *PerNodeCertificate) (bool, error) {
}

// NewCNIServer creates and returns a new Server object which will listen on a socket in the given path
func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte, ignoreReadinessIndicator bool) (*Server, error) {
func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte, ignoreReadinessIndicator bool, isInGracefulShutdownMode func() bool) (*Server, error) {
var kubeClient *k8s.ClientInfo
enabled, err := isPerNodeCertEnabled(daemonConfig.PerNodeCertificate)
if enabled {
Expand Down Expand Up @@ -251,10 +251,10 @@ func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte, ignoreRe
logging.Verbosef("server configured with chroot: %s", daemonConfig.ChrootDir)
}

return newCNIServer(daemonConfig.SocketDir, kubeClient, exec, serverConfig, ignoreReadinessIndicator)
return newCNIServer(daemonConfig.SocketDir, kubeClient, exec, serverConfig, ignoreReadinessIndicator, isInGracefulShutdownMode)
}

func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, servConfig []byte, ignoreReadinessIndicator bool) (*Server, error) {
func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, servConfig []byte, ignoreReadinessIndicator bool, isInGracefulShutdownMode func() bool) (*Server, error) {
informerFactory, podInformer := newPodInformer(kubeClient.Client, os.Getenv("MULTUS_NODE_NAME"))
netdefInformerFactory, netdefInformer := newNetDefInformer(kubeClient.NetClient)
kubeClient.SetK8sClientInformers(podInformer, netdefInformer)
Expand Down Expand Up @@ -344,6 +344,23 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s
w.Header().Set("Content-Type", "application/json")
})))

// handle for '/readyz': answers GET/POST with 200 while the daemon is running
// normally and with 500 once isInGracefulShutdownMode reports true. Any other
// method gets 405.
// NOTE(review): the request counter is curried with the '/healthz' handler
// label, so '/readyz' requests are counted under '/healthz' — confirm whether
// api.MultusReadyAPIEndpoint was intended here.
router.HandleFunc(api.MultusReadyAPIEndpoint, promhttp.InstrumentHandlerCounter(s.metrics.requestCounter.MustCurryWith(prometheus.Labels{"handler": api.MultusHealthAPIEndpoint}),
	http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet && r.Method != http.MethodPost {
			http.Error(w, fmt.Sprintf("Method not allowed"), http.StatusMethodNotAllowed)
			return
		}

		// Headers have to be set before WriteHeader: changes made to the
		// header map after the status line is written are not sent.
		w.Header().Set("Content-Type", "application/json")
		if isInGracefulShutdownMode() {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	})))

// this handler serves every request not matched by the handlers above
router.HandleFunc("/", promhttp.InstrumentHandlerCounter(s.metrics.requestCounter.MustCurryWith(prometheus.Labels{"handler": "NotFound"}),
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/thick_cni_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func createFakePod(k8sClient *k8s.ClientInfo, podName string) error {
func startCNIServer(ctx context.Context, runDir string, k8sClient *k8s.ClientInfo, servConfig []byte) (*Server, error) {
const period = 0

cniServer, err := newCNIServer(runDir, k8sClient, &fakeExec{}, servConfig, true)
cniServer, err := newCNIServer(runDir, k8sClient, &fakeExec{}, servConfig, true, func() bool { return false })
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 531dec1

Please sign in to comment.