From f3edf64d2a2d71c1e0b38dea910b470efbe51e08 Mon Sep 17 00:00:00 2001 From: Nathy-bajo Date: Wed, 4 Sep 2024 10:52:51 +0100 Subject: [PATCH] Add documentation for SyncIoBridge with examples and alternatives --- tokio-util/src/io/sync_bridge.rs | 160 ++++++++++++++++++++++++------- 1 file changed, 124 insertions(+), 36 deletions(-) diff --git a/tokio-util/src/io/sync_bridge.rs b/tokio-util/src/io/sync_bridge.rs index 0174b5a6cf2..68ce8367129 100644 --- a/tokio-util/src/io/sync_bridge.rs +++ b/tokio-util/src/io/sync_bridge.rs @@ -15,47 +15,121 @@ use tokio::io::{ /// ## Example 1: Hashing Data /// /// When hashing data, using `SyncIoBridge` can lead to suboptimal performance and might not fully leverage the async capabilities of the system. +/// +/// ### Why It Matters: +/// `SyncIoBridge` allows you to use synchronous I/O operations in an asynchronous context by blocking the current thread. However, this can be inefficient because: +/// - **Blocking:** The use of `SyncIoBridge` may block a valuable async runtime thread, which could otherwise be used to handle more tasks concurrently. +/// - **Thread Pool Saturation:** If many threads are blocked using `SyncIoBridge`, it can exhaust the async runtime's thread pool, leading to increased latency and reduced throughput. +/// - **Lack of Parallelism:** By blocking on synchronous operations, you may miss out on the benefits of running tasks concurrently, especially in I/O-bound operations where async tasks could be interleaved. +/// /// Instead, consider reading the data into memory and then hashing it, or processing the data in chunks. 
/// -/// ```rust, no_run -/// let mut data = Vec::new(); -/// reader.read_to_end(&mut data).await?; -/// let hash = blake3::hash(&data); +/// ```rust +/// use tokio::io::AsyncReadExt; +/// # mod blake3 { pub fn hash(_: &[u8]) {} } +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box<dyn std::error::Error>> { +/// let mut reader = tokio::io::empty(); +/// +/// // Read all data from the reader into a Vec. +/// let mut data = Vec::new(); +/// reader.read_to_end(&mut data).await?; +/// +/// // Hash the data using the blake3 hashing function. +/// let hash = blake3::hash(&data); +/// +/// Ok(()) +/// } /// ``` /// /// Or, for more complex cases: /// -/// ```rust, no_run -/// let mut data = vec![0; 64 * 1024]; -/// loop { -/// let len = reader.read(&mut data).await?; -/// if len == 0 { break; } -/// hasher.update(&data[..len]); +/// ```rust +/// use tokio::io::AsyncReadExt; +/// # struct Hasher; +/// # impl Hasher { pub fn update(&mut self, _: &[u8]) {} pub fn finalize(&self) {} } +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box<dyn std::error::Error>> { +/// let mut reader = tokio::io::empty(); +/// let mut hasher = Hasher; +/// +/// // Create a buffer to read data into, sized for performance. +/// let mut data = vec![0; 64 * 1024]; +/// loop { +/// // Read data from the reader into the buffer. +/// let len = reader.read(&mut data).await?; +/// if len == 0 { break; } // Exit loop if no more data. +/// +/// // Update the hash with the data read. +/// hasher.update(&data[..len]); +/// } +/// +/// // Finalize the hash after all data has been processed. +/// let hash = hasher.finalize(); +/// +/// Ok(()) /// } -/// let hash = hasher.finalize(); /// ``` /// /// ## Example 2: Compressing Data /// -/// When compressing data, avoid using `SyncIoBridge`` with non-async compression libraries, as it may lead to inefficient and blocking code. -/// Instead, use `async-compression`, which is designed to work with asynchronous data streams.
+/// When compressing data, avoid using `SyncIoBridge` with non-async compression libraries, as it may lead to inefficient and blocking code. +/// Instead, use an async compression library such as the [`async-compression`](https://docs.rs/async-compression/latest/async_compression/) crate, +/// which is designed to work with asynchronous data streams. /// -/// ```rust, no_run +/// ```ignore /// use async_compression::tokio::write::GzipEncoder; +/// use tokio::io::AsyncReadExt; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box<dyn std::error::Error>> { +/// let mut reader = tokio::io::empty(); +/// let mut writer = tokio::io::sink(); +/// +/// // Create a Gzip encoder that wraps the writer. +/// let mut encoder = GzipEncoder::new(writer); +/// +/// // Copy data from the reader to the encoder, compressing it. +/// tokio::io::copy(&mut reader, &mut encoder).await?; /// -/// let mut encoder = GzipEncoder::new(writer); -/// tokio::io::copy(&mut reader, &mut encoder).await?; +/// Ok(()) +/// } /// ``` /// /// ## Example 3: Parsing `JSON` /// -/// When parsing `JSON` data, avoid using `SyncIoBridge` with `serde_json::from_reader` as it may cause blocking operations. -/// Instead, read the data into a `Vec` and parse it using `serde_json::from_slice`. +/// When parsing serialization formats such as `JSON`, avoid using `SyncIoBridge` with functions that +/// deserialize data from a type implementing `std::io::Read`, such as `serde_json::from_reader`.
+/// +/// ```rust,no_run +/// use tokio::io::AsyncReadExt; +/// # mod serde { +/// # pub trait DeserializeOwned: 'static {} +/// # impl<T: 'static> DeserializeOwned for T {} +/// # } +/// # mod serde_json { +/// # use super::serde::DeserializeOwned; +/// # pub fn from_slice<T: DeserializeOwned>(_: &[u8]) -> Result<T, std::io::Error> { +/// # unimplemented!() +/// # } +/// # } +/// # #[derive(Debug)] struct MyStruct; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box<dyn std::error::Error>> { +/// let mut reader = tokio::io::empty(); /// -/// ```rust, no_run -/// let mut data = Vec::new(); -/// reader.read_to_end(&mut data).await?; -/// let value: MyStruct = serde_json::from_slice(&data)?; +/// // Read all data from the reader into a Vec. +/// let mut data = Vec::new(); +/// reader.read_to_end(&mut data).await?; +/// +/// // Deserialize the data from the Vec into a MyStruct instance. +/// let value: MyStruct = serde_json::from_slice(&data)?; +/// +/// Ok(()) +/// } /// ``` /// /// ## Correct Usage of `SyncIoBridge` inside `spawn_blocking` @@ -63,22 +137,36 @@ use tokio::io::{ /// `SyncIoBridge` is mainly useful when you need to interface with synchronous libraries from an asynchronous context. /// Here is how you can do it correctly: /// -/// ```rust, no_run +/// ```rust /// use tokio::task::spawn_blocking; -/// use tokio::io::AsyncReadExt; /// use tokio_util::io::SyncIoBridge; -/// use std::fs::File; -/// use std::io::Read; -/// -/// let reader = tokio::fs::File::open("data.txt").await?; -/// let sync_reader = SyncIoBridge::new(reader); -/// -/// let result = spawn_blocking(move || { -/// let mut file = File::open("output.txt")?; -/// std::io::copy(&mut sync_reader, &mut file)?; -/// Ok::<_, std::io::Error>(()) -/// }) -/// .await??; +/// +/// #[tokio::main] +/// async fn main() -> Result<(), Box<dyn std::error::Error>> { +/// +/// let reader = tokio::io::empty(); +/// +/// // Wrap the async reader with SyncIoBridge to allow synchronous reading.
+/// let mut sync_reader = SyncIoBridge::new(reader); +/// +/// // Spawn a blocking task to perform synchronous I/O operations. +/// let result = spawn_blocking(move || { +/// // Create an in-memory buffer to hold the copied data. +/// let mut buffer = Vec::new(); +/// +/// // Copy data from the sync_reader to the buffer. +/// std::io::copy(&mut sync_reader, &mut buffer)?; +/// +/// // Return the buffer containing the copied data. +/// Ok::<_, std::io::Error>(buffer) +/// }) +/// .await??; +/// +/// // You can use `result` here as needed. +/// // `result` contains the data read into the buffer. +/// +/// Ok(()) +/// } /// ``` /// #[derive(Debug)]