
Commit 31d0abf

feat: support subscribe from remote
1 parent 12d464e commit 31d0abf

File tree

18 files changed (+1045, -50 lines)


bindings/python/src/table.rs

Lines changed: 5 additions & 1 deletion
@@ -67,7 +67,11 @@ impl FlussTable {

         let table_scan = fluss_table.new_scan();

-        let rust_scanner = table_scan.create_log_scanner();
+        let rust_scanner = table_scan.create_log_scanner().map_err(|e| {
+            PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
+                "Failed to create log scanner: {e:?}"
+            ))
+        })?;

         let admin = conn
             .get_admin()

crates/examples/src/example_table.rs

Lines changed: 1 addition & 1 deletion
@@ -70,7 +70,7 @@ pub async fn main() -> Result<()> {
     try_join!(f1, f2, append_writer.flush())?;

     // scan rows
-    let log_scanner = table.new_scan().create_log_scanner();
+    let log_scanner = table.new_scan().create_log_scanner()?;
     log_scanner.subscribe(0, 0).await?;

     loop {
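
Since create_log_scanner() is now fallible, the ? operator is the idiomatic way to propagate its error. For readers less familiar with the pattern, here is a minimal sketch of the equivalent explicit handling, using only the API shown in this diff:

    // Equivalent to `create_log_scanner()?`, with the error handled explicitly.
    let log_scanner = match table.new_scan().create_log_scanner() {
        Ok(scanner) => scanner,
        Err(e) => {
            eprintln!("failed to create log scanner: {e:?}");
            return Err(e);
        }
    };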

crates/fluss/Cargo.toml

Lines changed: 12 additions & 4 deletions
@@ -22,6 +22,14 @@ version = { workspace = true }
 name = "fluss"
 build = "src/build.rs"

+[features]
+default = ["storage-memory", "storage-fs"]
+storage-all = ["storage-memory", "storage-fs"]
+
+storage-memory = ["opendal/services-memory"]
+storage-fs = ["opendal/services-fs"]
+integration_tests = []
+
 [dependencies]
 arrow = { workspace = true }
 arrow-schema = "57.0.0"

@@ -45,16 +53,16 @@ ordered-float = { version = "4", features = ["serde"] }
 parse-display = "0.10"
 ref-cast = "1.0"
 chrono = { workspace = true }
-oneshot = "0.1.11"
+opendal = { version = "0.49", features = ["services-fs"] }
+url = "2.5.7"
+async-trait = "0.1.89"
+uuid = { version = "1.10", features = ["v4"] }

 [dev-dependencies]
 testcontainers = "0.25.0"
 once_cell = "1.19"
 test-env-helpers = "0.2.2"

-[features]
-integration_tests = []
-
 [build-dependencies]
 prost-build = { version = "0.13.5" }
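
The new feature flags let downstream crates compile only the opendal storage backends they need. A hedged sketch of a consumer manifest (the version string is a placeholder, not taken from this commit):

    # Hypothetical consumer Cargo.toml: drop the default backends, keep only
    # the local-filesystem service via the new storage-fs feature.
    [dependencies]
    fluss = { version = "x.y", default-features = false, features = ["storage-fs"] }

Note that the opendal dependency itself still hardcodes services-fs in this commit, so the filesystem backend is compiled in regardless of the feature selection.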

crates/fluss/src/client/table/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ pub const EARLIEST_OFFSET: i64 = -2;

 mod append;

+mod remote_log;
 mod scanner;
 mod writer;

crates/fluss/src/client/table/remote_log.rs

Lines changed: 301 additions & 0 deletions

@@ -0,0 +1,301 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::error::{Error, Result};
use crate::io::{FileIO, Storage};
use crate::metadata::TableBucket;
use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord};
use std::collections::HashMap;
use std::io;
use std::path::{Path, PathBuf};
use tokio::io::AsyncWriteExt;
use tokio::sync::oneshot;
use tracing::warn;

/// Represents a remote log segment that needs to be downloaded
#[derive(Debug, Clone)]
pub struct RemoteLogSegment {
    pub segment_id: String,
    pub start_offset: i64,
    #[allow(dead_code)]
    pub end_offset: i64,
    #[allow(dead_code)]
    pub size_in_bytes: i32,
    pub table_bucket: TableBucket,
}

impl RemoteLogSegment {
    pub fn from_proto(segment: &PbRemoteLogSegment, table_bucket: TableBucket) -> Self {
        Self {
            segment_id: segment.remote_log_segment_id.clone(),
            start_offset: segment.remote_log_start_offset,
            end_offset: segment.remote_log_end_offset,
            size_in_bytes: segment.segment_size_in_bytes,
            table_bucket,
        }
    }

    /// Get the local file name for this remote log segment
    pub fn local_file_name(&self) -> String {
        // Format: ${remote_segment_id}_${offset_prefix}.log
        let offset_prefix = format!("{:020}", self.start_offset);
        format!("{}_{}.log", self.segment_id, offset_prefix)
    }
}

/// Represents remote log fetch information
#[derive(Debug, Clone)]
pub struct RemoteLogFetchInfo {
    pub remote_log_tablet_dir: String,
    #[allow(dead_code)]
    pub partition_name: Option<String>,
    pub remote_log_segments: Vec<RemoteLogSegment>,
    pub first_start_pos: i32,
}

impl RemoteLogFetchInfo {
    pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket) -> Result<Self> {
        let segments = info
            .remote_log_segments
            .iter()
            .map(|s| RemoteLogSegment::from_proto(s, table_bucket.clone()))
            .collect();

        Ok(Self {
            remote_log_tablet_dir: info.remote_log_tablet_dir.clone(),
            partition_name: info.partition_name.clone(),
            remote_log_segments: segments,
            first_start_pos: info.first_start_pos.unwrap_or(0),
        })
    }
}

/// Future for a remote log download request
pub struct RemoteLogDownloadFuture {
    receiver: Option<oneshot::Receiver<Result<PathBuf>>>,
    recycle_callback: Option<Box<dyn FnOnce() + Send>>,
}

impl RemoteLogDownloadFuture {
    pub fn new(
        receiver: oneshot::Receiver<Result<PathBuf>>,
        recycle_callback: Box<dyn FnOnce() + Send>,
    ) -> Self {
        Self {
            receiver: Some(receiver),
            recycle_callback: Some(recycle_callback),
        }
    }

    /// Get the downloaded file path, awaiting the spawned download task
    pub async fn get_file_path(&mut self) -> Result<PathBuf> {
        let receiver = self
            .receiver
            .take()
            .ok_or_else(|| Error::Io(io::Error::other("Download future already consumed")))?;

        receiver.await.map_err(|e| {
            Error::Io(io::Error::other(format!(
                "Download future cancelled: {e:?}"
            )))
        })?
    }

    /// Take the recycle callback, leaving `None` in its place
    pub fn take_recycle_callback(&mut self) -> Option<Box<dyn FnOnce() + Send>> {
        self.recycle_callback.take()
    }
}

/// Downloader for remote log segment files
pub struct RemoteLogDownloader {
    local_log_dir: PathBuf,
}

impl RemoteLogDownloader {
    pub fn new<P: AsRef<Path>>(local_log_dir: P) -> Result<Self> {
        let local_log_dir = local_log_dir.as_ref().to_path_buf();
        std::fs::create_dir_all(&local_log_dir)?;

        Ok(Self { local_log_dir })
    }

    /// Request to fetch a remote log segment to local. This method is non-blocking.
    pub fn request_remote_log(
        &self,
        remote_log_tablet_dir: &str,
        segment: &RemoteLogSegment,
    ) -> Result<RemoteLogDownloadFuture> {
        let (sender, receiver) = oneshot::channel();
        let local_file_name = segment.local_file_name();
        let local_file_path = self.local_log_dir.join(&local_file_name);
        let remote_path = self.build_remote_path(remote_log_tablet_dir, segment);
        let remote_log_tablet_dir = remote_log_tablet_dir.to_string();
        let recycle_callback: Box<dyn FnOnce() + Send> = Box::new({
            let local_file_path = local_file_path.clone();
            move || {
                // Clean up the downloaded file
                let _ = std::fs::remove_file(&local_file_path);
            }
        });

        // Spawn async download task
        tokio::spawn(async move {
            let result =
                Self::download_file(&remote_log_tablet_dir, &remote_path, &local_file_path).await;
            let _ = sender.send(result);
        });

        Ok(RemoteLogDownloadFuture::new(receiver, recycle_callback))
    }

    /// Build the remote path for a log segment
    fn build_remote_path(&self, remote_log_tablet_dir: &str, segment: &RemoteLogSegment) -> String {
        // Format: ${remote_log_tablet_dir}/${segment_id}/${offset_prefix}.log
        let offset_prefix = format!("{:020}", segment.start_offset);
        format!(
            "{}/{}/{}.log",
            remote_log_tablet_dir, segment.segment_id, offset_prefix
        )
    }

    /// Download a file from remote storage to local using streaming read/write
    async fn download_file(
        remote_log_tablet_dir: &str,
        remote_path: &str,
        local_path: &Path,
    ) -> Result<PathBuf> {
        // Handle both URL (e.g., "s3://bucket/path") and local file paths.
        // If the path doesn't contain "://", treat it as a local file path.
        let remote_log_tablet_dir_url = if remote_log_tablet_dir.contains("://") {
            remote_log_tablet_dir.to_string()
        } else {
            format!("file://{remote_log_tablet_dir}")
        };

        // Create FileIO from the remote log tablet dir URL to get the storage
        let file_io_builder = FileIO::from_url(&remote_log_tablet_dir_url)?;

        // Build storage and create operator directly
        let storage = Storage::build(file_io_builder)?;
        let (op, relative_path) = storage.create(remote_path)?;

        // Get file metadata to know the size
        let meta = op.stat(relative_path).await?;
        let file_size = meta.content_length();

        // Create local file for writing
        let mut local_file = tokio::fs::File::create(local_path).await?;

        // Stream data from remote to local file in chunks;
        // opendal reads accept a byte range, so we fetch the file chunk by chunk.
        const CHUNK_SIZE: u64 = 64 * 1024; // 64KB chunks for efficient streaming
        let mut offset = 0u64;

        while offset < file_size {
            let end = std::cmp::min(offset + CHUNK_SIZE, file_size);
            let range = offset..end;

            // Read chunk from remote storage
            let chunk = op.read_with(relative_path).range(range.clone()).await?;
            let bytes = chunk.to_bytes();

            // Write chunk to local file
            local_file.write_all(&bytes).await?;

            offset = end;
        }

        // Ensure all data is flushed to disk
        local_file.sync_all().await?;

        Ok(local_path.to_path_buf())
    }
}

impl Drop for RemoteLogDownloader {
    fn drop(&mut self) {
        // Clean up the local log directory
        if self.local_log_dir.exists() {
            std::fs::remove_dir_all(&self.local_log_dir).unwrap_or_else(|_| {
                warn!("Failed to delete local log dir: {:?}", &self.local_log_dir)
            });
        }
    }
}

/// Pending fetch that waits for a remote log file to be downloaded
pub struct RemotePendingFetch {
    segment: RemoteLogSegment,
    download_future: RemoteLogDownloadFuture,
    pos_in_log_segment: i32,
    #[allow(dead_code)]
    fetch_offset: i64,
    #[allow(dead_code)]
    high_watermark: i64,
    read_context: ReadContext,
}

impl RemotePendingFetch {
    pub fn new(
        segment: RemoteLogSegment,
        download_future: RemoteLogDownloadFuture,
        pos_in_log_segment: i32,
        fetch_offset: i64,
        high_watermark: i64,
        read_context: ReadContext,
    ) -> Self {
        Self {
            segment,
            download_future,
            pos_in_log_segment,
            fetch_offset,
            high_watermark,
            read_context,
        }
    }

    /// Convert to a completed fetch by reading the downloaded file
    pub async fn convert_to_completed_fetch(
        mut self,
    ) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
        let file_path = self.download_future.get_file_path().await?;
        let file_data = std::fs::read(&file_path)?;

        // Slice the data if needed
        let data = if self.pos_in_log_segment > 0 {
            &file_data[self.pos_in_log_segment as usize..]
        } else {
            &file_data
        };

        // Parse log records
        let mut fetch_records = vec![];
        for log_record in &mut LogRecordsBatchs::new(data) {
            fetch_records.extend(log_record.records(&self.read_context)?);
        }

        // Clean up the file
        if let Some(callback) = self.download_future.take_recycle_callback() {
            callback();
        }

        let mut result = HashMap::new();
        result.insert(self.segment.table_bucket.clone(), fetch_records);
        Ok(result)
    }
}
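
To tie the new pieces together, here is a minimal sketch of how a caller inside this module might drive a download end to end. It uses only the API defined above; the offsets are illustrative and the ReadContext is supplied externally, since its construction is not part of this diff:

    // Hypothetical driver: download the first advertised segment and parse it.
    async fn fetch_first_segment(
        downloader: &RemoteLogDownloader,
        info: &RemoteLogFetchInfo,
        read_context: ReadContext,
    ) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
        let segment = info.remote_log_segments[0].clone();

        // Non-blocking: spawns the download task and returns a future handle.
        let future = downloader.request_remote_log(&info.remote_log_tablet_dir, &segment)?;

        // first_start_pos is the byte position inside the first segment.
        let pending = RemotePendingFetch::new(
            segment,
            future,
            info.first_start_pos,
            0, // fetch_offset (illustrative)
            0, // high_watermark (illustrative)
            read_context,
        );

        // Awaits the download, parses the records, then recycles the local file.
        pending.convert_to_completed_fetch().await
    }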
