Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Light refactoring of the TC attachment code #1466

Merged
merged 5 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/internal/ebpf/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ var IntegrityModeOverride = false

var ActiveNamespaces = make(map[uint32]uint32)

// These represent unique traffic control (tc) handles to be supplied as
// arguments to RegisterIngress, RegisterEgress or RegisterTC. They all start
// from the 0xb310 offset in simplistic attempt to avoid collisions with
// 3rdparty handles.
const (
NetollyTCHandle = 0xb310 + iota
HTTPTracerTCHandle
TCTracerTCHandle
)

// ProbeDesc holds the information of the instrumentation points of a given
// function/symbol
type ProbeDesc struct {
Expand Down
111 changes: 60 additions & 51 deletions pkg/internal/ebpf/common/tc_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io/fs"
"log/slog"
"time"

"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
Expand All @@ -21,10 +22,7 @@ type TCLinks struct {
IngressFilter *netlink.BpfFilter
}

func WatchAndRegisterTC(ctx context.Context, channelBufferLen int, register func(iface ifaces.Interface), log *slog.Logger) {
informer := ifaces.NewWatcher(channelBufferLen)
registerer := ifaces.NewRegisterer(informer, channelBufferLen)

func StartTCMonitorLoop(ctx context.Context, registerer *ifaces.Registerer, register func(iface ifaces.Interface), log *slog.Logger) {
log.Debug("subscribing for network interface events")
ifaceEvents, err := registerer.Subscribe(ctx)
if err != nil {
Expand Down Expand Up @@ -54,10 +52,25 @@ func WatchAndRegisterTC(ctx context.Context, channelBufferLen int, register func
}()
}

func RegisterTC(iface ifaces.Interface, egressFD, ingressFD int, log *slog.Logger) *TCLinks {
links := TCLinks{}
// Convenience function
func WatchAndRegisterTC(ctx context.Context, channelBufferLen int, register func(iface ifaces.Interface), log *slog.Logger) {
log.Debug("listening for new interfaces: use watching")

informer := ifaces.NewWatcher(channelBufferLen)
registerer := ifaces.NewRegisterer(informer, channelBufferLen)
StartTCMonitorLoop(ctx, registerer, register, log)
}

// Convenience function
func PollAndRegisterTC(ctx context.Context, channelBufferLen int, register func(iface ifaces.Interface), period time.Duration, log *slog.Logger) {
log.Debug("listening for new interfaces: use polling", "period", period)

// Load pre-compiled programs and maps into the kernel, and rewrites the configuration
informer := ifaces.NewPoller(period, channelBufferLen)
registerer := ifaces.NewRegisterer(informer, channelBufferLen)
StartTCMonitorLoop(ctx, registerer, register, log)
}

func GetClsactQdisc(iface ifaces.Interface, log *slog.Logger) *netlink.GenericQdisc {
ipvlan, err := netlink.LinkByIndex(iface.Index)
if err != nil {
log.Error("failed to lookup ipvlan device", "index", iface.Index, "name", iface.Name, "error", err)
Expand All @@ -80,15 +93,29 @@ func RegisterTC(iface ifaces.Interface, egressFD, ingressFD int, log *slog.Logge
return nil
}
}
links.Qdisc = qdisc

egressFilter, err := registerEgress(ipvlan, egressFD)
return qdisc
}

func RegisterTC(iface ifaces.Interface, egressFD int, egressHandle uint32, egressName string,
ingressFD int, ingressHandle uint32, ingressName string, log *slog.Logger) *TCLinks {
links := TCLinks{
Qdisc: GetClsactQdisc(iface, log),
}

if links.Qdisc == nil {
return nil
}

linkIndex := links.Qdisc.QdiscAttrs.LinkIndex

egressFilter, err := RegisterEgress(linkIndex, egressFD, egressHandle, egressName)
if err != nil {
log.Error("failed to install egress filters", "error", err)
}
links.EgressFilter = egressFilter

ingressFilter, err := registerIngress(ipvlan, ingressFD)
ingressFilter, err := RegisterIngress(linkIndex, ingressFD, ingressHandle, ingressName)
if err != nil {
log.Error("failed to install ingres filters", "error", err)
}
Expand All @@ -97,62 +124,44 @@ func RegisterTC(iface ifaces.Interface, egressFD, ingressFD int, log *slog.Logge
return &links
}

func registerEgress(ipvlan netlink.Link, egressFD int) (*netlink.BpfFilter, error) {
// Fetch events on egress
egressAttrs := netlink.FilterAttrs{
LinkIndex: ipvlan.Attrs().Index,
Parent: netlink.HANDLE_MIN_EGRESS,
Handle: netlink.MakeHandle(0, 1),
Protocol: unix.ETH_P_ALL,
Priority: 1,
}
egressFilter := &netlink.BpfFilter{
FilterAttrs: egressAttrs,
Fd: egressFD,
Name: "tc/tc_http_egress",
DirectAction: true,
}
if err := netlink.FilterDel(egressFilter); err == nil {
log.Warn("egress filter already existed. Deleted it")
}
if err := netlink.FilterAdd(egressFilter); err != nil {
if errors.Is(err, fs.ErrExist) {
log.Warn("egress filter already exists. Ignoring", "error", err)
} else {
return nil, fmt.Errorf("failed to create egress filter: %w", err)
}
}
func RegisterEgress(linkIndex int, egressFD int, handle uint32, name string) (*netlink.BpfFilter, error) {
return registerFilter(linkIndex, egressFD, handle, netlink.HANDLE_MIN_EGRESS, name)
}

return egressFilter, nil
func RegisterIngress(linkIndex int, ingressFD int, handle uint32, name string) (*netlink.BpfFilter, error) {
return registerFilter(linkIndex, ingressFD, handle, netlink.HANDLE_MIN_INGRESS, name)
}

func registerIngress(ipvlan netlink.Link, ingressFD int) (*netlink.BpfFilter, error) {
func registerFilter(linkIndex int, fd int, handle uint32, parent uint32, name string) (*netlink.BpfFilter, error) {
// Fetch events on ingress
ingressAttrs := netlink.FilterAttrs{
LinkIndex: ipvlan.Attrs().Index,
Parent: netlink.HANDLE_MIN_INGRESS,
Handle: netlink.MakeHandle(0, 1),
attrs := netlink.FilterAttrs{
LinkIndex: linkIndex,
Parent: parent,
Handle: handle,
Protocol: unix.ETH_P_ALL,
Priority: 1,
}
ingressFilter := &netlink.BpfFilter{
FilterAttrs: ingressAttrs,
Fd: ingressFD,
Name: "tc/tc_http_ingress",

filter := &netlink.BpfFilter{
FilterAttrs: attrs,
Fd: fd,
Name: name,
DirectAction: true,
}
if err := netlink.FilterDel(ingressFilter); err == nil {
log.Warn("ingress filter already existed. Deleted it")

if err := netlink.FilterDel(filter); err == nil {
log.Warn("filter already existed. Deleted it", "filter", name, "iface", linkIndex)
}
if err := netlink.FilterAdd(ingressFilter); err != nil {

if err := netlink.FilterAdd(filter); err != nil {
if errors.Is(err, fs.ErrExist) {
log.Warn("ingress filter already exists. Ignoring", "error", err)
log.Warn("filter already exists. Ignoring", "error", err)
} else {
return nil, fmt.Errorf("failed to create ingress filter: %w", err)
return nil, fmt.Errorf("failed to create filter: %w", err)
}
}

return ingressFilter, nil
return filter, nil
}

// doIgnoreNoDev runs the provided syscall over the provided device and ignores the error
Expand Down
6 changes: 5 additions & 1 deletion pkg/internal/ebpf/httptracer/httptracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,11 @@ func (p *Tracer) Run(ctx context.Context, _ chan<- []request.Span) {
}

func (p *Tracer) registerTC(iface ifaces.Interface) {
links := ebpfcommon.RegisterTC(iface, p.bpfObjects.BeylaTcHttpEgress.FD(), p.bpfObjects.BeylaTcHttpIngress.FD(), p.log)
links := ebpfcommon.RegisterTC(iface,
p.bpfObjects.BeylaTcHttpEgress.FD(), ebpfcommon.HTTPTracerTCHandle, "tc/tc_http_egress",
p.bpfObjects.BeylaTcHttpIngress.FD(), ebpfcommon.HTTPTracerTCHandle, "tc/tc_http_ingress",
p.log)

if links == nil {
return
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/internal/ebpf/tctracer/tctracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,11 @@ func (p *Tracer) Run(ctx context.Context, _ chan<- []request.Span) {
}

func (p *Tracer) registerTC(iface ifaces.Interface) {
links := ebpfcommon.RegisterTC(iface, p.bpfObjects.BeylaAppEgress.FD(), p.bpfObjects.BeylaAppIngress.FD(), p.log)
links := ebpfcommon.RegisterTC(iface,
p.bpfObjects.BeylaAppEgress.FD(), ebpfcommon.TCTracerTCHandle, "tc/tc_egress",
p.bpfObjects.BeylaAppIngress.FD(), ebpfcommon.TCTracerTCHandle, "tc/tc_ingress",
p.log)

if links == nil {
return
}
Expand Down
32 changes: 4 additions & 28 deletions pkg/internal/netolly/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cilium/ebpf/ringbuf"

"github.com/grafana/beyla/pkg/beyla"
ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common"
"github.com/grafana/beyla/pkg/internal/netolly/ebpf"
"github.com/grafana/beyla/pkg/internal/netolly/flow"
"github.com/grafana/beyla/pkg/internal/netolly/ifaces"
Expand Down Expand Up @@ -89,7 +90,7 @@ type Flows struct {
ctxInfo *global.ContextInfo

// input data providers
interfaces ifaces.Informer
registerer *ifaces.Registerer
filter interfaceFilter
ebpf ebpfFlowFetcher

Expand Down Expand Up @@ -193,7 +194,7 @@ func flowsAgent(
return &Flows{
ctxInfo: ctxInfo,
ebpf: fetcher,
interfaces: registerer,
registerer: registerer,
filter: filter,
cfg: cfg,
mapTracer: mapTracer,
Expand Down Expand Up @@ -259,32 +260,7 @@ func (f *Flows) Status() Status {
func (f *Flows) interfacesManager(ctx context.Context) error {
slog := alog().With("function", "interfacesManager")

slog.Debug("subscribing for network interface events")
ifaceEvents, err := f.interfaces.Subscribe(ctx)
if err != nil {
return fmt.Errorf("instantiating interfaces' informer: %w", err)
}

go func() {
for {
select {
case <-ctx.Done():
slog.Debug("stopping interfaces' listener")
return
case event := <-ifaceEvents:
slog.Debug("received event", "event", event)
switch event.Type {
case ifaces.EventAdded:
f.onInterfaceAdded(event.Interface)
case ifaces.EventDeleted:
// qdiscs, ingress and egress filters are automatically deleted so we don't need to
// specifically detach them from the ebpfFetcher
default:
slog.Warn("unknown event type", "event", event)
}
}
}
}()
ebpfcommon.StartTCMonitorLoop(ctx, f.registerer, f.onInterfaceAdded, slog)

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/netolly/agent/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestFilter(t *testing.T) {
},
}},
},
interfaces: fakeInterfacesInformer{},
registerer: ifaces.NewRegisterer(fakeInterfacesInformer{}, 10),
interfaceNamer: func(_ int) string { return "fakeiface" },
}

Expand Down
Loading
Loading