diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py index 04fc2ed90..76b610d51 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py @@ -277,9 +277,9 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: if args.find_deployed_blocks: addresses_set = set() for job in filtered_event_jobs: - addresses_set.update(job.contracts) + addresses_set.update(job.contracts) # type: ignore for function_job in filtered_function_call_jobs: - addresses_set.add(function_job.contract_address) + addresses_set.add(function_job.contract_address) # type: ignore if args.start is None: start_block = web3.eth.blockNumber - 1 @@ -330,8 +330,8 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: db_session, blockchain_type, web3, - filtered_event_jobs, - filtered_function_call_jobs, + filtered_event_jobs, # type: ignore + filtered_function_call_jobs, # type: ignore start_block, end_block, args.max_blocks_batch, diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py index 0aac7927d..dccee87dd 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py @@ -26,7 +26,13 @@ merge_function_call_crawl_jobs, moonworm_crawler_update_job_as_pickedup, ) -from .db import add_events_to_session, add_function_calls_to_session, commit_session +from .db import ( + add_events_to_session, + add_function_calls_to_session, + commit_session, + write_to_db, + delete_unverified_duplicates, +) from .event_crawler import _crawl_events from .function_call_crawler import _crawl_functions from ..settings import ( @@ -95,7 +101,7 @@ def continuous_crawler( function_call_crawl_jobs: List[FunctionCallCrawlJob], start_block: int, max_blocks_batch: int = 100, - min_blocks_batch: int = 40, + min_blocks_batch: int = 5, confirmations: int = 60, min_sleep_time: float = 0.1, heartbeat_interval: float = 60, @@ -162,7 +168,7 @@ def continuous_crawler( crawler_status=heartbeat_template, ) last_heartbeat_time = datetime.utcnow() - blocks_cache: Dict[int, int] = {} + blocks_cache: Dict[int, Optional[int]] = {} current_sleep_time = min_sleep_time failed_count = 0 try: @@ -171,7 +177,7 @@ def continuous_crawler( time.sleep(current_sleep_time) end_block = min( - web3.eth.blockNumber - confirmations, + web3.eth.blockNumber - min_blocks_batch, start_block + max_blocks_batch, ) @@ -192,7 +198,7 @@ def continuous_crawler( from_block=start_block, to_block=end_block, blocks_cache=blocks_cache, - db_block_query_batch=min_blocks_batch * 2, + db_block_query_batch=max_blocks_batch * 3, ) logger.info( f"Crawled {len(all_events)} events from {start_block} to {end_block}." 
@@ -220,6 +226,18 @@ def continuous_crawler(
 
             current_time = datetime.utcnow()
 
+            # Promote confirmed unverified labels to the verified label and
+            # prune superseded unverified duplicates before committing.
+            write_to_db(
+                web3=web3, db_session=db_session, blockchain_type=blockchain_type
+            )
+
+            delete_unverified_duplicates(
+                db_session=db_session, blockchain_type=blockchain_type
+            )
+
+            commit_session(db_session)
+
             if current_time - jobs_refetchet_time > timedelta(
                 seconds=new_jobs_refetch_interval
             ):
@@ -243,8 +261,6 @@
             if current_time - last_heartbeat_time > timedelta(
                 seconds=heartbeat_interval
             ):
-                # Commiting to db
-                commit_session(db_session)
                 # Update heartbeat
                 heartbeat_template["last_block"] = end_block
                 heartbeat_template["current_time"] = _date_to_str(current_time)
@@ -304,7 +320,7 @@
         heartbeat_template[
             "die_reason"
         ] = f"{e.__class__.__name__}: {e}\n error_summary: {error_summary}\n error_traceback: {error_traceback}"
-        heartbeat_template["last_block"] = end_block
+        heartbeat_template["last_block"] = end_block  # type: ignore
         heartbeat(
             crawler_type=crawler_type,
             blockchain_type=blockchain_type,
diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
index fba881b57..7343808d0 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
@@ -124,7 +124,7 @@ def _retry_connect_web3(
             logger.info(f"Retrying in {sleep_time} seconds")
             time.sleep(sleep_time)
     raise Exception(
-        f"Failed to connect to {blockchain_type} blockchain after {retry_count} retries: {error}"
+        f"Failed to connect to {blockchain_type} blockchain after {retry_count} retries: {error}"  # type: ignore
     )
@@ -151,7 +151,7 @@ def blockchain_type_to_subscription_type(
 
 @dataclass
 class EventCrawlJob:
-    event_abi_hash: str
+    event_abi_selector: str
     event_abi: Dict[str, Any]
     contracts: List[ChecksumAddress]
     address_entries: Dict[ChecksumAddress, Dict[UUID, List[str]]]
@@ -226,6 +226,7 @@ def find_all_deployed_blocks(
     """
 
     all_deployed_blocks = {}
+    logger.info(f"Finding deployment blocks for {len(addresses_set)} addresses")
     for address in addresses_set:
         try:
             code = web3.eth.getCode(address)
@@ -237,6 +238,7 @@
                 )
                 if block is not None:
                     all_deployed_blocks[address] = block
+                    logger.info(f"Found deployment block for {address}: {block}")
                 if block is None:
                     logger.error(f"Failed to get deployment block for {address}")
         except Exception as e:
@@ -256,15 +258,15 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ
     Create EventCrawlJob objects from bugout entries.
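+    Jobs are keyed by abi_selector: entries that share a selector are merged
+    into a single job covering all of their contract addresses.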
""" - crawl_job_by_hash: Dict[str, EventCrawlJob] = {} + crawl_job_by_selector: Dict[str, EventCrawlJob] = {} for entry in entries: - abi_hash = _get_tag(entry, "abi_method_hash") + abi_selector = _get_tag(entry, "abi_selector") contract_address = Web3().toChecksumAddress(_get_tag(entry, "address")) entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji - existing_crawl_job = crawl_job_by_hash.get(abi_hash) + existing_crawl_job = crawl_job_by_selector.get(abi_selector) if existing_crawl_job is not None: if contract_address not in existing_crawl_job.contracts: existing_crawl_job.contracts.append(contract_address) @@ -275,15 +277,15 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ else: abi = cast(str, entry.content) new_crawl_job = EventCrawlJob( - event_abi_hash=abi_hash, + event_abi_selector=abi_selector, event_abi=json.loads(abi), contracts=[contract_address], address_entries={contract_address: {entry_id: entry.tags}}, created_at=int(datetime.fromisoformat(entry.created_at).timestamp()), ) - crawl_job_by_hash[abi_hash] = new_crawl_job + crawl_job_by_selector[abi_selector] = new_crawl_job - return [crawl_job for crawl_job in crawl_job_by_hash.values()] + return [crawl_job for crawl_job in crawl_job_by_selector.values()] def make_function_call_crawl_jobs( @@ -300,7 +302,8 @@ def make_function_call_crawl_jobs( entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji contract_address = Web3().toChecksumAddress(_get_tag(entry, "address")) abi = json.loads(cast(str, entry.content)) - method_signature = encode_function_signature(abi) + method_signature = _get_tag(entry, "abi_selector") + if method_signature is None: raise ValueError(f"{abi} is not a function ABI") @@ -340,7 +343,7 @@ def merge_event_crawl_jobs( """ for new_crawl_job in new_event_crawl_jobs: for old_crawl_job in old_crawl_jobs: - if new_crawl_job.event_abi_hash == old_crawl_job.event_abi_hash: + if new_crawl_job.event_abi_selector == old_crawl_job.event_abi_selector: old_crawl_job.contracts.extend( [ contract @@ -355,8 +358,8 @@ def merge_event_crawl_jobs( else: old_crawl_job.address_entries[contract_address] = entries break - else: - old_crawl_jobs.append(new_crawl_job) + else: + old_crawl_jobs.append(new_crawl_job) return old_crawl_jobs diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py index 26fd64ef0..20e940271 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py @@ -2,13 +2,23 @@ import json from typing import Dict, List, Optional -from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model +from moonstreamdb.blockchain import ( + AvailableBlockchainType, + get_label_model, + get_block_model, +) from moonstreamdb.models import Base from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore -from sqlalchemy.orm import Session + +# from sqlalchemy.dialects.postgresql.dml import insert +from sqlalchemy.orm import Session, aliased +from sqlalchemy import insert, text, and_, exists, or_, func, update + from ..settings import CRAWLER_LABEL from .event_crawler import Event +from web3 import Web3 + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -16,7 +26,7 @@ def _event_to_label( blockchain_type: AvailableBlockchainType, event: Event, label_name=CRAWLER_LABEL -) -> Base: +) -> Base: # type: ignore """ Creates a label model. 
""" @@ -47,7 +57,7 @@ def _function_call_to_label( blockchain_type: AvailableBlockchainType, function_call: ContractFunctionCall, label_name=CRAWLER_LABEL, -) -> Base: +) -> Base: # type: ignore """ Creates a label model. """ @@ -154,37 +164,26 @@ def add_events_to_session( ) -> None: label_model = get_label_model(blockchain_type) - events_hashes_to_save = [event.transaction_hash for event in events] + label_name = f"{label_name}-unverified" - existing_labels = ( - db_session.query(label_model.transaction_hash, label_model.log_index) - .filter( - label_model.label == label_name, - label_model.log_index != None, - label_model.transaction_hash.in_(events_hashes_to_save), + events_insert = [] + for raw_event in events: + db_event = _event_to_label(blockchain_type, raw_event, label_name) + events_insert.append( + { + "label": db_event.label, + "label_data": db_event.label_data, + "address": db_event.address, + "block_number": db_event.block_number, + "block_timestamp": db_event.block_timestamp, + "transaction_hash": db_event.transaction_hash, + "log_index": db_event.log_index, + } ) - .all() - ) - - existing_labels_transactions = [] - existing_log_index_by_tx_hash: Dict[str, List[int]] = {} - for label in existing_labels: - if label[0] not in existing_labels_transactions: - existing_labels_transactions.append(label[0]) - existing_log_index_by_tx_hash[label[0]] = [] - existing_log_index_by_tx_hash[label[0]].append(label[1]) - labels_to_save = [] - for event in events: - if event.transaction_hash not in existing_labels_transactions: - labels_to_save.append(_event_to_label(blockchain_type, event, label_name)) - elif ( - event.log_index not in existing_log_index_by_tx_hash[event.transaction_hash] - ): - labels_to_save.append(_event_to_label(blockchain_type, event, label_name)) + insert_statement = insert(label_model).values(events_insert) - logger.info(f"Saving {len(labels_to_save)} event labels to session") - db_session.add_all(labels_to_save) + db_session.execute(insert_statement) def add_function_calls_to_session( @@ -194,27 +193,229 @@ def add_function_calls_to_session( label_name=CRAWLER_LABEL, ) -> None: label_model = get_label_model(blockchain_type) - transactions_hashes_to_save = [ - function_call.transaction_hash for function_call in function_calls - ] - existing_labels = ( - db_session.query(label_model.transaction_hash) + label_name = f"{label_name}-unverified" + + function_calls_insert = [] + + for raw_function_call in function_calls: + db_function_call = _function_call_to_label( + blockchain_type, raw_function_call, label_name + ) + function_calls_insert.append( + { + "label": db_function_call.label, + "label_data": db_function_call.label_data, + "address": db_function_call.address, + "block_number": db_function_call.block_number, + "block_timestamp": db_function_call.block_timestamp, + "transaction_hash": db_function_call.transaction_hash, + } + ) + + logger.info(f"Saving {len(function_calls_insert)} labels to session") + insert_statement = insert(label_model).values(function_calls_insert) + + db_session.execute(insert_statement) + + +def write_to_db( + web3: Web3, + blockchain_type: AvailableBlockchainType, + db_session: Session, + label_name=CRAWLER_LABEL, + conformations: int = 100, +): + """ + Take all unvirified labels and update label to label_name. + Get last block number from block model and update label to label_name with deduplication of labels distinct ON (transaction_hash, log_index) for events and distinct ON (transaction_hash) for function calls. 
+    """
+
+    label_model = get_label_model(blockchain_type)
+
+    block_model = get_block_model(blockchain_type)
+
+    label_name_unverified = f"{label_name}-unverified"
+
+    label_model_alias = aliased(label_model, name="polygon_labels2")
+
+    # Promote eligible unverified labels to label_name
+
+    blockchain_block_number = web3.eth.block_number
+
+    latest_block = (
+        db_session.query(block_model.block_number)
+        .order_by(block_model.block_number.desc())
+        .limit(1)
+    )
+
+    latest_block_cte = latest_block.cte("latest_block")
+
+    events = (
+        db_session.query(
+            label_model.id, label_model.transaction_hash, label_model.log_index
+        )
+        .distinct(label_model.transaction_hash, label_model.log_index)
+        .filter(label_model.label == label_name_unverified)
+        .filter(label_model.log_index != None)
         .filter(
-            label_model.label == label_name,
-            label_model.log_index == None,
-            label_model.transaction_hash.in_(transactions_hashes_to_save),
+            or_(
+                label_model.block_number <= latest_block_cte.c.block_number,
+                label_model.block_number <= blockchain_block_number - confirmations,
+            )
+        )
+        .filter(
+            ~db_session.query(label_model_alias)
+            .filter(
+                and_(
+                    label_model_alias.label == label_name,
+                    label_model_alias.transaction_hash == label_model.transaction_hash,
+                    label_model_alias.log_index == label_model.log_index,
+                )
+            )
+            .exists()
+        )
+        .order_by(
+            label_model.transaction_hash,
+            label_model.log_index,
+            label_model.block_number.desc(),
         )
-        .all()
     )
 
-    existing_labels_transactions = [label[0] for label in existing_labels]
+    events_cte = events.cte("events")
 
-    labels_to_save = [
-        _function_call_to_label(blockchain_type, function_call)
-        for function_call in function_calls
-        if function_call.transaction_hash not in existing_labels_transactions
-    ]
+    function_calls = (
+        db_session.query(
+            label_model.id, label_model.transaction_hash, label_model.log_index
+        )
+        .distinct(label_model.transaction_hash)
+        .filter(label_model.label == label_name_unverified)
+        .filter(label_model.log_index == None)
+        .filter(
+            or_(
+                label_model.block_number <= latest_block_cte.c.block_number,
+                label_model.block_number <= blockchain_block_number - confirmations,
+            )
+        )
+        .filter(
+            ~db_session.query(label_model_alias)
+            .filter(
+                and_(
+                    label_model_alias.label == label_name,
+                    label_model_alias.transaction_hash == label_model.transaction_hash,
+                    label_model_alias.log_index == None,
+                )
+            )
+            .exists()
+        )
+        .order_by(label_model.transaction_hash, label_model.block_number.desc())
+    )
+
+    function_calls_cte = function_calls.cte("function_calls")
+
+    union_all_subquery = (
+        db_session.query(events_cte)
+        .union_all(db_session.query(function_calls_cte))
+        .subquery()
+    )
+
+    logger.info("Updating labels")
+    # Update query
+    updated_labels = (
+        db_session.query(label_model)
+        .filter(label_model.id == union_all_subquery.c.events_id)
+        .update(
+            {"label": label_name}, synchronize_session=False
+        )  # returns number of rows updated by query
+    )
+
+    logger.info(
+        f"Latest block in database: {latest_block.one_or_none()}, blockchain head: {blockchain_block_number}, confirmations: {confirmations}"
+    )
+
+    logger.info(f"Updated {updated_labels} labels")
+
+
+def delete_unverified_duplicates(
+    db_session: Session,
+    blockchain_type: AvailableBlockchainType,
+    label_name=CRAWLER_LABEL,
+):
+    """
+    Delete unverified labels that already have a verified counterpart.
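+
+    For example, if a (transaction_hash, log_index) key carries both the
+    verified and the unverified label, the unverified row is removed.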
+    """
+
+    label_model = get_label_model(blockchain_type)
+
+    label_name_unverified = f"{label_name}-unverified"
+
+    label_model_alias = aliased(label_model, name="polygon_labels2")
+
+    duplicated_events = (
+        db_session.query(
+            label_model.id, label_model.transaction_hash, label_model.log_index
+        )
+        .distinct(label_model.transaction_hash, label_model.log_index)
+        .filter(label_model.label == label_name_unverified)
+        .filter(label_model.log_index != None)
+        .filter(
+            db_session.query(label_model_alias)
+            .filter(
+                and_(
+                    label_model_alias.label == label_name,
+                    label_model_alias.transaction_hash == label_model.transaction_hash,
+                    label_model_alias.log_index == label_model.log_index,
+                )
+            )
+            .exists()
+        )
+        .order_by(
+            label_model.transaction_hash,
+            label_model.log_index,
+            label_model.block_number.desc(),
+        )
+    )
+
+    events_cte = duplicated_events.cte("events")
+
+    duplicated_function_calls = (
+        db_session.query(
+            label_model.id, label_model.transaction_hash, label_model.log_index
+        )
+        .distinct(label_model.transaction_hash)
+        .filter(label_model.label == label_name_unverified)
+        .filter(label_model.log_index == None)
+        .filter(
+            db_session.query(label_model_alias)
+            .filter(
+                and_(
+                    label_model_alias.label == label_name,
+                    label_model_alias.transaction_hash == label_model.transaction_hash,
+                    label_model_alias.log_index == None,
+                )
+            )
+            .exists()
+        )
+        .order_by(label_model.transaction_hash, label_model.block_number.desc())
+    )
+
+    function_calls_cte = duplicated_function_calls.cte("function_calls")
+
+    union_all_subquery = (
+        db_session.query(events_cte)
+        .union_all(db_session.query(function_calls_cte))
+        .subquery()
+    )
+
+    logger.info("Deleting duplicate labels")
+
+    # Delete query
+
+    deleted_labels = (
+        db_session.query(label_model)
+        .filter(label_model.id == union_all_subquery.c.events_id)
+        .delete(synchronize_session=False)  # returns number of rows deleted by query
+    )
 
-    logger.info(f"Saving {len(labels_to_save)} labels to session")
-    db_session.add_all(labels_to_save)
+    logger.info(f"Deleted {deleted_labels} duplicate labels")
diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py
index a2c5f7412..13d2df809 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py
@@ -1,14 +1,23 @@
 import logging
 from dataclasses import dataclass
 from typing import Any, Dict, List, Optional, Tuple
+import time
 
-from moonstreamdb.blockchain import AvailableBlockchainType, get_block_model
+from moonstreamdb.blockchain import (
+    AvailableBlockchainType,
+    get_block_model,
+    get_label_model,
+)
 from moonworm.crawler.log_scanner import _fetch_events_chunk, _crawl_events as moonworm_autoscale_crawl_events  # type: ignore
 from sqlalchemy.orm.session import Session
-from sqlalchemy.sql.expression import and_
+from sqlalchemy.sql.expression import and_, func
+from sqlalchemy import text
 from web3 import Web3
 
+from ..settings import CRAWLER_LABEL
 from .crawler import EventCrawlJob
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
@@ -43,8 +52,11 @@ def get_block_timestamp(
     web3: Web3,
     blockchain_type: AvailableBlockchainType,
     block_number: int,
-    blocks_cache: Dict[int, int],
+    blocks_cache: Dict[int, Optional[int]],
+    from_block: int,
+    to_block: int,
     max_blocks_batch: int = 30,
+    label_name: str = CRAWLER_LABEL,
 ) -> int:
     """
     Get the timestamp of a block.
@@ -62,38 +74,103 @@
     """
     assert max_blocks_batch > 0
 
-    if block_number in blocks_cache:
-        return blocks_cache[block_number]
+    if block_number in blocks_cache and blocks_cache[block_number] is not None:
+        return blocks_cache[block_number]  # type: ignore
 
     block_model = get_block_model(blockchain_type)
+    label_model = get_label_model(blockchain_type)
+
+    # from_block and to_block can arrive in reverse order
 
-    blocks = (
-        db_session.query(block_model.block_number, block_model.timestamp)
-        .filter(
-            and_(
-                block_model.block_number >= block_number - max_blocks_batch - 1,
-                block_model.block_number <= block_number + max_blocks_batch + 1,
+    if from_block > to_block:
+        from_block, to_block = to_block, from_block
+
+    if block_number not in blocks_cache:
+        from_block_filter = from_block - max_blocks_batch - 1
+        to_block_filter = to_block + max_blocks_batch + 1
+
+        blocks_range = db_session.query(
+            func.generate_series(from_block_filter, to_block_filter).label(
+                "block_number"
             )
+        ).cte("blocks_range")
+
+        blocks_table_cache = (
+            db_session.query(block_model.block_number, block_model.timestamp)
+            .filter(
+                and_(
+                    block_model.block_number >= from_block_filter,
+                    block_model.block_number <= to_block_filter,
+                )
+            )
+            .cte("blocks_table_cache")
         )
-        .order_by(block_model.block_number.asc())
-        .all()
-    )
 
-    target_block_timestamp: Optional[int] = None
-    if blocks and blocks[0].block_number == block_number:
-        target_block_timestamp = blocks[0].timestamp
+        label_table_cache = (
+            db_session.query(label_model.block_number, label_model.block_timestamp)
+            .filter(
+                and_(
+                    label_model.block_number >= from_block_filter,
+                    label_model.block_number <= to_block_filter,
+                    label_model.label == label_name,
+                )
+            )
+            .distinct(label_model.block_number)
+            .cte("label_table_cache")
+        )
+
+        full_blocks_cache = (
+            db_session.query(
+                blocks_range.c.block_number,
+                func.coalesce(
+                    blocks_table_cache.c.timestamp, label_table_cache.c.block_timestamp
+                ).label("block_timestamp"),
+            )
+            .outerjoin(
+                blocks_table_cache,
+                blocks_range.c.block_number == blocks_table_cache.c.block_number,
+            )
+            .outerjoin(
+                label_table_cache,
+                blocks_range.c.block_number == label_table_cache.c.block_number,
+            )
+            .cte("blocks_cache")
+        )
 
-    if target_block_timestamp is None:
-        target_block_timestamp = _get_block_timestamp_from_web3(web3, block_number)
+        blocks = db_session.query(
+            func.json_object_agg(
+                full_blocks_cache.c.block_number, full_blocks_cache.c.block_timestamp
+            )
+        ).one()[0]
 
-    if len(blocks_cache) > (max_blocks_batch * 3 + 2):
-        blocks_cache.clear()
+        # Cast the JSON object keys back to int to avoid casting on every lookup
+
+        if blocks is not None:
+            blocks = {
+                int(block_number): timestamp
+                for block_number, timestamp in blocks.items()
+            }
+
+            blocks_cache.update(blocks)
+
+    target_block_timestamp: Optional[int] = blocks_cache.get(block_number)
+
+    if target_block_timestamp is None:
+        target_block_timestamp = _get_block_timestamp_from_web3(
+            web3, block_number
+        )  # can be improved by using a batch call
 
         blocks_cache[block_number] = target_block_timestamp
 
-    for block in blocks:
-        blocks_cache[block.block_number] = block.timestamp
 
-    return target_block_timestamp
+    if len(blocks_cache) > (max_blocks_batch * 3 + 2):
+        # Drop cache entries below from_block in place so the caller's dict
+        # is actually pruned.
+        for cached_block_number in [
+            cached for cached in blocks_cache if cached < from_block
+        ]:
+            del blocks_cache[cached_block_number]
+
+    return target_block_timestamp
 
 
 def _crawl_events(
@@ -103,7 +180,7 @@
     jobs: List[EventCrawlJob],
from_block: int, to_block: int, - blocks_cache: Dict[int, int] = {}, + blocks_cache: Dict[int, Optional[int]] = {}, db_block_query_batch=10, ) -> List[Event]: all_events = [] @@ -125,6 +202,8 @@ def _crawl_events( blockchain_type, raw_event["blockNumber"], blocks_cache, + from_block, + to_block, db_block_query_batch, ) event = Event( @@ -148,7 +227,7 @@ def _autoscale_crawl_events( jobs: List[EventCrawlJob], from_block: int, to_block: int, - blocks_cache: Dict[int, int] = {}, + blocks_cache: Dict[int, Optional[int]] = {}, batch_size: int = 1000, db_block_query_batch=10, ) -> Tuple[List[Event], int]: @@ -165,6 +244,7 @@ def _autoscale_crawl_events( batch_size, job.contracts[0], ) + for raw_event in raw_events: raw_event["blockTimestamp"] = get_block_timestamp( db_session, @@ -172,6 +252,8 @@ def _autoscale_crawl_events( blockchain_type, raw_event["blockNumber"], blocks_cache, + from_block, + to_block, db_block_query_batch, ) event = Event( diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py index 24d7c33f3..78dd18781 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py @@ -18,7 +18,13 @@ _retry_connect_web3, update_entries_status_and_progress, ) -from .db import add_events_to_session, add_function_calls_to_session, commit_session +from .db import ( + add_events_to_session, + add_function_calls_to_session, + commit_session, + write_to_db, + delete_unverified_duplicates, +) from .event_crawler import _crawl_events, _autoscale_crawl_events from .function_call_crawler import _crawl_functions @@ -76,7 +82,7 @@ def historical_crawler( logger.info(f"Starting historical event crawler start_block={start_block}") - blocks_cache: Dict[int, int] = {} + blocks_cache: Dict[int, Optional[int]] = {} failed_count = 0 original_start_block = start_block @@ -163,6 +169,16 @@ def historical_crawler( progess_map=progess_map, ) + write_to_db( + web3, + blockchain_type, + db_session, + ) + + delete_unverified_duplicates( + db_session=db_session, blockchain_type=blockchain_type + ) + # Commiting to db commit_session(db_session)
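
Taken together, the crawlers now insert labels under `{CRAWLER_LABEL}-unverified`, `write_to_db` promotes them to the verified label once their block is confirmed, and `delete_unverified_duplicates` sweeps unverified rows that were superseded. The SQL above is dense, so here is a minimal, self-contained sketch of the same promotion and cleanup rules; plain dicts stand in for label rows, and every name in it is illustrative rather than part of the codebase:

from typing import Dict, List, Optional, Tuple

# Illustrative stand-ins; the real values live in mooncrawl's settings/models.
VERIFIED = "moonworm-alpha"
UNVERIFIED = f"{VERIFIED}-unverified"

# (transaction_hash, log_index); log_index is None for function calls.
LabelKey = Tuple[str, Optional[int]]


def promote_confirmed(
    labels: List[Dict],
    latest_db_block: int,
    chain_head: int,
    confirmations: int = 100,
) -> None:
    """Flip unverified labels to the verified label once their block is deep
    enough, skipping keys that already have a verified row (the role of the
    DISTINCT ON / NOT EXISTS queries in write_to_db)."""
    verified_keys = {
        (label["transaction_hash"], label.get("log_index"))
        for label in labels
        if label["label"] == VERIFIED
    }
    # Matches the or_() filter: confirmed means at or below the latest block
    # stored in the database, or `confirmations` blocks behind the head.
    confirmed_height = max(latest_db_block, chain_head - confirmations)
    for label in labels:
        key: LabelKey = (label["transaction_hash"], label.get("log_index"))
        if (
            label["label"] == UNVERIFIED
            and label["block_number"] <= confirmed_height
            and key not in verified_keys
        ):
            label["label"] = VERIFIED
            verified_keys.add(key)


def drop_unverified_duplicates(labels: List[Dict]) -> List[Dict]:
    """Remove unverified rows whose key already exists as a verified label
    (the role of the EXISTS queries in delete_unverified_duplicates)."""
    verified_keys = {
        (label["transaction_hash"], label.get("log_index"))
        for label in labels
        if label["label"] == VERIFIED
    }
    return [
        label
        for label in labels
        if not (
            label["label"] == UNVERIFIED
            and (label["transaction_hash"], label.get("log_index")) in verified_keys
        )
    ]


if __name__ == "__main__":
    rows = [
        {"label": UNVERIFIED, "transaction_hash": "0x1", "log_index": 0, "block_number": 880},
        {"label": UNVERIFIED, "transaction_hash": "0x1", "log_index": 0, "block_number": 880},
        {"label": UNVERIFIED, "transaction_hash": "0x2", "log_index": None, "block_number": 990},
    ]
    promote_confirmed(rows, latest_db_block=900, chain_head=1000, confirmations=100)
    rows = drop_unverified_duplicates(rows)
    # The first 0x1 row is promoted, its duplicate is swept, and the 0x2 row
    # (block 990, not yet confirmed) stays unverified.
    print(rows)

Promotion runs before cleanup in both crawlers, so when one unverified row of a duplicated key is promoted, the remaining unverified rows become duplicates that the sweep removes in the same pass.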