Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ rust-version = "1.85"

[workspace]
resolver = "2"
members = ["crates/fluss", "crates/examples", "bindings/python"]
members = ["crates/fluss", "crates/examples", "bindings/python", "bindings/cpp"]

[workspace.dependencies]
fluss = { version = "0.1.0", path = "./crates/fluss" }
Expand Down
3 changes: 3 additions & 0 deletions bindings/cpp/.clang-format
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
BasedOnStyle: Google
ColumnLimit: 100
IndentWidth: 4
7 changes: 7 additions & 0 deletions bindings/cpp/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
build/
cmake-build-*/
.idea/
*.o
*.a
*.so
*.dylib
105 changes: 105 additions & 0 deletions bindings/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

cmake_minimum_required(VERSION 3.22)

if (POLICY CMP0135)
cmake_policy(SET CMP0135 NEW)
endif()

project(fluss-cpp LANGUAGES CXX)

include(FetchContent)
set(FLUSS_GOOGLETEST_VERSION 1.15.2 CACHE STRING "version of GoogleTest")
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

if (NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE Debug)
endif()

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

option(FLUSS_ENABLE_ADDRESS_SANITIZER "Enable address sanitizer" OFF)
option(FLUSS_ENABLE_TESTING "Enable building test binary for fluss" OFF)
option(FLUSS_DEV "Enable dev mode" OFF)

if (FLUSS_DEV)
set(FLUSS_ENABLE_ADDRESS_SANITIZER ON)
set(FLUSS_ENABLE_TESTING ON)
endif()

# Get cargo target dir
execute_process(COMMAND cargo locate-project --workspace --message-format plain
OUTPUT_VARIABLE CARGO_TARGET_DIR
WORKING_DIRECTORY ${PROJECT_SOURCE_DIR})
string(REGEX REPLACE "/Cargo.toml\n$" "/target" CARGO_TARGET_DIR "${CARGO_TARGET_DIR}")

set(CARGO_MANIFEST ${PROJECT_SOURCE_DIR}/Cargo.toml)
set(RUST_SOURCE_FILE ${PROJECT_SOURCE_DIR}/src/lib.rs)
set(RUST_BRIDGE_CPP ${CARGO_TARGET_DIR}/cxxbridge/fluss-cpp/src/lib.rs.cc)
set(RUST_HEADER_FILE ${CARGO_TARGET_DIR}/cxxbridge/fluss-cpp/src/lib.rs.h)

if (CMAKE_BUILD_TYPE STREQUAL "Debug")
set(RUST_LIB ${CARGO_TARGET_DIR}/debug/${CMAKE_STATIC_LIBRARY_PREFIX}fluss_cpp${CMAKE_STATIC_LIBRARY_SUFFIX})
else()
set(RUST_LIB ${CARGO_TARGET_DIR}/release/${CMAKE_STATIC_LIBRARY_PREFIX}fluss_cpp${CMAKE_STATIC_LIBRARY_SUFFIX})
endif()

set(CPP_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/include
${PROJECT_SOURCE_DIR}/src
${CARGO_TARGET_DIR}/cxxbridge
${CARGO_TARGET_DIR}/cxxbridge/fluss-cpp/src)

file(GLOB CPP_SOURCE_FILE "src/*.cpp")
file(GLOB CPP_HEADER_FILE "include/*.hpp")

