Skip to content

Commit

Permalink
Stop handleOut only after the pipeline shut down
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Aug 22, 2024
1 parent 6924b8b commit 9534e4a
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 18 deletions.
17 changes: 12 additions & 5 deletions internal/component/loki/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,24 @@ func New(o component.Options, args Arguments) (*Component, error) {

// Run implements component.Component.
func (c *Component) Run(ctx context.Context) error {
shutdownCh := make(chan struct{})
wgOut := &sync.WaitGroup{}
defer func() {
c.mut.RLock()
if c.entryHandler != nil {
c.entryHandler.Stop()
// Stop handleOut only after the entryHandler has stopped.
// If handleOut stops first, entryHandler might get stuck on a channel send.
close(shutdownCh)
wgOut.Wait()
}
c.mut.RUnlock()
}()
wg := &sync.WaitGroup{}
wg.Add(2)
wg.Add(1)
go c.handleIn(ctx, wg)
go c.handleOut(ctx, wg)
wgOut.Add(1)
go c.handleOut(shutdownCh, wgOut)

wg.Wait()
return nil
Expand Down Expand Up @@ -158,19 +165,19 @@ func (c *Component) handleIn(ctx context.Context, wg *sync.WaitGroup) {
}
}

func (c *Component) handleOut(ctx context.Context, wg *sync.WaitGroup) {
func (c *Component) handleOut(shutdownCh chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-ctx.Done():
case <-shutdownCh:
return
case entry := <-c.processOut:
c.fanoutMut.RLock()
fanout := c.fanout
c.fanoutMut.RUnlock()
for _, f := range fanout {
select {
case <-ctx.Done():
case <-shutdownCh:
return
case f.Chan() <- entry:
// no-op
Expand Down
31 changes: 18 additions & 13 deletions internal/component/loki/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,11 @@ type testFrequentUpdate struct {
receiver2 loki.LogsReceiver

keepSending atomic.Bool
keepReceiving atomic.Bool
stopReceiving chan struct{}
keepUpdating atomic.Bool

wgLogSend sync.WaitGroup
wgSend sync.WaitGroup
wgReceive sync.WaitGroup
wgRun sync.WaitGroup
wgUpdate sync.WaitGroup

Expand All @@ -423,27 +424,30 @@ type testFrequentUpdate struct {

func startTestFrequentUpdate(t *testing.T, cfg string) *testFrequentUpdate {
res := testFrequentUpdate{
t: t,
receiver1: loki.NewLogsReceiver(),
receiver2: loki.NewLogsReceiver(),
t: t,
receiver1: loki.NewLogsReceiver(),
receiver2: loki.NewLogsReceiver(),
stopReceiving: make(chan struct{}),
}

ctx, cancel := context.WithCancel(context.Background())

res.keepSending.Store(true)
res.keepReceiving.Store(true)
res.keepUpdating.Store(true)

res.stop = func() {
res.keepUpdating.Store(false)
res.wgUpdate.Wait()

res.keepSending.Store(false)
res.wgLogSend.Wait()
res.wgSend.Wait()

cancel()
res.wgRun.Wait()

close(res.stopReceiving)
res.wgReceive.Wait()

close(res.receiver1.Chan())
close(res.receiver2.Chan())
}
Expand Down Expand Up @@ -478,23 +482,25 @@ func (r *testFrequentUpdate) drainLogs() {
r.lastSend.Store(time.Now())
}

r.wgLogSend.Add(1)
r.wgReceive.Add(1)
go func() {
for r.keepReceiving.Load() {
for {
select {
case <-r.stopReceiving:
r.wgReceive.Done()
return
case <-r.receiver1.Chan():
drainLogs()
case <-r.receiver2.Chan():
drainLogs()
}
}
r.wgLogSend.Done()
}()
}

// Continuously send entries to both channels
func (r *testFrequentUpdate) sendLogs() {
r.wgLogSend.Add(1)
r.wgSend.Add(1)
go func() {
for r.keepSending.Load() {
ts := time.Now()
Expand All @@ -511,8 +517,7 @@ func (r *testFrequentUpdate) sendLogs() {
// continue
}
}
r.keepReceiving.Store(false)
r.wgLogSend.Done()
r.wgSend.Done()
}()
}

Expand Down

0 comments on commit 9534e4a

Please sign in to comment.