feat(blob): Create and Read Blobs in Spark SQL #18098
the-other-tim-brown wants to merge 23 commits into apache:master from
Conversation
force-pushed from ff5ef8d to 4531228
…ke option to run on internal rows when fetching
force-pushed from 7527b08 to e919b89
@the-other-tim-brown Just to confirm: do you just need someone to review, or are there other items for this PR? If so, I can take those up. We are aiming to target a Hudi release toward end of March/early April, so let me know how I can assist in terms of taking on any other items.
@rahil-c this is just waiting on review.
```scala
override def next(): R = {
  if (!hasNext) {
    throw new NoSuchElementException("No more rows")
```
Should we add more detail to this exception, such as the path we were reading from?
```scala
val (wholeFileRows, rangeRows) = outOfLineRows.partition(_.length < 0)
```
```scala
// Case 1: Inline — return bytes directly without I/O
val inlineResults = inlineRows.map { ri =>
```
This is a no-op for now, right, since we are only targeting out-of-line blobs for milestone 1?
Right, but I think it's easy enough to add reader support in this task, so we might as well.
```scala
 */
private def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] = {
  // Group by file path
  val byFile = rows.groupBy(_.filePath)
```
So is this for the case where multiple record keys refer to the same out-of-line blob, and we want to avoid downloading the same file repeatedly?
Right. If there are consecutive or nearly consecutive ranges, it makes sense to just read the file once.
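The coalescing idea can be sketched in isolation. The following is a minimal illustration of merging sorted byte ranges, not the PR's `identifyConsecutiveRanges` implementation; `ByteRange`, `mergeRanges`, and the `gapTolerance` knob (how close "almost consecutive" ranges must be to coalesce) are all hypothetical names for this sketch:

```scala
// Sketch only: merge consecutive or nearly consecutive ranges within one file
// so each merged span requires a single read instead of one read per row.
final case class ByteRange(offset: Long, length: Long) {
  def end: Long = offset + length
}

def mergeRanges(ranges: Seq[ByteRange], gapTolerance: Long = 0L): Seq[ByteRange] = {
  val sorted = ranges.sortBy(_.offset)
  sorted.foldLeft(List.empty[ByteRange]) {
    // Next range starts inside (or just after) the current span: extend it.
    case (current :: rest, next) if next.offset <= current.end + gapTolerance =>
      current.copy(length = math.max(current.end, next.end) - current.offset) :: rest
    // Otherwise start a new span.
    case (acc, next) => next :: acc
  }.reverse
}
```

In the PR the rows are first grouped by file path (as in the `rows.groupBy(_.filePath)` excerpt above), so a merge like this would run per file.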
```diff
   normalized.contains("show indexes") ||
-  normalized.contains("refresh index")
+  normalized.contains("refresh index") ||
+  normalized.contains(" blob")
```
Would this match on something such as `SELECT * FROM blob_table`? If so, then I think we should match on the `read_blob` function instead. Or is there a more precise way to match here?
Yes, but this is not related to `read_blob`. This is just for triggering our custom parsing that adds blob types when creating a table.
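For what it's worth, a word-boundary check is one way to make such a keyword match more precise, if that ever becomes necessary. This is only an illustration of the review question, not the PR's code; `mentionsBlobKeyword` is a hypothetical helper:

```scala
// Sketch: match "blob" only as a standalone word, so a table named
// "blob_table" does not trigger the custom parsing path.
val blobKeyword = "(?i)(^|\\W)blob(\\W|$)".r

def mentionsBlobKeyword(normalized: String): Boolean =
  blobKeyword.findFirstIn(normalized).isDefined
```

The trade-off is that the substring check is cheaper and, as a superset match, only routes more queries through the custom parser rather than changing results.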
```scala
}

@Test
def testMultipleReadBlobInSameQuery(): Unit = {
```
@the-other-tim-brown Wanted to share something with you from when I tried modifying this test slightly. I used the following test, which alters offset2 for each row (to make the results more distinguishable):
```scala
def testMultipleReadBlobInSameQuery(): Unit = {
  val filePath1 = createTestFile(tempDir, "multi1.bin", 10000)
  val filePath2 = createTestFile(tempDir, "multi2.bin", 10000)
  // Use different offsets for file1 vs file2 so byte content is distinguishable.
  // createTestFile writes bytes as (i % 256), so:
  //   file1 offset=0   -> bytes (0, 1, 2, ...)
  //   file2 offset=500 -> bytes (244, 245, 246, ...)
  val df = sparkSession.createDataFrame(Seq(
    (1, filePath1, 0L, 50L, filePath2, 500L, 50L),
    (2, filePath1, 100L, 50L, filePath2, 600L, 50L)
  )).toDF("id", "external_path1", "offset1", "length1", "external_path2", "offset2", "length2")
    .withColumn("file_info1",
      blobStructCol("file_info1", col("external_path1"), col("offset1"), col("length1")))
    .withColumn("file_info2",
      blobStructCol("file_info2", col("external_path2"), col("offset2"), col("length2")))
    .select("id", "file_info1", "file_info2")
  df.createOrReplaceTempView("multi_table")

  // SQL with multiple read_blob calls on DIFFERENT blob columns
  val result = sparkSession.sql("""
    SELECT
      id,
      read_blob(file_info1) as data1,
      read_blob(file_info2) as data2
    FROM multi_table
    ORDER BY id
  """)
  val rows = result.collect()
  assertEquals(2, rows.length)

  // Row 1: data1 = file1 at offset 0, data2 = file2 at offset 500
  val data1_row1 = rows(0).getAs[Array[Byte]]("data1")
  val data2_row1 = rows(0).getAs[Array[Byte]]("data2")
  assertEquals(50, data1_row1.length)
  assertEquals(50, data2_row1.length)
  assertBytesContent(data1_row1, expectedOffset = 0)
  // This assertion will fail if ReadBlobRule only resolves the first blob column,
  // because data2 will contain file1's bytes (offset 0) instead of file2's bytes (offset 500)
  assertBytesContent(data2_row1, expectedOffset = 500)

  // Row 2: data1 = file1 at offset 100, data2 = file2 at offset 600
  val data1_row2 = rows(1).getAs[Array[Byte]]("data1")
  val data2_row2 = rows(1).getAs[Array[Byte]]("data2")
  assertBytesContent(data1_row2, expectedOffset = 100)
  assertBytesContent(data2_row2, expectedOffset = 600)
}
```
The issue I am seeing is that data2 will contain file_info1's bytes (offset 0) instead of file_info2's bytes (offset 500). Let me know if this is a correctness issue.
Good catch, updating the PR with the fix
```scala
var inputStream: SeekableDataInputStream = null
try {
  val path = new StoragePath(rowInfo.filePath)
  val fileLength = storage.getPathInfo(path).getLength.toInt
```
claude: Integer overflow — Long.toInt casts in readWholeFile and readAndSplitRange will silently wrap for files > 2GB, causing NegativeArraySizeException.
Any suggestions for how to fix it? The reader requires an int.
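One option is to fail fast with an explicit message before the cast, since the reader cannot address more than `Int.MaxValue` bytes anyway. A minimal sketch, where the helper name and error text are assumptions and not from the PR:

```scala
// Sketch: reject files larger than 2GB up front instead of letting the
// Long -> Int truncation silently wrap to a negative array length.
def safeLengthAsInt(fileLength: Long, filePath: String): Int = {
  if (fileLength > Int.MaxValue) {
    throw new UnsupportedOperationException(
      s"Blob file $filePath is $fileLength bytes, exceeding the 2GB limit of the reader")
  }
  // Math.toIntExact also throws (ArithmeticException) for any remaining
  // out-of-range value rather than truncating.
  Math.toIntExact(fileLength)
}
```

The same guard would apply wherever `.toInt` appears in `readWholeFile` and `readAndSplitRange` today.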
@the-other-tim-brown A more general comment: smaller PRs, maybe around 1k lines each, might make this work easier for both the implementer and the reviewers. Would it be possible to break this work into two chunks: first landing anything needed for reading out-of-line blobs (milestone 1), and then another PR for the remaining functionality?
```scala
    val blobStruct = contentField.dataType.asInstanceOf[StructType]
    assertEquals(BlobType(), blobStruct)
  }
}
```
@the-other-tim-brown I think I was also seeing issues when I added tests like this, where I use `read_blob` in a WHERE clause:
```scala
@Test
def testReadBlobInWhereClauseShouldFail(): Unit = {
  val filePath = createTestFile(tempDir, "where.bin", 10000)
  val df = sparkSession.createDataFrame(Seq(
    (1, filePath, 0L, 100L),
    (2, filePath, 100L, 100L),
    (3, filePath, 200L, 100L)
  )).toDF("id", "external_path", "offset", "length")
    .withColumn("file_info",
      blobStructCol("file_info", col("external_path"), col("offset"), col("length")))
    .select("id", "file_info")
  df.createOrReplaceTempView("where_table")

  // read_blob() in WHERE clause: ReadBlobRule only matches Project nodes,
  // so the ReadBlobExpression in the Filter node is never resolved.
  // Since ReadBlobExpression is Unevaluable, this should either:
  //   (a) fail with a clear error during analysis, or
  //   (b) work correctly
  // Currently it crashes at code generation with an opaque error.
  val ex = assertThrows(classOf[Exception], () => {
    sparkSession.sql("""
      SELECT id
      FROM where_table
      WHERE length(read_blob(file_info)) > 50
    """).collect()
  })

  // Demonstrates the bug: error message is unhelpful.
  // Should either be supported or produce a clear analysis error like:
  //   "read_blob() can only be used in SELECT projections, not in WHERE/HAVING/ORDER BY"
  assertTrue(
    ex.getMessage.contains("Unevaluable") || ex.getMessage.contains("cannot be evaluated"),
    s"Expected Unevaluable error but got: ${ex.getMessage}"
  )
}
```
I was hitting:

```
Cannot generate code for expression: read_blob(input[1, struct<type:string,data:binary,reference:struct<external_path:string,offset:bigint,length:bigint,managed:boolean>>, false]) ==>
```
Would this also happen on a HAVING query?
I'll see if this is easy to support. If not, we can move it to a follow-up. We'll likely want to optimize this to just use the length in the struct itself instead of reading the bytes from the files.
@rahil-c I'll pull the SQL support for creating a blob type and the test utils into an initial PR, which should be quicker to review.
I've created #18347 to pull out some of those changes.
Describe the issue this Pull Request addresses
Closes #18110
This PR adds support for establishing blob columns with Spark SQL and then reading those values back as bytes with a `read_blob` scalar function in Spark SQL. The underlying reader will batch reads where possible to improve performance when reading blobs from ranges within the same file.
Summary and Changelog
- `SparkAdapter` is updated to inject scalar functions (`read_blob`) and to inject planner strategies for translating the blob reading from a logical plan node into a physical plan that actually reads back the data.
- `BatchedBlobReader` performs the actual lookups for the blob content. It handles three cases: inline data, reading a subsection of a file, or reading the full file.
- `ReadBlobRule` takes the `ReadBlobExpression`s generated when parsing the query and translates them to `BatchedBlobRead` nodes. The node is handled by the `BatchedBlobReaderStrategy`, which converts it to the physical node `BatchedBlobReadExec`.
Provides the utilities for users to retrieve their out-of-line blob data.
Risk Level
Low, this is net new functionality.
Documentation Update
Contributor's checklist