
Conversation

@fangbo fangbo commented Sep 18, 2025

Related issue: #32

To add a column with backfilled data, two settings are needed (a complete sketch follows the list):

  1. Read _rowaddr and _fragid from the Lance dataset:
      spark.conf().set("spark.sql.lance.with_metadata", "true");
  2. Write in "overwrite" mode and set "backfill_columns" to specify the new columns to add:
      df2.write()
          .mode("overwrite")
          .option("backfill_columns", "new_col")
          .saveAsTable(catalogName + ".default." + tableName);
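
A minimal end-to-end sketch of the two steps above, assuming a SparkSession already configured with the Lance catalog; the catalog name ("lance"), table name, and column expression here are illustrative, not part of this PR:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class BackfillExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("backfill-example").getOrCreate();

    // Step 1: expose _rowaddr and _fragid when reading the Lance dataset
    spark.conf().set("spark.sql.lance.with_metadata", "true");
    Dataset<Row> df = spark.table("lance.default.add_column_table");

    // Step 2: compute the new column, then write it back in "overwrite" mode
    // with "backfill_columns" naming the column(s) to add
    Dataset<Row> df2 = df.withColumn("new_col", functions.expr("id * 100"));
    df2.write()
        .mode("overwrite")
        .option("backfill_columns", "new_col")
        .saveAsTable("lance.default.add_column_table");
  }
}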


ACTION NEEDED
Lance follows the Conventional Commits specification for release automation.

The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.

For details on the error please inspect the "PR Title Check" action.

@fangbo fangbo marked this pull request as draft September 18, 2025 03:58
@fangbo fangbo changed the title from "[WIP]feat: add support for add_column with backfill" to "feat: add support for add_column with backfill" Sep 18, 2025
@github-actions github-actions bot added the enhancement New feature or request label Sep 18, 2025
NamedReference segmentId = Expressions.column(LanceConstant.ROW_ADDRESS);
SortValue sortValue =
    new SortValue(segmentId, SortDirection.ASCENDING, NullOrdering.NULLS_FIRST);
return new SortValue[] {sortValue};


should both keys, _fragid and _rowaddr, be sorted here?

Contributor Author


The physical plan is:

== Physical Plan ==
CommandResult <empty>
   +- AppendData org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$1782/1752894940@6f7b8ae1, com.lancedb.lance.spark.write.AddColumnsWrite@6c8d8b60
      +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            *(2) Sort [_rowaddr#11L ASC NULLS FIRST], false, 0
            +- AQEShuffleRead coalesced
               +- ShuffleQueryStage 0
                  +- Exchange hashpartitioning(_fragid#12, 200), REPARTITION_BY_COL, [plan_id=84]
                     +- *(1) Project [_rowaddr#11L, _fragid#12, id#8, (id#8 * 100) AS new_col#39]
                        +- *(1) ColumnarToRow
                           +- BatchScan add_column_table[id#8, _rowaddr#11L, _fragid#12] class com.lancedb.lance.spark.read.LanceScan RuntimeFilters: []
         +- == Initial Plan ==
            Sort [_rowaddr#11L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(_fragid#12, 200), REPARTITION_BY_COL, [plan_id=68]
               +- Project [_rowaddr#11L, _fragid#12, id#8, (id#8 * 100) AS new_col#39]
                  +- BatchScan add_column_table[id#8, _rowaddr#11L, _fragid#12] class com.lancedb.lance.spark.read.LanceScan RuntimeFilters: []

The sort operator runs after the repartitioning, so I think it is enough to sort the data by _rowaddr only.
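
For context, a rough sketch of how the write side can request this layout (cluster by _fragid across tasks, sort by _rowaddr within each task) via Spark's DSv2 RequiresDistributionAndOrdering interface; this is illustrative and uses literal column names rather than the LanceConstant fields from the actual code:

import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;

abstract class AddColumnsWriteSketch implements RequiresDistributionAndOrdering {

  @Override
  public Distribution requiredDistribution() {
    // All rows of a fragment land in the same write task.
    NamedReference fragId = Expressions.column("_fragid");
    return Distributions.clustered(new NamedReference[] {fragId});
  }

  @Override
  public SortOrder[] requiredOrdering() {
    // Within a task, rows come back in the original fragment order.
    NamedReference rowAddr = Expressions.column("_rowaddr");
    return new SortOrder[] {Expressions.sort(rowAddr, SortDirection.ASCENDING)};
  }
}

With that contract in place, Spark inserts the Exchange on _fragid and the per-partition Sort on _rowaddr that appear in the plan above.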


Let's say the data is first repartitioned by _fragid, which guarantees that all rows from the same fragment end up in the same partition.
But if the sort doesn’t include _fragid, rows inside that partition may get interleaved? E.g., (1,0), (2,0), (1,1), (2,2)

Contributor Author


> But if the sort doesn’t include _fragid, rows inside that partition may get interleaved? E.g., (1,0), (2,0), (1,1), (2,2)

_rowaddr is a u64 composed of:

 frag_id (upper 32 bits) + row_index (lower 32 bits)

So I think rows will not get interleaved: every address in one fragment is numerically smaller than every address in the next fragment.
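
A small illustration of that layout (the helper class and values are hypothetical): because the fragment id occupies the high 32 bits, sorting by the full 64-bit _rowaddr already groups rows by fragment and orders them within each fragment.

public final class RowAddress {
  // Upper 32 bits: fragment id.
  public static long fragmentId(long rowAddr) {
    return rowAddr >>> 32;
  }

  // Lower 32 bits: row index within the fragment.
  public static long rowIndex(long rowAddr) {
    return rowAddr & 0xFFFFFFFFL;
  }

  public static void main(String[] args) {
    long addr = (2L << 32) | 5L; // fragment 2, row 5
    System.out.println(fragmentId(addr)); // prints 2
    System.out.println(rowIndex(addr));   // prints 5
  }
}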

Dataset<Row> df2 = result.withColumn("new_col", functions.expr("id * 100"));

// Write back with backfill option
df2.write()
Collaborator


I am a bit hesitant about this user experience... we are clearly inventing something new here, which is great, but overloading the overwrite mode feels wrong to me. So far the "norm" is that we invent something in SQL, and eventually the DataFrame operations catch up. For example, there was MERGE INTO in SQL, and in Spark 4.0 there is now a merge DataFrame operation. So to me a SQL extension feels "official", whereas using write overwrite feels much more like a hack.

df2.createOrReplaceTempView("backfill_data")
spark.sql("ALTER TABLE ADD COLUMNS col1, col2 AS SELECT * FROM backfill_data")

I guess that brings the complexity of needing to add the SQL extensions and related parsers, but we probably need that anyway for features like compaction.

Collaborator


Another thing is that, compared to this, there is technically also the "add column using a function" experience, which would be harder to express as a DataFrame operation, but we could invent SQL like:

spark.sql("ALTER TABLE ADD COLUMNS col1 AS my_udf(col2)")


I agree that a custom DSL/SQL will deliver a better user experience. For now, changing the semantics of overwrite/merge-into seems okay to me.

For the UDF scenario, if the computed column only depends on the current dataset (and is 1:1 row-aligned), the UDF approach is more intuitive. It's worth noting that there are also cases where we need to combine external data to compute new columns, e.g., joins with external tables or lookup services, where the DataFrame style offers greater expressiveness. A sketch of that case is shown below.
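
A hypothetical sketch of that external-data case, reusing the backfill write options from this PR; the table names, join key, and new column ("score") are invented for illustration:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BackfillFromExternalTable {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("backfill-join").getOrCreate();

    // Expose _rowaddr/_fragid so the backfill write can align rows with the original table.
    spark.conf().set("spark.sql.lance.with_metadata", "true");

    Dataset<Row> base = spark.table("lance.default.users");     // Lance table
    Dataset<Row> profiles = spark.table("dim.user_profiles");   // external lookup table

    // Compute the new column by joining, keeping the row-address metadata and the new value.
    Dataset<Row> withScore = base
        .join(profiles, base.col("user_id").equalTo(profiles.col("user_id")), "left")
        .select(base.col("_rowaddr"), base.col("_fragid"), profiles.col("score"));

    withScore.write()
        .mode("overwrite")
        .option("backfill_columns", "score")
        .saveAsTable("lance.default.users");
  }
}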

Contributor Author


I greatly appreciate your suggestion. I also think using SQL extensions feels more official.

}

@Override
public SortOrder[] requiredOrdering() {
Collaborator


shouldn't the row address naturally be ascending within a fragment? I don't think we need to force ordering it?

Contributor Author


> shouldn't the row address naturally be ascending within a fragment? I don't think we need to force ordering it?

The data read from the original table may be processed in a distributed manner, which can change the order of the records, so it is necessary to explicitly order it by _rowaddr.

@fangbo fangbo force-pushed the add-column-backfill branch 2 times, most recently from 555bd7f to 33a412c Compare September 30, 2025 10:00
@fangbo fangbo force-pushed the add-column-backfill branch 2 times, most recently from f2d0827 to fb84266 Compare October 9, 2025 06:30
@fangbo fangbo marked this pull request as ready for review October 9, 2025 09:39

fangbo commented Oct 10, 2025

@jackye1995 @qidian99 This PR is ready. Could you please review it? Thank you.

@fangbo fangbo force-pushed the add-column-backfill branch from 9553e73 to d65a315 Compare October 11, 2025 02:17