Skip to content

Commit 0e034ee

Browse files
enhancement(reduce transform): New setting for reduce transform: end_every_period_ms (#20440)
* New setting for reduce transform: end_every_period_ms * Correct changelog name to match pattern Signed-off-by: Jesse Szwedko <[email protected]> * regenerate component docs Signed-off-by: Jesse Szwedko <[email protected]> --------- Signed-off-by: Jesse Szwedko <[email protected]> Co-authored-by: Jesse Szwedko <[email protected]>
1 parent 6db92ac commit 0e034ee

File tree

5 files changed

+40
-0
lines changed

5 files changed

+40
-0
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
A new configuration option `end_every_period_ms` is available on reduce transforms.
2+
If supplied, every time this interval elapses for a given grouping, the reduced value
3+
for that grouping is flushed. The elapsed interval is checked every `flush_period_ms`.
4+
5+
authors: charlesconnell

lib/vector-config/src/external/serde_with.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::cell::RefCell;
22

33
use vector_config_common::{attributes::CustomAttribute, constants};
44

5+
use crate::schema::generate_optional_schema;
56
use crate::{
67
num::NumberClass,
78
schema::{generate_number_schema, SchemaGenerator, SchemaObject},
@@ -132,3 +133,12 @@ impl Configurable for serde_with::DurationMilliSeconds<u64, serde_with::formats:
132133
Ok(generate_number_schema::<u64>())
133134
}
134135
}
136+
137+
// Schema support for the optional form of the strict millisecond-duration adapter,
// i.e. fields declared with `#[serde_as(as = "Option<serde_with::DurationMilliSeconds<u64>>")]`.
// The non-optional impl just above describes the value as a plain `u64` number; this impl
// passes `u64`'s configurable reference to `generate_optional_schema` instead.
impl Configurable for Option<serde_with::DurationMilliSeconds<u64, serde_with::formats::Strict>> {
138+
fn generate_schema(gen: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError>
139+
where
140+
Self: Sized,
141+
{
142+
// The schema is built from `u64`, not from `DurationMilliSeconds` itself.
// NOTE(review): presumably `generate_optional_schema` wraps that schema so the value
// may also be absent/null — confirm against `crate::schema`.
generate_optional_schema(&u64::as_configurable_ref(), gen)
143+
}
144+
}

src/transforms/reduce/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ pub struct ReduceConfig {
3636
#[configurable(metadata(docs::human_name = "Expire After"))]
3737
pub expire_after_ms: Duration,
3838

39+
/// If supplied, every time this interval elapses for a given grouping, the reduced value
40+
/// for that grouping is flushed. Checked every flush_period_ms.
41+
#[serde_as(as = "Option<serde_with::DurationMilliSeconds<u64>>")]
42+
#[derivative(Default(value = "Option::None"))]
43+
#[configurable(metadata(docs::human_name = "End-Every Period"))]
44+
pub end_every_period_ms: Option<Duration>,
45+
3946
/// The interval to check for and flush any expired events, in milliseconds.
4047
#[serde(default = "default_flush_period_ms")]
4148
#[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]

src/transforms/reduce/transform.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ struct ReduceState {
2424
events: usize,
2525
fields: HashMap<KeyString, Box<dyn ReduceValueMerger>>,
2626
stale_since: Instant,
27+
creation: Instant,
2728
metadata: EventMetadata,
2829
}
2930

@@ -35,6 +36,7 @@ impl ReduceState {
3536
Self {
3637
events: 0,
3738
stale_since: Instant::now(),
39+
creation: Instant::now(),
3840
fields,
3941
metadata,
4042
}
@@ -93,6 +95,7 @@ impl ReduceState {
9395
pub struct Reduce {
9496
expire_after: Duration,
9597
flush_period: Duration,
98+
end_every_period: Option<Duration>,
9699
group_by: Vec<String>,
97100
merge_strategies: IndexMap<KeyString, MergeStrategy>,
98101
reduce_merge_states: HashMap<Discriminant, ReduceState>,
@@ -126,6 +129,7 @@ impl Reduce {
126129
Ok(Reduce {
127130
expire_after: config.expire_after_ms,
128131
flush_period: config.flush_period_ms,
132+
end_every_period: config.end_every_period_ms,
129133
group_by,
130134
merge_strategies: config.merge_strategies.clone(),
131135
reduce_merge_states: HashMap::new(),
@@ -139,6 +143,12 @@ impl Reduce {
139143
let mut flush_discriminants = Vec::new();
140144
let now = Instant::now();
141145
for (k, t) in &self.reduce_merge_states {
146+
if let Some(period) = self.end_every_period {
147+
if (now - t.creation) >= period {
148+
flush_discriminants.push(k.clone());
149+
}
150+
}
151+
142152
if (now - t.stale_since) >= self.expire_after {
143153
flush_discriminants.push(k.clone());
144154
}

website/cue/reference/components/transforms/base/reduce.cue

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
package metadata
22

33
base: components: transforms: reduce: configuration: {
4+
end_every_period_ms: {
5+
description: """
6+
If supplied, every time this interval elapses for a given grouping, the reduced value
7+
for that grouping is flushed. Checked every flush_period_ms.
8+
"""
9+
required: false
10+
type: uint: {}
11+
}
412
ends_when: {
513
description: """
614
A condition used to distinguish the final event of a transaction.

0 commit comments

Comments
 (0)