Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add merge() method to ApproxMostFrequentStreamSummary as an optimization over mergeSerialized() #12236

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions velox/functions/lib/ApproxMostFrequentStreamSummary.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ struct ApproxMostFrequentStreamSummary {
/// Merge this summary with values from another serialized summary.
void mergeSerialized(const char* bytes);

/// Merge this summary with values from another deserialized summary.
/// This behaves the same as if serializing `other` and calling
/// `mergeSerialized`, except:
/// - StringView would copied from the `other` data structure's StringViews
/// as opposed to `const char* bytes`
///
/// Prefer `mergeSerialized`. This is more inefficient as it requires us to
/// deserialize `other` first, and then copy the values; whereas
/// `mergeSerialized` will directly merge the values from the serialized bytes
void merge(const ApproxMostFrequentStreamSummary<T, Allocator>& other);

/// Return the number of distinct values currently being tracked.
int size() const;

Expand Down Expand Up @@ -242,4 +253,12 @@ void ApproxMostFrequentStreamSummary<T, A>::mergeSerialized(const char* other) {
}
}

template <typename T, typename A>
void ApproxMostFrequentStreamSummary<T, A>::merge(
const ApproxMostFrequentStreamSummary<T, A>& other) {
for (int i = 0; i < other.size(); ++i) {
insert(other.values()[i], other.counts()[i]);
}
}

} // namespace facebook::velox::functions
25 changes: 25 additions & 0 deletions velox/functions/lib/tests/ApproxMostFrequentStreamSummaryTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,31 @@ TEST(ApproxMostFrequentStreamSummaryTest, mergeSerialized) {
}
}

TEST(ApproxMostFrequentStreamSummaryTest, merge) {
constexpr int kSummaryCount = 10;
constexpr int kCapacity = 30;
std::default_random_engine gen(0);
ZetaDistribution dist(1.02, 100);
int64_t freq[101]{};
ApproxMostFrequentStreamSummary<int> summary;
summary.setCapacity(kCapacity);
for (int i = 0; i < kSummaryCount; ++i) {
ApproxMostFrequentStreamSummary<int> summary2;
summary2.setCapacity(kCapacity);
for (int j = 0; j < 100; ++j) {
int v = dist(gen);
summary2.insert(v);
++freq[v];
}
summary.merge(summary2);
}
auto topK = summary.topK(3);
ASSERT_EQ(topK.size(), 3);
for (int i = 0; i < 3; ++i) {
EXPECT_EQ(topK[i], std::make_pair(i + 1, freq[i + 1]));
}
}

TEST(ApproxMostFrequentStreamSummaryTest, capacity) {
constexpr int kCapacity = 30;
ApproxMostFrequentStreamSummary<int> summary;
Expand Down