Skip to content
This repository was archived by the owner on May 17, 2018. It is now read-only.

added UnixSeqpacket and UnixSeqpacketListener. #25

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ readme = "README.md"
keywords = ["posix", "unix", "socket", "domain"]

[dependencies]
libc = "0.2.1"
libc = "0.2.12"

[dev-dependencies]
tempdir = "0.3"
439 changes: 406 additions & 33 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -12,7 +12,6 @@ use std::fmt;
use std::io;
use std::iter::IntoIterator;
use std::mem;
use std::mem::size_of;
use std::net::Shutdown;
use std::os::unix::ffi::OsStrExt;
use std::os::unix::io::{RawFd, AsRawFd, FromRawFd, IntoRawFd};
@@ -164,6 +163,26 @@ impl Inner {
Ok(Some(io::Error::from_raw_os_error(errno)))
}
}

pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
unsafe {
let count = try!(cvt_s(libc::recv(self.0,
buf.as_mut_ptr() as *mut _,
buf.len(),
0)));
Ok(count as usize)
}
}

pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
unsafe {
let count = try!(cvt_s(libc::send(self.0,
buf.as_ptr() as *const _,
buf.len(),
0)));
Ok(count as usize)
}
}
}

unsafe fn sockaddr_un<P: AsRef<Path>>(path: P) -> io::Result<(libc::sockaddr_un, libc::socklen_t)> {
@@ -455,10 +474,7 @@ impl io::Read for UnixStream {

impl<'a> io::Read for &'a UnixStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
unsafe {
cvt_s(libc::recv(self.inner.0, buf.as_mut_ptr() as *mut _, buf.len(), 0))
.map(|r| r as usize)
}
self.inner.recv(buf)
}
}

@@ -474,10 +490,7 @@ impl io::Write for UnixStream {

impl<'a> io::Write for &'a UnixStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
unsafe {
cvt_s(libc::send(self.inner.0, buf.as_ptr() as *const _, buf.len(), 0))
.map(|r| r as usize)
}
self.inner.send(buf)
}

fn flush(&mut self) -> io::Result<()> {
@@ -505,15 +518,183 @@ impl IntoRawFd for UnixStream {
}
}

/// A structure representing a Unix domain socket server.

/// A structure representing a Unix domain seqpacket socket server.
///
/// # Examples
///
/// ```rust,no_run
/// use std::thread;
/// use unix_socket::{UnixSeqpacket, UnixSeqpacketListener};
///
/// fn handle_client(_stream: UnixSeqpacket) {
/// // ...
/// }
///
/// let listener = UnixSeqpacketListener::bind("/path/to/the/socket").unwrap();
///
/// // accept connections and process them, spawning a new thread for each one
/// for sock in listener.incoming() {
/// match sock {
/// Ok(sock) => {
/// /* connection succeeded */
/// thread::spawn(|| handle_client(sock));
/// }
/// Err(_err) => {
/// /* connection failed */
/// break;
/// }
/// }
/// }
///
/// // close the listener socket
/// drop(listener);
/// ```
pub struct UnixSeqpacketListener {
inner: Inner,
}

impl fmt::Debug for UnixSeqpacketListener {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let mut builder = fmt.debug_struct("UnixSeqpacketListener");
builder.field("fd", &self.inner.0);
if let Ok(addr) = self.local_addr() {
builder.field("local", &addr);
}
builder.finish()
}
}

