From 93559b421cf45a9453b0a6fcf8bb76b111efe0b3 Mon Sep 17 00:00:00 2001 From: William Dumont Date: Mon, 2 Sep 2024 13:27:20 +0200 Subject: [PATCH] backport container restart support in loki source (#6897) --- CHANGELOG.md | 2 + .../component/loki/source/docker/docker.go | 7 +- .../loki/source/docker/docker_test.go | 105 ++++++++++++++++++ .../docker/internal/dockertarget/target.go | 2 - .../component/loki/source/docker/runner.go | 41 ++++--- 5 files changed, 134 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ad8f9b53ffed..014846b92f45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ Main (unreleased) - Fix an issue where nested import.git config blocks could conflict if they had the same labels. (@wildum) +- Fix an issue where `loki.source.docker` stops collecting logs after a container restart. (@wildum) + ### Other changes - Change the Docker base image for Linux containers to `ubuntu:noble`. (@amontalban) diff --git a/internal/component/loki/source/docker/docker.go b/internal/component/loki/source/docker/docker.go index d1b1430db7fd..7d22777b4018 100644 --- a/internal/component/loki/source/docker/docker.go +++ b/internal/component/loki/source/docker/docker.go @@ -305,9 +305,10 @@ func (c *Component) getManagerOptions(args Arguments) (*options, error) { } return &options{ - client: client, - handler: loki.NewEntryHandler(c.handler.Chan(), func() {}), - positions: c.posFile, + client: client, + handler: loki.NewEntryHandler(c.handler.Chan(), func() {}), + positions: c.posFile, + targetRestartInterval: 5 * time.Second, }, nil } diff --git a/internal/component/loki/source/docker/docker_test.go b/internal/component/loki/source/docker/docker_test.go index 722a72077246..153a8c354e50 100644 --- a/internal/component/loki/source/docker/docker_test.go +++ b/internal/component/loki/source/docker/docker_test.go @@ -4,17 +4,32 @@ package docker import ( "context" + "io" + "os" + "strings" "testing" "time" + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/client" + "github.com/go-kit/log" "github.com/grafana/agent/internal/component" + "github.com/grafana/agent/internal/component/common/loki/client/fake" + "github.com/grafana/agent/internal/component/common/loki/positions" + dt "github.com/grafana/agent/internal/component/loki/source/docker/internal/dockertarget" "github.com/grafana/agent/internal/flow/componenttest" "github.com/grafana/agent/internal/util" "github.com/grafana/river" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/relabel" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +const targetRestartInterval = 20 * time.Millisecond + func Test(t *testing.T) { // Use host that works on all platforms (including Windows). var cfg = ` @@ -73,3 +88,93 @@ func TestDuplicateTargets(t *testing.T) { require.Len(t, cmp.manager.tasks, 1) } + +func TestRestart(t *testing.T) { + runningState := true + client := clientMock{ + logLine: "2024-05-02T13:11:55.879889Z caller=module_service.go:114 msg=\"module stopped\" module=distributor", + running: func() bool { return runningState }, + } + expectedLogLine := "caller=module_service.go:114 msg=\"module stopped\" module=distributor" + + tailer, entryHandler := setupTailer(t, client) + go tailer.Run(context.Background()) + + // The container is already running, expect log lines. + assert.EventuallyWithT(t, func(c *assert.CollectT) { + logLines := entryHandler.Received() + if assert.NotEmpty(c, logLines) { + assert.Equal(c, expectedLogLine, logLines[0].Line) + } + }, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit.") + + // Stops the container. + runningState = false + time.Sleep(targetRestartInterval + 10*time.Millisecond) // Sleep for a duration greater than targetRestartInterval to make sure it stops sending log lines. + entryHandler.Clear() + time.Sleep(targetRestartInterval + 10*time.Millisecond) + assert.Empty(t, entryHandler.Received()) // No log lines because the container was not running. + + // Restart the container and expect log lines. + runningState = true + assert.EventuallyWithT(t, func(c *assert.CollectT) { + logLines := entryHandler.Received() + if assert.NotEmpty(c, logLines) { + assert.Equal(c, expectedLogLine, logLines[0].Line) + } + }, time.Second, 20*time.Millisecond, "Expected log lines were not found within the time limit after restart.") +} + +func setupTailer(t *testing.T, client clientMock) (tailer *tailer, entryHandler *fake.Client) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + entryHandler = fake.NewClient(func() {}) + + ps, err := positions.New(logger, positions.Config{ + SyncPeriod: 10 * time.Second, + PositionsFile: t.TempDir() + "/positions.yml", + }) + require.NoError(t, err) + + tgt, err := dt.NewTarget( + dt.NewMetrics(prometheus.NewRegistry()), + logger, + entryHandler, + ps, + "flog", + model.LabelSet{"job": "docker"}, + []*relabel.Config{}, + client, + ) + require.NoError(t, err) + tailerTask := &tailerTask{ + options: &options{ + client: client, + targetRestartInterval: targetRestartInterval, + }, + target: tgt, + } + return newTailer(logger, tailerTask), entryHandler +} + +type clientMock struct { + client.APIClient + logLine string + running func() bool +} + +func (mock clientMock) ContainerInspect(ctx context.Context, c string) (types.ContainerJSON, error) { + return types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + ID: c, + State: &types.ContainerState{ + Running: mock.running(), + }, + }, + Config: &container.Config{Tty: true}, + }, nil +} + +func (mock clientMock) ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader(mock.logLine)), nil +} diff --git a/internal/component/loki/source/docker/internal/dockertarget/target.go b/internal/component/loki/source/docker/internal/dockertarget/target.go index 5e118fab98c3..1005f712971a 100644 --- a/internal/component/loki/source/docker/internal/dockertarget/target.go +++ b/internal/component/loki/source/docker/internal/dockertarget/target.go @@ -230,8 +230,6 @@ func (t *Target) StartIfNotRunning() { ctx, cancel := context.WithCancel(context.Background()) t.cancel = cancel go t.processLoop(ctx) - } else { - level.Debug(t.logger).Log("msg", "attempted to start process loop but it's already running", "container", t.containerName) } } diff --git a/internal/component/loki/source/docker/runner.go b/internal/component/loki/source/docker/runner.go index 61fb9d640762..9f77f6f0c5a6 100644 --- a/internal/component/loki/source/docker/runner.go +++ b/internal/component/loki/source/docker/runner.go @@ -5,8 +5,8 @@ package docker import ( "context" "sync" + "time" - "github.com/docker/docker/api/types/container" "github.com/docker/docker/client" "github.com/go-kit/log" "github.com/grafana/agent/internal/component/common/loki" @@ -52,6 +52,9 @@ type options struct { // positions interface so tailers can save/restore offsets in log files. positions positions.Positions + + // targetRestartInterval to restart task that has stopped running. + targetRestartInterval time.Duration } // tailerTask is the payload used to create tailers. It implements runner.Task. @@ -95,23 +98,25 @@ func newTailer(l log.Logger, task *tailerTask) *tailer { } func (t *tailer) Run(ctx context.Context) { - ch, chErr := t.opts.client.ContainerWait(ctx, t.target.Name(), container.WaitConditionNextExit) - - t.target.StartIfNotRunning() - - select { - case err := <-chErr: - // Error setting up the Wait request from the client; either failed to - // read from /containers/{containerID}/wait, or couldn't parse the - // response. Stop the target and exit the task after logging; if it was - // a transient error, the target will be retried on the next discovery - // refresh. - level.Error(t.log).Log("msg", "could not set up a wait request to the Docker client", "error", err) - t.target.Stop() - return - case <-ch: - t.target.Stop() - return + ticker := time.NewTicker(t.opts.targetRestartInterval) + tickerC := ticker.C + + for { + select { + case <-tickerC: + res, err := t.opts.client.ContainerInspect(ctx, t.target.Name()) + if err != nil { + level.Error(t.log).Log("msg", "error inspecting Docker container", "id", t.target.Name(), "error", err) + continue + } + if res.State.Running { + t.target.StartIfNotRunning() + } + case <-ctx.Done(): + t.target.Stop() + ticker.Stop() + return + } } }