Skip to content

Commit 5664045

Browse files
committed
feat(mgmt): Add task to periodically patch status for gateway
Adds a tokio task to periodically update the status directly with the k8s API. Signed-off-by: Manish Vachharajani <[email protected]>
1 parent e0eba8d commit 5664045

File tree

2 files changed

+204
-54
lines changed

2 files changed

+204
-54
lines changed

mgmt/src/processor/k8s_client.rs

Lines changed: 156 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// Copyright Open Network Fabric Authors
33

4+
use chrono::{TimeZone, Utc};
5+
use config::converters::k8s::ToK8sConversionError;
46
use tokio::sync::mpsc::Sender;
57

6-
use config::{ExternalConfig, GwConfig};
7-
use k8s_intf::client::WatchError;
8-
use k8s_intf::watch_gateway_agent_crd;
8+
use config::converters::k8s::status::dataplane_status::DataplaneStatusForK8sConversion;
9+
use config::{ExternalConfig, GwConfig, internal::status::DataplaneStatus};
10+
use k8s_intf::client::{PatchError, WatchError, patch_gateway_status, watch_gateway_agent_crd};
11+
use k8s_intf::gateway_agent_crd::GatewayAgentStatus;
912
use tracing::error;
1013

1114
use crate::processor::proc::{ConfigChannelRequest, ConfigRequest, ConfigResponse};
@@ -14,43 +17,161 @@ use crate::processor::proc::{ConfigChannelRequest, ConfigRequest, ConfigResponse
1417
pub enum K8sClientError {
1518
#[error("K8s client exited early")]
1619
EarlyTermination,
17-
#[error("K8s client could not get hostname: {0}")]
18-
HostnameError(#[from] std::io::Error),
1920
#[error("K8s watch failed: {0}")]
2021
WatchError(#[from] WatchError),
22+
#[error("Failed to convert dataplane status to k8s format: {0}")]
23+
StatusConversionError(#[from] ToK8sConversionError),
24+
#[error("Failed to patch k8s gateway status: {0}")]
25+
PatchStatusError(#[from] PatchError),
2126
}
2227

23-
pub async fn k8s_start_client(
24-
hostname: &str,
25-
tx: Sender<ConfigChannelRequest>,
26-
) -> Result<(), K8sClientError> {
27-
watch_gateway_agent_crd(hostname, async move |ga| {
28-
let external_config = ExternalConfig::try_from(ga);
29-
match external_config {
30-
Ok(external_config) => {
31-
let gw_config = Box::new(GwConfig::new(external_config));
32-
33-
let (req, rx) = ConfigChannelRequest::new(ConfigRequest::ApplyConfig(gw_config));
34-
let tx_result = tx.send(req).await;
35-
if let Err(e) = tx_result {
36-
error!("Failure sending request to config processor: {e}");
28+
async fn get_dataplane_status(
29+
tx: &Sender<ConfigChannelRequest>,
30+
) -> Result<DataplaneStatus, MgmtStatusError> {
31+
let (req, rx) = ConfigChannelRequest::new(ConfigRequest::GetDataplaneStatus);
32+
tx.send(req).await.map_err(|_| {
33+
MgmtStatusError::FetchStatusError("Failure relaying status fetch request".to_string())
34+
})?;
35+
let response = rx.await.map_err(|_| {
36+
MgmtStatusError::FetchStatusError(
37+
"Failure receiving status from config processor".to_string(),
38+
)
39+
})?;
40+
41+
match response {
42+
ConfigResponse::GetDataplaneStatus(status) => Ok(*status),
43+
_ => unreachable!(),
44+
}
45+
}
46+
47+
#[derive(Debug, thiserror::Error)]
48+
enum MgmtStatusError {
49+
#[error("Failed to fetch dataplane status: {0}")]
50+
FetchStatusError(String),
51+
}
52+
53+
/// Handle used to exchange gateway configuration and status with the k8s API.
pub struct K8sClient {
    /// Hostname handed to the `k8s_intf` watch and patch calls
    /// (presumably the name of this gateway's object in k8s; confirm against
    /// `k8s_intf`).
    hostname: String,
}
56+
57+
impl K8sClient {
58+
pub fn new(hostname: &str) -> Self {
59+
Self {
60+
hostname: hostname.to_string(),
61+
}
62+
}
63+
64+
pub async fn init(&self) -> Result<(), K8sClientError> {
65+
// Reset the config generation and applied time in K8s
66+
patch_gateway_status(
67+
&self.hostname,
68+
&GatewayAgentStatus {
69+
agent_version: Some("(none: agentless)".to_string()),
70+
last_applied_gen: Some(0),
71+
last_applied_time: Some(
72+
Utc.timestamp_opt(0, 0)
73+
.unwrap()
74+
.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true),
75+
),
76+
state: None,
77+
},
78+
)
79+
.await?;
80+
Ok(())
81+
}
82+
83+
pub async fn k8s_start_config_watch(
84+
&self,
85+
tx: Sender<ConfigChannelRequest>,
86+
) -> Result<(), K8sClientError> {
87+
// Clone this here so that the closure does not try to borrow self
88+
// and cause K8sClient to not be Send for 'static but only a specific
89+
// lifetime
90+
let hostname = self.hostname.clone();
91+
watch_gateway_agent_crd(&hostname.clone(), async move |ga| {
92+
let external_config = ExternalConfig::try_from(ga);
93+
match external_config {
94+
Ok(external_config) => {
95+
let genid = external_config.genid;
96+
let gw_config = Box::new(GwConfig::new(external_config));
97+
98+
let (req, rx) =
99+
ConfigChannelRequest::new(ConfigRequest::ApplyConfig(gw_config));
100+
let tx_result = tx.send(req).await;
101+
if let Err(e) = tx_result {
102+
error!("Failure sending request to config processor: {e}");
103+
}
104+
match rx.await {
105+
Err(e) => error!("Failure receiving from config processor: {e}"),
106+
Ok(response) => match response {
107+
ConfigResponse::ApplyConfig(Err(e)) => {
108+
error!("Failed to apply config: {e}");
109+
}
110+
ConfigResponse::ApplyConfig(Ok(())) => {
111+
let last_applied_time = Some(chrono::Utc::now());
112+
let k8s_status = match GatewayAgentStatus::try_from(
113+
&DataplaneStatusForK8sConversion {
114+
last_applied_gen: Some(genid),
115+
last_applied_time: last_applied_time.as_ref(),
116+
last_collected_time: None,
117+
status: None,
118+
},
119+
) {
120+
Ok(v) => Some(v),
121+
Err(e) => { error!("Unable to build object to patch k8s status with applied generation: {e}"); None }
122+
123+
};
124+
125+
if let Some(k8s_status) = k8s_status {
126+
match patch_gateway_status(&hostname, &k8s_status).await {
127+
Ok(()) => {},
128+
Err(e) => {error!("Unable to patch k8s last_applied_gen and timestamp: {e}"); }
129+
}
130+
}
131+
}
132+
_ => unreachable!(),
133+
},
134+
};
135+
}
136+
Err(e) => {
137+
error!("Failed to convert K8sGatewayAgent to ExternalConfig: {e}");
37138
}
38-
match rx.await {
39-
Err(e) => error!("Failure receiving from config processor: {e}"),
40-
Ok(response) => match response {
41-
ConfigResponse::ApplyConfig(Err(e)) => {
42-
error!("Failed to apply config: {e}");
43-
}
44-
ConfigResponse::ApplyConfig(Ok(())) => {}
45-
_ => unreachable!(),
46-
},
47-
};
48-
}
49-
Err(e) => {
50-
error!("Failed to convert K8sGatewayAgent to ExternalConfig: {e}");
51139
}
140+
})
141+
.await?;
142+
Err(K8sClientError::EarlyTermination)
143+
}
144+
145+
pub async fn k8s_start_status_update(
146+
&self,
147+
tx: Sender<ConfigChannelRequest>,
148+
status_update_interval: &std::time::Duration,
149+
) -> Result<(), K8sClientError> {
150+
// Clone this here so that the closure does not try to borrow self
151+
// and cause K8sClient to not be Send for 'static but only a specific
152+
// lifetime
153+
let hostname = self.hostname.clone();
154+
loop {
155+
let status = get_dataplane_status(&tx).await;
156+
157+
let status = match status {
158+
Ok(status) => status,
159+
Err(err) => {
160+
error!("Failed to fetch dataplane status: {}", err);
161+
continue;
162+
}
163+
};
164+
165+
let k8s_status = GatewayAgentStatus::try_from(&DataplaneStatusForK8sConversion {
166+
last_applied_gen: None,
167+
last_applied_time: None,
168+
last_collected_time: Some(&chrono::Utc::now()),
169+
status: Some(&status),
170+
})?;
171+
patch_gateway_status(&hostname, &k8s_status).await?;
172+
173+
// Process status update
174+
tokio::time::sleep(*status_update_interval).await;
52175
}
53-
})
54-
.await?;
55-
Err(K8sClientError::EarlyTermination)
176+
}
56177
}

mgmt/src/processor/launch.rs

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// Copyright Open Network Fabric Authors
33

4-
use crate::processor::k8s_client::K8sClientError;
5-
use crate::processor::k8s_client::k8s_start_client;
4+
use crate::processor::k8s_client::{K8sClient, K8sClientError};
65
use crate::processor::proc::ConfigChannelRequest;
76
use crate::processor::proc::ConfigProcessor;
87

@@ -19,7 +18,10 @@ use tokio::sync::mpsc::Sender;
1918
use tokio_stream::Stream;
2019
use tonic::transport::Server;
2120

21+
use futures::future::OptionFuture;
22+
2223
use args::GrpcAddress;
24+
use concurrency::sync::Arc;
2325
use tracing::{debug, error, info, warn};
2426

2527
use crate::grpc::server::create_config_service;
@@ -187,6 +189,8 @@ pub struct MgmtParams {
187189
pub processor_params: ConfigProcessorParams,
188190
}
189191

192+
const STATUS_UPDATE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15);
193+
190194
/// Start the mgmt service with either type of socket
191195
pub fn start_mgmt(
192196
params: MgmtParams,
@@ -232,29 +236,54 @@ pub fn start_mgmt(
232236
} else {
233237
debug!("Will start watching k8s for configuration changes");
234238
rt.block_on(async {
239+
let k8s_client = Arc::new(K8sClient::new(params.hostname.as_str()));
235240
let (processor, tx) = ConfigProcessor::new(params.processor_params);
236-
let processor_handle = tokio::spawn(async { processor.run().await });
237-
let k8s_handle = tokio::spawn(async move { k8s_start_client(params.hostname.as_str(), tx).await });
238-
tokio::select! {
239-
result = processor_handle => {
240-
match result {
241-
Ok(_) => {
242-
error!("Configuration processor task exited unexpectedly");
243-
Err(LaunchError::PrematureProcessorExit)?
241+
let tx1 = tx.clone();
242+
let k8s_client1 = k8s_client.clone();
243+
244+
k8s_client.init().await.map_err(|e| {
245+
error!("Failed to initialize k8s state: {}", e);
246+
LaunchError::K8sClientError(e)
247+
})?;
248+
let mut processor_handle = Some(tokio::spawn(async { processor.run().await }));
249+
let mut k8s_config_handle = Some(tokio::spawn(async move { k8s_client.k8s_start_config_watch(tx).await }));
250+
let mut k8s_status_handle = Some(tokio::spawn(async move { k8s_client1.k8s_start_status_update(tx1, &STATUS_UPDATE_INTERVAL).await }));
251+
loop {
252+
tokio::select! {
253+
Some(result) = OptionFuture::from(processor_handle.as_mut()) => {
254+
match result {
255+
Ok(_) => {
256+
error!("Configuration processor task exited unexpectedly");
257+
Err(LaunchError::PrematureProcessorExit)?
258+
}
259+
Err(e) => { Err::<(), LaunchError>(LaunchError::ProcessorJoinError(e)) }
244260
}
245-
Err(e) => { Err::<(), LaunchError>(LaunchError::ProcessorJoinError(e)) }
246261
}
247-
}
248-
result = k8s_handle => {
249-
match result {
250-
Ok(result) => { result.inspect_err(|e| error!("K8s client task failed: {e}")).map_err(LaunchError::K8sClientError)?;
251-
error!("Kubernetes client task exited unexpectedly");
252-
Err(LaunchError::PrematureK8sClientExit)?
262+
Some(result) = OptionFuture::from(k8s_config_handle.as_mut()) => {
263+
match result {
264+
Ok(result) => { result.inspect_err(|e| error!("K8s config watch task failed: {e}")).map_err(LaunchError::K8sClientError)?;
265+
error!("Kubernetes config watch task exited unexpectedly");
266+
Err(LaunchError::PrematureK8sClientExit)?
267+
}
268+
Err(e) => { Err(LaunchError::K8sClientJoinError(e))? }
253269
}
254-
Err(e) => { Err(LaunchError::K8sClientJoinError(e))? }
255270
}
271+
Some(result) = OptionFuture::from(k8s_status_handle.as_mut()) => {
272+
k8s_status_handle = None;
273+
match result {
274+
Ok(result) => { result.inspect_err(|e| error!("K8s status update task failed: {e}")).map_err(LaunchError::K8sClientError)?;
275+
error!("Kubernetes status update task exited unexpectedly");
276+
Err(LaunchError::PrematureK8sClientExit)?
277+
}
278+
Err(e) => { Err(LaunchError::K8sClientJoinError(e))? }
279+
}
280+
}
281+
}?;
282+
283+
if processor_handle.is_none() && k8s_config_handle.is_none() && k8s_status_handle.is_none() {
284+
break;
256285
}
257-
}?;
286+
}
258287
Ok(())
259288
})
260289
}

0 commit comments

Comments
 (0)