diff --git a/CHANGELOG.md b/CHANGELOG.md index fabedfa99040..4533ecedec3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -95,6 +95,8 @@ Main (unreleased) - Grafana Agent Operator: `config-reloader` container no longer runs as root. (@rootmout) +- Added support for replaying unsent data for `loki.write` when WAL is enabled. (@thepalbi) + ### Bugfixes - Fixed an issue where `loki.process` validation for stage `metric.counter` was @@ -129,6 +131,8 @@ Main (unreleased) - Fixed a bug where UDP syslog messages were never processed (@joshuapare) +- Updating configuration for `loki.write` no longer drops data. (@thepalbi) + v0.37.4 (2023-11-06) ----------------- diff --git a/component/common/loki/client/client_writeto.go b/component/common/loki/client/client_writeto.go deleted file mode 100644 index 355b060286ef..000000000000 --- a/component/common/loki/client/client_writeto.go +++ /dev/null @@ -1,78 +0,0 @@ -package client - -import ( - "fmt" - "sync" - - "github.com/go-kit/log" - "github.com/grafana/agent/pkg/flow/logging/level" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/tsdb/chunks" - "github.com/prometheus/prometheus/tsdb/record" - - "github.com/grafana/agent/component/common/loki" - "github.com/grafana/loki/pkg/ingester/wal" - "github.com/grafana/loki/pkg/util" -) - -// clientWriteTo implements a wal.WriteTo that re-builds entries with the stored series, and the received entries. After, -// sends each to the provided Client channel.
-type clientWriteTo struct { - series map[chunks.HeadSeriesRef]model.LabelSet - seriesSegment map[chunks.HeadSeriesRef]int - seriesLock sync.RWMutex - - logger log.Logger - toClient chan<- loki.Entry -} - -// newClientWriteTo creates a new clientWriteTo -func newClientWriteTo(toClient chan<- loki.Entry, logger log.Logger) *clientWriteTo { - return &clientWriteTo{ - series: make(map[chunks.HeadSeriesRef]model.LabelSet), - seriesSegment: make(map[chunks.HeadSeriesRef]int), - toClient: toClient, - logger: logger, - } -} - -func (c *clientWriteTo) StoreSeries(series []record.RefSeries, segment int) { - c.seriesLock.Lock() - defer c.seriesLock.Unlock() - for _, seriesRec := range series { - c.seriesSegment[seriesRec.Ref] = segment - labels := util.MapToModelLabelSet(seriesRec.Labels.Map()) - c.series[seriesRec.Ref] = labels - } -} - -// SeriesReset will delete all cache entries that were last seen in segments numbered equal or lower than segmentNum -func (c *clientWriteTo) SeriesReset(segmentNum int) { - c.seriesLock.Lock() - defer c.seriesLock.Unlock() - for k, v := range c.seriesSegment { - if v <= segmentNum { - level.Debug(c.logger).Log("msg", fmt.Sprintf("reclaiming series under segment %d", segmentNum)) - delete(c.seriesSegment, k) - delete(c.series, k) - } - } -} - -func (c *clientWriteTo) AppendEntries(entries wal.RefEntries, _ int) error { - var entry loki.Entry - c.seriesLock.RLock() - l, ok := c.series[entries.Ref] - c.seriesLock.RUnlock() - if ok { - entry.Labels = l - for _, e := range entries.Entries { - entry.Entry = e - c.toClient <- entry - } - } else { - // TODO(thepalbi): Add metric here - level.Debug(c.logger).Log("msg", "series for entry not found") - } - return nil -} diff --git a/component/common/loki/client/client_writeto_test.go b/component/common/loki/client/client_writeto_test.go deleted file mode 100644 index 1ad7929b58ad..000000000000 --- a/component/common/loki/client/client_writeto_test.go +++ /dev/null @@ -1,247 +0,0 @@ -package client - 
-import ( - "fmt" - "math/rand" - "os" - "sync" - "testing" - "time" - - "go.uber.org/atomic" - - "github.com/go-kit/log" - "github.com/grafana/agent/pkg/flow/logging/level" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/tsdb/chunks" - "github.com/prometheus/prometheus/tsdb/record" - "github.com/stretchr/testify/require" - - "github.com/grafana/agent/component/common/loki" - "github.com/grafana/agent/component/common/loki/utils" - "github.com/grafana/loki/pkg/ingester/wal" - "github.com/grafana/loki/pkg/logproto" -) - -func TestClientWriter_LogEntriesAreReconstructedAndForwardedCorrectly(t *testing.T) { - logger := level.NewFilter(log.NewLogfmtLogger(os.Stdout), level.AllowDebug()) - ch := make(chan loki.Entry) - defer close(ch) - - var receivedEntries = utils.NewSyncSlice[loki.Entry]() - - go func() { - for e := range ch { - receivedEntries.Append(e) - } - }() - - var lines = []string{ - "some entry", - "some other entry", - "this is a song", - "about entries", - "I'm in a starbucks", - } - - writeTo := newClientWriteTo(ch, logger) - testAppLabelsRef := chunks.HeadSeriesRef(1) - writeTo.StoreSeries([]record.RefSeries{ - { - Ref: testAppLabelsRef, - Labels: []labels.Label{ - { - Name: "app", - Value: "test", - }, - }, - }, - }, 1) - - for _, line := range lines { - _ = writeTo.AppendEntries(wal.RefEntries{ - Ref: testAppLabelsRef, - Entries: []logproto.Entry{ - { - Timestamp: time.Now(), - Line: line, - }, - }, - }, 0) - } - - require.Eventually(t, func() bool { - return receivedEntries.Length() == len(lines) - }, time.Second*10, time.Second) - defer receivedEntries.DoneIterate() - for _, receivedEntry := range receivedEntries.StartIterate() { - require.Contains(t, lines, receivedEntry.Line, "entry line was not expected") - require.Equal(t, model.LabelValue("test"), receivedEntry.Labels["app"]) - } -} - -func TestClientWriter_LogEntriesWithoutMatchingSeriesAreIgnored(t *testing.T) { - 
logger := level.NewFilter(log.NewLogfmtLogger(os.Stdout), level.AllowDebug()) - ch := make(chan loki.Entry) - defer close(ch) - - var receivedEntries = utils.NewSyncSlice[loki.Entry]() - - go func() { - for e := range ch { - receivedEntries.Append(e) - } - }() - - var lines = []string{ - "some entry", - "some other entry", - "this is a song", - "about entries", - "I'm in a starbucks", - } - - writeTo := newClientWriteTo(ch, logger) - testAppLabelsRef := chunks.HeadSeriesRef(1) - writeTo.StoreSeries([]record.RefSeries{ - { - Ref: testAppLabelsRef, - Labels: []labels.Label{ - { - Name: "app", - Value: "test", - }, - }, - }, - }, 1) - - for _, line := range lines { - _ = writeTo.AppendEntries(wal.RefEntries{ - Ref: chunks.HeadSeriesRef(61324), - Entries: []logproto.Entry{ - { - Timestamp: time.Now(), - Line: line, - }, - }, - }, 0) - } - - time.Sleep(time.Second * 2) - require.Equal(t, 0, receivedEntries.Length(), "no entry should have arrived") -} - -func BenchmarkClientWriteTo(b *testing.B) { - type testCase struct { - numWriters int - totalLines int - } - for _, tc := range []testCase{ - {numWriters: 5, totalLines: 1_000}, - {numWriters: 10, totalLines: 1_000}, - {numWriters: 5, totalLines: 10_000}, - {numWriters: 10, totalLines: 10_000}, - {numWriters: 5, totalLines: 100_000}, - {numWriters: 10, totalLines: 100_000}, - {numWriters: 10, totalLines: 1_000_000}, - } { - b.Run(fmt.Sprintf("num writers %d, with %d lines written in total", tc.numWriters, tc.totalLines), - func(b *testing.B) { - require.True(b, tc.totalLines%tc.numWriters == 0, "total lines must be divisible by num of writers") - for n := 0; n < b.N; n++ { - bench(tc.numWriters, tc.totalLines, b) - } - }) - } -} - -// bench is a single execution of the ClientWriteTo benchmark. During the execution of the benchmark, numWriters routines -// will run, writing totalLines in total. 
After the writers have finished, the execution will block until the number of -// expected written entries is reached, or it times out. -func bench(numWriters, totalLines int, b *testing.B) { - logger := level.NewFilter(log.NewLogfmtLogger(os.Stdout), level.AllowDebug()) - readerWG := sync.WaitGroup{} - ch := make(chan loki.Entry) - - b.Log("starting reader routine") - readerWG.Add(1) - var totalReceived atomic.Int64 - go func() { - defer readerWG.Done() - // discard received entries - for range ch { - totalReceived.Inc() - } - }() - - writeTo := newClientWriteTo(ch, logger) - - // spin up the numWriters routines - writersWG := sync.WaitGroup{} - for n := 0; n < numWriters; n++ { - writersWG.Add(1) - go func(n int) { - defer writersWG.Done() - b.Logf("starting writer routine %d", n) - // run as writing from segment n+w, and after finishing reset series up to n. That way we are only causing blocking, - // and not deleting actually used series - startWriter(numWriters+n, n, writeTo, totalLines/numWriters, record.RefSeries{ - Ref: chunks.HeadSeriesRef(n), - Labels: labels.Labels{ - {Name: "n", Value: fmt.Sprint(n)}, - }, - }, time.Millisecond*500) - }(n) - } - - b.Log("waiting for writers to finish") - writersWG.Wait() - - b.Log("waiting for reader to finish") - require.Eventually(b, func() bool { - var read = totalReceived.Load() - b.Logf("checking, value read: %d", read) - return read == int64(totalLines) - }, time.Second*10, time.Second) - close(ch) - readerWG.Wait() - require.Equal(b, int64(totalLines), totalReceived.Load(), "some lines where not read") -} - -// startWriter orchestrates each writer routine that bench launches. Each routine will execute the following actions: -// 1. Add some jitter to the whole routine execution by waiting between 0 and maxInitialSleep. -// 2. Store the series that all written entries will use. -// 3. Write the lines entries, adding a small jitter between 0 and 1 ms after each write operation. -// 4. 
After all are written, call a SeriesReset. This will block the entire series map and will hopefully block -// some other writing routine. -func startWriter(segmentNum, seriesToReset int, target *clientWriteTo, lines int, series record.RefSeries, maxInitialSleep time.Duration) { - randomSleepMax := func(max time.Duration) { - // random sleep to add some jitter - s := int64(rand.Uint64()) % int64(max) - time.Sleep(time.Duration(s)) - } - // random sleep to add some jitter - randomSleepMax(maxInitialSleep) - - // store series - target.StoreSeries([]record.RefSeries{series}, segmentNum) - - // write lines with that series - for i := 0; i < lines; i++ { - _ = target.AppendEntries(wal.RefEntries{ - Ref: series.Ref, - Entries: []logproto.Entry{ - { - Timestamp: time.Now(), - Line: fmt.Sprintf("%d - %d - hellooo", segmentNum, i), - }, - }, - }, 0) - // add some jitter between writes - randomSleepMax(time.Millisecond * 1) - } - - // reset segment - target.SeriesReset(seriesToReset) -} diff --git a/component/common/loki/client/internal/marker_encoding.go b/component/common/loki/client/internal/marker_encoding.go index 48415f421c7f..c3ed7dd5778f 100644 --- a/component/common/loki/client/internal/marker_encoding.go +++ b/component/common/loki/client/internal/marker_encoding.go @@ -15,7 +15,7 @@ var ( func EncodeMarkerV1(segment uint64) ([]byte, error) { // marker format v1 // marker [ 0 , 1 ] - HEADER, which is used to track version - // marker [ 2 , 9 ] - encoded unit 64 which is the content of the marker, the last "consumed" segment + // marker [ 2 , 9 ] - encoded uint64 which is the content of the marker, the last "consumed" segment // marker [ 10, 13 ] - CRC32 of the first 10 bytes of the marker, using IEEE polynomial bs := make([]byte, 14) // write header with marker format version diff --git a/component/common/loki/wal/watcher_test.go b/component/common/loki/wal/watcher_test.go index a24b7ff63049..97de19748dee 100644 --- a/component/common/loki/wal/watcher_test.go +++ 
b/component/common/loki/wal/watcher_test.go @@ -64,12 +64,9 @@ func (t *testWriteTo) AssertContainsLines(tst *testing.T, lines ...string) { } t.ReadEntries.DoneIterate() - allSeen := true - for _, wasSeen := range seen { - allSeen = allSeen && wasSeen + for line, wasSeen := range seen { + require.True(tst, wasSeen, "expected to have received line: %s", line) } - - require.True(tst, allSeen, "expected all entries to have been received") } // watcherTestResources contains all resources necessary to test an individual Watcher functionality diff --git a/component/loki/write/write.go b/component/loki/write/write.go index 45d4aa063ec3..a9d9c2730936 100644 --- a/component/loki/write/write.go +++ b/component/loki/write/write.go @@ -119,11 +119,14 @@ func (c *Component) Run(ctx context.Context) error { case <-ctx.Done(): return nil case entry := <-c.receiver.Chan(): + c.mut.RLock() select { case <-ctx.Done(): + c.mut.RUnlock() return nil case c.sink.Chan() <- entry: } + c.mut.RUnlock() } } } diff --git a/component/loki/write/write_test.go b/component/loki/write/write_test.go index 0e7c1a3165cb..642b53703e0c 100644 --- a/component/loki/write/write_test.go +++ b/component/loki/write/write_test.go @@ -10,18 +10,19 @@ import ( "testing" "time" - "github.com/prometheus/common/model" - "github.com/stretchr/testify/require" - "github.com/grafana/agent/component/common/loki" "github.com/grafana/agent/component/common/loki/wal" "github.com/grafana/agent/component/discovery" lsf "github.com/grafana/agent/component/loki/source/file" "github.com/grafana/agent/pkg/flow/componenttest" "github.com/grafana/agent/pkg/util" + "github.com/grafana/river" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + "github.com/grafana/loki/pkg/logproto" loki_util "github.com/grafana/loki/pkg/util" - "github.com/grafana/river" ) func TestRiverConfig(t *testing.T) { @@ -304,3 +305,98 @@ func testMultipleEndpoint(t *testing.T, alterArgs func(arguments 
*Arguments)) {
 		}
 	}
 }
