Skip to content

Commit 73da667

Browse files
committed
Track the last time the ChainDB thread was starved
1 parent 48003d5 commit 73da667

File tree

6 files changed

+27
-4
lines changed

6 files changed

+27
-4
lines changed

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ data ChainDbView m blk = ChainDbView {
5252
, getIsFetched :: STM m (Point blk -> Bool)
5353
, getMaxSlotNo :: STM m MaxSlotNo
5454
, addBlockWaitWrittenToDisk :: InvalidBlockPunishment m -> blk -> m Bool
55+
, getLastTimeStarved :: STM m Time
5556
}
5657

5758
defaultChainDbView :: IOLike m => ChainDB m blk -> ChainDbView m blk
@@ -60,6 +61,7 @@ defaultChainDbView chainDB = ChainDbView {
6061
, getIsFetched = ChainDB.getIsFetched chainDB
6162
, getMaxSlotNo = ChainDB.getMaxSlotNo chainDB
6263
, addBlockWaitWrittenToDisk = ChainDB.addBlockWaitWrittenToDisk chainDB
64+
, getLastTimeStarved = ChainDB.getLastTimeStarved chainDB
6365
}
6466

6567
-- | How to get the wall-clock time of a slot. Note that this is a very
@@ -340,3 +342,5 @@ mkBlockFetchConsensusInterface
340342

341343
headerForgeUTCTime = slotForgeTime . headerRealPoint . unFromConsensus
342344
blockForgeUTCTime = slotForgeTime . blockRealPoint . unFromConsensus
345+
346+
lastChainSelStarvation = getLastTimeStarved chainDB

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,10 @@ data ChainDB m blk = ChainDB {
334334
-- invalid block is detected. These blocks are likely to be valid.
335335
, getIsInvalidBlock :: STM m (WithFingerprint (HeaderHash blk -> Maybe (InvalidBlockReason blk)))
336336

337+
-- | The last time we starved the chainsel thread. this is used by the
338+
-- blockfetch decision logic to demote peers.
339+
, getLastTimeStarved :: STM m Time
340+
337341
, closeDB :: m ()
338342

339343
-- | Return 'True' when the database is open.

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
176176
copyFuse <- newFuse "copy to immutable db"
177177
chainSelFuse <- newFuse "chain selection"
178178
chainSelQueue <- newChainSelQueue (Args.cdbsBlocksToAddSize cdbSpecificArgs)
179+
varLastTimeStarved <- newTVarIO =<< getMonotonicTime
179180

180181
let env = CDB { cdbImmutableDB = immutableDB
181182
, cdbVolatileDB = volatileDB
@@ -200,6 +201,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
200201
, cdbChainSelQueue = chainSelQueue
201202
, cdbFutureBlocks = varFutureBlocks
202203
, cdbLoE = Args.cdbsLoE cdbSpecificArgs
204+
, cdbLastTimeStarved = varLastTimeStarved
203205
}
204206
h <- fmap CDBHandle $ newTVarIO $ ChainDbOpen env
205207
let chainDB = API.ChainDB
@@ -217,6 +219,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
217219
, stream = Iterator.stream h
218220
, newFollower = Follower.newFollower h
219221
, getIsInvalidBlock = getEnvSTM h Query.getIsInvalidBlock
222+
, getLastTimeStarved = getEnvSTM h Query.getLastTimeStarved
220223
, closeDB = closeDB h
221224
, isOpen = isOpen h
222225
}

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do
522522
-- exception (or it errored), notify the blocked thread
523523
withFuse fuse $
524524
bracketOnError
525-
(lift $ getChainSelMessage cdbChainSelQueue)
525+
(lift $ getChainSelMessage (writeTVar cdbLastTimeStarved) cdbChainSelQueue)
526526
(\message -> lift $ atomically $ do
527527
case message of
528528
ChainSelReprocessLoEBlocks -> pure ()

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Query (
2121
, getAnyBlockComponent
2222
, getAnyKnownBlock
2323
, getAnyKnownBlockComponent
24+
, getLastTimeStarved
2425
) where
2526

2627
import qualified Data.Map.Strict as Map
@@ -148,6 +149,9 @@ getIsInvalidBlock ::
148149
getIsInvalidBlock CDB{..} =
149150
fmap (fmap (fmap invalidBlockReason) . flip Map.lookup) <$> readTVar cdbInvalid
150151

152+
getLastTimeStarved :: forall m blk. IOLike m => ChainDbEnv m blk -> STM m Time
153+
getLastTimeStarved CDB{..} = readTVar cdbLastTimeStarved
154+
151155
getIsValid ::
152156
forall m blk. (IOLike m, HasHeader blk)
153157
=> ChainDbEnv m blk

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
6363
, TraceValidationEvent (..)
6464
) where
6565

66+
import Cardano.Prelude (whenM)
6667
import Control.Tracer
6768
import Data.Foldable (traverse_)
6869
import Data.Map.Strict (Map)
@@ -275,6 +276,9 @@ data ChainDbEnv m blk = CDB
275276
-- switch back to a chain containing it. The fragment is usually anchored at
276277
-- a recent immutable tip; if it does not, it will conservatively be treated
277278
-- as the empty fragment anchored in the current immutable tip.
279+
, cdbLastTimeStarved :: !(StrictTVar m Time)
280+
-- ^ The last time we starved the ChainSel thread. This is used by the
281+
-- BlockFetch decision logic to demote peers.
278282
} deriving (Generic)
279283

280284
-- | We include @blk@ in 'showTypeOf' because it helps resolving type families
@@ -509,9 +513,13 @@ addReprocessLoEBlocks tracer (ChainSelQueue queue) = do
509513
atomically $ writeTBQueue queue ChainSelReprocessLoEBlocks
510514

511515
-- | Get the oldest message from the 'ChainSelQueue' queue. Can block when the
512-
-- queue is empty.
513-
getChainSelMessage :: IOLike m => ChainSelQueue m blk -> m (ChainSelMessage m blk)
514-
getChainSelMessage (ChainSelQueue queue) = atomically $ readTBQueue queue
516+
-- queue is empty; in that case, reports the current time to the given callback.
517+
getChainSelMessage :: IOLike m => (Time -> STM m ()) -> ChainSelQueue m blk -> m (ChainSelMessage m blk)
518+
getChainSelMessage whenEmpty (ChainSelQueue queue) = do
519+
time <- getMonotonicTime
520+
-- NOTE: The two following lines are in different `atomically` on purpose.
521+
atomically $ whenM (isEmptyTBQueue queue) (whenEmpty time)
522+
atomically $ readTBQueue queue
515523

516524
-- | Flush the 'ChainSelQueue' queue and notify the waiting threads.
517525
--

0 commit comments

Comments
 (0)