Merge pull request #71 from gonzalezzfelipe/feat/autoscaler

feat: Autoscaler

scarmuega authored Nov 25, 2024
2 parents 2286425 + 690f16e commit ea68fc3
Showing 9 changed files with 194 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


20 changes: 20 additions & 0 deletions bootstrap/stage2/deployment.tf
@@ -146,6 +146,26 @@ resource "kubernetes_deployment_v1" "operator" {
value = var.init_aws_secret_access_key
}

env {
name = "AUTOSCALER_DELAY"
value = "60"
}

env {
name = "AUTOSCALER_LOW_WATERMARK"
value = var.autoscaler_low_watermark
}

env {
name = "AUTOSCALER_HIGH_WATERMARK"
value = var.autoscaler_high_watermark
}

env {
name = "AUTOSCALER_REGION_PREFIX"
value = var.autoscaler_region_prefix
}

resources {
limits = {
cpu = var.resources.limits.cpu
14 changes: 14 additions & 0 deletions bootstrap/stage2/main.tf
@@ -130,6 +130,20 @@ variable "api_key" {
type = string
}

variable "autoscaler_region_prefix" {
type = string
}

variable "autoscaler_low_watermark" {
type = number
default = 1
}

variable "autoscaler_high_watermark" {
type = number
default = 5
}

variable "tolerations" {
type = list(object({
effect = string
1 change: 1 addition & 0 deletions crates/operator/Cargo.toml
@@ -20,3 +20,4 @@ lazy_static = "1.5.0"
tracing-subscriber = "0.3.18"
reqwest = "0.12.9"
prometheus-parse = "0.2.5"
rand = "0.8.5"
5 changes: 3 additions & 2 deletions crates/operator/src/bin/operator.rs
@@ -6,7 +6,7 @@ use tracing::{error, info, instrument};

use hydra_control_plane_operator::{
config::Config,
controller::{error_policy, patch_statuses, reconcile, K8sContext},
controller::{error_policy, patch_statuses, reconcile, run_autoscaler, K8sContext},
custom_resource::HydraDoomNode,
};

@@ -34,8 +34,9 @@ async fn main() -> Result<()> {
}
});
let patch_statuses_controller = patch_statuses(context.clone());
let autoscaler_controller = run_autoscaler(context.clone());

let _ = tokio::join!(controller, patch_statuses_controller);
let _ = tokio::join!(controller, patch_statuses_controller, autoscaler_controller);

Ok(())
}
23 changes: 22 additions & 1 deletion crates/operator/src/config.rs
@@ -1,5 +1,5 @@
use lazy_static::lazy_static;
use std::env;
use std::{env, time::Duration};

lazy_static! {
static ref CONTROLLER_CONFIG: Config = Config::from_env();
@@ -31,6 +31,11 @@ pub struct Config {
pub bucket: String,
pub init_aws_access_key_id: String,
pub init_aws_secret_access_key: String,
// Autoscaler
pub autoscaler_delay: Duration,
pub autoscaler_low_watermark: usize,
pub autoscaler_high_watermark: usize,
pub autoscaler_region_prefix: String,
}

impl Config {
@@ -60,6 +65,22 @@ impl Config {
.expect("Missing INIT_AWS_ACCESS_KEY_ID env var."),
init_aws_secret_access_key: env::var("INIT_AWS_SECRET_ACCESS_KEY")
.expect("Missing INIT_AWS_SECRET_ACCESS_KEY env var."),
autoscaler_delay: env::var("AUTOSCALER_DELAY")
.map(|duration| {
Duration::from_secs(duration.parse().expect("Failed to parse AUTOSCALER_DELAY"))
})
.expect("Missing AUTOSCALER_DELAY env var."),
autoscaler_high_watermark: env::var("AUTOSCALER_HIGH_WATERMARK")
.map(|x| {
x.parse()
.expect("Failed to parse AUTOSCALER_HIGH_WATERMARK")
})
.expect("Missing AUTOSCALER_HIGH_WATERMARK env var."),
autoscaler_low_watermark: env::var("AUTOSCALER_LOW_WATERMARK")
.map(|x| x.parse().expect("Failed to parse AUTOSCALER_LOW_WATERMARK"))
.expect("Missing AUTOSCALER_LOW_WATERMARK env var."),
autoscaler_region_prefix: env::var("AUTOSCALER_REGION_PREFIX")
.expect("Missing AUTOSCALER_REGION_PREFIX env var."),
}
}
}
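
The repeated env::var + parse + expect chains above follow one pattern. A minimal sketch of a generic helper that could collapse it (the name env_parse is hypothetical, not part of this commit):

use std::{env, fmt::Debug, str::FromStr};

fn env_parse<T>(key: &str) -> T
where
    T: FromStr,
    T::Err: Debug,
{
    env::var(key)
        .unwrap_or_else(|_| panic!("Missing {key} env var."))
        .parse()
        .unwrap_or_else(|err| panic!("Failed to parse {key}: {err:?}"))
}

// Usage: autoscaler_low_watermark: env_parse("AUTOSCALER_LOW_WATERMARK"),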
111 changes: 102 additions & 9 deletions crates/operator/src/controller.rs
@@ -9,12 +9,24 @@ use kube::{
runtime::controller::Action,
Api, Client, ResourceExt,
};
use rand::{distributions::Alphanumeric, Rng};
use serde_json::json;
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use thiserror::Error;
use tracing::{error, info, warn};

use crate::{config::Config, custom_resource::HydraDoomNodeStatus};
/// Generate a random 7-character alphanumeric suffix for node names.
pub fn random_name() -> String {
rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(7)
.map(char::from)
.collect()
}

use crate::{
config::Config,
custom_resource::{HydraDoomNodeSpec, HydraDoomNodeStatus},
};

use super::custom_resource::{HydraDoomNode, HYDRA_DOOM_NODE_FINALIZER};

@@ -59,7 +71,7 @@ impl From<f64> for HydraDoomGameState {
1.0 => Self::Lobby,
2.0 => Self::Running,
3.0 => Self::Done,
_ => Self::Waiting,
}
}
}
@@ -385,13 +397,15 @@ impl K8sContext {
});

match (node_state, game_state, transactions) {
(Some(node_state), Some(game_state), Some(transactions)) => HydraDoomNodeStatus {
transactions,
node_state: node_state.into(),
game_state: game_state.into(),
local_url: self.get_internal_url(crd),
external_url: self.get_external_url(crd),
},
(Some(node_state), Some(game_state), Some(transactions)) => {
HydraDoomNodeStatus {
transactions,
node_state: node_state.into(),
game_state: game_state.into(),
local_url: self.get_internal_url(crd),
external_url: self.get_external_url(crd),
}
}
_ => default,
}
}
@@ -455,6 +469,76 @@ impl K8sContext {

Ok(())
}

pub async fn deploy_node(&self) -> anyhow::Result<HydraDoomNode> {
info!("Deploying new node.");

// List available snapshots.
// Try move from available to used dir.
// If successful, start new node.
// If anything fails, at any point, deploy offline node.

let api: Api<HydraDoomNode> = Api::default_namespaced(self.client.clone());
let name = format!(
"{}{}{}",
self.config.autoscaler_region_prefix,
"0", // 1 for online, 0 for offline
random_name()
);
let new_node = HydraDoomNode {
spec: HydraDoomNodeSpec::default(),
status: None,
metadata: kube::api::ObjectMeta {
name: Some(name.clone()),
..Default::default()
},
};
// Create or patch the HydraDoomNode resource via server-side apply.
api.patch(
&name,
&PatchParams::apply("hydra-doom-pod-controller"),
&Patch::Apply(&new_node),
)
.await
.map_err(|err| {
error!(err = err.to_string(), "Failed to create new node.");
err.into()
})
}
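
The comment block at the top of deploy_node sketches a snapshot hand-off that this commit does not implement yet (it always deploys an offline node, hence the "0" in the name). A hedged sketch of the "try move from available to used dir" step, assuming snapshots are files on a shared volume; claim_snapshot and the directory layout are assumptions, not part of the commit:

use std::path::{Path, PathBuf};

fn claim_snapshot(available_dir: &Path, used_dir: &Path) -> std::io::Result<Option<PathBuf>> {
    for entry in std::fs::read_dir(available_dir)? {
        let src = entry?.path();
        let dst = used_dir.join(src.file_name().expect("snapshot file name"));
        // rename is atomic within a filesystem, so a concurrent claimer
        // simply loses the race and tries the next snapshot.
        if std::fs::rename(&src, &dst).is_ok() {
            return Ok(Some(dst));
        }
    }
    Ok(None) // nothing available: fall back to an offline node
}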

pub async fn remove_node(&self, crd: &HydraDoomNode) -> anyhow::Result<()> {
info!("Removing node: {}", crd.name_any());
todo!()
}

pub async fn scale(&self) -> anyhow::Result<()> {
let api: Api<HydraDoomNode> = Api::default_namespaced(self.client.clone());
let crds = api.list(&ListParams::default()).await?;

let mut available_hydra_nodes: Vec<HydraDoomNode> = crds
.into_iter()
.filter(|crd| match &crd.status {
Some(status) => status.game_state == String::from(HydraDoomGameState::Waiting),
None => false,
})
.collect();

if available_hydra_nodes.len() < self.config.autoscaler_low_watermark {
let amount = self.config.autoscaler_low_watermark - available_hydra_nodes.len();
// One after the other to avoid race conditions.
for _ in 0..amount {
self.deploy_node().await?;
}
} else if available_hydra_nodes.len() > self.config.autoscaler_high_watermark {
while available_hydra_nodes.len() > self.config.autoscaler_high_watermark {
// High watermark will never be < 1.
self.remove_node(&available_hydra_nodes.pop().unwrap())
.await?;
}
}

Ok(())
}
}
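
The scale logic above in numbers, as a hedged illustration (not part of the commit): with low_watermark = 1 and high_watermark = 5, a pool of 0 waiting nodes triggers one deploy, and a pool of 7 triggers two removals.

fn nodes_to_deploy(waiting: usize, low: usize) -> usize {
    low.saturating_sub(waiting) // e.g. 0 waiting, low 1 -> deploy 1
}

fn nodes_to_remove(waiting: usize, high: usize) -> usize {
    waiting.saturating_sub(high) // e.g. 7 waiting, high 5 -> remove 2
}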

pub async fn patch_statuses(context: Arc<K8sContext>) -> Result<()> {
@@ -466,6 +550,15 @@
}
}

pub async fn run_autoscaler(context: Arc<K8sContext>) -> Result<()> {
info!("Running autoscaler loop.");

loop {
context.scale().await?;
tokio::time::sleep(context.config.autoscaler_delay).await;
}
}
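
Note that the ? inside the loop ends the autoscaler on its first failed tick while the other joined controllers keep running. A hedged alternative sketch that logs and retries instead (an assumption about intent, not the committed behavior):

loop {
    if let Err(err) = context.scale().await {
        warn!(err = err.to_string(), "Autoscaler tick failed, retrying after delay.");
    }
    tokio::time::sleep(context.config.autoscaler_delay).await;
}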

// Auxiliary error value because the K8s controller API doesn't go along with anyhow.
#[derive(Debug, Error)]
pub enum Error {
14 changes: 14 additions & 0 deletions crates/operator/src/custom_resource.rs
@@ -96,6 +96,20 @@ pub struct HydraDoomNodeSpec {
pub resources: Option<Resources>,
}

impl Default for HydraDoomNodeSpec {
fn default() -> Self {
Self {
offline: Some(true),
network_id: None,
seed_input: "_".to_string(),
commit_inputs: vec![],
start_chain_from: None,
asleep: None,
resources: None,
}
}
}
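
A short usage sketch (not in the commit): deploy_node above relies on this default to spawn an offline node, and struct-update syntax covers one-off overrides.

let spec = HydraDoomNodeSpec::default(); // offline node, no commit inputs
let custom = HydraDoomNodeSpec {
    asleep: Some(false),
    ..Default::default()
};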

#[derive(Deserialize, Serialize, Clone, Default, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct HydraDoomNodeStatus {
17 changes: 17 additions & 0 deletions playbook/doom-dev/main.tf
@@ -101,6 +101,20 @@ variable "frontend_replicas" {
default = 1
}

variable "autoscaler_low_watermark" {
type = number
default = 1
}

variable "autoscaler_low_watermark" {
type = number
default = 5
}

variable "autoscaler_region_prefix" {
type = string
}

provider "kubernetes" {
config_path = "~/.kube/config"
config_context = var.eks_cluster_arn
@@ -142,4 +156,7 @@ module "stage2" {
init_image = "ghcr.io/demeter-run/doom-patrol-init:b7b4fc499b5274cd71b6b72f93ab4ba8199437fe"
frontend_image = var.frontend_image
frontend_replicas = var.frontend_replicas
autoscaler_high_watermark = var.autoscaler_high_watermark
autoscaler_low_watermark = var.autoscaler_low_watermark
autoscaler_region_prefix = var.autoscaler_region_prefix
}
