From 026ddfc1afe3d48156764e7753aac2b00673d613 Mon Sep 17 00:00:00 2001 From: denglong Date: Tue, 12 Aug 2025 12:30:40 +0800 Subject: [PATCH 1/4] fix: use std::string --- hybridse/src/udf/containers.h | 7 +++---- hybridse/src/udf/default_defs/window_functions_def.cc | 3 ++- src/catalog/distribute_iterator.cc | 2 +- src/catalog/distribute_iterator.h | 3 ++- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/hybridse/src/udf/containers.h b/hybridse/src/udf/containers.h index ad7d3811dcc..7ff41cb1af4 100644 --- a/hybridse/src/udf/containers.h +++ b/hybridse/src/udf/containers.h @@ -45,10 +45,9 @@ struct ContainerStorageTypeTrait { template <> struct ContainerStorageTypeTrait { - // FIXME: StringRef do not own data, ref #2944 - using type = codec::StringRef; - static codec::StringRef to_stored_value(codec::StringRef* t) { - return t == nullptr ? codec::StringRef() : *t; + using type = std::string; + static std::string to_stored_value(codec::StringRef* t) { + return t == nullptr ? "" : t->ToString(); } }; diff --git a/hybridse/src/udf/default_defs/window_functions_def.cc b/hybridse/src/udf/default_defs/window_functions_def.cc index 8f277967dd2..f6977bc420e 100644 --- a/hybridse/src/udf/default_defs/window_functions_def.cc +++ b/hybridse/src/udf/default_defs/window_functions_def.cc @@ -145,7 +145,8 @@ struct NthValueWhere { // saved value list // if `nth` > 0, work like a ring buffer // if `nth < 0`, have one value at most - std::queue, std::list>> data; + using StorageT = typename container::ContainerStorageTypeTrait::type; + std::queue, std::list>> data; }; // (nth idx, [(value, value_is_null)]) diff --git a/src/catalog/distribute_iterator.cc b/src/catalog/distribute_iterator.cc index 519dec5f2fa..e4c90bf7bcd 100644 --- a/src/catalog/distribute_iterator.cc +++ b/src/catalog/distribute_iterator.cc @@ -185,7 +185,7 @@ const ::hybridse::codec::Row& FullTableIterator::GetValue() { int8_t* copyed_row_data = reinterpret_cast(malloc(sz)); memcpy(copyed_row_data, slice_row.data(), sz); auto shared_slice = ::hybridse::base::RefCountedSlice::CreateManaged(copyed_row_data, sz); - buffered_slices_.push_back(shared_slice); + buffered_slices_.put(shared_slice); value_.Reset(shared_slice); return value_; } diff --git a/src/catalog/distribute_iterator.h b/src/catalog/distribute_iterator.h index 4a8987ac77a..61c0f98fd37 100644 --- a/src/catalog/distribute_iterator.h +++ b/src/catalog/distribute_iterator.h @@ -24,6 +24,7 @@ #include "base/hash.h" #include "base/kv_iterator.h" +#include "base/ringqueue.h" #include "client/tablet_client.h" #include "storage/table.h" #include "vm/catalog.h" @@ -81,7 +82,7 @@ class FullTableIterator : public ::hybridse::codec::ConstIterator Next() -> return res bool valid_value_ = false; - std::vector buffered_slices_; + base::RingQueue buffered_slices_; int64_t cnt_ = 0; }; From 44af67dfce8672ee22f5e8111f92f06a0d435bf1 Mon Sep 17 00:00:00 2001 From: denglong Date: Tue, 12 Aug 2025 16:27:14 +0800 Subject: [PATCH 2/4] fix: update aggregate_def --- hybridse/src/udf/default_defs/aggregate_def.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hybridse/src/udf/default_defs/aggregate_def.cc b/hybridse/src/udf/default_defs/aggregate_def.cc index 5e7ed26587c..1281bc1c4fd 100644 --- a/hybridse/src/udf/default_defs/aggregate_def.cc +++ b/hybridse/src/udf/default_defs/aggregate_def.cc @@ -69,7 +69,8 @@ template struct StdTemplate { template struct Impl { - using ContainerT = std::pair, double>; + using StorageT = typename container::ContainerStorageTypeTrait::type; + using ContainerT = std::pair, double>; void operator()(UdafRegistryHelper& helper) { // NOLINT std::string suffix = absl::StrCat(".opaque_std_pair_std_vector_double_", DataTypeTrait::to_string()); @@ -113,9 +114,10 @@ struct StdTemplate { template struct ShannonEntropy { using CType = typename DataTypeTrait::CCallArgType; + using StorageT = typename container::ContainerStorageTypeTrait::type; // intermedate state: ([key -> count], total count) - using ContainerT = std::pair, int64_t>; + using ContainerT = std::pair, int64_t>; void operator()(UdafRegistryHelper& helper) { // NOLINT std::string prefix = absl::StrCat(helper.name(), "_", DataTypeTrait::to_string()); From b4fb4ad19b293949a5191892550032f2421475e3 Mon Sep 17 00:00:00 2001 From: denglong Date: Wed, 13 Aug 2025 17:17:25 +0800 Subject: [PATCH 3/4] feat: add std::string trait --- hybridse/src/udf/containers.h | 36 ++++++++++--------- .../src/udf/default_defs/aggregate_def.cc | 7 ++-- .../udf/default_defs/window_functions_def.cc | 6 ++-- hybridse/src/udf/default_udf_library.cc | 7 ++-- 4 files changed, 30 insertions(+), 26 deletions(-) diff --git a/hybridse/src/udf/containers.h b/hybridse/src/udf/containers.h index 7ff41cb1af4..28641b4708f 100644 --- a/hybridse/src/udf/containers.h +++ b/hybridse/src/udf/containers.h @@ -37,30 +37,34 @@ namespace container { /** * Specify actual stored type, store itself for most of the primitive types. */ -template +template struct ContainerStorageTypeTrait { using type = T; static T to_stored_value(const T& t) { return t; } }; -template <> -struct ContainerStorageTypeTrait { - using type = std::string; - static std::string to_stored_value(codec::StringRef* t) { - return t == nullptr ? "" : t->ToString(); +template +struct ContainerStorageTypeTrait { + using type = std::conditional_t; + static type to_stored_value(codec::StringRef* t) { + if constexpr (UseStringStorage) { + return t == nullptr ? "" : t->ToString(); + } else { + return t == nullptr ? codec::StringRef() : *t; + } } }; -template <> -struct ContainerStorageTypeTrait { +template +struct ContainerStorageTypeTrait { using type = openmldb::base::Date; static openmldb::base::Date to_stored_value(openmldb::base::Date* t) { return t == nullptr ? openmldb::base::Date(0) : *t; } }; -template <> -struct ContainerStorageTypeTrait { +template +struct ContainerStorageTypeTrait { using type = openmldb::base::Timestamp; static openmldb::base::Timestamp to_stored_value(openmldb::base::Timestamp* t) { return t == nullptr ? openmldb::base::Timestamp(0) : *t; @@ -74,7 +78,7 @@ class TopKContainer { using InputT = typename DataTypeTrait::CCallArgType; // actual stored type - using StorageT = typename ContainerStorageTypeTrait::type; + using StorageT = typename ContainerStorageTypeTrait::type; // self type using ContainerT = TopKContainer; @@ -140,7 +144,7 @@ class TopKContainer { } void Push(InputT t) { - auto key = ContainerStorageTypeTrait::to_stored_value(t); + auto key = ContainerStorageTypeTrait::to_stored_value(t); auto iter = map_.find(key); if (iter == map_.end()) { map_.insert(iter, {key, 1}); @@ -197,7 +201,7 @@ struct DefaultPairCmp { }; template ::type, + typename StorageV = typename ContainerStorageTypeTrait::type, template typename PairCmp = DefaultPairCmp> class BoundedGroupByDict { public: @@ -208,7 +212,7 @@ class BoundedGroupByDict { using InputK = typename DataTypeTrait::CCallArgType; using InputV = typename DataTypeTrait::CCallArgType; // actual stored type - using StorageK = typename ContainerStorageTypeTrait::type; + using StorageK = typename ContainerStorageTypeTrait::type; // self type using ContainerT = BoundedGroupByDict; @@ -217,10 +221,10 @@ class BoundedGroupByDict { // convert to internal key and value static inline StorageK to_stored_key(const InputK& key) { - return ContainerStorageTypeTrait::to_stored_value(key); + return ContainerStorageTypeTrait::to_stored_value(key); } static inline auto to_stored_value(const InputV& value) { - return ContainerStorageTypeTrait::to_stored_value(value); + return ContainerStorageTypeTrait::to_stored_value(value); } static void Init(ContainerT* addr) { new (addr) ContainerT(); } diff --git a/hybridse/src/udf/default_defs/aggregate_def.cc b/hybridse/src/udf/default_defs/aggregate_def.cc index 1281bc1c4fd..695573ee32e 100644 --- a/hybridse/src/udf/default_defs/aggregate_def.cc +++ b/hybridse/src/udf/default_defs/aggregate_def.cc @@ -69,8 +69,7 @@ template struct StdTemplate { template struct Impl { - using StorageT = typename container::ContainerStorageTypeTrait::type; - using ContainerT = std::pair, double>; + using ContainerT = std::pair, double>; void operator()(UdafRegistryHelper& helper) { // NOLINT std::string suffix = absl::StrCat(".opaque_std_pair_std_vector_double_", DataTypeTrait::to_string()); @@ -114,7 +113,7 @@ struct StdTemplate { template struct ShannonEntropy { using CType = typename DataTypeTrait::CCallArgType; - using StorageT = typename container::ContainerStorageTypeTrait::type; + using StorageT = typename container::ContainerStorageTypeTrait::type; // intermedate state: ([key -> count], total count) using ContainerT = std::pair, int64_t>; @@ -133,7 +132,7 @@ struct ShannonEntropy { if (is_null) { return ctr; } - auto val = container::ContainerStorageTypeTrait::to_stored_value(value); + auto val = container::ContainerStorageTypeTrait::to_stored_value(value); auto [it, inserted] = ctr->first.try_emplace(val, 0); it->second++; ctr->second++; diff --git a/hybridse/src/udf/default_defs/window_functions_def.cc b/hybridse/src/udf/default_defs/window_functions_def.cc index f6977bc420e..b6f3cb776a2 100644 --- a/hybridse/src/udf/default_defs/window_functions_def.cc +++ b/hybridse/src/udf/default_defs/window_functions_def.cc @@ -145,7 +145,7 @@ struct NthValueWhere { // saved value list // if `nth` > 0, work like a ring buffer // if `nth < 0`, have one value at most - using StorageT = typename container::ContainerStorageTypeTrait::type; + using StorageT = typename container::ContainerStorageTypeTrait::type; std::queue, std::list>> data; }; @@ -169,7 +169,7 @@ struct NthValueWhere { if (cond && !cond_is_null) { if (ctr->nth > 0) { // nth from window start - ctr->data.emplace(container::ContainerStorageTypeTrait::to_stored_value(value), value_is_null); + ctr->data.emplace(container::ContainerStorageTypeTrait::to_stored_value(value), value_is_null); if (ctr->data.size() > ctr->nth) { ctr->data.pop(); } @@ -178,7 +178,7 @@ struct NthValueWhere { ctr->cur_idx++; if (ctr->cur_idx + ctr->nth == 0) { // reaches nth - ctr->data.emplace(container::ContainerStorageTypeTrait::to_stored_value(value), + ctr->data.emplace(container::ContainerStorageTypeTrait::to_stored_value(value), value_is_null); } } diff --git a/hybridse/src/udf/default_udf_library.cc b/hybridse/src/udf/default_udf_library.cc index a6be6745917..9f25a1a9d19 100644 --- a/hybridse/src/udf/default_udf_library.cc +++ b/hybridse/src/udf/default_udf_library.cc @@ -328,7 +328,8 @@ struct EwAvgUdafDef { template struct DistinctCountDef { using ArgT = typename DataTypeTrait::CCallArgType; - using SetT = std::unordered_set; + using StorageT = typename container::ContainerStorageTypeTrait::type; + using SetT = std::unordered_set; void operator()(UdafRegistryHelper& helper) { // NOLINT std::string suffix = ".opaque_std_set_" + DataTypeTrait::to_string(); @@ -351,7 +352,7 @@ struct DistinctCountDef { template struct UpdateImpl { static SetT* update_set(SetT* set, V value) { - set->insert(value); + set->insert(container::ContainerStorageTypeTrait::to_stored_value(value)); return set; } }; @@ -359,7 +360,7 @@ struct DistinctCountDef { template struct UpdateImpl { static SetT* update_set(SetT* set, V* value) { - set->insert(*value); + set->insert(container::ContainerStorageTypeTrait::to_stored_value(value)); return set; } }; From 633a4e7ad04fcda00c4d04c4017df47e1f05f193 Mon Sep 17 00:00:00 2001 From: denglong Date: Thu, 14 Aug 2025 14:23:03 +0800 Subject: [PATCH 4/4] feat: update ringbuffer size --- src/catalog/distribute_iterator.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/catalog/distribute_iterator.cc b/src/catalog/distribute_iterator.cc index e4c90bf7bcd..319f67a5ced 100644 --- a/src/catalog/distribute_iterator.cc +++ b/src/catalog/distribute_iterator.cc @@ -29,7 +29,7 @@ constexpr uint32_t INVALID_PID = UINT32_MAX; FullTableIterator::FullTableIterator(uint32_t tid, std::shared_ptr tables, const std::map>& tablet_clients) : tid_(tid), tables_(tables), tablet_clients_(tablet_clients), in_local_(true), cur_pid_(INVALID_PID), - it_(), kv_it_(), key_(0), last_ts_(0), last_pk_(), value_() { + it_(), kv_it_(), key_(0), last_ts_(0), last_pk_(), value_(), buffered_slices_(10) { } void FullTableIterator::SeekToFirst() {