Skip to content

Commit 3df2038

Browse files
alekjarmov authored and cloud-fan committed
[SPARK-52184][SQL] Wrap external engine JDBC syntax errors with a unified exception
### What changes were proposed in this pull request? This PR introduces a unified mechanism for handling SQL syntax errors thrown by external JDBC engines. It does so by: * Adding a new error condition: `JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR`. * Extending multiple JdbcDialect implementations to provide best-effort syntax error detection via `isSyntaxErrorBestEffort`. * Updating integration test suites to include a new shared test that verifies the wrapping behavior. ### Why are the changes needed? The current behavior rethrows the error if it is a syntax error on the external engine. Having it wrapped in a Spark exception enables easier data science. ### Does this PR introduce _any_ user-facing change? The users would get a better error message. ### How was this patch tested? Adding new integration tests. ### Was this patch authored or co-authored using generative AI tooling? Coauthored with GitHub Copilot. Closes apache#50918 from alekjarmov/wrap-external-engine-errors. Lead-authored-by: alekjarmov <[email protected]> Co-authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 65a8fd5 commit 3df2038

20 files changed

+163
-10
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3876,6 +3876,24 @@
38763876
},
38773877
"sqlState" : "42000"
38783878
},
3879+
"JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR" : {
3880+
"message" : [
3881+
"JDBC external engine syntax error. The error was caused by the query <jdbcQuery>."
3882+
],
3883+
"subClass" : {
3884+
"DURING_OUTPUT_SCHEMA_RESOLUTION" : {
3885+
"message" : [
3886+
"The error occurred during output schema resolution."
3887+
]
3888+
},
3889+
"DURING_QUERY_EXECUTION" : {
3890+
"message" : [
3891+
"The error occurred during query execution."
3892+
]
3893+
}
3894+
},
3895+
"sqlState" : "42000"
3896+
},
38793897
"JOIN_CONDITION_IS_NOT_BOOLEAN_TYPE" : {
38803898
"message" : [
38813899
"The join condition <joinCondition> has the invalid type <conditionType>, expected \"BOOLEAN\"."

connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import org.apache.spark.tags.DockerTest
3737
* }}}
3838
*/
3939
@DockerTest
40-
class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
40+
class DB2IntegrationSuite extends DockerJDBCIntegrationSuite with SharedJDBCIntegrationTests {
4141
override val db = new DB2DatabaseOnDocker
4242

4343
override def dataPreparation(conn: Connection): Unit = {

connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MariaDBKrbIntegrationSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ import org.apache.spark.tags.DockerTest
3333
* }}}
3434
*/
3535
@DockerTest
36-
class MariaDBKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
36+
class MariaDBKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite
37+
with SharedJDBCIntegrationTests {
3738
override protected val userName = s"mariadb/$dockerIp"
3839
override protected val keytabFileName = "mariadb.keytab"
3940

connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ import org.apache.spark.tags.DockerTest
4040
* }}}
4141
*/
4242
@DockerTest
43-
class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
43+
class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite
44+
with SharedJDBCIntegrationTests {
4445
override val db = new MsSQLServerDatabaseOnDocker
4546

4647
override def dataPreparation(conn: Connection): Unit = {

connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ import org.apache.spark.tags.DockerTest
6161
* and with Oracle Express Edition versions 18.4.0 and 21.4.0
6262
*/
6363
@DockerTest
64-
class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSparkSession {
64+
class OracleIntegrationSuite extends DockerJDBCIntegrationSuite
65+
with SharedSparkSession
66+
with SharedJDBCIntegrationTests {
6567
import testImplicits._
6668

6769
override val db = new OracleDatabaseOnDocker

connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import org.apache.spark.tags.DockerTest
4040
* }}}
4141
*/
4242
@DockerTest
43-
class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
43+
class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite with SharedJDBCIntegrationTests {
4444
override val db = new PostgresDatabaseOnDocker
4545

4646
override def dataPreparation(conn: Connection): Unit = {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.jdbc
19+
20+
import org.apache.spark.SparkException
21+
import org.apache.spark.sql.QueryTest
22+
23+
/**
 * Test cases shared by the Docker-based JDBC integration suites that mix this trait in.
 *
 * Currently holds a single test that checks a syntactically invalid query sent through the
 * "jdbc" data source surfaces as a [[SparkException]] whose error condition starts with
 * `JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR`.
 */
trait SharedJDBCIntegrationTests extends QueryTest {
24+
// JDBC URL of the database under test; abstract here, so the concrete suite must supply it.
protected def jdbcUrl: String
25+
26+
test("SPARK-52184: Wrap external engine syntax error") {
27+
// Loading the relation with an invalid query is expected to throw a SparkException.
val e = intercept[SparkException] {
28+
spark.read.format("jdbc")
29+
.option("url", jdbcUrl)
30+
.option("query", "THIS IS NOT VALID SQL").load()
31+
}
32+
// Prefix match: the condition name may carry a sub-condition suffix.
assert(e.getCondition.startsWith("JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR"))
33+
}
34+
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717

1818
package org.apache.spark.sql.execution.datasources.jdbc
1919

20-
import java.sql.{Connection, PreparedStatement, ResultSet}
20+
import java.sql.{Connection, PreparedStatement, ResultSet, SQLException}
2121

2222
import scala.util.Using
2323
import scala.util.control.NonFatal
2424

25-
import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
25+
import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, SparkException, TaskContext}
2626
import org.apache.spark.internal.{Logging, MDC}
2727
import org.apache.spark.internal.LogKeys.SQL_TEXT
2828
import org.apache.spark.rdd.RDD
@@ -59,7 +59,17 @@ object JDBCRDD extends Logging {
5959
val prepareQuery = options.prepareQuery
6060
val table = options.tableOrQuery
6161
val dialect = JdbcDialects.get(url)
62-
getQueryOutputSchema(prepareQuery + dialect.getSchemaQuery(table), options, dialect)
62+
val fullQuery = prepareQuery + dialect.getSchemaQuery(table)
63+
64+
try {
65+
getQueryOutputSchema(fullQuery, options, dialect)
66+
} catch {
67+
case e: SQLException if dialect.isSyntaxErrorBestEffort(e) =>
68+
throw new SparkException(
69+
errorClass = "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR.DURING_OUTPUT_SCHEMA_RESOLUTION",
70+
messageParameters = Map("jdbcQuery" -> fullQuery),
71+
cause = e)
72+
}
6373
}
6474

6575
def getQueryOutputSchema(
@@ -211,7 +221,6 @@ class JDBCRDD(
211221

212222
/**
213223
* Runs the SQL query against the JDBC driver.
214-
*
215224
*/
216225
override def compute(thePart: Partition, context: TaskContext): Iterator[InternalRow] = {
217226
var closed = false
@@ -284,7 +293,15 @@ class JDBCRDD(
284293
stmt.setQueryTimeout(options.queryTimeout)
285294

286295
val startTime = System.nanoTime
287-
rs = stmt.executeQuery()
296+
rs = try {
297+
stmt.executeQuery()
298+
} catch {
299+
case e: SQLException if dialect.isSyntaxErrorBestEffort(e) =>
300+
throw new SparkException(
301+
errorClass = "JDBC_EXTERNAL_ENGINE_SYNTAX_ERROR.DURING_QUERY_EXECUTION",
302+
messageParameters = Map("jdbcQuery" -> sqlText),
303+
cause = e)
304+
}
288305
val endTime = System.nanoTime
289306

290307
val executionTime = endTime - startTime

sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.jdbc
1919

20+
import java.sql.SQLException
21+
2022
import org.apache.spark.sql.types.{DataType, MetadataBuilder}
2123

2224
/**
@@ -55,6 +57,10 @@ private class AggregatedDialect(dialects: List[JdbcDialect])
5557
dialects.head.getSchemaQuery(table)
5658
}
5759

60+
// Syntax-error detection is delegated to the first dialect in the list only.
override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
61+
dialects.head.isSyntaxErrorBestEffort(exception)
62+
}
63+
5864
override def isCascadingTruncateTable(): Option[Boolean] = {
5965
// If any dialect claims cascading truncate, this dialect is also cascading truncate.
6066
// Otherwise, if any dialect has unknown cascading truncate, this dialect is also unknown.

sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,11 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLe
122122

123123
override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
124124

125+
// See https://www.ibm.com/docs/en/db2-for-zos/12.0.0?topic=codes-sqlstate-values-common-error
126+
// Returns true when the exception's SQLSTATE begins with "42"; a null SQLSTATE yields false.
override def isSyntaxErrorBestEffort(exception: SQLException): Boolean = {
127+
// Option(...) guards against getSQLState returning null.
Option(exception.getSQLState).exists(_.startsWith("42"))
128+
}
129+
125130
// scalastyle:off line.size.limit
126131
// See https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0053474.html
127132
// scalastyle:on line.size.limit

0 commit comments

Comments
 (0)