Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,11 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

// Use v1 command to describe (temp) view, as v2 catalog doesn't support view yet.
case DescribeRelation(
ResolvedV1TableOrViewIdentifier(ident), partitionSpec, isExtended, output) =>
DescribeTableCommand(ident, partitionSpec, isExtended, output)
resolvedChild @ ResolvedV1TableOrViewIdentifier(ident),
partitionSpec,
isExtended,
output) =>
DescribeTableCommand(resolvedChild, ident, partitionSpec, isExtended, output)

case DescribeColumn(
ResolvedViewIdentifier(ident), column: UnresolvedAttribute, isExtended, output) =>
Expand Down Expand Up @@ -431,7 +434,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
output,
pattern.map(_.asInstanceOf[UnresolvedPartitionSpec].spec))

case ShowColumns(ResolvedViewIdentifier(ident), ns, output) =>
case ShowColumns(resolvedChild @ ResolvedViewIdentifier(ident), ns, output) =>
val resolver = conf.resolver
val db = ns match {
case Some(nsSeq) if ident.database.exists(!resolver(_, nsSeq.head)) =>
Expand All @@ -443,9 +446,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
// matches expected format (e.g. "SHOW COLUMNS IN showcolumn4 FROM global_temp").
val tableNameForCommand =
if (db.isDefined && ident.database == db) TableIdentifier(ident.table, None) else ident
ShowColumnsCommand(db, tableNameForCommand, output)
ShowColumnsCommand(resolvedChild, db, tableNameForCommand, output)

case ShowColumns(ResolvedV1TableIdentifier(ident), ns, output) =>
case ShowColumns(
resolvedChild @ ResolvedV1TableIdentifier(ident), ns, output) =>
val v1TableName = ident
val resolver = conf.resolver
val db = ns match {
Expand All @@ -454,7 +458,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
Seq(db.head), Seq(v1TableName.database.get))
case _ => ns.map(_.head)
}
ShowColumnsCommand(db, v1TableName, output)
ShowColumnsCommand(resolvedChild, db, v1TableName, output)

