
[Bug] Spark executor task error when reading shuffle data with OpenJDK 11 #2082

Open

ChenRussell opened this issue Aug 22, 2024 · 22 comments

ChenRussell commented Aug 22, 2024

Search before asking

  • I have searched in the issues and found no similar issues.

Describe the bug

I use OpenJDK 11 in the Spark image, and I get errors when Spark tasks read shuffle data from the Uniffle server. Here is the executor task error log:
[screenshot: executor task error log]

Affects Version(s)

0.9.0

Uniffle Server Log Output

No error message.

Uniffle Engine Log Output

java.lang.reflect.InvocationTargetException
    at java.base/jdk.internal.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.uniffle.io.netty.util.internal.CleanerJava9.freeDirectBuffer(CleanerJava9.java:88)
    at org.apache.uniffle.io.netty.util.internal.PlatformDependent.freeDirectBuffer(PlatformDependent.java:521)
    at org.apache.uniffle.common.util.RssUtils.releaseByteBuffer(RssUtils.java:414)
    at org.apache.uniffle.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:314)
    at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:216)
    at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:115)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:307)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hashAgg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator.isEmpty(Iterator.scala:387)
    at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
    at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
    at scala.collection.TraversableOnce.nonEmpty(TraversableOnce.scala:143)
    at scala.collection.TraversableOnce.nonEmpty$(TraversableOnce.scala:143)
    at scala.collection.AbstractIterator.nonEmpty(Iterator.scala:1431)
    at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1559)
    at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2$adapted(RDD.scala:1558)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:910)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:910)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
    Suppressed: org.apache.spark.util.TaskCompletionListenerException: null

Previous exception in task: null
    java.base/jdk.internal.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
    java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    java.base/java.lang.reflect.Method.invoke(Unknown Source)
    org.apache.uniffle.io.netty.util.internal.CleanerJava9.freeDirectBuffer(CleanerJava9.java:88)
    org.apache.uniffle.io.netty.util.internal.PlatformDependent.freeDirectBuffer(PlatformDependent.java:521)
    org.apache.uniffle.common.util.RssUtils.releaseByteBuffer(RssUtils.java:414)
    org.apache.uniffle.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:314)
    org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:216)
    org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:115)
    org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:307)
    org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hashAgg_doAggregateWithKeys_0$(Unknown Source)
    org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    scala.collection.Iterator.isEmpty(Iterator.scala:387)
    scala.collection.Iterator.isEmpty$(Iterator.scala:387)
    scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
    scala.collection.TraversableOnce.nonEmpty(TraversableOnce.scala:143)
    scala.collection.TraversableOnce.nonEmpty$(TraversableOnce.scala:143)
    scala.collection.AbstractIterator.nonEmpty(Iterator.scala:1431)
    org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2(RDD.scala:1559)
    org.apache.spark.rdd.RDD.$anonfun$takeOrdered$2$adapted(RDD.scala:1558)
    org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:910)
    org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:910)
    org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    org.apache.spark.scheduler.Task.run(Task.scala:141)
    org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    java.base/java.lang.Thread.run(Unknown Source)
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:254)
        at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:144)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:137)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:177)
        ... 9 more
        Suppressed: java.lang.reflect.InvocationTargetException
            at java.base/jdk.internal.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
            at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
            at java.base/java.lang.reflect.Method.invoke(Unknown Source)
            at org.apache.uniffle.io.netty.util.internal.CleanerJava9.freeDirectBuffer(CleanerJava9.java:88)
            at org.apache.uniffle.io.netty.util.internal.PlatformDependent.freeDirectBuffer(PlatformDependent.java:521)
            at org.apache.uniffle.common.util.RssUtils.releaseByteBuffer(RssUtils.java:414)
            at org.apache.uniffle.client.impl.ShuffleReadClientImpl.close(ShuffleReadClientImpl.java:335)
            at org.apache.spark.shuffle.reader.RssShuffleDataIterator.cleanup(RssShuffleDataIterator.java:217)
            at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.lambda$new$0(RssShuffleReader.java:284)
            at scala.Function0.apply$mcV$sp(Function0.scala:39)
            at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
            at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.lambda$new$1(RssShuffleReader.java:296)
            at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1(TaskContextImpl.scala:144)
            at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1$adapted(TaskContextImpl.scala:144)
            at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:199)
            ... 12 more
        Caused by: java.lang.IllegalArgumentException: duplicate or slice
            at jdk.unsupported/sun.misc.Unsafe.invokeCleaner(Unknown Source)
            ... 27 more
Caused by: java.lang.IllegalArgumentException: duplicate or slice
    at jdk.unsupported/sun.misc.Unsafe.invokeCleaner(Unknown Source)
    ... 42 more

Uniffle Server Configurations

rss.rpc.server.type GRPC_NETTY
...

Uniffle Engine Configurations

spark.rss.client.type=GRPC_NETTY
spark.rss.client.netty.io.mode=EPOLL
spark.rss.storage.type=MEMORY_LOCALFILE
...

Additional context

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
@ChenRussell
Author

@rickyma @jerqi Does the client side support JDK 11?

@jerqi
Contributor

jerqi commented Sep 5, 2024

@rickyma @jerqi Does the client side support JDK 11?

We didn't test it. From the error, it seems we don't use the proper method to release memory on JDK 11.

@ChenRussell
Author

@rickyma @jerqi Does the client side support JDK 11?

We didn't test it. From the error, it seems we don't use the proper method to release memory on JDK 11.

How should I fix this error for JDK 11?

@jerqi
Contributor

jerqi commented Sep 5, 2024

@rickyma Do you have any suggestions?

@rickyma
Contributor

rickyma commented Sep 5, 2024

I've no idea. Could you please test this case using JDK 11? @maobaolong

@maobaolong
Member

maobaolong commented Sep 6, 2024

@rickyma We use JDK 8 for all RSS clusters and clients, so we did not encounter this issue in our production environment.

But I just did a test on JDK 11, and the issue reproduced.

➜  spark.mbl bin/spark-shell  --master  spark://localhost:7077  --deploy-mode client  --conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager --conf spark.rss.coordinator.quorum=localhost:19999  --conf spark.rss.storage.type=LOCALFILE --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.rss.test.mode.enable=true --conf spark.rss.client.type=GRPC_NETTY --conf spark.sql.shuffle.partitions=1  -i test.scala  
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/softwares/spark.mbl/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/softwares/hadoop-3.3.1/share/hadoop/common/lib/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
24/09/06 11:41:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/06 11:41:46 WARN RssSparkShuffleUtils: Empty conf items
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = spark://localhost:7077, app id = app-20240906114147-0002).
Spark session available as 'spark'.
24/09/06 11:41:57 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2) (localhost executor 0): java.lang.reflect.InvocationTargetException
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.uniffle.shaded.io.netty.util.internal.CleanerJava9.freeDirectBuffer(CleanerJava9.java:88)
        at org.apache.uniffle.shaded.io.netty.util.internal.PlatformDependent.freeDirectBuffer(PlatformDependent.java:521)
        at org.apache.uniffle.common.util.RssUtils.releaseByteBuffer(RssUtils.java:422)
        at org.apache.uniffle.client.impl.ShuffleReadClientImpl.close(ShuffleReadClientImpl.java:335)
        at org.apache.spark.shuffle.reader.RssShuffleDataIterator.cleanup(RssShuffleDataIterator.java:218)
        at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.lambda$new$0(RssShuffleReader.java:293)
        at org.apache.spark.shuffle.FunctionUtils$1.apply(FunctionUtils.java:33)
        at scala.Function0.apply$mcV$sp(Function0.scala:39)
        at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:36)
        at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:316)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:155)
        at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
        at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:186)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1548)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: duplicate or slice
        at jdk.unsupported/sun.misc.Unsafe.invokeCleaner(Unsafe.java:1238)
        ... 29 more

24/09/06 11:41:58 ERROR TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 8) (9.134.219.180 executor 0): java.lang.reflect.InvocationTargetException
        at java.base/jdk.internal.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.uniffle.shaded.io.netty.util.internal.CleanerJava9.freeDirectBuffer(CleanerJava9.java:88)
        at org.apache.uniffle.shaded.io.netty.util.internal.PlatformDependent.freeDirectBuffer(PlatformDependent.java:521)
        at org.apache.uniffle.common.util.RssUtils.releaseByteBuffer(RssUtils.java:422)
        at org.apache.uniffle.client.impl.ShuffleReadClientImpl.close(ShuffleReadClientImpl.java:335)
        at org.apache.spark.shuffle.reader.RssShuffleDataIterator.cleanup(RssShuffleDataIterator.java:218)
        at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.lambda$new$0(RssShuffleReader.java:293)
        at org.apache.spark.shuffle.FunctionUtils$1.apply(FunctionUtils.java:33)
        at scala.Function0.apply$mcV$sp(Function0.scala:39)
        at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:36)
        at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:316)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:155)
        at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
        at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:186)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1548)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: duplicate or slice
        at jdk.unsupported/sun.misc.Unsafe.invokeCleaner(Unsafe.java:1238)
        ... 28 more

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2687)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2623)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2622)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2622)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2880)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2822)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2811)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2349)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2370)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2389)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
  at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
  ... 65 elided
Caused by: java.lang.reflect.InvocationTargetException: java.lang.IllegalArgumentException: duplicate or slice
  at java.base/jdk.internal.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
  at org.apache.uniffle.shaded.io.netty.util.internal.CleanerJava9.freeDirectBuffer(CleanerJava9.java:88)
  at org.apache.uniffle.shaded.io.netty.util.internal.PlatformDependent.freeDirectBuffer(PlatformDependent.java:521)
  at org.apache.uniffle.common.util.RssUtils.releaseByteBuffer(RssUtils.java:422)
  at org.apache.uniffle.client.impl.ShuffleReadClientImpl.close(ShuffleReadClientImpl.java:335)
  at org.apache.spark.shuffle.reader.RssShuffleDataIterator.cleanup(RssShuffleDataIterator.java:218)
  at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.lambda$new$0(RssShuffleReader.java:293)
  at org.apache.spark.shuffle.FunctionUtils$1.apply(FunctionUtils.java:33)
  at scala.Function0.apply$mcV$sp(Function0.scala:39)
  at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:36)
  at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:316)
  at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:155)
  at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
  at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:186)
  at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:136)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1548)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: duplicate or slice
  at jdk.unsupported/sun.misc.Unsafe.invokeCleaner(Unsafe.java:1238)
  ... 28 more
test.scala:
val data = sc.parallelize(Seq(("A", 1), ("B", 2), ("C", 3), ("A", 4), ("B", 5), ("A", 6), ("A", 7),("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7), ("A", 7)));
val result = data.reduceByKey(_ + _);
result.collect().foreach(println);
System.exit(0);

@rickyma
Contributor

rickyma commented Sep 6, 2024

Could you help solve this? It will block people from using JDK 11. @maobaolong

@maobaolong
Member

maobaolong commented Sep 11, 2024

@rickyma @ChenRussell JDK 11 cannot work with the Uniffle client for now.

@ChenRussell
Author

@rickyma @ChenRussell JDK8 cannot work with the Uniffle client for now.

Do you mean JDK11?

@maobaolong
Member

Do you mean JDK11?

Sorry for the mistake. Yes, I meant JDK 11. I got this conclusion from the community meeting.

@jerqi
Contributor

jerqi commented Nov 7, 2024

@advancedxy @LuciferYang Do you have any suggestion about this issue?

@advancedxy
Contributor

Based on the stack trace, it seems that RSS is releasing a slice or a duplicate, which might indicate a bigger problem and needs further investigation.
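
For context, what the JDK rejects here can be shown in isolation (a minimal sketch, not Uniffle code; Java 9+ only, since sun.misc.Unsafe#invokeCleaner was added in JDK 9):

    import java.lang.reflect.Field;
    import java.nio.ByteBuffer;

    public class DuplicateOrSliceDemo {
      public static void main(String[] args) throws Exception {
        ByteBuffer owner = ByteBuffer.allocateDirect(64);
        ByteBuffer view = owner.slice(); // shares owner's memory

        Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        sun.misc.Unsafe unsafe = (sun.misc.Unsafe) f.get(null);

        try {
          unsafe.invokeCleaner(view); // a view does not own the memory
        } catch (IllegalArgumentException e) {
          System.out.println(e.getMessage()); // prints "duplicate or slice"
        }
        unsafe.invokeCleaner(owner); // succeeds: the owner holds the Cleaner
      }
    }

invokeCleaner refuses any buffer whose attachment() is non-null, i.e. any duplicate or slice, which matches the stack traces above.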

@liorregev

liorregev commented Nov 7, 2024

I've been running into the same issue.
After searching online I found this. While it seems like a dirty hack, that project is widespread enough that this sort of workaround has hopefully been battle-tested.
I replaced RssUtils.java:426 with the following, and it works:

try {
  PlatformDependent.freeDirectBuffer(byteBuffer);
} catch (Throwable e) {
  // CleanerJava9 surfaces Unsafe.invokeCleaner failures; unwrap the cause.
  Throwable cause = e.getCause();

  if (!(cause instanceof IllegalArgumentException)) {
    throw e;
  }
  // Buffer is a duplicate or slice; skip freeing this view.
}

I'm not sure how to check for memory leaks right now, though.
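
One cheap way to watch for leaks from the swallowed release is the JDK's buffer pool stats (a sketch using the standard java.lang.management API; it tracks cleaner-backed NIO direct buffers, not Netty's pooled arenas):

    import java.lang.management.BufferPoolMXBean;
    import java.lang.management.ManagementFactory;

    public class DirectMemoryProbe {
      public static void main(String[] args) {
        // If the skipped releases leak, the "direct" pool's memoryUsed keeps
        // growing across shuffles instead of plateauing.
        for (BufferPoolMXBean pool :
            ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
          System.out.printf("%s: count=%d, used=%d bytes%n",
              pool.getName(), pool.getCount(), pool.getMemoryUsed());
        }
      }
    }

Netty's own leak detector (-Dio.netty.leakDetection.level=paranoid) can also flag unreleased ByteBufs, though with the shaded client the property name may be relocated.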

@LuciferYang
Contributor

@ChenRussell Have you tried --add-opens java.base/jdk.internal.ref=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true
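
For anyone trying this with Spark, those JVM flags would typically be set on both the driver and the executors (a sketch using standard Spark properties):

spark.driver.extraJavaOptions=--add-opens java.base/jdk.internal.ref=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true
spark.executor.extraJavaOptions=--add-opens java.base/jdk.internal.ref=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true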

@ChenRussell
Author

@ChenRussell Have you tried --add-opens java.base/jdk.internal.ref=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true

Yes, I have tried that, and it doesn't work. It's not an accessibility problem; the reflective method can be invoked.

@LuciferYang
Contributor

@ChenRussell Got it ~ Is there an existing unit test (UT) that can reproduce this issue? Java 8 uses CleanerJava6, while Java 9+ uses CleanerJava9, and there are some differences in the code implementation. It would be easier to identify the root cause if there is a UT that can reproduce it.

@ChenRussell
Author

@ChenRussell Got it ~ Is there an existing unit test (UT) that can reproduce this issue? Java 8 uses CleanerJava6, while Java 9+ uses CleanerJava9, and there are some differences in the code implementation. It would be easier to identify the root cause if there is a UT that can reproduce it.

You can use this #2082 (comment) to reproduce this issue.
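
A self-contained UT along these lines should also reproduce it (a sketch against unshaded Netty and JUnit 5; Uniffle actually uses the shaded org.apache.uniffle.shaded.io.netty classes. On Java 8, CleanerJava6 silently no-ops for views, so the assertion only holds on Java 9+):

    import static org.junit.jupiter.api.Assertions.assertThrows;

    import io.netty.util.internal.PlatformDependent;
    import java.nio.ByteBuffer;
    import org.junit.jupiter.api.Test;

    class FreeDirectBufferSliceTest {
      @Test
      void freeingASliceFailsOnJava9Plus() {
        ByteBuffer owner = ByteBuffer.allocateDirect(64);
        ByteBuffer slice = owner.slice();
        // CleanerJava9 delegates to Unsafe.invokeCleaner, which rejects views.
        assertThrows(Throwable.class,
            () -> PlatformDependent.freeDirectBuffer(slice));
        PlatformDependent.freeDirectBuffer(owner); // releasing the owner works
      }
    }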

@xumanbu
Contributor

xumanbu commented Dec 9, 2024

Based on the stack trace, It seems that RSS is releasing a slice or a duplicate, which might indicate a bigger problem and needs some further investigation.

Agree. The root cause is that ByteBuffer#attachment is unexpectedly non-null.

@xumanbu
Contributor

xumanbu commented Dec 9, 2024

Based on the stack trace, It seems that RSS is releasing a slice or a duplicate, which might indicate a bigger problem and needs some further investigation.

Agree. The root cause is that ByteBuffer#attachment is unexpectedly non-null.

As a temporary fix, can we check whether ByteBuffer#attachment is non-null and skip the release for such buffers?
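
A sketch of such a guard (hypothetical helper, not existing Uniffle code; sun.nio.ch.DirectBuffer is JDK-internal, so this needs --add-exports java.base/sun.nio.ch=ALL-UNNAMED at both compile time and run time):

    import java.nio.ByteBuffer;
    import sun.nio.ch.DirectBuffer;

    public final class BufferReleaseGuard {
      private BufferReleaseGuard() {}

      /** Hypothetical check: true only for direct buffers that own their memory. */
      public static boolean isOwningDirectBuffer(ByteBuffer buffer) {
        // Slices/duplicates keep their parent in attachment(); that is exactly
        // the condition Unsafe.invokeCleaner rejects with "duplicate or slice".
        return buffer != null
            && buffer.isDirect()
            && buffer instanceof DirectBuffer
            && ((DirectBuffer) buffer).attachment() == null;
      }
    }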

@dingshun3016
Contributor

Similar problems have arisen elsewhere, such as ARTEMIS-2131, 2378, and WFLY-11026; we can refer to those.

@xumanbu
Contributor

xumanbu commented Dec 11, 2024

It seems to be caused by this code: the resulting byteBuf will be a sliced ByteBuf, but a sliced buffer (byteBuffer = byteBuf.nioBuffer()) does not support being released. @advancedxy @ChenRussell
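
To illustrate with unshaded Netty (a sketch): the NIO view handed out by nioBuffer() never owns the memory, so only the ByteBuf should be released:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import java.nio.ByteBuffer;

    public class NioBufferOwnershipDemo {
      public static void main(String[] args) {
        ByteBuf byteBuf = Unpooled.directBuffer(64);
        // nioBuffer(...) returns a duplicate/slice view over byteBuf's memory:
        ByteBuffer view = byteBuf.nioBuffer(0, byteBuf.capacity());
        // PlatformDependent.freeDirectBuffer(view); // would fail on Java 9+
        byteBuf.release(); // correct: release the ByteBuf that owns the memory
      }
    }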

@jerqi
Contributor

jerqi commented Dec 13, 2024

Maybe we could remove the following:

    if (readBuffer != null) {
      RssUtils.releaseByteBuffer(readBuffer);
    }

because we have already released the ByteBuf here:

    if (sdr != null) {
      sdr.release();
      // We set sdr to null here to prevent IllegalReferenceCountException that could occur
      // if sdr.release() is called multiple times in the close() method,
      // when an exception is thrown by clientReadHandler.readShuffleData().
      sdr = null;
    }

[screenshot]

cc @rickyma @maobaolong @leixm WDYT?
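
If it helps the discussion, the proposed cleanup could look roughly like this (an illustration only; field names follow the snippets above, not the actual ShuffleReadClientImpl):

    import io.netty.buffer.ByteBuf;
    import java.nio.ByteBuffer;

    class ShuffleReadCleanupSketch {
      private ByteBuf owningBuf;     // stands in for the ByteBuf held by sdr
      private ByteBuffer readBuffer; // NIO view over owningBuf's memory

      synchronized void close() {
        if (owningBuf != null) {
          owningBuf.release(); // frees the direct memory exactly once
          owningBuf = null;    // guards against double-release
        }
        readBuffer = null; // no separate free: readBuffer is only a view
      }
    }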
