diff --git a/Cargo.toml b/Cargo.toml
index 15bcb79..b4ac03b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,7 +28,7 @@ rust-version = "1.85"
 
 [workspace]
 resolver = "2"
-members = ["crates/fluss", "crates/examples", "bindings/python"]
+members = ["crates/fluss", "crates/examples", "bindings/python", "bindings/cpp"]
 
 [workspace.dependencies]
 fluss = { version = "0.1.0", path = "./crates/fluss" }
diff --git a/bindings/cpp/.clang-format b/bindings/cpp/.clang-format
new file mode 100644
index 0000000..6804ec6
--- /dev/null
+++ b/bindings/cpp/.clang-format
@@ -0,0 +1,3 @@
+BasedOnStyle: Google
+ColumnLimit: 100
+IndentWidth: 4
diff --git a/bindings/cpp/.gitignore b/bindings/cpp/.gitignore
new file mode 100644
index 0000000..6836e70
--- /dev/null
+++ b/bindings/cpp/.gitignore
@@ -0,0 +1,7 @@
+build/
+cmake-build-*/
+.idea/
+*.o
+*.a
+*.so
+*.dylib
diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt
new file mode 100644
index 0000000..9d99ede
--- /dev/null
+++ b/bindings/cpp/CMakeLists.txt
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cmake_minimum_required(VERSION 3.22)
+
+if (POLICY CMP0135)
+    cmake_policy(SET CMP0135 NEW)
+endif()
+
+project(fluss-cpp LANGUAGES CXX)
+
+include(FetchContent)
+set(FLUSS_GOOGLETEST_VERSION 1.15.2 CACHE STRING "version of GoogleTest")
+set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
+
+if (NOT CMAKE_BUILD_TYPE)
+    set(CMAKE_BUILD_TYPE Debug)
+endif()
+
+set(CMAKE_CXX_STANDARD 17)
+set(CMAKE_CXX_STANDARD_REQUIRED ON)
+
+option(FLUSS_ENABLE_ADDRESS_SANITIZER "Enable address sanitizer" OFF)
+option(FLUSS_ENABLE_TESTING "Enable building test binary for fluss" OFF)
+option(FLUSS_DEV "Enable dev mode" OFF)
+
+if (FLUSS_DEV)
+    set(FLUSS_ENABLE_ADDRESS_SANITIZER ON)
+    set(FLUSS_ENABLE_TESTING ON)
+endif()
+
+# Get cargo target dir
+execute_process(COMMAND cargo locate-project --workspace --message-format plain
+        OUTPUT_VARIABLE CARGO_TARGET_DIR
+        WORKING_DIRECTORY ${PROJECT_SOURCE_DIR})
+string(REGEX REPLACE "/Cargo.toml\n$" "/target" CARGO_TARGET_DIR "${CARGO_TARGET_DIR}")
+
+set(CARGO_MANIFEST ${PROJECT_SOURCE_DIR}/Cargo.toml)
+set(RUST_SOURCE_FILE ${PROJECT_SOURCE_DIR}/src/lib.rs)
+set(RUST_BRIDGE_CPP ${CARGO_TARGET_DIR}/cxxbridge/fluss-cpp/src/lib.rs.cc)
+set(RUST_HEADER_FILE ${CARGO_TARGET_DIR}/cxxbridge/fluss-cpp/src/lib.rs.h)
+
+if (CMAKE_BUILD_TYPE STREQUAL "Debug")
+    set(RUST_LIB ${CARGO_TARGET_DIR}/debug/${CMAKE_STATIC_LIBRARY_PREFIX}fluss_cpp${CMAKE_STATIC_LIBRARY_SUFFIX})
+else()
+    set(RUST_LIB ${CARGO_TARGET_DIR}/release/${CMAKE_STATIC_LIBRARY_PREFIX}fluss_cpp${CMAKE_STATIC_LIBRARY_SUFFIX})
+endif()
+
+set(CPP_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/include
+        ${PROJECT_SOURCE_DIR}/src
+        ${CARGO_TARGET_DIR}/cxxbridge
+        ${CARGO_TARGET_DIR}/cxxbridge/fluss-cpp/src)
+
+file(GLOB CPP_SOURCE_FILE "src/*.cpp")
+file(GLOB CPP_HEADER_FILE "include/*.hpp")
+
+if (NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
+    list(APPEND CARGO_BUILD_FLAGS "--release")
+endif()
+
+add_custom_target(cargo_build
+        COMMAND cargo build --manifest-path ${CARGO_MANIFEST} ${CARGO_BUILD_FLAGS}
+        BYPRODUCTS ${RUST_BRIDGE_CPP} ${RUST_LIB} ${RUST_HEADER_FILE}
+        DEPENDS ${RUST_SOURCE_FILE}
+        USES_TERMINAL
+        COMMENT "Running cargo..."
+)
+
+add_library(fluss_cpp STATIC ${CPP_SOURCE_FILE} ${RUST_BRIDGE_CPP})
+target_sources(fluss_cpp PUBLIC ${CPP_HEADER_FILE})
+target_sources(fluss_cpp PRIVATE ${RUST_HEADER_FILE})
+target_include_directories(fluss_cpp PUBLIC ${CPP_INCLUDE_DIR})
+target_link_libraries(fluss_cpp PUBLIC ${RUST_LIB})
+target_link_libraries(fluss_cpp PRIVATE ${CMAKE_DL_LIBS})
+if(APPLE)
+    target_link_libraries(fluss_cpp PUBLIC "-framework CoreFoundation" "-framework Security")
+endif()
+
+add_executable(fluss_cpp_example examples/example.cpp)
+target_link_libraries(fluss_cpp_example fluss_cpp)
+target_include_directories(fluss_cpp_example PUBLIC ${CPP_INCLUDE_DIR})
+
+set_target_properties(fluss_cpp
+        PROPERTIES ADDITIONAL_CLEAN_FILES ${CARGO_TARGET_DIR}
+)
+add_dependencies(fluss_cpp cargo_build)
+
+if (FLUSS_ENABLE_ADDRESS_SANITIZER)
+    target_compile_options(fluss_cpp PRIVATE -fsanitize=leak,address,undefined -fno-omit-frame-pointer -fno-common -O1)
+    target_link_options(fluss_cpp PRIVATE -fsanitize=leak,address,undefined)
+endif()
\ No newline at end of file
diff --git a/bindings/cpp/Cargo.toml b/bindings/cpp/Cargo.toml
new file mode 100644
index 0000000..2d3d913
--- /dev/null
+++ b/bindings/cpp/Cargo.toml
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "fluss-cpp"
+version = "0.1.0"
+edition.workspace = true
+rust-version.workspace = true
+publish = false
+
+[lib]
+crate-type = ["staticlib"]
+
+[dependencies]
+anyhow = "1.0"
+arrow = { workspace = true }
+cxx = "1.0"
+fluss = { path = "../../crates/fluss" }
+tokio = { version = "1.27", features = ["rt-multi-thread", "macros"] }
+
+[build-dependencies]
+cxx-build = "1.0"
diff --git a/bindings/cpp/build.rs b/bindings/cpp/build.rs
new file mode 100644
index 0000000..ec75e24
--- /dev/null
+++ b/bindings/cpp/build.rs
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+fn main() {
+    cxx_build::bridge("src/lib.rs")
+        .std("c++17")
+        .compile("fluss-cpp-bridge");
+
+    println!("cargo:rerun-if-changed=src/lib.rs");
+}
diff --git a/bindings/cpp/examples/example.cpp b/bindings/cpp/examples/example.cpp
new file mode 100644
index 0000000..5146f28
--- /dev/null
+++ b/bindings/cpp/examples/example.cpp
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "fluss.hpp"
+
+#include <cstdlib>
+#include <iostream>
+#include <vector>
+
+static void check(const char* step, const fluss::Result& r) {
+    if (!r.Ok()) {
+        std::cerr << step << " failed: code=" << r.error_code
+                  << " msg=" << r.error_message << std::endl;
+        std::exit(1);
+    }
+}
+
+int main() {
+    // 1) Connect
+    fluss::Connection conn;
+    check("connect", fluss::Connection::Connect("127.0.0.1:9123", conn));
+
+    // 2) Admin
+    fluss::Admin admin;
+    check("get_admin", conn.GetAdmin(admin));
+
+    // 3) Schema & descriptor
+    auto schema = fluss::Schema::NewBuilder()
+                      .AddColumn("id", fluss::DataType::Int)
+                      .AddColumn("name", fluss::DataType::String)
+                      .AddColumn("score", fluss::DataType::Float)
+                      .AddColumn("age", fluss::DataType::Int)
+                      .Build();
+
+    auto descriptor = fluss::TableDescriptor::NewBuilder()
+                          .SetSchema(schema)
+                          .SetBucketCount(1)
+                          .SetProperty("table.log.arrow.compression.type", "NONE")
+                          .SetComment("cpp example table")
+                          .Build();
+
+    fluss::TablePath table_path("fluss", "sample_table_cpp_v1");
+    // ignore_if_exists=true to allow re-run
+    check("create_table", admin.CreateTable(table_path, descriptor, true));
+
+    // 4) Get table
+    fluss::Table table;
+    check("get_table", conn.GetTable(table_path, table));
+
+    // 5) Writer
+    fluss::AppendWriter writer;
+    check("new_append_writer", table.NewAppendWriter(writer));
+
+    struct RowData {
+        int id;
+        const char* name;
+        float score;
+        int age;
+    };
+
+    std::vector<RowData> rows = {
+        {1, "Alice", 95.2f, 25},
+        {2, "Bob", 87.2f, 30},
+        {3, "Charlie", 92.1f, 35},
+    };
+
+    for (const auto& r : rows) {
+        fluss::GenericRow row;
+        row.SetInt32(0, r.id);
+        row.SetString(1, r.name);
+        row.SetFloat32(2, r.score);
+        row.SetInt32(3, r.age);
+        check("append", writer.Append(row));
+    }
+    check("flush", writer.Flush());
+    std::cout << "Wrote " << rows.size() << " rows" << std::endl;
+
+    // 6) Scan
+    fluss::LogScanner scanner;
+    check("new_log_scanner", table.NewLogScanner(scanner));
+
+    auto info = table.GetTableInfo();
+    int buckets = info.num_buckets;
+    for (int b = 0; b < buckets; ++b) {
+        check("subscribe", scanner.Subscribe(b, 0));
+    }
+
+    fluss::ScanRecords records;
+    check("poll", scanner.Poll(5000, records));
+
+    std::cout << "Scanned records: " << records.records.size() << std::endl;
+    for (const auto& rec : records.records) {
+        std::cout << " offset=" << rec.offset << " id=" << rec.row.fields[0].i32_val
+                  << " name=" << rec.row.fields[1].string_val
+                  << " score=" << rec.row.fields[2].f32_val << " age=" << rec.row.fields[3].i32_val
+                  << " ts=" << rec.timestamp << std::endl;
+    }
+
+    // 7) Project only id (0) and name (1) columns
+    std::vector<size_t> projected_columns = {0, 1};
+    fluss::LogScanner projected_scanner;
+    check("new_log_scanner_with_projection",
+          table.NewLogScannerWithProjection(projected_columns, projected_scanner));
+
+    for (int b = 0; b < buckets; ++b) {
+        check("subscribe_projected", projected_scanner.Subscribe(b, 0));
+    }
+
+    fluss::ScanRecords projected_records;
+    check("poll_projected", projected_scanner.Poll(5000, projected_records));
+
+    std::cout << "Projected records: " << projected_records.records.size() << std::endl;
+
+    bool projection_verified = true;
+    for (size_t i = 0; i < projected_records.records.size(); ++i) {
+        const auto& rec = projected_records.records[i];
+        const auto& row = rec.row;
+
+        if (row.fields.size() != projected_columns.size()) {
+            std::cerr << "ERROR: Record " << i << " has " << row.fields.size()
+                      << " fields, expected " << projected_columns.size() << std::endl;
+            projection_verified = false;
+            continue;
+        }
+
+        // Verify field types match expected columns
+        // Column 0 (id) should be Int32, Column 1 (name) should be String
+        if (row.fields[0].type != fluss::DatumType::Int32) {
+            std::cerr << "ERROR: Record " << i << " field 0 type mismatch, expected Int32" << std::endl;
+            projection_verified = false;
+        }
+        if (row.fields[1].type != fluss::DatumType::String) {
+            std::cerr << "ERROR: Record " << i << " field 1 type mismatch, expected String" << std::endl;
+            projection_verified = false;
+        }
+
+        // Print projected data
+        if (row.fields[0].type == fluss::DatumType::Int32 &&
+            row.fields[1].type == fluss::DatumType::String) {
+            std::cout << " Record " << i << ": id=" << row.fields[0].i32_val
+                      << ", name=" << row.fields[1].string_val << std::endl;
+        }
+    }
+
+    if (projection_verified) {
+        std::cout << "Column pruning verification passed!" << std::endl;
+    } else {
+        std::cerr << "Column pruning verification failed!" << std::endl;
+        std::exit(1);
+    }
+
+    return 0;
+}
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
new file mode 100644
index 0000000..f5ff0b1
--- /dev/null
+++ b/bindings/cpp/include/fluss.hpp
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+namespace fluss {
+
+namespace ffi {
+    struct Connection;
+    struct Admin;
+    struct Table;
+    struct AppendWriter;
+    struct LogScanner;
+}  // namespace ffi
+
+enum class DataType {
+    Boolean = 1,
+    TinyInt = 2,
+    SmallInt = 3,
+    Int = 4,
+    BigInt = 5,
+    Float = 6,
+    Double = 7,
+    String = 8,
+    Bytes = 9,
+    Date = 10,
+    Time = 11,
+    Timestamp = 12,
+    TimestampLtz = 13,
+};
+
+enum class DatumType {
+    Null = 0,
+    Bool = 1,
+    Int32 = 2,
+    Int64 = 3,
+    Float32 = 4,
+    Float64 = 5,
+    String = 6,
+    Bytes = 7,
+};
+
+struct Result {
+    int32_t error_code{0};
+    std::string error_message;
+
+    bool Ok() const { return error_code == 0; }
+};
+
+struct TablePath {
+    std::string database_name;
+    std::string table_name;
+
+    TablePath() = default;
+    TablePath(std::string db, std::string tbl)
+        : database_name(std::move(db)), table_name(std::move(tbl)) {}
+
+    std::string ToString() const { return database_name + "." + table_name; }
+};
+
+struct Column {
+    std::string name;
+    DataType data_type;
+    std::string comment;
+};
+
+struct Schema {
+    std::vector<Column> columns;
+    std::vector<std::string> primary_keys;
+
+    class Builder {
+    public:
+        Builder& AddColumn(std::string name, DataType type,
+                           std::string comment = "") {
+            columns_.push_back({std::move(name), type, std::move(comment)});
+            return *this;
+        }
+
+        Builder& SetPrimaryKeys(std::vector<std::string> keys) {
+            primary_keys_ = std::move(keys);
+            return *this;
+        }
+
+        Schema Build() {
+            return Schema{std::move(columns_), std::move(primary_keys_)};
+        }
+
+    private:
+        std::vector<Column> columns_;
+        std::vector<std::string> primary_keys_;
+    };
+
+    static Builder NewBuilder() { return Builder(); }
+};
+
+struct TableDescriptor {
+    Schema schema;
+    std::vector<std::string> partition_keys;
+    int32_t bucket_count{0};
+    std::vector<std::string> bucket_keys;
+    std::unordered_map<std::string, std::string> properties;
+    std::string comment;
+
+    class Builder {
+    public:
+        Builder& SetSchema(Schema s) {
+            schema_ = std::move(s);
+            return *this;
+        }
+
+        Builder& SetPartitionKeys(std::vector<std::string> keys) {
+            partition_keys_ = std::move(keys);
+            return *this;
+        }
+
+        Builder& SetBucketCount(int32_t count) {
+            bucket_count_ = count;
+            return *this;
+        }
+
+        Builder& SetBucketKeys(std::vector<std::string> keys) {
+            bucket_keys_ = std::move(keys);
+            return *this;
+        }
+
+        Builder& SetProperty(std::string key, std::string value) {
+            properties_[std::move(key)] = std::move(value);
+            return *this;
+        }
+
+        Builder& SetComment(std::string comment) {
+            comment_ = std::move(comment);
+            return *this;
+        }
+
+        TableDescriptor Build() {
+            return TableDescriptor{std::move(schema_),
+                                   std::move(partition_keys_),
+                                   bucket_count_,
+                                   std::move(bucket_keys_),
+                                   std::move(properties_),
+                                   std::move(comment_)};
+        }
+
+    private:
+        Schema schema_;
+        std::vector<std::string> partition_keys_;
+        int32_t bucket_count_{0};
+        std::vector<std::string> bucket_keys_;
+        std::unordered_map<std::string, std::string> properties_;
+        std::string comment_;
+    };
+
+    static Builder NewBuilder() { return Builder(); }
+};
+
+struct TableInfo {
+    int64_t table_id;
+    int32_t schema_id;
+    TablePath table_path;
+    int64_t created_time;
+    int64_t modified_time;
+    std::vector<std::string> primary_keys;
+    std::vector<std::string> bucket_keys;
+    std::vector<std::string> partition_keys;
+    int32_t num_buckets;
+    bool has_primary_key;
+    bool is_partitioned;
+    std::unordered_map<std::string, std::string> properties;
+    std::string comment;
+    Schema schema;
+};
+
+struct Datum {
+    DatumType type{DatumType::Null};
+    bool bool_val{false};
+    int32_t i32_val{0};
+    int64_t i64_val{0};
+    float f32_val{0.0F};
+    double f64_val{0.0};
+    std::string string_val;
+    std::vector<uint8_t> bytes_val;
+
+    static Datum Null() { return Datum(); }
+    static Datum Bool(bool v) {
+        Datum d;
+        d.type = DatumType::Bool;
+        d.bool_val = v;
+        return d;
+    }
+    static Datum Int32(int32_t v) {
+        Datum d;
+        d.type = DatumType::Int32;
+        d.i32_val = v;
+        return d;
+    }
+    static Datum Int64(int64_t v) {
+        Datum d;
+        d.type = DatumType::Int64;
+        d.i64_val = v;
+        return d;
+    }
+    static Datum Float32(float v) {
+        Datum d;
+        d.type = DatumType::Float32;
+        d.f32_val = v;
+        return d;
+    }
+    static Datum Float64(double v) {
+        Datum d;
+        d.type = DatumType::Float64;
+        d.f64_val = v;
+        return d;
+    }
+    static Datum String(std::string v) {
+        Datum d;
+        d.type = DatumType::String;
+        d.string_val = std::move(v);
+        return d;
+    }
+    static Datum Bytes(std::vector<uint8_t> v) {
+        Datum d;
+        d.type = DatumType::Bytes;
+        d.bytes_val = std::move(v);
+        return d;
+    }
+};
+
+struct GenericRow {
+    std::vector<Datum> fields;
+
+    void SetNull(size_t idx) {
+        EnsureSize(idx);
+        fields[idx] = Datum::Null();
+    }
+
+    void SetBool(size_t idx, bool v) {
+        EnsureSize(idx);
+        fields[idx] = Datum::Bool(v);
+    }
+
+    void SetInt32(size_t idx, int32_t v) {
+        EnsureSize(idx);
+        fields[idx] = Datum::Int32(v);
+    }
+
+    void SetInt64(size_t idx, int64_t v) {
+        EnsureSize(idx);
+        fields[idx] = Datum::Int64(v);
+    }
+
+    void SetFloat32(size_t idx, float v) {
+        EnsureSize(idx);
+        fields[idx] = Datum::Float32(v);
+    }
+
+    void SetFloat64(size_t idx, double v) {
+        EnsureSize(idx);
+        fields[idx] = Datum::Float64(v);
+    }
+
+    void SetString(size_t idx, std::string v) {
+        EnsureSize(idx);
+        fields[idx] = Datum::String(std::move(v));
+    }
+
+    void SetBytes(size_t idx, std::vector<uint8_t> v) {
+        EnsureSize(idx);
+        fields[idx] = Datum::Bytes(std::move(v));
+    }
+
+private:
+    void EnsureSize(size_t idx) {
+        if (fields.size() <= idx) {
+            fields.resize(idx + 1);
+        }
+    }
+};
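+
+// Illustrative usage (not compiled here): populating a row for a table with
+// columns (id INT, name STRING):
+//
+//     fluss::GenericRow row;
+//     row.SetInt32(0, 42);
+//     row.SetString(1, "alice");
+//
+// Skipped leading fields stay Datum::Null(), because EnsureSize() grows the
+// vector with default-constructed Datum values.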
+
+struct ScanRecord {
+    int64_t offset;
+    int64_t timestamp;
+    GenericRow row;
+};
+
+struct ScanRecords {
+    std::vector<ScanRecord> records;
+
+    size_t Size() const { return records.size(); }
+    bool Empty() const { return records.empty(); }
+    const ScanRecord& operator[](size_t idx) const { return records[idx]; }
+
+    auto begin() const { return records.begin(); }
+    auto end() const { return records.end(); }
+};
+
+struct BucketOffset {
+    int64_t table_id;
+    int64_t partition_id;
+    int32_t bucket_id;
+    int64_t offset;
+};
+
+struct LakeSnapshot {
+    int64_t snapshot_id;
+    std::vector<BucketOffset> bucket_offsets;
+};
+
+class AppendWriter;
+class LogScanner;
+class Admin;
+class Table;
+
+class Connection {
+public:
+    Connection() noexcept;
+    ~Connection() noexcept;
+
+    Connection(const Connection&) = delete;
+    Connection& operator=(const Connection&) = delete;
+    Connection(Connection&& other) noexcept;
+    Connection& operator=(Connection&& other) noexcept;
+
+    static Result Connect(const std::string& bootstrap_server, Connection& out);
+
+    bool Available() const;
+
+    Result GetAdmin(Admin& out);
+    Result GetTable(const TablePath& table_path, Table& out);
+
+private:
+    void Destroy() noexcept;
+    ffi::Connection* conn_{nullptr};
+};
+
+class Admin {
+public:
+    Admin() noexcept;
+    ~Admin() noexcept;
+
+    Admin(const Admin&) = delete;
+    Admin& operator=(const Admin&) = delete;
+    Admin(Admin&& other) noexcept;
+    Admin& operator=(Admin&& other) noexcept;
+
+    bool Available() const;
+
+    Result CreateTable(const TablePath& table_path,
+                       const TableDescriptor& descriptor,
+                       bool ignore_if_exists = false);
+
+    Result GetTable(const TablePath& table_path, TableInfo& out);
+
+    Result GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& out);
+
+private:
+    friend class Connection;
+    Admin(ffi::Admin* admin) noexcept;
+
+    void Destroy() noexcept;
+    ffi::Admin* admin_{nullptr};
+};
+
+class Table {
+public:
+    Table() noexcept;
+    ~Table() noexcept;
+
+    Table(const Table&) = delete;
+    Table& operator=(const Table&) = delete;
+    Table(Table&& other) noexcept;
+    Table& operator=(Table&& other) noexcept;
+
+    bool Available() const;
+
+    Result NewAppendWriter(AppendWriter& out);
+    Result NewLogScanner(LogScanner& out);
+    Result NewLogScannerWithProjection(const std::vector<size_t>& column_indices, LogScanner& out);
+
+    TableInfo GetTableInfo() const;
+    TablePath GetTablePath() const;
+    bool HasPrimaryKey() const;
+
+private:
+    friend class Connection;
+    Table(ffi::Table* table) noexcept;
+
+    void Destroy() noexcept;
+    ffi::Table* table_{nullptr};
+};
+
+class AppendWriter {
+public:
+    AppendWriter() noexcept;
+    ~AppendWriter() noexcept;
+
+    AppendWriter(const AppendWriter&) = delete;
+    AppendWriter& operator=(const AppendWriter&) = delete;
+    AppendWriter(AppendWriter&& other) noexcept;
+    AppendWriter& operator=(AppendWriter&& other) noexcept;
+
+    bool Available() const;
+
+    Result Append(const GenericRow& row);
+    Result Flush();
+
+private:
+    friend class Table;
+    AppendWriter(ffi::AppendWriter* writer) noexcept;
+
+    void Destroy() noexcept;
+    ffi::AppendWriter* writer_{nullptr};
+};
+
+class LogScanner {
+public:
+    LogScanner() noexcept;
+    ~LogScanner() noexcept;
+
+    LogScanner(const LogScanner&) = delete;
+    LogScanner& operator=(const LogScanner&) = delete;
+    LogScanner(LogScanner&& other) noexcept;
+    LogScanner& operator=(LogScanner&& other) noexcept;
+
+    bool Available() const;
+
+    Result Subscribe(int32_t bucket_id, int64_t start_offset);
+    Result Poll(int64_t timeout_ms, ScanRecords& out);
+
+private:
+    friend class Table;
+    LogScanner(ffi::LogScanner* scanner) noexcept;
+
+    void Destroy() noexcept;
+    ffi::LogScanner* scanner_{nullptr};
+};
+
+}  // namespace fluss
diff --git a/bindings/cpp/src/admin.cpp b/bindings/cpp/src/admin.cpp
new file mode 100644
index 0000000..f6997a6
--- /dev/null
+++ b/bindings/cpp/src/admin.cpp
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "fluss.hpp"
+#include "lib.rs.h"
+#include "ffi_converter.hpp"
+#include "rust/cxx.h"
+
+namespace fluss {
+
+Admin::Admin() noexcept = default;
+
+Admin::Admin(ffi::Admin* admin) noexcept : admin_(admin) {}
+
+Admin::~Admin() noexcept { Destroy(); }
+
+void Admin::Destroy() noexcept {
+    if (admin_) {
+        ffi::delete_admin(admin_);
+        admin_ = nullptr;
+    }
+}
+
+Admin::Admin(Admin&& other) noexcept : admin_(other.admin_) {
+    other.admin_ = nullptr;
+}
+
+Admin& Admin::operator=(Admin&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        admin_ = other.admin_;
+        other.admin_ = nullptr;
+    }
+    return *this;
+}
+
+bool Admin::Available() const { return admin_ != nullptr; }
+
+Result Admin::CreateTable(const TablePath& table_path,
+                          const TableDescriptor& descriptor,
+                          bool ignore_if_exists) {
+    if (!Available()) {
+        return utils::make_error(1, "Admin not available");
+    }
+
+    auto ffi_path = utils::to_ffi_table_path(table_path);
+    auto ffi_desc = utils::to_ffi_table_descriptor(descriptor);
+
+    auto ffi_result = admin_->create_table(ffi_path, ffi_desc, ignore_if_exists);
+    return utils::from_ffi_result(ffi_result);
+}
+
+Result Admin::GetTable(const TablePath& table_path, TableInfo& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Admin not available");
+    }
+
+    auto ffi_path = utils::to_ffi_table_path(table_path);
+    auto ffi_result = admin_->get_table_info(ffi_path);
+
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (result.Ok()) {
+        out = utils::from_ffi_table_info(ffi_result.table_info);
+    }
+
+    return result;
+}
+
+Result Admin::GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Admin not available");
+    }
+
+    auto ffi_path = utils::to_ffi_table_path(table_path);
+    auto ffi_result = admin_->get_latest_lake_snapshot(ffi_path);
+
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (result.Ok()) {
+        out = utils::from_ffi_lake_snapshot(ffi_result.lake_snapshot);
+    }
+
+    return result;
+}
+
+}  // namespace fluss
diff --git a/bindings/cpp/src/connection.cpp b/bindings/cpp/src/connection.cpp
new file mode 100644
index 0000000..ea884cd
--- /dev/null
+++ b/bindings/cpp/src/connection.cpp
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "fluss.hpp"
+#include "lib.rs.h"
+#include "ffi_converter.hpp"
+#include "rust/cxx.h"
+
+namespace fluss {
+
+Connection::Connection() noexcept = default;
+
+Connection::~Connection() noexcept { Destroy(); }
+
+void Connection::Destroy() noexcept {
+    if (conn_) {
+        ffi::delete_connection(conn_);
+        conn_ = nullptr;
+    }
+}
+
+Connection::Connection(Connection&& other) noexcept : conn_(other.conn_) {
+    other.conn_ = nullptr;
+}
+
+Connection& Connection::operator=(Connection&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        conn_ = other.conn_;
+        other.conn_ = nullptr;
+    }
+    return *this;
+}
+
+Result Connection::Connect(const std::string& bootstrap_server, Connection& out) {
+    try {
+        out.conn_ = ffi::new_connection(bootstrap_server);
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+bool Connection::Available() const { return conn_ != nullptr; }
+
+Result Connection::GetAdmin(Admin& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Connection not available");
+    }
+
+    try {
+        out.admin_ = conn_->get_admin();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result Connection::GetTable(const TablePath& table_path, Table& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Connection not available");
+    }
+
+    try {
+        auto ffi_path = utils::to_ffi_table_path(table_path);
+        out.table_ = conn_->get_table(ffi_path);
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+}  // namespace fluss
diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp
new file mode 100644
index 0000000..4413662
--- /dev/null
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include "fluss.hpp"
+#include "lib.rs.h"
+
+namespace fluss {
+namespace utils {
+
+inline Result make_error(int32_t code, std::string msg) {
+    return Result{code, std::move(msg)};
+}
+
+inline Result make_ok() {
+    return Result{0, {}};
+}
+
+inline Result from_ffi_result(const ffi::FfiResult& ffi_result) {
+    return Result{ffi_result.error_code, std::string(ffi_result.error_message)};
+}
+
+inline ffi::FfiTablePath to_ffi_table_path(const TablePath& path) {
+    ffi::FfiTablePath ffi_path;
+    ffi_path.database_name = rust::String(path.database_name);
+    ffi_path.table_name = rust::String(path.table_name);
+    return ffi_path;
+}
+
+inline ffi::FfiColumn to_ffi_column(const Column& col) {
+    ffi::FfiColumn ffi_col;
+    ffi_col.name = rust::String(col.name);
+    ffi_col.data_type = static_cast<int32_t>(col.data_type);
+    ffi_col.comment = rust::String(col.comment);
+    return ffi_col;
+}
+
+inline ffi::FfiSchema to_ffi_schema(const Schema& schema) {
+    ffi::FfiSchema ffi_schema;
+
+    rust::Vec<ffi::FfiColumn> cols;
+    for (const auto& col : schema.columns) {
+        cols.push_back(to_ffi_column(col));
+    }
+    ffi_schema.columns = std::move(cols);
+
+    rust::Vec<rust::String> pks;
+    for (const auto& pk : schema.primary_keys) {
+        pks.push_back(rust::String(pk));
+    }
+    ffi_schema.primary_keys = std::move(pks);
+
+    return ffi_schema;
+}
+
+inline ffi::FfiTableDescriptor to_ffi_table_descriptor(const TableDescriptor& desc) {
+    ffi::FfiTableDescriptor ffi_desc;
+
+    ffi_desc.schema = to_ffi_schema(desc.schema);
+
+    rust::Vec<rust::String> partition_keys;
+    for (const auto& pk : desc.partition_keys) {
+        partition_keys.push_back(rust::String(pk));
+    }
+    ffi_desc.partition_keys = std::move(partition_keys);
+
+    ffi_desc.bucket_count = desc.bucket_count;
+
+    rust::Vec<rust::String> bucket_keys;
+    for (const auto& bk : desc.bucket_keys) {
+        bucket_keys.push_back(rust::String(bk));
+    }
+    ffi_desc.bucket_keys = std::move(bucket_keys);
+
+    rust::Vec<ffi::HashMapValue> props;
+    for (const auto& [k, v] : desc.properties) {
+        ffi::HashMapValue prop;
+        prop.key = rust::String(k);
+        prop.value = rust::String(v);
+        props.push_back(prop);
+    }
+    ffi_desc.properties = std::move(props);
+
+    ffi_desc.comment = rust::String(desc.comment);
+
+    return ffi_desc;
+}
+
+inline ffi::FfiDatum to_ffi_datum(const Datum& datum) {
+    ffi::FfiDatum ffi_datum;
+    ffi_datum.datum_type = static_cast<int32_t>(datum.type);
+    ffi_datum.bool_val = datum.bool_val;
+    ffi_datum.i32_val = datum.i32_val;
+    ffi_datum.i64_val = datum.i64_val;
+    ffi_datum.f32_val = datum.f32_val;
+    ffi_datum.f64_val = datum.f64_val;
+    ffi_datum.string_val = rust::String(datum.string_val);
+
+    rust::Vec<uint8_t> bytes;
+    for (auto b : datum.bytes_val) {
+        bytes.push_back(b);
+    }
+    ffi_datum.bytes_val = std::move(bytes);
+
+    return ffi_datum;
+}
+
+inline ffi::FfiGenericRow to_ffi_generic_row(const GenericRow& row) {
+    ffi::FfiGenericRow ffi_row;
+
+    rust::Vec<ffi::FfiDatum> fields;
+    for (const auto& field : row.fields) {
+        fields.push_back(to_ffi_datum(field));
+    }
+    ffi_row.fields = std::move(fields);
+
+    return ffi_row;
+}
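+
+// The from_ffi_* helpers below are the inverses of the to_ffi_* conversions
+// above. Both directions copy field by field, so no rust::String or
+// std::string storage is ever shared across the language boundary.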
+
+inline Column from_ffi_column(const ffi::FfiColumn& ffi_col) {
+    return Column{
+        std::string(ffi_col.name),
+        static_cast<DataType>(ffi_col.data_type),
+        std::string(ffi_col.comment)};
+}
+
+inline Schema from_ffi_schema(const ffi::FfiSchema& ffi_schema) {
+    Schema schema;
+
+    for (const auto& col : ffi_schema.columns) {
+        schema.columns.push_back(from_ffi_column(col));
+    }
+
+    for (const auto& pk : ffi_schema.primary_keys) {
+        schema.primary_keys.push_back(std::string(pk));
+    }
+
+    return schema;
+}
+inline TableInfo from_ffi_table_info(const ffi::FfiTableInfo& ffi_info) {
+    TableInfo info;
+
+    info.table_id = ffi_info.table_id;
+    info.schema_id = ffi_info.schema_id;
+    info.table_path = TablePath{
+        std::string(ffi_info.table_path.database_name),
+        std::string(ffi_info.table_path.table_name)};
+    info.created_time = ffi_info.created_time;
+    info.modified_time = ffi_info.modified_time;
+
+    for (const auto& pk : ffi_info.primary_keys) {
+        info.primary_keys.push_back(std::string(pk));
+    }
+
+    for (const auto& bk : ffi_info.bucket_keys) {
+        info.bucket_keys.push_back(std::string(bk));
+    }
+
+    for (const auto& pk : ffi_info.partition_keys) {
+        info.partition_keys.push_back(std::string(pk));
+    }
+
+    info.num_buckets = ffi_info.num_buckets;
+    info.has_primary_key = ffi_info.has_primary_key;
+    info.is_partitioned = ffi_info.is_partitioned;
+
+    for (const auto& prop : ffi_info.properties) {
+        info.properties[std::string(prop.key)] = std::string(prop.value);
+    }
+
+    info.comment = std::string(ffi_info.comment);
+    info.schema = from_ffi_schema(ffi_info.schema);
+
+    return info;
+}
+
+inline Datum from_ffi_datum(const ffi::FfiDatum& ffi_datum) {
+    Datum datum;
+    datum.type = static_cast<DatumType>(ffi_datum.datum_type);
+    datum.bool_val = ffi_datum.bool_val;
+    datum.i32_val = ffi_datum.i32_val;
+    datum.i64_val = ffi_datum.i64_val;
+    datum.f32_val = ffi_datum.f32_val;
+    datum.f64_val = ffi_datum.f64_val;
+    datum.string_val = std::string(ffi_datum.string_val);
+
+    for (auto b : ffi_datum.bytes_val) {
+        datum.bytes_val.push_back(b);
+    }
+
+    return datum;
+}
+
+inline GenericRow from_ffi_generic_row(const ffi::FfiGenericRow& ffi_row) {
+    GenericRow row;
+
+    for (const auto& field : ffi_row.fields) {
+        row.fields.push_back(from_ffi_datum(field));
+    }
+
+    return row;
+}
+
+inline ScanRecord from_ffi_scan_record(const ffi::FfiScanRecord& ffi_record) {
+    return ScanRecord{
+        ffi_record.offset,
+        ffi_record.timestamp,
+        from_ffi_generic_row(ffi_record.row)};
+}
+
+inline ScanRecords from_ffi_scan_records(const ffi::FfiScanRecords& ffi_records) {
+    ScanRecords records;
+
+    for (const auto& record : ffi_records.records) {
+        records.records.push_back(from_ffi_scan_record(record));
+    }
+
+    return records;
+}
+
+inline LakeSnapshot from_ffi_lake_snapshot(const ffi::FfiLakeSnapshot& ffi_snapshot) {
+    LakeSnapshot snapshot;
+    snapshot.snapshot_id = ffi_snapshot.snapshot_id;
+
+    for (const auto& offset : ffi_snapshot.bucket_offsets) {
+        snapshot.bucket_offsets.push_back(BucketOffset{
+            offset.table_id,
+            offset.partition_id,
+            offset.bucket_id,
+            offset.offset});
+    }
+
+    return snapshot;
+}
+
+}  // namespace utils
+}  // namespace fluss
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
new file mode 100644
index 0000000..b6338ca
--- /dev/null
+++ b/bindings/cpp/src/lib.rs
@@ -0,0 +1,509 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod types;
+
+use std::sync::{Arc, LazyLock};
+use std::time::Duration;
+
+use fluss as fcore;
+
+static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
+    tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .unwrap()
+});
+
+#[cxx::bridge(namespace = "fluss::ffi")]
+mod ffi {
+    struct HashMapValue {
+        key: String,
+        value: String,
+    }
+
+    struct FfiResult {
+        error_code: i32,
+        error_message: String,
+    }
+
+    struct FfiTablePath {
+        database_name: String,
+        table_name: String,
+    }
+
+    struct FfiColumn {
+        name: String,
+        data_type: i32,
+        comment: String,
+    }
+
+    struct FfiSchema {
+        columns: Vec<FfiColumn>,
+        primary_keys: Vec<String>,
+    }
+
+    struct FfiTableDescriptor {
+        schema: FfiSchema,
+        partition_keys: Vec<String>,
+        bucket_count: i32,
+        bucket_keys: Vec<String>,
+        properties: Vec<HashMapValue>,
+        comment: String,
+    }
+
+    struct FfiTableInfo {
+        table_id: i64,
+        schema_id: i32,
+        table_path: FfiTablePath,
+        created_time: i64,
+        modified_time: i64,
+        primary_keys: Vec<String>,
+        bucket_keys: Vec<String>,
+        partition_keys: Vec<String>,
+        num_buckets: i32,
+        has_primary_key: bool,
+        is_partitioned: bool,
+        properties: Vec<HashMapValue>,
+        comment: String,
+        schema: FfiSchema,
+    }
+
+    struct FfiTableInfoResult {
+        result: FfiResult,
+        table_info: FfiTableInfo,
+    }
+
+    struct FfiDatum {
+        datum_type: i32,
+        bool_val: bool,
+        i32_val: i32,
+        i64_val: i64,
+        f32_val: f32,
+        f64_val: f64,
+        string_val: String,
+        bytes_val: Vec<u8>,
+    }
+
+    struct FfiGenericRow {
+        fields: Vec<FfiDatum>,
+    }
+
+    struct FfiScanRecord {
+        offset: i64,
+        timestamp: i64,
+        row: FfiGenericRow,
+    }
+
+    struct FfiScanRecords {
+        records: Vec<FfiScanRecord>,
+    }
+
+    struct FfiScanRecordsResult {
+        result: FfiResult,
+        scan_records: FfiScanRecords,
+    }
+
+    struct FfiLakeSnapshot {
+        snapshot_id: i64,
+        bucket_offsets: Vec<FfiBucketOffset>,
+    }
+
+    struct FfiBucketOffset {
+        table_id: i64,
+        partition_id: i64,
+        bucket_id: i32,
+        offset: i64,
+    }
+
+    struct FfiLakeSnapshotResult {
+        result: FfiResult,
+        lake_snapshot: FfiLakeSnapshot,
+    }
+
+    extern "Rust" {
+        type Connection;
+        type Admin;
+        type Table;
+        type AppendWriter;
+        type LogScanner;
+
+        // Connection
+        fn new_connection(bootstrap_server: &str) -> Result<*mut Connection>;
+        unsafe fn delete_connection(conn: *mut Connection);
+        fn get_admin(self: &Connection) -> Result<*mut Admin>;
+        fn get_table(self: &Connection, table_path: &FfiTablePath) -> Result<*mut Table>;
+
+        // Admin
+        unsafe fn delete_admin(admin: *mut Admin);
+        fn create_table(
+            self: &Admin,
+            table_path: &FfiTablePath,
+            descriptor: &FfiTableDescriptor,
+            ignore_if_exists: bool,
+        ) -> FfiResult;
+        fn get_table_info(self: &Admin, table_path: &FfiTablePath) -> FfiTableInfoResult;
+        fn get_latest_lake_snapshot(self: &Admin, table_path: &FfiTablePath)
+            -> FfiLakeSnapshotResult;
+
+        // Table
+        unsafe fn delete_table(table: *mut Table);
+        fn new_append_writer(self: &Table) -> Result<*mut AppendWriter>;
+        fn new_log_scanner(self: &Table) -> Result<*mut LogScanner>;
+        fn new_log_scanner_with_projection(self: &Table, column_indices: Vec<usize>) -> Result<*mut LogScanner>;
+        fn get_table_info_from_table(self: &Table) -> FfiTableInfo;
+        fn get_table_path(self: &Table) -> FfiTablePath;
+        fn has_primary_key(self: &Table) -> bool;
+
+        // AppendWriter
+        unsafe fn delete_append_writer(writer: *mut AppendWriter);
+        fn append(self: &mut AppendWriter, row: &FfiGenericRow) -> FfiResult;
+        fn flush(self: &mut AppendWriter) -> FfiResult;
+
+        // LogScanner
+        unsafe fn delete_log_scanner(scanner: *mut LogScanner);
+        fn subscribe(self: &LogScanner, bucket_id: i32, start_offset: i64) -> FfiResult;
+        fn poll(self: &LogScanner, timeout_ms: i64) -> FfiScanRecordsResult;
+    }
+}
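+
+// Ownership contract of the bridge: every new_* / get_* function hands a raw
+// pointer to C++ via Box::into_raw, and the matching delete_* function
+// reclaims it with Box::from_raw. A minimal sketch (illustrative only):
+//
+//     let conn = new_connection("127.0.0.1:9123")?; // Box::into_raw
+//     unsafe { delete_connection(conn) };           // Box::from_raw + drop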
+
+pub struct Connection {
+    inner: Arc<fcore::client::FlussConnection>,
+    #[allow(dead_code)]
+    metadata: Option<Arc<fcore::client::Metadata>>,
+}
+
+pub struct Admin {
+    inner: fcore::client::FlussAdmin,
+}
+
+pub struct Table {
+    connection: Arc<fcore::client::FlussConnection>,
+    metadata: Arc<fcore::client::Metadata>,
+    table_info: fcore::metadata::TableInfo,
+    table_path: fcore::metadata::TablePath,
+    has_pk: bool,
+}
+
+pub struct AppendWriter {
+    inner: fcore::client::AppendWriter,
+}
+
+pub struct LogScanner {
+    inner: fcore::client::LogScanner,
+}
+
+fn ok_result() -> ffi::FfiResult {
+    ffi::FfiResult {
+        error_code: 0,
+        error_message: String::new(),
+    }
+}
+
+fn err_result(code: i32, msg: String) -> ffi::FfiResult {
+    ffi::FfiResult {
+        error_code: code,
+        error_message: msg,
+    }
+}
+
+// Connection implementation
+fn new_connection(bootstrap_server: &str) -> Result<*mut Connection, String> {
+    let mut config = fcore::config::Config::default();
+    config.bootstrap_server = Some(bootstrap_server.to_string());
+
+    let conn = RUNTIME.block_on(async { fcore::client::FlussConnection::new(config).await });
+
+    match conn {
+        Ok(c) => {
+            let conn = Box::into_raw(Box::new(Connection {
+                inner: Arc::new(c),
+                metadata: None,
+            }));
+            Ok(conn)
+        }
+        Err(e) => Err(format!("Failed to connect: {}", e)),
+    }
+}
+
+unsafe fn delete_connection(conn: *mut Connection) {
+    if !conn.is_null() {
+        unsafe {
+            drop(Box::from_raw(conn));
+        }
+    }
+}
+
+impl Connection {
+    fn get_admin(&self) -> Result<*mut Admin, String> {
+        let admin_result =
+            RUNTIME.block_on(async { self.inner.get_admin().await });
+
+        match admin_result {
+            Ok(admin) => {
+                let admin = Box::into_raw(Box::new(Admin { inner: admin }));
+                Ok(admin)
+            }
+            Err(e) => Err(format!("Failed to get admin: {}", e)),
+        }
+    }
+
+    fn get_table(&self, table_path: &ffi::FfiTablePath) -> Result<*mut Table, String> {
+        let path = fcore::metadata::TablePath::new(
+            table_path.database_name.clone(),
+            table_path.table_name.clone(),
+        );
+
+        let table_result = RUNTIME.block_on(async { self.inner.get_table(&path).await });
+
+        match table_result {
+            Ok(t) => {
+                let table = Box::into_raw(Box::new(Table {
+                    connection: self.inner.clone(),
+                    metadata: t.metadata().clone(),
+                    table_info: t.table_info().clone(),
+                    table_path: t.table_path().clone(),
+                    has_pk: t.has_primary_key(),
+                }));
+                Ok(table)
+            }
+            Err(e) => Err(format!("Failed to get table: {}", e)),
+        }
+    }
+}
+
+// Admin implementation
+unsafe fn delete_admin(admin: *mut Admin) {
+    if !admin.is_null() {
+        unsafe {
+            drop(Box::from_raw(admin));
+        }
+    }
+}
+
+impl Admin {
+    fn create_table(
+        &self,
+        table_path: &ffi::FfiTablePath,
+        descriptor: &ffi::FfiTableDescriptor,
+        ignore_if_exists: bool,
+    ) -> ffi::FfiResult {
+        let path = fcore::metadata::TablePath::new(
+            table_path.database_name.clone(),
+            table_path.table_name.clone(),
+        );
+
+        let core_descriptor = match types::ffi_descriptor_to_core(descriptor) {
+            Ok(d) => d,
+            Err(e) => return err_result(1, e.to_string()),
+        };
+
+        let result = RUNTIME.block_on(async {
+            self.inner
+                .create_table(&path, &core_descriptor, ignore_if_exists)
+                .await
+        });
+
+        match result {
+            Ok(_) => ok_result(),
+            Err(e) => err_result(2, e.to_string()),
+        }
+    }
+
+    fn get_table_info(&self, table_path: &ffi::FfiTablePath) -> ffi::FfiTableInfoResult {
+        let path = fcore::metadata::TablePath::new(
+            table_path.database_name.clone(),
+            table_path.table_name.clone(),
+        );
+
+        let result = RUNTIME.block_on(async { self.inner.get_table(&path).await });
+
+        match result {
+            Ok(info) => ffi::FfiTableInfoResult {
+                result: ok_result(),
+                table_info: types::core_table_info_to_ffi(&info),
+            },
+            Err(e) => ffi::FfiTableInfoResult {
+                result: err_result(1, e.to_string()),
+                table_info: types::empty_table_info(),
+            },
+        }
+    }
+
+    fn get_latest_lake_snapshot(
+        &self,
+        table_path: &ffi::FfiTablePath,
+    ) -> ffi::FfiLakeSnapshotResult {
+        let path = fcore::metadata::TablePath::new(
+            table_path.database_name.clone(),
+            table_path.table_name.clone(),
+        );
+
+        let result = RUNTIME.block_on(async { self.inner.get_latest_lake_snapshot(&path).await });
+
+        match result {
+            Ok(snapshot) => ffi::FfiLakeSnapshotResult {
+                result: ok_result(),
+                lake_snapshot: types::core_lake_snapshot_to_ffi(&snapshot),
+            },
+            Err(e) => ffi::FfiLakeSnapshotResult {
+                result: err_result(1, e.to_string()),
+                lake_snapshot: ffi::FfiLakeSnapshot {
+                    snapshot_id: -1,
+                    bucket_offsets: vec![],
+                },
+            },
+        }
+    }
+}
+
+// Table implementation
+unsafe fn delete_table(table: *mut Table) {
+    if !table.is_null() {
+        unsafe {
+            drop(Box::from_raw(table));
+        }
+    }
+}
+
+impl Table {
+    fn new_append_writer(&self) -> Result<*mut AppendWriter, String> {
+        let _enter = RUNTIME.enter();
+
+        let fluss_table =
+            fcore::client::FlussTable::new(&self.connection, self.metadata.clone(), self.table_info.clone());
+
+        let table_append = match fluss_table.new_append() {
+            Ok(a) => a,
+            Err(e) => {
+                return Err(format!("Failed to create append: {}", e))
+            }
+        };
+
+        let writer = table_append.create_writer();
+        let writer = Box::into_raw(Box::new(AppendWriter { inner: writer }));
+        Ok(writer)
+    }
+
+    fn new_log_scanner(&self) -> Result<*mut LogScanner, String> {
+        let fluss_table =
+            fcore::client::FlussTable::new(&self.connection, self.metadata.clone(), self.table_info.clone());
+
+        let scanner = fluss_table.new_scan().create_log_scanner();
+        let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
+        Ok(scanner)
+    }
+
+    fn new_log_scanner_with_projection(&self, column_indices: Vec<usize>) -> Result<*mut LogScanner, String> {
+        let fluss_table =
+            fcore::client::FlussTable::new(&self.connection, self.metadata.clone(), self.table_info.clone());
+
+        let scan = fluss_table.new_scan();
+        let scan = match scan.project(&column_indices) {
+            Ok(s) => s,
+            Err(e) => return Err(format!("Failed to project columns: {}", e)),
+        };
+        let scanner = scan.create_log_scanner();
+        let scanner = Box::into_raw(Box::new(LogScanner { inner: scanner }));
+        Ok(scanner)
+    }
+
+    fn get_table_info_from_table(&self) -> ffi::FfiTableInfo {
+        types::core_table_info_to_ffi(&self.table_info)
+    }
+
+    fn get_table_path(&self) -> ffi::FfiTablePath {
+        ffi::FfiTablePath {
+            database_name: self.table_path.database().to_string(),
+            table_name: self.table_path.table().to_string(),
+        }
+    }
+
+    fn has_primary_key(&self) -> bool {
+        self.has_pk
+    }
+}
+
+// AppendWriter implementation
+unsafe fn delete_append_writer(writer: *mut AppendWriter) {
+    if !writer.is_null() {
+        unsafe {
+            drop(Box::from_raw(writer));
+        }
+    }
+}
+
+impl AppendWriter {
+    fn append(&mut self, row: &ffi::FfiGenericRow) -> ffi::FfiResult {
+        let mut owner = types::OwnedRowData::new();
+        owner.collect_strings(row);
+        let generic_row = types::ffi_row_to_core(row, &owner);
+
+        let result = RUNTIME.block_on(async { self.inner.append(generic_row).await });
+
+        match result {
+            Ok(_) => ok_result(),
+            Err(e) => err_result(1, e.to_string()),
+        }
+    }
+
+    fn flush(&mut self) -> ffi::FfiResult {
+        let result = RUNTIME.block_on(async { self.inner.flush().await });
+
+        match result {
+            Ok(_) => ok_result(),
+            Err(e) => err_result(1, e.to_string()),
+        }
+    }
+}
+
+// LogScanner implementation
+unsafe fn delete_log_scanner(scanner: *mut LogScanner) {
+    if !scanner.is_null() {
+        unsafe {
+            drop(Box::from_raw(scanner));
+        }
+    }
+}
+
+impl LogScanner {
+    fn subscribe(&self, bucket_id: i32, start_offset: i64) -> ffi::FfiResult {
+        let result =
+            RUNTIME.block_on(async { self.inner.subscribe(bucket_id, start_offset).await });
+
+        match result {
+            Ok(_) => ok_result(),
+            Err(e) => err_result(1, e.to_string()),
+        }
+    }
+
+    fn poll(&self, timeout_ms: i64) -> ffi::FfiScanRecordsResult {
+        let timeout = Duration::from_millis(timeout_ms as u64);
+        let result = RUNTIME.block_on(async { self.inner.poll(timeout).await });
+
+        match result {
+            Ok(records) => ffi::FfiScanRecordsResult {
+                result: ok_result(),
+                scan_records: types::core_scan_records_to_ffi(&records),
+            },
+            Err(e) => ffi::FfiScanRecordsResult {
+                result: err_result(1, e.to_string()),
+                scan_records: ffi::FfiScanRecords { records: vec![] },
+            },
+        }
+    }
+}
diff --git a/bindings/cpp/src/table.cpp b/bindings/cpp/src/table.cpp
new file mode 100644
index 0000000..b28b783
--- /dev/null
+++ b/bindings/cpp/src/table.cpp
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "fluss.hpp"
+#include "lib.rs.h"
+#include "ffi_converter.hpp"
+#include "rust/cxx.h"
+
+namespace fluss {
+
+Table::Table() noexcept = default;
+
+Table::Table(ffi::Table* table) noexcept : table_(table) {}
+
+Table::~Table() noexcept { Destroy(); }
+
+void Table::Destroy() noexcept {
+    if (table_) {
+        ffi::delete_table(table_);
+        table_ = nullptr;
+    }
+}
+
+Table::Table(Table&& other) noexcept : table_(other.table_) {
+    other.table_ = nullptr;
+}
+
+Table& Table::operator=(Table&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        table_ = other.table_;
+        other.table_ = nullptr;
+    }
+    return *this;
+}
+
+bool Table::Available() const { return table_ != nullptr; }
+
+Result Table::NewAppendWriter(AppendWriter& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        out.writer_ = table_->new_append_writer();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result Table::NewLogScanner(LogScanner& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        out.scanner_ = table_->new_log_scanner();
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+Result Table::NewLogScannerWithProjection(const std::vector<size_t>& column_indices, LogScanner& out) {
+    if (!Available()) {
+        return utils::make_error(1, "Table not available");
+    }
+
+    try {
+        rust::Vec<size_t> rust_indices;
+        for (size_t idx : column_indices) {
+            rust_indices.push_back(idx);
+        }
+        out.scanner_ = table_->new_log_scanner_with_projection(std::move(rust_indices));
+        return utils::make_ok();
+    } catch (const rust::Error& e) {
+        return utils::make_error(1, e.what());
+    } catch (const std::exception& e) {
+        return utils::make_error(1, e.what());
+    }
+}
+
+TableInfo Table::GetTableInfo() const {
+    if (!Available()) {
+        return TableInfo{};
+    }
+    auto ffi_info = table_->get_table_info_from_table();
+    return utils::from_ffi_table_info(ffi_info);
+}
+
+TablePath Table::GetTablePath() const {
+    if (!Available()) {
+        return TablePath{};
+    }
+    auto ffi_path = table_->get_table_path();
+    return TablePath{std::string(ffi_path.database_name), std::string(ffi_path.table_name)};
+}
+
+bool Table::HasPrimaryKey() const {
+    if (!Available()) {
+        return false;
+    }
+    return table_->has_primary_key();
+}
+
+// AppendWriter implementation
+AppendWriter::AppendWriter() noexcept = default;
+
+AppendWriter::AppendWriter(ffi::AppendWriter* writer) noexcept : writer_(writer) {}
+
+AppendWriter::~AppendWriter() noexcept { Destroy(); }
+
+void AppendWriter::Destroy() noexcept {
+    if (writer_) {
+        ffi::delete_append_writer(writer_);
+        writer_ = nullptr;
+    }
+}
+
+AppendWriter::AppendWriter(AppendWriter&& other) noexcept : writer_(other.writer_) {
+    other.writer_ = nullptr;
+}
+
+AppendWriter& AppendWriter::operator=(AppendWriter&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        writer_ = other.writer_;
+        other.writer_ = nullptr;
+    }
+    return *this;
+}
+
+bool AppendWriter::Available() const { return writer_ != nullptr; }
+
+Result AppendWriter::Append(const GenericRow& row) {
+    if (!Available()) {
+        return utils::make_error(1, "AppendWriter not available");
+    }
+
+    auto ffi_row = utils::to_ffi_generic_row(row);
+    auto ffi_result = writer_->append(ffi_row);
+    return utils::from_ffi_result(ffi_result);
+}
+
+Result AppendWriter::Flush() {
+    if (!Available()) {
+        return utils::make_error(1, "AppendWriter not available");
+    }
+
+    auto ffi_result = writer_->flush();
+    return utils::from_ffi_result(ffi_result);
+}
+
+// LogScanner implementation
+LogScanner::LogScanner() noexcept = default;
+
+LogScanner::LogScanner(ffi::LogScanner* scanner) noexcept : scanner_(scanner) {}
+
+LogScanner::~LogScanner() noexcept { Destroy(); }
+
+void LogScanner::Destroy() noexcept {
+    if (scanner_) {
+        ffi::delete_log_scanner(scanner_);
+        scanner_ = nullptr;
+    }
+}
+
+LogScanner::LogScanner(LogScanner&& other) noexcept : scanner_(other.scanner_) {
+    other.scanner_ = nullptr;
+}
+
+LogScanner& LogScanner::operator=(LogScanner&& other) noexcept {
+    if (this != &other) {
+        Destroy();
+        scanner_ = other.scanner_;
+        other.scanner_ = nullptr;
+    }
+    return *this;
+}
+
+bool LogScanner::Available() const { return scanner_ != nullptr; }
+
+Result LogScanner::Subscribe(int32_t bucket_id, int64_t start_offset) {
+    if (!Available()) {
+        return utils::make_error(1, "LogScanner not available");
+    }
+
+    auto ffi_result = scanner_->subscribe(bucket_id, start_offset);
+    return utils::from_ffi_result(ffi_result);
+}
+
+Result LogScanner::Poll(int64_t timeout_ms, ScanRecords& out) {
+    if (!Available()) {
+        return utils::make_error(1, "LogScanner not available");
+    }
+
+    auto ffi_result = scanner_->poll(timeout_ms);
+    auto result = utils::from_ffi_result(ffi_result.result);
+    if (!result.Ok()) {
+        return result;
+    }
+
+    out = utils::from_ffi_scan_records(ffi_result.scan_records);
+    return utils::make_ok();
+}
+
+}  // namespace fluss
diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs
new file mode 100644
index 0000000..6fa181f
--- /dev/null
+++ b/bindings/cpp/src/types.rs
@@ -0,0 +1,498 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ffi;
+use anyhow::{anyhow, Result};
+use arrow::array::{
+    Date32Array, LargeBinaryArray, LargeStringArray, Time32MillisecondArray, Time32SecondArray,
+    Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
+    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
+};
+use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
+use fluss as fcore;
+use fcore::row::InternalRow;
+
+pub const DATA_TYPE_BOOLEAN: i32 = 1;
+pub const DATA_TYPE_TINYINT: i32 = 2;
+pub const DATA_TYPE_SMALLINT: i32 = 3;
+pub const DATA_TYPE_INT: i32 = 4;
+pub const DATA_TYPE_BIGINT: i32 = 5;
+pub const DATA_TYPE_FLOAT: i32 = 6;
+pub const DATA_TYPE_DOUBLE: i32 = 7;
+pub const DATA_TYPE_STRING: i32 = 8;
+pub const DATA_TYPE_BYTES: i32 = 9;
+pub const DATA_TYPE_DATE: i32 = 10;
+pub const DATA_TYPE_TIME: i32 = 11;
+pub const DATA_TYPE_TIMESTAMP: i32 = 12;
+pub const DATA_TYPE_TIMESTAMP_LTZ: i32 = 13;
+
+pub const DATUM_TYPE_NULL: i32 = 0;
+pub const DATUM_TYPE_BOOL: i32 = 1;
+pub const DATUM_TYPE_INT32: i32 = 2;
+pub const DATUM_TYPE_INT64: i32 = 3;
+pub const DATUM_TYPE_FLOAT32: i32 = 4;
+pub const DATUM_TYPE_FLOAT64: i32 = 5;
+pub const DATUM_TYPE_STRING: i32 = 6;
+pub const DATUM_TYPE_BYTES: i32 = 7;
+
+fn ffi_data_type_to_core(dt: i32) -> Result<fcore::metadata::DataType> {
+    match dt {
+        DATA_TYPE_BOOLEAN => Ok(fcore::metadata::DataTypes::boolean()),
+        DATA_TYPE_TINYINT => Ok(fcore::metadata::DataTypes::tinyint()),
+        DATA_TYPE_SMALLINT => Ok(fcore::metadata::DataTypes::smallint()),
+        DATA_TYPE_INT => Ok(fcore::metadata::DataTypes::int()),
+        DATA_TYPE_BIGINT => Ok(fcore::metadata::DataTypes::bigint()),
+        DATA_TYPE_FLOAT => Ok(fcore::metadata::DataTypes::float()),
+        DATA_TYPE_DOUBLE => Ok(fcore::metadata::DataTypes::double()),
+        DATA_TYPE_STRING => Ok(fcore::metadata::DataTypes::string()),
+        DATA_TYPE_BYTES => Ok(fcore::metadata::DataTypes::bytes()),
+        DATA_TYPE_DATE => Ok(fcore::metadata::DataTypes::date()),
+        DATA_TYPE_TIME => Ok(fcore::metadata::DataTypes::time()),
+        DATA_TYPE_TIMESTAMP => Ok(fcore::metadata::DataTypes::timestamp()),
+        DATA_TYPE_TIMESTAMP_LTZ => Ok(fcore::metadata::DataTypes::timestamp_ltz()),
+        _ => Err(anyhow!("Unknown data type: {}", dt)),
+    }
+}
+
+fn core_data_type_to_ffi(dt: &fcore::metadata::DataType) -> i32 {
+    match dt {
+        fcore::metadata::DataType::Boolean(_) => DATA_TYPE_BOOLEAN,
+        fcore::metadata::DataType::TinyInt(_) => DATA_TYPE_TINYINT,
+        fcore::metadata::DataType::SmallInt(_) => DATA_TYPE_SMALLINT,
+        fcore::metadata::DataType::Int(_) => DATA_TYPE_INT,
+        fcore::metadata::DataType::BigInt(_) => DATA_TYPE_BIGINT,
+        fcore::metadata::DataType::Float(_) => DATA_TYPE_FLOAT,
+        fcore::metadata::DataType::Double(_) => DATA_TYPE_DOUBLE,
+        fcore::metadata::DataType::String(_) => DATA_TYPE_STRING,
+        fcore::metadata::DataType::Bytes(_) => DATA_TYPE_BYTES,
+        fcore::metadata::DataType::Date(_) => DATA_TYPE_DATE,
+        fcore::metadata::DataType::Time(_) => DATA_TYPE_TIME,
+        fcore::metadata::DataType::Timestamp(_) => DATA_TYPE_TIMESTAMP,
+        fcore::metadata::DataType::TimestampLTz(_) => DATA_TYPE_TIMESTAMP_LTZ,
+        _ => 0,
+    }
+}
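+
+// NOTE: the tag constants above mirror the C++ `fluss::DataType` and
+// `fluss::DatumType` enums in include/fluss.hpp; the bridge passes bare i32
+// tags in FfiColumn / FfiDatum, so the two lists must be kept in sync by hand.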
+
+pub fn ffi_descriptor_to_core(
+    descriptor: &ffi::FfiTableDescriptor,
+) -> Result<fcore::metadata::TableDescriptor> {
+    let mut schema_builder = fcore::metadata::Schema::builder();
+
+    for col in &descriptor.schema.columns {
+        let dt = ffi_data_type_to_core(col.data_type)?;
+        schema_builder = schema_builder.column(&col.name, dt);
+        if !col.comment.is_empty() {
+            schema_builder = schema_builder.with_comment(&col.comment);
+        }
+    }
+
+    if !descriptor.schema.primary_keys.is_empty() {
+        schema_builder = schema_builder.primary_key(descriptor.schema.primary_keys.clone());
+    }
+
+    let schema = schema_builder.build()?;
+
+    let mut builder = fcore::metadata::TableDescriptor::builder()
+        .schema(schema)
+        .partitioned_by(descriptor.partition_keys.clone());
+
+    // Treat a non-positive bucket count as "unspecified".
+    if descriptor.bucket_count > 0 {
+        builder =
+            builder.distributed_by(Some(descriptor.bucket_count), descriptor.bucket_keys.clone());
+    } else {
+        builder = builder.distributed_by(None, descriptor.bucket_keys.clone());
+    }
+
+    for prop in &descriptor.properties {
+        builder = builder.property(&prop.key, &prop.value);
+    }
+
+    if !descriptor.comment.is_empty() {
+        builder = builder.comment(&descriptor.comment);
+    }
+
+    Ok(builder.build()?)
+}
+
+pub fn core_table_info_to_ffi(info: &fcore::metadata::TableInfo) -> ffi::FfiTableInfo {
+    let schema = info.get_schema();
+    let columns: Vec<ffi::FfiColumn> = schema
+        .columns()
+        .iter()
+        .map(|col| ffi::FfiColumn {
+            name: col.name().to_string(),
+            data_type: core_data_type_to_ffi(col.data_type()),
+            comment: col.comment().unwrap_or("").to_string(),
+        })
+        .collect();
+
+    let primary_keys: Vec<String> = schema
+        .primary_key()
+        .map(|pk| pk.column_names().to_vec())
+        .unwrap_or_default();
+
+    let properties: Vec<ffi::HashMapValue> = info
+        .get_properties()
+        .iter()
+        .map(|(k, v)| ffi::HashMapValue {
+            key: k.clone(),
+            value: v.clone(),
+        })
+        .collect();
+
+    ffi::FfiTableInfo {
+        table_id: info.get_table_id(),
+        schema_id: info.get_schema_id(),
+        table_path: ffi::FfiTablePath {
+            database_name: info.get_table_path().database().to_string(),
+            table_name: info.get_table_path().table().to_string(),
+        },
+        created_time: info.get_created_time(),
+        modified_time: info.get_modified_time(),
+        primary_keys: info.get_primary_keys().clone(),
+        bucket_keys: info.get_bucket_keys().to_vec(),
+        partition_keys: info.get_partition_keys().to_vec(),
+        num_buckets: info.get_num_buckets(),
+        has_primary_key: info.has_primary_key(),
+        is_partitioned: info.is_partitioned(),
+        properties,
+        comment: info.get_comment().unwrap_or("").to_string(),
+        schema: ffi::FfiSchema {
+            columns,
+            primary_keys,
+        },
+    }
+}
+
+// Placeholder FfiTableInfo with zeroed/empty fields.
+pub fn empty_table_info() -> ffi::FfiTableInfo {
+    ffi::FfiTableInfo {
+        table_id: 0,
+        schema_id: 0,
+        table_path: ffi::FfiTablePath {
+            database_name: String::new(),
+            table_name: String::new(),
+        },
+        created_time: 0,
+        modified_time: 0,
+        primary_keys: vec![],
+        bucket_keys: vec![],
+        partition_keys: vec![],
+        num_buckets: 0,
+        has_primary_key: false,
+        is_partitioned: false,
+        properties: vec![],
+        comment: String::new(),
+        schema: ffi::FfiSchema {
+            columns: vec![],
+            primary_keys: vec![],
+        },
+    }
+}
+
+// Owns string copies extracted from an FFI row so that the borrowed
+// `Datum::String` values produced by `ffi_row_to_core` remain valid.
+pub struct OwnedRowData {
+    strings: Vec<String>,
+}
+
+impl OwnedRowData {
+    pub fn new() -> Self {
+        Self {
+            strings: Vec::new(),
+        }
+    }
+
+    pub fn collect_strings(&mut self, row: &ffi::FfiGenericRow) {
+        for field in &row.fields {
+            if field.datum_type == DATUM_TYPE_STRING {
+                self.strings.push(field.string_val.to_string());
+            }
+        }
+    }
+
+    pub fn get_strings(&self) -> &[String] {
+        &self.strings
+    }
+}
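+
+// Usage sketch (illustrative only) for `OwnedRowData` above together with
+// `ffi_row_to_core` below. Strings must be copied out of the FFI row first so
+// that the borrowed `Datum::String` values have a stable owner, and the same
+// `FfiGenericRow` must be passed to both calls:
+//
+//     let mut owner = OwnedRowData::new();
+//     owner.collect_strings(&ffi_row);
+//     let row = ffi_row_to_core(&ffi_row, &owner);
+//     // `row` borrows from `owner`; keep `owner` alive while `row` is used.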
+
+pub fn ffi_row_to_core<'a>(
+    row: &ffi::FfiGenericRow,
+    owner: &'a OwnedRowData,
+) -> fcore::row::GenericRow<'a> {
+    use fcore::row::{Blob, Datum, F32, F64};
+
+    let mut generic_row = fcore::row::GenericRow::new();
+    // String datums borrow from `owner`, which must have been filled by
+    // `collect_strings` on this same row (same order, same count).
+    let mut string_idx = 0;
+
+    for (idx, field) in row.fields.iter().enumerate() {
+        let datum = match field.datum_type {
+            DATUM_TYPE_NULL => Datum::Null,
+            DATUM_TYPE_BOOL => Datum::Bool(field.bool_val),
+            DATUM_TYPE_INT32 => Datum::Int32(field.i32_val),
+            DATUM_TYPE_INT64 => Datum::Int64(field.i64_val),
+            DATUM_TYPE_FLOAT32 => Datum::Float32(F32::from(field.f32_val)),
+            DATUM_TYPE_FLOAT64 => Datum::Float64(F64::from(field.f64_val)),
+            DATUM_TYPE_STRING => {
+                let str_ref = owner.get_strings()[string_idx].as_str();
+                string_idx += 1;
+                Datum::String(str_ref)
+            }
+            DATUM_TYPE_BYTES => Datum::Blob(Blob::from(field.bytes_val.clone())),
+            // Unknown datum codes degrade to NULL rather than aborting.
+            _ => Datum::Null,
+        };
+        generic_row.set_field(idx, datum);
+    }
+
+    generic_row
+}
+
+pub fn core_scan_records_to_ffi(records: &fcore::record::ScanRecords) -> ffi::FfiScanRecords {
+    let mut ffi_records = Vec::new();
+
+    // Flatten the per-bucket map into a single record list.
+    for bucket_records in records.records_by_buckets().values() {
+        for record in bucket_records {
+            let row = record.row();
+            let fields = core_row_to_ffi_fields(row);
+
+            ffi_records.push(ffi::FfiScanRecord {
+                offset: record.offset(),
+                timestamp: record.timestamp(),
+                row: ffi::FfiGenericRow { fields },
+            });
+        }
+    }
+
+    ffi::FfiScanRecords {
+        records: ffi_records,
+    }
+}
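+
+// Illustrative consumer of the flattened result above. Field names match the
+// FFI structs in this file; the `scan_records` value itself is hypothetical:
+//
+//     let ffi_records = core_scan_records_to_ffi(&scan_records);
+//     for rec in &ffi_records.records {
+//         println!("offset={} ts={} fields={}",
+//             rec.offset, rec.timestamp, rec.row.fields.len());
+//     }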
+
+fn core_row_to_ffi_fields(row: &fcore::row::ColumnarRow) -> Vec<ffi::FfiDatum> {
+    // All-default datum carrying only its type tag; the matching value field
+    // is filled in below.
+    fn new_datum(datum_type: i32) -> ffi::FfiDatum {
+        ffi::FfiDatum {
+            datum_type,
+            bool_val: false,
+            i32_val: 0,
+            i64_val: 0,
+            f32_val: 0.0,
+            f64_val: 0.0,
+            string_val: String::new(),
+            bytes_val: vec![],
+        }
+    }
+
+    let record_batch = row.get_record_batch();
+    let schema = record_batch.schema();
+    let row_id = row.get_row_id();
+
+    let mut fields = Vec::with_capacity(schema.fields().len());
+
+    for (i, field) in schema.fields().iter().enumerate() {
+        if row.is_null_at(i) {
+            fields.push(new_datum(DATUM_TYPE_NULL));
+            continue;
+        }
+
+        let datum = match field.data_type() {
+            ArrowDataType::Boolean => {
+                let mut datum = new_datum(DATUM_TYPE_BOOL);
+                datum.bool_val = row.get_boolean(i);
+                datum
+            }
+            ArrowDataType::Int8 => {
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = row.get_byte(i) as i32;
+                datum
+            }
+            ArrowDataType::Int16 => {
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = row.get_short(i) as i32;
+                datum
+            }
+            ArrowDataType::Int32 => {
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = row.get_int(i);
+                datum
+            }
+            ArrowDataType::Int64 => {
+                let mut datum = new_datum(DATUM_TYPE_INT64);
+                datum.i64_val = row.get_long(i);
+                datum
+            }
+            ArrowDataType::Float32 => {
+                let mut datum = new_datum(DATUM_TYPE_FLOAT32);
+                datum.f32_val = row.get_float(i);
+                datum
+            }
+            ArrowDataType::Float64 => {
+                let mut datum = new_datum(DATUM_TYPE_FLOAT64);
+                datum.f64_val = row.get_double(i);
+                datum
+            }
+            ArrowDataType::Utf8 => {
+                let mut datum = new_datum(DATUM_TYPE_STRING);
+                datum.string_val = row.get_string(i).to_string();
+                datum
+            }
+            ArrowDataType::LargeUtf8 => {
+                let array = record_batch
+                    .column(i)
+                    .as_any()
+                    .downcast_ref::<LargeStringArray>()
+                    .expect("LargeUtf8 column expected");
+                let mut datum = new_datum(DATUM_TYPE_STRING);
+                datum.string_val = array.value(row_id).to_string();
+                datum
+            }
+            ArrowDataType::Binary => {
+                let mut datum = new_datum(DATUM_TYPE_BYTES);
+                datum.bytes_val = row.get_bytes(i);
+                datum
+            }
+            ArrowDataType::FixedSizeBinary(len) => {
+                let mut datum = new_datum(DATUM_TYPE_BYTES);
+                datum.bytes_val = row.get_binary(i, *len as usize);
+                datum
+            }
+            ArrowDataType::LargeBinary => {
+                let array = record_batch
+                    .column(i)
+                    .as_any()
+                    .downcast_ref::<LargeBinaryArray>()
+                    .expect("LargeBinary column expected");
+                let mut datum = new_datum(DATUM_TYPE_BYTES);
+                datum.bytes_val = array.value(row_id).to_vec();
+                datum
+            }
+            ArrowDataType::Date32 => {
+                let array = record_batch
+                    .column(i)
+                    .as_any()
+                    .downcast_ref::<Date32Array>()
+                    .expect("Date32 column expected");
+                let mut datum = new_datum(DATUM_TYPE_INT32);
+                datum.i32_val = array.value(row_id);
+                datum
+            }
+            // Timestamp values cross the FFI boundary as raw i64 in the
+            // column's native unit.
+            ArrowDataType::Timestamp(unit, _) => match unit {
+                TimeUnit::Second => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<TimestampSecondArray>()
+                        .expect("Timestamp(second) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Millisecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<TimestampMillisecondArray>()
+                        .expect("Timestamp(millisecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Microsecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<TimestampMicrosecondArray>()
+                        .expect("Timestamp(microsecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Nanosecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<TimestampNanosecondArray>()
+                        .expect("Timestamp(nanosecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+            },
+            ArrowDataType::Time32(unit) => match unit {
+                TimeUnit::Second => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<Time32SecondArray>()
+                        .expect("Time32(second) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT32);
+                    datum.i32_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Millisecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<Time32MillisecondArray>()
+                        .expect("Time32(millisecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT32);
+                    datum.i32_val = array.value(row_id);
+                    datum
+                }
+                _ => panic!("Unsupported Time32 unit for column {}", i),
+            },
+            ArrowDataType::Time64(unit) => match unit {
+                TimeUnit::Microsecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<Time64MicrosecondArray>()
+                        .expect("Time64(microsecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+                TimeUnit::Nanosecond => {
+                    let array = record_batch
+                        .column(i)
+                        .as_any()
+                        .downcast_ref::<Time64NanosecondArray>()
+                        .expect("Time64(nanosecond) column expected");
+                    let mut datum = new_datum(DATUM_TYPE_INT64);
+                    datum.i64_val = array.value(row_id);
+                    datum
+                }
+                _ => panic!("Unsupported Time64 unit for column {}", i),
+            },
+            other => panic!("Unsupported Arrow data type for column {}: {:?}", i, other),
+        };
+
+        fields.push(datum);
+    }
+
+    fields
+}
+
+pub fn core_lake_snapshot_to_ffi(snapshot: &fcore::metadata::LakeSnapshot) -> ffi::FfiLakeSnapshot {
+    let bucket_offsets: Vec<ffi::FfiBucketOffset> = snapshot
+        .table_buckets_offset
+        .iter()
+        .map(|(bucket, offset)| ffi::FfiBucketOffset {
+            table_id: bucket.table_id(),
+            partition_id: bucket.partition_id().unwrap_or(-1),
+            bucket_id: bucket.bucket_id(),
+            offset: *offset,
+        })
+        .collect();
+
+    ffi::FfiLakeSnapshot {
+        snapshot_id: snapshot.snapshot_id,
+        bucket_offsets,
+    }
+}
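+
+// Illustrative only: non-partitioned buckets carry no partition id, so the
+// conversion above encodes `None` as the sentinel -1. A caller should test
+// for that sentinel before treating the field as a real partition id:
+//
+//     for b in &ffi_snapshot.bucket_offsets {
+//         let is_partitioned = b.partition_id >= 0;
+//         // ...
+//     }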
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index 0857496..92f600e 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -18,7 +18,7 @@
 use clap::Parser;
 use serde::{Deserialize, Serialize};
 
-#[derive(Parser, Debug, Clone, Deserialize, Serialize, Default)]
+#[derive(Parser, Debug, Clone, Deserialize, Serialize)]
 #[command(author, version, about, long_about = None)]
 pub struct Config {
     #[arg(long)]
@@ -37,3 +37,15 @@ pub struct Config {
     #[arg(long, default_value_t = 2 * 1024 * 1024)]
     pub writer_batch_size: i32,
 }
+
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            bootstrap_server: None,
+            request_max_size: 10 * 1024 * 1024,
+            writer_acks: String::from("all"),
+            writer_retries: i32::MAX,
+            writer_batch_size: 2 * 1024 * 1024,
+        }
+    }
+}