diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs
index 2a8df25..71759d7 100644
--- a/bindings/python/src/table.rs
+++ b/bindings/python/src/table.rs
@@ -67,7 +67,11 @@ impl FlussTable {
         let table_scan = fluss_table.new_scan();
 
-        let rust_scanner = table_scan.create_log_scanner();
+        let rust_scanner = table_scan.create_log_scanner().map_err(|e| {
+            PyErr::new::(format!(
+                "Failed to create log scanner: {e:?}"
+            ))
+        })?;
 
         let admin = conn
             .get_admin()
diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs
index deab363..2d6ac53 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -70,7 +70,7 @@ pub async fn main() -> Result<()> {
     try_join!(f1, f2, append_writer.flush())?;
 
     // scan rows
-    let log_scanner = table.new_scan().create_log_scanner();
+    let log_scanner = table.new_scan().create_log_scanner()?;
     log_scanner.subscribe(0, 0).await?;
 
     loop {
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index af77037..4547b9c 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -22,6 +22,14 @@ version = { workspace = true }
 name = "fluss"
 build = "src/build.rs"
 
+[features]
+default = ["storage-memory", "storage-fs"]
+storage-all = ["storage-memory", "storage-fs"]
+
+storage-memory = ["opendal/services-memory"]
+storage-fs = ["opendal/services-fs"]
+integration_tests = []
+
 [dependencies]
 arrow = { workspace = true }
 arrow-schema = "57.0.0"
@@ -45,16 +53,17 @@ ordered-float = { version = "4", features = ["serde"] }
 parse-display = "0.10"
 ref-cast = "1.0"
 chrono = { workspace = true }
-oneshot = "0.1.11"
+opendal = "0.53.3"
+url = "2.5.7"
+async-trait = "0.1.89"
+uuid = { version = "1.10", features = ["v4"] }
+tempfile = "3.23.0"
 
 [dev-dependencies]
 testcontainers = "0.25.0"
 once_cell = "1.19"
 test-env-helpers = "0.2.2"
 
-[features]
-integration_tests = []
-
 [build-dependencies]
 prost-build = { version = "0.13.5" }
diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs
index 52ae700..9972247 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -26,6 +26,7 @@ pub const EARLIEST_OFFSET: i64 = -2;
 
 mod append;
+mod remote_log;
 mod scanner;
 mod writer;
diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs
new file mode 100644
index 0000000..0f85282
--- /dev/null
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use crate::error::{Error, Result};
+use crate::io::{FileIO, Storage};
+use crate::metadata::TableBucket;
+use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
+use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord};
+use crate::util::delete_file;
+use std::collections::HashMap;
+use std::io;
+use std::path::{Path, PathBuf};
+use tempfile::TempDir;
+use tokio::io::AsyncWriteExt;
+use tokio::sync::oneshot;
+
+/// Represents a remote log segment that needs to be downloaded
+#[derive(Debug, Clone)]
+pub struct RemoteLogSegment {
+    pub segment_id: String,
+    pub start_offset: i64,
+    #[allow(dead_code)]
+    pub end_offset: i64,
+    #[allow(dead_code)]
+    pub size_in_bytes: i32,
+    pub table_bucket: TableBucket,
+}
+
+impl RemoteLogSegment {
+    pub fn from_proto(segment: &PbRemoteLogSegment, table_bucket: TableBucket) -> Self {
+        Self {
+            segment_id: segment.remote_log_segment_id.clone(),
+            start_offset: segment.remote_log_start_offset,
+            end_offset: segment.remote_log_end_offset,
+            size_in_bytes: segment.segment_size_in_bytes,
+            table_bucket,
+        }
+    }
+
+    /// Get the local file name for this remote log segment
+    pub fn local_file_name(&self) -> String {
+        // Format: ${remote_segment_id}_${offset_prefix}.log
+        let offset_prefix = format!("{:020}", self.start_offset);
+        format!("{}_{}.log", self.segment_id, offset_prefix)
+    }
+}
+
+/// Represents remote log fetch information
+#[derive(Debug, Clone)]
+pub struct RemoteLogFetchInfo {
+    pub remote_log_tablet_dir: String,
+    #[allow(dead_code)]
+    pub partition_name: Option<String>,
+    pub remote_log_segments: Vec<RemoteLogSegment>,
+    pub first_start_pos: i32,
+}
+
+impl RemoteLogFetchInfo {
+    pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket) -> Result<Self> {
+        let segments = info
+            .remote_log_segments
+            .iter()
+            .map(|s| RemoteLogSegment::from_proto(s, table_bucket.clone()))
+            .collect();
+
+        Ok(Self {
+            remote_log_tablet_dir: info.remote_log_tablet_dir.clone(),
+            partition_name: info.partition_name.clone(),
+            remote_log_segments: segments,
+            first_start_pos: info.first_start_pos.unwrap_or(0),
+        })
+    }
+}
+
+/// Future for a remote log download request
+pub struct RemoteLogDownloadFuture {
+    receiver: Option<oneshot::Receiver<Result<PathBuf>>>,
+}
+
+impl RemoteLogDownloadFuture {
+    pub fn new(receiver: oneshot::Receiver<Result<PathBuf>>) -> Self {
+        Self {
+            receiver: Some(receiver),
+        }
+    }
+
+    /// Get the downloaded file path
+    pub async fn get_file_path(&mut self) -> Result<PathBuf> {
+        let receiver = self
+            .receiver
+            .take()
+            .ok_or_else(|| Error::Io(io::Error::other("Download future already consumed")))?;
+
+        receiver.await.map_err(|e| {
+            Error::Io(io::Error::other(format!(
+                "Download future cancelled: {e:?}"
+            )))
+        })?
+    }
+}
+
+/// Downloader for remote log segment files
+pub struct RemoteLogDownloader {
+    local_log_dir: TempDir,
+}
+
+impl RemoteLogDownloader {
+    pub fn new(local_log_dir: TempDir) -> Result<Self> {
+        Ok(Self { local_log_dir })
+    }
+
+    /// Request to fetch a remote log segment to local. This method is non-blocking.
+    pub fn request_remote_log(
+        &self,
+        remote_log_tablet_dir: &str,
+        segment: &RemoteLogSegment,
+    ) -> Result<RemoteLogDownloadFuture> {
+        let (sender, receiver) = oneshot::channel();
+        let local_file_name = segment.local_file_name();
+        let local_file_path = self.local_log_dir.path().join(&local_file_name);
+        let remote_path = self.build_remote_path(remote_log_tablet_dir, segment);
+        let remote_log_tablet_dir = remote_log_tablet_dir.to_string();
+        // Spawn async download task
+        tokio::spawn(async move {
+            let result =
+                Self::download_file(&remote_log_tablet_dir, &remote_path, &local_file_path).await;
+            let _ = sender.send(result);
+        });
+        Ok(RemoteLogDownloadFuture::new(receiver))
+    }
+
+    /// Build the remote path for a log segment
+    fn build_remote_path(&self, remote_log_tablet_dir: &str, segment: &RemoteLogSegment) -> String {
+        // Format: ${remote_log_tablet_dir}/${segment_id}/${offset_prefix}.log
+        let offset_prefix = format!("{:020}", segment.start_offset);
+        format!(
+            "{}/{}/{}.log",
+            remote_log_tablet_dir, segment.segment_id, offset_prefix
+        )
+    }
+
+    /// Download a file from remote storage to local using streaming read/write
+    async fn download_file(
+        remote_log_tablet_dir: &str,
+        remote_path: &str,
+        local_path: &Path,
+    ) -> Result<PathBuf> {
+        // Handle both URL (e.g., "s3://bucket/path") and local file paths.
+        // If the path doesn't contain "://", treat it as a local file path.
+        let remote_log_tablet_dir_url = if remote_log_tablet_dir.contains("://") {
+            remote_log_tablet_dir.to_string()
+        } else {
+            format!("file://{remote_log_tablet_dir}")
+        };
+
+        // Create FileIO from the remote log tablet dir URL to get the storage
+        let file_io_builder = FileIO::from_url(&remote_log_tablet_dir_url)?;
+
+        // Build storage and create operator directly
+        let storage = Storage::build(file_io_builder)?;
+        let (op, relative_path) = storage.create(remote_path)?;
+
+        // Get file metadata to know the size
+        let meta = op.stat(relative_path).await?;
+        let file_size = meta.content_length();
+
+        // Create local file for writing
+        let mut local_file = tokio::fs::File::create(local_path).await?;
+
+        // Stream data from remote to local file in chunks
+        // opendal::Reader::read accepts a range, so we read in chunks
+        const CHUNK_SIZE: u64 = 64 * 1024; // 64KB chunks for efficient streaming
+        let mut offset = 0u64;
+
+        while offset < file_size {
+            let end = std::cmp::min(offset + CHUNK_SIZE, file_size);
+            let range = offset..end;
+
+            // Read chunk from remote storage
+            let chunk = op.read_with(relative_path).range(range.clone()).await?;
+            let bytes = chunk.to_bytes();
+
+            // Write chunk to local file
+            local_file.write_all(&bytes).await?;
+
+            offset = end;
+        }
+
+        // Ensure all data is flushed to disk
+        local_file.sync_all().await?;
+
+        Ok(local_path.to_path_buf())
+    }
+}
+
+/// Pending fetch that waits for remote log file to be downloaded
+pub struct RemotePendingFetch {
+    segment: RemoteLogSegment,
+    download_future: RemoteLogDownloadFuture,
+    pos_in_log_segment: i32,
+    #[allow(dead_code)]
+    fetch_offset: i64,
+    #[allow(dead_code)]
+    high_watermark: i64,
+    read_context: ReadContext,
+}
+
+impl RemotePendingFetch {
+    pub fn new(
+        segment: RemoteLogSegment,
+        download_future: RemoteLogDownloadFuture,
+        pos_in_log_segment: i32,
+        fetch_offset: i64,
+        high_watermark: i64,
+        read_context: ReadContext,
+    ) -> Self {
+        Self {
+            segment,
+            download_future,
+            pos_in_log_segment,
+            fetch_offset,
+            high_watermark,
+            read_context,
+        }
+    }
+
+    /// Convert to completed fetch by reading the downloaded file
+    pub async fn convert_to_completed_fetch(
+        mut self,
+    ) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
+        let file_path = self.download_future.get_file_path().await?;
+        let file_data = tokio::fs::read(&file_path).await?;
+
+        // Slice the data if needed
+        let data = if self.pos_in_log_segment > 0 {
+            &file_data[self.pos_in_log_segment as usize..]
+        } else {
+            &file_data
+        };
+
+        // delete the downloaded local file to free disk
+        delete_file(file_path).await;
+
+        // Parse log records
+        let mut fetch_records = vec![];
+        for log_record in &mut LogRecordsBatchs::new(data) {
+            fetch_records.extend(log_record.records(&self.read_context)?);
+        }
+
+        let mut result = HashMap::new();
+        result.insert(self.segment.table_bucket.clone(), fetch_records);
+        Ok(result)
+    }
+}
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 13372ef..2b4b6ec 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -29,6 +29,11 @@ use std::collections::HashMap;
 use std::slice::from_ref;
 use std::sync::Arc;
 use std::time::Duration;
