Skip to content

Commit 9116d33

Browse files
[SPARK-52548][CORE][TESTS] Add a test case for when shuffle manager is overridden by a SparkPlugin
### What changes were proposed in this pull request? As title, adding a test case for when shuffle manager is overridden by a SparkPlugin. ### Why are the changes needed? PR #43627 introduced a change to allow the shuffle manager specified in Spark configuration to be overridden in SparkPlugin, this change was not guarded by test though. The change in PR #43627 also allows Spark plugins to configure a shuffle manager automatically for user, so user won't have to configure both `spark.plugins` and `spark.shuffle.manager` at the same time. ### Does this PR introduce _any_ user-facing change? No. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #51246 from zhztheplayer/wip-test-plugin-shuffle. Lead-authored-by: Hongze Zhang <[email protected]> Co-authored-by: Kent Yao <[email protected]> Signed-off-by: Kent Yao <[email protected]>
1 parent 108bbae commit 9116d33

File tree

1 file changed

+35
-0
lines changed

1 file changed

+35
-0
lines changed

core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import org.apache.spark.resource.ResourceInformation
4242
import org.apache.spark.resource.ResourceUtils.GPU
4343
import org.apache.spark.resource.TestResourceIDs.{DRIVER_GPU_ID, EXECUTOR_GPU_ID, WORKER_GPU_ID}
4444
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
45+
import org.apache.spark.shuffle.sort.SortShuffleManager
4546
import org.apache.spark.util.Utils
4647

4748
class PluginContainerSuite extends SparkFunSuite with LocalSparkContext {
@@ -292,6 +293,20 @@ class PluginContainerSuite extends SparkFunSuite with LocalSparkContext {
292293
// If the listener bus is stopped before the plugin is shutdown,
293294
// then the event will be dropped and won't be delivered to the listener.
294295
}
296+
297+
test("SPARK-52548: override shuffle manager in plugin") {
298+
val conf = new SparkConf()
299+
.setAppName(getClass().getName())
300+
.set(SparkLauncher.SPARK_MASTER, "local[1]")
301+
.set(SHUFFLE_MANAGER, "sort")
302+
.set(PLUGINS, Seq(classOf[SetShuffleManagerPlugin].getName()))
303+
304+
sc = new SparkContext(conf)
305+
306+
// Ensures the shuffle manager specified in configuration was
307+
// overridden by the Spark plugin.
308+
assert(sc.env.shuffleManager.isInstanceOf[SetShuffleManagerPlugin.MyShuffleManager])
309+
}
295310
}
296311

297312
class MemoryOverridePlugin extends SparkPlugin {
@@ -391,6 +406,26 @@ object NonLocalModeSparkPlugin {
391406
}
392407
}
393408

409+
class SetShuffleManagerPlugin extends SparkPlugin {
410+
import SetShuffleManagerPlugin._
411+
override def driverPlugin(): DriverPlugin = {
412+
new DriverPlugin {
413+
override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
414+
sc.conf.set(SHUFFLE_MANAGER, classOf[MyShuffleManager].getName)
415+
Map.empty[String, String].asJava
416+
}
417+
}
418+
}
419+
420+
override def executorPlugin(): ExecutorPlugin = {
421+
new ExecutorPlugin {}
422+
}
423+
}
424+
425+
private object SetShuffleManagerPlugin {
426+
class MyShuffleManager(conf: SparkConf) extends SortShuffleManager(conf)
427+
}
428+
394429
class TestSparkPlugin extends SparkPlugin {
395430

396431
override def driverPlugin(): DriverPlugin = {

0 commit comments

Comments
 (0)