-
Notifications
You must be signed in to change notification settings - Fork 0
feat: websockets-powered network layer #443
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Deploying kolme with
|
Latest commit: |
9a5364e
|
Status: | ✅ Deploy successful! |
Preview URL: | https://0627b906.kolme.pages.dev |
Branch Preview URL: | https://websockets-network-layer.kolme.pages.dev |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces a WebSockets-powered network layer alongside the existing libp2p Kademlia-based networking. The implementation adds WebSocket support for gossip communication and block synchronization, allowing nodes to connect using either transport method.
- Adds WebSocket support for all gossip operations (publishing, block requests/responses)
- Introduces new builder methods to configure WebSocket binds and connections
- Updates test infrastructure to use WebSocket networking instead of Kademlia
Reviewed Changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.
Show a summary per file
File | Description |
---|---|
packages/kolme/src/gossip/websockets.rs | Implements complete WebSocket networking layer with client/server support |
packages/kolme/src/gossip.rs | Integrates WebSocket manager with existing gossip system |
packages/kolme/src/gossip/sync_manager.rs | Updates sync manager to support both peer types (libp2p and WebSocket) |
packages/kolme/src/gossip/messages.rs | Adds Clone trait and Debug implementation to gossip messages |
packages/kolme/src/testtasks/kademlia_helper.rs | Adds WebSocket test helper functions |
packages/kolme/src/api_server.rs | Improves error handling and block loading logic |
packages/kolme/Cargo.toml | Removes optional dependencies for WebSocket support |
packages/kolme-test/src/tx_evicted_mempool.rs | Converts tests to use WebSocket networking |
packages/kolme-test/src/p2p_websockets_sanity.rs | Adds new WebSocket sanity test |
packages/kolme-test/src/lib.rs | Registers new WebSocket test module |
static NEXT_ID: AtomicU64 = AtomicU64::new(0); | ||
let tx_private = WebsocketsPrivateSender { | ||
tx: tx_private, | ||
id: NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The static NEXT_ID variable could potentially overflow after 2^64 connections. Consider using a wrapping counter or adding overflow handling to prevent potential issues in long-running applications.
static NEXT_ID: AtomicU64 = AtomicU64::new(0); | |
let tx_private = WebsocketsPrivateSender { | |
tx: tx_private, | |
id: NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed), | |
// Use fetch_update to wrap the counter explicitly after u64::MAX | |
static NEXT_ID: AtomicU64 = AtomicU64::new(0); | |
let id = NEXT_ID.fetch_update(std::sync::atomic::Ordering::Relaxed, std::sync::atomic::Ordering::Relaxed, |x| { | |
Some(if x == u64::MAX { 0 } else { x + 1 }) | |
}).unwrap_or(0); | |
let tx_private = WebsocketsPrivateSender { | |
tx: tx_private, | |
id, |
Copilot uses AI. Check for mistakes.
pub(super) async fn get_incoming(&mut self) -> WebsocketsMessage<App> { | ||
match self.rx_message.recv().await { | ||
Some(msg) => msg, | ||
None => std::future::pending().await, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using std::future::pending().await
creates an infinite wait when the channel is closed. This could lead to resource leaks or hanging tasks. Consider returning an error or using a different termination strategy.
pub(super) async fn get_incoming(&mut self) -> WebsocketsMessage<App> { | |
match self.rx_message.recv().await { | |
Some(msg) => msg, | |
None => std::future::pending().await, | |
} | |
pub(super) async fn get_incoming(&mut self) -> Option<WebsocketsMessage<App>> { | |
self.rx_message.recv().await |
Copilot uses AI. Check for mistakes.
Ok(()) => tracing::warn!( | ||
%local_display_name, | ||
"Unexpected exit from gossip::websockets::launch_client_inner for {server}" | ||
), | ||
Err(e) => tracing::warn!( | ||
%local_display_name, | ||
"Error from gossip::websockets::launch_client_inner for {server}: {e}" | ||
), | ||
} | ||
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The 5-second sleep delay is hardcoded. Consider making this configurable or using exponential backoff for reconnection attempts to improve resilience and reduce unnecessary load.
Ok(()) => tracing::warn!( | |
%local_display_name, | |
"Unexpected exit from gossip::websockets::launch_client_inner for {server}" | |
), | |
Err(e) => tracing::warn!( | |
%local_display_name, | |
"Error from gossip::websockets::launch_client_inner for {server}: {e}" | |
), | |
} | |
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; | |
Ok(()) => { | |
tracing::warn!( | |
%local_display_name, | |
"Unexpected exit from gossip::websockets::launch_client_inner for {server}" | |
); | |
backoff = MIN_BACKOFF; | |
} | |
Err(e) => { | |
tracing::warn!( | |
%local_display_name, | |
"Error from gossip::websockets::launch_client_inner for {server}: {e}" | |
); | |
backoff = cmp::min(backoff * 2, MAX_BACKOFF); | |
} | |
} | |
tokio::time::sleep(tokio::time::Duration::from_secs(backoff)).await; |
Copilot uses AI. Check for mistakes.
tracing::warn!(%server_state.local_display_name, "Error from gossip::websockets::launch_server_inner for {bind}: {e}") | ||
} | ||
} | ||
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicate hardcoded 5-second sleep delay. This should be consistent with the client reconnection delay and made configurable.
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; | |
tokio::time::sleep(tokio::time::Duration::from_secs(RECONNECTION_DELAY_SECS)).await; |
Copilot uses AI. Check for mistakes.
No description provided.