[SPARK-52159][SQL] Properly handle table existence check for jdbc dialects #50835


Closed · wants to merge 10 commits

V2JDBCTest.scala
@@ -1050,6 +1050,32 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFunSuite
}
}

test("SPARK-48618: Test table does not exists error") {
val tbl = s"$catalogName.tbl1"
val sqlStatement = s"SELECT * FROM $tbl"
val startPos = sqlStatement.indexOf(tbl)

withTable(tbl) {
sql(s"CREATE TABLE $tbl (col1 INT, col2 INT)")
sql(s"INSERT INTO $tbl VALUES (1, 2)")
val df = sql(sqlStatement)
val row = df.collect()
assert(row.length === 1)

// Drop the table
sql(s"DROP TABLE IF EXISTS $tbl")

checkError(
exception = intercept[AnalysisException] {
sql(sqlStatement).collect()
},
condition = "TABLE_OR_VIEW_NOT_FOUND",
parameters = Map("relationName" -> s"`$catalogName`.`tbl1`"),
context = ExpectedContext(tbl, startPos, startPos + tbl.length - 1)
)
}
}

def testDatetime(tbl: String): Unit = {}

test("scan with filter push-down with date time functions") {
JdbcUtils.scala
@@ -26,7 +26,7 @@ import java.util
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException, TaskContext}
@@ -64,18 +64,21 @@ object JdbcUtils extends Logging with SQLConfHelper {
def tableExists(conn: Connection, options: JdbcOptionsInWrite): Boolean = {
val dialect = JdbcDialects.get(options.url)

// Somewhat hacky, but there isn't a good way to identify whether a table exists for all
// SQL database systems using JDBC meta data calls, considering "table" could also include
// the database name. Query used to find table exists can be overridden by the dialects.
-Try {
+val executionResult = Try {
val statement = conn.prepareStatement(dialect.getTableExistsQuery(options.table))
try {
statement.setQueryTimeout(options.queryTimeout)
-statement.executeQuery()
+statement.executeQuery() // Success means table exists (query executed without error)
} finally {
statement.close()
}
-}.isSuccess
+}

executionResult match {
case Success(_) => true
case Failure(e: SQLException) if dialect.isObjectNotFoundException(e) => false
case Failure(e) => throw e // Re-throw unexpected exceptions
}
}

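Summarizing the contract that tableExists now implements (an illustrative comment, not part of the diff):

// Resulting contract of JdbcUtils.tableExists:
//   probe query succeeds                                 -> true
//   SQLException the dialect maps to "object not found"  -> false
//   any other failure (auth, network, bad SQL, ...)      -> rethrown
// Previously, every failure of the probe query was folded into "false".
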
DB2Dialect.scala
@@ -47,6 +47,10 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError
override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)

override def isObjectNotFoundException(e: SQLException): Boolean = {
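// DB2 SQLCODE -204: "<name> is an undefined name", i.e. the object does not exist.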
e.getErrorCode == -204
}

class DB2SQLBuilder extends JDBCSQLBuilder {

override def visitAggregateFunction(
DatabricksDialect.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.jdbc

-import java.sql.Connection
+import java.sql.{Connection, SQLException}

import scala.collection.mutable.ArrayBuilder

Expand All @@ -31,6 +31,10 @@ private case class DatabricksDialect() extends JdbcDialect with NoLegacyJDBCErro
url.startsWith("jdbc:databricks")
}

override def isObjectNotFoundException(e: SQLException): Boolean = {
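// SQLSTATE 42P01: table or view not found; 42704: undefined object.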
e.getSQLState == "42P01" || e.getSQLState == "42704"
}

override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
sqlType match {
DerbyDialect.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.jdbc

-import java.sql.Types
+import java.sql.{SQLException, Types}
import java.util.Locale

import org.apache.spark.sql.connector.catalog.Identifier
@@ -38,6 +38,12 @@ private case class DerbyDialect() extends JdbcDialect with NoLegacyJDBCError {
override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)

override def isObjectNotFoundException(e: SQLException): Boolean = {
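// Derby SQLSTATEs for a missing schema (42Y07) and a missing table or view (42X05, X0X05).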
e.getSQLState.equalsIgnoreCase("42Y07") ||
e.getSQLState.equalsIgnoreCase("42X05") ||
e.getSQLState.equalsIgnoreCase("X0X05")
}

override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
if (sqlType == Types.REAL) Option(FloatType) else None
H2Dialect.scala
@@ -57,6 +57,10 @@ private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError {
override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)

override def isObjectNotFoundException(e: SQLException): Boolean = {
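// H2 error codes indicating that the queried object was not found.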
Set(42102, 42103, 42104, 90079).contains(e.getErrorCode)
}

override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
sqlType match {
JdbcDialect.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.jdbc

-import java.sql.{Connection, Date, Driver, ResultSetMetaData, Statement, Timestamp}
+import java.sql.{Connection, Date, Driver, ResultSetMetaData, SQLException, Statement, Timestamp}
import java.time.{Instant, LocalDate, LocalDateTime}
import java.util
import java.util.ServiceLoader
@@ -758,6 +758,9 @@
throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3182")
}

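/**
 * Returns true if the given SQLException indicates that the object referenced
 * by the query (for example a table or view) does not exist. Dialects should
 * override this with vendor-specific error codes or SQL states. The default
 * of returning true for every SQLException preserves the previous behavior of
 * JdbcUtils.tableExists, where any failed probe query meant "table not found".
 */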
@Since("4.1.0")
def isObjectNotFoundException(e: SQLException): Boolean = true

/**
* Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
* @param e The dialect specific exception.
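How a dialect plugs in: the default above returns true, so dialects that do not override the hook keep the old tableExists semantics, where any failed probe reads as a missing table. A minimal sketch of a hypothetical third-party dialect follows; FooDialect, the jdbc:foo prefix, and error code 9999 are invented for illustration and are not part of this PR.

import java.sql.SQLException

// Hypothetical dialect sketch (illustrative only). Only the engine's
// "object not found" vendor code maps to "table does not exist"; any other
// SQLException, such as an authentication or network failure, now propagates
// out of JdbcUtils.tableExists instead of being mistaken for a missing table.
private case class FooDialect() extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:foo")

  override def isObjectNotFoundException(e: SQLException): Boolean =
    e.getErrorCode == 9999 // illustrative vendor code for "object not found"
}
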
MsSqlServerDialect.scala
@@ -38,6 +38,10 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCError
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver")

override def isObjectNotFoundException(e: SQLException): Boolean = {
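// SQL Server error 208: "Invalid object name '<name>'".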
e.getErrorCode == 208
}

// Microsoft SQL Server does not have the boolean type.
// Compile the boolean value to the bit data type instead.
// scalastyle:off line.size.limit
MySQLDialect.scala
@@ -50,6 +50,10 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError
override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)

override def isObjectNotFoundException(e: SQLException): Boolean = {
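// MySQL error 1146 (ER_NO_SUCH_TABLE): "Table '<name>' doesn't exist".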
e.getErrorCode == 1146
}

class MySQLSQLBuilder extends JDBCSQLBuilder {

override def visitExtract(extract: Extract): String = {
OracleDialect.scala
@@ -49,6 +49,11 @@ private case class OracleDialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError
override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)

override def isObjectNotFoundException(e: SQLException): Boolean = {
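// Matched on message text: ORA-00942 (table or view does not exist) and
// ORA-39165 (schema was not found).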
e.getMessage.contains("ORA-00942") ||
e.getMessage.contains("ORA-39165")
}

class OracleSQLBuilder extends JDBCSQLBuilder {

override def visitAggregateFunction(
PostgresDialect.scala
@@ -54,6 +54,12 @@ private case class PostgresDialect()
override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)

override def isObjectNotFoundException(e: SQLException): Boolean = {
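// PostgreSQL SQLSTATEs: 42P01 (undefined_table), 3F000 (invalid_schema_name),
// 42704 (undefined_object).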
e.getSQLState == "42P01" ||
e.getSQLState == "3F000" ||
e.getSQLState == "42704"
}

override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
sqlType match {
SnowflakeDialect.scala
@@ -17,6 +17,7 @@

package org.apache.spark.sql.jdbc

import java.sql.SQLException
import java.util.Locale

import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
@@ -26,6 +27,10 @@ private case class SnowflakeDialect() extends JdbcDialect with NoLegacyJDBCError
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:snowflake")

override def isObjectNotFoundException(e: SQLException): Boolean = {
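// Snowflake SQLSTATE 002003: object does not exist or is not authorized.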
e.getSQLState == "002003"
}

override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
case BooleanType =>
// By default, BOOLEAN is mapped to BIT(1).
TeradataDialect.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.jdbc

-import java.sql.Types
+import java.sql.{SQLException, Types}
import java.util.Locale

import org.apache.spark.sql.connector.catalog.Identifier
@@ -39,6 +39,10 @@ private case class TeradataDialect() extends JdbcDialect with NoLegacyJDBCError
override def isSupportedFunction(funcName: String): Boolean =
supportedFunctions.contains(funcName)

override def isObjectNotFoundException(e: SQLException): Boolean = {
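// Teradata error 3807: "Object '<name>' does not exist".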
e.getErrorCode == 3807
}

override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
case StringType => Some(JdbcType("VARCHAR(255)", java.sql.Types.VARCHAR))
case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))