Skip to content

Commit

Permalink
Split tc programs from generic tracer (grafana#1267)
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelroquetto authored Oct 18, 2024
1 parent 66e17c6 commit ca5f23e
Show file tree
Hide file tree
Showing 41 changed files with 1,370 additions and 436 deletions.
25 changes: 25 additions & 0 deletions bpf/http_maps.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#ifndef HTTP_MAPS_H
#define HTTP_MAPS_H

#include "vmlinux.h"
#include "bpf_helpers.h"
#include "http_types.h"

// Keeps track of the ongoing http connections we match for request/response
struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, pid_connection_info_t);
__type(value, http_info_t);
__uint(max_entries, MAX_CONCURRENT_SHARED_REQUESTS);
__uint(pinning, BEYLA_PIN_INTERNAL);
} ongoing_http SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, connection_info_t);
__type(value, http_info_t);
__uint(max_entries, 1024);
__uint(pinning, BEYLA_PIN_INTERNAL);
} ongoing_http_fallback SEC(".maps");

#endif
1 change: 0 additions & 1 deletion bpf/http_ssl_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include "http_types.h"
#include "k_tracer_defs.h"
#include "bpf_dbg.h"
#include "pid.h"
#include "sockaddr.h"
#include "tcp_info.h"
#include "pin_internal.h"
Expand Down
89 changes: 0 additions & 89 deletions bpf/k_tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "tcp_info.h"
#include "k_tracer_defs.h"
#include "http_ssl_defs.h"
#include "tc_ip.h"
#include "pin_internal.h"

// Temporary tracking of accept arguments
Expand Down Expand Up @@ -691,91 +690,3 @@ int BPF_KPROBE(kprobe_sys_exit, int status) {

return 0;
}

SEC("tc_ingress")
int app_ingress(struct __sk_buff *skb) {
//bpf_printk("ingress");

protocol_info_t tcp = {};
connection_info_t conn = {};

if (!read_sk_buff(skb, &tcp, &conn)) {
return 0;
}

if (tcp_ack(&tcp)) { // ack field must be set, which means we are looking at non SYN packet
// assumes we are the only ones that added options, this can be improved
if (tcp.h_proto == ETH_P_IP && tcp.ip_len == MIN_IP_LEN + MAX_TC_TP_LEN) {
parse_ip_options_ipv4(skb, &conn, &tcp);
} else if (tcp.h_proto == ETH_P_IPV6 &&
tcp.l4_proto == IP_V6_DEST_OPTS) { // Destination options used
parse_ip_options_ipv6(skb, &conn, &tcp);
}
}

return 0;
}

static __always_inline void
update_outgoing_request_span_id(connection_info_t *conn, protocol_info_t *tcp, tp_info_pid_t *tp) {
pid_connection_info_t p_conn = {};
__builtin_memcpy(&p_conn.conn, conn, sizeof(connection_info_t));
p_conn.pid = tp->pid;

http_info_t *h_info = bpf_map_lookup_elem(&ongoing_http, &p_conn);
if (h_info && tp->valid) {
bpf_printk("Found HTTP info, resetting the span id to %x%x", tcp->seq, tcp->ack);
*((u32 *)(&h_info->tp.span_id[0])) = tcp->seq;
*((u32 *)(&h_info->tp.span_id[4])) = tcp->ack;
}
}

static __always_inline void encode_data_in_ip_options(struct __sk_buff *skb,
connection_info_t *conn,
protocol_info_t *tcp,
tp_info_pid_t *tp) {
// Handling IPv4
// We only do this if the IP header doesn't have any options, this can be improved if needed
if (tcp->h_proto == ETH_P_IP && tcp->ip_len == MIN_IP_LEN) {
bpf_printk("Adding the trace_id in the IP Options");

if (inject_tc_ip_options_ipv4(skb, conn, tcp, tp)) {
update_outgoing_request_span_id(conn, tcp, tp);
}

bpf_map_delete_elem(&outgoing_trace_map, conn);
} else if (tcp->h_proto == ETH_P_IPV6 && tcp->l4_proto == IPPROTO_TCP) { // Handling IPv6
bpf_printk("Found IPv6 header");

if (inject_tc_ip_options_ipv6(skb, conn, tcp, tp)) {
update_outgoing_request_span_id(conn, tcp, tp);
}

bpf_map_delete_elem(&outgoing_trace_map, conn);
}
}

