feat: websockets-powered network layer #443

Merged
merged 9 commits on Aug 3, 2025

Conversation

snoyberg (Member)

No description provided.

cloudflare-workers-and-pages bot commented Jul 31, 2025

Deploying kolme with Cloudflare Pages

Latest commit: 9a5364e
Status: ✅  Deploy successful!
Preview URL: https://0627b906.kolme.pages.dev
Branch Preview URL: https://websockets-network-layer.kolme.pages.dev

snoyberg requested review from psibi and Copilot, and removed the request for psibi (July 31, 2025 15:39)
snoyberg marked this pull request as ready for review (July 31, 2025 15:39)
Base automatically changed from more-debug-info to main August 3, 2025 05:17
Copilot AI (Contributor) left a comment

Pull Request Overview

This PR introduces a WebSockets-powered network layer alongside the existing libp2p Kademlia-based networking. The implementation adds WebSocket support for gossip communication and block synchronization, allowing nodes to connect using either transport method.

  • Adds WebSocket support for all gossip operations (publishing, block requests/responses)
  • Introduces new builder methods to configure WebSocket binds and connections
  • Updates test infrastructure to use WebSocket networking instead of Kademlia

Reviewed Changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| packages/kolme/src/gossip/websockets.rs | Implements complete WebSocket networking layer with client/server support |
| packages/kolme/src/gossip.rs | Integrates WebSocket manager with existing gossip system |
| packages/kolme/src/gossip/sync_manager.rs | Updates sync manager to support both peer types (libp2p and WebSocket) |
| packages/kolme/src/gossip/messages.rs | Adds Clone trait and Debug implementation to gossip messages |
| packages/kolme/src/testtasks/kademlia_helper.rs | Adds WebSocket test helper functions |
| packages/kolme/src/api_server.rs | Improves error handling and block loading logic |
| packages/kolme/Cargo.toml | Removes optional dependencies for WebSocket support |
| packages/kolme-test/src/tx_evicted_mempool.rs | Converts tests to use WebSocket networking |
| packages/kolme-test/src/p2p_websockets_sanity.rs | Adds new WebSocket sanity test |
| packages/kolme-test/src/lib.rs | Registers new WebSocket test module |

Comment on lines +336 to +339
    static NEXT_ID: AtomicU64 = AtomicU64::new(0);
    let tx_private = WebsocketsPrivateSender {
        tx: tx_private,
        id: NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed),

Copilot AI Aug 3, 2025

The static NEXT_ID variable could potentially overflow after 2^64 connections. Consider using a wrapping counter or adding overflow handling to prevent potential issues in long-running applications.

Suggested change

    - static NEXT_ID: AtomicU64 = AtomicU64::new(0);
    - let tx_private = WebsocketsPrivateSender {
    -     tx: tx_private,
    -     id: NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
    + // Use fetch_update to wrap the counter explicitly after u64::MAX
    + static NEXT_ID: AtomicU64 = AtomicU64::new(0);
    + let id = NEXT_ID.fetch_update(std::sync::atomic::Ordering::Relaxed, std::sync::atomic::Ordering::Relaxed, |x| {
    +     Some(if x == u64::MAX { 0 } else { x + 1 })
    + }).unwrap_or(0);
    + let tx_private = WebsocketsPrivateSender {
    +     tx: tx_private,
    +     id,
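
For context on this suggestion: the standard library documents `AtomicU64::fetch_add` as wrapping around on overflow, so the original call already goes from `u64::MAX` back to 0 without panicking, and at one new connection per nanosecond it would take roughly 584 years to get there. The sketch below checks that behavior in isolation. `next_id` and `ids_across_overflow` are hypothetical helpers written for this illustration, not code from the PR, and the counter is passed in as a parameter so it can be started at `u64::MAX`.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical helper mirroring `NEXT_ID.fetch_add(1, Relaxed)` from the PR.
/// Returns the previous value and advances the counter, wrapping on overflow.
fn next_id(counter: &AtomicU64) -> u64 {
    counter.fetch_add(1, Ordering::Relaxed)
}

/// Returns the two IDs handed out on either side of the u64::MAX boundary.
fn ids_across_overflow() -> (u64, u64) {
    // Start at the maximum so the very next increment has to wrap.
    let counter = AtomicU64::new(u64::MAX);
    let first = next_id(&counter);
    let second = next_id(&counter);
    (first, second)
}
```

Calling `ids_across_overflow()` yields `u64::MAX` followed by `0`, which is the same sequence the `fetch_update` version produces.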

Comment on lines +96 to +100
    pub(super) async fn get_incoming(&mut self) -> WebsocketsMessage<App> {
        match self.rx_message.recv().await {
            Some(msg) => msg,
            None => std::future::pending().await,
        }

Copilot AI Aug 3, 2025

Using std::future::pending().await creates an infinite wait when the channel is closed. This could lead to resource leaks or hanging tasks. Consider returning an error or using a different termination strategy.

Suggested change
pub(super) async fn get_incoming(&mut self) -> WebsocketsMessage<App> {
match self.rx_message.recv().await {
Some(msg) => msg,
None => std::future::pending().await,
}
pub(super) async fn get_incoming(&mut self) -> Option<WebsocketsMessage<App>> {
self.rx_message.recv().await
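
The difference between the two versions can be seen without an async runtime. `std::future::pending()` produces a future that returns `Poll::Pending` on every poll, so once the channel is closed the original `get_incoming` never completes; that is only a problem if nothing else can wake or cancel the caller, for example when it is not being raced against other branches. Returning `Option` instead makes the closed channel visible to the caller. The sketch below polls both shapes by hand with a no-op waker. `NoopWaker`, `poll_once`, `closed_then_pending`, and `closed_then_none` are illustrative names, and `u32` stands in for `WebsocketsMessage<App>`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough to poll a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once and reports what it returned.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut: Pin<Box<F>> = Box::pin(fut);
    fut.as_mut().poll(&mut cx)
}

/// Stand-in for the original behavior after the channel has closed:
/// the future parks forever.
async fn closed_then_pending() -> u32 {
    std::future::pending().await
}

/// Stand-in for the suggested behavior: closure is reported as `None`.
async fn closed_then_none() -> Option<u32> {
    None
}
```

Polling `closed_then_pending()` reports `Poll::Pending`, while `closed_then_none()` resolves immediately to `None`, leaving the shutdown decision to the caller.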

Comment on lines +119 to +128
        Ok(()) => tracing::warn!(
            %local_display_name,
            "Unexpected exit from gossip::websockets::launch_client_inner for {server}"
        ),
        Err(e) => tracing::warn!(
            %local_display_name,
            "Error from gossip::websockets::launch_client_inner for {server}: {e}"
        ),
    }
    tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;

Copilot AI Aug 3, 2025

The 5-second sleep delay is hardcoded. Consider making this configurable or using exponential backoff for reconnection attempts to improve resilience and reduce unnecessary load.

Suggested change
Ok(()) => tracing::warn!(
%local_display_name,
"Unexpected exit from gossip::websockets::launch_client_inner for {server}"
),
Err(e) => tracing::warn!(
%local_display_name,
"Error from gossip::websockets::launch_client_inner for {server}: {e}"
),
}
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
Ok(()) => {
tracing::warn!(
%local_display_name,
"Unexpected exit from gossip::websockets::launch_client_inner for {server}"
);
backoff = MIN_BACKOFF;
}
Err(e) => {
tracing::warn!(
%local_display_name,
"Error from gossip::websockets::launch_client_inner for {server}: {e}"
);
backoff = cmp::min(backoff * 2, MAX_BACKOFF);
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(backoff)).await;
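
The suggestion above refers to `backoff`, `MIN_BACKOFF`, `MAX_BACKOFF`, and `cmp` without defining them. A minimal sketch of the missing pieces follows, assuming a 1-second floor and a 60-second ceiling; both bounds are placeholders chosen for illustration, not values from the PR. It uses `Duration` rather than raw seconds, and `next_backoff` and `backoff_schedule` are hypothetical helper names. As in the suggestion, the delay is doubled before sleeping, so the first retry after an error waits twice the floor.

```rust
use std::cmp;
use std::time::Duration;

// Assumed bounds: the review suggestion names these constants but does not define them.
const MIN_BACKOFF: Duration = Duration::from_secs(1);
const MAX_BACKOFF: Duration = Duration::from_secs(60);

/// Delay to use before the next reconnection attempt.
/// `clean_exit == true` corresponds to the `Ok(())` arm (reset to the floor),
/// `false` to the `Err(e)` arm (double, capped at the ceiling).
fn next_backoff(current: Duration, clean_exit: bool) -> Duration {
    if clean_exit {
        MIN_BACKOFF
    } else {
        cmp::min(current.saturating_mul(2), MAX_BACKOFF)
    }
}

/// Delays, in seconds, produced by `failures` consecutive errors,
/// starting from MIN_BACKOFF.
fn backoff_schedule(failures: usize) -> Vec<u64> {
    let mut backoff = MIN_BACKOFF;
    let mut out = Vec::with_capacity(failures);
    for _ in 0..failures {
        backoff = next_backoff(backoff, false);
        out.push(backoff.as_secs());
    }
    out
}
```

With these bounds, eight consecutive failures wait 2, 4, 8, 16, 32, 60, 60, and 60 seconds. In the reconnect loop the result would be passed directly to `tokio::time::sleep(backoff)`.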

                tracing::warn!(%server_state.local_display_name, "Error from gossip::websockets::launch_server_inner for {bind}: {e}")
            }
        }
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;

Copilot AI Aug 3, 2025

Duplicate hardcoded 5-second sleep delay. This should be consistent with the client reconnection delay and made configurable.

Suggested change

    - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    + tokio::time::sleep(tokio::time::Duration::from_secs(RECONNECTION_DELAY_SECS)).await;

snoyberg merged commit 640ffa7 into main on Aug 3, 2025
3 checks passed
snoyberg deleted the websockets-network-layer branch on August 3, 2025 10:19