From 03714e3ec27639e316c2eae18d2df62de4338ef9 Mon Sep 17 00:00:00 2001
From: Yun Zou
Date: Wed, 30 Apr 2025 12:19:22 -0700
Subject: [PATCH 1/3] add integration tests

---
 .../spark/v3.5/integration/build.gradle.kts   |  11 +-
 .../spark/quarkus/it/SparkDeltaIT.java        | 265 ++++++++++++++++++
 .../polaris/spark/quarkus/it/SparkIT.java     | 108 ++++++-
 .../quarkus/it/SparkIntegrationBase.java      |  48 ++--
 .../src/intTest/resources/logback.xml         |  32 +++
 .../apache/polaris/spark/SparkCatalog.java    |  19 +-
 .../spark/utils/PolarisCatalogUtils.java      |  10 +
 7 files changed, 453 insertions(+), 40 deletions(-)
 create mode 100644 plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkDeltaIT.java
 create mode 100644 plugins/spark/v3.5/integration/src/intTest/resources/logback.xml

diff --git a/plugins/spark/v3.5/integration/build.gradle.kts b/plugins/spark/v3.5/integration/build.gradle.kts
index 26841f3562..90a39b08c7 100644
--- a/plugins/spark/v3.5/integration/build.gradle.kts
+++ b/plugins/spark/v3.5/integration/build.gradle.kts
@@ -38,9 +38,9 @@ val scalaLibraryVersion =
 dependencies {
   // must be enforced to get a consistent and validated set of dependencies
   implementation(enforcedPlatform(libs.quarkus.bom)) {
-    exclude(group = "org.antlr", module = "antlr4-runtime")
-    exclude(group = "org.scala-lang", module = "scala-library")
-    exclude(group = "org.scala-lang", module = "scala-reflect")
+    exclude("org.antlr", "antlr4-runtime")
+    exclude("org.scala-lang", "scala-library")
+    exclude("org.scala-lang", "scala-reflect")
   }
 
   implementation(project(":polaris-quarkus-service"))
@@ -51,10 +51,13 @@ dependencies {
   testImplementation("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") {
     // exclude log4j dependencies
     exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
-    exclude("org.apache.logging.log4j", "log4j-api")
     exclude("org.apache.logging.log4j", "log4j-1.2-api")
+    exclude("org.apache.logging.log4j", "log4j-core")
     exclude("org.slf4j", "jul-to-slf4j")
   }
+  testRuntimeOnly("org.apache.logging.log4j:log4j-core:2.24.3")
+  testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j2-impl:2.24.3")
+  testImplementation("io.delta:delta-spark_${scalaVersion}:3.3.1")
 
   testImplementation(platform(libs.jackson.bom))
   testImplementation("com.fasterxml.jackson.core:jackson-annotations")
diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkDeltaIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkDeltaIT.java
new file mode 100644
index 0000000000..070f5f79a7
--- /dev/null
+++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkDeltaIT.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.quarkus.it;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.polaris.service.it.env.IntegrationTestsHelper;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.delta.DeltaAnalysisException;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+@QuarkusIntegrationTest
+public class SparkDeltaIT extends SparkIntegrationBase {
+  private String defaultNs;
+  private String tableRootDir;
+
+  private String getTableLocation(String tableName) {
+    return String.format("%s/%s", tableRootDir, tableName);
+  }
+
+  private String getTableNameWithRandomSuffix() {
+    return generateName("deltatb");
+  }
+
+  @BeforeEach
+  public void createDefaultResources(@TempDir Path tempDir) {
+    spark.sparkContext().setLogLevel("WARN");
+    defaultNs = generateName("delta");
+    // create a default namespace
+    sql("CREATE NAMESPACE %s", defaultNs);
+    sql("USE NAMESPACE %s", defaultNs);
+    tableRootDir =
+        IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve(defaultNs).getPath();
+  }
+
+  @AfterEach
+  public void cleanupDeltaData() {
+    // clean up delta data
+    File dirToDelete = new File(tableRootDir);
+    FileUtils.deleteQuietly(dirToDelete);
+    sql("DROP NAMESPACE %s", defaultNs);
+  }
+
+  @Test
+  public void testBasicTableOperations() {
+    // create a regular delta table
+    String deltatb1 = "deltatb1";
+    sql(
+        "CREATE TABLE %s (id INT, name STRING) USING DELTA LOCATION '%s'",
+        deltatb1, getTableLocation(deltatb1));
+    sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", deltatb1);
+    List<Object[]> results = sql("SELECT * FROM %s WHERE id > 1 ORDER BY id DESC", deltatb1);
+    assertThat(results.size()).isEqualTo(1);
+    assertThat(results.get(0)).isEqualTo(new Object[] {2, "bob"});
+
+    // create a partitioned delta table
+    String deltatb2 = "deltatb2";
+    sql(
+        "CREATE TABLE %s (name String, age INT, country STRING) USING DELTA PARTITIONED BY (country) LOCATION '%s'",
+        deltatb2, getTableLocation(deltatb2));
+    sql(
+        "INSERT INTO %s VALUES ('anna', 10, 'US'), ('james', 32, 'US'), ('yan', 16, 'CHINA')",
+        deltatb2);
+    results = sql("SELECT name, country FROM %s ORDER BY age", deltatb2);
+    assertThat(results.size()).isEqualTo(3);
+    assertThat(results.get(0)).isEqualTo(new Object[] {"anna", "US"});
+    assertThat(results.get(1)).isEqualTo(new Object[] {"yan", "CHINA"});
+    assertThat(results.get(2)).isEqualTo(new Object[] {"james", "US"});
+
+    // verify the partition dirs are created
+    List<String> subDirs = listDirs(getTableLocation(deltatb2));
+    assertThat(subDirs).contains("_delta_log", "country=CHINA", "country=US");
+
+    // test listTables
+    List<Object[]> tables = sql("SHOW TABLES");
+    assertThat(tables.size()).isEqualTo(2);
+    assertThat(tables)
+        .contains(
+            new Object[] {defaultNs, deltatb1, false}, new Object[] {defaultNs, deltatb2, false});
+
+    sql("DROP TABLE %s", deltatb1);
+    sql("DROP TABLE %s", deltatb2);
+    tables = sql("SHOW TABLES");
+    assertThat(tables.size()).isEqualTo(0);
+  }
+
+  @Test
+  public void testAlterOperations() {
+    String deltatb = getTableNameWithRandomSuffix();
+    sql(
+        "CREATE TABLE %s (id INT, name STRING) USING DELTA LOCATION '%s'",
+        deltatb, getTableLocation(deltatb));
+    sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", deltatb);
+
+    // test altering columns
+    // add two new columns to the table
+    sql("ALTER TABLE %s ADD COLUMNS (city STRING, age INT)", deltatb);
+    // add one more row to the table
+    sql("INSERT INTO %s VALUES (3, 'john', 'SFO', 20)", deltatb);
+    // verify the table now has 4 columns and returns the correct result
+    List<Object[]> results = sql("SELECT * FROM %s ORDER BY id", deltatb);
+    assertThat(results.size()).isEqualTo(3);
+    assertThat(results).contains(new Object[] {1, "anna", null, null});
+    assertThat(results).contains(new Object[] {2, "bob", null, null});
+    assertThat(results).contains(new Object[] {3, "john", "SFO", 20});
+
+    // dropping and renaming columns requires setting the delta.columnMapping property
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')", deltatb);
+    // drop column age
+    sql("ALTER TABLE %s DROP COLUMN age", deltatb);
+    // verify the table now has 3 columns and returns the correct result
+    results = sql("SELECT * FROM %s ORDER BY id", deltatb);
+    assertThat(results.size()).isEqualTo(3);
+    assertThat(results).contains(new Object[] {1, "anna", null});
+    assertThat(results).contains(new Object[] {2, "bob", null});
+    assertThat(results).contains(new Object[] {3, "john", "SFO"});
+
+    // rename column city to address
+    sql("ALTER TABLE %s RENAME COLUMN city TO address", deltatb);
+    // verify column address exists
+    results = sql("SELECT id, address FROM %s ORDER BY id", deltatb);
+    assertThat(results.size()).isEqualTo(3);
+    assertThat(results).contains(new Object[] {1, null});
+    assertThat(results).contains(new Object[] {2, null});
+    assertThat(results).contains(new Object[] {3, "SFO"});
+
+    // test altering properties
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('description' = 'people table', 'test-owner' = 'test-user')",
+        deltatb);
+    List<Object[]> tableInfo = sql("DESCRIBE TABLE EXTENDED %s", deltatb);
+    // find the table properties entry in the result
+    String properties = null;
+    for (Object[] info : tableInfo) {
+      if (info[0].equals("Table Properties")) {
+        properties = (String) info[1];
+        break;
+      }
+    }
+    assertThat(properties).contains("description=people table,test-owner=test-user");
+    sql("DROP TABLE %s", deltatb);
+  }
+
+  @Test
+  public void testUnsupportedAlterTableOperations() {
+    String deltatb = getTableNameWithRandomSuffix();
+    sql(
+        "CREATE TABLE %s (name String, age INT, country STRING) USING DELTA PARTITIONED BY (country) LOCATION '%s'",
+        deltatb, getTableLocation(deltatb));
+
+    // ALTER TABLE ... RENAME TO ... fails
+    assertThatThrownBy(() -> sql("ALTER TABLE %s RENAME TO new_delta", deltatb))
+        .isInstanceOf(UnsupportedOperationException.class);
+
+    // ALTER TABLE ... SET LOCATION ... fails
+    assertThatThrownBy(() -> sql("ALTER TABLE %s SET LOCATION '/tmp/new/path'", deltatb))
+        .isInstanceOf(DeltaAnalysisException.class);
+
+    // ALTER TABLE ... SET FILEFORMAT ... fails
+    assertThatThrownBy(() -> sql("ALTER TABLE %s SET FILEFORMAT 'csv'", deltatb))
+        .isInstanceOf(ParseException.class);
+
+    // ALTER TABLE ... ADD PARTITION ... fails
+    assertThatThrownBy(() -> sql("ALTER TABLE %s ADD PARTITION (country='US')", deltatb))
+        .isInstanceOf(AnalysisException.class);
+
+    sql("DROP TABLE %s", deltatb);
+  }
+
+  @Test
+  public void testUnsupportedTableCreateOperations() {
+    String deltatb = getTableNameWithRandomSuffix();
+    // creating a delta table with no location fails
+    assertThatThrownBy(() -> sql("CREATE TABLE %s (id INT, name STRING) USING DELTA", deltatb))
+        .isInstanceOf(UnsupportedOperationException.class);
+
+    // CTAS fails
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE TABLE %s USING DELTA LOCATION '%s' AS SELECT 1 AS id",
+                    deltatb, getTableLocation(deltatb)))
+        .isInstanceOf(IllegalArgumentException.class);
+  }
+
+  @Test
+  public void testDataframeSaveOperations() {
+    List<Row> data = Arrays.asList(RowFactory.create("Alice", 30), RowFactory.create("Bob", 25));
+    StructType schema =
+        new StructType(
+            new StructField[] {
+              new StructField("name", DataTypes.StringType, false, Metadata.empty()),
+              new StructField("age", DataTypes.IntegerType, false, Metadata.empty())
+            });
+    Dataset<Row> df = spark.createDataFrame(data, schema);
+
+    String deltatb = getTableNameWithRandomSuffix();
+    // saveAsTable for delta requires CTAS support for third party catalogs
+    // in the delta catalog, which is currently not supported.
+    assertThatThrownBy(
+            () ->
+                df.write()
+                    .format("delta")
+                    .option("path", getTableLocation(deltatb))
+                    .saveAsTable(deltatb))
+        .isInstanceOf(IllegalArgumentException.class);
+
+    // verify regular dataframe saving still works
+    df.write().format("delta").save(getTableLocation(deltatb));
+
+    // verify the delta log dir is created
+    List<String> subDirs = listDirs(getTableLocation(deltatb));
+    assertThat(subDirs).contains("_delta_log");
+
+    // verify we can create a table out of the existing delta location
+    sql("CREATE TABLE %s USING DELTA LOCATION '%s'", deltatb, getTableLocation(deltatb));
+    List<Object[]> tables = sql("SHOW TABLES");
+    assertThat(tables.size()).isEqualTo(1);
+    assertThat(tables).contains(new Object[] {defaultNs, deltatb, false});
+
+    sql("INSERT INTO %s VALUES ('Anna', 11)", deltatb);
+
+    List<Object[]> results = sql("SELECT * FROM %s ORDER BY name", deltatb);
+    assertThat(results.size()).isEqualTo(3);
+    assertThat(results.get(0)).isEqualTo(new Object[] {"Alice", 30});
+    assertThat(results.get(1)).isEqualTo(new Object[] {"Anna", 11});
+    assertThat(results.get(2)).isEqualTo(new Object[] {"Bob", 25});
+
+    sql("DROP TABLE %s", deltatb);
+  }
+}
diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIT.java
index f9af2609d0..a4e060a52f 100644
--- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIT.java
+++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIT.java
@@ -22,8 +22,13 @@
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import io.quarkus.test.junit.QuarkusIntegrationTest;
+import java.io.File;
+import java.nio.file.Path;
 import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.polaris.service.it.env.IntegrationTestsHelper;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 @QuarkusIntegrationTest
 public class SparkIT extends SparkIntegrationBase {
@@ -64,7 +69,7 @@ public void testNamespaces() {
 
   @Test
   public void testCreatDropView() {
-    String namespace = "ns";
+    String namespace = generateName("ns");
     // create namespace ns
     sql("CREATE NAMESPACE %s", namespace);
     sql("USE %s", namespace);
@@ -88,23 +93,112 @@ public void testCreatDropView() {
     sql("DROP VIEW %s", view2Name);
     views = sql("SHOW VIEWS");
     assertThat(views.size()).isEqualTo(0);
+
+    sql("DROP NAMESPACE %s", namespace);
   }
 
   @Test
-  public void renameView() {
-    sql("CREATE NAMESPACE ns");
-    sql("USE ns");
+  public void renameIcebergViewAndTable() {
+    String namespace = generateName("ns");
+    sql("CREATE NAMESPACE %s", namespace);
+    sql("USE %s", namespace);
+
+    // create one view and one table
     String viewName = "originalView";
-    String renamedView = "renamedView";
     sql("CREATE VIEW %s AS SELECT 1 AS id", viewName);
+
+    String icebergTable = "iceberg_table";
+    sql("CREATE TABLE %s (col1 int, col2 string)", icebergTable);
+
+    // verify the view and table are shown correctly
     List<Object[]> views = sql("SHOW VIEWS");
     assertThat(views.size()).isEqualTo(1);
-    assertThat(views).contains(new Object[] {"ns", viewName, false});
+    assertThat(views).contains(new Object[] {namespace, viewName, false});
+
+    List<Object[]> tables = sql("SHOW TABLES");
+    assertThat(tables.size()).isEqualTo(1);
+    assertThat(tables).contains(new Object[] {namespace, icebergTable, false});
 
+    // rename the view
+    String renamedView = "renamedView";
     sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView);
     views = sql("SHOW VIEWS");
     assertThat(views.size()).isEqualTo(1);
-    assertThat(views).contains(new Object[] {"ns", renamedView, false});
+    assertThat(views).contains(new Object[] {namespace, renamedView, false});
+
+    // rename the table
+    String newIcebergTable = "iceberg_table_new";
+    sql("ALTER TABLE %s RENAME TO %s", icebergTable, newIcebergTable);
+    tables = sql("SHOW TABLES");
+    assertThat(tables.size()).isEqualTo(1);
+    assertThat(tables).contains(new Object[] {namespace, newIcebergTable, false});
+
+    // clean up the resources
+    sql("DROP VIEW %s", renamedView);
+    sql("DROP TABLE %s", newIcebergTable);
+    sql("DROP NAMESPACE %s", namespace);
+  }
+
+  @Test
+  public void testMixedTableAndViews(@TempDir Path tempDir) {
+    String namespace = generateName("ns");
+    sql("CREATE NAMESPACE %s", namespace);
+    sql("USE %s", namespace);
+
+    // create one iceberg table, one iceberg view and one delta table
+    String icebergTable = "icebergtb";
+    sql("CREATE TABLE %s (col1 int, col2 String)", icebergTable);
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b')", icebergTable);
+
+    String viewName = "icebergview";
+    sql("CREATE VIEW %s AS SELECT col1 + 2 AS col1, col2 FROM %s", viewName, icebergTable);
+
+    String deltaTable = "deltatb";
+    String deltaDir =
+        IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve(namespace).getPath();
+    sql(
+        "CREATE TABLE %s (col1 int, col2 int) USING DELTA LOCATION '%s/%s'",
+        deltaTable, deltaDir, deltaTable);
+    sql("INSERT INTO %s VALUES (1, 3), (2, 5), (11, 20)", deltaTable);
+
+    // join the iceberg and delta tables
+    List<Object[]> joinResult =
+        sql(
+            "SELECT icebergtb.col1 AS id, icebergtb.col2 AS str_col, deltatb.col2 AS int_col FROM icebergtb INNER JOIN deltatb ON icebergtb.col1 = deltatb.col1 ORDER BY id");
+    assertThat(joinResult.get(0)).isEqualTo(new Object[] {1, "a", 3});
+    assertThat(joinResult.get(1)).isEqualTo(new Object[] {2, "b", 5});
+
+    // SHOW TABLES shows both the iceberg and delta tables
+    List<Object[]> tables = sql("SHOW TABLES");
+    assertThat(tables.size()).isEqualTo(2);
+    assertThat(tables)
+        .contains(
+            new Object[] {namespace, icebergTable, false},
+            new Object[] {namespace, deltaTable, false});
+
+    // verify the table content
+    List<Object[]> results = sql("SELECT * FROM %s ORDER BY col1", icebergTable);
+    assertThat(results.size()).isEqualTo(2);
+    assertThat(results.get(0)).isEqualTo(new Object[] {1, "a"});
+    assertThat(results.get(1)).isEqualTo(new Object[] {2, "b"});
+
+    // verify the view content
+    results = sql("SELECT * FROM %s ORDER BY col1", viewName);
+    assertThat(results.size()).isEqualTo(2);
+    assertThat(results.get(0)).isEqualTo(new Object[] {3, "a"});
+    assertThat(results.get(1)).isEqualTo(new Object[] {4, "b"});
+
+    List<Object[]> views = sql("SHOW VIEWS");
+    assertThat(views.size()).isEqualTo(1);
+    assertThat(views).contains(new Object[] {namespace, viewName, false});
+
+    // drop views and tables
+    sql("DROP TABLE %s", icebergTable);
+    sql("DROP TABLE %s", deltaTable);
+    sql("DROP VIEW %s", viewName);
+    sql("DROP NAMESPACE %s", namespace);
+
+    // clean up the delta directory
+    File dirToDelete = new File(deltaDir);
+    FileUtils.deleteQuietly(dirToDelete);
   }
 }
diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java
index b5006d6a79..be456716ca 100644
--- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java
+++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java
@@ -20,9 +20,14 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.errorprone.annotations.FormatMethod;
+import java.io.File;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.DirectoryFileFilter;
+import org.apache.commons.io.filefilter.FalseFileFilter;
 import org.apache.polaris.service.it.ext.PolarisSparkIntegrationTestBase;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -32,12 +37,14 @@ public abstract class SparkIntegrationBase extends PolarisSparkIntegrationTestBa
   @Override
   protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) {
     return builder
+        .config(
+            "spark.sql.extensions",
+            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension")
+        .config(
+            "spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .config(
             String.format("spark.sql.catalog.%s", catalogName),
             "org.apache.polaris.spark.SparkCatalog")
-        .config(
-            "spark.sql.extensions",
-            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
         .config("spark.sql.warehouse.dir", warehouseDir.toString())
         .config(String.format("spark.sql.catalog.%s.type", catalogName), "rest")
         .config(
@@ -54,26 +61,6 @@ protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String
         .config(String.format("spark.sql.catalog.%s.s3.region", catalogName), "us-west-2");
   }
 
-  @Override
-  protected void cleanupCatalog(String catalogName) {
-    onSpark("USE " + catalogName);
-    List<Row> namespaces = onSpark("SHOW NAMESPACES").collectAsList();
-    for (Row namespace : namespaces) {
-      // TODO: once all table operations are supported, remove the override of this function
-      // List<Row> tables = onSpark("SHOW TABLES IN " + namespace.getString(0)).collectAsList();
-      // for (Row table : tables) {
-      //   onSpark("DROP TABLE " + namespace.getString(0) + "." + table.getString(1));
-      // }
-      List<Row> views = onSpark("SHOW VIEWS IN " + namespace.getString(0)).collectAsList();
-      for (Row view : views) {
-        onSpark("DROP VIEW " + namespace.getString(0) + "." + view.getString(1));
-      }
-      onSpark("DROP NAMESPACE " + namespace.getString(0));
-    }
-
-    managementApi.deleteCatalog(catalogName);
-  }
-
   @FormatMethod
   protected List<Object[]> sql(String query, Object... args) {
     List<Row> rows = spark.sql(String.format(query, args)).collectAsList();
@@ -108,4 +95,19 @@ private Object[] toJava(Row row) {
             })
         .toArray(Object[]::new);
   }
+
+  /** List the names of directories under a given path, non-recursively. */
+  protected List<String> listDirs(String path) {
+    File directory = new File(path);
+    return FileUtils.listFilesAndDirs(
+            directory, FalseFileFilter.INSTANCE, DirectoryFileFilter.DIRECTORY)
+        .stream()
+        .map(File::getName)
+        .toList();
+  }
+
+  /** Generate a string name with a given prefix and a random suffix. */
+  protected String generateName(String prefix) {
+    return prefix + "_" + UUID.randomUUID().toString().replaceAll("-", "");
+  }
 }
diff --git a/plugins/spark/v3.5/integration/src/intTest/resources/logback.xml b/plugins/spark/v3.5/integration/src/intTest/resources/logback.xml
new file mode 100644
index 0000000000..f0990845b1
--- /dev/null
+++ b/plugins/spark/v3.5/integration/src/intTest/resources/logback.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+<configuration>
+  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <root level="INFO">
+    <appender-ref ref="CONSOLE" />
+  </root>
+</configuration>
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
index e88628a70b..ab5353e7c9 100644
--- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
@@ -151,13 +151,20 @@ public Table createTable(
     String provider = properties.get(PolarisCatalogUtils.TABLE_PROVIDER_KEY);
     if (PolarisCatalogUtils.useIceberg(provider)) {
       return this.icebergsSparkCatalog.createTable(ident, schema, transforms, properties);
-    } else if (PolarisCatalogUtils.useDelta(provider)) {
-      // For delta table, we load the delta catalog to help dealing with the
-      // delta log creation.
-      TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
-      return deltaCatalog.createTable(ident, schema, transforms, properties);
     } else {
-      return this.polarisSparkCatalog.createTable(ident, schema, transforms, properties);
+      if (PolarisCatalogUtils.isTableWithSparkManagedLocation(properties)) {
+        throw new UnsupportedOperationException(
+            "Tables with Spark-managed locations are currently not supported by Polaris. Please provide a location or path for the table.");
+      }
+
+      if (PolarisCatalogUtils.useDelta(provider)) {
+        // For delta tables, we load the delta catalog to help deal with the
+        // delta log creation.
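+        // (the delta catalog is initialized with polarisSparkCatalog as its delegate)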
+        TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
+        return deltaCatalog.createTable(ident, schema, transforms, properties);
+      } else {
+        return this.polarisSparkCatalog.createTable(ident, schema, transforms, properties);
+      }
     }
   }
 
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java
index 01a4af45da..8dac78b23c 100644
--- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java
@@ -50,6 +50,16 @@ public static boolean useDelta(String provider) {
     return "delta".equalsIgnoreCase(provider);
   }
 
+  /**
+   * For tables whose location is managed by the Spark session catalog, there will be no location
+   * or path in the properties.
+   */
+  public static boolean isTableWithSparkManagedLocation(Map<String, String> properties) {
+    boolean hasLocationClause = properties.containsKey(TableCatalog.PROP_LOCATION);
+    boolean hasPathClause = properties.containsKey(TABLE_PATH_KEY);
+    return !hasLocationClause && !hasPathClause;
+  }
+
   /**
    * Load spark table using DataSourceV2.
    *

From 825aebff819de2b47b09656a99fdc30fdd38a17d Mon Sep 17 00:00:00 2001
From: Yun Zou
Date: Wed, 30 Apr 2025 13:37:20 -0700
Subject: [PATCH 2/3] add change

---
 plugins/spark/v3.5/integration/build.gradle.kts | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/plugins/spark/v3.5/integration/build.gradle.kts b/plugins/spark/v3.5/integration/build.gradle.kts
index 90a39b08c7..375753a8b0 100644
--- a/plugins/spark/v3.5/integration/build.gradle.kts
+++ b/plugins/spark/v3.5/integration/build.gradle.kts
@@ -55,8 +55,10 @@ dependencies {
     exclude("org.apache.logging.log4j", "log4j-core")
     exclude("org.slf4j", "jul-to-slf4j")
   }
+  // enforce the usage of log4j 2.24.3 for log4j-api compatibility
   testRuntimeOnly("org.apache.logging.log4j:log4j-core:2.24.3")
   testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j2-impl:2.24.3")
+
   testImplementation("io.delta:delta-spark_${scalaVersion}:3.3.1")
 
   testImplementation(platform(libs.jackson.bom))

From 86ed3452760f5e9ba6ebf8274f3f5dacde4ef853 Mon Sep 17 00:00:00 2001
From: Yun Zou
Date: Wed, 30 Apr 2025 18:30:46 -0700
Subject: [PATCH 3/3] add comment

---
 plugins/spark/v3.5/integration/build.gradle.kts        | 6 +++---
 .../v3.5/integration/src/intTest/resources/logback.xml | 8 +++++++-
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/plugins/spark/v3.5/integration/build.gradle.kts b/plugins/spark/v3.5/integration/build.gradle.kts
index 375753a8b0..04b2ea58b6 100644
--- a/plugins/spark/v3.5/integration/build.gradle.kts
+++ b/plugins/spark/v3.5/integration/build.gradle.kts
@@ -38,9 +38,9 @@ val scalaLibraryVersion =
 dependencies {
   // must be enforced to get a consistent and validated set of dependencies
   implementation(enforcedPlatform(libs.quarkus.bom)) {
-    exclude("org.antlr", "antlr4-runtime")
-    exclude("org.scala-lang", "scala-library")
-    exclude("org.scala-lang", "scala-reflect")
+    exclude(group = "org.antlr", module = "antlr4-runtime")
+    exclude(group = "org.scala-lang", module = "scala-library")
+    exclude(group = "org.scala-lang", module = "scala-reflect")
   }
 
   implementation(project(":polaris-quarkus-service"))
diff --git a/plugins/spark/v3.5/integration/src/intTest/resources/logback.xml b/plugins/spark/v3.5/integration/src/intTest/resources/logback.xml
index f0990845b1..5ec1efd5fe 100644
--- a/plugins/spark/v3.5/integration/src/intTest/resources/logback.xml
+++ b/plugins/spark/v3.5/integration/src/intTest/resources/logback.xml
@@ -18,6 +18,12 @@ specific language governing permissions and limitations
 under the License.
+-->
+
@@ -26,7 +32,7 @@
-
+