Skip to content

Commit

Permalink
refectory
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Dec 17, 2024
1 parent e315ebb commit f04c28f
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 115 deletions.
9 changes: 1 addition & 8 deletions development/mimir-microservices-mode/config/mimir.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
multitenancy_enabled: false
cost_attribution_registry_path: "/usage-metrics"
cost_attribution_eviction_interval: 10m

distributor:
ha_tracker:
Expand Down Expand Up @@ -186,10 +184,5 @@ limits:
ha_replica_label: ha_replica
ha_max_clusters: 10

cost_attribution_labels: "container"
max_cost_attribution_labels_per_user: 2
max_cost_attribution_cardinality_per_user: 100
cost_attribution_cooldown: 20m

runtime_config:
file: ./config/runtime.yaml
file: ./config/runtime.yaml
39 changes: 39 additions & 0 deletions docs/sources/mimir/configure/configuration-parameters/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,18 @@ overrides_exporter:
# (experimental) Enables optimized marshaling of timeseries.
# CLI flag: -timeseries-unmarshal-caching-optimization-enabled
[timeseries_unmarshal_caching_optimization_enabled: <boolean> | default = true]

# (experimental) Time interval at which inactive cost attributions are evicted
# from the counter, ensuring they are not included in the cost attribution
# cardinality per user limit.
# CLI flag: -cost-attribution.eviction-interval
[cost_attribution_eviction_interval: <duration> | default = 20m]

# (experimental) Defines a custom path for the registry. When specified, Mimir
# will expose cost attribution metrics through this custom path, if not
# specified, cost attribution metrics won't be exposed.
# CLI flag: -cost-attribution.registry-path
[cost_attribution_registry_path: <string> | default = ""]
```
### common
Expand Down Expand Up @@ -3569,6 +3581,33 @@ The `limits` block configures default and per-tenant limits imposed by component
# CLI flag: -querier.active-series-results-max-size-bytes
[active_series_results_max_size_bytes: <int> | default = 419430400]

# (experimental) Defines labels for cost attribution, applied to metrics like
# cortex_distributor_attributed_received_samples_total. Set to an empty string
# to disable. Example: 'team,service' will produce metrics such as
# cortex_distributor_attributed_received_samples_total{team='frontend',
# service='api'}.
# CLI flag: -validation.cost-attribution-labels
[cost_attribution_labels: <string> | default = ""]

# (experimental) Maximum number of cost attribution labels allowed per user.
# CLI flag: -validation.max-cost-attribution-labels-per-user
[max_cost_attribution_labels_per_user: <int> | default = 2]

# (experimental) Maximum cardinality of cost attribution labels allowed per
# user.
# CLI flag: -validation.max-cost-attribution-cardinality-per-user
[max_cost_attribution_cardinality_per_user: <int> | default = 10000]

# (experimental) Cooldown period for cost attribution labels. Specifies the
# duration the cost attribution remains in overflow before attempting a reset.
# If the cardinality remains above the limit after this period, the system will
# stay in overflow mode and extend the cooldown. Setting this value to 0
# disables the cooldown, causing the system to continuously check whether the
# cardinality has dropped below the limit. A reset will occur once the
# cardinality falls below the limit.
# CLI flag: -validation.cost-attribution-cooldown
[cost_attribution_cooldown: <duration> | default = 0s]

# Duration to delay the evaluation of rules to ensure the underlying metrics
# have been pushed.
# CLI flag: -ruler.evaluation-delay-duration
Expand Down
17 changes: 8 additions & 9 deletions pkg/costattribution/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,15 @@ func (m *Manager) inactiveObservationsForUser(userID string, deadline int64) []s
m.trackersByUserID[userID] = cat
m.mtx.Unlock()
return nil
} else {
maxCardinality := m.limits.MaxCostAttributionCardinalityPerUser(userID)
if cat.MaxCardinality() != maxCardinality {
cat.UpdateMaxCardinality(maxCardinality)
}
}
maxCardinality := m.limits.MaxCostAttributionCardinalityPerUser(userID)
if cat.MaxCardinality() != maxCardinality {
cat.UpdateMaxCardinality(maxCardinality)
}

cooldown := int64(m.limits.CostAttributionCooldown(userID).Seconds())
if cooldown != cat.CooldownDuration() {
cat.UpdateCooldownDuration(cooldown)
}
cooldown := int64(m.limits.CostAttributionCooldown(userID).Seconds())
if cooldown != cat.CooldownDuration() {
cat.UpdateCooldownDuration(cooldown)
}

return cat.InactiveObservations(deadline)
Expand Down
21 changes: 13 additions & 8 deletions pkg/costattribution/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func Test_CreateDeleteTracker(t *testing.T) {
})

t.Run("Purge inactive attributions", func(t *testing.T) {
manager.purgeInactiveAttributionsUntil(time.Unix(10, 0).Unix())
err := manager.purgeInactiveAttributionsUntil(time.Unix(10, 0).Unix())
assert.NoError(t, err)
expectedMetrics := `
# HELP cortex_discarded_attributed_samples_total The total number of samples that were discarded per attribution.
# TYPE cortex_discarded_attributed_samples_total counter
Expand All @@ -103,8 +104,10 @@ func Test_CreateDeleteTracker(t *testing.T) {
})

