diff --git a/src/braft/fsm_caller.cpp b/src/braft/fsm_caller.cpp index 98913eea..7330d821 100644 --- a/src/braft/fsm_caller.cpp +++ b/src/braft/fsm_caller.cpp @@ -345,6 +345,7 @@ void FSMCaller::do_snapshot_save(SaveSnapshotClosure* done) { iter != conf_entry.old_conf.end(); ++iter) { *meta.add_old_peers() = iter->to_string(); } + done->set_snapshot_index(last_applied_index); SnapshotWriter* writer = done->start(meta); if (!writer) { diff --git a/src/braft/fsm_caller.h b/src/braft/fsm_caller.h index 897a50f9..98393da5 100644 --- a/src/braft/fsm_caller.h +++ b/src/braft/fsm_caller.h @@ -95,6 +95,17 @@ class SaveSnapshotClosure : public Closure { public: // TODO: comments virtual SnapshotWriter* start(const SnapshotMeta& meta) = 0; + + void set_snapshot_index(int64_t index){ + _index = index; + } + + int64_t snapshot_index() const { + return _index; + } + +private: + int64_t _index; }; class LoadSnapshotClosure : public Closure { diff --git a/src/braft/snapshot_executor.cpp b/src/braft/snapshot_executor.cpp index e33ece86..3ac0b8d0 100644 --- a/src/braft/snapshot_executor.cpp +++ b/src/braft/snapshot_executor.cpp @@ -38,6 +38,8 @@ class SaveSnapshotDone : public SaveSnapshotClosure { private: static void* continue_run(void* arg); + void set_meta(); + SnapshotExecutor* _se; SnapshotWriter* _writer; Closure* _done; // user done @@ -308,6 +310,9 @@ SnapshotWriter* SaveSnapshotDone::start(const SnapshotMeta& meta) { void* SaveSnapshotDone::continue_run(void* arg) { SaveSnapshotDone* self = (SaveSnapshotDone*)arg; std::unique_ptr self_guard(self); + if (self->status().ok()) { + self->set_meta(); + } // Must call on_snapshot_save_done to clear _saving_snapshot int ret = self->_se->on_snapshot_save_done( self->status(), self->_meta, self->_writer); @@ -324,6 +329,14 @@ void* SaveSnapshotDone::continue_run(void* arg) { return NULL; } +void SaveSnapshotDone::set_meta(){ + const int64_t last_include_index = snapshot_index(); + _meta.set_last_included_index(last_include_index); + const int64_t last_include_term = _se->_log_manager->get_term(last_include_index); + CHECK(last_include_term > 0) << "last_included_index: " << last_include_index << " is out of range"; + _meta.set_last_included_term(last_include_term); +} + void SaveSnapshotDone::Run() { // Avoid blocking FSMCaller // This continuation of snapshot saving is likely running inplace where the diff --git a/test/test_fsm_caller.cpp b/test/test_fsm_caller.cpp index bb3c1866..b1a155ef 100644 --- a/test/test_fsm_caller.cpp +++ b/test/test_fsm_caller.cpp @@ -204,10 +204,12 @@ class DummySnapshoWriter : public braft::SnapshotWriter { class MockSaveSnapshotClosure : public braft::SaveSnapshotClosure { public: MockSaveSnapshotClosure(braft::SnapshotWriter* writer, - braft::SnapshotMeta *expected_meta) + braft::SnapshotMeta *expected_meta, + braft::LogManager* lm) : _start_times(0) , _writer(writer) , _expected_meta(expected_meta) + , _lm(lm) { } ~MockSaveSnapshotClosure() {} @@ -222,10 +224,18 @@ class MockSaveSnapshotClosure : public braft::SaveSnapshotClosure { ++_start_times; return _writer; } + + void set_meta(){ + const int64_t last_include_index = snapshot_index(); + _expected_meta->set_last_included_index(last_include_index); + const int64_t last_include_term = _lm->get_term(last_include_index); + _expected_meta->set_last_included_term(last_include_term); + } private: int _start_times; braft::SnapshotWriter* _writer; braft::SnapshotMeta* _expected_meta; + braft::LogManager* _lm; }; class MockLoadSnapshotClosure : public braft::LoadSnapshotClosure { @@ -253,13 +263,13 @@ TEST_F(FSMCallerTest, snapshot) { snapshot_meta.set_last_included_term(0); DummySnapshotReader dummy_reader(&snapshot_meta); DummySnapshoWriter dummy_writer; - MockSaveSnapshotClosure save_snapshot_done(&dummy_writer, &snapshot_meta); system("rm -rf ./data"); scoped_ptr cm( new braft::ConfigurationManager); scoped_ptr storage( new braft::SegmentLogStorage("./data")); scoped_ptr lm(new braft::LogManager()); + MockSaveSnapshotClosure save_snapshot_done(&dummy_writer, &snapshot_meta, lm.get()); braft::LogManagerOptions log_opt; log_opt.log_storage = storage.get(); log_opt.configuration_manager = cm.get(); @@ -286,3 +296,71 @@ TEST_F(FSMCallerTest, snapshot) { ASSERT_EQ(1, load_snapshot_done._start_times); } +TEST_F(FSMCallerTest, manually_set_snapshot_index) { + braft::SnapshotMeta snapshot_meta; + int64_t term_1_first_include_index = 1; + int64_t term_1_last_include_index = 10; + int64_t term_2_last_include_index = 20; + // in term 1 + int64_t first_snapshot_truncate_index = (term_1_last_include_index - term_1_first_include_index) / 2; + // in term 2 + int64_t second_snapshot_truncate_index = term_1_last_include_index+ (term_2_last_include_index - term_1_last_include_index) / 2; + int64_t frist_term = 1; + int64_t second_term = 2; + + DummySnapshotReader dummy_reader(&snapshot_meta); + DummySnapshoWriter dummy_writer; + system("rm -rf ./data"); + scoped_ptr cm( + new braft::ConfigurationManager); + scoped_ptr storage( + new braft::SegmentLogStorage("./data")); + scoped_ptr lm(new braft::LogManager()); + MockSaveSnapshotClosure save_snapshot_done(&dummy_writer, &snapshot_meta, lm.get()); + braft::LogManagerOptions log_opt; + log_opt.log_storage = storage.get(); + log_opt.configuration_manager = cm.get(); + ASSERT_EQ(0, lm->init(log_opt)); + for (int i = term_1_first_include_index; i <= term_1_last_include_index; i++){ + braft::LogEntry* entry = new braft::LogEntry; + entry->AddRef(); + butil::StringPiece data = "test"; + entry->data.append(data.data(), data.size()); + entry->type = braft::ENTRY_TYPE_DATA; + entry->id = braft::LogId(i, frist_term); + SyncClosure sc; + std::vector entries; + entries.push_back(entry); + lm->append_entries(&entries, &sc); + sc.join(); + } + for (int i = term_1_last_include_index + 1; i <= term_2_last_include_index; i++){ + braft::LogEntry* entry = new braft::LogEntry; + entry->AddRef(); + butil::StringPiece data = "test"; + entry->data.append(data.data(), data.size()); + entry->type = braft::ENTRY_TYPE_DATA; + entry->id = braft::LogId(i, second_term); + SyncClosure sc; + std::vector entries; + entries.push_back(entry); + lm->append_entries(&entries, &sc); + sc.join(); + } + // LogManager + // + // Log: |___1-10______| |___11-20____| + // term1 term2 + + save_snapshot_done.set_snapshot_index(first_snapshot_truncate_index); + save_snapshot_done.set_meta(); + ASSERT_EQ(snapshot_meta.last_included_index(), first_snapshot_truncate_index); + ASSERT_EQ(snapshot_meta.last_included_term(), frist_term); + + save_snapshot_done.set_snapshot_index(second_snapshot_truncate_index); + save_snapshot_done.set_meta(); + ASSERT_EQ(snapshot_meta.last_included_index(), second_snapshot_truncate_index); + ASSERT_EQ(snapshot_meta.last_included_term(), second_term); + +} + diff --git a/test/test_log_manager.cpp b/test/test_log_manager.cpp index 97b2dc43..bfcacd5f 100644 --- a/test/test_log_manager.cpp +++ b/test/test_log_manager.cpp @@ -681,3 +681,38 @@ TEST_F(LogManagerTest, set_snapshot_and_get_log_term) { ASSERT_EQ(1L, lm->get_term(N - 1)); LOG(INFO) << "Last_index=" << lm->last_log_index(); } + +TEST_F(LogManagerTest, manually_set_snapshot_index) { + system("rm -rf ./data"); + int64_t first_include_index = 1; + int64_t last_include_index = 20; + int64_t first_snapshot_truncate_index = 10; + int64_t second_snapshot_truncate_index = 15; + int64_t term = 1; + scoped_ptr cm( + new braft::ConfigurationManager); + scoped_ptr storage( + new braft::SegmentLogStorage("./data")); + scoped_ptr lm(new braft::LogManager()); + braft::LogManagerOptions opt; + opt.log_storage = storage.get(); + opt.configuration_manager = cm.get(); + ASSERT_EQ(0, lm->init(opt)); + for (int i = first_include_index; i <= last_include_index; ++i) { + append_entry(lm.get(), "test", i, term); + } + braft::SnapshotMeta first_snapshot_meta; + first_snapshot_meta.set_last_included_index(first_snapshot_truncate_index); + first_snapshot_meta.set_last_included_term(term); + //LogManager will truncate nothing in frist snapshot + lm->set_snapshot(&first_snapshot_meta); + ASSERT_EQ(first_include_index, lm->first_log_index()); + + + braft::SnapshotMeta second_snapshot_meta; + second_snapshot_meta.set_last_included_index(second_snapshot_truncate_index); + second_snapshot_meta.set_last_included_term(term); + // Logmanager will truncate first snapshot. + lm->set_snapshot(&second_snapshot_meta); + ASSERT_EQ(first_snapshot_truncate_index + 1, lm->first_log_index()); +}