From b058246505d3f496bd79de8070dd00aa3c7c317d Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 3 Jul 2023 13:38:12 +0200 Subject: [PATCH 1/5] feat: Support podOverrides --- CHANGELOG.md | 2 + .../operator-binary/src/airflow_controller.rs | 96 ++++++++++--------- 2 files changed, 53 insertions(+), 45 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 573e96fb..ed99bbe4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - Support Airflow `2.6.1` ([#284]). - Set explicit resources on all containers ([#289]) - Operator errors out when credentialsSecret is missing ([#293]). +- Support podOverrides ([#295]). ### Changed @@ -28,6 +29,7 @@ [#289]: https://github.com/stackabletech/airflow-operator/pull/289 [#291]: https://github.com/stackabletech/airflow-operator/pull/291 [#293]: https://github.com/stackabletech/airflow-operator/pull/293 +[#295]: https://github.com/stackabletech/airflow-operator/pull/295 ## [23.4.0] - 2023-04-17 diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index 68994a6d..b0f13821 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -1,5 +1,6 @@ //! 
Ensures that `Pod`s are configured and running for each [`AirflowCluster`] use stackable_operator::builder::resources::ResourceRequirementsBuilder; +use stackable_operator::k8s_openapi::DeepMerge; use crate::config::{self, PYTHON_IMPORTS}; use crate::controller_commons::{ @@ -615,19 +616,41 @@ fn build_server_rolegroup_statefulset( let rolegroup = role.role_groups.get(&rolegroup_ref.role_group); - // initialising commands let commands = airflow_role.get_commands(); - // container - let mut cb = ContainerBuilder::new(&Container::Airflow.to_string()) - .context(InvalidContainerNameSnafu)?; let mut pb = PodBuilder::new(); + pb.metadata_builder(|m| { + m.with_recommended_labels(build_recommended_labels( + airflow, + AIRFLOW_CONTROLLER_NAME, + &resolved_product_image.app_version_label, + &rolegroup_ref.role, + &rolegroup_ref.role_group, + )) + }) + .image_pull_secrets_from_product_image(resolved_product_image) + .affinity(&config.affinity) + .service_account_name(sa_name) + .security_context( + PodSecurityContextBuilder::new() + .run_as_user(AIRFLOW_UID) + .run_as_group(0) + .fs_group(1000) // Needed for secret-operator + .build(), + ); + + let mut airflow_container = ContainerBuilder::new(&Container::Airflow.to_string()) + .context(InvalidContainerNameSnafu)?; if let Some(authentication_class) = authentication_class { - add_authentication_volumes_and_volume_mounts(authentication_class, &mut cb, &mut pb)?; + add_authentication_volumes_and_volume_mounts( + authentication_class, + &mut airflow_container, + &mut pb, + )?; } - let cb = cb + airflow_container .image_from_product_image(resolved_product_image) .resources(config.resources.clone().into()) .command(vec!["/bin/bash".to_string()]) @@ -648,15 +671,15 @@ fn build_server_rolegroup_statefulset( // mapped environment variables let env_mapped = build_mapped_envs(airflow, rolegroup_config); - cb.add_env_vars(env_config); - cb.add_env_vars(env_mapped); - cb.add_env_vars(build_static_envs()); + 
airflow_container.add_env_vars(env_config); + airflow_container.add_env_vars(env_mapped); + airflow_container.add_env_vars(build_static_envs()); let volume_mounts = airflow.volume_mounts(); - cb.add_volume_mounts(volume_mounts); - cb.add_volume_mount(CONFIG_VOLUME_NAME, CONFIG_PATH); - cb.add_volume_mount(LOG_CONFIG_VOLUME_NAME, LOG_CONFIG_DIR); - cb.add_volume_mount(LOG_VOLUME_NAME, STACKABLE_LOG_DIR); + airflow_container.add_volume_mounts(volume_mounts); + airflow_container.add_volume_mount(CONFIG_VOLUME_NAME, CONFIG_PATH); + airflow_container.add_volume_mount(LOG_CONFIG_VOLUME_NAME, LOG_CONFIG_DIR); + airflow_container.add_volume_mount(LOG_VOLUME_NAME, STACKABLE_LOG_DIR); if let Some(resolved_port) = airflow_role.get_http_port() { let probe = Probe { @@ -668,12 +691,12 @@ fn build_server_rolegroup_statefulset( period_seconds: Some(5), ..Probe::default() }; - cb.readiness_probe(probe.clone()); - cb.liveness_probe(probe); - cb.add_container_port("http", resolved_port.into()); + airflow_container.readiness_probe(probe.clone()); + airflow_container.liveness_probe(probe); + airflow_container.add_container_port("http", resolved_port.into()); } - let container = cb.build(); + pb.add_container(airflow_container.build()); let metrics_container = ContainerBuilder::new("metrics") .context(InvalidContainerNameSnafu)? @@ -690,16 +713,14 @@ fn build_server_rolegroup_statefulset( .build(), ) .build(); + pb.add_container(metrics_container); - let mut volumes = airflow.volumes(); - volumes.extend(controller_commons::create_volumes( + pb.add_volumes(airflow.volumes()); + pb.add_volumes(controller_commons::create_volumes( &rolegroup_ref.object_name(), config.logging.containers.get(&Container::Airflow), )); - pb.add_container(container); - pb.add_container(metrics_container); - if let Some(gitsync) = airflow.git_sync() { let gitsync_container = ContainerBuilder::new(&format!("{}-{}", GIT_SYNC_NAME, 1)) .context(InvalidContainerNameSnafu)? 
@@ -718,7 +739,7 @@ fn build_server_rolegroup_statefulset( ) .build(); - volumes.push( + pb.add_volume( VolumeBuilder::new(GIT_CONTENT) .empty_dir(EmptyDirVolumeSource::default()) .build(), @@ -741,6 +762,12 @@ fn build_server_rolegroup_statefulset( )); } + let mut pod_template = pb.build_template(); + pod_template.merge_from(role.config.pod_overrides.clone()); + if let Some(rolegroup) = rolegroup { + pod_template.merge_from(rolegroup.config.pod_overrides.clone()); + } + Ok(StatefulSet { metadata: ObjectMetaBuilder::new() .name_and_namespace(airflow) @@ -769,28 +796,7 @@ fn build_server_rolegroup_statefulset( ..LabelSelector::default() }, service_name: rolegroup_ref.object_name(), - template: pb - .metadata_builder(|m| { - m.with_recommended_labels(build_recommended_labels( - airflow, - AIRFLOW_CONTROLLER_NAME, - &resolved_product_image.app_version_label, - &rolegroup_ref.role, - &rolegroup_ref.role_group, - )) - }) - .image_pull_secrets_from_product_image(resolved_product_image) - .add_volumes(volumes) - .affinity(&config.affinity) - .service_account_name(sa_name) - .security_context( - PodSecurityContextBuilder::new() - .run_as_user(AIRFLOW_UID) - .run_as_group(0) - .fs_group(1000) // Needed for secret-operator - .build(), - ) - .build_template(), + template: pod_template, ..StatefulSetSpec::default() }), status: None, From 1d1b7a79e63ca766e11488fd4b25bfa6b02ec820 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 3 Jul 2023 14:28:11 +0200 Subject: [PATCH 2/5] pass through role --- rust/operator-binary/src/airflow_controller.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index b0f13821..afb0d18c 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -371,6 +371,7 @@ pub async fn reconcile_airflow(airflow: Arc, ctx: Arc) -> R let rg_statefulset = 
build_server_rolegroup_statefulset( &airflow, &resolved_product_image, + &airflow_role, &rolegroup, rolegroup_config, authentication_class.as_ref(), @@ -599,16 +600,17 @@ fn build_rolegroup_service( /// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator. /// /// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding [`Service`] (from [`build_rolegroup_service`]). +#[allow(clippy::too_many_arguments)] fn build_server_rolegroup_statefulset( airflow: &AirflowCluster, resolved_product_image: &ResolvedProductImage, + airflow_role: &AirflowRole, rolegroup_ref: &RoleGroupRef, rolegroup_config: &HashMap>, authentication_class: Option<&AuthenticationClass>, sa_name: &str, config: &AirflowConfig, ) -> Result { - let airflow_role = AirflowRole::from_str(&rolegroup_ref.role).unwrap(); let role = airflow .get_role(airflow_role.clone()) .as_ref() From e5d8ad1ee1588300bd8bfcaa3b40c9fcd1568055 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 3 Jul 2023 14:29:58 +0200 Subject: [PATCH 3/5] fixup --- rust/crd/src/lib.rs | 2 +- rust/operator-binary/src/airflow_controller.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs index 101f8232..27760c8e 100644 --- a/rust/crd/src/lib.rs +++ b/rust/crd/src/lib.rs @@ -377,7 +377,7 @@ impl AirflowRole { } impl AirflowCluster { - pub fn get_role(&self, role: AirflowRole) -> &Option> { + pub fn get_role(&self, role: &AirflowRole) -> &Option> { match role { AirflowRole::Webserver => &self.spec.webservers, AirflowRole::Scheduler => &self.spec.schedulers, diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index afb0d18c..64f4a835 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -238,7 +238,7 @@ pub async fn reconcile_airflow(airflow: Arc, ctx: Arc) -> R let mut roles 
= HashMap::new(); for role in AirflowRole::iter() { - if let Some(resolved_role) = airflow.get_role(role.clone()).clone() { + if let Some(resolved_role) = airflow.get_role(&role).clone() { roles.insert( role.to_string(), ( @@ -612,7 +612,7 @@ fn build_server_rolegroup_statefulset( config: &AirflowConfig, ) -> Result { let role = airflow - .get_role(airflow_role.clone()) + .get_role(airflow_role) .as_ref() .context(NoAirflowRoleSnafu)?; From d3007d3f644c110aca6ad7686191ec0c2c30174e Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Tue, 4 Jul 2023 11:22:46 +0200 Subject: [PATCH 4/5] add test --- .../kuttl/resources/03-assert.yaml.j2 | 24 +++++++++++++++++++ .../03-install-airflow-cluster.yaml.j2 | 10 ++++++++ 2 files changed, 34 insertions(+) diff --git a/tests/templates/kuttl/resources/03-assert.yaml.j2 b/tests/templates/kuttl/resources/03-assert.yaml.j2 index 25eaa80c..8ea8a6c3 100644 --- a/tests/templates/kuttl/resources/03-assert.yaml.j2 +++ b/tests/templates/kuttl/resources/03-assert.yaml.j2 @@ -63,6 +63,30 @@ status: --- apiVersion: apps/v1 kind: StatefulSet +metadata: + name: airflow-worker-resources-from-pod-overrides +spec: + template: + spec: + containers: + - name: airflow + resources: + requests: + cpu: 300m + memory: 1Gi + limits: + cpu: 900m + memory: 1Gi + - name: metrics +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + - name: vector +{% endif %} +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: apps/v1 +kind: StatefulSet metadata: name: airflow-scheduler-default status: diff --git a/tests/templates/kuttl/resources/03-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/resources/03-install-airflow-cluster.yaml.j2 index 6025e118..a2c985d2 100644 --- a/tests/templates/kuttl/resources/03-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/resources/03-install-airflow-cluster.yaml.j2 @@ -65,6 +65,16 @@ spec: memory: limit: 2Gi replicas: 1 + resources-from-pod-overrides: + podOverrides: + spec: + containers: + - name: airflow + 
resources: + requests: + cpu: 300m + limits: + cpu: 900m schedulers: config: logging: From 3a902c4319c1aef72e533140eb5730dcb8c97997 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Tue, 4 Jul 2023 11:33:49 +0200 Subject: [PATCH 5/5] prevent OOMKilled --- tests/templates/kuttl/resources/03-assert.yaml.j2 | 12 ++++++------ .../resources/03-install-airflow-cluster.yaml.j2 | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/templates/kuttl/resources/03-assert.yaml.j2 b/tests/templates/kuttl/resources/03-assert.yaml.j2 index 8ea8a6c3..c96bd295 100644 --- a/tests/templates/kuttl/resources/03-assert.yaml.j2 +++ b/tests/templates/kuttl/resources/03-assert.yaml.j2 @@ -25,10 +25,10 @@ spec: resources: requests: cpu: 100m - memory: 1Gi + memory: 2Gi limits: cpu: "1" - memory: 1Gi + memory: 2Gi - name: metrics {% if lookup('env', 'VECTOR_AGGREGATOR') %} - name: vector @@ -49,10 +49,10 @@ spec: resources: requests: cpu: 200m - memory: 2Gi + memory: 3Gi limits: cpu: "2" - memory: 2Gi + memory: 3Gi - name: metrics {% if lookup('env', 'VECTOR_AGGREGATOR') %} - name: vector @@ -73,10 +73,10 @@ spec: resources: requests: cpu: 300m - memory: 1Gi + memory: 2Gi limits: cpu: 900m - memory: 1Gi + memory: 2Gi - name: metrics {% if lookup('env', 'VECTOR_AGGREGATOR') %} - name: vector diff --git a/tests/templates/kuttl/resources/03-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/resources/03-install-airflow-cluster.yaml.j2 index a2c985d2..7b879140 100644 --- a/tests/templates/kuttl/resources/03-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/resources/03-install-airflow-cluster.yaml.j2 @@ -52,7 +52,7 @@ spec: min: 100m max: "1" memory: - limit: 1Gi + limit: 2Gi roleGroups: resources-from-role: replicas: 1 @@ -63,7 +63,7 @@ spec: min: 200m max: "2" memory: - limit: 2Gi + limit: 3Gi replicas: 1 resources-from-pod-overrides: podOverrides: