[HUDI-8803] Fix ArrayIndexOutOfBoundsException while vectorized read with schema evolution #12560

Merged

Conversation

wangyinsheng
Contributor

Change Logs

When reading Parquet files with the vectorized reader and schema evolution, the read occasionally fails with an ArrayIndexOutOfBoundsException:

24/11/25 11:53:38 [Executor task launch worker for task 9] ERROR Executor: Exception in task 0.5 in stage 3.0 (TID 9)
java.lang.ArrayIndexOutOfBoundsException: -1
	at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.isNullAt(OnHeapColumnVector.java:130)
	at org.apache.spark.sql.execution.vectorized.WritableColumnVector.getUTF8String(WritableColumnVector.java:396)
	at org.apache.spark.sql.vectorized.ColumnarBatchRow.getUTF8String(ColumnarBatch.java:220)
	at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:34)
	at org.apache.hudi.RecordMergingFileIterator.hasNextInternal(Iterators.scala:200)
	at org.apache.hudi.RecordMergingFileIterator.doHasNext(Iterators.scala:192)
	at org.apache.hudi.util.CachingIterator.hasNext(CachingIterator.scala:36)
	at org.apache.hudi.util.CachingIterator.hasNext$(CachingIterator.scala:36)
	at org.apache.hudi.LogFileIterator.hasNext(Iterators.scala:60)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:355)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:878)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:878)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:355)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:319)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:129)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:478)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1480)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:481)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

The root cause is that Spark3HoodieVectorizedParquetRecordReader extends VectorizedParquetRecordReader and each class keeps its own member variable named "batchIdx". Spark3HoodieVectorizedParquetRecordReader never updates the superclass's "batchIdx", so when super.getCurrentValue() is called the superclass's counter is still 0 and getRow(batchIdx - 1) resolves to getRow(-1), which later throws the exception above. A sketch of one possible way to keep the lookup consistent follows the two snippets below.

// code in Spark3HoodieVectorizedParquetRecordReader

@Override
public Object getCurrentValue() {
  if (typeChangeInfos == null || typeChangeInfos.isEmpty()) {
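    // in row mode this delegates to the parent, which reads its own batchIdx there;
    // the overridden nextKeyValue() below never advances that field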
    return super.getCurrentValue();
  }

  if (returnColumnarBatch) {
    return columnarBatch == null ? super.getCurrentValue() : columnarBatch;
  }

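  // the null fallback hits the parent's stale batchIdx as well; otherwise the row is
  // read with this class's own (correctly incremented) counter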
  return columnarBatch == null ? super.getCurrentValue() : columnarBatch.getRow(batchIdx - 1);
}

@Override
public boolean nextKeyValue() throws IOException {
  resultBatch();

  if (returnColumnarBatch)  {
    return nextBatch();
  }

  if (batchIdx >= numBatched) {
    if (!nextBatch()) {
      return false;
    }
  }
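  // increments only this class's batchIdx; the parent's field of the same name stays at 0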
  ++batchIdx;
  return true;
}

// code in VectorizedParquetRecordReader (for comparison)

@Override
public boolean nextKeyValue() throws IOException {
  resultBatch();

  if (returnColumnarBatch) return nextBatch();

  if (batchIdx >= numBatched) {
    if (!nextBatch()) return false;
  }
  ++batchIdx;
  return true;
}

@Override
public Object getCurrentValue() {
  if (returnColumnarBatch) return columnarBatch;
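  // 'batchIdx' here is the parent's own field, shadowed by the subclass; if only the
  // subclass's counter was advanced, this evaluates to getRow(-1), whose reads later
  // throw ArrayIndexOutOfBoundsException: -1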
  return columnarBatch.getRow(batchIdx - 1);
} 
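Below is a minimal sketch of one way to make the row lookup use the counter that is actually incremented, i.e. never fall back to super.getCurrentValue() in row mode. It is illustrative only and not necessarily the change merged in this PR; it assumes that resultBatch() returns the parent's current ColumnarBatch (as it does in Spark's VectorizedParquetRecordReader) and that the subclass's nextBatch() keeps its own batchIdx and numBatched in step with that batch.

// sketch of a possible fix in Spark3HoodieVectorizedParquetRecordReader (illustrative only)

@Override
public Object getCurrentValue() {
  if (returnColumnarBatch) {
    // whole-batch mode never indexes by row, so delegating to the parent is safe
    return columnarBatch == null ? super.getCurrentValue() : columnarBatch;
  }
  // row mode: resolve the row against whichever batch this reader is serving, always
  // using this class's batchIdx, which the overridden nextKeyValue() increments
  ColumnarBatch batch = columnarBatch != null ? columnarBatch : resultBatch();
  return batch.getRow(batchIdx - 1);
}

With this shape, the row path never consults the parent's stale batchIdx, so the typeChangeInfos shortcut only matters for whole-batch mode.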

Impact

None

Risk level (write none, low, medium or high below)

medium

Documentation Update

None

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

github-actions bot added the size:M label (PR with lines of changes in (100, 300]) on Dec 31, 2024
@wangyinsheng
Contributor Author

@hudi-bot run azure

@hudi-bot

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@danny0405 merged commit e7d2392 into apache:master on Jan 1, 2025
42 of 43 checks passed