Skip to content

Commit

Permalink
Fix auto pong responses not flushing after block
Browse files Browse the repository at this point in the history
Retry pong flushes on read.
Add read_usage_auto_pong_flush scenario test
  • Loading branch information
alexheretic committed Dec 2, 2023
1 parent 0f6e651 commit 70dee81
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Unreleased (0.20.2)
- Fix read-predominant auto pong responses not flushing when hitting WouldBlock errors.
- Improve `FrameHeader::format` write correctness.
- Up minimum _rustls_ to `0.21.6`.

Expand Down
31 changes: 20 additions & 11 deletions src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@ use self::{
},
message::{IncompleteMessage, IncompleteMessageType},
};
use crate::{
error::{Error, ProtocolError, Result},
util::NonBlockingResult,
};
use crate::error::{Error, ProtocolError, Result};
use log::*;
use std::{
io::{ErrorKind as IoErrorKind, Read, Write},
io::{self, Read, Write},
mem::replace,
};

Expand Down Expand Up @@ -313,6 +310,9 @@ pub struct WebSocketContext {
incomplete: Option<IncompleteMessage>,
/// Send in addition to regular messages E.g. "pong" or "close".
additional_send: Option<Frame>,
/// True indicates there is an additional message (like a pong)
/// that failed to flush previously and we should try again.
unflushed_additional: bool,
/// The configuration for the websocket session.
config: WebSocketConfig,
}
Expand Down Expand Up @@ -344,6 +344,7 @@ impl WebSocketContext {
state: WebSocketState::Active,
incomplete: None,
additional_send: None,
unflushed_additional: false,
config,
}
}
Expand Down Expand Up @@ -391,10 +392,16 @@ impl WebSocketContext {
self.state.check_not_terminated()?;

loop {
if self.additional_send.is_some() {
if self.additional_send.is_some() || self.unflushed_additional {
// Since we may get ping or close, we need to reply to the messages even during read.
// Thus we flush but ignore its blocking.
self.flush(stream).no_block()?;
match self.flush(stream) {
Ok(_) => {}
Err(Error::Io(err)) if err.kind() == io::ErrorKind::WouldBlock => {
// If blocked continue reading, but try again later
self.unflushed_additional = true;
}
Err(err) => return Err(err),
}
} else if self.role == Role::Server && !self.state.can_read() {
self.state = WebSocketState::Terminated;
return Err(Error::ConnectionClosed);
Expand Down Expand Up @@ -462,7 +469,9 @@ impl WebSocketContext {
{
self._write(stream, None)?;
self.frame.write_out_buffer(stream)?;
Ok(stream.flush()?)
stream.flush()?;
self.unflushed_additional = false;
Ok(())
}

/// Writes any data in the out_buffer, `additional_send` and given `data`.
Expand Down Expand Up @@ -495,7 +504,7 @@ impl WebSocketContext {
Ok(_) => true,
}
} else {
false
self.unflushed_additional
};

// If we're closing and there is nothing to send anymore, we should close the connection.
Expand Down Expand Up @@ -774,7 +783,7 @@ impl<T> CheckConnectionReset for Result<T> {
fn check_connection_reset(self, state: WebSocketState) -> Self {
match self {
Err(Error::Io(io_error)) => Err({
if !state.can_read() && io_error.kind() == IoErrorKind::ConnectionReset {
if !state.can_read() && io_error.kind() == io::ErrorKind::ConnectionReset {
Error::ConnectionClosed
} else {
Error::Io(io_error)
Expand Down
124 changes: 124 additions & 0 deletions tests/auto_pong_flush.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use std::{
io::{self, Cursor, Read, Write},
mem,
};
use tungstenite::{
protocol::frame::{
coding::{Control, OpCode},
Frame, FrameHeader,
},
Message, WebSocket,
};

const NUMBER_OF_FLUSHES_TO_GET_IT_TO_WORK: usize = 3;

/// `Read`/`Write` mock.
/// * Reads a single ping, then returns `WouldBlock` forever after.
/// * Writes work fine.
/// * Flush `WouldBlock` twice then works on the 3rd attempt.
#[derive(Debug, Default)]
struct MockWrite {
/// Data written, but not flushed.
written_data: Vec<u8>,
/// The latest successfully flushed data.
flushed_data: Vec<u8>,
write_calls: usize,
flush_calls: usize,
read_calls: usize,
}

impl Read for MockWrite {
fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
self.read_calls += 1;
if self.read_calls == 1 {
let ping = Frame::ping(vec![]);
let len = ping.len();
ping.format(&mut buf).expect("format failed");
Ok(len)
} else {
Err(io::Error::new(io::ErrorKind::WouldBlock, "nothing else to read"))
}
}
}
impl Write for MockWrite {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.write_calls += 1;
self.written_data.write(buf)
}

fn flush(&mut self) -> io::Result<()> {
self.flush_calls += 1;
if self.flush_calls % NUMBER_OF_FLUSHES_TO_GET_IT_TO_WORK == 0 {
mem::swap(&mut self.written_data, &mut self.flushed_data);
self.written_data.clear();
eprintln!("flush success");
Ok(())
} else {
eprintln!("flush would block");
Err(io::Error::new(io::ErrorKind::WouldBlock, "try again"))
}
}
}

/// Test for auto pong write & flushing behaviour.
///
/// In read-only/read-predominant usage auto pong responses should be written and flushed
/// even if WouldBlock errors are encountered.
#[test]
fn read_usage_auto_pong_flush() {
let mut ws =
WebSocket::from_raw_socket(MockWrite::default(), tungstenite::protocol::Role::Client, None);

// Receiving a ping should auto scheduled a pong on next read or write (but not written yet).
let msg = ws.read().unwrap();
assert!(matches!(msg, Message::Ping(_)), "Unexpected msg {:?}", msg);
assert_eq!(ws.get_ref().read_calls, 1);
assert!(ws.get_ref().written_data.is_empty(), "Unexpected {:?}", ws.get_ref());
assert!(ws.get_ref().flushed_data.is_empty(), "Unexpected {:?}", ws.get_ref());

// Next read fails as there is nothing else to read.
// This read call should have tried to write & flush a pong response, with the flush WouldBlock-ing
let next = ws.read().unwrap_err();
assert!(
matches!(next, tungstenite::Error::Io(ref err) if err.kind() == io::ErrorKind::WouldBlock),
"Unexpected read err {:?}",
next
);
assert_eq!(ws.get_ref().read_calls, 2);
assert!(!ws.get_ref().written_data.is_empty(), "Should have written a pong frame");
assert_eq!(ws.get_ref().write_calls, 1);

let pong_header =
FrameHeader::parse(&mut Cursor::new(&ws.get_ref().written_data)).unwrap().unwrap().0;
assert_eq!(pong_header.opcode, OpCode::Control(Control::Pong));
let written_data = ws.get_ref().written_data.clone();

assert_eq!(ws.get_ref().flush_calls, 1);
assert!(ws.get_ref().flushed_data.is_empty(), "Unexpected {:?}", ws.get_ref());

// Next read fails as before.
// This read call should try to flush the pong again, which again WouldBlock
let next = ws.read().unwrap_err();
assert!(
matches!(next, tungstenite::Error::Io(ref err) if err.kind() == io::ErrorKind::WouldBlock),
"Unexpected read err {:?}",
next
);
assert_eq!(ws.get_ref().read_calls, 3);
assert_eq!(ws.get_ref().write_calls, 1);
assert_eq!(ws.get_ref().flush_calls, 2);
assert!(ws.get_ref().flushed_data.is_empty(), "Unexpected {:?}", ws.get_ref());

// Next read fails as before.
// This read call should try to flush the pong again, 3rd flush attempt is the charm
let next = ws.read().unwrap_err();
assert!(
matches!(next, tungstenite::Error::Io(ref err) if err.kind() == io::ErrorKind::WouldBlock),
"Unexpected read err {:?}",
next
);
assert_eq!(ws.get_ref().read_calls, 4);
assert_eq!(ws.get_ref().write_calls, 1);
assert_eq!(ws.get_ref().flush_calls, 3);
assert!(ws.get_ref().flushed_data == written_data, "Unexpected {:?}", ws.get_ref());
}

0 comments on commit 70dee81

Please sign in to comment.