
[SUPPORT] java.util.NoSuchElementException: FileID <some-uuid> of partition path tenant=xxxxx/date=YYYYMMDD #12298

Open
hgudladona opened this issue Nov 19, 2024 · 9 comments
Labels
priority:critical production down; pipelines stalled; Need help asap. timeline-server writer-core Issues relating to core transactions/write actions

Comments

@hgudladona

hgudladona commented Nov 19, 2024

Describe the problem you faced

Intermittent java.util.NoSuchElementException when writing late-arriving (out-of-order) data to partitions that are no longer covered by the active timeline.

To Reproduce

We have a Hudi job reading from Kafka and writing to S3, with partitions dynamically derived from certain columns of the records in the format tenant=xxxxx/date=YYYYMMDD. Under certain conditions, when the partition the new data is written into is not in the active timeline (late-arriving data), there appears to be a mismatch between the file group chosen in the stage "Getting small files from partitions" and the one used in "Doing partition and writing data".
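
To illustrate how the partition path is derived, here is a minimal sketch (our real com.some-class-prefix.KeyGenerator is internal; the class name and the tenant/date column names below are assumptions for illustration):

  import org.apache.avro.generic.GenericRecord;
  import org.apache.hudi.common.config.TypedProperties;
  import org.apache.hudi.common.model.HoodieKey;
  import org.apache.hudi.keygen.KeyGenerator;

  // Hypothetical key generator: builds the tenant=xxxxx/date=YYYYMMDD partition
  // path straight from record columns, so late-arriving records can target a
  // partition whose commits were already archived out of the active timeline.
  public class TenantDateKeyGenerator extends KeyGenerator {

    public TenantDateKeyGenerator(TypedProperties props) {
      super(props);
    }

    @Override
    public HoodieKey getKey(GenericRecord record) {
      String recordKey = String.valueOf(record.get("resource_id"));
      String partitionPath = "tenant=" + record.get("tenant")
          + "/date=" + record.get("date"); // date assumed to already be yyyyMMdd
      return new HoodieKey(recordKey, partitionPath);
    }
  }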

Let's say FG id 'eef3ab7f-dc8a-40ec-856f-99010184d9f1-1' is chosen as a small file in the "Getting small files from partitions" stage and passed on to the "Doing partition and writing data" stage to INSERT new data and create a new base file for it. That stage then fails with the exception below and brings down the streamer job.

However, the streamer job succeeds in two situations:

  1. Upon restart of the streamer job, when the same records are retried for INSERT.
  2. If the embedded timeline server is turned off (see the setting sketched after this list).
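
For situation 2, the knob involved is the embedded timeline server flag (it defaults to true); a minimal sketch of the override we use to confirm the workaround:

hoodie.embed.timeline.server: false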

Expected behavior

We expect no mismatch between the views of the stages "Getting small files from partitions" and "Doing partition and writing data" when we are writing to a partition that is not actively tracked in the active timeline.

Environment Description

  • Hudi version : 0.14.1

  • Spark version : 3.4.x

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : yes

Additional context

auto.offset.reset: latest
bootstrap.servers: kafka-brokers
group.id: hudi-ingest-some-group
hoodie.archive.async: true
hoodie.archive.automatic: true
hoodie.auto.adjust.lock.configs: true
hoodie.base.path: s3a://some-base-path
hoodie.clean.async: true
hoodie.cleaner.hours.retained: 36
hoodie.cleaner.parallelism: 600
hoodie.cleaner.policy: KEEP_LATEST_BY_HOURS
hoodie.cleaner.policy.failed.writes: LAZY
hoodie.clustering.async.enabled: false
hoodie.combine.before.insert: false
hoodie.copyonwrite.insert.auto.split: false
hoodie.datasource.fetch.table.enable: true
hoodie.datasource.hive_sync.database: hudi_events_v1
hoodie.datasource.hive_sync.mode: hms
hoodie.datasource.hive_sync.partition_extractor_class: org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.datasource.hive_sync.partition_fields: tenant,date
hoodie.datasource.hive_sync.table: some-table
hoodie.datasource.hive_sync.table_properties: projection.date.type=date|projection.date.format=yyyyMMdd|projection.date.range=19700101,99990101|projection.tenant.type=integer|projection.tenant.range=-1,8675309|projection.enabled=true
hoodie.datasource.meta_sync.condition.sync: true
hoodie.datasource.sync_tool.single_instance: true
hoodie.datasource.write.hive_style_partitioning: true
hoodie.datasource.write.keygenerator.class: com.some-class-prefix.KeyGenerator
hoodie.datasource.write.operation: insert
hoodie.datasource.write.partitionpath.field: tenant:SIMPLE,date:SIMPLE
hoodie.datasource.write.precombine.field: event_time_usec
hoodie.datasource.write.reconcile.schema: false
hoodie.datasource.write.recordkey.field: resource_id
hoodie.deltastreamer.kafka.source.maxEvents: 75000000
hoodie.deltastreamer.schemaprovider.registry.url: http://schema-registry.some-suffix:8085
hoodie.deltastreamer.source.kafka.enable.commit.offset: true
hoodie.deltastreamer.source.kafka.topic: some-topic
hoodie.deltastreamer.source.schema.subject: some-topic-value
hoodie.fail.on.timeline.archiving: false
hoodie.filesystem.view.incr.timeline.sync.enable: true
hoodie.filesystem.view.remote.timeout.secs: 2
hoodie.insert.shuffle.parallelism: 1600
hoodie.memory.merge.max.size: 2147483648
hoodie.metadata.enable: false
hoodie.metrics.on: true
hoodie.metrics.reporter.metricsname.prefix:
hoodie.metrics.reporter.prefix.tablename: false
hoodie.metrics.reporter.type: DATADOG
hoodie.parquet.compression.codec: zstd
hoodie.streamer.source.kafka.minPartitions: 450
hoodie.table.name: <>
hoodie.table.partition.fields: tenant,date
hoodie.table.type: MERGE_ON_READ
hoodie.write.concurrency.mode: OPTIMISTIC_CONCURRENCY_CONTROL
hoodie.write.lock.dynamodb.billing_mode: PROVISIONED
hoodie.write.lock.dynamodb.endpoint_url: https://dynamodb.us-east-2.amazonaws.com/
hoodie.write.lock.dynamodb.partition_key: some-key
hoodie.write.lock.dynamodb.region: us-east-2
hoodie.write.lock.dynamodb.table: HudiLocker
hoodie.write.lock.provider: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
hoodie.write.markers.type: DIRECT

Additional logs

2024-11-19T20:05:51,784 INFO [Executor task launch worker for task 227.3 in stage 723.0 (TID 398672)] org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor:Merging updates for commit 20241119200519832 for file eef3ab7f-dc8a-40ec-856f-99010184d9f1-1
2024-11-19T20:05:51,784 INFO [Executor task launch worker for task 227.3 in stage 723.0 (TID 398672)] org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor:Small file corrections for updates for commit 20241119200519832 for file eef3ab7f-dc8a-40ec-856f-99010184d9f1-1
2024-11-19T20:05:51,814 INFO [Executor task launch worker for task 227.3 in stage 723.0 (TID 398672)] org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView:Sending request : (http://<some-prefix>-driver-svc.hudi-ingest.svc:43915/v1/hoodie/view/datafile/latest/partition?partition=tenant%3DXXXXX%2Fdate%3D20241118&basepath=s3a%3A%2F%2Fsome-bucket%2Fhudi<some-prefix>&fileid=eef3ab7f-dc8a-40ec-856f-99010184d9f1-1&lastinstantts=20241119200150363&timelinehash=49ea23be5a76a1a27c370601443200ed6dd13f7c4a9937d84028c3cbf9e72aed)
2024-11-19T20:05:51,853 INFO [Executor task launch worker for task 227.3 in stage 723.0 (TID 398672)] org.apache.hudi.io.HoodieMergeHandleFactory:Create update handle for fileId eef3ab7f-dc8a-40ec-856f-99010184d9f1-1 and partition path tenant=11467/date=20241118 at commit 20241119200519832
2024-11-19T20:05:51,853 INFO [Executor task launch worker for task 227.3 in stage 723.0 (TID 398672)] org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView:Sending request : (http://<some-prefix>-driver-svc.hudi-ingest.svc:43915/v1/hoodie/view/datafile/latest/partition?partition=tenant%3DXXXXX%2Fdate%3D20241118&basepath=s3a%3A%2F%2Fsome-bucket%2Fhudi<some-prefix>&fileid=eef3ab7f-dc8a-40ec-856f-99010184d9f1-1&lastinstantts=20241119200150363&timelinehash=49ea23be5a76a1a27c370601443200ed6dd13f7c4a9937d84028c3cbf9e72aed)

Stacktrace

org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :227
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:342)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleInsertPartition(BaseSparkCommitActionExecutor.java:348)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:259)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:905)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:905)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1552)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.NoSuchElementException: FileID eef3ab7f-dc8a-40ec-856f-99010184d9f1-1 of partition path tenant=XXXXX/date=20241118 does not exist.
	at org.apache.hudi.io.HoodieMergeHandle.getLatestBaseFile(HoodieMergeHandle.java:161)
	at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:126)
	at org.apache.hudi.io.HoodieMergeHandleFactory.create(HoodieMergeHandleFactory.java:68)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getUpdateHandle(BaseSparkCommitActionExecutor.java:400)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:368)
	at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:79)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:335)
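
For reference, the lookup that throws is roughly of this shape (paraphrased from HoodieMergeHandle.getLatestBaseFile against the BaseFileOnlyView API; variable names are ours):

  // The merge handle asks the (possibly remote) file system view for the
  // latest base file of the small-file file group chosen earlier; an absent
  // result surfaces as the NoSuchElementException above.
  Option<HoodieBaseFile> baseFileOpt =
      hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
  if (!baseFileOpt.isPresent()) {
    throw new NoSuchElementException("FileID " + fileId + " of partition path "
        + partitionPath + " does not exist.");
  }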

@ad1happy2go
Collaborator

@hgudladona This should already be fixed by #9879. Can you try with this patch?

@hgudladona
Author

hgudladona commented Nov 20, 2024

This patch requires us to migrate to a 1.x beta release, which we are not ready to do yet. Any chance this can be backported to 0.14.x? Also, can you kindly explain how this remediates our situation? Are file groups outside of the active timeline treated as uncommitted with this patch?

@hgudladona
Author

@ad1happy2go This patch will still not solve the problem. If you follow the code path, getLatestFileSlicesBeforeOrOn filters the file slices using getLatestFileSliceFilteringUncommittedFiles, which in turn filters with filterUncommittedFiles. Looking into this function:

 private Stream<FileSlice> filterUncommittedFiles(FileSlice fileSlice, boolean includeEmptyFileSlice) {
    Option<HoodieBaseFile> committedBaseFile =
        fileSlice.getBaseFile().isPresent() && completionTimeQueryView.isCompleted(fileSlice.getBaseInstantTime())
            ? fileSlice.getBaseFile() : Option.empty();
    List<HoodieLogFile> committedLogFiles = fileSlice.getLogFiles()
        .filter(logFile -> completionTimeQueryView.isCompleted(logFile.getDeltaCommitTime()))
        .collect(Collectors.toList());
    if ((fileSlice.getBaseFile().isPresent() && !committedBaseFile.isPresent())
        || committedLogFiles.size() != fileSlice.getLogFiles().count()) {
      LOG.debug("File Slice (" + fileSlice + ") has uncommitted files.");
      // A file is filtered out of the file-slice if the corresponding
      // instant has not completed yet.
      FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
      committedBaseFile.ifPresent(transformed::setBaseFile);
      committedLogFiles.forEach(transformed::addLogFile);
      if (transformed.isEmpty() && !includeEmptyFileSlice) {
        return Stream.of();
      }
      return Stream.of(transformed);
    }
    return Stream.of(fileSlice);
  }

...

  public boolean isCompleted(String instantTime) {
    return this.startToCompletionInstantTimeMap.containsKey(instantTime)
        || HoodieTimeline.compareTimestamps(instantTime, LESSER_THAN, this.firstNonSavepointCommit);
  }

If a file slice's base instant time is less than firstNonSavepointCommit, it is treated as completed even though it is not in the active timeline, which is pretty much the current behavior. Kindly go through the scenario I mentioned one more time and advise whether this is the right patch.
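
To make the concern concrete, here is a minimal self-contained sketch (not Hudi code; String.compareTo stands in for HoodieTimeline.compareTimestamps with LESSER_THAN) of why an archived base instant still passes this check:

  import java.util.HashMap;
  import java.util.Map;

  public class IsCompletedSketch {
    // Stand-ins for CompletionTimeQueryView state: completion times are only
    // tracked for instants still covered by the timeline.
    static Map<String, String> startToCompletionInstantTimeMap = new HashMap<>();
    static String firstNonSavepointCommit = "20241119200150363"; // oldest active instant

    static boolean isCompleted(String instantTime) {
      // Mirrors the two-clause check quoted above.
      return startToCompletionInstantTimeMap.containsKey(instantTime)
          || instantTime.compareTo(firstNonSavepointCommit) < 0;
    }

    public static void main(String[] args) {
      // C4's base instant was archived, so the map has no entry for it...
      String archivedBaseInstant = "20241118120000000";
      // ...yet it still compares LESSER_THAN the first non-savepoint commit,
      // so its file slice is treated as committed, same as today.
      System.out.println(isCompleted(archivedBaseInstant)); // prints: true
    }
  }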

@bhasudha
Contributor

@nsivabalan could you please help with this?

@nsivabalan
Contributor

taking a look.

@nsivabalan
Contributor

I am not sure I follow the use case fully.

From what I gauge, this is the situation:

The table has multi-writers enabled; different writers could write to different partitions, and the partitioning column is dynamically derived from the input data.

So writer1 tries to write to partitionX, and we do not have any completed commits in the active timeline containing any data for the partition of interest. This is what you suggested.

But the stacktrace shows something different.

If we routed the write to a MergeHandle, it means we already had some data written to partitionX, hence the file group was chosen as a small file, and we eventually ended up with HoodieMergeHandle.

But let me poke at one theory. Let's say there was a concurrent writer, writer2, writing to partition2; it is inflight and has added the file group of interest. writer1, going through the upsert partitioner, got hold of the same file group the other writer is writing to and routed it to HoodieMergeHandle. But in between, the other writer (writer2) fails and rolls back its commit, which deletes the file group of interest; hence, within HoodieMergeHandle, we run into the file-group-not-found issue.

But the possibility of this happening is very unlikely, since the commit from writer2 was never committed at all. I have to go through the code to see if we have any edge cases around this.

@hgudladona
Author

Let me clarify.

This is not a multi-writer setup; we have only single-writer jobs, with cleans running async.

The scenario can be described like this:

  • Active timeline before: C1, C2, C3. Commit C4 writes N records to partition tenant=12345/date=20241120. Active timeline after: C1, C2, C3, C4.
  • Active timeline before: C1, C2, C3, C4. Commit C5 writes N records to partition tenant=12345/date=20241121. Active timeline after: C1, C2, C3, C4, C5.
  • Archival moves commits C1, C2, C3, C4 to the archived dir.
  • Active timeline before: C5. Commit C6 writes N records to partition tenant=12345/date=20241120. Intermittently, this commit fails with the exception during the write phase. Partition tenant=12345/date=20241120 is no longer tracked in the active timeline, but when new writes target it, the planner identifies a small file written during commit C4; that file id cannot be found in the write phase, although it exists on the file system. This fails intermittently when the timeline server is ON and consistently succeeds when it is OFF.

Hope this helps.

@ad1happy2go
Collaborator

@hgudladona I took a pass at reproducing the issue by manually applying this sequence of operations, but I wasn't able to reproduce it.

Do you have any code, a reproducible script, or any other artifacts that could help me reproduce this issue?

@hgudladona
Author

Hello @ad1happy2go, could you kindly describe the test setup? How large was the timeline? What was the partition structure? What was the value of hoodie.filesystem.view.remote.timeout.secs, which determines whether the secondary view is used? It turns out that, for us, gradually increasing this value reduced the occurrence of the problem.

Unfortunately, I don't have a reproducible script, but as we described above, I know the exact scenario in which this happens.
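
For reference, the value we ran with appears in the config above (2 seconds); a sketch of the change that reduced the failures for us (300 is, as far as I know, the default in recent releases):

hoodie.filesystem.view.remote.timeout.secs: 300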
