Skip to content

Commit e4e017c

Browse files
authored
feat: support ColumnPruning (#57)
1 parent e4813e2 commit e4e017c

File tree

4 files changed

+390
-53
lines changed

4 files changed

+390
-53
lines changed

crates/examples/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ version = { workspace = true }
2727
fluss = { workspace = true }
2828
tokio = { workspace = true }
2929
clap = { workspace = true}
30-
31-
3230
[[example]]
3331
name = "example-table"
3432
path = "src/example_table.rs"

crates/fluss/src/client/table/scanner.rs

Lines changed: 104 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717

1818
use crate::client::connection::FlussConnection;
1919
use crate::client::metadata::Metadata;
20-
use crate::error::Result;
20+
use crate::error::{Error, Result};
2121
use crate::metadata::{TableBucket, TableInfo, TablePath};
2222
use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
2323
use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
2424
use crate::rpc::RpcClient;
2525
use crate::util::FairBucketStatusMap;
26+
use arrow_schema::SchemaRef;
2627
use parking_lot::RwLock;
2728
use std::collections::HashMap;
2829
use std::slice::from_ref;
@@ -39,6 +40,8 @@ pub struct TableScan<'a> {
3940
conn: &'a FlussConnection,
4041
table_info: TableInfo,
4142
metadata: Arc<Metadata>,
43+
/// Column indices to project. None means all columns, Some(vec) means only the specified columns (non-empty).
44+
projected_fields: Option<Vec<usize>>,
4245
}
4346

4447
impl<'a> TableScan<'a> {
@@ -47,14 +50,82 @@ impl<'a> TableScan<'a> {
4750
conn,
4851
table_info,
4952
metadata,
53+
projected_fields: None,
5054
}
5155
}
5256

53-
pub fn create_log_scanner(&self) -> LogScanner {
57+
/// Projects the scan to only include specified columns by their indices.
58+
///
59+
/// # Arguments
60+
/// * `column_indices` - Zero-based indices of columns to include in the scan
61+
///
62+
/// # Errors
63+
/// Returns an error if `column_indices` is empty or if any column index is out of range.
64+
///
65+
/// # Example
66+
/// ```
67+
/// let scanner = table.new_scan().project(&[0, 2, 3])?.create_log_scanner();
68+
/// ```
69+
pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
70+
if column_indices.is_empty() {
71+
return Err(Error::IllegalArgument(
72+
"Column indices cannot be empty".to_string(),
73+
));
74+
}
75+
let field_count = self.table_info.row_type().fields().len();
76+
for &idx in column_indices {
77+
if idx >= field_count {
78+
return Err(Error::IllegalArgument(format!(
79+
"Column index {} out of range (max: {})",
80+
idx,
81+
field_count - 1
82+
)));
83+
}
84+
}
85+
self.projected_fields = Some(column_indices.to_vec());
86+
Ok(self)
87+
}
88+
89+
/// Projects the scan to only include specified columns by their names.
90+
///
91+
/// # Arguments
92+
/// * `column_names` - Names of columns to include in the scan
93+
///
94+
/// # Errors
95+
/// Returns an error if `column_names` is empty or if any column name is not found in the table schema.
96+
///
97+
/// # Example
98+
/// ```
99+
/// let scanner = table.new_scan().project_by_name(&["col1", "col3"])?.create_log_scanner();
100+
/// ```
101+
pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
102+
if column_names.is_empty() {
103+
return Err(Error::IllegalArgument(
104+
"Column names cannot be empty".to_string(),
105+
));
106+
}
107+
let row_type = self.table_info.row_type();
108+
let mut indices = Vec::new();
109+
110+
for name in column_names {
111+
let idx = row_type
112+
.fields()
113+
.iter()
114+
.position(|f| f.name() == *name)
115+
.ok_or_else(|| Error::IllegalArgument(format!("Column '{name}' not found")))?;
116+
indices.push(idx);
117+
}
118+
119+
self.projected_fields = Some(indices);
120+
Ok(self)
121+
}
122+
123+
pub fn create_log_scanner(self) -> LogScanner {
54124
LogScanner::new(
55125
&self.table_info,
56126
self.metadata.clone(),
57127
self.conn.get_connections(),
128+
self.projected_fields,
58129
)
59130
}
60131
}
@@ -72,6 +143,7 @@ impl LogScanner {
72143
table_info: &TableInfo,
73144
metadata: Arc<Metadata>,
74145
connections: Arc<RpcClient>,
146+
projected_fields: Option<Vec<usize>>,
75147
) -> Self {
76148
let log_scanner_status = Arc::new(LogScannerStatus::new());
77149
Self {
@@ -84,6 +156,7 @@ impl LogScanner {
84156
connections.clone(),
85157
metadata.clone(),
86158
log_scanner_status.clone(),
159+
projected_fields,
87160
),
88161
}
89162
}
@@ -114,6 +187,7 @@ struct LogFetcher {
114187
table_info: TableInfo,
115188
metadata: Arc<Metadata>,
116189
log_scanner_status: Arc<LogScannerStatus>,
190+
read_context: ReadContext,
117191
}
118192

