Add feature flag --distributor.direct-otlp-translation-enabled
Signed-off-by: Arve Knudsen <[email protected]>
aknuds1 committed May 30, 2024
1 parent 0cd4b2b commit 488e83a
Showing 10 changed files with 161 additions and 48 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -42,7 +42,7 @@
* [ENHANCEMENT] Store-gateway: improve performance when streaming chunks to queriers is enabled (`-querier.prefer-streaming-chunks-from-store-gateways=true`) and the query selects fewer than `-blocks-storage.bucket-store.batch-series-size` series (defaults to 5000 series). #8039
* [ENHANCEMENT] Ingester: active series are now updated along with owned series. They decrease when series change ownership between ingesters. This helps provide a more accurate total of active series when ingesters are added. This is only enabled when `-ingester.track-ingester-owned-series` or `-ingester.use-ingester-owned-series-for-limits` are enabled. #8084
* [ENHANCEMENT] Query-frontend: include route name in query stats log lines. #8191
* [ENHANCEMENT] OTLP: Speed up conversion from OTel to Mimir format by about 8% and reduce memory consumption by about 30%. #7957
* [ENHANCEMENT] OTLP: Speed up conversion from OTel to Mimir format by about 8% and reduce memory consumption by about 30%. Can be disabled via `-distributor.direct-otlp-translation-enabled=false`. #7957
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
* [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
* [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -1701,6 +1701,17 @@
"fieldFlag": "distributor.reusable-ingester-push-workers",
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "direct_otlp_translation_enabled",
"required": false,
"desc": "When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "distributor.direct-otlp-translation-enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1095,6 +1095,8 @@ Usage of ./cmd/mimir/mimir:
Fraction of mutex contention events that are reported in the mutex profile. On average 1/rate events are reported. 0 to disable.
-distributor.client-cleanup-period duration
How frequently to clean up clients for ingesters that have gone away. (default 15s)
-distributor.direct-otlp-translation-enabled
[experimental] When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance. (default true)
-distributor.drop-label string
This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels.
-distributor.enable-otlp-metadata-storage
2 changes: 2 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
@@ -73,6 +73,8 @@ The following features are currently experimental:
- `-distributor.max-exemplars-per-series-per-request`
- Enforce a maximum pool buffer size for write requests
- `-distributor.max-request-pool-buffer-size`
- Enable direct translation from OTLP write requests to Mimir equivalents
- `-distributor.direct-otlp-translation-enabled`
- Hash ring
- Disabling ring heartbeat timeouts
- `-distributor.ring.heartbeat-timeout=0`
5 changes: 5 additions & 0 deletions docs/sources/mimir/references/configuration-parameters/index.md
@@ -955,6 +955,11 @@ instance_limits:
# limiting feature.)
# CLI flag: -distributor.reusable-ingester-push-workers
[reusable_ingester_push_workers: <int> | default = 2000]
# (experimental) When enabled, OTLP write requests are directly translated to
# Mimir equivalents, for optimum performance.
# CLI flag: -distributor.direct-otlp-translation-enabled
[direct_otlp_translation_enabled: <boolean> | default = true]
```

### ingester
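
For illustration, the YAML key documented above corresponds to the `yaml:"direct_otlp_translation_enabled"` struct tag added to the distributor `Config` in `pkg/distributor/distributor.go` below. A minimal sketch of that mapping, assuming a pared-down stand-in struct and `gopkg.in/yaml.v3` (Mimir's real `Config` has many more fields):

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// distributorConfig is a pared-down stand-in for Mimir's distributor.Config;
// only the field added in this commit is modeled.
type distributorConfig struct {
	DirectOTLPTranslationEnabled bool `yaml:"direct_otlp_translation_enabled"`
}

func main() {
	raw := []byte("direct_otlp_translation_enabled: false\n")
	var cfg distributorConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.DirectOTLPTranslationEnabled) // false
}
```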
2 changes: 1 addition & 1 deletion pkg/api/api.go
@@ -262,7 +262,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, a.logger), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger, pushConfig.DirectOTLPTranslationEnabled), true, false, "POST")

a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{
{Desc: "Ring status", Path: "/distributor/ring"},
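
The only functional change in this file is threading `pushConfig.DirectOTLPTranslationEnabled` through as the new trailing argument of `OTLPHandler`. As a hedged sketch of the pattern (route, names, and handler body below are invented for illustration, not Mimir's implementation), the boolean is captured once at route-registration time rather than re-read on every request:

```go
package main

import (
	"fmt"
	"net/http"
)

// newOTLPStyleHandler captures the translation mode when the route is
// registered, mirroring how the flag value is passed into OTLPHandler.
func newOTLPStyleHandler(directTranslation bool) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if directTranslation {
			fmt.Fprintln(w, "direct OTLP-to-Mimir translation")
			return
		}
		fmt.Fprintln(w, "translation via the Prometheus remote-write format")
	})
}

func main() {
	mux := http.NewServeMux()
	mux.Handle("/otlp/v1/metrics", newOTLPStyleHandler(true))
	_ = http.ListenAndServe("localhost:8080", mux)
}
```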
4 changes: 4 additions & 0 deletions pkg/distributor/distributor.go
@@ -219,6 +219,9 @@ type Config struct {
WriteRequestsBufferPoolingEnabled bool `yaml:"write_requests_buffer_pooling_enabled" category:"experimental"`
LimitInflightRequestsUsingGrpcMethodLimiter bool `yaml:"limit_inflight_requests_using_grpc_method_limiter" category:"deprecated"` // TODO Remove the configuration option in Mimir 2.14, keeping the same behavior as if it's enabled
ReusableIngesterPushWorkers int `yaml:"reusable_ingester_push_workers" category:"advanced"`

// DirectOTLPTranslationEnabled allows reverting to the older way of translating from OTLP write requests via Prometheus, in case of problems.
DirectOTLPTranslationEnabled bool `yaml:"direct_otlp_translation_enabled" category:"experimental"`
}

// PushWrapper wraps around a push. It is similar to middleware.Interface.
@@ -237,6 +240,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", true, "Enable pooling of buffers used for marshaling write requests.")
f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcMethodLimiter, "distributor.limit-inflight-requests-using-grpc-method-limiter", true, "When enabled, in-flight write requests limit is checked as soon as the gRPC request is received, before the request is decoded and parsed.")
f.IntVar(&cfg.ReusableIngesterPushWorkers, "distributor.reusable-ingester-push-workers", 2000, "Number of pre-allocated workers used to forward push requests to the ingesters. If 0, no workers will be used and a new goroutine will be spawned for each ingester push request. If not enough workers available, new goroutine will be spawned. (Note: this is a performance optimization, not a limiting feature.)")
f.BoolVar(&cfg.DirectOTLPTranslationEnabled, "distributor.direct-otlp-translation-enabled", true, "When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance.")

cfg.DefaultLimits.RegisterFlags(f)
}
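
The registration above uses the standard library's `flag` package, so the new toggle behaves like any other boolean flag. A self-contained sketch with the same flag name, default, and help text (the standalone `FlagSet` is illustrative):

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	fs := flag.NewFlagSet("mimir", flag.ExitOnError)
	var direct bool
	// Same flag name, default, and help text as registered in RegisterFlags above.
	fs.BoolVar(&direct, "distributor.direct-otlp-translation-enabled", true,
		"When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance.")

	// Operators opt back into the old translation path like so:
	if err := fs.Parse([]string{"-distributor.direct-otlp-translation-enabled=false"}); err != nil {
		panic(err)
	}
	fmt.Println(direct) // false
}
```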
132 changes: 129 additions & 3 deletions pkg/distributor/otel.go
@@ -20,6 +20,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
@@ -53,6 +54,7 @@ func OTLPHandler(
pushMetrics *PushMetrics,
reg prometheus.Registerer,
logger log.Logger,
directTranslation bool,
) http.Handler {
discardedDueToOtelParseError := validation.DiscardedSamplesCounter(reg, otelParseError)

@@ -154,9 +156,17 @@
pushMetrics.IncOTLPRequest(tenantID)
pushMetrics.ObserveUncompressedBodySize(tenantID, float64(uncompressedBodySize))

metrics, err := otelMetricsToTimeseries(tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics())
if err != nil {
return err
var metrics []mimirpb.PreallocTimeseries
if directTranslation {
metrics, err = otelMetricsToTimeseries(tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics())
if err != nil {
return err
}
} else {
metrics, err = otelMetricsToTimeseriesOld(tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics())
if err != nil {
return err
}
}

metricCount := len(metrics)
@@ -283,6 +293,122 @@ func otelMetricsToTimeseries(tenantID string, addSuffixes bool, discardedDueToOt
return mimirTS, nil
}

// Old, less efficient, version of otelMetricsToTimeseries.
func otelMetricsToTimeseriesOld(tenantID string, addSuffixes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
converter := prometheusremotewrite.NewPrometheusConverter()
errs := converter.FromMetrics(md, prometheusremotewrite.Settings{
AddMetricSuffixes: addSuffixes,
})
promTS := converter.TimeSeries()
if errs != nil {
dropped := len(multierr.Errors(errs))
discardedDueToOtelParseError.WithLabelValues(tenantID, "").Add(float64(dropped)) // Group is empty here as metrics couldn't be parsed

parseErrs := errs.Error()
if len(parseErrs) > maxErrMsgLen {
parseErrs = parseErrs[:maxErrMsgLen]
}

if len(promTS) == 0 {
return nil, errors.New(parseErrs)
}

level.Warn(logger).Log("msg", "OTLP parse error", "err", parseErrs)
}

mimirTS := mimirpb.PreallocTimeseriesSliceFromPool()
for _, ts := range promTS {
mimirTS = append(mimirTS, promToMimirTimeseries(&ts))
}

return mimirTS, nil
}
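
For context, the fallback above drives the upstream Prometheus converter directly. A self-contained, hedged sketch of that converter API in isolation (the one-gauge payload constructed here is illustrative, and error handling is simplified):

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

func main() {
	// Build a minimal OTel payload containing a single gauge sample.
	md := pmetric.NewMetrics()
	m := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
	m.SetName("queue_length")
	dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
	dp.SetDoubleValue(42)
	dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))

	// Same calls as otelMetricsToTimeseriesOld: convert, then collect the series.
	converter := prometheusremotewrite.NewPrometheusConverter()
	if errs := converter.FromMetrics(md, prometheusremotewrite.Settings{AddMetricSuffixes: true}); errs != nil {
		fmt.Println("partial conversion errors:", errs)
	}
	for _, ts := range converter.TimeSeries() {
		fmt.Println(ts.Labels, ts.Samples)
	}
}
```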

func promToMimirTimeseries(promTs *prompb.TimeSeries) mimirpb.PreallocTimeseries {
labels := make([]mimirpb.LabelAdapter, 0, len(promTs.Labels))
for _, label := range promTs.Labels {
labels = append(labels, mimirpb.LabelAdapter{
Name: label.Name,
Value: label.Value,
})
}

samples := make([]mimirpb.Sample, 0, len(promTs.Samples))
for _, sample := range promTs.Samples {
samples = append(samples, mimirpb.Sample{
TimestampMs: sample.Timestamp,
Value: sample.Value,
})
}

histograms := make([]mimirpb.Histogram, 0, len(promTs.Histograms))
for idx := range promTs.Histograms {
histograms = append(histograms, promToMimirHistogram(&promTs.Histograms[idx]))
}

exemplars := make([]mimirpb.Exemplar, 0, len(promTs.Exemplars))
for _, exemplar := range promTs.Exemplars {
labels := make([]mimirpb.LabelAdapter, 0, len(exemplar.Labels))
for _, label := range exemplar.Labels {
labels = append(labels, mimirpb.LabelAdapter{
Name: label.Name,
Value: label.Value,
})
}

exemplars = append(exemplars, mimirpb.Exemplar{
Labels: labels,
Value: exemplar.Value,
TimestampMs: exemplar.Timestamp,
})
}

ts := mimirpb.TimeseriesFromPool()
ts.Labels = labels
ts.Samples = samples
ts.Histograms = histograms
ts.Exemplars = exemplars

return mimirpb.PreallocTimeseries{TimeSeries: ts}
}

func promToMimirHistogram(h *prompb.Histogram) mimirpb.Histogram {
pSpans := make([]mimirpb.BucketSpan, 0, len(h.PositiveSpans))
for _, span := range h.PositiveSpans {
pSpans = append(
pSpans, mimirpb.BucketSpan{
Offset: span.Offset,
Length: span.Length,
},
)
}
nSpans := make([]mimirpb.BucketSpan, 0, len(h.NegativeSpans))
for _, span := range h.NegativeSpans {
nSpans = append(
nSpans, mimirpb.BucketSpan{
Offset: span.Offset,
Length: span.Length,
},
)
}

return mimirpb.Histogram{
Count: &mimirpb.Histogram_CountInt{CountInt: h.GetCountInt()},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()},
NegativeSpans: nSpans,
NegativeDeltas: h.NegativeDeltas,
NegativeCounts: h.NegativeCounts,
PositiveSpans: pSpans,
PositiveDeltas: h.PositiveDeltas,
PositiveCounts: h.PositiveCounts,
Timestamp: h.Timestamp,
ResetHint: mimirpb.Histogram_ResetHint(h.ResetHint),
}
}

// TimeseriesToOTLPRequest is used in tests.
func TimeseriesToOTLPRequest(timeseries []prompb.TimeSeries, metadata []mimirpb.MetricMetadata) pmetricotlp.ExportRequest {
d := pmetric.NewMetrics()
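
With `promToMimirHistogram` now living in the distributor package (its former copy in `push_test.go` is deleted below), it can be exercised from package-internal tests. A hypothetical sketch, not part of this commit, assuming the usual testify dependency:

```go
package distributor

import (
	"testing"

	"github.com/prometheus/prometheus/prompb"
	"github.com/stretchr/testify/assert"
)

// TestPromToMimirHistogramSketch is a hypothetical example: it checks that
// integer counts and bucket spans survive the prompb-to-mimirpb conversion.
func TestPromToMimirHistogramSketch(t *testing.T) {
	h := prompb.Histogram{
		Count:          &prompb.Histogram_CountInt{CountInt: 3},
		Sum:            6,
		ZeroCount:      &prompb.Histogram_ZeroCountInt{ZeroCountInt: 0},
		PositiveSpans:  []prompb.BucketSpan{{Offset: 0, Length: 2}},
		PositiveDeltas: []int64{1, 1},
	}

	m := promToMimirHistogram(&h)

	assert.Equal(t, uint64(3), m.GetCountInt())
	assert.Equal(t, int32(0), m.PositiveSpans[0].Offset)
	assert.Equal(t, uint32(2), m.PositiveSpans[0].Length)
}
```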
2 changes: 1 addition & 1 deletion pkg/distributor/otel_test.go
@@ -66,7 +66,7 @@ func BenchmarkOTLPHandler(b *testing.B) {
validation.NewMockTenantLimits(map[string]*validation.Limits{}),
)
require.NoError(b, err)
handler := OTLPHandler(100000, nil, nil, false, true, limits, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger())
handler := OTLPHandler(100000, nil, nil, false, true, limits, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger(), true)

b.Run("protobuf", func(b *testing.B) {
req := createOTLPProtoRequest(b, exportReq, false)
47 changes: 5 additions & 42 deletions pkg/distributor/push_test.go
@@ -300,7 +300,7 @@ func TestHandlerOTLPPush(t *testing.T) {
t.Cleanup(pushReq.CleanUp)
return tt.verifyFunc(t, pushReq)
}
handler := OTLPHandler(tt.maxMsgSize, nil, nil, false, tt.enableOtelMetadataStorage, limits, RetryConfig{}, pusher, nil, nil, log.NewNopLogger())
handler := OTLPHandler(tt.maxMsgSize, nil, nil, false, tt.enableOtelMetadataStorage, limits, RetryConfig{}, pusher, nil, nil, log.NewNopLogger(), true)

resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
@@ -368,7 +368,7 @@ func TestHandler_otlpDroppedMetricsPanic(t *testing.T) {
assert.False(t, request.SkipLabelNameValidation)
pushReq.CleanUp()
return nil
}, nil, nil, log.NewNopLogger())
}, nil, nil, log.NewNopLogger(), true)
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
}
@@ -414,7 +414,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) {
assert.Len(t, request.Timeseries, 1)
assert.False(t, request.SkipLabelNameValidation)
return nil
}, nil, nil, log.NewNopLogger())
}, nil, nil, log.NewNopLogger(), true)
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)

@@ -440,7 +440,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) {
assert.Len(t, request.Timeseries, 9) // 6 buckets (including +Inf) + 2 sum/count + 2 from the first case
assert.False(t, request.SkipLabelNameValidation)
return nil
}, nil, nil, log.NewNopLogger())
}, nil, nil, log.NewNopLogger(), true)
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
}
@@ -461,7 +461,7 @@ func TestHandler_otlpWriteRequestTooBigWithCompression(t *testing.T) {

resp := httptest.NewRecorder()

handler := OTLPHandler(140, nil, nil, false, true, nil, RetryConfig{}, readBodyPushFunc(t), nil, nil, log.NewNopLogger())
handler := OTLPHandler(140, nil, nil, false, true, nil, RetryConfig{}, readBodyPushFunc(t), nil, nil, log.NewNopLogger(), true)
handler.ServeHTTP(resp, req)
assert.Equal(t, http.StatusRequestEntityTooLarge, resp.Code)
body, err := io.ReadAll(resp.Body)
@@ -662,43 +662,6 @@ func createPrometheusRemoteWriteProtobuf(t testing.TB) []byte {
return inputBytes
}

func promToMimirHistogram(h *prompb.Histogram) mimirpb.Histogram {
pSpans := make([]mimirpb.BucketSpan, 0, len(h.PositiveSpans))
for _, span := range h.PositiveSpans {
pSpans = append(
pSpans, mimirpb.BucketSpan{
Offset: span.Offset,
Length: span.Length,
},
)
}
nSpans := make([]mimirpb.BucketSpan, 0, len(h.NegativeSpans))
for _, span := range h.NegativeSpans {
nSpans = append(
nSpans, mimirpb.BucketSpan{
Offset: span.Offset,
Length: span.Length,
},
)
}

return mimirpb.Histogram{
Count: &mimirpb.Histogram_CountInt{CountInt: h.GetCountInt()},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &mimirpb.Histogram_ZeroCountInt{ZeroCountInt: h.GetZeroCountInt()},
NegativeSpans: nSpans,
NegativeDeltas: h.NegativeDeltas,
NegativeCounts: h.NegativeCounts,
PositiveSpans: pSpans,
PositiveDeltas: h.PositiveDeltas,
PositiveCounts: h.PositiveCounts,
Timestamp: h.Timestamp,
ResetHint: mimirpb.Histogram_ResetHint(h.ResetHint),
}
}

func createMimirWriteRequestProtobuf(t *testing.T, skipLabelNameValidation bool) []byte {
t.Helper()
h := remote.HistogramToHistogramProto(1337, test.GenerateTestHistogram(1))