Skip to content

Commit 571b373

Browse files
authored
imporve connection manager (x-infra-lab#17)
* improve connection pool * improve connection manager * update reconnect
1 parent 0dc28d4 commit 571b373

20 files changed

+331
-422
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>io.github.x-infra-lab</groupId>
88
<artifactId>x-remoting</artifactId>
9-
<version>0.0.1</version>
9+
<version>0.0.2-RC1</version>
1010
<packaging>jar</packaging>
1111

1212
<name>x-remoting</name>

src/main/java/io/github/xinfra/lab/remoting/connection/AbstractConnectionManager.java

Lines changed: 6 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -29,27 +29,13 @@ public AbstractConnectionManager(ConnectionManagerConfig config) {
2929
this.config = config;
3030
}
3131

32-
@Override
33-
public synchronized Connection getOrCreateIfAbsent(SocketAddress socketAddress) throws RemotingException {
34-
ensureStarted();
35-
Validate.notNull(socketAddress, "socketAddress can not be null");
36-
37-
ConnectionHolder connectionHolder = connections.get(socketAddress);
38-
if (connectionHolder == null) {
39-
connectionHolder = createConnectionHolder(socketAddress);
40-
createConnectionForHolder(socketAddress, connectionHolder, config.getConnectionNumPreEndpoint());
41-
}
42-
43-
return connectionHolder.get();
44-
}
45-
4632
@Override
4733
public void check(Connection connection) throws RemotingException {
4834
ensureStarted();
4935
Validate.notNull(connection, "connection can not be null");
5036

5137
if (connection.getChannel() == null || !connection.getChannel().isActive()) {
52-
this.removeAndClose(connection);
38+
this.close(connection);
5339
throw new RemotingException("Check connection failed for address: " + connection.remoteAddress());
5440
}
5541
if (!connection.getChannel().isWritable()) {
@@ -60,7 +46,7 @@ public void check(Connection connection) throws RemotingException {
6046
}
6147

6248
@Override
63-
public synchronized void removeAndClose(Connection connection) {
49+
public synchronized void close(Connection connection) {
6450
ensureStarted();
6551
Validate.notNull(connection, "connection can not be null");
6652

@@ -70,25 +56,13 @@ public synchronized void removeAndClose(Connection connection) {
7056
connection.close();
7157
}
7258
else {
73-
connectionHolder.removeAndClose(connection);
59+
connectionHolder.invalidate(connection);
7460
if (connectionHolder.isEmpty()) {
7561
connections.remove(socketAddress);
7662
}
7763
}
7864
}
7965

80-
@Override
81-
public Connection get(SocketAddress socketAddress) {
82-
ensureStarted();
83-
Validate.notNull(socketAddress, "socketAddress can not be null");
84-
85-
ConnectionHolder connectionHolder = connections.get(socketAddress);
86-
if (connectionHolder == null) {
87-
return null;
88-
}
89-
return connectionHolder.get();
90-
}
91-
9266
@Override
9367
public synchronized void add(Connection connection) {
9468
ensureStarted();
@@ -111,7 +85,7 @@ public synchronized void shutdown() {
11185
for (Map.Entry<SocketAddress, ConnectionHolder> entry : connections.entrySet()) {
11286
SocketAddress socketAddress = entry.getKey();
11387
ConnectionHolder connectionHolder = entry.getValue();
114-
connectionHolder.removeAndCloseAll();
88+
connectionHolder.close();
11589
connections.remove(socketAddress);
11690
}
11791

@@ -125,10 +99,10 @@ protected ConnectionHolder createConnectionHolder(SocketAddress socketAddress) {
12599

126100
protected void createConnectionForHolder(SocketAddress socketAddress, ConnectionHolder connectionHolder, int size)
127101
throws RemotingException {
128-
for (int i = 0; i < size; i++) {
102+
for (int i = connectionHolder.size(); i < size; i++) {
129103
Connection connection = connectionFactory.create(socketAddress);
130104
connectionHolder.add(connection);
131105
}
132106
}
133107

134-
}
108+
}

src/main/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManager.java

Lines changed: 29 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,22 @@
11
package io.github.xinfra.lab.remoting.connection;
22

3-
import io.github.xinfra.lab.remoting.common.NamedThreadFactory;
3+
import io.github.xinfra.lab.remoting.annotation.AccessForTest;
44
import io.github.xinfra.lab.remoting.exception.RemotingException;
55
import io.github.xinfra.lab.remoting.protocol.Protocol;
66
import io.netty.channel.ChannelHandler;
77
import lombok.extern.slf4j.Slf4j;
8+
import org.apache.commons.lang3.Validate;
89

910
import java.net.SocketAddress;
1011
import java.util.ArrayList;
1112
import java.util.List;
12-
import java.util.Set;
13-
import java.util.concurrent.ArrayBlockingQueue;
14-
import java.util.concurrent.Callable;
15-
import java.util.concurrent.CompletableFuture;
16-
import java.util.concurrent.CopyOnWriteArraySet;
17-
import java.util.concurrent.ExecutorService;
18-
import java.util.concurrent.Future;
19-
import java.util.concurrent.ThreadPoolExecutor;
20-
import java.util.concurrent.TimeUnit;
2113
import java.util.function.Supplier;
2214

2315
@Slf4j
2416
public class ClientConnectionManager extends AbstractConnectionManager {
2517

26-
private ExecutorService reconnector = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
27-
new ArrayBlockingQueue<>(1024), new NamedThreadFactory("Reconnector-Worker"));
28-
29-
private Set<SocketAddress> disableReconnectSocketAddresses = new CopyOnWriteArraySet<>();
18+
@AccessForTest
19+
protected Reconnector reconnector;
3020

3121
public ClientConnectionManager(Protocol protocol) {
3222
this.connectionFactory = new DefaultConnectionFactory(protocol, defaultChannelSuppliers());
@@ -64,78 +54,51 @@ private List<Supplier<ChannelHandler>> defaultChannelSuppliers() {
6454
}
6555

6656
@Override
67-
public synchronized void shutdown() {
68-
for (SocketAddress socketAddress : connections.keySet()) {
69-
disableReconnect(socketAddress);
57+
public Connection connect(SocketAddress socketAddress) throws RemotingException {
58+
ConnectionHolder connectionHolder = connections.get(socketAddress);
59+
if (connectionHolder == null) {
60+
connectionHolder = createConnectionHolder(socketAddress);
7061
}
71-
super.shutdown();
62+
createConnectionForHolder(socketAddress, connectionHolder, config.getConnectionNumPreEndpoint());
7263

73-
reconnector.shutdownNow();
64+
return connectionHolder.get();
7465
}
7566

7667
@Override
77-
public synchronized void reconnect(SocketAddress socketAddress) throws RemotingException {
68+
public synchronized Connection get(SocketAddress socketAddress) throws RemotingException {
7869
ensureStarted();
79-
if (disableReconnectSocketAddresses.contains(socketAddress)) {
80-
log.warn("socketAddress:{} is disable to reconnect", socketAddress);
81-
throw new RemotingException("socketAddress is disable to reconnect:" + socketAddress);
82-
}
70+
Validate.notNull(socketAddress, "socketAddress can not be null");
71+
8372
ConnectionHolder connectionHolder = connections.get(socketAddress);
8473
if (connectionHolder == null) {
85-
connectionHolder = createConnectionHolder(socketAddress);
86-
createConnectionForHolder(socketAddress, connectionHolder, config.getConnectionNumPreEndpoint());
87-
}
88-
else {
89-
int needCreateNum = config.getConnectionNumPreEndpoint() - connectionHolder.size();
90-
if (needCreateNum > 0) {
91-
createConnectionForHolder(socketAddress, connectionHolder, needCreateNum);
92-
}
74+
return connect(socketAddress);
9375
}
76+
77+
return connectionHolder.get();
9478
}
9579

9680
@Override
97-
public synchronized void disableReconnect(SocketAddress socketAddress) {
98-
ensureStarted();
99-
disableReconnectSocketAddresses.add(socketAddress);
81+
public Reconnector reconnector() {
82+
return reconnector;
10083
}
10184

10285
@Override
103-
public synchronized void enableReconnect(SocketAddress socketAddress) {
104-
ensureStarted();
105-
disableReconnectSocketAddresses.remove(socketAddress);
86+
public void startup() {
87+
super.startup();
88+
reconnector = new DefaultReconnector(this);
89+
reconnector.startup();
10690
}
10791

10892
@Override
109-
public synchronized Future<Void> asyncReconnect(SocketAddress socketAddress) {
110-
ensureStarted();
111-
if (disableReconnectSocketAddresses.contains(socketAddress)) {
112-
log.warn("socketAddress:{} is disable to asyncReconnect", socketAddress);
113-
CompletableFuture<Void> future = new CompletableFuture<>();
114-
future.completeExceptionally(
115-
new RemotingException("socketAddress is disable to asyncReconnect:" + socketAddress));
116-
return future;
117-
}
118-
119-
Callable<Void> callable = new Callable<Void>() {
120-
@Override
121-
public Void call() throws Exception {
122-
try {
123-
reconnect(socketAddress);
124-
}
125-
catch (Exception e) {
126-
log.warn("reconnect socketAddress:{} fail", socketAddress, e);
127-
throw e;
128-
}
129-
return null;
93+
public synchronized void shutdown() {
94+
if (reconnector() != null) {
95+
for (SocketAddress socketAddress : connections.keySet()) {
96+
reconnector().disableReconnect(socketAddress);
13097
}
131-
};
13298

133-
try {
134-
return reconnector.submit(callable);
135-
}
136-
catch (Throwable t) {
137-
log.warn("asyncReconnect submit failed.", t);
138-
throw t;
99+
super.shutdown();
100+
101+
reconnector().shutdown();
139102
}
140103
}
141104

src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionEventHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
4545
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
4646
Connection connection = ctx.channel().attr(CONNECTION).get();
4747
if (connectionManager != null && connectionManager.isStarted()) {
48-
connectionManager.removeAndClose(connection);
48+
connectionManager.close(connection);
4949
}
5050
userEventTriggered(ctx, ConnectionEvent.CLOSE);
5151

@@ -58,8 +58,9 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
5858
Connection connection = ctx.channel().attr(CONNECTION).get();
5959
ConnectionEvent connectionEvent = (ConnectionEvent) evt;
6060
if (connectionEvent == ConnectionEvent.CLOSE) {
61-
if (connectionManager != null && connectionManager.isStarted()) {
62-
connectionManager.asyncReconnect(connection.remoteAddress());
61+
if (connectionManager != null && connectionManager.isStarted()
62+
&& connectionManager.reconnector() != null) {
63+
connectionManager.reconnector().reconnect(connection.remoteAddress());
6364
}
6465
}
6566
}

src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionHolder.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22

33
import io.github.xinfra.lab.remoting.annotation.AccessForTest;
44

5+
import java.io.Closeable;
6+
import java.util.ArrayList;
7+
import java.util.List;
58
import java.util.concurrent.CopyOnWriteArrayList;
69

7-
public class ConnectionHolder {
10+
public class ConnectionHolder implements Closeable {
811

912
@AccessForTest
1013
protected CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<>();
@@ -16,14 +19,18 @@ public ConnectionHolder(ConnectionSelectStrategy connectionSelectStrategy) {
1619
}
1720

1821
public Connection get() {
19-
return connectionSelectStrategy.select(connections);
22+
List<Connection> snapshot = new ArrayList<>(connections);
23+
if (snapshot.size() > 0) {
24+
return connectionSelectStrategy.select(new ArrayList<>(connections));
25+
}
26+
return null;
2027
}
2128

2229
public void add(Connection connection) {
23-
connections.add(connection);
30+
connections.addIfAbsent(connection);
2431
}
2532

26-
public void removeAndClose(Connection connection) {
33+
public void invalidate(Connection connection) {
2734
connections.remove(connection);
2835
connection.close();
2936
}
@@ -32,15 +39,16 @@ public boolean isEmpty() {
3239
return connections.isEmpty();
3340
}
3441

35-
public void removeAndCloseAll() {
42+
public int size() {
43+
return connections.size();
44+
}
45+
46+
@Override
47+
public void close() {
3648
for (Connection connection : connections) {
3749
connections.remove(connection);
3850
connection.close();
3951
}
4052
}
4153

42-
public int size() {
43-
return connections.size();
44-
}
45-
4654
}

src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionManager.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,21 @@
22

33
import io.github.xinfra.lab.remoting.common.LifeCycle;
44
import io.github.xinfra.lab.remoting.exception.RemotingException;
5-
import io.github.xinfra.lab.remoting.protocol.Protocol;
65

76
import java.net.SocketAddress;
8-
import java.util.concurrent.Future;
97

108
public interface ConnectionManager extends LifeCycle {
119

12-
Connection getOrCreateIfAbsent(SocketAddress socketAddress) throws RemotingException;
10+
Connection connect(SocketAddress socketAddress) throws RemotingException;
1311

14-
Connection get(SocketAddress socketAddress);
12+
Connection get(SocketAddress socketAddress) throws RemotingException;
1513

1614
void check(Connection connection) throws RemotingException;
1715

18-
void removeAndClose(Connection connection);
16+
void close(Connection connection);
1917

2018
void add(Connection connection);
2119

22-
void reconnect(SocketAddress socketAddress) throws RemotingException;
23-
24-
void disableReconnect(SocketAddress socketAddress);
25-
26-
void enableReconnect(SocketAddress socketAddress);
27-
28-
Future<Void> asyncReconnect(SocketAddress socketAddress);
20+
Reconnector reconnector();
2921

3022
}

src/main/java/io/github/xinfra/lab/remoting/connection/DefaultConnectionFactory.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ public class DefaultConnectionFactory implements ConnectionFactory {
4848
/**
4949
* Q: why use Supplier to get ChannelHandler? A: some ChannelHandler is
5050
* not @ChannelHandler.Sharable. need create instance every time
51-
* @param channelHandlerSuppliers
5251
*/
5352
public DefaultConnectionFactory(Protocol protocol, List<Supplier<ChannelHandler>> channelHandlerSuppliers) {
5453
this(protocol, channelHandlerSuppliers, new ConnectionConfig());
@@ -57,8 +56,6 @@ public DefaultConnectionFactory(Protocol protocol, List<Supplier<ChannelHandler>
5756
/**
5857
* Q: why use Supplier to get ChannelHandler? A: some ChannelHandler is
5958
* not @ChannelHandler.Sharable. need create instance every time
60-
* @param channelHandlerSuppliers
61-
* @param connectionConfig
6259
*/
6360
public DefaultConnectionFactory(Protocol protocol, List<Supplier<ChannelHandler>> channelHandlerSuppliers,
6461
ConnectionConfig connectionConfig) {

0 commit comments

Comments
 (0)