Skip to content

Commit

Permalink
Merge pull request #1389 from input-output-hk/lc/distinguish-inbound-…
Browse files Browse the repository at this point in the history
…outbound

Allow network functions to distinguish between inbound and outbound m…
  • Loading branch information
locallycompact authored Apr 12, 2024
2 parents d944019 + b3b1e07 commit b554d3d
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 35 deletions.
9 changes: 5 additions & 4 deletions hydra-node/src/Hydra/Network/Authenticate.hs
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,19 @@ instance FromCBOR msg => FromCBOR (Signed msg) where
-- Only verified messages are pushed downstream to the internal network for the
-- node to consume and process. Non-verified messages get discarded.
withAuthentication ::
( SignableRepresentation msg
, ToJSON msg
( SignableRepresentation inbound
, ToJSON inbound
, SignableRepresentation outbound
) =>
Tracer m AuthLog ->
-- The party signing key
SigningKey HydraKey ->
-- Other party members
[Party] ->
-- The underlying raw network.
NetworkComponent m (Signed msg) (Signed msg) a ->
NetworkComponent m (Signed inbound) (Signed outbound) a ->
-- The node internal authenticated network.
NetworkComponent m (Authenticated msg) msg a
NetworkComponent m (Authenticated inbound) outbound a
withAuthentication tracer signingKey parties withRawNetwork callback action = do
withRawNetwork checkSignature authenticate
where
Expand Down
22 changes: 11 additions & 11 deletions hydra-node/src/Hydra/Network/Heartbeat.hs
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ withHeartbeat ::
-- | Callback listening to peers' status change as computed by the `withIncomingHeartbeat` layer.
ConnectionMessages m ->
-- | Underlying `NetworkComponent` for sending and consuming `Heartbeat` messages.
NetworkComponent m (Heartbeat msg1) (Heartbeat msg) a ->
NetworkComponent m (Heartbeat inbound) (Heartbeat outbound) a ->
-- | Returns a network component that can be used to send and consume arbitrary messages.
-- This layer will take care of peeling out/wrapping messages into `Heartbeat`s.
NetworkComponent m msg1 msg a
NetworkComponent m inbound outbound a
withHeartbeat nodeId connectionMessages withNetwork =
withIncomingHeartbeat connectionMessages $
withOutgoingHeartbeat nodeId withNetwork
Expand All @@ -107,8 +107,8 @@ withIncomingHeartbeat ::
ConnectionMessages m ->
-- | Underlying `NetworkComponent`.
-- We only care about the fact it notifies us with `Heartbeat` messages.
NetworkComponent m (Heartbeat msg1) msg a ->
NetworkComponent m msg1 msg a
NetworkComponent m (Heartbeat inbound) outbound a ->
NetworkComponent m inbound outbound a
withIncomingHeartbeat connectionMessages withNetwork callback action = do
heartbeat <- newTVarIO initialHeartbeatState
withNetwork (updateStateFromIncomingMessages heartbeat connectionMessages callback) $ \network ->
Expand All @@ -119,8 +119,8 @@ updateStateFromIncomingMessages ::
(MonadSTM m, MonadMonotonicTime m) =>
TVar m HeartbeatState ->
ConnectionMessages m ->
NetworkCallback msg m ->
NetworkCallback (Heartbeat msg) m
NetworkCallback inbound m ->
NetworkCallback (Heartbeat inbound) m
updateStateFromIncomingMessages heartbeatState connectionMessages callback = \case
Data nodeId msg -> notifyAlive nodeId >> callback msg
Ping nodeId -> notifyAlive nodeId
Expand All @@ -143,8 +143,8 @@ withOutgoingHeartbeat ::
NodeId ->
-- | Underlying `NetworkComponent`.
-- We only care about the fact it allows us to broadcast `Heartbeat` messages.
NetworkComponent m msg1 (Heartbeat msg) a ->
NetworkComponent m msg1 msg a
NetworkComponent m inbound (Heartbeat outbound) a ->
NetworkComponent m inbound outbound a
withOutgoingHeartbeat nodeId withNetwork callback action = do
lastSent <- newTVarIO Nothing
withNetwork callback $ \network ->
Expand All @@ -155,8 +155,8 @@ updateStateFromOutgoingMessages ::
(MonadSTM m, MonadMonotonicTime m) =>
NodeId ->
TVar m (Maybe Time) ->
Network m (Heartbeat msg) ->
Network m msg
Network m (Heartbeat outbound) ->
Network m outbound
updateStateFromOutgoingMessages nodeId lastSent Network{broadcast} =
Network $ \msg -> do
now <- getMonotonicTime
Expand All @@ -172,7 +172,7 @@ checkHeartbeatState ::
) =>
NodeId ->
TVar m (Maybe Time) ->
Network m (Heartbeat msg) ->
Network m (Heartbeat outbound) ->
m ()
checkHeartbeatState nodeId lastSent Network{broadcast} =
forever $ do
Expand Down
17 changes: 9 additions & 8 deletions hydra-node/src/Hydra/Network/Ouroboros.hs
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,13 @@ import Ouroboros.Network.Subscription.Ip (SubscriptionParams (..), WithIPList (W
import Ouroboros.Network.Subscription.Worker (LocalAddresses (LocalAddresses))

withOuroborosNetwork ::
forall msg.
(ToCBOR msg, FromCBOR msg) =>
Tracer IO (WithHost (TraceOuroborosNetwork msg)) ->
forall inbound outbound.
(ToCBOR outbound, FromCBOR outbound) =>
(ToCBOR inbound, FromCBOR inbound) =>
Tracer IO (WithHost (TraceOuroborosNetwork outbound)) ->
Host ->
[Host] ->
NetworkComponent IO msg msg ()
NetworkComponent IO inbound outbound ()
withOuroborosNetwork tracer localHost remoteHosts networkCallback between = do
bchan <- newBroadcastTChanIO
let newBroadcastChannel = atomically $ dupTChan bchan
Expand Down Expand Up @@ -225,7 +226,7 @@ withOuroborosNetwork tracer localHost remoteHosts networkCallback between = do
}

hydraClient ::
TChan msg ->
TChan outbound ->
OuroborosApplicationWithMinimalCtx 'InitiatorMode addr LByteString IO () Void
hydraClient chan =
OuroborosApplication
Expand Down Expand Up @@ -264,14 +265,14 @@ withOuroborosNetwork tracer localHost remoteHosts networkCallback between = do
MiniProtocolLimits{maximumIngressQueue = maxBound}

client ::
TChan msg ->
FireForgetClient msg IO ()
TChan outbound ->
FireForgetClient outbound IO ()
client chan =
Idle $ do
atomically (readTChan chan) <&> \msg ->
SendMsg msg (pure $ client chan)

server :: FireForgetServer msg IO ()
server :: FireForgetServer inbound IO ()
server =
FireForgetServer
{ recvMsg = \msg -> networkCallback msg $> server
Expand Down
6 changes: 3 additions & 3 deletions hydra-node/src/Hydra/Network/Reliability.hs
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,14 @@ withReliability ::
-- | Tracer for logging messages.
Tracer m ReliabilityLog ->
-- | Our persistence handle
MessagePersistence m msg ->
MessagePersistence m outbound ->
-- | Our own party identifier.
Party ->
-- | Other parties' identifiers.
[Party] ->
-- | Underlying network component providing consuming and sending channels.
NetworkComponent m (Authenticated (ReliableMsg (Heartbeat msg))) (ReliableMsg (Heartbeat msg)) a ->
NetworkComponent m (Authenticated (Heartbeat msg)) (Heartbeat msg) a
NetworkComponent m (Authenticated (ReliableMsg (Heartbeat inbound))) (ReliableMsg (Heartbeat outbound)) a ->
NetworkComponent m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a
withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loadMessages} me otherParties withRawNetwork callback action = do
acksCache <- loadAcks >>= newTVarIO
sentMessages <- loadMessages >>= newTVarIO . Seq.fromList
Expand Down
8 changes: 5 additions & 3 deletions hydra-node/src/Hydra/Node/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ data NetworkConfiguration m = NetworkConfiguration

-- | Starts the network layer of a node, passing configured `Network` to its continuation.
withNetwork ::
forall msg tx.
(ToCBOR msg, ToJSON msg, FromJSON msg, FromCBOR msg) =>
-- | Tracer to use for logging messages.
Tracer IO (LogEntry tx msg) ->
Expand All @@ -125,7 +126,8 @@ withNetwork tracer connectionMessages configuration callback action = do
numberOfParties = length $ me : otherParties
messagePersistence <- configureMessagePersistence (contramap Node tracer) persistenceDir numberOfParties

let reliability =
let reliability :: NetworkComponent IO (Heartbeat (Authenticated msg)) (Heartbeat msg) ()
reliability =
withFlipHeartbeats $
withReliability (contramap Reliability tracer) messagePersistence me otherParties $
withAuthentication (contramap Authentication tracer) signingKey otherParties $
Expand Down Expand Up @@ -161,8 +163,8 @@ configureMessagePersistence tracer persistenceDir numberOfParties = do
pure $ mkMessagePersistence numberOfParties msgPersistence ackPersistence'

withFlipHeartbeats ::
NetworkComponent m (Authenticated (Heartbeat msg)) msg1 a ->
NetworkComponent m (Heartbeat (Authenticated msg)) msg1 a
NetworkComponent m (Authenticated (Heartbeat inbound)) outbound a ->
NetworkComponent m (Heartbeat (Authenticated inbound)) outbound a
withFlipHeartbeats withBaseNetwork callback =
withBaseNetwork unwrapHeartbeats
where
Expand Down
34 changes: 28 additions & 6 deletions hydra-node/test/Hydra/Network/AuthenticateSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ spec = parallel $ do
receivedMessages <- newTVarIO []

withAuthentication
@(Message SimpleTx)
@(Message SimpleTx)
nullTracer
aliceSk
[bob]
Expand All @@ -53,6 +55,8 @@ spec = parallel $ do
receivedMessages <- newTVarIO []

withAuthentication
@(Message SimpleTx)
@(Message SimpleTx)
nullTracer
aliceSk
[bob]
Expand All @@ -73,6 +77,8 @@ spec = parallel $ do
receivedMessages <- newTVarIO []

withAuthentication
@(Message SimpleTx)
@(Message SimpleTx)
nullTracer
aliceSk
[bob, carol]
Expand All @@ -92,10 +98,18 @@ spec = parallel $ do
sentMsgs = runSimOrThrow $ do
sentMessages <- newTVarIO []

withAuthentication nullTracer bobSk [] (captureOutgoing sentMessages) noop $ \Network{broadcast} -> do
threadDelay 0.6
broadcast someMessage
threadDelay 1
withAuthentication
@(Message SimpleTx)
@(Message SimpleTx)
nullTracer
bobSk
[]
(captureOutgoing sentMessages)
noop
$ \Network{broadcast} -> do
threadDelay 0.6
broadcast someMessage
threadDelay 1

readTVarIO sentMessages

Expand All @@ -108,8 +122,16 @@ spec = parallel $ do
traces <- newTVarIO []

let tracer = traceInTVar traces "AuthenticateSpec"
withAuthentication tracer aliceSk [bob, carol] (\incoming _ -> incoming signedMsg) noop $ \_ ->
threadDelay 1
withAuthentication
@(Message SimpleTx)
@(Message SimpleTx)
tracer
aliceSk
[bob, carol]
(\incoming _ -> incoming signedMsg)
noop
$ \_ ->
threadDelay 1

readTVarIO traces

Expand Down

0 comments on commit b554d3d

Please sign in to comment.