Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tag_cardinality_limit transform): enable per metric limits for tag_cardinality_limit #22077

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `tag_cardinality_limit` transform now supports customizing limits for specific metrics, matched by metric name and optionally its namespace.

authors: esensar
37 changes: 34 additions & 3 deletions src/transforms/tag_cardinality_limit/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,22 @@ use vector_lib::configurable::configurable_component;
))]
#[derive(Clone, Debug)]
pub struct TagCardinalityLimitConfig {
#[serde(flatten)]
pub global: TagCardinalityLimitInnerConfig,

/// Tag cardinality limits configuration per metric name.
#[configurable(
derived,
metadata(docs::additional_props_description = "An individual metric configuration.")
)]
#[serde(default)]
pub per_metric_limits: HashMap<String, PerMetricConfig>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jszwedko do these UX changes look good to you? I did a review and they look good to me. We are basically adding a new config field here without affecting any existing fields.

}

/// Configuration for the `tag_cardinality_limit` transform for a specific group of metrics.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct TagCardinalityLimitInnerConfig {
/// How many distinct values to accept for any given key.
#[serde(default = "default_value_limit")]
pub value_limit: usize,
Expand Down Expand Up @@ -77,6 +93,18 @@ pub enum LimitExceededAction {
DropEvent,
}

/// Tag cardinality limit configuration per metric name.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct PerMetricConfig {
/// Namespace of the metric this configuration refers to.
#[serde(default)]
pub namespace: Option<String>,

#[serde(flatten)]
pub config: TagCardinalityLimitInnerConfig,
}

const fn default_limit_exceeded_action() -> LimitExceededAction {
LimitExceededAction::DropTag
}
Expand All @@ -92,9 +120,12 @@ pub(crate) const fn default_cache_size() -> usize {
impl GenerateConfig for TagCardinalityLimitConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self {
mode: Mode::Exact,
value_limit: default_value_limit(),
limit_exceeded_action: default_limit_exceeded_action(),
global: TagCardinalityLimitInnerConfig {
mode: Mode::Exact,
value_limit: default_value_limit(),
limit_exceeded_action: default_limit_exceeded_action(),
},
per_metric_limits: HashMap::default(),
})
.unwrap()
}
Expand Down
88 changes: 69 additions & 19 deletions src/transforms/tag_cardinality_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ mod tag_value_set;
mod tests;

use crate::event::metric::TagValueSet;
pub use config::TagCardinalityLimitConfig;
pub use config::{TagCardinalityLimitConfig, TagCardinalityLimitInnerConfig};
use tag_value_set::AcceptedTagValueSet;

type MetricId = (Option<String>, String);

#[derive(Debug)]
pub struct TagCardinalityLimit {
config: TagCardinalityLimitConfig,
accepted_tags: HashMap<String, AcceptedTagValueSet>,
accepted_tags: HashMap<Option<MetricId>, HashMap<String, AcceptedTagValueSet>>,
}

impl TagCardinalityLimit {
Expand All @@ -36,6 +38,24 @@ impl TagCardinalityLimit {
}
}

fn get_config_for_metric(
&self,
metric_key: Option<&MetricId>,
) -> &TagCardinalityLimitInnerConfig {
match metric_key {
Some(id) => self
.config
.per_metric_limits
.iter()
.find(|(name, config)| {
**name == id.1 && (config.namespace.is_none() || config.namespace == id.0)
})
.map(|(_, c)| &c.config)
.unwrap_or(&self.config.global),
None => &self.config.global,
}
}

