Skip to content

Commit

Permalink
initial work (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
cardenaso11 committed Jan 22, 2024
1 parent 9c7d20f commit f23b790
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 29 deletions.
42 changes: 33 additions & 9 deletions hydra-node/src/Hydra/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import Hydra.Node.EventQueue (EventQueue (..), Queued (..))
import Hydra.Node.ParameterMismatch (ParamMismatch (..), ParameterMismatch (..))
import Hydra.Options (ChainConfig (..), DirectChainConfig (..), RunOptions (..), defaultContestationPeriod)
import Hydra.Party (Party (..), deriveParty)
import Hydra.Persistence (PersistenceIncremental (..))
import Hydra.Persistence (PersistenceIncremental (..), NewPersistenceIncremental(..), EventSource(..), EventSink(..), putEventToSinks, putEventsToSinks, NewPersistenceIncremental)

-- * Environment Handling

Expand Down Expand Up @@ -142,7 +142,12 @@ data HydraNode tx m = HydraNode
, server :: Server tx m
, ledger :: Ledger tx
, env :: Environment
, persistence :: PersistenceIncremental (StateChanged tx) m
, persistence :: NewPersistenceIncremental (StateChanged tx) m
-- , latestStateChangeId :: TVar m Word64
-- , eventSource :: EventSource (StateChanged tx) m
-- , eventSinks :: NonEmpty (EventSink (StateChanged tx) m)
--FIXME(Elaine): bundle eventSource,Sinks, latestStateChangeId into a single type for convenience?
-- they should still definitely be separable too
}

data HydraNodeLog tx
Expand Down Expand Up @@ -197,7 +202,7 @@ stepHydraNode tracer node = do
handleOutcome e = \case
Error _ -> pure ()
Wait _reason -> putEventAfter eq waitDelay (decreaseTTL e)
StateChanged sc -> append sc
StateChanged sc -> putEventToSinks eventSinks sc
Effects _ -> pure ()
Combined l r -> handleOutcome e l >> handleOutcome e r

Expand All @@ -210,9 +215,7 @@ stepHydraNode tracer node = do

Environment{party} = env

PersistenceIncremental{append} = persistence

HydraNode{persistence, eq, env} = node
HydraNode{eq, env, persistence = NewPersistenceIncremental{eventSinks}} = node

-- | The time to wait between re-enqueuing a 'Wait' outcome from 'HeadLogic'.
waitDelay :: DiffTime
Expand Down Expand Up @@ -280,14 +283,35 @@ createNodeState initialState = do
loadState ::
(MonadThrow m, IsChainState tx) =>
Tracer m (HydraNodeLog tx) ->
PersistenceIncremental (StateChanged tx) m ->
EventSource (StateChanged tx) m ->
ChainStateType tx ->
m (HeadState tx, ChainStateHistory tx)
loadState tracer persistence defaultChainState = do
events <- loadAll persistence
loadState tracer eventSource defaultChainState = do
events <- getEvents' eventSource
traceWith tracer LoadedState{numberOfEvents = fromIntegral $ length events}
let headState = recoverState initialState events
chainStateHistory = recoverChainStateHistory defaultChainState events
pure (headState, chainStateHistory)
where
initialState = Idle IdleState{chainState = defaultChainState}

loadStateEventSource ::
(MonadThrow m, MonadIO m, IsChainState tx) =>
Tracer m (HydraNodeLog tx) ->
EventSource (StateChanged tx) m ->
[EventSink (StateChanged tx) m] ->
ChainStateType tx ->
m (HeadState tx, ChainStateHistory tx)
loadStateEventSource tracer eventSource eventSinks defaultChainState = do
events <- getEvents' eventSource
traceWith tracer LoadedState{numberOfEvents = fromIntegral $ length events}
let headState = recoverState initialState events
chainStateHistory = recoverChainStateHistory defaultChainState events
-- deliver to sinks per spec, deduplication is handled by the sinks
-- FIXME(Elaine): persistence currently not handling duplication, so this relies on not providing the eventSource's sink as an arg here
case nonEmpty eventSinks of
Nothing -> putStrLn "ELAINE: deduplicate events for disk persistence so we can get rid of this kludge"
Just sinks' -> putEventsToSinks sinks' events
pure (headState, chainStateHistory)
where
initialState = Idle IdleState{chainState = defaultChainState}
19 changes: 15 additions & 4 deletions hydra-node/src/Hydra/Node/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import Hydra.Node (
createNodeState,
initEnvironment,
loadState,
runHydraNode,
runHydraNode, loadStateEventSource,
)
import Hydra.Node.EventQueue (EventQueue (..), createEventQueue)
import Hydra.Node.Network (NetworkConfiguration (..), withNetwork)
Expand All @@ -49,7 +49,9 @@ import Hydra.Options (
RunOptions (..),
validateRunOptions,
)
import Hydra.Persistence (createPersistenceIncremental)
import Hydra.Persistence (NewPersistenceIncremental (..), createPersistenceIncremental, eventPairFromPersistenceIncremental, createNewPersistenceIncremental)