SEC("tc_egress")
int app_egress(struct __sk_buff *skb) {
//bpf_printk("egress");

protocol_info_t tcp = {};
connection_info_t conn = {};

if (!read_sk_buff(skb, &tcp, &conn)) {
return 0;
}

sort_connection_info(&conn);

tp_info_pid_t *tp = bpf_map_lookup_elem(&outgoing_trace_map, &conn);

if (tp) {
bpf_printk("egress flags %x, sequence %x", tcp.flags, tcp.seq);
print_http_connection_info(&conn);

encode_data_in_ip_options(skb, &conn, &tcp, tp);
}

return 0;
}
1 change: 0 additions & 1 deletion bpf/protocol_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include "bpf_helpers.h"
#include "http_types.h"
#include "ringbuf.h"
#include "pid.h"
#include "bpf_dbg.h"
#include "pin_internal.h"

Expand Down
19 changes: 1 addition & 18 deletions bpf/protocol_http.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
#include "bpf_helpers.h"
#include "http_types.h"
#include "ringbuf.h"
#include "pid.h"
#include "runtime.h"
#include "protocol_common.h"
#include "trace_common.h"
#include "pin_internal.h"
#include "http_maps.h"

volatile const u32 high_request_volume;

Expand All @@ -22,23 +22,6 @@ struct {
__uint(max_entries, 1);
} http_info_mem SEC(".maps");

// Keeps track of the ongoing http connections we match for request/response
struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, pid_connection_info_t);
__type(value, http_info_t);
__uint(max_entries, MAX_CONCURRENT_SHARED_REQUESTS);
__uint(pinning, BEYLA_PIN_INTERNAL);
} ongoing_http SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, connection_info_t);
__type(value, http_info_t);
__uint(max_entries, 1024);
__uint(pinning, BEYLA_PIN_INTERNAL);
} ongoing_http_fallback SEC(".maps");

// empty_http_info zeroes and return the unique percpu copy in the map
// this function assumes that a given thread is not trying to use many
// instances at the same time
Expand Down
1 change: 0 additions & 1 deletion bpf/protocol_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include "bpf_helpers.h"
#include "http_types.h"
#include "ringbuf.h"
#include "pid.h"
#include "protocol_common.h"
#include "http2_grpc.h"
#include "pin_internal.h"
Expand Down
1 change: 0 additions & 1 deletion bpf/protocol_tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include "bpf_helpers.h"
#include "http_types.h"
#include "ringbuf.h"
#include "pid.h"
#include "protocol_common.h"
#include "trace_common.h"
#include "pin_internal.h"
Expand Down
98 changes: 98 additions & 0 deletions bpf/tc_tracer.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#include "vmlinux.h"
#include "bpf_helpers.h"
#include "bpf_dbg.h"

#include "http_maps.h"
#include "http_types.h"
#include "tc_ip.h"
#include "tcp_info.h"

char __license[] SEC("license") = "Dual MIT/GPL";

SEC("tc_ingress")
int app_ingress(struct __sk_buff *skb) {
//bpf_printk("ingress");

protocol_info_t tcp = {};
connection_info_t conn = {};

if (!read_sk_buff(skb, &tcp, &conn)) {
return 0;
}

if (tcp_ack(&tcp)) { // ack field must be set, which means we are looking at non SYN packet
// assumes we are the only ones that added options, this can be improved
if (tcp.h_proto == ETH_P_IP && tcp.ip_len == MIN_IP_LEN + MAX_TC_TP_LEN) {
parse_ip_options_ipv4(skb, &conn, &tcp);
} else if (tcp.h_proto == ETH_P_IPV6 &&
tcp.l4_proto == IP_V6_DEST_OPTS) { // Destination options used
parse_ip_options_ipv6(skb, &conn, &tcp);
}
}

return 0;
}

static __always_inline void
update_outgoing_request_span_id(connection_info_t *conn, protocol_info_t *tcp, tp_info_pid_t *tp) {
pid_connection_info_t p_conn = {};
__builtin_memcpy(&p_conn.conn, conn, sizeof(connection_info_t));
p_conn.pid = tp->pid;

http_info_t *h_info = bpf_map_lookup_elem(&ongoing_http, &p_conn);
if (h_info && tp->valid) {
bpf_printk("Found HTTP info, resetting the span id to %x%x", tcp->seq, tcp->ack);
*((u32 *)(&h_info->tp.span_id[0])) = tcp->seq;
*((u32 *)(&h_info->tp.span_id[4])) = tcp->ack;
}
}

static __always_inline void encode_data_in_ip_options(struct __sk_buff *skb,
connection_info_t *conn,
protocol_info_t *tcp,
tp_info_pid_t *tp) {
// Handling IPv4
// We only do this if the IP header doesn't have any options, this can be improved if needed
if (tcp->h_proto == ETH_P_IP && tcp->ip_len == MIN_IP_LEN) {
bpf_printk("Adding the trace_id in the IP Options");

if (inject_tc_ip_options_ipv4(skb, conn, tcp, tp)) {
update_outgoing_request_span_id(conn, tcp, tp);
}

bpf_map_delete_elem(&outgoing_trace_map, conn);
} else if (tcp->h_proto == ETH_P_IPV6 && tcp->l4_proto == IPPROTO_TCP) { // Handling IPv6
bpf_printk("Found IPv6 header");

if (inject_tc_ip_options_ipv6(skb, conn, tcp, tp)) {
update_outgoing_request_span_id(conn, tcp, tp);
}

bpf_map_delete_elem(&outgoing_trace_map, conn);
}
}

