From a5c87b7ab079164923bf51bd462e7d166c7f656a Mon Sep 17 00:00:00 2001 From: Fee Fladder Date: Mon, 30 Sep 2024 21:36:04 +0200 Subject: [PATCH] feat:multithread made necessary modules public for using in mutithreaded envs, implemented ChunkReader and provided example --- Cargo.toml | 15 +- examples/multithread_http.rs | 194 +++++++++++++++++++++++++ src/decoder/async_decoder/mod.rs | 8 +- src/decoder/ifd.rs | 35 ----- src/decoder/image.rs | 16 +- src/decoder/mod.rs | 59 +++++--- src/decoder/multithread_decoder/mod.rs | 68 +++++++++ 7 files changed, 325 insertions(+), 70 deletions(-) create mode 100644 examples/multithread_http.rs create mode 100644 src/decoder/multithread_decoder/mod.rs diff --git a/Cargo.toml b/Cargo.toml index c164a6b..e0c04d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,9 @@ exclude = ["tests/images/*", "tests/fuzz_images/*"] [features] async_decoder = ["dep:futures", "dep:async-trait"] -ehttp = ["dep:ehttp", "async_decoder"] +multithread = ["async_decoder"] +# only for async example reading a COG +ehttp = ["async_decoder", "dep:ehttp"] [dependencies] weezl = "0.1.0" @@ -44,4 +46,13 @@ description = "Example showing use of async features using async http requests" [[example]] name = "async_http" path="examples/async_http.rs" -required-features=["ehttp"] \ No newline at end of file +required-features=["ehttp", "async_decoder"] + +[package.metadata.example.multithread] +name="multithread_http" +description = "example showing multithreaded reading of COG" + +[[example]] +name = "multithread_http" +path="examples/multithread_http.rs" +required-features=["ehttp", "multithread"] \ No newline at end of file diff --git a/examples/multithread_http.rs b/examples/multithread_http.rs new file mode 100644 index 0000000..e295b01 --- /dev/null +++ b/examples/multithread_http.rs @@ -0,0 +1,194 @@ +// Special thanks to Alice for the help: https://users.rust-lang.org/t/63019/6 +use std::io::{Result, SeekFrom}; +use std::pin::Pin; +use std::sync::Arc; +use futures::{ + future::BoxFuture, + io::{AsyncRead, AsyncSeek}, + Future, +}; +use tiff::decoder::Decoder; + +// extern crate ehttp; + +// Arc for sharing, see https://users.rust-lang.org/t/how-to-clone-a-boxed-closure/31035/9 +// or https://stackoverflow.com/a/27883612/14681457 +pub type F = Arc< + dyn Fn(u64, u64) -> BoxFuture<'static, std::io::Result> + Send + Sync, +>; +pub struct RangedStreamer { + pos: u64, + length: u64, // total size + state: State, + range_get: F, + min_request_size: usize, // requests have at least this size +} + +/// This is a fake clone, that doesn't clone the currently pending task, but everything else +impl Clone for RangedStreamer { + fn clone(&self) -> Self { + RangedStreamer { + pos: self.pos, + length: self.length, + state: State::HasChunk(SeekOutput { + start: 0, + data: vec![], + }), + range_get: self.range_get.clone(), + min_request_size: self.min_request_size, + } + } +} + +enum State { + HasChunk(SeekOutput), + Seeking(BoxFuture<'static, std::io::Result>), +} + +#[derive(Debug, Clone)] +pub struct SeekOutput { + pub start: u64, + pub data: Vec, +} + + + +impl RangedStreamer { + pub fn new(length: usize, min_request_size: usize, range_get: F) -> Self { + let length = length as u64; + Self { + pos: 0, + length, + state: State::HasChunk(SeekOutput { + start: 0, + data: vec![], + }), + range_get, + min_request_size, + } + } +} + +// whether `test_interval` is inside `a` (start, length). +fn range_includes(a: (usize, usize), test_interval: (usize, usize)) -> bool { + if test_interval.0 < a.0 { + return false; + } + let test_end = test_interval.0 + test_interval.1; + let a_end = a.0 + a.1; + if test_end > a_end { + return false; + } + true +} + +impl AsyncRead for RangedStreamer { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll> { + let requested_range = (self.pos as usize, buf.len()); + let min_request_size = self.min_request_size; + match &mut self.state { + State::HasChunk(output) => { + let existing_range = (output.start as usize, output.data.len()); + if range_includes(existing_range, requested_range) { + let offset = requested_range.0 - existing_range.0; + buf.copy_from_slice(&output.data[offset..offset + buf.len()]); + self.pos += buf.len() as u64; + std::task::Poll::Ready(Ok(buf.len())) + } else { + let start = requested_range.0 as u64; + let length = std::cmp::max(min_request_size, requested_range.1); + let future = (self.range_get)(start, length.try_into().unwrap()); + self.state = State::Seeking(Box::pin(future)); + self.poll_read(cx, buf) + } + } + State::Seeking(ref mut future) => match Pin::new(future).poll(cx) { + std::task::Poll::Ready(v) => { + match v { + Ok(output) => self.state = State::HasChunk(output), + Err(e) => return std::task::Poll::Ready(Err(e)), + }; + self.poll_read(cx, buf) + } + std::task::Poll::Pending => std::task::Poll::Pending, + }, + } + } +} + +impl AsyncSeek for RangedStreamer { + fn poll_seek( + mut self: std::pin::Pin<&mut Self>, + _: &mut std::task::Context<'_>, + pos: std::io::SeekFrom, + ) -> std::task::Poll> { + match pos { + SeekFrom::Start(pos) => self.pos = pos, + SeekFrom::End(pos) => self.pos = (self.length as i64 + pos) as u64, + SeekFrom::Current(pos) => self.pos = (self.pos as i64 + pos) as u64, + }; + std::task::Poll::Ready(Ok(self.pos)) + } +} + + + +#[tokio::main] +async fn main() { + let url = "https://isdasoil.s3.amazonaws.com/covariates/dem_30m/dem_30m.tif"; + let Ok(url_head) = ehttp::fetch_async(ehttp::Request::head(url)).await else {println!("EPIC FAIL!"); return;}; + let length = usize::from_str_radix(url_head.headers.get("content-length").unwrap(), 10).expect("Could not parse content length"); + println!("head: {:?}", url_head); + let range_get = Arc::new(move |start: u64, length: u64| { + // let bucket = bucket.clone(); + let url = url; + Box::pin(async move { + println!("requested: {} kb", length / 1024); + let mut request = ehttp::Request::get(url); + request.headers.insert("Range".to_string(), format!("bytes={:?}-{:?}",start,start+length)); + let resp = ehttp::fetch_async(request).await.map_err(|e| std::io::Error::other(e))?; + if !resp.ok { + Err(std::io::Error::other(format!("Received invalid response: {:?}", resp.status))) + } else { + println!("received: {} kb", resp.bytes.len() / 1024); + Ok(SeekOutput {start, data: resp.bytes}) + } + }) as BoxFuture<'static, std::io::Result> + }); + let reader = RangedStreamer::new(length, 30*1024, range_get); + + // this decoder will read all necessary tags + let decoder = Decoder::new_overview_async(reader, 0).await.expect("oh noes!"); + println!("initialized decoder"); + let cloneable_decoder = tiff::decoder::ChunkDecoder::from_decoder(decoder); + + let mut handles = Vec::new(); + for chunk in 42..69 { + let mut cloned_decoder = cloneable_decoder.clone(); + + let handle = tokio::spawn(async move { + + let result = cloned_decoder.read_chunk_async(chunk).await; + match result { + Ok(data) => { + println!("Successfully read chunk {}", chunk); + Ok(data) // Return the data for collection + } + Err(e) => { + eprintln!("Error reading chunk {}: {:?}", chunk, e); + Err(e) // Return the error for handling + } + } + }); + handles.push(handle); + } + + let results = futures::future::join_all(handles).await; + for r in results { + println!("result: {:?}", r.expect("idk").expect("idk²").len()) + } +} diff --git a/src/decoder/async_decoder/mod.rs b/src/decoder/async_decoder/mod.rs index 450e526..6fcfe8e 100644 --- a/src/decoder/async_decoder/mod.rs +++ b/src/decoder/async_decoder/mod.rs @@ -5,7 +5,7 @@ use futures::{ }; use std::collections::{HashMap, HashSet}; -use crate::{ColorType, TiffError, TiffFormatError, TiffResult, TiffUnsupportedError, UsageError}; +use crate::{TiffError, TiffFormatError, TiffResult, TiffUnsupportedError}; // use self::ifd::Directory; // use self::image::Image; @@ -18,7 +18,7 @@ use crate::decoder::{ Decoder, ifd::{Value, Directory}, Image, stream::{ ByteOrder, SmartReader, - }, ChunkType, DecodingBuffer, DecodingResult, Limits, + }, ChunkType, DecodingBuffer, DecodingResult, }; use stream::AsyncEndianReader; @@ -453,7 +453,7 @@ impl Decoder { pub async fn read_chunk_async(&mut self, chunk_index: u32) -> TiffResult { let data_dims = self.image().chunk_data_dimensions(chunk_index)?; - let mut result = self.result_buffer(data_dims.0 as usize, data_dims.1 as usize)?; + let mut result = Self::result_buffer(data_dims.0 as usize, data_dims.1 as usize, self.image(), &self.limits)?; self.read_chunk_to_buffer_async(result.as_buffer(0), chunk_index, data_dims.0 as usize) .await?; @@ -465,7 +465,7 @@ impl Decoder { pub async fn read_image_async(&mut self) -> TiffResult { let width = self.image().width; let height = self.image().height; - let mut result = self.result_buffer(width as usize, height as usize)?; + let mut result = Self::result_buffer(usize::try_from(width)?, usize::try_from(height)?, self.image(), &self.limits )?; if width == 0 || height == 0 { return Ok(result); } diff --git a/src/decoder/ifd.rs b/src/decoder/ifd.rs index 5a7a21b..df6d9a9 100644 --- a/src/decoder/ifd.rs +++ b/src/decoder/ifd.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::io::{self, Read, Seek}; -use std::mem; use std::str; use super::stream::{ByteOrder, EndianReader, SmartReader}; @@ -679,40 +678,6 @@ impl Entry { } Ok(List(v)) } - - #[inline] - fn decode_offset( - &self, - value_count: u64, - bo: ByteOrder, - bigtiff: bool, - limits: &super::Limits, - reader: &mut SmartReader, - decode_fn: F, - ) -> TiffResult - where - R: Read + Seek, - F: Fn(&mut SmartReader) -> TiffResult, - { - let value_count = usize::try_from(value_count)?; - if value_count > limits.decoding_buffer_size / mem::size_of::() { - return Err(TiffError::LimitsExceeded); - } - - let mut v = Vec::with_capacity(value_count); - - let offset = if bigtiff { - self.r(bo).read_u64()? - } else { - self.r(bo).read_u32()?.into() - }; - reader.goto_offset(offset)?; - - for _ in 0..value_count { - v.push(decode_fn(reader)?) - } - Ok(List(v)) - } } /// Extracts a list of BYTE tags stored in an offset diff --git a/src/decoder/image.rs b/src/decoder/image.rs index f22872b..f0604c5 100644 --- a/src/decoder/image.rs +++ b/src/decoder/image.rs @@ -10,14 +10,14 @@ use crate::{ColorType, TiffError, TiffFormatError, TiffResult, TiffUnsupportedEr use std::io::{self, Cursor, Read, Seek}; use std::sync::Arc; -#[derive(Debug)] -pub(crate) struct StripDecodeState { +#[derive(Debug, Clone)] +pub struct StripDecodeState { pub rows_per_strip: u32, } -#[derive(Debug)] +#[derive(Debug, Clone)] /// Computed values useful for tile decoding -pub(crate) struct TileAttributes { +pub struct TileAttributes { pub image_width: usize, pub image_height: usize, @@ -58,8 +58,8 @@ impl TileAttributes { } } -#[derive(Debug)] -pub(crate) struct Image { +#[derive(Debug, Clone)] +pub struct Image { pub ifd: Option, pub width: u32, pub height: u32, @@ -679,7 +679,7 @@ impl Image { } } } else { - for (i, row) in buf + for (_, row) in buf .chunks_mut(output_row_stride) .take(data_dims.1 as usize) .enumerate() @@ -687,8 +687,6 @@ impl Image { let row = &mut row[..data_row_bytes]; reader.read_exact(row)?; - println!("chunk={chunk_index}, index={i}"); - // Skip horizontal padding if chunk_row_bytes > data_row_bytes { let len = u64::try_from(chunk_row_bytes - data_row_bytes)?; diff --git a/src/decoder/mod.rs b/src/decoder/mod.rs index 695824e..6d86c51 100644 --- a/src/decoder/mod.rs +++ b/src/decoder/mod.rs @@ -15,12 +15,16 @@ use crate::tags::{ use self::stream::{ByteOrder, EndianReader, SmartReader}; pub mod ifd; -mod image; -mod stream; +pub mod image; +pub mod stream; mod tag_reader; #[cfg(feature = "async_decoder")] -mod async_decoder; +pub mod async_decoder; +#[cfg(feature = "multithread")] +mod multithread_decoder; +#[cfg(feature = "multithread")] +pub use multithread_decoder::ChunkDecoder; /// Result of a decoding process #[derive(Debug)] @@ -142,6 +146,21 @@ impl DecodingResult { DecodingResult::I64(ref mut buf) => DecodingBuffer::I64(&mut buf[start..]), } } + + pub fn len(&self) -> usize { + match self { + DecodingResult::U8(v) => v.len(), + DecodingResult::U16(v) => v.len(), + DecodingResult::U32(v) => v.len(), + DecodingResult::U64(v) => v.len(), + DecodingResult::F32(v) => v.len(), + DecodingResult::F64(v) => v.len(), + DecodingResult::I8(v) => v.len(), + DecodingResult::I16(v) => v.len(), + DecodingResult::I32(v) => v.len(), + DecodingResult::I64(v) => v.len(), + } + } } // A buffer for image decoding @@ -435,7 +454,7 @@ impl Decoder { self.image().colortype() } - fn image(&self) -> &Image { + pub fn image(&self) -> &Image { &self.image } @@ -491,38 +510,38 @@ impl Decoder { } - fn result_buffer(&self, width: usize, height: usize) -> TiffResult { + fn result_buffer(width: usize, height: usize, image: &Image, limits: &Limits) -> TiffResult { let buffer_size = match width .checked_mul(height) - .and_then(|x| x.checked_mul(self.image().samples_per_pixel())) + .and_then(|x| x.checked_mul(image.samples_per_pixel())) { Some(s) => s, None => return Err(TiffError::LimitsExceeded), }; - let max_sample_bits = self.image().bits_per_sample; - match self.image().sample_format { + let max_sample_bits = image.bits_per_sample; + match image.sample_format { SampleFormat::Uint => match max_sample_bits { - n if n <= 8 => DecodingResult::new_u8(buffer_size, &self.limits), - n if n <= 16 => DecodingResult::new_u16(buffer_size, &self.limits), - n if n <= 32 => DecodingResult::new_u32(buffer_size, &self.limits), - n if n <= 64 => DecodingResult::new_u64(buffer_size, &self.limits), + n if n <= 8 => DecodingResult::new_u8(buffer_size, &limits), + n if n <= 16 => DecodingResult::new_u16(buffer_size, &limits), + n if n <= 32 => DecodingResult::new_u32(buffer_size, &limits), + n if n <= 64 => DecodingResult::new_u64(buffer_size, &limits), n => Err(TiffError::UnsupportedError( TiffUnsupportedError::UnsupportedBitsPerChannel(n), )), }, SampleFormat::IEEEFP => match max_sample_bits { - 32 => DecodingResult::new_f32(buffer_size, &self.limits), - 64 => DecodingResult::new_f64(buffer_size, &self.limits), + 32 => DecodingResult::new_f32(buffer_size, &limits), + 64 => DecodingResult::new_f64(buffer_size, &limits), n => Err(TiffError::UnsupportedError( TiffUnsupportedError::UnsupportedBitsPerChannel(n), )), }, SampleFormat::Int => match max_sample_bits { - n if n <= 8 => DecodingResult::new_i8(buffer_size, &self.limits), - n if n <= 16 => DecodingResult::new_i16(buffer_size, &self.limits), - n if n <= 32 => DecodingResult::new_i32(buffer_size, &self.limits), - n if n <= 64 => DecodingResult::new_i64(buffer_size, &self.limits), + n if n <= 8 => DecodingResult::new_i8(buffer_size, &limits), + n if n <= 16 => DecodingResult::new_i16(buffer_size, &limits), + n if n <= 32 => DecodingResult::new_i32(buffer_size, &limits), + n if n <= 64 => DecodingResult::new_i64(buffer_size, &limits), n => Err(TiffError::UnsupportedError( TiffUnsupportedError::UnsupportedBitsPerChannel(n), )), @@ -1036,7 +1055,7 @@ impl Decoder { pub fn read_chunk(&mut self, chunk_index: u32) -> TiffResult { let data_dims = self.image().chunk_data_dimensions(chunk_index)?; - let mut result = self.result_buffer(data_dims.0 as usize, data_dims.1 as usize)?; + let mut result = Self::result_buffer(data_dims.0 as usize, data_dims.1 as usize, self.image(), &self.limits)?; self.read_chunk_to_buffer(result.as_buffer(0), chunk_index, data_dims.0 as usize)?; @@ -1047,7 +1066,7 @@ impl Decoder { pub fn read_image(&mut self) -> TiffResult { let width = self.image().width; let height = self.image().height; - let mut result = self.result_buffer(width as usize, height as usize)?; + let mut result = Self::result_buffer(width as usize, height as usize, self.image(), &self.limits)?; if width == 0 || height == 0 { return Ok(result); } diff --git a/src/decoder/multithread_decoder/mod.rs b/src/decoder/multithread_decoder/mod.rs new file mode 100644 index 0000000..ae91a60 --- /dev/null +++ b/src/decoder/multithread_decoder/mod.rs @@ -0,0 +1,68 @@ +//! Decoder that can be Cloned, sharing the [`Image`] data between threads +//! Therefore, it holds an `Arc` +//! Loading in the image meatadata should be done using another decoder +//! Also shows how terrificly easy and ergonomic the api for the folks over at geotiff would be :P +use std::sync::Arc; + +use futures::{AsyncRead, AsyncSeek}; + +use crate::decoder::{ + image::Image, + Decoder, Limits, DecodingResult, + stream::SmartReader, + async_decoder::RangeReader, +}; +use crate::TiffResult; +/// Decoder that can be Cloned, sharing the [`Image`] data between threads +#[derive(Clone, Debug)] +pub struct ChunkDecoder { + reader: SmartReader, + // bigtiff: bool, + limits: Limits, + image: Arc, +} + +impl Clone for SmartReader { + fn clone(&self) -> Self { + Self { + reader: self.reader.clone(), + byte_order: self.byte_order, + } + } +} + +impl ChunkDecoder{ + pub fn from_decoder(decoder: Decoder) -> Self { + ChunkDecoder { + reader: decoder.reader.clone(), + // bigtiff: decoder.bigtiff, + limits: decoder.limits.clone(), + image: Arc::new(decoder.image().clone()), + } + } + + /// Get a reference to the image (in read mode) + pub fn image(&self) -> &Image { + // this is really bad + &self.image//.read().expect("Could not obtain lock") + } + + pub async fn read_chunk_async(&mut self, chunk_index: u32) -> TiffResult{ + // read_chunk code + let (width, height) = self.image().chunk_data_dimensions(chunk_index)?; + let mut result = Decoder::::result_buffer(usize::try_from(width)?, usize::try_from(height)?, self.image(), &self.limits)?; + // read_chunk_to_buffer code + let (offset, length) = self.image().chunk_file_range(chunk_index)?; + let v = self.reader.read_range(offset, offset + length).await?; + let output_row_stride = (width as u64).saturating_mul(self.image().samples_per_pixel() as u64).saturating_mul(self.image.bits_per_sample as u64) / 8; + self.image().expand_chunk( + &mut std::io::Cursor::new(v), + result.as_buffer(0).as_bytes_mut(), + output_row_stride.try_into()?, + self.reader.byte_order, + chunk_index, + &self.limits, + )?; + Ok(result) + } +}