Skip to content

Commit

Permalink
Enabling external informer (#1266)
Browse files Browse the repository at this point in the history
* enabling external informer

* preparing integration tests

* Fixed tests

* almost-working tests with external cache

* specifying cache port in integration tests

* updating beyla-k8s-cache library
  • Loading branch information
mariomac authored Oct 25, 2024
1 parent ca5f23e commit 76d52ee
Show file tree
Hide file tree
Showing 24 changed files with 1,258 additions and 203 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.1
github.com/grafana/beyla-k8s-cache v0.0.0-20241017152938-23a6d2229caf
github.com/grafana/beyla-k8s-cache v0.0.0-20241022104537-4c9302930749
github.com/grafana/go-offsets-tracker v0.1.7
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/mariomac/guara v0.0.0-20230621100729-42bd7716e524
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafana/beyla-k8s-cache v0.0.0-20241017152938-23a6d2229caf h1:OQnoOTdx+7aCdNZqYGRZMNNdmyToL6U3DG3IoZJOeX8=
github.com/grafana/beyla-k8s-cache v0.0.0-20241017152938-23a6d2229caf/go.mod h1:SMZM7CaUstB0UGj+Wf4PWHPRCJ6cO/ax9ebUyb0Dt1o=
github.com/grafana/beyla-k8s-cache v0.0.0-20241022104537-4c9302930749 h1:NO8sqkVRhF4ZdAQvhUMjwWqDItkOq24BKkPiBYbpkoE=
github.com/grafana/beyla-k8s-cache v0.0.0-20241022104537-4c9302930749/go.mod h1:SMZM7CaUstB0UGj+Wf4PWHPRCJ6cO/ax9ebUyb0Dt1o=
github.com/grafana/go-offsets-tracker v0.1.7 h1:2zBQ7iiGzvyXY7LA8kaaSiEqH/Yx82UcfRabbY5aOG4=
github.com/grafana/go-offsets-tracker v0.1.7/go.mod h1:qcQdu7zlUKIFNUdBJlLyNHuJGW0SKWKjkrN6jtt+jds=
github.com/grafana/opentelemetry-go v1.28.0-grafana.3 h1:vExZiZKDZTdDi7fP1GG3GOGuoZ0GNu76408tNXfsnD0=
Expand Down
1 change: 1 addition & 0 deletions pkg/components/beyla.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func buildCommonContextInfo(
SyncTimeout: config.Attributes.Kubernetes.InformersSyncTimeout,
ResyncPeriod: config.Attributes.Kubernetes.InformersResyncPeriod,
DisabledInformers: config.Attributes.Kubernetes.DisableInformers,
MetaCacheAddr: config.Attributes.Kubernetes.MetaCacheAddress,
}),
}
switch {
Expand Down
13 changes: 6 additions & 7 deletions pkg/internal/discover/watcher_kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ func (wk *watcherKubeEnricher) enrichPodEvent(podEvent Event[*informer.ObjectMet
switch podEvent.Type {
case EventCreated:
wk.log.Debug("Pod added",
"namespace", podEvent.Obj.Namespace, "name", podEvent.Obj.Name,
"containers", podEvent.Obj.Pod.ContainerIds)
"namespace", podEvent.Obj.Namespace, "name", podEvent.Obj.Name)
if events := wk.onNewPod(podEvent.Obj); len(events) > 0 {
out <- events
}
Expand Down Expand Up @@ -184,8 +183,8 @@ func (wk *watcherKubeEnricher) onNewPod(pod *informer.ObjectMeta) []Event[proces
wk.mt.RLock()
defer wk.mt.RUnlock()
var events []Event[processAttrs]
for _, containerID := range pod.Pod.ContainerIds {
if procInfo, ok := wk.processByContainer[containerID]; ok {
for _, cnt := range pod.Pod.Containers {
if procInfo, ok := wk.processByContainer[cnt.Id]; ok {
events = append(events, Event[processAttrs]{
Type: EventCreated,
Obj: withMetadata(procInfo, pod),
Expand All @@ -198,11 +197,11 @@ func (wk *watcherKubeEnricher) onNewPod(pod *informer.ObjectMeta) []Event[proces
func (wk *watcherKubeEnricher) onDeletedPod(pod *informer.ObjectMeta) {
wk.mt.Lock()
defer wk.mt.Unlock()
for _, containerID := range pod.Pod.ContainerIds {
if pbc, ok := wk.processByContainer[containerID]; ok {
for _, cnt := range pod.Pod.Containers {
if pbc, ok := wk.processByContainer[cnt.Id]; ok {
delete(wk.containerByPID, pbc.pid)
}
delete(wk.processByContainer, containerID)
delete(wk.processByContainer, cnt.Id)
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/discover/watcher_kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func deployPod(fInformer meta.Notifier, ns, name, containerID string, labels map
Name: name, Namespace: ns, Labels: labels,
Kind: "Pod",
Pod: &informer.PodInfo{
ContainerIds: []string{containerID},
Containers: []*informer.ContainerInfo{{Id: containerID}},
},
},
})
Expand All @@ -249,7 +249,7 @@ func deployOwnedPod(fInformer meta.Notifier, ns, name, replicaSetName, deploymen
Name: name, Namespace: ns,
Kind: "Pod",
Pod: &informer.PodInfo{
ContainerIds: []string{containerID},
Containers: []*informer.ContainerInfo{{Id: containerID}},
Owners: []*informer.Owner{{Name: replicaSetName, Kind: "ReplicaSet"},
{Name: deploymentName, Kind: "Deployment"}},
},
Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/ebpf/tctracer/tctracer_notlinux.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@ import (
ebpfcommon "github.com/grafana/beyla/pkg/internal/ebpf/common"
"github.com/grafana/beyla/pkg/internal/exec"
"github.com/grafana/beyla/pkg/internal/goexec"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/request"
"github.com/grafana/beyla/pkg/internal/svc"
)

type Tracer struct{}

func New(_ *beyla.Config, _ imetrics.Reporter) *Tracer { return nil }
func New(_ *beyla.Config) *Tracer { return nil }
func (p *Tracer) AllowPID(_, _ uint32, _ *svc.ID) {}
func (p *Tracer) BlockPID(_, _ uint32) {}
func (p *Tracer) Load() (*ebpf.CollectionSpec, error) { return nil, nil }
Expand All @@ -40,3 +39,4 @@ func (p *Tracer) Constants() map[string]any { r
func (p *Tracer) SetupTC() {}
func (p *Tracer) SetupTailCalls() {}
func (p *Tracer) RegisterOffsets(_ *exec.FileInfo, _ *goexec.Offsets) {}
func (p *Tracer) AddModuleCloser(uint64, ...io.Closer) {}
89 changes: 89 additions & 0 deletions pkg/internal/kube/cache_svc_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package kube

import (
"context"
"fmt"
"log/slog"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/grafana/beyla-k8s-cache/pkg/informer"
"github.com/grafana/beyla-k8s-cache/pkg/meta"
)

func cslog() *slog.Logger {
return slog.With("component", "kube.CacheSvcClient")
}

type cacheSvcClient struct {
meta.BaseNotifier
address string
log *slog.Logger

waitForSubscription chan struct{}
}

func (sc *cacheSvcClient) Start(ctx context.Context) {
sc.log = cslog()
sc.waitForSubscription = make(chan struct{})
go func() {
select {
case <-ctx.Done():
sc.log.Debug("context done, stopping client")
return
case <-sc.waitForSubscription:
sc.log.Debug("subscriptor attached, start connection to K8s cache service")
}

for {
select {
case <-ctx.Done():
sc.log.Debug("context done, stopping client")
return
default:
// TODO: reconnection should include a timestamp
// with the last received event, to avoid unnecessarily
// receiving the whole metadata snapshot on each reconnection
err := sc.connect(ctx)
sc.log.Info("K8s cache service connection lost. Reconnecting...", "error", err)
// TODO: exponential backoff
time.Sleep(5 * time.Second)
}
}
}()
}

func (sc *cacheSvcClient) connect(ctx context.Context) error {
// Set up a connection to the server.
conn, err := grpc.NewClient(sc.address,
// TODO: allow configuring the transport credentials
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("did not connect: %w", err)
}
defer conn.Close()

client := informer.NewEventStreamServiceClient(conn)

// Subscribe to the event stream.
stream, err := client.Subscribe(ctx, &informer.SubscribeMessage{})
if err != nil {
return fmt.Errorf("could not subscribe: %w", err)
}

// Receive and print messages.
for {
event, err := stream.Recv()
if err != nil {
return fmt.Errorf("error receiving message: %w", err)
}
sc.BaseNotifier.Notify(event)
}
}

func (sc *cacheSvcClient) Subscribe(observer meta.Observer) {
sc.BaseNotifier.Subscribe(observer)
close(sc.waitForSubscription)
}
33 changes: 25 additions & 8 deletions pkg/internal/kube/informer_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ type MetadataConfig struct {
KubeConfigPath string
SyncTimeout time.Duration
ResyncPeriod time.Duration
MetaCacheAddr string
}

type MetadataProvider struct {
mt sync.Mutex

metadata *Store
informer *meta.Informers
informer meta.Notifier

cfg *MetadataConfig
}
Expand Down Expand Up @@ -117,14 +118,18 @@ func (mp *MetadataProvider) Get(ctx context.Context) (*Store, error) {
return mp.metadata, nil
}

func (mp *MetadataProvider) getInformer(ctx context.Context) (*meta.Informers, error) {
func (mp *MetadataProvider) getInformer(ctx context.Context) (meta.Notifier, error) {
if mp.informer != nil {
return mp.informer, nil
}
var err error
mp.informer, err = mp.initInformers(ctx)
if err != nil {
return nil, fmt.Errorf("can't get informer: %w", err)
if mp.cfg.MetaCacheAddr != "" {
mp.informer = mp.initRemoteInformerCacheClient(ctx)
} else {
var err error
mp.informer, err = mp.initLocalInformers(ctx)
if err != nil {
return nil, fmt.Errorf("can't get informer: %w", err)
}
}
return mp.informer, nil
}
Expand Down Expand Up @@ -153,12 +158,16 @@ func (mp *MetadataProvider) CurrentNodeName(ctx context.Context) (string, error)
FieldSelector: "metadata.name=" + currentPod,
})
if err != nil || len(pods.Items) == 0 {
return "", fmt.Errorf("can't get pod %s/%s: %w", currentNamespace, currentPod, err)
log.Debug("attention: can't get Pod info. This is expected if the pod is using the host network. Will use the"+
" host name as node name", "nodeName", currentPod, "namespace", currentNamespace, "error", err)
return currentPod, nil
}
return pods.Items[0].Spec.NodeName, nil
}

func (mp *MetadataProvider) initInformers(ctx context.Context) (*meta.Informers, error) {
// initLocalInformers initializes an informer client that directly connects to the Node Kube API
// for getting informer data
func (mp *MetadataProvider) initLocalInformers(ctx context.Context) (*meta.Informers, error) {
done := make(chan error)
var informers *meta.Informers
go func() {
Expand All @@ -184,6 +193,14 @@ func (mp *MetadataProvider) initInformers(ctx context.Context) (*meta.Informers,
return informers, nil
}

// initRemoteInformerCacheClient connects via gRPC/Protobuf to a remote beyla-k8s-cache service, to avoid that
// each Beyla instance connects to the Kube API informer on each node, which would overload the Kube API
func (mp *MetadataProvider) initRemoteInformerCacheClient(ctx context.Context) *cacheSvcClient {
client := &cacheSvcClient{address: mp.cfg.MetaCacheAddr, BaseNotifier: meta.NewBaseNotifier()}
client.Start(ctx)
return client
}

func loadKubeConfig(kubeConfigPath string) (*rest.Config, error) {
// if no config path is provided, load it from the env variable
if kubeConfigPath == "" {
Expand Down
18 changes: 9 additions & 9 deletions pkg/internal/kube/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ func (s *Store) updateNewObjectMetaByIPIndex(meta *informer.ObjectMeta) {
s.ipInfos[ip] = meta
}
if meta.Pod != nil {
s.log.Debug("adding pod to store",
"ips", meta.Ips, "pod", meta.Name, "namespace", meta.Namespace, "containerIDs", meta.Pod.ContainerIds)
for _, cid := range meta.Pod.ContainerIds {
s.podsByContainer[cid] = meta
s.log.Debug(
"adding pod to store", "ips", meta.Ips, "pod", meta.Name, "namespace", meta.Namespace)
for _, cnt := range meta.Pod.Containers {
s.podsByContainer[cnt.Id] = meta
// TODO: make sure we can handle when the containerIDs is set after this function is triggered
info, ok := s.containerIDs[cid]
info, ok := s.containerIDs[cnt.Id]
if ok {
s.namespaces[info.PIDNamespace] = info
}
Expand All @@ -137,11 +137,11 @@ func (s *Store) updateDeletedObjectMetaByIPIndex(meta *informer.ObjectMeta) {
}
if meta.Pod != nil {
s.log.Debug("deleting pod from store",
"ips", meta.Ips, "pod", meta.Name, "namespace", meta.Namespace, "containerIDs", meta.Pod.ContainerIds)
for _, cid := range meta.Pod.ContainerIds {
info, ok := s.containerIDs[cid]
"ips", meta.Ips, "pod", meta.Name, "namespace", meta.Namespace)
for _, cnt := range meta.Pod.Containers {
info, ok := s.containerIDs[cnt.Id]
if ok {
delete(s.containerIDs, cid)
delete(s.containerIDs, cnt.Id)
delete(s.namespaces, info.PIDNamespace)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/pipe/global/host_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,11 @@ func (ci *ContextInfo) kubeNodeFetcher(ctx context.Context, _ time.Duration) (st
}

func linuxLocalMachineIDFetcher(_ context.Context, _ time.Duration) (string, error) {
if result, err := os.ReadFile("/etc/machine-id"); err == nil || len(result) == 0 {
if result, err := os.ReadFile("/etc/machine-id"); err == nil && len(bytes.TrimSpace(result)) > 0 {
return string(bytes.TrimSpace(result)), nil
}

if result, err := os.ReadFile("/var/lib/dbus/machine-id"); err == nil || len(result) == 0 {
if result, err := os.ReadFile("/var/lib/dbus/machine-id"); err == nil && len(bytes.TrimSpace(result)) > 0 {
return string(bytes.TrimSpace(result)), nil
} else {
return "", fmt.Errorf("can't read host ID: %w", err)
Expand Down
4 changes: 4 additions & 0 deletions pkg/transform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type KubernetesDecorator struct {
// Pods informer can't be disabled. For that purpose, you should disable the whole
// kubernetes metadata decoration.
DisableInformers []string `yaml:"disable_informers" env:"BEYLA_KUBE_DISABLE_INFORMERS"`

// MetaCacheAddress is the host:port address of the beyla-k8s-cache service instance
MetaCacheAddress string `yaml:"meta_cache_address" env:"BEYLA_KUBE_META_CACHE_ADDRESS"`
}

const (
Expand Down Expand Up @@ -179,6 +182,7 @@ func OwnerLabelName(kind string) attr.Name {
func KubeClusterName(ctx context.Context, cfg *KubernetesDecorator) string {
log := klog().With("func", "KubeClusterName")
if cfg.ClusterName != "" {
log.Debug("using cluster name from configuration", "cluster_name", cfg.ClusterName)
return cfg.ClusterName
}
retries := 0
Expand Down
6 changes: 3 additions & 3 deletions pkg/transform/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestDecoration(t *testing.T) {
StartTimeStr: "2020-01-02 12:12:56",
Uid: "uid-12",
Owners: []*informer.Owner{{Kind: "Deployment", Name: "deployment-12"}},
ContainerIds: []string{"container-12"},
Containers: []*informer.ContainerInfo{{Id: "container-12"}},
},
}})
inf.Notify(&informer.Event{Type: informer.EventType_CREATED, Resource: &informer.ObjectMeta{
Expand All @@ -42,7 +42,7 @@ func TestDecoration(t *testing.T) {
StartTimeStr: "2020-01-02 12:34:56",
Uid: "uid-34",
Owners: []*informer.Owner{{Kind: "ReplicaSet", Name: "rs"}},
ContainerIds: []string{"container-34"},
Containers: []*informer.ContainerInfo{{Id: "container-34"}},
},
}})
inf.Notify(&informer.Event{Type: informer.EventType_CREATED, Resource: &informer.ObjectMeta{
Expand All @@ -51,7 +51,7 @@ func TestDecoration(t *testing.T) {
NodeName: "the-node",
Uid: "uid-56",
StartTimeStr: "2020-01-02 12:56:56",
ContainerIds: []string{"container-56"},
Containers: []*informer.ContainerInfo{{Id: "container-56"}},
},
}})
kube.InfoForPID = func(pid uint32) (container.Info, error) {
Expand Down
Loading

0 comments on commit 76d52ee

Please sign in to comment.