
Commit 9ef72be

feat(mgmt): Add task to periodically patch status for gateway
Adds a tokio task that periodically updates the status directly via the k8s API. Also patches the status whenever a new config generation is applied, and fixes an issue where the same config generation was repeatedly re-applied.

Signed-off-by: Manish Vachharajani <[email protected]>
1 parent e0eba8d commit 9ef72be
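The heart of the change is a spawned tokio task that patches status on a fixed interval. Below is a minimal, self-contained sketch of that shape, assuming only tokio (rt, macros, time features); `patch_status`, `periodic_status_update`, and the "gw-0" hostname are hypothetical stand-ins, and the real loop is `k8s_start_status_update` in the diff that follows.

use std::time::Duration;

// Hypothetical stand-in for k8s_intf::client::patch_gateway_status; the real
// call patches the GatewayAgent CRD status subresource for the given host.
async fn patch_status(hostname: &str, tick: u64) {
    println!("patched status for {hostname} (update #{tick})");
}

// Patch-then-sleep loop, the same shape as k8s_start_status_update below.
async fn periodic_status_update(hostname: String, interval: Duration) {
    let mut tick = 0;
    loop {
        patch_status(&hostname, tick).await;
        tick += 1;
        tokio::time::sleep(interval).await;
    }
}

#[tokio::main]
async fn main() {
    // 15s mirrors STATUS_UPDATE_INTERVAL in launch.rs; shortened here so the
    // example visibly ticks more than once before exiting.
    let task = tokio::spawn(periodic_status_update("gw-0".into(), Duration::from_millis(100)));
    tokio::time::sleep(Duration::from_millis(350)).await;
    task.abort(); // mgmt instead joins this task in its select! loop
}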

2 files changed (+243 −55 lines)

mgmt/src/processor/k8s_client.rs

Lines changed: 195 additions & 36 deletions
@@ -1,56 +1,215 @@
 // SPDX-License-Identifier: Apache-2.0
 // Copyright Open Network Fabric Authors
 
+use chrono::{TimeZone, Utc};
+use config::GenId;
+use config::converters::k8s::ToK8sConversionError;
 use tokio::sync::mpsc::Sender;
 
-use config::{ExternalConfig, GwConfig};
-use k8s_intf::client::WatchError;
-use k8s_intf::watch_gateway_agent_crd;
-use tracing::error;
+use config::converters::k8s::status::dataplane_status::DataplaneStatusForK8sConversion;
+use config::{ExternalConfig, GwConfig, internal::status::DataplaneStatus};
+use k8s_intf::client::{PatchError, WatchError, patch_gateway_status, watch_gateway_agent_crd};
+use k8s_intf::gateway_agent_crd::GatewayAgentStatus;
+use tracing::{debug, error};
 
 use crate::processor::proc::{ConfigChannelRequest, ConfigRequest, ConfigResponse};
 
 #[derive(Debug, thiserror::Error)]
 pub enum K8sClientError {
     #[error("K8s client exited early")]
     EarlyTermination,
-    #[error("K8s client could not get hostname: {0}")]
-    HostnameError(#[from] std::io::Error),
     #[error("K8s watch failed: {0}")]
     WatchError(#[from] WatchError),
+    #[error("Failed to convert dataplane status to k8s format: {0}")]
+    StatusConversionError(#[from] ToK8sConversionError),
+    #[error("Failed to patch k8s gateway status: {0}")]
+    PatchStatusError(#[from] PatchError),
 }
 
-pub async fn k8s_start_client(
-    hostname: &str,
-    tx: Sender<ConfigChannelRequest>,
-) -> Result<(), K8sClientError> {
-    watch_gateway_agent_crd(hostname, async move |ga| {
-        let external_config = ExternalConfig::try_from(ga);
-        match external_config {
-            Ok(external_config) => {
-                let gw_config = Box::new(GwConfig::new(external_config));
-
-                let (req, rx) = ConfigChannelRequest::new(ConfigRequest::ApplyConfig(gw_config));
-                let tx_result = tx.send(req).await;
-                if let Err(e) = tx_result {
-                    error!("Failure sending request to config processor: {e}");
-                }
-                match rx.await {
-                    Err(e) => error!("Failure receiving from config processor: {e}"),
-                    Ok(response) => match response {
-                        ConfigResponse::ApplyConfig(Err(e)) => {
-                            error!("Failed to apply config: {e}");
+async fn get_dataplane_status(
+    tx: &Sender<ConfigChannelRequest>,
+) -> Result<DataplaneStatus, MgmtStatusError> {
+    let (req, rx) = ConfigChannelRequest::new(ConfigRequest::GetDataplaneStatus);
+    tx.send(req).await.map_err(|_| {
+        MgmtStatusError::FetchStatusError("Failure relaying status fetch request".to_string())
+    })?;
+    let response = rx.await.map_err(|_| {
+        MgmtStatusError::FetchStatusError(
+            "Failure receiving status from config processor".to_string(),
+        )
+    })?;
+
+    match response {
+        ConfigResponse::GetDataplaneStatus(status) => Ok(*status),
+        _ => unreachable!(),
+    }
+}
+
+async fn get_current_config_generation(
+    tx: &Sender<ConfigChannelRequest>,
+) -> Result<GenId, MgmtStatusError> {
+    let (req, rx) = ConfigChannelRequest::new(ConfigRequest::GetGeneration);
+    tx.send(req).await.map_err(|_| {
+        MgmtStatusError::FetchStatusError("Failure relaying get generation request".to_string())
+    })?;
+    let response = rx.await.map_err(|_| {
+        MgmtStatusError::FetchStatusError(
+            "Failure receiving config generation from processor".to_string(),
+        )
+    })?;
+    match response {
+        ConfigResponse::GetGeneration(opt_genid) => {
+            opt_genid.ok_or(MgmtStatusError::NoConfigApplied)
+        }
+        _ => unreachable!(),
+    }
+}
+
+#[derive(Debug, thiserror::Error)]
+enum MgmtStatusError {
+    #[error("Failed to fetch dataplane status: {0}")]
+    FetchStatusError(String),
+    #[error("No config is currently applied")]
+    NoConfigApplied,
+}
+
+pub struct K8sClient {
+    hostname: String,
+}
+
+impl K8sClient {
+    pub fn new(hostname: &str) -> Self {
+        Self {
+            hostname: hostname.to_string(),
+        }
+    }
+
+    pub async fn init(&self) -> Result<(), K8sClientError> {
+        // Reset the config generation and applied time in K8s
+        patch_gateway_status(
+            &self.hostname,
+            &GatewayAgentStatus {
+                agent_version: Some("(none: agentless)".to_string()),
+                last_applied_gen: Some(0),
+                last_applied_time: Some(
+                    Utc.timestamp_opt(0, 0)
+                        .unwrap()
+                        .to_rfc3339_opts(chrono::SecondsFormat::Nanos, true),
+                ),
+                state: None,
+            },
+        )
+        .await?;
+        Ok(())
+    }
+
+    pub async fn k8s_start_config_watch(
+        &self,
+        tx: Sender<ConfigChannelRequest>,
+    ) -> Result<(), K8sClientError> {
+        // Clone this here so that the closure does not try to borrow self
+        // and cause K8sClient to not be Send for 'static but only a specific
+        // lifetime
+        let hostname = self.hostname.clone();
+        watch_gateway_agent_crd(&hostname.clone(), async move |ga| {
+            let external_config = ExternalConfig::try_from(ga);
+            match external_config {
+                Ok(external_config) => {
+                    let genid = external_config.genid;
+                    let current_genid = match get_current_config_generation(&tx).await {
+                        Ok(id) => id,
+                        Err(e) => match e {
+                            MgmtStatusError::NoConfigApplied => 0,
+                            _ => {
+                                error!("Failed to get current config generation: {e}");
+                                return;
+                            }
                         }
-                        ConfigResponse::ApplyConfig(Ok(())) => {}
-                        _ => unreachable!(),
-                    },
-                };
-            }
-            Err(e) => {
-                error!("Failed to convert K8sGatewayAgent to ExternalConfig: {e}");
+                    };
+                    if current_genid == genid {
+                        debug!("Not applying config, configuration generation unchanged (old={current_genid}, new={genid})");
+                        return;
+                    }
+
+                    let gw_config = Box::new(GwConfig::new(external_config));
+
+                    let (req, rx) =
+                        ConfigChannelRequest::new(ConfigRequest::ApplyConfig(gw_config));
+                    let tx_result = tx.send(req).await;
+                    if let Err(e) = tx_result {
+                        error!("Failure sending request to config processor: {e}");
+                    }
+                    match rx.await {
+                        Err(e) => error!("Failure receiving from config processor: {e}"),
+                        Ok(response) => match response {
+                            ConfigResponse::ApplyConfig(Err(e)) => {
+                                error!("Failed to apply config: {e}");
+                            }
+                            ConfigResponse::ApplyConfig(Ok(())) => {
+                                let last_applied_time = Some(chrono::Utc::now());
+                                let k8s_status = match GatewayAgentStatus::try_from(
+                                    &DataplaneStatusForK8sConversion {
+                                        last_applied_gen: Some(genid),
+                                        last_applied_time: last_applied_time.as_ref(),
+                                        last_collected_time: None,
+                                        status: None,
+                                    },
+                                ) {
+                                    Ok(v) => Some(v),
+                                    Err(e) => { error!("Unable to build object to patch k8s status with applied generation: {e}"); None }
+
+                                };
+
+                                if let Some(k8s_status) = k8s_status {
+                                    match patch_gateway_status(&hostname, &k8s_status).await {
+                                        Ok(()) => {},
+                                        Err(e) => { error!("Unable to patch k8s last_applied_gen and timestamp: {e}"); }
+                                    }
+                                }
+                            }
+                            _ => unreachable!(),
+                        },
+                    };
+                }
+                Err(e) => {
+                    error!("Failed to convert K8sGatewayAgent to ExternalConfig: {e}");
+                }
             }
+        })
+        .await?;
+        Err(K8sClientError::EarlyTermination)
+    }
+
+    pub async fn k8s_start_status_update(
+        &self,
+        tx: Sender<ConfigChannelRequest>,
+        status_update_interval: &std::time::Duration,
+    ) -> Result<(), K8sClientError> {
+        // Clone this here so that the closure does not try to borrow self
+        // and cause K8sClient to not be Send for 'static but only a specific
+        // lifetime
+        let hostname = self.hostname.clone();
+        loop {
+            let status = get_dataplane_status(&tx).await;
+
+            let status = match status {
+                Ok(status) => status,
+                Err(err) => {
+                    error!("Failed to fetch dataplane status: {}", err);
+                    continue;
+                }
+            };
+
+            let k8s_status = GatewayAgentStatus::try_from(&DataplaneStatusForK8sConversion {
+                last_applied_gen: None,
+                last_applied_time: None,
+                last_collected_time: Some(&chrono::Utc::now()),
+                status: Some(&status),
+            })?;
+            patch_gateway_status(&hostname, &k8s_status).await?;
+
+            // Process status update
+            tokio::time::sleep(*status_update_interval).await;
         }
-    })
-    .await?;
-    Err(K8sClientError::EarlyTermination)
+    }
 }
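Both new helpers, get_dataplane_status and get_current_config_generation, follow the same request/response pattern: enqueue a request on the processor's mpsc channel together with a oneshot sender, then await the reply. A self-contained sketch of that pattern, with simplified Request/Response/ChannelRequest types standing in for the real ConfigRequest/ConfigResponse/ConfigChannelRequest (whose definitions are not shown in this diff):

use tokio::sync::{mpsc, oneshot};

// Hypothetical request/response shapes; the real types live in
// mgmt's processor module.
enum Request {
    GetGeneration,
}

enum Response {
    GetGeneration(Option<u64>),
}

struct ChannelRequest {
    request: Request,
    reply: oneshot::Sender<Response>,
}

impl ChannelRequest {
    // Pair a request with the oneshot receiver the caller will await.
    fn new(request: Request) -> (Self, oneshot::Receiver<Response>) {
        let (reply, rx) = oneshot::channel();
        (Self { request, reply }, rx)
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<ChannelRequest>(8);

    // Processor task: answers each request on its embedded oneshot.
    tokio::spawn(async move {
        while let Some(req) = rx.recv().await {
            match req.request {
                Request::GetGeneration => {
                    let _ = req.reply.send(Response::GetGeneration(Some(42)));
                }
            }
        }
    });

    // Client side, mirroring get_current_config_generation above.
    let (req, reply_rx) = ChannelRequest::new(Request::GetGeneration);
    tx.send(req).await.expect("processor task gone");
    match reply_rx.await.expect("reply dropped") {
        Response::GetGeneration(genid) => println!("current generation: {genid:?}"),
    }
}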

mgmt/src/processor/launch.rs

Lines changed: 48 additions & 19 deletions
@@ -1,8 +1,7 @@
 // SPDX-License-Identifier: Apache-2.0
 // Copyright Open Network Fabric Authors
 
-use crate::processor::k8s_client::K8sClientError;
-use crate::processor::k8s_client::k8s_start_client;
+use crate::processor::k8s_client::{K8sClient, K8sClientError};
 use crate::processor::proc::ConfigChannelRequest;
 use crate::processor::proc::ConfigProcessor;
 
@@ -19,7 +18,10 @@ use tokio::sync::mpsc::Sender;
 use tokio_stream::Stream;
 use tonic::transport::Server;
 
+use futures::future::OptionFuture;
+
 use args::GrpcAddress;
+use concurrency::sync::Arc;
 use tracing::{debug, error, info, warn};
 
 use crate::grpc::server::create_config_service;
@@ -187,6 +189,8 @@ pub struct MgmtParams {
     pub processor_params: ConfigProcessorParams,
 }
 
+const STATUS_UPDATE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15);
+
 /// Start the mgmt service with either type of socket
 pub fn start_mgmt(
     params: MgmtParams,
@@ -232,29 +236,54 @@ pub fn start_mgmt(
     } else {
         debug!("Will start watching k8s for configuration changes");
         rt.block_on(async {
+            let k8s_client = Arc::new(K8sClient::new(params.hostname.as_str()));
             let (processor, tx) = ConfigProcessor::new(params.processor_params);
-            let processor_handle = tokio::spawn(async { processor.run().await });
-            let k8s_handle = tokio::spawn(async move { k8s_start_client(params.hostname.as_str(), tx).await });
-            tokio::select! {
-                result = processor_handle => {
-                    match result {
-                        Ok(_) => {
-                            error!("Configuration processor task exited unexpectedly");
-                            Err(LaunchError::PrematureProcessorExit)?
+            let tx1 = tx.clone();
+            let k8s_client1 = k8s_client.clone();
+
+            k8s_client.init().await.map_err(|e| {
+                error!("Failed to initialize k8s state: {}", e);
+                LaunchError::K8sClientError(e)
+            })?;
+            let mut processor_handle = Some(tokio::spawn(async { processor.run().await }));
+            let mut k8s_config_handle = Some(tokio::spawn(async move { k8s_client.k8s_start_config_watch(tx).await }));
+            let mut k8s_status_handle = Some(tokio::spawn(async move { k8s_client1.k8s_start_status_update(tx1, &STATUS_UPDATE_INTERVAL).await }));
            loop {
+                tokio::select! {
+                    Some(result) = OptionFuture::from(processor_handle.as_mut()) => {
+                        match result {
+                            Ok(_) => {
+                                error!("Configuration processor task exited unexpectedly");
+                                Err(LaunchError::PrematureProcessorExit)?
+                            }
+                            Err(e) => { Err::<(), LaunchError>(LaunchError::ProcessorJoinError(e)) }
                         }
-                        Err(e) => { Err::<(), LaunchError>(LaunchError::ProcessorJoinError(e)) }
                     }
-                }
-                result = k8s_handle => {
-                    match result {
-                        Ok(result) => { result.inspect_err(|e| error!("K8s client task failed: {e}")).map_err(LaunchError::K8sClientError)?;
-                            error!("Kubernetes client task exited unexpectedly");
-                            Err(LaunchError::PrematureK8sClientExit)?
+                    Some(result) = OptionFuture::from(k8s_config_handle.as_mut()) => {
+                        match result {
+                            Ok(result) => { result.inspect_err(|e| error!("K8s config watch task failed: {e}")).map_err(LaunchError::K8sClientError)?;
+                                error!("Kubernetes config watch task exited unexpectedly");
+                                Err(LaunchError::PrematureK8sClientExit)?
+                            }
+                            Err(e) => { Err(LaunchError::K8sClientJoinError(e))? }
                         }
-                        Err(e) => { Err(LaunchError::K8sClientJoinError(e))? }
                     }
+                    Some(result) = OptionFuture::from(k8s_status_handle.as_mut()) => {
+                        k8s_status_handle = None;
+                        match result {
+                            Ok(result) => { result.inspect_err(|e| error!("K8s status update task failed: {e}")).map_err(LaunchError::K8sClientError)?;
+                                error!("Kubernetes status update task exited unexpectedly");
+                                Err(LaunchError::PrematureK8sClientExit)?
+                            }
+                            Err(e) => { Err(LaunchError::K8sClientJoinError(e))? }
+                        }
+                    }
+                }?;
+
+                if processor_handle.is_none() && k8s_config_handle.is_none() && k8s_status_handle.is_none() {
+                    break;
                 }
-            }?;
+            }
             Ok(())
         })
     }
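The launch loop relies on futures' OptionFuture inside tokio::select!: when a handle slot is None, OptionFuture resolves immediately to None, the Some(..) pattern fails to match, and select! disables that branch, which is how the finished status task is retired without aborting the others. A runnable sketch of just that mechanism, assuming tokio and futures as dependencies (the task names and durations are illustrative):

use futures::future::OptionFuture;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Optional task handles; a finished task's slot is set back to None so
    // its select! branch is disabled on later iterations (re-polling a
    // completed JoinHandle would panic).
    let mut ticker = Some(tokio::spawn(async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        "ticker done"
    }));
    let mut worker = Some(tokio::spawn(async {
        tokio::time::sleep(Duration::from_millis(100)).await;
        "worker done"
    }));

    loop {
        tokio::select! {
            // OptionFuture::from(None) yields None, failing the Some(..)
            // pattern, so a cleared slot's branch is skipped.
            Some(result) = OptionFuture::from(ticker.as_mut()) => {
                println!("{:?}", result); // Result<&str, JoinError>
                ticker = None;
            }
            Some(result) = OptionFuture::from(worker.as_mut()) => {
                println!("{:?}", result);
                worker = None;
            }
            else => break, // every branch disabled: all tasks joined
        }
    }
}

The commit only clears k8s_status_handle this way; the processor and config-watch branches instead propagate an error with ?, so they never reach a second poll of a completed handle.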
