diff --git a/pkg/internal/kube/cache_svc_client.go b/pkg/internal/kube/cache_svc_client.go index d9b7978d2..e1c436c9a 100644 --- a/pkg/internal/kube/cache_svc_client.go +++ b/pkg/internal/kube/cache_svc_client.go @@ -22,12 +22,15 @@ type cacheSvcClient struct { address string log *slog.Logger - waitForSubscription chan struct{} + syncTimeout time.Duration + waitForSubscription chan struct{} + waitForSynchronization chan struct{} } func (sc *cacheSvcClient) Start(ctx context.Context) { sc.log = cslog() sc.waitForSubscription = make(chan struct{}) + sc.waitForSynchronization = make(chan struct{}) go func() { select { case <-ctx.Done(): @@ -53,6 +56,15 @@ func (sc *cacheSvcClient) Start(ctx context.Context) { } } }() + sc.log.Debug("waiting for K8s metadata synchronization") + select { + case <-sc.waitForSynchronization: + sc.log.Debug("K8s metadata cache service synchronized") + case <-ctx.Done(): + sc.log.Debug("context done. Nothing to do") + case <-time.After(sc.syncTimeout): + sc.log.Warn("timeout waiting for K8s metadata synchronization. Some metadata might be temporarily missing") + } } func (sc *cacheSvcClient) connect(ctx context.Context) error { @@ -73,12 +85,19 @@ func (sc *cacheSvcClient) connect(ctx context.Context) error { return fmt.Errorf("could not subscribe: %w", err) } + unsynced := true // Receive and print messages. for { event, err := stream.Recv() if err != nil { return fmt.Errorf("error receiving message: %w", err) } + // send a notification about the client being synced with the K8s metadata service + // so Beyla can start processing/decorating the received flows and traces + if event.GetType() == informer.EventType_SYNC_FINISHED && unsynced { + close(sc.waitForSynchronization) + unsynced = false + } sc.BaseNotifier.Notify(event) } } diff --git a/pkg/internal/kube/informer_provider.go b/pkg/internal/kube/informer_provider.go index 7f2c2504e..d7bbb0802 100644 --- a/pkg/internal/kube/informer_provider.go +++ b/pkg/internal/kube/informer_provider.go @@ -195,7 +195,11 @@ func (mp *MetadataProvider) initLocalInformers(ctx context.Context) (*meta.Infor // initRemoteInformerCacheClient connects via gRPC/Protobuf to a remote beyla-k8s-cache service, to avoid that // each Beyla instance connects to the Kube API informer on each node, which would overload the Kube API func (mp *MetadataProvider) initRemoteInformerCacheClient(ctx context.Context) *cacheSvcClient { - client := &cacheSvcClient{address: mp.cfg.MetaCacheAddr, BaseNotifier: meta.NewBaseNotifier()} + client := &cacheSvcClient{ + address: mp.cfg.MetaCacheAddr, + BaseNotifier: meta.NewBaseNotifier(), + syncTimeout: mp.cfg.SyncTimeout, + } client.Start(ctx) return client }