
NFT crawler blocklist #642


Draft: wants to merge 14 commits into base branch main
19 changes: 11 additions & 8 deletions crawlers/mooncrawl/mooncrawl/blockchain.py
@@ -48,19 +48,21 @@ def connect(
blockchain_type: AvailableBlockchainType,
web3_uri: Optional[str] = None,
access_id: Optional[UUID] = None,
request_timeout: Optional[int] = None,
) -> Web3:
web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider()

request_kwargs: Any = None
request_kwargs = {}
if access_id is not None:
request_kwargs = {
"headers": {
NB_ACCESS_ID_HEADER: str(access_id),
NB_DATA_SOURCE_HEADER: "blockchain",
"Content-Type": "application/json",
}
request_kwargs["headers"] = {
NB_ACCESS_ID_HEADER: str(access_id),
NB_DATA_SOURCE_HEADER: "blockchain",
"Content-Type": "application/json",
}

if request_timeout is not None:
request_kwargs["timeout"] = request_timeout

if web3_uri is None:
if blockchain_type == AvailableBlockchainType.ETHEREUM:
web3_uri = MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI
@@ -73,10 +75,11 @@ def connect(

if web3_uri.startswith("http://") or web3_uri.startswith("https://"):
web3_provider = Web3.HTTPProvider(web3_uri, request_kwargs=request_kwargs)
elif web3_uri.startswith("wss://"):
web3_provider = Web3.WebsocketProvider(web3_uri)
else:
web3_provider = Web3.IPCProvider(web3_uri)
web3_client = Web3(web3_provider)

# Inject --dev middleware if it is not Ethereum mainnet
# Docs: https://web3py.readthedocs.io/en/stable/middleware.html#geth-style-proof-of-authority
if blockchain_type != AvailableBlockchainType.ETHEREUM:
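For context on the new `request_timeout` parameter: it is only added to `request_kwargs`, which this function passes exclusively to `Web3.HTTPProvider`, so per this hunk it has no effect on the websocket or IPC branches. A minimal usage sketch, mirroring the `connect(...)` calls added in `cli.py` below; the endpoint URL and access id are placeholders, not values from this PR:

```python
from uuid import UUID

from mooncrawl.blockchain import connect
from mooncrawl.data import AvailableBlockchainType

# Placeholder endpoint and access id, purely illustrative.
web3 = connect(
    AvailableBlockchainType.ETHEREUM,
    web3_uri="https://example-node.invalid",  # HTTPS, so request_kwargs (and the timeout) apply
    access_id=UUID("00000000-0000-0000-0000-000000000000"),
    request_timeout=60,  # ends up as request_kwargs["timeout"] for Web3.HTTPProvider
)
```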
153 changes: 108 additions & 45 deletions crawlers/mooncrawl/mooncrawl/generic_crawler/base.py
@@ -2,8 +2,10 @@
import logging
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Set, Union

from typing import Any, Dict, List, Optional, Set, Union, Callable
from eth_abi.codec import ABICodec
from web3._utils.events import get_event_data
from web3._utils.filters import construct_event_filter_params
import web3
from eth_typing import ChecksumAddress
from hexbytes.main import HexBytes
@@ -19,7 +21,6 @@
ContractFunctionCall,
utfy_dict,
)
from moonworm.crawler.log_scanner import _fetch_events_chunk # type: ignore
from sqlalchemy.orm.session import Session
from tqdm import tqdm
from web3 import Web3
@@ -44,13 +45,21 @@
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# TODO: ADD VALUE!!!

@dataclass
class ExtededFunctionCall(ContractFunctionCall):
class ExtededFunctionCall:
[Review comment from Contributor] Extended

block_number: int
block_timestamp: int
transaction_hash: str
contract_address: str
caller_address: str
function_name: str
function_args: Dict[str, Any]
gas_price: int
value: int = 0
max_fee_per_gas: Optional[int] = None
max_priority_fee_per_gas: Optional[int] = None
value: int = 0
status: Optional[str] = None


def _function_call_with_gas_price_to_label(
@@ -69,8 +78,6 @@ def _function_call_with_gas_price_to_label(
"name": function_call.function_name,
"caller": function_call.caller_address,
"args": function_call.function_args,
"status": function_call.status,
"gasUsed": function_call.gas_used,
"gasPrice": function_call.gas_price,
"maxFeePerGas": function_call.max_fee_per_gas,
"maxPriorityFeePerGas": function_call.max_priority_fee_per_gas,
@@ -96,7 +103,7 @@ def add_function_calls_with_gas_price_to_session(
transactions_hashes_to_save = [
function_call.transaction_hash for function_call in function_calls
]

logger.info(f"Querrying existing labels (function call)")
existing_labels = (
db_session.query(label_model.transaction_hash)
.filter(
Expand All @@ -106,6 +113,7 @@ def add_function_calls_with_gas_price_to_session(
)
.all()
)
logger.info(f"Querry finished")

existing_labels_transactions = [label[0] for label in existing_labels]

@@ -152,6 +160,76 @@ def _transform_to_w3_tx(
return tx


def _fetch_events_chunk(
[Review comment from Contributor] TODO(zomglings): Why update here and not moonworm?

web3,
event_abi,
from_block: int,
to_block: int,
addresses: Optional[List[ChecksumAddress]] = None,
on_decode_error: Optional[Callable[[Exception], None]] = None,
address_block_list: Optional[List[ChecksumAddress]] = None,
) -> List[Any]:
"""Get events using eth_getLogs API.

Event structure:
{
"event": Event name,
"args": dictionary of event arguments,
"address": contract address,
"blockNumber": block number,
"transactionHash": transaction hash,
"logIndex": log index
}

"""

if from_block is None:
raise TypeError("Missing mandatory keyword argument to getLogs: fromBlock")

# Depending on the Solidity version used to compile
# the contract that uses the ABI,
# it might have Solidity ABI encoding v1 or v2.
# We just assume the default that you set on Web3 object here.
# More information here https://eth-abi.readthedocs.io/en/latest/index.html
codec: ABICodec = web3.codec

_, event_filter_params = construct_event_filter_params(
event_abi,
codec,
fromBlock=from_block,
toBlock=to_block,
)
if addresses:
event_filter_params["address"] = addresses

logs = web3.eth.get_logs(event_filter_params)
logger.info(f"Fetched {len(logs)} raw logs")
# Convert raw binary data to Python proxy objects as described by ABI
all_events = []
for log in logs:
if address_block_list and log["address"] in address_block_list:
continue
try:
raw_event = get_event_data(codec, event_abi, log)
event = {
"event": raw_event["event"],
"args": json.loads(Web3.toJSON(utfy_dict(dict(raw_event["args"])))),
"address": raw_event["address"],
"blockNumber": raw_event["blockNumber"],
"transactionHash": raw_event["transactionHash"].hex(),
"logIndex": raw_event["logIndex"],
}
all_events.append(event)
except Exception as e:
if address_block_list is not None:
address_block_list.append(log["address"])
if on_decode_error:
on_decode_error(e)
continue
logger.info(f"Decoded {len(all_events)} logs")
return all_events
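As wired up in `crawl()` further down, one `block_list` is created per batch and shared across the per-event `_fetch_events_chunk` calls, so an address whose log fails to decode is appended once and skipped for the rest of that batch. A minimal sketch of that pattern; `web3` and `transfer_event_abi` are assumed placeholders (an already-connected `Web3` instance and a single event entry from a contract ABI), not objects defined in this PR:

```python
from typing import List

from eth_typing import ChecksumAddress

block_list: List[ChecksumAddress] = []  # grows as decode errors are hit, shared across calls

events = _fetch_events_chunk(
    web3,                      # assumed: connected Web3 instance
    transfer_event_abi,        # assumed: one event ABI entry
    from_block=14_000_000,
    to_block=14_000_100,
    addresses=None,            # no address filter: fetch this event's logs from any contract
    address_block_list=block_list,  # addresses whose logs fail to decode are appended here
)
# On a later call that receives the same list, logs from anything already in block_list
# are skipped before any decoding is attempted.
```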


def process_transaction(
db_session: Session,
web3: Web3,
@@ -160,19 +238,20 @@ def process_transaction(
secondary_abi: List[Dict[str, Any]],
transaction: Dict[str, Any],
blocks_cache: Dict[int, int],
skip_decoding: bool = False,
):
selector = transaction["input"][:10]
function_name = selector
function_args = "unknown"
if not skip_decoding:
try:
raw_function_call = contract.decode_function_input(transaction["input"])
function_name = raw_function_call[0].fn_name
function_args = utfy_dict(raw_function_call[1])
except Exception as e:
pass
# logger.error(f"Failed to decode transaction : {str(e)}")

try:
raw_function_call = contract.decode_function_input(transaction["input"])
function_name = raw_function_call[0].fn_name
function_args = utfy_dict(raw_function_call[1])
except Exception as e:
# logger.error(f"Failed to decode transaction : {str(e)}")
selector = transaction["input"][:10]
function_name = selector
function_args = "unknown"

transaction_reciept = web3.eth.getTransactionReceipt(transaction["hash"])
block_timestamp = get_block_timestamp(
db_session,
web3,
@@ -190,8 +269,6 @@
caller_address=transaction["from"],
function_name=function_name,
function_args=function_args,
status=transaction_reciept["status"],
gas_used=transaction_reciept["gasUsed"],
gas_price=transaction["gasPrice"],
max_fee_per_gas=transaction.get(
"maxFeePerGas",
@@ -200,28 +277,7 @@
value=transaction["value"],
)

secondary_logs = []
for log in transaction_reciept["logs"]:
for abi in secondary_abi:
try:
raw_event = get_event_data(web3.codec, abi, log)
event = {
"event": raw_event["event"],
"args": json.loads(Web3.toJSON(utfy_dict(dict(raw_event["args"])))),
"address": raw_event["address"],
"blockNumber": raw_event["blockNumber"],
"transactionHash": raw_event["transactionHash"].hex(),
"logIndex": raw_event["logIndex"],
"blockTimestamp": block_timestamp,
}
processed_event = _processEvent(event)
secondary_logs.append(processed_event)

break
except:
pass

return function_call, secondary_logs
return function_call, []
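When `skip_decoding` is set, the ABI decode is bypassed entirely: `function_name` falls back to the raw 4-byte selector sliced from the transaction input and `function_args` stays `"unknown"`. A tiny sketch of that slice, using a fabricated ERC-20 `transfer` payload for illustration:

```python
# Fabricated input payload; only the [:10] slice mirrors the code above.
tx_input = "0xa9059cbb" + "00" * 64  # transfer(address,uint256) selector + two zero-padded 32-byte args
selector = tx_input[:10]             # "0x" plus 8 hex chars = the 4-byte selector
assert selector == "0xa9059cbb"      # used as function_name when decoding is skipped
```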


def _get_transactions(
@@ -350,6 +406,7 @@ def crawl(
crawl_transactions: bool = True,
addresses: Optional[List[ChecksumAddress]] = None,
batch_size: int = 100,
skip_decoding_transactions: bool = False,
) -> None:
current_block = from_block

@@ -371,13 +428,15 @@
logger.info(f"Crawling blocks {current_block}-{current_block + batch_size}")
events = []
logger.info("Fetching events")
block_list = []
for event_abi in events_abi:
raw_events = _fetch_events_chunk(
web3,
event_abi,
current_block,
batch_end,
addresses,
address_block_list=block_list,
)
for raw_event in raw_events:
raw_event["blockTimestamp"] = get_block_timestamp(
Expand All @@ -386,7 +445,7 @@ def crawl(
blockchain_type,
raw_event["blockNumber"],
blocks_cache=db_blocks_cache,
max_blocks_batch=1000,
max_blocks_batch=100,
)
event = _processEvent(raw_event)
events.append(event)
@@ -401,6 +460,7 @@
)
logger.info(f"Fetched {len(transactions)} transactions")

logger.info(f"Processing transactions")
function_calls = []
for tx in transactions:
processed_tx, secondary_logs = process_transaction(
Expand All @@ -411,9 +471,12 @@ def crawl(
secondary_abi,
tx,
db_blocks_cache,
skip_decoding=skip_decoding_transactions,
)
function_calls.append(processed_tx)
events.extend(secondary_logs)
logger.info(f"Processed {len(function_calls)} transactions")

add_function_calls_with_gas_price_to_session(
db_session,
function_calls,
33 changes: 17 additions & 16 deletions crawlers/mooncrawl/mooncrawl/generic_crawler/cli.py
@@ -11,8 +11,8 @@
from mooncrawl.data import AvailableBlockchainType # type: ignore

from ..blockchain import connect
from .base import crawl, get_checkpoint, populate_with_events
from ..settings import NB_CONTROLLER_ACCESS_ID
from .base import crawl, get_checkpoint, populate_with_events

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -43,12 +43,12 @@ def handle_nft_crawler(args: argparse.Namespace) -> None:
web3 = connect(blockchain_type, access_id=args.access_id)
else:
logger.info(f"Using web3 provider URL: {args.web3}")
web3 = Web3(
Web3.HTTPProvider(args.web3),
web3 = connect(
blockchain_type,
access_id=args.access_id,
web3_uri=args.web3,
request_timeout=60,
)
if args.poa:
logger.info("Using PoA middleware")
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
last_crawled_block = get_checkpoint(
db_session, blockchain_type, from_block, to_block, label
)
@@ -64,6 +64,7 @@ def handle_nft_crawler(args: argparse.Namespace) -> None:
from_block=last_crawled_block,
to_block=to_block,
batch_size=args.max_blocks_batch,
skip_decoding_transactions=True,
)


@@ -95,12 +96,12 @@ def populate_with_erc20_transfers(args: argparse.Namespace) -> None:
web3 = connect(blockchain_type, access_id=args.access_id)
else:
logger.info(f"Using web3 provider URL: {args.web3}")
web3 = Web3(
Web3.HTTPProvider(args.web3),
web3 = connect(
blockchain_type,
access_id=args.access_id,
web3_uri=args.web3,
request_timeout=60,
)
if args.poa:
logger.info("Using PoA middleware")
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
last_crawled_block = get_checkpoint(
db_session, blockchain_type, from_block, to_block, label
)
@@ -120,6 +121,8 @@ def populate_with_erc20_transfers(args: argparse.Namespace) -> None:


def handle_crawl(args: argparse.Namespace) -> None:
# TODO(yhtiyar): fix it
raise NotImplementedError("Deprecated for now, since blocklist is added")
logger.info(f"Starting generic crawler")

label = args.label_name
@@ -141,12 +144,10 @@ def handle_crawl(args: argparse.Namespace) -> None:
web3 = connect(blockchain_type, access_id=args.access_id)
else:
logger.info(f"Using web3 provider URL: {args.web3}")
web3 = Web3(
Web3.HTTPProvider(args.web3),

web3 = connect(
blockchain_type, access_id=args.access_id, web3_uri=args.web3
)
if args.poa:
logger.info("Using PoA middleware")
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
last_crawled_block = get_checkpoint(
db_session, blockchain_type, from_block, to_block, label
)