Conversation

@zhaohaidao
Contributor

Purpose

Linked issue: close #67

Brief change log

Tests

API and Format

Documentation

@zhaohaidao changed the title from "(WIP)Support Cpp bindings" to "Support Cpp bindings" on Dec 8, 2025
@zhaohaidao
Contributor Author

@luoyuxia Hi Yuxia, PTAL if you have time.

Copilot AI left a comment

Pull request overview

This PR adds C++ language bindings for the Fluss client library, enabling C++ applications to interact with Fluss tables. The implementation uses the cxx library to create a safe FFI bridge between Rust and C++, exposing core functionality including connection management, admin operations, table operations, append writers, and log scanners.

Key changes:

  • Implements FFI bridge layer using cxx for type-safe Rust-C++ interop
  • Adds C++ wrapper classes with RAII resource management for Connection, Admin, Table, AppendWriter, and LogScanner
  • Includes comprehensive example demonstrating table creation, data insertion, scanning, and column projection
  • Modifies Config struct to manually implement Default trait to avoid conflicts with clap's Parser derive

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 13 comments.

| File | Description |
| --- | --- |
| bindings/cpp/src/lib.rs | Rust FFI implementation defining the bridge interface and core bindings logic |
| bindings/cpp/src/types.rs | Type conversion utilities between FFI types and Fluss core types |
| bindings/cpp/src/connection.cpp | C++ implementation of Connection class |
| bindings/cpp/src/admin.cpp | C++ implementation of Admin class for table management |
| bindings/cpp/src/table.cpp | C++ implementation of Table, AppendWriter, and LogScanner classes |
| bindings/cpp/src/ffi_converter.hpp | Helper utilities for converting between C++ and FFI types |
| bindings/cpp/include/fluss.hpp | Public C++ API header with all class definitions and types |
| bindings/cpp/examples/example.cpp | Comprehensive usage example demonstrating all features |
| bindings/cpp/build.rs | Build script for cxx bridge code generation |
| bindings/cpp/Cargo.toml | Rust package configuration for C++ bindings |
| bindings/cpp/CMakeLists.txt | CMake build configuration |
| bindings/cpp/.clang-format | Code formatting configuration |
| bindings/cpp/.gitignore | Git ignore rules for build artifacts |
| crates/fluss/src/config.rs | Manual Default implementation to avoid clap derive conflicts |
| Cargo.toml | Workspace updated to include cpp bindings |

if (!Available()) {
    return utils::make_error(1, "Connection not available");
}

Copilot AI Dec 9, 2025

Potential memory leak: If out.table_ is already non-null when GetTable is called, the existing resource will be overwritten without being freed, causing a memory leak. Consider calling out.Destroy() before assigning the new pointer, or check and free the existing resource first.

Suggested change
// Free any existing resource before overwriting out.table_
out.Destroy();

}

Result Connection::Connect(const std::string& bootstrap_server, Connection& out) {
    try {

Copilot AI Dec 9, 2025

Potential memory leak: If out.conn_ is already non-null when Connect is called, the existing resource will be overwritten without being freed, causing a memory leak. Consider calling out.Destroy() before assigning the new pointer, or check and free the existing resource first.

Suggested change
-     try {
+     try {
+         out.Destroy();

if (!Available()) {
    return utils::make_error(1, "Table not available");
}

Copilot AI Dec 9, 2025

Potential memory leak: If out.writer_ is already non-null when NewAppendWriter is called, the existing resource will be overwritten without being freed, causing a memory leak. Consider calling out.Destroy() before assigning the new pointer, or check and free the existing resource first.

Suggested change
// Free any existing resource to prevent memory leak
out.Destroy();

        datum.i64_val = array.value(row_id);
        datum
    }
    _ => panic!("Unsupported Time64 unit for column {}", i),

Copilot AI Dec 9, 2025

Panics can cause undefined behavior when crossing FFI boundaries. Instead of panic!, consider returning an error Result or using a default/fallback value. The panic will not be caught by C++ exception handlers and will abort the process.
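
A minimal sketch of the "return an error instead of panicking" option, using simplified stand-in types: `TimeUnit`, `Datum`, and `time64_to_datum` below are hypothetical placeholders, not the real Arrow or binding definitions. The idea is that the unsupported-unit arm produces an `Err(String)` that the caller can propagate. Assuming the function is exposed through the cxx bridge with a `Result` return type (as `new_log_scanner` already is in this PR), cxx is documented to surface the `Err` on the C++ side as a `rust::Error` exception rather than aborting.

```rust
// Hypothetical stand-ins for the Arrow `TimeUnit` and the FFI datum type;
// the real definitions live in the arrow crate and the bindings crate.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TimeUnit {
    Second,
    Millisecond,
    Microsecond,
    Nanosecond,
}

#[derive(Debug, PartialEq)]
pub struct Datum {
    pub i64_val: i64,
}

/// Converts a Time64 value, returning `Err` instead of panicking when the
/// unit is not one that Time64 supports.
pub fn time64_to_datum(unit: TimeUnit, value: i64, column: usize) -> Result<Datum, String> {
    match unit {
        TimeUnit::Microsecond | TimeUnit::Nanosecond => Ok(Datum { i64_val: value }),
        other => Err(format!(
            "Unsupported Time64 unit {:?} for column {}",
            other, column
        )),
    }
}
```

With this shape, the conversion function's callers would use `?` to forward the error up to the bridged entry point, which is the only place that needs to know about the FFI boundary.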

        }
        _ => panic!("Unsupported Time64 unit for column {}", i),
    },
    other => panic!("Unsupported Arrow data type for column {}: {:?}", i, other),

Copilot AI Dec 9, 2025

Panics can cause undefined behavior when crossing FFI boundaries. Instead of panic!, consider returning an error Result or using a default/fallback value. The panic will not be caught by C++ exception handlers and will abort the process.

if (!Available()) {
    return utils::make_error(1, "Table not available");
}

Copilot AI Dec 9, 2025

Potential memory leak: If out.scanner_ is already non-null when NewLogScanner is called, the existing resource will be overwritten without being freed, causing a memory leak. Consider calling out.Destroy() before assigning the new pointer, or check and free the existing resource first.

Suggested change
out.Destroy();

for (size_t idx : column_indices) {
    rust_indices.push_back(idx);
}
out.scanner_ = table_->new_log_scanner_with_projection(std::move(rust_indices));

Copilot AI Dec 9, 2025

Potential memory leak: If out.scanner_ is already non-null when NewLogScannerWithProjection is called, the existing resource will be overwritten without being freed, causing a memory leak. Consider calling out.Destroy() before assigning the new pointer, or check and free the existing resource first.
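
The free-before-overwrite rule in these leak comments can be sketched from the Rust side, where the raw handles originate (the scanner constructors in this PR return pointers made with `Box::into_raw`). This is only an illustration under assumptions: `Scanner`, `ScannerHandle`, `replace`, and `destroy` are hypothetical names, not part of the PR. It shows that each pointer from `Box::into_raw` has to be given back to `Box::from_raw` exactly once, and that storing a new pointer first requires releasing the old one.

```rust
use std::ptr;

// Hypothetical resource standing in for the Rust-side `LogScanner`.
pub struct Scanner {
    pub id: u32,
}

/// Owns a raw handle produced by `Box::into_raw`, mirroring what a wrapper
/// class holds in a member such as `scanner_`.
pub struct ScannerHandle {
    raw: *mut Scanner,
}

impl ScannerHandle {
    pub fn empty() -> Self {
        ScannerHandle { raw: ptr::null_mut() }
    }

    /// Frees the current resource, if any, and resets the handle to null.
    pub fn destroy(&mut self) {
        if !self.raw.is_null() {
            // SAFETY: `raw` is non-null only when it came from `Box::into_raw`
            // in `replace` and has not been freed since.
            unsafe { drop(Box::from_raw(self.raw)) };
            self.raw = ptr::null_mut();
        }
    }

    /// Releases the old resource before storing the new one, so overwriting
    /// a live handle does not leak.
    pub fn replace(&mut self, scanner: Scanner) {
        self.destroy();
        self.raw = Box::into_raw(Box::new(scanner));
    }

    /// Returns the id of the held resource, or `None` for an empty handle.
    pub fn id(&self) -> Option<u32> {
        // SAFETY: a non-null `raw` always points to a live `Scanner`.
        unsafe { self.raw.as_ref().map(|s| s.id) }
    }
}

impl Drop for ScannerHandle {
    fn drop(&mut self) {
        self.destroy();
    }
}
```

Calling `replace` twice on the same handle frees the first `Scanner` before the second is stored, which is the behavior the suggested `out.Destroy()` call is meant to give the C++ wrappers.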

        datum.i32_val = array.value(row_id);
        datum
    }
    _ => panic!("Unsupported Time32 unit for column {}", i),

Copilot AI Dec 9, 2025

Panics can cause undefined behavior when crossing FFI boundaries. Instead of panic!, consider returning an error Result or using a default/fallback value. The panic will not be caught by C++ exception handlers and will abort the process.

Comment on lines +350 to +466
            .expect("LargeUtf8 column expected");
        let mut datum = new_datum(DATUM_TYPE_STRING);
        datum.string_val = array.value(row_id).to_string();
        datum
    }
    ArrowDataType::Binary => {
        let mut datum = new_datum(DATUM_TYPE_BYTES);
        datum.bytes_val = row.get_bytes(i);
        datum
    }
    ArrowDataType::FixedSizeBinary(len) => {
        let mut datum = new_datum(DATUM_TYPE_BYTES);
        datum.bytes_val = row.get_binary(i, *len as usize);
        datum
    }
    ArrowDataType::LargeBinary => {
        let array = record_batch
            .column(i)
            .as_any()
            .downcast_ref::<LargeBinaryArray>()
            .expect("LargeBinary column expected");
        let mut datum = new_datum(DATUM_TYPE_BYTES);
        datum.bytes_val = array.value(row_id).to_vec();
        datum
    }
    ArrowDataType::Date32 => {
        let array = record_batch
            .column(i)
            .as_any()
            .downcast_ref::<Date32Array>()
            .expect("Date32 column expected");
        let mut datum = new_datum(DATUM_TYPE_INT32);
        datum.i32_val = array.value(row_id);
        datum
    }
    ArrowDataType::Timestamp(unit, _) => match unit {
        TimeUnit::Second => {
            let array = record_batch
                .column(i)
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .expect("Timestamp(second) column expected");
            let mut datum = new_datum(DATUM_TYPE_INT64);
            datum.i64_val = array.value(row_id);
            datum
        }
        TimeUnit::Millisecond => {
            let array = record_batch
                .column(i)
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .expect("Timestamp(millisecond) column expected");
            let mut datum = new_datum(DATUM_TYPE_INT64);
            datum.i64_val = array.value(row_id);
            datum
        }
        TimeUnit::Microsecond => {
            let array = record_batch
                .column(i)
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .expect("Timestamp(microsecond) column expected");
            let mut datum = new_datum(DATUM_TYPE_INT64);
            datum.i64_val = array.value(row_id);
            datum
        }
        TimeUnit::Nanosecond => {
            let array = record_batch
                .column(i)
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .expect("Timestamp(nanosecond) column expected");
            let mut datum = new_datum(DATUM_TYPE_INT64);
            datum.i64_val = array.value(row_id);
            datum
        }
    },
    ArrowDataType::Time32(unit) => match unit {
        TimeUnit::Second => {
            let array = record_batch
                .column(i)
                .as_any()
                .downcast_ref::<Time32SecondArray>()
                .expect("Time32(second) column expected");
            let mut datum = new_datum(DATUM_TYPE_INT32);
            datum.i32_val = array.value(row_id);
            datum
        }
        TimeUnit::Millisecond => {
            let array = record_batch
                .column(i)
                .as_any()
                .downcast_ref::<Time32MillisecondArray>()
                .expect("Time32(millisecond) column expected");
            let mut datum = new_datum(DATUM_TYPE_INT32);
            datum.i32_val = array.value(row_id);
            datum
        }
        _ => panic!("Unsupported Time32 unit for column {}", i),
    },
    ArrowDataType::Time64(unit) => match unit {
        TimeUnit::Microsecond => {
            let array = record_batch
                .column(i)
                .as_any()
                .downcast_ref::<Time64MicrosecondArray>()
                .expect("Time64(microsecond) column expected");
            let mut datum = new_datum(DATUM_TYPE_INT64);
            datum.i64_val = array.value(row_id);
            datum
        }
        TimeUnit::Nanosecond => {
            let array = record_batch
                .column(i)
                .as_any()
                .downcast_ref::<Time64NanosecondArray>()
                .expect("Time64(nanosecond) column expected");

Copilot AI Dec 9, 2025

The expect() calls throughout this function can panic if the downcasts fail, which will cause undefined behavior when crossing FFI boundaries. Consider using pattern matching with proper error handling instead of expect(), or return a Result type that can be properly handled by the C++ caller.
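
A minimal sketch of replacing `expect()` on a downcast with a propagated error, using only the standard library's `Any` so it stays self-contained. `Int32Column` and `read_i32` are hypothetical stand-ins: the real code downcasts `record_batch.column(i).as_any()` to Arrow array types, but the `downcast_ref(...).ok_or_else(...)?` shape would be the same.

```rust
use std::any::Any;

// Hypothetical stand-in for an Arrow array type such as `Date32Array`.
pub struct Int32Column {
    pub values: Vec<i32>,
}

/// Reads one value from a type-erased column, returning `Err` rather than
/// panicking when the column has an unexpected type or the row is out of range.
pub fn read_i32(column: &dyn Any, row_id: usize, col_idx: usize) -> Result<i32, String> {
    // `downcast_ref` yields `None` on a type mismatch; turn that into an error.
    let array = column
        .downcast_ref::<Int32Column>()
        .ok_or_else(|| format!("Column {}: expected Int32Column", col_idx))?;
    // `get` is the non-panicking counterpart of indexing.
    array
        .values
        .get(row_id)
        .copied()
        .ok_or_else(|| format!("Column {}: row {} out of range", col_idx, row_id))
}
```

The trade-off is that the enclosing conversion function has to return a `Result` as well, so each match arm ends in `Ok(datum)` instead of a bare `datum`.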

Comment on lines +402 to +423
fn new_log_scanner(&self) -> Result<*mut LogScanner, String> {
    let fluss_table =
        fcore::client::FlussTable::new(&self.connection, self.metadata.clone(), self.table_info.clone());

    let scanner = fluss_table.new_scan().create_log_scanner();
    let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
    Ok(scanner)
}

fn new_log_scanner_with_projection(&self, column_indices: Vec<usize>) -> Result<*mut LogScanner, String> {
    let fluss_table =
        fcore::client::FlussTable::new(&self.connection, self.metadata.clone(), self.table_info.clone());

    let scan = fluss_table.new_scan();
    let scan = match scan.project(&column_indices) {
        Ok(s) => s,
        Err(e) => return Err(format!("Failed to project columns: {}", e)),
    };
    let scanner = scan.create_log_scanner();
    let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
    Ok(scanner)
}

Copilot AI Dec 9, 2025

Inconsistent runtime context handling: new_append_writer uses RUNTIME.enter() but new_log_scanner and new_log_scanner_with_projection do not. If the scanner creation requires async context (similar to the writer), this could lead to runtime panics. Verify whether scanner creation needs runtime context, and if so, add let _enter = RUNTIME.enter(); to these methods as well.

Contributor Author

This inconsistent runtime context handling is intentional. RUNTIME.enter() is only needed when the creation path involves tokio.

  • new_append_writer needs RUNTIME.enter() because it calls tokio::spawn() internally.

  • new_log_scanner and new_log_scanner_with_projection do NOT need RUNTIME.enter() because the scanner creation path contains no tokio::spawn calls or other async operations.

Development

Successfully merging this pull request may close these issues.

Introduce cpp binding

1 participant