Commit 12e563d

Merge branch 'main' into deresolution20-patch-3

clayton-cornell authored Sep 14, 2023
2 parents 73d38b0 + 2b186e8
Showing 307 changed files with 1,690 additions and 180 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -44,6 +44,8 @@ Main (unreleased)
- Add a `file_watch` block in `loki.source.file` to configure how often to poll files from disk for changes, via `min_poll_frequency` and `max_poll_frequency`.
  In static mode, the same settings are available in the global `file_watch_config`. (@wildum) A sketch of the polling idea follows this list.

- Flow: In `prometheus.exporter.blackbox`, allow setting labels for individual targets. (@spartan0x117)

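For intuition, here is a minimal Go sketch of the polling idea behind `file_watch`: poll no more often than a minimum interval and, while nothing changes, back off no further than a maximum. This is illustrative only, not the Agent's implementation; `nextPollDelay` and its doubling strategy are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// nextPollDelay is a hypothetical helper: reset to the minimum interval when a
// change was seen, otherwise back off (here by doubling) toward the maximum.
// min_poll_frequency and max_poll_frequency bound exactly this kind of loop.
func nextPollDelay(current, minDelay, maxDelay time.Duration, changed bool) time.Duration {
	if changed {
		return minDelay
	}
	next := current * 2
	if next > maxDelay {
		next = maxDelay
	}
	return next
}

func main() {
	d := 250 * time.Millisecond
	for i := 0; i < 4; i++ {
		d = nextPollDelay(d, 250*time.Millisecond, 2*time.Second, false)
		fmt.Println(d) // 500ms, 1s, 2s, 2s
	}
}
```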
### Enhancements

- Clustering: make advertise interfaces configurable, with the option to select all available interfaces. (@wildum)
@@ -57,6 +59,9 @@ Main (unreleased)
- `loki.source.kafka` component now exposes the internal label `__meta_kafka_offset`
  to indicate the offset of the consumed message. (@hainenber)

- Flow: improve the River config validation step in `prometheus.scrape` by comparing `scrape_timeout` with `scrape_interval`. (@wildum)

### Other changes

- Use Go 1.21.1 for builds. (@rfratto)
@@ -69,6 +74,14 @@ Main (unreleased)
`msg`, followed by non-common fields. Previously, the position of `msg` was
not consistent. (@rfratto)

### Bugfixes

- Fixed a bug where `otelcol.processor.discovery` could modify the `targets` passed by an upstream component. (@ptodev)

- Fixed a bug where `otelcol` components with a retry mechanism would not wait after the first retry. (@rfratto)

- Fixed a bug where documented default settings in `otelcol.exporter.loadbalancing` were never set. (@rfratto)

v0.36.1 (2023-09-06)
--------------------

11 changes: 7 additions & 4 deletions component/otelcol/config_retry.go
@@ -3,6 +3,7 @@ package otelcol
import (
"time"

"github.com/cenkalti/backoff/v4"
otelexporterhelper "go.opentelemetry.io/collector/exporter/exporterhelper"
)

@@ -35,9 +36,11 @@ func (args *RetryArguments) Convert() *otelexporterhelper.RetrySettings {
}

return &otelexporterhelper.RetrySettings{
Enabled: args.Enabled,
InitialInterval: args.InitialInterval,
MaxInterval: args.MaxInterval,
MaxElapsedTime: args.MaxElapsedTime,
Enabled: args.Enabled,
InitialInterval: args.InitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: args.MaxInterval,
MaxElapsedTime: args.MaxElapsedTime,
}
}
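The `RandomizationFactor` and `Multiplier` lines above are the substance of the retry bugfix: the converted `RetrySettings` previously left both at zero, and a zero multiplier collapses every interval after the first to zero, so retries fired back-to-back. A minimal sketch with the `backoff/v4` package showing the corrected growth (values are illustrative):

```go
package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	b := backoff.NewExponentialBackOff()
	b.InitialInterval = 5 * time.Second
	b.RandomizationFactor = backoff.DefaultRandomizationFactor // 0.5
	b.Multiplier = backoff.DefaultMultiplier                   // 1.5
	b.MaxInterval = 30 * time.Second
	b.Reset() // re-apply InitialInterval after changing the fields

	// Each interval is the previous one times Multiplier, jittered by up to
	// ±50%. With Multiplier left at 0 (the old bug), every value after the
	// first would be 0 and the exporter would retry without waiting.
	for i := 0; i < 4; i++ {
		fmt.Println(b.NextBackOff()) // ~5s, ~7.5s, ~11.25s, ~16.9s (jittered)
	}
}
```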
24 changes: 20 additions & 4 deletions component/otelcol/exporter/loadbalancing/loadbalancing.go
@@ -48,10 +48,22 @@ var (
_ river.Defaulter = &Arguments{}
)

// DefaultArguments holds default values for Arguments.
var DefaultArguments = Arguments{
RoutingKey: "traceID",
}
var (
// DefaultArguments holds default values for Arguments.
DefaultArguments = Arguments{
Protocol: Protocol{
OTLP: DefaultOTLPConfig,
},
RoutingKey: "traceID",
}

DefaultOTLPConfig = OtlpConfig{
Timeout: otelcol.DefaultTimeout,
Queue: otelcol.DefaultQueueArguments,
Retry: otelcol.DefaultRetryArguments,
Client: DefaultGRPCClientArguments,
}
)

// SetToDefault implements river.Defaulter.
func (args *Arguments) SetToDefault() {
@@ -88,6 +100,10 @@ type OtlpConfig struct {
Client GRPCClientArguments `river:"client,block"`
}

func (OtlpConfig *OtlpConfig) SetToDefault() {
*OtlpConfig = DefaultOTLPConfig
}

func (otlpConfig OtlpConfig) Convert() otlpexporter.Config {
return otlpexporter.Config{
TimeoutSettings: exporterhelper.TimeoutSettings{
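The `SetToDefault` method added above hooks into River's defaulting mechanism: the decoder calls `SetToDefault` on any value implementing `river.Defaulter` before applying user-supplied attributes, which is why an empty `otlp` block now picks up `DefaultOTLPConfig` instead of zero values. A minimal sketch of the mechanism with a toy type (the River import path is assumed for this Agent version):

```go
package main

import (
	"fmt"

	"github.com/grafana/agent/pkg/river" // import path assumed
)

type Settings struct {
	Timeout string `river:"timeout,attr,optional"`
}

// SetToDefault implements river.Defaulter; the decoder invokes it before
// decoding attributes, so unset fields keep these values.
func (s *Settings) SetToDefault() {
	*s = Settings{Timeout: "5s"}
}

func main() {
	var s Settings
	if err := river.Unmarshal([]byte(``), &s); err != nil {
		panic(err)
	}
	fmt.Println(s.Timeout) // "5s": the default survives an empty config body
}
```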
43 changes: 31 additions & 12 deletions component/otelcol/exporter/loadbalancing/loadbalancing_test.go
@@ -15,17 +15,35 @@ import (
)

func TestConfigConversion(t *testing.T) {
defaultProtocol := loadbalancingexporter.Protocol{
OTLP: otlpexporter.Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: "",
Compression: "gzip",
WriteBufferSize: 512 * 1024,
Headers: map[string]configopaque.String{},
BalancerName: "pick_first",
var (
defaultRetrySettings = exporterhelper.NewDefaultRetrySettings()
defaultTimeoutSettings = exporterhelper.NewDefaultTimeoutSettings()

// TODO(rfratto): resync defaults with upstream.
//
// We have drifted from the upstream defaults, which have decreased the
// default queue_size to 1000 since we introduced the defaults.
defaultQueueSettings = exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 10,
QueueSize: 5000,
}

defaultProtocol = loadbalancingexporter.Protocol{
OTLP: otlpexporter.Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: "",
Compression: "gzip",
WriteBufferSize: 512 * 1024,
Headers: map[string]configopaque.String{},
BalancerName: "pick_first",
},
RetrySettings: defaultRetrySettings,
TimeoutSettings: defaultTimeoutSettings,
QueueSettings: defaultQueueSettings,
},
},
}
}
)

tests := []struct {
testName string
@@ -96,14 +114,15 @@
static {
hostnames = ["endpoint-1", "endpoint-2:55678"]
}
}
`,
}`,
expected: loadbalancingexporter.Config{
Protocol: loadbalancingexporter.Protocol{
OTLP: otlpexporter.Config{
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 1 * time.Second,
},
RetrySettings: defaultRetrySettings,
QueueSettings: defaultQueueSettings,
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: "",
Compression: "gzip",
8 changes: 3 additions & 5 deletions component/otelcol/processor/discovery/discovery.go
@@ -77,7 +77,6 @@ func (args *Arguments) Validate() error {

// Component is the otelcol.exporter.discovery component.
type Component struct {
cfg Arguments
consumer *promsdconsumer.Consumer
logger log.Logger
}
@@ -134,18 +133,17 @@ func (c *Component) Run(ctx context.Context) error {
// Update implements Component.
func (c *Component) Update(newConfig component.Arguments) error {
cfg := newConfig.(Arguments)
c.cfg = cfg

hostLabels := make(map[string]discovery.Target)

for _, labels := range c.cfg.Targets {
for _, labels := range cfg.Targets {
host, err := promsdconsumer.GetHostFromLabels(labels)
if err != nil {
level.Warn(c.logger).Log("msg", "ignoring target, unable to find address", "err", err)
continue
}
promsdconsumer.CleanupLabels(labels)
hostLabels[host] = labels

hostLabels[host] = promsdconsumer.NewTargetsWithNonInternalLabels(labels)
}

err := c.consumer.UpdateOptions(promsdconsumer.Options{
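For reference, the hazard behind the `otelcol.processor.discovery` fix: `discovery.Target` is a map, so storing the upstream component's map and mutating it in place (as the removed `CleanupLabels` call did) also mutated the upstream copy. Below is a standalone sketch of the aliasing problem and the copy-while-filtering fix, in the spirit of `NewTargetsWithNonInternalLabels`; the `Target` type and the `__meta_` filter rule here are stand-ins:

```go
package main

import (
	"fmt"
	"strings"
)

// Target stands in for discovery.Target: a map of label name to value.
type Target map[string]string

func main() {
	upstream := Target{"__address__": "example:80", "__meta_source": "sd"}

	// Buggy pattern: aliasing the map and deleting from it in place also
	// removes the label from the upstream component's copy.
	aliased := upstream
	delete(aliased, "__meta_source")
	fmt.Println(upstream) // map[__address__:example:80]: upstream was mutated

	// Fixed pattern: copy the labels you want into a fresh map and leave the
	// upstream map untouched.
	upstream = Target{"__address__": "example:80", "__meta_source": "sd"}
	copied := make(Target, len(upstream))
	for k, v := range upstream {
		if strings.HasPrefix(k, "__meta_") { // filter rule is illustrative
			continue
		}
		copied[k] = v
	}
	fmt.Println(upstream, copied) // upstream intact; copy filtered
}
```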
11 changes: 8 additions & 3 deletions component/prometheus/exporter/blackbox/blackbox.go
@@ -38,6 +38,10 @@ func buildBlackboxTargets(baseTarget discovery.Target, args component.Arguments)
a := args.(Arguments)
for _, tgt := range a.Targets {
target := make(discovery.Target)
// Set extra labels first, so that labels from the base target below override them on conflict
for k, v := range tgt.Labels {
target[k] = v
}
for k, v := range baseTarget {
target[k] = v
}
@@ -62,9 +66,10 @@ var DefaultArguments = Arguments{

// BlackboxTarget defines a target to be used by the exporter.
type BlackboxTarget struct {
Name string `river:",label"`
Target string `river:"address,attr"`
Module string `river:"module,attr,optional"`
Name string `river:",label"`
Target string `river:"address,attr"`
Module string `river:"module,attr,optional"`
Labels map[string]string `river:"labels,attr,optional"`
}

type TargetBlock []BlackboxTarget
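As a usage sketch, the new `labels` attribute decodes like the existing ones. The block and attribute names below follow the struct tags above; the River import path and the surrounding test scaffolding are assumptions in the style of this repository's tests:

```go
package blackbox

import (
	"testing"

	"github.com/grafana/agent/pkg/river" // import path assumed
	"github.com/stretchr/testify/require"
)

func TestUnmarshalTargetLabels(t *testing.T) {
	var args Arguments
	err := river.Unmarshal([]byte(`
		config_file = "blackbox_modules.yml"

		target "example" {
			address = "http://example.com"
			module  = "http_2xx"
			labels  = { "env" = "prod" }
		}
	`), &args)
	require.NoError(t, err)
	require.Equal(t, "prod", args.Targets[0].Labels["env"])
}
```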
44 changes: 44 additions & 0 deletions component/prometheus/exporter/blackbox/blackbox_test.go
@@ -170,3 +170,47 @@ func TestBuildBlackboxTargets(t *testing.T) {
require.Equal(t, "http://example.com", targets[0]["__param_target"])
require.Equal(t, "http_2xx", targets[0]["__param_module"])
}

func TestBuildBlackboxTargetsWithExtraLabels(t *testing.T) {
baseArgs := Arguments{
ConfigFile: "modules.yml",
Targets: TargetBlock{{
Name: "target_a",
Target: "http://example.com",
Module: "http_2xx",
Labels: map[string]string{
"env": "test",
"foo": "bar",
},
}},
ProbeTimeoutOffset: 1.0,
}
baseTarget := discovery.Target{
model.SchemeLabel: "http",
model.MetricsPathLabel: "component/prometheus.exporter.blackbox.default/metrics",
"instance": "prometheus.exporter.blackbox.default",
"job": "integrations/blackbox",
"__meta_agent_integration_name": "blackbox",
"__meta_agent_integration_instance": "prometheus.exporter.blackbox.default",
}
args := component.Arguments(baseArgs)
targets := buildBlackboxTargets(baseTarget, args)
require.Equal(t, 1, len(targets))
require.Equal(t, "integrations/blackbox/target_a", targets[0]["job"])
require.Equal(t, "http://example.com", targets[0]["__param_target"])
require.Equal(t, "http_2xx", targets[0]["__param_module"])

require.Equal(t, "test", targets[0]["env"])
require.Equal(t, "bar", targets[0]["foo"])

// Check that the extra labels do not override existing labels
baseArgs.Targets[0].Labels = map[string]string{
"job": "test",
"instance": "test-instance",
}
args = component.Arguments(baseArgs)
targets = buildBlackboxTargets(baseTarget, args)
require.Equal(t, 1, len(targets))
require.Equal(t, "integrations/blackbox/target_a", targets[0]["job"])
require.Equal(t, "prometheus.exporter.blackbox.default", targets[0]["instance"])
}
4 changes: 4 additions & 0 deletions component/prometheus/scrape/scrape.go
@@ -109,6 +109,10 @@ func (arg *Arguments) SetToDefault() {

// Validate implements river.Validator.
func (arg *Arguments) Validate() error {
if arg.ScrapeTimeout > arg.ScrapeInterval {
return fmt.Errorf("scrape_timeout (%s) greater than scrape_interval (%s) for scrape config with job name %q", arg.ScrapeTimeout, arg.ScrapeInterval, arg.JobName)
}

// We must explicitly Validate because HTTPClientConfig is squashed and it won't run otherwise
return arg.HTTPClientConfig.Validate()
}
13 changes: 13 additions & 0 deletions component/prometheus/scrape/scrape_test.go
@@ -204,3 +204,16 @@ func TestCustomDialer(t *testing.T) {
err = scrapeTrigger.Wait(1 * time.Minute)
require.NoError(t, err, "custom dialer was not used")
}

func TestValidateScrapeConfig(t *testing.T) {
var exampleRiverConfig = `
targets = [{ "target1" = "target1" }]
forward_to = []
scrape_interval = "10s"
scrape_timeout = "20s"
job_name = "local"
`
var args Arguments
err := river.Unmarshal([]byte(exampleRiverConfig), &args)
require.ErrorContains(t, err, "scrape_timeout (20s) greater than scrape_interval (10s) for scrape config with job name \"local\"")
}
File renamed without changes.
10 changes: 9 additions & 1 deletion converter/internal/common/validate.go
@@ -10,9 +10,17 @@ import (
)

func UnsupportedNotDeepEquals(a any, b any, name string) diag.Diagnostics {
return UnsupportedNotDeepEqualsMessage(a, b, name, "")
}

func UnsupportedNotDeepEqualsMessage(a any, b any, name string, message string) diag.Diagnostics {
var diags diag.Diagnostics
if !reflect.DeepEqual(a, b) {
diags.Add(diag.SeverityLevelError, fmt.Sprintf("unsupported %s config was provided.", name))
if message != "" {
diags.Add(diag.SeverityLevelError, fmt.Sprintf("unsupported %s config was provided: %s", name, message))
} else {
diags.Add(diag.SeverityLevelError, fmt.Sprintf("unsupported %s config was provided.", name))
}
}

return diags
2 changes: 1 addition & 1 deletion converter/internal/prometheusconvert/prometheusconvert_test.go
@@ -9,5 +9,5 @@ import (
)

func TestConvert(t *testing.T) {
test_common.TestDirectory(t, "testdata", ".yaml", prometheusconvert.Convert)
test_common.TestDirectory(t, "testdata", ".yaml", true, prometheusconvert.Convert)
}
2 changes: 1 addition & 1 deletion converter/internal/promtailconvert/promtailconvert_test.go
@@ -9,5 +9,5 @@ import (
)

func TestConvert(t *testing.T) {
test_common.TestDirectory(t, "testdata", ".yaml", promtailconvert.Convert)
test_common.TestDirectory(t, "testdata", ".yaml", true, promtailconvert.Convert)
}
51 changes: 32 additions & 19 deletions converter/internal/staticconvert/internal/build/builder.go
@@ -53,12 +53,28 @@ func NewIntegrationsV1ConfigBuilder(f *builder.File, diags *diag.Diagnostics, cf
}
}

func (b *IntegrationsV1ConfigBuilder) AppendIntegrations() {
func (b *IntegrationsV1ConfigBuilder) Build() {
b.appendLogging(b.cfg.Server)
b.appendServer(b.cfg.Server)
b.appendIntegrations()
}

func (b *IntegrationsV1ConfigBuilder) appendIntegrations() {
for _, integration := range b.cfg.Integrations.ConfigV1.Integrations {
if !integration.Common.Enabled {
continue
}

scrapeIntegration := b.cfg.Integrations.ConfigV1.ScrapeIntegrations
if integration.Common.ScrapeIntegration != nil {
scrapeIntegration = *integration.Common.ScrapeIntegration
}

if !scrapeIntegration {
b.diags.Add(diag.SeverityLevelError, fmt.Sprintf("unsupported integration which is not being scraped was provided: %s.", integration.Name()))
continue
}

var exports discovery.Exports
switch itg := integration.Config.(type) {
case *apache_http.Config:
@@ -116,27 +132,24 @@
}

func (b *IntegrationsV1ConfigBuilder) appendExporter(commonConfig *int_config.Common, name string, extraTargets []discovery.Target) {
scrapeConfigs := []*prom_config.ScrapeConfig{}
if b.cfg.Integrations.ConfigV1.ScrapeIntegrations {
scrapeConfig := prom_config.DefaultScrapeConfig
scrapeConfig.JobName = fmt.Sprintf("integrations/%s", name)
scrapeConfig.RelabelConfigs = commonConfig.RelabelConfigs
scrapeConfig.MetricRelabelConfigs = commonConfig.MetricRelabelConfigs
// TODO: Add support for scrapeConfig.HTTPClientConfig

scrapeConfig.ScrapeInterval = model.Duration(commonConfig.ScrapeInterval)
if commonConfig.ScrapeInterval == 0 {
scrapeConfig.ScrapeInterval = b.cfg.Integrations.ConfigV1.PrometheusGlobalConfig.ScrapeInterval
}

scrapeConfig.ScrapeTimeout = model.Duration(commonConfig.ScrapeTimeout)
if commonConfig.ScrapeTimeout == 0 {
scrapeConfig.ScrapeTimeout = b.cfg.Integrations.ConfigV1.PrometheusGlobalConfig.ScrapeTimeout
}
scrapeConfig := prom_config.DefaultScrapeConfig
scrapeConfig.JobName = fmt.Sprintf("integrations/%s", name)
scrapeConfig.RelabelConfigs = commonConfig.RelabelConfigs
scrapeConfig.MetricRelabelConfigs = commonConfig.MetricRelabelConfigs
scrapeConfig.HTTPClientConfig.TLSConfig = b.cfg.Integrations.ConfigV1.TLSConfig

scrapeConfig.ScrapeInterval = model.Duration(commonConfig.ScrapeInterval)
if commonConfig.ScrapeInterval == 0 {
scrapeConfig.ScrapeInterval = b.cfg.Integrations.ConfigV1.PrometheusGlobalConfig.ScrapeInterval
}

scrapeConfigs = []*prom_config.ScrapeConfig{&scrapeConfig}
scrapeConfig.ScrapeTimeout = model.Duration(commonConfig.ScrapeTimeout)
if commonConfig.ScrapeTimeout == 0 {
scrapeConfig.ScrapeTimeout = b.cfg.Integrations.ConfigV1.PrometheusGlobalConfig.ScrapeTimeout
}

scrapeConfigs := []*prom_config.ScrapeConfig{&scrapeConfig}

promConfig := &prom_config.Config{
GlobalConfig: b.cfg.Integrations.ConfigV1.PrometheusGlobalConfig,
ScrapeConfigs: scrapeConfigs,
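Distilled, the scrape gating added in `appendIntegrations` is a two-level precedence: a per-integration `*bool` override wins when set, otherwise the global `ScrapeIntegrations` setting applies, and integrations that end up unscraped are reported as unsupported. A minimal sketch of that precedence:

```go
package main

import "fmt"

// shouldScrape mirrors the precedence above: a per-integration *bool override
// (nil when unset) beats the global ScrapeIntegrations setting.
func shouldScrape(global bool, override *bool) bool {
	if override != nil {
		return *override
	}
	return global
}

func main() {
	off := false
	fmt.Println(shouldScrape(true, nil))  // true: the global setting applies
	fmt.Println(shouldScrape(true, &off)) // false: the override wins, and the
	// integration would be rejected with an "unsupported" diagnostic
}
```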