Skip to content

Commit

Permalink
Socket map based L7 context-propagation (#1396)
Browse files Browse the repository at this point in the history
  • Loading branch information
grcevski authored Nov 26, 2024
1 parent 25689c0 commit d2a8a83
Show file tree
Hide file tree
Showing 241 changed files with 16,137 additions and 3,206 deletions.
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ checkfmt:
exit 1; \
fi

.PHONY: clang-tidy
clang-tidy:
cd bpf && $(CLANG_TIDY) *.c *.h

.PHONY: lint-dashboard
lint-dashboard: prereqs
@echo "### Linting dashboard";
Expand Down Expand Up @@ -379,10 +383,6 @@ clean-testoutput:
check-ebpf-integrity: docker-generate
git diff --name-status --exit-code || (echo "Run make docker-generate locally and commit the code changes" && false)

.PHONY: clang-tidy
clang-tidy:
cd bpf && $(CLANG_TIDY) *.c *.h

.PHONY: protoc-gen
protoc-gen:
docker run --rm -v $(PWD):/work -w /work $(PROTOC_IMAGE) protoc --go_out=pkg/kubecache --go-grpc_out=pkg/kubecache proto/informer.proto
19 changes: 19 additions & 0 deletions bpf/go_nethttp.h
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,25 @@ int uprobe_writeSubset(struct pt_regs *ctx) {
(void *)(io_writer_addr + go_offset_of(ot, (go_offset){.v = _io_writer_n_pos})),
&len,
sizeof(len));

// For Go we support two types of HTTP context propagation for now.
// 1. The one that this code does, which uses the locked down bpf_probe_write_user.
// 2. By using a sock_msg program that will extend the packet.
// If this code ran, we should ensure that the second part doesn't run, therefore
// we remove the metadata setup in uprobe_persistConnRoundTrip(struct pt_regs *ctx), so
// that approach 2. skips this packet.
connection_info_t *info = bpf_map_lookup_elem(&ongoing_client_connections, &g_key);
if (info) {
egress_key_t e_key = {
.d_port = info->d_port,
.s_port = info->s_port,
};
bpf_map_delete_elem(&outgoing_trace_map, &e_key);
bpf_dbg_printk(
"wrote traceparent using bpf_probe_write_user, removing outgoing trace map %d:%d",
e_key.s_port,
e_key.d_port);
}
}

done:
Expand Down
23 changes: 23 additions & 0 deletions bpf/http_ssl_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,27 @@ handle_ssl_buf(void *ctx, u64 id, ssl_args_t *args, int bytes_len, u8 direction)
}
}

static __always_inline void *is_ssl_connection(u64 id) {
void *ssl = 0;
// Checks if it's sandwitched between active SSL handshake, read or write uprobe/uretprobe
void **s = bpf_map_lookup_elem(&active_ssl_handshakes, &id);
if (s) {
ssl = *s;
} else {
ssl_args_t *ssl_args = bpf_map_lookup_elem(&active_ssl_read_args, &id);
if (!ssl_args) {
ssl_args = bpf_map_lookup_elem(&active_ssl_write_args, &id);
}
if (ssl_args) {
ssl = (void *)ssl_args->ssl;
}
}

return ssl;
}

static __always_inline void *is_active_ssl(pid_connection_info_t *conn) {
return bpf_map_lookup_elem(&active_ssl_connections, conn);
}

#endif
31 changes: 29 additions & 2 deletions bpf/http_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,14 @@
// h = high word, l = low word
// used as hashmap key, must be 4 byte aligned?
typedef struct http_connection_info {
u8 s_addr[IP_V6_ADDR_LEN];
u8 d_addr[IP_V6_ADDR_LEN];
union {
u8 s_addr[IP_V6_ADDR_LEN];
u32 s_ip[IP_V6_ADDR_LEN_WORDS];
};
union {
u8 d_addr[IP_V6_ADDR_LEN];
u32 d_ip[IP_V6_ADDR_LEN_WORDS];
};
u16 s_port;
u16 d_port;
} connection_info_t;
Expand Down Expand Up @@ -187,6 +193,14 @@ typedef struct http2_grpc_request {
tp_info_t tp;
} http2_grpc_request_t;

