Skip to content

Commit 980f608

Browse files
Merge pull request #1396 from input-output-hk/refactor-connectivity
Refactor connectivity
2 parents 4bbbe28 + d4a0b58 commit 980f608

File tree

18 files changed

+293
-187
lines changed

18 files changed

+293
-187
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ changes.
1515

1616
- _DEPRECATED_ the `GetUTxO` client input and `GetUTxOResponse` server output. Use `GET /snapshot/utxo` instead.
1717

18+
- `hydra-node` logs will now report `NetworkEvents` to distinguish between `ConnectivityEvent`s and `ReceivedMessage`s on the network.
19+
1820
## [0.17.0] - UNRELEASED
1921

2022
- **BREAKING** `hydra-node` `/commit` enpoint now also accepts a _blueprint/draft_

hydra-node/json-schemas/logs.yaml

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1578,20 +1578,17 @@ definitions:
15781578
required:
15791579
- tag
15801580
- ttl
1581-
- message
1582-
- party
1581+
- networkEvent
15831582
description: >-
1584-
Input representing some message received from peers on the network.
1583+
Input representing some network event message received from peers on the network.
15851584
properties:
15861585
tag:
15871586
type: string
15881587
enum: ["NetworkInput"]
1589-
message:
1590-
$ref: "logs.yaml#/definitions/Message"
1588+
networkEvent:
1589+
$ref: "logs.yaml#/definitions/NetworkEvent"
15911590
ttl:
15921591
type: number
1593-
party:
1594-
$ref: "api.yaml#/components/schemas/Party"
15951592
- title: ChainInput
15961593
type: object
15971594
additionalProperties: false
@@ -1608,6 +1605,62 @@ definitions:
16081605
chainEvent:
16091606
$ref: "logs.yaml#/definitions/ChainEvent"
16101607

1608+
NetworkEvent:
1609+
oneOf:
1610+
- title: ConnectivityEvent
1611+
type: object
1612+
additionalProperties: false
1613+
required:
1614+
- tag
1615+
- contents
1616+
properties:
1617+
tag:
1618+
type: string
1619+
enum: ["ConnectivityEvent"]
1620+
contents:
1621+
oneOf:
1622+
- title: Connected
1623+
type: object
1624+
additionalProperties: false
1625+
required:
1626+
- tag
1627+
- nodeId
1628+
properties:
1629+
tag:
1630+
type: string
1631+
enum: ["Connected"]
1632+
nodeId:
1633+
type: string
1634+
1635+
- title: Disconnected
1636+
type: object
1637+
additionalProperties: false
1638+
required:
1639+
- tag
1640+
- nodeId
1641+
properties:
1642+
tag:
1643+
type: string
1644+
enum: ["Disconnected"]
1645+
nodeId:
1646+
type: string
1647+
1648+
- title: ReceivedMessage
1649+
type: object
1650+
additionalProperties: false
1651+
required:
1652+
- tag
1653+
- sender
1654+
- msg
1655+
properties:
1656+
tag:
1657+
type: string
1658+
enum: ["ReceivedMessage"]
1659+
sender:
1660+
$ref: "api.yaml#/components/schemas/Party"
1661+
msg:
1662+
$ref: "logs.yaml#/definitions/Message"
1663+
16111664
Message:
16121665
description: >-
16131666
Messages exchanged by Hydra network peers over a broadcasting network.

hydra-node/src/Hydra/HeadLogic.hs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,21 @@ import Hydra.Ledger (
8888
applyTransactions,
8989
txId,
9090
)
91-
import Hydra.Network.Message (Message (..))
91+
import Hydra.Network.Message (Connectivity (..), Message (..), NetworkEvent (..))
9292
import Hydra.OnChainId (OnChainId)
9393
import Hydra.Party (Party (vkey))
9494
import Hydra.Snapshot (ConfirmedSnapshot (..), Snapshot (..), SnapshotNumber, getSnapshot)
9595

9696
defaultTTL :: TTL
9797
defaultTTL = 5
9898

99+
onConnectionEvent :: Connectivity -> Outcome tx
100+
onConnectionEvent = \case
101+
Connected{nodeId} ->
102+
causes [ClientEffect (ServerOutput.PeerConnected nodeId)]
103+
Disconnected{nodeId} ->
104+
causes [ClientEffect (ServerOutput.PeerDisconnected nodeId)]
105+
99106
-- * The Coordinated Head protocol
100107

