Skip to content

Commit f32411e

Browse files
authored
[chore] Move list offsets to admin (#35)
1 parent 1b26535 commit f32411e

File tree

9 files changed

+149
-141
lines changed

9 files changed

+149
-141
lines changed

bindings/python/src/table.rs

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@
1717

1818
use crate::TOKIO_RUNTIME;
1919
use crate::*;
20+
use fluss::client::EARLIEST_OFFSET;
21+
use fluss::rpc::message::OffsetSpec;
2022
use pyo3_async_runtimes::tokio::future_into_py;
21-
use std::collections::HashSet;
2223
use std::sync::Arc;
2324

24-
const EARLIEST_OFFSET: i64 = -2;
25-
2625
/// Represents a Fluss table for data operations
2726
#[pyclass]
2827
pub struct FlussTable {
@@ -70,8 +69,12 @@ impl FlussTable {
7069

7170
let rust_scanner = table_scan.create_log_scanner();
7271

73-
let py_scanner = LogScanner::from_core(rust_scanner, table_info.clone());
72+
let admin = conn
73+
.get_admin()
74+
.await
75+
.map_err(|e| FlussError::new_err(e.to_string()))?;
7476

77+
let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone());
7578
Python::with_gil(|py| Py::new(py, py_scanner))
7679
})
7780
}
@@ -275,6 +278,7 @@ impl AppendWriter {
275278
#[pyclass]
276279
pub struct LogScanner {
277280
inner: fcore::client::LogScanner,
281+
admin: fcore::client::FlussAdmin,
278282
table_info: fcore::metadata::TableInfo,
279283
#[allow(dead_code)]
280284
start_timestamp: Option<i64>,
@@ -327,50 +331,50 @@ impl LogScanner {
327331
let bucket_ids: Vec<i32> = (0..num_buckets).collect();
328332

329333
// todo: after supporting list_offsets with timestamp, we can use start_timestamp and end_timestamp here
330-
let target_offsets: HashMap<i32, i64> = TOKIO_RUNTIME
331-
.block_on(async { self.inner.list_offsets_latest(bucket_ids).await })
334+
let mut stopping_offsets: HashMap<i32, i64> = TOKIO_RUNTIME
335+
.block_on(async {
336+
self.admin
337+
.list_offsets(
338+
&self.table_info.table_path,
339+
bucket_ids.as_slice(),
340+
OffsetSpec::Latest,
341+
)
342+
.await
343+
})
332344
.map_err(|e| FlussError::new_err(e.to_string()))?;
333345

334-
let mut current_offsets: HashMap<i32, i64> = HashMap::new();
335-
let mut completed_buckets: HashSet<i32> = HashSet::new();
336-
337-
if !target_offsets.is_empty() {
346+
if !stopping_offsets.is_empty() {
338347
loop {
339348
let batch_result = TOKIO_RUNTIME
340349
.block_on(async { self.inner.poll(Duration::from_millis(500)).await });
341350

342351
match batch_result {
343352
Ok(scan_records) => {
344-
let mut filtered_records: HashMap<
345-
fcore::metadata::TableBucket,
346-
Vec<fcore::record::ScanRecord>,
347-
> = HashMap::new();
348-
for (bucket, records) in scan_records.records_by_buckets() {
349-
let bucket_id = bucket.bucket_id();
350-
if completed_buckets.contains(&bucket_id) {
353+
let mut result_records: Vec<fcore::record::ScanRecord> = vec![];
354+
for (bucket, records) in scan_records.into_records_by_buckets() {
355+
let stopping_offset = stopping_offsets.get(&bucket.bucket_id());
356+
357+
if stopping_offset.is_none() {
358+
// this bucket is no longer tracked: skip its records
359+
// since we have already reached its end offset
351360
continue;
352361
}
353362
if let Some(last_record) = records.last() {
354363
let offset = last_record.offset();
355-
current_offsets.insert(bucket_id, offset);
356-
filtered_records.insert(bucket.clone(), records.clone());
357-
if offset >= target_offsets[&bucket_id] - 1 {
358-
completed_buckets.insert(bucket_id);
364+
result_records.extend(records);
365+
if offset >= stopping_offset.unwrap() - 1 {
366+
stopping_offsets.remove(&bucket.bucket_id());
359367
}
360368
}
361369
}
362370

363-
if !filtered_records.is_empty() {
364-
let filtered_scan_records =
365-
fcore::record::ScanRecords::new(filtered_records);
366-
let arrow_batch =
367-
Utils::convert_scan_records_to_arrow(filtered_scan_records);
371+
if !result_records.is_empty() {
372+
let arrow_batch = Utils::convert_scan_records_to_arrow(result_records);
368373
all_batches.extend(arrow_batch);
369374
}
370375

371-
// completed bucket is equal to all target buckets,
372-
// we can break scan records
373-
if completed_buckets.len() == target_offsets.len() {
376+
// we have reached the end offsets of all buckets
377+
if stopping_offsets.is_empty() {
374378
break;
375379
}
376380
}
@@ -399,11 +403,13 @@ impl LogScanner {
399403
impl LogScanner {
400404
/// Create LogScanner from core LogScanner
401405
pub fn from_core(
402-
inner: fcore::client::LogScanner,
406+
inner_scanner: fcore::client::LogScanner,
407+
admin: fcore::client::FlussAdmin,
403408
table_info: fcore::metadata::TableInfo,
404409
) -> Self {
405410
Self {
406-
inner,
411+
inner: inner_scanner,
412+
admin,
407413
table_info,
408414
start_timestamp: None,
409415
end_timestamp: None,

bindings/python/src/utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,9 @@ impl Utils {
152152
.map_err(|e| FlussError::new_err(format!("Invalid kv format '{format_str}': {e}")))
153153
}
154154

155-
/// Convert ScanRecords to Arrow RecordBatch
155+
/// Convert Vec<ScanRecord> to Arrow RecordBatch
156156
pub fn convert_scan_records_to_arrow(
157-
_scan_records: fcore::record::ScanRecords,
157+
_scan_records: Vec<fcore::record::ScanRecord>,
158158
) -> Vec<Arc<arrow::record_batch::RecordBatch>> {
159159
let mut result = Vec::new();
160160
for record in _scan_records {

crates/fluss/src/client/admin.rs

Lines changed: 101 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,16 @@ use crate::rpc::message::{
2525
DropTableRequest, GetDatabaseInfoRequest, GetLatestLakeSnapshotRequest, GetTableRequest,
2626
ListDatabasesRequest, ListTablesRequest, TableExistsRequest,
2727
};
28+
use crate::rpc::message::{ListOffsetsRequest, OffsetSpec};
2829
use crate::rpc::{RpcClient, ServerConnection};
2930

30-
use std::collections::HashMap;
31-
use std::sync::Arc;
32-
31+
use crate::BucketId;
3332
use crate::error::Result;
3433
use crate::proto::GetTableInfoResponse;
34+
use std::collections::HashMap;
35+
use std::slice::from_ref;
36+
use std::sync::Arc;
37+
use tokio::task::JoinHandle;
3538

3639
pub struct FlussAdmin {
3740
admin_gateway: ServerConnection,
@@ -216,4 +219,99 @@ impl FlussAdmin {
216219
table_buckets_offset,
217220
))
218221
}
222+
223+
/// List offset for the specified buckets. This operation enables to find the beginning offset,
224+
/// end offset as well as the offset matching a timestamp in buckets.
225+
pub async fn list_offsets(
226+
&self,
227+
table_path: &TablePath,
228+
buckets_id: &[BucketId],
229+
offset_spec: OffsetSpec,
230+
) -> Result<HashMap<i32, i64>> {
231+
self.metadata
232+
.check_and_update_table_metadata(from_ref(table_path))
233+
.await?;
234+
235+
let cluster = self.metadata.get_cluster();
236+
let table_id = cluster.get_table(table_path).table_id;
237+
238+
// Prepare requests
239+
let requests_by_server =
240+
self.prepare_list_offsets_requests(table_id, None, buckets_id, offset_spec)?;
241+
242+
// Send Requests
243+
let response_futures = self.send_list_offsets_request(requests_by_server).await?;
244+
245+
let mut results = HashMap::new();
246+
247+
for response_future in response_futures {
248+
let offsets = response_future.await.map_err(
249+
// todo: consider use suitable error
250+
|e| crate::error::Error::WriteError(format!("Fail to get result: {e}")),
251+
)?;
252+
results.extend(offsets?);
253+
}
254+
Ok(results)
255+
}
256+
257+
fn prepare_list_offsets_requests(
258+
&self,
259+
table_id: i64,
260+
partition_id: Option<i64>,
261+
buckets: &[BucketId],
262+
offset_spec: OffsetSpec,
263+
) -> Result<HashMap<i32, ListOffsetsRequest>> {
264+
let cluster = self.metadata.get_cluster();
265+
let mut node_for_bucket_list: HashMap<i32, Vec<i32>> = HashMap::new();
266+
267+
for bucket_id in buckets {
268+
let table_bucket = TableBucket::new(table_id, *bucket_id);
269+
let leader = cluster.leader_for(&table_bucket).ok_or_else(|| {
270+
// todo: consider use another suitable error
271+
crate::error::Error::InvalidTableError(format!(
272+
"No leader found for table bucket: table_id={table_id}, bucket_id={bucket_id}"
273+
))
274+
})?;
275+
276+
node_for_bucket_list
277+
.entry(leader.id())
278+
.or_default()
279+
.push(*bucket_id);
280+
}
281+
282+
let mut list_offsets_requests = HashMap::new();
283+
for (leader_id, bucket_ids) in node_for_bucket_list {
284+
let request =
285+
ListOffsetsRequest::new(table_id, partition_id, bucket_ids, offset_spec.clone());
286+
list_offsets_requests.insert(leader_id, request);
287+
}
288+
Ok(list_offsets_requests)
289+
}
290+
291+
async fn send_list_offsets_request(
292+
&self,
293+
request_map: HashMap<i32, ListOffsetsRequest>,
294+
) -> Result<Vec<JoinHandle<Result<HashMap<i32, i64>>>>> {
295+
let mut tasks = Vec::new();
296+
297+
for (leader_id, request) in request_map {
298+
let rpc_client = self.rpc_client.clone();
299+
let metadata = self.metadata.clone();
300+
301+
let task = tokio::spawn(async move {
302+
let cluster = metadata.get_cluster();
303+
let tablet_server = cluster.get_tablet_server(leader_id).ok_or_else(|| {
304+
// todo: consider use more suitable error
305+
crate::error::Error::InvalidTableError(format!(
306+
"Tablet server {leader_id} not found"
307+
))
308+
})?;
309+
let connection = rpc_client.get_connection(tablet_server).await?;
310+
let list_offsets_response = connection.request(request).await?;
311+
list_offsets_response.offsets()
312+
});
313+
tasks.push(task);
314+
}
315+
Ok(tasks)
316+
}
219317
}

crates/fluss/src/client/metadata.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
use crate::cluster::{Cluster, ServerNode, ServerType};
1919
use crate::metadata::{TableBucket, TablePath};
20-
use crate::rpc::{RpcClient, ServerConnection, UpdateMetadataRequest};
20+
use crate::rpc::message::UpdateMetadataRequest;
21+
use crate::rpc::{RpcClient, ServerConnection};
2122
use parking_lot::RwLock;
2223
use std::collections::HashSet;
2324
use std::net::SocketAddr;

crates/fluss/src/client/table/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use std::sync::Arc;
2222

2323
use crate::error::Result;
2424

25+
pub const EARLIEST_OFFSET: i64 = -2;
26+
2527
mod append;
2628

2729
mod scanner;

0 commit comments

Comments
 (0)