if (NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
list(APPEND CARGO_BUILD_FLAGS "--release")
endif()

add_custom_target(cargo_build
COMMAND cargo build --manifest-path ${CARGO_MANIFEST} ${CARGO_BUILD_FLAGS}
BYPRODUCTS ${RUST_BRIDGE_CPP} ${RUST_LIB} ${RUST_HEADER_FILE}
DEPENDS ${RUST_SOURCE_FILE}
USES_TERMINAL
COMMENT "Running cargo..."
)

add_library(fluss_cpp STATIC ${CPP_SOURCE_FILE} ${RUST_BRIDGE_CPP})
target_sources(fluss_cpp PUBLIC ${CPP_HEADER_FILE})
target_sources(fluss_cpp PRIVATE ${RUST_HEADER_FILE})
target_include_directories(fluss_cpp PUBLIC ${CPP_INCLUDE_DIR})
target_link_libraries(fluss_cpp PUBLIC ${RUST_LIB})
target_link_libraries(fluss_cpp PRIVATE ${CMAKE_DL_LIBS})
if(APPLE)
target_link_libraries(fluss_cpp PUBLIC "-framework CoreFoundation" "-framework Security")
endif()

add_executable(fluss_cpp_example examples/example.cpp)
target_link_libraries(fluss_cpp_example fluss_cpp)
target_include_directories(fluss_cpp_example PUBLIC ${CPP_INCLUDE_DIR})

set_target_properties(fluss_cpp
PROPERTIES ADDITIONAL_CLEAN_FILES ${CARGO_TARGET_DIR}
)
add_dependencies(fluss_cpp cargo_build)

if (FLUSS_ENABLE_ADDRESS_SANITIZER)
target_compile_options(fluss_cpp PRIVATE -fsanitize=leak,address,undefined -fno-omit-frame-pointer -fno-common -O1)
target_link_options(fluss_cpp PRIVATE -fsanitize=leak,address,undefined)
endif()
36 changes: 36 additions & 0 deletions bindings/cpp/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "fluss-cpp"
version = "0.1.0"
edition.workspace = true
rust-version.workspace = true
publish = false

[lib]
crate-type = ["staticlib"]

[dependencies]
anyhow = "1.0"
arrow = { workspace = true }
cxx = "1.0"
fluss = { path = "../../crates/fluss" }
tokio = { version = "1.27", features = ["rt-multi-thread", "macros"] }

[build-dependencies]
cxx-build = "1.0"
24 changes: 24 additions & 0 deletions bindings/cpp/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

fn main() {
cxx_build::bridge("src/lib.rs")
.std("c++17")
.compile("fluss-cpp-bridge");

println!("cargo:rerun-if-changed=src/lib.rs");
}
166 changes: 166 additions & 0 deletions bindings/cpp/examples/example.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "fluss.hpp"

#include <iostream>
#include <vector>

static void check(const char* step, const fluss::Result& r) {
if (!r.Ok()) {
std::cerr << step << " failed: code=" << r.error_code
<< " msg=" << r.error_message << std::endl;
std::exit(1);
}
}

int main() {
// 1) Connect
fluss::Connection conn;
check("connect", fluss::Connection::Connect("127.0.0.1:9123", conn));

// 2) Admin
fluss::Admin admin;
check("get_admin", conn.GetAdmin(admin));

// 3) Schema & descriptor
auto schema = fluss::Schema::NewBuilder()
.AddColumn("id", fluss::DataType::Int)
.AddColumn("name", fluss::DataType::String)
.AddColumn("score", fluss::DataType::Float)
.AddColumn("age", fluss::DataType::Int)
.Build();

auto descriptor = fluss::TableDescriptor::NewBuilder()
.SetSchema(schema)
.SetBucketCount(1)
.SetProperty("table.log.arrow.compression.type", "NONE")
.SetComment("cpp example table")
.Build();

fluss::TablePath table_path("fluss", "sample_table_cpp_v1");
// ignore_if_exists=true to allow re-run
check("create_table", admin.CreateTable(table_path, descriptor, true));

// 4) Get table
fluss::Table table;
check("get_table", conn.GetTable(table_path, table));

// 5) Writer
fluss::AppendWriter writer;
check("new_append_writer", table.NewAppendWriter(writer));

struct RowData {
int id;
const char* name;
float score;
int age;
};

std::vector<RowData> rows = {
{1, "Alice", 95.2f, 25},
{2, "Bob", 87.2f, 30},
{3, "Charlie", 92.1f, 35},
};

for (const auto& r : rows) {
fluss::GenericRow row;
row.SetInt32(0, r.id);
row.SetString(1, r.name);
row.SetFloat32(2, r.score);
row.SetInt32(3, r.age);
check("append", writer.Append(row));
}
check("flush", writer.Flush());
std::cout << "Wrote " << rows.size() << " rows" << std::endl;

// 6) Scan
fluss::LogScanner scanner;
check("new_log_scanner", table.NewLogScanner(scanner));

auto info = table.GetTableInfo();
int buckets = info.num_buckets;
for (int b = 0; b < buckets; ++b) {
check("subscribe", scanner.Subscribe(b, 0));
}

fluss::ScanRecords records;
check("poll", scanner.Poll(5000, records));

std::cout << "Scanned records: " << records.records.size() << std::endl;
for (const auto& rec : records.records) {
std::cout << " offset=" << rec.offset << " id=" << rec.row.fields[0].i32_val
<< " name=" << rec.row.fields[1].string_val
<< " score=" << rec.row.fields[2].f32_val << " age=" << rec.row.fields[3].i32_val
<< " ts=" << rec.timestamp << std::endl;
}

// 7) Project only id (0) and name (1) columns
std::vector<size_t> projected_columns = {0, 1};
fluss::LogScanner projected_scanner;
check("new_log_scanner_with_projection",
table.NewLogScannerWithProjection(projected_columns, projected_scanner));

for (int b = 0; b < buckets; ++b) {
check("subscribe_projected", projected_scanner.Subscribe(b, 0));
}

fluss::ScanRecords projected_records;
check("poll_projected", projected_scanner.Poll(5000, projected_records));

std::cout << "Projected records: " << projected_records.records.size() << std::endl;

bool projection_verified = true;
for (size_t i = 0; i < projected_records.records.size(); ++i) {
const auto& rec = projected_records.records[i];
const auto& row = rec.row;

if (row.fields.size() != projected_columns.size()) {
std::cerr << "ERROR: Record " << i << " has " << row.fields.size()
<< " fields, expected " << projected_columns.size() << std::endl;
projection_verified = false;
continue;
}

// Verify field types match expected columns
// Column 0 (id) should be Int32, Column 1 (name) should be String
if (row.fields[0].type != fluss::DatumType::Int32) {
std::cerr << "ERROR: Record " << i << " field 0 type mismatch, expected Int32" << std::endl;
projection_verified = false;
}
if (row.fields[1].type != fluss::DatumType::String) {
std::cerr << "ERROR: Record " << i << " field 1 type mismatch, expected String" << std::endl;
projection_verified = false;
}

// Print projected data
if (row.fields[0].type == fluss::DatumType::Int32 &&
row.fields[1].type == fluss::DatumType::String) {
std::cout << " Record " << i << ": id=" << row.fields[0].i32_val
<< ", name=" << row.fields[1].string_val << std::endl;
}
}

if (projection_verified) {
std::cout << "Column pruning verification passed!" << std::endl;
} else {
std::cerr << "Column pruning verification failed!" << std::endl;
std::exit(1);
}

return 0;
}
Loading