
[SUPPORT] FileNotFoundException while querying HUDI table via native Spark SQL with HMS as catalog #12477

Open
ahujaanmol1288 opened this issue Dec 12, 2024 · 7 comments

@ahujaanmol1288

Describe the problem you faced

While reading a Hudi table via Spark SQL, the job fails with a java.io.FileNotFoundException. The error occurs when the underlying Hudi table is updated while the read operation is still in progress (i.e. Spark SQL read starts -> write operation finishes -> Spark SQL read fails), indicating that the write operation rewrote the underlying data files and deleted the S3 path previously recorded by the Hoodie file index.

Spark config used:
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.jars.packages": "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1",
    "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
    "hive.metastore.uris": "<metastore URI>"

Spark command:
spark.sql("SELECT * FROM <hive_schema>.<hudi_table_name>")

To Reproduce

Steps to reproduce the behavior:

  1. Configure the Hudi environment with the following settings:
    Hudi version: 0.13.1
    Spark version: 3.3
    Hive version: 2.4
    Storage: S3

  2. Use the following Hudi write options:
    'hoodie.table.name': 'hudi_trips_cow1',
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': 'hudi_trips_cow1',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2,
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.table': 'hudi_trips_cow1'

  3. Implement concurrent read and write operations using concurrent.futures.ThreadPoolExecutor.
    Read operation: Perform a SQL query on the Hudi table and checkpoint the result.
    Write operation: Generate and insert new records using the Hudi DataGenerator and write them to the same Hudi table.

  4. Execute the code and observe the failure during the checkpointing step of the read operation.

Please refer to this script to reproduce the issue (a minimal sketch also follows after the attachment):
read_while_update.txt
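For convenience, a minimal sketch of the concurrent read/write reproduction; the attached script is the authoritative version, and the base path, Hive schema, and DataGenerator dataframe below are placeholders:

    from concurrent.futures import ThreadPoolExecutor

    BASE_PATH = "s3://<bucket>/<prefix>/hudi_trips_cow1"  # placeholder table base path

    hudi_options = {
        'hoodie.table.name': 'hudi_trips_cow1',
        'hoodie.datasource.write.recordkey.field': 'uuid',
        'hoodie.datasource.write.partitionpath.field': 'partitionpath',
        'hoodie.datasource.write.table.name': 'hudi_trips_cow1',
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.datasource.write.precombine.field': 'ts',
        'hoodie.upsert.shuffle.parallelism': 2,
        'hoodie.insert.shuffle.parallelism': 2,
        'hoodie.datasource.hive_sync.enable': 'true',
        'hoodie.datasource.hive_sync.table': 'hudi_trips_cow1',
    }

    spark.sparkContext.setCheckpointDir("s3://<bucket>/<prefix>/checkpoints")  # required for df.checkpoint()

    def read_from_hudi():
        # Read through native Spark SQL (HMS-synced table) and checkpoint the
        # result; this is the step that fails with the FileNotFoundException.
        df = spark.sql("SELECT * FROM <hive_schema>.hudi_trips_cow1")
        return df.checkpoint()

    def write_to_hudi(records_df):
        # Upsert newly generated records into the same table while the read runs.
        records_df.write.format("hudi").options(**hudi_options).mode("append").save(BASE_PATH)

    with ThreadPoolExecutor(max_workers=2) as executor:
        read_future = executor.submit(read_from_hudi)
        write_future = executor.submit(write_to_hudi, new_records_df)  # new_records_df: batch from the Hudi DataGenerator
        write_future.result()
        read_future.result()  # raises the Py4JJavaError shown in the stacktrace below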

Expected behavior

The concurrent read and write operations should execute without any errors. The read operation should successfully checkpoint the results, and the write operation should upsert data to the Hudi table.

Environment Description

  • Hudi version : 0.13.1

  • Spark version : 3.3

  • Hive version : 2.4

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : No (tested on AWS EMR 6.11.1)

Additional context

The issue might be related to file consistency in S3 during concurrent operations or checkpointing with a Hudi table on S3.

Stacktrace

An error was encountered:
An error occurred while calling o126.checkpoint.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 9) (ip-172-31-113-136.ap-southeast-1.compute.internal executor 1): java.io.FileNotFoundException: File s3://test-bucket/test_path/hudi-bug1/americas does not exist.
	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.listStatus(S3NativeFileSystem.java:706)
	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.listStatus(S3NativeFileSystem.java:633)
	at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.listStatus(EmrFileSystem.java:433)
	at org.apache.hudi.metadata.FileSystemBackedTableMetadata.lambda$getPartitionPathWithPathPrefix$f0540b37$1(FileSystemBackedTableMetadata.java:111)
	at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:137)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2279)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:138)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2798)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1239)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1239)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3051)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2993)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2239)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2260)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2304)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
	at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
	at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
	at org.apache.hudi.client.common.HoodieSparkEngineContext.flatMap(HoodieSparkEngineContext.java:137)
	at org.apache.hudi.metadata.FileSystemBackedTableMetadata.getPartitionPathWithPathPrefix(FileSystemBackedTableMetadata.java:109)
	at org.apache.hudi.metadata.FileSystemBackedTableMetadata.lambda$getPartitionPathWithPathPrefixes$0(FileSystemBackedTableMetadata.java:91)
	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:269)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
	at org.apache.hudi.metadata.FileSystemBackedTableMetadata.getPartitionPathWithPathPrefixes(FileSystemBackedTableMetadata.java:95)
	at org.apache.hudi.BaseHoodieTableFileIndex.listPartitionPaths(BaseHoodieTableFileIndex.java:281)
	at org.apache.hudi.BaseHoodieTableFileIndex.getAllQueryPartitionPaths(BaseHoodieTableFileIndex.java:206)
	at org.apache.hudi.SparkHoodieTableFileIndex.listMatchingPartitionPaths(SparkHoodieTableFileIndex.scala:205)
	at org.apache.hudi.HoodieFileIndex.listFiles(HoodieFileIndex.scala:146)
	at org.apache.spark.sql.hudi.analysis.HoodiePruneFileSourcePartitions$$anonfun$apply$1.applyOrElse(HoodiePruneFileSourcePartitions.scala:54)
	at org.apache.spark.sql.hudi.analysis.HoodiePruneFileSourcePartitions$$anonfun$apply$1.applyOrElse(HoodiePruneFileSourcePartitions.scala:42)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:626)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:179)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:626)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:602)
	at org.apache.spark.sql.hudi.analysis.HoodiePruneFileSourcePartitions.apply(HoodiePruneFileSourcePartitions.scala:42)
	at org.apache.spark.sql.hudi.analysis.HoodiePruneFileSourcePartitions.apply(HoodiePruneFileSourcePartitions.scala:40)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:215)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeBatch$1(RuleExecutor.scala:212)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$6(RuleExecutor.scala:284)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor$RuleExecutionContext$.withContext(RuleExecutor.scala:327)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$5(RuleExecutor.scala:284)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$5$adapted(RuleExecutor.scala:274)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:274)
	at org.apache.spark.sql.catalyst.optimizer.BaseOptimizer.super$execute(BaseOptimizer.scala:28)
	at org.apache.spark.sql.catalyst.optimizer.BaseOptimizer.$anonfun$execute$1(BaseOptimizer.scala:28)
	at org.apache.spark.sql.catalyst.optimizer.OptimizationContext$.withOptimizationContext(OptimizationContext.scala:80)
	at org.apache.spark.sql.catalyst.optimizer.BaseOptimizer.execute(BaseOptimizer.scala:28)
	at org.apache.spark.sql.catalyst.optimizer.BaseOptimizer.execute(BaseOptimizer.scala:23)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:188)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:135)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:214)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:554)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:214)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:213)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:131)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$4(QueryExecution.scala:297)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:678)
	at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:297)
	at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:314)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:268)
	at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:247)
	at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3920)
	at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:681)
	at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:643)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
	Caused by: java.io.FileNotFoundException: File s3://test-bucket/test_path/hudi-bug1/americas does not exist.
	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.listStatus(S3NativeFileSystem.java:706)
	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.listStatus(S3NativeFileSystem.java:633)
	at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.listStatus(EmrFileSystem.java:433)
	at org.apache.hudi.metadata.FileSystemBackedTableMetadata.lambda$getPartitionPathWithPathPrefix$f0540b37$1(FileSystemBackedTableMetadata.java:111)
	at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:137)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2279)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:138)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 440, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 52, in run
    result = self.fn(*self.args, **self.kwargs)
  File "<stdin>", line 9, in read_from_hudi
  File "/mnt/yarn/usercache/livy/appcache/application_1733995056318_0012/container_1733995056318_0012_01_000001/pyspark.zip/pyspark/sql/dataframe.py", line 683, in checkpoint
    jdf = self._jdf.checkpoint(eager)
  File "/mnt/yarn/usercache/livy/appcache/application_1733995056318_0012/container_1733995056318_0012_01_000001/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/mnt/yarn/usercache/livy/appcache/application_1733995056318_0012/container_1733995056318_0012_01_000001/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/mnt/yarn/usercache/livy/appcache/application_1733995056318_0012/container_1733995056318_0012_01_000001/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o126.checkpoint.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 9) (ip-172-31-113-136.ap-southeast-1.compute.internal executor 1): java.io.FileNotFoundException: File s3://test-bucket/test_path/hudi-bug1/americas does not exist.
	(Java stacktrace identical to the one shown above)

@ad1happy2go
Collaborator

@ahujaanmol1288 Are you using concurrency control, given that you have multiple writers?

@ad1happy2go
Collaborator

I confirmed after checking the code that you are not using it. Please refer to https://hudi.apache.org/docs/concurrency_control/
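For reference, a hedged sketch of the optimistic concurrency control (OCC) writer options that doc describes. The lock provider choice is illustrative: the in-process provider only covers writers sharing a single JVM (as in the thread-based repro script); multi-process writers need an external provider such as the Zookeeper- or DynamoDB-based ones.

    # Hedged sketch: extra writer options enabling optimistic concurrency control,
    # per https://hudi.apache.org/docs/concurrency_control/. The in-process lock
    # provider is only valid when all writers run in the same JVM.
    occ_options = {
        'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
        'hoodie.cleaner.policy.failed.writes': 'LAZY',
        'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.InProcessLockProvider',
    }

    # Applied on top of the write options from the issue description; df is the
    # batch to upsert and BASE_PATH is a placeholder table base path.
    df.write.format("hudi").options(**hudi_options).options(**occ_options).mode("append").save(BASE_PATH)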

@Wrekkers

Wrekkers commented Dec 13, 2024

@ad1happy2go Hey Aditya, the issue is not related to multiple writers; it is actually the read that fails.
The scenario: a Spark SQL read is kicked off, and while it is underway an independent write to the same table completes, which causes the initial read to fail.

My assumption is that in Spark SQL we are unable to set hoodie.file.index.enable to false, and hence the FileNotFoundException occurs.

@codope
Member

codope commented Dec 17, 2024

My assumption is that in Spark SQL we are unable to set hoodie.file.index.enable to false, and hence the FileNotFoundException occurs.

You could set it as a spark session config.
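For example, from PySpark it could be set on the active session before running the query (a minimal, unverified sketch; the same key can also be passed as a --conf at submit time):

    # Disable the Hudi file index for this session so the native Spark SQL read
    # falls back to plain file-system listing (illustrative sketch only).
    spark.conf.set("hoodie.file.index.enable", "false")
    result_df = spark.sql("SELECT * FROM <hive_schema>.<hudi_table_name>")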

The scenario: a Spark SQL read is kicked off, and while it is underway an independent write to the same table completes, which causes the initial read to fail.

Generally speaking, Hudi guarantees snapshot isolation between writers and readers through its timeline and multi-version concurrency control, and Hudi does not delete the last version of any data file unless the cleaner is configured to do so (your configs suggest no change to the default cleaner settings). I would like to understand more about your use case, and also how the file is getting deleted. Are you using OSS Hudi or EMR Hudi? If it's the latter, did you also try the 0.15.0 release of OSS Hudi? Could you zip the .hoodie folder under the base path of the erroneous table and share it with us?

We have many production use cases with concurrent read and write scenario, and data freshness latency of just a few minutes. For example - https://aws.amazon.com/blogs/big-data/how-nerdwallet-uses-aws-and-apache-hudi-to-build-a-serverless-real-time-analytics-platform/

If it's just a single writer and multiple readers, Hudi employs MVCC by default. I will need to review the script shared above to understand further what's going on.

@ad1happy2go
Collaborator

@ahujaanmol1288 @Wrekkers When I checked the code, we are writing from multiple threads in parallel without any concurrency control, so it is going to corrupt the table. That is why the read may have issues.

@ad1happy2go
Collaborator

@ahujaanmol1288 As @codope mentioned, can you please share the hoodie timeline so we can confirm the issue?

@ahujaanmol1288
Author

ahujaanmol1288 commented Dec 18, 2024

@ad1happy2go Please find the hoodie timeline attached; also, we are using OSS Hudi.
hoodie.zip
