Skip to content

Commit

Permalink
Add HasEventId class to ensure events can be identified
Browse files Browse the repository at this point in the history
We kept the EventSource/EventSink very abstract to make implementations
not realy on the internas of the actual data type used. However, an
event source / sink will need at least identify individual events to
tell them apart, e.g. to deduplicate them in memory.
  • Loading branch information
ch1bo committed Mar 4, 2024
1 parent f34b83c commit a057ff7
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 114 deletions.
26 changes: 14 additions & 12 deletions hydra-node/src/Hydra/Events.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,23 @@ module Hydra.Events where

import Hydra.Prelude

-- TODO: Need to add a concept of identification to 'e' as used in these
-- definitions to enable deduplication etc.
-- FIXME(Elaine): we have to figure out a better taxonomy/nomenclature for the events/statechange stuff
-- the eventID here is not the same as the eventID in Queued, that one is more fickle and influenced by non state change events
-- this one is only incremented when we have a new state change event
type EventId = Word64

newtype EventSource e m = EventSource {getEvents' :: m [e]}
class HasEventId a where
getEventId :: a -> EventId

newtype EventSink e m = EventSink {putEvent' :: e -> m ()}
instance HasEventId (EventId, a) where
getEventId = fst

putEventToSinks :: Monad m => [EventSink e m] -> e -> m ()
putEventToSinks sinks e = forM_ sinks (\sink -> putEvent' sink e)
newtype EventSource e m = EventSource {getEvents' :: HasEventId e => m [e]}

putEventsToSinks :: Monad m => [EventSink e m] -> [e] -> m ()
putEventsToSinks sinks es = forM_ es (\e -> putEventToSinks sinks e)
newtype EventSink e m = EventSink {putEvent' :: HasEventId e => e -> m ()}

type EventID = Word64
putEventToSinks :: (Monad m, HasEventId e) => [EventSink e m] -> e -> m ()
putEventToSinks sinks e = forM_ sinks (\sink -> putEvent' sink e)

-- FIXME(Elaine): we have to figure out a better taxonomy/nomenclature for the events/statechange stuff
-- the eventID here is not the same as the eventID in Queued, that one is more fickle and influenced by non state change events
-- this one is only incremented when we have a new state change event
putEventsToSinks :: (Monad m, HasEventId e) => [EventSink e m] -> [e] -> m ()
putEventsToSinks sinks es = forM_ es (\e -> putEventToSinks sinks e)
37 changes: 19 additions & 18 deletions hydra-node/src/Hydra/HeadLogic/Outcome.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import Hydra.Prelude
import Hydra.API.ServerOutput (ServerOutput)
import Hydra.Chain (ChainStateType, HeadParameters, IsChainState, PostChainTx)
import Hydra.Crypto (MultiSignature, Signature)
import Hydra.Events (HasEventId (..))
import Hydra.HeadId (HeadId, HeadSeed)
import Hydra.HeadLogic.Error (LogicError)
import Hydra.HeadLogic.State (HeadState)
Expand Down Expand Up @@ -86,24 +87,24 @@ data StateChanged tx
| TickObserved {chainSlot :: ChainSlot, stateChangeID :: Word64}
deriving stock (Generic)

getStateChangeID :: StateChanged tx -> Word64
getStateChangeID = \case
HeadInitialized{stateChangeID} -> stateChangeID
CommittedUTxO{stateChangeID} -> stateChangeID
HeadAborted{stateChangeID} -> stateChangeID
HeadOpened{stateChangeID} -> stateChangeID
TransactionAppliedToLocalUTxO{stateChangeID} -> stateChangeID
SnapshotRequestDecided{stateChangeID} -> stateChangeID
SnapshotRequested{stateChangeID} -> stateChangeID
TransactionReceived{stateChangeID} -> stateChangeID
PartySignedSnapshot{stateChangeID} -> stateChangeID
SnapshotConfirmed{stateChangeID} -> stateChangeID
HeadClosed{stateChangeID} -> stateChangeID
HeadContested{stateChangeID} -> stateChangeID
HeadIsReadyToFanout{stateChangeID} -> stateChangeID
HeadFannedOut{stateChangeID} -> stateChangeID
ChainRolledBack{stateChangeID} -> stateChangeID
TickObserved{stateChangeID} -> stateChangeID
instance HasEventId (StateChanged tx) where
getEventId = \case
HeadInitialized{stateChangeID} -> stateChangeID
CommittedUTxO{stateChangeID} -> stateChangeID
HeadAborted{stateChangeID} -> stateChangeID
HeadOpened{stateChangeID} -> stateChangeID
TransactionAppliedToLocalUTxO{stateChangeID} -> stateChangeID
SnapshotRequestDecided{stateChangeID} -> stateChangeID
SnapshotRequested{stateChangeID} -> stateChangeID
TransactionReceived{stateChangeID} -> stateChangeID
PartySignedSnapshot{stateChangeID} -> stateChangeID
SnapshotConfirmed{stateChangeID} -> stateChangeID
HeadClosed{stateChangeID} -> stateChangeID
HeadContested{stateChangeID} -> stateChangeID
HeadIsReadyToFanout{stateChangeID} -> stateChangeID
HeadFannedOut{stateChangeID} -> stateChangeID
ChainRolledBack{stateChangeID} -> stateChangeID
TickObserved{stateChangeID} -> stateChangeID

-- FIXME(Elaine): these stateChangeID fields were added in an attempt to make every StateChanged keep track of its ID
-- it's not clear how to handle the state for this. but for now the field is kept so that the type of putEvent' can be kept simple, and shouldn't do harm
Expand Down
10 changes: 5 additions & 5 deletions hydra-node/test/Hydra/HeadLogicSnapshotSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import Hydra.HeadLogic (
update,
)
import Hydra.HeadLogic.Outcome (collectEffects)
import Hydra.HeadLogicSpec (getState, getStateAndEventID, inOpenState, inOpenState', runEvents, step)
import Hydra.HeadLogicSpec (getState, getStateAndEventId, inOpenState, inOpenState', runEvents, step)
import Hydra.Ledger (txId)
import Hydra.Ledger.Simple (SimpleTx (..), aValidTx, simpleLedger, utxoRef)
import Hydra.Network.Message (Message (..))
Expand Down Expand Up @@ -118,7 +118,7 @@ spec = do

(actualState, actualEventID) <- runEvents (envFor aliceSk) simpleLedger st stEventID $ do
step $ NetworkEvent defaultTTL alice $ ReqTx tx
getStateAndEventID
getStateAndEventId
actualState `shouldBe` st'
actualEventID `shouldBe` st'EventID

Expand All @@ -131,7 +131,7 @@ spec = do
step (NetworkEvent defaultTTL carol $ ReqTx $ aValidTx 1)
step (ackFrom carolSk carol)
step (ackFrom aliceSk alice)
getStateAndEventID
getStateAndEventId

let outcome = update bobEnv simpleLedger headStateEventID headState $ ackFrom bobSk bob
collectEffects outcome `shouldSatisfy` sendReqSn
Expand All @@ -141,7 +141,7 @@ spec = do
step (NetworkEvent defaultTTL alice $ ReqSn 1 [])
step (ackFrom carolSk carol)
step (ackFrom aliceSk alice)
getStateAndEventID
getStateAndEventId

let outcome = update bobEnv simpleLedger headStateEventID headState $ ackFrom bobSk bob
collectEffects outcome `shouldNotSatisfy` sendReqSn
Expand All @@ -160,7 +160,7 @@ spec = do
step (ackFrom carolSk carol)
newTxBeforeSnapshotAcknowledged
step (ackFrom aliceSk alice)
getStateAndEventID
getStateAndEventId

let everybodyAcknowleged = update notLeaderEnv simpleLedger headStateEventID headState $ ackFrom bobSk bob
collectEffects everybodyAcknowleged `shouldNotSatisfy` sendReqSn
Expand Down
Loading

0 comments on commit a057ff7

Please sign in to comment.