Skip to content

Commit

Permalink
WIP: stateChangeID into StateChanged, routed into disk-persistence ev…
Browse files Browse the repository at this point in the history
…entSink.\n\nIt's not clear yet if we can have the statechange ID in the StateChanged type as a substitute for tracking the current stateChangeID in the node\nBut for now it works to get the node's stateChangedID into the disk-persistence eventSink's putEvent', where we can compare it against the last persisted stateChange\nThis works for the particular disk-based eventSink where we *do have* exactly-once, but in ex kinesis poc or redis or something, it can be the key for at-least-once dict
  • Loading branch information
cardenaso11 committed Feb 15, 2024
1 parent bf8d695 commit cd9aa7b
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 26 deletions.
2 changes: 1 addition & 1 deletion hydra-node/src/Hydra/API/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import Hydra.Chain.Direct.State ()
import Hydra.Logging (Tracer, traceWith)
import Hydra.Network (IP, PortNumber)
import Hydra.Party (Party)
import Hydra.Persistence (PersistenceIncremental (..), NewPersistenceIncremental(..), EventSource(..), EventSink(..), putEventToSinks)
import Hydra.Persistence (NewPersistenceIncremental(..), EventSource(..), putEventToSinks)
import Network.Wai.Handler.Warp (
defaultSettings,
runSettings,
Expand Down
10 changes: 6 additions & 4 deletions hydra-node/src/Hydra/HeadLogic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ onIdleChainInitTx env newChainState headId headSeed headParameters participants
, chainState = newChainState
, headId
, headSeed
, stateChangeID = 0
}
)
<> Effects [ClientEffect $ ServerOutput.HeadIsInitializing{headId, parties}]
Expand Down Expand Up @@ -682,12 +683,13 @@ update ::
IsChainState tx =>
Environment ->
Ledger tx ->
Word64 ->
-- | Current HeadState to validate the command against.
HeadState tx ->
-- | Command sent to the HeadLogic to be processed.
Event tx ->
Outcome tx
update env ledger st ev = case (st, ev) of
update env ledger nextStateChangeID st ev = case (st, ev) of
(Idle _, ClientEvent Init) ->
onIdleClientInit env
(Idle _, OnChainEvent Observation{observedTx = OnInitTx{headId, headSeed, headParameters, participants}, newChainState}) ->
Expand Down Expand Up @@ -742,7 +744,7 @@ update env ledger st ev = case (st, ev) of
(Closed ClosedState{contestationDeadline, readyToFanoutSent, headId}, OnChainEvent Tick{chainTime})
| chainTime > contestationDeadline && not readyToFanoutSent ->
StateChanged
HeadIsReadyToFanout
HeadIsReadyToFanout{stateChangeID = nextStateChangeID}
<> Effects [ClientEffect $ ServerOutput.ReadyToFanout headId]
(Closed closedState, ClientEvent Fanout) ->
onClosedClientFanout closedState
Expand Down Expand Up @@ -946,7 +948,7 @@ aggregate st = \case
where
sigs = Map.insert party signature signatories
_otherState -> st
HeadIsReadyToFanout ->
HeadIsReadyToFanout{} ->
case st of
Closed cst -> Closed cst{readyToFanoutSent = True}
_otherState -> st
Expand Down Expand Up @@ -993,7 +995,7 @@ recoverChainStateHistory initialChainState =
PartySignedSnapshot{} -> history
SnapshotConfirmed{} -> history
HeadClosed{chainState} -> pushNewState chainState history
HeadIsReadyToFanout -> history
HeadIsReadyToFanout{} -> history
HeadFannedOut{chainState} -> pushNewState chainState history
ChainRolledBack{chainState} ->
rollbackHistory (chainStateSlot chainState) history
Expand Down
49 changes: 38 additions & 11 deletions hydra-node/src/Hydra/HeadLogic/Outcome.hs
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,22 @@ data StateChanged tx
, chainState :: ChainStateType tx
, headId :: HeadId
, headSeed :: HeadSeed
, stateChangeID :: Word64
}
| CommittedUTxO
{ party :: Party
, committedUTxO :: UTxOType tx
, chainState :: ChainStateType tx
, stateChangeID :: Word64
}
| HeadAborted {chainState :: ChainStateType tx}
| HeadOpened {chainState :: ChainStateType tx, initialUTxO :: UTxOType tx}
| HeadAborted {chainState :: ChainStateType tx, stateChangeID :: Word64}
| HeadOpened {chainState :: ChainStateType tx, initialUTxO :: UTxOType tx, stateChangeID :: Word64}
| TransactionAppliedToLocalUTxO
{ tx :: tx
, newLocalUTxO :: UTxOType tx
, stateChangeID :: Word64
}
| SnapshotRequestDecided {snapshotNumber :: SnapshotNumber}
| SnapshotRequestDecided {snapshotNumber :: SnapshotNumber, stateChangeID :: Word64}
| -- | A snapshot was requested by some party.
-- NOTE: We deliberately already include an updated local ledger state to
-- not need a ledger to interpret this event.
Expand All @@ -70,17 +73,41 @@ data StateChanged tx
, requestedTxIds :: [TxIdType tx]
, newLocalUTxO :: UTxOType tx
, newLocalTxs :: [tx]
, stateChangeID :: Word64
}
| TransactionReceived {tx :: tx}
| PartySignedSnapshot {snapshot :: Snapshot tx, party :: Party, signature :: Signature (Snapshot tx)}
| SnapshotConfirmed {snapshot :: Snapshot tx, signatures :: MultiSignature (Snapshot tx)}
| HeadClosed {chainState :: ChainStateType tx, contestationDeadline :: UTCTime}
| HeadIsReadyToFanout
| HeadFannedOut {chainState :: ChainStateType tx}
| ChainRolledBack {chainState :: ChainStateType tx}
| TickObserved {chainSlot :: ChainSlot}
| TransactionReceived {tx :: tx, stateChangeID :: Word64}
| PartySignedSnapshot {snapshot :: Snapshot tx, party :: Party, signature :: Signature (Snapshot tx), stateChangeID :: Word64}
| SnapshotConfirmed {snapshot :: Snapshot tx, signatures :: MultiSignature (Snapshot tx), stateChangeID :: Word64}
| HeadClosed {chainState :: ChainStateType tx, contestationDeadline :: UTCTime, stateChangeID :: Word64}
| HeadIsReadyToFanout {stateChangeID :: Word64}
| HeadFannedOut {chainState :: ChainStateType tx, stateChangeID :: Word64}
| ChainRolledBack {chainState :: ChainStateType tx, stateChangeID :: Word64}
| TickObserved {chainSlot :: ChainSlot, stateChangeID :: Word64}
deriving stock (Generic)

