Skip to content

Commit 4098976

Browse files
committed
feat: support subscribe from remote
1 parent 12d464e commit 4098976

File tree

19 files changed

+1022
-51
lines changed

19 files changed

+1022
-51
lines changed

bindings/python/src/table.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,11 @@ impl FlussTable {
6767

6868
let table_scan = fluss_table.new_scan();
6969

70-
let rust_scanner = table_scan.create_log_scanner();
70+
let rust_scanner = table_scan.create_log_scanner().map_err(|e| {
71+
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!(
72+
"Failed to create log scanner: {e:?}"
73+
))
74+
})?;
7175

7276
let admin = conn
7377
.get_admin()

crates/examples/src/example_table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ pub async fn main() -> Result<()> {
7070
try_join!(f1, f2, append_writer.flush())?;
7171

7272
// scan rows
73-
let log_scanner = table.new_scan().create_log_scanner();
73+
let log_scanner = table.new_scan().create_log_scanner()?;
7474
log_scanner.subscribe(0, 0).await?;
7575

7676
loop {

crates/fluss/Cargo.toml

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ version = { workspace = true }
2222
name = "fluss"
2323
build = "src/build.rs"
2424

25+
[features]
26+
default = ["storage-memory", "storage-fs"]
27+
storage-all = ["storage-memory", "storage-fs"]
28+
29+
storage-memory = ["opendal/services-memory"]
30+
storage-fs = ["opendal/services-fs"]
31+
integration_tests = []
32+
2533
[dependencies]
2634
arrow = { workspace = true }
2735
arrow-schema = "57.0.0"
@@ -45,16 +53,17 @@ ordered-float = { version = "4", features = ["serde"] }
4553
parse-display = "0.10"
4654
ref-cast = "1.0"
4755
chrono = { workspace = true }
48-
oneshot = "0.1.11"
56+
opendal = "0.53.3"
57+
url = "2.5.7"
58+
async-trait = "0.1.89"
59+
uuid = { version = "1.10", features = ["v4"] }
60+
tempfile= "3.23.0"
4961

5062
[dev-dependencies]
5163
testcontainers = "0.25.0"
5264
once_cell = "1.19"
5365
test-env-helpers = "0.2.2"
5466

55-
[features]
56-
integration_tests = []
57-
5867

5968
[build-dependencies]
6069
prost-build = { version = "0.13.5" }

crates/fluss/src/client/table/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub const EARLIEST_OFFSET: i64 = -2;
2626

2727
mod append;
2828

29+
mod remote_log;
2930
mod scanner;
3031
mod writer;
3132

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
use crate::error::{Error, Result};
18+
use crate::io::{FileIO, Storage};
19+
use crate::metadata::TableBucket;
20+
use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
21+
use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord};
22+
use crate::util::delete_file;
23+
use std::collections::HashMap;
24+
use std::io;
25+
use std::path::{Path, PathBuf};
26+
use tempfile::TempDir;
27+
use tokio::io::AsyncWriteExt;
28+
use tokio::sync::oneshot;
29+
30+
/// Represents a remote log segment that needs to be downloaded
31+
#[derive(Debug, Clone)]
32+
pub struct RemoteLogSegment {
33+
pub segment_id: String,
34+
pub start_offset: i64,
35+
#[allow(dead_code)]
36+
pub end_offset: i64,
37+
#[allow(dead_code)]
38+
pub size_in_bytes: i32,
39+
pub table_bucket: TableBucket,
40+
}
41+
42+
impl RemoteLogSegment {
43+
pub fn from_proto(segment: &PbRemoteLogSegment, table_bucket: TableBucket) -> Self {
44+
Self {
45+
segment_id: segment.remote_log_segment_id.clone(),
46+
start_offset: segment.remote_log_start_offset,
47+
end_offset: segment.remote_log_end_offset,
48+
size_in_bytes: segment.segment_size_in_bytes,
49+
table_bucket,
50+
}
51+
}
52+
53+
/// Get the local file name for this remote log segment
54+
pub fn local_file_name(&self) -> String {
55+
// Format: ${remote_segment_id}_${offset_prefix}.log
56+
let offset_prefix = format!("{:020}", self.start_offset);
57+
format!("{}_{}.log", self.segment_id, offset_prefix)
58+
}
59+
}
60+
61+
/// Represents remote log fetch information
62+
#[derive(Debug, Clone)]
63+
pub struct RemoteLogFetchInfo {
64+
pub remote_log_tablet_dir: String,
65+
#[allow(dead_code)]
66+
pub partition_name: Option<String>,
67+
pub remote_log_segments: Vec<RemoteLogSegment>,
68+
pub first_start_pos: i32,
69+
}
70+
71+
impl RemoteLogFetchInfo {
72+
pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket) -> Result<Self> {
73+
let segments = info
74+
.remote_log_segments
75+
.iter()
76+
.map(|s| RemoteLogSegment::from_proto(s, table_bucket.clone()))
77+
.collect();
78+
79+
Ok(Self {
80+
remote_log_tablet_dir: info.remote_log_tablet_dir.clone(),
81+
partition_name: info.partition_name.clone(),
82+
remote_log_segments: segments,
83+
first_start_pos: info.first_start_pos.unwrap_or(0),
84+
})
85+
}
86+
}
87+
88+
/// Future for a remote log download request
89+
pub struct RemoteLogDownloadFuture {
90+
receiver: Option<oneshot::Receiver<Result<PathBuf>>>,
91+
}
92+
93+
impl RemoteLogDownloadFuture {
94+
pub fn new(receiver: oneshot::Receiver<Result<PathBuf>>) -> Self {
95+
Self {
96+
receiver: Some(receiver),
97+
}
98+
}
99+
100+
/// Get the downloaded file path
101+
pub async fn get_file_path(&mut self) -> Result<PathBuf> {
102+
let receiver = self
103+
.receiver
104+
.take()
105+
.ok_or_else(|| Error::Io(io::Error::other("Download future already consumed")))?;
106+
107+
receiver.await.map_err(|e| {
108+
Error::Io(io::Error::other(format!(
109+
"Download future cancelled: {e:?}"
110+
)))
111+
})?
112+
}
113+
}
114+
115+
/// Downloader for remote log segment files
pub struct RemoteLogDownloader {
    // Scratch directory for downloaded segments; deleted when the
    // downloader (and thus the TempDir) is dropped.
    local_log_dir: TempDir,
}

