-
Notifications
You must be signed in to change notification settings - Fork 255
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: buffer Framed<.., Codec>
packets
#826
Conversation
* refactor: implement size for all v5 packet types * refactor: implement `Packet.size()` for v4
* refactor: `Packet::read` * refactor: `Packet::write` * test: fix changes in refactor
Defaults to 10
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update!
I don't understand why there's homebrewn counting of the outgoing packets. The counter also includes packets generated in response to incoming packets, so the name of the configuration is misleading.
The manual count + flush also works against the capacity of Framed
(which defaults here). Wouldn't it be better to leave the need to a implicit flush to Framed
? Flushing must still happen after each batch of incoming packets and requests.
I totally agree with wrapping of SinkExt::flush
with the timeout.
My suggestion is:
- rename
max_request_batch
toio_buffer_capacity
. Apply that whenFramed
is constructed. - Remove
Network::write
and useSinkExt::feed
instead (or write a wrapper if you wan't to hide theSink
) - Keep
Network::flush
with the timeout feature.
cheers
@@ -185,7 +185,7 @@ impl EventLoop { | |||
o = network.read() => { | |||
let incoming = o?; | |||
if let Some(packet) = self.state.handle_incoming_packet(incoming)? { | |||
network.send(packet).await?; | |||
network.write(packet).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This packet is never flushed unless there's a ping or options.max_request_batch - 1
more packets "sent".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had the same doubt, was using tests as a crutch, but I see how even the tests aren't complete.
What type of acks should we instantly respond to, I guess only PingResp
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not so deep in the MQTT standard but think it's just PingResp
.
How does
I would like to keep write and wrap the internal workings for the time being, mainly because this isn't directly exposed to the user and readability of code is important for maintenance. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments. My suggestion is to move the "when to flush" logic to EventLoop
. See comments.
@@ -185,7 +185,7 @@ impl EventLoop { | |||
o = network.read() => { | |||
let incoming = o?; | |||
if let Some(packet) = self.state.handle_incoming_packet(incoming)? { | |||
network.send(packet).await?; | |||
network.write(packet).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not so deep in the MQTT standard but think it's just PingResp
.
.await | ||
.map_err(StateError::Deserialization)?; | ||
|
||
if should_flush || self.framed.write_buffer().len() + packet_size >= self.buffer_capacity { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FramedWrite
does that job for you here. I think removing the manual buffer size check is good and avoid not necessary complexity here.
I'd also do the "when to flush" logic in EventLoop
and not in here because it must reflect MQTT and the IO part should be agnostic.
Add a loop (up to e.g 10) around the select!
that breaks if pending
or request_rx
is empty and the rx buffer of Framed
is empty after a fill_buf
. There is no need for extra logic in Network
.
like this:
for _ in 0..10 {
select! {
r = next_request() => {
packet = state.process(r)
network.feed(packet)
}
p = network.read() => {
packet = state.handle_incoming(p)
network.feed(packet)
if matches(packet, PingResp) {
// Ensure the flush by breaking.
break;
}
}
_ = ping_interval.tick() => {
packet = PingReq
network.feed(packet)
}
}
let data_pending = network.fill_rx_buf(); // get inner of framed; get read_buffer_mut, fill it.
// Break if there is nothing to do.
if pending.is_empty() && requests_rx.is_empty() && !data_pending() {
break;
}
}
network.flush()
pub async fn write(&mut self, packet: Packet) -> Result<(), StateError> { | ||
let packet_size = packet.size(); | ||
let should_flush = match packet { | ||
Packet::Connect(..) | Packet::PingReq(_) | Packet::PingResp(_) => true, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above.
closed in favor of #823 |
resolves #810 on top of #825
Defaults to buffering 10 packets before each flush, behavior can be changed with
MqttOptions.set_max_request_batch()
. Instantly flushes outgoingPingReq
as well(required to not close the connection).Type of change
Checklist:
cargo fmt
CHANGELOG.md
if it's relevant to the users of the library. If it's not relevant mention why.