Skip to content

Commit

Permalink
Add probabilistic sampler processor
Browse files Browse the repository at this point in the history
  • Loading branch information
mar4uk committed Sep 11, 2023
1 parent 0cf05ba commit 86bf82d
Show file tree
Hide file tree
Showing 7 changed files with 539 additions and 22 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ Main (unreleased)
- `discovery.serverset` discovers Serversets stored in Zookeeper. (@thampiotr)
- `discovery.scaleway` discovers scrape targets from Scaleway virtual
instances and bare-metal machines. (@rfratto)

- `otelcol.processor.probabilistic_sampler` - samples logs and traces based on configuration options. (@mar4uk)

- Flow: allow the HTTP server to be configured with TLS in the config file
using the new `http` config block. (@rfratto)

Expand Down
1 change: 1 addition & 0 deletions component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ import (
_ "github.com/grafana/agent/component/otelcol/processor/batch" // Import otelcol.processor.batch
_ "github.com/grafana/agent/component/otelcol/processor/discovery" // Import otelcol.processor.discovery
_ "github.com/grafana/agent/component/otelcol/processor/memorylimiter" // Import otelcol.processor.memory_limiter
_ "github.com/grafana/agent/component/otelcol/processor/probabilistic_sampler" // Import otelcol.processor.probabilistic_sampler
_ "github.com/grafana/agent/component/otelcol/processor/span" // Import otelcol.processor.span
_ "github.com/grafana/agent/component/otelcol/processor/tail_sampling" // Import otelcol.processor.tail_sampling
_ "github.com/grafana/agent/component/otelcol/receiver/jaeger" // Import otelcol.receiver.jaeger
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Package probabilistic_sampler provides an otelcol.processor.probabilistic_sampler component.
package probabilistic_sampler

import (
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/processor"
psp "github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor"
otelcomponent "go.opentelemetry.io/collector/component"
otelextension "go.opentelemetry.io/collector/extension"
)

const (
defaultAttributeSource = "traceID"
)

func init() {
component.Register(component.Registration{
Name: "otelcol.processor.probabilistic_sampler",
Args: Arguments{},
Exports: otelcol.ConsumerExports{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := psp.NewFactory()
return processor.New(opts, fact, args.(Arguments))
},
})
}

// Arguments configures the otelcol.processor.probabilistic_sampler component.

type Arguments struct {
SamplingPercentage float32 `river:"sampling_percentage,attr,optional"`
HashSeed uint32 `river:"hash_seed,attr,optional"`
AttributeSource string `river:"attribute_source,attr,optional"`
FromAttribute string `river:"from_attribute,attr,optional"`
SamplingPriority string `river:"sampling_priority,attr,optional"`

// Output configures where to send processed data. Required.
Output *otelcol.ConsumerArguments `river:"output,block"`
}

var (
_ processor.Arguments = Arguments{}
)

// DefaultArguments holds default settings for Arguments.
var DefaultArguments = Arguments{
AttributeSource: defaultAttributeSource,
}

// SetToDefault implements river.Defaulter.
func (args *Arguments) SetToDefault() {
*args = DefaultArguments
}

// Validate implements river.Validator.
func (args *Arguments) Validate() error {
cfg, err := args.Convert()
if err != nil {
return err
}

return cfg.(*psp.Config).Validate()
}

// Convert implements processor.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
return &psp.Config{
SamplingPercentage: args.SamplingPercentage,
HashSeed: args.HashSeed,
AttributeSource: psp.AttributeSource(args.AttributeSource),
FromAttribute: args.FromAttribute,
SamplingPriority: args.SamplingPriority,
}, nil
}

// Extensions implements processor.Arguments.
func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension {
return nil
}

// Exporters implements processor.Arguments.
func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component {
return nil
}

// NextConsumers implements processor.Arguments.
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
return args.Output
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
package probabilistic_sampler

