Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions hybridse/src/udf/containers.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,31 +37,34 @@ namespace container {
/**
* Specify actual stored type, store itself for most of the primitive types.
*/
template <typename T>
// Primary template: the stored type is T itself and values are copied through unchanged.
// UseStringStorage is not consulted here; it only affects the StringRef specialization,
// where it selects between std::string and StringRef as the stored type.
template <typename T, bool UseStringStorage = false>
struct ContainerStorageTypeTrait {
using type = T;
// Identity conversion: returns a copy of the input value.
static T to_stored_value(const T& t) { return t; }
};

template <>
struct ContainerStorageTypeTrait<openmldb::base::StringRef> {
// FIXME: StringRef does not own its data, ref #2944
using type = codec::StringRef;
static codec::StringRef to_stored_value(codec::StringRef* t) {
return t == nullptr ? codec::StringRef() : *t;
// Specialization for StringRef. The stored type depends on UseStringStorage:
//   true  -> std::string (the result of StringRef::ToString())
//   false -> openmldb::base::StringRef itself
// NOTE(review): presumably the std::string mode exists so the container keeps its own
// copy of the bytes rather than a reference into external memory -- confirm.
template <bool UseStringStorage>
struct ContainerStorageTypeTrait<openmldb::base::StringRef, UseStringStorage> {
using type = std::conditional_t<UseStringStorage, std::string, openmldb::base::StringRef>;
// Converts the input pointer to the stored value.
// A null pointer maps to an empty string / a default-constructed StringRef.
static type to_stored_value(codec::StringRef* t) {
// if constexpr discards the branch that does not match `type`,
// so only the selected return statement is instantiated.
if constexpr (UseStringStorage) {
return t == nullptr ? "" : t->ToString();
} else {
return t == nullptr ? codec::StringRef() : *t;
}
}
};

template <>
struct ContainerStorageTypeTrait<openmldb::base::Date> {
// Specialization for Date: the stored type is always openmldb::base::Date;
// UseStringStorage does not change the stored type.
template <bool UseStringStorage>
struct ContainerStorageTypeTrait<openmldb::base::Date, UseStringStorage> {
using type = openmldb::base::Date;
// Dereferences the input pointer; a null pointer is stored as Date(0).
static openmldb::base::Date to_stored_value(openmldb::base::Date* t) {
return t == nullptr ? openmldb::base::Date(0) : *t;
}
};

template <>
struct ContainerStorageTypeTrait<openmldb::base::Timestamp> {
template <bool UseStringStorage>
struct ContainerStorageTypeTrait<openmldb::base::Timestamp, UseStringStorage> {
using type = openmldb::base::Timestamp;
static openmldb::base::Timestamp to_stored_value(openmldb::base::Timestamp* t) {
return t == nullptr ? openmldb::base::Timestamp(0) : *t;
Expand All @@ -75,7 +78,7 @@ class TopKContainer {
using InputT = typename DataTypeTrait<T>::CCallArgType;

// actual stored type
using StorageT = typename ContainerStorageTypeTrait<T>::type;
using StorageT = typename ContainerStorageTypeTrait<T, true>::type;

// self type
using ContainerT = TopKContainer<T, BoundT>;
Expand Down Expand Up @@ -141,7 +144,7 @@ class TopKContainer {
}

void Push(InputT t) {
auto key = ContainerStorageTypeTrait<T>::to_stored_value(t);
auto key = ContainerStorageTypeTrait<T, true>::to_stored_value(t);
auto iter = map_.find(key);
if (iter == map_.end()) {
map_.insert(iter, {key, 1});
Expand Down Expand Up @@ -198,7 +201,7 @@ struct DefaultPairCmp {
};

template <typename K, typename V,
typename StorageV = typename ContainerStorageTypeTrait<V>::type,
typename StorageV = typename ContainerStorageTypeTrait<V, true>::type,
template <typename, typename> typename PairCmp = DefaultPairCmp>
class BoundedGroupByDict {
public:
Expand All @@ -209,7 +212,7 @@ class BoundedGroupByDict {
using InputK = typename DataTypeTrait<K>::CCallArgType;
using InputV = typename DataTypeTrait<V>::CCallArgType;
// actual stored type
using StorageK = typename ContainerStorageTypeTrait<K>::type;
using StorageK = typename ContainerStorageTypeTrait<K, true>::type;

// self type
using ContainerT = BoundedGroupByDict<K, V, StorageV, PairCmp>;
Expand All @@ -218,10 +221,10 @@ class BoundedGroupByDict {

// convert to internal key and value
static inline StorageK to_stored_key(const InputK& key) {
return ContainerStorageTypeTrait<K>::to_stored_value(key);
return ContainerStorageTypeTrait<K, true>::to_stored_value(key);
}
static inline auto to_stored_value(const InputV& value) {
return ContainerStorageTypeTrait<V>::to_stored_value(value);
return ContainerStorageTypeTrait<V, true>::to_stored_value(value);
}

static void Init(ContainerT* addr) { new (addr) ContainerT(); }
Expand Down
5 changes: 3 additions & 2 deletions hybridse/src/udf/default_defs/aggregate_def.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ struct StdTemplate {
template <typename T>
struct ShannonEntropy {
using CType = typename DataTypeTrait<T>::CCallArgType;
using StorageT = typename container::ContainerStorageTypeTrait<T, true>::type;

// intermediate state: ([key -> count], total count)
using ContainerT = std::pair<std::map<T, int64_t>, int64_t>;
using ContainerT = std::pair<std::map<StorageT, int64_t>, int64_t>;

void operator()(UdafRegistryHelper& helper) { // NOLINT
std::string prefix = absl::StrCat(helper.name(), "_", DataTypeTrait<T>::to_string());
Expand All @@ -131,7 +132,7 @@ struct ShannonEntropy {
if (is_null) {
return ctr;
}
auto val = container::ContainerStorageTypeTrait<T>::to_stored_value(value);
auto val = container::ContainerStorageTypeTrait<T, true>::to_stored_value(value);
auto [it, inserted] = ctr->first.try_emplace(val, 0);
it->second++;
ctr->second++;
Expand Down
7 changes: 4 additions & 3 deletions hybridse/src/udf/default_defs/window_functions_def.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ struct NthValueWhere {
// saved value list
// if `nth > 0`, works like a ring buffer
// if `nth < 0`, holds one value at most
std::queue<std::pair<T, bool>, std::list<std::pair<T, bool>>> data;
using StorageT = typename container::ContainerStorageTypeTrait<T, true>::type;
std::queue<std::pair<StorageT, bool>, std::list<std::pair<StorageT, bool>>> data;
};

// (nth idx, [(value, value_is_null)])
Expand All @@ -168,7 +169,7 @@ struct NthValueWhere {
if (cond && !cond_is_null) {
if (ctr->nth > 0) {
// nth from window start
ctr->data.emplace(container::ContainerStorageTypeTrait<T>::to_stored_value(value), value_is_null);
ctr->data.emplace(container::ContainerStorageTypeTrait<T, true>::to_stored_value(value), value_is_null);
if (ctr->data.size() > static_cast<size_t>(ctr->nth)) {
ctr->data.pop();
}
Expand All @@ -177,7 +178,7 @@ struct NthValueWhere {
ctr->cur_idx++;
if (ctr->cur_idx + ctr->nth == 0) {
// reaches nth
ctr->data.emplace(container::ContainerStorageTypeTrait<T>::to_stored_value(value),
ctr->data.emplace(container::ContainerStorageTypeTrait<T, true>::to_stored_value(value),
value_is_null);
}
}
Expand Down
7 changes: 4 additions & 3 deletions hybridse/src/udf/default_udf_library.cc
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ struct EwAvgUdafDef {
template <typename T>
struct DistinctCountDef {
using ArgT = typename DataTypeTrait<T>::CCallArgType;
using SetT = std::unordered_set<T>;
using StorageT = typename container::ContainerStorageTypeTrait<T, true>::type;
using SetT = std::unordered_set<StorageT>;

void operator()(UdafRegistryHelper& helper) { // NOLINT
std::string suffix = ".opaque_std_set_" + DataTypeTrait<T>::to_string();
Expand All @@ -351,15 +352,15 @@ struct DistinctCountDef {
template <typename V>
struct UpdateImpl {
static SetT* update_set(SetT* set, V value) {
set->insert(value);
set->insert(container::ContainerStorageTypeTrait<V, true>::to_stored_value(value));
return set;
}
};

template <typename V>
struct UpdateImpl<V*> {
static SetT* update_set(SetT* set, V* value) {
set->insert(*value);
set->insert(container::ContainerStorageTypeTrait<V, true>::to_stored_value(value));
return set;
}
};
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/distribute_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ constexpr uint32_t INVALID_PID = UINT32_MAX;
FullTableIterator::FullTableIterator(uint32_t tid, std::shared_ptr<Tables> tables,
const std::map<uint32_t, std::shared_ptr<::openmldb::client::TabletClient>>& tablet_clients)
: tid_(tid), tables_(tables), tablet_clients_(tablet_clients), in_local_(true), cur_pid_(INVALID_PID),
it_(), kv_it_(), key_(0), last_ts_(0), last_pk_(), value_() {
it_(), kv_it_(), key_(0), last_ts_(0), last_pk_(), value_(), buffered_slices_(10) {
}

void FullTableIterator::SeekToFirst() {
Expand Down Expand Up @@ -185,7 +185,7 @@ const ::hybridse::codec::Row& FullTableIterator::GetValue() {
int8_t* copyed_row_data = reinterpret_cast<int8_t*>(malloc(sz));
memcpy(copyed_row_data, slice_row.data(), sz);
auto shared_slice = ::hybridse::base::RefCountedSlice::CreateManaged(copyed_row_data, sz);
buffered_slices_.push_back(shared_slice);
buffered_slices_.put(shared_slice);
value_.Reset(shared_slice);
return value_;
}
Expand Down
3 changes: 2 additions & 1 deletion src/catalog/distribute_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "base/hash.h"
#include "base/kv_iterator.h"
#include "base/ringqueue.h"
#include "client/tablet_client.h"
#include "storage/table.h"
#include "vm/catalog.h"
Expand Down Expand Up @@ -81,7 +82,7 @@ class FullTableIterator : public ::hybridse::codec::ConstIterator<uint64_t, ::hy
// refer to next_row_iterator() in udf.cc for the reason why we must make sure the `value_` is valid
// the call steps in next_row_iterator are: res = GetValue() -> Next() -> return res
bool valid_value_ = false;
std::vector<hybridse::base::RefCountedSlice> buffered_slices_;
base::RingQueue<hybridse::base::RefCountedSlice> buffered_slices_;
int64_t cnt_ = 0;
};

Expand Down
Loading