Skip to content

Commit ccfe7cb

Browse files
authored
Make elasticinframetricsprocessor idempotent (#500)
elasticinframetricsprocessor now sets "otel_remapped" as a scope attribute to prevent duplicate metrics if the processor is used twice in a pipeline.
1 parent a2d6a6e commit ccfe7cb

File tree

4 files changed

+117
-42
lines changed

4 files changed

+117
-42
lines changed

processor/elasticinframetricsprocessor/processor.go

Lines changed: 73 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ package elasticinframetricsprocessor // import "github.com/elastic/opentelemetry
1919

2020
import (
2121
"context"
22+
"iter"
2223

24+
"github.com/elastic/opentelemetry-collector-components/processor/elasticinframetricsprocessor/internal/metadata"
2325
"github.com/elastic/opentelemetry-lib/remappers/common"
2426
"github.com/elastic/opentelemetry-lib/remappers/hostmetrics"
2527
"github.com/elastic/opentelemetry-lib/remappers/kubernetesmetrics"
@@ -34,10 +36,14 @@ const OTelRemappedLabel = common.OTelRemappedLabel
3436
// remapper interface defines the Remap method that should be implemented by different remappers
3537
type remapper interface {
3638
Remap(pmetric.ScopeMetrics, pmetric.MetricSlice, pcommon.Resource)
39+
40+
// Valid returns true if the remapper should be applied to the given scope metrics.
41+
Valid(pmetric.ScopeMetrics) bool
3742
}
3843

3944
type ElasticinframetricsProcessor struct {
4045
cfg *Config
46+
set processor.Settings
4147
logger *zap.Logger
4248
remappers []remapper
4349
}
@@ -52,65 +58,91 @@ func newProcessor(set processor.Settings, cfg *Config) *ElasticinframetricsProce
5258
}
5359
return &ElasticinframetricsProcessor{
5460
cfg: cfg,
61+
set: set,
5562
logger: set.Logger,
5663
remappers: remappers,
5764
}
5865
}
5966

