From 85a0a5fb1c8070baefb4c0e89ace6cd07f6f020f Mon Sep 17 00:00:00 2001 From: Martin Hoffmann Date: Mon, 26 Feb 2024 13:41:23 +0100 Subject: [PATCH] =?UTF-8?q?Don=E2=80=99t=20have=20the=20RTR=20listener=20f?= =?UTF-8?q?ail=20if=20a=20socket=20fails=20after=20accept.=20(#937)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR changes the RTR listener to not fail the accept loop when setting up a stream after accepting it fails. Instead it will quietly drop the stream in this case and keep going. As part of this, the PR also drops tokio-stream as a dependency and implements its own listener stream. This PR fixes CVE-2024-1622 reported by Yohei Nishimura, Atsushi Enomoto, Ruka Miyachi; Internet Multifeed Co., Japan. --- Cargo.lock | 1 - Cargo.toml | 1 - src/rtr.rs | 48 +++++++++++++++++++++++++++++++++++++++++------- 3 files changed, 41 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b8d50a1d..0df90eda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1127,7 +1127,6 @@ dependencies = [ "tempfile", "tokio", "tokio-rustls", - "tokio-stream", "toml_edit", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index 46a9847b..958ed650 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,6 @@ serde_json = "1.0.57" tempfile = "3.1.0" tokio = { version = "1.24", features = [ "io-util", "macros", "process", "rt", "rt-multi-thread", "signal", "sync" ] } tokio-rustls = "0.24.1" -tokio-stream = { version = "0.1", features = ["net"] } toml_edit = "0.20" uuid = "1.1" routinator-ui = { version = "0.3.4", optional = true } diff --git a/src/rtr.rs b/src/rtr.rs index df9a0e82..005606c2 100644 --- a/src/rtr.rs +++ b/src/rtr.rs @@ -2,12 +2,12 @@ use std::io; use std::future::Future; -use std::net::{TcpListener as StdListener}; +use std::net::{SocketAddr, TcpListener as StdListener}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; -use futures::{pin_mut, StreamExt, TryStreamExt}; +use futures::{pin_mut, Stream}; use futures::future::{pending, select_all}; use log::error; use rpki::rtr::server::{NotifySender, Server, Socket}; @@ -15,7 +15,6 @@ use rpki::rtr::state::State; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::{TcpListener, TcpStream}; use tokio_rustls::TlsAcceptor; -use tokio_stream::wrappers::TcpListenerStream; use crate::config::Config; use crate::error::ExitError; use crate::metrics::{SharedRtrServerMetrics, RtrClientMetrics}; @@ -118,9 +117,9 @@ async fn single_rtr_listener( } }; let tls = tls.map(TlsAcceptor::from); - let listener = TcpListenerStream::new(listener).and_then(|sock| async { - RtrStream::new(sock, tls.as_ref(), keepalive, server_metrics.clone()) - }).boxed(); + let listener = RtrListener { + tcp: listener, tls, keepalive, server_metrics + }; if let Err(err) = Server::new( listener, sender, origins.clone() ).run().await { @@ -129,6 +128,40 @@ async fn single_rtr_listener( } +//------------ RtrListener -------------------------------------------------- + +/// A wrapper around an TCP listener that produces RTR streams. +struct RtrListener { + tcp: TcpListener, + tls: Option, + keepalive: Option, + server_metrics: SharedRtrServerMetrics, +} + +impl Stream for RtrListener { + type Item = Result; + + fn poll_next( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + match self.tcp.poll_accept(ctx) { + Poll::Ready(Ok((sock, addr))) => { + match RtrStream::new( + sock, addr, + self.tls.as_ref(), self.keepalive, + self.server_metrics.clone() + ) { + Ok(stream) => Poll::Ready(Some(Ok(stream))), + Err(_) => Poll::Pending, + } + } + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + Poll::Pending => Poll::Pending, + } + } +} + //------------ RtrStream ---------------------------------------------------- /// A wrapper around a stream socket that takes care of updating metrics. @@ -141,6 +174,7 @@ impl RtrStream { #[allow(clippy::redundant_async_block)] // False positive fn new( sock: TcpStream, + addr: SocketAddr, tls: Option<&TlsAcceptor>, keepalive: Option, server_metrics: SharedRtrServerMetrics, @@ -148,7 +182,7 @@ impl RtrStream { if let Some(duration) = keepalive { Self::set_keepalive(&sock, duration)? } - let metrics = Arc::new(RtrClientMetrics::new(sock.peer_addr()?.ip())); + let metrics = Arc::new(RtrClientMetrics::new(addr.ip())); let client_metrics = metrics.clone(); tokio::spawn(async move { server_metrics.add_client(client_metrics).await