From a45f5d11e104aee42cece298958ef95b446bf388 Mon Sep 17 00:00:00 2001 From: Mrudul Harwani Date: Thu, 29 Jun 2023 02:29:06 -0700 Subject: [PATCH 1/6] fix: shutdown logger after container process exits Signed-off-by: Mrudul Harwani --- pkg/cmd/container/create.go | 9 +++--- pkg/logging/json_logger.go | 63 ++++++++++++++++++++++--------------- pkg/logging/logging.go | 61 ++++++++++++++++++++++++++++++----- 3 files changed, 95 insertions(+), 38 deletions(-) diff --git a/pkg/cmd/container/create.go b/pkg/cmd/container/create.go index 88246e3d7b3..d70c4cea86d 100644 --- a/pkg/cmd/container/create.go +++ b/pkg/cmd/container/create.go @@ -161,7 +161,7 @@ func Create(ctx context.Context, client *containerd.Client, args []string, netMa // 1, nerdctl run --name demo -it imagename // 2, ctrl + c to stop demo container // 3, nerdctl start/restart demo - logConfig, err := generateLogConfig(dataStore, id, options.LogDriver, options.LogOpt, options.GOptions.Namespace) + logConfig, err := generateLogConfig(dataStore, id, options.LogDriver, options.LogOpt, &options.GOptions) if err != nil { return nil, nil, err } @@ -655,8 +655,9 @@ func writeCIDFile(path, id string) error { } // generateLogConfig creates a LogConfig for the current container store -func generateLogConfig(dataStore string, id string, logDriver string, logOpt []string, ns string) (logConfig logging.LogConfig, err error) { +func generateLogConfig(dataStore string, id string, logDriver string, logOpt []string, gOpt *types.GlobalCommandOptions) (logConfig logging.LogConfig, err error) { var u *url.URL + logConfig.HostAddress = gOpt.Address if u, err = url.Parse(logDriver); err == nil && u.Scheme != "" { logConfig.LogURI = logDriver } else { @@ -674,7 +675,7 @@ func generateLogConfig(dataStore string, id string, logDriver string, logOpt []s if err != nil { return } - if err = logDriverInst.Init(dataStore, ns, id); err != nil { + if err = logDriverInst.Init(dataStore, gOpt.Namespace, id); err != nil { return } @@ -683,7 +684,7 @@ func generateLogConfig(dataStore string, id string, logDriver string, logOpt []s return } - logConfigFilePath := logging.LogConfigFilePath(dataStore, ns, id) + logConfigFilePath := logging.LogConfigFilePath(dataStore, gOpt.Namespace, id) if err = os.WriteFile(logConfigFilePath, logConfigB, 0600); err != nil { return } diff --git a/pkg/logging/json_logger.go b/pkg/logging/json_logger.go index 5e61c615d93..4f3c4522ed9 100644 --- a/pkg/logging/json_logger.go +++ b/pkg/logging/json_logger.go @@ -157,37 +157,46 @@ func viewLogsJSONFileDirect(lvopts LogViewOptions, jsonLogFilePath string, stdou return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err) } fin.Close() + + readFromLastPos := func() error { + // Re-open the file and seek to the last-consumed offset. + fin, err = os.OpenFile(jsonLogFilePath, os.O_RDONLY, 0400) + if err != nil { + fin.Close() + return fmt.Errorf("error occurred while trying to re-open JSON logfile %q: %s", jsonLogFilePath, err) + } + _, err = fin.Seek(lastPos, 0) + if err != nil { + fin.Close() + return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err) + } + + err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, 0) + if err != nil { + fin.Close() + return fmt.Errorf("error occurred while doing follow-up decoding of JSON logfile %q at starting position %d: %s", jsonLogFilePath, lastPos, err) + } + + // Record current file seek position before looping again. + lastPos, err = fin.Seek(0, io.SeekCurrent) + if err != nil { + fin.Close() + return fmt.Errorf("error occurred while trying to seek JSON logfile %q at current position: %s", jsonLogFilePath, err) + } + fin.Close() + return nil + } + for { select { case <-stopChannel: - logrus.Debugf("received stop signal while re-reading JSON logfile, returning") - return nil + logrus.Debugf("received stop signal, re-reading JSON logfile and returning") + // read final logs before returning + return readFromLastPos() default: - // Re-open the file and seek to the last-consumed offset. - fin, err = os.OpenFile(jsonLogFilePath, os.O_RDONLY, 0400) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while trying to re-open JSON logfile %q: %s", jsonLogFilePath, err) + if err = readFromLastPos(); err != nil { + return err } - _, err = fin.Seek(lastPos, 0) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err) - } - - err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, 0) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while doing follow-up decoding of JSON logfile %q at starting position %d: %s", jsonLogFilePath, lastPos, err) - } - - // Record current file seek position before looping again. - lastPos, err = fin.Seek(0, io.SeekCurrent) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while trying to seek JSON logfile %q at current position: %s", jsonLogFilePath, err) - } - fin.Close() } // Give the OS a second to breathe before re-opening the file: time.Sleep(time.Second) @@ -224,6 +233,8 @@ func viewLogsJSONFileThroughTailExec(lvopts LogViewOptions, jsonLogFilePath stri // Setup killing goroutine: go func() { <-stopChannel + // sleep 100ms to get logs post container exit + time.Sleep(100 * time.Millisecond) logrus.Debugf("killing tail logs process with PID: %d", cmd.Process.Pid) cmd.Process.Kill() }() diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index f81180503f0..cf8eb9b8ad8 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -28,6 +28,7 @@ import ( "sort" "sync" + "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/runtime/v2/logging" "github.com/sirupsen/logrus" @@ -113,9 +114,10 @@ func Main(argv2 string) error { // LogConfig is marshalled as "log-config.json" type LogConfig struct { - Driver string `json:"driver"` - Opts map[string]string `json:"opts,omitempty"` - LogURI string `json:"-"` + Driver string `json:"driver"` + Opts map[string]string `json:"opts,omitempty"` + HostAddress string `json:"host"` + LogURI string `json:"-"` } // LogConfigFilePath returns the path of log-config.json @@ -140,10 +142,47 @@ func LoadLogConfig(dataStore, ns, id string) (LogConfig, error) { return logConfig, nil } -func loggingProcessAdapter(driver Driver, dataStore string, config *logging.Config) error { +// getContainerWait loads the container from ID and returns its wait channel +func getContainerWait(ctx context.Context, hostAddress string, config *logging.Config) (<-chan containerd.ExitStatus, error) { + client, err := containerd.New(hostAddress, containerd.WithDefaultNamespace(config.Namespace)) + if err != nil { + return nil, err + } + con, err := client.LoadContainer(ctx, config.ID) + if err != nil { + return nil, err + } + task, err := con.Task(ctx, nil) + if err != nil { + return nil, err + } + return task.Wait(ctx) +} + +func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, hostAddress string, config *logging.Config) error { if err := driver.PreProcess(dataStore, config); err != nil { return err } + exit, err := getContainerWait(ctx, hostAddress, config) + if err != nil { + return err + } + + // initialize goroutines to copy stdout and stderr streams to a closable pipe + stdoutR, stdoutW := io.Pipe() + stderrR, stderrW := io.Pipe() + copyStream := func(reader io.Reader, writer *io.PipeWriter) { + // copy using a buffer of size 16K + buf := make([]byte, 16*1024) + _, err := io.CopyBuffer(writer, reader, buf) + if err != nil { + logrus.Errorf("failed to copy stream: %s", err) + } + } + go copyStream(config.Stdout, stdoutW) + go copyStream(config.Stderr, stderrW) + + // scan and process logs from pipes var wg sync.WaitGroup wg.Add(3) stdout := make(chan string, 10000) @@ -161,12 +200,18 @@ func loggingProcessAdapter(driver Driver, dataStore string, config *logging.Conf } } - go processLogFunc(config.Stdout, stdout) - go processLogFunc(config.Stderr, stderr) + go processLogFunc(stdoutR, stdout) + go processLogFunc(stderrR, stderr) go func() { defer wg.Done() driver.Process(stdout, stderr) }() + go func() { + // close stdout and stderr upon container exit + <-exit + stdoutW.Close() + stderrW.Close() + }() wg.Wait() return driver.PostProcess() } @@ -175,7 +220,7 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { if dataStore == "" { return nil, errors.New("got empty data store") } - return func(_ context.Context, config *logging.Config, ready func() error) error { + return func(ctx context.Context, config *logging.Config, ready func() error) error { if config.Namespace == "" || config.ID == "" { return errors.New("got invalid config") } @@ -193,7 +238,7 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { return err } - return loggingProcessAdapter(driver, dataStore, config) + return loggingProcessAdapter(ctx, driver, dataStore, logConfig.HostAddress, config) } else if !errors.Is(err, os.ErrNotExist) { // the file does not exist if the container was created with nerdctl < 0.20 return err From 5d5aa5c6f0f6eba9908dc7cd29c7962606d5a483 Mon Sep 17 00:00:00 2001 From: Mrudul Harwani Date: Thu, 29 Jun 2023 11:41:16 -0700 Subject: [PATCH 2/6] add tests for the logging changes Signed-off-by: Mrudul Harwani --- cmd/nerdctl/container_logs_test.go | 46 ++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/cmd/nerdctl/container_logs_test.go b/cmd/nerdctl/container_logs_test.go index 3f78ba8e2ce..b5bac2fea65 100644 --- a/cmd/nerdctl/container_logs_test.go +++ b/cmd/nerdctl/container_logs_test.go @@ -143,3 +143,49 @@ func TestLogsWithFailingContainer(t *testing.T) { base.Cmd("logs", "-f", containerName).AssertNoOut("baz") base.Cmd("rm", "-f", containerName).AssertOK() } + +func TestLogsWithRunningContainer(t *testing.T) { + t.Parallel() + base := testutil.NewBase(t) + containerName := testutil.Identifier(t) + defer base.Cmd("rm", containerName).Run() + expected := []string{} + for i := 1; i <= 10; i++ { + expected = append(expected, fmt.Sprint(i)) + } + + base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, + "sh", "-euc", "for i in `seq 1 10`; do echo $i; sleep 1; done").AssertOK() + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) + base.Cmd("rm", "-f", containerName).AssertOK() +} + +func TestLogsWithoutNewline(t *testing.T) { + t.Parallel() + base := testutil.NewBase(t) + containerName := testutil.Identifier(t) + defer base.Cmd("rm", containerName).Run() + expected := []string{"Hello World!", "There is no newline"} + base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, + "printf", "'Hello World!\nThere is no newline'").AssertOK() + time.Sleep(3 * time.Second) + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) + base.Cmd("rm", "-f", containerName).AssertOK() +} + +func TestLogsAfterRestartingContainer(t *testing.T) { + t.Parallel() + base := testutil.NewBase(t) + containerName := testutil.Identifier(t) + defer base.Cmd("rm", containerName).Run() + base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, + "printf", "'Hello World!\nThere is no newline'").AssertOK() + expected := []string{"Hello World!", "There is no newline"} + time.Sleep(3 * time.Second) + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) + // restart and check logs again + base.Cmd("start", containerName) + time.Sleep(3 * time.Second) + base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) + base.Cmd("rm", "-f", containerName).AssertOK() +} From 6316ae99bc463a389feef2ea28364bdb00c8920f Mon Sep 17 00:00:00 2001 From: Mrudul Harwani Date: Thu, 29 Jun 2023 12:39:56 -0700 Subject: [PATCH 3/6] wait after container exit in logs cmd instead of json_logger for unprocessed data Signed-off-by: Mrudul Harwani --- pkg/cmd/container/logs.go | 3 ++ pkg/logging/json_logger.go | 63 ++++++++++++++++---------------------- 2 files changed, 29 insertions(+), 37 deletions(-) diff --git a/pkg/cmd/container/logs.go b/pkg/cmd/container/logs.go index c971a15d480..4e7a4f8fc1f 100644 --- a/pkg/cmd/container/logs.go +++ b/pkg/cmd/container/logs.go @@ -22,6 +22,7 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" @@ -90,6 +91,8 @@ func Logs(ctx context.Context, client *containerd.Client, container string, opti // Setup goroutine to send stop event if container task finishes: go func() { <-waitCh + // wait 100ms to let logViewer process data sent after container exit, if any + time.Sleep(100 * time.Millisecond) logrus.Debugf("container task has finished, sending kill signal to log viewer") stopChannel <- os.Interrupt }() diff --git a/pkg/logging/json_logger.go b/pkg/logging/json_logger.go index 4f3c4522ed9..5e61c615d93 100644 --- a/pkg/logging/json_logger.go +++ b/pkg/logging/json_logger.go @@ -157,46 +157,37 @@ func viewLogsJSONFileDirect(lvopts LogViewOptions, jsonLogFilePath string, stdou return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err) } fin.Close() - - readFromLastPos := func() error { - // Re-open the file and seek to the last-consumed offset. - fin, err = os.OpenFile(jsonLogFilePath, os.O_RDONLY, 0400) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while trying to re-open JSON logfile %q: %s", jsonLogFilePath, err) - } - _, err = fin.Seek(lastPos, 0) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err) - } - - err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, 0) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while doing follow-up decoding of JSON logfile %q at starting position %d: %s", jsonLogFilePath, lastPos, err) - } - - // Record current file seek position before looping again. - lastPos, err = fin.Seek(0, io.SeekCurrent) - if err != nil { - fin.Close() - return fmt.Errorf("error occurred while trying to seek JSON logfile %q at current position: %s", jsonLogFilePath, err) - } - fin.Close() - return nil - } - for { select { case <-stopChannel: - logrus.Debugf("received stop signal, re-reading JSON logfile and returning") - // read final logs before returning - return readFromLastPos() + logrus.Debugf("received stop signal while re-reading JSON logfile, returning") + return nil default: - if err = readFromLastPos(); err != nil { - return err + // Re-open the file and seek to the last-consumed offset. + fin, err = os.OpenFile(jsonLogFilePath, os.O_RDONLY, 0400) + if err != nil { + fin.Close() + return fmt.Errorf("error occurred while trying to re-open JSON logfile %q: %s", jsonLogFilePath, err) } + _, err = fin.Seek(lastPos, 0) + if err != nil { + fin.Close() + return fmt.Errorf("error occurred while trying to seek JSON logfile %q at position %d: %s", jsonLogFilePath, lastPos, err) + } + + err = jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until, 0) + if err != nil { + fin.Close() + return fmt.Errorf("error occurred while doing follow-up decoding of JSON logfile %q at starting position %d: %s", jsonLogFilePath, lastPos, err) + } + + // Record current file seek position before looping again. + lastPos, err = fin.Seek(0, io.SeekCurrent) + if err != nil { + fin.Close() + return fmt.Errorf("error occurred while trying to seek JSON logfile %q at current position: %s", jsonLogFilePath, err) + } + fin.Close() } // Give the OS a second to breathe before re-opening the file: time.Sleep(time.Second) @@ -233,8 +224,6 @@ func viewLogsJSONFileThroughTailExec(lvopts LogViewOptions, jsonLogFilePath stri // Setup killing goroutine: go func() { <-stopChannel - // sleep 100ms to get logs post container exit - time.Sleep(100 * time.Millisecond) logrus.Debugf("killing tail logs process with PID: %d", cmd.Process.Pid) cmd.Process.Kill() }() From 17a33f9a50ce4de8b081430acc499b1c5dff8fed Mon Sep 17 00:00:00 2001 From: Mrudul Harwani Date: Wed, 19 Jul 2023 07:58:12 -0700 Subject: [PATCH 4/6] minor changes to address review feedback Signed-off-by: Mrudul Harwani --- cmd/nerdctl/container_logs_test.go | 17 +++++++---------- pkg/cmd/container/create.go | 10 +++++----- pkg/logging/logging.go | 8 ++++---- 3 files changed, 16 insertions(+), 19 deletions(-) diff --git a/cmd/nerdctl/container_logs_test.go b/cmd/nerdctl/container_logs_test.go index b5bac2fea65..47e451b505a 100644 --- a/cmd/nerdctl/container_logs_test.go +++ b/cmd/nerdctl/container_logs_test.go @@ -148,36 +148,34 @@ func TestLogsWithRunningContainer(t *testing.T) { t.Parallel() base := testutil.NewBase(t) containerName := testutil.Identifier(t) - defer base.Cmd("rm", containerName).Run() - expected := []string{} - for i := 1; i <= 10; i++ { - expected = append(expected, fmt.Sprint(i)) + defer base.Cmd("rm", "-f", containerName).Run() + expected := make([]string, 10) + for i := 0; i < 10; i++ { + expected[i] = fmt.Sprint(i + 1) } base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, "sh", "-euc", "for i in `seq 1 10`; do echo $i; sleep 1; done").AssertOK() base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) - base.Cmd("rm", "-f", containerName).AssertOK() } -func TestLogsWithoutNewline(t *testing.T) { +func TestLogsWithoutNewlineOrEOF(t *testing.T) { t.Parallel() base := testutil.NewBase(t) containerName := testutil.Identifier(t) - defer base.Cmd("rm", containerName).Run() + defer base.Cmd("rm", "-f", containerName).Run() expected := []string{"Hello World!", "There is no newline"} base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, "printf", "'Hello World!\nThere is no newline'").AssertOK() time.Sleep(3 * time.Second) base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) - base.Cmd("rm", "-f", containerName).AssertOK() } func TestLogsAfterRestartingContainer(t *testing.T) { t.Parallel() base := testutil.NewBase(t) containerName := testutil.Identifier(t) - defer base.Cmd("rm", containerName).Run() + defer base.Cmd("rm", "-f", containerName).Run() base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage, "printf", "'Hello World!\nThere is no newline'").AssertOK() expected := []string{"Hello World!", "There is no newline"} @@ -187,5 +185,4 @@ func TestLogsAfterRestartingContainer(t *testing.T) { base.Cmd("start", containerName) time.Sleep(3 * time.Second) base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...) - base.Cmd("rm", "-f", containerName).AssertOK() } diff --git a/pkg/cmd/container/create.go b/pkg/cmd/container/create.go index d70c4cea86d..a595b9cea91 100644 --- a/pkg/cmd/container/create.go +++ b/pkg/cmd/container/create.go @@ -161,7 +161,7 @@ func Create(ctx context.Context, client *containerd.Client, args []string, netMa // 1, nerdctl run --name demo -it imagename // 2, ctrl + c to stop demo container // 3, nerdctl start/restart demo - logConfig, err := generateLogConfig(dataStore, id, options.LogDriver, options.LogOpt, &options.GOptions) + logConfig, err := generateLogConfig(dataStore, id, options.LogDriver, options.LogOpt, options.GOptions.Namespace, options.GOptions.Address) if err != nil { return nil, nil, err } @@ -655,9 +655,9 @@ func writeCIDFile(path, id string) error { } // generateLogConfig creates a LogConfig for the current container store -func generateLogConfig(dataStore string, id string, logDriver string, logOpt []string, gOpt *types.GlobalCommandOptions) (logConfig logging.LogConfig, err error) { +func generateLogConfig(dataStore string, id string, logDriver string, logOpt []string, ns, hostAddress string) (logConfig logging.LogConfig, err error) { var u *url.URL - logConfig.HostAddress = gOpt.Address + logConfig.HostAddress = hostAddress if u, err = url.Parse(logDriver); err == nil && u.Scheme != "" { logConfig.LogURI = logDriver } else { @@ -675,7 +675,7 @@ func generateLogConfig(dataStore string, id string, logDriver string, logOpt []s if err != nil { return } - if err = logDriverInst.Init(dataStore, gOpt.Namespace, id); err != nil { + if err = logDriverInst.Init(dataStore, ns, id); err != nil { return } @@ -684,7 +684,7 @@ func generateLogConfig(dataStore string, id string, logDriver string, logOpt []s return } - logConfigFilePath := logging.LogConfigFilePath(dataStore, gOpt.Namespace, id) + logConfigFilePath := logging.LogConfigFilePath(dataStore, ns, id) if err = os.WriteFile(logConfigFilePath, logConfigB, 0600); err != nil { return } diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index cf8eb9b8ad8..864dc79db39 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -163,7 +163,7 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, hostAd if err := driver.PreProcess(dataStore, config); err != nil { return err } - exit, err := getContainerWait(ctx, hostAddress, config) + exitCh, err := getContainerWait(ctx, hostAddress, config) if err != nil { return err } @@ -172,8 +172,8 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, hostAd stdoutR, stdoutW := io.Pipe() stderrR, stderrW := io.Pipe() copyStream := func(reader io.Reader, writer *io.PipeWriter) { - // copy using a buffer of size 16K - buf := make([]byte, 16*1024) + // copy using a buffer of size 32K + buf := make([]byte, 32<<10) _, err := io.CopyBuffer(writer, reader, buf) if err != nil { logrus.Errorf("failed to copy stream: %s", err) @@ -208,7 +208,7 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, hostAd }() go func() { // close stdout and stderr upon container exit - <-exit + <-exitCh stdoutW.Close() stderrW.Close() }() From 808dd0c8a994fb9803018311ebb0d85d51389767 Mon Sep 17 00:00:00 2001 From: Mrudul Harwani Date: Sun, 30 Jul 2023 21:44:46 -0700 Subject: [PATCH 5/6] implement method to wait for logger exit Signed-off-by: Mrudul Harwani --- pkg/cmd/container/logs.go | 7 ++++--- pkg/logging/logging.go | 29 +++++++++++++++++++++++++++-- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/pkg/cmd/container/logs.go b/pkg/cmd/container/logs.go index 4e7a4f8fc1f..b41e1d701c5 100644 --- a/pkg/cmd/container/logs.go +++ b/pkg/cmd/container/logs.go @@ -22,7 +22,6 @@ import ( "os" "os/signal" "syscall" - "time" "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" @@ -91,8 +90,10 @@ func Logs(ctx context.Context, client *containerd.Client, container string, opti // Setup goroutine to send stop event if container task finishes: go func() { <-waitCh - // wait 100ms to let logViewer process data sent after container exit, if any - time.Sleep(100 * time.Millisecond) + // Wait for logger to process remaining logs after container exit + if err = logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil { + logrus.WithError(err).Error("failed to wait for logger shutdown") + } logrus.Debugf("container task has finished, sending kill signal to log viewer") stopChannel <- os.Interrupt }() diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index 864dc79db39..0d4d496ec74 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -31,6 +31,7 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/runtime/v2/logging" + "github.com/containerd/nerdctl/pkg/lockutil" "github.com/sirupsen/logrus" ) @@ -142,6 +143,17 @@ func LoadLogConfig(dataStore, ns, id string) (LogConfig, error) { return logConfig, nil } +func getLockPath(dataStore, ns, id string) string { + return filepath.Join(dataStore, "containers", ns, id, "logger-lock") +} + +// WaitForLogger waits until the logger has finished executing and processing container logs +func WaitForLogger(dataStore, ns, id string) error { + return lockutil.WithDirLock(getLockPath(dataStore, ns, id), func() error { + return nil + }) +} + // getContainerWait loads the container from ID and returns its wait channel func getContainerWait(ctx context.Context, hostAddress string, config *logging.Config) (<-chan containerd.ExitStatus, error) { client, err := containerd.New(hostAddress, containerd.WithDefaultNamespace(config.Namespace)) @@ -234,11 +246,24 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { if err != nil { return err } - if err := ready(); err != nil { + + lockFile := getLockPath(dataStore, config.Namespace, config.ID) + f, err := os.Create(lockFile) + if err != nil { return err } + defer f.Close() + + // the logger will obtain an exclusive lock on a file until the container is + // stopped and the driver has finished processing all output, + // so that waiting log viewers can be signalled when the process is complete. + return lockutil.WithDirLock(lockFile, func() error { + if err := ready(); err != nil { + return err + } - return loggingProcessAdapter(ctx, driver, dataStore, logConfig.HostAddress, config) + return loggingProcessAdapter(ctx, driver, dataStore, logConfig.HostAddress, config) + }) } else if !errors.Is(err, os.ErrNotExist) { // the file does not exist if the container was created with nerdctl < 0.20 return err From 0050ac57e16be1c618e18f3e1a5b59fbbf06b0b1 Mon Sep 17 00:00:00 2001 From: Mrudul Harwani Date: Mon, 31 Jul 2023 23:48:04 -0700 Subject: [PATCH 6/6] add retry attempts in logger to wait for task start Signed-off-by: Mrudul Harwani --- cmd/nerdctl/container_logs_test.go | 7 +++++ pkg/logging/logging.go | 47 +++++++++++++++++++++++------- 2 files changed, 43 insertions(+), 11 deletions(-) diff --git a/cmd/nerdctl/container_logs_test.go b/cmd/nerdctl/container_logs_test.go index 47e451b505a..d7334dc9298 100644 --- a/cmd/nerdctl/container_logs_test.go +++ b/cmd/nerdctl/container_logs_test.go @@ -18,6 +18,7 @@ package main import ( "fmt" + "runtime" "strings" "testing" "time" @@ -160,6 +161,9 @@ func TestLogsWithRunningContainer(t *testing.T) { } func TestLogsWithoutNewlineOrEOF(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("FIXME: test does not work on Windows yet because containerd doesn't send an exit event appropriately after task exit on Windows") + } t.Parallel() base := testutil.NewBase(t) containerName := testutil.Identifier(t) @@ -172,6 +176,9 @@ func TestLogsWithoutNewlineOrEOF(t *testing.T) { } func TestLogsAfterRestartingContainer(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("FIXME: test does not work on Windows yet. Restarting a container fails with: failed to create shim task: hcs::CreateComputeSystem : The requested operation for attach namespace failed.: unknown") + } t.Parallel() base := testutil.NewBase(t) containerName := testutil.Identifier(t) diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index 0d4d496ec74..1122be01b4f 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -27,6 +27,7 @@ import ( "path/filepath" "sort" "sync" + "time" "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" @@ -164,21 +165,39 @@ func getContainerWait(ctx context.Context, hostAddress string, config *logging.C if err != nil { return nil, err } + task, err := con.Task(ctx, nil) - if err != nil { + if err == nil { + return task.Wait(ctx) + } + if !errdefs.IsNotFound(err) { return nil, err } - return task.Wait(ctx) + + // If task was not found, it's possible that the container runtime is still being created. + // Retry every 100ms. + ticker := time.NewTicker(100 * time.Millisecond) + for { + select { + case <-ctx.Done(): + return nil, fmt.Errorf("timed out waiting for container task to start") + case <-ticker.C: + task, err = con.Task(ctx, nil) + if err != nil { + if errdefs.IsNotFound(err) { + continue + } + return nil, err + } + return task.Wait(ctx) + } + } } func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, hostAddress string, config *logging.Config) error { if err := driver.PreProcess(dataStore, config); err != nil { return err } - exitCh, err := getContainerWait(ctx, hostAddress, config) - if err != nil { - return err - } // initialize goroutines to copy stdout and stderr streams to a closable pipe stdoutR, stdoutW := io.Pipe() @@ -220,9 +239,15 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, hostAd }() go func() { // close stdout and stderr upon container exit + defer stdoutW.Close() + defer stderrW.Close() + + exitCh, err := getContainerWait(ctx, hostAddress, config) + if err != nil { + logrus.Errorf("failed to get container task wait channel: %v", err) + return + } <-exitCh - stdoutW.Close() - stderrW.Close() }() wg.Wait() return driver.PostProcess() @@ -247,8 +272,8 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { return err } - lockFile := getLockPath(dataStore, config.Namespace, config.ID) - f, err := os.Create(lockFile) + loggerLock := getLockPath(dataStore, config.Namespace, config.ID) + f, err := os.Create(loggerLock) if err != nil { return err } @@ -257,7 +282,7 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { // the logger will obtain an exclusive lock on a file until the container is // stopped and the driver has finished processing all output, // so that waiting log viewers can be signalled when the process is complete. - return lockutil.WithDirLock(lockFile, func() error { + return lockutil.WithDirLock(loggerLock, func() error { if err := ready(); err != nil { return err }