[SPARK-52184][SQL] Wrap external engine JDBC syntax errors with a unified exception #50918


Closed
18 changes: 18 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -3876,6 +3876,24 @@
},
"sqlState" : "42000"
},
"JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR" : {
"message" : [
"JDBC external engine syntax error. The error was caused by the query <jdbcQuery>."
],
"subClass" : {
"DURING_OUTPUT_SCHEMA_RESOLUTION" : {
"message" : [
"The error occurred during output schema resolution."
]
},
"DURING_QUERY_EXECUTION" : {
"message" : [
"The error occurred during query execution."
]
}
},
"sqlState" : "42000"
},
"JOIN_CONDITION_IS_NOT_BOOLEAN_TYPE" : {
"message" : [
"The join condition <joinCondition> has the invalid type <conditionType>, expected \"BOOLEAN\"."
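For illustration, once the error framework substitutes a (hypothetical) offending query for <jdbcQuery> and appends the subclass message, the DURING_QUERY_EXECUTION variant resolves to roughly:

JDBC external engine syntax error. The error was caused by the query SELECT * FRM tab. The error occurred during query execution.

The condition is reported with SQLSTATE 42000, matching the sqlState declared above.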
@@ -37,7 +37,7 @@ import org.apache.spark.tags.DockerTest
* }}}
*/
@DockerTest
class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
class DB2IntegrationSuite extends DockerJDBCIntegrationSuite with SharedJDBCIntegrationTests {
override val db = new DB2DatabaseOnDocker

override def dataPreparation(conn: Connection): Unit = {
@@ -33,7 +33,8 @@ import org.apache.spark.tags.DockerTest
* }}}
*/
@DockerTest
class MariaDBKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
class MariaDBKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite
with SharedJDBCIntegrationTests {
override protected val userName = s"mariadb/$dockerIp"
override protected val keytabFileName = "mariadb.keytab"

@@ -40,7 +40,8 @@ import org.apache.spark.tags.DockerTest
* }}}
*/
@DockerTest
class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite
with SharedJDBCIntegrationTests {
override val db = new MsSQLServerDatabaseOnDocker

override def dataPreparation(conn: Connection): Unit = {
@@ -61,7 +61,9 @@ import org.apache.spark.tags.DockerTest
* and with Oracle Express Edition versions 18.4.0 and 21.4.0
*/
@DockerTest
class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSparkSession {
class OracleIntegrationSuite extends DockerJDBCIntegrationSuite
with SharedSparkSession
with SharedJDBCIntegrationTests {
import testImplicits._

override val db = new OracleDatabaseOnDocker
@@ -40,7 +40,7 @@ import org.apache.spark.tags.DockerTest
* }}}
*/
@DockerTest
class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with SharedJDBCIntegrationTests {
override val db = new PostgresDatabaseOnDocker

override def dataPreparation(conn: Connection): Unit = {
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.jdbc

import org.apache.spark.SparkException
import org.apache.spark.sql.QueryTest

trait SharedJDBCIntegrationTests extends QueryTest {
protected def jdbcUrl: String

test("SPARK-52184: Wrap external engine syntax error") {
val e = intercept[SparkException] {
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("query", "THIS IS NOT VALID SQL").load()
}
assert(e.getCondition.startsWith("JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR"))
}
}
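A minimal sketch of how a future suite could opt in (the suite and database container class names are hypothetical; DockerJDBCIntegrationSuite is assumed to supply jdbcUrl, as in the suites updated above):

@DockerTest
class MyEngineIntegrationSuite extends DockerJDBCIntegrationSuite
  with SharedJDBCIntegrationTests {
  override val db = new MyEngineDatabaseOnDocker // hypothetical container definition
  override def dataPreparation(conn: java.sql.Connection): Unit = {} // the shared test needs no fixtures
}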
@@ -17,12 +17,12 @@

package org.apache.spark.sql.execution.datasources.jdbc

import java.sql.{Connection, PreparedStatement, ResultSet}
import java.sql.{Connection, PreparedStatement, ResultSet, SQLException}

import scala.util.Using
import scala.util.control.NonFatal

import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, SparkException, TaskContext}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.SQL_TEXT
import org.apache.spark.rdd.RDD
@@ -59,7 +59,17 @@ object JDBCRDD extends Logging {
val prepareQuery = options.prepareQuery
val table = options.tableOrQuery
val dialect = JdbcDialects.get(url)
getQueryOutputSchema(prepareQuery + dialect.getSchemaQuery(table), options, dialect)
val fullQuery = prepareQuery + dialect.getSchemaQuery(table)

try {
getQueryOutputSchema(fullQuery, options, dialect)
} catch {
case e: SQLException if dialect.isSyntaxErrorBestEffort(e) =>
throw new SparkException(
errorClass = "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR.DURING_OUTPUT_SCHEMA_RESOLUTION",
messageParameters = Map("jdbcQuery" -> fullQuery),
cause = e)
}
}

def getQueryOutputSchema(
@@ -211,7 +221,6 @@ class JDBCRDD(

/**
* Runs the SQL query against the JDBC driver.
*
*/
override def compute(thePart: Partition, context: TaskContext): Iterator[InternalRow] = {
var closed = false
@@ -284,7 +293,15 @@ class JDBCRDD(
stmt.setQueryTimeout(options.queryTimeout)

val startTime = System.nanoTime
rs = stmt.executeQuery()
rs = try {
stmt.executeQuery()
} catch {
case e: SQLException if dialect.isSyntaxErrorBestEffort(e) =>
throw new SparkException(
errorClass = "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR.DURING_QUERY_EXECUTION",
messageParameters = Map("jdbcQuery" -> sqlText),
cause = e)
}
val endTime = System.nanoTime

val executionTime = endTime - startTime
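For context, a sketch of where each subclass surfaces (the URL and query below are assumptions, not part of this change). load() resolves the output schema eagerly through resolveTable, so a malformed user query is typically reported as DURING_OUTPUT_SCHEMA_RESOLUTION:

// Fails at load() with JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR.DURING_OUTPUT_SCHEMA_RESOLUTION,
// because the dialect's schema query wraps the (invalid) user-supplied query.
val df = spark.read.format("jdbc")
  .option("url", jdbcUrl) // assumed JDBC URL
  .option("query", "SELEC 1") // intentionally malformed SQL
  .load()

The DURING_QUERY_EXECUTION variant is raised when stmt.executeQuery() fails inside compute(), i.e. while an action is actually reading partitions.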
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.jdbc

import java.sql.SQLException

import org.apache.spark.sql.types.{DataType, MetadataBuilder}

/**
@@ -55,6 +57,10 @@ private class AggregatedDialect(dialects: List[JdbcDialect])
dialects.head.getSchemaQuery(table)
}

override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
dialects.head.isSyntaxErrorBestEffort(exception)
}

override def isCascadingTruncateTable(): Option[Boolean] = {
// If any dialect claims cascading truncate, this dialect is also cascading truncate.
// Otherwise, if any dialect has unknown cascading truncate, this dialect is also unknown.
@@ -122,6 +122,11 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLe

override def isCascadingTruncateTable(): Option[Boolean] = Some(false)

// See https://www.ibm.com/docs/en/db2-for-zos/12.0.0?topic=codes-sqlstate-values-common-error
override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
Option(exception.getSQLState).exists(_.startsWith("42"))
}

// scalastyle:off line.size.limit
// See https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0053474.html
// scalastyle:on line.size.limit
@@ -53,6 +53,11 @@ private case class DatabricksDialect() extends JdbcDialect with NoLegacyJDBCErro
case _ => None
}

// See https://docs.databricks.com/aws/en/error-messages/sqlstates
override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
Option(exception.getSQLState).exists(_.startsWith("42"))
}

override def quoteIdentifier(colName: String): String = {
s"`$colName`"
}
@@ -68,6 +68,11 @@ private case class DerbyDialect() extends JdbcDialect with NoLegacyJDBCError {

override def isCascadingTruncateTable(): Option[Boolean] = Some(false)

// See https://db.apache.org/derby/docs/10.15/ref/rrefexcept71493.html
override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
Option(exception.getSQLState).exists(_.startsWith("42"))
}

// See https://db.apache.org/derby/docs/10.15/ref/rrefsqljrenametablestatement.html
override def renameTable(oldTable: Identifier, newTable: Identifier): String = {
if (!oldTable.namespace().sameElements(newTable.namespace())) {
@@ -61,6 +61,11 @@ private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError {
Set(42102, 42103, 42104, 90079).contains(e.getErrorCode)
}

// See https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html
override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
Option(exception.getSQLState).exists(_.startsWith("42"))
}

override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
sqlType match {
@@ -578,6 +578,19 @@ abstract class JdbcDialect extends Serializable with Logging {
*/
def isCascadingTruncateTable(): Option[Boolean] = None

/**
* Attempts to determine if the given SQLException is a SQL syntax error.
*
* This check is best-effort: it may not detect all syntax errors across all JDBC dialects.
* However, if this method returns true, the exception is guaranteed to be a syntax error.
*
* This is used to decide whether to wrap the exception in a more appropriate Spark exception.
*
* @return true if the exception is confidently identified as a syntax error; false otherwise.
*/
@Since("4.1.0")
def isSyntaxErrorBestEffort(exception: java.sql.SQLException): Boolean = false

/**
* Rename an existing table.
*
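As a hedged sketch of the new hook from a dialect author's perspective (the dialect class and URL prefix are hypothetical; the SQLSTATE class "42" check mirrors the built-in dialects below):

import java.sql.SQLException

private case class MyEngineDialect() extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:myengine")

  // Best-effort: only claim a syntax error when SQLSTATE falls in the ANSI 42xxx class.
  override def isSyntaxErrorBestEffort(exception: SQLException): Boolean =
    Option(exception.getSQLState).exists(_.startsWith("42"))
}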
@@ -273,6 +273,18 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCErr
override def getJdbcSQLQueryBuilder(options: JDBCOptions): JdbcSQLQueryBuilder =
new MsSqlServerSQLQueryBuilder(this, options)

override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
val exceptionMessage = Option(exception.getMessage)
.map(_.toLowerCase(Locale.ROOT))
.getOrElse("")
// scalastyle:off line.size.limit
// All known errors are listed at the link below, but there is no consistent error code that
// identifies most syntax errors, so we have to rely on the exception message instead.
// https://learn.microsoft.com/en-us/sql/relational-databases/errors-events/database-engine-events-and-errors?view=sql-server-ver16
// scalastyle:on line.size.limit
exceptionMessage.contains("incorrect syntax") || exceptionMessage.contains("syntax error")
}

override def supportsLimit: Boolean = true
}
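A quick hedged illustration of what the message heuristic matches (the message text approximates typical SQL Server driver output and is an assumption):

// The lower-cased message contains "incorrect syntax", so the heuristic reports a syntax error.
val ex = new java.sql.SQLException("Incorrect syntax near 'FRM'.")
assert(MsSqlServerDialect().isSyntaxErrorBestEffort(ex))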

@@ -220,6 +220,11 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with No

override def isCascadingTruncateTable(): Option[Boolean] = Some(false)

// See https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html
override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
"42000".equals(exception.getSQLState)
}

// See https://dev.mysql.com/doc/refman/8.0/en/alter-table.html
override def getUpdateColumnTypeQuery(
tableName: String,
@@ -173,6 +173,11 @@ private case class OracleDialect() extends JdbcDialect with SQLConfHelper with N

override def isCascadingTruncateTable(): Option[Boolean] = Some(false)

// See https://docs.oracle.com/cd/E11882_01/appdev.112/e10827/appd.htm#g642406
override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
"42000".equals(exception.getSQLState)
}

/**
* The SQL query used to truncate a table.
* @param table The table to truncate
@@ -243,6 +243,11 @@ private case class PostgresDialect()
s" $indexType (${columnList.mkString(", ")}) $indexProperties"
}

// See https://www.postgresql.org/docs/current/errcodes-appendix.html
override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
Option(exception.getSQLState).exists(_.startsWith("42"))
}

// SHOW INDEX syntax
// https://www.postgresql.org/docs/14/view-pg-indexes.html
override def indexExists(
@@ -38,4 +38,11 @@ private case class SnowflakeDialect() extends JdbcDialect with NoLegacyJDBCError
Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
case _ => JdbcUtils.getCommonJDBCType(dt)
}

override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
// There is no official documentation of SQLSTATE values in Snowflake, but it follows the ANSI
// standard, in which SQLSTATE 42000 is used for syntax errors.
// Manual tests also confirm that this is the state reported for syntax errors.
"42000".equals(exception.getSQLState)
}
}
@@ -53,6 +53,13 @@ private case class TeradataDialect() extends JdbcDialect with NoLegacyJDBCError
// Teradata does not support cascading a truncation
override def isCascadingTruncateTable(): Option[Boolean] = Some(false)

// scalastyle:off line.size.limit
// See https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/SQL-Stored-Procedures-and-Embedded-SQL/SQLSTATE-Mappings/SQLSTATE-Codes
// scalastyle:on line.size.limit
override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
Option(exception.getSQLState).exists(_.startsWith("42"))
}

/**
* The SQL query used to truncate a table. Teradata does not support the 'TRUNCATE' syntax that
* other dialects use. Instead, we need to use a 'DELETE FROM' statement.