Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
cardenaso11 committed Feb 15, 2024
1 parent cd9aa7b commit adc3e7f
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 119 deletions.
4 changes: 2 additions & 2 deletions hydra-node/src/Hydra/API/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import Hydra.Chain.Direct.State ()
import Hydra.Logging (Tracer, traceWith)
import Hydra.Network (IP, PortNumber)
import Hydra.Party (Party)
import Hydra.Persistence (NewPersistenceIncremental(..), EventSource(..), putEventToSinks)
import Hydra.Persistence (EventSource (..), NewPersistenceIncremental (..), putEventToSinks)
import Network.Wai.Handler.Warp (
defaultSettings,
runSettings,
Expand Down Expand Up @@ -68,7 +68,7 @@ withAPIServer ::
Chain tx IO ->
PParams LedgerEra ->
ServerComponent tx IO ()
withAPIServer host port party NewPersistenceIncremental{eventSource=EventSource{getEvents'}, eventSinks = sinks} tracer chain pparams callback action =
withAPIServer host port party NewPersistenceIncremental{eventSource = EventSource{getEvents'}, eventSinks = sinks} tracer chain pparams callback action =
handle onIOException $ do
responseChannel <- newBroadcastTChanIO
timedOutputEvents <- getEvents'
Expand Down
4 changes: 1 addition & 3 deletions hydra-node/src/Hydra/HeadLogic/Outcome.hs
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,9 @@ getStateChangeID = \case
ChainRolledBack{stateChangeID} -> stateChangeID
TickObserved{stateChangeID} -> stateChangeID

--FIXME(Elaine): these stateChangeID fields were added in an attempt to make every StateChanged keep track of its ID
-- FIXME(Elaine): these stateChangeID fields were added in an attempt to make every StateChanged keep track of its ID
-- it's not clear how to handle the state for this. but for now the field is kept so that the type of putEvent' can be kept simple, and shouldn't do harm



instance (IsTx tx, Arbitrary (HeadState tx), Arbitrary (ChainStateType tx)) => Arbitrary (StateChanged tx) where
arbitrary = genericArbitrary

Expand Down
4 changes: 2 additions & 2 deletions hydra-node/src/Hydra/Network/Reliability.hs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ import Hydra.Network (Network (..), NetworkComponent)
import Hydra.Network.Authenticate (Authenticated (..))
import Hydra.Network.Heartbeat (Heartbeat (..), isPing)
import Hydra.Party (Party)
import Hydra.Persistence (Persistence (..), PersistenceIncremental (..), NewPersistenceIncremental (..), EventSource(..), EventSink(..), putEventToSinks)
import Hydra.Persistence (EventSink (..), EventSource (..), NewPersistenceIncremental (..), Persistence (..), PersistenceIncremental (..), putEventToSinks)
import Test.QuickCheck (getPositive, listOf)

data ReliableMsg msg = ReliableMsg
Expand Down Expand Up @@ -199,7 +199,7 @@ mkMessagePersistence numberOfParties NewPersistenceIncremental{eventSource, even
, saveAcks = \acks -> do
save ackPersistence acks
, loadMessages = do
getEvents' eventSource
getEvents' eventSource
, appendMessage = \msg -> do
putEventToSinks eventSinks msg
}
Expand Down
22 changes: 12 additions & 10 deletions hydra-node/src/Hydra/Node.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE UndecidableInstances #-}

-- | Top-level module to run a single Hydra node.
--
Expand All @@ -14,9 +14,9 @@ import Hydra.Prelude
import Control.Concurrent.Class.MonadSTM (
MonadLabelledSTM,
labelTVarIO,
modifyTVar,
newTVarIO,
stateTVar,
modifyTVar,
)
import Control.Monad.Trans.Writer (execWriter, tell)
import Hydra.API.Server (Server, sendOutput)
Expand Down Expand Up @@ -56,7 +56,7 @@ import Hydra.Node.EventQueue (EventQueue (..), Queued (..))
import Hydra.Node.ParameterMismatch (ParamMismatch (..), ParameterMismatch (..))
import Hydra.Options (ChainConfig (..), DirectChainConfig (..), RunOptions (..), defaultContestationPeriod)
import Hydra.Party (Party (..), deriveParty)
import Hydra.Persistence (PersistenceIncremental (..), NewPersistenceIncremental(..), EventSource(..), EventSink(..), putEventToSinks, putEventsToSinks, NewPersistenceIncremental)
import Hydra.Persistence (EventSink (..), EventSource (..), NewPersistenceIncremental (..), PersistenceIncremental (..), putEventToSinks, putEventsToSinks)

-- * Environment Handling

Expand Down Expand Up @@ -148,7 +148,7 @@ data HydraNode tx m = HydraNode
-- , latestStateChangeId :: TVar m Word64
-- , eventSource :: EventSource (StateChanged tx) m
-- , eventSinks :: NonEmpty (EventSink (StateChanged tx) m)
--FIXME(Elaine): bundle eventSource,Sinks, latestStateChangeId into a single type for convenience?
-- FIXME(Elaine): bundle eventSource,Sinks, latestStateChangeId into a single type for convenience?
-- they should still definitely be separable too
}

Expand Down Expand Up @@ -196,7 +196,7 @@ stepHydraNode tracer node = do
e@Queued{eventId, queuedEvent} <- nextEvent eq
traceWith tracer $ BeginEvent{by = party, eventId, event = queuedEvent}
outcome <- atomically $ do
nextStateChangeID <- readTVar . lastStateChangeId $ persistence node-- an event won't necessarily produce a statechange, but if it does, then this'll be its ID
nextStateChangeID <- readTVar . lastStateChangeId $ persistence node -- an event won't necessarily produce a statechange, but if it does, then this'll be its ID
processNextEvent node queuedEvent nextStateChangeID
traceWith tracer (LogicOutcome party outcome)
handleOutcome e outcome
Expand Down Expand Up @@ -246,16 +246,18 @@ processNextEvent HydraNode{nodeState, ledger, env} e nextStateChangeID = do
computeOutcome = Logic.update env ledger nextStateChangeID

processNextStateChange ::
forall m e tx. (Monad m, MonadSTM m, ToJSON (StateChanged tx)) =>
forall m e tx.
(Monad m, MonadSTM m, ToJSON (StateChanged tx)) =>
HydraNode tx m ->
NonEmpty (EventSink (StateChanged tx) m) ->
StateChanged tx ->
m ()
processNextStateChange HydraNode{persistence} sinks sc = do
(putEventToSinks @m @(StateChanged tx)) sinks sc
atomically $ modifyTVar (lastStateChangeId persistence) (+1)
--FIXME(Elaine): put this whole thing in a single `atomically` call
-- that should be possible but there's some annoying classy monad transformer types to deal with
atomically $ modifyTVar (lastStateChangeId persistence) (+ 1)

-- FIXME(Elaine): put this whole thing in a single `atomically` call
-- that should be possible but there's some annoying classy monad transformer types to deal with

processEffects ::
( MonadAsync m
Expand Down Expand Up @@ -335,4 +337,4 @@ loadStateEventSource tracer eventSource eventSinks defaultChainState = do
Just sinks' -> putEventsToSinks sinks' events
pure (headState, chainStateHistory)
where
initialState = Idle IdleState{chainState = defaultChainState}
initialState = Idle IdleState{chainState = defaultChainState}
4 changes: 2 additions & 2 deletions hydra-node/src/Hydra/Node/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ import Hydra.Network.Reliability (MessagePersistence, ReliableMsg, mkMessagePers
import Hydra.Node (HydraNodeLog (..))
import Hydra.Node.ParameterMismatch (ParamMismatch (..), ParameterMismatch (..))
import Hydra.Party (Party, deriveParty)
import Hydra.Persistence (Persistence (..), createPersistence, createPersistenceIncremental, createNewPersistenceIncremental)
import Hydra.Persistence (Persistence (..), createNewPersistenceIncremental, createPersistence, createPersistenceIncremental)
import System.FilePath ((</>))

-- | An alias for logging messages output by network component.
Expand Down Expand Up @@ -150,7 +150,7 @@ configureMessagePersistence ::
m (MessagePersistence m msg)
configureMessagePersistence tracer persistenceDir numberOfParties = do
msgPersistence <- createNewPersistenceIncremental $ storedMessagesFile persistenceDir
--NOTE(Elaine): after deliberation, new persistence should be fine for network messages
-- NOTE(Elaine): after deliberation, new persistence should be fine for network messages
ackPersistence@Persistence{load} <- createPersistence $ acksFile persistenceDir
mAcks <- load
ackPersistence' <- case fmap (\acks -> length acks == numberOfParties) mAcks of
Expand Down
9 changes: 5 additions & 4 deletions hydra-node/src/Hydra/Node/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ import Hydra.Node (
createNodeState,
initEnvironment,
loadState,
runHydraNode, loadStateEventSource,
loadStateEventSource,
runHydraNode,
)
import Hydra.Node.EventQueue (EventQueue (..), createEventQueue)
import Hydra.Node.Network (NetworkConfiguration (..), withNetwork)
Expand All @@ -49,9 +50,9 @@ import Hydra.Options (
RunOptions (..),
validateRunOptions,
)
import Hydra.Persistence (NewPersistenceIncremental (..), createPersistenceIncremental, eventPairFromPersistenceIncremental, createNewPersistenceIncremental)
import Hydra.Persistence (NewPersistenceIncremental (..), createNewPersistenceIncremental, createPersistenceIncremental, eventPairFromPersistenceIncremental)

import qualified Data.List.NonEmpty as NE
import Data.List.NonEmpty qualified as NE

data ConfigurationException
= ConfigurationException ProtocolParametersConversionError
Expand Down Expand Up @@ -83,7 +84,7 @@ run opts = do

withCardanoLedger pparams globals $ \ledger -> do
-- persistence <- createPersistenceIncremental $ persistenceDir <> "/state"
--TODO(Elaine): remove in favor of eventSource/Sink directly
-- TODO(Elaine): remove in favor of eventSource/Sink directly
-- (eventSource, eventSink) <- createEventPairIncremental $ persistenceDir <> "/state"

-- let -- (eventSource, eventSink) = eventPairFromPersistenceIncremental persistence
Expand Down
133 changes: 68 additions & 65 deletions hydra-node/src/Hydra/Persistence.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ module Hydra.Persistence where

import Hydra.Prelude

import Control.Concurrent.Class.MonadSTM (newTVarIO, throwSTM, writeTVar, MonadSTM (modifyTVar'))
import Control.Concurrent.Class.MonadSTM (MonadSTM (modifyTVar'), newTVarIO, throwSTM, writeTVar)
import Control.Monad.Class.MonadFork (myThreadId)
import Data.Aeson qualified as Aeson
import Data.ByteString qualified as BS
import Data.ByteString.Char8 qualified as C8
import Hydra.HeadLogic.Outcome (StateChanged (stateChangeID), getStateChangeID) -- FIXME(Elaine): move this import to whatever is re-exporting StateChanged in Hydra.Preude
import System.Directory (createDirectoryIfMissing, doesFileExist)
import System.FilePath (takeDirectory)
import UnliftIO.IO.File (withBinaryFile, writeBinaryFileDurableAtomic)
import Hydra.HeadLogic.Outcome (getStateChangeID, StateChanged (stateChangeID)) --FIXME(Elaine): move this import to whatever is re-exporting StateChanged in Hydra.Preude

data PersistenceException
= PersistenceException String
Expand Down Expand Up @@ -50,21 +50,23 @@ createPersistence fp = do
Right a -> pure (Just a)
}

data EventSource e m = EventSource { getEvents' :: FromJSON e => m [e] }
data EventSink e m = EventSink { putEvent' :: ToJSON e => e -> m () }
--FIXME(Elaine): we have to figure out a better taxonomy/nomenclature for the events/statechange stuff
data EventSource e m = EventSource {getEvents' :: FromJSON e => m [e]}
data EventSink e m = EventSink {putEvent' :: ToJSON e => e -> m ()}

-- FIXME(Elaine): we have to figure out a better taxonomy/nomenclature for the events/statechange stuff
-- the eventID here is not the same as the eventID in Queued, that one is more fickle and influenced by non state change events
-- this one is only incremented when we have a new state change event

--FIXME(Elaine): primary createPersistenceIncremental is in Run.hs, that's swapped now
-- FIXME(Elaine): primary createPersistenceIncremental is in Run.hs, that's swapped now
-- but replacing PersistenceIncremental outside of that, for network messages ex, seems like it should happen after, to not break too much at once

-- | Handle to save incrementally and load files to/from disk using JSON encoding.
data PersistenceIncremental a m = PersistenceIncremental
{ append :: ToJSON a => a -> m ()
, loadAll :: FromJSON a => m [a]
}

--FIXME(Elaine): rename this, just taking the name of PersistenceIncremental once thats fully removed might suffice
-- FIXME(Elaine): rename this, just taking the name of PersistenceIncremental once thats fully removed might suffice
data NewPersistenceIncremental a m = NewPersistenceIncremental
{ eventSource :: EventSource a m
, eventSinks :: NonEmpty (EventSink a m)
Expand All @@ -78,12 +80,12 @@ putEventsToSinks :: forall m e. (Monad m, ToJSON e) => NonEmpty (EventSink e m)
putEventsToSinks sinks es = forM_ es (\e -> putEventToSinks sinks e)

-- FIXME(Elaine): this needs to be the reverse, since we need to keep track of the eventID
--FIXME(Elaine): neither this nor the opposite direction can handle re-submission properly without keeping track of the state separately in step/hydranode &
-- FIXME(Elaine): neither this nor the opposite direction can handle re-submission properly without keeping track of the state separately in step/hydranode &
-- so this means removing the old persistence for the purpose of statechanged events is more urgent
eventPairFromPersistenceIncremental :: PersistenceIncremental a m -> (EventSource a m, EventSink a m)
eventPairFromPersistenceIncremental PersistenceIncremental{append, loadAll} =
let eventSource = EventSource {getEvents' = loadAll}
eventSink = EventSink {putEvent' = append}
let eventSource = EventSource{getEvents' = loadAll}
eventSink = EventSink{putEvent' = append}
in (eventSource, eventSink)

-- persistenceIncrementalFromEventPair :: (Monad m, ToJSON a, FromJSON a) => (EventSource a m, EventSink a m) -> PersistenceIncremental a m
Expand All @@ -101,68 +103,69 @@ createNewPersistenceIncremental fp = do
authorizedThread <- newTVarIO Nothing
lastStateChangeId <- newTVarIO (0 :: Word64)
-- FIXME(Elaine): eventid too general for this, at least not without writing the eventids to disk, but even then, hacky
-- we'll have a new ID for each statechanged event,
-- we'll have a new ID for each statechanged event,
-- i think this is probablyh fine and doesn't need fixing, but wanted to write it down first
-- the eventId here is a monotonically increasing integer, and it lets us keep track of how "far along" we are in the persistence
-- we can use this to skip resubmitting events
-- more complicated solutions would be possible, in particular, rolling hash / merkle chain might be more resilient to corruption
-- but given that persistence is already atomic and only needs to be consistent within a single node, it should suffice
nextId <- newTVarIO (0 :: Word64)
let eventSource = EventSource
{ getEvents' = do
tid <- myThreadId
atomically $ do
authTid <- readTVar authorizedThread
when (isJust authTid && authTid /= Just tid) $
throwSTM (IncorrectAccessException $ "Trying to load persisted data in " <> fp <> " from different thread")

liftIO (doesFileExist fp) >>= \case
False -> pure []
True -> do
bs <- readFileBS fp
-- NOTE: We require the whole file to be loadable. It might
-- happen that the data written by 'append' is only there
-- partially and then this will fail (which we accept now).
result <- case forM (C8.lines bs) Aeson.eitherDecodeStrict' of
Left e -> throwIO $ PersistenceException e
Right decoded -> pure decoded
-- set initial nextId (zero-indexed) based on how many state change events we have
atomically $ do
writeTVar lastStateChangeId $ fromIntegral $ length result
writeTVar nextId . fromIntegral $ length result

pure result
}
eventSink = EventSink
{ putEvent' = \a -> do
threadId <- myThreadId
isEventNew <- atomically $ do
let stateChangeID = (undefined a) :: Word64
-- FIXME(Elaine): we need to put getStateChangeID into a typeclass and add that constraint to a, in the EventSink type
-- or we can have separate versions of this for StateChanged, and for network functionality etc

let outgoingStateChangeId = stateChangeID
-- outgoingStateChangeId <- readTVar $ stateChangeID -- this is the ID of the state change we just got from the node
-- it's not actually written to disk yet until this function is over


id <- readTVar nextId
writeTVar authorizedThread $ Just threadId
modifyTVar' nextId succ
pure $ outgoingStateChangeId `compare` id
let bytes = toStrict $ Aeson.encode a <> "\n"
case isEventNew of
-- event already persisted
LT -> pure ()
-- event is as new as expected
EQ -> liftIO $ withBinaryFile fp AppendMode (`BS.hPut` bytes)
-- event is newer than expected,
GT -> do
liftIO $ putStrLn "ELAINE: this shouldn't happen with my current understanding of stuff"
liftIO $ withBinaryFile fp AppendMode (`BS.hPut` bytes) -- FIXME(Elaine): maybe error ? shouldnt really happen
}
let eventSource =
EventSource
{ getEvents' = do
tid <- myThreadId
atomically $ do
authTid <- readTVar authorizedThread
when (isJust authTid && authTid /= Just tid) $
throwSTM (IncorrectAccessException $ "Trying to load persisted data in " <> fp <> " from different thread")

liftIO (doesFileExist fp) >>= \case
False -> pure []
True -> do
bs <- readFileBS fp
-- NOTE: We require the whole file to be loadable. It might
-- happen that the data written by 'append' is only there
-- partially and then this will fail (which we accept now).
result <- case forM (C8.lines bs) Aeson.eitherDecodeStrict' of
Left e -> throwIO $ PersistenceException e
Right decoded -> pure decoded
-- set initial nextId (zero-indexed) based on how many state change events we have
atomically $ do
writeTVar lastStateChangeId $ fromIntegral $ length result
writeTVar nextId . fromIntegral $ length result

pure result
}
eventSink =
EventSink
{ putEvent' = \a -> do
threadId <- myThreadId
isEventNew <- atomically $ do
let stateChangeID = (undefined a) :: Word64
-- FIXME(Elaine): we need to put getStateChangeID into a typeclass and add that constraint to a, in the EventSink type
-- or we can have separate versions of this for StateChanged, and for network functionality etc

let outgoingStateChangeId = stateChangeID
-- outgoingStateChangeId <- readTVar $ stateChangeID -- this is the ID of the state change we just got from the node
-- it's not actually written to disk yet until this function is over

id <- readTVar nextId
writeTVar authorizedThread $ Just threadId
modifyTVar' nextId succ
pure $ outgoingStateChangeId `compare` id
let bytes = toStrict $ Aeson.encode a <> "\n"
case isEventNew of
-- event already persisted
LT -> pure ()
-- event is as new as expected
EQ -> liftIO $ withBinaryFile fp AppendMode (`BS.hPut` bytes)
-- event is newer than expected,
GT -> do
liftIO $ putStrLn "ELAINE: this shouldn't happen with my current understanding of stuff"
liftIO $ withBinaryFile fp AppendMode (`BS.hPut` bytes) -- FIXME(Elaine): maybe error ? shouldnt really happen
}
eventSinks = eventSink :| []
pure NewPersistenceIncremental { eventSource, eventSinks, lastStateChangeId}
pure NewPersistenceIncremental{eventSource, eventSinks, lastStateChangeId}

-- | Initialize persistence handle for given type 'a' at given file path.
--
Expand Down
Loading

0 comments on commit adc3e7f

Please sign in to comment.