Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Bytes::from_owner #742

Merged
merged 28 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 158 additions & 1 deletion src/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use core::iter::FromIterator;
use core::mem::{self, ManuallyDrop};
use core::ops::{Deref, RangeBounds};
use core::ptr::NonNull;
use core::{cmp, fmt, hash, ptr, slice, usize};

use alloc::{
Expand Down Expand Up @@ -116,6 +117,7 @@ pub(crate) struct Vtable {
pub to_mut: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> BytesMut,
/// fn(data)
pub is_unique: unsafe fn(&AtomicPtr<()>) -> bool,
pub cheap_into_mut: unsafe fn(&AtomicPtr<()>) -> bool,
/// fn(data, ptr, len)
pub drop: unsafe fn(&mut AtomicPtr<()>, *const u8, usize),
}
Expand Down Expand Up @@ -200,6 +202,69 @@ impl Bytes {
}
}

/// Create [Bytes] with a buffer whose lifetime is controlled
/// via an explicit owner.
///
/// A common use case is to zero-copy construct from mapped memory.
///
/// ```ignore
/// use bytes::Bytes;
/// use std::fs::File;
/// use memmap2::Mmap;
///
/// let file = File::open("upload_bundle.tar.gz")?;
/// let mmap = unsafe { Mmap::map(&file) }?;
/// let b = Bytes::from_owner(mmap);
/// ```
///
/// The `owner` will be transferred to the constructed [Bytes] object, which
/// will ensure it is dropped once all remaining clones of the constructed
/// object are dropped. The owner will then be responsible for dropping the
/// specified region of memory as part of its [Drop] implementation.
///
/// Note that converting [Bytes] constructed from an owner into a [BytesMut]
/// will always create a deep copy of the buffer into newly allocated memory.
pub fn from_owner<T>(owner: T) -> Self
where
T: AsRef<[u8]> + Send + 'static,
{
// Safety & Miri:
// The ownership of `owner` is first transferred to the `Owned` wrapper and `Bytes` object.
// This ensures that the owner is pinned in memory, allowing us to call `.as_ref()` safely
// since the lifetime of the owner is controlled by the lifetime of the new `Bytes` object,
// and the lifetime of the resulting borrowed `&[u8]` matches that of the owner.
// Note that this remains safe so long as we only call `.as_ref()` once.
//
// There are some additional special considerations here:
// * We rely on Bytes's Drop impl to clean up memory should `.as_ref()` panic.
// * Setting the `ptr` and `len` on the bytes object last (after moving the owner to
// Bytes) allows Miri checks to pass since it avoids obtaining the `&[u8]` slice
// from a stack-owned Box.
// More details on this: https://github.com/tokio-rs/bytes/pull/742/#discussion_r1813375863
// and: https://github.com/tokio-rs/bytes/pull/742/#discussion_r1813316032

// Heap-allocate the owner next to its reference count (starting at one, for
// the `Bytes` returned below) and the destructor monomorphised for `T`.
let owned = Box::into_raw(Box::new(Owned {
lifetime: OwnedLifetime {
ref_cnt: AtomicUsize::new(1),
drop: owned_box_and_drop::<T>,
},
owner,
}));

// Hand the allocation to a `Bytes` with an empty, dangling view *before*
// calling into user code, so that dropping `ret` releases the owner if
// `as_ref()` panics.
let mut ret = Bytes {
ptr: NonNull::dangling().as_ptr(),
len: 0,
data: AtomicPtr::new(owned.cast()),
vtable: &OWNED_VTABLE,
};

// SAFETY: `owned` was produced by `Box::into_raw` just above, so it is
// non-null, aligned and points to an initialised `Owned<T>`. `ret` holds the
// only reference to that allocation, so it stays alive for this borrow.
let buf = unsafe { &*owned }.owner.as_ref();
ret.ptr = buf.as_ptr();
ret.len = buf.len();

ret
}

