From 9534e4aecf3b1ab1f4e58a246ccdeacbb60d0284 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Wed, 21 Aug 2024 18:58:42 +0100 Subject: [PATCH] Stop handleOut only after the pipeline shut down --- internal/component/loki/process/process.go | 17 +++++++--- .../component/loki/process/process_test.go | 31 +++++++++++-------- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/internal/component/loki/process/process.go b/internal/component/loki/process/process.go index ecc863762fc4..e7429f618e57 100644 --- a/internal/component/loki/process/process.go +++ b/internal/component/loki/process/process.go @@ -85,17 +85,24 @@ func New(o component.Options, args Arguments) (*Component, error) { // Run implements component.Component. func (c *Component) Run(ctx context.Context) error { + shutdownCh := make(chan struct{}) + wgOut := &sync.WaitGroup{} defer func() { c.mut.RLock() if c.entryHandler != nil { c.entryHandler.Stop() + // Stop handleOut only after the entryHandler has stopped. + // If handleOut stops first, entryHandler might get stuck on a channel send. + close(shutdownCh) + wgOut.Wait() } c.mut.RUnlock() }() wg := &sync.WaitGroup{} - wg.Add(2) + wg.Add(1) go c.handleIn(ctx, wg) - go c.handleOut(ctx, wg) + wgOut.Add(1) + go c.handleOut(shutdownCh, wgOut) wg.Wait() return nil @@ -158,11 +165,11 @@ func (c *Component) handleIn(ctx context.Context, wg *sync.WaitGroup) { } } -func (c *Component) handleOut(ctx context.Context, wg *sync.WaitGroup) { +func (c *Component) handleOut(shutdownCh chan struct{}, wg *sync.WaitGroup) { defer wg.Done() for { select { - case <-ctx.Done(): + case <-shutdownCh: return case entry := <-c.processOut: c.fanoutMut.RLock() @@ -170,7 +177,7 @@ func (c *Component) handleOut(ctx context.Context, wg *sync.WaitGroup) { c.fanoutMut.RUnlock() for _, f := range fanout { select { - case <-ctx.Done(): + case <-shutdownCh: return case f.Chan() <- entry: // no-op diff --git a/internal/component/loki/process/process_test.go b/internal/component/loki/process/process_test.go index da9998b49d9b..c2099456950b 100644 --- a/internal/component/loki/process/process_test.go +++ b/internal/component/loki/process/process_test.go @@ -409,10 +409,11 @@ type testFrequentUpdate struct { receiver2 loki.LogsReceiver keepSending atomic.Bool - keepReceiving atomic.Bool + stopReceiving chan struct{} keepUpdating atomic.Bool - wgLogSend sync.WaitGroup + wgSend sync.WaitGroup + wgReceive sync.WaitGroup wgRun sync.WaitGroup wgUpdate sync.WaitGroup @@ -423,15 +424,15 @@ type testFrequentUpdate struct { func startTestFrequentUpdate(t *testing.T, cfg string) *testFrequentUpdate { res := testFrequentUpdate{ - t: t, - receiver1: loki.NewLogsReceiver(), - receiver2: loki.NewLogsReceiver(), + t: t, + receiver1: loki.NewLogsReceiver(), + receiver2: loki.NewLogsReceiver(), + stopReceiving: make(chan struct{}), } ctx, cancel := context.WithCancel(context.Background()) res.keepSending.Store(true) - res.keepReceiving.Store(true) res.keepUpdating.Store(true) res.stop = func() { @@ -439,11 +440,14 @@ func startTestFrequentUpdate(t *testing.T, cfg string) *testFrequentUpdate { res.wgUpdate.Wait() res.keepSending.Store(false) - res.wgLogSend.Wait() + res.wgSend.Wait() cancel() res.wgRun.Wait() + close(res.stopReceiving) + res.wgReceive.Wait() + close(res.receiver1.Chan()) close(res.receiver2.Chan()) } @@ -478,23 +482,25 @@ func (r *testFrequentUpdate) drainLogs() { r.lastSend.Store(time.Now()) } - r.wgLogSend.Add(1) + r.wgReceive.Add(1) go func() { - for r.keepReceiving.Load() { + for { select { + case <-r.stopReceiving: + r.wgReceive.Done() + return case <-r.receiver1.Chan(): drainLogs() case <-r.receiver2.Chan(): drainLogs() } } - r.wgLogSend.Done() }() } // Continuously send entries to both channels func (r *testFrequentUpdate) sendLogs() { - r.wgLogSend.Add(1) + r.wgSend.Add(1) go func() { for r.keepSending.Load() { ts := time.Now() @@ -511,8 +517,7 @@ func (r *testFrequentUpdate) sendLogs() { // continue } } - r.keepReceiving.Store(false) - r.wgLogSend.Done() + r.wgSend.Done() }() }