Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chrismainwjb #18999

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
7 changes: 6 additions & 1 deletion chia/consensus/blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import dataclasses
import enum
import inspect
import logging
import traceback
from concurrent.futures import Executor, ThreadPoolExecutor
Expand Down Expand Up @@ -399,7 +400,9 @@ async def add_block(

try:
# Always add the block to the database
async with self.block_store.db_wrapper.writer():
async with self.block_store.db_wrapper.writer_maybe_transaction() as conn:
frame = inspect.currentframe().f_back
log.info(f"BEGIN WJB task {asyncio.current_task().get_name()} add_block writer_maybe_transaction {conn} {inspect.getframeinfo(frame).filename} {frame.f_lineno}")
# Perform the DB operations to update the state, and rollback if something goes wrong
await self.block_store.add_full_block(header_hash, block, block_record)
records, state_change_summary = await self._reconsider_peak(block_record, genesis, fork_info)
Expand All @@ -408,6 +411,8 @@ async def add_block(
# This is done after all async/DB operations, so there is a decreased chance of failure.
self.add_block_record(block_record)

log.info(f"END WJB task {asyncio.current_task().get_name()} add_block writer_maybe_transaction {conn} {inspect.getframeinfo(frame).filename} {frame.f_lineno}")

# there's a suspension point here, as we leave the async context
# manager

Expand Down
99 changes: 75 additions & 24 deletions chia/full_node/coin_store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import inspect
import asyncio
import dataclasses
import logging
import sqlite3
Expand All @@ -19,7 +21,6 @@
from chia.util.batches import to_batches
from chia.util.db_wrapper import SQLITE_MAX_VARIABLE_NUMBER, DBWrapper2
from chia.util.ints import uint32, uint64
from chia.util.lru_cache import LRUCache

log = logging.getLogger(__name__)

Expand All @@ -32,13 +33,14 @@ class CoinStore:
"""

db_wrapper: DBWrapper2
coins_added_at_height_cache: LRUCache[uint32, list[CoinRecord]]

@classmethod
async def create(cls, db_wrapper: DBWrapper2) -> CoinStore:
if db_wrapper.db_version != 2:
raise RuntimeError(f"CoinStore does not support database schema v{db_wrapper.db_version}")
self = CoinStore(db_wrapper, LRUCache(100))
raise RuntimeError(
f"CoinStore does not support database schema v{db_wrapper.db_version}"
)
self = CoinStore(db_wrapper)

async with self.db_wrapper.writer_maybe_transaction() as conn:
log.info("DB: Creating coin store tables and indexes.")
Expand All @@ -58,22 +60,32 @@ async def create(cls, db_wrapper: DBWrapper2) -> CoinStore:

# Useful for reorg lookups
log.info("DB: Creating index coin_confirmed_index")
await conn.execute("CREATE INDEX IF NOT EXISTS coin_confirmed_index on coin_record(confirmed_index)")
await conn.execute(
"CREATE INDEX IF NOT EXISTS coin_confirmed_index on coin_record(confirmed_index)"
)

log.info("DB: Creating index coin_spent_index")
await conn.execute("CREATE INDEX IF NOT EXISTS coin_spent_index on coin_record(spent_index)")
await conn.execute(
"CREATE INDEX IF NOT EXISTS coin_spent_index on coin_record(spent_index)"
)

log.info("DB: Creating index coin_puzzle_hash")
await conn.execute("CREATE INDEX IF NOT EXISTS coin_puzzle_hash on coin_record(puzzle_hash)")
await conn.execute(
"CREATE INDEX IF NOT EXISTS coin_puzzle_hash on coin_record(puzzle_hash)"
)

log.info("DB: Creating index coin_parent_index")
await conn.execute("CREATE INDEX IF NOT EXISTS coin_parent_index on coin_record(coin_parent)")
await conn.execute(
"CREATE INDEX IF NOT EXISTS coin_parent_index on coin_record(coin_parent)"
)

return self

async def num_unspent(self) -> int:
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute("SELECT COUNT(*) FROM coin_record WHERE spent_index=0") as cursor:
async with conn.execute(
"SELECT COUNT(*) FROM coin_record WHERE spent_index=0"
) as cursor:
row = await cursor.fetchone()
if row is not None:
count: int = row[0]
Expand Down Expand Up @@ -156,6 +168,18 @@ async def get_coin_records(self, names: Collection[bytes32]) -> list[CoinRecord]
coins: list[CoinRecord] = []

async with self.db_wrapper.reader_no_transaction() as conn:
log.info(
f"BEGIN WJB task {asyncio.current_task().get_name()} get_coin_records reader_no_transaction {conn}"
)
frame = inspect.currentframe().f_back
log.info(f"WJB 1 {inspect.getframeinfo(frame).filename} {frame.f_lineno} {conn}")
frame = frame.f_back
log.info(f"WJB 2 {inspect.getframeinfo(frame).filename} {frame.f_lineno} {conn}")
frame = frame.f_back
log.info(f"WJB 3 {inspect.getframeinfo(frame).filename} {frame.f_lineno} {conn}")
frame = frame.f_back
log.info(f"WJB 4 {inspect.getframeinfo(frame).filename} {frame.f_lineno} {conn}")

cursors: list[Cursor] = []
for batch in to_batches(names, SQLITE_MAX_VARIABLE_NUMBER):
names_db: tuple[Any, ...] = tuple(batch.entries)
Expand All @@ -173,14 +197,12 @@ async def get_coin_records(self, names: Collection[bytes32]) -> list[CoinRecord]
coin = self.row_to_coin(row)
record = CoinRecord(coin, row[0], row[1], row[2], row[6])
coins.append(record)

log.info(
f"END WJB task {asyncio.current_task().get_name()} get_coin_records reader_no_transaction {conn} {inspect.getframeinfo(frame).filename} {frame.f_lineno}"
)
return coins

async def get_coins_added_at_height(self, height: uint32) -> list[CoinRecord]:
coins_added: Optional[list[CoinRecord]] = self.coins_added_at_height_cache.get(height)
if coins_added is not None:
return coins_added

async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
"SELECT confirmed_index, spent_index, coinbase, puzzle_hash, "
Expand All @@ -192,7 +214,6 @@ async def get_coins_added_at_height(self, height: uint32) -> list[CoinRecord]:
for row in rows:
coin = self.row_to_coin(row)
coins.append(CoinRecord(coin, row[0], row[1], row[2], row[6]))
self.coins_added_at_height_cache.put(height, coins)
return coins

async def get_coins_removed_at_height(self, height: uint32) -> list[CoinRecord]:
Expand Down Expand Up @@ -506,7 +527,11 @@ async def batch_coin_states_by_puzzle_hashes(
coin_states = list(coin_states_dict.values())

if include_hinted:
coin_states.sort(key=lambda cr: max(cr.created_height or uint32(0), cr.spent_height or uint32(0)))
coin_states.sort(
key=lambda cr: max(
cr.created_height or uint32(0), cr.spent_height or uint32(0)
)
)
while len(coin_states) > max_items + 1:
coin_states.pop()

Expand All @@ -517,13 +542,20 @@ async def batch_coin_states_by_puzzle_hashes(

# The last item is the start of the next batch of coin states.
next_coin_state = coin_states.pop()
next_height = uint32(max(next_coin_state.created_height or 0, next_coin_state.spent_height or 0))
next_height = uint32(
max(next_coin_state.created_height or 0, next_coin_state.spent_height or 0)
)

# In order to prevent blocks from being split up between batches, remove
# all coin states whose max height is the same as the last coin state's height.
while len(coin_states) > 0:
last_coin_state = coin_states[-1]
height = uint32(max(last_coin_state.created_height or 0, last_coin_state.spent_height or 0))
height = uint32(
max(
last_coin_state.created_height or 0,
last_coin_state.spent_height or 0,
)
)
if height != next_height:
break

Expand Down Expand Up @@ -551,7 +583,9 @@ async def rollback_to_block(self, block_index: int) -> list[CoinRecord]:
coin_changes[record.name] = record

# Delete reverted blocks from storage
await conn.execute("DELETE FROM coin_record WHERE confirmed_index>?", (block_index,))
await conn.execute(
"DELETE FROM coin_record WHERE confirmed_index>?", (block_index,)
)

# Add coins that are confirmed in the reverted blocks to the list of changed coins.
async with conn.execute(
Expand All @@ -565,8 +599,10 @@ async def rollback_to_block(self, block_index: int) -> list[CoinRecord]:
if record.name not in coin_changes:
coin_changes[record.name] = record

await conn.execute("UPDATE coin_record SET spent_index=0 WHERE spent_index>?", (block_index,))
self.coins_added_at_height_cache = LRUCache(self.coins_added_at_height_cache.capacity)
await conn.execute(
"UPDATE coin_record SET spent_index=0 WHERE spent_index>?",
(block_index,),
)
return list(coin_changes.values())

# Store CoinRecord in DB
Expand All @@ -587,10 +623,17 @@ async def _add_coin_records(self, records: list[CoinRecord]) -> None:
)
if len(values2) > 0:
async with self.db_wrapper.writer_maybe_transaction() as conn:
frame = inspect.currentframe().f_back
log.info(
f"BEGIN WJB task {asyncio.current_task().get_name()} _add_coin_records writer_maybe_transaction {conn} {inspect.getframeinfo(frame).filename} {frame.f_lineno}"
)
await conn.executemany(
"INSERT INTO coin_record VALUES(?, ?, ?, ?, ?, ?, ?, ?)",
values2,
)
log.info(
f"END WJB task {asyncio.current_task().get_name()} _add_coin_records writer_maybe_transaction {conn} {inspect.getframeinfo(frame).filename} {frame.f_lineno}"
)

# Update coin_record to be spent in DB
async def _set_spent(self, coin_names: list[bytes32], index: uint32) -> None:
Expand All @@ -617,7 +660,9 @@ async def _set_spent(self, coin_names: list[bytes32], index: uint32) -> None:
)

# Lookup the most recent unspent lineage that matches a puzzle hash
async def get_unspent_lineage_info_for_puzzle_hash(self, puzzle_hash: bytes32) -> Optional[UnspentLineageInfo]:
async def get_unspent_lineage_info_for_puzzle_hash(
self, puzzle_hash: bytes32
) -> Optional[UnspentLineageInfo]:
async with self.db_wrapper.reader_no_transaction() as conn:
async with conn.execute(
"SELECT unspent.coin_name, "
Expand All @@ -635,9 +680,15 @@ async def get_unspent_lineage_info_for_puzzle_hash(self, puzzle_hash: bytes32) -
) as cursor:
rows = list(await cursor.fetchall())
if len(rows) != 1:
log.debug("Expected 1 unspent with puzzle hash %s, but found %s", puzzle_hash.hex(), len(rows))
log.debug(
"Expected 1 unspent with puzzle hash %s, but found %s",
puzzle_hash.hex(),
len(rows),
)
return None
coin_id, coin_amount, parent_id, parent_amount, parent_parent_id = rows[0]
coin_id, coin_amount, parent_id, parent_amount, parent_parent_id = rows[
0
]
return UnspentLineageInfo(
coin_id=bytes32(coin_id),
coin_amount=uint64(int_from_bytes(coin_amount)),
Expand Down
Loading
Loading