// When sock_msg is installed it disables the kprobes attached to tcp_sendmsg.
// We use this data structure to provide the buffer to the tcp_sendmsg logic,
// because we can't read the bvec physical pages.
typedef struct msg_buffer {
u8 buf[KPROBES_HTTP2_BUF_SIZE];
u16 pos;
} msg_buffer_t;

// Force emitting struct http_request_trace into the ELF for automatic creation of Golang struct
const http_info_t *unused __attribute__((unused));
const http2_grpc_request_t *unused_http2 __attribute__((unused));
Expand Down Expand Up @@ -244,6 +258,19 @@ static __always_inline void sort_connection_info(connection_info_t *info) {
}
}

// Equivalent to sort_connection_info, but works only with the ports key (egress_key_t),
// which we use for egress connection tracking
static __always_inline void sort_egress_key(egress_key_t *info) {
if (likely_ephemeral_port(info->s_port) && !likely_ephemeral_port(info->d_port)) {
return;
}

if ((likely_ephemeral_port(info->d_port) && !likely_ephemeral_port(info->s_port)) ||
(info->d_port > info->s_port)) {
__SWAP(u16, info->s_port, info->d_port);
}
}

static __always_inline bool client_call(connection_info_t *info) {
return likely_ephemeral_port(info->s_port) && !likely_ephemeral_port(info->d_port);
}
Expand Down
60 changes: 32 additions & 28 deletions bpf/k_tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ struct {
__type(value, recv_args_t);
} active_recv_args SEC(".maps");

