Skip to content

Commit

Permalink
COMPUTE-6540: Create Kubelet 1.28 image
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwans authored and murongl-lyft committed Mar 8, 2024
1 parent c8dcb00 commit cc8d8ad
Show file tree
Hide file tree
Showing 8 changed files with 421 additions and 35 deletions.
15 changes: 14 additions & 1 deletion pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -2651,6 +2651,7 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
// TODO: reconcile being calculated in the config manager is questionable, and avoiding
// extra syncs may no longer be necessary. Reevaluate whether Reconcile and Sync can be
// merged (after resolving the next two TODOs).
sidecarsStatus := status.GetSidecarsStatus(pod)

// Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
// TODO: this should be unnecessary today - determine what is the cause for this to
Expand All @@ -2663,6 +2664,18 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
UpdateType: kubetypes.SyncPodSync,
StartTime: start,
})
} else if sidecarsStatus.ContainersWaiting {
// If containers aren't running and the sidecars are all ready, trigger a sync so that the containers get started.
if sidecarsStatus.SidecarsPresent && sidecarsStatus.SidecarsReady {
klog.InfoS("sidecars: sidecars are ready, dispatching work", "pod", klog.KObj(pod))
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodSync,
StartTime: start,
})
}
}

// After an evicted pod is synced, all dead containers in the pod can be removed.
Expand Down
83 changes: 68 additions & 15 deletions pkg/kubelet/kuberuntime/kuberuntime_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -288,9 +288,11 @@ func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, podSandb
"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
// do not record the message in the event so that secrets won't leak from the server.
m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, "PostStartHook failed")
if err := m.killContainer(ctx, pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, nil); err != nil {
klog.ErrorS(err, "Failed to kill container", "pod", klog.KObj(pod),
"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
klog.ErrorS(err, "Failed to kill container", "pod", klog.KObj(pod),
"podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
if err := m.killContainer(ctx, pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, 0); err != nil {
klog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v",
container.Name, kubeContainerID.String(), format.Pod(pod), ErrPostStartHook, err)
}
return msg, ErrPostStartHook
}
Expand Down Expand Up @@ -673,6 +675,12 @@ func (m *kubeGenericRuntimeManager) restoreSpecsFromContainerLabels(ctx context.

l := getContainerInfoFromLabels(s.Labels)
a := getContainerInfoFromAnnotations(s.Annotations)

annotations := make(map[string]string)
if a.Sidecar {
annotations[fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", l.ContainerName)] = "Sidecar"
}

// Note that the following is not a full spec. The container killing code should not use
// un-restored fields.
pod = &v1.Pod{
Expand All @@ -681,6 +689,7 @@ func (m *kubeGenericRuntimeManager) restoreSpecsFromContainerLabels(ctx context.
Name: l.PodName,
Namespace: l.PodNamespace,
DeletionGracePeriodSeconds: a.PodDeletionGracePeriod,
Annotations: annotations,
},
Spec: v1.PodSpec{
TerminationGracePeriodSeconds: a.PodTerminationGracePeriod,
Expand All @@ -702,7 +711,7 @@ func (m *kubeGenericRuntimeManager) restoreSpecsFromContainerLabels(ctx context.
// killContainer kills a container through the following steps:
// * Run the pre-stop lifecycle hooks (if applicable).
// * Stop the container.
func (m *kubeGenericRuntimeManager) killContainer(ctx context.Context, pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, reason containerKillReason, gracePeriodOverride *int64) error {
func (m *kubeGenericRuntimeManager) killContainer(ctx context.Context, pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, reason containerKillReason, gracePeriodDuration time.Duration) error {
var containerSpec *v1.Container
if pod != nil {
if containerSpec = kubecontainer.GetContainerSpec(pod, containerName); containerSpec == nil {
Expand All @@ -726,10 +735,17 @@ func (m *kubeGenericRuntimeManager) killContainer(ctx context.Context, pod *v1.P
}
m.recordContainerEvent(pod, containerSpec, containerID.ID, v1.EventTypeNormal, events.KillingContainer, message)

if gracePeriodOverride != nil {
gracePeriod = *gracePeriodOverride
klog.V(3).InfoS("Killing container with a grace period override", "pod", klog.KObj(pod), "podUID", pod.UID,
"containerName", containerName, "containerID", containerID.String(), "gracePeriod", gracePeriod)
// Run the pre-stop lifecycle hooks if applicable and if there is enough time to run it
if containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil && gracePeriod > 0 {
gracePeriod = gracePeriod - m.executePreStopHook(ctx, pod, containerID, containerSpec, gracePeriod)
}
// always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
if gracePeriod < minimumGracePeriodInSeconds {
gracePeriod = minimumGracePeriodInSeconds
}
if gracePeriodDuration > 0 {
gracePeriod = int64(gracePeriodDuration.Seconds())
klog.V(3).Infof("Killing container %q, but using %d second grace period override", containerID, gracePeriod)
}

// Run the pre-stop lifecycle hooks if applicable and if there is enough time to run it
Expand Down Expand Up @@ -758,18 +774,54 @@ func (m *kubeGenericRuntimeManager) killContainer(ctx context.Context, pod *v1.P
}

// killContainersWithSyncResult kills all pod's containers with sync results.
func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(ctx context.Context, pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (syncResults []*kubecontainer.SyncResult) {
func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(ctx context.Context, pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodDuration time.Duration) (syncResults []*kubecontainer.SyncResult) {
// split out sidecars and non-sidecars
var (
sidecars []*kubecontainer.Container
nonSidecars []*kubecontainer.Container
)
for _, container := range runningPod.Containers {
if isSidecar(pod, container.Name) {
sidecars = append(sidecars, container)
} else {
nonSidecars = append(nonSidecars, container)
}
}

containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers))
wg := sync.WaitGroup{}
// non-sidecars first
start := time.Now()
klog.Infof("Pod: %s, killContainersWithSyncResult: killing %d non-sidecars, %s termination period", runningPod.Name, len(nonSidecars), gracePeriodDuration)
nonSidecarsWg := sync.WaitGroup{}
nonSidecarsWg.Add(len(nonSidecars))
for _, container := range nonSidecars {
go func(container *kubecontainer.Container) {
defer utilruntime.HandleCrash()
defer nonSidecarsWg.Done()
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name)
if err := m.killContainer(ctx, pod, container.ID, container.Name, "Need to kill Pod", reasonUnknown, gracePeriodDuration); err != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
}
containerResults <- killContainerResult
}(container)
}
nonSidecarsWg.Wait()

wg.Add(len(runningPod.Containers))
for _, container := range runningPod.Containers {
gracePeriodDuration = gracePeriodDuration - time.Since(start)
if gracePeriodDuration < 0 {
gracePeriodDuration = 0
}

// then sidecars
klog.Infof("Pod: %s, killContainersWithSyncResult: killing %d sidecars, %s left", runningPod.Name, len(sidecars), gracePeriodDuration)
wg := sync.WaitGroup{}
wg.Add(len(sidecars))
for _, container := range sidecars {
go func(container *kubecontainer.Container) {
defer utilruntime.HandleCrash()
defer wg.Done()

killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name)
if err := m.killContainer(ctx, pod, container.ID, container.Name, "", reasonUnknown, gracePeriodOverride); err != nil {
if err := m.killContainer(ctx, pod, container.ID, container.Name, "Need to kill Pod", reasonUnknown, gracePeriodDuration); err != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
// Use runningPod for logging as the pod passed in could be *nil*.
klog.ErrorS(err, "Kill container failed", "pod", klog.KRef(runningPod.Namespace, runningPod.Name), "podUID", runningPod.ID,
Expand All @@ -779,6 +831,7 @@ func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(ctx context.Con
}(container)
}
wg.Wait()

close(containerResults)

for containerResult := range containerResults {
Expand Down
15 changes: 7 additions & 8 deletions pkg/kubelet/kuberuntime/kuberuntime_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestKillContainer(t *testing.T) {

for _, test := range tests {
ctx := context.Background()
err := m.killContainer(ctx, test.pod, test.containerID, test.containerName, test.reason, "", &test.gracePeriodOverride)
err := m.killContainer(ctx, test.pod, test.containerID, test.containerName, test.reason, "", time.Duration(test.gracePeriodOverride)*time.Second)
if test.succeed != (err == nil) {
t.Errorf("%s: expected %v, got %v (%v)", test.caseName, test.succeed, (err == nil), err)
}
Expand Down Expand Up @@ -401,8 +401,8 @@ func testLifeCycleHook(t *testing.T, testPod *v1.Pod, testContainer *v1.Containe
// Configured and works as expected
t.Run("PreStop-CMDExec", func(t *testing.T) {
ctx := context.Background()
testContainer.Lifecycle = cmdLifeCycle
m.killContainer(ctx, testPod, cID, "foo", "testKill", "", &gracePeriod)
testPod.Spec.Containers[0].Lifecycle = cmdLifeCycle
m.killContainer(ctx, testPod, cID, "foo", "testKill", "", time.Duration(gracePeriod)*time.Second)
if fakeRunner.Cmd[0] != cmdLifeCycle.PreStop.Exec.Command[0] {
t.Errorf("CMD Prestop hook was not invoked")
}
Expand All @@ -416,8 +416,7 @@ func testLifeCycleHook(t *testing.T, testPod *v1.Pod, testContainer *v1.Containe
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentHTTPGetHandlers, false)()
httpLifeCycle.PreStop.HTTPGet.Port = intstr.IntOrString{}
testContainer.Lifecycle = httpLifeCycle
m.killContainer(ctx, testPod, cID, "foo", "testKill", "", &gracePeriod)

m.killContainer(ctx, testPod, cID, "foo", "testKill", "", time.Duration(gracePeriod)*time.Second)
if fakeHTTP.req == nil || !strings.Contains(fakeHTTP.req.URL.String(), httpLifeCycle.PreStop.HTTPGet.Host) {
t.Errorf("HTTP Prestop hook was not invoked")
}
Expand All @@ -427,7 +426,7 @@ func testLifeCycleHook(t *testing.T, testPod *v1.Pod, testContainer *v1.Containe
defer func() { fakeHTTP.req = nil }()
httpLifeCycle.PreStop.HTTPGet.Port = intstr.FromInt32(80)
testContainer.Lifecycle = httpLifeCycle
m.killContainer(ctx, testPod, cID, "foo", "testKill", "", &gracePeriod)
m.killContainer(ctx, testPod, cID, "foo", "testKill", "", time.Duration(gracePeriod)*time.Second)

if fakeHTTP.req == nil || !strings.Contains(fakeHTTP.req.URL.String(), httpLifeCycle.PreStop.HTTPGet.Host) {
t.Errorf("HTTP Prestop hook was not invoked")
Expand All @@ -443,7 +442,7 @@ func testLifeCycleHook(t *testing.T, testPod *v1.Pod, testContainer *v1.Containe
testPod.DeletionGracePeriodSeconds = &gracePeriodLocal
testPod.Spec.TerminationGracePeriodSeconds = &gracePeriodLocal

m.killContainer(ctx, testPod, cID, "foo", "testKill", "", &gracePeriodLocal)
m.killContainer(ctx, testPod, cID, "foo", "testKill", "", time.Duration(gracePeriodLocal)*time.Second)

if fakeHTTP.req != nil {
t.Errorf("HTTP Prestop hook Should not execute when gracePeriod is 0")
Expand Down
6 changes: 3 additions & 3 deletions pkg/kubelet/kuberuntime/kuberuntime_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -141,8 +141,8 @@ func (cgc *containerGC) removeOldestN(ctx context.Context, containers []containe
ID: containers[i].id,
}
message := "Container is in unknown state, try killing it before removal"
if err := cgc.manager.killContainer(ctx, nil, id, containers[i].name, message, reasonUnknown, nil); err != nil {
klog.ErrorS(err, "Failed to stop container", "containerID", containers[i].id)
if err := cgc.manager.killContainer(ctx, nil, id, containers[i].name, message, reasonUnknown, 0); err != nil {
klog.Errorf("Failed to stop container %q: %v", containers[i].id, err)
continue
}
}
Expand Down
Loading

0 comments on commit cc8d8ad

Please sign in to comment.