src: experimental http client support #133

Merged: 1 commit on May 26, 2025
1 change: 1 addition & 0 deletions .github/workflows/publish.yml
@@ -19,5 +19,6 @@ jobs:
- run: cargo publish -p composefs-oci
- run: cargo publish -p composefs-boot
- run: cargo publish -p composefs-fuse
- run: cargo publish -p composefs-http
- run: cargo publish -p cfsctl
- run: cargo publish -p composefs-setup-root
1 change: 1 addition & 0 deletions Cargo.toml
@@ -18,6 +18,7 @@ unsafe_code = "deny" # https://github.com/containers/composefs-rs/issues/123
composefs = { version = "0.3.0", path = "crates/composefs", default-features = false }
composefs-oci = { version = "0.3.0", path = "crates/composefs-oci", default-features = false }
composefs-boot = { version = "0.3.0", path = "crates/composefs-boot", default-features = false }
composefs-http = { version = "0.3.0", path = "crates/composefs-http", default-features = false }

[profile.dev.package.sha2]
# this is *really* slow otherwise
2 changes: 2 additions & 0 deletions crates/cfsctl/Cargo.toml
@@ -12,6 +12,7 @@ version.workspace = true

[features]
default = ['pre-6.15', 'oci']
http = ['composefs-http']
oci = ['composefs-oci']
rhel9 = ['composefs/rhel9']
'pre-6.15' = ['composefs/pre-6.15']
@@ -22,6 +23,7 @@ clap = { version = "4.0.1", default-features = false, features = ["std", "help",
composefs = { workspace = true }
composefs-boot = { workspace = true }
composefs-oci = { workspace = true, optional = true }
composefs-http = { workspace = true, optional = true }
env_logger = { version = "0.11.0", default-features = false }
hex = { version = "0.4.0", default-features = false }
rustix = { version = "1.0.0", default-features = false, features = ["fs", "process"] }
11 changes: 11 additions & 0 deletions crates/cfsctl/src/main.rs
@@ -140,6 +140,11 @@ enum Command {
ImageObjects {
name: String,
},
#[cfg(feature = "http")]
Fetch {
url: String,
name: String,
},
}

fn verity_opt(opt: &Option<String>) -> Result<Option<Sha256HashValue>> {
@@ -344,6 +349,12 @@ async fn main() -> Result<()> {
Command::GC => {
repo.gc()?;
}
#[cfg(feature = "http")]
Command::Fetch { url, name } => {
let (sha256, verity) = composefs_http::download(&url, &name, Arc::new(repo)).await?;
println!("sha256 {}", hex::encode(sha256));
println!("verity {}", verity.to_hex());
}
}
Ok(())
}
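
The new `Fetch` subcommand is compiled only when cfsctl is built with the `http` feature. Below is a minimal, self-contained sketch of this feature-gating pattern with clap (hypothetical names, not code from this PR): the `#[cfg]` attribute removes both the enum variant and its match arm when the feature is disabled, so the optional dependency is never linked.

```rust
use clap::{Parser, Subcommand};

#[derive(Parser)]
struct Args {
    #[command(subcommand)]
    command: Command,
}

#[derive(Subcommand)]
enum Command {
    /// Always available.
    Version,
    /// Exists only when built with `--features http`.
    #[cfg(feature = "http")]
    Fetch { url: String, name: String },
}

fn main() {
    match Args::parse().command {
        Command::Version => println!("0.3.0"),
        #[cfg(feature = "http")]
        Command::Fetch { url, name } => println!("would fetch {name} from {url}"),
    }
}
```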
27 changes: 27 additions & 0 deletions crates/composefs-http/Cargo.toml
@@ -0,0 +1,27 @@
[package]
name = "composefs-http"
description = "HTTP downloader for composefs repositories"
keywords = ["composefs", "http"]

edition.workspace = true
license.workspace = true
readme.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true

[dependencies]
anyhow = { version = "1.0.87", default-features = false }
bytes = { version = "1.7.1", default-features = false }
composefs = { workspace = true }
hex = { version = "0.4.0", default-features = false }
indicatif = { version = "0.17.0", default-features = false }
reqwest = { version = "0.12.15", features = ["zstd"] }
sha2 = { version = "0.10.1", default-features = false }
tokio = { version = "1.24.2", default-features = false }

[dev-dependencies]
similar-asserts = "1.7.0"

[lints]
workspace = true
246 changes: 246 additions & 0 deletions crates/composefs-http/src/lib.rs
@@ -0,0 +1,246 @@
use std::{
collections::{HashMap, HashSet},
fs::File,
io::Read,
sync::Arc,
};

use anyhow::{bail, Result};
use bytes::Bytes;
use indicatif::{ProgressBar, ProgressStyle};
use reqwest::{Client, Response, Url};
use sha2::{Digest, Sha256};
use tokio::task::JoinSet;

use composefs::{
fsverity::FsVerityHashValue,
repository::Repository,
splitstream::{DigestMapEntry, SplitStreamReader},
util::Sha256Digest,
};

struct Downloader<ObjectID: FsVerityHashValue> {
client: Client,
repo: Arc<Repository<ObjectID>>,
url: Url,
}

impl<ObjectID: FsVerityHashValue> Downloader<ObjectID> {
fn is_symlink(response: &Response) -> bool {
let Some(content_type_header) = response.headers().get("Content-Type") else {
return false;
};

let Ok(content_type) = content_type_header.to_str() else {
return false;
};

["text/x-symlink-target"].contains(&content_type)
}

async fn fetch(&self, dir: &str, name: &str) -> Result<(Bytes, bool)> {
let object_url = self.url.join(dir)?.join(name)?;
let request = self.client.get(object_url.clone()).build()?;
let response = self.client.execute(request).await?;
response.error_for_status_ref()?;
let is_symlink = Self::is_symlink(&response);
Ok((response.bytes().await?, is_symlink))
}

async fn ensure_object(&self, id: &ObjectID) -> Result<bool> {
if self.repo.open_object(id).is_err() {
let (data, _is_symlink) = self.fetch("objects/", &id.to_object_pathname()).await?;
let actual_id = self.repo.ensure_object_async(data.into()).await?;
if actual_id != *id {
bail!("Downloaded {id:?} but it has fs-verity {actual_id:?}");
}
Ok(true)
} else {
Ok(false)
}
}

fn open_splitstream(&self, id: &ObjectID) -> Result<SplitStreamReader<File, ObjectID>> {
SplitStreamReader::new(File::from(self.repo.open_object(id)?))
}

fn read_object(&self, id: &ObjectID) -> Result<Vec<u8>> {
let mut data = vec![];
File::from(self.repo.open_object(id)?).read_to_end(&mut data)?;
Ok(data)
}

async fn ensure_stream(self: &Arc<Self>, name: &str) -> Result<(Sha256Digest, ObjectID)> {
let progress = ProgressBar::new(2); // the first object gets "ensured" twice
progress.set_style(
ProgressStyle::with_template(
"[eta {eta}] {bar:40.cyan/blue} Fetching {pos} / {len} splitstreams",
)
.unwrap()
.progress_chars("##-"),
);

// Ideally we'll get a symlink, but we might get the data directly
let (data, is_symlink) = self.fetch("streams/", name).await?;
let my_id = if is_symlink {
ObjectID::from_object_pathname(&data)?
} else {
self.repo.ensure_object(&data)?
};
progress.inc(1);

let mut objects_todo = HashSet::new();

// TODO: if 'name' looks sha256ish then we ought to use it instead of None?
let mut splitstreams = HashMap::from([(my_id.clone(), None)]);
let mut splitstreams_todo = vec![my_id.clone()];

// Recursively fetch all splitstreams
// TODO: make this parallel, at least the ensure_object() part...
while let Some(id) = splitstreams_todo.pop() {
// this is the slow part (downloads, writing to disk, etc.)
if self.ensure_object(&id).await? {
progress.inc(1);
} else {
progress.dec_length(1);
}

// this part is fast: it only touches the header
let mut reader = self.open_splitstream(&id)?;
for DigestMapEntry { verity, body } in &reader.refs.map {
match splitstreams.insert(verity.clone(), Some(*body)) {
// This is the (normal) case if we encounter a splitstream we didn't see yet...
None => {
splitstreams_todo.push(verity.clone());
progress.inc_length(1);
}

// This is the case where we've already been asked to fetch this stream. We'll
// verify the SHA-256 content hashes later (after we get all the objects) so we
// need to make sure that all referents of this stream agree on what that is.
Some(Some(previous)) => {
if previous != *body {
bail!(
"Splitstream with verity {verity:?} has different body hashes {} and {}",
hex::encode(previous),
hex::encode(body)
);
}
}

// This case should really be absolutely impossible: the only None value we
// record is for the original stream, and if we somehow managed to get back
// there via object IDs (which we check on download) then it means someone
// managed to construct two self-referential content-addressed objects...
Some(None) => bail!("Splitstream attempts to include itself recursively"),
}
}

// This part is medium-fast: it needs to iterate the entire stream
reader.get_object_refs(|id| {
if !splitstreams.contains_key(id) {
objects_todo.insert(id.clone());
}
})?;
}

progress.finish();

let progress = ProgressBar::new(objects_todo.len() as u64);
progress.set_style(
ProgressStyle::with_template(
"[eta {eta}] {bar:40.cyan/blue} Fetching {pos} / {len} objects",
)
.unwrap()
.progress_chars("##-"),
);

// Fetch all the objects
let mut set = JoinSet::<Result<bool>>::new();
let mut iter = objects_todo.into_iter();

// Queue up 100 initial requests
// See SETTINGS_MAX_CONCURRENT_STREAMS in RFC 7540
// We might actually want to increase this...
for id in iter.by_ref().take(100) {
let self_ = Arc::clone(self);
set.spawn(async move { self_.ensure_object(&id).await });
}

// Collect results for tasks that finish. For each finished task, add another (if there
// are any).
while let Some(result) = set.join_next().await {
if result?? {
// a download
progress.inc(1);
} else {
// a not-download
progress.dec_length(1);
}

if let Some(id) = iter.next() {
let self_ = Arc::clone(self);
set.spawn(async move { self_.ensure_object(&id).await });
}
}

progress.finish();

// Now that we have all of the objects, we can verify that the merged-content of each
// splitstream corresponds to its claimed body content checksum, if any...
let progress = ProgressBar::new(splitstreams.len() as u64);
progress.set_style(
ProgressStyle::with_template(
"[eta {eta}] {bar:40.cyan/blue} Verifying {pos} / {len} splitstreams",
)
.unwrap()
.progress_chars("##-"),
);

let mut my_sha256 = None;
// TODO: This can definitely happen in parallel...
for (id, expected_checksum) in splitstreams {
let mut reader = self.open_splitstream(&id)?;
let mut context = Sha256::new();
reader.cat(&mut context, |id| self.read_object(id))?;
let measured_checksum: Sha256Digest = context.finalize().into();

if let Some(expected) = expected_checksum {
if measured_checksum != expected {
bail!(
"Splitstream id {id:?} should have checksum {} but is actually {}",
hex::encode(expected),
hex::encode(measured_checksum)
);
}
}

if id == my_id {
my_sha256 = Some(measured_checksum);
}

progress.inc(1);
}

progress.finish();

// We've definitely set this by now: `my_id` is in `splitstreams`.
let my_sha256 = my_sha256.unwrap();

Ok((my_sha256, my_id))
}
}

pub async fn download<ObjectID: FsVerityHashValue>(
url: &str,
name: &str,
repo: Arc<Repository<ObjectID>>,
) -> Result<(Sha256Digest, ObjectID)> {
let downloader = Arc::new(Downloader {
client: Client::new(),
repo,
url: Url::parse(url)?,
});

downloader.ensure_stream(name).await
}
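
A hedged usage sketch of the crate's single public entry point, mirroring the cfsctl call site above. How the `Repository` is opened is out of scope here and assumed to be handled by the caller:

```rust
use std::sync::Arc;

use anyhow::Result;
use composefs::{fsverity::FsVerityHashValue, repository::Repository};

// Sketch only: fetch the named splitstream (plus everything it references)
// and report its SHA-256 content hash and fs-verity object ID.
async fn fetch_stream<ObjectID: FsVerityHashValue>(
    repo: Arc<Repository<ObjectID>>,
) -> Result<()> {
    let (sha256, verity) =
        composefs_http::download("https://example.com/repo/", "some-stream", repo).await?;
    println!("sha256 {}", hex::encode(sha256));
    println!("verity {}", verity.to_hex());
    Ok(())
}
```

Note the trailing slash on the base URL: `ensure_stream()` joins `streams/` and `objects/` onto it with `Url::join`, which resolves relative to the last path segment, so a base like `.../repo` without the slash would drop that segment.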
115 changes: 115 additions & 0 deletions examples/s3-uploader.py
@@ -0,0 +1,115 @@
import argparse
import errno
import os
import threading
import concurrent.futures
from collections.abc import Iterator
from typing import Any

import boto3
from botocore.exceptions import ClientError
import magic
import zstd


class MimeDB(threading.local):
db: magic.Magic

def getdb(self) -> magic.Magic:
if not hasattr(self, "db"):
db = magic.open(magic.MAGIC_MIME | magic.MAGIC_MIME_ENCODING)
assert db is not None
result = db.load()
assert result == 0
self.db = db

return self.db

def content_type_for_data(self, data: bytes) -> str:
result = self.getdb().buffer(data)
assert result is not None
result = result.replace("; charset=binary", "") # this is just silly...
return result


mimedb = MimeDB()


def ensure_file(bucket: Any, dirfd: int, path: str) -> None:
obj = bucket and bucket.Object(path)

if obj is not None:
try:
# This is a HEAD request only
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/object/load.html
obj.load()
print(f"{path}: skip")
return # done!
except ClientError as e:
if e.response["Error"]["Code"] != "404":
raise

try:
with open(os.open(path, os.O_RDONLY | os.O_NOFOLLOW, dir_fd=dirfd), "rb") as f:
data = f.read()

content_type = mimedb.content_type_for_data(data)
except OSError as exc:
if exc.errno != errno.ELOOP:
raise

data = os.readlink(path, dir_fd=dirfd).encode('utf-8', 'surrogateescape')
content_type = 'text/x-symlink-target'

# 22 is max but RFC 9659 limits the window size to 8MB, which we get at level 19
# we could possibly tweak other parameters to be more aggressive...
compressed = zstd.compress(data, 19)

# skip compression unless it made a meaningful difference
ratio = len(data) / len(compressed) # len(compressed) won't be 0
if ratio > 1.1:
print(
f"{path}: {content_type} zstd {len(data)}{len(compressed)} ({ratio:.2f}x compression)"
)
content_encoding = "zstd"
data = compressed
else:
print(f"{path}: {content_type} as-is {len(data)} ({ratio:.2f}x)")
content_encoding = ""

if obj is not None:
obj.put(Body=data, ContentEncoding=content_encoding, ContentType=content_type)


def find_not_type_d(dir_fd: int) -> Iterator[str]:
for dirpath, _dirnames, filenames, _dirfd in os.fwalk(dir_fd=dir_fd):
yield from (os.path.normpath(os.path.join(dirpath, name)) for name in filenames)


def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--endpoint", default="https://nbg1.your-objectstorage.com")
parser.add_argument("--bucket", default="lis")
parser.add_argument("-j", "--jobs", type=int, default=64)
parser.add_argument("dir")

args = parser.parse_args()

# https://github.com/boto/boto3/issues/4392
# https://docs.hetzner.com/storage/object-storage/getting-started/using-s3-api-tools/
os.environ["AWS_REQUEST_CHECKSUM_CALCULATION"] = "WHEN_REQUIRED"

dirfd = os.open(args.dir, os.O_PATH | os.O_CLOEXEC)
files = sorted(find_not_type_d(dirfd))
print(f"Found {len(files)} files to process.")

s3: Any = boto3.resource("s3", endpoint_url=args.endpoint)
bucket = s3.Bucket(args.bucket)

with concurrent.futures.ThreadPoolExecutor(max_workers=args.jobs) as executor:
for _ in executor.map(lambda file: ensure_file(bucket, dirfd, file), files):
pass


if __name__ == "__main__":
main()