From 6b618b89fa1c2f07bf96083ba67c4f1edf22e743 Mon Sep 17 00:00:00 2001
From: Andrey
Date: Wed, 20 Sep 2023 13:19:38 +0300
Subject: [PATCH 1/8] Crawler improvements. Initial fix: deduplicate events by
 selector instead of event hash. Function call jobs read their selector from
 entry tags.

---
 .../mooncrawl/moonworm_crawler/crawler.py | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
index fba881b57..1beec0061 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
@@ -151,7 +151,7 @@ def blockchain_type_to_subscription_type(
 
 @dataclass
 class EventCrawlJob:
-    event_abi_hash: str
+    event_abi_selector: str
     event_abi: Dict[str, Any]
     contracts: List[ChecksumAddress]
     address_entries: Dict[ChecksumAddress, Dict[UUID, List[str]]]
@@ -256,15 +256,15 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ
     Create EventCrawlJob objects from bugout entries.
     """
 
-    crawl_job_by_hash: Dict[str, EventCrawlJob] = {}
+    crawl_job_by_selector: Dict[str, EventCrawlJob] = {}
 
     for entry in entries:
-        abi_hash = _get_tag(entry, "abi_method_hash")
+        abi_selector = _get_tag(entry, "abi_selector")
         contract_address = Web3().toChecksumAddress(_get_tag(entry, "address"))
 
         entry_id = UUID(entry.entry_url.split("/")[-1])  # crying emoji
 
-        existing_crawl_job = crawl_job_by_hash.get(abi_hash)
+        existing_crawl_job = crawl_job_by_selector.get(abi_selector)
         if existing_crawl_job is not None:
             if contract_address not in existing_crawl_job.contracts:
                 existing_crawl_job.contracts.append(contract_address)
@@ -275,15 +275,15 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ
         else:
             abi = cast(str, entry.content)
             new_crawl_job = EventCrawlJob(
-                event_abi_hash=abi_hash,
+                event_abi_selector=abi_selector,
                 event_abi=json.loads(abi),
                 contracts=[contract_address],
                 address_entries={contract_address: {entry_id: entry.tags}},
                 created_at=int(datetime.fromisoformat(entry.created_at).timestamp()),
             )
-            crawl_job_by_hash[abi_hash] = new_crawl_job
+            crawl_job_by_selector[abi_selector] = new_crawl_job
 
-    return [crawl_job for crawl_job in crawl_job_by_hash.values()]
+    return [crawl_job for crawl_job in crawl_job_by_selector.values()]
 
 
 def make_function_call_crawl_jobs(
@@ -300,7 +300,9 @@ def make_function_call_crawl_jobs(
         entry_id = UUID(entry.entry_url.split("/")[-1])  # crying emoji
         contract_address = Web3().toChecksumAddress(_get_tag(entry, "address"))
         abi = json.loads(cast(str, entry.content))
-        method_signature = encode_function_signature(abi)
+        # method_signature = encode_function_signature(abi)
+        method_signature = _get_tag(entry, "abi_selector")
+
         if method_signature is None:
             raise ValueError(f"{abi} is not a function ABI")
 

From 9521ba807589575e82a6e965287452befb7cb5af Mon Sep 17 00:00:00 2001
From: Andrey
Date: Wed, 20 Sep 2023 19:38:28 +0300
Subject: [PATCH 2/8] Temp state. Refactor blocks cache.
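
The refactor below replaces per-block timestamp queries with one aggregated
query per block range. For reference, a minimal sketch of the aggregation
shape (illustration only, not part of the patch; `block_model` and the
session come from `event_crawler.py`). Two caveats worth knowing:
`json_object_agg` is PostgreSQL-specific, and JSON object keys come back to
Python as strings, so they must be cast back to int before cache lookups.

    # Sketch: fetch {block_number: timestamp} for a whole range in one query.
    from typing import Dict
    from sqlalchemy import and_, func

    def load_timestamps(db_session, block_model, from_block, to_block) -> Dict[int, int]:
        row = (
            db_session.query(
                func.json_object_agg(block_model.block_number, block_model.timestamp)
            )
            .filter(
                and_(
                    block_model.block_number >= from_block,
                    block_model.block_number <= to_block,
                )
            )
            .scalar()
        )
        # json_object_agg over zero rows returns NULL; JSON keys are strings.
        return {int(number): timestamp for number, timestamp in (row or {}).items()}

Note that the hunk below casts the dictionary keys to int but then reads
`blocks.get(str(block_number))`, which cannot hit an int key; the code falls
back to the web3 call, so results stay correct but the cache is defeated.
PATCH 7/8 reworks this path.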
--- .../moonworm_crawler/continuous_crawler.py | 2 +- .../mooncrawl/moonworm_crawler/db.py | 21 +++++--- .../moonworm_crawler/event_crawler.py | 49 +++++++++++++------ 3 files changed, 49 insertions(+), 23 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py index 0aac7927d..ee58258c9 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py @@ -192,7 +192,7 @@ def continuous_crawler( from_block=start_block, to_block=end_block, blocks_cache=blocks_cache, - db_block_query_batch=min_blocks_batch * 2, + db_block_query_batch=min_blocks_batch * 3, ) logger.info( f"Crawled {len(all_events)} events from {start_block} to {end_block}." diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py index 26fd64ef0..1d58fadd2 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py @@ -156,16 +156,21 @@ def add_events_to_session( events_hashes_to_save = [event.transaction_hash for event in events] - existing_labels = ( - db_session.query(label_model.transaction_hash, label_model.log_index) - .filter( - label_model.label == label_name, - label_model.log_index != None, - label_model.transaction_hash.in_(events_hashes_to_save), - ) - .all() + # it's a lot of dublicated hashes, but it's the only way to get log_index + + existing_labels = db_session.query( + label_model.transaction_hash, label_model.log_index + ).filter( + label_model.label == label_name, + label_model.log_index != None, + label_model.transaction_hash.in_(events_hashes_to_save), ) + print(existing_labels) + breakpoint() + + existing_labels = existing_labels.all() + existing_labels_transactions = [] existing_log_index_by_tx_hash: Dict[str, List[int]] = {} for label in existing_labels: diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py index a2c5f7412..efc9190c7 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py @@ -5,7 +5,7 @@ from moonstreamdb.blockchain import AvailableBlockchainType, get_block_model from moonworm.crawler.log_scanner import _fetch_events_chunk, _crawl_events as moonworm_autoscale_crawl_events # type: ignore from sqlalchemy.orm.session import Session -from sqlalchemy.sql.expression import and_ +from sqlalchemy.sql.expression import and_, func from web3 import Web3 from .crawler import EventCrawlJob @@ -44,6 +44,8 @@ def get_block_timestamp( blockchain_type: AvailableBlockchainType, block_number: int, blocks_cache: Dict[int, int], + from_block: int, + to_block: int, max_blocks_batch: int = 30, ) -> int: """ @@ -67,31 +69,48 @@ def get_block_timestamp( block_model = get_block_model(blockchain_type) + # from_block and to_block can be in reverse order + + if from_block > to_block: + from_block, to_block = to_block, from_block + + from_block_filter = from_block - max_blocks_batch - 1 + to_block_filter = to_block + max_blocks_batch + 1 + blocks = ( - db_session.query(block_model.block_number, block_model.timestamp) + db_session.query( + func.json_object_agg(block_model.block_number, block_model.timestamp) + ) .filter( and_( - block_model.block_number >= block_number - max_blocks_batch - 1, - block_model.block_number <= 
block_number + max_blocks_batch + 1, + block_model.block_number >= from_block_filter, + block_model.block_number <= to_block_filter, ) ) - .order_by(block_model.block_number.asc()) - .all() + .scalar() ) + ### transform all keys to int to avoid casting after + + if blocks is not None: + blocks = { + int(block_number): timestamp for block_number, timestamp in blocks.items() + } + target_block_timestamp: Optional[int] = None - if blocks and blocks[0].block_number == block_number: - target_block_timestamp = blocks[0].timestamp + if blocks: + target_block_timestamp = blocks.get(str(block_number)) if target_block_timestamp is None: - target_block_timestamp = _get_block_timestamp_from_web3(web3, block_number) - - if len(blocks_cache) > (max_blocks_batch * 3 + 2): - blocks_cache.clear() + target_block_timestamp = _get_block_timestamp_from_web3( + web3, block_number + ) # can be improved by using batch call blocks_cache[block_number] = target_block_timestamp - for block in blocks: - blocks_cache[block.block_number] = block.timestamp + + if len(blocks_cache) + len(blocks) > (max_blocks_batch * 3 + 2): + # clear cache lower than from_block + blocks_cache = blocks return target_block_timestamp @@ -126,6 +145,8 @@ def _crawl_events( raw_event["blockNumber"], blocks_cache, db_block_query_batch, + from_block, + to_block, ) event = Event( event_name=raw_event["event"], From 4a578df82c9d67f290358135c277d9a092d78a61 Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 20 Sep 2023 19:45:17 +0300 Subject: [PATCH 3/8] Add mypy fixes. --- crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py | 6 +++--- .../mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py index 1beec0061..3edacc935 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py @@ -342,7 +342,7 @@ def merge_event_crawl_jobs( """ for new_crawl_job in new_event_crawl_jobs: for old_crawl_job in old_crawl_jobs: - if new_crawl_job.event_abi_hash == old_crawl_job.event_abi_hash: + if new_crawl_job.event_abi_selector == old_crawl_job.event_abi_selector: old_crawl_job.contracts.extend( [ contract @@ -357,8 +357,8 @@ def merge_event_crawl_jobs( else: old_crawl_job.address_entries[contract_address] = entries break - else: - old_crawl_jobs.append(new_crawl_job) + else: + old_crawl_jobs.append(new_crawl_job) return old_crawl_jobs diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py index efc9190c7..775dbf08d 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py @@ -193,6 +193,8 @@ def _autoscale_crawl_events( blockchain_type, raw_event["blockNumber"], blocks_cache, + from_block, + to_block, db_block_query_batch, ) event = Event( From a1bcd0ccc8ef11d7d5c12e5d2322f37459084701 Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 21 Sep 2023 14:18:10 +0300 Subject: [PATCH 4/8] Deduplicate tx_hash and use any instead of in. 
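
For context on this change: `Column.in_()` renders one bind parameter per
element, while PostgreSQL's `= ANY(ARRAY[...])` ships the whole list as a
single array-valued parameter. Note that the hunk below spells it as
`transaction_hash.any_(...)`, which is meant for array-valued operands and
does not take a list of scalars, so the next patch reworks it with
`.op("ANY")(array(...))`. A sketch of the two scalar-column forms
(illustration only; `label_model` and `events` as in `db.py`, and the list
is assumed non-empty):

    from sqlalchemy import any_
    from sqlalchemy.dialects.postgresql import array

    tx_hashes = list({event.transaction_hash for event in events})  # dedupe first

    # IN: one placeholder per hash.
    q_in = db_session.query(label_model).filter(
        label_model.transaction_hash.in_(tx_hashes)
    )

    # = ANY(ARRAY[...]): a single array-valued parameter.
    q_any = db_session.query(label_model).filter(
        label_model.transaction_hash == any_(array(tx_hashes))
    )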
--- crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py index 1d58fadd2..74199b838 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py @@ -154,7 +154,7 @@ def add_events_to_session( ) -> None: label_model = get_label_model(blockchain_type) - events_hashes_to_save = [event.transaction_hash for event in events] + events_hashes_to_save = set([event.transaction_hash for event in events]) # it's a lot of dublicated hashes, but it's the only way to get log_index @@ -163,7 +163,7 @@ def add_events_to_session( ).filter( label_model.label == label_name, label_model.log_index != None, - label_model.transaction_hash.in_(events_hashes_to_save), + label_model.transaction_hash.any_(events_hashes_to_save), ) print(existing_labels) From 3a19d014a066e5e23b68aa75b4954f19e1faf377 Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 21 Sep 2023 22:56:22 +0300 Subject: [PATCH 5/8] Add changes. --- crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py index 74199b838..0149cca08 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py @@ -5,8 +5,10 @@ from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model from moonstreamdb.models import Base from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore +from sqlalchemy.dialects.postgresql import array from sqlalchemy.orm import Session + from ..settings import CRAWLER_LABEL from .event_crawler import Event @@ -156,14 +158,12 @@ def add_events_to_session( events_hashes_to_save = set([event.transaction_hash for event in events]) - # it's a lot of dublicated hashes, but it's the only way to get log_index - existing_labels = db_session.query( label_model.transaction_hash, label_model.log_index ).filter( label_model.label == label_name, label_model.log_index != None, - label_model.transaction_hash.any_(events_hashes_to_save), + label_model.transaction_hash.op("ANY")(array(events_hashes_to_save)), ) print(existing_labels) From c3dd0bac9b5704fd98b1cc51f074d05ec19d6c40 Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 4 Oct 2023 15:04:22 +0300 Subject: [PATCH 6/8] direct write. 
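
"Direct write" drops the read-before-write deduplication: rows go straight
into the database and are reconciled afterwards. The commented-out
`sqlalchemy.dialects.postgresql.dml` import in the diff below hints at an
alternative: if the labels table carried a unique index on
(label, transaction_hash, log_index), which is an assumption here (the
series adds no such migration), duplicates could be rejected at insert
time instead. A sketch under that assumption:

    # Sketch only: requires a unique index the patch series does NOT create.
    from sqlalchemy.dialects.postgresql import insert as pg_insert

    def insert_labels_ignore_duplicates(db_session, label_model, rows) -> None:
        statement = pg_insert(label_model).values(rows)
        statement = statement.on_conflict_do_nothing(
            index_elements=["label", "transaction_hash", "log_index"]
        )
        db_session.execute(statement)

Without such an index, the series instead writes rows under an
"-unverified" label and promotes or deletes them later (see PATCH 7/8).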
--- .../mooncrawl/moonworm_crawler/db.py | 94 ++++++++----------- 1 file changed, 41 insertions(+), 53 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py index 0149cca08..2fa8d5ac7 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py @@ -5,8 +5,10 @@ from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model from moonstreamdb.models import Base from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore -from sqlalchemy.dialects.postgresql import array + +# from sqlalchemy.dialects.postgresql.dml import insert from sqlalchemy.orm import Session +from sqlalchemy import insert from ..settings import CRAWLER_LABEL @@ -156,40 +158,26 @@ def add_events_to_session( ) -> None: label_model = get_label_model(blockchain_type) - events_hashes_to_save = set([event.transaction_hash for event in events]) - - existing_labels = db_session.query( - label_model.transaction_hash, label_model.log_index - ).filter( - label_model.label == label_name, - label_model.log_index != None, - label_model.transaction_hash.op("ANY")(array(events_hashes_to_save)), - ) - - print(existing_labels) - breakpoint() - - existing_labels = existing_labels.all() + label_name = f"{label_name}-unverified" - existing_labels_transactions = [] - existing_log_index_by_tx_hash: Dict[str, List[int]] = {} - for label in existing_labels: - if label[0] not in existing_labels_transactions: - existing_labels_transactions.append(label[0]) - existing_log_index_by_tx_hash[label[0]] = [] - existing_log_index_by_tx_hash[label[0]].append(label[1]) + events_insert = [] + for raw_event in events: + db_event = _event_to_label(blockchain_type, raw_event, label_name) + events_insert.append( + { + "label": db_event.label, + "label_data": db_event.label_data, + "address": db_event.address, + "block_number": db_event.block_number, + "block_timestamp": db_event.block_timestamp, + "transaction_hash": db_event.transaction_hash, + "log_index": db_event.log_index, + } + ) - labels_to_save = [] - for event in events: - if event.transaction_hash not in existing_labels_transactions: - labels_to_save.append(_event_to_label(blockchain_type, event, label_name)) - elif ( - event.log_index not in existing_log_index_by_tx_hash[event.transaction_hash] - ): - labels_to_save.append(_event_to_label(blockchain_type, event, label_name)) + insert_statement = insert(label_model).values(events_insert) - logger.info(f"Saving {len(labels_to_save)} event labels to session") - db_session.add_all(labels_to_save) + db_session.execute(insert_statement) def add_function_calls_to_session( @@ -199,27 +187,27 @@ def add_function_calls_to_session( label_name=CRAWLER_LABEL, ) -> None: label_model = get_label_model(blockchain_type) - transactions_hashes_to_save = [ - function_call.transaction_hash for function_call in function_calls - ] - - existing_labels = ( - db_session.query(label_model.transaction_hash) - .filter( - label_model.label == label_name, - label_model.log_index == None, - label_model.transaction_hash.in_(transactions_hashes_to_save), - ) - .all() - ) - existing_labels_transactions = [label[0] for label in existing_labels] + label_name = f"{label_name}-unverified" + + function_calls_insert = [] + + for raw_function_call in function_calls: + db_function_call = _function_call_to_label( + blockchain_type, raw_function_call, label_name + ) + function_calls_insert.append( + { + "label": 
db_function_call.label,
+                "label_data": db_function_call.label_data,
+                "address": db_function_call.address,
+                "block_number": db_function_call.block_number,
+                "block_timestamp": db_function_call.block_timestamp,
+                "transaction_hash": db_function_call.transaction_hash,
+            }
+        )
 
-    labels_to_save = [
-        _function_call_to_label(blockchain_type, function_call)
-        for function_call in function_calls
-        if function_call.transaction_hash not in existing_labels_transactions
-    ]
+    logger.info(f"Saving {len(function_calls_insert)} labels to session")
+    insert_statement = insert(label_model).values(function_calls_insert)
 
-    logger.info(f"Saving {len(labels_to_save)} labels to session")
-    db_session.add_all(labels_to_save)
+    db_session.execute(insert_statement)

From ac2a1c11e3dcbba2c5ca9438e05990ae73726d3d Mon Sep 17 00:00:00 2001
From: Andrey
Date: Wed, 11 Oct 2023 16:14:47 +0300
Subject: [PATCH 7/8] Rewrite insert logic.

1. Add a new label, label_name-unverified, which goes to the database
   without confirmations.
2. Insert as a single SQL operation, with deduplication.

The blocks cache now also draws on the labels table, and the database state
is loaded into the blocks cache once per from/to block range. Unverified
labels that already exist in the database as verified are deleted.
---
 .../moonworm_crawler/continuous_crawler.py | 64 +++--
 .../mooncrawl/moonworm_crawler/crawler.py | 11 +-
 .../mooncrawl/moonworm_crawler/db.py | 218 +++++++++++++++++-
 .../moonworm_crawler/event_crawler.py | 117 +++++++---
 .../moonworm_crawler/historical_crawler.py | 20 +-
 5 files changed, 366 insertions(+), 64 deletions(-)

diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py
index ee58258c9..be040b459 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py
@@ -26,7 +26,13 @@
     merge_function_call_crawl_jobs,
     moonworm_crawler_update_job_as_pickedup,
 )
-from .db import add_events_to_session, add_function_calls_to_session, commit_session
+from .db import (
+    add_events_to_session,
+    add_function_calls_to_session,
+    commit_session,
+    write_to_db,
+    delete_unverified_duplicates,
+)
 from .event_crawler import _crawl_events
 from .function_call_crawler import _crawl_functions
 from ..settings import (
@@ -95,7 +101,7 @@ def continuous_crawler(
     function_call_crawl_jobs: List[FunctionCallCrawlJob],
     start_block: int,
     max_blocks_batch: int = 100,
-    min_blocks_batch: int = 40,
+    min_blocks_batch: int = 5,
     confirmations: int = 60,
     min_sleep_time: float = 0.1,
     heartbeat_interval: float = 60,
@@ -156,13 +162,13 @@ def continuous_crawler(
     logger.info(f"Starting continuous event crawler start_block={start_block}")
 
     logger.info("Sending initial heartbeat")
-    heartbeat(
-        crawler_type=crawler_type,
-        blockchain_type=blockchain_type,
-        crawler_status=heartbeat_template,
-    )
+    # heartbeat(
+    #     crawler_type=crawler_type,
+    #     blockchain_type=blockchain_type,
+    #     crawler_status=heartbeat_template,
+    # )
     last_heartbeat_time = datetime.utcnow()
-    blocks_cache: Dict[int, int] = {}
+    blocks_cache: Dict[int, Optional[int]] = {}
     current_sleep_time = min_sleep_time
     failed_count = 0
     try:
@@ -171,7 +177,7 @@ def continuous_crawler(
             time.sleep(current_sleep_time)
 
             end_block = min(
-                web3.eth.blockNumber - confirmations,
+                web3.eth.blockNumber - min_blocks_batch,
                 start_block + max_blocks_batch,
             )
@@ -192,7 +198,7 @@ def continuous_crawler(
                 from_block=start_block,
                 to_block=end_block,
blocks_cache=blocks_cache, - db_block_query_batch=min_blocks_batch * 3, + db_block_query_batch=max_blocks_batch * 3, ) logger.info( f"Crawled {len(all_events)} events from {start_block} to {end_block}." @@ -220,6 +226,18 @@ def continuous_crawler( current_time = datetime.utcnow() + write_to_db( + web3=web3, db_session=db_session, blockchain_type=blockchain_type + ) + + delete_unverified_duplicates( + db_session=db_session, blockchain_type=blockchain_type + ) + + commit_session(db_session) + + ### fetch confirmed transactions and events + if current_time - jobs_refetchet_time > timedelta( seconds=new_jobs_refetch_interval ): @@ -243,8 +261,6 @@ def continuous_crawler( if current_time - last_heartbeat_time > timedelta( seconds=heartbeat_interval ): - # Commiting to db - commit_session(db_session) # Update heartbeat heartbeat_template["last_block"] = end_block heartbeat_template["current_time"] = _date_to_str(current_time) @@ -260,11 +276,11 @@ def continuous_crawler( heartbeat_template[ "function_call metrics" ] = ethereum_state_provider.metrics - heartbeat( - crawler_type=crawler_type, - blockchain_type=blockchain_type, - crawler_status=heartbeat_template, - ) + # heartbeat( + # crawler_type=crawler_type, + # blockchain_type=blockchain_type, + # crawler_status=heartbeat_template, + # ) logger.info("Sending heartbeat.", heartbeat_template) last_heartbeat_time = datetime.utcnow() @@ -304,13 +320,13 @@ def continuous_crawler( heartbeat_template[ "die_reason" ] = f"{e.__class__.__name__}: {e}\n error_summary: {error_summary}\n error_traceback: {error_traceback}" - heartbeat_template["last_block"] = end_block - heartbeat( - crawler_type=crawler_type, - blockchain_type=blockchain_type, - crawler_status=heartbeat_template, - is_dead=True, - ) + heartbeat_template["last_block"] = end_block # type: ignore + # heartbeat( + # crawler_type=crawler_type, + # blockchain_type=blockchain_type, + # crawler_status=heartbeat_template, + # is_dead=True, + # ) logger.exception(e) raise e diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py index 3edacc935..50ba1b761 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py @@ -124,7 +124,7 @@ def _retry_connect_web3( logger.info(f"Retrying in {sleep_time} seconds") time.sleep(sleep_time) raise Exception( - f"Failed to connect to {blockchain_type} blockchain after {retry_count} retries: {error}" + f"Failed to connect to {blockchain_type} blockchain after {retry_count} retries: {error}" # type: ignore ) @@ -226,6 +226,7 @@ def find_all_deployed_blocks( """ all_deployed_blocks = {} + logger.info(f"Finding deployment blocks for {len(addresses_set)} addresses") for address in addresses_set: try: code = web3.eth.getCode(address) @@ -237,6 +238,7 @@ def find_all_deployed_blocks( ) if block is not None: all_deployed_blocks[address] = block + logger.info(f"Found deployment block for {address}: {block}") if block is None: logger.error(f"Failed to get deployment block for {address}") except Exception as e: @@ -259,7 +261,8 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ crawl_job_by_selector: Dict[str, EventCrawlJob] = {} for entry in entries: - abi_selector = _get_tag(entry, "abi_selector") + abi_selector = _get_tag(entry, "abi_method_hash") + # abi_selector = _get_tag(entry, "abi_selector") contract_address = Web3().toChecksumAddress(_get_tag(entry, "address")) entry_id = 
UUID(entry.entry_url.split("/")[-1]) # crying emoji @@ -300,8 +303,8 @@ def make_function_call_crawl_jobs( entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji contract_address = Web3().toChecksumAddress(_get_tag(entry, "address")) abi = json.loads(cast(str, entry.content)) - # method_signature = encode_function_signature(abi) - method_signature = _get_tag(entry, "abi_selector") + method_signature = encode_function_signature(abi) + # method_signature = _get_tag(entry, "abi_selector") if method_signature is None: raise ValueError(f"{abi} is not a function ABI") diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py index 2fa8d5ac7..20e940271 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py @@ -2,17 +2,23 @@ import json from typing import Dict, List, Optional -from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model +from moonstreamdb.blockchain import ( + AvailableBlockchainType, + get_label_model, + get_block_model, +) from moonstreamdb.models import Base from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore # from sqlalchemy.dialects.postgresql.dml import insert -from sqlalchemy.orm import Session -from sqlalchemy import insert +from sqlalchemy.orm import Session, aliased +from sqlalchemy import insert, text, and_, exists, or_, func, update from ..settings import CRAWLER_LABEL from .event_crawler import Event +from web3 import Web3 + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -20,7 +26,7 @@ def _event_to_label( blockchain_type: AvailableBlockchainType, event: Event, label_name=CRAWLER_LABEL -) -> Base: +) -> Base: # type: ignore """ Creates a label model. """ @@ -51,7 +57,7 @@ def _function_call_to_label( blockchain_type: AvailableBlockchainType, function_call: ContractFunctionCall, label_name=CRAWLER_LABEL, -) -> Base: +) -> Base: # type: ignore """ Creates a label model. """ @@ -211,3 +217,205 @@ def add_function_calls_to_session( insert_statement = insert(label_model).values(function_calls_insert) db_session.execute(insert_statement) + + +def write_to_db( + web3: Web3, + blockchain_type: AvailableBlockchainType, + db_session: Session, + label_name=CRAWLER_LABEL, + conformations: int = 100, +): + """ + Take all unvirified labels and update label to label_name. + Get last block number from block model and update label to label_name with deduplication of labels distinct ON (transaction_hash, log_index) for events and distinct ON (transaction_hash) for function calls. 
+ And that + """ + + label_model = get_label_model(blockchain_type) + + block_model = get_block_model(blockchain_type) + + label_name_unverified = f"{label_name}-unverified" + + label_model_alias = aliased(label_model, name="polygon_labels2") + + # update all labels to label_name + + blockchain_block_number = web3.eth.block_number + + latest_block = ( + db_session.query(block_model.block_number) + .order_by(block_model.block_number.desc()) + .limit(1) + ) + + latest_block_cte = latest_block.cte("latest_block") + + events = ( + db_session.query( + label_model.id, label_model.transaction_hash, label_model.log_index + ) + .distinct(label_model.transaction_hash, label_model.log_index) + .filter(label_model.label == label_name_unverified) + .filter(label_model.log_index != None) + .filter( + or_( + label_model.block_number <= latest_block_cte.c.block_number, + label_model.block_number <= blockchain_block_number - conformations, + ) + ) + .filter( + ~db_session.query(label_model_alias) + .filter( + and_( + label_model_alias.label == label_name, + label_model_alias.transaction_hash == label_model.transaction_hash, + label_model_alias.log_index == label_model.log_index, + ) + ) + .exists() + ) + .order_by( + label_model.transaction_hash, + label_model.log_index, + label_model.block_number.desc(), + ) + ) + + events_cte = events.cte("events") + + function_calls = ( + db_session.query( + label_model.id, label_model.transaction_hash, label_model.log_index + ) + .distinct(label_model.transaction_hash) + .filter(label_model.label == label_name_unverified) + .filter(label_model.log_index == None) + .filter( + or_( + label_model.block_number <= latest_block_cte.c.block_number, + label_model.block_number <= blockchain_block_number - conformations, + ) + ) + .filter( + ~db_session.query(label_model_alias) + .filter( + and_( + label_model_alias.label == label_name, + label_model_alias.transaction_hash == label_model.transaction_hash, + label_model_alias.log_index == None, + ) + ) + .exists() + ) + .order_by(label_model.transaction_hash, label_model.block_number.desc()) + ) + + function_calls_cte = function_calls.cte("function_calls") + + union_all_subquery = ( + db_session.query(events_cte) + .union_all(db_session.query(function_calls_cte)) + .subquery() + ) + + logger.info("Updating labels") + # Update query + updated_labels = ( + db_session.query(label_model) + .filter(label_model.id == union_all_subquery.c.events_id) + .update( + {"label": label_name}, synchronize_session=False + ) # returns number of rows updated by query + ) + + logger.info( + f"latest block number database {latest_block.one_or_none()} , blockchain {blockchain_block_number} - conformations {conformations}" + ) + + logger.info(f"Updated {updated_labels} labels") + + +def delete_unverified_duplicates( + db_session: Session, + blockchain_type: AvailableBlockchainType, + label_name=CRAWLER_LABEL, +): + """ + Delete all unverified labels which already have verified labels. 
+ """ + + label_model = get_label_model(blockchain_type) + + label_name_unverified = f"{label_name}-unverified" + + label_model_alias = aliased(label_model, name="polygon_labels2") + + duplicated_events = ( + db_session.query( + label_model.id, label_model.transaction_hash, label_model.log_index + ) + .distinct(label_model.transaction_hash, label_model.log_index) + .filter(label_model.label == label_name_unverified) + .filter(label_model.log_index != None) + .filter( + db_session.query(label_model_alias) + .filter( + and_( + label_model_alias.label == label_name, + label_model_alias.transaction_hash == label_model.transaction_hash, + label_model_alias.log_index == label_model.log_index, + ) + ) + .exists() + ) + .order_by( + label_model.transaction_hash, + label_model.log_index, + label_model.block_number.desc(), + ) + ) + + events_cte = duplicated_events.cte("events") + + duplicated_function_calls = ( + db_session.query( + label_model.id, label_model.transaction_hash, label_model.log_index + ) + .distinct(label_model.transaction_hash) + .filter(label_model.label == label_name_unverified) + .filter(label_model.log_index == None) + .filter( + db_session.query(label_model_alias) + .filter( + and_( + label_model_alias.label == label_name, + label_model_alias.transaction_hash == label_model.transaction_hash, + label_model_alias.log_index == None, + ) + ) + .exists() + ) + .order_by(label_model.transaction_hash, label_model.block_number.desc()) + ) + + function_calls_cte = duplicated_function_calls.cte("function_calls") + + union_all_subquery = ( + db_session.query(events_cte) + .union_all(db_session.query(function_calls_cte)) + .subquery() + ) + + logger.info("Deleting duplicates labels") + + # Delete query + + deleted_labels = ( + db_session.query(label_model) + .filter(label_model.id == union_all_subquery.c.events_id) + .delete(synchronize_session=False) # returns number of rows updated by query + ) + + logger.info(f"Deleted duplicates {deleted_labels} labels") diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py index 775dbf08d..13d2df809 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/event_crawler.py @@ -1,14 +1,23 @@ import logging from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple +import time -from moonstreamdb.blockchain import AvailableBlockchainType, get_block_model +from moonstreamdb.blockchain import ( + AvailableBlockchainType, + get_block_model, + get_label_model, +) from moonworm.crawler.log_scanner import _fetch_events_chunk, _crawl_events as moonworm_autoscale_crawl_events # type: ignore from sqlalchemy.orm.session import Session from sqlalchemy.sql.expression import and_, func +from sqlalchemy import text from web3 import Web3 +from ..settings import CRAWLER_LABEL from .crawler import EventCrawlJob +from .db import get_block_model, get_label_model + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -43,10 +52,11 @@ def get_block_timestamp( web3: Web3, blockchain_type: AvailableBlockchainType, block_number: int, - blocks_cache: Dict[int, int], + blocks_cache: Dict[int, Optional[int]], from_block: int, to_block: int, max_blocks_batch: int = 30, + label_name: str = CRAWLER_LABEL, ) -> int: """ Get the timestamp of a block. 
@@ -64,55 +74,103 @@ def get_block_timestamp( """ assert max_blocks_batch > 0 - if block_number in blocks_cache: - return blocks_cache[block_number] + if block_number in blocks_cache and blocks_cache[block_number] is not None: + return blocks_cache[block_number] # type: ignore block_model = get_block_model(blockchain_type) + label_model = get_label_model(blockchain_type) # from_block and to_block can be in reverse order if from_block > to_block: from_block, to_block = to_block, from_block - from_block_filter = from_block - max_blocks_batch - 1 - to_block_filter = to_block + max_blocks_batch + 1 + if block_number not in blocks_cache: + from_block_filter = from_block - max_blocks_batch - 1 + to_block_filter = to_block + max_blocks_batch + 1 + + blocks_range = db_session.query( + func.generate_series(from_block_filter, to_block_filter).label( + "block_number" + ) + ).cte("blocks_range") - blocks = ( - db_session.query( - func.json_object_agg(block_model.block_number, block_model.timestamp) + blocks_table_cache = ( + db_session.query(block_model.block_number, block_model.timestamp) + .filter( + and_( + block_model.block_number >= from_block_filter, + block_model.block_number <= to_block_filter, + ) + ) + .cte("blocks_table_cache") ) - .filter( - and_( - block_model.block_number >= from_block_filter, - block_model.block_number <= to_block_filter, + + label_table_cache = ( + db_session.query(label_model.block_number, label_model.block_timestamp) + .filter( + and_( + label_model.block_number >= from_block_filter, + label_model.block_number <= to_block_filter, + label_model.label == label_name, + ) + ) + .distinct(label_model.block_number) + .cte("label_table_cache") + ) + + full_blocks_cache = ( + db_session.query( + blocks_range.c.block_number, + func.coalesce( + blocks_table_cache.c.timestamp, label_table_cache.c.block_timestamp + ).label("block_timestamp"), + ) + .outerjoin( + blocks_table_cache, + blocks_range.c.block_number == blocks_table_cache.c.block_number, ) + .outerjoin( + label_table_cache, + blocks_range.c.block_number == label_table_cache.c.block_number, + ) + .cte("blocks_cache") ) - .scalar() - ) - ### transform all keys to int to avoid casting after + blocks = db_session.query( + func.json_object_agg( + full_blocks_cache.c.block_number, full_blocks_cache.c.block_timestamp + ) + ).one()[0] - if blocks is not None: - blocks = { - int(block_number): timestamp for block_number, timestamp in blocks.items() - } + ### transform all keys to int to avoid casting after + + if blocks is not None: + blocks = { + int(block_number): timestamp + for block_number, timestamp in blocks.items() + } + + blocks_cache.update(blocks) target_block_timestamp: Optional[int] = None - if blocks: - target_block_timestamp = blocks.get(str(block_number)) - if target_block_timestamp is None: + if blocks_cache[block_number] is None: target_block_timestamp = _get_block_timestamp_from_web3( web3, block_number ) # can be improved by using batch call blocks_cache[block_number] = target_block_timestamp - if len(blocks_cache) + len(blocks) > (max_blocks_batch * 3 + 2): + if len(blocks_cache) > (max_blocks_batch * 3 + 2): # clear cache lower than from_block - blocks_cache = blocks + blocks_cache = { + block_number: timestamp + for block_number, timestamp in blocks_cache.items() + if block_number >= from_block + } - return target_block_timestamp + return target_block_timestamp # type: ignore def _crawl_events( @@ -122,7 +180,7 @@ def _crawl_events( jobs: List[EventCrawlJob], from_block: int, to_block: int, - 
blocks_cache: Dict[int, int] = {}, + blocks_cache: Dict[int, Optional[int]] = {}, db_block_query_batch=10, ) -> List[Event]: all_events = [] @@ -144,9 +202,9 @@ def _crawl_events( blockchain_type, raw_event["blockNumber"], blocks_cache, - db_block_query_batch, from_block, to_block, + db_block_query_batch, ) event = Event( event_name=raw_event["event"], @@ -169,7 +227,7 @@ def _autoscale_crawl_events( jobs: List[EventCrawlJob], from_block: int, to_block: int, - blocks_cache: Dict[int, int] = {}, + blocks_cache: Dict[int, Optional[int]] = {}, batch_size: int = 1000, db_block_query_batch=10, ) -> Tuple[List[Event], int]: @@ -186,6 +244,7 @@ def _autoscale_crawl_events( batch_size, job.contracts[0], ) + for raw_event in raw_events: raw_event["blockTimestamp"] = get_block_timestamp( db_session, diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py index 24d7c33f3..78dd18781 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py @@ -18,7 +18,13 @@ _retry_connect_web3, update_entries_status_and_progress, ) -from .db import add_events_to_session, add_function_calls_to_session, commit_session +from .db import ( + add_events_to_session, + add_function_calls_to_session, + commit_session, + write_to_db, + delete_unverified_duplicates, +) from .event_crawler import _crawl_events, _autoscale_crawl_events from .function_call_crawler import _crawl_functions @@ -76,7 +82,7 @@ def historical_crawler( logger.info(f"Starting historical event crawler start_block={start_block}") - blocks_cache: Dict[int, int] = {} + blocks_cache: Dict[int, Optional[int]] = {} failed_count = 0 original_start_block = start_block @@ -163,6 +169,16 @@ def historical_crawler( progess_map=progess_map, ) + write_to_db( + web3, + blockchain_type, + db_session, + ) + + delete_unverified_duplicates( + db_session=db_session, blockchain_type=blockchain_type + ) + # Commiting to db commit_session(db_session) From 49ad69e869426b7fbf030c425476ee7b3411090a Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 12 Oct 2023 14:11:19 +0300 Subject: [PATCH 8/8] Uncomment heartbeat --- .../mooncrawl/moonworm_crawler/cli.py | 8 ++--- .../moonworm_crawler/continuous_crawler.py | 32 +++++++++---------- .../mooncrawl/moonworm_crawler/crawler.py | 6 ++-- 3 files changed, 22 insertions(+), 24 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py index 04fc2ed90..76b610d51 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py @@ -277,9 +277,9 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: if args.find_deployed_blocks: addresses_set = set() for job in filtered_event_jobs: - addresses_set.update(job.contracts) + addresses_set.update(job.contracts) # type: ignore for function_job in filtered_function_call_jobs: - addresses_set.add(function_job.contract_address) + addresses_set.add(function_job.contract_address) # type: ignore if args.start is None: start_block = web3.eth.blockNumber - 1 @@ -330,8 +330,8 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: db_session, blockchain_type, web3, - filtered_event_jobs, - filtered_function_call_jobs, + filtered_event_jobs, # type: ignore + filtered_function_call_jobs, # type: ignore start_block, end_block, args.max_blocks_batch, diff --git 
a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py index be040b459..dccee87dd 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py @@ -162,11 +162,11 @@ def continuous_crawler( logger.info(f"Starting continuous event crawler start_block={start_block}") logger.info("Sending initial heartbeat") - # heartbeat( - # crawler_type=crawler_type, - # blockchain_type=blockchain_type, - # crawler_status=heartbeat_template, - # ) + heartbeat( + crawler_type=crawler_type, + blockchain_type=blockchain_type, + crawler_status=heartbeat_template, + ) last_heartbeat_time = datetime.utcnow() blocks_cache: Dict[int, Optional[int]] = {} current_sleep_time = min_sleep_time @@ -276,11 +276,11 @@ def continuous_crawler( heartbeat_template[ "function_call metrics" ] = ethereum_state_provider.metrics - # heartbeat( - # crawler_type=crawler_type, - # blockchain_type=blockchain_type, - # crawler_status=heartbeat_template, - # ) + heartbeat( + crawler_type=crawler_type, + blockchain_type=blockchain_type, + crawler_status=heartbeat_template, + ) logger.info("Sending heartbeat.", heartbeat_template) last_heartbeat_time = datetime.utcnow() @@ -321,12 +321,12 @@ def continuous_crawler( "die_reason" ] = f"{e.__class__.__name__}: {e}\n error_summary: {error_summary}\n error_traceback: {error_traceback}" heartbeat_template["last_block"] = end_block # type: ignore - # heartbeat( - # crawler_type=crawler_type, - # blockchain_type=blockchain_type, - # crawler_status=heartbeat_template, - # is_dead=True, - # ) + heartbeat( + crawler_type=crawler_type, + blockchain_type=blockchain_type, + crawler_status=heartbeat_template, + is_dead=True, + ) logger.exception(e) raise e diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py index 50ba1b761..7343808d0 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py @@ -261,8 +261,7 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ crawl_job_by_selector: Dict[str, EventCrawlJob] = {} for entry in entries: - abi_selector = _get_tag(entry, "abi_method_hash") - # abi_selector = _get_tag(entry, "abi_selector") + abi_selector = _get_tag(entry, "abi_selector") contract_address = Web3().toChecksumAddress(_get_tag(entry, "address")) entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji @@ -303,8 +302,7 @@ def make_function_call_crawl_jobs( entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji contract_address = Web3().toChecksumAddress(_get_tag(entry, "address")) abi = json.loads(cast(str, entry.content)) - method_signature = encode_function_signature(abi) - # method_signature = _get_tag(entry, "abi_selector") + method_signature = _get_tag(entry, "abi_selector") if method_signature is None: raise ValueError(f"{abi} is not a function ABI")
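
The series lands where PATCH 1/8 started: both event and function call
crawl jobs are keyed by the entry's `abi_selector` tag rather than a
computed ABI hash. For reference, an event selector is topic0, the
keccak-256 hash of the canonical event signature. A minimal sketch of how
such a tag value can be derived from an event ABI (illustration only; this
helper is not part of the codebase, and it ignores nested tuple types):

    # Sketch: derive an event selector (topic0) from a flat event ABI.
    from web3 import Web3

    def event_selector(event_abi: dict) -> str:
        argument_types = ",".join(arg["type"] for arg in event_abi["inputs"])
        signature = f"{event_abi['name']}({argument_types})"
        return Web3.keccak(text=signature).hex()

    transfer_abi = {
        "name": "Transfer",
        "type": "event",
        "inputs": [
            {"indexed": True, "name": "from", "type": "address"},
            {"indexed": True, "name": "to", "type": "address"},
            {"indexed": False, "name": "value", "type": "uint256"},
        ],
    }

    print(event_selector(transfer_abi))
    # 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef

Function selectors are built the same way, truncated to the first four
bytes of the hash.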