Skip to content

Commit 7248b52

Browse files
authored
Merge pull request #9 from tinybirdco/sink-panic
feat: sink panic and send timeout
2 parents 55803cd + 17c7ee1 commit 7248b52

File tree

1 file changed

+54
-2
lines changed

1 file changed

+54
-2
lines changed

pkg/exporter/channel_registry.go

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,19 @@ package exporter
33
import (
44
"context"
55
"sync"
6+
"time"
67

78
"github.com/tinybirdco/kubernetes-event-exporter/pkg/kube"
89
"github.com/tinybirdco/kubernetes-event-exporter/pkg/metrics"
910
"github.com/tinybirdco/kubernetes-event-exporter/pkg/sinks"
1011
"github.com/rs/zerolog/log"
1112
)
1213

14+
const (
15+
// sinkSendTimeout is the maximum time allowed for a sink to process an event
16+
sinkSendTimeout = 30 * time.Second
17+
)
18+
1319
// ChannelBasedReceiverRegistry creates two channels for each receiver. One is for receiving events and other one is
1420
// for breaking out of the infinite loop. Each message is passed to receivers
1521
// This might not be the best way to implement such feature. A ring buffer can be better
@@ -63,15 +69,61 @@ func (r *ChannelBasedReceiverRegistry) Register(name string, receiver sinks.Sink
6369
r.wg.Add(1)
6470

6571
go func() {
72+
// Ensure panic recovery to prevent sink crashes from stopping event processing
73+
defer func() {
74+
if panicErr := recover(); panicErr != nil {
75+
log.Error().
76+
Str("sink", name).
77+
Interface("panic", panicErr).
78+
Msg("Sink panic recovered, closing sink")
79+
80+
// Protect against Close() panicking again
81+
func() {
82+
defer func() {
83+
if closeErr := recover(); closeErr != nil {
84+
log.Error().
85+
Str("sink", name).
86+
Interface("panic", closeErr).
87+
Msg("Panic during Close() in recovery handler")
88+
}
89+
}()
90+
receiver.Close()
91+
}()
92+
93+
log.Info().Str("sink", name).Msg("Closed after panic")
94+
r.wg.Done()
95+
}
96+
}()
97+
6698
Loop:
6799
for {
68100
select {
69101
case ev := <-ch:
70102
log.Debug().Str("sink", name).Str("event", ev.Message).Msg("sending event to sink")
71-
err := receiver.Send(context.Background(), &ev)
103+
104+
// Create context with timeout to prevent indefinite blocking
105+
ctx, cancel := context.WithTimeout(context.Background(), sinkSendTimeout)
106+
107+
// Send event with timeout protection
108+
err := receiver.Send(ctx, &ev)
109+
cancel() // Always cancel context to release resources
110+
72111
if err != nil {
73112
r.MetricsStore.SendErrors.Inc()
74-
log.Debug().Err(err).Str("sink", name).Str("event", ev.Message).Msg("Cannot send event")
113+
if err == context.DeadlineExceeded {
114+
log.Error().
115+
Err(err).
116+
Str("sink", name).
117+
Str("event", ev.Message).
118+
Dur("timeout", sinkSendTimeout).
119+
Msg("Sink send timeout exceeded")
120+
} else {
121+
log.Debug().
122+
Err(err).
123+
Str("sink", name).
124+
Str("event", ev.Message).
125+
Msg("Cannot send event")
126+
}
75127
}
76128
case <-exitCh:
77129
log.Info().Str("sink", name).Msg("Closing the sink")

0 commit comments

Comments
 (0)