impl UnixSeqpacketListener {
/// Creates a new `UnixSeqpacketListener` bound to the specified socket.
///
/// Linux provides, as a nonportable extension, a separate "abstract"
/// address namespace as opposed to filesystem-based addressing. If `path`
/// begins with a null byte, it will be interpreted as an "abstract"
/// address. Otherwise, it will be interpreted as a "pathname" address,
/// corresponding to a path on the filesystem.
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixSeqpacketListener> {
unsafe {
let inner = try!(Inner::new(libc::SOCK_SEQPACKET));
let (addr, len) = try!(sockaddr_un(path));

try!(cvt(libc::bind(inner.0, &addr as *const _ as *const _, len)));
try!(cvt(libc::listen(inner.0, 128)));

Ok(UnixSeqpacketListener { inner: inner })
}
}

/// Accepts a new incoming connection to this listener.
///
/// This function will block the calling thread until a new Unix connection
/// is established. When established, the corersponding `UnixSeqpacket` and
/// the remote peer's address will be returned.
pub fn accept(&self) -> io::Result<(UnixSeqpacket, SocketAddr)> {
unsafe {
let mut fd = 0;
let addr = try!(SocketAddr::new(|addr, len| {
fd = libc::accept(self.inner.0, addr, len);
fd
}));

Ok((UnixSeqpacket { inner: Inner(fd) }, addr))
}
}

/// Creates a new independently owned handle to the underlying socket.
///
/// The returned `UnixSeqpacketListener` is a reference to the same socket that this
/// object references. Both handles can be used to accept incoming
/// connections and options set on one listener will affect the other.
pub fn try_clone(&self) -> io::Result<UnixSeqpacketListener> {
Ok(UnixSeqpacketListener { inner: try!(self.inner.try_clone()) })
}

/// Returns the local socket address of this listener.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
SocketAddr::new(|addr, len| unsafe { libc::getsockname(self.inner.0, addr, len) })
}

/// Moves the socket into or out of nonblocking mode.
pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
self.inner.set_nonblocking(nonblocking)
}

/// Returns the value of the `SO_ERROR` option.
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.inner.take_error()
}

/// Returns an iterator over incoming connections.
///
/// The iterator will never return `None` and will also not yield the
/// peer's `SocketAddr` structure.
pub fn incoming<'a>(&'a self) -> IncomingSeqpacket<'a> {
IncomingSeqpacket { listener: self }
}
}

impl AsRawFd for UnixSeqpacketListener {
fn as_raw_fd(&self) -> RawFd {
self.inner.0
}
}

impl FromRawFd for UnixSeqpacketListener {
unsafe fn from_raw_fd(fd: RawFd) -> UnixSeqpacketListener {
UnixSeqpacketListener { inner: Inner(fd) }
}
}

impl IntoRawFd for UnixSeqpacketListener {
fn into_raw_fd(self) -> RawFd {
let fd = self.inner.0;
mem::forget(self);
fd
}
}

impl<'a> IntoIterator for &'a UnixSeqpacketListener {
type Item = io::Result<UnixSeqpacket>;
type IntoIter = IncomingSeqpacket<'a>;

fn into_iter(self) -> IncomingSeqpacket<'a> {
self.incoming()
}
}

/// An iterator over incoming connections to a `UnixSeqpacketListener`.
///
/// It will never return `None`.
#[derive(Debug)]
pub struct IncomingSeqpacket<'a> {
listener: &'a UnixSeqpacketListener,
}

impl<'a> Iterator for IncomingSeqpacket<'a> {
type Item = io::Result<UnixSeqpacket>;

fn next(&mut self) -> Option<io::Result<UnixSeqpacket>> {
Some(self.listener.accept().map(|s| s.0))
}

fn size_hint(&self) -> (usize, Option<usize>) {
(usize::max_value(), None)
}
}



/// A structure representing a Unix domain stream socket server.
///
/// # Examples
///
/// ```rust,no_run
/// use std::thread;
/// use unix_socket::{UnixStream, UnixListener};
///
/// fn handle_client(stream: UnixStream) {
/// fn handle_client(_stream: UnixStream) {
/// // ...
/// }
///
@@ -526,7 +707,7 @@ impl IntoRawFd for UnixStream {
/// /* connection succeeded */
/// thread::spawn(|| handle_client(stream));
/// }
/// Err(err) => {
/// Err(_err) => {
/// /* connection failed */
/// break;
/// }
@@ -616,8 +797,8 @@ impl UnixListener {
///
/// The iterator will never return `None` and will also not yield the
/// peer's `SocketAddr` structure.
pub fn incoming<'a>(&'a self) -> Incoming<'a> {
Incoming { listener: self }
pub fn incoming<'a>(&'a self) -> IncomingStream<'a> {
IncomingStream { listener: self }
}
}

