Skip to content

Commit

Permalink
Implement automatic transfer cancellation
Browse files Browse the repository at this point in the history
Signed-off-by: Mateusz Szczygieł <[email protected]>
  • Loading branch information
matszczygiel committed Jul 10, 2024
1 parent f03cfdc commit b685e1e
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 124 deletions.
89 changes: 7 additions & 82 deletions drop-transfer/examples/udrop.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{
collections::{btree_map::Entry, BTreeMap, HashSet},
env,
io::Write,
net::IpAddr,
Expand Down Expand Up @@ -202,93 +201,19 @@ async fn listen(
) -> anyhow::Result<()> {
info!("Awaiting events…");

let mut active_file_downloads = BTreeMap::new();
let mut storage = drop_transfer::StorageDispatch::new(storage);
while let Some((ev, _)) = rx.recv().await {
storage.handle_event(&ev).await;
print_event(&ev);
match ev {
Event::RequestReceived(xfer) => {
let xfid = xfer.id();
let files = xfer.files();

if files.is_empty() {
service
.cancel_all(xfid)
.await
.context("Failed to cancled transfer")?;
}

for file in xfer.files().values() {
service
.download(xfid, file.id(), &out_dir.to_string_lossy())
.await
.context("Cannot issue download call")?;
}
}
Event::FileDownloadStarted(xfer, file, _, _) => {
active_file_downloads
.entry(xfer.id())
.or_insert_with(HashSet::new)
.insert(file);
}

Event::FileDownloadProgress(xfer, file, _) => {
active_file_downloads
.entry(xfer.id())
.or_insert_with(HashSet::new)
.insert(file);
}
Event::FileDownloadSuccess(xfer, info) => {
let xfid = xfer.id();
if let Entry::Occupied(mut occ) = active_file_downloads.entry(xfer.id()) {
occ.get_mut().remove(&info.id);
if occ.get().is_empty() {
service
.cancel_all(xfid)
.await
.context("Failed to cancled transfer")?;
occ.remove_entry();
}
}
}
Event::FileDownloadFailed(xfer, file, _) => {
let xfid = xfer.id();

if let Entry::Occupied(mut occ) = active_file_downloads.entry(xfer.id()) {
occ.get_mut().remove(&file);
if occ.get().is_empty() {
service
.cancel_all(xfid)
.await
.context("Failed to cancled transfer")?;
occ.remove_entry();
}
}
}
Event::FileDownloadRejected {
transfer_id,
file_id,
..
} => {
if let Entry::Occupied(mut occ) = active_file_downloads.entry(transfer_id) {
occ.get_mut().remove(&file_id);
if occ.get().is_empty() {
service
.cancel_all(transfer_id)
.await
.context("Failed to cancled transfer")?;
occ.remove_entry();
}
}
}
Event::IncomingTransferCanceled(xfer, _) => {
active_file_downloads.remove(&xfer.id());
}
Event::OutgoingTransferCanceled(xfer, _) => {
active_file_downloads.remove(&xfer.id());
if let Event::RequestReceived(xfer) = ev {
let xfid = xfer.id();
for file in xfer.files().values() {
service
.download(xfid, file.id(), &out_dir.to_string_lossy())
.await
.context("Cannot issue download call")?;
}
_ => (),
}
}

Expand Down
111 changes: 95 additions & 16 deletions drop-transfer/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,13 @@ pub struct CloseResult<T: Transfer> {
}

pub struct FinishResult<T: Transfer> {
pub xfer: Arc<T>,
pub events: Arc<FileEventTx<T>>,
pub xfer_state: FinishTrasferState<T>,
pub file_events: Arc<FileEventTx<T>>,
}

pub enum FinishTrasferState<T: Transfer> {
Canceled { events: Arc<TransferEventTx<T>> },
Alive,
}

#[derive(Debug, Clone, Copy, strum::FromRepr)]
Expand Down Expand Up @@ -350,8 +355,10 @@ impl TransferManager {
}

Ok(FinishResult {
xfer: state.xfer.clone(),
events: state.file_events(file_id)?.clone(),
xfer_state: state
.cancel_transfer_if_all_files_terminated(&self.logger, &self.storage)
.await,
file_events: state.file_events(file_id)?.clone(),
})
}

Expand All @@ -378,9 +385,13 @@ impl TransferManager {
)
.await;

let xfer_state = state
.cancel_transfer_if_all_files_terminated(&self.logger, &self.storage)
.await;

Some(FinishResult {
xfer: state.xfer.clone(),
events: state.file_events(file_id)?.clone(),
xfer_state,
file_events: state.file_events(file_id)?.clone(),
})
} else {
None
Expand Down Expand Up @@ -422,9 +433,13 @@ impl TransferManager {
};
}

let xfer_state = state
.cancel_transfer_if_all_files_terminated(&self.logger, &self.storage)
.await;

Ok(FinishResult {
xfer: state.xfer.clone(),
events: state.file_events(file_id)?.clone(),
xfer_state,
file_events: state.file_events(file_id)?.clone(),
})
}

Expand Down Expand Up @@ -452,7 +467,7 @@ impl TransferManager {
transfer_id: Uuid,
file_id: &FileId,
success: Result<(), String>,
) -> crate::Result<()> {
) -> crate::Result<FinishTrasferState<IncomingTransfer>> {
let mut lock = self.incoming.lock().await;

let state = lock
Expand Down Expand Up @@ -486,7 +501,11 @@ impl TransferManager {
};
}

Ok(())
let xfer_state = state
.cancel_transfer_if_all_files_terminated(&self.logger, &self.storage)
.await;

Ok(xfer_state)
}

