diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index a2b93300..b5bfe4ea 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -57,11 +57,17 @@ set(PAIMON_COMMON_SRCS common/io/data_output_stream.cpp common/io/memory_segment_output_stream.cpp common/io/offset_input_stream.cpp + common/io/cache/cache.cpp + common/io/cache/cache_key.cpp + common/io/cache/cache_manager.cpp common/logging/logging.cpp common/memory/bytes.cpp common/memory/memory_pool.cpp common/memory/memory_segment.cpp common/memory/memory_segment_utils.cpp + common/memory/memory_slice.cpp + common/memory/memory_slice_input.cpp + common/memory/memory_slice_output.cpp common/metrics/metrics_impl.cpp common/options/memory_size.cpp common/options/time_duration.cpp @@ -90,6 +96,14 @@ set(PAIMON_COMMON_SRCS common/reader/reader_utils.cpp common/reader/complete_row_kind_batch_reader.cpp common/reader/data_evolution_file_reader.cpp + common/sst/block_handle.cpp + common/sst/block_footer.cpp + common/sst/block_iterator.cpp + common/sst/block_trailer.cpp + common/sst/block_reader.cpp + common/sst/block_writer.cpp + common/sst/sst_file_reader.cpp + common/sst/sst_file_writer.cpp common/types/data_field.cpp common/types/data_type.cpp common/types/data_type_json_parser.cpp @@ -97,6 +111,8 @@ set(PAIMON_COMMON_SRCS common/types/row_type.cpp common/utils/arrow/mem_utils.cpp common/utils/binary_row_partition_computer.cpp + common/utils/bit_set.cpp + common/utils/bloom_filter.cpp common/utils/bloom_filter64.cpp common/utils/bucket_id_calculator.cpp common/utils/decimal_utils.cpp @@ -367,6 +383,8 @@ if(PAIMON_BUILD_TESTS) common/utils/concurrent_hash_map_test.cpp common/utils/projected_row_test.cpp common/utils/projected_array_test.cpp + common/utils/bit_set_test.cpp + common/utils/bloom_filter_test.cpp common/utils/bloom_filter64_test.cpp common/utils/xxhash_test.cpp common/utils/bucket_id_calculator_test.cpp @@ -414,6 +432,17 @@ if(PAIMON_BUILD_TESTS) test_utils_static ${GTEST_LINK_TOOLCHAIN}) + add_paimon_test(common_sst_file_format_test + SOURCES + common/sst/sst_file_io_test.cpp + STATIC_LINK_LIBS + paimon_shared + test_utils_static + "-Wl,--whole-archive" + paimon_local_file_system_static + "-Wl,--no-whole-archive" + ${GTEST_LINK_TOOLCHAIN}) + add_paimon_test(core_test SOURCES core/append/append_only_writer_test.cpp diff --git a/src/paimon/common/io/cache/cache.cpp b/src/paimon/common/io/cache/cache.cpp new file mode 100644 index 00000000..7e2e3e1c --- /dev/null +++ b/src/paimon/common/io/cache/cache.cpp @@ -0,0 +1,42 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/io/cache/cache.h" + +namespace paimon { +std::shared_ptr NoCache::Get( + const std::shared_ptr& key, + std::function(const std::shared_ptr&)> supplier) { + return supplier(key); +} + +void NoCache::Put(const std::shared_ptr& key, const std::shared_ptr& value) { + // do nothing +} + +void NoCache::Invalidate(const std::shared_ptr& key) { + // do nothing +} + +void NoCache::InvalidateAll() { + // do nothing +} + +std::unordered_map, std::shared_ptr> NoCache::AsMap() { + return {}; +} + +} // namespace paimon diff --git a/src/paimon/common/io/cache/cache.h b/src/paimon/common/io/cache/cache.h new file mode 100644 index 00000000..0176b960 --- /dev/null +++ b/src/paimon/common/io/cache/cache.h @@ -0,0 +1,71 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include +#include + +#include "paimon/common/io/cache/cache_key.h" +#include "paimon/common/memory/memory_segment.h" +#include "paimon/status.h" + +namespace paimon { +class CacheValue; + +class Cache { + public: + virtual ~Cache() = default; + virtual std::shared_ptr Get( + const std::shared_ptr& key, + std::function(const std::shared_ptr&)> supplier) = 0; + + virtual void Put(const std::shared_ptr& key, + const std::shared_ptr& value) = 0; + + virtual void Invalidate(const std::shared_ptr& key) = 0; + + virtual void InvalidateAll() = 0; + + virtual std::unordered_map, std::shared_ptr> AsMap() = 0; +}; + +class NoCache : public Cache { + public: + std::shared_ptr Get( + const std::shared_ptr& key, + std::function(const std::shared_ptr&)> supplier) + override; + void Put(const std::shared_ptr& key, + const std::shared_ptr& value) override; + void Invalidate(const std::shared_ptr& key) override; + void InvalidateAll() override; + std::unordered_map, std::shared_ptr> AsMap() override; +}; + +class CacheValue { + public: + explicit CacheValue(const std::shared_ptr& segment) : segment_(segment) {} + + std::shared_ptr GetSegment() { + return segment_; + } + + private: + std::shared_ptr segment_; +}; +} // namespace paimon diff --git a/src/paimon/common/io/cache/cache_key.cpp b/src/paimon/common/io/cache/cache_key.cpp new file mode 100644 index 00000000..9865de04 --- /dev/null +++ b/src/paimon/common/io/cache/cache_key.cpp @@ -0,0 +1,53 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/io/cache/cache_key.h" + +namespace paimon { + +std::shared_ptr CacheKey::ForPosition(const std::string& file_path, int64_t position, + int32_t length, bool is_index) { + return std::make_shared(file_path, position, length, is_index); +} + +bool PositionCacheKey::IsIndex() { + return is_index_; +} + +int64_t PositionCacheKey::Position() const { + return position_; +} + +int32_t PositionCacheKey::Length() const { + return length_; +} + +bool PositionCacheKey::operator==(const PositionCacheKey& other) const { + return file_path_ == other.file_path_ && position_ == other.position_ && + + length_ == other.length_ && is_index_ == other.is_index_; +} + +size_t PositionCacheKey::HashCode() const { + size_t seed = 0; + seed ^= std::hash{}(file_path_) + HASH_CONSTANT + (seed << 6) + (seed >> 2); + seed ^= std::hash{}(position_) + HASH_CONSTANT + (seed << 6) + (seed >> 2); + seed ^= std::hash{}(length_) + HASH_CONSTANT + (seed << 6) + (seed >> 2); + seed ^= std::hash{}(is_index_) + HASH_CONSTANT + (seed << 6) + (seed >> 2); + return seed; +} + +} // namespace paimon diff --git a/src/paimon/common/io/cache/cache_key.h b/src/paimon/common/io/cache/cache_key.h new file mode 100644 index 00000000..8e98bf7c --- /dev/null +++ b/src/paimon/common/io/cache/cache_key.h @@ -0,0 +1,68 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include +#include + +#include "paimon/status.h" + +namespace paimon { + +class CacheKey { + public: + static std::shared_ptr ForPosition(const std::string& file_path, int64_t position, + int32_t length, bool is_index); + + public: + virtual ~CacheKey() = default; + + virtual bool IsIndex() = 0; +}; + +class PositionCacheKey : public CacheKey { + public: + PositionCacheKey(const std::string& file_path, int64_t position, int32_t length, bool is_index) + : file_path_(file_path), position_(position), length_(length), is_index_(is_index) {} + + bool IsIndex() override; + + int64_t Position() const; + int32_t Length() const; + + bool operator==(const PositionCacheKey& other) const; + size_t HashCode() const; + + private: + static constexpr uint64_t HASH_CONSTANT = 0x9e3779b97f4a7c15ULL; + + const std::string file_path_; + const int64_t position_; + const int32_t length_; + const bool is_index_; +}; +} // namespace paimon + +namespace std { +template <> +struct hash { + size_t operator()(const paimon::PositionCacheKey& key) const { + return key.HashCode(); + } +}; +} // namespace std diff --git a/src/paimon/common/io/cache/cache_manager.cpp b/src/paimon/common/io/cache/cache_manager.cpp new file mode 100644 index 00000000..14342a5d --- /dev/null +++ b/src/paimon/common/io/cache/cache_manager.cpp @@ -0,0 +1,45 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/io/cache/cache_manager.h" + +namespace paimon { + +std::shared_ptr CacheManager::GetPage( + std::shared_ptr& key, + std::function(const std::shared_ptr&)> reader) { + auto& cache = key->IsIndex() ? index_cache_ : data_cache_; + auto supplier = [=](const std::shared_ptr& k) -> std::shared_ptr { + auto ret = reader(k); + if (!ret.ok()) { + return nullptr; + } + auto segment = ret.value(); + auto ptr = std::make_shared(segment); + return std::make_shared(ptr); + }; + return cache->Get(key, supplier)->GetSegment(); +} + +void CacheManager::InvalidPage(std::shared_ptr& key) { + if (key->IsIndex()) { + index_cache_->Invalidate(key); + } else { + data_cache_->Invalidate(key); + } +} + +} // namespace paimon diff --git a/src/paimon/common/io/cache/cache_manager.h b/src/paimon/common/io/cache/cache_manager.h new file mode 100644 index 00000000..5e92f9cb --- /dev/null +++ b/src/paimon/common/io/cache/cache_manager.h @@ -0,0 +1,48 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include +#include + +#include "paimon/common/io/cache/cache.h" +#include "paimon/common/io/cache/cache_key.h" +#include "paimon/common/memory/memory_segment.h" +#include "paimon/result.h" + +namespace paimon { +class CacheManager { + public: + CacheManager() { + // todo implements cache + data_cache_ = std::make_shared(); + index_cache_ = std::make_shared(); + } + + std::shared_ptr GetPage( + std::shared_ptr& key, + std::function(const std::shared_ptr&)> reader); + + void InvalidPage(std::shared_ptr& key); + + private: + std::shared_ptr data_cache_; + std::shared_ptr index_cache_; +}; + +} // namespace paimon diff --git a/src/paimon/common/memory/memory_slice.cpp b/src/paimon/common/memory/memory_slice.cpp new file mode 100644 index 00000000..6747dbd1 --- /dev/null +++ b/src/paimon/common/memory/memory_slice.cpp @@ -0,0 +1,118 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/memory/memory_slice.h" + +#include "paimon/common/memory/memory_slice_input.h" + +namespace paimon { +std::shared_ptr MemorySlice::Wrap(const std::shared_ptr& bytes) { + auto segment = MemorySegment::Wrap(bytes); + auto ptr = std::make_shared(segment); + return std::make_shared(ptr, 0, ptr->Size()); +} + +std::shared_ptr MemorySlice::Wrap(const std::shared_ptr& segment) { + return std::make_shared(segment, 0, segment->Size()); +} + +MemorySlice::MemorySlice(const std::shared_ptr& segment, int32_t offset, + int32_t length) + : segment_(segment), offset_(offset), length_(length) {} + +std::shared_ptr MemorySlice::Slice(int32_t index, int32_t length) { + if (index == 0 && length == length_) { + return shared_from_this(); + } + return std::make_shared(segment_, offset_ + index, length); +} + +int32_t MemorySlice::Length() const { + return length_; +} + +int32_t MemorySlice::Offset() const { + return offset_; +} + +std::shared_ptr MemorySlice::GetHeapMemory() { + return segment_->GetHeapMemory(); +} + +int8_t MemorySlice::ReadByte(int32_t position) { + return segment_->GetValue(offset_ + position); +} + +int32_t MemorySlice::ReadInt(int32_t position) { + return segment_->GetValue(offset_ + position); +} + +int16_t MemorySlice::ReadShort(int32_t position) { + return segment_->GetValue(offset_ + position); +} + +int64_t MemorySlice::ReadLong(int32_t position) { + return segment_->GetValue(offset_ + position); +} + +std::string_view MemorySlice::ReadStringView() { + auto array = segment_->GetArray(); + return {array->data() + offset_, static_cast(length_)}; +} + +std::shared_ptr MemorySlice::CopyBytes(MemoryPool* pool) { + auto bytes = std::make_shared(length_, pool); + auto target = MemorySegment::Wrap(bytes); + segment_->CopyTo(offset_, &target, 0, length_); + return bytes; +} + +bool MemorySlice::operator<(const MemorySlice& other) const { + return Compare(other) < 0; +} +bool MemorySlice::operator>(const MemorySlice& other) const { + return Compare(other) > 0; +} +bool MemorySlice::operator==(const MemorySlice& other) const { + return Compare(other) == 0; +} +bool MemorySlice::operator!=(const MemorySlice& other) const { + return !(*this == other); +} +bool MemorySlice::operator<=(const MemorySlice& other) const { + return Compare(other) <= 0; +} +bool MemorySlice::operator>=(const MemorySlice& other) const { + return Compare(other) >= 0; +} +std::shared_ptr MemorySlice::ToInput() { + auto self = shared_from_this(); + return std::make_shared(self); +} + +int32_t MemorySlice::Compare(const MemorySlice& other) const { + int32_t len = std::min(length_, other.length_); + for (int32_t i = 0; i < len; ++i) { + auto byte1 = static_cast(segment_->Get(offset_ + i)); + auto byte2 = static_cast(other.segment_->Get(other.offset_ + i)); + if (byte1 != byte2) { + return static_cast(byte1) - static_cast(byte2); + } + } + return length_ - other.length_; +} + +} // namespace paimon diff --git a/src/paimon/common/memory/memory_slice.h b/src/paimon/common/memory/memory_slice.h new file mode 100644 index 00000000..26a7198b --- /dev/null +++ b/src/paimon/common/memory/memory_slice.h @@ -0,0 +1,75 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "paimon/common/memory/memory_segment.h" +#include "paimon/memory/bytes.h" +#include "paimon/visibility.h" + +namespace paimon { +class MemoryPool; +class MemorySliceInput; + +/// Slice of a MemorySegment. +class PAIMON_EXPORT MemorySlice : public std::enable_shared_from_this { + public: + static std::shared_ptr Wrap(const std::shared_ptr& bytes); + static std::shared_ptr Wrap(const std::shared_ptr& segment); + + public: + MemorySlice() = default; + + MemorySlice(const std::shared_ptr& segment, int32_t offset, int32_t length); + std::shared_ptr Slice(int32_t index, int32_t length); + + int32_t Length() const; + int32_t Offset() const; + std::shared_ptr GetHeapMemory(); + + int8_t ReadByte(int32_t position); + int32_t ReadInt(int32_t position); + int16_t ReadShort(int32_t position); + int64_t ReadLong(int32_t position); + std::string_view ReadStringView(); + + std::shared_ptr CopyBytes(MemoryPool* pool); + + bool operator<(const MemorySlice& other) const; + bool operator>(const MemorySlice& other) const; + bool operator==(const MemorySlice& other) const; + bool operator!=(const MemorySlice& other) const; + bool operator<=(const MemorySlice& other) const; + bool operator>=(const MemorySlice& other) const; + + std::shared_ptr ToInput(); + + private: + int32_t Compare(const MemorySlice& other) const; + + private: + std::shared_ptr segment_; + int32_t offset_; + int32_t length_; +}; + +} // namespace paimon diff --git a/src/paimon/common/memory/memory_slice_input.cpp b/src/paimon/common/memory/memory_slice_input.cpp new file mode 100644 index 00000000..d78a6172 --- /dev/null +++ b/src/paimon/common/memory/memory_slice_input.cpp @@ -0,0 +1,109 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/memory/memory_slice_input.h" + +#include "paimon/common/utils/math.h" + +namespace paimon { + +MemorySliceInput::MemorySliceInput(const std::shared_ptr& slice) + : slice_(slice), position_(0) {} + +int32_t MemorySliceInput::Position() const { + return position_; +} + +Status MemorySliceInput::SetPosition(int32_t position) { + if (position < 0 || position > slice_->Length()) { + return Status::IndexError("position " + std::to_string(position) + " index out of bounds"); + } + position_ = position; + return Status::OK(); +} + +bool MemorySliceInput::IsReadable() { + return Available() > 0; +} + +int32_t MemorySliceInput::Available() { + return slice_->Length() - position_; +} + +int8_t MemorySliceInput::ReadByte() { + return slice_->ReadByte(position_++); +} + +int8_t MemorySliceInput::ReadUnsignedByte() { + return static_cast(ReadByte() & 0xFF); +} + +int32_t MemorySliceInput::ReadInt() { + int v = slice_->ReadInt(position_); + position_ += 4; + if (NeedSwap()) { + return EndianSwapValue(v); + } + return v; +} + +int64_t MemorySliceInput::ReadLong() { + int64_t v = slice_->ReadLong(position_); + position_ += 8; + if (NeedSwap()) { + return EndianSwapValue(v); + } + return v; +} + +int32_t MemorySliceInput::ReadVarLenInt() { + for (int offset = 0, result = 0; offset < 32; offset += 7) { + int b = ReadUnsignedByte(); + result |= (b & 0x7F) << offset; + if ((b & 0x80) == 0) { + return result; + } + } + throw std::invalid_argument("Malformed integer."); +} + +int64_t MemorySliceInput::ReadVarLenLong() { + int64_t result = 0; + for (int offset = 0; offset < 64; offset += 7) { + int64_t b = ReadUnsignedByte(); + result |= (b & 0x7F) << offset; + if ((b & 0x80) == 0) { + return result; + } + } + throw std::invalid_argument("Malformed long."); +} + +void MemorySliceInput::SetOrder(ByteOrder order) { + byte_order_ = order; +} + +bool MemorySliceInput::NeedSwap() const { + return SystemByteOrder() != byte_order_; +} + +std::shared_ptr MemorySliceInput::ReadSlice(int length) { + auto slice = slice_->Slice(position_, length); + position_ += length; + return slice; +} + +} // namespace paimon diff --git a/src/paimon/common/memory/memory_slice_input.h b/src/paimon/common/memory/memory_slice_input.h new file mode 100644 index 00000000..e98e4155 --- /dev/null +++ b/src/paimon/common/memory/memory_slice_input.h @@ -0,0 +1,67 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paimon/common/memory/memory_slice.h" +#include "paimon/io/byte_order.h" +#include "paimon/status.h" +#include "paimon/visibility.h" + +namespace paimon { +class MemoryPool; + +/// Slice of a MemorySegment. +class PAIMON_EXPORT MemorySliceInput { + public: + MemorySliceInput() = default; + + explicit MemorySliceInput(const std::shared_ptr& slice); + + int32_t Position() const; + Status SetPosition(int32_t position); + + bool IsReadable(); + int32_t Available(); + + int8_t ReadByte(); + int8_t ReadUnsignedByte(); + int32_t ReadInt(); + int64_t ReadLong(); + int32_t ReadVarLenInt(); + int64_t ReadVarLenLong(); + std::shared_ptr ReadSlice(int length); + + void SetOrder(ByteOrder order); + + private: + bool NeedSwap() const; + + private: + std::shared_ptr slice_; + int32_t position_; + + ByteOrder byte_order_ = SystemByteOrder(); +}; + +} // namespace paimon diff --git a/src/paimon/common/memory/memory_slice_output.cpp b/src/paimon/common/memory/memory_slice_output.cpp new file mode 100644 index 00000000..1fefe68d --- /dev/null +++ b/src/paimon/common/memory/memory_slice_output.cpp @@ -0,0 +1,119 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/memory/memory_slice_output.h" + +#include "paimon/common/utils/math.h" +namespace paimon { + +MemorySliceOutput::MemorySliceOutput(int32_t estimated_size, MemoryPool* pool) { + size_ = 0; + pool_ = pool; + segment_ = MemorySegment::Wrap(Bytes::AllocateBytes(estimated_size, pool)); +} + +int32_t MemorySliceOutput::Size() const { + return size_; +} + +void MemorySliceOutput::Reset() { + size_ = 0; +} + +std::unique_ptr MemorySliceOutput::ToSlice() { + auto segment = std::make_shared(segment_); + return std::make_unique(segment, 0, size_); +} + +template +void MemorySliceOutput::WriteValue(T value) { + int32_t write_length = sizeof(T); + EnsureSize(size_ + write_length); + T write_value = value; + if (NeedSwap()) { + write_value = EndianSwapValue(value); + } + segment_.PutValue(size_, write_value); + size_ += write_length; +} + +void MemorySliceOutput::WriteVarLenInt(int32_t value) { + if (value < 0) { + throw std::invalid_argument("negative value: v=" + std::to_string(value)); + } + while ((value & ~0x7F) != 0) { + WriteValue(static_cast((value & 0x7F) | 0x80)); + value >>= 7; + } + WriteValue(static_cast(value)); +} + +void MemorySliceOutput::WriteVarLenLong(int64_t value) { + if (value < 0) { + throw std::invalid_argument("negative value: v=" + std::to_string(value)); + } + while ((value & ~0x7F) != 0) { + WriteValue(static_cast((value & 0x7F) | 0x80)); + value >>= 7; + } + WriteValue(static_cast(value)); +} + +void MemorySliceOutput::WriteBytes(const std::shared_ptr& source) { + WriteBytes(source, 0, source->size()); +} + +void MemorySliceOutput::WriteBytes(const std::shared_ptr& source, int source_index, + int length) { + EnsureSize(size_ + length); + std::string_view sv{source->data(), source->size()}; + segment_.Put(size_, sv, source_index, length); + size_ += length; +} + +void MemorySliceOutput::SetOrder(ByteOrder order) { + byte_order_ = order; +} + +bool MemorySliceOutput::NeedSwap() const { + return SystemByteOrder() != byte_order_; +} + +void MemorySliceOutput::EnsureSize(int size) { + if (size <= segment_.Size()) { + return; + } + int32_t capacity = segment_.Size(); + int min_capacity = segment_.Size() + size; + while (capacity < min_capacity) { + capacity <<= 1; + } + + auto bytes = std::make_shared(capacity, pool_); + MemorySegment new_segment = MemorySegment::Wrap(bytes); + + segment_.CopyTo(0, &new_segment, 0, segment_.Size()); + segment_ = new_segment; +} + +template void MemorySliceOutput::WriteValue(bool); +template void MemorySliceOutput::WriteValue(char); +template void MemorySliceOutput::WriteValue(int8_t); +template void MemorySliceOutput::WriteValue(int16_t); +template void MemorySliceOutput::WriteValue(int32_t); +template void MemorySliceOutput::WriteValue(int64_t); + +} // namespace paimon diff --git a/src/paimon/common/memory/memory_slice_output.h b/src/paimon/common/memory/memory_slice_output.h new file mode 100644 index 00000000..17e1b269 --- /dev/null +++ b/src/paimon/common/memory/memory_slice_output.h @@ -0,0 +1,67 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paimon/common/memory/memory_slice.h" +#include "paimon/io/byte_order.h" +#include "paimon/visibility.h" + +namespace paimon { +class MemoryPool; + +/// Slice of a MemorySegment. +class PAIMON_EXPORT MemorySliceOutput { + public: + MemorySliceOutput() = default; + + MemorySliceOutput(int32_t estimated_size, MemoryPool* pool); + + int32_t Size() const; + void Reset(); + std::unique_ptr ToSlice(); + + template + void WriteValue(T value); + + void WriteVarLenInt(int32_t value); + void WriteVarLenLong(int64_t value); + + void WriteBytes(const std::shared_ptr& source); + void WriteBytes(const std::shared_ptr& source, int source_index, int length); + + void SetOrder(ByteOrder order); + + private: + void EnsureSize(int bytes); + bool NeedSwap() const; + + private: + MemoryPool* pool_; + MemorySegment segment_; + int32_t size_; + + ByteOrder byte_order_ = SystemByteOrder(); +}; + +} // namespace paimon diff --git a/src/paimon/common/sst/block_aligned_type.h b/src/paimon/common/sst/block_aligned_type.h new file mode 100644 index 00000000..d18fe30a --- /dev/null +++ b/src/paimon/common/sst/block_aligned_type.h @@ -0,0 +1,44 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/visibility.h" + +namespace paimon { + +/// Enumeration for stream seek origin positions. +enum class PAIMON_EXPORT BlockAlignedType { ALIGNED = 0, UNALIGNED = 1 }; + +inline Result From(int8_t v) { + if (v == 0) { + return BlockAlignedType::ALIGNED; + } else if (v == 1) { + return BlockAlignedType::UNALIGNED; + } else { + return Status::Invalid("Invalid block aligned type: " + std::to_string(v)); + } +} + +} // namespace paimon diff --git a/src/paimon/common/sst/block_cache.h b/src/paimon/common/sst/block_cache.h new file mode 100644 index 00000000..e0d20961 --- /dev/null +++ b/src/paimon/common/sst/block_cache.h @@ -0,0 +1,72 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/common/io/cache/cache_manager.h" +#include "paimon/common/memory/memory_segment.h" +#include "paimon/fs/file_system.h" +#include "paimon/reader/batch_reader.h" +#include "paimon/result.h" + +namespace paimon { + +class BlockCache { + public: + BlockCache(std::string& file_path, const std::shared_ptr& in, + const std::shared_ptr& pool, + std::unique_ptr&& cache_manager) + : file_path_(file_path), in_(in), pool_(pool), cache_manager_(std::move(cache_manager)) {} + + ~BlockCache() = default; + + std::shared_ptr GetBlock(int64_t position, int32_t length, bool is_index) { + auto key = CacheKey::ForPosition(file_path_, position, length, is_index); + + auto it = blocks_.find(key); + if (it == blocks_.end()) { + auto segment = cache_manager_->GetPage( + key, [&](const std::shared_ptr&) -> Result { + return ReadFrom(position, length); + }); + if (!segment.get()) { + blocks_.insert({key, std::make_shared(segment)}); + } + return segment; + } + return it->second->GetSegment(); + } + + private: + Result ReadFrom(int64_t offset, int length) { + PAIMON_RETURN_NOT_OK(in_->Seek(offset, SeekOrigin::FS_SEEK_SET)); + auto segment = MemorySegment::AllocateHeapMemory(length, pool_.get()); + PAIMON_RETURN_NOT_OK(in_->Read(segment.GetHeapMemory()->data(), length)); + return segment; + } + + private: + std::string file_path_; + std::shared_ptr in_; + std::shared_ptr pool_; + + std::unique_ptr cache_manager_; + std::unordered_map, std::shared_ptr> blocks_; +}; +} // namespace paimon diff --git a/src/paimon/common/sst/block_entry.h b/src/paimon/common/sst/block_entry.h new file mode 100644 index 00000000..a5fdc26a --- /dev/null +++ b/src/paimon/common/sst/block_entry.h @@ -0,0 +1,36 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/common/memory/memory_slice.h" +#include "paimon/result.h" + +namespace paimon { + +struct BlockEntry { + public: + BlockEntry(std::shared_ptr& key, std::shared_ptr& value) + : key_(key), value_(value) {} + + ~BlockEntry() = default; + + std::shared_ptr key_; + std::shared_ptr value_; +}; +} // namespace paimon diff --git a/src/paimon/common/sst/block_footer.cpp b/src/paimon/common/sst/block_footer.cpp new file mode 100644 index 00000000..d7087c60 --- /dev/null +++ b/src/paimon/common/sst/block_footer.cpp @@ -0,0 +1,62 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/sst/block_footer.h" + +#include "paimon/common/memory/memory_slice_output.h" + +namespace paimon { + +std::unique_ptr BlockFooter::ReadBlockFooter( + std::shared_ptr& input) { + auto offset = input->ReadLong(); + auto size = input->ReadInt(); + auto expected_entries = input->ReadLong(); + std::shared_ptr bloom_filter_handle = nullptr; + if (offset || size || expected_entries) { + bloom_filter_handle = std::make_shared(offset, size, expected_entries); + } + auto index_offset = input->ReadLong(); + auto index_size = input->ReadInt(); + auto index_block_handle = std::make_shared(index_offset, index_size); + + auto magic = input->ReadInt(); + if (magic != MAGIC_NUMBER) { + return nullptr; + } + return std::make_unique(index_block_handle, bloom_filter_handle); +} + +std::shared_ptr BlockFooter::WriteBlockFooter(MemoryPool* pool) { + auto output = std::make_shared(ENCODED_LENGTH, pool); + // 20 bytes + if (!bloom_filter_handle_.get()) { + output->WriteValue(static_cast(0)); + output->WriteValue(static_cast(0)); + output->WriteValue(static_cast(0)); + } else { + output->WriteValue(bloom_filter_handle_->Offset()); + output->WriteValue(bloom_filter_handle_->Size()); + output->WriteValue(bloom_filter_handle_->ExpectedEntries()); + } + // 12 bytes + output->WriteValue(index_block_handle_->Offset()); + output->WriteValue(index_block_handle_->Size()); + // 4 bytes + output->WriteValue(MAGIC_NUMBER); + return output->ToSlice(); +} +} // namespace paimon diff --git a/src/paimon/common/sst/block_footer.h b/src/paimon/common/sst/block_footer.h new file mode 100644 index 00000000..13d3b93b --- /dev/null +++ b/src/paimon/common/sst/block_footer.h @@ -0,0 +1,60 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/common/memory/memory_slice_input.h" +#include "paimon/common/sst/block_handle.h" +#include "paimon/common/sst/bloom_filter_handle.h" +#include "paimon/core/key_value.h" +#include "paimon/reader/batch_reader.h" +#include "paimon/result.h" + +namespace paimon { + +/// Footer of a block. +class BlockFooter { + public: + static std::unique_ptr ReadBlockFooter(std::shared_ptr& input); + + public: + BlockFooter(const std::shared_ptr& index_block_handle, + const std::shared_ptr& bloom_filter_handle) + : index_block_handle_(index_block_handle), bloom_filter_handle_(bloom_filter_handle) {} + + ~BlockFooter() = default; + + std::shared_ptr GetIndexBlockHandle() const { + return index_block_handle_; + } + std::shared_ptr GetBloomFilterHandle() const { + return bloom_filter_handle_; + } + + std::shared_ptr WriteBlockFooter(MemoryPool* pool); + + public: + // 20 bytes for bloom filter, 12 bytes for index block handle, 4 bytes for magic number + static constexpr int32_t ENCODED_LENGTH = 36; + static constexpr int32_t MAGIC_NUMBER = 1481571681; + + private: + std::shared_ptr index_block_handle_; + std::shared_ptr bloom_filter_handle_; +}; +} // namespace paimon diff --git a/src/paimon/common/sst/block_handle.cpp b/src/paimon/common/sst/block_handle.cpp new file mode 100644 index 00000000..73a9a78b --- /dev/null +++ b/src/paimon/common/sst/block_handle.cpp @@ -0,0 +1,55 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/sst/block_handle.h" + +#include "paimon/common/memory/memory_slice_output.h" + +namespace paimon { + +std::shared_ptr BlockHandle::ReadBlockHandle( + std::shared_ptr& input) { + int64_t offset = input->ReadVarLenLong(); + int size = input->ReadVarLenInt(); + return std::make_shared(offset, size); +} + +BlockHandle::BlockHandle(int64_t offset, int32_t size) : offset_(offset), size_(size) {} + +int64_t BlockHandle::Offset() const { + return offset_; +} + +int32_t BlockHandle::Size() const { + return size_; +} + +int32_t BlockHandle::GetFullBlockSize() const { + return size_ + MAX_ENCODED_LENGTH; +} + +std::string BlockHandle::ToString() const { + return "BlockHandle{offset=" + std::to_string(offset_) + ", size=" + std::to_string(size_) + + "}"; +} + +std::shared_ptr BlockHandle::WriteBlockHandle(MemoryPool* pool) { + auto output = std::make_shared(MAX_ENCODED_LENGTH, pool); + output->WriteVarLenLong(offset_); + output->WriteVarLenInt(size_); + return output->ToSlice(); +} +} // namespace paimon diff --git a/src/paimon/common/sst/block_handle.h b/src/paimon/common/sst/block_handle.h new file mode 100644 index 00000000..3e9e6eaf --- /dev/null +++ b/src/paimon/common/sst/block_handle.h @@ -0,0 +1,51 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/common/memory/memory_segment.h" +#include "paimon/common/memory/memory_slice_input.h" +#include "paimon/memory/bytes.h" +#include "paimon/result.h" + +namespace paimon { + +class BlockHandle { + public: + static std::shared_ptr ReadBlockHandle(std::shared_ptr& input); + + public: + BlockHandle(int64_t offset, int32_t size); + ~BlockHandle() = default; + + int64_t Offset() const; + int32_t Size() const; + int32_t GetFullBlockSize() const; + + std::string ToString() const; + std::shared_ptr WriteBlockHandle(MemoryPool* pool); + + public: + // max len for varlong is 9 bytes, max len for varint is 5 bytes + static constexpr int32_t MAX_ENCODED_LENGTH = 9 + 5; + + private: + int64_t offset_; + int32_t size_; +}; +} // namespace paimon diff --git a/src/paimon/common/sst/block_iterator.cpp b/src/paimon/common/sst/block_iterator.cpp new file mode 100644 index 00000000..6fb471dd --- /dev/null +++ b/src/paimon/common/sst/block_iterator.cpp @@ -0,0 +1,77 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/sst/block_iterator.h" + +#include "paimon/common/sst/block_reader.h" + +namespace paimon { +BlockIterator::BlockIterator(std::shared_ptr& reader) : reader_(reader) { + input_ = reader->BlockInput(); +} + +bool BlockIterator::HasNext() const { + return polled_.get() || input_->IsReadable(); +} + +Result> BlockIterator::Next() { + if (!HasNext()) { + return Status::Invalid("no such element"); + } + if (polled_.get()) { + return std::move(polled_); + } + return ReadEntry(); +} + +std::unique_ptr BlockIterator::ReadEntry() { + int key_length = input_->ReadVarLenInt(); + auto key = input_->ReadSlice(key_length); + int value_length = input_->ReadVarLenInt(); + auto value = input_->ReadSlice(value_length); + return std::make_unique(key, value); +} + +bool BlockIterator::SeekTo(std::shared_ptr target_key) { + int left = 0; + int right = reader_->RecordCount() - 1; + + while (left <= right) { + int mid = left + (right - left) / 2; + + auto status = input_->SetPosition(reader_->SeekTo(mid)); + if (!status.ok()) { + return false; + } + auto mid_entry = ReadEntry(); + int compare = reader_->Comparator()(mid_entry->key_, target_key); + + if (compare == 0) { + polled_ = std::move(mid_entry); + return true; + } else if (compare > 0) { + polled_ = std::move(mid_entry); + right = mid - 1; + } else { + polled_.reset(); + left = mid + 1; + } + } + + return false; +} + +} // namespace paimon diff --git a/src/paimon/common/sst/block_iterator.h b/src/paimon/common/sst/block_iterator.h new file mode 100644 index 00000000..284857c5 --- /dev/null +++ b/src/paimon/common/sst/block_iterator.h @@ -0,0 +1,47 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/common/memory/memory_slice_input.h" +#include "paimon/common/sst/block_entry.h" + +namespace paimon { +class BlockReader; + +class BlockIterator { + public: + explicit BlockIterator(std::shared_ptr& reader); + + ~BlockIterator() = default; + + bool HasNext() const; + + Result> Next(); + + std::unique_ptr ReadEntry(); + + bool SeekTo(std::shared_ptr target_key); + + private: + std::shared_ptr input_; + std::unique_ptr polled_; + std::shared_ptr reader_; +}; + +} // namespace paimon diff --git a/src/paimon/common/sst/block_reader.cpp b/src/paimon/common/sst/block_reader.cpp new file mode 100644 index 00000000..2bfaa8b1 --- /dev/null +++ b/src/paimon/common/sst/block_reader.cpp @@ -0,0 +1,64 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/sst/block_reader.h" + +#include "paimon/common/sst/block_trailer.h" + +namespace paimon { + +std::shared_ptr BlockReader::Create( + std::shared_ptr block, + std::function&, + const std::shared_ptr&)>& comparator) { + auto ret = From(block->ReadByte(block->Length() - 1)); + if (!ret.ok()) { + return nullptr; + } + BlockAlignedType type = ret.value(); + const auto trailer_len = BlockTrailer::ENCODED_LENGTH; + int size = block->ReadInt(block->Length() - trailer_len); + if (type == BlockAlignedType::ALIGNED) { + auto data = block->Slice(0, block->Length() - trailer_len); + return std::make_shared(data, size, comparator); + } else { + int index_length = size * 4; + int index_offset = block->Length() - trailer_len - index_length; + auto data = block->Slice(0, index_offset); + auto index = block->Slice(index_offset, index_length); + return std::make_shared(data, index, comparator); + } +} + +std::unique_ptr BlockReader::Iterator() { + std::shared_ptr ptr = shared_from_this(); + return std::make_unique(ptr); +} + +std::shared_ptr BlockReader::BlockInput() { + return block_->ToInput(); +} + +int32_t BlockReader::RecordCount() const { + return record_count_; +} + +std::function&, const std::shared_ptr&)>& +BlockReader::Comparator() { + return comparator_; +} + +} // namespace paimon diff --git a/src/paimon/common/sst/block_reader.h b/src/paimon/common/sst/block_reader.h new file mode 100644 index 00000000..990b2080 --- /dev/null +++ b/src/paimon/common/sst/block_reader.h @@ -0,0 +1,98 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/common/io/memory_segment_output_stream.h" +#include "paimon/common/memory/memory_slice_input.h" +#include "paimon/common/sst/block_aligned_type.h" +#include "paimon/common/sst/block_iterator.h" +#include "paimon/memory/bytes.h" +#include "paimon/reader/batch_reader.h" +#include "paimon/result.h" + +namespace paimon { +class BlockIterator; + +/// Reader for a block. +class BlockReader : public std::enable_shared_from_this { + public: + static std::shared_ptr Create( + std::shared_ptr block, + std::function&, + const std::shared_ptr&)>& comparator); + + virtual ~BlockReader() = default; + + std::unique_ptr Iterator(); + virtual int32_t SeekTo(int32_t record_position) = 0; + + std::shared_ptr BlockInput(); + + int32_t RecordCount() const; + + std::function&, + const std::shared_ptr&)>& + Comparator(); + + protected: + BlockReader(std::shared_ptr& block, int32_t record_count, + std::function&, + const std::shared_ptr&)>& comparator) + : block_(block), comparator_(comparator), record_count_(record_count) {} + + private: + std::shared_ptr block_; + std::function&, + const std::shared_ptr&)>& comparator_; + int32_t record_count_; +}; + +class AlignedBlockReader : public BlockReader { + public: + AlignedBlockReader(std::shared_ptr& block, int32_t record_size, + std::function&, + const std::shared_ptr&)>& comparator) + : BlockReader(block, block->Length() / record_size, comparator), + record_size_(record_size) {} + + int32_t SeekTo(int32_t record_position) override { + return record_size_ * record_position; + } + + private: + int32_t record_size_; +}; + +class UnAlignedBlockReader : public BlockReader { + public: + UnAlignedBlockReader(std::shared_ptr& data, std::shared_ptr& index, + std::function&, + const std::shared_ptr&)>& comparator) + : BlockReader(data, index->Length() / 4, comparator), index_(index) {} + + int32_t SeekTo(int32_t record_position) override { + return index_->ReadInt(record_position * 4); + } + + private: + std::shared_ptr index_; +}; + +} // namespace paimon diff --git a/src/paimon/common/sst/block_trailer.cpp b/src/paimon/common/sst/block_trailer.cpp new file mode 100644 index 00000000..abc08609 --- /dev/null +++ b/src/paimon/common/sst/block_trailer.cpp @@ -0,0 +1,51 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/sst/block_trailer.h" + +#include "paimon/common/memory/memory_slice_output.h" + +namespace paimon { + +std::unique_ptr BlockTrailer::ReadBlockTrailer( + std::shared_ptr& input) { + auto compress = input->ReadUnsignedByte(); + auto crc32c = input->ReadInt(); + return std::make_unique(compress, crc32c); +} + +int32_t BlockTrailer::Crc32c() const { + return crc32c_; +} + +int8_t BlockTrailer::CompressionType() const { + return compression_type_; +} + +std::string BlockTrailer::ToString() const { + std::stringstream sstream; + sstream << std::hex << crc32c_; + return "BlockTrailer{compression_type=" + std::to_string(compression_type_) + ", crc32c_=0x" + + sstream.str() + "}"; +} + +std::shared_ptr BlockTrailer::WriteBlockTrailer(MemoryPool* pool) { + auto output = std::make_shared(ENCODED_LENGTH, pool); + output->WriteValue(compression_type_); + output->WriteValue(crc32c_); + return output->ToSlice(); +} +} // namespace paimon diff --git a/src/paimon/common/sst/block_trailer.h b/src/paimon/common/sst/block_trailer.h new file mode 100644 index 00000000..0444fbad --- /dev/null +++ b/src/paimon/common/sst/block_trailer.h @@ -0,0 +1,52 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/common/memory/memory_slice_input.h" +#include "paimon/core/key_value.h" +#include "paimon/reader/batch_reader.h" +#include "paimon/result.h" + +namespace paimon { + +/// Trailer of a block. +class BlockTrailer { + public: + static std::unique_ptr ReadBlockTrailer(std::shared_ptr& input); + + public: + BlockTrailer(int8_t compression_type, int32_t crc32c) + : crc32c_(crc32c), compression_type_(compression_type) {} + + ~BlockTrailer() = default; + + int32_t Crc32c() const; + int8_t CompressionType() const; + + std::string ToString() const; + std::shared_ptr WriteBlockTrailer(MemoryPool* pool); + + public: + static constexpr int32_t ENCODED_LENGTH = 5; + + private: + int32_t crc32c_; + int8_t compression_type_; +}; +} // namespace paimon diff --git a/src/paimon/common/sst/block_writer.cpp b/src/paimon/common/sst/block_writer.cpp new file mode 100644 index 00000000..b98cc9e0 --- /dev/null +++ b/src/paimon/common/sst/block_writer.cpp @@ -0,0 +1,67 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/sst/block_writer.h" + +#include "paimon/common/sst/block_aligned_type.h" + +namespace paimon { + +void BlockWriter::Write(std::shared_ptr& key, std::shared_ptr& value) { + int start_position = block_->Size(); + block_->WriteVarLenInt(key->size()); + block_->WriteBytes(key); + block_->WriteVarLenInt(value->size()); + block_->WriteBytes(value); + int end_position = block_->Size(); + positions_.push_back(start_position); + if (aligned_) { + int current_size = end_position - start_position; + if (aligned_size_ == 0) { + aligned_size_ = current_size; + } else { + aligned_ = aligned_size_ == current_size; + } + } +} + +void BlockWriter::Reset() { + positions_.clear(); + block_ = std::make_shared(size_, pool_.get()); + aligned_size_ = 0; + aligned_ = true; +} + +Result> BlockWriter::Finish() { + if (positions_.size() == 0) { + // Do not use alignment mode, as it is impossible to calculate how many records are + // inside when reading + aligned_ = false; + } + if (aligned_) { + block_->WriteValue(aligned_size_); + } else { + for (auto& position : positions_) { + block_->WriteValue(position); + } + block_->WriteValue(static_cast(positions_.size())); + } + block_->WriteValue(aligned_ ? static_cast(BlockAlignedType::ALIGNED) + : static_cast(BlockAlignedType::UNALIGNED)); + return block_->ToSlice(); +} + +} // namespace paimon diff --git a/src/paimon/common/sst/block_writer.h b/src/paimon/common/sst/block_writer.h new file mode 100644 index 00000000..ce76d944 --- /dev/null +++ b/src/paimon/common/sst/block_writer.h @@ -0,0 +1,90 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/common/memory/memory_slice_output.h" +#include "paimon/memory/bytes.h" +#include "paimon/reader/batch_reader.h" +#include "paimon/result.h" + +namespace paimon { + +/// +/// Writer to build a Block. A block is designed for storing and random-accessing k-v pairs. The +/// layout is as below: +/// +///
+///     +---------------+
+///     | Block Trailer |
+///     +------------------------------------------------+
+///     |       Block CRC32C      |     Compression      |
+///     +------------------------------------------------+
+///     +---------------+
+///     |  Block Data   |
+///     +---------------+--------------------------------+----+
+///     | key len | key bytes | value len | value bytes  |    |
+///     +------------------------------------------------+    |
+///     | key len | key bytes | value len | value bytes  |    +-> Key-Value pairs
+///     +------------------------------------------------+    |
+///     |                  ... ...                       |    |
+///     +------------------------------------------------+----+
+///     | entry pos | entry pos |     ...    | entry pos |    +-> optional, for unaligned block
+///     +------------------------------------------------+----+
+///     |   entry num  /  entry size   |   aligned type  |
+///     +------------------------------------------------+
+/// 
+/// +class BlockWriter { + public: + BlockWriter(int32_t size, const std::shared_ptr& pool, bool aligned = true) + : size_(size), pool_(pool), aligned_(aligned) { + block_ = std::make_shared(size, pool_.get()); + aligned_size_ = 0; + } + + ~BlockWriter() = default; + + void Write(std::shared_ptr& key, std::shared_ptr& value); + + void Reset(); + + int32_t Size() const { + return positions_.size(); + } + + int32_t Memory() const { + int memory = block_->Size() + 5; + if (!aligned_) { + memory += positions_.size() * 4; + } + return memory; + } + + Result> Finish(); + + private: + const int32_t size_; + const std::shared_ptr pool_; + + std::vector positions_; + std::shared_ptr block_; + bool aligned_; + int32_t aligned_size_; +}; +} // namespace paimon diff --git a/src/paimon/common/sst/bloom_filter_handle.h b/src/paimon/common/sst/bloom_filter_handle.h new file mode 100644 index 00000000..784f30e5 --- /dev/null +++ b/src/paimon/common/sst/bloom_filter_handle.h @@ -0,0 +1,55 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/common/memory/memory_segment.h" +#include "paimon/memory/bytes.h" +#include "paimon/result.h" + +namespace paimon { + +class BloomFilterHandle { + public: + BloomFilterHandle(int64_t offset, int32_t size, int64_t expected_entries) + : offset_(offset), size_(size), expected_entries_(expected_entries) {} + + BloomFilterHandle() = default; + ~BloomFilterHandle() = default; + + int64_t Offset() const { + return offset_; + } + + int32_t Size() const { + return size_; + } + + int64_t ExpectedEntries() const { + return expected_entries_; + } + + public: + static constexpr int32_t MAX_ENCODED_LENGTH = 9 + 5; + + private: + int64_t offset_; + int32_t size_; + int64_t expected_entries_; +}; +} // namespace paimon diff --git a/src/paimon/common/sst/sst_file_io_test.cpp b/src/paimon/common/sst/sst_file_io_test.cpp new file mode 100644 index 00000000..3388cb66 --- /dev/null +++ b/src/paimon/common/sst/sst_file_io_test.cpp @@ -0,0 +1,199 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/array/array_base.h" +#include "arrow/array/builder_binary.h" +#include "arrow/array/builder_nested.h" +#include "arrow/array/builder_primitive.h" +#include "arrow/ipc/json_simple.h" +#include "gtest/gtest.h" +#include "paimon/common/sst/sst_file_reader.h" +#include "paimon/common/sst/sst_file_writer.h" +#include "paimon/defs.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/predicate/literal.h" +#include "paimon/predicate/predicate_builder.h" +#include "paimon/status.h" +#include "paimon/testing/mock/mock_file_batch_reader.h" +#include "paimon/testing/utils/read_result_collector.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon { +class Predicate; +} // namespace paimon + +namespace paimon::test { +class SstFileIOTest : public ::testing::Test { + public: + void SetUp() override { + dir_ = paimon::test::UniqueTestDirectory::Create(); + fs_ = dir_->GetFileSystem(); + index_path_ = dir_->Str() + "/sst_file_test.data"; + pool_ = GetDefaultPool(); + comparator_ = [](const std::shared_ptr& a, + const std::shared_ptr& b) -> int32_t { + std::string_view va = a->ReadStringView(); + std::string_view vb = b->ReadStringView(); + if (va == vb) { + return 0; + } + return va > vb ? 1 : -1; + }; + } + + void TearDown() override { + ASSERT_OK(fs_->Delete(dir_->Str())); + } + + protected: + std::unique_ptr dir_; + std::shared_ptr fs_; + std::string index_path_; + std::shared_ptr pool_; + + std::function&, const std::shared_ptr&)> + comparator_; +}; + +TEST_F(SstFileIOTest, TestSimple) { + // write content + ASSERT_OK_AND_ASSIGN(std::shared_ptr out, + fs_->Create(index_path_, /*overwrite=*/false)); + + // write data + auto bf = BloomFilter::Create(30, 0.01); + auto seg_for_bf = MemorySegment::AllocateHeapMemory(bf->ByteLength(), pool_.get()); + auto seg_ptr = std::make_shared(seg_for_bf); + ASSERT_OK(bf->SetMemorySegment(seg_ptr)); + auto writer = std::make_shared(out, pool_, bf, 50); + std::set value_hash; + for (size_t i = 1; i <= 5; i++) { + std::string key = "k" + std::to_string(i); + std::string value = std::to_string(i); + ASSERT_OK(writer->Write(std::make_shared(key, pool_.get()), + std::make_shared(value, pool_.get()))); + auto bytes = std::make_shared(key, pool_.get()); + value_hash.insert(MurmurHashUtils::HashBytes(bytes)); + } + for (size_t i = 10; i <= 20; i++) { + std::string key = "k9" + std::to_string(i); + std::string value = "looooooooooong-值-" + std::to_string(i); + ASSERT_OK(writer->Write(std::make_shared(key, pool_.get()), + std::make_shared(value, pool_.get()))); + auto bytes = std::make_shared(key, pool_.get()); + value_hash.insert(MurmurHashUtils::HashBytes(bytes)); + } + ASSERT_OK(writer->Flush()); + + ASSERT_EQ(6, writer->IndexWriter()->Size()); + + auto bloom_filter_handle_ret = writer->WriteBloomFilter(); + ASSERT_OK(bloom_filter_handle_ret); + auto index_block_handle_ret = writer->WriteIndexBlock(); + ASSERT_OK(index_block_handle_ret); + auto index_block_handle = index_block_handle_ret.value(); + auto bloom_filter_handle = bloom_filter_handle_ret.value(); + ASSERT_OK(writer->WriteFooter(index_block_handle, bloom_filter_handle)); + + ASSERT_OK(out->Flush()); + ASSERT_OK(out->Close()); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr in, fs_->Open(index_path_)); + auto block_cache = + std::make_shared(index_path_, in, pool_, std::make_unique()); + + // bloom filter test + auto entries = bloom_filter_handle->ExpectedEntries(); + auto offset = bloom_filter_handle->Offset(); + auto size = bloom_filter_handle->Size(); + ASSERT_OK(in->Seek(offset, SeekOrigin::FS_SEEK_SET)); + auto bloom_filer_bytes = Bytes::AllocateBytes(size, pool_.get()); + ASSERT_OK(in->Read(bloom_filer_bytes->data(), bloom_filer_bytes->size())); + auto seg = MemorySegment::Wrap(std::move(bloom_filer_bytes)); + auto ptr = std::make_shared(seg); + auto bloom_filter = std::make_shared(entries, size); + ASSERT_OK(bloom_filter->SetMemorySegment(ptr)); + for (const auto& value : value_hash) { + ASSERT_TRUE(bloom_filter->TestHash(value)); + } + + // test read + auto reader_ret = SstFileReader::Create(pool_, block_cache, in->Length().value(), comparator_); + ASSERT_OK(reader_ret); + auto reader = reader_ret.value(); + // not exist key + std::string k0 = "k0"; + ASSERT_EQ(nullptr, reader->Lookup(std::make_shared(k0, pool_.get()))); + + // k4 + std::string k4 = "k4"; + auto v4 = reader->Lookup(std::make_shared(k4, pool_.get())); + ASSERT_TRUE(v4); + std::string string4{v4->data(), v4->size()}; + ASSERT_EQ("4", string4); + + // not exist key + std::string k55 = "k55"; + ASSERT_EQ(nullptr, reader->Lookup(std::make_shared(k55, pool_.get()))); + + // k915 + std::string k915 = "k915"; + auto v15 = reader->Lookup(std::make_shared(k915, pool_.get())); + ASSERT_TRUE(v15); + std::string string15{v15->data(), v15->size()}; + ASSERT_EQ("looooooooooong-值-15", string15); +} + +TEST_F(SstFileIOTest, TestJavaCompatitable) { + // key range [1_000_000, 2_000_000], value is equal to the key + std::string file = GetDataDir() + "/sst/none/79d01717-8380-4504-86e1-387e6c058d0a"; + ASSERT_OK_AND_ASSIGN(std::shared_ptr in, fs_->Open(file)); + auto block_cache = + std::make_shared(index_path_, in, pool_, std::make_unique()); + + // test read + auto reader_ret = SstFileReader::Create(pool_, block_cache, in->Length().value(), comparator_); + ASSERT_OK(reader_ret); + auto reader = reader_ret.value(); + // not exist key + std::string k0 = "10000"; + ASSERT_EQ(nullptr, reader->Lookup(std::make_shared(k0, pool_.get()))); + + // k1314520 + std::string k1314520 = "1314520"; + auto v1314520 = reader->Lookup(std::make_shared(k1314520, pool_.get())); + ASSERT_TRUE(v1314520); + std::string string1314520{v1314520->data(), v1314520->size()}; + ASSERT_EQ("1314520", string1314520); + + // not exist key + std::string k13145200 = "13145200"; + ASSERT_EQ(nullptr, reader->Lookup(std::make_shared(k13145200, pool_.get()))); + + std::string k1314521 = "1314521"; + auto v1314521 = reader->Lookup(std::make_shared(k1314521, pool_.get())); + ASSERT_TRUE(v1314521); + std::string string1314521{v1314521->data(), v1314521->size()}; + ASSERT_EQ("1314521", string1314521); +} + +} // namespace paimon::test diff --git a/src/paimon/common/sst/sst_file_reader.cpp b/src/paimon/common/sst/sst_file_reader.cpp new file mode 100644 index 00000000..3015bbc7 --- /dev/null +++ b/src/paimon/common/sst/sst_file_reader.cpp @@ -0,0 +1,146 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/common/sst/sst_file_reader.h" + +#include "paimon/common/utils/murmurhash_utils.h" + +namespace paimon { + +Result> SstFileReader::Create( + const std::shared_ptr& pool, const std::shared_ptr& block_cache, + int64_t file_len, + std::function&, const std::shared_ptr&)> + comparator) { + // read footer + auto segment = block_cache->GetBlock(file_len - BlockFooter::ENCODED_LENGTH, + BlockFooter::ENCODED_LENGTH, true); + if (!segment.get()) { + return Status::Invalid("Read footer error"); + } + auto slice = MemorySlice::Wrap(segment); + auto input = slice->ToInput(); + auto footer = BlockFooter::ReadBlockFooter(input); + if (!footer.get()) { + return Status::Invalid("Read footer error"); + } + + // read bloom filter directly now + auto bloom_filter_handle = footer->GetBloomFilterHandle(); + std::shared_ptr bloom_filter = nullptr; + if (bloom_filter_handle->ExpectedEntries() || bloom_filter_handle->Size() || + bloom_filter_handle->Offset()) { + bloom_filter = std::make_shared(bloom_filter_handle->ExpectedEntries(), + bloom_filter_handle->Size()); + PAIMON_RETURN_NOT_OK(bloom_filter->SetMemorySegment(block_cache->GetBlock( + bloom_filter_handle->Offset(), bloom_filter_handle->Size(), true))); + } + + return std::make_shared(pool, block_cache, footer->GetIndexBlockHandle(), + bloom_filter, comparator); +} + +SstFileReader::SstFileReader( + const std::shared_ptr& pool, const std::shared_ptr& block_cache, + const std::shared_ptr& index_block_handle, + const std::shared_ptr& bloom_filter, + std::function&, const std::shared_ptr&)> + comparator) + : pool_(pool), block_cache_(block_cache), bloom_filter_(bloom_filter), comparator_(comparator) { + index_block_reader_ = ReadBlock(index_block_handle, true); +} + +std::unique_ptr SstFileReader::CreateIterator() { + return std::make_unique(this, index_block_reader_->Iterator()); +} + +std::shared_ptr SstFileReader::Lookup(std::shared_ptr key) { + if (bloom_filter_.get() && !bloom_filter_->TestHash(MurmurHashUtils::HashBytes(key))) { + return nullptr; + } + auto key_slice = MemorySlice::Wrap(key); + // seek the index to the block containing the key + auto index_block_iterator = index_block_reader_->Iterator(); + index_block_iterator->SeekTo(key_slice); + // if indexIterator does not have a next, it means the key does not exist in this iterator + if (index_block_iterator->HasNext()) { + // seek the current iterator to the key + auto current = GetNextBlock(index_block_iterator); + if (!current.get()) { + return nullptr; + } + if (current->SeekTo(key_slice)) { + auto ret = current->Next(); + if (!ret.ok()) { + return nullptr; + } + return ret.value()->value_->CopyBytes(pool_.get()); + } + } + return nullptr; +} + +std::unique_ptr SstFileReader::GetNextBlock( + std::unique_ptr& index_iterator) { + auto ret = index_iterator->Next(); + if (!ret.ok()) { + return nullptr; + } + auto& slice = ret.value()->value_; + auto input = slice->ToInput(); + return ReadBlock(BlockHandle::ReadBlockHandle(input), false)->Iterator(); +} + +std::shared_ptr SstFileReader::ReadBlock(std::shared_ptr&& handle, + bool index) { + auto block_handle = handle; + return ReadBlock(block_handle, index); +} + +std::shared_ptr SstFileReader::ReadBlock(const std::shared_ptr& handle, + bool index) { + // read block trailer + auto trailer_data = block_cache_->GetBlock(handle->Offset() + handle->Size(), + BlockTrailer::ENCODED_LENGTH, true); + auto trailer_input = MemorySlice::Wrap(trailer_data)->ToInput(); + auto trailer = BlockTrailer::ReadBlockTrailer(trailer_input); + + auto block_data = block_cache_->GetBlock(handle->Offset(), handle->Size(), index); + return BlockReader::Create(MemorySlice::Wrap(block_data), comparator_); +} + +SstFileIterator::SstFileIterator(SstFileReader* reader, + std::unique_ptr index_iterator) + : reader_(reader), index_iterator_(std::move(index_iterator)) {} + +/** + * Seek to the position of the record whose key is exactly equal to or greater than the + * specified key. + */ +void SstFileIterator::SeekTo(std::shared_ptr& key) { + auto key_slice = MemorySlice::Wrap(key); + index_iterator_->SeekTo(key_slice); + if (index_iterator_->HasNext()) { + data_iterator_ = reader_->GetNextBlock(index_iterator_); + // The index block entry key is the last key of the corresponding data block. + // If there is some index entry key >= targetKey, the related data block must + // also contain some key >= target key, which means seekedDataBlock.hasNext() + // must be true + data_iterator_->SeekTo(key_slice); + } else { + data_iterator_.reset(); + } +} +} // namespace paimon diff --git a/src/paimon/common/sst/sst_file_reader.h b/src/paimon/common/sst/sst_file_reader.h new file mode 100644 index 00000000..c5dfa55d --- /dev/null +++ b/src/paimon/common/sst/sst_file_reader.h @@ -0,0 +1,109 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/common/sst/block_cache.h" +#include "paimon/common/sst/block_footer.h" +#include "paimon/common/sst/block_handle.h" +#include "paimon/common/sst/block_iterator.h" +#include "paimon/common/sst/block_reader.h" +#include "paimon/common/sst/block_trailer.h" +#include "paimon/common/sst/bloom_filter_handle.h" +#include "paimon/common/utils/bit_set.h" +#include "paimon/common/utils/bloom_filter.h" +#include "paimon/fs/file_system.h" +#include "paimon/memory/bytes.h" +#include "paimon/result.h" + +namespace paimon { +class SstFileIterator; + +/// An SST File Reader which serves point queries and range queries. Users can call +/// CreateIterator() to create a file iterator and then use seek and read methods to do range +/// queries. Note that this class is NOT thread-safe. +class SstFileReader { + public: + static Result> Create( + const std::shared_ptr& pool, const std::shared_ptr& block_cache, + int64_t file_len, + std::function&, + const std::shared_ptr&)> + comparator); + + SstFileReader(const std::shared_ptr& pool, + const std::shared_ptr& block_cache, + const std::shared_ptr& index_block_handle, + const std::shared_ptr& bloom_filter, + std::function&, + const std::shared_ptr&)> + comparator); + ~SstFileReader() = default; + + std::unique_ptr CreateIterator(); + + /** + * Lookup the specified key in the file. + * + * @param key serialized key + * @return corresponding serialized value, nullptr if not found. + */ + std::shared_ptr Lookup(std::shared_ptr key); + + std::unique_ptr GetNextBlock(std::unique_ptr& index_iterator); + + /** + * @param handle The block handle. + * @param index Whether read the block as an index. + * @return The reader of the target block. + */ + std::shared_ptr ReadBlock(std::shared_ptr&& handle, bool index); + + /** + * @param handle The block handle. + * @param index Whether read the block as an index. + * @return The reader of the target block. + */ + std::shared_ptr ReadBlock(const std::shared_ptr& handle, bool index); + + private: + std::shared_ptr pool_; + std::shared_ptr block_cache_; + std::shared_ptr bloom_filter_; + std::shared_ptr index_block_reader_; + std::function&, const std::shared_ptr&)> + comparator_; +}; + +class SstFileIterator { + public: + SstFileIterator() = default; + SstFileIterator(SstFileReader* reader, std::unique_ptr index_iterator); + + /** + * Seek to the position of the record whose key is exactly equal to or greater than the + * specified key. + */ + void SeekTo(std::shared_ptr& key); + + private: + SstFileReader* reader_; + std::unique_ptr index_iterator_; + std::unique_ptr data_iterator_; +}; +} // namespace paimon diff --git a/src/paimon/common/sst/sst_file_writer.cpp b/src/paimon/common/sst/sst_file_writer.cpp new file mode 100644 index 00000000..569aa9cf --- /dev/null +++ b/src/paimon/common/sst/sst_file_writer.cpp @@ -0,0 +1,107 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/sst/sst_file_writer.h" +namespace paimon { +class MemoryPool; + +Status SstFileWriter::Write(std::shared_ptr&& key, std::shared_ptr&& value) { + data_block_writer_->Write(key, value); + last_key_ = key; + if (data_block_writer_->Memory() > block_size_) { + PAIMON_RETURN_NOT_OK(Flush()); + } + if (bloom_filter_.get()) { + PAIMON_RETURN_NOT_OK(bloom_filter_->AddHash(MurmurHashUtils::HashBytes(key))); + } + return Status::OK(); +} + +Status SstFileWriter::Write(std::shared_ptr& slice) { + auto data = slice->ReadStringView(); + return WriteBytes(data.data(), data.size()); +} + +Status SstFileWriter::Flush() { + if (data_block_writer_->Size() == 0) { + return Status::OK(); + } + + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr handle, + FlushBlockWriter(data_block_writer_)); + + auto slice = handle->WriteBlockHandle(pool_.get()); + auto value = slice->CopyBytes(pool_.get()); + index_block_writer_->Write(last_key_, value); + return Status::OK(); +} + +Result> SstFileWriter::WriteIndexBlock() { + return FlushBlockWriter(index_block_writer_); +} + +Result> SstFileWriter::WriteBloomFilter() { + if (!bloom_filter_.get()) { + return Status::Invalid("bloom_filter_ should be set before writing"); + } + + auto data = bloom_filter_->GetBitSet()->ToSlice()->ReadStringView(); + + auto handle = std::make_shared(out_->GetPos().value(), data.size(), + bloom_filter_->ExpectedEntries()); + + PAIMON_RETURN_NOT_OK(WriteBytes(data.data(), data.size())); + + return handle; +} + +Status SstFileWriter::WriteFooter(const std::shared_ptr& index_block_handle, + const std::shared_ptr& bloom_filter_handle) { + auto footer = std::make_shared(index_block_handle, bloom_filter_handle); + auto slice = footer->WriteBlockFooter(pool_.get()); + auto data = slice->ReadStringView(); + PAIMON_RETURN_NOT_OK(WriteBytes(data.data(), data.size())); + return Status::OK(); +} + +Result> SstFileWriter::FlushBlockWriter( + std::unique_ptr& writer) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr block_data, writer->Finish()); + + auto size = block_data->Length(); + // todo attempt to compress the block + auto view = block_data->ReadStringView(); + auto crc32 = arrow::internal::crc32(0, view.data(), view.size()); + auto trailer = std::make_shared(0, crc32)->WriteBlockTrailer(pool_.get()); + auto trailer_data = trailer->ReadStringView(); + + auto block_handle = std::make_shared(out_->GetPos().value_or(0), size); + + // 1. write data + PAIMON_RETURN_NOT_OK(WriteBytes(view.data(), view.size())); + + // 2. write trailer + PAIMON_RETURN_NOT_OK(WriteBytes(trailer_data.data(), trailer_data.size())); + + writer->Reset(); + return block_handle; +} + +Status SstFileWriter::WriteBytes(const char* data, size_t size) { + PAIMON_RETURN_NOT_OK(out_->Write(data, size)); + return Status::OK(); +} +} // namespace paimon diff --git a/src/paimon/common/sst/sst_file_writer.h b/src/paimon/common/sst/sst_file_writer.h new file mode 100644 index 00000000..45a9ca84 --- /dev/null +++ b/src/paimon/common/sst/sst_file_writer.h @@ -0,0 +1,91 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "arrow/util/crc32.h" +#include "paimon/common/sst/block_footer.h" +#include "paimon/common/sst/block_handle.h" +#include "paimon/common/sst/block_trailer.h" +#include "paimon/common/sst/block_writer.h" +#include "paimon/common/sst/bloom_filter_handle.h" +#include "paimon/common/utils/bit_set.h" +#include "paimon/common/utils/bloom_filter.h" +#include "paimon/common/utils/murmurhash_utils.h" +#include "paimon/fs/file_system.h" +#include "paimon/result.h" + +namespace arrow { +class Array; +} // namespace arrow + +namespace paimon { +class MemoryPool; + +/// The writer for writing SST Files. SST Files are row-oriented and designed to serve frequent +/// point queries and range queries by key. +class SstFileWriter { + public: + SstFileWriter(const std::shared_ptr& out, const std::shared_ptr& pool, + const std::shared_ptr& bloom_filter, int32_t block_size) + : out_(out), pool_(pool), bloom_filter_(bloom_filter), block_size_(block_size) { + data_block_writer_ = + std::make_unique(static_cast(block_size * 1.1), pool); + index_block_writer_ = + std::make_unique(BlockHandle::MAX_ENCODED_LENGTH * 1024, pool); + } + + ~SstFileWriter() = default; + + Status Write(std::shared_ptr&& key, std::shared_ptr&& value); + + Status Write(std::shared_ptr& slice); + + Status Flush(); + + Result> WriteIndexBlock(); + + Result> WriteBloomFilter(); + + Status WriteFooter(const std::shared_ptr& index_block_handle, + const std::shared_ptr& bloom_filter_handle); + + private: + Result> FlushBlockWriter(std::unique_ptr& writer); + + Status WriteBytes(const char* data, size_t size); + + // api for testing + BlockWriter* IndexWriter() const { + return index_block_writer_.get(); + } + + private: + const std::shared_ptr out_; + + const std::shared_ptr pool_; + + std::shared_ptr bloom_filter_; + + std::shared_ptr last_key_; + + int32_t block_size_; + std::unique_ptr data_block_writer_; + std::unique_ptr index_block_writer_; +}; +} // namespace paimon diff --git a/src/paimon/common/utils/bit_set.cpp b/src/paimon/common/utils/bit_set.cpp new file mode 100644 index 00000000..5f12ec4b --- /dev/null +++ b/src/paimon/common/utils/bit_set.cpp @@ -0,0 +1,71 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/utils/bit_set.h" +namespace paimon { + +Status BitSet::SetMemorySegment(std::shared_ptr segment, int32_t offset) { + if (!segment) { + return Status::Invalid("MemorySegment can not be null."); + } + if (offset < 0) { + return Status::Invalid("Offset should be positive integer."); + } + if (offset + byte_length_ > segment->Size()) { + return Status::Invalid("Could not set MemorySegment, the remain buffers is not enough."); + } + segment_ = segment; + offset_ = offset; + return Status::OK(); +} + +void BitSet::UnsetMemorySegment() { + segment_.reset(); +} + +Status BitSet::Set(unsigned int index) { + if (index >= bit_size_) { + return Status::IndexError("Index out of bound"); + } + unsigned int byte_index = index >> 3; + auto val = segment_->Get(offset_ + byte_index); + val |= (1 << (index & BYTE_INDEX_MASK)); + segment_->PutValue(offset_ + byte_index, val); + return Status::OK(); +} + +bool BitSet::Get(unsigned int index) { + if (index >= bit_size_) { + return false; + } + unsigned int byte_index = index >> 3; + auto val = segment_->Get(offset_ + byte_index); + return (val & (1 << (index & BYTE_INDEX_MASK))) != 0; +} + +void BitSet::Clear() { + int index = 0; + while (index + 8 <= byte_length_) { + segment_->PutValue(offset_ + index, 0L); + index += 8; + } + while (index < byte_length_) { + segment_->PutValue(offset_ + index, static_cast(0)); + index += 1; + } +} + +} // namespace paimon diff --git a/src/paimon/common/utils/bit_set.h b/src/paimon/common/utils/bit_set.h new file mode 100644 index 00000000..22bcb7f7 --- /dev/null +++ b/src/paimon/common/utils/bit_set.h @@ -0,0 +1,69 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include + +#include "paimon/common/memory/memory_slice.h" +#include "paimon/memory/bytes.h" +#include "paimon/status.h" +#include "paimon/visibility.h" + +namespace paimon { +class MemoryPool; + +/// BitSet based on MemorySegment. +class PAIMON_EXPORT BitSet { + public: + explicit BitSet(int64_t byte_length) : byte_length_(byte_length), bit_size_(byte_length << 3) {} + + Status SetMemorySegment(std::shared_ptr segment, int32_t offset = 0); + void UnsetMemorySegment(); + + std::shared_ptr& GetMemorySegment() { + return segment_; + } + + std::shared_ptr ToSlice() { + return std::make_shared(segment_, offset_, byte_length_); + } + + int32_t Offset() const { + return offset_; + } + int64_t BitSize() const { + return bit_size_; + } + int64_t ByteLength() const { + return byte_length_; + } + + Status Set(unsigned int index); + bool Get(unsigned int index); + void Clear(); + + private: + static constexpr int32_t BYTE_INDEX_MASK = 0x00000007; + + private: + int64_t byte_length_; + int64_t bit_size_; + int32_t offset_; + std::shared_ptr segment_; +}; +} // namespace paimon diff --git a/src/paimon/common/utils/bit_set_test.cpp b/src/paimon/common/utils/bit_set_test.cpp new file mode 100644 index 00000000..a81e42fe --- /dev/null +++ b/src/paimon/common/utils/bit_set_test.cpp @@ -0,0 +1,51 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/utils/bit_set.h" + +#include +#include +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(BitSetTest, TestBitSet) { + auto bit_set = std::make_shared(1024); + auto pool = GetDefaultPool(); + auto seg = MemorySegment::AllocateHeapMemory(1024, pool.get()); + auto ptr = std::make_shared(seg); + ASSERT_OK(bit_set->SetMemorySegment(ptr)); + for (int i = 0; i < 100; i++) { + ASSERT_OK(bit_set->Set(i * 2 + 1)); + } + for (int i = 0; i < 100; i++) { + ASSERT_TRUE(bit_set->Get(i * 2 + 1)); + } + bit_set->Clear(); + for (int i = 0; i < 100; i++) { + ASSERT_FALSE(bit_set->Get(i * 2 + 1)); + } +} + +} // namespace paimon::test diff --git a/src/paimon/common/utils/bloom_filter.cpp b/src/paimon/common/utils/bloom_filter.cpp new file mode 100644 index 00000000..f46f1b8d --- /dev/null +++ b/src/paimon/common/utils/bloom_filter.cpp @@ -0,0 +1,97 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/utils/bloom_filter.h" + +#include +#include +#include +#include + +namespace paimon { + +int32_t BloomFilter::OptimalNumOfBits(int64_t expect_entries, double fpp) { + if (expect_entries <= 0 || fpp <= 0.0 || fpp >= 1.0) { + return 0; + } + double result = -static_cast(expect_entries) * log(fpp) / (log(2) * log(2)); + if (result > INT32_MAX) return INT32_MAX; + if (result < 0) return 0; + return static_cast(result); +} + +int32_t BloomFilter::OptimalNumOfHashFunctions(int64_t expect_entries, int64_t bit_size) { + if (expect_entries <= 0) { + return 1; + } + double ratio = static_cast(bit_size) / static_cast(expect_entries); + double result = ratio * std::log(2.0); + return std::max(1, static_cast(std::round(result))); +} + +std::shared_ptr BloomFilter::Create(int64_t expect_entries, double fpp) { + int bytes = static_cast(ceil(BloomFilter::OptimalNumOfBits(expect_entries, fpp) / 8.0)); + return std::make_shared(expect_entries, bytes); +} + +BloomFilter::BloomFilter(int64_t expected_entries, int32_t byte_length) + : expected_entries_(expected_entries) { + num_hash_functions_ = + OptimalNumOfHashFunctions(expected_entries, static_cast(byte_length) << 3); + bit_set_ = std::make_shared(byte_length); +} + +Status BloomFilter::AddHash(int32_t hash1) { + int hash2 = hash1 >> 16; + + for (int32_t i = 1; i <= num_hash_functions_; i++) { + int32_t combined_hash = hash1 + (i * hash2); + // hashcode should be positive, flip all the bits if it's negative + if (combined_hash < 0) { + combined_hash = ~combined_hash; + } + int32_t pos = combined_hash % bit_set_->BitSize(); + PAIMON_RETURN_NOT_OK(bit_set_->Set(pos)); + } + return Status::OK(); +} + +bool BloomFilter::TestHash(int32_t hash1) const { + int32_t hash2 = hash1 >> 16; + + for (int i = 1; i <= num_hash_functions_; i++) { + int32_t combined_hash = hash1 + (i * hash2); + // hashcode should be positive, flip all the bits if it's negative + if (combined_hash < 0) { + combined_hash = ~combined_hash; + } + int32_t pos = combined_hash % bit_set_->BitSize(); + if (!bit_set_->Get(pos)) { + return false; + } + } + return true; +} + +Status BloomFilter::SetMemorySegment(std::shared_ptr segment, int32_t offset) { + return bit_set_->SetMemorySegment(segment, offset); +} + +void BloomFilter::Reset() { + bit_set_->Clear(); +} + +} // namespace paimon diff --git a/src/paimon/common/utils/bloom_filter.h b/src/paimon/common/utils/bloom_filter.h new file mode 100644 index 00000000..2cb5012c --- /dev/null +++ b/src/paimon/common/utils/bloom_filter.h @@ -0,0 +1,70 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include +#include +#include + +#include "paimon/common/utils/bit_set.h" +#include "paimon/memory/bytes.h" +#include "paimon/visibility.h" + +namespace paimon { + +/// Bloom filter based on MemorySegment. +class PAIMON_EXPORT BloomFilter { + public: + static int32_t OptimalNumOfBits(int64_t expect_entries, double fpp); + static int32_t OptimalNumOfHashFunctions(int64_t expect_entries, int64_t bit_size); + static std::shared_ptr Create(int64_t expect_entries, double fpp); + + public: + BloomFilter(int64_t expected_entries, int32_t byte_length); + + int32_t GetNumHashFunctions() const { + return num_hash_functions_; + } + + int64_t ExpectedEntries() const { + return expected_entries_; + } + + int64_t ByteLength() const { + return bit_set_->ByteLength(); + } + + std::shared_ptr GetBitSet() { + return bit_set_; + } + + Status SetMemorySegment(std::shared_ptr segment, int32_t offset = 0); + + Status AddHash(int32_t hash1); + + bool TestHash(int32_t hash1) const; + + void Reset(); + + private: + static constexpr int32_t BYTE_SIZE = 8; + + private: + int64_t expected_entries_; + int32_t num_hash_functions_ = -1; + std::shared_ptr bit_set_; +}; +} // namespace paimon diff --git a/src/paimon/common/utils/bloom_filter_test.cpp b/src/paimon/common/utils/bloom_filter_test.cpp new file mode 100644 index 00000000..d652ce9a --- /dev/null +++ b/src/paimon/common/utils/bloom_filter_test.cpp @@ -0,0 +1,158 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/utils/bloom_filter.h" + +#include +#include +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(BloomFilterTest, TestOneSegmentBuilder) { + int32_t items = 100; + auto pool = GetDefaultPool(); + auto bloom_filter = BloomFilter::Create(items, 0.01); + auto seg = MemorySegment::AllocateHeapMemory(1024, pool.get()); + auto ptr = std::make_shared(seg); + ASSERT_OK(bloom_filter->SetMemorySegment(ptr)); + + std::mt19937_64 engine(std::random_device{}()); // NOLINT(whitespace/braces) + std::uniform_int_distribution distribution(0, items); + std::set test_data; + for (int32_t i = 0; i < items; i++) { + int32_t random = distribution(engine); + test_data.insert(random); + ASSERT_OK(bloom_filter->AddHash(random)); + } + + for (const auto& value : test_data) { + ASSERT_TRUE(bloom_filter->TestHash(value)); + } +} + +TEST(BloomFilterTest, TestEstimatedHashFunctions) { + ASSERT_EQ(7, BloomFilter::Create(1000, 0.01)->GetNumHashFunctions()); + ASSERT_EQ(7, BloomFilter::Create(10000, 0.01)->GetNumHashFunctions()); + ASSERT_EQ(7, BloomFilter::Create(100000, 0.01)->GetNumHashFunctions()); + ASSERT_EQ(4, BloomFilter::Create(100000, 0.05)->GetNumHashFunctions()); + ASSERT_EQ(7, BloomFilter::Create(1000000, 0.01)->GetNumHashFunctions()); + ASSERT_EQ(4, BloomFilter::Create(1000000, 0.05)->GetNumHashFunctions()); +} + +TEST(BloomFilterTest, TestBloomNumBits) { + ASSERT_EQ(0, BloomFilter::OptimalNumOfBits(0, 0)); + ASSERT_EQ(0, BloomFilter::OptimalNumOfBits(0, 1)); + ASSERT_EQ(0, BloomFilter::OptimalNumOfBits(1, 1)); + ASSERT_EQ(7, BloomFilter::OptimalNumOfBits(1, 0.03)); + ASSERT_EQ(72, BloomFilter::OptimalNumOfBits(10, 0.03)); + ASSERT_EQ(729, BloomFilter::OptimalNumOfBits(100, 0.03)); + ASSERT_EQ(7298, BloomFilter::OptimalNumOfBits(1000, 0.03)); + ASSERT_EQ(72984, BloomFilter::OptimalNumOfBits(10000, 0.03)); + ASSERT_EQ(729844, BloomFilter::OptimalNumOfBits(100000, 0.03)); + ASSERT_EQ(7298440, BloomFilter::OptimalNumOfBits(1000000, 0.03)); + ASSERT_EQ(6235224, BloomFilter::OptimalNumOfBits(1000000, 0.05)); + ASSERT_EQ(1870567268, BloomFilter::OptimalNumOfBits(300000000, 0.05)); + ASSERT_EQ(1437758756, BloomFilter::OptimalNumOfBits(300000000, 0.1)); + ASSERT_EQ(432808512, BloomFilter::OptimalNumOfBits(300000000, 0.5)); + ASSERT_EQ(1393332198, BloomFilter::OptimalNumOfBits(3000000000, 0.8)); + ASSERT_EQ(657882327, BloomFilter::OptimalNumOfBits(3000000000, 0.9)); + ASSERT_EQ(0, BloomFilter::OptimalNumOfBits(3000000000, 1)); +} + +TEST(BloomFilterTest, TestBloomNumHashFunctions) { + ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(-1, -1)); + ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(0, 0)); + ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(10, 0)); + ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(10, 10)); + ASSERT_EQ(7, BloomFilter::OptimalNumOfHashFunctions(10, 100)); + ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(100, 100)); + ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(1000, 100)); + ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(10000, 100)); + ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(100000, 100)); + ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(1000000, 100)); + ASSERT_EQ(3634, BloomFilter::OptimalNumOfHashFunctions(100, 64 * 1024 * 8)); + ASSERT_EQ(363, BloomFilter::OptimalNumOfHashFunctions(1000, 64 * 1024 * 8)); + ASSERT_EQ(36, BloomFilter::OptimalNumOfHashFunctions(10000, 64 * 1024 * 8)); + ASSERT_EQ(4, BloomFilter::OptimalNumOfHashFunctions(100000, 64 * 1024 * 8)); + ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(1000000, 64 * 1024 * 8)); +} + +TEST(BloomFilterTest, TestBloomFilter) { + int32_t items = 100; + auto pool = GetDefaultPool(); + auto bloom_filter = std::make_shared(100, 1024); + + std::mt19937_64 engine(std::random_device{}()); // NOLINT(whitespace/braces) + std::uniform_int_distribution distribution(0, items); + + // segments 1 + auto seg1 = MemorySegment::AllocateHeapMemory(1024, pool.get()); + auto ptr1 = std::make_shared(seg1); + ASSERT_OK(bloom_filter->SetMemorySegment(ptr1)); + + std::set test_data1; + for (int32_t i = 0; i < items; i++) { + int32_t random = distribution(engine); + test_data1.insert(random); + ASSERT_OK(bloom_filter->AddHash(random)); + } + for (const auto& value : test_data1) { + ASSERT_TRUE(bloom_filter->TestHash(value)); + } + + // segments 2 + std::set test_data2; + auto seg2 = MemorySegment::AllocateHeapMemory(1024, pool.get()); + auto ptr2 = std::make_shared(seg2); + ASSERT_OK(bloom_filter->SetMemorySegment(ptr2)); + for (int32_t i = 0; i < items; i++) { + int32_t random = distribution(engine); + test_data2.insert(random); + ASSERT_OK(bloom_filter->AddHash(random)); + } + for (const auto& value : test_data2) { + ASSERT_TRUE(bloom_filter->TestHash(value)); + } + // switch to segment1 + ASSERT_OK(bloom_filter->SetMemorySegment(ptr1)); + for (const auto& value : test_data1) { + ASSERT_TRUE(bloom_filter->TestHash(value)); + } + + // clear segment1 + bloom_filter->Reset(); + for (const auto& value : test_data1) { + ASSERT_FALSE(bloom_filter->TestHash(value)); + } + + // switch to segment2 and clear + ASSERT_OK(bloom_filter->SetMemorySegment(ptr2)); + bloom_filter->Reset(); + for (const auto& value : test_data2) { + ASSERT_FALSE(bloom_filter->TestHash(value)); + } +} + +} // namespace paimon::test diff --git a/test/test_data/sst/none/79d01717-8380-4504-86e1-387e6c058d0a b/test/test_data/sst/none/79d01717-8380-4504-86e1-387e6c058d0a new file mode 100644 index 00000000..40f31452 Binary files /dev/null and b/test/test_data/sst/none/79d01717-8380-4504-86e1-387e6c058d0a differ