diff --git a/src/wasm-lib/kcl/src/engine/conn.rs b/src/wasm-lib/kcl/src/engine/conn.rs index 6d6aa8c62b..fc8cbe7f12 100644 --- a/src/wasm-lib/kcl/src/engine/conn.rs +++ b/src/wasm-lib/kcl/src/engine/conn.rs @@ -40,15 +40,41 @@ pub struct TcpRead { stream: futures::stream::SplitStream>, } +/// Occurs when client couldn't read from the WebSocket to the engine. +#[derive(Debug)] +pub enum WebSocketReadError { + /// Could not read a message due to WebSocket errors. + Read(tokio_tungstenite::tungstenite::Error), + /// WebSocket message didn't contain a valid message that the KCL Executor could parse. + Deser(anyhow::Error), +} + +impl From for WebSocketReadError { + fn from(e: anyhow::Error) -> Self { + Self::Deser(e) + } +} + impl TcpRead { - pub async fn read(&mut self) -> Result { + pub async fn read(&mut self) -> std::result::Result { let Some(msg) = self.stream.next().await else { - anyhow::bail!("Failed to read from websocket"); + return Err(anyhow::anyhow!("Failed to read from WebSocket").into()); + }; + let msg = match msg { + Ok(msg) => msg, + Err(e) if matches!(e, tokio_tungstenite::tungstenite::Error::Protocol(_)) => { + return Err(WebSocketReadError::Read(e)) + } + Err(e) => return Err(anyhow::anyhow!("Error reading from engine's WebSocket: {e}").into()), }; - let msg: WebSocketResponse = match msg? { - WsMsg::Text(text) => serde_json::from_str(&text)?, - WsMsg::Binary(bin) => bson::from_slice(&bin)?, - other => anyhow::bail!("Unexpected websocket message from server: {}", other), + let msg: WebSocketResponse = match msg { + WsMsg::Text(text) => serde_json::from_str(&text) + .map_err(anyhow::Error::from) + .map_err(WebSocketReadError::from)?, + WsMsg::Binary(bin) => bson::from_slice(&bin) + .map_err(anyhow::Error::from) + .map_err(WebSocketReadError::from)?, + other => return Err(anyhow::anyhow!("Unexpected WebSocket message from engine API: {other}").into()), }; Ok(msg) } @@ -56,7 +82,7 @@ impl TcpRead { #[derive(Debug)] pub struct TcpReadHandle { - handle: Arc>>, + handle: Arc>>, } impl Drop for TcpReadHandle { @@ -150,7 +176,7 @@ impl EngineConnection { match tcp_read.read().await { Ok(ws_resp) => { for e in ws_resp.errors.iter().flatten() { - println!("got error message: {e}"); + println!("got error message: {} {}", e.error_code, e.message); } if let Some(id) = ws_resp.request_id { responses_clone.insert(id, ws_resp.clone());