Skip to content

Conversation

@Weijun-H
Copy link
Member

@Weijun-H Weijun-H commented Oct 24, 2025

Which issue does this PR close?

Rationale for this change

Add BaselineMetrics into RepartitionExec

What changes are included in this PR?

> explain analyze
select v1, count(*)
from generate_series(1,100) as t1(v1)
group by v1;
+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                     |
+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | ProjectionExec: expr=[v1@0 as v1, count(Int64(1))@1 as count(*)], metrics=[output_rows=100, elapsed_compute=33.25µs]                                                                                                                                     |
|                   |   AggregateExec: mode=FinalPartitioned, gby=[v1@0 as v1], aggr=[count(Int64(1))], metrics=[output_rows=100, elapsed_compute=3.517459ms, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, peak_mem_used=53376]                                         |
|                   |     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=100, elapsed_compute=346.915µs]                                                                                                                                                    |
|                   |       RepartitionExec: partitioning=Hash([v1@0], 11), input_partitions=11, metrics=[output_rows=100, elapsed_compute=5.418µs, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=5.720625ms, repartition_time=539.261µs, send_time=18.568µs] |
|                   |         AggregateExec: mode=Partial, gby=[v1@0 as v1], aggr=[count(Int64(1))], metrics=[output_rows=100, elapsed_compute=883.25µs, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, skipped_aggregation_rows=0, peak_mem_used=7136]                   |
|                   |           RepartitionExec: partitioning=RoundRobinBatch(11), input_partitions=1, metrics=[output_rows=100, elapsed_compute=1.635µs, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=169.959µs, repartition_time=1ns, send_time=19.843µs]  |
|                   |             ProjectionExec: expr=[value@0 as v1], metrics=[output_rows=100, elapsed_compute=17.541µs]                                                                                                                                                    |
|                   |               LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100, batch_size=8192], metrics=[output_rows=100, elapsed_compute=142µs]                                                                                      |
|                   |                                                                                                                                                                                                                                                          |
+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched. 
Elapsed 0.032 seconds.

Are these changes tested?

Added tests for spilling, order preservation, and default repartitioning.

Are there any user-facing changes?

Yes, explain analyze would produce more baseline metric for repartition

@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Oct 24, 2025
@Weijun-H Weijun-H marked this pull request as draft October 24, 2025 06:32
@Weijun-H Weijun-H force-pushed the 18218-baselinemetric-into-repartition branch from 2ebcedf to 293347f Compare October 24, 2025 07:56
@Weijun-H Weijun-H changed the title feat: add baseline metrics tracking to RepartitionExec feat: add baseline metrics tracking to RepartitionExec Oct 24, 2025
@Weijun-H Weijun-H marked this pull request as ready for review October 24, 2025 07:59
@Weijun-H Weijun-H requested a review from 2010YOUY01 October 24, 2025 08:00
@2010YOUY01
Copy link
Contributor

Thank you!

Here the elpased_compute metric doesn't seem correct. It should represent the time operator using the CPU and do the actual work, so it must be larger than the repartition_time. I think it's mostly repartition_time plus time doing spilling writes and reads.
In this line elapsed_compute is smaller than repartition_time

|                   |       RepartitionExec: partitioning=Hash([v1@0], 11), input_partitions=11, metrics=[output_rows=100, elapsed_compute=5.418µs, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=5.720625ms, repartition_time=539.261µs, send_time=18.568µs] |

Besides I suggest to wait until #18207 is merged, there seem to be a lot of conflict.

@Weijun-H Weijun-H marked this pull request as draft October 24, 2025 12:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

RepartitionExec is missing BaselineMetrics

2 participants