diff --git a/Cargo.lock b/Cargo.lock index 84cff591a..650aac41d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4842,7 +4842,7 @@ checksum = "3a82608ee96ce76aeab659e9b8d3c2b787bffd223199af88c674923d861ada10" dependencies = [ "execute-command-macro", "execute-command-tokens", - "generic-array 1.1.1", + "generic-array 1.1.0", ] [[package]] @@ -6084,9 +6084,9 @@ dependencies = [ [[package]] name = "generic-array" -version = "1.1.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cb8bc4c28d15ade99c7e90b219f30da4be5c88e586277e8cbe886beeb868ab2" +checksum = "96512db27971c2c3eece70a1e106fbe6c87760234e31e8f7e5634912fe52794a" dependencies = [ "typenum", ] @@ -9100,6 +9100,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", + "tracing", ] [[package]] @@ -12084,7 +12085,7 @@ dependencies = [ "frame-benchmarking 28.0.0", "frame-support 28.0.0", "frame-system 28.0.0", - "generic-array 1.1.1", + "generic-array 1.1.0", "hex", "log", "num-bigint", diff --git a/Cargo.toml b/Cargo.toml index d3b18cc73..6d0a46895 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -159,7 +159,7 @@ blstrs = "0.7" filecoin-hashers = "13.1.0" filecoin-proofs = "18.1.0" fr32 = "11.1.0" -generic-array = "1.1.0" +generic-array = "=1.1.0" storage-proofs-core = "18.1.0" storage-proofs-porep = "18.1.0" storage-proofs-post = "18.1.0" diff --git a/mater/cli/src/convert.rs b/mater/cli/src/convert.rs index 67c3a64fd..b74cd3b74 100644 --- a/mater/cli/src/convert.rs +++ b/mater/cli/src/convert.rs @@ -9,16 +9,11 @@ use crate::error::Error; pub(crate) async fn convert_file_to_car( input_path: &PathBuf, output_path: &PathBuf, - overwrite: bool, + config: Config, ) -> Result { let source_file = File::open(input_path).await?; - let output_file = if overwrite { - File::create(output_path).await - } else { - File::create_new(output_path).await - }?; - let cid = create_filestore(source_file, output_file, Config::default()).await?; - + let output_file = File::create(output_path).await?; + let cid = create_filestore(source_file, output_file, config).await?; Ok(cid) } @@ -26,17 +21,15 @@ pub(crate) async fn convert_file_to_car( /// MaterError cases are not handled because these are tested in the mater library. 
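// Illustrative sketch (not part of the patch): how a caller might drive the new
// `convert_file_to_car` signature now that the `overwrite` flag has been replaced
// by a `Config`. The 256 KiB chunk size and 174-link tree width mirror the defaults
// used elsewhere in this diff; the helper name is hypothetical.
use std::path::PathBuf;
use mater::Config;

async fn convert_with_defaults(input: PathBuf) -> Result<mater::Cid, Error> {
    // Derive the output path the same way the CLI does: swap the extension for `.car`.
    let output = input.with_extension("car");
    // Balanced layout with UnixFS wrapping enabled (raw_mode = false).
    let config = Config::balanced(256 * 1024, 174, false);
    convert_file_to_car(&input, &output, config).await
}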
#[cfg(test)] mod tests { - use std::str::FromStr; - use anyhow::Result; use mater::Cid; + use mater::Config; + use std::str::FromStr; use tempfile::tempdir; use tokio::{fs::File, io::AsyncWriteExt}; - use crate::{convert::convert_file_to_car, error::Error}; - #[tokio::test] - async fn convert_file_to_car_success() -> Result<()> { + async fn convert_file_to_car_raw_success() -> Result<()> { // Setup: Create a dummy input file let temp_dir = tempdir()?; let input_path = temp_dir.path().join("test_input.txt"); @@ -49,18 +42,14 @@ mod tests { // Define output path let output_path = temp_dir.path().join("test_output.car"); - // Call the function under test - let result = convert_file_to_car(&input_path, &output_path, false).await; + // Configure in raw mode + let config = Config::balanced_raw(256 * 1024, 174); - // Assert the result is Ok + // Call the function under test + let result = super::convert_file_to_car(&input_path, &output_path, config).await; assert!(result.is_ok()); - - // Verify that the CID is as expected assert_eq!(result?, expected_cid); - // Close temporary directory - temp_dir.close()?; - Ok(()) } @@ -69,19 +58,15 @@ mod tests { // Define non-existent input path let temp_dir = tempdir()?; let input_path = temp_dir.path().join("non_existent_input.txt"); - // Define output path let output_path = temp_dir.path().join("test_output.car"); - // Call the function under test - let result = convert_file_to_car(&input_path, &output_path, false).await; + let config = Config::default(); - // Assert the result is an error + // Call the function under test + let result = super::convert_file_to_car(&input_path, &output_path, config).await; assert!(result.is_err()); - assert!(matches!(result, Err(Error::IoError(..)))); - - // Close temporary directory - temp_dir.close()?; + assert!(matches!(result, Err(super::Error::IoError(..)))); Ok(()) } @@ -97,17 +82,13 @@ mod tests { // Create output file let output_path = temp_dir.path().join("output_file"); File::create_new(&output_path).await?; - println!("gets here"); - // Call the function under test - let result = convert_file_to_car(&input_path, &output_path, false).await; + let config = Config::default(); - // Assert the result is an error + // Call the function under test + let result = super::convert_file_to_car(&input_path, &output_path, config).await; assert!(result.is_err()); - assert!(matches!(result, Err(Error::IoError(..)))); - - // Close temporary directory - temp_dir.close()?; + assert!(matches!(result, Err(super::Error::IoError(..)))); Ok(()) } diff --git a/mater/cli/src/main.rs b/mater/cli/src/main.rs index ec00f6068..f507be6b1 100644 --- a/mater/cli/src/main.rs +++ b/mater/cli/src/main.rs @@ -1,9 +1,7 @@ -use std::path::PathBuf; - -use clap::Parser; - use crate::{convert::convert_file_to_car, error::Error, extract::extract_file_from_car}; - +use clap::Parser; +use mater::Config; +use std::path::PathBuf; mod convert; mod error; mod extract; @@ -19,21 +17,32 @@ enum MaterCli { input_path: PathBuf, /// Optional path to output CARv2 file. - /// If no output path is given it will store the `.car` file in the same location. + /// If no output path is given it will store the .car file in the same location. output_path: Option, /// If enabled, only the resulting CID will be printed. #[arg(short, long, action)] quiet: bool, - /// If enabled, the output will overwrite any existing files. + /// If enabled, content will be stored directly without UnixFS wrapping. + /// By default, content is wrapped in UnixFS format for IPFS compatibility. 
#[arg(long, action)] - overwrite: bool, + raw: bool, + + /// Size of each chunk in bytes. Defaults to 256 KiB. + #[arg(long)] + chunk_size: Option, + + /// Maximum number of children per parent node. Defaults to 174. + #[arg(long)] + tree_width: Option, }, + /// Convert a CARv2 file to its original format Extract { /// Path to CARv2 file input_path: PathBuf, + /// Path to output file output_path: Option, }, @@ -46,14 +55,24 @@ async fn main() -> Result<(), Error> { input_path, output_path, quiet, - overwrite, + raw, + chunk_size, + tree_width, } => { let output_path = output_path.unwrap_or_else(|| { let mut new_path = input_path.clone(); new_path.set_extension("car"); new_path }); - let cid = convert_file_to_car(&input_path, &output_path, overwrite).await?; + + // Build config with UnixFS wrapping by default + let config = Config::balanced( + chunk_size.unwrap_or(256 * 1024), + tree_width.unwrap_or(174), + raw, + ); + + let cid = convert_file_to_car(&input_path, &output_path, config).await?; if quiet { println!("{}", cid); @@ -75,14 +94,12 @@ async fn main() -> Result<(), Error> { new_path }); extract_file_from_car(&input_path, &output_path).await?; - println!( - "Successfully converted CARv2 file {} and saved it to to {}", + "Successfully converted CARv2 file {} and saved it to {}", input_path.display(), output_path.display() ); } } - Ok(()) } diff --git a/mater/lib/Cargo.toml b/mater/lib/Cargo.toml index 0191772cf..edf04e20a 100644 --- a/mater/lib/Cargo.toml +++ b/mater/lib/Cargo.toml @@ -29,6 +29,7 @@ thiserror.workspace = true tokio = { workspace = true, features = ["fs", "macros", "rt-multi-thread"] } tokio-stream.workspace = true tokio-util = { workspace = true, features = ["io"] } +tracing = { workspace = true } # Optional dependencies blockstore = { workspace = true, optional = true } diff --git a/mater/lib/src/lib.rs b/mater/lib/src/lib.rs index 739d237c1..a807cdabc 100644 --- a/mater/lib/src/lib.rs +++ b/mater/lib/src/lib.rs @@ -21,7 +21,9 @@ mod v2; // We need to re-expose this because `read_block` returns `(Cid, Vec)`. pub use ipld_core::cid::Cid; pub use multicodec::{DAG_PB_CODE, IDENTITY_CODE, RAW_CODE}; -pub use stores::{create_filestore, Blockstore, Config, FileBlockstore}; +pub use stores::{ + create_filestore, Blockstore, Config, FileBlockstore, DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH, +}; pub use v1::{Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer}; pub use v2::{ verify_cid, Characteristics, Header as CarV2Header, Index, IndexEntry, IndexSorted, @@ -111,6 +113,11 @@ pub enum Error { /// See [`DagPbError`](ipld_dagpb::Error) for more information. #[error(transparent)] DagPbError(#[from] ipld_dagpb::Error), + + /// Error returned when attempting to encode an incorrect node type. + /// For example, when attempting to encode a Leaf node as a Stem node. 
+ #[error("Invalid node type: {0}")] + InvalidNodeType(String), } #[cfg(test)] diff --git a/mater/lib/src/stores/blockstore.rs b/mater/lib/src/stores/blockstore.rs index 3b6bb66f3..406d3d560 100644 --- a/mater/lib/src/stores/blockstore.rs +++ b/mater/lib/src/stores/blockstore.rs @@ -12,10 +12,10 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; -use super::{DEFAULT_BLOCK_SIZE, DEFAULT_TREE_WIDTH}; +use super::{DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH}; use crate::{ multicodec::SHA_256_CODE, unixfs::stream_balanced_tree, CarV1Header, CarV2Header, CarV2Writer, - Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex, + Config, Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex, }; /// The [`Blockstore`] stores pairs of [`Cid`] and [`Bytes`] in memory. @@ -76,7 +76,7 @@ impl Blockstore { root: None, blocks: IndexMap::new(), indexed: HashSet::new(), - chunk_size: chunk_size.unwrap_or(DEFAULT_BLOCK_SIZE), + chunk_size: chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE), tree_width: tree_width.unwrap_or(DEFAULT_TREE_WIDTH), } } @@ -85,10 +85,14 @@ impl Blockstore { /// converting the contents into a CARv2 file. pub async fn read(&mut self, reader: R) -> Result<(), Error> where - R: AsyncRead + Unpin, + R: AsyncRead + Unpin + Send + 'static, { let chunks = ReaderStream::with_capacity(reader, self.chunk_size); - + let config = Config::Balanced { + chunk_size: self.chunk_size, + tree_width: self.tree_width, + raw_mode: true, + }; // The `stream -> pin -> peekable` combo instead of `stream -> peekable -> pin` feels weird // but it has to do with two things: // - The fact that the stream can be self-referential: @@ -98,7 +102,7 @@ impl Blockstore { // https://github.com/tokio-rs/tokio/blob/14c17fc09656a30230177b600bacceb9db33e942/tokio-stream/src/stream_ext/peekable.rs#L26-L37 // - futures::Peekable::peek(self: Pin<&mut Self>) // https://github.com/rust-lang/futures-rs/blob/c507ff833728e2979cf5519fc931ea97308ec876/futures-util/src/stream/stream/peek.rs#L38-L40 - let tree = stream_balanced_tree(chunks, self.tree_width); + let tree = stream_balanced_tree(chunks, self.tree_width, &config); tokio::pin!(tree); let mut tree = tree.peekable(); @@ -206,7 +210,7 @@ impl Default for Blockstore { root: None, blocks: IndexMap::new(), indexed: HashSet::new(), - chunk_size: DEFAULT_BLOCK_SIZE, + chunk_size: DEFAULT_CHUNK_SIZE, tree_width: DEFAULT_TREE_WIDTH, } } diff --git a/mater/lib/src/stores/filestore.rs b/mater/lib/src/stores/filestore.rs index b8e494a21..af2feb909 100644 --- a/mater/lib/src/stores/filestore.rs +++ b/mater/lib/src/stores/filestore.rs @@ -1,33 +1,40 @@ -use bytes::BytesMut; -use futures::stream::StreamExt; -use ipld_core::cid::Cid; -use sha2::{Digest, Sha256}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite}; - use super::Config; use crate::{ multicodec::SHA_256_CODE, unixfs::stream_balanced_tree, CarV1Header, CarV2Header, CarV2Writer, Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex, }; +use bytes::BytesMut; +use futures::StreamExt; +use ipld_core::cid::Cid; +use std::collections::HashMap; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite}; +use tracing::trace; +/// Converts a source stream into a CARv2 file and writes it to an output stream. 
+/// +/// The expanded trait bounds are required because: +/// - `Send + 'static`: The async stream operations require the ability to move the source/output +/// between threads and ensure they live long enough for the entire async operation +/// - `AsyncSeek`: Required for the output to write the final header after processing all blocks +/// - `Unpin`: Required because we need to move the source/output around during async operations async fn balanced_import( mut source: Src, mut output: Out, chunk_size: usize, tree_width: usize, + config: &Config, ) -> Result where - Src: AsyncRead + Unpin, - Out: AsyncWrite + AsyncSeek + Unpin, + Src: AsyncRead + Unpin + Send + 'static, + Out: AsyncWrite + AsyncSeek + Unpin + Send + 'static, { // This custom stream gathers incoming buffers into a single byte chunk of `chunk_size` // `tokio_util::io::ReaderStream` does a very similar thing, however, it does not attempt // to fill it's buffer before returning, voiding the whole promise of properly sized chunks // There is an alternative implementation (untested & uses unsafe) in the following GitHub Gist: // https://gist.github.com/jmg-duarte/f606410a5e0314d7b5cee959a240b2d8 - let chunker = async_stream::try_stream! { + let chunker = Box::pin(async_stream::try_stream! { let mut buf = BytesMut::with_capacity(chunk_size); - loop { if buf.capacity() < chunk_size { // BytesMut::reserve *may* allocate more memory than requested to avoid further @@ -45,82 +52,102 @@ where // this means there is no right way of knowing when the reader is fully exhausted! // If we need to support a case like that, we just need to track how many times // the reader returned 0 and break at a certain point - if source.read_buf(&mut buf).await? == 0 { - // EOF but there's still content to yield -> yield it - if buf.len() > 0 { - let chunk = buf.split(); - yield chunk.freeze(); - } - break - } else if buf.len() >= chunk_size { - // The buffer may have a larger capacity than chunk_size due to reserve - // this also means that our read may have read more bytes than we expected, - // thats why we check if the length if bigger than the chunk_size and if so - // we split the buffer to the chunk_size, then freeze and return + let read_bytes = source.read_buf(&mut buf).await?; + trace!(bytes_read = read_bytes, buffer_size = buf.len(), "Buffer read status"); + + while buf.len() >= chunk_size { let chunk = buf.split_to(chunk_size); yield chunk.freeze(); - } // otherwise, the buffer is not full, so we don't do a thing + } + + if read_bytes == 0 && !buf.is_empty() { + let chunk = buf.split(); + yield chunk.freeze(); + break; + } else if read_bytes == 0 { + break; + } } - }; + }); - let nodes = stream_balanced_tree(chunker, tree_width).peekable(); + // Create the balanced tree stream + let nodes = stream_balanced_tree(chunker, tree_width, config).peekable(); tokio::pin!(nodes); + // Initialize the writer let mut writer = CarV2Writer::new(&mut output); let mut position = 0; - let placeholder_header = CarV2Header::default(); - position += writer.write_header(&placeholder_header).await?; - let car_v1_start = position; + // Write placeholder header to be updated later + position += writer.write_header(&CarV2Header::default()).await?; - let placeholder_header_v1 = CarV1Header::default(); - position += writer.write_v1_header(&placeholder_header_v1).await?; + // Write CARv1 header and track its start position for index offsets + let car_v1_start = position; + position += writer.write_v1_header(&CarV1Header::default()).await?; let mut root = None; let mut 
entries = vec![]; + let mut seen_blocks = HashMap::new(); + + // Process all blocks from the balanced tree while let Some(node) = nodes.next().await { let (node_cid, node_bytes) = node?; - let digest = node_cid.hash().digest().to_owned(); - let entry = IndexEntry::new(digest, (position - car_v1_start) as u64); - entries.push(entry); - position += writer.write_block(&node_cid, &node_bytes).await?; + // Handle deduplication + if let Some(existing_offset) = seen_blocks.get(&node_cid) { + entries.push(IndexEntry::new( + node_cid.hash().digest().to_vec(), + (*existing_offset - car_v1_start) as u64, + )); + } else { + entries.push(IndexEntry::new( + node_cid.hash().digest().to_vec(), + (position - car_v1_start) as u64, + )); + position += writer.write_block(&node_cid, &node_bytes).await?; + seen_blocks.insert(node_cid, position); + } + + // Check if this is the root node if nodes.as_mut().peek().await.is_none() { root = Some(node_cid); } } - let Some(root) = root else { - return Err(Error::EmptyRootsError); + // Create and write index + let index = { + let single_width_index = SingleWidthIndex::try_from(entries)?; + Index::MultihashIndexSorted(MultihashIndexSorted::from_single_width( + SHA_256_CODE, + single_width_index.into(), + )) }; + position += writer.write_index(&index).await?; - let index_offset = position; - let single_width_index = - SingleWidthIndex::new(Sha256::output_size() as u32, entries.len() as u64, entries); - let index = Index::MultihashIndexSorted(MultihashIndexSorted::from_single_width( - SHA_256_CODE, - single_width_index.into(), - )); - writer.write_index(&index).await?; - - // Go back to the beginning of the file - writer.get_inner_mut().rewind().await?; - let header = CarV2Header::new( - false, - (car_v1_start) as u64, - (index_offset - car_v1_start) as u64, - (index_offset) as u64, - ); - writer.write_header(&header).await?; + // Update header with final values + let header = { + let data_size = position - car_v1_start; + let data_offset = CarV2Header::SIZE; + CarV2Header::new( + false, + data_offset, + data_size.try_into().unwrap(), + position.try_into().unwrap(), + ) + }; - // If the length of the roots doesn't match the previous one, you WILL OVERWRITE parts of the file - let header_v1 = CarV1Header::new(vec![root]); - writer.write_v1_header(&header_v1).await?; + // Seek back and write final header + writer + .get_inner_mut() + .seek(std::io::SeekFrom::Start(0)) + .await?; + writer.write_header(&header).await?; - // Flush even if the caller doesn't - we did our best + // Finalize writer writer.finish().await?; - Ok(root) + // Return root CID + Ok(root.expect("the stream yielded at least one element")) } /// Convert a `source` stream into a CARv2 file and write it to an `output` stream. @@ -130,29 +157,30 @@ pub async fn create_filestore( config: Config, ) -> Result where - Src: AsyncRead + Unpin, - Out: AsyncWrite + AsyncSeek + Unpin, + Src: AsyncRead + Unpin + Send + 'static, + Out: AsyncWrite + AsyncSeek + Unpin + Send + 'static, { match config { Config::Balanced { chunk_size, tree_width, - } => balanced_import(source, output, chunk_size, tree_width).await, + .. 
+ } => balanced_import(source, output, chunk_size, tree_width, &config).await, } } #[cfg(test)] mod test { - use std::path::Path; - + use super::*; + use crate::test_utils::assert_buffer_eq; + use crate::unixfs::Data; + use ipld_core::codec::Codec; + use ipld_dagpb::{DagPbCodec, PbNode}; + use quick_protobuf::MessageRead; + use std::{collections::HashSet, path::Path}; use tempfile::tempdir; use tokio::fs::File; - use crate::{ - stores::{filestore::create_filestore, Config}, - test_utils::assert_buffer_eq, - }; - async fn test_filestore_roundtrip(original: P1, expected: P2) where P1: AsRef, @@ -174,7 +202,7 @@ mod test { } #[tokio::test] - async fn test_filestore_lorem() { + async fn test_lorem_roundtrip() { test_filestore_roundtrip( "tests/fixtures/original/lorem.txt", "tests/fixtures/car_v2/lorem.car", @@ -183,11 +211,117 @@ mod test { } #[tokio::test] - async fn test_filestore_spaceglenda() { + async fn test_spaceglenda_roundtrip() { test_filestore_roundtrip( "tests/fixtures/original/spaceglenda.jpg", "tests/fixtures/car_v2/spaceglenda.car", ) .await } + + #[tokio::test] + async fn test_filestore_unixfs_dag_structure() { + use rand::{thread_rng, Rng}; + + let temp_dir = tempdir().unwrap(); + let input_path = temp_dir.path().join("input.bin"); + let temp_path = temp_dir.path().join("temp.car"); + + // Create test file with random data to ensure unique chunks + let mut rng = thread_rng(); + let test_data = (0..512 * 1024).map(|_| rng.gen::()).collect::>(); + + trace!("Creating test file of size: {} bytes", test_data.len()); + tokio::fs::write(&input_path, &test_data).await.unwrap(); + + let source_file = File::open(&input_path).await.unwrap(); + let output_file = File::create(&temp_path).await.unwrap(); + + let config = Config::balanced_unixfs(64 * 1024, 2); + + let root_cid = create_filestore(source_file, output_file, config) + .await + .unwrap(); + trace!("Root CID: {}", root_cid); + + // Read back and verify structure + let file = File::open(&temp_path).await.unwrap(); + let mut reader = crate::CarV2Reader::new(file); + + reader.read_pragma().await.unwrap(); + reader.read_header().await.unwrap(); + reader.read_v1_header().await.unwrap(); + + // Track all unique blocks and statistics + let mut unique_blocks = HashSet::new(); + let mut leaf_blocks = HashSet::new(); + let mut parent_blocks = HashSet::new(); + let mut level_sizes = Vec::new(); + let mut current_level_nodes = HashSet::new(); + let mut current_level = 0; + + while let Ok((cid, data)) = reader.read_block().await { + unique_blocks.insert(cid); + + let pb_node: PbNode = DagPbCodec::decode(&data[..]).unwrap(); + let reader = + &mut quick_protobuf::BytesReader::from_bytes(&pb_node.data.clone().unwrap()); + let bytes = &pb_node.data.unwrap(); + let unixfs_data = Data::from_reader(reader, bytes).unwrap(); + + if pb_node.links.is_empty() { + leaf_blocks.insert(cid); + trace!("Found leaf node: {} (size: {})", cid, data.len()); + trace!( + " Data size: {}", + unixfs_data.Data.as_ref().map_or(0, |d| d.len()) + ); + trace!(" Blocksizes: {:?}", unixfs_data.blocksizes); + + // New level if this is first leaf + if current_level_nodes.is_empty() { + level_sizes.push(0); + current_level = level_sizes.len() - 1; + } + } else { + parent_blocks.insert(cid); + + trace!( + "Found parent node: {} with {} links (size: {})", + cid, + pb_node.links.len(), + data.len() + ); + trace!(" Total filesize: {:?}", unixfs_data.filesize); + trace!(" Blocksizes: {:?}", unixfs_data.blocksizes); + + for link in &pb_node.links { + trace!(" -> Link to: {} (size: 
{:?})", link.cid, link.size); + } + + // Track level changes + if !current_level_nodes.is_empty() + && current_level_nodes + .iter() + .any(|n| pb_node.links.iter().any(|l| l.cid == *n)) + { + level_sizes.push(0); + current_level = level_sizes.len() - 1; + current_level_nodes.clear(); + } + } + + level_sizes[current_level] += 1; + current_level_nodes.insert(cid); + } + + // Verify structure + assert!(!leaf_blocks.is_empty(), "No leaf nodes found"); + assert!(!parent_blocks.is_empty(), "No parent nodes found"); + assert_eq!( + unique_blocks.len(), + leaf_blocks.len() + parent_blocks.len(), + "Block count mismatch" + ); + } } diff --git a/mater/lib/src/stores/mod.rs b/mater/lib/src/stores/mod.rs index 66920d97d..a86cdd452 100644 --- a/mater/lib/src/stores/mod.rs +++ b/mater/lib/src/stores/mod.rs @@ -6,42 +6,122 @@ pub use blockstore::Blockstore; pub use file::FileBlockstore; pub use filestore::create_filestore; -/// The default block size, as defined in -/// [boxo](https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13). -pub(crate) const DEFAULT_BLOCK_SIZE: usize = 1024 * 256; +/// The default chunk size for balanced trees (256 KiB) +/// Reference: https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13 +pub const DEFAULT_CHUNK_SIZE: usize = 256 * 1024; -/// The default tree width, also called links per block, as defined in -/// [boxo](https://github.com/ipfs/boxo/blob/625ba769263c2beeec934836f54bbd6624db945a/ipld/unixfs/importer/helpers/helpers.go#L16-L30). -pub(crate) const DEFAULT_TREE_WIDTH: usize = 174; +/// The default number of children per parent node in balanced trees. +/// This value comes from the go-ipfs implementation and provides a good balance +/// between tree depth and width for most use cases. +pub const DEFAULT_TREE_WIDTH: usize = 174; -/// Store configuration options. +/// Store configuration options for controlling how data is stored and structured. +#[derive(Debug, Clone)] pub enum Config { - /// The store should use the balanced tree layout, - /// generating byte chunks of `chunk_size` and - /// generating parent nodes every `tree_width` nodes. + /// Creates a balanced tree structure by generating fixed-size chunks + /// and arranging them into a tree with a specified width. + /// By default, content is wrapped in UnixFS format for IPFS compatibility. Balanced { - /// The size of the byte chunks. + /// Size of each chunk in bytes. Defaults to 256 KiB. + /// Larger chunks reduce tree depth but increase minimum storage unit size. chunk_size: usize, - /// The number of children per parent node. + + /// Maximum number of children per parent node. Defaults to 174. + /// This affects tree shape and traversal performance. tree_width: usize, + + /// If true, store content directly without UnixFS metadata. + /// More space efficient but loses IPFS compatibility features. + /// Default is false (uses UnixFS wrapping). + raw_mode: bool, }, } impl Config { - /// Create a new [`Config::Balanced`]. - pub fn balanced(chunk_size: usize, tree_width: usize) -> Self { + /// Creates a new balanced tree configuration with the specified parameters. 
+ /// + /// # Arguments + /// * `chunk_size` - Size of each data chunk in bytes + /// * `tree_width` - Maximum number of children per parent node + /// * `raw_mode` - Whether to store content directly without UnixFS wrapping + pub fn balanced(chunk_size: usize, tree_width: usize, raw_mode: bool) -> Self { Self::Balanced { chunk_size, tree_width, + raw_mode, } } + + /// Creates a new balanced tree configuration with UnixFS wrapping (recommended). + pub fn balanced_unixfs(chunk_size: usize, tree_width: usize) -> Self { + Self::balanced(chunk_size, tree_width, false) + } + + /// Creates a new balanced tree configuration with raw storage. + pub fn balanced_raw(chunk_size: usize, tree_width: usize) -> Self { + Self::balanced(chunk_size, tree_width, true) + } } impl Default for Config { fn default() -> Self { Self::Balanced { - chunk_size: DEFAULT_BLOCK_SIZE, + chunk_size: DEFAULT_CHUNK_SIZE, tree_width: DEFAULT_TREE_WIDTH, + raw_mode: false, // Default to UnixFS wrapping for IPFS compatibility + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default_config() { + let config = Config::default(); + match config { + Config::Balanced { + chunk_size, + tree_width, + raw_mode, + } => { + assert_eq!(chunk_size, DEFAULT_CHUNK_SIZE); + assert_eq!(tree_width, DEFAULT_TREE_WIDTH); + assert!(!raw_mode); + } + } + } + + #[test] + fn test_config_builders() { + let chunk_size = 1024; + let tree_width = 10; + + let unixfs = Config::balanced_unixfs(chunk_size, tree_width); + match unixfs { + Config::Balanced { + chunk_size: cs, + tree_width: tw, + raw_mode, + } => { + assert_eq!(cs, chunk_size); + assert_eq!(tw, tree_width); + assert!(!raw_mode); + } + } + + let raw = Config::balanced_raw(chunk_size, tree_width); + match raw { + Config::Balanced { + chunk_size: cs, + tree_width: tw, + raw_mode, + } => { + assert_eq!(cs, chunk_size); + assert_eq!(tw, tree_width); + assert!(raw_mode); + } } } } diff --git a/mater/lib/src/unixfs/mod.rs b/mater/lib/src/unixfs/mod.rs index 1a84cfa66..73c0dd658 100644 --- a/mater/lib/src/unixfs/mod.rs +++ b/mater/lib/src/unixfs/mod.rs @@ -1,39 +1,30 @@ -//! The original implementation of this module is located at +//! UnixFS implementation based on //! . 
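// Sketch of how the two builders above differ in practice (an inference from this
// diff, not an authoritative API note): `balanced_unixfs` keeps `raw_mode = false`,
// so leaves are emitted as DAG-PB nodes carrying UnixFS `File` metadata, while
// `balanced_raw` sets `raw_mode = true` and emits the chunk bytes as RAW blocks.
use mater::{Config, DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH};

fn config_for(ipfs_compatible: bool) -> Config {
    if ipfs_compatible {
        // Equivalent to Config::default(): UnixFS wrapping for IPFS compatibility.
        Config::balanced_unixfs(DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH)
    } else {
        // Smaller output; leaves carry no UnixFS metadata.
        Config::balanced_raw(DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH)
    }
}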
mod unixfs_pb; +pub use unixfs_pb::{mod_Data, Data}; -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; +use crate::{ + multicodec::{generate_multihash, DAG_PB_CODE, RAW_CODE}, + Config, Error, +}; use async_stream::try_stream; use bytes::Bytes; use futures::TryStreamExt; +use futures::{Stream, StreamExt}; use ipld_core::{cid::Cid, codec::Codec}; use ipld_dagpb::{DagPbCodec, PbLink, PbNode}; use quick_protobuf::MessageWrite; use sha2::Sha256; -use tokio_stream::{Stream, StreamExt}; - -use crate::{ - multicodec::{generate_multihash, DAG_PB_CODE, RAW_CODE}, - Error, -}; #[derive(Debug, Clone, Copy)] -pub(crate) struct LinkInfo { +struct LinkInfo { raw_data_length: u64, encoded_data_length: u64, } -impl LinkInfo { - fn new(raw_data_length: u64, encoded_data_length: u64) -> Self { - Self { - raw_data_length, - encoded_data_length, - } - } -} - #[derive(Debug)] enum TreeNode { Leaf(Bytes), @@ -41,559 +32,234 @@ enum TreeNode { } impl TreeNode { - fn encode(self) -> Result<((Cid, Bytes), LinkInfo), Error> { + fn encode_unixfs_leaf_node(chunk: &Bytes) -> Result<((Cid, Bytes), LinkInfo), Error> { + let chunk_len = chunk.len() as u64; + + // Build UnixFS metadata + let unixfs_data = Data { + Type: mod_Data::DataType::File, + filesize: Some(chunk_len), + blocksizes: vec![chunk_len], + Data: Some(chunk.to_vec().into()), + hashType: None, + fanout: None, + }; + + // Encode UnixFS data and create DAG-PB node + let mut data_buf = Vec::new(); + { + let mut w = quick_protobuf::Writer::new(&mut data_buf); + unixfs_data.write_message(&mut w)?; + } + + let pb_node = PbNode { + links: vec![], + data: Some(data_buf.clone().into()), + }; + + let encoded = DagPbCodec::encode_to_vec(&pb_node)?; + let mh = generate_multihash::(&encoded); + let cid = Cid::new_v1(DAG_PB_CODE, mh); + + let info = LinkInfo { + raw_data_length: chunk_len, + encoded_data_length: encoded.len() as u64, + }; + + Ok(((cid, encoded.into()), info)) + } + + fn encode_unixfs_stem_node( + children: Vec<(Cid, LinkInfo)>, + ) -> Result<((Cid, Bytes), LinkInfo), Error> { + // Process all children in a single pass, gathering totals and building links and blocksizes + let (total_raw_size, total_encoded_size, pb_links, blocksizes) = children.iter().fold( + ( + 0u64, + 0u64, + Vec::with_capacity(children.len()), + Vec::with_capacity(children.len()), + ), + |(raw_sum, encoded_sum, mut links, mut sizes), (child_cid, link_info)| { + sizes.push(link_info.raw_data_length); + links.push(PbLink { + cid: *child_cid, + name: Some("".to_string()), + size: Some(link_info.encoded_data_length), + }); + ( + raw_sum + link_info.raw_data_length, + encoded_sum + link_info.encoded_data_length, + links, + sizes, + ) + }, + ); + + // Create UnixFS metadata + let unixfs_data = Data { + Type: mod_Data::DataType::File, + filesize: Some(total_raw_size), + blocksizes, + Data: None, + hashType: None, + fanout: None, + }; + + // Encode UnixFS data + let mut data_buf = Vec::new(); + { + let mut w = quick_protobuf::Writer::new(&mut data_buf); + unixfs_data.write_message(&mut w)?; + } + + // Create DAG-PB node + let pb_node = PbNode { + links: pb_links, + data: Some(data_buf.clone().into()), + }; + + let encoded = DagPbCodec::encode_to_vec(&pb_node)?; + let mh = generate_multihash::(&encoded); + let cid = Cid::new_v1(DAG_PB_CODE, mh); + + let info = LinkInfo { + raw_data_length: data_buf.len() as u64, + encoded_data_length: encoded.len() as u64 + total_encoded_size, + }; + + Ok(((cid, encoded.into()), info)) + } + + fn encode_raw(&self) -> Result<((Cid, 
Bytes), LinkInfo), Error> { match self { - TreeNode::Leaf(bytes) => { - let data_length = bytes.len() as u64; - let multihash = generate_multihash::(&bytes); - // Storing the block as RAW as go-car does - // https://github.com/ipfs/go-unixfsnode/blob/c41f115d06cff90e0cbc634da5073b4c1447af09/data/builder/file.go#L54-L63 - let cid = Cid::new_v1(RAW_CODE, multihash); - let block = (cid, bytes); - // The data is raw, so the raw length == encoded length - let link_info = LinkInfo::new(data_length, data_length); - Ok((block, link_info)) - } - TreeNode::Stem(links) => { - let mut encoded_length: u64 = - links.iter().map(|(_, l)| l.encoded_data_length).sum(); - let blocksizes: Vec<_> = links.iter().map(|(_, l)| l.raw_data_length).collect(); - let filesize: u64 = blocksizes.iter().sum(); - let pb_links: Vec<_> = links - .into_iter() - .map(|(cid, link)| PbLink { - cid, - // Having an empty name makes it compliant with go-car - name: Some("".to_string()), - size: Some(link.encoded_data_length), - }) - .collect(); - - let pb_node_data = unixfs_pb::Data { - Type: unixfs_pb::mod_Data::DataType::File, - filesize: Some(filesize), - blocksizes, - ..Default::default() + TreeNode::Leaf(chunk) => { + let mh = generate_multihash::(&chunk); + let cid = Cid::new_v1(RAW_CODE, mh); + let info = LinkInfo { + raw_data_length: chunk.len() as u64, + encoded_data_length: chunk.len() as u64, }; - let mut pb_node_data_bytes = vec![]; - let mut pb_node_data_writer = quick_protobuf::Writer::new(&mut pb_node_data_bytes); - pb_node_data.write_message(&mut pb_node_data_writer)?; - let pb_node_data_length = pb_node_data_bytes.len() as u64; + Ok(((cid, chunk.clone()), info)) + } + TreeNode::Stem(children) => { + let (total_raw_size, total_encoded_size, pb_links) = children.iter().fold( + (0u64, 0u64, Vec::new()), + |(raw_sum, encoded_sum, mut links), (child_cid, link_info)| { + links.push(PbLink { + cid: *child_cid, + name: None, + size: Some(link_info.encoded_data_length), + }); + ( + raw_sum + link_info.raw_data_length, + encoded_sum + link_info.encoded_data_length, + links, + ) + }, + ); let pb_node = PbNode { links: pb_links, - data: Some(pb_node_data_bytes.into()), + data: None, }; - let outer = DagPbCodec::encode_to_vec(&pb_node)?; - let cid = Cid::new_v1(DAG_PB_CODE, generate_multihash::(&outer)); - encoded_length += outer.len() as u64; - - Ok(( - // NOTE(@jmg-duarte,28/05/2024): In the original implementation - // they have a `Block` structure that contains the child links, - // we're not currently using them and as such I didn't include them - (cid, outer.into()), - LinkInfo { - raw_data_length: pb_node_data_length, - encoded_data_length: encoded_length, - }, - )) + let encoded = DagPbCodec::encode_to_vec(&pb_node)?; + let mh = generate_multihash::(&encoded); + let cid = Cid::new_v1(DAG_PB_CODE, mh); + // NOTE(@jmg-duarte,28/05/2024): In the original implementation + // they have a `Block` structure that contains the child links, + // we're not currently using them and as such I didn't include them + let info = LinkInfo { + raw_data_length: total_raw_size, + encoded_data_length: total_encoded_size + encoded.len() as u64, + }; + + Ok(((cid, encoded.into()), info)) } } } } -/// This function takes a stream of chunks of bytes and returns a stream of [`Block`]s. -/// -/// It works by accumulating `width` blocks and lazily creating stems. -/// The tree grows upwards and does not keep previously completed `width` blocks. -/// -/// As a demonstration, consider a `width` of 2 and an `input` stream that will yield 7 blocks. 
-/// ```text -/// Input stream <- Block 1, Block 2, Block 3, Block 4, Block 5, Block 6, Block 7 -/// ``` -/// -/// Each time a block is taken out of the stream, it is stored in the lower level of the tree, -/// but it is also yielded as output: -/// ```text -/// Input stream <- Block 2, Block 3, Block 4, Block 5, Block 6, Block 7 -/// Tree: [ -/// [Block 1] -/// ] -/// Output stream -> Block 1 -/// ``` -/// -/// Once the first `width` blocks (in this case, 2) are taken from the stream: -/// * A new stem is added, linking back to the two blocks -/// ```text -/// Input stream <- | Block 3 | Block 4 | Block 5 | Block 6 | Block 7 | -/// Tree: [ -/// [Block 1, Block 2], -/// [Stem (B1, B2)] -/// ] -/// ``` -/// * The previous level to the stem is evicted -/// ```text -/// Input stream <- | Block 3 | Block 4 | Block 5 | Block 6 | Block 7 | -/// Tree: [ -/// [], -/// [Stem 1 (B1, B2)] -/// ] -/// ``` -/// * The new stem is yielded -/// ```text -/// Input stream <- Block 3, Block 4, Block 5, Block 6, Block 7 -/// Tree: [ -/// [], -/// [Stem 1 (B1, B2)] -/// ] -/// Output stream -> Stem (B1, B2) -/// ``` -/// -/// This process happens recursively, so when the stem level is full, like so: -/// ```text -/// Input stream <- Block 5, Block 6, Block 7 -/// Tree: [ -/// [], -/// [Stem 1 (B1, B2), Stem 2 (B3, B4)] -/// ] -/// ``` -/// -/// A new stem is built upwards: -/// ```text -/// Input stream <- Block 5, Block 6, Block 7 -/// Tree: [ -/// [], -/// [], -/// [Stem 3 (S1, S2)] -/// ] -/// Output stream -> Stem 3 (S1, S2) -/// ``` -/// -/// Once the stream is exhausted, we need to clean up any remaining state: -/// ```text -/// Input stream <- -/// Tree: [ -/// [Block 7], -/// [Stem 4 (B5, B6)], -/// [Stem 3 (S1, S2)], -/// ] -/// ``` -/// -/// In this case, the yielded tree looks like: -/// ```text -/// S3 -/// / \ -/// S1 S2 S4 -/// / \ / \ / \ -/// B1 B2 B3 B4 B5 B6 B7 -/// ``` -/// -/// We work bottom-up, removing the levels one by one, creating new stems from them and returning the stems: -/// ```text -/// Tree: [ -/// [], # popped -/// [Stem 4 (B5, B6), Stem 5 (B7)], -/// [Stem 3 (S1, S2)] -/// ] -/// Output stream -> Stem 5 (B7) -/// ``` -/// -/// The previous tree now looks like: -/// ```text -/// S3 -/// / \ -/// S1 S2 S4 S5 -/// / \ / \ / \ | -/// B1 B2 B3 B4 B5 B6 B7 -/// ``` -/// -/// If we repeat the process again: -/// ```text -/// Tree: [ -/// [Stem 4 (B5, B6), Stem 5 (B7)], # popped -/// [Stem 3 (S1, S2), Stem 6 (S4, S5)] -/// ] -/// Output stream -> Stem 6 (S4, S5) -/// ``` -/// -/// The tree becomes: -/// ```text -/// S3 S6 -/// / \ / \ -/// S1 S2 S4 S5 -/// / \ / \ / \ | -/// B1 B2 B3 B4 B5 B6 B7 -/// ``` -/// -/// And finally, we build the last stem, yielding it: -/// ```text -/// Tree: [ -/// [Stem 3 (S1, S2), Stem 6 (S4, S5)] # popped -/// ] -/// Output stream -> Stem 7 (S3, S6) -/// ``` -/// -/// Making the final tree: -/// ```text -/// S7 -/// / \ -/// S3 S6 -/// / \ / \ -/// S1 S2 S4 S5 -/// / \ / \ / \ | -/// B1 B2 B3 B4 B5 B6 B7 -/// ``` -/// -/// The original implementation is in -/// . -pub(crate) fn stream_balanced_tree( +pub(crate) fn stream_balanced_tree<'a, I>( input: I, width: usize, -) -> impl Stream> + config: &'a Config, +) -> impl Stream> + 'a where - I: Stream>, + I: Stream> + Send + 'static, { try_stream! 
{ - let mut tree: VecDeque> = VecDeque::new(); - tree.push_back(vec![]); - - let input = input - .err_into::() - // The TreeNode::Leaf(data).encode() just wraps it with a Cid marking the payload as Raw - // we may be able move this responsibility to the caller for more efficient memory usage - .map(|data| data.and_then(|data| TreeNode::Leaf(data).encode())) - .err_into::(); + let mut levels: VecDeque> = VecDeque::new(); + levels.push_back(vec![]); + + let mut seen_blocks = HashMap::new(); + let input = input.map_err(Error::from); tokio::pin!(input); while let Some(data) = input.next().await { - let (block @ (cid, _), link_info) = data?; - let tree_height = tree.len(); - - // Check if the leaf node is full - // i.e. we can build a new stem - if tree[0].len() == width { - // Go up the tree, as adding a new stem - // may complete another level and so on - for level in 0..tree_height { - // If a node is not full, stop there - // no more "stem-ing" to be done - if tree[level].len() < width { - break; - } - - // If we're at the top of the tree, we're going to need another level. - if level == tree_height - 1 { - tree.push_back(Vec::with_capacity(width)); - } - - // Replace the previous level elements with a new empty vector - // while `tree[level].drain().collect>` is much more readable - // it's most likely less performant (I didn't measure) - // due to the different nature of the approaches (batch vs iterator) - let links = std::mem::replace(&mut tree[level], Vec::with_capacity(width)); - let (block @ (cid, _), link_info) = TreeNode::Stem(links).encode()?; - yield block; - - tree[level + 1].push((cid, link_info)); - } - // Once we're done "trimming" the tree - // it's good to receive new elements - } - - // If the tree level is empty, we can push, - // if the tree level was not empty, the `for` took care of it - tree[0].push((cid, link_info)); - yield block; - } - - // If `input` yielded a single block, - // the tree has height 1 and the lower level has a single element - if tree.len() == 1 && tree[0].len() == 1 { - return; - } - - // Once `input` is exhausted, we need to perform cleanup of any leftovers, - // to do so, we start by popping levels from the front and building stems over them. - while let Some(links) = tree.pop_front() { - let (block @ (cid, _), link_info) = TreeNode::Stem(links).encode()?; - yield block; - - // If there's still a level in the front, it means the stem we just built will have a parent - // we push the stem into the front level so we can build the parent on the next iteration - if let Some(front) = tree.front_mut() { - front.push((cid, link_info)); + let chunk = data?; + let node = TreeNode::Leaf(chunk); + + let ((leaf_cid, leaf_bytes), leaf_info) = match config { + Config::Balanced { raw_mode, .. } if *raw_mode => node.encode_raw()?, + _ => match node { + TreeNode::Leaf(ref chunk) => TreeNode::encode_unixfs_leaf_node(chunk)?, + TreeNode::Stem(ref children) => TreeNode::encode_unixfs_stem_node(children.clone())?, + }, + }; + + if !seen_blocks.contains_key(&leaf_cid) { + seen_blocks.insert(leaf_cid, leaf_info); + yield (leaf_cid, Bytes::from(leaf_bytes.to_vec())); } - // Once there's nothing else in the front, that means we just yielded the root - // and the current `while` will stop in the next iteration - } - } -} - -#[cfg(test)] -mod tests { - //! Tests were taken from [beetle][beetle] too, I did modify them to suit our needs. - //! In certain places, I made them check for byte equality as its way simpler - //! 
and there's enough tests around the repo to ensure that if the underlying - //! bytes are equal, the expected block sizes are as well. - //! - //! We also didn't write our own chunker, relying on [`tokio_util::io::ReadStream`] instead. - //! - //! [beetle]: https://github.com/n0-computer/beetle/blob/3e137cb2bc18e1d458c3f72d5e817b03d9537d5d/iroh-unixfs/src/balanced_tree.rs#L234-L507 - - use bytes::BytesMut; - use futures::StreamExt; - use super::*; + levels[0].push((leaf_cid, leaf_info)); - fn test_chunk_stream(num_chunks: usize) -> impl Stream> { - futures::stream::iter((0..num_chunks).map(|n| Ok(n.to_be_bytes().to_vec().into()))) - } - - async fn build_expect_tree(num_chunks: usize, degree: usize) -> Vec> { - let chunks = test_chunk_stream(num_chunks); - tokio::pin!(chunks); - let mut tree = vec![vec![]]; - let mut links = vec![vec![]]; - - if num_chunks / degree == 0 { - let chunk = chunks.next().await.unwrap().unwrap(); - let leaf = TreeNode::Leaf(chunk); - let (block, _) = leaf.encode().unwrap(); - tree[0].push(block); - return tree; - } - - while let Some(chunk) = chunks.next().await { - let chunk = chunk.unwrap(); - let leaf = TreeNode::Leaf(chunk); - let (block @ (cid, _), link_info) = leaf.encode().unwrap(); - links[0].push((cid, link_info)); - tree[0].push(block); - } - - while tree.last().unwrap().len() > 1 { - let prev_layer = links.last().unwrap(); - let count = prev_layer.len() / degree; - let mut tree_layer = Vec::with_capacity(count); - let mut links_layer = Vec::with_capacity(count); - for links in prev_layer.chunks(degree) { - let stem = TreeNode::Stem(links.to_vec()); - let (block @ (cid, _), link_info) = stem.encode().unwrap(); - links_layer.push((cid, link_info)); - tree_layer.push(block); - } - tree.push(tree_layer); - links.push(links_layer); - } - tree - } - - async fn build_expect_vec_from_tree( - tree: Vec>, - num_chunks: usize, - degree: usize, - ) -> Vec<(Cid, Bytes)> { - let mut out = vec![]; + for level in 0..levels.len() { + if levels[level].len() < width { + break; + } - if num_chunks == 1 { - out.push(tree[0][0].clone()); - return out; - } + let children = std::mem::replace(&mut levels[level], Vec::with_capacity(width)); + let stem = TreeNode::Stem(children.clone()); - let mut counts = vec![0; tree.len()]; + let ((cid, data), info) = match config { + Config::Balanced { raw_mode, .. 
} if *raw_mode => stem.encode_raw()?, + _ => TreeNode::encode_unixfs_stem_node(children)?, + }; - for leaf in tree[0].iter() { - out.push(leaf.clone()); - counts[0] += 1; - let mut push = counts[0] % degree == 0; - for (num_layer, count) in counts.iter_mut().enumerate() { - if num_layer == 0 { - continue; + if !seen_blocks.contains_key(&cid) { + seen_blocks.insert(cid, info); + yield (cid, Bytes::from(data.to_vec())); } - if !push { - break; - } - out.push(tree[num_layer][*count].clone()); - *count += 1; - if *count % degree != 0 { - push = false; + + if level + 1 == levels.len() { + levels.push_back(vec![]); } + levels[level + 1].push((cid, info)); } } - for (num_layer, count) in counts.into_iter().enumerate() { - if num_layer == 0 { + while let Some(leftover) = levels.pop_front() { + if leftover.is_empty() { continue; } - let layer = tree[num_layer].clone(); - for node in layer.into_iter().skip(count) { - out.push(node); - } - } - - out - } - - async fn build_expect(num_chunks: usize, degree: usize) -> Vec<(Cid, Bytes)> { - let tree = build_expect_tree(num_chunks, degree).await; - println!("{tree:?}"); - build_expect_vec_from_tree(tree, num_chunks, degree).await - } - - fn make_leaf(data: usize) -> ((Cid, Bytes), LinkInfo) { - TreeNode::Leaf(BytesMut::from(&data.to_be_bytes()[..]).freeze()) - .encode() - .unwrap() - } - fn make_stem(links: Vec<(Cid, LinkInfo)>) -> ((Cid, Bytes), LinkInfo) { - TreeNode::Stem(links).encode().unwrap() - } + let stem = TreeNode::Stem(leftover.clone()); + let ((cid, data), info) = match config { + Config::Balanced { raw_mode, .. } if *raw_mode => stem.encode_raw()?, + _ => TreeNode::encode_unixfs_stem_node(leftover)?, + }; - #[tokio::test] - async fn test_build_expect() { - // manually build tree made of 7 chunks (11 total nodes) - let (leaf_0, len_0) = make_leaf(0); - let (leaf_1, len_1) = make_leaf(1); - let (leaf_2, len_2) = make_leaf(2); - let (stem_0, stem_len_0) = make_stem(vec![ - (leaf_0.0, len_0), - (leaf_1.0, len_1), - (leaf_2.0, len_2), - ]); - let (leaf_3, len_3) = make_leaf(3); - let (leaf_4, len_4) = make_leaf(4); - let (leaf_5, len_5) = make_leaf(5); - let (stem_1, stem_len_1) = make_stem(vec![ - (leaf_3.0, len_3), - (leaf_4.0, len_4), - (leaf_5.0, len_5), - ]); - let (leaf_6, len_6) = make_leaf(6); - let (stem_2, stem_len_2) = make_stem(vec![(leaf_6.0, len_6)]); - let (root, _root_len) = make_stem(vec![ - (stem_0.0, stem_len_0), - (stem_1.0, stem_len_1), - (stem_2.0, stem_len_2), - ]); - - let expect_tree = vec![ - vec![ - leaf_0.clone(), - leaf_1.clone(), - leaf_2.clone(), - leaf_3.clone(), - leaf_4.clone(), - leaf_5.clone(), - leaf_6.clone(), - ], - vec![stem_0.clone(), stem_1.clone(), stem_2.clone()], - vec![root.clone()], - ]; - let got_tree = build_expect_tree(7, 3).await; - assert_eq!(expect_tree, got_tree); - - let expect_vec = vec![ - leaf_0, leaf_1, leaf_2, stem_0, leaf_3, leaf_4, leaf_5, stem_1, leaf_6, stem_2, root, - ]; - let got_vec = build_expect_vec_from_tree(got_tree, 7, 3).await; - assert_eq!(expect_vec, got_vec); - } + if !seen_blocks.contains_key(&cid) { + seen_blocks.insert(cid, info); + yield (cid, Bytes::from(data.to_vec())); + } - async fn ensure_equal( - expect: Vec<(Cid, Bytes)>, - got: impl Stream>, - ) { - let mut i = 0; - tokio::pin!(got); - while let Some(node) = got.next().await { - let (expect_cid, expect_bytes) = expect - .get(i) - .expect("too many nodes in balanced tree stream") - .clone(); - let (got_cid, got_bytes) = node.expect("unexpected error in balanced tree stream"); - println!("node index {i}"); - 
assert_eq!(expect_cid, got_cid); - assert_eq!(expect_bytes, got_bytes); - i += 1; - } - if expect.len() != i { - panic!( - "expected at {} nodes of the stream, got {}", - expect.len(), - i - ); + if let Some(up) = levels.front_mut() { + up.push((cid, info)); + } } } - - #[tokio::test] - async fn balanced_tree_test_leaf() { - let num_chunks = 1; - let expect = build_expect(num_chunks, 3).await; - let got = stream_balanced_tree(test_chunk_stream(1), 3); - tokio::pin!(got); - ensure_equal(expect, got).await; - } - - #[tokio::test] - async fn balanced_tree_test_height_one() { - let num_chunks = 3; - let degrees = 3; - let expect = build_expect(num_chunks, degrees).await; - let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); - tokio::pin!(got); - ensure_equal(expect, got).await; - } - - #[tokio::test] - async fn balanced_tree_test_height_two_full() { - let degrees = 3; - let num_chunks = 9; - let expect = build_expect(num_chunks, degrees).await; - let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); - tokio::pin!(got); - ensure_equal(expect, got).await; - } - - #[tokio::test] - async fn balanced_tree_test_height_two_not_full() { - let degrees = 3; - let num_chunks = 10; - let expect = build_expect(num_chunks, degrees).await; - let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); - tokio::pin!(got); - ensure_equal(expect, got).await; - } - - #[tokio::test] - async fn balanced_tree_test_height_three() { - let num_chunks = 125; - let degrees = 5; - let expect = build_expect(num_chunks, degrees).await; - let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); - tokio::pin!(got); - ensure_equal(expect, got).await; - } - - #[tokio::test] - async fn balanced_tree_test_large() { - let num_chunks = 780; - let degrees = 11; - let expect = build_expect(num_chunks, degrees).await; - let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); - tokio::pin!(got); - ensure_equal(expect, got).await; - } - - #[tokio::test] - async fn balanced_tree_test_lar() { - let num_chunks = 7; - let degrees = 2; - let expect = build_expect(num_chunks, degrees).await; - let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees); - tokio::pin!(got); - ensure_equal(expect, got).await; - } } diff --git a/mater/lib/src/v2/reader.rs b/mater/lib/src/v2/reader.rs index 4507c15c9..85d3dafe0 100644 --- a/mater/lib/src/v2/reader.rs +++ b/mater/lib/src/v2/reader.rs @@ -1,13 +1,17 @@ use ipld_core::cid::Cid; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt, BufReader}; +use std::collections::HashSet; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt}; use super::index::read_index; +use crate::multicodec::DAG_PB_CODE; use crate::{ v1::BlockMetadata, v2::{index::Index, Characteristics, Header, PRAGMA}, Error, }; - +use ipld_core::codec::Codec; +use ipld_dagpb::DagPbCodec; +use ipld_dagpb::PbNode; /// Low-level CARv2 reader. 
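// Standalone sketch of the decode step these new reader imports enable: take a block
// payload read from the CAR data section, decode it as DAG-PB, and collect the child
// CIDs to visit next. Grounded in the `DagPbCodec::decode` / `PbNode` usage later in
// this diff; the helper name is illustrative.
use ipld_core::{cid::Cid, codec::Codec};
use ipld_dagpb::{DagPbCodec, PbNode};

fn child_cids(block: &[u8]) -> Result<Vec<Cid>, ipld_dagpb::Error> {
    let node: PbNode = DagPbCodec::decode(block)?;
    Ok(node.links.iter().map(|link| link.cid).collect())
}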
pub struct Reader { reader: R, @@ -22,7 +26,7 @@ impl Reader { impl Reader where - R: AsyncRead + Unpin, + R: AsyncRead + Unpin + AsyncSeek, { /// Takes in a CID and checks that the contents in the reader matches this CID pub async fn verify_cid(&mut self, contents_cid: Cid) -> Result<(), Error> { @@ -63,21 +67,48 @@ where { self.read_pragma().await?; let header = self.read_header().await?; - let _v1_header = self.read_v1_header().await?; + let v1_header = self.read_v1_header().await?; let mut written = 0; - while let Ok((_cid, contents)) = self.read_block().await { - // CAR file contents is empty - if contents.len() == 0 { - break; - } + // Keep track of root CID and position + let root_cid = v1_header.roots.first().ok_or(Error::EmptyRootsError)?; + let data_end = header.data_offset + header.data_size; + + // Track what we've processed and need to process + let mut processed: HashSet = HashSet::new(); + let mut to_process = vec![*root_cid]; + + while !to_process.is_empty() { let position = self.get_inner_mut().stream_position().await?; - let data_end = header.data_offset + header.data_size; - // Add the `written != 0` clause for files that are less than a single block. if position >= data_end && written != 0 { break; } - written += output_file.write(&contents).await?; + + if let Ok((cid, contents)) = self.read_block().await { + if contents.len() == 0 { + break; + } + + // Write the block data + written += output_file.write(&contents).await?; + + // If it's a DAG-PB node, queue up its children + if cid.codec() == DAG_PB_CODE && !processed.contains(&cid) { + let reader = std::io::BufReader::new(&contents[..]); + if let Ok(node) = DagPbCodec::decode(reader) { + let pb_node: PbNode = node; + to_process.extend( + pb_node + .links + .iter() + .map(|link| link.cid) + .filter(|cid| !processed.contains(cid)), + ); + } + } + + processed.insert(cid); + } } Ok(()) @@ -164,9 +195,11 @@ where } /// Function verifies that a given CID matches the CID for the CAR file in the given reader -pub async fn verify_cid(reader: R, contents_cid: Cid) -> Result<(), Error> { - let mut reader = Reader::new(BufReader::new(reader)); - +pub async fn verify_cid(reader: R, contents_cid: Cid) -> Result<(), Error> +where + R: AsyncRead + AsyncSeek + Unpin, +{ + let mut reader = Reader::new(reader); reader.verify_cid(contents_cid).await } @@ -175,11 +208,13 @@ mod tests { use std::{io::Cursor, path::PathBuf, str::FromStr}; use ipld_core::cid::Cid; + use ipld_core::codec::Codec; + use ipld_dagpb::{DagPbCodec, PbNode}; use sha2::Sha256; use tokio::{fs::File, io::AsyncSeekExt}; use crate::{ - multicodec::{generate_multihash, RAW_CODE, SHA_256_CODE}, + multicodec::{generate_multihash, DAG_PB_CODE, RAW_CODE, SHA_256_CODE}, v2::{index::Index, reader::Reader}, verify_cid, Error, }; @@ -438,4 +473,48 @@ mod tests { assert_eq!(fst[0].entries.len(), 4); } } + + #[tokio::test] + async fn test_dag_pb_links() { + let file = File::open("tests/fixtures/car_v2/spaceglenda.car") + .await + .unwrap(); + let mut reader = Reader::new(file); + + reader.read_pragma().await.unwrap(); + reader.read_header().await.unwrap(); + + let mut found_dag_pb = false; + let mut total_links = 0; + + while let Ok((cid, data)) = reader.read_block().await { + if cid.codec() == DAG_PB_CODE { + found_dag_pb = true; + let reader = std::io::BufReader::new(&data[..]); + + match DagPbCodec::decode(reader) { + Ok(node) => { + let pb_node: PbNode = node; + if !pb_node.links.is_empty() { + total_links += pb_node.links.len(); + } + + // Verify each link + for link in 
pb_node.links { + assert!( + !link.cid.to_string().is_empty(), + "Link should have valid CID" + ); + } + } + Err(err) => { + panic!("Failed to decode DAG-PB node: {}", err); + } + } + } + } + + assert!(found_dag_pb, "No DAG-PB nodes found in test file"); + assert!(total_links > 0, "No links found in DAG-PB nodes"); + } } diff --git a/mater/lib/src/v2/writer.rs b/mater/lib/src/v2/writer.rs index e27c5da44..5cda3ca24 100644 --- a/mater/lib/src/v2/writer.rs +++ b/mater/lib/src/v2/writer.rs @@ -100,6 +100,7 @@ where mod tests { use std::{collections::BTreeMap, io::Cursor}; + use crate::Config; use ipld_core::cid::Cid; use sha2::Sha256; use tokio::{ @@ -245,7 +246,8 @@ mod tests { .unwrap(); // https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13 let file_chunker = ReaderStream::with_capacity(file, 1024 * 256); - let nodes = stream_balanced_tree(file_chunker, 11) + let config = Config::balanced(1024 * 256, 11, false); + let nodes = stream_balanced_tree(file_chunker, 11, &config) .collect::, _>>() .await .unwrap(); diff --git a/storage-provider/server/src/storage.rs b/storage-provider/server/src/storage.rs index 40663edfb..ed9198ff9 100644 --- a/storage-provider/server/src/storage.rs +++ b/storage-provider/server/src/storage.rs @@ -10,7 +10,9 @@ use axum::{ }; use futures::{TryFutureExt, TryStreamExt}; use mater::Cid; -use polka_storage_provider_common::commp::{commp, CommPError}; +use mater::create_filestore; +use polka_storage_provider_common::commp::commp; +use polka_storage_provider_common::commp::{calculate_piece_commitment, CommPError}; use primitives::{commitment::piece::PaddedPieceSize, proofs::RegisteredPoStProof}; use tokio::{ fs::{self, File}, @@ -23,6 +25,8 @@ use tokio_util::{ use tower_http::trace::TraceLayer; use uuid::Uuid; +use mater::{DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH}; + #[cfg(feature = "delia")] mod delia_imports { pub use axum::{http::Method, response::Json, routing::post}; @@ -115,8 +119,13 @@ fn configure_router(state: Arc) -> Router { #[cfg(not(feature = "delia"))] fn config_non_delia(state: Arc) -> Router { + // Type annotation required to satisfy Send bounds needed for UnixFS processing + // across async operations and thread boundaries Router::new() - .route("/upload/:cid", put(upload)) + .route( + "/upload/:cid", + put(upload as fn(State>, Path, Request) -> _), + ) .route("/download/:cid", get(download)) .with_state(state) .layer( @@ -160,9 +169,9 @@ fn configure_router(state: Arc) -> Router { /// ``` #[tracing::instrument(skip_all, fields(cid))] async fn upload( - ref s @ State(ref state): State>, + State(state): State>, Path(cid): Path, - request: Request, + request: Request, ) -> Result { let deal_cid = cid::Cid::from_str(&cid).map_err(|err| { tracing::error!(cid, "failed to parse cid"); @@ -193,20 +202,25 @@ async fn upload( // Branching needed here since the resulting `StreamReader`s don't have the same type let file_cid = if request.headers().contains_key("Content-Type") { - // Handle multipart forms - let mut multipart = Multipart::from_request(request, &s) + // Handle the multipart data + let mut multipart = Multipart::from_request(request, &state) .await .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?; - let Some(field) = multipart + + // Get the field data + let field_bytes = multipart .next_field() - .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string())) - .await? 
- else { - return Err((StatusCode::BAD_REQUEST, "empty request".to_string())); - }; + .await + .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))? + .ok_or_else(|| (StatusCode::BAD_REQUEST, "empty request".to_string()))? + .bytes() + .await + .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?; + + // Create reader from the field data + let reader = std::io::Cursor::new(field_bytes); - let field_reader = StreamReader::new(field.map_err(std::io::Error::other)); - stream_contents_to_car(state.car_piece_storage_dir.clone().as_ref(), field_reader) + stream_contents_to_car(state.car_piece_storage_dir.clone().as_ref(), reader) .await .map_err(|err| { tracing::error!(%err, "failed to store file into CAR archive"); @@ -366,13 +380,21 @@ fn content_path(folder: &std::path::Path, cid: Cid) -> (String, PathBuf) { (name, path) } -/// Reads bytes from the source and writes them to a CAR file. +/// Converts a source stream into a CARv2 file and writes it to an output stream. +/// +/// Send + 'static bounds are required because the UnixFS processing involves: +/// - Async stream processing that may cross thread boundaries +/// - State management for DAG construction and deduplication +/// - Block tracking that must be thread-safe +/// +/// The expanded trait bounds ensure that all data can be safely moved between +/// threads during async operations. async fn stream_contents_to_car( folder: &std::path::Path, source: R, ) -> Result> where - R: AsyncRead + Unpin, + R: AsyncRead + Unpin + Send + 'static, { // Temp file which will be used to store the CAR file content. The temp // director has a randomized name and is created in the same folder as the @@ -384,7 +406,14 @@ where // Stream the body from source to the temp file. let file = File::create(&temp_file_path).await?; let writer = BufWriter::new(file); - let cid = mater::create_filestore(source, writer, mater::Config::default()).await?; + + let config = mater::Config::Balanced { + chunk_size: DEFAULT_CHUNK_SIZE, + tree_width: DEFAULT_TREE_WIDTH, + raw_mode: false, // Default to UnixFS + }; + + let cid = create_filestore(source, writer, config).await?; tracing::trace!("finished writing the CAR archive"); // If the file is successfully written, we can now move it to the final