Skip to content

Commit b5204be

Browse files
committed
feat: support subscribe from remote
1 parent 12d464e commit b5204be

File tree

18 files changed

+1045
-50
lines changed

18 files changed

+1045
-50
lines changed

bindings/python/src/table.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,11 @@ impl FlussTable {
6767

6868
let table_scan = fluss_table.new_scan();
6969

70-
let rust_scanner = table_scan.create_log_scanner();
70+
let rust_scanner = table_scan.create_log_scanner().map_err(|e| {
71+
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
72+
"Failed to create log scanner: {e:?}"
73+
))
74+
})?;
7175

7276
let admin = conn
7377
.get_admin()

crates/examples/src/example_table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ pub async fn main() -> Result<()> {
7070
try_join!(f1, f2, append_writer.flush())?;
7171

7272
// scan rows
73-
let log_scanner = table.new_scan().create_log_scanner();
73+
let log_scanner = table.new_scan().create_log_scanner()?;
7474
log_scanner.subscribe(0, 0).await?;
7575

7676
loop {

crates/fluss/Cargo.toml

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ version = { workspace = true }
2222
name = "fluss"
2323
build = "src/build.rs"
2424

25+
[features]
26+
default = ["storage-memory", "storage-fs"]
27+
storage-all = ["storage-memory", "storage-fs"]
28+
29+
storage-memory = ["opendal/services-memory"]
30+
storage-fs = ["opendal/services-fs"]
31+
integration_tests = []
32+
2533
[dependencies]
2634
arrow = { workspace = true }
2735
arrow-schema = "57.0.0"
@@ -45,16 +53,16 @@ ordered-float = { version = "4", features = ["serde"] }
4553
parse-display = "0.10"
4654
ref-cast = "1.0"
4755
chrono = { workspace = true }
48-
oneshot = "0.1.11"
56+
opendal = { version = "0.49", features = ["services-fs"] }
57+
url = "2.5.7"
58+
async-trait = "0.1.89"
59+
uuid = { version = "1.10", features = ["v4"] }
4960

5061
[dev-dependencies]
5162
testcontainers = "0.25.0"
5263
once_cell = "1.19"
5364
test-env-helpers = "0.2.2"
5465

55-
[features]
56-
integration_tests = []
57-
5866

5967
[build-dependencies]
6068
prost-build = { version = "0.13.5" }

crates/fluss/src/client/table/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub const EARLIEST_OFFSET: i64 = -2;
2626

2727
mod append;
2828

29+
mod remote_log;
2930
mod scanner;
3031
mod writer;
3132

Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
use crate::error::{Error, Result};
18+
use crate::io::{FileIO, Storage};
19+
use crate::metadata::TableBucket;
20+
use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
21+
use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord};
22+
use std::collections::HashMap;
23+
use std::io;
24+
use std::path::{Path, PathBuf};
25+
use tokio::io::AsyncWriteExt;
26+
use tokio::sync::oneshot;
27+
use tracing::warn;
28+
29+
/// Represents a remote log segment that needs to be downloaded
#[derive(Debug, Clone)]
pub struct RemoteLogSegment {
    /// Unique id of the segment in remote storage; also embedded in the
    /// local staging file name (see `local_file_name`).
    pub segment_id: String,
    /// First log offset contained in this segment; zero-padded into both
    /// the local and remote file names.
    pub start_offset: i64,
    /// Last log offset of this segment — NOTE(review): inclusive vs.
    /// exclusive is not visible here; confirm against the server protocol.
    #[allow(dead_code)]
    pub end_offset: i64,
    /// Size of the segment file in bytes, as reported by the server.
    #[allow(dead_code)]
    pub size_in_bytes: i32,
    /// The table bucket this segment belongs to.
    pub table_bucket: TableBucket,
}
40+
41+
impl RemoteLogSegment {
42+
pub fn from_proto(segment: &PbRemoteLogSegment, table_bucket: TableBucket) -> Self {
43+
Self {
44+
segment_id: segment.remote_log_segment_id.clone(),
45+
start_offset: segment.remote_log_start_offset,
46+
end_offset: segment.remote_log_end_offset,
47+
size_in_bytes: segment.segment_size_in_bytes,
48+
table_bucket,
49+
}
50+
}
51+
52+
/// Get the local file name for this remote log segment
53+
pub fn local_file_name(&self) -> String {
54+
// Format: ${remote_segment_id}_${offset_prefix}.log
55+
let offset_prefix = format!("{:020}", self.start_offset);
56+
format!("{}_{}.log", self.segment_id, offset_prefix)
57+
}
58+
}
59+
60+
/// Represents remote log fetch information
#[derive(Debug, Clone)]
pub struct RemoteLogFetchInfo {
    /// Root directory (local path or URL such as `s3://…`) under which this
    /// bucket's remote log segments are stored.
    pub remote_log_tablet_dir: String,
    /// Partition name when the table is partitioned — NOTE(review): unused
    /// here; verify intended use against the fetcher code.
    #[allow(dead_code)]
    pub partition_name: Option<String>,
    /// Segments that must be downloaded to serve the fetch.
    pub remote_log_segments: Vec<RemoteLogSegment>,
    /// Start position within the first segment; defaults to 0 when the
    /// server omits it. Presumably a byte offset — confirm with the protocol.
    pub first_start_pos: i32,
}
69+
70+
impl RemoteLogFetchInfo {
71+
pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket) -> Result<Self> {
72+
let segments = info
73+
.remote_log_segments
74+
.iter()
75+
.map(|s| RemoteLogSegment::from_proto(s, table_bucket.clone()))
76+
.collect();
77+
78+
Ok(Self {
79+
remote_log_tablet_dir: info.remote_log_tablet_dir.clone(),
80+
partition_name: info.partition_name.clone(),
81+
remote_log_segments: segments,
82+
first_start_pos: info.first_start_pos.unwrap_or(0),
83+
})
84+
}
85+
}
86+
87+
/// Future for a remote log download request
pub struct RemoteLogDownloadFuture {
    // Receives the downloaded file's path from the spawned download task;
    // taken (left as None) once the result has been consumed.
    receiver: Option<oneshot::Receiver<Result<PathBuf>>>,
    // Deletes the downloaded local file; the consumer takes it and invokes
    // it after reading the file's contents.
    recycle_callback: Option<Box<dyn FnOnce() + Send>>,
}
92+
93+
impl RemoteLogDownloadFuture {
94+
pub fn new(
95+
receiver: oneshot::Receiver<Result<PathBuf>>,
96+
recycle_callback: Box<dyn FnOnce() + Send>,
97+
) -> Self {
98+
Self {
99+
receiver: Some(receiver),
100+
recycle_callback: Some(recycle_callback),
101+
}
102+
}
103+
104+
/// Get the downloaded file path (blocking)
105+
pub async fn get_file_path(&mut self) -> Result<PathBuf> {
106+
let receiver = self.receiver.take().ok_or_else(|| {
107+
Error::Io(io::Error::other(
108+
"Download future already consumed",
109+
))
110+
})?;
111+
112+
receiver.await.map_err(|e| {
113+
Error::Io(io::Error::other(
114+
format!("Download future cancelled: {e:?}"),
115+
))
116+
})?
117+
}
118+
119+
/// Get the recycle callback
120+
pub fn take_recycle_callback(&mut self) -> Option<Box<dyn FnOnce() + Send>> {
121+
self.recycle_callback.take()
122+
}
123+
}
124+
125+
/// Downloader for remote log segment files
pub struct RemoteLogDownloader {
    // Local directory where downloaded segment files are staged; removed
    // recursively (with all contents) when the downloader is dropped.
    local_log_dir: PathBuf,
}
129+
130+
impl RemoteLogDownloader {
    /// Creates a downloader that stages files under `local_log_dir`,
    /// creating the directory (and parents) if needed.
    ///
    /// # Errors
    /// Returns an error if the directory cannot be created.
    pub fn new<P: AsRef<Path>>(local_log_dir: P) -> Result<Self> {
        let local_log_dir = local_log_dir.as_ref().to_path_buf();
        std::fs::create_dir_all(&local_log_dir)?;

        Ok(Self { local_log_dir })
    }

    /// Request to fetch a remote log segment to local. This method is non-blocking.
    ///
    /// Spawns the actual download on the tokio runtime (so it must be called
    /// from within a runtime context) and returns a future-like handle whose
    /// `get_file_path` resolves to the staged file path. The handle's recycle
    /// callback deletes the staged file when invoked.
    pub fn request_remote_log(
        &self,
        remote_log_tablet_dir: &str,
        segment: &RemoteLogSegment,
    ) -> Result<RemoteLogDownloadFuture> {
        let (sender, receiver) = oneshot::channel();
        let local_file_name = segment.local_file_name();
        let local_file_path = self.local_log_dir.join(&local_file_name);
        let remote_path = self.build_remote_path(remote_log_tablet_dir, segment);
        let remote_log_tablet_dir = remote_log_tablet_dir.to_string();
        let recycle_callback: Box<dyn FnOnce() + Send> = Box::new({
            let local_file_path = local_file_path.clone();
            move || {
                // Clean up the downloaded file; best-effort, errors ignored.
                let _ = std::fs::remove_file(&local_file_path);
            }
        });

        // Spawn async download task; the result (or error) is delivered
        // through the oneshot channel. If the receiver was dropped, the
        // send error is deliberately ignored.
        tokio::spawn(async move {
            let result =
                Self::download_file(&remote_log_tablet_dir, &remote_path, &local_file_path).await;
            let _ = sender.send(result);
        });

        Ok(RemoteLogDownloadFuture::new(receiver, recycle_callback))
    }

    /// Build the remote path for a log segment
    fn build_remote_path(&self, remote_log_tablet_dir: &str, segment: &RemoteLogSegment) -> String {
        // Format: ${remote_log_tablet_dir}/${segment_id}/${offset_prefix}.log
        // NOTE: the 20-digit zero padding must stay in sync with
        // RemoteLogSegment::local_file_name.
        let offset_prefix = format!("{:020}", segment.start_offset);
        format!(
            "{}/{}/{}.log",
            remote_log_tablet_dir, segment.segment_id, offset_prefix
        )
    }

    /// Download a file from remote storage to local using streaming read/write
    ///
    /// # Errors
    /// Propagates storage construction, stat, read, and local file I/O errors.
    async fn download_file(
        remote_log_tablet_dir: &str,
        remote_path: &str,
        local_path: &Path,
    ) -> Result<PathBuf> {
        // Handle both URL (e.g., "s3://bucket/path") and local file paths
        // If the path doesn't contain "://", treat it as a local file path
        let remote_log_tablet_dir_url = if remote_log_tablet_dir.contains("://") {
            remote_log_tablet_dir.to_string()
        } else {
            format!("file://{remote_log_tablet_dir}")
        };

        // Create FileIO from the remote log tablet dir URL to get the storage
        let file_io_builder = FileIO::from_url(&remote_log_tablet_dir_url)?;

        // Build storage and create operator directly
        let storage = Storage::build(file_io_builder)?;
        let (op, relative_path) = storage.create(remote_path)?;

        // Get file metadata to know the size
        let meta = op.stat(relative_path).await?;
        let file_size = meta.content_length();

        // Create local file for writing
        let mut local_file = tokio::fs::File::create(local_path).await?;

        // Stream data from remote to local file in chunks
        // opendal::Reader::read accepts a range, so we read in chunks.
        // NOTE(review): each chunk issues a separate ranged read request;
        // for object stores a larger chunk size or a streaming reader may be
        // cheaper — confirm against opendal's recommended usage.
        const CHUNK_SIZE: u64 = 64 * 1024; // 64KB chunks for efficient streaming
        let mut offset = 0u64;

        while offset < file_size {
            let end = std::cmp::min(offset + CHUNK_SIZE, file_size);
            let range = offset..end;

            // Read chunk from remote storage
            let chunk = op.read_with(relative_path).range(range.clone()).await?;
            let bytes = chunk.to_bytes();

            // Write chunk to local file
            local_file.write_all(&bytes).await?;

            offset = end;
        }

        // Ensure all data is flushed to disk
        local_file.sync_all().await?;

        Ok(local_path.to_path_buf())
    }
}
230+
231+
impl Drop for RemoteLogDownloader {
232+
fn drop(&mut self) {
233+
// Clean up the local log directory
234+
if self.local_log_dir.exists() {
235+
std::fs::remove_dir_all(&self.local_log_dir).unwrap_or_else(|_| {
236+
warn!("Failed to delete local log dir: {:?}", &self.local_log_dir)
237+
});
238+
}
239+
}
240+
}
241+
242+
/// Pending fetch that waits for remote log file to be downloaded
pub struct RemotePendingFetch {
    // The segment being fetched; its table_bucket keys the result map.
    segment: RemoteLogSegment,
    // Handle that resolves to the downloaded file's local path and carries
    // the cleanup callback for that file.
    download_future: RemoteLogDownloadFuture,
    // Byte position within the segment file at which record parsing starts.
    pos_in_log_segment: i32,
    // Requested fetch offset — NOTE(review): currently unused here; confirm
    // whether records before this offset should be filtered out.
    #[allow(dead_code)]
    fetch_offset: i64,
    // High watermark at fetch time — unused here as of this revision.
    #[allow(dead_code)]
    high_watermark: i64,
    // Context used to decode log record batches into ScanRecords.
    read_context: ReadContext,
}
253+
254+
impl RemotePendingFetch {
255+
pub fn new(
256+
segment: RemoteLogSegment,
257+
download_future: RemoteLogDownloadFuture,
258+
pos_in_log_segment: i32,
259+
fetch_offset: i64,
260+
high_watermark: i64,
261+
read_context: ReadContext,
262+
) -> Self {
263+
Self {
264+
segment,
265+
download_future,
266+
pos_in_log_segment,
267+
fetch_offset,
268+
high_watermark,
269+
read_context,
270+
}
271+
}
272+
273+
/// Convert to completed fetch by reading the downloaded file
274+
pub async fn convert_to_completed_fetch(mut self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
275+
let file_path = self.download_future.get_file_path().await?;
276+
let file_data = std::fs::read(&file_path)?;
277+
278+
// Slice the data if needed
279+
let data = if self.pos_in_log_segment > 0 {
280+
&file_data[self.pos_in_log_segment as usize..]
281+
} else {
282+
&file_data
283+
};
284+
285+
// Parse log records
286+
let mut fetch_records = vec![];
287+
for log_record in &mut LogRecordsBatchs::new(data) {
288+
fetch_records.extend(log_record.records(&self.read_context)?);
289+
}
290+
291+
// Clean up the file
292+
if let Some(callback) = self.download_future.take_recycle_callback() {
293+
callback();
294+
}
295+
296+
let mut result = HashMap::new();
297+
result.insert(self.segment.table_bucket.clone(), fetch_records);
298+
Ok(result)
299+
}
300+
}

0 commit comments

Comments
 (0)