diff --git a/Cargo.lock b/Cargo.lock index 4ca3bac62..8c0fb4029 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -227,6 +227,28 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "aws-lc-rs" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b5ce75405893cd713f9ab8e297d8e438f624dde7d706108285f7e17a25a180f" +dependencies = [ + "aws-lc-sys", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "179c3777a8b5e70e90ea426114ffc565b2c1a9f82f6c4a0c5a34aa6ef5e781b6" +dependencies = [ + "cc", + "cmake", + "dunce", + "fs_extra", +] + [[package]] name = "axum" version = "0.8.7" @@ -654,6 +676,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90583009037521a116abf44494efecd645ba48b6622457080f080b85544e2215" dependencies = [ "find-msvc-tools", + "jobserver", + "libc 0.2.178", "shlex", ] @@ -773,6 +797,15 @@ dependencies = [ "uuid", ] +[[package]] +name = "cmake" +version = "0.1.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b042e5d8a74ae91bb0961acd039822472ec99f8ab0948cbf6d1369588f8be586" +dependencies = [ + "cc", +] + [[package]] name = "color-eyre" version = "0.6.5" @@ -1212,6 +1245,7 @@ version = "0.4.0" dependencies = [ "bolero", "caps", + "chrono", "dataplane-hardware", "dataplane-k8s-intf", "dataplane-lpm", @@ -1876,6 +1910,12 @@ dependencies = [ "tracing-test", ] +[[package]] +name = "dunce" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" + [[package]] name = "dyn-clone" version = "1.0.20" @@ -2096,6 +2136,12 @@ dependencies = [ "tokio", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "funty" version = "2.0.0" @@ -2831,6 +2877,16 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "jobserver" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" +dependencies = [ + "getrandom 0.3.4", + "libc 0.2.178", +] + [[package]] name = "js-sys" version = "0.3.83" @@ -4512,6 +4568,7 @@ version = "0.23.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f" dependencies = [ + "aws-lc-rs", "log", "once_cell", "ring", @@ -4557,6 +4614,7 @@ version = "0.103.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2ffdfa2f5286e2247234e03f680868ac2815974dc39e00ea15adc445d0aafe52" dependencies = [ + "aws-lc-rs", "ring", "rustls-pki-types", "untrusted", diff --git a/NOTICE b/NOTICE new file mode 100644 index 000000000..9714f49e0 --- /dev/null +++ b/NOTICE @@ -0,0 +1,3 @@ +This product includes software developed by the OpenSSL Project for use in the OpenSSL Toolkit. (http://www.openssl.org/) + +This product includes cryptographic software written by Eric Young (eay@cryptsoft.com) diff --git a/config/Cargo.toml b/config/Cargo.toml index 6e15512b7..72fa23824 100644 --- a/config/Cargo.toml +++ b/config/Cargo.toml @@ -15,6 +15,7 @@ tracectl = { workspace = true } k8s-intf = { workspace = true } # external +chrono = { workspace = true } derive_builder = { workspace = true, default-features = false, features = ["default"] } ipnet = { workspace = true } linkme = { workspace = true } diff --git a/config/src/converters/k8s/bgp.rs b/config/src/converters/k8s/config/bgp.rs similarity index 100% rename from config/src/converters/k8s/bgp.rs rename to config/src/converters/k8s/config/bgp.rs diff --git a/config/src/converters/k8s/device.rs b/config/src/converters/k8s/config/device.rs similarity index 100% rename from config/src/converters/k8s/device.rs rename to config/src/converters/k8s/config/device.rs diff --git a/config/src/converters/k8s/expose.rs b/config/src/converters/k8s/config/expose.rs similarity index 98% rename from config/src/converters/k8s/expose.rs rename to config/src/converters/k8s/config/expose.rs index 0297dbbf0..d3da9a96d 100644 --- a/config/src/converters/k8s/expose.rs +++ b/config/src/converters/k8s/config/expose.rs @@ -9,7 +9,8 @@ use k8s_intf::gateway_agent_crd::{ }; use lpm::prefix::{Prefix, PrefixString}; -use crate::converters::k8s::{FromK8sConversionError, SubnetMap}; +use crate::converters::k8s::FromK8sConversionError; +use crate::converters::k8s::config::SubnetMap; use crate::external::overlay::vpcpeering::VpcExpose; fn process_ip_block( diff --git a/config/src/converters/k8s/gateway_config.rs b/config/src/converters/k8s/config/gateway_config.rs similarity index 100% rename from config/src/converters/k8s/gateway_config.rs rename to config/src/converters/k8s/config/gateway_config.rs diff --git a/config/src/converters/k8s/interface.rs b/config/src/converters/k8s/config/interface.rs similarity index 100% rename from config/src/converters/k8s/interface.rs rename to config/src/converters/k8s/config/interface.rs diff --git a/config/src/converters/k8s/config/mod.rs b/config/src/converters/k8s/config/mod.rs new file mode 100644 index 000000000..908285c7e --- /dev/null +++ b/config/src/converters/k8s/config/mod.rs @@ -0,0 +1,24 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Open Network Fabric Authors + +//! Converter for gateway-schema k8s objects to internal config + +#![deny(clippy::all, clippy::pedantic)] + +pub mod bgp; +pub mod device; +pub mod expose; +pub mod gateway_config; +pub mod interface; +pub mod overlay; +pub mod peering; +pub mod tracecfg; +pub mod underlay; +pub mod vpc; + +use std::collections::BTreeMap; + +use lpm::prefix::Prefix; + +pub type SubnetMap = BTreeMap; +pub type VpcSubnetMap = BTreeMap; diff --git a/config/src/converters/k8s/overlay.rs b/config/src/converters/k8s/config/overlay.rs similarity index 97% rename from config/src/converters/k8s/overlay.rs rename to config/src/converters/k8s/config/overlay.rs index b0352a14c..d7f37bd3d 100644 --- a/config/src/converters/k8s/overlay.rs +++ b/config/src/converters/k8s/config/overlay.rs @@ -6,7 +6,8 @@ use std::collections::BTreeMap; use k8s_intf::gateway_agent_crd::{GatewayAgentPeerings, GatewayAgentSpec, GatewayAgentVpcs}; use lpm::prefix::Prefix; -use crate::converters::k8s::{FromK8sConversionError, SubnetMap, VpcSubnetMap}; +use crate::converters::k8s::FromK8sConversionError; +use crate::converters::k8s::config::{SubnetMap, VpcSubnetMap}; use crate::external::overlay::Overlay; use crate::external::overlay::vpc::{Vpc, VpcTable}; use crate::external::overlay::vpcpeering::{VpcPeering, VpcPeeringTable}; diff --git a/config/src/converters/k8s/peering.rs b/config/src/converters/k8s/config/peering.rs similarity index 96% rename from config/src/converters/k8s/peering.rs rename to config/src/converters/k8s/config/peering.rs index f28341b5e..100a30100 100644 --- a/config/src/converters/k8s/peering.rs +++ b/config/src/converters/k8s/config/peering.rs @@ -3,7 +3,8 @@ use k8s_intf::gateway_agent_crd::{GatewayAgentPeerings, GatewayAgentPeeringsPeering}; -use crate::converters::k8s::{FromK8sConversionError, SubnetMap, VpcSubnetMap}; +use crate::converters::k8s::FromK8sConversionError; +use crate::converters::k8s::config::{SubnetMap, VpcSubnetMap}; use crate::external::overlay::vpcpeering::{VpcExpose, VpcManifest, VpcPeering}; impl TryFrom<(&SubnetMap, &str, &GatewayAgentPeeringsPeering)> for VpcManifest { @@ -68,7 +69,7 @@ mod test { }; use lpm::prefix::Prefix; - use crate::converters::k8s::{SubnetMap, VpcSubnetMap}; + use crate::converters::k8s::config::{SubnetMap, VpcSubnetMap}; #[test] fn test_vpc_manifest_conversion() { diff --git a/config/src/converters/k8s/tracecfg.rs b/config/src/converters/k8s/config/tracecfg.rs similarity index 100% rename from config/src/converters/k8s/tracecfg.rs rename to config/src/converters/k8s/config/tracecfg.rs diff --git a/config/src/converters/k8s/underlay.rs b/config/src/converters/k8s/config/underlay.rs similarity index 100% rename from config/src/converters/k8s/underlay.rs rename to config/src/converters/k8s/config/underlay.rs diff --git a/config/src/converters/k8s/vpc.rs b/config/src/converters/k8s/config/vpc.rs similarity index 100% rename from config/src/converters/k8s/vpc.rs rename to config/src/converters/k8s/config/vpc.rs diff --git a/config/src/converters/k8s/mod.rs b/config/src/converters/k8s/mod.rs index a2351234d..f04491174 100644 --- a/config/src/converters/k8s/mod.rs +++ b/config/src/converters/k8s/mod.rs @@ -1,27 +1,13 @@ // SPDX-License-Identifier: Apache-2.0 // Copyright Open Network Fabric Authors -//! Converter for gateway-schema k8s objects to internal config +//! Converter for gateway-schema k8s objects to internal config and status -#![deny(clippy::all, clippy::pedantic)] - -pub mod bgp; -pub mod device; -pub mod expose; -pub mod gateway_config; -pub mod interface; -pub mod overlay; -pub mod peering; -pub mod tracecfg; -pub mod underlay; -pub mod vpc; - -use std::collections::BTreeMap; +pub mod config; +pub mod status; use thiserror::Error; -use lpm::prefix::Prefix; - #[derive(Debug, Error)] pub enum FromK8sConversionError { #[error("Invalid Gateway Agent object: {0}")] @@ -45,6 +31,3 @@ pub enum ToK8sConversionError { #[error("Source configuration cannot be converted: {0}")] Unsupported(String), } - -pub type SubnetMap = BTreeMap; -pub type VpcSubnetMap = BTreeMap; diff --git a/config/src/converters/k8s/status/dataplane_status.rs b/config/src/converters/k8s/status/dataplane_status.rs new file mode 100644 index 000000000..641bede10 --- /dev/null +++ b/config/src/converters/k8s/status/dataplane_status.rs @@ -0,0 +1,212 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Open Network Fabric Authors + +use std::collections::BTreeMap; + +use chrono::{DateTime, Utc}; + +use k8s_intf::gateway_agent_crd::{ + GatewayAgentStatus, GatewayAgentStatusState, GatewayAgentStatusStateFrr, + GatewayAgentStatusStatePeerings, GatewayAgentStatusStateVpcs, +}; + +use crate::converters::k8s::ToK8sConversionError; +use crate::internal::status::DataplaneStatus; + +pub struct DataplaneStatusForK8sConversion<'a> { + pub last_applied_gen: Option, + pub last_applied_time: Option<&'a DateTime>, + pub last_collected_time: Option<&'a DateTime>, + pub status: Option<&'a DataplaneStatus>, +} + +impl TryFrom<&DataplaneStatusForK8sConversion<'_>> for GatewayAgentStatus { + type Error = ToK8sConversionError; + + fn try_from(status: &DataplaneStatusForK8sConversion<'_>) -> Result { + if status.last_applied_time.is_none() && status.last_applied_gen.is_some() { + return Err(ToK8sConversionError::MissingData( + "last_applied_gen is set, but last_applied_time is not".to_string(), + )); + } + + if status.last_collected_time.is_none() && status.status.is_some() { + return Err(ToK8sConversionError::MissingData( + "status is set, but last_collected_time is not".to_string(), + )); + } + + let frr_status = status + .status + .and_then(|status| { + status + .frr_status + .as_ref() + .map(GatewayAgentStatusStateFrr::try_from) + }) + .transpose()?; + + let vpcs = status + .status + .map(|status| { + status + .vpc_counters + .iter() + .map(|(k, v)| Ok((k.clone(), GatewayAgentStatusStateVpcs::try_from(v)?))) + .collect::, _>>() + }) + .transpose()?; + + let peerings = status + .status + .map(|status| { + status + .vpc_peering_counters + .iter() + .map(|(k, v)| Ok((k.clone(), GatewayAgentStatusStatePeerings::try_from(v)?))) + .collect::, _>>() + }) + .transpose()?; + + Ok(GatewayAgentStatus { + agent_version: None, + last_applied_gen: status.last_applied_gen, + last_applied_time: status + .last_applied_time + .map(|lat| lat.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)), + state: status.status.map(|_| GatewayAgentStatusState { + frr: frr_status, + last_collected_time: status + .last_collected_time + .map(|t| t.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)), + peerings: peerings.filter(|c| !c.is_empty()), + vpcs: vpcs.filter(|c| !c.is_empty()), + }), + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use bolero::generator::*; + use bolero::{Driver, TypeGenerator}; + use chrono::TimeZone; + + use crate::internal::status::DataplaneStatus; + use crate::internal::status::contract::LegalValue; + + fn datetime_gen() -> impl ValueGenerator> { + (0..=i64::from(i32::MAX)).map_gen(|ts| Utc.timestamp_opt(ts, 0).unwrap()) + } + + #[derive(Debug)] + struct DataplaneStatusForK8sConversionOwned { + status: Option, + last_collected_time: Option>, + last_applied_gen: Option, + last_applied_time: Option>, + } + + impl TypeGenerator for LegalValue { + fn generate(d: &mut D) -> Option { + let time_gen = datetime_gen(); + let last_applied_gen = d.produce()?; + let last_applied_time = time_gen.generate(d)?; + let status = d + .produce::>>()? + .map(|v| v.take()); + let last_collected_time_raw = time_gen.generate(d)?; + let last_collected_time = status.as_ref().map(|_| last_collected_time_raw); + Some(LegalValue::new(DataplaneStatusForK8sConversionOwned { + status, + last_collected_time, + last_applied_gen, + last_applied_time: last_applied_gen.map(|_| last_applied_time), + })) + } + } + + #[test] + fn test_dataplane_status_conversion() { + bolero::check!() + .with_type::>() + .for_each(|status_owned| { + let status_owned = status_owned.as_ref(); + let conv_status = DataplaneStatusForK8sConversion { + status: status_owned.status.as_ref(), + last_collected_time: status_owned.last_collected_time.as_ref(), + last_applied_gen: status_owned.last_applied_gen, + last_applied_time: status_owned.last_applied_time.as_ref(), + }; + + let gateway_agent_status = GatewayAgentStatus::try_from(&conv_status).unwrap(); + + assert_eq!( + gateway_agent_status.last_applied_gen, + conv_status.last_applied_gen + ); + assert_eq!( + gateway_agent_status.last_applied_time, + conv_status + .last_applied_time + .map(|lat| lat.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)) + ); + assert_eq!( + gateway_agent_status + .state + .as_ref() + .and_then(|s| s.last_collected_time.clone()), + conv_status + .last_collected_time + .map(|t| t.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)) + ); + + assert_eq!( + conv_status.status.is_some(), + gateway_agent_status.state.is_some() + ); + + if let Some(state) = gateway_agent_status.state.as_ref() { + assert!(state.frr.is_some()); // Specifics tested elsewhere + + match state.vpcs.as_ref() { + Some(vpcs) => { + // Specifics tested elsewhere + assert_eq!( + vpcs.len(), + conv_status + .status + .expect("status should not be None") + .vpc_counters + .len() + ); + } + None => { + assert!( + conv_status.status.is_none() + || conv_status.status.unwrap().vpc_counters.is_empty() + ); + } + } + + match state.peerings.as_ref() { + // Specifics tested elsewhere + Some(peerings) => assert_eq!( + peerings.len(), + conv_status + .status + .expect("status should not be None") + .vpc_peering_counters + .len() + ), + None => assert!( + conv_status.status.is_none() + || conv_status.status.unwrap().vpc_peering_counters.is_empty() + ), + } + } + }); + } +} diff --git a/config/src/converters/k8s/status/frr.rs b/config/src/converters/k8s/status/frr.rs new file mode 100644 index 000000000..ae9cba2e4 --- /dev/null +++ b/config/src/converters/k8s/status/frr.rs @@ -0,0 +1,42 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Open Network Fabric Authors + +use k8s_intf::gateway_agent_crd::GatewayAgentStatusStateFrr; + +use crate::converters::k8s::ToK8sConversionError; +use crate::internal::status::FrrStatus; + +impl TryFrom<&FrrStatus> for GatewayAgentStatusStateFrr { + type Error = ToK8sConversionError; + + fn try_from(status: &FrrStatus) -> Result { + Ok(GatewayAgentStatusStateFrr { + last_applied_gen: Some(status.applied_config_gen), + }) + } +} + +#[cfg(test)] +mod test { + use super::*; + + use crate::internal::status::contract::LegalValue; + + #[test] + fn test_frr_status_conversion() { + bolero::check!() + .with_type::>() + .for_each(|status| { + let status = status.as_ref(); + let k8s_frr_status = GatewayAgentStatusStateFrr::try_from(status) + .expect("Failed to convert frr status"); + + assert_eq!( + status.applied_config_gen, + k8s_frr_status + .last_applied_gen + .expect("K8s frr last applied gen not set"), + ); + }); + } +} diff --git a/config/src/converters/k8s/status/mod.rs b/config/src/converters/k8s/status/mod.rs new file mode 100644 index 000000000..4112c27b3 --- /dev/null +++ b/config/src/converters/k8s/status/mod.rs @@ -0,0 +1,9 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Open Network Fabric Authors + +//! Converter for gateway-schema k8s objects from internal status + +pub mod dataplane_status; +pub mod frr; +pub mod peerings; +pub mod vpcs; diff --git a/config/src/converters/k8s/status/peerings.rs b/config/src/converters/k8s/status/peerings.rs new file mode 100644 index 000000000..5f3e0d284 --- /dev/null +++ b/config/src/converters/k8s/status/peerings.rs @@ -0,0 +1,21 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Open Network Fabric Authors + +use k8s_intf::gateway_agent_crd::GatewayAgentStatusStatePeerings; + +use crate::converters::k8s::ToK8sConversionError; +use crate::internal::status::VpcPeeringCounters; + +impl TryFrom<&VpcPeeringCounters> for GatewayAgentStatusStatePeerings { + type Error = ToK8sConversionError; + + fn try_from(counters: &VpcPeeringCounters) -> Result { + Ok(GatewayAgentStatusStatePeerings { + b: Some(counters.bytes), + d: Some(counters.drops), + p: Some(counters.packets), + bps: Some(counters.bps), + pps: Some(counters.pps), + }) + } +} diff --git a/config/src/converters/k8s/status/vpcs.rs b/config/src/converters/k8s/status/vpcs.rs new file mode 100644 index 000000000..12005e4fe --- /dev/null +++ b/config/src/converters/k8s/status/vpcs.rs @@ -0,0 +1,44 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Open Network Fabric Authors + +use k8s_intf::gateway_agent_crd::GatewayAgentStatusStateVpcs; + +use crate::converters::k8s::ToK8sConversionError; +use crate::internal::status::VpcCounters; + +impl TryFrom<&VpcCounters> for GatewayAgentStatusStateVpcs { + type Error = ToK8sConversionError; + + fn try_from(vpc_counters: &VpcCounters) -> Result { + Ok(GatewayAgentStatusStateVpcs { + b: Some(vpc_counters.bytes), + d: Some(vpc_counters.drops), + p: Some(vpc_counters.packets), + }) + } +} + +#[cfg(test)] +mod test { + use super::*; + + use crate::internal::status::contract::LegalValue; + + #[test] + fn test_vpc_status_conversion() { + bolero::check!() + .with_type::>() + .for_each(|status| { + let status = status.as_ref(); + let k8s_vpc_status = GatewayAgentStatusStateVpcs::try_from(status) + .expect("Failed to convert frr status"); + + assert_eq!(status.bytes, k8s_vpc_status.b.expect("K8s vpcs b not set"),); + assert_eq!(status.drops, k8s_vpc_status.d.expect("K8s vpcs d not set"),); + assert_eq!( + status.packets, + k8s_vpc_status.p.expect("K8s vpcs p not set"), + ); + }); + } +} diff --git a/config/src/internal/status/mod.rs b/config/src/internal/status/mod.rs index 0228a0bdb..58811963e 100644 --- a/config/src/internal/status/mod.rs +++ b/config/src/internal/status/mod.rs @@ -7,7 +7,11 @@ use std::collections::HashMap; +#[cfg(test)] +use bolero::TypeGenerator; + #[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] +#[cfg_attr(test, derive(TypeGenerator))] pub enum InterfaceOperStatusType { #[default] Unknown, @@ -17,6 +21,7 @@ pub enum InterfaceOperStatusType { } #[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] +#[cfg_attr(test, derive(TypeGenerator))] pub enum InterfaceAdminStatusType { #[default] Unknown, @@ -25,6 +30,7 @@ pub enum InterfaceAdminStatusType { } #[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] +#[cfg_attr(test, derive(TypeGenerator))] pub enum ZebraStatusType { #[default] NotConnected, @@ -32,6 +38,7 @@ pub enum ZebraStatusType { } #[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] +#[cfg_attr(test, derive(TypeGenerator))] pub enum FrrAgentStatusType { #[default] NotConnected, @@ -39,6 +46,7 @@ pub enum FrrAgentStatusType { } #[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] +#[cfg_attr(test, derive(TypeGenerator))] pub enum DataplaneStatusType { #[default] Unknown, @@ -48,6 +56,7 @@ pub enum DataplaneStatusType { } #[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] +#[cfg_attr(test, derive(TypeGenerator))] pub enum BgpNeighborSessionState { #[default] Unset, @@ -86,6 +95,7 @@ impl InterfaceStatus { } #[derive(Clone, Debug, Default, PartialEq)] +#[cfg_attr(test, derive(TypeGenerator))] pub struct InterfaceCounters { pub tx_bits: u64, pub tx_bps: f64, @@ -372,3 +382,151 @@ impl DataplaneStatus { self.bgp = Some(b); } } + +#[cfg(test)] +pub(crate) mod contract { + use super::*; + + use std::ops::Bound; + + use bolero::{Driver, TypeGenerator}; + + #[derive(Debug)] + pub struct LegalValue(T); + + impl AsRef for LegalValue { + fn as_ref(&self) -> &T { + &self.0 + } + } + + impl LegalValue { + pub fn new(value: T) -> Self { + LegalValue(value) + } + + pub fn take(self) -> T { + self.0 + } + } + + impl TypeGenerator for LegalValue { + fn generate(d: &mut D) -> Option> { + let applied_config_gen = d.gen_u32(Bound::Included(&0), Bound::Unbounded)?; + let total_configs = + d.gen_u32(Bound::Included(&applied_config_gen), Bound::Unbounded)?; + let applied_configs = d.gen_u32( + Bound::Included(&applied_config_gen), + Bound::Included(&total_configs), + )?; + let failed_configs = total_configs - applied_configs; + + Some(LegalValue(FrrStatus { + zebra_status: d.produce::()?, + frr_agent_status: d.produce::()?, + applied_config_gen: i64::from(applied_config_gen), + restarts: d.gen_u32(Bound::Included(&0), Bound::Unbounded)?, + applied_configs, + failed_configs, + })) + } + } + + impl TypeGenerator for LegalValue { + fn generate(d: &mut D) -> Option> { + let bytes = d.gen_u64(Bound::Included(&0), Bound::Unbounded)?; + let drops = d.gen_u64(Bound::Included(&0), Bound::Unbounded)?; + let packets = d.gen_u64(Bound::Included(&0), Bound::Unbounded)?; + + Some(LegalValue(VpcCounters { + name: format!("vpc-{}", d.gen_u32(Bound::Included(&0), Bound::Unbounded)?), + bytes, + drops, + packets, + })) + } + } + + impl TypeGenerator for LegalValue { + fn generate(d: &mut D) -> Option> { + let bytes = d.gen_u64(Bound::Included(&0), Bound::Unbounded)?; + let drops = d.gen_u64(Bound::Included(&0), Bound::Unbounded)?; + let packets = d.gen_u64(Bound::Included(&0), Bound::Unbounded)?; + let pps = d.gen_f64(Bound::Included(&0.0), Bound::Unbounded)?; + let bps = d.gen_f64(Bound::Included(&0.0), Bound::Unbounded)?; + + let src_vpc = format!("vpc-{}", d.gen_u32(Bound::Included(&0), Bound::Unbounded)?); + let dst_vpc = format!("vpc-{}", d.gen_u32(Bound::Included(&0), Bound::Unbounded)?); + + Some(LegalValue(VpcPeeringCounters { + name: format!("{src_vpc}--{dst_vpc}"), + src_vpc, + dst_vpc, + bytes, + drops, + packets, + pps, + bps, + })) + } + } + + impl TypeGenerator for LegalValue { + fn generate(d: &mut D) -> Option> { + let num_vpcs = d.gen_usize(Bound::Included(&0), Bound::Included(&10))?; + let vpc_names = (0..=num_vpcs) + .map(|i| format!("vpc-{i}")) + .collect::>(); + + let mut vpc_counters = HashMap::new(); + for vpc_name in &vpc_names { + let bytes = d.gen_u64(Bound::Included(&0), Bound::Unbounded)?; + let drops = d.gen_u64(Bound::Included(&0), Bound::Unbounded)?; + let packets = d.gen_u64(Bound::Included(&0), Bound::Unbounded)?; + + vpc_counters.insert( + vpc_name.clone(), + VpcCounters { + name: vpc_name.clone(), + ..d.produce::>()?.take() + }, + ); + } + + let num_vpc_peerings = d.gen_usize( + Bound::Included(&0), + Bound::Included(&std::cmp::min(num_vpcs * num_vpcs, 10)), + )?; + let mut vpc_peering_counters = HashMap::new(); + + for i in 0..num_vpc_peerings { + let src_vpc = &vpc_names + [d.gen_usize(Bound::Included(&0), Bound::Excluded(&vpc_names.len()))?]; + let dst_vpc = &vpc_names + [d.gen_usize(Bound::Included(&0), Bound::Excluded(&vpc_names.len()))?]; + let name = format!("{src_vpc}--{dst_vpc}"); + + vpc_peering_counters.insert( + name.clone(), + VpcPeeringCounters { + name, + src_vpc: src_vpc.clone(), + dst_vpc: dst_vpc.clone(), + ..d.produce::>()?.take() + }, + ); + } + + Some(LegalValue(DataplaneStatus { + bgp: None, // FIXME implement when tests need this field + dataplane_status: None, // FIXME implement when tests need this field + frr_status: Some(d.produce::>()?.take()), + vpc_counters, + vpc_peering_counters, + interface_runtime: HashMap::new(), // FIXME implement when tests need this field + interface_statuses: Vec::new(), // FIXME implement when tests need this field + vpcs: HashMap::new(), // FIXME implement when tests need this field + })) + } + } +} diff --git a/deny.toml b/deny.toml index a3ba79ee9..1e27376b2 100644 --- a/deny.toml +++ b/deny.toml @@ -67,6 +67,7 @@ allow = [ "NCSA", "Nokia", "OFL-1.1", + "OpenSSL", "OSL-1.0", "OSL-2.0", "OSL-2.1", diff --git a/k8s-intf/Cargo.toml b/k8s-intf/Cargo.toml index 02eb902d2..a3efed406 100644 --- a/k8s-intf/Cargo.toml +++ b/k8s-intf/Cargo.toml @@ -22,7 +22,7 @@ futures = { workspace = true } kube = { workspace = true, features = ["client", "derive", "runtime", "rustls-tls"] } k8s-openapi = { workspace = true, features = ["latest", "schemars", "std"] } linkme = { workspace = true } -rustls = { workspace = true, features = ["ring"] } +rustls = { workspace = true, features = ["aws-lc-rs"] } schemars = { workspace = true, features = ["derive", "std"] } serde = { workspace = true } serde-duration-ext = { workspace = true } diff --git a/k8s-intf/build.rs b/k8s-intf/build.rs index 774ff1701..f0477bf9b 100644 --- a/k8s-intf/build.rs +++ b/k8s-intf/build.rs @@ -65,6 +65,9 @@ fn fixup_types(raw: String) -> String { "idle_timeout: Option", "idle_timeout: Option", ) + .replace("b: Option", "b: Option") + .replace("d: Option", "d: Option") + .replace("p: Option", "p: Option") } fn generate_rust_for_crd(crd_content: &str) -> String { diff --git a/k8s-intf/src/client.rs b/k8s-intf/src/client.rs index a78276c4d..637e83d8b 100644 --- a/k8s-intf/src/client.rs +++ b/k8s-intf/src/client.rs @@ -2,13 +2,14 @@ // Copyright Open Network Fabric Authors use futures::{StreamExt, TryStreamExt}; +use kube::api::PostParams; use kube::runtime::{WatchStreamExt, watcher}; use kube::{Api, Client}; use tracectl::trace_target; use tracing::{error, info}; -use crate::gateway_agent_crd::GatewayAgent; +use crate::gateway_agent_crd::{GatewayAgent, GatewayAgentStatus}; trace_target!("k8s-client", LevelFilter::INFO, &["management"]); @@ -20,6 +21,16 @@ pub enum WatchError { WatcherError(#[from] kube::runtime::watcher::Error), } +#[derive(Debug, thiserror::Error)] +pub enum ReplaceStatusError { + #[error("Client error: {0}")] + ClientError(#[from] kube::Error), + #[error("Serialization error: {0}")] + SerializationError(#[from] serde_json::Error), + #[error("Max conflict retries exceeded")] + MaxConflictRetriesExceeded, +} + /// Watch `GatewayAgent` CRD and call callback for all changes /// /// # Errors @@ -63,3 +74,43 @@ pub async fn watch_gateway_agent_crd( } } } + +const NUM_CONFLICT_RETRIES: usize = 3; + +/// Patch `GatewayAgent` object with current status +/// +/// # Errors +/// Returns an error if the patch request fails. +pub async fn replace_gateway_status( + gateway_object_name: &str, + status: &GatewayAgentStatus, +) -> Result<(), ReplaceStatusError> { + let client = Client::try_default().await?; + let api: Api = Api::namespaced(client.clone(), "fab"); + + for attempt_num in 0..NUM_CONFLICT_RETRIES { + let mut status_obj = api.get_status(gateway_object_name).await?; + status_obj.status = Some(status.clone()); + let status_json = serde_json::to_vec(&status_obj)?; + + match api + .replace_status(gateway_object_name, &PostParams::default(), status_json) + .await + { + Ok(_) => break, + Err(kube::Error::Api(api_error)) => { + if api_error.code == 409 { + if attempt_num < NUM_CONFLICT_RETRIES - 1 { + continue; // Try again, resource version conflict + } + return Err(ReplaceStatusError::MaxConflictRetriesExceeded); + } + return Err(kube::Error::Api(api_error).into()); + } + Err(e) => { + return Err(e.into()); + } + } + } + Ok(()) +} diff --git a/k8s-intf/src/generated/gateway_agent_crd.rs b/k8s-intf/src/generated/gateway_agent_crd.rs index ec699f2ab..2879783f8 100644 --- a/k8s-intf/src/generated/gateway_agent_crd.rs +++ b/k8s-intf/src/generated/gateway_agent_crd.rs @@ -247,16 +247,16 @@ pub struct GatewayAgentStatusStateFrr { pub struct GatewayAgentStatusStatePeerings { /// Bytes is the number of bytes sent on the peering #[serde(default, skip_serializing_if = "Option::is_none")] - pub b: Option, + pub b: Option, /// BytesPerSecond is the number of bytes sent per second on the peering #[serde(default, skip_serializing_if = "Option::is_none")] pub bps: Option, /// Drops is the number of packets dropped on the peering #[serde(default, skip_serializing_if = "Option::is_none")] - pub d: Option, + pub d: Option, /// Packets is the number of packets sent on the peering #[serde(default, skip_serializing_if = "Option::is_none")] - pub p: Option, + pub p: Option, /// PktsPerSecond is the number of packets sent per second on the peering #[serde(default, skip_serializing_if = "Option::is_none")] pub pps: Option, @@ -267,12 +267,12 @@ pub struct GatewayAgentStatusStatePeerings { pub struct GatewayAgentStatusStateVpcs { /// Bytes is the number of bytes sent on the vpc #[serde(default, skip_serializing_if = "Option::is_none")] - pub b: Option, + pub b: Option, /// Drops is the number of packets dropped on the vpc #[serde(default, skip_serializing_if = "Option::is_none")] - pub d: Option, + pub d: Option, /// Packets is the number of packets sent on the vpc #[serde(default, skip_serializing_if = "Option::is_none")] - pub p: Option, + pub p: Option, } diff --git a/mgmt/src/processor/k8s_client.rs b/mgmt/src/processor/k8s_client.rs index e8f6b3462..e144dd3d4 100644 --- a/mgmt/src/processor/k8s_client.rs +++ b/mgmt/src/processor/k8s_client.rs @@ -1,12 +1,19 @@ // SPDX-License-Identifier: Apache-2.0 // Copyright Open Network Fabric Authors +use std::time::SystemTime; + +use chrono::{TimeZone, Utc}; +use config::converters::k8s::ToK8sConversionError; use tokio::sync::mpsc::Sender; -use config::{ExternalConfig, GwConfig}; -use k8s_intf::client::WatchError; -use k8s_intf::watch_gateway_agent_crd; -use tracing::error; +use config::converters::k8s::status::dataplane_status::DataplaneStatusForK8sConversion; +use config::{ExternalConfig, GwConfig, internal::status::DataplaneStatus}; +use k8s_intf::client::{ + ReplaceStatusError, WatchError, replace_gateway_status, watch_gateway_agent_crd, +}; +use k8s_intf::gateway_agent_crd::GatewayAgentStatus; +use tracing::{debug, error}; use crate::processor::proc::{ConfigChannelRequest, ConfigRequest, ConfigResponse}; @@ -14,43 +21,209 @@ use crate::processor::proc::{ConfigChannelRequest, ConfigRequest, ConfigResponse pub enum K8sClientError { #[error("K8s client exited early")] EarlyTermination, - #[error("K8s client could not get hostname: {0}")] - HostnameError(#[from] std::io::Error), #[error("K8s watch failed: {0}")] WatchError(#[from] WatchError), + #[error("Failed to convert dataplane status to k8s format: {0}")] + StatusConversionError(#[from] ToK8sConversionError), + #[error("Failed to patch k8s gateway status: {0}")] + ReplaceStatusError(#[from] ReplaceStatusError), } -pub async fn k8s_start_client( - hostname: &str, - tx: Sender, -) -> Result<(), K8sClientError> { - watch_gateway_agent_crd(hostname, async move |ga| { - let external_config = ExternalConfig::try_from(ga); - match external_config { - Ok(external_config) => { - let gw_config = Box::new(GwConfig::new(external_config)); - - let (req, rx) = ConfigChannelRequest::new(ConfigRequest::ApplyConfig(gw_config)); - let tx_result = tx.send(req).await; - if let Err(e) = tx_result { - error!("Failure sending request to config processor: {e}"); - } - match rx.await { - Err(e) => error!("Failure receiving from config processor: {e}"), - Ok(response) => match response { - ConfigResponse::ApplyConfig(Err(e)) => { - error!("Failed to apply config: {e}"); +async fn get_dataplane_status( + tx: &Sender, +) -> Result { + let (req, rx) = ConfigChannelRequest::new(ConfigRequest::GetDataplaneStatus); + tx.send(req).await.map_err(|_| { + MgmtStatusError::FetchStatusError("Failure relaying status fetch request".to_string()) + })?; + let response = rx.await.map_err(|e| { + MgmtStatusError::FetchStatusError(format!( + "Failure receiving status from config processor: {e}" + )) + })?; + + match response { + ConfigResponse::GetDataplaneStatus(status) => Ok(*status), + _ => unreachable!(), + } +} + +async fn get_current_config( + tx: &Sender, +) -> Result { + let (req, rx) = ConfigChannelRequest::new(ConfigRequest::GetCurrentConfig); + tx.send(req).await.map_err(|_| { + MgmtStatusError::FetchStatusError("Failure relaying get config request".to_string()) + })?; + let response = rx.await.map_err(|e| { + MgmtStatusError::FetchStatusError(format!("Failure receiving config from processor: {e}")) + })?; + match response { + ConfigResponse::GetCurrentConfig(opt_config) => { + opt_config.ok_or(MgmtStatusError::NoConfigApplied) + } + _ => unreachable!(), + } +} + +fn to_datetime(opt_time: Option<&SystemTime>) -> chrono::DateTime { + match opt_time { + Some(time) => chrono::DateTime::::from(*time), + None => Utc.timestamp_opt(0, 0).unwrap(), + } +} + +async fn update_gateway_status(hostname: &str, tx: &Sender) -> () { + let status = get_dataplane_status(tx).await; + + let status = match status { + Ok(status) => status, + Err(err) => { + error!( + "Failed to fetch dataplane status, skipping status update: {}", + err + ); + return; + } + }; + + let (last_applied_gen, last_applied_time) = match get_current_config(tx).await { + Ok(config) => (config.genid(), to_datetime(config.meta.apply_t.as_ref())), + Err(e) => { + error!("Failed to get current config, skipping status update: {e}"); + return; + } + }; + + let k8s_status = match GatewayAgentStatus::try_from(&DataplaneStatusForK8sConversion { + last_applied_gen: Some(last_applied_gen), + last_applied_time: Some(&last_applied_time), + last_collected_time: Some(&chrono::Utc::now()), + status: Some(&status), + }) { + Ok(status) => status, + Err(err) => { + error!("Failed to convert status to GatewayAgentStatus: {err}"); + return; + } + }; + + match replace_gateway_status(hostname, &k8s_status).await { + Ok(()) => (), + Err(err) => { + error!("Failed to update gateway status: {err}"); + } + } +} + +#[derive(Debug, thiserror::Error)] +enum MgmtStatusError { + #[error("Failed to fetch dataplane status: {0}")] + FetchStatusError(String), + #[error("No config is currently applied")] + NoConfigApplied, +} + +pub struct K8sClient { + hostname: String, +} + +impl K8sClient { + pub fn new(hostname: &str) -> Self { + Self { + hostname: hostname.to_string(), + } + } + + pub async fn init(&self) -> Result<(), K8sClientError> { + // Reset the config generation and applied time in K8s + replace_gateway_status( + &self.hostname, + &GatewayAgentStatus { + agent_version: None, + last_applied_gen: Some(0), + last_applied_time: Some( + Utc.timestamp_opt(0, 0) + .unwrap() + .to_rfc3339_opts(chrono::SecondsFormat::Nanos, true), + ), + state: None, + }, + ) + .await?; + Ok(()) + } + + pub async fn k8s_start_config_watch( + &self, + tx: Sender, + ) -> Result<(), K8sClientError> { + // Clone this here so that the closure does not try to borrow self + // and cause K8sClient to not be Send for 'static but only a specific + // lifetime + let hostname = self.hostname.clone(); + watch_gateway_agent_crd(&hostname.clone(), async move |ga| { + let external_config = ExternalConfig::try_from(ga); + match external_config { + Ok(external_config) => { + let genid = external_config.genid; + let current_genid = match get_current_config(&tx).await { + Ok(config) => config.genid(), + Err(e) => match e { + MgmtStatusError::NoConfigApplied => 0, + _ => { + error!("Failed to get current config generation: {e}"); + return; + } } - ConfigResponse::ApplyConfig(Ok(())) => {} - _ => unreachable!(), - }, - }; - } - Err(e) => { - error!("Failed to convert K8sGatewayAgent to ExternalConfig: {e}"); + }; + if current_genid == genid { + debug!("Not applying config, configuration generation unchanged (old={current_genid}, new={genid})"); + return; + } + + let gw_config = Box::new(GwConfig::new(external_config)); + + let (req, rx) = + ConfigChannelRequest::new(ConfigRequest::ApplyConfig(gw_config)); + let tx_result = tx.send(req).await; + if let Err(e) = tx_result { + error!("Failure sending request to config processor: {e}"); + } + match rx.await { + Err(e) => error!("Failure receiving from config processor: {e}"), + Ok(response) => match response { + ConfigResponse::ApplyConfig(Err(e)) => { + error!("Failed to apply config: {e}"); + } + ConfigResponse::ApplyConfig(Ok(())) => { + update_gateway_status(&hostname, &tx).await; + } + _ => unreachable!(), + }, + }; + } + Err(e) => { + error!("Failed to convert K8sGatewayAgent to ExternalConfig: {e}"); + } } + }) + .await?; + Err(K8sClientError::EarlyTermination) + } + + pub async fn k8s_start_status_update( + &self, + tx: Sender, + status_update_interval: &std::time::Duration, + ) -> Result<(), K8sClientError> { + // Clone this here so that the closure does not try to borrow self + // and cause K8sClient to not be Send for 'static but only a specific + // lifetime + let hostname = self.hostname.clone(); + loop { + update_gateway_status(&hostname, &tx).await; + tokio::time::sleep(*status_update_interval).await; } - }) - .await?; - Err(K8sClientError::EarlyTermination) + } } diff --git a/mgmt/src/processor/launch.rs b/mgmt/src/processor/launch.rs index 12a2c004d..2275776f9 100644 --- a/mgmt/src/processor/launch.rs +++ b/mgmt/src/processor/launch.rs @@ -1,8 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // Copyright Open Network Fabric Authors -use crate::processor::k8s_client::K8sClientError; -use crate::processor::k8s_client::k8s_start_client; +use crate::processor::k8s_client::{K8sClient, K8sClientError}; use crate::processor::proc::ConfigChannelRequest; use crate::processor::proc::ConfigProcessor; @@ -19,7 +18,10 @@ use tokio::sync::mpsc::Sender; use tokio_stream::Stream; use tonic::transport::Server; +use futures::future::OptionFuture; + use args::GrpcAddress; +use concurrency::sync::Arc; use tracing::{debug, error, info, warn}; use crate::grpc::server::create_config_service; @@ -187,6 +189,8 @@ pub struct MgmtParams { pub processor_params: ConfigProcessorParams, } +const STATUS_UPDATE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15); + /// Start the mgmt service with either type of socket pub fn start_mgmt( params: MgmtParams, @@ -232,29 +236,56 @@ pub fn start_mgmt( } else { debug!("Will start watching k8s for configuration changes"); rt.block_on(async { + let k8s_client = Arc::new(K8sClient::new(params.hostname.as_str())); let (processor, tx) = ConfigProcessor::new(params.processor_params); - let processor_handle = tokio::spawn(async { processor.run().await }); - let k8s_handle = tokio::spawn(async move { k8s_start_client(params.hostname.as_str(), tx).await }); - tokio::select! { - result = processor_handle => { - match result { - Ok(_) => { - error!("Configuration processor task exited unexpectedly"); - Err(LaunchError::PrematureProcessorExit)? + let tx1 = tx.clone(); + let k8s_client1 = k8s_client.clone(); + + k8s_client.init().await.map_err(|e| { + error!("Failed to initialize k8s state: {}", e); + LaunchError::K8sClientError(e) + })?; + let mut processor_handle = Some(tokio::spawn(async { processor.run().await })); + let mut k8s_config_handle = Some(tokio::spawn(async move { k8s_client.k8s_start_config_watch(tx).await })); + let mut k8s_status_handle = Some(tokio::spawn(async move { + k8s_client1.k8s_start_status_update(tx1, &STATUS_UPDATE_INTERVAL).await + })); + loop { + tokio::select! { + Some(result) = OptionFuture::from(processor_handle.as_mut()) => { + match result { + Ok(_) => { + error!("Configuration processor task exited unexpectedly"); + Err(LaunchError::PrematureProcessorExit)? + } + Err(e) => { Err::<(), LaunchError>(LaunchError::ProcessorJoinError(e)) } } - Err(e) => { Err::<(), LaunchError>(LaunchError::ProcessorJoinError(e)) } } - } - result = k8s_handle => { - match result { - Ok(result) => { result.inspect_err(|e| error!("K8s client task failed: {e}")).map_err(LaunchError::K8sClientError)?; - error!("Kubernetes client task exited unexpectedly"); - Err(LaunchError::PrematureK8sClientExit)? + Some(result) = OptionFuture::from(k8s_config_handle.as_mut()) => { + match result { + Ok(result) => { result.inspect_err(|e| error!("K8s config watch task failed: {e}")).map_err(LaunchError::K8sClientError)?; + error!("Kubernetes config watch task exited unexpectedly"); + Err(LaunchError::PrematureK8sClientExit)? + } + Err(e) => { Err(LaunchError::K8sClientJoinError(e))? } } - Err(e) => { Err(LaunchError::K8sClientJoinError(e))? } } + Some(result) = OptionFuture::from(k8s_status_handle.as_mut()) => { + k8s_status_handle = None; + match result { + Ok(result) => { result.inspect_err(|e| error!("K8s status update task failed: {e}")).map_err(LaunchError::K8sClientError)?; + error!("Kubernetes status update task exited unexpectedly"); + Err(LaunchError::PrematureK8sClientExit)? + } + Err(e) => { Err(LaunchError::K8sClientJoinError(e))? } + } + } + }?; + + if processor_handle.is_none() && k8s_config_handle.is_none() && k8s_status_handle.is_none() { + break; } - }?; + } Ok(()) }) }