diff --git a/crawlers/deploy/mumbai-state.service b/crawlers/deploy/mumbai-state.service index 121f89767..cf1fe6273 100644 --- a/crawlers/deploy/mumbai-state.service +++ b/crawlers/deploy/mumbai-state.service @@ -6,6 +6,6 @@ After=network.target Type=oneshot WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env -ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --blockchain mumbai --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --blockchain mumbai --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/mumbai-jobs.json --max-workers 2 CPUWeight=60 SyslogIdentifier=mumbai-state \ No newline at end of file diff --git a/crawlers/deploy/polygon-state-ownership.service b/crawlers/deploy/polygon-state-ownership.service new file mode 100644 index 000000000..d384d81ec --- /dev/null +++ b/crawlers/deploy/polygon-state-ownership.service @@ -0,0 +1,11 @@ +[Unit] +Description=Execute state crawler +After=network.target + +[Service] +Type=oneshot +WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl +EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --blockchain polygon --jobs-file ./mooncrawl/state_crawler/jobs/polygon-cu-ownership.json --batch-size 10000 --max-batch-size 10000 --no-block-specifier --max-workers 20 +CPUWeight=60 +SyslogIdentifier=polygon-state-ownership \ No newline at end of file diff --git a/crawlers/deploy/polygon-state.service b/crawlers/deploy/polygon-state.service index 04ace9d00..de4d8fec0 100644 --- a/crawlers/deploy/polygon-state.service +++ b/crawlers/deploy/polygon-state.service @@ -6,6 +6,6 @@ After=network.target Type=oneshot WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env -ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --blockchain polygon --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json +ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.state_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" crawl-jobs --blockchain polygon --infura --jobs-file /home/ubuntu/moonstream/crawlers/mooncrawl/mooncrawl/state_crawler/jobs/polygon-jobs.json --max-workers 2 CPUWeight=60 SyslogIdentifier=polygon-state \ No newline at end of file diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py index 903632bc6..350b0da90 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/cli.py @@ -4,15 +4,17 @@ import json import logging import time -from concurrent.futures import ThreadPoolExecutor + +from concurrent.futures import as_completed, ProcessPoolExecutor from concurrent.futures._base import TimeoutError -from pprint import pprint -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Literal, Union from uuid import UUID from moonstreamdb.blockchain import AvailableBlockchainType from web3._utils.request import cache_session from web3.middleware import geth_poa_middleware +from web3 import Web3, AsyncHTTPProvider + from mooncrawl.moonworm_crawler.crawler import _retry_connect_web3 @@ -31,14 +33,60 @@ logger = logging.getLogger(__name__) +def select_provider( + web3_provider_uri: Optional[str], + blockchain_type: AvailableBlockchainType, + access_id: UUID, +) -> Web3: + if web3_provider_uri is not None: + try: + logger.info( + f"Connecting to blockchain: {blockchain_type} with custom provider!" + ) + + web3_client = connect(web3_provider_uri, request_kwargs={"timeout": 60}) + + if blockchain_type != AvailableBlockchainType.ETHEREUM: + web3_client.middleware_onion.inject(geth_poa_middleware, layer=0) + except Exception as e: + logger.error( + f"Web3 connection to custom provider {web3_provider_uri} failed error: {e}" + ) + raise (e) + else: + logger.info(f"Connecting to blockchain: {blockchain_type} with Node balancer.") + web3_client = _retry_connect_web3( + blockchain_type=blockchain_type, access_id=access_id + ) + + return web3_client + + def make_multicall( - multicall_method: Any, + web3_provider_uri: Optional[str], + blockchain_type: AvailableBlockchainType, + access_id: UUID, calls: List[Any], block_timestamp: int, - block_number: str = "latest", + block_number: Union[ + Literal["latest", "earliest", "pending"], + int, + ] = "latest", + no_block_specifier: bool = False, ) -> Any: multicall_calls = [] + web3_client = select_provider( + web3_provider_uri=web3_provider_uri, + blockchain_type=blockchain_type, + access_id=access_id, + ) + + multicall_contract = Multicall2( + web3_client, + web3_client.toChecksumAddress(multicall_contracts[blockchain_type]), + ) + for call in calls: try: multicall_calls.append( @@ -52,13 +100,31 @@ def make_multicall( f'Error encoding data for method {call["method"].name} call: {call}' ) - multicall_result = multicall_method(False, calls=multicall_calls).call( - block_identifier=block_number - ) + start_time = time.time() + + if no_block_specifier: + """ + tryBlockAndAggregate returns (uint256 blockNumber, bytes32 blockHash, Result[] memory returnData) + """ + multicall_method = multicall_contract.tryBlockAndAggregate + multicall_result_with_block = multicall_method( + False, calls=multicall_calls + ).call() + multicall_result = multicall_result_with_block[2] + block_number = multicall_result_with_block[0] + block_timestamp = web3_client.eth.get_block(block_number).timestamp # type: ignore + else: + multicall_method = multicall_contract.tryAggregate + multicall_result = multicall_method(False, calls=multicall_calls).call( + block_identifier=block_number + ) + + print(f"Multicall2 call time: {time.time() - start_time}") results = [] # Handle the case with not successful calls + for index, encoded_data in enumerate(multicall_result): try: if encoded_data[0]: @@ -118,17 +184,19 @@ def make_multicall( def crawl_calls_level( - web3_client, + access_id: UUID, + web3_provider_uri: Optional[str], db_session, calls, responces, contracts_ABIs, interfaces, batch_size, - multicall_method, block_number, blockchain_type, block_timestamp, + max_workers, + no_block_specifier, max_batch_size=5000, min_batch_size=4, ): @@ -180,28 +248,62 @@ def crawl_calls_level( while len(calls_of_level) > 0: make_multicall_result = [] + try: - call_chunk = calls_of_level[:batch_size] + # make multicall calls in batches with split on max_workers threads + + futures = [] + + calls_for_workers = [] + + faild_calls = [] + + with ProcessPoolExecutor(max_workers=max_workers) as executor: + for call_chunk in [ + calls_of_level[i : i + batch_size] + for i in range( + 0, + batch_size * max_workers + if batch_size * max_workers < len(calls_of_level) + else len(calls_of_level), + batch_size, + ) + ]: + calls_for_workers.append(call_chunk) + + for call_chunk in calls_for_workers: + logger.info( + f"Calling multicall2 with {len(call_chunk)} calls at block {block_number}" + ) + + futures.append( + executor.submit( + make_multicall, + web3_provider_uri, + blockchain_type, + access_id, + call_chunk, + block_timestamp, + block_number, + no_block_specifier, + ) + ) - logger.info( - f"Calling multicall2 with {len(call_chunk)} calls at block {block_number}" - ) + for index, future in enumerate(as_completed(futures)): + try: + result = future.result(timeout=30) + logger.info( + f"Multicall2 returned {len(result)} results at block {block_number}" + ) + make_multicall_result.extend(result) + + except Exception as e: + logger.error(f"Exception: {e}") + faild_calls.extend(calls_for_workers[index]) - # 1 thead with timeout for hung multicall calls - with ThreadPoolExecutor(max_workers=1) as executor: - future = executor.submit( - make_multicall, - multicall_method, - call_chunk, - block_timestamp, - block_number, - ) - make_multicall_result = future.result(timeout=20) - logger.info( - f"Multicall2 returned {len(make_multicall_result)} results at block {block_number}" - ) retry = 0 - calls_of_level = calls_of_level[batch_size:] + calls_of_level = calls_of_level[len(make_multicall_result) :] + calls_of_level.extend(faild_calls) logger.info(f"lenght of task left {len(calls_of_level)}.") batch_size = min(batch_size * 2, max_batch_size) except ValueError as e: # missing trie node @@ -221,7 +323,6 @@ def crawl_calls_level( except Exception as e: logger.error(f"Exception: {e}") raise (e) - time.sleep(2) print(f"retry: {retry}") # results parsing and writing to database add_to_session_count = 0 @@ -233,7 +334,9 @@ def crawl_calls_level( if result["hash"] not in responces: responces[result["hash"]] = [] responces[result["hash"]].append(result["result"]) + start_time = time.time() commit_session(db_session) + print(f"Commit time: {time.time() - start_time}") logger.info(f"{add_to_session_count} labels commit to database.") return batch_size @@ -246,6 +349,9 @@ def parse_jobs( block_number: Optional[int], batch_size: int, access_id: UUID, + max_batch_size: int = 5000, + max_workers: int = 10, + no_block_specifier: bool = False, ): """ Parse jobs from list and generate web3 interfaces for each contract. @@ -255,42 +361,23 @@ def parse_jobs( contracts_methods: Dict[str, Any] = {} calls: Dict[int, Any] = {0: []} - if web3_provider_uri is not None: - try: - logger.info( - f"Connecting to blockchain: {blockchain_type} with custom provider!" - ) - - web3_client = connect(web3_provider_uri) - - if blockchain_type != AvailableBlockchainType.ETHEREUM: - web3_client.middleware_onion.inject(geth_poa_middleware, layer=0) - except Exception as e: - logger.error( - f"Web3 connection to custom provider {web3_provider_uri} failed error: {e}" - ) - raise (e) - else: - logger.info(f"Connecting to blockchain: {blockchain_type} with Node balancer.") - web3_client = _retry_connect_web3( - blockchain_type=blockchain_type, access_id=access_id - ) + web3_client = select_provider( + web3_provider_uri=web3_provider_uri, + blockchain_type=blockchain_type, + access_id=access_id, + ) logger.info(f"Crawler started connected to blockchain: {blockchain_type}") if block_number is None: block_number = web3_client.eth.get_block("latest").number # type: ignore + else: + block_number = int(block_number) logger.info(f"Current block number: {block_number}") block_timestamp = web3_client.eth.get_block(block_number).timestamp # type: ignore - multicaller = Multicall2( - web3_client, web3_client.toChecksumAddress(multicall_contracts[blockchain_type]) - ) - - multicall_method = multicaller.tryAggregate - def recursive_unpack(method_abi: Any, level: int = 0) -> Any: """ Generate tree of calls for crawling @@ -383,17 +470,20 @@ def recursive_unpack(method_abi: Any, level: int = 0) -> Any: logger.info(f"call_tree_levels: {call_tree_levels}") batch_size = crawl_calls_level( - web3_client, - db_session, - calls[0], - responces, - contracts_ABIs, - interfaces, - batch_size, - multicall_method, - block_number, - blockchain_type, - block_timestamp, + access_id=access_id, + web3_provider_uri=web3_provider_uri, + db_session=db_session, + calls=calls[0], + responces=responces, + contracts_ABIs=contracts_ABIs, + interfaces=interfaces, + batch_size=batch_size, + block_number=block_number, + blockchain_type=blockchain_type, + block_timestamp=block_timestamp, + max_batch_size=max_batch_size, + max_workers=max_workers, + no_block_specifier=no_block_specifier, ) for level in call_tree_levels: @@ -401,17 +491,20 @@ def recursive_unpack(method_abi: Any, level: int = 0) -> Any: logger.info(f"Jobs amount: {len(calls[level])}") batch_size = crawl_calls_level( - web3_client, - db_session, - calls[level], - responces, - contracts_ABIs, - interfaces, - batch_size, - multicall_method, - block_number, - blockchain_type, - block_timestamp, + access_id=access_id, + web3_provider_uri=web3_provider_uri, + db_session=db_session, + calls=calls[level], + responces=responces, + contracts_ABIs=contracts_ABIs, + interfaces=interfaces, + batch_size=batch_size, + block_number=block_number, + blockchain_type=blockchain_type, + block_timestamp=block_timestamp, + max_batch_size=max_batch_size, + max_workers=max_workers, + no_block_specifier=no_block_specifier, ) finally: @@ -447,6 +540,9 @@ def handle_crawl(args: argparse.Namespace) -> None: args.block_number, args.batch_size, args.access_id, + args.max_batch_size, + args.max_workers, + args.no_block_specifier, ) @@ -540,6 +636,26 @@ def main() -> None: default=500, help="Size of chunks wich send to Multicall2 contract.", ) + view_state_crawler_parser.add_argument( + "--max-batch-size", + "-m", + type=int, + default=4000, + help="Max size of chunks wich send to Multicall2 contract.", + ) + view_state_crawler_parser.add_argument( + "--max-workers", + "-w", + type=int, + default=10, + help="Max workers for Multicall2 contract.", + ) + view_state_crawler_parser.add_argument( + "--no-block-specifier", + "-nb", + action="store_true", + help="Disable block specifier.", + ) view_state_crawler_parser.set_defaults(func=handle_crawl) view_state_cleaner = subparsers.add_parser( diff --git a/crawlers/mooncrawl/mooncrawl/state_crawler/web3_util.py b/crawlers/mooncrawl/mooncrawl/state_crawler/web3_util.py index 224034a77..01e4fcc8f 100644 --- a/crawlers/mooncrawl/mooncrawl/state_crawler/web3_util.py +++ b/crawlers/mooncrawl/mooncrawl/state_crawler/web3_util.py @@ -165,10 +165,10 @@ def read_keys_from_env() -> Tuple[ChecksumAddress, str]: ) -def connect(web3_uri: str) -> Web3: +def connect(web3_uri: str, request_kwargs: Dict[str, Any] = {}) -> Web3: web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider() if web3_uri.startswith("http://") or web3_uri.startswith("https://"): - web3_provider = Web3.HTTPProvider(web3_uri) + web3_provider = Web3.HTTPProvider(web3_uri, request_kwargs=request_kwargs) else: web3_provider = Web3.IPCProvider(web3_uri) web3_client = Web3(web3_provider)