Skip to content

Commit bab4a6a

Browse files
committed
Change how last starvation is recorded
Previously, we only recorded the time at which starvation started. This is not enough: if the peer is still leaving us starved after the grace period has elapsed, we won't detect it. Instead, we now record the last starvation as either ongoing or as having ended at a given time.
1 parent 33e98b5 commit bab4a6a

File tree

7 files changed

+54
-25
lines changed

7 files changed

+54
-25
lines changed

ouroboros-consensus-diffusion/test/consensus-test/Test/Consensus/PeerSimulator/Trace.hs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ import Ouroboros.Network.AnchoredFragment (AnchoredFragment,
4848
headPoint)
4949
import qualified Ouroboros.Network.AnchoredFragment as AF
5050
import Ouroboros.Network.Block (SlotNo (SlotNo), Tip, castPoint)
51+
import Ouroboros.Network.BlockFetch.ConsensusInterface
52+
(ChainSelStarvation (..))
5153
import Test.Consensus.PointSchedule.NodeState (NodeState)
5254
import Test.Consensus.PointSchedule.Peers (Peer (Peer), PeerId)
5355
import Test.Util.TersePrinting (terseAnchor, terseBlock,
@@ -369,6 +371,8 @@ traceChainDBEventTestBlockWith tracer = \case
369371
AddedReprocessLoEBlocksToQueue ->
370372
trace $ "Requested ChainSel run"
371373
_ -> pure ()
374+
ChainDB.TraceChainSelStarvation ChainSelStarvationOngoing -> trace "ChainSel starved"
375+
ChainDB.TraceChainSelStarvation (ChainSelStarvationEndedAt time) -> trace $ "ChainSel starvation ended at " ++ prettyTime time
372376
_ -> pure ()
373377
where
374378
trace = traceUnitWith tracer "ChainDB"

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
4242
import qualified Ouroboros.Network.AnchoredFragment as AF
4343
import Ouroboros.Network.Block (MaxSlotNo)
4444
import Ouroboros.Network.BlockFetch.ConsensusInterface
45-
(BlockFetchConsensusInterface (..), FetchMode (..),
45+
(BlockFetchConsensusInterface (..),
46+
ChainSelStarvation (..), FetchMode (..),
4647
FromConsensus (..), WhetherReceivingTentativeBlocks (..))
4748
import Ouroboros.Network.PeerSelection.Bootstrap (UseBootstrapPeers,
4849
requiresBootstrapPeers)
@@ -56,7 +57,7 @@ data ChainDbView m blk = ChainDbView {
5657
, getIsFetched :: STM m (Point blk -> Bool)
5758
, getMaxSlotNo :: STM m MaxSlotNo
5859
, addBlockWaitWrittenToDisk :: InvalidBlockPunishment m -> blk -> m Bool
59-
, getLastTimeStarved :: STM m Time
60+
, getChainSelStarvation :: STM m ChainSelStarvation
6061
}
6162

6263
defaultChainDbView :: IOLike m => ChainDB m blk -> ChainDbView m blk
@@ -65,7 +66,7 @@ defaultChainDbView chainDB = ChainDbView {
6566
, getIsFetched = ChainDB.getIsFetched chainDB
6667
, getMaxSlotNo = ChainDB.getMaxSlotNo chainDB
6768
, addBlockWaitWrittenToDisk = ChainDB.addBlockWaitWrittenToDisk chainDB
68-
, getLastTimeStarved = ChainDB.getLastTimeStarved chainDB
69+
, getChainSelStarvation = ChainDB.getChainSelStarvation chainDB
6970
}
7071

7172
-- | How to get the wall-clock time of a slot. Note that this is a very
@@ -351,7 +352,7 @@ mkBlockFetchConsensusInterface
351352
headerForgeUTCTime = slotForgeTime . headerRealPoint . unFromConsensus
352353
blockForgeUTCTime = slotForgeTime . blockRealPoint . unFromConsensus
353354

354-
lastChainSelStarvation = getLastTimeStarved chainDB
355+
readChainSelStarvation = getChainSelStarvation chainDB
355356

356357
demoteCSJDynamo :: peer -> m ()
357358
demoteCSJDynamo = void . atomically . Jumping.rotateDynamo csHandlesCol

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ import qualified Ouroboros.Network.AnchoredFragment as AF
9191
import Ouroboros.Network.Block (ChainUpdate, MaxSlotNo,
9292
Serialised (..))
9393
import qualified Ouroboros.Network.Block as Network
94+
import Ouroboros.Network.BlockFetch.ConsensusInterface
95+
(ChainSelStarvation (..))
9496
import Ouroboros.Network.Mock.Chain (Chain (..))
9597
import qualified Ouroboros.Network.Mock.Chain as Chain
9698
import System.FS.API.Types (FsError)
@@ -334,9 +336,9 @@ data ChainDB m blk = ChainDB {
334336
-- invalid block is detected. These blocks are likely to be valid.
335337
, getIsInvalidBlock :: STM m (WithFingerprint (HeaderHash blk -> Maybe (InvalidBlockReason blk)))
336338

337-
-- | The last time we starved the chainsel thread. this is used by the
338-
-- blockfetch decision logic to demote peers.
339-
, getLastTimeStarved :: STM m Time
339+
-- | Whether ChainSel is currently starved, or the last time it
340+
-- stopped being starved.
341+
, getChainSelStarvation :: STM m ChainSelStarvation
340342

341343
, closeDB :: m ()
342344

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ import Ouroboros.Consensus.Util.ResourceRegistry (WithTempRegistry,
6868
import Ouroboros.Consensus.Util.STM (Fingerprint (..),
6969
WithFingerprint (..))
7070
import qualified Ouroboros.Network.AnchoredFragment as AF
71+
import Ouroboros.Network.BlockFetch.ConsensusInterface
72+
(ChainSelStarvation (..))
7173

7274
{-------------------------------------------------------------------------------
7375
Initialization
@@ -176,7 +178,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
176178
copyFuse <- newFuse "copy to immutable db"
177179
chainSelFuse <- newFuse "chain selection"
178180
chainSelQueue <- newChainSelQueue (Args.cdbsBlocksToAddSize cdbSpecificArgs)
179-
varLastTimeStarved <- newTVarIO =<< getMonotonicTime
181+
varChainSelStarvation <- newTVarIO ChainSelStarvationOngoing
180182

181183
let env = CDB { cdbImmutableDB = immutableDB
182184
, cdbVolatileDB = volatileDB
@@ -201,7 +203,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
201203
, cdbChainSelQueue = chainSelQueue
202204
, cdbFutureBlocks = varFutureBlocks
203205
, cdbLoE = Args.cdbsLoE cdbSpecificArgs
204-
, cdbLastTimeStarved = varLastTimeStarved
206+
, cdbChainSelStarvation = varChainSelStarvation
205207
}
206208
h <- fmap CDBHandle $ newTVarIO $ ChainDbOpen env
207209
let chainDB = API.ChainDB
@@ -219,7 +221,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
219221
, stream = Iterator.stream h
220222
, newFollower = Follower.newFollower h
221223
, getIsInvalidBlock = getEnvSTM h Query.getIsInvalidBlock
222-
, getLastTimeStarved = getEnvSTM h Query.getLastTimeStarved
224+
, getChainSelStarvation = getEnvSTM h Query.getChainSelStarvation
223225
, closeDB = closeDB h
224226
, isOpen = isOpen h
225227
}

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do
522522
-- exception (or it errored), notify the blocked thread
523523
withFuse fuse $
524524
bracketOnError
525-
(lift $ getChainSelMessage (writeTVar cdbLastTimeStarved) cdbChainSelQueue)
525+
(lift $ getChainSelMessage reportChainSelStarvation cdbChainSelQueue)
526526
(\message -> lift $ atomically $ do
527527
case message of
528528
ChainSelReprocessLoEBlocks -> pure ()
@@ -541,3 +541,7 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do
541541
trace $ PoppedBlockFromQueue $ FallingEdgeWith $
542542
blockRealPoint blockToAdd
543543
chainSelSync cdb message)
544+
where
545+
reportChainSelStarvation s = do
546+
traceWith cdbTracer $ TraceChainSelStarvation s
547+
atomically $ writeTVar cdbChainSelStarvation s

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Query (
2121
, getAnyBlockComponent
2222
, getAnyKnownBlock
2323
, getAnyKnownBlockComponent
24-
, getLastTimeStarved
24+
, getChainSelStarvation
2525
) where
2626

2727
import qualified Data.Map.Strict as Map
@@ -43,6 +43,8 @@ import Ouroboros.Consensus.Util.STM (WithFingerprint (..))
4343
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
4444
import qualified Ouroboros.Network.AnchoredFragment as AF
4545
import Ouroboros.Network.Block (MaxSlotNo, maxSlotNoFromWithOrigin)
46+
import Ouroboros.Network.BlockFetch.ConsensusInterface
47+
(ChainSelStarvation (..))
4648

4749
-- | Return the last @k@ headers.
4850
--
@@ -149,8 +151,11 @@ getIsInvalidBlock ::
149151
getIsInvalidBlock CDB{..} =
150152
fmap (fmap (fmap invalidBlockReason) . flip Map.lookup) <$> readTVar cdbInvalid
151153

152-
getLastTimeStarved :: forall m blk. IOLike m => ChainDbEnv m blk -> STM m Time
153-
getLastTimeStarved CDB{..} = readTVar cdbLastTimeStarved
154+
getChainSelStarvation ::
155+
forall m blk. IOLike m
156+
=> ChainDbEnv m blk
157+
-> STM m ChainSelStarvation
158+
getChainSelStarvation CDB {..} = readTVar cdbChainSelStarvation
154159

155160
getIsValid ::
156161
forall m blk. (IOLike m, HasHeader blk)

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
6363
, TraceValidationEvent (..)
6464
) where
6565

66-
import Cardano.Prelude (whenM)
66+
import Control.Monad (when)
6767
import Control.Tracer
6868
import Data.Foldable (traverse_)
6969
import Data.Map.Strict (Map)
@@ -108,6 +108,8 @@ import Ouroboros.Consensus.Util.ResourceRegistry
108108
import Ouroboros.Consensus.Util.STM (WithFingerprint)
109109
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
110110
import Ouroboros.Network.Block (MaxSlotNo)
111+
import Ouroboros.Network.BlockFetch.ConsensusInterface
112+
(ChainSelStarvation (..))
111113

112114
-- | All the serialisation related constraints needed by the ChainDB.
113115
class ( ImmutableDbSerialiseConstraints blk
@@ -276,9 +278,9 @@ data ChainDbEnv m blk = CDB
276278
-- switch back to a chain containing it. The fragment is usually anchored at
277279
-- a recent immutable tip; if it does not, it will conservatively be treated
278280
-- as the empty fragment anchored in the current immutable tip.
279-
, cdbLastTimeStarved :: !(StrictTVar m Time)
280-
-- ^ The last time we starved the ChainSel thread. This is used by the
281-
-- BlockFetch decision logic to demote peers.
281+
, cdbChainSelStarvation :: !(StrictTVar m ChainSelStarvation)
282+
-- ^ Information on the last starvation of ChainSel, whether ongoing or
283+
-- ended recently.
282284
} deriving (Generic)
283285

284286
-- | We include @blk@ in 'showTypeOf' because it helps resolving type families
@@ -513,13 +515,21 @@ addReprocessLoEBlocks tracer (ChainSelQueue queue) = do
513515
atomically $ writeTBQueue queue ChainSelReprocessLoEBlocks
514516

515517
-- | Get the oldest message from the 'ChainSelQueue' queue. Can block when the
516-
-- queue is empty; in that case, reports the current time to the given callback.
517-
getChainSelMessage :: IOLike m => (Time -> STM m ()) -> ChainSelQueue m blk -> m (ChainSelMessage m blk)
518-
getChainSelMessage whenEmpty (ChainSelQueue queue) = do
519-
time <- getMonotonicTime
520-
-- NOTE: The two following lines are in different `atomically` on purpose.
521-
atomically $ whenM (isEmptyTBQueue queue) (whenEmpty time)
522-
atomically $ readTBQueue queue
518+
-- queue is empty; in that case, reports the starvation (and its end) to the
519+
-- callback.
520+
getChainSelMessage
521+
:: IOLike m
522+
=> (ChainSelStarvation -> m ())
523+
-> ChainSelQueue m blk
524+
-> m (ChainSelMessage m blk)
525+
getChainSelMessage report (ChainSelQueue queue) = do
526+
-- NOTE: The test of emptiness and the blocking read are in different STM
527+
-- transactions on purpose, so that starvation is reported before blocking.
528+
starved <- atomically $ isEmptyTBQueue queue
529+
when starved $ report ChainSelStarvationOngoing
530+
message <- atomically $ readTBQueue queue
531+
when starved $ report =<< ChainSelStarvationEndedAt <$> getMonotonicTime
532+
return message
523533

524534
-- | Flush the 'ChainSelQueue' queue and notify the waiting threads.
525535
--
@@ -552,6 +562,7 @@ data TraceEvent blk
552562
| TraceLedgerReplayEvent (LgrDB.TraceReplayEvent blk)
553563
| TraceImmutableDBEvent (ImmutableDB.TraceEvent blk)
554564
| TraceVolatileDBEvent (VolatileDB.TraceEvent blk)
565+
| TraceChainSelStarvation ChainSelStarvation
555566
deriving (Generic)
556567

557568

0 commit comments

Comments
 (0)