diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index 4547b9c..ded289a 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -58,6 +58,7 @@ url = "2.5.7"
 async-trait = "0.1.89"
 uuid = { version = "1.10", features = ["v4"] }
 tempfile= "3.23.0"
+snafu = "0.8.3"
 
 [dev-dependencies]
 testcontainers = "0.25.0"
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index fefab43..e185af8 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -29,7 +29,7 @@ use crate::rpc::message::{ListOffsetsRequest, OffsetSpec};
 use crate::rpc::{RpcClient, ServerConnection};
 use crate::BucketId;
 
-use crate::error::Result;
+use crate::error::{Error, Result};
 use crate::proto::GetTableInfoResponse;
 use std::collections::HashMap;
 use std::slice::from_ref;
@@ -245,10 +245,10 @@ impl FlussAdmin {
         let mut results = HashMap::new();
         for response_future in response_futures {
-            let offsets = response_future.await.map_err(
-                // todo: consider use suitable error
-                |e| crate::error::Error::WriteError(format!("Fail to get result: {e}")),
-            )?;
+            let offsets = response_future.await.map_err(|e| Error::UnexpectedError {
+                message: "Failed to get result for list offsets.".to_string(),
+                source: Some(Box::new(e)),
+            })?;
             results.extend(offsets?);
         }
 
         Ok(results)
@@ -267,10 +267,11 @@ impl FlussAdmin {
         for bucket_id in buckets {
             let table_bucket = TableBucket::new(table_id, *bucket_id);
             let leader = cluster.leader_for(&table_bucket).ok_or_else(|| {
-                // todo: consider use another suitable error
-                crate::error::Error::InvalidTableError(format!(
-                    "No leader found for table bucket: table_id={table_id}, bucket_id={bucket_id}"
-                ))
+                // todo: consider retry?
+                Error::UnexpectedError {
+                    message: format!("No leader found for table bucket: {table_bucket}."),
+                    source: None,
+                }
             })?;
 
             node_for_bucket_list
@@ -301,10 +302,11 @@ impl FlussAdmin {
             let task = tokio::spawn(async move {
                 let cluster = metadata.get_cluster();
                 let tablet_server = cluster.get_tablet_server(leader_id).ok_or_else(|| {
-                    // todo: consider use more suitable error
-                    crate::error::Error::InvalidTableError(format!(
-                        "Tablet server {leader_id} not found"
-                    ))
+                    Error::LeaderNotAvailable {
+                        message: format!(
+                            "Tablet server {leader_id} is not found in the metadata cache."
+                        ),
+                    }
                 })?;
                 let connection = rpc_client.get_connection(tablet_server).await?;
                 let list_offsets_response = connection.request(request).await?;
diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs
index 65805d0..3503690 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -21,7 +21,6 @@ use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
 use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord};
 use crate::util::delete_file;
 use std::collections::HashMap;
-use std::io;
 use std::path::{Path, PathBuf};
 use tempfile::TempDir;
 use tokio::io::AsyncWriteExt;
@@ -99,15 +98,14 @@ impl RemoteLogDownloadFuture {
     /// Get the downloaded file path
     pub async fn get_file_path(&mut self) -> Result<PathBuf> {
-        let receiver = self
-            .receiver
-            .take()
-            .ok_or_else(|| Error::Io(io::Error::other("Download future already consumed")))?;
-
-        receiver.await.map_err(|e| {
-            Error::Io(io::Error::other(format!(
-                "Download future cancelled: {e:?}"
-            )))
+        let receiver = self.receiver.take().ok_or_else(|| Error::UnexpectedError {
+            message: "Download future already consumed".to_string(),
+            source: None,
+        })?;
+
+        receiver.await.map_err(|e| Error::UnexpectedError {
+            message: format!("Download future cancelled: {e:?}"),
+            source: None,
         })?
     }
 }
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index f6780d7..ca3417d 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -73,18 +73,20 @@ impl<'a> TableScan<'a> {
     /// ```
     pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
         if column_indices.is_empty() {
-            return Err(Error::IllegalArgument(
-                "Column indices cannot be empty".to_string(),
-            ));
+            return Err(Error::IllegalArgument {
+                message: "Column indices cannot be empty".to_string(),
+            });
         }
         let field_count = self.table_info.row_type().fields().len();
         for &idx in column_indices {
             if idx >= field_count {
-                return Err(Error::IllegalArgument(format!(
-                    "Column index {} out of range (max: {})",
-                    idx,
-                    field_count - 1
-                )));
+                return Err(Error::IllegalArgument {
+                    message: format!(
+                        "Column index {} out of range (max: {})",
+                        idx,
+                        field_count - 1
+                    ),
+                });
             }
         }
         self.projected_fields = Some(column_indices.to_vec());
@@ -105,9 +107,9 @@ impl<'a> TableScan<'a> {
     /// ```
     pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
         if column_names.is_empty() {
-            return Err(Error::IllegalArgument(
-                "Column names cannot be empty".to_string(),
-            ));
+            return Err(Error::IllegalArgument {
+                message: "Column names cannot be empty".to_string(),
+            });
         }
         let row_type = self.table_info.row_type();
         let mut indices = Vec::new();
@@ -117,7 +119,9 @@ impl<'a> TableScan<'a> {
             .fields()
             .iter()
             .position(|f| f.name() == *name)
-            .ok_or_else(|| Error::IllegalArgument(format!("Column '{name}' not found")))?;
+            .ok_or_else(|| Error::IllegalArgument {
+                message: format!("Column '{name}' not found"),
+            })?;
         indices.push(idx);
     }
 
@@ -268,7 +272,7 @@ impl LogFetcher {
         // Download and process remote log segments
         let mut pos_in_log_segment = remote_fetch_info.first_start_pos;
         let mut current_fetch_offset = fetch_offset;
-        // todo: make segment download parallelly
+        // todo: make segment download in parallel
         for (i, segment) in remote_fetch_info.remote_log_segments.iter().enumerate() {
diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs
index e632cde..cd33586 100644
--- a/crates/fluss/src/client/write/mod.rs
+++ b/crates/fluss/src/client/write/mod.rs
@@ -74,11 +74,17 @@ impl ResultHandle {
         self.receiver
             .receive()
             .await
-            .map_err(|e| Error::WriteError(e.to_string()))
+            .map_err(|e| Error::UnexpectedError {
+                message: format!("Failed to wait for write result: {e:?}"),
+                source: None,
+            })
     }
 
     pub fn result(&self, batch_result: BatchWriteResult) -> Result<(), Error> {
         // do nothing, just return empty result
-        batch_result.map_err(|e| Error::WriteError(e.to_string()))
+        batch_result.map_err(|e| Error::UnexpectedError {
+            message: format!("Failed to get write result: {e:?}"),
+            source: None,
+        })
     }
 }
diff --git a/crates/fluss/src/client/write/sender.rs b/crates/fluss/src/client/write/sender.rs
index 27460e3..462a846 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -17,7 +17,7 @@
 
 use crate::client::metadata::Metadata;
 use crate::client::{ReadyWriteBatch, RecordAccumulator};
-use crate::error::Error::WriteError;
+use crate::error::Error;
 use crate::error::Result;
 use crate::metadata::TableBucket;
 use crate::proto::ProduceLogResponse;
@@ -150,9 +150,12 @@ impl Sender {
         let cluster = self.metadata.get_cluster();
 
-        let destination_node = cluster
-            .get_tablet_server(destination)
-            .ok_or(WriteError(String::from("destination node not found")))?;
+        let destination_node =
+            cluster
+                .get_tablet_server(destination)
+                .ok_or(Error::LeaderNotAvailable {
+                    message: format!("destination node {destination} not found in metadata cache."),
+                })?;
 
         let connection = self.metadata.get_connection(destination_node).await?;
         for (table_id, write_batches) in write_batch_by_table {
diff --git a/crates/fluss/src/client/write/writer_client.rs b/crates/fluss/src/client/write/writer_client.rs
index 28f5371..042859a 100644
--- a/crates/fluss/src/client/write/writer_client.rs
+++ b/crates/fluss/src/client/write/writer_client.rs
@@ -78,11 +78,12 @@ impl WriterClient {
     fn get_ack(config: &Config) -> Result<i32> {
         let acks = config.writer_acks.as_str();
-        if acks.eq("all") {
+        if acks.eq_ignore_ascii_case("all") {
             Ok(-1)
         } else {
-            acks.parse::<i32>()
-                .map_err(|e| Error::IllegalArgument(e.to_string()))
+            acks.parse::<i32>().map_err(|e| Error::IllegalArgument {
+                message: format!("invalid writer ack '{acks}': {e}"),
+            })
         }
     }
 
@@ -133,11 +134,17 @@ impl WriterClient {
         self.shutdown_tx
             .send(())
             .await
-            .map_err(|e| Error::WriteError(e.to_string()))?;
+            .map_err(|e| Error::UnexpectedError {
+                message: format!("Failed to close write client: {e:?}"),
+                source: None,
+            })?;
 
         self.sender_join_handle
            .await
-            .map_err(|e| Error::WriteError(e.to_string()))?;
+            .map_err(|e| Error::UnexpectedError {
+                message: format!("Failed to close write client: {e:?}"),
+                source: None,
+            })?;
 
         Ok(())
     }
diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs
index b1d5d13..b16e5ec 100644
--- a/crates/fluss/src/error.rs
+++ b/crates/fluss/src/error.rs
@@ -15,48 +15,137 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::rpc::RpcError;
+pub use crate::rpc::RpcError;
+pub use crate::rpc::{ApiError, FlussError};
+
 use arrow_schema::ArrowError;
+use snafu::Snafu;
 use std::{io, result};
-use thiserror::Error;
 
 pub type Result<T> = result::Result<T, Error>;
 
-#[derive(Debug, Error)]
+#[derive(Debug, Snafu)]
 pub enum Error {
-    #[error(transparent)]
-    Io(#[from] io::Error),
+    #[snafu(
+        whatever,
+        display("Fluss hitting unexpected error {}: {:?}", message, source)
+    )]
+    UnexpectedError {
+        message: String,
+        /// see https://github.com/shepmaster/snafu/issues/446
+        #[snafu(source(from(Box<dyn std::error::Error + Send + Sync>, Some)))]
+        source: Option<Box<dyn std::error::Error + Send + Sync>>,
+    },
+
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting unexpected io error {}: {:?}", message, source)
+    )]
+    IoUnexpectedError { message: String, source: io::Error },
+
+    #[snafu(
+        visibility(pub(crate)),
+        display(
+            "Fluss hitting remote storage unexpected error {}: {:?}",
+            message,
+            source
+        )
+    )]
+    RemoteStorageUnexpectedError {
+        message: String,
+        source: opendal::Error,
+    },
+
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting invalid table error {}.", message)
+    )]
+    InvalidTableError { message: String },
 
-    #[error("Invalid table")]
-    InvalidTableError(String),
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting json serde error {}.", message)
+    )]
+    JsonSerdeError { message: String },
 
-    #[error("Json serde error")]
-    JsonSerdeError(String),
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting unexpected rpc error {}: {:?}", message, source)
+    )]
+    RpcError { message: String, source: RpcError },
 
-    #[error("Rpc error")]
-    RpcError(#[from] RpcError),
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting row convert error {}.", message)
+    )]
+    RowConvertError { message: String },
 
-    #[error("Row convert error")]
-    RowConvertError(String),
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting arrow error {}: {:?}.", message, source)
+    )]
+    ArrowError { message: String, source: ArrowError },
 
-    #[error("arrow error")]
-    ArrowError(#[from] ArrowError),
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting illegal argument error {}.", message)
+    )]
+    IllegalArgument { message: String },
 
-    #[error("Write error: {0}")]
-    WriteError(String),
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting IO not supported error {}.", message)
+    )]
+    IoUnsupported { message: String },
 
-    #[error("Illegal argument error: {0}")]
-    IllegalArgument(String),
+    #[snafu(
+        visibility(pub(crate)),
+        display("Fluss hitting leader not available error {}.", message)
+    )]
+    LeaderNotAvailable { message: String },
 
-    #[error("IO not supported error: {0}")]
-    IoUnsupported(String),
+    #[snafu(visibility(pub(crate)), display("Fluss API Error: {}.", api_error))]
+    FlussAPIError { api_error: ApiError },
+}
 
-    #[error("IO operation failed on underlying storage: {0}")]
-    IoUnexpected(Box<opendal::Error>),
+impl From<ArrowError> for Error {
+    fn from(value: ArrowError) -> Self {
+        Error::ArrowError {
+            message: format!("{value}"),
+            source: value,
+        }
+    }
+}
+
+impl From<RpcError> for Error {
+    fn from(value: RpcError) -> Self {
+        Error::RpcError {
+            message: format!("{value}"),
+            source: value,
+        }
+    }
+}
+
+impl From<io::Error> for Error {
+    fn from(value: io::Error) -> Self {
+        Error::IoUnexpectedError {
+            message: format!("{value}"),
+            source: value,
+        }
+    }
+}
 
 impl From<opendal::Error> for Error {
-    fn from(err: opendal::Error) -> Self {
-        Error::IoUnexpected(Box::new(err))
+    fn from(value: opendal::Error) -> Self {
+        Error::RemoteStorageUnexpectedError {
+            message: format!("{value}"),
+            source: value,
+        }
+    }
+}
+
+impl From<ApiError> for Error {
+    fn from(value: ApiError) -> Self {
+        Error::FlussAPIError { api_error: value }
     }
 }
diff --git a/crates/fluss/src/io/file_io.rs b/crates/fluss/src/io/file_io.rs
index 69a4c97..a699244 100644
--- a/crates/fluss/src/io/file_io.rs
+++ b/crates/fluss/src/io/file_io.rs
@@ -39,8 +39,9 @@ pub struct FileIO {
 impl FileIO {
     /// Try to infer file io scheme from path.
     pub fn from_url(path: &str) -> Result<FileIOBuilder> {
-        let url =
-            Url::parse(path).map_err(|_| Error::IllegalArgument(format!("Invalid URL: {path}")))?;
+        let url = Url::parse(path).map_err(|_| Error::IllegalArgument {
+            message: format!("Invalid URL: {path}"),
+        })?;
 
         Ok(FileIOBuilder::new(url.scheme()))
     }
diff --git a/crates/fluss/src/io/storage.rs b/crates/fluss/src/io/storage.rs
index 361da7e..31ef44b 100644
--- a/crates/fluss/src/io/storage.rs
+++ b/crates/fluss/src/io/storage.rs
@@ -39,9 +39,9 @@ impl Storage {
             Scheme::Memory => Ok(Self::Memory),
             #[cfg(feature = "storage-fs")]
             Scheme::Fs => Ok(Self::LocalFs),
-            _ => Err(error::Error::IoUnsupported(
-                "Unsupported storage feature".to_string(),
-            )),
+            _ => Err(error::Error::IoUnsupported {
+                message: format!("unsupported storage feature '{scheme}'"),
+            }),
         }
     }
diff --git a/crates/fluss/src/metadata/database.rs b/crates/fluss/src/metadata/database.rs
index 8eaa4d3..fad1498 100644
--- a/crates/fluss/src/metadata/database.rs
+++ b/crates/fluss/src/metadata/database.rs
@@ -148,8 +148,8 @@ impl JsonSerde for DatabaseDescriptor {
         if let Some(comment_node) = node.get(Self::COMMENT_NAME) {
             let comment = comment_node
                 .as_str()
-                .ok_or_else(|| {
-                    JsonSerdeError(format!("{} should be a string", Self::COMMENT_NAME))
+                .ok_or_else(|| JsonSerdeError {
+                    message: format!("{} should be a string", Self::COMMENT_NAME),
                 })?
                 .to_owned();
             builder = builder.comment(&comment);
@@ -157,8 +157,8 @@ impl JsonSerde for DatabaseDescriptor {
         // Deserialize custom properties directly
         let custom_properties = if let Some(props_node) = node.get(Self::CUSTOM_PROPERTIES_NAME) {
-            let obj = props_node.as_object().ok_or_else(|| {
-                JsonSerdeError("Custom properties should be an object".to_string())
+            let obj = props_node.as_object().ok_or_else(|| JsonSerdeError {
+                message: "Custom properties should be an object".to_string(),
             })?;
 
             let mut properties = HashMap::with_capacity(obj.len());
@@ -167,8 +167,8 @@ impl JsonSerde for DatabaseDescriptor {
                 properties.insert(
                     key.clone(),
                     value
                         .as_str()
-                        .ok_or_else(|| {
-                            JsonSerdeError("Property value should be a string".to_string())
+                        .ok_or_else(|| JsonSerdeError {
+                            message: "Property value should be a string".to_string(),
                         })?
                         .to_owned(),
                 );
@@ -186,16 +186,18 @@ impl DatabaseDescriptor {
     /// Create DatabaseDescriptor from JSON bytes (equivalent to Java's fromJsonBytes)
     pub fn from_json_bytes(bytes: &[u8]) -> Result<DatabaseDescriptor> {
-        let json_value: Value = serde_json::from_slice(bytes)
-            .map_err(|e| JsonSerdeError(format!("Failed to parse JSON: {e}")))?;
+        let json_value: Value = serde_json::from_slice(bytes).map_err(|e| JsonSerdeError {
+            message: format!("Failed to parse JSON: {e}"),
+        })?;
         Self::deserialize_json(&json_value)
     }
 
     /// Convert DatabaseDescriptor to JSON bytes
     pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
         let json_value = self.serialize_json()?;
-        serde_json::to_vec(&json_value)
-            .map_err(|e| JsonSerdeError(format!("Failed to serialize to JSON: {e}")))
+        serde_json::to_vec(&json_value).map_err(|e| JsonSerdeError {
+            message: format!("Failed to serialize to JSON: {e}"),
+        })
     }
 }
diff --git a/crates/fluss/src/metadata/json_serde.rs b/crates/fluss/src/metadata/json_serde.rs
index 447b0f9..7d94e19 100644
--- a/crates/fluss/src/metadata/json_serde.rs
+++ b/crates/fluss/src/metadata/json_serde.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::error::Error::{InvalidTableError, JsonSerdeError};
-use crate::error::Result;
+use crate::error::Error::JsonSerdeError;
+use crate::error::{Error, Result};
 use crate::metadata::datatype::{DataField, DataType, DataTypes};
 use crate::metadata::table::{Column, Schema, TableDescriptor};
 use serde_json::{Value, json};
@@ -166,11 +166,11 @@ impl JsonSerde for DataType {
         let type_root = node
             .get(Self::FIELD_NAME_TYPE_NAME)
             .and_then(|v| v.as_str())
-            .ok_or_else(|| {
-                JsonSerdeError(format!(
+            .ok_or_else(|| Error::JsonSerdeError {
+                message: format!(
                     "Couldn't find field {} while deserializing datatype.",
                     Self::FIELD_NAME_TYPE_NAME
-                ))
+                ),
             })?;
 
         let mut data_type = match type_root {
@@ -185,11 +185,8 @@ impl JsonSerde for DataType {
                 let length = node
                     .get(Self::FIELD_NAME_LENGTH)
                     .and_then(|v| v.as_u64())
-                    .ok_or_else(|| {
-                        JsonSerdeError(format!(
-                            "Missing required field: {}",
-                            Self::FIELD_NAME_LENGTH
-                        ))
+                    .ok_or_else(|| Error::JsonSerdeError {
+                        message: format!("Missing required field: {}", Self::FIELD_NAME_LENGTH),
                     })? as u32;
                 DataTypes::char(length)
             }
@@ -198,11 +195,8 @@ impl JsonSerde for DataType {
                 let precision = node
                     .get(Self::FIELD_NAME_PRECISION)
                     .and_then(|v| v.as_u64())
-                    .ok_or_else(|| {
-                        JsonSerdeError(format!(
-                            "Missing required field: {}",
-                            Self::FIELD_NAME_PRECISION
-                        ))
+                    .ok_or_else(|| Error::JsonSerdeError {
+                        message: format!("Missing required field: {}", Self::FIELD_NAME_PRECISION),
                    })? as u32;
                 let scale = node
                     .get(Self::FIELD_NAME_SCALE)
@@ -243,43 +237,46 @@ impl JsonSerde for DataType {
             "ARRAY" => {
                 let element_type_node = node.get(Self::FIELD_NAME_ELEMENT_TYPE).ok_or_else(|| {
-                    JsonSerdeError(format!(
-                        "Missing required field: {}",
-                        Self::FIELD_NAME_ELEMENT_TYPE
-                    ))
+                    Error::JsonSerdeError {
+                        message: format!(
+                            "Missing required field: {}",
+                            Self::FIELD_NAME_ELEMENT_TYPE
+                        ),
+                    }
                 })?;
                 let element_type = DataType::deserialize_json(element_type_node)?;
                 DataTypes::array(element_type)
             }
             "MAP" => {
-                let key_type_node = node.get(Self::FIELD_NAME_KEY_TYPE).ok_or_else(|| {
-                    JsonSerdeError(format!(
-                        "Missing required field: {}",
-                        Self::FIELD_NAME_KEY_TYPE
-                    ))
-                })?;
+                let key_type_node =
+                    node.get(Self::FIELD_NAME_KEY_TYPE)
+                        .ok_or_else(|| Error::JsonSerdeError {
+                            message: format!(
+                                "Missing required field: {}",
+                                Self::FIELD_NAME_KEY_TYPE
+                            ),
+                        })?;
                 let key_type = DataType::deserialize_json(key_type_node)?;
-                let value_type_node = node.get(Self::FIELD_NAME_VALUE_TYPE).ok_or_else(|| {
-                    JsonSerdeError(format!(
-                        "Missing required field: {}",
-                        Self::FIELD_NAME_VALUE_TYPE
-                    ))
-                })?;
+                let value_type_node =
+                    node.get(Self::FIELD_NAME_VALUE_TYPE)
+                        .ok_or_else(|| Error::JsonSerdeError {
+                            message: format!(
+                                "Missing required field: {}",
+                                Self::FIELD_NAME_VALUE_TYPE
+                            ),
+                        })?;
                 let value_type = DataType::deserialize_json(value_type_node)?;
                 DataTypes::map(key_type, value_type)
             }
             "ROW" => {
                 let fields_node = node
                     .get(Self::FIELD_NAME_FIELDS)
-                    .ok_or_else(|| {
-                        JsonSerdeError(format!(
-                            "Missing required field: {}",
-                            Self::FIELD_NAME_FIELDS
-                        ))
+                    .ok_or_else(|| Error::JsonSerdeError {
+                        message: format!("Missing required field: {}", Self::FIELD_NAME_FIELDS),
                     })?
                     .as_array()
-                    .ok_or_else(|| {
-                        JsonSerdeError(format!("{} must be an array", Self::FIELD_NAME_FIELDS))
+                    .ok_or_else(|| Error::JsonSerdeError {
+                        message: format!("{} must be an array", Self::FIELD_NAME_FIELDS),
                     })?;
                 let mut fields = Vec::with_capacity(fields_node.len());
                 for field_node in fields_node {
@@ -287,7 +284,11 @@ impl JsonSerde for DataType {
                 }
                 DataTypes::row(fields)
             }
-            _ => return Err(JsonSerdeError(format!("Unknown type root: {type_root}"))),
+            _ => {
+                return Err(Error::JsonSerdeError {
+                    message: format!("Unknown type root: {type_root}"),
+                });
+            }
         };
 
         if let Some(nullable) = node.get(Self::FIELD_NAME_NULLABLE) {
@@ -327,12 +328,16 @@ impl JsonSerde for DataField {
         let name = node
             .get(Self::NAME)
             .and_then(|v| v.as_str())
-            .ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::NAME)))?
+            .ok_or_else(|| Error::JsonSerdeError {
+                message: format!("Missing required field: {}", Self::NAME),
+            })?
             .to_string();
 
-        let field_type_node = node.get(Self::FIELD_TYPE).ok_or_else(|| {
-            JsonSerdeError(format!("Missing required field: {}", Self::FIELD_TYPE))
-        })?;
+        let field_type_node = node
+            .get(Self::FIELD_TYPE)
+            .ok_or_else(|| Error::JsonSerdeError {
+                message: format!("Missing required field: {}", Self::FIELD_TYPE),
+            })?;
 
         let data_type = DataType::deserialize_json(field_type_node)?;
 
@@ -373,12 +378,16 @@ impl JsonSerde for Column {
         let name = node
             .get(Self::NAME)
             .and_then(|v| v.as_str())
-            .ok_or_else(|| JsonSerdeError(format!("Missing required field: {}", Self::NAME)))?
+            .ok_or_else(|| Error::JsonSerdeError {
+                message: format!("Missing required field: {}", Self::NAME),
+            })?
             .to_string();
 
-        let data_type_node = node.get(Self::DATA_TYPE).ok_or_else(|| {
-            JsonSerdeError(format!("Missing required field: {}", Self::DATA_TYPE))
-        })?;
+        let data_type_node = node
+            .get(Self::DATA_TYPE)
+            .ok_or_else(|| Error::JsonSerdeError {
+                message: format!("Missing required field: {}", Self::DATA_TYPE),
+            })?;
 
         let data_type = DataType::deserialize_json(data_type_node)?;
 
@@ -429,11 +438,13 @@ impl JsonSerde for Schema {
     fn deserialize_json(node: &Value) -> Result<Self> {
         let columns_node = node
             .get(Self::COLUMNS_NAME)
-            .ok_or_else(|| {
-                JsonSerdeError(format!("Missing required field: {}", Self::COLUMNS_NAME))
+            .ok_or_else(|| Error::JsonSerdeError {
+                message: format!("Missing required field: {}", Self::COLUMNS_NAME),
             })?
             .as_array()
-            .ok_or_else(|| JsonSerdeError(format!("{} must be an array", Self::COLUMNS_NAME)))?;
+            .ok_or_else(|| Error::JsonSerdeError {
+                message: format!("{} must be an array", Self::COLUMNS_NAME),
+            })?;
 
         let mut columns = Vec::with_capacity(columns_node.len());
         for col_node in columns_node {
@@ -443,17 +454,17 @@ impl JsonSerde for Schema {
         let mut schema_builder = Schema::builder().with_columns(columns);
 
         if let Some(pk_node) = node.get(Self::PRIMARY_KEY_NAME) {
-            let pk_array = pk_node
-                .as_array()
-                .ok_or_else(|| InvalidTableError("Primary key must be an array".to_string()))?;
+            let pk_array = pk_node.as_array().ok_or_else(|| Error::InvalidTableError {
+                message: "Primary key must be an array".to_string(),
+            })?;
 
             let mut primary_keys = Vec::with_capacity(pk_array.len());
             for name_node in pk_array {
                 primary_keys.push(
                     name_node
                         .as_str()
-                        .ok_or_else(|| {
-                            InvalidTableError("Primary key element must be a string".to_string())
+                        .ok_or_else(|| Error::InvalidTableError {
+                            message: "Primary key element must be a string".to_string(),
                         })?
                         .to_string(),
                 );
@@ -478,9 +489,9 @@ impl TableDescriptor {
     const VERSION: u32 = 1;
 
     fn deserialize_properties(node: &Value) -> Result<HashMap<String, String>> {
-        let obj = node
-            .as_object()
-            .ok_or_else(|| JsonSerdeError("Properties must be an object".to_string()))?;
+        let obj = node.as_object().ok_or_else(|| Error::JsonSerdeError {
+            message: "Properties must be an object".to_string(),
+        })?;
 
         let mut properties = HashMap::with_capacity(obj.len());
         for (key, value) in obj {
@@ -488,7 +499,9 @@ impl TableDescriptor {
                 key.clone(),
                 value
                     .as_str()
-                    .ok_or_else(|| JsonSerdeError("Property value must be a string".to_string()))?
+                    .ok_or_else(|| Error::JsonSerdeError {
+                        message: "Property value must be a string".to_string(),
+                    })?
                     .to_owned(),
             );
         }
@@ -545,8 +558,8 @@ impl JsonSerde for TableDescriptor {
         let mut builder = TableDescriptor::builder();
 
         // Deserialize schema
-        let schema_node = node.get(Self::SCHEMA_NAME).ok_or_else(|| {
-            JsonSerdeError(format!("Missing required field: {}", Self::SCHEMA_NAME))
+        let schema_node = node.get(Self::SCHEMA_NAME).ok_or_else(|| JsonSerdeError {
+            message: format!("Missing required field: {}", Self::SCHEMA_NAME),
         })?;
         let schema = Schema::deserialize_json(schema_node)?;
         builder = builder.schema(schema);
@@ -555,22 +568,21 @@ impl JsonSerde for TableDescriptor {
         if let Some(comment_node) = node.get(Self::COMMENT_NAME) {
             let comment = comment_node
                 .as_str()
-                .ok_or_else(|| JsonSerdeError(format!("{} must be a string", Self::COMMENT_NAME)))?
+                .ok_or_else(|| JsonSerdeError {
+                    message: format!("{} must be a string", Self::COMMENT_NAME),
+                })?
                 .to_owned();
             builder = builder.comment(comment.as_str());
         }
 
         let partition_node = node
             .get(Self::PARTITION_KEY_NAME)
-            .ok_or_else(|| {
-                JsonSerdeError(format!(
-                    "Missing required field: {}",
-                    Self::PARTITION_KEY_NAME
-                ))
+            .ok_or_else(|| JsonSerdeError {
+                message: format!("Missing required field: {}", Self::PARTITION_KEY_NAME),
             })?
             .as_array()
-            .ok_or_else(|| {
-                JsonSerdeError(format!("{} must be an array", Self::PARTITION_KEY_NAME))
+            .ok_or_else(|| JsonSerdeError {
+                message: format!("{} must be an array", Self::PARTITION_KEY_NAME),
             })?;
 
         let mut partition_keys = Vec::with_capacity(partition_node.len());
@@ -578,11 +590,8 @@ impl JsonSerde for TableDescriptor {
             partition_keys.push(
                 key_node
                     .as_str()
-                    .ok_or_else(|| {
-                        JsonSerdeError(format!(
-                            "{} element must be a string",
-                            Self::PARTITION_KEY_NAME
-                        ))
+                    .ok_or_else(|| JsonSerdeError {
+                        message: format!("{} element must be a string", Self::PARTITION_KEY_NAME),
                     })?
                     .to_owned(),
             );
@@ -592,15 +601,17 @@ impl JsonSerde for TableDescriptor {
         let mut bucket_count = None;
         let mut bucket_keys = vec![];
         if let Some(bucket_key_node) = node.get(Self::BUCKET_KEY_NAME) {
-            let bucket_key_node = bucket_key_node.as_array().ok_or_else(|| {
-                JsonSerdeError(format!("{} must be an array", Self::BUCKET_KEY_NAME))
+            let bucket_key_node = bucket_key_node.as_array().ok_or_else(|| JsonSerdeError {
+                message: format!("{} must be an array", Self::BUCKET_KEY_NAME),
             })?;
 
             for key_node in bucket_key_node {
                 bucket_keys.push(
                     key_node
                         .as_str()
-                        .ok_or_else(|| JsonSerdeError("Bucket key must be a string".to_string()))?
+                        .ok_or_else(|| JsonSerdeError {
+                            message: "Bucket key must be a string".to_string(),
+                        })?
                        .to_owned(),
                );
            }
@@ -617,18 +628,18 @@ impl JsonSerde for TableDescriptor {
         // Deserialize properties
         let properties =
             Self::deserialize_properties(node.get(Self::PROPERTIES_NAME).ok_or_else(|| {
-                JsonSerdeError(format!("Missing required field: {}", Self::PROPERTIES_NAME))
+                JsonSerdeError {
+                    message: format!("Missing required field: {}", Self::PROPERTIES_NAME),
+                }
             })?)?;
         builder = builder.properties(properties);
 
         // Deserialize custom properties
         let custom_properties = Self::deserialize_properties(
-            node.get(Self::CUSTOM_PROPERTIES_NAME).ok_or_else(|| {
-                JsonSerdeError(format!(
-                    "Missing required field: {}",
-                    Self::CUSTOM_PROPERTIES_NAME
-                ))
-            })?,
+            node.get(Self::CUSTOM_PROPERTIES_NAME)
+                .ok_or_else(|| JsonSerdeError {
+                    message: format!("Missing required field: {}", Self::CUSTOM_PROPERTIES_NAME),
+                })?,
         )?;
         builder = builder.custom_properties(custom_properties);
 
diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs
index 751dd6d..770c4f2 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::error::Error::InvalidTableError;
-use crate::error::Result;
+use crate::error::{Error, Result};
 use crate::metadata::datatype::{DataField, DataType, RowType};
 use core::fmt;
 use serde::{Deserialize, Serialize};
@@ -220,9 +220,9 @@ impl SchemaBuilder {
     ) -> Result<Vec<Column>> {
         let names: Vec<_> = columns.iter().map(|c| &c.name).collect();
         if let Some(duplicates) = Self::find_duplicates(&names) {
-            return Err(InvalidTableError(format!(
-                "Duplicate column names found: {duplicates:?}"
-            )));
+            return Err(InvalidTableError {
+                message: format!("Duplicate column names found: {duplicates:?}"),
+            });
         }
 
         let Some(pk) = primary_key else {
@@ -232,9 +232,9 @@ impl SchemaBuilder {
         let pk_set: HashSet<_> = pk.column_names.iter().collect();
         let all_columns: HashSet<_> = columns.iter().map(|c| &c.name).collect();
         if !pk_set.is_subset(&all_columns) {
-            return Err(InvalidTableError(format!(
-                "Primary key columns {pk_set:?} not found in schema"
-            )));
+            return Err(InvalidTableError {
+                message: format!("Primary key columns {pk_set:?} not found in schema"),
+            });
         }
 
         Ok(columns
@@ -441,12 +441,12 @@ impl TableDescriptor {
     pub fn replication_factor(&self) -> Result<i32> {
         self.properties
             .get("table.replication.factor")
-            .ok_or(InvalidTableError(
-                "Replication factor is not set".to_string(),
-            ))?
+            .ok_or_else(|| InvalidTableError {
+                message: "Replication factor is not set".to_string(),
+            })?
             .parse()
-            .map_err(|_e| {
-                InvalidTableError("Replication factor can't be convert into int".to_string())
+            .map_err(|_e| InvalidTableError {
+                message: "Replication factor can't be converted into an int".to_string(),
             })
     }
 
@@ -497,11 +497,13 @@ impl TableDescriptor {
         bucket_keys.retain(|k| !partition_keys.contains(k));
 
         if bucket_keys.is_empty() {
-            return Err(InvalidTableError(format!(
-                "Primary Key constraint {:?} should not be same with partition fields {:?}.",
-                schema.primary_key().unwrap().column_names(),
-                partition_keys
-            )));
+            return Err(Error::InvalidTableError {
+                message: format!(
+                    "Primary Key constraint {:?} should not be the same as partition fields {:?}.",
+                    schema.primary_key().unwrap().column_names(),
+                    partition_keys
+                ),
+            });
         }
 
         Ok(bucket_keys)
@@ -518,10 +520,12 @@ impl TableDescriptor {
             .iter()
             .any(|k| partition_keys.contains(k))
         {
-            return Err(InvalidTableError(format!(
-                "Bucket key {:?} shouldn't include any column in partition keys {:?}.",
-                distribution.bucket_keys, partition_keys
-            )));
+            return Err(InvalidTableError {
+                message: format!(
+                    "Bucket key {:?} shouldn't include any column in partition keys {:?}.",
+                    distribution.bucket_keys, partition_keys
+                ),
+            });
         }
 
         return if let Some(pk) = schema.primary_key() {
@@ -540,13 +544,15 @@ impl TableDescriptor {
             .iter()
             .all(|k| pk_columns.contains(k))
        {
-            return Err(InvalidTableError(format!(
-                "Bucket keys must be a subset of primary keys excluding partition keys for primary-key tables. \
-                The primary keys are {:?}, the partition keys are {:?}, but the user-defined bucket keys are {:?}.",
-                pk.column_names(),
-                partition_keys,
-                distribution.bucket_keys
-            )));
+            return Err(InvalidTableError {
+                message: format!(
+                    "Bucket keys must be a subset of primary keys excluding partition keys for primary-key tables. \
+                    The primary keys are {:?}, the partition keys are {:?}, but the user-defined bucket keys are {:?}.",
+                    pk.column_names(),
+                    partition_keys,
+                    distribution.bucket_keys
+                ),
+            });
         }
         Ok(Some(distribution))
     }
@@ -589,7 +595,9 @@ impl LogFormat {
         match s.to_uppercase().as_str() {
             "ARROW" => Ok(LogFormat::ARROW),
             "INDEXED" => Ok(LogFormat::INDEXED),
-            _ => Err(InvalidTableError(format!("Unknown log format: {s}"))),
+            _ => Err(InvalidTableError {
+                message: format!("Unknown log format: {s}"),
+            }),
         }
     }
 }
@@ -615,7 +623,9 @@ impl KvFormat {
         match s.to_uppercase().as_str() {
             "INDEXED" => Ok(KvFormat::INDEXED),
             "COMPACTED" => Ok(KvFormat::COMPACTED),
-            _ => Err(InvalidTableError(format!("Unknown kv format: {s}"))),
+            _ => Err(Error::InvalidTableError {
+                message: format!("Unknown kv format: {s}"),
+            }),
         }
     }
 }
@@ -961,6 +971,24 @@ impl TableBucket {
     }
 }
 
+impl Display for TableBucket {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        if let Some(partition_id) = self.partition_id {
+            write!(
+                f,
+                "TableBucket(table_id={}, partition_id={}, bucket={})",
+                self.table_id, partition_id, self.bucket
+            )
+        } else {
+            write!(
+                f,
+                "TableBucket(table_id={}, bucket={})",
+                self.table_id, self.bucket
+            )
+        }
+    }
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct LakeSnapshot {
     pub snapshot_id: i64,
diff --git a/crates/fluss/src/proto/fluss_api.proto b/crates/fluss/src/proto/fluss_api.proto
index ef460fc..7072ae3 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -19,6 +19,11 @@ syntax = "proto2";
 
 package proto;
 
+message ErrorResponse {
+  required int32 error_code = 1;
+  optional string error_message = 2;
+}
+
 // metadata request and response, request send from client to each server.
 message MetadataRequest {
   repeated PbTablePath table_path = 1;
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index 3e48703..45bc82a 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -294,18 +294,22 @@ impl Datum<'_> {
             Datum::String(v) => append_value_to_arrow!(StringBuilder, *v),
             Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, v.as_ref()),
             Datum::Decimal(_) | Datum::Date(_) | Datum::Timestamp(_) | Datum::TimestampTz(_) => {
-                return Err(RowConvertError(format!(
-                    "Type {:?} is not yet supported for Arrow conversion",
-                    std::mem::discriminant(self)
-                )));
+                return Err(RowConvertError {
+                    message: format!(
+                        "Type {:?} is not yet supported for Arrow conversion",
+                        std::mem::discriminant(self)
+                    ),
+                });
             }
         }
 
-        Err(RowConvertError(format!(
-            "Cannot append {:?} to builder of type {}",
-            self,
-            std::any::type_name_of_val(builder)
-        )))
+        Err(RowConvertError {
+            message: format!(
+                "Cannot append {:?} to builder of type {}",
+                self,
+                std::any::type_name_of_val(builder)
+            ),
+        })
     }
 }
 
@@ -317,11 +321,13 @@ macro_rules! impl_to_arrow {
             b.append_value(*self);
             Ok(())
         } else {
-            Err(RowConvertError(format!(
-                "Cannot cast {} to {} builder",
-                stringify!($ty),
-                stringify!($variant)
-            )))
+            Err(RowConvertError {
+                message: format!(
+                    "Cannot cast {} to {} builder",
+                    stringify!($ty),
+                    stringify!($variant)
+                ),
+            })
         }
     }
 }
diff --git a/crates/fluss/src/rpc/error.rs b/crates/fluss/src/rpc/error.rs
index 84b20b1..da3a11e 100644
--- a/crates/fluss/src/rpc/error.rs
+++ b/crates/fluss/src/rpc/error.rs
@@ -17,6 +17,7 @@
 
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
+use prost::DecodeError;
 use std::sync::Arc;
 use thiserror::Error;
 
@@ -29,6 +30,9 @@ pub enum RpcError {
     #[error("Cannot read framed message: {0}")]
     ReadMessageError(#[from] crate::rpc::frame::ReadError),
 
+    #[error("Rpc Decode Error: {0}")]
+    RpcDecodeError(#[from] DecodeError),
+
     #[error("connection error")]
     ConnectionError(String),
diff --git a/crates/fluss/src/rpc/fluss_api_error.rs b/crates/fluss/src/rpc/fluss_api_error.rs
new file mode 100644
index 0000000..b26eb72
--- /dev/null
+++ b/crates/fluss/src/rpc/fluss_api_error.rs
@@ -0,0 +1,371 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto::ErrorResponse;
+use std::fmt::{Debug, Display, Formatter};
+
+/// API error response from Fluss server
+pub struct ApiError {
+    pub code: i32,
+    pub message: String,
+}
+
+impl Debug for ApiError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ApiError")
+            .field("code", &self.code)
+            .field("message", &self.message)
+            .finish()
+    }
+}
+
+impl Display for ApiError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        Debug::fmt(self, f)
+    }
+}
+
+/// Fluss protocol errors. These errors are part of the client-server protocol.
+/// The error codes cannot be changed, but the names can be.
+///
+/// Do not add exceptions that occur only on the client or only on the server here.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[repr(i32)]
+pub enum FlussError {
+    /// The server experienced an unexpected error when processing the request.
+    UnknownServerError = -1,
+    /// No error occurred.
+    None = 0,
+    /// The server disconnected before a response was received.
+    NetworkException = 1,
+    /// The version of API is not supported.
+    UnsupportedVersion = 2,
+    /// This message has failed its CRC checksum, exceeds the valid size, has a null key for a primary key table, or is otherwise corrupt.
+    CorruptMessage = 3,
+    /// The database does not exist.
+    DatabaseNotExist = 4,
+    /// The database is not empty.
+    DatabaseNotEmpty = 5,
+    /// The database already exists.
+    DatabaseAlreadyExist = 6,
+    /// The table does not exist.
+    TableNotExist = 7,
+    /// The table already exists.
+    TableAlreadyExist = 8,
+    /// The schema does not exist.
+    SchemaNotExist = 9,
+    /// Exception occurred while storing log data on the server.
+    LogStorageException = 10,
+    /// Exception occurred while storing kv data on the server.
+    KvStorageException = 11,
+    /// Not leader or follower.
+    NotLeaderOrFollower = 12,
+    /// The record is too large.
+    RecordTooLargeException = 13,
+    /// The record is corrupt.
+    CorruptRecordException = 14,
+    /// The client has attempted to perform an operation on an invalid table.
+    InvalidTableException = 15,
+    /// The client has attempted to perform an operation on an invalid database.
+    InvalidDatabaseException = 16,
+    /// The replication factor is larger than the number of available tablet servers.
+    InvalidReplicationFactor = 17,
+    /// Produce request specified an invalid value for required acks.
+    InvalidRequiredAcks = 18,
+    /// The log offset is out of range.
+    LogOffsetOutOfRangeException = 19,
+    /// The table is not a primary-key table.
+    NonPrimaryKeyTableException = 20,
+    /// The table or bucket does not exist.
+    UnknownTableOrBucketException = 21,
+    /// The update version is invalid.
+    InvalidUpdateVersionException = 22,
+    /// The coordinator is invalid.
+    InvalidCoordinatorException = 23,
+    /// The leader epoch is invalid.
+    FencedLeaderEpochException = 24,
+    /// The request timed out.
+    RequestTimeOut = 25,
+    /// The general storage exception.
+    StorageException = 26,
+    /// The server did not attempt to execute this operation.
+    OperationNotAttemptedException = 27,
+    /// Records are written to the server already, but to fewer in-sync replicas than required.
+    NotEnoughReplicasAfterAppendException = 28,
+    /// Messages are rejected since there are fewer in-sync replicas than required.
+    NotEnoughReplicasException = 29,
+    /// Get file access security token exception.
+    SecurityTokenException = 30,
+    /// The tablet server received an out-of-order sequence batch.
+    OutOfOrderSequenceException = 31,
+    /// The tablet server received a duplicate sequence batch.
+    DuplicateSequenceException = 32,
+    /// This exception is raised by the tablet server if it could not locate the writer metadata.
+    UnknownWriterIdException = 33,
+    /// The requested column projection is invalid.
+    InvalidColumnProjection = 34,
+    /// The requested target column to write is invalid.
+    InvalidTargetColumn = 35,
+    /// The partition does not exist.
+    PartitionNotExists = 36,
+    /// The table is not partitioned.
+    TableNotPartitionedException = 37,
+    /// The timestamp is invalid.
+    InvalidTimestampException = 38,
+    /// The config is invalid.
+    InvalidConfigException = 39,
+    /// The lake storage is not configured.
+    LakeStorageNotConfiguredException = 40,
+    /// The kv snapshot does not exist.
+    KvSnapshotNotExist = 41,
+    /// The partition already exists.
+    PartitionAlreadyExists = 42,
+    /// The partition spec is invalid.
+    PartitionSpecInvalidException = 43,
+    /// There is no currently available leader for the given partition.
+    LeaderNotAvailableException = 44,
+    /// Exceeded the maximum number of partitions.
+    PartitionMaxNumException = 45,
+    /// Authentication failed.
+    AuthenticateException = 46,
+    /// Security is disabled.
+    SecurityDisabledException = 47,
+    /// Authorization failed.
+    AuthorizationException = 48,
+    /// Exceeded the maximum number of buckets.
+    BucketMaxNumException = 49,
+    /// The tiering epoch is invalid.
+    FencedTieringEpochException = 50,
+    /// Authentication failed with retriable exception.
+    RetriableAuthenticateException = 51,
+    /// The server rack info is invalid.
+    InvalidServerRackInfoException = 52,
+    /// The lake snapshot does not exist.
+    LakeSnapshotNotExist = 53,
+    /// The lake table already exists.
+    LakeTableAlreadyExist = 54,
+    /// The new ISR contains at least one ineligible replica.
+    IneligibleReplicaException = 55,
+    /// The alter table is invalid.
+    InvalidAlterTableException = 56,
+    /// Deletion operations are disabled on this table.
+    DeletionDisabledException = 57,
+}
+
+impl FlussError {
+    /// Returns the error code for this error.
+    pub fn code(&self) -> i32 {
+        *self as i32
+    }
+
+    /// Returns a friendly description of the error.
+    pub fn message(&self) -> &'static str {
+        match self {
+            FlussError::UnknownServerError => {
+                "The server experienced an unexpected error when processing the request."
+            }
+            FlussError::None => "No error",
+            FlussError::NetworkException => {
+                "The server disconnected before a response was received."
+            }
+            FlussError::UnsupportedVersion => "The version of API is not supported.",
+            FlussError::CorruptMessage => {
+                "This message has failed its CRC checksum, exceeds the valid size, has a null key for a primary key table, or is otherwise corrupt."
+            }
+            FlussError::DatabaseNotExist => "The database does not exist.",
+            FlussError::DatabaseNotEmpty => "The database is not empty.",
+            FlussError::DatabaseAlreadyExist => "The database already exists.",
+            FlussError::TableNotExist => "The table does not exist.",
+            FlussError::TableAlreadyExist => "The table already exists.",
+            FlussError::SchemaNotExist => "The schema does not exist.",
+            FlussError::LogStorageException => {
+                "Exception occurred while storing log data on the server."
+            }
+            FlussError::KvStorageException => {
+                "Exception occurred while storing kv data on the server."
+            }
+            FlussError::NotLeaderOrFollower => "Not leader or follower.",
+            FlussError::RecordTooLargeException => "The record is too large.",
+            FlussError::CorruptRecordException => "The record is corrupt.",
+            FlussError::InvalidTableException => {
+                "The client has attempted to perform an operation on an invalid table."
+            }
+            FlussError::InvalidDatabaseException => {
+                "The client has attempted to perform an operation on an invalid database."
+            }
+            FlussError::InvalidReplicationFactor => {
+                "The replication factor is larger than the number of available tablet servers."
+            }
+            FlussError::InvalidRequiredAcks => {
+                "Produce request specified an invalid value for required acks."
+            }
+            FlussError::LogOffsetOutOfRangeException => "The log offset is out of range.",
+            FlussError::NonPrimaryKeyTableException => "The table is not a primary-key table.",
+            FlussError::UnknownTableOrBucketException => "The table or bucket does not exist.",
+            FlussError::InvalidUpdateVersionException => "The update version is invalid.",
+            FlussError::InvalidCoordinatorException => "The coordinator is invalid.",
+            FlussError::FencedLeaderEpochException => "The leader epoch is invalid.",
+            FlussError::RequestTimeOut => "The request timed out.",
+            FlussError::StorageException => "The general storage exception.",
+            FlussError::OperationNotAttemptedException => {
+                "The server did not attempt to execute this operation."
+            }
+            FlussError::NotEnoughReplicasAfterAppendException => {
+                "Records are written to the server already, but to fewer in-sync replicas than required."
+            }
+            FlussError::NotEnoughReplicasException => {
+                "Messages are rejected since there are fewer in-sync replicas than required."
+            }
+            FlussError::SecurityTokenException => "Get file access security token exception.",
+            FlussError::OutOfOrderSequenceException => {
+                "The tablet server received an out-of-order sequence batch."
+            }
+            FlussError::DuplicateSequenceException => {
+                "The tablet server received a duplicate sequence batch."
+            }
+            FlussError::UnknownWriterIdException => {
+                "This exception is raised by the tablet server if it could not locate the writer metadata."
+            }
+            FlussError::InvalidColumnProjection => "The requested column projection is invalid.",
+            FlussError::InvalidTargetColumn => "The requested target column to write is invalid.",
+            FlussError::PartitionNotExists => "The partition does not exist.",
+            FlussError::TableNotPartitionedException => "The table is not partitioned.",
+            FlussError::InvalidTimestampException => "The timestamp is invalid.",
+            FlussError::InvalidConfigException => "The config is invalid.",
+            FlussError::LakeStorageNotConfiguredException => "The lake storage is not configured.",
+            FlussError::KvSnapshotNotExist => "The kv snapshot does not exist.",
+            FlussError::PartitionAlreadyExists => "The partition already exists.",
+            FlussError::PartitionSpecInvalidException => "The partition spec is invalid.",
+            FlussError::LeaderNotAvailableException => {
+                "There is no currently available leader for the given partition."
+            }
+            FlussError::PartitionMaxNumException => "Exceeded the maximum number of partitions.",
+            FlussError::AuthenticateException => "Authentication failed.",
+            FlussError::SecurityDisabledException => "Security is disabled.",
+            FlussError::AuthorizationException => "Authorization failed.",
+            FlussError::BucketMaxNumException => "Exceeded the maximum number of buckets.",
+            FlussError::FencedTieringEpochException => "The tiering epoch is invalid.",
+            FlussError::RetriableAuthenticateException => {
+                "Authentication failed with retriable exception."
+            }
+            FlussError::InvalidServerRackInfoException => "The server rack info is invalid.",
+            FlussError::LakeSnapshotNotExist => "The lake snapshot does not exist.",
+            FlussError::LakeTableAlreadyExist => "The lake table already exists.",
+            FlussError::IneligibleReplicaException => {
+                "The new ISR contains at least one ineligible replica."
+            }
+            FlussError::InvalidAlterTableException => "The alter table is invalid.",
+            FlussError::DeletionDisabledException => {
+                "Deletion operations are disabled on this table."
+            }
+        }
+    }
+
+    /// Create an ApiError from this error, using the given message or the default one.
+    pub fn to_api_error(&self, message: Option<String>) -> ApiError {
+        ApiError {
+            code: self.code(),
+            message: message.unwrap_or(self.message().to_string()),
+        }
+    }
+
+    /// Get the FlussError for the given error code.
+    /// Returns `UnknownServerError` if the code is not recognized.
+    pub fn for_code(code: i32) -> Self {
+        match code {
+            -1 => FlussError::UnknownServerError,
+            0 => FlussError::None,
+            1 => FlussError::NetworkException,
+            2 => FlussError::UnsupportedVersion,
+            3 => FlussError::CorruptMessage,
+            4 => FlussError::DatabaseNotExist,
+            5 => FlussError::DatabaseNotEmpty,
+            6 => FlussError::DatabaseAlreadyExist,
+            7 => FlussError::TableNotExist,
+            8 => FlussError::TableAlreadyExist,
+            9 => FlussError::SchemaNotExist,
+            10 => FlussError::LogStorageException,
+            11 => FlussError::KvStorageException,
+            12 => FlussError::NotLeaderOrFollower,
+            13 => FlussError::RecordTooLargeException,
+            14 => FlussError::CorruptRecordException,
+            15 => FlussError::InvalidTableException,
+            16 => FlussError::InvalidDatabaseException,
+            17 => FlussError::InvalidReplicationFactor,
+            18 => FlussError::InvalidRequiredAcks,
+            19 => FlussError::LogOffsetOutOfRangeException,
+            20 => FlussError::NonPrimaryKeyTableException,
+            21 => FlussError::UnknownTableOrBucketException,
+            22 => FlussError::InvalidUpdateVersionException,
+            23 => FlussError::InvalidCoordinatorException,
+            24 => FlussError::FencedLeaderEpochException,
+            25 => FlussError::RequestTimeOut,
+            26 => FlussError::StorageException,
+            27 => FlussError::OperationNotAttemptedException,
+            28 => FlussError::NotEnoughReplicasAfterAppendException,
+            29 => FlussError::NotEnoughReplicasException,
+            30 => FlussError::SecurityTokenException,
+            31 => FlussError::OutOfOrderSequenceException,
+            32 => FlussError::DuplicateSequenceException,
+            33 => FlussError::UnknownWriterIdException,
+            34 => FlussError::InvalidColumnProjection,
+            35 => FlussError::InvalidTargetColumn,
+            36 => FlussError::PartitionNotExists,
+            37 => FlussError::TableNotPartitionedException,
+            38 => FlussError::InvalidTimestampException,
+            39 => FlussError::InvalidConfigException,
+            40 => FlussError::LakeStorageNotConfiguredException,
+            41 => FlussError::KvSnapshotNotExist,
+            42 => FlussError::PartitionAlreadyExists,
+            43 => FlussError::PartitionSpecInvalidException,
+            44 => FlussError::LeaderNotAvailableException,
+            45 => FlussError::PartitionMaxNumException,
+            46 => FlussError::AuthenticateException,
+            47 => FlussError::SecurityDisabledException,
+            48 => FlussError::AuthorizationException,
+            49 => FlussError::BucketMaxNumException,
+            50 => FlussError::FencedTieringEpochException,
+            51 => FlussError::RetriableAuthenticateException,
+            52 => FlussError::InvalidServerRackInfoException,
+            53 => FlussError::LakeSnapshotNotExist,
+            54 => FlussError::LakeTableAlreadyExist,
+            55 => FlussError::IneligibleReplicaException,
+            56 => FlussError::InvalidAlterTableException,
+            57 => FlussError::DeletionDisabledException,
+            _ => FlussError::UnknownServerError,
+        }
+    }
+}
+
+impl Display for FlussError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.message())
+    }
+}
+
+impl From<ErrorResponse> for ApiError {
+    fn from(error_response: ErrorResponse) -> Self {
+        let fluss_error = FlussError::for_code(error_response.error_code);
+        fluss_error.to_api_error(error_response.error_message)
+    }
+}
+
+impl From<ApiError> for FlussError {
+    fn from(api_error: ApiError) -> Self {
+        FlussError::for_code(api_error.code)
+    }
+}
diff --git a/crates/fluss/src/rpc/frame.rs b/crates/fluss/src/rpc/frame.rs
index 44dadc9..81cc094 100644
--- a/crates/fluss/src/rpc/frame.rs
+++ b/crates/fluss/src/rpc/frame.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use prost::DecodeError;
 use thiserror::Error;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 
@@ -29,6 +30,9 @@ pub enum ReadError {
     #[error("Message too large, limit is {limit} bytes but got {actual} bytes")]
     MessageTooLarge { limit: usize, actual: usize },
+
+    #[error("Failed to decode error response: {0}")]
+    ProtoErrorResponseDecodeError(#[from] DecodeError),
 }
 
 pub trait AsyncMessageRead {
diff --git a/crates/fluss/src/rpc/message/create_database.rs b/crates/fluss/src/rpc/message/create_database.rs
index e4052ef..7d24235 100644
--- a/crates/fluss/src/rpc/message/create_database.rs
+++ b/crates/fluss/src/rpc/message/create_database.rs
@@ -22,7 +22,8 @@ use crate::error::Result as FlussResult;
 use crate::proto::CreateDatabaseResponse;
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::ReadError;
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/create_table.rs b/crates/fluss/src/rpc/message/create_table.rs
index 5802e71..69865b8 100644
--- a/crates/fluss/src/rpc/message/create_table.rs
+++ b/crates/fluss/src/rpc/message/create_table.rs
@@ -23,7 +23,8 @@ use crate::proto::CreateTableResponse;
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
 use crate::rpc::convert::to_table_path;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::ReadError;
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/database_exists.rs b/crates/fluss/src/rpc/message/database_exists.rs
index 795eea1..7e717a4 100644
--- a/crates/fluss/src/rpc/message/database_exists.rs
+++ b/crates/fluss/src/rpc/message/database_exists.rs
@@ -15,9 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::rpc::frame::ReadError;
+
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/drop_database.rs b/crates/fluss/src/rpc/message/drop_database.rs
index 49cbfaf..663e970 100644
--- a/crates/fluss/src/rpc/message/drop_database.rs
+++ b/crates/fluss/src/rpc/message/drop_database.rs
@@ -15,9 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::rpc::frame::ReadError;
+
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/drop_table.rs b/crates/fluss/src/rpc/message/drop_table.rs
index 0dbc21b..a2b3f2d 100644
--- a/crates/fluss/src/rpc/message/drop_table.rs
+++ b/crates/fluss/src/rpc/message/drop_table.rs
@@ -19,10 +19,12 @@ use crate::metadata::TablePath;
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 
 use crate::proto::DropTableResponse;
+use crate::rpc::frame::ReadError;
+
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
 use crate::rpc::convert::to_table_path;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/fetch.rs b/crates/fluss/src/rpc/message/fetch.rs
index 6ebc5a2..1587606 100644
--- a/crates/fluss/src/rpc/message/fetch.rs
+++ b/crates/fluss/src/rpc/message/fetch.rs
@@ -16,9 +16,11 @@
 // under the License.
 
 use crate::proto::FetchLogResponse;
+use crate::rpc::frame::ReadError;
+
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 use prost::Message;
diff --git a/crates/fluss/src/rpc/message/get_database_info.rs b/crates/fluss/src/rpc/message/get_database_info.rs
index 85492a8..6468beb 100644
--- a/crates/fluss/src/rpc/message/get_database_info.rs
+++ b/crates/fluss/src/rpc/message/get_database_info.rs
@@ -15,9 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::rpc::frame::ReadError;
+
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs b/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs
index a0e186e..a632a15 100644
--- a/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs
+++ b/crates/fluss/src/rpc/message/get_latest_lake_snapshot.rs
@@ -19,10 +19,12 @@ use crate::proto;
 use crate::proto::PbTablePath;
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 
 use crate::metadata::TablePath;
+use crate::rpc::frame::ReadError;
+
 use crate::{impl_read_version_type, impl_write_version_type};
 use bytes::{Buf, BufMut};
 use prost::Message;
diff --git a/crates/fluss/src/rpc/message/get_table.rs b/crates/fluss/src/rpc/message/get_table.rs
index 4f4d6c7..61657f7 100644
--- a/crates/fluss/src/rpc/message/get_table.rs
+++ b/crates/fluss/src/rpc/message/get_table.rs
@@ -18,10 +18,12 @@
 use crate::proto::{GetTableInfoRequest, GetTableInfoResponse, PbTablePath};
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 
 use crate::metadata::TablePath;
+use crate::rpc::frame::ReadError;
+
 use crate::{impl_read_version_type, impl_write_version_type};
 use bytes::{Buf, BufMut};
 use prost::Message;
diff --git a/crates/fluss/src/rpc/message/header.rs b/crates/fluss/src/rpc/message/header.rs
index fe60f8c..77bda7c 100644
--- a/crates/fluss/src/rpc/message/header.rs
+++ b/crates/fluss/src/rpc/message/header.rs
@@ -15,11 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::proto::ErrorResponse;
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
 use crate::rpc::frame::{ReadError, WriteError};
 use crate::rpc::message::{ReadVersionedType, WriteVersionedType};
 use bytes::{Buf, BufMut};
+use prost::Message;
 
 #[allow(dead_code)]
 const REQUEST_HEADER_LENGTH: i32 = 8;
@@ -53,9 +55,10 @@ where
     }
 }
 
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq)]
 pub struct ResponseHeader {
     pub request_id: i32,
+    pub error_response: Option<ErrorResponse>,
 }
 
 impl<R> ReadVersionedType<R> for ResponseHeader
 where
     R: Buf,
 {
     fn read_versioned(reader: &mut R, _version: ApiVersion) -> Result<Self, ReadError> {
         let resp_type = reader.get_u8();
+        let request_id = reader.get_i32();
         if resp_type != SUCCESS_RESPONSE {
-            todo!("handle unsuccess response type");
+            let error_response = ErrorResponse::decode(reader)?;
+            return Ok(ResponseHeader {
+                request_id,
+                error_response: Some(error_response),
+            });
         }
-        let request_id = reader.get_i32();
-        Ok(ResponseHeader { request_id })
+        Ok(ResponseHeader {
+            request_id,
+            error_response: None,
+        })
     }
 }
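Note on the header.rs change above: the request id is now read before branching on `resp_type`, so even error frames can be correlated with their pending request. The decode path implies a frame layout of `[resp_type: u8][request_id: i32][payload]`. A minimal sketch of that framing, assuming `SUCCESS_RESPONSE = 0` (the constant's actual value is defined elsewhere in header.rs and is not shown in this hunk):

```rust
// Sketch only: the response-frame layout implied by ResponseHeader::read_versioned.
use bytes::Buf;

const SUCCESS_RESPONSE: u8 = 0; // assumed value, not confirmed by this diff

enum Frame {
    Success { request_id: i32 }, // payload: API-specific protobuf body
    Error { request_id: i32 },   // payload: protobuf-encoded ErrorResponse
}

fn classify(mut frame: &[u8]) -> Frame {
    let resp_type = frame.get_u8();   // byte 0: success/error marker
    let request_id = frame.get_i32(); // bytes 1..5: big-endian request id
    if resp_type == SUCCESS_RESPONSE {
        Frame::Success { request_id }
    } else {
        Frame::Error { request_id }
    }
}
```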
diff --git a/crates/fluss/src/rpc/message/list_databases.rs b/crates/fluss/src/rpc/message/list_databases.rs
index ce5a091..83226ab 100644
--- a/crates/fluss/src/rpc/message/list_databases.rs
+++ b/crates/fluss/src/rpc/message/list_databases.rs
@@ -15,9 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::rpc::frame::ReadError;
+
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/list_offsets.rs b/crates/fluss/src/rpc/message/list_offsets.rs
index 500db33..9ab1f14 100644
--- a/crates/fluss/src/rpc/message/list_offsets.rs
+++ b/crates/fluss/src/rpc/message/list_offsets.rs
@@ -20,9 +20,11 @@ use crate::{impl_read_version_type, impl_write_version_type, proto};
 use crate::error::Error;
 use crate::error::Result as FlussResult;
 use crate::proto::ListOffsetsResponse;
+use crate::rpc::frame::ReadError;
+
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use std::collections::HashMap;
@@ -108,12 +110,15 @@
             .map(|resp| {
                 if resp.error_code.is_some() {
                     // todo: consider using another suitable error
-                    Err(Error::WriteError(format!(
-                        "Missing offset, error message: {}",
-                        resp.error_message
-                            .as_deref()
-                            .unwrap_or("unknown server exception")
-                    )))
+                    Err(Error::UnexpectedError {
+                        message: format!(
+                            "Missing offset, error message: {}",
+                            resp.error_message
+                                .as_deref()
+                                .unwrap_or("unknown server exception")
+                        ),
+                        source: None,
+                    })
                 } else {
                     // if no error msg, the offset must exist
                     Ok((resp.bucket_id, resp.offset.unwrap()))
diff --git a/crates/fluss/src/rpc/message/list_tables.rs b/crates/fluss/src/rpc/message/list_tables.rs
index daf57ea..ff2497a 100644
--- a/crates/fluss/src/rpc/message/list_tables.rs
+++ b/crates/fluss/src/rpc/message/list_tables.rs
@@ -18,9 +18,11 @@
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 use crate::proto::ListTablesResponse;
+use crate::rpc::frame::ReadError;
+
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use bytes::{Buf, BufMut};
diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs
index 230d971..c77caba 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -37,6 +37,7 @@ mod produce_log;
 mod table_exists;
 mod update_metadata;
 
+pub use crate::rpc::RpcError;
 pub use create_database::*;
 pub use create_table::*;
 pub use database_exists::*;
diff --git a/crates/fluss/src/rpc/message/produce_log.rs b/crates/fluss/src/rpc/message/produce_log.rs
index 7da2b59..39bfb3f 100644
--- a/crates/fluss/src/rpc/message/produce_log.rs
+++ b/crates/fluss/src/rpc/message/produce_log.rs
@@ -17,9 +17,11 @@
 use crate::error::Result as FlussResult;
 use crate::proto::{PbProduceLogReqForBucket, ProduceLogResponse};
+use crate::rpc::frame::ReadError;
+
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 use std::sync::Arc;
diff --git a/crates/fluss/src/rpc/message/table_exists.rs b/crates/fluss/src/rpc/message/table_exists.rs
index 3b71f47..ec98211 100644
--- a/crates/fluss/src/rpc/message/table_exists.rs
+++ b/crates/fluss/src/rpc/message/table_exists.rs
@@ -22,12 +22,13 @@ use crate::proto::TableExistsResponse;
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
 use crate::rpc::convert::to_table_path;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::rpc::frame::ReadError;
+
 use bytes::{Buf, BufMut};
 use prost::Message;
-
 #[derive(Debug)]
 pub struct TableExistsRequest {
     pub inner_request: proto::TableExistsRequest,
 }
diff --git a/crates/fluss/src/rpc/message/update_metadata.rs b/crates/fluss/src/rpc/message/update_metadata.rs
index 0d8ad64..a6e6288 100644
--- a/crates/fluss/src/rpc/message/update_metadata.rs
+++ b/crates/fluss/src/rpc/message/update_metadata.rs
@@ -18,10 +18,12 @@
 use crate::proto::{MetadataResponse, PbTablePath};
 use crate::rpc::api_key::ApiKey;
 use crate::rpc::api_version::ApiVersion;
-use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::frame::WriteError;
 use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
 use crate::metadata::TablePath;
+use crate::rpc::frame::ReadError;
+
 use crate::{impl_read_version_type, impl_write_version_type, proto};
 use bytes::{Buf, BufMut};
 use prost::Message;
diff --git a/crates/fluss/src/rpc/mod.rs b/crates/fluss/src/rpc/mod.rs
index b8705a3..86e13b1 100644
--- a/crates/fluss/src/rpc/mod.rs
+++ b/crates/fluss/src/rpc/mod.rs
@@ -17,7 +17,9 @@
 mod api_key;
 mod api_version;
-mod error;
+pub mod error;
+mod fluss_api_error;
+pub use fluss_api_error::{ApiError, FlussError};
 mod frame;
 pub mod message;
 pub use error::*;
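The new `fluss_api_error.rs` module itself is not part of this diff. Judging from its call sites (`ApiError::from(ErrorResponse)` in server_connection.rs below, and `api_error.code` / `FlussError::TableNotExist.code()` in the integration test), its shape is presumably along these lines. This is a hypothetical reconstruction, not the module's actual contents; the `ErrorResponse` field names are assumptions based on the `error_code`/`error_message` convention used by `ListOffsetsResponse`:

```rust
// Hypothetical sketch of crates/fluss/src/rpc/fluss_api_error.rs.
use crate::proto::ErrorResponse;

/// Error payload carried by a non-success RPC response frame.
#[derive(Debug, Clone, PartialEq)]
pub struct ApiError {
    pub code: i32,
    pub message: String,
}

/// Server-side error codes mirrored from the Fluss server.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlussError {
    TableNotExist,
    // ... other codes omitted in this sketch
}

impl FlussError {
    pub fn code(&self) -> i32 {
        match self {
            // 7 matches the assertion in test_fluss_error_response below
            FlussError::TableNotExist => 7,
        }
    }
}

impl From<ErrorResponse> for ApiError {
    fn from(resp: ErrorResponse) -> Self {
        // Assumed fields: error_code (i32) and error_message (Option<String>).
        ApiError {
            code: resp.error_code,
            message: resp
                .error_message
                .unwrap_or_else(|| "unknown server exception".to_string()),
        }
    }
}
```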
diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs
index 4eeda46..2022a5e 100644
--- a/crates/fluss/src/rpc/server_connection.rs
+++ b/crates/fluss/src/rpc/server_connection.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use crate::cluster::ServerNode;
+use crate::error::Error;
 use crate::rpc::api_version::ApiVersion;
 use crate::rpc::error::RpcError;
 use crate::rpc::error::RpcError::ConnectionError;
@@ -229,7 +230,7 @@
         }
     }
 
-    pub async fn request<R>(&self, msg: R) -> Result<R::ResponseBody, RpcError>
+    pub async fn request<R>(&self, msg: R) -> Result<R::ResponseBody, Error>
     where
         R: RequestBody + Send + WriteVersionedType<Vec<u8>>,
         R::ResponseBody: ReadVersionedType<Cursor<Vec<u8>>>,
@@ -248,9 +249,12 @@
         let mut buf = Vec::new();
         // write header
-        header.write_versioned(&mut buf, header_version)?;
+        header
+            .write_versioned(&mut buf, header_version)
+            .map_err(RpcError::WriteMessageError)?;
         // write message body
-        msg.write_versioned(&mut buf, body_api_version)?;
+        msg.write_versioned(&mut buf, body_api_version)
+            .map_err(RpcError::WriteMessageError)?;
 
         let (tx, rx) = channel();
@@ -263,14 +267,21 @@
             ConnectionState::RequestMap(map) => {
                 map.insert(request_id, ActiveRequest { channel: tx });
             }
-            ConnectionState::Poison(e) => return Err(RpcError::Poisoned(Arc::clone(e))),
+            ConnectionState::Poison(e) => return Err(RpcError::Poisoned(Arc::clone(e)).into()),
         }
 
         self.send_message(buf).await?;
         _cleanup_on_cancel.message_sent();
 
         let mut response = rx.await.expect("Who closed this channel?!")?;
-        let body = R::ResponseBody::read_versioned(&mut response.data, body_api_version)?;
+        if let Some(error_response) = response.header.error_response {
+            return Err(Error::FlussAPIError {
+                api_error: crate::rpc::ApiError::from(error_response),
+            });
+        }
+
+        let body = R::ResponseBody::read_versioned(&mut response.data, body_api_version)
+            .map_err(RpcError::ReadMessageError)?;
 
         let read_bytes = response.data.position();
         let message_bytes = response.data.into_inner().len() as u64;
@@ -280,7 +291,8 @@
                 read: read_bytes,
                 api_key: R::API_KEY,
                 api_version: body_api_version,
-            });
+            }
+            .into());
         }
         Ok(body)
     }
diff --git a/crates/fluss/tests/integration/admin.rs b/crates/fluss/tests/integration/admin.rs
index c51373d..ed5c322 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -33,6 +33,7 @@ static SHARED_FLUSS_CLUSTER: Lazy<Arc<RwLock<Option<FlussTestingCluster>>>> =
 mod admin_test {
     use super::SHARED_FLUSS_CLUSTER;
     use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder};
+    use fluss::error::FlussError;
     use fluss::metadata::{
         DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, Schema, TableDescriptor,
         TablePath,
@@ -250,4 +251,35 @@
         // database shouldn't exist now
         assert_eq!(admin.database_exists(test_db_name).await.unwrap(), false);
     }
+
+    #[tokio::test]
+    async fn test_fluss_error_response() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+        let admin = connection
+            .get_admin()
+            .await
+            .expect("Failed to get admin client");
+
+        let table_path = TablePath::new("fluss".to_string(), "not_exist".to_string());
+
+        let result = admin.get_table(&table_path).await;
+        assert!(result.is_err(), "Expected error but got Ok");
+
+        let error = result.unwrap_err();
+        match error {
+            fluss::error::Error::FlussAPIError { api_error } => {
+                assert_eq!(
+                    api_error.code,
+                    FlussError::TableNotExist.code(),
+                    "Expected error code 7 (TableNotExist)"
+                );
+                assert_eq!(
+                    api_error.message, "Table 'fluss.not_exist' does not exist.",
+                    "Expected specific error message"
+                );
+            }
+            other => panic!("Expected FlussAPIError, got {:?}", other),
+        }
+    }
 }
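With `request` now returning `crate::error::Error`, callers can distinguish server-reported API errors from transport failures, as the integration test above does. A minimal caller-side sketch of the same pattern (the `table_exists_or_err` helper and its import paths are assumptions, not part of this patch):

```rust
// Hypothetical helper consuming the new error split.
use fluss::client::FlussAdmin; // assumed import path
use fluss::error::{Error, FlussError};
use fluss::metadata::TablePath;

async fn table_exists_or_err(admin: &FlussAdmin, path: &TablePath) -> Result<bool, Error> {
    match admin.get_table(path).await {
        Ok(_) => Ok(true),
        // The server answered, but with a Fluss API error: inspect the code.
        Err(Error::FlussAPIError { api_error })
            if api_error.code == FlussError::TableNotExist.code() =>
        {
            Ok(false)
        }
        // Anything else (transport, protocol, other API codes) propagates.
        Err(e) => Err(e),
    }
}
```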
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs
index f33d440..a30036d 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -38,8 +38,6 @@ mod table_remote_scan_test {
     use fluss::row::{GenericRow, InternalRow};
     use std::collections::HashMap;
     use std::sync::Arc;
-    use std::sync::atomic::AtomicUsize;
-    use std::sync::atomic::Ordering;
     use std::thread;
     use std::thread::sleep;
     use std::time::Duration;
@@ -89,11 +87,13 @@
             temp_dir.to_string_lossy().to_string(),
         );
 
-        let cluster =
-            FlussTestingClusterBuilder::new_with_cluster_conf("test_table", &cluster_conf)
-                .with_remote_data_dir(temp_dir)
-                .build()
-                .await;
+        let cluster = FlussTestingClusterBuilder::new_with_cluster_conf(
+            "test_table_remote",
+            &cluster_conf,
+        )
+        .with_remote_data_dir(temp_dir)
+        .build()
+        .await;
         let mut guard = cluster_guard.write();
         *guard = Some(cluster);
     });