Skip to content

Commit 4c0b733

Browse files
committed
Collect timestamps
1 parent fbd7031 commit 4c0b733

File tree

6 files changed

+126
-42
lines changed

6 files changed

+126
-42
lines changed

src/event.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use std::{error::Error, fmt, io};
1+
use std::{
2+
error::Error,
3+
fmt, io,
4+
time::{Duration, SystemTime, UNIX_EPOCH},
5+
};
26

37
use smallvec::SmallVec;
48

@@ -261,3 +265,14 @@ impl<'a> ConnectionSink<'a> {
261265
.emit_event(MapiEvent::Oob(self.id(), direction, byte))
262266
}
263267
}
268+
269+
/// A timestamp represented as a [Duration] since the
270+
/// [UNIX_EPOCH][std::time::UNIX_EPOCH].
271+
pub struct Timestamp(pub Duration);
272+
273+
impl From<SystemTime> for Timestamp {
274+
fn from(t: SystemTime) -> Self {
275+
let duration = t.duration_since(UNIX_EPOCH).unwrap();
276+
Timestamp(duration)
277+
}
278+
}

src/main.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@ use std::fs::File;
1111
use std::panic::PanicInfo;
1212
use std::path::{Path, PathBuf};
1313
use std::process::ExitCode;
14+
use std::time::SystemTime;
1415
use std::{io, panic, process, thread};
1516

1617
use addr::MonetAddr;
1718
use anyhow::{bail, Context, Result as AResult};
1819
use argsplitter::{ArgError, ArgSplitter};
19-
use event::MapiEvent;
20+
use event::{MapiEvent, Timestamp};
2021
use pcap::Tracker;
2122

2223
use crate::{proxy::Proxy, render::Renderer};
@@ -134,14 +135,15 @@ fn run_proxy(
134135
) -> AResult<()> {
135136
let (send_events, receive_events) = std::sync::mpsc::sync_channel(500);
136137
let handler = move |event| {
137-
let _ = send_events.send(event);
138+
let timestamp = SystemTime::now().into();
139+
let _ = send_events.send((timestamp, event));
138140
};
139141
let mut proxy = Proxy::new(listen_addr, forward_addr, handler)?;
140142
install_ctrl_c_handler(proxy.get_shutdown_trigger())?;
141143
thread::spawn(move || proxy.run().unwrap());
142144

143-
while let Ok(ev) = receive_events.recv() {
144-
mapi_state.handle(&ev, renderer)?;
145+
while let Ok((ts, ev)) = receive_events.recv() {
146+
mapi_state.handle(&ts, &ev, renderer)?;
145147
}
146148
Ok(())
147149
}
@@ -160,7 +162,7 @@ fn run_pcap(path: &Path, mut mapi_state: mapi::State, renderer: &mut Renderer) -
160162
owned_file.as_mut().unwrap()
161163
};
162164

163-
let handler = |ev: MapiEvent| mapi_state.handle(&ev, renderer);
165+
let handler = |ts: &Timestamp, ev: MapiEvent| mapi_state.handle(ts, &ev, renderer);
164166
let mut tracker = Tracker::new(handler);
165167
pcap::parse_pcap_file(reader, &mut tracker)
166168
}

src/mapi/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66
};
77

88
use crate::{
9-
event::{ConnectionId, Direction, MapiEvent},
9+
event::{ConnectionId, Direction, MapiEvent, Timestamp},
1010
render::{Renderer, Style},
1111
Level,
1212
};
@@ -29,7 +29,12 @@ impl State {
2929
}
3030
}
3131

