Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bgp/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ pub trait BgpListener<Cnx: BgpConnection> {
/// * `unnumbered_manager` - Optional unnumbered manager for resolving scope_id -> interface
fn bind<A: ToSocketAddrs>(
addr: A,
log: Logger,
unnumbered_manager: Option<Arc<dyn UnnumberedManager>>,
) -> Result<Self, Error>
where
Expand All @@ -148,6 +149,9 @@ pub trait BgpListener<Cnx: BgpConnection> {
min_ttl: Option<u8>,
md5_key: Option<String>,
) -> Result<(), Error>;

/// `SocketAddr` the listener is receiving connections on
fn bind_addr(&self) -> SocketAddr;
}

/// Implementors of this trait initiate outbound BGP connections to peers.
Expand Down
9 changes: 8 additions & 1 deletion bgp/src/connection_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
unnumbered::UnnumberedManager,
};
use mg_common::lock;
use slog::Logger;
use slog::{Logger, info};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using slog::info!, please use crate::log::connection_log or connection_log_lite.
They have a bunch of the slog KV values pre-populated that add additional context to the logs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears that the custom macros do not support debug formatting (the ? prefix):

error: no rules expected `?`
   --> bgp/src/connection_tcp.rs:120:73
    |
120 |         connection_log!(log, info, "TcpListener created"; "listener" => ?listener);
    |                                                                         ^ no rules expected this token in macro call
    |
   ::: bgp/src/log.rs:251:1
    |
