Skip to content

Commit

Permalink
add release_queue() to Client and Eventloop (#53)
Browse files Browse the repository at this point in the history
* add release_producer and release_consumer

If both are available they can be given back to BBQueu with try_release_framed(prod, cons)

* rename release_consumer() and release_producer()
  • Loading branch information
andresv authored Oct 11, 2022
1 parent 63a3a76 commit 52806b6
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 27 deletions.
5 changes: 4 additions & 1 deletion mqttrust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ pub use encoding::v4::{
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt-impl", derive(defmt::Format))]
pub enum MqttError {
/// Queue full, cannot send/receive more packets
Full,
/// RefCell borrow fault
Borrow,
Overflow,
/// Needed resource is unavailable
Unavailable,
}

pub trait Mqtt {
Expand Down
38 changes: 23 additions & 15 deletions mqttrust_core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,24 @@ use mqttrust::{
/// event loop. Length in number of request packets
pub struct Client<'a, 'b, const L: usize> {
client_id: &'b str,
producer: RefCell<FrameProducer<'a, L>>,
producer: Option<RefCell<FrameProducer<'a, L>>>,
}

impl<'a, 'b, const L: usize> Client<'a, 'b, L> {
pub fn new(producer: FrameProducer<'a, L>, client_id: &'b str) -> Self {
Self {
client_id,
producer: RefCell::new(producer),
producer: Some(RefCell::new(producer)),
}
}

/// Release `FrameProducer`
///
/// This can be called before dropping `Client` to get back original `FrameProducer`.
pub fn release_queue(&mut self) -> Option<FrameProducer<'a, L>> {
match self.producer.take() {
Some(prod) => Some(prod.into_inner()),
None => None,
}
}
}
Expand All @@ -41,18 +51,16 @@ impl<'a, 'b, const L: usize> Mqtt for Client<'a, 'b, L> {
}

fn send(&self, packet: Packet<'_>) -> Result<(), MqttError> {
let mut prod = self
.producer
.try_borrow_mut()
.map_err(|_| MqttError::Borrow)?;

let max_size = packet.len();
let mut grant = prod.grant(max_size).map_err(|_| MqttError::Full)?;

let len = encode_slice(&packet, grant.deref_mut()).map_err(|_| MqttError::Full)?;

grant.commit(len);

Ok(())
match &self.producer {
Some(producer) => {
let mut prod = producer.try_borrow_mut().map_err(|_| MqttError::Borrow)?;
let max_size = packet.len();
let mut grant = prod.grant(max_size).map_err(|_| MqttError::Full)?;
let len = encode_slice(&packet, grant.deref_mut()).map_err(|_| MqttError::Full)?;
grant.commit(len);
Ok(())
}
None => Err(MqttError::Unavailable),
}
}
}
34 changes: 23 additions & 11 deletions mqttrust_core/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ where
/// Options of the current mqtt connection
pub options: MqttOptions<'b>,
/// Request stream
pub(crate) requests: FrameConsumer<'a, L>,
pub(crate) requests: Option<FrameConsumer<'a, L>>,
network_handle: NetworkHandle<S>,
}

Expand All @@ -39,11 +39,18 @@ where
state: MqttState::new(),
last_outgoing_timer: outgoing_timer,
options,
requests,
requests: Some(requests),
network_handle: NetworkHandle::new(),
}
}

/// Release `FrameConsumer`
///
/// This can be called before dropping `EventLoop` to get back original `FrameConsumer`.
pub fn release_queue(&mut self) -> Option<FrameConsumer<'a, L>> {
self.requests.take()
}

pub fn connect<N: Dns + TcpClientStack<TcpSocket = S> + ?Sized>(
&mut self,
network: &mut N,
Expand Down Expand Up @@ -118,17 +125,22 @@ where

// Handle a request
if self.should_handle_request() {
if let Some(mut grant) = self.requests.read() {
let mut packet = SerializedPacket(grant.deref_mut());
match self.state.handle_outgoing_request(&mut packet, &now) {
Ok(()) => {
self.network_handle.send(network, packet.to_inner())?;
grant.release();
return Err(nb::Error::WouldBlock);
match &mut self.requests {
Some(requests) => {
if let Some(mut grant) = requests.read() {
let mut packet = SerializedPacket(grant.deref_mut());
match self.state.handle_outgoing_request(&mut packet, &now) {
Ok(()) => {
self.network_handle.send(network, packet.to_inner())?;
grant.release();
return Err(nb::Error::WouldBlock);
}
Err(crate::state::StateError::MaxMessagesInflight) => {}
Err(e) => return Err(nb::Error::Other(e.into())),
}
}
Err(crate::state::StateError::MaxMessagesInflight) => {}
Err(e) => return Err(nb::Error::Other(e.into())),
}
None => return Err(nb::Error::Other(EventError::RequestsNotAvailable)),
}
}

Expand Down
1 change: 1 addition & 0 deletions mqttrust_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pub enum EventError {
Network(NetworkError),
BufferSize,
Clock,
RequestsNotAvailable,
}

#[derive(Debug, PartialEq)]
Expand Down

0 comments on commit 52806b6

Please sign in to comment.