diff --git a/pkg/internal/ebpf/common/common.go b/pkg/internal/ebpf/common/common.go index c8d22d987..ad2203ba8 100644 --- a/pkg/internal/ebpf/common/common.go +++ b/pkg/internal/ebpf/common/common.go @@ -43,16 +43,6 @@ var IntegrityModeOverride = false var ActiveNamespaces = make(map[uint32]uint32) -// These represent unique traffic control (tc) handles to be supplied as -// arguments to RegisterIngress, RegisterEgress or RegisterTC. They all start -// from the 0xb310 offset in simplistic attempt to avoid collisions with -// 3rdparty handles. -const ( - NetollyTCHandle = 0xb310 + iota - HTTPTracerTCHandle - TCTracerTCHandle -) - // ProbeDesc holds the information of the instrumentation points of a given // function/symbol type ProbeDesc struct { diff --git a/pkg/internal/ebpf/common/tc_linux.go b/pkg/internal/ebpf/common/tc_linux.go deleted file mode 100644 index 0ffa0379c..000000000 --- a/pkg/internal/ebpf/common/tc_linux.go +++ /dev/null @@ -1,244 +0,0 @@ -//go:build linux - -package ebpfcommon - -import ( - "context" - "errors" - "fmt" - "io/fs" - "log/slog" - "time" - - "github.com/vishvananda/netlink" - "golang.org/x/sys/unix" - - "github.com/grafana/beyla/pkg/internal/netolly/ifaces" -) - -type TCLinks struct { - Qdisc *netlink.GenericQdisc - EgressFilter *netlink.BpfFilter - IngressFilter *netlink.BpfFilter -} - -func StartTCMonitorLoop(ctx context.Context, registerer *ifaces.Registerer, register func(iface ifaces.Interface), log *slog.Logger) { - log.Debug("subscribing for network interface events") - ifaceEvents, err := registerer.Subscribe(ctx) - if err != nil { - log.Error("instantiating interfaces' informer", "error", err) - return - } - - go func() { - for { - select { - case <-ctx.Done(): - slog.Debug("stopping interfaces' listener") - return - case event := <-ifaceEvents: - slog.Debug("received event", "event", event) - switch event.Type { - case ifaces.EventAdded: - register(event.Interface) - case ifaces.EventDeleted: - // qdiscs, ingress and egress filters are automatically deleted so we don't need to - // specifically detach them from the ebpfFetcher - default: - slog.Warn("unknown event type", "event", event) - } - } - } - }() -} - -// Convenience function -func WatchAndRegisterTC(ctx context.Context, channelBufferLen int, register func(iface ifaces.Interface), log *slog.Logger) { - log.Debug("listening for new interfaces: use watching") - - informer := ifaces.NewWatcher(channelBufferLen) - registerer := ifaces.NewRegisterer(informer, channelBufferLen) - StartTCMonitorLoop(ctx, registerer, register, log) -} - -// Convenience function -func PollAndRegisterTC(ctx context.Context, channelBufferLen int, register func(iface ifaces.Interface), period time.Duration, log *slog.Logger) { - log.Debug("listening for new interfaces: use polling", "period", period) - - informer := ifaces.NewPoller(period, channelBufferLen) - registerer := ifaces.NewRegisterer(informer, channelBufferLen) - StartTCMonitorLoop(ctx, registerer, register, log) -} - -func GetClsactQdisc(iface ifaces.Interface, log *slog.Logger) *netlink.GenericQdisc { - ipvlan, err := netlink.LinkByIndex(iface.Index) - if err != nil { - log.Error("failed to lookup ipvlan device", "index", iface.Index, "name", iface.Name, "error", err) - return nil - } - qdiscAttrs := netlink.QdiscAttrs{ - LinkIndex: ipvlan.Attrs().Index, - Handle: netlink.MakeHandle(0xffff, 0), - Parent: netlink.HANDLE_CLSACT, - } - qdisc := &netlink.GenericQdisc{ - QdiscAttrs: qdiscAttrs, - QdiscType: "clsact", - } - if err := netlink.QdiscAdd(qdisc); err != nil { - if errors.Is(err, fs.ErrExist) { - log.Warn("qdisc clsact already exists. Ignoring", "error", err) - } else { - log.Error("failed to create clsact qdisc on", "index", iface.Index, "name", iface.Name, "error", err) - return nil - } - } - - return qdisc -} - -func RegisterTC(iface ifaces.Interface, egressFD int, egressHandle uint32, egressName string, - ingressFD int, ingressHandle uint32, ingressName string, log *slog.Logger) *TCLinks { - links := TCLinks{ - Qdisc: GetClsactQdisc(iface, log), - } - - if links.Qdisc == nil { - return nil - } - - linkIndex := links.Qdisc.QdiscAttrs.LinkIndex - - egressFilter, err := RegisterEgress(linkIndex, egressFD, egressHandle, egressName) - if err != nil { - log.Error("failed to install egress filters", "error", err) - } - links.EgressFilter = egressFilter - - ingressFilter, err := RegisterIngress(linkIndex, ingressFD, ingressHandle, ingressName) - if err != nil { - log.Error("failed to install ingres filters", "error", err) - } - links.IngressFilter = ingressFilter - - return &links -} - -func RegisterEgress(linkIndex int, egressFD int, handle uint32, name string) (*netlink.BpfFilter, error) { - return registerFilter(linkIndex, egressFD, handle, netlink.HANDLE_MIN_EGRESS, name) -} - -func RegisterIngress(linkIndex int, ingressFD int, handle uint32, name string) (*netlink.BpfFilter, error) { - return registerFilter(linkIndex, ingressFD, handle, netlink.HANDLE_MIN_INGRESS, name) -} - -func registerFilter(linkIndex int, fd int, handle uint32, parent uint32, name string) (*netlink.BpfFilter, error) { - // Fetch events on ingress - attrs := netlink.FilterAttrs{ - LinkIndex: linkIndex, - Parent: parent, - Handle: handle, - Protocol: unix.ETH_P_ALL, - Priority: 1, - } - - filter := &netlink.BpfFilter{ - FilterAttrs: attrs, - Fd: fd, - Name: name, - DirectAction: true, - } - - if err := netlink.FilterDel(filter); err == nil { - log.Warn("filter already existed. Deleted it", "filter", name, "iface", linkIndex) - } - - if err := netlink.FilterAdd(filter); err != nil { - if errors.Is(err, fs.ErrExist) { - log.Warn("filter already exists. Ignoring", "error", err) - } else { - return nil, fmt.Errorf("failed to create filter: %w", err) - } - } - - return filter, nil -} - -// doIgnoreNoDev runs the provided syscall over the provided device and ignores the error -// if the cause is a non-existing device (just logs the error as debug). -// If the agent is deployed as part of the Network Metrics pipeline, normally -// undeploying the FlowCollector could cause the agent to try to remove resources -// from Pods that have been removed immediately before (e.g. flowlogs-pipeline or the -// console plugin), so we avoid logging some errors that would unnecessarily raise the -// user's attention. -// This function uses generics because the set of provided functions accept different argument -// types. -func doIgnoreNoDev[T any](sysCall func(T) error, dev T) error { - if err := sysCall(dev); err != nil { - if errors.Is(err, unix.ENODEV) { - slog.Error("can't delete. Ignore this error if other pods or interfaces "+ - " are also being deleted at this moment. For example, if you are undeploying "+ - " a FlowCollector or Deployment where this agent is part of", - "error", err) - } else { - return err - } - } - return nil -} - -func ifaceHasFilters(iface ifaces.Interface, parent uint32) bool { - ipvlan, err := netlink.LinkByIndex(iface.Index) - - if err != nil { - return true // be conservative assume we have filters if we can't detect them - } - - filters, err := netlink.FilterList(ipvlan, parent) - - if err != nil { - return true // be conservative assume we have filters if we can't detect them - } - - return len(filters) > 0 -} - -func cleanupQdiscs(qdiscs map[ifaces.Interface]*netlink.GenericQdisc) { - for iface, qd := range qdiscs { - hasEgressFilters := ifaceHasFilters(iface, netlink.HANDLE_MIN_EGRESS) - hasIngressFilters := ifaceHasFilters(iface, netlink.HANDLE_MIN_INGRESS) - - if hasEgressFilters || hasIngressFilters { - log.Debug("not deleting Qdisc as it still has children", "interface", iface) - } else { - log.Debug("deleting Qdisc", "interface", iface) - - if err := doIgnoreNoDev(netlink.QdiscDel, netlink.Qdisc(qd)); err != nil { - log.Error("deleting qdisc", "error", err) - } - } - } -} - -func cleanupFilters(filters map[ifaces.Interface]*netlink.BpfFilter, kind string) { - for iface, ef := range filters { - log.Debug(fmt.Sprintf("deleting %s filter", kind), "interface", iface) - if err := doIgnoreNoDev(netlink.FilterDel, netlink.Filter(ef)); err != nil { - log.Error(fmt.Sprintf("deleting %s filter", kind), "error", err) - } - } -} - -func CloseTCLinks(qdiscs map[ifaces.Interface]*netlink.GenericQdisc, - egressFilters map[ifaces.Interface]*netlink.BpfFilter, - ingressFilters map[ifaces.Interface]*netlink.BpfFilter, - log *slog.Logger) { - log.Info("removing traffic control probes") - - cleanupFilters(egressFilters, "egress") - cleanupFilters(ingressFilters, "ingress") - - // make sure this happens only after cleaning up filters, so that we don't - // remove 3rdparty filters - cleanupQdiscs(qdiscs) -} diff --git a/pkg/internal/ebpf/generictracer/generictracer.go b/pkg/internal/ebpf/generictracer/generictracer.go index 4a2a60603..3e296417c 100644 --- a/pkg/internal/ebpf/generictracer/generictracer.go +++ b/pkg/internal/ebpf/generictracer/generictracer.go @@ -481,9 +481,6 @@ func (p *Tracer) AlreadyInstrumentedLib(id uint64) bool { return module != nil } -func (p *Tracer) SetupTC() { -} - func (p *Tracer) Run(ctx context.Context, eventsChan chan<- []request.Span) { // At this point we now have loaded the bpf objects, which means we should insert any // pids that are allowed into the bpf map diff --git a/pkg/internal/ebpf/generictracer/generictracer_notlinux.go b/pkg/internal/ebpf/generictracer/generictracer_notlinux.go index 28d81f78c..5407af386 100644 --- a/pkg/internal/ebpf/generictracer/generictracer_notlinux.go +++ b/pkg/internal/ebpf/generictracer/generictracer_notlinux.go @@ -40,6 +40,5 @@ func (p *Tracer) UnlinkInstrumentedLib(_ uint64) {} func (p *Tracer) AlreadyInstrumentedLib(_ uint64) bool { return false } func (p *Tracer) Run(_ context.Context, _ chan<- []request.Span) {} func (p *Tracer) Constants() map[string]any { return nil } -func (p *Tracer) SetupTC() {} func (p *Tracer) SetupTailCalls() {} func (p *Tracer) RegisterOffsets(_ *exec.FileInfo, _ *goexec.Offsets) {} diff --git a/pkg/internal/ebpf/gotracer/gotracer.go b/pkg/internal/ebpf/gotracer/gotracer.go index 6ba102c5c..af0fc8ffb 100644 --- a/pkg/internal/ebpf/gotracer/gotracer.go +++ b/pkg/internal/ebpf/gotracer/gotracer.go @@ -401,8 +401,6 @@ func (p *Tracer) AlreadyInstrumentedLib(_ uint64) bool { return false } -func (p *Tracer) SetupTC() {} - func (p *Tracer) Run(ctx context.Context, eventsChan chan<- []request.Span) { ebpfcommon.SharedRingbuf( p.cfg, diff --git a/pkg/internal/ebpf/httptracer/httptracer.go b/pkg/internal/ebpf/httptracer/httptracer.go index 83cd4ebae..505e2403c 100644 --- a/pkg/internal/ebpf/httptracer/httptracer.go +++ b/pkg/internal/ebpf/httptracer/httptracer.go @@ -8,13 +8,12 @@ import ( "log/slog" "github.com/cilium/ebpf" - "github.com/vishvananda/netlink" "github.com/grafana/beyla/pkg/beyla" ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" + "github.com/grafana/beyla/pkg/internal/ebpf/tcmanager" "github.com/grafana/beyla/pkg/internal/exec" "github.com/grafana/beyla/pkg/internal/goexec" - "github.com/grafana/beyla/pkg/internal/netolly/ifaces" "github.com/grafana/beyla/pkg/internal/request" "github.com/grafana/beyla/pkg/internal/svc" ) @@ -27,19 +26,15 @@ type Tracer struct { bpfObjects bpfObjects closers []io.Closer log *slog.Logger - qdiscs map[ifaces.Interface]*netlink.GenericQdisc - egressFilters map[ifaces.Interface]*netlink.BpfFilter - ingressFilters map[ifaces.Interface]*netlink.BpfFilter + tcManager tcmanager.TCManager } func New(cfg *beyla.Config) *Tracer { log := slog.With("component", "tc_http.Tracer") + return &Tracer{ log: log, cfg: cfg, - qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{}, - egressFilters: map[ifaces.Interface]*netlink.BpfFilter{}, - ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{}, } } @@ -121,7 +116,11 @@ func (p *Tracer) AlreadyInstrumentedLib(uint64) bool { return false } -func (p *Tracer) SetupTC() { +func (p *Tracer) startTC(ctx context.Context) { + if p.tcManager != nil { + return + } + if !p.cfg.EBPF.UseTCForL7CP { return } @@ -131,41 +130,29 @@ func (p *Tracer) SetupTC() { p.log.Error("cannot enable L7 context-propagation, kernel 5.17 or newer required") } - ebpfcommon.WatchAndRegisterTC(context.Background(), p.cfg.ChannelBufferLen, p.registerTC, p.log) + p.tcManager = tcmanager.NewNetlinkManager() + p.tcManager.AddProgram("tc/tc_http_egress", p.bpfObjects.BeylaTcHttpEgress, tcmanager.AttachmentEgress) + p.tcManager.AddProgram("tc/tc_http_ingress", p.bpfObjects.BeylaTcHttpIngress, tcmanager.AttachmentIngress) + p.tcManager.Start(ctx) } func (p *Tracer) Run(ctx context.Context, _ chan<- []request.Span) { + p.startTC(ctx) + <-ctx.Done() p.bpfObjects.Close() - p.closeTC() + p.stopTC() } -func (p *Tracer) registerTC(iface ifaces.Interface) { - links := ebpfcommon.RegisterTC(iface, - p.bpfObjects.BeylaTcHttpEgress.FD(), ebpfcommon.HTTPTracerTCHandle, "tc/tc_http_egress", - p.bpfObjects.BeylaTcHttpIngress.FD(), ebpfcommon.HTTPTracerTCHandle, "tc/tc_http_ingress", - p.log) - - if links == nil { +func (p *Tracer) stopTC() { + if p.tcManager == nil { return } - p.qdiscs[iface] = links.Qdisc - p.ingressFilters[iface] = links.IngressFilter - p.egressFilters[iface] = links.EgressFilter -} - -func (p *Tracer) closeTC() { p.log.Info("removing traffic control probes") - p.bpfObjects.BeylaTcHttpEgress.Close() - p.bpfObjects.BeylaTcHttpIngress.Close() - - ebpfcommon.CloseTCLinks(p.qdiscs, p.egressFilters, p.ingressFilters, p.log) - - p.egressFilters = map[ifaces.Interface]*netlink.BpfFilter{} - p.ingressFilters = map[ifaces.Interface]*netlink.BpfFilter{} - p.qdiscs = map[ifaces.Interface]*netlink.GenericQdisc{} + p.tcManager.Stop() + p.tcManager = nil } diff --git a/pkg/internal/ebpf/httptracer/httptracer_notlinux.go b/pkg/internal/ebpf/httptracer/httptracer_notlinux.go index 71248c748..1a0ce0159 100644 --- a/pkg/internal/ebpf/httptracer/httptracer_notlinux.go +++ b/pkg/internal/ebpf/httptracer/httptracer_notlinux.go @@ -39,6 +39,5 @@ func (p *Tracer) UnlinkInstrumentedLib(_ uint64) {} func (p *Tracer) AlreadyInstrumentedLib(_ uint64) bool { return false } func (p *Tracer) Run(_ context.Context, _ chan<- []request.Span) {} func (p *Tracer) Constants() map[string]any { return nil } -func (p *Tracer) SetupTC() {} func (p *Tracer) SetupTailCalls() {} func (p *Tracer) RegisterOffsets(_ *exec.FileInfo, _ *goexec.Offsets) {} diff --git a/pkg/internal/ebpf/tcmanager/netlinkmanager.go b/pkg/internal/ebpf/tcmanager/netlinkmanager.go new file mode 100644 index 000000000..367e091c5 --- /dev/null +++ b/pkg/internal/ebpf/tcmanager/netlinkmanager.go @@ -0,0 +1,432 @@ +//go:build linux + +package tcmanager + +import ( + "context" + "errors" + "fmt" + "io/fs" + "log/slog" + "sync/atomic" + "time" + + "github.com/cilium/ebpf" + "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" + + "github.com/grafana/beyla/pkg/internal/netolly/ifaces" +) + +var nextTCHandle = atomic.Uint32{} + +func nextHandle() uint32 { + // handles start at the 0xb310 value + nextTCHandle.CompareAndSwap(0, 0xb310) + return nextTCHandle.Add(1) +} + +type netlinkProg struct { + prog *ebpf.Program + name string + attachType AttachmentType +} + +type netlinkIface struct { + ifaces.Interface + qdisc *netlink.GenericQdisc + filters []*netlink.BpfFilter +} + +type netlinkIfaceMap map[int]*netlinkIface + +type netlinkManager struct { + tcManagerBase + interfaces netlinkIfaceMap + programs []*netlinkProg +} + +func netlinkAttachType(attachment AttachmentType) (uint32, error) { + switch attachment { + case AttachmentEgress: + return netlink.HANDLE_MIN_EGRESS, nil + case AttachmentIngress: + return netlink.HANDLE_MIN_INGRESS, nil + } + + return 0, fmt.Errorf("Invalid attachment type: %d", attachment) +} + +func NewNetlinkManager() TCManager { + return &netlinkManager { + tcManagerBase: newTCManagerBase("tc_manager_netlink"), + interfaces: netlinkIfaceMap{}, + programs: []*netlinkProg{}, + } +} + +func (tc *netlinkManager) Start(ctx context.Context) { + if tc.registerer != nil { + return + } + + var informer ifaces.Informer + + if tc.monitorMode == MonitorPoll { + informer = ifaces.NewPoller(tc.pollPeriod, tc.channelBufferLen) + } else { + informer = ifaces.NewWatcher(tc.channelBufferLen) + } + + registerer := ifaces.NewRegisterer(informer, tc.channelBufferLen) + + ifaceEvents, err := registerer.Subscribe(ctx) + + if err != nil { + tc.log.Error("instantiating interfaces' informer", "error", err) + return + } + + tc.registerer = registerer + + tc.wg.Add(1) + + go func() { + for { + select { + case <-ctx.Done(): + tc.shutdown() + tc.wg.Done() + return + case event := <-ifaceEvents: + tc.log.Debug("received event", "event", event) + switch event.Type { + case ifaces.EventAdded: + tc.onInterfaceAdded(event.Interface) + case ifaces.EventDeleted: + tc.onInterfaceRemoved(event.Interface) + default: + tc.log.Warn("unknown event type", "event", event) + } + } + } + }() +} + +func (tc *netlinkManager) Stop() { + tc.wg.Wait() +} + +func (tc *netlinkManager) shutdown() { + tc.log.Debug("TC initiated shutdown") + + tc.mutex.Lock() + defer tc.mutex.Unlock() + + tc.cleanupInterfacesLocked() + tc.cleanupProgsLocked() + + tc.registerer = nil + + tc.log.Debug("TC completed shutdown") +} + +func (tc *netlinkManager) AddProgram(name string, prog *ebpf.Program, attachment AttachmentType) { + tc.mutex.Lock() + defer tc.mutex.Unlock() + + p := &netlinkProg { + prog: prog, + name: name, + attachType: attachment, + } + + tc.programs = append(tc.programs, p) + tc.attachProgramLocked(p) +} + +func (tc *netlinkManager) RemoveProgram(name string) { + tc.mutex.Lock() + defer tc.mutex.Unlock() + + //FIXME + //delete(tc.programs, name) +} + +func (tc *netlinkManager) attachProgramLocked(prog *netlinkProg) { + for _, iface := range tc.interfaces { + tc.attachProgramToIfaceLocked(prog, iface) + } +} + +func (tc *netlinkManager) attachProgramToIfaceLocked(prog *netlinkProg, iface *netlinkIface) { + if prog == nil { + return + } + + attachType, err := netlinkAttachType(prog.attachType) + + if err != nil { + tc.log.Error("Error attaching program", "error", err) + return + } + + attrs := netlink.FilterAttrs{ + LinkIndex: iface.Index, + Parent: attachType, + Handle: nextHandle(), + Protocol: unix.ETH_P_ALL, + Priority: 1, + } + + filter := &netlink.BpfFilter{ + FilterAttrs: attrs, + Fd: prog.prog.FD(), + Name: prog.name, + DirectAction: true, + } + + if err := netlink.FilterDel(filter); err == nil { + tc.log.Warn("filter already existed. Deleted it", "filter", prog.name, "iface", iface) + } + + if err := netlink.FilterAdd(filter); err != nil { + if errors.Is(err, fs.ErrExist) { + tc.log.Warn("filter already exists. Ignoring", "error", err) + } else { + tc.log.Error("failed to create filter", "error", err) + } + + return + } + + iface.filters = append(iface.filters, filter) +} + +func (tc *netlinkManager) onInterfaceAdded(i ifaces.Interface) { + tc.mutex.Lock() + defer tc.mutex.Unlock() + + if tc.filter != nil && !tc.filter.IsAllowed(i.Name) { + tc.log.Debug("Interface now allowed", "interface", i.Name) + return + } + + qdisc := tc.installQdisc(i) + + if qdisc == nil { + tc.log.Debug("Unable to install qdisc, ignoring interface", "interface", i.Name) + return + } + + iface := &netlinkIface{i, qdisc, []*netlink.BpfFilter{}} + tc.interfaces[i.Index] = iface + + for _, prog := range tc.programs { + tc.attachProgramToIfaceLocked(prog, iface) + } +} + +func (tc *netlinkManager) onInterfaceRemoved(iface ifaces.Interface) { + tc.mutex.Lock() + defer tc.mutex.Unlock() + + // links, qdiscs and other associated resources are automatically removed + // when an interface is removed, there's no need to explicitly remove them + delete(tc.interfaces, iface.Index) +} + +func (tc *netlinkManager) InterfaceName(ifaceIndex int) (string, bool) { + tc.mutex.Lock() + defer tc.mutex.Unlock() + + if iface, ok := tc.interfaces[ifaceIndex]; ok { + return iface.Name, true + } + + return "", false +} + +func (tc *netlinkManager) SetInterfaceFilter(filter *InterfaceFilter) { + tc.mutex.Lock() + defer tc.mutex.Unlock() + + tc.filter = filter +} + +func (tc *netlinkManager) installQdisc(iface ifaces.Interface) *netlink.GenericQdisc { + link, err := netlink.LinkByIndex(iface.Index) + + if err != nil { + tc.log.Error("failed to lookup link device", "index", iface.Index, "name", iface.Name, "error", err) + return nil + } + + qdiscAttrs := netlink.QdiscAttrs{ + LinkIndex: link.Attrs().Index, + Handle: netlink.MakeHandle(0xffff, 0), + Parent: netlink.HANDLE_CLSACT, + } + + qdisc := &netlink.GenericQdisc{ + QdiscAttrs: qdiscAttrs, + QdiscType: "clsact", + } + + if err := netlink.QdiscAdd(qdisc); err != nil { + if errors.Is(err, fs.ErrExist) { + tc.log.Warn("qdisc clsact already exists. Ignoring", "error", err) + } else { + tc.log.Error("failed to create clsact qdisc on", "index", iface.Index, "name", iface.Name, "error", err) + return nil + } + } + + return qdisc +} + +func (tc *netlinkManager) registerFilter(linkIndex int, fd int, handle uint32, parent uint32, name string) (*netlink.BpfFilter, error) { + attrs := netlink.FilterAttrs{ + LinkIndex: linkIndex, + Parent: parent, + Handle: handle, + Protocol: unix.ETH_P_ALL, + Priority: 1, + } + + filter := &netlink.BpfFilter{ + FilterAttrs: attrs, + Fd: fd, + Name: name, + DirectAction: true, + } + + if err := netlink.FilterDel(filter); err == nil { + tc.log.Warn("filter already existed. Deleted it", "filter", name, "iface", linkIndex) + } + + if err := netlink.FilterAdd(filter); err != nil { + if errors.Is(err, fs.ErrExist) { + tc.log.Warn("filter already exists. Ignoring", "error", err) + } else { + return nil, fmt.Errorf("failed to create filter: %w", err) + } + } + + return filter, nil +} + +func (tc *netlinkManager) cleanupInterfacesLocked() { + for _, iface := range tc.interfaces { + tc.cleanupFiltersLocked(iface) + + // make sure this happens only after cleaning up filters, + // so that we don't remove 3rdparty filters + tc.cleanupQdiscLocked(iface) + } + + tc.interfaces = netlinkIfaceMap{} +} + +func (tc *netlinkManager) cleanupFiltersLocked(iface *netlinkIface) { + for _, filter := range iface.filters { + tc.log.Debug("deleting filter", "interface", iface, "name", filter.Name) + + err := doIgnoreNoDev(netlink.FilterDel, netlink.Filter(filter)) + + if err != nil { + tc.log.Error("deleting filter", "interface", iface, + "filter", filter.Name, "error", err) + } + } +} + +func (tc *netlinkManager) cleanupQdiscLocked(iface *netlinkIface) { + if iface.qdisc == nil { + return + } + + hasEgressFilters := ifaceHasFilters(iface, netlink.HANDLE_MIN_EGRESS) + hasIngressFilters := ifaceHasFilters(iface, netlink.HANDLE_MIN_INGRESS) + + if hasEgressFilters || hasIngressFilters { + tc.log.Debug("not deleting Qdisc as it still has children", "interface", iface) + return + } + + tc.log.Debug("deleting Qdisc", "interface", iface) + + if err := doIgnoreNoDev(netlink.QdiscDel, netlink.Qdisc(iface.qdisc)); err != nil { + tc.log.Error("deleting qdisc", "error", err) + } +} + +func ifaceHasFilters(iface *netlinkIface, parent uint32) bool { + link, err := netlink.LinkByIndex(iface.Index) + + if err != nil { + return true // be conservative assume we have filters if we can't detect them + } + + filters, err := netlink.FilterList(link, parent) + + if err != nil { + return true // be conservative assume we have filters if we can't detect them + } + + return len(filters) > 0 +} + +func (tc *netlinkManager) cleanupProgsLocked() { + for _, prog := range tc.programs { + tc.log.Debug("closing tc program", "name", prog.name) + prog.prog.Close() + } + + tc.programs = []*netlinkProg{} +} + +// doIgnoreNoDev runs the provided syscall over the provided device and ignores the error +// if the cause is a non-existing device (just logs the error as debug). +// If the agent is deployed as part of the Network Metrics pipeline, normally +// undeploying the FlowCollector could cause the agent to try to remove resources +// from Pods that have been removed immediately before (e.g. flowlogs-pipeline or the +// console plugin), so we avoid logging some errors that would unnecessarily raise the +// user's attention. +// This function uses generics because the set of provided functions accept different argument +// types. +func doIgnoreNoDev[T any](sysCall func(T) error, dev T) error { + if err := sysCall(dev); err != nil { + if errors.Is(err, unix.ENODEV) { + slog.Error("can't delete. Ignore this error if other pods or interfaces "+ + " are also being deleted at this moment. For example, if you are undeploying "+ + " a FlowCollector or Deployment where this agent is part of", + "error", err) + } else { + return err + } + } + return nil +} + +func (tc *netlinkManager) SetMonitorMode(mode MonitorMode) { + tc.mutex.Lock() + defer tc.mutex.Unlock() + + tc.monitorMode = mode +} + +func (tc *netlinkManager) SetChannelBufferLen(channelBufferLen int) { + tc.mutex.Lock() + defer tc.mutex.Unlock() + + tc.channelBufferLen = channelBufferLen +} + +func (tc *netlinkManager) SetPollPeriod(period time.Duration) { + tc.mutex.Lock() + defer tc.mutex.Unlock() + + tc.pollPeriod = period +} + diff --git a/pkg/internal/ebpf/tcmanager/tcmanager.go b/pkg/internal/ebpf/tcmanager/tcmanager.go new file mode 100644 index 000000000..27486f1be --- /dev/null +++ b/pkg/internal/ebpf/tcmanager/tcmanager.go @@ -0,0 +1,66 @@ +package tcmanager + +import ( + "context" + "log/slog" + "sync" + "time" + + "github.com/cilium/ebpf" + + "github.com/grafana/beyla/pkg/internal/netolly/ifaces" +) + +type AttachmentType uint8 + +const ( + AttachmentEgress = AttachmentType(iota) + AttachmentIngress +) + +type MonitorMode uint8 + +const ( + MonitorPoll = MonitorMode(iota) + MonitorWatch +) + +const DefaultMonitorMode = MonitorMode(MonitorWatch) +const DefaultChannelBufferLen = 10 +const DefaultPollPeriod = 10 * time.Second + +type TCManager interface { + Start(ctx context.Context) + Stop() + AddProgram(name string, prog *ebpf.Program, attachment AttachmentType) + RemoveProgram(name string) + InterfaceName(ifaceIndex int) (string, bool) + SetInterfaceFilter(filter *InterfaceFilter) + SetMonitorMode(mode MonitorMode) + SetChannelBufferLen(channelBufferLen int) + SetPollPeriod(period time.Duration) +} + +type tcManagerBase struct { + filter *InterfaceFilter + monitorMode MonitorMode + channelBufferLen int + pollPeriod time.Duration + registerer *ifaces.Registerer + log *slog.Logger + mutex sync.Mutex + wg sync.WaitGroup +} + +func newTCManagerBase(component string) tcManagerBase { + return tcManagerBase { + filter: nil, + monitorMode: DefaultMonitorMode, + channelBufferLen: DefaultChannelBufferLen, + pollPeriod: DefaultPollPeriod, + registerer: nil, + log: slog.With("component", component), + mutex: sync.Mutex{}, + wg: sync.WaitGroup{}, + } +} diff --git a/pkg/internal/ebpf/tcmanager/tcxmanager.go b/pkg/internal/ebpf/tcmanager/tcxmanager.go new file mode 100644 index 000000000..3aa733974 --- /dev/null +++ b/pkg/internal/ebpf/tcmanager/tcxmanager.go @@ -0,0 +1,246 @@ +//go:build linux + +package tcmanager + +import ( + "context" + "fmt" + "time" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/link" + + "github.com/grafana/beyla/pkg/internal/netolly/ifaces" +) + +type attachedProg struct { + prog *ebpf.Program + attachType AttachmentType +} + +type tcxInterfaceMap map[int]ifaces.Interface +type tcxProgramsMap map[string]*attachedProg +type tcxLinksMap map[int][]link.Link + +type tcxManager struct { + tcManagerBase + interfaces tcxInterfaceMap + programs tcxProgramsMap + links tcxLinksMap +} + +func NewTCXManager() TCManager { + return &tcxManager { + tcManagerBase: newTCManagerBase("tcx_manager"), + interfaces: tcxInterfaceMap{}, + programs: tcxProgramsMap{}, + links: tcxLinksMap{}, + } +} + +func tcxAttachType(attachment AttachmentType) (ebpf.AttachType, error) { + switch attachment { + case AttachmentEgress: + return ebpf.AttachTCXEgress, nil + case AttachmentIngress: + return ebpf.AttachTCXIngress, nil + } + + return 0, fmt.Errorf("Invalid attachment type: %d", attachment) +} + +func (tcx *tcxManager) Start(ctx context.Context) { + if tcx.registerer != nil { + return + } + + informer := ifaces.NewWatcher(tcx.channelBufferLen) + registerer := ifaces.NewRegisterer(informer, tcx.channelBufferLen) + + ifaceEvents, err := registerer.Subscribe(ctx) + + if err != nil { + tcx.log.Error("instantiating interfaces' informer", "error", err) + return + } + + tcx.registerer = registerer + + tcx.wg.Add(1) + + go func() { + for { + select { + case <-ctx.Done(): + tcx.shutdown() + tcx.wg.Done() + return + case event := <-ifaceEvents: + tcx.log.Debug("received event", "event", event) + switch event.Type { + case ifaces.EventAdded: + tcx.onInterfaceAdded(event.Interface) + case ifaces.EventDeleted: + tcx.onInterfaceRemoved(event.Interface) + default: + tcx.log.Warn("unknown event type", "event", event) + } + } + } + }() +} + +func (tcx *tcxManager) Stop() { + tcx.wg.Wait() +} + +func (tcx *tcxManager) shutdown() { + tcx.log.Debug("TCX initiated shutdown") + + tcx.mutex.Lock() + defer tcx.mutex.Unlock() + + for _, iface := range tcx.interfaces { + tcx.removeInterfaceLocked(iface) + } + + tcx.registerer = nil + tcx.interfaces = tcxInterfaceMap{} + tcx.programs = tcxProgramsMap{} + tcx.links = tcxLinksMap{} + + tcx.log.Debug("TCX completed shutdown") +} + +func (tcx *tcxManager) AddProgram(name string, prog *ebpf.Program, attachment AttachmentType) { + tcx.mutex.Lock() + defer tcx.mutex.Unlock() + + p := &attachedProg { + prog: prog, + attachType: attachment, + } + + tcx.programs[name] = p + tcx.attachProgramLocked(name, p) +} + +func (tcx *tcxManager) RemoveProgram(name string) { + delete(tcx.programs, name) +} + +func (tcx *tcxManager) attachProgramLocked(name string, prog *attachedProg) { + for iface, _ := range tcx.interfaces { + tcx.attachProgramToIfaceLocked(prog, iface) + } +} + +func (tcx *tcxManager) attachProgramToIfaceLocked(prog *attachedProg, iface int) { + if prog == nil { + return + } + + attachType, err := tcxAttachType(prog.attachType) + + if err != nil { + tcx.log.Error("Error attaching program", "error", err) + return + } + + link, err := link.AttachTCX(link.TCXOptions{ + Program: prog.prog, + Attach: attachType, + Interface: iface, + Anchor: link.Head(), + }) + + if err != nil { + tcx.log.Error("Error attaching tcx", "error", err) + return + } + + tcx.links[iface] = append(tcx.links[iface], link) +} + +func (tcx *tcxManager) onInterfaceAdded(iface ifaces.Interface) { + tcx.mutex.Lock() + defer tcx.mutex.Unlock() + + if tcx.filter != nil && !tcx.filter.IsAllowed(iface.Name) { + tcx.log.Debug("Interface now allowed", "interface", iface.Name) + return + } + + tcx.interfaces[iface.Index] = iface + + for _, prog := range tcx.programs { + tcx.attachProgramToIfaceLocked(prog, iface.Index) + } +} + +func (tcx *tcxManager) onInterfaceRemoved(iface ifaces.Interface) { + tcx.mutex.Lock() + defer tcx.mutex.Unlock() + + tcx.removeInterfaceLocked(iface) +} + +func (tcx *tcxManager) removeInterfaceLocked(iface ifaces.Interface) { + tcx.closeLinksLocked(iface) + + delete(tcx.interfaces, iface.Index) +} + +func (tcx *tcxManager) closeLinksLocked(iface ifaces.Interface) { + links, ok := tcx.links[iface.Index] + + if !ok { + return + } + + for _, link := range links { + link.Close() + } + + delete(tcx.links, iface.Index) +} + +func (tcx *tcxManager) InterfaceName(ifaceIndex int) (string, bool) { + tcx.mutex.Lock() + defer tcx.mutex.Unlock() + + if iface, ok := tcx.interfaces[ifaceIndex]; ok { + return iface.Name, true + } + + return "", false +} + +func (tcx *tcxManager) SetInterfaceFilter(filter *InterfaceFilter) { + tcx.mutex.Lock() + defer tcx.mutex.Unlock() + + tcx.filter = filter +} + +func (tcx *tcxManager) SetMonitorMode(mode MonitorMode) { + tcx.mutex.Lock() + defer tcx.mutex.Unlock() + + tcx.monitorMode = mode +} + +func (tcx *tcxManager) SetChannelBufferLen(channelBufferLen int) { + tcx.mutex.Lock() + defer tcx.mutex.Unlock() + + tcx.channelBufferLen = channelBufferLen +} + +func (tcx *tcxManager) SetPollPeriod(period time.Duration) { + tcx.mutex.Lock() + defer tcx.mutex.Unlock() + + tcx.pollPeriod = period +} + diff --git a/pkg/internal/ebpf/tctracer/tctracer.go b/pkg/internal/ebpf/tctracer/tctracer.go index 5e5862966..0840b4092 100644 --- a/pkg/internal/ebpf/tctracer/tctracer.go +++ b/pkg/internal/ebpf/tctracer/tctracer.go @@ -9,13 +9,12 @@ import ( "unsafe" "github.com/cilium/ebpf" - "github.com/vishvananda/netlink" "github.com/grafana/beyla/pkg/beyla" ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" + "github.com/grafana/beyla/pkg/internal/ebpf/tcmanager" "github.com/grafana/beyla/pkg/internal/exec" "github.com/grafana/beyla/pkg/internal/goexec" - "github.com/grafana/beyla/pkg/internal/netolly/ifaces" "github.com/grafana/beyla/pkg/internal/request" "github.com/grafana/beyla/pkg/internal/svc" ) @@ -28,19 +27,15 @@ type Tracer struct { bpfObjects bpfObjects closers []io.Closer log *slog.Logger - qdiscs map[ifaces.Interface]*netlink.GenericQdisc - egressFilters map[ifaces.Interface]*netlink.BpfFilter - ingressFilters map[ifaces.Interface]*netlink.BpfFilter + tcManager tcmanager.TCManager } func New(cfg *beyla.Config) *Tracer { log := slog.With("component", "tc.Tracer") + return &Tracer{ log: log, cfg: cfg, - qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{}, - egressFilters: map[ifaces.Interface]*netlink.BpfFilter{}, - ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{}, } } @@ -135,48 +130,36 @@ func (p *Tracer) AlreadyInstrumentedLib(uint64) bool { return false } -func (p *Tracer) SetupTC() { +func (p *Tracer) startTC(ctx context.Context) { + if p.tcManager != nil { + return + } + if !p.cfg.EBPF.UseTCForCP { return } p.log.Info("enabling L4 context-propagation with Linux Traffic Control") - ebpfcommon.WatchAndRegisterTC(context.Background(), p.cfg.ChannelBufferLen, p.registerTC, p.log) + p.tcManager = tcmanager.NewNetlinkManager() + p.tcManager.AddProgram("tc/tc_egress", p.bpfObjects.BeylaAppEgress, tcmanager.AttachmentEgress) + p.tcManager.AddProgram("tc/tc_ingress", p.bpfObjects.BeylaAppIngress, tcmanager.AttachmentIngress) + p.tcManager.Start(ctx) } func (p *Tracer) Run(ctx context.Context, _ chan<- []request.Span) { + p.startTC(ctx) + <-ctx.Done() p.bpfObjects.Close() - p.closeTC() + p.stopTC() } -func (p *Tracer) registerTC(iface ifaces.Interface) { - links := ebpfcommon.RegisterTC(iface, - p.bpfObjects.BeylaAppEgress.FD(), ebpfcommon.TCTracerTCHandle, "tc/tc_egress", - p.bpfObjects.BeylaAppIngress.FD(), ebpfcommon.TCTracerTCHandle, "tc/tc_ingress", - p.log) - - if links == nil { - return - } - - p.qdiscs[iface] = links.Qdisc - p.ingressFilters[iface] = links.IngressFilter - p.egressFilters[iface] = links.EgressFilter -} - -func (p *Tracer) closeTC() { +func (p *Tracer) stopTC() { p.log.Info("removing traffic control probes") - p.bpfObjects.BeylaAppEgress.Close() - p.bpfObjects.BeylaAppIngress.Close() - - ebpfcommon.CloseTCLinks(p.qdiscs, p.egressFilters, p.ingressFilters, p.log) - - p.egressFilters = map[ifaces.Interface]*netlink.BpfFilter{} - p.ingressFilters = map[ifaces.Interface]*netlink.BpfFilter{} - p.qdiscs = map[ifaces.Interface]*netlink.GenericQdisc{} + p.tcManager.Stop() + p.tcManager = nil } diff --git a/pkg/internal/ebpf/tctracer/tctracer_notlinux.go b/pkg/internal/ebpf/tctracer/tctracer_notlinux.go index 812410a34..7cf3a0928 100644 --- a/pkg/internal/ebpf/tctracer/tctracer_notlinux.go +++ b/pkg/internal/ebpf/tctracer/tctracer_notlinux.go @@ -39,6 +39,5 @@ func (p *Tracer) UnlinkInstrumentedLib(_ uint64) {} func (p *Tracer) AlreadyInstrumentedLib(_ uint64) bool { return false } func (p *Tracer) Run(_ context.Context, _ chan<- []request.Span) {} func (p *Tracer) Constants() map[string]any { return nil } -func (p *Tracer) SetupTC() {} func (p *Tracer) SetupTailCalls() {} func (p *Tracer) RegisterOffsets(_ *exec.FileInfo, _ *goexec.Offsets) {} diff --git a/pkg/internal/ebpf/tracer.go b/pkg/internal/ebpf/tracer.go index e0a715cf5..84fda3483 100644 --- a/pkg/internal/ebpf/tracer.go +++ b/pkg/internal/ebpf/tracer.go @@ -83,8 +83,6 @@ type Tracer interface { // SockOps returns a list of programs that need to be loaded as a // BPF_PROG_TYPE_SOCK_OPS eBPF programs SockOps() []ebpfcommon.SockOps - // Sets up Linux traffic control egress/ingress - SetupTC() // Probes can potentially instrument a shared library among multiple executables // These two functions alow programs to remember this and avoid duplicated instrumentations // The argument is the OS file id diff --git a/pkg/internal/ebpf/tracer_linux.go b/pkg/internal/ebpf/tracer_linux.go index 7243574ff..b0d889cf5 100644 --- a/pkg/internal/ebpf/tracer_linux.go +++ b/pkg/internal/ebpf/tracer_linux.go @@ -180,9 +180,6 @@ func (pt *ProcessTracer) loadTracer(p Tracer, log *slog.Logger) error { // Setup any tail call jump tables p.SetupTailCalls() - // Setup any traffic control probes - p.SetupTC() - i := instrumenter{} // dummy instrumenter to setup the kprobes, socket filters and tracepoint probes // Kprobes to be used for native instrumentation points diff --git a/pkg/internal/netolly/agent/agent.go b/pkg/internal/netolly/agent/agent.go index ada16e167..d85612c7e 100644 --- a/pkg/internal/netolly/agent/agent.go +++ b/pkg/internal/netolly/agent/agent.go @@ -28,7 +28,8 @@ import ( "github.com/cilium/ebpf/ringbuf" "github.com/grafana/beyla/pkg/beyla" - ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" + //ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" + "github.com/grafana/beyla/pkg/internal/ebpf/tcmanager" "github.com/grafana/beyla/pkg/internal/netolly/ebpf" "github.com/grafana/beyla/pkg/internal/netolly/flow" "github.com/grafana/beyla/pkg/internal/netolly/ifaces" @@ -90,8 +91,7 @@ type Flows struct { ctxInfo *global.ContextInfo // input data providers - registerer *ifaces.Registerer - filter interfaceFilter + tcManager tcmanager.TCManager ebpf ebpfFlowFetcher // processing nodes to be wired in the buildPipeline method @@ -119,70 +119,85 @@ func FlowsAgent(ctxInfo *global.ContextInfo, cfg *beyla.Config) (*Flows, error) alog := alog() alog.Info("initializing Flows agent") - // configure informer for new interfaces - var informer ifaces.Informer - switch cfg.NetworkFlows.ListenInterfaces { - case listenPoll: - alog.Debug("listening for new interfaces: use polling", - "period", cfg.NetworkFlows.ListenPollPeriod) - informer = ifaces.NewPoller(cfg.NetworkFlows.ListenPollPeriod, cfg.ChannelBufferLen) - case listenWatch: - alog.Debug("listening for new interfaces: use watching") - informer = ifaces.NewWatcher(cfg.ChannelBufferLen) - default: - alog.Warn("wrong interface listen method. Using file watcher as default", - "providedValue", cfg.NetworkFlows.ListenInterfaces) - informer = ifaces.NewWatcher(cfg.ChannelBufferLen) - } + tcManager := tcmanager.NewNetlinkManager() + tcManager.SetChannelBufferLen(cfg.ChannelBufferLen) + tcManager.SetPollPeriod(cfg.NetworkFlows.ListenPollPeriod) + tcManager.SetMonitorMode(monitorMode(cfg, alog)) alog.Debug("acquiring Agent IP") + agentIP, err := fetchAgentIP(&cfg.NetworkFlows) + if err != nil { return nil, fmt.Errorf("acquiring Agent IP: %w", err) } + alog.Debug("agent IP: " + agentIP.String()) - var fetcher ebpfFlowFetcher + fetcher, err := newFetcher(cfg, tcManager, alog) + + if err != nil { + return nil, err + } + + return flowsAgent(ctxInfo, cfg, fetcher, agentIP, tcManager) +} +func newFetcher(cfg *beyla.Config, tcManager tcmanager.TCManager, + alog *slog.Logger) (ebpfFlowFetcher, error) { switch cfg.NetworkFlows.Source { case beyla.EbpfSourceSock: alog.Info("using socket filter for collecting network events") - fetcher, err = ebpf.NewSockFlowFetcher(cfg.NetworkFlows.Sampling, cfg.NetworkFlows.CacheMaxFlows) - if err != nil { - return nil, err - } + + return ebpf.NewSockFlowFetcher(cfg.NetworkFlows.Sampling, cfg.NetworkFlows.CacheMaxFlows) case beyla.EbpfSourceTC: alog.Info("using kernel Traffic Control for collecting network events") ingress, egress := flowDirections(&cfg.NetworkFlows) - fetcher, err = ebpf.NewFlowFetcher(cfg.NetworkFlows.Sampling, cfg.NetworkFlows.CacheMaxFlows, ingress, egress) - if err != nil { - return nil, err - } - default: - return nil, fmt.Errorf("unknown network configuration eBPF source specified, allowed options are [tc, socket_filter]") + + return ebpf.NewFlowFetcher(cfg.NetworkFlows.Sampling, + cfg.NetworkFlows.CacheMaxFlows, ingress, egress, tcManager) } - return flowsAgent(ctxInfo, cfg, informer, fetcher, agentIP) + return nil, fmt.Errorf("unknown network configuration eBPF source specified, allowed options are [tc, socket_filter]") +} + +func monitorMode(cfg *beyla.Config, alog *slog.Logger) tcmanager.MonitorMode { + switch cfg.NetworkFlows.ListenInterfaces { + case listenPoll: + alog.Debug("listening for new interfaces: use polling", + "period", cfg.NetworkFlows.ListenPollPeriod) + + return tcmanager.MonitorPoll + case listenWatch: + alog.Debug("listening for new interfaces: use watching") + + return tcmanager.MonitorWatch + } + + alog.Warn("wrong interface listen method. Using file watcher as default", + "providedValue", cfg.NetworkFlows.ListenInterfaces) + + return tcmanager.MonitorWatch } // flowsAgent is a private constructor with injectable dependencies, usable for tests func flowsAgent( ctxInfo *global.ContextInfo, cfg *beyla.Config, - informer ifaces.Informer, fetcher ebpfFlowFetcher, agentIP net.IP, + tcManager tcmanager.TCManager, ) (*Flows, error) { // configure allow/deny interfaces filter - filter, err := initInterfaceFilter(cfg.NetworkFlows.Interfaces, cfg.NetworkFlows.ExcludeInterfaces) + filter, err := tcmanager.NewInterfaceFilter(cfg.NetworkFlows.Interfaces, cfg.NetworkFlows.ExcludeInterfaces) if err != nil { return nil, fmt.Errorf("configuring interface filters: %w", err) } - registerer := ifaces.NewRegisterer(informer, cfg.ChannelBufferLen) + tcManager.SetInterfaceFilter(filter) interfaceNamer := func(ifIndex int) string { - iface, ok := registerer.IfaceNameForIndex(ifIndex) + iface, ok := tcManager.InterfaceName(ifIndex) if !ok { return "unknown" } @@ -191,11 +206,11 @@ func flowsAgent( mapTracer := flow.NewMapTracer(fetcher, cfg.NetworkFlows.CacheActiveTimeout) rbTracer := flow.NewRingBufTracer(fetcher, mapTracer, cfg.NetworkFlows.CacheActiveTimeout) + return &Flows{ ctxInfo: ctxInfo, ebpf: fetcher, - registerer: registerer, - filter: filter, + tcManager: tcManager, cfg: cfg, mapTracer: mapTracer, rbTracer: rbTracer, @@ -230,6 +245,8 @@ func (f *Flows) Run(ctx context.Context) error { return fmt.Errorf("starting processing graph: %w", err) } + f.tcManager.Start(ctx) + graph.Start() f.status = StatusStarted @@ -243,6 +260,9 @@ func (f *Flows) Run(ctx context.Context) error { } alog.Debug("waiting for all nodes to finish their pending work") + + f.tcManager.Stop() + <-graph.Done() f.status = StatusStopped @@ -253,28 +273,3 @@ func (f *Flows) Run(ctx context.Context) error { func (f *Flows) Status() Status { return f.status } - -// interfacesManager uses an informer to check new/deleted network interfaces. For each running -// interface, it registers a flow ebpfFetcher that will forward new flows to the returned channel -// TODO: consider move this method and "onInterfaceAdded" to another type -func (f *Flows) interfacesManager(ctx context.Context) error { - slog := alog().With("function", "interfacesManager") - - ebpfcommon.StartTCMonitorLoop(ctx, f.registerer, f.onInterfaceAdded, slog) - - return nil -} - -func (f *Flows) onInterfaceAdded(iface ifaces.Interface) { - alog := alog().With("interface", iface) - // ignore interfaces that do not match the user configuration acceptance/exclusion lists - if !f.filter.Allowed(iface.Name) { - alog.Debug("interface does not match the allow/exclusion filters. Ignoring") - return - } - alog.Info("interface detected. Registering flow ebpfFetcher") - if err := f.ebpf.Register(iface); err != nil { - alog.Warn("can't register flow ebpfFetcher. Ignoring", "error", err) - return - } -} diff --git a/pkg/internal/netolly/agent/pipeline.go b/pkg/internal/netolly/agent/pipeline.go index 1803261e8..6e370f19d 100644 --- a/pkg/internal/netolly/agent/pipeline.go +++ b/pkg/internal/netolly/agent/pipeline.go @@ -77,11 +77,6 @@ func (f *Flows) buildPipeline(ctx context.Context) (*pipe.Runner, error) { func (f *Flows) pipelineBuilder(ctx context.Context) (*pipe.Builder[*FlowsPipeline], error) { alog := alog() - alog.Debug("registering interfaces' listener in background") - err := f.interfacesManager(ctx) - if err != nil { - return nil, err - } alog.Debug("creating flows' processing graph") pb := pipe.NewBuilder(&FlowsPipeline{}, pipe.ChannelBufferLen(f.cfg.ChannelBufferLen)) diff --git a/pkg/internal/netolly/ebpf/tracer.go b/pkg/internal/netolly/ebpf/tracer.go index 1077d352c..885380bb6 100644 --- a/pkg/internal/netolly/ebpf/tracer.go +++ b/pkg/internal/netolly/ebpf/tracer.go @@ -29,10 +29,9 @@ import ( "github.com/cilium/ebpf/ringbuf" "github.com/cilium/ebpf/rlimit" - "github.com/vishvananda/netlink" - ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common" - "github.com/grafana/beyla/pkg/internal/netolly/ifaces" + "github.com/grafana/beyla/pkg/internal/ebpf/tcmanager" + "github.com/grafana/beyla/pkg/internal/netolly/ifaces" //FIXME remove ) // $BPF_CLANG and $BPF_CFLAGS are set by the Makefile. @@ -58,9 +57,6 @@ func tlog() *slog.Logger { type FlowFetcher struct { log *slog.Logger objects *NetObjects - qdiscs map[ifaces.Interface]*netlink.GenericQdisc - egressFilters map[ifaces.Interface]*netlink.BpfFilter - ingressFilters map[ifaces.Interface]*netlink.BpfFilter ringbufReader *ringbuf.Reader cacheMaxSize int enableIngress bool @@ -70,6 +66,7 @@ type FlowFetcher struct { func NewFlowFetcher( sampling, cacheMaxSize int, ingress, egress bool, + tcManager tcmanager.TCManager, ) (*FlowFetcher, error) { tlog := tlog() if err := rlimit.RemoveMemlock(); err != nil { @@ -107,65 +104,27 @@ func NewFlowFetcher( if err != nil { return nil, fmt.Errorf("accessing to ringbuffer: %w", err) } + + if egress { + tcManager.AddProgram("tc/egress_flow_parse", objects.BeylaEgressFlowParse, tcmanager.AttachmentEgress) + } + + if ingress { + tcManager.AddProgram("tc/ingress_flow_parse", objects.BeylaIngressFlowParse, tcmanager.AttachmentIngress) + } + return &FlowFetcher{ log: tlog, objects: &objects, ringbufReader: flows, - egressFilters: map[ifaces.Interface]*netlink.BpfFilter{}, - ingressFilters: map[ifaces.Interface]*netlink.BpfFilter{}, - qdiscs: map[ifaces.Interface]*netlink.GenericQdisc{}, cacheMaxSize: cacheMaxSize, enableIngress: ingress, enableEgress: egress, }, nil } -// Register and links the eBPF fetcher into the system. The program should invoke Unregister -// before exiting. -func (m *FlowFetcher) Register(iface ifaces.Interface) error { - ilog := m.log.With("interface", iface) - - if !m.enableEgress && !m.enableIngress { - ilog.Debug("both egress and ingress have been disabled in the configuration, skipping") - return nil - } - - qdisc := ebpfcommon.GetClsactQdisc(iface, ilog) - - if qdisc == nil { - return fmt.Errorf("failed to obtain a clsact qdisc") - } - - m.qdiscs[iface] = qdisc - - linkIndex := qdisc.QdiscAttrs.LinkIndex - - if m.enableEgress { - filter, err := ebpfcommon.RegisterEgress(linkIndex, - m.objects.BeylaEgressFlowParse.FD(), ebpfcommon.NetollyTCHandle, "tc/egress_flow_parse") - - if err != nil { - return fmt.Errorf("failed to install egress filters: %w", err) - } - - m.egressFilters[iface] = filter - } else { - ilog.Debug("ignoring egress traffic, according to user configuration") - } - - if m.enableIngress { - filter, err := ebpfcommon.RegisterIngress(linkIndex, - m.objects.BeylaIngressFlowParse.FD(), ebpfcommon.NetollyTCHandle, "tc/ingress_flow_parse") - - if err != nil { - return fmt.Errorf("failed to install ingress filters: %w", err) - } - - m.ingressFilters[iface] = filter - } else { - ilog.Debug("ignoring ingress traffic, according to user configuration") - } - +//FIXME remove +func (m *FlowFetcher) Register(_ ifaces.Interface) error { return nil } @@ -176,12 +135,6 @@ func (m *FlowFetcher) Close() error { log := tlog() log.Debug("unregistering eBPF objects") - ebpfcommon.CloseTCLinks(m.qdiscs, m.egressFilters, m.ingressFilters, log) - - m.egressFilters = map[ifaces.Interface]*netlink.BpfFilter{} - m.ingressFilters = map[ifaces.Interface]*netlink.BpfFilter{} - m.qdiscs = map[ifaces.Interface]*netlink.GenericQdisc{} - var errs []error // m.ringbufReader.Read is a blocking operation, so we need to close the ring buffer // from another goroutine to avoid the system not being able to exit if there