Skip to content

Commit

Permalink
Cleanup loki.process on update
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Aug 22, 2024
1 parent b9c3594 commit d32659c
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 4 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ internal API changes are not present.
Main (unreleased)
-----------------

### Bugfixes

- Fix a memory leak which would occur any time `loki.process` had its configuration reloaded. (@ptodev)

### Other changes

- Change the Docker base image for Linux containers to `ubuntu:noble`. (@amontalban)
Expand Down
6 changes: 3 additions & 3 deletions internal/component/loki/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func (c *Component) Run(ctx context.Context) error {
if c.entryHandler != nil {
c.entryHandler.Stop()
}
close(c.processIn)
c.mut.RUnlock()
}()
wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -127,8 +126,9 @@ func (c *Component) Update(args component.Arguments) error {
if err != nil {
return err
}
c.entryHandler = loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() })
c.processIn = pipeline.Wrap(c.entryHandler).Chan()
entryHandler := loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() })
c.entryHandler = pipeline.Wrap(entryHandler)
c.processIn = c.entryHandler.Chan()
c.stages = newArgs.Stages
}

Expand Down
55 changes: 54 additions & 1 deletion internal/component/loki/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,60 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) {

// Run everything for a while
time.Sleep(1 * time.Second)
require.WithinDuration(t, time.Now(), lastSend.Load().(time.Time), 300*time.Millisecond)
// Make sure there are no goroutine leaks when the config is updated.
// Goroutine leaks often cause memory leaks.
func TestLeakyUpdate(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

tester := newTester(t)
defer tester.stop()

forwardArgs := `
// This will be filled later
forward_to = []`

numLogsToSend := 1

cfg1 := `
stage.metrics {
metric.counter {
name = "paulin_test1"
action = "inc"
match_all = true
}
}` + forwardArgs

cfg2 := `
stage.metrics {
metric.counter {
name = "paulin_test2"
action = "inc"
match_all = true
}
}` + forwardArgs

metricsTemplate1 := `
# HELP loki_process_custom_paulin_test1
# TYPE loki_process_custom_paulin_test1 counter
loki_process_custom_paulin_test1{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d
`

metricsTemplate2 := `
# HELP loki_process_custom_paulin_test2
# TYPE loki_process_custom_paulin_test2 counter
loki_process_custom_paulin_test2{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d
`

metrics1 := fmt.Sprintf(metricsTemplate1, numLogsToSend)
metrics2 := fmt.Sprintf(metricsTemplate2, numLogsToSend)

tester.updateAndTest(numLogsToSend, cfg1, "", metrics1)
tester.updateAndTest(numLogsToSend, cfg2, "", metrics2)

for i := 0; i < 100; i++ {
tester.updateAndTest(numLogsToSend, cfg1, "", metrics1)
tester.updateAndTest(numLogsToSend, cfg2, "", metrics2)
}
}

func TestMetricsStageRefresh(t *testing.T) {
Expand Down

0 comments on commit d32659c

Please sign in to comment.