Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace rpc in antctl with metric service #2670

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions ant-networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ pub enum NetworkEvent {
KeysToFetchForReplication(Vec<(PeerId, RecordKey)>),
/// Started listening on a new address
NewListenAddr(Multiaddr),
/// stopped Listening from a address
ClosedListenAddr(Vec<Multiaddr>),
/// Report unverified record
UnverifiedRecord(Record),
/// Terminate Node on unrecoverable errors
Expand Down Expand Up @@ -187,6 +189,9 @@ impl Debug for NetworkEvent {
NetworkEvent::NewListenAddr(addr) => {
write!(f, "NetworkEvent::NewListenAddr({addr:?})")
}
NetworkEvent::ClosedListenAddr(addr) => {
write!(f, "NetworkEvent::ClosedListenAddr({addr:?})")
}
NetworkEvent::UnverifiedRecord(record) => {
let pretty_key = PrettyPrintRecordKey::from(&record.key);
write!(f, "NetworkEvent::UnverifiedRecord({pretty_key:?})")
Expand Down
2 changes: 2 additions & 0 deletions ant-networking/src/event/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ impl SwarmDriver {
if let Some(relay_manager) = self.relay_manager.as_mut() {
relay_manager.on_listener_closed(&listener_id, &mut self.swarm);
}

self.send_event(NetworkEvent::ClosedListenAddr(addresses.clone()));
}
SwarmEvent::IncomingConnection {
connection_id,
Expand Down
39 changes: 38 additions & 1 deletion ant-networking/src/metrics/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct MetricsRegistries {
pub standard_metrics: Registry,
pub extended_metrics: Registry,
pub metadata: Registry,
pub metadata_extended: Registry,
}

const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0";
Expand All @@ -36,7 +37,7 @@ pub(crate) fn run_metrics_server(registries: MetricsRegistries, port: u16) {
info!("Metrics server on http://{}/metrics", server.local_addr());
println!("Metrics server on http://{}/metrics", server.local_addr());

info!("Metrics server on http://{} Available endpoints: /metrics, /metrics_extended, /metadata", server.local_addr());
info!("Metrics server on http://{} Available endpoints: /metrics, /metrics_extended, /metadata, /metadata_extended", server.local_addr());
// run the server forever
if let Err(e) = server.await {
error!("server error: {}", e);
Expand All @@ -50,6 +51,7 @@ pub(crate) struct MetricService {
standard_registry: SharedRegistry,
extended_registry: SharedRegistry,
metadata: SharedRegistry,
metadata_extended: SharedRegistry,
}

impl MetricService {
Expand All @@ -65,6 +67,10 @@ impl MetricService {
Arc::clone(&self.metadata)
}

fn get_metadata_extended_registry(&mut self) -> SharedRegistry {
Arc::clone(&self.metadata_extended)
}

fn respond_with_metrics(&mut self) -> Result<Response<String>> {
let mut response: Response<String> = Response::default();

Expand Down Expand Up @@ -152,6 +158,28 @@ impl MetricService {
Ok(response)
}

fn respond_with_metadata_extended(&mut self) -> Result<Response<String>> {
let mut response: Response<String> = Response::default();

response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
METRICS_CONTENT_TYPE
.try_into()
.map_err(|_| NetworkError::NetworkMetricError)?,
);

let reg = self.get_metadata_extended_registry();
let reg = reg.lock().map_err(|_| NetworkError::NetworkMetricError)?;
encode(&mut response.body_mut(), &reg).map_err(|err| {
error!("Failed to encode the metadata Registry {err:?}");
NetworkError::NetworkMetricError
})?;

*response.status_mut() = StatusCode::OK;

Ok(response)
}

fn respond_with_404_not_found(&mut self) -> Response<String> {
let mut resp = Response::default();
*resp.status_mut() = StatusCode::NOT_FOUND;
Expand Down Expand Up @@ -196,6 +224,11 @@ impl Service<Request<Body>> for MetricService {
Ok(resp) => resp,
Err(_) => self.respond_with_500_server_error(),
}
} else if req_method == Method::GET && req_path == "/metadata_extended" {
match self.respond_with_metadata_extended() {
Ok(resp) => resp,
Err(_) => self.respond_with_500_server_error(),
}
} else {
self.respond_with_404_not_found()
};
Expand All @@ -207,6 +240,7 @@ pub(crate) struct MakeMetricService {
standard_registry: SharedRegistry,
extended_registry: SharedRegistry,
metadata: SharedRegistry,
metadata_extended: SharedRegistry,
}

impl MakeMetricService {
Expand All @@ -215,6 +249,7 @@ impl MakeMetricService {
standard_registry: Arc::new(Mutex::new(registries.standard_metrics)),
extended_registry: Arc::new(Mutex::new(registries.extended_metrics)),
metadata: Arc::new(Mutex::new(registries.metadata)),
metadata_extended: Arc::new(Mutex::new(registries.metadata_extended)),
}
}
}
Expand All @@ -232,12 +267,14 @@ impl<T> Service<T> for MakeMetricService {
let standard_registry = Arc::clone(&self.standard_registry);
let extended_registry = Arc::clone(&self.extended_registry);
let metadata = Arc::clone(&self.metadata);
let metadata_extended = Arc::clone(&self.metadata_extended);

let fut = async move {
Ok(MetricService {
standard_registry,
extended_registry,
metadata,
metadata_extended,
})
};
Box::pin(fut)
Expand Down
61 changes: 58 additions & 3 deletions ant-networking/src/network_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,14 @@ impl NetworkBuilder {
let listen_addr = self.listen_addr;
let upnp = self.upnp;

let (network, events_receiver, mut swarm_driver) =
self.build(kad_cfg, Some(store_cfg), false, ProtocolSupport::Full, upnp);
let (network, events_receiver, mut swarm_driver) = self.build(
kad_cfg,
Some(store_cfg),
false,
ProtocolSupport::Full,
upnp,
Some(root_dir.clone()),
);

// Listen on the provided address
let listen_socket_addr = listen_addr.ok_or(NetworkError::ListenAddressNotProvided)?;
Expand Down Expand Up @@ -282,7 +288,7 @@ impl NetworkBuilder {
.set_replication_factor(REPLICATION_FACTOR);

let (network, net_event_recv, driver) =
self.build(kad_cfg, None, true, ProtocolSupport::Outbound, false);
self.build(kad_cfg, None, true, ProtocolSupport::Outbound, false, None);

(network, net_event_recv, driver)
}
Expand All @@ -295,6 +301,7 @@ impl NetworkBuilder {
is_client: bool,
req_res_protocol: ProtocolSupport,
upnp: bool,
root_dir: Option<PathBuf>,
) -> (Network, mpsc::Receiver<NetworkEvent>, SwarmDriver) {
let identify_protocol_str = IDENTIFY_PROTOCOL_STR
.read()
Expand Down Expand Up @@ -367,6 +374,54 @@ impl NetworkBuilder {
)]),
);

let metadata_extended_sub_reg = metrics_registries
.metadata_extended
.sub_registry_with_prefix("ant_networking");

metadata_extended_sub_reg.register(
"peer_id",
"Identifier of a peer of the network",
Info::new(vec![("peer_id".to_string(), peer_id.to_string())]),
);

metadata_extended_sub_reg.register(
"pid",
"id of the node process",
Info::new(vec![("pid".to_string(), std::process::id().to_string())]),
);

metadata_extended_sub_reg.register(
"bin_version",
"Package version of the node",
Info::new(vec![(
"bin_version".to_string(),
env!("CARGO_PKG_VERSION").to_string(),
)]),
);

if let Some(root_dir) = root_dir.clone() {
metadata_extended_sub_reg.register(
"data_dir",
"Root directory of the node",
Info::new(vec![(
"root_dir".to_string(),
root_dir.clone().to_string_lossy().to_string(),
)]),
);
}

if let Some(log_dir) = root_dir.clone() {
let log_dir = log_dir.join("logs");
metadata_extended_sub_reg.register(
"log_dir",
"Root directory of the node",
Info::new(vec![(
"log_dir".to_string(),
log_dir.clone().to_string_lossy().to_string(),
)]),
);
}

run_metrics_server(metrics_registries, port);
Some(metrics_recorder)
} else {
Expand Down
23 changes: 2 additions & 21 deletions ant-node-manager/src/add_services/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@ use ant_logging::LogFormat;
use ant_service_management::node::push_arguments_from_peers_args;
use color_eyre::{eyre::eyre, Result};
use service_manager::{ServiceInstallCtx, ServiceLabel};
use std::{
ffi::OsString,
net::{Ipv4Addr, SocketAddr},
path::PathBuf,
str::FromStr,
};
use std::{ffi::OsString, net::Ipv4Addr, path::PathBuf, str::FromStr};

#[derive(Clone, Debug)]
pub enum PortRange {
Expand Down Expand Up @@ -87,7 +82,6 @@ pub struct InstallNodeServiceCtxBuilder {
pub node_port: Option<u16>,
pub peers_args: PeersArgs,
pub rewards_address: RewardsAddress,
pub rpc_socket_addr: SocketAddr,
pub service_user: Option<String>,
pub upnp: bool,
}
Expand All @@ -96,8 +90,6 @@ impl InstallNodeServiceCtxBuilder {
pub fn build(self) -> Result<ServiceInstallCtx> {
let label: ServiceLabel = self.name.parse()?;
let mut args = vec![
OsString::from("--rpc"),
OsString::from(self.rpc_socket_addr.to_string()),
OsString::from("--root-dir"),
OsString::from(self.data_dir_path.to_string_lossy().to_string()),
OsString::from("--log-output-dest"),
Expand Down Expand Up @@ -190,8 +182,6 @@ pub struct AddNodeServiceOptions {
pub node_port: Option<PortRange>,
pub peers_args: PeersArgs,
pub rewards_address: RewardsAddress,
pub rpc_address: Option<Ipv4Addr>,
pub rpc_port: Option<PortRange>,
pub service_data_dir_path: PathBuf,
pub service_log_dir_path: PathBuf,
pub upnp: bool,
Expand Down Expand Up @@ -302,7 +292,7 @@ pub struct AddDaemonServiceOptions {
mod tests {
use super::*;
use ant_evm::{CustomNetwork, RewardsAddress};
use std::net::{IpAddr, Ipv4Addr};
use std::net::Ipv4Addr;

fn create_default_builder() -> InstallNodeServiceCtxBuilder {
InstallNodeServiceCtxBuilder {
Expand All @@ -324,7 +314,6 @@ mod tests {
peers_args: PeersArgs::default(),
rewards_address: RewardsAddress::from_str("0x03B770D9cD32077cC0bF330c13C114a87643B124")
.unwrap(),
rpc_socket_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
service_user: None,
upnp: false,
}
Expand Down Expand Up @@ -359,7 +348,6 @@ mod tests {
peers_args: PeersArgs::default(),
rewards_address: RewardsAddress::from_str("0x03B770D9cD32077cC0bF330c13C114a87643B124")
.unwrap(),
rpc_socket_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
antnode_path: PathBuf::from("/bin/antnode"),
service_user: None,
upnp: false,
Expand Down Expand Up @@ -395,7 +383,6 @@ mod tests {
peers_args: PeersArgs::default(),
rewards_address: RewardsAddress::from_str("0x03B770D9cD32077cC0bF330c13C114a87643B124")
.unwrap(),
rpc_socket_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
antnode_path: PathBuf::from("/bin/antnode"),
service_user: None,
upnp: false,
Expand All @@ -414,8 +401,6 @@ mod tests {
assert_eq!(result.working_directory, None);

let expected_args = vec![
"--rpc",
"127.0.0.1:8080",
"--root-dir",
"/data",
"--log-output-dest",
Expand Down Expand Up @@ -446,8 +431,6 @@ mod tests {
assert_eq!(result.working_directory, None);

let expected_args = vec![
"--rpc",
"127.0.0.1:8080",
"--root-dir",
"/data",
"--log-output-dest",
Expand Down Expand Up @@ -495,8 +478,6 @@ mod tests {
let result = builder.build().unwrap();

let expected_args = vec![
"--rpc",
"127.0.0.1:8080",
"--root-dir",
"/data",
"--log-output-dest",
Expand Down
Loading
Loading