Skip to content

Commit

Permalink
Log error on failed component Run (#1286)
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr authored Jul 16, 2024
1 parent 0a6bf2b commit 16814cf
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 37 deletions.
7 changes: 4 additions & 3 deletions internal/runtime/alloy.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/internal/controller"
"github.com/grafana/alloy/internal/runtime/internal/worker"
"github.com/grafana/alloy/internal/runtime/logging"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/runtime/tracing"
"github.com/grafana/alloy/internal/service"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
)

// Options holds static options for an Alloy controller.
Expand Down Expand Up @@ -180,7 +181,7 @@ func newController(o controllerOptions) *Runtime {
opts: o,

updateQueue: controller.NewQueue(),
sched: controller.NewScheduler(),
sched: controller.NewScheduler(log),

modules: o.ModuleRegistry,

Expand Down
17 changes: 6 additions & 11 deletions internal/runtime/internal/controller/node_builtin_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import (
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/runtime/tracing"
"github.com/grafana/alloy/syntax/ast"
"github.com/grafana/alloy/syntax/vm"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
)

// ComponentID is a fully-qualified name of a component. Each element in
Expand Down Expand Up @@ -316,17 +316,12 @@ func (cn *BuiltinComponentNode) Run(ctx context.Context) error {
cn.setRunHealth(component.HealthTypeHealthy, "started component")
err := cn.managed.Run(ctx)

var exitMsg string
logger := cn.managedOpts.Logger
// Note: logging of this error is handled by the scheduler.
if err != nil {
level.Error(logger).Log("msg", "component exited with error", "err", err)
exitMsg = fmt.Sprintf("component shut down with error: %s", err)
cn.setRunHealth(component.HealthTypeExited, fmt.Sprintf("component shut down with error: %s", err))
} else {
level.Info(logger).Log("msg", "component exited")
exitMsg = "component shut down normally"
cn.setRunHealth(component.HealthTypeExited, "component shut down cleanly")
}

cn.setRunHealth(component.HealthTypeExited, exitMsg)
return err
}

Expand Down
12 changes: 5 additions & 7 deletions internal/runtime/internal/controller/node_config_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"go.uber.org/atomic"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/runner"
"github.com/grafana/alloy/internal/runtime/internal/importsource"
Expand All @@ -22,7 +24,6 @@ import (
"github.com/grafana/alloy/syntax/ast"
"github.com/grafana/alloy/syntax/parser"
"github.com/grafana/alloy/syntax/vm"
"github.com/prometheus/client_golang/prometheus"
)

// ImportConfigNode imports declare and import blocks via a managed import source.
Expand Down Expand Up @@ -365,15 +366,12 @@ func (cn *ImportConfigNode) Run(ctx context.Context) error {

err = cn.run(errChan, updateTasks)

var exitMsg string
// Note: logging of this error is handled by the scheduler.
if err != nil {
level.Error(cn.logger).Log("msg", "import exited with error", "err", err)
exitMsg = fmt.Sprintf("import shut down with error: %s", err)
cn.setRunHealth(component.HealthTypeExited, fmt.Sprintf("import shut down with error: %s", err))
} else {
level.Info(cn.logger).Log("msg", "import exited")
exitMsg = "import shut down normally"
cn.setRunHealth(component.HealthTypeExited, "import shut down cleanly")
}
cn.setRunHealth(component.HealthTypeExited, exitMsg)
return err
}

Expand Down
12 changes: 6 additions & 6 deletions internal/runtime/internal/controller/node_custom_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"time"

"github.com/go-kit/log"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/syntax/ast"
"github.com/grafana/alloy/syntax/vm"
)
Expand Down Expand Up @@ -211,7 +211,6 @@ func (cn *CustomComponentNode) evaluate(evalScope *vm.Scope) error {
func (cn *CustomComponentNode) Run(ctx context.Context) error {
cn.mut.RLock()
managed := cn.managed
logger := cn.logger
cn.mut.RUnlock()

if managed == nil {
Expand All @@ -220,12 +219,13 @@ func (cn *CustomComponentNode) Run(ctx context.Context) error {

cn.setRunHealth(component.HealthTypeHealthy, "started custom component")
err := managed.Run(ctx)

// Note: logging of this error is handled by the scheduler.
if err != nil {
level.Error(logger).Log("msg", "error running custom component", "id", cn.nodeID, "err", err)
cn.setRunHealth(component.HealthTypeExited, fmt.Sprintf("custom component shut down with error: %s", err))
} else {
cn.setRunHealth(component.HealthTypeExited, "custom component shut down cleanly")
}

level.Info(logger).Log("msg", "custom component exited")
cn.setRunHealth(component.HealthTypeExited, "custom component shut down")
return err
}

Expand Down
24 changes: 18 additions & 6 deletions internal/runtime/internal/controller/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"context"
"fmt"
"sync"

"github.com/go-kit/log"

"github.com/grafana/alloy/internal/runtime/logging/level"
)

// RunnableNode is any BlockNode which can also be run.
Expand All @@ -17,6 +21,7 @@ type Scheduler struct {
ctx context.Context
cancel context.CancelFunc
running sync.WaitGroup
logger log.Logger

tasksMut sync.Mutex
tasks map[string]*task
Expand All @@ -26,11 +31,12 @@ type Scheduler struct {
// components which are running.
//
// Call Close to stop the Scheduler and all running components.
func NewScheduler() *Scheduler {
func NewScheduler(logger log.Logger) *Scheduler {
ctx, cancel := context.WithCancel(context.Background())
return &Scheduler{
ctx: ctx,
cancel: cancel,
logger: logger,

tasks: make(map[string]*task),
}
Expand Down Expand Up @@ -85,9 +91,15 @@ func (s *Scheduler) Synchronize(rr []RunnableNode) error {
opts := taskOptions{
Context: s.ctx,
Runnable: newRunnable,
OnDone: func() {
OnDone: func(err error) {
defer s.running.Done()

if err != nil {
level.Error(s.logger).Log("msg", "node exited with error", "node", nodeID, "err", err)
} else {
level.Info(s.logger).Log("msg", "node exited without error", "node", nodeID)
}

s.tasksMut.Lock()
defer s.tasksMut.Unlock()
delete(s.tasks, nodeID)
Expand Down Expand Up @@ -121,7 +133,7 @@ type task struct {
type taskOptions struct {
Context context.Context
Runnable RunnableNode
OnDone func()
OnDone func(error)
}

// newTask creates and starts a new task.
Expand All @@ -135,9 +147,9 @@ func newTask(opts taskOptions) *task {
}

go func() {
defer opts.OnDone()
defer close(t.exited)
_ = opts.Runnable.Run(t.ctx)
err := opts.Runnable.Run(t.ctx)
close(t.exited)
opts.OnDone(err)
}()
return t
}
Expand Down
12 changes: 8 additions & 4 deletions internal/runtime/internal/controller/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ package controller_test

import (
"context"
"os"
"sync"
"testing"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/runtime/internal/controller"
"github.com/grafana/alloy/syntax/ast"
"github.com/grafana/alloy/syntax/vm"
"github.com/stretchr/testify/require"
)

func TestScheduler_Synchronize(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stdout)
t.Run("Can start new jobs", func(t *testing.T) {
var started, finished sync.WaitGroup
started.Add(3)
Expand All @@ -26,7 +30,7 @@ func TestScheduler_Synchronize(t *testing.T) {
return nil
}

sched := controller.NewScheduler()
sched := controller.NewScheduler(logger)
sched.Synchronize([]controller.RunnableNode{
fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: runFunc}},
fakeRunnable{ID: "component-b", Component: mockComponent{RunFunc: runFunc}},
Expand All @@ -48,7 +52,7 @@ func TestScheduler_Synchronize(t *testing.T) {
return nil
}

sched := controller.NewScheduler()
sched := controller.NewScheduler(logger)

for i := 0; i < 10; i++ {
// If a new runnable is created, runFunc will panic since the WaitGroup
Expand All @@ -74,7 +78,7 @@ func TestScheduler_Synchronize(t *testing.T) {
return nil
}

sched := controller.NewScheduler()
sched := controller.NewScheduler(logger)

sched.Synchronize([]controller.RunnableNode{
fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: runFunc}},
Expand Down

0 comments on commit 16814cf

Please sign in to comment.