Eng 13881 (#5440)
ENG-13880, ENG-13881:
On SPI migration, the DR partition buffer receiver(s) on the old leader must be shut down and a DR partition buffer receiver must be started on the new leader. We must also make sure that the new leader's buffer receiver is started only after the old one has shut down.

On SPI migration, DR receives three callbacks (a consumer-side sketch follows the list below):

- leader migration started: received only on the old leader. Adds a ZK blocker per producer cluster and consumer partition ID.
- leader migration failed: received only on the old leader. Removes the ZK blocker that was added by the 'started' callback.
- leader migration completed: received on both the old leader and the new leader.
  - The old leader shuts down the receivers corresponding to this partition and, once they are shut down, deletes the ZK node (the shutdown sequence is described below).
  - The new leader initiates repairCompleted, which waits for the ZK blocker to be gone before starting receivers for the partitions on the new leader.
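
A minimal consumer-side sketch of how these callbacks could be wired to the ZK blocker. The handler class, helper names, and the `<producerClusterId>_<partitionId>` child-node naming are assumptions for illustration; only the ClientInterfaceRepairCallback methods and the dr_consumer_partition_migration path come from this change:

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Hypothetical consumer-side handler; everything except the callback signatures
// and the ZK path is illustrative.
public class DrConsumerMigrationHandler implements ClientInterfaceRepairCallback {
    private final ZooKeeper m_zk;
    private final byte m_producerClusterId;

    public DrConsumerMigrationHandler(ZooKeeper zk, byte producerClusterId) {
        m_zk = zk;
        m_producerClusterId = producerClusterId;
    }

    // One blocker per producer cluster and consumer partition id (naming scheme assumed).
    private String blockerPath(int partitionId) {
        return VoltZK.drConsumerPartitionMigration + "/" + m_producerClusterId + "_" + partitionId;
    }

    @Override
    public void leaderMigrationStarted(int partitionId, long targetHSId) {
        // Old leader only: keep the new leader from starting receivers until we are done.
        try {
            m_zk.create(blockerPath(partitionId), new byte[0],
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException | InterruptedException e) {
            // log and surface as appropriate
        }
    }

    @Override
    public void leaderMigrationFailed(int partitionId, long targetHSId) {
        // Old leader only: the migration did not happen, so remove the blocker added above.
        try {
            m_zk.delete(blockerPath(partitionId), -1);
        } catch (KeeperException | InterruptedException e) {
            // log and surface as appropriate
        }
    }

    @Override
    public void leaderMigrated(int partitionId, long newLeaderHSId) {
        // Old leader: drain and shut down the receivers for this partition, then delete
        // the blocker node (see the shutdown sketch further below).
    }

    @Override
    public void repairCompleted(int partitionId, long initiatorHSId) {
        // New leader: wait for the blocker node to disappear before starting receivers.
    }
}
```
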
Shutting down partition receivers (a sketch in code follows these steps):

- Pending and rate-limited queues for the receivers are cleared (anything that hasn't been sent to the client interface is removed).
- A message is sent to the consumer MP coordinator to reneg any MP txns for the producer partitions:
  - any MP txns for these partitions still waiting for data from other partitions are removed;
  - for any MP txns for these partitions that were already sent down to the client interface, we wait to receive responses.
- Once the reneg work is complete, the receivers are shut down so that they won't accept any more binary log data.
- Once all the receivers are done, we delete the ZK blocker node.
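
A rough, self-contained sketch of that ordering. The PartitionBufferReceiver interface and the latch/runnable plumbing are stand-ins for illustration, not VoltDB's real types:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;

final class ReceiverShutdownSketch {
    // Stand-in for the real DR partition buffer receiver.
    interface PartitionBufferReceiver {
        void clearQueuedBuffers();   // drop pending + rate-limited buffers not yet sent to the CI
        void shutdown();             // stop accepting binary log data
    }

    static void shutDownReceivers(List<PartitionBufferReceiver> receivers,
                                  Runnable sendRenegToMpCoordinator,
                                  CountDownLatch renegComplete,
                                  Runnable deleteZkBlocker) throws InterruptedException {
        // 1. Clear pending and rate-limited queues.
        receivers.forEach(PartitionBufferReceiver::clearQueuedBuffers);
        // 2. Ask the consumer MP coordinator to reneg outstanding MP txns for the producer
        //    partitions; txns still waiting on other partitions are dropped, txns already
        //    sent down to the client interface are awaited.
        sendRenegToMpCoordinator.run();
        renegComplete.await();
        // 3. Only now shut the receivers down so no more binary log data is accepted.
        receivers.forEach(PartitionBufferReceiver::shutdown);
        // 4. Finally, delete the ZK blocker node so the new leader can start its receivers.
        deleteZkBlocker.run();
    }
}
```
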
manjujames authored Jul 5, 2018
1 parent 90fd371 commit b0980ed
Showing 9 changed files with 128 additions and 27 deletions.
56 changes: 52 additions & 4 deletions src/frontend/org/voltdb/ClientInterface.java
@@ -2201,7 +2201,7 @@ void startMigratePartitionLeader() {
int partitionKey = -1;

//MigratePartitionLeader is completed or there are hosts down. Stop MigratePartitionLeader service on this host
if (targetHostId == -1 || !voltDB.isClusterCompelte()) {
if (targetHostId == -1 || !voltDB.isClusterComplete()) {
voltDB.scheduleWork(
() -> {m_mailbox.deliver(new MigratePartitionLeaderMessage());},
0, 0, TimeUnit.SECONDS);
@@ -2255,13 +2255,20 @@ void startMigratePartitionLeader() {
spi = MiscUtils.roundTripForCL(spi);
}

long targetHSId = m_cartographer.getHSIDForPartitionHost(targetHostId, partitionId);
//Info saved for the node failure handling
MigratePartitionLeaderInfo spiInfo = new MigratePartitionLeaderInfo(
m_cartographer.getHSIDForPartitionHost(hostId, partitionId),
m_cartographer.getHSIDForPartitionHost(targetHostId, partitionId),
targetHSId,
partitionId);
VoltZK.createMigratePartitionLeaderInfo(m_zk, spiInfo);

notifyPartitionMigrationStatus(partitionId, targetHSId, false);

if (Boolean.getBoolean("TEST_MIGRATION_FAILURE")) {
Thread.sleep(100);
throw new IOException("failure simulation");
}
synchronized (m_executeTaskAdpater) {
createTransaction(m_executeTaskAdpater.connectionId(),
spi,
@@ -2282,29 +2289,36 @@ void startMigratePartitionLeader() {
//not necessary a failure.
tmLog.warn(String.format("Fail to move the leader of partition %d to host %d. %s",
partitionId, targetHostId, resp.getStatusString()));
notifyPartitionMigrationStatus(partitionId, targetHSId, true);
}
} catch (IOException | InterruptedException e) {
tmLog.warn(String.format("errors in leader change for partition %d: %s", partitionId, e.getMessage()));
notifyPartitionMigrationStatus(partitionId,
m_cartographer.getHSIDForPartitionHost(targetHostId, partitionId),
true);
} finally {
//wait for the Cartographer to see the new partition leader. The leader promotion process should happen instantly.
//If the new leader does not show up in 5 min, the cluster may have experienced host-down events.
long remainingWaitTime = TimeUnit.MINUTES.toMillis(5);
final long waitingInterval = TimeUnit.SECONDS.toMillis(1);
boolean anyFailedHosts = false;
boolean migrationComplete = false;
while (remainingWaitTime > 0) {
try {
Thread.sleep(waitingInterval);
} catch (InterruptedException ignoreIt) {
}
remainingWaitTime -= waitingInterval;
if (CoreUtils.getHostIdFromHSId(m_cartographer.getHSIdForMaster(partitionId)) == targetHostId) {
migrationComplete = true;
break;
}

//some hosts may be down.
if (!voltDB.isClusterCompelte()) {
if (!voltDB.isClusterComplete()) {
anyFailedHosts = true;
break;
// If the target host is still alive, migration is still going on.
if (!voltDB.getHostMessenger().getLiveHostIds().contains(targetHostId)) break;
}
}

@@ -2314,6 +2328,40 @@ void startMigratePartitionLeader() {
() -> {VoltZK.removeActionBlocker(m_zk, VoltZK.migratePartitionLeaderBlocker, tmLog);},
5, 0, TimeUnit.SECONDS);
}

if (!migrationComplete) {
notifyPartitionMigrationStatus(partitionId,
m_cartographer.getHSIDForPartitionHost(targetHostId, partitionId),
true);
}
}
}

private void notifyPartitionMigrationStatus(int partitionId, long targetHSId, boolean failed) {
for (final ClientInterfaceHandleManager cihm : m_cihm.values()) {
try {
cihm.connection.queueTask(new Runnable() {
@Override
public void run() {
if (cihm.repairCallback != null) {
if (failed) {
cihm.repairCallback.leaderMigrationFailed(partitionId, targetHSId);
} else {
cihm.repairCallback.leaderMigrationStarted(partitionId, targetHSId);
}
}
}
});
} catch (UnsupportedOperationException ignore) {
// In case some internal connections don't implement queueTask()
if (cihm.repairCallback != null) {
if (failed) {
cihm.repairCallback.leaderMigrationFailed(partitionId, targetHSId);
} else {
cihm.repairCallback.leaderMigrationStarted(partitionId, targetHSId);
}
}
}
}
}

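Note on the new failure-simulation branch above: Boolean.getBoolean("TEST_MIGRATION_FAILURE") reads a JVM system property, so a test presumably enables it along these lines (the property name is from the diff; the surrounding test code is assumed):

```java
// Force startMigratePartitionLeader() down the simulated-failure path (assumed test usage).
System.setProperty("TEST_MIGRATION_FAILURE", "true");
try {
    // ... trigger a partition leader migration and assert leaderMigrationFailed is delivered ...
} finally {
    System.clearProperty("TEST_MIGRATION_FAILURE");
}
```
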
16 changes: 15 additions & 1 deletion src/frontend/org/voltdb/ClientInterfaceRepairCallback.java
@@ -21,10 +21,24 @@ public interface ClientInterfaceRepairCallback {
public void repairCompleted(int partitionId, long initiatorHSId);

/**
* Callback invoked when managed leadership migration occurs. There should
* Callback invoked when managed leadership migration is initiated.
* @param partitionId The partition ID
* @param initiatorHSId The target leader's HSID
*/
default void leaderMigrationStarted(int partitionId, long initiatorHSId) {}

/**
* Callback invoked when managed leadership migration completes. There should
* be no transaction to repair in this case.
* @param partitionId The partition ID
* @param initiatorHSId The new leader's HSID
*/
default void leaderMigrated(int partitionId, long initiatorHSId) {}

/**
* Callback invoked when managed leadership migration fails.
* @param partitionId The partition ID
* @param initiatorHSId The target leader's HSID
*/
default void leaderMigrationFailed(int partitionId, long initiatorHSId) {}
}
5 changes: 3 additions & 2 deletions src/frontend/org/voltdb/RealVoltDB.java
@@ -2193,7 +2193,7 @@ public void run() {
GCInspector.instance.start(m_periodicPriorityWorkThread, m_gcStats);
}

public boolean isClusterCompelte() {
public boolean isClusterComplete() {
return (m_config.m_hostCount == m_messenger.getLiveHostIds().size());
}

@@ -2205,7 +2205,7 @@ private void startMigratePartitionLeaderTask() {
}

//MigratePartitionLeader service will be started up only after the last rejoining has finished
if(!isClusterCompelte() || m_config.m_hostCount == 1 || m_configuredReplicationFactor == 0) {
if(!isClusterComplete() || m_config.m_hostCount == 1 || m_configuredReplicationFactor == 0) {
return;
}

@@ -2221,6 +2221,7 @@ private void startMigratePartitionLeaderTask() {
int hostId = it.next();
final int currentMasters = m_cartographer.getMasterCount(hostId);
if (currentMasters > minimalNumberOfLeaders) {
hostLog.debug("Host " + hostId + " has more than " + minimalNumberOfLeaders + ". Sending migrate partition message");
m_messenger.send(CoreUtils.getHSIdFromHostAndSite(hostId,
HostMessenger.CLIENT_INTERFACE_SITE_ID), msg);
}
3 changes: 2 additions & 1 deletion src/frontend/org/voltdb/VoltZK.java
@@ -61,7 +61,6 @@ public class VoltZK {
public static final String perPartitionTxnIds = "/db/perPartitionTxnIds";
public static final String operationMode = "/db/operation_mode";
public static final String exportGenerations = "/db/export_generations";
public static final String importerBase = "/db/import";

// configuration (ports, interfaces, ...)
public static final String cluster_metadata = "/db/cluster_metadata";
@@ -99,6 +98,7 @@ public static enum MailboxType {

// root for MigratePartitionLeader information nodes
public static final String migrate_partition_leader_info = "/core/migrate_partition_leader_info";
public static final String drConsumerPartitionMigration = "/db/dr_consumer_partition_migration";

public static final String iv2masters = "/db/iv2masters";
public static final String iv2appointees = "/db/iv2appointees";
Expand Down Expand Up @@ -210,6 +210,7 @@ private static final void printZKDir(ZooKeeper zk, String dir, StringBuilder bui
root,
mailboxes,
cluster_metadata,
drConsumerPartitionMigration,
operationMode,
iv2masters,
iv2appointees,
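On the new leader, repairCompleted has to wait for the blocker under dr_consumer_partition_migration to disappear before starting receivers. A minimal polling sketch, assuming the same `<producerClusterId>_<partitionId>` child naming as above (the real code may use a ZK watch instead):

```java
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

final class DrMigrationBlockerWait {
    // Block until the old leader has deleted its blocker node for this partition.
    static void awaitBlockerRemoved(ZooKeeper zk, byte producerClusterId, int partitionId)
            throws KeeperException, InterruptedException {
        String path = VoltZK.drConsumerPartitionMigration + "/" + producerClusterId + "_" + partitionId;
        while (zk.exists(path, false) != null) {
            Thread.sleep(100);   // polling for brevity; a ZK watch avoids the busy-wait
        }
    }
}
```
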
2 changes: 1 addition & 1 deletion src/frontend/org/voltdb/iv2/InitiatorMailbox.java
@@ -387,7 +387,7 @@ private void initiateSPIMigrationIfRequested(Iv2InitiateTaskMessage msg) {
}

//one more check to make sure all the hosts are up before any changes are made
if (!db.isClusterCompelte()) {
if (!db.isClusterComplete()) {
return;
}

19 changes: 15 additions & 4 deletions src/frontend/org/voltdb/messaging/Dr2MultipartResponseMessage.java
@@ -26,6 +26,7 @@
public class Dr2MultipartResponseMessage extends VoltMessage {

private boolean m_drain;
private boolean m_reneg;
private byte m_producerClusterId;
private int m_producerPID;
private ClientResponseImpl m_response;
@@ -36,14 +37,17 @@ public class Dr2MultipartResponseMessage {

public Dr2MultipartResponseMessage(byte producerClusterId, int producerPID, ClientResponseImpl response) {
m_drain = false;
m_reneg = false;
m_producerClusterId = producerClusterId;
m_producerPID = producerPID;
m_response = response;
}

public static Dr2MultipartResponseMessage createDrainMessage(byte producerClusterId, int producerPID) {
public static Dr2MultipartResponseMessage createDrainOrRenegMessage(byte producerClusterId, int producerPID,
boolean withDrainFlag, boolean withRenegFlag) {
final Dr2MultipartResponseMessage msg = new Dr2MultipartResponseMessage();
msg.m_drain = true;
msg.m_drain = withDrainFlag;
msg.m_reneg = withRenegFlag;
msg.m_producerClusterId = producerClusterId;
msg.m_producerPID = producerPID;
msg.m_response = null;
@@ -66,9 +70,14 @@ public boolean isDrain() {
return m_drain;
}

public boolean isReneg() {
return m_reneg;
}

@Override
protected void initFromBuffer(ByteBuffer buf) throws IOException {
m_drain = buf.get() == 1;
m_reneg = buf.get() == 1;
m_producerClusterId = buf.get();
m_producerPID = buf.getInt();
if (buf.remaining() > 0) {
@@ -81,10 +90,11 @@ protected void initFromBuffer(ByteBuffer buf) throws IOException {
public void flattenToBuffer(ByteBuffer buf) throws IOException {
buf.put(VoltDbMessageFactory.DR2_MULTIPART_RESPONSE_ID);
buf.put((byte) (m_drain ? 1 : 0));
buf.put((byte) (m_reneg ? 1 : 0));
buf.put(m_producerClusterId);
buf.putInt(m_producerPID);

if (!m_drain) {
if (!m_drain && !m_reneg) {
m_response.flattenToBuffer(buf);
}

@@ -96,9 +106,10 @@ public void flattenToBuffer(ByteBuffer buf) throws IOException {
public int getSerializedSize() {
int size = super.getSerializedSize()
+ 1 // drain or not
+ 1 // reneg or not
+ 1 // producer cluster ID
+ 4; // producer partition ID
if (!m_drain) {
if (!m_drain && !m_reneg) {
size += m_response.getSerializedSize();
}
return size;
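With the extra flag byte, the old createDrainMessage call sites move to createDrainOrRenegMessage, and a payload-free reneg acknowledgement can be built the same way. A small usage sketch (the call site and values are assumed; the factory signature is from this diff):

```java
byte producerClusterId = 1;   // example values
int producerPID = 7;

// Drain marker, as before.
Dr2MultipartResponseMessage drainDone =
        Dr2MultipartResponseMessage.createDrainOrRenegMessage(producerClusterId, producerPID, true, false);
// New: reneg acknowledgement, also carrying no ClientResponse payload.
Dr2MultipartResponseMessage renegDone =
        Dr2MultipartResponseMessage.createDrainOrRenegMessage(producerClusterId, producerPID, false, true);

assert drainDone.isDrain() && !drainDone.isReneg();
assert renegDone.isReneg() && !renegDone.isDrain();
```
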
22 changes: 22 additions & 0 deletions src/frontend/org/voltdb/messaging/Dr2MultipartTaskMessage.java
@@ -27,6 +27,7 @@ public class Dr2MultipartTaskMessage extends VoltMessage {

private int m_producerPID;
private boolean m_drain;
private boolean m_reneg;
private byte m_producerClusterId;
private short m_producerPartitionCnt;

@@ -46,6 +47,7 @@ public Dr2MultipartTaskMessage(StoredProcedureInvocation invocation, byte produc
m_lastExecutedMPUniqueID = lastExecutedMPUniqueID;
m_producerPID = producerPID;
m_drain = false;
m_reneg = false;
}

public static Dr2MultipartTaskMessage createDrainMessage(byte producerClusterId, int producerPID) {
@@ -54,6 +56,19 @@ public static Dr2MultipartTaskMessage createDrainMessage(byte producerClusterId,
msg.m_producerClusterId = producerClusterId;
msg.m_producerPartitionCnt = -1;
msg.m_drain = true;
msg.m_reneg = false;
msg.m_invocation = null;
msg.m_lastExecutedMPUniqueID = Long.MIN_VALUE;
return msg;
}

public static Dr2MultipartTaskMessage createRenegMessage(byte producerClusterId, int producerPID) {
final Dr2MultipartTaskMessage msg = new Dr2MultipartTaskMessage();
msg.m_producerPID = producerPID;
msg.m_producerClusterId = producerClusterId;
msg.m_producerPartitionCnt = -1;
msg.m_drain = false;
msg.m_reneg = true;
msg.m_invocation = null;
msg.m_lastExecutedMPUniqueID = Long.MIN_VALUE;
return msg;
@@ -71,6 +86,10 @@ public boolean isDrain() {
return m_drain;
}

public boolean isReneg() {
return m_reneg;
}

public byte getProducerClusterId() {
return m_producerClusterId;
}
@@ -87,6 +106,7 @@ public int getProducerPID() {
protected void initFromBuffer(ByteBuffer buf) throws IOException {
m_producerPID = buf.getInt();
m_drain = buf.get() == 1;
m_reneg = buf.get() == 1;
m_producerClusterId = buf.get();
m_producerPartitionCnt = buf.getShort();
m_lastExecutedMPUniqueID = buf.getLong();
@@ -103,6 +123,7 @@ public void flattenToBuffer(ByteBuffer buf) throws IOException {
buf.put(VoltDbMessageFactory.DR2_MULTIPART_TASK_ID);
buf.putInt(m_producerPID);
buf.put((byte) (m_drain ? 1 : 0));
buf.put((byte) (m_reneg ? 1 : 0));
buf.put(m_producerClusterId);
buf.putShort(m_producerPartitionCnt);
buf.putLong(m_lastExecutedMPUniqueID);
@@ -120,6 +141,7 @@ public int getSerializedSize() {
int size = super.getSerializedSize()
+ 4 // producer partition ID
+ 1 // is drain or not
+ 1 // is reneg or not
+ 1 // producer clusterId
+ 2 // producer partition count
+ 8; // last executed MP unique ID
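The matching request side: the old leader's shutdown path would presumably hand the consumer MP coordinator a reneg task per producer partition via the new factory (the dispatch itself is assumed; createRenegMessage and the flags are from this diff):

```java
byte producerClusterId = 1;   // example values
int producerPID = 7;

Dr2MultipartTaskMessage renegRequest =
        Dr2MultipartTaskMessage.createRenegMessage(producerClusterId, producerPID);
assert renegRequest.isReneg() && !renegRequest.isDrain();

// mpCoordinatorMailbox.deliver(renegRequest);   // hypothetical dispatch to the MP coordinator
```
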
29 changes: 16 additions & 13 deletions tests/frontend/org/voltdb/regressionsuites/LocalCluster.java
@@ -2254,19 +2254,11 @@ public static void failIfValgrindErrors(File valgrindOutputFile) {
}

// Use this for optionally enabling localServer in one of the DR clusters (usually for debugging)
public static LocalCluster createLocalCluster(String schemaDDL, int siteCount, int hostCount, int kfactor, int clusterId,
int replicationPort, int remoteReplicationPort, String pathToVoltDBRoot, String jar,
boolean isReplica) throws IOException {
return createLocalCluster(schemaDDL, siteCount, hostCount, kfactor, clusterId, replicationPort, remoteReplicationPort,
pathToVoltDBRoot, jar, isReplica ? DrRoleType.REPLICA : DrRoleType.MASTER, false);
}

public static LocalCluster createLocalCluster(String schemaDDL, int siteCount, int hostCount, int kfactor, int clusterId,
int replicationPort, int remoteReplicationPort, String pathToVoltDBRoot, String jar,
DrRoleType drRole, boolean hasLocalServer) throws IOException {
VoltProjectBuilder builder = new VoltProjectBuilder();
return createLocalCluster(schemaDDL, siteCount, hostCount, kfactor, clusterId, replicationPort, remoteReplicationPort,
pathToVoltDBRoot, jar, drRole, hasLocalServer, builder);
pathToVoltDBRoot, jar, drRole, hasLocalServer, null, null);
}

public static LocalCluster createLocalCluster(String schemaDDL, int siteCount, int hostCount, int kfactor, int clusterId,
@@ -2278,16 +2270,19 @@ public static LocalCluster createLocalCluster(String schemaDDL, int siteCount, i

public static LocalCluster createLocalCluster(String schemaDDL, int siteCount, int hostCount, int kfactor, int clusterId,
int replicationPort, int remoteReplicationPort, String pathToVoltDBRoot, String jar,
DrRoleType drRole, boolean hasLocalServer, String callingMethodName) throws IOException {
VoltProjectBuilder builder = new VoltProjectBuilder();
DrRoleType drRole, boolean hasLocalServer, VoltProjectBuilder builder,
String callingMethodName) throws IOException {
return createLocalCluster(schemaDDL, siteCount, hostCount, kfactor, clusterId, replicationPort, remoteReplicationPort,
pathToVoltDBRoot, jar, drRole, hasLocalServer, builder, callingMethodName);
pathToVoltDBRoot, jar, drRole, hasLocalServer, builder, callingMethodName, false, null);
}

public static LocalCluster createLocalCluster(String schemaDDL, int siteCount, int hostCount, int kfactor, int clusterId,
int replicationPort, int remoteReplicationPort, String pathToVoltDBRoot, String jar,
DrRoleType drRole, boolean hasLocalServer, VoltProjectBuilder builder,
String callingMethodName) throws IOException {
String callingMethodName,
boolean enableSPIMigration,
Map<String, String> javaProps) throws IOException {
if (builder == null) builder = new VoltProjectBuilder();
LocalCluster lc = compileBuilder(schemaDDL, siteCount, hostCount, kfactor, clusterId,
replicationPort, remoteReplicationPort, pathToVoltDBRoot, jar, drRole, builder, callingMethodName);

@@ -2296,6 +2291,14 @@ public static LocalCluster createLocalCluster(String schemaDDL, int siteCount, i
lc.overrideAnyRequestForValgrind();
lc.setJavaProperty("DR_QUERY_INTERVAL", "5");
lc.setJavaProperty("DR_RECV_TIMEOUT", "5000");
// temporary, until we always enable SPI migration
if (enableSPIMigration) {
lc.setJavaProperty("DISABLE_MIGRATE_PARTITION_LEADER", "false");
}
if (javaProps != null)
for (Map.Entry<String, String> prop : javaProps.entrySet()) {
lc.setJavaProperty(prop.getKey(), prop.getValue());
}
if (!lc.isNewCli()) {
lc.setDeploymentAndVoltDBRoot(builder.getPathToDeployment(), pathToVoltDBRoot);
lc.startUp(false);
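A hedged example of how a DR regression test might call the widest new overload, turning SPI migration on and passing extra JVM properties. Schema, ports, paths, and the test method name below are placeholders; only the parameter list mirrors the signature added above:

```java
Map<String, String> javaProps = new HashMap<>();
javaProps.put("TEST_MIGRATION_FAILURE", "true");   // illustrative extra property

LocalCluster cluster = LocalCluster.createLocalCluster(
        "CREATE TABLE t (id INTEGER NOT NULL);",   // schemaDDL (placeholder)
        4,                   // siteCount
        3,                   // hostCount
        1,                   // kfactor
        0,                   // clusterId
        5555,                // replicationPort (placeholder)
        5566,                // remoteReplicationPort (placeholder)
        "/tmp/mastervolt",   // pathToVoltDBRoot (placeholder)
        "master.jar",        // jar (placeholder)
        DrRoleType.MASTER,   // drRole
        false,               // hasLocalServer
        null,                // builder: null means a default VoltProjectBuilder is created
        "testSpiMigration",  // callingMethodName (placeholder)
        true,                // enableSPIMigration -> sets DISABLE_MIGRATE_PARTITION_LEADER=false
        javaProps);
```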