Skip to content

Commit

Permalink
Remote K8s meta service: wait for synchronization at startup (#1283)
Browse files Browse the repository at this point in the history
* Remote K8s meta service: wait for synchronization at startup

* More visible messages for synchronization

* Fix dockerfile of cache
  • Loading branch information
mariomac authored Oct 29, 2024
1 parent a112937 commit eed03fc
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish_dockerhub_main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
uses: grafana/shared-workflows/actions/build-push-to-dockerhub@main
with:
repository: grafana/beyla-k8s-cache
file: k8scache.DockerFile
file: k8scache.Dockerfile
context: .
# cache image layers from/to github actions internal cache, for faster building
cache-from: type=gha
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish_dockerhub_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
uses: grafana/shared-workflows/actions/build-push-to-dockerhub@main
with:
repository: grafana/beyla-k8s-cache
file: k8scache.DockerFile
file: k8scache.Dockerfile
context: .
# cache image layers from/to github actions internal cache, for faster building
cache-from: type=gha
Expand Down
23 changes: 22 additions & 1 deletion pkg/internal/kube/cache_svc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ type cacheSvcClient struct {
address string
log *slog.Logger

waitForSubscription chan struct{}
syncTimeout time.Duration
waitForSubscription chan struct{}
waitForSynchronization chan struct{}
}

func (sc *cacheSvcClient) Start(ctx context.Context) {
sc.log = cslog()
sc.waitForSubscription = make(chan struct{})
sc.waitForSynchronization = make(chan struct{})
go func() {
select {
case <-ctx.Done():
Expand All @@ -53,6 +56,17 @@ func (sc *cacheSvcClient) Start(ctx context.Context) {
}
}
}()
sc.log.Info("waiting for K8s metadata synchronization", "timeout", sc.syncTimeout)
select {
case <-sc.waitForSynchronization:
sc.log.Debug("K8s metadata cache service synchronized")
case <-ctx.Done():
sc.log.Debug("context done. Nothing to do")
case <-time.After(sc.syncTimeout):
sc.log.Warn("timed out while waiting for K8s metadata synchronization. Some metadata might be temporarily missing." +
" If this is expected due to the size of your cluster, you might want to increase the timeout via" +
" the BEYLA_KUBE_INFORMERS_SYNC_TIMEOUT configuration option")
}
}

func (sc *cacheSvcClient) connect(ctx context.Context) error {
Expand All @@ -73,12 +87,19 @@ func (sc *cacheSvcClient) connect(ctx context.Context) error {
return fmt.Errorf("could not subscribe: %w", err)
}

unsynced := true
// Receive and print messages.
for {
event, err := stream.Recv()
if err != nil {
return fmt.Errorf("error receiving message: %w", err)
}
// send a notification about the client being synced with the K8s metadata service
// so Beyla can start processing/decorating the received flows and traces
if event.GetType() == informer.EventType_SYNC_FINISHED && unsynced {
close(sc.waitForSynchronization)
unsynced = false
}
sc.BaseNotifier.Notify(event)
}
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/internal/kube/informer_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,11 @@ func (mp *MetadataProvider) initLocalInformers(ctx context.Context) (*meta.Infor
// initRemoteInformerCacheClient connects via gRPC/Protobuf to a remote beyla-k8s-cache service, to avoid that
// each Beyla instance connects to the Kube API informer on each node, which would overload the Kube API
func (mp *MetadataProvider) initRemoteInformerCacheClient(ctx context.Context) *cacheSvcClient {
client := &cacheSvcClient{address: mp.cfg.MetaCacheAddr, BaseNotifier: meta.NewBaseNotifier()}
client := &cacheSvcClient{
address: mp.cfg.MetaCacheAddr,
BaseNotifier: meta.NewBaseNotifier(),
syncTimeout: mp.cfg.SyncTimeout,
}
client.Start(ctx)
return client
}
Expand Down

0 comments on commit eed03fc

Please sign in to comment.