getStateChangeID :: StateChanged tx -> Word64
getStateChangeID = \case
HeadInitialized{stateChangeID} -> stateChangeID
CommittedUTxO{stateChangeID} -> stateChangeID
HeadAborted{stateChangeID} -> stateChangeID
HeadOpened{stateChangeID} -> stateChangeID
TransactionAppliedToLocalUTxO{stateChangeID} -> stateChangeID
SnapshotRequestDecided{stateChangeID} -> stateChangeID
SnapshotRequested{stateChangeID} -> stateChangeID
TransactionReceived{stateChangeID} -> stateChangeID
PartySignedSnapshot{stateChangeID} -> stateChangeID
SnapshotConfirmed{stateChangeID} -> stateChangeID
HeadClosed{stateChangeID} -> stateChangeID
HeadIsReadyToFanout{stateChangeID} -> stateChangeID
HeadFannedOut{stateChangeID} -> stateChangeID
ChainRolledBack{stateChangeID} -> stateChangeID
TickObserved{stateChangeID} -> stateChangeID

--FIXME(Elaine): these stateChangeID fields were added in an attempt to make every StateChanged keep track of its ID
-- it's not clear how to handle the state for this. but for now the field is kept so that the type of putEvent' can be kept simple, and shouldn't do harm



instance (IsTx tx, Arbitrary (HeadState tx), Arbitrary (ChainStateType tx)) => Arbitrary (StateChanged tx) where
arbitrary = genericArbitrary

Expand Down
29 changes: 25 additions & 4 deletions hydra-node/src/Hydra/Node.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | Top-level module to run a single Hydra node.
--
Expand All @@ -15,6 +16,7 @@ import Control.Concurrent.Class.MonadSTM (
labelTVarIO,
newTVarIO,
stateTVar,
modifyTVar,
)
import Control.Monad.Trans.Writer (execWriter, tell)
import Hydra.API.Server (Server, sendOutput)
Expand Down Expand Up @@ -193,7 +195,9 @@ stepHydraNode ::
stepHydraNode tracer node = do
e@Queued{eventId, queuedEvent} <- nextEvent eq
traceWith tracer $ BeginEvent{by = party, eventId, event = queuedEvent}
outcome <- atomically (processNextEvent node queuedEvent)
outcome <- atomically $ do
nextStateChangeID <- readTVar . lastStateChangeId $ persistence node-- an event won't necessarily produce a statechange, but if it does, then this'll be its ID
processNextEvent node queuedEvent nextStateChangeID
traceWith tracer (LogicOutcome party outcome)
handleOutcome e outcome
processEffects node tracer eventId outcome
Expand All @@ -202,7 +206,11 @@ stepHydraNode tracer node = do
handleOutcome e = \case
Error _ -> pure ()
Wait _reason -> putEventAfter eq waitDelay (decreaseTTL e)
StateChanged sc -> putEventToSinks eventSinks sc
StateChanged sc -> processNextStateChange node eventSinks sc
-- stateChangedId <- modifyTVar' latestStateChangeId (+1)
-- not all events will produce a statechange
-- FIXME(Elaine): if we are going to use eventId instead of stateChangeId, here is where we'd change it
-- this FIXME noted only so that we consider this again at review time
Effects _ -> pure ()
Combined l r -> handleOutcome e l >> handleOutcome e r

