+{
+ type Error = Error;
type BlockSize = N;
fn block_size(&self) -> Self::BlockSize {
- self.shared_inner.borrow().block_size
+ self.shared.borrow().block_size
}
fn num_blocks(&self) -> u64 {
- self.shared_inner.borrow().num_blocks
+ self.shared.borrow().num_blocks
+ }
+}
+
+impl BlockIO
+ for SharedRingBufferBlockIO
+{
+ async fn read_or_write_blocks(
+ &self,
+ start_block_idx: u64,
+ operation: Operation<'_, P>,
+ ) -> Result<(), Self::Error> {
+ self.request(start_block_idx, operation).await
}
+}
- async fn read_blocks(&self, start_block_idx: u64, buf: &mut [u8]) -> Result<(), Self::Error> {
- let sem = self.shared_inner.borrow().queue_guard.clone();
- let permit = sem.acquire().await;
+pub struct RequestFuture<'a, N, P: Access, A: AbstractBounceBufferAllocator, F: FnMut()> {
+ io: &'a SharedRingBufferBlockIO,
+ operation: Operation<'a, P>,
+ request_index: usize,
+ poll_returned_ready: bool,
+}
- let key = {
- let mut inner = self.shared_inner.borrow_mut();
- let range = inner
- .bounce_buffer_allocator
- .allocate(Layout::from_size_align(buf.len(), 1).unwrap())
- .unwrap();
- let key = range.start;
- let req = BlockIORequest::new(
- BlockIORequestStatus::Pending,
- BlockIORequestType::Read,
- start_block_idx.try_into().unwrap(),
- Descriptor::new(
- inner.dma_region_paddr + range.start,
- range.len().try_into().unwrap(),
- 0,
- ),
- );
- inner.request_statuses.add(key, req).unwrap();
- inner.ring_buffers.free_mut().enqueue(req).unwrap();
- inner.ring_buffers.notify().unwrap();
- key
+impl<'a, N, P: Access, A: AbstractBounceBufferAllocator, F: FnMut()> RequestFuture<'a, N, P, A, F> {
+ fn poll_inner<'b>(&'b mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>>
+ where
+ 'a: 'b,
+ {
+ assert!(!self.poll_returned_ready);
+ let ret = match self
+ .io
+ .shared
+ .borrow_mut()
+ .owned
+ .poll_request(
+ self.request_index,
+ &mut PollRequestBuf::new(&mut self.operation),
+ Some(cx.waker().clone()),
+ )
+ .map_err(ErrorOrUserError::unwrap_error)
+ {
+ Ok(val) => val.map_err(Error::from),
+ Err(err) => Poll::Ready(Err(err)),
};
+ if ret.is_ready() {
+ self.poll_returned_ready = true;
+ }
+ ret
+ }
+}
+
+impl<'a, N, P: Access, A: AbstractBounceBufferAllocator, F: FnMut()> Future
+ for RequestFuture<'a, N, P, A, F>
+{
+ type Output = Result<(), Error>;
- future::poll_fn(|cx| {
- let mut inner = self.shared_inner.borrow_mut();
- let completion = ready!(inner.request_statuses.poll(&key, cx.waker()).unwrap());
- assert_eq!(completion.complete, BlockIORequestStatus::Ok);
- let req = completion.value;
- let range_start = req.buf().encoded_addr() - inner.dma_region_paddr;
- let range_end = range_start + usize::try_from(req.buf().len()).unwrap();
- let range = range_start..range_end;
- inner
- .dma_region
- .as_mut_ptr()
- .index(range.clone())
- .copy_into_slice(buf);
- inner.bounce_buffer_allocator.deallocate(range);
- Poll::Ready(())
- })
- .await;
-
- drop(permit); // explicit extent of scope
-
- Ok(())
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.poll_inner(cx)
+ }
+}
+
+impl<'a, N, P: Access, A: AbstractBounceBufferAllocator, F: FnMut()> Drop
+ for RequestFuture<'a, N, P, A, F>
+{
+ fn drop(&mut self) {
+ if !self.poll_returned_ready {
+ self.io
+ .shared
+ .borrow_mut()
+ .owned
+ .cancel_request(self.request_index)
+ .unwrap();
+ }
}
}
diff --git a/crates/sel4-shared-ring-buffer/block-io/src/owned.rs b/crates/sel4-shared-ring-buffer/block-io/src/owned.rs
new file mode 100644
index 000000000..6c63af832
--- /dev/null
+++ b/crates/sel4-shared-ring-buffer/block-io/src/owned.rs
@@ -0,0 +1,374 @@
+use core::alloc::Layout;
+use core::task::{Poll, Waker};
+
+use sel4_async_block_io::{access::Access, Operation};
+use sel4_bounce_buffer_allocator::{AbstractBounceBufferAllocator, BounceBufferAllocator};
+use sel4_externally_shared::ExternallySharedRef;
+use sel4_shared_ring_buffer::{
+ roles::Provide, Descriptor, PeerMisbehaviorError as SharedRingBuffersPeerMisbehaviorError,
+ RingBuffers,
+};
+use sel4_shared_ring_buffer_block_io_types::{
+ BlockIORequest, BlockIORequestStatus, BlockIORequestType,
+};
+use sel4_shared_ring_buffer_bookkeeping::{slot_set_semaphore::*, slot_tracker::*};
+
+pub use crate::errors::{Error, ErrorOrUserError, IOError, PeerMisbehaviorError, UserError};
+
+pub struct OwnedSharedRingBufferBlockIO {
+ dma_region: ExternallySharedRef<'static, [u8]>,
+ bounce_buffer_allocator: BounceBufferAllocator,
+ ring_buffers: RingBuffers<'static, Provide, F, BlockIORequest>,
+ requests: SlotTracker<StateTypesImpl>,
+ slot_set_semaphore: SlotSetSemaphore,
+}
+
+const RING_BUFFERS_SLOT_POOL_INDEX: usize = 0;
+const REQUESTS_SLOT_POOL_INDEX: usize = 1;
+const NUM_SLOT_POOLS: usize = 2;
+
+enum StateTypesImpl {}
+
+impl SlotStateTypes for StateTypesImpl {
+ type Occupied = Occupied;
+}
+
+struct Occupied {
+ req: BlockIORequest,
+ state: OccupiedState,
+}
+
+enum OccupiedState {
+ Pending { waker: Option<Waker> },
+ Canceled,
+ Complete { error: Option<IOError> },
+}
+
+pub enum IssueRequestBuf<'a> {
+ Read { len: usize },
+ Write { buf: &'a [u8] },
+}
+
+impl<'a> IssueRequestBuf<'a> {
+ pub fn new<A: Access>(operation: &'a Operation<'a, A>) -> Self {
+ match operation {
+ Operation::Read { buf, .. } => Self::Read { len: buf.len() },
+ Operation::Write { buf, .. } => Self::Write { buf },
+ }
+ }
+
+ fn len(&self) -> usize {
+ match self {
+ Self::Read { len } => *len,
+ Self::Write { buf } => buf.len(),
+ }
+ }
+
+ fn ty(&self) -> BlockIORequestType {
+ match self {
+ Self::Read { .. } => BlockIORequestType::Read,
+ Self::Write { .. } => BlockIORequestType::Write,
+ }
+ }
+}
+
+pub enum PollRequestBuf<'a> {
+ Read { buf: &'a mut [u8] },
+ Write,
+}
+
+impl<'a> PollRequestBuf<'a> {
+ pub fn new<'b, A: Access>(operation: &'a mut Operation<'b, A>) -> Self
+ where
+ 'b: 'a,
+ {
+ match operation {
+ Operation::Read { buf, .. } => Self::Read { buf },
+ Operation::Write { .. } => Self::Write,
+ }
+ }
+}
+
+impl
+ OwnedSharedRingBufferBlockIO
+{
+ pub fn new(
+ dma_region: ExternallySharedRef<'static, [u8]>,
+ bounce_buffer_allocator: BounceBufferAllocator,
+ mut ring_buffers: RingBuffers<'static, Provide, F, BlockIORequest>,
+ ) -> Self {
+ assert!(ring_buffers.free_mut().is_empty().unwrap());
+ assert!(ring_buffers.used_mut().is_empty().unwrap());
+ let n = ring_buffers.free().capacity();
+ Self {
+ dma_region,
+ bounce_buffer_allocator,
+ ring_buffers,
+ requests: SlotTracker::new_with_capacity((), (), n),
+ slot_set_semaphore: SlotSetSemaphore::new([n, n]),
+ }
+ }
+
+ pub fn slot_set_semaphore(&self) -> &SlotSetSemaphoreHandle {
+ self.slot_set_semaphore.handle()
+ }
+
+ fn report_current_num_free_current_num_free_ring_buffers_slots(
+ &mut self,
+ ) -> Result<(), ErrorOrUserError> {
+ let current_num_free = self.requests.num_free();
+ self.slot_set_semaphore
+ .report_current_num_free_slots(RING_BUFFERS_SLOT_POOL_INDEX, current_num_free)
+ .unwrap();
+ Ok(())
+ }
+
+ fn report_current_num_free_current_num_free_requests_slots(
+ &mut self,
+ ) -> Result<(), ErrorOrUserError> {
+ let current_num_free = self.ring_buffers.free_mut().num_empty_slots()?;
+ self.slot_set_semaphore
+ .report_current_num_free_slots(REQUESTS_SLOT_POOL_INDEX, current_num_free)
+ .unwrap();
+ Ok(())
+ }
+
+ fn can_issue_requests(
+ &mut self,
+ n: usize,
+ ) -> Result<bool, ErrorOrUserError> {
+ let can =
+ self.ring_buffers.free_mut().num_empty_slots()? >= n && self.requests.num_free() >= n;
+ Ok(can)
+ }
+
+ pub fn issue_read_request(
+ &mut self,
+ reservation: &mut SlotSetReservation<'_, S, NUM_SLOT_POOLS>,
+ start_block_idx: u64,
+ num_bytes: usize,
+ ) -> Result<usize, ErrorOrUserError> {
+ self.issue_request(
+ reservation,
+ start_block_idx,
+ &mut IssueRequestBuf::Read { len: num_bytes },
+ )
+ }
+
+ pub fn issue_write_request(
+ &mut self,
+ reservation: &mut SlotSetReservation<'_, S, NUM_SLOT_POOLS>,
+ start_block_idx: u64,
+ buf: &[u8],
+ ) -> Result<usize, ErrorOrUserError> {
+ self.issue_request(
+ reservation,
+ start_block_idx,
+ &mut IssueRequestBuf::Write { buf },
+ )
+ }
+
+ pub fn issue_request<'a>(
+ &mut self,
+ reservation: &mut SlotSetReservation<'_, S, NUM_SLOT_POOLS>,
+ start_block_idx: u64,
+ buf: &mut IssueRequestBuf,
+ ) -> Result<usize, ErrorOrUserError> {
+ if reservation.count() < 1 {
+ return Err(UserError::TooManyOutstandingRequests.into());
+ }
+
+ assert!(self.can_issue_requests(1)?);
+
+ let request_index = self.requests.peek_next_free_index().unwrap();
+
+ let range = self
+ .bounce_buffer_allocator
+ .allocate(Layout::from_size_align(buf.len(), 1).unwrap())
+ .map_err(|_| Error::BounceBufferAllocationError)?;
+
+ if let IssueRequestBuf::Write { buf } = buf {
+ self.dma_region
+ .as_mut_ptr()
+ .index(range.clone())
+ .copy_from_slice(buf);
+ }
+
+ let req = BlockIORequest::new(
+ BlockIORequestStatus::Pending,
+ buf.ty(),
+ start_block_idx.try_into().unwrap(),
+ Descriptor::from_encoded_addr_range(range, request_index),
+ );
+
+ self.requests
+ .occupy(Occupied {
+ req: req.clone(),
+ state: OccupiedState::Pending { waker: None },
+ })
+ .unwrap();
+
+ self.ring_buffers
+ .free_mut()
+ .enqueue_and_commit(req)?
+ .unwrap();
+
+ self.ring_buffers.notify_mut();
+
+ self.slot_set_semaphore.consume(reservation, 1).unwrap();
+
+ Ok(request_index)
+ }
+
+ pub fn cancel_request(&mut self, request_index: usize) -> Result<(), ErrorOrUserError> {
+ let state_value = self.requests.get_state_value_mut(request_index)?;
+ let occupied = state_value.as_occupied()?;
+ match &occupied.state {
+ OccupiedState::Pending { .. } => {
+ occupied.state = OccupiedState::Canceled;
+ }
+ OccupiedState::Complete { .. } => {
+ let range = occupied.req.buf().encoded_addr_range();
+ self.bounce_buffer_allocator.deallocate(range);
+ self.requests.free(request_index, ()).unwrap();
+ self.report_current_num_free_current_num_free_requests_slots()?;
+ }
+ _ => {
+ return Err(UserError::RequestStateMismatch.into());
+ }
+ }
+ Ok(())
+ }
+
+ pub fn poll_read_request(
+ &mut self,
+ request_index: usize,
+ buf: &mut [u8],
+ waker: Option<Waker>,
+ ) -> Result<Poll<Result<(), IOError>>, ErrorOrUserError> {
+ self.poll_request(request_index, &mut PollRequestBuf::Read { buf }, waker)
+ }
+
+ pub fn poll_write_request(
+ &mut self,
+ request_index: usize,
+ waker: Option<Waker>,
+ ) -> Result<Poll<Result<(), IOError>>, ErrorOrUserError> {
+ self.poll_request(request_index, &mut PollRequestBuf::Write, waker)
+ }
+
+ pub fn poll_request(
+ &mut self,
+ request_index: usize,
+ buf: &mut PollRequestBuf,
+ waker: Option