101108
-- ** On-Chain Protocol
@@ -691,6 +698,8 @@ update ::
691698
Input tx ->
692699
Outcome tx
693700
update env ledger st ev = case (st, ev) of
701+
(_, NetworkInput _ (ConnectivityEvent conn)) ->
702+
onConnectionEvent conn
694703
(Idle _, ClientInput Init) ->
695704
onIdleClientInit env
696705
(Idle _, ChainInput Observation{observedTx = OnInitTx{headId, headSeed, headParameters, participants}, newChainState}) ->
@@ -713,14 +722,14 @@ update env ledger st ev = case (st, ev) of
713722
onOpenClientClose openState
714723
(Open{}, ClientInput (NewTx tx)) ->
715724
onOpenClientNewTx tx
716-
(Open openState, NetworkInput ttl _ (ReqTx tx)) ->
725+
(Open openState, NetworkInput ttl (ReceivedMessage{msg = ReqTx tx})) ->
717726
onOpenNetworkReqTx env ledger openState ttl tx
718-
(Open openState, NetworkInput _ otherParty (ReqSn sn txIds)) ->
727+
(Open openState, NetworkInput _ (ReceivedMessage{sender, msg = ReqSn sn txIds})) ->
719728
-- XXX: ttl == 0 not handled for ReqSn
720-
onOpenNetworkReqSn env ledger openState otherParty sn txIds
721-
(Open openState, NetworkInput _ otherParty (AckSn snapshotSignature sn)) ->
729+
onOpenNetworkReqSn env ledger openState sender sn txIds
730+
(Open openState, NetworkInput _ (ReceivedMessage{sender, msg = AckSn snapshotSignature sn})) ->
722731
-- XXX: ttl == 0 not handled for AckSn
723-
onOpenNetworkAckSn env openState otherParty snapshotSignature sn
732+
onOpenNetworkAckSn env openState sender snapshotSignature sn
724733
( Open openState@OpenState{headId = ourHeadId}
725734
, ChainInput Observation{observedTx = OnCloseTx{headId, snapshotNumber = closedSnapshotNumber, contestationDeadline}, newChainState}
726735
)

hydra-node/src/Hydra/HeadLogic/Input.hs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ import Hydra.Prelude
44

55
import Hydra.API.ClientInput (ClientInput)
66
import Hydra.Chain (ChainEvent, IsChainState)
7-
import Hydra.Network.Message (Message)
8-
import Hydra.Party (Party)
7+
import Hydra.Network.Message (Message, NetworkEvent)
98

109
type TTL = Natural
1110

@@ -19,7 +18,7 @@ data Input tx
1918
--
2019
-- * `ttl` is a simple counter that's decreased every time the event is
2120
-- reenqueued due to a wait. It's default value is `defaultTTL`
22-
NetworkInput {ttl :: TTL, party :: Party, message :: Message tx}
21+
NetworkInput {ttl :: TTL, networkEvent :: NetworkEvent (Message tx)}
2322
| -- | Input received from the chain via a "Hydra.Chain".
2423
ChainInput {chainEvent :: ChainEvent tx}
2524
deriving stock (Generic)

