Skip to content

Commit

Permalink
Merge pull request #337 from NordSecurity/msz/FILE-534-implement-auto…
Browse files Browse the repository at this point in the history
…matic-transfer-cancellation

Implement automatic transfer cancelation
  • Loading branch information
matszczygiel authored Jul 15, 2024
2 parents ffd54c9 + c6deb3f commit 630126f
Show file tree
Hide file tree
Showing 12 changed files with 610 additions and 541 deletions.
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
---
* Remove support for deprecated, unsecure protocols V1, V2, V4 and V5
* Sanitize file ID before downloading a file, to prevent the temporary file from being allowed to traverse parent dirs
* Implement automatic transfer cancellation when all files reach the terminal state

---
<br>
Expand Down
31 changes: 10 additions & 21 deletions drop-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,30 +377,19 @@ impl Storage {
}
}

pub async fn update_incoming_file_sync_states(
&self,
transfer_id: Uuid,
file_id: &str,
local: sync::FileState,
) {
let task = async {
let conn = self.conn.lock().await;
sync::incoming_file_set_local_state(&conn, transfer_id, file_id, local)?;
Ok::<(), Error>(())
};

if let Err(e) = task.await {
pub async fn stop_incoming_file(&self, transfer_id: Uuid, file_id: &str) -> Option<()> {
let conn = self.conn.lock().await;

if let Err(e) = sync::incoming_file_set_local_state(
&conn,
transfer_id,
file_id,
sync::FileState::Terminal,
) {
error!(self.logger, "Failed to update incoming file sync states"; "error" => %e);
}
}

pub async fn stop_incoming_file(&self, transfer_id: Uuid, file_id: &str) -> Option<()> {
let task = async {
let conn = self.conn.lock().await;
sync::stop_incoming_file(&conn, transfer_id, file_id)
};

match task.await {
match sync::stop_incoming_file(&conn, transfer_id, file_id) {
Ok(state) => state,
Err(e) => {
error!(self.logger, "Failed to stop incoming file sync state"; "error" => %e);
Expand Down
89 changes: 7 additions & 82 deletions drop-transfer/examples/udrop.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{
collections::{btree_map::Entry, BTreeMap, HashSet},
env,
io::Write,
net::IpAddr,
Expand Down Expand Up @@ -202,93 +201,19 @@ async fn listen(
) -> anyhow::Result<()> {
info!("Awaiting events…");

let mut active_file_downloads = BTreeMap::new();
let mut storage = drop_transfer::StorageDispatch::new(storage);
while let Some((ev, _)) = rx.recv().await {
storage.handle_event(&ev).await;
print_event(&ev);
match ev {
Event::RequestReceived(xfer) => {
let xfid = xfer.id();
let files = xfer.files();

if files.is_empty() {
service
.cancel_all(xfid)
.await
.context("Failed to cancled transfer")?;
}

for file in xfer.files().values() {
service
.download(xfid, file.id(), &out_dir.to_string_lossy())
.await
.context("Cannot issue download call")?;
}
}
Event::FileDownloadStarted(xfer, file, _, _) => {
active_file_downloads
.entry(xfer.id())
.or_insert_with(HashSet::new)
.insert(file);
}

Event::FileDownloadProgress(xfer, file, _) => {
active_file_downloads
.entry(xfer.id())
.or_insert_with(HashSet::new)
.insert(file);
}
Event::FileDownloadSuccess(xfer, info) => {
let xfid = xfer.id();
if let Entry::Occupied(mut occ) = active_file_downloads.entry(xfer.id()) {
occ.get_mut().remove(&info.id);
if occ.get().is_empty() {
service
.cancel_all(xfid)
.await
.context("Failed to cancled transfer")?;
occ.remove_entry();
}
}
}
Event::FileDownloadFailed(xfer, file, _) => {
let xfid = xfer.id();

if let Entry::Occupied(mut occ) = active_file_downloads.entry(xfer.id()) {
occ.get_mut().remove(&file);
if occ.get().is_empty() {
service
.cancel_all(xfid)
.await
.context("Failed to cancled transfer")?;
occ.remove_entry();
}
}
}
Event::FileDownloadRejected {
transfer_id,
file_id,
..
} => {
if let Entry::Occupied(mut occ) = active_file_downloads.entry(transfer_id) {
occ.get_mut().remove(&file_id);
if occ.get().is_empty() {
service
.cancel_all(transfer_id)
.await
.context("Failed to cancled transfer")?;
occ.remove_entry();
}
}
}
Event::IncomingTransferCanceled(xfer, _) => {
active_file_downloads.remove(&xfer.id());
}
Event::OutgoingTransferCanceled(xfer, _) => {
active_file_downloads.remove(&xfer.id());
if let Event::RequestReceived(xfer) = ev {
let xfid = xfer.id();
for file in xfer.files().values() {
service
.download(xfid, file.id(), &out_dir.to_string_lossy())
.await
.context("Cannot issue download call")?;
}
_ => (),
}
}

Expand Down
30 changes: 14 additions & 16 deletions drop-transfer/src/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,11 @@ pub(crate) fn spawn(
utils::RetryTrigger::new(refresh_trigger, state.config.connection_retries);

let task = async {
loop {
let cf = run(&state, &xfer, &logger).await;
if cf.is_break() {
info!(logger, "Transfer {} is gone. Clearing", xfer.id());

if let Some(state) = state.transfer_manager.incoming_remove(xfer.id()).await {
state.xfer_events.cancel(true).await
}

break;
}

while run(&state, &xfer, &logger).await.is_continue() {
backoff.backoff().await;
}

info!(logger, "Transfer {} is gone. Clearing", xfer.id());
};

tokio::select! {
Expand All @@ -68,11 +59,18 @@ async fn run(state: &State, xfer: &Arc<IncomingTransfer>, logger: &Logger) -> Co
return ControlFlow::Break(());
}

ask_server(state, xfer, logger).await?;
if !ask_server_if_alive(state, xfer, logger).await {
if let Some(state) = state.transfer_manager.incoming_remove(xfer.id()).await {
state.xfer_events.cancel(true).await
}

return ControlFlow::Break(());
}

ControlFlow::Continue(())
}

async fn ask_server(state: &State, xfer: &IncomingTransfer, logger: &Logger) -> ControlFlow<()> {
async fn ask_server_if_alive(state: &State, xfer: &IncomingTransfer, logger: &Logger) -> bool {
let mut connector = hyper::client::HttpConnector::new();
connector.set_local_address(Some(state.addr));

Expand All @@ -91,7 +89,7 @@ async fn ask_server(state: &State, xfer: &IncomingTransfer, logger: &Logger) ->
)
.await
{
Ok(false) => return ControlFlow::Break(()),
Ok(false) => return false,
Ok(true) => break,
Err(RequestError::UnexpectedResponse(status)) => {
debug!(
Expand All @@ -110,7 +108,7 @@ async fn ask_server(state: &State, xfer: &IncomingTransfer, logger: &Logger) ->
}
}

ControlFlow::Continue(())
true
}

// Returns whether the transfer is alive
Expand Down
Loading

0 comments on commit 630126f

Please sign in to comment.