Skip to content

Commit

Permalink
Remote K8s meta service: wait for synchronization at startup
Browse files Browse the repository at this point in the history
  • Loading branch information
mariomac committed Oct 29, 2024
1 parent 4f7e880 commit 9c9ff95
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
21 changes: 20 additions & 1 deletion pkg/internal/kube/cache_svc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ type cacheSvcClient struct {
address string
log *slog.Logger

waitForSubscription chan struct{}
syncTimeout time.Duration
waitForSubscription chan struct{}
waitForSynchronization chan struct{}
}

func (sc *cacheSvcClient) Start(ctx context.Context) {
sc.log = cslog()
sc.waitForSubscription = make(chan struct{})
sc.waitForSynchronization = make(chan struct{})
go func() {
select {
case <-ctx.Done():
Expand All @@ -53,6 +56,15 @@ func (sc *cacheSvcClient) Start(ctx context.Context) {
}
}
}()
sc.log.Debug("waiting for K8s metadata synchronization")
select {
case <-sc.waitForSynchronization:
sc.log.Debug("K8s metadata cache service synchronized")
case <-ctx.Done():
sc.log.Debug("context done. Nothing to do")

Check warning on line 64 in pkg/internal/kube/cache_svc_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/internal/kube/cache_svc_client.go#L61-L64

Added lines #L61 - L64 were not covered by tests
case <-time.After(sc.syncTimeout):
sc.log.Warn("timeout waiting for K8s metadata synchronization. Some metadata might be temporarily missing")
}
}

func (sc *cacheSvcClient) connect(ctx context.Context) error {
Expand All @@ -73,12 +85,19 @@ func (sc *cacheSvcClient) connect(ctx context.Context) error {
return fmt.Errorf("could not subscribe: %w", err)
}

unsynced := true
// Receive and print messages.
for {
event, err := stream.Recv()
if err != nil {
return fmt.Errorf("error receiving message: %w", err)
}
// send a notification about the client being synced with the K8s metadata service
// so Beyla can start processing/decorating the received flows and traces
if event.GetType() == informer.EventType_SYNC_FINISHED && unsynced {
close(sc.waitForSynchronization)
unsynced = false
}
sc.BaseNotifier.Notify(event)
}
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/internal/kube/informer_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,11 @@ func (mp *MetadataProvider) initLocalInformers(ctx context.Context) (*meta.Infor
// initRemoteInformerCacheClient connects via gRPC/Protobuf to a remote beyla-k8s-cache service, to avoid that
// each Beyla instance connects to the Kube API informer on each node, which would overload the Kube API
func (mp *MetadataProvider) initRemoteInformerCacheClient(ctx context.Context) *cacheSvcClient {
client := &cacheSvcClient{address: mp.cfg.MetaCacheAddr, BaseNotifier: meta.NewBaseNotifier()}
client := &cacheSvcClient{
address: mp.cfg.MetaCacheAddr,
BaseNotifier: meta.NewBaseNotifier(),
syncTimeout: mp.cfg.SyncTimeout,
}
client.Start(ctx)
return client
}
Expand Down

0 comments on commit 9c9ff95

Please sign in to comment.