Skip to content

Commit

Permalink
Distributor: Enable automatically generated OTel metric name suffixes (#6542)
Browse files Browse the repository at this point in the history

* Upgrade github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus to v0.88
* Distributor: Add parameter controlling enabling of OTel metric suffixes

---------

Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 authored Nov 22, 2023
1 parent d857863 commit c79e939
Show file tree
Hide file tree
Showing 16 changed files with 162 additions and 66 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
* [FEATURE] Added `-<prefix>.s3.part-size` flag to configure the S3 minimum file size in bytes used for multipart uploads. #6592
* [FEATURE] Add the experimental `-<prefix>.s3.send-content-md5` flag (defaults to `false`) to configure S3 Put Object requests to send a `Content-MD5` header. Setting this flag is not recommended unless your object storage does not support checksums. #6622
* [FEATURE] Distributor: add an experimental flag `-distributor.reusable-ingester-push-worker` that can be used to pre-allocate a pool of workers to be used to send push requests to the ingesters. #6660
* [FEATURE] Distributor: Support enabling of automatically generated name suffixes for metrics ingested via OTLP, through the flag `-distributor.otel-metric-suffixes-enabled`. #6542
* [ENHANCEMENT] Ingester: exported summary `cortex_ingester_inflight_push_requests_summary` tracking total number of inflight requests in percentile buckets. #5845
* [ENHANCEMENT] Query-scheduler: add `cortex_query_scheduler_enqueue_duration_seconds` metric that records the time taken to enqueue or reject a query request. #5879
* [ENHANCEMENT] Query-frontend: add `cortex_query_frontend_enqueue_duration_seconds` metric that records the time taken to enqueue or reject a query request. When query-scheduler is in use, the metric has the `scheduler_address` label to differentiate the enqueue duration by query-scheduler backend. #5879 #6087 #6120
Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -3938,6 +3938,17 @@
"fieldDefaultValue": 0,
"fieldFlag": "alertmanager.max-alerts-size-bytes",
"fieldType": "int"
},
{
"kind": "field",
"name": "otel_metric_suffixes_enabled",
"required": false,
"desc": "Whether to enable automatic suffixes to names of metrics ingested through OTLP.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "distributor.otel-metric-suffixes-enabled",
"fieldType": "boolean",
"fieldCategory": "advanced"
}
],
"fieldValue": null,
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,8 @@ Usage of ./cmd/mimir/mimir:
[experimental] Use experimental method of limiting push requests.
-distributor.max-recv-msg-size int
Max message size in bytes that the distributors will accept for incoming push requests to the remote write API. If exceeded, the request will be rejected. (default 104857600)
-distributor.otel-metric-suffixes-enabled
Whether to enable automatic suffixes to names of metrics ingested through OTLP.
-distributor.remote-timeout duration
Timeout for downstream ingesters. (default 2s)
-distributor.request-burst-size int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ exporters:
otlphttp:
endpoint: http://distributor-1:8000/otlp


