diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index 0d4d496ec74..aec8112688d 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -27,6 +27,7 @@ import ( "path/filepath" "sort" "sync" + "time" "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" @@ -156,29 +157,45 @@ func WaitForLogger(dataStore, ns, id string) error { // getContainerWait loads the container from ID and returns its wait channel func getContainerWait(ctx context.Context, hostAddress string, config *logging.Config) (<-chan containerd.ExitStatus, error) { + // Try to retrieve container task for at most 10 seconds. + // FIXME: It's possible that container runtime takes longer than 10 seconds to be created. + // If taskutil can signal to the logger that the task has been created, this method would be better synchronized + taskTimeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + client, err := containerd.New(hostAddress, containerd.WithDefaultNamespace(config.Namespace)) if err != nil { return nil, err } - con, err := client.LoadContainer(ctx, config.ID) + con, err := client.LoadContainer(taskTimeoutCtx, config.ID) if err != nil { return nil, err } - task, err := con.Task(ctx, nil) - if err != nil { - return nil, err + + for { + select { + case <-taskTimeoutCtx.Done(): + return nil, fmt.Errorf("timed out waiting for container task to start") + default: + task, err := con.Task(taskTimeoutCtx, nil) + if err != nil { + if errdefs.IsNotFound(err) { + // If task was not found, it's possible that the container runtime is still being created. + // Try again in 10ms + time.Sleep(10 * time.Millisecond) + break + } + return nil, err + } + return task.Wait(ctx) + } } - return task.Wait(ctx) } func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, hostAddress string, config *logging.Config) error { if err := driver.PreProcess(dataStore, config); err != nil { return err } - exitCh, err := getContainerWait(ctx, hostAddress, config) - if err != nil { - return err - } // initialize goroutines to copy stdout and stderr streams to a closable pipe stdoutR, stdoutW := io.Pipe() @@ -220,9 +237,14 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, hostAd }() go func() { // close stdout and stderr upon container exit + defer stdoutW.Close() + defer stderrW.Close() + + exitCh, err := getContainerWait(ctx, hostAddress, config) + if err != nil { + return + } <-exitCh - stdoutW.Close() - stderrW.Close() }() wg.Wait() return driver.PostProcess()