diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index fafff5046b9dc..5395a9a38321c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -822,6 +822,7 @@ private[spark] object Config extends Logging { val KUBERNETES_VOLUMES_PVC_TYPE = "persistentVolumeClaim" val KUBERNETES_VOLUMES_EMPTYDIR_TYPE = "emptyDir" val KUBERNETES_VOLUMES_NFS_TYPE = "nfs" + val KUBERNETES_VOLUMES_CSI_TYPE = "csiVolumeClaim" val KUBERNETES_VOLUMES_MOUNT_PATH_KEY = "mount.path" val KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY = "mount.subPath" val KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY = "mount.subPathExpr" @@ -835,6 +836,7 @@ private[spark] object Config extends Logging { val KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY = "options.server" val KUBERNETES_VOLUMES_LABEL_KEY = "label." val KUBERNETES_VOLUMES_ANNOTATION_KEY = "annotation." + val KUBERNETES_VOLUMES_OPTIONS_CSI_DRIVER_NAME_KEY = "csiDriverName" val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv." val KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH = 253 diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala index b7113a562fa06..769c194d09c51 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala @@ -39,6 +39,11 @@ private[spark] case class KubernetesNFSVolumeConf( server: String) extends KubernetesVolumeSpecificConf +private[spark] case class KubernetesCSIVolumeConf( + driverName: String, + attributes: Map[String, String]) + extends KubernetesVolumeSpecificConf + private[spark] case class KubernetesVolumeSpec( volumeName: String, mountPath: String, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala index 95821a909f351..01e61d7272206 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala @@ -132,6 +132,16 @@ object KubernetesVolumeUtils { options(pathKey), options(serverKey)) + case KUBERNETES_VOLUMES_CSI_TYPE => + val driverNameKey = + s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CSI_DRIVER_NAME_KEY" + val volumeConfPrefix = s"$volumeType.$volumeName.options." + val attributes = options.filter { case (k, v) => k.startsWith(volumeConfPrefix) } + .map { case (k, v) => (k.substring(volumeConfPrefix.length), v) } + KubernetesCSIVolumeConf( + options(driverNameKey), + attributes) + case _ => throw new IllegalArgumentException(s"Kubernetes Volume type `$volumeType` is not supported") } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala index 3d89696f19fcc..925c418b74614 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala @@ -125,6 +125,15 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) case KubernetesNFSVolumeConf(path, server) => new VolumeBuilder() .withNfs(new NFSVolumeSource(path, null, server)) + + case KubernetesCSIVolumeConf(driverName, attributes) => + val csiVolumeSourceBuilder = new CSIVolumeSourceBuilder() + .withDriver(driverName) + attributes.foreach { case (k, v) => + csiVolumeSourceBuilder.addToVolumeAttributes(k, v) + } + new VolumeBuilder() + .withCsi(csiVolumeSourceBuilder.build()) } val volume = volumeBuilder.withName(spec.volumeName).build() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala index e5ed79718d733..2d986a3f8dcd9 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala @@ -143,6 +143,11 @@ object KubernetesTestConf { (KUBERNETES_VOLUMES_NFS_TYPE, Map( KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path, KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY -> server)) + + case KubernetesCSIVolumeConf(driverName, attributes) => + (KUBERNETES_VOLUMES_CSI_TYPE, Map( + KUBERNETES_VOLUMES_OPTIONS_CSI_DRIVER_NAME_KEY -> driverName + ) ++ attributes) } conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_PATH_KEY), spec.mountPath) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index 293773ddb9ec5..e8dd3c156a9cb 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -573,4 +573,39 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { assert(configuredPod.pod.getSpec.getVolumes.size() === 2) assert(configuredPod.container.getVolumeMounts.size() === 2) } + + test("Mounts csiVolumeClaim") { + val volumeConf = KubernetesVolumeSpec( + "spark-local-dir-0", + "/mnt/disk1", + "", + "", + false, + KubernetesCSIVolumeConf("local.csi.xhs.com", Map( + "options.parentIndex" ->"0", + "options.size" -> "500", + "options.volumeType" -> "HostPath", + "options.mountOptions" -> "\"dir_mode=0777,actimeo=30,nosharesock\"") + ) + ) + val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf)) + val step = new MountVolumesFeatureStep(kubernetesConf) + val configuredPod = step.configurePod(SparkPod.initialPod()) + + assert(configuredPod.pod.getSpec.getVolumes.size() === 1) + assert(configuredPod.pod.getSpec.getVolumes.get(0).getCsi.getDriver === "local.csi.xhs.com") + assert(configuredPod.pod.getSpec.getVolumes.get(0).getCsi.getVolumeAttributes.size() === 4) + assert(configuredPod.pod.getSpec.getVolumes.get(0).getCsi + .getVolumeAttributes.get("parentIndex") === "0") + assert(configuredPod.pod.getSpec.getVolumes.get(0).getCsi + .getVolumeAttributes.get("size") === "500") + assert(configuredPod.pod.getSpec.getVolumes.get(0).getCsi + .getVolumeAttributes.get("volumeType") === "HostPath") + assert(configuredPod.pod.getSpec.getVolumes.get(0).getCsi + .getVolumeAttributes.get("mountOptions") === "\"dir_mode=0777,actimeo=30,nosharesock\"") + + assert(configuredPod.container.getVolumeMounts.size() === 1) + assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/mnt/disk1") + assert(configuredPod.container.getVolumeMounts.get(0).getName === "spark-local-dir-0") + } }