impl RemoteLogDownloader {
    /// Creates a downloader that stages downloaded files under `local_log_dir`.
    pub fn new(local_log_dir: TempDir) -> Result<Self> {
        Ok(Self { local_log_dir })
    }

    /// Request to fetch a remote log segment to local. This method is non-blocking.
    ///
    /// Spawns a tokio task that performs the download and returns a handle
    /// whose `get_file_path` resolves to the local file path once the
    /// download completes (or to an error if it fails or is cancelled).
    pub fn request_remote_log(
        &self,
        remote_log_tablet_dir: &str,
        segment: &RemoteLogSegment,
    ) -> Result<RemoteLogDownloadFuture> {
        let (sender, receiver) = oneshot::channel();
        let local_file_name = segment.local_file_name();
        let local_file_path = self.local_log_dir.path().join(&local_file_name);
        let remote_path = self.build_remote_path(remote_log_tablet_dir, segment);
        let remote_log_tablet_dir = remote_log_tablet_dir.to_string();
        // Spawn async download task
        // The send result is ignored deliberately: a send error only means the
        // corresponding RemoteLogDownloadFuture was already dropped.
        tokio::spawn(async move {
            let result =
                Self::download_file(&remote_log_tablet_dir, &remote_path, &local_file_path).await;
            let _ = sender.send(result);
        });
        Ok(RemoteLogDownloadFuture::new(receiver))
    }

    /// Build the remote path for a log segment
    fn build_remote_path(&self, remote_log_tablet_dir: &str, segment: &RemoteLogSegment) -> String {
        // Format: ${remote_log_tablet_dir}/${segment_id}/${offset_prefix}.log
        // NOTE(review): remote layout nests the offset file under a segment-id
        // directory, while `local_file_name` joins the two with '_'; presumably
        // this mirrors the server's remote log layout — confirm against the
        // tablet server's writer.
        let offset_prefix = format!("{:020}", segment.start_offset);
        format!(
            "{}/{}/{}.log",
            remote_log_tablet_dir, segment.segment_id, offset_prefix
        )
    }

