Skip to content

Commit

Permalink
ZOOKEEPER-4785: Txn loss due to race condition in Learner.syncWithLea…
Browse files Browse the repository at this point in the history
…der() during DIFF sync (#2111) (#2133)

Author: Li Wang <[email protected]>

Co-authored-by: liwang <[email protected]>
li4wang and liwang authored Feb 12, 2024
1 parent ebe136e commit 4a49d45
Showing 5 changed files with 394 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -702,14 +702,14 @@ private void sendNotifications() {
qv.toString().getBytes(UTF_8));

LOG.debug(
"Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient),"
+ " {} (myid), 0x{} (n.peerEpoch) ",
"Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.peerEpoch), 0x{} (n.round), {} (recipient),"
+ " {} (myid) ",
proposedLeader,
Long.toHexString(proposedZxid),
Long.toHexString(proposedEpoch),
Long.toHexString(logicalclock.get()),
sid,
self.getMyId(),
Long.toHexString(proposedEpoch));
self.getMyId());

sendqueue.offer(notmsg);
}
@@ -722,12 +722,13 @@ private void sendNotifications() {
*/
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug(
"id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}",
"id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}, epoch: 0x{}, proposed epoch: 0x{}",
newId,
curId,
Long.toHexString(newZxid),
Long.toHexString(curZxid));

Long.toHexString(curZxid),
Long.toHexString(newEpoch),
Long.toHexString(curEpoch));
if (self.getQuorumVerifier().getWeight(newId) == 0) {
return false;
}
Original file line number Diff line number Diff line change
@@ -80,14 +80,23 @@ protected void setupRequestProcessors() {
LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<>();

public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {
Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
request.setTxnDigest(digest);
if ((request.zxid & 0xffffffffL) != 0) {
pendingTxns.add(request);
}
final Request request = buildRequestToProcess(hdr, txn, digest);
syncProcessor.processRequest(request);
}

/**
 * Builds the {@link Request} for the given txn and hands it to the ZK database's
 * {@code append} call before returning it to the caller.
 *
 * <p>Unlike {@code logRequest}, this method does not pass the request to the sync
 * processor; it only calls {@code getZKDatabase().append(...)}.
 *
 * @param hdr the txn header
 * @param txn the txn record
 * @param digest the digest of the txn
 * @return the request that was built and appended
 * @throws IOException if appending to the ZK database fails
 */
public Request appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException {
    final Request appended = buildRequestToProcess(hdr, txn, digest);
    getZKDatabase().append(appended);
    return appended;
}

/**
* When a COMMIT message is received, eventually this method is called,
* which matches up the zxid from the COMMIT with (hopefully) the head of
@@ -181,4 +190,19 @@ protected void unregisterMetrics() {

}

/**
 * Creates the {@link Request} that represents the given txn and, when applicable,
 * registers it in {@code pendingTxns}.
 *
 * @param hdr the txn header; supplies client id, cxid, type and zxid of the request
 * @param txn the txn record
 * @param digest the digest of the txn, attached to the request
 * @return the newly built request
 */
private Request buildRequestToProcess(final TxnHeader hdr, final Record txn, final TxnDigest digest) {
    final Request built = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
    built.setTxnDigest(digest);

    // Only requests whose zxid has a non-zero low 32 bits are queued as pending.
    // NOTE(review): presumably the low 32 bits are the per-epoch counter - confirm.
    final boolean lowBitsSet = (built.zxid & 0xffffffffL) != 0;
    if (lowBitsSet) {
        pendingTxns.add(built);
    }
    return built;
}
}
Original file line number Diff line number Diff line change
@@ -556,6 +556,8 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
readPacket(qp);
Deque<Long> packetsCommitted = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
Deque<Request> requestsToAck = new ArrayDeque<>();

synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
@@ -745,7 +747,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
zk.takeSnapshot(syncSnapshot);
}

self.setCurrentEpoch(newEpoch);

writeToTxnLog = true;
//Anything after this needs to go to the transaction log, not applied directly in memory
isPreZAB1_0 = false;
@@ -755,14 +757,27 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startupWithoutServing();
if (zk instanceof FollowerZooKeeperServer) {
long startTime = Time.currentElapsedTime();
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
fzk.logRequest(p.hdr, p.rec, p.digest);
final Request request = fzk.appendRequest(p.hdr, p.rec, p.digest);
requestsToAck.add(request);
}

// persist the txns to disk
fzk.getZKDatabase().commit();
LOG.info("{} txns have been persisted and it took {}ms",
packetsNotCommitted.size(), Time.currentElapsedTime() - startTime);
packetsNotCommitted.clear();
}

// set the current epoch after all the txns are persisted
self.setCurrentEpoch(newEpoch);
LOG.info("Set the current epoch to {}", newEpoch);

