Skip to content

Commit 3b32392

Browse files
committed
Add probabilistic sampler processor
1 parent 0cf05ba commit 3b32392

File tree

8 files changed

+541
-23
lines changed

8 files changed

+541
-23
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ Main (unreleased)
3131
- `discovery.serverset` discovers Serversets stored in Zookeeper. (@thampiotr)
3232
- `discovery.scaleway` discovers scrape targets from Scaleway virtual
3333
instances and bare-metal machines. (@rfratto)
34-
34+
- `otelcol.processor.probabilistic_sampler` samples logs and traces based on configuration options. (@mar4uk)
35+
3536
- Flow: allow the HTTP server to be configured with TLS in the config file
3637
using the new `http` config block. (@rfratto)
3738

component/all/all.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ import (
7575
_ "github.com/grafana/agent/component/otelcol/processor/batch" // Import otelcol.processor.batch
7676
_ "github.com/grafana/agent/component/otelcol/processor/discovery" // Import otelcol.processor.discovery
7777
_ "github.com/grafana/agent/component/otelcol/processor/memorylimiter" // Import otelcol.processor.memory_limiter
78+
_ "github.com/grafana/agent/component/otelcol/processor/probabilistic_sampler" // Import otelcol.processor.probabilistic_sampler
7879
_ "github.com/grafana/agent/component/otelcol/processor/span" // Import otelcol.processor.span
7980
_ "github.com/grafana/agent/component/otelcol/processor/tail_sampling" // Import otelcol.processor.tail_sampling
8081
_ "github.com/grafana/agent/component/otelcol/receiver/jaeger" // Import otelcol.receiver.jaeger
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Package probabilistic_sampler provides an otelcol.processor.probabilistic_sampler component.
2+
package probabilistic_sampler
3+
4+
import (
5+
"github.com/grafana/agent/component"
6+
"github.com/grafana/agent/component/otelcol"
7+
"github.com/grafana/agent/component/otelcol/processor"
8+
psp "github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor"
9+
otelcomponent "go.opentelemetry.io/collector/component"
10+
otelextension "go.opentelemetry.io/collector/extension"
11+
)
12+
13+
// defaultAttributeSource is the attribute_source value used by
// DefaultArguments when the user does not set one.
const defaultAttributeSource = "traceID"
16+
17+
func init() {
18+
component.Register(component.Registration{
19+
Name: "otelcol.processor.probabilistic_sampler",
20+
Args: Arguments{},
21+
Exports: otelcol.ConsumerExports{},
22+
23+
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
24+
fact := psp.NewFactory()
25+
return processor.New(opts, fact, args.(Arguments))
26+
},
27+
})
28+
}
29+
30+
// Arguments configures the otelcol.processor.probabilistic_sampler component.
31+
32+
type Arguments struct {
33+
SamplingPercentage float32 `river:"sampling_percentage,attr,optional"`
34+
HashSeed uint32 `river:"hash_seed,attr,optional"`
35+
AttributeSource string `river:"attribute_source,attr,optional"`
36+
FromAttribute string `river:"from_attribute,attr,optional"`
37+
SamplingPriority string `river:"sampling_priority,attr,optional"`
38+
39+
// Output configures where to send processed data. Required.
40+
Output *otelcol.ConsumerArguments `river:"output,block"`
41+
}
42+
43+
var (
44+
_ processor.Arguments = Arguments{}
45+
)
46+
47+
// DefaultArguments holds default settings for Arguments.
48+
var DefaultArguments = Arguments{
49+
AttributeSource: defaultAttributeSource,
50+
}
51+
52+
// SetToDefault implements river.Defaulter.
53+
func (args *Arguments) SetToDefault() {
54+
*args = DefaultArguments
55+
}
56+
57+
// Validate implements river.Validator.
58+
func (args *Arguments) Validate() error {
59+
cfg, err := args.Convert()
60+
if err != nil {
61+
return err
62+
}
63+
64+
return cfg.(*psp.Config).Validate()
65+
}
66+
67+
// Convert implements processor.Arguments.
68+
func (args Arguments) Convert() (otelcomponent.Config, error) {
69+
return &psp.Config{
70+
SamplingPercentage: args.SamplingPercentage,
71+
HashSeed: args.HashSeed,
72+
AttributeSource: psp.AttributeSource(args.AttributeSource),
73+
FromAttribute: args.FromAttribute,
74+
SamplingPriority: args.SamplingPriority,
75+
}, nil
76+
}
77+
78+
// Extensions implements processor.Arguments.
79+
func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension {
80+
return nil
81+
}
82+
83+
// Exporters implements processor.Arguments.
84+
func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component {
85+
return nil
86+
}
87+
88+
// NextConsumers implements processor.Arguments.
89+
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
90+
return args.Output
91+
}
Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
package probabilistic_sampler
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/go-kit/log/level"
9+
"github.com/grafana/agent/component/otelcol/internal/fakeconsumer"
10+
"github.com/grafana/agent/pkg/util"
11+
12+
"github.com/grafana/agent/component/otelcol"
13+
"github.com/grafana/agent/pkg/flow/componenttest"
14+
"github.com/grafana/dskit/backoff"
15+
"github.com/grafana/river"
16+
"github.com/stretchr/testify/require"
17+
"go.opentelemetry.io/collector/pdata/plog"
18+
"go.opentelemetry.io/collector/pdata/ptrace"
19+
)
20+
21+
func TestBadRiverConfigNegativeSamplingRate(t *testing.T) {
22+
exampleBadRiverConfig := `
23+
sampling_percentage = -1
24+
output {
25+
// no-op: will be overridden by test code.
26+
}
27+
`
28+
var args Arguments
29+
require.EqualError(t, river.Unmarshal([]byte(exampleBadRiverConfig), &args), "negative sampling rate: -1.00")
30+
}
31+
32+
func TestBadRiverConfigInvalidAttributeSource(t *testing.T) {
33+
exampleBadRiverConfig := `
34+
sampling_percentage = 0.1
35+
attribute_source = "example"
36+
output {
37+
// no-op: will be overridden by test code.
38+
}
39+
`
40+
var args Arguments
41+
require.EqualError(t, river.Unmarshal([]byte(exampleBadRiverConfig), &args), "invalid attribute source: example. Expected: traceID or record")
42+
}
43+
44+
func TestLogProcessing(t *testing.T) {
45+
exampleSmallConfig := `
46+
sampling_percentage = 100
47+
hash_seed = 123
48+
49+
output {
50+
// no-op: will be overridden by test code.
51+
}
52+
`
53+
ctx := componenttest.TestContext(t)
54+
l := util.TestLogger(t)
55+
56+
ctrl, err := componenttest.NewControllerFromID(l, "otelcol.processor.probabilistic_sampler")
57+
require.NoError(t, err)
58+
59+
var args Arguments
60+
require.NoError(t, river.Unmarshal([]byte(exampleSmallConfig), &args))
61+
62+
// Override our arguments so logs get forwarded to logsCh.
63+
logsCh := make(chan plog.Logs)
64+
args.Output = makeLogsOutput(logsCh)
65+
66+
go func() {
67+
err := ctrl.Run(ctx, args)
68+
require.NoError(t, err)
69+
}()
70+
71+
require.NoError(t, ctrl.WaitRunning(time.Second), "component never started")
72+
require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything")
73+
74+
// Send traces in the background to our processor.
75+
go func() {
76+
exports := ctrl.Exports().(otelcol.ConsumerExports)
77+
78+
exports.Input.Capabilities()
79+
80+
bo := backoff.New(ctx, backoff.Config{
81+
MinBackoff: 10 * time.Millisecond,
82+
MaxBackoff: 100 * time.Millisecond,
83+
})
84+
for bo.Ongoing() {
85+
err := exports.Input.ConsumeLogs(ctx, createTestLogs())
86+
if err != nil {
87+
level.Error(l).Log("msg", "failed to send logs", "err", err)
88+
bo.Wait()
89+
continue
90+
}
91+
92+
return
93+
}
94+
}()
95+
96+
// Wait for our processor to finish and forward data to logCh.
97+
select {
98+
case <-time.After(time.Second * 10):
99+
require.FailNow(t, "failed waiting for logs")
100+
case tr := <-logsCh:
101+
require.Equal(t, 1, tr.LogRecordCount())
102+
}
103+
}
104+
105+
func TestTraceProcessing(t *testing.T) {
106+
exampleSmallConfig := `
107+
sampling_percentage = 100
108+
hash_seed = 123
109+
110+
output {
111+
// no-op: will be overridden by test code.
112+
}
113+
`
114+
ctx := componenttest.TestContext(t)
115+
l := util.TestLogger(t)
116+
117+
ctrl, err := componenttest.NewControllerFromID(l, "otelcol.processor.probabilistic_sampler")
118+
require.NoError(t, err)
119+
120+
var args Arguments
121+
require.NoError(t, river.Unmarshal([]byte(exampleSmallConfig), &args))
122+
123+
// Override our arguments so traces get forwarded to traceCh.
124+
traceCh := make(chan ptrace.Traces)
125+
args.Output = makeTracesOutput(traceCh)
126+
127+
go func() {
128+
err := ctrl.Run(ctx, args)
129+
require.NoError(t, err)
130+
}()
131+
132+
require.NoError(t, ctrl.WaitRunning(time.Second), "component never started")
133+
require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything")
134+
135+
// Send traces in the background to our processor.
136+
go func() {
137+
exports := ctrl.Exports().(otelcol.ConsumerExports)
138+
139+
exports.Input.Capabilities()
140+
141+
bo := backoff.New(ctx, backoff.Config{
142+
MinBackoff: 10 * time.Millisecond,
143+
MaxBackoff: 100 * time.Millisecond,
144+
})
145+
for bo.Ongoing() {
146+
err := exports.Input.ConsumeTraces(ctx, createTestTraces())
147+
if err != nil {
148+
level.Error(l).Log("msg", "failed to send traces", "err", err)
149+
bo.Wait()
150+
continue
151+
}
152+
153+
return
154+
}
155+
}()
156+
157+
// Wait for our processor to finish and forward data to traceCh.
158+
select {
159+
case <-time.After(time.Second * 10):
160+
require.FailNow(t, "failed waiting for traces")
161+
case tr := <-traceCh:
162+
require.Equal(t, 1, tr.SpanCount())
163+
}
164+
}
165+
166+
// makeTracesOutput returns ConsumerArguments which will forward traces to the
167+
// provided channel.
168+
func makeTracesOutput(ch chan ptrace.Traces) *otelcol.ConsumerArguments {
169+
traceConsumer := fakeconsumer.Consumer{
170+
ConsumeTracesFunc: func(ctx context.Context, t ptrace.Traces) error {
171+
select {
172+
case <-ctx.Done():
173+
return ctx.Err()
174+
case ch <- t:
175+
return nil
176+
}
177+
},
178+
}
179+
180+
return &otelcol.ConsumerArguments{
181+
Traces: []otelcol.Consumer{&traceConsumer},
182+
}
183+
}
184+
185+
func createTestTraces() ptrace.Traces {
186+
// Matches format from the protobuf definition:
187+
// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto
188+
var bb = `{
189+
"resource_spans": [{
190+
"scope_spans": [{
191+
"spans": [{
192+
"name": "TestSpan"
193+
}]
194+
}]
195+
}]
196+
}`
197+
198+
decoder := &ptrace.JSONUnmarshaler{}
199+
data, err := decoder.UnmarshalTraces([]byte(bb))
200+
if err != nil {
201+
panic(err)
202+
}
203+
return data
204+
}
205+
206+
// makeLogsOutput returns ConsumerArguments which will forward logs to the
207+
// provided channel.
208+
func makeLogsOutput(ch chan plog.Logs) *otelcol.ConsumerArguments {
209+
logConsumer := fakeconsumer.Consumer{
210+
ConsumeLogsFunc: func(ctx context.Context, t plog.Logs) error {
211+
select {
212+
case <-ctx.Done():
213+
return ctx.Err()
214+
case ch <- t:
215+
return nil
216+
}
217+
},
218+
}
219+
220+
return &otelcol.ConsumerArguments{
221+
Logs: []otelcol.Consumer{&logConsumer},
222+
}
223+
}
224+
225+
func createTestLogs() plog.Logs {
226+
// Matches format from the protobuf definition:
227+
// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/logs/v1/logs.proto
228+
var bb = `{
229+
"resource_logs": [{
230+
"scope_logs": [{
231+
"log_records": [{
232+
"attributes": [{
233+
"key": "foo",
234+
"value": {
235+
"string_value": "bar"
236+
}
237+
}]
238+
}]
239+
}]
240+
}]
241+
}`
242+
243+
decoder := &plog.JSONUnmarshaler{}
244+
data, err := decoder.UnmarshalLogs([]byte(bb))
245+
if err != nil {
246+
panic(err)
247+
}
248+
return data
249+
}

component/otelcol/processor/tail_sampling/tail_sampling_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ func TestBadRiverConfigErrorMode(t *testing.T) {
6262
`
6363

6464
var args Arguments
65-
require.ErrorContains(t, river.Unmarshal([]byte(exampleBadRiverConfig), &args), "\"\" unknown error mode")
65+
err := river.Unmarshal([]byte(exampleBadRiverConfig), &args)
66+
require.ErrorContains(t, err, "\"\" unknown error mode")
6667
}
6768

6869
func TestBadOtelConfig(t *testing.T) {

0 commit comments

Comments
 (0)