Skip to content

Commit

Permalink
deps: add umio as contrib package
Browse files Browse the repository at this point in the history
  • Loading branch information
da2ce7 committed Aug 19, 2024
1 parent 0e26eb7 commit 98f9b45
Show file tree
Hide file tree
Showing 16 changed files with 721 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[workspace]
members = [
"contrib/umio",
"examples/get_metadata",
"examples/simple_torrent",
"packages/bencode",
Expand Down
19 changes: 19 additions & 0 deletions contrib/umio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
authors = ["Andrew <[email protected]>"]
description = "Message Based Readiness API In Rust"
keywords = ["message", "mio", "readyness"]
name = "umio"
readme = "README.md"

categories.workspace = true
documentation.workspace = true
edition.workspace = true
homepage.workspace = true
license.workspace = true
publish.workspace = true

repository.workspace = true
version.workspace = true

[dependencies]
mio = "0.5"
23 changes: 23 additions & 0 deletions contrib/umio/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
umio-rs
=======
Message Based Readiness API In Rust.

Thin layer over mio for working with a single udp socket while retaining access to timers and event loop channels.


License
-------

Licensed under either of

* Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
* MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)

at your option.

Contribution
------------

Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any
additional terms or conditions.
87 changes: 87 additions & 0 deletions contrib/umio/src/buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#[allow(clippy::module_name_repetitions)]
pub struct BufferPool {
// Use Stack For Temporal Locality
buffers: Vec<Buffer>,
buffer_size: usize,
}

impl BufferPool {
pub fn new(buffer_size: usize) -> BufferPool {
let buffers = Vec::new();

BufferPool { buffers, buffer_size }
}

pub fn pop(&mut self) -> Buffer {
self.buffers.pop().unwrap_or(Buffer::new(self.buffer_size))
}

pub fn push(&mut self, mut buffer: Buffer) {
buffer.reset_position();

self.buffers.push(buffer);
}
}

//----------------------------------------------------------------------------//

/// Reusable region of memory for incoming and outgoing messages.
pub struct Buffer {
buffer: Vec<u8>,
bytes_written: usize,
}

impl Buffer {
fn new(len: usize) -> Buffer {
Buffer {
buffer: vec![0u8; len],
bytes_written: 0,
}
}

fn reset_position(&mut self) {
self.set_written(0);
}

/// Update the number of bytes written to the buffer.
pub fn set_written(&mut self, bytes: usize) {
self.bytes_written = bytes;
}
}

impl AsRef<[u8]> for Buffer {
fn as_ref(&self) -> &[u8] {
&self.buffer[..self.bytes_written]
}
}

impl AsMut<[u8]> for Buffer {
fn as_mut(&mut self) -> &mut [u8] {
&mut self.buffer[self.bytes_written..]
}
}

#[cfg(test)]
mod tests {
use super::{Buffer, BufferPool};

const DEFAULT_BUFFER_SIZE: usize = 1500;

#[test]
fn positive_buffer_pool_buffer_len() {
let mut buffers = BufferPool::new(DEFAULT_BUFFER_SIZE);
let mut buffer = buffers.pop();

assert_eq!(buffer.as_mut().len(), DEFAULT_BUFFER_SIZE);
assert_eq!(buffer.as_ref().len(), 0);
}

#[test]
fn positive_buffer_len_update() {
let mut buffer = Buffer::new(DEFAULT_BUFFER_SIZE);
buffer.set_written(DEFAULT_BUFFER_SIZE - 1);

assert_eq!(buffer.as_mut().len(), 1);
assert_eq!(buffer.as_ref().len(), DEFAULT_BUFFER_SIZE - 1);
}
}
135 changes: 135 additions & 0 deletions contrib/umio/src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use std::collections::VecDeque;
use std::net::SocketAddr;

use mio::udp::UdpSocket;
use mio::{EventLoop, EventSet, Handler, PollOpt, Token};

use crate::buffer::{Buffer, BufferPool};
use crate::{provider, Provider};

/// Handles events occurring within the event loop.
pub trait Dispatcher: Sized {
type Timeout;
type Message: Send;

/// Process an incoming message from the given address.
#[allow(unused)]
fn incoming(&mut self, provider: Provider<'_, Self>, message: &[u8], addr: SocketAddr) {}

/// Process a message sent via the event loop channel.
#[allow(unused)]
fn notify(&mut self, provider: Provider<'_, Self>, message: Self::Message) {}

/// Process a timeout that has been triggered.
#[allow(unused)]
fn timeout(&mut self, provider: Provider<'_, Self>, timeout: Self::Timeout) {}
}

//----------------------------------------------------------------------------//

const UDP_SOCKET_TOKEN: Token = Token(2);

pub struct DispatchHandler<D: Dispatcher> {
dispatch: D,
out_queue: VecDeque<(Buffer, SocketAddr)>,
udp_socket: UdpSocket,
buffer_pool: BufferPool,
current_set: EventSet,
}

impl<D: Dispatcher> DispatchHandler<D> {
pub fn new(
udp_socket: UdpSocket,
buffer_size: usize,
dispatch: D,
event_loop: &mut EventLoop<DispatchHandler<D>>,
) -> DispatchHandler<D> {
let buffer_pool = BufferPool::new(buffer_size);
let out_queue = VecDeque::new();

event_loop
.register(&udp_socket, UDP_SOCKET_TOKEN, EventSet::readable(), PollOpt::edge())
.unwrap();

DispatchHandler {
dispatch,
out_queue,
udp_socket,
buffer_pool,
current_set: EventSet::readable(),
}
}

pub fn handle_write(&mut self) {
if let Some((buffer, addr)) = self.out_queue.pop_front() {
self.udp_socket.send_to(buffer.as_ref(), &addr).unwrap();

self.buffer_pool.push(buffer);
};
}

pub fn handle_read(&mut self) -> Option<(Buffer, SocketAddr)> {
let mut buffer = self.buffer_pool.pop();

if let Ok(Some((bytes, addr))) = self.udp_socket.recv_from(buffer.as_mut()) {
buffer.set_written(bytes);

Some((buffer, addr))
} else {
None
}
}
}

impl<D: Dispatcher> Handler for DispatchHandler<D> {
type Timeout = D::Timeout;
type Message = D::Message;

fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
if token != UDP_SOCKET_TOKEN {
return;
}

if events.is_writable() {
self.handle_write();
}

if events.is_readable() {
let Some((buffer, addr)) = self.handle_read() else {
return;
};

{
let provider = provider::new(&mut self.buffer_pool, &mut self.out_queue, event_loop);

self.dispatch.incoming(provider, buffer.as_ref(), addr);
}

self.buffer_pool.push(buffer);
}
}

fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) {
let provider = provider::new(&mut self.buffer_pool, &mut self.out_queue, event_loop);

self.dispatch.notify(provider, msg);
}

fn timeout(&mut self, event_loop: &mut EventLoop<Self>, timeout: Self::Timeout) {
let provider = provider::new(&mut self.buffer_pool, &mut self.out_queue, event_loop);

self.dispatch.timeout(provider, timeout);
}

fn tick(&mut self, event_loop: &mut EventLoop<Self>) {
self.current_set = if self.out_queue.is_empty() {
EventSet::readable()
} else {
EventSet::readable() | EventSet::writable()
};

event_loop
.reregister(&self.udp_socket, UDP_SOCKET_TOKEN, self.current_set, PollOpt::edge())
.unwrap();
}
}
Loading

0 comments on commit 98f9b45

Please sign in to comment.