Skip to content

Commit

Permalink
Refactor batcher and inline into SessionKeeper
Browse files Browse the repository at this point in the history
This commit does these things:
- Inlines the batching flag into SessionKeeper, removing the batcher
entirely as a separate entity, along with all of its logic (threshold,
traffic trigger).
- Emits all actions from SessionKeeper, if batching is enabled, whenever
an action is added. This works nicely as long as we can guarantee that
action intervals are a multiple of some period T, which is currently the
case: we target T=70s for proxy, direct, VPN and STUN keepalives. This
also makes the thresholds, and the whole logic around them, irrelevant.

The traffic-triggered batching implementation was doing more harm than
good: it fired on _any_ traffic, so even after alignment was achieved,
any traffic could misalign the actions again.

The feature flags are not removed in this commit, so as not to force a
version update of libtelio; they are now a no-op.

Signed-off-by: Lukas Pukenis <[email protected]>

Signed-off-by: Lukas Pukenis <[email protected]>
  • Loading branch information
LukasPukenis committed Dec 18, 2024
1 parent 07f2d21 commit 2d34b68
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 1,618 deletions.
1,292 changes: 0 additions & 1,292 deletions crates/telio-traversal/src/batcher.rs

This file was deleted.

1 change: 0 additions & 1 deletion crates/telio-traversal/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![cfg_attr(docsrs, feature(doc_cfg))]

pub mod batcher;
pub mod connectivity_check;
pub mod endpoint_providers;
pub mod endpoint_state;
Expand Down
251 changes: 50 additions & 201 deletions crates/telio-traversal/src/session_keeper.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::batcher::{Batcher, BatcherTrait, BatchingOptions};
use async_trait::async_trait;
use socket2::Type;
use std::future::Future;
Expand All @@ -10,18 +9,11 @@ use surge_ping::{
SurgeError, ICMP,
};
use telio_crypto::PublicKey;
use telio_model::features::FeatureBatching;
use telio_sockets::SocketPool;
use telio_task::{task_exec, BoxAction, Runtime, Task};
use telio_utils::{
dual_target, repeated_actions, telio_log_debug, telio_log_warn, DualTarget, RepeatedActions,
};
use tokio::sync::watch;
use tokio::time::Instant;

use futures::future::{pending, BoxFuture};
use futures::FutureExt;

const PING_PAYLOAD_SIZE: usize = 56;

/// Possible [SessionKeeper] errors.
Expand Down Expand Up @@ -57,28 +49,19 @@ pub trait SessionKeeperTrait {
public_key: PublicKey,
target: dual_target::Target,
interval: Duration,
threshold: Option<Duration>,
) -> Result<()>;
async fn remove_node(&self, key: &PublicKey) -> Result<()>;
async fn get_interval(&self, key: &PublicKey) -> Option<u32>;
}

pub struct SessionKeeper {
batch_all: bool,
task: Task<State>,
}

