From 690809254d68006e1db3b48b5575b577487dfd6d Mon Sep 17 00:00:00 2001 From: Joeri van Ruth Date: Fri, 23 Feb 2024 15:36:28 +0100 Subject: [PATCH] Pay attention to TCP sequence numbers (Untested) --- src/pcap/tcp.rs | 99 +++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 80 insertions(+), 19 deletions(-) diff --git a/src/pcap/tcp.rs b/src/pcap/tcp.rs index 1a65785..c7d28db 100644 --- a/src/pcap/tcp.rs +++ b/src/pcap/tcp.rs @@ -66,12 +66,10 @@ impl TcpTracker { return Ok(()); } + let seqno = tcp.sequence_number(); + let id = ConnectionId::new(self.conn_ids.next().unwrap()); - let upstream = StreamState { - id, - dir: Direction::Upstream, - finished: false, - }; + let upstream = StreamState::new(id, Direction::Upstream, seqno.wrapping_add(1)); let ev = MapiEvent::Incoming { id, @@ -94,12 +92,11 @@ impl TcpTracker { let Some(upstream) = self.streams.get(&flipped) else { return Ok(()); }; + + let seqno = tcp.sequence_number(); + let id = upstream.id; - let downstream = StreamState { - id, - dir: Direction::Downstream, - finished: false, - }; + let downstream = StreamState::new(id, Direction::Downstream, seqno.wrapping_add(1)); let ev = MapiEvent::Connected { id, @@ -124,20 +121,20 @@ impl TcpTracker { let id = stream.id; let direction = stream.dir; + let seqno = tcp.sequence_number(); let payload = tcp.payload(); - if !payload.is_empty() { - let ev = MapiEvent::Data { - id, - direction, - data: payload.into(), - }; - handler(ev)?; + let Some(payload) = stream.reorder(seqno, tcp.fin(), payload) else { + return Ok(()); + }; + + Self::emit_data(id, direction, payload, handler)?; + while let Some(payload) = stream.next_ready() { + Self::emit_data(id, direction, &payload, handler)?; } - if !tcp.fin() { + if !stream.finished { return Ok(()); } - stream.finished = true; let ev = MapiEvent::ShutdownRead { id, direction }; handler(ev)?; @@ -152,11 +149,75 @@ impl TcpTracker { Ok(()) } + + fn emit_data( + id: ConnectionId, + direction: Direction, + payload: &[u8], + handler: &mut Handler, + ) -> io::Result<()> { + if !payload.is_empty() { + let ev = MapiEvent::Data { + id, + direction, + data: payload.into(), + }; + handler(ev)?; + } + Ok(()) + } } #[derive(Debug)] struct StreamState { id: ConnectionId, dir: Direction, + waiting_for: u32, + waiting: HashMap, bool)>, finished: bool, } + +impl StreamState { + fn new(id: ConnectionId, dir: Direction, seqno: u32) -> Self { + StreamState { + id, + dir, + waiting_for: seqno, + waiting: Default::default(), + finished: false, + } + } + + fn reorder<'a>(&'a mut self, seqno: u32, fin: bool, payload: &'a [u8]) -> Option<&'a [u8]> { + if self.waiting_for == seqno { + return self.yield_payload(payload, fin); + } + + // Discard packets we've already seen. Be careful with wraparound. + // Example values: waiting_for = 0x30, seqno_1 = 0x31, seqno_2 = 0x2f. + // delta_1 = 0x01, delta_2 = 0xff. + // delta_1 as i32 = 1, delta_2 as i32 = -1 + let delta = seqno.wrapping_sub(self.waiting_for); + if (delta as i32) < 0 { + return None; + } + + self.waiting.insert(seqno, (payload.to_owned(), fin)); + None + } + + fn next_ready(&mut self) -> Option> { + if let Some((payload, fin)) = self.waiting.remove(&self.waiting_for) { + self.yield_payload(payload, fin) + } else { + None + } + } + + fn yield_payload>(&mut self, payload: T, fin: bool) -> Option { + self.finished |= fin; + let n = payload.as_ref().len() as u32; + self.waiting_for = self.waiting_for.wrapping_add(n); + Some(payload) + } +}