Skip to content

Commit

Permalink
Distinguish read and deser errors in engine websocket
Browse files Browse the repository at this point in the history
Fixes #2639
  • Loading branch information
adamchalmers committed Jun 10, 2024
1 parent 541400f commit d7bef74
Showing 1 changed file with 34 additions and 8 deletions.
42 changes: 34 additions & 8 deletions src/wasm-lib/kcl/src/engine/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,49 @@ pub struct TcpRead {
stream: futures::stream::SplitStream<tokio_tungstenite::WebSocketStream<reqwest::Upgraded>>,
}

/// Occurs when client couldn't read from the WebSocket to the engine.
#[derive(Debug)]
pub enum WebSocketReadError {
/// Could not read a message due to WebSocket errors.
Read(tokio_tungstenite::tungstenite::Error),
/// WebSocket message didn't contain a valid message that the KCL Executor could parse.
Deser(anyhow::Error),
}

impl From<anyhow::Error> for WebSocketReadError {
fn from(e: anyhow::Error) -> Self {
Self::Deser(e)
}
}

impl TcpRead {
pub async fn read(&mut self) -> Result<WebSocketResponse> {
pub async fn read(&mut self) -> std::result::Result<WebSocketResponse, WebSocketReadError> {
let Some(msg) = self.stream.next().await else {
anyhow::bail!("Failed to read from websocket");
return Err(anyhow::anyhow!("Failed to read from WebSocket").into());
};
let msg = match msg {
Ok(msg) => msg,
Err(e) if matches!(e, tokio_tungstenite::tungstenite::Error::Protocol(_)) => {
return Err(WebSocketReadError::Read(e))
}
Err(e) => return Err(anyhow::anyhow!("Error reading from engine's WebSocket: {e}").into()),
};
let msg: WebSocketResponse = match msg? {
WsMsg::Text(text) => serde_json::from_str(&text)?,
WsMsg::Binary(bin) => bson::from_slice(&bin)?,
other => anyhow::bail!("Unexpected websocket message from server: {}", other),
let msg: WebSocketResponse = match msg {
WsMsg::Text(text) => serde_json::from_str(&text)
.map_err(anyhow::Error::from)
.map_err(WebSocketReadError::from)?,
WsMsg::Binary(bin) => bson::from_slice(&bin)
.map_err(anyhow::Error::from)
.map_err(WebSocketReadError::from)?,
other => return Err(anyhow::anyhow!("Unexpected WebSocket message from engine API: {other}").into()),
};
Ok(msg)
}
}

#[derive(Debug)]
pub struct TcpReadHandle {
handle: Arc<tokio::task::JoinHandle<Result<()>>>,
handle: Arc<tokio::task::JoinHandle<Result<(), WebSocketReadError>>>,
}

impl Drop for TcpReadHandle {
Expand Down Expand Up @@ -150,7 +176,7 @@ impl EngineConnection {
match tcp_read.read().await {
Ok(ws_resp) => {
for e in ws_resp.errors.iter().flatten() {
println!("got error message: {e}");
println!("got error message: {} {}", e.error_code, e.message);
}
if let Some(id) = ws_resp.request_id {
responses_clone.insert(id, ws_resp.clone());
Expand Down

0 comments on commit d7bef74

Please sign in to comment.