Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
)
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CsClient
import Ouroboros.Consensus.MiniProtocol.ChainSync.Server
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound (objectDiffusionInbound)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1 (objectDiffusionInbound)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound (objectDiffusionOutbound)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.PerasCert
Expand Down
84 changes: 84 additions & 0 deletions ouroboros-consensus/bench/ObjectDiffusion-bench/Main.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE TypeApplications #-}

-- | This module contains benchmarks for Peras Object diffusion decision logic
-- as implemented by the by the function
-- 'Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Decision.makeDecision'
module Main (main) where

import Control.DeepSeq (NFData (..))
import Control.Exception (evaluate)
import Data.Hashable (Hashable)
import GHC.Generics (Generic)
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Decision qualified as OD
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.TestUtils qualified as OD
import Test.QuickCheck (Arbitrary (..), generate)
import Test.QuickCheck.Gen (vectorOf)
import Test.Tasty.Bench

-- TODO: We will probably want to use the actual types used in vote/cert diffusion,
-- instead of placeholders.
newtype DummyPeerAddr = DummyPeerAddr Int
deriving (Eq, Ord, Generic, NFData)

instance Arbitrary DummyPeerAddr where
arbitrary = DummyPeerAddr <$> arbitrary

newtype DummyObjectId = DummyObjectId Int
deriving (Eq, Ord, Generic, Hashable, NFData)

instance Arbitrary DummyObjectId where
arbitrary = DummyObjectId <$> arbitrary

data DummyObject = DummyObject
{ doId :: DummyObjectId
, doPayload :: ()
}
deriving (Eq, Ord, Generic, Hashable, NFData)

instance Arbitrary DummyObject where
arbitrary = DummyObject <$> arbitrary <*> arbitrary

-- TODO: We should probably use specific policies that are well suited to the
-- number of peers and objects.

main :: IO ()
main =
defaultMain
[ bgroup
"ouroboros-consensus:ObjectDiffusion"
[ bgroup
"VoteDiffusion"
[ env
(genToNF $ vectorOf 1_000 $ OD.genDecisionContext 10 50 doId Nothing)
( \contexts ->
bench "makeDecisions: 1000 decisions with (10 pairs, 50 objects) each" $
nf (fmap makeVoteDiffusionDecisions) contexts
)
, env
(genToNF $ vectorOf 1_000 $ OD.genDecisionContext 100 500 doId Nothing)
( \contexts ->
bench "makeDecisions: 1000 decisions with (100 pairs, 500 objects) each" $
nf (fmap makeVoteDiffusionDecisions) contexts
)
, env
(genToNF $ vectorOf 1_000 $ OD.genDecisionContext 1_000 5_000 doId Nothing)
( \contexts ->
bench "makeDecisions: 1000 decisions with (1000 pairs, 5000 objects) each" $
nf (fmap makeVoteDiffusionDecisions) contexts
)
]
, bgroup "CertDiffusion" []
]
]
where
genToNF gen = do
x <- generate gen
evaluate $ rnf x
pure $! x

makeVoteDiffusionDecisions decisionContext =
OD.makeDecisions @DummyPeerAddr @DummyObjectId @DummyObject decisionContext
25 changes: 24 additions & 1 deletion ouroboros-consensus/ouroboros-consensus.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,13 @@ library
Ouroboros.Consensus.MiniProtocol.LocalStateQuery.Server
Ouroboros.Consensus.MiniProtocol.LocalTxMonitor.Server
Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Decision
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Registry
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.State
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.TestUtils
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Types
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound
Expand Down Expand Up @@ -312,6 +318,7 @@ library

build-depends:
FailT ^>=0.1.2,
QuickCheck,
aeson,
base >=4.14 && <4.22,
base-deriving-via,
Expand Down Expand Up @@ -348,6 +355,8 @@ library
primitive,
psqueues ^>=0.2.3,
quiet ^>=0.2,
random,
random-shuffle,
rawlock ^>=0.1.1,
resource-registry ^>=0.2,
semialign >=1.1,
Expand Down Expand Up @@ -920,6 +929,20 @@ benchmark PerasCertDB-bench
tasty-bench,
unstable-consensus-testlib,