@@ -643,9 +824,9 @@ impl IntoRawFd for UnixListener {

impl<'a> IntoIterator for &'a UnixListener {
type Item = io::Result<UnixStream>;
type IntoIter = Incoming<'a>;
type IntoIter = IncomingStream<'a>;

fn into_iter(self) -> Incoming<'a> {
fn into_iter(self) -> IncomingStream<'a> {
self.incoming()
}
}
@@ -654,11 +835,11 @@ impl<'a> IntoIterator for &'a UnixListener {
///
/// It will never return `None`.
#[derive(Debug)]
pub struct Incoming<'a> {
pub struct IncomingStream<'a> {
listener: &'a UnixListener,
}

impl<'a> Iterator for Incoming<'a> {
impl<'a> Iterator for IncomingStream<'a> {
type Item = io::Result<UnixStream>;

fn next(&mut self) -> Option<io::Result<UnixStream>> {
@@ -800,13 +981,7 @@ impl UnixDatagram {
///
/// On success, returns the number of bytes read.
pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
unsafe {
let count = try!(cvt_s(libc::recv(self.inner.0,
buf.as_mut_ptr() as *mut _,
buf.len(),
0)));
Ok(count as usize)
}
self.inner.recv(buf)
}

/// Sends data on the socket to the specified address.
@@ -833,13 +1008,7 @@ impl UnixDatagram {
///
/// On success, returns the number of bytes written.
pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
unsafe {
let count = try!(cvt_s(libc::send(self.inner.0,
buf.as_ptr() as *const _,
buf.len(),
0)));
Ok(count as usize)
}
self.inner.send(buf)
}

/// Sets the read timeout for the socket.
@@ -910,6 +1079,177 @@ impl IntoRawFd for UnixDatagram {
}
}

/// A Unix seqpacket socket.
///
/// A Unix Seqpacket socket is connection oriented but sends and receives
/// datagrams with guaranteed ordering.
///
/// # Examples
///
/// ```rust,no_run
/// use unix_socket::UnixSeqpacket;
///
/// let path = "/path/to/my/socket";
/// let socket = UnixSeqpacket::connect(path).unwrap();
/// let _count = socket.send(b"hello world").unwrap();
/// let mut buf = [0; 100];
/// let count = socket.recv(&mut buf).unwrap();
/// println!("socket {:?} sent {:?}", path, &buf[..count]);
/// ```
pub struct UnixSeqpacket {
inner: Inner,
}

impl fmt::Debug for UnixSeqpacket {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let mut builder = fmt.debug_struct("UnixSeqpacket");
builder.field("fd", &self.inner.0);
if let Ok(addr) = self.local_addr() {
builder.field("local", &addr);
}
if let Ok(addr) = self.peer_addr() {
builder.field("peer", &addr);
}
builder.finish()
}
}

impl UnixSeqpacket {
/// Connects to the socket named by `path`.
///
/// Linux provides, as a nonportable extension, a separate "abstract"
/// address namespace as opposed to filesystem-based addressing. If `path`
/// begins with a null byte, it will be interpreted as an "abstract"
/// address. Otherwise, it will be interpreted as a "pathname" address,
/// corresponding to a path on the filesystem.
pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixSeqpacket> {
unsafe {
let inner = try!(Inner::new(libc::SOCK_SEQPACKET));
let (addr, len) = try!(sockaddr_un(path));

let ret = libc::connect(inner.0, &addr as *const _ as *const _, len);
if ret < 0 {
Err(io::Error::last_os_error())
} else {
Ok(UnixSeqpacket { inner: inner })
}
}
}

/// Create an unnamed pair of connected sockets.
///
/// Returns two `UnixSeqpackets`s which are connected to each other.
pub fn pair() -> io::Result<(UnixSeqpacket, UnixSeqpacket)> {
let (i1, i2) = try!(Inner::new_pair(libc::SOCK_SEQPACKET));
Ok((UnixSeqpacket { inner: i1 }, UnixSeqpacket { inner: i2 }))
}

/// Creates a new independently owned handle to the underlying socket.
///
/// The returned `UnixListener` is a reference to the same socket that this
/// object references. Both handles can be used to accept incoming
/// connections and options set on one listener will affect the other.
pub fn try_clone(&self) -> io::Result<UnixSeqpacket> {
Ok(UnixSeqpacket { inner: try!(self.inner.try_clone()) })
}

/// Returns the address of this socket.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
SocketAddr::new(|addr, len| unsafe { libc::getsockname(self.inner.0, addr, len) })
}

/// Returns the address of this socket's peer.
///
/// Returns the SocketAddr (path) of the peer of this connected socket
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
SocketAddr::new(|addr, len| unsafe { libc::getpeername(self.inner.0, addr, len) })
}