hydra-node/src/Hydra/Logging/Monitoring.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import Hydra.HeadLogic (
2525
import Hydra.Ledger (IsTx (TxIdType), txId)
2626
import Hydra.Logging.Messages (HydraLog (..))
2727
import Hydra.Network (PortNumber)
28-
import Hydra.Network.Message (Message (ReqTx))
28+
import Hydra.Network.Message (Message (ReqTx), NetworkEvent (..))
2929
import Hydra.Node (HydraNodeLog (BeginEffect, BeginInput, EndInput, input))
3030
import Hydra.Snapshot (Snapshot (confirmed))
3131
import System.Metrics.Prometheus.Http.Scrape (serveMetrics)
@@ -89,7 +89,7 @@ monitor ::
8989
HydraLog tx net ->
9090
m ()
9191
monitor transactionsMap metricsMap = \case
92-
(Node BeginInput{input = NetworkInput _ _ (ReqTx tx)}) -> do
92+
(Node BeginInput{input = NetworkInput _ (ReceivedMessage{msg = ReqTx tx})}) -> do
9393
t <- getMonotonicTime
9494
-- NOTE: If a requested transaction never gets confirmed, it might stick
9595
-- forever in the transactions map which could lead to unbounded growth and

hydra-node/src/Hydra/Network/Heartbeat.hs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,6 @@ heartbeatDelay = 0.5
7878
livenessDelay :: DiffTime
7979
livenessDelay = 3
8080

81-
type ConnectionMessages m = Connectivity -> m ()
82-
8381
-- | Wrap a lower-level `NetworkComponent` and handle sending/receiving of heartbeats.
8482
--
8583
-- Note that the type of consumed and sent messages can be different.
@@ -89,35 +87,34 @@ withHeartbeat ::
8987
) =>
9088
-- | This node's id, used to identify `Heartbeat` messages broadcast to peers.
9189
NodeId ->
92-
-- | Callback listening to peers' status change as computed by the `withIncomingHeartbeat` layer.
93-
ConnectionMessages m ->
9490
-- | Underlying `NetworkComponent` for sending and consuming `Heartbeat` messages.
9591
NetworkComponent m (Heartbeat inbound) (Heartbeat outbound) a ->
9692
-- | Returns a network component that can be used to send and consume arbitrary messages.
9793
-- This layer will take care of peeling out/wrapping messages into `Heartbeat`s.
98-
NetworkComponent m inbound outbound a
99-
withHeartbeat nodeId connectionMessages withNetwork callback action = do
94+
NetworkComponent m (Either Connectivity inbound) outbound a
95+
withHeartbeat nodeId withNetwork callback action = do
10096
heartbeat <- newTVarIO initialHeartbeatState
10197
lastSent <- newTVarIO Nothing
102-
withNetwork (updateStateFromIncomingMessages heartbeat connectionMessages callback) $ \network ->
103-
withAsync (checkRemoteParties heartbeat connectionMessages) $ \_ ->
98+
withNetwork (updateStateFromIncomingMessages heartbeat callback) $ \network ->
99+
withAsync (checkRemoteParties heartbeat onConnectivityChanged) $ \_ ->
104100
withAsync (checkHeartbeatState nodeId lastSent network) $ \_ ->
105101
action (updateStateFromOutgoingMessages nodeId lastSent network)
102+
where
103+
onConnectivityChanged = callback . Left
106104

107105
updateStateFromIncomingMessages ::
108106
(MonadSTM m, MonadMonotonicTime m) =>
109107
TVar m HeartbeatState ->
110-
ConnectionMessages m ->
111-
NetworkCallback inbound m ->
108+
NetworkCallback (Either Connectivity inbound) m ->
112109
NetworkCallback (Heartbeat inbound) m
113-
updateStateFromIncomingMessages heartbeatState connectionMessages callback = \case
114-
Data nodeId msg -> notifyAlive nodeId >> callback msg
110+
updateStateFromIncomingMessages heartbeatState callback = \case
111+
Data nodeId msg -> notifyAlive nodeId >> callback (Right msg)
115112
Ping nodeId -> notifyAlive nodeId
116113
where
117114
notifyAlive peer = do
118115
now <- getMonotonicTime
119116
aliveSet <- alive <$> readTVarIO heartbeatState
120-
unless (peer `Map.member` aliveSet) $ connectionMessages (Connected peer)
117+
unless (peer `Map.member` aliveSet) $ callback (Left $ Connected peer)
121118
atomically $
122119
modifyTVar' heartbeatState $ \s ->
123120
s
@@ -165,14 +162,14 @@ checkRemoteParties ::
165162
, MonadSTM m
166163
) =>
167164
TVar m HeartbeatState ->
168-
ConnectionMessages m ->
165+
(Connectivity -> m ()) ->
169166
m ()
170-
checkRemoteParties heartbeatState connectionMessages =
167+
checkRemoteParties heartbeatState onConnectivity =
171168
forever $ do
172169
threadDelay (heartbeatDelay * 2)
173170
now <- getMonotonicTime
174171
updateSuspected heartbeatState now
175-
>>= mapM_ (connectionMessages . Disconnected)
172+
>>= mapM_ (onConnectivity . Disconnected)
176173