/// Takes in key and a value corresponding to a tag on an incoming Metric
/// Event. If that value is already part of set of accepted values for that
/// key, then simply returns true. If that value is not yet part of the
Expand All @@ -44,22 +64,29 @@ impl TagCardinalityLimit {
/// for the key and returns true, otherwise returns false. A false return
/// value indicates to the caller that the value is not accepted for this
/// key, and the configured limit_exceeded_action should be taken.
fn try_accept_tag(&mut self, key: &str, value: &TagValueSet) -> bool {
let tag_value_set = self.accepted_tags.entry_ref(key).or_insert_with(|| {
AcceptedTagValueSet::new(self.config.value_limit, &self.config.mode)
});
fn try_accept_tag(
&mut self,
metric_key: Option<&MetricId>,
key: &str,
value: &TagValueSet,
) -> bool {
let config = self.get_config_for_metric(metric_key).clone();
let metric_accepted_tags = self.accepted_tags.entry(metric_key.cloned()).or_default();
let tag_value_set = metric_accepted_tags
.entry_ref(key)
.or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode));

if tag_value_set.contains(value) {
// Tag value has already been accepted, nothing more to do.
return true;
}

// Tag value not yet part of the accepted set.
if tag_value_set.len() < self.config.value_limit {
if tag_value_set.len() < config.value_limit {
// accept the new value
tag_value_set.insert(value.clone());

if tag_value_set.len() == self.config.value_limit {
if tag_value_set.len() == config.value_limit {
emit!(TagCardinalityValueLimitReached { key });
}

Expand All @@ -72,34 +99,57 @@ impl TagCardinalityLimit {

/// Checks if recording a key and value corresponding to a tag on an incoming Metric would
/// exceed the cardinality limit.
fn tag_limit_exceeded(&self, key: &str, value: &TagValueSet) -> bool {
fn tag_limit_exceeded(
&self,
metric_key: Option<&MetricId>,
key: &str,
value: &TagValueSet,
) -> bool {
self.accepted_tags
.get(key)
.map(|value_set| {
!value_set.contains(value) && value_set.len() >= self.config.value_limit
.get(&metric_key.cloned())
.and_then(|metric_accepted_tags| {
metric_accepted_tags.get(key).map(|value_set| {
!value_set.contains(value)
&& value_set.len() >= self.get_config_for_metric(metric_key).value_limit
})
})
.unwrap_or(false)
}

/// Record a key and value corresponding to a tag on an incoming Metric.
fn record_tag_value(&mut self, key: &str, value: &TagValueSet) {
self.accepted_tags
fn record_tag_value(&mut self, metric_key: Option<&MetricId>, key: &str, value: &TagValueSet) {
let config = self.get_config_for_metric(metric_key).clone();
let metric_accepted_tags = self.accepted_tags.entry(metric_key.cloned()).or_default();
metric_accepted_tags
.entry_ref(key)
.or_insert_with(|| AcceptedTagValueSet::new(self.config.value_limit, &self.config.mode))
.or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode))
.insert(value.clone());
}

fn transform_one(&mut self, mut event: Event) -> Option<Event> {
let metric = event.as_mut_metric();
let metric_name = metric.name().to_string();
let metric_namespace = metric.namespace().map(|n| n.to_string());
let has_per_metric_config = self.config.per_metric_limits.iter().any(|(name, config)| {
*name == metric_name
&& (config.namespace.is_none() || config.namespace == metric_namespace)
});
let metric_key = if has_per_metric_config {
Some((metric_namespace, metric_name.clone()))
} else {
None
};
if let Some(tags_map) = metric.tags_mut() {
match self.config.limit_exceeded_action {
match self
.get_config_for_metric(metric_key.as_ref())
.limit_exceeded_action
{
LimitExceededAction::DropEvent => {
// This needs to check all the tags, to ensure that the ordering of tag names
// doesn't change the behavior of the check.

for (key, value) in tags_map.iter_sets() {
if self.tag_limit_exceeded(key, value) {
if self.tag_limit_exceeded(metric_key.as_ref(), key, value) {
emit!(TagCardinalityLimitRejectingEvent {
metric_name: &metric_name,
tag_key: key,
Expand All @@ -109,12 +159,12 @@ impl TagCardinalityLimit {
}
}
for (key, value) in tags_map.iter_sets() {
self.record_tag_value(key, value);
self.record_tag_value(metric_key.as_ref(), key, value);
}
}
LimitExceededAction::DropTag => {
tags_map.retain(|key, value| {
if self.try_accept_tag(key, value) {
if self.try_accept_tag(metric_key.as_ref(), key, value) {
true
} else {
emit!(TagCardinalityLimitRejectingTag {
Expand Down
Loading
Loading