Skip to content

Commit

Permalink
cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
o0Ignition0o committed Jul 19, 2020
1 parent da22c57 commit 6a342be
Show file tree
Hide file tree
Showing 21 changed files with 527 additions and 350 deletions.
8 changes: 4 additions & 4 deletions examples/fread.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use nuclei::*;
use std::io;
use std::time::Duration;
use std::fs::{File, OpenOptions};
use std::io;
use std::path::PathBuf;
use std::time::Duration;
use std::time::Duration;

use futures::AsyncRead;
use futures_util::io::AsyncReadExt;


fn main() -> io::Result<()> {
let x = drive(async {
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
Expand All @@ -25,4 +25,4 @@ fn main() -> io::Result<()> {
println!("Length of file is {}", x.len());

Ok(())
}
}
6 changes: 3 additions & 3 deletions examples/h1-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use nuclei::*;
use std::net::TcpListener;

use anyhow::Result;
use futures::prelude::*;
use http_types::{Request, Response, StatusCode};
use async_dup::Arc;
use futures::pending;
use futures::prelude::*;
use http_types::{Request, Response, StatusCode};

/// Serves a request and returns a response.
async fn serve(req: Request) -> http_types::Result<Response> {
Expand All @@ -30,7 +30,7 @@ async fn listen(listener: Handle<TcpListener>) -> Result<()> {
// Spawn a background task serving this connection.
let stream = Arc::new(stream);
spawn(async move {
if let Err(err) = async_h1::accept( stream, serve).await {
if let Err(err) = async_h1::accept(stream, serve).await {
println!("Connection error: {:#?}", err);
}
});
Expand Down
4 changes: 2 additions & 2 deletions examples/tcp-server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::io;
use nuclei::*;
use std::net::{TcpListener, TcpStream};
use futures::io;

async fn echo(stream: Handle<TcpStream>) -> io::Result<()> {
io::copy(&stream, &mut &stream).await?;
Expand All @@ -23,4 +23,4 @@ fn main() -> io::Result<()> {
spawn_blocking(|| echo(stream));
}
})
}
}
1 change: 1 addition & 0 deletions src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct Handle<T> {
#[cfg(unix)]
pub(crate) chan: Option<CompletionChan>,
/// File operation storage
#[cfg(unix)]
pub(crate) store_file: Option<StoreFile>,
/// Completion callback for read
pub(crate) read: Arc<TTas<Option<AsyncOp<usize>>>>,
Expand Down
9 changes: 4 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
mod handle;
mod proactor;
mod runtime;
mod submission_handler;
mod async_io;
mod sys;
mod waker;
mod proactor;
mod utils;
mod waker;

#[cfg(not(any(
target_os = "linux", // epoll, iouring
Expand All @@ -20,7 +20,6 @@ mod utils;
)))]
compile_error!("Target OS is not supported");


#[cfg(any(
target_os = "macos",
target_os = "ios",
Expand All @@ -46,5 +45,5 @@ mod syscore {
pub(crate) use windows::*;
}

pub use agnostik::*;
pub use proactor::*;
pub use agnostik::*;
25 changes: 14 additions & 11 deletions src/submission_handler.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
use futures::io::{
AsyncRead, AsyncWrite, AsyncBufRead, AsyncSeek
};
use std::marker::PhantomData as marker;
use std::{task::{Context, Poll}, io, pin::Pin, future::Future, ops::{DerefMut, Deref}};
use super::handle::{Handle, HandleOpRegisterer};

use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
use std::marker::PhantomData as marker;
use std::{
future::Future,
io,
ops::{Deref, DerefMut},
pin::Pin,
task::{Context, Poll},
};

pub struct SubmissionHandler<T>(marker<T>)
where
T: Unpin;
T: Unpin;

impl<T> SubmissionHandler<T>
where
T: Unpin + HandleOpRegisterer
T: Unpin + HandleOpRegisterer,
{
pub fn handle_read(
handle: Pin<&mut T>,
cx: &mut Context,
completion_dispatcher: impl Future<Output=io::Result<usize>> + 'static
completion_dispatcher: impl Future<Output = io::Result<usize>> + 'static,
) -> Poll<io::Result<usize>> {
let handle = handle.get_mut();
let read_result = handle.read_registerer();
Expand Down Expand Up @@ -46,7 +49,7 @@ where
pub fn handle_write(
handle: Pin<&mut T>,
cx: &mut Context,
completion_dispatcher: impl Future<Output=io::Result<usize>> + 'static
completion_dispatcher: impl Future<Output = io::Result<usize>> + 'static,
) -> Poll<io::Result<usize>> {
let handle = handle.get_mut();
let write_result = handle.write_registerer();
Expand All @@ -71,4 +74,4 @@ where

poll
}
}
}
57 changes: 29 additions & 28 deletions src/syscore/bsd/kqueue.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::mem::MaybeUninit;
use std::io::{self, Read, Write};
use std::os::unix::io::{AsRawFd, RawFd};
use std::{fs::File, os::unix::net::UnixStream, collections::HashMap, time::Duration};
use crate::sys::event::{kevent_ts, kqueue, KEvent};
use futures::channel::oneshot;
use lever::prelude::*;
use pin_utils::unsafe_pinned;
use std::future::Future;
use std::io::{self, Read, Write};
use std::mem::MaybeUninit;
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::task::{Context, Poll};
use lever::prelude::*;
use std::{collections::HashMap, fs::File, os::unix::net::UnixStream, time::Duration};

