Skip to content

Commit

Permalink
Merge pull request #20 from petro-rudenko/spark-3.0-support_2
Browse files Browse the repository at this point in the history
[CORE] Spark-3.0 mapper writer plugin implementation.
  • Loading branch information
yosefe authored Mar 19, 2020
2 parents c6fc767 + c3dfaa3 commit ab24e1d
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2020. ALL RIGHTS RESERVED.
* See file LICENSE for terms.
*/
package org.apache.spark.shuffle.compat.spark_3_0

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.api.ShuffleExecutorComponents
import org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO

/**
 * UCX-aware shuffle IO plugin. Reuses Spark's local-disk data handling while
 * substituting UCX-backed executor components that register shuffle memory.
 */
case class UcxLocalDiskShuffleDataIO(sparkConf: SparkConf) extends LocalDiskShuffleDataIO(sparkConf) with Logging {

  // Hand out UCX executor components instead of the default local-disk ones.
  override def executor(): ShuffleExecutorComponents =
    new UcxLocalDiskShuffleExecutorComponents(sparkConf)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2020. ALL RIGHTS RESERVED.
* See file LICENSE for terms.
*/
package org.apache.spark.shuffle.compat.spark_3_0

import java.util
import java.util.Optional

import org.apache.spark.internal.Logging
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.shuffle.sort.io.{LocalDiskShuffleExecutorComponents, LocalDiskShuffleMapOutputWriter, LocalDiskSingleSpillMapOutputWriter}
import org.apache.spark.shuffle.UcxShuffleManager
import org.apache.spark.shuffle.api.{ShuffleMapOutputWriter, SingleSpillShuffleMapOutputWriter}

/**
 * Entry point to the UCX executor.
 *
 * Wraps Spark's local-disk executor components so that executor-side
 * initialization also starts the UCX node and picks up the UCX-aware
 * shuffle block resolver from the active [[UcxShuffleManager]].
 */
class UcxLocalDiskShuffleExecutorComponents(sparkConf: SparkConf)
  extends LocalDiskShuffleExecutorComponents(sparkConf) with Logging {

  // Set by initializeExecutor(); null until then.
  private var blockResolver: UcxShuffleBlockResolver = _

  /**
   * Starts the UCX node if it is not running yet and caches the UCX-aware
   * block resolver used by the writer factories below.
   */
  override def initializeExecutor(appId: String, execId: String, extraConfigs: util.Map[String, String]): Unit = {
    val ucxShuffleManager = SparkEnv.get.shuffleManager.asInstanceOf[UcxShuffleManager]
    ucxShuffleManager.startUcxNodeIfMissing()
    blockResolver = ucxShuffleManager.shuffleBlockResolver
  }

  /** Fails fast when a writer is requested before [[initializeExecutor]] ran. */
  private def requireInitialized(): Unit = {
    if (blockResolver == null) {
      throw new IllegalStateException(
        "Executor components must be initialized before getting writers.")
    }
  }

  /** Creates the standard local-disk map output writer backed by the UCX resolver. */
  override def createMapOutputWriter(shuffleId: Int, mapTaskId: Long, numPartitions: Int): ShuffleMapOutputWriter = {
    requireInitialized()
    new LocalDiskShuffleMapOutputWriter(
      shuffleId, mapTaskId, numPartitions, blockResolver, sparkConf)
  }

  /** Creates the single-spill writer variant, also backed by the UCX resolver. */
  override def createSingleFileMapOutputWriter(shuffleId: Int, mapId: Long): Optional[SingleSpillShuffleMapOutputWriter] = {
    requireInitialized()
    Optional.of(new LocalDiskSingleSpillMapOutputWriter(shuffleId, mapId, blockResolver))
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,22 @@
*/
package org.apache.spark.shuffle

import scala.collection.JavaConverters._

import org.apache.spark.shuffle.api.ShuffleExecutorComponents
import org.apache.spark.shuffle.compat.spark_3_0.UcxShuffleBlockResolver
import org.apache.spark.shuffle.sort.{SerializedShuffleHandle, SortShuffleWriter, UnsafeShuffleWriter}
import org.apache.spark.util.ShutdownHookManager
import org.apache.spark.{ShuffleDependency, SparkConf, TaskContext}
import org.apache.spark.{ShuffleDependency, SparkConf, SparkEnv, TaskContext}

/**
* Main entry point of Ucx shuffle plugin. It extends spark's default SortShufflePlugin
* and injects needed logic in override methods.
*/
class UcxShuffleManager(override val conf: SparkConf, isDriver: Boolean) extends CommonUcxShuffleManager(conf, isDriver) {
ShutdownHookManager.addShutdownHook(Int.MaxValue - 1)(stop)
private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf)

override val shuffleBlockResolver = new UcxShuffleBlockResolver(this)

override def registerShuffle[K, V, C](shuffleId: ShuffleId, dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
Expand All @@ -26,15 +31,44 @@ class UcxShuffleManager(override val conf: SparkConf, isDriver: Boolean) extends

/**
 * Creates the shuffle writer for a map task.
 *
 * Registers the UCX handle for this shuffle (so later lookups by shuffle id
 * resolve it), then builds the matching Spark writer — unsafe for serialized
 * handles, sort-based otherwise — around the wrapped base handle, injecting
 * the UCX-aware executor components.
 *
 * NOTE(review): the previous version also invoked super.getWriter(...) and
 * discarded the result, constructing a throw-away writer per task; that dead
 * call is removed here.
 */
override def getWriter[K, V](handle: ShuffleHandle, mapId: Long, context: TaskContext,
                             metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
  logInfo(s"MapId num ${TaskContext.getPartitionId()}")
  val ucxHandle = handle.asInstanceOf[UcxShuffleHandle[K, V, _]]
  shuffleIdToHandle.putIfAbsent(handle.shuffleId, ucxHandle)
  val env = SparkEnv.get
  ucxHandle.baseHandle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K@unchecked, V@unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case other: BaseShuffleHandle[K@unchecked, V@unchecked, _] =>
      new SortShuffleWriter(
        shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents)
  }
}

/**
 * Creates the shuffle reader for a reduce task.
 *
 * Lazily starts the UCX node on first read and records the UCX handle so
 * this shuffle's metadata can be resolved, then delegates the actual read to
 * the parent sort-shuffle implementation on the wrapped base handle.
 *
 * NOTE(review): the previous version called super.getReader(...) twice and
 * discarded the first result, constructing a throw-away reader; the dead
 * call is removed here.
 */
override def getReader[K, C](handle: ShuffleHandle, startPartition: MapId, endPartition: MapId,
                             context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
  startUcxNodeIfMissing()
  val ucxHandle = handle.asInstanceOf[UcxShuffleHandle[K, _, C]]
  shuffleIdToHandle.putIfAbsent(handle.shuffleId, ucxHandle)
  super.getReader(ucxHandle.baseHandle, startPartition, endPartition, context, metrics)
}


/**
 * Loads the configured ShuffleDataIO plugin and initializes its executor-side
 * components with the SHUFFLE_SPARK_CONF_PREFIX-scoped settings from `conf`.
 */
private def loadShuffleExecutorComponents(conf: SparkConf): ShuffleExecutorComponents = {
  val components = ShuffleDataIOUtils.loadShuffleDataIO(conf).executor()
  val pluginConfigs = conf
    .getAllWithPrefix(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX)
    .toMap
    .asJava
  components.initializeExecutor(conf.getAppId, SparkEnv.get.executorId, pluginConfigs)
  components
}

}

0 comments on commit ab24e1d

Please sign in to comment.