Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/fluss/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ url = "2.5.7"
async-trait = "0.1.89"
uuid = { version = "1.10", features = ["v4"] }
tempfile= "3.23.0"
snafu = "0.8.3"

[dev-dependencies]
testcontainers = "0.25.0"
Expand Down
28 changes: 15 additions & 13 deletions crates/fluss/src/client/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::rpc::message::{ListOffsetsRequest, OffsetSpec};
use crate::rpc::{RpcClient, ServerConnection};

use crate::BucketId;
use crate::error::Result;
use crate::error::{Error, Result};
use crate::proto::GetTableInfoResponse;
use std::collections::HashMap;
use std::slice::from_ref;
Expand Down Expand Up @@ -245,10 +245,10 @@ impl FlussAdmin {
let mut results = HashMap::new();

for response_future in response_futures {
let offsets = response_future.await.map_err(
// todo: consider use suitable error
|e| crate::error::Error::WriteError(format!("Fail to get result: {e}")),
)?;
let offsets = response_future.await.map_err(|e| Error::UnexpectedError {
message: "Fail to get result for list offsets.".to_string(),
source: Some(Box::new(e)),
})?;
results.extend(offsets?);
}
Ok(results)
Expand All @@ -267,10 +267,11 @@ impl FlussAdmin {
for bucket_id in buckets {
let table_bucket = TableBucket::new(table_id, *bucket_id);
let leader = cluster.leader_for(&table_bucket).ok_or_else(|| {
// todo: consider use another suitable error
crate::error::Error::InvalidTableError(format!(
"No leader found for table bucket: table_id={table_id}, bucket_id={bucket_id}"
))
// todo: consider retry?
Error::UnexpectedError {
message: format!("No leader found for table bucket: {table_bucket}."),
source: None,
}
})?;

node_for_bucket_list
Expand Down Expand Up @@ -301,10 +302,11 @@ impl FlussAdmin {
let task = tokio::spawn(async move {
let cluster = metadata.get_cluster();
let tablet_server = cluster.get_tablet_server(leader_id).ok_or_else(|| {
// todo: consider use more suitable error
crate::error::Error::InvalidTableError(format!(
"Tablet server {leader_id} not found"
))
Error::LeaderNotAvailable {
message: format!(
"Tablet server {leader_id} is not found in metadata cache."
),
}
})?;
let connection = rpc_client.get_connection(tablet_server).await?;
let list_offsets_response = connection.request(request).await?;
Expand Down
18 changes: 8 additions & 10 deletions crates/fluss/src/client/table/remote_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord};
use crate::util::delete_file;
use std::collections::HashMap;
use std::io;
use std::path::{Path, PathBuf};
use tempfile::TempDir;
use tokio::io::AsyncWriteExt;
Expand Down Expand Up @@ -99,15 +98,14 @@ impl RemoteLogDownloadFuture {

/// Get the downloaded file path
pub async fn get_file_path(&mut self) -> Result<PathBuf> {
let receiver = self
.receiver
.take()
.ok_or_else(|| Error::Io(io::Error::other("Download future already consumed")))?;

receiver.await.map_err(|e| {
Error::Io(io::Error::other(format!(
"Download future cancelled: {e:?}"
)))
let receiver = self.receiver.take().ok_or_else(|| Error::UnexpectedError {
message: "Downloaded file already consumed".to_string(),
source: None,
})?;

receiver.await.map_err(|e| Error::UnexpectedError {
message: format!("Download future cancelled: {e:?}"),
source: None,
})?
}
}
Expand Down
30 changes: 17 additions & 13 deletions crates/fluss/src/client/table/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,20 @@ impl<'a> TableScan<'a> {
/// ```
pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
if column_indices.is_empty() {
return Err(Error::IllegalArgument(
"Column indices cannot be empty".to_string(),
));
return Err(Error::IllegalArgument {
message: "Column indices cannot be empty".to_string(),
});
}
let field_count = self.table_info.row_type().fields().len();
for &idx in column_indices {
if idx >= field_count {
return Err(Error::IllegalArgument(format!(
"Column index {} out of range (max: {})",
idx,
field_count - 1
)));
return Err(Error::IllegalArgument {
message: format!(
"Column index {} out of range (max: {})",
idx,
field_count - 1
),
});
}
}
self.projected_fields = Some(column_indices.to_vec());
Expand All @@ -105,9 +107,9 @@ impl<'a> TableScan<'a> {
/// ```
pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
if column_names.is_empty() {
return Err(Error::IllegalArgument(
"Column names cannot be empty".to_string(),
));
return Err(Error::IllegalArgument {
message: "Column names cannot be empty".to_string(),
});
}
let row_type = self.table_info.row_type();
let mut indices = Vec::new();
Expand All @@ -117,7 +119,9 @@ impl<'a> TableScan<'a> {
.fields()
.iter()
.position(|f| f.name() == *name)
.ok_or_else(|| Error::IllegalArgument(format!("Column '{name}' not found")))?;
.ok_or_else(|| Error::IllegalArgument {
message: format!("Column '{name}' not found"),
})?;
indices.push(idx);
}

Expand Down Expand Up @@ -268,7 +272,7 @@ impl LogFetcher {
// Download and process remote log segments
let mut pos_in_log_segment = remote_fetch_info.first_start_pos;
let mut current_fetch_offset = fetch_offset;
// todo: make segment download parallelly
// todo: make segment download in parallel
for (i, segment) in
remote_fetch_info.remote_log_segments.iter().enumerate()
{
Expand Down
10 changes: 8 additions & 2 deletions crates/fluss/src/client/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,17 @@ impl ResultHandle {
self.receiver
.receive()
.await
.map_err(|e| Error::WriteError(e.to_string()))
.map_err(|e| Error::UnexpectedError {
message: format!("Fail to wait write result {e:?}"),
source: None,
})
}

pub fn result(&self, batch_result: BatchWriteResult) -> Result<(), Error> {
// do nothing, just return empty result
batch_result.map_err(|e| Error::WriteError(e.to_string()))
batch_result.map_err(|e| Error::UnexpectedError {
message: format!("Fail to get write result {e:?}"),
source: None,
})
}
}
11 changes: 7 additions & 4 deletions crates/fluss/src/client/write/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::client::metadata::Metadata;
use crate::client::{ReadyWriteBatch, RecordAccumulator};
use crate::error::Error::WriteError;
use crate::error::Error;
use crate::error::Result;
use crate::metadata::TableBucket;
use crate::proto::ProduceLogResponse;
Expand Down Expand Up @@ -150,9 +150,12 @@ impl Sender {

let cluster = self.metadata.get_cluster();

let destination_node = cluster
.get_tablet_server(destination)
.ok_or(WriteError(String::from("destination node not found")))?;
let destination_node =
cluster
.get_tablet_server(destination)
.ok_or(Error::LeaderNotAvailable {
message: format!("destination node not found in metadata cache {destination}."),
})?;
let connection = self.metadata.get_connection(destination_node).await?;

for (table_id, write_batches) in write_batch_by_table {
Expand Down
17 changes: 12 additions & 5 deletions crates/fluss/src/client/write/writer_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,12 @@ impl WriterClient {

fn get_ack(config: &Config) -> Result<i16> {
let acks = config.writer_acks.as_str();
if acks.eq("all") {
if acks.eq_ignore_ascii_case("all") {
Ok(-1)
} else {
acks.parse::<i16>()
.map_err(|e| Error::IllegalArgument(e.to_string()))
acks.parse::<i16>().map_err(|e| Error::IllegalArgument {
message: format!("invalid writer ack '{acks}': {e}"),
})
}
}

Expand Down Expand Up @@ -133,11 +134,17 @@ impl WriterClient {
self.shutdown_tx
.send(())
.await
.map_err(|e| Error::WriteError(e.to_string()))?;
.map_err(|e| Error::UnexpectedError {
message: format!("Failed to close write client: {e:?}"),
source: None,
})?;

self.sender_join_handle
.await
.map_err(|e| Error::WriteError(e.to_string()))?;
.map_err(|e| Error::UnexpectedError {
message: format!("Failed to close write client: {e:?}"),
source: None,
})?;
Ok(())
}

Expand Down
Loading
Loading