Skip to content

Commit 0dde73c

Browse files
committed
ARTEMIS-4325 - Ability to failback after failover
1 parent dc4bf95 commit 0dde73c

File tree

7 files changed

+357
-0
lines changed

7 files changed

+357
-0
lines changed

artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class ServerLocatorConfig {
4545
public int reconnectAttempts = ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS;
4646
public int initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS;
4747
public int failoverAttempts = ActiveMQClient.DEFAULT_FAILOVER_ATTEMPTS;
48+
public int failbackAttempts = ActiveMQClient.DEFAULT_FAILBACK_ATTEMPTS;
4849
public int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
4950
public boolean cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
5051
public int compressionLevel = ActiveMQClient.DEFAULT_COMPRESSION_LEVEL;
@@ -83,6 +84,7 @@ public ServerLocatorConfig(final ServerLocatorConfig locator) {
8384
reconnectAttempts = locator.reconnectAttempts;
8485
initialConnectAttempts = locator.initialConnectAttempts;
8586
failoverAttempts = locator.failoverAttempts;
87+
failbackAttempts = locator.failbackAttempts;
8688
initialMessagePacketSize = locator.initialMessagePacketSize;
8789
useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing;
8890
compressionLevel = locator.compressionLevel;

artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ public final class ActiveMQClient {
117117

118118
public static final int DEFAULT_FAILOVER_ATTEMPTS = 0;
119119

120+
public static final int DEFAULT_FAILBACK_ATTEMPTS = 0;
121+
120122
@Deprecated
121123
public static final boolean DEFAULT_FAILOVER_ON_INITIAL_CONNECTION = false;
122124

artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,21 @@ ClientSessionFactory createSessionFactory(TransportConfiguration transportConfig
668668
*/
669669
int getFailoverAttempts();
670670

671+
/**
672+
* Sets the maximum number of failback attempts to establish a new conection to the original broker after a failover.
673+
* <p>
674+
* Value must be -1 (to try infinitely), 0 (to never atempt failback) or greater than 0.
675+
*
676+
* @param attempts maximum number of failback attempts after a successful failover
677+
* @return this ServerLocator
678+
*/
679+
ServerLocator setFailbackAttempts(int attempts);
680+
681+
/**
682+
* @return the number of failback attempts after a successful failover
683+
*/
684+
int getFailbackAttempts();
685+
671686
/**
672687
* Returns true if the client will automatically attempt to connect to the backup server if the initial
673688
* connection to the live server fails

artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@
7777
import java.lang.invoke.MethodHandles;
7878
import java.util.function.BiPredicate;
7979

80+
import static org.apache.activemq.artemis.api.core.ActiveMQExceptionType.DISCONNECTED;
81+
8082
public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {
8183

8284
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -93,6 +95,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
9395

9496
private volatile TransportConfiguration backupConnectorConfig;
9597

98+
private Pair<TransportConfiguration, TransportConfiguration> failbackConnectorPair;
99+
96100
private ConnectorFactory connectorFactory;
97101

98102
private final long callTimeout;
@@ -135,6 +139,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
135139

136140
private int failoverAttempts;
137141

142+
private int failbackAttempts;
143+
138144
private final Set<SessionFailureListener> listeners = new ConcurrentHashSet<>();
139145

140146
private final Set<FailoverEventListener> failoverListeners = new ConcurrentHashSet<>();
@@ -144,6 +150,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
144150
private Future<?> pingerFuture;
145151
private PingRunnable pingRunnable;
146152

153+
private FailbackCheck failbackChecker;
154+
147155
private final List<Interceptor> incomingInterceptors;
148156

149157
private final List<Interceptor> outgoingInterceptors;
@@ -244,6 +252,8 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
244252

245253
this.failoverAttempts = locatorConfig.failoverAttempts;
246254

255+
this.failbackAttempts = locatorConfig.failbackAttempts;
256+
247257
this.scheduledThreadPool = scheduledThreadPool;
248258

249259
this.threadPool = threadPool;
@@ -722,6 +732,12 @@ private void failoverOrReconnect(final Object connectionID,
722732
int connectorsCount = 0;
723733
int failoverRetries = 0;
724734
long failoverRetryInterval = retryInterval;
735+
736+
//Save current connector config for failback purposes
737+
if (failbackConnectorPair == null) {
738+
failbackConnectorPair = new Pair<>(connectorConfig, backupConnectorConfig);
739+
}
740+
725741
Pair<TransportConfiguration, TransportConfiguration> connectorPair;
726742
BiPredicate<Boolean, Integer> failoverRetryPredicate =
727743
(reconnected, retries) -> clientProtocolManager.isAlive() &&
@@ -752,6 +768,9 @@ private void failoverOrReconnect(final Object connectionID,
752768
oldConnection = connection;
753769
connection = null;
754770
}
771+
if (failbackAttempts != 0 && !connection.getTransportConnection().getConnectorConfig().equals(failbackConnectorPair.getA())) {
772+
runFailBackCheck();
773+
}
755774
}
756775

757776
if (connectorsCount >= serverLocator.getConnectorsSize()) {
@@ -815,6 +834,130 @@ private void failoverOrReconnect(final Object connectionID,
815834
}
816835
}
817836

837+
private void failback(final ActiveMQException me,
838+
final Pair<TransportConfiguration, TransportConfiguration> connectorPair) {
839+
840+
logger.debug("Original node has come back online, performing failback now");
841+
842+
for (ClientSessionInternal session : sessions) {
843+
SessionContext context = session.getSessionContext();
844+
if (context instanceof ActiveMQSessionContext) {
845+
ActiveMQSessionContext sessionContext = (ActiveMQSessionContext) context;
846+
if (sessionContext.isKilled()) {
847+
setReconnectAttempts(0);
848+
}
849+
}
850+
}
851+
852+
Set<ClientSessionInternal> sessionsToClose = null;
853+
if (!clientProtocolManager.isAlive()) {
854+
return;
855+
}
856+
857+
Lock localFailoverLock = lockFailover();
858+
859+
try {
860+
861+
callFailoverListeners(FailoverEventType.FAILURE_DETECTED);
862+
callSessionFailureListeners(me, false, false, null);
863+
864+
if (clientProtocolManager.cleanupBeforeFailover(me)) {
865+
866+
RemotingConnection oldConnection = connection;
867+
868+
connection = null;
869+
870+
Connector localConnector = connector;
871+
if (localConnector != null) {
872+
try {
873+
localConnector.close();
874+
} catch (Exception ignore) {
875+
// no-op
876+
}
877+
}
878+
879+
cancelScheduledTasks();
880+
881+
connector = null;
882+
883+
HashSet<ClientSessionInternal> sessionsToFailover;
884+
synchronized (sessions) {
885+
sessionsToFailover = new HashSet<>(sessions);
886+
}
887+
888+
// Notify sessions before failover.
889+
for (ClientSessionInternal session : sessionsToFailover) {
890+
session.preHandleFailover(connection);
891+
}
892+
893+
boolean sessionsReconnected = false;
894+
895+
connectorConfig = connectorPair.getA();
896+
currentConnectorConfig = connectorPair.getA();
897+
if (connectorPair.getB() != null) {
898+
backupConnectorConfig = connectorPair.getB();
899+
}
900+
901+
getConnection();
902+
903+
if (connection != null) {
904+
sessionsReconnected = reconnectSessions(sessionsToFailover, oldConnection, me);
905+
906+
if (!sessionsReconnected) {
907+
if (oldConnection != null) {
908+
oldConnection.destroy();
909+
}
910+
911+
oldConnection = connection;
912+
connection = null;
913+
}
914+
}
915+
916+
// Notify sessions after failover.
917+
for (ClientSessionInternal session : sessionsToFailover) {
918+
session.postHandleFailover(connection, sessionsReconnected);
919+
}
920+
921+
if (oldConnection != null) {
922+
oldConnection.destroy();
923+
}
924+
925+
if (connection != null) {
926+
callFailoverListeners(FailoverEventType.FAILOVER_COMPLETED);
927+
}
928+
}
929+
930+
if (connection == null) {
931+
synchronized (sessions) {
932+
sessionsToClose = new HashSet<>(sessions);
933+
}
934+
callFailoverListeners(FailoverEventType.FAILOVER_FAILED);
935+
callSessionFailureListeners(me, true, false, null);
936+
}
937+
} finally {
938+
localFailoverLock.unlock();
939+
}
940+
941+
// This needs to be outside the failover lock to prevent deadlock
942+
if (connection != null) {
943+
callSessionFailureListeners(me, true, true);
944+
}
945+
946+
if (sessionsToClose != null) {
947+
// If connection is null it means we didn't succeed in failing over or reconnecting
948+
// so we close all the sessions, so they will throw exceptions when attempted to be used
949+
950+
for (ClientSessionInternal session : sessionsToClose) {
951+
try {
952+
session.cleanUp(true);
953+
} catch (Exception cause) {
954+
ActiveMQClientLogger.LOGGER.failedToCleanupSession(cause);
955+
}
956+
}
957+
}
958+
959+
}
960+
818961
private ClientSession createSessionInternal(final String rawUsername,
819962
final String rawPassword,
820963
final boolean xa,
@@ -995,6 +1138,10 @@ private int getConnectionWithRetry(final int reconnectAttempts, RemotingConnecti
9951138
return count;
9961139
}
9971140

1141+
private long getRetryInterval() {
1142+
return retryInterval;
1143+
}
1144+
9981145
private long getNextRetryInterval(long retryInterval) {
9991146
// Exponential back-off
10001147
long nextRetryInterval = (long) (retryInterval * retryIntervalMultiplier);
@@ -1492,6 +1639,59 @@ public synchronized void cancel() {
14921639
}
14931640
}
14941641

1642+
private void runFailBackCheck() {
1643+
if (failbackChecker == null) {
1644+
failbackChecker = new FailbackCheck();
1645+
}
1646+
threadPool.execute(failbackChecker);
1647+
}
1648+
1649+
private class FailbackCheck implements Runnable {
1650+
private boolean first = true;
1651+
1652+
@Override
1653+
public synchronized void run() {
1654+
1655+
if (!first) {
1656+
return;
1657+
}
1658+
1659+
first = false;
1660+
1661+
logger.debug("failbackChecker is running, trying to reach {} for failback", failbackConnectorPair.getA().toString());
1662+
1663+
boolean run = true;
1664+
int attempts = 0;
1665+
long failbackRetryInterval = getRetryInterval();
1666+
1667+
ConnectorFactory transportConnectorFactory;
1668+
Connector transportConnector;
1669+
Connection transportConnection;
1670+
1671+
while (failbackAttempts == -1 || attempts++ < failbackAttempts) {
1672+
1673+
waitForRetry(failbackRetryInterval);
1674+
failbackRetryInterval = getNextRetryInterval(failbackRetryInterval);
1675+
1676+
transportConnectorFactory = instantiateConnectorFactory(failbackConnectorPair.getA().getFactoryClassName());
1677+
transportConnector = createConnector(transportConnectorFactory, failbackConnectorPair.getA());
1678+
transportConnection = openTransportConnection(transportConnector);
1679+
1680+
if (transportConnection != null) {
1681+
transportConnector.close();
1682+
transportConnection.close();
1683+
ActiveMQException exception = new ActiveMQException("Failing back to original broker: " + failbackConnectorPair.getA().toString(), DISCONNECTED);
1684+
failback(exception, failbackConnectorPair);
1685+
break;
1686+
}
1687+
1688+
}
1689+
1690+
first = true;
1691+
}
1692+
1693+
}
1694+
14951695
protected RemotingConnection establishNewConnection() {
14961696
Connection transportConnection = createTransportConnection();
14971697

artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1199,6 +1199,18 @@ public int getFailoverAttempts() {
11991199
return config.failoverAttempts;
12001200
}
12011201

1202+
@Override
1203+
public ServerLocatorImpl setFailbackAttempts(int attempts) {
1204+
checkWrite();
1205+
this.config.failbackAttempts = attempts;
1206+
return this;
1207+
}
1208+
1209+
@Override
1210+
public int getFailbackAttempts() {
1211+
return config.failbackAttempts;
1212+
}
1213+
12021214
@Deprecated
12031215
@Override
12041216
public boolean isFailoverOnInitialConnection() {

docs/user-manual/en/client-failover.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,20 @@ to other live servers only after all attempts to
7878
[reconnect to the same server](#reconnect-to-the-same-server) or
7979
[reconnect to the backup server](#reconnect-to-the-backup-server) fail.
8080

81+
## Failing back after a successful failover
82+
It is also possile to have the client keep trying to reconnect to the original
83+
live server after a successful failover. In this case the failover connection
84+
will get disconnected and moved back to the original server if it comes
85+
back online at some later time.
86+
87+
This is controlled by the property `failbackAttempts`
88+
89+
If set to a non-zero value then following a failover the client will try to reach
90+
the original server continiously according to the configured `retryInterval`
91+
until it reaches the number of configured `failbackAttempts`.
92+
93+
Setting this parameter to `-1` means try forever. Default value is `0`
94+
8195
## Session reconnection
8296

8397
When clients [reconnect to the same server](#reconnect-to-the-same-server)

0 commit comments

Comments
 (0)