Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dead Page Reference Count Bug Fix #181

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/llfs/committable_page_cache_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,12 @@ Status CommittablePageCacheJob::commit_impl(const JobCommitParams& params, u64 c
const PageCacheJob* job = this->job_.get();
BATT_CHECK_NOT_NULLPTR(job);

LLFS_VLOG(1) << "commit(PageCacheJob): entered";
if (durable_caller_slot) {
LLFS_VLOG(1) << "commit(PageCacheJob): entered" << BATT_INSPECT(prev_caller_slot)
<< BATT_INSPECT(*durable_caller_slot);
} else {
LLFS_VLOG(1) << "commit(PageCacheJob): entered" << BATT_INSPECT(prev_caller_slot);
}

// Make sure the job is pruned!
//
Expand Down Expand Up @@ -426,7 +431,7 @@ auto CommittablePageCacheJob::start_ref_count_updates(const JobCommitParams& par
PageRefCountUpdates& updates, u64 /*callers*/)
-> StatusOr<DeadPages>
{
LLFS_VLOG(1) << "commit(PageCacheJob): updating ref counts";
LLFS_VLOG(1) << "commit(PageCacheJob): updating ref counts" << BATT_INSPECT(params.caller_slot);

DeadPages dead_pages;

Expand Down Expand Up @@ -579,10 +584,11 @@ Status CommittablePageCacheJob::recycle_dead_pages(const JobCommitParams& params

BATT_ASSIGN_OK_RESULT(
slot_offset_type recycler_sync_point,
params.recycler.recycle_pages(as_slice(dead_pages.ids), params.recycle_grant,
params.recycle_depth + 1));
params.recycler.recycle_pages(as_slice(dead_pages.ids), params.caller_slot,
params.recycle_grant, params.recycle_depth + 1));

LLFS_VLOG(1) << "commit(PageCacheJob): waiting for PageRecycler sync point";
LLFS_VLOG(1) << "commit(PageCacheJob): waiting for PageRecycler sync point"
<< BATT_INSPECT(params.caller_slot);

return params.recycler.await_flush(recycler_sync_point);
//
Expand Down
18 changes: 11 additions & 7 deletions src/llfs/page_allocator_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,11 +458,14 @@ PageAllocatorState::ProposalStatus PageAllocatorState::propose(PackedPageAllocat
// If this is a valid proposal that will cause state change, go through and change the deltas to
// the new ref count values.
//
LLFS_VLOG(1) << "propose (start): txn-ref-count= " << txn->ref_counts.size();
if (status == ProposalStatus::kValid) {
for (PackedPageRefCount& prc : txn->ref_counts) {
prc.ref_count = this->calculate_new_ref_count(prc);
LLFS_VLOG(1) << "reference count: " << prc;
prc.ref_count = this->calculate_new_ref_count(prc, *user_index);
}
}
LLFS_VLOG(1) << "propose (end): txn-ref-count= " << txn->ref_counts.size();

return status;
}
Expand Down Expand Up @@ -556,23 +559,23 @@ namespace {
void run_ref_count_update_sanity_checks(const PageIdFactory& id_factory,
const PackedPageRefCount& delta,
const PageAllocatorRefCount& obj, i32 old_count,
i32 new_count)
i32 new_count, const u32 index)
{
const auto debug_info = [&](std::ostream& out) {
const page_generation_int delta_generation = id_factory.get_generation(delta.page_id.unpack());

out << "(" << BATT_INSPECT(delta) << BATT_INSPECT(obj) << BATT_INSPECT(old_count)
<< BATT_INSPECT(new_count) << ")" << BATT_INSPECT(delta_generation);
<< BATT_INSPECT(new_count) << ")" << BATT_INSPECT(delta_generation) << BATT_INSPECT(index);
};

LLFS_VLOG(2) << debug_info;
LLFS_VLOG(2) << "rrcusc: " << debug_info;

BATT_CHECK_GE(old_count, 0) << "ref counts must be non-negative" << debug_info;
BATT_CHECK_GE(new_count, 0) << "ref counts must be non-negative" << debug_info;

if (old_count == new_count) {
BATT_CHECK_EQ(delta.ref_count, 0)
<< "delta was non-zero but count did not change" << debug_info;
<< "delta was non-zero but count did not change (bbora)" << debug_info;
return;
}

Expand Down Expand Up @@ -639,7 +642,8 @@ void run_ref_count_update_sanity_checks(const PageIdFactory& id_factory,

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
i32 PageAllocatorState::calculate_new_ref_count(const PackedPageRefCount& delta) const
i32 PageAllocatorState::calculate_new_ref_count(const PackedPageRefCount& delta,
const u32 index) const
{
const PageId page_id = delta.page_id.unpack();
const page_id_int physical_page = this->page_ids_.get_physical_page(page_id);
Expand All @@ -662,7 +666,7 @@ i32 PageAllocatorState::calculate_new_ref_count(const PackedPageRefCount& delta)
new_count = old_count + delta.ref_count;
}

run_ref_count_update_sanity_checks(this->page_ids_, delta, obj, old_count, new_count);
run_ref_count_update_sanity_checks(this->page_ids_, delta, obj, old_count, new_count, index);

return new_count;
}
Expand Down
2 changes: 1 addition & 1 deletion src/llfs/page_allocator_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ class PageAllocatorState : public PageAllocatorStateNoLock

// Returns the new ref count that will result from applying the delta to the passed obj.
//
i32 calculate_new_ref_count(const PackedPageRefCount& delta) const;
i32 calculate_new_ref_count(const PackedPageRefCount& delta, const u32 index) const;

/** \brief If the given ref count object has a positive ref count but *is* in the free pool, then
* this function removes it; otherwise if the object has a zero ref count but is *not* in the free
Expand Down
78 changes: 65 additions & 13 deletions src/llfs/page_recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ StatusOr<SlotRange> refresh_recycler_info_slot(TypedSlotWriter<PageRecycleEvent>
{
initialize_status_codes();

LLFS_VLOG(1) << "PageRecycler:recover() start" << BATT_INSPECT(name);
PageRecyclerRecoveryVisitor visitor{default_options};

// Read the log, scanning its contents.
Expand Down Expand Up @@ -153,17 +154,20 @@ StatusOr<SlotRange> refresh_recycler_info_slot(TypedSlotWriter<PageRecycleEvent>

state->bulk_load(as_slice(visitor.recovered_pages()));

return std::unique_ptr<PageRecycler>{new PageRecycler(scheduler, std::string{name}, page_deleter,
std::move(*recovered_log),
std::move(latest_batch), std::move(state))};
LLFS_VLOG(1) << "PageRecycler:recover() end" << BATT_INSPECT(name);

return std::unique_ptr<PageRecycler>{
new PageRecycler(scheduler, std::string{name}, page_deleter, std::move(*recovered_log),
std::move(latest_batch), std::move(state), visitor.largest_offset())};
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
PageRecycler::PageRecycler(batt::TaskScheduler& scheduler, const std::string& name,
PageDeleter& page_deleter, std::unique_ptr<LogDevice>&& wal_device,
Optional<Batch>&& recovered_batch,
std::unique_ptr<PageRecycler::State>&& state) noexcept
std::unique_ptr<PageRecycler::State>&& state,
u64 largest_offset_as_unique_identifier_init) noexcept
: scheduler_{scheduler}
, name_{name}
, page_deleter_{page_deleter}
Expand All @@ -177,6 +181,7 @@ PageRecycler::PageRecycler(batt::TaskScheduler& scheduler, const std::string& na
, recycle_task_{}
, metrics_{}
, prepared_batch_{std::move(recovered_batch)}
, largest_offset_as_unique_identifier_{largest_offset_as_unique_identifier_init}
{
const PageRecyclerOptions& options = this->state_.no_lock().options;

Expand All @@ -202,6 +207,34 @@ PageRecycler::PageRecycler(batt::TaskScheduler& scheduler, const std::string& na
#undef ADD_METRIC_
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
/** \brief Returns the process-wide exported metrics for the PageRecycler submodule.
 *
 * This metric collection is currently used by the test suite to assess the execution behavior of
 * internal flows.  The metrics are static (rather than per-instance) so that any user-level code
 * can access them.
 *
 * Registration with the global metric registry happens exactly once, on first call; the
 * function-local static guarantees thread-safe, lazy initialization (C++11 "magic static").
 *
 * \return A reference to the singleton MetricsExported instance.
 */
auto PageRecycler::metrics_export() -> MetricsExported&
{
  // Capture-less lambda (the original `[&]` captured nothing); the inner static is named
  // distinctly to avoid shadowing the enclosing reference variable.
  static MetricsExported& instance = []() -> MetricsExported& {
    static MetricsExported exported_metrics;

    LOG(INFO) << "Registering PageRecycler metrics...";
    const auto metric_name = [](std::string_view property) {
      return batt::to_string("PageRecycler_", property);
    };

#define ADD_METRIC_(n) global_metric_registry().add(metric_name(#n), exported_metrics.n)

    ADD_METRIC_(page_id_deletion_reissue);

#undef ADD_METRIC_

    return exported_metrics;
  }();

  return instance;
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
PageRecycler::~PageRecycler() noexcept
Expand Down Expand Up @@ -270,20 +303,33 @@ void PageRecycler::join()

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
StatusOr<slot_offset_type> PageRecycler::recycle_pages(const Slice<const PageId>& page_ids,
batt::Grant* grant, i32 depth)
StatusOr<slot_offset_type> PageRecycler::recycle_pages(
const Slice<const PageId>& page_ids, llfs::slot_offset_type offset_as_unique_identifier,
batt::Grant* grant, i32 depth)
{
BATT_CHECK_GE(depth, 0);

LLFS_VLOG(1) << "PageRecycler::recycle_pages(page_ids=" << batt::dump_range(page_ids) << "["
<< page_ids.size() << "]"
<< ", grant=[" << (grant ? grant->size() : usize{0}) << "], depth=" << depth << ") "
<< this->name_;
<< this->name_ << BATT_INSPECT(offset_as_unique_identifier);

if (page_ids.empty()) {
return this->wal_device_->slot_range(LogReadMode::kDurable).upper_bound;
}

// Check to see if we have already seen this or newer request.
//
if (this->largest_offset_as_unique_identifier_ >= offset_as_unique_identifier) {
this->metrics_export().page_id_deletion_reissue.fetch_add(1);

return this->wal_device_->slot_range(LogReadMode::kDurable).upper_bound;
}

// Update the largest unique identifier.
//
this->largest_offset_as_unique_identifier_ = offset_as_unique_identifier;

Optional<slot_offset_type> sync_point = None;

if (depth == 0) {
Expand All @@ -297,6 +343,7 @@ StatusOr<slot_offset_type> PageRecycler::recycle_pages(const Slice<const PageId>

const PageRecyclerOptions& options = this->state_.no_lock().options;

LLFS_VLOG(1) << "recycle_pages entered grant==NULL case";
for (PageId page_id : page_ids) {
StatusOr<batt::Grant> local_grant = [&] {
const usize needed_size = options.insert_grant_size();
Expand All @@ -319,8 +366,9 @@ StatusOr<slot_offset_type> PageRecycler::recycle_pages(const Slice<const PageId>
BATT_REQUIRE_OK(local_grant);
{
auto locked_state = this->state_.lock();
StatusOr<slot_offset_type> append_slot =
this->insert_to_log(*local_grant, page_id, depth, locked_state);
// Writing to recycler log.
StatusOr<slot_offset_type> append_slot = this->insert_to_log(
*local_grant, page_id, depth, offset_as_unique_identifier, locked_state);
BATT_REQUIRE_OK(append_slot);

clamp_min_slot(&sync_point, *append_slot);
Expand All @@ -329,10 +377,13 @@ StatusOr<slot_offset_type> PageRecycler::recycle_pages(const Slice<const PageId>
} else {
BATT_CHECK_LT(depth, (i32)kMaxPageRefDepth) << BATT_INSPECT_RANGE(page_ids);

LLFS_VLOG(1) << "recycle_pages entered the valid grant case";

auto locked_state = this->state_.lock();
for (PageId page_id : page_ids) {
// Writing to recycler log.
StatusOr<slot_offset_type> append_slot =
this->insert_to_log(*grant, page_id, depth, locked_state);
this->insert_to_log(*grant, page_id, depth, offset_as_unique_identifier, locked_state);
BATT_REQUIRE_OK(append_slot);

clamp_min_slot(&sync_point, *append_slot);
Expand All @@ -348,13 +399,13 @@ StatusOr<slot_offset_type> PageRecycler::recycle_pages(const Slice<const PageId>
StatusOr<slot_offset_type> PageRecycler::recycle_page(PageId page_id, batt::Grant* grant, i32 depth)
{
std::array<PageId, 1> page_ids{page_id};
return this->recycle_pages(batt::as_slice(page_ids), grant, depth);
return this->recycle_pages(batt::as_slice(page_ids), 0, grant, depth);
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
StatusOr<slot_offset_type> PageRecycler::insert_to_log(
batt::Grant& grant, PageId page_id, i32 depth,
batt::Grant& grant, PageId page_id, i32 depth, slot_offset_type offset_as_unique_identifier,
batt::Mutex<std::unique_ptr<State>>::Lock& locked_state)
{
BATT_CHECK(locked_state.is_held());
Expand All @@ -370,6 +421,7 @@ StatusOr<slot_offset_type> PageRecycler::insert_to_log(
.refresh_slot = None,
.batch_slot = None,
.depth = depth,
.offset_as_unique_identifier = offset_as_unique_identifier,
},
[&](const batt::SmallVecBase<PageToRecycle*>& to_append) -> StatusOr<slot_offset_type> {
if (to_append.empty()) {
Expand All @@ -387,7 +439,7 @@ StatusOr<slot_offset_type> PageRecycler::insert_to_log(
BATT_REQUIRE_OK(append_slot);
item->refresh_slot = append_slot->lower_bound;
last_slot = slot_max(last_slot, append_slot->upper_bound);
LLFS_VLOG(1) << "Write " << item << " to the log; last_slot=" << last_slot;
LLFS_VLOG(1) << "Write " << *item << " to the log; last_slot=" << last_slot;
}
BATT_CHECK_NE(this->slot_writer_.slot_offset(), current_slot);

Expand Down
12 changes: 11 additions & 1 deletion src/llfs/page_recycler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ class PageRecycler
CountMetric<u64> page_drop_error_count{0};
};

/** \brief Metrics exported at static (process-wide) scope, primarily consumed by the test
 * suite to observe internal PageRecycler behavior; obtain via PageRecycler::metrics_export().
 */
struct MetricsExported {
// Count of recycle_pages() requests rejected as re-issued duplicates — i.e., requests whose
// unique-offset identifier was not newer than the largest identifier already seen.
CountMetric<u32> page_id_deletion_reissue{0};
};
/** \brief Returns the singleton exported-metrics instance, registering its counters with the
 * global metric registry on first use. */
static MetricsExported& metrics_export();

//+++++++++++-+-+--+----- --- -- - - - -

static PageCount default_max_buffered_page_count(const PageRecyclerOptions& options);
Expand Down Expand Up @@ -105,6 +110,7 @@ class PageRecycler
// necessarily flushed (see `await_flush`).
//
StatusOr<slot_offset_type> recycle_pages(const Slice<const PageId>& page_ids,
llfs::slot_offset_type offset,
batt::Grant* grant = nullptr, i32 depth = 0);

// Schedule a single page to be recycled. \see recycle_pages
Expand Down Expand Up @@ -198,7 +204,8 @@ class PageRecycler
explicit PageRecycler(batt::TaskScheduler& scheduler, const std::string& name,
PageDeleter& page_deleter, std::unique_ptr<LogDevice>&& wal_device,
Optional<Batch>&& recovered_batch,
std::unique_ptr<PageRecycler::State>&& state) noexcept;
std::unique_ptr<PageRecycler::State>&& state,
u64 largest_offset_as_unique_identifier_init) noexcept;

void start_recycle_task();

Expand All @@ -209,6 +216,7 @@ class PageRecycler
void refresh_grants();

StatusOr<slot_offset_type> insert_to_log(batt::Grant& grant, PageId page_id, i32 depth,
slot_offset_type offset_as_unique_identifier,
batt::Mutex<std::unique_ptr<State>>::Lock& locked_state);

StatusOr<Batch> prepare_batch(std::vector<PageToRecycle>&& to_recycle);
Expand Down Expand Up @@ -250,6 +258,8 @@ class PageRecycler
Optional<Batch> prepared_batch_;

Optional<slot_offset_type> latest_batch_upper_bound_;

slot_offset_type largest_offset_as_unique_identifier_;
};

inline std::ostream& operator<<(std::ostream& out, const PageRecycler::Batch& t)
Expand Down
4 changes: 2 additions & 2 deletions src/llfs/page_recycler.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ class FakePageDeleter : public PageDeleter
// Recursively recycle any newly dead pages. If we try to recycle the same page multiple
// times, that is OK, since PageIds are never reused.
//
result = this->test_->recycler_->recycle_pages(as_slice(dead_pages), //
result = this->test_->recycler_->recycle_pages(as_slice(dead_pages), caller_slot, //
&recycle_grant, depth + 1);
BATT_REQUIRE_OK(result);

Expand Down Expand Up @@ -500,7 +500,7 @@ void PageRecyclerTest::run_crash_recovery_test()
const std::array<PageId, 1> to_recycle = {root_id};

BATT_DEBUG_INFO("Test - recycle_pages");
StatusOr<slot_offset_type> recycle_status = recycler.recycle_pages(to_recycle);
StatusOr<slot_offset_type> recycle_status = recycler.recycle_pages(to_recycle, 0);
if (!recycle_status.ok()) {
failed = true;
break;
Expand Down
Loading
Loading