processors:
batch: {}
receivers:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ std.manifestYamlDoc({

otel_collector:: {
otel_collector: {
image: 'otel/opentelemetry-collector-contrib:0.54.0',
image: 'otel/opentelemetry-collector-contrib:0.88.0',
command: ['--config=/etc/otel-collector/otel-collector.yaml'],
volumes: ['./config:/etc/otel-collector'],
ports: ['8083:8083'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3339,6 +3339,11 @@ The `limits` block configures default and per-tenant limits imposed by component
# alerts will fail with a log message and metric increment. 0 = no limit.
# CLI flag: -alertmanager.max-alerts-size-bytes
[alertmanager_max_alerts_size_bytes: <int> | default = 0]
# (advanced) Whether to enable automatic suffixes to names of metrics ingested
# through OTLP.
# CLI flag: -distributor.otel-metric-suffixes-enabled
[otel_metric_suffixes_enabled: <boolean> | default = false]
```

### blocks_storage
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ require (
github.com/hashicorp/vault/api v1.10.0
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.84.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.88.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.84.0
github.com/prometheus/procfs v0.12.0
github.com/thanos-io/objstore v0.0.0-20231112185854-37752ee64d98
Expand Down Expand Up @@ -226,7 +226,7 @@ require (
go.etcd.io/etcd/client/v3 v3.5.4 // indirect
go.mongodb.org/mongo-driver v1.12.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014 // indirect
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0017 // indirect
go.opentelemetry.io/collector/semconv v0.88.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -793,12 +793,12 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/gomega v1.24.0 h1:+0glovB9Jd6z3VR+ScSwQqXVTIfJcGA9UBM8yzQxhqg=
github.com/onsi/gomega v1.24.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2vQAg=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.84.0 h1:Z6mmJnj4t47FLifxesZsuGN9mZRCT6pUY5Y5g8SWJkw=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.84.0/go.mod h1:6XJ+dV/QjwFJf55/2nnVfVqA+qwa8Y/iYR6taPJN7n0=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.88.0 h1:ornGkT2YBY/8W4kcVnErFehd6NUHqUW8g36DG7+3tCQ=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.88.0/go.mod h1:l2gdVngRvmSczRunw8WWun/mmkUkLuDSua2cWzalqrM=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.84.0 h1:EMdLjuWUpbkbYQ/lEg+qZk3lGEG2iPiU+t6g7g/J/5s=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.84.0/go.mod h1:sEq5Vn7ZiuT79tcx/CxDFyhrx1f2jV7kMQhOpdVTAa0=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.84.0 h1:uYYn5SIejB+AZ7D1U/uqArifD4HfDBHCz7EkFNmVYn0=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.84.0/go.mod h1:tfmxciJw1uQ7eeaDwqCp9RMUeWEQfIs74mVj6FW+FYQ=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.88.0 h1:RtlhAoPl9sAWMZza7QfzsjdRRr5cVjwSnxlxbeJykic=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.88.0/go.mod h1:CtmR5BAaQTW9j2q97lZqIGv/KapnoeqTa/ppHNd8+yY=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.84.0 h1:EuRz/++XprYiwsmIYcbn/DLrrCySdmObPPUURLcQzcE=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.84.0/go.mod h1:fwGawLbxB1Hdmh9Vy3WEE6AJLAPCVBmM8RRsXW79JHw=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
Expand Down Expand Up @@ -995,8 +995,8 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014 h1:C9o0mbP0MyygqFnKueVQK/v9jef6zvuttmTGlKaqhgw=
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014/go.mod h1:0mE3mDLmUrOXVoNsuvj+7dV14h/9HFl/Fy9YTLoLObo=
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0017 h1:DtJQalPXMWQqT6jd2LZ1oKrOfLJJRCi+rh2LKnkj4Zo=
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0017/go.mod h1:fLmJMf1AoHttkF8p5oJAc4o5ZpHu8yO5XYJ7gbLCLzo=
go.opentelemetry.io/collector/pdata v1.0.0-rcv0018 h1:a2IHOZKphRzPagcvOHQHHUE0DlITFSKlIBwaWhPZpl4=
go.opentelemetry.io/collector/pdata v1.0.0-rcv0018/go.mod h1:oNIcTRyEJYIfMcRYyyh5lquDU0Vl+ktTL6ka+p+dYvg=
go.opentelemetry.io/collector/semconv v0.88.0 h1:8TVP4hYaUC87S6CCLKNoSxsUE0ChldE4vqotvNHHUnE=
Expand Down
55 changes: 41 additions & 14 deletions integration/otlp_ingestion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package integration

import (
"fmt"
"io"
"testing"
"time"
Expand All @@ -21,6 +22,18 @@ import (
)

// TestOTLPIngestion runs the OTLP ingestion scenario once with OTel metric
// name suffixes enabled and once with them disabled, each as its own subtest.
func TestOTLPIngestion(t *testing.T) {
	scenarios := []struct {
		name           string
		enableSuffixes bool
	}{
		{name: "enabling OTel suffixes", enableSuffixes: true},
		{name: "disabling OTel suffixes", enableSuffixes: false},
	}

	for _, sc := range scenarios {
		sc := sc // per-iteration copy for the closure below (needed before Go 1.22)
		t.Run(sc.name, func(t *testing.T) {
			testOTLPIngestion(t, sc.enableSuffixes)
		})
	}
}

func testOTLPIngestion(t *testing.T, enableSuffixes bool) {
t.Helper()

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()
Expand All @@ -34,12 +47,17 @@ func TestOTLPIngestion(t *testing.T) {

// Start Mimir in single binary mode, reading the config from file and overwriting
// the backend config to make it work with Minio.
enable := "false"
if enableSuffixes {
enable = "true"
}
flags := mergeFlags(
DefaultSingleBinaryFlags(),
BlocksStorageFlags(),
BlocksStorageS3Flags(),
map[string]string{
"-distributor.enable-otlp-metadata-storage": "true",
"-distributor.otel-metric-suffixes-enabled": enable,
},
)

Expand All @@ -49,13 +67,22 @@ func TestOTLPIngestion(t *testing.T) {
c, err := e2emimir.NewClient(mimir.HTTPEndpoint(), mimir.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

sfx := ""
if enableSuffixes {
sfx = "_bytes"
}

// Push some series to Mimir.
now := time.Now()
series, expectedVector, expectedMatrix := generateFloatSeries("series_1", now, prompb.Label{Name: "foo", Value: "bar"})
// Adjust the expected metric name to account for the suffix (empty when suffixes are disabled).
for _, s := range expectedVector {
s.Metric[model.LabelName("__name__")] = model.LabelValue(fmt.Sprintf("series_1%s", sfx))
}
metadata := []mimirpb.MetricMetadata{
{
Help: "foo",
Unit: "foo",
Unit: "By",
},
}

Expand All @@ -64,7 +91,7 @@ func TestOTLPIngestion(t *testing.T) {
require.Equal(t, 200, res.StatusCode)

// Query the series.
result, err := c.Query("series_1", now)
result, err := c.Query(fmt.Sprintf("series_1%s", sfx), now)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))
Expand All @@ -77,7 +104,7 @@ func TestOTLPIngestion(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []string{"__name__", "foo"}, labelNames)

rangeResult, err := c.QueryRange("series_1", now.Add(-15*time.Minute), now, 15*time.Second)
rangeResult, err := c.QueryRange(fmt.Sprintf("series_1%s", sfx), now.Add(-15*time.Minute), now, 15*time.Second)
require.NoError(t, err)
require.Equal(t, model.ValMatrix, rangeResult.Type())
require.Equal(t, expectedMatrix, rangeResult.(model.Matrix))
Expand All @@ -90,20 +117,20 @@ func TestOTLPIngestion(t *testing.T) {
metadataResponseBody, err := io.ReadAll(metadataResult.Body)
require.NoError(t, err)

expectedJSON := `
expectedJSON := fmt.Sprintf(`
{
"status":"success",
"data":{
"series_1":[
"series_1%s":[
{
"type":"gauge",
"help":"foo",
"unit":"foo"
"unit":"By"
}
]
}
}
`
`, sfx)

require.JSONEq(t, expectedJSON, string(metadataResponseBody))

Expand All @@ -113,7 +140,7 @@ func TestOTLPIngestion(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

result, err = c.Query("series", now)
result, err = c.Query(fmt.Sprintf("series%s", sfx), now)
require.NoError(t, err)

want := expectedVector[0]
Expand All @@ -124,27 +151,27 @@ func TestOTLPIngestion(t *testing.T) {
// till https://github.com/open-telemetry/opentelemetry-proto/pull/441 is released. That is only
// to test setup logic

expectedJSON = `
expectedJSON = fmt.Sprintf(`
{
"status":"success",
"data":{
"series":[
"series%s":[
{
"type":"histogram",
"help":"foo",
"unit":"foo"
"unit":"By"
}
],
"series_1":[
"series_1%s":[
{
"type":"gauge",
"help":"foo",
"unit":"foo"
"unit":"By"
}
]
}
}
`
`, sfx, sfx)

metadataResult, err = c.GetPrometheusMetadata()
require.NoError(t, err)
Expand Down
33 changes: 17 additions & 16 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func OTLPHandler(
) http.Handler {
discardedDueToOtelParseError := validation.DiscardedSamplesCounter(reg, otelParseError)

return handler(maxRecvMsgSize, sourceIPs, allowSkipLabelNameValidation, limits, retryCfg, push, logger, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, dst []byte, req *mimirpb.PreallocWriteRequest, logger log.Logger) ([]byte, error) {
return handler(maxRecvMsgSize, sourceIPs, allowSkipLabelNameValidation, limits, retryCfg, push, logger, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, _ []byte, req *mimirpb.PreallocWriteRequest, logger log.Logger) ([]byte, error) {
var decoderFunc func(buf []byte) (pmetricotlp.ExportRequest, error)

contentType := r.Header.Get("Content-Type")
Expand Down Expand Up @@ -127,7 +127,13 @@ func OTLPHandler(

level.Debug(spanLogger).Log("msg", "decoding complete, starting conversion")

metrics, err := otelMetricsToTimeseries(ctx, discardedDueToOtelParseError, logger, otlpReq.Metrics())
tenantID, err := tenant.TenantID(ctx)
if err != nil {
return body, err
}
addSuffixes := limits.OTelMetricSuffixesEnabled(tenantID)

metrics, err := otelMetricsToTimeseries(tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics())
if err != nil {
return body, err
}
Expand All @@ -154,7 +160,7 @@ func OTLPHandler(
req.Timeseries = metrics

if enableOtelMetadataStorage {
metadata := otelMetricsToMetadata(otlpReq.Metrics())
metadata := otelMetricsToMetadata(addSuffixes, otlpReq.Metrics())
req.Metadata = metadata
}

Expand Down Expand Up @@ -182,7 +188,7 @@ func otelMetricTypeToMimirMetricType(otelMetric pmetric.Metric) mimirpb.MetricMe
return mimirpb.UNKNOWN
}

func otelMetricsToMetadata(md pmetric.Metrics) []*mimirpb.MetricMetadata {
func otelMetricsToMetadata(addSuffixes bool, md pmetric.Metrics) []*mimirpb.MetricMetadata {
resourceMetricsSlice := md.ResourceMetrics()

metadataLength := 0
Expand All @@ -193,7 +199,7 @@ func otelMetricsToMetadata(md pmetric.Metrics) []*mimirpb.MetricMetadata {
}
}

var metadata = make([]*mimirpb.MetricMetadata, 0, metadataLength)
metadata := make([]*mimirpb.MetricMetadata, 0, metadataLength)
for i := 0; i < resourceMetricsSlice.Len(); i++ {
scopeMetricsSlice := resourceMetricsSlice.At(i).ScopeMetrics()
for j := 0; j < scopeMetricsSlice.Len(); j++ {
Expand All @@ -202,7 +208,7 @@ func otelMetricsToMetadata(md pmetric.Metrics) []*mimirpb.MetricMetadata {
metric := scopeMetrics.Metrics().At(k)
entry := mimirpb.MetricMetadata{
Type: otelMetricTypeToMimirMetricType(metric),
MetricFamilyName: prometheustranslator.BuildCompliantName(metric, "", true), // TODO expose addMetricSuffixes in configuration (https://github.com/grafana/mimir/issues/5967)
MetricFamilyName: prometheustranslator.BuildCompliantName(metric, "", addSuffixes),
Help: metric.Description(),
Unit: metric.Unit(),
}
Expand All @@ -212,20 +218,15 @@ func otelMetricsToMetadata(md pmetric.Metrics) []*mimirpb.MetricMetadata {
}

return metadata

}

func otelMetricsToTimeseries(ctx context.Context, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
tsMap, errs := prometheusremotewrite.FromMetrics(md, prometheusremotewrite.Settings{})

func otelMetricsToTimeseries(tenantID string, addSuffixes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
tsMap, errs := prometheusremotewrite.FromMetrics(md, prometheusremotewrite.Settings{
AddMetricSuffixes: addSuffixes,
})
if errs != nil {
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

dropped := len(multierr.Errors(errs))
discardedDueToOtelParseError.WithLabelValues(userID, "").Add(float64(dropped)) // Group is empty here as metrics couldn't be parsed
discardedDueToOtelParseError.WithLabelValues(tenantID, "").Add(float64(dropped)) // Group is empty here as metrics couldn't be parsed

parseErrs := errs.Error()
if len(parseErrs) > maxErrMsgLen {
Expand Down
Loading

0 comments on commit c79e939

Please sign in to comment.