diff --git a/changelog.d/22077_per_metric_limits_for_tag_cardinality_limit_transform.feature.md b/changelog.d/22077_per_metric_limits_for_tag_cardinality_limit_transform.feature.md new file mode 100644 index 0000000000000..57a96a6f2f471 --- /dev/null +++ b/changelog.d/22077_per_metric_limits_for_tag_cardinality_limit_transform.feature.md @@ -0,0 +1,3 @@ +The `tag_cardinality_limit` transform now supports customizing limits for specific metrics, matched by metric name and optionally its namespace. + +authors: esensar diff --git a/src/transforms/tag_cardinality_limit/config.rs b/src/transforms/tag_cardinality_limit/config.rs index 6e52396656427..3a348e8e287a0 100644 --- a/src/transforms/tag_cardinality_limit/config.rs +++ b/src/transforms/tag_cardinality_limit/config.rs @@ -16,6 +16,22 @@ use vector_lib::configurable::configurable_component; ))] #[derive(Clone, Debug)] pub struct TagCardinalityLimitConfig { + #[serde(flatten)] + pub global: TagCardinalityLimitInnerConfig, + + /// Tag cardinality limits configuration per metric name. + #[configurable( + derived, + metadata(docs::additional_props_description = "An individual metric configuration.") + )] + #[serde(default)] + pub per_metric_limits: HashMap, +} + +/// Configuration for the `tag_cardinality_limit` transform for a specific group of metrics. +#[configurable_component] +#[derive(Clone, Debug)] +pub struct TagCardinalityLimitInnerConfig { /// How many distinct values to accept for any given key. #[serde(default = "default_value_limit")] pub value_limit: usize, @@ -77,6 +93,18 @@ pub enum LimitExceededAction { DropEvent, } +/// Tag cardinality limit configuration per metric name. +#[configurable_component] +#[derive(Clone, Debug)] +pub struct PerMetricConfig { + /// Namespace of the metric this configuration refers to. + #[serde(default)] + pub namespace: Option, + + #[serde(flatten)] + pub config: TagCardinalityLimitInnerConfig, +} + const fn default_limit_exceeded_action() -> LimitExceededAction { LimitExceededAction::DropTag } @@ -92,9 +120,12 @@ pub(crate) const fn default_cache_size() -> usize { impl GenerateConfig for TagCardinalityLimitConfig { fn generate_config() -> toml::Value { toml::Value::try_from(Self { - mode: Mode::Exact, - value_limit: default_value_limit(), - limit_exceeded_action: default_limit_exceeded_action(), + global: TagCardinalityLimitInnerConfig { + mode: Mode::Exact, + value_limit: default_value_limit(), + limit_exceeded_action: default_limit_exceeded_action(), + }, + per_metric_limits: HashMap::default(), }) .unwrap() } diff --git a/src/transforms/tag_cardinality_limit/mod.rs b/src/transforms/tag_cardinality_limit/mod.rs index 14b18b457a6c0..bf543ae86ed0e 100644 --- a/src/transforms/tag_cardinality_limit/mod.rs +++ b/src/transforms/tag_cardinality_limit/mod.rs @@ -19,13 +19,15 @@ mod tag_value_set; mod tests; use crate::event::metric::TagValueSet; -pub use config::TagCardinalityLimitConfig; +pub use config::{TagCardinalityLimitConfig, TagCardinalityLimitInnerConfig}; use tag_value_set::AcceptedTagValueSet; +type MetricId = (Option, String); + #[derive(Debug)] pub struct TagCardinalityLimit { config: TagCardinalityLimitConfig, - accepted_tags: HashMap, + accepted_tags: HashMap, HashMap>, } impl TagCardinalityLimit { @@ -36,6 +38,24 @@ impl TagCardinalityLimit { } } + fn get_config_for_metric( + &self, + metric_key: Option<&MetricId>, + ) -> &TagCardinalityLimitInnerConfig { + match metric_key { + Some(id) => self + .config + .per_metric_limits + .iter() + .find(|(name, config)| { + **name == id.1 && (config.namespace.is_none() || config.namespace == id.0) + }) + .map(|(_, c)| &c.config) + .unwrap_or(&self.config.global), + None => &self.config.global, + } + } + /// Takes in key and a value corresponding to a tag on an incoming Metric /// Event. If that value is already part of set of accepted values for that /// key, then simply returns true. If that value is not yet part of the @@ -44,10 +64,17 @@ impl TagCardinalityLimit { /// for the key and returns true, otherwise returns false. A false return /// value indicates to the caller that the value is not accepted for this /// key, and the configured limit_exceeded_action should be taken. - fn try_accept_tag(&mut self, key: &str, value: &TagValueSet) -> bool { - let tag_value_set = self.accepted_tags.entry_ref(key).or_insert_with(|| { - AcceptedTagValueSet::new(self.config.value_limit, &self.config.mode) - }); + fn try_accept_tag( + &mut self, + metric_key: Option<&MetricId>, + key: &str, + value: &TagValueSet, + ) -> bool { + let config = self.get_config_for_metric(metric_key).clone(); + let metric_accepted_tags = self.accepted_tags.entry(metric_key.cloned()).or_default(); + let tag_value_set = metric_accepted_tags + .entry_ref(key) + .or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode)); if tag_value_set.contains(value) { // Tag value has already been accepted, nothing more to do. @@ -55,11 +82,11 @@ impl TagCardinalityLimit { } // Tag value not yet part of the accepted set. - if tag_value_set.len() < self.config.value_limit { + if tag_value_set.len() < config.value_limit { // accept the new value tag_value_set.insert(value.clone()); - if tag_value_set.len() == self.config.value_limit { + if tag_value_set.len() == config.value_limit { emit!(TagCardinalityValueLimitReached { key }); } @@ -72,34 +99,57 @@ impl TagCardinalityLimit { /// Checks if recording a key and value corresponding to a tag on an incoming Metric would /// exceed the cardinality limit. - fn tag_limit_exceeded(&self, key: &str, value: &TagValueSet) -> bool { + fn tag_limit_exceeded( + &self, + metric_key: Option<&MetricId>, + key: &str, + value: &TagValueSet, + ) -> bool { self.accepted_tags - .get(key) - .map(|value_set| { - !value_set.contains(value) && value_set.len() >= self.config.value_limit + .get(&metric_key.cloned()) + .and_then(|metric_accepted_tags| { + metric_accepted_tags.get(key).map(|value_set| { + !value_set.contains(value) + && value_set.len() >= self.get_config_for_metric(metric_key).value_limit + }) }) .unwrap_or(false) } /// Record a key and value corresponding to a tag on an incoming Metric. - fn record_tag_value(&mut self, key: &str, value: &TagValueSet) { - self.accepted_tags + fn record_tag_value(&mut self, metric_key: Option<&MetricId>, key: &str, value: &TagValueSet) { + let config = self.get_config_for_metric(metric_key).clone(); + let metric_accepted_tags = self.accepted_tags.entry(metric_key.cloned()).or_default(); + metric_accepted_tags .entry_ref(key) - .or_insert_with(|| AcceptedTagValueSet::new(self.config.value_limit, &self.config.mode)) + .or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode)) .insert(value.clone()); } fn transform_one(&mut self, mut event: Event) -> Option { let metric = event.as_mut_metric(); let metric_name = metric.name().to_string(); + let metric_namespace = metric.namespace().map(|n| n.to_string()); + let has_per_metric_config = self.config.per_metric_limits.iter().any(|(name, config)| { + *name == metric_name + && (config.namespace.is_none() || config.namespace == metric_namespace) + }); + let metric_key = if has_per_metric_config { + Some((metric_namespace, metric_name.clone())) + } else { + None + }; if let Some(tags_map) = metric.tags_mut() { - match self.config.limit_exceeded_action { + match self + .get_config_for_metric(metric_key.as_ref()) + .limit_exceeded_action + { LimitExceededAction::DropEvent => { // This needs to check all the tags, to ensure that the ordering of tag names // doesn't change the behavior of the check. for (key, value) in tags_map.iter_sets() { - if self.tag_limit_exceeded(key, value) { + if self.tag_limit_exceeded(metric_key.as_ref(), key, value) { emit!(TagCardinalityLimitRejectingEvent { metric_name: &metric_name, tag_key: key, @@ -109,12 +159,12 @@ impl TagCardinalityLimit { } } for (key, value) in tags_map.iter_sets() { - self.record_tag_value(key, value); + self.record_tag_value(metric_key.as_ref(), key, value); } } LimitExceededAction::DropTag => { tags_map.retain(|key, value| { - if self.try_accept_tag(key, value) { + if self.try_accept_tag(metric_key.as_ref(), key, value) { true } else { emit!(TagCardinalityLimitRejectingTag { diff --git a/src/transforms/tag_cardinality_limit/tests.rs b/src/transforms/tag_cardinality_limit/tests.rs index d4e63b0820ec8..8e573cc959bd7 100644 --- a/src/transforms/tag_cardinality_limit/tests.rs +++ b/src/transforms/tag_cardinality_limit/tests.rs @@ -1,5 +1,7 @@ +use std::collections::HashMap; use std::sync::Arc; +use config::PerMetricConfig; use vector_lib::config::ComponentKey; use vector_lib::config::OutputId; use vector_lib::event::EventMetadata; @@ -24,12 +26,12 @@ fn generate_config() { crate::test_util::test_generate_config::(); } -fn make_metric(tags: MetricTags) -> Event { +fn make_metric_with_name(tags: MetricTags, name: &str) -> Event { let event_metadata = EventMetadata::default().with_source_type("unit_test_stream"); Event::Metric( Metric::new_with_metadata( - "event", + name, metric::MetricKind::Incremental, metric::MetricValue::Counter { value: 1.0 }, event_metadata, @@ -38,27 +40,69 @@ fn make_metric(tags: MetricTags) -> Event { ) } -const fn make_transform_hashset( +fn make_metric(tags: MetricTags) -> Event { + make_metric_with_name(tags, "event") +} + +fn make_transform_hashset( + value_limit: usize, + limit_exceeded_action: LimitExceededAction, +) -> TagCardinalityLimitConfig { + TagCardinalityLimitConfig { + global: TagCardinalityLimitInnerConfig { + value_limit, + limit_exceeded_action, + mode: Mode::Exact, + }, + per_metric_limits: HashMap::new(), + } +} + +fn make_transform_bloom( + value_limit: usize, + limit_exceeded_action: LimitExceededAction, +) -> TagCardinalityLimitConfig { + TagCardinalityLimitConfig { + global: TagCardinalityLimitInnerConfig { + value_limit, + limit_exceeded_action, + mode: Mode::Probabilistic(BloomFilterConfig { + cache_size_per_key: default_cache_size(), + }), + }, + per_metric_limits: HashMap::new(), + } +} + +const fn make_transform_hashset_with_per_metric_limits( value_limit: usize, limit_exceeded_action: LimitExceededAction, + per_metric_limits: HashMap, ) -> TagCardinalityLimitConfig { TagCardinalityLimitConfig { - value_limit, - limit_exceeded_action, - mode: Mode::Exact, + global: TagCardinalityLimitInnerConfig { + value_limit, + limit_exceeded_action, + mode: Mode::Exact, + }, + per_metric_limits, } } -const fn make_transform_bloom( +const fn make_transform_bloom_with_per_metric_limits( value_limit: usize, limit_exceeded_action: LimitExceededAction, + per_metric_limits: HashMap, ) -> TagCardinalityLimitConfig { TagCardinalityLimitConfig { - value_limit, - limit_exceeded_action, - mode: Mode::Probabilistic(BloomFilterConfig { - cache_size_per_key: default_cache_size(), - }), + global: TagCardinalityLimitInnerConfig { + value_limit, + limit_exceeded_action, + mode: Mode::Probabilistic(BloomFilterConfig { + cache_size_per_key: default_cache_size(), + }), + }, + per_metric_limits, } } @@ -359,3 +403,186 @@ fn drop_event_checks_all_tags(make_tags: impl Fn(&str, &str) -> MetricTags) { assert_eq!(new_event3, None); assert_eq!(new_event4, Some(event4)); } + +#[tokio::test] +async fn tag_cardinality_limit_separate_value_limit_per_metric_name_hashset() { + separate_value_limit_per_metric_name(make_transform_hashset_with_per_metric_limits( + 2, + LimitExceededAction::DropTag, + HashMap::from([ + ( + "metricA".to_string(), + PerMetricConfig { + namespace: None, + config: make_transform_hashset(1, LimitExceededAction::DropTag).global, + }, + ), + ( + "metricB".to_string(), + PerMetricConfig { + namespace: None, + config: make_transform_hashset(5, LimitExceededAction::DropTag).global, + }, + ), + ]), + )) + .await; +} + +#[tokio::test] +async fn tag_cardinality_limit_separate_value_limit_per_metric_name_bloom() { + separate_value_limit_per_metric_name(make_transform_bloom_with_per_metric_limits( + 2, + LimitExceededAction::DropTag, + HashMap::from([ + ( + "metricA".to_string(), + PerMetricConfig { + namespace: None, + config: make_transform_bloom(1, LimitExceededAction::DropTag).global, + }, + ), + ( + "metricB".to_string(), + PerMetricConfig { + namespace: None, + config: make_transform_bloom(5, LimitExceededAction::DropTag).global, + }, + ), + ]), + )) + .await; +} + +/// Test that hitting the value limit on one tag does not affect the ability to take new +/// values for other tags. +async fn separate_value_limit_per_metric_name(config: TagCardinalityLimitConfig) { + assert_transform_compliance(async move { + let mut event_a1 = + make_metric_with_name(metric_tags!("tag1" => "val1", "tag2" => "val1"), "metricA"); + + // The limit for tag1 should already be reached here + let mut event_a2 = + make_metric_with_name(metric_tags!("tag1" => "val2", "tag2" => "val1"), "metricA"); + + // The limit for tag2 should be reached here + let mut event_a3 = + make_metric_with_name(metric_tags!("tag1" => "val1", "tag2" => "val2"), "metricA"); + + // MetricB should have all of its tags kept due to higher limit + let mut event_b1 = + make_metric_with_name(metric_tags!("tag1" => "val1", "tag2" => "val1"), "metricB"); + let mut event_b2 = + make_metric_with_name(metric_tags!("tag1" => "val2", "tag2" => "val1"), "metricB"); + let mut event_b3 = + make_metric_with_name(metric_tags!("tag1" => "val1", "tag2" => "val2"), "metricB"); + + // MetricC has no specific config, so it uses the global config, which allows 2 values + let mut event_c1 = + make_metric_with_name(metric_tags!("tag1" => "val1", "tag2" => "val1"), "metricC"); + let mut event_c2 = + make_metric_with_name(metric_tags!("tag1" => "val2", "tag2" => "val2"), "metricC"); + // The limit for tag2 should be reached here + let mut event_c3 = + make_metric_with_name(metric_tags!("tag1" => "val1", "tag2" => "val3"), "metricC"); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await; + + let events = vec![ + &mut event_a1, + &mut event_a2, + &mut event_a3, + &mut event_b1, + &mut event_b2, + &mut event_b3, + &mut event_c1, + &mut event_c2, + &mut event_c3, + ]; + + for event in &events { + tx.send((*event).clone()).await.unwrap(); + } + + let new_event_a1 = out.recv().await; + let new_event_a2 = out.recv().await; + let new_event_a3 = out.recv().await; + let new_event_b1 = out.recv().await; + let new_event_b2 = out.recv().await; + let new_event_b3 = out.recv().await; + let new_event_c1 = out.recv().await; + let new_event_c2 = out.recv().await; + let new_event_c3 = out.recv().await; + + drop(tx); + topology.stop().await; + + for event in events { + event.set_source_id(Arc::new(ComponentKey::from("in"))); + event.set_upstream_id(Arc::new(OutputId::from("transform"))); + event.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + } + + assert_eq!(new_event_a1, Some(event_a1)); + // The second event should have been modified to remove "tag1" + let new_event_a2 = new_event_a2.unwrap(); + assert!(!new_event_a2 + .as_metric() + .tags() + .unwrap() + .contains_key("tag1")); + assert_eq!( + "val1", + new_event_a2 + .as_metric() + .tags() + .unwrap() + .get("tag2") + .unwrap() + ); + + // The third event should have been modified to remove "tag2" + let new_event_a3 = new_event_a3.unwrap(); + assert!(!new_event_a3 + .as_metric() + .tags() + .unwrap() + .contains_key("tag2")); + assert_eq!( + "val1", + new_event_a3 + .as_metric() + .tags() + .unwrap() + .get("tag1") + .unwrap() + ); + + assert_eq!(new_event_b1, Some(event_b1)); + assert_eq!(new_event_b2, Some(event_b2)); + assert_eq!(new_event_b3, Some(event_b3)); + + assert_eq!(new_event_c1, Some(event_c1)); + assert_eq!(new_event_c2, Some(event_c2)); + // The third event should have been modified to remove "tag2" + let new_event_c3 = new_event_c3.unwrap(); + assert!(!new_event_c3 + .as_metric() + .tags() + .unwrap() + .contains_key("tag2")); + assert_eq!( + "val1", + new_event_c3 + .as_metric() + .tags() + .unwrap() + .get("tag1") + .unwrap() + ); + }) + .await; +} diff --git a/website/cue/reference/components/transforms/base/tag_cardinality_limit.cue b/website/cue/reference/components/transforms/base/tag_cardinality_limit.cue index 8e961d886ba92..5680d03c2c3d3 100644 --- a/website/cue/reference/components/transforms/base/tag_cardinality_limit.cue +++ b/website/cue/reference/components/transforms/base/tag_cardinality_limit.cue @@ -46,6 +46,71 @@ base: components: transforms: tag_cardinality_limit: configuration: { """ } } + per_metric_limits: { + description: "Tag cardinality limits configuration per metric name." + required: false + type: object: options: "*": { + description: "An individual metric configuration." + required: true + type: object: options: { + cache_size_per_key: { + description: """ + The size of the cache for detecting duplicate tags, in bytes. + + The larger the cache size, the less likely it is to have a false positive, or a case where + we allow a new value for tag even after we have reached the configured limits. + """ + relevant_when: "mode = \"probabilistic\"" + required: false + type: uint: default: 5120 + } + limit_exceeded_action: { + description: """ + Possible actions to take when an event arrives that would exceed the cardinality limit for one + or more of its tags. + """ + required: false + type: string: { + default: "drop_tag" + enum: { + drop_event: "Drop the entire event itself." + drop_tag: "Drop the tag(s) that would exceed the configured limit." + } + } + } + mode: { + description: "Controls the approach taken for tracking tag cardinality." + required: true + type: string: enum: { + exact: """ + Tracks cardinality exactly. + + This mode has higher memory requirements than `probabilistic`, but never falsely outputs + metrics with new tags after the limit has been hit. + """ + probabilistic: """ + Tracks cardinality probabilistically. + + This mode has lower memory requirements than `exact`, but may occasionally allow metric + events to pass through the transform even when they contain new tags that exceed the + configured limit. The rate at which this happens can be controlled by changing the value of + `cache_size_per_key`. + """ + } + } + namespace: { + description: "Namespace of the metric this configuration refers to." + required: false + type: string: {} + } + value_limit: { + description: "How many distinct values to accept for any given key." + required: false + type: uint: default: 500 + } + } + } + } value_limit: { description: "How many distinct values to accept for any given key." required: false