Skip to content

Commit

Permalink
make data transfer timeout configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
manio committed Dec 30, 2024
1 parent 3362ec7 commit 7282f4c
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 5 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ OPTIONS:
-s, --stats-interval <SECONDS>
Interval of showing data transfer statistics (0 = disabled) [default: 0]
-t, --timeout-secs <SECONDS>
Data transfer timeout [default: 5]
-u, --udc <UDC>
UDC Controller name
Expand Down
19 changes: 14 additions & 5 deletions src/io_uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>

const USB_ACCESSORY_PATH: &str = "/dev/usb_accessory";
const BUFFER_LEN: usize = 16 * 1024;
const READ_TIMEOUT: Duration = Duration::new(5, 0);
const TCP_CLIENT_TIMEOUT: Duration = Duration::new(30, 0);

// tokio_uring::fs::File and tokio_uring::net::TcpStream are using different
Expand Down Expand Up @@ -69,6 +68,7 @@ async fn copy<A: Endpoint<A>, B: Endpoint<B>>(
dbg_name_from: &'static str,
dbg_name_to: &'static str,
bytes_written: Arc<AtomicUsize>,
read_timeout: Duration,
) -> Result<()> {
let mut buf = vec![0u8; BUFFER_LEN];
loop {
Expand All @@ -77,7 +77,7 @@ async fn copy<A: Endpoint<A>, B: Endpoint<B>>(
// which `Vec<u8>` implements!
debug!("{}: before read", dbg_name_from);
let retval = from.read(buf);
let (res, buf_read) = timeout(READ_TIMEOUT, retval)
let (res, buf_read) = timeout(read_timeout, retval)
.await
.map_err(|e| -> String { format!("{} read: {}", dbg_name_from, e) })?;
// Propagate errors, see how many bytes we read
Expand All @@ -93,7 +93,7 @@ async fn copy<A: Endpoint<A>, B: Endpoint<B>>(
// into the full `Vec<u8>`
debug!("{}: before write", dbg_name_to);
let retval = to.write(buf_read.slice(..n)).submit();
let (res, buf_write) = timeout(READ_TIMEOUT, retval)
let (res, buf_write) = timeout(read_timeout, retval)
.await
.map_err(|e| -> String { format!("{} write: {}", dbg_name_to, e) })?;
let n = res?;
Expand All @@ -112,6 +112,7 @@ async fn transfer_monitor(
stats_interval: Option<Duration>,
usb_bytes_written: Arc<AtomicUsize>,
tcp_bytes_written: Arc<AtomicUsize>,
read_timeout: Duration,
) -> Result<()> {
let mut usb_bytes_out_last: usize = 0;
let mut tcp_bytes_out_last: usize = 0;
Expand Down Expand Up @@ -163,7 +164,7 @@ async fn transfer_monitor(
}

// transfer stall detection
if stall_check.elapsed() > READ_TIMEOUT {
if stall_check.elapsed() > read_timeout {
// compute delta since last check
stall_usb_bytes_last = usb_bytes_out - stall_usb_bytes_last;
stall_tcp_bytes_last = tcp_bytes_out - stall_tcp_bytes_last;
Expand Down Expand Up @@ -194,6 +195,7 @@ pub async fn io_loop(
stats_interval: Option<Duration>,
need_restart: Arc<Notify>,
tcp_start: Arc<Notify>,
read_timeout: Duration,
) -> Result<()> {
info!("{} 🛰️ Starting TCP server...", NAME);
let bind_addr = format!("0.0.0.0:{}", TCP_SERVER_PORT).parse().unwrap();
Expand Down Expand Up @@ -256,17 +258,24 @@ pub async fn io_loop(
"USB",
"TCP",
stream_bytes.clone(),
read_timeout,
));
let mut from_stream = tokio_uring::spawn(copy(
stream.clone(),
file.clone(),
"TCP",
"USB",
file_bytes.clone(),
read_timeout,
));

// Thread for monitoring transfer
let mut monitor = tokio::spawn(transfer_monitor(stats_interval, file_bytes, stream_bytes));
let mut monitor = tokio::spawn(transfer_monitor(
stats_interval,
file_bytes,
stream_bytes,
read_timeout,
));

// Stop as soon as one of them errors
let res = tokio::try_join!(
Expand Down
6 changes: 6 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ struct Args {
/// so that the phone can remain connected (used in special configurations)
#[clap(short, long)]
keepalive: bool,

/// Data transfer timeout
#[clap(short, long, value_name = "SECONDS", default_value_t = 5)]
timeout_secs: u16,
}

#[derive(Clone)]
Expand Down Expand Up @@ -247,6 +251,7 @@ fn main() {
Some(Duration::from_secs(args.stats_interval.into()))
}
};
let read_timeout = Duration::from_secs(args.timeout_secs.into());

info!(
"🛸 <b><blue>aa-proxy-rs</> is starting, build: {}, git: {}-{}",
Expand Down Expand Up @@ -283,6 +288,7 @@ fn main() {
stats_interval,
need_restart_cloned,
tcp_start_cloned,
read_timeout,
));

info!(
Expand Down

0 comments on commit 7282f4c

Please sign in to comment.