flow: change clustering interface to be a call to NotifyClusterChange #4790

Merged 4 commits on Aug 14, 2023
7 changes: 6 additions & 1 deletion component/component.go
@@ -119,5 +119,10 @@ type DebugComponent interface {
type ClusteredComponent interface {
Component

ClusterUpdatesRegistration() bool
// NotifyClusterChange notifies the component that the state of the cluster
// has changed.
//
// Implementations of ClusteredComponent should ignore calls to this method
// if they are configured to not utilize clustering.
NotifyClusterChange()
}
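
For components that opt in to clustering, an implementation of the new interface looks roughly like the sketch below, following the pattern used in the files changed by this PR. Everything here is illustrative: the example type, its Arguments struct, and the assumption that the component package is importable as github.com/grafana/agent/component are not part of the diff.

```go
package example

import (
	"context"
	"sync"

	"github.com/grafana/agent/component" // assumed import path for this repo's component package
)

// Arguments is a hypothetical configuration block with a clustering toggle,
// mirroring the Clustering.Enabled flag used by the components in this PR.
type Arguments struct {
	Clustering struct {
		Enabled bool
	}
}

// exampleComponent is an illustrative component that opts in to clustering.
type exampleComponent struct {
	mut           sync.RWMutex
	args          Arguments
	reloadTargets chan struct{}
}

var _ component.ClusteredComponent = (*exampleComponent)(nil)

func (c *exampleComponent) Run(ctx context.Context) error {
	// A real component would drain reloadTargets here and redistribute work.
	<-ctx.Done()
	return nil
}

func (c *exampleComponent) Update(args component.Arguments) error {
	c.mut.Lock()
	defer c.mut.Unlock()
	c.args = args.(Arguments)
	return nil
}

// NotifyClusterChange implements component.ClusteredComponent. Per the new
// doc comment, the call is a no-op when clustering is disabled.
func (c *exampleComponent) NotifyClusterChange() {
	c.mut.RLock()
	defer c.mut.RUnlock()

	if !c.args.Clustering.Enabled {
		return
	}

	// Non-blocking send: bursts of cluster changes coalesce into one reload.
	select {
	case c.reloadTargets <- struct{}{}:
	default:
	}
}
```

With this contract, the loader change further down simply calls NotifyClusterChange on every clustered component whenever cluster state changes, instead of checking ClusterUpdatesRegistration and re-running Update with unchanged arguments.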
25 changes: 17 additions & 8 deletions component/prometheus/operator/common/component.go
@@ -11,7 +11,7 @@ import (
)

type Component struct {
mut sync.Mutex
mut sync.RWMutex
config *operator.Arguments
manager *crdManager

@@ -106,18 +106,27 @@ func (c *Component) Update(args component.Arguments) error {
return nil
}

// NotifyClusterChange implements component.ClusteredComponent.
func (c *Component) NotifyClusterChange() {
c.mut.RLock()
defer c.mut.RUnlock()

if !c.config.Clustering.Enabled {
return // no-op
}

// Schedule a reload so targets get redistributed.
select {
case c.onUpdate <- struct{}{}:

Review comment (Contributor):
This is a case where this change can probably simplify things a lot. Trying to differentiate a clustering update vs. an argument update was a real pain. We could probably skip sending on this channel, just call c.manager.ClusteringUpdated() directly, and simplify the logic in Run a bit.

Reply (Member Author):
Can we defer that to a separate PR? I want to minimize the impact of changes as much as possible here.

Reply (Contributor):
Yes, I can make that change after.


default:
}
}

// DebugInfo returns debug information for this component.
func (c *Component) DebugInfo() interface{} {
return c.manager.DebugInfo()
}

// ClusterUpdatesRegistration implements component.ClusterComponent.
func (c *Component) ClusterUpdatesRegistration() bool {
c.mut.Lock()
defer c.mut.Unlock()
return c.config.Clustering.Enabled
}

func (c *Component) reportHealth(err error) {
c.healthMut.Lock()
defer c.healthMut.Unlock()
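
To make the reviewers' deferred follow-up concrete, here is a rough sketch of what it could look like in component/prometheus/operator/common/component.go. It is not part of this PR; the crdManager.ClusteringUpdated method name comes from the review comment above, and its existence and exact signature are assumptions.

```go
// NotifyClusterChange implements component.ClusteredComponent.
//
// Sketch of the possible follow-up: rather than signaling onUpdate and having
// Run figure out whether the wake-up was a clustering change or an argument
// change, notify the CRD manager directly.
func (c *Component) NotifyClusterChange() {
	c.mut.RLock()
	defer c.mut.RUnlock()

	if !c.config.Clustering.Enabled {
		return // no-op when clustering is disabled
	}
	if c.manager == nil {
		return // nothing to notify before the first successful Update
	}

	// Assumed method from the review thread: redistribute discovered targets
	// without going through a full component reload.
	c.manager.ClusteringUpdated()
}
```

With that in place, Run would no longer need to distinguish clustering wake-ups from argument updates arriving on the onUpdate channel.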
23 changes: 16 additions & 7 deletions component/prometheus/scrape/scrape.go
@@ -242,6 +242,22 @@ func (c *Component) Update(args component.Arguments) error {
return nil
}

// NotifyClusterChange implements component.ClusteredComponent.
func (c *Component) NotifyClusterChange() {
c.mut.RLock()
defer c.mut.RUnlock()

if !c.args.Clustering.Enabled {
return // no-op
}

// Schedule a reload so targets get redistributed.
select {
case c.reloadTargets <- struct{}{}:
default:
}
}

// Helper function to bridge the in-house configuration with the Prometheus
// scrape_config.
// As explained in the Config struct, the following fields are purposefully
@@ -324,13 +340,6 @@ func (c *Component) DebugInfo() interface{} {
}
}

// ClusterUpdatesRegistration implements component.ClusterComponent.
func (c *Component) ClusterUpdatesRegistration() bool {
c.mut.RLock()
defer c.mut.RUnlock()
return c.args.Clustering.Enabled
}

func (c *Component) componentTargetsToProm(jobName string, tgs []discovery.Target) map[string][]*targetgroup.Group {
promGroup := &targetgroup.Group{Source: jobName}
for _, tg := range tgs {
23 changes: 16 additions & 7 deletions component/pyroscope/scrape/scrape.go
@@ -320,6 +320,22 @@ func (c *Component) Update(args component.Arguments) error {
return nil
}

// NotifyClusterChange implements component.ClusteredComponent.
func (c *Component) NotifyClusterChange() {
c.mut.RLock()
defer c.mut.RUnlock()

if !c.args.Clustering.Enabled {
return // no-op
}

// Schedule a reload so targets get redistributed.
select {
case c.reloadTargets <- struct{}{}:
default:
}
}

func (c *Component) componentTargetsToProm(jobName string, tgs []discovery.Target) map[string][]*targetgroup.Group {
promGroup := &targetgroup.Group{Source: jobName}
for _, tg := range tgs {
@@ -337,13 +353,6 @@ func convertLabelSet(tg discovery.Target) model.LabelSet {
return lset
}

// ClusterUpdatesRegistration implements component.ClusterComponent.
func (c *Component) ClusterUpdatesRegistration() bool {
c.mut.RLock()
defer c.mut.RUnlock()
return c.args.Clustering.Enabled
}

// DebugInfo implements component.DebugComponent.
func (c *Component) DebugInfo() interface{} {
var res []scrape.TargetStatus
16 changes: 6 additions & 10 deletions pkg/flow/internal/controller/loader.go
@@ -96,16 +96,12 @@ func NewLoader(opts LoaderOptions) *Loader {
defer span.End()
for _, cmp := range l.Components() {
if cc, ok := cmp.managed.(component.ClusteredComponent); ok {
if cc.ClusterUpdatesRegistration() {
_, span := tracer.Start(spanCtx, "ClusteredComponentReevaluation", trace.WithSpanKind(trace.SpanKindInternal))
span.SetAttributes(attribute.String("node_id", cmp.NodeID()))
defer span.End()

err := cmp.Reevaluate()
if err != nil {
level.Error(l.log).Log("msg", "failed to reevaluate component", "componentID", cmp.NodeID(), "err", err)
}
}
_, span := tracer.Start(spanCtx, "NotifyClusterChange", trace.WithSpanKind(trace.SpanKindInternal))
span.SetAttributes(attribute.String("node_id", cmp.NodeID()))

cc.NotifyClusterChange()

span.End()
}
}
return true
31 changes: 0 additions & 31 deletions pkg/flow/internal/controller/node_component.go
@@ -265,37 +265,6 @@ func (cn *ComponentNode) Evaluate(scope *vm.Scope) error {
return err
}

// Reevaluate calls Update on the managed component with its last used
// arguments. Reevaluate does not build the component if it is not already built
// and does not re-evaluate the River block itself.
// Its only use case is for components opting-in to clustering where calling
// Update with the same Arguments may result in different functionality.
func (cn *ComponentNode) Reevaluate() error {
cn.mut.Lock()
defer cn.mut.Unlock()

cn.doingEval.Store(true)
defer cn.doingEval.Store(false)

if cn.managed == nil {
// We haven't built the managed component successfully yet.
return nil
}

// Update the existing managed component with the same arguments.
err := cn.managed.Update(cn.args)

switch err {
case nil:
cn.setEvalHealth(component.HealthTypeHealthy, "component evaluated")
return nil
default:
msg := fmt.Sprintf("component evaluation failed: %s", err)
cn.setEvalHealth(component.HealthTypeUnhealthy, msg)
return err
}
}

func (cn *ComponentNode) evaluate(scope *vm.Scope) error {
cn.mut.Lock()
defer cn.mut.Unlock()