diff --git a/iroh/examples/dht_discovery.rs b/iroh/examples/dht_discovery.rs index 964564b6ba..c514474d76 100644 --- a/iroh/examples/dht_discovery.rs +++ b/iroh/examples/dht_discovery.rs @@ -11,7 +11,7 @@ use std::str::FromStr; use clap::Parser; -use iroh::{endpoint::get_remote_node_id, Endpoint, NodeId}; +use iroh::{Endpoint, NodeId}; use tracing::warn; use url::Url; @@ -88,7 +88,7 @@ async fn chat_server(args: Args) -> anyhow::Result<()> { }; tokio::spawn(async move { let connection = connecting.await?; - let remote_node_id = get_remote_node_id(&connection)?; + let remote_node_id = connection.remote_node_id()?; println!("got connection from {}", remote_node_id); // just leave the tasks hanging. this is just an example. let (mut writer, mut reader) = connection.accept_bi().await?; diff --git a/iroh/examples/echo.rs b/iroh/examples/echo.rs index e18aee8949..89bd643a34 100644 --- a/iroh/examples/echo.rs +++ b/iroh/examples/echo.rs @@ -77,7 +77,7 @@ impl ProtocolHandler for Echo { // Wait for the connection to be fully established. let connection = connecting.await?; // We can get the remote's node id from the connection. - let node_id = iroh::endpoint::get_remote_node_id(&connection)?; + let node_id = connection.remote_node_id()?; println!("accepted connection from {node_id}"); // Our protocol is a simple request-response protocol, so we expect the diff --git a/iroh/examples/listen-unreliable.rs b/iroh/examples/listen-unreliable.rs index 06d24a76ab..5028ca520a 100644 --- a/iroh/examples/listen-unreliable.rs +++ b/iroh/examples/listen-unreliable.rs @@ -69,7 +69,7 @@ async fn main() -> anyhow::Result<()> { }; let alpn = connecting.alpn().await?; let conn = connecting.await?; - let node_id = iroh::endpoint::get_remote_node_id(&conn)?; + let node_id = conn.remote_node_id()?; info!( "new (unreliable) connection from {node_id} with ALPN {} (coming from {})", String::from_utf8_lossy(&alpn), diff --git a/iroh/examples/listen.rs b/iroh/examples/listen.rs index 13413992dd..fb93e5342d 100644 --- a/iroh/examples/listen.rs +++ b/iroh/examples/listen.rs @@ -70,7 +70,7 @@ async fn main() -> anyhow::Result<()> { }; let alpn = connecting.alpn().await?; let conn = connecting.await?; - let node_id = iroh::endpoint::get_remote_node_id(&conn)?; + let node_id = conn.remote_node_id()?; info!( "new connection from {node_id} with ALPN {} (coming from {})", String::from_utf8_lossy(&alpn), diff --git a/iroh/examples/search.rs b/iroh/examples/search.rs index 5223aded9b..d60b629038 100644 --- a/iroh/examples/search.rs +++ b/iroh/examples/search.rs @@ -35,7 +35,7 @@ use anyhow::Result; use clap::Parser; use futures_lite::future::Boxed as BoxedFuture; use iroh::{ - endpoint::{get_remote_node_id, Connecting}, + endpoint::Connecting, protocol::{ProtocolHandler, Router}, Endpoint, NodeId, }; @@ -134,7 +134,7 @@ impl ProtocolHandler for BlobSearch { // Wait for the connection to be fully established. let connection = connecting.await?; // We can get the remote's node id from the connection. - let node_id = get_remote_node_id(&connection)?; + let node_id = connection.remote_node_id()?; println!("accepted connection from {node_id}"); // Our protocol is a simple request-response protocol, so we expect the diff --git a/iroh/examples/transfer.rs b/iroh/examples/transfer.rs index 6b611b71ba..4c037ff30f 100644 --- a/iroh/examples/transfer.rs +++ b/iroh/examples/transfer.rs @@ -179,7 +179,7 @@ async fn provide( } }; let conn = connecting.await?; - let node_id = iroh::endpoint::get_remote_node_id(&conn)?; + let node_id = conn.remote_node_id()?; info!( "new connection from {node_id} with ALPN {} (coming from {})", String::from_utf8_lossy(TRANSFER_ALPN), diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 54a8fa2e12..54605d174f 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -23,7 +23,7 @@ use std::{ }; use anyhow::{bail, Context, Result}; -use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl, SecretKey}; +use iroh_base::{NodeAddr, NodeId, RelayUrl, SecretKey}; use iroh_relay::RelayMap; use pin_project::pin_project; use tracing::{debug, instrument, trace, warn}; @@ -43,7 +43,7 @@ mod rtt_actor; // Missing still: SendDatagram and ConnectionClose::frame_type's Type. pub use quinn::{ - AcceptBi, AcceptUni, AckFrequencyConfig, ApplicationClose, Chunk, ClosedStream, Connection, + AcceptBi, AcceptUni, AckFrequencyConfig, ApplicationClose, Chunk, ClosedStream, ConnectionClose, ConnectionError, ConnectionStats, MtuDiscoveryConfig, OpenBi, OpenUni, ReadDatagram, ReadError, ReadExactError, ReadToEndError, RecvStream, ResetError, RetryError, SendDatagramError, SendStream, ServerConfig, StoppedError, StreamId, TransportConfig, VarInt, @@ -212,6 +212,8 @@ impl Builder { /// also its [`NodeId`] /// /// If not set, a new secret key will be generated. + /// + /// [`PublicKey`]: iroh_base::PublicKey pub fn secret_key(mut self, secret_key: SecretKey) -> Self { self.secret_key = Some(secret_key); self @@ -713,7 +715,7 @@ impl Endpoint { warn!("rtt-actor not reachable: {err:#}"); } debug!("Connection established"); - Ok(connection) + Ok(Connection { inner: connection }) } /// Accepts an incoming connection on the endpoint. @@ -750,8 +752,8 @@ impl Endpoint { /// /// # Errors /// - /// Will return an error if we attempt to add our own [`PublicKey`] to the node map or if the - /// direct addresses are a subset of ours. + /// Will return an error if we attempt to add our own [`NodeId`] to the node map or + /// if the direct addresses are a subset of ours. /// /// [`StaticProvider`]: crate::discovery::static_provider::StaticProvider pub fn add_node_addr(&self, node_addr: NodeAddr) -> Result<()> { @@ -773,8 +775,8 @@ impl Endpoint { /// /// # Errors /// - /// Will return an error if we attempt to add our own [`PublicKey`] to the node map or if the - /// direct addresses are a subset of ours. + /// Will return an error if we attempt to add our own [`NodeId`] to the node map or + /// if the direct addresses are a subset of ours. /// /// [`StaticProvider`]: crate::discovery::static_provider::StaticProvider pub fn add_node_addr_with_source( @@ -1237,14 +1239,15 @@ pub struct IncomingFuture { } impl Future for IncomingFuture { - type Output = Result; + type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { let this = self.project(); match this.inner.poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => Poll::Ready(Err(err)), - Poll::Ready(Ok(conn)) => { + Poll::Ready(Ok(inner)) => { + let conn = Connection { inner }; try_send_rtt_msg(&conn, this.ep); Poll::Ready(Ok(conn)) } @@ -1265,7 +1268,8 @@ impl Connecting { /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security. pub fn into_0rtt(self) -> Result<(Connection, ZeroRttAccepted), Self> { match self.inner.into_0rtt() { - Ok((conn, zrtt_accepted)) => { + Ok((inner, zrtt_accepted)) => { + let conn = Connection { inner }; try_send_rtt_msg(&conn, &self.ep); Ok((conn, zrtt_accepted)) } @@ -1311,7 +1315,8 @@ impl Future for Connecting { match this.inner.poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => Poll::Ready(Err(err)), - Poll::Ready(Ok(conn)) => { + Poll::Ready(Ok(inner)) => { + let conn = Connection { inner }; try_send_rtt_msg(&conn, this.ep); Poll::Ready(Ok(conn)) } @@ -1319,25 +1324,333 @@ impl Future for Connecting { } } -/// Extract the [`PublicKey`] from the peer's TLS certificate. -// TODO: make this a method now -pub fn get_remote_node_id(connection: &Connection) -> Result { - let data = connection.peer_identity(); - match data { - None => bail!("no peer certificate found"), - Some(data) => match data.downcast::>() { - Ok(certs) => { - if certs.len() != 1 { - bail!( - "expected a single peer certificate, but {} found", - certs.len() - ); +/// A QUIC connection. +/// +/// If all references to a connection (including every clone of the Connection handle, +/// streams of incoming streams, and the various stream types) have been dropped, then the +/// connection will be automatically closed with an error_code of 0 and an empty reason. You +/// can also close the connection explicitly by calling [`Connection::close`]. +/// +/// Closing the connection immediately abandons efforts to deliver data to the peer. Upon +/// receiving CONNECTION_CLOSE the peer may drop any stream data not yet delivered to the +/// application. [`Connection::close`] describes in more detail how to gracefully close a +/// connection without losing application data. +/// +/// May be cloned to obtain another handle to the same connection. +// This has repr(transparent) as it opens the door to potentially allow casting it to a +// quinn::Connection in the future. Right now however that'd be iroh_quinn::Connection. +#[derive(Debug, Clone)] +#[repr(transparent)] +pub struct Connection { + inner: quinn::Connection, +} + +impl Connection { + /// Initiates a new outgoing unidirectional stream. + /// + /// Streams are cheap and instantaneous to open unless blocked by flow control. As a + /// consequence, the peer won’t be notified that a stream has been opened until the + /// stream is actually used. + #[inline] + pub fn open_uni(&self) -> OpenUni<'_> { + self.inner.open_uni() + } + + /// Initiates a new outgoing bidirectional stream. + /// + /// Streams are cheap and instantaneous to open unless blocked by flow control. As a + /// consequence, the peer won't be notified that a stream has been opened until the + /// stream is actually used. Calling [`open_bi`] then waiting on the [`RecvStream`] + /// without writing anything to [`SendStream`] will never succeed. + /// + /// [`open_bi`]: Connection::open_bi + #[inline] + pub fn open_bi(&self) -> OpenBi<'_> { + self.inner.open_bi() + } + + /// Accepts the next incoming uni-directional stream. + #[inline] + pub fn accept_uni(&self) -> AcceptUni<'_> { + self.inner.accept_uni() + } + + /// Accept the next incoming bidirectional stream. + /// + /// **Important Note**: The peer that calls [`open_bi`] must write to its [`SendStream`] + /// before the peer `Connection` is able to accept the stream using + /// `accept_bi()`. Calling [`open_bi`] then waiting on the [`RecvStream`] without + /// writing anything to the connected [`SendStream`] will never succeed. + /// + /// [`open_bi`]: Connection::open_bi + #[inline] + pub fn accept_bi(&self) -> AcceptBi<'_> { + self.inner.accept_bi() + } + + /// Receives an application datagram. + #[inline] + pub fn read_datagram(&self) -> ReadDatagram<'_> { + self.inner.read_datagram() + } + + /// Wait for the connection to be closed for any reason. + /// + /// Despite the return type's name, closed connections are often not an error condition + /// at the application layer. Cases that might be routine include + /// [`ConnectionError::LocallyClosed`] and [`ConnectionError::ApplicationClosed`]. + #[inline] + pub async fn closed(&self) -> ConnectionError { + self.inner.closed().await + } + + /// If the connection is closed, the reason why. + /// + /// Returns `None` if the connection is still open. + #[inline] + pub fn close_reason(&self) -> Option { + self.inner.close_reason() + } + + /// Closes the connection immediately. + /// + /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No + /// more data is sent to the peer and the peer may drop buffered data upon receiving the + /// CONNECTION_CLOSE frame. + /// + /// `error_code` and `reason` are not interpreted, and are provided directly to the + /// peer. + /// + /// `reason` will be truncated to fit in a single packet with overhead; to improve odds + /// that it is preserved in full, it should be kept under 1KiB. + /// + /// # Gracefully closing a connection + /// + /// Only the peer last receiving application data can be certain that all data is + /// delivered. The only reliable action it can then take is to close the connection, + /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE + /// frame is very likely if both endpoints stay online long enough, calling + /// [`Endpoint::close`] will wait to provide sufficient time. Otherwise, the remote peer + /// will time out the connection, provided that the idle timeout is not disabled. + /// + /// The sending side can not guarantee all stream data is delivered to the remote + /// application. It only knows the data is delivered to the QUIC stack of the remote + /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling + /// [`close`] the remote endpoint may drop any data it received but is as yet + /// undelivered to the application, including data that was acknowledged as received to + /// the local endpoint. + /// + /// [`close`]: Connection::close + #[inline] + pub fn close(&self, error_code: VarInt, reason: &[u8]) { + self.inner.close(error_code, reason) + } + + /// Transmits `data` as an unreliable, unordered application datagram. + /// + /// Application datagrams are a low-level primitive. They may be lost or delivered out + /// of order, and `data` must both fit inside a single QUIC packet and be smaller than + /// the maximum dictated by the peer. + #[inline] + pub fn send_datagram(&self, data: bytes::Bytes) -> Result<(), SendDatagramError> { + self.inner.send_datagram(data) + } + + // TODO: It seems `SendDatagram` is not yet exposed by quinn. This has been fixed + // upstream and will be in the next release. + // /// Transmits `data` as an unreliable, unordered application datagram + // /// + // /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion + // /// conditions, which effectively prioritizes old datagrams over new datagrams. + // /// + // /// See [`send_datagram()`] for details. + // /// + // /// [`send_datagram()`]: Connection::send_datagram + // #[inline] + // pub fn send_datagram_wait(&self, data: bytes::Bytes) -> SendDatagram<'_> { + // self.inner.send_datagram_wait(data) + // } + + /// Computes the maximum size of datagrams that may be passed to [`send_datagram`]. + /// + /// Returns `None` if datagrams are unsupported by the peer or disabled locally. + /// + /// This may change over the lifetime of a connection according to variation in the path + /// MTU estimate. The peer can also enforce an arbitrarily small fixed limit, but if the + /// peer's limit is large this is guaranteed to be a little over a kilobyte at minimum. + /// + /// Not necessarily the maximum size of received datagrams. + /// + /// [`send_datagram`]: Self::send_datagram + #[inline] + pub fn max_datagram_size(&self) -> Option { + self.inner.max_datagram_size() + } + + /// Bytes available in the outgoing datagram buffer. + /// + /// When greater than zero, calling [`send_datagram`] with a + /// datagram of at most this size is guaranteed not to cause older datagrams to be + /// dropped. + /// + /// [`send_datagram`]: Self::send_datagram + #[inline] + pub fn datagram_send_buffer_space(&self) -> usize { + self.inner.datagram_send_buffer_space() + } + + /// The peer's UDP address. + /// + /// If [`ServerConfig::migration`] is `true`, clients may change addresses at will, + /// e.g. when switching to a cellular internet connection. + #[inline] + pub fn remote_address(&self) -> SocketAddr { + self.inner.remote_address() + } + + /// The local IP address which was used when the peer established the connection. + /// + /// This can be different from the address the endpoint is bound to, in case the + /// endpoint is bound to a wildcard address like `0.0.0.0` or `::`. + /// + /// This will return `None` for clients, or when the platform does not expose this + /// information. See [`quinn::udp::RecvMeta::dst_ip`] for a list of supported + /// platforms. + #[inline] + pub fn local_ip(&self) -> Option { + self.inner.local_ip() + } + + /// Current best estimate of this connection's latency (round-trip-time). + #[inline] + pub fn rtt(&self) -> Duration { + self.inner.rtt() + } + + /// Returns connection statistics. + #[inline] + pub fn stats(&self) -> ConnectionStats { + self.inner.stats() + } + + /// Current state of the congestion control algorithm, for debugging purposes. + #[inline] + pub fn congestion_state(&self) -> Box { + self.inner.congestion_state() + } + + /// Parameters negotiated during the handshake. + /// + /// Guaranteed to return `Some` on fully established connections or after + /// [`Connecting::handshake_data()`] succeeds. See that method's documentations for + /// details on the returned value. + /// + /// [`Connection::handshake_data()`]: crate::Connecting::handshake_data + #[inline] + pub fn handshake_data(&self) -> Option> { + self.inner.handshake_data() + } + + /// Extracts the ALPN protocol from the peer's handshake data. + pub fn alpn(&self) -> Option> { + let data = self.handshake_data()?; + match data.downcast::() { + Ok(data) => data.protocol, + Err(_) => None, + } + } + + /// Cryptographic identity of the peer. + /// + /// The dynamic type returned is determined by the configured [`Session`]. For the + /// default `rustls` session, the return value can be [`downcast`] to a + /// Vec<[rustls::pki_types::CertificateDer]> + /// + /// [`Session`]: quinn_proto::crypto::Session + /// [`downcast`]: Box::downcast + #[inline] + pub fn peer_identity(&self) -> Option> { + self.inner.peer_identity() + } + + /// Returns the [`NodeId`] from the peer's TLS certificate. + /// + /// The [`PublicKey`] of a node is also known as a [`NodeId`]. This [`PublicKey`] is + /// included in the TLS certificate presented during the handshake when connecting. + /// This function allows you to get the [`NodeId`] of the remote node of this + /// connection. + /// + /// [`PublicKey`]: iroh_base::PublicKey + // TODO: Would be nice if this could be infallible. + pub fn remote_node_id(&self) -> Result { + let data = self.peer_identity(); + match data { + None => bail!("no peer certificate found"), + Some(data) => match data.downcast::>() { + Ok(certs) => { + if certs.len() != 1 { + bail!( + "expected a single peer certificate, but {} found", + certs.len() + ); + } + let cert = tls::certificate::parse(&certs[0])?; + Ok(cert.peer_id()) } - let cert = tls::certificate::parse(&certs[0])?; - Ok(cert.peer_id()) - } - Err(_) => bail!("invalid peer certificate"), - }, + Err(_) => bail!("invalid peer certificate"), + }, + } + } + + /// A stable identifier for this connection. + /// + /// Peer addresses and connection IDs can change, but this value will remain fixed for + /// the lifetime of the connection. + #[inline] + pub fn stable_id(&self) -> usize { + self.inner.stable_id() + } + + /// Derives keying material from this connection's TLS session secrets. + /// + /// When both peers call this method with the same `label` and `context` + /// arguments and `output` buffers of equal length, they will get the + /// same sequence of bytes in `output`. These bytes are cryptographically + /// strong and pseudorandom, and are suitable for use as keying material. + /// + /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information. + #[inline] + pub fn export_keying_material( + &self, + output: &mut [u8], + label: &[u8], + context: &[u8], + ) -> Result<(), quinn_proto::crypto::ExportKeyingMaterialError> { + self.inner.export_keying_material(output, label, context) + } + + /// Modifies the number of unidirectional streams that may be concurrently opened. + /// + /// No streams may be opened by the peer unless fewer than `count` are already + /// open. Large `count`s increase both minimum and worst-case memory consumption. + #[inline] + pub fn set_max_concurrent_uni_streams(&self, count: VarInt) { + self.inner.set_max_concurrent_uni_streams(count) + } + + /// See [`quinn_proto::TransportConfig::receive_window`]. + #[inline] + pub fn set_receive_window(&self, receive_window: VarInt) { + self.inner.set_receive_window(receive_window) + } + + /// Modifies the number of bidirectional streams that may be concurrently opened. + /// + /// No streams may be opened by the peer unless fewer than `count` are already + /// open. Large `count`s increase both minimum and worst-case memory consumption. + #[inline] + pub fn set_max_concurrent_bi_streams(&self, count: VarInt) { + self.inner.set_max_concurrent_bi_streams(count) } } @@ -1347,18 +1660,18 @@ pub fn get_remote_node_id(connection: &Connection) -> Result { /// function. fn try_send_rtt_msg(conn: &Connection, magic_ep: &Endpoint) { // If we can't notify the rtt-actor that's not great but not critical. - let Ok(peer_id) = get_remote_node_id(conn) else { + let Ok(node_id) = conn.remote_node_id() else { warn!(?conn, "failed to get remote node id"); return; }; - let Ok(conn_type_changes) = magic_ep.conn_type(peer_id) else { + let Ok(conn_type_changes) = magic_ep.conn_type(node_id) else { warn!(?conn, "failed to create conn_type stream"); return; }; let rtt_msg = RttMessage::NewConnection { - connection: conn.weak_handle(), + connection: conn.inner.weak_handle(), conn_type_changes: conn_type_changes.stream(), - node_id: peer_id, + node_id, }; if let Err(err) = magic_ep.rtt_actor.msg_tx.try_send(rtt_msg) { warn!(?conn, "rtt-actor not reachable: {err:#}"); @@ -1676,8 +1989,8 @@ mod tests { info!("[server] round {i}"); let incoming = ep.accept().await.unwrap(); let conn = incoming.await.unwrap(); - let peer_id = get_remote_node_id(&conn).unwrap(); - info!(%i, peer = %peer_id.fmt_short(), "accepted connection"); + let node_id = conn.remote_node_id().unwrap(); + info!(%i, peer = %node_id.fmt_short(), "accepted connection"); let (mut send, mut recv) = conn.accept_bi().await.unwrap(); let mut buf = vec![0u8; chunk_size]; for _i in 0..n_chunks_per_client { @@ -1687,7 +2000,7 @@ mod tests { send.finish().unwrap(); send.stopped().await.unwrap(); recv.read_to_end(0).await.unwrap(); - info!(%i, peer = %peer_id.fmt_short(), "finished"); + info!(%i, peer = %node_id.fmt_short(), "finished"); info!("[server] round {i} done in {:?}", round_start.elapsed()); } } @@ -1792,7 +2105,7 @@ mod tests { let mut iconn = incoming.accept().unwrap(); let alpn = iconn.alpn().await.unwrap(); let conn = iconn.await.unwrap(); - let node_id = get_remote_node_id(&conn).unwrap(); + let node_id = conn.remote_node_id().unwrap(); assert_eq!(node_id, src); assert_eq!(alpn, TEST_ALPN); let (mut send, mut recv) = conn.accept_bi().await.unwrap(); @@ -1866,7 +2179,7 @@ mod tests { .await .unwrap(); - async fn handle_direct_conn(ep: &Endpoint, node_id: PublicKey) -> Result<()> { + async fn handle_direct_conn(ep: &Endpoint, node_id: NodeId) -> Result<()> { let mut stream = ep.conn_type(node_id)?.stream(); let src = ep.node_id().fmt_short(); let dst = node_id.fmt_short(); @@ -1882,7 +2195,7 @@ mod tests { async fn accept(ep: &Endpoint) -> NodeId { let incoming = ep.accept().await.unwrap(); let conn = incoming.await.unwrap(); - let node_id = get_remote_node_id(&conn).unwrap(); + let node_id = conn.remote_node_id().unwrap(); tracing::info!(node_id=%node_id.fmt_short(), "accepted connection"); node_id }