Skip to content

Commit

Permalink
Merge pull request #5944 from 317787106/feature/test_isolated3
Browse files Browse the repository at this point in the history
feat(net): add some log for isolated2 disconnection
  • Loading branch information
lvs007 authored Aug 14, 2024
2 parents 21f0cb5 + 14a0750 commit 424a3dd
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 58 deletions.
3 changes: 3 additions & 0 deletions framework/src/main/java/org/tron/core/config/args/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,9 @@ public static void setParam(final String[] args, final String confFileName) {

PARAMETER.inactiveThreshold = config.hasPath(Constant.NODE_INACTIVE_THRESHOLD)
? config.getInt(Constant.NODE_INACTIVE_THRESHOLD) : 600;
if (PARAMETER.inactiveThreshold < 1) {
PARAMETER.inactiveThreshold = 1;
}

PARAMETER.maxTransactionPendingSize = config.hasPath(Constant.NODE_MAX_TRANSACTION_PENDING_SIZE)
? config.getInt(Constant.NODE_MAX_TRANSACTION_PENDING_SIZE) : 2000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ public String log() {
+ "syncBlockRequestedSize:%d\n"
+ "remainNum:%d\n"
+ "syncChainRequested:%d\n"
+ "inactiveSeconds:%d\n"
+ "blockInProcess:%d\n",
channel.getInetSocketAddress(),
(now - channel.getStartTime()) / Constant.ONE_THOUSAND,
Expand All @@ -244,6 +245,7 @@ public String log() {
remainNum,
requested == null ? 0 : (now - requested.getValue())
/ Constant.ONE_THOUSAND,
(now - lastActiveTime) / Constant.ONE_THOUSAND,
syncBlockInProcess.size());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class ResilienceService {

private static final long inactiveThreshold =
CommonParameter.getInstance().getInactiveThreshold() * 1000L;
public static final long blockNotChangeThreshold = 90 * 1000L;
public static final long blockNotChangeThreshold = 60 * 1000L;

//when node is isolated, retention percent peers will not be disconnected
public static final double retentionPercent = 0.8;
Expand Down Expand Up @@ -74,75 +74,81 @@ private void disconnectRandom() {
if (peerSize >= CommonParameter.getInstance().getMaxConnections()) {
long now = System.currentTimeMillis();
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isDisconnect())
.filter(peer -> now - peer.getLastActiveTime() >= inactiveThreshold)
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
.collect(Collectors.toList());
if (!peers.isEmpty()) {
int index = new Random().nextInt(peers.size());
disconnectFromPeer(peers.get(index), ReasonCode.RANDOM_ELIMINATION);
disconnectFromPeer(peers.get(index), ReasonCode.RANDOM_ELIMINATION,
DisconnectCause.RANDOM_ELIMINATION);
}
}
}

private void disconnectLan() {
if (isLanNode()) {
// disconnect from the node that has keep inactive for more than inactiveThreshold
// and its lastActiveTime is smallest
int peerSize = tronNetDelegate.getActivePeer().size();
if (peerSize >= CommonParameter.getInstance().getMinConnections()) {
long now = System.currentTimeMillis();
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isDisconnect())
.filter(peer -> now - peer.getLastActiveTime() >= inactiveThreshold)
.filter(peer -> !peer.getChannel().isTrustPeer())
.collect(Collectors.toList());
Optional<PeerConnection> one = getEarliestPeer(peers);
one.ifPresent(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL));
}
if (!isLanNode()) {
return;
}
// disconnect from the node that has keep inactive for more than inactiveThreshold
// and its lastActiveTime is smallest
int peerSize = tronNetDelegate.getActivePeer().size();
if (peerSize >= CommonParameter.getInstance().getMinConnections()) {
long now = System.currentTimeMillis();
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> now - peer.getLastActiveTime() >= inactiveThreshold)
.filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs())
.filter(peer -> !peer.getChannel().isTrustPeer())
.collect(Collectors.toList());
Optional<PeerConnection> one = getEarliestPeer(peers);
one.ifPresent(
peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL, DisconnectCause.LAN_NODE));
}
}

private void disconnectIsolated2() {
if (isIsolateLand2()) {
logger.info("Node is isolated, try to disconnect from peers");
int peerSize = tronNetDelegate.getActivePeer().size();

//disconnect from the node whose lastActiveTime is smallest
if (peerSize >= CommonParameter.getInstance().getMinActiveConnections()) {
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isDisconnect())
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> peer.getChannel().isActive())
.collect(Collectors.toList());

Optional<PeerConnection> one = getEarliestPeer(peers);
one.ifPresent(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL));
}
if (!isIsolateLand2()) {
return;
}
logger.warn("Node is isolated, try to disconnect from peers");
int peerSize = tronNetDelegate.getActivePeer().size();