pub async fn incoming_terminal_recv(
Expand All @@ -508,9 +527,13 @@ impl TransferManager {
.stop_incoming_file(transfer_id, file_id.as_ref())
.await;

let xfer_state = state
.cancel_transfer_if_all_files_terminated(&self.logger, &self.storage)
.await;

Some(FinishResult {
xfer: state.xfer.clone(),
events: state.file_events(file_id)?.clone(),
xfer_state,
file_events: state.file_events(file_id)?.clone(),
})
} else {
None
Expand Down Expand Up @@ -543,9 +566,13 @@ impl TransferManager {
)
.await;

let xfer_state = state
.cancel_transfer_if_all_files_terminated(&self.logger, &self.storage)
.await;

Ok(FinishResult {
xfer: state.xfer.clone(),
events: state.file_events(file_id)?.clone(),
xfer_state,
file_events: state.file_events(file_id)?.clone(),
})
}

Expand Down Expand Up @@ -714,6 +741,32 @@ impl OutgoingState {
.ok_or(crate::Error::BadFileId)
}

async fn cancel_transfer_if_all_files_terminated(
&mut self,
logger: &Logger,
storage: &Storage,
) -> FinishTrasferState<OutgoingTransfer> {
let all_terminated = self
.file_sync
.values()
.all(|file_state| matches!(file_state, OutgoingLocalFileState::Terminal(_)));

if all_terminated {
debug!(
logger,
"All outgoing files terminated, cancelling transfer: {}",
self.xfer.id()
);

self.cancel_transfer(logger, storage).await;
FinishTrasferState::Canceled {
events: self.xfer_events.clone(),
}
} else {
FinishTrasferState::Alive
}
}

async fn cancel_transfer(&mut self, logger: &Logger, storage: &Storage) {
storage
.update_transfer_sync_states(
Expand All @@ -724,7 +777,7 @@ impl OutgoingState {
self.xfer_sync = sync::TransferState::Canceled;

if let Some(conn) = self.conn.take() {
debug!(logger, "Pushing incoming close request");
debug!(logger, "Pushing outgoing close request");

if let Err(e) = conn.send(ClientReq::Close) {
warn!(logger, "Failed to send close request: {}", e);
Expand Down Expand Up @@ -855,6 +908,32 @@ impl IncomingState {
.ok_or(crate::Error::BadFileId)
}

async fn cancel_transfer_if_all_files_terminated(
&mut self,
logger: &Logger,
storage: &Storage,
) -> FinishTrasferState<IncomingTransfer> {
let all_terminated = self
.file_sync
.values()
.all(|file_state| matches!(file_state, IncomingLocalFileState::Terminal(_)));

if all_terminated {
debug!(
logger,
"All incoming files terminated, cancelling transfer: {}",
self.xfer.id()
);

self.cancel_transfer(logger, storage).await;
FinishTrasferState::Canceled {
events: self.xfer_events.clone(),
}
} else {
FinishTrasferState::Alive
}
}

async fn cancel_transfer(&mut self, logger: &Logger, storage: &Storage) {
storage
.update_transfer_sync_states(self.xfer.id(), sync::TransferState::Canceled)
Expand All @@ -863,7 +942,7 @@ impl IncomingState {
self.xfer_sync = sync::TransferState::Canceled;

if let Some(conn) = self.conn.take() {
debug!(logger, "Pushing outgoing close request");
debug!(logger, "Pushing incoming close request");

if let Err(e) = conn.send(ServerReq::Close) {
warn!(logger, "Failed to send close request: {}", e);
Expand Down
8 changes: 5 additions & 3 deletions drop-transfer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use uuid::Uuid;
use crate::{
auth,
error::ResultExt,
manager,
manager::{self},
tasks::AliveWaiter,
transfer::Transfer,
ws::{self, EventTxFactory},
Expand Down Expand Up @@ -226,7 +226,8 @@ impl Service {
.await
{
Ok(res) => {
res.events.rejected(false).await;
res.file_events.rejected(false).await;
super::ws::client::handle_finish_xfer_state(res.xfer_state, false).await;
return Ok(());
}
Err(crate::Error::BadTransfer) => (),
Expand Down Expand Up @@ -254,7 +255,8 @@ impl Service {
tmp_bases.into_iter().map(|base| (base, &file)),
);

res.events.rejected(false).await;
res.file_events.rejected(false).await;
super::ws::server::handle_finish_xfer_state(res.xfer_state, false).await;
return Ok(());
}
Err(crate::Error::BadTransfer) => (),
Expand Down
22 changes: 18 additions & 4 deletions drop-transfer/src/ws/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use super::OutgoingFileEventTx;
use crate::{
auth,
file::FileId,
manager::FileTerminalState,
manager::{FileTerminalState, FinishTrasferState},
protocol,
service::State,
tasks::AliveGuard,
Expand Down Expand Up @@ -584,7 +584,10 @@ async fn start_upload(
Err(err) => {
warn!(logger, "Failed to post failure {err:?}");
}
Ok(res) => res.events.failed(err).await,
Ok(res) => {
res.file_events.failed(err).await;
handle_finish_xfer_state(res.xfer_state, false).await;
}
}
uploader.error(msg).await;
}
Expand All @@ -606,7 +609,10 @@ async fn on_upload_finished(
.await
{
Err(err) => warn!(logger, "Failed to accept file as done: {err}"),
Ok(Some(res)) => res.events.success().await,
Ok(Some(res)) => {
res.file_events.success().await;
handle_finish_xfer_state(res.xfer_state, true).await;
}
Ok(None) => (),
}
}
Expand All @@ -625,12 +631,20 @@ async fn on_upload_failure(
{
Err(err) => warn!(logger, "Failed to accept failure: {err}"),
Ok(Some(res)) => {
res.events
res.file_events
.failed(crate::Error::BadTransferState(format!(
"Receiver reported an error: {msg}"
)))
.await;
handle_finish_xfer_state(res.xfer_state, true).await;
}
Ok(None) => (),
}
}

pub async fn handle_finish_xfer_state(state: FinishTrasferState<OutgoingTransfer>, by_peer: bool) {
match state {
FinishTrasferState::Canceled { events } => events.cancel(by_peer).await,
FinishTrasferState::Alive => (),
}
}
2 changes: 1 addition & 1 deletion drop-transfer/src/ws/client/v4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl HandlerLoop<'_> {
Err(err) => {
warn!(logger, "Failed to post failure {err:?}");
}
Ok(res) => res.events.failed(err).await,
Ok(res) => res.file_events.failed(err).await,
}

let msg = v4::Error {
Expand Down
Loading

0 comments on commit b685e1e

Please sign in to comment.