Skip to content

Commit f31f3c9

Browse files
authored
feat(search): Defragment search indices (#6144)
* core: Add quota check to page usage * core/search: Add utility functions to search/base * core/search: Add defragment support to blocklist,sortedvector * core/search: Introduce a defragment utility * search/core: Add defragmentation support for TagIndex * core/search: Add defragmentation support in FieldIndices * server/search: Add defragmentation support for shard doc indices * tests: Unit tests for defragmentation of indices and quota checks Signed-off-by: Abhijat Malviya <[email protected]>
1 parent ba791bf commit f31f3c9

File tree

15 files changed

+421
-16
lines changed

15 files changed

+421
-16
lines changed

src/core/page_usage/page_usage_stats.cc

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
#include <string>
1414

15+
#include "base/cycle_clock.h"
16+
1517
extern "C" {
1618
#include <unistd.h>
1719

@@ -156,8 +158,11 @@ CollectedPageStats PageUsage::UniquePages::CollectedStats() const {
156158
.shard_wide_summary = {}};
157159
}
158160

159-
PageUsage::PageUsage(CollectPageStats collect_stats, float threshold)
160-
: collect_stats_{collect_stats}, threshold_{threshold} {
161+
PageUsage::PageUsage(CollectPageStats collect_stats, float threshold, uint64_t quota_usec)
162+
: collect_stats_{collect_stats},
163+
threshold_{threshold},
164+
quota_cycles_{quota_usec == kMaxQuota ? kMaxQuota : base::CycleClock::FromUsec(quota_usec)},
165+
start_cycles_(base::CycleClock::Now()) {
161166
}
162167

163168
bool PageUsage::IsPageForObjectUnderUtilized(void* object) {
@@ -179,4 +184,12 @@ bool PageUsage::ConsumePageStats(mi_page_usage_stats_t stat) {
179184
return force_reallocate_ || should_reallocate;
180185
}
181186

187+
bool PageUsage::QuotaDepleted() const {
188+
if (quota_cycles_ == kMaxQuota) {
189+
return false;
190+
}
191+
192+
return base::CycleClock::Now() - start_cycles_ >= quota_cycles_;
193+
}
194+
182195
} // namespace dfly

src/core/page_usage/page_usage_stats.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ struct CollectedPageStats {
4242

4343
class PageUsage {
4444
public:
45-
PageUsage(CollectPageStats collect_stats, float threshold);
45+
static constexpr uint64_t kMaxQuota = std::numeric_limits<uint64_t>::max();
46+
47+
PageUsage(CollectPageStats collect_stats, float threshold, uint64_t quota_usec = kMaxQuota);
4648

4749
bool IsPageForObjectUnderUtilized(void* object);
4850

@@ -66,6 +68,8 @@ class PageUsage {
6668
force_reallocate_ = force_reallocate;
6769
}
6870

71+
bool QuotaDepleted() const;
72+
6973
private:
7074
CollectPageStats collect_stats_{CollectPageStats::NO};
7175
float threshold_;
@@ -93,6 +97,9 @@ class PageUsage {
9397

9498
// For use in testing, forces reallocate check to always return true
9599
bool force_reallocate_{false};
100+
101+
uint64_t quota_cycles_;
102+
uint64_t start_cycles_;
96103
};
97104

98105
} // namespace dfly

src/core/page_usage_stats_test.cc

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,14 @@
1212
#include "core/compact_object.h"
1313
#include "core/qlist.h"
1414
#include "core/score_map.h"
15+
#include "core/search/block_list.h"
16+
#include "core/search/search.h"
1517
#include "core/small_string.h"
1618
#include "core/sorted_map.h"
1719
#include "core/string_map.h"
1820
#include "core/string_set.h"
1921
#include "redis/redis_aux.h"
22+
#include "util/fibers/fibers.h"
2023

2124
extern "C" {
2225
#include "redis/zmalloc.h"
@@ -25,6 +28,7 @@ extern "C" {
2528
ABSL_DECLARE_FLAG(bool, experimental_flat_json);
2629

2730
using namespace dfly;
31+
using namespace std::chrono_literals;
2832

2933
class PageUsageStatsTest : public ::testing::Test {
3034
protected:
@@ -213,3 +217,197 @@ TEST_F(PageUsageStatsTest, JSONCons) {
213217
EXPECT_EQ(json_obj->at("count").as_integer<uint8_t>(), 1);
214218
EXPECT_EQ(json_obj->at("checked").as_bool(), false);
215219
}
220+
221+
TEST_F(PageUsageStatsTest, QuotaChecks) {
222+
{
223+
PageUsage p{CollectPageStats::NO, 0};
224+
EXPECT_FALSE(p.QuotaDepleted());
225+
}
226+
{
227+
PageUsage p{CollectPageStats::NO, 0, 4};
228+
util::ThisFiber::SleepFor(5us);
229+
EXPECT_TRUE(p.QuotaDepleted());
230+
}
231+
}
232+
233+
TEST_F(PageUsageStatsTest, BlockList) {
234+
search::BlockList<search::SortedVector<search::DocId>> bl{&m_, 20};
235+
PageUsage p{CollectPageStats::NO, 0.1};
236+
p.SetForceReallocate(true);
237+
238+
// empty list
239+
auto result = bl.Defragment(&p);
240+
EXPECT_FALSE(result.quota_depleted);
241+
EXPECT_EQ(result.objects_moved, 0);
242+
243+
// single item will move twice, once for the blocklist and once for the sorted vector
244+
bl.Insert(1);
245+
result = bl.Defragment(&p);
246+
EXPECT_FALSE(result.quota_depleted);
247+
EXPECT_EQ(result.objects_moved, 2);
248+
249+
// quota depleted without defragmentation
250+
PageUsage p_zero{CollectPageStats::NO, 0.1, 0};
251+
p_zero.SetForceReallocate(true);
252+
result = bl.Defragment(&p_zero);
253+
EXPECT_TRUE(result.quota_depleted);
254+
EXPECT_EQ(result.objects_moved, 0);
255+
}
256+
257+
TEST_F(PageUsageStatsTest, BlockListDefragmentResumes) {
258+
search::BlockList<search::SortedVector<search::DocId>> bl{&m_, 20};
259+
PageUsage p{CollectPageStats::NO, 0.1};
260+
p.SetForceReallocate(true);
261+
262+
for (size_t i = 0; i < 1000; ++i) {
263+
bl.Insert(i);
264+
}
265+
266+
PageUsage p_small_quota{CollectPageStats::NO, 0.1, 10};
267+
p_small_quota.SetForceReallocate(true);
268+
util::ThisFiber::SleepFor(10us);
269+
auto result = bl.Defragment(&p_small_quota);
270+
EXPECT_TRUE(result.quota_depleted);
271+
EXPECT_GE(result.objects_moved, 0);
272+
273+
result = bl.Defragment(&p);
274+
EXPECT_FALSE(result.quota_depleted);
275+
EXPECT_GT(result.objects_moved, 0);
276+
}
277+
278+
TEST_F(PageUsageStatsTest, BlockListWithPairs) {
279+
search::BlockList<search::SortedVector<std::pair<search::DocId, double>>> bl{&m_, 20};
280+
PageUsage p{CollectPageStats::NO, 0.1};
281+
p.SetForceReallocate(true);
282+
283+
for (size_t i = 0; i < 100; ++i) {
284+
bl.Insert({i, i * 1.1});
285+
}
286+
287+
const auto result = bl.Defragment(&p);
288+
EXPECT_FALSE(result.quota_depleted);
289+
EXPECT_GT(result.objects_moved, 0);
290+
}
291+
292+
TEST_F(PageUsageStatsTest, BlockListWithNonDefragmentableContainer) {
293+
search::BlockList<search::CompressedSortedSet> bl{&m_, 20};
294+
PageUsage p{CollectPageStats::NO, 0.1};
295+
p.SetForceReallocate(true);
296+
297+
// empty list
298+
auto result = bl.Defragment(&p);
299+
EXPECT_FALSE(result.quota_depleted);
300+
EXPECT_EQ(result.objects_moved, 0);
301+
302+
// will reallocate once for the blocklist, the inner sorted set will be skipped
303+
bl.Insert(1);
304+
result = bl.Defragment(&p);
305+
EXPECT_FALSE(result.quota_depleted);
306+
EXPECT_EQ(result.objects_moved, 1);
307+
}
308+
309+
class MockDocument final : public search::DocumentAccessor {
310+
public:
311+
MockDocument() {
312+
words.reserve(1000);
313+
for (size_t i = 0; i < 1000; ++i) {
314+
words.push_back(absl::StrFormat("word-%d", i));
315+
}
316+
}
317+
318+
std::optional<StringList> GetStrings(std::string_view active_field) const override {
319+
return {{words[absl::GetCurrentTimeNanos() % words.size()]}};
320+
}
321+
std::optional<VectorInfo> GetVector(std::string_view active_field) const override {
322+
return std::nullopt;
323+
}
324+
std::optional<NumsList> GetNumbers(std::string_view active_field) const override {
325+
return {{1, 2, 3, 4}};
326+
}
327+
std::optional<StringList> GetTags(std::string_view active_field) const override {
328+
return {{words[absl::GetCurrentTimeNanos() % words.size()]}};
329+
}
330+
331+
std::vector<std::string> words;
332+
};
333+
334+
TEST_F(PageUsageStatsTest, DefragmentTagIndex) {
335+
search::Schema schema;
336+
schema.fields["field_name"] =
337+
search::SchemaField{search::SchemaField::TAG, 0, "fn", search::SchemaField::TagParams{}};
338+
search::FieldIndices index{schema, {}, &m_, nullptr};
339+
340+
PageUsage p{CollectPageStats::NO, 0.1};
341+
p.SetForceReallocate(true);
342+
343+
// Empty index
344+
search::DefragmentResult result = index.Defragment(&p);
345+
EXPECT_FALSE(result.quota_depleted);
346+
EXPECT_EQ(result.objects_moved, 0);
347+
348+
const MockDocument md;
349+
index.Add(1, md);
350+
351+
result = index.Defragment(&p);
352+
EXPECT_FALSE(result.quota_depleted);
353+
// single doc with single term returned by `GetTags` should result in two reallocations.
354+
EXPECT_EQ(result.objects_moved, 2);
355+
356+
PageUsage p_zero{CollectPageStats::NO, 0.1, 0};
357+
p_zero.SetForceReallocate(true);
358+
result = index.Defragment(&p_zero);
359+
EXPECT_TRUE(result.quota_depleted);
360+
EXPECT_EQ(result.objects_moved, 0);
361+
}
362+
363+
TEST_F(PageUsageStatsTest, TagIndexDefragResumeWithChanges) {
364+
search::Schema schema;
365+
schema.fields["field_name"] =
366+
search::SchemaField{search::SchemaField::TAG, 0, "fn", search::SchemaField::TagParams{}};
367+
search::FieldIndices index{schema, {}, &m_, nullptr};
368+
369+
PageUsage p{CollectPageStats::NO, 0.1};
370+
p.SetForceReallocate(true);
371+
372+
const MockDocument md;
373+
for (size_t i = 0; i < 100; ++i) {
374+
index.Add(i, md);
375+
}
376+
377+
PageUsage p_small_quota{CollectPageStats::NO, 0.1, 10};
378+
p_small_quota.SetForceReallocate(true);
379+
search::DefragmentResult result = index.Defragment(&p_small_quota);
380+
EXPECT_TRUE(result.quota_depleted);
381+
EXPECT_GE(result.objects_moved, 0);
382+
383+
index.Remove(99, md);
384+
385+
for (size_t i = 200; i < 300; ++i) {
386+
index.Add(i, md);
387+
}
388+
389+
result = index.Defragment(&p);
390+
EXPECT_FALSE(result.quota_depleted);
391+
EXPECT_GT(result.objects_moved, 0);
392+
}
393+
394+
TEST_F(PageUsageStatsTest, DefragmentIndexWithNonDefragmentableFields) {
395+
search::Schema schema;
396+
schema.fields["text"] =
397+
search::SchemaField{search::SchemaField::TEXT, 0, "fn", search::SchemaField::TextParams{}};
398+
schema.fields["num"] = search::SchemaField{search::SchemaField::NUMERIC, 0, "fn",
399+
search::SchemaField::NumericParams{}};
400+
search::IndicesOptions options{{}};
401+
search::FieldIndices index{schema, options, &m_, nullptr};
402+
403+
PageUsage p{CollectPageStats::NO, 0.1};
404+
p.SetForceReallocate(true);
405+
406+
const MockDocument md;
407+
index.Add(1, md);
408+
409+
// Unsupported index types will skip defragmenting themselves
410+
const search::DefragmentResult result = index.Defragment(&p);
411+
EXPECT_FALSE(result.quota_depleted);
412+
EXPECT_EQ(result.objects_moved, 0);
413+
}

src/core/search/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ add_library(dfly_search_core ast_expr.cc base.cc hnsw_index.cc query_driver.cc s
99
indices.cc sort_indices.cc vector_utils.cc compressed_sorted_set.cc block_list.cc
1010
range_tree.cc synonyms.cc ${gen_dir}/parser.cc ${gen_dir}/lexer.cc)
1111

12-
target_link_libraries(dfly_search_core base redis_lib absl::strings
12+
target_link_libraries(dfly_search_core dfly_page_usage base redis_lib absl::strings
1313
TRDP::reflex TRDP::uni-algo TRDP::hnswlib Boost::headers)
1414

1515
if(WITH_SIMSIMD)

src/core/search/base.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,10 @@ std::optional<double> ParseNumericField(std::string_view value) {
2525
return std::nullopt;
2626
}
2727

28+
DefragmentResult& DefragmentResult::Merge(DefragmentResult&& other) {
29+
quota_depleted |= other.quota_depleted;
30+
objects_moved += other.objects_moved;
31+
return *this;
32+
}
33+
2834
} // namespace dfly::search

src/core/search/base.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,18 @@
1414
#include <string_view>
1515
#include <vector>
1616

17+
namespace dfly {
18+
class PageUsage;
19+
}
20+
1721
namespace dfly::search {
1822

23+
struct DefragmentResult {
24+
bool quota_depleted{false};
25+
size_t objects_moved{0};
26+
DefragmentResult& Merge(DefragmentResult&& other);
27+
};
28+
1929
using DocId = uint32_t;
2030
using GlobalDocId = uint64_t;
2131
using ShardId = uint16_t;
@@ -101,6 +111,11 @@ struct BaseIndex {
101111
Some indices may need to finalize internal structures. See RangeTree for example. */
102112
virtual void FinalizeInitialization() {
103113
}
114+
115+
// Defragments the index by moving objects in underutilized pages to the current malloc page.
116+
virtual DefragmentResult Defragment(PageUsage* page_usage) {
117+
return DefragmentResult{.quota_depleted = false, .objects_moved = 0};
118+
}
104119
};
105120

106121
// Base class for type-specific sorting indices.

0 commit comments

Comments
 (0)