Skip to content

Commit

Permalink
[operator] Allow Informers to be dynamically added and removed from I…
Browse files Browse the repository at this point in the history
…nformerController (#521)

Allow code to dynamically add or remove informers from
`operator.InformerController` while it is running. To do this,
`app.DynamicMultiRunner` has been introduced, which is a `app.Runnable`
which runs multiple `app.Runnable` instances and allows them to be
dynamically added or removed.

Relates to #454
  • Loading branch information
IfSentient authored Dec 9, 2024
1 parent 02062c6 commit 7ed1693
Show file tree
Hide file tree
Showing 3 changed files with 388 additions and 11 deletions.
162 changes: 159 additions & 3 deletions app/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/grafana/grafana-app-sdk/metrics"
)

var ErrRunnerExitTimeout = fmt.Errorf("exit wait time exceeded waiting for Runners to complete")

var RunnableCollectorDefaultErrorHandler = func(ctx context.Context, err error) bool {
logging.FromContext(ctx).Error("runner exited with error", "error", err)
return true
Expand Down Expand Up @@ -47,12 +49,13 @@ func (m *MultiRunner) Run(ctx context.Context) error {
errs := make(chan error, len(m.Runners))
defer close(errs)
wg := &sync.WaitGroup{}
timedOut := false
for _, runner := range m.Runners {
wg.Add(1)
go func(r Runnable) {
err := r.Run(propagatedContext)
wg.Done()
if err != nil {
if err != nil && !timedOut {
errs <- err
}
}(runner)
Expand All @@ -68,7 +71,8 @@ func (m *MultiRunner) Run(ctx context.Context) error {
cancel()
if m.ExitWait != nil {
if waitOrTimeout(wg, *m.ExitWait) {
return fmt.Errorf("exit wait time exceeded waiting for Runners to complete: %w", err)
timedOut = true
return errors.Join(ErrRunnerExitTimeout, err)
}
} else {
wg.Wait() // Wait for all the runners to stop
Expand All @@ -79,7 +83,8 @@ func (m *MultiRunner) Run(ctx context.Context) error {
cancel()
if m.ExitWait != nil {
if waitOrTimeout(wg, *m.ExitWait) {
return fmt.Errorf("exit wait time exceeded waiting for Runners to complete")
timedOut = true
return ErrRunnerExitTimeout
}
} else {
wg.Wait() // Wait for all the runners to stop
Expand Down Expand Up @@ -196,3 +201,154 @@ func (s *SingletonRunner) PrometheusCollectors() []prometheus.Collector {
}
return nil
}

type dynamicMultiRunnerTuple struct {
runner Runnable
cancelFunc context.CancelFunc
mainTimedOut bool
}

// DynamicMultiRunner is a MultiRunner that allows for adding and removing Runnable instances after Run is called.
// Only one concurrent Run call is allowed at a time.
type DynamicMultiRunner struct {
// ErrorHandler is called if one of the Runners returns an error. If the function call returns true,
// the context will be canceled and all other Runners will also be prompted to exit.
// If ErrorHandler is nil, RunnableCollectorDefaultErrorHandler is used.
ErrorHandler func(context.Context, error) bool
// ExitWait is how long to wait for Runners to exit after ErrorHandler returns true or the context is canceled
// before stopping execution and returning a timeout error instead of exiting gracefully.
// If ExitWait is nil, Run execution will always block until all Runners have exited.
ExitWait *time.Duration
runners []*dynamicMultiRunnerTuple
running bool
runMux sync.Mutex
runCtx context.Context
errs chan error
wg *sync.WaitGroup
}

// NewDynamicMultiRunner creates a new properly-initialized DynamicMultiRunner.
func NewDynamicMultiRunner() *DynamicMultiRunner {
return &DynamicMultiRunner{
ErrorHandler: RunnableCollectorDefaultErrorHandler,
runners: make([]*dynamicMultiRunnerTuple, 0),
}
}

// Run runs all the current runners, and will dynamically run any runners added with AddRunnable.
func (d *DynamicMultiRunner) Run(ctx context.Context) error {
d.runMux.Lock()
if d.running {
d.runMux.Unlock()
return fmt.Errorf("already running")
}
d.running = true
d.errs = make(chan error)
defer close(d.errs)
d.wg = &sync.WaitGroup{}
var cancel context.CancelFunc
d.runCtx, cancel = context.WithCancel(ctx)
for idx := range d.runners {
d.runners[idx].mainTimedOut = false // Reset this in case we're in a new Run() after a timeout on the previous
d.runTuple(d.runners[idx])
}
d.runMux.Unlock()
for {
select {
case err := <-d.errs:
handler := d.ErrorHandler
if handler == nil {
handler = RunnableCollectorDefaultErrorHandler
}
if handler(d.runCtx, err) {
cancel()
if d.ExitWait != nil {
if waitOrTimeout(d.wg, *d.ExitWait) {
d.setTimedOut(true)
return errors.Join(ErrRunnerExitTimeout, err)
}
} else {
d.wg.Wait() // Wait for all the runners to stop
}
return err
}
case <-ctx.Done():
cancel()
if d.ExitWait != nil {
if waitOrTimeout(d.wg, *d.ExitWait) {
d.setTimedOut(true)
return ErrRunnerExitTimeout
}
} else {
d.wg.Wait() // Wait for all the runners to stop
}
return nil
}
}
}

func (d *DynamicMultiRunner) setTimedOut(val bool) {
d.runMux.Lock()
defer d.runMux.Unlock()
for idx := range d.runners {
d.runners[idx].mainTimedOut = val
}
}

// AddRunnable adds the provided Runnable to the list of runners which gets run by Run.
// If the DynamicMultiRunner is already running, the Runnable will be started immediately.
func (d *DynamicMultiRunner) AddRunnable(runnable Runnable) {
d.runMux.Lock()
defer d.runMux.Unlock()
tpl := &dynamicMultiRunnerTuple{
runner: runnable,
}
if d.running {
d.runTuple(tpl)
}
d.runners = append(d.runners, tpl)
}

// RemoveRunnable removes the provided Runnable from the list of runners, provided that it exists in the current list.
// If the DynamicMultiRunner is already running, the context provided to the Runnable's Run method will be canceled.
func (d *DynamicMultiRunner) RemoveRunnable(runnable Runnable) {
d.runMux.Lock()
defer d.runMux.Unlock()
for i, tpl := range d.runners {
if tpl.runner == runnable {
if d.running && tpl.cancelFunc != nil {
tpl.cancelFunc()
}
if len(d.runners) > i+1 {
d.runners = append(d.runners[:i], d.runners[i+1:]...)
} else {
d.runners = d.runners[:i]
}
}
}
}

// PrometheusCollectors implements metrics.Provider by returning prometheus collectors for all Runners that also
// implement metrics.Provider.
func (d *DynamicMultiRunner) PrometheusCollectors() []prometheus.Collector {
collectors := make([]prometheus.Collector, 0)
for _, runner := range d.runners {
if cast, ok := runner.runner.(metrics.Provider); ok {
collectors = append(collectors, cast.PrometheusCollectors()...)
}
}
return collectors
}

func (d *DynamicMultiRunner) runTuple(tpl *dynamicMultiRunnerTuple) {
d.wg.Add(1)
ctx, cancel := context.WithCancel(d.runCtx)
tpl.cancelFunc = cancel
go func() {
err := tpl.runner.Run(ctx)
d.wg.Done()
if err != nil && !tpl.mainTimedOut { // Only send the error if main isn't timed out (otherwise the channel is closed)
d.errs <- err
}
}()
}
Loading

0 comments on commit 7ed1693

Please sign in to comment.