From fb4b3596dad1183c56ef9415016defcc7b56ad84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Szczygie=C5=82?= Date: Mon, 20 May 2024 14:39:15 +0300 Subject: [PATCH 1/3] Add test case for multiple files transfer being paused for a while on the sender side MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mateusz Szczygieł --- test/drop_test/action.py | 11 +++ test/scenarios.py | 196 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 207 insertions(+) diff --git a/test/drop_test/action.py b/test/drop_test/action.py index 641038f..647dd03 100644 --- a/test/drop_test/action.py +++ b/test/drop_test/action.py @@ -682,6 +682,17 @@ def __str__(self): return f"DrainEvents({self._count})" +class ClearEventQueue(Action): + def __init__(self): + pass + + async def run(self, drop: ffi.Drop): + _ = await drop._events.gather_all(0) + + def __str__(self): + return "ClearEventQueue()" + + class NoEvent(Action): def __init__(self, duration: int = 3): self._duration = duration diff --git a/test/scenarios.py b/test/scenarios.py index 001418a..8559bca 100644 --- a/test/scenarios.py +++ b/test/scenarios.py @@ -2664,6 +2664,202 @@ ), }, ), + Scenario( + "scenario11-4", + "Initate a transfers with multiple files. Wait for them to start and then stop the sender. Then restart sender and expect transfer to finish successfuly", + { + "DROP_PEER_REN": ActionList( + [ + action.WaitForAnotherPeer("DROP_PEER_STIMPY"), + action.Start("DROP_PEER_REN", dbpath="/tmp/db/11-4-ren.sqlite"), + action.NewTransfer( + "DROP_PEER_STIMPY", + [ + "/tmp/testfile-bulk-01", + "/tmp/testfile-bulk-02", + "/tmp/testfile-bulk-03", + "/tmp/testfile-bulk-04", + "/tmp/testfile-bulk-05", + "/tmp/testfile-bulk-06", + "/tmp/testfile-bulk-07", + "/tmp/testfile-bulk-08", + "/tmp/testfile-bulk-09", + "/tmp/testfile-bulk-10", + ], + ), + # fmt: off + action.Wait( + event.Queued(0, "DROP_PEER_STIMPY", [ + norddrop.QueuedFile(FILES["testfile-bulk-01"].id, "testfile-bulk-01", 10485760, "/tmp"), + norddrop.QueuedFile(FILES["testfile-bulk-02"].id, "testfile-bulk-02", 10485760, "/tmp"), + norddrop.QueuedFile(FILES["testfile-bulk-03"].id, "testfile-bulk-03", 10485760, "/tmp"), + norddrop.QueuedFile(FILES["testfile-bulk-04"].id, "testfile-bulk-04", 10485760, "/tmp"), + norddrop.QueuedFile(FILES["testfile-bulk-05"].id, "testfile-bulk-05", 10485760, "/tmp"), + norddrop.QueuedFile(FILES["testfile-bulk-06"].id, "testfile-bulk-06", 10485760, "/tmp"), + norddrop.QueuedFile(FILES["testfile-bulk-07"].id, "testfile-bulk-07", 10485760, "/tmp"), + norddrop.QueuedFile(FILES["testfile-bulk-08"].id, "testfile-bulk-08", 10485760, "/tmp"), + norddrop.QueuedFile(FILES["testfile-bulk-09"].id, "testfile-bulk-09", 10485760, "/tmp"), + norddrop.QueuedFile(FILES["testfile-bulk-10"].id, "testfile-bulk-10", 10485760, "/tmp"), + ]), + ), + # Wait for some of the files and the stop + action.Repeated([action.WaitForOneOf([ + event.Start(0, FILES["testfile-bulk-01"].id), + event.Start(0, FILES["testfile-bulk-02"].id), + event.Start(0, FILES["testfile-bulk-03"].id), + event.Start(0, FILES["testfile-bulk-04"].id), + event.Start(0, FILES["testfile-bulk-05"].id), + event.Start(0, FILES["testfile-bulk-06"].id), + event.Start(0, FILES["testfile-bulk-07"].id), + event.Start(0, FILES["testfile-bulk-08"].id), + event.Start(0, FILES["testfile-bulk-09"].id), + event.Start(0, FILES["testfile-bulk-10"].id), + ])], 4), + # fmt: on + action.Stop(), + # fmt: off + # Wait of paused events + action.WaitAndIgnoreExcept([ + event.Paused(0, FILES["testfile-bulk-01"].id), + event.Paused(0, FILES["testfile-bulk-02"].id), + event.Paused(0, FILES["testfile-bulk-03"].id), + event.Paused(0, FILES["testfile-bulk-04"].id), + event.Paused(0, FILES["testfile-bulk-05"].id), + event.Paused(0, FILES["testfile-bulk-06"].id), + event.Paused(0, FILES["testfile-bulk-07"].id), + event.Paused(0, FILES["testfile-bulk-08"].id), + event.Paused(0, FILES["testfile-bulk-09"].id), + event.Paused(0, FILES["testfile-bulk-10"].id), + ]), + # fmt: on + action.Sleep(2), + action.ClearEventQueue(), + action.Start("DROP_PEER_REN", dbpath="/tmp/db/11-4-ren.sqlite"), + # fmt: off + # Wait for some of the files and the stop + action.WaitRacy([ + event.Start(0, FILES["testfile-bulk-01"].id, None), + event.Start(0, FILES["testfile-bulk-02"].id, None), + event.Start(0, FILES["testfile-bulk-03"].id, None), + event.Start(0, FILES["testfile-bulk-04"].id, None), + event.Start(0, FILES["testfile-bulk-05"].id, None), + event.Start(0, FILES["testfile-bulk-06"].id, None), + event.Start(0, FILES["testfile-bulk-07"].id, None), + event.Start(0, FILES["testfile-bulk-08"].id, None), + event.Start(0, FILES["testfile-bulk-09"].id, None), + event.Start(0, FILES["testfile-bulk-10"].id, None), + event.FinishFileUploaded(0, FILES["testfile-bulk-01"].id), + event.FinishFileUploaded(0, FILES["testfile-bulk-02"].id), + event.FinishFileUploaded(0, FILES["testfile-bulk-03"].id), + event.FinishFileUploaded(0, FILES["testfile-bulk-04"].id), + event.FinishFileUploaded(0, FILES["testfile-bulk-05"].id), + event.FinishFileUploaded(0, FILES["testfile-bulk-06"].id), + event.FinishFileUploaded(0, FILES["testfile-bulk-07"].id), + event.FinishFileUploaded(0, FILES["testfile-bulk-08"].id), + event.FinishFileUploaded(0, FILES["testfile-bulk-09"].id), + event.FinishFileUploaded(0, FILES["testfile-bulk-10"].id), + ]), + # fmt: on + action.ExpectCancel([0], True), + ] + ), + "DROP_PEER_STIMPY": ActionList( + [ + action.Start("DROP_PEER_STIMPY"), + # fmt: off + action.Wait( + event.Receive(0, "DROP_PEER_REN", [ + norddrop.ReceivedFile(FILES["testfile-bulk-01"].id, "testfile-bulk-01", 10485760), + norddrop.ReceivedFile(FILES["testfile-bulk-02"].id, "testfile-bulk-02", 10485760), + norddrop.ReceivedFile(FILES["testfile-bulk-03"].id, "testfile-bulk-03", 10485760), + norddrop.ReceivedFile(FILES["testfile-bulk-04"].id, "testfile-bulk-04", 10485760), + norddrop.ReceivedFile(FILES["testfile-bulk-05"].id, "testfile-bulk-05", 10485760), + norddrop.ReceivedFile(FILES["testfile-bulk-06"].id, "testfile-bulk-06", 10485760), + norddrop.ReceivedFile(FILES["testfile-bulk-07"].id, "testfile-bulk-07", 10485760), + norddrop.ReceivedFile(FILES["testfile-bulk-08"].id, "testfile-bulk-08", 10485760), + norddrop.ReceivedFile(FILES["testfile-bulk-09"].id, "testfile-bulk-09", 10485760), + norddrop.ReceivedFile(FILES["testfile-bulk-10"].id, "testfile-bulk-10", 10485760), + ]), + ), + # fmt: on + # fmt: off + action.Download(0, FILES["testfile-bulk-01"].id, "/tmp/received/11-4"), + action.Download(0, FILES["testfile-bulk-02"].id, "/tmp/received/11-4"), + action.Download(0, FILES["testfile-bulk-03"].id, "/tmp/received/11-4"), + action.Download(0, FILES["testfile-bulk-04"].id, "/tmp/received/11-4"), + action.Download(0, FILES["testfile-bulk-05"].id, "/tmp/received/11-4"), + action.Download(0, FILES["testfile-bulk-06"].id, "/tmp/received/11-4"), + action.Download(0, FILES["testfile-bulk-07"].id, "/tmp/received/11-4"), + action.Download(0, FILES["testfile-bulk-08"].id, "/tmp/received/11-4"), + action.Download(0, FILES["testfile-bulk-09"].id, "/tmp/received/11-4"), + action.Download(0, FILES["testfile-bulk-10"].id, "/tmp/received/11-4"), + # Wait for all 10 pending events + action.Repeated([action.WaitForOneOf([ + event.Pending(0, FILES["testfile-bulk-01"].id), + event.Pending(0, FILES["testfile-bulk-02"].id), + event.Pending(0, FILES["testfile-bulk-03"].id), + event.Pending(0, FILES["testfile-bulk-04"].id), + event.Pending(0, FILES["testfile-bulk-05"].id), + event.Pending(0, FILES["testfile-bulk-06"].id), + event.Pending(0, FILES["testfile-bulk-07"].id), + event.Pending(0, FILES["testfile-bulk-08"].id), + event.Pending(0, FILES["testfile-bulk-09"].id), + event.Pending(0, FILES["testfile-bulk-10"].id), + event.Start(0, FILES["testfile-bulk-01"].id), + event.Start(0, FILES["testfile-bulk-02"].id), + event.Start(0, FILES["testfile-bulk-03"].id), + event.Start(0, FILES["testfile-bulk-04"].id), + event.Start(0, FILES["testfile-bulk-05"].id), + event.Start(0, FILES["testfile-bulk-06"].id), + event.Start(0, FILES["testfile-bulk-07"].id), + event.Start(0, FILES["testfile-bulk-08"].id), + event.Start(0, FILES["testfile-bulk-09"].id), + event.Start(0, FILES["testfile-bulk-10"].id), + ]), + ], 10), + action.WaitAndIgnoreExcept([ + event.Paused(0, FILES["testfile-bulk-01"].id), + event.Paused(0, FILES["testfile-bulk-02"].id), + event.Paused(0, FILES["testfile-bulk-03"].id), + event.Paused(0, FILES["testfile-bulk-04"].id), + event.Paused(0, FILES["testfile-bulk-05"].id), + event.Paused(0, FILES["testfile-bulk-06"].id), + event.Paused(0, FILES["testfile-bulk-07"].id), + event.Paused(0, FILES["testfile-bulk-08"].id), + event.Paused(0, FILES["testfile-bulk-09"].id), + event.Paused(0, FILES["testfile-bulk-10"].id), + ]), + # The sender is stopped + action.WaitRacy([ + event.Start(0, FILES["testfile-bulk-01"].id, None), + event.Start(0, FILES["testfile-bulk-02"].id, None), + event.Start(0, FILES["testfile-bulk-03"].id, None), + event.Start(0, FILES["testfile-bulk-04"].id, None), + event.Start(0, FILES["testfile-bulk-05"].id, None), + event.Start(0, FILES["testfile-bulk-06"].id, None), + event.Start(0, FILES["testfile-bulk-07"].id, None), + event.Start(0, FILES["testfile-bulk-08"].id, None), + event.Start(0, FILES["testfile-bulk-09"].id, None), + event.Start(0, FILES["testfile-bulk-10"].id, None), + event.FinishFileDownloaded(0, FILES["testfile-bulk-01"].id, "/tmp/received/11-4/testfile-bulk-01"), + event.FinishFileDownloaded(0, FILES["testfile-bulk-02"].id, "/tmp/received/11-4/testfile-bulk-02"), + event.FinishFileDownloaded(0, FILES["testfile-bulk-03"].id, "/tmp/received/11-4/testfile-bulk-03"), + event.FinishFileDownloaded(0, FILES["testfile-bulk-04"].id, "/tmp/received/11-4/testfile-bulk-04"), + event.FinishFileDownloaded(0, FILES["testfile-bulk-05"].id, "/tmp/received/11-4/testfile-bulk-05"), + event.FinishFileDownloaded(0, FILES["testfile-bulk-06"].id, "/tmp/received/11-4/testfile-bulk-06"), + event.FinishFileDownloaded(0, FILES["testfile-bulk-07"].id, "/tmp/received/11-4/testfile-bulk-07"), + event.FinishFileDownloaded(0, FILES["testfile-bulk-08"].id, "/tmp/received/11-4/testfile-bulk-08"), + event.FinishFileDownloaded(0, FILES["testfile-bulk-09"].id, "/tmp/received/11-4/testfile-bulk-09"), + event.FinishFileDownloaded(0, FILES["testfile-bulk-10"].id, "/tmp/received/11-4/testfile-bulk-10"), + ]), + # fmt: on + action.CancelTransferRequest([0]), + action.ExpectCancel([0], False), + action.Stop(), + ] + ), + }, + ), Scenario( "scenario12-1", "Transfer file to two clients simultaneously", From d99a3f8de743b72a285d54427f895be4f728c63f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Szczygie=C5=82?= Date: Tue, 21 May 2024 14:42:12 +0300 Subject: [PATCH 2/3] Wait for `FileEventTx::pause()` calls to finish in the server connection loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mateusz Szczygieł --- drop-transfer/src/ws/server/handler.rs | 1 + drop-transfer/src/ws/server/mod.rs | 1 + drop-transfer/src/ws/server/v2.rs | 28 +++++++++++++++++--------- drop-transfer/src/ws/server/v4.rs | 28 +++++++++++++++++--------- drop-transfer/src/ws/server/v6.rs | 28 +++++++++++++++++--------- 5 files changed, 59 insertions(+), 27 deletions(-) diff --git a/drop-transfer/src/ws/server/handler.rs b/drop-transfer/src/ws/server/handler.rs index 4445333..2e35921 100644 --- a/drop-transfer/src/ws/server/handler.rs +++ b/drop-transfer/src/ws/server/handler.rs @@ -60,6 +60,7 @@ pub trait HandlerLoop { async fn on_bin_msg(&mut self, ws: &mut WebSocket, bytes: Vec) -> anyhow::Result<()>; async fn finalize_success(self); + async fn finalize_failure(self); } pub trait Request { diff --git a/drop-transfer/src/ws/server/mod.rs b/drop-transfer/src/ws/server/mod.rs index ce0217b..769f516 100644 --- a/drop-transfer/src/ws/server/mod.rs +++ b/drop-transfer/src/ws/server/mod.rs @@ -564,6 +564,7 @@ impl RunContext<'_> { "WS connection broke for {}: {err:?}", xfer.id() ); + handler.finalize_failure().await; } else { info!(self.logger, "Sucesfully finalizing transfer loop"); handler.finalize_success().await; diff --git a/drop-transfer/src/ws/server/v2.rs b/drop-transfer/src/ws/server/v2.rs index e5b1fb8..7180347 100644 --- a/drop-transfer/src/ws/server/v2.rs +++ b/drop-transfer/src/ws/server/v2.rs @@ -226,6 +226,18 @@ impl HandlerLoop<'_, PING> { self.stop_task(&file_id, Status::FileRejected).await; } } + + fn take_pause_futures(&mut self) -> impl Future { + let jobs = std::mem::take(&mut self.jobs); + + async move { + let tasks = jobs.into_values().map(|task| async move { + task.events.pause().await; + }); + + futures::future::join_all(tasks).await; + } + } } #[async_trait::async_trait] @@ -359,20 +371,18 @@ impl handler::HandlerLoop for HandlerLoop<'_, PING> { async fn finalize_success(mut self) { debug!(self.logger, "Finalizing"); } + + // While the destructor ensures the events are paused this function waits for + // the execution to be finished + async fn finalize_failure(mut self) { + self.take_pause_futures().await; + } } impl Drop for HandlerLoop<'_, PING> { fn drop(&mut self) { debug!(self.logger, "Stopping server handler"); - - let jobs = std::mem::take(&mut self.jobs); - tokio::spawn(async move { - let tasks = jobs.into_values().map(|task| async move { - task.events.pause().await; - }); - - futures::future::join_all(tasks).await; - }); + tokio::spawn(self.take_pause_futures()); } } diff --git a/drop-transfer/src/ws/server/v4.rs b/drop-transfer/src/ws/server/v4.rs index 70941db..589f1a7 100644 --- a/drop-transfer/src/ws/server/v4.rs +++ b/drop-transfer/src/ws/server/v4.rs @@ -325,6 +325,18 @@ impl HandlerLoop<'_> { } } } + + fn take_pause_futures(&mut self) -> impl Future { + let jobs = std::mem::take(&mut self.jobs); + + async move { + let tasks = jobs.into_values().map(|task| async move { + task.events.pause().await; + }); + + futures::future::join_all(tasks).await; + } + } } #[async_trait::async_trait] @@ -476,20 +488,18 @@ impl handler::HandlerLoop for HandlerLoop<'_> { .map(|tmp| (tmp.base_path, FileId::from(tmp.file_id))), ); } + + // While the destructor ensures the events are paused this function waits for + // the execution to be finished + async fn finalize_failure(mut self) { + self.take_pause_futures().await; + } } impl Drop for HandlerLoop<'_> { fn drop(&mut self) { debug!(self.logger, "Stopping server handler"); - - let jobs = std::mem::take(&mut self.jobs); - tokio::spawn(async move { - let tasks = jobs.into_values().map(|task| async move { - task.events.pause().await; - }); - - futures::future::join_all(tasks).await; - }); + tokio::spawn(self.take_pause_futures()); } } diff --git a/drop-transfer/src/ws/server/v6.rs b/drop-transfer/src/ws/server/v6.rs index c693096..6bc4fd5 100644 --- a/drop-transfer/src/ws/server/v6.rs +++ b/drop-transfer/src/ws/server/v6.rs @@ -370,6 +370,18 @@ impl HandlerLoop<'_> { } } } + + fn take_pause_futures(&mut self) -> impl Future { + let jobs = std::mem::take(&mut self.jobs); + + async move { + let tasks = jobs.into_values().map(|task| async move { + task.events.pause().await; + }); + + futures::future::join_all(tasks).await; + } + } } #[async_trait::async_trait] @@ -535,20 +547,18 @@ impl handler::HandlerLoop for HandlerLoop<'_> { .map(|tmp| (tmp.base_path, FileId::from(tmp.file_id))), ); } + + // While the destructor ensures the events are paused this function waits for + // the execution to be finished + async fn finalize_failure(mut self) { + self.take_pause_futures().await; + } } impl Drop for HandlerLoop<'_> { fn drop(&mut self) { debug!(self.logger, "Stopping server handler"); - - let jobs = std::mem::take(&mut self.jobs); - tokio::spawn(async move { - let tasks = jobs.into_values().map(|task| async move { - task.events.pause().await; - }); - - futures::future::join_all(tasks).await; - }); + tokio::spawn(self.take_pause_futures()); } } From d7deace3eb37f68933315df4af32cfa8817be983 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Szczygie=C5=82?= Date: Tue, 21 May 2024 14:43:57 +0300 Subject: [PATCH 3/3] Update changelog MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mateusz Szczygieł --- changelog.md | 1 + 1 file changed, 1 insertion(+) diff --git a/changelog.md b/changelog.md index cf15544..7b2d622 100644 --- a/changelog.md +++ b/changelog.md @@ -6,6 +6,7 @@ This means the whole API has changed. Even though semantics are the same the mechanism is now different and requires new implementation to use properly. * Split checksum events into finalize and verify * Add `base_dir` field in the `RequestQueued` event files +* Fix rare issue of missing receiver's in-progress events ---