Skip to content

Commit

Permalink
Don't read Go maps anymore (#1413)
Browse files Browse the repository at this point in the history
  • Loading branch information
grcevski authored Nov 27, 2024
1 parent 043c46e commit 9efc255
Show file tree
Hide file tree
Showing 44 changed files with 1,555 additions and 380 deletions.
1,115 changes: 1,115 additions & 0 deletions .gitignore

Large diffs are not rendered by default.

106 changes: 72 additions & 34 deletions bpf/go_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
#include "tracing.h"
#include "trace_util.h"
#include "go_offsets.h"
#include "go_traceparent.h"
#include "pin_internal.h"

char __license[] SEC("license") = "Dual MIT/GPL";

enum { W3C_KEY_LENGTH = 11, W3C_VAL_LENGTH = 55 };

// Temporary information about a function invocation. It stores the invocation time of a function
// as well as the value of registers at the invocation time. This way we can retrieve them at the
// return uprobes so we can know the values of the function arguments (which are passed as registers
Expand Down Expand Up @@ -78,8 +79,9 @@ struct {
} ongoing_grpc_operate_headers SEC(".maps");

typedef struct grpc_transports {
u8 type;
connection_info_t conn;
tp_info_t tp;
u8 type;
} grpc_transports_t;

// TODO: use go_addr_key_t as key
Expand All @@ -105,6 +107,30 @@ struct {
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
} ongoing_sql_queries SEC(".maps");

typedef struct grpc_header_field {
u8 *key_ptr;
u64 key_len;
u8 *val_ptr;
u64 val_len;
u64 sensitive;
} grpc_header_field_t;

// assumes s2 is all lowercase
static __always_inline int bpf_memicmp(const char *s1, const char *s2, s32 size) {
for (int i = 0; i < size; i++) {
if (s1[i] != s2[i] && s1[i] != (s2[i] - 32)) // compare with each uppercase character
{
return i + 1;
}
}

return 0;
}

static __always_inline u8 valid_trace(const unsigned char *trace_id) {
return *((u64 *)trace_id) != 0 || *((u64 *)(trace_id + 8)) != 0;
}

