Skip to content

Commit

Permalink
feat(otelcol/exp/prom): add support for resource_to_telemetry_conversion
Browse files Browse the repository at this point in the history
Signed-off-by: hainenber <[email protected]>
  • Loading branch information
hainenber committed Nov 18, 2023
1 parent 950870a commit 418724d
Show file tree
Hide file tree
Showing 6 changed files with 312 additions and 48 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ v0.38.0-rc.0 (2023-11-16)

- Updated windows exporter to use prometheus-community/windows_exporter commit 1836cd1. (@mattdurham)

- Added support to convert resource attributes to Prometheus-compatible datapoint attributes in `otelcol.exporter.prometheus`. (@hainenber)

### Bugfixes

- Set exit code 1 on grafana-agentctl non-runnable command. (@fgouteroux)
Expand Down
50 changes: 38 additions & 12 deletions component/otelcol/exporter/prometheus/internal/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type Options struct {
IncludeScopeLabels bool
// AddMetricSuffixes controls whether suffixes are added to metric names. Defaults to true.
AddMetricSuffixes bool
// ResourceToTelemetryConversion controls whether to convert resource attributes to Prometheus-compatible datapoint attributes
ResourceToTelemetryConversion bool
}

var _ consumer.Metrics = (*Converter)(nil)
Expand Down Expand Up @@ -131,6 +133,7 @@ func (conv *Converter) consumeResourceMetrics(app storage.Appender, rm pmetric.R
Type: textparse.MetricTypeGauge,
Help: "Target metadata",
})
resAttrs := rm.Resource().Attributes()
memResource := conv.getOrCreateResource(rm.Resource())

