
Commit 672b468

Remove redundant image policy.
razvan committed Aug 21, 2023
1 parent 0a207ca commit 672b468
Showing 3 changed files with 16 additions and 135 deletions.
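
The change in a nutshell: the SparkApplication CRD used to carry its own sparkImagePullPolicy field (backed by a hand-rolled ImagePullPolicy enum) plus a sparkImagePullSecrets list. Both duplicate settings that the shared product-image machinery from stackable-operator already resolves, so this commit deletes them and lets every consumer read image, image_pull_policy and pull_secrets off the resolved product image. Below is a minimal, self-contained Rust sketch of the resulting pattern; the struct is a hypothetical stand-in for operator-rs's ResolvedProductImage, modeling only the three fields this diff actually touches.

// Hypothetical stand-in for stackable_operator's ResolvedProductImage.
// The real struct lives in operator-rs and carries more fields.
#[derive(Clone, Debug)]
struct ResolvedProductImage {
    image: String,                     // full image reference
    image_pull_policy: String,         // "Always" | "IfNotPresent" | "Never"
    pull_secrets: Option<Vec<String>>, // secret names, simplified from LocalObjectReference
}

fn main() {
    // Example values only; the operator resolves these from the `sparkImage` spec.
    let spark_image = ResolvedProductImage {
        image: "docker.stackable.tech/stackable/spark-k8s:3.4.0".into(),
        image_pull_policy: "IfNotPresent".into(),
        pull_secrets: Some(vec!["myregistrykey".into()]),
    };

    // Before: spark_application.spark_image_pull_policy() / spark_image_pull_secrets().
    // After: one source of truth, hanging off the resolved image.
    println!("image:            {}", spark_image.image);
    println!("imagePullPolicy:  {}", spark_image.image_pull_policy);
    if let Some(secrets) = &spark_image.pull_secrets {
        println!("imagePullSecrets: {secrets:?}");
    }
}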
17 changes: 0 additions & 17 deletions deploy/helm/spark-k8s-operator/crds/crds.yaml
@@ -10163,23 +10163,6 @@ spec:
               nullable: true
               type: string
           type: object
-        sparkImagePullPolicy:
-          enum:
-            - Always
-            - IfNotPresent
-            - Never
-          nullable: true
-          type: string
-        sparkImagePullSecrets:
-          items:
-            description: LocalObjectReference contains enough information to let you locate the referenced object inside the same namespace.
-            properties:
-              name:
-                description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
-                type: string
-            type: object
-          nullable: true
-          type: array
         stopped:
           nullable: true
           type: boolean
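Note that nothing is lost for users of the CRD here: the commit calls these properties redundant because pull policy and pull secrets are also expressible on the shared sparkImage (ProductImage) type from operator-rs; a test-style sketch of the new manifest shape follows the lib.rs diff below.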
102 changes: 4 additions & 98 deletions rust/crd/src/lib.rs
@@ -36,10 +36,7 @@ use stackable_operator::{
         merge::{Atomic, Merge},
     },
     k8s_openapi::{
-        api::core::v1::{
-            EmptyDirVolumeSource, EnvVar, LocalObjectReference, PodTemplateSpec, Volume,
-            VolumeMount,
-        },
+        api::core::v1::{EmptyDirVolumeSource, EnvVar, PodTemplateSpec, Volume, VolumeMount},
         apimachinery::pkg::api::resource::Quantity,
     },
     kube::{CustomResource, ResourceExt},
@@ -49,7 +46,7 @@ use stackable_operator::{
     role_utils::pod_overrides_schema,
     schemars::{self, JsonSchema},
 };
-use strum::{Display, EnumIter, EnumString};
+use strum::{Display, EnumIter};
 
 #[derive(Snafu, Debug)]
 pub enum Error {
@@ -178,10 +175,6 @@ pub struct SparkApplicationSpec {
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub image: Option<String>,
     pub spark_image: ProductImage,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub spark_image_pull_policy: Option<ImagePullPolicy>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub spark_image_pull_secrets: Option<Vec<LocalObjectReference>>,
     /// Name of the Vector aggregator discovery ConfigMap.
     /// It must contain the key `ADDRESS` with the address of the Vector aggregator.
     #[serde(skip_serializing_if = "Option::is_none")]
@@ -210,13 +203,6 @@ pub struct SparkApplicationSpec {
     pub log_file_directory: Option<LogFileDirectorySpec>,
 }
 
-#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize, Display, EnumString)]
-pub enum ImagePullPolicy {
-    Always,
-    IfNotPresent,
-    Never,
-}
-
 #[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Eq, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct JobDependencies {
@@ -247,14 +233,6 @@ impl SparkApplication {
         self.spec.image.as_deref()
     }
 
-    pub fn spark_image_pull_policy(&self) -> Option<ImagePullPolicy> {
-        self.spec.spark_image_pull_policy.clone()
-    }
-
-    pub fn spark_image_pull_secrets(&self) -> Option<Vec<LocalObjectReference>> {
-        self.spec.spark_image_pull_secrets.clone()
-    }
-
     pub fn version(&self) -> Option<&str> {
         self.spec.version.as_deref()
     }
@@ -1072,11 +1050,9 @@ impl ExecutorConfig {
 
 #[cfg(test)]
 mod tests {
-    use crate::{
-        cores_from_quantity, resources_to_executor_props, ExecutorConfig, ImagePullPolicy,
-    };
+    use crate::DriverConfig;
+    use crate::{cores_from_quantity, resources_to_executor_props, ExecutorConfig};
     use crate::{resources_to_driver_props, SparkApplication};
-    use crate::{DriverConfig, LocalObjectReference};
     use crate::{Quantity, SparkStorageConfig};
     use rstest::rstest;
     use stackable_operator::builder::ObjectMetaBuilder;
@@ -1087,7 +1063,6 @@ mod tests {
     use stackable_operator::k8s_openapi::api::core::v1::PodTemplateSpec;
     use stackable_operator::product_logging::spec::Logging;
     use std::collections::{BTreeMap, HashMap};
-    use std::str::FromStr;
 
     #[test]
     fn test_spark_examples_s3() {
@@ -1258,75 +1233,6 @@ spec:
         assert!(spark_application.spec.image.is_none());
     }
 
-    #[test]
-    fn test_image_actions() {
-        let spark_application = serde_yaml::from_str::<SparkApplication>(
-            r#"
----
-apiVersion: spark.stackable.tech/v1alpha1
-kind: SparkApplication
-metadata:
-  name: spark-pi-local
-  namespace: default
-spec:
-  version: "1.0"
-  sparkImage:
-    productVersion: 3.2.1
-  sparkImagePullPolicy: Always
-  sparkImagePullSecrets:
-    - name: myregistrykey
-  mode: cluster
-  mainClass: org.apache.spark.examples.SparkPi
-  mainApplicationFile: local:///stackable/spark/examples/jars/spark-examples.jar
-  sparkConf:
-    spark.kubernetes.node.selector.node: "2"
-  driver:
-    cores: 1
-    coreLimit: "1200m"
-    memory: "512m"
-  executor:
-    cores: 1
-    instances: 1
-    memory: "512m"
-"#,
-        )
-        .unwrap();
-
-        assert_eq!(
-            Some(vec![LocalObjectReference {
-                name: Some("myregistrykey".to_string())
-            }]),
-            spark_application.spark_image_pull_secrets()
-        );
-        assert_eq!(
-            Some(ImagePullPolicy::Always),
-            spark_application.spark_image_pull_policy()
-        );
-    }
-
-    #[test]
-    fn test_image_pull_policy_ser() {
-        assert_eq!("Never", ImagePullPolicy::Never.to_string());
-        assert_eq!("Always", ImagePullPolicy::Always.to_string());
-        assert_eq!("IfNotPresent", ImagePullPolicy::IfNotPresent.to_string());
-    }
-
-    #[test]
-    fn test_image_pull_policy_de() {
-        assert_eq!(
-            ImagePullPolicy::Always,
-            ImagePullPolicy::from_str("Always").unwrap()
-        );
-        assert_eq!(
-            ImagePullPolicy::Never,
-            ImagePullPolicy::from_str("Never").unwrap()
-        );
-        assert_eq!(
-            ImagePullPolicy::IfNotPresent,
-            ImagePullPolicy::from_str("IfNotPresent").unwrap()
-        );
-    }
-
     #[test]
     fn test_default_resource_limits() {
         let spark_application = serde_yaml::from_str::<SparkApplication>(
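For reference, here is a hypothetical replacement for the deleted test_image_actions, assuming the shared ProductImage schema accepts pullPolicy and pullSecrets keys under sparkImage (the key names are an assumption, not verified against this operator-rs version):

#[test]
fn test_image_pull_via_product_image() {
    // Hypothetical sketch: pull policy and secrets move under sparkImage, where
    // the shared ProductImage schema from operator-rs is assumed to handle them.
    let spark_application = serde_yaml::from_str::<SparkApplication>(
        r#"
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: spark-pi-local
  namespace: default
spec:
  sparkImage:
    productVersion: 3.2.1
    pullPolicy: Always
    pullSecrets:
      - name: myregistrykey
  mode: cluster
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///stackable/spark/examples/jars/spark-examples.jar
"#,
    )
    .unwrap();

    // The custom accessors are gone; a successful parse is the interesting part.
    assert!(spark_application.spec.image.is_none());
}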
32 changes: 12 additions & 20 deletions rust/operator-binary/src/spark_k8s_controller.rs
@@ -325,7 +325,7 @@ fn init_containers(
     logging: &Logging<SparkContainer>,
     s3conn: &Option<S3ConnectionSpec>,
     s3logdir: &Option<S3LogDir>,
-    spark_image: &str,
+    spark_image: &ResolvedProductImage,
 ) -> Result<Vec<Container>> {
     let mut jcb = ContainerBuilder::new(&SparkContainer::Job.to_string())
         .context(IllegalContainerNameSnafu)?;
@@ -383,14 +383,12 @@ fn init_containers(
             "pip install --target={VOLUME_MOUNT_PATH_REQ} {req}"
         ));
 
-        rcb.image(spark_image)
+        rcb.image(&spark_image.image)
             .command(vec!["/bin/bash".to_string(), "-c".to_string()])
             .args(vec![args.join(" && ")])
             .add_volume_mount(VOLUME_MOUNT_NAME_REQ, VOLUME_MOUNT_PATH_REQ)
-            .add_volume_mount(VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_PATH_LOG);
-        if let Some(image_pull_policy) = spark_application.spark_image_pull_policy() {
-            rcb.image_pull_policy(image_pull_policy.to_string());
-        }
+            .add_volume_mount(VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_PATH_LOG)
+            .image_pull_policy(&spark_image.image_pull_policy);
 
         rcb.resources(
             ResourceRequirementsBuilder::new()
@@ -418,7 +416,7 @@ fn init_containers(
                 format!("{STACKABLE_MOUNT_PATH_TLS}/{cert_secret}"),
             );
         }
-        tcb.image(spark_image)
+        tcb.image(&spark_image.image)
             .command(vec!["/bin/bash".to_string(), "-c".to_string()])
             .args(vec![args.join(" && ")])
             .add_volume_mount(STACKABLE_TRUST_STORE_NAME, STACKABLE_TRUST_STORE)
@@ -453,7 +451,8 @@ fn pod_template(
     let mut cb = ContainerBuilder::new(&container_name).context(IllegalContainerNameSnafu)?;
     cb.add_volume_mounts(config.volume_mounts.clone())
         .add_env_vars(env.to_vec())
-        .resources(config.resources.clone().into());
+        .resources(config.resources.clone().into())
+        .image_pull_policy(&spark_image.image_pull_policy);
 
     if config.logging.enable_vector_agent {
         cb.add_env_var(
@@ -467,10 +466,6 @@ fn pod_template(
         );
     }
 
-    if let Some(image_pull_policy) = spark_application.spark_image_pull_policy() {
-        cb.image_pull_policy(image_pull_policy.to_string());
-    }
-
     let mut pb = PodBuilder::new();
     pb.metadata(
         ObjectMetaBuilder::new()
@@ -493,15 +488,15 @@ fn pod_template(
         &config.logging,
         s3conn,
         s3logdir,
-        &spark_image.image,
+        spark_image,
     )
     .unwrap();
 
     for init_container in init_containers {
         pb.add_init_container(init_container.clone());
     }
 
-    if let Some(image_pull_secrets) = spark_application.spark_image_pull_secrets() {
+    if let Some(image_pull_secrets) = spark_image.pull_secrets.as_ref() {
         pb.image_pull_secrets(
             image_pull_secrets
                 .iter()
@@ -697,11 +692,8 @@ fn spark_job(
             ),
         )
         // TODO: move this to the image
-        .add_env_var("SPARK_CONF_DIR", "/stackable/spark/conf");
-
-    if let Some(image_pull_policy) = spark_application.spark_image_pull_policy() {
-        cb.image_pull_policy(image_pull_policy.to_string());
-    }
+        .add_env_var("SPARK_CONF_DIR", "/stackable/spark/conf")
+        .image_pull_policy(&spark_image.image_pull_policy);
 
     let mut volumes = vec![
         VolumeBuilder::new(VOLUME_MOUNT_NAME_CONFIG)
@@ -754,7 +746,7 @@ fn spark_job(
             restart_policy: Some("Never".to_string()),
             service_account_name: serviceaccount.metadata.name.clone(),
             volumes: Some(volumes),
-            image_pull_secrets: spark_application.spark_image_pull_secrets(),
+            image_pull_secrets: spark_image.pull_secrets.clone(),
             security_context: Some(security_context()),
             ..PodSpec::default()
         }),
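
Design note: with the custom fields gone, the pull policy is applied through ContainerBuilder::image_pull_policy(&spark_image.image_pull_policy) at every site that previously consulted spark_application.spark_image_pull_policy() (the requirements init container, the driver/executor template container, and the job container), and the pull secrets are read from spark_image.pull_secrets in both pod_template and spark_job. The resolved product image becomes the single source of truth for everything image-related.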