diff --git a/hydra-node/src/Hydra/API/Server.hs b/hydra-node/src/Hydra/API/Server.hs index 13bd0b7ea80..78c573ee5de 100644 --- a/hydra-node/src/Hydra/API/Server.hs +++ b/hydra-node/src/Hydra/API/Server.hs @@ -31,7 +31,7 @@ import Hydra.Chain.Direct.State () import Hydra.Logging (Tracer, traceWith) import Hydra.Network (IP, PortNumber) import Hydra.Party (Party) -import Hydra.Persistence (NewPersistenceIncremental(..), EventSource(..), putEventToSinks) +import Hydra.Persistence (EventSource (..), NewPersistenceIncremental (..), putEventToSinks) import Network.Wai.Handler.Warp ( defaultSettings, runSettings, @@ -68,7 +68,7 @@ withAPIServer :: Chain tx IO -> PParams LedgerEra -> ServerComponent tx IO () -withAPIServer host port party NewPersistenceIncremental{eventSource=EventSource{getEvents'}, eventSinks = sinks} tracer chain pparams callback action = +withAPIServer host port party NewPersistenceIncremental{eventSource = EventSource{getEvents'}, eventSinks = sinks} tracer chain pparams callback action = handle onIOException $ do responseChannel <- newBroadcastTChanIO timedOutputEvents <- getEvents' diff --git a/hydra-node/src/Hydra/HeadLogic/Outcome.hs b/hydra-node/src/Hydra/HeadLogic/Outcome.hs index a3a2cdf3ba8..40c58ff5b69 100644 --- a/hydra-node/src/Hydra/HeadLogic/Outcome.hs +++ b/hydra-node/src/Hydra/HeadLogic/Outcome.hs @@ -103,11 +103,9 @@ getStateChangeID = \case ChainRolledBack{stateChangeID} -> stateChangeID TickObserved{stateChangeID} -> stateChangeID ---FIXME(Elaine): these stateChangeID fields were added in an attempt to make every StateChanged keep track of its ID +-- FIXME(Elaine): these stateChangeID fields were added in an attempt to make every StateChanged keep track of its ID -- it's not clear how to handle the state for this. but for now the field is kept so that the type of putEvent' can be kept simple, and shouldn't do harm - - instance (IsTx tx, Arbitrary (HeadState tx), Arbitrary (ChainStateType tx)) => Arbitrary (StateChanged tx) where arbitrary = genericArbitrary diff --git a/hydra-node/src/Hydra/Network/Reliability.hs b/hydra-node/src/Hydra/Network/Reliability.hs index afcf356fd5a..ceaf03803f5 100644 --- a/hydra-node/src/Hydra/Network/Reliability.hs +++ b/hydra-node/src/Hydra/Network/Reliability.hs @@ -111,7 +111,7 @@ import Hydra.Network (Network (..), NetworkComponent) import Hydra.Network.Authenticate (Authenticated (..)) import Hydra.Network.Heartbeat (Heartbeat (..), isPing) import Hydra.Party (Party) -import Hydra.Persistence (Persistence (..), PersistenceIncremental (..), NewPersistenceIncremental (..), EventSource(..), EventSink(..), putEventToSinks) +import Hydra.Persistence (EventSink (..), EventSource (..), NewPersistenceIncremental (..), Persistence (..), PersistenceIncremental (..), putEventToSinks) import Test.QuickCheck (getPositive, listOf) data ReliableMsg msg = ReliableMsg @@ -199,7 +199,7 @@ mkMessagePersistence numberOfParties NewPersistenceIncremental{eventSource, even , saveAcks = \acks -> do save ackPersistence acks , loadMessages = do - getEvents' eventSource + getEvents' eventSource , appendMessage = \msg -> do putEventToSinks eventSinks msg } diff --git a/hydra-node/src/Hydra/Node.hs b/hydra-node/src/Hydra/Node.hs index ecd74e2120c..561fede171c 100644 --- a/hydra-node/src/Hydra/Node.hs +++ b/hydra-node/src/Hydra/Node.hs @@ -1,6 +1,6 @@ {-# LANGUAGE DuplicateRecordFields #-} -{-# LANGUAGE UndecidableInstances #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE UndecidableInstances #-} -- | Top-level module to run a single Hydra node. -- @@ -14,9 +14,9 @@ import Hydra.Prelude import Control.Concurrent.Class.MonadSTM ( MonadLabelledSTM, labelTVarIO, + modifyTVar, newTVarIO, stateTVar, - modifyTVar, ) import Control.Monad.Trans.Writer (execWriter, tell) import Hydra.API.Server (Server, sendOutput) @@ -56,7 +56,7 @@ import Hydra.Node.EventQueue (EventQueue (..), Queued (..)) import Hydra.Node.ParameterMismatch (ParamMismatch (..), ParameterMismatch (..)) import Hydra.Options (ChainConfig (..), DirectChainConfig (..), RunOptions (..), defaultContestationPeriod) import Hydra.Party (Party (..), deriveParty) -import Hydra.Persistence (PersistenceIncremental (..), NewPersistenceIncremental(..), EventSource(..), EventSink(..), putEventToSinks, putEventsToSinks, NewPersistenceIncremental) +import Hydra.Persistence (EventSink (..), EventSource (..), NewPersistenceIncremental (..), PersistenceIncremental (..), putEventToSinks, putEventsToSinks) -- * Environment Handling @@ -148,7 +148,7 @@ data HydraNode tx m = HydraNode -- , latestStateChangeId :: TVar m Word64 -- , eventSource :: EventSource (StateChanged tx) m -- , eventSinks :: NonEmpty (EventSink (StateChanged tx) m) - --FIXME(Elaine): bundle eventSource,Sinks, latestStateChangeId into a single type for convenience? + -- FIXME(Elaine): bundle eventSource,Sinks, latestStateChangeId into a single type for convenience? -- they should still definitely be separable too } @@ -196,7 +196,7 @@ stepHydraNode tracer node = do e@Queued{eventId, queuedEvent} <- nextEvent eq traceWith tracer $ BeginEvent{by = party, eventId, event = queuedEvent} outcome <- atomically $ do - nextStateChangeID <- readTVar . lastStateChangeId $ persistence node-- an event won't necessarily produce a statechange, but if it does, then this'll be its ID + nextStateChangeID <- readTVar . lastStateChangeId $ persistence node -- an event won't necessarily produce a statechange, but if it does, then this'll be its ID processNextEvent node queuedEvent nextStateChangeID traceWith tracer (LogicOutcome party outcome) handleOutcome e outcome @@ -246,16 +246,18 @@ processNextEvent HydraNode{nodeState, ledger, env} e nextStateChangeID = do computeOutcome = Logic.update env ledger nextStateChangeID processNextStateChange :: - forall m e tx. (Monad m, MonadSTM m, ToJSON (StateChanged tx)) => + forall m e tx. + (Monad m, MonadSTM m, ToJSON (StateChanged tx)) => HydraNode tx m -> NonEmpty (EventSink (StateChanged tx) m) -> StateChanged tx -> m () processNextStateChange HydraNode{persistence} sinks sc = do (putEventToSinks @m @(StateChanged tx)) sinks sc - atomically $ modifyTVar (lastStateChangeId persistence) (+1) - --FIXME(Elaine): put this whole thing in a single `atomically` call - -- that should be possible but there's some annoying classy monad transformer types to deal with + atomically $ modifyTVar (lastStateChangeId persistence) (+ 1) + +-- FIXME(Elaine): put this whole thing in a single `atomically` call +-- that should be possible but there's some annoying classy monad transformer types to deal with processEffects :: ( MonadAsync m @@ -335,4 +337,4 @@ loadStateEventSource tracer eventSource eventSinks defaultChainState = do Just sinks' -> putEventsToSinks sinks' events pure (headState, chainStateHistory) where - initialState = Idle IdleState{chainState = defaultChainState} \ No newline at end of file + initialState = Idle IdleState{chainState = defaultChainState} diff --git a/hydra-node/src/Hydra/Node/Network.hs b/hydra-node/src/Hydra/Node/Network.hs index c01c2f1ce31..396972d7436 100644 --- a/hydra-node/src/Hydra/Node/Network.hs +++ b/hydra-node/src/Hydra/Node/Network.hs @@ -83,7 +83,7 @@ import Hydra.Network.Reliability (MessagePersistence, ReliableMsg, mkMessagePers import Hydra.Node (HydraNodeLog (..)) import Hydra.Node.ParameterMismatch (ParamMismatch (..), ParameterMismatch (..)) import Hydra.Party (Party, deriveParty) -import Hydra.Persistence (Persistence (..), createPersistence, createPersistenceIncremental, createNewPersistenceIncremental) +import Hydra.Persistence (Persistence (..), createNewPersistenceIncremental, createPersistence, createPersistenceIncremental) import System.FilePath (()) -- | An alias for logging messages output by network component. @@ -150,7 +150,7 @@ configureMessagePersistence :: m (MessagePersistence m msg) configureMessagePersistence tracer persistenceDir numberOfParties = do msgPersistence <- createNewPersistenceIncremental $ storedMessagesFile persistenceDir - --NOTE(Elaine): after deliberation, new persistence should be fine for network messages + -- NOTE(Elaine): after deliberation, new persistence should be fine for network messages ackPersistence@Persistence{load} <- createPersistence $ acksFile persistenceDir mAcks <- load ackPersistence' <- case fmap (\acks -> length acks == numberOfParties) mAcks of diff --git a/hydra-node/src/Hydra/Node/Run.hs b/hydra-node/src/Hydra/Node/Run.hs index 9c115f18e85..e114a503ccb 100644 --- a/hydra-node/src/Hydra/Node/Run.hs +++ b/hydra-node/src/Hydra/Node/Run.hs @@ -36,7 +36,8 @@ import Hydra.Node ( createNodeState, initEnvironment, loadState, - runHydraNode, loadStateEventSource, + loadStateEventSource, + runHydraNode, ) import Hydra.Node.EventQueue (EventQueue (..), createEventQueue) import Hydra.Node.Network (NetworkConfiguration (..), withNetwork) @@ -49,9 +50,9 @@ import Hydra.Options ( RunOptions (..), validateRunOptions, ) -import Hydra.Persistence (NewPersistenceIncremental (..), createPersistenceIncremental, eventPairFromPersistenceIncremental, createNewPersistenceIncremental) +import Hydra.Persistence (NewPersistenceIncremental (..), createNewPersistenceIncremental, createPersistenceIncremental, eventPairFromPersistenceIncremental) -import qualified Data.List.NonEmpty as NE +import Data.List.NonEmpty qualified as NE data ConfigurationException = ConfigurationException ProtocolParametersConversionError @@ -83,7 +84,7 @@ run opts = do withCardanoLedger pparams globals $ \ledger -> do -- persistence <- createPersistenceIncremental $ persistenceDir <> "/state" - --TODO(Elaine): remove in favor of eventSource/Sink directly + -- TODO(Elaine): remove in favor of eventSource/Sink directly -- (eventSource, eventSink) <- createEventPairIncremental $ persistenceDir <> "/state" -- let -- (eventSource, eventSink) = eventPairFromPersistenceIncremental persistence diff --git a/hydra-node/src/Hydra/Persistence.hs b/hydra-node/src/Hydra/Persistence.hs index 58e6b23b4e7..dd0e8e118c8 100644 --- a/hydra-node/src/Hydra/Persistence.hs +++ b/hydra-node/src/Hydra/Persistence.hs @@ -4,15 +4,15 @@ module Hydra.Persistence where import Hydra.Prelude -import Control.Concurrent.Class.MonadSTM (newTVarIO, throwSTM, writeTVar, MonadSTM (modifyTVar')) +import Control.Concurrent.Class.MonadSTM (MonadSTM (modifyTVar'), newTVarIO, throwSTM, writeTVar) import Control.Monad.Class.MonadFork (myThreadId) import Data.Aeson qualified as Aeson import Data.ByteString qualified as BS import Data.ByteString.Char8 qualified as C8 +import Hydra.HeadLogic.Outcome (StateChanged (stateChangeID), getStateChangeID) -- FIXME(Elaine): move this import to whatever is re-exporting StateChanged in Hydra.Preude import System.Directory (createDirectoryIfMissing, doesFileExist) import System.FilePath (takeDirectory) import UnliftIO.IO.File (withBinaryFile, writeBinaryFileDurableAtomic) -import Hydra.HeadLogic.Outcome (getStateChangeID, StateChanged (stateChangeID)) --FIXME(Elaine): move this import to whatever is re-exporting StateChanged in Hydra.Preude data PersistenceException = PersistenceException String @@ -50,21 +50,23 @@ createPersistence fp = do Right a -> pure (Just a) } -data EventSource e m = EventSource { getEvents' :: FromJSON e => m [e] } -data EventSink e m = EventSink { putEvent' :: ToJSON e => e -> m () } ---FIXME(Elaine): we have to figure out a better taxonomy/nomenclature for the events/statechange stuff +data EventSource e m = EventSource {getEvents' :: FromJSON e => m [e]} +data EventSink e m = EventSink {putEvent' :: ToJSON e => e -> m ()} + +-- FIXME(Elaine): we have to figure out a better taxonomy/nomenclature for the events/statechange stuff -- the eventID here is not the same as the eventID in Queued, that one is more fickle and influenced by non state change events -- this one is only incremented when we have a new state change event ---FIXME(Elaine): primary createPersistenceIncremental is in Run.hs, that's swapped now +-- FIXME(Elaine): primary createPersistenceIncremental is in Run.hs, that's swapped now -- but replacing PersistenceIncremental outside of that, for network messages ex, seems like it should happen after, to not break too much at once + -- | Handle to save incrementally and load files to/from disk using JSON encoding. data PersistenceIncremental a m = PersistenceIncremental { append :: ToJSON a => a -> m () , loadAll :: FromJSON a => m [a] } ---FIXME(Elaine): rename this, just taking the name of PersistenceIncremental once thats fully removed might suffice +-- FIXME(Elaine): rename this, just taking the name of PersistenceIncremental once thats fully removed might suffice data NewPersistenceIncremental a m = NewPersistenceIncremental { eventSource :: EventSource a m , eventSinks :: NonEmpty (EventSink a m) @@ -78,12 +80,12 @@ putEventsToSinks :: forall m e. (Monad m, ToJSON e) => NonEmpty (EventSink e m) putEventsToSinks sinks es = forM_ es (\e -> putEventToSinks sinks e) -- FIXME(Elaine): this needs to be the reverse, since we need to keep track of the eventID ---FIXME(Elaine): neither this nor the opposite direction can handle re-submission properly without keeping track of the state separately in step/hydranode & +-- FIXME(Elaine): neither this nor the opposite direction can handle re-submission properly without keeping track of the state separately in step/hydranode & -- so this means removing the old persistence for the purpose of statechanged events is more urgent eventPairFromPersistenceIncremental :: PersistenceIncremental a m -> (EventSource a m, EventSink a m) eventPairFromPersistenceIncremental PersistenceIncremental{append, loadAll} = - let eventSource = EventSource {getEvents' = loadAll} - eventSink = EventSink {putEvent' = append} + let eventSource = EventSource{getEvents' = loadAll} + eventSink = EventSink{putEvent' = append} in (eventSource, eventSink) -- persistenceIncrementalFromEventPair :: (Monad m, ToJSON a, FromJSON a) => (EventSource a m, EventSink a m) -> PersistenceIncremental a m @@ -101,68 +103,69 @@ createNewPersistenceIncremental fp = do authorizedThread <- newTVarIO Nothing lastStateChangeId <- newTVarIO (0 :: Word64) -- FIXME(Elaine): eventid too general for this, at least not without writing the eventids to disk, but even then, hacky - -- we'll have a new ID for each statechanged event, + -- we'll have a new ID for each statechanged event, -- i think this is probablyh fine and doesn't need fixing, but wanted to write it down first -- the eventId here is a monotonically increasing integer, and it lets us keep track of how "far along" we are in the persistence -- we can use this to skip resubmitting events -- more complicated solutions would be possible, in particular, rolling hash / merkle chain might be more resilient to corruption -- but given that persistence is already atomic and only needs to be consistent within a single node, it should suffice nextId <- newTVarIO (0 :: Word64) - let eventSource = EventSource - { getEvents' = do - tid <- myThreadId - atomically $ do - authTid <- readTVar authorizedThread - when (isJust authTid && authTid /= Just tid) $ - throwSTM (IncorrectAccessException $ "Trying to load persisted data in " <> fp <> " from different thread") - - liftIO (doesFileExist fp) >>= \case - False -> pure [] - True -> do - bs <- readFileBS fp - -- NOTE: We require the whole file to be loadable. It might - -- happen that the data written by 'append' is only there - -- partially and then this will fail (which we accept now). - result <- case forM (C8.lines bs) Aeson.eitherDecodeStrict' of - Left e -> throwIO $ PersistenceException e - Right decoded -> pure decoded - -- set initial nextId (zero-indexed) based on how many state change events we have - atomically $ do - writeTVar lastStateChangeId $ fromIntegral $ length result - writeTVar nextId . fromIntegral $ length result - - pure result - } - eventSink = EventSink - { putEvent' = \a -> do - threadId <- myThreadId - isEventNew <- atomically $ do - let stateChangeID = (undefined a) :: Word64 - -- FIXME(Elaine): we need to put getStateChangeID into a typeclass and add that constraint to a, in the EventSink type - -- or we can have separate versions of this for StateChanged, and for network functionality etc - - let outgoingStateChangeId = stateChangeID - -- outgoingStateChangeId <- readTVar $ stateChangeID -- this is the ID of the state change we just got from the node - -- it's not actually written to disk yet until this function is over - - - id <- readTVar nextId - writeTVar authorizedThread $ Just threadId - modifyTVar' nextId succ - pure $ outgoingStateChangeId `compare` id - let bytes = toStrict $ Aeson.encode a <> "\n" - case isEventNew of - -- event already persisted - LT -> pure () - -- event is as new as expected - EQ -> liftIO $ withBinaryFile fp AppendMode (`BS.hPut` bytes) - -- event is newer than expected, - GT -> do - liftIO $ putStrLn "ELAINE: this shouldn't happen with my current understanding of stuff" - liftIO $ withBinaryFile fp AppendMode (`BS.hPut` bytes) -- FIXME(Elaine): maybe error ? shouldnt really happen - } + let eventSource = + EventSource + { getEvents' = do + tid <- myThreadId + atomically $ do + authTid <- readTVar authorizedThread + when (isJust authTid && authTid /= Just tid) $ + throwSTM (IncorrectAccessException $ "Trying to load persisted data in " <> fp <> " from different thread") + + liftIO (doesFileExist fp) >>= \case + False -> pure [] + True -> do + bs <- readFileBS fp + -- NOTE: We require the whole file to be loadable. It might + -- happen that the data written by 'append' is only there + -- partially and then this will fail (which we accept now). + result <- case forM (C8.lines bs) Aeson.eitherDecodeStrict' of + Left e -> throwIO $ PersistenceException e + Right decoded -> pure decoded + -- set initial nextId (zero-indexed) based on how many state change events we have + atomically $ do + writeTVar lastStateChangeId $ fromIntegral $ length result + writeTVar nextId . fromIntegral $ length result + + pure result + } + eventSink = + EventSink + { putEvent' = \a -> do + threadId <- myThreadId + isEventNew <- atomically $ do + let stateChangeID = (undefined a) :: Word64 + -- FIXME(Elaine): we need to put getStateChangeID into a typeclass and add that constraint to a, in the EventSink type + -- or we can have separate versions of this for StateChanged, and for network functionality etc + + let outgoingStateChangeId = stateChangeID + -- outgoingStateChangeId <- readTVar $ stateChangeID -- this is the ID of the state change we just got from the node + -- it's not actually written to disk yet until this function is over + + id <- readTVar nextId + writeTVar authorizedThread $ Just threadId + modifyTVar' nextId succ + pure $ outgoingStateChangeId `compare` id + let bytes = toStrict $ Aeson.encode a <> "\n" + case isEventNew of + -- event already persisted + LT -> pure () + -- event is as new as expected + EQ -> liftIO $ withBinaryFile fp AppendMode (`BS.hPut` bytes) + -- event is newer than expected, + GT -> do + liftIO $ putStrLn "ELAINE: this shouldn't happen with my current understanding of stuff" + liftIO $ withBinaryFile fp AppendMode (`BS.hPut` bytes) -- FIXME(Elaine): maybe error ? shouldnt really happen + } eventSinks = eventSink :| [] - pure NewPersistenceIncremental { eventSource, eventSinks, lastStateChangeId} + pure NewPersistenceIncremental{eventSource, eventSinks, lastStateChangeId} -- | Initialize persistence handle for given type 'a' at given file path. -- diff --git a/hydra-node/test/Hydra/API/ServerSpec.hs b/hydra-node/test/Hydra/API/ServerSpec.hs index 58b973096a0..b9a8e16860d 100644 --- a/hydra-node/test/Hydra/API/ServerSpec.hs +++ b/hydra-node/test/Hydra/API/ServerSpec.hs @@ -38,7 +38,7 @@ import Hydra.Logging (Tracer, showLogsOnFailure) import Hydra.Network (PortNumber) import Hydra.Options qualified as Options import Hydra.Party (Party) -import Hydra.Persistence (PersistenceIncremental (..), NewPersistenceIncremental(..), createPersistenceIncremental, eventPairFromPersistenceIncremental) +import Hydra.Persistence (NewPersistenceIncremental (..), PersistenceIncremental (..), createPersistenceIncremental, eventPairFromPersistenceIncremental) import Hydra.Snapshot (Snapshot (Snapshot, utxo)) import Network.WebSockets (Connection, ConnectionException, receiveData, runClient, sendBinaryData) import System.IO.Error (isAlreadyInUseError) @@ -373,9 +373,9 @@ withTestAPIServer :: withTestAPIServer port actor persistence tracer action = do lastStateChangeId <- newTVarIO 0 let (eventSource, eventSink) = eventPairFromPersistenceIncremental persistence - persistenceNew = NewPersistenceIncremental {eventSource, eventSinks = eventSink :| [], lastStateChangeId} - --FIXME(Elaine): lastStateChangeId is technically okay for mockPersistence but elsewhere not guaranteed - -- still, this saves us a little bit of debug time + persistenceNew = NewPersistenceIncremental{eventSource, eventSinks = eventSink :| [], lastStateChangeId} + -- FIXME(Elaine): lastStateChangeId is technically okay for mockPersistence but elsewhere not guaranteed + -- still, this saves us a little bit of debug time withAPIServer @SimpleTx "127.0.0.1" port actor persistenceNew tracer dummyChainHandle defaultPParams noop action -- | Connect to a websocket server running at given path. Fails if not connected diff --git a/hydra-node/test/Hydra/BehaviorSpec.hs b/hydra-node/test/Hydra/BehaviorSpec.hs index 3c1c7b3ac78..9a43bcd54fa 100644 --- a/hydra-node/test/Hydra/BehaviorSpec.hs +++ b/hydra-node/test/Hydra/BehaviorSpec.hs @@ -63,7 +63,7 @@ import Hydra.Node ( import Hydra.Node.EventQueue (EventQueue (putEvent), createEventQueue) import Hydra.NodeSpec (createPersistenceInMemory) import Hydra.Party (Party (..), deriveParty) -import Hydra.Persistence (eventPairFromPersistenceIncremental, NewPersistenceIncremental(..)) +import Hydra.Persistence (NewPersistenceIncremental (..), eventPairFromPersistenceIncremental) import Hydra.Snapshot (Snapshot (..), SnapshotNumber, getSnapshot) import Test.Aeson.GenericSpecs (roundtripAndGoldenSpecs) import Test.Hydra.Fixture (alice, aliceSk, bob, bobSk, deriveOnChainId, testHeadId, testHeadSeed) @@ -779,10 +779,10 @@ createHydraNode ledger nodeState signingKey otherParties outputs outputHistory c persistence <- createPersistenceInMemory let (eventSource, eventSink) = eventPairFromPersistenceIncremental persistence eventSinks = eventSink :| [] - - --FIXME(Elaine): initialize last state change ID - let persistence = NewPersistenceIncremental {eventSource, eventSinks, lastStateChangeId = error "lastStateChangeId not implemented"} - + + -- FIXME(Elaine): initialize last state change ID + let persistence = NewPersistenceIncremental{eventSource, eventSinks, lastStateChangeId = error "lastStateChangeId not implemented"} + connectNode chain $ HydraNode { eq diff --git a/hydra-node/test/Hydra/NodeSpec.hs b/hydra-node/test/Hydra/NodeSpec.hs index c153b099533..30feb946da6 100644 --- a/hydra-node/test/Hydra/NodeSpec.hs +++ b/hydra-node/test/Hydra/NodeSpec.hs @@ -39,10 +39,8 @@ import Hydra.Node.EventQueue (EventQueue (..), createEventQueue) import Hydra.Node.ParameterMismatch (ParameterMismatch (..)) import Hydra.Options (defaultContestationPeriod) import Hydra.Party (Party, deriveParty) -import Hydra.Persistence (PersistenceIncremental (..), NewPersistenceIncremental (..), eventPairFromPersistenceIncremental, EventSource(..), EventSink(..), createNewPersistenceIncremental) +import Hydra.Persistence (EventSink (..), EventSource (..), NewPersistenceIncremental (..), PersistenceIncremental (..), createNewPersistenceIncremental, eventPairFromPersistenceIncremental) import Test.Hydra.Fixture (alice, aliceSk, bob, bobSk, carol, carolSk, deriveOnChainId, testHeadId, testHeadSeed) -import Hydra.Persistence (EventSource) -import Hydra.Persistence (EventSink) spec :: Spec spec = parallel $ do @@ -250,13 +248,11 @@ createHydraNode :: m (HydraNode SimpleTx m) createHydraNode = createHydraNode' eventSource (eventSink :| []) - - where append = const $ pure () loadAll = pure [] - eventSource = EventSource {getEvents' = loadAll} - eventSink = EventSink {putEvent' = append} + eventSource = EventSource{getEvents' = loadAll} + eventSink = EventSink{putEvent' = append} createHydraNode' :: (MonadDelay m, MonadAsync m, MonadLabelledSTM m, MonadThrow m) => @@ -273,8 +269,8 @@ createHydraNode' eventSource eventSinks signingKey otherParties contestationPeri (headState, _) <- loadState nullTracer eventSource SimpleChainState{slot = ChainSlot 0} nodeState <- createNodeState headState - --FIXME(Elaine): initialize last state change ID - let persistence = NewPersistenceIncremental {eventSource, eventSinks, lastStateChangeId = error "lastStateChangeId not implemented"} + -- FIXME(Elaine): initialize last state change ID + let persistence = NewPersistenceIncremental{eventSource, eventSinks, lastStateChangeId = error "lastStateChangeId not implemented"} pure $ HydraNode @@ -323,13 +319,14 @@ recordPersistedItems node = do atomically $ modifyTVar lastStateChangeId succ record e pure - ( node{persistence = - NewPersistenceIncremental { - eventSource = EventSource {getEvents'}, - eventSinks = EventSink{putEvent'} :| [], - lastStateChangeId - } - } + ( node + { persistence = + NewPersistenceIncremental + { eventSource = EventSource{getEvents'} + , eventSinks = EventSink{putEvent'} :| [] + , lastStateChangeId + } + } , query ) diff --git a/hydra-node/test/Hydra/PersistenceSpec.hs b/hydra-node/test/Hydra/PersistenceSpec.hs index 93c87323dd5..67949f12b6c 100644 --- a/hydra-node/test/Hydra/PersistenceSpec.hs +++ b/hydra-node/test/Hydra/PersistenceSpec.hs @@ -8,11 +8,10 @@ import Test.Hydra.Prelude import Data.Aeson (Value (..)) import Data.Aeson qualified as Aeson import Data.Text qualified as Text -import Hydra.Persistence (Persistence (..), PersistenceException (..), PersistenceIncremental (..), createPersistence, createPersistenceIncremental, putEvent', putEventsToSinks, getEvents') +import Hydra.Persistence (Persistence (..), PersistenceException (..), PersistenceIncremental (..), createPersistence, createPersistenceIncremental, eventPairFromPersistenceIncremental, getEvents', putEvent', putEventsToSinks) import Test.QuickCheck (checkCoverage, cover, elements, oneof, suchThat, (===)) import Test.QuickCheck.Gen (listOf) import Test.QuickCheck.Monadic (monadicIO, monitor, pick, run) -import Hydra.Persistence (eventPairFromPersistenceIncremental) spec :: Spec spec = do @@ -84,15 +83,15 @@ spec = do let (eventSource, eventSink) = eventPairFromPersistenceIncremental persistEventSource putEventsToSinks (eventSink :| []) items - -- initialize some event sinks + -- initialize some event sinks persistSink1 <- createPersistenceIncremental $ tmpDir <> "/data1" persistSink2 <- createPersistenceIncremental $ tmpDir <> "/data2" let (sink1Source, sink1Sink) = eventPairFromPersistenceIncremental persistSink1 (sink2Source, sink2Sink) = eventPairFromPersistenceIncremental persistSink2 - eventSinks = eventSink :| [sink1Sink, sink2Sink] - + eventSinks = eventSink :| [sink1Sink, sink2Sink] + -- load the event source, as if we had started a node - --TODO(Elaine): this on its own isn't enough to ensure persistence is working end to end, make sure to test that + -- TODO(Elaine): this on its own isn't enough to ensure persistence is working end to end, make sure to test that -- but it is an okay reference point -- maybe in node? -- test for loadStateEventSource