diff --git a/CHANGELOG.md b/CHANGELOG.md index 573e96fb..ed99bbe4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - Support Airflow `2.6.1` ([#284]). - Set explicit resources on all containers ([#289]) - Operator errors out when credentialsSecret is missing ([#293]). +- Support podOverrides ([#295]). ### Changed @@ -28,6 +29,7 @@ [#289]: https://github.com/stackabletech/airflow-operator/pull/289 [#291]: https://github.com/stackabletech/airflow-operator/pull/291 [#293]: https://github.com/stackabletech/airflow-operator/pull/293 +[#295]: https://github.com/stackabletech/airflow-operator/pull/295 ## [23.4.0] - 2023-04-17 diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs index 101f8232..27760c8e 100644 --- a/rust/crd/src/lib.rs +++ b/rust/crd/src/lib.rs @@ -377,7 +377,7 @@ impl AirflowRole { } impl AirflowCluster { - pub fn get_role(&self, role: AirflowRole) -> &Option> { + pub fn get_role(&self, role: &AirflowRole) -> &Option> { match role { AirflowRole::Webserver => &self.spec.webservers, AirflowRole::Scheduler => &self.spec.schedulers, diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index 68994a6d..64f4a835 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -1,5 +1,6 @@ //! Ensures that `Pod`s are configured and running for each [`AirflowCluster`] use stackable_operator::builder::resources::ResourceRequirementsBuilder; +use stackable_operator::k8s_openapi::DeepMerge; use crate::config::{self, PYTHON_IMPORTS}; use crate::controller_commons::{ @@ -237,7 +238,7 @@ pub async fn reconcile_airflow(airflow: Arc, ctx: Arc) -> R let mut roles = HashMap::new(); for role in AirflowRole::iter() { - if let Some(resolved_role) = airflow.get_role(role.clone()).clone() { + if let Some(resolved_role) = airflow.get_role(&role).clone() { roles.insert( role.to_string(), ( @@ -370,6 +371,7 @@ pub async fn reconcile_airflow(airflow: Arc, ctx: Arc) -> R let rg_statefulset = build_server_rolegroup_statefulset( &airflow, &resolved_product_image, + &airflow_role, &rolegroup, rolegroup_config, authentication_class.as_ref(), @@ -598,36 +600,59 @@ fn build_rolegroup_service( /// The rolegroup [`StatefulSet`] runs the rolegroup, as configured by the administrator. /// /// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding [`Service`] (from [`build_rolegroup_service`]). +#[allow(clippy::too_many_arguments)] fn build_server_rolegroup_statefulset( airflow: &AirflowCluster, resolved_product_image: &ResolvedProductImage, + airflow_role: &AirflowRole, rolegroup_ref: &RoleGroupRef, rolegroup_config: &HashMap>, authentication_class: Option<&AuthenticationClass>, sa_name: &str, config: &AirflowConfig, ) -> Result { - let airflow_role = AirflowRole::from_str(&rolegroup_ref.role).unwrap(); let role = airflow - .get_role(airflow_role.clone()) + .get_role(airflow_role) .as_ref() .context(NoAirflowRoleSnafu)?; let rolegroup = role.role_groups.get(&rolegroup_ref.role_group); - // initialising commands let commands = airflow_role.get_commands(); - // container - let mut cb = ContainerBuilder::new(&Container::Airflow.to_string()) - .context(InvalidContainerNameSnafu)?; let mut pb = PodBuilder::new(); + pb.metadata_builder(|m| { + m.with_recommended_labels(build_recommended_labels( + airflow, + AIRFLOW_CONTROLLER_NAME, + &resolved_product_image.app_version_label, + &rolegroup_ref.role, + &rolegroup_ref.role_group, + )) + }) + .image_pull_secrets_from_product_image(resolved_product_image) + .affinity(&config.affinity) + .service_account_name(sa_name) + .security_context( + PodSecurityContextBuilder::new() + .run_as_user(AIRFLOW_UID) + .run_as_group(0) + .fs_group(1000) // Needed for secret-operator + .build(), + ); + + let mut airflow_container = ContainerBuilder::new(&Container::Airflow.to_string()) + .context(InvalidContainerNameSnafu)?; if let Some(authentication_class) = authentication_class { - add_authentication_volumes_and_volume_mounts(authentication_class, &mut cb, &mut pb)?; + add_authentication_volumes_and_volume_mounts( + authentication_class, + &mut airflow_container, + &mut pb, + )?; } - let cb = cb + airflow_container .image_from_product_image(resolved_product_image) .resources(config.resources.clone().into()) .command(vec!["/bin/bash".to_string()]) @@ -648,15 +673,15 @@ fn build_server_rolegroup_statefulset( // mapped environment variables let env_mapped = build_mapped_envs(airflow, rolegroup_config); - cb.add_env_vars(env_config); - cb.add_env_vars(env_mapped); - cb.add_env_vars(build_static_envs()); + airflow_container.add_env_vars(env_config); + airflow_container.add_env_vars(env_mapped); + airflow_container.add_env_vars(build_static_envs()); let volume_mounts = airflow.volume_mounts(); - cb.add_volume_mounts(volume_mounts); - cb.add_volume_mount(CONFIG_VOLUME_NAME, CONFIG_PATH); - cb.add_volume_mount(LOG_CONFIG_VOLUME_NAME, LOG_CONFIG_DIR); - cb.add_volume_mount(LOG_VOLUME_NAME, STACKABLE_LOG_DIR); + airflow_container.add_volume_mounts(volume_mounts); + airflow_container.add_volume_mount(CONFIG_VOLUME_NAME, CONFIG_PATH); + airflow_container.add_volume_mount(LOG_CONFIG_VOLUME_NAME, LOG_CONFIG_DIR); + airflow_container.add_volume_mount(LOG_VOLUME_NAME, STACKABLE_LOG_DIR); if let Some(resolved_port) = airflow_role.get_http_port() { let probe = Probe { @@ -668,12 +693,12 @@ fn build_server_rolegroup_statefulset( period_seconds: Some(5), ..Probe::default() }; - cb.readiness_probe(probe.clone()); - cb.liveness_probe(probe); - cb.add_container_port("http", resolved_port.into()); + airflow_container.readiness_probe(probe.clone()); + airflow_container.liveness_probe(probe); + airflow_container.add_container_port("http", resolved_port.into()); } - let container = cb.build(); + pb.add_container(airflow_container.build()); let metrics_container = ContainerBuilder::new("metrics") .context(InvalidContainerNameSnafu)? @@ -690,16 +715,14 @@ fn build_server_rolegroup_statefulset( .build(), ) .build(); + pb.add_container(metrics_container); - let mut volumes = airflow.volumes(); - volumes.extend(controller_commons::create_volumes( + pb.add_volumes(airflow.volumes()); + pb.add_volumes(controller_commons::create_volumes( &rolegroup_ref.object_name(), config.logging.containers.get(&Container::Airflow), )); - pb.add_container(container); - pb.add_container(metrics_container); - if let Some(gitsync) = airflow.git_sync() { let gitsync_container = ContainerBuilder::new(&format!("{}-{}", GIT_SYNC_NAME, 1)) .context(InvalidContainerNameSnafu)? @@ -718,7 +741,7 @@ fn build_server_rolegroup_statefulset( ) .build(); - volumes.push( + pb.add_volume( VolumeBuilder::new(GIT_CONTENT) .empty_dir(EmptyDirVolumeSource::default()) .build(), @@ -741,6 +764,12 @@ fn build_server_rolegroup_statefulset( )); } + let mut pod_template = pb.build_template(); + pod_template.merge_from(role.config.pod_overrides.clone()); + if let Some(rolegroup) = rolegroup { + pod_template.merge_from(rolegroup.config.pod_overrides.clone()); + } + Ok(StatefulSet { metadata: ObjectMetaBuilder::new() .name_and_namespace(airflow) @@ -769,28 +798,7 @@ fn build_server_rolegroup_statefulset( ..LabelSelector::default() }, service_name: rolegroup_ref.object_name(), - template: pb - .metadata_builder(|m| { - m.with_recommended_labels(build_recommended_labels( - airflow, - AIRFLOW_CONTROLLER_NAME, - &resolved_product_image.app_version_label, - &rolegroup_ref.role, - &rolegroup_ref.role_group, - )) - }) - .image_pull_secrets_from_product_image(resolved_product_image) - .add_volumes(volumes) - .affinity(&config.affinity) - .service_account_name(sa_name) - .security_context( - PodSecurityContextBuilder::new() - .run_as_user(AIRFLOW_UID) - .run_as_group(0) - .fs_group(1000) // Needed for secret-operator - .build(), - ) - .build_template(), + template: pod_template, ..StatefulSetSpec::default() }), status: None, diff --git a/tests/templates/kuttl/resources/03-assert.yaml.j2 b/tests/templates/kuttl/resources/03-assert.yaml.j2 index 25eaa80c..c96bd295 100644 --- a/tests/templates/kuttl/resources/03-assert.yaml.j2 +++ b/tests/templates/kuttl/resources/03-assert.yaml.j2 @@ -25,10 +25,10 @@ spec: resources: requests: cpu: 100m - memory: 1Gi + memory: 2Gi limits: cpu: "1" - memory: 1Gi + memory: 2Gi - name: metrics {% if lookup('env', 'VECTOR_AGGREGATOR') %} - name: vector @@ -49,9 +49,33 @@ spec: resources: requests: cpu: 200m - memory: 2Gi + memory: 3Gi limits: cpu: "2" + memory: 3Gi + - name: metrics +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + - name: vector +{% endif %} +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-worker-resources-from-pod-overrides +spec: + template: + spec: + containers: + - name: airflow + resources: + requests: + cpu: 300m + memory: 2Gi + limits: + cpu: 900m memory: 2Gi - name: metrics {% if lookup('env', 'VECTOR_AGGREGATOR') %} diff --git a/tests/templates/kuttl/resources/03-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/resources/03-install-airflow-cluster.yaml.j2 index 6025e118..7b879140 100644 --- a/tests/templates/kuttl/resources/03-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/resources/03-install-airflow-cluster.yaml.j2 @@ -52,7 +52,7 @@ spec: min: 100m max: "1" memory: - limit: 1Gi + limit: 2Gi roleGroups: resources-from-role: replicas: 1 @@ -63,8 +63,18 @@ spec: min: 200m max: "2" memory: - limit: 2Gi + limit: 3Gi replicas: 1 + resources-from-pod-overrides: + podOverrides: + spec: + containers: + - name: airflow + resources: + requests: + cpu: 300m + limits: + cpu: 900m schedulers: config: logging: