Skip to content

Commit

Permalink
Merge pull request #332 from NordSecurity/msz/FILE-601-receiver-does-…
Browse files Browse the repository at this point in the history
…not-reports-events

FILE-601 Receiver does not reports events
  • Loading branch information
matszczygiel authored May 22, 2024
2 parents e2e38a3 + d7deace commit b1e9f40
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 27 deletions.
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ This means the whole API has changed. Even though semantics are the same
the mechanism is now different and requires new implementation to use properly.
* Split checksum events into finalize and verify
* Add `base_dir` field in the `RequestQueued` event files
* Fix rare issue of missing receiver's in-progress events

---
<br>
Expand Down
1 change: 1 addition & 0 deletions drop-transfer/src/ws/server/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub trait HandlerLoop {
async fn on_bin_msg(&mut self, ws: &mut WebSocket, bytes: Vec<u8>) -> anyhow::Result<()>;

async fn finalize_success(self);
async fn finalize_failure(self);
}

pub trait Request {
Expand Down
1 change: 1 addition & 0 deletions drop-transfer/src/ws/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ impl RunContext<'_> {
"WS connection broke for {}: {err:?}",
xfer.id()
);
handler.finalize_failure().await;
} else {
info!(self.logger, "Sucesfully finalizing transfer loop");
handler.finalize_success().await;
Expand Down
28 changes: 19 additions & 9 deletions drop-transfer/src/ws/server/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,18 @@ impl<const PING: bool> HandlerLoop<'_, PING> {
self.stop_task(&file_id, Status::FileRejected).await;
}
}

fn take_pause_futures(&mut self) -> impl Future<Output = ()> {
let jobs = std::mem::take(&mut self.jobs);

async move {
let tasks = jobs.into_values().map(|task| async move {
task.events.pause().await;
});

futures::future::join_all(tasks).await;
}
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -359,20 +371,18 @@ impl<const PING: bool> handler::HandlerLoop for HandlerLoop<'_, PING> {
async fn finalize_success(mut self) {
debug!(self.logger, "Finalizing");
}

// While the destructor ensures the events are paused this function waits for
// the execution to be finished
async fn finalize_failure(mut self) {
self.take_pause_futures().await;
}
}

impl<const PING: bool> Drop for HandlerLoop<'_, PING> {
fn drop(&mut self) {
debug!(self.logger, "Stopping server handler");

let jobs = std::mem::take(&mut self.jobs);
tokio::spawn(async move {
let tasks = jobs.into_values().map(|task| async move {
task.events.pause().await;
});

futures::future::join_all(tasks).await;
});
tokio::spawn(self.take_pause_futures());
}
}

Expand Down
28 changes: 19 additions & 9 deletions drop-transfer/src/ws/server/v4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,18 @@ impl HandlerLoop<'_> {
}
}
}

fn take_pause_futures(&mut self) -> impl Future<Output = ()> {
let jobs = std::mem::take(&mut self.jobs);

async move {
let tasks = jobs.into_values().map(|task| async move {
task.events.pause().await;
});

futures::future::join_all(tasks).await;
}
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -476,20 +488,18 @@ impl handler::HandlerLoop for HandlerLoop<'_> {
.map(|tmp| (tmp.base_path, FileId::from(tmp.file_id))),
);
}

// While the destructor ensures the events are paused this function waits for
// the execution to be finished
async fn finalize_failure(mut self) {
self.take_pause_futures().await;
}
}

impl Drop for HandlerLoop<'_> {
fn drop(&mut self) {
debug!(self.logger, "Stopping server handler");

let jobs = std::mem::take(&mut self.jobs);
tokio::spawn(async move {
let tasks = jobs.into_values().map(|task| async move {
task.events.pause().await;
});

futures::future::join_all(tasks).await;
});
tokio::spawn(self.take_pause_futures());
}
}

Expand Down
28 changes: 19 additions & 9 deletions drop-transfer/src/ws/server/v6.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,18 @@ impl HandlerLoop<'_> {
}
}
}

fn take_pause_futures(&mut self) -> impl Future<Output = ()> {
let jobs = std::mem::take(&mut self.jobs);

async move {
let tasks = jobs.into_values().map(|task| async move {
task.events.pause().await;
});

futures::future::join_all(tasks).await;
}
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -535,20 +547,18 @@ impl handler::HandlerLoop for HandlerLoop<'_> {
.map(|tmp| (tmp.base_path, FileId::from(tmp.file_id))),
);
}

// While the destructor ensures the events are paused this function waits for
// the execution to be finished
async fn finalize_failure(mut self) {
self.take_pause_futures().await;
}
}

