From 1cb4932df026249387823fa01d814e82b13201d5 Mon Sep 17 00:00:00 2001 From: William Blanke Date: Thu, 5 Dec 2024 17:46:31 -0800 Subject: [PATCH 01/13] changes for main --- chia/consensus/blockchain.py | 7 ++- chia/full_node/coin_store.py | 99 +++++++++++++++++++++++++++--------- chia/full_node/full_node.py | 58 ++++++++++++++++++--- 3 files changed, 131 insertions(+), 33 deletions(-) diff --git a/chia/consensus/blockchain.py b/chia/consensus/blockchain.py index f989bfe96344..db8afbb5fd4b 100644 --- a/chia/consensus/blockchain.py +++ b/chia/consensus/blockchain.py @@ -3,6 +3,7 @@ import asyncio import dataclasses import enum +import inspect import logging import traceback from concurrent.futures import Executor, ThreadPoolExecutor @@ -399,7 +400,9 @@ async def add_block( try: # Always add the block to the database - async with self.block_store.db_wrapper.writer(): + async with self.block_store.db_wrapper.writer_maybe_transaction() as conn: + frame = inspect.currentframe().f_back + log.info(f"BEGIN WJB task {asyncio.current_task().get_name()} add_block writer_maybe_transaction {conn} {inspect.getframeinfo(frame).filename} {frame.f_lineno}") # Perform the DB operations to update the state, and rollback if something goes wrong await self.block_store.add_full_block(header_hash, block, block_record) records, state_change_summary = await self._reconsider_peak(block_record, genesis, fork_info) @@ -408,6 +411,8 @@ async def add_block( # This is done after all async/DB operations, so there is a decreased chance of failure. self.add_block_record(block_record) + log.info(f"END WJB task {asyncio.current_task().get_name()} add_block writer_maybe_transaction {conn} {inspect.getframeinfo(frame).filename} {frame.f_lineno}") + # there's a suspension point here, as we leave the async context # manager diff --git a/chia/full_node/coin_store.py b/chia/full_node/coin_store.py index b7cdbba85858..56b0957c66da 100644 --- a/chia/full_node/coin_store.py +++ b/chia/full_node/coin_store.py @@ -1,5 +1,7 @@ from __future__ import annotations +import inspect +import asyncio import dataclasses import logging import sqlite3 @@ -19,7 +21,6 @@ from chia.util.batches import to_batches from chia.util.db_wrapper import SQLITE_MAX_VARIABLE_NUMBER, DBWrapper2 from chia.util.ints import uint32, uint64 -from chia.util.lru_cache import LRUCache log = logging.getLogger(__name__) @@ -32,13 +33,14 @@ class CoinStore: """ db_wrapper: DBWrapper2 - coins_added_at_height_cache: LRUCache[uint32, list[CoinRecord]] @classmethod async def create(cls, db_wrapper: DBWrapper2) -> CoinStore: if db_wrapper.db_version != 2: - raise RuntimeError(f"CoinStore does not support database schema v{db_wrapper.db_version}") - self = CoinStore(db_wrapper, LRUCache(100)) + raise RuntimeError( + f"CoinStore does not support database schema v{db_wrapper.db_version}" + ) + self = CoinStore(db_wrapper) async with self.db_wrapper.writer_maybe_transaction() as conn: log.info("DB: Creating coin store tables and indexes.") @@ -58,22 +60,32 @@ async def create(cls, db_wrapper: DBWrapper2) -> CoinStore: # Useful for reorg lookups log.info("DB: Creating index coin_confirmed_index") - await conn.execute("CREATE INDEX IF NOT EXISTS coin_confirmed_index on coin_record(confirmed_index)") + await conn.execute( + "CREATE INDEX IF NOT EXISTS coin_confirmed_index on coin_record(confirmed_index)" + ) log.info("DB: Creating index coin_spent_index") - await conn.execute("CREATE INDEX IF NOT EXISTS coin_spent_index on coin_record(spent_index)") + await conn.execute( + "CREATE INDEX IF NOT EXISTS coin_spent_index on coin_record(spent_index)" + ) log.info("DB: Creating index coin_puzzle_hash") - await conn.execute("CREATE INDEX IF NOT EXISTS coin_puzzle_hash on coin_record(puzzle_hash)") + await conn.execute( + "CREATE INDEX IF NOT EXISTS coin_puzzle_hash on coin_record(puzzle_hash)" + ) log.info("DB: Creating index coin_parent_index") - await conn.execute("CREATE INDEX IF NOT EXISTS coin_parent_index on coin_record(coin_parent)") + await conn.execute( + "CREATE INDEX IF NOT EXISTS coin_parent_index on coin_record(coin_parent)" + ) return self async def num_unspent(self) -> int: async with self.db_wrapper.reader_no_transaction() as conn: - async with conn.execute("SELECT COUNT(*) FROM coin_record WHERE spent_index=0") as cursor: + async with conn.execute( + "SELECT COUNT(*) FROM coin_record WHERE spent_index=0" + ) as cursor: row = await cursor.fetchone() if row is not None: count: int = row[0] @@ -156,6 +168,18 @@ async def get_coin_records(self, names: Collection[bytes32]) -> list[CoinRecord] coins: list[CoinRecord] = [] async with self.db_wrapper.reader_no_transaction() as conn: + log.info( + f"BEGIN WJB task {asyncio.current_task().get_name()} get_coin_records reader_no_transaction {conn}" + ) + frame = inspect.currentframe().f_back + log.info(f"WJB 1 {inspect.getframeinfo(frame).filename} {frame.f_lineno} {conn}") + frame = frame.f_back + log.info(f"WJB 2 {inspect.getframeinfo(frame).filename} {frame.f_lineno} {conn}") + frame = frame.f_back + log.info(f"WJB 3 {inspect.getframeinfo(frame).filename} {frame.f_lineno} {conn}") + frame = frame.f_back + log.info(f"WJB 4 {inspect.getframeinfo(frame).filename} {frame.f_lineno} {conn}") + cursors: list[Cursor] = [] for batch in to_batches(names, SQLITE_MAX_VARIABLE_NUMBER): names_db: tuple[Any, ...] = tuple(batch.entries) @@ -173,14 +197,12 @@ async def get_coin_records(self, names: Collection[bytes32]) -> list[CoinRecord] coin = self.row_to_coin(row) record = CoinRecord(coin, row[0], row[1], row[2], row[6]) coins.append(record) - + log.info( + f"END WJB task {asyncio.current_task().get_name()} get_coin_records reader_no_transaction {conn} {inspect.getframeinfo(frame).filename} {frame.f_lineno}" + ) return coins async def get_coins_added_at_height(self, height: uint32) -> list[CoinRecord]: - coins_added: Optional[list[CoinRecord]] = self.coins_added_at_height_cache.get(height) - if coins_added is not None: - return coins_added - async with self.db_wrapper.reader_no_transaction() as conn: async with conn.execute( "SELECT confirmed_index, spent_index, coinbase, puzzle_hash, " @@ -192,7 +214,6 @@ async def get_coins_added_at_height(self, height: uint32) -> list[CoinRecord]: for row in rows: coin = self.row_to_coin(row) coins.append(CoinRecord(coin, row[0], row[1], row[2], row[6])) - self.coins_added_at_height_cache.put(height, coins) return coins async def get_coins_removed_at_height(self, height: uint32) -> list[CoinRecord]: @@ -506,7 +527,11 @@ async def batch_coin_states_by_puzzle_hashes( coin_states = list(coin_states_dict.values()) if include_hinted: - coin_states.sort(key=lambda cr: max(cr.created_height or uint32(0), cr.spent_height or uint32(0))) + coin_states.sort( + key=lambda cr: max( + cr.created_height or uint32(0), cr.spent_height or uint32(0) + ) + ) while len(coin_states) > max_items + 1: coin_states.pop() @@ -517,13 +542,20 @@ async def batch_coin_states_by_puzzle_hashes( # The last item is the start of the next batch of coin states. next_coin_state = coin_states.pop() - next_height = uint32(max(next_coin_state.created_height or 0, next_coin_state.spent_height or 0)) + next_height = uint32( + max(next_coin_state.created_height or 0, next_coin_state.spent_height or 0) + ) # In order to prevent blocks from being split up between batches, remove # all coin states whose max height is the same as the last coin state's height. while len(coin_states) > 0: last_coin_state = coin_states[-1] - height = uint32(max(last_coin_state.created_height or 0, last_coin_state.spent_height or 0)) + height = uint32( + max( + last_coin_state.created_height or 0, + last_coin_state.spent_height or 0, + ) + ) if height != next_height: break @@ -551,7 +583,9 @@ async def rollback_to_block(self, block_index: int) -> list[CoinRecord]: coin_changes[record.name] = record # Delete reverted blocks from storage - await conn.execute("DELETE FROM coin_record WHERE confirmed_index>?", (block_index,)) + await conn.execute( + "DELETE FROM coin_record WHERE confirmed_index>?", (block_index,) + ) # Add coins that are confirmed in the reverted blocks to the list of changed coins. async with conn.execute( @@ -565,8 +599,10 @@ async def rollback_to_block(self, block_index: int) -> list[CoinRecord]: if record.name not in coin_changes: coin_changes[record.name] = record - await conn.execute("UPDATE coin_record SET spent_index=0 WHERE spent_index>?", (block_index,)) - self.coins_added_at_height_cache = LRUCache(self.coins_added_at_height_cache.capacity) + await conn.execute( + "UPDATE coin_record SET spent_index=0 WHERE spent_index>?", + (block_index,), + ) return list(coin_changes.values()) # Store CoinRecord in DB @@ -587,10 +623,17 @@ async def _add_coin_records(self, records: list[CoinRecord]) -> None: ) if len(values2) > 0: async with self.db_wrapper.writer_maybe_transaction() as conn: + frame = inspect.currentframe().f_back + log.info( + f"BEGIN WJB task {asyncio.current_task().get_name()} _add_coin_records writer_maybe_transaction {conn} {inspect.getframeinfo(frame).filename} {frame.f_lineno}" + ) await conn.executemany( "INSERT INTO coin_record VALUES(?, ?, ?, ?, ?, ?, ?, ?)", values2, ) + log.info( + f"END WJB task {asyncio.current_task().get_name()} _add_coin_records writer_maybe_transaction {conn} {inspect.getframeinfo(frame).filename} {frame.f_lineno}" + ) # Update coin_record to be spent in DB async def _set_spent(self, coin_names: list[bytes32], index: uint32) -> None: @@ -617,7 +660,9 @@ async def _set_spent(self, coin_names: list[bytes32], index: uint32) -> None: ) # Lookup the most recent unspent lineage that matches a puzzle hash - async def get_unspent_lineage_info_for_puzzle_hash(self, puzzle_hash: bytes32) -> Optional[UnspentLineageInfo]: + async def get_unspent_lineage_info_for_puzzle_hash( + self, puzzle_hash: bytes32 + ) -> Optional[UnspentLineageInfo]: async with self.db_wrapper.reader_no_transaction() as conn: async with conn.execute( "SELECT unspent.coin_name, " @@ -635,9 +680,15 @@ async def get_unspent_lineage_info_for_puzzle_hash(self, puzzle_hash: bytes32) - ) as cursor: rows = list(await cursor.fetchall()) if len(rows) != 1: - log.debug("Expected 1 unspent with puzzle hash %s, but found %s", puzzle_hash.hex(), len(rows)) + log.debug( + "Expected 1 unspent with puzzle hash %s, but found %s", + puzzle_hash.hex(), + len(rows), + ) return None - coin_id, coin_amount, parent_id, parent_amount, parent_parent_id = rows[0] + coin_id, coin_amount, parent_id, parent_amount, parent_parent_id = rows[ + 0 + ] return UnspentLineageInfo( coin_id=bytes32(coin_id), coin_amount=uint64(int_from_bytes(coin_amount)), diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index e644ecb8b4c7..c66feeab9a79 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -634,9 +634,21 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t self.constants, new_slot, prev_b, self.blockchain ) vs = ValidationState(ssi, diff, None) - success, state_change_summary, _err = await self.add_block_batch( - AugmentedBlockchain(self.blockchain), response.blocks, peer_info, fork_info, vs - ) + + # Wrap add_block_batch with writer to ensure all writes and reads are on same connection. + # add_block_batch should only be called under priority_mutex so this will not stall other + # writes to the DB. + async with self.block_store.db_wrapper.writer() as conn: + self.log.info( + f"BEGIN WJB task {asyncio.current_task().get_name()} short sync add_block_batch writer {conn}" + ) + success, state_change_summary, _err = await self.add_block_batch( + AugmentedBlockchain(self.blockchain), response.blocks, peer_info, fork_info, vs + ) + self.log.info( + f"END WJB task {asyncio.current_task().get_name()} short sync add_block_batch writer {conn}" + ) + if not success: raise ValueError(f"Error short batch syncing, failed to validate blocks {height}-{end_height}") if state_change_summary is not None: @@ -707,10 +719,21 @@ async def short_sync_backtrack( first_block = blocks[-1] # blocks are reveresd this is the lowest block to add # we create the fork_info and pass it here so it would be updated on each call to add_block fork_info = ForkInfo(first_block.height - 1, first_block.height - 1, first_block.prev_header_hash) - for block in reversed(blocks): - # when syncing, we won't share any signatures with the - # mempool, so there's no need to pass in the BLS cache. - await self.add_block(block, peer, fork_info=fork_info) + + # Wrap add_block with writer to ensure all writes and reads are on same connection. + # add_block should only be called under priority_mutex so this will not stall other + # writes to the DB. + async with self.block_store.db_wrapper.writer() as conn: + self.log.info( + f"BEGIN WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" + ) + for block in reversed(blocks): + # when syncing, we won't share any signatures with the + # mempool, so there's no need to pass in the BLS cache. + await self.add_block(block, peer, fork_info=fork_info) + self.log.info( + f"END WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" + ) except (asyncio.CancelledError, Exception): self.sync_store.decrement_backtrack_syncing(node_id=peer.peer_node_id) raise @@ -1347,6 +1370,26 @@ async def ingest_blocks( pre_validation_results = list(await asyncio.gather(*futures)) # The ValidationState object (vs) is an in-out parameter. the add_block_batch() # call will update it + + # Wrap add_prevalidated_blocks with writer to ensure all writes and reads are on same connection. + # add_prevalidated_blocks should only be called under priority_mutex so this will not stall other + # writes to the DB. + async with self.block_store.db_wrapper.writer() as conn: + self.log.info( + f"BEGIN WJB task {asyncio.current_task().get_name()} ingest add_prevalidated_blocks writer {conn}" + ) + state_change_summary, err = await self.add_prevalidated_blocks( + blockchain, + blocks, + pre_validation_results, + fork_info, + peer.peer_info, + vs, + ) + self.log.info( + f"END WJB task {asyncio.current_task().get_name()} ingest add_prevalidated_blocks writer {conn}" + ) + state_change_summary, err = await self.add_prevalidated_blocks( blockchain, blocks, @@ -1478,7 +1521,6 @@ async def add_block_batch( # Returns a bool for success, as well as a StateChangeSummary if the peak was advanced pre_validate_start = time.monotonic() - blockchain = AugmentedBlockchain(self.blockchain) blocks_to_validate = await self.skip_blocks(blockchain, all_blocks, fork_info, vs) if len(blocks_to_validate) == 0: From ea5167ea2d33092d9d89c9abb0def66b89d113b2 Mon Sep 17 00:00:00 2001 From: William Blanke Date: Thu, 5 Dec 2024 17:54:13 -0800 Subject: [PATCH 02/13] oops --- chia/full_node/full_node.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index c66feeab9a79..2bfc1d2dce0a 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -1390,14 +1390,6 @@ async def ingest_blocks( f"END WJB task {asyncio.current_task().get_name()} ingest add_prevalidated_blocks writer {conn}" ) - state_change_summary, err = await self.add_prevalidated_blocks( - blockchain, - blocks, - pre_validation_results, - fork_info, - peer.peer_info, - vs, - ) if err is not None: await peer.close(600) raise ValueError(f"Failed to validate block batch {start_height} to {end_height}: {err}") From f2a707bdc104c7766080bb1a519071e38d8317de Mon Sep 17 00:00:00 2001 From: William Blanke Date: Thu, 5 Dec 2024 18:47:17 -0800 Subject: [PATCH 03/13] do nothing if no blocks --- chia/full_node/full_node.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index 2bfc1d2dce0a..bc631184cbf8 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -720,20 +720,21 @@ async def short_sync_backtrack( # we create the fork_info and pass it here so it would be updated on each call to add_block fork_info = ForkInfo(first_block.height - 1, first_block.height - 1, first_block.prev_header_hash) - # Wrap add_block with writer to ensure all writes and reads are on same connection. - # add_block should only be called under priority_mutex so this will not stall other - # writes to the DB. - async with self.block_store.db_wrapper.writer() as conn: - self.log.info( - f"BEGIN WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" - ) - for block in reversed(blocks): - # when syncing, we won't share any signatures with the - # mempool, so there's no need to pass in the BLS cache. - await self.add_block(block, peer, fork_info=fork_info) - self.log.info( - f"END WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" - ) + if len(blocks) > 0: + # Wrap add_block with writer to ensure all writes and reads are on same connection. + # add_block should only be called under priority_mutex so this will not stall other + # writes to the DB. + async with self.block_store.db_wrapper.writer() as conn: + self.log.info( + f"BEGIN WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" + ) + for block in reversed(blocks): + # when syncing, we won't share any signatures with the + # mempool, so there's no need to pass in the BLS cache. + await self.add_block(block, peer, fork_info=fork_info) + self.log.info( + f"END WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" + ) except (asyncio.CancelledError, Exception): self.sync_store.decrement_backtrack_syncing(node_id=peer.peer_node_id) raise From 20a85e9a0bb20f0dcafbcda8b625efde76d74d83 Mon Sep 17 00:00:00 2001 From: William Blanke Date: Fri, 6 Dec 2024 11:17:53 -0800 Subject: [PATCH 04/13] set cached statements to 0 since python is flakey here --- chia/full_node/full_node.py | 29 ++++++++++++++--------------- chia/util/db_wrapper.py | 2 +- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index bc631184cbf8..2bfc1d2dce0a 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -720,21 +720,20 @@ async def short_sync_backtrack( # we create the fork_info and pass it here so it would be updated on each call to add_block fork_info = ForkInfo(first_block.height - 1, first_block.height - 1, first_block.prev_header_hash) - if len(blocks) > 0: - # Wrap add_block with writer to ensure all writes and reads are on same connection. - # add_block should only be called under priority_mutex so this will not stall other - # writes to the DB. - async with self.block_store.db_wrapper.writer() as conn: - self.log.info( - f"BEGIN WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" - ) - for block in reversed(blocks): - # when syncing, we won't share any signatures with the - # mempool, so there's no need to pass in the BLS cache. - await self.add_block(block, peer, fork_info=fork_info) - self.log.info( - f"END WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" - ) + # Wrap add_block with writer to ensure all writes and reads are on same connection. + # add_block should only be called under priority_mutex so this will not stall other + # writes to the DB. + async with self.block_store.db_wrapper.writer() as conn: + self.log.info( + f"BEGIN WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" + ) + for block in reversed(blocks): + # when syncing, we won't share any signatures with the + # mempool, so there's no need to pass in the BLS cache. + await self.add_block(block, peer, fork_info=fork_info) + self.log.info( + f"END WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" + ) except (asyncio.CancelledError, Exception): self.sync_store.decrement_backtrack_syncing(node_id=peer.peer_node_id) raise diff --git a/chia/util/db_wrapper.py b/chia/util/db_wrapper.py index b1d2b05e07b0..cc495ed95eea 100644 --- a/chia/util/db_wrapper.py +++ b/chia/util/db_wrapper.py @@ -74,7 +74,7 @@ async def _create_connection( log_file: Optional[TextIO] = None, name: Optional[str] = None, ) -> aiosqlite.Connection: - connection = await aiosqlite.connect(database=database, uri=uri) + connection = await aiosqlite.connect(database=database, uri=uri, cached_statements=0) if log_file is not None: await connection.set_trace_callback(functools.partial(sql_trace_callback, file=log_file, name=name)) From 643af4c088a36daa999862e68c268105c6c1aca2 Mon Sep 17 00:00:00 2001 From: William Blanke Date: Sun, 8 Dec 2024 15:07:33 -0800 Subject: [PATCH 05/13] fixed deadlock by ordering pm and write mutexes --- chia/full_node/full_node.py | 332 ++++++++++++++++++++++++++++-------- 1 file changed, 259 insertions(+), 73 deletions(-) diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index 2bfc1d2dce0a..212d88b2605e 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -13,7 +13,17 @@ from collections.abc import AsyncIterator, Awaitable, Sequence from multiprocessing.context import BaseContext from pathlib import Path -from typing import TYPE_CHECKING, Any, Callable, ClassVar, Optional, TextIO, Union, cast, final +from typing import ( + TYPE_CHECKING, + Any, + Callable, + ClassVar, + Optional, + TextIO, + Union, + cast, + final, +) from chia_rs import ( AugSchemeMPL, @@ -27,18 +37,30 @@ from chia.consensus.block_body_validation import ForkInfo from chia.consensus.block_creation import unfinished_block_to_full_block from chia.consensus.block_record import BlockRecord -from chia.consensus.blockchain import AddBlockResult, Blockchain, BlockchainMutexPriority, StateChangeSummary +from chia.consensus.blockchain import ( + AddBlockResult, + Blockchain, + BlockchainMutexPriority, + StateChangeSummary, +) from chia.consensus.blockchain_interface import BlockchainInterface from chia.consensus.constants import ConsensusConstants from chia.consensus.cost_calculator import NPCResult from chia.consensus.difficulty_adjustment import get_next_sub_slot_iters_and_difficulty from chia.consensus.make_sub_epoch_summary import next_sub_epoch_summary -from chia.consensus.multiprocess_validation import PreValidationResult, pre_validate_block +from chia.consensus.multiprocess_validation import ( + PreValidationResult, + pre_validate_block, +) from chia.consensus.pot_iterations import calculate_sp_iters from chia.full_node.block_store import BlockStore from chia.full_node.coin_store import CoinStore from chia.full_node.full_node_api import FullNodeAPI -from chia.full_node.full_node_store import FullNodeStore, FullNodeStorePeakResult, UnfinishedBlockEntry +from chia.full_node.full_node_store import ( + FullNodeStore, + FullNodeStorePeakResult, + UnfinishedBlockEntry, +) from chia.full_node.hint_management import get_hints_and_subscription_coin_ids from chia.full_node.hint_store import HintStore from chia.full_node.mempool import MempoolRemoveInfo @@ -48,12 +70,30 @@ from chia.full_node.sync_store import Peak, SyncStore from chia.full_node.tx_processing_queue import TransactionQueue from chia.full_node.weight_proof import WeightProofHandler -from chia.protocols import farmer_protocol, full_node_protocol, timelord_protocol, wallet_protocol -from chia.protocols.farmer_protocol import SignagePointSourceData, SPSubSlotSourceData, SPVDFSourceData -from chia.protocols.full_node_protocol import RequestBlocks, RespondBlock, RespondBlocks, RespondSignagePoint +from chia.protocols import ( + farmer_protocol, + full_node_protocol, + timelord_protocol, + wallet_protocol, +) +from chia.protocols.farmer_protocol import ( + SignagePointSourceData, + SPSubSlotSourceData, + SPVDFSourceData, +) +from chia.protocols.full_node_protocol import ( + RequestBlocks, + RespondBlock, + RespondBlocks, + RespondSignagePoint, +) from chia.protocols.protocol_message_types import ProtocolMessageTypes from chia.protocols.shared_protocol import Capability -from chia.protocols.wallet_protocol import CoinState, CoinStateUpdate, RemovedMempoolItem +from chia.protocols.wallet_protocol import ( + CoinState, + CoinStateUpdate, + RemovedMempoolItem, +) from chia.rpc.rpc_server import StateChangedProtocol from chia.server.node_discovery import FullNodePeers from chia.server.outbound_message import Message, NodeType, make_msg @@ -63,7 +103,12 @@ from chia.types.blockchain_format.pool_target import PoolTarget from chia.types.blockchain_format.sized_bytes import bytes32 from chia.types.blockchain_format.sub_epoch_summary import SubEpochSummary -from chia.types.blockchain_format.vdf import CompressibleVDFField, VDFInfo, VDFProof, validate_vdf +from chia.types.blockchain_format.vdf import ( + CompressibleVDFField, + VDFInfo, + VDFProof, + validate_vdf, +) from chia.types.coin_record import CoinRecord from chia.types.end_of_slot_bundle import EndOfSubSlotBundle from chia.types.full_block import FullBlock @@ -315,8 +360,12 @@ async def manage(self) -> AsyncIterator[None]: f" {peak.height}, " f"time taken: {int(time_taken)}s" ) + self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} mempool new_peak") async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): + self.log.info(f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} mempool new_peak") pending_tx = await self.mempool_manager.new_peak(self.blockchain.get_tx_peak(), None) + self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} mempool new_peak") + assert len(pending_tx.items) == 0 # no pending transactions when starting up full_peak: Optional[FullBlock] = await self.blockchain.get_full_peak() @@ -589,7 +638,8 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t self.log.info(f"Starting batch short sync from {start_height} to height {target_height}") if start_height > 0: first = await peer.call_api( - FullNodeAPI.request_block, full_node_protocol.RequestBlock(uint32(start_height), False) + FullNodeAPI.request_block, + full_node_protocol.RequestBlock(uint32(start_height), False), ) if first is None or not isinstance(first, full_node_protocol.RespondBlock): self.sync_store.batch_syncing.remove(peer.peer_node_id) @@ -623,7 +673,9 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t response = await peer.call_api(FullNodeAPI.request_blocks, request) if not response: raise ValueError(f"Error short batch syncing, invalid/no response for {height}-{end_height}") + self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} short batch syncing") async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): + self.log.info(f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} short batch syncing") state_change_summary: Optional[StateChangeSummary] prev_b = None if response.blocks[0].height > 0: @@ -643,7 +695,11 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t f"BEGIN WJB task {asyncio.current_task().get_name()} short sync add_block_batch writer {conn}" ) success, state_change_summary, _err = await self.add_block_batch( - AugmentedBlockchain(self.blockchain), response.blocks, peer_info, fork_info, vs + AugmentedBlockchain(self.blockchain), + response.blocks, + peer_info, + fork_info, + vs, ) self.log.info( f"END WJB task {asyncio.current_task().get_name()} short sync add_block_batch writer {conn}" @@ -671,10 +727,15 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t self.log.info(f"Added blocks {height}-{end_height}") finally: self.sync_store.batch_syncing.remove(peer.peer_node_id) + self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} short batch syncing") return True async def short_sync_backtrack( - self, peer: WSChiaConnection, peak_height: uint32, target_height: uint32, target_unf_hash: bytes32 + self, + peer: WSChiaConnection, + peak_height: uint32, + target_height: uint32, + target_unf_hash: bytes32, ) -> bool: """ Performs a backtrack sync, where blocks are downloaded one at a time from newest to oldest. If we do not @@ -702,7 +763,8 @@ async def short_sync_backtrack( # but not the transactions fetch_tx: bool = unfinished_block is None or curr_height != target_height curr = await peer.call_api( - FullNodeAPI.request_block, full_node_protocol.RequestBlock(uint32(curr_height), fetch_tx) + FullNodeAPI.request_block, + full_node_protocol.RequestBlock(uint32(curr_height), fetch_tx), ) if curr is None: raise ValueError(f"Failed to fetch block {curr_height} from {peer.get_peer_logging()}, timed out") @@ -718,22 +780,34 @@ async def short_sync_backtrack( if found_fork_point: first_block = blocks[-1] # blocks are reveresd this is the lowest block to add # we create the fork_info and pass it here so it would be updated on each call to add_block - fork_info = ForkInfo(first_block.height - 1, first_block.height - 1, first_block.prev_header_hash) + fork_info = ForkInfo( + first_block.height - 1, + first_block.height - 1, + first_block.prev_header_hash, + ) # Wrap add_block with writer to ensure all writes and reads are on same connection. # add_block should only be called under priority_mutex so this will not stall other # writes to the DB. - async with self.block_store.db_wrapper.writer() as conn: - self.log.info( - f"BEGIN WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" - ) - for block in reversed(blocks): - # when syncing, we won't share any signatures with the - # mempool, so there's no need to pass in the BLS cache. - await self.add_block(block, peer, fork_info=fork_info) - self.log.info( - f"END WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" - ) + if len(blocks) > 0: + self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} short_sync_backtrack") + async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): + self.log.info( + f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} short_sync_backtrack" + ) + async with self.block_store.db_wrapper.writer() as conn: + self.log.info( + f"BEGIN WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" + ) + for block in reversed(blocks): + # when syncing, we won't share any signatures with the + # mempool, so there's no need to pass in the BLS cache. + await self.add_block(block, peer, fork_info=fork_info) + self.log.info( + f"END WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" + ) + self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} short_sync_backtrack") + except (asyncio.CancelledError, Exception): self.sync_store.decrement_backtrack_syncing(node_id=peer.peer_node_id) raise @@ -807,7 +881,10 @@ async def new_peak(self, request: full_node_protocol.NewPeak, peer: WSChiaConnec ): # This is the normal case of receiving the next block if await self.short_sync_backtrack( - peer, curr_peak_height, request.height, request.unfinished_reward_block_hash + peer, + curr_peak_height, + request.height, + request.unfinished_reward_block_hash, ): return None @@ -837,7 +914,9 @@ async def new_peak(self, request: full_node_protocol.NewPeak, peer: WSChiaConnec self._sync_task_list.append(asyncio.create_task(self._sync())) async def send_peak_to_timelords( - self, peak_block: Optional[FullBlock] = None, peer: Optional[WSChiaConnection] = None + self, + peak_block: Optional[FullBlock] = None, + peer: Optional[WSChiaConnection] = None, ) -> None: """ Sends current peak to timelords @@ -1039,14 +1118,22 @@ async def _sync(self) -> None: for i, target_peak_response in enumerate(await asyncio.gather(*coroutines)): if target_peak_response is not None and isinstance(target_peak_response, RespondBlock): self.sync_store.peer_has_block( - target_peak.header_hash, peers[i].peer_node_id, target_peak.weight, target_peak.height, False + target_peak.header_hash, + peers[i].peer_node_id, + target_peak.weight, + target_peak.height, + False, ) # TODO: disconnect from peer which gave us the heaviest_peak, if nobody has the peak fork_point, summaries = await self.request_validate_wp( target_peak.header_hash, target_peak.height, target_peak.weight ) # Ensures that the fork point does not change + self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} Ensures that the fork point") async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): + self.log.info( + f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} Ensures that the fork point" + ) await self.blockchain.warmup(fork_point) fork_point = await check_fork_next_block( self.blockchain, @@ -1061,6 +1148,7 @@ async def _sync(self) -> None: tb = traceback.format_exc() self.log.error(f"Error with syncing: {type(e)}{tb}") finally: + self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} Ensures that the fork point") if self._shut_down: return None await self._finish_sync(fork_point) @@ -1291,7 +1379,12 @@ async def validate_blocks( input_queue: asyncio.Queue[Optional[tuple[WSChiaConnection, list[FullBlock]]]], output_queue: asyncio.Queue[ Optional[ - tuple[WSChiaConnection, ValidationState, list[Awaitable[PreValidationResult]], list[FullBlock]] + tuple[ + WSChiaConnection, + ValidationState, + list[Awaitable[PreValidationResult]], + list[FullBlock], + ] ] ], ) -> None: @@ -1347,7 +1440,12 @@ async def validate_blocks( async def ingest_blocks( input_queue: asyncio.Queue[ Optional[ - tuple[WSChiaConnection, ValidationState, list[Awaitable[PreValidationResult]], list[FullBlock]] + tuple[ + WSChiaConnection, + ValidationState, + list[Awaitable[PreValidationResult]], + list[FullBlock], + ] ] ], ) -> None: @@ -1374,21 +1472,27 @@ async def ingest_blocks( # Wrap add_prevalidated_blocks with writer to ensure all writes and reads are on same connection. # add_prevalidated_blocks should only be called under priority_mutex so this will not stall other # writes to the DB. - async with self.block_store.db_wrapper.writer() as conn: - self.log.info( - f"BEGIN WJB task {asyncio.current_task().get_name()} ingest add_prevalidated_blocks writer {conn}" - ) - state_change_summary, err = await self.add_prevalidated_blocks( - blockchain, - blocks, - pre_validation_results, - fork_info, - peer.peer_info, - vs, - ) + self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} add_prevalidated_blocks") + async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): self.log.info( - f"END WJB task {asyncio.current_task().get_name()} ingest add_prevalidated_blocks writer {conn}" + f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} add_prevalidated_blocks" ) + async with self.block_store.db_wrapper.writer() as conn: + self.log.info( + f"BEGIN WJB task {asyncio.current_task().get_name()} ingest add_prevalidated_blocks writer {conn}" + ) + state_change_summary, err = await self.add_prevalidated_blocks( + blockchain, + blocks, + pre_validation_results, + fork_info, + peer.peer_info, + vs, + ) + self.log.info( + f"END WJB task {asyncio.current_task().get_name()} ingest add_prevalidated_blocks writer {conn}" + ) + self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} add_prevalidated_blocks") if err is not None: await peer.close(600) @@ -1421,7 +1525,14 @@ async def ingest_blocks( block_queue: asyncio.Queue[Optional[tuple[WSChiaConnection, list[FullBlock]]]] = asyncio.Queue(maxsize=10) validation_queue: asyncio.Queue[ - Optional[tuple[WSChiaConnection, ValidationState, list[Awaitable[PreValidationResult]], list[FullBlock]]] + Optional[ + tuple[ + WSChiaConnection, + ValidationState, + list[Awaitable[PreValidationResult]], + list[FullBlock], + ] + ] ] = asyncio.Queue(maxsize=10) fetch_task = asyncio.create_task(fetch_blocks(block_queue)) @@ -1659,7 +1770,11 @@ async def add_prevalidated_blocks( assert expected_sub_slot_iters == vs.ssi assert expected_difficulty == vs.difficulty result, error, state_change_summary = await self.blockchain.add_block( - block, pre_validation_results[i], vs.ssi, fork_info, prev_ses_block=vs.prev_ses_block + block, + pre_validation_results[i], + vs.ssi, + fork_info, + prev_ses_block=vs.prev_ses_block, ) if error is None: blockchain.remove_extra_block(header_hash) @@ -1683,7 +1798,10 @@ async def add_prevalidated_blocks( agg_state_change_summary.additions + state_change_summary.additions, agg_state_change_summary.new_rewards + state_change_summary.new_rewards, ) - elif result in {AddBlockResult.INVALID_BLOCK, AddBlockResult.DISCONNECTED_BLOCK}: + elif result in { + AddBlockResult.INVALID_BLOCK, + AddBlockResult.DISCONNECTED_BLOCK, + }: if error is not None: self.log.error(f"Error: {error}, Invalid block from peer: {peer_info} ") return agg_state_change_summary, error @@ -1748,7 +1866,9 @@ async def _finish_sync(self, fork_point: Optional[uint32]) -> None: if self._server is None: return None + self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} _finish_sync") async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): + self.log.info(f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} _finish_sync") peak: Optional[BlockRecord] = self.blockchain.get_peak() peak_fb: Optional[FullBlock] = await self.blockchain.get_full_peak() if peak_fb is not None: @@ -1760,6 +1880,7 @@ async def _finish_sync(self, fork_point: Optional[uint32]) -> None: peak_fb, state_change_summary, None ) await self.peak_post_processing_2(peak_fb, None, state_change_summary, ppp_result) + self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} _finish_sync") if peak is not None and self.weight_proof_handler is not None: await self.weight_proof_handler.get_proof_of_weight(peak.header_hash) @@ -1915,7 +2036,9 @@ async def peak_post_processing( and sp.rc_proof is not None ) await self.signage_point_post_processing( - RespondSignagePoint(index, sp.cc_vdf, sp.cc_proof, sp.rc_vdf, sp.rc_proof), peer, sub_slots[1] + RespondSignagePoint(index, sp.cc_vdf, sp.cc_proof, sp.rc_vdf, sp.rc_proof), + peer, + sub_slots[1], ) if sub_slots[1] is None: @@ -2007,7 +2130,9 @@ async def peak_post_processing_2( } peak = Peak( - state_change_summary.peak.header_hash, state_change_summary.peak.height, state_change_summary.peak.weight + state_change_summary.peak.header_hash, + state_change_summary.peak.height, + state_change_summary.peak.weight, ) # Looks up coin records in DB for the coins that wallets are interested in @@ -2083,7 +2208,8 @@ async def add_block( return None block_response: Optional[Any] = await peer.call_api( - FullNodeAPI.request_block, full_node_protocol.RequestBlock(block.height, True) + FullNodeAPI.request_block, + full_node_protocol.RequestBlock(block.height, True), ) if block_response is None or not isinstance(block_response, full_node_protocol.RespondBlock): self.log.warning( @@ -2107,13 +2233,7 @@ async def add_block( return await self.add_block(new_block, peer, bls_cache) state_change_summary: Optional[StateChangeSummary] = None ppp_result: Optional[PeakPostProcessingResult] = None - async with ( - self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high), - enable_profiler(self.profile_block_validation) as pr, - ): - # After acquiring the lock, check again, because another asyncio thread might have added it - if self.blockchain.contains_block(header_hash): - return None + async with enable_profiler(self.profile_block_validation) as pr: validation_start = time.monotonic() # Tries to add the block to the blockchain, if we already validated transactions, don't do it again conds = None @@ -2177,7 +2297,8 @@ async def add_block( # Evict any related BLS cache entries as we no longer need them if bls_cache is not None and pre_validation_result.conds is not None: pairs_pks, pairs_msgs = pkm_pairs( - pre_validation_result.conds, self.constants.AGG_SIG_ME_ADDITIONAL_DATA + pre_validation_result.conds, + self.constants.AGG_SIG_ME_ADDITIONAL_DATA, ) bls_cache.evict(pairs_pks, pairs_msgs) # Only propagate blocks which extend the blockchain (becomes one of the heads) @@ -2211,7 +2332,12 @@ async def add_block( percent_full_str = ( ( ", percent full: " - + str(round(100.0 * float(block.transactions_info.cost) / self.constants.MAX_BLOCK_COST_CLVM, 3)) + + str( + round( + 100.0 * float(block.transactions_info.cost) / self.constants.MAX_BLOCK_COST_CLVM, + 3, + ) + ) + "%" ) if block.transactions_info is not None @@ -2347,7 +2473,11 @@ async def add_unfinished_block( npc_result: Optional[NPCResult] = None pre_validation_time = None + self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} validate_unfinished_block_header") async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): + self.log.info( + f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} validate_unfinished_block_header" + ) start_header_time = time.monotonic() _, header_error = await self.blockchain.validate_unfinished_block_header(block) if header_error is not None: @@ -2360,6 +2490,7 @@ async def add_unfinished_block( logging.WARNING if validate_time > 2 else logging.DEBUG, f"Time for header validate: {validate_time:0.3f}s", ) + self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} validate_unfinished_block_header") if block.transactions_generator is not None: pre_validation_start = time.monotonic() @@ -2403,13 +2534,16 @@ async def add_unfinished_block( npc_result = NPCResult(None, conditions) pre_validation_time = time.monotonic() - pre_validation_start + self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} validate_unfinished_block") async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): + self.log.info(f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} validate_unfinished_block") # TODO: pre-validate VDFs outside of lock validation_start = time.monotonic() validate_result = await self.blockchain.validate_unfinished_block(block, npc_result) if validate_result.error is not None: raise ConsensusError(Err(validate_result.error)) validation_time = time.monotonic() - validation_start + self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} validate_unfinished_block") assert validate_result.required_iters is not None @@ -2447,7 +2581,12 @@ async def add_unfinished_block( percent_full_str = ( ( ", percent full: " - + str(round(100.0 * float(block.transactions_info.cost) / self.constants.MAX_BLOCK_COST_CLVM, 3)) + + str( + round( + 100.0 * float(block.transactions_info.cost) / self.constants.MAX_BLOCK_COST_CLVM, + 3, + ) + ) + "%" ) if block.transactions_info is not None @@ -2502,7 +2641,8 @@ async def add_unfinished_block( msg = make_msg(ProtocolMessageTypes.new_unfinished_block, full_node_request) full_node_request2 = full_node_protocol.NewUnfinishedBlock2( - block.reward_chain_block.get_hash(), block.foliage.foliage_transaction_block_hash + block.reward_chain_block.get_hash(), + block.foliage.foliage_transaction_block_hash, ) msg2 = make_msg(ProtocolMessageTypes.new_unfinished_block2, full_node_request2) @@ -2529,7 +2669,9 @@ def new_clients(conn: WSChiaConnection) -> bool: ) async def new_infusion_point_vdf( - self, request: timelord_protocol.NewInfusionPointVDF, timelord_peer: Optional[WSChiaConnection] = None + self, + request: timelord_protocol.NewInfusionPointVDF, + timelord_peer: Optional[WSChiaConnection] = None, ) -> Optional[Message]: # Lookup unfinished blocks unfinished_block: Optional[UnfinishedBlock] = self.full_node_store.get_unfinished_block( @@ -2625,7 +2767,16 @@ async def new_infusion_point_vdf( self.log.warning("Trying to make a pre-farm block but height is not 0") return None try: - await self.add_block(block, None, self._bls_cache, raise_on_disconnected=True) + self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} timelord add_block") + async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): + self.log.info(f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} timelord add_block") + async with self.block_store.db_wrapper.writer() as conn: + self.log.info( + f"BEGIN WJB task {asyncio.current_task().get_name()} timelord add_block writer {conn}" + ) + await self.add_block(block, None, self._bls_cache, raise_on_disconnected=True) + self.log.info(f"END WJB task {asyncio.current_task().get_name()} timelord add_block writer {conn}") + self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} timelord add_block") except Exception as e: self.log.warning(f"Consensus error validating block: {e}") if timelord_peer is not None: @@ -2660,7 +2811,10 @@ async def add_end_of_sub_slot( bytes32.zeros, ) return ( - make_msg(ProtocolMessageTypes.request_signage_point_or_end_of_sub_slot, full_node_request), + make_msg( + ProtocolMessageTypes.request_signage_point_or_end_of_sub_slot, + full_node_request, + ), False, ) @@ -2716,7 +2870,8 @@ async def add_end_of_sub_slot( uint32(0) if peak is None else peak.height, sp_source_data=SignagePointSourceData( sub_slot_data=SPSubSlotSourceData( - end_of_slot_bundle.challenge_chain, end_of_slot_bundle.reward_chain + end_of_slot_bundle.challenge_chain, + end_of_slot_bundle.reward_chain, ) ), ) @@ -2731,7 +2886,11 @@ async def add_end_of_sub_slot( return None, False async def add_transaction( - self, transaction: SpendBundle, spend_name: bytes32, peer: Optional[WSChiaConnection] = None, test: bool = False + self, + transaction: SpendBundle, + spend_name: bytes32, + peer: Optional[WSChiaConnection] = None, + test: bool = False, ) -> tuple[MempoolInclusionStatus, Optional[Err]]: if self.sync_store.get_sync_mode(): return MempoolInclusionStatus.FAILED, Err.NO_TRANSACTIONS_WHILE_SYNCING @@ -2763,17 +2922,23 @@ async def add_transaction( self.mempool_manager.remove_seen(spend_name) raise + self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} mempool") async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.low): + self.log.info(f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} mempool") if self.mempool_manager.get_spendbundle(spend_name) is not None: self.mempool_manager.remove_seen(spend_name) return MempoolInclusionStatus.SUCCESS, None if self.mempool_manager.peak is None: return MempoolInclusionStatus.FAILED, Err.MEMPOOL_NOT_INITIALIZED info = await self.mempool_manager.add_spend_bundle( - transaction, cost_result, spend_name, self.mempool_manager.peak.height + transaction, + cost_result, + spend_name, + self.mempool_manager.peak.height, ) status = info.status error = info.error + self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} mempool") if status == MempoolInclusionStatus.SUCCESS: self.log.debug( f"Added transaction to mempool: {spend_name} mempool size: " @@ -2836,7 +3001,8 @@ async def broadcast_added_tx( continue msg = make_msg( - ProtocolMessageTypes.mempool_items_added, wallet_protocol.MempoolItemsAdded([mempool_item.name]) + ProtocolMessageTypes.mempool_items_added, + wallet_protocol.MempoolItemsAdded([mempool_item.name]), ) await peer.send_message(msg) @@ -2914,7 +3080,10 @@ async def broadcast_removed_tx(self, mempool_removals: list[MempoolRemoveInfo]) ) async def _needs_compact_proof( - self, vdf_info: VDFInfo, header_block: HeaderBlock, field_vdf: CompressibleVDFField + self, + vdf_info: VDFInfo, + header_block: HeaderBlock, + field_vdf: CompressibleVDFField, ) -> bool: if field_vdf == CompressibleVDFField.CC_EOS_VDF: for sub_slot in header_block.finished_sub_slots: @@ -3059,7 +3228,11 @@ async def add_compact_proof_of_time(self, request: timelord_protocol.RespondComp field_vdf = CompressibleVDFField(int(request.field_vdf)) if not await self._can_accept_compact_proof( - request.vdf_info, request.vdf_proof, request.height, request.header_hash, field_vdf + request.vdf_info, + request.vdf_proof, + request.height, + request.header_hash, + field_vdf, ): return None async with self.blockchain.compact_proof_lock: @@ -3144,7 +3317,11 @@ async def request_compact_vdf(self, request: full_node_protocol.RequestCompactVD async def add_compact_vdf(self, request: full_node_protocol.RespondCompactVDF, peer: WSChiaConnection) -> None: field_vdf = CompressibleVDFField(int(request.field_vdf)) if not await self._can_accept_compact_proof( - request.vdf_info, request.vdf_proof, request.height, request.header_hash, field_vdf + request.vdf_info, + request.vdf_proof, + request.height, + request.header_hash, + field_vdf, ): return None async with self.blockchain.compact_proof_lock: @@ -3197,7 +3374,10 @@ def add_to_bad_peak_cache(self, peak_header_hash: bytes32, peak_height: uint32) self.bad_peak_cache = new_cache async def broadcast_uncompact_blocks( - self, uncompact_interval_scan: int, target_uncompact_proofs: int, sanitize_weight_proof_only: bool + self, + uncompact_interval_scan: int, + target_uncompact_proofs: int, + sanitize_weight_proof_only: bool, ) -> None: try: while not self._shut_down: @@ -3218,7 +3398,10 @@ async def broadcast_uncompact_blocks( total_target_uncompact_proofs = target_uncompact_proofs * max(1, len(connected_timelords)) heights = await self.block_store.get_random_not_compactified(total_target_uncompact_proofs) - self.log.info("Heights found for bluebox to compact: [%s]", ", ".join(map(str, heights))) + self.log.info( + "Heights found for bluebox to compact: [%s]", + ", ".join(map(str, heights)), + ) for h in heights: headers = await self.blockchain.get_header_blocks_in_range(h, h, tx_filter=False) @@ -3310,7 +3493,10 @@ async def broadcast_uncompact_blocks( broadcast_list = broadcast_list_chunks[chunk_index] chunk_index = (chunk_index + 1) % len(broadcast_list_chunks) for new_pot in broadcast_list: - msg = make_msg(ProtocolMessageTypes.request_compact_proof_of_time, new_pot) + msg = make_msg( + ProtocolMessageTypes.request_compact_proof_of_time, + new_pot, + ) msgs.append(msg) await self.server.send_to_specific(msgs, peer_node_id) await asyncio.sleep(uncompact_interval_scan) From 6c627b3753ce15eb2487b8a28edf721f8b19d92c Mon Sep 17 00:00:00 2001 From: William Blanke Date: Sun, 8 Dec 2024 15:24:43 -0800 Subject: [PATCH 06/13] fix long sync that i messed up --- chia/full_node/full_node.py | 32 +++++++++++++------------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index 212d88b2605e..4fb978f9570f 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -1472,27 +1472,21 @@ async def ingest_blocks( # Wrap add_prevalidated_blocks with writer to ensure all writes and reads are on same connection. # add_prevalidated_blocks should only be called under priority_mutex so this will not stall other # writes to the DB. - self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} add_prevalidated_blocks") - async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): + async with self.block_store.db_wrapper.writer() as conn: self.log.info( - f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} add_prevalidated_blocks" + f"BEGIN WJB task {asyncio.current_task().get_name()} ingest add_prevalidated_blocks writer {conn}" + ) + state_change_summary, err = await self.add_prevalidated_blocks( + blockchain, + blocks, + pre_validation_results, + fork_info, + peer.peer_info, + vs, + ) + self.log.info( + f"END WJB task {asyncio.current_task().get_name()} ingest add_prevalidated_blocks writer {conn}" ) - async with self.block_store.db_wrapper.writer() as conn: - self.log.info( - f"BEGIN WJB task {asyncio.current_task().get_name()} ingest add_prevalidated_blocks writer {conn}" - ) - state_change_summary, err = await self.add_prevalidated_blocks( - blockchain, - blocks, - pre_validation_results, - fork_info, - peer.peer_info, - vs, - ) - self.log.info( - f"END WJB task {asyncio.current_task().get_name()} ingest add_prevalidated_blocks writer {conn}" - ) - self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} add_prevalidated_blocks") if err is not None: await peer.close(600) From 2b18a72c376edce73c6585fc7d08065636d1b8cf Mon Sep 17 00:00:00 2001 From: William Blanke Date: Sun, 8 Dec 2024 15:58:54 -0800 Subject: [PATCH 07/13] avoid grabbing lock and writer for existing blocks --- chia/full_node/full_node.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index 4fb978f9570f..82cad2985b94 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -789,7 +789,9 @@ async def short_sync_backtrack( # Wrap add_block with writer to ensure all writes and reads are on same connection. # add_block should only be called under priority_mutex so this will not stall other # writes to the DB. - if len(blocks) > 0: + if (len(blocks) == 1) and (self.blockchain.contains_block(blocks[0].header_hash)): + self.log.info(f"short_sync_backtrack already has {blocks[0].header_hash.hex()}") + else: self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} short_sync_backtrack") async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): self.log.info( From c8d68cdc0f9bf024fb486a8feed3b17a000e21a2 Mon Sep 17 00:00:00 2001 From: William Blanke Date: Sun, 8 Dec 2024 16:11:47 -0800 Subject: [PATCH 08/13] readd check --- chia/full_node/full_node.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index 82cad2985b94..722f0f0e43b7 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -2230,6 +2230,9 @@ async def add_block( state_change_summary: Optional[StateChangeSummary] = None ppp_result: Optional[PeakPostProcessingResult] = None async with enable_profiler(self.profile_block_validation) as pr: + # After acquiring the lock, check again, because another asyncio thread might have added it + if self.blockchain.contains_block(header_hash): + return None validation_start = time.monotonic() # Tries to add the block to the blockchain, if we already validated transactions, don't do it again conds = None From 75d7d1057ca2aebad6a38ce08f6fd49c437dc570 Mon Sep 17 00:00:00 2001 From: William Blanke Date: Sun, 8 Dec 2024 16:21:20 -0800 Subject: [PATCH 09/13] better concurrency by only holding locks for one block --- chia/full_node/full_node.py | 39 ++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index 722f0f0e43b7..10554d99ba93 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -786,29 +786,32 @@ async def short_sync_backtrack( first_block.prev_header_hash, ) - # Wrap add_block with writer to ensure all writes and reads are on same connection. - # add_block should only be called under priority_mutex so this will not stall other - # writes to the DB. - if (len(blocks) == 1) and (self.blockchain.contains_block(blocks[0].header_hash)): - self.log.info(f"short_sync_backtrack already has {blocks[0].header_hash.hex()}") - else: - self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} short_sync_backtrack") - async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): - self.log.info( - f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} short_sync_backtrack" - ) - async with self.block_store.db_wrapper.writer() as conn: + for block in reversed(blocks): + # Wrap add_block with writer to ensure all writes and reads are on same connection. + # add_block should only be called under priority_mutex so this will not stall other + # writes to the DB. + if self.blockchain.contains_block(block.header_hash): + self.log.info(f"short_sync_backtrack already has {block.header_hash.hex()}") + else: + # Wrap add_block with writer to ensure all writes and reads are on same connection. + # add_block should only be called under priority_mutex so this will not stall other + # writes to the DB. + self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} short_sync_backtrack") + async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): self.log.info( - f"BEGIN WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" + f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} short_sync_backtrack" ) - for block in reversed(blocks): + async with self.block_store.db_wrapper.writer() as conn: + self.log.info( + f"BEGIN WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" + ) # when syncing, we won't share any signatures with the # mempool, so there's no need to pass in the BLS cache. await self.add_block(block, peer, fork_info=fork_info) - self.log.info( - f"END WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" - ) - self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} short_sync_backtrack") + self.log.info( + f"END WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" + ) + self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} short_sync_backtrack") except (asyncio.CancelledError, Exception): self.sync_store.decrement_backtrack_syncing(node_id=peer.peer_node_id) From ba84cecf10b4519def6d97d096a9b5771232d9ab Mon Sep 17 00:00:00 2001 From: William Blanke Date: Sun, 8 Dec 2024 16:23:16 -0800 Subject: [PATCH 10/13] removed dupe --- chia/full_node/full_node.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index 10554d99ba93..73b86b35d7aa 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -787,9 +787,6 @@ async def short_sync_backtrack( ) for block in reversed(blocks): - # Wrap add_block with writer to ensure all writes and reads are on same connection. - # add_block should only be called under priority_mutex so this will not stall other - # writes to the DB. if self.blockchain.contains_block(block.header_hash): self.log.info(f"short_sync_backtrack already has {block.header_hash.hex()}") else: From 7c9a8887438870ef962b9a649628f20d008cfc27 Mon Sep 17 00:00:00 2001 From: William Blanke Date: Sun, 8 Dec 2024 16:32:53 -0800 Subject: [PATCH 11/13] safer place to check block --- chia/full_node/full_node.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index 73b86b35d7aa..75434bce16e9 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -787,17 +787,17 @@ async def short_sync_backtrack( ) for block in reversed(blocks): - if self.blockchain.contains_block(block.header_hash): - self.log.info(f"short_sync_backtrack already has {block.header_hash.hex()}") - else: - # Wrap add_block with writer to ensure all writes and reads are on same connection. - # add_block should only be called under priority_mutex so this will not stall other - # writes to the DB. - self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} short_sync_backtrack") - async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): - self.log.info( - f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} short_sync_backtrack" - ) + # Wrap add_block with writer to ensure all writes and reads are on same connection. + # add_block should only be called under priority_mutex so this will not stall other + # writes to the DB. + self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} short_sync_backtrack") + async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): + self.log.info( + f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} short_sync_backtrack" + ) + if self.blockchain.contains_block(block.header_hash): + self.log.info(f"short_sync_backtrack main has {block.header_hash.hex()}") + else: async with self.block_store.db_wrapper.writer() as conn: self.log.info( f"BEGIN WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" @@ -808,7 +808,7 @@ async def short_sync_backtrack( self.log.info( f"END WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" ) - self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} short_sync_backtrack") + self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} short_sync_backtrack") except (asyncio.CancelledError, Exception): self.sync_store.decrement_backtrack_syncing(node_id=peer.peer_node_id) From 42ce7b17b7d651857e6046fd2aab81ea987f0229 Mon Sep 17 00:00:00 2001 From: William Blanke Date: Mon, 9 Dec 2024 09:58:05 -0800 Subject: [PATCH 12/13] post processing 2 fixes --- chia/full_node/full_node.py | 273 ++++++++++++++++++------------------ 1 file changed, 139 insertions(+), 134 deletions(-) diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index 75434bce16e9..f8f5c2126872 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -4,6 +4,7 @@ import contextlib import copy import dataclasses +import inspect import logging import multiprocessing import random @@ -364,17 +365,17 @@ async def manage(self) -> AsyncIterator[None]: async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): self.log.info(f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} mempool new_peak") pending_tx = await self.mempool_manager.new_peak(self.blockchain.get_tx_peak(), None) - self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} mempool new_peak") - assert len(pending_tx.items) == 0 # no pending transactions when starting up + assert len(pending_tx.items) == 0 # no pending transactions when starting up - full_peak: Optional[FullBlock] = await self.blockchain.get_full_peak() - assert full_peak is not None - state_change_summary = StateChangeSummary(peak, uint32(max(peak.height - 1, 0)), [], [], [], []) - ppp_result: PeakPostProcessingResult = await self.peak_post_processing( - full_peak, state_change_summary, None - ) - await self.peak_post_processing_2(full_peak, None, state_change_summary, ppp_result) + full_peak: Optional[FullBlock] = await self.blockchain.get_full_peak() + assert full_peak is not None + state_change_summary = StateChangeSummary(peak, uint32(max(peak.height - 1, 0)), [], [], [], []) + ppp_result: PeakPostProcessingResult = await self.peak_post_processing( + full_peak, state_change_summary, None + ) + await self.peak_post_processing_2(full_peak, None, state_change_summary, ppp_result) + self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} mempool new_peak") if self.config["send_uncompact_interval"] != 0: sanitize_weight_proof_only = False if "sanitize_weight_proof_only" in self.config: @@ -674,6 +675,12 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t if not response: raise ValueError(f"Error short batch syncing, invalid/no response for {height}-{end_height}") self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} short batch syncing") + + peak_fb = None + peer = None + state_change_summary = None + ppp_result = None + async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): self.log.info(f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} short batch syncing") state_change_summary: Optional[StateChangeSummary] @@ -716,7 +723,6 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t state_change_summary, peer, ) - await self.peak_post_processing_2(peak_fb, peer, state_change_summary, ppp_result) except Exception: # Still do post processing after cancel (or exception) peak_fb = await self.blockchain.get_full_peak() @@ -725,9 +731,12 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t raise finally: self.log.info(f"Added blocks {height}-{end_height}") + self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} short batch syncing") + if peak_fb is not None: + await self.peak_post_processing_2(peak_fb, peer, state_change_summary, ppp_result) finally: self.sync_store.batch_syncing.remove(peer.peer_node_id) - self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} short batch syncing") + return True async def short_sync_backtrack( @@ -787,28 +796,9 @@ async def short_sync_backtrack( ) for block in reversed(blocks): - # Wrap add_block with writer to ensure all writes and reads are on same connection. - # add_block should only be called under priority_mutex so this will not stall other - # writes to the DB. - self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} short_sync_backtrack") - async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): - self.log.info( - f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} short_sync_backtrack" - ) - if self.blockchain.contains_block(block.header_hash): - self.log.info(f"short_sync_backtrack main has {block.header_hash.hex()}") - else: - async with self.block_store.db_wrapper.writer() as conn: - self.log.info( - f"BEGIN WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" - ) - # when syncing, we won't share any signatures with the - # mempool, so there's no need to pass in the BLS cache. - await self.add_block(block, peer, fork_info=fork_info) - self.log.info( - f"END WJB task {asyncio.current_task().get_name()} short_sync_backtrack add_block writer {conn}" - ) - self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} short_sync_backtrack") + # when syncing, we won't share any signatures with the + # mempool, so there's no need to pass in the BLS cache. + await self.add_block(block, peer, fork_info=fork_info) except (asyncio.CancelledError, Exception): self.sync_store.decrement_backtrack_syncing(node_id=peer.peer_node_id) @@ -1862,6 +1852,10 @@ async def _finish_sync(self, fork_point: Optional[uint32]) -> None: if self._server is None: return None + peak_fb = None + state_change_summary = None + ppp_result = None + self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} _finish_sync") async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): self.log.info(f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} _finish_sync") @@ -1875,9 +1869,11 @@ async def _finish_sync(self, fork_point: Optional[uint32]) -> None: ppp_result: PeakPostProcessingResult = await self.peak_post_processing( peak_fb, state_change_summary, None ) - await self.peak_post_processing_2(peak_fb, None, state_change_summary, ppp_result) self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} _finish_sync") + if peak_fb is not None: + await self.peak_post_processing_2(peak_fb, None, state_change_summary, ppp_result) + if peak is not None and self.weight_proof_handler is not None: await self.weight_proof_handler.get_proof_of_weight(peak.header_hash) self._state_changed("block") @@ -2229,100 +2225,118 @@ async def add_block( return await self.add_block(new_block, peer, bls_cache) state_change_summary: Optional[StateChangeSummary] = None ppp_result: Optional[PeakPostProcessingResult] = None + + validation_time = 0 + async with enable_profiler(self.profile_block_validation) as pr: - # After acquiring the lock, check again, because another asyncio thread might have added it - if self.blockchain.contains_block(header_hash): - return None - validation_start = time.monotonic() - # Tries to add the block to the blockchain, if we already validated transactions, don't do it again - conds = None - if pre_validation_result is not None and pre_validation_result.conds is not None: - conds = pre_validation_result.conds + self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} add_block") + async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): + self.log.info(f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} add_block") + async with self.block_store.db_wrapper.writer() as conn: + self.log.info(f"BEGIN WJB task {asyncio.current_task().get_name()} add_block writer {conn}") + frame = inspect.currentframe().f_back + self.log.info(f"WJB A {inspect.getframeinfo(frame).filename} {frame.f_lineno} {conn}") - # Don't validate signatures because we want to validate them in the main thread later, since we have a - # cache available - prev_b = None - prev_ses_block = None - if block.height > 0: - prev_b = await self.blockchain.get_block_record_from_db(block.prev_header_hash) - assert prev_b is not None - curr = prev_b - while curr.height > 0 and curr.sub_epoch_summary_included is None: - curr = self.blockchain.block_record(curr.prev_hash) - prev_ses_block = curr - new_slot = len(block.finished_sub_slots) > 0 - ssi, diff = get_next_sub_slot_iters_and_difficulty(self.constants, new_slot, prev_b, self.blockchain) - future = await pre_validate_block( - self.blockchain.constants, - AugmentedBlockchain(self.blockchain), - block, - self.blockchain.pool, - conds, - ValidationState(ssi, diff, prev_ses_block), - ) - pre_validation_result = await future - added: Optional[AddBlockResult] = None - pre_validation_time = time.monotonic() - validation_start - try: - if pre_validation_result.error is not None: - if Err(pre_validation_result.error) == Err.INVALID_PREV_BLOCK_HASH: - added = AddBlockResult.DISCONNECTED_BLOCK - error_code: Optional[Err] = Err.INVALID_PREV_BLOCK_HASH - elif Err(pre_validation_result.error) == Err.TIMESTAMP_TOO_FAR_IN_FUTURE: - raise TimestampError() - else: - raise ValueError( - f"Failed to validate block {header_hash} height " - f"{block.height}: {Err(pre_validation_result.error).name}" - ) - else: - if fork_info is None: - fork_info = ForkInfo(block.height - 1, block.height - 1, block.prev_header_hash) - (added, error_code, state_change_summary) = await self.blockchain.add_block( - block, pre_validation_result, ssi, fork_info + # After acquiring the lock, check again, because another asyncio thread might have added it + if self.blockchain.contains_block(header_hash): + return None + validation_start = time.monotonic() + # Tries to add the block to the blockchain, if we already validated transactions, don't do it again + conds = None + if pre_validation_result is not None and pre_validation_result.conds is not None: + conds = pre_validation_result.conds + + # Don't validate signatures because we want to validate them in the main thread later, since we have a + # cache available + prev_b = None + prev_ses_block = None + if block.height > 0: + prev_b = await self.blockchain.get_block_record_from_db(block.prev_header_hash) + assert prev_b is not None + curr = prev_b + while curr.height > 0 and curr.sub_epoch_summary_included is None: + curr = self.blockchain.block_record(curr.prev_hash) + prev_ses_block = curr + new_slot = len(block.finished_sub_slots) > 0 + ssi, diff = get_next_sub_slot_iters_and_difficulty( + self.constants, new_slot, prev_b, self.blockchain ) - if added == AddBlockResult.ALREADY_HAVE_BLOCK: - return None - elif added == AddBlockResult.INVALID_BLOCK: - assert error_code is not None - self.log.error(f"Block {header_hash} at height {block.height} is invalid with code {error_code}.") - raise ConsensusError(error_code, [header_hash]) - elif added == AddBlockResult.DISCONNECTED_BLOCK: - self.log.info(f"Disconnected block {header_hash} at height {block.height}") - if raise_on_disconnected: - raise RuntimeError("Expected block to be added, received disconnected block.") - return None - elif added == AddBlockResult.NEW_PEAK: - # Evict any related BLS cache entries as we no longer need them - if bls_cache is not None and pre_validation_result.conds is not None: - pairs_pks, pairs_msgs = pkm_pairs( - pre_validation_result.conds, - self.constants.AGG_SIG_ME_ADDITIONAL_DATA, - ) - bls_cache.evict(pairs_pks, pairs_msgs) - # Only propagate blocks which extend the blockchain (becomes one of the heads) - assert state_change_summary is not None - post_process_time = time.monotonic() - ppp_result = await self.peak_post_processing(block, state_change_summary, peer) - post_process_time = time.monotonic() - post_process_time - - elif added == AddBlockResult.ADDED_AS_ORPHAN: - self.log.info( - f"Received orphan block of height {block.height} rh {block.reward_chain_block.get_hash()}" + future = await pre_validate_block( + self.blockchain.constants, + AugmentedBlockchain(self.blockchain), + block, + self.blockchain.pool, + conds, + ValidationState(ssi, diff, prev_ses_block), ) - post_process_time = 0 - else: - # Should never reach here, all the cases are covered - raise RuntimeError(f"Invalid result from add_block {added}") - except asyncio.CancelledError: - # We need to make sure to always call this method even when we get a cancel exception, to make sure - # the node stays in sync - if added == AddBlockResult.NEW_PEAK: - assert state_change_summary is not None - await self.peak_post_processing(block, state_change_summary, peer) - raise - - validation_time = time.monotonic() - validation_start + pre_validation_result = await future + added: Optional[AddBlockResult] = None + pre_validation_time = time.monotonic() - validation_start + try: + if pre_validation_result.error is not None: + if Err(pre_validation_result.error) == Err.INVALID_PREV_BLOCK_HASH: + added = AddBlockResult.DISCONNECTED_BLOCK + error_code: Optional[Err] = Err.INVALID_PREV_BLOCK_HASH + elif Err(pre_validation_result.error) == Err.TIMESTAMP_TOO_FAR_IN_FUTURE: + raise TimestampError() + else: + raise ValueError( + f"Failed to validate block {header_hash} height " + f"{block.height}: {Err(pre_validation_result.error).name}" + ) + else: + if fork_info is None: + fork_info = ForkInfo(block.height - 1, block.height - 1, block.prev_header_hash) + (added, error_code, state_change_summary) = await self.blockchain.add_block( + block, pre_validation_result, ssi, fork_info + ) + if added == AddBlockResult.ALREADY_HAVE_BLOCK: + return None + elif added == AddBlockResult.INVALID_BLOCK: + assert error_code is not None + self.log.error( + f"Block {header_hash} at height {block.height} is invalid with code {error_code}." + ) + raise ConsensusError(error_code, [header_hash]) + elif added == AddBlockResult.DISCONNECTED_BLOCK: + self.log.info(f"Disconnected block {header_hash} at height {block.height}") + if raise_on_disconnected: + raise RuntimeError("Expected block to be added, received disconnected block.") + return None + elif added == AddBlockResult.NEW_PEAK: + # Evict any related BLS cache entries as we no longer need them + if bls_cache is not None and pre_validation_result.conds is not None: + pairs_pks, pairs_msgs = pkm_pairs( + pre_validation_result.conds, + self.constants.AGG_SIG_ME_ADDITIONAL_DATA, + ) + bls_cache.evict(pairs_pks, pairs_msgs) + # Only propagate blocks which extend the blockchain (becomes one of the heads) + assert state_change_summary is not None + post_process_time = time.monotonic() + ppp_result = await self.peak_post_processing(block, state_change_summary, peer) + post_process_time = time.monotonic() - post_process_time + + elif added == AddBlockResult.ADDED_AS_ORPHAN: + self.log.info( + f"Received orphan block of height {block.height} rh {block.reward_chain_block.get_hash()}" + ) + post_process_time = 0 + else: + # Should never reach here, all the cases are covered + raise RuntimeError(f"Invalid result from add_block {added}") + except asyncio.CancelledError: + # We need to make sure to always call this method even when we get a cancel exception, to make sure + # the node stays in sync + if added == AddBlockResult.NEW_PEAK: + assert state_change_summary is not None + await self.peak_post_processing(block, state_change_summary, peer) + raise + finally: + self.log.info(f"END WJB task {asyncio.current_task().get_name()} add_block writer {conn}") + self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} add_block") + + validation_time = time.monotonic() - validation_start if ppp_result is not None: assert state_change_summary is not None @@ -2766,16 +2780,7 @@ async def new_infusion_point_vdf( self.log.warning("Trying to make a pre-farm block but height is not 0") return None try: - self.log.info(f"PM LOCK ATTEMPT WJB task {asyncio.current_task().get_name()} timelord add_block") - async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high): - self.log.info(f"PM LOCK ACQUIRE WJB task {asyncio.current_task().get_name()} timelord add_block") - async with self.block_store.db_wrapper.writer() as conn: - self.log.info( - f"BEGIN WJB task {asyncio.current_task().get_name()} timelord add_block writer {conn}" - ) - await self.add_block(block, None, self._bls_cache, raise_on_disconnected=True) - self.log.info(f"END WJB task {asyncio.current_task().get_name()} timelord add_block writer {conn}") - self.log.info(f"PM LOCK END WJB task {asyncio.current_task().get_name()} timelord add_block") + await self.add_block(block, None, self._bls_cache, raise_on_disconnected=True) except Exception as e: self.log.warning(f"Consensus error validating block: {e}") if timelord_peer is not None: From 28393544728661ea81455ec1203d92f83d36d2db Mon Sep 17 00:00:00 2001 From: William Blanke Date: Mon, 9 Dec 2024 10:14:19 -0800 Subject: [PATCH 13/13] make sure to log --- chia/full_node/full_node.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index f8f5c2126872..793c5557aced 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -2239,6 +2239,8 @@ async def add_block( # After acquiring the lock, check again, because another asyncio thread might have added it if self.blockchain.contains_block(header_hash): + self.log.info(f"END WJB*** task {asyncio.current_task().get_name()} add_block writer {conn}") + self.log.info(f"PM LOCK END WJB*** task {asyncio.current_task().get_name()} add_block") return None validation_start = time.monotonic() # Tries to add the block to the blockchain, if we already validated transactions, don't do it again