Skip to content

Commit

Permalink
Distributor: Delay simulation on ingestion (#10107)
Browse files Browse the repository at this point in the history
* Distributor: Artificially delay sample ingestion if configured

With Sigyn, we have a much higher latency in ingestion - somewhere around 5x. Ths can produce adverse side affects on remote write clients struggling. The purpose of this change is to help remote write clients prepare for the increase in latency by artifically producing it.

The delay is meant to be configurable per tenant.

Signed-off-by: gotjosh <[email protected]>

* make docs

Signed-off-by: gotjosh <[email protected]>

* skip docs

Signed-off-by: gotjosh <[email protected]>

* rework the implementation to use a fake time and be a middleware wrapper

* fix lint

Signed-off-by: gotjosh <[email protected]>

* minor style fixes

Signed-off-by: gotjosh <[email protected]>

* Address review comments

Signed-off-by: gotjosh <[email protected]>

* fix unit test to take into account jitter

Signed-off-by: gotjosh <[email protected]>

---------

Signed-off-by: gotjosh <[email protected]>
  • Loading branch information
gotjosh authored Dec 8, 2024
1 parent 9636602 commit de6d3fb
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 1 deletion.
35 changes: 34 additions & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,15 @@ type Distributor struct {

// partitionsRing is the hash ring holding ingester partitions. It's used when ingest storage is enabled.
partitionsRing *ring.PartitionInstanceRing

// For testing functionality that relies on timing without having to sleep in unit tests.
sleep func(time.Duration)
now func() time.Time
}

func defaultSleep(d time.Duration) { time.Sleep(d) }
func defaultNow() time.Time { return time.Now() }

// OTelResourceAttributePromotionConfig contains methods for configuring OTel resource attribute promotion.
type OTelResourceAttributePromotionConfig interface {
// PromoteOTelResourceAttributes returns which OTel resource attributes to promote for tenant ID.
Expand Down Expand Up @@ -437,6 +444,8 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
}),

PushMetrics: newPushMetrics(reg),
now: defaultNow,
sleep: defaultSleep,
}

// Initialize expected rejected request labels
Expand Down Expand Up @@ -825,7 +834,9 @@ func (d *Distributor) wrapPushWithMiddlewares(next PushFunc) PushFunc {
next = middlewares[ix](next)
}

return next
// The delay middleware must take into account total runtime of all other middlewares and the push func, hence why we wrap all others.
return d.outerMaybeDelayMiddleware(next)

}

func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc {
Expand Down Expand Up @@ -1192,6 +1203,28 @@ func (d *Distributor) metricsMiddleware(next PushFunc) PushFunc {
}
}

// outerMaybeDelayMiddleware is a middleware that may delay ingestion if configured.
func (d *Distributor) outerMaybeDelayMiddleware(next PushFunc) PushFunc {
return func(ctx context.Context, pushReq *Request) error {
start := d.now()
// Execute the whole middleware chain.
err := next(ctx, pushReq)

userID, userErr := tenant.TenantID(ctx) // Log tenant ID if available.
if userErr == nil {
// Target delay - time spent processing the middleware chain including the push.
// If the request took longer than the target delay, we don't delay at all as sleep will return immediately for a negative value.
if delay := d.limits.DistributorIngestionArtificialDelay(userID) - d.now().Sub(start); delay > 0 {
d.sleep(util.DurationWithJitter(delay, 0.10))
}
return err
}

level.Warn(d.log).Log("msg", "failed to get tenant ID while trying to delay ingestion", "err", userErr)
return err
}
}

type ctxKey int

const requestStateKey ctxKey = 1
Expand Down
98 changes: 98 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8004,3 +8004,101 @@ func TestCheckStartedMiddleware(t *testing.T) {
require.NotNil(t, err)
require.ErrorContains(t, err, "rpc error: code = Internal desc = distributor is unavailable (current state: New)")
}