6067
// processMetrics processes the given metrics and applies remappers if configured.
6168
func (p *ElasticinframetricsProcessor) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
62-
for resIndex := range md.ResourceMetrics().Len() {
63-
resourceMetric := md.ResourceMetrics().At(resIndex)
64-
rm := resourceMetric.Resource()
65-
for scopeIndex := range resourceMetric.ScopeMetrics().Len() {
66-
scopeMetric := resourceMetric.ScopeMetrics().At(scopeIndex)
69+
for _, resourceMetrics := range md.ResourceMetrics().All() {
70+
for _, scopeMetrics := range resourceMetrics.ScopeMetrics().All() {
6771
for _, r := range p.remappers {
68-
r.Remap(scopeMetric, scopeMetric.Metrics(), rm)
72+
if !r.Valid(scopeMetrics) {
73+
continue
74+
}
75+
p.remapScopeMetrics(r, scopeMetrics, resourceMetrics.Resource())
76+
break // At most one remapper should be applied
6977
}
7078
}
7179
}
72-
// drop_original=true will keep only the metrics that have been remapped based on the presense of OTelRemappedLabel label.
73-
// See https://github.com/elastic/opentelemetry-lib/blob/6d89cbad4221429570107eb4a4968cf8a2ff919f/remappers/common/const.go#L31
74-
if p.cfg.DropOriginal {
75-
newMetic := pmetric.NewMetrics()
76-
for resIndex := range md.ResourceMetrics().Len() {
77-
resourceMetric := md.ResourceMetrics().At(resIndex)
78-
rmNew := newMetic.ResourceMetrics().AppendEmpty()
80+
return md, nil
81+
}
7982

80-
// We need to copy Resource().Attributes() because those inlcude additional attributes of the metrics
81-
resourceMetric.Resource().Attributes().CopyTo(rmNew.Resource().Attributes())
82-
for scopeIndex := range resourceMetric.ScopeMetrics().Len() {
83-
scopeMetric := resourceMetric.ScopeMetrics().At(scopeIndex)
84-
rmScope := rmNew.ScopeMetrics().AppendEmpty()
83+
func (p *ElasticinframetricsProcessor) remapScopeMetrics(
84+
r remapper,
85+
scopeMetrics pmetric.ScopeMetrics,
86+
resource pcommon.Resource,
87+
) {
88+
scope := scopeMetrics.Scope()
89+
if _, ok := scope.Attributes().Get(common.OTelRemappedLabel); ok {
90+
// These metrics have already been remapped.
91+
return
92+
}
93+
scope.Attributes().PutBool(common.OTelRemappedLabel, true)
8594

86-
// Iterate over the metrics
87-
for metricIndex := range scopeMetric.Metrics().Len() {
88-
metric := scopeMetric.Metrics().At(metricIndex)
95+
// Also check if there are any remapped metrics by iterating over the
96+
// metrics, to handle metrics from older versions of the processor that
97+
// do not set scope attributes.
98+
for range remappedMetrics(scopeMetrics.Metrics()) {
99+
// Found remapped metrics.
100+
return
101+
}
89102

90-
if metric.Type() == pmetric.MetricTypeGauge {
91-
for dataPointIndex := range metric.Gauge().DataPoints().Len() {
92-
if oTelRemappedLabel, ok := metric.Gauge().DataPoints().At(dataPointIndex).Attributes().Get(OTelRemappedLabel); ok {
93-
if oTelRemappedLabel.Bool() {
94-
metric.CopyTo(rmScope.Metrics().AppendEmpty())
95-
}
96-
}
97-
}
98-
} else if metric.Type() == pmetric.MetricTypeSum {
99-
for dataPointIndex := range metric.Sum().DataPoints().Len() {
100-
if oTelRemappedLabel, ok := metric.Sum().DataPoints().At(dataPointIndex).Attributes().Get(OTelRemappedLabel); ok {
101-
if oTelRemappedLabel.Bool() {
102-
resourceMetric.Resource().Attributes().CopyTo(rmNew.Resource().Attributes())
103-
metric.CopyTo(rmScope.Metrics().AppendEmpty())
104-
}
105-
}
106-
}
107-
}
103+
result := scopeMetrics.Metrics()
104+
if p.cfg.DropOriginal {
105+
result = pmetric.NewMetricSlice()
106+
}
107+
r.Remap(scopeMetrics, result, resource)
108+
if p.cfg.DropOriginal {
109+
// This overrides the existing metrics with just the remapped ones.
110+
//
111+
// When dropping the original metrics we update the scope name to
112+
// the processor's scope name, since original scope name is no longer
113+
// relevant.
114+
result.CopyTo(scopeMetrics.Metrics())
115+
scope.SetName(metadata.ScopeName)
116+
scope.SetVersion(p.set.BuildInfo.Version)
117+
}
118+
}
108119

120+
func remappedMetrics(ms pmetric.MetricSlice) iter.Seq[pmetric.Metric] {
121+
return func(yield func(pmetric.Metric) bool) {
122+
for _, metric := range ms.All() {
123+
if isRemappedMetric(metric) {
124+
if !yield(metric) {
125+
return
109126
}
110127
}
111128
}
112-
return newMetic, nil
113129
}
130+
}
114131

115-
return md, nil
132+
func isRemappedMetric(metric pmetric.Metric) bool {
133+
switch metric.Type() {
134+
case pmetric.MetricTypeGauge:
135+
for _, dp := range metric.Gauge().DataPoints().All() {
136+
if attr, ok := dp.Attributes().Get(OTelRemappedLabel); ok && attr.Bool() {
137+
return true
138+
}
139+
}
140+
case pmetric.MetricTypeSum:
141+
for _, dp := range metric.Sum().DataPoints().All() {
142+
if attr, ok := dp.Attributes().Get(OTelRemappedLabel); ok && attr.Bool() {
143+
return true
144+
}
145+
}
146+
}
147+
return false
116148
}

processor/elasticinframetricsprocessor/processor_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@ import (
2222
"path/filepath"
2323
"testing"
2424

25+
"github.com/elastic/opentelemetry-collector-components/processor/elasticinframetricsprocessor/internal/metadata"
2526
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
2627
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
2728
"github.com/stretchr/testify/assert"
2829
"github.com/stretchr/testify/require"
2930
"go.opentelemetry.io/collector/component"
3031
"go.opentelemetry.io/collector/pdata/pmetric"
3132
"go.opentelemetry.io/collector/processor"
33+
"go.opentelemetry.io/collector/processor/processortest"
3234
"go.uber.org/zap"
3335
)
3436

@@ -159,6 +161,36 @@ func TestProcessMetrics(t *testing.T) {
159161
}
160162
}
161163

164+
func TestProcessMetricsIdempotent(t *testing.T) {
165+
md, err := golden.ReadMetrics("testdata/k8smetrics/input-metrics.yaml")
166+
require.NoError(t, err)
167+
168+
config := &Config{AddK8sMetrics: true}
169+
p := newProcessor(processortest.NewNopSettings(metadata.Type), config)
170+
171+
_, err = p.processMetrics(context.Background(), md)
172+
require.NoError(t, err)
173+
174+
// Check that the processor did its job by looking for one remapped metric.
175+
// TestProcessorK8sMetrics is more comprehensive.
176+
var found bool
177+
for _, metric := range md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().All() {
178+
if metric.Name() == "kubernetes.pod.cpu.usage.limit.pct" {
179+
found = true
180+
break
181+
}
182+
}
183+
require.True(t, found, "remapped metric not found")
184+
185+
// Copy the metrics and process them again.
186+
mdCopy := pmetric.NewMetrics()
187+
md.CopyTo(mdCopy)
188+
189+
_, err = p.processMetrics(context.Background(), md)
190+
require.NoError(t, err)
191+
assert.NoError(t, pmetrictest.CompareMetrics(mdCopy, md))
192+
}
193+
162194
func TestRemappers(t *testing.T) {
163195
testCases := []struct {
164196
name string

processor/elasticinframetricsprocessor/testdata/k8smetrics/output-metrics.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,5 +122,9 @@ resourceMetrics:
122122
timeUnixNano: "1000000"
123123
schemaUrl: https://test-scope-schema.com/schema
124124
scope:
125+
attributes:
126+
- key: otel_remapped
127+
value:
128+
boolValue: true
125129
name: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver
126130
version: 1.2.3

processor/elasticinframetricsprocessor/testdata/k8smetrics_drop_original/output-metrics.yaml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ resourceMetrics:
44
- key: k8s.pod.name
55
value:
66
stringValue: test-pod
7+
schemaUrl: https://test-res-schema.com/schema
78
scopeMetrics:
89
- metrics:
910
- gauge:
@@ -114,4 +115,10 @@ resourceMetrics:
114115
value:
115116
stringValue: kubernetes
116117
timeUnixNano: "1000000"
117-
scope: {}
118+
schemaUrl: https://test-scope-schema.com/schema
119+
scope:
120+
attributes:
121+
- key: otel_remapped
122+
value:
123+
boolValue: true
124+
name: github.com/elastic/opentelemetry-collector-components/processor/elasticinframetricsprocessor

0 commit comments

Comments
 (0)