Skip to content

Commit

Permalink
Merge pull request #75 from gonzalezzfelipe/chore/implement-scale-down
Browse files Browse the repository at this point in the history
chore: Implement scale down
  • Loading branch information
scarmuega authored Nov 25, 2024
2 parents 89acc3f + a9fcfb6 commit bca15a9
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 111 deletions.
134 changes: 27 additions & 107 deletions crates/operator/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ use kube::{
};
use rand::{distributions::Alphanumeric, Rng};
use serde_json::json;
use std::{cmp::min, collections::BTreeMap, sync::Arc, time::Duration};
use std::{
cmp::{min, Ordering},
collections::BTreeMap,
sync::Arc,
time::Duration,
};
use thiserror::Error;
use tracing::{error, info, warn};

Expand All @@ -28,7 +33,7 @@ use crate::{
custom_resource::{HydraDoomNodeSpec, HydraDoomNodeStatus},
};

use super::custom_resource::{HydraDoomNode, HYDRA_DOOM_NODE_FINALIZER};
use super::custom_resource::HydraDoomNode;

pub enum HydraDoomNodeState {
Offline,
Expand Down Expand Up @@ -179,47 +184,14 @@ impl K8sContext {
self.patch_service(crd),
self.patch_ingress(crd),
self.patch_configmap(crd),
self.patch_crd(crd)
) {
(Ok(_), Ok(_), Ok(_), Ok(_), Ok(_)) => (),
(Ok(_), Ok(_), Ok(_), Ok(_)) => (),
_ => bail!("Failed to apply patch for components."),
};

Ok(())
}

pub async fn delete(&self, crd: &HydraDoomNode) -> anyhow::Result<()> {
match tokio::join!(
self.remove_deployment(crd),
self.remove_service(crd),
self.remove_ingress(crd),
self.remove_configmap(crd),
) {
(Ok(_), Ok(_), Ok(_), Ok(_)) => Ok(()),
_ => bail!("Failed to remove resources"),
}
}

async fn patch_crd(&self, crd: &HydraDoomNode) -> anyhow::Result<HydraDoomNode> {
let api: Api<HydraDoomNode> =
Api::namespaced(self.client.clone(), &crd.namespace().unwrap());

api.patch(
&crd.name_any(),
&PatchParams::default(),
&Patch::Merge(json!({
"metadata": {
"finalizers": [HYDRA_DOOM_NODE_FINALIZER]
}
})),
)
.await
.map_err(|err| {
error!(err = err.to_string(), "Failed to patch CRD.");
anyhow::Error::from(err)
})
}

async fn patch_configmap(&self, crd: &HydraDoomNode) -> anyhow::Result<ConfigMap> {
let api: Api<ConfigMap> = Api::namespaced(self.client.clone(), &crd.namespace().unwrap());

Expand All @@ -236,17 +208,6 @@ impl K8sContext {
})
}

async fn remove_configmap(&self, crd: &HydraDoomNode) -> anyhow::Result<()> {
let api: Api<ConfigMap> = Api::namespaced(self.client.clone(), &crd.namespace().unwrap());
match api
.delete(&crd.internal_name(), &DeleteParams::default())
.await
{
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
}

async fn patch_deployment(&self, crd: &HydraDoomNode) -> anyhow::Result<Deployment> {
let api: Api<Deployment> = Api::namespaced(self.client.clone(), &crd.namespace().unwrap());

Expand All @@ -263,16 +224,6 @@ impl K8sContext {
})
}

async fn remove_deployment(&self, crd: &HydraDoomNode) -> anyhow::Result<()> {
let api: Api<Deployment> = Api::namespaced(self.client.clone(), &crd.namespace().unwrap());
let dp = DeleteParams::default();

match api.delete(&crd.internal_name(), &dp).await {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
}

async fn patch_service(&self, crd: &HydraDoomNode) -> anyhow::Result<Service> {
// Apply the service to the cluster
let services: Api<Service> =
Expand All @@ -290,16 +241,6 @@ impl K8sContext {
})
}

async fn remove_service(&self, crd: &HydraDoomNode) -> anyhow::Result<()> {
let services: Api<Service> =
Api::namespaced(self.client.clone(), &crd.namespace().unwrap());
let dp = DeleteParams::default();
match services.delete(&crd.internal_name(), &dp).await {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
}

async fn patch_ingress(&self, crd: &HydraDoomNode) -> anyhow::Result<Ingress> {
// Apply the service to the cluster
let api: Api<Ingress> = Api::namespaced(self.client.clone(), &crd.namespace().unwrap());
Expand All @@ -315,15 +256,6 @@ impl K8sContext {
})
}

