From 59848e3f8ef9ad763fc12ce970a943b9d8617215 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Mon, 14 Aug 2023 13:19:44 -0400 Subject: [PATCH] flow: change clustering interface to be a call to NotifyClusterChange (#4790) * flow: change clustering interface to be a call to NotifyClusterChange Previously, components would opt in to clustering notifications through a call to ClusterUpdatesRegistration. Then, if that method returned true, they would be re-evaluated. In preparation for the move to services, the clustering service would not be able to function the same way, as services are not permitted to trigger a re-evaluation of a component. The new interface is a NotifyClusterChange method, which allows components to opt-in to whether they want to ignore that signal or not. This is not reflected in the CHANGELOG as it is a no-op for users. * flow: remove unused Reevaluate function in ComponentNode * component: add more documentation around NotifyClusterChange * prometheus.operator.*: change Mutex to a RWMutex --- component/component.go | 7 ++++- .../prometheus/operator/common/component.go | 25 ++++++++++----- component/prometheus/scrape/scrape.go | 23 +++++++++----- component/pyroscope/scrape/scrape.go | 23 +++++++++----- pkg/flow/internal/controller/loader.go | 16 ++++------ .../internal/controller/node_component.go | 31 ------------------- 6 files changed, 61 insertions(+), 64 deletions(-) diff --git a/component/component.go b/component/component.go index 5cbe74ba2623..4709ab985388 100644 --- a/component/component.go +++ b/component/component.go @@ -119,5 +119,10 @@ type DebugComponent interface { type ClusteredComponent interface { Component - ClusterUpdatesRegistration() bool + // NotifyClusterChange notifies the component that the state of the cluster + // has changed. + // + // Implementations of ClusteredComponent should ignore calls to this method + // if they are configured to not utilize clustering. + NotifyClusterChange() } diff --git a/component/prometheus/operator/common/component.go b/component/prometheus/operator/common/component.go index 9fad0dd0e481..d174b01aa8a3 100644 --- a/component/prometheus/operator/common/component.go +++ b/component/prometheus/operator/common/component.go @@ -11,7 +11,7 @@ import ( ) type Component struct { - mut sync.Mutex + mut sync.RWMutex config *operator.Arguments manager *crdManager @@ -106,18 +106,27 @@ func (c *Component) Update(args component.Arguments) error { return nil } +// NotifyClusterChange implements component.ClusterComponent. +func (c *Component) NotifyClusterChange() { + c.mut.RLock() + defer c.mut.RUnlock() + + if !c.config.Clustering.Enabled { + return // no-op + } + + // Schedule a reload so targets get redistributed. + select { + case c.onUpdate <- struct{}{}: + default: + } +} + // DebugInfo returns debug information for this component. func (c *Component) DebugInfo() interface{} { return c.manager.DebugInfo() } -// ClusterUpdatesRegistration implements component.ClusterComponent. -func (c *Component) ClusterUpdatesRegistration() bool { - c.mut.Lock() - defer c.mut.Unlock() - return c.config.Clustering.Enabled -} - func (c *Component) reportHealth(err error) { c.healthMut.Lock() defer c.healthMut.Unlock() diff --git a/component/prometheus/scrape/scrape.go b/component/prometheus/scrape/scrape.go index 65821134e211..3f3535229e60 100644 --- a/component/prometheus/scrape/scrape.go +++ b/component/prometheus/scrape/scrape.go @@ -242,6 +242,22 @@ func (c *Component) Update(args component.Arguments) error { return nil } +// NotifyClusterChange implements component.ClusterComponent. +func (c *Component) NotifyClusterChange() { + c.mut.RLock() + defer c.mut.RUnlock() + + if !c.args.Clustering.Enabled { + return // no-op + } + + // Schedule a reload so targets get redistributed. + select { + case c.reloadTargets <- struct{}{}: + default: + } +} + // Helper function to bridge the in-house configuration with the Prometheus // scrape_config. // As explained in the Config struct, the following fields are purposefully @@ -324,13 +340,6 @@ func (c *Component) DebugInfo() interface{} { } } -// ClusterUpdatesRegistration implements component.ClusterComponent. -func (c *Component) ClusterUpdatesRegistration() bool { - c.mut.RLock() - defer c.mut.RUnlock() - return c.args.Clustering.Enabled -} - func (c *Component) componentTargetsToProm(jobName string, tgs []discovery.Target) map[string][]*targetgroup.Group { promGroup := &targetgroup.Group{Source: jobName} for _, tg := range tgs { diff --git a/component/pyroscope/scrape/scrape.go b/component/pyroscope/scrape/scrape.go index de1d9517c694..6c213797fefc 100644 --- a/component/pyroscope/scrape/scrape.go +++ b/component/pyroscope/scrape/scrape.go @@ -320,6 +320,22 @@ func (c *Component) Update(args component.Arguments) error { return nil } +// NotifyClusterChange implements component.ClusterComponent. +func (c *Component) NotifyClusterChange() { + c.mut.RLock() + defer c.mut.RUnlock() + + if !c.args.Clustering.Enabled { + return // no-op + } + + // Schedule a reload so targets get redistributed. + select { + case c.reloadTargets <- struct{}{}: + default: + } +} + func (c *Component) componentTargetsToProm(jobName string, tgs []discovery.Target) map[string][]*targetgroup.Group { promGroup := &targetgroup.Group{Source: jobName} for _, tg := range tgs { @@ -337,13 +353,6 @@ func convertLabelSet(tg discovery.Target) model.LabelSet { return lset } -// ClusterUpdatesRegistration implements component.ClusterComponent. -func (c *Component) ClusterUpdatesRegistration() bool { - c.mut.RLock() - defer c.mut.RUnlock() - return c.args.Clustering.Enabled -} - // DebugInfo implements component.DebugComponent. func (c *Component) DebugInfo() interface{} { var res []scrape.TargetStatus diff --git a/pkg/flow/internal/controller/loader.go b/pkg/flow/internal/controller/loader.go index 9da91733c85c..2e1fb25a824d 100644 --- a/pkg/flow/internal/controller/loader.go +++ b/pkg/flow/internal/controller/loader.go @@ -96,16 +96,12 @@ func NewLoader(opts LoaderOptions) *Loader { defer span.End() for _, cmp := range l.Components() { if cc, ok := cmp.managed.(component.ClusteredComponent); ok { - if cc.ClusterUpdatesRegistration() { - _, span := tracer.Start(spanCtx, "ClusteredComponentReevaluation", trace.WithSpanKind(trace.SpanKindInternal)) - span.SetAttributes(attribute.String("node_id", cmp.NodeID())) - defer span.End() - - err := cmp.Reevaluate() - if err != nil { - level.Error(l.log).Log("msg", "failed to reevaluate component", "componentID", cmp.NodeID(), "err", err) - } - } + _, span := tracer.Start(spanCtx, "NotifyClusterChange", trace.WithSpanKind(trace.SpanKindInternal)) + span.SetAttributes(attribute.String("node_id", cmp.NodeID())) + + cc.NotifyClusterChange() + + span.End() } } return true diff --git a/pkg/flow/internal/controller/node_component.go b/pkg/flow/internal/controller/node_component.go index 9c911c30792e..c5363942abb9 100644 --- a/pkg/flow/internal/controller/node_component.go +++ b/pkg/flow/internal/controller/node_component.go @@ -265,37 +265,6 @@ func (cn *ComponentNode) Evaluate(scope *vm.Scope) error { return err } -// Reevaluate calls Update on the managed component with its last used -// arguments.Reevaluate does not build the component if it is not already built -// and does not re-evaluate the River block itself. -// Its only use case is for components opting-in to clustering where calling -// Update with the same Arguments may result in different functionality. -func (cn *ComponentNode) Reevaluate() error { - cn.mut.Lock() - defer cn.mut.Unlock() - - cn.doingEval.Store(true) - defer cn.doingEval.Store(false) - - if cn.managed == nil { - // We haven't built the managed component successfully yet. - return nil - } - - // Update the existing managed component with the same arguments. - err := cn.managed.Update(cn.args) - - switch err { - case nil: - cn.setEvalHealth(component.HealthTypeHealthy, "component evaluated") - return nil - default: - msg := fmt.Sprintf("component evaluation failed: %s", err) - cn.setEvalHealth(component.HealthTypeUnhealthy, msg) - return err - } -} - func (cn *ComponentNode) evaluate(scope *vm.Scope) error { cn.mut.Lock() defer cn.mut.Unlock()