feat(mater): add unixfs wrapping support #690

Closed
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -159,7 +159,7 @@ blstrs = "0.7"
filecoin-hashers = "13.1.0"
filecoin-proofs = "18.1.0"
fr32 = "11.1.0"
generic-array = "1.1.0"
generic-array = "=1.1.0"
storage-proofs-core = "18.1.0"
storage-proofs-porep = "18.1.0"
storage-proofs-post = "18.1.0"
55 changes: 18 additions & 37 deletions mater/cli/src/convert.rs
@@ -9,34 +9,27 @@ use crate::error::Error;
pub(crate) async fn convert_file_to_car(
input_path: &PathBuf,
output_path: &PathBuf,
overwrite: bool,
config: Config,
) -> Result<Cid, Error> {
let source_file = File::open(input_path).await?;
let output_file = if overwrite {
File::create(output_path).await
} else {
File::create_new(output_path).await
}?;
let cid = create_filestore(source_file, output_file, Config::default()).await?;

let output_file = File::create(output_path).await?;
let cid = create_filestore(source_file, output_file, config).await?;
Ok(cid)
}

/// Tests for file conversion.
/// MaterError cases are not handled because these are tested in the mater library.
#[cfg(test)]
mod tests {
use std::str::FromStr;

use anyhow::Result;
use mater::Cid;
use mater::Config;
use std::str::FromStr;
use tempfile::tempdir;
use tokio::{fs::File, io::AsyncWriteExt};

use crate::{convert::convert_file_to_car, error::Error};

#[tokio::test]
async fn convert_file_to_car_success() -> Result<()> {
async fn convert_file_to_car_raw_success() -> Result<()> {
// Setup: Create a dummy input file
let temp_dir = tempdir()?;
let input_path = temp_dir.path().join("test_input.txt");
@@ -49,18 +42,14 @@ mod tests {
// Define output path
let output_path = temp_dir.path().join("test_output.car");

// Call the function under test
let result = convert_file_to_car(&input_path, &output_path, false).await;
// Configure in raw mode
let config = Config::balanced_raw(256 * 1024, 174);

// Assert the result is Ok
// Call the function under test
let result = super::convert_file_to_car(&input_path, &output_path, config).await;
assert!(result.is_ok());

// Verify that the CID is as expected
assert_eq!(result?, expected_cid);

// Close temporary directory
temp_dir.close()?;

Ok(())
}

@@ -69,19 +58,15 @@ mod tests {
// Define non-existent input path
let temp_dir = tempdir()?;
let input_path = temp_dir.path().join("non_existent_input.txt");

// Define output path
let output_path = temp_dir.path().join("test_output.car");

// Call the function under test
let result = convert_file_to_car(&input_path, &output_path, false).await;
let config = Config::default();

// Assert the result is an error
// Call the function under test
let result = super::convert_file_to_car(&input_path, &output_path, config).await;
assert!(result.is_err());
assert!(matches!(result, Err(Error::IoError(..))));

// Close temporary directory
temp_dir.close()?;
assert!(matches!(result, Err(super::Error::IoError(..))));

Ok(())
}
@@ -97,17 +82,13 @@ mod tests {
// Create output file
let output_path = temp_dir.path().join("output_file");
File::create_new(&output_path).await?;
println!("gets here");

// Call the function under test
let result = convert_file_to_car(&input_path, &output_path, false).await;
let config = Config::default();

// Assert the result is an error
// Call the function under test
let result = super::convert_file_to_car(&input_path, &output_path, config).await;
assert!(result.is_err());
assert!(matches!(result, Err(Error::IoError(..))));

// Close temporary directory
temp_dir.close()?;
assert!(matches!(result, Err(super::Error::IoError(..))));

Ok(())
}
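The CLI helper now takes a `Config` instead of an `overwrite` flag. A minimal sketch of how a caller inside the CLI crate might drive both modes, assuming only the constructors shown above (file names and the helper itself are illustrative):

```rust
use std::path::PathBuf;

use mater::Config;

// Hypothetical caller; `convert_file_to_car` and `Error` are the CLI crate's own items.
async fn convert_both_modes() -> Result<(), crate::error::Error> {
    let input = PathBuf::from("photo.jpg");

    // Default: UnixFS wrapping, 256 KiB chunks, tree width 174.
    let unixfs_out = PathBuf::from("photo.unixfs.car");
    let unixfs_cid =
        crate::convert::convert_file_to_car(&input, &unixfs_out, Config::default()).await?;

    // Raw mode: same chunking parameters, no UnixFS metadata.
    let raw_out = PathBuf::from("photo.raw.car");
    let raw_cid = crate::convert::convert_file_to_car(
        &input,
        &raw_out,
        Config::balanced_raw(256 * 1024, 174),
    )
    .await?;

    // The two encodings are expected to produce different roots.
    assert_ne!(unixfs_cid, raw_cid);
    Ok(())
}
```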
43 changes: 30 additions & 13 deletions mater/cli/src/main.rs
@@ -1,9 +1,7 @@
use std::path::PathBuf;

use clap::Parser;

use crate::{convert::convert_file_to_car, error::Error, extract::extract_file_from_car};

use clap::Parser;
use mater::Config;
use std::path::PathBuf;
mod convert;
mod error;
mod extract;
@@ -19,21 +17,32 @@ enum MaterCli {
input_path: PathBuf,

/// Optional path to output CARv2 file.
/// If no output path is given it will store the `.car` file in the same location.
/// If no output path is given it will store the .car file in the same location.
output_path: Option<PathBuf>,

/// If enabled, only the resulting CID will be printed.
#[arg(short, long, action)]
quiet: bool,

/// If enabled, the output will overwrite any existing files.
/// If enabled, content will be stored directly without UnixFS wrapping.
/// By default, content is wrapped in UnixFS format for IPFS compatibility.
#[arg(long, action)]
overwrite: bool,
raw: bool,

/// Size of each chunk in bytes. Defaults to 256 KiB.
#[arg(long)]
chunk_size: Option<usize>,

/// Maximum number of children per parent node. Defaults to 174.
#[arg(long)]
tree_width: Option<usize>,
},

/// Convert a CARv2 file to its original format
Extract {
/// Path to CARv2 file
input_path: PathBuf,

/// Path to output file
output_path: Option<PathBuf>,
},
@@ -46,14 +55,24 @@ async fn main() -> Result<(), Error> {
input_path,
output_path,
quiet,
overwrite,
raw,
chunk_size,
tree_width,
} => {
let output_path = output_path.unwrap_or_else(|| {
let mut new_path = input_path.clone();
new_path.set_extension("car");
new_path
});
let cid = convert_file_to_car(&input_path, &output_path, overwrite).await?;

// Build config with UnixFS wrapping by default
let config = Config::balanced(
chunk_size.unwrap_or(256 * 1024),
tree_width.unwrap_or(174),
raw,
);

let cid = convert_file_to_car(&input_path, &output_path, config).await?;

if quiet {
println!("{}", cid);
@@ -75,14 +94,12 @@ async fn main() -> Result<(), Error> {
new_path
});
extract_file_from_car(&input_path, &output_path).await?;

println!(
"Successfully converted CARv2 file {} and saved it to to {}",
"Successfully converted CARv2 file {} and saved it to {}",
input_path.display(),
output_path.display()
);
}
}

Ok(())
}
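Since the root CID's codec reflects whether UnixFS wrapping was applied, one (assumed) way to sanity-check the `--raw` flag from Rust is to inspect the codec of the printed CID using the codes re-exported by `mater`; a sketch, assuming the codec constants are plain `u64` consts:

```rust
use mater::{Cid, DAG_PB_CODE, RAW_CODE};

// Sketch only: for a small single-chunk input, raw mode yields a raw-codec root,
// while the default UnixFS wrapping yields a dag-pb root.
fn describe_root(cid: &Cid) -> &'static str {
    match cid.codec() {
        DAG_PB_CODE => "UnixFS-wrapped (dag-pb) root",
        RAW_CODE => "raw root (no UnixFS wrapping)",
        _ => "unexpected codec",
    }
}
```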
1 change: 1 addition & 0 deletions mater/lib/Cargo.toml
@@ -29,6 +29,7 @@ thiserror.workspace = true
tokio = { workspace = true, features = ["fs", "macros", "rt-multi-thread"] }
tokio-stream.workspace = true
tokio-util = { workspace = true, features = ["io"] }
tracing = { workspace = true }

# Optional dependencies
blockstore = { workspace = true, optional = true }
9 changes: 8 additions & 1 deletion mater/lib/src/lib.rs
@@ -21,7 +21,9 @@ mod v2;
// We need to re-expose this because `read_block` returns `(Cid, Vec<u8>)`.
pub use ipld_core::cid::Cid;
pub use multicodec::{DAG_PB_CODE, IDENTITY_CODE, RAW_CODE};
pub use stores::{create_filestore, Blockstore, Config, FileBlockstore};
pub use stores::{
create_filestore, Blockstore, Config, FileBlockstore, DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH,
};
pub use v1::{Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer};
pub use v2::{
verify_cid, Characteristics, Header as CarV2Header, Index, IndexEntry, IndexSorted,
@@ -111,6 +113,11 @@ pub enum Error {
/// See [`DagPbError`](ipld_dagpb::Error) for more information.
#[error(transparent)]
DagPbError(#[from] ipld_dagpb::Error),

/// Error returned when attempting to encode an incorrect node type.
/// For example, when attempting to encode a Leaf node as a Stem node.
#[error("Invalid node type: {0}")]
InvalidNodeType(String),
}

#[cfg(test)]
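A hedged sketch of how downstream code might check for the new error variant (the helper name is illustrative; `Error` is the crate-root error type re-exported by mater):

```rust
use mater::Error;

// Returns true when an error came from encoding the wrong UnixFS node type.
fn is_invalid_node_type(err: &Error) -> bool {
    matches!(err, Error::InvalidNodeType(_))
}
```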
18 changes: 11 additions & 7 deletions mater/lib/src/stores/blockstore.rs
@@ -12,10 +12,10 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::StreamExt;
use tokio_util::io::ReaderStream;

use super::{DEFAULT_BLOCK_SIZE, DEFAULT_TREE_WIDTH};
use super::{DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH};
use crate::{
multicodec::SHA_256_CODE, unixfs::stream_balanced_tree, CarV1Header, CarV2Header, CarV2Writer,
Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex,
Config, Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex,
};

/// The [`Blockstore`] stores pairs of [`Cid`] and [`Bytes`] in memory.
@@ -76,7 +76,7 @@ impl Blockstore {
root: None,
blocks: IndexMap::new(),
indexed: HashSet::new(),
chunk_size: chunk_size.unwrap_or(DEFAULT_BLOCK_SIZE),
chunk_size: chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE),
tree_width: tree_width.unwrap_or(DEFAULT_TREE_WIDTH),
}
}
@@ -85,10 +85,14 @@ impl Blockstore {
/// converting the contents into a CARv2 file.
pub async fn read<R>(&mut self, reader: R) -> Result<(), Error>
where
R: AsyncRead + Unpin,
R: AsyncRead + Unpin + Send + 'static,
{
let chunks = ReaderStream::with_capacity(reader, self.chunk_size);

let config = Config::Balanced {
chunk_size: self.chunk_size,
tree_width: self.tree_width,
raw_mode: true,
};
// The `stream -> pin -> peekable` combo instead of `stream -> peekable -> pin` feels weird
// but it has to do with two things:
// - The fact that the stream can be self-referential:
@@ -98,7 +102,7 @@ impl Blockstore {
// https://github.com/tokio-rs/tokio/blob/14c17fc09656a30230177b600bacceb9db33e942/tokio-stream/src/stream_ext/peekable.rs#L26-L37
// - futures::Peekable::peek(self: Pin<&mut Self>)
// https://github.com/rust-lang/futures-rs/blob/c507ff833728e2979cf5519fc931ea97308ec876/futures-util/src/stream/stream/peek.rs#L38-L40
let tree = stream_balanced_tree(chunks, self.tree_width);
let tree = stream_balanced_tree(chunks, self.tree_width, &config);
tokio::pin!(tree);
let mut tree = tree.peekable();

@@ -206,7 +210,7 @@ impl Default for Blockstore {
root: None,
blocks: IndexMap::new(),
indexed: HashSet::new(),
chunk_size: DEFAULT_BLOCK_SIZE,
chunk_size: DEFAULT_CHUNK_SIZE,
tree_width: DEFAULT_TREE_WIDTH,
}
}
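The `stream -> pin -> peekable` comment above is easier to see in isolation. A small sketch of the `futures`-style variant used in `filestore.rs` (peekable first, then pin, then `as_mut().peek()`), where peeking for `None` is what detects the last node yielded by the tree stream:

```rust
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // futures' Peekable::peek takes `self: Pin<&mut Self>`, so the peekable
    // stream has to be pinned before it can be peeked.
    let nodes = stream::iter([1, 2, 3]).peekable();
    tokio::pin!(nodes);

    while let Some(node) = nodes.next().await {
        // The last yielded node is treated as the root of the tree stream.
        let is_root = nodes.as_mut().peek().await.is_none();
        println!("node {node}, root: {is_root}");
    }
}
```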
276 changes: 205 additions & 71 deletions mater/lib/src/stores/filestore.rs
@@ -1,33 +1,40 @@
use bytes::BytesMut;
use futures::stream::StreamExt;
use ipld_core::cid::Cid;
use sha2::{Digest, Sha256};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite};

use super::Config;
use crate::{
multicodec::SHA_256_CODE, unixfs::stream_balanced_tree, CarV1Header, CarV2Header, CarV2Writer,
Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex,
};
use bytes::BytesMut;
use futures::StreamExt;
use ipld_core::cid::Cid;
use std::collections::HashMap;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite};
use tracing::trace;

/// Converts a source stream into a CARv2 file and writes it to an output stream.
///
/// The expanded trait bounds are required because:
/// - `Send + 'static`: The async stream operations require the ability to move the source/output
/// between threads and ensure they live long enough for the entire async operation
/// - `AsyncSeek`: Required for the output to write the final header after processing all blocks
/// - `Unpin`: Required because we need to move the source/output around during async operations
async fn balanced_import<Src, Out>(
mut source: Src,
mut output: Out,
chunk_size: usize,
tree_width: usize,
config: &Config,
) -> Result<Cid, Error>
where
Src: AsyncRead + Unpin,
Out: AsyncWrite + AsyncSeek + Unpin,
Src: AsyncRead + Unpin + Send + 'static,
Out: AsyncWrite + AsyncSeek + Unpin + Send + 'static,
Comment on lines +28 to +29
Collaborator: I'm not the most versed in async Rust, could you elaborate why you needed these bounds?
Contributor Author: added code comments

{
// This custom stream gathers incoming buffers into a single byte chunk of `chunk_size`
// `tokio_util::io::ReaderStream` does a very similar thing, however, it does not attempt
// to fill its buffer before returning, voiding the whole promise of properly sized chunks
// There is an alternative implementation (untested & uses unsafe) in the following GitHub Gist:
// https://gist.github.com/jmg-duarte/f606410a5e0314d7b5cee959a240b2d8
let chunker = async_stream::try_stream! {
let chunker = Box::pin(async_stream::try_stream! {
let mut buf = BytesMut::with_capacity(chunk_size);

loop {
if buf.capacity() < chunk_size {
// BytesMut::reserve *may* allocate more memory than requested to avoid further
@@ -45,82 +52,102 @@ where
// this means there is no right way of knowing when the reader is fully exhausted!
// If we need to support a case like that, we just need to track how many times
// the reader returned 0 and break at a certain point
if source.read_buf(&mut buf).await? == 0 {
// EOF but there's still content to yield -> yield it
if buf.len() > 0 {
let chunk = buf.split();
yield chunk.freeze();
}
break
} else if buf.len() >= chunk_size {
// The buffer may have a larger capacity than chunk_size due to reserve
// this also means that our read may have read more bytes than we expected,
// thats why we check if the length if bigger than the chunk_size and if so
// we split the buffer to the chunk_size, then freeze and return
let read_bytes = source.read_buf(&mut buf).await?;
trace!(bytes_read = read_bytes, buffer_size = buf.len(), "Buffer read status");

while buf.len() >= chunk_size {
let chunk = buf.split_to(chunk_size);
yield chunk.freeze();
} // otherwise, the buffer is not full, so we don't do a thing
}

if read_bytes == 0 && !buf.is_empty() {
let chunk = buf.split();
yield chunk.freeze();
break;
} else if read_bytes == 0 {
break;
}
}
};
});

let nodes = stream_balanced_tree(chunker, tree_width).peekable();
// Create the balanced tree stream
let nodes = stream_balanced_tree(chunker, tree_width, config).peekable();
tokio::pin!(nodes);

// Initialize the writer
let mut writer = CarV2Writer::new(&mut output);
let mut position = 0;

let placeholder_header = CarV2Header::default();
position += writer.write_header(&placeholder_header).await?;
let car_v1_start = position;
// Write placeholder header to be updated later
position += writer.write_header(&CarV2Header::default()).await?;

let placeholder_header_v1 = CarV1Header::default();
position += writer.write_v1_header(&placeholder_header_v1).await?;
// Write CARv1 header and track its start position for index offsets
let car_v1_start = position;
position += writer.write_v1_header(&CarV1Header::default()).await?;

let mut root = None;
let mut entries = vec![];
let mut seen_blocks = HashMap::new();

// Process all blocks from the balanced tree
while let Some(node) = nodes.next().await {
let (node_cid, node_bytes) = node?;
let digest = node_cid.hash().digest().to_owned();
let entry = IndexEntry::new(digest, (position - car_v1_start) as u64);
entries.push(entry);
position += writer.write_block(&node_cid, &node_bytes).await?;

// Handle deduplication
if let Some(existing_offset) = seen_blocks.get(&node_cid) {
entries.push(IndexEntry::new(
node_cid.hash().digest().to_vec(),
(*existing_offset - car_v1_start) as u64,
));
} else {
entries.push(IndexEntry::new(
node_cid.hash().digest().to_vec(),
(position - car_v1_start) as u64,
));
position += writer.write_block(&node_cid, &node_bytes).await?;
seen_blocks.insert(node_cid, position);
}

// Check if this is the root node
if nodes.as_mut().peek().await.is_none() {
root = Some(node_cid);
}
}

let Some(root) = root else {
return Err(Error::EmptyRootsError);
// Create and write index
let index = {
let single_width_index = SingleWidthIndex::try_from(entries)?;
Index::MultihashIndexSorted(MultihashIndexSorted::from_single_width(
SHA_256_CODE,
single_width_index.into(),
))
};
position += writer.write_index(&index).await?;

let index_offset = position;
let single_width_index =
SingleWidthIndex::new(Sha256::output_size() as u32, entries.len() as u64, entries);
let index = Index::MultihashIndexSorted(MultihashIndexSorted::from_single_width(
SHA_256_CODE,
single_width_index.into(),
));
writer.write_index(&index).await?;

// Go back to the beginning of the file
writer.get_inner_mut().rewind().await?;
let header = CarV2Header::new(
false,
(car_v1_start) as u64,
(index_offset - car_v1_start) as u64,
(index_offset) as u64,
);
writer.write_header(&header).await?;
// Update header with final values
let header = {
let data_size = position - car_v1_start;
let data_offset = CarV2Header::SIZE;
CarV2Header::new(
false,
data_offset,
data_size.try_into().unwrap(),
position.try_into().unwrap(),
)
};

// If the length of the roots doesn't match the previous one, you WILL OVERWRITE parts of the file
let header_v1 = CarV1Header::new(vec![root]);
writer.write_v1_header(&header_v1).await?;
// Seek back and write final header
writer
.get_inner_mut()
.seek(std::io::SeekFrom::Start(0))
.await?;
writer.write_header(&header).await?;

// Flush even if the caller doesn't - we did our best
// Finalize writer
writer.finish().await?;

Ok(root)
// Return root CID
Ok(root.expect("the stream yielded at least one element"))
}

/// Convert a `source` stream into a CARv2 file and write it to an `output` stream.
@@ -130,29 +157,30 @@ pub async fn create_filestore<Src, Out>(
config: Config,
) -> Result<Cid, Error>
where
Src: AsyncRead + Unpin,
Out: AsyncWrite + AsyncSeek + Unpin,
Src: AsyncRead + Unpin + Send + 'static,
Out: AsyncWrite + AsyncSeek + Unpin + Send + 'static,
{
match config {
Config::Balanced {
chunk_size,
tree_width,
} => balanced_import(source, output, chunk_size, tree_width).await,
..
} => balanced_import(source, output, chunk_size, tree_width, &config).await,
}
}

#[cfg(test)]
mod test {
use std::path::Path;

use super::*;
use crate::test_utils::assert_buffer_eq;
use crate::unixfs::Data;
use ipld_core::codec::Codec;
use ipld_dagpb::{DagPbCodec, PbNode};
use quick_protobuf::MessageRead;
use std::{collections::HashSet, path::Path};
use tempfile::tempdir;
use tokio::fs::File;

use crate::{
stores::{filestore::create_filestore, Config},
test_utils::assert_buffer_eq,
};

async fn test_filestore_roundtrip<P1, P2>(original: P1, expected: P2)
where
P1: AsRef<Path>,
@@ -174,7 +202,7 @@ mod test {
}

#[tokio::test]
async fn test_filestore_lorem() {
async fn test_lorem_roundtrip() {
test_filestore_roundtrip(
"tests/fixtures/original/lorem.txt",
"tests/fixtures/car_v2/lorem.car",
@@ -183,11 +211,117 @@ mod test {
}

#[tokio::test]
async fn test_filestore_spaceglenda() {
async fn test_spaceglenda_roundtrip() {
test_filestore_roundtrip(
"tests/fixtures/original/spaceglenda.jpg",
"tests/fixtures/car_v2/spaceglenda.car",
)
.await
}

#[tokio::test]
async fn test_filestore_unixfs_dag_structure() {
use rand::{thread_rng, Rng};

let temp_dir = tempdir().unwrap();
let input_path = temp_dir.path().join("input.bin");
let temp_path = temp_dir.path().join("temp.car");

// Create test file with random data to ensure unique chunks
let mut rng = thread_rng();
let test_data = (0..512 * 1024).map(|_| rng.gen::<u8>()).collect::<Vec<_>>();

trace!("Creating test file of size: {} bytes", test_data.len());
tokio::fs::write(&input_path, &test_data).await.unwrap();

let source_file = File::open(&input_path).await.unwrap();
let output_file = File::create(&temp_path).await.unwrap();

let config = Config::balanced_unixfs(64 * 1024, 2);

let root_cid = create_filestore(source_file, output_file, config)
.await
.unwrap();
trace!("Root CID: {}", root_cid);

// Read back and verify structure
let file = File::open(&temp_path).await.unwrap();
let mut reader = crate::CarV2Reader::new(file);

reader.read_pragma().await.unwrap();
reader.read_header().await.unwrap();
reader.read_v1_header().await.unwrap();

// Track all unique blocks and statistics
let mut unique_blocks = HashSet::new();
let mut leaf_blocks = HashSet::new();
let mut parent_blocks = HashSet::new();
let mut level_sizes = Vec::new();
let mut current_level_nodes = HashSet::new();
let mut current_level = 0;

while let Ok((cid, data)) = reader.read_block().await {
unique_blocks.insert(cid);

let pb_node: PbNode = DagPbCodec::decode(&data[..]).unwrap();
let reader =
&mut quick_protobuf::BytesReader::from_bytes(&pb_node.data.clone().unwrap());
let bytes = &pb_node.data.unwrap();
let unixfs_data = Data::from_reader(reader, bytes).unwrap();

if pb_node.links.is_empty() {
leaf_blocks.insert(cid);
trace!("Found leaf node: {} (size: {})", cid, data.len());
trace!(
" Data size: {}",
unixfs_data.Data.as_ref().map_or(0, |d| d.len())
);
trace!(" Blocksizes: {:?}", unixfs_data.blocksizes);

// New level if this is first leaf
if current_level_nodes.is_empty() {
level_sizes.push(0);
current_level = level_sizes.len() - 1;
}
} else {
parent_blocks.insert(cid);

trace!(
"Found parent node: {} with {} links (size: {})",
cid,
pb_node.links.len(),
data.len()
);
trace!(" Total filesize: {:?}", unixfs_data.filesize);
trace!(" Blocksizes: {:?}", unixfs_data.blocksizes);

for link in &pb_node.links {
trace!(" -> Link to: {} (size: {:?})", link.cid, link.size);
}

// Track level changes
if !current_level_nodes.is_empty()
&& current_level_nodes
.iter()
.any(|n| pb_node.links.iter().any(|l| l.cid == *n))
{
level_sizes.push(0);
current_level = level_sizes.len() - 1;
current_level_nodes.clear();
}
}

level_sizes[current_level] += 1;
current_level_nodes.insert(cid);
}

// Verify structure
assert!(!leaf_blocks.is_empty(), "No leaf nodes found");
assert!(!parent_blocks.is_empty(), "No parent nodes found");
assert_eq!(
unique_blocks.len(),
leaf_blocks.len() + parent_blocks.len(),
"Block count mismatch"
);
}
}
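For the reviewer question about the widened bounds: a compact way to see why `Send + 'static` appears is that any future which owns the source and may be moved between worker threads (for example via `tokio::spawn` on the multi-threaded runtime) needs exactly those bounds. A minimal sketch, not taken from the PR:

```rust
use tokio::io::{AsyncRead, AsyncReadExt};

// Sketch only: the bounds on `Src` mirror what `tokio::spawn` demands of anything
// captured by a spawned future - it must be movable across threads (`Send`) and
// must not borrow from the caller's stack (`'static`).
fn spawn_import<Src>(mut source: Src) -> tokio::task::JoinHandle<std::io::Result<usize>>
where
    Src: AsyncRead + Unpin + Send + 'static,
{
    tokio::spawn(async move {
        let mut buf = vec![0u8; 64 * 1024];
        let mut total = 0;
        loop {
            let n = source.read(&mut buf).await?;
            if n == 0 {
                break;
            }
            total += n;
        }
        Ok(total)
    })
}
```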
110 changes: 95 additions & 15 deletions mater/lib/src/stores/mod.rs
Original file line number Diff line number Diff line change
@@ -6,42 +6,122 @@ pub use blockstore::Blockstore;
pub use file::FileBlockstore;
pub use filestore::create_filestore;

/// The default block size, as defined in
/// [boxo](https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13).
pub(crate) const DEFAULT_BLOCK_SIZE: usize = 1024 * 256;
/// The default chunk size for balanced trees (256 KiB)
/// Reference: https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13
pub const DEFAULT_CHUNK_SIZE: usize = 256 * 1024;

/// The default tree width, also called links per block, as defined in
/// [boxo](https://github.com/ipfs/boxo/blob/625ba769263c2beeec934836f54bbd6624db945a/ipld/unixfs/importer/helpers/helpers.go#L16-L30).
pub(crate) const DEFAULT_TREE_WIDTH: usize = 174;
/// The default number of children per parent node in balanced trees.
/// This value comes from the go-ipfs implementation and provides a good balance
/// between tree depth and width for most use cases.
pub const DEFAULT_TREE_WIDTH: usize = 174;

/// Store configuration options.
/// Store configuration options for controlling how data is stored and structured.
#[derive(Debug, Clone)]
pub enum Config {
/// The store should use the balanced tree layout,
/// generating byte chunks of `chunk_size` and
/// generating parent nodes every `tree_width` nodes.
/// Creates a balanced tree structure by generating fixed-size chunks
/// and arranging them into a tree with a specified width.
/// By default, content is wrapped in UnixFS format for IPFS compatibility.
Balanced {
/// The size of the byte chunks.
/// Size of each chunk in bytes. Defaults to 256 KiB.
/// Larger chunks reduce tree depth but increase minimum storage unit size.
chunk_size: usize,
/// The number of children per parent node.

/// Maximum number of children per parent node. Defaults to 174.
/// This affects tree shape and traversal performance.
tree_width: usize,

/// If true, store content directly without UnixFS metadata.
/// More space efficient but loses IPFS compatibility features.
/// Default is false (uses UnixFS wrapping).
raw_mode: bool,
},
}

impl Config {
/// Create a new [`Config::Balanced`].
pub fn balanced(chunk_size: usize, tree_width: usize) -> Self {
/// Creates a new balanced tree configuration with the specified parameters.
///
/// # Arguments
/// * `chunk_size` - Size of each data chunk in bytes
/// * `tree_width` - Maximum number of children per parent node
/// * `raw_mode` - Whether to store content directly without UnixFS wrapping
pub fn balanced(chunk_size: usize, tree_width: usize, raw_mode: bool) -> Self {
Self::Balanced {
chunk_size,
tree_width,
raw_mode,
}
}

/// Creates a new balanced tree configuration with UnixFS wrapping (recommended).
pub fn balanced_unixfs(chunk_size: usize, tree_width: usize) -> Self {
Self::balanced(chunk_size, tree_width, false)
}

/// Creates a new balanced tree configuration with raw storage.
pub fn balanced_raw(chunk_size: usize, tree_width: usize) -> Self {
Self::balanced(chunk_size, tree_width, true)
}
}

impl Default for Config {
fn default() -> Self {
Self::Balanced {
chunk_size: DEFAULT_BLOCK_SIZE,
chunk_size: DEFAULT_CHUNK_SIZE,
tree_width: DEFAULT_TREE_WIDTH,
raw_mode: false, // Default to UnixFS wrapping for IPFS compatibility
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_default_config() {
let config = Config::default();
match config {
Config::Balanced {
chunk_size,
tree_width,
raw_mode,
} => {
assert_eq!(chunk_size, DEFAULT_CHUNK_SIZE);
assert_eq!(tree_width, DEFAULT_TREE_WIDTH);
assert!(!raw_mode);
}
}
}

#[test]
fn test_config_builders() {
let chunk_size = 1024;
let tree_width = 10;

let unixfs = Config::balanced_unixfs(chunk_size, tree_width);
match unixfs {
Config::Balanced {
chunk_size: cs,
tree_width: tw,
raw_mode,
} => {
assert_eq!(cs, chunk_size);
assert_eq!(tw, tree_width);
assert!(!raw_mode);
}
}

let raw = Config::balanced_raw(chunk_size, tree_width);
match raw {
Config::Balanced {
chunk_size: cs,
tree_width: tw,
raw_mode,
} => {
assert_eq!(cs, chunk_size);
assert_eq!(tw, tree_width);
assert!(raw_mode);
}
}
}
}
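For library users, the common cases reduce to picking one of the constructors above; a short sketch using the re-exported defaults:

```rust
use mater::{Config, DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH};

fn pick_config(raw: bool) -> Config {
    if raw {
        // Raw storage: smaller output, no UnixFS metadata.
        Config::balanced_raw(DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH)
    } else {
        // Equivalent to Config::default(): UnixFS wrapping for IPFS compatibility.
        Config::balanced_unixfs(DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH)
    }
}
```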
736 changes: 201 additions & 535 deletions mater/lib/src/unixfs/mod.rs

Large diffs are not rendered by default.

111 changes: 95 additions & 16 deletions mater/lib/src/v2/reader.rs
@@ -1,13 +1,17 @@
use ipld_core::cid::Cid;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt, BufReader};
use std::collections::HashSet;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt};

use super::index::read_index;
use crate::multicodec::DAG_PB_CODE;
use crate::{
v1::BlockMetadata,
v2::{index::Index, Characteristics, Header, PRAGMA},
Error,
};

use ipld_core::codec::Codec;
use ipld_dagpb::DagPbCodec;
use ipld_dagpb::PbNode;
/// Low-level CARv2 reader.
pub struct Reader<R> {
reader: R,
@@ -22,7 +26,7 @@ impl<R> Reader<R> {

impl<R> Reader<R>
where
R: AsyncRead + Unpin,
R: AsyncRead + Unpin + AsyncSeek,
Member: Why is Seek needed?
Contributor Author: Required for DAG traversal in extract_content to follow block links, me thinks

{
/// Takes in a CID and checks that the contents in the reader matches this CID
pub async fn verify_cid(&mut self, contents_cid: Cid) -> Result<(), Error> {
@@ -63,21 +67,48 @@ where
{
self.read_pragma().await?;
let header = self.read_header().await?;
let _v1_header = self.read_v1_header().await?;
let v1_header = self.read_v1_header().await?;
let mut written = 0;

while let Ok((_cid, contents)) = self.read_block().await {
// CAR file contents is empty
if contents.len() == 0 {
break;
}
// Keep track of root CID and position
let root_cid = v1_header.roots.first().ok_or(Error::EmptyRootsError)?;
let data_end = header.data_offset + header.data_size;

// Track what we've processed and need to process
let mut processed: HashSet<Cid> = HashSet::new();
let mut to_process = vec![*root_cid];

while !to_process.is_empty() {
let position = self.get_inner_mut().stream_position().await?;
let data_end = header.data_offset + header.data_size;
// Add the `written != 0` clause for files that are less than a single block.
if position >= data_end && written != 0 {
break;
}
written += output_file.write(&contents).await?;

if let Ok((cid, contents)) = self.read_block().await {
if contents.len() == 0 {
Contributor: I remember when implementing this that this break condition caused the output file to miss some data. Did you check that the output file contains all the data?
Contributor Author: Fixed with proper DAG traversal. Tested with byte-by-byte comparisons in test_filestore_roundtrip.

break;
}

// Write the block data
written += output_file.write(&contents).await?;

// If it's a DAG-PB node, queue up its children
if cid.codec() == DAG_PB_CODE && !processed.contains(&cid) {
let reader = std::io::BufReader::new(&contents[..]);
if let Ok(node) = DagPbCodec::decode(reader) {
let pb_node: PbNode = node;
to_process.extend(
pb_node
.links
.iter()
.map(|link| link.cid)
.filter(|cid| !processed.contains(cid)),
);
}
}

processed.insert(cid);
}
}

Ok(())
@@ -164,9 +195,11 @@ where
}

/// Function verifies that a given CID matches the CID for the CAR file in the given reader
pub async fn verify_cid<R: AsyncRead + Unpin>(reader: R, contents_cid: Cid) -> Result<(), Error> {
let mut reader = Reader::new(BufReader::new(reader));

pub async fn verify_cid<R>(reader: R, contents_cid: Cid) -> Result<(), Error>
where
R: AsyncRead + AsyncSeek + Unpin,
{
let mut reader = Reader::new(reader);
reader.verify_cid(contents_cid).await
}

@@ -175,11 +208,13 @@ mod tests {
use std::{io::Cursor, path::PathBuf, str::FromStr};

use ipld_core::cid::Cid;
use ipld_core::codec::Codec;
use ipld_dagpb::{DagPbCodec, PbNode};
use sha2::Sha256;
use tokio::{fs::File, io::AsyncSeekExt};

use crate::{
multicodec::{generate_multihash, RAW_CODE, SHA_256_CODE},
multicodec::{generate_multihash, DAG_PB_CODE, RAW_CODE, SHA_256_CODE},
v2::{index::Index, reader::Reader},
verify_cid, Error,
};
@@ -438,4 +473,48 @@ mod tests {
assert_eq!(fst[0].entries.len(), 4);
}
}

#[tokio::test]
async fn test_dag_pb_links() {
let file = File::open("tests/fixtures/car_v2/spaceglenda.car")
.await
.unwrap();
let mut reader = Reader::new(file);

reader.read_pragma().await.unwrap();
reader.read_header().await.unwrap();

let mut found_dag_pb = false;
let mut total_links = 0;

while let Ok((cid, data)) = reader.read_block().await {
if cid.codec() == DAG_PB_CODE {
found_dag_pb = true;
let reader = std::io::BufReader::new(&data[..]);

match DagPbCodec::decode(reader) {
Ok(node) => {
let pb_node: PbNode = node;
if !pb_node.links.is_empty() {
total_links += pb_node.links.len();
}

// Verify each link
for link in pb_node.links {
assert!(
!link.cid.to_string().is_empty(),
"Link should have valid CID"
);
}
}
Err(err) => {
panic!("Failed to decode DAG-PB node: {}", err);
}
}
}
}

assert!(found_dag_pb, "No DAG-PB nodes found in test file");
assert!(total_links > 0, "No links found in DAG-PB nodes");
}
}
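The extraction change above follows dag-pb links instead of reading blocks linearly. The link-following step in isolation, mirroring the decode calls used in the diff (the helper name is illustrative):

```rust
use std::collections::HashSet;

use ipld_core::{cid::Cid, codec::Codec};
use ipld_dagpb::{DagPbCodec, PbNode};

// Decode a dag-pb block and return the child CIDs that have not been visited yet.
fn unvisited_children(block: &[u8], visited: &HashSet<Cid>) -> Vec<Cid> {
    match DagPbCodec::decode(std::io::BufReader::new(block)) {
        Ok(node) => {
            let node: PbNode = node;
            node.links
                .iter()
                .map(|link| link.cid)
                .filter(|cid| !visited.contains(cid))
                .collect()
        }
        Err(_) => Vec::new(), // not a valid dag-pb node; nothing to queue
    }
}
```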
4 changes: 3 additions & 1 deletion mater/lib/src/v2/writer.rs
@@ -100,6 +100,7 @@ where
mod tests {
use std::{collections::BTreeMap, io::Cursor};

use crate::Config;
use ipld_core::cid::Cid;
use sha2::Sha256;
use tokio::{
@@ -245,7 +246,8 @@ mod tests {
.unwrap();
// https://github.com/ipfs/boxo/blob/f4fe8997dcbeb39b3a4842d8f08b34739bfd84a4/chunker/parse.go#L13
let file_chunker = ReaderStream::with_capacity(file, 1024 * 256);
let nodes = stream_balanced_tree(file_chunker, 11)
let config = Config::balanced(1024 * 256, 11, false);
let nodes = stream_balanced_tree(file_chunker, 11, &config)
.collect::<Result<Vec<_>, _>>()
.await
.unwrap();
63 changes: 46 additions & 17 deletions storage-provider/server/src/storage.rs
@@ -10,7 +10,9 @@ use axum::{
};
use futures::{TryFutureExt, TryStreamExt};
use mater::Cid;
use polka_storage_provider_common::commp::{commp, CommPError};
use mater::create_filestore;
use polka_storage_provider_common::commp::commp;
use polka_storage_provider_common::commp::{calculate_piece_commitment, CommPError};
use primitives::{commitment::piece::PaddedPieceSize, proofs::RegisteredPoStProof};
use tokio::{
fs::{self, File},
@@ -23,6 +25,8 @@ use tokio_util::{
use tower_http::trace::TraceLayer;
use uuid::Uuid;

use mater::{DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH};

#[cfg(feature = "delia")]
mod delia_imports {
pub use axum::{http::Method, response::Json, routing::post};
@@ -115,8 +119,13 @@ fn configure_router(state: Arc<StorageServerState>) -> Router {

#[cfg(not(feature = "delia"))]
fn config_non_delia(state: Arc<StorageServerState>) -> Router {
// Type annotation required to satisfy Send bounds needed for UnixFS processing
// across async operations and thread boundaries
Router::new()
.route("/upload/:cid", put(upload))
.route(
"/upload/:cid",
put(upload as fn(State<Arc<StorageServerState>>, Path<String>, Request<Body>) -> _),
)
.route("/download/:cid", get(download))
.with_state(state)
.layer(
@@ -160,9 +169,9 @@ fn configure_router(state: Arc<StorageServerState>) -> Router {
/// ```
#[tracing::instrument(skip_all, fields(cid))]
async fn upload(
ref s @ State(ref state): State<Arc<StorageServerState>>,
State(state): State<Arc<StorageServerState>>,
Collaborator: This change seems arbitrary
Contributor Author: simpler, more idiomatic pattern for handling Axum state; the old way was over-complicated imo

Path(cid): Path<String>,
request: Request,
request: Request<Body>,
) -> Result<String, (StatusCode, String)> {
let deal_cid = cid::Cid::from_str(&cid).map_err(|err| {
tracing::error!(cid, "failed to parse cid");
@@ -193,20 +202,25 @@ async fn upload(

// Branching needed here since the resulting `StreamReader`s don't have the same type
let file_cid = if request.headers().contains_key("Content-Type") {
// Handle multipart forms
let mut multipart = Multipart::from_request(request, &s)
// Handle the multipart data
let mut multipart = Multipart::from_request(request, &state)
.await
.map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
let Some(field) = multipart

// Get the field data
let field_bytes = multipart
.next_field()
.map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))
.await?
else {
return Err((StatusCode::BAD_REQUEST, "empty request".to_string()));
};
.await
.map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?
.ok_or_else(|| (StatusCode::BAD_REQUEST, "empty request".to_string()))?
.bytes()
.await
.map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;

// Create reader from the field data
let reader = std::io::Cursor::new(field_bytes);

let field_reader = StreamReader::new(field.map_err(std::io::Error::other));
stream_contents_to_car(state.car_piece_storage_dir.clone().as_ref(), field_reader)
stream_contents_to_car(state.car_piece_storage_dir.clone().as_ref(), reader)
.await
.map_err(|err| {
tracing::error!(%err, "failed to store file into CAR archive");
@@ -366,13 +380,21 @@ fn content_path(folder: &std::path::Path, cid: Cid) -> (String, PathBuf) {
(name, path)
}

/// Reads bytes from the source and writes them to a CAR file.
/// Converts a source stream into a CARv2 file and writes it to an output stream.
///
/// Send + 'static bounds are required because the UnixFS processing involves:
/// - Async stream processing that may cross thread boundaries
/// - State management for DAG construction and deduplication
/// - Block tracking that must be thread-safe
///
/// The expanded trait bounds ensure that all data can be safely moved between
/// threads during async operations.
async fn stream_contents_to_car<R>(
folder: &std::path::Path,
source: R,
) -> Result<Cid, Box<dyn std::error::Error>>
where
R: AsyncRead + Unpin,
R: AsyncRead + Unpin + Send + 'static,
{
// Temp file which will be used to store the CAR file content. The temp
// director has a randomized name and is created in the same folder as the
@@ -384,7 +406,14 @@ where
// Stream the body from source to the temp file.
let file = File::create(&temp_file_path).await?;
let writer = BufWriter::new(file);
let cid = mater::create_filestore(source, writer, mater::Config::default()).await?;

let config = mater::Config::Balanced {
chunk_size: DEFAULT_CHUNK_SIZE,
tree_width: DEFAULT_TREE_WIDTH,
raw_mode: false, // Default to UnixFS
};

let cid = create_filestore(source, writer, config).await?;
tracing::trace!("finished writing the CAR archive");

// If the file is successfully written, we can now move it to the final