Skip to content

Commit

Permalink
Add failingMessagePersistance function
Browse files Browse the repository at this point in the history
  • Loading branch information
locallycompact committed May 15, 2024
1 parent 476b8c8 commit c278876
Showing 1 changed file with 74 additions and 36 deletions.
110 changes: 74 additions & 36 deletions hydra-node/test/Hydra/Network/ReliabilitySpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ import Control.Concurrent.Class.MonadSTM (
newTVarIO,
writeTVar,
)
import Control.Monad.IOSim (runSimOrThrow)
import Control.Monad.IOSim (IOSim, runSimOrThrow)
import Control.Tracer (Tracer (..), nullTracer)
import Data.Sequence.Strict ((|>))
import Data.Vector (Vector, empty, fromList, head, replicate, snoc)
import Data.Vector qualified as Vector
import Hydra.Network (Network (..))
import Hydra.Network (Network (..), NodeId)
import Hydra.Network.Authenticate (Authenticated (..))
import Hydra.Network.Heartbeat (Heartbeat (..), withHeartbeat)
import Hydra.Network.Message (Connectivity)
import Hydra.Network.Ouroboros (NetworkComponent)
import Hydra.Network.Reliability (MessagePersistence (..), ReliabilityLog (..), ReliableMsg (..), withReliability)
import Hydra.Node.Network (withFlipHeartbeats)
import Hydra.Persistence (
Expand All @@ -30,7 +31,7 @@ import Hydra.Persistence (
)
import System.Directory (doesFileExist)
import System.FilePath ((</>))
import System.Random (mkStdGen, uniformR)
import System.Random (StdGen, mkStdGen, uniformR)
import Test.Hydra.Fixture (alice, bob, carol)
import Test.QuickCheck (
Positive (Positive),
Expand All @@ -42,6 +43,7 @@ import Test.QuickCheck (
(===),
)
import Prelude (unlines)
import Hydra.Party (Party)

spec :: Spec
spec = parallel $ do
Expand Down Expand Up @@ -119,13 +121,13 @@ spec = parallel $ do
randomSeed <- newTVarIO $ mkStdGen seed
aliceToBob <- newTQueueIO
bobToAlice <- newTQueueIO
alicePersistence <- mockMessagePersistence 2
bobPersistence <- mockMessagePersistence 2
alicePersistence <- failingMessagePersistence randomSeed 2
bobPersistence <- failingMessagePersistence randomSeed 2
let
-- this is a NetworkComponent that broadcasts authenticated messages
-- mediated through a read and a write TQueue but drops 0.2 % of them
aliceFailingNetwork = failingNetwork randomSeed alice (bobToAlice, aliceToBob)
bobFailingNetwork = failingNetwork randomSeed bob (aliceToBob, bobToAlice)
aliceFailingNetwork = failingNetwork randomSeed alice bobToAlice aliceToBob
bobFailingNetwork = failingNetwork randomSeed bob aliceToBob bobToAlice

bobReliabilityStack = reliabilityStack bobPersistence bobFailingNetwork (captureTraces emittedTraces) "bob" bob [alice]
aliceReliabilityStack = reliabilityStack alicePersistence aliceFailingNetwork (captureTraces emittedTraces) "alice" alice [bob]
Expand Down Expand Up @@ -224,35 +226,59 @@ spec = parallel $ do
(waitForAllMessages expectedMessages receivedMessageContainer)
(waitForAllMessages messagesToSend sentMessageContainer)

reliabilityStack persistence underlyingNetwork tracer nodeId party peers =
withHeartbeat nodeId $
withFlipHeartbeats $
withReliability tracer persistence party peers underlyingNetwork

failingNetwork seed peer (readQueue, writeQueue) callback action =
withAsync
( forever $ do
newMsg <- atomically $ readTQueue readQueue
callback newMsg
)
$ \_ ->
action $
Network
{ broadcast = \m -> atomically $ do
-- drop 2% of messages
r <- randomNumber seed
unless (r < 0.02) $ writeTQueue writeQueue (Authenticated m peer)
}
randomNumber seed' = do
genSeed <- readTVar seed'
let (res, newGenSeed) = uniformR (0 :: Double, 1) genSeed
writeTVar seed' newGenSeed
pure res

reloadAll :: FilePath -> IO [Heartbeat (Heartbeat String)]
reloadAll fileName =
createPersistenceIncremental fileName
>>= \PersistenceIncremental{loadAll} -> loadAll
reliabilityStack :: (MonadThrow m, MonadThrow (STM m), MonadAsync m, MonadDelay m) => MessagePersistence m outbound
-> NetworkComponent
m
(Authenticated (ReliableMsg (Heartbeat inbound)))
(ReliableMsg (Heartbeat outbound))
a
-> Tracer m ReliabilityLog
-> NodeId
-> Party
-> [Party]
-> NetworkComponent
m (Either Connectivity (Authenticated inbound)) outbound a
reliabilityStack persistence underlyingNetwork tracer nodeId party peers =
withHeartbeat nodeId $
withFlipHeartbeats $
withReliability tracer persistence party peers underlyingNetwork
reloadAll :: FilePath -> IO [Heartbeat (Heartbeat String)]
reloadAll fileName =
createPersistenceIncremental fileName
>>= \PersistenceIncremental{loadAll} -> loadAll


failingNetwork :: MonadAsync m => TVar m StdGen
-> Party
-> TQueue m inbound
-> TQueue m (Authenticated outbound)
-> NetworkComponent m inbound outbound a
failingNetwork seed peer readQueue writeQueue callback action =
withAsync
( forever $ do
newMsg <- atomically $ readTQueue readQueue
dropPercent 0.02 seed $ callback newMsg
)
$ \_ ->
action $
Network
{ broadcast = \m -> dropPercent 0.02 seed $
atomically $
writeTQueue writeQueue (Authenticated m peer)
}


dropPercent :: MonadSTM m => Double -> TVar m StdGen -> m () -> m ()
dropPercent x seed f = do
r <- randomNumber seed
unless (r < x) f

randomNumber :: MonadSTM m => TVar m StdGen -> m Double
randomNumber seed' = do
genSeed <- readTVarIO seed'
let (res, newGenSeed) = uniformR (0 :: Double, 1) genSeed
atomically $ writeTVar seed' newGenSeed
pure res

noop :: Monad m => b -> m ()
noop = const $ pure ()
Expand Down Expand Up @@ -299,6 +325,18 @@ captureTraces ::
captureTraces tvar = Tracer $ \msg -> do
atomically $ modifyTVar' tvar (msg :)

failingMessagePersistence :: MonadSTM m => TVar m StdGen -> Int -> m (MessagePersistence m msg)
failingMessagePersistence seed numberOfParties = do
acks <- newTVarIO $ replicate numberOfParties 0
messages <- newTVarIO mempty
pure $
MessagePersistence
{ loadAcks = readTVarIO acks
, saveAcks = dropPercent 0.02 seed . atomically . writeTVar acks
, loadMessages = toList <$> readTVarIO messages
, appendMessage = \msg -> dropPercent 0.02 seed $ atomically $ modifyTVar' messages (|> msg)
}

mockMessagePersistence :: Int -> MonadSTM m => m (MessagePersistence m msg)
mockMessagePersistence numberOfParties = do
acks <- newTVarIO $ replicate numberOfParties 0
Expand Down

0 comments on commit c278876

Please sign in to comment.