Commit

WIP history server tests
adwk67 committed Aug 15, 2024
1 parent c87f14c commit 250e22c
Showing 3 changed files with 145 additions and 20 deletions.
61 changes: 61 additions & 0 deletions rust/crd/src/history.rs
@@ -363,3 +363,64 @@ impl Configuration for HistoryConfigFragment {
Ok(BTreeMap::new())
}
}

#[cfg(test)]
mod test {
use super::*;
use indoc::indoc;

#[test]
pub fn test_env_overrides() {
let input = indoc! {r#"
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkHistoryServer
metadata:
  name: spark-history
spec:
  image:
    productVersion: 3.5.1
  logFileDirectory:
    s3:
      prefix: eventlogs/
      bucket:
        reference: spark-history-s3-bucket
  nodes:
    envOverrides:
      TEST_SPARK_HIST_VAR: ROLE
    roleGroups:
      default:
        replicas: 1
        config:
          cleaner: true
        envOverrides:
          TEST_SPARK_HIST_VAR: ROLEGROUP
"#};

let deserializer = serde_yaml::Deserializer::from_str(input);
let history: SparkHistoryServer =
serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap();

assert_eq!(
Some(&"ROLE".to_string()),
history
.spec
.nodes
.config
.env_overrides
.get("TEST_SPARK_HIST_VAR")
);
assert_eq!(
Some(&"ROLEGROUP".to_string()),
history
.spec
.nodes
.role_groups
.get("default")
.unwrap()
.config
.env_overrides
.get("TEST_SPARK_HIST_VAR")
);
}
}
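
The test above only verifies that both override levels survive deserialization; which value actually reaches the container is decided later, when the controller merges the two maps (see the env_vars() changes in history_controller.rs below). As a minimal, self-contained sketch of that expected precedence — using plain BTreeMaps rather than the operator's own types, so everything here is illustrative only — the rolegroup entry wins because it is applied last:

use std::collections::BTreeMap;

// Illustration only: the real merge in env_vars() builds a BTreeMap<String, EnvVar>,
// but the precedence rule is the same because later inserts overwrite earlier ones.
fn merge_overrides(
    role: BTreeMap<String, String>,
    role_group: BTreeMap<String, String>,
) -> BTreeMap<String, String> {
    let mut merged = role;
    merged.extend(role_group); // rolegroup entries replace role entries with the same key
    merged
}

fn main() {
    let role = BTreeMap::from([("TEST_SPARK_HIST_VAR".to_string(), "ROLE".to_string())]);
    let role_group = BTreeMap::from([("TEST_SPARK_HIST_VAR".to_string(), "ROLEGROUP".to_string())]);

    let merged = merge_overrides(role, role_group);
    assert_eq!(merged.get("TEST_SPARK_HIST_VAR"), Some(&"ROLEGROUP".to_string()));
}
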
102 changes: 82 additions & 20 deletions rust/operator-binary/src/history/history_controller.rs
@@ -440,6 +440,16 @@ fn build_stateful_set(
..PodSecurityContext::default()
});

let role_group = shs
.rolegroup(rolegroupref)
.with_context(|_| CannotRetrieveRoleGroupSnafu)?;

let merged_env_vars = env_vars(
s3_log_dir,
shs.role().config.clone().env_overrides,
role_group.config.env_overrides,
);

let container_name = "spark-history";
let container = ContainerBuilder::new(container_name)
.context(InvalidContainerNameSnafu)?
@@ -449,7 +459,7 @@
.args(command_args(s3_log_dir))
.add_container_port("http", 18080)
.add_container_port("metrics", METRICS_PORT.into())
.add_env_vars(env_vars(s3_log_dir))
.add_env_vars(merged_env_vars)
.add_volume_mounts(s3_log_dir.volume_mounts())
.add_volume_mount(VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_PATH_CONFIG)
.add_volume_mount(VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_LOG_CONFIG)
@@ -670,21 +680,33 @@ fn command_args(s3logdir: &S3LogDir) -> Vec<String> {
vec![String::from("-c"), command.join(" && ")]
}

fn env_vars(s3logdir: &S3LogDir) -> Vec<EnvVar> {
let mut vars: Vec<EnvVar> = vec![];
fn env_vars(
s3logdir: &S3LogDir,
role_env_overrides: HashMap<String, String>,
role_group_env_overrides: HashMap<String, String>,
) -> Vec<EnvVar> {
// Maps env var name to env var object. This allows env_overrides to work
// as expected (i.e. users can override the env var value).
let mut vars: BTreeMap<String, EnvVar> = BTreeMap::new();

// This env var prevents the history server from detaching itself from the
// start script because this leads to the Pod terminating immediately.
vars.push(EnvVar {
name: "SPARK_NO_DAEMONIZE".to_string(),
value: Some("true".into()),
value_from: None,
});
vars.push(EnvVar {
name: "SPARK_DAEMON_CLASSPATH".to_string(),
value: Some("/stackable/spark/extra-jars/*".into()),
value_from: None,
});
vars.insert(
"SPARK_NO_DAEMONIZE".to_string(),
EnvVar {
name: "SPARK_NO_DAEMONIZE".to_string(),
value: Some("true".into()),
value_from: None,
},
);
vars.insert(
"SPARK_DAEMON_CLASSPATH".to_string(),
EnvVar {
name: "SPARK_DAEMON_CLASSPATH".to_string(),
value: Some("/stackable/spark/extra-jars/*".into()),
value_from: None,
},
);

let mut history_opts = vec![
format!("-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"),
@@ -693,6 +715,8 @@ fn env_vars(s3logdir: &S3LogDir) -> Vec<EnvVar> {
),
format!("-javaagent:/stackable/jmx/jmx_prometheus_javaagent.jar={METRICS_PORT}:/stackable/jmx/config.yaml")
];

// if TLS is enabled build truststore
if tlscerts::tls_secret_name(&s3logdir.bucket.connection).is_some() {
history_opts.extend(vec![
format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"),
@@ -701,13 +725,51 @@ fn env_vars(s3logdir: &S3LogDir) -> Vec<EnvVar> {
]);
}

vars.push(EnvVar {
name: "SPARK_HISTORY_OPTS".to_string(),
value: Some(history_opts.join(" ")),
value_from: None,
});
// if TLS is enabled build truststore
vars
vars.insert(
"SPARK_HISTORY_OPTS".to_string(),
EnvVar {
name: "SPARK_HISTORY_OPTS".to_string(),
value: Some(history_opts.join(" ")),
value_from: None,
},
);

// apply the role overrides
let mut role_envs = role_env_overrides
.into_iter()
.map(|env_var| {
(
env_var.0.clone(),
EnvVar {
name: env_var.0.clone(),
value: Some(env_var.1),
value_from: None,
},
)
})
.collect();

vars.append(&mut role_envs);

// apply the role-group overrides
let mut role_group_envs = role_group_env_overrides
.into_iter()
.map(|env_var| {
(
env_var.0.clone(),
EnvVar {
name: env_var.0.clone(),
value: Some(env_var.1),
value_from: None,
},
)
})
.collect();

vars.append(&mut role_group_envs);

// convert to Vec
vars.into_values().collect()
}

fn labels<'a, T>(
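
Because operator defaults and user overrides now land in the same name-keyed BTreeMap, an override that reuses a default's name simply replaces it — which is what the comment in env_vars() promises. A rough, stand-alone sketch of that effect follows; it assumes the k8s-openapi crate as a dependency and is not part of this commit:

use std::collections::BTreeMap;

use k8s_openapi::api::core::v1::EnvVar;

fn main() {
    let mut vars: BTreeMap<String, EnvVar> = BTreeMap::new();

    // Operator default, inserted first.
    vars.insert(
        "SPARK_NO_DAEMONIZE".to_string(),
        EnvVar {
            name: "SPARK_NO_DAEMONIZE".to_string(),
            value: Some("true".into()),
            value_from: None,
        },
    );

    // A user-supplied override with the same name, inserted later,
    // replaces the default entry for that key.
    vars.insert(
        "SPARK_NO_DAEMONIZE".to_string(),
        EnvVar {
            name: "SPARK_NO_DAEMONIZE".to_string(),
            value: Some("false".into()),
            value_from: None,
        },
    );

    let rendered: Vec<EnvVar> = vars.into_values().collect();
    assert_eq!(rendered[0].value.as_deref(), Some("false"));
}
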
@@ -45,6 +45,8 @@ spec:
replicas: 1
config:
  cleaner: true
envOverrides:
  TEST_SPARK_HIST_VAR_0: ORIGINAL
podOverrides:
  spec:
    containers:
