Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tag_cardinality_limit transform): enable per metric limits for tag_cardinality_limit #22077

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `tag_cardinality_limit` transform now supports customizing limits for specific metrics, matched by metric name and optionally its namespace.

authors: esensar
37 changes: 34 additions & 3 deletions src/transforms/tag_cardinality_limit/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@ use vector_lib::configurable::configurable_component;
))]
#[derive(Clone, Debug)]
pub struct TagCardinalityLimitConfig {
#[serde(flatten)]
pub global: TagCardinalityLimitInnerConfig,

/// Tag cardinality limits configuration per metric name.
#[configurable(derived)]
#[serde(default)]
pub per_metric_limits: Vec<PerMetricConfig>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: It's better to use a map type here from name to PerMetricConfig.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, not sure why I picked Vec, map makes much more sense.

}

/// Configuration for the `tag_cardinality_limit` transform for a specific group of metrics.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct TagCardinalityLimitInnerConfig {
/// How many distinct values to accept for any given key.
#[serde(default = "default_value_limit")]
pub value_limit: usize,
Expand Down Expand Up @@ -77,6 +90,21 @@ pub enum LimitExceededAction {
DropEvent,
}

/// Tag cardinality limit configuration per metric name.
///
/// Each entry pairs a metric selector (`name` plus an optional `namespace`) with the limit
/// settings that apply to the metrics it selects.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct PerMetricConfig {
/// Name of the metric this configuration refers to.
pub name: String,

/// Namespace of the metric this configuration refers to.
///
/// If not set, this configuration matches metrics with the given name regardless of their
/// namespace.
#[serde(default)]
pub namespace: Option<String>,

// Flattened so that the limit settings (`mode`, `value_limit`, `limit_exceeded_action`)
// are written directly alongside `name` and `namespace` rather than in a nested table.
#[serde(flatten)]
pub config: TagCardinalityLimitInnerConfig,
}

/// Returns the default [`LimitExceededAction`], which is [`LimitExceededAction::DropTag`].
const fn default_limit_exceeded_action() -> LimitExceededAction {
LimitExceededAction::DropTag
}
Expand All @@ -92,9 +120,12 @@ pub(crate) const fn default_cache_size() -> usize {
impl GenerateConfig for TagCardinalityLimitConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self {
mode: Mode::Exact,
value_limit: default_value_limit(),
limit_exceeded_action: default_limit_exceeded_action(),
global: TagCardinalityLimitInnerConfig {
mode: Mode::Exact,
value_limit: default_value_limit(),
limit_exceeded_action: default_limit_exceeded_action(),
},
per_metric_limits: Vec::default(),
})
.unwrap()
}
Expand Down
88 changes: 69 additions & 19 deletions src/transforms/tag_cardinality_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ mod tag_value_set;
mod tests;

use crate::event::metric::TagValueSet;
pub use config::TagCardinalityLimitConfig;
pub use config::{TagCardinalityLimitConfig, TagCardinalityLimitInnerConfig};
use tag_value_set::AcceptedTagValueSet;

type MetricId = (Option<String>, String);

#[derive(Debug)]
pub struct TagCardinalityLimit {
config: TagCardinalityLimitConfig,
accepted_tags: HashMap<String, AcceptedTagValueSet>,
accepted_tags: HashMap<Option<MetricId>, HashMap<String, AcceptedTagValueSet>>,
}

impl TagCardinalityLimit {
Expand All @@ -36,6 +38,22 @@ impl TagCardinalityLimit {
}
}

/// Resolves the limit settings that apply to the given metric key.
///
/// A `None` key always yields the global settings. For a `Some` key, the
/// per-metric entries are scanned in configuration order and the first one
/// whose name equals the metric name, and whose namespace is either unset or
/// equal to the metric namespace, wins. If no entry matches, the global
/// settings are returned.
fn get_config_for_metric(
    &self,
    metric_key: &Option<MetricId>,
) -> &TagCardinalityLimitInnerConfig {
    let global = &self.config.global;
    let (namespace, name) = match metric_key {
        Some((namespace, name)) => (namespace, name),
        None => return global,
    };

    for candidate in &self.config.per_metric_limits {
        // An entry without a namespace acts as a wildcard over namespaces.
        let namespace_matches =
            candidate.namespace.is_none() || candidate.namespace == *namespace;
        if candidate.name == *name && namespace_matches {
            return &candidate.config;
        }
    }

    global
}

/// Takes in a key and a value corresponding to a tag on an incoming Metric
/// Event. If that value is already part of the set of accepted values for that
/// key, then simply returns true. If that value is not yet part of the
Expand All @@ -44,22 +62,30 @@ impl TagCardinalityLimit {
/// for the key and returns true, otherwise returns false. A false return
/// value indicates to the caller that the value is not accepted for this
/// key, and the configured limit_exceeded_action should be taken.
fn try_accept_tag(&mut self, key: &str, value: &TagValueSet) -> bool {
let tag_value_set = self.accepted_tags.entry_ref(key).or_insert_with(|| {
AcceptedTagValueSet::new(self.config.value_limit, &self.config.mode)
});
fn try_accept_tag(
&mut self,
metric_key: &Option<MetricId>,
key: &str,
value: &TagValueSet,
) -> bool {
let config = self.get_config_for_metric(metric_key).clone();
info!("try_accept_tag using config: {:?}", config);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove info!s here and below. If you would like to keep them we can make them trace!s.

let metric_accepted_tags = self.accepted_tags.entry(metric_key.clone()).or_default();
let tag_value_set = metric_accepted_tags
.entry_ref(key)
.or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode));

if tag_value_set.contains(value) {
// Tag value has already been accepted, nothing more to do.
return true;
}

// Tag value not yet part of the accepted set.
if tag_value_set.len() < self.config.value_limit {
if tag_value_set.len() < config.value_limit {
// accept the new value
tag_value_set.insert(value.clone());

if tag_value_set.len() == self.config.value_limit {
if tag_value_set.len() == config.value_limit {
emit!(TagCardinalityValueLimitReached { key });
}

Expand All @@ -72,34 +98,58 @@ impl TagCardinalityLimit {

/// Checks if recording a key and value corresponding to a tag on an incoming Metric would
/// exceed the cardinality limit.
fn tag_limit_exceeded(&self, key: &str, value: &TagValueSet) -> bool {
fn tag_limit_exceeded(
&self,
metric_key: &Option<MetricId>,
esensar marked this conversation as resolved.
Show resolved Hide resolved
key: &str,
value: &TagValueSet,
) -> bool {
self.accepted_tags
.get(key)
.map(|value_set| {
!value_set.contains(value) && value_set.len() >= self.config.value_limit
.get(metric_key)
.and_then(|metric_accepted_tags| {
metric_accepted_tags.get(key).map(|value_set| {
!value_set.contains(value)
&& value_set.len() >= self.get_config_for_metric(metric_key).value_limit
})
})
.unwrap_or(false)
}

/// Record a key and value corresponding to a tag on an incoming Metric.
fn record_tag_value(&mut self, key: &str, value: &TagValueSet) {
self.accepted_tags
fn record_tag_value(&mut self, metric_key: &Option<MetricId>, key: &str, value: &TagValueSet) {
let config = self.get_config_for_metric(metric_key).clone();
let metric_accepted_tags = self.accepted_tags.entry(metric_key.clone()).or_default();
metric_accepted_tags
.entry_ref(key)
.or_insert_with(|| AcceptedTagValueSet::new(self.config.value_limit, &self.config.mode))
.or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode))
.insert(value.clone());
}

fn transform_one(&mut self, mut event: Event) -> Option<Event> {
let metric = event.as_mut_metric();
let metric_name = metric.name().to_string();
let metric_namespace = metric.namespace().map(|n| n.to_string());
info!("The config: {:?}", self.config);
let has_per_metric_config = self.config.per_metric_limits.iter().any(|c| {
c.name == metric_name && (c.namespace.is_none() || c.namespace == metric_namespace)
});
let metric_key = if has_per_metric_config {
info!("Metric specific config has been found!");
Some((metric_namespace, metric_name.clone()))
} else {
None
};
if let Some(tags_map) = metric.tags_mut() {
match self.config.limit_exceeded_action {
match self
.get_config_for_metric(&metric_key)
.limit_exceeded_action
{
LimitExceededAction::DropEvent => {
// This needs to check all the tags, to ensure that the ordering of tag names
// doesn't change the behavior of the check.

for (key, value) in tags_map.iter_sets() {
if self.tag_limit_exceeded(key, value) {
if self.tag_limit_exceeded(&metric_key, key, value) {
emit!(TagCardinalityLimitRejectingEvent {
metric_name: &metric_name,
tag_key: key,
Expand All @@ -109,12 +159,12 @@ impl TagCardinalityLimit {
}
}
for (key, value) in tags_map.iter_sets() {
self.record_tag_value(key, value);
self.record_tag_value(&metric_key, key, value);
}
}
LimitExceededAction::DropTag => {
tags_map.retain(|key, value| {
if self.try_accept_tag(key, value) {
if self.try_accept_tag(&metric_key, key, value) {
true
} else {
emit!(TagCardinalityLimitRejectingTag {
Expand Down
Loading
Loading