Commit

First working version

razvan committed Aug 16, 2023
1 parent a11150f commit 105c92d
Showing 5 changed files with 148 additions and 22 deletions.
39 changes: 38 additions & 1 deletion deploy/config-spec/properties.yaml
@@ -3,4 +3,41 @@
 version: 0.1.0
 spec:
   units: []
-properties: []
+properties:
+  - property: &jvmDnsCacheTtl
+      propertyNames:
+        - name: "networkaddress.cache.ttl"
+          kind:
+            type: "file"
+            file: "security.properties"
+      datatype:
+        type: "integer"
+        min: "0"
+      recommendedValues:
+        - fromVersion: "0.0.0"
+          value: "30"
+      roles:
+        - name: "node"
+          required: true
+      asOfVersion: "0.0.0"
+      comment: "History server - TTL for successfully resolved domain names."
+      description: "History server - TTL for successfully resolved domain names."
+
+  - property: &jvmDnsCacheNegativeTtl
+      propertyNames:
+        - name: "networkaddress.cache.negative.ttl"
+          kind:
+            type: "file"
+            file: "security.properties"
+      datatype:
+        type: "integer"
+        min: "0"
+      recommendedValues:
+        - fromVersion: "0.0.0"
+          value: "0"
+      roles:
+        - name: "node"
+          required: true
+      asOfVersion: "0.0.0"
+      comment: "History server - TTL for domain names that cannot be resolved."
+      description: "History server - TTL for domain names that cannot be resolved."
39 changes: 38 additions & 1 deletion deploy/helm/spark-k8s-operator/configs/properties.yaml
@@ -3,4 +3,41 @@
 version: 0.1.0
 spec:
   units: []
-properties: []
+properties:
+  - property: &jvmDnsCacheTtl
+      propertyNames:
+        - name: "networkaddress.cache.ttl"
+          kind:
+            type: "file"
+            file: "security.properties"
+      datatype:
+        type: "integer"
+        min: "0"
+      recommendedValues:
+        - fromVersion: "0.0.0"
+          value: "30"
+      roles:
+        - name: "node"
+          required: true
+      asOfVersion: "0.0.0"
+      comment: "History server - TTL for successfully resolved domain names."
+      description: "History server - TTL for successfully resolved domain names."
+
+  - property: &jvmDnsCacheNegativeTtl
+      propertyNames:
+        - name: "networkaddress.cache.negative.ttl"
+          kind:
+            type: "file"
+            file: "security.properties"
+      datatype:
+        type: "integer"
+        min: "0"
+      recommendedValues:
+        - fromVersion: "0.0.0"
+          value: "0"
+      roles:
+        - name: "node"
+          required: true
+      asOfVersion: "0.0.0"
+      comment: "History server - TTL for domain names that cannot be resolved."
+      description: "History server - TTL for domain names that cannot be resolved."
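Note: both properties.yaml copies (deploy/config-spec and the Helm chart configs) declare the same two entries, so the packaged defaults and the chart stay in sync. Rendered through the product-config writer that the history controller uses below, the recommended values produce a two-line security.properties file. A minimal sketch of that rendering, using only the crate re-export visible in this commit; illustrative code, not part of the operator:

    use std::collections::BTreeMap;

    use stackable_operator::product_config::writer::to_java_properties_string;

    fn main() {
        // The recommended values from properties.yaml, in the shape the
        // controller feeds to the writer (values wrapped in Option).
        let jvm_sec_props: BTreeMap<String, Option<String>> = BTreeMap::from([
            ("networkaddress.cache.ttl".to_string(), Some("30".to_string())),
            ("networkaddress.cache.negative.ttl".to_string(), Some("0".to_string())),
        ]);

        // Prints the rendered file (BTreeMap iterates in key order):
        //   networkaddress.cache.negative.ttl=0
        //   networkaddress.cache.ttl=30
        println!("{}", to_java_properties_string(jvm_sec_props.iter()).expect("valid properties"));
    }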
5 changes: 4 additions & 1 deletion rust/crd/src/history.rs
@@ -200,7 +200,10 @@ impl SparkHistoryServer {
         > = vec![(
             HISTORY_ROLE_NAME.to_string(),
             (
-                vec![PropertyNameKind::File(SPARK_DEFAULTS_FILE_NAME.to_string())],
+                vec![
+                    PropertyNameKind::File(SPARK_DEFAULTS_FILE_NAME.to_string()),
+                    PropertyNameKind::File(JVM_SECURITY_PROPERTIES_FILE.to_string()),
+                ],
                 self.spec.nodes.clone(),
             ),
         )]
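Registering JVM_SECURITY_PROPERTIES_FILE as a second PropertyNameKind::File entry is what makes the resolved product config carry a per-rolegroup key/value map for security.properties. A minimal sketch of how such a map is read back out, mirroring the lookup the history controller performs further down (the literal file name stands in for the constant):

    use std::collections::{BTreeMap, HashMap};

    use stackable_operator::product_config::types::PropertyNameKind;

    // `rolegroup_config` is one rolegroup's slice of the resolved product config.
    fn jvm_security_props(
        rolegroup_config: &HashMap<PropertyNameKind, BTreeMap<String, String>>,
    ) -> BTreeMap<String, String> {
        rolegroup_config
            .get(&PropertyNameKind::File("security.properties".to_string()))
            .cloned()
            .unwrap_or_default()
    }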
23 changes: 20 additions & 3 deletions rust/crd/src/lib.rs
@@ -538,11 +538,28 @@ impl SparkApplication {
             }
         }
 
-        // s3 with TLS
+        // Extra JVM opts:
+        // - java security properties
+        // - s3 with TLS
+        let mut extra_java_opts = vec![format!(
+            "-Djava.security.properties={VOLUME_MOUNT_PATH_LOG_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}"
+        )];
         if tlscerts::tls_secret_names(s3conn, s3_log_dir).is_some() {
-            submit_cmd.push(format!("--conf spark.driver.extraJavaOptions=\"-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12 -Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD} -Djavax.net.ssl.trustStoreType=pkcs12 -Djavax.net.debug=ssl,handshake\""));
-            submit_cmd.push(format!("--conf spark.executor.extraJavaOptions=\"-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12 -Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD} -Djavax.net.ssl.trustStoreType=pkcs12 -Djavax.net.debug=ssl,handshake\""));
+            extra_java_opts.extend(
+                vec![
+                    format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"),
+                    format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD}"),
+                    format!("-Djavax.net.ssl.trustStoreType=pkcs12"),
+                    format!("-Djavax.net.debug=ssl,handshake"),
+                ]
+                .into_iter(),
+            );
         }
+        let str_extra_java_opts = extra_java_opts.join(" ");
+        submit_cmd.extend(vec![
+            format!("--conf spark.driver.extraJavaOptions=\"{str_extra_java_opts}\""),
+            format!("--conf spark.executor.extraJavaOptions=\"{str_extra_java_opts}\""),
+        ]);
 
         // repositories and packages arguments
         if let Some(deps) = self.spec.deps.clone() {
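The net effect on the generated spark-submit command: the security-properties flag is now always present, the trust-store flags are appended only when a TLS secret is configured, and driver and executor receive the identical option string. A rough sketch with placeholder paths (the real values come from crate constants such as VOLUME_MOUNT_PATH_LOG_CONFIG and STACKABLE_TRUST_STORE):

    fn main() {
        // Placeholder values standing in for the crate constants.
        let security_props = "/stackable/log_config/security.properties";
        let trust_store = "/stackable/truststore/truststore.p12";

        let mut extra_java_opts = vec![format!("-Djava.security.properties={security_props}")];
        let tls_enabled = true; // stands in for tlscerts::tls_secret_names(..).is_some()
        if tls_enabled {
            extra_java_opts.push(format!("-Djavax.net.ssl.trustStore={trust_store}"));
            // ...plus the password, store-type, and debug flags from the diff above
        }
        let opts = extra_java_opts.join(" ");

        println!("--conf spark.driver.extraJavaOptions=\"{opts}\"");
        println!("--conf spark.executor.extraJavaOptions=\"{opts}\"");
    }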
64 changes: 48 additions & 16 deletions rust/operator-binary/src/history_controller.rs
@@ -19,7 +19,9 @@ use stackable_operator::{
         Resource, ResourceExt,
     },
     labels::{role_group_selector_labels, role_selector_labels, ObjectLabels},
-    product_config::ProductConfigManager,
+    product_config::{
+        types::PropertyNameKind, writer::to_java_properties_string, ProductConfigManager,
+    },
     product_logging::{
         framework::{calculate_log_volume_size_limit, vector_container},
         spec::{
@@ -32,19 +34,20 @@
 use stackable_spark_k8s_crd::{
     constants::{
         ACCESS_KEY_ID, APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_IMAGE_BASE_NAME,
-        HISTORY_ROLE_NAME, LOG4J2_CONFIG_FILE, MAX_SPARK_LOG_FILES_SIZE, OPERATOR_NAME,
-        SECRET_ACCESS_KEY, SPARK_CLUSTER_ROLE, SPARK_DEFAULTS_FILE_NAME, SPARK_UID,
-        STACKABLE_TLS_STORE_PASSWORD, STACKABLE_TRUST_STORE, VOLUME_MOUNT_NAME_CONFIG,
-        VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_NAME_SPARK_DEFAULTS,
-        VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG, VOLUME_MOUNT_PATH_SPARK_DEFAULTS,
+        HISTORY_ROLE_NAME, JVM_SECURITY_PROPERTIES_FILE, LOG4J2_CONFIG_FILE,
+        MAX_SPARK_LOG_FILES_SIZE, OPERATOR_NAME, SECRET_ACCESS_KEY, SPARK_CLUSTER_ROLE,
+        SPARK_DEFAULTS_FILE_NAME, SPARK_UID, STACKABLE_TLS_STORE_PASSWORD, STACKABLE_TRUST_STORE,
+        VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG,
+        VOLUME_MOUNT_NAME_SPARK_DEFAULTS, VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG,
+        VOLUME_MOUNT_PATH_SPARK_DEFAULTS,
     },
     history,
     history::{HistoryConfig, SparkHistoryServer, SparkHistoryServerContainer},
     s3logdir::S3LogDir,
     tlscerts,
 };
-use std::time::Duration;
 use std::{collections::BTreeMap, sync::Arc};
+use std::{collections::HashMap, time::Duration};
 
 use snafu::{OptionExt, ResultExt, Snafu};
 use stackable_operator::builder::resources::ResourceRequirementsBuilder;
@@ -129,6 +132,14 @@ pub enum Error {
     },
     #[snafu(display("cannot retrieve role group"))]
     CannotRetrieveRoleGroup { source: history::Error },
+    #[snafu(display(
+        "History server : failed to serialize [{JVM_SECURITY_PROPERTIES_FILE}] for group {}",
+        rolegroup
+    ))]
+    JvmSecurityProperties {
+        source: stackable_operator::product_config::writer::PropertiesWriterError,
+        rolegroup: String,
+    },
 }
 
 type Result<T, E = Error> = std::result::Result<T, E>;
@@ -211,7 +222,7 @@ pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Action> {
             role_group: rolegroup_name.into(),
         };
 
-        let config = shs
+        let merged_config = shs
             .merged_config(&rgr)
             .context(FailedToResolveConfigSnafu)?;
@@ -228,7 +239,8 @@
 
         let config_map = build_config_map(
             &shs,
-            &config,
+            _rolegroup_config,
+            &merged_config,
             &resolved_product_image.app_version_label,
             &rgr,
             s3_log_dir.as_ref().unwrap(),
@@ -244,7 +256,7 @@
             &resolved_product_image,
             &rgr,
             s3_log_dir.as_ref().unwrap(),
-            &config,
+            &merged_config,
             &serviceaccount,
         )?;
         cluster_resources
@@ -268,7 +280,8 @@ pub fn error_policy(_obj: Arc<SparkHistoryServer>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
 
 fn build_config_map(
     shs: &SparkHistoryServer,
-    config: &HistoryConfig,
+    config: &HashMap<PropertyNameKind, BTreeMap<String, String>>,
+    merged_config: &HistoryConfig,
     app_version_label: &str,
     rolegroupref: &RoleGroupRef<SparkHistoryServer>,
     s3_log_dir: &S3LogDir,
@@ -278,6 +291,16 @@
 
     let spark_defaults = spark_defaults(shs, s3_log_dir, rolegroupref)?;
 
+    let jvm_sec_props: BTreeMap<String, Option<String>> = config
+        .get(&PropertyNameKind::File(
+            JVM_SECURITY_PROPERTIES_FILE.to_string(),
+        ))
+        .cloned()
+        .unwrap_or_default()
+        .into_iter()
+        .map(|(k, v)| (k, Some(v)))
+        .collect();
+
     let mut cm_builder = ConfigMapBuilder::new();
 
     cm_builder
@@ -290,12 +313,20 @@
                 .with_recommended_labels(labels(shs, app_version_label, &rolegroupref.role_group))
                 .build(),
         )
-        .add_data(SPARK_DEFAULTS_FILE_NAME, spark_defaults);
+        .add_data(SPARK_DEFAULTS_FILE_NAME, spark_defaults)
+        .add_data(
+            JVM_SECURITY_PROPERTIES_FILE,
+            to_java_properties_string(jvm_sec_props.iter()).with_context(|_| {
+                JvmSecurityPropertiesSnafu {
+                    rolegroup: rolegroupref.role_group.clone(),
+                }
+            })?,
+        );
 
     product_logging::extend_config_map(
         rolegroupref,
         vector_aggregator_address,
-        &config.logging,
+        &merged_config.logging,
         SparkHistoryServerContainer::SparkHistory,
         SparkHistoryServerContainer::Vector,
         &mut cm_builder,
@@ -593,9 +624,10 @@ fn env_vars(s3logdir: &S3LogDir) -> Vec<EnvVar> {
     });
     vars.push(EnvVar {
         name: "SPARK_HISTORY_OPTS".to_string(),
-        value: Some(format!(
-            "-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"
-        )),
+        value: Some(vec![
+            format!("-Dlog4j.configurationFile={VOLUME_MOUNT_PATH_LOG_CONFIG}/{LOG4J2_CONFIG_FILE}"),
+            format!("-Djava.security.properties={VOLUME_MOUNT_PATH_LOG_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}"),
+        ].join(" ")),
         value_from: None,
     });
     // if TLS is enabled build truststore
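For the history server the same flag travels through the SPARK_HISTORY_OPTS environment variable rather than spark-submit arguments. With illustrative values for the mount-path and file-name constants, the container ends up with something like the following (a sketch, not operator code):

    fn main() {
        // Assumed values for VOLUME_MOUNT_PATH_LOG_CONFIG, LOG4J2_CONFIG_FILE,
        // and JVM_SECURITY_PROPERTIES_FILE.
        let spark_history_opts = [
            "-Dlog4j.configurationFile=/stackable/log_config/log4j2.properties",
            "-Djava.security.properties=/stackable/log_config/security.properties",
        ]
        .join(" ");
        // SPARK_HISTORY_OPTS carries both -D flags in one space-separated string.
        println!("SPARK_HISTORY_OPTS={spark_history_opts}");
    }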
