Skip to content

Commit

Permalink
Fix Beyla CTRL+C shutdown. Also simplify pipeline (#256)
Browse files Browse the repository at this point in the history
* moved transform.HTTPRequestSpan to request.Span

* Removed converter as pipe codec

* reformat

* Bugfix: stop beyla with simple sigint/sigterm
  • Loading branch information
mariomac authored Sep 5, 2023
1 parent d3586d4 commit d341235
Show file tree
Hide file tree
Showing 29 changed files with 619 additions and 583 deletions.
12 changes: 8 additions & 4 deletions pkg/beyla/beyla.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/pipe"
"github.com/grafana/beyla/pkg/internal/pipe/global"
"github.com/grafana/beyla/pkg/internal/request"
)

// Config as provided by the user to configure and run Beyla
Expand All @@ -33,7 +34,7 @@ type Instrumenter struct {
// the ProcessTracer.
// TODO: When we split beyla into two executables, probably the BPF map
// should be the traces' communication mechanism instead of a native channel
tracesInput chan []any
tracesInput chan []request.Span

// TODO: temporary hack. REMOVE
// This prevents the pipeline from being created until we have a service name.
Expand All @@ -48,7 +49,7 @@ func New(config *Config) *Instrumenter {
return &Instrumenter{
config: (*pipe.Config)(config),
ctxInfo: buildContextInfo((*pipe.Config)(config)),
tracesInput: make(chan []any, config.ChannelBufferLen),
tracesInput: make(chan []request.Span, config.ChannelBufferLen),
TempHackWaitForServiceName: make(chan struct{}),
}
}
Expand Down Expand Up @@ -85,8 +86,11 @@ func (i *Instrumenter) FindAndInstrument(ctx context.Context) error {
for {
select {
case <-ctx.Done():
log().Debug("stopped searching for new processes to instrument")
finder.Close()
lg := log()
lg.Debug("stopped searching for new processes to instrument")
if err := finder.Close(); err != nil {
lg.Warn("error closing process finder instance", "error", err)
}
return
case pt := <-foundProcesses:
select {
Expand Down
9 changes: 5 additions & 4 deletions pkg/internal/ebpf/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/cilium/ebpf/ringbuf"

"github.com/grafana/beyla/pkg/internal/goexec"
"github.com/grafana/beyla/pkg/internal/request"
)

//go:generate $BPF2GO -cc $BPF_CLANG -cflags $BPF_CFLAGS -target bpf -type http_request_trace bpf ../../../../bpf/http_trace.c -- -I../../../../bpf/headers
Expand Down Expand Up @@ -72,13 +73,13 @@ type Filter struct {
Fd int
}

func Read[T any](record *ringbuf.Record) (any, error) {
var event T
func ReadHTTPRequestTraceAsSpan(record *ringbuf.Record) (request.Span, error) {
var event HTTPRequestTrace

err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event)
if err != nil {
return event, err
return request.Span{}, err
}

return event, nil
return HTTPRequestTraceToSpan(&event), nil
}
55 changes: 30 additions & 25 deletions pkg/internal/ebpf/common/ringbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"golang.org/x/exp/slog"

"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/request"
)

// ringBufReader interface extracts the used methods from ringbuf.Reader for proper
Expand All @@ -33,32 +34,35 @@ type ringBufForwarder[T any] struct {
logger *slog.Logger
ringbuffer *ebpf.Map
closers []io.Closer
events []T
evLen int
spans []request.Span
spansLen int
access sync.Mutex
ticker *time.Ticker
reader func(*ringbuf.Record) (T, error)
reader func(*ringbuf.Record) (request.Span, error)
metrics imetrics.Reporter
}

// ForwardRingbuf returns a function that reads HTTPRequestTraces from an input ring buffer, accumulates them into an
// internal buffer, and forwards them to an output events channel, previously converted to transform.HTTPRequestSpan
// instances
// internal buffer, and forwards them to an output events channel, previously converted to request.Span
// instances.
// Although it returns a StartFuncCtx, it is not used inside the Pipes library; it is invoked
// directly in the code as a simple function.
func ForwardRingbuf[T any](
cfg *TracerConfig,
logger *slog.Logger,
ringbuffer *ebpf.Map,
reader func(*ringbuf.Record) (T, error),
reader func(*ringbuf.Record) (request.Span, error),
metrics imetrics.Reporter,
closers ...io.Closer,
) node.StartFuncCtx[[]T] {
) node.StartFuncCtx[[]request.Span] {
rbf := ringBufForwarder[T]{
cfg: cfg, logger: logger, ringbuffer: ringbuffer, closers: closers, reader: reader, metrics: metrics,
}
return rbf.readAndForward
}

func (rbf *ringBufForwarder[T]) readAndForward(ctx context.Context, eventsChan chan<- []T) {
func (rbf *ringBufForwarder[T]) readAndForward(ctx context.Context, spansChan chan<- []request.Span) {
rbf.logger.Debug("start reading and forwarding")
// BPF will send each measured trace via Ring Buffer, so we listen for them from the
// user space.
eventsReader, err := readerFactory(rbf.ringbuffer)
Expand All @@ -69,8 +73,8 @@ func (rbf *ringBufForwarder[T]) readAndForward(ctx context.Context, eventsChan c
rbf.closers = append(rbf.closers, eventsReader)
defer rbf.closeAllResources()

rbf.events = make([]T, rbf.cfg.BatchLength)
rbf.evLen = 0
rbf.spans = make([]request.Span, rbf.cfg.BatchLength)
rbf.spansLen = 0

// If the underlying context is closed, it closes the events reader
// so the function can exit.
Expand All @@ -79,7 +83,7 @@ func (rbf *ringBufForwarder[T]) readAndForward(ctx context.Context, eventsChan c
// Forwards periodically on timeout, if the batch is not full
if rbf.cfg.BatchTimeout > 0 {
rbf.ticker = time.NewTicker(rbf.cfg.BatchTimeout)
go rbf.bgFlushOnTimeout(eventsChan)
go rbf.bgFlushOnTimeout(spansChan)
}

// Main loop:
Expand All @@ -102,16 +106,16 @@ func (rbf *ringBufForwarder[T]) readAndForward(ctx context.Context, eventsChan c
}

rbf.access.Lock()
rbf.events[rbf.evLen], err = rbf.reader(&record)
rbf.spans[rbf.spansLen], err = rbf.reader(&record)
if err != nil {
rbf.logger.Error("error parsing perf event", err)
rbf.access.Unlock()
continue
}
rbf.evLen++
if rbf.evLen == rbf.cfg.BatchLength {
rbf.logger.Debug("submitting traces after batch is full", "len", rbf.evLen)
rbf.flushEvents(eventsChan)
rbf.spansLen++
if rbf.spansLen == rbf.cfg.BatchLength {
rbf.logger.Debug("submitting traces after batch is full", "len", rbf.spansLen)
rbf.flushEvents(spansChan)
if rbf.ticker != nil {
rbf.ticker.Reset(rbf.cfg.BatchTimeout)
}
Expand All @@ -120,27 +124,28 @@ func (rbf *ringBufForwarder[T]) readAndForward(ctx context.Context, eventsChan c
}
}

func (rbf *ringBufForwarder[T]) flushEvents(eventsChan chan<- []T) {
rbf.metrics.TracerFlush(rbf.evLen)
eventsChan <- rbf.events[:rbf.evLen]
rbf.events = make([]T, rbf.cfg.BatchLength)
rbf.evLen = 0
func (rbf *ringBufForwarder[T]) flushEvents(spansChan chan<- []request.Span) {
rbf.metrics.TracerFlush(rbf.spansLen)
spansChan <- rbf.spans[:rbf.spansLen]
rbf.spans = make([]request.Span, rbf.cfg.BatchLength)
rbf.spansLen = 0
}

func (rbf *ringBufForwarder[T]) bgFlushOnTimeout(eventsChan chan<- []T) {
func (rbf *ringBufForwarder[T]) bgFlushOnTimeout(spansChan chan<- []request.Span) {
for {
<-rbf.ticker.C
rbf.access.Lock()
if rbf.evLen > 0 {
rbf.logger.Debug("submitting traces on timeout", "len", rbf.evLen)
rbf.flushEvents(eventsChan)
if rbf.spansLen > 0 {
rbf.logger.Debug("submitting traces on timeout", "len", rbf.spansLen)
rbf.flushEvents(spansChan)
}
rbf.access.Unlock()
}
}

// bgListenContextCancelation blocks until ctx is done, logs a debug message
// and then closes eventsReader.
// NOTE(review): presumably launched as a background goroutine by
// readAndForward so that closing the reader lets its read loop exit — the
// call site is not visible here, confirm.
func (rbf *ringBufForwarder[T]) bgListenContextCancelation(ctx context.Context, eventsReader ringBufReader) {
// Wait for the context to be cancelled or otherwise finished.
<-ctx.Done()
rbf.logger.Debug("context is cancelled. Closing events reader")
// The error returned by Close is explicitly discarded.
_ = eventsReader.Close()
}

Expand Down
37 changes: 13 additions & 24 deletions pkg/internal/ebpf/common/ringbuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"golang.org/x/exp/slog"

"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/request"
"github.com/grafana/beyla/pkg/internal/testutil"
)

Expand All @@ -27,12 +28,12 @@ func TestForwardRingbuf_CapacityFull(t *testing.T) {
ringBuf, restore := replaceTestRingBuf()
defer restore()
metrics := &metricsReporter{}
forwardedMessages := make(chan []any, 100)
go ForwardRingbuf(
forwardedMessages := make(chan []request.Span, 100)
go ForwardRingbuf[HTTPRequestTrace](
&TracerConfig{BatchLength: 10},
slog.With("test", "TestForwardRingbuf_CapacityFull"),
nil, // the source ring buffer can be null
toRequestTrace,
ReadHTTPRequestTraceAsSpan,
metrics,
)(context.Background(), forwardedMessages)

Expand All @@ -46,13 +47,13 @@ func TestForwardRingbuf_CapacityFull(t *testing.T) {
batch := testutil.ReadChannel(t, forwardedMessages, testTimeout)
require.Len(t, batch, 10)
for i := range batch {
assert.Equal(t, HTTPRequestTrace{Type: 1, Method: get, ContentLength: int64(i)}, batch[i])
assert.Equal(t, request.Span{Type: 1, Method: "GET", ContentLength: int64(i)}, batch[i])
}

batch = testutil.ReadChannel(t, forwardedMessages, testTimeout)
require.Len(t, batch, 10)
for i := range batch {
assert.Equal(t, HTTPRequestTrace{Type: 1, Method: get, ContentLength: int64(10 + i)}, batch[i])
assert.Equal(t, request.Span{Type: 1, Method: "GET", ContentLength: int64(10 + i)}, batch[i])
}
// AND metrics are properly updated
assert.Equal(t, 2, metrics.flushes)
Expand All @@ -73,12 +74,12 @@ func TestForwardRingbuf_Deadline(t *testing.T) {
defer restore()

metrics := &metricsReporter{}
forwardedMessages := make(chan []any, 100)
go ForwardRingbuf(
forwardedMessages := make(chan []request.Span, 100)
go ForwardRingbuf[HTTPRequestTrace](
&TracerConfig{BatchLength: 10, BatchTimeout: 20 * time.Millisecond},
slog.With("test", "TestForwardRingbuf_Deadline"),
nil, // the source ring buffer can be null
toRequestTrace,
ReadHTTPRequestTraceAsSpan,
metrics,
)(context.Background(), forwardedMessages)

Expand All @@ -95,7 +96,7 @@ func TestForwardRingbuf_Deadline(t *testing.T) {
}
require.Len(t, batch, 7)
for i := range batch {
assert.Equal(t, HTTPRequestTrace{Type: 1, Method: get, ContentLength: int64(i)}, batch[i])
assert.Equal(t, request.Span{Type: 1, Method: "GET", ContentLength: int64(i)}, batch[i])
}

// AND metrics are properly updated
Expand All @@ -110,14 +111,14 @@ func TestForwardRingbuf_Close(t *testing.T) {

metrics := &metricsReporter{}
closable := closableObject{}
go ForwardRingbuf(
go ForwardRingbuf[HTTPRequestTrace](
&TracerConfig{BatchLength: 10},
slog.With("test", "TestForwardRingbuf_Close"),
nil, // the source ring buffer can be null
toRequestTrace,
ReadHTTPRequestTraceAsSpan,
metrics,
&closable,
)(context.Background(), make(chan []any, 100))
)(context.Background(), make(chan []request.Span, 100))

assert.False(t, ringBuf.explicitClose.Load())
assert.False(t, closable.closed)
Expand Down Expand Up @@ -189,18 +190,6 @@ func (c *closableObject) Close() error {
return nil
}

func toRequestTrace(record *ringbuf.Record) (any, error) {
var event HTTPRequestTrace

err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event)
if err != nil {
slog.Error("Error reading generic HTTP event", err)
return nil, err
}

return event, nil
}

type metricsReporter struct {
imetrics.NoopReporter
flushes int
Expand Down
Loading

0 comments on commit d341235

Please sign in to comment.