-
Notifications
You must be signed in to change notification settings - Fork 50
Nft crawler blocklist #642
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
Yhtiyar
wants to merge
14
commits into
main
Choose a base branch
from
nft-crawler-blocklist
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
14ef937
working historic crawler
Yhtiyar a3f2418
isort
Yhtiyar a5a9df8
added
Yhtiyar 4166d71
added help message for `crawl` command
Yhtiyar 2336dba
fixed some issues
Yhtiyar 0f110bf
removed unneeded log
Yhtiyar da78bff
removed select 1
Yhtiyar dab565b
added blocklist
Yhtiyar c265a53
removed tx receipt from crawling
Yhtiyar ea4e485
removed gasUsed from dataset
Yhtiyar 286f58f
added threadpool workers
Yhtiyar 2a93aeb
working threadpool
Yhtiyar a4493dc
added timeout to db conn
Yhtiyar a17a7ce
small fixes to dataset materialize
Yhtiyar File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,8 +2,10 @@ | |
import logging | ||
import time | ||
from dataclasses import dataclass | ||
from typing import Any, Dict, List, Optional, Set, Union | ||
|
||
from typing import Any, Dict, List, Optional, Set, Union, Callable | ||
from eth_abi.codec import ABICodec | ||
from web3._utils.events import get_event_data | ||
from web3._utils.filters import construct_event_filter_params | ||
import web3 | ||
from eth_typing import ChecksumAddress | ||
from hexbytes.main import HexBytes | ||
|
@@ -19,7 +21,6 @@ | |
ContractFunctionCall, | ||
utfy_dict, | ||
) | ||
from moonworm.crawler.log_scanner import _fetch_events_chunk # type: ignore | ||
from sqlalchemy.orm.session import Session | ||
from tqdm import tqdm | ||
from web3 import Web3 | ||
|
@@ -44,13 +45,21 @@ | |
logging.basicConfig(level=logging.INFO) | ||
logger = logging.getLogger(__name__) | ||
|
||
# TODO: ADD VALUE!!! | ||
|
||
@dataclass | ||
class ExtededFunctionCall(ContractFunctionCall): | ||
class ExtededFunctionCall: | ||
block_number: int | ||
block_timestamp: int | ||
transaction_hash: str | ||
contract_address: str | ||
caller_address: str | ||
function_name: str | ||
function_args: Dict[str, Any] | ||
gas_price: int | ||
value: int = 0 | ||
max_fee_per_gas: Optional[int] = None | ||
max_priority_fee_per_gas: Optional[int] = None | ||
value: int = 0 | ||
status: Optional[str] = None | ||
|
||
|
||
def _function_call_with_gas_price_to_label( | ||
|
@@ -69,8 +78,6 @@ def _function_call_with_gas_price_to_label( | |
"name": function_call.function_name, | ||
"caller": function_call.caller_address, | ||
"args": function_call.function_args, | ||
"status": function_call.status, | ||
"gasUsed": function_call.gas_used, | ||
"gasPrice": function_call.gas_price, | ||
"maxFeePerGas": function_call.max_fee_per_gas, | ||
"maxPriorityFeePerGas": function_call.max_priority_fee_per_gas, | ||
|
@@ -96,7 +103,7 @@ def add_function_calls_with_gas_price_to_session( | |
transactions_hashes_to_save = [ | ||
function_call.transaction_hash for function_call in function_calls | ||
] | ||
|
||
logger.info(f"Querrying existing labels (function call)") | ||
existing_labels = ( | ||
db_session.query(label_model.transaction_hash) | ||
.filter( | ||
|
@@ -106,6 +113,7 @@ def add_function_calls_with_gas_price_to_session( | |
) | ||
.all() | ||
) | ||
logger.info(f"Querry finished") | ||
|
||
existing_labels_transactions = [label[0] for label in existing_labels] | ||
|
||
|
@@ -152,6 +160,76 @@ def _transform_to_w3_tx( | |
return tx | ||
|
||
|
||
def _fetch_events_chunk(
    web3,
    event_abi,
    from_block: int,
    to_block: int,
    addresses: Optional[List[ChecksumAddress]] = None,
    on_decode_error: Optional[Callable[[Exception], None]] = None,
    address_block_list: Optional[List[ChecksumAddress]] = None,
) -> List[Any]:
    """Get events using eth_getLogs API.

    Fetches the raw logs matching ``event_abi`` in the block range
    ``from_block``..``to_block`` and decodes each one with the codec
    configured on ``web3``.

    Event structure:
    {
        "event": Event name,
        "args": dictionary of event arguments,
        "address": contract address,
        "blockNumber": block number,
        "transactionHash": transaction hash,
        "logIndex": log index
    }

    Args:
        web3: Web3 client; its ``codec`` and ``eth.get_logs`` are used.
        event_abi: ABI entry of the event to fetch and decode.
        from_block: First block of the range; must not be ``None``.
        to_block: Last block of the range.
        addresses: If non-empty, restricts the log filter to these addresses.
        on_decode_error: Optional callback invoked with the exception raised
            while decoding a log.
        address_block_list: Optional list of addresses whose logs are skipped.
            It is mutated in place: the address of every log that fails to
            decode is appended, so later logs from that address (in this call
            and in later calls sharing the list) are skipped. The list is
            snapshotted into a set on entry, so entries added by other code
            while this call is running are not seen until the next call.

    Returns:
        The decoded events, in the order the logs were returned.

    Raises:
        TypeError: If ``from_block`` is ``None``.
    """
    # TODO(zomglings): "Why update here and not ..." (review comment on this
    # function; truncated in the source this was recovered from).

    if from_block is None:
        raise TypeError("Missing mandatory keyword argument to getLogs: fromBlock")

    # Depending on the Solidity version used to compile
    # the contract that uses the ABI,
    # it might have Solidity ABI encoding v1 or v2.
    # We just assume the default that you set on Web3 object here.
    # More information here https://eth-abi.readthedocs.io/en/latest/index.html
    codec: ABICodec = web3.codec

    _, event_filter_params = construct_event_filter_params(
        event_abi,
        codec,
        fromBlock=from_block,
        toBlock=to_block,
    )
    if addresses:
        event_filter_params["address"] = addresses

    logs = web3.eth.get_logs(event_filter_params)
    logger.info(f"Fetched {len(logs)} raw logs")

    # Set mirror of the caller's block list: O(1) membership per log instead
    # of a linear scan of the list for every fetched log.
    blocked = set(address_block_list) if address_block_list else set()

    # Convert raw binary data to Python proxy objects as described by ABI
    all_events = []
    for log in logs:
        if log["address"] in blocked:
            continue
        try:
            raw_event = get_event_data(codec, event_abi, log)
            event = {
                "event": raw_event["event"],
                "args": json.loads(Web3.toJSON(utfy_dict(dict(raw_event["args"])))),
                "address": raw_event["address"],
                "blockNumber": raw_event["blockNumber"],
                "transactionHash": raw_event["transactionHash"].hex(),
                "logIndex": raw_event["logIndex"],
            }
            all_events.append(event)
        except Exception as e:
            # Best effort: a log that cannot be decoded is dropped, and its
            # emitting address is block-listed when the caller supplied a list.
            if address_block_list is not None:
                address_block_list.append(log["address"])
                blocked.add(log["address"])
            if on_decode_error:
                on_decode_error(e)
            else:
                # Without a callback the failure would vanish entirely.
                logger.debug(
                    "Failed to decode log from %s: %r", log["address"], e
                )
            continue
    logger.info(f"Decoded {len(all_events)} logs")
    return all_events
|
||
|
||
def process_transaction( | ||
db_session: Session, | ||
web3: Web3, | ||
|
@@ -160,19 +238,20 @@ def process_transaction( | |
secondary_abi: List[Dict[str, Any]], | ||
transaction: Dict[str, Any], | ||
blocks_cache: Dict[int, int], | ||
skip_decoding: bool = False, | ||
): | ||
selector = transaction["input"][:10] | ||
function_name = selector | ||
function_args = "unknown" | ||
if not skip_decoding: | ||
try: | ||
raw_function_call = contract.decode_function_input(transaction["input"]) | ||
function_name = raw_function_call[0].fn_name | ||
function_args = utfy_dict(raw_function_call[1]) | ||
except Exception as e: | ||
pass | ||
# logger.error(f"Failed to decode transaction : {str(e)}") | ||
|
||
try: | ||
raw_function_call = contract.decode_function_input(transaction["input"]) | ||
function_name = raw_function_call[0].fn_name | ||
function_args = utfy_dict(raw_function_call[1]) | ||
except Exception as e: | ||
# logger.error(f"Failed to decode transaction : {str(e)}") | ||
selector = transaction["input"][:10] | ||
function_name = selector | ||
function_args = "unknown" | ||
|
||
transaction_reciept = web3.eth.getTransactionReceipt(transaction["hash"]) | ||
block_timestamp = get_block_timestamp( | ||
db_session, | ||
web3, | ||
|
@@ -190,8 +269,6 @@ def process_transaction( | |
caller_address=transaction["from"], | ||
function_name=function_name, | ||
function_args=function_args, | ||
status=transaction_reciept["status"], | ||
gas_used=transaction_reciept["gasUsed"], | ||
gas_price=transaction["gasPrice"], | ||
max_fee_per_gas=transaction.get( | ||
"maxFeePerGas", | ||
|
@@ -200,28 +277,7 @@ def process_transaction( | |
value=transaction["value"], | ||
) | ||
|
||
secondary_logs = [] | ||
for log in transaction_reciept["logs"]: | ||
for abi in secondary_abi: | ||
try: | ||
raw_event = get_event_data(web3.codec, abi, log) | ||
event = { | ||
"event": raw_event["event"], | ||
"args": json.loads(Web3.toJSON(utfy_dict(dict(raw_event["args"])))), | ||
"address": raw_event["address"], | ||
"blockNumber": raw_event["blockNumber"], | ||
"transactionHash": raw_event["transactionHash"].hex(), | ||
"logIndex": raw_event["logIndex"], | ||
"blockTimestamp": block_timestamp, | ||
} | ||
processed_event = _processEvent(event) | ||
secondary_logs.append(processed_event) | ||
|
||
break | ||
except: | ||
pass | ||
|
||
return function_call, secondary_logs | ||
return function_call, [] | ||
|
||
|
||
def _get_transactions( | ||
|
@@ -350,6 +406,7 @@ def crawl( | |
crawl_transactions: bool = True, | ||
addresses: Optional[List[ChecksumAddress]] = None, | ||
batch_size: int = 100, | ||
skip_decoding_transactions: bool = False, | ||
) -> None: | ||
current_block = from_block | ||
|
||
|
@@ -371,13 +428,15 @@ def crawl( | |
logger.info(f"Crawling blocks {current_block}-{current_block + batch_size}") | ||
events = [] | ||
logger.info("Fetching events") | ||
block_list = [] | ||
for event_abi in events_abi: | ||
raw_events = _fetch_events_chunk( | ||
web3, | ||
event_abi, | ||
current_block, | ||
batch_end, | ||
addresses, | ||
address_block_list=block_list, | ||
) | ||
for raw_event in raw_events: | ||
raw_event["blockTimestamp"] = get_block_timestamp( | ||
|
@@ -386,7 +445,7 @@ def crawl( | |
blockchain_type, | ||
raw_event["blockNumber"], | ||
blocks_cache=db_blocks_cache, | ||
max_blocks_batch=1000, | ||
max_blocks_batch=100, | ||
) | ||
event = _processEvent(raw_event) | ||
events.append(event) | ||
|
@@ -401,6 +460,7 @@ def crawl( | |
) | ||
logger.info(f"Fetched {len(transactions)} transactions") | ||
|
||
logger.info(f"Processing transactions") | ||
function_calls = [] | ||
for tx in transactions: | ||
processed_tx, secondary_logs = process_transaction( | ||
|
@@ -411,9 +471,12 @@ def crawl( | |
secondary_abi, | ||
tx, | ||
db_blocks_cache, | ||
skip_decoding=skip_decoding_transactions, | ||
) | ||
function_calls.append(processed_tx) | ||
events.extend(secondary_logs) | ||
logger.info(f"Processed {len(function_calls)} transactions") | ||
|
||
add_function_calls_with_gas_price_to_session( | ||
db_session, | ||
function_calls, | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extended