Skip to content

Commit 77cca74

Browse files
committed
Refactor batcher and inline into SessionKeeper
This commit does the following: - Inlines the batching flag into SessionKeeper, removing the batcher entirely as a separate entity along with all of its logic (threshold, traffic trigger). - Emits all actions from SessionKeeper whenever an action is added, if batching is enabled. This works nicely as long as we can guarantee that action intervals are a multiple of some T, which is currently the case, since we target T=70s for proxy, direct, vpn and stun keepalives. This also makes thresholds and the whole logic around them irrelevant. The traffic-triggered batching implementation was doing more harm than good: it triggers on _any_ traffic, so even once alignment was achieved, any traffic could misalign the actions again. Feature flags are not removed in this commit, so as not to force a version update of libtelio; they are now no-ops. Signed-off-by: Lukas Pukenis <[email protected]>
1 parent 07f2d21 commit 77cca74

File tree

9 files changed

+267
-1665
lines changed

9 files changed

+267
-1665
lines changed

crates/telio-traversal/src/batcher.rs

Lines changed: 0 additions & 1292 deletions
This file was deleted.

crates/telio-traversal/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
#![cfg_attr(docsrs, feature(doc_cfg))]
22

3-
pub mod batcher;
43
pub mod connectivity_check;
54
pub mod endpoint_providers;
65
pub mod endpoint_state;
Lines changed: 50 additions & 201 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::batcher::{Batcher, BatcherTrait, BatchingOptions};
21
use async_trait::async_trait;
32
use socket2::Type;
43
use std::future::Future;
@@ -10,18 +9,11 @@ use surge_ping::{
109
SurgeError, ICMP,
1110
};
1211
use telio_crypto::PublicKey;
13-
use telio_model::features::FeatureBatching;
1412
use telio_sockets::SocketPool;
1513
use telio_task::{task_exec, BoxAction, Runtime, Task};
1614
use telio_utils::{
1715
dual_target, repeated_actions, telio_log_debug, telio_log_warn, DualTarget, RepeatedActions,
1816
};
19-
use tokio::sync::watch;
20-
use tokio::time::Instant;
21-
22-
use futures::future::{pending, BoxFuture};
23-
use futures::FutureExt;
24-
2517
const PING_PAYLOAD_SIZE: usize = 56;
2618

2719
/// Possible [SessionKeeper] errors.
@@ -57,28 +49,19 @@ pub trait SessionKeeperTrait {
5749
public_key: PublicKey,
5850
target: dual_target::Target,
5951
interval: Duration,
60-
threshold: Option<Duration>,
6152
) -> Result<()>;
6253
async fn remove_node(&self, key: &PublicKey) -> Result<()>;
6354
async fn get_interval(&self, key: &PublicKey) -> Option<u32>;
6455
}
6556

6657
pub struct SessionKeeper {
58+
batch_all: bool,
6759
task: Task<State>,
6860
}
6961

