diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterType.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterType.java
new file mode 100644
index 00000000000..95d021a7b53
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterType.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+/**
+ * Enumeration representing the type of cluster in an HA group configuration. Used to distinguish
+ * between local and peer clusters when subscribing to HA group state change notifications.
+ */
+public enum ClusterType {
+ /**
+ * Represents the local cluster where the client is running.
+ */
+ LOCAL,
+
+ /**
+ * Represents the peer cluster in the HA group configuration.
+ */
+ PEER
+}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
new file mode 100644
index 00000000000..0fbdbf5b5ef
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
+
+/**
+ * Interface for external clients who want to be notified of HA group state transitions.
+ *
+ * Listeners can subscribe to be notified when:
+ *
+ *
+ * - Specific state transitions occur (from one state to another)
+ * - Any transition to a target state occurs (from any state to a specific state)
+ *
+ *
+ * Notifications are provided for both local and peer cluster state changes, distinguished by the
+ * {@link ClusterType} parameter.
+ *
+ * @see HAGroupStoreManager#subscribeToTargetState
+ */
+public interface HAGroupStateListener {
+
+ /**
+ * Called when an HA group state transition occurs.
+ *
+ * Implementations should be fast and non-blocking to avoid impacting the HA group state
+ * management system. If heavy processing is required, consider delegating to a separate thread.
+ *
+ * @param haGroupName the name of the HA group that transitioned
+ * @param toState the new state after the transition
+ * @param modifiedTime the time the state transition occurred
+ * @param clusterType whether this transition occurred on the local or peer cluster
+ * @throws Exception implementations may throw exceptions, but they will be logged and will not
+ * prevent other listeners from being notified
+ */
+ void onStateChange(String haGroupName, HAGroupState toState, long modifiedTime,
+ ClusterType clusterType);
+}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
index 64d76b9e6d7..70bd4b43f0c 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -42,10 +42,13 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
@@ -75,14 +78,13 @@
*/
public class HAGroupStoreClient implements Closeable {
- public static final String ZK_CONSISTENT_HA_NAMESPACE =
- "phoenix" + ZKPaths.PATH_SEPARATOR + "consistentHA";
+ public static final String ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE =
+ "phoenix" + ZKPaths.PATH_SEPARATOR + "consistentHA" + ZKPaths.PATH_SEPARATOR + "groupState";
private static final long HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS = 30000L;
// Multiplier for ZK session timeout to account for time it will take for HMaster to abort
// the region server in case ZK connection is lost from the region server.
- private static final double ZK_SESSION_TIMEOUT_MULTIPLIER = 1.1;
- private static final String CACHE_TYPE_LOCAL = "LOCAL";
- private static final String CACHE_TYPE_PEER = "PEER";
+ @VisibleForTesting
+ static final double ZK_SESSION_TIMEOUT_MULTIPLIER = 1.1;
private PhoenixHAAdmin phoenixHaAdmin;
private PhoenixHAAdmin peerPhoenixHaAdmin;
private static final Logger LOGGER = LoggerFactory.getLogger(HAGroupStoreClient.class);
@@ -107,6 +109,14 @@ public class HAGroupStoreClient implements Closeable {
private final PathChildrenCacheListener peerCustomPathChildrenCacheListener;
// Wait time for sync mode
private final long waitTimeForSyncModeInMs;
+ // State tracking for transition detection
+ private volatile HAGroupState lastKnownLocalState;
+ private volatile HAGroupState lastKnownPeerState;
+
+ // Subscription storage for HA group state change notifications per client instance
+ // Map key format: "clusterType:targetState" -> Set
+ private final ConcurrentHashMap> targetStateSubscribers = new ConcurrentHashMap<>();
// Policy for the HA group
private HighAvailabilityPolicy policy;
private ClusterRole clusterRole;
@@ -130,7 +140,7 @@ public static HAGroupStoreClient getInstance(Configuration conf, String haGroupN
* @return HAGroupStoreClient instance
*/
public static HAGroupStoreClient getInstanceForZkUrl(Configuration conf, String haGroupName,
- String zkUrl) throws SQLException {
+ String zkUrl) {
Preconditions.checkNotNull(haGroupName, "haGroupName cannot be null");
String localZkUrl = Objects.toString(zkUrl, getLocalZkUrl(conf));
Preconditions.checkNotNull(localZkUrl, "zkUrl cannot be null");
@@ -157,7 +167,7 @@ public static HAGroupStoreClient getInstanceForZkUrl(Configuration conf, String
/**
* Get the list of HAGroupNames from system table. We can also get the list of HAGroupNames from
- * the system table by providing the zkUrl in where clause but we need to match the formatted
+ * the system table by providing the zkUrl in where clause, but we need to match the formatted
* zkUrl with the zkUrl in the system table so that matching is done correctly.
* @param zkUrl for connecting to Table
* @return the list of HAGroupNames
@@ -204,10 +214,11 @@ public static List getHAGroupNames(String zkUrl) throws SQLException {
// Initialize HAGroupStoreClient attributes
initializeHAGroupStoreClientAttributes(haGroupName);
// Initialize Phoenix HA Admin
- this.phoenixHaAdmin = new PhoenixHAAdmin(this.zkUrl, conf, ZK_CONSISTENT_HA_NAMESPACE);
+ this.phoenixHaAdmin =
+ new PhoenixHAAdmin(this.zkUrl, conf, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
// Initialize local cache
this.pathChildrenCache =
- initializePathChildrenCache(phoenixHaAdmin, pathChildrenCacheListener, CACHE_TYPE_LOCAL);
+ initializePathChildrenCache(phoenixHaAdmin, pathChildrenCacheListener, ClusterType.LOCAL);
// Initialize ZNode if not present in ZK
initializeZNodeIfNeeded();
if (this.pathChildrenCache != null) {
@@ -237,7 +248,6 @@ public void rebuild() throws Exception {
initializeHAGroupStoreClientAttributes(haGroupName);
initializeZNodeIfNeeded();
maybeInitializePeerPathChildrenCache();
-
// NOTE: this is a BLOCKING method.
// Completely rebuild the internal cache by querying for all needed data
// WITHOUT generating any events to send to listeners.
@@ -259,7 +269,7 @@ public HAGroupStoreRecord getHAGroupStoreRecord() throws IOException {
if (!isHealthy) {
throw new IOException("HAGroupStoreClient is not healthy");
}
- return fetchCacheRecord(this.pathChildrenCache, CACHE_TYPE_LOCAL).getLeft();
+ return fetchCacheRecord(this.pathChildrenCache, ClusterType.LOCAL).getLeft();
}
/**
@@ -279,7 +289,7 @@ public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState haGroupStat
throw new IOException("HAGroupStoreClient is not healthy");
}
Pair cacheRecord =
- fetchCacheRecord(this.pathChildrenCache, CACHE_TYPE_LOCAL);
+ fetchCacheRecord(this.pathChildrenCache, ClusterType.LOCAL);
HAGroupStoreRecord currentHAGroupStoreRecord = cacheRecord.getLeft();
Stat currentHAGroupStoreRecordStat = cacheRecord.getRight();
if (currentHAGroupStoreRecord == null) {
@@ -291,9 +301,29 @@ public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState haGroupStat
isUpdateNeeded(currentHAGroupStoreRecord.getHAGroupState(),
currentHAGroupStoreRecordStat.getMtime(), haGroupState)
) {
+ // We maintain last sync time as the last time cluster was in sync state.
+ // If state changes from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, record that time
+ // Once state changes back to ACTIVE_IN_SYNC or the role is
+ // NOT ACTIVE or ACTIVE_TO_STANDBY
+ // set the time to null to mark that we are current(or we don't have any reader).
+ // TODO: Verify that for reader this is the correct approach.
+ Long lastSyncTimeInMs = currentHAGroupStoreRecord.getLastSyncStateTimeInMs();
+ ClusterRole clusterRole = haGroupState.getClusterRole();
+ if (
+ currentHAGroupStoreRecord.getHAGroupState()
+ == HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC
+ && haGroupState == HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC
+ ) {
+ lastSyncTimeInMs = System.currentTimeMillis();
+ } else if (
+ haGroupState == HAGroupState.ACTIVE_IN_SYNC || !(ClusterRole.ACTIVE.equals(clusterRole)
+ || ClusterRole.ACTIVE_TO_STANDBY.equals(clusterRole))
+ ) {
+ lastSyncTimeInMs = null;
+ }
HAGroupStoreRecord newHAGroupStoreRecord =
new HAGroupStoreRecord(currentHAGroupStoreRecord.getProtocolVersion(),
- currentHAGroupStoreRecord.getHaGroupName(), haGroupState);
+ currentHAGroupStoreRecord.getHaGroupName(), haGroupState, lastSyncTimeInMs);
// TODO: Check if cluster role is changing, if so, we need to update
// the system table first
// Lock the row in System Table and make sure update is reflected
@@ -326,11 +356,11 @@ public ClusterRoleRecord getClusterRoleRecord() throws IOException {
* @return HAGroupStoreRecord for the specified HA group name, or null if not found
* @throws IOException if the client is not healthy
*/
- private HAGroupStoreRecord getHAGroupStoreRecordFromPeer() throws IOException {
+ public HAGroupStoreRecord getHAGroupStoreRecordFromPeer() throws IOException {
if (!isHealthy) {
throw new IOException("HAGroupStoreClient is not healthy");
}
- return fetchCacheRecord(this.peerPathChildrenCache, CACHE_TYPE_PEER).getLeft();
+ return fetchCacheRecord(this.peerPathChildrenCache, ClusterType.PEER).getLeft();
}
private void initializeZNodeIfNeeded()
@@ -340,11 +370,18 @@ private void initializeZNodeIfNeeded()
Pair cacheRecordFromZK =
phoenixHaAdmin.getHAGroupStoreRecordInZooKeeper(this.haGroupName);
HAGroupStoreRecord haGroupStoreRecord = cacheRecordFromZK.getLeft();
+ HAGroupState defaultHAGroupState = this.clusterRole.getDefaultHAGroupState();
+ // Initialize lastSyncTimeInMs only if we start in ACTIVE_NOT_IN_SYNC state
+ // and ZNode is not already present
+ Long lastSyncTimeInMs = defaultHAGroupState.equals(HAGroupState.ACTIVE_NOT_IN_SYNC)
+ ? System.currentTimeMillis()
+ : null;
HAGroupStoreRecord newHAGroupStoreRecord =
new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName,
- this.clusterRole.getDefaultHAGroupState());
+ this.clusterRole.getDefaultHAGroupState(), lastSyncTimeInMs);
// Only update current ZNode if it doesn't have the same role as present in System Table.
// If not exists, then create ZNode
+ // TODO: Discuss if this approach is what reader needs.
if (
haGroupStoreRecord != null && !haGroupStoreRecord.getClusterRole().equals(this.clusterRole)
) {
@@ -414,17 +451,17 @@ private void maybeInitializePeerPathChildrenCache() {
try {
// Setup peer connection if needed (first time or ZK Url changed)
if (
- peerPathChildrenCache == null || (peerPhoenixHaAdmin != null
- && !StringUtils.equals(this.peerZKUrl, peerPhoenixHaAdmin.getZkUrl()))
+ peerPathChildrenCache == null || peerPhoenixHaAdmin != null
+ && !StringUtils.equals(this.peerZKUrl, peerPhoenixHaAdmin.getZkUrl())
) {
// Clean up existing peer connection if it exists
closePeerConnection();
// Setup new peer connection
this.peerPhoenixHaAdmin =
- new PhoenixHAAdmin(this.peerZKUrl, conf, ZK_CONSISTENT_HA_NAMESPACE);
+ new PhoenixHAAdmin(this.peerZKUrl, conf, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
// Create new PeerPathChildrenCache
this.peerPathChildrenCache = initializePathChildrenCache(peerPhoenixHaAdmin,
- this.peerCustomPathChildrenCacheListener, CACHE_TYPE_PEER);
+ this.peerCustomPathChildrenCacheListener, ClusterType.PEER);
}
} catch (Exception e) {
closePeerConnection();
@@ -442,7 +479,7 @@ private void maybeInitializePeerPathChildrenCache() {
}
private PathChildrenCache initializePathChildrenCache(PhoenixHAAdmin admin,
- PathChildrenCacheListener customListener, String cacheType) {
+ PathChildrenCacheListener customListener, ClusterType cacheType) {
LOGGER.info("Initializing {} PathChildrenCache with URL {}", cacheType, admin.getZkUrl());
PathChildrenCache newPathChildrenCache = null;
try {
@@ -468,29 +505,37 @@ private PathChildrenCache initializePathChildrenCache(PhoenixHAAdmin admin,
}
}
- private PathChildrenCacheListener createCacheListener(CountDownLatch latch, String cacheType) {
+ private PathChildrenCacheListener createCacheListener(CountDownLatch latch,
+ ClusterType cacheType) {
return (client, event) -> {
final ChildData childData = event.getData();
- HAGroupStoreRecord eventRecord = extractHAGroupStoreRecordOrNull(childData);
+ Pair eventRecordAndStat =
+ extractHAGroupStoreRecordOrNull(childData);
+ HAGroupStoreRecord eventRecord = eventRecordAndStat.getLeft();
+ Stat eventStat = eventRecordAndStat.getRight();
LOGGER.info("HAGroupStoreClient Cache {} received event {} type {} at {}", cacheType,
eventRecord, event.getType(), System.currentTimeMillis());
switch (event.getType()) {
- // TODO: Add support for event watcher for HAGroupStoreRecord
- // case CHILD_ADDED:
- // case CHILD_UPDATED:
- // case CHILD_REMOVED:
+ case CHILD_ADDED:
+ case CHILD_UPDATED:
+ if (eventRecord != null) {
+ handleStateChange(eventRecord, eventStat, cacheType);
+ }
+ break;
+ case CHILD_REMOVED:
+ break;
case INITIALIZED:
latch.countDown();
break;
case CONNECTION_LOST:
case CONNECTION_SUSPENDED:
- if (CACHE_TYPE_LOCAL.equals(cacheType)) {
+ if (ClusterType.LOCAL.equals(cacheType)) {
isHealthy = false;
}
LOGGER.warn("{} HAGroupStoreClient cache connection lost/suspended", cacheType);
break;
case CONNECTION_RECONNECTED:
- if (CACHE_TYPE_LOCAL.equals(cacheType)) {
+ if (ClusterType.LOCAL.equals(cacheType)) {
isHealthy = true;
}
LOGGER.info("{} HAGroupStoreClient cache connection reconnected", cacheType);
@@ -503,7 +548,7 @@ private PathChildrenCacheListener createCacheListener(CountDownLatch latch, Stri
}
private Pair fetchCacheRecord(PathChildrenCache cache,
- String cacheType) {
+ ClusterType cacheType) {
if (cache == null) {
LOGGER.warn("{} HAGroupStoreClient cache is null, returning null", cacheType);
return Pair.of(null, null);
@@ -516,7 +561,7 @@ private Pair fetchCacheRecord(PathChildrenCache cache,
return result;
}
- if (cacheType.equals(CACHE_TYPE_PEER)) {
+ if (cacheType.equals(ClusterType.PEER)) {
return Pair.of(null, null);
}
// If no record found, try to rebuild and fetch again
@@ -533,23 +578,23 @@ private Pair fetchCacheRecord(PathChildrenCache cache,
}
private Pair extractRecordAndStat(PathChildrenCache cache,
- String targetPath, String cacheType) {
+ String targetPath, ClusterType cacheType) {
ChildData childData = cache.getCurrentData(targetPath);
if (childData != null) {
- HAGroupStoreRecord record = extractHAGroupStoreRecordOrNull(childData);
- Stat currentStat = childData.getStat();
- LOGGER.info("Built {} cluster record: {}", cacheType, record);
- return Pair.of(record, currentStat);
+ Pair recordAndStat = extractHAGroupStoreRecordOrNull(childData);
+ LOGGER.info("Built {} cluster record: {}", cacheType, recordAndStat.getLeft());
+ return recordAndStat;
}
return Pair.of(null, null);
}
- private HAGroupStoreRecord extractHAGroupStoreRecordOrNull(final ChildData childData) {
+ private Pair
+ extractHAGroupStoreRecordOrNull(final ChildData childData) {
if (childData != null) {
byte[] data = childData.getData();
- return HAGroupStoreRecord.fromJson(data).orElse(null);
+ return Pair.of(HAGroupStoreRecord.fromJson(data).orElse(null), childData.getStat());
}
- return null;
+ return Pair.of(null, null);
}
/**
@@ -613,4 +658,117 @@ private boolean isUpdateNeeded(HAGroupStoreRecord.HAGroupState currentHAGroupSta
return ((System.currentTimeMillis() - currentHAGroupStoreRecordMtime) > waitTime);
}
+ // ========== HA Group State Change Subscription Methods ==========
+
+ /**
+ * Subscribe to be notified when any transition to a target state occurs.
+ * @param targetState the target state to watch for
+ * @param clusterType whether to monitor local or peer cluster
+ * @param listener the listener to notify when any transition to the target state occurs
+ */
+ public void subscribeToTargetState(HAGroupState targetState, ClusterType clusterType,
+ HAGroupStateListener listener) {
+ String key = buildTargetStateKey(clusterType, targetState);
+ targetStateSubscribers.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>()).add(listener);
+ LOGGER.info("Subscribed listener to target state {} for HA group {} on {} cluster", targetState,
+ haGroupName, clusterType);
+ }
+
+ /**
+ * Unsubscribe from target state notifications.
+ * @param targetState the target state
+ * @param clusterType whether monitoring local or peer cluster
+ * @param listener the listener to remove
+ */
+ public void unsubscribeFromTargetState(HAGroupState targetState, ClusterType clusterType,
+ HAGroupStateListener listener) {
+ String key = buildTargetStateKey(clusterType, targetState);
+ CopyOnWriteArraySet listeners = targetStateSubscribers.get(key);
+ if (listeners != null && listeners.remove(listener)) {
+ if (listeners.isEmpty()) {
+ targetStateSubscribers.remove(key);
+ }
+ LOGGER.info("Unsubscribed listener from target state {} for HA group {} on {} cluster",
+ targetState, haGroupName, clusterType);
+ }
+ }
+
+ /**
+ * Handle state change detection and notify subscribers if a transition occurred.
+ * @param newRecord the new HA group store record
+ * @param cacheType the type of cache (LOCAL or PEER)
+ */
+ private void handleStateChange(HAGroupStoreRecord newRecord, Stat newStat,
+ ClusterType cacheType) {
+ HAGroupState newState = newRecord.getHAGroupState();
+ HAGroupState oldState;
+ ClusterType clusterType;
+
+ if (ClusterType.LOCAL.equals(cacheType)) {
+ oldState = lastKnownLocalState;
+ lastKnownLocalState = newState;
+ clusterType = ClusterType.LOCAL;
+ } else {
+ oldState = lastKnownPeerState;
+ lastKnownPeerState = newState;
+ clusterType = ClusterType.PEER;
+ }
+
+ // Only notify if there's an actual state transition or initial state
+ if (oldState == null || !oldState.equals(newState)) {
+ LOGGER.info("Detected state transition for HA group {} from {} to {} on {} cluster",
+ haGroupName, oldState, newState, clusterType);
+ notifySubscribers(oldState, newState, newStat.getMtime(), clusterType);
+ }
+ }
+
+ /**
+ * Notify all relevant subscribers of a state transition.
+ * @param fromState the state transitioned from
+ * @param toState the state transitioned to
+ * @param clusterType the cluster type where the transition occurred
+ */
+ private void notifySubscribers(HAGroupState fromState, HAGroupState toState, long modifiedTime,
+ ClusterType clusterType) {
+ LOGGER.debug(
+ "Notifying subscribers of state transition for HA group {} from {} to {} on {} " + "cluster",
+ haGroupName, fromState, toState, clusterType);
+ String targetStateKey = buildTargetStateKey(clusterType, toState);
+
+ // Collect all listeners that need to be notified
+ Set listenersToNotify = new HashSet<>();
+
+ // Find target state subscribers
+ CopyOnWriteArraySet targetListeners =
+ targetStateSubscribers.get(targetStateKey);
+ if (targetListeners != null) {
+ listenersToNotify.addAll(targetListeners);
+ }
+
+ // Notify all listeners with error isolation
+ if (!listenersToNotify.isEmpty()) {
+ LOGGER.info("Notifying {} listeners of state transitionfor HA group {} from {} to {} on {} "
+ + "cluster", listenersToNotify.size(), haGroupName, fromState, toState, clusterType);
+
+ for (HAGroupStateListener listener : listenersToNotify) {
+ try {
+ listener.onStateChange(haGroupName, toState, modifiedTime, clusterType);
+ } catch (Exception e) {
+ LOGGER.error("Error notifying listener of state transition for HA group {} from {} to "
+ + "{} on {} cluster", haGroupName, fromState, toState, clusterType, e);
+ // Continue notifying other listeners
+ }
+ }
+ }
+ }
+
+ // ========== Helper Methods ==========
+
+ /**
+ * Build key for target state subscriptions.
+ */
+ private String buildTargetStateKey(ClusterType clusterType, HAGroupState targetState) {
+ return clusterType + ":" + targetState;
+ }
+
}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
index 1a7fe0985bc..11a5ce7a40a 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
@@ -37,7 +37,8 @@
/**
* Implementation of HAGroupStoreManager that uses HAGroupStoreClient. Manages all
- * HAGroupStoreClient instances.
+ * HAGroupStoreClient instances and provides passthrough functionality for HA group state change
+ * notifications.
*/
public class HAGroupStoreManager {
private static volatile HAGroupStoreManager haGroupStoreManagerInstance;
@@ -99,16 +100,12 @@ public List getHAGroupNames() throws SQLException {
* @return true if mutation is blocked, false otherwise.
* @throws IOException when HAGroupStoreClient is not healthy.
*/
- public boolean isMutationBlocked(String haGroupName) throws IOException, SQLException {
+ public boolean isMutationBlocked(String haGroupName) throws IOException {
if (mutationBlockEnabled) {
- HAGroupStoreClient haGroupStoreClient =
- HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl);
- if (haGroupStoreClient != null) {
- return haGroupStoreClient.getHAGroupStoreRecord() != null
- && haGroupStoreClient.getHAGroupStoreRecord().getClusterRole() != null
- && haGroupStoreClient.getHAGroupStoreRecord().getClusterRole().isMutationBlocked();
- }
- throw new IOException("HAGroupStoreClient is not initialized");
+ HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+ HAGroupStoreRecord recordWithMetadata = haGroupStoreClient.getHAGroupStoreRecord();
+ return recordWithMetadata != null && recordWithMetadata.getClusterRole() != null
+ && recordWithMetadata.getClusterRole().isMutationBlocked();
}
return false;
}
@@ -146,13 +143,8 @@ public void invalidateHAGroupStoreClient(boolean broadcastUpdate) throws Excepti
*/
public void invalidateHAGroupStoreClient(final String haGroupName, boolean broadcastUpdate)
throws Exception {
- HAGroupStoreClient haGroupStoreClient =
- HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl);
- if (haGroupStoreClient != null) {
- haGroupStoreClient.rebuild();
- } else {
- throw new IOException("HAGroupStoreClient is not initialized");
- }
+ HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+ haGroupStoreClient.rebuild();
}
/**
@@ -163,13 +155,22 @@ public void invalidateHAGroupStoreClient(final String haGroupName, boolean broad
* @throws IOException when HAGroupStoreClient is not healthy.
*/
public Optional getHAGroupStoreRecord(final String haGroupName)
- throws IOException, SQLException {
- HAGroupStoreClient haGroupStoreClient =
- HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl);
- if (haGroupStoreClient != null) {
- return Optional.ofNullable(haGroupStoreClient.getHAGroupStoreRecord());
- }
- throw new IOException("HAGroupStoreClient is not initialized");
+ throws IOException {
+ HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+ return Optional.ofNullable(haGroupStoreClient.getHAGroupStoreRecord());
+ }
+
+ /**
+ * Returns the HAGroupStoreRecord for a specific HA group from peer cluster.
+ * @param haGroupName name of the HA group
+ * @return Optional HAGroupStoreRecord for the HA group from peer cluster can be empty if the HA
+ * group is not found or peer cluster is not available.
+ * @throws IOException when HAGroupStoreClient is not healthy.
+ */
+ public Optional getPeerHAGroupStoreRecord(final String haGroupName)
+ throws IOException {
+ HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+ return Optional.ofNullable(haGroupStoreClient.getHAGroupStoreRecordFromPeer());
}
/**
@@ -178,15 +179,9 @@ public Optional getHAGroupStoreRecord(final String haGroupNa
* @throws IOException when HAGroupStoreClient is not healthy.
*/
public void setHAGroupStatusToStoreAndForward(final String haGroupName) throws IOException,
- StaleHAGroupStoreRecordVersionException, InvalidClusterRoleTransitionException, SQLException {
- HAGroupStoreClient haGroupStoreClient =
- HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl);
- if (haGroupStoreClient != null) {
- haGroupStoreClient
- .setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
- } else {
- throw new IOException("HAGroupStoreClient is not initialized");
- }
+ StaleHAGroupStoreRecordVersionException, InvalidClusterRoleTransitionException {
+ HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+ haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
}
/**
@@ -194,15 +189,65 @@ public void setHAGroupStatusToStoreAndForward(final String haGroupName) throws I
* @param haGroupName name of the HA group
* @throws IOException when HAGroupStoreClient is not healthy.
*/
- public void setHAGroupStatusRecordToSync(final String haGroupName) throws IOException,
- StaleHAGroupStoreRecordVersionException, InvalidClusterRoleTransitionException, SQLException {
- HAGroupStoreClient haGroupStoreClient =
- HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl);
- if (haGroupStoreClient != null) {
- haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
- } else {
- throw new IOException("HAGroupStoreClient is not initialized");
+ public void setHAGroupStatusToSync(final String haGroupName) throws IOException,
+ StaleHAGroupStoreRecordVersionException, InvalidClusterRoleTransitionException {
+ HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+ haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
+ }
+
+ /**
+ * Sets the HAGroupStoreRecord to degrade reader functionality in local cluster. Transitions from
+ * STANDBY to DEGRADED_STANDBY_FOR_READER or from DEGRADED_STANDBY_FOR_WRITER to DEGRADED_STANDBY.
+ * @param haGroupName name of the HA group
+ * @throws IOException when HAGroupStoreClient is not healthy.
+ * @throws InvalidClusterRoleTransitionException when the current state cannot transition to a
+ * degraded reader state
+ */
+ public void setReaderToDegraded(final String haGroupName) throws IOException,
+ StaleHAGroupStoreRecordVersionException, InvalidClusterRoleTransitionException {
+ HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+ HAGroupStoreRecord currentRecord = haGroupStoreClient.getHAGroupStoreRecord();
+
+ if (currentRecord == null) {
+ throw new IOException("Current HAGroupStoreRecord is null for HA group: " + haGroupName);
+ }
+
+ HAGroupStoreRecord.HAGroupState currentState = currentRecord.getHAGroupState();
+ HAGroupStoreRecord.HAGroupState targetState =
+ HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER;
+
+ if (currentState == HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER) {
+ targetState = HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY;
}
+
+ haGroupStoreClient.setHAGroupStatusIfNeeded(targetState);
+ }
+
+ /**
+ * Sets the HAGroupStoreRecord to restore reader functionality in local cluster. Transitions from
+ * DEGRADED_STANDBY_FOR_READER to STANDBY or from DEGRADED_STANDBY to DEGRADED_STANDBY_FOR_WRITER.
+ * @param haGroupName name of the HA group
+ * @throws IOException when HAGroupStoreClient is not healthy.
+ * @throws InvalidClusterRoleTransitionException when the current state cannot transition to a
+ * healthy reader state
+ */
+ public void setReaderToHealthy(final String haGroupName) throws IOException,
+ StaleHAGroupStoreRecordVersionException, InvalidClusterRoleTransitionException {
+ HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+ HAGroupStoreRecord currentRecord = haGroupStoreClient.getHAGroupStoreRecord();
+
+ if (currentRecord == null) {
+ throw new IOException("Current HAGroupStoreRecord is null for HA group: " + haGroupName);
+ }
+
+ HAGroupStoreRecord.HAGroupState currentState = currentRecord.getHAGroupState();
+ HAGroupStoreRecord.HAGroupState targetState = HAGroupStoreRecord.HAGroupState.STANDBY;
+
+ if (currentState == HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY) {
+ targetState = HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER;
+ }
+
+ haGroupStoreClient.setHAGroupStatusIfNeeded(targetState);
}
/**
@@ -212,13 +257,62 @@ public void setHAGroupStatusRecordToSync(final String haGroupName) throws IOExce
* @return ClusterRoleRecord for the cluster pair
* @throws IOException when HAGroupStoreClient is not healthy.
*/
- public ClusterRoleRecord getClusterRoleRecord(String haGroupName)
- throws IOException, SQLException {
+ public ClusterRoleRecord getClusterRoleRecord(String haGroupName) throws IOException {
+ HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName);
+ return haGroupStoreClient.getClusterRoleRecord();
+ }
+
+ /**
+ * Subscribe to be notified when any transition to a target state occurs.
+ * @param haGroupName the name of the HA group to monitor
+ * @param targetState the target state to watch for
+ * @param clusterType whether to monitor local or peer cluster
+ * @param listener the listener to notify when any transition to the target state occurs
+ * @throws IOException if unable to get HAGroupStoreClient instance
+ */
+ public void subscribeToTargetState(String haGroupName,
+ HAGroupStoreRecord.HAGroupState targetState, ClusterType clusterType,
+ HAGroupStateListener listener) throws IOException {
+ HAGroupStoreClient client = getHAGroupStoreClient(haGroupName);
+ client.subscribeToTargetState(targetState, clusterType, listener);
+ LOGGER.debug(
+ "Delegated subscription to target state {} " + "for HA group {} on {} cluster to client",
+ targetState, haGroupName, clusterType);
+ }
+
+ /**
+ * Unsubscribe from target state notifications.
+ * @param haGroupName the name of the HA group
+ * @param targetState the target state
+ * @param clusterType whether monitoring local or peer cluster
+ * @param listener the listener to remove
+ */
+ public void unsubscribeFromTargetState(String haGroupName,
+ HAGroupStoreRecord.HAGroupState targetState, ClusterType clusterType,
+ HAGroupStateListener listener) {
+ try {
+ HAGroupStoreClient client = getHAGroupStoreClient(haGroupName);
+ client.unsubscribeFromTargetState(targetState, clusterType, listener);
+ LOGGER.debug("Delegated unsubscription from target state {} "
+ + "for HA group {} on {} cluster to client", targetState, haGroupName, clusterType);
+ } catch (IOException e) {
+ LOGGER.warn("HAGroupStoreClient not found for HA group: {} - cannot unsubscribe: {}",
+ haGroupName, e.getMessage());
+ }
+ }
+
+ /**
+ * Helper method to get HAGroupStoreClient instance with consistent error handling.
+ * @param haGroupName name of the HA group
+ * @return HAGroupStoreClient instance for the specified HA group
+ * @throws IOException when HAGroupStoreClient is not initialized
+ */
+ private HAGroupStoreClient getHAGroupStoreClient(final String haGroupName) throws IOException {
HAGroupStoreClient haGroupStoreClient =
HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl);
- if (haGroupStoreClient != null) {
- return haGroupStoreClient.getClusterRoleRecord();
+ if (haGroupStoreClient == null) {
+ throw new IOException("HAGroupStoreClient is not initialized for HA group: " + haGroupName);
}
- throw new IOException("HAGroupStoreClient is not initialized");
+ return haGroupStoreClient;
}
}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java
index e3e1a4a628b..4077a4b8194 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java
@@ -152,17 +152,27 @@ public static HAGroupState from(byte[] bytes) {
private final String protocolVersion;
private final String haGroupName;
private final HAGroupState haGroupState;
+ private final Long lastSyncStateTimeInMs;
@JsonCreator
public HAGroupStoreRecord(@JsonProperty("protocolVersion") String protocolVersion,
@JsonProperty("haGroupName") String haGroupName,
- @JsonProperty("haGroupState") HAGroupState haGroupState) {
+ @JsonProperty("haGroupState") HAGroupState haGroupState,
+ @JsonProperty("lastSyncStateTimeInMs") Long lastSyncStateTimeInMs) {
Preconditions.checkNotNull(haGroupName, "HA group name cannot be null!");
Preconditions.checkNotNull(haGroupState, "HA group state cannot be null!");
this.protocolVersion = Objects.toString(protocolVersion, DEFAULT_PROTOCOL_VERSION);
this.haGroupName = haGroupName;
this.haGroupState = haGroupState;
+ this.lastSyncStateTimeInMs = lastSyncStateTimeInMs;
+ }
+
+ /**
+ * Convenience constructor for backward compatibility without lastSyncStateTimeInMs.
+ */
+ public HAGroupStoreRecord(String protocolVersion, String haGroupName, HAGroupState haGroupState) {
+ this(protocolVersion, haGroupName, haGroupState, null);
}
public static Optional fromJson(byte[] bytes) {
@@ -183,7 +193,8 @@ public static byte[] toJson(HAGroupStoreRecord record) throws IOException {
public boolean hasSameInfo(HAGroupStoreRecord other) {
return haGroupName.equals(other.haGroupName) && haGroupState.equals(other.haGroupState)
- && protocolVersion.equals(other.protocolVersion);
+ && protocolVersion.equals(other.protocolVersion)
+ && Objects.equals(lastSyncStateTimeInMs, other.lastSyncStateTimeInMs);
}
public String getProtocolVersion() {
@@ -199,6 +210,10 @@ public HAGroupState getHAGroupState() {
return haGroupState;
}
+ public Long getLastSyncStateTimeInMs() {
+ return lastSyncStateTimeInMs;
+ }
+
@JsonIgnore
public ClusterRoleRecord.ClusterRole getClusterRole() {
return haGroupState.getClusterRole();
@@ -207,7 +222,7 @@ public ClusterRoleRecord.ClusterRole getClusterRole() {
@Override
public int hashCode() {
return new HashCodeBuilder().append(protocolVersion).append(haGroupName).append(haGroupState)
- .hashCode();
+ .append(lastSyncStateTimeInMs).hashCode();
}
@Override
@@ -222,14 +237,15 @@ public boolean equals(Object other) {
HAGroupStoreRecord otherRecord = (HAGroupStoreRecord) other;
return new EqualsBuilder().append(protocolVersion, otherRecord.protocolVersion)
.append(haGroupName, otherRecord.haGroupName).append(haGroupState, otherRecord.haGroupState)
- .isEquals();
+ .append(lastSyncStateTimeInMs, otherRecord.lastSyncStateTimeInMs).isEquals();
}
}
@Override
public String toString() {
return "HAGroupStoreRecord{" + "protocolVersion='" + protocolVersion + '\'' + ", haGroupName='"
- + haGroupName + '\'' + ", haGroupState=" + haGroupState + '}';
+ + haGroupName + '\'' + ", haGroupState=" + haGroupState + ", lastSyncStateTimeInMs="
+ + lastSyncStateTimeInMs + '}';
}
public String toPrettyString() {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java
index e34513b135b..9f7a3108211 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java
@@ -17,7 +17,7 @@
*/
package org.apache.phoenix.end2end;
-import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -85,7 +85,7 @@ public void testGetClusterRoleRecordAndInvalidate() throws Exception {
ServerRpcController controller = new ServerRpcController();
try (PhoenixHAAdmin peerHAAdmin = new PhoenixHAAdmin(
- CLUSTERS.getHBaseCluster2().getConfiguration(), ZK_CONSISTENT_HA_NAMESPACE)) {
+ CLUSTERS.getHBaseCluster2().getConfiguration(), ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE)) {
HAGroupStoreRecord peerHAGroupStoreRecord = new HAGroupStoreRecord(
HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, HAGroupState.STANDBY);
peerHAAdmin.createHAGroupStoreRecordInZooKeeper(peerHAGroupStoreRecord);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
index c24e15b2ab7..4dc59e93959 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java
@@ -17,7 +17,7 @@
*/
package org.apache.phoenix.end2end.index;
-import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
import static org.junit.Assert.assertEquals;
@@ -76,7 +76,7 @@ public static synchronized void doSetup() throws Exception {
@Before
public void setUp() throws Exception {
- haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_NAMESPACE);
+ haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
String zkUrl = getLocalZkUrl(config);
String peerZkUrl = CLUSTERS.getZkUrl2();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStateSubscriptionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStateSubscriptionIT.java
new file mode 100644
index 00000000000..a55302ffa53
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStateSubscriptionIT.java
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.util.HAGroupStoreTestUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+/**
+ * Integration tests for HA Group State Change subscription functionality. Tests the new
+ * subscription system where HAGroupStoreClient directly manages subscriptions and
+ * HAGroupStoreManager acts as a passthrough.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class HAGroupStateSubscriptionIT extends BaseTest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(HAGroupStateSubscriptionIT.class);
+
+ @Rule
+ public TestName testName = new TestName();
+
+ private PhoenixHAAdmin haAdmin;
+ private PhoenixHAAdmin peerHaAdmin;
+ private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 2000L;
+ private String zkUrl;
+ private String peerZKUrl;
+ private static final HighAvailabilityTestingUtility.HBaseTestingUtilityPair CLUSTERS =
+ new HighAvailabilityTestingUtility.HBaseTestingUtilityPair();
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map props = Maps.newHashMapWithExpectedSize(2);
+ props.put(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "true");
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ CLUSTERS.start();
+ }
+
+ @Before
+ public void before() throws Exception {
+ haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
+ zkUrl = getLocalZkUrl(config);
+ this.peerZKUrl = CLUSTERS.getZkUrl2();
+ peerHaAdmin = new PhoenixHAAdmin(peerZKUrl, config, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
+
+ // Clean up existing HAGroupStoreRecords
+ try {
+ List haGroupNames = HAGroupStoreClient.getHAGroupNames(zkUrl);
+ for (String haGroupName : haGroupNames) {
+ haAdmin.getCurator().delete().quietly().forPath(toPath(haGroupName));
+ peerHaAdmin.getCurator().delete().quietly().forPath(toPath(haGroupName));
+ }
+
+ } catch (Exception e) {
+ // Ignore cleanup errors
+ }
+ // Remove any existing entries in the system table
+ HAGroupStoreTestUtil.deleteAllHAGroupRecordsInSystemTable(zkUrl);
+
+ // Insert a HAGroupStoreRecord into the system table
+ HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), zkUrl,
+ peerZKUrl, ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY, null);
+ }
+
+ // ========== Multi-Cluster & Basic Subscription Tests ==========
+
+ @Test
+ public void testDifferentTargetStatesPerCluster() throws Exception {
+ String haGroupName = testName.getMethodName();
+ HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+ // Track notifications
+ AtomicInteger localNotifications = new AtomicInteger(0);
+ AtomicInteger peerNotifications = new AtomicInteger(0);
+ AtomicReference lastLocalClusterType = new AtomicReference<>();
+ AtomicReference lastPeerClusterType = new AtomicReference<>();
+
+ // Create listeners for different target states
+ HAGroupStateListener localListener = (groupName, toState, modifiedTime, clusterType) -> {
+ if (toState == HAGroupState.STANDBY_TO_ACTIVE && clusterType == ClusterType.LOCAL) {
+ localNotifications.incrementAndGet();
+ lastLocalClusterType.set(clusterType);
+ LOGGER.info("Local target state listener called: {} on {}", toState, clusterType);
+ }
+ };
+
+ HAGroupStateListener peerListener = (groupName, toState, modifiedTime, clusterType) -> {
+ if (toState == HAGroupState.ACTIVE_NOT_IN_SYNC && clusterType == ClusterType.PEER) {
+ peerNotifications.incrementAndGet();
+ lastPeerClusterType.set(clusterType);
+ LOGGER.info("Peer target state listener called: {} on {}", toState, clusterType);
+ }
+ };
+
+ // Subscribe to different target states on different clusters
+ manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE, ClusterType.LOCAL,
+ localListener);
+ manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_NOT_IN_SYNC, ClusterType.PEER,
+ peerListener);
+
+ // Trigger transition to STANDBY_TO_ACTIVE on LOCAL cluster
+ HAGroupStoreRecord localRecord =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY_TO_ACTIVE);
+ haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, 0);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Trigger transition to STANDBY on PEER cluster
+ HAGroupStoreRecord peerRecord =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_NOT_IN_SYNC);
+ peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Verify no cross-cluster triggering
+ assertEquals("Local cluster should receive its target state notification", 1,
+ localNotifications.get());
+ assertEquals("Peer cluster should receive its target state notification", 1,
+ peerNotifications.get());
+ assertEquals("Local notification should have LOCAL cluster type", ClusterType.LOCAL,
+ lastLocalClusterType.get());
+ assertEquals("Peer notification should have PEER cluster type", ClusterType.PEER,
+ lastPeerClusterType.get());
+
+ }
+
+ @Test
+ public void testUnsubscribeSpecificCluster() throws Exception {
+ String haGroupName = testName.getMethodName();
+ HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+ // Track notifications
+ AtomicInteger totalNotifications = new AtomicInteger(0);
+ AtomicReference lastClusterType = new AtomicReference<>();
+
+ HAGroupStateListener listener = (groupName, toState, modifiedTime, clusterType) -> {
+ if (toState == HAGroupState.STANDBY) {
+ totalNotifications.incrementAndGet();
+ lastClusterType.set(clusterType);
+ LOGGER.info("Listener called: {} on {}", toState, clusterType);
+ }
+ };
+
+ // Subscribe to same target state on both clusters
+ manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, ClusterType.LOCAL, listener);
+ manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, ClusterType.PEER, listener);
+
+ // Unsubscribe from LOCAL cluster only
+ manager.unsubscribeFromTargetState(haGroupName, HAGroupState.STANDBY, ClusterType.LOCAL,
+ listener);
+
+ // Trigger transition to STANDBY on LOCAL → should NOT call listener
+ HAGroupStoreRecord localRecord =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY);
+ haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, 0);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ assertEquals("Should receive no notifications from LOCAL cluster", 0, totalNotifications.get());
+
+ // Trigger transition to STANDBY on PEER → should call listener
+ HAGroupStoreRecord peerRecord =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY);
+ peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ assertEquals("Should receive notification only from PEER cluster", 1, totalNotifications.get());
+ assertEquals("Notification should be from PEER cluster", ClusterType.PEER,
+ lastClusterType.get());
+
+ }
+
+ // ========== Multiple Listeners Tests ==========
+
+ @Test
+ public void testMultipleListenersMultipleClusters() throws Exception {
+ String haGroupName = testName.getMethodName();
+ HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+ // Track notifications from multiple listeners
+ AtomicInteger listener1LocalNotifications = new AtomicInteger(0);
+ AtomicInteger listener2LocalNotifications = new AtomicInteger(0);
+ AtomicInteger listener1PeerNotifications = new AtomicInteger(0);
+ AtomicInteger listener2PeerNotifications = new AtomicInteger(0);
+
+ HAGroupStateListener listener1 = (groupName, toState, modifiedTime, clusterType) -> {
+ if (toState == HAGroupState.DEGRADED_STANDBY) {
+ if (clusterType == ClusterType.LOCAL) {
+ listener1LocalNotifications.incrementAndGet();
+ } else {
+ listener1PeerNotifications.incrementAndGet();
+ }
+ LOGGER.info("Listener1 called: {} on {}", toState, clusterType);
+ }
+ };
+
+ HAGroupStateListener listener2 = (groupName, toState, modifiedTime, clusterType) -> {
+ if (toState == HAGroupState.DEGRADED_STANDBY) {
+ if (clusterType == ClusterType.LOCAL) {
+ listener2LocalNotifications.incrementAndGet();
+ } else {
+ listener2PeerNotifications.incrementAndGet();
+ }
+ LOGGER.info("Listener2 called: {} on {}", toState, clusterType);
+ }
+ };
+
+ // Register multiple listeners for same target state on both clusters
+ manager.subscribeToTargetState(haGroupName, HAGroupState.DEGRADED_STANDBY, ClusterType.LOCAL,
+ listener1);
+ manager.subscribeToTargetState(haGroupName, HAGroupState.DEGRADED_STANDBY, ClusterType.LOCAL,
+ listener2);
+ manager.subscribeToTargetState(haGroupName, HAGroupState.DEGRADED_STANDBY, ClusterType.PEER,
+ listener1);
+ manager.subscribeToTargetState(haGroupName, HAGroupState.DEGRADED_STANDBY, ClusterType.PEER,
+ listener2);
+
+ // Trigger transition to DEGRADED_STANDBY on LOCAL
+ HAGroupStoreRecord localRecord =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.DEGRADED_STANDBY);
+ haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, 0);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Trigger transition to DEGRADED_STANDBY on PEER
+ HAGroupStoreRecord peerRecord =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.DEGRADED_STANDBY);
+ peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Verify all listeners called for each cluster
+ assertEquals("Listener1 should receive LOCAL notification", 1,
+ listener1LocalNotifications.get());
+ assertEquals("Listener2 should receive LOCAL notification", 1,
+ listener2LocalNotifications.get());
+ assertEquals("Listener1 should receive PEER notification", 1, listener1PeerNotifications.get());
+ assertEquals("Listener2 should receive PEER notification", 1, listener2PeerNotifications.get());
+
+ }
+
+ @Test
+ public void testSameListenerDifferentTargetStates() throws Exception {
+ String haGroupName = testName.getMethodName();
+ HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+ // Track which target states were reached
+ AtomicInteger stateANotifications = new AtomicInteger(0);
+ AtomicInteger stateBNotifications = new AtomicInteger(0);
+ AtomicReference lastStateAClusterType = new AtomicReference<>();
+ AtomicReference lastStateBClusterType = new AtomicReference<>();
+
+ HAGroupStateListener sharedListener = (groupName, toState, modifiedTime, clusterType) -> {
+ if (toState == HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY && clusterType == ClusterType.LOCAL) {
+ stateANotifications.incrementAndGet();
+ lastStateAClusterType.set(clusterType);
+ LOGGER.info("Shared listener - Target State A: {} on {}", toState, clusterType);
+ } else if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType == ClusterType.PEER) {
+ stateBNotifications.incrementAndGet();
+ lastStateBClusterType.set(clusterType);
+ LOGGER.info("Shared listener - Target State B: {} on {}", toState, clusterType);
+ }
+ };
+
+ // Register same listener for different target states on different clusters
+ manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY,
+ ClusterType.LOCAL, sharedListener);
+ manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.PEER,
+ sharedListener);
+
+ // Trigger target state A on LOCAL
+ HAGroupStoreRecord localRecord =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY);
+ haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, 0);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Trigger target state B on PEER
+ HAGroupStoreRecord peerRecord =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_IN_SYNC);
+ peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Verify listener called for each appropriate target state/cluster combination
+ assertEquals("Should receive target state A notification", 1, stateANotifications.get());
+ assertEquals("Should receive target state B notification", 1, stateBNotifications.get());
+ assertEquals("Target state A should be from LOCAL cluster", ClusterType.LOCAL,
+ lastStateAClusterType.get());
+ assertEquals("Target state B should be from PEER cluster", ClusterType.PEER,
+ lastStateBClusterType.get());
+
+ }
+
+ // ========== Edge Cases & Error Handling ==========
+
+ @Test
+ public void testSubscriptionToNonExistentHAGroup() throws Exception {
+ String nonExistentHAGroup = "nonExistentGroup_" + testName.getMethodName();
+ HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+ HAGroupStateListener listener = (groupName, toState, modifiedTime, clusterType) -> {
+ // Should not be called
+ };
+
+ // Try to subscribe to non-existent HA group
+ try {
+ manager.subscribeToTargetState(nonExistentHAGroup, HAGroupState.STANDBY, ClusterType.LOCAL,
+ listener);
+ fail("Expected IOException for non-existent HA group");
+ } catch (IOException e) {
+ assertTrue("Exception should mention the HA group name",
+ e.getMessage().contains(nonExistentHAGroup));
+ LOGGER.info("Correctly caught exception for non-existent HA group: {}", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testListenerExceptionIsolation() throws Exception {
+ String haGroupName = testName.getMethodName();
+ HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+ // Track notifications
+ AtomicInteger goodListener1Notifications = new AtomicInteger(0);
+ AtomicInteger goodListener2Notifications = new AtomicInteger(0);
+ AtomicInteger badListenerCalls = new AtomicInteger(0);
+
+ HAGroupStateListener goodListener1 = (groupName, toState, modifiedTime, clusterType) -> {
+ if (toState == HAGroupState.ACTIVE_IN_SYNC) {
+ goodListener1Notifications.incrementAndGet();
+ LOGGER.info("Good listener 1 called: {} on {}", toState, clusterType);
+ }
+ };
+
+ HAGroupStateListener badListener = (groupName, toState, modifiedTime, clusterType) -> {
+ if (toState == HAGroupState.ACTIVE_IN_SYNC) {
+ badListenerCalls.incrementAndGet();
+ LOGGER.info("Bad listener called, about to throw exception");
+ throw new RuntimeException("Test exception from bad listener");
+ }
+ };
+
+ HAGroupStateListener goodListener2 = (groupName, toState, modifiedTime, clusterType) -> {
+ if (toState == HAGroupState.ACTIVE_IN_SYNC) {
+ goodListener2Notifications.incrementAndGet();
+ LOGGER.info("Good listener 2 called: {} on {}", toState, clusterType);
+ }
+ };
+
+ // Register listeners - bad listener in the middle
+ manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL,
+ goodListener1);
+ manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL,
+ badListener);
+ manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL,
+ goodListener2);
+
+ // Trigger transition to target state
+ HAGroupStoreRecord transitionRecord =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_IN_SYNC);
+ haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, transitionRecord, 0);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Verify all listeners were called despite exception in bad listener
+ assertEquals("Good listener 1 should be called", 1, goodListener1Notifications.get());
+ assertEquals("Good listener 2 should be called", 1, goodListener2Notifications.get());
+ assertEquals("Bad listener should be called", 1, badListenerCalls.get());
+ }
+
+ // ========== Performance & Concurrency Tests ==========
+
+ @Test
+ public void testConcurrentMultiClusterSubscriptions() throws Exception {
+ String haGroupName = testName.getMethodName();
+ HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+ final int threadCount = 10;
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch completionLatch = new CountDownLatch(threadCount);
+ final AtomicInteger successfulSubscriptions = new AtomicInteger(0);
+
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+ // Create concurrent subscription tasks
+ for (int i = 0; i < threadCount; i++) {
+ final int threadIndex = i;
+ executor.submit(() -> {
+ try {
+ startLatch.await(); // Wait for all threads to be ready
+
+ HAGroupStateListener listener = (groupName, toState, modifiedTime, clusterType) -> {
+ LOGGER.debug("Thread {} listener called: {} on {}", threadIndex, toState, clusterType);
+ };
+
+ // Half subscribe to LOCAL, half to PEER
+ ClusterType clusterType = (threadIndex % 2 == 0) ? ClusterType.LOCAL : ClusterType.PEER;
+
+ manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, clusterType,
+ listener);
+ successfulSubscriptions.incrementAndGet();
+
+ // Also test unsubscribe
+ manager.unsubscribeFromTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, clusterType,
+ listener);
+
+ } catch (Exception e) {
+ LOGGER.error("Thread {} failed", threadIndex, e);
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+ }
+
+ // Start all threads simultaneously
+ startLatch.countDown();
+
+ // Wait for completion
+ assertTrue("All threads should complete within timeout",
+ completionLatch.await(30, TimeUnit.SECONDS));
+ assertEquals("All threads should successfully subscribe", threadCount,
+ successfulSubscriptions.get());
+
+ executor.shutdown();
+ }
+
+ @Test
+ public void testHighFrequencyMultiClusterChanges() throws Exception {
+ String haGroupName = testName.getMethodName();
+ HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+ // Track notifications
+ AtomicInteger localNotifications = new AtomicInteger(0);
+ AtomicInteger peerNotifications = new AtomicInteger(0);
+
+ HAGroupStateListener listener = (groupName, toState, modifiedTime, clusterType) -> {
+ if (clusterType == ClusterType.LOCAL) {
+ localNotifications.incrementAndGet();
+ } else {
+ peerNotifications.incrementAndGet();
+ }
+ LOGGER.debug("High frequency listener: {} on {}", toState, clusterType);
+ };
+
+ // Subscribe to target state on both clusters
+ manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, ClusterType.LOCAL, listener);
+ manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, ClusterType.PEER, listener);
+
+ // Rapidly alternate state changes on both clusters
+ final int changeCount = 5;
+ HAGroupStoreRecord initialPeerRecord =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.DEGRADED_STANDBY_FOR_WRITER);
+ peerHaAdmin.createHAGroupStoreRecordInZooKeeper(initialPeerRecord);
+
+ for (int i = 0; i < changeCount; i++) {
+ // Change local cluster
+ HAGroupStoreRecord localRecord = new HAGroupStoreRecord("1.0", haGroupName,
+ (i % 2 == 0) ? HAGroupState.STANDBY : HAGroupState.DEGRADED_STANDBY_FOR_READER);
+ haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, -1);
+
+ // Change peer cluster
+ HAGroupStoreRecord peerRecord = new HAGroupStoreRecord("1.0", haGroupName,
+ (i % 2 == 0) ? HAGroupState.STANDBY : HAGroupState.DEGRADED_STANDBY_FOR_WRITER);
+ peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, peerRecord, -1);
+
+ // Small delay between changes
+ Thread.sleep(500);
+ }
+
+ // Final wait for all events to propagate
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Verify transitions detected on both clusters
+ // Expected: 3 transitions to STANDBY state (i=0,2,4 → STANDBY)
+ assertEquals("Should detect exactly 3 local cluster transitions to STANDBY", 3,
+ localNotifications.get());
+ assertEquals("Should detect exactly 3 peer cluster transitions to STANDBY", 3,
+ peerNotifications.get());
+
+ LOGGER.info("Detected {} local and {} peer notifications", localNotifications.get(),
+ peerNotifications.get());
+
+ }
+
+ // ========== Cleanup & Resource Management Tests ==========
+
+ @Test
+ public void testSubscriptionCleanupPerCluster() throws Exception {
+ String haGroupName = testName.getMethodName();
+ HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);
+
+ // Track notifications to verify functionality
+ AtomicInteger localActiveNotifications = new AtomicInteger(0);
+ AtomicInteger peerActiveNotifications = new AtomicInteger(0);
+ AtomicInteger localStandbyNotifications = new AtomicInteger(0);
+ AtomicInteger peerStandbyNotifications = new AtomicInteger(0);
+
+ // Create listeners that track which ones are called
+ HAGroupStateListener listener1 = (groupName, toState, modifiedTime, clusterType) -> {
+ if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType == ClusterType.LOCAL) {
+ localActiveNotifications.incrementAndGet();
+ LOGGER.info("Listener1 LOCAL ACTIVE_IN_SYNC: {}", toState);
+ }
+ };
+
+ HAGroupStateListener listener2 = (groupName, toState, modifiedTime, clusterType) -> {
+ if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType == ClusterType.LOCAL) {
+ localActiveNotifications.incrementAndGet();
+ LOGGER.info("Listener2 LOCAL ACTIVE_IN_SYNC: {}", toState);
+ } else if (toState == HAGroupState.STANDBY_TO_ACTIVE && clusterType == ClusterType.PEER) {
+ peerActiveNotifications.incrementAndGet();
+ LOGGER.info("Listener2 PEER STANDBY_TO_ACTIVE: {}", toState);
+ }
+ };
+
+ HAGroupStateListener listener3 = (groupName, toState, modifiedTime, clusterType) -> {
+ if (toState == HAGroupState.STANDBY_TO_ACTIVE && clusterType == ClusterType.PEER) {
+ peerActiveNotifications.incrementAndGet();
+ LOGGER.info("Listener3 PEER STANDBY_TO_ACTIVE: {}", toState);
+ }
+ };
+
+ HAGroupStateListener listener4 = (groupName, toState, modifiedTime, clusterType) -> {
+ if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType == ClusterType.LOCAL) {
+ localStandbyNotifications.incrementAndGet();
+ LOGGER.info("Listener4 LOCAL ACTIVE_IN_SYNC: {}", toState);
+ } else if (toState == HAGroupState.STANDBY_TO_ACTIVE && clusterType == ClusterType.PEER) {
+ peerStandbyNotifications.incrementAndGet();
+ LOGGER.info("Listener4 PEER STANDBY_TO_ACTIVE: {}", toState);
+ }
+ };
+
+ // Subscribe listeners to both clusters for target states
+ manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL,
+ listener1);
+ manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL,
+ listener2);
+ manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER,
+ listener2);
+ manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER,
+ listener3);
+ manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL,
+ listener4);
+ manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER,
+ listener4);
+
+ // Test initial functionality - trigger ACTIVE_IN_SYNC on LOCAL
+ HAGroupStoreRecord localActiveRecord =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_IN_SYNC);
+ haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localActiveRecord, 0);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Should have 2 notifications for LOCAL ACTIVE_NOT_IN_SYNC (listener1 + listener2)
+ assertEquals("Should have 2 LOCAL ACTIVE_IN_SYNC notifications initially", 2,
+ localActiveNotifications.get());
+
+ // Test initial functionality - trigger STANDBY_TO_ACTIVE on PEER
+ HAGroupStoreRecord peerActiveRecord =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY_TO_ACTIVE);
+ peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerActiveRecord);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Should have 2 notifications for PEER STANDBY_TO_ACTIVE (listener2 + listener3)
+ assertEquals("Should have 2 PEER STANDBY_TO_ACTIVE notifications initially", 2,
+ peerActiveNotifications.get());
+
+ // Reset counters for cleanup testing
+ localActiveNotifications.set(0);
+ peerActiveNotifications.set(0);
+
+ // Unsubscribe selectively
+ manager.unsubscribeFromTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL,
+ listener1);
+ manager.unsubscribeFromTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE,
+ ClusterType.PEER, listener2);
+ manager.unsubscribeFromTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL,
+ listener4);
+
+ // Test after partial unsubscribe - trigger ACTIVE_IN_SYNC on LOCAL again by first changing to
+ // some other state.
+ HAGroupStoreRecord localActiveRecord2 =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_NOT_IN_SYNC);
+ haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localActiveRecord2, 1);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ HAGroupStoreRecord localActiveRecord3 =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_IN_SYNC);
+ haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localActiveRecord3, 2);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Should have only 1 notification for LOCAL ACTIVE_IN_SYNC (only listener2 remains)
+ assertEquals("Should have 1 LOCAL ACTIVE_IN_SYNC notification after partial unsubscribe", 1,
+ localActiveNotifications.get());
+
+ // Test after partial unsubscribe - trigger STANDBY_TO_ACTIVE on PEER again
+ HAGroupStoreRecord peerActiveRecord2 =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY);
+ peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, peerActiveRecord2, 0);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ HAGroupStoreRecord peerActiveRecord3 =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY_TO_ACTIVE);
+ peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, peerActiveRecord3, 1);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Should have only 1 notification for PEER STANDBY_TO_ACTIVE (only listener3 remains)
+ assertEquals("Should have 1 PEER STANDBY_TO_ACTIVE notification after partial unsubscribe", 1,
+ peerActiveNotifications.get());
+
+ // Reset counters again
+ localActiveNotifications.set(0);
+ peerActiveNotifications.set(0);
+
+ // Unsubscribe all remaining listeners
+ manager.unsubscribeFromTargetState(haGroupName, HAGroupState.ACTIVE_NOT_IN_SYNC,
+ ClusterType.LOCAL, listener2);
+ manager.unsubscribeFromTargetState(haGroupName, HAGroupState.ACTIVE_NOT_IN_SYNC,
+ ClusterType.PEER, listener3);
+ manager.unsubscribeFromTargetState(haGroupName, HAGroupState.STANDBY, ClusterType.PEER,
+ listener4);
+
+ // Test after complete unsubscribe - trigger ACTIVE_NOT_IN_SYNC on both clusters
+ HAGroupStoreRecord localActiveRecord4 =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_NOT_IN_SYNC);
+ haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localActiveRecord4, 3);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ HAGroupStoreRecord peerActiveRecord4 =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_NOT_IN_SYNC);
+ peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, peerActiveRecord4, 2);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Should have no notifications after complete unsubscribe
+ assertEquals("Should have 0 LOCAL ACTIVE_NOT_IN_SYNC notifications after complete unsubscribe",
+ 0, localActiveNotifications.get());
+ assertEquals("Should have 0 PEER ACTIVE_NOT_IN_SYNC notifications after complete unsubscribe",
+ 0, peerActiveNotifications.get());
+
+ // Test that new subscriptions still work properly
+ AtomicInteger newSubscriptionNotifications = new AtomicInteger(0);
+ HAGroupStateListener newTestListener = (groupName, toState, modifiedTime, clusterType) -> {
+ if (toState == HAGroupState.STANDBY && clusterType == ClusterType.LOCAL) {
+ newSubscriptionNotifications.incrementAndGet();
+ LOGGER.info("New subscription triggered: {} on {} at {}", toState, clusterType,
+ modifiedTime);
+ }
+ };
+
+ // Subscribe with new test listener
+ manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, ClusterType.LOCAL,
+ newTestListener);
+
+ // Trigger STANDBY state and verify new subscription works
+ HAGroupStoreRecord standbyRecord =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY);
+ haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, standbyRecord, 4);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Expected: exactly 1 notification for the new subscription
+ assertEquals("New subscription should receive exactly 1 notification", 1,
+ newSubscriptionNotifications.get());
+
+ LOGGER.info(
+ "Subscription cleanup test completed successfully with {} notifications from new subscription",
+ newSubscriptionNotifications.get());
+
+ }
+
+ /**
+ * Helper method to access private subscription maps via reflection
+ */
+ @SuppressWarnings("unchecked")
+ private ConcurrentHashMap>
+ getSubscriptionMap(HAGroupStoreClient client, String fieldName) throws Exception {
+ Field field = HAGroupStoreClient.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return (ConcurrentHashMap>) field.get(client);
+ }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
index f5df507c7bf..da3f29d2c68 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
@@ -18,7 +18,7 @@
package org.apache.phoenix.jdbc;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
-import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_KEY;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
@@ -93,9 +93,9 @@ public static synchronized void doSetup() throws Exception {
@Before
public void before() throws Exception {
haAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(),
- ZK_CONSISTENT_HA_NAMESPACE);
+ ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
peerHaAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(),
- ZK_CONSISTENT_HA_NAMESPACE);
+ ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
haAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
peerHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName()));
zkUrl = getLocalZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration());
@@ -468,6 +468,8 @@ public void testHAGroupStoreClientWithRootPathDeletion() throws Exception {
assertNotNull(currentRecord);
assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
currentRecord.getHAGroupState());
+ // The record should have a timestamp
+ assertNotNull(currentRecord.getLastSyncStateTimeInMs());
record1 = new HAGroupStoreRecord("v1.0", haGroupName,
HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC_TO_STANDBY);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
index a4ae0aeb255..16d4545e7b4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
@@ -17,24 +17,34 @@
*/
package org.apache.phoenix.jdbc;
-import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
+import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
+import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_SESSION_TIMEOUT_MULTIPLIER;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.exception.InvalidClusterRoleTransitionException;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.util.HAGroupStoreTestUtil;
import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -64,13 +74,14 @@ public class HAGroupStoreManagerIT extends BaseTest {
public static synchronized void doSetup() throws Exception {
Map props = Maps.newHashMapWithExpectedSize(2);
props.put(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "true");
+ props.put(ZK_SESSION_TIMEOUT, String.valueOf(30 * 1000));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
CLUSTERS.start();
}
@Before
public void before() throws Exception {
- haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_NAMESPACE);
+ haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
zkUrl = getLocalZkUrl(config);
this.peerZKUrl = CLUSTERS.getZkUrl2();
@@ -167,12 +178,97 @@ public void testGetHAGroupStoreRecord() throws Exception {
haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
assertTrue(retrievedOpt.isPresent());
- // Record for comparison
- HAGroupStoreRecord record = new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION,
- haGroupName, HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
+ // Get MTime from HAAdmin for equality verification below.
+ Pair currentRecordAndStat =
+ haAdmin.getHAGroupStoreRecordInZooKeeper(haGroupName);
- // Complete object comparison instead of field-by-field
- assertEquals(record, retrievedOpt.get());
+ // Complete object comparison field-by-field
+ assertEquals(haGroupName, retrievedOpt.get().getHaGroupName());
+ assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
+ retrievedOpt.get().getHAGroupState());
+ Long lastSyncStateTimeInMs = retrievedOpt.get().getLastSyncStateTimeInMs();
+ Long mtime = currentRecordAndStat.getRight().getMtime();
+ // Allow a small margin of error
+ assertTrue(Math.abs(lastSyncStateTimeInMs - mtime) <= 1);
+ assertEquals(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION,
+ retrievedOpt.get().getProtocolVersion());
+ }
+
+ @Test
+ public void testGetPeerHAGroupStoreRecord() throws Exception {
+ String haGroupName = testName.getMethodName();
+ HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(config);
+
+ // Initially, peer record should not be present
+ Optional peerRecordOpt =
+ haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName);
+ assertFalse(peerRecordOpt.isPresent());
+
+ // Create a peer HAAdmin to create records in peer cluster
+ PhoenixHAAdmin peerHaAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(),
+ ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
+
+ try {
+ // Create a HAGroupStoreRecord in the peer cluster
+ HAGroupStoreRecord peerRecord =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY);
+
+ peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Now peer record should be present
+ peerRecordOpt = haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName);
+ assertTrue(peerRecordOpt.isPresent());
+
+ // Verify the peer record details
+ HAGroupStoreRecord retrievedPeerRecord = peerRecordOpt.get();
+ assertEquals(haGroupName, retrievedPeerRecord.getHaGroupName());
+ assertEquals(HAGroupStoreRecord.HAGroupState.STANDBY, retrievedPeerRecord.getHAGroupState());
+ assertEquals(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION,
+ retrievedPeerRecord.getProtocolVersion());
+
+ // Delete peer record
+ peerHaAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Peer record should no longer be present
+ peerRecordOpt = haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName);
+ assertFalse(peerRecordOpt.isPresent());
+
+ // Create peer record again with different state
+ HAGroupStoreRecord newPeerRecord = new HAGroupStoreRecord("1.0", haGroupName,
+ HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER);
+
+ peerHaAdmin.createHAGroupStoreRecordInZooKeeper(newPeerRecord);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Verify the updated peer record
+ peerRecordOpt = haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName);
+ assertTrue(peerRecordOpt.isPresent());
+ assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER,
+ peerRecordOpt.get().getHAGroupState());
+
+ } finally {
+ // Clean up peer record
+ try {
+ peerHaAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName);
+ } catch (Exception e) {
+ // Ignore cleanup errors
+ }
+ peerHaAdmin.close();
+ }
+ }
+
+ @Test
+ public void testGetPeerHAGroupStoreRecordWhenHAGroupNotInSystemTable() throws Exception {
+ String haGroupName = testName.getMethodName();
+ HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(config);
+
+ // Try to get peer record for an HA group that doesn't exist in system table
+ Optional peerRecordOpt =
+ haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName);
+ assertFalse("Peer record should not be present for non-existent HA group",
+ peerRecordOpt.isPresent());
}
@Test
@@ -215,9 +311,13 @@ public void testMutationBlockDisabled() throws Exception {
conf.set(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "false");
conf.set(HConstants.ZOOKEEPER_QUORUM, getLocalZkUrl(config));
- HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(config);
+ // Set the HAGroupStoreManager instance to null via reflection to force recreation
+ Field field = HAGroupStoreManager.class.getDeclaredField("haGroupStoreManagerInstance");
+ field.setAccessible(true);
+ field.set(null, null);
- // Create HAGroupStoreRecord with ACTIVE_TO_STANDBY role
+ HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(config);
+ // Create HAGroupStoreRecord with ACTIVE_IN_SYNC_TO_STANDBY role
HAGroupStoreRecord transitionRecord = new HAGroupStoreRecord("1.0", haGroupName,
HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY);
@@ -226,6 +326,10 @@ public void testMutationBlockDisabled() throws Exception {
// Mutations should not be blocked even with ACTIVE_TO_STANDBY role
assertFalse(haGroupStoreManager.isMutationBlocked(haGroupName));
+
+ // Set the HAGroupStoreManager instance back to null via reflection to force
+ // recreation for other tests
+ field.set(null, null);
}
@Test
@@ -251,30 +355,49 @@ public void testSetHAGroupStatusToStoreAndForward() throws Exception {
HAGroupStoreRecord updatedRecord = updatedRecordOpt.get();
assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
updatedRecord.getHAGroupState());
+ assertNotNull(updatedRecord.getLastSyncStateTimeInMs());
+
+ // Set the HA group status to store and forward again and verify
+ // that getLastSyncStateTimeInMs is same (ACTIVE_NOT_IN_SYNC)
+ // The time should only update when we move to AIS to ANIS
+ haGroupStoreManager.setHAGroupStatusToStoreAndForward(haGroupName);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ Optional updatedRecordOpt2 =
+ haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+ assertTrue(updatedRecordOpt2.isPresent());
+ HAGroupStoreRecord updatedRecord2 = updatedRecordOpt.get();
+ assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
+ updatedRecord2.getHAGroupState());
+ assertEquals(updatedRecord.getLastSyncStateTimeInMs(),
+ updatedRecord2.getLastSyncStateTimeInMs());
}
@Test
- public void testSetHAGroupStatusRecordToSync() throws Exception {
+ public void testSetHAGroupStatusToSync() throws Exception {
String haGroupName = testName.getMethodName();
HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(config);
- // Create an initial HAGroupStoreRecord with ACTIVE_NOT_IN_SYNC status
- HAGroupStoreRecord initialRecord = new HAGroupStoreRecord("1.0", haGroupName,
- HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
-
- haAdmin.createHAGroupStoreRecordInZooKeeper(initialRecord);
- Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+ // Initial record should be present in ACTIVE_NOT_IN_SYNC status
+ HAGroupStoreRecord initialRecord =
+ haGroupStoreManager.getHAGroupStoreRecord(haGroupName).orElse(null);
+ assertNotNull(initialRecord);
+ assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
+ initialRecord.getHAGroupState());
+ assertNotNull(initialRecord.getLastSyncStateTimeInMs());
- // Set the HA group status to sync (ACTIVE)
- haGroupStoreManager.setHAGroupStatusRecordToSync(haGroupName);
+ // Set the HA group status to sync (ACTIVE), we need to wait for ZK_SESSION_TIMEOUT * Multiplier
+ Thread.sleep((long) Math.ceil(config.getLong(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT)
+ * ZK_SESSION_TIMEOUT_MULTIPLIER));
+ haGroupStoreManager.setHAGroupStatusToSync(haGroupName);
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
- // Verify the status was updated to ACTIVE
+ // Verify the state was updated to ACTIVE_IN_SYNC
Optional updatedRecordOpt =
haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
assertTrue(updatedRecordOpt.isPresent());
HAGroupStoreRecord updatedRecord = updatedRecordOpt.get();
- assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE, updatedRecord.getClusterRole());
+ assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, updatedRecord.getHAGroupState());
+ assertNull(updatedRecord.getLastSyncStateTimeInMs());
}
@Test
@@ -356,4 +479,143 @@ public void testGetHAGroupNamesWhenNoMatchingZkUrl() throws Exception {
}
+ @Test
+ public void testSetReaderToDegraded() throws Exception {
+ String haGroupName = testName.getMethodName();
+ HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(config);
+
+ // Update the auto-created record to STANDBY state for testing
+ HAGroupStoreRecord standbyRecord =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY);
+
+ // Get the record to initialize ZNode from HAGroup so that we can artificially update
+ // it via HAAdmin
+ Optional currentRecord =
+ haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+ assertTrue(currentRecord.isPresent());
+
+ // Update via HAAdmin
+ haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, standbyRecord, 0);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Call setReaderToDegraded
+ haGroupStoreManager.setReaderToDegraded(haGroupName);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Verify the status was updated to DEGRADED_STANDBY_FOR_READER
+ Optional updatedRecordOpt =
+ haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+ assertTrue(updatedRecordOpt.isPresent());
+ HAGroupStoreRecord updatedRecord = updatedRecordOpt.get();
+ assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER,
+ updatedRecord.getHAGroupState());
+
+ // Test transition from DEGRADED_STANDBY_FOR_WRITER to DEGRADED_STANDBY
+ HAGroupStoreRecord degradedWriterRecord = new HAGroupStoreRecord("1.0", haGroupName,
+ HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER);
+
+ haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, degradedWriterRecord, 2);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Call setReaderToDegraded again
+ haGroupStoreManager.setReaderToDegraded(haGroupName);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Verify the status was updated to DEGRADED_STANDBY
+ updatedRecordOpt = haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+ assertTrue(updatedRecordOpt.isPresent());
+ updatedRecord = updatedRecordOpt.get();
+ assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, updatedRecord.getHAGroupState());
+ }
+
+ @Test
+ public void testSetReaderToHealthy() throws Exception {
+ String haGroupName = testName.getMethodName();
+ HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(config);
+
+ // Get the record to initialize ZNode from HAGroup so that we can artificially
+ // update it via HAAdmin
+ Optional currentRecord =
+ haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+ assertTrue(currentRecord.isPresent());
+
+ // Update the auto-created record to DEGRADED_STANDBY_FOR_READER state for testing
+ HAGroupStoreRecord degradedReaderRecord = new HAGroupStoreRecord("1.0", haGroupName,
+ HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER);
+
+ haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, degradedReaderRecord, 0);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Call setReaderToHealthy
+ haGroupStoreManager.setReaderToHealthy(haGroupName);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Verify the status was updated to STANDBY
+ Optional updatedRecordOpt =
+ haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+ assertTrue(updatedRecordOpt.isPresent());
+ HAGroupStoreRecord updatedRecord = updatedRecordOpt.get();
+ assertEquals(HAGroupStoreRecord.HAGroupState.STANDBY, updatedRecord.getHAGroupState());
+
+ // Test transition from DEGRADED_STANDBY to DEGRADED_STANDBY_FOR_WRITER
+ HAGroupStoreRecord degradedRecord =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY);
+
+ haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, degradedRecord, 2);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Call setReaderToHealthy again
+ haGroupStoreManager.setReaderToHealthy(haGroupName);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Verify the status was updated to DEGRADED_STANDBY_FOR_WRITER
+ updatedRecordOpt = haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+ assertTrue(updatedRecordOpt.isPresent());
+ updatedRecord = updatedRecordOpt.get();
+ assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER,
+ updatedRecord.getHAGroupState());
+ }
+
+ @Test
+ public void testReaderStateTransitionInvalidStates() throws Exception {
+ String haGroupName = testName.getMethodName();
+ HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(config);
+
+ // Get the record to initialize ZNode from HAGroup so that we can artificially
+ // update it via HAAdmin
+ Optional currentRecord =
+ haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
+ assertTrue(currentRecord.isPresent());
+
+ // Update the auto-created record to ACTIVE_IN_SYNC state (invalid for both operations)
+ HAGroupStoreRecord activeRecord =
+ new HAGroupStoreRecord("1.0", haGroupName, HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
+
+ haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, activeRecord, 0);
+ Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+
+ // Test setReaderToDegraded with invalid state
+ try {
+ haGroupStoreManager.setReaderToDegraded(haGroupName);
+ fail("Expected InvalidClusterRoleTransitionException for setReaderToDegraded "
+ + "with ACTIVE_IN_SYNC state");
+ } catch (InvalidClusterRoleTransitionException e) {
+ // Expected behavior
+ assertTrue("Exception should mention the invalid transition",
+ e.getMessage().contains("ACTIVE_IN_SYNC")
+ && e.getMessage().contains("DEGRADED_STANDBY_FOR_READER"));
+ }
+
+ // Test setReaderToHealthy with invalid state
+ try {
+ haGroupStoreManager.setReaderToHealthy(haGroupName);
+ fail(
+ "Expected InvalidClusterRoleTransitionException for setReaderToHealthy with ACTIVE_IN_SYNC state");
+ } catch (InvalidClusterRoleTransitionException e) {
+ // Expected behavior
+ assertTrue("Exception should mention the invalid transition",
+ e.getMessage().contains("ACTIVE_IN_SYNC") && e.getMessage().contains("STANDBY"));
+ }
+ }
+
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java
index 79ae44203f8..14391e6d621 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java
@@ -17,7 +17,7 @@
*/
package org.apache.phoenix.jdbc;
-import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE;
+import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -73,9 +73,9 @@ public static synchronized void doSetup() throws Exception {
@Before
public void before() throws Exception {
haAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(),
- ZK_CONSISTENT_HA_NAMESPACE);
+ ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
peerHaAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(),
- ZK_CONSISTENT_HA_NAMESPACE);
+ ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE);
cleanupTestZnodes();
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java
index b25c6c77a05..0a042eff93a 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java
@@ -114,4 +114,20 @@ public static void deleteHAGroupRecordInSystemTable(String haGroupName, String z
conn.commit();
}
}
+
+ /**
+ * Deletes all HA group records from the system table for testing purposes.
+ * @param zkUrl the ZooKeeper URL to connect to
+ * @throws SQLException if the database operation fails
+ */
+ public static void deleteAllHAGroupRecordsInSystemTable(String zkUrl) throws SQLException {
+ // Delete all records from System Table
+ try (
+ PhoenixConnection conn = (PhoenixConnection) DriverManager
+ .getConnection(JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + zkUrl);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("DELETE FROM " + SYSTEM_HA_GROUP_NAME);
+ conn.commit();
+ }
+ }
}