Skip to content

Commit

Permalink
allow configurable client addr
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegn authored and spacekookie committed Feb 5, 2024
1 parent 3985691 commit b229ce1
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 6 deletions.
5 changes: 3 additions & 2 deletions crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ async fn build_quinn_client_config(config: &GossipConfig) -> eyre::Result<quinn:
pub async fn gossip_client_endpoint(config: &GossipConfig) -> eyre::Result<quinn::Endpoint> {
let client_config = build_quinn_client_config(config).await?;

let mut client = quinn::Endpoint::client(SocketAddr::from((config.bind_addr.ip(), 0)))?;
let mut client = quinn::Endpoint::client(config.client_addr)?;

client.set_default_client_config(client_config);
Ok(client)
Expand Down Expand Up @@ -1540,7 +1540,7 @@ mod tests {
use corro_types::{
api::{ColumnName, TableName},
base::CrsqlDbVersion,
config::{Config, TlsConfig},
config::{Config, TlsConfig, DEFAULT_GOSSIP_CLIENT_ADDR},
pubsub::pack_columns,
tls::{generate_ca, generate_client_cert, generate_server_cert},
};
Expand Down Expand Up @@ -1788,6 +1788,7 @@ mod tests {

let gossip_config = GossipConfig {
bind_addr: "127.0.0.1:0".parse()?,
client_addr: DEFAULT_GOSSIP_CLIENT_ADDR,
external_addr: None,
bootstrap: vec![],
tls: Some(TlsConfig {
Expand Down
19 changes: 16 additions & 3 deletions crates/corro-agent/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::{
sync::{mpsc, Mutex, RwLock},
time::error::Elapsed,
};
use tracing::{debug, debug_span, warn, Instrument};
use tracing::{debug, debug_span, info, warn, Instrument};

use crate::api::peer::gossip_client_endpoint;

Expand Down Expand Up @@ -52,8 +52,21 @@ impl Transport {
rtt_tx: mpsc::Sender<(SocketAddr, Duration)>,
) -> eyre::Result<Self> {
let mut endpoints = vec![];
for _ in 0..8 {
endpoints.push(gossip_client_endpoint(config).await?);
let endpoints_count = if config.client_addr.port() == 0 {
// zero port means we'll use whatever is available,
// corrosion can use multiple sockets and reduce the risk of filling kernel buffers
8
} else {
// non-zero client addr port means we can only use 1
1
};
for i in 0..endpoints_count {
let ep = gossip_client_endpoint(config).await?;
info!(
"Transport ({i}) for outgoing connections bound to socket {}",
ep.local_addr().unwrap()
);
endpoints.push(ep);
}
Ok(Self(Arc::new(TransportInner {
endpoints,
Expand Down
12 changes: 11 additions & 1 deletion crates/corro-types/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net::SocketAddr;
use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6};

use camino::Utf8PathBuf;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -109,6 +109,8 @@ pub struct GossipConfig {
#[serde(alias = "addr")]
pub bind_addr: SocketAddr,
pub external_addr: Option<SocketAddr>,
#[serde(default = "default_gossip_client_addr")]
pub client_addr: SocketAddr,
#[serde(default)]
pub bootstrap: Vec<String>,
#[serde(default)]
Expand All @@ -127,6 +129,13 @@ fn default_gossip_idle_timeout() -> u32 {
DEFAULT_GOSSIP_IDLE_TIMEOUT
}

pub const DEFAULT_GOSSIP_CLIENT_ADDR: SocketAddr =
SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0u16, 0, 0));

fn default_gossip_client_addr() -> SocketAddr {
DEFAULT_GOSSIP_CLIENT_ADDR
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TlsConfig {
/// Certificate file
Expand Down Expand Up @@ -295,6 +304,7 @@ impl ConfigBuilder {
.gossip_addr
.ok_or(ConfigBuilderError::GossipAddrRequired)?,
external_addr: self.external_addr,
client_addr: default_gossip_client_addr(),
bootstrap: self.bootstrap.unwrap_or_default(),
plaintext: self.tls.is_none(),
tls: self.tls,
Expand Down

0 comments on commit b229ce1

Please sign in to comment.