-
Notifications
You must be signed in to change notification settings - Fork 13
Support Cpp bindings #83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
@luoyuxia Hi, yuxia, PTAL if u have time. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds C++ language bindings for the Fluss client library, enabling C++ applications to interact with Fluss tables. The implementation uses the cxx library to create a safe FFI bridge between Rust and C++, exposing core functionality including connection management, admin operations, table operations, append writers, and log scanners.
Key changes:
- Implements FFI bridge layer using cxx for type-safe Rust-C++ interop
- Adds C++ wrapper classes with RAII resource management for Connection, Admin, Table, AppendWriter, and LogScanner
- Includes comprehensive example demonstrating table creation, data insertion, scanning, and column projection
- Modifies Config struct to manually implement Default trait to avoid conflicts with clap's Parser derive
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 13 comments.
Show a summary per file
| File | Description |
|---|---|
bindings/cpp/src/lib.rs |
Rust FFI implementation defining the bridge interface and core bindings logic |
bindings/cpp/src/types.rs |
Type conversion utilities between FFI types and Fluss core types |
bindings/cpp/src/connection.cpp |
C++ implementation of Connection class |
bindings/cpp/src/admin.cpp |
C++ implementation of Admin class for table management |
bindings/cpp/src/table.cpp |
C++ implementation of Table, AppendWriter, and LogScanner classes |
bindings/cpp/src/ffi_converter.hpp |
Helper utilities for converting between C++ and FFI types |
bindings/cpp/include/fluss.hpp |
Public C++ API header with all class definitions and types |
bindings/cpp/examples/example.cpp |
Comprehensive usage example demonstrating all features |
bindings/cpp/build.rs |
Build script for cxx bridge code generation |
bindings/cpp/Cargo.toml |
Rust package configuration for C++ bindings |
bindings/cpp/CMakeLists.txt |
CMake build configuration |
bindings/cpp/.clang-format |
Code formatting configuration |
bindings/cpp/.gitignore |
Git ignore rules for build artifacts |
crates/fluss/src/config.rs |
Manual Default implementation to avoid clap derive conflicts |
Cargo.toml |
Workspace updated to include cpp bindings |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (!Available()) { | ||
| return utils::make_error(1, "Connection not available"); | ||
| } | ||
|
|
Copilot
AI
Dec 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential memory leak: If out.table_ is already non-null when GetTable is called, the existing resource will be overwritten without being freed, causing a memory leak. Consider calling out.Destroy() before assigning the new pointer, or check and free the existing resource first.
| // Free any existing resource before overwriting out.table_ | |
| out.Destroy(); |
| } | ||
|
|
||
| Result Connection::Connect(const std::string& bootstrap_server, Connection& out) { | ||
| try { |
Copilot
AI
Dec 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential memory leak: If out.conn_ is already non-null when Connect is called, the existing resource will be overwritten without being freed, causing a memory leak. Consider calling out.Destroy() before assigning the new pointer, or check and free the existing resource first.
| try { | |
| try { | |
| out.Destroy(); |
| if (!Available()) { | ||
| return utils::make_error(1, "Table not available"); | ||
| } | ||
|
|
Copilot
AI
Dec 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential memory leak: If out.writer_ is already non-null when NewAppendWriter is called, the existing resource will be overwritten without being freed, causing a memory leak. Consider calling out.Destroy() before assigning the new pointer, or check and free the existing resource first.
| // Free any existing resource to prevent memory leak | |
| out.Destroy(); |
| datum.i64_val = array.value(row_id); | ||
| datum | ||
| } | ||
| _ => panic!("Unsupported Time64 unit for column {}", i), |
Copilot
AI
Dec 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Panics can cause undefined behavior when crossing FFI boundaries. Instead of panic!, consider returning an error Result or using a default/fallback value. The panic will not be caught by C++ exception handlers and will abort the process.
| } | ||
| _ => panic!("Unsupported Time64 unit for column {}", i), | ||
| }, | ||
| other => panic!("Unsupported Arrow data type for column {}: {:?}", i, other), |
Copilot
AI
Dec 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Panics can cause undefined behavior when crossing FFI boundaries. Instead of panic!, consider returning an error Result or using a default/fallback value. The panic will not be caught by C++ exception handlers and will abort the process.
| if (!Available()) { | ||
| return utils::make_error(1, "Table not available"); | ||
| } | ||
|
|
Copilot
AI
Dec 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential memory leak: If out.scanner_ is already non-null when NewLogScanner is called, the existing resource will be overwritten without being freed, causing a memory leak. Consider calling out.Destroy() before assigning the new pointer, or check and free the existing resource first.
| out.Destroy(); |
| for (size_t idx : column_indices) { | ||
| rust_indices.push_back(idx); | ||
| } | ||
| out.scanner_ = table_->new_log_scanner_with_projection(std::move(rust_indices)); |
Copilot
AI
Dec 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential memory leak: If out.scanner_ is already non-null when NewLogScannerWithProjection is called, the existing resource will be overwritten without being freed, causing a memory leak. Consider calling out.Destroy() before assigning the new pointer, or check and free the existing resource first.
| datum.i32_val = array.value(row_id); | ||
| datum | ||
| } | ||
| _ => panic!("Unsupported Time32 unit for column {}", i), |
Copilot
AI
Dec 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Panics can cause undefined behavior when crossing FFI boundaries. Instead of panic!, consider returning an error Result or using a default/fallback value. The panic will not be caught by C++ exception handlers and will abort the process.
| .expect("LargeUtf8 column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_STRING); | ||
| datum.string_val = array.value(row_id).to_string(); | ||
| datum | ||
| } | ||
| ArrowDataType::Binary => { | ||
| let mut datum = new_datum(DATUM_TYPE_BYTES); | ||
| datum.bytes_val = row.get_bytes(i); | ||
| datum | ||
| } | ||
| ArrowDataType::FixedSizeBinary(len) => { | ||
| let mut datum = new_datum(DATUM_TYPE_BYTES); | ||
| datum.bytes_val = row.get_binary(i, *len as usize); | ||
| datum | ||
| } | ||
| ArrowDataType::LargeBinary => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<LargeBinaryArray>() | ||
| .expect("LargeBinary column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_BYTES); | ||
| datum.bytes_val = array.value(row_id).to_vec(); | ||
| datum | ||
| } | ||
| ArrowDataType::Date32 => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<Date32Array>() | ||
| .expect("Date32 column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_INT32); | ||
| datum.i32_val = array.value(row_id); | ||
| datum | ||
| } | ||
| ArrowDataType::Timestamp(unit, _) => match unit { | ||
| TimeUnit::Second => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<TimestampSecondArray>() | ||
| .expect("Timestamp(second) column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_INT64); | ||
| datum.i64_val = array.value(row_id); | ||
| datum | ||
| } | ||
| TimeUnit::Millisecond => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<TimestampMillisecondArray>() | ||
| .expect("Timestamp(millisecond) column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_INT64); | ||
| datum.i64_val = array.value(row_id); | ||
| datum | ||
| } | ||
| TimeUnit::Microsecond => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<TimestampMicrosecondArray>() | ||
| .expect("Timestamp(microsecond) column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_INT64); | ||
| datum.i64_val = array.value(row_id); | ||
| datum | ||
| } | ||
| TimeUnit::Nanosecond => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<TimestampNanosecondArray>() | ||
| .expect("Timestamp(nanosecond) column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_INT64); | ||
| datum.i64_val = array.value(row_id); | ||
| datum | ||
| } | ||
| }, | ||
| ArrowDataType::Time32(unit) => match unit { | ||
| TimeUnit::Second => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<Time32SecondArray>() | ||
| .expect("Time32(second) column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_INT32); | ||
| datum.i32_val = array.value(row_id); | ||
| datum | ||
| } | ||
| TimeUnit::Millisecond => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<Time32MillisecondArray>() | ||
| .expect("Time32(millisecond) column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_INT32); | ||
| datum.i32_val = array.value(row_id); | ||
| datum | ||
| } | ||
| _ => panic!("Unsupported Time32 unit for column {}", i), | ||
| }, | ||
| ArrowDataType::Time64(unit) => match unit { | ||
| TimeUnit::Microsecond => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<Time64MicrosecondArray>() | ||
| .expect("Time64(microsecond) column expected"); | ||
| let mut datum = new_datum(DATUM_TYPE_INT64); | ||
| datum.i64_val = array.value(row_id); | ||
| datum | ||
| } | ||
| TimeUnit::Nanosecond => { | ||
| let array = record_batch | ||
| .column(i) | ||
| .as_any() | ||
| .downcast_ref::<Time64NanosecondArray>() | ||
| .expect("Time64(nanosecond) column expected"); |
Copilot
AI
Dec 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The expect() calls throughout this function can panic if the downcasts fail, which will cause undefined behavior when crossing FFI boundaries. Consider using pattern matching with proper error handling instead of expect(), or return a Result type that can be properly handled by the C++ caller.
| fn new_log_scanner(&self) -> Result<*mut LogScanner, String> { | ||
| let fluss_table = | ||
| fcore::client::FlussTable::new(&self.connection, self.metadata.clone(), self.table_info.clone()); | ||
|
|
||
| let scanner = fluss_table.new_scan().create_log_scanner(); | ||
| let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner })); | ||
| Ok(scanner) | ||
| } | ||
|
|
||
| fn new_log_scanner_with_projection(&self, column_indices: Vec<usize>) -> Result<*mut LogScanner, String> { | ||
| let fluss_table = | ||
| fcore::client::FlussTable::new(&self.connection, self.metadata.clone(), self.table_info.clone()); | ||
|
|
||
| let scan = fluss_table.new_scan(); | ||
| let scan = match scan.project(&column_indices) { | ||
| Ok(s) => s, | ||
| Err(e) => return Err(format!("Failed to project columns: {}", e)), | ||
| }; | ||
| let scanner = scan.create_log_scanner(); | ||
| let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner })); | ||
| Ok(scanner) | ||
| } |
Copilot
AI
Dec 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent runtime context handling: new_append_writer uses RUNTIME.enter() but new_log_scanner and new_log_scanner_with_projection do not. If the scanner creation requires async context (similar to the writer), this could lead to runtime panics. Verify whether scanner creation needs runtime context, and if so, add let _enter = RUNTIME.enter(); to these methods as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This inconsistent runtime context handling is intentional. RUNTIME.enter() is only needed when the creation path involves tokio.
-
new_append_writer needs RUNTIME.enter() because it will use tokio::spawn() internally.
-
new_log_scanner and new_log_scanner_with_projection do NOT need RUNTIME.enter() because No tokio::spawn or async operations in the scanner creation path.
Purpose
Linked issue: close #67
Brief change log
Tests
API and Format
Documentation