Crawlers improvements. #924

Open · wants to merge 8 commits into main
8 changes: 4 additions & 4 deletions crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
@@ -277,9 +277,9 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
if args.find_deployed_blocks:
addresses_set = set()
for job in filtered_event_jobs:
addresses_set.update(job.contracts)
addresses_set.update(job.contracts) # type: ignore
for function_job in filtered_function_call_jobs:
addresses_set.add(function_job.contract_address)
addresses_set.add(function_job.contract_address) # type: ignore

if args.start is None:
start_block = web3.eth.blockNumber - 1
@@ -330,8 +330,8 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
db_session,
blockchain_type,
web3,
filtered_event_jobs,
filtered_function_call_jobs,
filtered_event_jobs, # type: ignore
filtered_function_call_jobs, # type: ignore
start_block,
end_block,
args.max_blocks_batch,
@@ -26,7 +26,13 @@
merge_function_call_crawl_jobs,
moonworm_crawler_update_job_as_pickedup,
)
from .db import add_events_to_session, add_function_calls_to_session, commit_session
from .db import (
add_events_to_session,
add_function_calls_to_session,
commit_session,
write_to_db,
delete_unverified_duplicates,
)
from .event_crawler import _crawl_events
from .function_call_crawler import _crawl_functions
from ..settings import (
@@ -95,7 +101,7 @@ def continuous_crawler(
function_call_crawl_jobs: List[FunctionCallCrawlJob],
start_block: int,
max_blocks_batch: int = 100,
min_blocks_batch: int = 40,
min_blocks_batch: int = 5,
confirmations: int = 60,
min_sleep_time: float = 0.1,
heartbeat_interval: float = 60,
@@ -162,7 +168,7 @@ def continuous_crawler(
crawler_status=heartbeat_template,
)
last_heartbeat_time = datetime.utcnow()
blocks_cache: Dict[int, int] = {}
blocks_cache: Dict[int, Optional[int]] = {}
current_sleep_time = min_sleep_time
failed_count = 0
try:
@@ -171,7 +177,7 @@
time.sleep(current_sleep_time)

end_block = min(
web3.eth.blockNumber - confirmations,
web3.eth.blockNumber - min_blocks_batch,
start_block + max_blocks_batch,
)

@@ -192,7 +198,7 @@
from_block=start_block,
to_block=end_block,
blocks_cache=blocks_cache,
db_block_query_batch=min_blocks_batch * 2,
db_block_query_batch=max_blocks_batch * 3,
)
logger.info(
f"Crawled {len(all_events)} events from {start_block} to {end_block}."
@@ -220,6 +226,18 @@ def continuous_crawler(

current_time = datetime.utcnow()

write_to_db(
web3=web3, db_session=db_session, blockchain_type=blockchain_type
)

delete_unverified_duplicates(
db_session=db_session, blockchain_type=blockchain_type
)

commit_session(db_session)

### fetch confirmed transactions and events

if current_time - jobs_refetchet_time > timedelta(
seconds=new_jobs_refetch_interval
):
@@ -243,8 +261,6 @@
if current_time - last_heartbeat_time > timedelta(
seconds=heartbeat_interval
):
# Commiting to db
commit_session(db_session)
# Update heartbeat
heartbeat_template["last_block"] = end_block
heartbeat_template["current_time"] = _date_to_str(current_time)
@@ -304,7 +320,7 @@ def continuous_crawler(
heartbeat_template[
"die_reason"
] = f"{e.__class__.__name__}: {e}\n error_summary: {error_summary}\n error_traceback: {error_traceback}"
heartbeat_template["last_block"] = end_block
heartbeat_template["last_block"] = end_block # type: ignore
heartbeat(
crawler_type=crawler_type,
blockchain_type=blockchain_type,
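Note on the loop changes above: events and function calls are now flushed on every iteration (write_to_db, then delete_unverified_duplicates, then commit_session) instead of only inside the heartbeat branch, and end_block now trails the chain head by min_blocks_batch blocks (default lowered from 40 to 5) rather than by confirmations. The sketch below is only a worked example of the new window arithmetic with made-up numbers, not code from this PR:

# Illustrative values only -- none of these numbers come from the PR.
latest_block = 18_000_000    # stand-in for web3.eth.blockNumber
min_blocks_batch = 5         # new default in this PR
max_blocks_batch = 100
start_block = 17_999_500

end_block = min(latest_block - min_blocks_batch, start_block + max_blocks_batch)
# min(17_999_995, 17_999_600) == 17_999_600: each iteration advances at most
# max_blocks_batch blocks and never comes closer than min_blocks_batch blocks
# to the head of the chain.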
27 changes: 15 additions & 12 deletions crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
@@ -124,7 +124,7 @@ def _retry_connect_web3(
logger.info(f"Retrying in {sleep_time} seconds")
time.sleep(sleep_time)
raise Exception(
f"Failed to connect to {blockchain_type} blockchain after {retry_count} retries: {error}"
f"Failed to connect to {blockchain_type} blockchain after {retry_count} retries: {error}" # type: ignore
)


@@ -151,7 +151,7 @@ def blockchain_type_to_subscription_type(

@dataclass
class EventCrawlJob:
event_abi_hash: str
event_abi_selector: str
event_abi: Dict[str, Any]
contracts: List[ChecksumAddress]
address_entries: Dict[ChecksumAddress, Dict[UUID, List[str]]]
@@ -226,6 +226,7 @@ def find_all_deployed_blocks(
"""

all_deployed_blocks = {}
logger.info(f"Finding deployment blocks for {len(addresses_set)} addresses")
for address in addresses_set:
try:
code = web3.eth.getCode(address)
@@ -237,6 +238,7 @@ def find_all_deployed_blocks(
)
if block is not None:
all_deployed_blocks[address] = block
logger.info(f"Found deployment block for {address}: {block}")
if block is None:
logger.error(f"Failed to get deployment block for {address}")
except Exception as e:
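For context on the new logging in find_all_deployed_blocks: the function checks each address for contract code and records the block at which the contract appeared. The deployment-block lookup itself is done by a helper that is not shown in this diff; the following is only a minimal sketch of one common approach (binary search over eth_getCode against an archive node), with an invented function name:

from typing import Optional

from web3 import Web3


def find_deployment_block_sketch(web3: Web3, address: str, last_block: int) -> Optional[int]:
    # No code at the current head: not a deployed contract (or it self-destructed).
    if not web3.eth.getCode(address, block_identifier=last_block):
        return None
    low, high = 0, last_block
    while low < high:
        mid = (low + high) // 2
        # Code present at `mid` means deployment happened at or before `mid`.
        if web3.eth.getCode(address, block_identifier=mid):
            high = mid
        else:
            low = mid + 1
    return low  # first block at which the address had code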
@@ -256,15 +258,15 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJob]:
Create EventCrawlJob objects from bugout entries.
"""

crawl_job_by_hash: Dict[str, EventCrawlJob] = {}
crawl_job_by_selector: Dict[str, EventCrawlJob] = {}

for entry in entries:
abi_hash = _get_tag(entry, "abi_method_hash")
abi_selector = _get_tag(entry, "abi_selector")
contract_address = Web3().toChecksumAddress(_get_tag(entry, "address"))

entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji

existing_crawl_job = crawl_job_by_hash.get(abi_hash)
existing_crawl_job = crawl_job_by_selector.get(abi_selector)
if existing_crawl_job is not None:
if contract_address not in existing_crawl_job.contracts:
existing_crawl_job.contracts.append(contract_address)
Expand All @@ -275,15 +277,15 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ
else:
abi = cast(str, entry.content)
new_crawl_job = EventCrawlJob(
event_abi_hash=abi_hash,
event_abi_selector=abi_selector,
event_abi=json.loads(abi),
contracts=[contract_address],
address_entries={contract_address: {entry_id: entry.tags}},
created_at=int(datetime.fromisoformat(entry.created_at).timestamp()),
)
crawl_job_by_hash[abi_hash] = new_crawl_job
crawl_job_by_selector[abi_selector] = new_crawl_job

return [crawl_job for crawl_job in crawl_job_by_hash.values()]
return [crawl_job for crawl_job in crawl_job_by_selector.values()]


def make_function_call_crawl_jobs(
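The crawl jobs above are now keyed by an abi_selector tag read from each Bugout entry, replacing the old abi_method_hash key (and, in the next hunk, the encode_function_signature call). The selector itself is not computed in this file; as a hedged illustration of what such a selector is, eth-utils can derive it from an ABI entry (whether Moonstream stores it with the 0x prefix is an assumption here):

from eth_utils import event_abi_to_log_topic, function_abi_to_4byte_selector

transfer_event_abi = {
    "anonymous": False,
    "name": "Transfer",
    "type": "event",
    "inputs": [
        {"indexed": True, "name": "from", "type": "address"},
        {"indexed": True, "name": "to", "type": "address"},
        {"indexed": False, "name": "value", "type": "uint256"},
    ],
}
transfer_function_abi = {
    "name": "transfer",
    "type": "function",
    "inputs": [
        {"name": "to", "type": "address"},
        {"name": "amount", "type": "uint256"},
    ],
}

# Event selector: keccak256 of the canonical signature, i.e. the log topic.
event_selector = "0x" + event_abi_to_log_topic(transfer_event_abi).hex()
# 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef

# Function selector: first four bytes of keccak256("transfer(address,uint256)").
function_selector = "0x" + function_abi_to_4byte_selector(transfer_function_abi).hex()
# 0xa9059cbb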
@@ -300,7 +302,8 @@ def make_function_call_crawl_jobs(
entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji
contract_address = Web3().toChecksumAddress(_get_tag(entry, "address"))
abi = json.loads(cast(str, entry.content))
method_signature = encode_function_signature(abi)
method_signature = _get_tag(entry, "abi_selector")

if method_signature is None:
raise ValueError(f"{abi} is not a function ABI")

@@ -340,7 +343,7 @@ def merge_event_crawl_jobs(
"""
for new_crawl_job in new_event_crawl_jobs:
for old_crawl_job in old_crawl_jobs:
if new_crawl_job.event_abi_hash == old_crawl_job.event_abi_hash:
if new_crawl_job.event_abi_selector == old_crawl_job.event_abi_selector:
old_crawl_job.contracts.extend(
[
contract
@@ -355,8 +358,8 @@
else:
old_crawl_job.address_entries[contract_address] = entries
break
else:
old_crawl_jobs.append(new_crawl_job)
else:
old_crawl_jobs.append(new_crawl_job)
return old_crawl_jobs