func Test_outerMaybeDelayMiddleware(t *testing.T) {
tests := []struct {
name string
userID string
delay time.Duration
pushDuration time.Duration
expectedSleep time.Duration
}{
{
name: "No delay configured",
userID: "user1",
delay: 0,
pushDuration: 500 * time.Millisecond,
expectedSleep: 0,
},
{
name: "Delay configured but request took longer than delay",
userID: "user2",
delay: 500 * time.Millisecond,
pushDuration: 1 * time.Second,
expectedSleep: 0,
},
{
name: "Delay configured and request took less than delay",
userID: "user3",
delay: 500 * time.Millisecond,
pushDuration: 50 * time.Millisecond,
expectedSleep: 450 * time.Millisecond,
},
{
name: "Failed to extract a tenantID",
userID: "",
delay: 500 * time.Millisecond,
pushDuration: 50 * time.Millisecond,
expectedSleep: 0,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
limits := validation.NewMockTenantLimits(map[string]*validation.Limits{
tc.userID: {
IngestionArtificialDelay: model.Duration(tc.delay),
},
})
overrides, err := validation.NewOverrides(*prepareDefaultLimits(), limits)
require.NoError(t, err)

// Mock to capture sleep and advance time.
timeSource := &MockTimeSource{CurrentTime: time.Now()}

distributor := &Distributor{
log: log.NewNopLogger(),
limits: overrides,
sleep: timeSource.Sleep,
now: timeSource.Now,
}

// fake push just adds time to the mocked time to make it seem like time has moved forward.
p := func(_ context.Context, _ *Request) error {
timeSource.Add(tc.pushDuration)
return nil
}

ctx := context.Background()
if tc.userID != "" {
ctx = user.InjectOrgID(ctx, tc.userID)
}
wrappedPush := distributor.outerMaybeDelayMiddleware(p)
err = wrappedPush(ctx, NewParsedRequest(&mimirpb.WriteRequest{}))
require.NoError(t, err)

// Due to the 10% jitter we need to take into account that the number will not be deterministic in tests.
difference := timeSource.Slept - tc.expectedSleep
require.LessOrEqual(t, difference.Abs(), tc.expectedSleep/10)
})
}
}

type MockTimeSource struct {
CurrentTime time.Time
Slept time.Duration
}

func (m *MockTimeSource) Now() time.Time {
return m.CurrentTime
}

func (m *MockTimeSource) Sleep(d time.Duration) {
if d > 0 {
m.Slept += d
}
}

func (m *MockTimeSource) Add(d time.Duration) {
m.CurrentTime = m.CurrentTime.Add(d)
}
7 changes: 7 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type Limits struct {
MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" json:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs. Labels available during the relabeling phase and cleaned afterwards: __meta_tenant_id" category:"experimental"`
MetricRelabelingEnabled bool `yaml:"metric_relabeling_enabled" json:"metric_relabeling_enabled" category:"experimental"`
ServiceOverloadStatusCodeOnRateLimitEnabled bool `yaml:"service_overload_status_code_on_rate_limit_enabled" json:"service_overload_status_code_on_rate_limit_enabled" category:"experimental"`
IngestionArtificialDelay model.Duration `yaml:"ingestion_artificial_delay" json:"ingestion_artificial_delay" category:"experimental" doc:"hidden"`
// Ingester enforced limits.
// Series
MaxGlobalSeriesPerUser int `yaml:"max_global_series_per_user" json:"max_global_series_per_user"`
Expand Down Expand Up @@ -280,6 +281,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&l.OTelMetricSuffixesEnabled, "distributor.otel-metric-suffixes-enabled", false, "Whether to enable automatic suffixes to names of metrics ingested through OTLP.")
f.BoolVar(&l.OTelCreatedTimestampZeroIngestionEnabled, "distributor.otel-created-timestamp-zero-ingestion-enabled", false, "Whether to enable translation of OTel start timestamps to Prometheus zero samples in the OTLP endpoint.")
f.Var(&l.PromoteOTelResourceAttributes, "distributor.otel-promote-resource-attributes", "Optionally specify OTel resource attributes to promote to labels.")
f.Var(&l.IngestionArtificialDelay, "distributor.ingestion-artificial-latency", "Target ingestion delay. If set to a non-zero value, the distributor will artificially delay ingestion time-frame by the specified duration by computing the difference between actual ingestion and the target. There is no delay on actual ingestion of samples, it is only the response back to the client.")

f.IntVar(&l.MaxGlobalSeriesPerUser, MaxSeriesPerUserFlag, 150000, "The maximum number of in-memory series per tenant, across the cluster before replication. 0 to disable.")
f.IntVar(&l.MaxGlobalSeriesPerMetric, MaxSeriesPerMetricFlag, 0, "The maximum number of in-memory series per metric name, across the cluster before replication. 0 to disable.")
Expand Down Expand Up @@ -1111,6 +1113,11 @@ func (o *Overrides) PromoteOTelResourceAttributes(tenantID string) []string {
return o.getOverridesForUser(tenantID).PromoteOTelResourceAttributes
}

// DistributorIngestionArtificialDelay returns the artificial ingestion latency for a given use.
func (o *Overrides) DistributorIngestionArtificialDelay(tenantID string) time.Duration {
return time.Duration(o.getOverridesForUser(tenantID).IngestionArtificialDelay)
}

func (o *Overrides) AlignQueriesWithStep(userID string) bool {
return o.getOverridesForUser(userID).AlignQueriesWithStep
}
Expand Down

0 comments on commit de6d3fb

Please sign in to comment.