Skip to content

Commit

Permalink
sim-lib: select on sending channel and shutdown listener
Browse files Browse the repository at this point in the history
  • Loading branch information
carlaKC committed Aug 1, 2024
1 parent 51f221e commit 11f313b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 17 deletions.
20 changes: 12 additions & 8 deletions docs/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,20 @@ simulation, we relay on the following:
and a `Listener` that propagates this signal.
2. The (`Trigger`, `Listener`) pair are used with channels: if a channel
errors out across `send()` or `recv()`, shutdown is triggered. There is
no reliance on channel mechanics, i.e. errors generated when all senders
are and/or a receiver is dropped.
no reliance on channel mechanics, i.e. that receiving channels will error
out when all of their sending channels are dropped.
3. All events are handled in a `tokio::select` to allow waiting on
multiple asynchronous tasks at once. These selects should be `biased`
on the exit case (ie, the `Listener` being triggered) so that we
prioritize exit above generating more events.
4. Additionally, we `select!` on shutdown signal on `send()`/`recv()`
for all channels to guarantee this:
- A task's receiver exiting while one or more corresponding senders
(in different tasks) are actively sending, doesn't result in the
sending tasks erroring due to channel `SendError`. Any sender's
inability to `send()` due to a dropped receiver triggers a clean
shutdown across all listening tasks.
for all channels to guarantee shutdown:
- When the signal to shutdown is received, it is possible that a
task responsible for consuming on the `Receiver` channel exits when
multiple tasks are still attempting to send to it.
- By using `select` with all `send()` instructions, we ensure that
the senders will exit cleanly, rather than block waiting on a
receiver that has already exited to consume its send.
- An alternative to this approach would be to use `receiver.close()`
and drain all items from the channel (resulting in unblocking the
senders).
45 changes: 36 additions & 9 deletions sim-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1053,8 +1053,17 @@ async fn consume_events(
}
};

if sender.send(outcome.clone()).await.is_err() {
return Err(SimulationError::MpscChannelError(format!("Error sending simulation output {outcome:?}.")));
select!{
biased;
_ = listener.clone() => {
return Ok(())
}
send_result = sender.send(outcome.clone()) => {
if send_result.is_err() {
return Err(SimulationError::MpscChannelError(
format!("Error sending simulation output {outcome:?}.")));
}
}
}
}
}
Expand Down Expand Up @@ -1118,8 +1127,17 @@ async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator +

// Send the payment, exiting if we can no longer send to the consumer.
let event = SimulationEvent::SendPayment(destination.clone(), amount);
if sender.send(event.clone()).await.is_err() {
return Err(SimulationError::MpscChannelError (format!("Stopped activity producer for {amount}: {source} -> {destination}.")));
select!{
biased;
_ = listener.clone() => {
return Ok(());
},
send_result = sender.send(event.clone()) => {
if send_result.is_err(){
return Err(SimulationError::MpscChannelError(
format!("Stopped activity producer for {amount}: {source} -> {destination}.")));
}
},
}

current_count += 1;
Expand Down Expand Up @@ -1314,10 +1332,18 @@ async fn produce_simulation_results(
}
},
SimulationOutput::SendPaymentFailure(payment, result) => {
if results.send((payment, result.clone())).await.is_err() {
break Err(SimulationError::MpscChannelError(
format!("Failed to send payment result: {result} for payment {:?} dispatched at {:?}.", payment.hash, payment.dispatch_time),
));
select!{
_ = listener.clone() => {
return Ok(());
},
send_result = results.send((payment, result.clone())) => {
if send_result.is_err(){
break Err(SimulationError::MpscChannelError(
format!("Failed to send payment result: {result} for payment {:?} dispatched at {:?}.",
payment.hash, payment.dispatch_time),
));
}
},
}
}
};
Expand Down Expand Up @@ -1384,7 +1410,8 @@ async fn track_payment_result(
},
send_payment_result = results.send((payment, res.clone())) => {
if send_payment_result.is_err() {
return Err(SimulationError::MpscChannelError(format!("Failed to send payment result {res} for payment {payment}.")))
return Err(SimulationError::MpscChannelError(
format!("Failed to send payment result {res} for payment {payment}.")))
}
}
}
Expand Down

0 comments on commit 11f313b

Please sign in to comment.