Skip to content

Commit

Permalink
Merge pull request #295 from Rigidity/client-refactor
Browse files Browse the repository at this point in the history
Refactor client code
  • Loading branch information
arvidn authored Oct 28, 2023
2 parents 5bc7565 + 9c286f2 commit d232af9
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 52 deletions.
24 changes: 12 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion chia-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,8 @@ pub enum Error {
WebSocket(#[from] tungstenite::Error),

#[error("{0:?}")]
InvalidResponse(Option<Message>),
InvalidResponse(Message),

#[error("missing response")]
MissingResponse,
}
84 changes: 45 additions & 39 deletions chia-client/src/peer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::io::Cursor;
use std::{collections::HashMap, sync::Arc};

use chia_protocol::{
Expand All @@ -11,6 +10,7 @@ use futures_util::{SinkExt, StreamExt};
use tokio::sync::{broadcast, oneshot, Mutex};
use tokio::{net::TcpStream, task::JoinHandle};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tungstenite::Message as WsMessage;

use crate::utils::stream;
use crate::{Error, Result};
Expand Down Expand Up @@ -43,36 +43,9 @@ impl Peer {
let inbound_task = tokio::spawn(async move {
while let Some(message) = stream.next().await {
if let Ok(message) = message {
let bytes = message.into_data();
let cursor = &mut Cursor::new(bytes.as_slice());

// Parse the message.
let Ok(message) = Message::parse(cursor) else {
continue;
};

if let Some(id) = message.id {
// Send response through oneshot channel if present.
if let Some(request) = requests_clone.lock().await.remove(&id) {
request.send(message).ok();
}
} else {
match message.msg_type {
ProtocolMessageTypes::CoinStateUpdate => {
let cursor = &mut Cursor::new(message.data.as_ref());
if let Ok(body) = CoinStateUpdate::parse(cursor) {
event_sender.send(PeerEvent::CoinStateUpdate(body)).ok();
}
}
ProtocolMessageTypes::NewPeakWallet => {
let cursor = &mut Cursor::new(message.data.as_ref());
if let Ok(body) = NewPeakWallet::parse(cursor) {
event_sender.send(PeerEvent::NewPeakWallet(body)).ok();
}
}
_ => {}
}
}
Self::handle_inbound(message, &requests_clone, &event_sender)
.await
.ok();
}
}
});
Expand All @@ -86,6 +59,40 @@ impl Peer {
}
}

async fn handle_inbound(
message: WsMessage,
requests: &Requests,
event_sender: &broadcast::Sender<PeerEvent>,
) -> Result<()> {
// Parse the message.
let message = Message::from_bytes(message.into_data().as_ref())?;

if let Some(id) = message.id {
// Send response through oneshot channel if present.
if let Some(request) = requests.lock().await.remove(&id) {
request.send(message).ok();
}
return Ok(());
}

macro_rules! events {
( $( $event:ident ),+ $(,)? ) => {
match message.msg_type {
$( ProtocolMessageTypes::$event => {
event_sender
.send(PeerEvent::$event($event::from_bytes(message.data.as_ref())?))
.ok();
} )+
_ => {}
}
};
}

events!(CoinStateUpdate, NewPeakWallet);

Ok(())
}

pub async fn perform_handshake(&self, network_id: String, node_type: NodeType) -> Result<()> {
let handshake = Handshake {
network_id,
Expand Down Expand Up @@ -165,20 +172,19 @@ impl Peer {
// Remove the one shot channel.
self.requests.lock().await.remove(&message_id);

match response {
Ok(message) => {
// Handle the response, if present.
response
.map(|message| {
let expected_type = R::msg_type();
let found_type = message.msg_type;

if found_type != expected_type {
return Err(Error::InvalidResponse(Some(message)));
return Err(Error::InvalidResponse(message));
}

R::parse(&mut Cursor::new(message.data.as_ref()))
.map_err(|_| Error::InvalidResponse(Some(message)))
}
_ => Err(Error::InvalidResponse(None)),
}
R::from_bytes(message.data.as_ref()).or(Err(Error::InvalidResponse(message)))
})
.unwrap_or(Err(Error::MissingResponse))
}

pub fn receiver(&self) -> &broadcast::Receiver<PeerEvent> {
Expand Down

0 comments on commit d232af9

Please sign in to comment.