feat: add support for add_column with backfill #91
base: main
Conversation
ACTION NEEDED The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification. For details on the error please inspect the "PR Title Check" action.
NamedReference segmentId = Expressions.column(LanceConstant.ROW_ADDRESS);
SortValue sortValue =
    new SortValue(segmentId, SortDirection.ASCENDING, NullOrdering.NULLS_FIRST);
return new SortValue[] {sortValue};
Should both keys, _fragid and _rowaddr, be included in the sort here?
The physical plan is:
== Physical Plan ==
CommandResult <empty>
+- AppendData org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$1782/1752894940@6f7b8ae1, com.lancedb.lance.spark.write.AddColumnsWrite@6c8d8b60
+- AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
*(2) Sort [_rowaddr#11L ASC NULLS FIRST], false, 0
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 0
+- Exchange hashpartitioning(_fragid#12, 200), REPARTITION_BY_COL, [plan_id=84]
+- *(1) Project [_rowaddr#11L, _fragid#12, id#8, (id#8 * 100) AS new_col#39]
+- *(1) ColumnarToRow
+- BatchScan add_column_table[id#8, _rowaddr#11L, _fragid#12] class com.lancedb.lance.spark.read.LanceScan RuntimeFilters: []
+- == Initial Plan ==
Sort [_rowaddr#11L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(_fragid#12, 200), REPARTITION_BY_COL, [plan_id=68]
+- Project [_rowaddr#11L, _fragid#12, id#8, (id#8 * 100) AS new_col#39]
+- BatchScan add_column_table[id#8, _rowaddr#11L, _fragid#12] class com.lancedb.lance.spark.read.LanceScan RuntimeFilters: []
The sort operator runs after the distribution (the repartition by _fragid), so I think we only need to sort the data by _rowaddr.
Let's say the data is first repartitioned by _fragid, which guarantees that all rows from the same fragment end up in the same partition.
But if the sort doesn't include _fragid, couldn't rows inside that partition get interleaved across fragments? E.g., (frag_id, row_index) pairs like (1,0), (2,0), (1,1), (2,2).
But if the sort doesn't include _fragid, couldn't rows inside that partition get interleaved across fragments? E.g., (frag_id, row_index) pairs like (1,0), (2,0), (1,1), (2,2).
_rowaddr is a u64 composed as:
frag_id (upper 32 bits) + row_index (lower 32 bits)
So sorting by _rowaddr alone already keeps rows from the same fragment together; I think rows will not get interleaved.
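A minimal sketch of that layout (the helper names below are illustrative, not part of the connector; it only assumes frag_id occupies the upper 32 bits as described above):

// Illustrative only: compose/decompose a 64-bit row address from (fragId, rowIndex).
// Because fragId sits in the upper 32 bits, sorting by the composed value orders
// by fragment first and by row index within each fragment.
static long rowAddress(int fragId, int rowIndex) {
  return ((long) fragId << 32) | (rowIndex & 0xFFFFFFFFL);
}

static int fragIdOf(long rowAddr) { return (int) (rowAddr >>> 32); }
static int rowIndexOf(long rowAddr) { return (int) rowAddr; }

// rowAddress(1, 0) < rowAddress(1, 1) < rowAddress(2, 0), so rows from
// different fragments cannot interleave once sorted by _rowaddr.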
Dataset<Row> df2 = result.withColumn("new_col", functions.expr("id * 100"));

// Write back with backfill option
df2.write()
I am a bit hesitant about this user experience... we are clearly inventing something new here, which is great, but overloading the overwrite mode feels wrong to me. So far the "norm" is that something is invented in SQL first, and DataFrame operations eventually catch up. For example, there was MERGE INTO in SQL, and Spark 4.0 now has a merge DataFrame operation. So to me a SQL extension feels "official", whereas reusing write overwrite feels much more like a hack.
df2.createOrReplaceTempView("backfill_data")
spark.sql("ALTER TABLE ADD COLUMNS col1, col2 AS SELECT * FROM backfill_data")
I guess that brings the complexity of needing to add the SQL extensions and related parsers, but we probably need that anyway for features like compaction.
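For reference, wiring up such extensions is mostly a matter of pointing Spark's standard spark.sql.extensions config at an extensions class; the class name below is hypothetical, not something this PR defines:

// Hypothetical extensions class name; spark.sql.extensions itself is a standard Spark config.
SparkSession spark = SparkSession.builder()
    .appName("lance-add-column-backfill")
    .config("spark.sql.extensions",
        "com.lancedb.lance.spark.extensions.LanceSparkSessionExtensions")
    .getOrCreate();

// With a parser injected by that class, the proposed statement could then be issued as plain SQL:
// spark.sql("ALTER TABLE t ADD COLUMNS col1, col2 AS SELECT * FROM backfill_data");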
Another thing: compared to this, there is technically also the "add column using a function" experience, which would be harder to express with a DataFrame, but we could invent SQL like:
spark.sql("ALTER TABLE ADD COLUMNS col1 AS my_udf(col2)")
I agree that a custom DSL/SQL will deliver a better user experience. For now, changing the semantics of overwrite/merge-into seems okay to me.
For the UDF scenario, if the computed column depends only on the current dataset (and is 1:1 row-aligned), the UDF approach is more intuitive. It's worth noting that there are also cases where we need to combine external data to compute new columns, e.g. joins with external tables or lookup services, where the DataFrame style offers greater expressiveness.
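A rough sketch of that external-data case (paths, table names, and columns are made up for illustration; the exact read call for a Lance table depends on the connector):

// Illustrative only: derive a new column by joining the source table with an
// external lookup table, which a row-local UDF alone could not express.
Dataset<Row> source = spark.read().format("lance").load("/path/to/lance_table");
Dataset<Row> lookup = spark.read().parquet("/path/to/lookup_table");

Dataset<Row> withNewCol = source
    .join(lookup, source.col("id").equalTo(lookup.col("id")), "left")
    .select(source.col("*"), lookup.col("category").alias("new_col"));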
I greatly appreciate your suggestion. I also think using SQL extensions feels more official.
}

@Override
public SortOrder[] requiredOrdering() {
shouldn't the row address naturally be ascending within a fragment? I don't think we need to force ordering it?
shouldn't the row address naturally be ascending within a fragment? I don't think we need to force ordering it?
The data in the original table may be processed in a distributed manner, which can change the order of the records, so it is necessary to force ordering by _rowaddr.
@jackye1995 @qidian99 This PR is ready. Could you please review it? Thank you.
Related issue: #32
To add a column with backfill data, some configs must be set.
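A minimal sketch of the intended write path, with the caveat that the option key below is a placeholder assumption; the actual configuration keys are the ones defined in this PR's diff:

// Sketch only: "backfill" is a hypothetical option name, not necessarily the
// key this PR introduces; mode("overwrite") reflects the reuse of overwrite
// mode discussed above.
df2.write()
    .format("lance")
    .option("backfill", "true")
    .mode("overwrite")
    .save("/path/to/lance_table");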