32-
pub fn handle(&mut self, event: &MapiEvent, renderer: &mut Renderer) -> io::Result<()> {
32+
pub fn handle(
33+
&mut self,
34+
_timestamp: &Timestamp,
35+
event: &MapiEvent,
36+
renderer: &mut Renderer,
37+
) -> io::Result<()> {
3338
match event {
3439
MapiEvent::BoundPort(port) => {
3540
renderer.message(None, None, format_args!("LISTEN on port {port}"))?;

src/pcap/mod.rs

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,28 @@ mod mybufread;
22
mod tcp;
33
mod tracker;
44

5-
use std::io;
5+
use std::{
6+
io,
7+
time::{Duration, SystemTime},
8+
};
69

710
use anyhow::{bail, Result as AResult};
811

912
use pcap_file::{
1013
pcap::PcapReader,
11-
pcapng::{Block, PcapNgReader},
14+
pcapng::{blocks::interface_description::InterfaceDescriptionOption, Block, PcapNgReader},
1215
DataLink,
1316
};
1417

18+
use crate::event::Timestamp;
19+
1520
use self::mybufread::MyBufReader;
1621
pub use self::tracker::Tracker;
1722

1823
/// Parse PCAP records from the reader and hand the packets to the Tracker. This
1924
/// function works with both the old-style PCAP and with PCAP-NG file formats.
25+
///
26+
/// See also https://www.ietf.org/archive/id/draft-tuexen-opsawg-pcapng-04.html
2027
pub fn parse_pcap_file(mut rd: impl io::Read, tracker: &mut Tracker) -> AResult<()> {
2128
// read ahead to inspect the file header
2229
let mut signature = [0u8; 4];
@@ -52,11 +59,12 @@ fn parse_legacy_pcap(rd: MyBufReader, tracker: &mut Tracker) -> AResult<()> {
5259

5360
while let Some(pkt) = pcap_reader.next_packet() {
5461
let pkt = pkt?;
62+
let timestamp = Timestamp(pkt.timestamp);
5563
if pkt.data.len() == header.snaplen as usize {
5664
bail!("truncated packet");
5765
}
5866

59-
process_packet(header.datalink, &pkt.data, tracker)?;
67+
process_packet(&timestamp, header.datalink, &pkt.data, tracker)?;
6068
}
6169

6270
Ok(())
@@ -71,22 +79,51 @@ fn parse_pcap_ng(rd: MyBufReader, tracker: &mut Tracker) -> AResult<()> {
7179
// This mutable holds the latest value we have seen.
7280
let mut linktype = None;
7381

82+
// Only used for legacy Block::Packet, completely untested
83+
let mut timestamp_resolution = Duration::from_micros(1);
84+
85+
let mut timestamp: Timestamp = SystemTime::now().into();
7486
while let Some(block) = pcapng_reader.next_block() {
7587
let data = match block? {
7688
Block::InterfaceDescription(iface) => {
7789
linktype = Some(iface.linktype);
90+
for opt in iface.options {
91+
// This is all completely untested
92+
if let InterfaceDescriptionOption::IfTsResol(reso) = opt {
93+
let base = if reso & 0x80 == 0 { 10u32 } else { 2 };
94+
let divisor = base.pow(reso as u32 & 0x7F);
95+
timestamp_resolution = Duration::from_secs(1) / divisor;
96+
}
97+
}
7898
continue;
7999
}
80-
Block::Packet(packet) => packet.data,
81-
Block::SimplePacket(packet) => packet.data,
82-
Block::EnhancedPacket(packet) => packet.data,
100+
Block::Packet(packet) => {
101+
// This is all completely untested
102+
let units = packet.timestamp;
103+
// Duration can be multiplied by u32, not by u64.
104+
let units_lo = (units & 0xFFFF_FFFF) as u32;
105+
let units_hi = (units >> 32) as u32;
106+
let duration_lo = timestamp_resolution * units_lo;
107+
let duration_hi = timestamp_resolution * units_hi;
108+
let duration = duration_hi * 0x1_0000 * 0x1_0000 + duration_lo;
109+
timestamp = Timestamp(duration);
110+
packet.data
111+
}
112+
Block::SimplePacket(packet) => {
113+
// has no timestamp, keep existing
114+
packet.data
115+
}
116+
Block::EnhancedPacket(packet) => {
117+
timestamp = Timestamp(packet.timestamp);
118+
packet.data
119+
}
83120
_ => continue,
84121
};
85122

86123
// Broken files might contain packets before the first interface description block.
87124
// Ignore them.
88125
if let Some(lt) = linktype {
89-
process_packet(lt, &data, tracker)?;
126+
process_packet(&timestamp, lt, &data, tracker)?;
90127
}
91128
}
92129

@@ -95,11 +132,16 @@ fn parse_pcap_ng(rd: MyBufReader, tracker: &mut Tracker) -> AResult<()> {
95132

96133
/// This function is called from both [parse_legacy_pcap] and [parse_pcap_ng]
97134
/// for each packet in the file.
98-
fn process_packet(linktype: DataLink, data: &[u8], tracker: &mut Tracker) -> AResult<()> {
135+
fn process_packet(
136+
timestamp: &Timestamp,
137+
linktype: DataLink,
138+
data: &[u8],
139+
tracker: &mut Tracker,
140+
) -> AResult<()> {
99141
// We expect to read ethernet frames but it's also possible for pcap files to
100142
// capture at the IP level. Right now we only support Ethernet.
101143
match linktype {
102-
DataLink::ETHERNET => tracker.process_ethernet(data),
144+
DataLink::ETHERNET => tracker.process_ethernet(timestamp, data),
103145
_ => bail!("pcap file contains packet of type {linktype:?}, this is not supported"),
104146
}
105147
}

src/pcap/tcp.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ use std::{
77

88
use etherparse::TcpSlice;
99

10-
use crate::event::{ConnectionId, Direction, MapiEvent};
10+
use crate::event::{ConnectionId, Direction, MapiEvent, Timestamp};
1111

12-
type Handler<'a> = dyn FnMut(MapiEvent) -> io::Result<()> + 'a;
12+
type Handler<'a> = dyn for<'t> FnMut(&'t Timestamp, MapiEvent) -> io::Result<()> + 'a;
1313

1414
/// TCP connection state is identified by (src_ip,src_port, dest_ip,dest_port) tuples.
1515
/// This struct represents those.
@@ -52,6 +52,7 @@ impl TcpTracker {
5252
/// Handle a TCP packet.
5353
pub fn handle(
5454
&mut self,
55+
timestamp: &Timestamp,
5556
src_addr: IpAddr,
5657
dest_addr: IpAddr,
5758
tcp: &TcpSlice,
@@ -63,13 +64,19 @@ impl TcpTracker {
6364
};
6465

6566
match (tcp.syn(), tcp.ack()) {
66-
(true, false) => self.handle_syn(key, tcp, handler),
67-
(true, true) => self.handle_syn_ack(key, tcp, handler),
68-
_ => self.handle_existing(key, tcp, handler),
67+
(true, false) => self.handle_syn(timestamp, key, tcp, handler),
68+
(true, true) => self.handle_syn_ack(timestamp, key, tcp, handler),
69+
_ => self.handle_existing(timestamp, key, tcp, handler),
6970
}
7071
}
7172

72-
fn handle_syn(&mut self, key: Key, tcp: &TcpSlice, handler: &mut Handler) -> io::Result<()> {
73+
fn handle_syn(
74+
&mut self,
75+
timestamp: &Timestamp,
76+
key: Key,
77+
tcp: &TcpSlice,
78+
handler: &mut Handler,
79+
) -> io::Result<()> {
7380
let flipped = key.flip();
7481
if self.streams.contains_key(&key) || self.streams.contains_key(&flipped) {
7582
return Ok(());
@@ -85,14 +92,15 @@ impl TcpTracker {
8592
local: key.dest.into(),
8693
peer: key.src.into(),
8794
};
88-
handler(ev)?;
95+
handler(timestamp, ev)?;
8996

9097
self.streams.insert(key, upstream);
9198
Ok(())
9299
}
93100

94101
fn handle_syn_ack(
95102
&mut self,
103+
timestamp: &Timestamp,
96104
key: Key,
97105
tcp: &TcpSlice,
98106
handler: &mut Handler,
@@ -111,14 +119,15 @@ impl TcpTracker {
111119
id,
112120
peer: key.src.into(),
113121
};
114-
handler(ev)?;
122+
handler(timestamp, ev)?;
115123

116124
self.streams.insert(key, downstream);
117125
Ok(())
118126
}
119127

120128
fn handle_existing(
121129
&mut self,
130+
timestamp: &Timestamp,
122131
key: Key,
123132
tcp: &TcpSlice,
124133
handler: &mut Handler,
@@ -139,13 +148,13 @@ impl TcpTracker {
139148
let Some(payload) = stream.reorder(seqno, tcp.fin(), payload) else {
140149
return Ok(());
141150
};
142-
Self::emit_data(id, direction, payload, handler)?;
151+
Self::emit_data(timestamp, id, direction, payload, handler)?;
143152

144153
// If stream.reorder above returned this packet, it means it was exactly
145154
// the packet we needed right now. Packets do not always arrive in-order
146155
// so it's possible that the next packet is already in our cache.
147156
while let Some(payload) = stream.next_ready() {
148-
Self::emit_data(id, direction, &payload, handler)?;
157+
Self::emit_data(timestamp, id, direction, &payload, handler)?;
149158
}
150159

151160
// Stream.finished is set by stream.reorder and stream.next_ready.
@@ -157,20 +166,21 @@ impl TcpTracker {
157166
// Report this and drop all state if the other direction has also finished.
158167

159168
let ev = MapiEvent::ShutdownRead { id, direction };
160-
handler(ev)?;
169+
handler(timestamp, ev)?;
161170

162171
let flipped = key.flip();
163172
if let Some(StreamState { finished: true, .. }) = self.streams.get(&flipped) {
164173
self.streams.remove(&key);
165174
self.streams.remove(&flipped);
166175
let ev = MapiEvent::End { id };
167-
handler(ev)?;
176+
handler(timestamp, ev)?;
168177
}
169178

170179
Ok(())
171180
}
172181

173182
fn emit_data(
183+
timestamp: &Timestamp,
174184
id: ConnectionId,
175185
direction: Direction,
176186
payload: &[u8],
@@ -182,7 +192,7 @@ impl TcpTracker {
182192
direction,
183193
data: payload.into(),
184194
};
185-
handler(ev)?;
195+
handler(timestamp, ev)?;
186196
}
187197
Ok(())
188198
}

0 commit comments

Comments
 (0)