119193
impl LogFetcher {
@@ -122,13 +196,27 @@ impl LogFetcher {
122196
conns: Arc<RpcClient>,
123197
metadata: Arc<Metadata>,
124198
log_scanner_status: Arc<LogScannerStatus>,
199+
projected_fields: Option<Vec<usize>>,
125200
) -> Self {
201+
let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
202+
let read_context = Self::create_read_context(full_arrow_schema, projected_fields);
126203
LogFetcher {
127204
table_path: table_info.table_path.clone(),
128-
conns: conns.clone(),
129-
table_info: table_info.clone(),
130-
metadata: metadata.clone(),
131-
log_scanner_status: log_scanner_status.clone(),
205+
conns,
206+
table_info,
207+
metadata,
208+
log_scanner_status,
209+
read_context,
210+
}
211+
}
212+
213+
fn create_read_context(
214+
full_arrow_schema: SchemaRef,
215+
projected_fields: Option<Vec<usize>>,
216+
) -> ReadContext {
217+
match projected_fields {
218+
None => ReadContext::new(full_arrow_schema),
219+
Some(fields) => ReadContext::with_projection_pushdown(full_arrow_schema, fields),
132220
}
133221
}
134222

@@ -149,7 +237,7 @@ impl LogFetcher {
149237
for pb_fetch_log_resp in fetch_response.tables_resp {
150238
let table_id = pb_fetch_log_resp.table_id;
151239
let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
152-
let arrow_schema = to_arrow_schema(self.table_info.get_row_type());
240+
153241
for fetch_log_for_bucket in fetch_log_for_buckets {
154242
let mut fetch_records = vec![];
155243
let bucket: i32 = fetch_log_for_bucket.bucket_id;
@@ -158,8 +246,7 @@ impl LogFetcher {
158246
let data = fetch_log_for_bucket.records.unwrap();
159247
for log_record in &mut LogRecordsBatchs::new(&data) {
160248
let last_offset = log_record.last_log_offset();
161-
fetch_records
162-
.extend(log_record.records(ReadContext::new(arrow_schema.clone())));
249+
fetch_records.extend(log_record.records(&self.read_context)?);
163250
self.log_scanner_status
164251
.update_offset(&table_bucket, last_offset + 1);
165252
}
@@ -209,13 +296,19 @@ impl LogFetcher {
209296
if ready_for_fetch_count == 0 {
210297
HashMap::new()
211298
} else {
299+
let (projection_enabled, projected_fields) =
300+
match self.read_context.project_fields_in_order() {
301+
None => (false, vec![]),
302+
Some(fields) => (true, fields.iter().map(|&i| i as i32).collect()),
303+
};
304+
212305
fetch_log_req_for_buckets
213306
.into_iter()
214307
.map(|(leader_id, feq_for_buckets)| {
215308
let req_for_table = PbFetchLogReqForTable {
216309
table_id: table_id.unwrap(),
217-
projection_pushdown_enabled: false,
218-
projected_fields: vec![],
310+
projection_pushdown_enabled: projection_enabled,
311+
projected_fields: projected_fields.clone(),
219312
buckets_req: feq_for_buckets,
220313
};
221314

0 commit comments

Comments
 (0)