Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,7 @@ private[spark] object Config extends Logging {
val KUBERNETES_VOLUMES_PVC_TYPE = "persistentVolumeClaim"
val KUBERNETES_VOLUMES_EMPTYDIR_TYPE = "emptyDir"
val KUBERNETES_VOLUMES_NFS_TYPE = "nfs"
val KUBERNETES_VOLUMES_CSI_TYPE = "csiVolumeClaim"
val KUBERNETES_VOLUMES_MOUNT_PATH_KEY = "mount.path"
val KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY = "mount.subPath"
val KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY = "mount.subPathExpr"
Expand All @@ -835,6 +836,7 @@ private[spark] object Config extends Logging {
val KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY = "options.server"
val KUBERNETES_VOLUMES_LABEL_KEY = "label."
val KUBERNETES_VOLUMES_ANNOTATION_KEY = "annotation."
val KUBERNETES_VOLUMES_OPTIONS_CSI_DRIVER_NAME_KEY = "csiDriverName"
val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."

val KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH = 253
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ private[spark] case class KubernetesNFSVolumeConf(
server: String)
extends KubernetesVolumeSpecificConf

private[spark] case class KubernetesCSIVolumeConf(
driverName: String,
attributes: Map[String, String])
extends KubernetesVolumeSpecificConf

private[spark] case class KubernetesVolumeSpec(
volumeName: String,
mountPath: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@ object KubernetesVolumeUtils {
options(pathKey),
options(serverKey))

case KUBERNETES_VOLUMES_CSI_TYPE =>
val driverNameKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CSI_DRIVER_NAME_KEY"
val volumeConfPrefix = s"$volumeType.$volumeName.options."
val attributes = options.filter { case (k, v) => k.startsWith(volumeConfPrefix) }
.map { case (k, v) => (k.substring(volumeConfPrefix.length), v) }
KubernetesCSIVolumeConf(
options(driverNameKey),
attributes)

case _ =>
throw new IllegalArgumentException(s"Kubernetes Volume type `$volumeType` is not supported")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
case KubernetesNFSVolumeConf(path, server) =>
new VolumeBuilder()
.withNfs(new NFSVolumeSource(path, null, server))

case KubernetesCSIVolumeConf(driverName, attributes) =>
val csiVolumeSourceBuilder = new CSIVolumeSourceBuilder()
.withDriver(driverName)
attributes.foreach { case (k, v) =>
csiVolumeSourceBuilder.addToVolumeAttributes(k, v)
}
new VolumeBuilder()
.withCsi(csiVolumeSourceBuilder.build())
}

val volume = volumeBuilder.withName(spec.volumeName).build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ object KubernetesTestConf {
(KUBERNETES_VOLUMES_NFS_TYPE, Map(
KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path,
KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY -> server))

case KubernetesCSIVolumeConf(driverName, attributes) =>
(KUBERNETES_VOLUMES_CSI_TYPE, Map(
KUBERNETES_VOLUMES_OPTIONS_CSI_DRIVER_NAME_KEY -> driverName
) ++ attributes)
}

conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_PATH_KEY), spec.mountPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,4 +573,39 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
assert(configuredPod.pod.getSpec.getVolumes.size() === 2)
assert(configuredPod.container.getVolumeMounts.size() === 2)
}

test("Mounts csiVolumeClaim") {
val volumeConf = KubernetesVolumeSpec(
"spark-local-dir-0",
"/mnt/disk1",
"",
"",
false,
KubernetesCSIVolumeConf("local.csi.xhs.com", Map(
"options.parentIndex" ->"0",
"options.size" -> "500",
"options.volumeType" -> "HostPath",
"options.mountOptions" -> "\"dir_mode=0777,actimeo=30,nosharesock\"")
)
)
val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
val step = new MountVolumesFeatureStep(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())

assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
assert(configuredPod.pod.getSpec.getVolumes.get(0).getCsi.getDriver === "local.csi.xhs.com")
assert(configuredPod.pod.getSpec.getVolumes.get(0).getCsi.getVolumeAttributes.size() === 4)
assert(configuredPod.pod.getSpec.getVolumes.get(0).getCsi
.getVolumeAttributes.get("parentIndex") === "0")
assert(configuredPod.pod.getSpec.getVolumes.get(0).getCsi
.getVolumeAttributes.get("size") === "500")
assert(configuredPod.pod.getSpec.getVolumes.get(0).getCsi
.getVolumeAttributes.get("volumeType") === "HostPath")
assert(configuredPod.pod.getSpec.getVolumes.get(0).getCsi
.getVolumeAttributes.get("mountOptions") === "\"dir_mode=0777,actimeo=30,nosharesock\"")

assert(configuredPod.container.getVolumeMounts.size() === 1)
assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/mnt/disk1")
assert(configuredPod.container.getVolumeMounts.get(0).getName === "spark-local-dir-0")
}
}