feat(mater): add unixfs wrapping support #690
```diff
@@ -1,13 +1,17 @@
 use ipld_core::cid::Cid;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt, BufReader};
+use std::collections::HashSet;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt};

 use super::index::read_index;
+use crate::multicodec::DAG_PB_CODE;
 use crate::{
     v1::BlockMetadata,
     v2::{index::Index, Characteristics, Header, PRAGMA},
     Error,
 };

+use ipld_core::codec::Codec;
+use ipld_dagpb::DagPbCodec;
+use ipld_dagpb::PbNode;
 /// Low-level CARv2 reader.
 pub struct Reader<R> {
     reader: R,
```
```diff
@@ -22,7 +26,7 @@ impl<R> Reader<R> {

 impl<R> Reader<R>
 where
-    R: AsyncRead + Unpin,
+    R: AsyncRead + Unpin + AsyncSeek,
 {
     /// Takes in a CID and checks that the contents in the reader matches this CID
     pub async fn verify_cid(&mut self, contents_cid: Cid) -> Result<(), Error> {
```

> Review comment: Why is Seek needed?
>
> Author reply: Required for DAG traversal in `extract_content` to follow block links, me thinks.
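To make the reply concrete: `stream_position`, used in the traversal below, comes from `tokio::io::AsyncSeekExt`, which is only available when the underlying reader implements `AsyncSeek`. A minimal sketch (an illustrative helper, not part of the PR) showing what the bound unlocks:

```rust
use tokio::io::{AsyncSeek, AsyncSeekExt};

// Illustrative helper: `stream_position` is provided by `AsyncSeekExt`,
// so removing the `AsyncSeek` bound makes this fail to compile.
async fn current_offset<R>(reader: &mut R) -> std::io::Result<u64>
where
    R: AsyncSeek + Unpin,
{
    reader.stream_position().await
}
```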
```diff
@@ -63,21 +67,48 @@ where
 {
         self.read_pragma().await?;
         let header = self.read_header().await?;
-        let _v1_header = self.read_v1_header().await?;
+        let v1_header = self.read_v1_header().await?;
         let mut written = 0;

-        while let Ok((_cid, contents)) = self.read_block().await {
-            // CAR file contents is empty
-            if contents.len() == 0 {
-                break;
-            }
+        // Keep track of root CID and position
+        let root_cid = v1_header.roots.first().ok_or(Error::EmptyRootsError)?;
+        let data_end = header.data_offset + header.data_size;
+
+        // Track what we've processed and need to process
+        let mut processed: HashSet<Cid> = HashSet::new();
+        let mut to_process = vec![*root_cid];
+
+        while !to_process.is_empty() {
             let position = self.get_inner_mut().stream_position().await?;
-            let data_end = header.data_offset + header.data_size;
             // Add the `written != 0` clause for files that are less than a single block.
             if position >= data_end && written != 0 {
                 break;
             }
-            written += output_file.write(&contents).await?;
+
+            if let Ok((cid, contents)) = self.read_block().await {
+                if contents.len() == 0 {
+                    break;
+                }
+
+                // Write the block data
+                written += output_file.write(&contents).await?;
+
+                // If it's a DAG-PB node, queue up its children
+                if cid.codec() == DAG_PB_CODE && !processed.contains(&cid) {
+                    let reader = std::io::BufReader::new(&contents[..]);
+                    if let Ok(node) = DagPbCodec::decode(reader) {
+                        let pb_node: PbNode = node;
+                        to_process.extend(
+                            pb_node
+                                .links
+                                .iter()
+                                .map(|link| link.cid)
+                                .filter(|cid| !processed.contains(cid)),
+                        );
+                    }
+                }
+
+                processed.insert(cid);
+            }
         }

         Ok(())
```

> Review comment (on the `contents.len() == 0` break): I remember when implementing this that this break condition caused the output file to miss some data. Did you check that the output file contains all the data?
>
> Author reply: Fixed with proper DAG traversal. Tested with byte-by-byte comparisons in `test_filestore_roundtrip`.
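The hunk above is a worklist traversal of the IPLD DAG: seed the list with the root CID, and whenever a DAG-PB block is decoded, push its not-yet-processed links. A self-contained sketch of the same pattern (synchronous, with integer IDs and an adjacency map standing in for CIDs and the CAR reader):

```rust
use std::collections::{HashMap, HashSet};

// Sketch only: in the PR the IDs are `Cid`s and the children come from
// decoding DAG-PB blocks (`PbNode::links`) read out of the CARv2 file.
fn traverse(links: &HashMap<u32, Vec<u32>>, root: u32) -> Vec<u32> {
    let mut processed: HashSet<u32> = HashSet::new();
    let mut to_process = vec![root];
    let mut order = Vec::new();

    while let Some(id) = to_process.pop() {
        // Skip blocks we have already handled (deduplication).
        if !processed.insert(id) {
            continue;
        }
        order.push(id);
        if let Some(children) = links.get(&id) {
            // Queue children that have not been processed yet.
            to_process.extend(children.iter().copied().filter(|c| !processed.contains(c)));
        }
    }
    order
}

fn main() {
    // Tiny two-level DAG: 1 -> {2, 3}, 2 -> {4}.
    let links = HashMap::from([(1, vec![2, 3]), (2, vec![4])]);
    assert_eq!(traverse(&links, 1).len(), 4);
}
```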
```diff
@@ -164,9 +195,11 @@ where
 }

 /// Function verifies that a given CID matches the CID for the CAR file in the given reader
-pub async fn verify_cid<R: AsyncRead + Unpin>(reader: R, contents_cid: Cid) -> Result<(), Error> {
-    let mut reader = Reader::new(BufReader::new(reader));
+pub async fn verify_cid<R>(reader: R, contents_cid: Cid) -> Result<(), Error>
+where
+    R: AsyncRead + AsyncSeek + Unpin,
+{
+    let mut reader = Reader::new(reader);
     reader.verify_cid(contents_cid).await
 }
```
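A hedged usage sketch of the updated free function. The practical consequence of the new bounds is that the reader must now also be seekable, which `tokio::fs::File` satisfies (a plain non-seekable stream would no longer compile). The path and CID are placeholders, and the re-exports are the ones visible in the diffs:

```rust
use mater::{verify_cid, Cid};
use tokio::fs::File;

// Placeholder path and CID, for illustration only.
async fn check_car(path: &str, expected: Cid) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open(path).await?; // `File` is AsyncRead + AsyncSeek + Unpin
    verify_cid(file, expected).await?;
    Ok(())
}
```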
```diff
@@ -175,11 +208,13 @@ mod tests {
     use std::{io::Cursor, path::PathBuf, str::FromStr};

     use ipld_core::cid::Cid;
+    use ipld_core::codec::Codec;
+    use ipld_dagpb::{DagPbCodec, PbNode};
     use sha2::Sha256;
     use tokio::{fs::File, io::AsyncSeekExt};

     use crate::{
-        multicodec::{generate_multihash, RAW_CODE, SHA_256_CODE},
+        multicodec::{generate_multihash, DAG_PB_CODE, RAW_CODE, SHA_256_CODE},
         v2::{index::Index, reader::Reader},
         verify_cid, Error,
     };
```
```diff
@@ -438,4 +473,48 @@ mod tests {
         assert_eq!(fst[0].entries.len(), 4);
     }

+    #[tokio::test]
+    async fn test_dag_pb_links() {
+        let file = File::open("tests/fixtures/car_v2/spaceglenda.car")
+            .await
+            .unwrap();
+        let mut reader = Reader::new(file);
+
+        reader.read_pragma().await.unwrap();
+        reader.read_header().await.unwrap();
+
+        let mut found_dag_pb = false;
+        let mut total_links = 0;
+
+        while let Ok((cid, data)) = reader.read_block().await {
+            if cid.codec() == DAG_PB_CODE {
+                found_dag_pb = true;
+                let reader = std::io::BufReader::new(&data[..]);
+
+                match DagPbCodec::decode(reader) {
+                    Ok(node) => {
+                        let pb_node: PbNode = node;
+                        if !pb_node.links.is_empty() {
+                            total_links += pb_node.links.len();
+                        }
+
+                        // Verify each link
+                        for link in pb_node.links {
+                            assert!(
+                                !link.cid.to_string().is_empty(),
+                                "Link should have valid CID"
+                            );
+                        }
+                    }
+                    Err(err) => {
+                        panic!("Failed to decode DAG-PB node: {}", err);
+                    }
+                }
+            }
+        }
+
+        assert!(found_dag_pb, "No DAG-PB nodes found in test file");
+        assert!(total_links > 0, "No links found in DAG-PB nodes");
+    }
 }
```
```diff
@@ -10,7 +10,9 @@ use axum::{
 };
 use futures::{TryFutureExt, TryStreamExt};
 use mater::Cid;
-use polka_storage_provider_common::commp::{commp, CommPError};
+use mater::create_filestore;
+use polka_storage_provider_common::commp::commp;
+use polka_storage_provider_common::commp::{calculate_piece_commitment, CommPError};
 use primitives::{commitment::piece::PaddedPieceSize, proofs::RegisteredPoStProof};
 use tokio::{
     fs::{self, File},
```
```diff
@@ -23,6 +25,8 @@ use tokio_util::{
 use tower_http::trace::TraceLayer;
 use uuid::Uuid;

+use mater::{DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH};
+
 #[cfg(feature = "delia")]
 mod delia_imports {
     pub use axum::{http::Method, response::Json, routing::post};
```
```diff
@@ -115,8 +119,13 @@ fn configure_router(state: Arc<StorageServerState>) -> Router {

 #[cfg(not(feature = "delia"))]
 fn config_non_delia(state: Arc<StorageServerState>) -> Router {
+    // Type annotation required to satisfy Send bounds needed for UnixFS processing
+    // across async operations and thread boundaries
     Router::new()
-        .route("/upload/:cid", put(upload))
+        .route(
+            "/upload/:cid",
+            put(upload as fn(State<Arc<StorageServerState>>, Path<String>, Request<Body>) -> _),
+        )
         .route("/download/:cid", get(download))
        .with_state(state)
        .layer(
```
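The `as fn(...) -> _` cast is an ordinary function-item-to-function-pointer coercion; spelling out the full signature pins the handler to a concrete type at the registration site, which is where the compiler checks the `Send` bounds. A standalone sketch of the same coercion, away from axum (the names here are invented for illustration):

```rust
use std::future::Future;

async fn handler(x: u32) -> String {
    format!("{x}")
}

// Accepts any callable whose returned future is `Send`, much like a router does.
fn register<F, Fut>(f: F) -> F
where
    F: Fn(u32) -> Fut,
    Fut: Future<Output = String> + Send,
{
    f
}

fn main() {
    // Mirrors `put(upload as fn(State<...>, Path<String>, Request<Body>) -> _)`:
    // the cast turns a unique fn item into a plain `fn` pointer with a named signature.
    let _pinned = register(handler as fn(u32) -> _);
}
```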
```diff
@@ -160,9 +169,9 @@ fn configure_router(state: Arc<StorageServerState>) -> Router {
 /// ```
 #[tracing::instrument(skip_all, fields(cid))]
 async fn upload(
-    ref s @ State(ref state): State<Arc<StorageServerState>>,
+    State(state): State<Arc<StorageServerState>>,
     Path(cid): Path<String>,
-    request: Request,
+    request: Request<Body>,
 ) -> Result<String, (StatusCode, String)> {
     let deal_cid = cid::Cid::from_str(&cid).map_err(|err| {
         tracing::error!(cid, "failed to parse cid");
```

> Review comment: This change seems arbitrary.
>
> Author reply: It's the simpler, more idiomatic pattern for handling Axum state; the old way was over-complicated imo.
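For context on the exchange above: destructuring the extractor in the signature is the pattern the axum docs use, while `ref s @ State(ref state)` binds both the whole extractor and its inner value, which was only useful so `s` could be reused for `Multipart::from_request`. A minimal sketch of the idiomatic form, assuming a bare axum app with a hypothetical `AppState`:

```rust
use std::sync::Arc;

use axum::extract::State;

struct AppState {
    greeting: String,
}

// Idiomatic axum: destructure `State` directly in the handler signature.
async fn hello(State(state): State<Arc<AppState>>) -> String {
    state.greeting.clone()
}

fn app(state: Arc<AppState>) -> axum::Router {
    axum::Router::new()
        .route("/", axum::routing::get(hello))
        .with_state(state)
}
```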
```diff
@@ -193,20 +202,25 @@ async fn upload(

     // Branching needed here since the resulting `StreamReader`s don't have the same type
     let file_cid = if request.headers().contains_key("Content-Type") {
-        // Handle multipart forms
-        let mut multipart = Multipart::from_request(request, &s)
+        // Handle the multipart data
+        let mut multipart = Multipart::from_request(request, &state)
             .await
             .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
-        let Some(field) = multipart
+
+        // Get the field data
+        let field_bytes = multipart
             .next_field()
-            .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))
-            .await?
-        else {
-            return Err((StatusCode::BAD_REQUEST, "empty request".to_string()));
-        };
+            .await
+            .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?
+            .ok_or_else(|| (StatusCode::BAD_REQUEST, "empty request".to_string()))?
+            .bytes()
+            .await
+            .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
+
+        // Create reader from the field data
+        let reader = std::io::Cursor::new(field_bytes);

-        let field_reader = StreamReader::new(field.map_err(std::io::Error::other));
-        stream_contents_to_car(state.car_piece_storage_dir.clone().as_ref(), field_reader)
+        stream_contents_to_car(state.car_piece_storage_dir.clone().as_ref(), reader)
             .await
             .map_err(|err| {
                 tracing::error!(%err, "failed to store file into CAR archive");
```
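One consequence of this change worth noting: `.bytes()` collects the entire multipart field into memory before it is wrapped in a `std::io::Cursor`, whereas the previous `StreamReader` streamed it. The `Cursor` satisfies the new bounds because tokio implements `AsyncRead` for `std::io::Cursor<T>` where `T: AsRef<[u8]> + Unpin`, and an owned buffer is trivially `Send + 'static`. A small sketch:

```rust
use tokio::io::{AsyncRead, AsyncReadExt};

// Compile-time check that a type meets the bounds `stream_contents_to_car` now requires.
fn assert_bounds<R: AsyncRead + Unpin + Send + 'static>(reader: R) -> R {
    reader
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // An owned, in-memory buffer, like the collected multipart field bytes.
    let mut reader = assert_bounds(std::io::Cursor::new(b"field contents".to_vec()));

    let mut out = Vec::new();
    reader.read_to_end(&mut out).await?;
    assert_eq!(out, b"field contents");
    Ok(())
}
```

The trade-off is simplicity versus streaming: large uploads are now held entirely in memory for the duration of the conversion.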
```diff
@@ -366,13 +380,21 @@ fn content_path(folder: &std::path::Path, cid: Cid) -> (String, PathBuf) {
     (name, path)
 }

-/// Reads bytes from the source and writes them to a CAR file.
+/// Converts a source stream into a CARv2 file and writes it to an output stream.
+///
+/// Send + 'static bounds are required because the UnixFS processing involves:
+/// - Async stream processing that may cross thread boundaries
+/// - State management for DAG construction and deduplication
+/// - Block tracking that must be thread-safe
+///
+/// The expanded trait bounds ensure that all data can be safely moved between
+/// threads during async operations.
 async fn stream_contents_to_car<R>(
     folder: &std::path::Path,
     source: R,
 ) -> Result<Cid, Box<dyn std::error::Error>>
 where
-    R: AsyncRead + Unpin,
+    R: AsyncRead + Unpin + Send + 'static,
 {
     // Temp file which will be used to store the CAR file content. The temp
     // directory has a randomized name and is created in the same folder as the
```
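The doc comment gives the rationale in prose; the canonical situation that forces such bounds is handing the reader to a spawned task. A sketch (not the PR's code) of the constraint, assuming the processing runs on another task:

```rust
use tokio::io::AsyncRead;
use tokio::task::JoinHandle;

// Anything moved into `tokio::spawn` must be `Send` (the task may hop threads)
// and `'static` (the task can outlive the caller); these are the same bounds
// `stream_contents_to_car` now places on its source.
fn process_in_background<R>(source: R) -> JoinHandle<()>
where
    R: AsyncRead + Unpin + Send + 'static,
{
    tokio::spawn(async move {
        let _reader = source; // build the UnixFS DAG from `source` here
    })
}
```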
```diff
@@ -384,7 +406,14 @@ where
     // Stream the body from source to the temp file.
     let file = File::create(&temp_file_path).await?;
     let writer = BufWriter::new(file);
-    let cid = mater::create_filestore(source, writer, mater::Config::default()).await?;
+
+    let config = mater::Config::Balanced {
+        chunk_size: DEFAULT_CHUNK_SIZE,
+        tree_width: DEFAULT_TREE_WIDTH,
+        raw_mode: false, // Default to UnixFS
+    };
+
+    let cid = create_filestore(source, writer, config).await?;
     tracing::trace!("finished writing the CAR archive");

     // If the file is successfully written, we can now move it to the final
```
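Putting the pieces together, a hedged usage sketch of `create_filestore` with the `Balanced` configuration. The variant, field names, and re-exports are taken from this diff; the file paths are placeholders, and `raw_mode: true` would presumably keep the previous raw (non-UnixFS) behaviour:

```rust
use mater::{create_filestore, Cid, Config, DEFAULT_CHUNK_SIZE, DEFAULT_TREE_WIDTH};
use tokio::{fs::File, io::BufWriter};

// Placeholder paths; mirrors the server's call as a standalone function.
async fn wrap_as_unixfs_car() -> Result<Cid, Box<dyn std::error::Error>> {
    let source = File::open("input.bin").await?;
    let writer = BufWriter::new(File::create("output.car").await?);

    let config = Config::Balanced {
        chunk_size: DEFAULT_CHUNK_SIZE,
        tree_width: DEFAULT_TREE_WIDTH,
        raw_mode: false, // wrap the content in UnixFS, as the server now does
    };

    Ok(create_filestore(source, writer, config).await?)
}
```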
> Review comment: I'm not the most versed in async Rust, could you elaborate why you needed these bounds?
>
> Author reply: Added code comments.