impl Drop for HandlerLoop<'_> {
fn drop(&mut self) {
debug!(self.logger, "Stopping server handler");

let jobs = std::mem::take(&mut self.jobs);
tokio::spawn(async move {
let tasks = jobs.into_values().map(|task| async move {
task.events.pause().await;
});

futures::future::join_all(tasks).await;
});
tokio::spawn(self.take_pause_futures());
}
}

Expand Down
11 changes: 11 additions & 0 deletions test/drop_test/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,17 @@ def __str__(self):
return f"DrainEvents({self._count})"


class ClearEventQueue(Action):
def __init__(self):
pass

async def run(self, drop: ffi.Drop):
_ = await drop._events.gather_all(0)

def __str__(self):
return "ClearEventQueue()"


class NoEvent(Action):
def __init__(self, duration: int = 3):
self._duration = duration
Expand Down
196 changes: 196 additions & 0 deletions test/scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -2664,6 +2664,202 @@
),
},
),
Scenario(
"scenario11-4",
"Initate a transfers with multiple files. Wait for them to start and then stop the sender. Then restart sender and expect transfer to finish successfuly",
{
"DROP_PEER_REN": ActionList(
[
action.WaitForAnotherPeer("DROP_PEER_STIMPY"),
action.Start("DROP_PEER_REN", dbpath="/tmp/db/11-4-ren.sqlite"),
action.NewTransfer(
"DROP_PEER_STIMPY",
[
"/tmp/testfile-bulk-01",
"/tmp/testfile-bulk-02",
"/tmp/testfile-bulk-03",
"/tmp/testfile-bulk-04",
"/tmp/testfile-bulk-05",
"/tmp/testfile-bulk-06",
"/tmp/testfile-bulk-07",
"/tmp/testfile-bulk-08",
"/tmp/testfile-bulk-09",
"/tmp/testfile-bulk-10",
],
),
# fmt: off
action.Wait(
event.Queued(0, "DROP_PEER_STIMPY", [
norddrop.QueuedFile(FILES["testfile-bulk-01"].id, "testfile-bulk-01", 10485760, "/tmp"),
norddrop.QueuedFile(FILES["testfile-bulk-02"].id, "testfile-bulk-02", 10485760, "/tmp"),
norddrop.QueuedFile(FILES["testfile-bulk-03"].id, "testfile-bulk-03", 10485760, "/tmp"),
norddrop.QueuedFile(FILES["testfile-bulk-04"].id, "testfile-bulk-04", 10485760, "/tmp"),
norddrop.QueuedFile(FILES["testfile-bulk-05"].id, "testfile-bulk-05", 10485760, "/tmp"),
norddrop.QueuedFile(FILES["testfile-bulk-06"].id, "testfile-bulk-06", 10485760, "/tmp"),
norddrop.QueuedFile(FILES["testfile-bulk-07"].id, "testfile-bulk-07", 10485760, "/tmp"),
norddrop.QueuedFile(FILES["testfile-bulk-08"].id, "testfile-bulk-08", 10485760, "/tmp"),
norddrop.QueuedFile(FILES["testfile-bulk-09"].id, "testfile-bulk-09", 10485760, "/tmp"),
norddrop.QueuedFile(FILES["testfile-bulk-10"].id, "testfile-bulk-10", 10485760, "/tmp"),
]),
),
# Wait for some of the files and the stop
action.Repeated([action.WaitForOneOf([
event.Start(0, FILES["testfile-bulk-01"].id),
event.Start(0, FILES["testfile-bulk-02"].id),
event.Start(0, FILES["testfile-bulk-03"].id),
event.Start(0, FILES["testfile-bulk-04"].id),
event.Start(0, FILES["testfile-bulk-05"].id),
event.Start(0, FILES["testfile-bulk-06"].id),
event.Start(0, FILES["testfile-bulk-07"].id),
event.Start(0, FILES["testfile-bulk-08"].id),
event.Start(0, FILES["testfile-bulk-09"].id),
event.Start(0, FILES["testfile-bulk-10"].id),
])], 4),
# fmt: on
action.Stop(),
# fmt: off
# Wait of paused events
action.WaitAndIgnoreExcept([
event.Paused(0, FILES["testfile-bulk-01"].id),
event.Paused(0, FILES["testfile-bulk-02"].id),
event.Paused(0, FILES["testfile-bulk-03"].id),
event.Paused(0, FILES["testfile-bulk-04"].id),
event.Paused(0, FILES["testfile-bulk-05"].id),
event.Paused(0, FILES["testfile-bulk-06"].id),
event.Paused(0, FILES["testfile-bulk-07"].id),
event.Paused(0, FILES["testfile-bulk-08"].id),
event.Paused(0, FILES["testfile-bulk-09"].id),
event.Paused(0, FILES["testfile-bulk-10"].id),
]),
# fmt: on
action.Sleep(2),
action.ClearEventQueue(),
action.Start("DROP_PEER_REN", dbpath="/tmp/db/11-4-ren.sqlite"),
# fmt: off
# Wait for some of the files and the stop
action.WaitRacy([
event.Start(0, FILES["testfile-bulk-01"].id, None),
event.Start(0, FILES["testfile-bulk-02"].id, None),
event.Start(0, FILES["testfile-bulk-03"].id, None),
event.Start(0, FILES["testfile-bulk-04"].id, None),
event.Start(0, FILES["testfile-bulk-05"].id, None),
event.Start(0, FILES["testfile-bulk-06"].id, None),
event.Start(0, FILES["testfile-bulk-07"].id, None),
event.Start(0, FILES["testfile-bulk-08"].id, None),
event.Start(0, FILES["testfile-bulk-09"].id, None),
event.Start(0, FILES["testfile-bulk-10"].id, None),
event.FinishFileUploaded(0, FILES["testfile-bulk-01"].id),
event.FinishFileUploaded(0, FILES["testfile-bulk-02"].id),
event.FinishFileUploaded(0, FILES["testfile-bulk-03"].id),
event.FinishFileUploaded(0, FILES["testfile-bulk-04"].id),
event.FinishFileUploaded(0, FILES["testfile-bulk-05"].id),
event.FinishFileUploaded(0, FILES["testfile-bulk-06"].id),
event.FinishFileUploaded(0, FILES["testfile-bulk-07"].id),
event.FinishFileUploaded(0, FILES["testfile-bulk-08"].id),
event.FinishFileUploaded(0, FILES["testfile-bulk-09"].id),
event.FinishFileUploaded(0, FILES["testfile-bulk-10"].id),
]),
# fmt: on
action.ExpectCancel([0], True),
]
),
"DROP_PEER_STIMPY": ActionList(
[
action.Start("DROP_PEER_STIMPY"),
# fmt: off
action.Wait(
event.Receive(0, "DROP_PEER_REN", [
norddrop.ReceivedFile(FILES["testfile-bulk-01"].id, "testfile-bulk-01", 10485760),
norddrop.ReceivedFile(FILES["testfile-bulk-02"].id, "testfile-bulk-02", 10485760),
norddrop.ReceivedFile(FILES["testfile-bulk-03"].id, "testfile-bulk-03", 10485760),
norddrop.ReceivedFile(FILES["testfile-bulk-04"].id, "testfile-bulk-04", 10485760),
norddrop.ReceivedFile(FILES["testfile-bulk-05"].id, "testfile-bulk-05", 10485760),
norddrop.ReceivedFile(FILES["testfile-bulk-06"].id, "testfile-bulk-06", 10485760),
norddrop.ReceivedFile(FILES["testfile-bulk-07"].id, "testfile-bulk-07", 10485760),
norddrop.ReceivedFile(FILES["testfile-bulk-08"].id, "testfile-bulk-08", 10485760),
norddrop.ReceivedFile(FILES["testfile-bulk-09"].id, "testfile-bulk-09", 10485760),
norddrop.ReceivedFile(FILES["testfile-bulk-10"].id, "testfile-bulk-10", 10485760),
]),
),
# fmt: on
# fmt: off
action.Download(0, FILES["testfile-bulk-01"].id, "/tmp/received/11-4"),
action.Download(0, FILES["testfile-bulk-02"].id, "/tmp/received/11-4"),
action.Download(0, FILES["testfile-bulk-03"].id, "/tmp/received/11-4"),
action.Download(0, FILES["testfile-bulk-04"].id, "/tmp/received/11-4"),
action.Download(0, FILES["testfile-bulk-05"].id, "/tmp/received/11-4"),
action.Download(0, FILES["testfile-bulk-06"].id, "/tmp/received/11-4"),
action.Download(0, FILES["testfile-bulk-07"].id, "/tmp/received/11-4"),
action.Download(0, FILES["testfile-bulk-08"].id, "/tmp/received/11-4"),
action.Download(0, FILES["testfile-bulk-09"].id, "/tmp/received/11-4"),
action.Download(0, FILES["testfile-bulk-10"].id, "/tmp/received/11-4"),
# Wait for all 10 pending events
action.Repeated([action.WaitForOneOf([
event.Pending(0, FILES["testfile-bulk-01"].id),
event.Pending(0, FILES["testfile-bulk-02"].id),
event.Pending(0, FILES["testfile-bulk-03"].id),
event.Pending(0, FILES["testfile-bulk-04"].id),
event.Pending(0, FILES["testfile-bulk-05"].id),
event.Pending(0, FILES["testfile-bulk-06"].id),
event.Pending(0, FILES["testfile-bulk-07"].id),
event.Pending(0, FILES["testfile-bulk-08"].id),
event.Pending(0, FILES["testfile-bulk-09"].id),
event.Pending(0, FILES["testfile-bulk-10"].id),
event.Start(0, FILES["testfile-bulk-01"].id),
event.Start(0, FILES["testfile-bulk-02"].id),
event.Start(0, FILES["testfile-bulk-03"].id),
event.Start(0, FILES["testfile-bulk-04"].id),
event.Start(0, FILES["testfile-bulk-05"].id),
event.Start(0, FILES["testfile-bulk-06"].id),
event.Start(0, FILES["testfile-bulk-07"].id),
event.Start(0, FILES["testfile-bulk-08"].id),
event.Start(0, FILES["testfile-bulk-09"].id),
event.Start(0, FILES["testfile-bulk-10"].id),
]),
], 10),
action.WaitAndIgnoreExcept([
event.Paused(0, FILES["testfile-bulk-01"].id),
event.Paused(0, FILES["testfile-bulk-02"].id),
event.Paused(0, FILES["testfile-bulk-03"].id),
event.Paused(0, FILES["testfile-bulk-04"].id),
event.Paused(0, FILES["testfile-bulk-05"].id),
event.Paused(0, FILES["testfile-bulk-06"].id),
event.Paused(0, FILES["testfile-bulk-07"].id),
event.Paused(0, FILES["testfile-bulk-08"].id),
event.Paused(0, FILES["testfile-bulk-09"].id),
event.Paused(0, FILES["testfile-bulk-10"].id),
]),
# The sender is stopped
action.WaitRacy([
event.Start(0, FILES["testfile-bulk-01"].id, None),
event.Start(0, FILES["testfile-bulk-02"].id, None),
event.Start(0, FILES["testfile-bulk-03"].id, None),
event.Start(0, FILES["testfile-bulk-04"].id, None),
event.Start(0, FILES["testfile-bulk-05"].id, None),
event.Start(0, FILES["testfile-bulk-06"].id, None),
event.Start(0, FILES["testfile-bulk-07"].id, None),
event.Start(0, FILES["testfile-bulk-08"].id, None),
event.Start(0, FILES["testfile-bulk-09"].id, None),
event.Start(0, FILES["testfile-bulk-10"].id, None),
event.FinishFileDownloaded(0, FILES["testfile-bulk-01"].id, "/tmp/received/11-4/testfile-bulk-01"),
event.FinishFileDownloaded(0, FILES["testfile-bulk-02"].id, "/tmp/received/11-4/testfile-bulk-02"),
event.FinishFileDownloaded(0, FILES["testfile-bulk-03"].id, "/tmp/received/11-4/testfile-bulk-03"),
event.FinishFileDownloaded(0, FILES["testfile-bulk-04"].id, "/tmp/received/11-4/testfile-bulk-04"),
event.FinishFileDownloaded(0, FILES["testfile-bulk-05"].id, "/tmp/received/11-4/testfile-bulk-05"),
event.FinishFileDownloaded(0, FILES["testfile-bulk-06"].id, "/tmp/received/11-4/testfile-bulk-06"),
event.FinishFileDownloaded(0, FILES["testfile-bulk-07"].id, "/tmp/received/11-4/testfile-bulk-07"),
event.FinishFileDownloaded(0, FILES["testfile-bulk-08"].id, "/tmp/received/11-4/testfile-bulk-08"),
event.FinishFileDownloaded(0, FILES["testfile-bulk-09"].id, "/tmp/received/11-4/testfile-bulk-09"),
event.FinishFileDownloaded(0, FILES["testfile-bulk-10"].id, "/tmp/received/11-4/testfile-bulk-10"),
]),
# fmt: on
action.CancelTransferRequest([0]),
action.ExpectCancel([0], False),
action.Stop(),
]
),
},
),
Scenario(
"scenario12-1",
"Transfer file to two clients simultaneously",
Expand Down

0 comments on commit b1e9f40

Please sign in to comment.