Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distinguish read and deser errors in engine websocket #2640

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 44 additions & 10 deletions src/wasm-lib/kcl/src/engine/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,54 @@ pub struct TcpRead {
stream: futures::stream::SplitStream<tokio_tungstenite::WebSocketStream<reqwest::Upgraded>>,
}

/// Occurs when client couldn't read from the WebSocket to the engine.
// #[derive(Debug)]
pub enum WebSocketReadError {
/// Could not read a message due to WebSocket errors.
Read(tokio_tungstenite::tungstenite::Error),
/// WebSocket message didn't contain a valid message that the KCL Executor could parse.
Deser(anyhow::Error),
}

impl From<anyhow::Error> for WebSocketReadError {
fn from(e: anyhow::Error) -> Self {
Self::Deser(e)
}
}

impl TcpRead {
pub async fn read(&mut self) -> Result<WebSocketResponse> {
pub async fn read(&mut self) -> std::result::Result<WebSocketResponse, WebSocketReadError> {
let Some(msg) = self.stream.next().await else {
anyhow::bail!("Failed to read from websocket");
return Err(anyhow::anyhow!("Failed to read from WebSocket").into());
};
let msg: WebSocketResponse = match msg? {
WsMsg::Text(text) => serde_json::from_str(&text)?,
WsMsg::Binary(bin) => bson::from_slice(&bin)?,
other => anyhow::bail!("Unexpected websocket message from server: {}", other),
let msg = match msg {
Ok(msg) => msg,
Err(e) if matches!(e, tokio_tungstenite::tungstenite::Error::Protocol(_)) => {
return Err(WebSocketReadError::Read(e))
}
Err(e) => return Err(anyhow::anyhow!("Error reading from engine's WebSocket: {e}").into()),
};
let msg: WebSocketResponse = match msg {
WsMsg::Text(text) => serde_json::from_str(&text)
.map_err(anyhow::Error::from)
.map_err(WebSocketReadError::from)?,
WsMsg::Binary(bin) => bson::from_slice(&bin)
.map_err(anyhow::Error::from)
.map_err(WebSocketReadError::from)?,
other => return Err(anyhow::anyhow!("Unexpected WebSocket message from engine API: {other}").into()),
};
Ok(msg)
}
}

#[derive(Debug)]
pub struct TcpReadHandle {
handle: Arc<tokio::task::JoinHandle<Result<()>>>,
handle: Arc<tokio::task::JoinHandle<Result<(), WebSocketReadError>>>,
}

impl std::fmt::Debug for TcpReadHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "TcpReadHandle")
}
}

impl Drop for TcpReadHandle {
Expand Down Expand Up @@ -150,14 +181,17 @@ impl EngineConnection {
match tcp_read.read().await {
Ok(ws_resp) => {
for e in ws_resp.errors.iter().flatten() {
println!("got error message: {e}");
println!("got error message: {} {}", e.error_code, e.message);
}
if let Some(id) = ws_resp.request_id {
responses_clone.insert(id, ws_resp.clone());
}
}
Err(e) => {
println!("got ws error: {:?}", e);
match &e {
WebSocketReadError::Read(e) => eprintln!("could not read from WS: {:?}", e),
WebSocketReadError::Deser(e) => eprintln!("could not deserialize msg from WS: {:?}", e),
}
*socket_health_tcp_read.lock().unwrap() = SocketHealth::Inactive;
return Err(e);
}
Expand Down
Loading