@@ -64,59 +64,64 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
6464 }
6565
6666 private def _apply (plan : SparkPlan ): SparkPlan = {
67- if (! isCometLoaded(conf) || ! isCometScanEnabled(conf)) {
68- if (! isCometLoaded(conf)) {
69- withInfo(plan, " Comet is not enabled" )
70- } else if (! isCometScanEnabled(conf)) {
71- withInfo(plan, " Comet Scan is not enabled" )
72- }
73- plan
74- } else {
67+ if (! isCometLoaded(conf)) return plan
7568
76- def hasMetadataCol (plan : SparkPlan ): Boolean = {
77- plan.expressions.exists(_.exists {
78- case a : Attribute =>
79- a.isMetadataCol
80- case _ => false
81- })
82- }
69+ def isSupportedScanNode (plan : SparkPlan ): Boolean = plan match {
70+ case _ : FileSourceScanExec => true
71+ case _ : BatchScanExec => true
72+ case _ => false
73+ }
8374
84- def isIcebergMetadataTable (scanExec : BatchScanExec ): Boolean = {
85- // List of Iceberg metadata tables:
86- // https://iceberg.apache.org/docs/latest/spark-queries/#inspecting-tables
87- val metadataTableSuffix = Set (
88- " history" ,
89- " metadata_log_entries" ,
90- " snapshots" ,
91- " entries" ,
92- " files" ,
93- " manifests" ,
94- " partitions" ,
95- " position_deletes" ,
96- " all_data_files" ,
97- " all_delete_files" ,
98- " all_entries" ,
99- " all_manifests" )
100-
101- metadataTableSuffix.exists(suffix => scanExec.table.name().endsWith(suffix))
102- }
75+ def hasMetadataCol (plan : SparkPlan ): Boolean = {
76+ plan.expressions.exists(_.exists {
77+ case a : Attribute =>
78+ a.isMetadataCol
79+ case _ => false
80+ })
81+ }
10382
104- plan.transform {
105- case scan if hasMetadataCol(scan) =>
106- withInfo(scan, " Metadata column is not supported" )
83+ def isIcebergMetadataTable (scanExec : BatchScanExec ): Boolean = {
84+ // List of Iceberg metadata tables:
85+ // https://iceberg.apache.org/docs/latest/spark-queries/#inspecting-tables
86+ val metadataTableSuffix = Set (
87+ " history" ,
88+ " metadata_log_entries" ,
89+ " snapshots" ,
90+ " entries" ,
91+ " files" ,
92+ " manifests" ,
93+ " partitions" ,
94+ " position_deletes" ,
95+ " all_data_files" ,
96+ " all_delete_files" ,
97+ " all_entries" ,
98+ " all_manifests" )
99+
100+ metadataTableSuffix.exists(suffix => scanExec.table.name().endsWith(suffix))
101+ }
107102
108- // data source V1
109- case scanExec : FileSourceScanExec =>
110- transformV1Scan(scanExec )
103+ def transformScan ( plan : SparkPlan ) : SparkPlan = plan match {
104+ case scan if ! isCometScanEnabled(conf) =>
105+ withInfo(scan, " Comet Scan is not enabled " )
111106
112- // data source V2
113- case scanExec : BatchScanExec =>
114- if (isIcebergMetadataTable(scanExec)) {
115- withInfo(scanExec, " Iceberg Metadata tables are not supported" )
116- } else {
117- transformV2Scan(scanExec)
118- }
119- }
107+ case scan if hasMetadataCol(scan) =>
108+ withInfo(scan, " Metadata column is not supported" )
109+
110+ // data source V1
111+ case scanExec : FileSourceScanExec =>
112+ transformV1Scan(scanExec)
113+
114+ // data source V2
115+ case scanExec : BatchScanExec =>
116+ if (isIcebergMetadataTable(scanExec)) {
117+ withInfo(scanExec, " Iceberg Metadata tables are not supported" )
118+ } else {
119+ transformV2Scan(scanExec)
120+ }
121+ }
122+
123+ plan.transform {
124+ case scan if isSupportedScanNode(scan) => transformScan(scan)
120125 }
121126 }
122127
0 commit comments