Skip to content

Commit

Permalink
Add documentation for SyncIoBridge with examples and alternatives
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathy-bajo committed Sep 4, 2024
1 parent 881d3fa commit f3edf64
Showing 1 changed file with 124 additions and 36 deletions.
160 changes: 124 additions & 36 deletions tokio-util/src/io/sync_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,70 +15,158 @@ use tokio::io::{
/// ## Example 1: Hashing Data
///
/// When hashing data, using `SyncIoBridge` can lead to suboptimal performance and might not fully leverage the async capabilities of the system.
///
/// ### Why It Matters:
/// `SyncIoBridge` allows you to use synchronous I/O operations in an asynchronous context by blocking the current thread. However, this can be inefficient because:
/// - **Blocking:** The use of `SyncIoBridge` may block a valuable async runtime thread, which could otherwise be used to handle more tasks concurrently.
/// - **Thread Pool Saturation:** If many threads are blocked using `SyncIoBridge`, it can exhaust the async runtime's thread pool, leading to increased latency and reduced throughput.
/// - **Lack of Parallelism:** By blocking on synchronous operations, you may miss out on the benefits of running tasks concurrently, especially in I/O-bound operations where async tasks could be interleaved.
///
/// Instead, consider reading the data into memory and then hashing it, or processing the data in chunks.
///
/// ```rust, no_run
/// let mut data = Vec::new();
/// reader.read_to_end(&mut data).await?;
/// let hash = blake3::hash(&data);
/// ```rust
/// use tokio::io::AsyncReadExt;
/// # mod blake3 { pub fn hash(_: &[u8]) {} }
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut reader = tokio::io::empty();
///
/// // Read all data from the reader into a Vec<u8>.
/// let mut data = Vec::new();
/// reader.read_to_end(&mut data).await?;
///
/// // Hash the data using the blake3 hashing function.
/// let hash = blake3::hash(&data);
///
/// Ok(())
/// }
/// ```
///
/// Or, for more complex cases:
///
/// ```rust, no_run
/// let mut data = vec![0; 64 * 1024];
/// loop {
/// let len = reader.read(&mut data).await?;
/// if len == 0 { break; }
/// hasher.update(&data[..len]);
/// ```rust
/// use tokio::io::AsyncReadExt;
/// # struct Hasher;
/// # impl Hasher { pub fn update(&mut self, _: &[u8]) {} pub fn finalize(&self) {} }
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut reader = tokio::io::empty();
/// let mut hasher = Hasher;
///
/// // Create a buffer to read data into, sized for performance.
/// let mut data = vec![0; 64 * 1024];
/// loop {
/// //Read data from the reader into the buffer.
/// let len = reader.read(&mut data).await?;
/// if len == 0 { break; } // Exit loop if no more data.
///
/// // Update the hash with the data read.
/// hasher.update(&data[..len]);
/// }
///
/// // Finalize the hash after all data has been processed.
/// let hash = hasher.finalize();
///
/// Ok(())
/// }
/// let hash = hasher.finalize();
/// ```
///
/// ## Example 2: Compressing Data
///
/// When compressing data, avoid using `SyncIoBridge`` with non-async compression libraries, as it may lead to inefficient and blocking code.
/// Instead, use `async-compression`, which is designed to work with asynchronous data streams.
/// When compressing data, avoid using `SyncIoBridge` with non-async compression libraries, as it may lead to inefficient and blocking code.
/// Instead, use an async compression library such as the [`async-compression`](https://docs.rs/async-compression/latest/async_compression/) crate,
/// which is designed to work with asynchronous data streams.
///
/// ```rust, no_run
/// ```ignore
/// use async_compression::tokio::write::GzipEncoder;
/// use tokio::io::AsyncReadExt;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut reader = tokio::io::empty();
/// let mut writer = tokio::io::sink();
///
/// // Create a Gzip encoder that wraps the writer.
/// let mut encoder = GzipEncoder::new(writer);
///
/// // Copy data from the reader to the encoder, compressing it.
/// tokio::io::copy(&mut reader, &mut encoder).await?;
///
/// let mut encoder = GzipEncoder::new(writer);
/// tokio::io::copy(&mut reader, &mut encoder).await?;
/// Ok(())
/// }
/// ```
///
/// ## Example 3: Parsing `JSON`
///
/// When parsing `JSON` data, avoid using `SyncIoBridge` with `serde_json::from_reader` as it may cause blocking operations.
/// Instead, read the data into a `Vec<u8>` and parse it using `serde_json::from_slice`.
/// When parsing serialization formats such as `JSON`, avoid using `SyncIoBridge` with functions that
/// deserialize data from a type implementing `std::io::Read`, such as `serde_json::from_reader`.
///
/// ```rust,no_run
/// use tokio::io::AsyncReadExt;
/// # mod serde {
/// # pub trait DeserializeOwned: 'static {}
/// # impl<T: 'static> DeserializeOwned for T {}
/// # }
/// # mod serde_json {
/// # use super::serde::DeserializeOwned;
/// # pub fn from_slice<T: DeserializeOwned>(_: &[u8]) -> Result<T, std::io::Error> {
/// # unimplemented!()
/// # }
/// # }
/// # #[derive(Debug)] struct MyStruct;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut reader = tokio::io::empty();
///
/// ```rust, no_run
/// let mut data = Vec::new();
/// reader.read_to_end(&mut data).await?;
/// let value: MyStruct = serde_json::from_slice(&data)?;
/// // Read all data from the reader into a Vec<u8>.
/// let mut data = Vec::new();
/// reader.read_to_end(&mut data).await?;
///
/// // Deserialize the data from the Vec<u8> into a MyStruct instance.
/// let value: MyStruct = serde_json::from_slice(&data)?;
///
/// Ok(())
/// }
/// ```
///
/// ## Correct Usage of `SyncIoBridge` inside `spawn_blocking`
///
/// `SyncIoBridge` is mainly useful when you need to interface with synchronous libraries from an asynchronous context.
/// Here is how you can do it correctly:
///
/// ```rust, no_run
/// ```rust
/// use tokio::task::spawn_blocking;
/// use tokio::io::AsyncReadExt;
/// use tokio_util::io::SyncIoBridge;
/// use std::fs::File;
/// use std::io::Read;
///
/// let reader = tokio::fs::File::open("data.txt").await?;
/// let sync_reader = SyncIoBridge::new(reader);
///
/// let result = spawn_blocking(move || {
/// let mut file = File::open("output.txt")?;
/// std::io::copy(&mut sync_reader, &mut file)?;
/// Ok::<_, std::io::Error>(())
/// })
/// .await??;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///
/// let reader = tokio::io::empty();
///
/// // Wrap the async reader with SyncIoBridge to allow synchronous reading.
/// let mut sync_reader = SyncIoBridge::new(reader);
///
/// // Spawn a blocking task to perform synchronous I/O operations.
/// let result = spawn_blocking(move || {
/// // Create an in-memory buffer to hold the copied data.
/// let mut buffer = Vec::new();
///
/// // Copy data from the sync_reader to the buffer.
/// std::io::copy(&mut sync_reader, &mut buffer)?;
///
/// // Return the buffer containing the copied data.
/// Ok::<_, std::io::Error>(buffer)
/// })
/// .await??;
///
/// // You can use `result` here as needed.
/// // `result` contains the data read into the buffer.
///
/// Ok(())
/// }
/// ```
///
#[derive(Debug)]
Expand Down

0 comments on commit f3edf64

Please sign in to comment.