Skip to content

Commit

Permalink
draft stress test for multiple parties
Browse files Browse the repository at this point in the history
  • Loading branch information
ffakenz committed May 15, 2024
1 parent ef4a1ce commit 34ee3ff
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 71 deletions.
151 changes: 80 additions & 71 deletions hydra-node/test/Hydra/Network/ReliabilitySpec.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{-# OPTIONS_GHC -Wno-incomplete-uni-patterns #-}

module Hydra.Network.ReliabilitySpec where

import Hydra.Prelude hiding (empty, fromList, head, replicate, unlines)
Expand All @@ -11,18 +13,22 @@ import Control.Concurrent.Class.MonadSTM (
newTVarIO,
writeTVar,
)
import Control.Monad.IOSim (IOSim, runSimOrThrow)
import Control.Monad.Class.MonadAsync (mapConcurrently_)
import Control.Monad.IOSim (runSimOrThrow)
import Control.Tracer (Tracer (..), nullTracer)
import Data.Map qualified as Map
import Data.Maybe qualified as Maybe
import Data.Sequence.Strict ((|>))
import Data.Vector (Vector, empty, fromList, head, replicate, snoc)
import Data.Vector qualified as Vector
import Hydra.Network (Network (..), NodeId)
import Hydra.Network (Network (..), NodeId (..))
import Hydra.Network.Authenticate (Authenticated (..))
import Hydra.Network.Heartbeat (Heartbeat (..), withHeartbeat)
import Hydra.Network.Message (Connectivity)
import Hydra.Network.Ouroboros (NetworkComponent)
import Hydra.Network.Reliability (MessagePersistence (..), ReliabilityLog (..), ReliableMsg (..), withReliability)
import Hydra.Node.Network (withFlipHeartbeats)
import Hydra.Party (Party)
import Hydra.Persistence (
Persistence (..),
PersistenceIncremental (..),
Expand All @@ -32,7 +38,7 @@ import Hydra.Persistence (
import System.Directory (doesFileExist)
import System.FilePath ((</>))
import System.Random (StdGen, mkStdGen, uniformR)
import Test.Hydra.Fixture (alice, bob, carol)
import Test.Hydra.Fixture (Actor (..), actorName, actorParty, alice, bob, carol)
import Test.QuickCheck (
Positive (Positive),
collect,
Expand All @@ -43,7 +49,6 @@ import Test.QuickCheck (
(===),
)
import Prelude (unlines)
import Hydra.Party (Party)

spec :: Spec
spec = parallel $ do
Expand Down Expand Up @@ -112,61 +117,59 @@ spec = parallel $ do
-- this test is quite critical as it demonstrates messages dropped are properly managed and resent to the
-- other party whatever the length of queue, and whatever the interleaving of threads
modifyMaxSuccess (const 5000) $
prop "stress test networking layer" $ \(aliceToBobMessages :: [Int]) (bobToAliceMessages :: [Int]) seed ->
prop "stress test networking layer" $ \(aliceToBobMessages :: [Int]) (aliceToCarolMessages :: [Int]) (bobToAliceMessages :: [Int]) (bobToCarolMessages :: [Int]) (carolToAliceMessages :: [Int]) (carolToBobMessages :: [Int]) seed -> do
let actorAToBMessagesMappings :: Map (Actor, Actor) [Int] =
Map.fromList
[ ((Alice, Bob), aliceToBobMessages)
, ((Alice, Carol), aliceToCarolMessages)
, ((Bob, Alice), bobToAliceMessages)
, ((Bob, Carol), bobToCarolMessages)
, ((Carol, Alice), carolToAliceMessages)
, ((Carol, Bob), carolToBobMessages)
]
let
(msgReceivedByAlice, msgReceivedByBob, traces) = runSimOrThrow $ do
messagesReceivedByAlice <- newTVarIO empty
messagesReceivedByBob <- newTVarIO empty
messagesReceivedByCarol <- newTVarIO empty
([msgReceivedByAlice, msgReceivedByBob, msgReceivedByCarol], traces) = runSimOrThrow $ do
emittedTraces <- newTVarIO []
randomSeed <- newTVarIO $ mkStdGen seed
aliceToBob <- newTQueueIO
aliceToCarol <- newTQueueIO
bobToAlice <- newTQueueIO
bobToCarol <- newTQueueIO
carolToAlice <- newTQueueIO
carolToBob <- newTQueueIO
alicePersistence <- failingMessagePersistence randomSeed 2
bobPersistence <- failingMessagePersistence randomSeed 2
carolPersistence <- failingMessagePersistence randomSeed 2
let
-- this is a NetworkComponent that broadcasts authenticated messages
-- mediated through a read and a write TQueue but drops 0.2 % of them
aliceToBobFailingNetwork = failingNetwork randomSeed alice bobToAlice aliceToBob
aliceToCarolFailingNetwork = failingNetwork randomSeed alice carolToAlice aliceToCarol
bobToAliceFailingNetwork = failingNetwork randomSeed bob aliceToBob bobToAlice
bobToCarolFailingNetwork = failingNetwork randomSeed bob bobToAlice bobToCarol
carolToAliceFailingNetwork = failingNetwork randomSeed carol aliceToCarol carolToAlice
carolToBobFailingNetwork = failingNetwork randomSeed carol bobToCarol carolToBob

aliceToBobReliabilityStack = reliabilityStack alicePersistence aliceToBobFailingNetwork (captureTraces emittedTraces) "alice" alice [bob]
aliceToCarolReliabilityStack = reliabilityStack alicePersistence aliceToCarolFailingNetwork (captureTraces emittedTraces) "alice" alice [carol]
bobToAliceReliabilityStack = reliabilityStack bobPersistence bobToAliceFailingNetwork (captureTraces emittedTraces) "bob" bob [alice]
bobToCarolReliabilityStack = reliabilityStack bobPersistence bobToCarolFailingNetwork (captureTraces emittedTraces) "bob" bob [carol]
carolToAliceReliabilityStack = reliabilityStack bobPersistence carolToAliceFailingNetwork (captureTraces emittedTraces) "carol" carol [alice]
carolToBobReliabilityStack = reliabilityStack bobPersistence carolToBobFailingNetwork (captureTraces emittedTraces) "carol" carol [bob]

runAliceToBob = runPeer aliceToBobReliabilityStack "alice" messagesReceivedByAlice messagesReceivedByBob aliceToBobMessages bobToAliceMessages
runAliceToCarol = runPeer aliceToBobReliabilityStack "alice" messagesReceivedByAlice messagesReceivedByCarol aliceToCarolMessages carolToAliceMessages
runBobToAlice = runPeer bobToAliceReliabilityStack "bob" messagesReceivedByBob messagesReceivedByAlice bobToAliceMessages aliceToBobMessages
runBobToCarol = runPeer bobToAliceReliabilityStack "bob" messagesReceivedByBob messagesReceivedByAlice bobToAliceMessages aliceToBobMessages
runCarolToAlice = runPeer carolToAliceReliabilityStack "carol" messagesReceivedByBob messagesReceivedByAlice bobToAliceMessages aliceToBobMessages
runCarolToBob = runPeer carolToBobReliabilityStack "carol" messagesReceivedByCarol messagesReceivedByBob bobToAliceMessages aliceToBobMessages

concurrently_ runAliceToBob runAliceToCarol runBobToAlice runBobToCarol runCarolToAlice runCarolToBob

let actors = [Alice, Bob, Carol]
let connectedActors :: [(Actor, Actor)] = [(a, b) | a <- actors, b <- actors, a /= b]
inboundMappings <- Map.fromList <$> mapM (\as -> newTQueueIO >>= (\tq -> pure (as, tq))) connectedActors
outboundMappings <- Map.fromList <$> mapM (\as -> newTQueueIO >>= (\tq -> pure (as, tq))) connectedActors
persistenceMappings <- Map.fromList <$> mapM (\a -> mockMessagePersistence 2 >>= (\p -> pure (a, p))) actors
messagesReceivedByMappings <- Map.fromList <$> mapM (\a -> newTVarIO empty >>= (\tv -> pure (a, tv))) actors
let createFailingNodePeer key@(actorA :: Actor, actorB :: Actor) =
let actorDriver :: Party = actorParty actorA
actorInbound = Maybe.fromJust $ Map.lookup key inboundMappings
actorOutbound = Maybe.fromJust $ Map.lookup key outboundMappings
actorPersistence = Maybe.fromJust $ Map.lookup actorA persistenceMappings
messagesReceivedByActorA = Maybe.fromJust $ Map.lookup actorA messagesReceivedByMappings
actorAToBMessages = Maybe.fromJust $ Map.lookup key actorAToBMessagesMappings
messagesReceivedByActorB = Maybe.fromJust $ Map.lookup actorB messagesReceivedByMappings
actorBToAMessages = Maybe.fromJust $ Map.lookup key actorAToBMessagesMappings
peerFailingNetwork = failingNetwork randomSeed actorDriver actorInbound actorOutbound
peerReliabilityStack = reliabilityStack actorPersistence peerFailingNetwork (captureTraces emittedTraces) (NodeId $ actorName actorA) actorDriver [actorParty actorB]
in runPeer peerReliabilityStack (NodeId $ actorName actorA) messagesReceivedByActorA messagesReceivedByActorB actorAToBMessages actorBToAMessages

mapConcurrently_ createFailingNodePeer []

logs <- readTVarIO emittedTraces
aliceReceived <- Vector.toList <$> readTVarIO messagesReceivedByAlice
bobReceived <- Vector.toList <$> readTVarIO messagesReceivedByBob
carolReceived <- Vector.toList <$> readTVarIO messagesReceivedByCarol
pure (aliceReceived, bobReceived, logs)

let peerReceived (actor :: Actor) = do
let messagesReceivedByActor = Maybe.fromJust $ Map.lookup actor messagesReceivedByMappings
Vector.toList <$> readTVarIO messagesReceivedByActor

peersReceivd <- mapM peerReceived actors
pure (peersReceivd, logs)
in
within 1000000 $
msgReceivedByBob
=== aliceToBobMessages
( msgReceivedByBob
=== aliceToBobMessages ++ carolToBobMessages
)
& counterexample (unlines $ show <$> reverse traces)
& tabulate "Messages from Alice to Bob" ["< " <> show ((length msgReceivedByBob `div` 10 + 1) * 10)]
& tabulate "Messages from Bob to Alice" ["< " <> show ((length msgReceivedByAlice `div` 10 + 1) * 10)]
& tabulate "Messages from Alice to Bob" ["< " <> show ((length msgReceivedByBob `div` 10 + 1) * 10)]
& tabulate "Messages from Bob to Carol" ["< " <> show ((length msgReceivedByCarol `div` 10 + 1) * 10)]

it "broadcast updates counter from peers" $ do
let receivedMsgs = runSimOrThrow $ do
Expand Down Expand Up @@ -245,18 +248,23 @@ spec = parallel $ do
(waitForAllMessages expectedMessages receivedMessageContainer)
(waitForAllMessages messagesToSend sentMessageContainer)

reliabilityStack :: (MonadThrow m, MonadThrow (STM m), MonadAsync m, MonadDelay m) => MessagePersistence m outbound
-> NetworkComponent
m
(Authenticated (ReliableMsg (Heartbeat inbound)))
(ReliableMsg (Heartbeat outbound))
a
-> Tracer m ReliabilityLog
-> NodeId
-> Party
-> [Party]
-> NetworkComponent
m (Either Connectivity (Authenticated inbound)) outbound a
reliabilityStack ::
(MonadThrow m, MonadThrow (STM m), MonadAsync m, MonadDelay m) =>
MessagePersistence m outbound ->
NetworkComponent
m
(Authenticated (ReliableMsg (Heartbeat inbound)))
(ReliableMsg (Heartbeat outbound))
a ->
Tracer m ReliabilityLog ->
NodeId ->
Party ->
[Party] ->
NetworkComponent
m
(Either Connectivity (Authenticated inbound))
outbound
a
reliabilityStack persistence underlyingNetwork tracer nodeId party peers =
withHeartbeat nodeId $
withFlipHeartbeats $
Expand All @@ -266,12 +274,13 @@ reloadAll fileName =
createPersistenceIncremental fileName
>>= \PersistenceIncremental{loadAll} -> loadAll


failingNetwork :: MonadAsync m => TVar m StdGen
-> Party
-> TQueue m inbound
-> TQueue m (Authenticated outbound)
-> NetworkComponent m inbound outbound a
failingNetwork ::
MonadAsync m =>
TVar m StdGen ->
Party ->
TQueue m inbound ->
TQueue m (Authenticated outbound) ->
NetworkComponent m inbound outbound a
failingNetwork seed peer readQueue writeQueue callback action =
withAsync
( forever $ do
Expand All @@ -281,12 +290,12 @@ failingNetwork seed peer readQueue writeQueue callback action =
$ \_ ->
action $
Network
{ broadcast = \m -> dropPercent 0.02 seed $
atomically $
{ broadcast = \m ->
dropPercent 0.02 seed $
atomically $
writeTQueue writeQueue (Authenticated m peer)
}


dropPercent :: MonadSTM m => Double -> TVar m StdGen -> m () -> m ()
dropPercent x seed f = do
r <- randomNumber seed
Expand Down
18 changes: 18 additions & 0 deletions hydra-node/test/Test/Hydra/Fixture.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,24 @@ import Hydra.Ledger.Cardano (genVerificationKey)
import Hydra.OnChainId (AsType (AsOnChainId), OnChainId)
import Hydra.Party (Party (..), deriveParty)

data Actor
= Alice
| Bob
| Carol
deriving stock (Eq, Show, Ord)

actorName :: Actor -> Text
actorName = \case
Alice -> "alice"
Bob -> "bob"
Carol -> "carol"

actorParty :: Actor -> Party
actorParty = \case
Alice -> deriveParty aliceSk
Bob -> deriveParty bobSk
Carol -> deriveParty carolSk

-- | Our beloved alice, bob, and carol.
alice, bob, carol :: Party
alice = deriveParty aliceSk
Expand Down

0 comments on commit 34ee3ff

Please sign in to comment.