Skip to content

Commit 3615b26

Browse files
committed
WIP
1 parent e908347 commit 3615b26

File tree

8 files changed

+461
-11
lines changed

8 files changed

+461
-11
lines changed

Cargo.lock

Lines changed: 328 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ etherparse = { version = "0.19.0", default-features = false, features = [] }
110110
fixin = { git = "https://github.com/githedgehog/fixin", branch = "main", features = [] }
111111
futures = { version = "0.3.31", default-features = false, features = [] }
112112
hashbrown = { version = "0.16.1", default-features = false, features = [] }
113+
hostname = { version = "0.4.2", default-features = false, features = [] }
113114
hwlocality = { version = "1.0.0-alpha.11", default-features = false, features = [] }
114115
hyper = { version = "1.8.1", default-features = false, features = [] }
115116
hyper-util = { version = "0.1.19", default-features = false, features = [] }
@@ -152,6 +153,7 @@ rand = { version = "0.9.2", default-features = false, features = [] }
152153
rkyv = { version = "0.8.12", default-features = false, features = [] }
153154
roaring = { version = "0.11.2", default-features = false, features = [] }
154155
rtnetlink = { git = "https://github.com/githedgehog/rtnetlink.git", branch = "hh/tc-actions3", default-features = false, features = [] }
156+
rustls = { version = "0.23.35", default-features = false, features = [] }
155157
rustyline = { version = "17.0.2", default-features = false, features = [] }
156158
schemars = { version = "1", default-features = false, features = [] }
157159
serde = { version = "1.0.228", default-features = false, features = [] }

