Skip to content

Commit

Permalink
Implement auto retries under a config feature flag
Browse files Browse the repository at this point in the history
Signed-off-by: Mateusz Szczygieł <[email protected]>
  • Loading branch information
matszczygiel committed Sep 24, 2024
1 parent d5f4cba commit 5465b2c
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 33 deletions.
59 changes: 29 additions & 30 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Add `base_dir` field to the `FilePending` event
* Update moose tracker to v13.1.0
* Adds the `transfer_intent_received` event
* Add internal retries and put under a config feature `auto_retry_interval_ms`

---
<br>
Expand Down
2 changes: 2 additions & 0 deletions drop-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct DropConfig {
// Default value is 256KB.
pub checksum_events_granularity: u64,
pub connection_retries: u32,
pub auto_retry_interval: Option<Duration>,
}

impl Default for DropConfig {
Expand All @@ -28,6 +29,7 @@ impl Default for DropConfig {
checksum_events_size_threshold: None,
checksum_events_granularity: 256 * 1024,
connection_retries: 5,
auto_retry_interval: None,
}
}
}
Expand Down
34 changes: 31 additions & 3 deletions drop-transfer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,26 @@ use std::{
net::IpAddr,
path::{Component, Path},
sync::Arc,
time::{Instant, SystemTime},
time::{Duration, Instant, SystemTime},
};

use drop_analytics::{InitEventData, Moose, TransferStateEventData};
use drop_config::DropConfig;
use drop_core::Status;
use drop_storage::Storage;
use slog::{debug, trace, Logger};
use tokio::sync::{mpsc, Semaphore};
use tokio::{
sync::{mpsc, Semaphore},
time::MissedTickBehavior,
};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

use crate::{
auth,
error::ResultExt,
manager::{self},
tasks::AliveWaiter,
tasks::{AliveGuard, AliveWaiter},
transfer::Transfer,
ws::{self, EventTxFactory},
Error, Event, FileId, TransferManager,
Expand Down Expand Up @@ -106,6 +109,10 @@ impl Service {

manager::resume(&refresh_trigger.subscribe(), &state, &logger, &guard, &stop).await;

if let Some(interval) = state.config.auto_retry_interval {
spawn_auto_retry_loop(refresh_trigger.clone(), interval, guard.clone());
}

Ok(Self {
refresh_trigger,
state,
Expand Down Expand Up @@ -335,3 +342,24 @@ fn validate_dest_path(parent_dir: &Path) -> crate::Result<()> {

Ok(())
}

fn spawn_auto_retry_loop(
trigger: tokio::sync::watch::Sender<()>,
interval: Duration,
guard: AliveGuard,
) {
let mut interval = tokio::time::interval(interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

tokio::spawn(async move {
let _guard = guard;

loop {
interval.tick().await;
if trigger.send(()).is_err() {
// the receivers are closed
break;
}
}
});
}
6 changes: 6 additions & 0 deletions norddrop/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

#[derive(Debug)]
pub struct Config {
pub dir_depth_limit: u64,
Expand All @@ -8,6 +10,7 @@ pub struct Config {
pub checksum_events_size_threshold: Option<u64>,
pub checksum_events_granularity: Option<u64>,
pub connection_retries: Option<u32>,
pub auto_retry_interval_ms: Option<u32>,
}

impl Config {
Expand All @@ -31,6 +34,7 @@ impl From<Config> for drop_config::Config {
checksum_events_size_threshold,
checksum_events_granularity,
connection_retries,
auto_retry_interval_ms,
} = val;

drop_config::Config {
Expand All @@ -43,6 +47,8 @@ impl From<Config> for drop_config::Config {
.unwrap_or(Config::default_checksum_granularity() as _),
connection_retries: connection_retries
.unwrap_or(Config::default_connection_retries()),
auto_retry_interval: auto_retry_interval_ms
.map(|ms| Duration::from_millis(ms as _)),
},
moose: drop_config::MooseConfig {
event_path: moose_event_path,
Expand Down
10 changes: 10 additions & 0 deletions norddrop/src/norddrop.udl
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ dictionary Config {

/// Limits the number of connection retries afer the `network_refresh()` call.
u32? connection_retries;

/// Enable auto retry loop inside libdrop. Conceptually this means that
/// libdrop is calling `network_refresh()` automatically with the given
/// period in milliseconds. When set to `null` the feature is disabled
/// and the application needs to call `network_refresh()` manually.
/// Note the setting `connection_retries` still applies, meaning the retry
/// is executed in burst with this number of count.
/// For example for a single retry every 5 seconds the application needs to
/// set `connection_retries = 1` and `auto_retry_interval_ms = 5000`.
u32? auto_retry_interval_ms;
};

/// Posible log levels.
Expand Down
3 changes: 3 additions & 0 deletions test/drop_test/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -974,18 +974,21 @@ def __init__(
dbpath: str = ":memory:",
checksum_events_size_threshold=2**32, # don't emit events for existing tests
checksum_events_granularity=None,
auto_retry_interval_ms=None,
):
self._addr = addr
self._dbpath = dbpath
self._checksum_events_size_threshold = checksum_events_size_threshold
self._checksum_events_granularity = checksum_events_granularity
self._auto_retry_interval_ms = auto_retry_interval_ms

async def run(self, drop: ffi.Drop):
drop.start(
peer_resolver.resolve(self._addr),
self._dbpath,
self._checksum_events_size_threshold,
self._checksum_events_granularity,
self._auto_retry_interval_ms,
)

def __str__(self):
Expand Down
2 changes: 2 additions & 0 deletions test/drop_test/ffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ def start(
dbpath: str,
checksum_events_size_threshold=None,
checksum_events_granularity=None,
auto_retry_interval_ms=None,
):
cfg = norddrop.Config(
dir_depth_limit=5,
Expand All @@ -332,6 +333,7 @@ def start(
checksum_events_size_threshold=checksum_events_size_threshold,
checksum_events_granularity=checksum_events_granularity,
connection_retries=1,
auto_retry_interval_ms=auto_retry_interval_ms,
)

self._instance.start(addr, cfg)
Expand Down
Loading

0 comments on commit 5465b2c

Please sign in to comment.