From 94e9772ffd610ba08700fee594b72da869d4afbe Mon Sep 17 00:00:00 2001 From: alekjarmov Date: Fri, 16 May 2025 15:57:43 +0200 Subject: [PATCH 1/5] [SPARK-52184][SQL] Wrap external engine JDBC syntax errors with a unified exception --- .../resources/error/error-conditions.json | 6 ++++ .../spark/sql/jdbc/DB2IntegrationSuite.scala | 2 +- .../sql/jdbc/MariaDBKrbIntegrationSuite.scala | 3 +- .../jdbc/MsSqlServerIntegrationSuite.scala | 3 +- .../sql/jdbc/OracleIntegrationSuite.scala | 4 ++- .../sql/jdbc/PostgresIntegrationSuite.scala | 2 +- .../sql/jdbc/SharedJDBCIntegrationTests.scala | 34 +++++++++++++++++++ .../execution/datasources/jdbc/JDBCRDD.scala | 6 ++-- .../datasources/jdbc/JdbcUtils.scala | 21 ++++++++++-- .../spark/sql/jdbc/AggregatedDialect.scala | 6 ++++ .../apache/spark/sql/jdbc/DB2Dialect.scala | 5 +++ .../spark/sql/jdbc/DatabricksDialect.scala | 5 +++ .../apache/spark/sql/jdbc/DerbyDialect.scala | 5 +++ .../org/apache/spark/sql/jdbc/H2Dialect.scala | 5 +++ .../apache/spark/sql/jdbc/JdbcDialects.scala | 13 +++++++ .../spark/sql/jdbc/MsSqlServerDialect.scala | 9 +++++ .../apache/spark/sql/jdbc/MySQLDialect.scala | 5 +++ .../apache/spark/sql/jdbc/OracleDialect.scala | 5 +++ .../spark/sql/jdbc/PostgresDialect.scala | 5 +++ .../spark/sql/jdbc/SnowflakeDialect.scala | 6 ++++ .../spark/sql/jdbc/TeradataDialect.scala | 7 ++++ 21 files changed, 148 insertions(+), 9 deletions(-) create mode 100644 connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/SharedJDBCIntegrationTests.scala diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index e2fed243c4769..2677cf5c12a47 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3876,6 +3876,12 @@ }, "sqlState" : "42000" }, + "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR" : { + "message" : [ + "JDBC external engine syntax error in <source>: <errorMessage>" + ], + "sqlState" : "42000" + }, "JOIN_CONDITION_IS_NOT_BOOLEAN_TYPE" : { "message" : [ "The join condition <joinCondition> has the invalid type <conditionType>, expected \"BOOLEAN\"."
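Note: the <source> and <errorMessage> placeholders in the message template above are filled from the messageParameters map at the throw site (see the JdbcUtils change below). A minimal sketch of the mechanism, with illustrative parameter values that are not taken from this patch:

import java.sql.SQLException

import org.apache.spark.SparkException

// The keys in messageParameters must match the <...> placeholders declared in
// the error-conditions.json template for this error class.
throw new SparkException(
  errorClass = "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR",
  messageParameters = Map(
    "source" -> "query execution",                  // illustrative value
    "errorMessage" -> "syntax error near 'SELEC'"), // illustrative value
  cause = new SQLException("syntax error near 'SELEC'"))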
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala index 1d33acfdee013..25695f144f558 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala @@ -37,7 +37,7 @@ import org.apache.spark.tags.DockerTest * }}} */ @DockerTest -class DB2IntegrationSuite extends DockerJDBCIntegrationSuite { +class DB2IntegrationSuite extends DockerJDBCIntegrationSuite with SharedJDBCIntegrationTests { override val db = new DB2DatabaseOnDocker override def dataPreparation(conn: Connection): Unit = { diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala index 4f782bed77922..53e45baa99679 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala @@ -33,7 +33,8 @@ import org.apache.spark.tags.DockerTest * }}} */ @DockerTest -class MariaDBKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite { +class MariaDBKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite + with SharedJDBCIntegrationTests { override protected val userName = s"mariadb/$dockerIp" override protected val keytabFileName = "mariadb.keytab" diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala index 62f088ebc2b6d..f72f3ecca6929 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala @@ -40,7 +40,8 @@ import org.apache.spark.tags.DockerTest * }}} */ @DockerTest -class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite { +class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite + with SharedJDBCIntegrationTests { override val db = new MsSQLServerDatabaseOnDocker override def dataPreparation(conn: Connection): Unit = { diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala index cf547b93aa0ba..33a89f4f20c4c 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala @@ -61,7 +61,9 @@ import org.apache.spark.tags.DockerTest * and with Oracle Express Edition versions 18.4.0 and 21.4.0 */ @DockerTest -class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSparkSession { +class OracleIntegrationSuite extends DockerJDBCIntegrationSuite + with SharedSparkSession + with SharedJDBCIntegrationTests { import testImplicits._ override val db = new OracleDatabaseOnDocker diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala index 5c985da226b06..9d1186463ca2a 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala @@ -40,7 +40,7 @@ import org.apache.spark.tags.DockerTest * }}} */ @DockerTest -class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { +class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with SharedJDBCIntegrationTests { override val db = new PostgresDatabaseOnDocker override def dataPreparation(conn: Connection): Unit = { diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/SharedJDBCIntegrationTests.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/SharedJDBCIntegrationTests.scala new file mode 100644 index 0000000000000..696bc21aafe41 --- /dev/null +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/SharedJDBCIntegrationTests.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.jdbc + +import org.apache.spark.SparkException +import org.apache.spark.sql.QueryTest + +trait SharedJDBCIntegrationTests extends QueryTest { + protected def jdbcUrl: String + + test("SPARK-52184: Wrap external engine syntax error") { + val e = intercept[SparkException] { + spark.read.format("jdbc") + .option("url", jdbcUrl) + .option("query", "THIS IS NOT VALID SQL").load() + } + assert(e.getCondition === "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR") + } +} \ No newline at end of file diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index 1b71dc9221f78..b4a06e9fd3ea5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -59,7 +59,9 @@ object JDBCRDD extends Logging { val prepareQuery = options.prepareQuery val table = options.tableOrQuery val dialect = JdbcDialects.get(url) - getQueryOutputSchema(prepareQuery + dialect.getSchemaQuery(table), options, dialect) + JdbcUtils.withWrapExternalEngineError(dialect, "output schema resolution") { + getQueryOutputSchema(prepareQuery + dialect.getSchemaQuery(table), options, dialect) + } } def getQueryOutputSchema( @@ -284,7 +286,7 @@ class JDBCRDD( stmt.setQueryTimeout(options.queryTimeout) val startTime = System.nanoTime - rs = stmt.executeQuery() + rs = JdbcUtils.withWrapExternalEngineError(dialect, "query execution") {stmt.executeQuery()} val endTime = System.nanoTime val executionTime = endTime - startTime diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 0077012e2b0e4..e89b8d05c70fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -28,8 +28,7 @@ import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal - -import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException, TaskContext} +import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException, TaskContext} import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys.{DEFAULT_ISOLATION_LEVEL, ISOLATION_LEVEL} @@ -1303,4 +1302,22 @@ object JdbcUtils extends Logging with SQLConfHelper { conn.close() } } + + /** + * Wraps the external engine error in a SparkException. + * This is used to wrap the syntax error from the external engine + * to provide a better error message to the user. 
+ */ + def withWrapExternalEngineError[T](dialect: JdbcDialect, source: String)(f: => T): T = { + try { + f + } catch { + case e: SQLException if dialect.isSyntaxErrorBestEffort(e) => + throw new SparkException( + errorClass = "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR", + messageParameters = Map("errorMessage" -> e.getMessage, "source" -> source), + cause = e + ) + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala index 5e79dbbb4d72e..9d0060a39f4b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.jdbc +import java.sql.SQLException + import org.apache.spark.sql.types.{DataType, MetadataBuilder} /** @@ -55,6 +57,10 @@ private class AggregatedDialect(dialects: List[JdbcDialect]) dialects.head.getSchemaQuery(table) } + override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { + dialects.head.isSyntaxErrorBestEffort(exception) + } + override def isCascadingTruncateTable(): Option[Boolean] = { // If any dialect claims cascading truncate, this dialect is also cascading truncate. // Otherwise, if any dialect has unknown cascading truncate, this dialect is also unknown. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index b748975eef650..448427fbf866d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -122,6 +122,11 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLe override def isCascadingTruncateTable(): Option[Boolean] = Some(false) + // See https://www.ibm.com/docs/en/db2-for-zos/12.0.0?topic=codes-sqlstate-values-common-error + override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { + exception.getSQLState.startsWith("42") + } + // scalastyle:off line.size.limit // See https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0053474.html // scalastyle:on line.size.limit diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala index cb3cfecd940b1..e787ed9047baa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala @@ -53,6 +53,11 @@ private case class DatabricksDialect() extends JdbcDialect with NoLegacyJDBCErro case _ => None } + // See https://docs.databricks.com/aws/en/error-messages/sqlstates + override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { + exception.getSQLState.startsWith("42") + } + override def quoteIdentifier(colName: String): String = { s"`$colName`" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala index f4e6e25f58dba..582c17a8e48be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala @@ -68,6 +68,11 @@ private case class DerbyDialect() extends JdbcDialect with NoLegacyJDBCError { override def isCascadingTruncateTable(): Option[Boolean] = Some(false) + // 
See https://db.apache.org/derby/docs/10.15/ref/rrefexcept71493.html + override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { + exception.getSQLState.startsWith("42") + } + // See https://db.apache.org/derby/docs/10.15/ref/rrefsqljrenametablestatement.html override def renameTable(oldTable: Identifier, newTable: Identifier): String = { if (!oldTable.namespace().sameElements(newTable.namespace())) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index 956e7c05cd5ff..e8fc4ed94a843 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -61,6 +61,11 @@ private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError { Set(42102, 42103, 42104, 90079).contains(e.getErrorCode) } + // See https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html + override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { + exception.getSQLState == "42000" || exception.getSQLState == "42001" + } + override def getCatalystType( sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = { sqlType match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index d6fe564c1520c..d340286a04238 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -578,6 +578,19 @@ abstract class JdbcDialect extends Serializable with Logging { */ def isCascadingTruncateTable(): Option[Boolean] = None + /** + * Attempts to determine if the given SQLException is a SQL syntax error. + * + * This check is best-effort: it may not detect all syntax errors across all JDBC dialects. + * However, if this method returns true, the exception is guaranteed to be a syntax error. + * + * This is used to decide whether to wrap the exception in a more appropriate Spark exception. + * + * @return true if the exception is confidently identified as a syntax error; false otherwise. + */ + @Since("4.0.0") + def isSyntaxErrorBestEffort(exception: java.sql.SQLException): Boolean = false + /** * Rename an existing table. 
* diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index 7efdc52f35be0..f6c2290018c41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -273,6 +273,15 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCErr override def getJdbcSQLQueryBuilder(options: JDBCOptions): JdbcSQLQueryBuilder = new MsSqlServerSQLQueryBuilder(this, options) + override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { + val exceptionMessage = exception.getMessage.toLowerCase(Locale.ROOT) + // scalastyle:off line.size.limit + // All errors are shown here there is no consistent error code to identify most syntax errors + // https://learn.microsoft.com/en-us/sql/relational-databases/errors-events/database-engine-events-and-errors?view=sql-server-ver16 + // scalastyle:on line.size.limit + exceptionMessage.contains("incorrect syntax") || exceptionMessage.contains("syntax error") + } + override def supportsLimit: Boolean = true } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index 5b894e71619a2..ca7fba74b4c65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -220,6 +220,11 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with No override def isCascadingTruncateTable(): Option[Boolean] = Some(false) + // See https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html + override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { + exception.getSQLState.equals("42000") + } + // See https://dev.mysql.com/doc/refman/8.0/en/alter-table.html override def getUpdateColumnTypeQuery( tableName: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala index 236d9469a58d4..c702046640a84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala @@ -173,6 +173,11 @@ private case class OracleDialect() extends JdbcDialect with SQLConfHelper with N override def isCascadingTruncateTable(): Option[Boolean] = Some(false) + // See https://docs.oracle.com/cd/E11882_01/appdev.112/e10827/appd.htm#g642406 + override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { + exception.getSQLState.equals("42000") + } + /** * The SQL query used to truncate a table. 
* @param table The table to truncate diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 73b10f72e21b6..cf122ffb1b424 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -243,6 +243,11 @@ private case class PostgresDialect() s" $indexType (${columnList.mkString(", ")}) $indexProperties" } + // See https://www.postgresql.org/docs/current/errcodes-appendix.html + override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { + exception.getSQLState.startsWith("42") + } + // SHOW INDEX syntax // https://www.postgresql.org/docs/14/view-pg-indexes.html override def indexExists( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala index d4ac21a453005..bc28f89adaddd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala @@ -38,4 +38,10 @@ private case class SnowflakeDialect() extends JdbcDialect with NoLegacyJDBCError Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN)) case _ => JdbcUtils.getCommonJDBCType(dt) } + + override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { + // There is no official documentation for SQL state in Snowflake, but they follow ANSI standard + // Tests also show that this is the error state for syntax error + exception.getSQLState.equals("42000") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala index bbdab81201fc9..2f9c45fd1805d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala @@ -53,6 +53,13 @@ private case class TeradataDialect() extends JdbcDialect with NoLegacyJDBCError // Teradata does not support cascading a truncation override def isCascadingTruncateTable(): Option[Boolean] = Some(false) + // scalastyle:off line.size.limit + // See https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/SQL-Stored-Procedures-and-Embedded-SQL/SQLSTATE-Mappings/SQLSTATE-Codes + // scalastyle:on line.size.limit + override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { + exception.getSQLState.startsWith("42") + } + /** * The SQL query used to truncate a table. Teradata does not support the 'TRUNCATE' syntax that * other dialects use. Instead, we need to use a 'DELETE FROM' statement. 
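Note: nothing in the wrapping above is specific to the built-in dialects; any dialect that can recognize its engine's syntax errors opts into the same behavior. A rough sketch of a hypothetical third-party dialect (the ExampleEngineDialect name, the jdbc:example URL prefix, and the SQLSTATE policy are illustrative assumptions, not part of this patch):

import java.sql.SQLException

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

// Hypothetical dialect for an engine whose syntax errors use SQLSTATE class "42".
case class ExampleEngineDialect() extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:example")

  // Best-effort: only claim a syntax error when the SQLSTATE class says so,
  // guarding against drivers that return a null SQLSTATE.
  override def isSyntaxErrorBestEffort(exception: SQLException): Boolean =
    Option(exception.getSQLState).exists(_.startsWith("42"))
}

// After registration, reads through this dialect surface matching SQLExceptions
// as JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR instead of raw driver errors.
JdbcDialects.registerDialect(ExampleEngineDialect())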
From a508685f6768ddb0af7e1ca9f09d48f333ee82aa Mon Sep 17 00:00:00 2001 From: alekjarmov Date: Fri, 16 May 2025 16:02:31 +0200 Subject: [PATCH 2/5] add empty line --- .../org/apache/spark/sql/jdbc/SharedJDBCIntegrationTests.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/SharedJDBCIntegrationTests.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/SharedJDBCIntegrationTests.scala index 696bc21aafe41..ebd4cdd2e87ac 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/SharedJDBCIntegrationTests.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/SharedJDBCIntegrationTests.scala @@ -31,4 +31,4 @@ trait SharedJDBCIntegrationTests extends QueryTest { } assert(e.getCondition === "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR") } -} \ No newline at end of file +} From 2aacaf9b99f12484e028c6e724ec62b2aa37d3c7 Mon Sep 17 00:00:00 2001 From: alekjarmov Date: Fri, 16 May 2025 16:13:43 +0200 Subject: [PATCH 3/5] address pr comments --- .../resources/error/error-conditions.json | 14 ++++++++++- .../sql/jdbc/SharedJDBCIntegrationTests.scala | 2 +- .../execution/datasources/jdbc/JDBCRDD.scala | 24 +++++++++++++++---- .../datasources/jdbc/JdbcUtils.scala | 21 ++-------------- .../apache/spark/sql/jdbc/DB2Dialect.scala | 2 +- .../spark/sql/jdbc/DatabricksDialect.scala | 2 +- .../apache/spark/sql/jdbc/DerbyDialect.scala | 2 +- .../org/apache/spark/sql/jdbc/H2Dialect.scala | 2 +- .../spark/sql/jdbc/MsSqlServerDialect.scala | 7 ++++-- .../apache/spark/sql/jdbc/MySQLDialect.scala | 2 +- .../apache/spark/sql/jdbc/OracleDialect.scala | 2 +- .../spark/sql/jdbc/PostgresDialect.scala | 2 +- .../spark/sql/jdbc/SnowflakeDialect.scala | 5 ++-- .../spark/sql/jdbc/TeradataDialect.scala | 2 +- 14 files changed, 51 insertions(+), 38 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 2677cf5c12a47..49aaced8439cb 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3878,8 +3878,20 @@ }, "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR" : { "message" : [ - "JDBC external engine syntax error in <source>: <errorMessage>" + "JDBC external engine syntax error." ], + "subClass" : { + "DURING_OUTPUT_SCHEMA_RESOLUTION" : { + "message" : [ + "The error occurred during output schema resolution." + ] + }, + "DURING_QUERY_EXECUTION" : { + "message" : [ + "The error occurred during query execution."
+ ] + } + }, "sqlState" : "42000" }, "JOIN_CONDITION_IS_NOT_BOOLEAN_TYPE" : { diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/SharedJDBCIntegrationTests.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/SharedJDBCIntegrationTests.scala index ebd4cdd2e87ac..33dc2f9c98112 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/SharedJDBCIntegrationTests.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/SharedJDBCIntegrationTests.scala @@ -29,6 +29,6 @@ trait SharedJDBCIntegrationTests extends QueryTest { .option("url", jdbcUrl) .option("query", "THIS IS NOT VALID SQL").load() } - assert(e.getCondition === "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR") + assert(e.getCondition.startsWith("JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR")) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index b4a06e9fd3ea5..afb840b5b5468 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -17,12 +17,12 @@ package org.apache.spark.sql.execution.datasources.jdbc -import java.sql.{Connection, PreparedStatement, ResultSet} +import java.sql.{Connection, PreparedStatement, ResultSet, SQLException} import scala.util.Using import scala.util.control.NonFatal -import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext} +import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, SparkException, TaskContext} import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys.SQL_TEXT import org.apache.spark.rdd.RDD @@ -59,8 +59,15 @@ object JDBCRDD extends Logging { val prepareQuery = options.prepareQuery val table = options.tableOrQuery val dialect = JdbcDialects.get(url) - JdbcUtils.withWrapExternalEngineError(dialect, "output schema resolution") { + + try { getQueryOutputSchema(prepareQuery + dialect.getSchemaQuery(table), options, dialect) + } catch { + case e: SQLException if dialect.isSyntaxErrorBestEffort(e) => + throw new SparkException( + errorClass = "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR.DURING_OUTPUT_SCHEMA_RESOLUTION", + messageParameters = Map.empty, + cause = e) } } @@ -213,7 +220,6 @@ class JDBCRDD( /** * Runs the SQL query against the JDBC driver. 
- * */ override def compute(thePart: Partition, context: TaskContext): Iterator[InternalRow] = { var closed = false @@ -286,7 +292,15 @@ class JDBCRDD( stmt.setQueryTimeout(options.queryTimeout) val startTime = System.nanoTime - rs = JdbcUtils.withWrapExternalEngineError(dialect, "query execution") {stmt.executeQuery()} + rs = try { + stmt.executeQuery() + } catch { + case e: SQLException if dialect.isSyntaxErrorBestEffort(e) => + throw new SparkException( + errorClass = "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR.DURING_QUERY_EXECUTION", + messageParameters = Map.empty, + cause = e) + } val endTime = System.nanoTime val executionTime = endTime - startTime diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index e89b8d05c70fe..0077012e2b0e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -28,7 +28,8 @@ import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal -import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException, TaskContext} + +import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException, TaskContext} import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys.{DEFAULT_ISOLATION_LEVEL, ISOLATION_LEVEL} @@ -1302,22 +1303,4 @@ object JdbcUtils extends Logging with SQLConfHelper { conn.close() } } - - /** - * Wraps the external engine error in a SparkException. - * This is used to wrap the syntax error from the external engine - * to provide a better error message to the user. 
- */ - def withWrapExternalEngineError[T](dialect: JdbcDialect, source: String)(f: => T): T = { - try { - f - } catch { - case e: SQLException if dialect.isSyntaxErrorBestEffort(e) => - throw new SparkException( - errorClass = "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR", - messageParameters = Map("errorMessage" -> e.getMessage, "source" -> source), - cause = e - ) - } - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index 448427fbf866d..4f70f9a65ae40 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -124,7 +124,7 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLe // See https://www.ibm.com/docs/en/db2-for-zos/12.0.0?topic=codes-sqlstate-values-common-error override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { - exception.getSQLState.startsWith("42") + Option(exception.getSQLState).exists(_.startsWith("42")) } // scalastyle:off line.size.limit diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala index e787ed9047baa..f4fc670470328 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala @@ -55,7 +55,7 @@ private case class DatabricksDialect() extends JdbcDialect with NoLegacyJDBCErro // See https://docs.databricks.com/aws/en/error-messages/sqlstates override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { - exception.getSQLState.startsWith("42") + Option(exception.getSQLState).exists(_.startsWith("42")) } override def quoteIdentifier(colName: String): String = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala index 582c17a8e48be..3f5d1612a67b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala @@ -70,7 +70,7 @@ private case class DerbyDialect() extends JdbcDialect with NoLegacyJDBCError { // See https://db.apache.org/derby/docs/10.15/ref/rrefexcept71493.html override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { - exception.getSQLState.startsWith("42") + Option(exception.getSQLState).exists(_.startsWith("42")) } // See https://db.apache.org/derby/docs/10.15/ref/rrefsqljrenametablestatement.html diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index e8fc4ed94a843..b5ee88aebd7d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -63,7 +63,7 @@ private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError { // See https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { - exception.getSQLState == "42000" || exception.getSQLState == "42001" + Option(exception.getSQLState).exists(_.startsWith("42")) } override def getCatalystType( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala 
index f6c2290018c41..3f06ea1a2fbfc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -274,9 +274,12 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCErr new MsSqlServerSQLQueryBuilder(this, options) override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { - val exceptionMessage = exception.getMessage.toLowerCase(Locale.ROOT) + val exceptionMessage = Option(exception.getMessage) + .map(_.toLowerCase(Locale.ROOT)) + .getOrElse("") // scalastyle:off line.size.limit - // All errors are shown here there is no consistent error code to identify most syntax errors + // All errors are shown here, but there is no consistent error code to identify + // most syntax errors, so we have to rely on the exception message. // https://learn.microsoft.com/en-us/sql/relational-databases/errors-events/database-engine-events-and-errors?view=sql-server-ver16 // scalastyle:on line.size.limit exceptionMessage.contains("incorrect syntax") || exceptionMessage.contains("syntax error") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index ca7fba74b4c65..19377057844e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -222,7 +222,7 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with No // See https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { - exception.getSQLState.equals("42000") + "42000".equals(exception.getSQLState) } // See https://dev.mysql.com/doc/refman/8.0/en/alter-table.html diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala index c702046640a84..a9f6a727a7241 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala @@ -175,7 +175,7 @@ private case class OracleDialect() extends JdbcDialect with SQLConfHelper with N // See https://docs.oracle.com/cd/E11882_01/appdev.112/e10827/appd.htm#g642406 override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { - exception.getSQLState.equals("42000") + "42000".equals(exception.getSQLState) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index cf122ffb1b424..b5a0dd95e60c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -245,7 +245,7 @@ private case class PostgresDialect() // See https://www.postgresql.org/docs/current/errcodes-appendix.html override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { - exception.getSQLState.startsWith("42") + Option(exception.getSQLState).exists(_.startsWith("42")) } // SHOW INDEX syntax // https://www.postgresql.org/docs/14/view-pg-indexes.html override def indexExists( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala index bc28f89adaddd..1c88a554863ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala +++
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala @@ -41,7 +41,8 @@ private case class SnowflakeDialect() extends JdbcDialect with NoLegacyJDBCError override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { // There is no official documentation for SQL state in Snowflake, but they follow ANSI standard - // Tests also show that this is the error state for syntax error - exception.getSQLState.equals("42000") + // where 42000 SQLState is used for syntax errors. + // Manual tests also show that this is the error state for syntax errors. + "42000".equals(exception.getSQLState) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala index 2f9c45fd1805d..f855275ce4149 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala @@ -57,7 +57,7 @@ private case class TeradataDialect() extends JdbcDialect with NoLegacyJDBCError // See https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/SQL-Stored-Procedures-and-Embedded-SQL/SQLSTATE-Mappings/SQLSTATE-Codes // scalastyle:on line.size.limit override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = { - exception.getSQLState.startsWith("42") + Option(exception.getSQLState).exists(_.startsWith("42")) } /** From 0066daf4f8b87e4ee44a9aef92c1513dbd58dfca Mon Sep 17 00:00:00 2001 From: alekjarmov Date: Mon, 19 May 2025 16:32:39 +0200 Subject: [PATCH 4/5] add executed query text to error message --- .../utils/src/main/resources/error/error-conditions.json | 2 +- .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 49aaced8439cb..66dd6106e3c22 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3878,7 +3878,7 @@ }, "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR" : { "message" : [ - "JDBC external engine syntax error." + "JDBC external engine syntax error. The error was caused by the query <jdbcQuery>."
], "subClass" : { "DURING_OUTPUT_SCHEMA_RESOLUTION" : { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index afb840b5b5468..8342ae06da01a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -59,14 +59,15 @@ object JDBCRDD extends Logging { val prepareQuery = options.prepareQuery val table = options.tableOrQuery val dialect = JdbcDialects.get(url) + val fullQuery = prepareQuery + dialect.getSchemaQuery(table) try { - getQueryOutputSchema(prepareQuery + dialect.getSchemaQuery(table), options, dialect) + getQueryOutputSchema(fullQuery, options, dialect) } catch { case e: SQLException if dialect.isSyntaxErrorBestEffort(e) => throw new SparkException( errorClass = "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR.DURING_OUTPUT_SCHEMA_RESOLUTION", - messageParameters = Map.empty, + messageParameters = Map("jdbcQuery" -> fullQuery), cause = e) } } @@ -298,7 +299,7 @@ class JDBCRDD( case e: SQLException if dialect.isSyntaxErrorBestEffort(e) => throw new SparkException( errorClass = "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR.DURING_QUERY_EXECUTION", - messageParameters = Map.empty, + messageParameters = Map("jdbcQuery" -> sqlText), cause = e) } val endTime = System.nanoTime From 1a3ec008812d6f758205e33f756a0da1ac13c310 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 20 May 2025 17:17:07 +0800 Subject: [PATCH 5/5] Update sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala --- .../src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index d340286a04238..da0df734bbeca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -588,7 +588,7 @@ abstract class JdbcDialect extends Serializable with Logging { * * @return true if the exception is confidently identified as a syntax error; false otherwise. */ - @Since("4.0.0") + @Since("4.1.0") def isSyntaxErrorBestEffort(exception: java.sql.SQLException): Boolean = false /**