Skip to content

Commit

Permalink
feat(flow): allow loading cfg srcs from dir (#5228)
Browse files Browse the repository at this point in the history
Co-authored-by: Robert Fratto <[email protected]>
Co-authored-by: Clayton Cornell <[email protected]>
  • Loading branch information
3 people authored Sep 21, 2023
1 parent 25df18b commit 0b0b4a5
Show file tree
Hide file tree
Showing 24 changed files with 402 additions and 248 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ Main (unreleased)

- The `cri` stage in `loki.process` can now be configured to limit line size.

- Flow: Allow `grafana-agent run` to accept a path to a directory of `*.river` files.
This will load all River files in the directory as a single configuration;
component names must be unique across all loaded files. (@rfratto, @hainenber)


### Enhancements

- Clustering: allow advertise interfaces to be configurable, with the possibility to select all available interfaces. (@wildum)
Expand Down
85 changes: 61 additions & 24 deletions cmd/internal/flowmode/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"os/signal"
"path/filepath"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -55,15 +57,18 @@ func runCommand() *cobra.Command {
}

cmd := &cobra.Command{
Use: "run [flags] file",
Use: "run [flags] path",
Short: "Run Grafana Agent Flow",
Long: `The run subcommand runs Grafana Agent Flow in the foreground until an interrupt
is received.
run must be provided an argument pointing at the River file to use. If the
River file wasn't specified, can't be loaded, or contains errors, run will exit
run must be provided an argument pointing at the River dir/file-path to use. If the
River dir/file-path wasn't specified, can't be loaded, or contains errors, run will exit
immediately.
If path is a directory, all *.river files in that directory will be combined
into a single unit. Subdirectories are not recursively searched for further merging.
run starts an HTTP server which can be used to debug Grafana Agent Flow or
force it to reload (by sending a GET or POST request to /-/reload). The listen
address can be changed through the --server.http.listen-addr flag.
Expand All @@ -76,7 +81,7 @@ Additionally, the HTTP server exposes the following debug endpoints:
/debug/pprof Go performance profiling tools
If reloading the config file fails, Grafana Agent Flow will continue running in
If reloading the config dir/file-path fails, Grafana Agent Flow will continue running in
its last valid state. Components which failed may be listed as unhealthy,
depending on the nature of the reload error.
`,
Expand Down Expand Up @@ -140,15 +145,15 @@ type flowRun struct {
configBypassConversionErrors bool
}

func (fr *flowRun) Run(configFile string) error {
func (fr *flowRun) Run(configPath string) error {
var wg sync.WaitGroup
defer wg.Wait()

ctx, cancel := interruptContext()
defer cancel()

if configFile == "" {
return fmt.Errorf("file argument not provided")
if configPath == "" {
return fmt.Errorf("path argument not provided")
}

l, err := logging.New(os.Stderr, logging.DefaultOptions)
Expand Down Expand Up @@ -192,7 +197,7 @@ func (fr *flowRun) Run(configFile string) error {
// To work around this, we lazily create variables for the functions the HTTP
// service needs and set them after the Flow controller exists.
var (
reload func() error
reload func() (*flow.Source, error)
ready func() bool
)

Expand Down Expand Up @@ -222,7 +227,7 @@ func (fr *flowRun) Run(configFile string) error {
Gatherer: prometheus.DefaultGatherer,

ReadyFunc: func() bool { return ready() },
ReloadFunc: func() error { return reload() },
ReloadFunc: func() (*flow.Source, error) { return reload() },

HTTPListenAddr: fr.httpListenAddr,
MemoryListenAddr: fr.inMemoryAddr,
Expand Down Expand Up @@ -250,18 +255,19 @@ func (fr *flowRun) Run(configFile string) error {
})

ready = f.Ready
reload = func() error {
flowCfg, err := loadFlowFile(configFile, fr.configFormat, fr.configBypassConversionErrors)
reload = func() (*flow.Source, error) {
flowSource, err := loadFlowSource(configPath, fr.configFormat, fr.configBypassConversionErrors)
defer instrumentation.InstrumentSHA256(flowSource.SHA256())
defer instrumentation.InstrumentLoad(err == nil)

if err != nil {
return fmt.Errorf("reading config file %q: %w", configFile, err)
return nil, fmt.Errorf("reading config path %q: %w", configPath, err)
}
if err := f.LoadFile(flowCfg, nil); err != nil {
return fmt.Errorf("error during the initial gragent load: %w", err)
if err := f.LoadSource(flowSource, nil); err != nil {
return flowSource, fmt.Errorf("error during the initial grafana/agent load: %w", err)
}

return nil
return flowSource, nil
}

// Flow controller
Expand Down Expand Up @@ -290,25 +296,23 @@ func (fr *flowRun) Run(configFile string) error {
// Perform the initial reload. This is done after starting the HTTP server so
// that /metric and pprof endpoints are available while the Flow controller
// is loading.
if err := reload(); err != nil {
if source, err := reload(); err != nil {
var diags diag.Diagnostics
if errors.As(err, &diags) {
bb, _ := os.ReadFile(configFile)

p := diag.NewPrinter(diag.PrinterConfig{
Color: !color.NoColor,
ContextLinesBefore: 1,
ContextLinesAfter: 1,
})
_ = p.Fprint(os.Stderr, map[string][]byte{configFile: bb}, diags)
_ = p.Fprint(os.Stderr, source.RawConfigs(), diags)

// Print newline after the diagnostics.
fmt.Println()

return fmt.Errorf("could not perform the initial load successfully")
}

// Exit if the initial load files
// Exit if the initial load fails.
return err
}

Expand All @@ -330,7 +334,7 @@ func (fr *flowRun) Run(configFile string) error {
case <-ctx.Done():
return nil
case <-reloadSignal:
if err := reload(); err != nil {
if _, err := reload(); err != nil {
level.Error(l).Log("msg", "failed to reload config", "err", err)
} else {
level.Info(l).Log("msg", "config reloaded")
Expand All @@ -351,12 +355,45 @@ func getEnabledComponentsFunc(f *flow.Flow) func() map[string]interface{} {
}
}

func loadFlowFile(filename string, converterSourceFormat string, converterBypassErrors bool) (*flow.File, error) {
bb, err := os.ReadFile(filename)
func loadFlowSource(path string, converterSourceFormat string, converterBypassErrors bool) (*flow.Source, error) {
fi, err := os.Stat(path)
if err != nil {
return nil, err
}

if fi.IsDir() {
sources := map[string][]byte{}
err := filepath.WalkDir(path, func(curPath string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
// Skip all directories and don't recurse into child dirs that aren't at top-level
if d.IsDir() {
if curPath != path {
return filepath.SkipDir
}
return nil
}
// Ignore files not ending in .river extension
if !strings.HasSuffix(curPath, ".river") {
return nil
}

bb, err := os.ReadFile(curPath)
sources[curPath] = bb
return err
})
if err != nil {
return nil, err
}

return flow.ParseSources(sources)
}

bb, err := os.ReadFile(path)
if err != nil {
return nil, err
}
if converterSourceFormat != "flow" {
var diags convert_diag.Diagnostics
bb, diags = converter.Convert(bb, converter.Input(converterSourceFormat))
Expand All @@ -369,7 +406,7 @@ func loadFlowFile(filename string, converterSourceFormat string, converterBypass

instrumentation.InstrumentConfig(bb)

return flow.ReadFile(filename, bb)
return flow.ParseSource(path, bb)
}

func interruptContext() (context.Context, context.CancelFunc) {
Expand Down
2 changes: 1 addition & 1 deletion component/discovery/consulagent/consulagent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"github.com/grafana/river"
promcfg "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gotest.tools/assert"
)

func TestConvert(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

//nolint:interfacer // this follows the pattern in prometheus service discovery
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
}

func TestConfiguredService(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions component/module/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (c *Component) newManagedLocalComponent(o component.Options) (*file.Compone

if !c.inUpdate.Load() && c.isCreated.Load() {
// Any errors found here are reported via component health
_ = c.mod.LoadFlowContent(c.getArgs().Arguments, c.getContent().Value)
_ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value)
}
}

Expand Down Expand Up @@ -138,7 +138,7 @@ func (c *Component) Update(args component.Arguments) error {

// Force a content load here and bubble up any error. This will catch problems
// on initial load.
return c.mod.LoadFlowContent(newArgs.Arguments, c.getContent().Value)
return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value)
}

// CurrentHealth implements component.HealthComponent.
Expand Down
2 changes: 1 addition & 1 deletion component/module/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (c *Component) pollFile(ctx context.Context, args Arguments) error {
return err
}

return c.mod.LoadFlowContent(args.Arguments, string(bb))
return c.mod.LoadFlowSource(args.Arguments, string(bb))
}

// CurrentHealth implements component.HealthComponent.
Expand Down
4 changes: 2 additions & 2 deletions component/module/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (c *Component) newManagedLocalComponent(o component.Options) (*remote_http.

if !c.inUpdate.Load() && c.isCreated.Load() {
// Any errors found here are reported via component health
_ = c.mod.LoadFlowContent(c.getArgs().Arguments, c.getContent().Value)
_ = c.mod.LoadFlowSource(c.getArgs().Arguments, c.getContent().Value)
}
}

Expand Down Expand Up @@ -137,7 +137,7 @@ func (c *Component) Update(args component.Arguments) error {

// Force a content load here and bubble up any error. This will catch problems
// on initial load.
return c.mod.LoadFlowContent(newArgs.Arguments, c.getContent().Value)
return c.mod.LoadFlowSource(newArgs.Arguments, c.getContent().Value)
}

// CurrentHealth implements component.HealthComponent.
Expand Down
9 changes: 4 additions & 5 deletions component/module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ func NewModuleComponent(o component.Options) (*ModuleComponent, error) {
return c, err
}

// LoadFlowContent loads the flow controller with the current component content. It
// will set the component health in addition to return the error so that the consumer
// can rely on either or both. If the content is the same as the last time it was
// successfully loaded, it will not be reloaded.
func (c *ModuleComponent) LoadFlowContent(args map[string]any, contentValue string) error {
// LoadFlowSource loads the flow controller with the current component source.
// It sets the component health in addition to returning the error, so that the consumer can rely on either or both.
// If the arguments and content are the same as the last time they were successfully loaded, nothing is reloaded.
func (c *ModuleComponent) LoadFlowSource(args map[string]any, contentValue string) error {
if reflect.DeepEqual(args, c.getLatestArgs()) && contentValue == c.getLatestContent() {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion component/module/string/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (c *Component) Run(ctx context.Context) error {
func (c *Component) Update(args component.Arguments) error {
newArgs := args.(Arguments)

return c.mod.LoadFlowContent(newArgs.Arguments, newArgs.Content.Value)
return c.mod.LoadFlowSource(newArgs.Arguments, newArgs.Content.Value)
}

// CurrentHealth implements component.HealthComponent.
Expand Down
4 changes: 2 additions & 2 deletions converter/internal/test_common/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func validateRiver(t *testing.T, expectedRiver []byte, actualRiver []byte, loadF

// attemptLoadingFlowConfig will attempt to load the Flow config and report any errors.
func attemptLoadingFlowConfig(t *testing.T, river []byte) {
cfg, err := flow.ReadFile(t.Name(), river)
cfg, err := flow.ParseSource(t.Name(), river)
require.NoError(t, err, "the output River config failed to parse: %s", string(normalizeLineEndings(river)))

// The below check suffers from test race conditions on Windows. Our goal here is to verify config conversions,
Expand Down Expand Up @@ -201,7 +201,7 @@ func attemptLoadingFlowConfig(t *testing.T, river []byte) {
clusterService,
},
})
err = f.LoadFile(cfg, nil)
err = f.LoadSource(cfg, nil)

// Many components will fail to build as e.g. the cert files are missing, so we ignore these errors.
// This is not ideal, but we still validate for other potential issues.
Expand Down
12 changes: 8 additions & 4 deletions docs/sources/flow/reference/cli/run.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,21 @@ interrupt is received.

Usage:

* `AGENT_MODE=flow grafana-agent run [FLAG ...] FILE_NAME`
* `grafana-agent-flow run [FLAG ...] FILE_NAME`
* `AGENT_MODE=flow grafana-agent run [FLAG ...] PATH_NAME`
* `grafana-agent-flow run [FLAG ...] PATH_NAME`

Replace the following:

* `FLAG`: One or more flags that define the input and output of the command.
* `FILE_NAME`: Required. The Grafana Agent configuration file.
* `PATH_NAME`: Required. The path to the Grafana Agent configuration file or directory.

If the `FILE_NAME` argument is not provided, or if the configuration file can't be loaded or
If the `PATH_NAME` argument is not provided, or if the configuration path can't be loaded or
contains errors during the initial load, the `run` command will immediately exit and show an error message.

If the `PATH_NAME` argument is a directory, the agent finds the `*.river` files directly inside it
(ignoring nested directories) and loads them as a single configuration source. Component names must
be **unique** across all River files, and configuration blocks must not be repeated.

Grafana Agent Flow will continue to run if subsequent reloads of the configuration
file fail, potentially marking components as unhealthy depending on the nature
of the failure. When this happens, Grafana Agent Flow will continue functioning
Expand Down
6 changes: 5 additions & 1 deletion pkg/config/instrumentation/config_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@ func newConfigMetrics() *configMetrics {
// InstrumentConfig creates a sha256 hash of the config before expansion and
// exposes it via the agent_config_hash metric.
func InstrumentConfig(buf []byte) {
InstrumentSHA256(sha256.Sum256(buf))
}

// InstrumentSHA256 stores the provided hash to the agent_config_hash metric.
func InstrumentSHA256(hash [sha256.Size]byte) {
configMetricsInitializer.Do(initializeConfigMetrics)
hash := sha256.Sum256(buf)
confMetrics.configHash.Reset()
confMetrics.configHash.WithLabelValues(fmt.Sprintf("%x", hash)).Set(1)
}
Expand Down
Loading

0 comments on commit 0b0b4a5

Please sign in to comment.