Skip to content

Commit

Permalink
Merge pull request #5956 from 317787106/feature/test_isolated3
Browse files Browse the repository at this point in the history
feat(net): adjust disconnect strategy in isolated scene
  • Loading branch information
lvs007 authored Aug 16, 2024
2 parents 424a3dd + 1a7abf3 commit ea7ef8e
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.tron.p2p.connection.Channel;
import org.tron.protos.Protocol;
import org.tron.protos.Protocol.Inventory.InventoryType;
import org.tron.protos.Protocol.ReasonCode;

@Slf4j(topic = "net")
@Component
Expand Down Expand Up @@ -207,7 +206,7 @@ private void processMessage(PeerConnection peer, byte[] data) {
default:
throw new P2pException(P2pException.TypeEnum.NO_SUCH_MESSAGE, msg.getType().toString());
}
updateLastActiveTime(peer, msg);
updateLastInteractiveTime(peer, msg);
} catch (Exception e) {
processException(peer, msg, e);
} finally {
Expand All @@ -223,7 +222,7 @@ private void processMessage(PeerConnection peer, byte[] data) {
}
}

private void updateLastActiveTime(PeerConnection peer, TronMessage msg) {
private void updateLastInteractiveTime(PeerConnection peer, TronMessage msg) {
MessageTypes type = msg.getType();

boolean flag = false;
Expand All @@ -240,7 +239,7 @@ private void updateLastActiveTime(PeerConnection peer, TronMessage msg) {
break;
}
if (flag) {
peer.setLastActiveTime(System.currentTimeMillis());
peer.setLastInteractiveTime(System.currentTimeMillis());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void processMessage(PeerConnection peer, TronMessage msg) {
peer.getAdvInvReceive().put(item, System.currentTimeMillis());
advService.addInv(item);
if (type.equals(InventoryType.BLOCK) && peer.getAdvInvSpread().getIfPresent(item) == null) {
peer.setLastActiveTime(System.currentTimeMillis());
peer.setLastInteractiveTime(System.currentTimeMillis());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class PeerConnection {

@Getter
@Setter
private volatile long lastActiveTime;
private volatile long lastInteractiveTime;

@Getter
@Setter
Expand Down Expand Up @@ -163,7 +163,7 @@ public void setChannel(Channel channel) {
this.isRelayPeer = true;
}
this.nodeStatistics = TronStatsManager.getNodeStatistics(channel.getInetAddress());
lastActiveTime = System.currentTimeMillis();
lastInteractiveTime = System.currentTimeMillis();
}

public void setBlockBothHave(BlockId blockId) {
Expand Down Expand Up @@ -245,7 +245,7 @@ public String log() {
remainNum,
requested == null ? 0 : (now - requested.getValue())
/ Constant.ONE_THOUSAND,
(now - lastActiveTime) / Constant.ONE_THOUSAND,
(now - lastInteractiveTime) / Constant.ONE_THOUSAND,
syncBlockInProcess.size());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private void disconnectRandom() {
if (peerSize >= CommonParameter.getInstance().getMaxConnections()) {
long now = System.currentTimeMillis();
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> now - peer.getLastActiveTime() >= inactiveThreshold)
.filter(peer -> now - peer.getLastInteractiveTime() >= inactiveThreshold)
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> !peer.isNeedSyncFromUs() && !peer.isNeedSyncFromPeer())
.collect(Collectors.toList());
Expand All @@ -96,7 +96,7 @@ private void disconnectLan() {
if (peerSize >= CommonParameter.getInstance().getMinConnections()) {
long now = System.currentTimeMillis();
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> now - peer.getLastActiveTime() >= inactiveThreshold)
.filter(peer -> now - peer.getLastInteractiveTime() >= inactiveThreshold)
.filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs())
.filter(peer -> !peer.getChannel().isTrustPeer())
.collect(Collectors.toList());
Expand All @@ -111,10 +111,12 @@ private void disconnectIsolated2() {
return;
}
logger.warn("Node is isolated, try to disconnect from peers");
int peerSize = tronNetDelegate.getActivePeer().size();

//disconnect from the node whose lastActiveTime is smallest
if (peerSize >= CommonParameter.getInstance().getMinActiveConnections()) {
int activePeerSize = (int) tronNetDelegate.getActivePeer().stream()
.filter(peer -> peer.getChannel().isActive())
.count();
if (activePeerSize >= CommonParameter.getInstance().getMinActiveConnections()) {
List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> !peer.getChannel().isTrustPeer())
.filter(peer -> peer.getChannel().isActive())
Expand All @@ -127,7 +129,7 @@ private void disconnectIsolated2() {

//disconnect from some passive nodes, make sure retention nodes' num <= 0.8 * maxConnection,
//so new peers can come in
peerSize = tronNetDelegate.getActivePeer().size();
int peerSize = tronNetDelegate.getActivePeer().size();
int threshold = (int) (CommonParameter.getInstance().getMaxConnections() * retentionPercent);
if (peerSize > threshold) {
int disconnectSize = peerSize - threshold;
Expand All @@ -136,7 +138,7 @@ private void disconnectIsolated2() {
.filter(peer -> !peer.getChannel().isActive())
.collect(Collectors.toList());
try {
peers.sort(Comparator.comparing(PeerConnection::getLastActiveTime, Long::compareTo));
peers.sort(Comparator.comparing(PeerConnection::getLastInteractiveTime, Long::compareTo));
} catch (Exception e) {
logger.warn("Sort disconnectIsolated2 peers failed: {}", e.getMessage());
return;
Expand All @@ -156,7 +158,7 @@ private Optional<PeerConnection> getEarliestPeer(List<PeerConnection> pees) {
Optional<PeerConnection> one = Optional.empty();
try {
one = pees.stream()
.min(Comparator.comparing(PeerConnection::getLastActiveTime, Long::compareTo));
.min(Comparator.comparing(PeerConnection::getLastInteractiveTime, Long::compareTo));
} catch (Exception e) {
logger.warn("Get earliest peer failed: {}", e.getMessage());
}
Expand All @@ -182,7 +184,8 @@ private boolean isIsolateLand2() {

private void disconnectFromPeer(PeerConnection peer, ReasonCode reasonCode,
DisconnectCause cause) {
int inactiveSeconds = (int) ((System.currentTimeMillis() - peer.getLastActiveTime()) / 1000);
int inactiveSeconds = (int) ((System.currentTimeMillis() - peer.getLastInteractiveTime())
/ 1000);
logger.info("Disconnect from peer {}, inactive seconds {}, cause: {}",
peer.getInetSocketAddress(), inactiveSeconds, cause);
peer.disconnect(reasonCode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,20 +113,20 @@ public void testProcessInventoryMessage() throws Exception {
}

@Test
public void testUpdateLastActiveTime() throws Exception {
public void testUpdateLastInteractiveTime() throws Exception {
String[] a = new String[0];
Args.setParam(a, Constant.TESTNET_CONF);

PeerConnection peer = new PeerConnection();
P2pEventHandlerImpl p2pEventHandler = new P2pEventHandlerImpl();

Method method = p2pEventHandler.getClass()
.getDeclaredMethod("updateLastActiveTime", PeerConnection.class, TronMessage.class);
.getDeclaredMethod("updateLastInteractiveTime", PeerConnection.class, TronMessage.class);
method.setAccessible(true);

long t1 = System.currentTimeMillis();
FetchInvDataMessage message = new FetchInvDataMessage(new ArrayList<>(), InventoryType.BLOCK);
method.invoke(p2pEventHandler, peer, message);
Assert.assertTrue(peer.getLastActiveTime() >= t1);
Assert.assertTrue(peer.getLastInteractiveTime() >= t1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ public void testDisconnectRandom() {
Assert.assertEquals(maxConnection, PeerManager.getPeers().size());

PeerConnection p1 = PeerManager.getPeers().get(1);
p1.setLastActiveTime(
p1.setLastInteractiveTime(
System.currentTimeMillis() - Args.getInstance().inactiveThreshold * 1000L - 1000);
PeerConnection p2 = PeerManager.getPeers().get(10);
p2.setLastActiveTime(
p2.setLastInteractiveTime(
System.currentTimeMillis() - Args.getInstance().inactiveThreshold * 1000L - 2000);

ReflectUtils.invokeMethod(service, "disconnectRandom");
Expand Down Expand Up @@ -108,11 +108,11 @@ public void testDisconnectLan() {

PeerConnection p1 = PeerManager.getPeers().get(1);
InetSocketAddress address1 = p1.getChannel().getInetSocketAddress();
p1.setLastActiveTime(
p1.setLastInteractiveTime(
System.currentTimeMillis() - Args.getInstance().inactiveThreshold * 1000L - 1000);
PeerConnection p2 = PeerManager.getPeers().get(2);
InetSocketAddress address2 = p2.getChannel().getInetSocketAddress();
p2.setLastActiveTime(
p2.setLastInteractiveTime(
System.currentTimeMillis() - Args.getInstance().inactiveThreshold * 1000L - 2000);

ReflectUtils.invokeMethod(service, "disconnectLan");
Expand Down

0 comments on commit ea7ef8e

Please sign in to comment.