diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index e2fed243c4769..66dd6106e3c22 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -3876,6 +3876,24 @@
     },
     "sqlState" : "42000"
   },
+  "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR" : {
+    "message" : [
+      "JDBC external engine syntax error. The error was caused by the query <jdbcQuery>."
+    ],
+    "subClass" : {
+      "DURING_OUTPUT_SCHEMA_RESOLUTION" : {
+        "message" : [
+          "The error occurred during output schema resolution."
+        ]
+      },
+      "DURING_QUERY_EXECUTION" : {
+        "message" : [
+          "The error occurred during query execution."
+        ]
+      }
+    },
+    "sqlState" : "42000"
+  },
   "JOIN_CONDITION_IS_NOT_BOOLEAN_TYPE" : {
     "message" : [
       "The join condition <joinCondition> has the invalid type <conditionType>, expected \"BOOLEAN\"."
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
index 1d33acfdee013..25695f144f558 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
@@ -37,7 +37,7 @@ import org.apache.spark.tags.DockerTest
  * }}}
  */
 @DockerTest
-class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
+class DB2IntegrationSuite extends DockerJDBCIntegrationSuite with SharedJDBCIntegrationTests {
   override val db = new DB2DatabaseOnDocker
 
   override def dataPreparation(conn: Connection): Unit = {
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala
index 4f782bed77922..53e45baa99679 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala
@@ -33,7 +33,8 @@ import org.apache.spark.tags.DockerTest
  * }}}
  */
 @DockerTest
-class MariaDBKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
+class MariaDBKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite
+  with SharedJDBCIntegrationTests {
   override protected val userName = s"mariadb/$dockerIp"
   override protected val keytabFileName = "mariadb.keytab"
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
index 62f088ebc2b6d..f72f3ecca6929 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
@@ -40,7 +40,8 @@ import org.apache.spark.tags.DockerTest
  * }}}
  */
 @DockerTest
-class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
+class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite
+  with SharedJDBCIntegrationTests {
   override val db = new MsSQLServerDatabaseOnDocker
 
   override def dataPreparation(conn: Connection): Unit = {
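The new condition's <jdbcQuery> parameter is filled in by the callers added below in JDBCRDD.scala. As a hedged sketch of what a caller can observe once the condition fires (assuming a SparkSession named spark, a reachable JDBC url, and ScalaTest's intercept, as used by the suites in this patch):

import org.apache.spark.SparkException

val err = intercept[SparkException] {
  spark.read.format("jdbc")
    .option("url", url)          // assumption: any reachable JDBC URL
    .option("query", "SELEC 1")  // deliberately invalid SQL
    .load()
}
// err.getCondition == "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR.DURING_OUTPUT_SCHEMA_RESOLUTION"
// err.getMessageParameters.get("jdbcQuery") carries the full probe query Spark sent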
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index cf547b93aa0ba..33a89f4f20c4c 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -61,7 +61,9 @@ import org.apache.spark.tags.DockerTest
  * and with Oracle Express Edition versions 18.4.0 and 21.4.0
  */
 @DockerTest
-class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSparkSession {
+class OracleIntegrationSuite extends DockerJDBCIntegrationSuite
+  with SharedSparkSession
+  with SharedJDBCIntegrationTests {
   import testImplicits._
 
   override val db = new OracleDatabaseOnDocker
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index 5c985da226b06..9d1186463ca2a 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.tags.DockerTest
  * }}}
  */
 @DockerTest
-class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
+class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with SharedJDBCIntegrationTests {
   override val db = new PostgresDatabaseOnDocker
 
   override def dataPreparation(conn: Connection): Unit = {
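For reference, a hypothetical future suite would pick up the shared test the same way; the only contract is the jdbcUrl accessor, which DockerJDBCIntegrationSuite subclasses already inherit. MyVendorDatabaseOnDocker below is an invented placeholder, not a real class:

import java.sql.Connection

class MyVendorIntegrationSuite extends DockerJDBCIntegrationSuite
  with SharedJDBCIntegrationTests {
  override val db = new MyVendorDatabaseOnDocker  // hypothetical container definition
  override def dataPreparation(conn: Connection): Unit = ()
}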
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/SharedJDBCIntegrationTests.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/SharedJDBCIntegrationTests.scala
new file mode 100644
index 0000000000000..33dc2f9c98112
--- /dev/null
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/SharedJDBCIntegrationTests.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.QueryTest
+
+trait SharedJDBCIntegrationTests extends QueryTest {
+  protected def jdbcUrl: String
+
+  test("SPARK-52184: Wrap external engine syntax error") {
+    val e = intercept[SparkException] {
+      spark.read.format("jdbc")
+        .option("url", jdbcUrl)
+        .option("query", "THIS IS NOT VALID SQL").load()
+    }
+    assert(e.getCondition.startsWith("JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR"))
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 1b71dc9221f78..8342ae06da01a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
-import java.sql.{Connection, PreparedStatement, ResultSet}
+import java.sql.{Connection, PreparedStatement, ResultSet, SQLException}
 
 import scala.util.Using
 import scala.util.control.NonFatal
 
-import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
+import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, SparkException, TaskContext}
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.SQL_TEXT
 import org.apache.spark.rdd.RDD
@@ -59,7 +59,17 @@ object JDBCRDD extends Logging {
     val prepareQuery = options.prepareQuery
     val table = options.tableOrQuery
     val dialect = JdbcDialects.get(url)
-    getQueryOutputSchema(prepareQuery + dialect.getSchemaQuery(table), options, dialect)
+    val fullQuery = prepareQuery + dialect.getSchemaQuery(table)
+
+    try {
+      getQueryOutputSchema(fullQuery, options, dialect)
+    } catch {
+      case e: SQLException if dialect.isSyntaxErrorBestEffort(e) =>
+        throw new SparkException(
+          errorClass = "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR.DURING_OUTPUT_SCHEMA_RESOLUTION",
+          messageParameters = Map("jdbcQuery" -> fullQuery),
+          cause = e)
+    }
   }
 
   def getQueryOutputSchema(
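A hedged sketch of why the wrapper above catches bad SQL so early: for dialects that keep the default getSchemaQuery, the schema probe wraps the user query in a WHERE 1=0 subselect, so a syntax error fails on the very first round trip, before any rows are read. The SPARK_GEN_SUBQ_0 alias is illustrative of what JDBCOptions generates for the query option:

import org.apache.spark.sql.jdbc.JdbcDialects

val dialect = JdbcDialects.get("jdbc:postgresql://localhost/testdb")
val probe = dialect.getSchemaQuery("(THIS IS NOT VALID SQL) SPARK_GEN_SUBQ_0")
// probe == "SELECT * FROM (THIS IS NOT VALID SQL) SPARK_GEN_SUBQ_0 WHERE 1=0"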
@@ -211,7 +221,6 @@ class JDBCRDD(
 
   /**
    * Runs the SQL query against the JDBC driver.
-   *
    */
   override def compute(thePart: Partition, context: TaskContext): Iterator[InternalRow] = {
     var closed = false
@@ -284,7 +293,15 @@
       stmt.setQueryTimeout(options.queryTimeout)
 
       val startTime = System.nanoTime
-      rs = stmt.executeQuery()
+      rs = try {
+        stmt.executeQuery()
+      } catch {
+        case e: SQLException if dialect.isSyntaxErrorBestEffort(e) =>
+          throw new SparkException(
+            errorClass = "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR.DURING_QUERY_EXECUTION",
+            messageParameters = Map("jdbcQuery" -> sqlText),
+            cause = e)
+      }
       val endTime = System.nanoTime
 
       val executionTime = endTime - startTime
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
index 5e79dbbb4d72e..9d0060a39f4b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.jdbc
 
+import java.sql.SQLException
+
 import org.apache.spark.sql.types.{DataType, MetadataBuilder}
 
 /**
@@ -55,6 +57,10 @@ private class AggregatedDialect(dialects: List[JdbcDialect])
     dialects.head.getSchemaQuery(table)
   }
 
+  override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
+    dialects.head.isSyntaxErrorBestEffort(exception)
+  }
+
   override def isCascadingTruncateTable(): Option[Boolean] = {
     // If any dialect claims cascading truncate, this dialect is also cascading truncate.
     // Otherwise, if any dialect has unknown cascading truncate, this dialect is also unknown.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index b748975eef650..4f70f9a65ae40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -122,6 +122,11 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLe
 
   override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
 
+  // See https://www.ibm.com/docs/en/db2-for-zos/12.0.0?topic=codes-sqlstate-values-common-error
+  override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
+    Option(exception.getSQLState).exists(_.startsWith("42"))
+  }
+
   // scalastyle:off line.size.limit
   // See https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0053474.html
   // scalastyle:on line.size.limit
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
index cb3cfecd940b1..f4fc670470328 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
@@ -53,6 +53,11 @@ private case class DatabricksDialect() extends JdbcDialect with NoLegacyJDBCErro
     case _ => None
   }
 
+  // See https://docs.databricks.com/aws/en/error-messages/sqlstates
+  override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
+    Option(exception.getSQLState).exists(_.startsWith("42"))
+  }
+
   override def quoteIdentifier(colName: String): String = {
     s"`$colName`"
   }
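The dialects above share the same SQLSTATE-class test. A small sketch of its two properties, using hand-built exceptions (42601 is PostgreSQL's syntax_error code; the Option guard matters because drivers may legally return a null SQLState):

import java.sql.SQLException

val syntaxErr = new SQLException("syntax error at or near \"SELEC\"", "42601")
assert(Option(syntaxErr.getSQLState).exists(_.startsWith("42")))   // class 42 matches

val stateless = new SQLException("connection reset")               // SQLState is null
assert(!Option(stateless.getSQLState).exists(_.startsWith("42")))  // guarded, no NPE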
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
index f4e6e25f58dba..3f5d1612a67b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
@@ -68,6 +68,11 @@ private case class DerbyDialect() extends JdbcDialect with NoLegacyJDBCError {
 
   override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
 
+  // See https://db.apache.org/derby/docs/10.15/ref/rrefexcept71493.html
+  override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
+    Option(exception.getSQLState).exists(_.startsWith("42"))
+  }
+
   // See https://db.apache.org/derby/docs/10.15/ref/rrefsqljrenametablestatement.html
   override def renameTable(oldTable: Identifier, newTable: Identifier): String = {
     if (!oldTable.namespace().sameElements(newTable.namespace())) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 956e7c05cd5ff..b5ee88aebd7d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -61,6 +61,11 @@ private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError {
     Set(42102, 42103, 42104, 90079).contains(e.getErrorCode)
   }
 
+  // See https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html
+  override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
+    Option(exception.getSQLState).exists(_.startsWith("42"))
+  }
+
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
     sqlType match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index d6fe564c1520c..da0df734bbeca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -578,6 +578,19 @@ abstract class JdbcDialect extends Serializable with Logging {
    */
  def isCascadingTruncateTable(): Option[Boolean] = None
 
+  /**
+   * Attempts to determine if the given SQLException is a SQL syntax error.
+   *
+   * This check is best-effort: it may not detect all syntax errors across all JDBC dialects.
+   * However, if this method returns true, the exception is guaranteed to be a syntax error.
+   *
+   * This is used to decide whether to wrap the exception in a more appropriate Spark exception.
+   *
+   * @return true if the exception is confidently identified as a syntax error; false otherwise.
+   */
+  @Since("4.1.0")
+  def isSyntaxErrorBestEffort(exception: java.sql.SQLException): Boolean = false
+
   /**
    * Rename an existing table.
    *
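Since isSyntaxErrorBestEffort is part of the public JdbcDialect developer API, third-party dialects can opt in as well. A hedged sketch of how an implementer might override the new hook (the jdbc:myvendor prefix and vendor code 1064 are invented for illustration, not from any real driver):

import java.sql.SQLException
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

class MyVendorDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:myvendor:")

  // Stay conservative: a true result makes Spark rewrap the driver exception
  // as JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR, so only match known syntax signals.
  override def isSyntaxErrorBestEffort(exception: SQLException): Boolean =
    "42000".equals(exception.getSQLState) || exception.getErrorCode == 1064
}

JdbcDialects.registerDialect(new MyVendorDialect)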
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 7efdc52f35be0..3f06ea1a2fbfc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -273,6 +273,18 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCErr
   override def getJdbcSQLQueryBuilder(options: JDBCOptions): JdbcSQLQueryBuilder =
     new MsSqlServerSQLQueryBuilder(this, options)
 
+  override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
+    val exceptionMessage = Option(exception.getMessage)
+      .map(_.toLowerCase(Locale.ROOT))
+      .getOrElse("")
+    // scalastyle:off line.size.limit
+    // All errors are listed here, but there is no consistent error code that identifies
+    // most syntax errors, so we have to rely on the exception message instead.
+    // https://learn.microsoft.com/en-us/sql/relational-databases/errors-events/database-engine-events-and-errors?view=sql-server-ver16
+    // scalastyle:on line.size.limit
+    exceptionMessage.contains("incorrect syntax") || exceptionMessage.contains("syntax error")
+  }
+
   override def supportsLimit: Boolean = true
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 5b894e71619a2..19377057844e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -220,6 +220,11 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with No
 
   override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
 
+  // See https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html
+  override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
+    "42000".equals(exception.getSQLState)
+  }
+
   // See https://dev.mysql.com/doc/refman/8.0/en/alter-table.html
   override def getUpdateColumnTypeQuery(
       tableName: String,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index 236d9469a58d4..a9f6a727a7241 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -173,6 +173,11 @@ private case class OracleDialect() extends JdbcDialect with SQLConfHelper with N
 
   override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
 
+  // See https://docs.oracle.com/cd/E11882_01/appdev.112/e10827/appd.htm#g642406
+  override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
+    "42000".equals(exception.getSQLState)
+  }
+
   /**
    * The SQL query used to truncate a table.
    * @param table The table to truncate
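A sketch of the SQL Server message-based check, mirrored outside the (private) dialect class. Error 102 renders as "Incorrect syntax near ...", which the lowercased substring match catches; a localized message would simply fall through to false, which is acceptable for a best-effort check. The S0001 state below is illustrative:

import java.sql.SQLException
import java.util.Locale

val e = new SQLException("Incorrect syntax near 'SELEC'.", "S0001", 102)
val msg = Option(e.getMessage).map(_.toLowerCase(Locale.ROOT)).getOrElse("")
assert(msg.contains("incorrect syntax") || msg.contains("syntax error"))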
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 73b10f72e21b6..b5a0dd95e60c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -243,6 +243,11 @@ private case class PostgresDialect()
       s" $indexType (${columnList.mkString(", ")}) $indexProperties"
   }
 
+  // See https://www.postgresql.org/docs/current/errcodes-appendix.html
+  override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
+    Option(exception.getSQLState).exists(_.startsWith("42"))
+  }
+
   // SHOW INDEX syntax
   // https://www.postgresql.org/docs/14/view-pg-indexes.html
   override def indexExists(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala
index d4ac21a453005..1c88a554863ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala
@@ -38,4 +38,11 @@ private case class SnowflakeDialect() extends JdbcDialect with NoLegacyJDBCError
       Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
     case _ => JdbcUtils.getCommonJDBCType(dt)
   }
+
+  override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
+    // Snowflake does not officially document its SQLSTATE values, but it follows the
+    // ANSI standard, where SQLSTATE 42000 denotes syntax errors. Manual tests also
+    // show that this is the state returned for syntax errors.
+    "42000".equals(exception.getSQLState)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
index bbdab81201fc9..f855275ce4149 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
@@ -53,6 +53,13 @@ private case class TeradataDialect() extends JdbcDialect with NoLegacyJDBCError
   // Teradata does not support cascading a truncation
   override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
 
+  // scalastyle:off line.size.limit
+  // See https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/SQL-Stored-Procedures-and-Embedded-SQL/SQLSTATE-Mappings/SQLSTATE-Codes
+  // scalastyle:on line.size.limit
+  override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
+    Option(exception.getSQLState).exists(_.startsWith("42"))
+  }
+
   /**
    * The SQL query used to truncate a table. Teradata does not support the 'TRUNCATE' syntax that
    * other dialects use. Instead, we need to use a 'DELETE FROM' statement.
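Taken together, a hedged end-to-end sketch of the user-visible change (the URL is an assumption): previously the driver's raw SQLException escaped to the user; with this patch Spark raises the new condition with the probed query attached and the driver exception preserved as the cause.

import org.apache.spark.SparkException

try {
  spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/testdb")  // illustrative URL
    .option("query", "SELEC * FROM tbl")                       // typo: SELEC
    .load()                                                    // fails during the schema probe
} catch {
  case e: SparkException =>
    assert(e.getCondition.startsWith("JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR"))
    assert(e.getCause.isInstanceOf[java.sql.SQLException])     // original error retained
}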