    /// Download a file from remote storage to local using streaming read/write
    ///
    /// Returns the local path on success. Data is copied in 64KB chunks so the
    /// whole segment is never held in memory at once.
    async fn download_file(
        remote_log_tablet_dir: &str,
        remote_path: &str,
        local_path: &Path,
    ) -> Result<PathBuf> {
        // Handle both URL (e.g., "s3://bucket/path") and local file paths
        // If the path doesn't contain "://", treat it as a local file path
        let remote_log_tablet_dir_url = if remote_log_tablet_dir.contains("://") {
            remote_log_tablet_dir.to_string()
        } else {
            format!("file://{remote_log_tablet_dir}")
        };

        // Create FileIO from the remote log tablet dir URL to get the storage
        let file_io_builder = FileIO::from_url(&remote_log_tablet_dir_url)?;

        // Build storage and create operator directly
        let storage = Storage::build(file_io_builder)?;
        let (op, relative_path) = storage.create(remote_path)?;

        // Get file metadata to know the size
        let meta = op.stat(relative_path).await?;
        let file_size = meta.content_length();

        // Create local file for writing
        let mut local_file = tokio::fs::File::create(local_path).await?;

        // Stream data from remote to local file in chunks
        // opendal::Reader::read accepts a range, so we read in chunks
        // NOTE(review): each iteration calls `op.read_with(..).range(..)`, which
        // issues a separate ranged read against the backing store per 64KB; for
        // object stores that is one request per chunk. Consider creating a
        // single reader/stream for the whole range if this shows up in profiles.
        const CHUNK_SIZE: u64 = 64 * 1024; // 64KB chunks for efficient streaming
        let mut offset = 0u64;

        while offset < file_size {
            let end = std::cmp::min(offset + CHUNK_SIZE, file_size);
            let range = offset..end;

            // Read chunk from remote storage
            let chunk = op.read_with(relative_path).range(range.clone()).await?;
            let bytes = chunk.to_bytes();

            // Write chunk to local file
            local_file.write_all(&bytes).await?;

            offset = end;
        }

        // Ensure all data is flushed to disk
        local_file.sync_all().await?;

        Ok(local_path.to_path_buf())
    }
}
208+
209+
/// Pending fetch that waits for remote log file to be downloaded
210+
pub struct RemotePendingFetch {
211+
segment: RemoteLogSegment,
212+
download_future: RemoteLogDownloadFuture,
213+
pos_in_log_segment: i32,
214+
#[allow(dead_code)]
215+
fetch_offset: i64,
216+
#[allow(dead_code)]
217+
high_watermark: i64,
218+
read_context: ReadContext,
219+
}
220+
221+
impl RemotePendingFetch {
222+
pub fn new(
223+
segment: RemoteLogSegment,
224+
download_future: RemoteLogDownloadFuture,
225+
pos_in_log_segment: i32,
226+
fetch_offset: i64,
227+
high_watermark: i64,
228+
read_context: ReadContext,
229+
) -> Self {
230+
Self {
231+
segment,
232+
download_future,
233+
pos_in_log_segment,
234+
fetch_offset,
235+
high_watermark,
236+
read_context,
237+
}
238+
}
239+
240+
/// Convert to completed fetch by reading the downloaded file
241+
pub async fn convert_to_completed_fetch(
242+
mut self,
243+
) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
244+
let file_path = self.download_future.get_file_path().await?;
245+
let file_data = tokio::fs::read(&file_path).await?;
246+
247+
// Slice the data if needed
248+
let data = if self.pos_in_log_segment > 0 {
249+
&file_data[self.pos_in_log_segment as usize..]
250+
} else {
251+
&file_data
252+
};
253+
254+
// delete the downloaded local file to free disk
255+
delete_file(file_path).await;
256+
257+
// Parse log records
258+
let mut fetch_records = vec![];
259+
for log_record in &mut LogRecordsBatchs::new(data) {
260+
fetch_records.extend(log_record.records(&self.read_context)?);
261+
}
262+
263+
let mut result = HashMap::new();
264+
result.insert(self.segment.table_bucket.clone(), fetch_records);
265+
Ok(result)
266+
}
267+
}

0 commit comments

Comments
 (0)