// send NEWLEADER ack after all the txns are persisted
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
LOG.info("Sent NEWLEADER ack to leader with zxid {}", Long.toHexString(newLeaderZxid));
break;
}
}
@@ -781,13 +796,25 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {

// We need to log the stuff that came in between the snapshot and the uptodate
if (zk instanceof FollowerZooKeeperServer) {
// reply ACK of PROPOSAL after ACK of NEWLEADER to avoid leader shutdown due to timeout
// on waiting for a quorum of followers
for (final Request request : requestsToAck) {
final QuorumPacket ackPacket = new QuorumPacket(Leader.ACK, request.getHdr().getZxid(), null, null);
writePacket(ackPacket, false);
}
writePacket(null, true);
requestsToAck.clear();

FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
fzk.logRequest(p.hdr, p.rec, p.digest);
}
LOG.info("{} txns have been logged asynchronously", packetsNotCommitted.size());

for (Long zxid : packetsCommitted) {
fzk.commit(zxid);
}
LOG.info("{} txns have been committed", packetsCommitted.size());
} else if (zk instanceof ObserverZooKeeperServer) {
// Similar to follower, we need to log requests between the snapshot
// and UPTODATE
Original file line number Diff line number Diff line change
@@ -0,0 +1,316 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper.server.quorum;

import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.security.sasl.SaslException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.SyncRequestProcessor;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerListener;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.test.ClientBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Tests for DIFF sync between a learner and the leader (ZOOKEEPER-4785).
 *
 * <p>Each test runs against a three-server quorum that is started before the test and shut
 * down afterwards. The scenarios shut down and restart individual servers to force leader
 * elections and DIFF syncs, then check that no txn was lost and that no extra election round
 * was needed.
 */
public class DIFFSyncTest extends QuorumPeerTestBase {
    private static final int SERVER_COUNT = 3;
    private static final String PATH_PREFIX = "/test_";

    private int[] clientPorts;
    private MainThread[] mt;
    private ZooKeeper[] zkClients;

    /**
     * Starts a quorum of {@link #SERVER_COUNT} servers and waits until all of them are up.
     */
    @BeforeEach
    public void start() throws Exception {
        clientPorts = new int[SERVER_COUNT];
        zkClients = new ZooKeeper[SERVER_COUNT];
        // Publish the (still empty) server array before starting anything, so that tearDown()
        // can stop whatever has been started even if startup fails part-way.
        mt = new MainThread[SERVER_COUNT];
        startQuorum();
    }

    /**
     * Closes every client that was created and shuts down every server that was started.
     * Tolerates a partially initialized fixture (null arrays or null slots).
     */
    @AfterEach
    public void tearDown() throws Exception {
        if (zkClients != null) {
            for (final ZooKeeper zk : zkClients) {
                try {
                    if (zk != null) {
                        zk.close();
                    }
                } catch (final InterruptedException e) {
                    LOG.warn("ZooKeeper interrupted while shutting it down", e);
                }
            }
        }

        if (mt != null) {
            for (final MainThread mainThread : mt) {
                if (mainThread == null) {
                    // this slot was never started because startup failed earlier
                    continue;
                }
                try {
                    mainThread.shutdown();
                } catch (final InterruptedException e) {
                    LOG.warn("Quorum Peer interrupted while shutting it down", e);
                }
            }
        }
    }

    @Test
    @Timeout(value = 120)
    public void testTxnLoss_FailToPersistAndCommitTxns() throws Exception {
        final List<String> paths = new ArrayList<>();
        assertEquals(2, mt[2].getQuorumPeer().getLeaderId());

        // create a ZK client to the leader (currentEpoch=1, lastLoggedZxid=<1, 1>)
        createZKClient(2);

        // create a znode (currentEpoch=1, lastLoggedZxid=<1, 2>)
        paths.add(createNode(zkClients[2], PATH_PREFIX + "0"));

        // shut down S0
        mt[0].shutdown();
        LOG.info("S0 shutdown.");

        // create a znode (currentEpoch=1, lastLoggedZxid=<1, 3>), so S0 is 1 txn behind
        paths.add(createNode(zkClients[2], PATH_PREFIX + "1"));
        logEpochsAndLastLoggedTxnForAllServers();

        // shut down S1
        mt[1].shutdown();
        LOG.info("S1 shutdown.");

        // restart S0 and trigger a new leader election (currentEpoch=2)
        // S0 starts with MockSyncRequestProcessor and MockCommitProcessor to simulate that it
        // writes the currentEpoch and sends the NEWLEADER ACK but fails to persist and commit
        // txns afterwards in DIFF sync
        mt[0].start(new MockTestQPMain());
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[0], CONNECTION_TIMEOUT),
                "waiting for server 0 being up");
        LOG.info("S0 restarted.");
        logEpochsAndLastLoggedTxnForAllServers();

        // validate S2 is still the leader
        assertEquals(2, mt[2].getQuorumPeer().getLeaderId());

        // shut down the leader (i.e. S2). This causes S0 to disconnect from the leader, perform
        // a partial shutdown, fast forward its database to the latest persisted txn
        // (i.e. <1, 3>) and change its state to LOOKING
        mt[2].shutdown();
        LOG.info("S2 shutdown.");

        // start S1 and trigger a leader election (currentEpoch=3)
        mt[1].start();
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[1], CONNECTION_TIMEOUT),
                "waiting for server 1 being up");
        LOG.info("S1 restarted.");
        logEpochsAndLastLoggedTxnForAllServers();

        // validate S0 is the new leader because it has the higher epoch
        assertEquals(0, mt[0].getQuorumPeer().getLeaderId());

        // connect to the new leader (i.e. S0) (currentEpoch=3, lastLoggedZxid=<3, 1>)
        createZKClient(0);

        // create a znode (currentEpoch=3, lastLoggedZxid=<3, 2>)
        paths.add(createNode(zkClients[0], PATH_PREFIX + "3"));

        // start S2 which is the old leader
        mt[2].start();
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[2], CONNECTION_TIMEOUT),
                "waiting for server 2 being up");
        LOG.info("S2 restarted.");
        logEpochsAndLastLoggedTxnForAllServers();

        // validate all the znodes exist from all the clients
        validateDataFromAllClients(paths);
    }

    @Test
    @Timeout(value = 120)
    public void testLeaderShutdown_AckProposalBeforeAckNewLeader() throws Exception {
        assertEquals(2, mt[2].getQuorumPeer().getLeaderId());

        // create a ZK client to the leader (currentEpoch=1, lastLoggedZxid=<1, 1>)
        createZKClient(2);

        // create a znode (currentEpoch=1, lastLoggedZxid=<1, 2>)
        createNode(zkClients[2], PATH_PREFIX + "0");

        // shut down S0
        mt[0].shutdown();
        LOG.info("S0 shutdown.");

        // create a znode (currentEpoch=1, lastLoggedZxid=<1, 3>), so S0 is 1 txn behind
        createNode(zkClients[2], PATH_PREFIX + "1");
        logEpochsAndLastLoggedTxnForAllServers();

        // shut down S1
        mt[1].shutdown();
        LOG.info("S1 shutdown.");

        // restart S0 and trigger a new leader election and DIFF sync (currentEpoch=2)
        mt[0].start();
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[0], CONNECTION_TIMEOUT),
                "waiting for server 0 being up");
        LOG.info("S0 restarted.");

        // create a znode (currentEpoch=2, lastLoggedZxid=<2, 1>)
        createNode(zkClients[2], PATH_PREFIX + "2");

        // validate quorum is up without additional round of leader election
        // (S1 is skipped because it is still shut down)
        for (int i = 0; i < SERVER_COUNT; i++) {
            if (i != 1) {
                final QuorumPeer qp = mt[i].getQuorumPeer();
                assertNotNull(qp);
                assertEquals(2, qp.getCurrentEpoch());
                assertEquals(2, qp.getAcceptedEpoch());
                assertEquals("200000001", Long.toHexString(qp.getLastLoggedZxid()));
            }
        }
    }

    /**
     * Assigns ports, builds the shared quorum configuration, starts all servers and waits for
     * each of them to come up. Started servers are stored in {@code mt} as soon as they are
     * created so they can be shut down even if a later step fails.
     */
    private void startQuorum() throws IOException {
        final StringBuilder sb = new StringBuilder();

        for (int i = 0; i < SERVER_COUNT; i++) {
            clientPorts[i] = PortAssignment.unique();
            final String server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
                    + ":participant;127.0.0.1:" + clientPorts[i];
            sb.append(server);
            sb.append("\n");
        }
        final String quorumCfg = sb.toString();

        // start all the servers
        for (int i = 0; i < SERVER_COUNT; i++) {
            mt[i] = new MainThread(i, clientPorts[i], quorumCfg, false);
            mt[i].start();
        }

        // ensure all servers started
        for (int i = 0; i < SERVER_COUNT; i++) {
            assertTrue(
                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT),
                    "waiting for server " + i + " being up");
        }
    }

    /**
     * Creates a client connected to server {@code idx}, stores it in {@code zkClients} and
     * waits for the connection to be established.
     */
    private void createZKClient(final int idx) throws Exception {
        zkClients[idx] = null;
        final ClientBase.CountdownWatcher watch = new ClientBase.CountdownWatcher();
        // store the client before waiting so tearDown() closes it even if the wait times out
        zkClients[idx] = new ZooKeeper("127.0.0.1:" + clientPorts[idx], ClientBase.CONNECTION_TIMEOUT, watch);
        watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
    }

    /**
     * Creates an empty persistent znode and asserts it is visible through the same client.
     *
     * @return the path returned by the create call
     */
    private String createNode(final ZooKeeper zk, final String path) throws Exception {
        final String fullPath = zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        assertNotNull(zk.exists(path, false));
        return fullPath;
    }

    /** A {@link TestQPMain} whose quorum peer is a {@link TestQuorumPeer}. */
    private static class MockTestQPMain extends TestQPMain {
        @Override
        protected QuorumPeer getQuorumPeer() throws SaslException {
            return new TestQuorumPeer();
        }
    }

    /**
     * A quorum peer whose follower server is wired with {@link MockSyncRequestProcessor} and
     * {@link MockCommitProcessor}, both of which drop the requests they are given.
     */
    private static class TestQuorumPeer extends QuorumPeer {
        public TestQuorumPeer() throws SaslException {
        }

        @Override
        protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
            final FollowerZooKeeperServer followerZookeeperServer = new FollowerZooKeeperServer(logFactory, this, this.getZkDb()) {
                @Override
                protected void setupRequestProcessors() {
                    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
                    commitProcessor = new MockCommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
                    commitProcessor.start();

                    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
                    ((FollowerRequestProcessor) firstProcessor).start();
                    syncProcessor = new MockSyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));

                    syncProcessor.start();
                }
            };
            return new Follower(this, followerZookeeperServer);
        }
    }

    /** Sync processor that only logs each request instead of processing it. */
    private static class MockSyncRequestProcessor extends SyncRequestProcessor {
        public MockSyncRequestProcessor(final ZooKeeperServer zks, final RequestProcessor nextProcessor) {
            super(zks, nextProcessor);
        }

        @Override
        public void processRequest(final Request request) {
            LOG.info("Sync request for zxid {} is dropped", Long.toHexString(request.getHdr().getZxid()));
        }
    }

    /** Commit processor that only logs each commit instead of applying it. */
    private static class MockCommitProcessor extends CommitProcessor {
        public MockCommitProcessor(final RequestProcessor nextProcessor, final String id,
                                   final boolean matchSyncs, final ZooKeeperServerListener listener) {

            super(nextProcessor, id, matchSyncs, listener);
        }

        @Override
        public void commit(final Request request) {
            LOG.info("Commit request for zxid {} is dropped", Long.toHexString(request.getHdr().getZxid()));
        }
    }

    /**
     * Logs accepted epoch, current epoch and last logged zxid of every server that currently
     * has a quorum peer.
     */
    private void logEpochsAndLastLoggedTxnForAllServers() throws Exception {
        for (int i = 0; i < SERVER_COUNT; i++) {
            final QuorumPeer qp = mt[i].getQuorumPeer();
            if (qp != null) {
                LOG.info("server id={}, acceptedEpoch={}, currentEpoch={}, lastLoggedTxn={}",
                        qp.getMyId(), qp.getAcceptedEpoch(),
                        qp.getCurrentEpoch(), Long.toHexString(qp.getLastLoggedZxid()));
            }
        }
    }

    /**
     * Asserts that exactly three paths were recorded and that each of them exists when read
     * through a client of every server, creating missing clients on demand.
     */
    private void validateDataFromAllClients(final List<String> paths) throws Exception {
        // independent of the server being checked, so verify it once up front
        assertEquals(3, paths.size());

        for (int i = 0; i < SERVER_COUNT; i++) {
            if (zkClients[i] == null) {
                createZKClient(i);
            }

            for (final String path : paths) {
                assertNotNull(zkClients[i].exists(path, false), "znode " + path + " is missing");
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -318,6 +318,18 @@ public synchronized void start() {
currentThread.start();
}

/**
 * Starts this instance on a new thread after replacing {@code main} with the supplied
 * {@code TestQPMain}.
 *
 * @param testQPMain the TestQPMain to store in {@code main} before the thread is started
 */
public synchronized void start(final TestQPMain testQPMain) {
    main = testQPMain;
    final Thread runner = new Thread(this);
    // record the thread before launching it, as the field is assigned prior to start()
    currentThread = runner;
    runner.start();
}

/**
 * Creates a fresh {@code TestQPMain}; every call returns a new instance.
 *
 * @return a newly constructed TestQPMain
 */
public TestQPMain getTestQPMain() {
    final TestQPMain freshMain = new TestQPMain();
    return freshMain;
}

0 comments on commit 4a49d45

Please sign in to comment.