Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calculate and compare CRC when writing and reading ledger snapshots #1319

Draft
wants to merge 20 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
### Breaking

- When writing a ledger state snapshot to disk, calculate the state's CRC checksum and write it to a separate file, which is named the same as the snapshot file, plus the `.checksum` extension.
- When reading a snapshot file, calculate its checksum and compare it to the value in the corresponding `.checksum` file. Return an error if either the checksum file does not exist or the checksum is different or invalid.
geo2a marked this conversation as resolved.
Show resolved Hide resolved
- To support the previous item, change the error type of the `readSnapshot` from `ReadIncrementalErr` to the extended `ReadSnaphotErr`.

- Make `Ouroboros.Consensus.Util.CBOR.readIncremental` compute the checksum of the data as it is read.

### Non-breaking

- Introduce an explicit `Ord` instance for `DiskSnapshot` that compares the values on `dsNumber`.

Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
Expand Down Expand Up @@ -37,10 +39,15 @@ import Codec.Serialise.Encoding (Encoding)
import Control.Monad (forM, void)
import Control.Monad.Except (ExceptT (..))
import Control.Tracer
import Data.Bits
import qualified Data.ByteString.Builder as BS
import qualified Data.ByteString.Char8 as BSC
import qualified Data.ByteString.Lazy as BSL
import Data.Char (ord)
import Data.Functor.Contravariant ((>$<))
import qualified Data.List as List
import Data.Maybe (isJust, mapMaybe)
import Data.Ord (Down (..))
import Data.Ord (Down (..), comparing)
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Word
Expand All @@ -56,6 +63,7 @@ import Ouroboros.Consensus.Util.Enclose
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.Versioned
import System.FS.API.Lazy
import System.FS.CRC (CRC (..), hPutAllCRC)
import Text.Read (readMaybe)

{-------------------------------------------------------------------------------
Expand All @@ -66,7 +74,7 @@ data SnapshotFailure blk =
-- | We failed to deserialise the snapshot
--
-- This can happen due to data corruption in the ledger DB.
InitFailureRead ReadIncrementalErr
InitFailureRead ReadSnapshotErr

-- | This snapshot is too recent (ahead of the tip of the chain)
| InitFailureTooRecent (RealPoint blk)
Expand Down Expand Up @@ -149,6 +157,9 @@ trimSnapshots tracer hasFS DiskPolicy{..} = do
Internal: reading from disk
-------------------------------------------------------------------------------}

