diff --git a/src/proto/stream.rs b/src/proto/stream.rs index f1a03e55b66..88fb5fb4693 100644 --- a/src/proto/stream.rs +++ b/src/proto/stream.rs @@ -311,7 +311,7 @@ where fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // tokio-util calls poll_flush when more than 8096 bytes are pending, otherwise // it returns Ready. We will just replicate that behavior - if self.pending_bytes >= 8096 { + if self.pending_bytes >= self.config.flush_threshold { self.as_mut().poll_flush(cx) } else { Poll::Ready(Ok(())) diff --git a/src/proto/types.rs b/src/proto/types.rs index 993c8f7a7fb..263424d16c9 100644 --- a/src/proto/types.rs +++ b/src/proto/types.rs @@ -159,7 +159,7 @@ impl TryFrom for CloseCode { /// the backing type is [`Bytes`] with a reference counter greater than one. /// /// [`From`]: #impl-From-for-Payload -/// [`Into`]: #impl-From-for-BytesMut +/// [`Into`]: #impl-From-for-BytesMut pub struct Payload { /// The raw payload data. data: UnsafeCell, @@ -603,6 +603,9 @@ pub struct Config { /// Consider decreasing this if the remote imposes a limit on the frame /// payload size. The default is 4MiB. pub(super) frame_size: usize, + /// Threshold of queued up bytes after which the underlying I/O is flushed + /// before the sink is declared ready. The default is 8 KiB. + pub(super) flush_threshold: usize, } impl Config { @@ -616,12 +619,22 @@ impl Config { self } + + /// Sets the threshold of queued up bytes after which the underlying I/O is + /// flushed before the sink is declared ready. The default is 8 KiB. + #[must_use] + pub fn flush_threshold(mut self, threshold: usize) -> Self { + self.flush_threshold = threshold; + + self + } } impl Default for Config { fn default() -> Self { Self { frame_size: 4 * 1024 * 1024, + flush_threshold: 8 * 1024, } } }