From edbc307e963e1a72c777bb3d778ecb1cfd5114c2 Mon Sep 17 00:00:00 2001
From: yawzhang
Date: Thu, 16 Jan 2025 15:53:30 +0800
Subject: [PATCH] Ensure end_of_append_batch is Called for All Raft Log Types

Previously, HomeRaftLogStore::end_of_append_batch was only invoked for
app_logs, i.e., only when the batch had requests present in m_lsn_req_map.
This caused issues when only non-app raft logs (e.g., conf logs) were
appended: the function would not be called, leaving m_last_durable_lsn
outdated. Consequently, next_slot() could return incorrect values based
on the stale m_last_durable_lsn.

This update ensures that HomeRaftLogStore::end_of_append_batch is called
for all raft log types, guaranteeing that all logs are flushed and
m_last_durable_lsn is consistently updated every time the log store's
end_of_append_batch is executed.
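The snippet below is a minimal, self-contained sketch of the failure mode, not
HomeStore code: ToyLogStore, and the assumption that next_slot() is simply
m_last_durable_lsn + 1, are hypothetical simplifications of HomeRaftLogStore,
used only to show how skipping the flush for a conf-log-only batch leaves
next_slot() stale.

    // Toy model only; the real flush and durable-lsn bookkeeping live in HomeRaftLogStore.
    #include <cstdint>
    #include <iostream>

    struct ToyLogStore {
        uint64_t m_last_durable_lsn{0};

        // Stands in for HomeRaftLogStore::end_of_append_batch: flush the batch and
        // advance the durable lsn to the last lsn of the batch.
        void end_of_append_batch(uint64_t start_lsn, uint64_t count) {
            m_last_durable_lsn = start_lsn + count - 1;
        }

        // Hypothetical next_slot(): next lsn to hand out, derived from the durable lsn.
        uint64_t next_slot() const { return m_last_durable_lsn + 1; }
    };

    int main() {
        ToyLogStore store;
        store.end_of_append_batch(1, 5);  // batch of app logs, lsn 1..5 -> durable lsn = 5

        // A batch containing only a raft conf log at lsn 6: before this patch the flush
        // was skipped because no requests were found in m_lsn_req_map, so next_slot()
        // still reports 6 even though lsn 6 was appended.
        std::cout << "stale next_slot: " << store.next_slot() << "\n";  // prints 6

        store.end_of_append_batch(6, 1);  // with this patch the flush always runs
        std::cout << "fixed next_slot: " << store.next_slot() << "\n";  // prints 7
        return 0;
    }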
---
 conanfile.py                                  |  2 +-
 .../replication/log_store/repl_log_store.cpp  | 44 +++++++++----------
 2 files changed, 21 insertions(+), 25 deletions(-)

diff --git a/conanfile.py b/conanfile.py
index 6a0208560..3b6edff8e 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -9,7 +9,7 @@ class HomestoreConan(ConanFile):
     name = "homestore"
-    version = "6.6.11"
+    version = "6.6.12"
 
     homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"

diff --git a/src/lib/replication/log_store/repl_log_store.cpp b/src/lib/replication/log_store/repl_log_store.cpp
index 97d70ff92..072d06b99 100644
--- a/src/lib/replication/log_store/repl_log_store.cpp
+++ b/src/lib/replication/log_store/repl_log_store.cpp
@@ -44,11 +44,8 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
     auto proposer_reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
     for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) {
         auto rreq = m_sm.lsn_to_req(lsn);
-        // Skip this call in proposer, since this method will synchronously flush the data, which is not required for
-        // leader. Proposer will call the flush as part of commit after receiving quorum, upon which time, there is a
-        // high possibility the log entry is already flushed. Skip it for rreq == nullptr which is the case for raft
-        // config entries.
-        if ((rreq == nullptr) /*|| rreq->is_proposer()*/) {
+        // Skip it for rreq == nullptr which is the case for raft config entries.
+        if ((rreq == nullptr)) {
             continue;
         } else if (rreq->is_proposer()) {
             proposer_reqs->emplace_back(std::move(rreq));
@@ -60,41 +57,40 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
     RD_LOGT("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={} {}", start_lsn, count,
             reqs->size(), proposer_reqs->size());
 
-    // All requests are from proposer for data write, so as mentioned above we can skip the flush for now
     if (!reqs->empty()) {
         // Check the map if data corresponding to all of these requsts have been received and written. If not, schedule
         // a fetch and write. Once all requests are completed and written, these requests are poped out of the map and
         // the future will be ready.
+        auto cur_time = std::chrono::steady_clock::now();
         auto fut = m_rd.notify_after_data_written(reqs);
 
         // Wait for the fetch and write to be completed successfully.
         // It is essential to complete the data write before appending to the log. If the logs are flushed
         // before the data is written, a restart and subsequent log replay occurs, as the in-memory state is lost,
         // it leaves us uncertain about whether the data was actually written, potentially leading to data inconsistency.
         std::move(fut).wait();
+        HISTOGRAM_OBSERVE(m_rd.metrics(), data_channel_wait_latency_us, get_elapsed_time_us(cur_time));
+    }
 
-        // Flushing log now.
-        auto cur_time = std::chrono::steady_clock::now();
-        HomeRaftLogStore::end_of_append_batch(start_lsn, count);
-        HISTOGRAM_OBSERVE(m_rd.metrics(), raft_end_of_append_batch_latency_us, get_elapsed_time_us(cur_time));
+    // Flushing logs now.
+    auto cur_time = std::chrono::steady_clock::now();
+    HomeRaftLogStore::end_of_append_batch(start_lsn, count);
+    HISTOGRAM_OBSERVE(m_rd.metrics(), raft_end_of_append_batch_latency_us, get_elapsed_time_us(cur_time));
 
-        cur_time = std::chrono::steady_clock::now();
-        HISTOGRAM_OBSERVE(m_rd.metrics(), data_channel_wait_latency_us, get_elapsed_time_us(cur_time));
+    // Mark all the reqs completely written
+    for (auto const& rreq : *reqs) {
+        if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); }
+    }
 
-        // Mark all the reqs also completely written
-        for (auto const& rreq : *reqs) {
-            if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); }
-        }
-    } else if (!proposer_reqs->empty()) {
-        RD_LOGT("Raft Channel: end_of_append_batch, I am proposer, only flush log s from {} , count {}", start_lsn,
-                count);
-        // Mark all the reqs also completely written
-        HomeRaftLogStore::end_of_append_batch(start_lsn, count);
-        for (auto const& rreq : *proposer_reqs) {
-            if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); }
+    // Data corresponding to proposer reqs has already been written before the reqs were proposed to raft,
+    // so skip waiting for the data to be written and mark the reqs as flushed here.
+    for (auto const& rreq : *proposer_reqs) {
+        if (rreq) {
+            RD_LOGT("Raft Channel: end_of_append_batch, I am proposer for lsn {}, only flushed log for it",
+                    rreq->lsn());
+            rreq->add_state(repl_req_state_t::LOG_FLUSHED);
         }
     }
 
-    // Convert volatile logs to non-volatile logs in state machine
+    // Convert volatile logs to non-volatile logs in state machine.
     for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) {
         auto rreq = m_sm.lsn_to_req(lsn);
         if (rreq != nullptr) {