Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
zml1206 committed Dec 8, 2023
1 parent f295898 commit a3aedbf
Showing 1 changed file with 1 addition and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.scalatest.matchers.must.Matchers.the
import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Lag, Literal, NonFoldableLiteral}
import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow, WindowGroupLimit}
import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec}
Expand Down Expand Up @@ -1300,18 +1300,6 @@ class DataFrameWindowFunctionsSuite extends QueryTest
Seq(-1, 100).foreach { threshold =>
withSQLConf(SQLConf.WINDOW_GROUP_LIMIT_THRESHOLD.key -> threshold.toString) {
// RowFrame
val existWindowGroupLimitRowFrame =
df.withColumn("sum_value", sum("value").over(window))
.limit(1)
.queryExecution.optimizedPlan.exists {
case _: WindowGroupLimit => true
case _ => false
}
if (threshold == -1) {
assert(!existWindowGroupLimitRowFrame)
} else {
assert(existWindowGroupLimitRowFrame)
}
checkAnswer(df.withColumn("rn", row_number().over(window)).limit(1),
Seq(
Row("a", 4, "", 1)
Expand Down Expand Up @@ -1345,18 +1333,6 @@ class DataFrameWindowFunctionsSuite extends QueryTest
)

// RangeFrame
val existWindowGroupLimitRangeFrame =
df.withColumn("sum_value", sum("value").over(window))
.limit(1)
.queryExecution.optimizedPlan.exists {
case _: WindowGroupLimit => true
case _ => false
}
if (threshold == -1) {
assert(!existWindowGroupLimitRangeFrame)
} else {
assert(existWindowGroupLimitRangeFrame)
}
checkAnswer(df.withColumn("sum_value", sum("value").over(window)).limit(1),
Seq(
Row("a", 4, "", 8)
Expand Down Expand Up @@ -1399,18 +1375,6 @@ class DataFrameWindowFunctionsSuite extends QueryTest
Row("a", 4, "", 4, 8)
)
)

// Choose LimitPushDownThroughWindow instead of WindowGroupLimit if the
// window function is rank-like and Window partitionSpec is empty.
val existWindowGroupLimit =
df.withColumn("rn", row_number().over(window3))
.limit(10)
.filter("rn < 5")
.queryExecution.optimizedPlan.exists {
case _: WindowGroupLimit => true
case _ => false
}
assert(!existWindowGroupLimit)
}
}
}
Expand Down

0 comments on commit a3aedbf

Please sign in to comment.