Skip to content

Commit

Permalink
Split eBPF load and attach for Go programs (#1169)
Browse files Browse the repository at this point in the history
  • Loading branch information
grcevski authored Sep 19, 2024
1 parent 008f4f2 commit 9a23f73
Show file tree
Hide file tree
Showing 80 changed files with 962 additions and 546 deletions.
40 changes: 28 additions & 12 deletions bpf/go_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,9 @@
#include "tracer_common.h"
#include "tracing.h"
#include "trace_util.h"
#include "go_offsets.h"
#include "go_traceparent.h"

volatile const u64 conn_fd_pos;
volatile const u64 fd_laddr_pos;
volatile const u64 fd_raddr_pos;
volatile const u64 tcp_addr_port_ptr_pos;
volatile const u64 tcp_addr_ip_ptr_pos;

char __license[] SEC("license") = "Dual MIT/GPL";

// Temporary information about a function invocation. It stores the invocation time of a function
Expand Down Expand Up @@ -175,6 +170,10 @@ server_trace_parent(void *goroutine_addr, tp_info_t *tp, void *req_header) {

urand_bytes(tp->span_id, SPAN_ID_SIZE_BYTES);
bpf_map_update_elem(&go_trace_map, &goroutine_addr, tp, BPF_ANY);

unsigned char tp_buf[TP_MAX_VAL_LENGTH];
make_tp_string(tp_buf, tp);
bpf_dbg_printk("tp: %s", tp_buf);
}

static __always_inline u8 client_trace_parent(void *goroutine_addr,
Expand Down Expand Up @@ -225,11 +224,19 @@ static __always_inline u8 client_trace_parent(void *goroutine_addr,
static __always_inline void read_ip_and_port(u8 *dst_ip, u16 *dst_port, void *src) {
s64 addr_len = 0;
void *addr_ip = 0;

bpf_probe_read(dst_port, sizeof(u16), (void *)(src + tcp_addr_port_ptr_pos));
bpf_probe_read(&addr_ip, sizeof(addr_ip), (void *)(src + tcp_addr_ip_ptr_pos));
off_table_t *ot = get_offsets_table();

bpf_probe_read(dst_port,
sizeof(u16),
(void *)(src + go_offset_of(ot, (go_offset){.v = _tcp_addr_port_ptr_pos})));
bpf_probe_read(&addr_ip,
sizeof(addr_ip),
(void *)(src + go_offset_of(ot, (go_offset){.v = _tcp_addr_ip_ptr_pos})));
if (addr_ip) {
bpf_probe_read(&addr_len, sizeof(addr_len), (void *)(src + tcp_addr_ip_ptr_pos + 8));
bpf_probe_read(
&addr_len,
sizeof(addr_len),
(void *)(src + go_offset_of(ot, (go_offset){.v = _tcp_addr_ip_ptr_pos}) + 8));
if (addr_len == 4) {
__builtin_memcpy(dst_ip, ip4ip6_prefix, sizeof(ip4ip6_prefix));
bpf_probe_read(dst_ip + sizeof(ip4ip6_prefix), 4, addr_ip);
Expand All @@ -243,11 +250,15 @@ static __always_inline u8 get_conn_info_from_fd(void *fd_ptr, connection_info_t
if (fd_ptr) {
void *laddr_ptr = 0;
void *raddr_ptr = 0;
off_table_t *ot = get_offsets_table();
u64 fd_laddr_pos = go_offset_of(ot, (go_offset){.v = _fd_laddr_pos});

bpf_probe_read(
&laddr_ptr, sizeof(laddr_ptr), (void *)(fd_ptr + fd_laddr_pos + 8)); // find laddr
bpf_probe_read(
&raddr_ptr, sizeof(raddr_ptr), (void *)(fd_ptr + fd_raddr_pos + 8)); // find raddr
&raddr_ptr,
sizeof(raddr_ptr),
(void *)(fd_ptr + go_offset_of(ot, (go_offset){.v = _fd_raddr_pos}) + 8)); // find raddr

bpf_dbg_printk("laddr_ptr %llx, laddr %llx, raddr %llx",
fd_ptr + fd_laddr_pos + 8,
Expand Down Expand Up @@ -279,7 +290,12 @@ static __always_inline u8 get_conn_info_from_fd(void *fd_ptr, connection_info_t
static __always_inline u8 get_conn_info(void *conn_ptr, connection_info_t *info) {
if (conn_ptr) {
void *fd_ptr = 0;
bpf_probe_read(&fd_ptr, sizeof(fd_ptr), (void *)(conn_ptr + conn_fd_pos)); // find fd
off_table_t *ot = get_offsets_table();

bpf_probe_read(
&fd_ptr,
sizeof(fd_ptr),
(void *)(conn_ptr + go_offset_of(ot, (go_offset){.v = _conn_fd_pos}))); // find fd

bpf_dbg_printk("Found fd ptr %llx", fd_ptr);

Expand Down
126 changes: 80 additions & 46 deletions bpf/go_grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,26 +93,6 @@ struct {
#define TRANSPORT_HTTP2 1
#define TRANSPORT_HANDLER 2

// To be Injected from the user space during the eBPF program load & initialization

volatile const u64 grpc_stream_st_ptr_pos;
volatile const u64 grpc_stream_method_ptr_pos;
volatile const u64 grpc_status_s_pos;
volatile const u64 grpc_status_code_ptr_pos;
volatile const u64 tcp_addr_port_ptr_pos;
volatile const u64 tcp_addr_ip_ptr_pos;
volatile const u64 grpc_stream_ctx_ptr_pos;
volatile const u64 value_context_val_ptr_pos;
volatile const u64 grpc_st_conn_pos;
volatile const u64 grpc_t_conn_pos;
volatile const u64 grpc_t_scheme_pos;

// Context propagation
volatile const u64 http2_client_next_id_pos;
volatile const u64 framer_w_pos;
volatile const u64 grpc_transport_buf_writer_buf_pos;
volatile const u64 grpc_transport_buf_writer_offset_pos;

#define OPTIMISTIC_GRPC_ENCODED_HEADER_LEN \
49 // 1 + 1 + 8 + 1 +~ 38 = type byte + hpack_len_as_byte("traceparent") + strlen(hpack("traceparent")) + len_as_byte(38) + hpack(generated tracepanent id)

Expand All @@ -123,6 +103,7 @@ int uprobe_server_handleStream(struct pt_regs *ctx) {
bpf_dbg_printk("goroutine_addr %lx", goroutine_addr);

void *stream_ptr = GO_PARAM4(ctx);
off_table_t *ot = get_offsets_table();

grpc_srv_func_invocation_t invocation = {
.start_monotime_ns = bpf_ktime_get_ns(),
Expand All @@ -135,12 +116,16 @@ int uprobe_server_handleStream(struct pt_regs *ctx) {
// Read the embedded context object ptr
bpf_probe_read(&ctx_ptr,
sizeof(ctx_ptr),
(void *)(stream_ptr + grpc_stream_ctx_ptr_pos + sizeof(void *)));
(void *)(stream_ptr +
go_offset_of(ot, (go_offset){.v = _grpc_stream_ctx_ptr_pos}) +
sizeof(void *)));

if (ctx_ptr) {
server_trace_parent(goroutine_addr,
&invocation.tp,
(void *)(ctx_ptr + value_context_val_ptr_pos + sizeof(void *)));
server_trace_parent(
goroutine_addr,
&invocation.tp,
(void *)(ctx_ptr + go_offset_of(ot, (go_offset){.v = _value_context_val_ptr_pos}) +
sizeof(void *)));
}
}

Expand Down Expand Up @@ -222,6 +207,8 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/server_handleStream return === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
off_table_t *ot = get_offsets_table();

bpf_dbg_printk("goroutine_addr %lx", goroutine_addr);

grpc_srv_func_invocation_t *invocation =
Expand All @@ -239,6 +226,8 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {
}

void *stream_ptr = (void *)invocation->stream;
u64 grpc_stream_method_ptr_pos =
go_offset_of(ot, (go_offset){.v = _grpc_stream_method_ptr_pos});
bpf_dbg_printk("stream_ptr %lx, method pos %lx", stream_ptr, grpc_stream_method_ptr_pos);

http_request_trace *trace = bpf_ringbuf_reserve(&events, sizeof(http_request_trace), 0);
Expand Down Expand Up @@ -275,8 +264,11 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {
void *st_ptr = 0;
u8 found_conn = 0;
// Read the embedded object ptr
bpf_probe_read(
&st_ptr, sizeof(st_ptr), (void *)(stream_ptr + grpc_stream_st_ptr_pos + sizeof(void *)));
bpf_probe_read(&st_ptr,
sizeof(st_ptr),
(void *)(stream_ptr +
go_offset_of(ot, (go_offset){.v = _grpc_stream_st_ptr_pos}) +
sizeof(void *)));

bpf_dbg_printk("st_ptr %llx", st_ptr);
if (st_ptr) {
Expand Down Expand Up @@ -315,20 +307,28 @@ int uprobe_transport_writeStatus(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/transport_writeStatus === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
off_table_t *ot = get_offsets_table();

bpf_dbg_printk("goroutine_addr %lx", goroutine_addr);

void *status_ptr = GO_PARAM3(ctx);
bpf_dbg_printk("status_ptr %lx", status_ptr);

if (status_ptr != NULL) {
void *s_ptr;
bpf_probe_read(&s_ptr, sizeof(s_ptr), (void *)(status_ptr + grpc_status_s_pos));
bpf_probe_read(
&s_ptr,
sizeof(s_ptr),
(void *)(status_ptr + go_offset_of(ot, (go_offset){.v = _grpc_status_s_pos})));

bpf_dbg_printk("s_ptr %lx", s_ptr);

if (s_ptr != NULL) {
u16 status = -1;
bpf_probe_read(&status, sizeof(status), (void *)(s_ptr + grpc_status_code_ptr_pos));
bpf_probe_read(
&status,
sizeof(status),
(void *)(s_ptr + go_offset_of(ot, (go_offset){.v = _grpc_status_code_ptr_pos})));
bpf_dbg_printk("status code %d", status);
bpf_map_update_elem(&ongoing_grpc_request_status, &goroutine_addr, &status, BPF_ANY);
}
Expand All @@ -348,13 +348,16 @@ static __always_inline void clientConnStart(
.tp = {0},
.flags = 0,
};
off_table_t *ot = get_offsets_table();

if (ctx_ptr) {
void *val_ptr = 0;
// Read the embedded val object ptr from ctx if there's one
bpf_probe_read(&val_ptr,
sizeof(val_ptr),
(void *)(ctx_ptr + value_context_val_ptr_pos + sizeof(void *)));
(void *)(ctx_ptr +
go_offset_of(ot, (go_offset){.v = _value_context_val_ptr_pos}) +
sizeof(void *)));

invocation.flags = client_trace_parent(goroutine_addr, &invocation.tp, (void *)(val_ptr));
} else {
Expand Down Expand Up @@ -522,18 +525,20 @@ int uprobe_transport_http2Client_NewStream(struct pt_regs *ctx) {

void *goroutine_addr = GOROUTINE_PTR(ctx);
void *t_ptr = GO_PARAM1(ctx);
off_table_t *ot = get_offsets_table();

u64 grpc_t_conn_pos = go_offset_of(ot, (go_offset){.v = _grpc_t_scheme_pos});
bpf_dbg_printk(
"goroutine_addr %lx, t_ptr %llx, t.conn_pos %x", goroutine_addr, t_ptr, grpc_t_conn_pos);

if (t_ptr) {
void *conn_ptr = t_ptr + grpc_t_conn_pos + 8;
void *conn_ptr = t_ptr + go_offset_of(ot, (go_offset){.v = _grpc_t_conn_pos}) + 8;
u8 buf[16];
u64 is_secure = 0;

void *s_ptr = 0;
buf[0] = 0;
bpf_probe_read(&s_ptr, sizeof(s_ptr), (void *)(t_ptr + grpc_t_scheme_pos));
bpf_probe_read(&s_ptr, sizeof(s_ptr), (void *)(t_ptr + grpc_t_conn_pos));
bpf_probe_read(buf, sizeof(buf), s_ptr);

//bpf_dbg_printk("scheme %s", buf);
Expand Down Expand Up @@ -565,7 +570,10 @@ int uprobe_transport_http2Client_NewStream(struct pt_regs *ctx) {
#ifndef NO_HEADER_PROPAGATION
u32 next_id = 0;
// Read the next stream id from the httpClient
bpf_probe_read(&next_id, sizeof(next_id), (void *)(t_ptr + http2_client_next_id_pos));
bpf_probe_read(
&next_id,
sizeof(next_id),
(void *)(t_ptr + go_offset_of(ot, (go_offset){.v = _http2_client_next_id_pos})));

bpf_dbg_printk("next_id %d", next_id);

Expand Down Expand Up @@ -608,14 +616,17 @@ SEC("uprobe/grpcFramerWriteHeaders")
int uprobe_grpcFramerWriteHeaders(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc Framer writeHeaders === ");

if (framer_w_pos == 0) {
void *framer = GO_PARAM1(ctx);
u64 stream_id = (u64)GO_PARAM2(ctx);

off_table_t *ot = get_offsets_table();
u64 framer_w_pos = go_offset_of(ot, (go_offset){.v = _framer_w_pos});

if (framer_w_pos == -1) {
bpf_dbg_printk("framer w not found");
return 0;
}

void *framer = GO_PARAM1(ctx);
u64 stream_id = (u64)GO_PARAM2(ctx);

bpf_dbg_printk(
"framer=%llx, stream_id=%lld, framer_w_pos %llx", framer, ((u64)stream_id), framer_w_pos);

Expand All @@ -634,7 +645,10 @@ int uprobe_grpcFramerWriteHeaders(struct pt_regs *ctx) {
if (w_ptr) {
s64 offset;
bpf_probe_read(
&offset, sizeof(offset), (void *)(w_ptr + grpc_transport_buf_writer_offset_pos));
&offset,
sizeof(offset),
(void *)(w_ptr + go_offset_of(
ot, (go_offset){.v = _grpc_transport_buf_writer_offset_pos})));

bpf_dbg_printk("Found initial data offset %d", offset);

Expand Down Expand Up @@ -674,13 +688,18 @@ int uprobe_grpcFramerWriteHeaders_returns(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc Framer writeHeaders returns === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
off_table_t *ot = get_offsets_table();

grpc_framer_func_invocation_t *f_info =
bpf_map_lookup_elem(&grpc_framer_invocation_map, &goroutine_addr);

if (f_info) {
void *w_ptr = (void *)(f_info->framer_ptr + framer_w_pos + 16);
bpf_probe_read(&w_ptr, sizeof(w_ptr), (void *)(f_info->framer_ptr + framer_w_pos + 8));
void *w_ptr =
(void *)(f_info->framer_ptr + go_offset_of(ot, (go_offset){.v = _framer_w_pos}) + 16);
bpf_probe_read(
&w_ptr,
sizeof(w_ptr),
(void *)(f_info->framer_ptr + go_offset_of(ot, (go_offset){.v = _framer_w_pos}) + 8));

if (w_ptr) {
void *buf_arr = 0;
Expand All @@ -691,13 +710,24 @@ int uprobe_grpcFramerWriteHeaders_returns(struct pt_regs *ctx) {
bpf_probe_read(
&buf_arr,
sizeof(buf_arr),
(void
*)(w_ptr +
go_offset_of(
ot,
(go_offset){
.v =
_grpc_transport_buf_writer_buf_pos}))); // the buffer is the first field
bpf_probe_read(
&n,
sizeof(n),
(void *)(w_ptr + go_offset_of(
ot, (go_offset){.v = _grpc_transport_buf_writer_offset_pos})));
bpf_probe_read(
&cap,
sizeof(cap),
(void *)(w_ptr +
grpc_transport_buf_writer_buf_pos)); // the buffer is the first field
bpf_probe_read(&n, sizeof(n), (void *)(w_ptr + grpc_transport_buf_writer_offset_pos));
bpf_probe_read(&cap,
sizeof(cap),
(void *)(w_ptr + grpc_transport_buf_writer_offset_pos +
16)); // the offset of the capacity is 2 * 8 bytes from the buf
go_offset_of(ot, (go_offset){.v = _grpc_transport_buf_writer_offset_pos}) +
16)); // the offset of the capacity is 2 * 8 bytes from the buf

bpf_clamp_umax(off, MAX_W_PTR_OFFSET);

Expand Down Expand Up @@ -730,7 +760,11 @@ int uprobe_grpcFramerWriteHeaders_returns(struct pt_regs *ctx) {
n += TP_MAX_VAL_LENGTH;
// Update the value of n in w to reflect the new size
bpf_probe_write_user(
(void *)(w_ptr + grpc_transport_buf_writer_offset_pos), &n, sizeof(n));
(void *)(w_ptr +
go_offset_of(ot,
(go_offset){.v = _grpc_transport_buf_writer_offset_pos})),
&n,
sizeof(n));

// http2 encodes the length of the headers in the first 3 bytes of buf, we need to update those
u8 size_1 = 0;
Expand Down
Loading

0 comments on commit 9a23f73

Please sign in to comment.