Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Just use epoll for qcmp #1012

Merged
merged 2 commits into from
Sep 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 53 additions & 228 deletions src/codec/qcmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use crate::{
net::{
phoenix::{DistanceMeasure, Measurement},
DualStackEpollSocket, DualStackLocalSocket,
DualStackEpollSocket,
},
time::{DurationNanos, UtcTimestamp},
};
Expand Down Expand Up @@ -192,248 +192,73 @@ impl Measurement for QcmpMeasurement {
}
}

/// Starts the QCMP responder task for non-Linux targets.
///
/// The supplied `socket` is only consulted for its port; the task then opens
/// its own [`DualStackLocalSocket`] on that port and answers every valid
/// `Protocol::Ping` it receives with a ping reply sent back to the sender.
/// Packets that fail to parse, are not QCMP, or are not pings are logged and
/// dropped. The task exits as soon as `shutdown_rx` reports a change.
///
/// # Errors
///
/// This function itself always returns `Ok(())`; receive and send failures
/// are logged inside the task and do not stop the loop.
///
/// # Panics
///
/// The spawned task panics if `DualStackLocalSocket::new` fails.
#[cfg(not(target_os = "linux"))]
pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) -> crate::Result<()> {
    let port = crate::net::socket_port(&socket);

    uring_spawn!(uring_span!(tracing::debug_span!("qcmp")), async move {
        let mut input_buf = vec![0; 1 << 16];
        let socket = DualStackLocalSocket::new(port).unwrap();
        let mut output_buf = QcmpPacket::default();

        loop {
            // `recv_from` takes the buffer by value and hands it back next to
            // the result, so reclaim it before looking at the outcome.
            let (recv_result, returned_input) = tokio::select! {
                result = socket.recv_from(input_buf) => result,
                _ = shutdown_rx.changed() => return,
            };
            input_buf = returned_input;

            let (size, source) = match recv_result {
                Ok(received) => received,
                Err(error) => {
                    tracing::warn!(%error, "error receiving packet");
                    continue;
                }
            };

            // Timestamp as early as possible after the packet is in hand.
            let received_at = UtcTimestamp::now();

            let command = match Protocol::parse(&input_buf[..size]) {
                Ok(Some(parsed)) => parsed,
                Ok(None) => {
                    tracing::debug!("rejected non-qcmp packet");
                    continue;
                }
                Err(error) => {
                    tracing::debug!(%error, "rejected malformed packet");
                    continue;
                }
            };

            // Only pings are answered; anything else is dropped with a warning.
            let (client_timestamp, nonce) = match command {
                Protocol::Ping {
                    client_timestamp,
                    nonce,
                } => (client_timestamp, nonce),
                _ => {
                    tracing::warn!("rejected unsupported QCMP packet");
                    continue;
                }
            };

            Protocol::ping_reply(nonce, client_timestamp, received_at).encode(&mut output_buf);

            tracing::debug!("sending ping reply {:?}", &output_buf.buf[..output_buf.len]);

            // `send_to` likewise returns the buffer it was given; keep it for
            // the next reply regardless of whether the send succeeded.
            let (send_result, returned_output) = socket.send_to(output_buf, source).await;
            output_buf = returned_output;
            if let Err(error) = send_result {
                tracing::warn!(%error, "error responding to ping");
            }
        }
    });

    Ok(())
}