Expand All @@ -226,15 +234,28 @@ processNextEvent ::
IsChainState tx =>
HydraNode tx m ->
Event tx ->
Word64 ->
STM m (Outcome tx)
processNextEvent HydraNode{nodeState, ledger, env} e =
processNextEvent HydraNode{nodeState, ledger, env} e nextStateChangeID = do
modifyHeadState $ \s ->
let outcome = computeOutcome s e
in (outcome, aggregateState s outcome)
where
NodeState{modifyHeadState} = nodeState

computeOutcome = Logic.update env ledger
computeOutcome = Logic.update env ledger nextStateChangeID

processNextStateChange ::
forall m e tx. (Monad m, MonadSTM m, ToJSON (StateChanged tx)) =>
HydraNode tx m ->
NonEmpty (EventSink (StateChanged tx) m) ->
StateChanged tx ->
m ()
processNextStateChange HydraNode{persistence} sinks sc = do
(putEventToSinks @m @(StateChanged tx)) sinks sc
atomically $ modifyTVar (lastStateChangeId persistence) (+1)
--FIXME(Elaine): put this whole thing in a single `atomically` call
-- that should be possible but there's some annoying classy monad transformer types to deal with

processEffects ::
( MonadAsync m
Expand Down
21 changes: 15 additions & 6 deletions hydra-node/src/Hydra/Persistence.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import Data.ByteString.Char8 qualified as C8
import System.Directory (createDirectoryIfMissing, doesFileExist)
import System.FilePath (takeDirectory)
import UnliftIO.IO.File (withBinaryFile, writeBinaryFileDurableAtomic)
import Hydra.HeadLogic.Outcome (getStateChangeID, StateChanged (stateChangeID)) --FIXME(Elaine): move this import to whatever is re-exporting StateChanged in Hydra.Preude

data PersistenceException
= PersistenceException String
Expand Down Expand Up @@ -70,10 +71,10 @@ data NewPersistenceIncremental a m = NewPersistenceIncremental
, lastStateChangeId :: TVar m Word64 -- FIXME(Elaine): name change , persistence captures more than just this
}

putEventToSinks :: (Monad m, ToJSON e) => NonEmpty (EventSink e m) -> e -> m ()
putEventToSinks :: forall m e. (Monad m, ToJSON e) => NonEmpty (EventSink e m) -> e -> m ()
putEventToSinks sinks e = forM_ sinks (\sink -> putEvent' sink e)

putEventsToSinks :: (Monad m, ToJSON e) => NonEmpty (EventSink e m) -> [e] -> m ()
putEventsToSinks :: forall m e. (Monad m, ToJSON e) => NonEmpty (EventSink e m) -> [e] -> m ()
putEventsToSinks sinks es = forM_ es (\e -> putEventToSinks sinks e)

-- FIXME(Elaine): this needs to be the reverse, since we need to keep track of the eventID
Expand All @@ -98,15 +99,15 @@ createNewPersistenceIncremental ::
createNewPersistenceIncremental fp = do
liftIO . createDirectoryIfMissing True $ takeDirectory fp
authorizedThread <- newTVarIO Nothing
lastStateChangeId <- newTVarIO 0
lastStateChangeId <- newTVarIO (0 :: Word64)
-- FIXME(Elaine): eventid too general for this, at least not without writing the eventids to disk, but even then, hacky
-- we'll have a new ID for each statechanged event,
-- i think this is probablyh fine and doesn't need fixing, but wanted to write it down first
-- the eventId here is a monotonically increasing integer, and it lets us keep track of how "far along" we are in the persistence
-- we can use this to skip resubmitting events
-- more complicated solutions would be possible, in particular, rolling hash / merkle chain might be more resilient to corruption
-- but given that persistence is already atomic and only needs to be consistent within a single node, it should suffice
nextId <- newTVarIO 0
nextId <- newTVarIO (0 :: Word64)
let eventSource = EventSource
{ getEvents' = do
tid <- myThreadId
Expand All @@ -128,15 +129,23 @@ createNewPersistenceIncremental fp = do
-- set initial nextId (zero-indexed) based on how many state change events we have
atomically $ do
writeTVar lastStateChangeId $ fromIntegral $ length result
writeTVar nextId $ length result
writeTVar nextId . fromIntegral $ length result

pure result
}
eventSink = EventSink
{ putEvent' = \a -> do
threadId <- myThreadId
isEventNew <- atomically $ do
let outgoingStateChangeId = undefined a -- FIXME(Elaine)
let stateChangeID = (undefined a) :: Word64
-- FIXME(Elaine): we need to put getStateChangeID into a typeclass and add that constraint to a, in the EventSink type
-- or we can have separate versions of this for StateChanged, and for network functionality etc

let outgoingStateChangeId = stateChangeID
-- outgoingStateChangeId <- readTVar $ stateChangeID -- this is the ID of the state change we just got from the node
-- it's not actually written to disk yet until this function is over


id <- readTVar nextId
writeTVar authorizedThread $ Just threadId
modifyTVar' nextId succ
Expand Down

0 comments on commit cd9aa7b

Please sign in to comment.