From b685e1e39a576104cc7244ec8d5ecf8d5e09c963 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Szczygie=C5=82?= Date: Wed, 10 Jul 2024 12:46:53 +0300 Subject: [PATCH] Implement automatic transfer cancellation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mateusz Szczygieł --- drop-transfer/examples/udrop.rs | 89 ++--------------------- drop-transfer/src/manager.rs | 111 ++++++++++++++++++++++++----- drop-transfer/src/service.rs | 8 ++- drop-transfer/src/ws/client/mod.rs | 22 ++++-- drop-transfer/src/ws/client/v4.rs | 2 +- drop-transfer/src/ws/client/v6.rs | 10 ++- drop-transfer/src/ws/server/mod.rs | 36 ++++++---- drop-transfer/src/ws/server/v2.rs | 2 +- drop-transfer/src/ws/server/v4.rs | 2 +- drop-transfer/src/ws/server/v6.rs | 7 +- 10 files changed, 165 insertions(+), 124 deletions(-) diff --git a/drop-transfer/examples/udrop.rs b/drop-transfer/examples/udrop.rs index 7f104bfc..c42df7c6 100644 --- a/drop-transfer/examples/udrop.rs +++ b/drop-transfer/examples/udrop.rs @@ -1,5 +1,4 @@ use std::{ - collections::{btree_map::Entry, BTreeMap, HashSet}, env, io::Write, net::IpAddr, @@ -202,93 +201,19 @@ async fn listen( ) -> anyhow::Result<()> { info!("Awaiting events…"); - let mut active_file_downloads = BTreeMap::new(); let mut storage = drop_transfer::StorageDispatch::new(storage); while let Some((ev, _)) = rx.recv().await { storage.handle_event(&ev).await; print_event(&ev); - match ev { - Event::RequestReceived(xfer) => { - let xfid = xfer.id(); - let files = xfer.files(); - - if files.is_empty() { - service - .cancel_all(xfid) - .await - .context("Failed to cancled transfer")?; - } - - for file in xfer.files().values() { - service - .download(xfid, file.id(), &out_dir.to_string_lossy()) - .await - .context("Cannot issue download call")?; - } - } - Event::FileDownloadStarted(xfer, file, _, _) => { - active_file_downloads - .entry(xfer.id()) - .or_insert_with(HashSet::new) - .insert(file); - } - Event::FileDownloadProgress(xfer, file, _) => { - active_file_downloads - .entry(xfer.id()) - .or_insert_with(HashSet::new) - .insert(file); - } - Event::FileDownloadSuccess(xfer, info) => { - let xfid = xfer.id(); - if let Entry::Occupied(mut occ) = active_file_downloads.entry(xfer.id()) { - occ.get_mut().remove(&info.id); - if occ.get().is_empty() { - service - .cancel_all(xfid) - .await - .context("Failed to cancled transfer")?; - occ.remove_entry(); - } - } - } - Event::FileDownloadFailed(xfer, file, _) => { - let xfid = xfer.id(); - - if let Entry::Occupied(mut occ) = active_file_downloads.entry(xfer.id()) { - occ.get_mut().remove(&file); - if occ.get().is_empty() { - service - .cancel_all(xfid) - .await - .context("Failed to cancled transfer")?; - occ.remove_entry(); - } - } - } - Event::FileDownloadRejected { - transfer_id, - file_id, - .. - } => { - if let Entry::Occupied(mut occ) = active_file_downloads.entry(transfer_id) { - occ.get_mut().remove(&file_id); - if occ.get().is_empty() { - service - .cancel_all(transfer_id) - .await - .context("Failed to cancled transfer")?; - occ.remove_entry(); - } - } - } - Event::IncomingTransferCanceled(xfer, _) => { - active_file_downloads.remove(&xfer.id()); - } - Event::OutgoingTransferCanceled(xfer, _) => { - active_file_downloads.remove(&xfer.id()); + if let Event::RequestReceived(xfer) = ev { + let xfid = xfer.id(); + for file in xfer.files().values() { + service + .download(xfid, file.id(), &out_dir.to_string_lossy()) + .await + .context("Cannot issue download call")?; } - _ => (), } } diff --git a/drop-transfer/src/manager.rs b/drop-transfer/src/manager.rs index 26822aa2..4317c6e7 100644 --- a/drop-transfer/src/manager.rs +++ b/drop-transfer/src/manager.rs @@ -35,8 +35,13 @@ pub struct CloseResult { } pub struct FinishResult { - pub xfer: Arc, - pub events: Arc>, + pub xfer_state: FinishTrasferState, + pub file_events: Arc>, +} + +pub enum FinishTrasferState { + Canceled { events: Arc> }, + Alive, } #[derive(Debug, Clone, Copy, strum::FromRepr)] @@ -350,8 +355,10 @@ impl TransferManager { } Ok(FinishResult { - xfer: state.xfer.clone(), - events: state.file_events(file_id)?.clone(), + xfer_state: state + .cancel_transfer_if_all_files_terminated(&self.logger, &self.storage) + .await, + file_events: state.file_events(file_id)?.clone(), }) } @@ -378,9 +385,13 @@ impl TransferManager { ) .await; + let xfer_state = state + .cancel_transfer_if_all_files_terminated(&self.logger, &self.storage) + .await; + Some(FinishResult { - xfer: state.xfer.clone(), - events: state.file_events(file_id)?.clone(), + xfer_state, + file_events: state.file_events(file_id)?.clone(), }) } else { None @@ -422,9 +433,13 @@ impl TransferManager { }; } + let xfer_state = state + .cancel_transfer_if_all_files_terminated(&self.logger, &self.storage) + .await; + Ok(FinishResult { - xfer: state.xfer.clone(), - events: state.file_events(file_id)?.clone(), + xfer_state, + file_events: state.file_events(file_id)?.clone(), }) } @@ -452,7 +467,7 @@ impl TransferManager { transfer_id: Uuid, file_id: &FileId, success: Result<(), String>, - ) -> crate::Result<()> { + ) -> crate::Result> { let mut lock = self.incoming.lock().await; let state = lock @@ -486,7 +501,11 @@ impl TransferManager { }; } - Ok(()) + let xfer_state = state + .cancel_transfer_if_all_files_terminated(&self.logger, &self.storage) + .await; + + Ok(xfer_state) } pub async fn incoming_terminal_recv( @@ -508,9 +527,13 @@ impl TransferManager { .stop_incoming_file(transfer_id, file_id.as_ref()) .await; + let xfer_state = state + .cancel_transfer_if_all_files_terminated(&self.logger, &self.storage) + .await; + Some(FinishResult { - xfer: state.xfer.clone(), - events: state.file_events(file_id)?.clone(), + xfer_state, + file_events: state.file_events(file_id)?.clone(), }) } else { None @@ -543,9 +566,13 @@ impl TransferManager { ) .await; + let xfer_state = state + .cancel_transfer_if_all_files_terminated(&self.logger, &self.storage) + .await; + Ok(FinishResult { - xfer: state.xfer.clone(), - events: state.file_events(file_id)?.clone(), + xfer_state, + file_events: state.file_events(file_id)?.clone(), }) } @@ -714,6 +741,32 @@ impl OutgoingState { .ok_or(crate::Error::BadFileId) } + async fn cancel_transfer_if_all_files_terminated( + &mut self, + logger: &Logger, + storage: &Storage, + ) -> FinishTrasferState { + let all_terminated = self + .file_sync + .values() + .all(|file_state| matches!(file_state, OutgoingLocalFileState::Terminal(_))); + + if all_terminated { + debug!( + logger, + "All outgoing files terminated, cancelling transfer: {}", + self.xfer.id() + ); + + self.cancel_transfer(logger, storage).await; + FinishTrasferState::Canceled { + events: self.xfer_events.clone(), + } + } else { + FinishTrasferState::Alive + } + } + async fn cancel_transfer(&mut self, logger: &Logger, storage: &Storage) { storage .update_transfer_sync_states( @@ -724,7 +777,7 @@ impl OutgoingState { self.xfer_sync = sync::TransferState::Canceled; if let Some(conn) = self.conn.take() { - debug!(logger, "Pushing incoming close request"); + debug!(logger, "Pushing outgoing close request"); if let Err(e) = conn.send(ClientReq::Close) { warn!(logger, "Failed to send close request: {}", e); @@ -855,6 +908,32 @@ impl IncomingState { .ok_or(crate::Error::BadFileId) } + async fn cancel_transfer_if_all_files_terminated( + &mut self, + logger: &Logger, + storage: &Storage, + ) -> FinishTrasferState { + let all_terminated = self + .file_sync + .values() + .all(|file_state| matches!(file_state, IncomingLocalFileState::Terminal(_))); + + if all_terminated { + debug!( + logger, + "All incoming files terminated, cancelling transfer: {}", + self.xfer.id() + ); + + self.cancel_transfer(logger, storage).await; + FinishTrasferState::Canceled { + events: self.xfer_events.clone(), + } + } else { + FinishTrasferState::Alive + } + } + async fn cancel_transfer(&mut self, logger: &Logger, storage: &Storage) { storage .update_transfer_sync_states(self.xfer.id(), sync::TransferState::Canceled) @@ -863,7 +942,7 @@ impl IncomingState { self.xfer_sync = sync::TransferState::Canceled; if let Some(conn) = self.conn.take() { - debug!(logger, "Pushing outgoing close request"); + debug!(logger, "Pushing incoming close request"); if let Err(e) = conn.send(ServerReq::Close) { warn!(logger, "Failed to send close request: {}", e); diff --git a/drop-transfer/src/service.rs b/drop-transfer/src/service.rs index eef31f02..59e59127 100644 --- a/drop-transfer/src/service.rs +++ b/drop-transfer/src/service.rs @@ -18,7 +18,7 @@ use uuid::Uuid; use crate::{ auth, error::ResultExt, - manager, + manager::{self}, tasks::AliveWaiter, transfer::Transfer, ws::{self, EventTxFactory}, @@ -226,7 +226,8 @@ impl Service { .await { Ok(res) => { - res.events.rejected(false).await; + res.file_events.rejected(false).await; + super::ws::client::handle_finish_xfer_state(res.xfer_state, false).await; return Ok(()); } Err(crate::Error::BadTransfer) => (), @@ -254,7 +255,8 @@ impl Service { tmp_bases.into_iter().map(|base| (base, &file)), ); - res.events.rejected(false).await; + res.file_events.rejected(false).await; + super::ws::server::handle_finish_xfer_state(res.xfer_state, false).await; return Ok(()); } Err(crate::Error::BadTransfer) => (), diff --git a/drop-transfer/src/ws/client/mod.rs b/drop-transfer/src/ws/client/mod.rs index b832d332..ea697b4e 100644 --- a/drop-transfer/src/ws/client/mod.rs +++ b/drop-transfer/src/ws/client/mod.rs @@ -34,7 +34,7 @@ use super::OutgoingFileEventTx; use crate::{ auth, file::FileId, - manager::FileTerminalState, + manager::{FileTerminalState, FinishTrasferState}, protocol, service::State, tasks::AliveGuard, @@ -584,7 +584,10 @@ async fn start_upload( Err(err) => { warn!(logger, "Failed to post failure {err:?}"); } - Ok(res) => res.events.failed(err).await, + Ok(res) => { + res.file_events.failed(err).await; + handle_finish_xfer_state(res.xfer_state, false).await; + } } uploader.error(msg).await; } @@ -606,7 +609,10 @@ async fn on_upload_finished( .await { Err(err) => warn!(logger, "Failed to accept file as done: {err}"), - Ok(Some(res)) => res.events.success().await, + Ok(Some(res)) => { + res.file_events.success().await; + handle_finish_xfer_state(res.xfer_state, true).await; + } Ok(None) => (), } } @@ -625,12 +631,20 @@ async fn on_upload_failure( { Err(err) => warn!(logger, "Failed to accept failure: {err}"), Ok(Some(res)) => { - res.events + res.file_events .failed(crate::Error::BadTransferState(format!( "Receiver reported an error: {msg}" ))) .await; + handle_finish_xfer_state(res.xfer_state, true).await; } Ok(None) => (), } } + +pub async fn handle_finish_xfer_state(state: FinishTrasferState, by_peer: bool) { + match state { + FinishTrasferState::Canceled { events } => events.cancel(by_peer).await, + FinishTrasferState::Alive => (), + } +} diff --git a/drop-transfer/src/ws/client/v4.rs b/drop-transfer/src/ws/client/v4.rs index 6690269c..9813923d 100644 --- a/drop-transfer/src/ws/client/v4.rs +++ b/drop-transfer/src/ws/client/v4.rs @@ -175,7 +175,7 @@ impl HandlerLoop<'_> { Err(err) => { warn!(logger, "Failed to post failure {err:?}"); } - Ok(res) => res.events.failed(err).await, + Ok(res) => res.file_events.failed(err).await, } let msg = v4::Error { diff --git a/drop-transfer/src/ws/client/v6.rs b/drop-transfer/src/ws/client/v6.rs index e2b8f554..a81ead7f 100644 --- a/drop-transfer/src/ws/client/v6.rs +++ b/drop-transfer/src/ws/client/v6.rs @@ -119,7 +119,10 @@ impl HandlerLoop<'_> { .await { Err(err) => error!(self.logger, "Failed to handler file rejection: {err}"), - Ok(Some(res)) => res.events.rejected(true).await, + Ok(Some(res)) => { + res.file_events.rejected(true).await; + super::handle_finish_xfer_state(res.xfer_state, true).await; + } Ok(None) => (), } @@ -205,7 +208,10 @@ impl HandlerLoop<'_> { Err(err) => { warn!(logger, "Failed to post failure {err:?}"); } - Ok(res) => res.events.failed(err).await, + Ok(res) => { + res.file_events.failed(err).await; + super::handle_finish_xfer_state(res.xfer_state, false).await; + } } let msg = prot::Error { diff --git a/drop-transfer/src/ws/server/mod.rs b/drop-transfer/src/ws/server/mod.rs index 539ff991..28c8cda2 100644 --- a/drop-transfer/src/ws/server/mod.rs +++ b/drop-transfer/src/ws/server/mod.rs @@ -37,6 +37,7 @@ use super::{events::FileEventTx, IncomingFileEventTx}; use crate::{ check, file::{self, FileSubPath, FileToRecv}, + manager::FinishTrasferState, protocol, quarantine::PathExt, service::State, @@ -989,20 +990,21 @@ impl FileXferTask { if let Err(e) = tokio::spawn(async move { let _guard = guard; - match result { - Err(crate::Error::Canceled) => info!(logger, "File {} stopped", self.file.id()), + let finish_res = match result { + Err(crate::Error::Canceled) => { + info!(logger, "File {} stopped", self.file.id()); + return; + } Ok(dst_location) => { info!(logger, "File {} downloaded succesfully", self.file.id()); - if let Err(err) = state + let finish_res = state .transfer_manager .incoming_finish_post(self.xfer.id(), self.file.id(), Ok(())) - .await - { - warn!(logger, "Failed to post finish: {err}"); - } + .await; events.success(dst_location).await; + finish_res } Err(err) => { info!( @@ -1011,16 +1013,19 @@ impl FileXferTask { self.file.id() ); - if let Err(err) = state + let finish_res = state .transfer_manager .incoming_finish_post(self.xfer.id(), self.file.id(), Err(err.to_string())) - .await - { - warn!(logger, "Failed to post finish: {err}"); - } + .await; events.failed(err).await; + finish_res } + }; + + match finish_res { + Ok(xfer_state) => handle_finish_xfer_state(xfer_state, false).await, + Err(err) => warn!(logger, "Failed to post finish: {err}"), } }) .await @@ -1200,6 +1205,13 @@ fn validate_subpath_for_download(subpath: &FileSubPath) -> crate::Result<()> { Ok(()) } +pub async fn handle_finish_xfer_state(state: FinishTrasferState, by_peer: bool) { + match state { + FinishTrasferState::Canceled { events } => events.cancel(by_peer).await, + FinishTrasferState::Alive => (), + } +} + #[cfg(test)] mod tests { use crate::file::FileSubPath; diff --git a/drop-transfer/src/ws/server/v2.rs b/drop-transfer/src/ws/server/v2.rs index 71803472..60663b49 100644 --- a/drop-transfer/src/ws/server/v2.rs +++ b/drop-transfer/src/ws/server/v2.rs @@ -214,7 +214,7 @@ impl HandlerLoop<'_, PING> { warn!(self.logger, "Failed to accept failure: {err}"); } Ok(Some(res)) => { - res.events + res.file_events .failed(crate::Error::BadTransferState(format!( "Sender reported an error: {msg}" ))) diff --git a/drop-transfer/src/ws/server/v4.rs b/drop-transfer/src/ws/server/v4.rs index 589f1a7a..6463788b 100644 --- a/drop-transfer/src/ws/server/v4.rs +++ b/drop-transfer/src/ws/server/v4.rs @@ -280,7 +280,7 @@ impl HandlerLoop<'_> { warn!(self.logger, "Failed to accept failure: {err}"); } Ok(Some(res)) => { - res.events + res.file_events .failed(crate::Error::BadTransferState(format!( "Sender reported an error: {msg}" ))) diff --git a/drop-transfer/src/ws/server/v6.rs b/drop-transfer/src/ws/server/v6.rs index 6bc4fd5e..fa85bdb6 100644 --- a/drop-transfer/src/ws/server/v6.rs +++ b/drop-transfer/src/ws/server/v6.rs @@ -281,7 +281,8 @@ impl HandlerLoop<'_> { tmp_bases.into_iter().map(|base| (base, &file_id)), ); - res.events.rejected(true).await; + res.file_events.rejected(true).await; + super::handle_finish_xfer_state(res.xfer_state, true).await; } Ok(None) => (), } @@ -325,11 +326,13 @@ impl HandlerLoop<'_> { warn!(self.logger, "Failed to accept failure: {err}"); } Ok(Some(res)) => { - res.events + res.file_events .failed(crate::Error::BadTransferState(format!( "Sender reported an error: {msg}" ))) .await; + + super::handle_finish_xfer_state(res.xfer_state, true).await; } Ok(None) => (), }