import qualified Data.List.NonEmpty as NE

data ConfigurationException
= ConfigurationException ProtocolParametersConversionError
Expand Down Expand Up @@ -80,8 +82,17 @@ run opts = do
globals <- getGlobalsForChain chainConfig

withCardanoLedger pparams globals $ \ledger -> do
persistence <- createPersistenceIncremental $ persistenceDir <> "/state"
(hs, chainStateHistory) <- loadState (contramap Node tracer) persistence initialChainState
-- persistence <- createPersistenceIncremental $ persistenceDir <> "/state"
--TODO(Elaine): remove in favor of eventSource/Sink directly
-- (eventSource, eventSink) <- createEventPairIncremental $ persistenceDir <> "/state"

-- let -- (eventSource, eventSink) = eventPairFromPersistenceIncremental persistence
-- eventSinks = eventSink :| [] --FIXME(Elaine): load other event sinks
-- eventSinksSansSource = [] --TODO(Elaine): this needs a better name. essentially, don't load events back into where they came from, at least until disk-based persistence can handle redelivery
let eventSinksSansSource = undefined
persistence@NewPersistenceIncremental{eventSource, eventSinks} <- createNewPersistenceIncremental $ persistenceDir <> "/state"

(hs, chainStateHistory) <- loadStateEventSource (contramap Node tracer) eventSource (NE.toList eventSinks) initialChainState

checkHeadState (contramap Node tracer) env hs
nodeState <- createNodeState hs
Expand Down
102 changes: 101 additions & 1 deletion hydra-node/src/Hydra/Persistence.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Hydra.Persistence where

import Hydra.Prelude

