Skip to content

Commit d99a3f8

Browse files
committed
Wait for FileEventTx::pause() calls to finish in the server connection loop
Signed-off-by: Mateusz Szczygieł <[email protected]>
1 parent fb4b359 commit d99a3f8

File tree

5 files changed

+59
-27
lines changed

5 files changed

+59
-27
lines changed

drop-transfer/src/ws/server/handler.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ pub trait HandlerLoop {
6060
async fn on_bin_msg(&mut self, ws: &mut WebSocket, bytes: Vec<u8>) -> anyhow::Result<()>;
6161

6262
async fn finalize_success(self);
63+
async fn finalize_failure(self);
6364
}
6465

6566
pub trait Request {

drop-transfer/src/ws/server/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,7 @@ impl RunContext<'_> {
564564
"WS connection broke for {}: {err:?}",
565565
xfer.id()
566566
);
567+
handler.finalize_failure().await;
567568
} else {
568569
info!(self.logger, "Sucesfully finalizing transfer loop");
569570
handler.finalize_success().await;

drop-transfer/src/ws/server/v2.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,18 @@ impl<const PING: bool> HandlerLoop<'_, PING> {
226226
self.stop_task(&file_id, Status::FileRejected).await;
227227
}
228228
}
229+
230+
fn take_pause_futures(&mut self) -> impl Future<Output = ()> {
231+
let jobs = std::mem::take(&mut self.jobs);
232+
233+
async move {
234+
let tasks = jobs.into_values().map(|task| async move {
235+
task.events.pause().await;
236+
});
237+
238+
futures::future::join_all(tasks).await;
239+
}
240+
}
229241
}
230242

231243
#[async_trait::async_trait]
@@ -359,20 +371,18 @@ impl<const PING: bool> handler::HandlerLoop for HandlerLoop<'_, PING> {
359371
async fn finalize_success(mut self) {
360372
debug!(self.logger, "Finalizing");
361373
}
374+
375+
// While the destructor ensures the events are paused this function waits for
376+
// the execution to be finished
377+
async fn finalize_failure(mut self) {
378+
self.take_pause_futures().await;
379+
}
362380
}
363381

364382
impl<const PING: bool> Drop for HandlerLoop<'_, PING> {
365383
fn drop(&mut self) {
366384
debug!(self.logger, "Stopping server handler");
367-
368-
let jobs = std::mem::take(&mut self.jobs);
369-
tokio::spawn(async move {
370-
let tasks = jobs.into_values().map(|task| async move {
371-
task.events.pause().await;
372-
});
373-
374-
futures::future::join_all(tasks).await;
375-
});
385+
tokio::spawn(self.take_pause_futures());
376386
}
377387
}
378388

drop-transfer/src/ws/server/v4.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,18 @@ impl HandlerLoop<'_> {
325325
}
326326
}
327327
}
328+
329+
fn take_pause_futures(&mut self) -> impl Future<Output = ()> {
330+
let jobs = std::mem::take(&mut self.jobs);
331+
332+
async move {
333+
let tasks = jobs.into_values().map(|task| async move {
334+
task.events.pause().await;
335+
});
336+
337+
futures::future::join_all(tasks).await;
338+
}
339+
}
328340
}
329341

330342
#[async_trait::async_trait]
@@ -476,20 +488,18 @@ impl handler::HandlerLoop for HandlerLoop<'_> {
476488
.map(|tmp| (tmp.base_path, FileId::from(tmp.file_id))),
477489
);
478490
}
491+
492+
// While the destructor ensures the events are paused this function waits for
493+
// the execution to be finished
494+
async fn finalize_failure(mut self) {
495+
self.take_pause_futures().await;
496+
}
479497
}
480498

481499
impl Drop for HandlerLoop<'_> {
482500
fn drop(&mut self) {
483501
debug!(self.logger, "Stopping server handler");
484-
485-
let jobs = std::mem::take(&mut self.jobs);
486-
tokio::spawn(async move {
487-
let tasks = jobs.into_values().map(|task| async move {
488-
task.events.pause().await;
489-
});
490-
491-
futures::future::join_all(tasks).await;
492-
});
502+
tokio::spawn(self.take_pause_futures());
493503
}
494504
}
495505

drop-transfer/src/ws/server/v6.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,18 @@ impl HandlerLoop<'_> {
370370
}
371371
}
372372
}
373+
374+
fn take_pause_futures(&mut self) -> impl Future<Output = ()> {
375+
let jobs = std::mem::take(&mut self.jobs);
376+
377+
async move {
378+
let tasks = jobs.into_values().map(|task| async move {
379+
task.events.pause().await;
380+
});
381+
382+
futures::future::join_all(tasks).await;
383+
}
384+
}
373385
}
374386

375387
#[async_trait::async_trait]
@@ -535,20 +547,18 @@ impl handler::HandlerLoop for HandlerLoop<'_> {
535547
.map(|tmp| (tmp.base_path, FileId::from(tmp.file_id))),
536548
);
537549
}
550+
551+
// While the destructor ensures the events are paused this function waits for
552+
// the execution to be finished
553+
async fn finalize_failure(mut self) {
554+
self.take_pause_futures().await;
555+
}
538556
}
539557

540558
impl Drop for HandlerLoop<'_> {
541559
fn drop(&mut self) {
542560
debug!(self.logger, "Stopping server handler");
543-
544-
let jobs = std::mem::take(&mut self.jobs);
545-
tokio::spawn(async move {
546-
let tasks = jobs.into_values().map(|task| async move {
547-
task.events.pause().await;
548-
});
549-
550-
futures::future::join_all(tasks).await;
551-
});
561+
tokio::spawn(self.take_pause_futures());
552562
}
553563
}
554564

0 commit comments

Comments
 (0)