Skip to content

Commit

Permalink
don't reuse sequence numbers
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw committed Nov 23, 2023
1 parent df26f90 commit c8089a6
Showing 1 changed file with 64 additions and 32 deletions.
96 changes: 64 additions & 32 deletions source/calliope/src/req_rsp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ pub enum DispatchError {
ConnectionReset(message::Reset),
}

enum RequestError {
Reset(message::Reset),
SeqInUse,
}

// === impl Client ===

impl<S> Client<S>
Expand All @@ -98,29 +103,40 @@ where

pub async fn request(&self, body: S::Request) -> Result<S::Response, message::Reset> {
#[cfg_attr(debug_assertions, allow(unreachable_code))]
let handle_wait_error = |err: WaitError| -> message::Reset {
match err {
WaitError::Closed => {
let error = self.channel.tx().try_reserve().expect_err("if the waitmap was closed, then the channel should have been closed with an error!");
if let TrySendError::Error { error, .. } = error {
return error;
}

#[cfg(debug_assertions)]
unreachable!("closing the channel with an error should have priority over full/disconnected errors.");

message::Reset::BecauseISaidSo
let handle_wait_error = |err: WaitError| match err {
WaitError::Closed => {
let error = self.channel.tx().try_reserve().expect_err(
"if the waitmap was closed, then the channel should \
have been closed with an error!",
);
if let TrySendError::Error { error, .. } = error {
return RequestError::Reset(error);
}
WaitError::Duplicate => panic!("sequence number was reused, implying we overflowed a usize! this is real bad news..."),
WaitError::AlreadyConsumed => unreachable!("data should not already be consumed, this is a bug"),
WaitError::NeverAdded => unreachable!("we ensured the waiter was added, this is a bug!"),
error => {
#[cfg(debug_assertions)]
todo!("james added a new WaitError variant that we don't know how to handle: {error:}");

#[cfg_attr(debug_assertions, allow(unreachable_code))]
message::Reset::BecauseISaidSo
}
#[cfg(debug_assertions)]
unreachable!(
"closing the channel with an error should have priority \
over full/disconnected errors."
);

RequestError::Reset(message::Reset::BecauseISaidSo)
}
WaitError::Duplicate => RequestError::SeqInUse,
WaitError::AlreadyConsumed => {
unreachable!("data should not already be consumed, this is a bug")
}
WaitError::NeverAdded => {
unreachable!("we ensured the waiter was added, this is a bug!")
}
error => {
#[cfg(debug_assertions)]
todo!(
"james added a new WaitError variant that we don't \
know how to handle: {error:}"
);

#[cfg_attr(debug_assertions, allow(unreachable_code))]
RequestError::Reset(message::Reset::BecauseISaidSo)
}
};

Expand All @@ -130,18 +146,34 @@ where
SendError::Disconnected(()) => message::Reset::BecauseISaidSo,
SendError::Error { error, .. } => error,
})?;
let seq = self.seq.fetch_add(1, Ordering::Relaxed);
let req = Request { seq, body };

// ensure waiter is enqueued before sending the request.
let wait = self.dispatcher.wait(seq);
pin_mut!(wait);
wait.as_mut().enqueue().await.map_err(handle_wait_error)?;

// actually send the message...
permit.send(req);
loop {
let seq = self.seq.fetch_add(1, Ordering::Relaxed);
// ensure waiter is enqueued before sending the request.
let wait = self.dispatcher.wait(seq);
pin_mut!(wait);
match wait.as_mut().enqueue().await.map_err(handle_wait_error) {
Ok(_) => {}
Err(RequestError::Reset(reset)) => return Err(reset),
Err(RequestError::SeqInUse) => {
tracing::trace!(seq, "sequence number in use, retrying...");
continue;
}
};

wait.await.map_err(handle_wait_error)
let req = Request { seq, body };
// actually send the message...
permit.send(req);

return match wait.await.map_err(handle_wait_error) {
Ok(rsp) => Ok(rsp),
Err(RequestError::Reset(reset)) => Err(reset),
Err(RequestError::SeqInUse) => unreachable!(
"we should have already enqueued the waiter, so its \
sequence number should be okay. this is a bug!"
),
};
}
}

/// Shut down the client dispatcher for this `Client`.
Expand Down Expand Up @@ -185,7 +217,7 @@ where
// shutdown.
let msg = select_biased! {
_ = self.shutdown.wait().fuse() => {
tracing::debug!("client dispatcher `sshutting down...");
tracing::debug!("client dispatcher `shutting down...");
return Ok(());
}
msg = self.channel.rx().recv().fuse() => msg,
Expand Down

0 comments on commit c8089a6

Please sign in to comment.