
Create database table for blob data sidecar quarantine #7108


Open · wants to merge 22 commits into unstable
9 changes: 9 additions & 0 deletions AllTests-mainnet.md
@@ -109,6 +109,8 @@ AllTests-mainnet
```
## BlobQuarantine data structure test suite [Preset: mainnet]
```diff
+ database and memory overfill protection and pruning test OK
+ database unload/load test OK
+ overfill protection test OK
+ popSidecars()/hasSidecars() return []/true on block without blobs OK
+ pruneAfterFinalization() test OK
@@ -159,6 +161,8 @@ AllTests-mainnet
## ColumnQuarantine data structure test suite [Preset: mainnet]
```diff
+ ColumnMap test OK
+ database and memory overfill protection and pruning test OK
+ database unload/load test OK
+ overfill protection test OK
+ popSidecars()/hasSidecars() return []/true on block without columns OK
+ pruneAfterFinalization() test OK
@@ -826,6 +830,11 @@ AllTests-mainnet
```diff
+ prune states OK
```
## Quarantine [Preset: mainnet]
```diff
+ put/iterate/remove test [BlobSidecars] OK
+ put/iterate/remove test [DataColumnSidecar] OK
```
## REST JSON encoding and decoding
```diff
+ Blob OK
36 changes: 15 additions & 21 deletions beacon_chain/beacon_chain_db.nim
@@ -19,7 +19,10 @@ import
forks,
presets,
state_transition],
"."/[beacon_chain_db_light_client, filepath]
"."/[beacon_chain_db_light_client,
beacon_chain_db_quarantine,
db_utils,
filepath]

from ./spec/datatypes/capella import BeaconState
from ./spec/datatypes/deneb import TrustedSignedBeaconBlock
@@ -152,6 +155,10 @@ type
##
## See `summaries` for an index in the other direction.

quarantine: QuarantineDB
## Pending data that passed basic checks including proposer signature
## but that is not fully validated / trusted yet.

lcData: LightClientDataDB
## Persistent light client data to avoid expensive recomputations

@@ -592,6 +599,8 @@ proc new*(T: type BeaconChainDB,
if cfg.FULU_FORK_EPOCH != FAR_FUTURE_EPOCH:
columns = kvStore db.openKvStore("fulu_columns").expectDb()

let quarantine = db.initQuarantineDB().expectDb()

# Versions prior to 1.4.0 (altair) stored validators in `immutable_validators`
# which stores validator keys in compressed format - this is
  # slow to load and has been superseded by `immutable_validators2` which uses
@@ -633,6 +642,7 @@ proc new*(T: type BeaconChainDB,
stateDiffs: stateDiffs,
summaries: summaries,
finalizedBlocks: finalizedBlocks,
quarantine: quarantine,
lcData: lcData
)

@@ -656,6 +666,9 @@ proc new*(T: type BeaconChainDB,
dir, "nbc", readOnly = readOnly, manualCheckpoint = true).expectDb()
BeaconChainDB.new(db, cfg)

template getQuarantineDB*(db: BeaconChainDB): QuarantineDB =
db.quarantine

template getLightClientDataDB*(db: BeaconChainDB): LightClientDataDB =
db.lcData

@@ -682,18 +695,6 @@ proc decodeSnappySSZ[T](data: openArray[byte], output: var T): bool =
err = e.msg, typ = name(T), dataLen = data.len
false

proc decodeSZSSZ[T](data: openArray[byte], output: var T): bool =
try:
let decompressed = decodeFramed(data, checkIntegrity = false)
readSszBytes(decompressed, output, updateRoot = false)
true
except CatchableError as e:
# If the data can't be deserialized, it could be because it's from a
# version of the software that uses a different SSZ encoding
warn "Unable to deserialize data, old database?",
err = e.msg, typ = name(T), dataLen = data.len
false

func encodeSSZ*(v: auto): seq[byte] =
try:
SSZ.encode(v)
@@ -707,14 +708,6 @@ func encodeSnappySSZ(v: auto): seq[byte] =
# In-memory encode shouldn't fail!
raiseAssert err.msg

func encodeSZSSZ(v: auto): seq[byte] =
# https://github.com/google/snappy/blob/main/framing_format.txt
try:
encodeFramed(SSZ.encode(v))
except CatchableError as err:
# In-memory encode shouldn't fail!
raiseAssert err.msg

proc getRaw(db: KvStoreRef, key: openArray[byte], T: type Eth2Digest): Opt[T] =
var res: Opt[T]
proc decode(data: openArray[byte]) =
@@ -795,6 +788,7 @@ proc close*(db: BeaconChainDB) =
if db.db == nil: return

# Close things roughly in reverse order
db.quarantine.close()
if not isNil(db.columns):
discard db.columns.close()
if not isNil(db.blobs):
7 changes: 1 addition & 6 deletions beacon_chain/beacon_chain_db_light_client.nim
@@ -15,7 +15,7 @@ import
# Beacon chain internals
spec/datatypes/altair,
spec/[eth2_ssz_serialization, helpers],
./db_limits
./db_utils

logScope: topics = "lcdata"

@@ -172,11 +172,6 @@ type
## Tracks the finalized sync committee periods for which complete data
## has been imported (from `dag.tail.slot`).

template disposeSafe(s: untyped): untyped =
if distinctBase(s) != nil:
s.dispose()
s = typeof(s)(nil)

proc initHeadersStore(
backend: SqStoreRef,
name, typeName: string): KvResult[LightClientHeaderStore] =
211 changes: 211 additions & 0 deletions beacon_chain/beacon_chain_db_quarantine.nim
@@ -0,0 +1,211 @@
# beacon_chain
# Copyright (c) 2022-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [].}

import
# Status libraries
chronicles,
eth/db/kvstore_sqlite3,
# Beacon chain internals
spec/helpers,
./db_utils

# Without this export, compilation fails with the error:
# vendor\nim-chronicles\chronicles.nim(352, 21) Error: undeclared identifier: 'activeChroniclesStream'
# The export is not actually needed here, because chronicles is not used in
# this file, but decodeSZSSZ() is generic and uses chronicles, and its generic
# expansion introduces the issue.
export chronicles

logScope: topics = "qudata"

type
ForkyDataSidecar* = deneb.BlobSidecar | fulu.DataColumnSidecar

DataSidecarStore = object
getStmt: SqliteStmt[array[32, byte], seq[byte]]
putStmt: SqliteStmt[(array[32, byte], seq[byte]), void]
delStmt: SqliteStmt[array[32, byte], void]
countStmt: SqliteStmt[NoParams, int64]

QuarantineDB* = ref object
backend: SqStoreRef
## SQLite backend

    electraDataSidecar: DataSidecarStore
      ## Blob sidecars whose proposer signature has been verified.
    fuluDataSidecar: DataSidecarStore
      ## Data column sidecars whose proposer signature has been verified.

template tableName(sidecar: typedesc[ForkyDataSidecar]): string =
when sidecar is deneb.BlobSidecar:
"electra_sidecars_quarantine"
else:
"fulu_sidecars_quarantine"

proc initDataSidecarStore(
backend: SqStoreRef,
name: string
): KvResult[DataSidecarStore] =
if name == "":
return ok(DataSidecarStore())

if not(backend.readOnly):
? backend.exec("BEGIN TRANSACTION;")
? backend.exec("DROP INDEX IF EXISTS `" & name & "_iblock_root`;")
? backend.exec("DROP TABLE IF EXISTS `" & name & "`;")
? backend.exec("""
CREATE TABLE IF NOT EXISTS `""" & name & """` (
`block_root` BLOB, -- `Eth2Digest`
`data_sidecar` BLOB -- `DataSidecar` (SZSSZ)
);
""")
? backend.exec("""
CREATE INDEX IF NOT EXISTS `""" & name & """_iblock_root`
ON `""" & name & """`(block_root);
""")
? backend.exec("COMMIT;")

if not ? backend.hasTable(name):
return ok(DataSidecarStore())

let
getStmt = backend.prepareStmt("""
SELECT `data_sidecar` FROM `""" & name & """`
WHERE `block_root` = ?;
""", array[32, byte], (seq[byte]), managed = false)
.expect("SQL query OK")
putStmt = backend.prepareStmt("""
INSERT INTO `""" & name & """` (
`block_root`, `data_sidecar`
) VALUES (?, ?);
""", (array[32, byte], seq[byte]), void, managed = false).expect("SQL query OK")
delStmt = backend.prepareStmt("""
DELETE FROM `""" & name & """` WHERE `block_root` == ?;
""", array[32, byte], void, managed = false).expect("SQL query OK")
countStmt = backend.prepareStmt("""
SELECT COUNT(1) FROM `""" & name & """`;
""", NoParams, int64, managed = false).expect("SQL query OK")

ok(DataSidecarStore(
getStmt: getStmt,
putStmt: putStmt,
delStmt: delStmt,
countStmt: countStmt
))

func close(store: var DataSidecarStore) =
if not(isNil(distinctBase(store.getStmt))): store.getStmt.disposeSafe()
if not(isNil(distinctBase(store.putStmt))): store.putStmt.disposeSafe()
if not(isNil(distinctBase(store.delStmt))): store.delStmt.disposeSafe()
if not(isNil(distinctBase(store.countStmt))): store.countStmt.disposeSafe()

iterator sidecars*(
db: QuarantineDB,
T: typedesc[ForkyDataSidecar],
blockRoot: Eth2Digest
): T =
when T is deneb.BlobSidecar:
template statement: untyped =
db.electraDataSidecar.getStmt
template storeName: untyped =
"electraDataSidecar"
else:
template statement: untyped =
db.fuluDataSidecar.getStmt
template storeName: untyped =
"fuluDataSidecar"

if not(isNil(distinctBase(statement))):
var row: statement.Result
for rowRes in statement.exec(blockRoot.data, row):
rowRes.expect("SQL query OK")
var res: T
if not(decodeSZSSZ(row, res)):
error "Quarantine store corrupted", store = storeName,
blockRoot
break
yield res

proc putDataSidecars*[T: ForkyDataSidecar](
db: QuarantineDB,
blockRoot: Eth2Digest,
dataSidecars: openArray[ref T]
) =
doAssert(not(db.backend.readOnly))

when T is deneb.BlobSidecar:
template statement: untyped =
db.electraDataSidecar.putStmt
else:
template statement: untyped =
db.fuluDataSidecar.putStmt

if not(isNil(distinctBase(statement))):
db.backend.exec("BEGIN TRANSACTION;").expect("SQL query OK")
for sidecar in dataSidecars:
let blob = encodeSZSSZ(sidecar[])
statement.exec((blockRoot.data, blob)).
expect("SQL query OK")
db.backend.exec("COMMIT;").expect("SQL query OK")

proc removeDataSidecars*(
db: QuarantineDB,
T: typedesc[ForkyDataSidecar],
blockRoot: Eth2Digest
) =
doAssert not(db.backend.readOnly)

when T is deneb.BlobSidecar:
template statement: untyped =
db.electraDataSidecar.delStmt
else:
template statement: untyped =
db.fuluDataSidecar.delStmt

if not(isNil(distinctBase(statement))):
statement.exec(blockRoot.data).expect("SQL query OK")

proc sidecarsCount*(
db: QuarantineDB,
T: typedesc[ForkyDataSidecar],
): int64 =
var recordCount = 0'i64

when T is deneb.BlobSidecar:
template statement: untyped =
db.electraDataSidecar.countStmt
else:
template statement: untyped =
db.fuluDataSidecar.countStmt

if not(isNil(distinctBase(statement))):
discard statement.exec do (res: int64):
recordCount = res
recordCount

proc initQuarantineDB*(
backend: SqStoreRef,
): KvResult[QuarantineDB] =
let
electraDataSidecar =
? backend.initDataSidecarStore(tableName(deneb.BlobSidecar))
fuluDataSidecar =
? backend.initDataSidecarStore(tableName(fulu.DataColumnSidecar))

ok QuarantineDB(
backend: backend,
electraDataSidecar: electraDataSidecar,
fuluDataSidecar: fuluDataSidecar
)

proc close*(db: QuarantineDB) =
if not(isNil(db.backend)):
db.electraDataSidecar.close()
db.fuluDataSidecar.close()
db[].reset()
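
For context, a minimal usage sketch of the new quarantine API, assuming an open `BeaconChainDB` handle `db`, a pending block root `blockRoot`, and a `seq[ref deneb.BlobSidecar]` named `blobSidecars` whose proposer signatures have already been verified (these three names are hypothetical; the calls mirror the procs added in this file):

```nim
# Illustrative imports, using in-repo module paths.
import ./beacon_chain_db, ./beacon_chain_db_quarantine
import ./spec/datatypes/deneb

# Sketch only: `db`, `blockRoot` and `blobSidecars` are assumed to exist.
let qdb = db.getQuarantineDB()

# Persist the quarantined sidecars under their block root.
qdb.putDataSidecars(blockRoot, blobSidecars)

# After a restart, the same sidecars can be streamed back for full validation.
for sidecar in qdb.sidecars(deneb.BlobSidecar, blockRoot):
  debugEcho "restored sidecar with index ", sidecar.index

# Once the block is fully validated (or pruned), drop the rows again.
qdb.removeDataSidecars(deneb.BlobSidecar, blockRoot)
doAssert qdb.sidecarsCount(deneb.BlobSidecar) == 0  # nothing else stored in this sketch
```

The same calls work for `fulu.DataColumnSidecar`, which the `tableName` template routes to the `fulu_sidecars_quarantine` table instead.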