Skip to content

Commit

Permalink
flow: change clustering interface to be a call to NotifyClusterChange (
Browse files Browse the repository at this point in the history
…#4790)

* flow: change clustering interface to be a call to NotifyClusterChange

Previously, components would opt in to clustering notifications through
a call to ClusterUpdatesRegistration. Then, if that method returned
true, they would be re-evaluated.

In preparation for the move to services, the clustering service would
not be able to function the same way, as services are not permitted to
trigger a re-evaluation of a component.

The new interface is a NotifyClusterChange method, which allows
components to opt-in to whether they want to ignore that signal or not.

This is not reflected in the CHANGELOG as it is a no-op for users.

* flow: remove unused Reevaluate function in ComponentNode

* component: add more documentation around NotifyClusterChange

* prometheus.operator.*: change Mutex to a RWMutex
  • Loading branch information
rfratto authored Aug 14, 2023
1 parent 4e62b55 commit d1c5a50
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 64 deletions.
7 changes: 6 additions & 1 deletion component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,10 @@ type DebugComponent interface {
type ClusteredComponent interface {
Component

ClusterUpdatesRegistration() bool
// NotifyClusterChange notifies the component that the state of the cluster
// has changed.
//
// Implementations of ClusteredComponent should ignore calls to this method
// if they are configured to not utilize clustering.
NotifyClusterChange()
}
25 changes: 17 additions & 8 deletions component/prometheus/operator/common/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type Component struct {
mut sync.Mutex
mut sync.RWMutex
config *operator.Arguments
manager *crdManager

Expand Down Expand Up @@ -106,18 +106,27 @@ func (c *Component) Update(args component.Arguments) error {
return nil
}

// NotifyClusterChange implements component.ClusterComponent.
func (c *Component) NotifyClusterChange() {
c.mut.RLock()
defer c.mut.RUnlock()

if !c.config.Clustering.Enabled {
return // no-op
}

// Schedule a reload so targets get redistributed.
select {
case c.onUpdate <- struct{}{}:
default:
}
}

// DebugInfo returns debug information for this component.
func (c *Component) DebugInfo() interface{} {
return c.manager.DebugInfo()
}

// ClusterUpdatesRegistration implements component.ClusterComponent.
func (c *Component) ClusterUpdatesRegistration() bool {
c.mut.Lock()
defer c.mut.Unlock()
return c.config.Clustering.Enabled
}

func (c *Component) reportHealth(err error) {
c.healthMut.Lock()
defer c.healthMut.Unlock()
Expand Down
23 changes: 16 additions & 7 deletions component/prometheus/scrape/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,22 @@ func (c *Component) Update(args component.Arguments) error {
return nil
}

// NotifyClusterChange implements component.ClusterComponent.
func (c *Component) NotifyClusterChange() {
c.mut.RLock()
defer c.mut.RUnlock()

if !c.args.Clustering.Enabled {
return // no-op
}

// Schedule a reload so targets get redistributed.
select {
case c.reloadTargets <- struct{}{}:
default:
}
}

// Helper function to bridge the in-house configuration with the Prometheus
// scrape_config.
// As explained in the Config struct, the following fields are purposefully
Expand Down Expand Up @@ -324,13 +340,6 @@ func (c *Component) DebugInfo() interface{} {
}
}

// ClusterUpdatesRegistration implements component.ClusterComponent.
func (c *Component) ClusterUpdatesRegistration() bool {
c.mut.RLock()
defer c.mut.RUnlock()
return c.args.Clustering.Enabled
}

func (c *Component) componentTargetsToProm(jobName string, tgs []discovery.Target) map[string][]*targetgroup.Group {
promGroup := &targetgroup.Group{Source: jobName}
for _, tg := range tgs {
Expand Down
23 changes: 16 additions & 7 deletions component/pyroscope/scrape/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,22 @@ func (c *Component) Update(args component.Arguments) error {
return nil
}

// NotifyClusterChange implements component.ClusterComponent.
func (c *Component) NotifyClusterChange() {
c.mut.RLock()
defer c.mut.RUnlock()

if !c.args.Clustering.Enabled {
return // no-op
}

// Schedule a reload so targets get redistributed.
select {
case c.reloadTargets <- struct{}{}:
default:
}
}

func (c *Component) componentTargetsToProm(jobName string, tgs []discovery.Target) map[string][]*targetgroup.Group {
promGroup := &targetgroup.Group{Source: jobName}
for _, tg := range tgs {
Expand All @@ -337,13 +353,6 @@ func convertLabelSet(tg discovery.Target) model.LabelSet {
return lset
}

// ClusterUpdatesRegistration implements component.ClusterComponent.
func (c *Component) ClusterUpdatesRegistration() bool {
c.mut.RLock()
defer c.mut.RUnlock()
return c.args.Clustering.Enabled
}

// DebugInfo implements component.DebugComponent.
func (c *Component) DebugInfo() interface{} {
var res []scrape.TargetStatus
Expand Down
16 changes: 6 additions & 10 deletions pkg/flow/internal/controller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,12 @@ func NewLoader(opts LoaderOptions) *Loader {
defer span.End()
for _, cmp := range l.Components() {
if cc, ok := cmp.managed.(component.ClusteredComponent); ok {
if cc.ClusterUpdatesRegistration() {
_, span := tracer.Start(spanCtx, "ClusteredComponentReevaluation", trace.WithSpanKind(trace.SpanKindInternal))
span.SetAttributes(attribute.String("node_id", cmp.NodeID()))
defer span.End()

err := cmp.Reevaluate()
if err != nil {
level.Error(l.log).Log("msg", "failed to reevaluate component", "componentID", cmp.NodeID(), "err", err)
}
}
_, span := tracer.Start(spanCtx, "NotifyClusterChange", trace.WithSpanKind(trace.SpanKindInternal))
span.SetAttributes(attribute.String("node_id", cmp.NodeID()))

cc.NotifyClusterChange()

span.End()
}
}
return true
Expand Down
31 changes: 0 additions & 31 deletions pkg/flow/internal/controller/node_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,37 +265,6 @@ func (cn *ComponentNode) Evaluate(scope *vm.Scope) error {
return err
}

// Reevaluate calls Update on the managed component with its last used
// arguments.Reevaluate does not build the component if it is not already built
// and does not re-evaluate the River block itself.
// Its only use case is for components opting-in to clustering where calling
// Update with the same Arguments may result in different functionality.
func (cn *ComponentNode) Reevaluate() error {
cn.mut.Lock()
defer cn.mut.Unlock()

cn.doingEval.Store(true)
defer cn.doingEval.Store(false)

if cn.managed == nil {
// We haven't built the managed component successfully yet.
return nil
}

// Update the existing managed component with the same arguments.
err := cn.managed.Update(cn.args)

switch err {
case nil:
cn.setEvalHealth(component.HealthTypeHealthy, "component evaluated")
return nil
default:
msg := fmt.Sprintf("component evaluation failed: %s", err)
cn.setEvalHealth(component.HealthTypeUnhealthy, msg)
return err
}
}

func (cn *ComponentNode) evaluate(scope *vm.Scope) error {
cn.mut.Lock()
defer cn.mut.Unlock()
Expand Down

0 comments on commit d1c5a50

Please sign in to comment.