Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure TC programs play nice with 3rdparty programs #1462

Merged
merged 4 commits into from
Dec 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions bpf/flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
#define __FLOW_H__

#include "vmlinux.h"
#include "tc_act.h"

#define TC_ACT_OK 0
#define TC_ACT_SHOT 2
#define IP_MAX_LEN 16

#define ETH_ALEN 6 /* Octets in one ethernet addr */
Expand Down
10 changes: 5 additions & 5 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ static inline int fill_ethhdr(struct ethhdr *eth, void *data_end, flow_id *id, u
static inline int flow_monitor(struct __sk_buff *skb) {
// If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) {
return TC_ACT_OK;
return TC_ACT_UNSPEC;
}
void *data_end = (void *)(long)skb->data_end;
void *data = (void *)(long)skb->data;
Expand All @@ -157,7 +157,7 @@ static inline int flow_monitor(struct __sk_buff *skb) {
struct ethhdr *eth = (struct ethhdr *)data;
u16 flags = 0;
if (fill_ethhdr(eth, data_end, &id, &flags) == DISCARD) {
return TC_ACT_OK;
return TC_ACT_UNSPEC;
}
id.if_index = skb->ifindex;

Expand Down Expand Up @@ -261,16 +261,16 @@ static inline int flow_monitor(struct __sk_buff *skb) {
if (flags & FIN_FLAG || flags & RST_FLAG || flags & FIN_ACK_FLAG || flags & RST_ACK_FLAG) {
bpf_map_delete_elem(&flow_directions, &id);
}
return TC_ACT_OK;
return TC_ACT_UNSPEC;
}

SEC("tc_ingress")
int ingress_flow_parse(struct __sk_buff *skb) {
int beyla_ingress_flow_parse(struct __sk_buff *skb) {
return flow_monitor(skb);
}

SEC("tc_egress")
int egress_flow_parse(struct __sk_buff *skb) {
int beyla_egress_flow_parse(struct __sk_buff *skb) {
return flow_monitor(skb);
}

Expand Down
10 changes: 5 additions & 5 deletions bpf/flows_sock.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,22 +179,22 @@ static __always_inline bool same_ip(const u8 *ip1, const u8 *ip2) {
}

SEC("socket/http_filter")
int socket__http_filter(struct __sk_buff *skb) {
int beyla_socket__http_filter(struct __sk_buff *skb) {
// If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling != 0 && (bpf_get_prandom_u32() % sampling) != 0) {
return TC_ACT_OK;
return TC_ACT_UNSPEC;
}

u16 flags = 0;
flow_id id;
__builtin_memset(&id, 0, sizeof(id));
if (!read_sk_buff(skb, &id, &flags)) {
return TC_ACT_OK;
return TC_ACT_UNSPEC;
}

// ignore traffic that's not egress or ingress
if (same_ip(id.src_ip.s6_addr, id.dst_ip.s6_addr)) {
return TC_ACT_OK;
return TC_ACT_UNSPEC;
}

u64 current_time = bpf_ktime_get_ns();
Expand Down Expand Up @@ -296,7 +296,7 @@ int socket__http_filter(struct __sk_buff *skb) {
if (flags & FIN_FLAG || flags & RST_FLAG) {
bpf_map_delete_elem(&flow_directions, &id);
}
return TC_ACT_OK;
return TC_ACT_UNSPEC;
}

// Force emitting structs into the ELF for automatic creation of Golang struct
Expand Down
32 changes: 16 additions & 16 deletions bpf/go_grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ struct {
49 // 1 + 1 + 8 + 1 +~ 38 = type byte + hpack_len_as_byte("traceparent") + strlen(hpack("traceparent")) + len_as_byte(38) + hpack(generated tracepanent id)

SEC("uprobe/server_handleStream")
int uprobe_server_handleStream(struct pt_regs *ctx) {
int beyla_uprobe_server_handleStream(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/server_handleStream === ");
void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_dbg_printk("goroutine_addr %lx", goroutine_addr);
Expand Down Expand Up @@ -130,7 +130,7 @@ int uprobe_server_handleStream(struct pt_regs *ctx) {

// Handles finding the connection information for http2 servers in grpc
SEC("uprobe/http2Server_operateHeaders")
int uprobe_http2Server_operateHeaders(struct pt_regs *ctx) {
int beyla_uprobe_http2Server_operateHeaders(struct pt_regs *ctx) {
void *goroutine_addr = GOROUTINE_PTR(ctx);
void *tr = GO_PARAM1(ctx);
void *frame = GO_PARAM2(ctx);
Expand Down Expand Up @@ -167,7 +167,7 @@ int uprobe_http2Server_operateHeaders(struct pt_regs *ctx) {

// Handles finding the connection information for grpc ServeHTTP
SEC("uprobe/serverHandlerTransport_HandleStreams")
int uprobe_server_handler_transport_handle_streams(struct pt_regs *ctx) {
int beyla_uprobe_server_handler_transport_handle_streams(struct pt_regs *ctx) {
void *tr = GO_PARAM1(ctx);
void *goroutine_addr = GOROUTINE_PTR(ctx);
bpf_printk("=== uprobe/serverHandlerTransport_HandleStreams tr %llx goroutine %lx === ",
Expand Down Expand Up @@ -198,7 +198,7 @@ int uprobe_server_handler_transport_handle_streams(struct pt_regs *ctx) {
}

SEC("uprobe/server_handleStream")
int uprobe_server_handleStream_return(struct pt_regs *ctx) {
int beyla_uprobe_server_handleStream_return(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/server_handleStream return === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
Expand Down Expand Up @@ -300,7 +300,7 @@ int uprobe_server_handleStream_return(struct pt_regs *ctx) {
}

SEC("uprobe/transport_writeStatus")
int uprobe_transport_writeStatus(struct pt_regs *ctx) {
int beyla_uprobe_transport_writeStatus(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/transport_writeStatus === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
Expand Down Expand Up @@ -373,7 +373,7 @@ static __always_inline void clientConnStart(
}

SEC("uprobe/ClientConn_Invoke")
int uprobe_ClientConn_Invoke(struct pt_regs *ctx) {
int beyla_uprobe_ClientConn_Invoke(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc ClientConn.Invoke === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
Expand All @@ -391,7 +391,7 @@ int uprobe_ClientConn_Invoke(struct pt_regs *ctx) {

// Same as ClientConn_Invoke, registers for the method are offset by one
SEC("uprobe/ClientConn_NewStream")
int uprobe_ClientConn_NewStream(struct pt_regs *ctx) {
int beyla_uprobe_ClientConn_NewStream(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc ClientConn.NewStream === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
Expand Down Expand Up @@ -475,7 +475,7 @@ static __always_inline int grpc_connect_done(struct pt_regs *ctx, void *err) {

// Same as ClientConn_Invoke, registers for the method are offset by one
SEC("uprobe/ClientConn_NewStream")
int uprobe_ClientConn_NewStream_return(struct pt_regs *ctx) {
int beyla_uprobe_ClientConn_NewStream_return(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc ClientConn.NewStream return === ");

void *stream = GO_PARAM1(ctx);
Expand All @@ -488,7 +488,7 @@ int uprobe_ClientConn_NewStream_return(struct pt_regs *ctx) {
}

SEC("uprobe/ClientConn_Close")
int uprobe_ClientConn_Close(struct pt_regs *ctx) {
int beyla_uprobe_ClientConn_Close(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc ClientConn.Close === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
Expand All @@ -502,7 +502,7 @@ int uprobe_ClientConn_Close(struct pt_regs *ctx) {
}

SEC("uprobe/ClientConn_Invoke")
int uprobe_ClientConn_Invoke_return(struct pt_regs *ctx) {
int beyla_uprobe_ClientConn_Invoke_return(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc ClientConn.Invoke return === ");

void *err = GO_PARAM1(ctx);
Expand All @@ -516,7 +516,7 @@ int uprobe_ClientConn_Invoke_return(struct pt_regs *ctx) {

// google.golang.org/grpc.(*clientStream).RecvMsg
SEC("uprobe/clientStream_RecvMsg")
int uprobe_clientStream_RecvMsg_return(struct pt_regs *ctx) {
int beyla_uprobe_clientStream_RecvMsg_return(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc clientStream.RecvMsg return === ");
void *err = (void *)GO_PARAM1(ctx);
return grpc_connect_done(ctx, err);
Expand All @@ -525,7 +525,7 @@ int uprobe_clientStream_RecvMsg_return(struct pt_regs *ctx) {
// The gRPC client stream is written on another goroutine in transport loopyWriter (controlbuf.go).
// We extract the stream ID when it's just created and make a mapping of it to our goroutine that's executing ClientConn.Invoke.
SEC("uprobe/transport_http2Client_NewStream")
int uprobe_transport_http2Client_NewStream(struct pt_regs *ctx) {
int beyla_uprobe_transport_http2Client_NewStream(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc transport.(*http2Client).NewStream === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
Expand Down Expand Up @@ -619,7 +619,7 @@ struct {
} grpc_framer_invocation_map SEC(".maps");

SEC("uprobe/grpcFramerWriteHeaders")
int uprobe_grpcFramerWriteHeaders(struct pt_regs *ctx) {
int beyla_uprobe_grpcFramerWriteHeaders(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc Framer writeHeaders === ");

void *framer = GO_PARAM1(ctx);
Expand Down Expand Up @@ -682,7 +682,7 @@ int uprobe_grpcFramerWriteHeaders(struct pt_regs *ctx) {
}
#else
SEC("uprobe/grpcFramerWriteHeaders")
int uprobe_grpcFramerWriteHeaders(struct pt_regs *ctx) {
int beyla_uprobe_grpcFramerWriteHeaders(struct pt_regs *ctx) {
return 0;
}
#endif
Expand All @@ -692,7 +692,7 @@ int uprobe_grpcFramerWriteHeaders(struct pt_regs *ctx) {
66 // 1 + 1 + 8 + 1 + 55 = type byte + hpack_len_as_byte("traceparent") + strlen(hpack("traceparent")) + len_as_byte(55) + generated traceparent id

SEC("uprobe/grpcFramerWriteHeaders_returns")
int uprobe_grpcFramerWriteHeaders_returns(struct pt_regs *ctx) {
int beyla_uprobe_grpcFramerWriteHeaders_returns(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/proc grpc Framer writeHeaders returns === ");

void *goroutine_addr = GOROUTINE_PTR(ctx);
Expand Down Expand Up @@ -807,7 +807,7 @@ int uprobe_grpcFramerWriteHeaders_returns(struct pt_regs *ctx) {
}
#else
SEC("uprobe/grpcFramerWriteHeaders_returns")
int uprobe_grpcFramerWriteHeaders_returns(struct pt_regs *ctx) {
int beyla_uprobe_grpcFramerWriteHeaders_returns(struct pt_regs *ctx) {
return 0;
}
#endif
18 changes: 9 additions & 9 deletions bpf/go_kafka_go.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ struct {

// Code for the produce messages path
SEC("uprobe/writer_write_messages")
int uprobe_writer_write_messages(struct pt_regs *ctx) {
int beyla_uprobe_writer_write_messages(struct pt_regs *ctx) {
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
void *w_ptr = (void *)GO_PARAM1(ctx);
bpf_dbg_printk(
Expand All @@ -81,7 +81,7 @@ int uprobe_writer_write_messages(struct pt_regs *ctx) {
}

SEC("uprobe/writer_produce")
int uprobe_writer_produce(struct pt_regs *ctx) {
int beyla_uprobe_writer_produce(struct pt_regs *ctx) {
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
bpf_dbg_printk("=== uprobe/kafka-go writer_produce %llx === ", goroutine_addr);
go_addr_key_t g_key = {};
Expand Down Expand Up @@ -129,7 +129,7 @@ int uprobe_writer_produce(struct pt_regs *ctx) {
}

SEC("uprobe/client_roundTrip")
int uprobe_client_roundTrip(struct pt_regs *ctx) {
int beyla_uprobe_client_roundTrip(struct pt_regs *ctx) {
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
bpf_dbg_printk("=== uprobe/kafka-go client_roundTrip %llx === ", goroutine_addr);
go_addr_key_t g_key = {};
Expand All @@ -154,7 +154,7 @@ int uprobe_client_roundTrip(struct pt_regs *ctx) {
}

SEC("uprobe/protocol_RoundTrip")
int uprobe_protocol_roundtrip(struct pt_regs *ctx) {
int beyla_uprobe_protocol_roundtrip(struct pt_regs *ctx) {
bpf_dbg_printk("=== uprobe/kafka-go protocol_RoundTrip === ");
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
void *rw_ptr = (void *)GO_PARAM2(ctx);
Expand Down Expand Up @@ -187,7 +187,7 @@ int uprobe_protocol_roundtrip(struct pt_regs *ctx) {
}

SEC("uprobe/protocol_RoundTrip_ret")
int uprobe_protocol_roundtrip_ret(struct pt_regs *ctx) {
int beyla_uprobe_protocol_roundtrip_ret(struct pt_regs *ctx) {
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
bpf_dbg_printk("=== uprobe/protocol_RoundTrip ret %llx === ", goroutine_addr);
go_addr_key_t g_key = {};
Expand Down Expand Up @@ -241,7 +241,7 @@ int uprobe_protocol_roundtrip_ret(struct pt_regs *ctx) {

// Code for the fetch messages path
SEC("uprobe/reader_read")
int uprobe_reader_read(struct pt_regs *ctx) {
int beyla_uprobe_reader_read(struct pt_regs *ctx) {
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
void *r_ptr = (void *)GO_PARAM1(ctx);
void *conn = (void *)GO_PARAM5(ctx);
Expand Down Expand Up @@ -287,7 +287,7 @@ int uprobe_reader_read(struct pt_regs *ctx) {
}

SEC("uprobe/reader_send_message")
int uprobe_reader_send_message(struct pt_regs *ctx) {
int beyla_uprobe_reader_send_message(struct pt_regs *ctx) {
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
bpf_dbg_printk("=== uprobe/kafka-go reader_send_message %llx === ", goroutine_addr);
go_addr_key_t g_key = {};
Expand All @@ -304,7 +304,7 @@ int uprobe_reader_send_message(struct pt_regs *ctx) {
}

SEC("uprobe/reader_read")
int uprobe_reader_read_ret(struct pt_regs *ctx) {
int beyla_uprobe_reader_read_ret(struct pt_regs *ctx) {
void *goroutine_addr = (void *)GOROUTINE_PTR(ctx);
bpf_dbg_printk("=== uprobe/kafka-go reader_read ret %llx === ", goroutine_addr);
go_addr_key_t g_key = {};
Expand All @@ -330,4 +330,4 @@ int uprobe_reader_read_ret(struct pt_regs *ctx) {
bpf_map_delete_elem(&fetch_requests, &g_key);

return 0;
}
}
Loading
Loading