From 17c6020f855c0c586beb10b5d4d0b9f35e134a45 Mon Sep 17 00:00:00 2001 From: Rigidity Date: Fri, 27 Oct 2023 09:30:26 -0400 Subject: [PATCH 1/3] Refactor client --- Cargo.lock | 24 +++++------ chia-client/src/error.rs | 5 ++- chia-client/src/peer.rs | 87 +++++++++++++++++++++++----------------- 3 files changed, 66 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b35c72096..21353da74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -238,7 +238,7 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chia" -version = "0.2.9" +version = "0.2.12" dependencies = [ "chia-protocol", "chia-traits", @@ -258,7 +258,7 @@ dependencies = [ [[package]] name = "chia-bls" -version = "0.2.7" +version = "0.2.12" dependencies = [ "anyhow", "arbitrary", @@ -289,7 +289,7 @@ dependencies = [ [[package]] name = "chia-client" -version = "0.1.0" +version = "0.2.12" dependencies = [ "chia-protocol", "chia-traits", @@ -316,7 +316,7 @@ dependencies = [ [[package]] name = "chia-protocol" -version = "0.2.7" +version = "0.2.12" dependencies = [ "arbitrary", "chia-bls", @@ -345,7 +345,7 @@ dependencies = [ [[package]] name = "chia-tools" -version = "0.1.16" +version = "0.2.12" dependencies = [ "chia", "chia-protocol", @@ -364,7 +364,7 @@ dependencies = [ [[package]] name = "chia-traits" -version = "0.1.0" +version = "0.2.12" dependencies = [ "chia_py_streamable_macro", "chia_streamable_macro", @@ -377,7 +377,7 @@ dependencies = [ [[package]] name = "chia-wallet" -version = "0.1.0" +version = "0.2.12" dependencies = [ "clvm-utils", "clvmr", @@ -386,7 +386,7 @@ dependencies = [ [[package]] name = "chia_py_streamable_macro" -version = "0.1.4" +version = "0.2.12" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -396,7 +396,7 @@ dependencies = [ [[package]] name = "chia_streamable_macro" -version = "0.2.4" +version = "0.2.12" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -485,7 +485,7 @@ checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" [[package]] name = "clvm-derive" -version = "0.1.0" +version = "0.2.12" dependencies = [ "proc-macro2", "quote", @@ -494,7 +494,7 @@ dependencies = [ [[package]] name = "clvm-traits" -version = "0.1.0" +version = "0.2.12" dependencies = [ "clvm-derive", "clvmr", @@ -505,7 +505,7 @@ dependencies = [ [[package]] name = "clvm-utils" -version = "0.2.7" +version = "0.2.12" dependencies = [ "clvm-traits", "clvmr", diff --git a/chia-client/src/error.rs b/chia-client/src/error.rs index e5aa3e62b..05f3e5e69 100644 --- a/chia-client/src/error.rs +++ b/chia-client/src/error.rs @@ -13,5 +13,8 @@ pub enum Error { WebSocket(#[from] tungstenite::Error), #[error("{0:?}")] - InvalidResponse(Option), + InvalidResponse(Message), + + #[error("missing response")] + MissingResponse, } diff --git a/chia-client/src/peer.rs b/chia-client/src/peer.rs index 5681593ed..4d6369bc8 100644 --- a/chia-client/src/peer.rs +++ b/chia-client/src/peer.rs @@ -11,6 +11,7 @@ use futures_util::{SinkExt, StreamExt}; use tokio::sync::{broadcast, oneshot, Mutex}; use tokio::{net::TcpStream, task::JoinHandle}; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; +use tungstenite::Message as WsMessage; use crate::utils::stream; use crate::{Error, Result}; @@ -43,36 +44,9 @@ impl Peer { let inbound_task = tokio::spawn(async move { while let Some(message) = stream.next().await { if let Ok(message) = message { - let bytes = message.into_data(); - let cursor = &mut Cursor::new(bytes.as_slice()); - - // Parse the message. - let Ok(message) = Message::parse(cursor) else { - continue; - }; - - if let Some(id) = message.id { - // Send response through oneshot channel if present. - if let Some(request) = requests_clone.lock().await.remove(&id) { - request.send(message).ok(); - } - } else { - match message.msg_type { - ProtocolMessageTypes::CoinStateUpdate => { - let cursor = &mut Cursor::new(message.data.as_ref()); - if let Ok(body) = CoinStateUpdate::parse(cursor) { - event_sender.send(PeerEvent::CoinStateUpdate(body)).ok(); - } - } - ProtocolMessageTypes::NewPeakWallet => { - let cursor = &mut Cursor::new(message.data.as_ref()); - if let Ok(body) = NewPeakWallet::parse(cursor) { - event_sender.send(PeerEvent::NewPeakWallet(body)).ok(); - } - } - _ => {} - } - } + Self::handle_inbound(message, &requests_clone, &event_sender) + .await + .ok(); } } }); @@ -86,6 +60,45 @@ impl Peer { } } + async fn handle_inbound( + message: WsMessage, + requests: &Requests, + event_sender: &broadcast::Sender, + ) -> Result<()> { + let bytes = message.into_data(); + let cursor = &mut Cursor::new(bytes.as_slice()); + + // Parse the message. + let message = Message::parse(cursor)?; + + if let Some(id) = message.id { + // Send response through oneshot channel if present. + if let Some(request) = requests.lock().await.remove(&id) { + request.send(message).ok(); + } + return Ok(()); + } + + let cursor = &mut Cursor::new(message.data.as_ref()); + + macro_rules! events { + ( $( $event:ident ),+ $(,)? ) => { + match message.msg_type { + $( ProtocolMessageTypes::$event => { + event_sender + .send(PeerEvent::$event($event::parse(cursor)?)) + .ok(); + } )+ + _ => {} + } + }; + } + + events!(CoinStateUpdate, NewPeakWallet); + + Ok(()) + } + pub async fn perform_handshake(&self, network_id: String, node_type: NodeType) -> Result<()> { let handshake = Handshake { network_id, @@ -165,20 +178,20 @@ impl Peer { // Remove the one shot channel. self.requests.lock().await.remove(&message_id); - match response { - Ok(message) => { + // Handle the response, if present. + response + .map(|message| { let expected_type = R::msg_type(); let found_type = message.msg_type; if found_type != expected_type { - return Err(Error::InvalidResponse(Some(message))); + return Err(Error::InvalidResponse(message)); } R::parse(&mut Cursor::new(message.data.as_ref())) - .map_err(|_| Error::InvalidResponse(Some(message))) - } - _ => Err(Error::InvalidResponse(None)), - } + .or(Err(Error::InvalidResponse(message))) + }) + .unwrap_or(Err(Error::MissingResponse)) } pub fn receiver(&self) -> &broadcast::Receiver { From cc71a48e43f3fa1408696e80eb1f3016d46a9d09 Mon Sep 17 00:00:00 2001 From: Rigidity Date: Fri, 27 Oct 2023 09:31:49 -0400 Subject: [PATCH 2/3] Lock update --- Cargo.lock | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b35c72096..21353da74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -238,7 +238,7 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chia" -version = "0.2.9" +version = "0.2.12" dependencies = [ "chia-protocol", "chia-traits", @@ -258,7 +258,7 @@ dependencies = [ [[package]] name = "chia-bls" -version = "0.2.7" +version = "0.2.12" dependencies = [ "anyhow", "arbitrary", @@ -289,7 +289,7 @@ dependencies = [ [[package]] name = "chia-client" -version = "0.1.0" +version = "0.2.12" dependencies = [ "chia-protocol", "chia-traits", @@ -316,7 +316,7 @@ dependencies = [ [[package]] name = "chia-protocol" -version = "0.2.7" +version = "0.2.12" dependencies = [ "arbitrary", "chia-bls", @@ -345,7 +345,7 @@ dependencies = [ [[package]] name = "chia-tools" -version = "0.1.16" +version = "0.2.12" dependencies = [ "chia", "chia-protocol", @@ -364,7 +364,7 @@ dependencies = [ [[package]] name = "chia-traits" -version = "0.1.0" +version = "0.2.12" dependencies = [ "chia_py_streamable_macro", "chia_streamable_macro", @@ -377,7 +377,7 @@ dependencies = [ [[package]] name = "chia-wallet" -version = "0.1.0" +version = "0.2.12" dependencies = [ "clvm-utils", "clvmr", @@ -386,7 +386,7 @@ dependencies = [ [[package]] name = "chia_py_streamable_macro" -version = "0.1.4" +version = "0.2.12" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -396,7 +396,7 @@ dependencies = [ [[package]] name = "chia_streamable_macro" -version = "0.2.4" +version = "0.2.12" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -485,7 +485,7 @@ checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" [[package]] name = "clvm-derive" -version = "0.1.0" +version = "0.2.12" dependencies = [ "proc-macro2", "quote", @@ -494,7 +494,7 @@ dependencies = [ [[package]] name = "clvm-traits" -version = "0.1.0" +version = "0.2.12" dependencies = [ "clvm-derive", "clvmr", @@ -505,7 +505,7 @@ dependencies = [ [[package]] name = "clvm-utils" -version = "0.2.7" +version = "0.2.12" dependencies = [ "clvm-traits", "clvmr", From 9c286f297554c33562cefbf8b5744f54f79cd673 Mon Sep 17 00:00:00 2001 From: Rigidity Date: Fri, 27 Oct 2023 10:40:14 -0400 Subject: [PATCH 3/3] Use from_bytes --- chia-client/src/peer.rs | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/chia-client/src/peer.rs b/chia-client/src/peer.rs index 4d6369bc8..d90935238 100644 --- a/chia-client/src/peer.rs +++ b/chia-client/src/peer.rs @@ -1,4 +1,3 @@ -use std::io::Cursor; use std::{collections::HashMap, sync::Arc}; use chia_protocol::{ @@ -65,11 +64,8 @@ impl Peer { requests: &Requests, event_sender: &broadcast::Sender, ) -> Result<()> { - let bytes = message.into_data(); - let cursor = &mut Cursor::new(bytes.as_slice()); - // Parse the message. - let message = Message::parse(cursor)?; + let message = Message::from_bytes(message.into_data().as_ref())?; if let Some(id) = message.id { // Send response through oneshot channel if present. @@ -79,14 +75,12 @@ impl Peer { return Ok(()); } - let cursor = &mut Cursor::new(message.data.as_ref()); - macro_rules! events { ( $( $event:ident ),+ $(,)? ) => { match message.msg_type { $( ProtocolMessageTypes::$event => { event_sender - .send(PeerEvent::$event($event::parse(cursor)?)) + .send(PeerEvent::$event($event::from_bytes(message.data.as_ref())?)) .ok(); } )+ _ => {} @@ -188,8 +182,7 @@ impl Peer { return Err(Error::InvalidResponse(message)); } - R::parse(&mut Cursor::new(message.data.as_ref())) - .or(Err(Error::InvalidResponse(message))) + R::from_bytes(message.data.as_ref()).or(Err(Error::InvalidResponse(message))) }) .unwrap_or(Err(Error::MissingResponse)) }