static __always_inline void go_addr_key_from_id(go_addr_key_t *current, void *addr) {
u64 pid_tid = bpf_get_current_pid_tgid();
u32 pid = pid_from_pid_tgid(pid_tid);
Expand Down Expand Up @@ -178,24 +204,14 @@ static __always_inline void tp_clone(tp_info_t *dest, tp_info_t *src) {
}

static __always_inline void
server_trace_parent(void *goroutine_addr, tp_info_t *tp, void *req_header) {
server_trace_parent(void *goroutine_addr, tp_info_t *tp, tp_info_t *found_tp) {
// May get overriden when decoding existing traceparent, but otherwise we set sample ON
tp->flags = 1;
// Get traceparent from the Request.Header
void *traceparent_ptr = extract_traceparent_from_req_headers(req_header);
go_addr_key_t g_key = {};
go_addr_key_from_id(&g_key, goroutine_addr);
if (traceparent_ptr != NULL) {
unsigned char buf[TP_MAX_VAL_LENGTH];
long res = bpf_probe_read(buf, sizeof(buf), traceparent_ptr);
if (res < 0) {
bpf_dbg_printk("can't copy traceparent header");
urand_bytes(tp->trace_id, TRACE_ID_SIZE_BYTES);
*((u64 *)tp->parent_id) = 0;
} else {
bpf_dbg_printk("Decoding traceparent from headers %s", buf);
decode_go_traceparent(buf, tp->trace_id, tp->parent_id, &tp->flags);
}
if (found_tp) {
bpf_dbg_printk("Decoded from existing traceparent");
__builtin_memcpy(tp, found_tp, sizeof(tp_info_t));
} else {
connection_info_t *info = bpf_map_lookup_elem(&ongoing_server_connections, &g_key);
u8 found_info = 0;
Expand Down Expand Up @@ -242,29 +258,12 @@ server_trace_parent(void *goroutine_addr, tp_info_t *tp, void *req_header) {
bpf_dbg_printk("tp: %s", tp_buf);
}

static __always_inline u8 client_trace_parent(void *goroutine_addr,
tp_info_t *tp_i,
void *req_header) {
// Get traceparent from the Request.Header
static __always_inline u8 client_trace_parent(void *goroutine_addr, tp_info_t *tp_i) {
u8 found_trace_id = 0;

// May get overriden when decoding existing traceparent or finding a server span, but otherwise we set sample ON
tp_i->flags = 1;

if (req_header) {
void *traceparent_ptr = extract_traceparent_from_req_headers(req_header);
if (traceparent_ptr != NULL) {
unsigned char buf[TP_MAX_VAL_LENGTH];
long res = bpf_probe_read(buf, sizeof(buf), traceparent_ptr);
if (res < 0) {
bpf_dbg_printk("can't copy traceparent header");
} else {
found_trace_id = 1;
decode_go_traceparent(buf, tp_i->trace_id, tp_i->span_id, &tp_i->flags);
}
}
}

go_addr_key_t g_key = {};
go_addr_key_from_id(&g_key, goroutine_addr);

Expand Down Expand Up @@ -401,4 +400,43 @@ static __always_inline void *unwrap_tls_conn_info(void *conn_ptr, void *tls_stat
return conn_ptr;
}

static __always_inline void process_meta_frame_headers(void *frame, tp_info_t *tp) {
if (!frame) {
return;
}

off_table_t *ot = get_offsets_table();

void *fields = 0;
u64 fields_off = go_offset_of(ot, (go_offset){.v = _meta_headers_frame_fields_ptr_pos});
bpf_probe_read(&fields, sizeof(fields), (void *)(frame + fields_off));
u64 fields_len = 0;
bpf_probe_read(&fields_len, sizeof(fields_len), (void *)(frame + fields_off + 8));
bpf_dbg_printk("fields ptr %llx, len %d", fields, fields_len);
if (fields && fields_len > 0) {
for (u8 i = 0; i < 16; i++) {
if (i >= fields_len) {
break;
}
void *field_ptr = fields + (i * sizeof(grpc_header_field_t));
//bpf_dbg_printk("field_ptr %llx", field_ptr);
grpc_header_field_t field = {};
bpf_probe_read(&field, sizeof(grpc_header_field_t), field_ptr);
//bpf_dbg_printk("grpc header %s:%s", field.key_ptr, field.val_ptr);
//bpf_dbg_printk("grpc sizes %d:%d", field.key_len, field.val_len);
if (field.key_len == W3C_KEY_LENGTH && field.val_len == W3C_VAL_LENGTH) {
u8 temp[W3C_VAL_LENGTH];

bpf_probe_read(&temp, W3C_KEY_LENGTH, field.key_ptr);
if (!bpf_memicmp((const char *)temp, "traceparent", W3C_KEY_LENGTH)) {
//bpf_dbg_printk("found grpc traceparent header");
bpf_probe_read(&temp, W3C_VAL_LENGTH, field.val_ptr);
decode_go_traceparent(temp, tp->trace_id, tp->parent_id, &tp->flags);
break;
}
}
}
}
}

#endif // GO_COMMON_H
53 changes: 38 additions & 15 deletions bpf/go_grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include "go_byte_arr.h"
#include "bpf_dbg.h"
#include "go_common.h"
#include "go_traceparent.h"
#include "hpack.h"
#include "ringbuf.h"

Expand Down Expand Up @@ -97,21 +96,29 @@ int uprobe_server_handleStream(struct pt_regs *ctx) {
};

if (stream_ptr) {
void *ctx_ptr = 0;
// Read the embedded context object ptr
bpf_probe_read(&ctx_ptr,
sizeof(ctx_ptr),
void *st_ptr = 0;
void *tp_ptr = 0;
// Read the embedded object ptr
bpf_probe_read(&st_ptr,
sizeof(st_ptr),
(void *)(stream_ptr +
go_offset_of(ot, (go_offset){.v = _grpc_stream_ctx_ptr_pos}) +
go_offset_of(ot, (go_offset){.v = _grpc_stream_st_ptr_pos}) +
sizeof(void *)));

if (ctx_ptr) {
server_trace_parent(
goroutine_addr,
&invocation.tp,
(void *)(ctx_ptr + go_offset_of(ot, (go_offset){.v = _value_context_val_ptr_pos}) +
sizeof(void *)));
bpf_dbg_printk("st_ptr %llx", st_ptr);
if (st_ptr) {
grpc_transports_t *t = bpf_map_lookup_elem(&ongoing_grpc_transports, &st_ptr);

bpf_dbg_printk("found t %llx", t);
if (t) {
bpf_dbg_printk("reading the traceparent from frame headers");
if (valid_trace(t->tp.trace_id)) {
tp_ptr = &t->tp;
}
}
}

server_trace_parent(goroutine_addr, &invocation.tp, tp_ptr);
}

if (bpf_map_update_elem(&ongoing_grpc_server_requests, &g_key, &invocation, BPF_ANY)) {
Expand All @@ -126,16 +133,32 @@ SEC("uprobe/http2Server_operateHeaders")
int uprobe_http2Server_operateHeaders(struct pt_regs *ctx) {
void *goroutine_addr = GOROUTINE_PTR(ctx);
void *tr = GO_PARAM1(ctx);
bpf_dbg_printk(
"=== uprobe/http2Server_operateHeaders tr %llx goroutine %lx === ", tr, goroutine_addr);
void *frame = GO_PARAM2(ctx);
off_table_t *ot = get_offsets_table();

u64 new_offset_version = go_offset_of(ot, (go_offset){.v = _operate_headers_new});

// After grpc version 1.60, they added extra context argument to the
// function call, which adds two extra arguments.
if (new_offset_version) {
frame = GO_PARAM4(ctx);
}

bpf_dbg_printk("=== uprobe/GRPC http2Server_operateHeaders tr %llx goroutine %lx, new %d === ",
tr,
goroutine_addr,
new_offset_version);
go_addr_key_t g_key = {};
go_addr_key_from_id(&g_key, goroutine_addr);

grpc_transports_t t = {
.type = TRANSPORT_HTTP2,
.conn = {0},
.tp = {0},
};

process_meta_frame_headers(frame, &t.tp);

bpf_map_update_elem(&ongoing_grpc_operate_headers, &g_key, &tr, BPF_ANY);
bpf_map_update_elem(&ongoing_grpc_transports, &tr, &t, BPF_ANY);

Expand Down Expand Up @@ -337,7 +360,7 @@ static __always_inline void clientConnStart(
go_offset_of(ot, (go_offset){.v = _value_context_val_ptr_pos}) +
sizeof(void *)));

invocation.flags = client_trace_parent(goroutine_addr, &invocation.tp, val_ptr);
invocation.flags = client_trace_parent(goroutine_addr, &invocation.tp);
} else {
// it's OK sending empty tp for a client, the userspace id generator will make random trace_id, span_id
bpf_dbg_printk("No ctx_ptr %llx", ctx_ptr);
Expand Down
3 changes: 1 addition & 2 deletions bpf/go_kafka_go.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ int uprobe_writer_write_messages(struct pt_regs *ctx) {

tp_info_t tp = {};

// We don't look up in the headers, no http/grpc request, therefore 0 as last argument
client_trace_parent(goroutine_addr, &tp, 0);
client_trace_parent(goroutine_addr, &tp);
go_addr_key_t p_key = {};
go_addr_key_from_id(&p_key, w_ptr);

Expand Down
Loading

0 comments on commit 9efc255

Please sign in to comment.