Skip to content

Commit

Permalink
Merge pull request #2815 from limebell/cherrypick/2814
Browse files Browse the repository at this point in the history
Cherrypick 2814 to PBFT
  • Loading branch information
limebell authored Feb 15, 2023
2 parents ec7293c + 7cb864a commit 7a7faba
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 2 deletions.
96 changes: 96 additions & 0 deletions Libplanet.Net.Tests/SwarmTest.Broadcast.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Libplanet.Blockchain;
Expand All @@ -11,6 +12,7 @@
using Libplanet.Blocks;
using Libplanet.Crypto;
using Libplanet.Net.Messages;
using Libplanet.Net.Transports;
using Libplanet.Store;
using Libplanet.Store.Trie;
using Libplanet.Tests.Common.Action;
Expand Down Expand Up @@ -1006,5 +1008,99 @@ public async Task CanFillWithInvalidTransaction()
swarm2.Dispose();
}
}

[Fact(Timeout = Timeout)]
public async Task DoNotSpawnMultipleTaskForSinglePeer()
{
var key = new PrivateKey();
var apv = new AppProtocolVersionOptions();
Swarm<DumbAction> receiver = CreateSwarm(appProtocolVersionOptions: apv);
ITransport mockTransport = await NetMQTransport.Create(
new PrivateKey(),
apv,
new HostOptions(
IPAddress.Loopback.ToString(),
Array.Empty<IceServer>()));
int requestCount = 0;

async Task MessageHandler(Message message)
{
_logger.Debug("Received message: {Message}", message);
switch (message)
{
case PingMsg ping:
await mockTransport.ReplyMessageAsync(
new PongMsg { Identity = ping.Identity },
default);
break;

case GetBlockHashesMsg gbhm:
requestCount++;
break;
}
}

mockTransport.ProcessMessageHandler.Register(MessageHandler);

Block<DumbAction> block1 = ProposeNextBlock(
receiver.BlockChain.Genesis,
key,
new Transaction<DumbAction>[] { });
Block<DumbAction> block2 = ProposeNextBlock(
block1,
key,
new Transaction<DumbAction>[] { });

try
{
await StartAsync(receiver);
_ = mockTransport.StartAsync();
await mockTransport.WaitForRunningAsync();

// Send block header for block 1.
var blockHeaderMsg1 = new BlockHeaderMsg(
receiver.BlockChain.Genesis.Hash,
block1.Header);
await mockTransport.SendMessageAsync(
receiver.AsPeer,
blockHeaderMsg1,
TimeSpan.FromSeconds(5),
default);
await receiver.BlockHeaderReceived.WaitAsync();

// Wait until FillBlockAsync task has spawned block demand task.
await Task.Delay(1000);

// Re-send block header for block 1, make sure it does not spawn new task.
await mockTransport.SendMessageAsync(
receiver.AsPeer,
blockHeaderMsg1,
TimeSpan.FromSeconds(5),
default);
await receiver.BlockHeaderReceived.WaitAsync();
await Task.Delay(1000);

// Send block header for block 2, make sure it does not spawn new task.
var blockHeaderMsg2 = new BlockHeaderMsg(
receiver.BlockChain.Genesis.Hash,
block2.Header);
await mockTransport.SendMessageAsync(
receiver.AsPeer,
blockHeaderMsg2,
TimeSpan.FromSeconds(5),
default);
await receiver.BlockHeaderReceived.WaitAsync();
await Task.Delay(1000);

Assert.Equal(1, requestCount);
}
finally
{
await StopAsync(receiver);
await mockTransport.StopAsync(TimeSpan.FromMilliseconds(10));
receiver.Dispose();
mockTransport.Dispose();
}
}
}
}
20 changes: 18 additions & 2 deletions Libplanet.Net/Swarm.BlockCandidate.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#nullable disable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand All @@ -11,6 +12,8 @@ namespace Libplanet.Net
{
public partial class Swarm<T>
{
private readonly ConcurrentDictionary<BoundPeer, int> _processBlockDemandSessions;

private async Task ConsumeBlockCandidates(
CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -278,12 +281,18 @@ private async Task<bool> ProcessBlockDemandAsync(
BlockDemand demand,
CancellationToken cancellationToken)
{
BoundPeer peer = demand.Peer;

if (_processBlockDemandSessions.ContainsKey(peer))
{
// Another task has spawned for the peer.
return false;
}

var sessionRandom = new Random();

int sessionId = sessionRandom.Next();

BoundPeer peer = demand.Peer;

if (demand.Index <= BlockChain.Tip.Index)
{
return false;
Expand All @@ -302,6 +311,7 @@ private async Task<bool> ProcessBlockDemandAsync(

try
{
_processBlockDemandSessions.TryAdd(peer, sessionId);
var result = await BlockCandidateDownload(
peer: peer,
blockChain: BlockChain,
Expand Down Expand Up @@ -349,6 +359,12 @@ private async Task<bool> ProcessBlockDemandAsync(
_logger.Error(e, msg, sessionId, peer, e);
return false;
}
finally
{
// Maybe demand table can be cleaned up here, but it will be eventually
// cleaned up in FillBlocksAsync()
_processBlockDemandSessions.TryRemove(peer, out _);
}
}

private async Task<bool> BlockCandidateDownload(
Expand Down
1 change: 1 addition & 0 deletions Libplanet.Net/Swarm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public Swarm(
_appProtocolVersionOptions,
hostOptions,
Options.MessageTimestampBuffer).ConfigureAwait(false).GetAwaiter().GetResult();
_processBlockDemandSessions = new ConcurrentDictionary<BoundPeer, int>();
Transport.ProcessMessageHandler.Register(ProcessMessageHandlerAsync);
PeerDiscovery = new KademliaProtocol(RoutingTable, Transport, Address);

Expand Down

0 comments on commit 7a7faba

Please sign in to comment.