import Control.Concurrent.Class.MonadSTM (newTVarIO, throwSTM, writeTVar)
import Control.Concurrent.Class.MonadSTM (newTVarIO, throwSTM, writeTVar, MonadSTM (modifyTVar'))
import Control.Monad.Class.MonadFork (myThreadId)
import Data.Aeson qualified as Aeson
import Data.ByteString qualified as BS
Expand Down Expand Up @@ -49,12 +49,112 @@ createPersistence fp = do
Right a -> pure (Just a)
}

data EventSource e m = EventSource { getEvents' :: FromJSON e => m [e] }
data EventSink e m = EventSink { putEvent' :: ToJSON e => e -> m () }
--FIXME(Elaine): we have to figure out a better taxonomy/nomenclature for the events/statechange stuff
-- the eventID here is not the same as the eventID in Queued, that one is more fickle and influenced by non state change events
-- this one is only incremented when we have a new state change event

--FIXME(Elaine): primary createPersistenceIncremental is in Run.hs, that's swapped now
-- but replacing PersistenceIncremental outside of that, for network messages ex, seems like it should happen after, to not break too much at once
-- | Handle to save incrementally and load files to/from disk using JSON encoding.
data PersistenceIncremental a m = PersistenceIncremental
{ append :: ToJSON a => a -> m ()
, loadAll :: FromJSON a => m [a]
}

--FIXME(Elaine): rename this, just taking the name of PersistenceIncremental once thats fully removed might suffice
data NewPersistenceIncremental a m = NewPersistenceIncremental
{ eventSource :: EventSource a m
, eventSinks :: NonEmpty (EventSink a m)
, lastStateChangeId :: TVar m Word64 -- FIXME(Elaine): name change , persistence captures more than just this
}

putEventToSinks :: (Monad m, ToJSON e) => NonEmpty (EventSink e m) -> e -> m ()
putEventToSinks sinks e = forM_ sinks (\sink -> putEvent' sink e)

putEventsToSinks :: (Monad m, ToJSON e) => NonEmpty (EventSink e m) -> [e] -> m ()
putEventsToSinks sinks es = forM_ es (\e -> putEventToSinks sinks e)

-- FIXME(Elaine): this needs to be the reverse, since we need to keep track of the eventID
--FIXME(Elaine): neither this nor the opposite direction can handle re-submission properly without keeping track of the state separately in step/hydranode &
-- so this means removing the old persistence for the purpose of statechanged events is more urgent
eventPairFromPersistenceIncremental :: PersistenceIncremental a m -> (EventSource a m, EventSink a m)
eventPairFromPersistenceIncremental PersistenceIncremental{append, loadAll} =
let eventSource = EventSource {getEvents' = loadAll}
eventSink = EventSink {putEvent' = append}
in (eventSource, eventSink)

-- persistenceIncrementalFromEventPair :: (Monad m, ToJSON a, FromJSON a) => (EventSource a m, EventSink a m) -> PersistenceIncremental a m
-- persistenceIncrementalFromEventPair (EventSource{getEvents'}, EventSink{putEvent'}) =
-- let append = putEvent' 0
-- loadAll = getEvents'
-- in PersistenceIncremental{append, loadAll}

createNewPersistenceIncremental ::
(MonadIO m, MonadThrow m, MonadSTM m, MonadThread m, MonadThrow (STM m)) =>
FilePath ->
m (NewPersistenceIncremental a m)
createNewPersistenceIncremental fp = do
liftIO . createDirectoryIfMissing True $ takeDirectory fp
authorizedThread <- newTVarIO Nothing
lastStateChangeId <- newTVarIO 0
-- FIXME(Elaine): eventid too general for this, at least not without writing the eventids to disk, but even then, hacky
-- we'll have a new ID for each statechanged event,
-- i think this is probablyh fine and doesn't need fixing, but wanted to write it down first
-- the eventId here is a monotonically increasing integer, and it lets us keep track of how "far along" we are in the persistence
-- we can use this to skip resubmitting events
-- more complicated solutions would be possible, in particular, rolling hash / merkle chain might be more resilient to corruption
-- but given that persistence is already atomic and only needs to be consistent within a single node, it should suffice
nextId <- newTVarIO 0
let eventSource = EventSource
{ getEvents' = do
tid <- myThreadId
atomically $ do
authTid <- readTVar authorizedThread
when (isJust authTid && authTid /= Just tid) $
throwSTM (IncorrectAccessException $ "Trying to load persisted data in " <> fp <> " from different thread")

liftIO (doesFileExist fp) >>= \case
False -> pure []
True -> do
bs <- readFileBS fp
-- NOTE: We require the whole file to be loadable. It might
-- happen that the data written by 'append' is only there
-- partially and then this will fail (which we accept now).
result <- case forM (C8.lines bs) Aeson.eitherDecodeStrict' of
Left e -> throwIO $ PersistenceException e
Right decoded -> pure decoded
-- set initial nextId (zero-indexed) based on how many state change events we have
atomically $ do
writeTVar lastStateChangeId $ fromIntegral $ length result
writeTVar nextId $ length result

pure result
}
eventSink = EventSink
{ putEvent' = \a -> do
threadId <- myThreadId
isEventNew <- atomically $ do
let outgoingStateChangeId = undefined a -- FIXME(Elaine)
id <- readTVar nextId
writeTVar authorizedThread $ Just threadId
modifyTVar' nextId succ
pure $ outgoingStateChangeId `compare` id
let bytes = toStrict $ Aeson.encode a <> "\n"
case isEventNew of
-- event already persisted
LT -> pure ()
-- event is as new as expected
EQ -> liftIO $ withBinaryFile fp AppendMode (`BS.hPut` bytes)
-- event is newer than expected,
GT -> do
liftIO $ putStrLn "ELAINE: this shouldn't happen with my current understanding of stuff"
liftIO $ withBinaryFile fp AppendMode (`BS.hPut` bytes) -- FIXME(Elaine): maybe error ? shouldnt really happen
}
eventSinks = eventSink :| []
pure NewPersistenceIncremental { eventSource, eventSinks, lastStateChangeId}

-- | Initialize persistence handle for given type 'a' at given file path.
--
-- This instance of `PersistenceIncremental` is "thread-safe" in the sense that
Expand Down
6 changes: 5 additions & 1 deletion hydra-node/test/Hydra/BehaviorSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import Hydra.Node (
import Hydra.Node.EventQueue (EventQueue (putEvent), createEventQueue)
import Hydra.NodeSpec (createPersistenceInMemory)
import Hydra.Party (Party (..), deriveParty)
import Hydra.Persistence (eventPairFromPersistenceIncremental)
import Hydra.Snapshot (Snapshot (..), SnapshotNumber, getSnapshot)
import Test.Aeson.GenericSpecs (roundtripAndGoldenSpecs)
import Test.Hydra.Fixture (alice, aliceSk, bob, bobSk, deriveOnChainId, testHeadId, testHeadSeed)
Expand Down Expand Up @@ -769,6 +770,8 @@ createHydraNode ::
createHydraNode ledger nodeState signingKey otherParties outputs outputHistory chain cp = do
eq <- createEventQueue
persistence <- createPersistenceInMemory
let (eventSource, eventSink) = eventPairFromPersistenceIncremental persistence
eventSinks = eventSink :| []
connectNode chain $
HydraNode
{ eq
Expand All @@ -795,7 +798,8 @@ createHydraNode ledger nodeState signingKey otherParties outputs outputHistory c
, contestationPeriod = cp
, participants
}
, persistence
, eventSource
, eventSinks
}
where
party = deriveParty signingKey
Expand Down
53 changes: 40 additions & 13 deletions hydra-node/test/Hydra/NodeSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ import Hydra.Node.EventQueue (EventQueue (..), createEventQueue)
import Hydra.Node.ParameterMismatch (ParameterMismatch (..))
import Hydra.Options (defaultContestationPeriod)
import Hydra.Party (Party, deriveParty)
import Hydra.Persistence (PersistenceIncremental (..))
import Hydra.Persistence (PersistenceIncremental (..), NewPersistenceIncremental (..), eventPairFromPersistenceIncremental, EventSource(..), EventSink(..))
import Test.Hydra.Fixture (alice, aliceSk, bob, bobSk, carol, carolSk, deriveOnChainId, testHeadId, testHeadSeed)
import Hydra.Persistence (EventSource)
import Hydra.Persistence (EventSink)

spec :: Spec
spec = parallel $ do
Expand Down Expand Up @@ -133,15 +135,17 @@ spec = parallel $ do
failAfter 1 $
showLogsOnFailure "NodeSpec" $ \tracer -> do
persistence <- createPersistenceInMemory
let (eventSource, eventSink) = eventPairFromPersistenceIncremental persistence
eventSinks = eventSink :| []

createHydraNode' persistence bobSk [alice, carol] defaultContestationPeriod eventsToOpenHead
createHydraNode' eventSource eventSinks bobSk [alice, carol] defaultContestationPeriod eventsToOpenHead
>>= runToCompletion tracer

let reqTxEvent = NetworkEvent{ttl = defaultTTL, party = alice, message = ReqTx{transaction = tx1}}
tx1 = SimpleTx{txSimpleId = 1, txInputs = utxoRefs [2], txOutputs = utxoRefs [4]}

(node, getServerOutputs) <-
createHydraNode' persistence bobSk [alice, carol] defaultContestationPeriod [reqTxEvent]
createHydraNode' eventSource eventSinks bobSk [alice, carol] defaultContestationPeriod [reqTxEvent]
>>= recordServerOutputs
runToCompletion tracer node

Expand Down Expand Up @@ -245,25 +249,32 @@ createHydraNode ::
[Event SimpleTx] ->
m (HydraNode SimpleTx m)
createHydraNode =
createHydraNode'
PersistenceIncremental
{ append = const $ pure ()
, loadAll = pure []
}
createHydraNode' eventSource (eventSink :| [])


where
append = const $ pure ()
loadAll = pure []
eventSource = EventSource {getEvents' = loadAll}
eventSink = EventSink {putEvent' = append}

createHydraNode' ::
(MonadDelay m, MonadAsync m, MonadLabelledSTM m, MonadThrow m) =>
PersistenceIncremental (StateChanged SimpleTx) m ->
EventSource (StateChanged SimpleTx) m ->
NonEmpty (EventSink (StateChanged SimpleTx) m) ->
SigningKey HydraKey ->
[Party] ->
ContestationPeriod ->
[Event SimpleTx] ->
m (HydraNode SimpleTx m)
createHydraNode' persistence signingKey otherParties contestationPeriod events = do
createHydraNode' eventSource eventSinks signingKey otherParties contestationPeriod events = do
eq@EventQueue{putEvent} <- createEventQueue
forM_ events putEvent
(headState, _) <- loadState nullTracer persistence SimpleChainState{slot = ChainSlot 0}
(headState, _) <- loadState nullTracer eventSource SimpleChainState{slot = ChainSlot 0}
nodeState <- createNodeState headState

let persistence = createPersiste

pure $
HydraNode
{ eq
Expand All @@ -285,7 +296,8 @@ createHydraNode' persistence signingKey otherParties contestationPeriod events =
, contestationPeriod
, participants
}
, persistence
, eventSource
, eventSinks
}
where
party = deriveParty signingKey
Expand All @@ -302,7 +314,22 @@ recordNetwork node = do
recordPersistedItems :: HydraNode tx IO -> IO (HydraNode tx IO, IO [StateChanged tx])
recordPersistedItems node = do
(record, query) <- messageRecorder
pure (node{persistence = PersistenceIncremental{append = record, loadAll = pure []}}, query)
lastStateChangeId <- newTVarIO 0
-- pure (node{persistence = PersistenceIncremental{append = record, loadAll = pure []}}, query)
let getEvents' = pure []
putEvent' = \e -> do
atomically $ modifyTVar' lastStateChangeId succ
record e
pure
( node{persistence =
NewPersistenceIncremental {
eventSource = EventSource {getEvents'},
eventSinks = EventSink{putEvent'} :| [],
lastStateChangeId
}
}
, query
)

recordServerOutputs :: HydraNode tx IO -> IO (HydraNode tx IO, IO [ServerOutput tx])
recordServerOutputs node = do
Expand Down
Loading

0 comments on commit f23b790

Please sign in to comment.