diff --git a/bpf/http_maps.h b/bpf/http_maps.h new file mode 100644 index 000000000..46865f45c --- /dev/null +++ b/bpf/http_maps.h @@ -0,0 +1,25 @@ +#ifndef HTTP_MAPS_H +#define HTTP_MAPS_H + +#include "vmlinux.h" +#include "bpf_helpers.h" +#include "http_types.h" + +// Keeps track of the ongoing http connections we match for request/response +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __type(key, pid_connection_info_t); + __type(value, http_info_t); + __uint(max_entries, MAX_CONCURRENT_SHARED_REQUESTS); + __uint(pinning, BEYLA_PIN_INTERNAL); +} ongoing_http SEC(".maps"); + +struct { + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __type(key, connection_info_t); + __type(value, http_info_t); + __uint(max_entries, 1024); + __uint(pinning, BEYLA_PIN_INTERNAL); +} ongoing_http_fallback SEC(".maps"); + +#endif diff --git a/bpf/http_ssl_defs.h b/bpf/http_ssl_defs.h index bddc1999a..ddd2453d6 100644 --- a/bpf/http_ssl_defs.h +++ b/bpf/http_ssl_defs.h @@ -6,7 +6,6 @@ #include "http_types.h" #include "k_tracer_defs.h" #include "bpf_dbg.h" -#include "pid.h" #include "sockaddr.h" #include "tcp_info.h" #include "pin_internal.h" diff --git a/bpf/k_tracer.h b/bpf/k_tracer.h index 6d31c44ad..d393fb5ca 100644 --- a/bpf/k_tracer.h +++ b/bpf/k_tracer.h @@ -7,7 +7,6 @@ #include "tcp_info.h" #include "k_tracer_defs.h" #include "http_ssl_defs.h" -#include "tc_ip.h" #include "pin_internal.h" // Temporary tracking of accept arguments @@ -691,91 +690,3 @@ int BPF_KPROBE(kprobe_sys_exit, int status) { return 0; } - -SEC("tc_ingress") -int app_ingress(struct __sk_buff *skb) { - //bpf_printk("ingress"); - - protocol_info_t tcp = {}; - connection_info_t conn = {}; - - if (!read_sk_buff(skb, &tcp, &conn)) { - return 0; - } - - if (tcp_ack(&tcp)) { // ack field must be set, which means we are looking at non SYN packet - // assumes we are the only ones that added options, this can be improved - if (tcp.h_proto == ETH_P_IP && tcp.ip_len == MIN_IP_LEN + MAX_TC_TP_LEN) { - parse_ip_options_ipv4(skb, &conn, &tcp); - } else if (tcp.h_proto == ETH_P_IPV6 && - tcp.l4_proto == IP_V6_DEST_OPTS) { // Destination options used - parse_ip_options_ipv6(skb, &conn, &tcp); - } - } - - return 0; -} - -static __always_inline void -update_outgoing_request_span_id(connection_info_t *conn, protocol_info_t *tcp, tp_info_pid_t *tp) { - pid_connection_info_t p_conn = {}; - __builtin_memcpy(&p_conn.conn, conn, sizeof(connection_info_t)); - p_conn.pid = tp->pid; - - http_info_t *h_info = bpf_map_lookup_elem(&ongoing_http, &p_conn); - if (h_info && tp->valid) { - bpf_printk("Found HTTP info, resetting the span id to %x%x", tcp->seq, tcp->ack); - *((u32 *)(&h_info->tp.span_id[0])) = tcp->seq; - *((u32 *)(&h_info->tp.span_id[4])) = tcp->ack; - } -} - -static __always_inline void encode_data_in_ip_options(struct __sk_buff *skb, - connection_info_t *conn, - protocol_info_t *tcp, - tp_info_pid_t *tp) { - // Handling IPv4 - // We only do this if the IP header doesn't have any options, this can be improved if needed - if (tcp->h_proto == ETH_P_IP && tcp->ip_len == MIN_IP_LEN) { - bpf_printk("Adding the trace_id in the IP Options"); - - if (inject_tc_ip_options_ipv4(skb, conn, tcp, tp)) { - update_outgoing_request_span_id(conn, tcp, tp); - } - - bpf_map_delete_elem(&outgoing_trace_map, conn); - } else if (tcp->h_proto == ETH_P_IPV6 && tcp->l4_proto == IPPROTO_TCP) { // Handling IPv6 - bpf_printk("Found IPv6 header"); - - if (inject_tc_ip_options_ipv6(skb, conn, tcp, tp)) { - update_outgoing_request_span_id(conn, tcp, tp); - } - - bpf_map_delete_elem(&outgoing_trace_map, conn); - } -} - -SEC("tc_egress") -int app_egress(struct __sk_buff *skb) { - //bpf_printk("egress"); - - protocol_info_t tcp = {}; - connection_info_t conn = {}; - - if (!read_sk_buff(skb, &tcp, &conn)) { - return 0; - } - - sort_connection_info(&conn); - - tp_info_pid_t *tp = bpf_map_lookup_elem(&outgoing_trace_map, &conn); - - if (tp) { - bpf_printk("egress flags %x, sequence %x", tcp.flags, tcp.seq); - print_http_connection_info(&conn); - - encode_data_in_ip_options(skb, &conn, &tcp, tp); - } - - return 0; -} diff --git a/bpf/protocol_common.h b/bpf/protocol_common.h index 234bf4805..8a6c71ad0 100644 --- a/bpf/protocol_common.h +++ b/bpf/protocol_common.h @@ -5,7 +5,6 @@ #include "bpf_helpers.h" #include "http_types.h" #include "ringbuf.h" -#include "pid.h" #include "bpf_dbg.h" #include "pin_internal.h" diff --git a/bpf/protocol_http.h b/bpf/protocol_http.h index 110050016..276656a95 100644 --- a/bpf/protocol_http.h +++ b/bpf/protocol_http.h @@ -5,11 +5,11 @@ #include "bpf_helpers.h" #include "http_types.h" #include "ringbuf.h" -#include "pid.h" #include "runtime.h" #include "protocol_common.h" #include "trace_common.h" #include "pin_internal.h" +#include "http_maps.h" volatile const u32 high_request_volume; @@ -22,23 +22,6 @@ struct { __uint(max_entries, 1); } http_info_mem SEC(".maps"); -// Keeps track of the ongoing http connections we match for request/response -struct { - __uint(type, BPF_MAP_TYPE_LRU_HASH); - __type(key, pid_connection_info_t); - __type(value, http_info_t); - __uint(max_entries, MAX_CONCURRENT_SHARED_REQUESTS); - __uint(pinning, BEYLA_PIN_INTERNAL); -} ongoing_http SEC(".maps"); - -struct { - __uint(type, BPF_MAP_TYPE_LRU_HASH); - __type(key, connection_info_t); - __type(value, http_info_t); - __uint(max_entries, 1024); - __uint(pinning, BEYLA_PIN_INTERNAL); -} ongoing_http_fallback SEC(".maps"); - // empty_http_info zeroes and return the unique percpu copy in the map // this function assumes that a given thread is not trying to use many // instances at the same time diff --git a/bpf/protocol_http2.h b/bpf/protocol_http2.h index fcab9a37b..0fa4afea8 100644 --- a/bpf/protocol_http2.h +++ b/bpf/protocol_http2.h @@ -5,7 +5,6 @@ #include "bpf_helpers.h" #include "http_types.h" #include "ringbuf.h" -#include "pid.h" #include "protocol_common.h" #include "http2_grpc.h" #include "pin_internal.h" diff --git a/bpf/protocol_tcp.h b/bpf/protocol_tcp.h index a72381014..9de0c40d0 100644 --- a/bpf/protocol_tcp.h +++ b/bpf/protocol_tcp.h @@ -5,7 +5,6 @@ #include "bpf_helpers.h" #include "http_types.h" #include "ringbuf.h" -#include "pid.h" #include "protocol_common.h" #include "trace_common.h" #include "pin_internal.h" diff --git a/bpf/tc_tracer.c b/bpf/tc_tracer.c new file mode 100644 index 000000000..38bac5473 --- /dev/null +++ b/bpf/tc_tracer.c @@ -0,0 +1,98 @@ +#include "vmlinux.h" +#include "bpf_helpers.h" +#include "bpf_dbg.h" + +#include "http_maps.h" +#include "http_types.h" +#include "tc_ip.h" +#include "tcp_info.h" + +char __license[] SEC("license") = "Dual MIT/GPL"; + +SEC("tc_ingress") +int app_ingress(struct __sk_buff *skb) { + //bpf_printk("ingress"); + + protocol_info_t tcp = {}; + connection_info_t conn = {}; + + if (!read_sk_buff(skb, &tcp, &conn)) { + return 0; + } + + if (tcp_ack(&tcp)) { // ack field must be set, which means we are looking at non SYN packet + // assumes we are the only ones that added options, this can be improved + if (tcp.h_proto == ETH_P_IP && tcp.ip_len == MIN_IP_LEN + MAX_TC_TP_LEN) { + parse_ip_options_ipv4(skb, &conn, &tcp); + } else if (tcp.h_proto == ETH_P_IPV6 && + tcp.l4_proto == IP_V6_DEST_OPTS) { // Destination options used + parse_ip_options_ipv6(skb, &conn, &tcp); + } + } + + return 0; +} + +static __always_inline void +update_outgoing_request_span_id(connection_info_t *conn, protocol_info_t *tcp, tp_info_pid_t *tp) { + pid_connection_info_t p_conn = {}; + __builtin_memcpy(&p_conn.conn, conn, sizeof(connection_info_t)); + p_conn.pid = tp->pid; + + http_info_t *h_info = bpf_map_lookup_elem(&ongoing_http, &p_conn); + if (h_info && tp->valid) { + bpf_printk("Found HTTP info, resetting the span id to %x%x", tcp->seq, tcp->ack); + *((u32 *)(&h_info->tp.span_id[0])) = tcp->seq; + *((u32 *)(&h_info->tp.span_id[4])) = tcp->ack; + } +} + +static __always_inline void encode_data_in_ip_options(struct __sk_buff *skb, + connection_info_t *conn, + protocol_info_t *tcp, + tp_info_pid_t *tp) { + // Handling IPv4 + // We only do this if the IP header doesn't have any options, this can be improved if needed + if (tcp->h_proto == ETH_P_IP && tcp->ip_len == MIN_IP_LEN) { + bpf_printk("Adding the trace_id in the IP Options"); + + if (inject_tc_ip_options_ipv4(skb, conn, tcp, tp)) { + update_outgoing_request_span_id(conn, tcp, tp); + } + + bpf_map_delete_elem(&outgoing_trace_map, conn); + } else if (tcp->h_proto == ETH_P_IPV6 && tcp->l4_proto == IPPROTO_TCP) { // Handling IPv6 + bpf_printk("Found IPv6 header"); + + if (inject_tc_ip_options_ipv6(skb, conn, tcp, tp)) { + update_outgoing_request_span_id(conn, tcp, tp); + } + + bpf_map_delete_elem(&outgoing_trace_map, conn); + } +} + +SEC("tc_egress") +int app_egress(struct __sk_buff *skb) { + //bpf_printk("egress"); + + protocol_info_t tcp = {}; + connection_info_t conn = {}; + + if (!read_sk_buff(skb, &tcp, &conn)) { + return 0; + } + + sort_connection_info(&conn); + + tp_info_pid_t *tp = bpf_map_lookup_elem(&outgoing_trace_map, &conn); + + if (tp) { + bpf_printk("egress flags %x, sequence %x", tcp.flags, tcp.seq); + print_http_connection_info(&conn); + + encode_data_in_ip_options(skb, &conn, &tcp, tp); + } + + return 0; +} diff --git a/pkg/beyla/os.go b/pkg/beyla/os.go index b6833ce82..af416fa2c 100644 --- a/pkg/beyla/os.go +++ b/pkg/beyla/os.go @@ -83,6 +83,10 @@ func checkCapabilitiesForSetOptions(config *Config, caps *helpers.OSCapabilities if config.Enabled(FeatureAppO11y) { testAndSet(caps, capError, unix.CAP_CHECKPOINT_RESTORE) testAndSet(caps, capError, unix.CAP_SYS_PTRACE) + + if config.EBPF.UseLinuxTC { + testAndSet(caps, capError, unix.CAP_NET_ADMIN) + } } if config.Enabled(FeatureNetO11y) { diff --git a/pkg/components/beyla.go b/pkg/components/beyla.go index bba274c8a..a12941ea5 100644 --- a/pkg/components/beyla.go +++ b/pkg/components/beyla.go @@ -60,8 +60,12 @@ func RunBeyla(ctx context.Context, cfg *beyla.Config) error { func setupAppO11y(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config) error { slog.Info("starting Beyla in Application Observability mode") + + wg := sync.WaitGroup{} + defer wg.Wait() + instr := appolly.New(ctx, ctxInfo, config) - if err := instr.FindAndInstrument(); err != nil { + if err := instr.FindAndInstrument(&wg); err != nil { return fmt.Errorf("can't find target process: %w", err) } if err := instr.ReadAndForward(); err != nil { diff --git a/pkg/internal/appolly/appolly.go b/pkg/internal/appolly/appolly.go index d28b5c50f..4ec7e8b1f 100644 --- a/pkg/internal/appolly/appolly.go +++ b/pkg/internal/appolly/appolly.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "log/slog" + "sync" "github.com/grafana/beyla/pkg/beyla" "github.com/grafana/beyla/pkg/internal/discover" @@ -43,7 +44,7 @@ func New(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config) // FindAndInstrument searches in background for any new executable matching the // selection criteria. -func (i *Instrumenter) FindAndInstrument() error { +func (i *Instrumenter) FindAndInstrument(wg *sync.WaitGroup) error { finder := discover.NewProcessFinder(i.ctx, i.config, i.ctxInfo, i.tracesInput) foundProcesses, deletedProcesses, err := finder.Start() if err != nil { @@ -51,7 +52,9 @@ func (i *Instrumenter) FindAndInstrument() error { } // In background, listen indefinitely for each new process and run its // associated ebpf.ProcessTracer once it is found. + wg.Add(1) go func() { + defer wg.Done() log := log() type cancelCtx struct { ctx context.Context @@ -76,7 +79,11 @@ func (i *Instrumenter) FindAndInstrument() error { cctx.ctx, cctx.cancel = context.WithCancel(i.ctx) contexts[pt.FileInfo.Ino] = cctx } - go pt.Tracer.Run(cctx.ctx, i.tracesInput) + wg.Add(1) + go func() { + defer wg.Done() + pt.Tracer.Run(cctx.ctx, i.tracesInput) + }() } case dp := <-deletedProcesses: log.Debug("stopping ProcessTracer because there are no more instances of such process", diff --git a/pkg/internal/discover/finder.go b/pkg/internal/discover/finder.go index d959a7344..456325c8a 100644 --- a/pkg/internal/discover/finder.go +++ b/pkg/internal/discover/finder.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/beyla/pkg/internal/ebpf" "github.com/grafana/beyla/pkg/internal/ebpf/generictracer" "github.com/grafana/beyla/pkg/internal/ebpf/gotracer" + "github.com/grafana/beyla/pkg/internal/ebpf/tctracer" "github.com/grafana/beyla/pkg/internal/imetrics" "github.com/grafana/beyla/pkg/internal/pipe/global" "github.com/grafana/beyla/pkg/internal/request" @@ -97,5 +98,9 @@ func newGoTracersGroup(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Trac } func newGenericTracersGroup(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Tracer { + if cfg.EBPF.UseLinuxTC { + return []ebpf.Tracer{generictracer.New(cfg, metrics), tctracer.New(cfg)} + } + return []ebpf.Tracer{generictracer.New(cfg, metrics)} } diff --git a/pkg/internal/ebpf/common/ringbuf.go b/pkg/internal/ebpf/common/ringbuf.go index 99831b7bd..0ffd6a9b1 100644 --- a/pkg/internal/ebpf/common/ringbuf.go +++ b/pkg/internal/ebpf/common/ringbuf.go @@ -104,7 +104,7 @@ func (rbf *ringBufForwarder) sharedReadAndForward(ctx context.Context, closers [ rbf.spansLen = 0 // If the underlying context is closed, it closes the objects we have allocated for this bpf program - go rbf.bgListenSharedContextCancelation(ctx, closers) + go rbf.bgListenSharedContextCancelation(ctx, closers, eventsReader) rbf.readAndForwardInner(eventsReader, spansChan) } @@ -221,12 +221,13 @@ func (rbf *ringBufForwarder) bgListenContextCancelation(ctx context.Context, eve _ = eventsReader.Close() } -func (rbf *ringBufForwarder) bgListenSharedContextCancelation(ctx context.Context, closers []io.Closer) { +func (rbf *ringBufForwarder) bgListenSharedContextCancelation(ctx context.Context, closers []io.Closer, eventsReader ringBufReader) { <-ctx.Done() rbf.logger.Debug("context is cancelled. Closing eBPF resources") for _, c := range closers { _ = c.Close() } + _ = eventsReader.Close() } func (rbf *ringBufForwarder) closeAllResources() { diff --git a/pkg/internal/ebpf/generictracer/bpf_arm64_bpfel.go b/pkg/internal/ebpf/generictracer/bpf_arm64_bpfel.go index 6fb0016d8..2573268e6 100644 --- a/pkg/internal/ebpf/generictracer/bpf_arm64_bpfel.go +++ b/pkg/internal/ebpf/generictracer/bpf_arm64_bpfel.go @@ -238,8 +238,6 @@ type bpfSpecs struct { // // It can be passed ebpf.CollectionSpec.Assign. type bpfProgramSpecs struct { - AppEgress *ebpf.ProgramSpec `ebpf:"app_egress"` - AppIngress *ebpf.ProgramSpec `ebpf:"app_ingress"` AsyncReset *ebpf.ProgramSpec `ebpf:"async_reset"` AsyncResetRet *ebpf.ProgramSpec `ebpf:"async_reset_ret"` EmitAsyncInit *ebpf.ProgramSpec `ebpf:"emit_async_init"` @@ -426,8 +424,6 @@ func (m *bpfMaps) Close() error { // // It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. type bpfPrograms struct { - AppEgress *ebpf.Program `ebpf:"app_egress"` - AppIngress *ebpf.Program `ebpf:"app_ingress"` AsyncReset *ebpf.Program `ebpf:"async_reset"` AsyncResetRet *ebpf.Program `ebpf:"async_reset_ret"` EmitAsyncInit *ebpf.Program `ebpf:"emit_async_init"` @@ -463,8 +459,6 @@ type bpfPrograms struct { func (p *bpfPrograms) Close() error { return _BpfClose( - p.AppEgress, - p.AppIngress, p.AsyncReset, p.AsyncResetRet, p.EmitAsyncInit, diff --git a/pkg/internal/ebpf/generictracer/bpf_arm64_bpfel.o b/pkg/internal/ebpf/generictracer/bpf_arm64_bpfel.o index bb0c1e42f..db6322ad6 100644 --- a/pkg/internal/ebpf/generictracer/bpf_arm64_bpfel.o +++ b/pkg/internal/ebpf/generictracer/bpf_arm64_bpfel.o @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:aa4015886560c0f21873c8734c6af04a466210811eb8b9088511c079b3a91cbb -size 503760 +oid sha256:c3a955e4580ca0c0bf747e6d8327371f8909e71ea239498bc4f111e952c840aa +size 466944 diff --git a/pkg/internal/ebpf/generictracer/bpf_debug_arm64_bpfel.go b/pkg/internal/ebpf/generictracer/bpf_debug_arm64_bpfel.go index cd308fb75..1639ea8c2 100644 --- a/pkg/internal/ebpf/generictracer/bpf_debug_arm64_bpfel.go +++ b/pkg/internal/ebpf/generictracer/bpf_debug_arm64_bpfel.go @@ -238,8 +238,6 @@ type bpf_debugSpecs struct { // // It can be passed ebpf.CollectionSpec.Assign. type bpf_debugProgramSpecs struct { - AppEgress *ebpf.ProgramSpec `ebpf:"app_egress"` - AppIngress *ebpf.ProgramSpec `ebpf:"app_ingress"` AsyncReset *ebpf.ProgramSpec `ebpf:"async_reset"` AsyncResetRet *ebpf.ProgramSpec `ebpf:"async_reset_ret"` EmitAsyncInit *ebpf.ProgramSpec `ebpf:"emit_async_init"` @@ -429,8 +427,6 @@ func (m *bpf_debugMaps) Close() error { // // It can be passed to loadBpf_debugObjects or ebpf.CollectionSpec.LoadAndAssign. type bpf_debugPrograms struct { - AppEgress *ebpf.Program `ebpf:"app_egress"` - AppIngress *ebpf.Program `ebpf:"app_ingress"` AsyncReset *ebpf.Program `ebpf:"async_reset"` AsyncResetRet *ebpf.Program `ebpf:"async_reset_ret"` EmitAsyncInit *ebpf.Program `ebpf:"emit_async_init"` @@ -466,8 +462,6 @@ type bpf_debugPrograms struct { func (p *bpf_debugPrograms) Close() error { return _Bpf_debugClose( - p.AppEgress, - p.AppIngress, p.AsyncReset, p.AsyncResetRet, p.EmitAsyncInit, diff --git a/pkg/internal/ebpf/generictracer/bpf_debug_arm64_bpfel.o b/pkg/internal/ebpf/generictracer/bpf_debug_arm64_bpfel.o index a7d3ffd21..52ff22896 100644 --- a/pkg/internal/ebpf/generictracer/bpf_debug_arm64_bpfel.o +++ b/pkg/internal/ebpf/generictracer/bpf_debug_arm64_bpfel.o @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:b29d84926317e699513e5c96ac554c1e5ac3195b72f85a2805a2e85c279c8181 -size 813536 +oid sha256:f6bb1cdffaf73894fb41d8d08e4f1fc354e0a3db7a744321cbf35d47ef8a64c8 +size 776928 diff --git a/pkg/internal/ebpf/generictracer/bpf_debug_x86_bpfel.go b/pkg/internal/ebpf/generictracer/bpf_debug_x86_bpfel.go index ac4511745..c6cd7f80a 100644 --- a/pkg/internal/ebpf/generictracer/bpf_debug_x86_bpfel.go +++ b/pkg/internal/ebpf/generictracer/bpf_debug_x86_bpfel.go @@ -238,8 +238,6 @@ type bpf_debugSpecs struct { // // It can be passed ebpf.CollectionSpec.Assign. type bpf_debugProgramSpecs struct { - AppEgress *ebpf.ProgramSpec `ebpf:"app_egress"` - AppIngress *ebpf.ProgramSpec `ebpf:"app_ingress"` AsyncReset *ebpf.ProgramSpec `ebpf:"async_reset"` AsyncResetRet *ebpf.ProgramSpec `ebpf:"async_reset_ret"` EmitAsyncInit *ebpf.ProgramSpec `ebpf:"emit_async_init"` @@ -429,8 +427,6 @@ func (m *bpf_debugMaps) Close() error { // // It can be passed to loadBpf_debugObjects or ebpf.CollectionSpec.LoadAndAssign. type bpf_debugPrograms struct { - AppEgress *ebpf.Program `ebpf:"app_egress"` - AppIngress *ebpf.Program `ebpf:"app_ingress"` AsyncReset *ebpf.Program `ebpf:"async_reset"` AsyncResetRet *ebpf.Program `ebpf:"async_reset_ret"` EmitAsyncInit *ebpf.Program `ebpf:"emit_async_init"` @@ -466,8 +462,6 @@ type bpf_debugPrograms struct { func (p *bpf_debugPrograms) Close() error { return _Bpf_debugClose( - p.AppEgress, - p.AppIngress, p.AsyncReset, p.AsyncResetRet, p.EmitAsyncInit, diff --git a/pkg/internal/ebpf/generictracer/bpf_debug_x86_bpfel.o b/pkg/internal/ebpf/generictracer/bpf_debug_x86_bpfel.o index 1883adedb..4d74cf4cd 100644 --- a/pkg/internal/ebpf/generictracer/bpf_debug_x86_bpfel.o +++ b/pkg/internal/ebpf/generictracer/bpf_debug_x86_bpfel.o @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:9c2e293c53ea9a4a75e9e2290a63126c11c6da0bc3d4173244b8aa7adc272edd -size 813056 +oid sha256:53d7c9ae1ba161fda8daf890302a78f13481f0b09e0697a7b138428642c59458 +size 776448 diff --git a/pkg/internal/ebpf/generictracer/bpf_tp_arm64_bpfel.go b/pkg/internal/ebpf/generictracer/bpf_tp_arm64_bpfel.go index 09590bac4..c5446645a 100644 --- a/pkg/internal/ebpf/generictracer/bpf_tp_arm64_bpfel.go +++ b/pkg/internal/ebpf/generictracer/bpf_tp_arm64_bpfel.go @@ -238,8 +238,6 @@ type bpf_tpSpecs struct { // // It can be passed ebpf.CollectionSpec.Assign. type bpf_tpProgramSpecs struct { - AppEgress *ebpf.ProgramSpec `ebpf:"app_egress"` - AppIngress *ebpf.ProgramSpec `ebpf:"app_ingress"` AsyncReset *ebpf.ProgramSpec `ebpf:"async_reset"` AsyncResetRet *ebpf.ProgramSpec `ebpf:"async_reset_ret"` EmitAsyncInit *ebpf.ProgramSpec `ebpf:"emit_async_init"` @@ -426,8 +424,6 @@ func (m *bpf_tpMaps) Close() error { // // It can be passed to loadBpf_tpObjects or ebpf.CollectionSpec.LoadAndAssign. type bpf_tpPrograms struct { - AppEgress *ebpf.Program `ebpf:"app_egress"` - AppIngress *ebpf.Program `ebpf:"app_ingress"` AsyncReset *ebpf.Program `ebpf:"async_reset"` AsyncResetRet *ebpf.Program `ebpf:"async_reset_ret"` EmitAsyncInit *ebpf.Program `ebpf:"emit_async_init"` @@ -463,8 +459,6 @@ type bpf_tpPrograms struct { func (p *bpf_tpPrograms) Close() error { return _Bpf_tpClose( - p.AppEgress, - p.AppIngress, p.AsyncReset, p.AsyncResetRet, p.EmitAsyncInit, diff --git a/pkg/internal/ebpf/generictracer/bpf_tp_arm64_bpfel.o b/pkg/internal/ebpf/generictracer/bpf_tp_arm64_bpfel.o index f786de955..702f321a9 100644 --- a/pkg/internal/ebpf/generictracer/bpf_tp_arm64_bpfel.o +++ b/pkg/internal/ebpf/generictracer/bpf_tp_arm64_bpfel.o @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:e4a000e33d2c08400976c97e5547914abf720d75175988e29600eda9efc3b13f -size 518384 +oid sha256:c3010994df087433bf63dc1a9b99757bd07c770cc93c060685e7b57a27972a41 +size 481592 diff --git a/pkg/internal/ebpf/generictracer/bpf_tp_debug_arm64_bpfel.go b/pkg/internal/ebpf/generictracer/bpf_tp_debug_arm64_bpfel.go index 9271418a1..f996f2686 100644 --- a/pkg/internal/ebpf/generictracer/bpf_tp_debug_arm64_bpfel.go +++ b/pkg/internal/ebpf/generictracer/bpf_tp_debug_arm64_bpfel.go @@ -238,8 +238,6 @@ type bpf_tp_debugSpecs struct { // // It can be passed ebpf.CollectionSpec.Assign. type bpf_tp_debugProgramSpecs struct { - AppEgress *ebpf.ProgramSpec `ebpf:"app_egress"` - AppIngress *ebpf.ProgramSpec `ebpf:"app_ingress"` AsyncReset *ebpf.ProgramSpec `ebpf:"async_reset"` AsyncResetRet *ebpf.ProgramSpec `ebpf:"async_reset_ret"` EmitAsyncInit *ebpf.ProgramSpec `ebpf:"emit_async_init"` @@ -429,8 +427,6 @@ func (m *bpf_tp_debugMaps) Close() error { // // It can be passed to loadBpf_tp_debugObjects or ebpf.CollectionSpec.LoadAndAssign. type bpf_tp_debugPrograms struct { - AppEgress *ebpf.Program `ebpf:"app_egress"` - AppIngress *ebpf.Program `ebpf:"app_ingress"` AsyncReset *ebpf.Program `ebpf:"async_reset"` AsyncResetRet *ebpf.Program `ebpf:"async_reset_ret"` EmitAsyncInit *ebpf.Program `ebpf:"emit_async_init"` @@ -466,8 +462,6 @@ type bpf_tp_debugPrograms struct { func (p *bpf_tp_debugPrograms) Close() error { return _Bpf_tp_debugClose( - p.AppEgress, - p.AppIngress, p.AsyncReset, p.AsyncResetRet, p.EmitAsyncInit, diff --git a/pkg/internal/ebpf/generictracer/bpf_tp_debug_arm64_bpfel.o b/pkg/internal/ebpf/generictracer/bpf_tp_debug_arm64_bpfel.o index 467bb8e37..f87f3dfd4 100644 --- a/pkg/internal/ebpf/generictracer/bpf_tp_debug_arm64_bpfel.o +++ b/pkg/internal/ebpf/generictracer/bpf_tp_debug_arm64_bpfel.o @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:d40c442615787c2f85b499babfa5be6fcb9c017516ed9cae829ca80a38888569 -size 831744 +oid sha256:5726821976ddf2cd4cf55a359f495944d5378feb9bc19ffc06f9c1e26b7e7109 +size 795152 diff --git a/pkg/internal/ebpf/generictracer/bpf_tp_debug_x86_bpfel.go b/pkg/internal/ebpf/generictracer/bpf_tp_debug_x86_bpfel.go index d9a185f96..c1afa6a31 100644 --- a/pkg/internal/ebpf/generictracer/bpf_tp_debug_x86_bpfel.go +++ b/pkg/internal/ebpf/generictracer/bpf_tp_debug_x86_bpfel.go @@ -238,8 +238,6 @@ type bpf_tp_debugSpecs struct { // // It can be passed ebpf.CollectionSpec.Assign. type bpf_tp_debugProgramSpecs struct { - AppEgress *ebpf.ProgramSpec `ebpf:"app_egress"` - AppIngress *ebpf.ProgramSpec `ebpf:"app_ingress"` AsyncReset *ebpf.ProgramSpec `ebpf:"async_reset"` AsyncResetRet *ebpf.ProgramSpec `ebpf:"async_reset_ret"` EmitAsyncInit *ebpf.ProgramSpec `ebpf:"emit_async_init"` @@ -429,8 +427,6 @@ func (m *bpf_tp_debugMaps) Close() error { // // It can be passed to loadBpf_tp_debugObjects or ebpf.CollectionSpec.LoadAndAssign. type bpf_tp_debugPrograms struct { - AppEgress *ebpf.Program `ebpf:"app_egress"` - AppIngress *ebpf.Program `ebpf:"app_ingress"` AsyncReset *ebpf.Program `ebpf:"async_reset"` AsyncResetRet *ebpf.Program `ebpf:"async_reset_ret"` EmitAsyncInit *ebpf.Program `ebpf:"emit_async_init"` @@ -466,8 +462,6 @@ type bpf_tp_debugPrograms struct { func (p *bpf_tp_debugPrograms) Close() error { return _Bpf_tp_debugClose( - p.AppEgress, - p.AppIngress, p.AsyncReset, p.AsyncResetRet, p.EmitAsyncInit, diff --git a/pkg/internal/ebpf/generictracer/bpf_tp_debug_x86_bpfel.o b/pkg/internal/ebpf/generictracer/bpf_tp_debug_x86_bpfel.o index af7e278a3..d7f606f1e 100644 --- a/pkg/internal/ebpf/generictracer/bpf_tp_debug_x86_bpfel.o +++ b/pkg/internal/ebpf/generictracer/bpf_tp_debug_x86_bpfel.o @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:a03de6ded0676a3a01d580f24bf97d5ec322397e0b136dc2834eb28e6e441a06 -size 831256 +oid sha256:aba3ce059d1483e7a9e870f972072f71005e54d80878985770b2a24b8e6c84fb +size 794672 diff --git a/pkg/internal/ebpf/generictracer/bpf_tp_x86_bpfel.go b/pkg/internal/ebpf/generictracer/bpf_tp_x86_bpfel.go index f8660a8b2..c001ef8b2 100644 --- a/pkg/internal/ebpf/generictracer/bpf_tp_x86_bpfel.go +++ b/pkg/internal/ebpf/generictracer/bpf_tp_x86_bpfel.go @@ -238,8 +238,6 @@ type bpf_tpSpecs struct { // // It can be passed ebpf.CollectionSpec.Assign. type bpf_tpProgramSpecs struct { - AppEgress *ebpf.ProgramSpec `ebpf:"app_egress"` - AppIngress *ebpf.ProgramSpec `ebpf:"app_ingress"` AsyncReset *ebpf.ProgramSpec `ebpf:"async_reset"` AsyncResetRet *ebpf.ProgramSpec `ebpf:"async_reset_ret"` EmitAsyncInit *ebpf.ProgramSpec `ebpf:"emit_async_init"` @@ -426,8 +424,6 @@ func (m *bpf_tpMaps) Close() error { // // It can be passed to loadBpf_tpObjects or ebpf.CollectionSpec.LoadAndAssign. type bpf_tpPrograms struct { - AppEgress *ebpf.Program `ebpf:"app_egress"` - AppIngress *ebpf.Program `ebpf:"app_ingress"` AsyncReset *ebpf.Program `ebpf:"async_reset"` AsyncResetRet *ebpf.Program `ebpf:"async_reset_ret"` EmitAsyncInit *ebpf.Program `ebpf:"emit_async_init"` @@ -463,8 +459,6 @@ type bpf_tpPrograms struct { func (p *bpf_tpPrograms) Close() error { return _Bpf_tpClose( - p.AppEgress, - p.AppIngress, p.AsyncReset, p.AsyncResetRet, p.EmitAsyncInit, diff --git a/pkg/internal/ebpf/generictracer/bpf_tp_x86_bpfel.o b/pkg/internal/ebpf/generictracer/bpf_tp_x86_bpfel.o index e28a9e4f8..5e61edf2f 100644 --- a/pkg/internal/ebpf/generictracer/bpf_tp_x86_bpfel.o +++ b/pkg/internal/ebpf/generictracer/bpf_tp_x86_bpfel.o @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:ceaa92616118a8d5edd108a17b68e2d41ba26834f85c98a3d9a3f1220c8db92b -size 517784 +oid sha256:68ddb1cea0a7886348ec3ef1cbf3f955e897222abe803f98e0736a5e22d5770e +size 480992 diff --git a/pkg/internal/ebpf/generictracer/bpf_x86_bpfel.go b/pkg/internal/ebpf/generictracer/bpf_x86_bpfel.go index 827a935a1..61fdb20dc 100644 --- a/pkg/internal/ebpf/generictracer/bpf_x86_bpfel.go +++ b/pkg/internal/ebpf/generictracer/bpf_x86_bpfel.go @@ -238,8 +238,6 @@ type bpfSpecs struct { // // It can be passed ebpf.CollectionSpec.Assign. type bpfProgramSpecs struct { - AppEgress *ebpf.ProgramSpec `ebpf:"app_egress"` - AppIngress *ebpf.ProgramSpec `ebpf:"app_ingress"` AsyncReset *ebpf.ProgramSpec `ebpf:"async_reset"` AsyncResetRet *ebpf.ProgramSpec `ebpf:"async_reset_ret"` EmitAsyncInit *ebpf.ProgramSpec `ebpf:"emit_async_init"` @@ -426,8 +424,6 @@ func (m *bpfMaps) Close() error { // // It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. type bpfPrograms struct { - AppEgress *ebpf.Program `ebpf:"app_egress"` - AppIngress *ebpf.Program `ebpf:"app_ingress"` AsyncReset *ebpf.Program `ebpf:"async_reset"` AsyncResetRet *ebpf.Program `ebpf:"async_reset_ret"` EmitAsyncInit *ebpf.Program `ebpf:"emit_async_init"` @@ -463,8 +459,6 @@ type bpfPrograms struct { func (p *bpfPrograms) Close() error { return _BpfClose( - p.AppEgress, - p.AppIngress, p.AsyncReset, p.AsyncResetRet, p.EmitAsyncInit, diff --git a/pkg/internal/ebpf/generictracer/bpf_x86_bpfel.o b/pkg/internal/ebpf/generictracer/bpf_x86_bpfel.o index ed6362b48..e8a6b077c 100644 --- a/pkg/internal/ebpf/generictracer/bpf_x86_bpfel.o +++ b/pkg/internal/ebpf/generictracer/bpf_x86_bpfel.o @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:60ce162a7c00153d2ad5a577bf4db122b75c165a0b4e0fafd249db1e6c84210d -size 503168 +oid sha256:52fb6808d0ef5d3b78ea68cb038fa869d2f172260fa206f0996236c40f8fc32f +size 466344 diff --git a/pkg/internal/ebpf/generictracer/generictracer.go b/pkg/internal/ebpf/generictracer/generictracer.go index 70af4eb2f..bc914dc10 100644 --- a/pkg/internal/ebpf/generictracer/generictracer.go +++ b/pkg/internal/ebpf/generictracer/generictracer.go @@ -4,10 +4,7 @@ package generictracer import ( "context" - "errors" - "fmt" "io" - "io/fs" "log/slog" "sync" "time" @@ -16,7 +13,6 @@ import ( "github.com/cilium/ebpf" "github.com/gavv/monotime" "github.com/vishvananda/netlink" - "golang.org/x/sys/unix" "github.com/grafana/beyla/pkg/beyla" ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" @@ -410,41 +406,6 @@ func (p *Tracer) AlreadyInstrumentedLib(id uint64) bool { } func (p *Tracer) SetupTC() { - if !p.cfg.EBPF.UseLinuxTC { - return - } - - informer := ifaces.NewWatcher(p.cfg.ChannelBufferLen) - registerer := ifaces.NewRegisterer(informer, p.cfg.ChannelBufferLen) - ctx := context.Background() - - p.log.Debug("subscribing for network interface events") - ifaceEvents, err := registerer.Subscribe(ctx) - if err != nil { - p.log.Error("instantiating interfaces' informer", "error", err) - return - } - - go func() { - for { - select { - case <-ctx.Done(): - slog.Debug("stopping interfaces' listener") - return - case event := <-ifaceEvents: - slog.Debug("received event", "event", event) - switch event.Type { - case ifaces.EventAdded: - p.registerTC(event.Interface) - case ifaces.EventDeleted: - // qdiscs, ingress and egress filters are automatically deleted so we don't need to - // specifically detach them from the ebpfFetcher - default: - slog.Warn("unknown event type", "event", event) - } - } - } - }() } func (p *Tracer) Run(ctx context.Context, eventsChan chan<- []request.Span) { @@ -468,8 +429,6 @@ func (p *Tracer) Run(ctx context.Context, eventsChan chan<- []request.Span) { p.bpfObjects.Events, p.metrics, )(ctx, append(p.closers, &p.bpfObjects), eventsChan) - - p.closeTC() } func kernelTime(ktime uint64) time.Time { @@ -539,158 +498,3 @@ func (p *Tracer) watchForMisclassifedEvents() { } } } - -func (p *Tracer) registerTC(iface ifaces.Interface) { - // Load pre-compiled programs and maps into the kernel, and rewrites the configuration - ipvlan, err := netlink.LinkByIndex(iface.Index) - if err != nil { - p.log.Error("failed to lookup ipvlan device", "index", iface.Index, "name", iface.Name, "error", err) - return - } - qdiscAttrs := netlink.QdiscAttrs{ - LinkIndex: ipvlan.Attrs().Index, - Handle: netlink.MakeHandle(0xffff, 0), - Parent: netlink.HANDLE_CLSACT, - } - qdisc := &netlink.GenericQdisc{ - QdiscAttrs: qdiscAttrs, - QdiscType: "clsact", - } - if err := netlink.QdiscDel(qdisc); err == nil { - p.log.Warn("qdisc clsact already existed. Deleted it") - } - if err := netlink.QdiscAdd(qdisc); err != nil { - if errors.Is(err, fs.ErrExist) { - p.log.Warn("qdisc clsact already exists. Ignoring", "error", err) - } else { - p.log.Error("failed to create clsact qdisc on", "index", iface.Index, "name", iface.Name, "error", err) - return - } - } - p.qdiscs[iface] = qdisc - - if err := p.registerEgress(iface, ipvlan); err != nil { - p.log.Error("failed to install egress filters", "error", err) - } - - if err := p.registerIngress(iface, ipvlan); err != nil { - p.log.Error("failed to install ingres filters", "error", err) - } -} - -func (p *Tracer) registerEgress(iface ifaces.Interface, ipvlan netlink.Link) error { - // Fetch events on egress - egressAttrs := netlink.FilterAttrs{ - LinkIndex: ipvlan.Attrs().Index, - Parent: netlink.HANDLE_MIN_EGRESS, - Handle: netlink.MakeHandle(0, 1), - Protocol: 3, - Priority: 1, - } - egressFilter := &netlink.BpfFilter{ - FilterAttrs: egressAttrs, - Fd: p.bpfObjects.AppEgress.FD(), - Name: "tc/app_tc_egress", - DirectAction: true, - } - if err := netlink.FilterDel(egressFilter); err == nil { - p.log.Warn("egress filter already existed. Deleted it") - } - if err := netlink.FilterAdd(egressFilter); err != nil { - if errors.Is(err, fs.ErrExist) { - p.log.Warn("egress filter already exists. Ignoring", "error", err) - } else { - return fmt.Errorf("failed to create egress filter: %w", err) - } - } - - p.egressFilters[iface] = egressFilter - return nil -} - -func (p *Tracer) registerIngress(iface ifaces.Interface, ipvlan netlink.Link) error { - // Fetch events on ingress - ingressAttrs := netlink.FilterAttrs{ - LinkIndex: ipvlan.Attrs().Index, - Parent: netlink.HANDLE_MIN_INGRESS, - Handle: netlink.MakeHandle(0, 1), - Protocol: unix.ETH_P_ALL, - Priority: 1, - } - ingressFilter := &netlink.BpfFilter{ - FilterAttrs: ingressAttrs, - Fd: p.bpfObjects.AppIngress.FD(), - Name: "tc/app_tc_ingress", - DirectAction: true, - } - if err := netlink.FilterDel(ingressFilter); err == nil { - p.log.Warn("ingress filter already existed. Deleted it") - } - if err := netlink.FilterAdd(ingressFilter); err != nil { - if errors.Is(err, fs.ErrExist) { - p.log.Warn("ingress filter already exists. Ignoring", "error", err) - } else { - return fmt.Errorf("failed to create ingress filter: %w", err) - } - } - - p.ingressFilters[iface] = ingressFilter - return nil -} - -func (p *Tracer) closeTC() { - p.log.Info("removing traffic control probes") - - p.bpfObjects.AppEgress.Close() - p.bpfObjects.AppIngress.Close() - - // cleanup egress - for iface, ef := range p.egressFilters { - p.log.Debug("deleting egress filter", "interface", iface) - if err := doIgnoreNoDev(netlink.FilterDel, netlink.Filter(ef)); err != nil { - p.log.Error("deleting egress filter", "error", err) - } - } - p.egressFilters = map[ifaces.Interface]*netlink.BpfFilter{} - - // cleanup ingress - for iface, igf := range p.ingressFilters { - p.log.Debug("deleting ingress filter", "interface", iface) - if err := doIgnoreNoDev(netlink.FilterDel, netlink.Filter(igf)); err != nil { - p.log.Error("deleting ingress filter", "error", err) - } - } - p.ingressFilters = map[ifaces.Interface]*netlink.BpfFilter{} - - // cleanup qdiscs - for iface, qd := range p.qdiscs { - p.log.Debug("deleting Qdisc", "interface", iface) - if err := doIgnoreNoDev(netlink.QdiscDel, netlink.Qdisc(qd)); err != nil { - p.log.Error("deleting qdisc", "error", err) - } - } - p.qdiscs = map[ifaces.Interface]*netlink.GenericQdisc{} -} - -// doIgnoreNoDev runs the provided syscall over the provided device and ignores the error -// if the cause is a non-existing device (just logs the error as debug). -// If the agent is deployed as part of the Network Metrics pipeline, normally -// undeploying the FlowCollector could cause the agent to try to remove resources -// from Pods that have been removed immediately before (e.g. flowlogs-pipeline or the -// console plugin), so we avoid logging some errors that would unnecessarily raise the -// user's attention. -// This function uses generics because the set of provided functions accept different argument -// types. -func doIgnoreNoDev[T any](sysCall func(T) error, dev T) error { - if err := sysCall(dev); err != nil { - if errors.Is(err, unix.ENODEV) { - slog.Error("can't delete. Ignore this error if other pods or interfaces "+ - " are also being deleted at this moment. For example, if you are undeploying "+ - " a FlowCollector or Deployment where this agent is part of", - "error", err) - } else { - return err - } - } - return nil -} diff --git a/pkg/internal/ebpf/tctracer/bpf_arm64_bpfel.go b/pkg/internal/ebpf/tctracer/bpf_arm64_bpfel.go new file mode 100644 index 000000000..20fbc7376 --- /dev/null +++ b/pkg/internal/ebpf/tctracer/bpf_arm64_bpfel.go @@ -0,0 +1,191 @@ +// Code generated by bpf2go; DO NOT EDIT. +//go:build arm64 + +package tctracer + +import ( + "bytes" + _ "embed" + "fmt" + "io" + + "github.com/cilium/ebpf" +) + +type bpfConnectionInfoT struct { + S_addr [16]uint8 + D_addr [16]uint8 + S_port uint16 + D_port uint16 +} + +type bpfHttpInfoT struct { + Flags uint8 + _ [1]byte + ConnInfo bpfConnectionInfoT + _ [2]byte + StartMonotimeNs uint64 + EndMonotimeNs uint64 + Buf [192]uint8 + Len uint32 + RespLen uint32 + Status uint16 + Type uint8 + Ssl uint8 + Pid struct { + HostPid uint32 + UserPid uint32 + Ns uint32 + } + Tp struct { + TraceId [16]uint8 + SpanId [8]uint8 + ParentId [8]uint8 + Ts uint64 + Flags uint8 + _ [7]byte + } + ExtraId uint64 + TaskTid uint32 + _ [4]byte +} + +type bpfPidConnectionInfoT struct { + Conn bpfConnectionInfoT + Pid uint32 +} + +type bpfTpInfoPidT struct { + Tp struct { + TraceId [16]uint8 + SpanId [8]uint8 + ParentId [8]uint8 + Ts uint64 + Flags uint8 + _ [7]byte + } + Pid uint32 + Valid uint8 + _ [3]byte +} + +// loadBpf returns the embedded CollectionSpec for bpf. +func loadBpf() (*ebpf.CollectionSpec, error) { + reader := bytes.NewReader(_BpfBytes) + spec, err := ebpf.LoadCollectionSpecFromReader(reader) + if err != nil { + return nil, fmt.Errorf("can't load bpf: %w", err) + } + + return spec, err +} + +// loadBpfObjects loads bpf and converts it into a struct. +// +// The following types are suitable as obj argument: +// +// *bpfObjects +// *bpfPrograms +// *bpfMaps +// +// See ebpf.CollectionSpec.LoadAndAssign documentation for details. +func loadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error { + spec, err := loadBpf() + if err != nil { + return err + } + + return spec.LoadAndAssign(obj, opts) +} + +// bpfSpecs contains maps and programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfSpecs struct { + bpfProgramSpecs + bpfMapSpecs +} + +// bpfSpecs contains programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfProgramSpecs struct { + AppEgress *ebpf.ProgramSpec `ebpf:"app_egress"` + AppIngress *ebpf.ProgramSpec `ebpf:"app_ingress"` +} + +// bpfMapSpecs contains maps before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfMapSpecs struct { + IncomingTraceMap *ebpf.MapSpec `ebpf:"incoming_trace_map"` + OngoingHttp *ebpf.MapSpec `ebpf:"ongoing_http"` + OngoingHttpFallback *ebpf.MapSpec `ebpf:"ongoing_http_fallback"` + OutgoingTraceMap *ebpf.MapSpec `ebpf:"outgoing_trace_map"` + TraceMap *ebpf.MapSpec `ebpf:"trace_map"` +} + +// bpfObjects contains all objects after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfObjects struct { + bpfPrograms + bpfMaps +} + +func (o *bpfObjects) Close() error { + return _BpfClose( + &o.bpfPrograms, + &o.bpfMaps, + ) +} + +// bpfMaps contains all maps after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfMaps struct { + IncomingTraceMap *ebpf.Map `ebpf:"incoming_trace_map"` + OngoingHttp *ebpf.Map `ebpf:"ongoing_http"` + OngoingHttpFallback *ebpf.Map `ebpf:"ongoing_http_fallback"` + OutgoingTraceMap *ebpf.Map `ebpf:"outgoing_trace_map"` + TraceMap *ebpf.Map `ebpf:"trace_map"` +} + +func (m *bpfMaps) Close() error { + return _BpfClose( + m.IncomingTraceMap, + m.OngoingHttp, + m.OngoingHttpFallback, + m.OutgoingTraceMap, + m.TraceMap, + ) +} + +// bpfPrograms contains all programs after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfPrograms struct { + AppEgress *ebpf.Program `ebpf:"app_egress"` + AppIngress *ebpf.Program `ebpf:"app_ingress"` +} + +func (p *bpfPrograms) Close() error { + return _BpfClose( + p.AppEgress, + p.AppIngress, + ) +} + +func _BpfClose(closers ...io.Closer) error { + for _, closer := range closers { + if err := closer.Close(); err != nil { + return err + } + } + return nil +} + +// Do not access this directly. +// +//go:embed bpf_arm64_bpfel.o +var _BpfBytes []byte diff --git a/pkg/internal/ebpf/tctracer/bpf_arm64_bpfel.o b/pkg/internal/ebpf/tctracer/bpf_arm64_bpfel.o new file mode 100644 index 000000000..2e87e2193 --- /dev/null +++ b/pkg/internal/ebpf/tctracer/bpf_arm64_bpfel.o @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9ad1eced0b565ceee177b2f0f2e3ede9183082fbedee0c8c95cf7b561186da85 +size 46960 diff --git a/pkg/internal/ebpf/tctracer/bpf_debug_arm64_bpfel.go b/pkg/internal/ebpf/tctracer/bpf_debug_arm64_bpfel.go new file mode 100644 index 000000000..f7801160f --- /dev/null +++ b/pkg/internal/ebpf/tctracer/bpf_debug_arm64_bpfel.go @@ -0,0 +1,194 @@ +// Code generated by bpf2go; DO NOT EDIT. +//go:build arm64 + +package tctracer + +import ( + "bytes" + _ "embed" + "fmt" + "io" + + "github.com/cilium/ebpf" +) + +type bpf_debugConnectionInfoT struct { + S_addr [16]uint8 + D_addr [16]uint8 + S_port uint16 + D_port uint16 +} + +type bpf_debugHttpInfoT struct { + Flags uint8 + _ [1]byte + ConnInfo bpf_debugConnectionInfoT + _ [2]byte + StartMonotimeNs uint64 + EndMonotimeNs uint64 + Buf [192]uint8 + Len uint32 + RespLen uint32 + Status uint16 + Type uint8 + Ssl uint8 + Pid struct { + HostPid uint32 + UserPid uint32 + Ns uint32 + } + Tp struct { + TraceId [16]uint8 + SpanId [8]uint8 + ParentId [8]uint8 + Ts uint64 + Flags uint8 + _ [7]byte + } + ExtraId uint64 + TaskTid uint32 + _ [4]byte +} + +type bpf_debugPidConnectionInfoT struct { + Conn bpf_debugConnectionInfoT + Pid uint32 +} + +type bpf_debugTpInfoPidT struct { + Tp struct { + TraceId [16]uint8 + SpanId [8]uint8 + ParentId [8]uint8 + Ts uint64 + Flags uint8 + _ [7]byte + } + Pid uint32 + Valid uint8 + _ [3]byte +} + +// loadBpf_debug returns the embedded CollectionSpec for bpf_debug. +func loadBpf_debug() (*ebpf.CollectionSpec, error) { + reader := bytes.NewReader(_Bpf_debugBytes) + spec, err := ebpf.LoadCollectionSpecFromReader(reader) + if err != nil { + return nil, fmt.Errorf("can't load bpf_debug: %w", err) + } + + return spec, err +} + +// loadBpf_debugObjects loads bpf_debug and converts it into a struct. +// +// The following types are suitable as obj argument: +// +// *bpf_debugObjects +// *bpf_debugPrograms +// *bpf_debugMaps +// +// See ebpf.CollectionSpec.LoadAndAssign documentation for details. +func loadBpf_debugObjects(obj interface{}, opts *ebpf.CollectionOptions) error { + spec, err := loadBpf_debug() + if err != nil { + return err + } + + return spec.LoadAndAssign(obj, opts) +} + +// bpf_debugSpecs contains maps and programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpf_debugSpecs struct { + bpf_debugProgramSpecs + bpf_debugMapSpecs +} + +// bpf_debugSpecs contains programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpf_debugProgramSpecs struct { + AppEgress *ebpf.ProgramSpec `ebpf:"app_egress"` + AppIngress *ebpf.ProgramSpec `ebpf:"app_ingress"` +} + +// bpf_debugMapSpecs contains maps before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpf_debugMapSpecs struct { + DebugEvents *ebpf.MapSpec `ebpf:"debug_events"` + IncomingTraceMap *ebpf.MapSpec `ebpf:"incoming_trace_map"` + OngoingHttp *ebpf.MapSpec `ebpf:"ongoing_http"` + OngoingHttpFallback *ebpf.MapSpec `ebpf:"ongoing_http_fallback"` + OutgoingTraceMap *ebpf.MapSpec `ebpf:"outgoing_trace_map"` + TraceMap *ebpf.MapSpec `ebpf:"trace_map"` +} + +// bpf_debugObjects contains all objects after they have been loaded into the kernel. +// +// It can be passed to loadBpf_debugObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpf_debugObjects struct { + bpf_debugPrograms + bpf_debugMaps +} + +func (o *bpf_debugObjects) Close() error { + return _Bpf_debugClose( + &o.bpf_debugPrograms, + &o.bpf_debugMaps, + ) +} + +// bpf_debugMaps contains all maps after they have been loaded into the kernel. +// +// It can be passed to loadBpf_debugObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpf_debugMaps struct { + DebugEvents *ebpf.Map `ebpf:"debug_events"` + IncomingTraceMap *ebpf.Map `ebpf:"incoming_trace_map"` + OngoingHttp *ebpf.Map `ebpf:"ongoing_http"` + OngoingHttpFallback *ebpf.Map `ebpf:"ongoing_http_fallback"` + OutgoingTraceMap *ebpf.Map `ebpf:"outgoing_trace_map"` + TraceMap *ebpf.Map `ebpf:"trace_map"` +} + +func (m *bpf_debugMaps) Close() error { + return _Bpf_debugClose( + m.DebugEvents, + m.IncomingTraceMap, + m.OngoingHttp, + m.OngoingHttpFallback, + m.OutgoingTraceMap, + m.TraceMap, + ) +} + +// bpf_debugPrograms contains all programs after they have been loaded into the kernel. +// +// It can be passed to loadBpf_debugObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpf_debugPrograms struct { + AppEgress *ebpf.Program `ebpf:"app_egress"` + AppIngress *ebpf.Program `ebpf:"app_ingress"` +} + +func (p *bpf_debugPrograms) Close() error { + return _Bpf_debugClose( + p.AppEgress, + p.AppIngress, + ) +} + +func _Bpf_debugClose(closers ...io.Closer) error { + for _, closer := range closers { + if err := closer.Close(); err != nil { + return err + } + } + return nil +} + +// Do not access this directly. +// +//go:embed bpf_debug_arm64_bpfel.o +var _Bpf_debugBytes []byte diff --git a/pkg/internal/ebpf/tctracer/bpf_debug_arm64_bpfel.o b/pkg/internal/ebpf/tctracer/bpf_debug_arm64_bpfel.o new file mode 100644 index 000000000..6dbb2536a --- /dev/null +++ b/pkg/internal/ebpf/tctracer/bpf_debug_arm64_bpfel.o @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:b4aa089b0e4981d82f656711676ce7a573221f007062c6f537cd7331285bddb2 +size 47200 diff --git a/pkg/internal/ebpf/tctracer/bpf_debug_x86_bpfel.go b/pkg/internal/ebpf/tctracer/bpf_debug_x86_bpfel.go new file mode 100644 index 000000000..9424d5997 --- /dev/null +++ b/pkg/internal/ebpf/tctracer/bpf_debug_x86_bpfel.go @@ -0,0 +1,194 @@ +// Code generated by bpf2go; DO NOT EDIT. +//go:build 386 || amd64 + +package tctracer + +import ( + "bytes" + _ "embed" + "fmt" + "io" + + "github.com/cilium/ebpf" +) + +type bpf_debugConnectionInfoT struct { + S_addr [16]uint8 + D_addr [16]uint8 + S_port uint16 + D_port uint16 +} + +type bpf_debugHttpInfoT struct { + Flags uint8 + _ [1]byte + ConnInfo bpf_debugConnectionInfoT + _ [2]byte + StartMonotimeNs uint64 + EndMonotimeNs uint64 + Buf [192]uint8 + Len uint32 + RespLen uint32 + Status uint16 + Type uint8 + Ssl uint8 + Pid struct { + HostPid uint32 + UserPid uint32 + Ns uint32 + } + Tp struct { + TraceId [16]uint8 + SpanId [8]uint8 + ParentId [8]uint8 + Ts uint64 + Flags uint8 + _ [7]byte + } + ExtraId uint64 + TaskTid uint32 + _ [4]byte +} + +type bpf_debugPidConnectionInfoT struct { + Conn bpf_debugConnectionInfoT + Pid uint32 +} + +type bpf_debugTpInfoPidT struct { + Tp struct { + TraceId [16]uint8 + SpanId [8]uint8 + ParentId [8]uint8 + Ts uint64 + Flags uint8 + _ [7]byte + } + Pid uint32 + Valid uint8 + _ [3]byte +} + +// loadBpf_debug returns the embedded CollectionSpec for bpf_debug. +func loadBpf_debug() (*ebpf.CollectionSpec, error) { + reader := bytes.NewReader(_Bpf_debugBytes) + spec, err := ebpf.LoadCollectionSpecFromReader(reader) + if err != nil { + return nil, fmt.Errorf("can't load bpf_debug: %w", err) + } + + return spec, err +} + +// loadBpf_debugObjects loads bpf_debug and converts it into a struct. +// +// The following types are suitable as obj argument: +// +// *bpf_debugObjects +// *bpf_debugPrograms +// *bpf_debugMaps +// +// See ebpf.CollectionSpec.LoadAndAssign documentation for details. +func loadBpf_debugObjects(obj interface{}, opts *ebpf.CollectionOptions) error { + spec, err := loadBpf_debug() + if err != nil { + return err + } + + return spec.LoadAndAssign(obj, opts) +} + +// bpf_debugSpecs contains maps and programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpf_debugSpecs struct { + bpf_debugProgramSpecs + bpf_debugMapSpecs +} + +// bpf_debugSpecs contains programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpf_debugProgramSpecs struct { + AppEgress *ebpf.ProgramSpec `ebpf:"app_egress"` + AppIngress *ebpf.ProgramSpec `ebpf:"app_ingress"` +} + +// bpf_debugMapSpecs contains maps before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpf_debugMapSpecs struct { + DebugEvents *ebpf.MapSpec `ebpf:"debug_events"` + IncomingTraceMap *ebpf.MapSpec `ebpf:"incoming_trace_map"` + OngoingHttp *ebpf.MapSpec `ebpf:"ongoing_http"` + OngoingHttpFallback *ebpf.MapSpec `ebpf:"ongoing_http_fallback"` + OutgoingTraceMap *ebpf.MapSpec `ebpf:"outgoing_trace_map"` + TraceMap *ebpf.MapSpec `ebpf:"trace_map"` +} + +// bpf_debugObjects contains all objects after they have been loaded into the kernel. +// +// It can be passed to loadBpf_debugObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpf_debugObjects struct { + bpf_debugPrograms + bpf_debugMaps +} + +func (o *bpf_debugObjects) Close() error { + return _Bpf_debugClose( + &o.bpf_debugPrograms, + &o.bpf_debugMaps, + ) +} + +// bpf_debugMaps contains all maps after they have been loaded into the kernel. +// +// It can be passed to loadBpf_debugObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpf_debugMaps struct { + DebugEvents *ebpf.Map `ebpf:"debug_events"` + IncomingTraceMap *ebpf.Map `ebpf:"incoming_trace_map"` + OngoingHttp *ebpf.Map `ebpf:"ongoing_http"` + OngoingHttpFallback *ebpf.Map `ebpf:"ongoing_http_fallback"` + OutgoingTraceMap *ebpf.Map `ebpf:"outgoing_trace_map"` + TraceMap *ebpf.Map `ebpf:"trace_map"` +} + +func (m *bpf_debugMaps) Close() error { + return _Bpf_debugClose( + m.DebugEvents, + m.IncomingTraceMap, + m.OngoingHttp, + m.OngoingHttpFallback, + m.OutgoingTraceMap, + m.TraceMap, + ) +} + +// bpf_debugPrograms contains all programs after they have been loaded into the kernel. +// +// It can be passed to loadBpf_debugObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpf_debugPrograms struct { + AppEgress *ebpf.Program `ebpf:"app_egress"` + AppIngress *ebpf.Program `ebpf:"app_ingress"` +} + +func (p *bpf_debugPrograms) Close() error { + return _Bpf_debugClose( + p.AppEgress, + p.AppIngress, + ) +} + +func _Bpf_debugClose(closers ...io.Closer) error { + for _, closer := range closers { + if err := closer.Close(); err != nil { + return err + } + } + return nil +} + +// Do not access this directly. +// +//go:embed bpf_debug_x86_bpfel.o +var _Bpf_debugBytes []byte diff --git a/pkg/internal/ebpf/tctracer/bpf_debug_x86_bpfel.o b/pkg/internal/ebpf/tctracer/bpf_debug_x86_bpfel.o new file mode 100644 index 000000000..32f14dfb5 --- /dev/null +++ b/pkg/internal/ebpf/tctracer/bpf_debug_x86_bpfel.o @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:0534c0d89b3122d79414722cbcb6a28837199fb55eae20bce01c176ad0e94c88 +size 46984 diff --git a/pkg/internal/ebpf/tctracer/bpf_x86_bpfel.go b/pkg/internal/ebpf/tctracer/bpf_x86_bpfel.go new file mode 100644 index 000000000..d1483a503 --- /dev/null +++ b/pkg/internal/ebpf/tctracer/bpf_x86_bpfel.go @@ -0,0 +1,191 @@ +// Code generated by bpf2go; DO NOT EDIT. +//go:build 386 || amd64 + +package tctracer + +import ( + "bytes" + _ "embed" + "fmt" + "io" + + "github.com/cilium/ebpf" +) + +type bpfConnectionInfoT struct { + S_addr [16]uint8 + D_addr [16]uint8 + S_port uint16 + D_port uint16 +} + +type bpfHttpInfoT struct { + Flags uint8 + _ [1]byte + ConnInfo bpfConnectionInfoT + _ [2]byte + StartMonotimeNs uint64 + EndMonotimeNs uint64 + Buf [192]uint8 + Len uint32 + RespLen uint32 + Status uint16 + Type uint8 + Ssl uint8 + Pid struct { + HostPid uint32 + UserPid uint32 + Ns uint32 + } + Tp struct { + TraceId [16]uint8 + SpanId [8]uint8 + ParentId [8]uint8 + Ts uint64 + Flags uint8 + _ [7]byte + } + ExtraId uint64 + TaskTid uint32 + _ [4]byte +} + +type bpfPidConnectionInfoT struct { + Conn bpfConnectionInfoT + Pid uint32 +} + +type bpfTpInfoPidT struct { + Tp struct { + TraceId [16]uint8 + SpanId [8]uint8 + ParentId [8]uint8 + Ts uint64 + Flags uint8 + _ [7]byte + } + Pid uint32 + Valid uint8 + _ [3]byte +} + +// loadBpf returns the embedded CollectionSpec for bpf. +func loadBpf() (*ebpf.CollectionSpec, error) { + reader := bytes.NewReader(_BpfBytes) + spec, err := ebpf.LoadCollectionSpecFromReader(reader) + if err != nil { + return nil, fmt.Errorf("can't load bpf: %w", err) + } + + return spec, err +} + +// loadBpfObjects loads bpf and converts it into a struct. +// +// The following types are suitable as obj argument: +// +// *bpfObjects +// *bpfPrograms +// *bpfMaps +// +// See ebpf.CollectionSpec.LoadAndAssign documentation for details. +func loadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error { + spec, err := loadBpf() + if err != nil { + return err + } + + return spec.LoadAndAssign(obj, opts) +} + +// bpfSpecs contains maps and programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfSpecs struct { + bpfProgramSpecs + bpfMapSpecs +} + +// bpfSpecs contains programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfProgramSpecs struct { + AppEgress *ebpf.ProgramSpec `ebpf:"app_egress"` + AppIngress *ebpf.ProgramSpec `ebpf:"app_ingress"` +} + +// bpfMapSpecs contains maps before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfMapSpecs struct { + IncomingTraceMap *ebpf.MapSpec `ebpf:"incoming_trace_map"` + OngoingHttp *ebpf.MapSpec `ebpf:"ongoing_http"` + OngoingHttpFallback *ebpf.MapSpec `ebpf:"ongoing_http_fallback"` + OutgoingTraceMap *ebpf.MapSpec `ebpf:"outgoing_trace_map"` + TraceMap *ebpf.MapSpec `ebpf:"trace_map"` +} + +// bpfObjects contains all objects after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfObjects struct { + bpfPrograms + bpfMaps +} + +func (o *bpfObjects) Close() error { + return _BpfClose( + &o.bpfPrograms, + &o.bpfMaps, + ) +} + +// bpfMaps contains all maps after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfMaps struct { + IncomingTraceMap *ebpf.Map `ebpf:"incoming_trace_map"` + OngoingHttp *ebpf.Map `ebpf:"ongoing_http"` + OngoingHttpFallback *ebpf.Map `ebpf:"ongoing_http_fallback"` + OutgoingTraceMap *ebpf.Map `ebpf:"outgoing_trace_map"` + TraceMap *ebpf.Map `ebpf:"trace_map"` +} + +func (m *bpfMaps) Close() error { + return _BpfClose( + m.IncomingTraceMap, + m.OngoingHttp, + m.OngoingHttpFallback, + m.OutgoingTraceMap, + m.TraceMap, + ) +} + +// bpfPrograms contains all programs after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfPrograms struct { + AppEgress *ebpf.Program `ebpf:"app_egress"` + AppIngress *ebpf.Program `ebpf:"app_ingress"` +} + +func (p *bpfPrograms) Close() error { + return _BpfClose( + p.AppEgress, + p.AppIngress, + ) +} + +func _BpfClose(closers ...io.Closer) error { + for _, closer := range closers { + if err := closer.Close(); err != nil { + return err + } + } + return nil +} + +// Do not access this directly. +// +//go:embed bpf_x86_bpfel.o +var _BpfBytes []byte diff --git a/pkg/internal/ebpf/tctracer/bpf_x86_bpfel.o b/pkg/internal/ebpf/tctracer/bpf_x86_bpfel.o new file mode 100644 index 000000000..f3ae3462e --- /dev/null +++ b/pkg/internal/ebpf/tctracer/bpf_x86_bpfel.o @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:48c5a189f88796a6f863cd20cd26e784cf45620f4855f224a37170649d6b7c29 +size 46744 diff --git a/pkg/internal/ebpf/tctracer/tctracer.go b/pkg/internal/ebpf/tctracer/tctracer.go new file mode 100644 index 000000000..bd95ae121 --- /dev/null +++ b/pkg/internal/ebpf/tctracer/tctracer.go @@ -0,0 +1,307 @@ +//go:build linux + +package tctracer + +import ( + "context" + "errors" + "fmt" + "io" + "io/fs" + "log/slog" + + "github.com/cilium/ebpf" + "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" + + "github.com/grafana/beyla/pkg/beyla" + ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" + "github.com/grafana/beyla/pkg/internal/exec" + "github.com/grafana/beyla/pkg/internal/goexec" + "github.com/grafana/beyla/pkg/internal/netolly/ifaces" + "github.com/grafana/beyla/pkg/internal/request" + "github.com/grafana/beyla/pkg/internal/svc" +) + +//go:generate $BPF2GO -cc $BPF_CLANG -cflags $BPF_CFLAGS -target amd64,arm64 bpf ../../../../bpf/tc_tracer.c -- -I../../../../bpf/headers +//go:generate $BPF2GO -cc $BPF_CLANG -cflags $BPF_CFLAGS -target amd64,arm64 bpf_debug ../../../../bpf/tc_tracer.c -- -I../../../../bpf/headers -DBPF_DEBUG + +type Tracer struct { + cfg *beyla.Config + bpfObjects bpfObjects + closers []io.Closer + log *slog.Logger + qdiscs map[ifaces.Interface]*netlink.GenericQdisc + egressFilters map[ifaces.Interface]*netlink.BpfFilter + ingressFilters map[ifaces.Interface]*netlink.BpfFilter +} + +func New(cfg *beyla.Config) *Tracer { + log := slog.With("component", "tc.Tracer") + return &Tracer{ + log: log, + cfg: cfg, + qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{}, + egressFilters: map[ifaces.Interface]*netlink.BpfFilter{}, + ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{}, + } +} + +func (p *Tracer) AllowPID(uint32, uint32, *svc.ID) {} + +func (p *Tracer) BlockPID(uint32, uint32) {} + +func (p *Tracer) Load() (*ebpf.CollectionSpec, error) { + if p.cfg.EBPF.BpfDebug { + return loadBpf_debug() + } + + return loadBpf() +} + +func (p *Tracer) SetupTailCalls() {} + +func (p *Tracer) Constants() map[string]any { + return map[string]any{} +} + +func (p *Tracer) RegisterOffsets(_ *exec.FileInfo, _ *goexec.Offsets) {} + +func (p *Tracer) BpfObjects() any { + return &p.bpfObjects +} + +func (p *Tracer) AddCloser(c ...io.Closer) { + p.closers = append(p.closers, c...) +} + +func (p *Tracer) GoProbes() map[string][]ebpfcommon.FunctionPrograms { + return nil +} + +func (p *Tracer) KProbes() map[string]ebpfcommon.FunctionPrograms { + return nil +} + +func (p *Tracer) Tracepoints() map[string]ebpfcommon.FunctionPrograms { + return nil +} + +func (p *Tracer) UProbes() map[string]map[string]ebpfcommon.FunctionPrograms { + return nil +} + +func (p *Tracer) SocketFilters() []*ebpf.Program { + return nil +} + +func (p *Tracer) RecordInstrumentedLib(uint64) {} + +func (p *Tracer) UnlinkInstrumentedLib(uint64) {} + +func (p *Tracer) AddModuleCloser(uint64, ...io.Closer) {} + +func (p *Tracer) AlreadyInstrumentedLib(uint64) bool { + return false +} + +func (p *Tracer) SetupTC() { + if !p.cfg.EBPF.UseLinuxTC { + return + } + + informer := ifaces.NewWatcher(p.cfg.ChannelBufferLen) + registerer := ifaces.NewRegisterer(informer, p.cfg.ChannelBufferLen) + ctx := context.Background() + + p.log.Debug("subscribing for network interface events") + ifaceEvents, err := registerer.Subscribe(ctx) + if err != nil { + p.log.Error("instantiating interfaces' informer", "error", err) + return + } + + go func() { + for { + select { + case <-ctx.Done(): + slog.Debug("stopping interfaces' listener") + return + case event := <-ifaceEvents: + slog.Debug("received event", "event", event) + switch event.Type { + case ifaces.EventAdded: + p.registerTC(event.Interface) + case ifaces.EventDeleted: + // qdiscs, ingress and egress filters are automatically deleted so we don't need to + // specifically detach them from the ebpfFetcher + default: + slog.Warn("unknown event type", "event", event) + } + } + } + }() +} + +func (p *Tracer) Run(ctx context.Context, _ chan<- []request.Span) { + <-ctx.Done() + + p.bpfObjects.Close() + + p.closeTC() +} + +func (p *Tracer) registerTC(iface ifaces.Interface) { + // Load pre-compiled programs and maps into the kernel, and rewrites the configuration + ipvlan, err := netlink.LinkByIndex(iface.Index) + if err != nil { + p.log.Error("failed to lookup ipvlan device", "index", iface.Index, "name", iface.Name, "error", err) + return + } + qdiscAttrs := netlink.QdiscAttrs{ + LinkIndex: ipvlan.Attrs().Index, + Handle: netlink.MakeHandle(0xffff, 0), + Parent: netlink.HANDLE_CLSACT, + } + qdisc := &netlink.GenericQdisc{ + QdiscAttrs: qdiscAttrs, + QdiscType: "clsact", + } + if err := netlink.QdiscDel(qdisc); err == nil { + p.log.Warn("qdisc clsact already existed. Deleted it") + } + if err := netlink.QdiscAdd(qdisc); err != nil { + if errors.Is(err, fs.ErrExist) { + p.log.Warn("qdisc clsact already exists. Ignoring", "error", err) + } else { + p.log.Error("failed to create clsact qdisc on", "index", iface.Index, "name", iface.Name, "error", err) + return + } + } + p.qdiscs[iface] = qdisc + + if err := p.registerEgress(iface, ipvlan); err != nil { + p.log.Error("failed to install egress filters", "error", err) + } + + if err := p.registerIngress(iface, ipvlan); err != nil { + p.log.Error("failed to install ingres filters", "error", err) + } +} + +func (p *Tracer) registerEgress(iface ifaces.Interface, ipvlan netlink.Link) error { + // Fetch events on egress + egressAttrs := netlink.FilterAttrs{ + LinkIndex: ipvlan.Attrs().Index, + Parent: netlink.HANDLE_MIN_EGRESS, + Handle: netlink.MakeHandle(0, 1), + Protocol: 3, + Priority: 1, + } + egressFilter := &netlink.BpfFilter{ + FilterAttrs: egressAttrs, + Fd: p.bpfObjects.AppEgress.FD(), + Name: "tc/app_tc_egress", + DirectAction: true, + } + if err := netlink.FilterDel(egressFilter); err == nil { + p.log.Warn("egress filter already existed. Deleted it") + } + if err := netlink.FilterAdd(egressFilter); err != nil { + if errors.Is(err, fs.ErrExist) { + p.log.Warn("egress filter already exists. Ignoring", "error", err) + } else { + return fmt.Errorf("failed to create egress filter: %w", err) + } + } + + p.egressFilters[iface] = egressFilter + return nil +} + +func (p *Tracer) registerIngress(iface ifaces.Interface, ipvlan netlink.Link) error { + // Fetch events on ingress + ingressAttrs := netlink.FilterAttrs{ + LinkIndex: ipvlan.Attrs().Index, + Parent: netlink.HANDLE_MIN_INGRESS, + Handle: netlink.MakeHandle(0, 1), + Protocol: unix.ETH_P_ALL, + Priority: 1, + } + ingressFilter := &netlink.BpfFilter{ + FilterAttrs: ingressAttrs, + Fd: p.bpfObjects.AppIngress.FD(), + Name: "tc/app_tc_ingress", + DirectAction: true, + } + if err := netlink.FilterDel(ingressFilter); err == nil { + p.log.Warn("ingress filter already existed. Deleted it") + } + if err := netlink.FilterAdd(ingressFilter); err != nil { + if errors.Is(err, fs.ErrExist) { + p.log.Warn("ingress filter already exists. Ignoring", "error", err) + } else { + return fmt.Errorf("failed to create ingress filter: %w", err) + } + } + + p.ingressFilters[iface] = ingressFilter + return nil +} + +func (p *Tracer) closeTC() { + p.log.Info("removing traffic control probes") + + p.bpfObjects.AppEgress.Close() + p.bpfObjects.AppIngress.Close() + + // cleanup egress + for iface, ef := range p.egressFilters { + p.log.Debug("deleting egress filter", "interface", iface) + if err := doIgnoreNoDev(netlink.FilterDel, netlink.Filter(ef)); err != nil { + p.log.Error("deleting egress filter", "error", err) + } + } + p.egressFilters = map[ifaces.Interface]*netlink.BpfFilter{} + + // cleanup ingress + for iface, igf := range p.ingressFilters { + p.log.Debug("deleting ingress filter", "interface", iface) + if err := doIgnoreNoDev(netlink.FilterDel, netlink.Filter(igf)); err != nil { + p.log.Error("deleting ingress filter", "error", err) + } + } + p.ingressFilters = map[ifaces.Interface]*netlink.BpfFilter{} + + // cleanup qdiscs + for iface, qd := range p.qdiscs { + p.log.Debug("deleting Qdisc", "interface", iface) + if err := doIgnoreNoDev(netlink.QdiscDel, netlink.Qdisc(qd)); err != nil { + p.log.Error("deleting qdisc", "error", err) + } + } + p.qdiscs = map[ifaces.Interface]*netlink.GenericQdisc{} +} + +// doIgnoreNoDev runs the provided syscall over the provided device and ignores the error +// if the cause is a non-existing device (just logs the error as debug). +// If the agent is deployed as part of the Network Metrics pipeline, normally +// undeploying the FlowCollector could cause the agent to try to remove resources +// from Pods that have been removed immediately before (e.g. flowlogs-pipeline or the +// console plugin), so we avoid logging some errors that would unnecessarily raise the +// user's attention. +// This function uses generics because the set of provided functions accept different argument +// types. +func doIgnoreNoDev[T any](sysCall func(T) error, dev T) error { + if err := sysCall(dev); err != nil { + if errors.Is(err, unix.ENODEV) { + slog.Error("can't delete. Ignore this error if other pods or interfaces "+ + " are also being deleted at this moment. For example, if you are undeploying "+ + " a FlowCollector or Deployment where this agent is part of", + "error", err) + } else { + return err + } + } + return nil +} diff --git a/pkg/internal/ebpf/tctracer/tctracer_notlinux.go b/pkg/internal/ebpf/tctracer/tctracer_notlinux.go new file mode 100644 index 000000000..94574a71c --- /dev/null +++ b/pkg/internal/ebpf/tctracer/tctracer_notlinux.go @@ -0,0 +1,42 @@ +//go:build !linux + +// this file is emptied on purpose to allow Beyla compiling in non-linux environments + +package tctracer + +import ( + "context" + "io" + + "github.com/cilium/ebpf" + + "github.com/grafana/beyla/pkg/beyla" + ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" + "github.com/grafana/beyla/pkg/internal/exec" + "github.com/grafana/beyla/pkg/internal/goexec" + "github.com/grafana/beyla/pkg/internal/imetrics" + "github.com/grafana/beyla/pkg/internal/request" + "github.com/grafana/beyla/pkg/internal/svc" +) + +type Tracer struct{} + +func New(_ *beyla.Config, _ imetrics.Reporter) *Tracer { return nil } +func (p *Tracer) AllowPID(_, _ uint32, _ *svc.ID) {} +func (p *Tracer) BlockPID(_, _ uint32) {} +func (p *Tracer) Load() (*ebpf.CollectionSpec, error) { return nil, nil } +func (p *Tracer) BpfObjects() any { return nil } +func (p *Tracer) AddCloser(_ ...io.Closer) {} +func (p *Tracer) GoProbes() map[string][]ebpfcommon.FunctionPrograms { return nil } +func (p *Tracer) KProbes() map[string]ebpfcommon.FunctionPrograms { return nil } +func (p *Tracer) UProbes() map[string]map[string]ebpfcommon.FunctionPrograms { return nil } +func (p *Tracer) Tracepoints() map[string]ebpfcommon.FunctionPrograms { return nil } +func (p *Tracer) SocketFilters() []*ebpf.Program { return nil } +func (p *Tracer) RecordInstrumentedLib(_ uint64) {} +func (p *Tracer) UnlinkInstrumentedLib(_ uint64) {} +func (p *Tracer) AlreadyInstrumentedLib(_ uint64) bool { return false } +func (p *Tracer) Run(_ context.Context, _ chan<- []request.Span) {} +func (p *Tracer) Constants() map[string]any { return nil } +func (p *Tracer) SetupTC() {} +func (p *Tracer) SetupTailCalls() {} +func (p *Tracer) RegisterOffsets(_ *exec.FileInfo, _ *goexec.Offsets) {} diff --git a/pkg/internal/ebpf/tracer_linux.go b/pkg/internal/ebpf/tracer_linux.go index b42e4f19b..fc229e35a 100644 --- a/pkg/internal/ebpf/tracer_linux.go +++ b/pkg/internal/ebpf/tracer_linux.go @@ -88,12 +88,19 @@ func (pt *ProcessTracer) Run(ctx context.Context, out chan<- []request.Span) { // Searches for traceable functions trcrs := pt.Programs + wg := sync.WaitGroup{} + for _, t := range trcrs { - go t.Run(ctx, out) + wg.Add(1) + go func() { + t.Run(ctx, out) + wg.Done() + }() } - go func() { - <-ctx.Done() - }() + + <-ctx.Done() + + wg.Wait() } func (pt *ProcessTracer) loadSpec(p Tracer) (*ebpf.CollectionSpec, error) { @@ -108,77 +115,83 @@ func (pt *ProcessTracer) loadSpec(p Tracer) (*ebpf.CollectionSpec, error) { return spec, nil } -func (pt *ProcessTracer) loadTracers() error { - loadMux.Lock() - defer loadMux.Unlock() +func (pt *ProcessTracer) loadAndAssign(p Tracer) error { + spec, err := pt.loadSpec(p) - var log = ptlog() + if err != nil { + return err + } - i := instrumenter{} // dummy instrumenter to setup the kprobes, socket filters and tracepoint probes + collOpts, err := resolveInternalMaps(spec) - for _, p := range pt.Programs { - plog := log.With("program", reflect.TypeOf(p)) - plog.Debug("loading eBPF program", "type", pt.Type) - spec, err := pt.loadSpec(p) - if err != nil { - return err - } + if err != nil { + return err + } - collOpts, err := resolveInternalMaps(spec) - if err != nil { - return err - } + collOpts.Programs = ebpf.ProgramOptions{LogSize: 640 * 1024} - collOpts.Programs = ebpf.ProgramOptions{LogSize: 640 * 1024} + return spec.LoadAndAssign(p.BpfObjects(), collOpts) +} - if err := spec.LoadAndAssign(p.BpfObjects(), collOpts); err != nil { - if strings.Contains(err.Error(), "unknown func bpf_probe_write_user") { - plog.Warn("Failed to enable distributed tracing context-propagation on a Linux Kernel without write memory support. " + - "To avoid seeing this message, please ensure you have correctly mounted /sys/kernel/security. " + - "and ensure beyla has the SYS_ADMIN linux capability" + - "For more details set BEYLA_LOG_LEVEL=DEBUG.") +func (pt *ProcessTracer) loadTracer(p Tracer, log *slog.Logger) error { + plog := log.With("program", reflect.TypeOf(p)) + plog.Debug("loading eBPF program", "type", pt.Type) - common.IntegrityModeOverride = true - spec, err = pt.loadSpec(p) - if err == nil { + err := pt.loadAndAssign(p) - collOpts, err = resolveInternalMaps(spec) - if err != nil { - return err - } + if err != nil && strings.Contains(err.Error(), "unknown func bpf_probe_write_user") { + plog.Warn("Failed to enable distributed tracing context-propagation on a " + + "Linux Kernel without write memory support. " + + "To avoid seeing this message, please ensure you have correctly mounted /sys/kernel/security. " + + "and ensure beyla has the SYS_ADMIN linux capability" + + "For more details set BEYLA_LOG_LEVEL=DEBUG.") - collOpts.Programs = ebpf.ProgramOptions{LogSize: 640 * 1024} + common.IntegrityModeOverride = true + err = pt.loadAndAssign(p) + } - err = spec.LoadAndAssign(p.BpfObjects(), collOpts) - } - } - if err != nil { - printVerifierErrorInfo(err) - return fmt.Errorf("loading and assigning BPF objects: %w", err) - } - } + if err != nil { + printVerifierErrorInfo(err) + return fmt.Errorf("loading and assigning BPF objects: %w", err) + } - // Setup any tail call jump tables - p.SetupTailCalls() + // Setup any tail call jump tables + p.SetupTailCalls() - // Setup any traffic control probes - p.SetupTC() + // Setup any traffic control probes + p.SetupTC() - // Kprobes to be used for native instrumentation points - if err := i.kprobes(p); err != nil { - printVerifierErrorInfo(err) - return err - } + i := instrumenter{} // dummy instrumenter to setup the kprobes, socket filters and tracepoint probes - // Tracepoints support - if err := i.tracepoints(p); err != nil { - printVerifierErrorInfo(err) - return err - } + // Kprobes to be used for native instrumentation points + if err := i.kprobes(p); err != nil { + printVerifierErrorInfo(err) + return err + } - // Sock filters support - if err := i.sockfilters(p); err != nil { - printVerifierErrorInfo(err) + // Tracepoints support + if err := i.tracepoints(p); err != nil { + printVerifierErrorInfo(err) + return err + } + + // Sock filters support + if err := i.sockfilters(p); err != nil { + printVerifierErrorInfo(err) + return err + } + + return nil +} + +func (pt *ProcessTracer) loadTracers() error { + loadMux.Lock() + defer loadMux.Unlock() + + var log = ptlog() + + for _, p := range pt.Programs { + if err := pt.loadTracer(p, log); err != nil { return err } }