feat(blob): Create and Read Blobs in Spark SQL#18098

Open
the-other-tim-brown wants to merge 23 commits intoapache:masterfrom
the-other-tim-brown:blob-reader

Conversation


@the-other-tim-brown the-other-tim-brown commented Feb 5, 2026

Describe the issue this Pull Request addresses

Closes #18110

This PR adds support for establishing Blob columns with Spark SQL and then reading those values back as bytes with a read_blob scalar function in Spark SQL.

The underlying reader will batch reads where possible to improve performance when reading blobs from ranges within the same file.

Summary and Changelog

  1. SparkAdapter is updated to inject the read_blob scalar function and to inject planner strategies that translate the blob read from a logical plan node into a physical plan that actually reads back the data.
  2. BatchedBlobReader performs the actual lookups of the blob content. It handles three cases: inline data, reading a subsection of a file, and reading the full file.
  3. The updates to the Spark planning are handled by ReadBlobRule, which takes the ReadBlobExpressions generated when parsing the query and translates them into BatchedBlobRead nodes. Each node is handled by BatchedBlobReaderStrategy, which converts it to the physical node BatchedBlobReadExec.
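The three-stage translation above (expression from the parser, logical node from the rule, physical node from the strategy) can be illustrated with a simplified, Spark-free sketch. All types and functions below are toy stand-ins for the PR's ReadBlobExpression, BatchedBlobRead, and BatchedBlobReadExec, not the actual Catalyst implementation:

```scala
// Toy model of the planning pipeline described above. These case classes and
// functions are illustrative stand-ins, not the PR's actual Catalyst code.
sealed trait LogicalNode
case class ReadBlobExpr(column: String) extends LogicalNode              // produced by the parser
case class BatchedBlobReadNode(columns: Seq[String]) extends LogicalNode // produced by the rule
case class Project(exprs: Seq[LogicalNode]) extends LogicalNode

sealed trait PhysicalNode
case class BatchedBlobReadExecNode(columns: Seq[String]) extends PhysicalNode

// Analogue of ReadBlobRule: collapse ReadBlobExpr leaves under a Project
// into a single BatchedBlobReadNode covering every referenced blob column.
def readBlobRule(plan: LogicalNode): LogicalNode = plan match {
  case Project(exprs) =>
    val cols = exprs.collect { case ReadBlobExpr(c) => c }
    if (cols.isEmpty) plan else BatchedBlobReadNode(cols)
  case other => other
}

// Analogue of BatchedBlobReaderStrategy: translate logical to physical.
def strategy(plan: LogicalNode): Option[PhysicalNode] = plan match {
  case BatchedBlobReadNode(cols) => Some(BatchedBlobReadExecNode(cols))
  case _                         => None
}
```

Note that the rule must gather every blob column under the projection, not just the first one; that detail matters for the multi-column test discussed later in this thread.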

Impact

Provides the utilities for users to retrieve their out-of-line blob data.

Risk Level

Low, this is net new functionality.

Documentation Update

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@the-other-tim-brown the-other-tim-brown changed the title [draft] Blob reader feat(blob): Create and Read Blobs in Spark SQL Feb 20, 2026
@the-other-tim-brown the-other-tim-brown marked this pull request as ready for review February 20, 2026 19:50

rahil-c commented Mar 17, 2026

@the-other-tim-brown just to confirm: do you only need someone to review, or are there other items remaining for this PR? If so, I can take those up.

We are aiming to target a Hudi release toward end of March/early April, so let me know how I can assist, for example by taking on any remaining items.

@rahil-c rahil-c requested review from rahil-c, voonhous and yihua March 17, 2026 19:59
@the-other-tim-brown
Contributor Author

@rahil-c this is just waiting on review.


override def next(): R = {
if (!hasNext) {
throw new NoSuchElementException("No more rows")
Collaborator

Should we add more detail to this exception, such as the path we were reading from?

val (wholeFileRows, rangeRows) = outOfLineRows.partition(_.length < 0)

// Case 1: Inline — return bytes directly without I/O
val inlineResults = inlineRows.map { ri =>
Collaborator

This is a no-op for now, right, since we are only targeting out-of-line blobs for milestone 1?

Contributor Author

Right, but I think it's easy enough to add reader support in this task, so we might as well.

*/
private def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] = {
// Group by file path
val byFile = rows.groupBy(_.filePath)
Collaborator

So this handles the case where multiple recordKeys refer to the same out-of-line blob, and we want to avoid downloading the same file repeatedly?

Contributor Author

Right, if there are consecutive or almost-consecutive ranges, it makes sense to just read the file once.
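The merging idea can be sketched in plain Scala. The names below are illustrative; the PR's identifyConsecutiveRanges operates on its own RowInfo and MergedRange types, and the gap tolerance here is a hypothetical parameter:

```scala
// Sketch of merging consecutive (or nearly consecutive) byte ranges per file,
// so each merged range needs only a single read from storage. Illustrative
// stand-in for the PR's identifyConsecutiveRanges, under assumed types.
case class Range(filePath: String, offset: Long, length: Long) {
  def end: Long = offset + length
}

def mergeRanges(rows: Seq[Range], maxGap: Long = 0L): Seq[Range] =
  rows.groupBy(_.filePath).values.toSeq.flatMap { perFile =>
    val sorted = perFile.sortBy(_.offset)
    sorted.tail.foldLeft(List(sorted.head)) { (merged, r) =>
      val last = merged.head
      if (r.offset <= last.end + maxGap)
        // Overlapping or close enough: extend the current merged range
        Range(last.filePath, last.offset, math.max(last.end, r.end) - last.offset) :: merged.tail
      else
        // Too far away: start a new merged range
        r :: merged
    }.reverse
  }
```

After merging, each merged range maps back to the individual rows it covers, and each row's bytes are sliced out of the single buffer by relative offset.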

normalized.contains("show indexes") ||
normalized.contains("refresh index") ||
normalized.contains(" blob")
Collaborator

Would this match on something such as SELECT * FROM blob_table? If so, I think we should match on the read_blob function instead, or is there a more precise way to match here?

Contributor Author

Yes, but this is also not related to read_blob. This just triggers our custom parsing for adding blob types when creating a table.

}

@Test
def testMultipleReadBlobInSameQuery(): Unit = {
Collaborator

@the-other-tim-brown Wanted to share something with you that I found when I tried modifying this test slightly.

I used the following test, which alters offset2 for each row to make the results more distinguishable:

def testMultipleReadBlobInSameQuery(): Unit = {
    val filePath1 = createTestFile(tempDir, "multi1.bin", 10000)
    val filePath2 = createTestFile(tempDir, "multi2.bin", 10000)

    // Use different offsets for file1 vs file2 so byte content is distinguishable.
    // createTestFile writes bytes as (i % 256), so:
    //   file1 offset=0   -> bytes (0, 1, 2, ...)
    //   file2 offset=500 -> bytes (244, 245, 246, ...)
    val df = sparkSession.createDataFrame(Seq(
      (1, filePath1, 0L, 50L, filePath2, 500L, 50L),
      (2, filePath1, 100L, 50L, filePath2, 600L, 50L)
    )).toDF("id", "external_path1", "offset1", "length1", "external_path2", "offset2", "length2")
      .withColumn("file_info1",
        blobStructCol("file_info1", col("external_path1"), col("offset1"), col("length1")))
      .withColumn("file_info2",
        blobStructCol("file_info2", col("external_path2"), col("offset2"), col("length2")))
      .select("id", "file_info1", "file_info2")

    df.createOrReplaceTempView("multi_table")

    // SQL with multiple read_blob calls on DIFFERENT blob columns
    val result = sparkSession.sql("""
      SELECT
        id,
        read_blob(file_info1) as data1,
        read_blob(file_info2) as data2
      FROM multi_table
      ORDER BY id
    """)

    val rows = result.collect()
    assertEquals(2, rows.length)

    // Row 1: data1 = file1 at offset 0, data2 = file2 at offset 500
    val data1_row1 = rows(0).getAs[Array[Byte]]("data1")
    val data2_row1 = rows(0).getAs[Array[Byte]]("data2")
    assertEquals(50, data1_row1.length)
    assertEquals(50, data2_row1.length)
    assertBytesContent(data1_row1, expectedOffset = 0)
    // This assertion will fail if ReadBlobRule only resolves the first blob column,
    // because data2 will contain file1's bytes (offset 0) instead of file2's bytes (offset 500)
    assertBytesContent(data2_row1, expectedOffset = 500)

    // Row 2: data1 = file1 at offset 100, data2 = file2 at offset 600
    val data1_row2 = rows(1).getAs[Array[Byte]]("data1")
    val data2_row2 = rows(1).getAs[Array[Byte]]("data2")
    assertBytesContent(data1_row2, expectedOffset = 100)
    assertBytesContent(data2_row2, expectedOffset = 600)
  }

I think the issue I am seeing is that data2 will contain file_info1's bytes (offset 0) instead of file_info2's bytes (offset 500). Let me know if this is a correctness issue.

Contributor Author

Good catch, updating the PR with the fix

var inputStream: SeekableDataInputStream = null
try {
val path = new StoragePath(rowInfo.filePath)
val fileLength = storage.getPathInfo(path).getLength.toInt
Collaborator

claude: Integer overflow — Long.toInt casts in readWholeFile and readAndSplitRange will silently wrap for files > 2GB, causing NegativeArraySizeException.

Contributor Author

Any suggestions for how to fix it? The reader requires an int.
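One hedged option, assuming the storage API stays Long-based and single-array reads are kept: fail fast with Math.toIntExact so an oversized file raises a clear error instead of silently wrapping. A minimal sketch (the helper name and error message are hypothetical, not from the PR):

```scala
// Sketch of guarding the Long -> Int cast. Math.toIntExact throws
// ArithmeticException on overflow instead of silently wrapping, which turns
// the >2GB case into a clear error. (JVM byte arrays are capped near 2GB,
// so files beyond that would need chunked/streaming reads regardless.)
def safeLengthToInt(fileLength: Long, path: String): Int =
  try Math.toIntExact(fileLength)
  catch {
    case _: ArithmeticException =>
      throw new IllegalArgumentException(
        s"Blob file $path is $fileLength bytes, which exceeds the 2GB limit of a single read")
  }
```

Supporting files larger than 2GB would then be a separate change, reading in fixed-size chunks rather than one array.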


rahil-c commented Mar 18, 2026

@the-other-tim-brown A more general comment: could we break this work into smaller PRs, maybe around 1k lines each? That would make it easier for the implementer to get changes in and for reviewers to review.

Would it be possible to split this work into two chunks: first land anything needed for reading out-of-line blobs (milestone 1), and then have another PR for the remaining functionality?

cc @vinothchandar @yihua @voonhous

val blobStruct = contentField.dataType.asInstanceOf[StructType]
assertEquals(BlobType(), blobStruct)
}
}
@rahil-c rahil-c Mar 18, 2026

@the-other-tim-brown I think I was also seeing issues when I added tests like this, where I use read_blob in a WHERE clause:

  @Test
  def testReadBlobInWhereClauseShouldFail(): Unit = {
    val filePath = createTestFile(tempDir, "where.bin", 10000)

    val df = sparkSession.createDataFrame(Seq(
      (1, filePath, 0L, 100L),
      (2, filePath, 100L, 100L),
      (3, filePath, 200L, 100L)
    )).toDF("id", "external_path", "offset", "length")
      .withColumn("file_info",
        blobStructCol("file_info", col("external_path"), col("offset"), col("length")))
      .select("id", "file_info")

    df.createOrReplaceTempView("where_table")

    // read_blob() in WHERE clause: ReadBlobRule only matches Project nodes,
    // so the ReadBlobExpression in the Filter node is never resolved.
    // Since ReadBlobExpression is Unevaluable, this should either:
    //   (a) fail with a clear error during analysis, or
    //   (b) work correctly
    // Currently it crashes at code generation with an opaque error.
    val ex = assertThrows(classOf[Exception], () => {
      sparkSession.sql("""
        SELECT id
        FROM where_table
        WHERE length(read_blob(file_info)) > 50
      """).collect()
    })

    // Demonstrates the bug: error message is unhelpful.
    // Should either be supported or produce a clear analysis error like:
    // "read_blob() can only be used in SELECT projections, not in WHERE/HAVING/ORDER BY"
    assertTrue(
      ex.getMessage.contains("Unevaluable") || ex.getMessage.contains("cannot be evaluated"),
      s"Expected Unevaluable error but got: ${ex.getMessage}"
    )
  }

I was hitting the following error:

Cannot generate code for expression: read_blob(input[1, struct<type:string,data:binary,reference:struct<external_path:string,offset:bigint,length:bigint,managed:boolean>>, false]) ==> 

Would this also happen on a HAVING query?

Contributor Author

I'll see if this is easy to support. If not, we can move it to a follow-up. We'll likely want to optimize this to use the length in the struct itself instead of reading the bytes from the files.
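That optimization amounts to a rewrite rule: replace length(read_blob(col)) with the length field already stored in the blob struct's reference, so no bytes are ever fetched. A toy, Spark-free sketch of the idea (all types here are illustrative stand-ins, not the PR's classes):

```scala
// Toy expression tree showing the proposed rewrite: length(read_blob(x))
// becomes a direct access of the blob struct's reference.length field,
// avoiding any file I/O. Types are illustrative, not the PR's Catalyst code.
sealed trait Expr
case class ColumnRef(name: String) extends Expr
case class ReadBlob(child: Expr) extends Expr
case class Length(child: Expr) extends Expr
case class FieldAccess(child: Expr, path: Seq[String]) extends Expr

def pushDownBlobLength(e: Expr): Expr = e match {
  // The interesting case: length of a blob read needs no read at all
  case Length(ReadBlob(col)) => FieldAccess(col, Seq("reference", "length"))
  // Otherwise recurse into children unchanged
  case Length(c)             => Length(pushDownBlobLength(c))
  case ReadBlob(c)           => ReadBlob(pushDownBlobLength(c))
  case FieldAccess(c, p)     => FieldAccess(pushDownBlobLength(c), p)
  case other                 => other
}
```

In Catalyst terms this would run during analysis, before the Unevaluable ReadBlobExpression ever reaches code generation, which would also make the WHERE-clause case above work for length-only predicates.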

@the-other-tim-brown
Contributor Author


@rahil-c I'll pull the SQL support for creating a blob type and the test utils into an initial PR which should be quicker to review

@the-other-tim-brown
Contributor Author

I've created #18347 to pull out some of the changes.


codecov-commenter commented Mar 19, 2026

Codecov Report

❌ Patch coverage is 82.46445% with 74 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.79%. Comparing base (ec04479) to head (0861049).
⚠️ Report is 1055 commits behind head on master.

Files with missing lines Patch % Lines
...apache/spark/sql/hudi/blob/BatchedBlobReader.scala 82.47% 17 Missing and 24 partials ⚠️
.../org/apache/spark/sql/hudi/blob/ReadBlobRule.scala 61.53% 4 Missing and 11 partials ⚠️
...pache/spark/sql/hudi/blob/ReadBlobExpression.scala 66.66% 1 Missing and 1 partial ⚠️
...l/parser/HoodieSpark3_3ExtendedSqlAstBuilder.scala 85.71% 0 Missing and 2 partials ⚠️
...l/parser/HoodieSpark3_4ExtendedSqlAstBuilder.scala 85.71% 0 Missing and 2 partials ⚠️
...l/parser/HoodieSpark3_5ExtendedSqlAstBuilder.scala 85.71% 0 Missing and 2 partials ⚠️
...l/parser/HoodieSpark4_0ExtendedSqlAstBuilder.scala 85.71% 0 Missing and 2 partials ⚠️
...g/apache/spark/sql/hudi/blob/BatchedBlobRead.scala 88.88% 0 Missing and 1 partial ⚠️
...ache/spark/sql/hudi/blob/BatchedBlobReadExec.scala 93.75% 0 Missing and 1 partial ⚠️
...park/sql/hudi/blob/BatchedBlobReaderStrategy.scala 95.00% 1 Missing ⚠️
... and 5 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18098      +/-   ##
============================================
+ Coverage     61.43%   68.79%   +7.36%     
- Complexity    23082    27752    +4670     
============================================
  Files          2108     2435     +327     
  Lines        127636   133055    +5419     
  Branches      14534    16050    +1516     
============================================
+ Hits          78409    91533   +13124     
+ Misses        42873    34203    -8670     
- Partials       6354     7319     +965     
Flag Coverage Δ
common-and-other-modules 44.32% <6.84%> (?)
hadoop-mr-java-client 45.06% <0.00%> (?)
spark-client-hadoop-common 48.21% <66.66%> (?)
spark-java-tests 49.32% <71.09%> (?)
spark-scala-tests 45.20% <20.37%> (?)
utilities 38.52% <6.02%> (?)

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...scala/org/apache/spark/sql/hudi/SparkAdapter.scala 54.54% <ø> (ø)
...va/org/apache/hudi/common/schema/HoodieSchema.java 81.39% <100.00%> (ø)
...quet/avro/AvroSchemaConverterWithTimestampNTZ.java 75.57% <ø> (ø)
...in/scala/org/apache/spark/sql/types/BlobType.scala 100.00% <100.00%> (ø)
...e/spark/sql/hudi/HoodieSparkSessionExtension.scala 91.66% <100.00%> (ø)
...pache/spark/sql/hudi/analysis/HoodieAnalysis.scala 74.80% <100.00%> (ø)
...org/apache/spark/sql/adapter/Spark3_3Adapter.scala 62.71% <100.00%> (ø)
...org/apache/spark/sql/adapter/Spark3_4Adapter.scala 61.01% <100.00%> (ø)
...org/apache/spark/sql/adapter/Spark3_5Adapter.scala 61.66% <100.00%> (ø)
...org/apache/spark/sql/adapter/Spark4_0Adapter.scala 54.90% <100.00%> (ø)
... and 15 more

... and 4474 files with indirect coverage changes


@apache apache deleted a comment from hudi-bot Mar 20, 2026
@hudi-bot

CI report:

@hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build


Labels

size:XL PR with lines of changes > 1000

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Milestone 1] Add deserialize/fetch blobs functionality to Spark readers

5 participants