// V2 catalog doesn't support RECOVER PARTITIONS yet, we must use v1 command here.
case RecoverPartitions(ResolvedV1TableIdentifierInSessionCatalog(ident)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.permission.{AclEntry, AclEntryScope, AclEntryType, F

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.analysis.{ResolvedPersistentView, ResolvedTable, ResolvedTempView, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
Expand All @@ -39,8 +39,8 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIfNeeded, CaseInsensitiveMap, CharVarcharUtils, DateTimeUtils, ResolveDefaultColumns}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY
import org.apache.spark.sql.classic.ClassicConversions.castToImpl
import org.apache.spark.sql.connector.catalog.{TableCatalog, V1Table}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TableIdentifierHelper
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.CommandExecutionMode
import org.apache.spark.sql.execution.datasources.DataSource
Expand Down Expand Up @@ -577,7 +577,31 @@ case class TruncateTableCommand(
}
}

abstract class DescribeCommandBase extends LeafRunnableCommand {
object ResolvedChildHelper {
  /**
   * Extracts the [[CatalogTable]] metadata of the entity being described.
   *
   * Used by `ShowColumnsCommand` and `DescribeTableCommand`. The metadata is
   * taken from the resolved `child` plan when it already carries one
   * (resolved temp/persistent views, or a V1 table surfaced through the v2
   * session catalog); otherwise it falls back to a session-catalog lookup
   * keyed by `table`.
   *
   * @param child resolved logical plan representing the table or view
   * @param sparkSession session providing the catalog for fallback lookups
   * @param table identifier used only when `child` carries no metadata
   * @return the catalog metadata for the table or (temp) view
   */
  def getTableMetadata(
      child: LogicalPlan,
      sparkSession: SparkSession,
      table: TableIdentifier): CatalogTable = {
    val catalog = sparkSession.sessionState.catalog
    child match {
      case ResolvedTempView(_, metadata) => metadata
      case ResolvedPersistentView(_, _, metadata) => metadata
      case ResolvedTable(_, _, t: V1Table, _) => t.v1Table
      // Fallback: the plan carries no metadata, so consult the catalog directly.
      case _ if catalog.isTempView(table) =>
        catalog.getTempViewOrPermanentTableMetadata(table)
      case _ => catalog.getTableRawMetadata(table)
    }
  }
}

trait DescribeCommandBase {
def output: Seq[Attribute]

protected def describeSchema(
schema: StructType,
buffer: ArrayBuffer[Row],
Expand All @@ -602,24 +626,32 @@ abstract class DescribeCommandBase extends LeafRunnableCommand {
* }}}
*/
case class DescribeTableCommand(
override val child: LogicalPlan,
table: TableIdentifier,
partitionSpec: TablePartitionSpec,
isExtended: Boolean,
override val output: Seq[Attribute])
extends DescribeCommandBase {
extends UnaryRunnableCommand with DescribeCommandBase {

override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(child = newChild)

override def run(sparkSession: SparkSession): Seq[Row] = {
val result = new ArrayBuffer[Row]
val catalog = sparkSession.sessionState.catalog
// V2SessionCatalog.loadTable uses getTableMetadata which replaces char/varchar with string
// in the CatalogTable schema. Restore the original types from field metadata so that
// DESCRIBE TABLE reports char(n)/varchar(n) instead of string.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a good catch! shall we do CharVarcharUtils.getRawSchema in the util function ResolvedChildHelper.getTableMetadata? SHOW COLUMNS should report char/varchar instead of string as well.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SHOW COLUMNS only reports the column names I think. See

metadata.schema.map { c =>
      Row(c.name)
    }

Do you still think we should repeat this there for consistency?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, then let's leave it

val rawMetadata = ResolvedChildHelper.getTableMetadata(child, sparkSession, table)
val metadata = rawMetadata.copy(
schema = CharVarcharUtils.getRawSchema(rawMetadata.schema))

if (catalog.isTempView(table)) {
if (partitionSpec.nonEmpty) {
throw QueryCompilationErrors.descPartitionNotAllowedOnTempView(table.identifier)
}
val schema = catalog.getTempViewOrPermanentTableMetadata(table).schema
describeSchema(schema, result, header = false)
describeSchema(metadata.schema, result, header = false)
} else {
val metadata = catalog.getTableRawMetadata(table)
if (metadata.schema.isEmpty) {
// In older version(prior to 2.1) of Spark, the table schema can be empty and should be
// inferred at runtime. We should still support it.
Expand Down Expand Up @@ -747,7 +779,7 @@ case class DescribeTableCommand(
* 7. Common table expressions (CTEs)
*/
case class DescribeQueryCommand(queryText: String, plan: LogicalPlan)
extends DescribeCommandBase with SupervisingCommand with CTEInChildren {
extends LeafRunnableCommand with DescribeCommandBase with SupervisingCommand with CTEInChildren {

override val output = DescribeCommandSchema.describeTableAttributes()

Expand Down Expand Up @@ -988,18 +1020,22 @@ case class ShowTablePropertiesCommand(
* }}}
*/
/**
 * Command that lists the column names of a table or view.
 */
case class ShowColumnsCommand(
    override val child: LogicalPlan,
    databaseName: Option[String],
    tableName: TableIdentifier,
    override val output: Seq[Attribute])
  extends UnaryRunnableCommand {

  override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
    copy(child = newChild)

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // Re-qualify the identifier with the explicitly supplied database, if any.
    val lookupTable = databaseName.fold(tableName) { db =>
      TableIdentifier(tableName.identifier, Some(db))
    }
    ResolvedChildHelper
      .getTableMetadata(child, sparkSession, lookupTable)
      .schema
      .map(field => Row(field.name))
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ CreateDataSourceTableCommand `spark_catalog`.`default`.`test_change`, false
DESC test_change
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`test_change`, false, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.test_change, V1Table(default.test_change), [a#x, b#x, c#x]


-- !query
Expand All @@ -34,6 +35,7 @@ org.apache.spark.sql.catalyst.parser.ParseException
DESC test_change
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`test_change`, false, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.test_change, V1Table(default.test_change), [a#x, b#x, c#x]


-- !query
Expand All @@ -54,6 +56,7 @@ org.apache.spark.sql.AnalysisException
DESC test_change
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`test_change`, false, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.test_change, V1Table(default.test_change), [a#x, b#x, c#x]


-- !query
Expand Down Expand Up @@ -84,6 +87,7 @@ org.apache.spark.sql.AnalysisException
DESC test_change
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`test_change`, false, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.test_change, V1Table(default.test_change), [a#x, b#x, c#x]


-- !query
Expand Down Expand Up @@ -118,6 +122,7 @@ org.apache.spark.sql.AnalysisException
DESC test_change
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`test_change`, false, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.test_change, V1Table(default.test_change), [a#x, b#x, c#x]


-- !query
Expand All @@ -142,6 +147,7 @@ AlterTableChangeColumnCommand `spark_catalog`.`default`.`test_change`, c, Struct
DESC test_change
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`test_change`, false, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.test_change, V1Table(default.test_change), [a#x, b#x, c#x]


-- !query
Expand All @@ -160,6 +166,7 @@ AlterTableChangeColumnCommand `spark_catalog`.`default`.`test_change`, a, Struct
DESC test_change
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`test_change`, false, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.test_change, V1Table(default.test_change), [a#x, b#x, c#x]


-- !query
Expand Down Expand Up @@ -187,6 +194,7 @@ org.apache.spark.sql.AnalysisException
DESC test_change
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`test_change`, false, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.test_change, V1Table(default.test_change), [a#x, b#x, c#x]


-- !query
Expand All @@ -199,6 +207,7 @@ AlterTableChangeColumnCommand `spark_catalog`.`default`.`test_change`, a, Struct
DESC test_change
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`test_change`, false, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.test_change, V1Table(default.test_change), [a#x, b#x, c#x]


-- !query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ CreateDataSourceTableCommand `spark_catalog`.`default`.`char_tbl`, false
desc formatted char_tbl
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`char_tbl`, true, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.char_tbl, V1Table(default.char_tbl), [c#x, v#x]


-- !query
Expand Down Expand Up @@ -44,6 +45,7 @@ ShowCreateTable false, [createtab_stmt#x]
desc formatted char_tbl2
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`char_tbl2`, true, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.char_tbl2, V1Table(default.char_tbl2), [c#x, v#x]


-- !query
Expand All @@ -62,6 +64,7 @@ CreateTableLikeCommand `spark_catalog`.`default`.`char_tbl3`, `spark_catalog`.`d
desc formatted char_tbl3
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`char_tbl3`, true, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.char_tbl3, V1Table(default.char_tbl3), [c#x, v#x]


-- !query
Expand Down Expand Up @@ -94,6 +97,7 @@ org.apache.spark.sql.AnalysisException
desc formatted char_view
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`char_view`, true, [col_name#x, data_type#x, comment#x]
+- ResolvedPersistentView V2SessionCatalog(spark_catalog), default.char_view, `spark_catalog`.`default`.`char_view`


-- !query
Expand All @@ -118,6 +122,7 @@ AlterTableRenameCommand `spark_catalog`.`default`.`char_tbl`, `char_tbl1`, false
desc formatted char_tbl1
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`char_tbl1`, true, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.char_tbl1, V1Table(default.char_tbl1), [c#x, v#x]


-- !query
Expand Down Expand Up @@ -154,6 +159,7 @@ AlterTableChangeColumnCommand `spark_catalog`.`default`.`char_tbl1`, c, StructFi
desc formatted char_tbl1
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`char_tbl1`, true, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.char_tbl1, V1Table(default.char_tbl1), [c#x, v#x]


-- !query
Expand All @@ -166,6 +172,7 @@ AlterTableAddColumnsCommand `spark_catalog`.`default`.`char_tbl1`, [StructField(
desc formatted char_tbl1
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`char_tbl1`, true, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.char_tbl1, V1Table(default.char_tbl1), [c#x, v#x, d#x]


-- !query
Expand All @@ -182,6 +189,7 @@ AlterViewAsCommand `spark_catalog`.`default`.`char_view`, select * from char_tbl
desc formatted char_view
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`char_view`, true, [col_name#x, data_type#x, comment#x]
+- ResolvedPersistentView V2SessionCatalog(spark_catalog), default.char_view, `spark_catalog`.`default`.`char_view`


-- !query
Expand All @@ -194,6 +202,7 @@ AlterTableSetPropertiesCommand `spark_catalog`.`default`.`char_tbl1`, [yes=no],
desc formatted char_tbl1
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`char_tbl1`, true, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.char_tbl1, V1Table(default.char_tbl1), [c#x, v#x, d#x]


-- !query
Expand All @@ -206,6 +215,7 @@ AlterTableSetPropertiesCommand `spark_catalog`.`default`.`char_view`, [yes=no],
desc formatted char_view
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`char_view`, true, [col_name#x, data_type#x, comment#x]
+- ResolvedPersistentView V2SessionCatalog(spark_catalog), default.char_view, `spark_catalog`.`default`.`char_view`


-- !query
Expand All @@ -218,6 +228,7 @@ AlterTableUnsetPropertiesCommand `spark_catalog`.`default`.`char_tbl1`, [yes], f
desc formatted char_tbl1
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`char_tbl1`, true, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.char_tbl1, V1Table(default.char_tbl1), [c#x, v#x, d#x]


-- !query
Expand All @@ -230,6 +241,7 @@ AlterTableUnsetPropertiesCommand `spark_catalog`.`default`.`char_view`, [yes], f
desc formatted char_view
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`char_view`, true, [col_name#x, data_type#x, comment#x]
+- ResolvedPersistentView V2SessionCatalog(spark_catalog), default.char_view, `spark_catalog`.`default`.`char_view`


-- !query
Expand All @@ -242,6 +254,7 @@ AlterTableSerDePropertiesCommand `spark_catalog`.`default`.`char_tbl1`, Map(yes
desc formatted char_tbl1
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`char_tbl1`, true, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.char_tbl1, V1Table(default.char_tbl1), [c#x, v#x, d#x]


-- !query
Expand All @@ -261,6 +274,7 @@ org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
desc formatted char_part
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`char_part`, true, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.char_part, V1Table(default.char_part), [c1#x, v1#x, v2#x, c2#x]


-- !query
Expand Down Expand Up @@ -293,6 +307,7 @@ org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
desc formatted char_part
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`char_part`, true, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.char_part, V1Table(default.char_part), [c1#x, v1#x, v2#x, c2#x]


-- !query
Expand All @@ -313,6 +328,7 @@ org.apache.spark.sql.AnalysisException
desc formatted char_part
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`char_part`, true, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.char_part, V1Table(default.char_part), [c1#x, v1#x, v2#x, c2#x]


-- !query
Expand All @@ -325,6 +341,7 @@ AlterTableSetLocationCommand `spark_catalog`.`default`.`char_part`, Map(v2 -> ke
desc formatted char_part
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`char_part`, true, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.char_part, V1Table(default.char_part), [c1#x, v1#x, v2#x, c2#x]


-- !query
Expand All @@ -337,6 +354,7 @@ RepairTableCommand `spark_catalog`.`default`.`char_part`, true, false, MSCK REPA
desc formatted char_part
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`char_part`, true, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.char_part, V1Table(default.char_part), [c1#x, v1#x, v2#x, c2#x]


-- !query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_d
describe table t1
-- !query analysis
DescribeTableCommand `spark_catalog`.`default`.`t1`, false, [col_name#x, data_type#x, comment#x]
+- ResolvedTable V2SessionCatalog(spark_catalog), default.t1, V1Table(default.t1), [utf8_binary#x, utf8_lcase#x]


-- !query
Expand Down
Loading