Commit 6078874

feat(mgmt): Add task to periodically patch status for gateway
Adds a tokio task to periodically update the status directly with the k8s API, patches the status whenever a new config generation is applied, and fixes an issue where the same config generation was repeatedly applied.

Signed-off-by: Manish Vachharajani <[email protected]>
1 parent 720661c
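For orientation before the diffs: the periodic updater this commit introduces is a plain spawn-and-sleep tokio task. The sketch below is a minimal, self-contained illustration of that pattern under stated assumptions, not the commit's code: `patch_status` and the `gw-node-1` hostname are hypothetical stand-ins for the real `replace_gateway_status` call and node name, and the interval is shortened so the example terminates.

```rust
use std::time::Duration;

// Hypothetical stand-in for the real replace_gateway_status() call.
async fn patch_status(hostname: &str) {
    println!("patching gateway status for {hostname}");
}

#[tokio::main]
async fn main() {
    let hostname = "gw-node-1".to_string(); // placeholder node name
    // Periodic updater in the style of K8sClient::k8s_start_status_update:
    // patch the status, then sleep for the interval. The real task loops
    // forever; this demo is bounded so it exits.
    let updater = tokio::spawn(async move {
        for _ in 0..3 {
            patch_status(&hostname).await;
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    });
    updater.await.expect("updater task panicked");
}
```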

File tree: 2 files changed (+259, -55 lines)

mgmt/src/processor/k8s_client.rs

Lines changed: 209 additions & 36 deletions
```diff
@@ -1,56 +1,229 @@
 // SPDX-License-Identifier: Apache-2.0
 // Copyright Open Network Fabric Authors
 
+use std::time::SystemTime;
+
+use chrono::{TimeZone, Utc};
+use config::converters::k8s::ToK8sConversionError;
 use tokio::sync::mpsc::Sender;
 
-use config::{ExternalConfig, GwConfig};
-use k8s_intf::client::WatchError;
-use k8s_intf::watch_gateway_agent_crd;
-use tracing::error;
+use config::converters::k8s::status::dataplane_status::DataplaneStatusForK8sConversion;
+use config::{ExternalConfig, GwConfig, internal::status::DataplaneStatus};
+use k8s_intf::client::{
+    ReplaceStatusError, WatchError, replace_gateway_status, watch_gateway_agent_crd,
+};
+use k8s_intf::gateway_agent_crd::GatewayAgentStatus;
+use tracing::{debug, error};
 
 use crate::processor::proc::{ConfigChannelRequest, ConfigRequest, ConfigResponse};
 
 #[derive(Debug, thiserror::Error)]
 pub enum K8sClientError {
     #[error("K8s client exited early")]
     EarlyTermination,
-    #[error("K8s client could not get hostname: {0}")]
-    HostnameError(#[from] std::io::Error),
     #[error("K8s watch failed: {0}")]
     WatchError(#[from] WatchError),
+    #[error("Failed to convert dataplane status to k8s format: {0}")]
+    StatusConversionError(#[from] ToK8sConversionError),
+    #[error("Failed to patch k8s gateway status: {0}")]
+    ReplaceStatusError(#[from] ReplaceStatusError),
 }
 
-pub async fn k8s_start_client(
-    hostname: &str,
-    tx: Sender<ConfigChannelRequest>,
-) -> Result<(), K8sClientError> {
-    watch_gateway_agent_crd(hostname, async move |ga| {
-        let external_config = ExternalConfig::try_from(ga);
-        match external_config {
-            Ok(external_config) => {
-                let gw_config = Box::new(GwConfig::new(external_config));
-
-                let (req, rx) = ConfigChannelRequest::new(ConfigRequest::ApplyConfig(gw_config));
-                let tx_result = tx.send(req).await;
-                if let Err(e) = tx_result {
-                    error!("Failure sending request to config processor: {e}");
-                }
-                match rx.await {
-                    Err(e) => error!("Failure receiving from config processor: {e}"),
-                    Ok(response) => match response {
-                        ConfigResponse::ApplyConfig(Err(e)) => {
-                            error!("Failed to apply config: {e}");
+async fn get_dataplane_status(
+    tx: &Sender<ConfigChannelRequest>,
+) -> Result<DataplaneStatus, MgmtStatusError> {
+    let (req, rx) = ConfigChannelRequest::new(ConfigRequest::GetDataplaneStatus);
+    tx.send(req).await.map_err(|_| {
+        MgmtStatusError::FetchStatusError("Failure relaying status fetch request".to_string())
+    })?;
+    let response = rx.await.map_err(|_| {
+        MgmtStatusError::FetchStatusError(
+            "Failure receiving status from config processor".to_string(),
+        )
+    })?;
+
+    match response {
+        ConfigResponse::GetDataplaneStatus(status) => Ok(*status),
+        _ => unreachable!(),
+    }
+}
+
+async fn get_current_config(
+    tx: &Sender<ConfigChannelRequest>,
+) -> Result<GwConfig, MgmtStatusError> {
+    let (req, rx) = ConfigChannelRequest::new(ConfigRequest::GetCurrentConfig);
+    tx.send(req).await.map_err(|_| {
+        MgmtStatusError::FetchStatusError("Failure relaying get config request".to_string())
+    })?;
+    let response = rx.await.map_err(|_| {
+        MgmtStatusError::FetchStatusError("Failure receiving config from processor".to_string())
+    })?;
+    match response {
+        ConfigResponse::GetCurrentConfig(opt_config) => {
+            opt_config.ok_or(MgmtStatusError::NoConfigApplied)
+        }
+        _ => unreachable!(),
+    }
+}
+
+fn to_datetime(opt_time: Option<&SystemTime>) -> chrono::DateTime<Utc> {
+    match opt_time {
+        Some(time) => chrono::DateTime::<Utc>::from(*time),
+        None => Utc.timestamp_opt(0, 0).unwrap(),
+    }
+}
+
+async fn update_gateway_status(hostname: &str, tx: &Sender<ConfigChannelRequest>) -> () {
+    let status = get_dataplane_status(tx).await;
+
+    let status = match status {
+        Ok(status) => status,
+        Err(err) => {
+            error!(
+                "Failed to fetch dataplane status, skipping status update: {}",
+                err
+            );
+            return;
+        }
+    };
+
+    let (last_applied_gen, last_applied_time) = match get_current_config(tx).await {
+        Ok(config) => (config.genid(), to_datetime(config.meta.apply_t.as_ref())),
+        Err(e) => {
+            error!("Failed to get current config, skipping status update: {e}");
+            return;
+        }
+    };
+
+    let k8s_status = match GatewayAgentStatus::try_from(&DataplaneStatusForK8sConversion {
+        last_applied_gen: Some(last_applied_gen),
+        last_applied_time: Some(&last_applied_time),
+        last_collected_time: Some(&chrono::Utc::now()),
+        status: Some(&status),
+    }) {
+        Ok(status) => status,
+        Err(err) => {
+            error!("Failed to convert status to GatewayAgentStatus: {err}");
+            return;
+        }
+    };
+
+    match replace_gateway_status(hostname, &k8s_status).await {
+        Ok(()) => (),
+        Err(err) => {
+            error!("Failed to update gateway status: {err}");
+        }
+    }
+}
+
+#[derive(Debug, thiserror::Error)]
+enum MgmtStatusError {
+    #[error("Failed to fetch dataplane status: {0}")]
+    FetchStatusError(String),
+    #[error("No config is currently applied")]
+    NoConfigApplied,
+}
+
+pub struct K8sClient {
+    hostname: String,
+}
+
+impl K8sClient {
+    pub fn new(hostname: &str) -> Self {
+        Self {
+            hostname: hostname.to_string(),
+        }
+    }
+
+    pub async fn init(&self) -> Result<(), K8sClientError> {
+        // Reset the config generation and applied time in K8s
+        replace_gateway_status(
+            &self.hostname,
+            &GatewayAgentStatus {
+                agent_version: None,
+                last_applied_gen: Some(0),
+                last_applied_time: Some(
+                    Utc.timestamp_opt(0, 0)
+                        .unwrap()
+                        .to_rfc3339_opts(chrono::SecondsFormat::Nanos, true),
+                ),
+                state: None,
+            },
+        )
+        .await?;
+        Ok(())
+    }
+
+    pub async fn k8s_start_config_watch(
+        &self,
+        tx: Sender<ConfigChannelRequest>,
+    ) -> Result<(), K8sClientError> {
+        // Clone this here so that the closure does not try to borrow self
+        // and cause K8sClient to not be Send for 'static but only a specific
+        // lifetime
+        let hostname = self.hostname.clone();
+        watch_gateway_agent_crd(&hostname.clone(), async move |ga| {
+            let external_config = ExternalConfig::try_from(ga);
+            match external_config {
+                Ok(external_config) => {
+                    let genid = external_config.genid;
+                    let current_genid = match get_current_config(&tx).await {
+                        Ok(config) => config.genid(),
+                        Err(e) => match e {
+                            MgmtStatusError::NoConfigApplied => 0,
+                            _ => {
+                                error!("Failed to get current config generation: {e}");
+                                return;
+                            }
                         }
-                        ConfigResponse::ApplyConfig(Ok(())) => {}
-                        _ => unreachable!(),
-                    },
-                };
-            }
-            Err(e) => {
-                error!("Failed to convert K8sGatewayAgent to ExternalConfig: {e}");
+                    };
+                    if current_genid == genid {
+                        debug!("Not applying config, configuration generation unchanged (old={current_genid}, new={genid})");
+                        return;
+                    }
+
+                    let gw_config = Box::new(GwConfig::new(external_config));
+
+                    let (req, rx) =
+                        ConfigChannelRequest::new(ConfigRequest::ApplyConfig(gw_config));
+                    let tx_result = tx.send(req).await;
+                    if let Err(e) = tx_result {
+                        error!("Failure sending request to config processor: {e}");
+                    }
+                    match rx.await {
+                        Err(e) => error!("Failure receiving from config processor: {e}"),
+                        Ok(response) => match response {
+                            ConfigResponse::ApplyConfig(Err(e)) => {
+                                error!("Failed to apply config: {e}");
+                            }
+                            ConfigResponse::ApplyConfig(Ok(())) => {
+                                update_gateway_status(&hostname, &tx).await;
+                            }
+                            _ => unreachable!(),
+                        },
+                    };
+                }
+                Err(e) => {
+                    error!("Failed to convert K8sGatewayAgent to ExternalConfig: {e}");
+                }
             }
+        })
+        .await?;
+        Err(K8sClientError::EarlyTermination)
+    }
+
+    pub async fn k8s_start_status_update(
+        &self,
+        tx: Sender<ConfigChannelRequest>,
+        status_update_interval: &std::time::Duration,
+    ) -> Result<(), K8sClientError> {
+        // Clone this here so that the closure does not try to borrow self
+        // and cause K8sClient to not be Send for 'static but only a specific
+        // lifetime
+        let hostname = self.hostname.clone();
+        loop {
+            update_gateway_status(&hostname, &tx).await;
+            tokio::time::sleep(*status_update_interval).await;
        }
-    })
-    .await?;
-    Err(K8sClientError::EarlyTermination)
+    }
 }
```
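The `get_dataplane_status` and `get_current_config` helpers above use a request/response pattern: each request sent over the `mpsc` channel carries a one-shot reply channel on which the config processor posts its answer. The following is a rough, self-contained sketch of that pattern; the `Request` struct and `ask` helper are hypothetical illustrations, not the crate's actual `ConfigChannelRequest` API.

```rust
use tokio::sync::{mpsc, oneshot};

// Hypothetical request type modeled on ConfigChannelRequest: the request
// carries a oneshot sender on which the processor posts its reply.
struct Request {
    query: String,
    respond: oneshot::Sender<String>,
}

// Send a request and await the reply on the embedded oneshot channel,
// mapping both the send and receive failures to error strings, much as the
// helpers above map them to MgmtStatusError::FetchStatusError.
async fn ask(tx: &mpsc::Sender<Request>, query: &str) -> Result<String, String> {
    let (respond, rx) = oneshot::channel();
    tx.send(Request { query: query.to_string(), respond })
        .await
        .map_err(|_| "failure relaying request".to_string())?;
    rx.await.map_err(|_| "failure receiving response".to_string())
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Request>(8);
    // Processor task: answer each request on its embedded oneshot channel.
    tokio::spawn(async move {
        while let Some(req) = rx.recv().await {
            let _ = req.respond.send(format!("status for {}", req.query));
        }
    });
    match ask(&tx, "dataplane").await {
        Ok(answer) => println!("{answer}"),
        Err(e) => eprintln!("{e}"),
    }
}
```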

mgmt/src/processor/launch.rs

Lines changed: 50 additions & 19 deletions
```diff
@@ -1,8 +1,7 @@
 // SPDX-License-Identifier: Apache-2.0
 // Copyright Open Network Fabric Authors
 
-use crate::processor::k8s_client::K8sClientError;
-use crate::processor::k8s_client::k8s_start_client;
+use crate::processor::k8s_client::{K8sClient, K8sClientError};
 use crate::processor::proc::ConfigChannelRequest;
 use crate::processor::proc::ConfigProcessor;
 
@@ -19,7 +18,10 @@ use tokio::sync::mpsc::Sender;
 use tokio_stream::Stream;
 use tonic::transport::Server;
 
+use futures::future::OptionFuture;
+
 use args::GrpcAddress;
+use concurrency::sync::Arc;
 use tracing::{debug, error, info, warn};
 
 use crate::grpc::server::create_config_service;
@@ -187,6 +189,8 @@ pub struct MgmtParams {
     pub processor_params: ConfigProcessorParams,
 }
 
+const STATUS_UPDATE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15);
+
 /// Start the mgmt service with either type of socket
 pub fn start_mgmt(
     params: MgmtParams,
@@ -232,29 +236,56 @@ pub fn start_mgmt(
     } else {
         debug!("Will start watching k8s for configuration changes");
         rt.block_on(async {
+            let k8s_client = Arc::new(K8sClient::new(params.hostname.as_str()));
             let (processor, tx) = ConfigProcessor::new(params.processor_params);
-            let processor_handle = tokio::spawn(async { processor.run().await });
-            let k8s_handle = tokio::spawn(async move { k8s_start_client(params.hostname.as_str(), tx).await });
-            tokio::select! {
-                result = processor_handle => {
-                    match result {
-                        Ok(_) => {
-                            error!("Configuration processor task exited unexpectedly");
-                            Err(LaunchError::PrematureProcessorExit)?
+            let tx1 = tx.clone();
+            let k8s_client1 = k8s_client.clone();
+
+            k8s_client.init().await.map_err(|e| {
+                error!("Failed to initialize k8s state: {}", e);
+                LaunchError::K8sClientError(e)
+            })?;
+            let mut processor_handle = Some(tokio::spawn(async { processor.run().await }));
+            let mut k8s_config_handle = Some(tokio::spawn(async move { k8s_client.k8s_start_config_watch(tx).await }));
+            let mut k8s_status_handle = Some(tokio::spawn(async move {
+                k8s_client1.k8s_start_status_update(tx1, &STATUS_UPDATE_INTERVAL).await
+            }));
+            loop {
+                tokio::select! {
+                    Some(result) = OptionFuture::from(processor_handle.as_mut()) => {
+                        match result {
+                            Ok(_) => {
+                                error!("Configuration processor task exited unexpectedly");
+                                Err(LaunchError::PrematureProcessorExit)?
+                            }
+                            Err(e) => { Err::<(), LaunchError>(LaunchError::ProcessorJoinError(e)) }
                         }
-                        Err(e) => { Err::<(), LaunchError>(LaunchError::ProcessorJoinError(e)) }
                     }
-                }
-                result = k8s_handle => {
-                    match result {
-                        Ok(result) => { result.inspect_err(|e| error!("K8s client task failed: {e}")).map_err(LaunchError::K8sClientError)?;
-                            error!("Kubernetes client task exited unexpectedly");
-                            Err(LaunchError::PrematureK8sClientExit)?
+                    Some(result) = OptionFuture::from(k8s_config_handle.as_mut()) => {
+                        match result {
+                            Ok(result) => { result.inspect_err(|e| error!("K8s config watch task failed: {e}")).map_err(LaunchError::K8sClientError)?;
+                                error!("Kubernetes config watch task exited unexpectedly");
+                                Err(LaunchError::PrematureK8sClientExit)?
+                            }
+                            Err(e) => { Err(LaunchError::K8sClientJoinError(e))? }
                         }
-                        Err(e) => { Err(LaunchError::K8sClientJoinError(e))? }
                     }
+                    Some(result) = OptionFuture::from(k8s_status_handle.as_mut()) => {
+                        k8s_status_handle = None;
+                        match result {
+                            Ok(result) => { result.inspect_err(|e| error!("K8s status update task failed: {e}")).map_err(LaunchError::K8sClientError)?;
+                                error!("Kubernetes status update task exited unexpectedly");
+                                Err(LaunchError::PrematureK8sClientExit)?
+                            }
+                            Err(e) => { Err(LaunchError::K8sClientJoinError(e))? }
+                        }
+                    }
+                }?;
+
+                if processor_handle.is_none() && k8s_config_handle.is_none() && k8s_status_handle.is_none() {
+                    break;
                 }
-            }?;
+            }
             Ok(())
         })
     }
```
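The select loop above keeps each `JoinHandle` in an `Option` and polls it through `futures::future::OptionFuture`: when the option is `None` the future resolves to `None`, the `Some(result)` pattern fails to match, and that `tokio::select!` branch is disabled, which is how the status-update task can be retired without wedging the loop. Below is a minimal, self-contained sketch of that mechanism under stated assumptions; the task body and printouts are illustrative only.

```rust
use futures::future::OptionFuture;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Keep the handle in an Option so its select! branch can be disabled
    // once the task has been observed to complete.
    let mut handle = Some(tokio::spawn(async {
        tokio::time::sleep(Duration::from_millis(10)).await;
        "done"
    }));

    loop {
        tokio::select! {
            // OptionFuture over Option<&mut JoinHandle<_>> yields
            // Some(join_result) when the task finishes; once the Option is
            // None the pattern cannot match and the branch is disabled.
            Some(result) = OptionFuture::from(handle.as_mut()) => {
                handle = None; // retire this branch from now on
                println!("task finished: {result:?}");
            }
            else => break, // every branch disabled: leave the loop
        }
        if handle.is_none() {
            break;
        }
    }
}
```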