+
+// testCase sizes one BenchmarkLokiWrite scenario.
+type testCase struct {
+	linesCount  int // entries pushed per benchmark iteration
+	seriesCount int // distinct label sets the entries are spread over
+}
+
+// BenchmarkLokiWrite benchmarks loki.write pushing entries to a local HTTP test server.
+func BenchmarkLokiWrite(b *testing.B) {
+	for name, tc := range map[string]testCase{
+		"100 lines, single series": {
+			linesCount:  100,
+			seriesCount: 1,
+		},
+		"100k lines, 100 series": {
+			linesCount:  100_000,
+			seriesCount: 100,
+		},
+	} {
+		b.Run(name, func(b *testing.B) {
+			benchSingleEndpoint(b, tc, func(arguments *Arguments) {})
+		})
+	}
+}
+
+// benchSingleEndpoint runs a loki.write component against one test endpoint and,
+// on every benchmark iteration, pushes tc.linesCount entries spread over
+// tc.seriesCount series, waiting until the endpoint has received all of them.
+// alterConfig may tweak the component Arguments before the component starts.
+func benchSingleEndpoint(b *testing.B, tc testCase, alterConfig func(arguments *Arguments)) {
+	// seenLines is the running total of entries received by the test server.
+	var seenLines atomic.Int64
+
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		var pushReq logproto.PushRequest
+		err := loki_util.ParseProtoReader(context.Background(), r.Body, int(r.ContentLength), math.MaxInt32, &pushReq, loki_util.RawSnappy)
+		if err != nil {
+			w.WriteHeader(http.StatusBadRequest)
+			return
+		}
+		// Runs off the benchmark goroutine, where require.* (FailNow) is invalid; use Errorf.
+		if tenant := r.Header.Get("X-Scope-OrgID"); tenant != "tenant-1" {
+			b.Errorf("unexpected tenant header: got %q, want %q", tenant, "tenant-1")
+		}
+		for _, stream := range pushReq.Streams {
+			seenLines.Add(int64(len(stream.Entries)))
+		}
+	}))
+	defer srv.Close()
+
+	// Set up the component Arguments.
+	cfg := fmt.Sprintf(`
+		endpoint {
+			url = "%s"
+			batch_wait = "10ms"
+			tenant_id = "tenant-1"
+		}
+	`, srv.URL)
+	var args Arguments
+	require.NoError(b, river.Unmarshal([]byte(cfg), &args))
+
+	alterConfig(&args)
+
+	// Set up and start the component.
+	testComp, err := componenttest.NewControllerFromID(util.TestLogger(b), "loki.write")
+	require.NoError(b, err)
+	go func() {
+		// Use a goroutine-local error so the outer err is not raced on.
+		if runErr := testComp.Run(componenttest.TestContext(b), args); runErr != nil {
+			b.Errorf("component exited with error: %v", runErr)
+		}
+	}()
+	require.NoError(b, testComp.WaitExports(time.Second))
+
+	exports := testComp.Exports().(Exports)
+	b.ResetTimer() // exclude server and component start-up from the measurement
+	for i := 0; i < b.N; i++ {
+		for j := 0; j < tc.linesCount; j++ {
+			exports.Receiver.Chan() <- loki.Entry{
+				// j, not i, so each iteration really spans seriesCount series.
+				Labels: model.LabelSet{"foo": model.LabelValue(fmt.Sprintf("bar-%d", j%tc.seriesCount))},
+				Entry: logproto.Entry{
+					Timestamp: time.Now(),
+					Line:      "very important log",
+				},
+			}
+		}
+
+		// seenLines accumulates across iterations, so the target grows with i.
+		want := int64(tc.linesCount) * int64(i+1)
+		require.Eventually(b, func() bool {
+			return seenLines.Load() == want
+		}, time.Minute, 10*time.Millisecond, "haven't seen expected number of lines")
+	}
+}
diff --git a/pkg/flow/componenttest/context.go b/pkg/flow/componenttest/context.go
index b0ede0221b39..462787177324 100644
--- a/pkg/flow/componenttest/context.go
+++ b/pkg/flow/componenttest/context.go
@@ -6,7 +6,7 @@ import (
 )
 
 // TestContext returns a context which cancels itself when t finishes.
-func TestContext(t *testing.T) context.Context {
+func TestContext(t testing.TB) context.Context {
 	ctx, cancel := context.WithCancel(context.Background())
 	t.Cleanup(cancel)
 	return ctx
diff --git a/pkg/util/test_logger.go b/pkg/util/test_logger.go
index 150c5f14f9a6..739d178cba49 100644
--- a/pkg/util/test_logger.go
+++ b/pkg/util/test_logger.go
@@ -11,7 +11,7 @@ import (
 )
 
 // TestLogger generates a logger for a test.
-func TestLogger(t *testing.T) log.Logger {
+func TestLogger(t testing.TB) log.Logger {
 	t.Helper()
 
 	l := log.NewSyncLogger(log.NewLogfmtLogger(os.Stderr))