From 2869e8a09613ebd2ab67c8881dfdd2d897a978f2 Mon Sep 17 00:00:00 2001 From: Jens Reidel Date: Tue, 10 Sep 2024 04:30:07 +0200 Subject: [PATCH] Make flush threshold configurable Signed-off-by: Jens Reidel --- src/proto/stream.rs | 2 +- src/proto/types.rs | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/proto/stream.rs b/src/proto/stream.rs index f1a03e55b66..8018812edc1 100644 --- a/src/proto/stream.rs +++ b/src/proto/stream.rs @@ -311,7 +311,7 @@ where fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // tokio-util calls poll_flush when more than 8096 bytes are pending, otherwise // it returns Ready. We will just replicate that behavior - if self.pending_bytes >= 8096 { + if self.pending_bytes >= self.inner.decoder().limits.flush_threshold { self.as_mut().poll_flush(cx) } else { Poll::Ready(Ok(())) diff --git a/src/proto/types.rs b/src/proto/types.rs index 993c8f7a7fb..52a1396dfc3 100644 --- a/src/proto/types.rs +++ b/src/proto/types.rs @@ -563,6 +563,9 @@ pub struct Limits { /// The maximum allowed payload length. The default /// is 64 MiB. pub(super) max_payload_len: usize, + /// Threshold of queued up bytes after which the underlying I/O is flushed + /// before the sink is declared ready. The default is 8 KiB. + pub(super) flush_threshold: usize, } impl Limits { @@ -571,6 +574,7 @@ impl Limits { pub fn unlimited() -> Self { Self { max_payload_len: usize::MAX, + flush_threshold: usize::MAX, } } @@ -582,12 +586,23 @@ impl Limits { self } + + /// Sets the threshold of queued up bytes after which the underlying I/O is + /// flushed before the sink is declared ready. `None` equals no limit. The + /// default is 8 KiB. + #[must_use] + pub fn flush_threshold(mut self, threshold: Option) -> Self { + self.flush_threshold = threshold.unwrap_or(usize::MAX); + + self + } } impl Default for Limits { fn default() -> Self { Self { max_payload_len: 64 * 1024 * 1024, + flush_threshold: 8 * 1024, } } }