Add an API to manually create a snapshot (#360)
* `create_snapshot` will create a snapshot based on the latest committed log index.
greensky00 authored Jun 13, 2022
1 parent 9d7c4cb commit 031827b
Showing 4 changed files with 145 additions and 15 deletions.
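
For orientation, a minimal usage sketch of the new API (not part of this commit; it assumes an already-initialized `raft_server` instance and NuRaft's umbrella header):

#include "libnuraft/nuraft.hxx"

using namespace nuraft;

// Ask the server to snapshot its state machine at the latest
// committed log index, bypassing the automatic snapshot_distance_
// trigger. Returns false if creation could not start, e.g. when
// another snapshot is already in progress.
bool take_manual_snapshot(ptr<raft_server>& server) {
    return server->create_snapshot();
}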
11 changes: 10 additions & 1 deletion include/libnuraft/raft_server.hxx
@@ -729,6 +729,14 @@ public:
*/
void notify_log_append_completion(bool ok);

/**
* Manually create a snapshot based on the latest committed
* log index of the state machine.
*
* @return `true` on success.
*/
bool create_snapshot();

protected:
typedef std::unordered_map<int32, ptr<peer>>::const_iterator peer_itor;

@@ -854,7 +862,7 @@ protected:
void destroy_user_snp_ctx(ptr<snapshot_sync_ctx> sync_ctx);
void clear_snapshot_sync_ctx(peer& pp);
void commit(ulong target_idx);
- void snapshot_and_compact(ulong committed_idx);
+ bool snapshot_and_compact(ulong committed_idx, bool forced_creation = false);
bool update_term(ulong term);
void reconfigure(const ptr<cluster_config>& new_config);
void update_target_priority();
@@ -1144,6 +1152,7 @@ protected:

/**
* `true` if this server is creating a snapshot.
* Only one snapshot creation is allowed at a time.
*/
std::atomic<bool> snp_in_progress_;

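The `snp_in_progress_` flag above is what enforces this rule: a creation attempt must win a compare-and-swap from `false` to `true` before it may proceed. A standalone sketch of the pattern (plain C++ with hypothetical names, not NuRaft code):

#include <atomic>

static std::atomic<bool> snp_in_progress{false};

bool try_begin_snapshot() {
    bool expected = false;
    // Exactly one caller can flip false -> true; concurrent callers
    // see the exchange fail and back off.
    if (!snp_in_progress.compare_exchange_strong(expected, true)) {
        return false;   // another snapshot is already running
    }
    // ... create the snapshot here ...
    snp_in_progress.store(false);   // release the guard when done
    return true;
}
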
44 changes: 30 additions & 14 deletions src/handle_commit.cxx
@@ -450,30 +450,43 @@ bool raft_server::apply_config_log_entry(ptr<log_entry>& le,
return true;
}

- void raft_server::snapshot_and_compact(ulong committed_idx) {
+ bool raft_server::create_snapshot() {
+ uint64_t committed_idx = sm_commit_index_;
+ p_in("manually create a snapshot on %lu", committed_idx);
+ return snapshot_and_compact(committed_idx, true);
+ }
+
+ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation) {
ptr<raft_params> params = ctx_->get_params();
- if ( params->snapshot_distance_ == 0 ||
- ( committed_idx - log_store_->start_index() + 1 ) <
- (ulong)params->snapshot_distance_ ) {
- // snapshot is disabled or the log store is not long enough
- return;
- }

// get the latest configuration info
ptr<cluster_config> conf = get_config();
if ( conf->get_prev_log_idx() >= log_store_->next_slot() ) {
- // The latest config and previous config is not in log_store, so skip the snapshot creation
- return;
+ // The latest config and previous config is not in log_store,
+ // so skip the snapshot creation.
+ return false;
}
- if ( !state_machine_->chk_create_snapshot() ) {
- // User-defined state machine doesn't want to create a snapshot.
- return;

+ if (!forced_creation) {
+ // If `forced_creation == true`, ignore below conditions.
+ if ( params->snapshot_distance_ == 0 ||
+ ( committed_idx - log_store_->start_index() + 1 ) <
+ (ulong)params->snapshot_distance_ ) {
+ // snapshot is disabled or the log store is not long enough
+ return false;
+ }
+ if ( !state_machine_->chk_create_snapshot() ) {
+ // User-defined state machine doesn't want to create a snapshot.
+ return false;
+ }
}

bool snapshot_in_action = false;
try {
bool f = false;
ptr<snapshot> local_snp = get_last_snapshot();
- if ( ( !local_snp ||
+ if ( ( forced_creation ||
+ !local_snp ||
( committed_idx - local_snp->get_last_log_idx() ) >=
(ulong)params->snapshot_distance_ ) &&
snp_in_progress_.compare_exchange_strong(f, true) )
@@ -498,7 +511,7 @@ void raft_server::snapshot_and_compact(ulong committed_idx) {
"this is a system error, exiting");
ctx_->state_mgr_->system_exit(raft_err::N6_no_snapshot_found);
::exit(-1);
- return;
+ return false;
// LCOV_EXCL_STOP
}
conf = local_snp->get_last_config();
@@ -535,7 +548,9 @@ void raft_server::snapshot_and_compact(ulong committed_idx) {
committed_idx, log_term_to_compact, tt.get_us() );

snapshot_in_action = false;
+ return true;
}
+ return false;

} catch (...) {
p_er( "failed to compact logs at index %llu due to errors",
@@ -544,6 +559,7 @@
bool val = true;
snp_in_progress_.compare_exchange_strong(val, false);
}
+ return false;
}
}

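To make the unforced trigger concrete: with `log_store_->start_index() == 1` and `snapshot_distance_ == 100`, the automatic path attempts a snapshot only once the committed index reaches 100. A sketch of that check in isolation (hypothetical helper mirroring the condition above, not part of the commit):

#include <cstdint>

// Skip automatic snapshotting when it is disabled (distance == 0) or
// when the retained log is shorter than the configured distance.
bool should_auto_snapshot(uint64_t committed_idx,
                          uint64_t log_start_idx,
                          uint32_t snapshot_distance) {
    if (snapshot_distance == 0) return false;
    return (committed_idx - log_start_idx + 1) >= snapshot_distance;
}

// should_auto_snapshot(10, 1, 100) == false -- which is why the test
// below, with snapshot_distance_ = 100 and only 10 appended entries,
// must call create_snapshot() explicitly.
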
4 changes: 4 additions & 0 deletions tests/unit/raft_functional_common.hxx
@@ -128,6 +128,10 @@ public:
}

bool apply_snapshot(snapshot& s) {
std::lock_guard<std::mutex> ll(lastSnapshotLock);
// NOTE: We only handle logical snapshot.
ptr<buffer> snp_buf = s.serialize();
lastSnapshot = snapshot::deserialize(*snp_buf);
return true;
}

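This test-only change makes the state machine remember a snapshot it receives over the wire, which the new test below relies on when comparing `last_snapshot()->get_last_log_idx()` on the lagging follower against the leader's committed index.
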
101 changes: 101 additions & 0 deletions tests/unit/raft_server_test.cxx
@@ -20,6 +20,7 @@ limitations under the License.
#include "raft_package_fake.hxx"

#include "event_awaiter.h"
#include "raft_params.hxx"
#include "test_common.h"

#include <stdio.h>
@@ -1929,6 +1930,103 @@ int snapshot_basic_test() {
return 0;
}

int snapshot_manual_creation_test() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();

std::string s1_addr = "S1";
std::string s2_addr = "S2";
std::string s3_addr = "S3";

RaftPkg s1(f_base, 1, s1_addr);
RaftPkg s2(f_base, 2, s2_addr);
RaftPkg s3(f_base, 3, s3_addr);
std::vector<RaftPkg*> pkgs = {&s1, &s2, &s3};

CHK_Z( launch_servers( pkgs ) );
CHK_Z( make_group( pkgs ) );

// Append a message using separate thread.
ExecArgs exec_args(&s1);
TestSuite::ThreadHolder hh(&exec_args, fake_executer, fake_executer_killer);

for (auto& entry: pkgs) {
RaftPkg* pp = entry;
raft_params param = pp->raftServer->get_current_params();
param.return_method_ = raft_params::async_handler;
param.snapshot_distance_ = 100;
pp->raftServer->update_params(param);
}

const size_t NUM = 10;

// Append messages asynchronously.
std::list< ptr< cmd_result< ptr<buffer> > > > handlers;
for (size_t ii=0; ii<NUM; ++ii) {
std::string test_msg = "test" + std::to_string(ii);
ptr<buffer> msg = buffer::alloc(test_msg.size() + 1);
msg->put(test_msg);
ptr< cmd_result< ptr<buffer> > > ret =
s1.raftServer->append_entries( {msg} );

CHK_TRUE( ret->get_accepted() );

handlers.push_back(ret);
}

// NOTE: Send it to S2 only, S3 will be lagging behind.
s1.fNet->execReqResp("S2"); // replication.
s1.fNet->execReqResp("S2"); // commit.
CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) ); // commit execution.

// One more time to make sure.
s1.fNet->execReqResp("S2");
s1.fNet->execReqResp("S2");
CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) );

// Remember the current commit index.
uint64_t committed_index = s1.raftServer->get_committed_log_idx();

// Create a manual snapshot.
CHK_OK( s1.raftServer->create_snapshot() );
CHK_EQ( committed_index, s1.getTestSm()->last_snapshot()->get_last_log_idx() );

// Make req to S3 failed.
s1.fNet->makeReqFail("S3");

// Trigger heartbeat to S3, it will initiate snapshot transmission.
s1.fTimer->invoke(timer_task_type::heartbeat_timer);
s1.fNet->execReqResp();

// Send the entire snapshot.
do {
s1.fNet->execReqResp();
} while (s3.raftServer->is_receiving_snapshot());

s1.fNet->execReqResp(); // commit.
CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) ); // commit execution.

// State machine should be identical.
CHK_OK( s2.getTestSm()->isSame( *s1.getTestSm() ) );
CHK_OK( s3.getTestSm()->isSame( *s1.getTestSm() ) );

CHK_EQ( committed_index, s3.getTestSm()->last_snapshot()->get_last_log_idx() );

print_stats(pkgs);

s1.raftServer->shutdown();
s2.raftServer->shutdown();
s3.raftServer->shutdown();

fake_executer_killer(&exec_args);
hh.join();
CHK_Z( hh.getResult() );

f_base->destroy();

return 0;
}

int join_empty_node_test() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();
@@ -2911,6 +3009,9 @@ int main(int argc, char** argv) {
ts.doTest( "snapshot basic test",
snapshot_basic_test );

ts.doTest( "snapshot manual creation test",
snapshot_manual_creation_test );

ts.doTest( "join empty node test",
join_empty_node_test );
