Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Carry 2337] shutdown logger after exit #2621

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions cmd/nerdctl/container_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"fmt"
"runtime"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -143,3 +144,55 @@ func TestLogsWithFailingContainer(t *testing.T) {
base.Cmd("logs", "-f", containerName).AssertNoOut("baz")
base.Cmd("rm", "-f", containerName).AssertOK()
}

func TestLogsWithRunningContainer(t *testing.T) {
t.Parallel()
base := testutil.NewBase(t)
containerName := testutil.Identifier(t)
defer base.Cmd("rm", containerName).Run()
expected := []string{}
for i := 1; i <= 10; i++ {
expected = append(expected, fmt.Sprint(i))
}

base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage,
"sh", "-euc", "for i in `seq 1 10`; do echo $i; sleep 1; done").AssertOK()
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...)
base.Cmd("rm", "-f", containerName).AssertOK()
}

func TestLogsWithoutNewlineOrEOF(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("FIXME: test does not work on Windows yet because containerd doesn't send an exit event appropriately after task exit on Windows")
}
t.Parallel()
base := testutil.NewBase(t)
containerName := testutil.Identifier(t)
defer base.Cmd("rm", containerName).Run()
expected := []string{"Hello World!", "There is no newline"}
base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage,
"printf", "'Hello World!\nThere is no newline'").AssertOK()
time.Sleep(3 * time.Second)
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...)
base.Cmd("rm", "-f", containerName).AssertOK()
}

