Add merge() method to ApproxMostFrequentStreamSummary as an optimization over mergeSerialized() (#12236)

Summary:

# Context

This diff adds a merge() method that behaves equivalently to mergeSerialized().

It is added because it avoids the extra serialization + deserialization round trip that calling mergeSerialized() requires.

# Rationale

The rationale is that the serialize function simply calls values()[i] and priorities()[i], and `memcpy`'s the integers.

Then when we deserialize, we also simply take the values as-is and re-insert them into the target data structure.

So rather than serializing and deserializing to perform the exact same operation, we can just copy directly.
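
To make the equivalence concrete, here is a minimal sketch using a toy summary. `ToySummary`, its byte layout, and `directMergeMatchesRoundTrip` are hypothetical names invented for illustration. They are not the Velox class or its wire format, and the toy ignores capacity and eviction entirely. It only shows that re-inserting (value, count) pairs directly yields the same state as writing them to bytes and reading them back.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <map>
#include <vector>

// Hypothetical toy stand-in for a stream summary; NOT the Velox class.
struct ToySummary {
  std::map<int32_t, int64_t> counts;

  void insert(int32_t value, int64_t count = 1) {
    counts[value] += count;
  }

  // Byte layout assumed for illustration only: [n][values...][counts...].
  std::vector<char> serialize() const {
    const int32_t n = static_cast<int32_t>(counts.size());
    std::vector<char> bytes(
        sizeof(n) + n * (sizeof(int32_t) + sizeof(int64_t)));
    char* out = bytes.data();
    std::memcpy(out, &n, sizeof(n));
    out += sizeof(n);
    for (const auto& entry : counts) {
      std::memcpy(out, &entry.first, sizeof(entry.first));
      out += sizeof(entry.first);
    }
    for (const auto& entry : counts) {
      std::memcpy(out, &entry.second, sizeof(entry.second));
      out += sizeof(entry.second);
    }
    return bytes;
  }

  // Reads the pairs back out of the buffer and re-inserts them as-is.
  void mergeSerialized(const char* bytes) {
    int32_t n;
    std::memcpy(&n, bytes, sizeof(n));
    const char* valuesPtr = bytes + sizeof(n);
    const char* countsPtr = valuesPtr + n * sizeof(int32_t);
    for (int32_t i = 0; i < n; ++i) {
      int32_t value;
      int64_t count;
      std::memcpy(&value, valuesPtr + i * sizeof(value), sizeof(value));
      std::memcpy(&count, countsPtr + i * sizeof(count), sizeof(count));
      insert(value, count);
    }
  }

  // Same re-insertion, without the intermediate buffer.
  void merge(const ToySummary& other) {
    for (const auto& entry : other.counts) {
      insert(entry.first, entry.second);
    }
  }
};

// Returns true if both merge paths leave the target in the same state.
bool directMergeMatchesRoundTrip() {
  ToySummary source;
  for (int32_t v : {1, 1, 2, 3, 3, 3}) {
    source.insert(v);
  }
  ToySummary viaBytes;
  ToySummary direct;
  viaBytes.insert(3);
  direct.insert(3);

  const std::vector<char> bytes = source.serialize();
  viaBytes.mergeSerialized(bytes.data());
  direct.merge(source);
  return viaBytes.counts == direct.counts;
}
```

In this toy the two paths differ only in whether a temporary buffer is allocated and filled, which is the cost the new method is meant to avoid.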

There are a couple of considerations:
1. **StringView** - Because we do not serialize, each velox::StringView copied into this summary still points at the same string data that the `other` ApproxMostFrequentStreamSummary points to, which means *the underlying string data must stay alive for as long as either structure uses it, even after `other` is destroyed*. This seems acceptable, but users of this class must account for it.
2. **StringView equality** - The previous behavior appears to be correct: insert looks up the index in the priority queue and uses operator== on StringView, which should compare contents. So if two `StringView`s point to equal strings, they are treated as the same value and this won't be a problem.
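
Both considerations can be sketched with `std::string_view` standing in for velox::StringView. This substitution is an assumption made so the example is self-contained, and `ToyViewSummary` and `mergedCountFor` are hypothetical names, not Velox APIs. The sketch shows that views over different buffers with equal contents collapse into one entry, and that a view copied during the merge stays valid only while the buffer behind it is alive.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <string_view>
#include <unordered_map>

// Hypothetical toy summary keyed by non-owning views; NOT the Velox class.
struct ToyViewSummary {
  std::unordered_map<std::string_view, int64_t> counts;

  void insert(std::string_view value, int64_t count = 1) {
    // Hashing and equality use the characters, not the pointer.
    counts[value] += count;
  }

  // Copies the views themselves (pointer + length), never the characters.
  void merge(const ToyViewSummary& other) {
    for (const auto& entry : other.counts) {
      insert(entry.first, entry.second);
    }
  }
};

int64_t mergedCountFor(std::string_view key) {
  // These buffers own the characters and outlive both summaries.
  std::string bufferA = "apple";
  std::string bufferB = "apple"; // different buffer, same contents
  std::string bufferC = "banana";

  ToyViewSummary target;
  target.insert(bufferA);
  {
    ToyViewSummary other;
    other.insert(bufferB);
    other.insert(bufferC);
    target.merge(other);
  } // `other` is destroyed here; the buffers are not.

  // `target` now holds a view into bufferC that it obtained from `other`.
  // Reading it is safe only because bufferC is still alive at this point.
  return target.counts.at(key);
}
```

With these inputs, `mergedCountFor("apple")` yields 2 and `mergedCountFor("banana")` yields 1. Had `bufferC` been declared inside the inner scope, the "banana" view in `target` would dangle once that scope ended, which is the lifetime hazard the first consideration describes.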

Differential Revision: D68995835
Yiyang Chen authored and facebook-github-bot committed Feb 3, 2025
1 parent 2e316dd commit 39458ef
Showing 2 changed files with 41 additions and 0 deletions.
16 changes: 16 additions & 0 deletions velox/functions/lib/ApproxMostFrequentStreamSummary.h
@@ -65,6 +65,14 @@ struct ApproxMostFrequentStreamSummary {
  /// Merge this summary with values from another serialized summary.
  void mergeSerialized(const char* bytes);

  /// Merge this summary with values from another deserialized summary.
  /// This behaves the same as if serializing `other` and calling
  /// `mergeSerialized`, except any StringView in `other` that are added to this
  /// ApproxMostFrequentStreamSummary will have pointers copied; whereas
  /// `mergeSerialized` would be pointing at a position within the provided
  /// `const char* bytes`.
  void merge(const ApproxMostFrequentStreamSummary<T, Allocator>& other);

  /// Return the number of distinct values currently being tracked.
  int size() const;

@@ -242,4 +250,12 @@ void ApproxMostFrequentStreamSummary<T, A>::mergeSerialized(const char* other) {
  }
}

template <typename T, typename A>
void ApproxMostFrequentStreamSummary<T, A>::merge(
    const ApproxMostFrequentStreamSummary<T, A>& other) {
  for (int i = 0; i < other.size(); ++i) {
    insert(other.values()[i], other.counts()[i]);
  }
}

} // namespace facebook::velox::functions
25 changes: 25 additions & 0 deletions velox/functions/lib/tests/ApproxMostFrequentStreamSummaryTest.cpp
@@ -184,6 +184,31 @@ TEST(ApproxMostFrequentStreamSummaryTest, mergeSerialized) {
  }
}

TEST(ApproxMostFrequentStreamSummaryTest, merge) {
  constexpr int kSummaryCount = 10;
  constexpr int kCapacity = 30;
  std::default_random_engine gen(0);
  ZetaDistribution dist(1.02, 100);
  int64_t freq[101]{};
  ApproxMostFrequentStreamSummary<int> summary;
  summary.setCapacity(kCapacity);
  for (int i = 0; i < kSummaryCount; ++i) {
    ApproxMostFrequentStreamSummary<int> summary2;
    summary2.setCapacity(kCapacity);
    for (int j = 0; j < 100; ++j) {
      int v = dist(gen);
      summary2.insert(v);
      ++freq[v];
    }
    summary.merge(summary2);
  }
  auto topK = summary.topK(3);
  ASSERT_EQ(topK.size(), 3);
  for (int i = 0; i < 3; ++i) {
    EXPECT_EQ(topK[i], std::make_pair(i + 1, freq[i + 1]));
  }
}

TEST(ApproxMostFrequentStreamSummaryTest, capacity) {
  constexpr int kCapacity = 30;
  ApproxMostFrequentStreamSummary<int> summary;