Skip to content

[SPARK-52065][SQL] Produce another plan tree with output columns (name, data type, nullability) in plan change logging #50852

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.rules.RuleId
import org.apache.spark.sql.catalyst.rules.UnknownRuleId
Expand Down Expand Up @@ -55,6 +56,32 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]

def output: Seq[Attribute]

/**
 * Renders this node as its name followed by its output attributes, each suffixed with
 * `[nullable=...]`, in the form `nodeName <output=attr1[nullable=...], attr2[nullable=...]>`.
 *
 * At most `maxColumns` attributes are listed. When the node has more, the list is closed
 * with the number of columns that were left out instead of a plain `>`.
 *
 * @param maxColumns maximum number of output attributes to include in the string
 * @return the one-line description, or `nodeName <output='Unresolved'>` when an
 *         `UnresolvedException` is raised while building it
 */
override def nodeWithOutputColumnsString(maxColumns: Int): String = {
  try {
    val attrs = this.output
    val truncated = attrs.length > maxColumns

    // Choose the columns to show and the closing delimiter once, then render in one place.
    val visible = if (truncated) attrs.take(maxColumns) else attrs
    val closing = if (truncated) s" ... ${attrs.length - maxColumns} more columns>" else ">"

    val rendered = visible.map { attr =>
      attr.toString + s"[nullable=${attr.nullable}]"
    }
    nodeName + rendered.mkString(" <output=", ", ", closing)
  } catch {
    case _: UnresolvedException =>
      // An UnresolvedException here is most likely raised by the call to `this.output`.
      // There is nothing meaningful to list in that case, so only the node name is shown.
      nodeName + " <output='Unresolved'>"
  }
}

/**
* Returns the set of attributes that are output by this node.
*/
Expand Down Expand Up @@ -797,9 +824,10 @@ object QueryPlan extends PredicateHelper {
verbose: Boolean,
addSuffix: Boolean,
maxFields: Int = SQLConf.get.maxToStringFields,
printOperatorId: Boolean = false): Unit = {
printOperatorId: Boolean = false,
printOutputColumns: Boolean = false): Unit = {
try {
plan.treeString(append, verbose, addSuffix, maxFields, printOperatorId)
plan.treeString(append, verbose, addSuffix, maxFields, printOperatorId, printOutputColumns)
} catch {
case e: AnalysisException => append(e.toString)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
log"""
|=== Applying Rule ${MDC(RULE_NAME, ruleName)} ===
|${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n"))}
|
|Output Information:
|${MDC(QUERY_PLAN, newPlan.treeStringWithOutputColumns)}
""".stripMargin
}

Expand All @@ -77,6 +80,9 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
log"""
|=== Result of Batch ${MDC(BATCH_NAME, batchName)} ===
|${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n"))}
|
|Output Information:
|${MDC(QUERY_PLAN, newPlan.treeStringWithOutputColumns)}
""".stripMargin
} else {
log"Batch ${MDC(BATCH_NAME, batchName)} has no effect."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -943,13 +943,18 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
/**
 * Returns a string representation of the nodes in this tree, produced by the
 * parameterized `treeString` overload with `verbose` enabled.
 */
final def treeString: String = {
  treeString(verbose = true)
}

/**
 * Returns the non-verbose string representation of this tree with `printOutputColumns`
 * enabled, so each node is rendered via `nodeWithOutputColumnsString`.
 */
final def treeStringWithOutputColumns: String =
  treeString(verbose = false, printOutputColumns = true)

final def treeString(
verbose: Boolean,
addSuffix: Boolean = false,
maxFields: Int = SQLConf.get.maxToStringFields,
printOperatorId: Boolean = false): String = {
printOperatorId: Boolean = false,
printOutputColumns: Boolean = false): String = {
val concat = new PlanStringConcat()
treeString(concat.append, verbose, addSuffix, maxFields, printOperatorId)
treeString(concat.append, verbose, addSuffix, maxFields, printOperatorId, printOutputColumns)
concat.toString
}

Expand All @@ -958,9 +963,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
verbose: Boolean,
addSuffix: Boolean,
maxFields: Int,
printOperatorId: Boolean): Unit = {
printOperatorId: Boolean,
printOutputColumns: Boolean): Unit = {
generateTreeString(0, new java.util.ArrayList(), append, verbose, "", addSuffix, maxFields,
printOperatorId, 0)
printOperatorId, printOutputColumns, 0)
}