func TestLogsAfterRestartingContainer(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("FIXME: test does not work on Windows yet. Restarting a container fails with: failed to create shim task: hcs::CreateComputeSystem <id>: The requested operation for attach namespace failed.: unknown")
}
t.Parallel()
base := testutil.NewBase(t)
containerName := testutil.Identifier(t)
defer base.Cmd("rm", containerName).Run()
base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage,
"printf", "'Hello World!\nThere is no newline'").AssertOK()
expected := []string{"Hello World!", "There is no newline"}
time.Sleep(3 * time.Second)
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...)
// restart and check logs again
base.Cmd("start", containerName)
time.Sleep(3 * time.Second)
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...)
base.Cmd("rm", "-f", containerName).AssertOK()
}
5 changes: 3 additions & 2 deletions pkg/cmd/container/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func Create(ctx context.Context, client *containerd.Client, args []string, netMa
// 1, nerdctl run --name demo -it imagename
// 2, ctrl + c to stop demo container
// 3, nerdctl start/restart demo
logConfig, err := generateLogConfig(dataStore, id, options.LogDriver, options.LogOpt, options.GOptions.Namespace)
logConfig, err := generateLogConfig(dataStore, id, options.LogDriver, options.LogOpt, options.GOptions.Namespace, options.GOptions.Address)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -661,8 +661,9 @@ func writeCIDFile(path, id string) error {
}

// generateLogConfig creates a LogConfig for the current container store
func generateLogConfig(dataStore string, id string, logDriver string, logOpt []string, ns string) (logConfig logging.LogConfig, err error) {
func generateLogConfig(dataStore string, id string, logDriver string, logOpt []string, ns, hostAddress string) (logConfig logging.LogConfig, err error) {
var u *url.URL
logConfig.HostAddress = hostAddress
if u, err = url.Parse(logDriver); err == nil && u.Scheme != "" {
logConfig.LogURI = logDriver
} else {
Expand Down
9 changes: 7 additions & 2 deletions pkg/cmd/container/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (
"os/signal"
"syscall"

"github.com/containerd/log"

"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/log"
"github.com/containerd/nerdctl/pkg/api/types"
"github.com/containerd/nerdctl/pkg/api/types/cri"
"github.com/containerd/nerdctl/pkg/clientutil"
Expand Down Expand Up @@ -90,7 +91,11 @@ func Logs(ctx context.Context, client *containerd.Client, container string, opti
// Setup goroutine to send stop event if container task finishes:
go func() {
<-waitCh
log.G(ctx).Debugf("container task has finished, sending kill signal to log viewer")
// Wait for logger to process remaining logs after container exit
if err = logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil {
log.L.WithError(err).Error("failed to wait for logger shutdown")
}
log.L.Debugf("container task has finished, sending kill signal to log viewer")
stopChannel <- os.Interrupt
}()
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/dnsutil/hostsstore/hostsstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func AllocHostsFile(dataStore, ns, id string) (string, error) {
fn := func() error {
return ensureFile(path)
}
err := lockutil.WithDirLock(lockDir, fn)
err := lockutil.WithLock(lockDir, fn)
return path, err
}

Expand All @@ -86,7 +86,7 @@ func DeallocHostsFile(dataStore, ns, id string) error {
fn := func() error {
return os.RemoveAll(dirToBeRemoved)
}
return lockutil.WithDirLock(lockDir, fn)
return lockutil.WithLock(lockDir, fn)
}

func NewStore(dataStore string) (Store, error) {
Expand Down Expand Up @@ -135,7 +135,7 @@ func (x *store) Acquire(meta Meta) error {
}
return newUpdater(meta.ID, x.hostsD).update()
}
return lockutil.WithDirLock(x.hostsD, fn)
return lockutil.WithLock(x.hostsD, fn)
}

// Release is triggered by Poststop hooks.
Expand All @@ -155,7 +155,7 @@ func (x *store) Release(ns, id string) error {
}
return newUpdater(id, x.hostsD).update()
}
return lockutil.WithDirLock(x.hostsD, fn)
return lockutil.WithLock(x.hostsD, fn)
}

func (x *store) Update(ns, id, newName string) error {
Expand All @@ -179,5 +179,5 @@ func (x *store) Update(ns, id, newName string) error {
}
return newUpdater(meta.ID, x.hostsD).update()
}
return lockutil.WithDirLock(x.hostsD, fn)
return lockutil.WithLock(x.hostsD, fn)
}
8 changes: 4 additions & 4 deletions pkg/lockutil/lockutil_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ import (
"golang.org/x/sys/unix"
)

func WithDirLock(dir string, fn func() error) error {
dirFile, err := os.Open(dir)
func WithLock(name string, fn func() error) error {
dirFile, err := os.Open(name)
if err != nil {
return err
}
defer dirFile.Close()
if err := Flock(dirFile, unix.LOCK_EX); err != nil {
return fmt.Errorf("failed to lock %q: %w", dir, err)
return fmt.Errorf("failed to lock %q: %w", name, err)
}
defer func() {
if err := Flock(dirFile, unix.LOCK_UN); err != nil {
log.L.WithError(err).Errorf("failed to unlock %q", dir)
log.L.WithError(err).Errorf("failed to unlock %q", name)
}
}()
return fn()
Expand Down
8 changes: 4 additions & 4 deletions pkg/lockutil/lockutil_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,21 @@ import (
"golang.org/x/sys/windows"
)

func WithDirLock(dir string, fn func() error) error {
dirFile, err := os.OpenFile(dir+".lock", os.O_CREATE, 0644)
func WithLock(name string, fn func() error) error {
dirFile, err := os.OpenFile(name+".lock", os.O_CREATE, 0644)
if err != nil {
return err
}
defer dirFile.Close()
// see https://msdn.microsoft.com/en-us/library/windows/desktop/aa365203(v=vs.85).aspx
// 1 lock immediately
if err = windows.LockFileEx(windows.Handle(dirFile.Fd()), 1, 0, 1, 0, &windows.Overlapped{}); err != nil {
return fmt.Errorf("failed to lock %q: %w", dir, err)
return fmt.Errorf("failed to lock %q: %w", name, err)
}

defer func() {
if err := windows.UnlockFileEx(windows.Handle(dirFile.Fd()), 0, 1, 0, &windows.Overlapped{}); err != nil {
log.L.WithError(err).Errorf("failed to unlock %q", dir)
log.L.WithError(err).Errorf("failed to unlock %q", name)
}
}()
return fn()
Expand Down
110 changes: 101 additions & 9 deletions pkg/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ import (
"path/filepath"
"sort"
"sync"
"time"

"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/runtime/v2/logging"
"github.com/containerd/log"
"github.com/containerd/nerdctl/pkg/lockutil"
)

const (
Expand Down Expand Up @@ -113,9 +116,10 @@ func Main(argv2 string) error {

// LogConfig is marshalled as "log-config.json"
type LogConfig struct {
Driver string `json:"driver"`
Opts map[string]string `json:"opts,omitempty"`
LogURI string `json:"-"`
Driver string `json:"driver"`
Opts map[string]string `json:"opts,omitempty"`
HostAddress string `json:"host"`
LogURI string `json:"-"`
}

// LogConfigFilePath returns the path of log-config.json
Expand All @@ -140,10 +144,73 @@ func LoadLogConfig(dataStore, ns, id string) (LogConfig, error) {
return logConfig, nil
}

func loggingProcessAdapter(driver Driver, dataStore string, config *logging.Config) error {
func getLockPath(dataStore, ns, id string) string {
return filepath.Join(dataStore, "containers", ns, id, "logger-lock")
}

// WaitForLogger waits until the logger has finished executing and processing container logs
func WaitForLogger(dataStore, ns, id string) error {
return lockutil.WithLock(getLockPath(dataStore, ns, id), func() error {
return nil
})
}

// getContainerWait loads the container from ID and returns its wait channel
func getContainerWait(ctx context.Context, hostAddress string, config *logging.Config) (<-chan containerd.ExitStatus, error) {
client, err := containerd.New(hostAddress, containerd.WithDefaultNamespace(config.Namespace))
if err != nil {
return nil, err
}
con, err := client.LoadContainer(ctx, config.ID)
if err != nil {
return nil, err
}
task, err := con.Task(ctx, nil)
if err != nil {
return nil, err
}

// If task was not found, it's possible that the container runtime is still being created.
// Retry every 100ms.
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
fahedouch marked this conversation as resolved.
Show resolved Hide resolved
for {
select {
case <-ctx.Done():
return nil, errors.New("timed out waiting for container task to start")
case <-ticker.C:
task, err = con.Task(ctx, nil)
if err != nil {
if errdefs.IsNotFound(err) {
continue
}
return nil, err
}
return task.Wait(ctx)
}
}
}

func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, hostAddress string, config *logging.Config) error {
if err := driver.PreProcess(dataStore, config); err != nil {
return err
}

// initialize goroutines to copy stdout and stderr streams to a closable pipe
stdoutR, stdoutW := io.Pipe()
stderrR, stderrW := io.Pipe()
copyStream := func(reader io.Reader, writer *io.PipeWriter) {
// copy using a buffer of size 32K
buf := make([]byte, 32<<10)
_, err := io.CopyBuffer(writer, reader, buf)
if err != nil {
log.L.Errorf("failed to copy stream: %s", err)
}
}
go copyStream(config.Stdout, stdoutW)
go copyStream(config.Stderr, stderrW)

// scan and process logs from pipes
var wg sync.WaitGroup
wg.Add(3)
stdout := make(chan string, 10000)
Expand All @@ -161,12 +228,24 @@ func loggingProcessAdapter(driver Driver, dataStore string, config *logging.Conf
}
}

go processLogFunc(config.Stdout, stdout)
go processLogFunc(config.Stderr, stderr)
go processLogFunc(stdoutR, stdout)
go processLogFunc(stderrR, stderr)
go func() {
defer wg.Done()
driver.Process(stdout, stderr)
}()
go func() {
// close stdout and stderr upon container exit
defer stdoutW.Close()
defer stderrW.Close()

exitCh, err := getContainerWait(ctx, hostAddress, config)
if err != nil {
log.L.Errorf("failed to get container task wait channel: %v", err)
return
}
<-exitCh
}()
wg.Wait()
return driver.PostProcess()
}
Expand All @@ -175,7 +254,7 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) {
if dataStore == "" {
return nil, errors.New("got empty data store")
}
return func(_ context.Context, config *logging.Config, ready func() error) error {
return func(ctx context.Context, config *logging.Config, ready func() error) error {
if config.Namespace == "" || config.ID == "" {
return errors.New("got invalid config")
}
Expand All @@ -189,11 +268,24 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) {
if err != nil {
return err
}
if err := ready(); err != nil {

lockFile := getLockPath(dataStore, config.Namespace, config.ID)
f, err := os.Create(lockFile)
if err != nil {
return err
}
defer f.Close()

// the logger will obtain an exclusive lock on a file until the container is
// stopped and the driver has finished processing all output,
// so that waiting log viewers can be signalled when the process is complete.
return lockutil.WithLock(lockFile, func() error {
if err := ready(); err != nil {
return err
}

return loggingProcessAdapter(driver, dataStore, config)
return loggingProcessAdapter(ctx, driver, dataStore, logConfig.HostAddress, config)
})
} else if !errors.Is(err, os.ErrNotExist) {
// the file does not exist if the container was created with nerdctl < 0.20
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/mountutil/volumestore/volumestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (vs *volumeStore) Create(name string, labels []string) (*native.Volume, err
return os.WriteFile(volFilePath, labelsJSON, 0644)
}

if err := lockutil.WithDirLock(vs.dir, fn); err != nil {
if err := lockutil.WithLock(vs.dir, fn); err != nil {
return nil, err
}

Expand Down Expand Up @@ -188,7 +188,7 @@ func (vs *volumeStore) Remove(names []string) ([]string, error) {
}
return nil
}
err := lockutil.WithDirLock(vs.dir, fn)
err := lockutil.WithLock(vs.dir, fn)
return removed, err
}

Expand Down
Loading
Loading