/// Receives data from the socket from the connected peer.
///
/// On success, returns the number of bytes read.
pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.recv(buf)
}

/// Sends data on the socket to the socket's peer.
///
/// will return an error if the socket has not already been connected.
///
/// On success, returns the number of bytes written.
pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
self.inner.send(buf)
}

/// Sets the read timeout for the socket.
///
/// If the provided value is `None`, then `recv` and `recv_from` calls will
/// block indefinitely. It is an error to pass the zero `Duration` to this
/// method.
pub fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
self.inner.set_timeout(timeout, libc::SO_RCVTIMEO)
}

/// Sets the write timeout for the socket.
///
/// If the provided value is `None`, then `send` and `send_to` calls will
/// block indefinitely. It is an error to pass the zero `Duration` to this
/// method.
pub fn set_write_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
self.inner.set_timeout(timeout, libc::SO_SNDTIMEO)
}

/// Returns the read timeout of this socket.
pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
self.inner.timeout(libc::SO_RCVTIMEO)
}

/// Returns the write timeout of this socket.
pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
self.inner.timeout(libc::SO_SNDTIMEO)
}

/// Moves the socket into or out of nonblocking mode.
pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
self.inner.set_nonblocking(nonblocking)
}

/// Returns the value of the `SO_ERROR` option.
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.inner.take_error()
}

/// Shut down the read, write, or both halves of this connection.
///
/// This function will cause all pending and future I/O calls on the
/// specified portions to immediately return with an appropriate value
/// (see the documentation of `Shutdown`).
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.inner.shutdown(how)
}
}

impl AsRawFd for UnixSeqpacket {
fn as_raw_fd(&self) -> RawFd {
self.inner.0
}
}

impl FromRawFd for UnixSeqpacket {
unsafe fn from_raw_fd(fd: RawFd) -> UnixSeqpacket {
UnixSeqpacket { inner: Inner(fd) }
}
}

impl IntoRawFd for UnixSeqpacket {
fn into_raw_fd(self) -> RawFd {
let fd = self.inner.0;
mem::forget(self);
fd
}
}


#[cfg(test)]
mod test {
extern crate tempdir;
@@ -933,7 +1273,7 @@ mod test {
}

#[test]
fn basic() {
fn basic_stream() {
let dir = or_panic!(TempDir::new("unix_socket"));
let socket_path = dir.path().join("sock");
let msg1 = b"hello";
@@ -960,6 +1300,39 @@ mod test {
thread.join().unwrap();
}

#[test]
fn basic_seqpacket() {
let dir = or_panic!(TempDir::new("unix_socket"));
let socket_path = dir.path().join("sock");
let msg1 = b"hello";
let msg2 = b"world!";

let listener = or_panic!(UnixSeqpacketListener::bind(&socket_path));
let thread = thread::spawn(move || {
let stream = or_panic!(listener.accept()).0;
let mut buf = [0; 5];
let res = or_panic!(stream.recv(&mut buf));
println!("recv in thread result was {}", res);
assert_eq!(&msg1[..], &buf[..]);
let res = or_panic!(stream.send(msg2));
println!("send in thread result was {}", res);
});

let stream = or_panic!(UnixSeqpacket::connect(&socket_path));
assert_eq!(Some(&*socket_path),
stream.peer_addr().unwrap().as_pathname());
let res = or_panic!(stream.send(msg1));
println!("outer send result was {}", res);
let mut buf = vec![0,0,0,0,0,0];
let res = or_panic!(stream.recv(&mut buf));
println!("outer recv result was {}", res);
assert_eq!(&msg2[..], &buf[..]);
drop(stream);

thread.join().unwrap();
}


#[test]
fn pair() {
let msg1 = b"hello";