Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@

/* Do flow filtering. Is optional. */
#include "flows_filter.h"

/*
* Defines a network events monitoring tracker,
* which runs inside flow_monitor. Is optional.
*/
#include "network_events_monitoring.h"

/*
* Defines packets translation tracker
*/
Expand All @@ -57,6 +59,11 @@
*/
#include "ipsec.h"

/*
* Defines ktls tracker
*/
#include "ktls_tracker.h"

// return 0 on success, 1 if capacity reached
static __always_inline int add_observed_intf(flow_metrics *value, pkt_info *pkt, u32 if_index,
u8 direction) {
Expand Down
169 changes: 169 additions & 0 deletions bpf/ktls_tracker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
kTLS tracker
*/
#include "utils.h"

#ifndef __KTLS_TRACKER_H__
#define __KTLS_TRACKER_H__

#define MAX_SOCK_OPS_MAP_ENTRIES 65535
/*
 * Key used for the sock_hash map below. It identifies one socket by its
 * address pair, port pair and address family.
 */
struct sock_key {
// Remote address. The IPv4 paths in this file store the ip4in6 prefix followed by the IPv4 address.
u8 remote_ip[IP_MAX_LEN];
// Local address, stored the same way as remote_ip.
u8 local_ip[IP_MAX_LEN];
// Remote port; bpf_sock_ops_ip() fills it with bpf_ntohl(skops->remote_port).
u32 remote_port;
// Local port; bpf_sock_ops_ip() copies skops->local_port unchanged.
u32 local_port;
// Address family (AF_INET is the only family inserted by the code in this file).
u32 family;
};

/*
 * Sockhash map keyed by struct sock_key, holding at most MAX_SOCK_OPS_MAP_ENTRIES entries.
 * Entries are added by bpf_sock_ops_ip() through bpf_sock_hash_update() and
 * looked up by bpf_ktls_redir() through bpf_msg_redirect_hash().
 */
struct {
__uint(type, BPF_MAP_TYPE_SOCKHASH);
__uint(max_entries, MAX_SOCK_OPS_MAP_ENTRIES);
__type(key, struct sock_key);
__type(value, u64);
} sock_hash SEC(".maps");

/*
 * Registers the socket behind a sock_ops event in sock_hash.
 *
 * The key is built from the socket's own point of view: local/remote address
 * (as ip4in6-prefixed IPv4), local port as provided by the context and remote
 * port converted with bpf_ntohl().
 *
 * Only AF_INET sockets are registered. Every other family (AF_INET6 included)
 * is ignored, so that no entry is ever inserted under an all-zero address key.
 *
 * Insertion uses BPF_NOEXIST, so an already-registered key is reported as a
 * failure and left untouched.
 */
static __always_inline void bpf_sock_ops_ip(struct bpf_sock_ops *skops) {
    /* Guard first: the address copy below is only meaningful for IPv4. */
    if (skops->family != AF_INET) {
        return;
    }

    struct sock_key skk = {
        .local_port = skops->local_port,
        .remote_port = bpf_ntohl(skops->remote_port),
        .family = skops->family,
    };

    /* Both addresses are stored as <ip4in6 prefix><IPv4 address>. */
    __builtin_memcpy(skk.remote_ip, ip4in6, sizeof(ip4in6));
    __builtin_memcpy(skk.local_ip, ip4in6, sizeof(ip4in6));
    __builtin_memcpy(skk.remote_ip + sizeof(ip4in6), &skops->remote_ip4,
                     sizeof(skops->remote_ip4));
    __builtin_memcpy(skk.local_ip + sizeof(ip4in6), &skops->local_ip4,
                     sizeof(skops->local_ip4));

    int ret = bpf_sock_hash_update(skops, &sock_hash, &skk, BPF_NOEXIST);
    if (ret) {
        /*
         * NOTE(review): this message is emitted unconditionally, whereas
         * bpf_ktls_redir() gates its bpf_printk on trace_messages. Confirm
         * whether this one should be gated as well.
         */
        bpf_printk("failed to update sock hash op: %d, port %d --> %d\n", skops->op, skk.local_port,
                   skk.remote_port);
    }
}

/*
 * sockops entry point: when a TCP connection reaches the established state,
 * either passively or actively, hand the socket to bpf_sock_ops_ip().
 * All other sock_ops events are ignored. Always returns 0.
 */
SEC("sockops")
int bpf_sockops(struct bpf_sock_ops *skops) {
    const u32 event = skops->op;

    if (event == BPF_SOCK_OPS_PASSIVE_ESTABLISHED_CB ||
        event == BPF_SOCK_OPS_ACTIVE_ESTABLISHED_CB) {
        bpf_sock_ops_ip(skops);
    }

    return 0;
}

static __always_inline int find_update_flow(struct sk_msg_md *msg, int verdict) {
int ret = 0;
flow_id id;

__builtin_memset(&id, 0, sizeof(id));

id.src_port = msg->sk->src_port;
id.dst_port = bpf_ntohs(msg->sk->dst_port);
id.transport_protocol = msg->sk->protocol;
__builtin_memcpy(id.src_ip, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id.dst_ip, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id.src_ip + sizeof(ip4in6), &msg->sk->src_ip4, sizeof(msg->sk->src_ip4));
__builtin_memcpy(id.dst_ip + sizeof(ip4in6), &msg->sk->dst_ip4, sizeof(msg->sk->dst_ip4));

u64 current_ts = bpf_ktime_get_ns();
additional_metrics *aggregate_flow =
(additional_metrics *)bpf_map_lookup_elem(&additional_flow_metrics, &id);
if (aggregate_flow != NULL) {
aggregate_flow->end_mono_time_ts = current_ts;
if (aggregate_flow->start_mono_time_ts == 0) {
aggregate_flow->start_mono_time_ts = current_ts;
}
aggregate_flow->verdict = verdict;
aggregate_flow->tls_msg.family = (u8)msg->family;
aggregate_flow->tls_msg.size = msg->size;
aggregate_flow->tls_msg.local_port = (u16)msg->local_port;
aggregate_flow->tls_msg.remote_port = (u16)bpf_ntohl(msg->remote_port);

unsigned char *p = (unsigned char *)(long)msg->data;
unsigned char *end = (unsigned char *)(long)msg->data_end;
if (p + 5 <= end) {
aggregate_flow->tls_msg.tls_content_type = p[0];
unsigned char *q = p + 5;
if (aggregate_flow->tls_msg.tls_content_type == 22 && q + 1 <= end) {
aggregate_flow->tls_msg.tls_handshake_type = q[0];
} else if (aggregate_flow->tls_msg.tls_content_type == 21 && q + 2 <= end) {
aggregate_flow->tls_msg.tls_alert_level = q[0];
aggregate_flow->tls_msg.tls_alert_desc = q[1];
}
}
ret = bpf_map_update_elem(&additional_flow_metrics, &id, aggregate_flow, BPF_ANY);
} else {
additional_metrics new_flow = {
.start_mono_time_ts = current_ts,
.end_mono_time_ts = current_ts,
.verdict = verdict,
.tls_msg = {
.family = (u8)msg->family,
.size = msg->size,
.local_port = (u16)msg->local_port,
.remote_port = (u16)bpf_ntohl(msg->remote_port),
},
};
unsigned char *p2 = (unsigned char *)(long)msg->data;
unsigned char *end2 = (unsigned char *)(long)msg->data_end;
if (p2 + 5 <= end2) {
new_flow.tls_msg.tls_content_type = p2[0];
unsigned char *q2 = p2 + 5;
if (new_flow.tls_msg.tls_content_type == 22 && q2 + 1 <= end2) {
new_flow.tls_msg.tls_handshake_type = q2[0];
} else if (new_flow.tls_msg.tls_content_type == 21 && q2 + 2 <= end2) {
new_flow.tls_msg.tls_alert_level = q2[0];
new_flow.tls_msg.tls_alert_desc = q2[1];
}
}
ret = bpf_map_update_elem(&additional_flow_metrics, &id, &new_flow, BPF_ANY);
}
return ret;
}

SEC("sk_msg")
int bpf_ktls_redir(struct sk_msg_md *msg) {
struct sock_key skk = {
.local_port = bpf_ntohl(msg->remote_port),
.remote_port = msg->local_port,
.family = msg->family,
};

switch (msg->family) {
case AF_INET:
__builtin_memcpy(skk.remote_ip, ip4in6, sizeof(ip4in6));
__builtin_memcpy(skk.local_ip, ip4in6, sizeof(ip4in6));
__builtin_memcpy(skk.remote_ip + sizeof(ip4in6), &msg->remote_ip4, sizeof(msg->remote_ip4));
__builtin_memcpy(skk.local_ip + sizeof(ip4in6), &msg->local_ip4, sizeof(msg->local_ip4));
break;
case AF_INET6:
return SK_PASS;
}

int verdict = bpf_msg_redirect_hash(msg, &sock_hash, &skk, BPF_F_INGRESS);
int ret = find_update_flow(msg, verdict);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@msherif1234 was the goal of your initial PoC to get the content of msg->data here ?

I'm trying to see how we could read some parts of the packet payload, especially when the traffic is encrypted 👼

Copy link
Contributor

@msherif1234 msherif1234 Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea behind this PoC was triggered by this talk at KubeCon: https://youtu.be/-2FykHaqvlg?si=ArqarppDqd33YH15
This approach avoids using userspace probes.

https://elixir.bootlin.com/linux/v6.17.1/source/include/uapi/linux/bpf.h#L6553 sk_msg_md has a layout similar to the tc or tcp hook contexts, so it looks like you can collect all the L2/L3/L4 header info from there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The key thing here is that this whole approach relies on using kTLS and more advanced OpenSSL libraries. If that is a go, I would like to help with the project; if there is interest, please let me know.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are looking for a user-space approach for SSL/TLS, we can do something like https://github.com/gojue/ecapture/tree/master, which I can help PoC in a separate PR if there is interest.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your feedback @msherif1234

Let's discuss this on the upstream slack 😸

if (ret != 0 && trace_messages) {
bpf_printk("error updating flow %d\n", ret);
}

return SK_PASS;
}

#endif // __KTLS_TRACKER_H__
12 changes: 12 additions & 0 deletions bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ typedef struct additional_metrics_t {
u8 latest_state;
} pkt_drops;
u64 flow_rtt;
u8 verdict;
u8 network_events[MAX_NETWORK_EVENTS][MAX_EVENT_MD];
struct translated_flow_t {
u8 saddr[IP_MAX_LEN];
Expand All @@ -144,6 +145,17 @@ typedef struct additional_metrics_t {
u8 network_events_idx;
bool ipsec_encrypted;
int ipsec_encrypted_ret;
struct tls_msg_t {
// Minimal TLS/kTLS debugging fields
u8 family; // AF_INET/AF_INET6
u8 tls_content_type; // 20=CCS,21=Alert,22=Handshake,23=AppData
u8 tls_handshake_type; // if content_type==22
u8 tls_alert_level; // if content_type==21
u8 tls_alert_desc; // if content_type==21
u16 local_port; // host order
u16 remote_port; // host order
u32 size; // message size
} tls_msg;
} additional_metrics;

// Force emitting enums/structs into the ELF
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func FlowsAgent(cfg *config.Agent) (*Flows, error) {
EnableDNSTracker: cfg.EnableDNSTracking,
DNSTrackerPort: cfg.DNSTrackingPort,
EnableRTT: cfg.EnableRTT,
EnableKTLS: cfg.EnableKTLSTracking,
EnableNetworkEventsMonitoring: cfg.EnableNetworkEventsMonitoring,
NetworkEventsMonitoringGroupID: cfg.NetworkEventsMonitoringGroupID,
EnablePktTranslation: cfg.EnablePktTranslationTracking,
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ type Agent struct {
StaleEntriesEvictTimeout time.Duration `env:"STALE_ENTRIES_EVICT_TIMEOUT" envDefault:"5s"`
// EnablePCA enables Packet Capture Agent (PCA). By default, PCA is off.
EnablePCA bool `env:"ENABLE_PCA" envDefault:"false"`
// EnableKTLSTracking enables tracking of kernel-encrypted packets. By default, it is off.
EnableKTLSTracking bool `env:"ENABLE_KTLS_TRACKING" envDefault:"false"`
// MetricsEnable enables http server to collect ebpf agent metrics, default is false.
MetricsEnable bool `env:"METRICS_ENABLE" envDefault:"false"`
// Metrics verbosity level. From more to less verbose: trace!, debug, info (default).
Expand Down
Loading
Loading