Skip to content

Commit

Permalink
fix LeaseTest::apply_thread_hung unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
PFZheng committed Feb 19, 2020
1 parent 503013f commit cfa7943
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 13 deletions.
6 changes: 5 additions & 1 deletion src/braft/test/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ class Cluster {
}

int start(const butil::EndPoint& listen_addr, bool empty_peers = false,
int snapshot_interval_s = 30) {
int snapshot_interval_s = 30,
braft::Closure* leader_start_closure = NULL) {
if (_server_map[listen_addr] == NULL) {
brpc::Server* server = new brpc::Server();
if (braft::add_service(server, listen_addr) != 0
Expand All @@ -250,6 +251,9 @@ class Cluster {
options.initial_conf = braft::Configuration(_peers);
}
MockFSM* fsm = new MockFSM(listen_addr);
if (leader_start_closure) {
fsm->set_on_leader_start_closure(leader_start_closure);
}
options.fsm = fsm;
options.node_owns_fsm = true;
butil::string_printf(&options.log_uri, "local://./data/%s/log",
Expand Down
34 changes: 22 additions & 12 deletions test/test_leader_lease.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,16 +473,23 @@ TEST_F(LeaseTest, leader_step_down) {

class OnLeaderStartHungClosure : public braft::SynchronizedClosure {
public:
OnLeaderStartHungClosure() : braft::SynchronizedClosure(), hung(true) {}
OnLeaderStartHungClosure(int i)
: braft::SynchronizedClosure(), idx(i), hung(true), running(false) {}

void Run() {
running = true;
LOG(WARNING) << "start run on leader start hung closure " << idx;
while (hung) {
usleep(10 * 1000);
bthread_usleep(10 * 1000);
}
LOG(WARNING) << "finish run on leader start hung closure" << idx;
braft::SynchronizedClosure::Run();
running = false;
}

volatile bool hung;
int idx;
butil::atomic<bool> hung;
butil::atomic<bool> running;
};

TEST_F(LeaseTest, apply_thread_hung) {
Expand All @@ -496,27 +503,29 @@ TEST_F(LeaseTest, apply_thread_hung) {
peer.addr.port = 5006 + i;
peer.idx = 0;
peers.push_back(peer);
on_leader_start_closures.push_back(new OnLeaderStartHungClosure);
on_leader_start_closures.push_back(new OnLeaderStartHungClosure(i));
}

// start cluster
Cluster cluster("unittest", peers, 500, 10);
for (size_t i = 0; i < peers.size(); i++) {
ASSERT_EQ(0, cluster.start(peers[i].addr));
ASSERT_EQ(0, cluster.start(peers[i].addr, false, 30, on_leader_start_closures[i]));
}

std::vector<braft::Node*> nodes;
/*
cluster.all_nodes(&nodes);
for (size_t i = 0; i < nodes.size(); ++i) {
static_cast<MockFSM*>(nodes[i]->_impl->_options.fsm)
->set_on_leader_start_closure(on_leader_start_closures[i]);
}
*/

// elect leader
cluster.wait_leader();
braft::Node* leader = cluster.leader();
ASSERT_TRUE(leader != NULL);
LOG(WARNING) << "leader is " << leader->node_id();
std::vector<braft::Node*> nodes;
cluster.followers(&nodes);
braft::PeerId target = nodes[0]->node_id().peer_id;

Expand All @@ -530,16 +539,17 @@ TEST_F(LeaseTest, apply_thread_hung) {
ASSERT_EQ(lease_status.state, braft::LEASE_NOT_READY);

// apply thread resume to work
for (size_t i = 0; i < nodes.size(); ++i) {
on_leader_start_closures[i]->hung = false;
for (auto c : on_leader_start_closures) {
c->hung = false;
if (c->running) {
LOG(WARNING) << "waiting leader resume";
c->wait();
LOG(WARNING) << "leader resumed";
}
}

usleep(100 * 1000);

// lease status should be valid
leader->get_leader_lease_status(&lease_status);
ASSERT_EQ(lease_status.state, braft::LEASE_VALID);

cluster.stop_all();

for (auto c : on_leader_start_closures) {
Expand Down

0 comments on commit cfa7943

Please sign in to comment.