t.Run("Disabling user cost attribution", func(t *testing.T) {
manager.limits, _ = getMockLimits(1)
manager.purgeInactiveAttributionsUntil(time.Unix(11, 0).Unix())
var err error
manager.limits, err = getMockLimits(1)
assert.NoError(t, err)
assert.NoError(t, manager.purgeInactiveAttributionsUntil(time.Unix(11, 0).Unix()))
assert.Equal(t, 1, len(manager.trackersByUserID))

expectedMetrics := `
Expand All @@ -116,8 +119,10 @@ func Test_CreateDeleteTracker(t *testing.T) {
})

t.Run("Updating user cardinality and labels", func(t *testing.T) {
manager.limits, _ = getMockLimits(2)
manager.purgeInactiveAttributionsUntil(time.Unix(12, 0).Unix())
var err error
manager.limits, err = getMockLimits(2)
assert.NoError(t, err)
assert.NoError(t, manager.purgeInactiveAttributionsUntil(time.Unix(12, 0).Unix()))
assert.Equal(t, 1, len(manager.trackersByUserID))
assert.True(t, manager.TrackerForUser("user3").CompareCALabels([]string{"feature", "team"}))

Expand Down Expand Up @@ -151,7 +156,7 @@ func Test_PurgeInactiveAttributionsUntil(t *testing.T) {
manager.TrackerForUser("user3").IncrementDiscardedSamples(labels.FromStrings("department", "foo", "service", "bar"), 1, "out-of-window", time.Unix(10, 0))

t.Run("Purge before inactive timeout", func(t *testing.T) {
manager.purgeInactiveAttributionsUntil(time.Unix(0, 0).Unix())
assert.NoError(t, manager.purgeInactiveAttributionsUntil(time.Unix(0, 0).Unix()))
assert.Equal(t, 2, len(manager.trackersByUserID))

expectedMetrics := `
Expand All @@ -166,7 +171,7 @@ func Test_PurgeInactiveAttributionsUntil(t *testing.T) {
t.Run("Purge after inactive timeout", func(t *testing.T) {
// disable cost attribution for user1 to test purging
manager.limits, _ = getMockLimits(1)
manager.purgeInactiveAttributionsUntil(time.Unix(5, 0).Unix())
assert.NoError(t, manager.purgeInactiveAttributionsUntil(time.Unix(5, 0).Unix()))

// User3's tracker should remain since it's active, user1's tracker should be removed
assert.Equal(t, 1, len(manager.trackersByUserID), "Expected one active tracker after purging")
Expand All @@ -182,7 +187,7 @@ func Test_PurgeInactiveAttributionsUntil(t *testing.T) {

t.Run("Purge all trackers", func(t *testing.T) {
// Trigger a purge that should remove all inactive trackers
manager.purgeInactiveAttributionsUntil(time.Unix(20, 0).Unix())
assert.NoError(t, manager.purgeInactiveAttributionsUntil(time.Unix(20, 0).Unix()))

// Tracker would stay at 1 since user1's tracker is disabled
assert.Equal(t, 1, len(manager.trackersByUserID), "Expected one active tracker after full purge")
Expand Down
46 changes: 32 additions & 14 deletions pkg/costattribution/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ type Tracker struct {
activeSeriesPerUserAttribution *prometheus.Desc
receivedSamplesAttribution *prometheus.Desc
discardedSampleAttribution *prometheus.Desc
failedActiveSeriesDecrement *prometheus.Desc
overflowLabels []string
obseveredMtx sync.RWMutex
observed map[string]*Observation
hashBuffer []byte
state TrackerState
overflowCounter *Observation
cooldownUntil *atomic.Int64
totalFailedActiveSeries *atomic.Float64
cooldownDuration int64
logger log.Logger
}
Expand All @@ -70,15 +72,16 @@ func newTracker(userID string, trackedLabels []string, limit int, cooldown time.
overflowLabels[len(trackedLabels)+1] = overflowValue

tracker := &Tracker{
userID: userID,
caLabels: trackedLabels,
caLabelMap: caLabelMap,
maxCardinality: limit,
observed: make(map[string]*Observation),
hashBuffer: make([]byte, 0, 1024),
cooldownDuration: int64(cooldown.Seconds()),
logger: logger,
overflowLabels: overflowLabels,
userID: userID,
caLabels: trackedLabels,
caLabelMap: caLabelMap,
maxCardinality: limit,
observed: make(map[string]*Observation),
hashBuffer: make([]byte, 0, 1024),
cooldownDuration: int64(cooldown.Seconds()),
logger: logger,
overflowLabels: overflowLabels,
totalFailedActiveSeries: atomic.NewFloat64(0),
}

tracker.discardedSampleAttribution = prometheus.NewDesc("cortex_discarded_attributed_samples_total",
Expand All @@ -94,7 +97,9 @@ func newTracker(userID string, trackedLabels []string, limit int, cooldown time.
tracker.activeSeriesPerUserAttribution = prometheus.NewDesc("cortex_ingester_attributed_active_series",
"The total number of active series per user and attribution.", append(trackedLabels, TenantLabel),
prometheus.Labels{TrackerLabel: defaultTrackerName})

tracker.failedActiveSeriesDecrement = prometheus.NewDesc("cortex_ingester_attributed_active_series_failure",
"The total number of failed active series decrement per user and tracker.", []string{TenantLabel},
prometheus.Labels{TrackerLabel: defaultTrackerName})
return tracker
}

Expand Down Expand Up @@ -149,11 +154,11 @@ func (t *Tracker) IncrementActiveSeries(lbs labels.Labels, now time.Time) {
t.updateCounters(lbs, now.Unix(), 1, 0, 0, nil)
}

func (t *Tracker) DecrementActiveSeries(lbs labels.Labels, now time.Time) {
func (t *Tracker) DecrementActiveSeries(lbs labels.Labels) {
if t == nil {
return
}
t.updateCounters(lbs, now.Unix(), -1, 0, 0, nil)
t.updateCounters(lbs, -1, -1, 0, 0, nil)
}

func (t *Tracker) Collect(out chan<- prometheus.Metric) {
Expand Down Expand Up @@ -182,6 +187,9 @@ func (t *Tracker) Collect(out chan<- prometheus.Metric) {
o.discardSamplemtx.Unlock()
}
}
if t.totalFailedActiveSeries.Load() > 0 {
out <- prometheus.MustNewConstMetric(t.failedActiveSeriesDecrement, prometheus.CounterValue, t.totalFailedActiveSeries.Load(), t.userID)
}
}

func (t *Tracker) IncrementDiscardedSamples(lbs labels.Labels, value float64, reason string, now time.Time) {
Expand All @@ -198,6 +206,13 @@ func (t *Tracker) IncrementReceivedSamples(lbs labels.Labels, value float64, now
t.updateCounters(lbs, now.Unix(), 0, value, 0, nil)
}

func (t *Tracker) IncrementActiveSeriesFailure(value float64) {
if t == nil {
return
}
t.totalFailedActiveSeries.Add(value)
}

func (t *Tracker) updateCounters(lbls labels.Labels, ts int64, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement float64, reason *string) {
labelValues := make([]string, len(t.caLabels))
lbls.Range(func(l labels.Label) {
Expand Down Expand Up @@ -248,8 +263,11 @@ func (t *Tracker) handleObservation(stream string, ts int64, activeSeriesIncreme
o.discardSamplemtx.Unlock()
}
} else if len(t.observed) < t.maxCardinality*2 {
// Create a new observation for the stream
t.createNewObservation(stream, ts, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement, reason)
// If the ts is negative, it means that the method is called from DecrementActiveSeries, when key doesn't exist we should ignore the call
// Otherwise create a new observation for the stream
if ts >= 0 {
t.createNewObservation(stream, ts, activeSeriesIncrement, receivedSampleIncrement, discardedSampleIncrement, reason)
}
}
}

Expand Down
13 changes: 10 additions & 3 deletions pkg/costattribution/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ func Test_CreateCleanupTracker(t *testing.T) {

cat.IncrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), time.Unix(1, 0))
cat.IncrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "2"), time.Unix(2, 0))
cat.DecrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "3"), time.Unix(3, 0))
cat.DecrementActiveSeries(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "3"))
cat.IncrementReceivedSamples(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), 5, time.Unix(4, 0))
cat.IncrementDiscardedSamples(labels.FromStrings("platform", "foo", "tenant", "user4", "team", "1"), 2, "sample-out-of-order", time.Unix(4, 0))

cat.IncrementActiveSeries(labels.FromStrings("platform", "bar", "tenant", "user4", "team", "2"), time.Unix(6, 0))
cat.IncrementActiveSeriesFailure(1)

expectedMetrics := `
# HELP cortex_discarded_attributed_samples_total The total number of samples that were discarded per attribution.
Expand All @@ -49,6 +49,9 @@ func Test_CreateCleanupTracker(t *testing.T) {
# TYPE cortex_ingester_attributed_active_series gauge
cortex_ingester_attributed_active_series{platform="bar",tenant="user4",tracker="cost-attribution"} 1
cortex_ingester_attributed_active_series{platform="foo",tenant="user4",tracker="cost-attribution"} 1
# HELP cortex_ingester_attributed_active_series_failure The total number of failed active series decrement per user and tracker.
# TYPE cortex_ingester_attributed_active_series_failure counter
cortex_ingester_attributed_active_series_failure{tenant="user4",tracker="cost-attribution"} 1
# HELP cortex_received_attributed_samples_total The total number of samples that were received per attribution.
# TYPE cortex_received_attributed_samples_total counter
cortex_received_attributed_samples_total{platform="foo",tenant="user4",tracker="cost-attribution"} 5
Expand All @@ -58,15 +61,19 @@ func Test_CreateCleanupTracker(t *testing.T) {
"cortex_discarded_attributed_samples_total",
"cortex_received_attributed_samples_total",
"cortex_ingester_attributed_active_series",
"cortex_ingester_attributed_active_series_failure",
}
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...))
assert.Equal(t, []string{"foo"}, cat.InactiveObservations(5))
tManager.purgeInactiveAttributionsUntil(5)
assert.NoError(t, tManager.purgeInactiveAttributionsUntil(5))

expectedMetrics = `
# HELP cortex_ingester_attributed_active_series The total number of active series per user and attribution.
# TYPE cortex_ingester_attributed_active_series gauge
cortex_ingester_attributed_active_series{platform="bar",tenant="user4",tracker="cost-attribution"} 1
# HELP cortex_ingester_attributed_active_series_failure The total number of failed active series decrement per user and tracker.
# TYPE cortex_ingester_attributed_active_series_failure counter
cortex_ingester_attributed_active_series_failure{tenant="user4",tracker="cost-attribution"} 1
`
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...))
tManager.deleteUserTracker("user4")
Expand Down
Loading

0 comments on commit f04c28f

Please sign in to comment.