From b6c119c5da1b1d605cf9d3dfbba2fcb21d7f2381 Mon Sep 17 00:00:00 2001 From: Franco Testagrossa Date: Wed, 15 May 2024 12:30:49 +0200 Subject: [PATCH] draft stress test for multiple parties --- .../test/Hydra/Network/ReliabilitySpec.hs | 151 ++++++++++-------- hydra-node/test/Test/Hydra/Fixture.hs | 18 +++ 2 files changed, 98 insertions(+), 71 deletions(-) diff --git a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs index 567c91d85c8..bdf1087cc38 100644 --- a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs +++ b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs @@ -1,3 +1,5 @@ +{-# OPTIONS_GHC -Wno-incomplete-uni-patterns #-} + module Hydra.Network.ReliabilitySpec where import Hydra.Prelude hiding (empty, fromList, head, replicate, unlines) @@ -11,18 +13,22 @@ import Control.Concurrent.Class.MonadSTM ( newTVarIO, writeTVar, ) -import Control.Monad.IOSim (IOSim, runSimOrThrow) +import Control.Monad.Class.MonadAsync (mapConcurrently_) +import Control.Monad.IOSim (runSimOrThrow) import Control.Tracer (Tracer (..), nullTracer) +import Data.Map qualified as Map +import Data.Maybe qualified as Maybe import Data.Sequence.Strict ((|>)) import Data.Vector (Vector, empty, fromList, head, replicate, snoc) import Data.Vector qualified as Vector -import Hydra.Network (Network (..), NodeId) +import Hydra.Network (Network (..), NodeId (..)) import Hydra.Network.Authenticate (Authenticated (..)) import Hydra.Network.Heartbeat (Heartbeat (..), withHeartbeat) import Hydra.Network.Message (Connectivity) import Hydra.Network.Ouroboros (NetworkComponent) import Hydra.Network.Reliability (MessagePersistence (..), ReliabilityLog (..), ReliableMsg (..), withReliability) import Hydra.Node.Network (withFlipHeartbeats) +import Hydra.Party (Party) import Hydra.Persistence ( Persistence (..), PersistenceIncremental (..), @@ -32,7 +38,7 @@ import Hydra.Persistence ( import System.Directory (doesFileExist) import System.FilePath (()) import System.Random (StdGen, mkStdGen, uniformR) -import Test.Hydra.Fixture (alice, bob, carol) +import Test.Hydra.Fixture (Actor (..), actorName, actorParty, alice, bob, carol) import Test.QuickCheck ( Positive (Positive), collect, @@ -43,7 +49,6 @@ import Test.QuickCheck ( (===), ) import Prelude (unlines) -import Hydra.Party (Party) spec :: Spec spec = parallel $ do @@ -112,61 +117,59 @@ spec = parallel $ do -- this test is quite critical as it demonstrates messages dropped are properly managed and resent to the -- other party whatever the length of queue, and whatever the interleaving of threads modifyMaxSuccess (const 5000) $ - prop "stress test networking layer" $ \(aliceToBobMessages :: [Int]) (bobToAliceMessages :: [Int]) seed -> + prop "stress test networking layer" $ \(aliceToBobMessages :: [Int]) (aliceToCarolMessages :: [Int]) (bobToAliceMessages :: [Int]) (bobToCarolMessages :: [Int]) (carolToAliceMessages :: [Int]) (carolToBobMessages :: [Int]) seed -> do + let actorAToBMessagesMappings :: Map (Actor, Actor) [Int] = + Map.fromList + [ ((Alice, Bob), aliceToBobMessages) + , ((Alice, Carol), aliceToCarolMessages) + , ((Bob, Alice), bobToAliceMessages) + , ((Bob, Carol), bobToCarolMessages) + , ((Carol, Alice), carolToAliceMessages) + , ((Carol, Bob), carolToBobMessages) + ] let - (msgReceivedByAlice, msgReceivedByBob, traces) = runSimOrThrow $ do - messagesReceivedByAlice <- newTVarIO empty - messagesReceivedByBob <- newTVarIO empty - messagesReceivedByCarol <- newTVarIO empty + ([msgReceivedByAlice, msgReceivedByBob, msgReceivedByCarol], traces) = runSimOrThrow $ do emittedTraces <- newTVarIO [] randomSeed <- newTVarIO $ mkStdGen seed - aliceToBob <- newTQueueIO - aliceToCarol <- newTQueueIO - bobToAlice <- newTQueueIO - bobToCarol <- newTQueueIO - carolToAlice <- newTQueueIO - carolToBob <- newTQueueIO - alicePersistence <- failingMessagePersistence randomSeed 2 - bobPersistence <- failingMessagePersistence randomSeed 2 - carolPersistence <- failingMessagePersistence randomSeed 2 - let - -- this is a NetworkComponent that broadcasts authenticated messages - -- mediated through a read and a write TQueue but drops 0.2 % of them - aliceToBobFailingNetwork = failingNetwork randomSeed alice bobToAlice aliceToBob - aliceToCarolFailingNetwork = failingNetwork randomSeed alice carolToAlice aliceToCarol - bobToAliceFailingNetwork = failingNetwork randomSeed bob aliceToBob bobToAlice - bobToCarolFailingNetwork = failingNetwork randomSeed bob bobToAlice bobToCarol - carolToAliceFailingNetwork = failingNetwork randomSeed carol aliceToCarol carolToAlice - carolToBobFailingNetwork = failingNetwork randomSeed carol bobToCarol carolToBob - - aliceToBobReliabilityStack = reliabilityStack alicePersistence aliceToBobFailingNetwork (captureTraces emittedTraces) "alice" alice [bob] - aliceToCarolReliabilityStack = reliabilityStack alicePersistence aliceToCarolFailingNetwork (captureTraces emittedTraces) "alice" alice [carol] - bobToAliceReliabilityStack = reliabilityStack bobPersistence bobToAliceFailingNetwork (captureTraces emittedTraces) "bob" bob [alice] - bobToCarolReliabilityStack = reliabilityStack bobPersistence bobToCarolFailingNetwork (captureTraces emittedTraces) "bob" bob [carol] - carolToAliceReliabilityStack = reliabilityStack bobPersistence carolToAliceFailingNetwork (captureTraces emittedTraces) "carol" carol [alice] - carolToBobReliabilityStack = reliabilityStack bobPersistence carolToBobFailingNetwork (captureTraces emittedTraces) "carol" carol [bob] - - runAliceToBob = runPeer aliceToBobReliabilityStack "alice" messagesReceivedByAlice messagesReceivedByBob aliceToBobMessages bobToAliceMessages - runAliceToCarol = runPeer aliceToBobReliabilityStack "alice" messagesReceivedByAlice messagesReceivedByCarol aliceToCarolMessages carolToAliceMessages - runBobToAlice = runPeer bobToAliceReliabilityStack "bob" messagesReceivedByBob messagesReceivedByAlice bobToAliceMessages aliceToBobMessages - runBobToCarol = runPeer bobToAliceReliabilityStack "bob" messagesReceivedByBob messagesReceivedByAlice bobToAliceMessages aliceToBobMessages - runCarolToAlice = runPeer carolToAliceReliabilityStack "carol" messagesReceivedByBob messagesReceivedByAlice bobToAliceMessages aliceToBobMessages - runCarolToBob = runPeer carolToBobReliabilityStack "carol" messagesReceivedByCarol messagesReceivedByBob bobToAliceMessages aliceToBobMessages - - concurrently_ runAliceToBob runAliceToCarol runBobToAlice runBobToCarol runCarolToAlice runCarolToBob + + let actors = [Alice, Bob, Carol] + let connectedActors :: [(Actor, Actor)] = [(a, b) | a <- actors, b <- actors, a /= b] + inboundMappings <- Map.fromList <$> mapM (\as -> newTQueueIO >>= (\i -> pure (as, i))) connectedActors + outboundMappings <- Map.fromList <$> mapM (\as -> newTQueueIO >>= (\i -> pure (as, i))) connectedActors + persistenceMappings <- Map.fromList <$> mapM (\a -> mockMessagePersistence 2 <&> (pure . (a,))) actors + messagesReceivedByMappings <- Map.fromList <$> mapM (\a -> newTVarIO empty <&> (pure . (a,))) actors + let createFailingNodePeer key@(actorA :: Actor, actorB :: Actor) = + let actorDriver = actorParty actorA + actorInbound = snd . Maybe.fromJust $ Map.lookup key inboundMappings + actorOutbound = snd . Maybe.fromJust $ Map.lookup key outboundMappings + actorPersistence = snd . Maybe.fromJust $ Map.lookup key persistenceMappings + messagesReceivedByActorA = snd . Maybe.fromJust $ Map.lookup actorA messagesReceivedByMappings + actorAToBMessages = Maybe.fromJust $ Map.lookup key actorAToBMessagesMappings + messagesReceivedByActorB = snd . Maybe.fromJust $ Map.lookup actorB messagesReceivedByMappings + actorBToAMessages = Maybe.fromJust $ Map.lookup key actorAToBMessagesMappings + peerFailingNetwork = failingNetwork randomSeed actorDriver actorInbound actorOutbound + peerReliabilityStack = reliabilityStack actorPersistence peerFailingNetwork (captureTraces emittedTraces) (NodeId $ actorName actorA) actorDriver [actorParty actorB] + in runPeer peerReliabilityStack (NodeId $ actorName actorA) messagesReceivedByActorA messagesReceivedByActorB actorAToBMessages actorBToAMessages + + mapConcurrently_ createFailingNodePeer connectedActors logs <- readTVarIO emittedTraces - aliceReceived <- Vector.toList <$> readTVarIO messagesReceivedByAlice - bobReceived <- Vector.toList <$> readTVarIO messagesReceivedByBob - carolReceived <- Vector.toList <$> readTVarIO messagesReceivedByCarol - pure (aliceReceived, bobReceived, logs) + + let peerReceived (actor :: Actor) = do + let messagesReceivedByActor = snd . Maybe.fromJust $ Map.lookup actor messagesReceivedByMappings + Vector.toList <$> readTVarIO messagesReceivedByActor + + peersReceivd <- mapM peerReceived actors + pure (peersReceivd, logs) in within 1000000 $ - msgReceivedByBob - === aliceToBobMessages + ( msgReceivedByBob + === aliceToBobMessages ++ carolToBobMessages + ) & counterexample (unlines $ show <$> reverse traces) - & tabulate "Messages from Alice to Bob" ["< " <> show ((length msgReceivedByBob `div` 10 + 1) * 10)] & tabulate "Messages from Bob to Alice" ["< " <> show ((length msgReceivedByAlice `div` 10 + 1) * 10)] + & tabulate "Messages from Alice to Bob" ["< " <> show ((length msgReceivedByBob `div` 10 + 1) * 10)] + & tabulate "Messages from Bob to Carol" ["< " <> show ((length msgReceivedByCarol `div` 10 + 1) * 10)] it "broadcast updates counter from peers" $ do let receivedMsgs = runSimOrThrow $ do @@ -245,18 +248,23 @@ spec = parallel $ do (waitForAllMessages expectedMessages receivedMessageContainer) (waitForAllMessages messagesToSend sentMessageContainer) -reliabilityStack :: (MonadThrow m, MonadThrow (STM m), MonadAsync m, MonadDelay m) => MessagePersistence m outbound - -> NetworkComponent - m - (Authenticated (ReliableMsg (Heartbeat inbound))) - (ReliableMsg (Heartbeat outbound)) - a - -> Tracer m ReliabilityLog - -> NodeId - -> Party - -> [Party] - -> NetworkComponent - m (Either Connectivity (Authenticated inbound)) outbound a +reliabilityStack :: + (MonadThrow m, MonadThrow (STM m), MonadAsync m, MonadDelay m) => + MessagePersistence m outbound -> + NetworkComponent + m + (Authenticated (ReliableMsg (Heartbeat inbound))) + (ReliableMsg (Heartbeat outbound)) + a -> + Tracer m ReliabilityLog -> + NodeId -> + Party -> + [Party] -> + NetworkComponent + m + (Either Connectivity (Authenticated inbound)) + outbound + a reliabilityStack persistence underlyingNetwork tracer nodeId party peers = withHeartbeat nodeId $ withFlipHeartbeats $ @@ -266,12 +274,13 @@ reloadAll fileName = createPersistenceIncremental fileName >>= \PersistenceIncremental{loadAll} -> loadAll - -failingNetwork :: MonadAsync m => TVar m StdGen - -> Party - -> TQueue m inbound - -> TQueue m (Authenticated outbound) - -> NetworkComponent m inbound outbound a +failingNetwork :: + MonadAsync m => + TVar m StdGen -> + Party -> + TQueue m inbound -> + TQueue m (Authenticated outbound) -> + NetworkComponent m inbound outbound a failingNetwork seed peer readQueue writeQueue callback action = withAsync ( forever $ do @@ -281,12 +290,12 @@ failingNetwork seed peer readQueue writeQueue callback action = $ \_ -> action $ Network - { broadcast = \m -> dropPercent 0.02 seed $ - atomically $ + { broadcast = \m -> + dropPercent 0.02 seed $ + atomically $ writeTQueue writeQueue (Authenticated m peer) } - dropPercent :: MonadSTM m => Double -> TVar m StdGen -> m () -> m () dropPercent x seed f = do r <- randomNumber seed diff --git a/hydra-node/test/Test/Hydra/Fixture.hs b/hydra-node/test/Test/Hydra/Fixture.hs index 575cf65ae1d..2d5b4c88408 100644 --- a/hydra-node/test/Test/Hydra/Fixture.hs +++ b/hydra-node/test/Test/Hydra/Fixture.hs @@ -15,6 +15,24 @@ import Hydra.Ledger.Cardano (genVerificationKey) import Hydra.OnChainId (AsType (AsOnChainId), OnChainId) import Hydra.Party (Party (..), deriveParty) +data Actor + = Alice + | Bob + | Carol + deriving stock (Eq, Show, Ord) + +actorName :: Actor -> Text +actorName = \case + Alice -> "alice" + Bob -> "bob" + Carol -> "carol" + +actorParty :: Actor -> Party +actorParty = \case + Alice -> deriveParty aliceSk + Bob -> deriveParty bobSk + Carol -> deriveParty carolSk + -- | Our beloved alice, bob, and carol. alice, bob, carol :: Party alice = deriveParty aliceSk