Skip to content

Commit

Permalink
Add support for dynamic gRPC tables (#1530)
Browse files Browse the repository at this point in the history
  • Loading branch information
grcevski authored Jan 15, 2025
1 parent bf091bb commit 5a281d0
Show file tree
Hide file tree
Showing 73 changed files with 1,993 additions and 130 deletions.
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,13 @@ oats-test-kafka: oats-prereq
mkdir -p test/oats/kafka/$(TEST_OUTPUT)/run
cd test/oats/kafka && TESTCASE_TIMEOUT=120s TESTCASE_BASE_PATH=./yaml $(GINKGO) -v -r

.PHONY: oats-test-http
oats-test-http: oats-prereq
mkdir -p test/oats/http/$(TEST_OUTPUT)/run
cd test/oats/http && TESTCASE_BASE_PATH=./yaml $(GINKGO) -v -r

.PHONY: oats-test
oats-test: oats-test-sql oats-test-redis oats-test-kafka
oats-test: oats-test-sql oats-test-redis oats-test-kafka oats-test-http
$(MAKE) itest-coverage-data

.PHONY: oats-test-debug
Expand Down
1 change: 1 addition & 0 deletions bpf/http_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ typedef struct http2_grpc_request {
// with other instrumented processes
pid_info pid;
u8 ssl;
u8 new_conn;
tp_info_t tp;
} http2_grpc_request_t;

Expand Down
9 changes: 6 additions & 3 deletions bpf/k_tracer_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@ static __always_inline void handle_buf_with_args(void *ctx, call_protocol_args_t
bpf_tail_call(ctx, &jump_table, k_tail_protocol_http);
} else if (is_http2_or_grpc(args->small_buf, MIN_HTTP2_SIZE)) {
bpf_dbg_printk("Found HTTP2 or gRPC connection");
u8 is_ssl = args->ssl;
bpf_map_update_elem(&ongoing_http2_connections, &args->pid_conn, &is_ssl, BPF_ANY);
u8 flags = http2_conn_flag_new;
if (args->ssl) {
flags |= http2_conn_flag_ssl;
}
bpf_map_update_elem(&ongoing_http2_connections, &args->pid_conn, &flags, BPF_ANY);
} else {
u8 *h2g = bpf_map_lookup_elem(&ongoing_http2_connections, &args->pid_conn);
if (h2g && *h2g == args->ssl) {
if (h2g && (http2_flag_ssl(*h2g) == args->ssl)) {
bpf_tail_call(ctx, &jump_table, k_tail_protocol_http2);
} else { // large request tracking
http_info_t *info = bpf_map_lookup_elem(&ongoing_http, &args->pid_conn);
Expand Down
20 changes: 19 additions & 1 deletion bpf/protocol_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
#include "protocol_common.h"
#include "ringbuf.h"

// These are bit flags, if you add any use power of 2 values
enum { http2_conn_flag_ssl = WITH_SSL, http2_conn_flag_new = 0x2 };

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, pid_connection_info_t);
__type(value, u8); // ssl or not
__type(value, u8); // flags
__uint(max_entries, MAX_CONCURRENT_REQUESTS);
} ongoing_http2_connections SEC(".maps");

Expand Down Expand Up @@ -62,6 +65,14 @@ static __always_inline grpc_frames_ctx_t *grpc_ctx() {
return bpf_map_lookup_elem(&grpc_frames_ctx_mem, &zero);
}

static __always_inline u8 http2_flag_ssl(u8 flags) {
return flags & http2_conn_flag_ssl;
}

static __always_inline u8 http2_flag_new(u8 flags) {
return flags & http2_conn_flag_new;
}

static __always_inline http2_grpc_request_t *empty_http2_info() {
int zero = 0;
http2_grpc_request_t *value = bpf_map_lookup_elem(&http2_info_mem, &zero);
Expand Down Expand Up @@ -98,6 +109,13 @@ static __always_inline void http2_grpc_start(
h2g_info->pid = meta->pid;
h2g_info->type = meta->type;
}

h2g_info->new_conn = 0;
u8 *h2g = bpf_map_lookup_elem(&ongoing_http2_connections, &s_key->pid_conn);
if (h2g && http2_flag_new(*h2g)) {
h2g_info->new_conn = 1;
}

fixup_connection_info(
&h2g_info->conn_info, h2g_info->type == EVENT_HTTP_CLIENT, orig_dport);
bpf_probe_read(h2g_info->data, KPROBES_HTTP2_BUF_SIZE, u_buf);
Expand Down
15 changes: 10 additions & 5 deletions pkg/internal/ebpf/bhpack/hpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,19 @@ type Decoder struct {
// to fully parse before. Unlike buf, we own this data.
saveBuf bytes.Buffer

firstField bool // processing the first field of the header block
firstField bool // processing the first field of the header block
failedToIndex bool
}

// NewDecoder returns a new decoder with the provided maximum dynamic
// table size. The emitFunc will be called for each valid field
// parsed, in the same goroutine as calls to Write, before Write returns.
func NewDecoder(maxDynamicTableSize uint32, emitFunc func(f HeaderField)) *Decoder {
d := &Decoder{
emit: emitFunc,
emitEnabled: true,
firstField: true,
emit: emitFunc,
emitEnabled: true,
firstField: true,
failedToIndex: false,
}
d.dynTab.table.init()
d.dynTab.allowedMaxSize = maxDynamicTableSize
Expand Down Expand Up @@ -345,7 +347,10 @@ func (d *Decoder) parseFieldIndexed() error {
}
hf, ok := d.at(idx)
d.buf = buf
// If we've failed once to find an index, don't allow us to find
// a value for index that's greater than the last successful one
if !ok {
d.failedToIndex = true
return d.callEmit(HeaderField{Name: "<BAD INDEX>", Value: ""})
}
return d.callEmit(HeaderField{Name: hf.Name, Value: hf.Value})
Expand Down Expand Up @@ -392,7 +397,7 @@ func (d *Decoder) parseFieldLiteral(n uint8, it indexType) error {
}
}
d.buf = buf
if it.indexed() {
if it.indexed() && !d.failedToIndex {
d.dynTab.add(hf)
}
hf.Sensitive = it.sensitive()
Expand Down
7 changes: 4 additions & 3 deletions pkg/internal/ebpf/common/bpf_arm64_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/internal/ebpf/common/bpf_arm64_bpfel.o
Git LFS file not shown
7 changes: 4 additions & 3 deletions pkg/internal/ebpf/common/bpf_x86_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/internal/ebpf/common/bpf_x86_bpfel.o
Git LFS file not shown
38 changes: 25 additions & 13 deletions pkg/internal/ebpf/common/http2grpc_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const (
GRPC
)

const initialHeaderTableSize = 4096

type h2Connection struct {
hdec *bhpack.Decoder
hdecRet *bhpack.Decoder
Expand All @@ -45,13 +47,18 @@ func byteFramer(data []uint8) *http2.Framer {
return fr
}

func getOrInitH2Conn(conn *BPFConnInfo) *h2Connection {
func getOrInitH2Conn(conn *BPFConnInfo, newConn bool) *h2Connection {
v, ok := activeGRPCConnections.Get(*conn)

dynamicTableSize := initialHeaderTableSize
if !newConn {
dynamicTableSize = 0
}

if !ok {
h := h2Connection{
hdec: bhpack.NewDecoder(0, nil),
hdecRet: bhpack.NewDecoder(0, nil),
hdec: bhpack.NewDecoder(uint32(dynamicTableSize), nil),
hdecRet: bhpack.NewDecoder(uint32(dynamicTableSize), nil),
protocol: HTTP2,
}
activeGRPCConnections.Add(*conn, h)
Expand All @@ -64,8 +71,8 @@ func getOrInitH2Conn(conn *BPFConnInfo) *h2Connection {
return &v
}

func protocolIsGRPC(conn *BPFConnInfo) {
h2c := getOrInitH2Conn(conn)
func protocolIsGRPC(conn *BPFConnInfo, newConn bool) {
h2c := getOrInitH2Conn(conn, newConn)
if h2c != nil {
h2c.protocol = GRPC
}
Expand Down Expand Up @@ -102,8 +109,8 @@ func knownFrameKeys(fr *http2.Framer, hf *http2.HeadersFrame) bool {
return known
}

func readMetaFrame(conn *BPFConnInfo, fr *http2.Framer, hf *http2.HeadersFrame) (string, string, string, bool) {
h2c := getOrInitH2Conn(conn)
func readMetaFrame(conn *BPFConnInfo, newConn bool, fr *http2.Framer, hf *http2.HeadersFrame) (string, string, string, bool) {
h2c := getOrInitH2Conn(conn, newConn)

ok := false
method := ""
Expand All @@ -126,7 +133,7 @@ func readMetaFrame(conn *BPFConnInfo, fr *http2.Framer, hf *http2.HeadersFrame)
case "content-type":
contentType = strings.ToLower(hf.Value)
if contentType == "application/grpc" {
protocolIsGRPC(conn)
protocolIsGRPC(conn, newConn)
}
ok = true
}
Expand Down Expand Up @@ -162,8 +169,8 @@ func http2grpcStatus(status int) int {
return 2 // Unknown
}

func readRetMetaFrame(conn *BPFConnInfo, fr *http2.Framer, hf *http2.HeadersFrame) (int, bool, bool) {
h2c := getOrInitH2Conn(conn)
func readRetMetaFrame(conn *BPFConnInfo, newConn bool, fr *http2.Framer, hf *http2.HeadersFrame) (int, bool, bool) {
h2c := getOrInitH2Conn(conn, newConn)

ok := false
status := 0
Expand All @@ -184,7 +191,7 @@ func readRetMetaFrame(conn *BPFConnInfo, fr *http2.Framer, hf *http2.HeadersFram
ok = true
case "grpc-status":
status, _ = strconv.Atoi(hf.Value)
protocolIsGRPC(conn)
protocolIsGRPC(conn, newConn)
grpc = true
ok = true
}
Expand Down Expand Up @@ -276,6 +283,11 @@ func http2FromBuffers(event *BPFHTTP2Info) (request.Span, bool, error) {
status := 0
eventType := HTTP2

newConn := true
if event.NewConn == 0 {
newConn = false
}

for {
f, err := framer.ReadFrame()

Expand All @@ -285,7 +297,7 @@ func http2FromBuffers(event *BPFHTTP2Info) (request.Span, bool, error) {

if ff, ok := f.(*http2.HeadersFrame); ok {
rok := false
method, path, contentType, ok := readMetaFrame((*BPFConnInfo)(&event.ConnInfo), framer, ff)
method, path, contentType, ok := readMetaFrame((*BPFConnInfo)(&event.ConnInfo), newConn, framer, ff)

if path == "" {
path = "*"
Expand All @@ -301,7 +313,7 @@ func http2FromBuffers(event *BPFHTTP2Info) (request.Span, bool, error) {
}

if ff, ok := retF.(*http2.HeadersFrame); ok {
status, grpcInStatus, rok = readRetMetaFrame((*BPFConnInfo)(&event.ConnInfo), retFramer, ff)
status, grpcInStatus, rok = readRetMetaFrame((*BPFConnInfo)(&event.ConnInfo), newConn, retFramer, ff)
break
}
}
Expand Down
85 changes: 81 additions & 4 deletions pkg/internal/ebpf/common/http2grpc_transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,10 @@ func TestHTTP2Parsing(t *testing.T) {

if ff, ok := f.(*http2.HeadersFrame); ok {
connInfo := BPFConnInfo{}
method, path, contentType, _ := readMetaFrame(&connInfo, framer, ff)
assert.Equal(t, method, tt.method)
assert.Equal(t, path, tt.path)
assert.Equal(t, contentType, tt.contentType)
method, path, contentType, _ := readMetaFrame(&connInfo, false, framer, ff)
assert.Equal(t, tt.method, method)
assert.Equal(t, tt.path, path)
assert.Equal(t, tt.contentType, contentType)
}
}
})
Expand Down Expand Up @@ -169,6 +169,74 @@ func TestHTTP2EventsParsing(t *testing.T) {
}
}

func TestDynamicTableUpdates(t *testing.T) {
rinput := []byte{0, 0, 138, 1, 36, 0, 0, 0, 11, 0, 0, 0, 0, 15, 0, 0, 0, 0, 45, 0, 0, 0, 0, 0, 11, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}

tests := []struct {
name string
input []byte
inputLen int
}{
{
name: "Full path, lots of headers",
input: []byte{0, 0, 222, 1, 4, 0, 0, 0, 1, 64, 5, 58, 112, 97, 116, 104, 33, 47, 114, 111, 117, 116, 101, 103, 117, 105, 100, 101, 46, 82, 111, 117, 116, 101, 71, 117, 105, 100, 101, 47, 71, 101, 116, 70, 101, 97, 116, 117, 114, 101, 64, 10, 58, 97, 117, 116, 104, 111, 114, 105, 116, 121, 15, 108, 111, 99, 97, 108, 104, 111, 115, 116, 58, 53, 48, 48, 53, 49, 131, 134, 64, 12, 99, 111, 110, 116, 101, 110, 116, 45, 116, 121, 112, 101, 16, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 103, 114, 112, 99, 64, 2, 116, 101, 8, 116, 114, 97, 105, 108, 101, 114, 115, 64, 20, 103, 114, 112, 99, 45, 97, 99, 99, 101, 112, 116, 45, 101, 110, 99, 111, 100, 105, 110, 103, 23, 105, 100, 101, 110, 116, 105, 116, 121, 44, 32, 100, 101, 102, 108, 97, 116, 101, 44, 32, 103, 122, 105, 112, 64, 10, 117, 115, 101, 114, 45, 97, 103, 101, 110, 116, 48, 103, 114, 112, 99, 45, 112, 121, 116, 104, 111, 110, 47, 49, 46, 54, 57, 46, 48, 32, 103, 114, 112, 99, 45, 99, 47, 52, 52, 46, 50, 46, 48, 32, 40, 108, 105, 110, 117, 120, 59, 32, 99, 104, 116, 116, 112, 50, 41, 0, 0, 4, 8, 0, 0, 0, 0, 1, 0, 0, 0, 5, 0, 0, 22, 0, 1, 0, 0, 0, 1, 0, 0, 0},
inputLen: 1024,
},
{
name: "Full path only",
input: []byte{0, 0, 222, 1, 4, 0, 0, 0, 1, 64, 5, 58, 112, 97, 116, 104, 33, 47, 114, 111, 117, 116, 101, 103, 117, 105, 100, 101, 46, 82, 111, 117, 116, 101, 71, 117, 105, 100, 101, 47, 71, 101, 116, 70, 101, 97, 116, 117, 114, 101, 131},
inputLen: 1024,
},
{
name: "Index encoded",
input: []byte{0, 0, 8, 1, 4, 0, 0, 0, 3, 195, 194, 131, 134, 193, 192, 191, 190, 0, 0, 4, 8, 0, 0, 0, 0, 3, 0, 0, 0, 5, 0, 0, 5, 0, 1, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 4, 8, 0, 0, 0, 0, 0, 0, 0, 0, 84},
inputLen: 1024,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
info := makeBPFHTTP2InfoNewRequest(tt.input, rinput, tt.inputLen)
s, ignore, _ := http2FromBuffers(&info)
assert.False(t, ignore)
assert.Equal(t, "POST", s.Method)
assert.Equal(t, "/routeguide.RouteGuide/GetFeature", s.Path)
})
}

// Now let's break the decoder with pushing unknown indices
unknownIndexInput := []byte{0, 0, 8, 1, 4, 0, 0, 0, 3, 199, 200, 131, 134, 201, 202, 203, 204, 0, 0, 4, 8, 0, 0, 0, 0, 3, 0, 0, 0, 5, 0, 0, 5, 0, 1, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 4, 8, 0, 0, 0, 0, 0, 0, 0, 0, 84}

info := makeBPFHTTP2InfoNewRequest(unknownIndexInput, rinput, 1024)
s, ignore, _ := http2FromBuffers(&info)
assert.False(t, ignore)
assert.Equal(t, "POST", s.Method)
assert.Equal(t, "*", s.Path)

nextIndex := 8 + 61 // 61 is the static table index size, 7 is how many entries we store in the dynamic table with that first request

// Now let's send new path
newPathInput := []byte{0, 0, 222, 1, 4, 0, 0, 0, 1, 64, 5, 58, 112, 97, 116, 104, 33, 47, 112, 111, 117, 116, 101, 103, 117, 105, 100, 101, 46, 82, 111, 117, 116, 101, 71, 117, 105, 100, 101, 47, 71, 101, 116, 70, 101, 97, 116, 117, 114, 101, 64, 10, 58, 97, 117, 116, 104, 111, 114, 105, 116, 121, 15, 108, 111, 99, 97, 108, 104, 111, 115, 116, 58, 53, 48, 48, 53, 49, 131, 134, 64, 12, 99, 111, 110, 116, 101, 110, 116, 45, 116, 121, 112, 101, 16, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 103, 114, 112, 99, 64, 2, 116, 101, 8, 116, 114, 97, 105, 108, 101, 114, 115, 64, 20, 103, 114, 112, 99, 45, 97, 99, 99, 101, 112, 116, 45, 101, 110, 99, 111, 100, 105, 110, 103, 23, 105, 100, 101, 110, 116, 105, 116, 121, 44, 32, 100, 101, 102, 108, 97, 116, 101, 44, 32, 103, 122, 105, 112, 64, 10, 117, 115, 101, 114, 45, 97, 103, 101, 110, 116, 48, 103, 114, 112, 99, 45, 112, 121, 116, 104, 111, 110, 47, 49, 46, 54, 57, 46, 48, 32, 103, 114, 112, 99, 45, 99, 47, 52, 52, 46, 50, 46, 48, 32, 40, 108, 105, 110, 117, 120, 59, 32, 99, 104, 116, 116, 112, 50, 41, 0, 0, 4, 8, 0, 0, 0, 0, 1, 0, 0, 0, 5, 0, 0, 22, 0, 1, 0, 0, 0, 1, 0, 0, 0}

// We'll be able to decode this correctly, even with broken decoder, beause the values are sent as text
info = makeBPFHTTP2InfoNewRequest(newPathInput, rinput, 1024)
s, ignore, _ = http2FromBuffers(&info)
assert.False(t, ignore)
assert.Equal(t, "POST", s.Method)
assert.Equal(t, "/pouteguide.RouteGuide/GetFeature", s.Path) // this value is the same I just changed the first character from r to p

// indexed version of newPathInput
// if we cached a new pair nextIndex + 128 is the high bit encoded next index which should be in the dynamic table
// however we mark the decoder as invalid and it shouldn't resolve to anything for :path
indexedNewPath := []byte{0, 0, 8, 1, 4, 0, 0, 0, 3, 195, 194, 131, 134, 193, 192, 191, byte(nextIndex + 128), 0, 0, 4, 8, 0, 0, 0, 0, 3, 0, 0, 0, 5, 0, 0, 5, 0, 1, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 4, 8, 0, 0, 0, 0, 0, 0, 0, 0, 84}

info = makeBPFHTTP2InfoNewRequest(indexedNewPath, rinput, 1024)
s, ignore, _ = http2FromBuffers(&info)
assert.False(t, ignore)
assert.Equal(t, "POST", s.Method)
assert.Equal(t, "*", s.Path) // this value is the same I just changed the first character from r to p
}

func makeBPFHTTP2Info(buf, rbuf []byte, len int) BPFHTTP2Info {
var info BPFHTTP2Info
copy(info.Data[:], buf)
Expand All @@ -177,3 +245,12 @@ func makeBPFHTTP2Info(buf, rbuf []byte, len int) BPFHTTP2Info {

return info
}

func makeBPFHTTP2InfoNewRequest(buf, rbuf []byte, len int) BPFHTTP2Info {
info := makeBPFHTTP2Info(buf, rbuf, len)
info.ConnInfo.D_port = 1
info.ConnInfo.S_port = 1
info.NewConn = 1

return info
}
7 changes: 4 additions & 3 deletions pkg/internal/ebpf/generictracer/bpf_arm64_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 5a281d0

Please sign in to comment.