@@ -22,11 +22,22 @@ use common_util::{
     runtime::{AbortOnDropMany, JoinHandle, Runtime},
     time::InstantExt,
 };
+use datafusion::{
+    common::ToDFSchema,
+    physical_expr::{create_physical_expr, execution_props::ExecutionProps},
+    physical_plan::{
+        file_format::{parquet::page_filter::PagePruningPredicate, ParquetFileMetrics},
+        metrics::ExecutionPlanMetricsSet,
+    },
+};
 use futures::{future::BoxFuture, FutureExt, Stream, StreamExt, TryFutureExt};
 use log::{debug, error};
 use object_store::{ObjectStoreRef, Path};
 use parquet::{
-    arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder, ProjectionMask},
+    arrow::{
+        arrow_reader::RowSelection, async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder,
+        ProjectionMask,
+    },
     file::metadata::RowGroupMetaData,
 };
 use parquet_ext::meta_data::ChunkReader;
@@ -71,6 +82,7 @@ pub struct Reader<'a> {
 
     /// Options for `read_parallelly`
     metrics: Metrics,
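+    /// Metrics collected by the DataFusion helpers used in this reader
+    /// (e.g. page index pruning), dumped at debug level when the reader
+    /// is dropped.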
+    df_plan_metrics: ExecutionPlanMetricsSet,
 }
 
 #[derive(Default, Debug, Clone, TraceMetricWhenDrop)]
@@ -94,7 +106,7 @@ impl<'a> Reader<'a> {
         metrics_collector: Option<MetricsCollector>,
     ) -> Self {
         let store = store_picker.pick_by_freq(options.frequency);
-
+        let df_plan_metrics = ExecutionPlanMetricsSet::new();
         let metrics = Metrics {
             metrics_collector,
             ..Default::default()
@@ -112,6 +124,7 @@ impl<'a> Reader<'a> {
             meta_data: None,
             row_projector: None,
             metrics,
+            df_plan_metrics,
         }
     }
 
@@ -182,6 +195,36 @@ impl<'a> Reader<'a> {
         suggested.min(num_row_groups).max(1)
     }
 
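+    /// Build a page-level `RowSelection` for the given row groups by
+    /// evaluating this reader's predicate against the parquet page index.
+    /// Returns `Ok(None)` when there is no predicate to prune with.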
+    fn build_row_selection(
+        &self,
+        arrow_schema: SchemaRef,
+        row_groups: &[usize],
+        file_metadata: &parquet_ext::ParquetMetaData,
+    ) -> Result<Option<RowSelection>> {
+        // TODO: remove fixed partition
+        let partition = 0;
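+        // AND all predicate expressions into one filter expression; if there
+        // is no predicate at all, there is nothing to prune with.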
+        let exprs = datafusion::optimizer::utils::conjunction(self.predicate.exprs().to_vec());
+        let exprs = match exprs {
+            Some(exprs) => exprs,
+            None => return Ok(None),
+        };
+
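+        // Plan the filter against the file's arrow schema, then wrap it in a
+        // `PagePruningPredicate` that can be evaluated on the page index.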
+        let df_schema = arrow_schema
+            .clone()
+            .to_dfschema()
+            .context(DataFusionError)?;
+        let physical_expr =
+            create_physical_expr(&exprs, &df_schema, &arrow_schema, &ExecutionProps::new())
+                .context(DataFusionError)?;
+        let page_predicate = PagePruningPredicate::try_new(&physical_expr, arrow_schema.clone())
+            .context(DataFusionError)?;
+
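+        // Prune pages in the target row groups; the returned `RowSelection`
+        // tells the parquet reader which row ranges it can skip decoding.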
+        let metrics = ParquetFileMetrics::new(partition, self.path.as_ref(), &self.df_plan_metrics);
+        page_predicate
+            .prune(row_groups, file_metadata, &metrics)
+            .context(DataFusionError)
+    }
+
     async fn fetch_record_batch_streams(
         &mut self,
         suggested_parallelism: usize,
@@ -190,10 +233,10 @@ impl<'a> Reader<'a> {
 
         let meta_data = self.meta_data.as_ref().unwrap();
         let row_projector = self.row_projector.as_ref().unwrap();
-
+        let arrow_schema = meta_data.custom().schema.to_arrow_schema_ref();
         // Get target row groups.
         let target_row_groups = self.prune_row_groups(
-            meta_data.custom().schema.to_arrow_schema_ref(),
+            arrow_schema.clone(),
             meta_data.parquet().row_groups(),
             meta_data.custom().parquet_filter.as_ref(),
         )?;
@@ -226,6 +269,7 @@ impl<'a> Reader<'a> {
             target_row_group_chunks[chunk_idx].push(row_group);
         }
 
+        let parquet_metadata = meta_data.parquet();
         let proj_mask = ProjectionMask::leaves(
             meta_data.parquet().file_metadata().schema_descr(),
             row_projector.existed_source_projection().iter().copied(),
@@ -239,9 +283,15 @@ impl<'a> Reader<'a> {
         for chunk in target_row_group_chunks {
             let object_store_reader =
                 ObjectStoreReader::new(self.store.clone(), self.path.clone(), meta_data.clone());
-            let builder = ParquetRecordBatchStreamBuilder::new(object_store_reader)
+            let mut builder = ParquetRecordBatchStreamBuilder::new(object_store_reader)
                 .await
                 .with_context(|| ParquetError)?;
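+            // Use the page index to build a row selection for this chunk so
+            // the reader can skip decoding pages the predicate rules out.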
+            let row_selection =
+                self.build_row_selection(arrow_schema.clone(), &chunk, parquet_metadata)?;
+            if let Some(selection) = row_selection {
+                builder = builder.with_row_selection(selection);
+            }
+
             let stream = builder
                 .with_batch_size(self.num_rows_per_row_group)
                 .with_row_groups(chunk)
@@ -353,6 +403,16 @@ impl<'a> Reader<'a> {
     }
 }
 
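+// Dump the DataFusion plan metrics collected during the read (e.g. page
+// pruning counters) when the reader goes out of scope.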
+impl<'a> Drop for Reader<'a> {
+    fn drop(&mut self) {
+        debug!(
+            "Parquet reader dropped, path:{:?}, df_plan_metrics:{}",
+            self.path,
+            self.df_plan_metrics.clone_inner().to_string()
+        );
+    }
+}
+
 #[derive(Clone)]
 struct ObjectStoreReader {
     storage: ObjectStoreRef,