[SDP] [SPARK-52576] Drop/recreate on full refresh and MV update #51280


Open
wants to merge 2 commits into master

Conversation

sryza
Contributor

@sryza sryza commented Jun 25, 2025

What changes were proposed in this pull request?

Some pipeline runs result in wiping out and replacing all the data for a table:

  • Every run of a materialized view
  • Runs of streaming tables that have the "full refresh" flag

In the current implementation, this "wipe out and replace" is implemented by:

  • Truncating the table
  • Altering the table to drop/update/add columns that don't match the columns in the DataFrame for the current run

The reason we originally wanted truncate + alter instead of drop/recreate is that dropping a table has undesirable side effects: it interrupts readers of the table and wipes away metadata like ACLs.

However, we discovered that not all catalogs support dropping columns (e.g. Hive does not), and there’s no way to tell whether a catalog supports dropping columns or not. So this PR changes the implementation to drop/recreate the table instead of truncate/alter.
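The drop/recreate flow described above can be sketched roughly as follows (a sketch, not the exact PR code; `isStreamingTable` and `tableInfo` are stand-in names):

```scala
// Full refresh of a streaming table, or any run of a materialized view,
// replaces the table wholesale: drop it if it exists, then recreate it
// from the schema of the current run's DataFrame.
val replaceTable = (isFullRefresh || !isStreamingTable) && existingTableOpt.isDefined
if (replaceTable) {
  catalog.dropTable(identifier)
}
catalog.createTable(identifier, tableInfo) // recreated with the new columns
```

Note that this trades away the benefits of truncate/alter (uninterrupted readers, preserved ACLs) for compatibility with catalogs, like Hive, that cannot drop columns.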

Why are the changes needed?

See section above.

Does this PR introduce any user-facing change?

Yes, see section above. No releases contained the old behavior.

How was this patch tested?

  • Tests in MaterializeTablesSuite
  • Ran the tests in MaterializeTablesSuite with Hive instead of the default catalog

Was this patch authored or co-authored using generative AI tooling?

No

@sryza sryza requested review from gengliangwang and cloud-fan June 25, 2025 17:51
@github-actions github-actions bot added the SQL label Jun 25, 2025
@sryza sryza changed the title [SDP] Drop/recreate on full refresh and MV update [SDP] [SPARK-52576] Drop/recreate on full refresh and MV update Jun 25, 2025
Contributor

@szehon-ho szehon-ho left a comment


Makes sense; we can change this back if we add support for dropping columns for HMS in the V2SessionCatalog.

val dropTable = (isFullRefresh || !table.isStreamingTableOpt.get) && existingTableOpt.isDefined
if (dropTable) {
  catalog.dropTable(identifier)
  // context.spark.sql(s"DROP TABLE ${table.identifier.quotedString}")
Contributor

@szehon-ho szehon-ho Jun 25, 2025


nit: remove? Optionally add comment about why not truncate/alter?

val dropTable = (isFullRefresh || !table.isStreamingTableOpt.get) && existingTableOpt.isDefined
if (dropTable) {
  catalog.dropTable(identifier)
  // context.spark.sql(s"DROP TABLE ${table.identifier.quotedString}")
Member


shall we remove this line?

@@ -446,8 +446,9 @@ class MaterializeTablesSuite extends BaseCoreExecutionTest {

      val table2 = catalog.loadTable(identifier)
      assert(
-       table2.columns() sameElements CatalogV2Util
-         .structTypeToV2Columns(new StructType().add("y", IntegerType).add("x", BooleanType))
+       table2.columns().toSet == CatalogV2Util
Member


why do we need this change?

Contributor Author


The ordering of columns does not appear to be deterministic (at least across different catalog implementations). Is that unexpected?

Contributor


For a table, the column order matters. I think we should keep the test as it is and fix the issues we found.

Contributor Author

@sryza sryza Jun 26, 2025


Are we able to get some help with fixing this? What I'm observing is that, with Hive, when I create a table using the following:

      catalog.createTable(
        identifier,
        new TableInfo.Builder()
          .withProperties(mergedProperties.asJava)
          .withColumns(CatalogV2Util.structTypeToV2Columns(outputSchema))
          .withPartitions(partitioning.toArray)
          .build()
      )

and then later fetch the columns using

catalog.loadTable(identifier).columns()

the columns are returned in a different order than they appear in outputSchema.

This happens only with Hive, not the default catalog.

Contributor

@szehon-ho szehon-ho Jun 26, 2025


Strange, I can take a look.

I ran the test case below in HiveDDLSuite a few times and can't reproduce it.

    val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]
    withTable("t1") {
      val identifier = Identifier.of(Array("default"), "t1")
      val outputSchema = new StructType()
        .add("a", IntegerType, true, "comment1")
        .add("b", IntegerType, true, "comment2")
        .add("c", IntegerType, true, "comment3")
        .add("d", IntegerType, true, "comment4")
      catalog.createTable(
        identifier,
        new TableInfo.Builder()
          .withProperties(Map.empty.asJava)
          .withColumns(CatalogV2Util.structTypeToV2Columns(outputSchema))
          .withPartitions(Array.empty)
          .build()
      )
      val cols = catalog.loadTable(identifier).columns()
      assert(cols.length == 4)
      assert(cols(0).name() == "a")
      assert(cols(1).name() == "b")
      assert(cols(2).name() == "c")
      assert(cols(3).name() == "d")
    }

Is it reproducible with this PR itself?

Contributor


I think the issue is that the Hive catalog always puts partition columns at the end of the table schema. For example, columns a, b, c with PARTITIONED BY (b) will end up with column order a, c, b.

Can we tweak this test to avoid this Hive limitation?
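Echoing the comment above, the reordering can be illustrated with a sketch (the table name `t` and exact DDL here are illustrative assumptions, not from the PR):

```scala
// Per the observation above: with the Hive catalog, partition columns
// are moved to the end of the table schema reported by the catalog.
spark.sql("CREATE TABLE t (a INT, b INT, c INT) PARTITIONED BY (b)")
// loadTable(...).columns() may then report the order a, c, b
// rather than the declared order a, b, c.
```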

Contributor Author

@sryza sryza Jun 27, 2025


So it sounds like the order of the columns returned here is not defined in a catalog-agnostic way? Objections to using non-ordered comparison here in that case?

An alternative could be for MaterializeTablesSuite to have a column-order method that the upcoming Hive-specific subclass overrides. Though not sure the added complexity is worth it given that column ordering is tangential to the main behaviors we're trying to test in this case.
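A non-ordered comparison along these lines could look like the following (a sketch based on the suite's existing assertion; not verified against the final diff):

```scala
val expectedCols = CatalogV2Util.structTypeToV2Columns(
  new StructType().add("y", IntegerType).add("x", BooleanType))
// Compare as sets so that catalog-specific column ordering (e.g. Hive
// moving partition columns to the end) does not fail the test.
assert(table2.columns().toSet == expectedCols.toSet)
```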

Contributor


Nice find. Does the suite run against both Hive and non-Hive catalogs? I'm OK either way; maybe add a short comment in the test to explain.

@sryza sryza requested review from gengliangwang and szehon-ho June 26, 2025 15:11