7062
impl SessionKeeper {
71-
pub fn start(
72-
sock_pool: Arc<SocketPool>,
73-
batching_feature: FeatureBatching,
74-
network_activity: Option<watch::Receiver<Instant>>,
75-
76-
#[cfg(test)] batcher: Box<dyn BatcherTrait<PublicKey, State>>,
77-
) -> Result<Self> {
78-
telio_log_debug!(
79-
"Starting SessionKeeper with network subscriber: {}",
80-
network_activity.is_some()
81-
);
63+
pub fn start(sock_pool: Arc<SocketPool>, batch_all: bool) -> Result<Self> {
64+
telio_log_debug!("Starting with batch_all({})", batch_all);
8265
let (client_v4, client_v6) = (
8366
PingerClient::new(&Self::make_builder(ICMP::V4).build())
8467
.map_err(|e| Error::PingerCreationError(ICMP::V4, e))?,
@@ -90,21 +73,14 @@ impl SessionKeeper {
9073
sock_pool.make_internal(client_v6.get_socket().get_native_sock())?;
9174

9275
Ok(Self {
76+
batch_all,
9377
task: Task::start(State {
9478
pingers: Pingers {
9579
pinger_client_v4: client_v4,
9680
pinger_client_v6: client_v6,
9781
},
9882

99-
#[cfg(test)]
100-
batched_actions: batcher,
101-
102-
#[cfg(not(test))]
103-
batched_actions: Box::new(Batcher::new(batching_feature.into())),
104-
105-
nonbatched_actions: RepeatedActions::default(),
106-
107-
network_activity,
83+
actions: RepeatedActions::default(),
10884
}),
10985
})
11086
}
@@ -194,113 +170,83 @@ impl SessionKeeperTrait for SessionKeeper {
194170
public_key: PublicKey,
195171
target: dual_target::Target,
196172
interval: Duration,
197-
threshold: Option<Duration>,
198173
) -> Result<()> {
199174
let dual_target = DualTarget::new(target).map_err(Error::DualTargetError)?;
200-
match threshold {
201-
Some(t) => task_exec!(&self.task, async move |s| {
202-
s.batched_actions.add(
203-
public_key,
204-
interval,
205-
t,
206-
Arc::new(move |c: &mut State| {
207-
Box::pin(async move {
208-
telio_log_debug!("Batch-Pinging: {:?}", public_key);
209-
if let Err(e) = ping(&c.pingers, (&public_key, &dual_target)).await {
210-
telio_log_warn!(
211-
"Failed to batch-ping, peer with key: {:?}, error: {:?}",
212-
public_key,
213-
e
214-
);
215-
}
216-
Ok(())
217-
})
218-
}),
219-
);
220-
221-
Ok(())
222-
})
223-
.await
224-
.map_err(Error::Task)?,
225175

226-
None => task_exec!(&self.task, async move |s| {
227-
if s.nonbatched_actions.contains_action(&public_key) {
228-
let _ = s.nonbatched_actions.remove_action(&public_key);
229-
}
176+
let batch_all = self.batch_all;
177+
telio_log_debug!(
178+
"Add action for {} and interval {:?}. batch_all({})",
179+
public_key,
180+
interval,
181+
batch_all
182+
);
230183

231-
Ok(s.nonbatched_actions.add_action(
232-
public_key,
233-
interval,
234-
Arc::new(move |c| {
235-
Box::pin(async move {
236-
if let Err(e) = ping(&c.pingers, (&public_key, &dual_target)).await {
237-
telio_log_warn!(
238-
"Failed to ping, peer with key: {:?}, error: {:?}",
239-
public_key,
240-
e
241-
);
242-
}
243-
Ok(())
244-
})
245-
}),
246-
))
247-
})
248-
.await
249-
.map_err(Error::Task)?
250-
.map_err(Error::RepeatedActionError)
251-
.map(|_| ())?,
252-
}
184+
task_exec!(&self.task, async move |s| {
185+
if s.actions.contains_action(&public_key) {
186+
let _ = s.actions.remove_action(&public_key);
187+
}
253188

189+
let res = s.actions.add_action(
190+
public_key,
191+
interval,
192+
Arc::new(move |c| {
193+
Box::pin(async move {
194+
if let Err(e) = ping(&c.pingers, (&public_key, &dual_target)).await {
195+
telio_log_warn!(
196+
"Failed to ping, peer with key: {:?}, error: {:?}",
197+
public_key,
198+
e
199+
);
200+
}
201+
Ok(())
202+
})
203+
}),
204+
);
205+
206+
if batch_all {
207+
s.actions.set_all_immediate();
208+
}
209+
210+
Ok(res)
211+
})
212+
.await
213+
.map_err(Error::Task)?
214+
.map_err(Error::RepeatedActionError)
215+
.map(|_| ())?;
216+
217+
telio_log_debug!("Added {}", public_key);
254218
Ok(())
255219
}
256220

257221
async fn remove_node(&self, key: &PublicKey) -> Result<()> {
258222
let pk = *key;
259223
task_exec!(&self.task, async move |s| {
260-
let _ = s.nonbatched_actions.remove_action(&pk);
261-
let _ = s.batched_actions.remove(&pk);
224+
let _ = s.actions.remove_action(&pk);
262225
Ok(())
263226
})
264227
.await?;
265228

266229
Ok(())
267230
}
268231

269-
// TODO: SK calls batched and nonbatched actions interchangibly, however call sites in general
270-
// should be aware which one to call
271232
async fn get_interval(&self, key: &PublicKey) -> Option<u32> {
272233
let pk = *key;
273234
task_exec!(&self.task, async move |s| {
274-
if let Some(interval) = s.batched_actions.get_interval(&pk) {
275-
Ok(Some(interval.as_secs() as u32))
276-
} else {
277-
Ok(s.nonbatched_actions.get_interval(&pk))
278-
}
235+
Ok(s.actions.get_interval(&pk))
279236
})
280237
.await
281238
.unwrap_or(None)
282239
}
283240
}
284241

285-
impl From<FeatureBatching> for BatchingOptions {
286-
fn from(f: FeatureBatching) -> Self {
287-
Self {
288-
trigger_effective_duration: Duration::from_secs(f.trigger_effective_duration.into()),
289-
trigger_cooldown_duration: Duration::from_secs(f.trigger_cooldown_duration.into()),
290-
}
291-
}
292-
}
293-
294242
struct Pingers {
295243
pinger_client_v4: PingerClient,
296244
pinger_client_v6: PingerClient,
297245
}
298246

299247
struct State {
300248
pingers: Pingers,
301-
batched_actions: Box<dyn BatcherTrait<PublicKey, Self>>,
302-
nonbatched_actions: RepeatedActions<PublicKey, Self, Result<()>>,
303-
network_activity: Option<watch::Receiver<Instant>>,
249+
actions: RepeatedActions<PublicKey, Self, Result<()>>,
304250
}
305251

306252
#[async_trait]
@@ -312,26 +258,8 @@ impl Runtime for State {
312258
where
313259
F: Future<Output = BoxAction<Self, std::result::Result<(), Self::Err>>> + Send,
314260
{
315-
let last_network_activity = self
316-
.network_activity
317-
.as_mut()
318-
.map(|receiver| *receiver.borrow_and_update());
319-
320-
let network_change_fut: BoxFuture<
321-
'_,
322-
std::result::Result<(), telio_utils::sync::watch::error::RecvError>,
323-
> = {
324-
match self.network_activity {
325-
Some(ref mut na) => na.changed().boxed(),
326-
None => pending::<()>().map(|_| Ok(())).boxed(),
327-
}
328-
};
329-
330261
tokio::select! {
331-
_ = network_change_fut => {
332-
return Ok(());
333-
}
334-
Ok((pk, action)) = self.nonbatched_actions.select_action() => {
262+
Ok((pk, action)) = self.actions.select_action() => {
335263
let pk = *pk;
336264
action(self)
337265
.await
@@ -340,15 +268,6 @@ impl Runtime for State {
340268
Ok(())
341269
}, |_| Ok(()))?;
342270
}
343-
Ok(batched_actions) = self.batched_actions.get_actions(last_network_activity) => {
344-
for (pk, action) in batched_actions {
345-
action(self).await.map_or_else(|e| {
346-
telio_log_warn!("({}) Error sending batch-keepalive to {}: {:?}", Self::NAME, pk, e);
347-
Ok(())
348-
}, |_| Ok(()))?;
349-
}
350-
}
351-
352271
update = update => {
353272
return update(self).await;
354273
}
@@ -364,7 +283,6 @@ impl Runtime for State {
364283
#[cfg(test)]
365284
mod tests {
366285
use super::*;
367-
use crate::batcher::{BatcherError, MockBatcherTrait};
368286
use std::net::{Ipv4Addr, Ipv6Addr};
369287
use telio_crypto::PublicKey;
370288
use telio_sockets::NativeProtector;
@@ -383,13 +301,7 @@ mod tests {
383301
)
384302
.unwrap(),
385303
));
386-
let sess_keep = SessionKeeper::start(
387-
socket_pool,
388-
FeatureBatching::default(),
389-
None,
390-
Box::new(Batcher::new(FeatureBatching::default().into())),
391-
)
392-
.unwrap();
304+
let sess_keep = SessionKeeper::start(socket_pool, false).unwrap();
393305

394306
let pk = "REjdn4zY2TFx2AMujoNGPffo9vDiRDXpGG4jHPtx2AY="
395307
.parse::<PublicKey>()
@@ -400,7 +312,6 @@ mod tests {
400312
pk,
401313
(Some(Ipv4Addr::LOCALHOST), Some(Ipv6Addr::LOCALHOST)),
402314
PERIOD,
403-
None,
404315
)
405316
.await
406317
.unwrap();
@@ -456,66 +367,4 @@ mod tests {
456367
.await
457368
.unwrap();
458369
}
459-
460-
#[tokio::test]
461-
async fn test_batcher_invocation() {
462-
const PERIOD: Duration = Duration::from_secs(20);
463-
464-
const THRESHOLD: Duration = Duration::from_secs(10);
465-
let socket_pool = Arc::new(SocketPool::new(
466-
NativeProtector::new(
467-
#[cfg(target_os = "macos")]
468-
false,
469-
)
470-
.unwrap(),
471-
));
472-
473-
let mut batcher = Box::new(MockBatcherTrait::<telio_crypto::PublicKey, State>::new());
474-
475-
let pk = "REjdn4zY2TFx2AMujoNGPffo9vDiRDXpGG4jHPtx2AY="
476-
.parse::<PublicKey>()
477-
.unwrap();
478-
479-
use mockall::predicate::{always, eq};
480-
batcher
481-
.expect_add()
482-
.once()
483-
.with(eq(pk), eq(PERIOD), eq(THRESHOLD), always())
484-
.return_once(|_, _, _, _| ());
485-
batcher
486-
.expect_remove()
487-
.once()
488-
.with(eq(pk))
489-
.return_once(|_| ());
490-
491-
// it's hard to mock the exact return since it involves a complex type, however we
492-
// can at least verify that the batcher's actions were queried
493-
batcher
494-
.expect_get_actions()
495-
.times(..)
496-
.returning(|_| Err(BatcherError::NoActions));
497-
498-
let sess_keep = SessionKeeper::start(
499-
socket_pool,
500-
FeatureBatching::default().into(),
501-
None,
502-
batcher,
503-
)
504-
.unwrap();
505-
506-
sess_keep
507-
.add_node(
508-
pk,
509-
(Some(Ipv4Addr::LOCALHOST), Some(Ipv6Addr::LOCALHOST)),
510-
PERIOD,
511-
Some(THRESHOLD),
512-
)
513-
.await
514-
.unwrap();
515-
516-
sess_keep.remove_node(&pk).await.unwrap();
517-
518-
// courtesy wait to be sure the runtime polls everything
519-
sess_keep.stop().await;
520-
}
521370
}

0 commit comments

Comments
 (0)