Skip to content

Commit

Permalink
ZOOKEEPER-3624: Fix flaky `QuorumPeerMainTest::testFailedTxnAsPartOfQ…
Browse files Browse the repository at this point in the history
…uorumLoss`

Reviewers: anmolnar
Author: kezhuw
Closes #2204 from kezhuw/ZOOKEEPER-3624-fix-flaky-testFailedTxnAsPartOfQuorumLoss
  • Loading branch information
kezhuw authored Jan 3, 2025
1 parent c0e9241 commit a8eb7fa
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1578,6 +1578,7 @@ public void run() {
} else {
try {
reconfigFlagClear();
checkSuspended();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,9 @@ public void testWithOnlyMinSessionTimeout() throws Exception {
assertEquals(maxSessionTimeOut, quorumPeer.getMaxSessionTimeout(), "maximumSessionTimeOut is wrong");
}

/**
* Verify that failed txn in isolated leader got truncated after rejoining quorum.
*/
@Test
public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
final int LEADER_TIMEOUT_MS = 10_000;
Expand All @@ -729,6 +732,8 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
// increase the tick time to delay the leader going to looking
int previousTick = servers.mt[leader].main.quorumPeer.tickTime;
servers.mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS;
// isolate it from other quorum members by prevent it from rejoining
servers.mt[leader].getQuorumPeer().setSuspended(true);
// let the previous tick on the leader exhaust itself so the new tick time takes effect
Thread.sleep(previousTick);
LOG.warn("LEADER {}", leader);
Expand All @@ -739,34 +744,18 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
}
}

// 3. start up the followers to form a new quorum
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != leader) {
servers.mt[i].start();
}
}

// 4. wait one of the follower to be the new leader
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != leader) {
// Recreate a client session since the previous session was not persisted.
servers.restartClient(i, this);
waitForOne(servers.zk[i], States.CONNECTED);
}
}

// 5. send a create request to old leader and make sure it's synced to disk,
// 3. send a create request to old leader and make sure it's synced to disk,
// which means it acked from itself
try {
servers.zk[leader].create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
fail("create /zk" + leader + " should have failed");
} catch (KeeperException e) {
} catch (KeeperException ignored) {
}

// just make sure that we actually did get it in process at the
// leader
// just make sure that we actually did get it in process at the leader
//
// there can be extra sessionClose proposals
assertTrue(outstanding.size() > 0);
assertFalse(outstanding.isEmpty());
Proposal p = findProposalOfType(outstanding, OpCode.create);
LOG.info("Old leader id: {}. All proposals: {}", leader, outstanding);
assertNotNull(p, "Old leader doesn't have 'create' proposal");
Expand All @@ -782,36 +771,73 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
sleepTime += 100;
}

// 6. wait for the leader to quit due to not enough followers and come back up as a part of the new quorum
LOG.info("Waiting for leader {} to timeout followers", leader);
// 4. start up the followers to form a new quorum
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != leader) {
servers.mt[i].start();
}
}

// 5. wait one of the follower to be the new leader
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != leader) {
// Recreate a new client session to avoid ConnectionLoss as connecting server is restarted.
servers.restartClient(i, this);
waitForOne(servers.zk[i], States.CONNECTED);
}
}

// 6. make sure new quorum does not replicate the failed txn
for (int i = 0; i < SERVER_COUNT; i++) {
if (i == leader) {
continue;
}
assertNull(servers.zk[i].exists("/zk" + leader, false),
"server " + i + " should not have /zk" + leader);
}

// resume election to rejoin the cluster
servers.mt[leader].getQuorumPeer().setSuspended(false);

// 7. wait for the leader to quit due to not enough followers and come back up as a part of the new quorum
LOG.info("Waiting for leader {} to timeout and rejoin as follower", leader);
sleepTime = 0;
Follower f = servers.mt[leader].main.quorumPeer.follower;
while (f == null || !f.isRunning()) {
if (sleepTime > LEADER_TIMEOUT_MS * 2) {
fail("Took too long for old leader to time out "
while (servers.mt[leader].getQuorumPeer().getPeerState() != QuorumPeer.ServerState.FOLLOWING) {
if (sleepTime > LEADER_TIMEOUT_MS * 10 * 2) {
fail("Took too long for old leader to time out and rejoin "
+ servers.mt[leader].main.quorumPeer.getPeerState());
}
Thread.sleep(100);
sleepTime += 100;
f = servers.mt[leader].main.quorumPeer.follower;
}

int newLeader = servers.findLeader();
// make sure a different leader was elected
assertNotEquals(leader, newLeader);

// 7. restart the previous leader to force it to replay the edits and possibly come up in a bad state
servers.mt[leader].shutdown();
servers.mt[leader].start();
// old client session can expire, restart it
// Now, all preconditions meet. Let's verify that the failed txn got truncated in whole cluster.

boolean restarted = false;
servers.restartClient(leader, this);
waitForAll(servers, States.CONNECTED);
waitForOne(servers.zk[leader], States.CONNECTED);
while (true) {
// 7. make sure everything is consistent, that is the failed txn got truncated in old leader.
for (int i = 0; i < SERVER_COUNT; i++) {
assertNull(servers.zk[i].exists("/zk" + leader, false),
"server " + i + " should not have /zk" + leader);
}

// 8. check the node exist in previous leader but not others
// make sure everything is consistent
for (int i = 0; i < SERVER_COUNT; i++) {
assertNull(servers.zk[i].exists("/zk" + leader, false),
"server " + i + " should not have /zk" + leader);
if (restarted) {
break;
}

// 8. make sure above holds after restart
servers.mt[leader].shutdown();
servers.mt[leader].start();
// old client session can expire, restart it
servers.restartClient(leader, this);
waitForAll(servers, States.CONNECTED);
restarted = true;
}
}

Expand Down

0 comments on commit a8eb7fa

Please sign in to comment.