177174
updateSuspected :: MonadSTM m => TVar m HeartbeatState -> Time -> m (Set NodeId)
178175
updateSuspected heartbeatState now =

hydra-node/src/Hydra/Network/Message.hs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,27 @@ import Cardano.Crypto.Util (SignableRepresentation, getSignableRepresentation)
99
import Hydra.Crypto (Signature)
1010
import Hydra.Ledger (IsTx (TxIdType), UTxOType)
1111
import Hydra.Network (NodeId)
12+
import Hydra.Party (Party)
1213
import Hydra.Snapshot (Snapshot, SnapshotNumber)
1314

15+
data NetworkEvent msg
16+
= ConnectivityEvent Connectivity
17+
| ReceivedMessage {sender :: Party, msg :: msg}
18+
deriving stock (Eq, Show, Generic)
19+
deriving anyclass (ToJSON, FromJSON)
20+
21+
instance Arbitrary msg => Arbitrary (NetworkEvent msg) where
22+
arbitrary = genericArbitrary
23+
1424
data Connectivity
1525
= Connected {nodeId :: NodeId}
1626
| Disconnected {nodeId :: NodeId}
1727
deriving stock (Generic, Eq, Show)
1828
deriving anyclass (ToJSON, FromJSON)
1929

30+
instance Arbitrary Connectivity where
31+
arbitrary = genericArbitrary
32+
2033
data Message tx
2134
= ReqTx {transaction :: tx}
2235
| ReqSn {snapshotNumber :: SnapshotNumber, transactionIds :: [TxIdType tx]}

hydra-node/src/Hydra/Node.hs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,7 @@ import Hydra.HeadLogic.State (getHeadParameters)
5252
import Hydra.Ledger (Ledger)
5353
import Hydra.Logging (Tracer, traceWith)
5454
import Hydra.Network (Network (..))
55-
import Hydra.Network.Authenticate (Authenticated (..))
56-
import Hydra.Network.Message (Message)
55+
import Hydra.Network.Message (Message, NetworkEvent (..))
5756
import Hydra.Node.InputQueue (InputQueue (..), Queued (..), createInputQueue)
5857
import Hydra.Node.ParameterMismatch (ParamMismatch (..), ParameterMismatch (..))
5958
import Hydra.Options (ChainConfig (..), DirectChainConfig (..), RunOptions (..), defaultContestationPeriod)
@@ -202,9 +201,8 @@ wireClientInput node = enqueue . ClientInput
202201
where
203202
DraftHydraNode{inputQueue = InputQueue{enqueue}} = node
204203

205-
wireNetworkInput :: DraftHydraNode tx m -> (Authenticated (Message tx) -> m ())
206-
wireNetworkInput node (Authenticated msg otherParty) =
207-
enqueue $ NetworkInput defaultTTL otherParty msg
204+
wireNetworkInput :: DraftHydraNode tx m -> NetworkEvent (Message tx) -> m ()
205+
wireNetworkInput node = enqueue . NetworkInput defaultTTL
208206
where
209207
DraftHydraNode{inputQueue = InputQueue{enqueue}} = node
210208

@@ -275,8 +273,8 @@ stepHydraNode node = do
275273
where
276274
maybeReenqueue q@Queued{queuedId, queuedItem} =
277275
case queuedItem of
278-
NetworkInput ttl aParty msg
279-
| ttl > 0 -> reenqueue waitDelay q{queuedItem = NetworkInput (ttl - 1) aParty msg}
276+
NetworkInput ttl msg
277+
| ttl > 0 -> reenqueue waitDelay q{queuedItem = NetworkInput (ttl - 1) msg}
280278
_ -> traceWith tracer $ DroppedFromQueue{inputId = queuedId, input = queuedItem}
281279

282280
Environment{party} = env
@@ -331,7 +329,7 @@ processEffects node tracer inputId effects = do
331329
traceWith tracer $ BeginEffect party inputId effectId effect
332330
case effect of
333331
ClientEffect i -> sendOutput server i
334-
NetworkEffect msg -> broadcast hn msg >> enqueue (NetworkInput defaultTTL party msg)
332+
NetworkEffect msg -> broadcast hn msg >> enqueue (NetworkInput defaultTTL (ReceivedMessage{sender = party, msg}))
335333
OnChainEffect{postChainTx} ->
336334
postTx postChainTx
337335
`catch` \(postTxError :: PostTxError tx) ->

