Skip to content

Commit

Permalink
Fixed whitespace and added more hearbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
CEbbinghaus committed Dec 22, 2024
1 parent d472baf commit 5520b54
Showing 1 changed file with 110 additions and 107 deletions.
217 changes: 110 additions & 107 deletions backend/src/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,120 +20,123 @@ const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);

#[get("/ws-test")]
async fn ws(req: HttpRequest, body: web::Payload, sender: web::Data<Sender<CardEvent>>) -> Result<HttpResponse, Error> {
let (response, session, msg_stream) = actix_ws::handle(&req, body)?;
let (response, session, msg_stream) = actix_ws::handle(&req, body)?;

actix_rt::spawn(async move {
handle_ws(session, msg_stream, sender.into_inner()).await;
});
actix_rt::spawn(async move {
handle_ws(session, msg_stream, sender.into_inner()).await;
});

Ok(response)
Ok(response)
}


/// Broadcast text & binary messages received from a client, respond to ping messages, and monitor
/// connection health to detect network issues and free up resources.
pub async fn handle_ws(
mut session: actix_ws::Session,
mut msg_stream: actix_ws::MessageStream,
channel: Arc<Sender<CardEvent>>,
mut session: actix_ws::Session,
mut msg_stream: actix_ws::MessageStream,
channel: Arc<Sender<CardEvent>>,
) {
info!("connected");

let mut last_heartbeat = Instant::now();
let mut interval = interval(HEARTBEAT_INTERVAL);

let mut reciever = channel.subscribe();

let reason = loop {
// waits for either `msg_stream` to receive a message from the client, the broadcast channel
// to send a message, or the heartbeat interval timer to tick, yielding the value of
// whichever one is ready first
select! {
broadcast_msg = reciever.recv() => {
let msg = match broadcast_msg {
Ok(msg) => msg,
Err(error::RecvError::Closed) => break None,
Err(error::RecvError::Lagged(_)) => continue,
};

let res = session.text(Event::new(msg)).await;

if let Err(err) = res {
error!("{err}");
break None;
}
}

// heartbeat interval ticked
_tick = interval.tick() => {
// if no heartbeat ping/pong received recently, close the connection
if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT {
info!(
"client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting"
);

break None;
}

// send heartbeat ping
let _ = session.ping(b"").await;
},

msg = msg_stream.next() => {
let msg = match msg {
// received message from WebSocket client
Some(Ok(msg)) => msg,

// client WebSocket stream error
Some(Err(err)) => {
error!("{err}");
break None;
}

// client WebSocket stream ended
None => break None
};

debug!("msg: {msg:?}");

match msg {
Message::Text(data) => {
let event = ParsedEvent::parse(&data);
info!(event = ?event, "Recieved Event");
let _ = channel.send(CardEvent::Updated);
// drop client's text messages
}

Message::Binary(_) => {
// drop client's binary messages
}

Message::Close(reason) => {
break reason;
}

Message::Ping(bytes) => {
last_heartbeat = Instant::now();
let _ = session.pong(&bytes).await;
}

Message::Pong(_) => {
last_heartbeat = Instant::now();
}

Message::Continuation(_) => {
warn!("no support for continuation frames");
}

// no-op; ignore
Message::Nop => {}
};
}
}
};

// attempt to close connection gracefully
let _ = session.close(reason).await;

info!("disconnected");
info!("connected");

let mut last_heartbeat = Instant::now();
let mut interval = interval(HEARTBEAT_INTERVAL);

let mut reciever = channel.subscribe();

let reason = loop {
// waits for either `msg_stream` to receive a message from the client, the broadcast channel
// to send a message, or the heartbeat interval timer to tick, yielding the value of
// whichever one is ready first
select! {
broadcast_msg = reciever.recv() => {
let msg = match broadcast_msg {
Ok(msg) => msg,
Err(error::RecvError::Closed) => break None,
Err(error::RecvError::Lagged(_)) => continue,
};

let res = session.text(Event::new(msg)).await;

if let Err(err) = res {
error!("{err}");
break None;
}
}

// heartbeat interval ticked
_tick = interval.tick() => {
// if no heartbeat ping/pong received recently, close the connection
if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT {
info!(
"client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting"
);

break None;
}

// send heartbeat ping
let _ = session.ping(b"").await;
},

msg = msg_stream.next() => {
let msg = match msg {
// received message from WebSocket client
Some(Ok(msg)) => msg,

// client WebSocket stream error
Some(Err(err)) => {
error!("{err}");
break None;
}

// client WebSocket stream ended
None => break None
};

debug!("msg: {msg:?}");

match msg {
Message::Text(data) => {
last_heartbeat = Instant::now();
let event = ParsedEvent::parse(&data);
info!(event = ?event, "received Event");
let _ = channel.send(CardEvent::Updated);
// drop client's text messages
}

Message::Binary(_) => {
last_heartbeat = Instant::now();
// drop client's binary messages
warn!("received binary message; dropping");
}

Message::Close(reason) => {
break reason;
}

Message::Ping(bytes) => {
last_heartbeat = Instant::now();
let _ = session.pong(&bytes).await;
}

Message::Pong(_) => {
last_heartbeat = Instant::now();
}

Message::Continuation(_) => {
warn!("no support for continuation frames");
}

// no-op; ignore
Message::Nop => {}
};
}
}
};

// attempt to close connection gracefully
let _ = session.close(reason).await;

info!("disconnected");
}

0 comments on commit 5520b54

Please sign in to comment.