From a8eb7faa34e90c748f5f49f211a6dbad78c16f0b Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Sat, 4 Jan 2025 06:16:55 +0800 Subject: [PATCH] ZOOKEEPER-3624: Fix flaky `QuorumPeerMainTest::testFailedTxnAsPartOfQuorumLoss` Reviewers: anmolnar Author: kezhuw Closes #2204 from kezhuw/ZOOKEEPER-3624-fix-flaky-testFailedTxnAsPartOfQuorumLoss --- .../zookeeper/server/quorum/QuorumPeer.java | 1 + .../server/quorum/QuorumPeerMainTest.java | 102 +++++++++++------- 2 files changed, 65 insertions(+), 38 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index 876a297f9aa..786450d35a5 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -1578,6 +1578,7 @@ public void run() { } else { try { reconfigFlagClear(); + checkSuspended(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java index 6c0f4f926dd..eb4966e7512 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java @@ -706,6 +706,9 @@ public void testWithOnlyMinSessionTimeout() throws Exception { assertEquals(maxSessionTimeOut, quorumPeer.getMaxSessionTimeout(), "maximumSessionTimeOut is wrong"); } + /** + * Verify that failed txn in isolated leader got truncated after rejoining quorum. 
+ */ @Test public void testFailedTxnAsPartOfQuorumLoss() throws Exception { final int LEADER_TIMEOUT_MS = 10_000; @@ -729,6 +732,8 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception { // increase the tick time to delay the leader going to looking int previousTick = servers.mt[leader].main.quorumPeer.tickTime; servers.mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS; + // isolate it from other quorum members by preventing it from rejoining + servers.mt[leader].getQuorumPeer().setSuspended(true); // let the previous tick on the leader exhaust itself so the new tick time takes effect Thread.sleep(previousTick); LOG.warn("LEADER {}", leader); @@ -739,34 +744,18 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception { } } - // 3. start up the followers to form a new quorum - for (int i = 0; i < SERVER_COUNT; i++) { - if (i != leader) { - servers.mt[i].start(); - } - } - - // 4. wait one of the follower to be the new leader - for (int i = 0; i < SERVER_COUNT; i++) { - if (i != leader) { - // Recreate a client session since the previous session was not persisted. - servers.restartClient(i, this); - waitForOne(servers.zk[i], States.CONNECTED); - } - } - - // 5. send a create request to old leader and make sure it's synced to disk, + // 3. send a create request to old leader and make sure it's synced to disk, // which means it acked from itself try { servers.zk[leader].create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); fail("create /zk" + leader + " should have failed"); - } catch (KeeperException e) { + } catch (KeeperException ignored) { } - // just make sure that we actually did get it in process at the - // leader + // just make sure that we actually did get it in process at the leader + // // there can be extra sessionClose proposals - assertTrue(outstanding.size() > 0); + assertFalse(outstanding.isEmpty()); Proposal p = findProposalOfType(outstanding, OpCode.create); LOG.info("Old leader id: {}. 
All proposals: {}", leader, outstanding); assertNotNull(p, "Old leader doesn't have 'create' proposal"); @@ -782,36 +771,73 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception { sleepTime += 100; } - // 6. wait for the leader to quit due to not enough followers and come back up as a part of the new quorum - LOG.info("Waiting for leader {} to timeout followers", leader); + // 4. start up the followers to form a new quorum + for (int i = 0; i < SERVER_COUNT; i++) { + if (i != leader) { + servers.mt[i].start(); + } + } + + // 5. wait for one of the followers to become the new leader + for (int i = 0; i < SERVER_COUNT; i++) { + if (i != leader) { + // Create a new client session to avoid ConnectionLoss, as the server it connects to was restarted. + servers.restartClient(i, this); + waitForOne(servers.zk[i], States.CONNECTED); + } + } + + // 6. make sure new quorum does not replicate the failed txn + for (int i = 0; i < SERVER_COUNT; i++) { + if (i == leader) { + continue; + } + assertNull(servers.zk[i].exists("/zk" + leader, false), + "server " + i + " should not have /zk" + leader); + } + + // resume election to rejoin the cluster + servers.mt[leader].getQuorumPeer().setSuspended(false); + + // 7. 
wait for the leader to quit due to not enough followers and come back up as a part of the new quorum + LOG.info("Waiting for leader {} to timeout and rejoin as follower", leader); sleepTime = 0; - Follower f = servers.mt[leader].main.quorumPeer.follower; - while (f == null || !f.isRunning()) { - if (sleepTime > LEADER_TIMEOUT_MS * 2) { - fail("Took too long for old leader to time out " + while (servers.mt[leader].getQuorumPeer().getPeerState() != QuorumPeer.ServerState.FOLLOWING) { + if (sleepTime > LEADER_TIMEOUT_MS * 10 * 2) { + fail("Took too long for old leader to time out and rejoin " + servers.mt[leader].main.quorumPeer.getPeerState()); } Thread.sleep(100); sleepTime += 100; - f = servers.mt[leader].main.quorumPeer.follower; } int newLeader = servers.findLeader(); // make sure a different leader was elected assertNotEquals(leader, newLeader); - // 7. restart the previous leader to force it to replay the edits and possibly come up in a bad state - servers.mt[leader].shutdown(); - servers.mt[leader].start(); - // old client session can expire, restart it + // Now all preconditions are met. Let's verify that the failed txn got truncated in the whole cluster. + + boolean restarted = false; servers.restartClient(leader, this); - waitForAll(servers, States.CONNECTED); + waitForOne(servers.zk[leader], States.CONNECTED); + while (true) { + // 8. make sure everything is consistent, that is, the failed txn got truncated in the old leader. + for (int i = 0; i < SERVER_COUNT; i++) { + assertNull(servers.zk[i].exists("/zk" + leader, false), + "server " + i + " should not have /zk" + leader); + } - // 8. check the node exist in previous leader but not others - // make sure everything is consistent - for (int i = 0; i < SERVER_COUNT; i++) { - assertNull(servers.zk[i].exists("/zk" + leader, false), - "server " + i + " should not have /zk" + leader); + if (restarted) { + break; + } + + // 9. 
make sure above holds after restart + servers.mt[leader].shutdown(); + servers.mt[leader].start(); + // old client session can expire, restart it + servers.restartClient(leader, this); + waitForAll(servers, States.CONNECTED); + restarted = true; } }