Skip to content

Commit

Permalink
Replace tokio's TcpStream with async-std (#481)
Browse files Browse the repository at this point in the history
* async-std

* works, but still in progress

* working test withouth extra signals

* remove comments and re-add connector drop trait

* cleanup

* fix the failing test about current slot

* fix the failing test about current slot, by connecting to specifically different nodes

* update config files

* use a different account for testing

* fix rustfmt

---------

Co-authored-by: Gianfranco <[email protected]>
  • Loading branch information
b-yap and gianfra-t authored Jan 26, 2024
1 parent 326f2a5 commit f88ef24
Show file tree
Hide file tree
Showing 19 changed files with 296 additions and 185 deletions.
119 changes: 119 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions clients/stellar-relay-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ tokio = { version = "1.0", features = [
"sync", # to make channels available
"time" # for timeouts and sleep, when reconnecting
] }
async-std = { version = "1.12.0", features = ["attributes"] }

[features]
std = [
Expand Down
79 changes: 38 additions & 41 deletions clients/stellar-relay-lib/src/connection/connector/connector.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use async_std::net::TcpStream;
use std::{
fmt::{Debug, Formatter},
net::TcpStream,
sync::{Arc, Mutex},
time::Duration,
net::Shutdown,
};
use substrate_stellar_sdk::{
types::{AuthenticatedMessageV0, Curve25519Public, HmacSha256Mac, MessageType},
Expand Down Expand Up @@ -37,7 +36,7 @@ pub struct Connector {
flow_controller: FlowController,

/// for writing/reading xdr messages to/from Stellar Node.
pub(crate) tcp_stream: Arc<Mutex<TcpStream>>,
pub(crate) tcp_stream: TcpStream,
}

impl Debug for Connector {
Expand All @@ -53,10 +52,32 @@ impl Debug for Connector {
.field("receive_scp_messages", &self.receive_scp_messages)
.field("handshake_state", &self.handshake_state)
.field("flow_controller", &self.flow_controller)
.field(
"local_addr",
&self
.tcp_stream
.local_addr()
.map(|addr| addr.to_string())
.unwrap_or("cannot provide".to_string()),
)
.field(
"peer_addr",
&self
.tcp_stream
.peer_addr()
.map(|addr| addr.to_string())
.unwrap_or("cannot provide".to_string()),
)
.finish()
}
}

impl Drop for Connector {
fn drop(&mut self) {
self.stop();
}
}

impl Connector {
/// Verifies the AuthenticatedMessage, received from the Stellar Node
pub(super) fn verify_auth(
Expand Down Expand Up @@ -115,22 +136,17 @@ impl Connector {

/// returns a Connector and starts creating a connection to Stellar
pub async fn start(local_node: NodeInfo, conn_info: ConnectionInfo) -> Result<Self, Error> {
// Create the stream
let tcp_stream = TcpStream::connect(conn_info.address())
.await
.map_err(|e| Error::ConnectionFailed(e.to_string()))?;

let connection_auth = ConnectionAuth::new(
&local_node.network_id,
conn_info.keypair(),
conn_info.auth_cert_expiration,
);

// Create the stream
let tcp_stream = TcpStream::connect(conn_info.address())
.map_err(|e| Error::ConnectionFailed(e.to_string()))?;

if let Err(e) =
tcp_stream.set_read_timeout(Some(Duration::from_secs(conn_info.timeout_in_secs)))
{
log::warn!("start(): failed to set read timeout for the stream: {e:?}");
}

let mut connector = Connector {
local: LocalInfo::new(local_node),
remote_info: None,
Expand All @@ -142,14 +158,20 @@ impl Connector {
receive_scp_messages: conn_info.recv_scp_msgs,
handshake_state: HandshakeState::Connecting,
flow_controller: FlowController::default(),
tcp_stream: Arc::new(Mutex::new(tcp_stream)),
tcp_stream,
};

// To start the handshake, send a hello message to Stellar
connector.send_hello_message().await?;

Ok(connector)
}

pub fn stop(&mut self) {
if let Err(e) = self.tcp_stream.shutdown(Shutdown::Both) {
log::error!("stop(): failed to shutdown tcp stream: {}", e);
}
}
}

// getters setters
Expand Down Expand Up @@ -231,7 +253,6 @@ impl Connector {
mod test {
use crate::{connection::hmac::HMacKeys, node::RemoteInfo, StellarOverlayConfig};
use serial_test::serial;
use std::net::Shutdown;

use substrate_stellar_sdk::{
compound_types::LimitedString,
Expand Down Expand Up @@ -263,16 +284,6 @@ mod test {
new_auth_cert
}

impl Connector {
fn shutdown(&mut self) {
self.tcp_stream
.lock()
.unwrap()
.shutdown(Shutdown::Both)
.expect("should shutdown both read and write of stream");
}
}

async fn create_connector() -> (NodeInfo, ConnectionInfo, Connector) {
let cfg_file_path = "./resources/config/testnet/stellar_relay_config_sdftest1.json";
let secret_key_path = "./resources/secretkey/stellar_secretkey_testnet";
Expand All @@ -294,7 +305,7 @@ mod test {
#[tokio::test]
#[serial]
async fn create_new_connector_works() {
let (node_info, _, mut connector) = create_connector().await;
let (node_info, _, connector) = create_connector().await;

let connector_local_node = connector.local.node();

Expand All @@ -303,8 +314,6 @@ mod test {
assert_eq!(connector_local_node.overlay_min_version, node_info.overlay_min_version);
assert_eq!(connector_local_node.version_str, node_info.version_str);
assert_eq!(connector_local_node.network_id, node_info.network_id);

connector.shutdown();
}

#[tokio::test]
Expand All @@ -314,8 +323,6 @@ mod test {
assert_eq!(connector.local_sequence(), 0);
connector.increment_local_sequence();
assert_eq!(connector.local_sequence(), 1);

connector.shutdown();
}

#[tokio::test]
Expand All @@ -340,8 +347,6 @@ mod test {
connector.set_remote(RemoteInfo::new(&hello));

assert!(connector.remote().is_some());

connector.shutdown();
}

#[tokio::test]
Expand Down Expand Up @@ -370,8 +375,6 @@ mod test {
connector.increment_remote_sequence().unwrap();
connector.increment_remote_sequence().unwrap();
assert_eq!(connector.remote().unwrap().sequence(), 3);

connector.shutdown();
}

#[tokio::test]
Expand Down Expand Up @@ -408,8 +411,6 @@ mod test {
));
//assert
assert!(connector.hmac_keys().is_some());

connector.shutdown();
}

#[tokio::test]
Expand All @@ -426,8 +427,6 @@ mod test {

connector.handshake_completed();
assert!(connector.is_handshake_created());

connector.shutdown();
}

#[tokio::test]
Expand All @@ -437,7 +436,5 @@ mod test {

assert!(!connector.inner_check_to_send_more(MessageType::ScpMessage));
connector.enable_flow_controller(node_info.overlay_version, node_info.overlay_version);

connector.shutdown();
}
}
Loading

0 comments on commit f88ef24

Please sign in to comment.