/**
Expand Down Expand Up @@ -1011,6 +1017,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
*/
def innerChildren: Seq[TreeNode[_]] = Seq.empty

/**
 * One-line description of this node that includes its output columns.
 *
 * A generic tree node has no output columns, so this default falls back to
 * `simpleString(maxColumns)`, the same text `generateTreeString` emits when output
 * columns are not requested. Returning a string instead of throwing keeps tree
 * rendering (for example in plan-change logging) from failing on nodes that do not
 * override this method. Node types that do have output columns should override it.
 *
 * @param maxColumns limit forwarded to `simpleString`; overrides use it to cap the number
 *                   of output columns listed
 * @return the one-line description of this node
 */
def nodeWithOutputColumnsString(maxColumns: Int): String = simpleString(maxColumns)

/**
* Appends the string representation of this node and its children to the given Writer.
*
Expand All @@ -1029,6 +1039,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
printOutputColumns: Boolean,
indent: Int = 0): Unit = {
(0 until indent).foreach(_ => append(" "))
if (depth > 0) {
Expand All @@ -1044,6 +1055,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
if (addSuffix) verboseStringWithSuffix(maxFields) else verboseString(maxFields)
} else if (printNodeId) {
simpleStringWithNodeId()
} else if (printOutputColumns) {
nodeWithOutputColumnsString(maxFields)
} else {
simpleString(maxFields)
}
Expand All @@ -1057,15 +1070,17 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
lastChildren.add(false)
innerChildrenLocal.init.foreach(_.generateTreeString(
depth + 2, lastChildren, append, verbose,
addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId, indent = indent))
addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId,
printOutputColumns = printOutputColumns, indent = indent))
lastChildren.remove(lastChildren.size() - 1)
lastChildren.remove(lastChildren.size() - 1)

lastChildren.add(children.isEmpty)
lastChildren.add(true)
innerChildrenLocal.last.generateTreeString(
depth + 2, lastChildren, append, verbose,
addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId, indent = indent)
addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId,
printOutputColumns = printOutputColumns, indent = indent)
lastChildren.remove(lastChildren.size() - 1)
lastChildren.remove(lastChildren.size() - 1)
}
Expand All @@ -1074,14 +1089,16 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
lastChildren.add(false)
children.init.foreach(_.generateTreeString(
depth + 1, lastChildren, append, verbose, prefix, addSuffix,
maxFields, printNodeId = printNodeId, indent = indent)
maxFields, printNodeId = printNodeId, printOutputColumns = printOutputColumns,
indent = indent)
)
lastChildren.remove(lastChildren.size() - 1)

lastChildren.add(true)
children.last.generateTreeString(
depth + 1, lastChildren, append, verbose, prefix,
addSuffix, maxFields, printNodeId = printNodeId, indent = indent)
addSuffix, maxFields, printNodeId = printNodeId, printOutputColumns = printOutputColumns,
indent = indent)
lastChildren.remove(lastChildren.size() - 1)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNo
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
printOutputColumns: Boolean,
indent: Int = 0): Unit = {
super.generateTreeString(depth,
lastChildren,
Expand All @@ -70,11 +71,13 @@ case class EmptyRelationExec(@transient logical: LogicalPlan) extends LeafExecNo
addSuffix,
maxFields,
printNodeId,
printOutputColumns,
indent)
Option(logical).foreach { _ =>
lastChildren.add(true)
logical.generateTreeString(
depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, indent)
depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId,
printOutputColumns, indent)
lastChildren.remove(lastChildren.size() - 1)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
printOutputColumns: Boolean,
indent: Int = 0): Unit = {
child.generateTreeString(
depth,
Expand All @@ -557,6 +558,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
addSuffix = false,
maxFields,
printNodeId,
printOutputColumns,
indent)
}

Expand Down Expand Up @@ -818,6 +820,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
printOutputColumns: Boolean,
indent: Int = 0): Unit = {
child.generateTreeString(
depth,
Expand All @@ -828,6 +831,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
false,
maxFields,
printNodeId,
printOutputColumns,
indent)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ case class AdaptiveSparkPlanExec(
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
printOutputColumns: Boolean,
indent: Int = 0): Unit = {
super.generateTreeString(
depth,
Expand All @@ -440,6 +441,7 @@ case class AdaptiveSparkPlanExec(
addSuffix,
maxFields,
printNodeId,
printOutputColumns,
indent)
if (currentPhysicalPlan.fastEquals(initialPlan)) {
lastChildren.add(true)
Expand All @@ -452,6 +454,7 @@ case class AdaptiveSparkPlanExec(
addSuffix = false,
maxFields,
printNodeId,
printOutputColumns,
indent)
lastChildren.remove(lastChildren.size() - 1)
} else {
Expand All @@ -462,27 +465,29 @@ case class AdaptiveSparkPlanExec(
append,
verbose,
maxFields,
printNodeId)
printNodeId,
printOutputColumns)
generateTreeStringWithHeader(
"Initial Plan",
initialPlan,
depth,
append,
verbose,
maxFields,
printNodeId)
printNodeId,
printOutputColumns)
}
}


private def generateTreeStringWithHeader(
header: String,
plan: SparkPlan,
depth: Int,
append: String => Unit,
verbose: Boolean,
maxFields: Int,
printNodeId: Boolean): Unit = {
printNodeId: Boolean,
printOutputColumns: Boolean): Unit = {
append(" " * depth)
append(s"+- == $header ==\n")
plan.generateTreeString(
Expand All @@ -494,6 +499,7 @@ case class AdaptiveSparkPlanExec(
addSuffix = false,
maxFields,
printNodeId,
printOutputColumns,
indent = depth + 1)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ abstract class QueryStageExec extends LeafExecNode {
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
printOutputColumns: Boolean,
indent: Int = 0): Unit = {
super.generateTreeString(depth,
lastChildren,
Expand All @@ -143,10 +144,12 @@ abstract class QueryStageExec extends LeafExecNode {
addSuffix,
maxFields,
printNodeId,
printOutputColumns,
indent)
lastChildren.add(true)
plan.generateTreeString(
depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId, indent)
depth + 1, lastChildren, append, verbose, "", false, maxFields, printNodeId,
printOutputColumns, indent)
lastChildren.remove(lastChildren.size() - 1)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,7 @@ abstract class BaseSubqueryExec extends SparkPlan {
addSuffix: Boolean = false,
maxFields: Int,
printNodeId: Boolean,
printOutputColumns: Boolean,
indent: Int = 0): Unit = {
/**
* In the new explain mode `EXPLAIN FORMATTED`, the subqueries are not shown in the
Expand All @@ -807,6 +808,7 @@ abstract class BaseSubqueryExec extends SparkPlan {
false,
maxFields,
printNodeId,
printOutputColumns,
indent)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ class QueryExecutionSuite extends SharedSparkSession {
}
}
Seq("=== Applying Rule org.apache.spark.sql.execution",
"=== Result of Batch Preparations ===").foreach { expectedMsg =>
"=== Result of Batch Preparations ===",
"Output Information:").foreach { expectedMsg =>
assert(testAppender.loggingEvents.exists(
_.getMessage.getFormattedMessage.contains(expectedMsg)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1740,7 +1740,8 @@ class AdaptiveQueryExecSuite
Seq("=== Result of Batch AQE Preparations ===",
"=== Result of Batch AQE Post Stage Creation ===",
"=== Result of Batch AQE Replanning ===",
"=== Result of Batch AQE Query Stage Optimization ===").foreach { expectedMsg =>
"=== Result of Batch AQE Query Stage Optimization ===",
"Output Information:").foreach { expectedMsg =>
assert(testAppender.loggingEvents.exists(
_.getMessage.getFormattedMessage.contains(expectedMsg)))
}
Expand Down