+use tempfile::TempDir;
+
+use crate::client::table::remote_log::{
+    RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
+};
 
 const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
 #[allow(dead_code)]
@@ -120,7 +125,7 @@ impl<'a> TableScan<'a> {
         Ok(self)
     }
 
-    pub fn create_log_scanner(self) -> LogScanner {
+    pub fn create_log_scanner(self) -> Result<LogScanner> {
         LogScanner::new(
             &self.table_info,
             self.metadata.clone(),
@@ -144,9 +149,9 @@ impl LogScanner {
         metadata: Arc,
         connections: Arc,
         projected_fields: Option>,
-    ) -> Self {
+    ) -> Result<Self> {
         let log_scanner_status = Arc::new(LogScannerStatus::new());
-        Self {
+        Ok(Self {
             table_path: table_info.table_path.clone(),
             table_id: table_info.table_id,
             metadata: metadata.clone(),
@@ -157,8 +162,8 @@ impl LogScanner {
                 metadata.clone(),
                 log_scanner_status.clone(),
                 projected_fields,
-            ),
-        }
+            )?,
+        })
     }
 
     pub async fn poll(&self, _timeout: Duration) -> Result {
@@ -188,6 +193,7 @@ struct LogFetcher {
     metadata: Arc,
     log_scanner_status: Arc,
     read_context: ReadContext,
+    remote_log_downloader: Arc<RemoteLogDownloader>,
 }
 
 impl LogFetcher {
@@ -197,17 +203,21 @@ impl LogFetcher {
         metadata: Arc,
         log_scanner_status: Arc,
         projected_fields: Option>,
-    ) -> Self {
+    ) -> Result<Self> {
         let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
-        let read_context = Self::create_read_context(full_arrow_schema, projected_fields);
-        LogFetcher {
+        let read_context = Self::create_read_context(full_arrow_schema, projected_fields.clone());
+
+        let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
+
+        Ok(LogFetcher {
             table_path: table_info.table_path.clone(),
             conns,
             table_info,
             metadata,
             log_scanner_status,
             read_context,
-        }
+            remote_log_downloader: Arc::new(RemoteLogDownloader::new(tmp_dir)?),
+        })
     }
 
     fn create_read_context(
@@ -239,10 +249,66 @@ impl LogFetcher {
             let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
             for fetch_log_for_bucket in fetch_log_for_buckets {
-                let mut fetch_records = vec![];
                 let bucket: i32 = fetch_log_for_bucket.bucket_id;
                 let table_bucket = TableBucket::new(table_id, bucket);
-                if fetch_log_for_bucket.records.is_some() {
+
+                // Check if this is a remote log fetch
+                if let Some(ref remote_log_fetch_info) =
+                    fetch_log_for_bucket.remote_log_fetch_info
+                {
+                    let remote_fetch_info = RemoteLogFetchInfo::from_proto(
+                        remote_log_fetch_info,
+                        table_bucket.clone(),
+                    )?;
+
+                    if let Some(fetch_offset) =
+                        self.log_scanner_status.get_bucket_offset(&table_bucket)
+                    {
+                        let high_watermark =
+                            fetch_log_for_bucket.high_watermark.unwrap_or(-1);
+                        // Download and process remote log segments
+                        let mut pos_in_log_segment = remote_fetch_info.first_start_pos;
+                        let mut current_fetch_offset = fetch_offset;
+                        // todo: download segments in parallel
+                        for (i, segment) in
+                            remote_fetch_info.remote_log_segments.iter().enumerate()
+                        {
+                            if i > 0 {
+                                pos_in_log_segment = 0;
+                                current_fetch_offset = segment.start_offset;
+                            }
+
+                            let download_future =
+                                self.remote_log_downloader.request_remote_log(
+                                    &remote_fetch_info.remote_log_tablet_dir,
+                                    segment,
+                                )?;
+                            let pending_fetch = RemotePendingFetch::new(
+                                segment.clone(),
+                                download_future,
+                                pos_in_log_segment,
+                                current_fetch_offset,
+                                high_watermark,
+                                self.read_context.clone(),
+                            );
+                            let remote_records =
+                                pending_fetch.convert_to_completed_fetch().await?;
+                            // Update offset and merge results
+                            for (tb, records) in remote_records {
+                                if let Some(last_record) = records.last() {
+                                    self.log_scanner_status
+                                        .update_offset(&tb, last_record.offset() + 1);
+                                }
+                                result.entry(tb).or_default().extend(records);
+                            }
+                        }
+                    } else {
+                        // if the offset is None, it means the bucket has been unsubscribed,
+                        // skip processing and continue to the next bucket.
+                        continue;
+                    }
+                } else if fetch_log_for_bucket.records.is_some() {
+                    // Handle regular in-memory records
+                    let mut fetch_records = vec![];
+                    let data = fetch_log_for_bucket.records.unwrap();
                     for log_record in &mut LogRecordsBatchs::new(&data) {
                         let last_offset = log_record.last_log_offset();
@@ -250,8 +316,8 @@ impl LogFetcher {
                         self.log_scanner_status
                             .update_offset(&table_bucket, last_offset + 1);
                     }
+                    result.insert(table_bucket, fetch_records);
                 }
-                result.insert(table_bucket, fetch_records);
             }
         }
     }
diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs
index 58b88a4..b1d5d13 100644
--- a/crates/fluss/src/error.rs
+++ b/crates/fluss/src/error.rs
@@ -47,4 +47,16 @@ pub enum Error {
 
     #[error("Illegal argument error: {0}")]
     IllegalArgument(String),
+
+    #[error("IO not supported error: {0}")]
+    IoUnsupported(String),
+
+    #[error("IO operation failed on underlying storage: {0}")]
+    IoUnexpected(Box<opendal::Error>),
+}
+
+impl From<opendal::Error> for Error {
+    fn from(err: opendal::Error) -> Self {
+        Error::IoUnexpected(Box::new(err))
+    }
 }
diff --git a/crates/fluss/src/io/file_io.rs b/crates/fluss/src/io/file_io.rs
new file mode 100644
index 0000000..69a4c97
--- /dev/null
+++ b/crates/fluss/src/io/file_io.rs
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use crate::error::*;
+use std::collections::HashMap;
+use std::ops::Range;
+use std::sync::Arc;
+
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use opendal::Operator;
+
+use url::Url;
+
+use super::Storage;
+
+use crate::error::Result;
+
+#[derive(Clone, Debug)]
+pub struct FileIO {
+    storage: Arc<Storage>,
+}
+
+impl FileIO {
+    /// Try to infer file io scheme from path.
+    pub fn from_url(path: &str) -> Result<FileIOBuilder> {
+        let url =
+            Url::parse(path).map_err(|_| Error::IllegalArgument(format!("Invalid URL: {path}")))?;
+        Ok(FileIOBuilder::new(url.scheme()))
+    }
+
+    /// Create a new input file to read data.
+    pub fn new_input(&self, path: &str) -> Result<InputFile> {
+        let (op, relative_path) = self.storage.create(path)?;
+        let path = path.to_string();
+        let relative_path_pos = path.len() - relative_path.len();
+        Ok(InputFile {
+            op,
+            path,
+            relative_path_pos,
+        })
+    }
+}
+
+#[derive(Debug)]
+pub struct FileIOBuilder {
+    scheme_str: Option<String>,
+    props: HashMap<String, String>,
+}
+
+impl FileIOBuilder {
+    pub fn new(scheme_str: impl ToString) -> Self {
+        Self {
+            scheme_str: Some(scheme_str.to_string()),
+            props: HashMap::default(),
+        }
+    }
+
+    pub(crate) fn into_parts(self) -> (String, HashMap<String, String>) {
+        (self.scheme_str.unwrap_or_default(), self.props)
+    }
+
+    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
+        self.props.insert(key.to_string(), value.to_string());
+        self
+    }
+
+    pub fn with_props(
+        mut self,
+        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
+    ) -> Self {
+        self.props
+            .extend(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
+        self
+    }
+
+    pub fn build(self) -> Result<FileIO> {
+        let storage = Storage::build(self)?;
+        Ok(FileIO {
+            storage: Arc::new(storage),
+        })
+    }
+}
+
+#[async_trait::async_trait]
+pub trait FileRead: Send + Unpin + 'static {
+    async fn read(&self, range: Range<u64>) -> Result<Bytes>;
+}
+
+#[async_trait::async_trait]
+impl FileRead for opendal::Reader {
+    async fn read(&self, range: Range<u64>) -> Result<Bytes> {
+        Ok(opendal::Reader::read(self, range).await?.to_bytes())
+    }
+}
+
+#[derive(Debug)]
+pub struct InputFile {
+    op: Operator,
+    path: String,
+    relative_path_pos: usize,
+}
+
+impl InputFile {
+    pub fn location(&self) -> &str {
+        &self.path
+    }
+
+    pub async fn exists(&self) -> Result<bool> {
+        Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
+    }
+
+    pub async fn metadata(&self) -> Result<FileStatus> {
+        let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
+
+        Ok(FileStatus {
+            size: meta.content_length(),
+            is_dir: meta.is_dir(),
+            path: self.path.clone(),
+            last_modified: meta.last_modified(),
+        })
+    }
+
+    pub async fn read(&self) -> Result<Bytes> {
+        Ok(self
+            .op
+            .read(&self.path[self.relative_path_pos..])
+            .await?
+            .to_bytes())
+    }
+
+    pub async fn reader(&self) -> Result<impl FileRead> {
+        Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct FileStatus {
+    pub size: u64,
+    pub is_dir: bool,
+    pub path: String,
+    pub last_modified: Option<DateTime<Utc>>,
+}
diff --git a/crates/fluss/src/io/mod.rs b/crates/fluss/src/io/mod.rs
new file mode 100644
index 0000000..3c9a165
--- /dev/null
+++ b/crates/fluss/src/io/mod.rs
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+mod file_io;
+
+pub use file_io::*;
+
+mod storage;
+pub use storage::*;
+
+#[cfg(feature = "storage-fs")]
+mod storage_fs;
+#[cfg(feature = "storage-fs")]
+use storage_fs::*;
+#[cfg(feature = "storage-memory")]
+mod storage_memory;
+
+#[cfg(feature = "storage-memory")]
+use storage_memory::*;
diff --git a/crates/fluss/src/io/storage.rs b/crates/fluss/src/io/storage.rs
new file mode 100644
index 0000000..361da7e
--- /dev/null
+++ b/crates/fluss/src/io/storage.rs
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use crate::error;
+use crate::error::Result;
+use crate::io::FileIOBuilder;
+use opendal::{Operator, Scheme};
+
+/// The storage carries all supported storage services in fluss
+#[derive(Debug)]
+pub enum Storage {
+    #[cfg(feature = "storage-memory")]
+    Memory,
+    #[cfg(feature = "storage-fs")]
+    LocalFs,
+}
+
+impl Storage {
+    pub(crate) fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
+        let (scheme_str, _) = file_io_builder.into_parts();
+        let scheme = Self::parse_scheme(&scheme_str)?;
+
+        match scheme {
+            #[cfg(feature = "storage-memory")]
+            Scheme::Memory => Ok(Self::Memory),
+            #[cfg(feature = "storage-fs")]
+            Scheme::Fs => Ok(Self::LocalFs),
+            _ => Err(error::Error::IoUnsupported(
+                "Unsupported storage feature".to_string(),
+            )),
+        }
+    }
+
+    pub(crate) fn create<'a>(&self, path: &'a str) -> Result<(Operator, &'a str)> {
+        match self {
+            #[cfg(feature = "storage-memory")]
+            Storage::Memory => {
+                let op = super::memory_config_build()?;
+
+                if let Some(stripped) = path.strip_prefix("memory:/") {
+                    Ok((op, stripped))
+                } else {
+                    Ok((op, &path[1..]))
+                }
+            }
+            #[cfg(feature = "storage-fs")]
+            Storage::LocalFs => {
+                let op = super::fs_config_build()?;
+                if let Some(stripped) = path.strip_prefix("file:/") {
+                    Ok((op, stripped))
+                } else {
+                    Ok((op, &path[1..]))
+                }
+            }
+        }
+    }
+
+    fn parse_scheme(scheme: &str) -> Result<Scheme> {
+        match scheme {
+            "memory" => Ok(Scheme::Memory),
+            "file" | "" => Ok(Scheme::Fs),
+            s => Ok(s.parse::<Scheme>()?),
+        }
+    }
+}
diff --git a/crates/fluss/src/io/storage_fs.rs b/crates/fluss/src/io/storage_fs.rs
new file mode 100644
index 0000000..95ca6fa
--- /dev/null
+++ b/crates/fluss/src/io/storage_fs.rs
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use opendal::Operator;
+use opendal::services::FsConfig;
+
+use crate::error::Result;
+
+/// Build a new opendal operator for local filesystem access.
+pub(crate) fn fs_config_build() -> Result<Operator> {
+    let mut cfg = FsConfig::default();
+    cfg.root = Some("/".to_string());
+
+    Ok(Operator::from_config(cfg)?.finish())
+}
diff --git a/crates/fluss/src/io/storage_memory.rs b/crates/fluss/src/io/storage_memory.rs
new file mode 100644
index 0000000..af73a90
--- /dev/null
+++ b/crates/fluss/src/io/storage_memory.rs
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use crate::error::Result;
+use opendal::Operator;
+use opendal::services::MemoryConfig;
+
+pub(crate) fn memory_config_build() -> Result<Operator> {
+    Ok(Operator::from_config(MemoryConfig::default())?.finish())
+}
diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs
index e63b5ed..366edfc 100644
--- a/crates/fluss/src/lib.rs
+++ b/crates/fluss/src/lib.rs
@@ -26,6 +26,7 @@ mod cluster;
 
 pub mod config;
 pub mod error;
+pub mod io;
 mod util;
 
 pub type TableId = u64;
diff --git a/crates/fluss/src/metadata/datatype.rs b/crates/fluss/src/metadata/datatype.rs
index 4deed2b..8ad4f7e 100644
--- a/crates/fluss/src/metadata/datatype.rs
+++ b/crates/fluss/src/metadata/datatype.rs
@@ -96,25 +96,25 @@ impl DataType {
 
 impl Display for DataType {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         match self {
-            DataType::Boolean(v) => write!(f, "{}", v),
-            DataType::TinyInt(v) => write!(f, "{}", v),
-            DataType::SmallInt(v) => write!(f, "{}", v),
-            DataType::Int(v) => write!(f, "{}", v),
-            DataType::BigInt(v) => write!(f, "{}", v),
-            DataType::Float(v) => write!(f, "{}", v),
-            DataType::Double(v) => write!(f, "{}", v),
-            DataType::Char(v) => write!(f, "{}", v),
-            DataType::String(v) => write!(f, "{}", v),
-            DataType::Decimal(v) => write!(f, "{}", v),
-            DataType::Date(v) => write!(f, "{}", v),
-            DataType::Time(v) => write!(f, "{}", v),
-            DataType::Timestamp(v) => write!(f, "{}", v),
-            DataType::TimestampLTz(v) => write!(f, "{}", v),
-            DataType::Bytes(v) => write!(f, "{}", v),
-            DataType::Binary(v) => write!(f, "{}", v),
-            DataType::Array(v) => write!(f, "{}", v),
-            DataType::Map(v) => write!(f, "{}", v),
-            DataType::Row(v) => write!(f, "{}", v),
+            DataType::Boolean(v) => write!(f, "{v}"),
+            DataType::TinyInt(v) => write!(f, "{v}"),
+            DataType::SmallInt(v) => write!(f, "{v}"),
+            DataType::Int(v) => write!(f, "{v}"),
+            DataType::BigInt(v) => write!(f, "{v}"),
+            DataType::Float(v) => write!(f, "{v}"),
+            DataType::Double(v) => write!(f, "{v}"),
+            DataType::Char(v) => write!(f, "{v}"),
+            DataType::String(v) => write!(f, "{v}"),
+            DataType::Decimal(v) => write!(f, "{v}"),
+            DataType::Date(v) => write!(f, "{v}"),
+            DataType::Time(v) => write!(f, "{v}"),
+            DataType::Timestamp(v) => write!(f, "{v}"),
+            DataType::TimestampLTz(v) => write!(f, "{v}"),
+            DataType::Bytes(v) => write!(f, "{v}"),
+            DataType::Binary(v) => write!(f, "{v}"),
+            DataType::Array(v) => write!(f, "{v}"),
+            DataType::Map(v) => write!(f, "{v}"),
+            DataType::Row(v) => write!(f, "{v}"),
         }
     }
 }
@@ -861,7 +861,7 @@ impl Display for RowType {
             if i > 0 {
                 write!(f, ", ")?;
             }
-            write!(f, "{}", field)?;
+            write!(f, "{field}")?;
         }
         write!(f, ">")?;
         if !self.nullable {
diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs
index c26b4ec..f93abf9 100644
--- a/crates/fluss/src/util/mod.rs
+++ b/crates/fluss/src/util/mod.rs
@@ -19,8 +19,10 @@ use crate::metadata::TableBucket;
 use linked_hash_map::LinkedHashMap;
 use std::collections::{HashMap, HashSet};
 use std::hash::Hash;
+use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
+use tracing::warn;
 
 pub fn current_time_ms() -> i64 {
     SystemTime::now()
@@ -29,6 +31,12 @@ pub fn current_time_ms() -> i64 {
         .as_millis() as i64
 }
 
+pub async fn delete_file(file_path: PathBuf) {
+    tokio::fs::remove_file(&file_path)
+        .await
+        .unwrap_or_else(|e| warn!("Could not delete file: {:?}, error: {:?}", &file_path, e));
+}
+
 pub struct FairBucketStatusMap {
     map: LinkedHashMap>,
     size: usize,
diff --git a/crates/fluss/tests/integration/fluss_cluster.rs b/crates/fluss/tests/integration/fluss_cluster.rs
index e827e14..21422df 100644
--- a/crates/fluss/tests/integration/fluss_cluster.rs
+++ b/crates/fluss/tests/integration/fluss_cluster.rs
@@ -32,12 +32,27 @@ pub struct FlussTestingClusterBuilder {
     network: &'static str,
     cluster_conf: HashMap<String, String>,
     testing_name: String,
+    remote_data_dir: Option<std::path::PathBuf>,
 }
 
 impl FlussTestingClusterBuilder {
     pub fn new(testing_name: impl Into<String>) -> Self {
+        Self::new_with_cluster_conf(testing_name.into(), &HashMap::default())
+    }
+
+    pub fn with_remote_data_dir(mut self, dir: std::path::PathBuf) -> Self {
+        // Ensure the directory exists before mounting
+        std::fs::create_dir_all(&dir).expect("Failed to create remote data directory");
+        self.remote_data_dir = Some(dir);
+        self
+    }
+
+    pub fn new_with_cluster_conf(
+        testing_name: impl Into<String>,
+        conf: &HashMap<String, String>,
+    ) -> Self {
         // reduce testing resources
-        let mut cluster_conf = HashMap::new();
+        let mut cluster_conf = conf.clone();
         cluster_conf.insert(
             "netty.server.num-network-threads".to_string(),
             "1".to_string(),
@@ -52,6 +67,7 @@ impl FlussTestingClusterBuilder {
             cluster_conf,
             network: "fluss-cluster-network",
             testing_name: testing_name.into(),
+            remote_data_dir: None,
         }
     }
 
@@ -92,6 +108,7 @@ impl FlussTestingClusterBuilder {
             coordinator_server,
             tablet_servers,
             bootstrap_servers: "127.0.0.1:9123".to_string(),
+            remote_data_dir: self.remote_data_dir.clone(),
         }
     }
 
@@ -147,7 +164,15 @@ impl FlussTestingClusterBuilder {
         tablet_server_confs.insert("internal.listener.name", "INTERNAL".to_string());
         tablet_server_confs.insert("tablet-server.id", tablet_server_id);
 
-        GenericImage::new("fluss/fluss", FLUSS_VERSION)
+        // Set remote.data.dir to use the same path as host when volume mount is provided
+        // This ensures the path is consistent between host and container
+        if let Some(remote_data_dir) = &self.remote_data_dir {
+            tablet_server_confs.insert(
+                "remote.data.dir",
+                remote_data_dir.to_string_lossy().to_string(),
+            );
+        }
+        let mut image = GenericImage::new("fluss/fluss", FLUSS_VERSION)
             .with_cmd(vec!["tabletServer"])
             .with_mapped_port(expose_host_port as u16, ContainerPort::Tcp(9123))
             .with_network(self.network)
@@ -155,10 +180,20 @@ impl FlussTestingClusterBuilder {
             .with_env_var(
                 "FLUSS_PROPERTIES",
                 self.to_fluss_properties_with(tablet_server_confs),
-            )
-            .start()
-            .await
-            .unwrap()
+            );
+
+        // Add volume mount if remote_data_dir is provided
+        if let Some(ref remote_data_dir) = self.remote_data_dir {
+            use testcontainers::core::Mount;
+            // Ensure directory exists before mounting (double check)
+            std::fs::create_dir_all(remote_data_dir)
+                .expect("Failed to create remote data directory for mount");
+            let host_path = remote_data_dir.to_string_lossy().to_string();
+            let container_path = remote_data_dir.to_string_lossy().to_string();
+            image = image.with_mount(Mount::bind_mount(host_path, container_path));
+        }
+
+        image.start().await.unwrap()
     }
 
     fn to_fluss_properties_with(&self, extra_properties: HashMap<&str, String>) -> String {
@@ -180,6 +215,7 @@ pub struct FlussTestingCluster {
     coordinator_server: Arc>,
     tablet_servers: HashMap>>,
     bootstrap_servers: String,
+    remote_data_dir: Option<std::path::PathBuf>,
 }
 
 impl FlussTestingCluster {
@@ -189,6 +225,18 @@ impl FlussTestingCluster {
         }
         self.coordinator_server.stop().await.unwrap();
         self.zookeeper.stop().await.unwrap();
+        if let Some(remote_data_dir) = &self.remote_data_dir {
+            // Try to clean up the remote data directory, but don't fail if it can't be deleted.
+            // This can happen in CI environments or if Docker containers are still using the directory.
+            // The directory will be cleaned up by the CI system or OS eventually.
+            if let Err(e) = tokio::fs::remove_dir_all(remote_data_dir).await {
+                eprintln!(
+                    "Warning: Failed to delete remote data directory: {:?}, error: {:?}. \
+                    This is non-fatal and the directory may be cleaned up later.",
+                    remote_data_dir, e
+                );
+            }
+        }
     }
 
     pub async fn get_fluss_connection(&self) -> FlussConnection {
diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs
index e14b852..b23fd79 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -16,11 +16,11 @@
  * limitations under the License.
  */
 
-use crate::integration::fluss_cluster::FlussTestingCluster;
 use once_cell::sync::Lazy;
 use parking_lot::RwLock;
 use std::sync::Arc;
 
+use crate::integration::fluss_cluster::FlussTestingCluster;
 #[cfg(test)]
 use test_env_helpers::*;
 
@@ -39,12 +39,11 @@ mod table_test {
     use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath};
     use fluss::row::InternalRow;
     use std::sync::Arc;
-    use std::sync::atomic::AtomicUsize;
     use std::thread;
 
     fn before_all() {
         // Create a new tokio runtime in a separate thread
         let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
-        std::thread::spawn(move || {
+        thread::spawn(move || {
            let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
            rt.block_on(async {
                let cluster = FlussTestingClusterBuilder::new("test_table").build().await;
@@ -71,7 +70,7 @@ mod table_test {
     fn after_all() {
         // Create a new tokio runtime in a separate thread
         let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
-        std::thread::spawn(move || {
+        thread::spawn(move || {
             let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
             rt.block_on(async {
                 let mut guard = cluster_guard.write();
@@ -137,7 +136,10 @@ mod table_test {
         append_writer.flush().await.expect("Failed to flush");
 
         let num_buckets = table.table_info().get_num_buckets();
-        let log_scanner = table.new_scan().create_log_scanner();
+        let log_scanner = table
+            .new_scan()
+            .create_log_scanner()
+            .expect("Failed to create log scanner");
         for bucket_id in 0..num_buckets {
             log_scanner
                 .subscribe(bucket_id, 0)
@@ -166,7 +168,8 @@ mod table_test {
             .new_scan()
             .project(&[1, 0])
             .expect("Failed to project")
-            .create_log_scanner();
+            .create_log_scanner()
+            .expect("Failed to create log scanner");
         for bucket_id in 0..num_buckets {
             log_scanner_projected
                 .subscribe(bucket_id, 0)
@@ -212,7 +215,9 @@ mod table_test {
             .expect("Failed to get table");
 
         let table_scan = table.new_scan();
-        let log_scanner = table_scan.create_log_scanner();
+        let log_scanner = table_scan
+            .create_log_scanner()
+            .expect("Failed to create log scanner");
 
         // Subscribe to bucket 0 starting from offset 0
         log_scanner
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs
new file mode 100644
index 0000000..f33d440
--- /dev/null
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+use crate::integration::fluss_cluster::FlussTestingCluster;
+use once_cell::sync::Lazy;
+use parking_lot::RwLock;
+use std::sync::Arc;
+
+#[cfg(test)]
+use test_env_helpers::*;
+
+// Module-level shared cluster instance (only for this test file)
+static SHARED_FLUSS_CLUSTER: Lazy<Arc<RwLock<Option<FlussTestingCluster>>>> =
+    Lazy::new(|| Arc::new(RwLock::new(None)));
+
+#[cfg(test)]
+#[before_all]
+#[after_all]
+mod table_remote_scan_test {
+    use super::SHARED_FLUSS_CLUSTER;
+    use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder};
+    use crate::integration::utils::create_table;
+    use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+    use fluss::row::{GenericRow, InternalRow};
+    use std::collections::HashMap;
+    use std::sync::Arc;
+    use std::sync::atomic::AtomicUsize;
+    use std::sync::atomic::Ordering;
+    use std::thread;
+    use std::thread::sleep;
+    use std::time::Duration;
+    use uuid::Uuid;
+
+    fn before_all() {
+        // Create a new tokio runtime in a separate thread
+        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
+        thread::spawn(move || {
+            let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
+            rt.block_on(async {
+                // Create a temporary directory for remote data that can be accessed from both
+                // container and host. Use a fixed path so it's the same in container and host.
+                // On macOS, Docker Desktop may have issues with /tmp, so we use a path in the
+                // current working directory or user's home directory which Docker can access.
+                let temp_dir = std::env::current_dir()
+                    .unwrap_or_else(|_| std::path::PathBuf::from("."))
+                    .join("target")
+                    .join(format!("test-remote-data-{}", Uuid::new_v4()));
+
+                // Remove existing directory if it exists to start fresh
+                let _ = std::fs::remove_dir_all(&temp_dir);
+                std::fs::create_dir_all(&temp_dir)
+                    .expect("Failed to create temporary directory for remote data");
+                println!("temp_dir: {:?}", temp_dir);
+
+                // Verify directory was created and is accessible
+                if !temp_dir.exists() {
+                    panic!("Remote data directory was not created: {:?}", temp_dir);
+                }
+
+                // Get absolute path for Docker mount
+                let temp_dir = temp_dir
+                    .canonicalize()
+                    .expect("Failed to canonicalize remote data directory path");
+
+                let mut cluster_conf = HashMap::new();
+                // set to a small size so that data can be tiered to remote
+                cluster_conf.insert("log.segment.file-size".to_string(), "120b".to_string());
+                cluster_conf.insert(
+                    "remote.log.task-interval-duration".to_string(),
+                    "1s".to_string(),
+                );
+                // remote.data.dir uses the same path in container and host
+                cluster_conf.insert(
+                    "remote.data.dir".to_string(),
+                    temp_dir.to_string_lossy().to_string(),
+                );
+
+                let cluster =
+                    FlussTestingClusterBuilder::new_with_cluster_conf("test_table", &cluster_conf)
+                        .with_remote_data_dir(temp_dir)
+                        .build()
+                        .await;
+                let mut guard = cluster_guard.write();
+                *guard = Some(cluster);
+            });
+        })
+        .join()
+        .expect("Failed to create cluster");
+
+        // wait for 20 seconds to avoid errors like
+        // "CoordinatorEventProcessor is not initialized yet"
+        sleep(Duration::from_secs(20));
+    }
+
+    fn after_all() {
+        // Create a new tokio runtime in a separate thread
+        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
+        thread::spawn(move || {
+            let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
+            rt.block_on(async {
+                let mut guard = cluster_guard.write();
+                if let Some(cluster) = guard.take() {
+                    cluster.stop().await;
+                }
+            });
+        })
+        .join()
+        .expect("Failed to cleanup cluster");
+    }
+
+    #[tokio::test]
+    async fn test_scan_remote_log() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path = TablePath::new(
+            "fluss".to_string(),
+            "test_append_record_batch_and_scan".to_string(),
+        );
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("c1", DataTypes::int())
+                    .column("c2", DataTypes::string())
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .property("table.log.arrow.compression.type", "NONE")
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+
+        let append_writer = table
+            .new_append()
+            .expect("Failed to create append")
+            .create_writer();
+
+        // append 20 rows; some of them must be tiered to remote
+        let record_count = 20;
+        for i in 0..record_count {
+            let mut row = GenericRow::new();
+            row.set_field(0, i as i32);
+            let v = format!("v{}", i);
+            row.set_field(1, v.as_str());
+            append_writer
+                .append(row)
+                .await
+                .expect("Failed to append row");
+        }
+
+        // Create a log scanner and subscribe to all buckets to read appended records
+        let num_buckets = table.table_info().get_num_buckets();
+        let log_scanner = table
+            .new_scan()
+            .create_log_scanner()
+            .expect("Failed to create log scanner");
+        for bucket_id in 0..num_buckets {
+            log_scanner
+                .subscribe(bucket_id, 0)
+                .await
+                .expect("Failed to subscribe");
+        }
+
+        let mut records = Vec::with_capacity(record_count);
+        let start = std::time::Instant::now();
+        const MAX_WAIT_DURATION: Duration = Duration::from_secs(30);
+        while records.len() < record_count {
+            if start.elapsed() > MAX_WAIT_DURATION {
+                panic!(
+                    "Timed out waiting for {} records; only got {} after {:?}",
+                    record_count,
+                    records.len(),
+                    start.elapsed()
+                );
+            }
+            let scan_records = log_scanner
+                .poll(Duration::from_secs(1))
+                .await
+                .expect("Failed to poll log scanner");
+            records.extend(scan_records);
+        }
+
+        // then, check the data
+        for (i, record) in records.iter().enumerate() {
+            let row = record.row();
+            let expected_c1 = i as i32;
+            let expected_c2 = format!("v{}", i);
+            assert_eq!(row.get_int(0), expected_c1, "c1 mismatch at index {}", i);
+            assert_eq!(row.get_string(1), expected_c2, "c2 mismatch at index {}", i);
+        }
+    }
+
+    fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
+        let cluster_guard = SHARED_FLUSS_CLUSTER.read();
+        if cluster_guard.is_none() {
+            panic!("Fluss cluster not initialized. Make sure before_all() was called.");
+        }
+        Arc::new(cluster_guard.as_ref().unwrap().clone())
+    }
+}
diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs
index a15ca23..65111af 100644
--- a/crates/fluss/tests/test_fluss.rs
+++ b/crates/fluss/tests/test_fluss.rs
@@ -25,4 +25,6 @@ mod integration {
     mod table;
 
     mod utils;
+
+    mod table_remote_scan;
 }