typedef struct send_args {
pid_connection_info_t p_conn;
u64 size;
} send_args_t;

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
Expand Down Expand Up @@ -256,29 +251,6 @@ int BPF_KRETPROBE(kretprobe_sys_connect, int fd) {

// Main HTTP read and write operations are handled with tcp_sendmsg and tcp_recvmsg

static __always_inline void *is_ssl_connection(u64 id) {
void *ssl = 0;
// Checks if it's sandwitched between active SSL handshake, read or write uprobe/uretprobe
void **s = bpf_map_lookup_elem(&active_ssl_handshakes, &id);
if (s) {
ssl = *s;
} else {
ssl_args_t *ssl_args = bpf_map_lookup_elem(&active_ssl_read_args, &id);
if (!ssl_args) {
ssl_args = bpf_map_lookup_elem(&active_ssl_write_args, &id);
}
if (ssl_args) {
ssl = (void *)ssl_args->ssl;
}
}

return ssl;
}

static __always_inline void *is_active_ssl(pid_connection_info_t *conn) {
return bpf_map_lookup_elem(&active_ssl_connections, conn);
}

// The size argument here will be always the total response size.
// However, the return value of tcp_sendmsg tells us how much it sent. When the
// response is large it will get chunked, so we have to use a kretprobe to
Expand All @@ -302,6 +274,11 @@ int BPF_KPROBE(kprobe_tcp_sendmsg, struct sock *sk, struct msghdr *msg, size_t s
u16 orig_dport = s_args.p_conn.conn.d_port;
dbg_print_http_connection_info(
&s_args.p_conn.conn); // commented out since GitHub CI doesn't like this call
// Create the egress key before we sort the connection info.
egress_key_t e_key = {
.d_port = s_args.p_conn.conn.d_port,
.s_port = s_args.p_conn.conn.s_port,
};
sort_connection_info(&s_args.p_conn.conn);
s_args.p_conn.pid = pid_from_pid_tgid(id);

Expand All @@ -313,6 +290,33 @@ int BPF_KPROBE(kprobe_tcp_sendmsg, struct sock *sk, struct msghdr *msg, size_t s
u8 *buf = iovec_memory();
if (buf) {
size = read_msghdr_buf(msg, buf, size);
// If a sock_msg program is installed, this kprobe will fail to
// read anything, because the data is in bvec physical pages. However,
// the sock_msg will setup a buffer for us if this is the case. We
// look up this buffer and use it instead of what we'd get from
// calling read_msghdr_buf.
if (!size) {
msg_buffer_t *m_buf = bpf_map_lookup_elem(&msg_buffers, &e_key);
bpf_dbg_printk("No size, m_buf[%llx]", m_buf);
if (m_buf) {
buf = m_buf->buf;
// The buffer setup for us by a sock_msg program is always the
// full buffer, but when we extend a packet to be able to inject
// a Traceparent field, it will actually be split in 3 chunks:
// [before the injected header],[70 bytes for 'Traceparent...'],[the rest].
// We don't want the handle_buf_with_connection logic to run more than
// once on the same data, so if we find a buf we send all of it to the
// handle_buf_with_connection logic and then mark it as seen by making
// m_buf->pos be the size of the buffer.
if (!m_buf->pos) {
size = sizeof(m_buf->buf);
m_buf->pos = size;
bpf_dbg_printk("msg_buffer: size %d, buf[%s]", size, buf);
} else {
size = 0;
}
}
}
if (size) {
u64 sock_p = (u64)sk;
bpf_map_update_elem(&active_send_args, &id, &s_args, BPF_ANY);
Expand Down
62 changes: 57 additions & 5 deletions bpf/k_tracer_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
#include "protocol_http.h"
#include "protocol_http2.h"
#include "protocol_tcp.h"
#include "tc_common.h"

typedef struct send_args {
pid_connection_info_t p_conn;
u64 size;
} send_args_t;

struct bpf_map_def SEC("maps") jump_table = {
.type = BPF_MAP_TYPE_PROG_ARRAY,
Expand All @@ -19,13 +25,19 @@ struct bpf_map_def SEC("maps") jump_table = {
.max_entries = 8,
};

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, egress_key_t);
__type(value, msg_buffer_t);
__uint(max_entries, 1000);
__uint(pinning, BEYLA_PIN_INTERNAL);
} msg_buffers SEC(".maps");

#define TAIL_PROTOCOL_HTTP 0
#define TAIL_PROTOCOL_HTTP2 1
#define TAIL_PROTOCOL_TCP 2

static __always_inline void handle_buf_with_args(void *ctx, call_protocol_args_t *args) {
bpf_probe_read(args->small_buf, MIN_HTTP2_SIZE, (void *)args->u_buf);

bpf_dbg_printk(
"buf=[%s], pid=%d, len=%d", args->small_buf, args->pid_conn.pid, args->bytes_len);

Expand All @@ -42,8 +54,48 @@ static __always_inline void handle_buf_with_args(void *ctx, call_protocol_args_t
} else { // large request tracking
http_info_t *info = bpf_map_lookup_elem(&ongoing_http, &args->pid_conn);

if (info && still_responding(info)) {
info->end_monotime_ns = bpf_ktime_get_ns();
if (info) {
// Still reading checks if we are processing buffers of a HTTP request
// that has started, but we haven't seen a response yet.
if (still_reading(info)) {
// Packets are split into chunks if Beyla injected the Traceparent
// Make sure you look for split packets containing the real Traceparent.
// Essentially, when a packet is extended by our sock_msg program and
// passed down another service, the receiving side may reassemble the
// packets into one buffer or not. If they are reassembled, then the
// call to bpf_tail_call(ctx, &jump_table, TAIL_PROTOCOL_HTTP); will
// scan for the incoming 'Traceparent' header. If they are not reassembled
// we'll see something like this:
// [before the injected header],[70 bytes for 'Traceparent...'],[the rest].
if (is_traceparent(args->small_buf)) {
unsigned char *buf = tp_char_buf();
if (buf) {
bpf_probe_read(buf, EXTEND_SIZE, (u8 *)args->u_buf);
bpf_dbg_printk("Found traceparent %s", buf);
unsigned char *t_id = extract_trace_id(buf);
unsigned char *s_id = extract_span_id(buf);
unsigned char *f_id = extract_flags(buf);

decode_hex(info->tp.trace_id, t_id, TRACE_ID_CHAR_LEN);
decode_hex((unsigned char *)&info->tp.flags, f_id, FLAGS_CHAR_LEN);
decode_hex(info->tp.parent_id, s_id, SPAN_ID_CHAR_LEN);

trace_key_t t_key = {0};
task_tid(&t_key.p_key);
t_key.extra_id = extra_runtime_id();

tp_info_pid_t *existing = bpf_map_lookup_elem(&server_traces, &t_key);
if (existing) {
__builtin_memcpy(&existing->tp, &info->tp, sizeof(tp_info_t));
set_trace_info_for_connection(&args->pid_conn.conn, existing);
} else {
bpf_dbg_printk("Didn't find existing trace, this might be a bug!");
}
}
}
} else if (still_responding(info)) {
info->end_monotime_ns = bpf_ktime_get_ns();
}
} else if (!info) {
// SSL requests will see both TCP traffic and text traffic, ignore the TCP if
// we are processing SSL request. HTTP2 is already checked in handle_buf_with_connection.
Expand Down Expand Up @@ -87,7 +139,7 @@ static __always_inline void handle_buf_with_connection(void *ctx,
}

__builtin_memcpy(&args->pid_conn, pid_conn, sizeof(pid_connection_info_t));

bpf_probe_read(args->small_buf, MIN_HTTP2_SIZE, (void *)args->u_buf);
handle_buf_with_args(ctx, args);
}

Expand Down
5 changes: 2 additions & 3 deletions bpf/pid.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ static __always_inline u8 pid_matches(pid_key_t *p) {
u64 k =
(((u64)p->ns) << 32) | p->pid; // combine the namespace id and the pid into one single u64

u32 h =
(u32)(k %
PRIME_HASH); // divide with prime number lower than max pids * 64, modulo with primes gives good hash functions
// divide with prime number lower than max pids * 64, modulo with primes gives good hash functions
u32 h = (u32)(k % PRIME_HASH);
u32 segment = h / 64; // divide by the segment size (8 bytes) to find the segment
u32 bit = h & 63; // lowest 64 bits gives us the placement inside the segment

Expand Down
1 change: 1 addition & 0 deletions bpf/protocol_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define AF_INET6 10 /* IP version 6 */

#define IP_V6_ADDR_LEN 16
#define IP_V6_ADDR_LEN_WORDS 4

// Most Linux distros use 32768 to 61000 for the ephemeral ports, so we look up from 32768
// IANA suggests that the range should be 49152-65535, which is what Windows uses
Expand Down
2 changes: 1 addition & 1 deletion bpf/protocol_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ static __always_inline void process_http2_grpc_frames(pid_connection_info_t *pid
int saved_buf_pos = 0;
u8 found_data_frame = 0;
http2_conn_stream_t stream = {0};
stream.pid_conn = *pid_conn;

unsigned char frame_buf[FRAME_HEADER_LEN];
frame_header_t frame = {0};
Expand All @@ -124,7 +125,6 @@ static __always_inline void process_http2_grpc_frames(pid_connection_info_t *pid
//bpf_dbg_printk("http2 frame type = %d, len = %d, stream_id = %d, flags = %d", frame.type, frame.length, frame.stream_id, frame.flags);

if (is_headers_frame(&frame)) {
stream.pid_conn = *pid_conn;
stream.stream_id = frame.stream_id;
if (!prev_info) {
prev_info = bpf_map_lookup_elem(&ongoing_http2_grpc, &stream);
Expand Down
Loading

0 comments on commit d2a8a83

Please sign in to comment.