Skip to content

Commit

Permalink
Fix the flaky test TestLeaderElection
Browse files (browse the repository at this point in the history)
  • Loading branch information
dengzhhu653 committed Jan 16, 2024
1 parent 06ef7c8 commit dcb5af6
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ public enum ConfVars {
"metastore.housekeeping.leader.election",
"host", new StringSetValidator("host", "lock"),
"Set to host, HMS will choose the leader by the configured metastore.housekeeping.leader.hostname.\n" +
"Set to lock, HMS will use the hive lock to elect the leader."),
"Set to lock, HMS will use the Hive lock to elect the leader."),
METASTORE_HOUSEKEEPING_LEADER_AUDITTABLE("metastore.housekeeping.leader.auditTable",
"metastore.housekeeping.leader.auditTable", "",
"Audit the leader election event to a plain json table when configured."),
Expand All @@ -593,6 +593,9 @@ public enum ConfVars {
"metastore.housekeeping.leader.auditFiles.limit", 10,
"Limit the number of small audit files when metastore.housekeeping.leader.newAuditFile is true.\n" +
"If the number of audit files exceeds the limit, then the oldest will be deleted."),
METASTORE_HOUSEKEEPING_LEADER_LOCK_NAMESPACE("metastore.housekeeping.leader.lock.namespace",
"metastore.housekeeping.leader.lock.namespace", "",
"The database where the Hive lock sits when metastore.housekeeping.leader.election is set to lock."),
METASTORE_HOUSEKEEPING_THREADS_ON("metastore.housekeeping.threads.on",
"hive.metastore.housekeeping.threads.on", false,
"Whether to run the tasks under metastore.task.threads.remote on this metastore instance or not.\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ public class LeaderElectionContext {
* Tasks that belong to the same type all run on the same leader.
*/
public enum TTYPE {
HOUSEKEEPING(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
"metastore_housekeeping_leader"), "housekeeping"),
WORKER(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
"metastore_worker_leader"), "compactor_worker"),
ALWAYS_TASKS(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
"metastore_always_tasks_leader"), "always_tasks");
HOUSEKEEPING(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "__METASTORE_LEADER_ELECTION__",
"metastore_housekeeping"), "housekeeping"),
WORKER(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "__METASTORE_LEADER_ELECTION__",
"metastore_compactor_worker"), "compactor_worker"),
ALWAYS_TASKS(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "__METASTORE_LEADER_ELECTION__",
"metastore_always_tasks"), "always_tasks");
// Mutex of TTYPE, which can be a nonexistent table
private final TableName mutex;
// Name of TTYPE
Expand Down Expand Up @@ -127,9 +127,10 @@ public void start() throws Exception {
throw new RuntimeException("Error claiming to be leader: " + leaderElection.getName(), e);
}
});
daemon.setName("Metastore Election " + leaderElection.getName());
daemon.setDaemon(true);