k8s-intf/Cargo.toml

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,25 @@ bolero = ["dep:bolero", "dep:hardware", "dep:net", "dep:lpm", "net/test_buffer",
1010

1111
[dependencies]
1212
# internal
13+
hardware = { workspace = true, optional = true, features = ["bolero"] }
14+
lpm = { workspace = true, optional = true }
15+
net = { workspace = true, optional = true, features = ["bolero"] }
16+
tracectl = { workspace = true }
17+
tracing = { workspace = true }
1318

1419
# external
1520
bolero = { workspace = true, optional = true }
16-
hardware = { workspace = true, optional = true, features = [] }
17-
kube = { workspace = true, features = ["derive"] }
21+
futures = { workspace = true }
22+
kube = { workspace = true, features = ["client", "derive", "runtime", "rustls-tls"] }
1823
k8s-openapi = { workspace = true, features = ["latest", "schemars", "std"] }
19-
lpm = { workspace = true, optional = true }
20-
net = { workspace = true, optional = true, features = [] }
24+
linkme = { workspace = true }
25+
rustls = { workspace = true, features = ["ring"] }
2126
schemars = { workspace = true, features = ["derive", "std"] }
2227
serde = { workspace = true }
2328
serde-duration-ext = { workspace = true }
2429
serde_json = { workspace = true }
30+
thiserror = { workspace = true }
31+
tokio = { workspace = true }
2532

2633
[dev-dependencies]
2734
bolero = { workspace = true }

k8s-intf/src/client.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// Copyright Open Network Fabric Authors
3+
4+
use futures::{StreamExt, TryStreamExt};
5+
use kube::runtime::{WatchStreamExt, watcher};
6+
use kube::{Api, Client};
7+
8+
use tracectl::trace_target;
9+
use tracing::{error, info};
10+
11+
use crate::gateway_agent_crd::GatewayAgent;
12+
13+
trace_target!("k8s-client", LevelFilter::INFO, &["management"]);
14+
15+
#[derive(Debug, thiserror::Error)]
16+
pub enum WatchError {
17+
#[error("Client error: {0}")]
18+
ClientError(#[from] kube::Error),
19+
#[error("Watcher error: {0}")]
20+
WatcherError(#[from] kube::runtime::watcher::Error),
21+
}
22+
23+
/// Watch `GatewayAgent` CRD and call callback for all changes
24+
///
25+
/// # Errors
26+
/// Returns an error if the watch fails to start
27+
pub async fn watch_gateway_agent_crd(
28+
gateway_object_name: &str,
29+
callback: impl AsyncFn(&GatewayAgent),
30+
) -> Result<(), WatchError> {
31+
let client = Client::try_default().await?;
32+
// Relevant gateway agent objects are in the "fab" namespace
33+
let gws: Api<GatewayAgent> = Api::namespaced(client.clone(), "fab");
34+
35+
info!("Starting K8s GatewayAgent watcher...");
36+
37+
let watch_config = watcher::Config {
38+
// The service account for this gateway only has access to its corresponding
39+
// gateway agent object, so specifically filter for that to avoid an auth error
40+
// and to not apply incorrect configurations intended for other gateways
41+
field_selector: Some(format!("metadata.name={gateway_object_name}")),
42+
// The default initial list strategy attempts to list all gateway objects via the k8s
43+
// api and then filters them locally. But, the service account for this gateway does
44+
// not have permission to list all gateway objects. Instead, we use the streaming list
45+
// initial list strategy which directly calls the k8s watch api with the appropriate
46+
// watch config that includes the field selector.
47+
initial_list_strategy: watcher::InitialListStrategy::StreamingList,
48+
..Default::default()
49+
};
50+
let mut stream = watcher(gws, watch_config)
51+
.default_backoff()
52+
.applied_objects()
53+
.boxed();
54+
55+
loop {
56+
match stream.try_next().await {
57+
Ok(Some(ga)) => callback(&ga).await,
58+
Ok(None) => {}
59+
// Should we check for retriable vs non-retriable errors here?
60+
Err(err) => {
61+
error!("Watcher error: {err}");
62+
}
63+
}
64+
}
65+
}

k8s-intf/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@
77

88
#[cfg(any(test, feature = "bolero"))]
99
pub mod bolero;
10+
pub mod client;
1011
pub mod generated;
1112

1213
pub mod gateway_agent_crd {
1314
pub use crate::generated::gateway_agent_crd::*;
1415
}
16+
17+
pub use client::watch_gateway_agent_crd;

mgmt/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ concurrency = { workspace = true }
2323
gateway_config = { workspace = true }
2424
id = { workspace = true }
2525
interface-manager = { workspace = true }
26+
k8s-intf = { workspace = true }
2627
lpm = { workspace = true }
2728
nat = { workspace = true }
2829
net = { workspace = true }
@@ -42,6 +43,7 @@ caps = { workspace = true, default-features = false, features = [] }
4243
chrono = { workspace = true }
4344
derive_builder = { workspace = true, default-features = false, features = ["default"] }
4445
futures = { workspace = true, features = ["default"] }
46+
hostname = { workspace = true }
4547
linkme = { workspace = true }
4648
multi_index_map = { workspace = true, features = ["serde"] }
4749
netdev = { workspace = true }

mgmt/src/processor/k8s_client.rs

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,58 @@
33

44
use tokio::sync::mpsc::Sender;
55

6-
use crate::processor::proc::ConfigChannelRequest;
6+
use config::{ExternalConfig, GwConfig};
7+
use k8s_intf::client::WatchError;
8+
use k8s_intf::watch_gateway_agent_crd;
9+
use tracing::error;
10+
11+
use crate::processor::proc::{ConfigChannelRequest, ConfigRequest, ConfigResponse};
712

813
#[derive(Debug, thiserror::Error)]
914
pub enum K8sClientError {
10-
// Define error variants here
15+
#[error("K8s client exited early")]
16+
EarlyTermination,
17+
#[error("K8s client could not get hostname: {0}")]
18+
HostnameError(#[from] std::io::Error),
19+
#[error("K8s watch failed: {0}")]
20+
WatchError(#[from] WatchError),
1121
}
1222

13-
pub async fn k8s_start_client(_tx: Sender<ConfigChannelRequest>) -> Result<(), K8sClientError> {
14-
unimplemented!()
23+
pub async fn k8s_start_client(tx: Sender<ConfigChannelRequest>) -> Result<(), K8sClientError> {
24+
let hn = hostname::get()?
25+
.to_str()
26+
.ok_or(K8sClientError::HostnameError(std::io::Error::other(
27+
"hostname is not valid unicode",
28+
)))?
29+
.to_string();
30+
31+
watch_gateway_agent_crd(&hn, async move |ga| {
32+
let external_config = ExternalConfig::try_from(ga);
33+
match external_config {
34+
Ok(external_config) => {
35+
let gw_config = Box::new(GwConfig::new(external_config));
36+
37+
let (req, rx) = ConfigChannelRequest::new(ConfigRequest::ApplyConfig(gw_config));
38+
let tx_result = tx.send(req).await;
39+
if let Err(e) = tx_result {
40+
error!("Failure sending request to config processor: {e}");
41+
}
42+
match rx.await {
43+
Err(e) => error!("Failure receiving from config processor: {e}"),
44+
Ok(response) => match response {
45+
ConfigResponse::ApplyConfig(Err(e)) => {
46+
error!("Failed to apply config: {e}");
47+
}
48+
ConfigResponse::ApplyConfig(Ok(())) => {}
49+
_ => unreachable!(),
50+
},
51+
};
52+
}
53+
Err(e) => {
54+
error!("Failed to convert K8sGatewayAgent to ExternalConfig: {e}");
55+
}
56+
}
57+
})
58+
.await?;
59+
Err(K8sClientError::EarlyTermination)
1560
}

mgmt/src/processor/launch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ pub fn start_mgmt(
233233
rt.block_on(async {
234234
let (processor, tx) = ConfigProcessor::new(params.processor_params);
235235
let processor_handle = tokio::spawn(async { processor.run().await });
236-
let k8s_handle = tokio::spawn(async move { k8s_start_client(tx.clone()).await });
236+
let k8s_handle = tokio::spawn(async move { k8s_start_client(tx).await });
237237
tokio::select! {
238238
result = processor_handle => {
239239
match result {

0 commit comments

Comments
 (0)