Skip to content

Commit

Permalink
feat: Support podOverrides
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernauer committed Jul 3, 2023
1 parent 23ed311 commit f1c3e07
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- Support Airflow `2.6.1` ([#284]).
- Set explicit resources on all containers ([#289])
- Operator errors out when credentialsSecret is missing ([#293]).
- Support podOverrides ([#XXX]).

### Changed

Expand Down
96 changes: 51 additions & 45 deletions rust/operator-binary/src/airflow_controller.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Ensures that `Pod`s are configured and running for each [`AirflowCluster`]
use stackable_operator::builder::resources::ResourceRequirementsBuilder;
use stackable_operator::k8s_openapi::DeepMerge;

use crate::config::{self, PYTHON_IMPORTS};
use crate::controller_commons::{
Expand Down Expand Up @@ -615,19 +616,41 @@ fn build_server_rolegroup_statefulset(

let rolegroup = role.role_groups.get(&rolegroup_ref.role_group);

// initialising commands
let commands = airflow_role.get_commands();

// container
let mut cb = ContainerBuilder::new(&Container::Airflow.to_string())
.context(InvalidContainerNameSnafu)?;
let mut pb = PodBuilder::new();
pb.metadata_builder(|m| {
m.with_recommended_labels(build_recommended_labels(
airflow,
AIRFLOW_CONTROLLER_NAME,
&resolved_product_image.app_version_label,
&rolegroup_ref.role,
&rolegroup_ref.role_group,
))
})
.image_pull_secrets_from_product_image(resolved_product_image)
.affinity(&config.affinity)
.service_account_name(sa_name)
.security_context(
PodSecurityContextBuilder::new()
.run_as_user(AIRFLOW_UID)
.run_as_group(0)
.fs_group(1000) // Needed for secret-operator
.build(),
);

let mut airflow_container = ContainerBuilder::new(&Container::Airflow.to_string())
.context(InvalidContainerNameSnafu)?;

if let Some(authentication_class) = authentication_class {
add_authentication_volumes_and_volume_mounts(authentication_class, &mut cb, &mut pb)?;
add_authentication_volumes_and_volume_mounts(
authentication_class,
&mut airflow_container,
&mut pb,
)?;
}

let cb = cb
airflow_container
.image_from_product_image(resolved_product_image)
.resources(config.resources.clone().into())
.command(vec!["/bin/bash".to_string()])
Expand All @@ -648,15 +671,15 @@ fn build_server_rolegroup_statefulset(
// mapped environment variables
let env_mapped = build_mapped_envs(airflow, rolegroup_config);

cb.add_env_vars(env_config);
cb.add_env_vars(env_mapped);
cb.add_env_vars(build_static_envs());
airflow_container.add_env_vars(env_config);
airflow_container.add_env_vars(env_mapped);
airflow_container.add_env_vars(build_static_envs());

let volume_mounts = airflow.volume_mounts();
cb.add_volume_mounts(volume_mounts);
cb.add_volume_mount(CONFIG_VOLUME_NAME, CONFIG_PATH);
cb.add_volume_mount(LOG_CONFIG_VOLUME_NAME, LOG_CONFIG_DIR);
cb.add_volume_mount(LOG_VOLUME_NAME, STACKABLE_LOG_DIR);
airflow_container.add_volume_mounts(volume_mounts);
airflow_container.add_volume_mount(CONFIG_VOLUME_NAME, CONFIG_PATH);
airflow_container.add_volume_mount(LOG_CONFIG_VOLUME_NAME, LOG_CONFIG_DIR);
airflow_container.add_volume_mount(LOG_VOLUME_NAME, STACKABLE_LOG_DIR);

if let Some(resolved_port) = airflow_role.get_http_port() {
let probe = Probe {
Expand All @@ -668,12 +691,12 @@ fn build_server_rolegroup_statefulset(
period_seconds: Some(5),
..Probe::default()
};
cb.readiness_probe(probe.clone());
cb.liveness_probe(probe);
cb.add_container_port("http", resolved_port.into());
airflow_container.readiness_probe(probe.clone());
airflow_container.liveness_probe(probe);
airflow_container.add_container_port("http", resolved_port.into());
}

let container = cb.build();
pb.add_container(airflow_container.build());

let metrics_container = ContainerBuilder::new("metrics")
.context(InvalidContainerNameSnafu)?
Expand All @@ -690,16 +713,14 @@ fn build_server_rolegroup_statefulset(
.build(),
)
.build();
pb.add_container(metrics_container);

let mut volumes = airflow.volumes();
volumes.extend(controller_commons::create_volumes(
pb.add_volumes(airflow.volumes());
pb.add_volumes(controller_commons::create_volumes(
&rolegroup_ref.object_name(),
config.logging.containers.get(&Container::Airflow),
));

pb.add_container(container);
pb.add_container(metrics_container);

if let Some(gitsync) = airflow.git_sync() {
let gitsync_container = ContainerBuilder::new(&format!("{}-{}", GIT_SYNC_NAME, 1))
.context(InvalidContainerNameSnafu)?
Expand All @@ -718,7 +739,7 @@ fn build_server_rolegroup_statefulset(
)
.build();

volumes.push(
pb.add_volume(
VolumeBuilder::new(GIT_CONTENT)
.empty_dir(EmptyDirVolumeSource::default())
.build(),
Expand All @@ -741,6 +762,12 @@ fn build_server_rolegroup_statefulset(
));
}

let mut pod_template = pb.build_template();
pod_template.merge_from(role.config.pod_overrides.clone());
if let Some(rolegroup) = rolegroup {
pod_template.merge_from(rolegroup.config.pod_overrides.clone());
}

Ok(StatefulSet {
metadata: ObjectMetaBuilder::new()
.name_and_namespace(airflow)
Expand Down Expand Up @@ -769,28 +796,7 @@ fn build_server_rolegroup_statefulset(
..LabelSelector::default()
},
service_name: rolegroup_ref.object_name(),
template: pb
.metadata_builder(|m| {
m.with_recommended_labels(build_recommended_labels(
airflow,
AIRFLOW_CONTROLLER_NAME,
&resolved_product_image.app_version_label,
&rolegroup_ref.role,
&rolegroup_ref.role_group,
))
})
.image_pull_secrets_from_product_image(resolved_product_image)
.add_volumes(volumes)
.affinity(&config.affinity)
.service_account_name(sa_name)
.security_context(
PodSecurityContextBuilder::new()
.run_as_user(AIRFLOW_UID)
.run_as_group(0)
.fs_group(1000) // Needed for secret-operator
.build(),
)
.build_template(),
template: pod_template,
..StatefulSetSpec::default()
}),
status: None,
Expand Down

0 comments on commit f1c3e07

Please sign in to comment.