Skip to content

Commit

Permalink
enhancement(sample transform): add stratified sampling capability (#2…
Browse files Browse the repository at this point in the history
…1274)

* (enhancement sample transform): add stratified sampling capability

* Convert group_by key to Template

* Generate documentation

* Add changelog

* Fix typo in docs

* more documentation clean up

* Fix formatting to pass linter, fix punctuation in docs
  • Loading branch information
hillmandj authored Oct 16, 2024
1 parent 7054ae0 commit 3baebb4
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `sample` transform now accepts a `group_by` configuration option. Events whose `group_by` template renders to different values are sampled independently of each other. This can reduce the complexity of the topology, since users no longer need to create separate samplers with similar configuration to handle different event streams.

authors: hillmandj
14 changes: 14 additions & 0 deletions src/transforms/sample/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
TransformOutput,
},
schema,
template::Template,
transforms::Transform,
};

Expand Down Expand Up @@ -44,6 +45,16 @@ pub struct SampleConfig {
#[configurable(metadata(docs::examples = "message"))]
pub key_field: Option<String>,

/// A template whose rendered value is used to group events into separate buckets that are
/// sampled independently.
///
/// If left unspecified, or if the template cannot be rendered for an event, the event is
/// sampled together with all other ungrouped events instead of in its own bucket.
#[configurable(metadata(
docs::examples = "{{ service }}",
docs::examples = "{{ hostname }}-{{ service }}"
))]
pub group_by: Option<Template>,

/// A logical condition used to exclude events from sampling.
pub exclude: Option<AnyCondition>,
}
Expand All @@ -53,6 +64,7 @@ impl GenerateConfig for SampleConfig {
toml::Value::try_from(Self {
rate: 10,
key_field: None,
group_by: None,
exclude: None::<AnyCondition>,
})
.unwrap()
Expand All @@ -67,6 +79,7 @@ impl TransformConfig for SampleConfig {
Self::NAME.to_string(),
self.rate,
self.key_field.clone(),
self.group_by.clone(),
self.exclude
.as_ref()
.map(|condition| condition.build(&context.enrichment_tables))
Expand Down Expand Up @@ -126,6 +139,7 @@ mod tests {
let config = SampleConfig {
rate: 1,
key_field: None,
group_by: None,
exclude: None,
};
let (tx, rx) = mpsc::channel(1);
Expand Down
82 changes: 76 additions & 6 deletions src/transforms/sample/transform.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::collections::HashMap;
use vector_lib::config::LegacyKey;
use vrl::event_path;

use crate::{
conditions::Condition,
event::Event,
internal_events::SampleEventDiscarded,
sinks::prelude::TemplateRenderingError,
template::Template,
transforms::{FunctionTransform, OutputBuffer},
};

Expand All @@ -13,26 +16,29 @@ pub struct Sample {
name: String,
rate: u64,
key_field: Option<String>,
group_by: Option<Template>,
exclude: Option<Condition>,
count: u64,
counter: HashMap<Option<String>, u64>,
}

impl Sample {
// This function is dead code when the feature flag `transforms-impl-sample` is specified but not
// `transforms-sample`.
#![allow(dead_code)]
pub const fn new(
pub fn new(
name: String,
rate: u64,
key_field: Option<String>,
group_by: Option<Template>,
exclude: Option<Condition>,
) -> Self {
Self {
name,
rate,
key_field,
group_by,
exclude,
count: 0,
counter: HashMap::new(),
}
}
}
Expand Down Expand Up @@ -69,13 +75,42 @@ impl FunctionTransform for Sample {
})
.map(|v| v.to_string_lossy());

// Fetch actual field value if group_by option is set.
let group_by_key = self.group_by.as_ref().and_then(|group_by| match &event {
Event::Log(event) => group_by
.render_string(event)
.map_err(|error| {
emit!(TemplateRenderingError {
error,
field: Some("group_by"),
drop_event: false,
})
})
.ok(),
Event::Trace(event) => group_by
.render_string(event)
.map_err(|error| {
emit!(TemplateRenderingError {
error,
field: Some("group_by"),
drop_event: false,
})
})
.ok(),
Event::Metric(_) => panic!("component can never receive metric events"),
});

let counter_value: u64 = *self.counter.entry(group_by_key.clone()).or_default();

let num = if let Some(value) = value {
seahash::hash(value.as_bytes())
} else {
self.count
counter_value
};

self.count = (self.count + 1) % self.rate;
// Advance the counter for this event's group key (the `None` key when `group_by` is unset or could not be rendered), wrapping back to zero once it reaches `rate`.
let increment: u64 = (counter_value + 1) % self.rate;
self.counter.insert(group_by_key.clone(), increment);

if num % self.rate == 0 {
match event {
Expand Down Expand Up @@ -134,6 +169,7 @@ mod tests {
"sample".to_string(),
2,
log_schema().message_key().map(ToString::to_string),
None,
Some(condition_contains(
log_schema().message_key().unwrap().to_string().as_str(),
"na",
Expand All @@ -156,6 +192,7 @@ mod tests {
"sample".to_string(),
25,
log_schema().message_key().map(ToString::to_string),
None,
Some(condition_contains(
log_schema().message_key().unwrap().to_string().as_str(),
"na",
Expand All @@ -181,6 +218,7 @@ mod tests {
"sample".to_string(),
2,
log_schema().message_key().map(ToString::to_string),
None,
Some(condition_contains(
log_schema().message_key().unwrap().to_string().as_str(),
"na",
Expand Down Expand Up @@ -216,6 +254,7 @@ mod tests {
"sample".to_string(),
0,
key_field.clone(),
None,
Some(condition_contains(
log_schema().message_key().unwrap().to_string().as_str(),
"important",
Expand All @@ -232,6 +271,33 @@ mod tests {
}
}

// Runs the same 1000-event check twice: once with no `group_by` and once with a
// `{{ other_field }}` template. The event message contains "na", which is the pattern given to
// the sampler's exclude condition, and every event is expected to come back unchanged.
#[test]
fn handles_group_by() {
    let templates = [None, Some(Template::try_from("{{ other_field }}").unwrap())];
    for template in &templates {
        let mut event = Event::Log(LogEvent::from("nananana"));
        event.as_mut_log().insert("other_field", "foo");

        let exclude = condition_contains(
            log_schema().message_key().unwrap().to_string().as_str(),
            "na",
        );
        let mut sampler = Sample::new(
            "sample".to_string(),
            0,
            log_schema().message_key().map(ToString::to_string),
            template.clone(),
            Some(exclude),
        );

        // Count how many events are emitted, checking each one is identical to the input.
        let mut total_passed = 0;
        for _ in 0..1000 {
            if let Some(result) = transform_one(&mut sampler, event.clone()) {
                assert_eq!(result, event);
                total_passed += 1;
            }
        }
        assert_eq!(total_passed, 1000);
    }
}

#[test]
fn handles_key_field() {
for key_field in &[None, Some("other_field".into())] {
Expand All @@ -242,6 +308,7 @@ mod tests {
"sample".to_string(),
0,
key_field.clone(),
None,
Some(condition_contains("other_field", "foo")),
);
let iterations = 0..1000;
Expand All @@ -264,6 +331,7 @@ mod tests {
"sample".to_string(),
10,
key_field.clone(),
None,
Some(condition_contains(&message_key, "na")),
);
let passing = events
Expand All @@ -278,6 +346,7 @@ mod tests {
"sample".to_string(),
25,
key_field.clone(),
None,
Some(condition_contains(&message_key, "na")),
);
let passing = events
Expand All @@ -292,6 +361,7 @@ mod tests {
"sample".to_string(),
25,
key_field.clone(),
None,
Some(condition_contains(&message_key, "na")),
);
let event = Event::Log(LogEvent::from("nananana"));
Expand All @@ -304,7 +374,7 @@ mod tests {
fn handles_trace_event() {
let event: TraceEvent = LogEvent::from("trace").into();
let trace = Event::Trace(event);
let mut sampler = Sample::new("sample".to_string(), 2, None, None);
let mut sampler = Sample::new("sample".to_string(), 2, None, None, None);
let iterations = 0..2;
let total_passed = iterations
.filter_map(|_| transform_one(&mut sampler, trace.clone()))
Expand Down
13 changes: 13 additions & 0 deletions website/cue/reference/components/transforms/base/sample.cue
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,19 @@ base: components: transforms: sample: configuration: {
required: false
type: condition: {}
}
group_by: {
description: """
A template whose rendered value is used to group events into separate buckets that are
sampled independently.
If left unspecified, or if the template cannot be rendered for an event, the event is
sampled together with all other ungrouped events instead of in its own bucket.
"""
required: false
type: string: {
examples: ["{{ service }}", "{{ hostname }}-{{ service }}"]
syntax: "template"
}
}
key_field: {
description: """
The name of the field whose value is hashed to determine if the event should be
Expand Down

0 comments on commit 3baebb4

Please sign in to comment.