OTLP: add CLI flag for 'quiet zero'
So that we can update all ingesters before enabling this in distributors.
bboreham committed Dec 13, 2024
1 parent 3af22a8 commit b0624f0
Showing 5 changed files with 17 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pkg/api/api.go
@@ -266,7 +266,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(
pushConfig.MaxOTLPRequestSize, d.RequestBufferPool, a.sourceIPs, limits, pushConfig.OTelResourceAttributePromotionConfig,
- pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger,
+ pushConfig.RetryConfig, pushConfig.EnableStartTimeQuietZero, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger,
), true, false, "POST")

a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{
4 changes: 4 additions & 0 deletions pkg/distributor/distributor.go
@@ -249,6 +249,9 @@ type Config struct {

// OTelResourceAttributePromotionConfig allows for specializing OTel resource attribute promotion.
OTelResourceAttributePromotionConfig OTelResourceAttributePromotionConfig `yaml:"-"`
+
+ // Change the implementation of Otel startTime from a real zero to a special NaN value.
+ EnableStartTimeQuietZero bool `yaml:"start_time_quiet_zero" category:"advanced"`
}

// PushWrapper wraps around a push. It is similar to middleware.Interface.
@@ -267,6 +270,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", true, "Enable pooling of buffers used for marshaling write requests.")
f.IntVar(&cfg.ReusableIngesterPushWorkers, "distributor.reusable-ingester-push-workers", 2000, "Number of pre-allocated workers used to forward push requests to the ingesters. If 0, no workers will be used and a new goroutine will be spawned for each ingester push request. If not enough workers available, new goroutine will be spawned. (Note: this is a performance optimization, not a limiting feature.)")
+ f.BoolVar(&cfg.EnableStartTimeQuietZero, "distributor.otel-start-time-quiet-zero", false, "Change the implementation of Otel startTime from a real zero to a special NaN value.")

cfg.DefaultLimits.RegisterFlags(f)
}
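Not part of the commit, but for illustration: a minimal sketch of how the new setting would be enabled on a distributor during the staged rollout described in the commit message. The flag name, the EnableStartTimeQuietZero field, and the RegisterFlags(f *flag.FlagSet, logger log.Logger) signature come from the hunk above; the module import path and the throwaway FlagSet are assumptions made for the example.

```go
package main

import (
	"flag"
	"fmt"

	"github.com/go-kit/log"

	"github.com/grafana/mimir/pkg/distributor"
)

func main() {
	// Register the distributor flags on a standalone FlagSet, then parse the
	// command line an operator might pass once all ingesters are upgraded.
	fs := flag.NewFlagSet("example", flag.ExitOnError)
	var cfg distributor.Config
	cfg.RegisterFlags(fs, log.NewNopLogger())

	_ = fs.Parse([]string{"-distributor.otel-start-time-quiet-zero=true"})

	// Defaults to false, so existing deployments keep the plain-zero behaviour
	// until the flag is set explicitly.
	fmt.Println(cfg.EnableStartTimeQuietZero) // true
}
```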
6 changes: 4 additions & 2 deletions pkg/distributor/otel.go
@@ -62,6 +62,7 @@ func OTLPHandler(
limits OTLPHandlerLimits,
resourceAttributePromotionConfig OTelResourceAttributePromotionConfig,
retryCfg RetryConfig,
+ enableStartTimeQuietZero bool,
push PushFunc,
pushMetrics *PushMetrics,
reg prometheus.Registerer,
@@ -181,7 +182,7 @@
pushMetrics.ObserveUncompressedBodySize(tenantID, float64(uncompressedBodySize))

var metrics []mimirpb.PreallocTimeseries
- metrics, err = otelMetricsToTimeseries(ctx, tenantID, addSuffixes, enableCTZeroIngestion, promoteResourceAttributes, keepIdentifyingResourceAttributes, discardedDueToOtelParseError, spanLogger, otlpReq.Metrics())
+ metrics, err = otelMetricsToTimeseries(ctx, tenantID, addSuffixes, enableCTZeroIngestion, enableStartTimeQuietZero, promoteResourceAttributes, keepIdentifyingResourceAttributes, discardedDueToOtelParseError, spanLogger, otlpReq.Metrics())
if err != nil {
return err
}
@@ -410,11 +411,12 @@ func otelMetricsToMetadata(addSuffixes bool, md pmetric.Metrics) []*mimirpb.Metr
return metadata
}

- func otelMetricsToTimeseries(ctx context.Context, tenantID string, addSuffixes, enableCTZeroIngestion bool, promoteResourceAttributes []string, keepIdentifyingResourceAttributes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
+ func otelMetricsToTimeseries(ctx context.Context, tenantID string, addSuffixes, enableCTZeroIngestion, enableStartTimeQuietZero bool, promoteResourceAttributes []string, keepIdentifyingResourceAttributes bool, discardedDueToOtelParseError *prometheus.CounterVec, logger log.Logger, md pmetric.Metrics) ([]mimirpb.PreallocTimeseries, error) {
converter := otlp.NewMimirConverter()
_, errs := converter.FromMetrics(ctx, md, otlp.Settings{
AddMetricSuffixes: addSuffixes,
EnableCreatedTimestampZeroIngestion: enableCTZeroIngestion,
+ EnableStartTimeQuietZero: enableStartTimeQuietZero,
PromoteResourceAttributes: promoteResourceAttributes,
KeepIdentifyingResourceAttributes: keepIdentifyingResourceAttributes,
}, utillog.SlogFromGoKit(logger))
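As context rather than part of the diff: the field comment above describes the change as writing the OTel start time as a special NaN value instead of a real zero, presumably so receivers can tell the synthetic start-time sample apart from a genuine zero. The sketch below only illustrates how such a sentinel NaN behaves; it borrows Prometheus's StaleNaN bit pattern (0x7ff0000000000002) purely as an example, since the actual quiet-zero bit pattern is defined outside this change.

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	// Illustrative sentinel: Prometheus's StaleNaN bit pattern, used here only to
	// show how a special NaN is detected. The quiet-zero value Mimir writes for
	// OTel start times is not defined in this diff.
	const sentinelBits = 0x7ff0000000000002
	sentinel := math.Float64frombits(sentinelBits)
	realZero := 0.0

	fmt.Println(math.IsNaN(sentinel))                       // true: NaN never compares equal, even to itself
	fmt.Println(math.Float64bits(sentinel) == sentinelBits) // true: receivers compare raw bits instead
	fmt.Println(realZero == 0)                              // true: a real zero is just an ordinary sample value
}
```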
14 changes: 7 additions & 7 deletions pkg/distributor/otel_test.go
@@ -283,7 +283,7 @@ func TestOTelMetricsToTimeSeries(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mimirTS, err := otelMetricsToTimeseries(
- context.Background(), tenantID, true, false, tc.promoteResourceAttributes, tc.keepIdentifyingResourceAttributes, discardedDueToOTelParseError, log.NewNopLogger(), md,
+ context.Background(), tenantID, true, false, false, tc.promoteResourceAttributes, tc.keepIdentifyingResourceAttributes, discardedDueToOTelParseError, log.NewNopLogger(), md,
)
require.NoError(t, err)
require.Len(t, mimirTS, 2)
@@ -351,7 +351,7 @@ func BenchmarkOTLPHandler(b *testing.B) {
validation.NewMockTenantLimits(map[string]*validation.Limits{}),
)
require.NoError(b, err)
- handler := OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger())
+ handler := OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, false, pushFunc, nil, nil, log.NewNopLogger())

b.Run("protobuf", func(b *testing.B) {
req := createOTLPProtoRequest(b, exportReq, "")
@@ -750,7 +750,7 @@ func TestHandlerOTLPPush(t *testing.T) {

logs := &concurrency.SyncBuffer{}
retryConfig := RetryConfig{Enabled: true, MinBackoff: 5 * time.Second, MaxBackoff: 5 * time.Second}
- handler := OTLPHandler(tt.maxMsgSize, nil, nil, limits, tt.resourceAttributePromotionConfig, retryConfig, pusher, nil, nil, level.NewFilter(log.NewLogfmtLogger(logs), level.AllowInfo()))
+ handler := OTLPHandler(tt.maxMsgSize, nil, nil, limits, tt.resourceAttributePromotionConfig, retryConfig, false, pusher, nil, nil, level.NewFilter(log.NewLogfmtLogger(logs), level.AllowInfo()))

resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
@@ -823,7 +823,7 @@ func TestHandler_otlpDroppedMetricsPanic(t *testing.T) {

req := createOTLPProtoRequest(t, pmetricotlp.NewExportRequestFromMetrics(md), "")
resp := httptest.NewRecorder()
- handler := OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, func(_ context.Context, pushReq *Request) error {
+ handler := OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, false, func(_ context.Context, pushReq *Request) error {
request, err := pushReq.WriteRequest()
assert.NoError(t, err)
assert.Len(t, request.Timeseries, 3)
@@ -869,7 +869,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) {

req := createOTLPProtoRequest(t, pmetricotlp.NewExportRequestFromMetrics(md), "")
resp := httptest.NewRecorder()
- handler := OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, func(_ context.Context, pushReq *Request) error {
+ handler := OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, false, func(_ context.Context, pushReq *Request) error {
request, err := pushReq.WriteRequest()
t.Cleanup(pushReq.CleanUp)
require.NoError(t, err)
@@ -895,7 +895,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) {

req = createOTLPProtoRequest(t, pmetricotlp.NewExportRequestFromMetrics(md), "")
resp = httptest.NewRecorder()
- handler = OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, func(_ context.Context, pushReq *Request) error {
+ handler = OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, false, func(_ context.Context, pushReq *Request) error {
request, err := pushReq.WriteRequest()
t.Cleanup(pushReq.CleanUp)
require.NoError(t, err)
@@ -923,7 +923,7 @@ func TestHandler_otlpWriteRequestTooBigWithCompression(t *testing.T) {

resp := httptest.NewRecorder()

- handler := OTLPHandler(140, nil, nil, nil, nil, RetryConfig{}, readBodyPushFunc(t), nil, nil, log.NewNopLogger())
+ handler := OTLPHandler(140, nil, nil, nil, nil, RetryConfig{}, false, readBodyPushFunc(t), nil, nil, log.NewNopLogger())
handler.ServeHTTP(resp, req)
assert.Equal(t, http.StatusRequestEntityTooLarge, resp.Code)
body, err := io.ReadAll(resp.Body)
2 changes: 1 addition & 1 deletion pkg/distributor/push_test.go
@@ -1183,7 +1183,7 @@ func TestOTLPPushHandlerErrorsAreReportedCorrectlyViaHttpgrpc(t *testing.T) {

return nil
}
- h := OTLPHandler(200, util.NewBufferPool(0), nil, otlpLimitsMock{}, nil, RetryConfig{}, push, newPushMetrics(reg), reg, log.NewNopLogger())
+ h := OTLPHandler(200, util.NewBufferPool(0), nil, otlpLimitsMock{}, nil, RetryConfig{}, false, push, newPushMetrics(reg), reg, log.NewNopLogger())
srv.HTTP.Handle("/otlp", h)

// start the server