SEC("tc_egress")
int app_egress(struct __sk_buff *skb) {
//bpf_printk("egress");

protocol_info_t tcp = {};
connection_info_t conn = {};

if (!read_sk_buff(skb, &tcp, &conn)) {
return 0;
}

sort_connection_info(&conn);

tp_info_pid_t *tp = bpf_map_lookup_elem(&outgoing_trace_map, &conn);

if (tp) {
bpf_printk("egress flags %x, sequence %x", tcp.flags, tcp.seq);
print_http_connection_info(&conn);

encode_data_in_ip_options(skb, &conn, &tcp, tp);
}

return 0;
}
4 changes: 4 additions & 0 deletions pkg/beyla/os.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func checkCapabilitiesForSetOptions(config *Config, caps *helpers.OSCapabilities
if config.Enabled(FeatureAppO11y) {
testAndSet(caps, capError, unix.CAP_CHECKPOINT_RESTORE)
testAndSet(caps, capError, unix.CAP_SYS_PTRACE)

if config.EBPF.UseLinuxTC {
testAndSet(caps, capError, unix.CAP_NET_ADMIN)
}
}

if config.Enabled(FeatureNetO11y) {
Expand Down
6 changes: 5 additions & 1 deletion pkg/components/beyla.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ func RunBeyla(ctx context.Context, cfg *beyla.Config) error {

func setupAppO11y(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config) error {
slog.Info("starting Beyla in Application Observability mode")

wg := sync.WaitGroup{}
defer wg.Wait()

instr := appolly.New(ctx, ctxInfo, config)
if err := instr.FindAndInstrument(); err != nil {
if err := instr.FindAndInstrument(&wg); err != nil {
return fmt.Errorf("can't find target process: %w", err)
}
if err := instr.ReadAndForward(); err != nil {
Expand Down
11 changes: 9 additions & 2 deletions pkg/internal/appolly/appolly.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"fmt"
"log/slog"
"sync"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/discover"
Expand Down Expand Up @@ -43,15 +44,17 @@ func New(ctx context.Context, ctxInfo *global.ContextInfo, config *beyla.Config)

// FindAndInstrument searches in background for any new executable matching the
// selection criteria.
func (i *Instrumenter) FindAndInstrument() error {
func (i *Instrumenter) FindAndInstrument(wg *sync.WaitGroup) error {
finder := discover.NewProcessFinder(i.ctx, i.config, i.ctxInfo, i.tracesInput)
foundProcesses, deletedProcesses, err := finder.Start()
if err != nil {
return fmt.Errorf("couldn't start Process Finder: %w", err)
}
// In background, listen indefinitely for each new process and run its
// associated ebpf.ProcessTracer once it is found.
wg.Add(1)
go func() {
defer wg.Done()
log := log()
type cancelCtx struct {
ctx context.Context
Expand All @@ -76,7 +79,11 @@ func (i *Instrumenter) FindAndInstrument() error {
cctx.ctx, cctx.cancel = context.WithCancel(i.ctx)
contexts[pt.FileInfo.Ino] = cctx
}
go pt.Tracer.Run(cctx.ctx, i.tracesInput)
wg.Add(1)
go func() {
defer wg.Done()
pt.Tracer.Run(cctx.ctx, i.tracesInput)
}()
}
case dp := <-deletedProcesses:
log.Debug("stopping ProcessTracer because there are no more instances of such process",
Expand Down
5 changes: 5 additions & 0 deletions pkg/internal/discover/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/grafana/beyla/pkg/internal/ebpf"
"github.com/grafana/beyla/pkg/internal/ebpf/generictracer"
"github.com/grafana/beyla/pkg/internal/ebpf/gotracer"
"github.com/grafana/beyla/pkg/internal/ebpf/tctracer"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/pipe/global"
"github.com/grafana/beyla/pkg/internal/request"
Expand Down Expand Up @@ -97,5 +98,9 @@ func newGoTracersGroup(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Trac
}

func newGenericTracersGroup(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Tracer {
if cfg.EBPF.UseLinuxTC {
return []ebpf.Tracer{generictracer.New(cfg, metrics), tctracer.New(cfg)}
}

return []ebpf.Tracer{generictracer.New(cfg, metrics)}
}
Loading

0 comments on commit ca5f23e

Please sign in to comment.