From d341235a7b3ac40289e1f71715b4b1502636eba3 Mon Sep 17 00:00:00 2001 From: Mario Macias Date: Tue, 5 Sep 2023 10:18:25 +0200 Subject: [PATCH] Fix Beyla CTRL+C shutdown. Also simplify pipeline (#256) * moved transform.HTTPRequestSpan to request.Span * Removed converter as pipe codec * reformat * Bugfix: stop beyla with simple sigint/sigterm --- pkg/beyla/beyla.go | 12 +- pkg/internal/ebpf/common/common.go | 9 +- pkg/internal/ebpf/common/ringbuf.go | 55 +++-- pkg/internal/ebpf/common/ringbuf_test.go | 37 ++- pkg/internal/ebpf/common/spanner.go | 103 +++++++++ .../common}/spanner_test.go | 85 ++----- pkg/internal/ebpf/goruntime/goruntime.go | 8 +- pkg/internal/ebpf/grpc/grpc.go | 8 +- pkg/internal/ebpf/httpfltr/httpfltr.go | 14 +- pkg/internal/ebpf/httpfltr/httpfltr_test.go | 44 ++-- .../ebpf/httpfltr/httpfltr_transform.go | 33 +++ .../ebpf/httpfltr/httpfltr_transform_test.go | 63 +++++ pkg/internal/ebpf/nethttp/nethttp.go | 15 +- pkg/internal/ebpf/tracer.go | 9 +- pkg/internal/ebpf/tracer_darwin.go | 4 +- pkg/internal/export/debug/debug.go | 10 +- pkg/internal/export/otel/metrics.go | 24 +- pkg/internal/export/otel/metrics_test.go | 6 +- pkg/internal/export/otel/traces.go | 48 ++-- pkg/internal/export/otel/traces_test.go | 10 +- pkg/internal/export/prom/prom.go | 22 +- pkg/internal/pipe/instrumenter.go | 14 +- pkg/internal/pipe/instrumenter_test.go | 181 ++++++++------- pkg/internal/request/span.go | 73 ++++++ pkg/internal/traces/reader.go | 24 +- pkg/internal/transform/k8s.go | 29 +-- pkg/internal/transform/routes.go | 13 +- pkg/internal/transform/routes_test.go | 31 +-- pkg/internal/transform/spanner.go | 218 ------------------ 29 files changed, 619 insertions(+), 583 deletions(-) create mode 100644 pkg/internal/ebpf/common/spanner.go rename pkg/internal/{transform => ebpf/common}/spanner_test.go (54%) create mode 100644 pkg/internal/ebpf/httpfltr/httpfltr_transform.go create mode 100644 pkg/internal/ebpf/httpfltr/httpfltr_transform_test.go create mode 100644 pkg/internal/request/span.go delete mode 100644 pkg/internal/transform/spanner.go diff --git a/pkg/beyla/beyla.go b/pkg/beyla/beyla.go index 9f985ba96..f99768834 100644 --- a/pkg/beyla/beyla.go +++ b/pkg/beyla/beyla.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/beyla/pkg/internal/imetrics" "github.com/grafana/beyla/pkg/internal/pipe" "github.com/grafana/beyla/pkg/internal/pipe/global" + "github.com/grafana/beyla/pkg/internal/request" ) // Config as provided by the user to configure and run Beyla @@ -33,7 +34,7 @@ type Instrumenter struct { // the ProcessTracer. // TODO: When we split beyla into two executables, probably the BPF map // should be the traces' communication mechanism instead of a native channel - tracesInput chan []any + tracesInput chan []request.Span // TODO: temporary hack. REMOVE // This will force that the pipeline is not created until we have a service name. @@ -48,7 +49,7 @@ func New(config *Config) *Instrumenter { return &Instrumenter{ config: (*pipe.Config)(config), ctxInfo: buildContextInfo((*pipe.Config)(config)), - tracesInput: make(chan []any, config.ChannelBufferLen), + tracesInput: make(chan []request.Span, config.ChannelBufferLen), TempHackWaitForServiceName: make(chan struct{}), } } @@ -85,8 +86,11 @@ func (i *Instrumenter) FindAndInstrument(ctx context.Context) error { for { select { case <-ctx.Done(): - log().Debug("stopped searching for new processes to instrument") - finder.Close() + lg := log() + lg.Debug("stopped searching for new processes to instrument") + if err := finder.Close(); err != nil { + lg.Warn("error closing process finder instance", "error", err) + } return case pt := <-foundProcesses: select { diff --git a/pkg/internal/ebpf/common/common.go b/pkg/internal/ebpf/common/common.go index 51a80d7b4..6e6824fdf 100644 --- a/pkg/internal/ebpf/common/common.go +++ b/pkg/internal/ebpf/common/common.go @@ -10,6 +10,7 @@ import ( "github.com/cilium/ebpf/ringbuf" "github.com/grafana/beyla/pkg/internal/goexec" + "github.com/grafana/beyla/pkg/internal/request" ) //go:generate $BPF2GO -cc $BPF_CLANG -cflags $BPF_CFLAGS -target bpf -type http_request_trace bpf ../../../../bpf/http_trace.c -- -I../../../../bpf/headers @@ -72,13 +73,13 @@ type Filter struct { Fd int } -func Read[T any](record *ringbuf.Record) (any, error) { - var event T +func ReadHTTPRequestTraceAsSpan(record *ringbuf.Record) (request.Span, error) { + var event HTTPRequestTrace err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event) if err != nil { - return event, err + return request.Span{}, err } - return event, nil + return HTTPRequestTraceToSpan(&event), nil } diff --git a/pkg/internal/ebpf/common/ringbuf.go b/pkg/internal/ebpf/common/ringbuf.go index 871730184..78959d77a 100644 --- a/pkg/internal/ebpf/common/ringbuf.go +++ b/pkg/internal/ebpf/common/ringbuf.go @@ -13,6 +13,7 @@ import ( "golang.org/x/exp/slog" "github.com/grafana/beyla/pkg/internal/imetrics" + "github.com/grafana/beyla/pkg/internal/request" ) // ringBufReader interface extracts the used methods from ringbuf.Reader for proper @@ -33,32 +34,35 @@ type ringBufForwarder[T any] struct { logger *slog.Logger ringbuffer *ebpf.Map closers []io.Closer - events []T - evLen int + spans []request.Span + spansLen int access sync.Mutex ticker *time.Ticker - reader func(*ringbuf.Record) (T, error) + reader func(*ringbuf.Record) (request.Span, error) metrics imetrics.Reporter } // ForwardRingbuf returns a function reads HTTPRequestTraces from an input ring buffer, accumulates them into an -// internal buffer, and forwards them to an output events channel, previously converted to transform.HTTPRequestSpan -// instances +// internal buffer, and forwards them to an output events channel, previously converted to request.Span +// instances. +// Despite it returns a StartFuncCtx, this is not used inside the Pipes' library but it's invoked +// directly in the code as a simple function. func ForwardRingbuf[T any]( cfg *TracerConfig, logger *slog.Logger, ringbuffer *ebpf.Map, - reader func(*ringbuf.Record) (T, error), + reader func(*ringbuf.Record) (request.Span, error), metrics imetrics.Reporter, closers ...io.Closer, -) node.StartFuncCtx[[]T] { +) node.StartFuncCtx[[]request.Span] { rbf := ringBufForwarder[T]{ cfg: cfg, logger: logger, ringbuffer: ringbuffer, closers: closers, reader: reader, metrics: metrics, } return rbf.readAndForward } -func (rbf *ringBufForwarder[T]) readAndForward(ctx context.Context, eventsChan chan<- []T) { +func (rbf *ringBufForwarder[T]) readAndForward(ctx context.Context, spansChan chan<- []request.Span) { + rbf.logger.Debug("start reading and forwarding") // BPF will send each measured trace via Ring Buffer, so we listen for them from the // user space. eventsReader, err := readerFactory(rbf.ringbuffer) @@ -69,8 +73,8 @@ func (rbf *ringBufForwarder[T]) readAndForward(ctx context.Context, eventsChan c rbf.closers = append(rbf.closers, eventsReader) defer rbf.closeAllResources() - rbf.events = make([]T, rbf.cfg.BatchLength) - rbf.evLen = 0 + rbf.spans = make([]request.Span, rbf.cfg.BatchLength) + rbf.spansLen = 0 // If the underlying context is closed, it closes the events reader // so the function can exit. @@ -79,7 +83,7 @@ func (rbf *ringBufForwarder[T]) readAndForward(ctx context.Context, eventsChan c // Forwards periodically on timeout, if the batch is not full if rbf.cfg.BatchTimeout > 0 { rbf.ticker = time.NewTicker(rbf.cfg.BatchTimeout) - go rbf.bgFlushOnTimeout(eventsChan) + go rbf.bgFlushOnTimeout(spansChan) } // Main loop: @@ -102,16 +106,16 @@ func (rbf *ringBufForwarder[T]) readAndForward(ctx context.Context, eventsChan c } rbf.access.Lock() - rbf.events[rbf.evLen], err = rbf.reader(&record) + rbf.spans[rbf.spansLen], err = rbf.reader(&record) if err != nil { rbf.logger.Error("error parsing perf event", err) rbf.access.Unlock() continue } - rbf.evLen++ - if rbf.evLen == rbf.cfg.BatchLength { - rbf.logger.Debug("submitting traces after batch is full", "len", rbf.evLen) - rbf.flushEvents(eventsChan) + rbf.spansLen++ + if rbf.spansLen == rbf.cfg.BatchLength { + rbf.logger.Debug("submitting traces after batch is full", "len", rbf.spansLen) + rbf.flushEvents(spansChan) if rbf.ticker != nil { rbf.ticker.Reset(rbf.cfg.BatchTimeout) } @@ -120,20 +124,20 @@ func (rbf *ringBufForwarder[T]) readAndForward(ctx context.Context, eventsChan c } } -func (rbf *ringBufForwarder[T]) flushEvents(eventsChan chan<- []T) { - rbf.metrics.TracerFlush(rbf.evLen) - eventsChan <- rbf.events[:rbf.evLen] - rbf.events = make([]T, rbf.cfg.BatchLength) - rbf.evLen = 0 +func (rbf *ringBufForwarder[T]) flushEvents(spansChan chan<- []request.Span) { + rbf.metrics.TracerFlush(rbf.spansLen) + spansChan <- rbf.spans[:rbf.spansLen] + rbf.spans = make([]request.Span, rbf.cfg.BatchLength) + rbf.spansLen = 0 } -func (rbf *ringBufForwarder[T]) bgFlushOnTimeout(eventsChan chan<- []T) { +func (rbf *ringBufForwarder[T]) bgFlushOnTimeout(spansChan chan<- []request.Span) { for { <-rbf.ticker.C rbf.access.Lock() - if rbf.evLen > 0 { - rbf.logger.Debug("submitting traces on timeout", "len", rbf.evLen) - rbf.flushEvents(eventsChan) + if rbf.spansLen > 0 { + rbf.logger.Debug("submitting traces on timeout", "len", rbf.spansLen) + rbf.flushEvents(spansChan) } rbf.access.Unlock() } @@ -141,6 +145,7 @@ func (rbf *ringBufForwarder[T]) bgFlushOnTimeout(eventsChan chan<- []T) { func (rbf *ringBufForwarder[T]) bgListenContextCancelation(ctx context.Context, eventsReader ringBufReader) { <-ctx.Done() + rbf.logger.Debug("context is cancelled. Closing events reader") _ = eventsReader.Close() } diff --git a/pkg/internal/ebpf/common/ringbuf_test.go b/pkg/internal/ebpf/common/ringbuf_test.go index 0d3d3752a..cd8bf7c91 100644 --- a/pkg/internal/ebpf/common/ringbuf_test.go +++ b/pkg/internal/ebpf/common/ringbuf_test.go @@ -17,6 +17,7 @@ import ( "golang.org/x/exp/slog" "github.com/grafana/beyla/pkg/internal/imetrics" + "github.com/grafana/beyla/pkg/internal/request" "github.com/grafana/beyla/pkg/internal/testutil" ) @@ -27,12 +28,12 @@ func TestForwardRingbuf_CapacityFull(t *testing.T) { ringBuf, restore := replaceTestRingBuf() defer restore() metrics := &metricsReporter{} - forwardedMessages := make(chan []any, 100) - go ForwardRingbuf( + forwardedMessages := make(chan []request.Span, 100) + go ForwardRingbuf[HTTPRequestTrace]( &TracerConfig{BatchLength: 10}, slog.With("test", "TestForwardRingbuf_CapacityFull"), nil, // the source ring buffer can be null - toRequestTrace, + ReadHTTPRequestTraceAsSpan, metrics, )(context.Background(), forwardedMessages) @@ -46,13 +47,13 @@ func TestForwardRingbuf_CapacityFull(t *testing.T) { batch := testutil.ReadChannel(t, forwardedMessages, testTimeout) require.Len(t, batch, 10) for i := range batch { - assert.Equal(t, HTTPRequestTrace{Type: 1, Method: get, ContentLength: int64(i)}, batch[i]) + assert.Equal(t, request.Span{Type: 1, Method: "GET", ContentLength: int64(i)}, batch[i]) } batch = testutil.ReadChannel(t, forwardedMessages, testTimeout) require.Len(t, batch, 10) for i := range batch { - assert.Equal(t, HTTPRequestTrace{Type: 1, Method: get, ContentLength: int64(10 + i)}, batch[i]) + assert.Equal(t, request.Span{Type: 1, Method: "GET", ContentLength: int64(10 + i)}, batch[i]) } // AND metrics are properly updated assert.Equal(t, 2, metrics.flushes) @@ -73,12 +74,12 @@ func TestForwardRingbuf_Deadline(t *testing.T) { defer restore() metrics := &metricsReporter{} - forwardedMessages := make(chan []any, 100) - go ForwardRingbuf( + forwardedMessages := make(chan []request.Span, 100) + go ForwardRingbuf[HTTPRequestTrace]( &TracerConfig{BatchLength: 10, BatchTimeout: 20 * time.Millisecond}, slog.With("test", "TestForwardRingbuf_Deadline"), nil, // the source ring buffer can be null - toRequestTrace, + ReadHTTPRequestTraceAsSpan, metrics, )(context.Background(), forwardedMessages) @@ -95,7 +96,7 @@ func TestForwardRingbuf_Deadline(t *testing.T) { } require.Len(t, batch, 7) for i := range batch { - assert.Equal(t, HTTPRequestTrace{Type: 1, Method: get, ContentLength: int64(i)}, batch[i]) + assert.Equal(t, request.Span{Type: 1, Method: "GET", ContentLength: int64(i)}, batch[i]) } // AND metrics are properly updated @@ -110,14 +111,14 @@ func TestForwardRingbuf_Close(t *testing.T) { metrics := &metricsReporter{} closable := closableObject{} - go ForwardRingbuf( + go ForwardRingbuf[HTTPRequestTrace]( &TracerConfig{BatchLength: 10}, slog.With("test", "TestForwardRingbuf_Close"), nil, // the source ring buffer can be null - toRequestTrace, + ReadHTTPRequestTraceAsSpan, metrics, &closable, - )(context.Background(), make(chan []any, 100)) + )(context.Background(), make(chan []request.Span, 100)) assert.False(t, ringBuf.explicitClose.Load()) assert.False(t, closable.closed) @@ -189,18 +190,6 @@ func (c *closableObject) Close() error { return nil } -func toRequestTrace(record *ringbuf.Record) (any, error) { - var event HTTPRequestTrace - - err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event) - if err != nil { - slog.Error("Error reading generic HTTP event", err) - return nil, err - } - - return event, nil -} - type metricsReporter struct { imetrics.NoopReporter flushes int diff --git a/pkg/internal/ebpf/common/spanner.go b/pkg/internal/ebpf/common/spanner.go new file mode 100644 index 000000000..485409cdd --- /dev/null +++ b/pkg/internal/ebpf/common/spanner.go @@ -0,0 +1,103 @@ +package ebpfcommon + +import ( + "bytes" + "net" + "strconv" + + "golang.org/x/exp/slog" + + "github.com/grafana/beyla/pkg/internal/request" +) + +var log = slog.With("component", "goexec.spanner") + +func HTTPRequestTraceToSpan(trace *HTTPRequestTrace) request.Span { + // From C, assuming 0-ended strings + methodLen := bytes.IndexByte(trace.Method[:], 0) + if methodLen < 0 { + methodLen = len(trace.Method) + } + pathLen := bytes.IndexByte(trace.Path[:], 0) + if pathLen < 0 { + pathLen = len(trace.Path) + } + + peer := "" + hostname := "" + hostPort := 0 + traceID := "" + + switch request.EventType(trace.Type) { + case request.EventTypeHTTPClient, request.EventTypeHTTP: + peer, _ = extractHostPort(trace.RemoteAddr[:]) + hostname, hostPort = extractHostPort(trace.Host[:]) + traceID = extractTraceID(trace.Traceparent) + case request.EventTypeGRPC: + hostPort = int(trace.HostPort) + peer = extractIP(trace.RemoteAddr[:], int(trace.RemoteAddrLen)) + hostname = extractIP(trace.Host[:], int(trace.HostLen)) + case request.EventTypeGRPCClient: + hostname, hostPort = extractHostPort(trace.Host[:]) + default: + log.Warn("unknown trace type %d", trace.Type) + } + + return request.Span{ + Type: request.EventType(trace.Type), + ID: trace.Id, + Method: string(trace.Method[:methodLen]), + Path: string(trace.Path[:pathLen]), + Peer: peer, + Host: hostname, + HostPort: hostPort, + ContentLength: trace.ContentLength, + RequestStart: int64(trace.GoStartMonotimeNs), + Start: int64(trace.StartMonotimeNs), + End: int64(trace.EndMonotimeNs), + Status: int(trace.Status), + TraceID: traceID, + } +} + +func extractHostPort(b []uint8) (string, int) { + addrLen := bytes.IndexByte(b, 0) + if addrLen < 0 { + addrLen = len(b) + } + + peer := "" + peerPort := 0 + + if addrLen > 0 { + addr := string(b[:addrLen]) + ip, port, err := net.SplitHostPort(addr) + if err != nil { + peer = addr + } else { + peer = ip + peerPort, _ = strconv.Atoi(port) + } + } + + return peer, peerPort +} + +func extractIP(b []uint8, size int) string { + if size > len(b) { + size = len(b) + } + return net.IP(b[:size]).String() +} + +func extractTraceID(traceparent [55]byte) string { + // If traceparent was not set in eBPF, entire field should be zeroed bytes. + if traceparent[0] == 0 { + return "" + } + + // It is assumed that eBPF code has already verified the length is exactly 55 + // See https://www.w3.org/TR/trace-context/#traceparent-header-field-values for format. + // 2 hex version + dash + 32 hex traceID + dash + 16 hex parent + dash + 2 hex flags + return string(traceparent[3:35]) +} diff --git a/pkg/internal/transform/spanner_test.go b/pkg/internal/ebpf/common/spanner_test.go similarity index 54% rename from pkg/internal/transform/spanner_test.go rename to pkg/internal/ebpf/common/spanner_test.go index 2b7e78825..344d7bd0c 100644 --- a/pkg/internal/transform/spanner_test.go +++ b/pkg/internal/ebpf/common/spanner_test.go @@ -1,12 +1,11 @@ -package transform +package ebpfcommon import ( "testing" "github.com/stretchr/testify/assert" - ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" - "github.com/grafana/beyla/pkg/internal/ebpf/httpfltr" + "github.com/grafana/beyla/pkg/internal/request" ) func cstr(s string) []byte { @@ -14,7 +13,7 @@ func cstr(s string) []byte { return append(b, 0) } -func makeHTTPRequestTrace(method, path, peerInfo string, status uint16, durationMs uint64) ebpfcommon.HTTPRequestTrace { +func makeHTTPRequestTrace(method, path, peerInfo string, status uint16, durationMs uint64) HTTPRequestTrace { m := [6]uint8{} copy(m[:], cstr(method)[:]) p := [100]uint8{} @@ -22,7 +21,7 @@ func makeHTTPRequestTrace(method, path, peerInfo string, status uint16, duration r := [50]uint8{} copy(r[:], cstr(peerInfo)[:]) - return ebpfcommon.HTTPRequestTrace{ + return HTTPRequestTrace{ Type: 1, // transform.EventTypeHTTP Method: m, Path: p, @@ -34,13 +33,13 @@ func makeHTTPRequestTrace(method, path, peerInfo string, status uint16, duration } } -func makeGRPCRequestTrace(path string, peerInfo []byte, status uint16, durationMs uint64) ebpfcommon.HTTPRequestTrace { +func makeGRPCRequestTrace(path string, peerInfo []byte, status uint16, durationMs uint64) HTTPRequestTrace { p := [100]uint8{} copy(p[:], cstr(path)[:]) r := [50]uint8{} copy(r[:], peerInfo[:]) - return ebpfcommon.HTTPRequestTrace{ + return HTTPRequestTrace{ Type: 2, // transform.EventTypeGRPC Path: p, RemoteAddr: r, @@ -52,7 +51,7 @@ func makeGRPCRequestTrace(path string, peerInfo []byte, status uint16, durationM } } -func assertMatches(t *testing.T, span *HTTPRequestSpan, method, path, peer string, status int, durationMs uint64) { +func assertMatches(t *testing.T, span *request.Span, method, path, peer string, status int, durationMs uint64) { assert.Equal(t, method, span.Method) assert.Equal(t, path, span.Path) assert.Equal(t, peer, span.Peer) @@ -64,37 +63,37 @@ func assertMatches(t *testing.T, span *HTTPRequestSpan, method, path, peer strin func TestRequestTraceParsing(t *testing.T) { t.Run("Test basic parsing", func(t *testing.T) { tr := makeHTTPRequestTrace("POST", "/users", "127.0.0.1:1234", 200, 5) - s := convertFromHTTPTrace(&tr) + s := HTTPRequestTraceToSpan(&tr) assertMatches(t, &s, "POST", "/users", "127.0.0.1", 200, 5) }) t.Run("Test with empty path and missing peer host", func(t *testing.T) { tr := makeHTTPRequestTrace("GET", "", ":1234", 403, 6) - s := convertFromHTTPTrace(&tr) + s := HTTPRequestTraceToSpan(&tr) assertMatches(t, &s, "GET", "", "", 403, 6) }) t.Run("Test with missing peer port", func(t *testing.T) { tr := makeHTTPRequestTrace("GET", "/posts/1/1", "1234", 500, 1) - s := convertFromHTTPTrace(&tr) + s := HTTPRequestTraceToSpan(&tr) assertMatches(t, &s, "GET", "/posts/1/1", "1234", 500, 1) }) t.Run("Test with invalid peer port", func(t *testing.T) { tr := makeHTTPRequestTrace("GET", "/posts/1/1", "1234:aaa", 500, 1) - s := convertFromHTTPTrace(&tr) + s := HTTPRequestTraceToSpan(&tr) assertMatches(t, &s, "GET", "/posts/1/1", "1234", 500, 1) }) t.Run("Test with GRPC request", func(t *testing.T) { tr := makeGRPCRequestTrace("/posts/1/1", []byte{0x7f, 0, 0, 0x1}, 2, 1) - s := convertFromHTTPTrace(&tr) + s := HTTPRequestTraceToSpan(&tr) assertMatches(t, &s, "", "/posts/1/1", "127.0.0.1", 2, 1) }) } -func makeSpanWithTimings(goStart, start, end uint64) HTTPRequestSpan { - tr := ebpfcommon.HTTPRequestTrace{ +func makeSpanWithTimings(goStart, start, end uint64) request.Span { + tr := HTTPRequestTrace{ Type: 1, Path: [100]uint8{}, RemoteAddr: [50]uint8{}, @@ -105,7 +104,7 @@ func makeSpanWithTimings(goStart, start, end uint64) HTTPRequestSpan { EndMonotimeNs: end, } - return convertFromHTTPTrace(&tr) + return HTTPRequestTraceToSpan(&tr) } func TestSpanNesting(t *testing.T) { @@ -125,57 +124,3 @@ func TestSpanNesting(t *testing.T) { b = makeSpanWithTimings(10000, 30000, 30000) assert.False(t, (&a).Inside(&b)) } - -func makeHTTPInfo(method, path, peer, host, comm string, peerPort, hostPort uint32, status uint16, durationMs uint64) httpfltr.HTTPInfo { - bpfInfo := httpfltr.BPFHTTPInfo{ - Type: 1, - Status: status, - StartMonotimeNs: durationMs * 1000000, - EndMonotimeNs: durationMs * 2 * 1000000, - } - i := httpfltr.HTTPInfo{ - BPFHTTPInfo: bpfInfo, - Method: method, - Peer: peer, - URL: path, - Host: host, - Comm: comm, - } - - i.ConnInfo.D_port = uint16(hostPort) - i.ConnInfo.S_port = uint16(peerPort) - - return i -} - -func TestHTTPInfoParsing(t *testing.T) { - t.Run("Test basic parsing", func(t *testing.T) { - tr := makeHTTPInfo("POST", "/users", "127.0.0.1", "127.0.0.2", "curl", 12345, 8080, 200, 5) - s := convertFromHTTPInfo(&tr) - assertMatchesInfo(t, &s, "POST", "/users", "127.0.0.1", "127.0.0.2", "curl", 8080, 200, 5) - }) - - t.Run("Test empty URL", func(t *testing.T) { - tr := makeHTTPInfo("POST", "", "127.0.0.1", "127.0.0.2", "curl", 12345, 8080, 200, 5) - s := convertFromHTTPInfo(&tr) - assertMatchesInfo(t, &s, "POST", "", "127.0.0.1", "127.0.0.2", "curl", 8080, 200, 5) - }) - - t.Run("Test parsing with URL parameters", func(t *testing.T) { - tr := makeHTTPInfo("POST", "/users?query=1234", "127.0.0.1", "127.0.0.2", "curl", 12345, 8080, 200, 5) - s := convertFromHTTPInfo(&tr) - assertMatchesInfo(t, &s, "POST", "/users", "127.0.0.1", "127.0.0.2", "curl", 8080, 200, 5) - }) -} - -func assertMatchesInfo(t *testing.T, span *HTTPRequestSpan, method, path, peer, host, comm string, hostPort int, status int, durationMs uint64) { - assert.Equal(t, method, span.Method) - assert.Equal(t, path, span.Path) - assert.Equal(t, host, span.Host) - assert.Equal(t, hostPort, span.HostPort) - assert.Equal(t, peer, span.Peer) - assert.Equal(t, status, span.Status) - assert.Equal(t, comm, span.ServiceName) - assert.Equal(t, int64(durationMs*1000000), int64(span.End-span.Start)) - assert.Equal(t, int64(durationMs*1000000), int64(span.End-span.RequestStart)) -} diff --git a/pkg/internal/ebpf/goruntime/goruntime.go b/pkg/internal/ebpf/goruntime/goruntime.go index 85848a683..66705212e 100644 --- a/pkg/internal/ebpf/goruntime/goruntime.go +++ b/pkg/internal/ebpf/goruntime/goruntime.go @@ -23,6 +23,7 @@ import ( "github.com/grafana/beyla/pkg/internal/exec" "github.com/grafana/beyla/pkg/internal/goexec" "github.com/grafana/beyla/pkg/internal/imetrics" + "github.com/grafana/beyla/pkg/internal/request" ) //go:generate $BPF2GO -cc $BPF_CLANG -cflags $BPF_CFLAGS -target amd64,arm64 bpf ../../../../bpf/go_runtime.c -- -I../../../../bpf/headers @@ -79,10 +80,11 @@ func (p *Tracer) SocketFilters() []*ebpf.Program { return nil } -func (p *Tracer) Run(ctx context.Context, eventsChan chan<- []any) { +func (p *Tracer) Run(ctx context.Context, eventsChan chan<- []request.Span) { logger := slog.With("component", "goruntime.Tracer") - ebpfcommon.ForwardRingbuf( - p.Cfg, logger, p.bpfObjects.Events, ebpfcommon.Read[ebpfcommon.HTTPRequestTrace], + ebpfcommon.ForwardRingbuf[ebpfcommon.HTTPRequestTrace]( + p.Cfg, logger, p.bpfObjects.Events, + ebpfcommon.ReadHTTPRequestTraceAsSpan, p.Metrics, append(p.closers, &p.bpfObjects)..., )(ctx, eventsChan) diff --git a/pkg/internal/ebpf/grpc/grpc.go b/pkg/internal/ebpf/grpc/grpc.go index 67145a0ee..692220d80 100644 --- a/pkg/internal/ebpf/grpc/grpc.go +++ b/pkg/internal/ebpf/grpc/grpc.go @@ -26,6 +26,7 @@ import ( "github.com/grafana/beyla/pkg/internal/exec" "github.com/grafana/beyla/pkg/internal/goexec" "github.com/grafana/beyla/pkg/internal/imetrics" + "github.com/grafana/beyla/pkg/internal/request" ) //go:generate $BPF2GO -cc $BPF_CLANG -cflags $BPF_CFLAGS -target amd64,arm64 bpf ../../../../bpf/go_grpc.c -- -I../../../../bpf/headers @@ -108,10 +109,11 @@ func (p *Tracer) SocketFilters() []*ebpf.Program { return nil } -func (p *Tracer) Run(ctx context.Context, eventsChan chan<- []any) { +func (p *Tracer) Run(ctx context.Context, eventsChan chan<- []request.Span) { logger := slog.With("component", "grpc.Tracer") - ebpfcommon.ForwardRingbuf( - p.Cfg, logger, p.bpfObjects.Events, ebpfcommon.Read[ebpfcommon.HTTPRequestTrace], + ebpfcommon.ForwardRingbuf[ebpfcommon.HTTPRequestTrace]( + p.Cfg, logger, p.bpfObjects.Events, + ebpfcommon.ReadHTTPRequestTraceAsSpan, p.Metrics, append(p.closers, &p.bpfObjects)..., )(ctx, eventsChan) diff --git a/pkg/internal/ebpf/httpfltr/httpfltr.go b/pkg/internal/ebpf/httpfltr/httpfltr.go index c111afb0d..dbfb44763 100644 --- a/pkg/internal/ebpf/httpfltr/httpfltr.go +++ b/pkg/internal/ebpf/httpfltr/httpfltr.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/beyla/pkg/internal/exec" "github.com/grafana/beyla/pkg/internal/goexec" "github.com/grafana/beyla/pkg/internal/imetrics" + "github.com/grafana/beyla/pkg/internal/request" ) //go:generate $BPF2GO -cc $BPF_CLANG -cflags $BPF_CFLAGS -target amd64,arm64 bpf ../../../../bpf/http_sock.c -- -I../../../../bpf/headers @@ -200,21 +201,22 @@ func (p *Tracer) SocketFilters() []*ebpf.Program { return []*ebpf.Program{p.bpfObjects.SocketHttpFilter} } -func (p *Tracer) Run(ctx context.Context, eventsChan chan<- []any) { - ebpfcommon.ForwardRingbuf( - p.Cfg, logger(), p.bpfObjects.Events, p.toRequestTrace, +func (p *Tracer) Run(ctx context.Context, eventsChan chan<- []request.Span) { + ebpfcommon.ForwardRingbuf[HTTPInfo]( + p.Cfg, logger(), p.bpfObjects.Events, + p.readHTTPInfoIntoSpan, p.Metrics, append(p.closers, &p.bpfObjects)..., )(ctx, eventsChan) } -func (p *Tracer) toRequestTrace(record *ringbuf.Record) (any, error) { +func (p *Tracer) readHTTPInfoIntoSpan(record *ringbuf.Record) (request.Span, error) { var event BPFHTTPInfo var result HTTPInfo err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event) if err != nil { - return result, err + return request.Span{}, err } result = HTTPInfo{BPFHTTPInfo: event} @@ -239,7 +241,7 @@ func (p *Tracer) toRequestTrace(record *ringbuf.Record) (any, error) { result.Comm = p.serviceName(event.Pid) } - return result, nil + return httpInfoToSpan(&result), nil } func (event *BPFHTTPInfo) url() string { diff --git a/pkg/internal/ebpf/httpfltr/httpfltr_test.go b/pkg/internal/ebpf/httpfltr/httpfltr_test.go index 9e26da59a..0e37ba7b9 100644 --- a/pkg/internal/ebpf/httpfltr/httpfltr_test.go +++ b/pkg/internal/ebpf/httpfltr/httpfltr_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" + "github.com/grafana/beyla/pkg/internal/request" ) const bufSize = 160 @@ -93,22 +94,27 @@ func TestToRequestTrace(t *testing.T) { record.ConnInfo.D_port = 1 record.ConnInfo.S_addr = [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, 192, 168, 0, 1} record.ConnInfo.D_addr = [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, 8, 8, 8, 8} - copy(record.Buf[:], []byte("GET /hello HTTP/1.1\r\nHost: example.com\r\n\r\n")) + copy(record.Buf[:], "GET /hello HTTP/1.1\r\nHost: example.com\r\n\r\n") buf := new(bytes.Buffer) err := binary.Write(buf, binary.LittleEndian, &record) assert.NoError(t, err) tracer := Tracer{Cfg: &ebpfcommon.TracerConfig{}} - result, err := tracer.toRequestTrace(&ringbuf.Record{RawSample: buf.Bytes()}) + result, err := tracer.readHTTPInfoIntoSpan(&ringbuf.Record{RawSample: buf.Bytes()}) assert.NoError(t, err) - expected := HTTPInfo{ - BPFHTTPInfo: record, - Host: "8.8.8.8", - Peer: "192.168.0.1", - URL: "/hello", - Method: "GET", + expected := request.Span{ + Host: "8.8.8.8", + Peer: "192.168.0.1", + Path: "/hello", + Method: "GET", + Status: 200, + Type: request.EventTypeHTTP, + RequestStart: 123456, + Start: 123456, + End: 789012, + HostPort: 1, } assert.Equal(t, expected, result) } @@ -121,24 +127,28 @@ func TestToRequestTraceNoConnection(t *testing.T) { record.Status = 200 record.ConnInfo.S_addr = [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, 192, 168, 0, 1} record.ConnInfo.D_addr = [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, 8, 8, 8, 8} - copy(record.Buf[:], []byte("GET /hello HTTP/1.1\r\nHost: localhost:7033\r\n\r\nUser-Agent: curl/7.81.0\r\nAccept: */*\r\n")) + copy(record.Buf[:], "GET /hello HTTP/1.1\r\nHost: localhost:7033\r\n\r\nUser-Agent: curl/7.81.0\r\nAccept: */*\r\n") buf := new(bytes.Buffer) err := binary.Write(buf, binary.LittleEndian, &record) assert.NoError(t, err) tracer := Tracer{Cfg: &ebpfcommon.TracerConfig{}} - result, err := tracer.toRequestTrace(&ringbuf.Record{RawSample: buf.Bytes()}) + result, err := tracer.readHTTPInfoIntoSpan(&ringbuf.Record{RawSample: buf.Bytes()}) assert.NoError(t, err) // change the expected port just before testing - record.ConnInfo.D_port = 7033 - expected := HTTPInfo{ - BPFHTTPInfo: record, - Host: "localhost", - Peer: "", - URL: "/hello", - Method: "GET", + expected := request.Span{ + Host: "localhost", + Peer: "", + Path: "/hello", + Method: "GET", + Type: request.EventTypeHTTP, + Start: 123456, + RequestStart: 123456, + End: 789012, + Status: 200, + HostPort: 7033, } assert.Equal(t, expected, result) } diff --git a/pkg/internal/ebpf/httpfltr/httpfltr_transform.go b/pkg/internal/ebpf/httpfltr/httpfltr_transform.go new file mode 100644 index 000000000..306644414 --- /dev/null +++ b/pkg/internal/ebpf/httpfltr/httpfltr_transform.go @@ -0,0 +1,33 @@ +package httpfltr + +import ( + "strings" + + "github.com/grafana/beyla/pkg/internal/request" +) + +func httpInfoToSpan(info *HTTPInfo) request.Span { + return request.Span{ + Type: request.EventType(info.Type), + ID: 0, + Method: info.Method, + Path: removeQuery(info.URL), + Peer: info.Peer, + Host: info.Host, + HostPort: int(info.ConnInfo.D_port), + ContentLength: int64(info.Len), + RequestStart: int64(info.StartMonotimeNs), + Start: int64(info.StartMonotimeNs), + End: int64(info.EndMonotimeNs), + Status: int(info.Status), + ServiceName: info.Comm, + } +} + +func removeQuery(url string) string { + idx := strings.IndexByte(url, '?') + if idx > 0 { + return url[:idx] + } + return url +} diff --git a/pkg/internal/ebpf/httpfltr/httpfltr_transform_test.go b/pkg/internal/ebpf/httpfltr/httpfltr_transform_test.go new file mode 100644 index 000000000..18e058c1b --- /dev/null +++ b/pkg/internal/ebpf/httpfltr/httpfltr_transform_test.go @@ -0,0 +1,63 @@ +package httpfltr + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/grafana/beyla/pkg/internal/request" +) + +func TestHTTPInfoParsing(t *testing.T) { + t.Run("Test basic parsing", func(t *testing.T) { + tr := makeHTTPInfo("POST", "/users", "127.0.0.1", "127.0.0.2", "curl", 12345, 8080, 200, 5) + s := httpInfoToSpan(&tr) + assertMatchesInfo(t, &s, "POST", "/users", "127.0.0.1", "127.0.0.2", "curl", 8080, 200, 5) + }) + + t.Run("Test empty URL", func(t *testing.T) { + tr := makeHTTPInfo("POST", "", "127.0.0.1", "127.0.0.2", "curl", 12345, 8080, 200, 5) + s := httpInfoToSpan(&tr) + assertMatchesInfo(t, &s, "POST", "", "127.0.0.1", "127.0.0.2", "curl", 8080, 200, 5) + }) + + t.Run("Test parsing with URL parameters", func(t *testing.T) { + tr := makeHTTPInfo("POST", "/users?query=1234", "127.0.0.1", "127.0.0.2", "curl", 12345, 8080, 200, 5) + s := httpInfoToSpan(&tr) + assertMatchesInfo(t, &s, "POST", "/users", "127.0.0.1", "127.0.0.2", "curl", 8080, 200, 5) + }) +} + +func makeHTTPInfo(method, path, peer, host, comm string, peerPort, hostPort uint32, status uint16, durationMs uint64) HTTPInfo { + bpfInfo := BPFHTTPInfo{ + Type: 1, + Status: status, + StartMonotimeNs: durationMs * 1000000, + EndMonotimeNs: durationMs * 2 * 1000000, + } + i := HTTPInfo{ + BPFHTTPInfo: bpfInfo, + Method: method, + Peer: peer, + URL: path, + Host: host, + Comm: comm, + } + + i.ConnInfo.D_port = uint16(hostPort) + i.ConnInfo.S_port = uint16(peerPort) + + return i +} + +func assertMatchesInfo(t *testing.T, span *request.Span, method, path, peer, host, comm string, hostPort int, status int, durationMs uint64) { + assert.Equal(t, method, span.Method) + assert.Equal(t, path, span.Path) + assert.Equal(t, host, span.Host) + assert.Equal(t, hostPort, span.HostPort) + assert.Equal(t, peer, span.Peer) + assert.Equal(t, status, span.Status) + assert.Equal(t, comm, span.ServiceName) + assert.Equal(t, int64(durationMs*1000000), int64(span.End-span.Start)) + assert.Equal(t, int64(durationMs*1000000), int64(span.End-span.RequestStart)) +} diff --git a/pkg/internal/ebpf/nethttp/nethttp.go b/pkg/internal/ebpf/nethttp/nethttp.go index 540ba3ec5..d6330cff0 100644 --- a/pkg/internal/ebpf/nethttp/nethttp.go +++ b/pkg/internal/ebpf/nethttp/nethttp.go @@ -26,6 +26,7 @@ import ( "github.com/grafana/beyla/pkg/internal/exec" "github.com/grafana/beyla/pkg/internal/goexec" "github.com/grafana/beyla/pkg/internal/imetrics" + "github.com/grafana/beyla/pkg/internal/request" ) //go:generate $BPF2GO -cc $BPF_CLANG -cflags $BPF_CFLAGS -target amd64,arm64 bpf ../../../../bpf/go_nethttp.c -- -I../../../../bpf/headers @@ -107,10 +108,11 @@ func (p *Tracer) SocketFilters() []*ebpf.Program { return nil } -func (p *Tracer) Run(ctx context.Context, eventsChan chan<- []any) { +func (p *Tracer) Run(ctx context.Context, eventsChan chan<- []request.Span) { logger := slog.With("component", "nethttp.Tracer") - ebpfcommon.ForwardRingbuf( - p.Cfg, logger, p.bpfObjects.Events, ebpfcommon.Read[ebpfcommon.HTTPRequestTrace], + ebpfcommon.ForwardRingbuf[ebpfcommon.HTTPRequestTrace]( + p.Cfg, logger, p.bpfObjects.Events, + ebpfcommon.ReadHTTPRequestTraceAsSpan, p.Metrics, append(p.closers, &p.bpfObjects)..., )(ctx, eventsChan) @@ -133,10 +135,11 @@ func (p *GinTracer) GoProbes() map[string]ebpfcommon.FunctionPrograms { } } -func (p *GinTracer) Run(ctx context.Context, eventsChan chan<- []any) { +func (p *GinTracer) Run(ctx context.Context, eventsChan chan<- []request.Span) { logger := slog.With("component", "nethttp.GinTracer") - ebpfcommon.ForwardRingbuf( - p.Cfg, logger, p.bpfObjects.Events, ebpfcommon.Read[ebpfcommon.HTTPRequestTrace], + ebpfcommon.ForwardRingbuf[ebpfcommon.HTTPRequestTrace]( + p.Cfg, logger, p.bpfObjects.Events, + ebpfcommon.ReadHTTPRequestTraceAsSpan, p.Metrics, append(p.closers, &p.bpfObjects)..., )(ctx, eventsChan) diff --git a/pkg/internal/ebpf/tracer.go b/pkg/internal/ebpf/tracer.go index 32759fe20..ffd0d3470 100644 --- a/pkg/internal/ebpf/tracer.go +++ b/pkg/internal/ebpf/tracer.go @@ -19,6 +19,7 @@ import ( ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" "github.com/grafana/beyla/pkg/internal/exec" "github.com/grafana/beyla/pkg/internal/goexec" + "github.com/grafana/beyla/pkg/internal/request" ) // Tracer is an individual eBPF program (e.g. the net/http or the grpc tracers) @@ -45,7 +46,7 @@ type Tracer interface { SocketFilters() []*ebpf.Program // Run will do the action of listening for eBPF traces and forward them // periodically to the output channel. - Run(context.Context, chan<- []any) + Run(context.Context, chan<- []request.Span) // AddCloser adds io.Closer instances that need to be invoked when the // Run function ends. AddCloser(c ...io.Closer) @@ -62,7 +63,7 @@ type ProcessTracer struct { pinPath string } -func (pt *ProcessTracer) Run(ctx context.Context, out chan<- []any) { +func (pt *ProcessTracer) Run(ctx context.Context, out chan<- []request.Span) { var log = logger() // Searches for traceable functions @@ -78,11 +79,11 @@ func (pt *ProcessTracer) Run(ctx context.Context, out chan<- []any) { } // tracerFunctions returns a tracing function for each discovered eBPF traceable source: GRPC, HTTP... -func (pt *ProcessTracer) tracerFunctions() ([]func(context.Context, chan<- []any), error) { +func (pt *ProcessTracer) tracerFunctions() ([]func(context.Context, chan<- []request.Span), error) { var log = logger() // tracerFuncs contains the eBPF programs (HTTP, GRPC tracers...) - var tracerFuncs []func(context.Context, chan<- []any) + var tracerFuncs []func(context.Context, chan<- []request.Span) for _, p := range pt.programs { plog := log.With("program", reflect.TypeOf(p)) diff --git a/pkg/internal/ebpf/tracer_darwin.go b/pkg/internal/ebpf/tracer_darwin.go index 69488da51..992b0cea7 100644 --- a/pkg/internal/ebpf/tracer_darwin.go +++ b/pkg/internal/ebpf/tracer_darwin.go @@ -2,9 +2,11 @@ package ebpf import ( "context" + + "github.com/grafana/beyla/pkg/internal/request" ) // dummy functions and types to avoid compilation errors in Darwin. The tracer component is only usable in Linux. type ProcessTracer struct{} -func (pt *ProcessTracer) Run(_ context.Context, _ chan<- []any) {} +func (pt *ProcessTracer) Run(_ context.Context, _ chan<- []request.Span) {} diff --git a/pkg/internal/export/debug/debug.go b/pkg/internal/export/debug/debug.go index 7fdb4cff7..55a4b8291 100644 --- a/pkg/internal/export/debug/debug.go +++ b/pkg/internal/export/debug/debug.go @@ -7,7 +7,7 @@ import ( "github.com/mariomac/pipes/pkg/node" - "github.com/grafana/beyla/pkg/internal/transform" + "github.com/grafana/beyla/pkg/internal/request" ) type PrintEnabled bool @@ -16,8 +16,8 @@ func (p PrintEnabled) Enabled() bool { return bool(p) } -func PrinterNode(_ context.Context, _ PrintEnabled) (node.TerminalFunc[[]transform.HTTPRequestSpan], error) { - return func(input <-chan []transform.HTTPRequestSpan) { +func PrinterNode(_ context.Context, _ PrintEnabled) (node.TerminalFunc[[]request.Span], error) { + return func(input <-chan []request.Span) { for spans := range input { for i := range spans { t := spans[i].Timings() @@ -45,9 +45,9 @@ type NoopEnabled bool func (n NoopEnabled) Enabled() bool { return bool(n) } -func NoopNode(_ context.Context, _ NoopEnabled) (node.TerminalFunc[[]transform.HTTPRequestSpan], error) { +func NoopNode(_ context.Context, _ NoopEnabled) (node.TerminalFunc[[]request.Span], error) { counter := 0 - return func(spans <-chan []transform.HTTPRequestSpan) { + return func(spans <-chan []request.Span) { for range spans { counter += len(spans) } diff --git a/pkg/internal/export/otel/metrics.go b/pkg/internal/export/otel/metrics.go index 919b85b36..a606a549e 100644 --- a/pkg/internal/export/otel/metrics.go +++ b/pkg/internal/export/otel/metrics.go @@ -23,7 +23,7 @@ import ( "github.com/grafana/beyla/pkg/internal/imetrics" "github.com/grafana/beyla/pkg/internal/pipe/global" - "github.com/grafana/beyla/pkg/internal/transform" + "github.com/grafana/beyla/pkg/internal/request" ) func mlog() *slog.Logger { @@ -92,7 +92,7 @@ type MetricsReporter struct { func ReportMetrics( ctx context.Context, cfg *MetricsConfig, ctxInfo *global.ContextInfo, -) (node.TerminalFunc[[]transform.HTTPRequestSpan], error) { +) (node.TerminalFunc[[]request.Span], error) { mr, err := newMetricsReporter(ctx, cfg, ctxInfo) if err != nil { @@ -239,11 +239,11 @@ func otelHistogramBuckets(metricName string, buckets []float64) metric.View { }) } -func (r *MetricsReporter) metricAttributes(span *transform.HTTPRequestSpan) attribute.Set { +func (r *MetricsReporter) metricAttributes(span *request.Span) attribute.Set { var attrs []attribute.KeyValue switch span.Type { - case transform.EventTypeHTTP: + case request.EventTypeHTTP: attrs = []attribute.KeyValue{ semconv.HTTPMethod(span.Method), semconv.HTTPStatusCode(span.Status), @@ -257,7 +257,7 @@ func (r *MetricsReporter) metricAttributes(span *transform.HTTPRequestSpan) attr if span.Route != "" { attrs = append(attrs, semconv.HTTPRoute(span.Route)) } - case transform.EventTypeGRPC, transform.EventTypeGRPCClient: + case request.EventTypeGRPC, request.EventTypeGRPCClient: attrs = []attribute.KeyValue{ semconv.RPCMethod(span.Path), semconv.RPCSystemGRPC, @@ -266,7 +266,7 @@ func (r *MetricsReporter) metricAttributes(span *transform.HTTPRequestSpan) attr if r.reportPeer { attrs = append(attrs, semconv.NetSockPeerAddr(span.Peer)) } - case transform.EventTypeHTTPClient: + case request.EventTypeHTTPClient: attrs = []attribute.KeyValue{ semconv.HTTPMethod(span.Method), semconv.HTTPStatusCode(span.Status), @@ -288,26 +288,26 @@ func (r *MetricsReporter) metricAttributes(span *transform.HTTPRequestSpan) attr return attribute.NewSet(attrs...) } -func (r *MetricsReporter) record(span *transform.HTTPRequestSpan, attrs attribute.Set) { +func (r *MetricsReporter) record(span *request.Span, attrs attribute.Set) { t := span.Timings() duration := t.End.Sub(t.RequestStart).Seconds() attrOpt := instrument.WithAttributeSet(attrs) switch span.Type { - case transform.EventTypeHTTP: + case request.EventTypeHTTP: // TODO: for more accuracy, there must be a way to set the metric time from the actual span end time r.httpDuration.Record(r.ctx, duration, attrOpt) r.httpRequestSize.Record(r.ctx, float64(span.ContentLength), attrOpt) - case transform.EventTypeGRPC: + case request.EventTypeGRPC: r.grpcDuration.Record(r.ctx, duration, attrOpt) - case transform.EventTypeGRPCClient: + case request.EventTypeGRPCClient: r.grpcClientDuration.Record(r.ctx, duration, attrOpt) - case transform.EventTypeHTTPClient: + case request.EventTypeHTTPClient: r.httpClientDuration.Record(r.ctx, duration, attrOpt) r.httpClientRequestSize.Record(r.ctx, float64(span.ContentLength), attrOpt) } } -func (r *MetricsReporter) reportMetrics(input <-chan []transform.HTTPRequestSpan) { +func (r *MetricsReporter) reportMetrics(input <-chan []request.Span) { defer r.close() for spans := range input { for i := range spans { diff --git a/pkg/internal/export/otel/metrics_test.go b/pkg/internal/export/otel/metrics_test.go index d5d3c7771..e65f28faf 100644 --- a/pkg/internal/export/otel/metrics_test.go +++ b/pkg/internal/export/otel/metrics_test.go @@ -16,7 +16,7 @@ import ( "github.com/grafana/beyla/pkg/internal/imetrics" "github.com/grafana/beyla/pkg/internal/pipe/global" - "github.com/grafana/beyla/pkg/internal/transform" + "github.com/grafana/beyla/pkg/internal/request" ) const timeout = 5 * time.Second @@ -98,10 +98,10 @@ func TestMetrics_InternalInstrumentation(t *testing.T) { // create a simple dummy graph to send data to the Metrics reporter, which will send // metrics to the fake collector sendData := make(chan struct{}) - inputNode := node.AsStart(func(out chan<- []transform.HTTPRequestSpan) { + inputNode := node.AsStart(func(out chan<- []request.Span) { // on every send data signal, the traces generator sends a dummy trace for range sendData { - out <- []transform.HTTPRequestSpan{{Type: transform.EventTypeHTTP}} + out <- []request.Span{{Type: request.EventTypeHTTP}} } }) internalMetrics := &fakeInternalMetrics{} diff --git a/pkg/internal/export/otel/traces.go b/pkg/internal/export/otel/traces.go index 75f186547..ac1a5337a 100644 --- a/pkg/internal/export/otel/traces.go +++ b/pkg/internal/export/otel/traces.go @@ -23,7 +23,7 @@ import ( "github.com/grafana/beyla/pkg/internal/imetrics" "github.com/grafana/beyla/pkg/internal/pipe/global" - "github.com/grafana/beyla/pkg/internal/transform" + "github.com/grafana/beyla/pkg/internal/request" ) func tlog() *slog.Logger { @@ -31,12 +31,12 @@ func tlog() *slog.Logger { } type SessionSpan struct { - ReqSpan transform.HTTPRequestSpan + ReqSpan request.Span RootCtx context.Context } var topSpans, _ = lru.New[uint64, SessionSpan](8192) -var clientSpans, _ = lru.New[uint64, []transform.HTTPRequestSpan](8192) +var clientSpans, _ = lru.New[uint64, []request.Span](8192) var namedTracers, _ = lru.New[string, *trace.TracerProvider](512) const reporterName = "github.com/grafana/beyla" @@ -82,7 +82,7 @@ type TracesReporter struct { bsp trace.SpanProcessor } -func ReportTraces(ctx context.Context, cfg *TracesConfig, ctxInfo *global.ContextInfo) (node.TerminalFunc[[]transform.HTTPRequestSpan], error) { +func ReportTraces(ctx context.Context, cfg *TracesConfig, ctxInfo *global.ContextInfo) (node.TerminalFunc[[]request.Span], error) { tr, err := newTracesReporter(ctx, cfg, ctxInfo) if err != nil { slog.Error("can't instantiate OTEL traces reporter", err) @@ -188,11 +188,11 @@ func (r *TracesReporter) close() { } } -func (r *TracesReporter) traceAttributes(span *transform.HTTPRequestSpan) []attribute.KeyValue { +func (r *TracesReporter) traceAttributes(span *request.Span) []attribute.KeyValue { var attrs []attribute.KeyValue switch span.Type { - case transform.EventTypeHTTP: + case request.EventTypeHTTP: attrs = []attribute.KeyValue{ semconv.HTTPMethod(span.Method), semconv.HTTPStatusCode(span.Status), @@ -205,7 +205,7 @@ func (r *TracesReporter) traceAttributes(span *transform.HTTPRequestSpan) []attr if span.Route != "" { attrs = append(attrs, semconv.HTTPRoute(span.Route)) } - case transform.EventTypeGRPC: + case request.EventTypeGRPC: attrs = []attribute.KeyValue{ semconv.RPCMethod(span.Path), semconv.RPCSystemGRPC, @@ -214,7 +214,7 @@ func (r *TracesReporter) traceAttributes(span *transform.HTTPRequestSpan) []attr semconv.NetHostName(span.Host), semconv.NetHostPort(span.HostPort), } - case transform.EventTypeHTTPClient: + case request.EventTypeHTTPClient: attrs = []attribute.KeyValue{ semconv.HTTPMethod(span.Method), semconv.HTTPStatusCode(span.Status), @@ -223,7 +223,7 @@ func (r *TracesReporter) traceAttributes(span *transform.HTTPRequestSpan) []attr semconv.NetPeerPort(span.HostPort), semconv.HTTPRequestContentLength(int(span.ContentLength)), } - case transform.EventTypeGRPCClient: + case request.EventTypeGRPCClient: attrs = []attribute.KeyValue{ semconv.RPCMethod(span.Path), semconv.RPCSystemGRPC, @@ -245,33 +245,33 @@ func (r *TracesReporter) traceAttributes(span *transform.HTTPRequestSpan) []attr return attrs } -func traceName(span *transform.HTTPRequestSpan) string { +func traceName(span *request.Span) string { switch span.Type { - case transform.EventTypeHTTP: + case request.EventTypeHTTP: name := span.Method if span.Route != "" { name += " " + span.Route } return name - case transform.EventTypeGRPC, transform.EventTypeGRPCClient: + case request.EventTypeGRPC, request.EventTypeGRPCClient: return span.Path - case transform.EventTypeHTTPClient: + case request.EventTypeHTTPClient: return span.Method } return "" } -func spanKind(span *transform.HTTPRequestSpan) trace2.SpanKind { +func spanKind(span *request.Span) trace2.SpanKind { switch span.Type { - case transform.EventTypeHTTP, transform.EventTypeGRPC: + case request.EventTypeHTTP, request.EventTypeGRPC: return trace2.SpanKindServer - case transform.EventTypeHTTPClient, transform.EventTypeGRPCClient: + case request.EventTypeHTTPClient, request.EventTypeGRPCClient: return trace2.SpanKindClient } return trace2.SpanKindInternal } -func (r *TracesReporter) makeSpan(parentCtx context.Context, tracer trace2.Tracer, span *transform.HTTPRequestSpan) SessionSpan { +func (r *TracesReporter) makeSpan(parentCtx context.Context, tracer trace2.Tracer, span *request.Span) SessionSpan { t := span.Timings() if span.TraceID != "" { @@ -316,7 +316,7 @@ func (r *TracesReporter) makeSpan(parentCtx context.Context, tracer trace2.Trace return SessionSpan{*span, ctx} } -func (r *TracesReporter) reportClientSpan(span *transform.HTTPRequestSpan, tracer trace2.Tracer) { +func (r *TracesReporter) reportClientSpan(span *request.Span, tracer trace2.Tracer) { ctx := r.ctx // we have a parent request span @@ -329,7 +329,7 @@ func (r *TracesReporter) reportClientSpan(span *transform.HTTPRequestSpan, trace // stash the client span for later addition cs, ok := clientSpans.Get(span.ID) if !ok { - cs = []transform.HTTPRequestSpan{*span} + cs = []request.Span{*span} } else { cs = append(cs, *span) } @@ -343,13 +343,13 @@ func (r *TracesReporter) reportClientSpan(span *transform.HTTPRequestSpan, trace r.makeSpan(ctx, tracer, span) } -func (r *TracesReporter) reportServerSpan(span *transform.HTTPRequestSpan, tracer trace2.Tracer) { +func (r *TracesReporter) reportServerSpan(span *request.Span, tracer trace2.Tracer) { s := r.makeSpan(r.ctx, tracer, span) if span.ID != 0 { topSpans.Add(span.ID, s) cs, ok := clientSpans.Get(span.ID) - newer := []transform.HTTPRequestSpan{} + newer := []request.Span{} if ok { // finish any client spans that were waiting for this parent span for j := range cs { @@ -371,7 +371,7 @@ func (r *TracesReporter) reportServerSpan(span *transform.HTTPRequestSpan, trace } } -func (r *TracesReporter) reportTraces(input <-chan []transform.HTTPRequestSpan) { +func (r *TracesReporter) reportTraces(input <-chan []request.Span) { defer r.close() defaultTracer := r.traceProvider.Tracer(reporterName) for spans := range input { @@ -384,9 +384,9 @@ func (r *TracesReporter) reportTraces(input <-chan []transform.HTTPRequestSpan) } switch span.Type { - case transform.EventTypeHTTPClient, transform.EventTypeGRPCClient: + case request.EventTypeHTTPClient, request.EventTypeGRPCClient: r.reportClientSpan(span, spanTracer) - case transform.EventTypeHTTP, transform.EventTypeGRPC: + case request.EventTypeHTTP, request.EventTypeGRPC: r.reportServerSpan(span, spanTracer) } } diff --git a/pkg/internal/export/otel/traces_test.go b/pkg/internal/export/otel/traces_test.go index 0f77f087e..1ababbf9f 100644 --- a/pkg/internal/export/otel/traces_test.go +++ b/pkg/internal/export/otel/traces_test.go @@ -16,7 +16,7 @@ import ( "github.com/grafana/beyla/pkg/internal/imetrics" "github.com/grafana/beyla/pkg/internal/pipe/global" - "github.com/grafana/beyla/pkg/internal/transform" + "github.com/grafana/beyla/pkg/internal/request" ) func TestTracesEndpoint(t *testing.T) { @@ -175,10 +175,10 @@ func TestTraces_InternalInstrumentation(t *testing.T) { // create a simple dummy graph to send data to the Metrics reporter, which will send // metrics to the fake collector sendData := make(chan struct{}) - inputNode := node.AsStart(func(out chan<- []transform.HTTPRequestSpan) { + inputNode := node.AsStart(func(out chan<- []request.Span) { // on every send data signal, the traces generator sends a dummy trace for range sendData { - out <- []transform.HTTPRequestSpan{{Type: transform.EventTypeHTTP}} + out <- []request.Span{{Type: request.EventTypeHTTP}} } }) internalTraces := &fakeInternalTraces{} @@ -266,10 +266,10 @@ func TestTraces_InternalInstrumentationSampling(t *testing.T) { // create a simple dummy graph to send data to the Metrics reporter, which will send // metrics to the fake collector sendData := make(chan struct{}) - inputNode := node.AsStart(func(out chan<- []transform.HTTPRequestSpan) { + inputNode := node.AsStart(func(out chan<- []request.Span) { // on every send data signal, the traces generator sends a dummy trace for range sendData { - out <- []transform.HTTPRequestSpan{{Type: transform.EventTypeHTTP}} + out <- []request.Span{{Type: request.EventTypeHTTP}} } }) internalTraces := &fakeInternalTraces{} diff --git a/pkg/internal/export/prom/prom.go b/pkg/internal/export/prom/prom.go index bb9d2ee4f..5911d82f2 100644 --- a/pkg/internal/export/prom/prom.go +++ b/pkg/internal/export/prom/prom.go @@ -10,7 +10,7 @@ import ( "github.com/grafana/beyla/pkg/internal/connector" "github.com/grafana/beyla/pkg/internal/export/otel" "github.com/grafana/beyla/pkg/internal/pipe/global" - "github.com/grafana/beyla/pkg/internal/transform" + "github.com/grafana/beyla/pkg/internal/request" ) // using labels and names that are equivalent names to the OTEL attributes @@ -71,7 +71,7 @@ type metricsReporter struct { bgCtx context.Context } -func PrometheusEndpoint(ctx context.Context, cfg *PrometheusConfig, ctxInfo *global.ContextInfo) (node.TerminalFunc[[]transform.HTTPRequestSpan], error) { +func PrometheusEndpoint(ctx context.Context, cfg *PrometheusConfig, ctxInfo *global.ContextInfo) (node.TerminalFunc[[]request.Span], error) { reporter := newReporter(ctx, cfg, ctxInfo) return reporter.reportMetrics, nil } @@ -129,7 +129,7 @@ func newReporter(ctx context.Context, cfg *PrometheusConfig, ctxInfo *global.Con return mr } -func (r *metricsReporter) reportMetrics(input <-chan []transform.HTTPRequestSpan) { +func (r *metricsReporter) reportMetrics(input <-chan []request.Span) { go r.promConnect.StartHTTP(r.bgCtx) for spans := range input { for i := range spans { @@ -138,21 +138,21 @@ func (r *metricsReporter) reportMetrics(input <-chan []transform.HTTPRequestSpan } } -func (r *metricsReporter) observe(span *transform.HTTPRequestSpan) { +func (r *metricsReporter) observe(span *request.Span) { t := span.Timings() duration := t.End.Sub(t.RequestStart).Seconds() switch span.Type { - case transform.EventTypeHTTP: + case request.EventTypeHTTP: lv := r.labelValuesHTTP(span) r.httpDuration.WithLabelValues(lv...).Observe(duration) r.httpRequestSize.WithLabelValues(lv...).Observe(float64(span.ContentLength)) - case transform.EventTypeHTTPClient: + case request.EventTypeHTTPClient: lv := r.labelValuesHTTPClient(span) r.httpClientDuration.WithLabelValues(lv...).Observe(duration) r.httpClientRequestSize.WithLabelValues(lv...).Observe(float64(span.ContentLength)) - case transform.EventTypeGRPC: + case request.EventTypeGRPC: r.grpcDuration.WithLabelValues(r.labelValuesGRPC(span)...).Observe(duration) - case transform.EventTypeGRPCClient: + case request.EventTypeGRPCClient: r.grpcClientDuration.WithLabelValues(r.labelValuesGRPC(span)...).Observe(duration) } } @@ -172,7 +172,7 @@ func labelNamesGRPC(cfg *PrometheusConfig) []string { // labelValuesGRPC must return the label names in the same order as would be returned // by labelNamesGRPC -func (r *metricsReporter) labelValuesGRPC(span *transform.HTTPRequestSpan) []string { +func (r *metricsReporter) labelValuesGRPC(span *request.Span) []string { // serviceNameKey, rpcMethodKey, rpcSystemGRPC, rpcGRPCStatusCodeKey // In some situations e.g. system-wide instrumentation, the global service name // is empty and we need to take the name from the trace @@ -205,7 +205,7 @@ func labelNamesHTTPClient(cfg *PrometheusConfig) []string { // labelValuesHTTPClient must return the label names in the same order as would be returned // by labelNamesHTTPClient -func (r *metricsReporter) labelValuesHTTPClient(span *transform.HTTPRequestSpan) []string { +func (r *metricsReporter) labelValuesHTTPClient(span *request.Span) []string { // httpMethodKey, httpStatusCodeKey names := []string{r.cfg.ServiceName, span.Method, strconv.Itoa(span.Status)} if r.cfg.ServiceNamespace != "" { @@ -239,7 +239,7 @@ func labelNamesHTTP(cfg *PrometheusConfig, reportRoutes bool) []string { // labelValuesGRPC must return the label names in the same order as would be returned // by labelNamesHTTP -func (r *metricsReporter) labelValuesHTTP(span *transform.HTTPRequestSpan) []string { +func (r *metricsReporter) labelValuesHTTP(span *request.Span) []string { // httpMethodKey, httpStatusCodeKey names := []string{r.cfg.ServiceName, span.Method, strconv.Itoa(span.Status)} if r.cfg.ServiceNamespace != "" { diff --git a/pkg/internal/pipe/instrumenter.go b/pkg/internal/pipe/instrumenter.go index 0eb7fda9d..7e7462f95 100644 --- a/pkg/internal/pipe/instrumenter.go +++ b/pkg/internal/pipe/instrumenter.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/beyla/pkg/internal/export/prom" "github.com/grafana/beyla/pkg/internal/imetrics" "github.com/grafana/beyla/pkg/internal/pipe/global" + "github.com/grafana/beyla/pkg/internal/request" "github.com/grafana/beyla/pkg/internal/traces" "github.com/grafana/beyla/pkg/internal/transform" ) @@ -55,12 +56,12 @@ type graphFunctions struct { // tracesCh is shared across all the eBPF tracing programs, which send there // any discovered trace, and the input node of the graph, which reads and // forwards them to the next stages. - tracesCh <-chan []any + tracesCh <-chan []request.Span } // Build instantiates the whole instrumentation --> processing --> submit // pipeline graph and returns it as a startable item -func Build(ctx context.Context, config *Config, ctxInfo *global.ContextInfo, tracesCh <-chan []any) (*Instrumenter, error) { +func Build(ctx context.Context, config *Config, ctxInfo *global.ContextInfo, tracesCh <-chan []request.Span) (*Instrumenter, error) { if err := config.Validate(); err != nil { return nil, fmt.Errorf("validating configuration: %w", err) } @@ -70,7 +71,7 @@ func Build(ctx context.Context, config *Config, ctxInfo *global.ContextInfo, tra // private constructor that can be instantiated from tests to override the node providers // and offsets inspector -func newGraphBuilder(config *Config, ctxInfo *global.ContextInfo, tracesCh <-chan []any) *graphFunctions { +func newGraphBuilder(config *Config, ctxInfo *global.ContextInfo, tracesCh <-chan []request.Span) *graphFunctions { // This is how the github.com/mariomac/pipes library, works: // First, we create a graph builder gnb := graph.NewBuilder(node.ChannelBufferLen(config.ChannelBufferLen)) @@ -83,7 +84,6 @@ func newGraphBuilder(config *Config, ctxInfo *global.ContextInfo, tracesCh <-cha // Second, we register providers for each node. Each provider is a function that receives the // type of each of the "nodesMap" struct fields, and returns the function that represents // each node. Each function will have input and/or output channels. - graph.RegisterCodec(gnb, transform.ConvertToSpan) graph.RegisterStart(gnb, traces.ReaderProvider) graph.RegisterMiddle(gnb, transform.RoutesProvider) graph.RegisterMiddle(gnb, transform.KubeDecoratorProvider) @@ -131,16 +131,16 @@ func (i *Instrumenter) Run(ctx context.Context) { // argument in the functions below need to be a value. //nolint:gocritic -func (gb *graphFunctions) tracesReporterProvicer(ctx context.Context, config otel.TracesConfig) (node.TerminalFunc[[]transform.HTTPRequestSpan], error) { +func (gb *graphFunctions) tracesReporterProvicer(ctx context.Context, config otel.TracesConfig) (node.TerminalFunc[[]request.Span], error) { return otel.ReportTraces(ctx, &config, gb.ctxInfo) } //nolint:gocritic -func (gb *graphFunctions) metricsReporterProvider(ctx context.Context, config otel.MetricsConfig) (node.TerminalFunc[[]transform.HTTPRequestSpan], error) { +func (gb *graphFunctions) metricsReporterProvider(ctx context.Context, config otel.MetricsConfig) (node.TerminalFunc[[]request.Span], error) { return otel.ReportMetrics(ctx, &config, gb.ctxInfo) } //nolint:gocritic -func (gb *graphFunctions) prometheusProvider(ctx context.Context, config prom.PrometheusConfig) (node.TerminalFunc[[]transform.HTTPRequestSpan], error) { +func (gb *graphFunctions) prometheusProvider(ctx context.Context, config prom.PrometheusConfig) (node.TerminalFunc[[]request.Span], error) { return prom.PrometheusEndpoint(ctx, &config, gb.ctxInfo) } diff --git a/pkg/internal/pipe/instrumenter_test.go b/pkg/internal/pipe/instrumenter_test.go index ea586f4aa..4ac4a82f7 100644 --- a/pkg/internal/pipe/instrumenter_test.go +++ b/pkg/internal/pipe/instrumenter_test.go @@ -3,6 +3,7 @@ package pipe import ( "context" "os" + "strings" "testing" "time" @@ -14,11 +15,10 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" - ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" - "github.com/grafana/beyla/pkg/internal/ebpf/httpfltr" "github.com/grafana/beyla/pkg/internal/export/otel" "github.com/grafana/beyla/pkg/internal/imetrics" "github.com/grafana/beyla/pkg/internal/pipe/global" + "github.com/grafana/beyla/pkg/internal/request" "github.com/grafana/beyla/pkg/internal/testutil" "github.com/grafana/beyla/pkg/internal/traces" "github.com/grafana/beyla/pkg/internal/transform" @@ -42,10 +42,10 @@ func TestBasicPipeline(t *testing.T) { gb := newGraphBuilder(&Config{ Metrics: otel.MetricsConfig{MetricsEndpoint: tc.ServerEndpoint, ReportTarget: true, ReportPeerInfo: true}, - }, gctx(), make(<-chan []any)) + }, gctx(), make(<-chan []request.Span)) // Override eBPF tracer to send some fake data - graph.RegisterStart(gb.builder, func(_ context.Context, _ traces.Reader) (node.StartFuncCtx[[]any], error) { - return func(_ context.Context, out chan<- []any) { + graph.RegisterStart(gb.builder, func(_ context.Context, _ traces.Reader) (node.StartFuncCtx[[]request.Span], error) { + return func(_ context.Context, out chan<- []request.Span) { out <- newRequest(1, "GET", "/foo/bar", "1.1.1.1:3456", 404) }, nil }) @@ -78,10 +78,10 @@ func TestTracerPipeline(t *testing.T) { gb := newGraphBuilder(&Config{ Traces: otel.TracesConfig{TracesEndpoint: tc.ServerEndpoint, SamplingRatio: 1.0}, - }, gctx(), make(<-chan []any)) + }, gctx(), make(<-chan []request.Span)) // Override eBPF tracer to send some fake data - graph.RegisterStart(gb.builder, func(_ context.Context, _ traces.Reader) (node.StartFuncCtx[[]any], error) { - return func(_ context.Context, out chan<- []any) { + graph.RegisterStart(gb.builder, func(_ context.Context, _ traces.Reader) (node.StartFuncCtx[[]request.Span], error) { + return func(_ context.Context, out chan<- []request.Span) { out <- newRequest(1, "GET", "/foo/bar", "1.1.1.1:3456", 404) }, nil }) @@ -108,10 +108,10 @@ func TestRouteConsolidation(t *testing.T) { gb := newGraphBuilder(&Config{ Metrics: otel.MetricsConfig{MetricsEndpoint: tc.ServerEndpoint}, // ReportPeerInfo = false, no peer info Routes: &transform.RoutesConfig{Patterns: []string{"/user/{id}", "/products/{id}/push"}}, - }, gctx(), make(<-chan []any)) + }, gctx(), make(<-chan []request.Span)) // Override eBPF tracer to send some fake data - graph.RegisterStart(gb.builder, func(_ context.Context, _ traces.Reader) (node.StartFuncCtx[[]any], error) { - return func(_ context.Context, out chan<- []any) { + graph.RegisterStart(gb.builder, func(_ context.Context, _ traces.Reader) (node.StartFuncCtx[[]request.Span], error) { + return func(_ context.Context, out chan<- []request.Span) { out <- newRequest(1, "GET", "/user/1234", "1.1.1.1:3456", 200) out <- newRequest(2, "GET", "/products/3210/push", "1.1.1.1:3456", 200) out <- newRequest(3, "GET", "/attach", "1.1.1.1:3456", 200) // undefined route: won't report as route @@ -172,10 +172,10 @@ func TestGRPCPipeline(t *testing.T) { gb := newGraphBuilder(&Config{ Metrics: otel.MetricsConfig{MetricsEndpoint: tc.ServerEndpoint, ReportTarget: true, ReportPeerInfo: true}, - }, gctx(), make(<-chan []any)) + }, gctx(), make(<-chan []request.Span)) // Override eBPF tracer to send some fake data - graph.RegisterStart(gb.builder, func(_ context.Context, _ traces.Reader) (node.StartFuncCtx[[]any], error) { - return func(_ context.Context, out chan<- []any) { + graph.RegisterStart(gb.builder, func(_ context.Context, _ traces.Reader) (node.StartFuncCtx[[]request.Span], error) { + return func(_ context.Context, out chan<- []request.Span) { out <- newGRPCRequest(1, "/foo/bar", 3) }, nil }) @@ -207,10 +207,10 @@ func TestTraceGRPCPipeline(t *testing.T) { gb := newGraphBuilder(&Config{ Traces: otel.TracesConfig{TracesEndpoint: tc.ServerEndpoint, SamplingRatio: 1.0}, - }, gctx(), make(<-chan []any)) + }, gctx(), make(<-chan []request.Span)) // Override eBPF tracer to send some fake data - graph.RegisterStart(gb.builder, func(_ context.Context, _ traces.Reader) (node.StartFuncCtx[[]any], error) { - return func(_ context.Context, out chan<- []any) { + graph.RegisterStart(gb.builder, func(_ context.Context, _ traces.Reader) (node.StartFuncCtx[[]request.Span], error) { + return func(_ context.Context, out chan<- []request.Span) { out <- newGRPCRequest(1, "foo.bar", 3) }, nil }) @@ -236,19 +236,19 @@ func TestNestedSpanMatching(t *testing.T) { gb := newGraphBuilder(&Config{ Traces: otel.TracesConfig{TracesEndpoint: tc.ServerEndpoint, SamplingRatio: 1.0}, - }, gctx(), make(<-chan []any)) + }, gctx(), make(<-chan []request.Span)) // Override eBPF tracer to send some fake data with nested client span - graph.RegisterStart(gb.builder, func(_ context.Context, _ traces.Reader) (node.StartFuncCtx[[]any], error) { - return func(_ context.Context, out chan<- []any) { - out <- newRequestWithTiming(1, transform.EventTypeHTTPClient, "GET", "/attach", "2.2.2.2:1234", 200, 60000, 60000, 70000) - out <- newRequestWithTiming(1, transform.EventTypeHTTP, "GET", "/user/1234", "1.1.1.1:3456", 200, 10000, 10000, 50000) - out <- newRequestWithTiming(3, transform.EventTypeHTTPClient, "GET", "/products/3210/pull", "2.2.2.2:3456", 204, 80000, 80000, 90000) - out <- newRequestWithTiming(3, transform.EventTypeHTTPClient, "GET", "/products/3211/pull", "2.2.2.2:3456", 203, 80000, 80000, 90000) - out <- newRequestWithTiming(2, transform.EventTypeHTTP, "GET", "/products/3210/push", "1.1.1.1:3456", 200, 10000, 20000, 50000) - out <- newRequestWithTiming(3, transform.EventTypeHTTP, "GET", "/attach", "1.1.1.1:3456", 200, 70000, 80000, 100000) - out <- newRequestWithTiming(1, transform.EventTypeHTTPClient, "GET", "/attach2", "2.2.2.2:1234", 200, 30000, 30000, 40000) - out <- newRequestWithTiming(0, transform.EventTypeHTTPClient, "GET", "/attach1", "2.2.2.2:1234", 200, 20000, 20000, 40000) - out <- newRequestWithTiming(1, transform.EventTypeHTTP, "GET", "/user/3456", "1.1.1.1:3456", 200, 56000, 56000, 80000) + graph.RegisterStart(gb.builder, func(_ context.Context, _ traces.Reader) (node.StartFuncCtx[[]request.Span], error) { + return func(_ context.Context, out chan<- []request.Span) { + out <- newRequestWithTiming(1, request.EventTypeHTTPClient, "GET", "/attach", "2.2.2.2:1234", 200, 60000, 60000, 70000) + out <- newRequestWithTiming(1, request.EventTypeHTTP, "GET", "/user/1234", "1.1.1.1:3456", 200, 10000, 10000, 50000) + out <- newRequestWithTiming(3, request.EventTypeHTTPClient, "GET", "/products/3210/pull", "2.2.2.2:3456", 204, 80000, 80000, 90000) + out <- newRequestWithTiming(3, request.EventTypeHTTPClient, "GET", "/products/3211/pull", "2.2.2.2:3456", 203, 80000, 80000, 90000) + out <- newRequestWithTiming(2, request.EventTypeHTTP, "GET", "/products/3210/push", "1.1.1.1:3456", 200, 10000, 20000, 50000) + out <- newRequestWithTiming(3, request.EventTypeHTTP, "GET", "/attach", "1.1.1.1:3456", 200, 70000, 80000, 100000) + out <- newRequestWithTiming(1, request.EventTypeHTTPClient, "GET", "/attach2", "2.2.2.2:1234", 200, 30000, 30000, 40000) + out <- newRequestWithTiming(0, request.EventTypeHTTPClient, "GET", "/attach1", "2.2.2.2:1234", 200, 20000, 20000, 40000) + out <- newRequestWithTiming(1, request.EventTypeHTTP, "GET", "/user/3456", "1.1.1.1:3456", 200, 56000, 56000, 80000) }, nil }) pipe, err := gb.buildGraph(ctx) @@ -308,51 +308,51 @@ func TestNestedSpanMatching(t *testing.T) { assert.Equal(t, parent1ID, event.Attributes["parent_span_id"]) } -func newRequest(id uint64, method, path, peer string, status int) []any { - rt := ebpfcommon.HTTPRequestTrace{} - copy(rt.Path[:], path) - copy(rt.Method[:], method) - copy(rt.RemoteAddr[:], peer) - copy(rt.Host[:], getHostname()+":8080") - rt.Status = uint16(status) - rt.Type = uint8(transform.EventTypeHTTP) - rt.Id = id - rt.GoStartMonotimeNs = 1 - rt.StartMonotimeNs = 2 - rt.EndMonotimeNs = 3 - return []any{rt} +func newRequest(id uint64, method, path, peer string, status int) []request.Span { + return []request.Span{{ + Path: path, + Method: method, + Peer: strings.Split(peer, ":")[0], + Host: getHostname(), + HostPort: 8080, + Status: status, + Type: request.EventTypeHTTP, + ID: id, + Start: 2, + RequestStart: 1, + End: 3, + }} } -func newRequestWithTiming(id uint64, kind transform.EventType, method, path, peer string, status int, goStart, start, end uint64) []any { - rt := ebpfcommon.HTTPRequestTrace{} - copy(rt.Path[:], path) - copy(rt.Method[:], method) - copy(rt.RemoteAddr[:], peer) - copy(rt.Host[:], getHostname()+":8080") - rt.Status = uint16(status) - rt.Type = uint8(kind) - rt.Id = id - rt.GoStartMonotimeNs = goStart - rt.StartMonotimeNs = start - rt.EndMonotimeNs = end - return []any{rt} +func newRequestWithTiming(id uint64, kind request.EventType, method, path, peer string, status int, goStart, start, end uint64) []request.Span { + return []request.Span{{ + Path: path, + Method: method, + Peer: strings.Split(peer, ":")[0], + Host: getHostname(), + HostPort: 8080, + Type: kind, + Status: status, + ID: id, + RequestStart: int64(goStart), + Start: int64(start), + End: int64(end), + }} } -func newGRPCRequest(id uint64, path string, status int) []any { - rt := ebpfcommon.HTTPRequestTrace{} - copy(rt.Path[:], path) - copy(rt.RemoteAddr[:], []byte{0x1, 0x1, 0x1, 0x1}) - rt.RemoteAddrLen = 4 - copy(rt.Host[:], []byte{0x7f, 0x0, 0x0, 0x1}) - rt.HostLen = 4 - rt.HostPort = 8080 - rt.Status = uint16(status) - rt.Type = uint8(transform.EventTypeGRPC) - rt.Id = id - rt.GoStartMonotimeNs = 1 - rt.StartMonotimeNs = 2 - rt.EndMonotimeNs = 3 - return []any{rt} +func newGRPCRequest(id uint64, path string, status int) []request.Span { + return []request.Span{{ + Path: path, + Peer: "1.1.1.1", + Host: "127.0.0.1", + HostPort: 8080, + Status: status, + Type: request.EventTypeGRPC, + ID: id, + Start: 2, + RequestStart: 1, + End: 3, + }} } func getHostname() string { @@ -432,21 +432,20 @@ func matchNestedEvent(t *testing.T, name, method, target, status string, kind pt assert.Equal(t, kind, event.Kind) } -func newHTTPInfo(method, path, peer string, status int) []any { - var i httpfltr.HTTPInfo - i.Type = 1 - i.Method = method - i.Peer = peer - i.URL = path - i.Host = getHostname() - i.ConnInfo.D_port = uint16(8080) - i.ConnInfo.S_port = uint16(12345) - i.Status = uint16(status) - i.StartMonotimeNs = 2 - i.EndMonotimeNs = 3 - i.Comm = "comm" - - return []any{i} +func newHTTPInfo(method, path, peer string, status int) []request.Span { + return []request.Span{{ + Type: 1, + Method: method, + Peer: peer, + Path: path, + Host: getHostname(), + HostPort: 8080, + Status: status, + Start: 2, + RequestStart: 2, + End: 3, + ServiceName: "comm", + }} } func matchInfoEvent(t *testing.T, name string, event collector.TraceRecord) { @@ -477,10 +476,10 @@ func TestBasicPipelineInfo(t *testing.T) { gb := newGraphBuilder(&Config{ Metrics: otel.MetricsConfig{MetricsEndpoint: tc.ServerEndpoint, ReportTarget: true, ReportPeerInfo: true}, - }, gctx(), make(<-chan []any)) + }, gctx(), make(<-chan []request.Span)) // Override eBPF tracer to send some fake data - graph.RegisterStart(gb.builder, func(_ context.Context, _ traces.Reader) (node.StartFuncCtx[[]any], error) { - return func(_ context.Context, out chan<- []any) { + graph.RegisterStart(gb.builder, func(_ context.Context, _ traces.Reader) (node.StartFuncCtx[[]request.Span], error) { + return func(_ context.Context, out chan<- []request.Span) { out <- newHTTPInfo("PATCH", "/aaa/bbb", "1.1.1.1", 204) }, nil }) @@ -513,10 +512,10 @@ func TestTracerPipelineInfo(t *testing.T) { gb := newGraphBuilder(&Config{ Traces: otel.TracesConfig{TracesEndpoint: tc.ServerEndpoint, SamplingRatio: 1.0}, - }, gctx(), make(<-chan []any)) + }, gctx(), make(<-chan []request.Span)) // Override eBPF tracer to send some fake data - graph.RegisterStart(gb.builder, func(_ context.Context, _ traces.Reader) (node.StartFuncCtx[[]any], error) { - return func(_ context.Context, out chan<- []any) { + graph.RegisterStart(gb.builder, func(_ context.Context, _ traces.Reader) (node.StartFuncCtx[[]request.Span], error) { + return func(_ context.Context, out chan<- []request.Span) { out <- newHTTPInfo("PATCH", "/aaa/bbb", "1.1.1.1", 204) }, nil }) diff --git a/pkg/internal/request/span.go b/pkg/internal/request/span.go new file mode 100644 index 000000000..ecf2823c0 --- /dev/null +++ b/pkg/internal/request/span.go @@ -0,0 +1,73 @@ +package request + +import ( + "time" + + "github.com/gavv/monotime" +) + +type EventType int + +const ( + EventTypeHTTP EventType = iota + 1 + EventTypeGRPC + EventTypeHTTPClient + EventTypeGRPCClient +) + +type converter struct { + clock func() time.Time + monoClock func() time.Duration +} + +var clocks = converter{monoClock: monotime.Now, clock: time.Now} + +// Span contains the information being submitted by the following nodes in the graph. +// It enables comfortable handling of data from Go. +type Span struct { + Type EventType + ID uint64 + Method string + Path string + Route string + Peer string + Host string + HostPort int + Status int + ContentLength int64 + RequestStart int64 + Start int64 + End int64 + ServiceName string + Metadata []MetadataTag + TraceID string +} + +type MetadataTag struct { + Key string + Val string +} + +func (s *Span) Inside(parent *Span) bool { + return s.RequestStart >= parent.RequestStart && s.End <= parent.End +} + +type Timings struct { + RequestStart time.Time + Start time.Time + End time.Time +} + +func (s *Span) Timings() Timings { + now := clocks.clock() + monoNow := clocks.monoClock() + startDelta := monoNow - time.Duration(s.Start) + endDelta := monoNow - time.Duration(s.End) + goStartDelta := monoNow - time.Duration(s.RequestStart) + + return Timings{ + RequestStart: now.Add(-goStartDelta), + Start: now.Add(-startDelta), + End: now.Add(-endDelta), + } +} diff --git a/pkg/internal/traces/reader.go b/pkg/internal/traces/reader.go index 4a5a74530..10ff6db30 100644 --- a/pkg/internal/traces/reader.go +++ b/pkg/internal/traces/reader.go @@ -4,19 +4,33 @@ import ( "context" "github.com/mariomac/pipes/pkg/node" + "golang.org/x/exp/slog" + + "github.com/grafana/beyla/pkg/internal/request" ) +func rlog() *slog.Logger { + return slog.With("component", "traces.Reader") +} + // Reader is the input node of the processing graph. The eBPF tracers will send their // traces to the Reader's TracesInput, and the Reader will forward them to the next // pipeline stage type Reader struct { - TracesInput <-chan []any + TracesInput <-chan []request.Span } -func ReaderProvider(_ context.Context, r Reader) (node.StartFuncCtx[[]any], error) { - return func(ctx context.Context, out chan<- []any) { - for trace := range r.TracesInput { - out <- trace +func ReaderProvider(_ context.Context, r Reader) (node.StartFuncCtx[[]request.Span], error) { + return func(ctx context.Context, out chan<- []request.Span) { + cancelChan := ctx.Done() + for { + select { + case trace := <-r.TracesInput: + out <- trace + case <-cancelChan: + rlog().Debug("context canceled. Exiting traces input loop") + return + } } }, nil } diff --git a/pkg/internal/transform/k8s.go b/pkg/internal/transform/k8s.go index 80ee01113..4dbaf4d6b 100644 --- a/pkg/internal/transform/k8s.go +++ b/pkg/internal/transform/k8s.go @@ -10,6 +10,7 @@ import ( "github.com/mariomac/pipes/pkg/node" "golang.org/x/exp/slog" + "github.com/grafana/beyla/pkg/internal/request" "github.com/grafana/beyla/pkg/internal/transform/kube" ) @@ -54,12 +55,12 @@ func (d KubernetesDecorator) Enabled() bool { } } -func KubeDecoratorProvider(_ context.Context, cfg KubernetesDecorator) (node.MiddleFunc[[]HTTPRequestSpan, []HTTPRequestSpan], error) { +func KubeDecoratorProvider(_ context.Context, cfg KubernetesDecorator) (node.MiddleFunc[[]request.Span, []request.Span], error) { decorator, err := newMetadataDecorator(&cfg) if err != nil { return nil, fmt.Errorf("instantiating kubernetes metadata decorator: %w", err) } - return func(in <-chan []HTTPRequestSpan, out chan<- []HTTPRequestSpan) { + return func(in <-chan []request.Span, out chan<- []request.Span) { decorator.refreshOwnPodMetadata() klog().Debug("starting kubernetes decoration loop") @@ -78,8 +79,8 @@ type metadataDecorator struct { kube kube.Metadata cfg *KubernetesDecorator - ownMetadataAsSrc []MetadataTag - ownMetadataAsDst []MetadataTag + ownMetadataAsSrc []request.MetadataTag + ownMetadataAsDst []request.MetadataTag } func newMetadataDecorator(cfg *KubernetesDecorator) (*metadataDecorator, error) { @@ -90,7 +91,7 @@ func newMetadataDecorator(cfg *KubernetesDecorator) (*metadataDecorator, error) return dec, nil } -func (md *metadataDecorator) do(span *HTTPRequestSpan) { +func (md *metadataDecorator) do(span *request.Span) { // We decorate each trace by looking up into the local kubernetes cache for the // Peer address, when we are instrumenting server-side traces, or the // Host name, when we are instrumenting client-side traces. @@ -98,12 +99,12 @@ func (md *metadataDecorator) do(span *HTTPRequestSpan) { // changes the way it works. // Extensive integration test cases are provided as a safeguard. switch span.Type { - case EventTypeGRPC, EventTypeHTTP: + case request.EventTypeGRPC, request.EventTypeHTTP: if peerInfo, ok := md.kube.GetInfo(span.Peer); ok { span.Metadata = appendSRCMetadata(span.Metadata, peerInfo) } span.Metadata = append(span.Metadata, md.ownMetadataAsDst...) - case EventTypeGRPCClient, EventTypeHTTPClient: + case request.EventTypeGRPCClient, request.EventTypeHTTPClient: if peerInfo, ok := md.kube.GetInfo(span.Host); ok { span.Metadata = appendDSTMetadata(span.Metadata, peerInfo) } @@ -113,18 +114,18 @@ func (md *metadataDecorator) do(span *HTTPRequestSpan) { // TODO: allow users to filter which attributes they want, instead of adding all of them // TODO: cache -func appendDSTMetadata(dst []MetadataTag, info *kube.Info) []MetadataTag { +func appendDSTMetadata(dst []request.MetadataTag, info *kube.Info) []request.MetadataTag { return append(dst, - MetadataTag{Key: "k8s.dst.namespace", Val: info.Namespace}, - MetadataTag{Key: "k8s.dst.name", Val: info.Name}, - MetadataTag{Key: "k8s.dst.type", Val: info.Type}, + request.MetadataTag{Key: "k8s.dst.namespace", Val: info.Namespace}, + request.MetadataTag{Key: "k8s.dst.name", Val: info.Name}, + request.MetadataTag{Key: "k8s.dst.type", Val: info.Type}, ) } -func appendSRCMetadata(dst []MetadataTag, info *kube.Info) []MetadataTag { +func appendSRCMetadata(dst []request.MetadataTag, info *kube.Info) []request.MetadataTag { return append(dst, - MetadataTag{Key: "k8s.src.namespace", Val: info.Namespace}, - MetadataTag{Key: "k8s.src.name", Val: info.Name}, + request.MetadataTag{Key: "k8s.src.namespace", Val: info.Namespace}, + request.MetadataTag{Key: "k8s.src.name", Val: info.Name}, ) } diff --git a/pkg/internal/transform/routes.go b/pkg/internal/transform/routes.go index 88c6a557f..9d338d04b 100644 --- a/pkg/internal/transform/routes.go +++ b/pkg/internal/transform/routes.go @@ -7,6 +7,7 @@ import ( "github.com/mariomac/pipes/pkg/node" "golang.org/x/exp/slog" + "github.com/grafana/beyla/pkg/internal/request" "github.com/grafana/beyla/pkg/internal/transform/route" ) @@ -34,9 +35,9 @@ type RoutesConfig struct { Patterns []string `yaml:"patterns"` } -func RoutesProvider(_ context.Context, rc *RoutesConfig) (node.MiddleFunc[[]HTTPRequestSpan, []HTTPRequestSpan], error) { +func RoutesProvider(_ context.Context, rc *RoutesConfig) (node.MiddleFunc[[]request.Span, []request.Span], error) { // set default value for Unmatch action - var unmatchAction func(span *HTTPRequestSpan) + var unmatchAction func(span *request.Span) switch rc.Unmatch { case UnmatchWildcard, "": // default unmatchAction = setUnmatchToWildcard @@ -51,7 +52,7 @@ func RoutesProvider(_ context.Context, rc *RoutesConfig) (node.MiddleFunc[[]HTTP unmatchAction = setUnmatchToWildcard } matcher := route.NewMatcher(rc.Patterns) - return func(in <-chan []HTTPRequestSpan, out chan<- []HTTPRequestSpan) { + return func(in <-chan []request.Span, out chan<- []request.Span) { for spans := range in { for i := range spans { spans[i].Route = matcher.Find(spans[i].Path) @@ -62,15 +63,15 @@ func RoutesProvider(_ context.Context, rc *RoutesConfig) (node.MiddleFunc[[]HTTP }, nil } -func leaveUnmatchEmpty(_ *HTTPRequestSpan) {} +func leaveUnmatchEmpty(_ *request.Span) {} -func setUnmatchToWildcard(str *HTTPRequestSpan) { +func setUnmatchToWildcard(str *request.Span) { if str.Route == "" { str.Route = wildCard } } -func setUnmatchToPath(str *HTTPRequestSpan) { +func setUnmatchToPath(str *request.Span) { if str.Route == "" { str.Route = str.Path } diff --git a/pkg/internal/transform/routes_test.go b/pkg/internal/transform/routes_test.go index 767a3f906..12fc76aaa 100644 --- a/pkg/internal/transform/routes_test.go +++ b/pkg/internal/transform/routes_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/grafana/beyla/pkg/internal/request" "github.com/grafana/beyla/pkg/internal/testutil" ) @@ -18,16 +19,16 @@ func TestUnmatchedWildcard(t *testing.T) { t.Run(string(tc), func(t *testing.T) { router, err := RoutesProvider(context.TODO(), &RoutesConfig{Unmatch: tc, Patterns: []string{"/user/:id"}}) require.NoError(t, err) - in, out := make(chan []HTTPRequestSpan, 10), make(chan []HTTPRequestSpan, 10) + in, out := make(chan []request.Span, 10), make(chan []request.Span, 10) defer close(in) go router(in, out) - in <- []HTTPRequestSpan{{Path: "/user/1234"}} - assert.Equal(t, []HTTPRequestSpan{{ + in <- []request.Span{{Path: "/user/1234"}} + assert.Equal(t, []request.Span{{ Path: "/user/1234", Route: "/user/:id", }}, testutil.ReadChannel(t, out, testTimeout)) - in <- []HTTPRequestSpan{{Path: "/some/path"}} - assert.Equal(t, []HTTPRequestSpan{{ + in <- []request.Span{{Path: "/some/path"}} + assert.Equal(t, []request.Span{{ Path: "/some/path", Route: "*", }}, testutil.ReadChannel(t, out, testTimeout)) @@ -38,16 +39,16 @@ func TestUnmatchedWildcard(t *testing.T) { func TestUnmatchedPath(t *testing.T) { router, err := RoutesProvider(context.TODO(), &RoutesConfig{Unmatch: UnmatchPath, Patterns: []string{"/user/:id"}}) require.NoError(t, err) - in, out := make(chan []HTTPRequestSpan, 10), make(chan []HTTPRequestSpan, 10) + in, out := make(chan []request.Span, 10), make(chan []request.Span, 10) defer close(in) go router(in, out) - in <- []HTTPRequestSpan{{Path: "/user/1234"}} - assert.Equal(t, []HTTPRequestSpan{{ + in <- []request.Span{{Path: "/user/1234"}} + assert.Equal(t, []request.Span{{ Path: "/user/1234", Route: "/user/:id", }}, testutil.ReadChannel(t, out, testTimeout)) - in <- []HTTPRequestSpan{{Path: "/some/path"}} - assert.Equal(t, []HTTPRequestSpan{{ + in <- []request.Span{{Path: "/some/path"}} + assert.Equal(t, []request.Span{{ Path: "/some/path", Route: "/some/path", }}, testutil.ReadChannel(t, out, testTimeout)) @@ -56,16 +57,16 @@ func TestUnmatchedPath(t *testing.T) { func TestUnmatchedEmpty(t *testing.T) { router, err := RoutesProvider(context.TODO(), &RoutesConfig{Unmatch: UnmatchUnset, Patterns: []string{"/user/:id"}}) require.NoError(t, err) - in, out := make(chan []HTTPRequestSpan, 10), make(chan []HTTPRequestSpan, 10) + in, out := make(chan []request.Span, 10), make(chan []request.Span, 10) defer close(in) go router(in, out) - in <- []HTTPRequestSpan{{Path: "/user/1234"}} - assert.Equal(t, []HTTPRequestSpan{{ + in <- []request.Span{{Path: "/user/1234"}} + assert.Equal(t, []request.Span{{ Path: "/user/1234", Route: "/user/:id", }}, testutil.ReadChannel(t, out, testTimeout)) - in <- []HTTPRequestSpan{{Path: "/some/path"}} - assert.Equal(t, []HTTPRequestSpan{{ + in <- []request.Span{{Path: "/some/path"}} + assert.Equal(t, []request.Span{{ Path: "/some/path", }}, testutil.ReadChannel(t, out, testTimeout)) } diff --git a/pkg/internal/transform/spanner.go b/pkg/internal/transform/spanner.go deleted file mode 100644 index 29fbf0eca..000000000 --- a/pkg/internal/transform/spanner.go +++ /dev/null @@ -1,218 +0,0 @@ -package transform - -import ( - "bytes" - "net" - "strconv" - "strings" - "time" - - "github.com/gavv/monotime" - "golang.org/x/exp/slog" - - ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" - httpfltr "github.com/grafana/beyla/pkg/internal/ebpf/httpfltr" -) - -type EventType int - -const ( - EventTypeHTTP EventType = iota + 1 - EventTypeGRPC - EventTypeHTTPClient - EventTypeGRPCClient -) - -var log = slog.With("component", "goexec.spanner") - -type converter struct { - clock func() time.Time - monoClock func() time.Duration -} - -var clocks = converter{monoClock: monotime.Now, clock: time.Now} - -// HTTPRequestSpan contains the information being submitted by the following nodes in the graph. -// It enables comfortable handling of data from Go. -type HTTPRequestSpan struct { - Type EventType - ID uint64 - Method string - Path string - Route string - Peer string - Host string - HostPort int - Status int - ContentLength int64 - RequestStart int64 - Start int64 - End int64 - ServiceName string - Metadata []MetadataTag - TraceID string -} - -type MetadataTag struct { - Key string - Val string -} - -func ConvertToSpan(in <-chan []any, out chan<- []HTTPRequestSpan) { - for traces := range in { - spans := make([]HTTPRequestSpan, 0, len(traces)) - for i := range traces { - v := traces[i] - - switch t := v.(type) { - case ebpfcommon.HTTPRequestTrace: - httpTrace := t - spans = append(spans, convertFromHTTPTrace(&httpTrace)) - case httpfltr.HTTPInfo: - info := t - spans = append(spans, convertFromHTTPInfo(&info)) - } - } - out <- spans - } -} - -func extractHostPort(b []uint8) (string, int) { - addrLen := bytes.IndexByte(b, 0) - if addrLen < 0 { - addrLen = len(b) - } - - peer := "" - peerPort := 0 - - if addrLen > 0 { - addr := string(b[:addrLen]) - ip, port, err := net.SplitHostPort(addr) - if err != nil { - peer = addr - } else { - peer = ip - peerPort, _ = strconv.Atoi(port) - } - } - - return peer, peerPort -} - -func extractIP(b []uint8, size int) string { - if size > len(b) { - size = len(b) - } - return net.IP(b[:size]).String() -} - -func extractTraceID(traceparent [55]byte) string { - // If traceparent was not set in eBPF, entire field should be zeroed bytes. - if traceparent[0] == 0 { - return "" - } - - // It is assumed that eBPF code has already verified the length is exactly 55 - // See https://www.w3.org/TR/trace-context/#traceparent-header-field-values for format. - // 2 hex version + dash + 32 hex traceID + dash + 16 hex parent + dash + 2 hex flags - return string(traceparent[3:35]) -} - -func (s *HTTPRequestSpan) Inside(parent *HTTPRequestSpan) bool { - return s.RequestStart >= parent.RequestStart && s.End <= parent.End -} - -type Timings struct { - RequestStart time.Time - Start time.Time - End time.Time -} - -func (s *HTTPRequestSpan) Timings() Timings { - now := clocks.clock() - monoNow := clocks.monoClock() - startDelta := monoNow - time.Duration(s.Start) - endDelta := monoNow - time.Duration(s.End) - goStartDelta := monoNow - time.Duration(s.RequestStart) - - return Timings{ - RequestStart: now.Add(-goStartDelta), - Start: now.Add(-startDelta), - End: now.Add(-endDelta), - } -} - -func convertFromHTTPTrace(trace *ebpfcommon.HTTPRequestTrace) HTTPRequestSpan { - // From C, assuming 0-ended strings - methodLen := bytes.IndexByte(trace.Method[:], 0) - if methodLen < 0 { - methodLen = len(trace.Method) - } - pathLen := bytes.IndexByte(trace.Path[:], 0) - if pathLen < 0 { - pathLen = len(trace.Path) - } - - peer := "" - hostname := "" - hostPort := 0 - traceID := "" - - switch EventType(trace.Type) { - case EventTypeHTTPClient, EventTypeHTTP: - peer, _ = extractHostPort(trace.RemoteAddr[:]) - hostname, hostPort = extractHostPort(trace.Host[:]) - traceID = extractTraceID(trace.Traceparent) - case EventTypeGRPC: - hostPort = int(trace.HostPort) - peer = extractIP(trace.RemoteAddr[:], int(trace.RemoteAddrLen)) - hostname = extractIP(trace.Host[:], int(trace.HostLen)) - case EventTypeGRPCClient: - hostname, hostPort = extractHostPort(trace.Host[:]) - default: - log.Warn("unknown trace type %d", trace.Type) - } - - return HTTPRequestSpan{ - Type: EventType(trace.Type), - ID: trace.Id, - Method: string(trace.Method[:methodLen]), - Path: string(trace.Path[:pathLen]), - Peer: peer, - Host: hostname, - HostPort: hostPort, - ContentLength: trace.ContentLength, - RequestStart: int64(trace.GoStartMonotimeNs), - Start: int64(trace.StartMonotimeNs), - End: int64(trace.EndMonotimeNs), - Status: int(trace.Status), - TraceID: traceID, - } -} - -func removeQuery(url string) string { - idx := strings.IndexByte(url, '?') - if idx > 0 { - return url[:idx] - } - return url -} - -func convertFromHTTPInfo(info *httpfltr.HTTPInfo) HTTPRequestSpan { - return HTTPRequestSpan{ - Type: EventType(info.Type), - ID: 0, - Method: info.Method, - Path: removeQuery(info.URL), - Peer: info.Peer, - Host: info.Host, - HostPort: int(info.ConnInfo.D_port), - ContentLength: int64(info.Len), - RequestStart: int64(info.StartMonotimeNs), - Start: int64(info.StartMonotimeNs), - End: int64(info.EndMonotimeNs), - Status: int(info.Status), - ServiceName: info.Comm, - } -}