From c2fcd7a6433d461d41f53e9360ab33d6da9a2092 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Mon, 23 Dec 2024 17:25:18 +0100 Subject: [PATCH 1/7] feat(tag_cardinality_limit transform): enable per metric limits for `tag_cardinality_limit` This adds the ability to add per metric limits in `tag_cardinality_limit` transform, besides the global configuration that applies to all metrics. It supports matching metrics by name and optionally by namespace too. Closes: #15743 --- .../tag_cardinality_limit/config.rs | 37 ++- src/transforms/tag_cardinality_limit/mod.rs | 88 +++++-- src/transforms/tag_cardinality_limit/tests.rs | 238 +++++++++++++++++- 3 files changed, 331 insertions(+), 32 deletions(-) diff --git a/src/transforms/tag_cardinality_limit/config.rs b/src/transforms/tag_cardinality_limit/config.rs index 6e52396656427..237520b42b83f 100644 --- a/src/transforms/tag_cardinality_limit/config.rs +++ b/src/transforms/tag_cardinality_limit/config.rs @@ -16,6 +16,19 @@ use vector_lib::configurable::configurable_component; ))] #[derive(Clone, Debug)] pub struct TagCardinalityLimitConfig { + #[serde(flatten)] + pub global: TagCardinalityLimitInnerConfig, + + /// Tag cardinality limits configuration per metric name. + #[configurable(derived)] + #[serde(default)] + pub per_metric_limits: Vec, +} + +/// Configuration for the `tag_cardinality_limit` transform for a specific group of metrics. +#[configurable_component] +#[derive(Clone, Debug)] +pub struct TagCardinalityLimitInnerConfig { /// How many distinct values to accept for any given key. #[serde(default = "default_value_limit")] pub value_limit: usize, @@ -77,6 +90,21 @@ pub enum LimitExceededAction { DropEvent, } +/// Tag cardinality limit configuration per metric name. +#[configurable_component] +#[derive(Clone, Debug)] +pub struct PerMetricConfig { + /// Name of the metric this configuration refers to. + pub name: String, + + /// Namespace of the metric this configuration refers to. 
+ #[serde(default)] + pub namespace: Option, + + #[serde(flatten)] + pub config: TagCardinalityLimitInnerConfig, +} + const fn default_limit_exceeded_action() -> LimitExceededAction { LimitExceededAction::DropTag } @@ -92,9 +120,12 @@ pub(crate) const fn default_cache_size() -> usize { impl GenerateConfig for TagCardinalityLimitConfig { fn generate_config() -> toml::Value { toml::Value::try_from(Self { - mode: Mode::Exact, - value_limit: default_value_limit(), - limit_exceeded_action: default_limit_exceeded_action(), + global: TagCardinalityLimitInnerConfig { + mode: Mode::Exact, + value_limit: default_value_limit(), + limit_exceeded_action: default_limit_exceeded_action(), + }, + per_metric_limits: Vec::default(), }) .unwrap() } diff --git a/src/transforms/tag_cardinality_limit/mod.rs b/src/transforms/tag_cardinality_limit/mod.rs index 14b18b457a6c0..5156cb0c48a84 100644 --- a/src/transforms/tag_cardinality_limit/mod.rs +++ b/src/transforms/tag_cardinality_limit/mod.rs @@ -19,13 +19,15 @@ mod tag_value_set; mod tests; use crate::event::metric::TagValueSet; -pub use config::TagCardinalityLimitConfig; +pub use config::{TagCardinalityLimitConfig, TagCardinalityLimitInnerConfig}; use tag_value_set::AcceptedTagValueSet; +type MetricId = (Option, String); + #[derive(Debug)] pub struct TagCardinalityLimit { config: TagCardinalityLimitConfig, - accepted_tags: HashMap, + accepted_tags: HashMap, HashMap>, } impl TagCardinalityLimit { @@ -36,6 +38,22 @@ impl TagCardinalityLimit { } } + fn get_config_for_metric( + &self, + metric_key: &Option, + ) -> &TagCardinalityLimitInnerConfig { + match metric_key { + Some(id) => self + .config + .per_metric_limits + .iter() + .find(|c| c.name == id.1 && (c.namespace.is_none() || c.namespace == id.0)) + .map(|c| &c.config) + .unwrap_or(&self.config.global), + None => &self.config.global, + } + } + /// Takes in key and a value corresponding to a tag on an incoming Metric /// Event. 
If that value is already part of set of accepted values for that /// key, then simply returns true. If that value is not yet part of the @@ -44,10 +62,18 @@ impl TagCardinalityLimit { /// for the key and returns true, otherwise returns false. A false return /// value indicates to the caller that the value is not accepted for this /// key, and the configured limit_exceeded_action should be taken. - fn try_accept_tag(&mut self, key: &str, value: &TagValueSet) -> bool { - let tag_value_set = self.accepted_tags.entry_ref(key).or_insert_with(|| { - AcceptedTagValueSet::new(self.config.value_limit, &self.config.mode) - }); + fn try_accept_tag( + &mut self, + metric_key: &Option, + key: &str, + value: &TagValueSet, + ) -> bool { + let config = self.get_config_for_metric(metric_key).clone(); + info!("try_accept_tag using config: {:?}", config); + let metric_accepted_tags = self.accepted_tags.entry(metric_key.clone()).or_default(); + let tag_value_set = metric_accepted_tags + .entry_ref(key) + .or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode)); if tag_value_set.contains(value) { // Tag value has already been accepted, nothing more to do. @@ -55,11 +81,11 @@ impl TagCardinalityLimit { } // Tag value not yet part of the accepted set. - if tag_value_set.len() < self.config.value_limit { + if tag_value_set.len() < config.value_limit { // accept the new value tag_value_set.insert(value.clone()); - if tag_value_set.len() == self.config.value_limit { + if tag_value_set.len() == config.value_limit { emit!(TagCardinalityValueLimitReached { key }); } @@ -72,34 +98,58 @@ impl TagCardinalityLimit { /// Checks if recording a key and value corresponding to a tag on an incoming Metric would /// exceed the cardinality limit. 
- fn tag_limit_exceeded(&self, key: &str, value: &TagValueSet) -> bool { + fn tag_limit_exceeded( + &self, + metric_key: &Option, + key: &str, + value: &TagValueSet, + ) -> bool { self.accepted_tags - .get(key) - .map(|value_set| { - !value_set.contains(value) && value_set.len() >= self.config.value_limit + .get(metric_key) + .and_then(|metric_accepted_tags| { + metric_accepted_tags.get(key).map(|value_set| { + !value_set.contains(value) + && value_set.len() >= self.get_config_for_metric(metric_key).value_limit + }) }) .unwrap_or(false) } /// Record a key and value corresponding to a tag on an incoming Metric. - fn record_tag_value(&mut self, key: &str, value: &TagValueSet) { - self.accepted_tags + fn record_tag_value(&mut self, metric_key: &Option, key: &str, value: &TagValueSet) { + let config = self.get_config_for_metric(metric_key).clone(); + let metric_accepted_tags = self.accepted_tags.entry(metric_key.clone()).or_default(); + metric_accepted_tags .entry_ref(key) - .or_insert_with(|| AcceptedTagValueSet::new(self.config.value_limit, &self.config.mode)) + .or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode)) .insert(value.clone()); } fn transform_one(&mut self, mut event: Event) -> Option { let metric = event.as_mut_metric(); let metric_name = metric.name().to_string(); + let metric_namespace = metric.namespace().map(|n| n.to_string()); + info!("The config: {:?}", self.config); + let has_per_metric_config = self.config.per_metric_limits.iter().any(|c| { + c.name == metric_name && (c.namespace.is_none() || c.namespace == metric_namespace) + }); + let metric_key = if has_per_metric_config { + info!("Metric specific config has been found!"); + Some((metric_namespace, metric_name.clone())) + } else { + None + }; if let Some(tags_map) = metric.tags_mut() { - match self.config.limit_exceeded_action { + match self + .get_config_for_metric(&metric_key) + .limit_exceeded_action + { LimitExceededAction::DropEvent => { // This needs to check all 
the tags, to ensure that the ordering of tag names // doesn't change the behavior of the check. for (key, value) in tags_map.iter_sets() { - if self.tag_limit_exceeded(key, value) { + if self.tag_limit_exceeded(&metric_key, key, value) { emit!(TagCardinalityLimitRejectingEvent { metric_name: &metric_name, tag_key: key, @@ -109,12 +159,12 @@ impl TagCardinalityLimit { } } for (key, value) in tags_map.iter_sets() { - self.record_tag_value(key, value); + self.record_tag_value(&metric_key, key, value); } } LimitExceededAction::DropTag => { tags_map.retain(|key, value| { - if self.try_accept_tag(key, value) { + if self.try_accept_tag(&metric_key, key, value) { true } else { emit!(TagCardinalityLimitRejectingTag { diff --git a/src/transforms/tag_cardinality_limit/tests.rs b/src/transforms/tag_cardinality_limit/tests.rs index d4e63b0820ec8..d8bb4f974c012 100644 --- a/src/transforms/tag_cardinality_limit/tests.rs +++ b/src/transforms/tag_cardinality_limit/tests.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use config::PerMetricConfig; use vector_lib::config::ComponentKey; use vector_lib::config::OutputId; use vector_lib::event::EventMetadata; @@ -24,12 +25,12 @@ fn generate_config() { crate::test_util::test_generate_config::(); } -fn make_metric(tags: MetricTags) -> Event { +fn make_metric_with_name(tags: MetricTags, name: &str) -> Event { let event_metadata = EventMetadata::default().with_source_type("unit_test_stream"); Event::Metric( Metric::new_with_metadata( - "event", + name, metric::MetricKind::Incremental, metric::MetricValue::Counter { value: 1.0 }, event_metadata, @@ -38,14 +39,21 @@ fn make_metric(tags: MetricTags) -> Event { ) } +fn make_metric(tags: MetricTags) -> Event { + make_metric_with_name(tags, "event") +} + const fn make_transform_hashset( value_limit: usize, limit_exceeded_action: LimitExceededAction, ) -> TagCardinalityLimitConfig { TagCardinalityLimitConfig { - value_limit, - limit_exceeded_action, - mode: Mode::Exact, + global: 
TagCardinalityLimitInnerConfig { + value_limit, + limit_exceeded_action, + mode: Mode::Exact, + }, + per_metric_limits: Vec::new(), } } @@ -54,11 +62,46 @@ const fn make_transform_bloom( limit_exceeded_action: LimitExceededAction, ) -> TagCardinalityLimitConfig { TagCardinalityLimitConfig { - value_limit, - limit_exceeded_action, - mode: Mode::Probabilistic(BloomFilterConfig { - cache_size_per_key: default_cache_size(), - }), + global: TagCardinalityLimitInnerConfig { + value_limit, + limit_exceeded_action, + mode: Mode::Probabilistic(BloomFilterConfig { + cache_size_per_key: default_cache_size(), + }), + }, + per_metric_limits: Vec::new(), + } +} + +const fn make_transform_hashset_with_per_metric_limits( + value_limit: usize, + limit_exceeded_action: LimitExceededAction, + per_metric_limits: Vec, +) -> TagCardinalityLimitConfig { + TagCardinalityLimitConfig { + global: TagCardinalityLimitInnerConfig { + value_limit, + limit_exceeded_action, + mode: Mode::Exact, + }, + per_metric_limits, + } +} + +const fn make_transform_bloom_with_per_metric_limits( + value_limit: usize, + limit_exceeded_action: LimitExceededAction, + per_metric_limits: Vec, +) -> TagCardinalityLimitConfig { + TagCardinalityLimitConfig { + global: TagCardinalityLimitInnerConfig { + value_limit, + limit_exceeded_action, + mode: Mode::Probabilistic(BloomFilterConfig { + cache_size_per_key: default_cache_size(), + }), + }, + per_metric_limits, } } @@ -359,3 +402,178 @@ fn drop_event_checks_all_tags(make_tags: impl Fn(&str, &str) -> MetricTags) { assert_eq!(new_event3, None); assert_eq!(new_event4, Some(event4)); } + +#[tokio::test] +async fn tag_cardinality_limit_separate_value_limit_per_metric_name_hashset() { + separate_value_limit_per_metric_name(make_transform_hashset_with_per_metric_limits( + 2, + LimitExceededAction::DropTag, + vec![ + PerMetricConfig { + name: "metricA".to_string(), + namespace: None, + config: make_transform_hashset(1, LimitExceededAction::DropTag).global, + }, + 
PerMetricConfig { + name: "metricB".to_string(), + namespace: None, + config: make_transform_hashset(5, LimitExceededAction::DropTag).global, + }, + ], + )) + .await; +} + +#[tokio::test] +async fn tag_cardinality_limit_separate_value_limit_per_metric_name_bloom() { + separate_value_limit_per_metric_name(make_transform_bloom_with_per_metric_limits( + 2, + LimitExceededAction::DropTag, + vec![ + PerMetricConfig { + name: "metricA".to_string(), + namespace: None, + config: make_transform_bloom(1, LimitExceededAction::DropTag).global, + }, + PerMetricConfig { + name: "metricB".to_string(), + namespace: None, + config: make_transform_bloom(5, LimitExceededAction::DropTag).global, + }, + ], + )) + .await; +} + +/// Test that each metric with a per-metric configuration is limited by its own `value_limit`, +/// while metrics without one fall back to the global limit. +async fn separate_value_limit_per_metric_name(config: TagCardinalityLimitConfig) { + assert_transform_compliance(async move { + let mut event_a1 = + make_metric_with_name(metric_tags!("tag1" => "val1", "tag2" => "val1"), "metricA"); + + // The limit for tag1 should already be reached here + let mut event_a2 = + make_metric_with_name(metric_tags!("tag1" => "val2", "tag2" => "val1"), "metricA"); + + // The limit for tag2 should be reached here + let mut event_a3 = + make_metric_with_name(metric_tags!("tag1" => "val1", "tag2" => "val2"), "metricA"); + + // MetricB should have all of its tags kept due to higher limit + let mut event_b1 = + make_metric_with_name(metric_tags!("tag1" => "val1", "tag2" => "val1"), "metricB"); + let mut event_b2 = + make_metric_with_name(metric_tags!("tag1" => "val2", "tag2" => "val1"), "metricB"); + let mut event_b3 = + make_metric_with_name(metric_tags!("tag1" => "val1", "tag2" => "val2"), "metricB"); + + // MetricC has no specific config, so it uses the global config, which allows 2 values + let mut event_c1 = + make_metric_with_name(metric_tags!("tag1" => "val1", "tag2" => "val1"), "metricC"); + let mut event_c2 = +
make_metric_with_name(metric_tags!("tag1" => "val2", "tag2" => "val2"), "metricC"); + // The limit for tag2 should be reached here + let mut event_c3 = + make_metric_with_name(metric_tags!("tag1" => "val1", "tag2" => "val3"), "metricC"); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await; + + let events = vec![ + &mut event_a1, + &mut event_a2, + &mut event_a3, + &mut event_b1, + &mut event_b2, + &mut event_b3, + &mut event_c1, + &mut event_c2, + &mut event_c3, + ]; + + for event in &events { + tx.send((*event).clone()).await.unwrap(); + } + + let new_event_a1 = out.recv().await; + let new_event_a2 = out.recv().await; + let new_event_a3 = out.recv().await; + let new_event_b1 = out.recv().await; + let new_event_b2 = out.recv().await; + let new_event_b3 = out.recv().await; + let new_event_c1 = out.recv().await; + let new_event_c2 = out.recv().await; + let new_event_c3 = out.recv().await; + + drop(tx); + topology.stop().await; + + for event in events { + event.set_source_id(Arc::new(ComponentKey::from("in"))); + event.set_upstream_id(Arc::new(OutputId::from("transform"))); + event.metadata_mut().set_schema_definition(&Arc::new( + Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]), + )); + } + + assert_eq!(new_event_a1, Some(event_a1)); + // The second event should have been modified to remove "tag1" + let new_event_a2 = new_event_a2.unwrap(); + assert!(!new_event_a2 + .as_metric() + .tags() + .unwrap() + .contains_key("tag1")); + assert_eq!( + "val1", + new_event_a2 + .as_metric() + .tags() + .unwrap() + .get("tag2") + .unwrap() + ); + + // The third event should have been modified to remove "tag2" + let new_event_a3 = new_event_a3.unwrap(); + assert!(!new_event_a3 + .as_metric() + .tags() + .unwrap() + .contains_key("tag2")); + assert_eq!( + "val1", + new_event_a3 + .as_metric() + .tags() + .unwrap() + .get("tag1") + .unwrap() + ); + + assert_eq!(new_event_b1, 
Some(event_b1)); + assert_eq!(new_event_b2, Some(event_b2)); + assert_eq!(new_event_b3, Some(event_b3)); + + assert_eq!(new_event_c1, Some(event_c1)); + assert_eq!(new_event_c2, Some(event_c2)); + // The third event should have been modified to remove "tag2" + let new_event_c3 = new_event_c3.unwrap(); + assert!(!new_event_c3 + .as_metric() + .tags() + .unwrap() + .contains_key("tag2")); + assert_eq!( + "val1", + new_event_c3 + .as_metric() + .tags() + .unwrap() + .get("tag1") + .unwrap() + ); + }) + .await; +} From 880a43570c59461629da3a33089a858cc261d4f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Mon, 23 Dec 2024 17:33:03 +0100 Subject: [PATCH 2/7] Add changelog entry --- ...etric_limits_for_tag_cardinality_limit_transform.feature.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog.d/22077_per_metric_limits_for_tag_cardinality_limit_transform.feature.md diff --git a/changelog.d/22077_per_metric_limits_for_tag_cardinality_limit_transform.feature.md b/changelog.d/22077_per_metric_limits_for_tag_cardinality_limit_transform.feature.md new file mode 100644 index 0000000000000..5abea4c17ada0 --- /dev/null +++ b/changelog.d/22077_per_metric_limits_for_tag_cardinality_limit_transform.feature.md @@ -0,0 +1,3 @@ +The `tag_cardinality_limit` transform now supports customizing limits for specific matrics, matched by metric name and optionally its namespace. 
+ +authors: esensar From 803043a85761d7552febdbc41acc107da7175ef1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Mon, 23 Dec 2024 17:39:03 +0100 Subject: [PATCH 3/7] Fix typo in changelog entry --- ...metric_limits_for_tag_cardinality_limit_transform.feature.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/22077_per_metric_limits_for_tag_cardinality_limit_transform.feature.md b/changelog.d/22077_per_metric_limits_for_tag_cardinality_limit_transform.feature.md index 5abea4c17ada0..57a96a6f2f471 100644 --- a/changelog.d/22077_per_metric_limits_for_tag_cardinality_limit_transform.feature.md +++ b/changelog.d/22077_per_metric_limits_for_tag_cardinality_limit_transform.feature.md @@ -1,3 +1,3 @@ -The `tag_cardinality_limit` transform now supports customizing limits for specific matrics, matched by metric name and optionally its namespace. +The `tag_cardinality_limit` transform now supports customizing limits for specific metrics, matched by metric name and optionally its namespace. authors: esensar From 8adbcf0629d49ae91826b51021256d319d96d850 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Mon, 6 Jan 2025 11:59:33 +0100 Subject: [PATCH 4/7] Generate docs using `make generate-component-docs` --- .../transforms/base/tag_cardinality_limit.cue | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/website/cue/reference/components/transforms/base/tag_cardinality_limit.cue b/website/cue/reference/components/transforms/base/tag_cardinality_limit.cue index 8e961d886ba92..96d5d407e2f61 100644 --- a/website/cue/reference/components/transforms/base/tag_cardinality_limit.cue +++ b/website/cue/reference/components/transforms/base/tag_cardinality_limit.cue @@ -46,6 +46,75 @@ base: components: transforms: tag_cardinality_limit: configuration: { """ } } + per_metric_limits: { + description: "Tag cardinality limits configuration per metric name." 
+ required: false + type: array: { + default: [] + items: type: object: options: { + cache_size_per_key: { + description: """ + The size of the cache for detecting duplicate tags, in bytes. + + The larger the cache size, the less likely it is to have a false positive, or a case where + we allow a new value for tag even after we have reached the configured limits. + """ + relevant_when: "mode = \"probabilistic\"" + required: false + type: uint: default: 5120 + } + limit_exceeded_action: { + description: """ + Possible actions to take when an event arrives that would exceed the cardinality limit for one + or more of its tags. + """ + required: false + type: string: { + default: "drop_tag" + enum: { + drop_event: "Drop the entire event itself." + drop_tag: "Drop the tag(s) that would exceed the configured limit." + } + } + } + mode: { + description: "Controls the approach taken for tracking tag cardinality." + required: true + type: string: enum: { + exact: """ + Tracks cardinality exactly. + + This mode has higher memory requirements than `probabilistic`, but never falsely outputs + metrics with new tags after the limit has been hit. + """ + probabilistic: """ + Tracks cardinality probabilistically. + + This mode has lower memory requirements than `exact`, but may occasionally allow metric + events to pass through the transform even when they contain new tags that exceed the + configured limit. The rate at which this happens can be controlled by changing the value of + `cache_size_per_key`. + """ + } + } + name: { + description: "Name of the metric this configuration refers to." + required: true + type: string: {} + } + namespace: { + description: "Namespace of the metric this configuration refers to." + required: false + type: string: {} + } + value_limit: { + description: "How many distinct values to accept for any given key." 
+ required: false + type: uint: default: 500 + } + } + } + } value_limit: { description: "How many distinct values to accept for any given key." required: false From 1a0babe2f5b98b6fbd6a8b1cffd54f7c552899b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Mon, 6 Jan 2025 13:38:32 +0100 Subject: [PATCH 5/7] Use HashMap instead of Vec for `per_metric_limits` --- .../tag_cardinality_limit/config.rs | 12 ++-- src/transforms/tag_cardinality_limit/mod.rs | 11 +-- src/transforms/tag_cardinality_limit/tests.rs | 69 +++++++++++-------- .../transforms/base/tag_cardinality_limit.cue | 32 ++++----- 4 files changed, 66 insertions(+), 58 deletions(-) diff --git a/src/transforms/tag_cardinality_limit/config.rs b/src/transforms/tag_cardinality_limit/config.rs index 237520b42b83f..3a348e8e287a0 100644 --- a/src/transforms/tag_cardinality_limit/config.rs +++ b/src/transforms/tag_cardinality_limit/config.rs @@ -20,9 +20,12 @@ pub struct TagCardinalityLimitConfig { pub global: TagCardinalityLimitInnerConfig, /// Tag cardinality limits configuration per metric name. - #[configurable(derived)] + #[configurable( + derived, + metadata(docs::additional_props_description = "An individual metric configuration.") + )] #[serde(default)] - pub per_metric_limits: Vec, + pub per_metric_limits: HashMap, } /// Configuration for the `tag_cardinality_limit` transform for a specific group of metrics. @@ -94,9 +97,6 @@ pub enum LimitExceededAction { #[configurable_component] #[derive(Clone, Debug)] pub struct PerMetricConfig { - /// Name of the metric this configuration refers to. - pub name: String, - /// Namespace of the metric this configuration refers to. 
#[serde(default)] pub namespace: Option, @@ -125,7 +125,7 @@ impl GenerateConfig for TagCardinalityLimitConfig { value_limit: default_value_limit(), limit_exceeded_action: default_limit_exceeded_action(), }, - per_metric_limits: Vec::default(), + per_metric_limits: HashMap::default(), }) .unwrap() } diff --git a/src/transforms/tag_cardinality_limit/mod.rs b/src/transforms/tag_cardinality_limit/mod.rs index 5156cb0c48a84..f43d4e3efdcd3 100644 --- a/src/transforms/tag_cardinality_limit/mod.rs +++ b/src/transforms/tag_cardinality_limit/mod.rs @@ -47,8 +47,10 @@ impl TagCardinalityLimit { .config .per_metric_limits .iter() - .find(|c| c.name == id.1 && (c.namespace.is_none() || c.namespace == id.0)) - .map(|c| &c.config) + .find(|(name, config)| { + **name == id.1 && (config.namespace.is_none() || config.namespace == id.0) + }) + .map(|(_, c)| &c.config) .unwrap_or(&self.config.global), None => &self.config.global, } @@ -130,8 +132,9 @@ impl TagCardinalityLimit { let metric_name = metric.name().to_string(); let metric_namespace = metric.namespace().map(|n| n.to_string()); info!("The config: {:?}", self.config); - let has_per_metric_config = self.config.per_metric_limits.iter().any(|c| { - c.name == metric_name && (c.namespace.is_none() || c.namespace == metric_namespace) + let has_per_metric_config = self.config.per_metric_limits.iter().any(|(name, config)| { + *name == metric_name + && (config.namespace.is_none() || config.namespace == metric_namespace) }); let metric_key = if has_per_metric_config { info!("Metric specific config has been found!"); diff --git a/src/transforms/tag_cardinality_limit/tests.rs b/src/transforms/tag_cardinality_limit/tests.rs index d8bb4f974c012..8e573cc959bd7 100644 --- a/src/transforms/tag_cardinality_limit/tests.rs +++ b/src/transforms/tag_cardinality_limit/tests.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use config::PerMetricConfig; @@ -43,7 +44,7 @@ fn make_metric(tags: MetricTags) -> Event { 
make_metric_with_name(tags, "event") } -const fn make_transform_hashset( +fn make_transform_hashset( value_limit: usize, limit_exceeded_action: LimitExceededAction, ) -> TagCardinalityLimitConfig { @@ -53,11 +54,11 @@ const fn make_transform_hashset( limit_exceeded_action, mode: Mode::Exact, }, - per_metric_limits: Vec::new(), + per_metric_limits: HashMap::new(), } } -const fn make_transform_bloom( +fn make_transform_bloom( value_limit: usize, limit_exceeded_action: LimitExceededAction, ) -> TagCardinalityLimitConfig { @@ -69,14 +70,14 @@ const fn make_transform_bloom( cache_size_per_key: default_cache_size(), }), }, - per_metric_limits: Vec::new(), + per_metric_limits: HashMap::new(), } } const fn make_transform_hashset_with_per_metric_limits( value_limit: usize, limit_exceeded_action: LimitExceededAction, - per_metric_limits: Vec, + per_metric_limits: HashMap, ) -> TagCardinalityLimitConfig { TagCardinalityLimitConfig { global: TagCardinalityLimitInnerConfig { @@ -91,7 +92,7 @@ const fn make_transform_hashset_with_per_metric_limits( const fn make_transform_bloom_with_per_metric_limits( value_limit: usize, limit_exceeded_action: LimitExceededAction, - per_metric_limits: Vec, + per_metric_limits: HashMap, ) -> TagCardinalityLimitConfig { TagCardinalityLimitConfig { global: TagCardinalityLimitInnerConfig { @@ -408,18 +409,22 @@ async fn tag_cardinality_limit_separate_value_limit_per_metric_name_hashset() { separate_value_limit_per_metric_name(make_transform_hashset_with_per_metric_limits( 2, LimitExceededAction::DropTag, - vec![ - PerMetricConfig { - name: "metricA".to_string(), - namespace: None, - config: make_transform_hashset(1, LimitExceededAction::DropTag).global, - }, - PerMetricConfig { - name: "metricB".to_string(), - namespace: None, - config: make_transform_hashset(5, LimitExceededAction::DropTag).global, - }, - ], + HashMap::from([ + ( + "metricA".to_string(), + PerMetricConfig { + namespace: None, + config: make_transform_hashset(1, 
LimitExceededAction::DropTag).global, + }, + ), + ( + "metricB".to_string(), + PerMetricConfig { + namespace: None, + config: make_transform_hashset(5, LimitExceededAction::DropTag).global, + }, + ), + ]), )) .await; } @@ -429,18 +434,22 @@ async fn tag_cardinality_limit_separate_value_limit_per_metric_name_bloom() { separate_value_limit_per_metric_name(make_transform_bloom_with_per_metric_limits( 2, LimitExceededAction::DropTag, - vec![ - PerMetricConfig { - name: "metricA".to_string(), - namespace: None, - config: make_transform_bloom(1, LimitExceededAction::DropTag).global, - }, - PerMetricConfig { - name: "metricB".to_string(), - namespace: None, - config: make_transform_bloom(5, LimitExceededAction::DropTag).global, - }, - ], + HashMap::from([ + ( + "metricA".to_string(), + PerMetricConfig { + namespace: None, + config: make_transform_bloom(1, LimitExceededAction::DropTag).global, + }, + ), + ( + "metricB".to_string(), + PerMetricConfig { + namespace: None, + config: make_transform_bloom(5, LimitExceededAction::DropTag).global, + }, + ), + ]), )) .await; } diff --git a/website/cue/reference/components/transforms/base/tag_cardinality_limit.cue b/website/cue/reference/components/transforms/base/tag_cardinality_limit.cue index 96d5d407e2f61..5680d03c2c3d3 100644 --- a/website/cue/reference/components/transforms/base/tag_cardinality_limit.cue +++ b/website/cue/reference/components/transforms/base/tag_cardinality_limit.cue @@ -49,9 +49,10 @@ base: components: transforms: tag_cardinality_limit: configuration: { per_metric_limits: { description: "Tag cardinality limits configuration per metric name." required: false - type: array: { - default: [] - items: type: object: options: { + type: object: options: "*": { + description: "An individual metric configuration." + required: true + type: object: options: { cache_size_per_key: { description: """ The size of the cache for detecting duplicate tags, in bytes. 
@@ -82,26 +83,21 @@ base: components: transforms: tag_cardinality_limit: configuration: { required: true type: string: enum: { exact: """ - Tracks cardinality exactly. + Tracks cardinality exactly. - This mode has higher memory requirements than `probabilistic`, but never falsely outputs - metrics with new tags after the limit has been hit. - """ + This mode has higher memory requirements than `probabilistic`, but never falsely outputs + metrics with new tags after the limit has been hit. + """ probabilistic: """ - Tracks cardinality probabilistically. + Tracks cardinality probabilistically. - This mode has lower memory requirements than `exact`, but may occasionally allow metric - events to pass through the transform even when they contain new tags that exceed the - configured limit. The rate at which this happens can be controlled by changing the value of - `cache_size_per_key`. - """ + This mode has lower memory requirements than `exact`, but may occasionally allow metric + events to pass through the transform even when they contain new tags that exceed the + configured limit. The rate at which this happens can be controlled by changing the value of + `cache_size_per_key`. + """ } } - name: { - description: "Name of the metric this configuration refers to." - required: true - type: string: {} - } namespace: { description: "Namespace of the metric this configuration refers to." 
required: false From 7d12a371e1ade6fc1f0d33a7b0eb3429107f32f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Mon, 6 Jan 2025 13:45:57 +0100 Subject: [PATCH 6/7] Remove debugging logs from `tag_cardinality_limit` --- src/transforms/tag_cardinality_limit/mod.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/transforms/tag_cardinality_limit/mod.rs b/src/transforms/tag_cardinality_limit/mod.rs index f43d4e3efdcd3..eaf899ec748e3 100644 --- a/src/transforms/tag_cardinality_limit/mod.rs +++ b/src/transforms/tag_cardinality_limit/mod.rs @@ -71,7 +71,6 @@ impl TagCardinalityLimit { value: &TagValueSet, ) -> bool { let config = self.get_config_for_metric(metric_key).clone(); - info!("try_accept_tag using config: {:?}", config); let metric_accepted_tags = self.accepted_tags.entry(metric_key.clone()).or_default(); let tag_value_set = metric_accepted_tags .entry_ref(key) @@ -131,13 +130,11 @@ impl TagCardinalityLimit { let metric = event.as_mut_metric(); let metric_name = metric.name().to_string(); let metric_namespace = metric.namespace().map(|n| n.to_string()); - info!("The config: {:?}", self.config); let has_per_metric_config = self.config.per_metric_limits.iter().any(|(name, config)| { *name == metric_name && (config.namespace.is_none() || config.namespace == metric_namespace) }); let metric_key = if has_per_metric_config { - info!("Metric specific config has been found!"); Some((metric_namespace, metric_name.clone())) } else { None From a83908ac19e36c7da2b6d321fd889581f8747c26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Mon, 6 Jan 2025 14:18:04 +0100 Subject: [PATCH 7/7] Change `&Option` into `Option<&..>` in `tag_cardinality_limit` --- src/transforms/tag_cardinality_limit/mod.rs | 22 ++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/transforms/tag_cardinality_limit/mod.rs b/src/transforms/tag_cardinality_limit/mod.rs index eaf899ec748e3..bf543ae86ed0e 100644 --- 
a/src/transforms/tag_cardinality_limit/mod.rs +++ b/src/transforms/tag_cardinality_limit/mod.rs @@ -40,7 +40,7 @@ impl TagCardinalityLimit { fn get_config_for_metric( &self, - metric_key: &Option, + metric_key: Option<&MetricId>, ) -> &TagCardinalityLimitInnerConfig { match metric_key { Some(id) => self @@ -66,12 +66,12 @@ impl TagCardinalityLimit { /// key, and the configured limit_exceeded_action should be taken. fn try_accept_tag( &mut self, - metric_key: &Option, + metric_key: Option<&MetricId>, key: &str, value: &TagValueSet, ) -> bool { let config = self.get_config_for_metric(metric_key).clone(); - let metric_accepted_tags = self.accepted_tags.entry(metric_key.clone()).or_default(); + let metric_accepted_tags = self.accepted_tags.entry(metric_key.cloned()).or_default(); let tag_value_set = metric_accepted_tags .entry_ref(key) .or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode)); @@ -101,12 +101,12 @@ impl TagCardinalityLimit { /// exceed the cardinality limit. fn tag_limit_exceeded( &self, - metric_key: &Option, + metric_key: Option<&MetricId>, key: &str, value: &TagValueSet, ) -> bool { self.accepted_tags - .get(metric_key) + .get(&metric_key.cloned()) .and_then(|metric_accepted_tags| { metric_accepted_tags.get(key).map(|value_set| { !value_set.contains(value) @@ -117,9 +117,9 @@ impl TagCardinalityLimit { } /// Record a key and value corresponding to a tag on an incoming Metric. 
- fn record_tag_value(&mut self, metric_key: &Option, key: &str, value: &TagValueSet) { + fn record_tag_value(&mut self, metric_key: Option<&MetricId>, key: &str, value: &TagValueSet) { let config = self.get_config_for_metric(metric_key).clone(); - let metric_accepted_tags = self.accepted_tags.entry(metric_key.clone()).or_default(); + let metric_accepted_tags = self.accepted_tags.entry(metric_key.cloned()).or_default(); metric_accepted_tags .entry_ref(key) .or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode)) @@ -141,7 +141,7 @@ impl TagCardinalityLimit { }; if let Some(tags_map) = metric.tags_mut() { match self - .get_config_for_metric(&metric_key) + .get_config_for_metric(metric_key.as_ref()) .limit_exceeded_action { LimitExceededAction::DropEvent => { @@ -149,7 +149,7 @@ impl TagCardinalityLimit { // doesn't change the behavior of the check. for (key, value) in tags_map.iter_sets() { - if self.tag_limit_exceeded(&metric_key, key, value) { + if self.tag_limit_exceeded(metric_key.as_ref(), key, value) { emit!(TagCardinalityLimitRejectingEvent { metric_name: &metric_name, tag_key: key, @@ -159,12 +159,12 @@ impl TagCardinalityLimit { } } for (key, value) in tags_map.iter_sets() { - self.record_tag_value(&metric_key, key, value); + self.record_tag_value(metric_key.as_ref(), key, value); } } LimitExceededAction::DropTag => { tags_map.retain(|key, value| { - if self.try_accept_tag(&metric_key, key, value) { + if self.try_accept_tag(metric_key.as_ref(), key, value) { true } else { emit!(TagCardinalityLimitRejectingTag {