Skip to content

Commit

Permalink
Add locking to loki.write sink + nits (#5694)
Browse files Browse the repository at this point in the history
* Address PR comments

* added bench

* add locking to loki.write sink

* adding changelog

* imports order

* fix changelog

* remove unused client write to

* added missing entry

* typos

* add missing changelog entry

* remove duplicate entry
  • Loading branch information
thepalbi authored Nov 7, 2023
1 parent 49f1bd1 commit fd7e8de
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 337 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ Main (unreleased)
- Grafana Agent Operator: `config-reloader` container no longer runs as root.
(@rootmout)

- Added support for replaying not sent data for `loki.write` when WAL is enabled. (@thepalbi)

### Bugfixes

- Fixed an issue where `loki.process` validation for stage `metric.counter` was
Expand Down Expand Up @@ -129,6 +131,8 @@ Main (unreleased)

- Fixed a bug where UDP syslog messages were never processed (@joshuapare)

- Updating configuration for `loki.write` no longer drops data. (@thepalbi)

v0.37.4 (2023-11-06)
-----------------

Expand Down
78 changes: 0 additions & 78 deletions component/common/loki/client/client_writeto.go

This file was deleted.

247 changes: 0 additions & 247 deletions component/common/loki/client/client_writeto_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion component/common/loki/client/internal/marker_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var (
func EncodeMarkerV1(segment uint64) ([]byte, error) {
// marker format v1
// marker [ 0 , 1 ] - HEADER, which is used to track version
// marker [ 2 , 9 ] - encoded unit 64 which is the content of the marker, the last "consumed" segment
// marker [ 2 , 9 ] - encoded uint64 which is the content of the marker, the last "consumed" segment
// marker [ 10, 13 ] - CRC32 of the first 10 bytes of the marker, using IEEE polynomial
bs := make([]byte, 14)
// write header with marker format version
Expand Down
7 changes: 2 additions & 5 deletions component/common/loki/wal/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,9 @@ func (t *testWriteTo) AssertContainsLines(tst *testing.T, lines ...string) {
}
t.ReadEntries.DoneIterate()

allSeen := true
for _, wasSeen := range seen {
allSeen = allSeen && wasSeen
for line, wasSeen := range seen {
require.True(tst, wasSeen, "expected to have received line: %s", line)
}

require.True(tst, allSeen, "expected all entries to have been received")
}

// watcherTestResources contains all resources necessary to test an individual Watcher functionality
Expand Down
3 changes: 3 additions & 0 deletions component/loki/write/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,14 @@ func (c *Component) Run(ctx context.Context) error {
case <-ctx.Done():
return nil
case entry := <-c.receiver.Chan():
c.mut.RLock()
select {
case <-ctx.Done():
c.mut.RUnlock()
return nil
case c.sink.Chan() <- entry:
}
c.mut.RUnlock()
}
}
}
Expand Down
Loading

0 comments on commit fd7e8de

Please sign in to comment.