async fn remove_ingress(&self, crd: &HydraDoomNode) -> anyhow::Result<()> {
let api: Api<Ingress> = Api::namespaced(self.client.clone(), &crd.namespace().unwrap());
let dp = DeleteParams::default();
match api.delete(&crd.internal_name(), &dp).await {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
}

fn get_internal_url(&self, crd: &HydraDoomNode) -> String {
format!("ws://{}:{}", crd.internal_host(), self.constants.port)
}
Expand Down Expand Up @@ -507,7 +439,12 @@ impl K8sContext {

pub async fn remove_node(&self, crd: &HydraDoomNode) -> anyhow::Result<()> {
info!("Removing node: {}", crd.name_any());
todo!()
let api: Api<HydraDoomNode> = Api::default_namespaced(self.client.clone());
let dp = DeleteParams::default();
match api.delete(&crd.name_any(), &dp).await {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
}

pub async fn scale(&self) -> anyhow::Result<()> {
Expand All @@ -521,6 +458,19 @@ impl K8sContext {
None => false,
})
.collect();

// Sorted for LIFO
available_hydra_nodes.sort_by(|a, b| {
match (
&a.metadata.creation_timestamp,
&b.metadata.creation_timestamp,
) {
(Some(a), Some(b)) => a.cmp(b),
(Some(_), None) => Ordering::Less,
(None, Some(_)) => Ordering::Greater,
(None, None) => Ordering::Equal,
}
});
info!(
"Amount of nodes in waiting state: {}",
available_hydra_nodes.len()
Expand Down Expand Up @@ -594,36 +544,6 @@ type Result<T, E = Error> = std::result::Result<T, E>;

pub async fn reconcile(crd: Arc<HydraDoomNode>, ctx: Arc<K8sContext>) -> Result<Action, Error> {
tracing::info!("Reconciling {}", crd.name_any());
// Check if deletion timestamp is set
if crd.metadata.deletion_timestamp.is_some() {
let hydra_doom_pod_api: Api<HydraDoomNode> =
Api::namespaced(ctx.client.clone(), &crd.namespace().unwrap());
// Finalizer logic for cleanup
if crd
.finalizers()
.contains(&HYDRA_DOOM_NODE_FINALIZER.to_string())
{
// Delete associated resources
ctx.delete(&crd).await?;
// Remove finalizer
let patch = json!({
"metadata": {
"finalizers": crd.finalizers().iter().filter(|f| *f != HYDRA_DOOM_NODE_FINALIZER).collect::<Vec<_>>()
}
});
let _ = hydra_doom_pod_api
.patch(
&crd.name_any(),
&PatchParams::default(),
&Patch::Merge(&patch),
)
.await
.map_err(anyhow::Error::from)?;
}
return Ok(Action::await_change());
}

// Ensure finalizer is set
ctx.patch(&crd).await?;
Ok(Action::await_change())
}
Expand Down
19 changes: 15 additions & 4 deletions crates/operator/src/custom_resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use k8s_openapi::{
IngressServiceBackend, IngressSpec, ServiceBackendPort,
},
},
apimachinery::pkg::api::resource::Quantity,
apimachinery::pkg::{api::resource::Quantity, apis::meta::v1::OwnerReference},
};
use kube::{api::ObjectMeta, CustomResource, ResourceExt};
use kube::{api::ObjectMeta, CustomResource, Resource, ResourceExt};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
Expand All @@ -22,8 +22,6 @@ use crate::config::Config;

use super::controller::K8sConstants;

pub static HYDRA_DOOM_NODE_FINALIZER: &str = "hydradoomnode/finalizer";

#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
pub struct ResourcesInner {
pub cpu: String,
Expand Down Expand Up @@ -149,6 +147,15 @@ impl HydraDoomNode {
])
}

pub fn owner_references(&self) -> Vec<OwnerReference> {
vec![OwnerReference {
api_version: HydraDoomNode::api_version(&()).to_string(),
kind: HydraDoomNode::kind(&()).to_string(),
name: self.name_any(),
..Default::default()
}]
}

pub fn internal_host(&self) -> String {
format!(
"{}.{}.svc.cluster.local",
Expand All @@ -167,6 +174,7 @@ impl HydraDoomNode {
ConfigMap {
metadata: ObjectMeta {
name: Some(name),
owner_references: Some(self.owner_references()),
..Default::default()
},
data: Some(BTreeMap::from([(
Expand Down Expand Up @@ -354,6 +362,7 @@ impl HydraDoomNode {
Deployment {
metadata: ObjectMeta {
name: Some(name.clone()),
owner_references: Some(self.owner_references()),
..Default::default()
},
spec: Some(DeploymentSpec {
Expand Down Expand Up @@ -462,6 +471,7 @@ impl HydraDoomNode {
Service {
metadata: ObjectMeta {
name: Some(name),
owner_references: Some(self.owner_references()),
..Default::default()
},
spec: Some(ServiceSpec {
Expand Down Expand Up @@ -502,6 +512,7 @@ impl HydraDoomNode {
Ingress {
metadata: ObjectMeta {
name: Some(name.clone()),
owner_references: Some(self.owner_references()),
annotations: Some(constants.ingress_annotations.clone()),
..Default::default()
},
Expand Down

0 comments on commit bca15a9

Please sign in to comment.