import (
"context"
"testing"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/agent/component/otelcol/internal/fakeconsumer"
"github.com/grafana/agent/pkg/util"

"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/pkg/flow/componenttest"
"github.com/grafana/dskit/backoff"
"github.com/grafana/river"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestBadRiverConfigNegativeSamplingRate(t *testing.T) {
exampleBadRiverConfig := `
sampling_percentage = -1
output {
// no-op: will be overridden by test code.
}
`
var args Arguments
require.EqualError(t, river.Unmarshal([]byte(exampleBadRiverConfig), &args), "negative sampling rate: -1.00")
}

func TestBadRiverConfigInvalidAttributeSource(t *testing.T) {
exampleBadRiverConfig := `
sampling_percentage = 0.1
attribute_source = "example"
output {
// no-op: will be overridden by test code.
}
`
var args Arguments
require.EqualError(t, river.Unmarshal([]byte(exampleBadRiverConfig), &args), "invalid attribute source: example. Expected: traceID or record")
}

func TestLogProcessing(t *testing.T) {
exampleSmallConfig := `
sampling_percentage = 100
hash_seed = 123
output {
// no-op: will be overridden by test code.
}
`
ctx := componenttest.TestContext(t)
l := util.TestLogger(t)

ctrl, err := componenttest.NewControllerFromID(l, "otelcol.processor.probabilistic_sampler")
require.NoError(t, err)

var args Arguments
require.NoError(t, river.Unmarshal([]byte(exampleSmallConfig), &args))

// Override our arguments so logs get forwarded to logsCh.
logsCh := make(chan plog.Logs)
args.Output = makeLogsOutput(logsCh)

go func() {
err := ctrl.Run(ctx, args)
require.NoError(t, err)
}()

require.NoError(t, ctrl.WaitRunning(time.Second), "component never started")
require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything")

// Send traces in the background to our processor.
go func() {
exports := ctrl.Exports().(otelcol.ConsumerExports)

exports.Input.Capabilities()

bo := backoff.New(ctx, backoff.Config{
MinBackoff: 10 * time.Millisecond,
MaxBackoff: 100 * time.Millisecond,
})
for bo.Ongoing() {
err := exports.Input.ConsumeLogs(ctx, createTestLogs())
if err != nil {
level.Error(l).Log("msg", "failed to send logs", "err", err)
bo.Wait()
continue
}

return
}
}()

// Wait for our processor to finish and forward data to logCh.
select {
case <-time.After(time.Second * 10):
require.FailNow(t, "failed waiting for logs")
case tr := <-logsCh:
require.Equal(t, 1, tr.LogRecordCount())
}
}

func TestTraceProcessing(t *testing.T) {
exampleSmallConfig := `
sampling_percentage = 100
hash_seed = 123
output {
// no-op: will be overridden by test code.
}
`
ctx := componenttest.TestContext(t)
l := util.TestLogger(t)

ctrl, err := componenttest.NewControllerFromID(l, "otelcol.processor.probabilistic_sampler")
require.NoError(t, err)

var args Arguments
require.NoError(t, river.Unmarshal([]byte(exampleSmallConfig), &args))

// Override our arguments so traces get forwarded to traceCh.
traceCh := make(chan ptrace.Traces)
args.Output = makeTracesOutput(traceCh)

go func() {
err := ctrl.Run(ctx, args)
require.NoError(t, err)
}()

require.NoError(t, ctrl.WaitRunning(time.Second), "component never started")
require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything")

// Send traces in the background to our processor.
go func() {
exports := ctrl.Exports().(otelcol.ConsumerExports)

exports.Input.Capabilities()

bo := backoff.New(ctx, backoff.Config{
MinBackoff: 10 * time.Millisecond,
MaxBackoff: 100 * time.Millisecond,
})
for bo.Ongoing() {
err := exports.Input.ConsumeTraces(ctx, createTestTraces())
if err != nil {
level.Error(l).Log("msg", "failed to send traces", "err", err)
bo.Wait()
continue
}

return
}
}()

// Wait for our processor to finish and forward data to traceCh.
select {
case <-time.After(time.Second * 10):
require.FailNow(t, "failed waiting for traces")
case tr := <-traceCh:
require.Equal(t, 1, tr.SpanCount())
}
}

// makeTracesOutput returns ConsumerArguments which will forward traces to the
// provided channel.
func makeTracesOutput(ch chan ptrace.Traces) *otelcol.ConsumerArguments {
traceConsumer := fakeconsumer.Consumer{
ConsumeTracesFunc: func(ctx context.Context, t ptrace.Traces) error {
select {
case <-ctx.Done():
return ctx.Err()
case ch <- t:
return nil
}
},
}

return &otelcol.ConsumerArguments{
Traces: []otelcol.Consumer{&traceConsumer},
}
}

func createTestTraces() ptrace.Traces {
// Matches format from the protobuf definition:
// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto
var bb = `{
"resource_spans": [{
"scope_spans": [{
"spans": [{
"name": "TestSpan"
}]
}]
}]
}`

decoder := &ptrace.JSONUnmarshaler{}
data, err := decoder.UnmarshalTraces([]byte(bb))
if err != nil {
panic(err)
}
return data
}

// makeLogsOutput returns ConsumerArguments which will forward logs to the
// provided channel.
func makeLogsOutput(ch chan plog.Logs) *otelcol.ConsumerArguments {
logConsumer := fakeconsumer.Consumer{
ConsumeLogsFunc: func(ctx context.Context, t plog.Logs) error {
select {
case <-ctx.Done():
return ctx.Err()
case ch <- t:
return nil
}
},
}

return &otelcol.ConsumerArguments{
Logs: []otelcol.Consumer{&logConsumer},
}
}

func createTestLogs() plog.Logs {
// Matches format from the protobuf definition:
// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/logs/v1/logs.proto
var bb = `{
"resource_logs": [{
"scope_logs": [{
"log_records": [{
"attributes": [{
"key": "foo",
"value": {
"string_value": "bar"
}
}]
}]
}]
}]
}`

decoder := &plog.JSONUnmarshaler{}
data, err := decoder.UnmarshalLogs([]byte(bb))
if err != nil {
panic(err)
}
return data
}
Loading

0 comments on commit 86bf82d

Please sign in to comment.