Ensure end_of_append_batch is Called for All Raft Log Types
Previously, HomeRaftLogStore::end_of_append_batch was only invoked for app_logs, which required matching requests in m_lsn_req_map.
This caused problems when a batch contained only non-app raft logs (e.g., conf logs): the function was never called, leaving m_last_durable_lsn stale.
Consequently, next_slot() could return incorrect values derived from the stale m_last_durable_lsn.
This update ensures that HomeRaftLogStore::end_of_append_batch is called for all raft log types, guaranteeing that all logs are flushed and m_last_durable_lsn is updated every time the log store's end_of_append_batch executes.
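
To make the failure mode concrete, here is a minimal sketch, not the actual Homestore source: the type, the flush body, and the assumption that next_slot() is derived directly from m_last_durable_lsn are all simplifications of what the message above describes.

#include <cstdint>

// Minimal model of the durable-LSN bookkeeping described in the commit message.
struct LogStoreSketch {
    uint64_t m_last_durable_lsn{0};

    // Flush hook: persists the batch and advances the durability watermark.
    void end_of_append_batch(uint64_t start_lsn, uint64_t count) {
        // ... flush log entries [start_lsn, start_lsn + count) to storage ...
        m_last_durable_lsn = start_lsn + count - 1;
    }

    // Next append slot, derived here from the durability watermark.
    uint64_t next_slot() const { return m_last_durable_lsn + 1; }
};

int main() {
    LogStoreSketch store;
    store.end_of_append_batch(1, 5); // app-log batch: watermark -> 5, next_slot() == 6

    // Old behavior: a conf-log-only batch at lsn 6 skipped this call, so
    // next_slot() still returned 6 instead of 7. Fixed behavior: the call is
    // made for every batch, so the watermark advances for conf logs too.
    store.end_of_append_batch(6, 1);
    return 0;
}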
yawzhang authored and xiaoxichen committed Jan 17, 2025
1 parent c739f11 commit edbc307
Showing 2 changed files with 21 additions and 25 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@

 class HomestoreConan(ConanFile):
     name = "homestore"
-    version = "6.6.11"
+    version = "6.6.12"

     homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"
44 changes: 20 additions & 24 deletions src/lib/replication/log_store/repl_log_store.cpp
@@ -44,11 +44,8 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
     auto proposer_reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
     for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) {
         auto rreq = m_sm.lsn_to_req(lsn);
-        // Skip this call in proposer, since this method will synchronously flush the data, which is not required for
-        // leader. Proposer will call the flush as part of commit after receiving quorum, upon which time, there is a
-        // high possibility the log entry is already flushed. Skip it for rreq == nullptr which is the case for raft
-        // config entries.
-        if ((rreq == nullptr) /*|| rreq->is_proposer()*/) {
+        // Skip it for rreq == nullptr which is the case for raft config entries.
+        if ((rreq == nullptr)) {
             continue;
         } else if (rreq->is_proposer()) {
             proposer_reqs->emplace_back(std::move(rreq));
@@ -60,41 +57,40 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
RD_LOGT("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={} {}", start_lsn, count,
reqs->size(), proposer_reqs->size());

// All requests are from proposer for data write, so as mentioned above we can skip the flush for now
if (!reqs->empty()) {
// Check the map if data corresponding to all of these requsts have been received and written. If not, schedule
// a fetch and write. Once all requests are completed and written, these requests are poped out of the map and
// the future will be ready.
auto cur_time = std::chrono::steady_clock::now();
auto fut = m_rd.notify_after_data_written(reqs);
// Wait for the fetch and write to be completed successfully.
// It is essential to complete the data write before appending to the log. If the logs are flushed
// before the data is written, a restart and subsequent log replay occurs, as the in-memory state is lost,
// it leaves us uncertain about whether the data was actually written, potentially leading to data inconsistency.
std::move(fut).wait();
HISTOGRAM_OBSERVE(m_rd.metrics(), data_channel_wait_latency_us, get_elapsed_time_us(cur_time));
}

// Flushing log now.
auto cur_time = std::chrono::steady_clock::now();
HomeRaftLogStore::end_of_append_batch(start_lsn, count);
HISTOGRAM_OBSERVE(m_rd.metrics(), raft_end_of_append_batch_latency_us, get_elapsed_time_us(cur_time));
// Flushing logs now.
auto cur_time = std::chrono::steady_clock::now();
HomeRaftLogStore::end_of_append_batch(start_lsn, count);
HISTOGRAM_OBSERVE(m_rd.metrics(), raft_end_of_append_batch_latency_us, get_elapsed_time_us(cur_time));

cur_time = std::chrono::steady_clock::now();
HISTOGRAM_OBSERVE(m_rd.metrics(), data_channel_wait_latency_us, get_elapsed_time_us(cur_time));
// Mark all the reqs completely written
for (auto const& rreq : *reqs) {
if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); }
}

// Mark all the reqs also completely written
for (auto const& rreq : *reqs) {
if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); }
}
} else if (!proposer_reqs->empty()) {
RD_LOGT("Raft Channel: end_of_append_batch, I am proposer, only flush log s from {} , count {}", start_lsn,
count);
// Mark all the reqs also completely written
HomeRaftLogStore::end_of_append_batch(start_lsn, count);
for (auto const& rreq : *proposer_reqs) {
if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); }
// Data corresponding to proposer reqs have already been written before propose reqs to raft,
// so skip waiting data written and mark reqs as flushed here.
for (auto const& rreq : *proposer_reqs) {
if (rreq) {
RD_LOGT("Raft Channel: end_of_append_batch, I am proposer for lsn {}, only flushed log for it", rreq->lsn());
rreq->add_state(repl_req_state_t::LOG_FLUSHED);
}
}

// Convert volatile logs to non-volatile logs in state machine
// Convert volatile logs to non-volatile logs in state machine.
for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) {
auto rreq = m_sm.lsn_to_req(lsn);
if (rreq != nullptr) {
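
Taken together, the net shape of the patched ReplLogStore::end_of_append_batch can be summarized with the standalone sketch below. It is a condensed restatement of the diff, not the real method: req_sketch, lsn_to_req, wait_for_data_written, and flush_logs are stand-ins for repl_req_ptr_t, m_sm.lsn_to_req(), m_rd.notify_after_data_written(...) plus its wait, and HomeRaftLogStore::end_of_append_batch respectively.

#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

struct req_sketch {
    bool proposer{false};
    bool is_proposer() const { return proposer; }
    void mark_log_flushed() {} // stands in for add_state(repl_req_state_t::LOG_FLUSHED)
};
using req_ptr = std::shared_ptr< req_sketch >;

static req_ptr lsn_to_req(int64_t) { return nullptr; }              // state-machine lookup
static void wait_for_data_written(std::vector< req_ptr > const&) {} // data-channel wait
static void flush_logs(uint64_t, uint64_t) {}                       // advances m_last_durable_lsn

void end_of_append_batch_sketch(uint64_t start_lsn, uint64_t count) {
    int64_t const end_lsn = static_cast< int64_t >(start_lsn + count - 1);

    // Partition the batch: follower reqs, proposer reqs, and nullptr entries
    // (raft config logs, which have no request).
    std::vector< req_ptr > reqs, proposer_reqs;
    for (int64_t lsn = static_cast< int64_t >(start_lsn); lsn <= end_lsn; ++lsn) {
        if (auto rreq = lsn_to_req(lsn); rreq) {
            (rreq->is_proposer() ? proposer_reqs : reqs).push_back(std::move(rreq));
        }
    }

    // Followers: data must be durable before the log flush, otherwise a crash
    // and replay could see flushed logs whose data never arrived.
    if (!reqs.empty()) { wait_for_data_written(reqs); }

    // The key change: flush unconditionally, so m_last_durable_lsn advances
    // even for batches containing only raft config logs.
    flush_logs(start_lsn, count);

    for (auto const& r : reqs) { r->mark_log_flushed(); }
    // Proposer data was written before proposing, so no wait is needed here.
    for (auto const& r : proposer_reqs) { r->mark_log_flushed(); }
}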
