From 45fa9196db102471d8ae7d00540e4c4e53373e89 Mon Sep 17 00:00:00 2001 From: Cameron Garnham Date: Thu, 8 Aug 2024 11:05:56 +0200 Subject: [PATCH] more test work --- packages/dht/Cargo.toml | 3 +- packages/dht/examples/debug.rs | 31 +-- packages/dht/src/builder.rs | 9 +- packages/dht/src/routing/node.rs | 2 +- packages/dht/src/worker/bootstrap.rs | 17 +- packages/dht/src/worker/handler.rs | 85 +++--- packages/dht/src/worker/lookup.rs | 13 +- packages/dht/src/worker/messenger.rs | 9 +- packages/dht/src/worker/refresh.rs | 7 +- packages/disk/Cargo.toml | 3 +- packages/disk/examples/add_torrent.rs | 27 +- packages/disk/src/disk/manager.rs | 88 ++++--- packages/disk/src/disk/tasks/context.rs | 4 +- packages/disk/src/disk/tasks/mod.rs | 19 +- packages/disk/tests/add_torrent.rs | 22 +- packages/disk/tests/common/mod.rs | 76 +++--- packages/disk/tests/complete_torrent.rs | 244 ++++++++---------- .../tests/disk_manager_send_backpressure.rs | 39 ++- packages/disk/tests/load_block.rs | 9 +- packages/disk/tests/process_block.rs | 22 +- packages/disk/tests/remove_torrent.rs | 28 +- packages/disk/tests/resume_torrent.rs | 43 +-- packages/magnet/src/lib.rs | 2 +- packages/peer/Cargo.toml | 4 + packages/peer/src/manager/mod.rs | 3 +- packages/peer/src/manager/peer_manager.rs | 74 +----- .../peer/src/manager/peer_manager_sink.rs | 5 - packages/peer/src/message/mod.rs | 102 ++++++-- packages/peer/src/message/null.rs | 1 + packages/peer/src/message/prot_ext/mod.rs | 5 +- packages/peer/src/protocol/extension.rs | 4 +- packages/peer/src/protocol/wire.rs | 4 +- .../peer/tests/common/connected_channel.rs | 79 ++++++ packages/peer/tests/common/mod.rs | 160 +++++++----- .../tests/peer_manager_send_backpressure.rs | 61 ++--- packages/select/Cargo.toml | 2 +- packages/select/src/discovery/ut_metadata.rs | 5 +- packages/util/src/sha/mod.rs | 11 + packages/utracker/src/client/mod.rs | 2 +- 39 files changed, 754 insertions(+), 570 deletions(-) create mode 100644 packages/peer/tests/common/connected_channel.rs diff --git a/packages/dht/Cargo.toml b/packages/dht/Cargo.toml index b6d34a6f9..1b6da9241 100644 --- a/packages/dht/Cargo.toml +++ b/packages/dht/Cargo.toml @@ -21,9 +21,10 @@ handshake = { path = "../handshake" } util = { path = "../util" } crc = "3" -log = "0" futures = "0" tokio = { version = "1", features = ["full"] } rand = "0" chrono = "0" thiserror = "1" +tracing = "0" +tracing-subscriber = "0" diff --git a/packages/dht/examples/debug.rs b/packages/dht/examples/debug.rs index 84e806244..2fbf58ee3 100644 --- a/packages/dht/examples/debug.rs +++ b/packages/dht/examples/debug.rs @@ -1,28 +1,16 @@ use std::collections::HashSet; use std::io::Read; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; +use std::sync::Once; use dht::handshaker_trait::HandshakerTrait; use dht::{DhtBuilder, Router}; use futures::future::BoxFuture; use futures::StreamExt; +use tracing::level_filters::LevelFilter; use util::bt::{InfoHash, PeerId}; -struct SimpleLogger; - -impl log::Log for SimpleLogger { - fn enabled(&self, metadata: &log::Metadata<'_>) -> bool { - metadata.level() <= log::Level::Info - } - - fn log(&self, record: &log::Record<'_>) { - if self.enabled(record.metadata()) { - println!("{} - {}", record.level(), record.args()); - } - } - - fn flush(&self) {} -} +static INIT: Once = Once::new(); struct SimpleHandshaker { filter: HashSet, @@ -63,10 +51,19 @@ impl HandshakerTrait for SimpleHandshaker { fn metadata(&mut self, (): Self::MetadataEnvelope) {} } +fn tracing_stdout_init(filter: LevelFilter) { + let builder = tracing_subscriber::fmt().with_max_level(filter).with_ansi(true); + + builder.pretty().with_file(true).init(); + + tracing::info!("Logging initialized"); +} + #[tokio::main] async fn main() { - log::set_logger(&SimpleLogger).unwrap(); - log::set_max_level(log::LevelFilter::max()); + INIT.call_once(|| { + tracing_stdout_init(LevelFilter::INFO); + }); let hash = InfoHash::from_bytes(b"My Unique Info Hash"); diff --git a/packages/dht/src/builder.rs b/packages/dht/src/builder.rs index 4826e2784..6e043fd6a 100644 --- a/packages/dht/src/builder.rs +++ b/packages/dht/src/builder.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use futures::channel::mpsc::{self, Receiver, Sender}; use futures::SinkExt as _; -use log::warn; use tokio::net::UdpSocket; use util::bt::InfoHash; use util::net; @@ -49,7 +48,7 @@ impl MainlineDht { .await .is_err() { - warn!("bip_dt: MainlineDht failed to send a start bootstrap message..."); + tracing::warn!("bip_dt: MainlineDht failed to send a start bootstrap message..."); } Ok(MainlineDht { main_task_sender }) @@ -71,7 +70,7 @@ impl MainlineDht { .await .is_err() { - warn!("bip_dht: MainlineDht failed to send a start lookup message..."); + tracing::warn!("bip_dht: MainlineDht failed to send a start lookup message..."); } } @@ -84,7 +83,7 @@ impl MainlineDht { let (send, recv) = mpsc::channel(1); if let Err(e) = self.main_task_sender.clone().send(OneshotTask::RegisterSender(send)).await { - warn!("bip_dht: MainlineDht failed to send a register sender message..., {e}"); + tracing::warn!("bip_dht: MainlineDht failed to send a register sender message..., {e}"); // TODO: Should we push a Shutdown event through the sender here? We would need // to know the cause or create a new cause for this specific scenario since the // client could have been lazy and wasn't monitoring this until after it shutdown! @@ -102,7 +101,7 @@ impl Drop for MainlineDht { .try_send(OneshotTask::Shutdown(ShutdownCause::ClientInitiated)) .is_err() { - warn!( + tracing::warn!( "bip_dht: MainlineDht failed to send a shutdown message (may have already been \ shutdown)..." ); diff --git a/packages/dht/src/routing/node.rs b/packages/dht/src/routing/node.rs index da5cddc52..b5030abf8 100644 --- a/packages/dht/src/routing/node.rs +++ b/packages/dht/src/routing/node.rs @@ -89,7 +89,7 @@ impl Node { /// Record that we sent the node a request. pub fn local_request(&self) { if self.status() != NodeStatus::Good { - let num_requests = self.refresh_requests.fetch_add(1, Ordering::SeqCst); + let num_requests = self.refresh_requests.fetch_add(1, Ordering::SeqCst) + 1; } } diff --git a/packages/dht/src/worker/bootstrap.rs b/packages/dht/src/worker/bootstrap.rs index fe0f4115d..0493f33e6 100644 --- a/packages/dht/src/worker/bootstrap.rs +++ b/packages/dht/src/worker/bootstrap.rs @@ -6,7 +6,6 @@ use std::sync::{Arc, Mutex, RwLock}; use futures::channel::mpsc::Sender; use futures::future::BoxFuture; use futures::{FutureExt as _, SinkExt as _}; -use log::{error, info, warn}; use tokio::time::{sleep, Duration}; use util::bt::{self, NodeId}; @@ -83,7 +82,7 @@ impl TableBootstrap { .await .is_err() { - error!("bip_dht: Failed to send scheduled task check for bootstrap timeout"); + tracing::error!("bip_dht: Failed to send scheduled task check for bootstrap timeout"); } }); @@ -97,7 +96,7 @@ impl TableBootstrap { // Ping all initial routers and nodes for addr in self.starting_routers.iter().chain(self.starting_nodes.iter()) { if out.send((find_node_msg.clone(), *addr)).await.is_err() { - error!("bip_dht: Failed to send bootstrap message to router through channel..."); + tracing::error!("bip_dht: Failed to send bootstrap message to router through channel..."); return BootstrapStatus::Failed; } } @@ -123,7 +122,7 @@ impl TableBootstrap { let _timeout = if let Some(t) = self.active_messages.lock().unwrap().get(&trans_id) { *t } else { - warn!( + tracing::warn!( "bip_dht: Received expired/unsolicited node response for an active table \ bootstrap..." ); @@ -157,7 +156,7 @@ impl TableBootstrap { H: HandshakerTrait + 'static, { if self.active_messages.lock().unwrap().remove(&trans_id).is_none() { - warn!( + tracing::warn!( "bip_dht: Received expired/unsolicited node timeout for an active table \ bootstrap..." ); @@ -276,7 +275,7 @@ impl TableBootstrap { { let bootstrap_bucket = self.curr_bootstrap_bucket.load(Ordering::Relaxed); - info!("bip_dht: bootstrap::send_bootstrap_requests {}", bootstrap_bucket); + tracing::info!("bip_dht: bootstrap::send_bootstrap_requests {}", bootstrap_bucket); let mut messages_sent = 0; @@ -290,7 +289,7 @@ impl TableBootstrap { // Send the message to the node if out.send((find_node_msg, node.addr())).await.is_err() { - error!("bip_dht: Could not send a bootstrap message through the channel..."); + tracing::error!("bip_dht: Could not send a bootstrap message through the channel..."); return BootstrapStatus::Failed; } @@ -311,12 +310,12 @@ impl TableBootstrap { .await .is_err() { - error!("bip_dht: Failed to send scheduled task check for bootstrap timeout"); + tracing::error!("bip_dht: Failed to send scheduled task check for bootstrap timeout"); } }); } - let bootstrap_bucket = self.curr_bootstrap_bucket.fetch_add(1, Ordering::Relaxed); + let bootstrap_bucket = self.curr_bootstrap_bucket.fetch_add(1, Ordering::AcqRel) + 1; if (bootstrap_bucket) == table::MAX_BUCKETS { BootstrapStatus::Completed diff --git a/packages/dht/src/worker/handler.rs b/packages/dht/src/worker/handler.rs index 1a0f45e07..3dda90f6f 100644 --- a/packages/dht/src/worker/handler.rs +++ b/packages/dht/src/worker/handler.rs @@ -8,7 +8,6 @@ use bencode::{ben_bytes, BDecodeOpt, BencodeMut, BencodeRef}; use futures::channel::mpsc::{self, Sender}; use futures::future::BoxFuture; use futures::{FutureExt, SinkExt, StreamExt as _}; -use log::{error, info, log_enabled, warn}; use tokio::net::UdpSocket; use util::bt::InfoHash; use util::convert; @@ -85,10 +84,10 @@ where // when it processes the message and tries to pass it to us, it will see that our channel // is closed and know that it should shut down. The outgoing messenger will shut itself down. if kill_sock.send_to(&b"0"[..], kill_addr).await.is_err() { - error!("bip_dht: Failed to send a wake up message to the incoming channel..."); + tracing::error!("bip_dht: Failed to send a wake up message to the incoming channel..."); } - info!("bip_dht: DhtHandler gracefully shut down, exiting thread..."); + tracing::info!("bip_dht: DhtHandler gracefully shut down, exiting thread..."); }); main_task_sender @@ -201,7 +200,7 @@ where async fn handle_incoming(&self, buffer: &[u8], addr: SocketAddr) { // Parse the buffer as a bencoded message let Ok(bencode) = BencodeRef::decode(buffer, BDecodeOpt::default()) else { - warn!("bip_dht: Received invalid bencode data..."); + tracing::warn!("bip_dht: Received invalid bencode data..."); return; }; @@ -237,7 +236,7 @@ where // Process the given message match message { Ok(MessageType::Request(RequestType::Ping(p))) => { - info!("bip_dht: Received a PingRequest..."); + tracing::info!("bip_dht: Received a PingRequest..."); let node = Node::as_good(p.node_id(), addr); let ping_rsp = { @@ -258,12 +257,12 @@ where let sent = out.send((ping_msg, addr)).await; if sent.is_err() { - error!("bip_dht: Failed to send a ping response on the out channel..."); + tracing::error!("bip_dht: Failed to send a ping response on the out channel..."); self.handle_shutdown(ShutdownCause::Unspecified); } } Ok(MessageType::Request(RequestType::FindNode(f))) => { - info!("bip_dht: Received a FindNodeRequest..."); + tracing::info!("bip_dht: Received a FindNodeRequest..."); let node = Node::as_good(f.node_id(), addr); let find_node_msg = { @@ -286,12 +285,12 @@ where }; if self.out_channel.clone().send((find_node_msg, addr)).await.is_err() { - error!("bip_dht: Failed to send a find node response on the out channel..."); + tracing::error!("bip_dht: Failed to send a find node response on the out channel..."); self.handle_shutdown(ShutdownCause::Unspecified); } } Ok(MessageType::Request(RequestType::GetPeers(g))) => { - info!("bip_dht: Received a GetPeersRequest..."); + tracing::info!("bip_dht: Received a GetPeersRequest..."); let node = Node::as_good(g.node_id(), addr); let get_peers_msg = { @@ -317,7 +316,7 @@ where } } SocketAddr::V6(_) => { - error!("AnnounceStorage contained an IPv6 Address..."); + tracing::error!("AnnounceStorage contained an IPv6 Address..."); return; } }; @@ -363,12 +362,12 @@ where }; if self.out_channel.clone().send((get_peers_msg, addr)).await.is_err() { - error!("bip_dht: Failed to send a get peers response on the out channel..."); + tracing::error!("bip_dht: Failed to send a get peers response on the out channel..."); self.handle_shutdown(ShutdownCause::Unspecified); } } Ok(MessageType::Request(RequestType::AnnouncePeer(a))) => { - info!("bip_dht: Received an AnnouncePeerRequest..."); + tracing::info!("bip_dht: Received an AnnouncePeerRequest..."); let node = Node::as_good(a.node_id(), addr); let response_msg = { @@ -399,7 +398,7 @@ where // Resolve type of response we are going to send if !is_valid { // Node gave us an invalid token - warn!("bip_dht: Remote node sent us an invalid token for an AnnounceRequest..."); + tracing::warn!("bip_dht: Remote node sent us an invalid token for an AnnounceRequest..."); ErrorMessage::new( a.transaction_id().to_vec(), ErrorCode::ProtocolError, @@ -412,7 +411,7 @@ where } else { // Node unsuccessfully stored the value with us, send them an error message // TODO: Spec doesn't actually say what error message to send, or even if we should send one... - warn!( + tracing::warn!( "bip_dht: AnnounceStorage failed to store contact information because it \ is full..." ); @@ -426,12 +425,12 @@ where }; if self.out_channel.clone().send((response_msg, addr)).await.is_err() { - error!("bip_dht: Failed to send an announce peer response on the out channel..."); + tracing::error!("bip_dht: Failed to send an announce peer response on the out channel..."); self.handle_shutdown(ShutdownCause::Unspecified); } } Ok(MessageType::Response(ResponseType::FindNode(f))) => { - info!("bip_dht: Received a FindNodeResponse..."); + tracing::info!("bip_dht: Received a FindNodeResponse..."); let trans_id = TransactionID::from_bytes(f.transaction_id()).unwrap(); let node = Node::as_good(f.node_id(), addr); @@ -460,11 +459,11 @@ where Some((bootstrap, attempts)) } Some(TableAction::Lookup(_)) => { - error!("bip_dht: Resolved a FindNodeResponse ActionID to a TableLookup..."); + tracing::error!("bip_dht: Resolved a FindNodeResponse ActionID to a TableLookup..."); None } None => { - error!( + tracing::error!( "bip_dht: Resolved a TransactionID to a FindNodeResponse but no \ action found..." ); @@ -519,7 +518,7 @@ where let routing_table = self.routing_table.read().unwrap(); - if log_enabled!(log::Level::Info) { + if tracing::enabled!(tracing::Level::INFO) { let mut total = 0; for (index, bucket) in routing_table.buckets().enumerate() { @@ -539,7 +538,7 @@ where } } Ok(MessageType::Response(ResponseType::GetPeers(g))) => { - info!("bip_dht: Received a GetPeersResponse..."); + tracing::info!("bip_dht: Received a GetPeersResponse..."); let trans_id = TransactionID::from_bytes(g.transaction_id()).unwrap(); let node = Node::as_good(g.node_id(), addr); @@ -555,21 +554,21 @@ where match table_action { Some(TableAction::Lookup(lookup)) => Some(lookup), Some(TableAction::Refresh(_)) => { - error!( + tracing::error!( "bip_dht: Resolved a GetPeersResponse ActionID to a \ TableRefresh..." ); None } Some(TableAction::Bootstrap(_, _)) => { - error!( + tracing::error!( "bip_dht: Resolved a GetPeersResponse ActionID to a \ TableBootstrap..." ); None } None => { - error!( + tracing::error!( "bip_dht: Resolved a TransactionID to a GetPeersResponse but no \ action found..." ); @@ -609,20 +608,20 @@ where } } Ok(MessageType::Response(ResponseType::Ping(_))) => { - info!("bip_dht: Received a PingResponse..."); + tracing::info!("bip_dht: Received a PingResponse..."); // Yeah...we should never be getting this type of response (we never use this message) } Ok(MessageType::Response(ResponseType::AnnouncePeer(_))) => { - info!("bip_dht: Received an AnnouncePeerResponse..."); + tracing::info!("bip_dht: Received an AnnouncePeerResponse..."); } Ok(MessageType::Error(e)) => { - info!("bip_dht: Received an ErrorMessage..."); + tracing::info!("bip_dht: Received an ErrorMessage..."); - warn!("bip_dht: KRPC error message from {:?}: {:?}", addr, e); + tracing::warn!("bip_dht: KRPC error message from {:?}: {:?}", addr, e); } Err(e) => { - warn!("bip_dht: Error parsing KRPC message: {:?}", e); + tracing::warn!("bip_dht: Error parsing KRPC message: {:?}", e); } } } @@ -769,21 +768,21 @@ where .await, ), Some(TableAction::Lookup(_)) => { - error!( + tracing::error!( "bip_dht: Resolved a TransactionID to a check table refresh but TableLookup \ found..." ); None } Some(TableAction::Bootstrap(_, _)) => { - error!( + tracing::error!( "bip_dht: Resolved a TransactionID to a check table refresh but \ TableBootstrap found..." ); None } None => { - error!( + tracing::error!( "bip_dht: Resolved a TransactionID to a check table refresh but no action \ found..." ); @@ -815,21 +814,21 @@ where attempts, )), Some(TableAction::Lookup(_)) => { - error!( + tracing::error!( "bip_dht: Resolved a TransactionID to a check table bootstrap but \ TableLookup found..." ); None } Some(TableAction::Refresh(_)) => { - error!( + tracing::error!( "bip_dht: Resolved a TransactionID to a check table bootstrap but \ TableRefresh found..." ); None } None => { - error!( + tracing::error!( "bip_dht: Resolved a TransactionID to a check table bootstrap but no \ action found..." ); @@ -885,21 +884,21 @@ where lookup.info_hash(), )), Some(TableAction::Bootstrap(_, _)) => { - error!( + tracing::error!( "bip_dht: Resolved a TransactionID to a check table lookup but TableBootstrap \ found..." ); None } Some(TableAction::Refresh(_)) => { - error!( + tracing::error!( "bip_dht: Resolved a TransactionID to a check table lookup but TableRefresh \ found..." ); None } None => { - error!( + tracing::error!( "bip_dht: Resolved a TransactionID to a check table lookup but no action \ found..." ); @@ -939,21 +938,21 @@ where )) } Some(TableAction::Bootstrap(_, _)) => { - error!( + tracing::error!( "bip_dht: Resolved a TransactionID to a check table lookup but TableBootstrap \ found..." ); None } Some(TableAction::Refresh(_)) => { - error!( + tracing::error!( "bip_dht: Resolved a TransactionID to a check table lookup but TableRefresh \ found..." ); None } None => { - error!( + tracing::error!( "bip_dht: Resolved a TransactionID to a check table lookup but no action \ found..." ); @@ -1032,9 +1031,9 @@ fn attempt_rebootstrap( ) -> BoxFuture<'static, Option> { async move { // Increment the bootstrap counter - let attempt = attempts.fetch_add(1, Ordering::Acquire); + let attempt = attempts.fetch_add(1, Ordering::AcqRel) + 1; - warn!("bip_dht: Bootstrap attempt {} failed, attempting a rebootstrap...", attempt); + tracing::warn!("bip_dht: Bootstrap attempt {} failed, attempting a rebootstrap...", attempt); // Check if we reached the maximum bootstrap attempts if attempt >= MAX_BOOTSTRAP_ATTEMPTS { @@ -1079,7 +1078,7 @@ fn attempt_rebootstrap( /// Shut down the event loop by sending it a shutdown message with the given cause. async fn shutdown_event_loop(mut main_task_sender: Sender, cause: ShutdownCause) { if main_task_sender.send(OneshotTask::Shutdown(cause)).await.is_err() { - error!("bip_dht: Failed to send a shutdown message to the EventLoop..."); + tracing::error!("bip_dht: Failed to send a shutdown message to the EventLoop..."); } } diff --git a/packages/dht/src/worker/lookup.rs b/packages/dht/src/worker/lookup.rs index b95018690..163737660 100644 --- a/packages/dht/src/worker/lookup.rs +++ b/packages/dht/src/worker/lookup.rs @@ -8,7 +8,6 @@ use bencode::BRefAccess; use futures::channel::mpsc::Sender; use futures::future::BoxFuture; use futures::{FutureExt, SinkExt as _}; -use log::{error, warn}; use tokio::time::{sleep, Duration, Instant}; use util::bt::{self, InfoHash, NodeId}; use util::net; @@ -134,7 +133,7 @@ impl TableLookup { B::BType: PartialEq + Eq + core::hash::Hash + Debug, { let Some((dist_to_beat, _)) = self.active_lookups.lock().unwrap().remove(&trans_id) else { - warn!( + tracing::warn!( "bip_dht: Received expired/unsolicited node response for an active table \ lookup..." ); @@ -238,7 +237,7 @@ impl TableLookup { scheduled_task_sender: Sender, ) -> LookupStatus { if self.active_lookups.lock().unwrap().remove(&trans_id).is_none() { - warn!( + tracing::warn!( "bip_dht: Received expired/unsolicited node timeout for an active table \ lookup..." ); @@ -296,7 +295,7 @@ impl TableLookup { for (node, announce_peer_msg) in node_announces { if out.send((announce_peer_msg, node.addr())).await.is_err() { - error!( + tracing::error!( "bip_dht: TableLookup announce request failed to send through the out \ channel..." ); @@ -350,7 +349,7 @@ impl TableLookup { let get_peers_msg = GetPeersRequest::new(trans_id.as_ref(), self.table_id, self.target_id).encode(); if out.send((get_peers_msg, node.addr())).await.is_err() { - error!("bip_dht: Could not send a lookup message through the channel..."); + tracing::error!("bip_dht: Could not send a lookup message through the channel..."); return LookupStatus::Failed; } @@ -373,7 +372,7 @@ impl TableLookup { .await .is_err() { - error!("bip_dht: Failed to send scheduled task check for lookup timeout"); + tracing::error!("bip_dht: Failed to send scheduled task check for lookup timeout"); } }); } @@ -416,7 +415,7 @@ impl TableLookup { for (node, get_peers_msg, req) in endgame_messages { if out.send((get_peers_msg, node.addr())).await.is_err() { - error!("bip_dht: Could not send an endgame message through the channel..."); + tracing::error!("bip_dht: Could not send an endgame message through the channel..."); return LookupStatus::Failed; } diff --git a/packages/dht/src/worker/messenger.rs b/packages/dht/src/worker/messenger.rs index 78a59f5b2..5af051eab 100644 --- a/packages/dht/src/worker/messenger.rs +++ b/packages/dht/src/worker/messenger.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use futures::channel::mpsc::{self, Receiver, Sender}; use futures::stream::StreamExt; use futures::SinkExt as _; -use log::{info, warn}; use tokio::net::UdpSocket; use tokio::task; @@ -24,7 +23,7 @@ pub fn create_outgoing_messenger(socket: &Arc) -> Sender<(Vec, So send_bytes(&socket, &message[..], addr).await; } - info!("bip_dht: Outgoing messenger received a channel hangup, exiting thread..."); + tracing::info!("bip_dht: Outgoing messenger received a channel hangup, exiting thread..."); }); send @@ -38,7 +37,7 @@ async fn send_bytes(socket: &UdpSocket, bytes: &[u8], addr: SocketAddr) { bytes_sent += num_sent; } else { // TODO: Maybe shut down in this case, will fail on every write... - warn!( + tracing::warn!( "bip_dht: Outgoing messenger failed to write {} bytes to {}; {} bytes written \ before error...", bytes.len(), @@ -64,12 +63,12 @@ pub fn create_incoming_messenger(socket: Arc, send: Sender { - warn!("bip_dht: Incoming messenger failed to receive bytes..."); + tracing::warn!("bip_dht: Incoming messenger failed to receive bytes..."); } } } - info!("bip_dht: Incoming messenger received a channel hangup, exiting thread..."); + tracing::info!("bip_dht: Incoming messenger received a channel hangup, exiting thread..."); }); } diff --git a/packages/dht/src/worker/refresh.rs b/packages/dht/src/worker/refresh.rs index 06fb3480c..33ed6c71d 100644 --- a/packages/dht/src/worker/refresh.rs +++ b/packages/dht/src/worker/refresh.rs @@ -4,7 +4,6 @@ use std::sync::{Arc, Mutex, RwLock}; use futures::channel::mpsc::Sender; use futures::SinkExt as _; -use log::{error, info}; use tokio::time::{sleep, Duration}; use util::bt::{self, NodeId}; @@ -61,7 +60,7 @@ impl TableRefresh { .find(|n| n.status() == NodeStatus::Questionable) .cloned(); - info!("bip_dht: Performing a refresh for bucket {}", refresh_bucket); + tracing::info!("bip_dht: Performing a refresh for bucket {}", refresh_bucket); (node, target_id, node_id) }; @@ -77,7 +76,7 @@ impl TableRefresh { // Send the message if out.send((find_node_msg, node.addr())).await.is_err() { - error!( + tracing::error!( "bip_dht: TableRefresh failed to send a refresh message to the out \ channel..." ); @@ -99,7 +98,7 @@ impl TableRefresh { .await .is_err() { - error!("bip_dht: Failed to send scheduled task check for table refresh"); + tracing::error!("bip_dht: Failed to send scheduled task check for table refresh"); } }); diff --git a/packages/disk/Cargo.toml b/packages/disk/Cargo.toml index 0d48bc677..98b9ee6a8 100644 --- a/packages/disk/Cargo.toml +++ b/packages/disk/Cargo.toml @@ -23,12 +23,13 @@ crossbeam = "0" pin-project = "1" bytes = "1" futures = "0" -log = "0" +tracing = "0" lru-cache = "0" tokio = { version = "1", features = ["full"] } thiserror = "1" [dev-dependencies] +tracing-subscriber = "0" rand = "0" criterion = { version = "0", features = ["async_futures"] } diff --git a/packages/disk/examples/add_torrent.rs b/packages/disk/examples/add_torrent.rs index 90c8e0d36..53befdcd5 100644 --- a/packages/disk/examples/add_torrent.rs +++ b/packages/disk/examples/add_torrent.rs @@ -1,15 +1,30 @@ use std::fs::File; use std::io::{BufRead, Read, Write}; -use std::sync::Arc; +use std::sync::{Arc, Once}; use disk::fs::NativeFileSystem; use disk::{DiskManagerBuilder, IDiskMessage, ODiskMessage}; use futures::{SinkExt, StreamExt}; use metainfo::Metainfo; +use tracing::level_filters::LevelFilter; + +static INIT: Once = Once::new(); + +fn tracing_stdout_init(filter: LevelFilter) { + let builder = tracing_subscriber::fmt().with_max_level(filter).with_ansi(true); + + builder.pretty().with_file(true).init(); + + tracing::info!("Logging initialized"); +} #[tokio::main] async fn main() { - println!("Utility For Allocating Disk Space For A Torrent File"); + INIT.call_once(|| { + tracing_stdout_init(LevelFilter::INFO); + }); + + tracing::info!("Utility For Allocating Disk Space For A Torrent File"); let stdin = std::io::stdin(); let mut input_lines = stdin.lock().lines(); @@ -30,19 +45,17 @@ async fn main() { let filesystem = NativeFileSystem::with_directory(download_path); let disk_manager = DiskManagerBuilder::new().build(Arc::new(filesystem)); - let (mut disk_send, mut disk_recv) = disk_manager.split(); + let (mut disk_send, mut disk_recv) = disk_manager.into_parts(); let total_pieces = metainfo_file.info().pieces().count(); disk_send.send(IDiskMessage::AddTorrent(metainfo_file)).await.unwrap(); - println!(); - let mut good_pieces = 0; while let Some(recv_msg) = disk_recv.next().await { match recv_msg.unwrap() { ODiskMessage::TorrentAdded(hash) => { - println!("Torrent With Hash {hash:?} Successfully Added"); - println!("Torrent Has {good_pieces} Good Pieces Out Of {total_pieces} Total Pieces"); + tracing::info!("Torrent With Hash {hash:?} Successfully Added"); + tracing::info!("Torrent Has {good_pieces} Good Pieces Out Of {total_pieces} Total Pieces"); break; } ODiskMessage::FoundGoodPiece(_, _) => good_pieces += 1, diff --git a/packages/disk/src/disk/manager.rs b/packages/disk/src/disk/manager.rs index 581e66606..327dade3f 100644 --- a/packages/disk/src/disk/manager.rs +++ b/packages/disk/src/disk/manager.rs @@ -5,9 +5,8 @@ use crossbeam::queue::SegQueue; use futures::channel::mpsc::{self, Receiver}; use futures::task::{Context, Poll, Waker}; use futures::{Stream, StreamExt}; -use log::info; use pin_project::pin_project; -use tokio::task::{JoinError, JoinSet}; +use tokio::task::JoinSet; use crate::disk::builder::DiskManagerBuilder; use crate::disk::fs::FileSystem; @@ -63,7 +62,7 @@ impl futures::Sink for DiskManager where F: FileSystem + Send + Sync + 'static, { - type Error = JoinError; + type Error = std::io::Error; fn poll_ready(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().sink.poll_ready(cx) @@ -146,14 +145,27 @@ where } } - fn try_submit_work(&self) -> bool { - let cur_capacity = self.cur_capacity.fetch_add(1, Ordering::SeqCst); + fn try_submit_work(&self, waker: &Waker) -> Result { + let cap = self.cur_capacity.fetch_add(1, Ordering::SeqCst) + 1; + let max = self.max_capacity; - if cur_capacity < self.max_capacity { - true + #[allow(clippy::comparison_chain)] + if cap < max { + tracing::trace!("now have {cap} of capacity: {max}"); + + Ok(cap) + } else if cap == max { + tracing::trace!("at max capacity: {max}"); + + Ok(cap) } else { + self.wake_queue.push(waker.clone()); + tracing::debug!("now have {} pending wakers...", self.wake_queue.len()); + self.cur_capacity.fetch_sub(1, Ordering::SeqCst); - false + tracing::debug!("at over capacity: {cap} of {max}"); + + Err(cap) } } } @@ -163,19 +175,17 @@ where F: FileSystem + Sync + 'static, Arc: Send + Sync, { - type Error = JoinError; + type Error = std::io::Error; fn poll_ready(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.try_submit_work() { - Poll::Ready(Ok(())) - } else { - self.wake_queue.push(cx.waker().clone()); - Poll::Pending + match self.try_submit_work(cx.waker()) { + Ok(_remaining) => Poll::Ready(Ok(())), + Err(_full) => Poll::Pending, } } fn start_send(self: std::pin::Pin<&mut Self>, item: IDiskMessage) -> Result<(), Self::Error> { - info!("Starting Send For DiskManagerSink With IDiskMessage"); + tracing::info!("Starting Send For DiskManagerSink With IDiskMessage"); self.task_set .lock() .unwrap() @@ -185,17 +195,31 @@ where fn poll_flush(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let Ok(mut task_set) = self.task_set.try_lock() else { + tracing::warn!("unable to get task_set lock"); cx.waker().wake_by_ref(); return Poll::Pending; }; + tracing::debug!("flushing the {} tasks", task_set.len()); + while let Some(ready) = match task_set.poll_join_next(cx) { Poll::Ready(ready) => ready, - Poll::Pending => return Poll::Pending, + Poll::Pending => { + tracing::debug!("all {} task(s) are still pending...", task_set.len()); + return Poll::Pending; + } } { match ready { - Ok(()) => continue, - Err(e) => return Poll::Ready(Err(e)), + Ok(()) => { + tracing::trace!("task completed... with {} remaining...", task_set.len()); + + continue; + } + Err(e) => { + tracing::error!("task completed... with {} remaining, with error: {e}", task_set.len()); + + return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, e))); + } } } @@ -213,21 +237,31 @@ where #[derive(Debug)] pub struct DiskManagerStream { recv: Receiver, - cur_capacity: Arc, - task_queue: Arc>, + pub cur_capacity: Arc, + wake_queue: Arc>, } impl DiskManagerStream { - fn new(recv: Receiver, cur_capacity: Arc, task_queue: Arc>) -> DiskManagerStream { + fn new(recv: Receiver, cur_capacity: Arc, wake_queue: Arc>) -> DiskManagerStream { DiskManagerStream { recv, cur_capacity, - task_queue, + wake_queue, } } - fn complete_work(&self) { - self.cur_capacity.fetch_sub(1, Ordering::SeqCst); + fn complete_work(&self) -> usize { + let cap = self.cur_capacity.fetch_sub(1, Ordering::SeqCst) - 1; + + tracing::debug!( + "Notify next waker: {} that there is space again: {cap}", + self.wake_queue.len() + ); + if let Some(waker) = self.wake_queue.pop() { + waker.wake(); + }; + + cap } } @@ -235,7 +269,7 @@ impl Stream for DiskManagerStream { type Item = Result; fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - info!("Polling DiskManagerStream For ODiskMessage"); + tracing::info!("Polling DiskManagerStream For ODiskMessage"); match self.recv.poll_next_unpin(cx) { Poll::Ready(Some(msg)) => { @@ -246,10 +280,6 @@ impl Stream for DiskManagerStream { | ODiskMessage::BlockLoaded(_) | ODiskMessage::BlockProcessed(_) => { self.complete_work(); - info!("Notifying DiskManager That We Can Submit More Work"); - while let Some(waker) = self.task_queue.pop() { - waker.wake(); - } } _ => {} } diff --git a/packages/disk/src/disk/tasks/context.rs b/packages/disk/src/disk/tasks/context.rs index 9fb9794a4..aee391e43 100644 --- a/packages/disk/src/disk/tasks/context.rs +++ b/packages/disk/src/disk/tasks/context.rs @@ -88,7 +88,7 @@ where hash_not_exists } - pub async fn update_torrent<'a, C, D>(self, hash: InfoHash, call: C) -> Option + pub async fn update_torrent<'a, C, D>(self, hash: InfoHash, with_state: C) -> Option where C: FnOnce(Arc, MetainfoState) -> BoxFuture<'a, D>, { @@ -101,7 +101,7 @@ where read_torrents.get(&hash)?.clone() }; - Some(call(self.fs.clone(), state.clone()).await) + Some(with_state(self.fs.clone(), state.clone()).await) } pub fn remove_torrent(&self, hash: InfoHash) -> bool { diff --git a/packages/disk/src/disk/tasks/mod.rs b/packages/disk/src/disk/tasks/mod.rs index 6edd2efdf..97951139e 100644 --- a/packages/disk/src/disk/tasks/mod.rs +++ b/packages/disk/src/disk/tasks/mod.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use futures::channel::mpsc::Sender; use futures::lock::Mutex; use futures::{FutureExt, SinkExt as _}; -use log::info; use metainfo::Metainfo; use util::bt::InfoHash; @@ -52,10 +51,14 @@ where }, }; + tracing::trace!("sending output disk message: {out_msg:?}"); + sender .send(out_msg) .await .expect("bip_disk: Failed To Send Out Message In execute_on_pool"); + + tracing::debug!("finished sending output message... "); } async fn execute_add_torrent(file: Metainfo, context: DiskManagerContext, sender: Sender) -> TorrentResult<()> @@ -155,12 +158,9 @@ where let block_result = context .update_torrent(info_hash, |fs, state| { - async move { - info!( - "Processing Block, Acquired Torrent Lock For {:?}", - state.file.info().info_hash() - ); + tracing::trace!("Updating Blocks for Torrent: {info_hash}"); + async move { let piece_accessor = PieceAccessor::new(fs.clone(), state.clone()); // Write Out Piece Out To The Filesystem And Recalculate The Diff @@ -175,17 +175,14 @@ where send_piece_diff(&state.checker, state.file.info().info_hash(), sender.clone(), false).await; - info!( - "Processing Block, Released Torrent Lock For {:?}", - state.file.info().info_hash() - ); - block_result } .boxed() }) .await; + tracing::debug!("Finished Updating Torrent: {info_hash}, {block_result:?}"); + match block_result { Some(result) => Ok(result?), None => Err(BlockError::InfoHashNotFound { hash: info_hash }), diff --git a/packages/disk/tests/add_torrent.rs b/packages/disk/tests/add_torrent.rs index 698a4d9b9..3281dd877 100644 --- a/packages/disk/tests/add_torrent.rs +++ b/packages/disk/tests/add_torrent.rs @@ -1,14 +1,21 @@ -use common::{random_buffer, runtime_loop_with_timeout, InMemoryFileSystem, MultiFileDirectAccessor}; +use common::{ + random_buffer, runtime_loop_with_timeout, tracing_stdout_init, InMemoryFileSystem, MultiFileDirectAccessor, DEFAULT_TIMEOUT, + INIT, +}; use disk::{DiskManagerBuilder, FileSystem as _, IDiskMessage, ODiskMessage}; use futures::future::{self, Either}; -use futures::{FutureExt, SinkExt as _, StreamExt as _}; +use futures::{FutureExt, SinkExt as _}; use metainfo::{Metainfo, MetainfoBuilder, PieceLength}; -use tokio::runtime::Runtime; +use tracing::level_filters::LevelFilter; mod common; #[tokio::test] async fn positive_add_torrent() { + INIT.call_once(|| { + tracing_stdout_init(LevelFilter::INFO); + }); + // Create some "files" as random bytes let data_a = (random_buffer(50), "/path/to/file/a".into()); let data_b = (random_buffer(2000), "/path/to/file/b".into()); @@ -27,18 +34,17 @@ async fn positive_add_torrent() { let filesystem = InMemoryFileSystem::new(); let disk_manager = DiskManagerBuilder::new().build(filesystem.me()); - let (mut send, recv) = disk_manager.split(); + let (mut send, recv) = disk_manager.into_parts(); send.send(IDiskMessage::AddTorrent(metainfo_file)).await.unwrap(); // Verify that zero pieces are marked as good - let mut runtime = Runtime::new().unwrap(); - // Run a runtime loop until we get the TorrentAdded message - let good_pieces = runtime_loop_with_timeout(&mut runtime, 500, (0, recv), |good_pieces, recv, msg| match msg { + let good_pieces = runtime_loop_with_timeout(DEFAULT_TIMEOUT, (0, recv), |good_pieces, recv, msg| match msg { Ok(ODiskMessage::TorrentAdded(_)) => Either::Left(future::ready(good_pieces).boxed()), Ok(ODiskMessage::FoundGoodPiece(_, _)) => Either::Right(future::ready((good_pieces + 1, recv)).boxed()), unexpected => panic!("Unexpected Message: {unexpected:?}"), - }); + }) + .await; assert_eq!(0, good_pieces); diff --git a/packages/disk/tests/common/mod.rs b/packages/disk/tests/common/mod.rs index faf3a1b19..0da4db507 100644 --- a/packages/disk/tests/common/mod.rs +++ b/packages/disk/tests/common/mod.rs @@ -1,7 +1,7 @@ use std::cmp; use std::collections::HashMap; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex, Weak}; +use std::sync::{Arc, Mutex, Once, Weak}; use std::time::Duration; use bytes::BytesMut; @@ -10,10 +10,25 @@ use futures::future::BoxFuture; use futures::stream::Stream; use futures::{future, Sink, SinkExt as _, StreamExt as _}; use metainfo::{Accessor, IntoAccessor, PieceAccess}; -use tokio::runtime::Runtime; use tokio::time::timeout; +use tracing::level_filters::LevelFilter; use util::bt::InfoHash; +#[allow(dead_code)] +pub const DEFAULT_TIMEOUT: Duration = Duration::from_millis(500); + +#[allow(dead_code)] +pub static INIT: Once = Once::new(); + +#[allow(dead_code)] +pub fn tracing_stdout_init(filter: LevelFilter) { + let builder = tracing_subscriber::fmt().with_max_level(filter).with_ansi(true); + + builder.pretty().with_file(true).init(); + + tracing::info!("Logging initialized"); +} + /// Generate buffer of size random bytes. pub fn random_buffer(size: usize) -> Vec { let mut buffer = vec![0u8; size]; @@ -25,40 +40,37 @@ pub fn random_buffer(size: usize) -> Vec { /// /// Returns R or panics if an error occurred in the loop (including a timeout). #[allow(dead_code)] -pub fn runtime_loop_with_timeout<'a, 'b, I, S, F, R>( - runtime: &mut Runtime, - timeout_ms: u64, - initial_state: (I, S), - mut call: F, -) -> R +pub async fn runtime_loop_with_timeout<'a, 'b, I, S, F, R>(timeout_time: Duration, initial_state: (I, S), mut call: F) -> R where F: FnMut(I, S, S::Item) -> future::Either, BoxFuture<'b, (I, S)>>, S: Stream + Unpin, R: 'static, + I: std::fmt::Debug + Clone, { - runtime.block_on(async { - let timeout_future = timeout(Duration::from_millis(timeout_ms), future::pending::<()>()); - - let loop_future = async { - let mut state = initial_state; - loop { - let (init, mut stream) = state; - if let Some(msg) = stream.next().await { - match call(init, stream, msg) { - future::Either::Left(fut) => return fut.await, - future::Either::Right(fut) => state = fut.await, - } - } else { - panic!("End Of Stream Reached"); + let mut state = initial_state; + loop { + let (init, mut stream) = state; + if let Some(msg) = { + timeout(timeout_time, stream.next()) + .await + .unwrap_or_else(|_| panic!("timeout while waiting for next message: {timeout_time:?}, {init:?}")) + } { + match call(init.clone(), stream, msg) { + future::Either::Left(fut) => { + return timeout(timeout_time, fut) + .await + .unwrap_or_else(|_| panic!("timeout waiting for final processing: {timeout_time:?}, {init:?}")); + } + future::Either::Right(fut) => { + state = timeout(timeout_time, fut) + .await + .unwrap_or_else(|_| panic!("timeout waiting for next loop state: {timeout_time:?}, {init:?}")); } } - }; - - tokio::select! { - result = loop_future => result, - _ = timeout_future => panic!("Core Loop Timed Out"), + } else { + panic!("End Of Stream Reached"); } - }) + } } /// Send block with the given metadata and entire data given. @@ -74,7 +86,12 @@ pub async fn send_block( ) where S: Sink + Unpin, M: Fn(&mut [u8]), + >::Error: std::fmt::Display, { + tracing::trace!( + "sending block for torrent: {hash}, index: {piece_index}, block_offset: {block_offset}, block_length: {block_len}" + ); + let mut bytes = BytesMut::new(); bytes.extend_from_slice(data); @@ -84,7 +101,7 @@ pub async fn send_block( sink.send(IDiskMessage::ProcessBlock(block.into())) .await - .unwrap_or_else(|_| panic!("Failed To Send Process Block Message")); + .unwrap_or_else(|e| panic!("Failed To Send Process Block Message: {e}")); } //----------------------------------------------------------------------------// @@ -143,6 +160,7 @@ impl Accessor for MultiFileDirectAccessor { //----------------------------------------------------------------------------// /// Allow us to mock out the file system. +#[derive(Debug)] pub struct InMemoryFileSystem { #[allow(dead_code)] me: Weak, diff --git a/packages/disk/tests/complete_torrent.rs b/packages/disk/tests/complete_torrent.rs index 82f1fae85..13ce603ce 100644 --- a/packages/disk/tests/complete_torrent.rs +++ b/packages/disk/tests/complete_torrent.rs @@ -1,15 +1,27 @@ -use common::{random_buffer, runtime_loop_with_timeout, send_block, InMemoryFileSystem, MultiFileDirectAccessor}; +use common::{ + random_buffer, runtime_loop_with_timeout, send_block, tracing_stdout_init, InMemoryFileSystem, MultiFileDirectAccessor, + DEFAULT_TIMEOUT, INIT, +}; use disk::{DiskManagerBuilder, IDiskMessage, ODiskMessage}; use futures::future::{self, Either}; -use futures::{FutureExt, SinkExt as _, StreamExt as _}; +use futures::{FutureExt, SinkExt as _}; use metainfo::{Metainfo, MetainfoBuilder, PieceLength}; -use tokio::runtime::Runtime; +use tokio::task::JoinSet; +use tracing::level_filters::LevelFilter; mod common; +#[allow(unused_variables)] +#[allow(unreachable_code)] #[allow(clippy::too_many_lines)] #[tokio::test] async fn positive_complete_torrent() { + INIT.call_once(|| { + tracing_stdout_init(LevelFilter::DEBUG); + }); + + let mut tasks = JoinSet::new(); + // Create some "files" as random bytes let data_a = (random_buffer(1023), "/path/to/file/a".into()); let data_b = (random_buffer(2000), "/path/to/file/b".into()); @@ -26,118 +38,62 @@ async fn positive_complete_torrent() { let filesystem = InMemoryFileSystem::new(); let disk_manager = DiskManagerBuilder::new().build(filesystem.clone()); - let (mut send, recv) = disk_manager.split(); + let (mut send, recv) = disk_manager.into_parts(); send.send(IDiskMessage::AddTorrent(metainfo_file.clone())).await.unwrap(); // Verify that zero pieces are marked as good - let mut runtime = Runtime::new().unwrap(); - // Run a runtime loop until we get the TorrentAdded message - let (good_pieces, recv) = runtime_loop_with_timeout(&mut runtime, 500, (0, recv), |good_pieces, recv, msg| match msg { + let (good_pieces, recv) = runtime_loop_with_timeout(DEFAULT_TIMEOUT, (0, recv), |good_pieces, recv, msg| match msg { Ok(ODiskMessage::TorrentAdded(_)) => Either::Left(future::ready((good_pieces, recv)).boxed()), Ok(ODiskMessage::FoundGoodPiece(_, _)) => Either::Right(future::ready((good_pieces + 1, recv)).boxed()), unexpected => panic!("Unexpected Message: {unexpected:?}"), - }); + }) + .await; // Make sure we have no good pieces assert_eq!(0, good_pieces); - // Send a couple blocks that are known to be good, then one bad block - let mut files_bytes = Vec::new(); - files_bytes.extend_from_slice(&data_a.0); - files_bytes.extend_from_slice(&data_b.0); - - // Send piece 0 with a bad last block - send_block( - &mut send, - &files_bytes[0..500], - metainfo_file.info().info_hash(), - 0, - 0, - 500, - |_| (), - ) - .await; - send_block( - &mut send, - &files_bytes[500..1000], - metainfo_file.info().info_hash(), - 0, - 500, - 500, - |_| (), - ) - .await; - send_block( - &mut send, - &files_bytes[1000..1024], - metainfo_file.info().info_hash(), - 0, - 1000, - 24, - |bytes| { - bytes[0] = !bytes[0]; - }, - ) - .await; + let files_bytes = { + let mut b = Vec::new(); + b.extend_from_slice(&data_a.0); + b.extend_from_slice(&data_b.0); + b + }; + + tracing::warn!("send two blocks that are known to be good, then one bad block"); + + let send_one_bad_and_two_good = { + let mut send = send.clone(); + let data = files_bytes.clone(); + let info_hash = metainfo_file.info().info_hash(); + + async move { + // Send piece 0 with a bad last block + send_block(&mut send, &data[0..500], info_hash, 0, 0, 500, |_| ()).await; + send_block(&mut send, &data[500..1000], info_hash, 0, 500, 500, |_| ()).await; + send_block(&mut send, &data[1000..1024], info_hash, 0, 1000, 24, |bytes| { + bytes[0] = !bytes[0]; + }) + .await; + + // Send piece 1 with good blocks + send_block(&mut send, &data[1024..(1024 + 500)], info_hash, 1, 0, 500, |_| ()).await; + send_block(&mut send, &data[(1024 + 500)..(1024 + 1000)], info_hash, 1, 500, 500, |_| ()).await; + send_block(&mut send, &data[(1024 + 1000)..(1024 + 1024)], info_hash, 1, 1000, 24, |_| ()).await; + + // Send piece 2 with good blocks + send_block(&mut send, &data[2048..(2048 + 500)], info_hash, 2, 0, 500, |_| ()).await; + send_block(&mut send, &data[(2048 + 500)..(2048 + 975)], info_hash, 2, 500, 475, |_| ()).await; + } + .boxed() + }; + + tasks.spawn(send_one_bad_and_two_good); + + tracing::warn!("verify that piece 0 is bad, but piece 1 and 2 are good"); - // Send piece 1 with good blocks - send_block( - &mut send, - &files_bytes[1024..(1024 + 500)], - metainfo_file.info().info_hash(), - 1, - 0, - 500, - |_| (), - ) - .await; - send_block( - &mut send, - &files_bytes[(1024 + 500)..(1024 + 1000)], - metainfo_file.info().info_hash(), - 1, - 500, - 500, - |_| (), - ) - .await; - send_block( - &mut send, - &files_bytes[(1024 + 1000)..(1024 + 1024)], - metainfo_file.info().info_hash(), - 1, - 1000, - 24, - |_| (), - ) - .await; - - // Send piece 2 with good blocks - send_block( - &mut send, - &files_bytes[2048..(2048 + 500)], - metainfo_file.info().info_hash(), - 2, - 0, - 500, - |_| (), - ) - .await; - send_block( - &mut send, - &files_bytes[(2048 + 500)..(2048 + 975)], - metainfo_file.info().info_hash(), - 2, - 500, - 475, - |_| (), - ) - .await; - // Verify that piece 0 is bad, but piece 1 and 2 are good let (recv, piece_zero_good, piece_one_good, piece_two_good) = runtime_loop_with_timeout( - &mut runtime, - 500, + DEFAULT_TIMEOUT, ((false, false, false, 0), recv), |(piece_zero_good, piece_one_good, piece_two_good, messages_recvd), recv, msg| { let messages_recvd = messages_recvd + 1; @@ -165,49 +121,47 @@ async fn positive_complete_torrent() { Either::Right(future::ready(((piece_zero_good, piece_one_good, piece_two_good, messages_recvd), recv)).boxed()) } }, - ); + ) + .await; // Assert whether or not pieces were good assert!(!piece_zero_good); assert!(piece_one_good); assert!(piece_two_good); - // Resend piece 0 with good blocks - send_block( - &mut send, - &files_bytes[0..500], - metainfo_file.info().info_hash(), - 0, - 0, - 500, - |_| (), - ) - .await; - send_block( - &mut send, - &files_bytes[500..1000], - metainfo_file.info().info_hash(), - 0, - 500, - 500, - |_| (), - ) - .await; - send_block( - &mut send, - &files_bytes[1000..1024], - metainfo_file.info().info_hash(), - 0, - 1000, - 24, - |_| (), - ) - .await; + { + tokio::task::yield_now().await; + + while let Some(task) = tasks.try_join_next() { + match task { + Ok(()) => continue, + Err(e) => panic!("task joined with error: {e}"), + } + } + + assert!(tasks.is_empty(), "all the tasks should have finished now"); + } + + tracing::warn!("resend piece 0 with good blocks"); + + let resend_with_good_blocks = { + let mut send = send.clone(); + let data = files_bytes.clone(); + let info_hash = metainfo_file.info().info_hash(); + async move { + send_block(&mut send, &data[0..500], info_hash, 0, 0, 500, |_| ()).await; + send_block(&mut send, &data[500..1000], info_hash, 0, 500, 500, |_| ()).await; + send_block(&mut send, &data[1000..1024], info_hash, 0, 1000, 24, |_| ()).await; + } + .boxed() + }; + + tasks.spawn(resend_with_good_blocks); + + tracing::warn!("verify that piece 0 is now good"); - // Verify that piece 0 is now good let piece_zero_good = runtime_loop_with_timeout( - &mut runtime, - 500, + DEFAULT_TIMEOUT, ((false, 0), recv), |(piece_zero_good, messages_recvd), recv, msg| { let messages_recvd = messages_recvd + 1; @@ -233,7 +187,21 @@ async fn positive_complete_torrent() { Either::Right(future::ready(((piece_zero_good, messages_recvd), recv)).boxed()) } }, - ); + ) + .await; + + { + tokio::task::yield_now().await; + + while let Some(task) = tasks.try_join_next() { + match task { + Ok(()) => continue, + Err(e) => panic!("task joined with error: {e}"), + } + } + + assert!(tasks.is_empty(), "all the tasks should have finished now"); + } // Assert whether or not piece was good assert!(piece_zero_good); diff --git a/packages/disk/tests/disk_manager_send_backpressure.rs b/packages/disk/tests/disk_manager_send_backpressure.rs index 363aa3ef5..d58084fb6 100644 --- a/packages/disk/tests/disk_manager_send_backpressure.rs +++ b/packages/disk/tests/disk_manager_send_backpressure.rs @@ -1,12 +1,17 @@ -use common::{random_buffer, InMemoryFileSystem, MultiFileDirectAccessor}; +use common::{random_buffer, tracing_stdout_init, InMemoryFileSystem, MultiFileDirectAccessor, DEFAULT_TIMEOUT, INIT}; use disk::{DiskManagerBuilder, IDiskMessage}; -use futures::{SinkExt as _, StreamExt as _}; +use futures::{FutureExt, SinkExt as _, StreamExt as _}; use metainfo::{Metainfo, MetainfoBuilder, PieceLength}; +use tracing::level_filters::LevelFilter; mod common; #[tokio::test] async fn positive_disk_manager_send_backpressure() { + INIT.call_once(|| { + tracing_stdout_init(LevelFilter::TRACE); + }); + // Create some "files" as random bytes let data_a = (random_buffer(50), "/path/to/file/a".into()); let data_b = (random_buffer(2000), "/path/to/file/b".into()); @@ -27,21 +32,37 @@ async fn positive_disk_manager_send_backpressure() { let (mut m_send, mut m_recv) = DiskManagerBuilder::new() .with_sink_buffer_capacity(1) .build(filesystem.clone()) - .split(); + .into_parts(); // Add a torrent, so our receiver has a single torrent added message buffered - m_send.send(IDiskMessage::AddTorrent(metainfo_file)).await.unwrap(); + tokio::time::timeout(DEFAULT_TIMEOUT, m_send.send(IDiskMessage::AddTorrent(metainfo_file))) + .await + .unwrap() + .unwrap(); // Try to send a remove message (but it should fail) - let result = m_send.send(IDiskMessage::RemoveTorrent(info_hash)).await; - assert!(result.is_err(), "Expected backpressure to prevent sending"); + assert!( + m_send.send(IDiskMessage::RemoveTorrent(info_hash)).now_or_never().is_none(), + "it should have back_pressure" + ); // Receive from our stream to unblock the backpressure - m_recv.next().await; + tokio::time::timeout(DEFAULT_TIMEOUT, m_recv.next()) + .await + .unwrap() + .unwrap() + .unwrap(); // Try to send a remove message again which should go through - m_send.send(IDiskMessage::RemoveTorrent(info_hash)).await.unwrap(); + tokio::time::timeout(DEFAULT_TIMEOUT, m_send.send(IDiskMessage::RemoveTorrent(info_hash))) + .await + .unwrap() + .unwrap(); // Receive confirmation - m_recv.next().await; + tokio::time::timeout(DEFAULT_TIMEOUT, m_recv.next()) + .await + .unwrap() + .unwrap() + .unwrap(); } diff --git a/packages/disk/tests/load_block.rs b/packages/disk/tests/load_block.rs index ed8211584..0bbe9251a 100644 --- a/packages/disk/tests/load_block.rs +++ b/packages/disk/tests/load_block.rs @@ -1,14 +1,19 @@ use bytes::BytesMut; -use common::{random_buffer, InMemoryFileSystem, MultiFileDirectAccessor}; +use common::{random_buffer, tracing_stdout_init, InMemoryFileSystem, MultiFileDirectAccessor, INIT}; use disk::{Block, BlockMetadata, BlockMut, DiskManagerBuilder, IDiskMessage, ODiskMessage}; use futures::{SinkExt as _, StreamExt as _}; use metainfo::{Metainfo, MetainfoBuilder, PieceLength}; use tokio::time::{timeout, Duration}; +use tracing::level_filters::LevelFilter; mod common; #[tokio::test] async fn positive_load_block() { + INIT.call_once(|| { + tracing_stdout_init(LevelFilter::INFO); + }); + // Create some "files" as random bytes let data_a = (random_buffer(1023), "/path/to/file/a".into()); let data_b = (random_buffer(2000), "/path/to/file/b".into()); @@ -37,7 +42,7 @@ async fn positive_load_block() { ); let load_block = BlockMut::new(BlockMetadata::new(metainfo_file.info().info_hash(), 1, 0, 50), load_block); - let (mut send, mut recv) = disk_manager.split(); + let (mut send, mut recv) = disk_manager.into_parts(); send.send(IDiskMessage::AddTorrent(metainfo_file)).await.unwrap(); let timeout_duration = Duration::from_millis(500); diff --git a/packages/disk/tests/process_block.rs b/packages/disk/tests/process_block.rs index 292dc7c67..1394c382e 100644 --- a/packages/disk/tests/process_block.rs +++ b/packages/disk/tests/process_block.rs @@ -1,14 +1,21 @@ use bytes::BytesMut; -use common::{random_buffer, runtime_loop_with_timeout, InMemoryFileSystem, MultiFileDirectAccessor}; +use common::{ + random_buffer, runtime_loop_with_timeout, tracing_stdout_init, InMemoryFileSystem, MultiFileDirectAccessor, DEFAULT_TIMEOUT, + INIT, +}; use disk::{Block, BlockMetadata, DiskManagerBuilder, FileSystem, IDiskMessage, ODiskMessage}; -use futures::{future, FutureExt as _, SinkExt as _, StreamExt as _}; +use futures::{future, FutureExt as _, SinkExt as _}; use metainfo::{Metainfo, MetainfoBuilder, PieceLength}; -use tokio::runtime::Runtime; +use tracing::level_filters::LevelFilter; mod common; #[tokio::test] async fn positive_process_block() { + INIT.call_once(|| { + tracing_stdout_init(LevelFilter::INFO); + }); + // Create some "files" as random bytes let data_a = (random_buffer(1023), "/path/to/file/a".into()); let data_b = (random_buffer(2000), "/path/to/file/b".into()); @@ -33,13 +40,11 @@ async fn positive_process_block() { process_bytes.freeze(), ); - let (mut send, recv) = disk_manager.split(); + let (mut send, recv) = disk_manager.into_parts(); send.send(IDiskMessage::AddTorrent(metainfo_file)).await.unwrap(); - let mut runtime = Runtime::new().unwrap(); runtime_loop_with_timeout( - &mut runtime, - 500, + DEFAULT_TIMEOUT, ((send, Some(process_block)), recv), |(mut send, opt_pblock), recv, msg| match msg { Ok(ODiskMessage::TorrentAdded(_)) => { @@ -55,7 +60,8 @@ async fn positive_process_block() { Ok(ODiskMessage::BlockProcessed(_)) => future::Either::Left(future::ready(()).boxed()), unexpected => panic!("Unexpected Message: {unexpected:?}"), }, - ); + ) + .await; // Verify block was updated in data_b let mut received_file_b = filesystem.open_file(data_b.1).unwrap(); diff --git a/packages/disk/tests/remove_torrent.rs b/packages/disk/tests/remove_torrent.rs index a7a752250..6447d1af4 100644 --- a/packages/disk/tests/remove_torrent.rs +++ b/packages/disk/tests/remove_torrent.rs @@ -1,14 +1,21 @@ use bytes::BytesMut; -use common::{random_buffer, runtime_loop_with_timeout, InMemoryFileSystem, MultiFileDirectAccessor}; +use common::{ + random_buffer, runtime_loop_with_timeout, tracing_stdout_init, InMemoryFileSystem, MultiFileDirectAccessor, DEFAULT_TIMEOUT, + INIT, +}; use disk::{Block, BlockMetadata, DiskManagerBuilder, IDiskMessage, ODiskMessage}; -use futures::{future, FutureExt, SinkExt as _, StreamExt as _}; +use futures::{future, FutureExt, SinkExt as _}; use metainfo::{Metainfo, MetainfoBuilder, PieceLength}; -use tokio::runtime::Runtime; +use tracing::level_filters::LevelFilter; mod common; #[tokio::test] async fn positive_remove_torrent() { + INIT.call_once(|| { + tracing_stdout_init(LevelFilter::INFO); + }); + // Create some "files" as random bytes let data_a = (random_buffer(50), "/path/to/file/a".into()); let data_b = (random_buffer(2000), "/path/to/file/b".into()); @@ -28,15 +35,12 @@ async fn positive_remove_torrent() { let filesystem = InMemoryFileSystem::new(); let disk_manager = DiskManagerBuilder::new().build(filesystem.clone()); - let (mut send, recv) = disk_manager.split(); + let (mut send, recv) = disk_manager.into_parts(); send.send(IDiskMessage::AddTorrent(metainfo_file)).await.unwrap(); // Verify that zero pieces are marked as good - let mut runtime = Runtime::new().unwrap(); - let (mut send, good_pieces, recv) = runtime_loop_with_timeout( - &mut runtime, - 500, + DEFAULT_TIMEOUT, ((send, 0), recv), |(mut send, good_pieces), recv, msg| match msg { Ok(ODiskMessage::TorrentAdded(_)) => { @@ -55,7 +59,8 @@ async fn positive_remove_torrent() { } unexpected => panic!("Unexpected Message: {unexpected:?}"), }, - ); + ) + .await; assert_eq!(0, good_pieces); @@ -66,8 +71,9 @@ async fn positive_remove_torrent() { send.send(IDiskMessage::ProcessBlock(process_block)).await.unwrap(); - runtime_loop_with_timeout(&mut runtime, 500, ((), recv), |(), _, msg| match msg { + runtime_loop_with_timeout(DEFAULT_TIMEOUT, ((), recv), |(), _, msg| match msg { Ok(ODiskMessage::ProcessBlockError(_, _)) => future::Either::Left(future::ready(()).boxed()), unexpected => panic!("Unexpected Message: {unexpected:?}"), - }); + }) + .await; } diff --git a/packages/disk/tests/resume_torrent.rs b/packages/disk/tests/resume_torrent.rs index 5d6444a56..737b875ed 100644 --- a/packages/disk/tests/resume_torrent.rs +++ b/packages/disk/tests/resume_torrent.rs @@ -1,14 +1,21 @@ -use common::{random_buffer, runtime_loop_with_timeout, send_block, InMemoryFileSystem, MultiFileDirectAccessor}; +use common::{ + random_buffer, runtime_loop_with_timeout, send_block, tracing_stdout_init, InMemoryFileSystem, MultiFileDirectAccessor, + DEFAULT_TIMEOUT, INIT, +}; use disk::{DiskManagerBuilder, IDiskMessage, ODiskMessage}; -use futures::{future, FutureExt, SinkExt as _, StreamExt as _}; +use futures::{future, FutureExt, SinkExt as _}; use metainfo::{Metainfo, MetainfoBuilder, PieceLength}; -use tokio::runtime::Runtime; +use tracing::level_filters::LevelFilter; mod common; #[allow(clippy::too_many_lines)] #[tokio::test] async fn positive_complete_torrent() { + INIT.call_once(|| { + tracing_stdout_init(LevelFilter::INFO); + }); + // Create some "files" as random bytes let data_a = (random_buffer(1023), "/path/to/file/a".into()); let data_b = (random_buffer(2000), "/path/to/file/b".into()); @@ -26,18 +33,18 @@ async fn positive_complete_torrent() { let filesystem = InMemoryFileSystem::new(); let disk_manager = DiskManagerBuilder::new().build(filesystem.clone()); - let (mut send, recv) = disk_manager.split(); + let (mut send, recv) = disk_manager.into_parts(); send.send(IDiskMessage::AddTorrent(metainfo_file.clone())).await.unwrap(); // Verify that zero pieces are marked as good - let mut runtime = Runtime::new().unwrap(); // Run a runtime loop until we get the TorrentAdded message - let (good_pieces, recv) = runtime_loop_with_timeout(&mut runtime, 500, (0, recv), |good_pieces, recv, msg| match msg { + let (good_pieces, recv) = runtime_loop_with_timeout(DEFAULT_TIMEOUT, (0, recv), |good_pieces, recv, msg| match msg { Ok(ODiskMessage::TorrentAdded(_)) => future::Either::Left(future::ready((good_pieces, recv)).boxed()), Ok(ODiskMessage::FoundGoodPiece(_, _)) => future::Either::Right(future::ready((good_pieces + 1, recv)).boxed()), unexpected => panic!("Unexpected Message: {unexpected:?}"), - }); + }) + .await; // Make sure we have no good pieces assert_eq!(0, good_pieces); @@ -81,8 +88,7 @@ async fn positive_complete_torrent() { // Verify that piece 0 is good let (recv, piece_zero_good) = runtime_loop_with_timeout( - &mut runtime, - 500, + DEFAULT_TIMEOUT, ((false, 0), recv), |(piece_zero_good, messages_recvd), recv, msg| { let messages_recvd = messages_recvd + 1; @@ -107,7 +113,8 @@ async fn positive_complete_torrent() { future::Either::Right(future::ready(((piece_zero_good, messages_recvd), recv)).boxed()) } }, - ); + ) + .await; // Assert whether or not pieces were good assert!(piece_zero_good); @@ -116,20 +123,22 @@ async fn positive_complete_torrent() { send.send(IDiskMessage::RemoveTorrent(info_hash)).await.unwrap(); // Verify that our torrent was removed - let recv = runtime_loop_with_timeout(&mut runtime, 500, ((), recv), |(), recv, msg| match msg { + let recv = runtime_loop_with_timeout(DEFAULT_TIMEOUT, ((), recv), |(), recv, msg| match msg { Ok(ODiskMessage::TorrentRemoved(_)) => future::Either::Left(future::ready(recv).boxed()), unexpected => panic!("Unexpected Message: {unexpected:?}"), - }); + }) + .await; // Re-add our torrent and verify that we see our good first block send.send(IDiskMessage::AddTorrent(metainfo_file.clone())).await.unwrap(); let (recv, piece_zero_good) = - runtime_loop_with_timeout(&mut runtime, 500, (false, recv), |piece_zero_good, recv, msg| match msg { + runtime_loop_with_timeout(DEFAULT_TIMEOUT, (false, recv), |piece_zero_good, recv, msg| match msg { Ok(ODiskMessage::TorrentAdded(_)) => future::Either::Left(future::ready((recv, piece_zero_good)).boxed()), Ok(ODiskMessage::FoundGoodPiece(_, 0)) => future::Either::Right(future::ready((true, recv)).boxed()), unexpected => panic!("Unexpected Message: {unexpected:?}"), - }); + }) + .await; assert!(piece_zero_good); @@ -189,8 +198,7 @@ async fn positive_complete_torrent() { // Verify last two blocks are good let (piece_one_good, piece_two_good) = runtime_loop_with_timeout( - &mut runtime, - 500, + DEFAULT_TIMEOUT, ((false, false, 0), recv), |(piece_one_good, piece_two_good, messages_recvd), recv, msg| { let messages_recvd = messages_recvd + 1; @@ -216,7 +224,8 @@ async fn positive_complete_torrent() { future::Either::Right(future::ready(((piece_one_good, piece_two_good, messages_recvd), recv)).boxed()) } }, - ); + ) + .await; assert!(piece_one_good); assert!(piece_two_good); diff --git a/packages/magnet/src/lib.rs b/packages/magnet/src/lib.rs index 7cc3fdf23..7892ff692 100644 --- a/packages/magnet/src/lib.rs +++ b/packages/magnet/src/lib.rs @@ -27,7 +27,7 @@ impl Topic { } } else if s.starts_with("urn:btih:") && s.len() == 9 + 32 { // BitTorrent Info Hash, base-32 - base32::decode(base32::Alphabet::RFC4648 { padding: true }, &s[9..]).and_then(|hash| { + base32::decode(base32::Alphabet::Rfc4648 { padding: true }, &s[9..]).and_then(|hash| { match ShaHash::from_hash(&hash[..]) { Ok(sha_hash) => Some(Topic::BitTorrentInfoHash(sha_hash)), Err(_) => None, diff --git a/packages/peer/Cargo.toml b/packages/peer/Cargo.toml index 778c74bc0..eb5e16013 100644 --- a/packages/peer/Cargo.toml +++ b/packages/peer/Cargo.toml @@ -25,8 +25,12 @@ pin-project = "1" crossbeam = "0" bytes = "1" futures = "0" +tracing = "0" tokio = { version = "1", features = ["full"] } tokio-util = {version = "0", features = ["codec"]} nom = "7" thiserror = "1" byteorder = "1" + +[dev-dependencies] +tracing-subscriber = "0" diff --git a/packages/peer/src/manager/mod.rs b/packages/peer/src/manager/mod.rs index 89306fcdb..8ec06e869 100644 --- a/packages/peer/src/manager/mod.rs +++ b/packages/peer/src/manager/mod.rs @@ -20,7 +20,7 @@ mod task; /// /// Any `PeerProtocol` (or plain `Codec`) that wants to be managed by `PeerManager` /// must ensure that its message type implements this trait to provide the necessary hooks. -pub trait ManagedMessage { +pub trait ManagedMessage: std::fmt::Debug { /// Retrieves a keep-alive message variant. fn keep_alive() -> Self; @@ -68,6 +68,7 @@ pub enum PeerManagerOutputError { } /// Messages that can be received from the `PeerManager`. +#[derive(Debug)] pub enum PeerManagerOutputMessage { /// Indicates a peer has been added to the peer manager. PeerAdded(PeerInfo), diff --git a/packages/peer/src/manager/peer_manager.rs b/packages/peer/src/manager/peer_manager.rs index 94e14bde2..f26b1641b 100644 --- a/packages/peer/src/manager/peer_manager.rs +++ b/packages/peer/src/manager/peer_manager.rs @@ -1,28 +1,19 @@ -use std::cmp; use std::collections::HashMap; use std::marker::PhantomData; use std::sync::{Arc, Mutex}; -use std::time::Duration; use crossbeam::queue::SegQueue; -use futures::channel::mpsc::{self, SendError}; -use futures::sink::{Sink, SinkExt}; -use futures::stream::{Stream, StreamExt}; -use futures::task::{Context, Poll}; +use futures::channel::mpsc::{self}; +use futures::sink::Sink; +use futures::stream::Stream; use futures::TryStream; -use pin_project::pin_project; -use tokio::time::{self}; use super::peer_manager_sink::PeerManagerSink; use super::peer_manager_stream::PeerManagerStream; -use super::{ManagedMessage, PeerManagerInputMessage, PeerManagerOutputError, PeerManagerOutputMessage}; -use crate::manager::builder::PeerManagerBuilder; -use crate::manager::error::PeerManagerError; - -const DEFAULT_TIMER_SLOTS: usize = 2048; +use super::ManagedMessage; +use crate::PeerManagerBuilder; /// Manages a set of peers with beating hearts. -#[pin_project] pub struct PeerManager where Peer: Sink> @@ -51,15 +42,11 @@ where /// Create a new `PeerManager` from the given `PeerManagerBuilder`. #[must_use] pub fn from_builder(builder: PeerManagerBuilder) -> PeerManager { - let max_duration = cmp::max(builder.heartbeat_interval(), builder.heartbeat_timeout()); - let tick_duration = Duration::from_millis(max_duration.as_secs() * 1000 / (DEFAULT_TIMER_SLOTS as u64) + 1); - let timer = time::interval(tick_duration); - let (res_send, res_recv) = mpsc::channel(builder.stream_buffer_capacity()); let peers = Arc::new(Mutex::new(HashMap::new())); let task_queue = Arc::new(SegQueue::new()); - let sink = PeerManagerSink::new(timer, builder, res_send, peers.clone(), task_queue.clone()); + let sink = PeerManagerSink::new(builder, res_send, peers.clone(), task_queue.clone()); let stream = PeerManagerStream::new(res_recv, peers); PeerManager { @@ -76,52 +63,3 @@ where (self.sink, self.stream) } } - -impl Sink>> for PeerManager -where - Peer: Sink> - + Stream> - + TryStream - + Send - + Unpin - + 'static, - Message: ManagedMessage + Send + Unpin + 'static, -{ - type Error = PeerManagerError; - - fn poll_ready(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.sink.poll_ready_unpin(cx) - } - - fn start_send( - mut self: std::pin::Pin<&mut Self>, - item: std::io::Result>, - ) -> Result<(), Self::Error> { - self.sink.start_send_unpin(item) - } - - fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.sink.poll_flush_unpin(cx) - } - - fn poll_close(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.sink.poll_close_unpin(cx) - } -} - -impl Stream for PeerManager -where - Peer: Sink> - + Stream> - + TryStream - + Send - + Unpin - + 'static, - Message: ManagedMessage + Send + 'static, -{ - type Item = Result, PeerManagerOutputError>; - - fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.stream.poll_next_unpin(cx) - } -} diff --git a/packages/peer/src/manager/peer_manager_sink.rs b/packages/peer/src/manager/peer_manager_sink.rs index 0fe9f3cd4..8de2624da 100644 --- a/packages/peer/src/manager/peer_manager_sink.rs +++ b/packages/peer/src/manager/peer_manager_sink.rs @@ -8,7 +8,6 @@ use futures::channel::mpsc::{SendError, Sender}; use futures::sink::Sink; use futures::task::{Context, Poll}; use futures::{SinkExt as _, Stream, TryStream}; -use tokio::time::{interval, Interval}; use super::task::run_peer; use super::{PeerManagerInputMessage, PeerManagerOutputError, PeerManagerOutputMessage}; @@ -28,7 +27,6 @@ where + 'static, Message: ManagedMessage + Send + 'static, { - timer: Interval, builder: PeerManagerBuilder, sender: Sender, PeerManagerOutputError>>, #[allow(clippy::type_complexity)] @@ -48,7 +46,6 @@ where { fn clone(&self) -> PeerManagerSink { PeerManagerSink { - timer: interval(self.timer.period()), builder: self.builder, sender: self.sender.clone(), peers: self.peers.clone(), @@ -69,14 +66,12 @@ where { #[allow(clippy::type_complexity)] pub fn new( - timer: Interval, builder: PeerManagerBuilder, sender: Sender, PeerManagerOutputError>>, peers: Arc>>>>, task_queue: Arc>>, ) -> PeerManagerSink { PeerManagerSink { - timer, builder, sender, peers, diff --git a/packages/peer/src/message/mod.rs b/packages/peer/src/message/mod.rs index 74ecba22a..763e700a3 100644 --- a/packages/peer/src/message/mod.rs +++ b/packages/peer/src/message/mod.rs @@ -78,7 +78,9 @@ impl From for std::io::Error { #[derive(Debug, Clone)] pub enum PeerWireProtocolMessage

where - P: PeerProtocol + Clone, + P: PeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, { /// Message to keep the connection alive. KeepAlive, @@ -115,7 +117,9 @@ where impl

ManagedMessage for PeerWireProtocolMessage

where - P: PeerProtocol + Clone, + P: PeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, { fn keep_alive() -> PeerWireProtocolMessage

{ PeerWireProtocolMessage::KeepAlive @@ -128,7 +132,9 @@ where impl

PeerWireProtocolMessage

where - P: PeerProtocol + Clone, + P: PeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, { /// Bytes Needed to encode Byte Slice /// @@ -247,7 +253,12 @@ fn u32_to_usize(value: u32) -> usize { // basis. If possible, we should return the number of bytes needed for the rest of the WHOLE message. // This allows clients to only re invoke the parser when it knows it has enough of the data. -fn parse_keep_alive(input: &[u8]) -> IResult<&[u8], std::io::Result>> { +fn parse_keep_alive

(input: &[u8]) -> IResult<&[u8], std::io::Result>> +where + P: PeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, +{ map( tuple(( be_u32::<_, nom::error::Error<&[u8]>>, @@ -257,7 +268,12 @@ fn parse_keep_alive(input: &[u8]) -> IResult<&[u8], std )(input) } -fn parse_choke(input: &[u8]) -> IResult<&[u8], std::io::Result>> { +fn parse_choke

(input: &[u8]) -> IResult<&[u8], std::io::Result>> +where + P: PeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, +{ map( tuple(( value(CHOKE_MESSAGE_LEN, be_u32::<_, nom::error::Error<&[u8]>>), @@ -267,7 +283,12 @@ fn parse_choke(input: &[u8]) -> IResult<&[u8], std::io: )(input) } -fn parse_unchoke(input: &[u8]) -> IResult<&[u8], std::io::Result>> { +fn parse_unchoke

(input: &[u8]) -> IResult<&[u8], std::io::Result>> +where + P: PeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, +{ map( tuple(( value(UNCHOKE_MESSAGE_LEN, be_u32::<_, nom::error::Error<&[u8]>>), @@ -277,7 +298,12 @@ fn parse_unchoke(input: &[u8]) -> IResult<&[u8], std::i )(input) } -fn parse_interested(input: &[u8]) -> IResult<&[u8], std::io::Result>> { +fn parse_interested

(input: &[u8]) -> IResult<&[u8], std::io::Result>> +where + P: PeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, +{ map( tuple(( value(INTERESTED_MESSAGE_LEN, be_u32::<_, nom::error::Error<&[u8]>>), @@ -287,7 +313,12 @@ fn parse_interested(input: &[u8]) -> IResult<&[u8], std )(input) } -fn parse_uninterested(input: &[u8]) -> IResult<&[u8], std::io::Result>> { +fn parse_uninterested

(input: &[u8]) -> IResult<&[u8], std::io::Result>> +where + P: PeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, +{ map( tuple(( value(UNINTERESTED_MESSAGE_LEN, be_u32::<_, nom::error::Error<&[u8]>>), @@ -297,7 +328,12 @@ fn parse_uninterested(input: &[u8]) -> IResult<&[u8], s )(input) } -fn parse_have(input: &[u8]) -> IResult<&[u8], std::io::Result>> { +fn parse_have

(input: &[u8]) -> IResult<&[u8], std::io::Result>> +where + P: PeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, +{ map( preceded( tuple(( @@ -310,7 +346,12 @@ fn parse_have(input: &[u8]) -> IResult<&[u8], std::io:: )(input) } -fn parse_bitfield(input: &[u8]) -> IResult<&[u8], std::io::Result>> { +fn parse_bitfield

(input: &[u8]) -> IResult<&[u8], std::io::Result>> +where + P: PeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, +{ map( preceded( tuple(( @@ -323,7 +364,12 @@ fn parse_bitfield(input: &[u8]) -> IResult<&[u8], std:: )(input) } -fn parse_request(input: &[u8]) -> IResult<&[u8], std::io::Result>> { +fn parse_request

(input: &[u8]) -> IResult<&[u8], std::io::Result>> +where + P: PeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, +{ map( preceded( tuple(( @@ -336,7 +382,12 @@ fn parse_request(input: &[u8]) -> IResult<&[u8], std::i )(input) } -fn parse_piece(input: &[u8]) -> IResult<&[u8], std::io::Result>> { +fn parse_piece

(input: &[u8]) -> IResult<&[u8], std::io::Result>> +where + P: PeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, +{ map( preceded( tuple(( @@ -352,7 +403,12 @@ fn parse_piece(input: &[u8]) -> IResult<&[u8], std::io: )(input) } -fn parse_cancel(input: &[u8]) -> IResult<&[u8], std::io::Result>> { +fn parse_cancel

(input: &[u8]) -> IResult<&[u8], std::io::Result>> +where + P: PeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, +{ map( preceded( tuple(( @@ -365,17 +421,27 @@ fn parse_cancel(input: &[u8]) -> IResult<&[u8], std::io )(input) } -fn parse_bits_extension(input: &[u8]) -> IResult<&[u8], std::io::Result>> { +fn parse_bits_extension

(input: &[u8]) -> IResult<&[u8], std::io::Result>> +where + P: PeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, +{ map( |input| BitsExtensionMessage::parse_bytes(input), |res_bits_ext| res_bits_ext.map(|bits_ext| PeerWireProtocolMessage::BitsExtension(bits_ext)), )(input) } -fn parse_prot_extension<'a, P: PeerProtocol + Clone>( +fn parse_prot_extension<'a, P>( input: &'a [u8], ext_protocol: &mut P, -) -> IResult<&'a [u8], std::io::Result>> { +) -> IResult<&'a [u8], std::io::Result>> +where + P: PeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, +{ map( |input| match ext_protocol.parse_bytes(input) { Ok(msg) => Ok((input, Ok(PeerWireProtocolMessage::ProtExtension(msg)))), @@ -390,7 +456,9 @@ fn parse_prot_extension<'a, P: PeerProtocol + Clone>( fn parse_message<'a, P>(bytes: &'a [u8], ext_protocol: &mut P) -> IResult<&'a [u8], std::io::Result>> where - P: PeerProtocol + Clone, + P: PeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, { alt(( parse_keep_alive, diff --git a/packages/peer/src/message/null.rs b/packages/peer/src/message/null.rs index ec8976886..2af79c6f4 100644 --- a/packages/peer/src/message/null.rs +++ b/packages/peer/src/message/null.rs @@ -1,3 +1,4 @@ /// Enumeration of messages for `NullProtocol`. #[allow(clippy::module_name_repetitions)] +#[derive(Debug)] pub enum NullProtocolMessage {} diff --git a/packages/peer/src/message/prot_ext/mod.rs b/packages/peer/src/message/prot_ext/mod.rs index cdbe81ca0..f53249182 100644 --- a/packages/peer/src/message/prot_ext/mod.rs +++ b/packages/peer/src/message/prot_ext/mod.rs @@ -66,6 +66,7 @@ impl From for PeerExtensionProtocolMessageError { } /// Enumeration of `BEP 10` extension protocol compatible messages. +#[derive(Debug)] pub enum PeerExtensionProtocolMessage

where P: PeerProtocol, @@ -77,7 +78,9 @@ where impl

PeerExtensionProtocolMessage

where - P: PeerProtocol + Clone, + P: PeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, { /// Returns the number of bytes needed encode a given slice. /// diff --git a/packages/peer/src/protocol/extension.rs b/packages/peer/src/protocol/extension.rs index 4ac8a7d01..4b010b121 100644 --- a/packages/peer/src/protocol/extension.rs +++ b/packages/peer/src/protocol/extension.rs @@ -33,7 +33,9 @@ where impl

PeerProtocol for PeerExtensionProtocol

where - P: PeerProtocol + Clone, + P: PeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, { type ProtocolMessage = PeerExtensionProtocolMessage

; type ProtocolMessageError = PeerExtensionProtocolMessageError; diff --git a/packages/peer/src/protocol/wire.rs b/packages/peer/src/protocol/wire.rs index 37a34c542..a353dffae 100644 --- a/packages/peer/src/protocol/wire.rs +++ b/packages/peer/src/protocol/wire.rs @@ -28,7 +28,9 @@ where impl

PeerProtocol for PeerWireProtocol

where - P: PeerProtocol + NestedPeerProtocol + Clone, + P: PeerProtocol + NestedPeerProtocol + Clone + std::fmt::Debug, +

::ProtocolMessage: std::fmt::Debug, +

::ProtocolMessageError: std::fmt::Debug, { type ProtocolMessage = PeerWireProtocolMessage

; diff --git a/packages/peer/tests/common/connected_channel.rs b/packages/peer/tests/common/connected_channel.rs new file mode 100644 index 000000000..b10cd0298 --- /dev/null +++ b/packages/peer/tests/common/connected_channel.rs @@ -0,0 +1,79 @@ +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; + +use futures::channel::mpsc::{Receiver, Sender}; +use futures::{Sink, SinkExt as _, Stream, StreamExt as _}; + +#[derive(Debug)] +pub struct ConnectedChannel { + send: Sender, + recv: Arc>>, +} + +impl Clone for ConnectedChannel { + fn clone(&self) -> Self { + Self { + send: self.send.clone(), + recv: self.recv.clone(), + } + } +} + +impl Sink for ConnectedChannel { + type Error = std::io::Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.send + .poll_ready_unpin(cx) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::ConnectionAborted, e)) + } + + fn start_send(mut self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + self.send + .start_send_unpin(item) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::ConnectionAborted, e)) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.send + .poll_flush_unpin(cx) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::ConnectionAborted, e)) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.send + .poll_close_unpin(cx) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::ConnectionAborted, e)) + } +} + +impl Stream for ConnectedChannel { + type Item = O; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Ok(recv) = self.recv.try_lock() else { + cx.waker().wake_by_ref(); + return Poll::Pending; + }; + + Pin::new(recv).poll_next_unpin(cx) + } +} + +#[must_use] +pub fn connected_channel(capacity: usize) -> (ConnectedChannel, ConnectedChannel) { + let (send_one, recv_one) = futures::channel::mpsc::channel(capacity); + let (send_two, recv_two) = futures::channel::mpsc::channel(capacity); + + ( + ConnectedChannel { + send: send_one, + recv: Arc::new(Mutex::new(recv_two)), + }, + ConnectedChannel { + send: send_two, + recv: Arc::new(Mutex::new(recv_one)), + }, + ) +} diff --git a/packages/peer/tests/common/mod.rs b/packages/peer/tests/common/mod.rs index 8805b8f23..a0209e89d 100644 --- a/packages/peer/tests/common/mod.rs +++ b/packages/peer/tests/common/mod.rs @@ -1,82 +1,116 @@ -use std::io; -use std::pin::Pin; -use std::sync::{Arc, Mutex}; +use std::sync::Once; +use std::time::Duration; -use futures::channel::mpsc::{Receiver, Sender}; +use futures::channel::mpsc::SendError; use futures::sink::Sink; use futures::stream::Stream; -use futures::task::{Context, Poll}; -use futures::StreamExt as _; +use futures::{SinkExt as _, StreamExt as _, TryStream}; +use peer::error::PeerManagerError; +use peer::{ManagedMessage, PeerInfo, PeerManagerInputMessage, PeerManagerOutputError, PeerManagerOutputMessage}; +use thiserror::Error; +use tokio::time::error::Elapsed; +use tracing::level_filters::LevelFilter; -#[derive(Debug)] -pub struct ConnectedChannel { - send: Sender, - recv: Arc>>, -} +pub mod connected_channel; -impl Clone for ConnectedChannel { - fn clone(&self) -> Self { - Self { - send: self.send.clone(), - recv: self.recv.clone(), - } - } +#[derive(Debug, Error)] +pub enum Error +where + Message: ManagedMessage + Send + 'static, +{ + #[error("Timed Out")] + TimedOut(#[from] Elapsed), + + #[error("Receiver Closed")] + ReceiverClosed(), + + #[error("Peer Manager Input Error {0}")] + PeerManagerErr(#[from] PeerManagerError), + + #[error("Peer Manager Output Error {0}")] + PeerManagerOutputErr(#[from] PeerManagerOutputError), + + #[error("Failed to correct response, but got: {0:?}")] + WrongResponse(#[from] PeerManagerOutputMessage), + + #[error("Failed to receive Peer Added with matching infohash: got: {0:?}, expected: {1:?}")] + InfoHashMissMatch(PeerInfo, PeerInfo), } -impl Sink for ConnectedChannel { - type Error = std::io::Error; +#[allow(dead_code)] +pub const DEFAULT_TIMEOUT: Duration = Duration::from_millis(500); - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.send) - .poll_ready(cx) - .map_err(|e| std::io::Error::new(io::ErrorKind::ConnectionAborted, e)) - } +#[allow(dead_code)] +pub static INIT: Once = Once::new(); - fn start_send(mut self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - Pin::new(&mut self.send) - .start_send(item) - .map_err(|e| std::io::Error::new(io::ErrorKind::ConnectionAborted, e)) - } +#[allow(dead_code)] +pub fn tracing_stdout_init(filter: LevelFilter) { + let builder = tracing_subscriber::fmt().with_max_level(filter).with_ansi(true); - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.send) - .poll_flush(cx) - .map_err(|e| std::io::Error::new(io::ErrorKind::ConnectionAborted, e)) - } + builder.pretty().with_file(true).init(); - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.send) - .poll_close(cx) - .map_err(|e| std::io::Error::new(io::ErrorKind::ConnectionAborted, e)) - } + tracing::info!("Logging initialized"); } -impl Stream for ConnectedChannel { - type Item = O; +pub async fn add_peer( + send: &mut Si, + recv: &mut St, + info: PeerInfo, + peer: Peer, +) -> Result<(), Error> +where + Si: Sink>, Error = PeerManagerError> + Unpin, + St: Stream, PeerManagerOutputError>> + Unpin, + Peer: Sink> + + Stream> + + TryStream + + Send + + Unpin + + 'static, + Message: ManagedMessage + Send + 'static, +{ + let () = tokio::time::timeout(DEFAULT_TIMEOUT, send.send(Ok(PeerManagerInputMessage::AddPeer(info, peer)))).await??; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let Ok(recv) = self.recv.try_lock() else { - cx.waker().wake_by_ref(); - return Poll::Pending; - }; + let response = tokio::time::timeout(DEFAULT_TIMEOUT, recv.next()) + .await? + .ok_or(Error::ReceiverClosed())??; - Pin::new(recv).poll_next_unpin(cx) + if let PeerManagerOutputMessage::PeerAdded(info_recv) = response { + if info_recv == info { + Ok(()) + } else { + Err(Error::InfoHashMissMatch(info_recv, info)) + } + } else { + Err(Error::from(response)) } } -#[must_use] -pub fn connected_channel(capacity: usize) -> (ConnectedChannel, ConnectedChannel) { - let (send_one, recv_one) = futures::channel::mpsc::channel(capacity); - let (send_two, recv_two) = futures::channel::mpsc::channel(capacity); - - ( - ConnectedChannel { - send: send_one, - recv: Arc::new(Mutex::new(recv_two)), - }, - ConnectedChannel { - send: send_two, - recv: Arc::new(Mutex::new(recv_one)), - }, - ) +pub async fn remove_peer(send: &mut Si, recv: &mut St, info: PeerInfo) -> Result<(), Error> +where + Si: Sink>, Error = PeerManagerError> + Unpin, + St: Stream, PeerManagerOutputError>> + Unpin, + Peer: Sink> + + Stream> + + TryStream + + Send + + Unpin + + 'static, + Message: ManagedMessage + Send + 'static, +{ + let () = tokio::time::timeout(DEFAULT_TIMEOUT, send.send(Ok(PeerManagerInputMessage::RemovePeer(info)))).await??; + + let response = tokio::time::timeout(DEFAULT_TIMEOUT, recv.next()) + .await? + .ok_or(Error::ReceiverClosed())??; + + if let PeerManagerOutputMessage::PeerRemoved(info_recv) = response { + if info_recv == info { + Ok(()) + } else { + Err(Error::InfoHashMissMatch(info_recv, info)) + } + } else { + Err(Error::from(response)) + } } diff --git a/packages/peer/tests/peer_manager_send_backpressure.rs b/packages/peer/tests/peer_manager_send_backpressure.rs index 90f33d672..40e466750 100644 --- a/packages/peer/tests/peer_manager_send_backpressure.rs +++ b/packages/peer/tests/peer_manager_send_backpressure.rs @@ -1,10 +1,12 @@ -use common::{connected_channel, ConnectedChannel}; -use futures::{SinkExt as _, StreamExt}; +use common::connected_channel::{connected_channel, ConnectedChannel}; +use common::{add_peer, remove_peer, tracing_stdout_init, INIT}; +use futures::SinkExt as _; use handshake::Extensions; use peer::error::PeerManagerError; use peer::messages::PeerWireProtocolMessage; use peer::protocols::NullProtocol; -use peer::{PeerInfo, PeerManager, PeerManagerBuilder, PeerManagerInputMessage, PeerManagerOutputMessage}; +use peer::{PeerInfo, PeerManagerBuilder, PeerManagerInputMessage}; +use tracing::level_filters::LevelFilter; use util::bt; mod common; @@ -16,9 +18,14 @@ type Peer = ConnectedChannel< #[tokio::test] async fn positive_peer_manager_send_backpressure() { - let mut manager = PeerManagerBuilder::new() + INIT.call_once(|| { + tracing_stdout_init(LevelFilter::TRACE); + }); + + let (mut send, mut recv) = PeerManagerBuilder::new() .with_peer_capacity(1) - .build::>(); + .build::>() + .into_parts(); // Create two peers let (peer_one, peer_two): (Peer, Peer) = connected_channel(5); @@ -26,34 +33,10 @@ async fn positive_peer_manager_send_backpressure() { let peer_two_info = create_peer_info("127.0.0.1:1", [1u8; bt::PEER_ID_LEN], [1u8; bt::INFO_HASH_LEN]); // Add peer one to the manager - add_peer(&mut manager, peer_one_info, peer_one).await; + add_peer(&mut send, &mut recv, peer_one_info, peer_one).await.unwrap(); // Try to add peer two, but make sure it was denied (start send returned not ready) - let () = try_add_peer(&mut manager, peer_two_info, peer_two.clone()); - - // Remove peer one from the manager - remove_peer(&mut manager, peer_one_info).await; - - // Try to add peer two, but make sure it goes through - add_peer(&mut manager, peer_two_info, peer_two).await; -} - -async fn add_peer(manager: &mut PeerManager>, peer_info: PeerInfo, peer: Peer) { - manager - .send(Ok(PeerManagerInputMessage::AddPeer(peer_info, peer))) - .await - .expect("Failed to send AddPeer message"); - - let response = manager.next().await.expect("Failed to receive response"); - - match response { - Ok(PeerManagerOutputMessage::PeerAdded(info)) => assert_eq!(peer_info, info), - _ => panic!("Unexpected response when adding peer"), - }; -} - -fn try_add_peer(manager: &mut PeerManager>, peer_info: PeerInfo, peer: Peer) { - let Err(full) = manager.start_send_unpin(Ok(PeerManagerInputMessage::AddPeer(peer_info, peer))) else { + let Err(full) = send.start_send_unpin(Ok(PeerManagerInputMessage::AddPeer(peer_two_info, peer_two.clone()))) else { panic!("it should not add to full peer store") }; @@ -62,20 +45,12 @@ fn try_add_peer(manager: &mut PeerManager>, peer_info: PeerInfo) { - manager - .send(Ok(PeerManagerInputMessage::RemovePeer(peer_info))) - .await - .expect("Failed to send RemovePeer message"); - - let response = manager.next().await.expect("Failed to receive response"); + // Remove peer one from the manager + remove_peer(&mut send, &mut recv, peer_one_info).await.unwrap(); - match response { - Ok(PeerManagerOutputMessage::PeerRemoved(info)) => assert_eq!(peer_info, info), - _ => panic!("Unexpected response when removing peer"), - }; + // Try to add peer two, but make sure it goes through + add_peer(&mut send, &mut recv, peer_two_info, peer_two).await.unwrap(); } fn create_peer_info(addr: &str, peer_id: [u8; bt::PEER_ID_LEN], info_hash: [u8; bt::INFO_HASH_LEN]) -> PeerInfo { diff --git a/packages/select/Cargo.toml b/packages/select/Cargo.toml index af8744671..1ea72239c 100644 --- a/packages/select/Cargo.toml +++ b/packages/select/Cargo.toml @@ -26,8 +26,8 @@ bit-set = "0" bytes = "1" futures = "0" rand = "0" -log = "0" thiserror = "1" +tracing = "0" [dev-dependencies] tokio = { version = "1", features = ["full"] } diff --git a/packages/select/src/discovery/ut_metadata.rs b/packages/select/src/discovery/ut_metadata.rs index 2642536cd..7150e5b46 100644 --- a/packages/select/src/discovery/ut_metadata.rs +++ b/packages/select/src/discovery/ut_metadata.rs @@ -9,7 +9,6 @@ use bytes::BytesMut; use futures::sink::Sink; use futures::stream::Stream; use handshake::InfoHash; -use log::info; use metainfo::{Info, Metainfo}; use peer::messages::builders::ExtendedMessageBuilder; use peer::messages::{ @@ -109,7 +108,7 @@ impl UtMetadataModule { .is_some(); let opt_metadata_size = ext_info.their_message().and_then(ExtendedMessage::metadata_size); - info!( + tracing::info!( "Our Support For UtMetadata Is {:?} And {:?} Support For UtMetadata Is {:?} With Metadata Size {:?}", our_support, info.addr(), @@ -219,7 +218,7 @@ impl UtMetadataModule { let selected_message = pending.messages.pop().unwrap(); self.active_requests .push(generate_active_request(selected_message, *selected_peer)); - info!( + tracing::info!( "Requesting Piece {:?} For Hash {:?}", selected_message.piece(), selected_peer.hash() diff --git a/packages/util/src/sha/mod.rs b/packages/util/src/sha/mod.rs index 0d2e51ed6..296da78a3 100644 --- a/packages/util/src/sha/mod.rs +++ b/packages/util/src/sha/mod.rs @@ -52,6 +52,17 @@ impl ShaHash { } } +impl std::fmt::Display for ShaHash { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "0x")?; + + for byte in &self.hash { + write!(f, "{byte:02x}")?; + } + Ok(()) + } +} + impl AsRef<[u8]> for ShaHash { fn as_ref(&self) -> &[u8] { &self.hash diff --git a/packages/utracker/src/client/mod.rs b/packages/utracker/src/client/mod.rs index 0bc2ffae7..4bd8bde47 100644 --- a/packages/utracker/src/client/mod.rs +++ b/packages/utracker/src/client/mod.rs @@ -251,7 +251,7 @@ impl RequestLimiter { /// /// It is invalid to not make the request after this returns true. pub fn can_initiate(&self) -> bool { - let current_active_requests = self.active.fetch_add(1, Ordering::AcqRel); + let current_active_requests = self.active.fetch_add(1, Ordering::AcqRel) + 1; // If the number of requests stored previously was less than the capacity, // then the add is considered good and a request can (SHOULD) be made.