Skip to content

Commit 305718b

Browse files
committed
feat(mgmt, dataplane): Add previously unimplmented k8s config processor
Adds a k8s config processor using the k8s-intf watcher client. The watcher needs the hostname as well, so MgmtParams is augmented to accept hostname as a configuration parameter. Signed-off-by: Manish Vachharajani <[email protected]>
1 parent 06c5921 commit 305718b

File tree

6 files changed

+58
-5
lines changed

6 files changed

+58
-5
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dataplane/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ ctrlc = { workspace = true, features = ["termination"] }
1616
dpdk = { workspace = true }
1717
dyn-iter = { workspace = true }
1818
futures = { workspace = true }
19+
hostname = { workspace = true }
1920
hyper = { workspace = true }
2021
hyper-util = { workspace = true }
2122
id = { workspace = true }

dataplane/src/main.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,15 @@ fn main() {
130130
let pipeline_factory = setup.pipeline;
131131

132132
/* start management */
133+
let hostname = hostname::get()
134+
.expect("Failed to get hostname")
135+
.to_str()
136+
.expect("hostname is not valid unicode")
137+
.to_string();
138+
133139
start_mgmt(MgmtParams {
134140
grpc_addr,
141+
hostname,
135142
processor_params: ConfigProcessorParams {
136143
router_ctl: setup.router.get_ctl_tx(),
137144
vpcmapw: setup.vpcmapw,

mgmt/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ concurrency = { workspace = true }
2323
gateway_config = { workspace = true }
2424
id = { workspace = true }
2525
interface-manager = { workspace = true }
26+
k8s-intf = { workspace = true }
2627
lpm = { workspace = true }
2728
nat = { workspace = true }
2829
net = { workspace = true }

mgmt/src/processor/k8s_client.rs

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,54 @@
33

44
use tokio::sync::mpsc::Sender;
55

6-
use crate::processor::proc::ConfigChannelRequest;
6+
use config::{ExternalConfig, GwConfig};
7+
use k8s_intf::client::WatchError;
8+
use k8s_intf::watch_gateway_agent_crd;
9+
use tracing::error;
10+
11+
use crate::processor::proc::{ConfigChannelRequest, ConfigRequest, ConfigResponse};
712

813
#[derive(Debug, thiserror::Error)]
914
pub enum K8sClientError {
10-
// Define error variants here
15+
#[error("K8s client exited early")]
16+
EarlyTermination,
17+
#[error("K8s client could not get hostname: {0}")]
18+
HostnameError(#[from] std::io::Error),
19+
#[error("K8s watch failed: {0}")]
20+
WatchError(#[from] WatchError),
1121
}
1222

13-
pub async fn k8s_start_client(_tx: Sender<ConfigChannelRequest>) -> Result<(), K8sClientError> {
14-
unimplemented!()
23+
pub async fn k8s_start_client(
24+
hostname: &str,
25+
tx: Sender<ConfigChannelRequest>,
26+
) -> Result<(), K8sClientError> {
27+
watch_gateway_agent_crd(hostname, async move |ga| {
28+
let external_config = ExternalConfig::try_from(ga);
29+
match external_config {
30+
Ok(external_config) => {
31+
let gw_config = Box::new(GwConfig::new(external_config));
32+
33+
let (req, rx) = ConfigChannelRequest::new(ConfigRequest::ApplyConfig(gw_config));
34+
let tx_result = tx.send(req).await;
35+
if let Err(e) = tx_result {
36+
error!("Failure sending request to config processor: {e}");
37+
}
38+
match rx.await {
39+
Err(e) => error!("Failure receiving from config processor: {e}"),
40+
Ok(response) => match response {
41+
ConfigResponse::ApplyConfig(Err(e)) => {
42+
error!("Failed to apply config: {e}");
43+
}
44+
ConfigResponse::ApplyConfig(Ok(())) => {}
45+
_ => unreachable!(),
46+
},
47+
};
48+
}
49+
Err(e) => {
50+
error!("Failed to convert K8sGatewayAgent to ExternalConfig: {e}");
51+
}
52+
}
53+
})
54+
.await?;
55+
Err(K8sClientError::EarlyTermination)
1556
}

mgmt/src/processor/launch.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ impl Display for ServerAddress {
183183

184184
pub struct MgmtParams {
185185
pub grpc_addr: Option<GrpcAddress>,
186+
pub hostname: String,
186187
pub processor_params: ConfigProcessorParams,
187188
}
188189

@@ -233,7 +234,7 @@ pub fn start_mgmt(
233234
rt.block_on(async {
234235
let (processor, tx) = ConfigProcessor::new(params.processor_params);
235236
let processor_handle = tokio::spawn(async { processor.run().await });
236-
let k8s_handle = tokio::spawn(async move { k8s_start_client(tx).await });
237+
let k8s_handle = tokio::spawn(async move { k8s_start_client(params.hostname.as_str(), tx).await });
237238
tokio::select! {
238239
result = processor_handle => {
239240
match result {

0 commit comments

Comments
 (0)