diff --git a/src/frontend/org/voltdb/ClientInterface.java b/src/frontend/org/voltdb/ClientInterface.java
index 0d166365dce..4d4f7820528 100644
--- a/src/frontend/org/voltdb/ClientInterface.java
+++ b/src/frontend/org/voltdb/ClientInterface.java
@@ -2201,7 +2201,7 @@ void startMigratePartitionLeader() {
         int partitionKey = -1;
 
         //MigratePartitionLeader is completed or there are hosts down. Stop MigratePartitionLeader service on this host
-        if (targetHostId == -1 || !voltDB.isClusterCompelte()) {
+        if (targetHostId == -1 || !voltDB.isClusterComplete()) {
             voltDB.scheduleWork(
                     () -> {m_mailbox.deliver(new MigratePartitionLeaderMessage());},
                     0, 0, TimeUnit.SECONDS);
@@ -2255,13 +2255,20 @@ void startMigratePartitionLeader() {
                 spi = MiscUtils.roundTripForCL(spi);
             }
 
+            long targetHSId = m_cartographer.getHSIDForPartitionHost(targetHostId, partitionId);
             //Info saved for the node failure handling
             MigratePartitionLeaderInfo spiInfo = new MigratePartitionLeaderInfo(
                     m_cartographer.getHSIDForPartitionHost(hostId, partitionId),
-                    m_cartographer.getHSIDForPartitionHost(targetHostId, partitionId),
+                    targetHSId,
                     partitionId);
             VoltZK.createMigratePartitionLeaderInfo(m_zk, spiInfo);
+            notifyPartitionMigrationStatus(partitionId, targetHSId, false);
+
+            if (Boolean.getBoolean("TEST_MIGRATION_FAILURE")) {
+                Thread.sleep(100);
+                throw new IOException("failure simulation");
+            }
 
             synchronized (m_executeTaskAdpater) {
                 createTransaction(m_executeTaskAdpater.connectionId(), spi,
@@ -2282,15 +2289,20 @@ void startMigratePartitionLeader() {
                 //not necessary a failure.
                 tmLog.warn(String.format("Fail to move the leader of partition %d to host %d. %s",
                         partitionId, targetHostId, resp.getStatusString()));
+                notifyPartitionMigrationStatus(partitionId, targetHSId, true);
             }
         } catch (IOException | InterruptedException e) {
             tmLog.warn(String.format("errors in leader change for partition %d: %s", partitionId, e.getMessage()));
+            notifyPartitionMigrationStatus(partitionId,
+                    m_cartographer.getHSIDForPartitionHost(targetHostId, partitionId),
+                    true);
         } finally {
             //wait for the Cartographer to see the new partition leader. The leader promotion process should happen instantly.
             //If the new leader does not show up in 5 min, the cluster may have experienced host-down events.
             long remainingWaitTime = TimeUnit.MINUTES.toMillis(5);
             final long waitingInterval = TimeUnit.SECONDS.toMillis(1);
             boolean anyFailedHosts = false;
+            boolean migrationComplete = false;
             while (remainingWaitTime > 0) {
                 try {
                     Thread.sleep(waitingInterval);
@@ -2298,13 +2310,15 @@ void startMigratePartitionLeader() {
                 }
                 remainingWaitTime -= waitingInterval;
                 if (CoreUtils.getHostIdFromHSId(m_cartographer.getHSIdForMaster(partitionId)) == targetHostId) {
+                    migrationComplete = true;
                     break;
                 }
                 //some hosts may be down.
-                if (!voltDB.isClusterCompelte()) {
+                if (!voltDB.isClusterComplete()) {
                     anyFailedHosts = true;
-                    break;
+                    // If the target host is still alive, migration is still going on.
+                    if (!voltDB.getHostMessenger().getLiveHostIds().contains(targetHostId)) break;
                 }
             }
 
@@ -2314,6 +2328,40 @@ void startMigratePartitionLeader() {
                     () -> {VoltZK.removeActionBlocker(m_zk, VoltZK.migratePartitionLeaderBlocker, tmLog);},
                     5, 0, TimeUnit.SECONDS);
             }
+
+            if (!migrationComplete) {
+                notifyPartitionMigrationStatus(partitionId,
+                        m_cartographer.getHSIDForPartitionHost(targetHostId, partitionId),
+                        true);
+            }
+        }
+    }
+
+    private void notifyPartitionMigrationStatus(int partitionId, long targetHSId, boolean failed) {
+        for (final ClientInterfaceHandleManager cihm : m_cihm.values()) {
+            try {
+                cihm.connection.queueTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        if (cihm.repairCallback != null) {
+                            if (failed) {
+                                cihm.repairCallback.leaderMigrationFailed(partitionId, targetHSId);
+                            } else {
+                                cihm.repairCallback.leaderMigrationStarted(partitionId, targetHSId);
+                            }
+                        }
+                    }
+                });
+            } catch (UnsupportedOperationException ignore) {
+                // In case some internal connections don't implement queueTask()
+                if (cihm.repairCallback != null) {
+                    if (failed) {
+                        cihm.repairCallback.leaderMigrationFailed(partitionId, targetHSId);
+                    } else {
+                        cihm.repairCallback.leaderMigrationStarted(partitionId, targetHSId);
+                    }
+                }
+            }
         }
     }
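
A note on the failure-simulation hook added above: Boolean.getBoolean() reads a JVM system property, so the injected IOException stays inert unless the server process is launched with that property set. The tiny fragment below is illustration only, not part of the patch; the class name is made up, the property name is the one used in the hunk above.

public final class MigrationFailureFlagExample {
    public static void main(String[] args) {
        // Prints "false" unless the JVM was started with -DTEST_MIGRATION_FAILURE=true,
        // for example through LocalCluster.setJavaProperty in a regression test.
        System.out.println(Boolean.getBoolean("TEST_MIGRATION_FAILURE"));
    }
}
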
diff --git a/src/frontend/org/voltdb/ClientInterfaceRepairCallback.java b/src/frontend/org/voltdb/ClientInterfaceRepairCallback.java
index 9ed87bb6cf5..a7134e4f6ab 100644
--- a/src/frontend/org/voltdb/ClientInterfaceRepairCallback.java
+++ b/src/frontend/org/voltdb/ClientInterfaceRepairCallback.java
@@ -21,10 +21,24 @@ public interface ClientInterfaceRepairCallback {
     public void repairCompleted(int partitionId, long initiatorHSId);
 
     /**
-     * Callback invoked when managed leadership migration occurs. There should
+     * Callback invoked when managed leadership migration is initiated.
+     * @param partitionId The partition ID
+     * @param initiatorHSId The target leader's HSID
+     */
+    default void leaderMigrationStarted(int partitionId, long initiatorHSId) {}
+
+    /**
+     * Callback invoked when managed leadership migration completes. There should
      * be no transaction to repair in this case.
     * @param partitionId The partition ID
     * @param initiatorHSId The new leader's HSID
     */
     default void leaderMigrated(int partitionId, long initiatorHSId) {}
+
+    /**
+     * Callback invoked when managed leadership migration fails.
+     * @param partitionId The partition ID
+     * @param initiatorHSId The target leader's HSID
+     */
+    default void leaderMigrationFailed(int partitionId, long initiatorHSId) {}
 }
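
For reviewers, a minimal sketch (not part of the patch) of what an implementor of the two new default methods might look like. It assumes repairCompleted(...) is the only non-default method of the interface, as shown in the hunk above; the class name and logger name are made up, and a real consumer (for example the DR consumer dispatcher) would track per-partition state rather than just log.

import org.voltcore.logging.VoltLogger;
import org.voltdb.ClientInterfaceRepairCallback;

public class LoggingRepairCallback implements ClientInterfaceRepairCallback {
    private static final VoltLogger log = new VoltLogger("DR");

    @Override
    public void repairCompleted(int partitionId, long initiatorHSId) {
        log.info("Repair completed for partition " + partitionId + " at HSID " + initiatorHSId);
    }

    @Override
    public void leaderMigrationStarted(int partitionId, long initiatorHSId) {
        log.info("Leader migration started for partition " + partitionId + " to HSID " + initiatorHSId);
    }

    @Override
    public void leaderMigrated(int partitionId, long initiatorHSId) {
        log.info("Leader migration completed for partition " + partitionId + " at HSID " + initiatorHSId);
    }

    @Override
    public void leaderMigrationFailed(int partitionId, long initiatorHSId) {
        log.warn("Leader migration failed for partition " + partitionId + " targeting HSID " + initiatorHSId);
    }
}
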
diff --git a/src/frontend/org/voltdb/RealVoltDB.java b/src/frontend/org/voltdb/RealVoltDB.java
index 9e0299ffcee..10f5717b9a4 100644
--- a/src/frontend/org/voltdb/RealVoltDB.java
+++ b/src/frontend/org/voltdb/RealVoltDB.java
@@ -2193,7 +2193,7 @@ public void run() {
         GCInspector.instance.start(m_periodicPriorityWorkThread, m_gcStats);
     }
 
-    public boolean isClusterCompelte() {
+    public boolean isClusterComplete() {
         return (m_config.m_hostCount == m_messenger.getLiveHostIds().size());
     }
 
@@ -2205,7 +2205,7 @@ private void startMigratePartitionLeaderTask() {
         }
 
         //MigratePartitionLeader service will be started up only after the last rejoining has finished
-        if(!isClusterCompelte() || m_config.m_hostCount == 1 || m_configuredReplicationFactor == 0) {
+        if(!isClusterComplete() || m_config.m_hostCount == 1 || m_configuredReplicationFactor == 0) {
             return;
         }
 
@@ -2221,6 +2221,7 @@ private void startMigratePartitionLeaderTask() {
             int hostId = it.next();
             final int currentMasters = m_cartographer.getMasterCount(hostId);
             if (currentMasters > minimalNumberOfLeaders) {
+                hostLog.debug("Host " + hostId + " has more than " + minimalNumberOfLeaders + " partition leaders. Sending MigratePartitionLeader message");
                 m_messenger.send(CoreUtils.getHSIdFromHostAndSite(hostId,
                         HostMessenger.CLIENT_INTERFACE_SITE_ID), msg);
             }
diff --git a/src/frontend/org/voltdb/VoltZK.java b/src/frontend/org/voltdb/VoltZK.java
index d2f3e856588..307914513b8 100644
--- a/src/frontend/org/voltdb/VoltZK.java
+++ b/src/frontend/org/voltdb/VoltZK.java
@@ -61,7 +61,6 @@ public class VoltZK {
     public static final String perPartitionTxnIds = "/db/perPartitionTxnIds";
     public static final String operationMode = "/db/operation_mode";
     public static final String exportGenerations = "/db/export_generations";
-    public static final String importerBase = "/db/import";
 
     // configuration (ports, interfaces, ...)
     public static final String cluster_metadata = "/db/cluster_metadata";
@@ -99,6 +98,7 @@ public static enum MailboxType {
     // root for MigratePartitionLeader information nodes
     public static final String migrate_partition_leader_info = "/core/migrate_partition_leader_info";
+    public static final String drConsumerPartitionMigration = "/db/dr_consumer_partition_migration";
 
     public static final String iv2masters = "/db/iv2masters";
     public static final String iv2appointees = "/db/iv2appointees";
@@ -210,6 +210,7 @@ private static final void printZKDir(ZooKeeper zk, String dir, StringBuilder bui
             root,
             mailboxes,
             cluster_metadata,
+            drConsumerPartitionMigration,
             operationMode,
             iv2masters,
             iv2appointees,
diff --git a/src/frontend/org/voltdb/iv2/InitiatorMailbox.java b/src/frontend/org/voltdb/iv2/InitiatorMailbox.java
index b7e29619892..085ef0715a3 100644
--- a/src/frontend/org/voltdb/iv2/InitiatorMailbox.java
+++ b/src/frontend/org/voltdb/iv2/InitiatorMailbox.java
@@ -387,7 +387,7 @@ private void initiateSPIMigrationIfRequested(Iv2InitiateTaskMessage msg) {
         }
 
         //one more check to make sure all the hosts are up before any changes are made
-        if (!db.isClusterCompelte()) {
+        if (!db.isClusterComplete()) {
             return;
         }
diff --git a/src/frontend/org/voltdb/messaging/Dr2MultipartResponseMessage.java b/src/frontend/org/voltdb/messaging/Dr2MultipartResponseMessage.java
index 158ef7f666e..0ec5afe505f 100644
--- a/src/frontend/org/voltdb/messaging/Dr2MultipartResponseMessage.java
+++ b/src/frontend/org/voltdb/messaging/Dr2MultipartResponseMessage.java
@@ -26,6 +26,7 @@ public class Dr2MultipartResponseMessage extends VoltMessage {
 
     private boolean m_drain;
+    private boolean m_reneg;
     private byte m_producerClusterId;
     private int m_producerPID;
     private ClientResponseImpl m_response;
@@ -36,14 +37,17 @@ public class Dr2MultipartResponseMessage extends VoltMessage {
     public Dr2MultipartResponseMessage(byte producerClusterId, int producerPID, ClientResponseImpl response) {
         m_drain = false;
+        m_reneg = false;
         m_producerClusterId = producerClusterId;
         m_producerPID = producerPID;
         m_response = response;
     }
 
-    public static Dr2MultipartResponseMessage createDrainMessage(byte producerClusterId, int producerPID) {
+    public static Dr2MultipartResponseMessage createDrainOrRenegMessage(byte producerClusterId, int producerPID,
+                                                                        boolean withDrainFlag, boolean withRenegFlag) {
         final Dr2MultipartResponseMessage msg = new Dr2MultipartResponseMessage();
-        msg.m_drain = true;
+        msg.m_drain = withDrainFlag;
+        msg.m_reneg = withRenegFlag;
         msg.m_producerClusterId = producerClusterId;
         msg.m_producerPID = producerPID;
         msg.m_response = null;
@@ -66,9 +70,14 @@ public boolean isDrain() {
         return m_drain;
     }
 
+    public boolean isReneg() {
+        return m_reneg;
+    }
+
     @Override
     protected void initFromBuffer(ByteBuffer buf) throws IOException {
         m_drain = buf.get() == 1;
+        m_reneg = buf.get() == 1;
         m_producerClusterId = buf.get();
         m_producerPID = buf.getInt();
         if (buf.remaining() > 0) {
@@ -81,10 +90,11 @@ protected void initFromBuffer(ByteBuffer buf) throws IOException {
     public void flattenToBuffer(ByteBuffer buf) throws IOException {
         buf.put(VoltDbMessageFactory.DR2_MULTIPART_RESPONSE_ID);
         buf.put((byte) (m_drain ? 1 : 0));
+        buf.put((byte) (m_reneg ? 1 : 0));
         buf.put(m_producerClusterId);
         buf.putInt(m_producerPID);
-        if (!m_drain) {
+        if (!m_drain && !m_reneg) {
             m_response.flattenToBuffer(buf);
         }
@@ -96,9 +106,10 @@ public void flattenToBuffer(ByteBuffer buf) throws IOException {
     public int getSerializedSize() {
         int size = super.getSerializedSize()
                 + 1 // drain or not
+                + 1 // reneg or not
                 + 1 // producer cluster ID
                 + 4; // producer partition ID
-        if (!m_drain) {
+        if (!m_drain && !m_reneg) {
             size += m_response.getSerializedSize();
         }
         return size;
diff --git a/src/frontend/org/voltdb/messaging/Dr2MultipartTaskMessage.java b/src/frontend/org/voltdb/messaging/Dr2MultipartTaskMessage.java
index 593682a5617..0bb8ee62d9a 100644
--- a/src/frontend/org/voltdb/messaging/Dr2MultipartTaskMessage.java
+++ b/src/frontend/org/voltdb/messaging/Dr2MultipartTaskMessage.java
@@ -27,6 +27,7 @@ public class Dr2MultipartTaskMessage extends VoltMessage {
 
     private int m_producerPID;
     private boolean m_drain;
+    private boolean m_reneg;
     private byte m_producerClusterId;
     private short m_producerPartitionCnt;
 
@@ -46,6 +47,7 @@ public Dr2MultipartTaskMessage(StoredProcedureInvocation invocation, byte produc
         m_lastExecutedMPUniqueID = lastExecutedMPUniqueID;
         m_producerPID = producerPID;
         m_drain = false;
+        m_reneg = false;
     }
 
     public static Dr2MultipartTaskMessage createDrainMessage(byte producerClusterId, int producerPID) {
@@ -54,6 +56,19 @@ public static Dr2MultipartTaskMessage createDrainMessage(byte producerClusterId,
         msg.m_producerClusterId = producerClusterId;
         msg.m_producerPartitionCnt = -1;
         msg.m_drain = true;
+        msg.m_reneg = false;
+        msg.m_invocation = null;
+        msg.m_lastExecutedMPUniqueID = Long.MIN_VALUE;
+        return msg;
+    }
+
+    public static Dr2MultipartTaskMessage createRenegMessage(byte producerClusterId, int producerPID) {
+        final Dr2MultipartTaskMessage msg = new Dr2MultipartTaskMessage();
+        msg.m_producerPID = producerPID;
+        msg.m_producerClusterId = producerClusterId;
+        msg.m_producerPartitionCnt = -1;
+        msg.m_drain = false;
+        msg.m_reneg = true;
         msg.m_invocation = null;
         msg.m_lastExecutedMPUniqueID = Long.MIN_VALUE;
         return msg;
@@ -71,6 +86,10 @@ public boolean isDrain() {
         return m_drain;
     }
 
+    public boolean isReneg() {
+        return m_reneg;
+    }
+
     public byte getProducerClusterId() {
         return m_producerClusterId;
     }
@@ -87,6 +106,7 @@ public int getProducerPID() {
     protected void initFromBuffer(ByteBuffer buf) throws IOException {
         m_producerPID = buf.getInt();
         m_drain = buf.get() == 1;
+        m_reneg = buf.get() == 1;
         m_producerClusterId = buf.get();
         m_producerPartitionCnt = buf.getShort();
         m_lastExecutedMPUniqueID = buf.getLong();
@@ -103,6 +123,7 @@ public void flattenToBuffer(ByteBuffer buf) throws IOException {
         buf.put(VoltDbMessageFactory.DR2_MULTIPART_TASK_ID);
         buf.putInt(m_producerPID);
         buf.put((byte) (m_drain ? 1 : 0));
+        buf.put((byte) (m_reneg ? 1 : 0));
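
Both DR multipart messages now carry an independent one-byte reneg flag next to the existing drain flag, and the response message skips its ClientResponseImpl payload whenever either flag is set (the !m_drain && !m_reneg checks above). A rough usage sketch follows; it is not part of the patch, the class and main method are invented for illustration, and the factory calls mirror the signatures shown in the hunks above.

import org.voltdb.messaging.Dr2MultipartResponseMessage;
import org.voltdb.messaging.Dr2MultipartTaskMessage;

public class RenegFlagSketch {
    public static void main(String[] args) {
        // Producer-side task message asking for renegotiation rather than drain.
        Dr2MultipartTaskMessage renegTask = Dr2MultipartTaskMessage.createRenegMessage((byte) 1, 0);
        assert renegTask.isReneg() && !renegTask.isDrain();

        // The old createDrainMessage(clusterId, pid) behavior corresponds to
        // createDrainOrRenegMessage(clusterId, pid, true, false).
        Dr2MultipartResponseMessage drainAck =
                Dr2MultipartResponseMessage.createDrainOrRenegMessage((byte) 1, 0, true, false);
        Dr2MultipartResponseMessage renegAck =
                Dr2MultipartResponseMessage.createDrainOrRenegMessage((byte) 1, 0, false, true);
        assert drainAck.isDrain() && !drainAck.isReneg();
        assert renegAck.isReneg() && !renegAck.isDrain();
        // Neither ack carries a ClientResponseImpl, so flattenToBuffer()/getSerializedSize()
        // only write the flag bytes, the producer cluster ID, and the producer partition ID.
    }
}
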
         buf.put(m_producerClusterId);
         buf.putShort(m_producerPartitionCnt);
         buf.putLong(m_lastExecutedMPUniqueID);
@@ -120,6 +141,7 @@ public int getSerializedSize() {
         int size = super.getSerializedSize()
                 + 4 // producer partition ID
                 + 1 // is drain or not
+                + 1 // is reneg or not
                 + 1 // producer clusterId
                 + 2 // producer partition count
                 + 8; // last executed MP unique ID
diff --git a/tests/frontend/org/voltdb/regressionsuites/LocalCluster.java b/tests/frontend/org/voltdb/regressionsuites/LocalCluster.java
index e827af188da..3032e4ed5e2 100644
--- a/tests/frontend/org/voltdb/regressionsuites/LocalCluster.java
+++ b/tests/frontend/org/voltdb/regressionsuites/LocalCluster.java
@@ -2254,19 +2254,11 @@ public static void failIfValgrindErrors(File valgrindOutputFile) {
     }
 
     // Use this for optionally enabling localServer in one of the DR clusters (usually for debugging)
-    public static LocalCluster createLocalCluster(String schemaDDL, int siteCount, int hostCount, int kfactor, int clusterId,
-                                                  int replicationPort, int remoteReplicationPort, String pathToVoltDBRoot, String jar,
-                                                  boolean isReplica) throws IOException {
-        return createLocalCluster(schemaDDL, siteCount, hostCount, kfactor, clusterId, replicationPort, remoteReplicationPort,
-                                  pathToVoltDBRoot, jar, isReplica ? DrRoleType.REPLICA : DrRoleType.MASTER, false);
-    }
-
     public static LocalCluster createLocalCluster(String schemaDDL, int siteCount, int hostCount, int kfactor, int clusterId,
                                                   int replicationPort, int remoteReplicationPort, String pathToVoltDBRoot, String jar,
                                                   DrRoleType drRole, boolean hasLocalServer) throws IOException {
-        VoltProjectBuilder builder = new VoltProjectBuilder();
         return createLocalCluster(schemaDDL, siteCount, hostCount, kfactor, clusterId, replicationPort, remoteReplicationPort,
-                                  pathToVoltDBRoot, jar, drRole, hasLocalServer, builder);
+                                  pathToVoltDBRoot, jar, drRole, hasLocalServer, null, null);
     }
 
     public static LocalCluster createLocalCluster(String schemaDDL, int siteCount, int hostCount, int kfactor, int clusterId,
@@ -2278,16 +2270,19 @@ public static LocalCluster createLocalCluster(String schemaDDL, int siteCount, i
     public static LocalCluster createLocalCluster(String schemaDDL, int siteCount, int hostCount, int kfactor, int clusterId,
                                                   int replicationPort, int remoteReplicationPort, String pathToVoltDBRoot, String jar,
-                                                  DrRoleType drRole, boolean hasLocalServer, String callingMethodName) throws IOException {
-        VoltProjectBuilder builder = new VoltProjectBuilder();
+                                                  DrRoleType drRole, boolean hasLocalServer, VoltProjectBuilder builder,
+                                                  String callingMethodName) throws IOException {
         return createLocalCluster(schemaDDL, siteCount, hostCount, kfactor, clusterId, replicationPort, remoteReplicationPort,
-                                  pathToVoltDBRoot, jar, drRole, hasLocalServer, builder, callingMethodName);
+                                  pathToVoltDBRoot, jar, drRole, hasLocalServer, builder, callingMethodName, false, null);
     }
 
     public static LocalCluster createLocalCluster(String schemaDDL, int siteCount, int hostCount, int kfactor, int clusterId,
                                                   int replicationPort, int remoteReplicationPort, String pathToVoltDBRoot, String jar,
                                                   DrRoleType drRole, boolean hasLocalServer, VoltProjectBuilder builder,
-                                                  String callingMethodName) throws IOException {
+                                                  String callingMethodName,
+                                                  boolean enableSPIMigration,
+                                                  Map<String, String> javaProps) throws IOException {
+        if (builder == null) builder = new VoltProjectBuilder();
         LocalCluster lc = compileBuilder(schemaDDL, siteCount, hostCount, kfactor, clusterId, replicationPort, remoteReplicationPort,
                 pathToVoltDBRoot, jar, drRole, builder, callingMethodName);
@@ -2296,6 +2291,14 @@ public static LocalCluster createLocalCluster(String schemaDDL, int siteCount, i
         lc.overrideAnyRequestForValgrind();
         lc.setJavaProperty("DR_QUERY_INTERVAL", "5");
         lc.setJavaProperty("DR_RECV_TIMEOUT", "5000");
+        // temporary, until we always enable SPI migration
+        if (enableSPIMigration) {
+            lc.setJavaProperty("DISABLE_MIGRATE_PARTITION_LEADER", "false");
+        }
+        if (javaProps != null)
+            for (Map.Entry<String, String> prop : javaProps.entrySet()) {
+                lc.setJavaProperty(prop.getKey(), prop.getValue());
+            }
         if (!lc.isNewCli()) {
             lc.setDeploymentAndVoltDBRoot(builder.getPathToDeployment(), pathToVoltDBRoot);
             lc.startUp(false);
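
An illustrative call site for the new long overload, not part of the patch: the schema, host counts, ports, paths, and helper class below are placeholders; the parameter order and property names come from the hunk above. Passing null for the builder falls back to a default VoltProjectBuilder, enableSPIMigration clears DISABLE_MIGRATE_PARTITION_LEADER, and javaProps entries are applied verbatim to the cluster JVMs.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.voltdb.compiler.deploymentfile.DrRoleType;
import org.voltdb.regressionsuites.LocalCluster;

public class SpiMigrationClusterSketch {
    static LocalCluster startWithSpiMigration(String schemaDDL, String voltDbRoot, String jarName)
            throws IOException {
        Map<String, String> javaProps = new HashMap<>();
        // Exercise the failure-simulation hook added in ClientInterface above.
        javaProps.put("TEST_MIGRATION_FAILURE", "true");

        return LocalCluster.createLocalCluster(schemaDDL, 4, 2, 1, 0,
                5555, 5555, voltDbRoot, jarName,
                DrRoleType.MASTER, false /*hasLocalServer*/, null /*builder*/,
                "startWithSpiMigration", true /*enableSPIMigration*/, javaProps);
    }
}
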
diff --git a/tests/frontend/org/voltdb/regressionsuites/statistics/TestStatisticsSuiteDRStats.java b/tests/frontend/org/voltdb/regressionsuites/statistics/TestStatisticsSuiteDRStats.java
index 826b6be74ab..15596460c25 100644
--- a/tests/frontend/org/voltdb/regressionsuites/statistics/TestStatisticsSuiteDRStats.java
+++ b/tests/frontend/org/voltdb/regressionsuites/statistics/TestStatisticsSuiteDRStats.java
@@ -38,6 +38,7 @@
 import org.voltdb.client.ClientFactory;
 import org.voltdb.client.ClientResponse;
 import org.voltdb.client.ProcCallException;
+import org.voltdb.compiler.deploymentfile.DrRoleType;
 import org.voltdb.dr2.DRProtocol;
 import org.voltdb.regressionsuites.LocalCluster;
 import org.voltdb.regressionsuites.StatisticsTestSuiteBase;
@@ -158,7 +159,7 @@ public void testDRPartitionStatisticsWithConsumers() throws Exception {
         int CONSUMER_CLUSTER_COUNT = 2;
         for (int n = 1; n <= CONSUMER_CLUSTER_COUNT; n++) {
             LocalCluster consumerCluster = LocalCluster.createLocalCluster(drSchema, SITES, HOSTS, KFACTOR, n,
-                    REPLICATION_PORT + 100 * n, REPLICATION_PORT, secondaryRoot, jarName, true);
+                    REPLICATION_PORT + 100 * n, REPLICATION_PORT, secondaryRoot, jarName, DrRoleType.REPLICA, false);
             ClientConfig clientConfig = new ClientConfig();
             clientConfig.setProcedureCallTimeout(10 * 60 * 1000);
             Client consumerClient = createClient(clientConfig, consumerCluster);