Skip to content

Commit 56bc18f

Browse files
committed
[SPARK-54130][SQL] Add detailed error messages for catalog assertion failures
1 parent b144965 commit 56bc18f

File tree

5 files changed

+222
-12
lines changed

5 files changed

+222
-12
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -192,7 +192,10 @@ class InMemoryCatalog(
192192
override def createTable(
193193
tableDefinition: CatalogTable,
194194
ignoreIfExists: Boolean): Unit = synchronized {
195-
assert(tableDefinition.identifier.database.isDefined)
195+
assert(tableDefinition.identifier.database.isDefined,
196+
"Table identifier " + tableDefinition.identifier.quotedString +
197+
" is missing database name. " +
198+
"Cannot create table without a database specified.")
196199
val db = tableDefinition.identifier.database.get
197200
requireDbExists(db)
198201
val table = tableDefinition.identifier.table
@@ -313,7 +316,10 @@ class InMemoryCatalog(
313316
}
314317

315318
override def alterTable(tableDefinition: CatalogTable): Unit = synchronized {
316-
assert(tableDefinition.identifier.database.isDefined)
319+
assert(tableDefinition.identifier.database.isDefined,
320+
"Table identifier " + tableDefinition.identifier.quotedString +
321+
" is missing database name. " +
322+
"Cannot alter table without a database specified.")
317323
val db = tableDefinition.identifier.database.get
318324
requireTableExists(db, tableDefinition.identifier.table)
319325
val updatedProperties = tableDefinition.properties.filter(kv => kv._1 != "comment")

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SQLFunction.scala

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -61,8 +61,15 @@ case class SQLFunction(
6161
owner: Option[String] = None,
6262
createTimeMs: Long = System.currentTimeMillis) extends UserDefinedFunction {
6363

64-
assert(exprText.nonEmpty || queryText.nonEmpty)
65-
assert((isTableFunc && returnType.isRight) || (!isTableFunc && returnType.isLeft))
64+
assert(exprText.nonEmpty || queryText.nonEmpty,
65+
"SQL function '" + name + "' is missing function body. " +
66+
"Either exprText or queryText must be defined. " +
67+
"Found: exprText=" + exprText + ", queryText=" + queryText + ".")
68+
assert((isTableFunc && returnType.isRight) || (!isTableFunc && returnType.isLeft),
69+
"SQL function '" + name + "' has mismatched function type and return type. " +
70+
"isTableFunc=" + isTableFunc + ", returnType.isRight=" + returnType.isRight + ", " +
71+
"returnType.isLeft=" + returnType.isLeft + ". " +
72+
"Table functions require Right[StructType] and scalar functions require Left[DataType].")
6673

6774
import SQLFunction._
6875

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 23 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1663,7 +1663,9 @@ class SessionCatalog(
16631663
.putString("__funcInputAlias", "true")
16641664
.build()
16651665
}
1666-
assert(!function.isTableFunc)
1666+
assert(!function.isTableFunc,
1667+
"Function '" + function.name + "' is a table function. " +
1668+
"Use makeSQLTableFunctionPlan() instead of makeSQLFunctionPlan().")
16671669
val funcName = function.name.funcName
16681670

16691671
// Use captured SQL configs when parsing a SQL function.
@@ -1674,7 +1676,10 @@ class SessionCatalog(
16741676
val inputParam = function.inputParam
16751677
val returnType = function.getScalarFuncReturnType
16761678
val (expression, query) = function.getExpressionAndQuery(parser, isTableFunc = false)
1677-
assert(expression.isDefined || query.isDefined)
1679+
assert(expression.isDefined || query.isDefined,
1680+
"SQL function '" + function.name + "' could not be parsed. " +
1681+
"Neither expression nor query could be extracted from function body. " +
1682+
"exprText=" + function.exprText + ", queryText=" + function.queryText + ".")
16781683

16791684
// Check function arguments
16801685
val paramSize = inputParam.map(_.size).getOrElse(0)
@@ -1763,12 +1768,17 @@ class SessionCatalog(
17631768
function: SQLFunction,
17641769
input: Seq[Expression],
17651770
outputAttrs: Seq[Attribute]): LogicalPlan = {
1766-
assert(function.isTableFunc)
1771+
assert(function.isTableFunc,
1772+
"Function '" + function.name + "' is a scalar function. " +
1773+
"Use makeSQLFunctionPlan() instead of makeSQLTableFunctionPlan().")
17671774
val funcName = function.name.funcName
17681775
val inputParam = function.inputParam
17691776
val returnParam = function.getTableFuncReturnCols
17701777
val (_, query) = function.getExpressionAndQuery(parser, isTableFunc = true)
1771-
assert(query.isDefined)
1778+
assert(query.isDefined,
1779+
"SQL table function '" + function.name + "' could not be parsed. " +
1780+
"Query could not be extracted from function body. " +
1781+
"queryText=" + function.queryText + ".")
17721782

17731783
// Check function arguments
17741784
val paramSize = inputParam.map(_.size).getOrElse(0)
@@ -1807,7 +1817,12 @@ class SessionCatalog(
18071817
query.get
18081818
}
18091819

1810-
assert(returnParam.length == outputAttrs.length)
1820+
assert(returnParam.length == outputAttrs.length,
1821+
"SQL table function '" + function.name + "' has mismatched return columns. " +
1822+
"Expected " + outputAttrs.length + " output attributes but found " +
1823+
returnParam.length + " return parameters. " +
1824+
"Return parameters: [" + returnParam.fieldNames.mkString(", ") + "], " +
1825+
"Output attributes: [" + outputAttrs.map(_.name).mkString(", ") + "].")
18111826
val output = returnParam.fields.zipWithIndex.map { case (param, i) =>
18121827
// Since we cannot get the output of a unresolved logical plan, we need
18131828
// to reference the output column of the lateral join by its position.
@@ -2390,7 +2405,9 @@ class SessionCatalog(
23902405
requireTableNotExists(newName)
23912406
val oldTable = getTableMetadata(oldName)
23922407
if (oldTable.tableType == CatalogTableType.MANAGED) {
2393-
assert(oldName.database.nonEmpty)
2408+
assert(oldName.database.nonEmpty,
2409+
"Table identifier " + oldName.quotedString + " is missing database name. " +
2410+
"Managed tables must have a database defined.")
23942411
val databaseLocation =
23952412
externalCatalog.getDatabase(oldName.database.get).locationUri
23962413
val newTableLocation = new Path(new Path(databaseLocation), format(newName.table))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1070,7 +1070,9 @@ case class UnresolvedCatalogRelation(
10701070
tableMeta: CatalogTable,
10711071
options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(),
10721072
override val isStreaming: Boolean = false) extends UnresolvedLeafNode {
1073-
assert(tableMeta.identifier.database.isDefined)
1073+
assert(tableMeta.identifier.database.isDefined,
1074+
"Table identifier " + tableMeta.identifier.quotedString + " is missing database name. " +
1075+
"UnresolvedCatalogRelation requires a fully qualified table identifier with database.")
10741076
}
10751077

10761078
/**
@@ -1097,7 +1099,9 @@ case class HiveTableRelation(
10971099
tableStats: Option[Statistics] = None,
10981100
@transient prunedPartitions: Option[Seq[CatalogTablePartition]] = None)
10991101
extends LeafNode with MultiInstanceRelation with NormalizeableRelation {
1100-
assert(tableMeta.identifier.database.isDefined)
1102+
assert(tableMeta.identifier.database.isDefined,
1103+
"Table identifier " + tableMeta.identifier.quotedString + " is missing database name. " +
1104+
"HiveTableRelation requires a fully qualified table identifier with database.")
11011105
assert(DataTypeUtils.sameType(tableMeta.partitionSchema, partitionCols.toStructType))
11021106
assert(DataTypeUtils.sameType(tableMeta.dataSchema, dataCols.toStructType))
11031107

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 176 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAM
3434
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
3535
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
3636
import org.apache.spark.sql.types._
37+
import org.apache.spark.util.Utils
3738

3839
class InMemorySessionCatalogSuite extends SessionCatalogSuite {
3940
protected val utils = new CatalogTestUtils {
@@ -2091,4 +2092,179 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
20912092
assert(exception.getMessage.matches(expectedPattern))
20922093
}
20932094
}
2095+
2096+
// Verifies that constructing an UnresolvedCatalogRelation from table metadata whose
// identifier has no database throws an AssertionError carrying the detailed message.
test("UnresolvedCatalogRelation requires database in identifier") {
2097+
val catalog = new SessionCatalog(newBasicCatalog())
2098+
val db = "test_db"
2099+
catalog.createDatabase(newDb(db), ignoreIfExists = false)
2100+
2101+
// Create a table with database
2102+
val validTable = CatalogTable(
2103+
identifier = TableIdentifier("test_table", Some(db)),
2104+
tableType = CatalogTableType.MANAGED,
2105+
storage = CatalogStorageFormat.empty,
2106+
schema = new StructType().add("id", IntegerType)
2107+
)
2108+
// NOTE(review): the relation below is built directly from a metadata copy and never
// consults `catalog`, so this catalog setup looks incidental to the assertion - confirm.
catalog.createTable(validTable, ignoreIfExists = false)
2109+
2110+
// Try to create UnresolvedCatalogRelation without database - should fail
2111+
// Same metadata as `validTable`; only the identifier's database is dropped.
val tableMetaWithoutDb = validTable.copy(
2112+
identifier = TableIdentifier("test_table", None)
2113+
)
2114+
2115+
// The assertion runs in the case-class constructor, so construction alone triggers it.
val exception = intercept[AssertionError] {
2116+
UnresolvedCatalogRelation(tableMetaWithoutDb)
2117+
}
2118+
2119+
// "assertion failed: " is the prefix Scala's Predef.assert puts in front of the custom
// message; the remainder must match the message text exactly.
val expectedMessage =
2120+
"assertion failed: Table identifier `test_table` is missing database name. " +
2121+
"UnresolvedCatalogRelation requires a fully qualified table identifier with database."
2122+
assert(exception.getMessage === expectedMessage)
2123+
}
2124+
2125+
// Verifies that constructing a HiveTableRelation from table metadata whose identifier
// has no database throws an AssertionError carrying the detailed message.
test("HiveTableRelation requires database in identifier") {
2126+
// NOTE(review): `catalog` is only used to create the database; the table metadata is
// never registered in it, so this setup looks incidental to the assertion - confirm.
val catalog = new SessionCatalog(newBasicCatalog())
2127+
val db = "test_db"
2128+
catalog.createDatabase(newDb(db), ignoreIfExists = false)
2129+
2130+
// Build table metadata with a database (it is not created in the catalog)
2131+
val validTable = CatalogTable(
2132+
identifier = TableIdentifier("test_table", Some(db)),
2133+
tableType = CatalogTableType.MANAGED,
2134+
storage = CatalogStorageFormat.empty,
2135+
schema = new StructType()
2136+
.add("id", IntegerType)
2137+
.add("name", StringType)
2138+
)
2139+
2140+
// Try to create HiveTableRelation without database - should fail
2141+
val tableMetaWithoutDb = validTable.copy(
2142+
identifier = TableIdentifier("test_table", None)
2143+
)
2144+
2145+
// NOTE(review): only `id` is passed as a data column although the schema also declares
// `name`; presumably harmless because the database assertion is checked before the
// schema assertions in HiveTableRelation - confirm.
val exception = intercept[AssertionError] {
2146+
HiveTableRelation(
2147+
tableMetaWithoutDb,
2148+
Seq(AttributeReference("id", IntegerType)()),
2149+
Seq.empty
2150+
)
2151+
}
2152+
2153+
// "assertion failed: " is the prefix Scala's Predef.assert puts in front of the custom
// message; the remainder must match the message text exactly.
val expectedMessage =
2154+
"assertion failed: Table identifier `test_table` is missing database name. " +
2155+
"HiveTableRelation requires a fully qualified table identifier with database."
2156+
assert(exception.getMessage === expectedMessage)
2157+
}
2158+
2159+
// Verifies that constructing a SQLFunction with neither exprText nor queryText throws an
// AssertionError whose message names the function and echoes both empty fields.
test("SQLFunction requires either exprText or queryText") {
2160+
// Neither exprText nor queryText provided (the only case exercised by this test)
2161+
val exception1 = intercept[AssertionError] {
2162+
SQLFunction(
2163+
name = FunctionIdentifier("test_func"),
2164+
inputParam = None,
2165+
returnType = scala.util.Left(IntegerType),
2166+
exprText = None,
2167+
queryText = None,
2168+
comment = None,
2169+
deterministic = Some(true),
2170+
containsSQL = Some(false),
2171+
isTableFunc = false,
2172+
properties = Map.empty
2173+
)
2174+
}
2175+
2176+
// "assertion failed: " is the prefix Scala's Predef.assert puts in front of the custom
// message; "exprText=None, queryText=None" is the rendering of the two empty Options.
val expectedMessage = "assertion failed: SQL function 'test_func' is missing function body. " +
2177+
"Either exprText or queryText must be defined. " +
2178+
"Found: exprText=None, queryText=None."
2179+
assert(exception1.getMessage === expectedMessage)
2180+
}
2181+
2182+
// Verifies that a SQLFunction flagged as a table function but given a Left (scalar)
// return type throws an AssertionError describing the mismatch.
test("SQLFunction return type must match function type") {
2183+
// Test case: isTableFunc=true but returnType is Left (scalar type)
2184+
// exprText is set, so the "missing function body" assertion passes and the failure
// comes from the function-type / return-type assertion.
val exception = intercept[AssertionError] {
2185+
SQLFunction(
2186+
name = FunctionIdentifier("test_func"),
2187+
inputParam = None,
2188+
returnType = scala.util.Left(IntegerType), // Scalar return type
2189+
exprText = Some("SELECT 1"),
2190+
queryText = None,
2191+
comment = None,
2192+
deterministic = Some(true),
2193+
containsSQL = Some(true),
2194+
isTableFunc = true, // But marked as table function
2195+
properties = Map.empty
2196+
)
2197+
}
2198+
2199+
// "assertion failed: " is the prefix Scala's Predef.assert puts in front of the custom
// message; the flag values in the message reflect the arguments passed above.
val expectedMessage =
2200+
"assertion failed: SQL function 'test_func' has mismatched function type " +
2201+
"and return type. " +
2202+
"isTableFunc=true, returnType.isRight=false, returnType.isLeft=true. " +
2203+
"Table functions require Right[StructType] and scalar functions require Left[DataType]."
2204+
assert(exception.getMessage === expectedMessage)
2205+
}
2206+
2207+
// Verifies that InMemoryCatalog.createTable throws an AssertionError with the detailed
// message when the table identifier carries no database.
test("InMemoryCatalog.createTable requires database in identifier") {
2208+
// Uses InMemoryCatalog directly rather than going through a SessionCatalog.
val catalog = new InMemoryCatalog()
2209+
val db = "test_db"
2210+
val dbDefinition = CatalogDatabase(
2211+
name = db,
2212+
description = "test database",
2213+
locationUri = Utils.createTempDir().toURI,
2214+
properties = Map.empty
2215+
)
2216+
catalog.createDatabase(dbDefinition, ignoreIfExists = false)
2217+
2218+
// Try to create table without database - should fail
2219+
val tableWithoutDb = CatalogTable(
2220+
identifier = TableIdentifier("test_table", None),
2221+
tableType = CatalogTableType.MANAGED,
2222+
storage = CatalogStorageFormat.empty,
2223+
schema = new StructType().add("id", IntegerType)
2224+
)
2225+
2226+
val exception = intercept[AssertionError] {
2227+
catalog.createTable(tableWithoutDb, ignoreIfExists = false)
2228+
}
2229+
2230+
// "assertion failed: " is the prefix Scala's Predef.assert puts in front of the custom
// message; the remainder must match the message text exactly.
val expectedMessage =
2231+
"assertion failed: Table identifier `test_table` is missing database name. " +
2232+
"Cannot create table without a database specified."
2233+
assert(exception.getMessage === expectedMessage)
2234+
}
2235+
2236+
// Verifies that InMemoryCatalog.alterTable throws an AssertionError with the detailed
// message when the table identifier carries no database.
test("InMemoryCatalog.alterTable requires database in identifier") {
2237+
// Uses InMemoryCatalog directly rather than going through a SessionCatalog.
val catalog = new InMemoryCatalog()
2238+
val db = "test_db"
2239+
val dbDefinition = CatalogDatabase(
2240+
name = db,
2241+
description = "test database",
2242+
locationUri = Utils.createTempDir().toURI,
2243+
properties = Map.empty
2244+
)
2245+
catalog.createDatabase(dbDefinition, ignoreIfExists = false)
2246+
2247+
// First create a valid table
2248+
val validTable = CatalogTable(
2249+
identifier = TableIdentifier("test_table", Some(db)),
2250+
tableType = CatalogTableType.MANAGED,
2251+
storage = CatalogStorageFormat.empty,
2252+
schema = new StructType().add("id", IntegerType)
2253+
)
2254+
catalog.createTable(validTable, ignoreIfExists = false)
2255+
2256+
// Try to alter table with identifier without database - should fail
2257+
// Same definition as the registered table; only the identifier's database is dropped.
val tableWithoutDb = validTable.copy(
2258+
identifier = TableIdentifier("test_table", None)
2259+
)
2260+
2261+
val exception = intercept[AssertionError] {
2262+
catalog.alterTable(tableWithoutDb)
2263+
}
2264+
2265+
// "assertion failed: " is the prefix Scala's Predef.assert puts in front of the custom
// message; the remainder must match the message text exactly.
val expectedMessage =
2266+
"assertion failed: Table identifier `test_table` is missing database name. " +
2267+
"Cannot alter table without a database specified."
2268+
assert(exception.getMessage === expectedMessage)
2269+
}
20942270
}

0 commit comments

Comments
 (0)