hydra-node/src/Hydra/Node/Network.hs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,13 @@ import Hydra.Prelude hiding (fromList, replicate)
7373

7474
import Control.Tracer (Tracer)
7575
import Hydra.Crypto (HydraKey, SigningKey)
76+
import Hydra.Ledger (IsTx)
7677
import Hydra.Logging (traceWith)
7778
import Hydra.Logging.Messages (HydraLog (..))
7879
import Hydra.Network (Host (..), IP, NetworkComponent, NodeId, PortNumber)
79-
import Hydra.Network.Authenticate (Authenticated (Authenticated), Signed, withAuthentication)
80-
import Hydra.Network.Heartbeat (ConnectionMessages, Heartbeat (..), withHeartbeat)
80+
import Hydra.Network.Authenticate (Authenticated (..), Signed, withAuthentication)
81+
import Hydra.Network.Heartbeat (Heartbeat (..), withHeartbeat)
82+
import Hydra.Network.Message (Connectivity, Message, NetworkEvent (..))
8183
import Hydra.Network.Ouroboros (TraceOuroborosNetwork, WithHost, withOuroborosNetwork)
8284
import Hydra.Network.Reliability (MessagePersistence, ReliableMsg, mkMessagePersistence, withReliability)
8385
import Hydra.Node (HydraNodeLog (..))
@@ -110,34 +112,36 @@ data NetworkConfiguration m = NetworkConfiguration
110112

111113
-- | Starts the network layer of a node, passing configured `Network` to its continuation.
112114
withNetwork ::
113-
forall msg tx.
114-
(ToCBOR msg, ToJSON msg, FromJSON msg, FromCBOR msg) =>
115+
forall tx.
116+
IsTx tx =>
115117
-- | Tracer to use for logging messages.
116-
Tracer IO (LogEntry tx msg) ->
117-
-- | Callback/observer for connectivity changes in peers.
118-
ConnectionMessages IO ->
118+
Tracer IO (LogEntry tx (Message tx)) ->
119119
-- | The network configuration
120120
NetworkConfiguration IO ->
121121
-- | Produces a `NetworkComponent` that can send `msg` and consumes `Authenticated` @msg@.
122-
NetworkComponent IO (Authenticated msg) msg ()
123-
withNetwork tracer connectionMessages configuration callback action = do
122+
NetworkComponent IO (NetworkEvent (Message tx)) (Message tx) ()
123+
withNetwork tracer configuration callback action = do
124124
let localhost = Host{hostname = show host, port}
125125
me = deriveParty signingKey
126126
numberOfParties = length $ me : otherParties
127127
messagePersistence <- configureMessagePersistence (contramap Node tracer) persistenceDir numberOfParties
128128

129-
let reliability :: NetworkComponent IO (Heartbeat (Authenticated msg)) (Heartbeat msg) ()
130-
reliability =
129+
let reliability =
131130
withFlipHeartbeats $
132131
withReliability (contramap Reliability tracer) messagePersistence me otherParties $
133132
withAuthentication (contramap Authentication tracer) signingKey otherParties $
134133
withOuroborosNetwork (contramap Network tracer) localhost peers
135134

136-
withHeartbeat nodeId connectionMessages reliability callback $ \network ->
135+
withHeartbeat nodeId reliability (callback . mapHeartbeat) $ \network ->
137136
action network
138137
where
139138
NetworkConfiguration{persistenceDir, signingKey, otherParties, host, port, peers, nodeId} = configuration
140139

140+
mapHeartbeat :: Either Connectivity (Authenticated (Message tx)) -> NetworkEvent (Message tx)
141+
mapHeartbeat = \case
142+
Left connectivity -> ConnectivityEvent connectivity
143+
Right (Authenticated{payload, party}) -> ReceivedMessage{sender = party, msg = payload}
144+
141145
-- | Create `MessagePersistence` handle to be used by `Reliability` network layer.
142146
--
143147
-- This function will `throw` a `ParameterMismatch` exception if:

0 commit comments

Comments
 (0)