[SDP] [SPARK-52576] Drop/recreate on full refresh and MV update #51280
Conversation
Makes sense; can change if we add support for DROP COLUMN for HMS in the V2SessionCatalog.
val dropTable = (isFullRefresh || !table.isStreamingTableOpt.get) && existingTableOpt.isDefined
if (dropTable) {
  catalog.dropTable(identifier)
  // context.spark.sql(s"DROP TABLE ${table.identifier.quotedString}")
nit: remove? Optionally add comment about why not truncate/alter?
val dropTable = (isFullRefresh || !table.isStreamingTableOpt.get) && existingTableOpt.isDefined
if (dropTable) {
  catalog.dropTable(identifier)
  // context.spark.sql(s"DROP TABLE ${table.identifier.quotedString}")
shall we remove this line?
@@ -446,8 +446,9 @@ class MaterializeTablesSuite extends BaseCoreExecutionTest {
     val table2 = catalog.loadTable(identifier)
     assert(
-      table2.columns() sameElements CatalogV2Util
+      table2.columns().toSet == CatalogV2Util
         .structTypeToV2Columns(new StructType().add("y", IntegerType).add("x", BooleanType))
why do we need this change?
The ordering of columns does not appear to be deterministic (at least across different catalog implementations). Is that unexpected?
For a table, the column order matters. I think we should keep the test as it is and fix the issues we found.
Are we able to get some help with fixing this? What I'm observing is that, with Hive, when I create a table using the following:
catalog.createTable(
  identifier,
  new TableInfo.Builder()
    .withProperties(mergedProperties.asJava)
    .withColumns(CatalogV2Util.structTypeToV2Columns(outputSchema))
    .withPartitions(partitioning.toArray)
    .build()
)
and then later fetch the columns using
  catalog.loadTable(identifier).columns()
the columns are returned in a different order than they appear in outputSchema. This happens only with Hive, not the default catalog.
Strange, I can take a look.
I ran the test case in HiveDDLSuite a few times and can't reproduce it.
val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]
withTable("t1") {
  val identifier = Identifier.of(Array("default"), "t1")
  val outputSchema = new StructType()
    .add("a", IntegerType, true, "comment1")
    .add("b", IntegerType, true, "comment2")
    .add("c", IntegerType, true, "comment3")
    .add("d", IntegerType, true, "comment4")
  catalog.createTable(
    identifier,
    new TableInfo.Builder()
      .withProperties(Map.empty.asJava)
      .withColumns(CatalogV2Util.structTypeToV2Columns(outputSchema))
      .withPartitions(Array.empty)
      .build()
  )
  val cols = catalog.loadTable(identifier).columns()
  assert(cols.length == 4)
  assert(cols(0).name() == "a")
  assert(cols(1).name() == "b")
  assert(cols(2).name() == "c")
  assert(cols(3).name() == "d")
}
Is it reproducible with this pr itself?
I think the issue is that the Hive catalog always puts partition columns at the end of the table schema. For example, a, b, c PARTITIONED BY b will end up with column order a, c, b.
Can we tweak this test to avoid this hive limitation?
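A small self-contained sketch of the reordering described above (plain Scala, no Spark; the object and method names are illustrative, not from the PR):

```scala
object HiveColumnOrderSketch {
  // Hive-style reordering: data columns keep their relative order,
  // partition columns are moved to the end of the schema.
  def hiveColumnOrder(declared: Seq[String], partitionCols: Set[String]): Seq[String] =
    declared.filterNot(partitionCols) ++ declared.filter(partitionCols)

  def main(args: Array[String]): Unit = {
    // Declared as a, b, c with b as the partition column...
    val stored = hiveColumnOrder(Seq("a", "b", "c"), Set("b"))
    println(stored.mkString(", ")) // a, c, b

    // ...so an order-sensitive comparison against the declared order fails,
    // while an order-insensitive (set) comparison passes.
    assert(stored != Seq("a", "b", "c"))
    assert(stored.toSet == Set("a", "b", "c"))
  }
}
```

This is why a test asserting the exact declared order passes against the default catalog but fails against Hive when a partition column is not last.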
So it sounds like the order of the columns returned here is not defined in a catalog-agnostic way? Objections to using non-ordered comparison here in that case?
An alternative could be for MaterializeTablesSuite to have a column-order method that the upcoming Hive-specific subclass overrides. Though I'm not sure the added complexity is worth it, given that column ordering is tangential to the main behaviors we're trying to test in this case.
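A hypothetical sketch of that override-based alternative (trait and method names are made up for illustration, not from the PR):

```scala
// Base suite exposes a hook describing the column order a catalog is
// expected to return for a declared schema.
trait ColumnOrderHook {
  // Default catalogs preserve the declared order.
  def expectedColumnOrder(declared: Seq[String], partitionCols: Set[String]): Seq[String] =
    declared
}

// A Hive-specific suite would override the hook to account for Hive
// moving partition columns to the end.
trait HiveColumnOrderHook extends ColumnOrderHook {
  override def expectedColumnOrder(declared: Seq[String], partitionCols: Set[String]): Seq[String] =
    declared.filterNot(partitionCols) ++ declared.filter(partitionCols)
}
```

The test would then assert against expectedColumnOrder(...) instead of the declared schema directly, keeping order-sensitive assertions while tolerating Hive's behavior.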
Nice find. Does the suite run against both Hive and non-Hive catalogs? I'm OK either way; maybe add a short comment in the test to explain.
What changes were proposed in this pull request?
Some pipeline runs result in wiping out and replacing all the data for a table: full refreshes, and updates to materialized views.
In the current implementation, this "wipe out and replace" is implemented by truncating the existing table and altering its schema to match the new output.
The reason that we originally wanted to truncate + alter instead of drop/recreate is that dropping has some undesirable effects, e.g. it interrupts readers of the table and wipes away things like ACLs.
However, we discovered that not all catalogs support dropping columns (e.g. Hive does not), and there’s no way to tell whether a catalog supports dropping columns or not. So this PR changes the implementation to drop/recreate the table instead of truncate/alter.
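A minimal sketch of the resulting decision, distilled from the diff in the review thread above (the catalog plumbing is omitted; the boolean parameters stand in for the PR's isFullRefresh flag, table.isStreamingTableOpt, and existingTableOpt):

```scala
object DropRecreateSketch {
  // Mirrors the condition from the diff: drop (and later recreate) the table
  // when this is a full refresh, or when the table is not a streaming table
  // (i.e. it is a materialized view), but only if the table already exists.
  def shouldDropTable(
      isFullRefresh: Boolean,
      isStreamingTable: Boolean,
      tableExists: Boolean): Boolean =
    (isFullRefresh || !isStreamingTable) && tableExists

  def main(args: Array[String]): Unit = {
    // Full refresh of an existing streaming table: drop.
    assert(shouldDropTable(isFullRefresh = true, isStreamingTable = true, tableExists = true))
    // Existing materialized view (non-streaming): drop on every update.
    assert(shouldDropTable(isFullRefresh = false, isStreamingTable = false, tableExists = true))
    // Incremental update of a streaming table: keep it.
    assert(!shouldDropTable(isFullRefresh = false, isStreamingTable = true, tableExists = true))
    // Nothing to drop if the table does not exist yet.
    assert(!shouldDropTable(isFullRefresh = true, isStreamingTable = true, tableExists = false))
  }
}
```

Dropping unconditionally sidesteps the problem that some catalogs (e.g. Hive) cannot drop columns via ALTER, at the cost of the side effects noted above.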
Why are the changes needed?
See section above.
Does this PR introduce any user-facing change?
Yes, see section above. No releases contained the old behavior.
How was this patch tested?
Was this patch authored or co-authored using generative AI tooling?
No