diff --git a/crates/telio-traversal/src/batcher.rs b/crates/telio-traversal/src/batcher.rs deleted file mode 100644 index 651b5b03a..000000000 --- a/crates/telio-traversal/src/batcher.rs +++ /dev/null @@ -1,1292 +0,0 @@ -// Batcher is an optimizer of aligning actions to be done in batches -// with the goal to conserve resources, thus Batcher's main goals are: -// 1) Don't break existing functionality done by usual means -// 2) Align actions - -use futures::future::BoxFuture; -use std::fmt::Debug; -use std::hash::Hash; -use std::{collections::HashMap, sync::Arc}; - -use async_trait::async_trait; -use telio_utils::{telio_log_debug, telio_log_error, telio_log_warn}; - -use thiserror::Error as ThisError; -use tokio::time::{sleep_until, Duration, Instant}; - -type Action> = - Arc Fn(&'a mut V) -> BoxFuture<'a, R> + Sync + Send>; - -pub struct Batcher { - actions: HashMap)>, - last_trigger_checkpoint: Instant, - options: BatchingOptions, -} - -struct BatchEntry { - deadline: Instant, - interval: Duration, - threshold: Duration, -} - -/// Possible [Batcher] errors. 
-#[derive(ThisError, Debug)] -pub enum BatcherError { - /// No actions in batcher - #[error("No actions present")] - NoActions, -} - -#[cfg_attr(any(test, feature = "mockall"), mockall::automock)] -#[async_trait] -pub trait BatcherTrait: Send + Sync -where - K: Eq + Hash + Debug + Clone + Send + Sync, - V: Send + Sync, -{ - fn add(&mut self, key: K, interval: Duration, threshold: Duration, action: Action); - fn remove(&mut self, key: &K); - async fn get_actions( - &mut self, - last_network_activity: Option, - ) -> Result)>, BatcherError>; - fn get_interval(&self, key: &K) -> Option; -} - -#[derive(Copy, Clone, Debug)] -pub struct BatchingOptions { - pub trigger_effective_duration: Duration, - pub trigger_cooldown_duration: Duration, -} - -impl Default for BatchingOptions { - fn default() -> Self { - Self { - trigger_effective_duration: Duration::from_secs(10), - trigger_cooldown_duration: Duration::from_secs(60), - } - } -} - -impl Batcher -where - K: Eq + Hash + Send + Sync + Debug + Clone, - V: Send + Sync, -{ - // It's necessary not to allow too short intervals since they can drain resources. - // Historically, libtelio had highest frequency action of 5seconds for direct - // connection keepalive, thus we cap any action to have at least that. 
- const MIN_INTERVAL: Duration = Duration::from_secs(5); - - pub fn new(options: BatchingOptions) -> Self { - telio_log_debug!("Options for batcher: {:?}", options); - - Self { - actions: HashMap::new(), - options, - last_trigger_checkpoint: Instant::now(), - } - } - - pub fn is_trigger_outstanding(&self, latest_network_activity: Instant) -> bool { - if self.last_trigger_checkpoint.elapsed() <= self.options.trigger_cooldown_duration { - telio_log_debug!( - "Batcher trigger in cooldown: {}", - self.last_trigger_checkpoint.elapsed().as_secs() - ); - return false; - } - - latest_network_activity.elapsed() < self.options.trigger_effective_duration - } -} - -#[async_trait] -impl BatcherTrait for Batcher -where - K: Eq + Hash + Send + Sync + Debug + Clone, - V: Send + Sync, -{ - async fn get_actions( - &mut self, - last_network_activity: Option, - ) -> Result)>, BatcherError> { - if self.actions.is_empty() { - return Err(BatcherError::NoActions); - } - - fn collect_batch_jobs( - actions: &mut HashMap)>, - ) -> Vec<(K, Action)> { - let now = Instant::now(); - - let mut batched_actions: Vec<(K, Action)> = vec![]; - - for (key, action) in actions.iter_mut() { - let adjusted_deadline = now + action.0.threshold; - - if action.0.deadline <= adjusted_deadline { - action.0.deadline = now + action.0.interval; - batched_actions.push((key.clone(), action.1.clone())); - } - } - - batched_actions - } - - if let Some(closest_deadline) = self - .actions - .values() - .min_by_key(|entry| entry.0.deadline) - .map(|v| v.0.deadline) - { - let early_batch = last_network_activity.map(|na| self.is_trigger_outstanding(na)); - - if let Some(true) = early_batch { - let batched_actions = collect_batch_jobs(&mut self.actions); - - if !batched_actions.is_empty() { - // TODO(LLT-5807): if there's an eligible trigger, maybe it's worth just - // emitting all the actions there are. Given a big enough cooldown it should - // pose no harm at the cost of near perfect alignment in one go. 
This should be - // experimented with. - self.last_trigger_checkpoint = Instant::now(); - return Ok(batched_actions); - } - } - - // we failed to early batch any actions, so lets wait until the closest one resolves - _ = sleep_until(closest_deadline).await; - - let batched_actions = collect_batch_jobs(&mut self.actions); - - if batched_actions.is_empty() { - telio_log_error!("Batcher resolves with empty list of jobs"); - return Err(BatcherError::NoActions); - } - self.last_trigger_checkpoint = Instant::now(); - return Ok(batched_actions); - } else { - return Err(BatcherError::NoActions); - } - } - - /// Remove batcher action. Action is no longer eligible for batching - fn remove(&mut self, key: &K) { - telio_log_debug!("removing item from batcher with key({:?})", key); - self.actions.remove(key); - } - - /// Add batcher action. Batcher itself doesn't run the tasks and depends - /// on actions being polled and invoked. It is a passive component on itself. - /// Adding an action has immediate deadline thus action is invoked as soon - /// as it's polled. - /// - /// Higher threshold means more opportunities to batch the action, however too high - /// means wasting resources since the job will be batched alongside other jobs more - /// frequently than necessary. For example an infrequent job with high - /// threshold will batch together with higher frequency job(shorter interval) with no added - /// value. - /// - /// A threshold of 0 makes batcher act effectively as `RepeatedActions` and will have - /// no opportunities to be batched. A small threshold will limit batching opportunities. - /// - /// Proving that the threshold of half the period is enough between two actions with same interval - /// when being added at different times can be done visually. 
In the scenario below - /// we have two actions A and B that have same interval and threshold: - /// - /// Legends: - /// - `d` - next deadline - /// - `D` - current deadline - /// - `-` - time tick - /// - `*` - jobs threshold - /// - '_' - indicates that action is not yet added - /// - /// ```text - /// Step #1 - /// A: D------******d------******d------******d------******d-----> time - /// B: __d------******d------******d------******d------******d---> time - /// ``` - /// at the beginning, job A was added and thus is immediately emitted. B doesn't yet exist - /// as it's added a bit later. Once B is added, it's deadline is immediate and it's next - /// deadline is shifted after one interval. Once B is added it cannot batch A since it's - /// not within the threshold. - /// - /// however a nice thing happens on the next A deadline: - /// - /// ```text - /// Step #2 - /// A: d------******D------******d------******d------******d-----> time - /// B: __d------******d------******d------******d------******d---> time - /// ``` - /// - /// At this point A will batch action B as well, since it's `deadline-threshold` is within the reach - /// and the timelines suddenly align afterwards: - /// - /// ```text - /// Step #3 - /// A: d------******d------******d------******d------******d-----> time - /// B: __d------****d------******d------******d------******d-----> time - /// ``` - /// - /// From now on A and B will always emit together(order is not guaranteed nor maintained). - /// - /// This is however a simplified approach and is not as easy to prove exactly what happens - /// when: - /// - jobs have same interval but different threshold - /// - jobs have different intervals. 
- /// - jobs have different intervals and different thresholds - /// - /// Thus it's best to aim for: - /// - as high interval as possible - /// - threshold being close to half the interval - fn add(&mut self, key: K, interval: Duration, threshold: Duration, action: Action) { - telio_log_debug!( - "adding item to batcher with key({:?}), interval({:?}), threshold({:?})", - key, - interval, - threshold, - ); - - let interval = { - if interval < Self::MIN_INTERVAL { - telio_log_warn!( - "Interval of {:?} is too short, overriding to {:?}", - interval, - Self::MIN_INTERVAL - ); - Self::MIN_INTERVAL - } else { - interval - } - }; - - let max_threshold = interval / 2; - - let threshold = { - if threshold > max_threshold { - telio_log_warn!( - "Threshold of {:?} is higher than maximum allowed threshold calculated: {:?}, overriding", - threshold, - max_threshold - ); - max_threshold - } else { - threshold - } - }; - - let entry = BatchEntry { - deadline: Instant::now(), - interval, - threshold, - }; - - self.actions.insert(key, (entry, action)); - } - - fn get_interval(&self, key: &K) -> Option { - self.actions.get(key).map(|(entry, _)| entry.interval) - } -} - -#[cfg(test)] -mod tests { - use crate::batcher::{Batcher, BatcherError, BatcherTrait, BatchingOptions}; - use std::sync::Arc; - use tokio::sync::{watch, Mutex}; - use tokio::time::*; - - struct TestChecker { - values: Vec<(String, Instant)>, - } - - #[derive(Copy, Clone)] - struct FakeNetwork { - latency: Duration, - } - - impl FakeNetwork { - async fn send( - &self, - peer_a_activity: Arc>, - peer_b_activity: Arc>, - ) { - // We're concerned about any activity, thus we just send to channel of one peer - // to simulate the send-trigger and after latency we simulate receival on another - // channel - peer_a_activity.send(Instant::now()).unwrap(); - - // latency sim - sleep(self.latency).await; - - // Simulate peer receiving data after some latency - peer_b_activity.send(Instant::now()).unwrap(); - } - } - - 
#[tokio::test(start_paused = true)] - async fn test_triggered_batching() { - struct TestHelperStruct { - trigger_enabled: bool, - latency: Duration, - start_emit_traffic_at: Instant, - stop_emit_traffic_at: Instant, - emit_packet_every: Duration, - test_duration: Duration, - delay_to_add_second_peer: Duration, - left_batching_options: BatchingOptions, - right_batching_options: BatchingOptions, - } - - #[derive(Debug)] - struct TestHelperResult { - left_peer_avg_interval: Duration, - right_peer_avg_interval: Duration, - avg_aligned_interval: Duration, - } - - // testing is split into two parts and results are stored separately. This allows for - // flexibility when asserting results more accurately and expectedly - async fn test_helper(params: TestHelperStruct) -> (TestHelperResult, TestHelperResult) { - let (left_send, left_recv) = watch::channel(Instant::now()); - let (right_send, right_recv) = watch::channel(Instant::now()); - - let left_send = Arc::new(left_send); - let right_send = Arc::new(right_send); - - let fake_network = Arc::new(FakeNetwork { - latency: params.latency, - }); - - let mut batcher_left = - Batcher::>>::new(params.left_batching_options); - let mut batcher_right = - Batcher::>>::new(params.right_batching_options); - - let test_checker_left = Arc::new(Mutex::new(TestChecker { values: Vec::new() })); - let test_checker_right = Arc::new(Mutex::new(TestChecker { values: Vec::new() })); - - let first_half_left_values = Arc::new(Mutex::new(vec![])); - let first_half_right_values = Arc::new(Mutex::new(vec![])); - - // at half the testcase, save the values and erase existing ones to remove the bias - // when calculating test metrics - // - { - let test_duration = params.test_duration.clone(); - let test_checker_left = test_checker_left.clone(); - let test_checker_right = test_checker_right.clone(); - let first_half_left_values = first_half_left_values.clone(); - let first_half_right_values = first_half_right_values.clone(); - - tokio::spawn(async move 
{ - sleep(test_duration / 2).await; - - let mut left = test_checker_left.lock().await; - let mut right = test_checker_right.lock().await; - - let mut l = first_half_left_values.lock().await; - let mut r = first_half_right_values.lock().await; - - *l = (*left.values).to_vec(); - *r = (*right.values).to_vec(); - - left.values.clear(); - right.values.clear() - }); - } - - // simulate emission of some organic traffic - { - let fake_network = fake_network.clone(); - let left_send = left_send.clone(); - let right_send = right_send.clone(); - let start = params.start_emit_traffic_at; - let deadline = params.stop_emit_traffic_at; - - let emit_packet_every = params.emit_packet_every; - - tokio::spawn(async move { - sleep_until(start).await; - - loop { - tokio::select! { - _ = sleep_until(deadline) => { - break - } - _ = sleep(emit_packet_every) => { - let _ = fake_network - .send(left_send.clone(), right_send.clone()) - .await; - - } - } - } - }); - } - - { - let fake_network = fake_network.clone(); - let left_send = left_send.clone(); - let right_send = right_send.clone(); - batcher_left.add( - "key".to_owned(), - Duration::from_secs(100), - Duration::from_secs(50), - Arc::new(move |s: _| { - let fake_net = Arc::clone(&fake_network); - let left_send = left_send.clone(); - let right_send = right_send.clone(); - Box::pin(async move { - { - let mut s = s.lock().await; - s.values.push(("key".to_owned(), Instant::now())); - } - - let _ = fake_net.send(left_send, right_send).await; - Ok(()) - }) - }), - ); - } - - { - let left_send = left_send.clone(); - let right_send = right_send.clone(); - let fake_network = fake_network.clone(); - - batcher_right.add( - "key".to_owned(), - Duration::from_secs(100), - Duration::from_secs(50), - Arc::new(move |s: _| { - let fake_net = Arc::clone(&fake_network); - let left_send = left_send.clone(); - let right_send = right_send.clone(); - Box::pin(async move { - { - let mut s = s.lock().await; - s.values.push(("key".to_owned(), 
Instant::now())); - } - let _ = fake_net.send(right_send, left_send).await; - Ok(()) - }) - }), - ); - } - - fn spawn_batcher_poller( - trigger: bool, - mut network_activity_sub: watch::Receiver, - mut batcher: Batcher>>, - mut checker: Arc>, - ) { - tokio::spawn(async move { - let mut last_activity = { - if trigger { - Some(Instant::now()) - } else { - None - } - }; - - loop { - tokio::select! { - Ok( _) = network_activity_sub.changed() => { - if trigger { - last_activity = Some(*network_activity_sub.borrow_and_update()); - } else { - last_activity = None; - } - - } - Ok(ac) = batcher.get_actions(last_activity) => { - for a in ac { - a.1(&mut checker).await.unwrap(); - } - } - } - } - }); - } - - spawn_batcher_poller( - params.trigger_enabled, - left_recv, - batcher_left, - test_checker_left.clone(), - ); - - // add another action after a delay to the peer so they would be misaligned - sleep(params.delay_to_add_second_peer).await; - - spawn_batcher_poller( - params.trigger_enabled, - right_recv, - batcher_right, - test_checker_right.clone(), - ); - - sleep(params.test_duration).await; - - let first_half_left_values = first_half_left_values - .lock() - .await - .iter() - .map(|v| v.1.elapsed()) - .collect::>(); - let first_half_right_values = first_half_right_values - .lock() - .await - .iter() - .map(|v| v.1.elapsed()) - .collect::>(); - - let second_half_left_values = test_checker_left - .lock() - .await - .values - .iter() - .map(|v| v.1.elapsed()) - .collect::>(); - let second_half_right_values = test_checker_right - .lock() - .await - .values - .iter() - .map(|v| v.1.elapsed()) - .collect::>(); - - fn average_alignment(left: &[Duration], right: &[Duration]) -> Duration { - if left.is_empty() || right.is_empty() { - panic!("no data present"); - } - - fn find_closest_value(v: Duration, data: &[Duration]) -> Duration { - data.iter() - .min_by_key(|&&ts| (ts.as_millis().abs_diff(v.as_millis()))) - .copied() - .unwrap_or(v) - } - - let mut total_diff = 0; - let mut 
count = 0; - - for &t1 in left { - let closest_t2 = find_closest_value(t1, right); - total_diff += t1.as_millis().abs_diff(closest_t2.as_millis()); - count += 1; - } - - Duration::from_millis((total_diff / count) as u64) - } - - fn average_difference(numbers: &[Duration]) -> Duration { - if numbers.len() < 2 { - panic!("not enough elements"); - } - - let total_diff: u128 = numbers - .windows(2) - .map(|w| (w[0].as_millis().abs_diff(w[1].as_millis() as u128))) - .sum(); - - let count = numbers.len() - 1; - Duration::from_millis((total_diff / count as u128) as u64) - } - - let res_first_half = TestHelperResult { - left_peer_avg_interval: average_difference(first_half_left_values.as_slice()), - right_peer_avg_interval: average_difference(first_half_right_values.as_slice()), - - avg_aligned_interval: average_alignment( - first_half_left_values.as_slice(), - first_half_right_values.as_slice(), - ), - }; - - let res_second_half = TestHelperResult { - left_peer_avg_interval: average_difference(second_half_left_values.as_slice()), - right_peer_avg_interval: average_difference(second_half_right_values.as_slice()), - - avg_aligned_interval: average_alignment( - second_half_left_values.as_slice(), - second_half_right_values.as_slice(), - ), - }; - - return (res_first_half, res_second_half); - } - - fn durations_close_enough(d1: Duration, d2: Duration, tolerance: Duration) -> bool { - let diff = if d1 > d2 { d1 - d2 } else { d2 - d1 }; - diff <= tolerance - } - - fn within_1s(d1: Duration, d2: Duration) -> bool { - durations_close_enough(d1, d2, Duration::from_secs(1)) - } - - fn within_1ms(d1: Duration, d2: Duration) -> bool { - durations_close_enough(d1, d2, Duration::from_millis(1)) - } - - // actual testcases - { - // due to traffic being triggered every second for the duration of the testcase, we do - // not expect effective batching to happen. 
This is because trigger doesn't differentiate the - // traffic and thus constant activity just means attempts to batch all the time which - // just means that intervals will be shortened to `T - threshold` but no alignment - // between peers happens. The trigger cooldown is also high enough that it doesn't - // interfere with average interval - let (res_first_half, res_second_half) = test_helper(TestHelperStruct { - trigger_enabled: true, - latency: Duration::from_millis(333), - start_emit_traffic_at: Instant::now(), - stop_emit_traffic_at: Instant::now() + Duration::from_secs(3600), - emit_packet_every: Duration::from_secs(1), - delay_to_add_second_peer: Duration::from_secs(17), - test_duration: Duration::from_secs(3600), - left_batching_options: BatchingOptions { - trigger_cooldown_duration: Duration::from_secs(600), - trigger_effective_duration: Duration::from_secs(10), - }, - right_batching_options: BatchingOptions { - trigger_cooldown_duration: Duration::from_secs(600), - trigger_effective_duration: Duration::from_secs(10), - }, - }) - .await; - - assert!(within_1s( - res_first_half.left_peer_avg_interval, - Duration::from_secs(100) - )); - assert!(within_1s( - res_first_half.right_peer_avg_interval, - Duration::from_secs(100) - )); - - assert!(within_1s( - res_second_half.left_peer_avg_interval, - Duration::from_secs(100) - )); - assert!(within_1s( - res_second_half.right_peer_avg_interval, - Duration::from_secs(100) - )); - - // 17seconds in the test we add a secod peer and expect this amount of misalignment - // between the peers - assert!(within_1s( - res_first_half.avg_aligned_interval, - Duration::from_secs(17) - )); - assert!(within_1s( - res_second_half.avg_aligned_interval, - Duration::from_secs(20) - )); - } - - { - // due to traffic being triggered every second for the duration of the testcase, we do - // not expect effective batching to happen. 
This is because trigger doesn't differentiate the - // traffic and thus constant activity just means attempts to batch all the time which - // just means that intervals will be shortened to `T - threshold` but no alignment - // between peers happens. The trigger cooldown is short enough to showcase that it just - // reduces the action intervals but doesn't achieve alignment - let (res_first_half, res_second_half) = test_helper(TestHelperStruct { - trigger_enabled: true, - latency: Duration::from_millis(333), - start_emit_traffic_at: Instant::now(), - stop_emit_traffic_at: Instant::now() + Duration::from_secs(3600), - emit_packet_every: Duration::from_secs(1), - delay_to_add_second_peer: Duration::from_secs(17), - test_duration: Duration::from_secs(3600), - left_batching_options: BatchingOptions { - trigger_cooldown_duration: Duration::from_secs(30), - trigger_effective_duration: Duration::from_secs(10), - }, - right_batching_options: BatchingOptions { - trigger_cooldown_duration: Duration::from_secs(30), - trigger_effective_duration: Duration::from_secs(10), - }, - }) - .await; - - assert!(within_1s( - res_first_half.left_peer_avg_interval, - Duration::from_secs(50) - )); - assert!(within_1s( - res_first_half.right_peer_avg_interval, - Duration::from_secs(50) - )); - - assert!(within_1s( - res_second_half.left_peer_avg_interval, - Duration::from_secs(50) - )); - assert!(within_1s( - res_second_half.right_peer_avg_interval, - Duration::from_secs(50) - )); - - // 17seconds in the test we add a secod peer and expect this amount of misalignment - // between the peers - assert!(within_1s( - res_first_half.avg_aligned_interval, - Duration::from_secs(17) - )); - assert!(within_1s( - res_second_half.avg_aligned_interval, - Duration::from_secs(18) - )); - } - - { - // Start emitting traffic in the middle of the testcase. 
It should have no effect since - // batcher already aligned both peers and triggering more often has no effect on that except - // on local interval shortening(side effect or triggering) - let (res_first_half, res_second_half) = test_helper(TestHelperStruct { - trigger_enabled: true, - latency: Duration::from_millis(333), - start_emit_traffic_at: Instant::now() + Duration::from_secs(1800), - stop_emit_traffic_at: Instant::now() + Duration::from_secs(3600), - emit_packet_every: Duration::from_secs(1), - delay_to_add_second_peer: Duration::from_secs(17), - test_duration: Duration::from_secs(3600), - left_batching_options: BatchingOptions { - trigger_cooldown_duration: Duration::from_secs(30), - trigger_effective_duration: Duration::from_secs(10), - }, - right_batching_options: BatchingOptions { - trigger_cooldown_duration: Duration::from_secs(30), - trigger_effective_duration: Duration::from_secs(10), - }, - }) - .await; - - assert!(within_1s( - res_first_half.left_peer_avg_interval, - Duration::from_secs(100) - )); - assert!(within_1s( - res_first_half.right_peer_avg_interval, - Duration::from_secs(100) - )); - - assert!(within_1s( - res_first_half.avg_aligned_interval, - Duration::from_secs(1) - )); - - assert!(within_1s( - res_second_half.left_peer_avg_interval, - Duration::from_secs(50) - )); - - assert!(within_1s( - res_second_half.right_peer_avg_interval, - Duration::from_secs(50) - )); - - assert!(within_1ms( - res_second_half.avg_aligned_interval, - Duration::from_millis(333), - )); - } - } - // - #[tokio::test] - async fn no_actions() { - let mut batcher = Batcher::::new(BatchingOptions::default()); - assert!(matches!( - batcher.get_actions(None).await, - Err(BatcherError::NoActions) - )); - } - - #[tokio::test(start_paused = true)] - async fn no_threshold_expect_no_batching() { - let start_time = tokio::time::Instant::now(); - let mut batcher = Batcher::::new(BatchingOptions::default()); - let mut test_checker = TestChecker { values: Vec::new() }; - - 
batcher.add( - "key0".to_owned(), - Duration::from_secs(100), - Duration::from_secs(0), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key0".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - - // simulate some delay before adding a second task so they would be misaligned - advance(Duration::from_secs(20)).await; - - batcher.add( - "key1".to_owned(), - Duration::from_secs(100), - Duration::from_secs(0), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key1".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - // Await for few actions to resolve - for _ in 0..6 { - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - } - - // expect no batched behaviour since the thresholds were zero - assert!(test_checker.values[0].1.duration_since(start_time) == Duration::from_secs(0)); - assert!(test_checker.values[1].1.duration_since(start_time) == Duration::from_secs(20)); - assert!(test_checker.values[2].1.duration_since(start_time) == Duration::from_secs(100)); - assert!(test_checker.values[3].1.duration_since(start_time) == Duration::from_secs(120)); - assert!(test_checker.values[4].1.duration_since(start_time) == Duration::from_secs(200)); - assert!(test_checker.values[5].1.duration_since(start_time) == Duration::from_secs(220)); - assert!(test_checker.values[6].1.duration_since(start_time) == Duration::from_secs(300)); - } - - #[tokio::test(start_paused = true)] - async fn batch_one_no_threshold_expect_no_batch() { - let start_time = tokio::time::Instant::now(); - let mut batcher = Batcher::::new(BatchingOptions::default()); - batcher.add( - "key".to_owned(), - Duration::from_secs(100), - Duration::from_secs(0), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - 
let mut test_checker = TestChecker { values: Vec::new() }; - - // pick up the immediate fire - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - assert!(test_checker.values.len() == 1); - - // pick up the second event - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - - assert!(test_checker.values.len() == 2); - - assert!(test_checker.values[0].1.duration_since(start_time) == Duration::from_secs(0)); - assert!(test_checker.values[1].1.duration_since(start_time) == Duration::from_secs(100)); - - // after a pause of retrieving actions there will be a delay in signals as well as they are not active on their own - tokio::time::advance(Duration::from_secs(550)).await; - assert!(test_checker.values.len() == 2); - - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - - assert!(test_checker.values.len() == 3); - assert!(test_checker.values[2].1.duration_since(start_time) == Duration::from_secs(650)); - } - - #[tokio::test(start_paused = true)] - async fn batch_two_with_threshold_and_delay() { - let start_time = tokio::time::Instant::now(); - let mut batcher = Batcher::::new(BatchingOptions::default()); - batcher.add( - "key0".to_owned(), - Duration::from_secs(100), - Duration::from_secs(50), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key0".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - tokio::time::advance(Duration::from_secs(30)).await; - - batcher.add( - "key1".to_owned(), - Duration::from_secs(100), - Duration::from_secs(50), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key1".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - // At this point there are two actions added, one at t(0) and another at t(30). 
- let mut test_checker = TestChecker { values: Vec::new() }; - - // pick up the immediate fires - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - assert!(test_checker.values.len() == 2); - - // Do again and batching should be in action since the threshold of the second signal is bigger(50) than the delay between the packets(30) - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - - assert!(test_checker.values.len() == 4); - - let key0_entries: Vec = test_checker - .values - .iter() - .filter(|e| e.0 == "key0") - .map(|e| e.1.duration_since(start_time)) - .collect(); - let key1_entries: Vec = test_checker - .values - .iter() - .filter(|e| e.0 == "key1") - .map(|e| e.1.duration_since(start_time)) - .collect(); - - // Immediate fires were supressed because we need to poll them and we did so only after 30seconds - // Thus everything will be aligned at 30seconds - - assert!(key0_entries[0] == Duration::from_secs(30)); - assert!(key1_entries[0] == Duration::from_secs(30)); - - assert!(key0_entries[1] == Duration::from_secs(130)); - assert!(key1_entries[1] == Duration::from_secs(130)); - } - - #[tokio::test(start_paused = true)] - async fn batch_two_with_threshold_check_threshold_limit_enforcement() { - let start_time = tokio::time::Instant::now(); - let mut batcher = Batcher::::new(BatchingOptions::default()); - batcher.add( - "key0".to_owned(), - Duration::from_secs(30), - Duration::from_secs(30), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key0".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - let mut test_checker = TestChecker { values: Vec::new() }; - - // pick up the immediate fire - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - tokio::time::advance(Duration::from_secs(5)).await; - - batcher.add( - "key1".to_owned(), - Duration::from_secs(5), - Duration::from_secs(0), 
- Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key1".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - for _ in 0..8 { - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - } - - let key0_entries: Vec = test_checker - .values - .iter() - .filter(|e| e.0 == "key0") - .map(|e| e.1.duration_since(start_time)) - .collect(); - let key1_entries: Vec = test_checker - .values - .iter() - .filter(|e| e.0 == "key1") - .map(|e| e.1.duration_since(start_time)) - .collect(); - - // At this point we expect the first job to be batched alongside the - // second, very frequent job at half the interval(capped) - assert!(key0_entries[0] == Duration::from_secs(0)); - assert!(key0_entries[1] == Duration::from_secs(15)); - assert!(key0_entries[2] == Duration::from_secs(30)); - - assert!(key1_entries[0] == Duration::from_secs(5)); - assert!(key1_entries[1] == Duration::from_secs(10)); - assert!(key1_entries[2] == Duration::from_secs(15)); - assert!(key1_entries[3] == Duration::from_secs(20)); - } - - #[tokio::test(start_paused = true)] - async fn batch_two_with_threshold_check_interval_limit_enforcement() { - let start_time = tokio::time::Instant::now(); - let mut batcher = Batcher::::new(BatchingOptions::default()); - batcher.add( - "key0".to_owned(), - // Make interval and threshold too small and expect override - Duration::from_secs(1), - Duration::from_secs(0), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key0".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - let mut test_checker = TestChecker { values: Vec::new() }; - - // pick up the immediate fire - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - tokio::time::advance(Duration::from_secs(1)).await; - - batcher.add( - "key1".to_owned(), - // Make interval too small and expect override - Duration::from_secs(2), - Duration::from_secs(0), - Arc::new(|s: 
_| { - Box::pin(async move { - s.values - .push(("key1".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - for _ in 0..8 { - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - } - - let key0_entries: Vec = test_checker - .values - .iter() - .filter(|e| e.0 == "key0") - .map(|e| e.1.duration_since(start_time)) - .collect(); - let key1_entries: Vec = test_checker - .values - .iter() - .filter(|e| e.0 == "key1") - .map(|e| e.1.duration_since(start_time)) - .collect(); - - // Expect forcing minimal interval of 5seconds - assert!(key0_entries[0] == Duration::from_secs(0)); - assert!(key0_entries[1] == Duration::from_secs(5)); - assert!(key0_entries[2] == Duration::from_secs(10)); - assert!(key0_entries[3] == Duration::from_secs(15)); - - assert!(key1_entries[0] == Duration::from_secs(1)); - assert!(key1_entries[1] == Duration::from_secs(6)); - assert!(key1_entries[2] == Duration::from_secs(11)); - assert!(key1_entries[3] == Duration::from_secs(16)); - } - - #[tokio::test(start_paused = true)] - async fn batch_two_no_threshold_delayed_check() { - let _start_time = tokio::time::Instant::now(); - let mut batcher = Batcher::::new(BatchingOptions::default()); - batcher.add( - "key0".to_owned(), - Duration::from_secs(100), - Duration::from_secs(0), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key0".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - tokio::time::advance(Duration::from_secs(30)).await; - - batcher.add( - "key1".to_owned(), - Duration::from_secs(100), - Duration::from_secs(0), - Arc::new(|s: _| { - Box::pin(async move { - s.values - .push(("key1".to_owned(), tokio::time::Instant::now())); - Ok(()) - }) - }), - ); - - // At this point there are two actions added, one at t(0) and another at t(30). 
- let mut test_checker = TestChecker { values: Vec::new() }; - - // pick up the immediate fire - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - assert!( - test_checker.values.len() == 2, - "Both actions should be emitted immediately" - ); - - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - - assert!(test_checker.values.len() == 4, "Even though there was no threshold and actions were added at different times, we query only after adding the second one which resets both deadlines and aligns them"); - } - - #[tokio::test(start_paused = true)] - async fn batch_two_no_threshold_nodelay_check() { - let start_time = Instant::now(); - let mut batcher = Batcher::::new(BatchingOptions::default()); - batcher.add( - "key0".to_owned(), - Duration::from_secs(100), - Duration::from_secs(0), - Arc::new(|s: _| { - Box::pin(async move { - s.values.push(("key0".to_owned(), Instant::now())); - Ok(()) - }) - }), - ); - - let mut test_checker = TestChecker { values: Vec::new() }; - // pick up the immediate fire - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - assert!(test_checker.values.len() == 1); - - tokio::time::advance(Duration::from_secs(30)).await; - - batcher.add( - "key1".to_owned(), - Duration::from_secs(100), - Duration::from_secs(0), - Arc::new(|s: _| { - Box::pin(async move { - s.values.push(("key1".to_owned(), Instant::now())); - Ok(()) - }) - }), - ); - - // pick up the immediate fire from the second action - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - assert!(test_checker.values.len() == 2); - - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - - assert!( - test_checker.values.len() == 3, - "No threshold but a delay was given, thus one signal should have resolved" - ); - - test_checker.values = vec![]; - for 
_ in 0..12 { - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - } - - let key0sum: tokio::time::Duration = test_checker - .values - .iter() - .filter(|e| e.0 == "key0") - .map(|e| e.1.duration_since(start_time)) - .collect::>() - .windows(2) - .map(|w| w[1] - w[0]) - .sum(); - - let key1sum: tokio::time::Duration = test_checker - .values - .iter() - .filter(|e| e.0 == "key1") - .map(|e| e.1.duration_since(start_time)) - .collect::>() - .windows(2) - .map(|w| w[1] - w[0]) - .sum(); - - // Because of misalignment, each call produces only one action instead of both - assert!(key0sum == Duration::from_secs(500)); - assert!(key1sum == Duration::from_secs(500)); - - // Now let's wait a bit so upon iterating again signals would be aligned - test_checker.values = vec![]; - tokio::time::advance(Duration::from_secs(500)).await; - - for _i in 0..11 { - for ac in batcher.get_actions(None).await.unwrap() { - ac.1(&mut test_checker).await.unwrap(); - } - } - - let key0sum: Duration = test_checker - .values - .iter() - .filter(|e| e.0 == "key0") - .map(|e| e.1.duration_since(start_time)) - .collect::>() - .windows(2) - .map(|w| w[1] - w[0]) - .sum(); - - let key1sum: Duration = test_checker - .values - .iter() - .filter(|e| e.0 == "key1") - .map(|e| e.1.duration_since(start_time)) - .collect::>() - .windows(2) - .map(|w| w[1] - w[0]) - .sum(); - - assert!(key0sum == Duration::from_secs(1000)); - assert!(key1sum == Duration::from_secs(1000)); - } -} diff --git a/crates/telio-traversal/src/lib.rs b/crates/telio-traversal/src/lib.rs index c1f72450e..73057365a 100644 --- a/crates/telio-traversal/src/lib.rs +++ b/crates/telio-traversal/src/lib.rs @@ -1,6 +1,5 @@ #![cfg_attr(docsrs, feature(doc_cfg))] -pub mod batcher; pub mod connectivity_check; pub mod endpoint_providers; pub mod endpoint_state; diff --git a/crates/telio-traversal/src/session_keeper.rs b/crates/telio-traversal/src/session_keeper.rs index ef17fb4b0..bca0a55d2 100644 
--- a/crates/telio-traversal/src/session_keeper.rs +++ b/crates/telio-traversal/src/session_keeper.rs @@ -1,4 +1,3 @@ -use crate::batcher::{Batcher, BatcherTrait, BatchingOptions}; use async_trait::async_trait; use socket2::Type; use std::future::Future; @@ -10,18 +9,11 @@ use surge_ping::{ SurgeError, ICMP, }; use telio_crypto::PublicKey; -use telio_model::features::FeatureBatching; use telio_sockets::SocketPool; use telio_task::{task_exec, BoxAction, Runtime, Task}; use telio_utils::{ dual_target, repeated_actions, telio_log_debug, telio_log_warn, DualTarget, RepeatedActions, }; -use tokio::sync::watch; -use tokio::time::Instant; - -use futures::future::{pending, BoxFuture}; -use futures::FutureExt; - const PING_PAYLOAD_SIZE: usize = 56; /// Possible [SessionKeeper] errors. @@ -57,28 +49,19 @@ pub trait SessionKeeperTrait { public_key: PublicKey, target: dual_target::Target, interval: Duration, - threshold: Option, ) -> Result<()>; async fn remove_node(&self, key: &PublicKey) -> Result<()>; async fn get_interval(&self, key: &PublicKey) -> Option; } pub struct SessionKeeper { + batch_all: bool, task: Task, } impl SessionKeeper { - pub fn start( - sock_pool: Arc, - batching_feature: FeatureBatching, - network_activity: Option>, - - #[cfg(test)] batcher: Box>, - ) -> Result { - telio_log_debug!( - "Starting SessionKeeper with network subscriber: {}", - network_activity.is_some() - ); + pub fn start(sock_pool: Arc, batch_all: bool) -> Result { + telio_log_debug!("Starting with batch_all({})", batch_all); let (client_v4, client_v6) = ( PingerClient::new(&Self::make_builder(ICMP::V4).build()) .map_err(|e| Error::PingerCreationError(ICMP::V4, e))?, @@ -90,21 +73,14 @@ impl SessionKeeper { sock_pool.make_internal(client_v6.get_socket().get_native_sock())?; Ok(Self { + batch_all, task: Task::start(State { pingers: Pingers { pinger_client_v4: client_v4, pinger_client_v6: client_v6, }, - #[cfg(test)] - batched_actions: batcher, - - #[cfg(not(test))] - batched_actions: 
Box::new(Batcher::new(batching_feature.into())), - - nonbatched_actions: RepeatedActions::default(), - - network_activity, + actions: RepeatedActions::default(), }), }) } @@ -194,71 +170,58 @@ impl SessionKeeperTrait for SessionKeeper { public_key: PublicKey, target: dual_target::Target, interval: Duration, - threshold: Option, ) -> Result<()> { let dual_target = DualTarget::new(target).map_err(Error::DualTargetError)?; - match threshold { - Some(t) => task_exec!(&self.task, async move |s| { - s.batched_actions.add( - public_key, - interval, - t, - Arc::new(move |c: &mut State| { - Box::pin(async move { - telio_log_debug!("Batch-Pinging: {:?}", public_key); - if let Err(e) = ping(&c.pingers, (&public_key, &dual_target)).await { - telio_log_warn!( - "Failed to batch-ping, peer with key: {:?}, error: {:?}", - public_key, - e - ); - } - Ok(()) - }) - }), - ); - - Ok(()) - }) - .await - .map_err(Error::Task)?, - None => task_exec!(&self.task, async move |s| { - if s.nonbatched_actions.contains_action(&public_key) { - let _ = s.nonbatched_actions.remove_action(&public_key); - } + let batch_all = self.batch_all; + telio_log_debug!( + "Add action for {} and interval {:?}. batch_all({})", + public_key, + interval, + batch_all + ); - Ok(s.nonbatched_actions.add_action( - public_key, - interval, - Arc::new(move |c| { - Box::pin(async move { - if let Err(e) = ping(&c.pingers, (&public_key, &dual_target)).await { - telio_log_warn!( - "Failed to ping, peer with key: {:?}, error: {:?}", - public_key, - e - ); - } - Ok(()) - }) - }), - )) - }) - .await - .map_err(Error::Task)? 
- .map_err(Error::RepeatedActionError) - .map(|_| ())?, - } + task_exec!(&self.task, async move |s| { + if s.actions.contains_action(&public_key) { + let _ = s.actions.remove_action(&public_key); + } + let res = s.actions.add_action( + public_key, + interval, + Arc::new(move |c| { + Box::pin(async move { + if let Err(e) = ping(&c.pingers, (&public_key, &dual_target)).await { + telio_log_warn!( + "Failed to ping, peer with key: {:?}, error: {:?}", + public_key, + e + ); + } + Ok(()) + }) + }), + ); + + if batch_all { + s.actions.set_all_immediate(); + } + + Ok(res) + }) + .await + .map_err(Error::Task)? + .map_err(Error::RepeatedActionError) + .map(|_| ())?; + + telio_log_debug!("Added {}", public_key); Ok(()) } async fn remove_node(&self, key: &PublicKey) -> Result<()> { let pk = *key; task_exec!(&self.task, async move |s| { - let _ = s.nonbatched_actions.remove_action(&pk); - let _ = s.batched_actions.remove(&pk); + let _ = s.actions.remove_action(&pk); Ok(()) }) .await?; @@ -266,31 +229,16 @@ impl SessionKeeperTrait for SessionKeeper { Ok(()) } - // TODO: SK calls batched and nonbatched actions interchangibly, however call sites in general - // should be aware which one to call async fn get_interval(&self, key: &PublicKey) -> Option { let pk = *key; task_exec!(&self.task, async move |s| { - if let Some(interval) = s.batched_actions.get_interval(&pk) { - Ok(Some(interval.as_secs() as u32)) - } else { - Ok(s.nonbatched_actions.get_interval(&pk)) - } + Ok(s.actions.get_interval(&pk)) }) .await .unwrap_or(None) } } -impl From for BatchingOptions { - fn from(f: FeatureBatching) -> Self { - Self { - trigger_effective_duration: Duration::from_secs(f.trigger_effective_duration.into()), - trigger_cooldown_duration: Duration::from_secs(f.trigger_cooldown_duration.into()), - } - } -} - struct Pingers { pinger_client_v4: PingerClient, pinger_client_v6: PingerClient, @@ -298,9 +246,7 @@ struct Pingers { struct State { pingers: Pingers, - batched_actions: Box>, - 
nonbatched_actions: RepeatedActions>, - network_activity: Option>, + actions: RepeatedActions>, } #[async_trait] @@ -312,26 +258,8 @@ impl Runtime for State { where F: Future>> + Send, { - let last_network_activity = self - .network_activity - .as_mut() - .map(|receiver| *receiver.borrow_and_update()); - - let network_change_fut: BoxFuture< - '_, - std::result::Result<(), telio_utils::sync::watch::error::RecvError>, - > = { - match self.network_activity { - Some(ref mut na) => na.changed().boxed(), - None => pending::<()>().map(|_| Ok(())).boxed(), - } - }; - tokio::select! { - _ = network_change_fut => { - return Ok(()); - } - Ok((pk, action)) = self.nonbatched_actions.select_action() => { + Ok((pk, action)) = self.actions.select_action() => { let pk = *pk; action(self) .await @@ -340,15 +268,6 @@ impl Runtime for State { Ok(()) }, |_| Ok(()))?; } - Ok(batched_actions) = self.batched_actions.get_actions(last_network_activity) => { - for (pk, action) in batched_actions { - action(self).await.map_or_else(|e| { - telio_log_warn!("({}) Error sending batch-keepalive to {}: {:?}", Self::NAME, pk, e); - Ok(()) - }, |_| Ok(()))?; - } - } - update = update => { return update(self).await; } @@ -364,7 +283,6 @@ impl Runtime for State { #[cfg(test)] mod tests { use super::*; - use crate::batcher::{BatcherError, MockBatcherTrait}; use std::net::{Ipv4Addr, Ipv6Addr}; use telio_crypto::PublicKey; use telio_sockets::NativeProtector; @@ -383,13 +301,7 @@ mod tests { ) .unwrap(), )); - let sess_keep = SessionKeeper::start( - socket_pool, - FeatureBatching::default(), - None, - Box::new(Batcher::new(FeatureBatching::default().into())), - ) - .unwrap(); + let sess_keep = SessionKeeper::start(socket_pool, false).unwrap(); let pk = "REjdn4zY2TFx2AMujoNGPffo9vDiRDXpGG4jHPtx2AY=" .parse::() @@ -400,7 +312,6 @@ mod tests { pk, (Some(Ipv4Addr::LOCALHOST), Some(Ipv6Addr::LOCALHOST)), PERIOD, - None, ) .await .unwrap(); @@ -456,66 +367,4 @@ mod tests { .await .unwrap(); } - - #[tokio::test] 
- async fn test_batcher_invocation() { - const PERIOD: Duration = Duration::from_secs(20); - - const THRESHOLD: Duration = Duration::from_secs(10); - let socket_pool = Arc::new(SocketPool::new( - NativeProtector::new( - #[cfg(target_os = "macos")] - false, - ) - .unwrap(), - )); - - let mut batcher = Box::new(MockBatcherTrait::::new()); - - let pk = "REjdn4zY2TFx2AMujoNGPffo9vDiRDXpGG4jHPtx2AY=" - .parse::() - .unwrap(); - - use mockall::predicate::{always, eq}; - batcher - .expect_add() - .once() - .with(eq(pk), eq(PERIOD), eq(THRESHOLD), always()) - .return_once(|_, _, _, _| ()); - batcher - .expect_remove() - .once() - .with(eq(pk)) - .return_once(|_| ()); - - // it's hard to mock the exact return since it involves a complex type, however we - // can at least verify that the batcher's actions were queried - batcher - .expect_get_actions() - .times(..) - .returning(|_| Err(BatcherError::NoActions)); - - let sess_keep = SessionKeeper::start( - socket_pool, - FeatureBatching::default().into(), - None, - batcher, - ) - .unwrap(); - - sess_keep - .add_node( - pk, - (Some(Ipv4Addr::LOCALHOST), Some(Ipv6Addr::LOCALHOST)), - PERIOD, - Some(THRESHOLD), - ) - .await - .unwrap(); - - sess_keep.remove_node(&pk).await.unwrap(); - - // courtesy wait to be sure the runtime polls everything - sess_keep.stop().await; - } } diff --git a/crates/telio-utils/src/repeated_actions.rs b/crates/telio-utils/src/repeated_actions.rs index c30a62922..f972be677 100644 --- a/crates/telio-utils/src/repeated_actions.rs +++ b/crates/telio-utils/src/repeated_actions.rs @@ -29,7 +29,6 @@ pub enum RepeatedActionError { /// Single action type pub type RepeatedAction = Arc Fn(&'a mut V) -> BoxFuture<'a, R> + Sync + Send>; -type Action = (K, (Interval, RepeatedAction)); type Result = std::result::Result; /// Main struct container, that hold all actions @@ -57,6 +56,13 @@ where } } + /// Set all actions to be executed when polled next time + pub fn set_all_immediate(&mut self) { + self.actions + 
.values_mut() + .for_each(|v| v.0.reset_immediately()); + } + /// Add single action (first tick is immediate) pub fn add_action( &mut self, @@ -128,17 +134,6 @@ where } } -impl From<[Action; N]> for RepeatedActions -where - K: Eq + Hash + Send + Sync, -{ - fn from(arr: [Action; N]) -> Self { - RepeatedActions:: { - actions: HashMap::from(arr), - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -171,6 +166,111 @@ mod tests { self.test = str; Ok(()) } + + pub fn get(&self) -> &str { + &self.test + } + } + + #[tokio::test(start_paused = true)] + async fn test_set_all_immediate() { + let mut ctx = Context::new("test".to_owned()); + + let start = Instant::now(); + + ctx.actions + .add_action( + "action_0".to_owned(), + Duration::from_secs(10), + Arc::new({ + let start = start.clone(); + move |s: _| { + Box::pin({ + let start = start.clone(); + async move { + s.change(format!("ts_{}", start.elapsed().as_secs()).to_owned()) + .await + } + }) + } + }), + ) + .unwrap(); + + // immediate action + ctx.actions.select_action().await.unwrap().1(&mut ctx) + .await + .unwrap(); + + tokio::time::advance(Duration::from_secs(3)).await; + + ctx.actions + .add_action( + "action_1".to_owned(), + Duration::from_secs(10), + Arc::new({ + let start = start.clone(); + move |s: _| { + Box::pin({ + let start = start.clone(); + async move { + s.change(format!("ts_{}", start.elapsed().as_secs()).to_owned()) + .await + } + }) + } + }), + ) + .unwrap(); + + // immediate action + ctx.actions.select_action().await.unwrap().1(&mut ctx) + .await + .unwrap(); + + // at this point in time there's 7seconds until action_0 and 10seconds until action_1 + + ctx.actions.select_action().await.unwrap().1(&mut ctx) + .await + .unwrap(); + ctx.actions.select_action().await.unwrap().1(&mut ctx) + .await + .unwrap(); + + { + let mut values_to_expect = vec!["ts_20", "ts_23", "ts_30", "ts_33"]; + values_to_expect.reverse(); + + loop { + if values_to_expect.len() == 0 { + break; + } + 
ctx.actions.select_action().await.unwrap().1(&mut ctx) + .await + .unwrap(); + + assert_eq!(ctx.get(), values_to_expect.pop().unwrap()); + } + } + + // we have proven that two actions are misaligned + ctx.actions.set_all_immediate(); + + { + let mut values_to_expect = vec!["ts_33", "ts_33", "ts_43", "ts_43"]; + values_to_expect.reverse(); + + loop { + if values_to_expect.len() == 0 { + break; + } + ctx.actions.select_action().await.unwrap().1(&mut ctx) + .await + .unwrap(); + + assert_eq!(ctx.get(), values_to_expect.pop().unwrap()); + } + } } #[tokio::test] diff --git a/crates/telio-wg/src/wg.rs b/crates/telio-wg/src/wg.rs index a6b562f3e..347a9d07c 100644 --- a/crates/telio-wg/src/wg.rs +++ b/crates/telio-wg/src/wg.rs @@ -240,8 +240,6 @@ struct State { stats: HashMap>>, ip_stack: Option, - - network_activity: Option>, } const POLL_MILLIS: u64 = 1000; @@ -337,7 +335,6 @@ impl DynamicWg { io: Io, cfg: Config, link_detection: Option, - batching: Option, ipv6_enabled: bool, ) -> Result where @@ -349,25 +346,17 @@ impl DynamicWg { io, adapter, link_detection, - batching, cfg, ipv6_enabled, )); #[cfg(windows)] - return Ok(Self::start_with( - io, - adapter, - link_detection, - batching, - ipv6_enabled, - )); + return Ok(Self::start_with(io, adapter, link_detection, ipv6_enabled)); } fn start_with( io: Io, adapter: Box, link_detection: Option, - batching: Option, #[cfg(unix)] cfg: Config, ipv6_enabled: bool, ) -> Self { @@ -386,7 +375,6 @@ impl DynamicWg { libtelio_event: io.libtelio_wide_event_publisher, stats: HashMap::new(), ip_stack: None, - network_activity: batching.map(|_| watch::channel(Instant::now()).0), }), } } @@ -414,19 +402,6 @@ impl DynamicWg { Err(Error::RestartFailed) } } - - /// Returns a channel that can be used to observe WireGuard network activity - pub async fn subscribe_to_network_activity( - &self, - ) -> Result>, Error> { - Ok(task_exec!(&self.task, async move |s| { - match s.network_activity { - Some(ref mut na) => Ok(Some(na.subscribe())), - 
None => Ok(None), - } - }) - .await?) - } } #[async_trait] @@ -941,21 +916,13 @@ impl State { mut to: uapi::Interface, reason: UpdateReason, ) -> Result { - let mut new_network_activity = false; - for (pk, peer) in &mut to.peers { match self.stats.get_mut(pk) { Some(stats) => match stats.lock().as_mut() { Ok(s) => { - let before_rx = s.rx_bytes; - let before_tx = s.tx_bytes; - let new_rx = peer.rx_bytes.unwrap_or_default(); let new_tx = peer.tx_bytes.unwrap_or_default(); - if new_rx > before_rx || new_tx > before_tx { - new_network_activity = true; - } s.update(new_rx, new_tx); } Err(e) => { @@ -976,12 +943,6 @@ impl State { } self.stats.retain(|pk, _| to.peers.contains_key(pk)); - if new_network_activity { - if let Some(ref mut na) = self.network_activity { - let _ = na.send(Instant::now()); - } - } - // Diff and report events // Adapter doesn't keep track of mesh addresses, or endpoint changes, diff --git a/nat-lab/tests/test_batching.py b/nat-lab/tests/test_batching.py index 6ed0a5e74..63a625f54 100644 --- a/nat-lab/tests/test_batching.py +++ b/nat-lab/tests/test_batching.py @@ -38,7 +38,9 @@ def _generate_setup_parameters( conn_tag: ConnectionTag, adapter: TelioAdapterType, batching: bool ) -> SetupParameters: - features = features_with_endpoint_providers([EndpointProvider.STUN]) + features = features_with_endpoint_providers( + [EndpointProvider.STUN, EndpointProvider.LOCAL] + ) features.link_detection = FeatureLinkDetection( rtt_seconds=1, no_of_pings=1, use_for_downgrade=True @@ -67,6 +69,22 @@ def _generate_setup_parameters( ALL_NODES = [ + ( + ConnectionTag.DOCKER_UPNP_CLIENT_1, + TelioAdapterType.LINUX_NATIVE_TUN, + ), + ( + ConnectionTag.DOCKER_UPNP_CLIENT_2, + TelioAdapterType.LINUX_NATIVE_TUN, + ), + ( + ConnectionTag.DOCKER_OPEN_INTERNET_CLIENT_1, + TelioAdapterType.LINUX_NATIVE_TUN, + ), + ( + ConnectionTag.DOCKER_OPEN_INTERNET_CLIENT_2, + TelioAdapterType.LINUX_NATIVE_TUN, + ), ( ConnectionTag.DOCKER_CONE_CLIENT_1, TelioAdapterType.LINUX_NATIVE_TUN, 
@@ -177,6 +195,8 @@ async def start_node_manually(client, node, sleep_s): if not client.is_node(node) ]) + print("All peers directly interconnected") + pyro5_ports = [ int(port) for port in {client.get_proxy_port() for client in env.clients} ] diff --git a/nat-lab/tests/timeouts.py b/nat-lab/tests/timeouts.py index 1dfb344c0..3b693906b 100644 --- a/nat-lab/tests/timeouts.py +++ b/nat-lab/tests/timeouts.py @@ -12,4 +12,4 @@ TEST_NODE_STATE_FLICKERING_RELAY_TIMEOUT = 180 TEST_NODE_STATE_FLICKERING_DIRECT_TIMEOUT = 180 TEST_MESH_STATE_AFTER_DISCONNECTING_NODE_TIMEOUT = 300 -TEST_BATCHING_TIMEOUT = 1000 +TEST_BATCHING_TIMEOUT = 600 diff --git a/src/device.rs b/src/device.rs index 00c2cdb56..50215ad06 100644 --- a/src/device.rs +++ b/src/device.rs @@ -1120,7 +1120,6 @@ impl Runtime { firewall_reset_connections, }, features.link_detection, - features.batching, features.ipv6, )?); let wg_events = wg_events.rx; @@ -1325,11 +1324,7 @@ impl Runtime { let session_keeper = { match SessionKeeper::start( self.entities.socket_pool.clone(), - self.features.batching.unwrap_or_default(), - self.entities - .wireguard_interface - .subscribe_to_network_activity() - .await?, + self.features.batching.is_some(), ) { Ok(sk) => Some(Arc::new(sk)), Err(e) => { @@ -1341,10 +1336,13 @@ impl Runtime { // Batching optimisations work by employing SessionKeeper. If SessionKeeper is not present // functionality will break when offloading actions to it, thus we disable the feature - if session_keeper.is_none() && self.features.batching.is_some() { - telio_log_warn!( - "Batching feature is enabled but SessionKeeper failed to start. Disabling batching." - ); + if session_keeper.is_none() { + if self.features.batching.is_some() { + // Batching feature enables batching for various kinds of keepalives provided by + // SessionKeeper. 
As a fallback, disable batcher in case SessionKeeper fails to + // start to preserve maximum operability of libtelio + telio_log_warn!("Batching feature is enabled but SessionKeeper failed to start. Disabling batching."); + } self.features.batching = None; } diff --git a/src/device/wg_controller.rs b/src/device/wg_controller.rs index 9d74307e5..1e0e982de 100644 --- a/src/device/wg_controller.rs +++ b/src/device/wg_controller.rs @@ -366,6 +366,11 @@ async fn consolidate_wg_peers< let actual_keys: HashSet<&PublicKey> = actual_peers.keys().collect(); let delete_keys = &actual_keys - &requested_keys; let insert_keys = &requested_keys - &actual_keys; + telio_log_debug!( + "actual_keys: {}, requested_keys: {}", + actual_peers.keys().len(), + requested_peers.keys().len(), + ); let update_keys = &requested_keys & &actual_keys; for key in delete_keys { @@ -390,11 +395,6 @@ async fn consolidate_wg_peers< } } - let batcher_threshold = features - .batching - .as_ref() - .map(|b| Duration::from_secs(b.direct_connection_threshold as u64)); - for key in insert_keys { telio_log_info!("Inserting peer: {:?}", requested_peers.get(key)); let peer = requested_peers.get(key).ok_or(Error::PeerNotFound)?; @@ -405,21 +405,17 @@ async fn consolidate_wg_peers< // Add peer to session keeper if needed match (session_keeper, peer.batching_keepalive_interval) { (Some(sk), Some(keepalive_interval)) => { + telio_log_debug!("Adding to SessionKeeper"); let target = build_ping_endpoint(&ip_addresses, features.ipv6); if target.0.is_some() || target.1.is_some() { - sk.add_node( - *key, - target, - Duration::from_secs(keepalive_interval as u64), - batcher_threshold, - ) - .await?; + sk.add_node(*key, target, Duration::from_secs(keepalive_interval as u64)) + .await?; } else { telio_log_warn!("Peer {:?} has no ip address", key); } } (None, _) => telio_log_debug!("The session keeper is missing!"), - _ => (), + _ => {} } if let Some(stun) = stun_ep_provider { @@ -465,13 +461,8 @@ async fn
consolidate_wg_peers< let target = build_ping_endpoint(&requested_peer.peer.ip_addresses, features.ipv6); if target.0.is_some() || target.1.is_some() { - sk.add_node( - *key, - target, - Duration::from_secs(interval as u64), - batcher_threshold, - ) - .await?; + sk.add_node(*key, target, Duration::from_secs(interval as u64)) + .await?; } else { telio_log_warn!("Peer {:?} has no ip address", key); } @@ -548,7 +539,6 @@ async fn consolidate_wg_peers< .unwrap_or(requested_state.keepalive_periods.direct) .into(), ), - None, ) .await?; } else { @@ -767,6 +757,10 @@ async fn build_requested_peers_list< (_, pq_keys) => { let preshared_key = pq_keys.map(|pq| pq.pq_shared); + telio_log_debug!( + "Will insert with keepalive of {:?}", + persistent_keepalive_interval + ); requested_peers.insert( exit_node.public_key, RequestedPeer { @@ -2112,10 +2106,7 @@ mod tests { } } - fn then_keeper_add_node( - &mut self, - input: Vec<(PublicKey, IpAddr, Option, u32, Option)>, - ) { + fn then_keeper_add_node(&mut self, input: Vec<(PublicKey, IpAddr, Option, u32)>) { for i in input { let ip4 = { match i.1 { @@ -2141,9 +2132,8 @@ mod tests { eq(i.0), eq((Some(ip4), ip6)), eq(Duration::from_secs(i.3.into())), - eq(i.4), ) - .return_once(|_, _, _, _| Ok(())); + .return_once(|_, _, _| Ok(())); } } @@ -2268,13 +2258,7 @@ mod tests { )]); if batching { - f.then_keeper_add_node(vec![( - pub_key, - ip1, - Some(ip1v6), - proxying_keepalive_time, - Some(Duration::default()), - )]); + f.then_keeper_add_node(vec![(pub_key, ip1, Some(ip1v6), proxying_keepalive_time)]); } f.consolidate_peers().await; @@ -2333,17 +2317,7 @@ mod tests { allowed_ips, )]); - f.then_keeper_add_node(vec![( - pub_key, - ip1, - Some(ip1v6), - direct_keepalive_period, - if batching { - Some(Duration::default()) - } else { - None - }, - )]); + f.then_keeper_add_node(vec![(pub_key, ip1, Some(ip1v6), direct_keepalive_period)]); f.session_keeper .expect_get_interval() @@ -2690,17 +2664,7 @@ mod tests { 
Some(Duration::from_secs(DEFAULT_PEER_UPGRADE_WINDOW)), )]); - f.then_keeper_add_node(vec![( - pub_key, - ip1, - Some(ip1v6), - direct_keepalive_period, - if batching { - Some(Duration::default()) - } else { - None - }, - )]); + f.then_keeper_add_node(vec![(pub_key, ip1, Some(ip1v6), direct_keepalive_period)]); f.consolidate_peers().await; } @@ -2911,7 +2875,6 @@ mod tests { allowed_ips[0], None, proxying_keepalive_period, - Some(Duration::from_secs(0)), )]); } else { f.then_keeper_del_node(vec![(pub_key)]); @@ -2985,7 +2948,6 @@ mod tests { IpAddr::from(VPN_INTERNAL_IPV4), Some(IpAddr::from(VPN_INTERNAL_IPV6)), vpn_persistent_keepalive, - Some(Duration::from_secs(0)), )]); } @@ -3136,7 +3098,6 @@ mod tests { IpAddr::from([100, 64, 0, 4]), Some(IpAddr::from([0xfd74, 0x656c, 0x696f, 0, 0, 0, 0, 4])), stun_persistent_keepalive, - Some(Duration::from_secs(0)), )]); } @@ -3202,7 +3163,6 @@ mod tests { ip1, Some(ip1v6), TEST_DIRECT_PERSISTENT_KEEPALIVE_PERIOD, - None, )]); f.consolidate_peers().await;