251 | macro_rules! connection_log {
    | --------------------------- when calling this macro
    |
note: while trying to match meta-variable `$value:expr`
   --> bgp/src/log.rs:252:58
    |
252 |     ($self:expr, $level:ident, $msg:expr; $($key:expr => $value:expr),*) => {
    |                                                          ^^^^^^^^^^^

    Checking ddmd v0.1.0 (/disk1/workspace/maghemite/ddmd)
error: could not compile `bgp` (lib) due to 1 previous error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per our conversation in chat, I've updated this logic to use the slog::o! method of updating and carrying forward the log context:

pfexec ./target/debug/mgd run --bgp-dispatcher-addr "[::]:0" | looker
18:35:48.595Z INFO slog-rs (mgd): signal handler waiting for context
    module = admin
    unit = signal
18:35:48.663Z INFO slog-rs (mgd): interface monitor started
    module = unnumbered manager
    unit = daemon
18:35:48.663Z INFO slog-rs (bgp): dispatcher started
    module = neighbor
    unit = dispatcher
18:35:48.663Z INFO slog-rs (bgp): starting listener with bind arg: [::]:0
    module = neighbor
    unit = dispatcher
18:35:48.663Z INFO slog-rs (bgp): TcpListener created
    listener = TcpListener { addr: [::]:48382, fd: 10 }
    module = neighbor
    unit = dispatcher
18:35:48.663Z INFO slog-rs (bgp): transitioning to accept loop
    bind_addr = [::]:48382
    module = neighbor
    unit = dispatcher
...

use std::{
collections::{BTreeMap, HashMap},
net::{SocketAddr, ToSocketAddrs},
Expand Down Expand Up @@ -81,6 +81,7 @@ impl std::fmt::Display for Network {
}

/// A listener that can listen for messages on our simulated network.
#[derive(Debug)]
struct Listener {
rx: Receiver<(SocketAddr, Endpoint<Message>)>,
}
Expand Down Expand Up @@ -206,6 +207,7 @@ impl BgpListenerChannel {
impl BgpListener<BgpConnectionChannel> for BgpListenerChannel {
fn bind<A: ToSocketAddrs>(
addr: A,
log: Logger,
unnumbered_manager: Option<Arc<dyn UnnumberedManager>>,
) -> Result<Self, Error>
where
Expand All @@ -219,6 +221,7 @@ impl BgpListener<BgpConnectionChannel> for BgpListenerChannel {
"at least one address required".into(),
))?;
let listener = NET.bind(addr);
info!(log, "BgpConnectionChannel Listener created"; "listener" => ?listener);
Ok(Self {
listener,
bind_addr: addr,
Expand Down Expand Up @@ -271,6 +274,10 @@ impl BgpListener<BgpConnectionChannel> for BgpListenerChannel {
// Policy application is ignored for test connections
Ok(())
}

fn bind_addr(&self) -> SocketAddr {
self.bind_addr
}
}

/// A struct to implement BgpConnection for our simulated test network.
Expand Down
13 changes: 12 additions & 1 deletion bgp/src/connection_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
unnumbered::UnnumberedManager,
};
use mg_common::lock;
use slog::Logger;
use slog::{Logger, info};
use std::{
collections::BTreeMap,
io::Read,
Expand Down Expand Up @@ -79,6 +79,7 @@ enum RecvError {
pub struct BgpListenerTcp {
listener: TcpListener,
unnumbered_manager: Option<Arc<dyn UnnumberedManager>>,
bind_addr: SocketAddr,
}

impl BgpListenerTcp {
Expand All @@ -103,6 +104,7 @@ impl BgpListenerTcp {
impl BgpListener<BgpConnectionTcp> for BgpListenerTcp {
fn bind<A: ToSocketAddrs>(
addr: A,
log: Logger,
unnumbered_manager: Option<Arc<dyn UnnumberedManager>>,
) -> Result<Self, Error>
where
Expand All @@ -116,10 +118,15 @@ impl BgpListener<BgpConnectionTcp> for BgpListenerTcp {
"at least one address required".into(),
))?;
let listener = TcpListener::bind(addr)?;
let bind_addr = listener.local_addr()?;

info!(log, "TcpListener created"; "listener" => ?listener);
listener.set_nonblocking(true)?;

Ok(Self {
listener,
unnumbered_manager,
bind_addr,
})
}

Expand Down Expand Up @@ -228,6 +235,10 @@ impl BgpListener<BgpConnectionTcp> for BgpListenerTcp {

Ok(())
}

fn bind_addr(&self) -> SocketAddr {
self.bind_addr
}
}

pub struct BgpConnectorTcp;
Expand Down
121 changes: 53 additions & 68 deletions bgp/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
use crate::{
IO_TIMEOUT,
connection::{BgpConnection, BgpListener},
log::dispatcher_log,
session::{FsmEvent, PeerId, SessionEndpoint, SessionEvent},
unnumbered::UnnumberedManager,
};
use mg_common::lock;
use slog::Logger;
use slog::{Logger, debug, error, info, warn};
use std::{
collections::BTreeMap,
net::SocketAddr,
Expand All @@ -34,7 +33,7 @@ pub struct Dispatcher<Cnx: BgpConnection> {

shutdown: AtomicBool,
listen: String,
log: Logger,
log: Mutex<Logger>,
}

impl<Cnx: BgpConnection + 'static> Dispatcher<Cnx> {
Expand All @@ -44,11 +43,17 @@ impl<Cnx: BgpConnection + 'static> Dispatcher<Cnx> {
log: Logger,
unnumbered_manager: Option<Arc<dyn UnnumberedManager>>,
) -> Self {
let log = log.new(slog::o!(
"component" => crate::COMPONENT_BGP,
"module" => crate::MOD_NEIGHBOR,
"unit" => UNIT_DISPATCHER,
));

Self {
peer_to_session,
unnumbered_manager,
listen,
log,
log: Mutex::new(log),
shutdown: AtomicBool::new(false),
}
}
Expand Down Expand Up @@ -84,88 +89,87 @@ impl<Cnx: BgpConnection + 'static> Dispatcher<Cnx> {
}

pub fn run<Listener: BgpListener<Cnx>>(&self) {
dispatcher_log!(self,
info,
"dispatcher started";
"listen_address" => &self.listen
);
let mut log = lock!(self.log).clone();
info!(log, "dispatcher started");

'listener: loop {
info!(log, "starting listener with bind arg: {}", &self.listen);

// We need to check the shutdown flag in the listener loop so we can
// still return even if bind() keeps failing and we're stuck
if self.shutdown.load(Ordering::Acquire) {
dispatcher_log!(self,
info,
"dispatcher caught shutdown flag from listener loop";
"listen_address" => &self.listen
info!(
log,
"dispatcher caught shutdown flag from listener loop"
);
self.shutdown.store(false, Ordering::Release);
break 'listener;
}
dispatcher_log!(self,
debug,
"listener bind: {}", &self.listen;
"listen_address" => &self.listen
);

let listener = match Listener::bind(
&self.listen,
log.clone(),
self.unnumbered_manager.clone(),
) {
Ok(l) => l,
Err(e) => {
dispatcher_log!(self,
error,
"listener bind error: {e}";
"listen_address" => &self.listen
);
error!(log, "listener bind error: {e}");
sleep(Duration::from_secs(1));
// XXX: possible death loop?
continue 'listener;
}
};

// If the user requested to bind on port 0, a random port will be selected,
// so we capture the port in the logger context after the listener has been
// started
let bound_log =
log.new(slog::o!("bind_addr" => listener.bind_addr()));
*lock!(self.log) = bound_log.clone();
log = bound_log;

info!(log, "transitioning to accept loop");
'accept: loop {
// We also need to check the shutdown flag inside the accept
// loop, because we won't restart the listener loop unless we've
// encountered an error indicating we can't just call accept()
// again and we need a whole new listener.
if self.shutdown.load(Ordering::Acquire) {
dispatcher_log!(self,
info,
"dispatcher caught shutdown flag from accept loop";
"listen_address" => &self.listen
info!(
log,
"dispatcher caught shutdown flag from accept loop"
);
self.shutdown.store(false, Ordering::Release);
break 'listener;
}

let accepted = match listener.accept(
self.log.clone(),
log.clone(),
self.peer_to_session.clone(),
IO_TIMEOUT,
) {
Ok(c) => {
dispatcher_log!(self,
debug,
debug!(log,
"accepted inbound connection from: {}", c.peer();
"peer" => c.peer(),
"listen_address" => &self.listen
);
c
}
Err(crate::error::Error::Timeout) => {
continue 'accept;
}
Err(e) => {
dispatcher_log!(self,
error,
"listener accept error: {e}";
"listen_address" => &self.listen
);
error!(log, "listener accept error: {e}");
continue 'listener;
}
};

let peer_addr = accepted.peer();
let key = self.resolve_session_key(peer_addr);
let session_log = log.new(slog::o!(
"peer" => peer_addr,
"session_key" => format!("{key:?}"),
));

match lock!(self.peer_to_session).get(&key).cloned() {
Some(session_endpoint) => {
Expand All @@ -177,12 +181,8 @@ impl<Cnx: BgpConnection + 'static> Dispatcher<Cnx> {
if let Err(e) =
Listener::apply_policy(&accepted, min_ttl, md5_key)
{
dispatcher_log!(self,
warn,
"failed to apply policy for connection from {}: {e}", peer_addr;
"listen_address" => &self.listen,
"peer" => format!("{}", peer_addr),
"session_key" => format!("{:?}", key),
warn!(session_log,
"failed to apply policy for connection";
"error" => format!("{e}")
);
}
Expand All @@ -192,56 +192,41 @@ impl<Cnx: BgpConnection + 'static> Dispatcher<Cnx> {
SessionEvent::TcpConnectionAcked(accepted),
))
{
dispatcher_log!(self,
error,
"failed to send connected event to session for {}: {e}", peer_addr;
"listen_address" => &self.listen,
"peer" => format!("{}", peer_addr),
"session_key" => format!("{:?}", key)
error!(session_log,
"failed to send connected event to session";
"error" => format!("{e}")
);
continue 'listener;
}
}
None => {
dispatcher_log!(self,
debug,
"no session found for peer, dropping connection";
"peer" => format!("{}", peer_addr),
"resolved_key" => format!("{:?}", key),
"listen_address" => &self.listen
debug!(
session_log,
"no session found for peer, dropping connection"
);
continue 'accept;
}
}
}
}
dispatcher_log!(self,
info,
"dispatcher shutdown complete";
"listen_address" => &self.listen
);
info!(log, "dispatcher shutdown complete");
}

pub fn listen_addr(&self) -> &str {
&self.listen
}

pub fn shutdown(&self) {
dispatcher_log!(self, info,
"dispatcher received shutdown request, setting shutdown flag";
"listen_address" => &self.listen
info!(
lock!(self.log),
"dispatcher received shutdown request, setting shutdown flag"
);
self.shutdown.store(true, Ordering::Release);
}
}

impl<Cnx: BgpConnection> Drop for Dispatcher<Cnx> {
fn drop(&mut self) {
dispatcher_log!(self,
debug,
"dropping dispatcher with listen_addr {}",
&self.listen;
"listen_address" => &self.listen
);
debug!(lock!(self.log), "dropping dispatcher");
}
}
Loading