//disconnect from some passive nodes, make sure retention nodes' num <= 0.8 * maxConnection,
//so new peers can come in
peerSize = tronNetDelegate.getActivePeer().size();
int threshold = (int) (CommonParameter.getInstance().getMaxConnections() * retentionPercent);
if (peerSize > threshold) {
int disconnectSize = peerSize - threshold;
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.isDisconnect())
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> !peer.getChannel().isActive())
.collect(Collectors.toList());
try {
peers.sort(Comparator.comparing(PeerConnection::getLastActiveTime, Long::compareTo));
} catch (Exception e) {
logger.warn("Sort disconnectIsolated2 peers failed: {}", e.getMessage());
return;
}
//disconnect from the node whose lastActiveTime is smallest
if (peerSize >= CommonParameter.getInstance().getMinActiveConnections()) {
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> peer.getChannel().isActive())
.collect(Collectors.toList());

if (peers.size() > disconnectSize) {
peers = peers.subList(0, disconnectSize);
}
peers.forEach(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL));
Optional<PeerConnection> one = getEarliestPeer(peers);
one.ifPresent(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL,
DisconnectCause.ISOLATE2_ACTIVE));
}

//disconnect from some passive nodes, make sure retention nodes' num <= 0.8 * maxConnection,
//so new peers can come in
peerSize = tronNetDelegate.getActivePeer().size();
int threshold = (int) (CommonParameter.getInstance().getMaxConnections() * retentionPercent);
if (peerSize > threshold) {
int disconnectSize = peerSize - threshold;
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> !peer.getChannel().isActive())
.collect(Collectors.toList());
try {
peers.sort(Comparator.comparing(PeerConnection::getLastActiveTime, Long::compareTo));
} catch (Exception e) {
logger.warn("Sort disconnectIsolated2 peers failed: {}", e.getMessage());
return;
}
int candidateSize = peers.size();
if (peers.size() > disconnectSize) {
peers = peers.subList(0, disconnectSize);
}
logger.info("All peer Size:{}, plan size:{}, candidate size:{}, real size:{}", peerSize,
disconnectSize, candidateSize, peers.size());
peers.forEach(peer -> disconnectFromPeer(peer, ReasonCode.BAD_PROTOCOL,
DisconnectCause.ISOLATE2_PASSIVE));
}
}

Expand All @@ -162,7 +168,8 @@ private boolean isLanNode() {
int activePeerSize = (int) tronNetDelegate.getActivePeer().stream()
.filter(peer -> peer.getChannel().isActive())
.count();
return peerSize > 0 && peerSize == activePeerSize;
return peerSize >= CommonParameter.getInstance().getMinActiveConnections()
&& peerSize == activePeerSize;
}

private boolean isIsolateLand2() {
Expand All @@ -173,13 +180,21 @@ private boolean isIsolateLand2() {
return advPeerCount >= 1 && diff >= blockNotChangeThreshold;
}

private void disconnectFromPeer(PeerConnection peer, ReasonCode reasonCode) {
private void disconnectFromPeer(PeerConnection peer, ReasonCode reasonCode,
DisconnectCause cause) {
int inactiveSeconds = (int) ((System.currentTimeMillis() - peer.getLastActiveTime()) / 1000);
logger.info("Disconnect from peer {}, inactive seconds {}", peer.getInetSocketAddress(),
inactiveSeconds);
logger.info("Disconnect from peer {}, inactive seconds {}, cause: {}",
peer.getInetSocketAddress(), inactiveSeconds, cause);
peer.disconnect(reasonCode);
}

private enum DisconnectCause {
RANDOM_ELIMINATION,
LAN_NODE,
ISOLATE2_ACTIVE,
ISOLATE2_PASSIVE,
}

public void close() {
ExecutorServiceManager.shutdownAndAwaitTermination(executor, esName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ public void testDisconnectRandom() {

PeerManager.add(context, c1);
}
for (PeerConnection peer : PeerManager.getPeers()) {
peer.setNeedSyncFromPeer(false);
peer.setNeedSyncFromUs(false);
}
ReflectUtils.invokeMethod(service, "disconnectRandom");
Assert.assertEquals(maxConnection, PeerManager.getPeers().size());

Expand Down Expand Up @@ -93,7 +97,10 @@ public void testDisconnectLan() {

PeerManager.add(context, c1);
}

for (PeerConnection peer : PeerManager.getPeers()) {
peer.setNeedSyncFromPeer(false);
peer.setNeedSyncFromUs(false);
}
Assert.assertEquals(9, PeerManager.getPeers().size());

boolean isLan = ReflectUtils.invokeMethod(service, "isLanNode");
Expand Down

0 comments on commit 424a3dd

Please sign in to comment.