diff --git a/bgp/src/connection.rs b/bgp/src/connection.rs index 3d1cd884..bea01bde 100644 --- a/bgp/src/connection.rs +++ b/bgp/src/connection.rs @@ -125,6 +125,7 @@ pub trait BgpListener { /// * `unnumbered_manager` - Optional unnumbered manager for resolving scope_id -> interface fn bind( addr: A, + log: Logger, unnumbered_manager: Option>, ) -> Result where @@ -148,6 +149,9 @@ pub trait BgpListener { min_ttl: Option, md5_key: Option, ) -> Result<(), Error>; + + /// `SocketAddr` the listener is receiving connections on + fn bind_addr(&self) -> SocketAddr; } /// Implementors of this trait initiate outbound BGP connections to peers. diff --git a/bgp/src/connection_channel.rs b/bgp/src/connection_channel.rs index 336ef7b2..75981aa7 100644 --- a/bgp/src/connection_channel.rs +++ b/bgp/src/connection_channel.rs @@ -23,7 +23,7 @@ use crate::{ unnumbered::UnnumberedManager, }; use mg_common::lock; -use slog::Logger; +use slog::{Logger, info}; use std::{ collections::{BTreeMap, HashMap}, net::{SocketAddr, ToSocketAddrs}, @@ -81,6 +81,7 @@ impl std::fmt::Display for Network { } /// A listener that can listen for messages on our simulated network. +#[derive(Debug)] struct Listener { rx: Receiver<(SocketAddr, Endpoint)>, } @@ -206,6 +207,7 @@ impl BgpListenerChannel { impl BgpListener for BgpListenerChannel { fn bind( addr: A, + log: Logger, unnumbered_manager: Option>, ) -> Result where @@ -219,6 +221,7 @@ impl BgpListener for BgpListenerChannel { "at least one address required".into(), ))?; let listener = NET.bind(addr); + info!(log, "BgpConnectionChannel Listener created"; "listener" => ?listener); Ok(Self { listener, bind_addr: addr, @@ -271,6 +274,10 @@ impl BgpListener for BgpListenerChannel { // Policy application is ignored for test connections Ok(()) } + + fn bind_addr(&self) -> SocketAddr { + self.bind_addr + } } /// A struct to implement BgpConnection for our simulated test network. diff --git a/bgp/src/connection_tcp.rs b/bgp/src/connection_tcp.rs index 13edd950..76ad2765 100644 --- a/bgp/src/connection_tcp.rs +++ b/bgp/src/connection_tcp.rs @@ -26,7 +26,7 @@ use crate::{ unnumbered::UnnumberedManager, }; use mg_common::lock; -use slog::Logger; +use slog::{Logger, info}; use std::{ collections::BTreeMap, io::Read, @@ -79,6 +79,7 @@ enum RecvError { pub struct BgpListenerTcp { listener: TcpListener, unnumbered_manager: Option>, + bind_addr: SocketAddr, } impl BgpListenerTcp { @@ -103,6 +104,7 @@ impl BgpListenerTcp { impl BgpListener for BgpListenerTcp { fn bind( addr: A, + log: Logger, unnumbered_manager: Option>, ) -> Result where @@ -116,10 +118,15 @@ impl BgpListener for BgpListenerTcp { "at least one address required".into(), ))?; let listener = TcpListener::bind(addr)?; + let bind_addr = listener.local_addr()?; + + info!(log, "TcpListener created"; "listener" => ?listener); listener.set_nonblocking(true)?; + Ok(Self { listener, unnumbered_manager, + bind_addr, }) } @@ -228,6 +235,10 @@ impl BgpListener for BgpListenerTcp { Ok(()) } + + fn bind_addr(&self) -> SocketAddr { + self.bind_addr + } } pub struct BgpConnectorTcp; diff --git a/bgp/src/dispatcher.rs b/bgp/src/dispatcher.rs index 0e5f8e5a..bb044ece 100644 --- a/bgp/src/dispatcher.rs +++ b/bgp/src/dispatcher.rs @@ -5,12 +5,11 @@ use crate::{ IO_TIMEOUT, connection::{BgpConnection, BgpListener}, - log::dispatcher_log, session::{FsmEvent, PeerId, SessionEndpoint, SessionEvent}, unnumbered::UnnumberedManager, }; use mg_common::lock; -use slog::Logger; +use slog::{Logger, debug, error, info, warn}; use std::{ collections::BTreeMap, net::SocketAddr, @@ -34,7 +33,7 @@ pub struct Dispatcher { shutdown: AtomicBool, listen: String, - log: Logger, + log: Mutex, } impl Dispatcher { @@ -44,11 +43,17 @@ impl Dispatcher { log: Logger, unnumbered_manager: Option>, ) -> Self { + let log = log.new(slog::o!( + "component" => crate::COMPONENT_BGP, + "module" => crate::MOD_NEIGHBOR, + "unit" => UNIT_DISPATCHER, + )); + Self { peer_to_session, unnumbered_manager, listen, - log, + log: Mutex::new(log), shutdown: AtomicBool::new(false), } } @@ -84,70 +89,69 @@ impl Dispatcher { } pub fn run>(&self) { - dispatcher_log!(self, - info, - "dispatcher started"; - "listen_address" => &self.listen - ); + let mut log = lock!(self.log).clone(); + info!(log, "dispatcher started"); + 'listener: loop { + info!(log, "starting listener with bind arg: {}", &self.listen); + // We need to check the shutdown flag in the listener loop so we can // still return even if bind() keeps failing and we're stuck if self.shutdown.load(Ordering::Acquire) { - dispatcher_log!(self, - info, - "dispatcher caught shutdown flag from listener loop"; - "listen_address" => &self.listen + info!( + log, + "dispatcher caught shutdown flag from listener loop" ); self.shutdown.store(false, Ordering::Release); break 'listener; } - dispatcher_log!(self, - debug, - "listener bind: {}", &self.listen; - "listen_address" => &self.listen - ); + let listener = match Listener::bind( &self.listen, + log.clone(), self.unnumbered_manager.clone(), ) { Ok(l) => l, Err(e) => { - dispatcher_log!(self, - error, - "listener bind error: {e}"; - "listen_address" => &self.listen - ); + error!(log, "listener bind error: {e}"); sleep(Duration::from_secs(1)); // XXX: possible death loop? continue 'listener; } }; + + // If the user requested to bind on port 0, a random port will be selected, + // so we capture the port in the logger context after the listener has been + // started + let bound_log = + log.new(slog::o!("bind_addr" => listener.bind_addr())); + *lock!(self.log) = bound_log.clone(); + log = bound_log; + + info!(log, "transitioning to accept loop"); 'accept: loop { // We also need to check the shutdown flag inside the accept // loop, because we won't restart the listener loop unless we've // encountered an error indicating we can't just call accept() // again and we need a whole new listener. if self.shutdown.load(Ordering::Acquire) { - dispatcher_log!(self, - info, - "dispatcher caught shutdown flag from accept loop"; - "listen_address" => &self.listen + info!( + log, + "dispatcher caught shutdown flag from accept loop" ); self.shutdown.store(false, Ordering::Release); break 'listener; } let accepted = match listener.accept( - self.log.clone(), + log.clone(), self.peer_to_session.clone(), IO_TIMEOUT, ) { Ok(c) => { - dispatcher_log!(self, - debug, + debug!(log, "accepted inbound connection from: {}", c.peer(); "peer" => c.peer(), - "listen_address" => &self.listen ); c } @@ -155,17 +159,17 @@ impl Dispatcher { continue 'accept; } Err(e) => { - dispatcher_log!(self, - error, - "listener accept error: {e}"; - "listen_address" => &self.listen - ); + error!(log, "listener accept error: {e}"); continue 'listener; } }; let peer_addr = accepted.peer(); let key = self.resolve_session_key(peer_addr); + let session_log = log.new(slog::o!( + "peer" => peer_addr, + "session_key" => format!("{key:?}"), + )); match lock!(self.peer_to_session).get(&key).cloned() { Some(session_endpoint) => { @@ -177,12 +181,8 @@ impl Dispatcher { if let Err(e) = Listener::apply_policy(&accepted, min_ttl, md5_key) { - dispatcher_log!(self, - warn, - "failed to apply policy for connection from {}: {e}", peer_addr; - "listen_address" => &self.listen, - "peer" => format!("{}", peer_addr), - "session_key" => format!("{:?}", key), + warn!(session_log, + "failed to apply policy for connection"; "error" => format!("{e}") ); } @@ -192,34 +192,24 @@ impl Dispatcher { SessionEvent::TcpConnectionAcked(accepted), )) { - dispatcher_log!(self, - error, - "failed to send connected event to session for {}: {e}", peer_addr; - "listen_address" => &self.listen, - "peer" => format!("{}", peer_addr), - "session_key" => format!("{:?}", key) + error!(session_log, + "failed to send connected event to session"; + "error" => format!("{e}") ); continue 'listener; } } None => { - dispatcher_log!(self, - debug, - "no session found for peer, dropping connection"; - "peer" => format!("{}", peer_addr), - "resolved_key" => format!("{:?}", key), - "listen_address" => &self.listen + debug!( + session_log, + "no session found for peer, dropping connection" ); continue 'accept; } } } } - dispatcher_log!(self, - info, - "dispatcher shutdown complete"; - "listen_address" => &self.listen - ); + info!(log, "dispatcher shutdown complete"); } pub fn listen_addr(&self) -> &str { @@ -227,9 +217,9 @@ impl Dispatcher { } pub fn shutdown(&self) { - dispatcher_log!(self, info, - "dispatcher received shutdown request, setting shutdown flag"; - "listen_address" => &self.listen + info!( + lock!(self.log), + "dispatcher received shutdown request, setting shutdown flag" ); self.shutdown.store(true, Ordering::Release); } @@ -237,11 +227,6 @@ impl Dispatcher { impl Drop for Dispatcher { fn drop(&mut self) { - dispatcher_log!(self, - debug, - "dropping dispatcher with listen_addr {}", - &self.listen; - "listen_address" => &self.listen - ); + debug!(lock!(self.log), "dropping dispatcher"); } } diff --git a/bgp/src/log.rs b/bgp/src/log.rs index fb9f4114..b919e7e6 100644 --- a/bgp/src/log.rs +++ b/bgp/src/log.rs @@ -210,43 +210,6 @@ macro_rules! collision_log { }; } -macro_rules! dispatcher_log { - ($self:expr, $level:ident, $msg:expr; $($key:expr => $value:expr),*) => { - slog::$level!($self.log, - $msg; - "component" => crate::COMPONENT_BGP, - "module" => crate::MOD_NEIGHBOR, - "unit" => UNIT_DISPATCHER, - $($key => $value),* - ) - }; - ($self:expr, $level:ident, $msg:expr, $($args:expr),*; $($key:expr => $value:expr),*) => { - slog::$level!($self.log, - $msg, $($args),*; - "component" => crate::COMPONENT_BGP, - "module" => crate::MOD_NEIGHBOR, - "unit" => UNIT_DISPATCHER, - $($key => $value),* - ) - }; - ($self:expr, $level:ident, $msg:expr) => { - slog::$level!($self.log, - $msg; - "component" => crate::COMPONENT_BGP, - "module" => crate::MOD_NEIGHBOR, - "unit" => UNIT_DISPATCHER, - ) - }; - ($self:expr, $level:ident, $msg:expr, $($args:expr),*) => { - slog::$level!($self.log, - $msg, $($args),*; - "component" => crate::COMPONENT_BGP, - "module" => crate::MOD_NEIGHBOR, - "unit" => UNIT_DISPATCHER, - ) - }; -} - #[allow(unused_macros)] macro_rules! connection_log { ($self:expr, $level:ident, $msg:expr; $($key:expr => $value:expr),*) => { @@ -341,6 +304,6 @@ macro_rules! connection_log_lite { #[allow(unused_imports)] pub(crate) use { - collision_log, connection_log, connection_log_lite, dispatcher_log, - session_log, session_log_lite, + collision_log, connection_log, connection_log_lite, session_log, + session_log_lite, }; diff --git a/mgd/src/main.rs b/mgd/src/main.rs index 9c0e8746..04c0fddf 100644 --- a/mgd/src/main.rs +++ b/mgd/src/main.rs @@ -96,6 +96,10 @@ struct RunArgs { /// SocketAddr the MGS service is listening on. #[arg(long, default_value = "[::1]:12225")] mgs_addr: SocketAddr, + + /// SocketAddr for the BGP Dispatcher to listen on. + #[arg(long, default_value = "[::]:179")] + bgp_dispatcher_addr: SocketAddr, } fn main() { @@ -274,7 +278,7 @@ fn init_bgp(args: &RunArgs, log: &Logger) -> BgpContext { let bgp_dispatcher = bgp::dispatcher::Dispatcher::::new( peer_to_session.clone(), - "[::]:179".into(), + args.bgp_dispatcher_addr.to_string(), log.clone(), Some(bgp_context.unnumbered_manager.clone()), // Enable link-local connection routing );