
Commit

pass volumes/volume-mounts and env-vars through to gitsync containers (#456)

* pass volumes/volume-mounts and env-vars through to gitsync containers

* changelog

* adapted gitsync test for envs and mounts

* pass pod overrides to template

* Update tests/templates/kuttl/mount-dags-gitsync/31-assert.yaml.j2

Co-authored-by: Razvan-Daniel Mihai <[email protected]>

* refactored duplicate code

---------

Co-authored-by: Razvan-Daniel Mihai <[email protected]>
adwk67 and razvan authored Jun 21, 2024
1 parent b30eb32 commit 490684c
Showing 5 changed files with 85 additions and 30 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -12,11 +12,13 @@
- Remove requirement of celery configs when using kubernetes executors ([#445]).
- Processing of corrupted log events fixed; if errors occur, the error
messages are added to the log event ([#449]).
- Add volumes/volumeMounts/envOverrides to gitsync containers ([#456]).

[#404]: https://github.com/stackabletech/airflow-operator/pull/404
[#439]: https://github.com/stackabletech/airflow-operator/pull/439
[#445]: https://github.com/stackabletech/airflow-operator/pull/445
[#449]: https://github.com/stackabletech/airflow-operator/pull/449
[#456]: https://github.com/stackabletech/airflow-operator/pull/456

## [24.3.0] - 2024-03-20

15 changes: 12 additions & 3 deletions rust/operator-binary/src/airflow_controller.rs
@@ -14,7 +14,7 @@ use stackable_airflow_crd::{
GIT_SYNC_NAME, LOG_CONFIG_DIR, OPERATOR_NAME, STACKABLE_LOG_DIR, TEMPLATE_CONFIGMAP_NAME,
TEMPLATE_LOCATION, TEMPLATE_NAME, TEMPLATE_VOLUME_NAME,
};
use stackable_operator::k8s_openapi::api::core::v1::EnvVar;
use stackable_operator::k8s_openapi::api::core::v1::{EnvVar, PodTemplateSpec, VolumeMount};
use stackable_operator::kube::api::ObjectMeta;
use stackable_operator::{
builder::{
@@ -548,6 +548,7 @@ async fn build_executor_template(
&rbac_sa.name_unchecked(),
&merged_executor_config,
&common_config.env_overrides,
&common_config.pod_overrides,
&rolegroup,
)?;
cluster_resources
@@ -938,6 +939,7 @@ fn build_server_rolegroup_statefulset(
false,
&format!("{}-{}", GIT_SYNC_NAME, 1),
build_gitsync_statefulset_envs(rolegroup_config),
airflow.volume_mounts(),
)?;

pb.add_volume(
@@ -955,6 +957,7 @@
true,
&format!("{}-{}", GIT_SYNC_NAME, 0),
build_gitsync_statefulset_envs(rolegroup_config),
airflow.volume_mounts(),
)?;
// If the DAG is modularized we may encounter a timing issue whereby the celery worker has started
// *before* all modules referenced by the DAG have been fetched by gitsync and registered. This
@@ -1052,6 +1055,7 @@ fn build_executor_template_config_map(
sa_name: &str,
merged_executor_config: &ExecutorConfig,
env_overrides: &HashMap<String, String>,
pod_overrides: &PodTemplateSpec,
rolegroup_ref: &RoleGroupRef<AirflowCluster>,
) -> Result<ConfigMap> {
let mut pb = PodBuilder::new();
@@ -1122,7 +1126,8 @@
&gitsync,
true,
&format!("{}-{}", GIT_SYNC_NAME, 0),
build_gitsync_template(&gitsync.credentials_secret),
build_gitsync_template(env_overrides),
airflow.volume_mounts(),
)?;
pb.add_volume(
VolumeBuilder::new(GIT_CONTENT)
@@ -1142,7 +1147,9 @@
));
}

let pod_template = pb.build_template();
let mut pod_template = pb.build_template();
pod_template.merge_from(pod_overrides.clone());

let mut cm_builder = ConfigMapBuilder::new();

let restarter_label =
@@ -1180,6 +1187,7 @@ fn build_gitsync_container(
one_time: bool,
name: &str,
env_vars: Vec<EnvVar>,
volume_mounts: Vec<VolumeMount>,
) -> Result<k8s_openapi::api::core::v1::Container, Error> {
let gitsync_container = ContainerBuilder::new(name)
.context(InvalidContainerNameSnafu)?
@@ -1194,6 +1202,7 @@
])
.args(vec![gitsync.get_args(one_time).join("\n")])
.add_volume_mount(GIT_CONTENT, GIT_ROOT)
.add_volume_mounts(volume_mounts)
.resources(
ResourceRequirementsBuilder::new()
.with_cpu_request("100m")
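The controller-side change above does two things: gitsync containers now receive the cluster's extra volume mounts and env vars, and user-supplied `podOverrides` are merged onto the generated executor pod template via `merge_from` before it is written into the ConfigMap. Below is a minimal, self-contained sketch of that merge pattern only; the struct and field names are simplified stand-ins rather than the real `PodTemplateSpec`/`merge_from` from `k8s_openapi`/`stackable-operator`, and the merge rules shown (override scalars win, env vars are keyed by name) are illustrative assumptions.

```rust
// Simplified stand-in types, not the real Kubernetes/Stackable types.
#[derive(Clone, Debug, Default, PartialEq)]
struct EnvVar {
    name: String,
    value: Option<String>,
}

#[derive(Clone, Debug, Default)]
struct PodTemplate {
    service_account: Option<String>,
    env: Vec<EnvVar>,
}

impl PodTemplate {
    // Merge `overrides` into `self`: scalar fields from the override replace
    // generated values, and env vars are keyed by name so an override with
    // the same name wins over the generated one.
    fn merge_from(&mut self, overrides: PodTemplate) {
        if overrides.service_account.is_some() {
            self.service_account = overrides.service_account;
        }
        for var in overrides.env {
            let existing = self.env.iter().position(|e| e.name == var.name);
            match existing {
                Some(idx) => self.env[idx] = var,
                None => self.env.push(var),
            }
        }
    }
}

fn main() {
    // Template generated by the operator ...
    let mut generated = PodTemplate {
        service_account: Some("airflow-sa".into()),
        env: vec![EnvVar {
            name: "GITSYNC_PERIOD".into(),
            value: Some("20s".into()),
        }],
    };
    // ... and the user's podOverrides, merged on top so explicit values win.
    let overrides = PodTemplate {
        service_account: None,
        env: vec![EnvVar {
            name: "GITSYNC_PERIOD".into(),
            value: Some("5s".into()),
        }],
    };
    generated.merge_from(overrides);
    assert_eq!(generated.env[0].value.as_deref(), Some("5s"));
    println!("{generated:?}");
}
```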
62 changes: 35 additions & 27 deletions rust/operator-binary/src/env_vars.rs
@@ -299,22 +299,16 @@ fn static_envs(airflow: &AirflowCluster) -> BTreeMap<String, EnvVar> {
pub fn build_gitsync_statefulset_envs(
rolegroup_config: &HashMap<PropertyNameKind, BTreeMap<String, String>>,
) -> Vec<EnvVar> {
let mut env = vec![];
let mut env: BTreeMap<String, EnvVar> = BTreeMap::new();

if let Some(git_secret) = rolegroup_config
.get(&PropertyNameKind::Env)
.and_then(|vars| vars.get(AirflowConfig::GIT_CREDENTIALS_SECRET_PROPERTY))
{
env.push(env_var_from_secret(GITSYNC_USERNAME, git_secret, "user"));
env.push(env_var_from_secret(
GITSYNC_PASSWORD,
git_secret,
"password",
));
if let Some(git_env) = rolegroup_config.get(&PropertyNameKind::Env) {
for (k, v) in git_env.iter() {
gitsync_vars_map(k, &mut env, v);
}
}

tracing::debug!("Env-var set [{:?}]", env);
env
transform_map_to_vec(env)
}

/// Return environment variables to be applied to the configuration map used in conjunction with
@@ -407,23 +401,37 @@ pub fn build_airflow_template_envs(

/// Return environment variables to be applied to the configuration map used in conjunction with
/// the `kubernetesExecutor` worker: applied to the gitsync `initContainer`.
pub fn build_gitsync_template(credentials_secret: &Option<String>) -> Vec<EnvVar> {
let mut env = vec![];

if let Some(credentials_secret) = &credentials_secret {
env.push(env_var_from_secret(
GITSYNC_USERNAME,
credentials_secret,
"user",
));
env.push(env_var_from_secret(
GITSYNC_PASSWORD,
credentials_secret,
"password",
));
pub fn build_gitsync_template(env_overrides: &HashMap<String, String>) -> Vec<EnvVar> {
let mut env: BTreeMap<String, EnvVar> = BTreeMap::new();

for (k, v) in env_overrides.iter().collect::<BTreeMap<_, _>>() {
gitsync_vars_map(k, &mut env, v);
}

tracing::debug!("Env-var set [{:?}]", env);
env
transform_map_to_vec(env)
}

fn gitsync_vars_map(k: &String, env: &mut BTreeMap<String, EnvVar>, v: &String) {
if k.eq_ignore_ascii_case(AirflowConfig::GIT_CREDENTIALS_SECRET_PROPERTY) {
env.insert(
GITSYNC_USERNAME.to_string(),
env_var_from_secret(GITSYNC_USERNAME, k, "user"),
);
env.insert(
GITSYNC_PASSWORD.to_string(),
env_var_from_secret(GITSYNC_PASSWORD, k, "password"),
);
} else {
env.insert(
k.to_string(),
EnvVar {
name: k.to_string(),
value: Some(v.to_string()),
..Default::default()
},
);
}
}

// Internally the environment variable collection uses a map so that overrides can actually
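In `env_vars.rs` the gitsync env handling now routes every env override through a name-keyed map, so a later entry for the same variable replaces an earlier one, and the credentials-secret property is expanded into the two secret-backed git-sync variables. The sketch below illustrates that routing with simplified stand-in types rather than the real `EnvVar`/`env_var_from_secret`; it assumes the credentials secret name is taken from the override's value and that `transform_map_to_vec` (whose body lies outside this hunk) simply flattens the map into the `Vec<EnvVar>` the container builder expects.

```rust
use std::collections::BTreeMap;

// Simplified stand-in for the Kubernetes EnvVar type; `secret` models a
// valueFrom/secretKeyRef-style reference as (secret name, key).
#[derive(Clone, Debug)]
struct EnvVar {
    name: String,
    value: Option<String>,
    secret: Option<(String, String)>,
}

const GITSYNC_USERNAME: &str = "GITSYNC_USERNAME";
const GITSYNC_PASSWORD: &str = "GITSYNC_PASSWORD";
const GIT_CREDENTIALS_SECRET_PROPERTY: &str = "gitCredentialsSecret";

// Route one env override into the name-keyed map: the credentials-secret
// property expands into the two secret-backed git-sync variables, while any
// other key/value pair is passed through verbatim as a literal env var.
fn gitsync_vars_map(k: &str, env: &mut BTreeMap<String, EnvVar>, v: &str) {
    if k.eq_ignore_ascii_case(GIT_CREDENTIALS_SECRET_PROPERTY) {
        env.insert(
            GITSYNC_USERNAME.to_string(),
            EnvVar {
                name: GITSYNC_USERNAME.to_string(),
                value: None,
                secret: Some((v.to_string(), "user".to_string())),
            },
        );
        env.insert(
            GITSYNC_PASSWORD.to_string(),
            EnvVar {
                name: GITSYNC_PASSWORD.to_string(),
                value: None,
                secret: Some((v.to_string(), "password".to_string())),
            },
        );
    } else {
        env.insert(
            k.to_string(),
            EnvVar {
                name: k.to_string(),
                value: Some(v.to_string()),
                secret: None,
            },
        );
    }
}

// Presumed behaviour of `transform_map_to_vec`: flatten the name-keyed map
// into the Vec<EnvVar> that the container builder expects.
fn transform_map_to_vec(env: BTreeMap<String, EnvVar>) -> Vec<EnvVar> {
    env.into_values().collect()
}

fn main() {
    let overrides = BTreeMap::from([
        ("gitCredentialsSecret".to_string(), "git-credentials".to_string()),
        ("AIRFLOW_TEST_VAR".to_string(), "test".to_string()),
    ]);

    let mut env = BTreeMap::new();
    for (k, v) in &overrides {
        gitsync_vars_map(k, &mut env, v);
    }
    println!("{:#?}", transform_map_to_vec(env));
}
```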
@@ -22,6 +22,14 @@ stringData:
connections.celeryBrokerUrl: redis://:redis@airflow-redis-master:6379/0
{% endif %}
---
apiVersion: v1
kind: ConfigMap
metadata:
name: test-cm-gitsync
data:
test.txt: |
some test text here
---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
@@ -43,6 +51,13 @@
# supply some config to check that safe.directory is correctly set
--git-config: http.sslVerify:false
gitFolder: "tests/templates/kuttl/mount-dags-gitsync/dags"
volumeMounts:
- name: test-cm-gitsync
mountPath: /tmp/test.txt
volumes:
- name: test-cm-gitsync
configMap:
name: test-cm-gitsync
webservers:
config:
logging:
@@ -61,11 +76,13 @@
default:
envOverrides:
AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
AIRFLOW_TEST_VAR: "test"
replicas: 1
{% elif test_scenario['values']['executor'] == 'kubernetes' %}
kubernetesExecutors:
envOverrides:
AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
AIRFLOW_TEST_VAR: "test"
config:
logging:
enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
19 changes: 19 additions & 0 deletions tests/templates/kuttl/mount-dags-gitsync/31-assert.yaml.j2
@@ -0,0 +1,19 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 30
commands:

{% if test_scenario['values']['executor'] == 'kubernetes' %}
# check that the executor template configmap contains mounts and envs
# will expect 4 (2 from the volume declaration + mounts to two containers, base and gitsync)
- script: kubectl -n $NAMESPACE get cm airflow-executor-pod-template -o json | jq -r '.data."airflow_executor_pod_template.yaml"' | grep "test-cm-gitsync" | wc -l | grep 4
# will expect 2 (two containers, base and gitsync)
- script: kubectl -n $NAMESPACE get cm airflow-executor-pod-template -o json | jq -r '.data."airflow_executor_pod_template.yaml"' | grep "AIRFLOW_TEST_VAR" | wc -l | grep 2
{% else %}
# check that the statefulset contains mounts and envs
# will expect 6 (2 from the volume declaration + mounts to 3 containers, base and 2 gitsyncs, plus configmap restarter)
- script: kubectl -n $NAMESPACE get sts airflow-worker-default -o json | grep "test-cm-gitsync" | wc -l | grep 6
# will expect 3 (two containers, base and gitsync-1, and one initContainer gitsync-0)
- script: kubectl -n $NAMESPACE get sts airflow-worker-default -o json | grep "AIRFLOW_TEST_VAR" | wc -l | grep 3
{% endif %}
