Skip to content

Commit

Permalink
imporve ConnecionManager (x-infra-lab#21)
Browse files Browse the repository at this point in the history
* fix reconnetor current problem

* "update connection manager"

* close eventLoopGroup

* fix dead lock
  • Loading branch information
JoeCqupt authored Nov 8, 2024
1 parent 2384fc9 commit 9f67f95
Show file tree
Hide file tree
Showing 14 changed files with 131 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public abstract class AbstractConnectionManager extends AbstractLifeCycle implem

protected ConnectionManagerConfig config = new ConnectionManagerConfig();

private ConnectionEventProcessor connectionEventProcessor;
private ConnectionEventProcessor connectionEventProcessor = new DefaultConnectionEventProcessor();

public AbstractConnectionManager() {
}
Expand All @@ -31,12 +31,28 @@ public AbstractConnectionManager(ConnectionManagerConfig config) {
this.config = config;
}

@Override
public synchronized void disconnect(SocketAddress socketAddress) {
ensureStarted();
Validate.notNull(socketAddress, "socket address can not be null");
if (reconnector() != null) {
reconnector().disconnect(socketAddress);
}

ConnectionHolder connectionHolder = connections.get(socketAddress);
if (connectionHolder != null) {
connectionHolder.close();
connections.remove(socketAddress);
}
log.info("Disconnect connection for address: {}", socketAddress);
}

@Override
public void check(Connection connection) throws RemotingException {
ensureStarted();
Validate.notNull(connection, "connection can not be null");

if (connection.getChannel() == null || !connection.getChannel().isActive()) {
if (connection.getChannel() == null || !connection.getChannel().isActive() || connection.isClosed()) {
this.close(connection);
throw new RemotingException("Check connection failed for address: " + connection.remoteAddress());
}
Expand All @@ -58,7 +74,11 @@ public synchronized void close(Connection connection) {
connection.close();
}
else {
connectionHolder.invalidate(connection);
if (connectionHolder.invalidate(connection)) {
if (reconnector() != null) {
reconnector().reconnect(socketAddress);
}
}
if (connectionHolder.isEmpty()) {
connections.remove(socketAddress);
}
Expand Down Expand Up @@ -89,21 +109,15 @@ public ConnectionEventProcessor connectionEventProcessor() {
@Override
public void startup() {
super.startup();

connectionEventProcessor = new DefaultConnectionEventProcessor();
connectionEventProcessor.startup();
}

@Override
public synchronized void shutdown() {
super.shutdown();
for (Map.Entry<SocketAddress, ConnectionHolder> entry : connections.entrySet()) {
SocketAddress socketAddress = entry.getKey();
ConnectionHolder connectionHolder = entry.getValue();
connectionHolder.close();
connections.remove(socketAddress);
disconnect(entry.getKey());
}

super.shutdown();
connectionEventProcessor.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -16,7 +17,7 @@
public class ClientConnectionManager extends AbstractConnectionManager {

@AccessForTest
protected Reconnector reconnector;
protected Reconnector reconnector = new DefaultReconnector(this);

public ClientConnectionManager(Protocol protocol) {
this.connectionFactory = new DefaultConnectionFactory(protocol, defaultChannelSuppliers());
Expand Down Expand Up @@ -54,7 +55,8 @@ private List<Supplier<ChannelHandler>> defaultChannelSuppliers() {
}

@Override
public Connection connect(SocketAddress socketAddress) throws RemotingException {
public synchronized Connection connect(SocketAddress socketAddress) throws RemotingException {
ensureStarted();
ConnectionHolder connectionHolder = connections.get(socketAddress);
if (connectionHolder == null) {
connectionHolder = createConnectionHolder(socketAddress);
Expand All @@ -65,7 +67,7 @@ public Connection connect(SocketAddress socketAddress) throws RemotingException
}

@Override
public synchronized Connection get(SocketAddress socketAddress) throws RemotingException {
public Connection get(SocketAddress socketAddress) throws RemotingException {
ensureStarted();
Validate.notNull(socketAddress, "socketAddress can not be null");

Expand All @@ -85,22 +87,19 @@ public Reconnector reconnector() {
@Override
public void startup() {
super.startup();

reconnector = new DefaultReconnector(this);
reconnector.startup();
}

@Override
public synchronized void shutdown() {
if (reconnector != null) {
for (SocketAddress socketAddress : connections.keySet()) {
reconnector.disableReconnect(socketAddress);
}

super.shutdown();

reconnector.shutdown();
super.shutdown();
try {
connectionFactory.close();
}
catch (IOException e) {
log.warn("connectionFactory close ex", e);
}
reconnector.shutdown();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public SocketAddress remoteAddress() {

public ChannelFuture close() {
if (closed.compareAndSet(false, true)) {
onClose();
return channel.close().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Expand All @@ -76,6 +77,10 @@ public void operationComplete(ChannelFuture future) throws Exception {
return channel.newSucceededFuture();
}

public boolean isClosed() {
return closed.get();
}

public void onClose() {
for (int requestId : invokeMap.keySet()) {
InvokeFuture<?> invokeFuture = removeInvokeFuture(requestId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,21 @@ public class ConnectionEventHandler extends ChannelDuplexHandler {

private ConnectionManager connectionManager;

private ConnectionEventProcessor connectionEventProcessor;

public ConnectionEventHandler(ConnectionManager connectionManager) {
Validate.notNull(connectionManager, "connectionManager can not be null.");
Validate.notNull(connectionManager.connectionEventProcessor(), "connectionEventProcessor can not be null.");
this.connectionManager = connectionManager;
this.connectionEventProcessor = connectionManager.connectionEventProcessor();
}

public ConnectionEventHandler() {
// server side do not manage connections
public ConnectionEventHandler(ConnectionEventProcessor connectionEventProcessor) {
this.connectionEventProcessor = connectionEventProcessor;
}

@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
Connection connection = ctx.channel().attr(CONNECTION).get();
connection.onClose();
super.close(ctx, promise);
}

Expand Down Expand Up @@ -58,17 +60,8 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof ConnectionEvent) {
Connection connection = ctx.channel().attr(CONNECTION).get();
ConnectionEvent connectionEvent = (ConnectionEvent) evt;
if (connectionEvent == ConnectionEvent.CLOSE) {
if (connectionManager != null && connectionManager.isStarted()
&& connectionManager.reconnector() != null) {
connectionManager.reconnector().reconnect(connection.remoteAddress());
}
}

if (connectionManager != null && connectionManager.isStarted()
&& connectionManager.connectionEventProcessor() != null) {
connectionManager.connectionEventProcessor().handleEvent((ConnectionEvent) evt, connection);
if (connectionEventProcessor.isStarted()) {
connectionEventProcessor.handleEvent((ConnectionEvent) evt, connection);
}
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import io.github.xinfra.lab.remoting.exception.RemotingException;

import java.io.Closeable;
import java.net.SocketAddress;

public interface ConnectionFactory {
public interface ConnectionFactory extends Closeable {

Connection create(SocketAddress socketAddress) throws RemotingException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public void add(Connection connection) {
connections.addIfAbsent(connection);
}

public void invalidate(Connection connection) {
connections.remove(connection);
public boolean invalidate(Connection connection) {
connection.close();
return connections.remove(connection);
}

public boolean isEmpty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ public interface ConnectionManager extends LifeCycle {

Connection connect(SocketAddress socketAddress) throws RemotingException;

void disconnect(SocketAddress socketAddress);

Connection get(SocketAddress socketAddress) throws RemotingException;

void check(Connection connection) throws RemotingException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand All @@ -36,7 +37,7 @@ public class DefaultConnectionFactory implements ConnectionFactory {
private ConnectionConfig connectionConfig;

// todo EpollUtils
private static final EventLoopGroup workerGroup = Epoll.isAvailable()
private final EventLoopGroup workerGroup = Epoll.isAvailable()
? new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors(),
new NamedThreadFactory("Remoting-Client-Worker"))
: new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(),
Expand Down Expand Up @@ -115,4 +116,9 @@ public Connection create(SocketAddress socketAddress) throws RemotingException {
return new Connection(protocol, channel);
}

@Override
public void close() throws IOException {
workerGroup.shutdownGracefully();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,23 @@
import io.github.xinfra.lab.remoting.annotation.AccessForTest;
import io.github.xinfra.lab.remoting.common.AbstractLifeCycle;
import io.github.xinfra.lab.remoting.common.NamedThreadFactory;
import io.github.xinfra.lab.remoting.exception.RemotingException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate;

import java.net.SocketAddress;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

@Slf4j
public class DefaultReconnector extends AbstractLifeCycle implements Reconnector {

private Set<SocketAddress> disabledAddresses = new CopyOnWriteArraySet<>();
private static final long DEFAULT_RECONNECT_INTERVAL = 1000L;

private CopyOnWriteArraySet<SocketAddress> disabledAddresses = new CopyOnWriteArraySet<>();

@AccessForTest
protected LinkedBlockingQueue<SocketAddress> reconnectAddressQueue = new LinkedBlockingQueue<>();
protected CopyOnWriteArrayList<SocketAddress> reconnectAddresses = new CopyOnWriteArrayList<>();

@AccessForTest
protected ConnectionManager connectionManager;
Expand All @@ -40,26 +40,37 @@ public void startup() {
public void shutdown() {
super.shutdown();
reconeectThread.interrupt();
reconnectAddresses.clear();
disabledAddresses.clear();
reconnectAddressQueue.clear();
}

@Override
public synchronized void reconnect(SocketAddress socketAddress) {
public void reconnect(SocketAddress socketAddress) {
ensureStarted();
Validate.notNull(socketAddress, "socketAddress must not be null");
synchronized (connectionManager) {
reconnectAddresses.addIfAbsent(socketAddress);
}
}

@Override
public void disconnect(SocketAddress socketAddress) {
ensureStarted();
Validate.notNull(socketAddress, "socketAddress must not be null");
reconnectAddressQueue.add(socketAddress);
synchronized (connectionManager) {
reconnectAddresses.remove(socketAddress);
}
}

@Override
public synchronized void disableReconnect(SocketAddress socketAddress) {
public void disableReconnect(SocketAddress socketAddress) {
ensureStarted();
Validate.notNull(socketAddress, "socketAddress must not be null");
disabledAddresses.add(socketAddress);
}

@Override
public synchronized void enableReconnect(SocketAddress socketAddress) {
public void enableReconnect(SocketAddress socketAddress) {
ensureStarted();
Validate.notNull(socketAddress, "socketAddress must not be null");
disabledAddresses.remove(socketAddress);
Expand All @@ -70,29 +81,26 @@ class ReconnectTask implements Runnable {
@Override
public void run() {
while (isStarted()) {
SocketAddress socketAddress = null;
try {
socketAddress = reconnectAddressQueue.take();
}
catch (InterruptedException e) {
continue;
}

if (disabledAddresses.contains(socketAddress)) {
log.warn("reconnect to {} has been disabled", socketAddress);
}
else {
try {
connectionManager.connect(socketAddress);
}
catch (Throwable e) {
log.warn("reconnect {} fail.", socketAddress);
reconnectAddressQueue.add(socketAddress);
synchronized (connectionManager) {
if (!reconnectAddresses.isEmpty()) {
SocketAddress socketAddress = reconnectAddresses.remove(0);
if (disabledAddresses.contains(socketAddress)) {
log.warn("reconnect to {} has been disabled", socketAddress);
}
else {
try {
connectionManager.connect(socketAddress);
}
catch (Throwable e) {
log.warn("reconnect {} fail.", socketAddress, e);
reconnectAddresses.addIfAbsent(socketAddress);
}
}
}
}

try {
TimeUnit.SECONDS.sleep(1);
TimeUnit.MILLISECONDS.sleep(DEFAULT_RECONNECT_INTERVAL);
}
catch (InterruptedException e) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ public interface Reconnector extends LifeCycle {

void reconnect(SocketAddress socketAddress);

void disconnect(SocketAddress socketAddress);

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ public class RpcServer extends BaseRemotingServer {
@Getter
private RpcProtocol protocol;

@Getter
private RpcRemoting rpcServerRemoting;

public RpcServer() {
Expand Down
Loading

0 comments on commit 9f67f95

Please sign in to comment.