/// Returns the number of bytes contained in this `Bytes`.
///
/// # Examples
Expand Down Expand Up @@ -536,6 +601,9 @@ impl Bytes {
/// If `self` is not unique for the entire original buffer, this will fail
/// and return self.
///
/// This will also always fail if the buffer was constructed via
/// [from_owner](Bytes::from_owner).
///
/// # Examples
///
/// ```
Expand All @@ -545,7 +613,7 @@ impl Bytes {
/// assert_eq!(bytes.try_into_mut(), Ok(BytesMut::from(&b"hello"[..])));
/// ```
pub fn try_into_mut(self) -> Result<BytesMut, Bytes> {
if self.is_unique() {
if unsafe { (self.vtable.cheap_into_mut)(&self.data) } {
amunra marked this conversation as resolved.
Show resolved Hide resolved
Ok(self.into())
} else {
Err(self)
Expand Down Expand Up @@ -986,6 +1054,7 @@ const STATIC_VTABLE: Vtable = Vtable {
to_vec: static_to_vec,
to_mut: static_to_mut,
is_unique: static_is_unique,
cheap_into_mut: static_is_unique,
drop: static_drop,
};

Expand All @@ -1012,13 +1081,99 @@ unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) {
// nothing to drop for &'static [u8]
}

// ===== impl OwnedVtable =====

/// Type-erased header stored at the start of every `Owned<T>` allocation.
///
/// The vtable functions only ever see a `*mut ()`; they cast it to
/// `*mut OwnedLifetime` to reach the reference count and the destructor
/// without knowing the owner type `T`.
#[repr(C)]
struct OwnedLifetime {
/// Number of `Bytes` handles currently sharing the allocation.
ref_cnt: AtomicUsize,
/// Destructor for the whole allocation; receives the same type-erased
/// pointer that is stored in `Bytes::data`.
drop: unsafe fn(*mut ()),
}

/// Heap allocation backing a `Bytes` created by `Bytes::from_owner`.
///
/// `#[repr(C)]` keeps `lifetime` as the first field, which is what allows a
/// pointer to `Owned<T>` to be read as a pointer to `OwnedLifetime`.
#[repr(C)]
struct Owned<T> {
/// Reference count and type-specific destructor; must stay the first field.
lifetime: OwnedLifetime,
/// The user-supplied value that keeps the byte buffer alive.
owner: T,
}

/// Rebuilds the `Box<Owned<T>>` behind `ptr` and drops it, which runs the
/// owner's destructor and frees the allocation.
///
/// # Safety
///
/// `ptr` must have been obtained from `Box::into_raw` on a `Box<Owned<T>>`
/// with this exact `T`, and must not be used again afterwards.
unsafe fn owned_box_and_drop<T>(ptr: *mut ()) {
    drop(Box::from_raw(ptr.cast::<Owned<T>>()));
}

unsafe fn owned_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
let owned = data.load(Ordering::Acquire);
amunra marked this conversation as resolved.
Show resolved Hide resolved
let ref_cnt = &(*owned.cast::<OwnedLifetime>()).ref_cnt;
let old_cnt = ref_cnt.fetch_add(1, Ordering::Relaxed);
if old_cnt > usize::MAX >> 1 {
crate::abort()
}

Bytes {
ptr,
len,
data: AtomicPtr::new(owned as _),
vtable: &OWNED_VTABLE,
}
}

/// Vtable `to_vec` for owner-backed `Bytes`: copies the `len` bytes starting
/// at `ptr` into a newly allocated `Vec<u8>`. The owner itself is untouched.
///
/// # Safety
///
/// `ptr` must be valid for reads of `len` bytes for the duration of the call.
unsafe fn owned_to_vec(_data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
    Vec::from(slice::from_raw_parts(ptr, len))
}

/// Vtable `to_mut` for owner-backed `Bytes`: builds a `BytesMut` from a deep
/// copy of the view and then gives up this handle's reference to the owner.
///
/// # Safety
///
/// `data` must hold a pointer to a live `Owned<T>` allocation for which the
/// caller owns a counted reference, and `ptr` must be valid for reads of
/// `len` bytes. The caller's reference is consumed by this call.
unsafe fn owned_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
    // Copy first: releasing the reference below may destroy the owner, after
    // which `ptr` must no longer be read.
    let bytes_mut = BytesMut::from_vec(owned_to_vec(data, ptr, len));
    owned_drop_impl(data.load(Ordering::Acquire));
    bytes_mut
}

unsafe fn owned_is_unique(data: &AtomicPtr<()>) -> bool {
let owned = data.load(Ordering::Acquire);
amunra marked this conversation as resolved.
Show resolved Hide resolved
let ref_cnt = &(*owned.cast::<OwnedLifetime>()).ref_cnt;
ref_cnt.load(Ordering::Relaxed) == 1
amunra marked this conversation as resolved.
Show resolved Hide resolved
}

/// Vtable `cheap_into_mut` for owner-backed `Bytes`.
///
/// Always returns `false`, regardless of the reference count, so that
/// `Bytes::try_into_mut` refuses the conversion for these buffers.
unsafe fn owned_cheap_into_mut(_data: &AtomicPtr<()>) -> bool {
// Since the memory's ownership is tied to an external owner
// it is never zero-copy to create a BytesMut.
false
}

unsafe fn owned_drop_impl(owned: *mut ()) {
let lifetime = owned.cast::<OwnedLifetime>();
let ref_cnt = &(*lifetime).ref_cnt;

let old_cnt = ref_cnt.fetch_sub(1, Ordering::Acquire);
if old_cnt != 1 {
return;
}
amunra marked this conversation as resolved.
Show resolved Hide resolved

let drop = &(*lifetime).drop;
drop(owned)
amunra marked this conversation as resolved.
Show resolved Hide resolved
}

unsafe fn owned_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) {
let owned = data.load(Ordering::Acquire);
amunra marked this conversation as resolved.
Show resolved Hide resolved
owned_drop_impl(owned);
}

/// Vtable installed by `Bytes::from_owner`; every entry dispatches to the
/// `owned_*` functions above, which operate on a reference-counted
/// `Owned<T>` allocation.
static OWNED_VTABLE: Vtable = Vtable {
clone: owned_clone,
to_vec: owned_to_vec,
to_mut: owned_to_mut,
is_unique: owned_is_unique,
// Unlike the other vtables, this is not the uniqueness check: converting
// an owner-backed buffer into `BytesMut` always copies.
cheap_into_mut: owned_cheap_into_mut,
drop: owned_drop,
};

// ===== impl PromotableVtable =====

/// Vtable wiring the `promotable_even_*` functions.
// NOTE(review): what distinguishes the "even" from the "odd" promotable
// representation is defined outside this view — confirm before relying on it.
static PROMOTABLE_EVEN_VTABLE: Vtable = Vtable {
clone: promotable_even_clone,
to_vec: promotable_even_to_vec,
to_mut: promotable_even_to_mut,
is_unique: promotable_is_unique,
// Reuses the uniqueness check as the answer to "is a conversion to
// `BytesMut` cheap?".
cheap_into_mut: promotable_is_unique,
drop: promotable_even_drop,
};

Expand All @@ -1027,6 +1182,7 @@ static PROMOTABLE_ODD_VTABLE: Vtable = Vtable {
to_vec: promotable_odd_to_vec,
to_mut: promotable_odd_to_mut,
is_unique: promotable_is_unique,
cheap_into_mut: promotable_is_unique,
drop: promotable_odd_drop,
};

Expand Down Expand Up @@ -1203,6 +1359,7 @@ static SHARED_VTABLE: Vtable = Vtable {
to_vec: shared_to_vec,
to_mut: shared_to_mut,
is_unique: shared_is_unique,
cheap_into_mut: shared_is_unique,
drop: shared_drop,
};

Expand Down
1 change: 1 addition & 0 deletions src/bytes_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1776,6 +1776,7 @@ static SHARED_VTABLE: Vtable = Vtable {
to_vec: shared_v_to_vec,
to_mut: shared_v_to_mut,
is_unique: shared_v_is_unique,
cheap_into_mut: shared_v_is_unique,
drop: shared_v_drop,
};

Expand Down
139 changes: 139 additions & 0 deletions tests/test_bytes.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#![warn(rust_2018_idioms)]

use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use std::panic::{self, AssertUnwindSafe};
use std::usize;

const LONG: &[u8] = b"mary had a little lamb, little lamb, little lamb";
Expand Down Expand Up @@ -1479,3 +1482,139 @@ fn split_to_empty_addr_mut() {
let _ = &empty_start[..];
let _ = &buf[..];
}

/// Test owner for `Bytes::from_owner` that records how often it is dropped
/// and can be told to panic from `as_ref()`.
#[derive(Clone)]
struct OwnedTester<const L: usize> {
// Bytes handed out by `as_ref()`.
buf: [u8; L],
// Shared counter, incremented once each time an `OwnedTester` is dropped.
drop_count: Arc<AtomicUsize>,
// When `true`, `as_ref()` panics instead of returning `buf`.
pub panic_as_ref: bool,
}

impl<const L: usize> OwnedTester<L> {
fn new(buf: [u8; L], drop_count: Arc<AtomicUsize>) -> Self {
Self {
buf,
drop_count,
panic_as_ref: false,
}
}
}

impl<const L: usize> AsRef<[u8]> for OwnedTester<L> {
    /// Exposes the wrapped buffer, or panics when the tester has been
    /// configured to simulate a faulty owner.
    fn as_ref(&self) -> &[u8] {
        if !self.panic_as_ref {
            return &self.buf[..];
        }
        panic!("test-triggered panic in `AsRef<[u8]> for OwnedTester`")
    }
}

impl<const L: usize> Drop for OwnedTester<L> {
fn drop(&mut self) {
self.drop_count.fetch_add(1, Ordering::AcqRel);
amunra marked this conversation as resolved.
Show resolved Hide resolved
}
}

// `is_unique()` on an owner-backed `Bytes` tracks the number of live
// handles: unique before cloning, shared while both handles exist, and
// unique again once one of them has been dropped.
#[test]
fn owned_is_unique() {
let b1 = Bytes::from_owner([1, 2, 3, 4, 5, 6, 7]);
assert!(b1.is_unique());
let b2 = b1.clone();
// Two handles now share the owner, so neither is unique.
assert!(!b1.is_unique());
assert!(!b2.is_unique());
drop(b1);
// Only `b2` is left.
assert!(b2.is_unique());
}

// Cloning an owner-backed `Bytes` must not copy the data: both handles show
// the owner's contents through the very same pointer and length.
#[test]
fn owned_buf_sharing() {
    let source = [1, 2, 3, 4, 5, 6, 7];
    let first = Bytes::from_owner(source);
    let second = first.clone();

    // Contents match the array that was moved into the owner.
    assert_eq!(&source[..], &first[..]);
    assert_eq!(&source[..], &second[..]);

    // Same view, not a copy.
    assert_eq!(first.as_ptr(), second.as_ptr());
    assert_eq!(first.len(), second.len());
    assert_eq!(first.len(), source.len());
}

// Slicing an owner-backed `Bytes` yields a sub-view into the owner's memory:
// same bytes, pointer advanced by the start offset, length shortened.
#[test]
fn owned_buf_slicing() {
    let whole = Bytes::from_owner(SHORT);
    assert_eq!(SHORT, &whole[..]);

    // Trim one byte off each end.
    let inner_end = whole.len() - 1;
    let inner = whole.slice(1..inner_end);

    assert_eq!(&SHORT[1..(SHORT.len() - 1)], inner);
    assert_eq!(unsafe { SHORT.as_ptr().add(1) }, inner.as_ptr());
    assert_eq!(SHORT.len() - 2, inner.len());
}

// The owner must stay alive while any clone or slice of the `Bytes` exists
// and must be dropped exactly once when the last of them goes away.
#[test]
fn owned_dropped_exactly_once() {
let buf: [u8; 5] = [1, 2, 3, 4, 5];
let drop_counter = Arc::new(AtomicUsize::new(0));
let owner = OwnedTester::new(buf, drop_counter.clone());
let b1 = Bytes::from_owner(owner);
let b2 = b1.clone();
assert_eq!(drop_counter.load(Ordering::Acquire), 0);
drop(b1);
// `b2` still references the owner.
assert_eq!(drop_counter.load(Ordering::Acquire), 0);
let b3 = b2.slice(1..b2.len() - 1);
drop(b2);
// The slice `b3` keeps the owner alive on its own.
assert_eq!(drop_counter.load(Ordering::Acquire), 0);
assert!(b3.is_unique());
drop(b3);
// Last handle gone: the owner has been dropped, and only once.
assert_eq!(drop_counter.load(Ordering::Acquire), 1);
}

// `try_into_mut()` refuses owner-backed buffers even when unique, while the
// `Into<BytesMut>` conversion still works by copying and releases the owner.
#[test]
fn owned_to_mut() {
let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let drop_counter = Arc::new(AtomicUsize::new(0));
let owner = OwnedTester::new(buf, drop_counter.clone());
let b1 = Bytes::from_owner(owner);

// Holding an owner will fail converting to a BytesMut,
// even when the bytes instance is unique.
assert!(b1.is_unique());
let b1 = b1.try_into_mut().unwrap_err();

// That said, it's still possible, just not cheap.
let bm1: BytesMut = b1.into();
let new_buf = &bm1[..];
assert_eq!(new_buf, &buf[..]);

// `.into::<BytesMut>()` has correctly dropped the owner
assert_eq!(drop_counter.load(Ordering::Acquire), 1);
}

// `to_vec()` copies the contents without consuming or sharing the handle:
// the `Bytes` stays unique and the owner is dropped only with the `Bytes`.
#[test]
fn owned_to_vec() {
let buf: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let drop_counter = Arc::new(AtomicUsize::new(0));
let owner = OwnedTester::new(buf, drop_counter.clone());
let b1 = Bytes::from_owner(owner);
assert!(b1.is_unique());

let v1 = b1.to_vec();
// Copying out must not have registered another reference.
assert!(b1.is_unique());
assert_eq!(&v1[..], &buf[..]);
assert_eq!(&v1[..], &b1[..]);

drop(b1);
assert_eq!(drop_counter.load(Ordering::Acquire), 1);
}

// If the owner's `as_ref()` panics inside `Bytes::from_owner`, the owner
// must still be dropped exactly once during unwinding.
#[test]
fn owned_safe_drop_on_as_ref_panic() {
    let contents: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
    let drops = Arc::new(AtomicUsize::new(0));

    let mut faulty_owner = OwnedTester::new(contents, drops.clone());
    faulty_owner.panic_as_ref = true;

    let construct = AssertUnwindSafe(|| {
        let _ = Bytes::from_owner(faulty_owner);
    });
    let outcome = panic::catch_unwind(construct);

    assert!(outcome.is_err());
    assert_eq!(drops.load(Ordering::Acquire), 1);
}