macro_rules! syscall {
($fn:ident $args:tt) => {{
Expand All @@ -25,8 +25,8 @@ macro_rules! syscall {
///////////////////

use socket2::SockAddr;
use std::os::unix::net::{SocketAddr as UnixSocketAddr};
use std::mem;
use std::os::unix::net::SocketAddr as UnixSocketAddr;

fn max_len() -> usize {
// The maximum read limit on most posix-like systems is `SSIZE_MAX`,
Expand All @@ -44,18 +44,22 @@ fn max_len() -> usize {
}
}

pub(crate) fn shim_recv_from<A: AsRawFd>(fd: A, buf: &mut [u8], flags: libc::c_int) -> io::Result<(usize, SockAddr)> {
pub(crate) fn shim_recv_from<A: AsRawFd>(
fd: A,
buf: &mut [u8],
flags: libc::c_int,
) -> io::Result<(usize, SockAddr)> {
let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
let mut addrlen = mem::size_of_val(&storage) as libc::socklen_t;

let n = syscall!(recvfrom(
fd.as_raw_fd() as _,
buf.as_mut_ptr() as *mut libc::c_void,
std::cmp::min(buf.len(), max_len()),
flags,
&mut storage as *mut _ as *mut _,
&mut addrlen,
))?;
fd.as_raw_fd() as _,
buf.as_mut_ptr() as *mut libc::c_void,
std::cmp::min(buf.len(), max_len()),
flags,
&mut storage as *mut _ as *mut _,
&mut addrlen,
))?;
let addr = unsafe { SockAddr::from_raw_parts(&storage as *const _ as *const _, addrlen) };
Ok((n as usize, addr))
}
Expand All @@ -78,7 +82,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result<UnixSocketAddr>
let abst_sock_ident: libc::c_char = unsafe {
std::slice::from_raw_parts(
&addr.sun_path as *const _ as *const u8,
mem::size_of::<libc::c_char>()
mem::size_of::<libc::c_char>(),
)
}[1] as libc::c_char;

Expand All @@ -88,15 +92,15 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result<UnixSocketAddr>
// https://man7.org/linux/man-pages/man7/unix.7.html
(sa, 0) if sa != 0 && sa > mem::size_of::<libc::sa_family_t>() as libc::socklen_t => {
len = mem::size_of::<libc::sa_family_t>() as libc::socklen_t;
},
}
// If unnamed socket, then addr is always zero,
// assign the offset reserved difference as length.
(0, _) => {
let base = &addr as *const _ as usize;
let path = &addr.sun_path as *const _ as usize;
let sun_path_offset = path - base;
len = sun_path_offset as libc::socklen_t;
},
}

// Discard rest, they are not special.
(_, _) => {}
Expand All @@ -109,7 +113,7 @@ pub(crate) fn shim_to_af_unix(sockaddr: &SockAddr) -> io::Result<UnixSocketAddr>
std::ptr::copy_nonoverlapping(
sockaddr.as_ptr(),
&mut init as *mut _ as *mut _,
len as usize
len as usize,
);

// Safety: We've written the init addr above.
Expand Down Expand Up @@ -142,7 +146,7 @@ pub struct SysProactor {
registered: TTas<HashMap<RawFd, usize>>,

/// Hashmap for holding interested concrete completion callbacks
completions: TTas<HashMap<RawFd, CompletionList>>
completions: TTas<HashMap<RawFd, CompletionList>>,
}

impl SysProactor {
Expand All @@ -157,7 +161,7 @@ impl SysProactor {
read_stream: TTas::new(read_stream),
write_stream,
registered: TTas::new(HashMap::new()),
completions: TTas::new(HashMap::new())
completions: TTas::new(HashMap::new()),
};

let mut rs = proactor.read_stream.lock();
Expand Down Expand Up @@ -232,12 +236,13 @@ impl SysProactor {
});

let mut events: Vec<KEvent> = Vec::with_capacity(max_event_size);
events.resize(max_event_size, unsafe { MaybeUninit::zeroed().assume_init() });
events.resize(max_event_size, unsafe {
MaybeUninit::zeroed().assume_init()
});
let mut events: Box<[KEvent]> = events.into_boxed_slice();

// dbg!("SENDING EVENT");
let res =
kevent_ts(self.kqueue_fd, &[], &mut events, timeout)? as isize;
let res = kevent_ts(self.kqueue_fd, &[], &mut events, timeout)? as isize;
// dbg!(res);
// dbg!("EVENT FINISH");

Expand Down Expand Up @@ -304,9 +309,7 @@ impl SysProactor {
}

let (tx, rx) = oneshot::channel();
let comp = completions
.entry(fd)
.or_insert(Vec::new());
let comp = completions.entry(fd).or_insert(Vec::new());

comp.push((evts, tx));

Expand Down Expand Up @@ -356,11 +359,9 @@ impl SysProactor {
if ack_removal {
completions.remove(&fd);
}

}
}


//////////////////////////////
//////////////////////////////

Expand Down
2 changes: 1 addition & 1 deletion src/syscore/bsd/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod fs;
mod kqueue;
mod processor;
mod nethandle;
mod processor;

pub(crate) use fs::*;
pub(crate) use kqueue::*;
Expand Down
Loading

0 comments on commit 6a342be

Please sign in to comment.