From 3411ee7b2b096b0509cea1701f078fe596bcfe9e Mon Sep 17 00:00:00 2001 From: Noah Callaway Date: Sun, 6 Sep 2020 11:56:28 -0700 Subject: [PATCH] Change Connection behavior to require both a packet send and receive (#280) --- docs/md_book/src/SUMMARY.md | 9 ++- docs/md_book/src/connections.md | 63 +++++++++++++++ docs/md_book/src/protocols.md | 27 ++++--- src/config.rs | 5 ++ src/net/connection.rs | 5 +- src/net/connection_impl.rs | 48 ++++++++---- src/net/connection_manager.rs | 134 +++++++++++++++++++++++++------- src/net/events.rs | 13 +++- src/net/virtual_connection.rs | 27 +++++++ tests/basic_socket_test.rs | 1 - 10 files changed, 268 insertions(+), 64 deletions(-) create mode 100644 docs/md_book/src/connections.md diff --git a/docs/md_book/src/SUMMARY.md b/docs/md_book/src/SUMMARY.md index 0ce44f40..06132e0c 100644 --- a/docs/md_book/src/SUMMARY.md +++ b/docs/md_book/src/SUMMARY.md @@ -3,11 +3,12 @@ - [Intro](intro.md) - [Important Notices](important.md) - [Protocols](protocols.md) +- [Connections](connections.md) - [Heartbeat](heartbeat.md) - [Fragmentation](fragmentation.md) - [Reliability](reliability/basics.md) - - [Basics](reliability/basics.md) - - [Reliability](reliability/reliability.md) - - [Ordering](reliability/ordering.md) + - [Basics](reliability/basics.md) + - [Reliability](reliability/reliability.md) + - [Ordering](reliability/ordering.md) - [Congestion Avoidance](congestion_avoidence/congestion_avoidance.md) - - [Whit RTT](congestion_avoidence/rtt.md) \ No newline at end of file + - [Whit RTT](congestion_avoidence/rtt.md) diff --git a/docs/md_book/src/connections.md b/docs/md_book/src/connections.md new file mode 100644 index 00000000..19c5b5e9 --- /dev/null +++ b/docs/md_book/src/connections.md @@ -0,0 +1,63 @@ +# Connections + +As mentioned in [Protocols](protocols.md) laminar is built on top of UDP. UDP is a [connectionless protocol](https://en.wikipedia.org/wiki/Connectionless_communication), but for multiplayer games we very often want to connect to a particular endpoint and repeatedly send data back and forth. + +Laminar itself needs to maintain a small amount of data for each endpoint it is communicating with. For example, Laminar maintains data about which sent packets the other endpoint has acknowledged as well as an estimated [Rount Trip Time](congestion_avoidence/rtt.md). + +In order to support these common use-cases Laminar adds an extremely simple connection model on top of UDP. + +Connections are considered established whenever we have both sent and received data to the same endpoint. This means, to establish a connection your server needs to respond to inbound messages it receives. A sample of this is presented + +**client** + +```rust +let socket = Socket::bind(SERVER_ADDRESS); +let (sender, receiver) = (socket.get_packet_sender(), socket.get_event_receiver()); +thread::spawn(move || socket.start_polling()); + +sender.send(Packet::reliable_unordered(SERVER_ADDRESS, "Ping".as_bytes().to_vec())); + +loop { + if let Ok(event) = receiver.recv() { + match event { + SocketEvent::Connect(addr) => { + println!("Connected to: {}", addr); + }, + _ + } + } +} +``` + +**server** + +```rust +let socket = Socket::bind(SERVER_ADDRESS); +let (sender, receiver) = (socket.get_packet_sender(), socket.get_event_receiver()); +thread::spawn(move || socket.start_polling()); + +loop { + if let Ok(event) = receiver.recv() { + match event { + SocketEvent::Packet(packet) => { + if packet.payload() == b"Ping" { + sender.send(Packet::reliable_unordered( + packet.addr(), + "Pong".as_bytes().to_vec(), + )).unwrap(); + } + }, + SocketEvent::Connect(addr) => { + println!("Connected to: {}", addr); + }, + _ + } + } +} +``` + +If we don't send the `Pong` in the server, then neither the client nor the server will display the "Connected to" message. + +### Packet Flooding Mitigation + +Laminar will optimistically track data for endpoints before connections are established. As soon as data is sent or received from a new endpoint Laminar will start tracking the endpoint. In order to prevent packet flooding attacks from causing Laminar to allocate too much memory, the number of unestablished connections that Laminar will optimistically track can be controlled with the `max_unestablished_connections` Config. diff --git a/docs/md_book/src/protocols.md b/docs/md_book/src/protocols.md index f0477311..8993a74b 100644 --- a/docs/md_book/src/protocols.md +++ b/docs/md_book/src/protocols.md @@ -1,36 +1,41 @@ # Networking protocols -So first and possibly the important one is which protocol to use and when. Let’s first take a look at TCP and UDP. +When building any networked application the first and possibly the important decision to make is which protocol to use and when. Laminar is built on top of UDP. Let’s first take a quick look at both TCP and UDP, then we'll explain why Laminar uses UDP. ## IP -All communication over the internet is happening over IP (Internet Protocol). -This protocol only passes packets across the network without any guarantee that it will arrive at the destination. + +All communication over the internet is happening over IP (Internet Protocol). +This protocol only passes packets across the network without any guarantee that it will arrive at the destination. Sometimes IP passes along multiple copies of the same packet and these packets make their way to the destination via different paths, causing packets to arrive out of order and in duplicate. -So to be able to communicate over the network we make use of existing protocols that provides some more certainty. +So to be able to communicate over the network we make use of existing protocols that provides some more certainty. We will first take a look at TCP where after we checkout UPD. ## TCP/IP -TCP stands for “transmission control protocol”. IP stands for “internet protocol”. + +TCP stands for “transmission control protocol”. IP stands for “internet protocol”. Together they form the backbone for almost everything you do online, from web browsing to IRC to email, it’s all built on top of TCP/IP. -TCP is a connection-oriented protocol, which means a connection is established and maintained until the application programs at each end have finished exchanging messages. -TCP provides full reliable, ordered communication between two machines. The data you send is guaranteed to arrive and in order. +TCP is a connection-oriented protocol, which means a connection is established and maintained until the application programs at each end have finished exchanging messages. +TCP provides full reliable, ordered communication between two machines. The data you send is guaranteed to arrive and in order. The TCP protocol will also split up and reassemble packets if those are too large. **Characteristics** + - Reliable - Ordered - Automatic [fragmentation](fragmentation.md) of packets - Stream based - Control Flow ([Congestion Avoidance](congestion_avoidence/congestion_avoidance.md)) - + ## UDP + UDP stands for “user datagram protocol” and it’s another protocol built on top of IP, but unlike TCP, instead of adding lots of features and complexity, UDP is a very thin layer over IP. Like IP, UDP is an unreliable protocol. In practice however, most packets that are sent will get through, but you’ll usually have around 1-5% packet loss, and occasionally you’ll get periods where no packets get through at all (remember there are lots of computers between you and your destination where things can go wrong…) **Characteristics** + - Not Reliable - Not Ordered - No [fragmentation](fragmentation.md) of packets @@ -39,9 +44,10 @@ Like IP, UDP is an unreliable protocol. In practice however, most packets that a - Message based ## Why UDP and not TCP | More + Those of you familiar with TCP know that it already has its own concept of connection, reliability-ordering and congestion avoidance, so why are we rewriting our own mini version of TCP on top of UDP? -The issue is that multiplayer action games rely on a steady stream of packets sent at rates of 10 to 30 packets per second, and for the most part, the data contained in these packets is so time sensitive that only the most recent data is useful. +The issue is that multiplayer action games rely on a steady stream of packets sent at rates of 10 to 30 packets per second, and for the most part, the data contained in these packets is so time sensitive that only the most recent data is useful. This includes data such as player inputs, the position, orientation and velocity of each player character, and the state of physics objects in the world. The problem with TCP is that it abstracts data delivery as a reliable ordered stream. Because of this, if a packet is lost, TCP has to stop and wait for that packet to be resent. @@ -57,6 +63,7 @@ What TCP does is maintain a sliding window where the ACK sent is the sequence nu It is not possible to implement a reliability system with these properties using TCP, so we have no choice but to roll our own reliability on top of UDP. TCP itself is built on UDP. ## When use TCP + Of course there could be use-cases for TCP like chat, asset streaming, etc. We can setup a TCP socket for this that is distinct from UDP. -We could also make our UDP channel reliable as described below so when we detect package lost on the client we could construct a new package +We could also make our UDP channel reliable as described below so when we detect package lost on the client we could construct a new package diff --git a/src/config.rs b/src/config.rs index 10e6c8b3..1d9dfe25 100644 --- a/src/config.rs +++ b/src/config.rs @@ -55,6 +55,10 @@ pub struct Config { /// When we send a reliable packet, it is stored locally until an acknowledgement comes back to /// us, if that store grows to a size. pub max_packets_in_flight: u16, + + /// The maximum number of unestablished connections that laminar will track internally. This is + /// used to prevent malicious packet flooding from consuming an unbounded amount of memory. + pub max_unestablished_connections: u16, } impl Default for Config { @@ -73,6 +77,7 @@ impl Default for Config { socket_event_buffer_size: 1024, socket_polling_timeout: Some(Duration::from_millis(1)), max_packets_in_flight: 512, + max_unestablished_connections: 50, } } } diff --git a/src/net/connection.rs b/src/net/connection.rs index fedcd25b..a195c1d3 100644 --- a/src/net/connection.rs +++ b/src/net/connection.rs @@ -32,14 +32,15 @@ pub trait Connection: Debug { /// * messenger - allows to send packets and events, also provides a config. /// * address - defines a address that connection is associated with. /// * time - creation time, used by connection, so that it doesn't get dropped immediately or send heartbeat packet. - /// * initial_data - if initiated by remote host, this will hold that a packet data. fn create_connection( messenger: &mut impl ConnectionMessenger, address: SocketAddr, time: Instant, - initial_data: Option<&[u8]>, ) -> Self; + /// Connections are considered established once they have both had both a send and a receive. + fn is_established(&self) -> bool; + /// Determines if the connection should be dropped due to its state. fn should_drop( &mut self, diff --git a/src/net/connection_impl.rs b/src/net/connection_impl.rs index 4cc0ebc9..9f6d773a 100644 --- a/src/net/connection_impl.rs +++ b/src/net/connection_impl.rs @@ -18,6 +18,7 @@ impl ConnectionEventAddress for SocketEvent { SocketEvent::Packet(packet) => packet.addr(), SocketEvent::Connect(addr) => *addr, SocketEvent::Timeout(addr) => *addr, + SocketEvent::Disconnect(addr) => *addr, } } } @@ -44,15 +45,15 @@ impl Connection for VirtualConnection { messenger: &mut impl ConnectionMessenger, address: SocketAddr, time: Instant, - initial_data: Option<&[u8]>, ) -> VirtualConnection { - // emit connect event if this is initiated by the remote host. - if initial_data.is_some() { - messenger.send_event(&address, SocketEvent::Connect(address)); - } VirtualConnection::new(address, messenger.config(), time) } + /// Connections are considered established once they both have had a send and a receive. + fn is_established(&self) -> bool { + self.is_established() + } + /// Determines if the given `Connection` should be dropped due to its state. fn should_drop( &mut self, @@ -66,6 +67,12 @@ impl Connection for VirtualConnection { &self.remote_address, SocketEvent::Timeout(self.remote_address), ); + if self.is_established() { + messenger.send_event( + &self.remote_address, + SocketEvent::Disconnect(self.remote_address), + ); + } } should_drop } @@ -80,6 +87,13 @@ impl Connection for VirtualConnection { if !payload.is_empty() { match self.process_incoming(payload, time) { Ok(packets) => { + if self.record_recv() { + messenger.send_event( + &self.remote_address, + SocketEvent::Connect(self.remote_address), + ); + } + for incoming in packets { messenger.send_event(&self.remote_address, SocketEvent::Packet(incoming.0)); } @@ -102,6 +116,10 @@ impl Connection for VirtualConnection { time: Instant, ) { let addr = self.remote_address; + if self.record_send() { + messenger.send_event(&addr, SocketEvent::Connect(addr)); + } + send_packets( messenger, &addr, @@ -143,15 +161,17 @@ impl Connection for VirtualConnection { } // send heartbeat packets if required - if let Some(heartbeat_interval) = messenger.config().heartbeat_interval { - let addr = self.remote_address; - if self.last_sent(time) >= heartbeat_interval { - send_packets( - messenger, - &addr, - self.process_outgoing(PacketInfo::heartbeat_packet(&[]), None, time), - "heatbeat packet", - ); + if self.is_established() { + if let Some(heartbeat_interval) = messenger.config().heartbeat_interval { + let addr = self.remote_address; + if self.last_sent(time) >= heartbeat_interval { + send_packets( + messenger, + &addr, + self.process_outgoing(PacketInfo::heartbeat_packet(&[]), None, time), + "heatbeat packet", + ); + } } } } diff --git a/src/net/connection_manager.rs b/src/net/connection_manager.rs index 60adef47..80a3b03f 100644 --- a/src/net/connection_manager.rs +++ b/src/net/connection_manager.rs @@ -75,6 +75,7 @@ pub struct ConnectionManager { messenger: SocketEventSenderAndConfig, event_receiver: Receiver, user_event_sender: Sender, + max_unestablished_connections: u16, } impl ConnectionManager { @@ -82,6 +83,8 @@ impl ConnectionManager Self { let (event_sender, event_receiver) = unbounded(); let (user_event_sender, user_event_receiver) = unbounded(); + let max_unestablished_connections = config.max_unestablished_connections; + ConnectionManager { receive_buffer: vec![0; config.receive_buffer_max_size], connections: Default::default(), @@ -89,6 +92,7 @@ impl ConnectionManager ConnectionManager ConnectionManager { if let Some(conn) = self.connections.get_mut(&address) { + let was_est = conn.is_established(); conn.process_packet(messenger, payload, time); + if !was_est && conn.is_established() { + unestablished_connections -= 1; + } } else { - // create connection, but do not add to active connections list - let mut conn = - TConnection::create_connection(messenger, address, time, Some(payload)); + let mut conn = TConnection::create_connection(messenger, address, time); conn.process_packet(messenger, payload, time); + + // We only allow a maximum amount number of unestablished connections to bet created + // from inbound packets to prevent packet flooding from allocating unbounded memory. + if unestablished_connections < self.max_unestablished_connections as usize { + self.connections.insert(address, conn); + unestablished_connections += 1; + } } } Err(e) => { @@ -130,9 +146,14 @@ impl ConnectionManager ConnectionManager usize { + self.connections + .iter() + .filter(|c| !c.1.is_established()) + .count() + } + /// Returns socket mutable reference. #[allow(dead_code)] pub fn socket_mut(&mut self) -> &mut TSocket { @@ -181,7 +209,7 @@ impl ConnectionManager SocketAddr { + SocketAddr::V4(SocketAddrV4::new("127.0.0.1".parse().unwrap(), 10002 + n)) + } + fn server_address() -> SocketAddr { SERVER_ADDR.parse().unwrap() } @@ -234,7 +266,6 @@ mod tests { client.manual_poll(time); server.manual_poll(time); - assert_eq![Ok(SocketEvent::Connect(client_address())), receiver.recv()]; if let SocketEvent::Packet(packet) = receiver.recv().unwrap() { assert_eq![b"Hello world!", packet.payload()]; } else { @@ -285,30 +316,42 @@ mod tests { #[test] fn receiving_does_not_allow_denial_of_service() { - let (mut server, mut client, _) = create_server_client_network(); + let time = Instant::now(); + let network = NetworkEmulator::default(); + let mut server = FakeSocket::bind( + &network, + server_address(), + Config { + max_unestablished_connections: 2, + ..Default::default() + }, + ) + .unwrap(); // send a bunch of packets to a server - for _ in 0..3 { + for i in 0..3 { + let mut client = + FakeSocket::bind(&network, client_address_n(i), Config::default()).unwrap(); + client .send(Packet::unreliable( server_address(), vec![1, 2, 3, 4, 5, 6, 7, 8, 9], )) .unwrap(); - } - let time = Instant::now(); + client.manual_poll(time); + } - client.manual_poll(time); server.manual_poll(time); - for _ in 0..6 { + for _ in 0..3 { assert![server.recv().is_some()]; } assert![server.recv().is_none()]; - // the server shall not have any connection in its connection table even though it received - // packets - assert_eq![0, server.connection_count()]; + // the server shall not have at most the configured `max_unestablished_connections` in + // its connection table even though it packets from 3 clients + assert_eq![2, server.connection_count()]; server .send(Packet::unreliable(client_address(), vec![1])) @@ -317,7 +360,7 @@ mod tests { server.manual_poll(time); // the server only adds to its table after having sent explicitly - assert_eq![1, server.connection_count()]; + assert_eq![2, server.connection_count()]; } #[test] @@ -425,7 +468,7 @@ mod tests { assert![!seen.contains(&byte)]; seen.insert(byte); } - SocketEvent::Timeout(_) => { + SocketEvent::Timeout(_) | SocketEvent::Disconnect(_) => { panic!["This should not happen, as we've not advanced time"]; } } @@ -463,7 +506,7 @@ mod tests { SocketEvent::Packet(_) => { cnt += 1; } - SocketEvent::Timeout(_) => { + SocketEvent::Timeout(_) | SocketEvent::Disconnect(_) => { panic!["This should not happen, as we've not advanced time"]; } } @@ -557,10 +600,15 @@ mod tests { .send(Packet::unreliable(server_address(), vec![0, 1, 2])) .unwrap(); + server + .send(Packet::unreliable(client_address(), vec![2, 1, 0])) + .unwrap(); + let now = Instant::now(); client.manual_poll(now); server.manual_poll(now); + assert!(matches!(server.recv().unwrap(), SocketEvent::Packet(_))); assert_eq!( server.recv().unwrap(), SocketEvent::Connect(client_address()) @@ -581,10 +629,6 @@ mod tests { client.manual_poll(now); server.manual_poll(now); - assert_eq!( - server.recv().unwrap(), - SocketEvent::Connect(client_address()) - ); assert_eq!( server.recv().unwrap(), SocketEvent::Packet(Packet::unreliable(client_address(), vec![0, 1, 2])) @@ -598,7 +642,16 @@ mod tests { server.manual_poll(now); client.manual_poll(now); + assert_eq!( + server.recv().unwrap(), + SocketEvent::Connect(client_address()) + ); + // make sure the connection was successful on the client side + assert_eq!( + client.recv().unwrap(), + SocketEvent::Connect(server_address()) + ); assert_eq!( client.recv().unwrap(), SocketEvent::Packet(Packet::unreliable(server_address(), vec![])) @@ -619,10 +672,18 @@ mod tests { server.recv().unwrap(), SocketEvent::Timeout(client_address()) ); + assert_eq!( + server.recv().unwrap(), + SocketEvent::Disconnect(client_address()) + ); assert_eq!( client.recv().unwrap(), SocketEvent::Timeout(server_address()) ); + assert_eq!( + client.recv().unwrap(), + SocketEvent::Disconnect(server_address()) + ); } #[test] @@ -640,11 +701,6 @@ mod tests { client.manual_poll(now); server.manual_poll(now); - // make sure the connection was successful on the server side - assert_eq!( - server.recv().unwrap(), - SocketEvent::Connect(client_address()) - ); assert_eq!( server.recv().unwrap(), SocketEvent::Packet(Packet::unreliable(client_address(), vec![0, 1, 2])) @@ -659,6 +715,18 @@ mod tests { server.manual_poll(now); client.manual_poll(now); + // make sure the connection was successful on the server side + assert_eq!( + server.recv().unwrap(), + SocketEvent::Connect(client_address()) + ); + + // make sure the connection was successful on the server side + assert_eq!( + client.recv().unwrap(), + SocketEvent::Connect(server_address()) + ); + // make sure the connection was successful on the client side assert_eq!( client.recv().unwrap(), @@ -704,8 +772,8 @@ mod tests { } // ensure that we get the correct number of events to the server. - // 35 connect events plus the 35 messages - assert_eq!(events.len(), 70); + // 0 connect events plus the 35 messages + assert_eq!(events.len(), 35); // finally the server decides to send us a message back. This necessarily will include // the ack information for 33 of the sent 35 packets. @@ -714,6 +782,12 @@ mod tests { .unwrap(); server.manual_poll(now); + // make sure the connection was successful on the server side + assert_eq!( + server.recv().unwrap(), + SocketEvent::Connect(client_address()) + ); + // loop to ensure that the client gets the server message before moving on loop { client.manual_poll(now); @@ -790,7 +864,7 @@ mod tests { SocketEvent::Packet(pkt) => { set.insert(pkt.payload()[0]); } - SocketEvent::Timeout(_) => { + SocketEvent::Timeout(_) | SocketEvent::Disconnect(_) => { panic!["Unable to time out, time has not advanced"] } SocketEvent::Connect(_) => {} diff --git a/src/net/events.rs b/src/net/events.rs index a4b05d8f..63ab4675 100644 --- a/src/net/events.rs +++ b/src/net/events.rs @@ -7,10 +7,17 @@ use crate::packet::Packet; pub enum SocketEvent { /// A packet was received from a client. Packet(Packet), - /// A new client connected. - /// Clients are uniquely identified by the ip:port combination at this layer. + /// A new connection has been established with a client. A connection is considered + /// established whenever a packet has been both _sent_ and _received_ from the client. + /// + /// On the server—in order to receive a `Connect` event—you must respond to the first + /// Packet from a new client. + /// + /// Clients are uniquely identified by the `ip:port` combination at this layer. Connect(SocketAddr), - /// The client has been idling for a configurable amount of time. + /// The client has been idling for longer than the `idle_connection_timeout` time. /// You can control the timeout in the config. Timeout(SocketAddr), + /// The established connection to a client has timed out. + Disconnect(SocketAddr), } diff --git a/src/net/virtual_connection.rs b/src/net/virtual_connection.rs index 6ec5a2b5..f7c15db7 100644 --- a/src/net/virtual_connection.rs +++ b/src/net/virtual_connection.rs @@ -29,6 +29,9 @@ pub struct VirtualConnection { /// The address of the remote endpoint pub remote_address: SocketAddr, + ever_sent: bool, + ever_recv: bool, + ordering_system: OrderingSystem<(Box<[u8]>, PacketType)>, sequencing_system: SequencingSystem>, acknowledge_handler: AcknowledgmentHandler, @@ -45,6 +48,8 @@ impl VirtualConnection { last_heard: time, last_sent: time, remote_address: addr, + ever_sent: false, + ever_recv: false, ordering_system: OrderingSystem::new(), sequencing_system: SequencingSystem::new(), acknowledge_handler: AcknowledgmentHandler::new(), @@ -54,6 +59,28 @@ impl VirtualConnection { } } + /// Records that this connection has sent a packet. Returns whether the connection has + /// become acknowledged because of this send. + pub fn record_send(&mut self) -> bool { + let was_est = self.is_established(); + self.ever_sent = true; + + !was_est && self.is_established() + } + + /// Records that this connection has sent a packet. Returns whether the connection has + /// become acknowledged because of this send. + pub fn record_recv(&mut self) -> bool { + let was_est = self.is_established(); + self.ever_recv = true; + + !was_est && self.is_established() + } + + pub fn is_established(&self) -> bool { + self.ever_sent && self.ever_recv + } + pub fn packets_in_flight(&self) -> u16 { self.acknowledge_handler.packets_in_flight() } diff --git a/tests/basic_socket_test.rs b/tests/basic_socket_test.rs index 9f397165..733856e5 100644 --- a/tests/basic_socket_test.rs +++ b/tests/basic_socket_test.rs @@ -46,7 +46,6 @@ fn blocking_sender_and_receiver() { client.manual_poll(time); server.manual_poll(time); - assert_eq![SocketEvent::Connect(client_addr), server.recv().unwrap()]; if let SocketEvent::Packet(packet) = server.recv().unwrap() { assert_eq![b"Hello world!", packet.payload()]; } else {