From 160f7aff15758b9ffeea7aceefc42aa789165fec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9odore=20Pr=C3=A9vot?= Date: Sat, 2 Nov 2024 19:36:30 +0100 Subject: [PATCH] fix(engineio/socket): heartbeat delay causing ping burst (#392) --- crates/engineioxide/src/socket.rs | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/crates/engineioxide/src/socket.rs b/crates/engineioxide/src/socket.rs index 277ccef5..f8c0fdad 100644 --- a/crates/engineioxide/src/socket.rs +++ b/crates/engineioxide/src/socket.rs @@ -330,8 +330,6 @@ where .expect("Pong rx should be locked only once"); let instant = tokio::time::Instant::now(); - let mut interval_tick = tokio::time::interval(interval); - interval_tick.tick().await; // Sleep for an interval minus the time it took to get here tokio::time::sleep(interval.saturating_sub(Duration::from_millis( 15 + instant.elapsed().as_millis() as u64, @@ -339,19 +337,31 @@ where .await; #[cfg(feature = "tracing")] - tracing::debug!("[sid={}] heartbeat sender routine started", self.id); + tracing::debug!(sid = ?self.id, "heartbeat sender routine started"); + let mut interval_tick = tokio::time::interval(interval); + interval_tick.tick().await; + // Some clients send the pong packet in first. If that happens, we should consume it. + heartbeat_rx.try_recv().ok(); loop { - // Some clients send the pong packet in first. If that happens, we should consume it. - heartbeat_rx.try_recv().ok(); + #[cfg(feature = "tracing")] + tracing::trace!(sid = ?self.id, "emitting ping"); self.internal_tx .try_send(smallvec![Packet::Ping]) .map_err(|_| Error::HeartbeatTimeout)?; + + #[cfg(feature = "tracing")] + tracing::trace!(sid = ?self.id, "waiting for pong"); + tokio::time::timeout(timeout, heartbeat_rx.recv()) .await .map_err(|_| Error::HeartbeatTimeout)? .ok_or(Error::HeartbeatTimeout)?; + + #[cfg(feature = "tracing")] + tracing::trace!(sid = ?self.id, "pong received"); + interval_tick.tick().await; } } @@ -364,7 +374,7 @@ where .expect("Pong rx should be locked only once"); #[cfg(feature = "tracing")] - tracing::debug!("[sid={}] heartbeat receiver routine started", self.id); + tracing::debug!(sid = ?self.id, "heartbeat receiver routine started"); loop { tokio::time::timeout(interval + timeout, heartbeat_rx.recv()) @@ -373,7 +383,7 @@ where .ok_or(Error::HeartbeatTimeout)?; #[cfg(feature = "tracing")] - tracing::debug!("[sid={}] ping received, sending pong", self.id); + tracing::trace!(sid = ?self.id, "ping received, sending pong"); self.internal_tx .try_send(smallvec![Packet::Pong]) .map_err(|_| Error::HeartbeatTimeout)?;