From 1a00003a4522f3e39b3bcf3443e5f9c5d96580bc Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Wed, 25 Jun 2025 10:19:06 -0700 Subject: [PATCH 1/2] drop on full refresh --- .../spark/sql/pipelines/graph/DatasetManager.scala | 10 ++++++---- .../sql/pipelines/graph/MaterializeTablesSuite.scala | 10 ++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala index 79c5ef36b0bc4..20e3a4a46cd39 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala @@ -178,12 +178,14 @@ object DatasetManager extends Logging { } // Wipe the data if we need to - if ((isFullRefresh || !table.isStreamingTableOpt.get) && existingTableOpt.isDefined) { - context.spark.sql(s"TRUNCATE TABLE ${table.identifier.quotedString}") + val dropTable = (isFullRefresh || !table.isStreamingTableOpt.get) && existingTableOpt.isDefined + if (dropTable) { + catalog.dropTable(identifier) +// context.spark.sql(s"DROP TABLE ${table.identifier.quotedString}") } // Alter the table if we need to - if (existingTableOpt.isDefined) { + if (existingTableOpt.isDefined && !dropTable) { val existingSchema = existingTableOpt.get.schema() val targetSchema = if (table.isStreamingTableOpt.get && !isFullRefresh) { @@ -198,7 +200,7 @@ object DatasetManager extends Logging { } // Create the table if we need to - if (existingTableOpt.isEmpty) { + if (dropTable || existingTableOpt.isEmpty) { catalog.createTable( identifier, new TableInfo.Builder() diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala index 2587f503222e8..8cf7f613db440 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala @@ -446,8 +446,9 @@ class MaterializeTablesSuite extends BaseCoreExecutionTest { val table2 = catalog.loadTable(identifier) assert( - table2.columns() sameElements CatalogV2Util - .structTypeToV2Columns(new StructType().add("y", IntegerType).add("x", BooleanType)) + table2.columns().toSet == CatalogV2Util + .structTypeToV2Columns(new StructType().add("x", BooleanType).add("y", IntegerType)) + .toSet ) assert(table2.partitioning().toSeq == Seq(Expressions.identity("x"))) @@ -468,8 +469,9 @@ class MaterializeTablesSuite extends BaseCoreExecutionTest { val table3 = catalog.loadTable(identifier) assert( - table3.columns() sameElements CatalogV2Util - .structTypeToV2Columns(new StructType().add("y", IntegerType).add("x", BooleanType)) + table3.columns().toSet == CatalogV2Util + .structTypeToV2Columns(new StructType().add("x", BooleanType).add("y", IntegerType)) + .toSet ) assert(table3.partitioning().toSeq == Seq(Expressions.identity("x"))) } From 60f79181b3684a7defd83376c390bf505fce6746 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Wed, 25 Jun 2025 21:31:47 -0700 Subject: [PATCH 2/2] remove commented-out code --- .../org/apache/spark/sql/pipelines/graph/DatasetManager.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala index 20e3a4a46cd39..dcc4aa8f4e31c 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala @@ -181,7 +181,6 @@ object DatasetManager extends Logging { val dropTable = (isFullRefresh || !table.isStreamingTableOpt.get) && existingTableOpt.isDefined if (dropTable) { catalog.dropTable(identifier) -// context.spark.sql(s"DROP TABLE ${table.identifier.quotedString}") } // Alter the table if we need to