-- | Name of a disk snapshot.
--
-- The snapshot itself does not have to exist.
data DiskSnapshot = DiskSnapshot {
-- | Snapshots are numbered. We will try the snapshots with the highest
-- number first.
Expand All @@ -169,7 +180,10 @@ data DiskSnapshot = DiskSnapshot {
-- /not be trimmed/.
, dsSuffix :: Maybe String
}
deriving (Show, Eq, Ord, Generic)
deriving (Show, Eq, Generic)

instance Ord DiskSnapshot where
compare = comparing dsNumber

-- | Named snapshot are permanent, they will never be deleted when trimming.
diskSnapshotIsPermanent :: DiskSnapshot -> Bool
Expand All @@ -180,32 +194,98 @@ diskSnapshotIsPermanent = isJust . dsSuffix
diskSnapshotIsTemporary :: DiskSnapshot -> Bool
diskSnapshotIsTemporary = not . diskSnapshotIsPermanent

-- | Read snapshot from disk
data ReadSnapshotErr =
-- | Error while de-serialising data
ReadSnaphotFailed ReadIncrementalErr
-- | Checksum of read snapshot differs from the one tracked by
-- the corresponding '.checksum' file
| ReadSnaphotDataCorruption
-- | A '.checksum' file does not exist for a @'DiskSnapshot'@
| ReadSnapshotNoChecksumFile FsPath
-- | A '.checksum' file exists for a @'DiskSnapshot'@, but its contents is invalid
| ReadSnapshotInvalidChecksumFile FsPath
deriving (Eq, Show)

-- | Read snapshot from disk.
--
-- Fail on data corruption, i.e. when the checksum of the read data differs
-- from the one tracked by @'DiskSnapshot'@.
readSnapshot ::
forall m blk. IOLike m
=> SomeHasFS m
-> (forall s. Decoder s (ExtLedgerState blk))
-> (forall s. Decoder s (HeaderHash blk))
-> DiskSnapshot
-> ExceptT ReadIncrementalErr m (ExtLedgerState blk)
readSnapshot hasFS decLedger decHash =
ExceptT
. readIncremental hasFS decoder
. snapshotToPath
-> ExceptT ReadSnapshotErr m (ExtLedgerState blk)
readSnapshot someHasFS decLedger decHash snapshotName = do
!snapshotCRC <- readCRC someHasFS (snapshotToChecksumPath snapshotName)
ExceptT $
readIncremental someHasFS decoder (snapshotToPath snapshotName) >>= \case
Left e -> pure $ Left (ReadSnaphotFailed e)
Right (ledgerState, checksumAsRead) ->
if checksumAsRead /= snapshotCRC
then pure $ Left ReadSnaphotDataCorruption
else pure (Right ledgerState)
geo2a marked this conversation as resolved.
Show resolved Hide resolved
where
decoder :: Decoder s (ExtLedgerState blk)
decoder = decodeSnapshotBackwardsCompatible (Proxy @blk) decLedger decHash

-- | Write snapshot to disk
readCRC ::
SomeHasFS m
-> FsPath
-> ExceptT ReadSnapshotErr m CRC
readCRC (SomeHasFS hasFS) crcPath = ExceptT $ do
crcExists <- doesFileExist hasFS crcPath
if not crcExists
then pure (Left $ ReadSnapshotNoChecksumFile crcPath)
else do
withFile hasFS crcPath ReadMode $ \h -> do
str <- BSL.toStrict <$> hGetAll hasFS h
if not (BSC.length str == 8 && BSC.all isHexDigit str)
then pure (Left $ ReadSnapshotInvalidChecksumFile crcPath)
else pure . Right . CRC $ fromIntegral (hexdigitsToInt str)
-- TODO: remove the functions in the where clause when we start depending on lsm-tree
where
isHexDigit :: Char -> Bool
isHexDigit c = (c >= '0' && c <= '9')
|| (c >= 'a' && c <= 'f') --lower case only

-- Precondition: BSC.all isHexDigit
hexdigitsToInt :: BSC.ByteString -> Word
hexdigitsToInt =
BSC.foldl' accumdigit 0
where
accumdigit :: Word -> Char -> Word
accumdigit !a !c =
(a `shiftL` 4) .|. hexdigitToWord c


-- Precondition: isHexDigit
hexdigitToWord :: Char -> Word
hexdigitToWord c
| let !dec = fromIntegral (ord c - ord '0')
, dec <= 9 = dec

| let !hex = fromIntegral (ord c - ord 'a' + 10)
, otherwise = hex

-- | Write a ledger state snapshot to disk
--
-- This function writes two files:
-- * the snapshot file itself, with the name generated by @'snapshotToPath'@
-- * the checksum file, with the name generated by @'snapshotToChecksumPath'@
writeSnapshot ::
forall m blk. MonadThrow m
=> SomeHasFS m
-> (ExtLedgerState blk -> Encoding)
-> DiskSnapshot
-> ExtLedgerState blk -> m ()
writeSnapshot (SomeHasFS hasFS) encLedger ss cs = do
withFile hasFS (snapshotToPath ss) (WriteMode MustBeNew) $ \h ->
void $ hPut hasFS h $ CBOR.toBuilder (encode cs)
!crc <- withFile hasFS (snapshotToPath ss) (WriteMode MustBeNew) $ \h -> do
(_, !crc) <- hPutAllCRC hasFS h $ CBOR.toLazyByteString (encode cs)
geo2a marked this conversation as resolved.
Show resolved Hide resolved
pure crc
geo2a marked this conversation as resolved.
Show resolved Hide resolved
withFile hasFS (snapshotToChecksumPath ss) (WriteMode AllowExisting) $ \h -> do
void $ hPutAll hasFS h . BS.toLazyByteString . BS.word32HexFixed $ getCRC crc
geo2a marked this conversation as resolved.
Show resolved Hide resolved
where
encode :: ExtLedgerState blk -> Encoding
encode = encodeSnapshot encLedger
Expand All @@ -217,10 +297,13 @@ deleteSnapshot (SomeHasFS HasFS{..}) = removeFile . snapshotToPath
-- | List on-disk snapshots, highest number first.
listSnapshots :: Monad m => SomeHasFS m -> m [DiskSnapshot]
listSnapshots (SomeHasFS HasFS{..}) =
aux <$> listDirectory (mkFsPath [])
aux <$> listDirectory (mkFsPath [])
geo2a marked this conversation as resolved.
Show resolved Hide resolved
where
aux :: Set String -> [DiskSnapshot]
aux = List.sortOn (Down . dsNumber) . mapMaybe snapshotFromPath . Set.toList
aux = List.sortOn Down . mapMaybe snapshotFromPath . Set.toList

snapshotToChecksumFileName :: DiskSnapshot -> String
snapshotToChecksumFileName = (<> ".checksum") . snapshotToFileName
jasagredo marked this conversation as resolved.
Show resolved Hide resolved

snapshotToFileName :: DiskSnapshot -> String
snapshotToFileName DiskSnapshot { dsNumber, dsSuffix } =
Expand All @@ -230,6 +313,9 @@ snapshotToFileName DiskSnapshot { dsNumber, dsSuffix } =
Nothing -> ""
Just s -> "_" <> s

snapshotToChecksumPath :: DiskSnapshot -> FsPath
snapshotToChecksumPath = mkFsPath . (:[]) . snapshotToChecksumFileName

snapshotToPath :: DiskSnapshot -> FsPath
snapshotToPath = mkFsPath . (:[]) . snapshotToFileName

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import qualified Streaming as S
import qualified Streaming.Prelude as S
import Streaming.Prelude (Of (..), Stream)
import System.FS.API
import System.FS.CRC (CRC (..), initCRC, updateCRC)

{-------------------------------------------------------------------------------
Incremental parsing in I/O
Expand Down Expand Up @@ -186,24 +187,25 @@ readIncremental :: forall m a. IOLike m
=> SomeHasFS m
-> CBOR.D.Decoder (U.PrimState m) a
-> FsPath
-> m (Either ReadIncrementalErr a)
-> m (Either ReadIncrementalErr (a, CRC))
readIncremental = \(SomeHasFS hasFS) decoder fp -> do
withFile hasFS fp ReadMode $ \h ->
go hasFS h =<< U.stToIO (CBOR.R.deserialiseIncremental decoder)
go hasFS h initCRC =<< U.stToIO (CBOR.R.deserialiseIncremental decoder)
where
go :: HasFS m h
-> Handle h
-> CRC
-> CBOR.R.IDecode (U.PrimState m) a
-> m (Either ReadIncrementalErr a)
go hasFS@HasFS{..} h (CBOR.R.Partial k) = do
-> m (Either ReadIncrementalErr (a, CRC))
go hasFS@HasFS{..} h !checksum (CBOR.R.Partial k) = do
bs <- hGetSome h (fromIntegral defaultChunkSize)
dec' <- U.stToIO $ k (checkEmpty bs)
go hasFS h dec'
go _ _ (CBOR.R.Done leftover _ a) =
go hasFS h (updateCRC bs checksum) dec'
go _ _ !checksum (CBOR.R.Done leftover _ a) =
return $ if BS.null leftover
then Right a
then Right (a, checksum)
else Left $ TrailingBytes leftover
go _ _ (CBOR.R.Fail _ _ err) =
go _ _ _ (CBOR.R.Fail _ _ err) =
return $ Left $ ReadFailed err

checkEmpty :: ByteString -> Maybe ByteString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import Ouroboros.Network.Mock.Chain
import Ouroboros.Network.Mock.ProducerState
import Ouroboros.Network.Point
import System.FS.API
import System.FS.CRC (CRC (..))
import Test.Cardano.Slotting.TreeDiff ()
import Test.Util.ToExpr ()

Expand Down Expand Up @@ -65,6 +66,7 @@ instance ( ToExpr (TipInfo blk)
) => ToExpr (AnnTip blk)

instance ToExpr SecurityParam
instance ToExpr CRC
instance ToExpr DiskSnapshot

instance ToExpr ChunkSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type TestBlock = TestBlockWith Tx
data Tx = Tx {
-- | Input that the transaction consumes.
consumed :: Token
-- | Ouptupt that the transaction produces.
-- | Output that the transaction produces.
, produced :: (Token, TValue)
}
deriving stock (Show, Eq, Ord, Generic)
Expand Down
Loading