#[cfg(target_os = "linux")]
pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) -> crate::Result<()> {
use crate::components::proxy::io_uring_shared::EventFd;
use eyre::Context as _;
use tracing::{instrument::WithSubscriber as _, Instrument as _};

let port = crate::net::socket_port(&socket);

// Create an eventfd so we can signal to the qcmp loop when we want to exit
let mut shutdown_event = EventFd::new()?;
let shutdown = shutdown_event.writer();

// Spawn a task on the main loop whose sole purpose is to signal the eventfd
tokio::task::spawn(async move {
let _ = shutdown_rx.changed().await;
shutdown.write(1);
});

let _thread_span = uring_span!(tracing::debug_span!("qcmp").or_current());
let dispatcher = tracing::dispatcher::get_default(|d| d.clone());

std::thread::Builder::new()
.name("qcmp".into())
.spawn(move || -> eyre::Result<()> {
let _guard = tracing::dispatcher::set_default(&dispatcher);

let mut ring = io_uring::IoUring::new(3).context("unable to create io uring")?;
let (submitter, mut sq, mut cq) = ring.split();

const RECV: u64 = 0;
const SEND: u64 = 1;
const SHUTDOWN: u64 = 2;

// Queue the read from the shutdown eventfd used to signal when the loop
// should exit
let entry = shutdown_event.io_uring_entry().user_data(SHUTDOWN);
// SAFETY: the memory being written to is located on the stack inside the shutdown event, and is alive
// at least as long as the uring loop
unsafe {
sq.push(&entry).context("unable to insert io-uring entry")?;
}

// Our loop is simple and only ever processes one ping/pong pair at a time
// so we just reuse the same buffer for both receives and sends
let mut buf = QcmpPacket::default();
// SAFETY: msghdr is POD
let mut msghdr: libc::msghdr = unsafe { std::mem::zeroed() };
// SAFETY: msghdr is POD
let addr = unsafe {
socket2::SockAddr::new(
std::mem::zeroed(),
std::mem::size_of::<libc::sockaddr_storage>() as _,
)
};

let mut iov = libc::iovec {
iov_base: buf.buf.as_mut_ptr() as *mut _,
iov_len: 0,
};

msghdr.msg_iov = std::ptr::addr_of_mut!(iov);
msghdr.msg_iovlen = 1;
msghdr.msg_name = addr.as_ptr() as *mut libc::sockaddr_storage as *mut _;
msghdr.msg_namelen = addr.len();

let msghdr_mut = std::ptr::addr_of_mut!(msghdr);

let socket = DualStackLocalSocket::new(port)
.context("failed to create already bound qcmp socket")?;
let socket_fd = socket.raw_fd();

let enqueue_recv =
|sq: &mut io_uring::SubmissionQueue, iov: &mut libc::iovec| -> eyre::Result<()> {
iov.iov_len = MAX_QCMP_PACKET_LEN;
let entry = io_uring::opcode::RecvMsg::new(socket_fd, msghdr_mut)
.build()
.user_data(RECV);
// SAFETY: the memory being written to is located on the stack and outlives the uring loop
unsafe { sq.push(&entry) }.context("unable to insert io-uring entry")?;
Ok(())
};

enqueue_recv(&mut sq, &mut iov)?;

sq.sync();
tokio::task::spawn(
async move {
let mut input_buf = [0u8; MAX_QCMP_PACKET_LEN];
let socket = DualStackEpollSocket::new(port).unwrap();
let mut output_buf = QcmpPacket::default();

loop {
match submitter.submit_and_wait(1) {
Ok(_) => {}
Err(ref err) if err.raw_os_error() == Some(libc::EBUSY) => {}
Err(err) => {
return Err(err).context("failed to submit io-uring operations");
}
}
cq.sync();

let mut has_pending_send = false;
for cqe in &mut cq {
let ret = cqe.result();

match cqe.user_data() {
RECV => {
if ret < 0 {
let error = std::io::Error::from_raw_os_error(-ret).to_string();
tracing::error!(%error, "failed to send QCMP response");
let result = tokio::select! {
result = socket.recv_from(&mut input_buf) => result,
_ = shutdown_rx.changed() => return,
};
match result {
Ok((size, source)) => {
let received_at = UtcTimestamp::now();
let command = match Protocol::parse(&input_buf[..size]) {
Ok(Some(command)) => command,
Ok(None) => {
tracing::debug!("rejected non-qcmp packet");
continue;
}

buf.len = ret as _;
let received_at = UtcTimestamp::now();
let command = match Protocol::parse(&buf) {
Ok(Some(command)) => command,
Ok(None) => {
tracing::debug!("rejected non-QCMP packet");
continue;
}
Err(error) => {
tracing::debug!(%error, "rejected malformed packet");
continue;
}
};

let Protocol::Ping {
client_timestamp,
nonce,
} = command
else {
tracing::warn!("rejected unsupported QCMP packet");
Err(error) => {
tracing::debug!(%error, "rejected malformed packet");
continue;
};

Protocol::ping_reply(nonce, client_timestamp, received_at)
.encode(&mut buf);

tracing::debug!("sending QCMP ping reply");

// Update the iovec with the actual length of the pong
iov.iov_len = buf.len;

// Note we don't have to do anything else with the msghdr
// as the recv has already filled in the socket address
// of the sender, which is also our destination

{
let entry = io_uring::opcode::SendMsg::new(
socket_fd,
std::ptr::addr_of!(msghdr),
)
.build()
.user_data(SEND);
// SAFETY: the memory being read from is located on the stack and outlives the uring loop
if unsafe { sq.push(&entry) }.is_err() {
tracing::error!("failed to enqueue QCMP pong response");
continue;
}
}
};

let Protocol::Ping {
client_timestamp,
nonce,
} = command
else {
tracing::warn!("rejected unsupported QCMP packet");
continue;
};

has_pending_send = true;
}
SEND => {
if ret < 0 {
let error = std::io::Error::from_raw_os_error(-ret).to_string();
tracing::error!(%error, "failed to send QCMP response");
Protocol::ping_reply(nonce, client_timestamp, received_at)
.encode(&mut output_buf);

tracing::debug!(
"sending QCMP pong",
);

match socket.send_to(&output_buf, source).await {
Ok(len) => {
if len != output_buf.len() {
tracing::error!("failed to send entire QCMP pong response, expected {} but only sent {len}", output_buf.len());
}
}
Err(error) => {
tracing::warn!(%error, "error responding to ping");
}
}
SHUTDOWN => {
tracing::info!("QCMP thread was signaled to shutdown");
return Ok(());
}
ud => unreachable!("io-uring user data {ud} is invalid"),
}
}

if !has_pending_send {
enqueue_recv(&mut sq, &mut iov)?;
}

sq.sync();
Err(error) => {
tracing::warn!(%error, "error receiving packet");
}
};
}
})?;
}
.instrument(tracing::debug_span!("qcmp"))
.with_current_subscriber(),
);

Ok(())
}
Expand Down
Loading