benchmark ObjectDiffusion-bench
import: common-bench
type: exitcode-stdio-1.0
hs-source-dirs: bench/ObjectDiffusion-bench
main-is: Main.hs
other-modules:
build-depends:
QuickCheck,
base,
deepseq,
hashable,
ouroboros-consensus,
tasty-bench,

test-suite doctest
import: common-test
main-is: doctest.hs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
-- way to diffuse objects for Peras, i.e. Peras votes and certificates.
--
-- See the design document here: https://tweag.github.io/cardano-peras/peras-design.pdf
module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound
module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1
( objectDiffusionInbound
, TraceObjectDiffusionInbound (..)
, ObjectDiffusionInboundError (..)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2
( -- * ObjectDiffusion Inbound client
objectDiffusionInbound

-- * PeerStateAPI
, withObjectDiffusionInboundPeer
, PeerStateAPI

-- * Supporting types
, module V2
, PeerDecisionChannelsVar
, newPeerDecisionChannelsVar
, DecisionPolicy (..)
) where

import Control.Concurrent.Class.MonadSTM (MonadSTM, atomically)
import Control.Monad.Class.MonadThrow
import Control.Tracer (Tracer, traceWith)
import Data.List.NonEmpty qualified as NonEmpty
import Data.Set qualified as Set
import Network.TypedProtocol
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Registry
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Types as V2
import Ouroboros.Network.ControlMessage (ControlMessage (..), ControlMessageSTM)
import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound

-- | A object-diffusion inbound side (client).
--
-- The steps are as follow
-- 1. Block on next decision from the decision logic
-- 2. Handle any available reply (`goCollect`)
-- 3. Request new objects if possible (`goReqObjects`)
-- 4. Request new ids (also responsible for ack) (`goReqIds`)
-- 5. Signal psaOnDecisionCompleted (as part of `goReqIds{Blocking,NonBlocking}`)
-- And loop again
--
-- The architecture/code org of this module should make sure we don't go again
-- into `goIdle` until `psaOnDecisionCompleted` has been called
--
-- NOTE: each `go____` function is responsible for calling the next one in order
-- to continue the protocol.
-- E.g. `goReqObjects` will call `goReqIds` whatever the outcome of its logic is.
objectDiffusionInbound ::
forall objectId object m.
( MonadThrow m
, MonadSTM m
) =>
Tracer m (TraceObjectDiffusionInbound objectId object) ->
ControlMessageSTM m ->
PeerStateAPI m objectId object ->
ObjectDiffusionInboundPipelined objectId object m ()
objectDiffusionInbound
tracer
controlMessageSTM
PeerStateAPI
{ psaReadDecision
, psaOnDecisionCompleted
, psaOnRequestIds
, psaOnRequestObjects
, psaOnReceiveIds
, psaOnReceiveObjects
} =
ObjectDiffusionInboundPipelined $ goIdle Zero
where
goIdle :: forall (n :: N). Nat n -> InboundStIdle n objectId object m ()
goIdle n = WithEffect $ do
ctrlMsg <- atomically controlMessageSTM
traceWith tracer $ TraceObjectDiffusionInboundReceivedControlMessage ctrlMsg
case ctrlMsg of
-- The peer selection governor is asking us to terminate the connection.
Terminate ->
pure $ terminateAfterDrain n
-- Otherwise, we can continue the protocol normally.
_continue -> do
-- Block on next decision.
decision <- psaReadDecision
traceWith tracer (TraceObjectDiffusionInboundReceivedDecision decision)
pure $ goCollect n decision

-- \| Block until all replies of pipelined requests have been received, then
-- sends `MsgDone` to terminate the protocol.
terminateAfterDrain ::
Nat n -> InboundStIdle n objectId object m ()
terminateAfterDrain = \case
Zero -> WithEffect $ do
traceWith tracer TraceObjectDiffusionInboundTerminated
pure $ SendMsgDone ()
Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> terminateAfterDrain n

-- \| Handle potential available replies before continuing with `goReqObjects`.
--
-- If there are no pipelined requests, this will directly call `goReqObjects`.
-- If there are pipelined requests, it will collect as many replies as
-- possible before continuing with `goReqObjects` once no more replies are
-- immediately available.
goCollect :: Nat n -> PeerDecision objectId object -> InboundStIdle n objectId object m ()
goCollect Zero decision =
goReqObjects Zero decision
goCollect (Succ n) decision =
CollectPipelined
(Just $ goReqObjects (Succ n) decision)
( \case
CollectObjectIds numIdsRequested ids -> WithEffect $ do
psaOnReceiveIds numIdsRequested ids
pure $ goCollect n decision
CollectObjects _objectIds objects -> WithEffect $ do
-- TODO: We could try to validate objects here, i.e.
-- as early as possible, instead of validating them when adding
-- them to the ObjectPool, in order to pivot away from
-- adversarial peers as soon as possible.
psaOnReceiveObjects objects
pure $ goCollect n decision
)

-- \| Request objects, if the set of ids of objects to request in the
-- decision is non-empty.
-- Regardless, it will ultimately call `goReqIds`.
goReqObjects ::
Nat n ->
PeerDecision objectId object ->
InboundStIdle n objectId object m ()
goReqObjects n decision = do
let objectIds = rodObjectsToReqIds (pdReqObjects decision)
if Set.null objectIds
then
goReqIds n decision
else WithEffect $ do
psaOnRequestObjects objectIds
pure $
SendMsgRequestObjectsPipelined
(Set.toList objectIds)
(goReqIds (Succ n) decision)

-- \| Request objectIds, either in a blocking or pipelined fashion depending
-- on the decision's `ridCanPipelineIdsRequests` flag.
-- In both cases, once done, we will ultimately call `psaOnDecisionCompleted`
-- and return to `goIdle`.
goReqIds ::
forall (n :: N).
Nat n ->
PeerDecision objectId object ->
InboundStIdle n objectId object m ()
goReqIds n decision = do
let canPipelineIdRequests = ridCanPipelineIdsRequests (pdReqIds decision)
if canPipelineIdRequests
then goReqIdsPipelined n decision
else case n of
Zero -> goReqIdsBlocking decision
Succ{} -> error "Impossible to have pipelined requests when we have no known unacknowledged objectIds"

-- \| Request objectIds in a blocking fashion if the number to request in the
-- decision is non-zero.
-- Regardless, it will ultimately call `psaOnDecisionCompleted` and return to
-- `goIdle`.
goReqIdsBlocking ::
PeerDecision objectId object ->
InboundStIdle Z objectId object m ()
goReqIdsBlocking decision = WithEffect $ do
let numIdsToAck = ridNumIdsToAck (pdReqIds decision)
let numIdsToReq = ridNumIdsToReq (pdReqIds decision)
if numIdsToReq == 0
then do
psaOnDecisionCompleted
pure $ goIdle Zero
else do
psaOnRequestIds numIdsToAck numIdsToReq
psaOnDecisionCompleted
pure $
SendMsgRequestObjectIdsBlocking
numIdsToAck
numIdsToReq
( \objectIds -> WithEffect $ do
psaOnReceiveIds numIdsToReq (NonEmpty.toList objectIds)
pure $ goIdle Zero
)

-- \| Request objectIds in a pipelined fashion if the number to request in the
-- decision is non-zero.
-- Regardless, it will ultimately call `psaOnDecisionCompleted` and return to
-- `goIdle`.
goReqIdsPipelined ::
forall (n :: N).
Nat n ->
PeerDecision objectId object ->
InboundStIdle n objectId object m ()
goReqIdsPipelined n decision = WithEffect $ do
let numIdsToAck = ridNumIdsToAck (pdReqIds decision)
let numIdsToReq = ridNumIdsToReq (pdReqIds decision)
if numIdsToReq == 0
then do
psaOnDecisionCompleted
pure $ goIdle n
else do
psaOnRequestIds numIdsToAck numIdsToReq
psaOnDecisionCompleted
pure $
SendMsgRequestObjectIdsPipelined
numIdsToAck
numIdsToReq
(goIdle (Succ n))
Loading
Loading