diff --git a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs index 567c91d85c8..711e5b1a404 100644 --- a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs +++ b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs @@ -1,3 +1,5 @@ +{-# OPTIONS_GHC -Wno-incomplete-uni-patterns #-} + module Hydra.Network.ReliabilitySpec where import Hydra.Prelude hiding (empty, fromList, head, replicate, unlines) @@ -11,18 +13,22 @@ import Control.Concurrent.Class.MonadSTM ( newTVarIO, writeTVar, ) -import Control.Monad.IOSim (IOSim, runSimOrThrow) +import Control.Monad.Class.MonadAsync (mapConcurrently_) +import Control.Monad.IOSim (runSimOrThrow) import Control.Tracer (Tracer (..), nullTracer) +import Data.Map qualified as Map +import Data.Maybe qualified as Maybe import Data.Sequence.Strict ((|>)) import Data.Vector (Vector, empty, fromList, head, replicate, snoc) import Data.Vector qualified as Vector -import Hydra.Network (Network (..), NodeId) +import Hydra.Network (Network (..), NodeId (..)) import Hydra.Network.Authenticate (Authenticated (..)) import Hydra.Network.Heartbeat (Heartbeat (..), withHeartbeat) import Hydra.Network.Message (Connectivity) import Hydra.Network.Ouroboros (NetworkComponent) import Hydra.Network.Reliability (MessagePersistence (..), ReliabilityLog (..), ReliableMsg (..), withReliability) import Hydra.Node.Network (withFlipHeartbeats) +import Hydra.Party (Party) import Hydra.Persistence ( Persistence (..), PersistenceIncremental (..), @@ -32,7 +38,7 @@ import Hydra.Persistence ( import System.Directory (doesFileExist) import System.FilePath (()) import System.Random (StdGen, mkStdGen, uniformR) -import Test.Hydra.Fixture (alice, bob, carol) +import Test.Hydra.Fixture (Actor (..), actorName, actorParty, alice, bob, carol) import Test.QuickCheck ( Positive (Positive), collect, @@ -40,10 +46,10 @@ import Test.QuickCheck ( generate, tabulate, within, + (.&&.), (===), ) import Prelude (unlines) -import Hydra.Party (Party) spec :: Spec spec = parallel $ do @@ -112,61 +118,63 @@ spec = parallel $ do -- this test is quite critical as it demonstrates messages dropped are properly managed and resent to the -- other party whatever the length of queue, and whatever the interleaving of threads modifyMaxSuccess (const 5000) $ - prop "stress test networking layer" $ \(aliceToBobMessages :: [Int]) (bobToAliceMessages :: [Int]) seed -> + prop "stress test networking layer" $ \(aliceToMessages :: [Int]) (bobToMessages :: [Int]) (carolToMessages :: [Int]) seed -> do + let actorAToBMessagesMappings :: Map Actor [Int] = + Map.fromList + [ (Alice, aliceToMessages) + , (Bob, bobToMessages) + , (Carol, carolToMessages) + ] let - (msgReceivedByAlice, msgReceivedByBob, traces) = runSimOrThrow $ do - messagesReceivedByAlice <- newTVarIO empty - messagesReceivedByBob <- newTVarIO empty - messagesReceivedByCarol <- newTVarIO empty + ([msgReceivedByAlice, msgReceivedByBob, msgReceivedByCarol], traces) = runSimOrThrow $ do emittedTraces <- newTVarIO [] randomSeed <- newTVarIO $ mkStdGen seed - aliceToBob <- newTQueueIO - aliceToCarol <- newTQueueIO - bobToAlice <- newTQueueIO - bobToCarol <- newTQueueIO - carolToAlice <- newTQueueIO - carolToBob <- newTQueueIO - alicePersistence <- failingMessagePersistence randomSeed 2 - bobPersistence <- failingMessagePersistence randomSeed 2 - carolPersistence <- failingMessagePersistence randomSeed 2 - let - -- this is a NetworkComponent that broadcasts authenticated messages - -- mediated through a read and a write TQueue but drops 0.2 % of them - aliceToBobFailingNetwork = failingNetwork randomSeed alice bobToAlice aliceToBob - aliceToCarolFailingNetwork = failingNetwork randomSeed alice carolToAlice aliceToCarol - bobToAliceFailingNetwork = failingNetwork randomSeed bob aliceToBob bobToAlice - bobToCarolFailingNetwork = failingNetwork randomSeed bob bobToAlice bobToCarol - carolToAliceFailingNetwork = failingNetwork randomSeed carol aliceToCarol carolToAlice - carolToBobFailingNetwork = failingNetwork randomSeed carol bobToCarol carolToBob - - aliceToBobReliabilityStack = reliabilityStack alicePersistence aliceToBobFailingNetwork (captureTraces emittedTraces) "alice" alice [bob] - aliceToCarolReliabilityStack = reliabilityStack alicePersistence aliceToCarolFailingNetwork (captureTraces emittedTraces) "alice" alice [carol] - bobToAliceReliabilityStack = reliabilityStack bobPersistence bobToAliceFailingNetwork (captureTraces emittedTraces) "bob" bob [alice] - bobToCarolReliabilityStack = reliabilityStack bobPersistence bobToCarolFailingNetwork (captureTraces emittedTraces) "bob" bob [carol] - carolToAliceReliabilityStack = reliabilityStack bobPersistence carolToAliceFailingNetwork (captureTraces emittedTraces) "carol" carol [alice] - carolToBobReliabilityStack = reliabilityStack bobPersistence carolToBobFailingNetwork (captureTraces emittedTraces) "carol" carol [bob] - - runAliceToBob = runPeer aliceToBobReliabilityStack "alice" messagesReceivedByAlice messagesReceivedByBob aliceToBobMessages bobToAliceMessages - runAliceToCarol = runPeer aliceToBobReliabilityStack "alice" messagesReceivedByAlice messagesReceivedByCarol aliceToCarolMessages carolToAliceMessages - runBobToAlice = runPeer bobToAliceReliabilityStack "bob" messagesReceivedByBob messagesReceivedByAlice bobToAliceMessages aliceToBobMessages - runBobToCarol = runPeer bobToAliceReliabilityStack "bob" messagesReceivedByBob messagesReceivedByAlice bobToAliceMessages aliceToBobMessages - runCarolToAlice = runPeer carolToAliceReliabilityStack "carol" messagesReceivedByBob messagesReceivedByAlice bobToAliceMessages aliceToBobMessages - runCarolToBob = runPeer carolToBobReliabilityStack "carol" messagesReceivedByCarol messagesReceivedByBob bobToAliceMessages aliceToBobMessages - - concurrently_ runAliceToBob runAliceToCarol runBobToAlice runBobToCarol runCarolToAlice runCarolToBob + + let actors = [Alice, Bob, Carol] + let connectedActors :: [(Actor, Actor)] = [(a, b) | a <- actors, b <- actors, a /= b] + inboundMappings <- Map.fromList <$> mapM (\as -> newTQueueIO >>= (\tq -> pure (as, tq))) connectedActors + outboundMappings <- Map.fromList <$> mapM (\as -> newTQueueIO >>= (\tq -> pure (as, tq))) connectedActors + persistenceMappings <- Map.fromList <$> mapM (\a -> mockMessagePersistence 2 >>= (\p -> pure (a, p))) actors + messagesReceivedByMappings <- Map.fromList <$> mapM (\a -> newTVarIO empty >>= (\tv -> pure (a, tv))) actors + let createFailingNodePeer key@(actorA :: Actor, actorB :: Actor) = + let actorDriver :: Party = actorParty actorA + actorInbound = Maybe.fromJust $ Map.lookup key inboundMappings + actorOutbound = Maybe.fromJust $ Map.lookup key outboundMappings + actorPersistence = Maybe.fromJust $ Map.lookup actorA persistenceMappings + messagesReceivedByActorA = Maybe.fromJust $ Map.lookup actorA messagesReceivedByMappings + actorAToBMessages = Maybe.fromJust $ Map.lookup actorA actorAToBMessagesMappings + messagesReceivedByActorB = Maybe.fromJust $ Map.lookup actorB messagesReceivedByMappings + actorBToAMessages = Maybe.fromJust $ Map.lookup actorB actorAToBMessagesMappings + peerFailingNetwork = failingNetwork randomSeed actorDriver actorInbound actorOutbound + peerReliabilityStack = reliabilityStack actorPersistence peerFailingNetwork (captureTraces emittedTraces) (NodeId $ actorName actorA) actorDriver [actorParty actorB] + in runPeer peerReliabilityStack (NodeId $ actorName actorA) messagesReceivedByActorA messagesReceivedByActorB actorAToBMessages actorBToAMessages + + mapConcurrently_ createFailingNodePeer [] logs <- readTVarIO emittedTraces - aliceReceived <- Vector.toList <$> readTVarIO messagesReceivedByAlice - bobReceived <- Vector.toList <$> readTVarIO messagesReceivedByBob - carolReceived <- Vector.toList <$> readTVarIO messagesReceivedByCarol - pure (aliceReceived, bobReceived, logs) + + let peerReceived (actor :: Actor) = do + let messagesReceivedByActor = Maybe.fromJust $ Map.lookup actor messagesReceivedByMappings + Vector.toList <$> readTVarIO messagesReceivedByActor + + peersReceivd <- mapM peerReceived actors + pure (peersReceivd, logs) in within 1000000 $ - msgReceivedByBob - === aliceToBobMessages + ( ( msgReceivedByAlice + === bobToMessages <> carolToMessages + ) + .&&. ( msgReceivedByBob + === aliceToMessages <> carolToMessages + ) + .&&. ( msgReceivedByCarol + === aliceToMessages <> bobToMessages + ) + ) & counterexample (unlines $ show <$> reverse traces) - & tabulate "Messages from Alice to Bob" ["< " <> show ((length msgReceivedByBob `div` 10 + 1) * 10)] & tabulate "Messages from Bob to Alice" ["< " <> show ((length msgReceivedByAlice `div` 10 + 1) * 10)] + & tabulate "Messages from Alice to Bob" ["< " <> show ((length msgReceivedByBob `div` 10 + 1) * 10)] + & tabulate "Messages from Bob to Carol" ["< " <> show ((length msgReceivedByCarol `div` 10 + 1) * 10)] it "broadcast updates counter from peers" $ do let receivedMsgs = runSimOrThrow $ do @@ -245,18 +253,23 @@ spec = parallel $ do (waitForAllMessages expectedMessages receivedMessageContainer) (waitForAllMessages messagesToSend sentMessageContainer) -reliabilityStack :: (MonadThrow m, MonadThrow (STM m), MonadAsync m, MonadDelay m) => MessagePersistence m outbound - -> NetworkComponent - m - (Authenticated (ReliableMsg (Heartbeat inbound))) - (ReliableMsg (Heartbeat outbound)) - a - -> Tracer m ReliabilityLog - -> NodeId - -> Party - -> [Party] - -> NetworkComponent - m (Either Connectivity (Authenticated inbound)) outbound a +reliabilityStack :: + (MonadThrow m, MonadThrow (STM m), MonadAsync m, MonadDelay m) => + MessagePersistence m outbound -> + NetworkComponent + m + (Authenticated (ReliableMsg (Heartbeat inbound))) + (ReliableMsg (Heartbeat outbound)) + a -> + Tracer m ReliabilityLog -> + NodeId -> + Party -> + [Party] -> + NetworkComponent + m + (Either Connectivity (Authenticated inbound)) + outbound + a reliabilityStack persistence underlyingNetwork tracer nodeId party peers = withHeartbeat nodeId $ withFlipHeartbeats $ @@ -266,12 +279,13 @@ reloadAll fileName = createPersistenceIncremental fileName >>= \PersistenceIncremental{loadAll} -> loadAll - -failingNetwork :: MonadAsync m => TVar m StdGen - -> Party - -> TQueue m inbound - -> TQueue m (Authenticated outbound) - -> NetworkComponent m inbound outbound a +failingNetwork :: + MonadAsync m => + TVar m StdGen -> + Party -> + TQueue m inbound -> + TQueue m (Authenticated outbound) -> + NetworkComponent m inbound outbound a failingNetwork seed peer readQueue writeQueue callback action = withAsync ( forever $ do @@ -281,12 +295,12 @@ failingNetwork seed peer readQueue writeQueue callback action = $ \_ -> action $ Network - { broadcast = \m -> dropPercent 0.02 seed $ - atomically $ + { broadcast = \m -> + dropPercent 0.02 seed $ + atomically $ writeTQueue writeQueue (Authenticated m peer) } - dropPercent :: MonadSTM m => Double -> TVar m StdGen -> m () -> m () dropPercent x seed f = do r <- randomNumber seed diff --git a/hydra-node/test/Test/Hydra/Fixture.hs b/hydra-node/test/Test/Hydra/Fixture.hs index 575cf65ae1d..2d5b4c88408 100644 --- a/hydra-node/test/Test/Hydra/Fixture.hs +++ b/hydra-node/test/Test/Hydra/Fixture.hs @@ -15,6 +15,24 @@ import Hydra.Ledger.Cardano (genVerificationKey) import Hydra.OnChainId (AsType (AsOnChainId), OnChainId) import Hydra.Party (Party (..), deriveParty) +data Actor + = Alice + | Bob + | Carol + deriving stock (Eq, Show, Ord) + +actorName :: Actor -> Text +actorName = \case + Alice -> "alice" + Bob -> "bob" + Carol -> "carol" + +actorParty :: Actor -> Party +actorParty = \case + Alice -> deriveParty aliceSk + Bob -> deriveParty bobSk + Carol -> deriveParty carolSk + -- | Our beloved alice, bob, and carol. alice, bob, carol :: Party alice = deriveParty aliceSk