if conv.getOpts().IncludeTargetInfo {
Expand All @@ -144,7 +147,7 @@ func (conv *Converter) consumeResourceMetrics(app storage.Appender, rm pmetric.R

for smcount := 0; smcount < rm.ScopeMetrics().Len(); smcount++ {
sm := rm.ScopeMetrics().At(smcount)
conv.consumeScopeMetrics(app, memResource, sm)
conv.consumeScopeMetrics(app, memResource, sm, resAttrs)
}
}

Expand Down Expand Up @@ -219,7 +222,7 @@ func (conv *Converter) getOrCreateResource(res pcommon.Resource) *memorySeries {
return entry
}

func (conv *Converter) consumeScopeMetrics(app storage.Appender, memResource *memorySeries, sm pmetric.ScopeMetrics) {
func (conv *Converter) consumeScopeMetrics(app storage.Appender, memResource *memorySeries, sm pmetric.ScopeMetrics, resAttrs pcommon.Map) {
scopeMD := conv.createOrUpdateMetadata("otel_scope_info", metadata.Metadata{
Type: textparse.MetricTypeGauge,
})
Expand All @@ -236,7 +239,7 @@ func (conv *Converter) consumeScopeMetrics(app storage.Appender, memResource *me

for mcount := 0; mcount < sm.Metrics().Len(); mcount++ {
m := sm.Metrics().At(mcount)
conv.consumeMetric(app, memResource, memScope, m)
conv.consumeMetric(app, memResource, memScope, m, resAttrs)
}
}

Expand Down Expand Up @@ -274,20 +277,27 @@ func (conv *Converter) getOrCreateScope(res *memorySeries, scope pcommon.Instrum
return entry
}

func (conv *Converter) consumeMetric(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric) {
func (conv *Converter) consumeMetric(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric, resAttrs pcommon.Map) {
switch m.Type() {
case pmetric.MetricTypeGauge:
conv.consumeGauge(app, memResource, memScope, m)
conv.consumeGauge(app, memResource, memScope, m, resAttrs)
case pmetric.MetricTypeSum:
conv.consumeSum(app, memResource, memScope, m)
conv.consumeSum(app, memResource, memScope, m, resAttrs)
case pmetric.MetricTypeHistogram:
conv.consumeHistogram(app, memResource, memScope, m)
conv.consumeHistogram(app, memResource, memScope, m, resAttrs)
case pmetric.MetricTypeSummary:
conv.consumeSummary(app, memResource, memScope, m)
conv.consumeSummary(app, memResource, memScope, m, resAttrs)
}
}

func (conv *Converter) consumeGauge(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric) {
func joinAttributeMaps(from, to pcommon.Map) {
from.Range(func(k string, v pcommon.Value) bool {
v.CopyTo(to.PutEmpty(k))
return true
})
}

func (conv *Converter) consumeGauge(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric, resAttrs pcommon.Map) {
metricName := prometheus.BuildCompliantName(m, "", conv.opts.AddMetricSuffixes)

metricMD := conv.createOrUpdateMetadata(metricName, metadata.Metadata{
Expand All @@ -302,6 +312,10 @@ func (conv *Converter) consumeGauge(app storage.Appender, memResource *memorySer
for dpcount := 0; dpcount < m.Gauge().DataPoints().Len(); dpcount++ {
dp := m.Gauge().DataPoints().At(dpcount)

if conv.getOpts().ResourceToTelemetryConversion {
joinAttributeMaps(resAttrs, dp.Attributes())
}

memSeries := conv.getOrCreateSeries(memResource, memScope, metricName, dp.Attributes())
if err := writeSeries(app, memSeries, dp, getNumberDataPointValue(dp)); err != nil {
level.Error(conv.log).Log("msg", "failed to write metric sample", metricName, "err", err)
Expand Down Expand Up @@ -389,7 +403,7 @@ func getNumberDataPointValue(dp pmetric.NumberDataPoint) float64 {
return 0
}

func (conv *Converter) consumeSum(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric) {
func (conv *Converter) consumeSum(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric, resAttrs pcommon.Map) {
metricName := prometheus.BuildCompliantName(m, "", conv.opts.AddMetricSuffixes)

// Excerpt from the spec:
Expand Down Expand Up @@ -430,6 +444,10 @@ func (conv *Converter) consumeSum(app storage.Appender, memResource *memorySerie
for dpcount := 0; dpcount < m.Sum().DataPoints().Len(); dpcount++ {
dp := m.Sum().DataPoints().At(dpcount)

if conv.getOpts().ResourceToTelemetryConversion {
joinAttributeMaps(resAttrs, dp.Attributes())
}

memSeries := conv.getOrCreateSeries(memResource, memScope, metricName, dp.Attributes())

val := getNumberDataPointValue(dp)
Expand All @@ -447,7 +465,7 @@ func (conv *Converter) consumeSum(app storage.Appender, memResource *memorySerie
}
}

func (conv *Converter) consumeHistogram(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric) {
func (conv *Converter) consumeHistogram(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric, resAttrs pcommon.Map) {
metricName := prometheus.BuildCompliantName(m, "", conv.opts.AddMetricSuffixes)

if m.Histogram().AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
Expand All @@ -469,6 +487,10 @@ func (conv *Converter) consumeHistogram(app storage.Appender, memResource *memor
for dpcount := 0; dpcount < m.Histogram().DataPoints().Len(); dpcount++ {
dp := m.Histogram().DataPoints().At(dpcount)

if conv.getOpts().ResourceToTelemetryConversion {
joinAttributeMaps(resAttrs, dp.Attributes())
}

// Sum metric
if dp.HasSum() {
sumMetric := conv.getOrCreateSeries(memResource, memScope, metricName+"_sum", dp.Attributes())
Expand Down Expand Up @@ -606,7 +628,7 @@ func (conv *Converter) convertExemplar(otelExemplar pmetric.Exemplar, ts time.Ti
}
}

func (conv *Converter) consumeSummary(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric) {
func (conv *Converter) consumeSummary(app storage.Appender, memResource *memorySeries, memScope *memorySeries, m pmetric.Metric, resAttrs pcommon.Map) {
metricName := prometheus.BuildCompliantName(m, "", conv.opts.AddMetricSuffixes)

metricMD := conv.createOrUpdateMetadata(metricName, metadata.Metadata{
Expand All @@ -621,6 +643,10 @@ func (conv *Converter) consumeSummary(app storage.Appender, memResource *memoryS
for dpcount := 0; dpcount < m.Summary().DataPoints().Len(); dpcount++ {
dp := m.Summary().DataPoints().At(dpcount)

if conv.getOpts().ResourceToTelemetryConversion {
joinAttributeMaps(resAttrs, dp.Attributes())
}

// Sum metric
{
sumMetric := conv.getOrCreateSeries(memResource, memScope, metricName+"_sum", dp.Attributes())
Expand Down
Loading

0 comments on commit 418724d

Please sign in to comment.