if (startAsDaemon) {
daemon.setName("Leader-Election-" + leaderElection.getName());
daemon.setDaemon(true);
daemon.start();
} else {
daemon.run();
Expand All @@ -154,7 +155,13 @@ public static Object getLeaderMutex(Configuration conf, TTYPE ttype, String serv
case "host":
return servHost;
case "lock":
return ttype.getTableName();
TableName mutex = ttype.getTableName();
String namespace =
MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_LOCK_NAMESPACE);
if (StringUtils.isNotEmpty(namespace)) {
return new TableName(mutex.getCat(), namespace, mutex.getTable());
}
return mutex;
default:
throw new UnsupportedOperationException(method + " not supported for leader election");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.hive.metastore.leader;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

Expand All @@ -26,7 +28,7 @@
*/
public class LeaderElectionFactory {

public static LeaderElection create(Configuration conf) {
public static LeaderElection create(Configuration conf) throws IOException {
String method =
MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION);
switch (method.toLowerCase()) {
Expand All @@ -35,7 +37,7 @@ public static LeaderElection create(Configuration conf) {
case "lock":
return new LeaseLeaderElection();
default:
throw new UnsupportedOperationException("Do not support " + method + " now");
throw new UnsupportedOperationException(method + " is not supported for electing the leader");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
Expand All @@ -42,6 +43,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -92,9 +94,17 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
public static final String METASTORE_RENEW_LEASE = "metastore.renew.leader.lease";

private String name;
private String userName;
private String hostName;

private void doWork(LockResponse resp, Configuration conf,
LeaseLeaderElection() throws IOException {
userName = SecurityUtils.getUser();
hostName = InetAddress.getLocalHost().getHostName();
}

private synchronized void doWork(LockResponse resp, Configuration conf,
TableName tableName) throws LeaderException {
long start = System.currentTimeMillis();
lockId = resp.getLockid();
assert resp.getState() == LockState.ACQUIRED || resp.getState() == LockState.WAITING;
shutdownWatcher();
Expand All @@ -121,6 +131,7 @@ private void doWork(LockResponse resp, Configuration conf,
default:
throw new IllegalStateException("Unexpected lock state: " + resp.getState());
}
LOG.debug("Spent {}ms to notify the listeners, isLeader: {}", System.currentTimeMillis() - start, isLeader);
}

private void notifyListener() {
Expand All @@ -142,13 +153,6 @@ private void notifyListener() {
public void tryBeLeader(Configuration conf, TableName table) throws LeaderException {
requireNonNull(conf, "conf is null");
requireNonNull(table, "table is null");
String user, hostName;
try {
user = SecurityUtils.getUser();
hostName = InetAddress.getLocalHost().getHostName();
} catch (Exception e) {
throw new LeaderException("Error while getting the username", e);
}

if (store == null) {
store = TxnUtils.getTxnStore(conf);
Expand All @@ -165,7 +169,7 @@ public void tryBeLeader(Configuration conf, TableName table) throws LeaderExcept
boolean lockable = false;
Exception recentException = null;
long start = System.currentTimeMillis();
LockRequest req = new LockRequest(components, user, hostName);
LockRequest req = new LockRequest(components, userName, hostName);
int numRetries = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.LOCK_NUMRETRIES);
long maxSleep = MetastoreConf.getTimeVar(conf,
MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS);
Expand All @@ -175,6 +179,7 @@ public void tryBeLeader(Configuration conf, TableName table) throws LeaderExcept
if (res.getState() == LockState.WAITING || res.getState() == LockState.ACQUIRED) {
lockable = true;
doWork(res, conf, table);
LOG.debug("Spent {}ms to lock the table {}, retries: {}", System.currentTimeMillis() - start, table, i);
break;
}
} catch (NoSuchTxnException | TxnAbortedException e) {
Expand Down Expand Up @@ -324,6 +329,7 @@ public void runInternal() {
} catch (NoSuchTxnException | TxnAbortedException e) {
throw new AssertionError("This should not happen, we didn't open txn", e);
} catch (NoSuchLockException e) {
LOG.info("No such lock {} for NonLeaderWatcher, try to obtain the lock again...", lockId);
reclaim();
} catch (Exception e) {
// Wait for next cycle.
Expand Down Expand Up @@ -379,6 +385,7 @@ public void runInternal() {
} catch (NoSuchTxnException | TxnAbortedException e) {
throw new AssertionError("This should not happen, we didn't open txn", e);
} catch (NoSuchLockException e) {
LOG.info("No such lock {} for Heartbeater, try to obtain the lock again...", lockId);
reclaim();
} catch (Exception e) {
// Wait for next cycle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -35,6 +37,8 @@

public class TestLeaderElection {

private static final Logger LOG = LoggerFactory.getLogger(TestLeaderElection.class);

@Test
public void testConfigLeaderElection() throws Exception {
LeaderElection election = new StaticLeaderElection();
Expand Down Expand Up @@ -108,6 +112,7 @@ public void lossLeadership(LeaderElection election) {
// remove leader's lease (instance2)
long lockId2 = instance2.getLockId();
txnStore.unlock(new UnlockRequest(lockId2));
LOG.debug("Unlock the {} Successfully", lockId2);
Thread.sleep(4 * 1000);
assertTrue(flag1.get() && instance1.isLeader());
assertFalse(flag2.get() || instance2.isLeader());
Expand All @@ -117,6 +122,7 @@ public void lossLeadership(LeaderElection election) {
// remove leader's lease(instance1)
long lockId1 = instance1.getLockId();
txnStore.unlock(new UnlockRequest(lockId1));
LOG.debug("Unlock the {} Successfully", lockId1);
Thread.sleep(4 * 1000);
assertFalse(lockId1 == instance1.getLockId());
assertTrue(lockId1 > 0);
Expand Down

0 comments on commit dcb5af6

Please sign in to comment.