impl SessionKeeper {
pub fn start(
sock_pool: Arc<SocketPool>,
batching_feature: FeatureBatching,
network_activity: Option<watch::Receiver<Instant>>,

#[cfg(test)] batcher: Box<dyn BatcherTrait<PublicKey, State>>,
) -> Result<Self> {
telio_log_debug!(
"Starting SessionKeeper with network subscriber: {}",
network_activity.is_some()
);
pub fn start(sock_pool: Arc<SocketPool>, batch_all: bool) -> Result<Self> {
telio_log_debug!("Starting with batch_all({})", batch_all);
let (client_v4, client_v6) = (
PingerClient::new(&Self::make_builder(ICMP::V4).build())
.map_err(|e| Error::PingerCreationError(ICMP::V4, e))?,
Expand All @@ -90,21 +73,14 @@ impl SessionKeeper {
sock_pool.make_internal(client_v6.get_socket().get_native_sock())?;

Ok(Self {
batch_all,
task: Task::start(State {
pingers: Pingers {
pinger_client_v4: client_v4,
pinger_client_v6: client_v6,
},

#[cfg(test)]
batched_actions: batcher,

#[cfg(not(test))]
batched_actions: Box::new(Batcher::new(batching_feature.into())),

nonbatched_actions: RepeatedActions::default(),

network_activity,
actions: RepeatedActions::default(),
}),
})
}
Expand Down Expand Up @@ -194,113 +170,83 @@ impl SessionKeeperTrait for SessionKeeper {
public_key: PublicKey,
target: dual_target::Target,
interval: Duration,
threshold: Option<Duration>,
) -> Result<()> {
let dual_target = DualTarget::new(target).map_err(Error::DualTargetError)?;
match threshold {
Some(t) => task_exec!(&self.task, async move |s| {
s.batched_actions.add(
public_key,
interval,
t,
Arc::new(move |c: &mut State| {
Box::pin(async move {
telio_log_debug!("Batch-Pinging: {:?}", public_key);
if let Err(e) = ping(&c.pingers, (&public_key, &dual_target)).await {
telio_log_warn!(
"Failed to batch-ping, peer with key: {:?}, error: {:?}",
public_key,
e
);
}
Ok(())
})
}),
);

Ok(())
})
.await
.map_err(Error::Task)?,

None => task_exec!(&self.task, async move |s| {
if s.nonbatched_actions.contains_action(&public_key) {
let _ = s.nonbatched_actions.remove_action(&public_key);
}
let batch_all = self.batch_all;
telio_log_debug!(
"Add action for {} and interval {:?}. batch_all({})",
public_key,
interval,
batch_all
);

Ok(s.nonbatched_actions.add_action(
public_key,
interval,
Arc::new(move |c| {
Box::pin(async move {
if let Err(e) = ping(&c.pingers, (&public_key, &dual_target)).await {
telio_log_warn!(
"Failed to ping, peer with key: {:?}, error: {:?}",
public_key,
e
);
}
Ok(())
})
}),
))
})
.await
.map_err(Error::Task)?
.map_err(Error::RepeatedActionError)
.map(|_| ())?,
}
task_exec!(&self.task, async move |s| {
if s.actions.contains_action(&public_key) {
let _ = s.actions.remove_action(&public_key);
}

let res = s.actions.add_action(
public_key,
interval,
Arc::new(move |c| {
Box::pin(async move {
if let Err(e) = ping(&c.pingers, (&public_key, &dual_target)).await {
telio_log_warn!(
"Failed to ping, peer with key: {:?}, error: {:?}",
public_key,
e
);
}
Ok(())
})
}),
);

if batch_all {
s.actions.set_all_immediate();
}

Ok(res)
})
.await
.map_err(Error::Task)?
.map_err(Error::RepeatedActionError)
.map(|_| ())?;

telio_log_debug!("Added {}", public_key);
Ok(())
}

async fn remove_node(&self, key: &PublicKey) -> Result<()> {
let pk = *key;
task_exec!(&self.task, async move |s| {
let _ = s.nonbatched_actions.remove_action(&pk);
let _ = s.batched_actions.remove(&pk);
let _ = s.actions.remove_action(&pk);
Ok(())
})
.await?;

Ok(())
}

// TODO: SK calls batched and nonbatched actions interchangeably; however,
// call sites in general should be aware of which one to call
async fn get_interval(&self, key: &PublicKey) -> Option<u32> {
let pk = *key;
task_exec!(&self.task, async move |s| {
if let Some(interval) = s.batched_actions.get_interval(&pk) {
Ok(Some(interval.as_secs() as u32))
} else {
Ok(s.nonbatched_actions.get_interval(&pk))
}
Ok(s.actions.get_interval(&pk))
})
.await
.unwrap_or(None)
}
}

/// Converts the `FeatureBatching` feature-flag configuration into the
/// batcher's runtime [`BatchingOptions`].
///
/// Both fields arrive as integer second counts and are widened via
/// `.into()` before being wrapped in a [`Duration`].
impl From<FeatureBatching> for BatchingOptions {
fn from(f: FeatureBatching) -> Self {
Self {
// Window during which a traffic trigger remains effective
// (seconds, per `Duration::from_secs`).
trigger_effective_duration: Duration::from_secs(f.trigger_effective_duration.into()),
// Cooldown between consecutive traffic triggers
// (seconds, per `Duration::from_secs`).
trigger_cooldown_duration: Duration::from_secs(f.trigger_cooldown_duration.into()),
}
}
}

/// Pair of ICMP ping clients, one per IP family, used by the keepalive
/// actions to ping peers (see `ping` call sites in this file).
struct Pingers {
/// Client used for ICMPv4 (`ICMP::V4`) pings.
pinger_client_v4: PingerClient,
/// Client used for ICMPv6 (`ICMP::V6`) pings.
pinger_client_v6: PingerClient,
}

struct State {
pingers: Pingers,
batched_actions: Box<dyn BatcherTrait<PublicKey, Self>>,
nonbatched_actions: RepeatedActions<PublicKey, Self, Result<()>>,
network_activity: Option<watch::Receiver<Instant>>,
actions: RepeatedActions<PublicKey, Self, Result<()>>,
}

#[async_trait]
Expand All @@ -312,26 +258,8 @@ impl Runtime for State {
where
F: Future<Output = BoxAction<Self, std::result::Result<(), Self::Err>>> + Send,
{
let last_network_activity = self
.network_activity
.as_mut()
.map(|receiver| *receiver.borrow_and_update());

let network_change_fut: BoxFuture<
'_,
std::result::Result<(), telio_utils::sync::watch::error::RecvError>,
> = {
match self.network_activity {
Some(ref mut na) => na.changed().boxed(),
None => pending::<()>().map(|_| Ok(())).boxed(),
}
};

tokio::select! {
_ = network_change_fut => {
return Ok(());
}
Ok((pk, action)) = self.nonbatched_actions.select_action() => {
Ok((pk, action)) = self.actions.select_action() => {
let pk = *pk;
action(self)
.await
Expand All @@ -340,15 +268,6 @@ impl Runtime for State {
Ok(())
}, |_| Ok(()))?;
}
Ok(batched_actions) = self.batched_actions.get_actions(last_network_activity) => {
for (pk, action) in batched_actions {
action(self).await.map_or_else(|e| {
telio_log_warn!("({}) Error sending batch-keepalive to {}: {:?}", Self::NAME, pk, e);
Ok(())
}, |_| Ok(()))?;
}
}

update = update => {
return update(self).await;
}
Expand All @@ -364,7 +283,6 @@ impl Runtime for State {
#[cfg(test)]
mod tests {
use super::*;
use crate::batcher::{BatcherError, MockBatcherTrait};
use std::net::{Ipv4Addr, Ipv6Addr};
use telio_crypto::PublicKey;
use telio_sockets::NativeProtector;
Expand All @@ -383,13 +301,7 @@ mod tests {
)
.unwrap(),
));
let sess_keep = SessionKeeper::start(
socket_pool,
FeatureBatching::default(),
None,
Box::new(Batcher::new(FeatureBatching::default().into())),
)
.unwrap();
let sess_keep = SessionKeeper::start(socket_pool, false).unwrap();

let pk = "REjdn4zY2TFx2AMujoNGPffo9vDiRDXpGG4jHPtx2AY="
.parse::<PublicKey>()
Expand All @@ -400,7 +312,6 @@ mod tests {
pk,
(Some(Ipv4Addr::LOCALHOST), Some(Ipv6Addr::LOCALHOST)),
PERIOD,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -456,66 +367,4 @@ mod tests {
.await
.unwrap();
}

/// Verifies that `SessionKeeper` forwards work to an injected (mocked)
/// batcher: `add_node` with a `Some(threshold)` must call `Batcher::add`
/// with the node's key, period and threshold, and `remove_node` must call
/// `Batcher::remove` with the same key.
#[tokio::test]
async fn test_batcher_invocation() {
// Keepalive period handed to `add_node` and expected at the mock.
const PERIOD: Duration = Duration::from_secs(20);

// Batching threshold handed to `add_node` and expected at the mock.
const THRESHOLD: Duration = Duration::from_secs(10);
let socket_pool = Arc::new(SocketPool::new(
NativeProtector::new(
#[cfg(target_os = "macos")]
false,
)
.unwrap(),
));

// Mock batcher injected through the `#[cfg(test)]` constructor parameter.
let mut batcher = Box::new(MockBatcherTrait::<telio_crypto::PublicKey, State>::new());

let pk = "REjdn4zY2TFx2AMujoNGPffo9vDiRDXpGG4jHPtx2AY="
.parse::<PublicKey>()
.unwrap();

use mockall::predicate::{always, eq};
// `add_node(pk, _, PERIOD, Some(THRESHOLD))` must translate into exactly
// one `Batcher::add(pk, PERIOD, THRESHOLD, <action>)` call; the action
// closure itself is opaque, hence `always()`.
batcher
.expect_add()
.once()
.with(eq(pk), eq(PERIOD), eq(THRESHOLD), always())
.return_once(|_, _, _, _| ());
// `remove_node(&pk)` must translate into exactly one `Batcher::remove(pk)`.
batcher
.expect_remove()
.once()
.with(eq(pk))
.return_once(|_| ());

// it's hard to mock the exact return since it involves a complex type, however we
// can at least verify that the batcher's actions were queried
batcher
.expect_get_actions()
.times(..)
.returning(|_| Err(BatcherError::NoActions));

// No network-activity subscriber (`None`) — batching runs untriggered here.
let sess_keep = SessionKeeper::start(
socket_pool,
FeatureBatching::default().into(),
None,
batcher,
)
.unwrap();

sess_keep
.add_node(
pk,
(Some(Ipv4Addr::LOCALHOST), Some(Ipv6Addr::LOCALHOST)),
PERIOD,
Some(THRESHOLD),
)
.await
.unwrap();

sess_keep.remove_node(&pk).await.unwrap();

// courtesy wait to be sure the runtime polls everything
sess_keep.stop().await;
}
}
Loading

0 comments on commit 2d34b68

Please sign in to comment.