Skip to content

Commit

Permalink
otelcolconvert: support converting filterprocessor (#6479)
Browse files Browse the repository at this point in the history
Signed-off-by: Paschalis Tsilias <[email protected]>
  • Loading branch information
tpaschalis authored Feb 22, 2024
1 parent c00b19c commit c5bfe59
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 9 deletions.
6 changes: 3 additions & 3 deletions component/otelcol/processor/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func init() {
type Arguments struct {
// ErrorMode determines how the processor reacts to errors that occur while processing a statement.
ErrorMode ottl.ErrorMode `river:"error_mode,attr,optional"`
Traces traceConfig `river:"traces,block,optional"`
Metrics metricConfig `river:"metrics,block,optional"`
Logs logConfig `river:"logs,block,optional"`
Traces TraceConfig `river:"traces,block,optional"`
Metrics MetricConfig `river:"metrics,block,optional"`
Logs LogConfig `river:"logs,block,optional"`

// Output configures where to send processed data. Required.
Output *otelcol.ConsumerArguments `river:"output,block"`
Expand Down
12 changes: 6 additions & 6 deletions component/otelcol/processor/filter/types.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package filter

type traceConfig struct {
type TraceConfig struct {
Span []string `river:"span,attr,optional"`
SpanEvent []string `river:"spanevent,attr,optional"`
}

type metricConfig struct {
type MetricConfig struct {
Metric []string `river:"metric,attr,optional"`
Datapoint []string `river:"datapoint,attr,optional"`
}
type logConfig struct {
type LogConfig struct {
LogRecord []string `river:"log_record,attr,optional"`
}

func (args *traceConfig) convert() map[string]interface{} {
func (args *TraceConfig) convert() map[string]interface{} {
if args == nil {
return nil
}
Expand All @@ -29,7 +29,7 @@ func (args *traceConfig) convert() map[string]interface{} {
return result
}

func (args *metricConfig) convert() map[string]interface{} {
func (args *MetricConfig) convert() map[string]interface{} {
if args == nil {
return nil
}
Expand All @@ -45,7 +45,7 @@ func (args *metricConfig) convert() map[string]interface{} {
return result
}

func (args *logConfig) convert() map[string]interface{} {
func (args *LogConfig) convert() map[string]interface{} {
if args == nil {
return nil
}
Expand Down
71 changes: 71 additions & 0 deletions converter/internal/otelcolconvert/converter_filterprocessor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package otelcolconvert

import (
"fmt"

"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/processor/filter"
"github.com/grafana/agent/converter/diag"
"github.com/grafana/agent/converter/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor"
"go.opentelemetry.io/collector/component"
)

func init() {
converters = append(converters, filterProcessorConverter{})
}

type filterProcessorConverter struct{}

func (filterProcessorConverter) Factory() component.Factory {
return filterprocessor.NewFactory()
}

func (filterProcessorConverter) InputComponentName() string {
return "otelcol.processor.filter"
}

func (filterProcessorConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.FlowComponentLabel()

args := toFilterProcessor(state, id, cfg.(*filterprocessor.Config))
block := common.NewBlockWithOverride([]string{"otelcol", "processor", "filter"}, label, args)

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block)),
)

state.Body().AppendBlock(block)
return diags
}

func toFilterProcessor(state *state, id component.InstanceID, cfg *filterprocessor.Config) *filter.Arguments {
var (
nextMetrics = state.Next(id, component.DataTypeMetrics)
nextLogs = state.Next(id, component.DataTypeLogs)
nextTraces = state.Next(id, component.DataTypeTraces)
)

return &filter.Arguments{
ErrorMode: cfg.ErrorMode,
Traces: filter.TraceConfig{
Span: cfg.Traces.SpanConditions,
SpanEvent: cfg.Traces.SpanEventConditions,
},
Metrics: filter.MetricConfig{
Metric: cfg.Metrics.MetricConditions,
Datapoint: cfg.Metrics.DataPointConditions,
},
Logs: filter.LogConfig{
LogRecord: cfg.Logs.LogConditions,
},
Output: &otelcol.ConsumerArguments{
Metrics: toTokenizedConsumers(nextMetrics),
Logs: toTokenizedConsumers(nextLogs),
Traces: toTokenizedConsumers(nextTraces),
},
}
}
41 changes: 41 additions & 0 deletions converter/internal/otelcolconvert/testdata/filter.river
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
otelcol.receiver.otlp "default" {
grpc { }

http { }

output {
metrics = [otelcol.processor.filter.default_ottl.input]
logs = [otelcol.processor.filter.default_ottl.input]
traces = [otelcol.processor.filter.default_ottl.input]
}
}

otelcol.processor.filter "default_ottl" {
error_mode = "ignore"

traces {
span = ["attributes[\"container.name\"] == \"app_container_1\"", "resource.attributes[\"host.name\"] == \"localhost\"", "name == \"app_3\""]
spanevent = ["attributes[\"grpc\"] == true", "IsMatch(name, \".*grpc.*\")"]
}

metrics {
metric = ["name == \"my.metric\" and resource.attributes[\"my_label\"] == \"abc123\"", "type == METRIC_DATA_TYPE_HISTOGRAM"]
datapoint = ["metric.type == METRIC_DATA_TYPE_SUMMARY", "resource.attributes[\"service.name\"] == \"my_service_name\""]
}

logs {
log_record = ["IsMatch(body, \".*password.*\")", "severity_number < SEVERITY_NUMBER_WARN"]
}

output {
metrics = [otelcol.exporter.otlp.default.input]
logs = [otelcol.exporter.otlp.default.input]
traces = [otelcol.exporter.otlp.default.input]
}
}

otelcol.exporter.otlp "default" {
client {
endpoint = "database:4317"
}
}
53 changes: 53 additions & 0 deletions converter/internal/otelcolconvert/testdata/filter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
receivers:
otlp:
protocols:
grpc:
http:

exporters:
otlp:
# Our defaults have drifted from upstream, so we explicitly set our
# defaults below (balancer_name and queue_size).
endpoint: database:4317
balancer_name: pick_first
sending_queue:
queue_size: 5000

processors:
filter/ottl:
error_mode: ignore
metrics:
metric:
- 'name == "my.metric" and resource.attributes["my_label"] == "abc123"'
- 'type == METRIC_DATA_TYPE_HISTOGRAM'
datapoint:
- 'metric.type == METRIC_DATA_TYPE_SUMMARY'
- 'resource.attributes["service.name"] == "my_service_name"'
logs:
log_record:
- 'IsMatch(body, ".*password.*")'
- 'severity_number < SEVERITY_NUMBER_WARN'
traces:
span:
- 'attributes["container.name"] == "app_container_1"'
- 'resource.attributes["host.name"] == "localhost"'
- 'name == "app_3"'
spanevent:
- 'attributes["grpc"] == true'
- 'IsMatch(name, ".*grpc.*")'

service:
pipelines:
metrics:
receivers: [otlp]
processors: [filter/ottl]
exporters: [otlp]
logs:
receivers: [otlp]
processors: [filter/ottl]
exporters: [otlp]
traces:
receivers: [otlp]
processors: [filter/ottl]
exporters: [otlp]

0 comments on commit c5bfe59

Please sign in to comment.