Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leaderboards generator improvements. #886

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 135 additions & 98 deletions crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import argparse
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError
import json
import logging
import os
from typing import cast, List
from typing import cast, List, Dict, Any, Optional
import uuid
import traceback


import requests # type: ignore
from bugout.data import BugoutSearchResult
Expand All @@ -28,6 +32,116 @@
end_c = "\033[0m"


def _get_leaderboard_results(
leaderboard: BugoutSearchResult,
query_api_access_token: str,
max_retries: int = 10,
interval: float = 30.0,
params: Optional[Dict[str, Any]] = None,
) -> None:
logger.info(
f"Processing leaderboard: {leaderboard.title} with id: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]}"
)

if leaderboard.content is None:
return

try:
leaderboard_data = json.loads(leaderboard.content)
except json.JSONDecodeError:
logger.error(
f"Could not parse leaderboard content: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]} in entry {leaderboard.entry_url.split('/')[-1]}"
)
return

### get results from query API

leaderboard_id = leaderboard_data["leaderboard_id"]

query_name = leaderboard_data["query_name"]

if not params:
params = leaderboard_data["params"]

blockchain = leaderboard_data.get("blockchain", None)

### execute query
try:
query_results = get_results_for_moonstream_query(
query_api_access_token,
query_name,
params, # type: ignore
blockchain,
MOONSTREAM_API_URL,
max_retries,
interval,
)
except Exception as e:
logger.error(f"Could not get results for query {query_name}: error: {e}")
return

### push results to leaderboard API

if query_results is None:
logger.error(f"Could not get results for query {query_name} in time")
return

leaderboard_push_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/scores?normalize_addresses={leaderboard_data['normalize_addresses']}&overwrite=true"

leaderboard_api_headers = {
"Authorization": f"Bearer {query_api_access_token}",
"Content-Type": "application/json",
}

try:
leaderboard_api_response = requests.put(
leaderboard_push_api_url,
json=query_results["data"],
headers=leaderboard_api_headers,
timeout=10,
)
except Exception as e:
logger.error(
f"Could not push results to leaderboard API: {e} for leaderboard {leaderboard_id}"
)
return

try:
leaderboard_api_response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
)
return

### get leaderboard from leaderboard API

leaderboard_api_info_url = (
f"{MOONSTREAM_ENGINE_URL}/leaderboard/info?leaderboard_id={leaderboard_id}"
)

leaderboard_api_response = requests.get(
leaderboard_api_info_url, headers=leaderboard_api_headers, timeout=10
)

try:
leaderboard_api_response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not get leaderboard info from leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
)
return

info = leaderboard_api_response.json()

logger.info(
f"Successfully pushed results to leaderboard {info['id']}:{blue_c} {info['title']} {end_c}"
)
logger.info(
f"can be check on:{green_c} {MOONSTREAM_ENGINE_URL}/leaderboard/?leaderboard_id={leaderboard_id} {end_c}"
)


def handle_leaderboards(args: argparse.Namespace) -> None:
"""
Run the leaderboard generator.
Expand Down Expand Up @@ -60,110 +174,33 @@ def handle_leaderboards(args: argparse.Namespace) -> None:

logger.info(f"Found {len(leaderboards_results)} leaderboards")

for leaderboard in leaderboards_results:
logger.info(
f"Processing leaderboard: {leaderboard.title} with id: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]}"
)

if leaderboard.content is None:
continue

try:
leaderboard_data = json.loads(leaderboard.content)
except json.JSONDecodeError:
logger.error(
f"Could not parse leaderboard content: {[tag for tag in leaderboard.tags if tag.startswith('leaderboard_id')]} in entry {leaderboard.entry_url.split('/')[-1]}"
)
continue

### get results from query API

leaderboard_id = leaderboard_data["leaderboard_id"]

query_name = leaderboard_data["query_name"]

if args.params:
params = json.loads(args.params)
else:
params = leaderboard_data["params"]
timeout = args.max_retries * args.interval + 10

blockchain = leaderboard_data.get("blockchain", None)
tasks = []

### execute query
try:
query_results = get_results_for_moonstream_query(
with ThreadPoolExecutor(max_workers=2) as executor:
for leaderboard in leaderboards_results:
task = executor.submit(
_get_leaderboard_results,
leaderboard,
args.query_api_access_token,
query_name,
params,
blockchain,
MOONSTREAM_API_URL,
args.max_retries,
args.interval,
args.params,
)
except Exception as e:
logger.error(f"Could not get results for query {query_name}: error: {e}")
continue
tasks.append(task)

### push results to leaderboard API
for task in tasks:
try:
task.result(timeout=timeout)
except TimeoutError:
logger.error("Timeout")
task.cancel()

if query_results is None:
logger.error(f"Could not get results for query {query_name} in time")
continue

leaderboard_push_api_url = f"{MOONSTREAM_ENGINE_URL}/leaderboard/{leaderboard_id}/scores?normalize_addresses={leaderboard_data['normalize_addresses']}&overwrite=true"

leaderboard_api_headers = {
"Authorization": f"Bearer {args.query_api_access_token}",
"Content-Type": "application/json",
}

try:
leaderboard_api_response = requests.put(
leaderboard_push_api_url,
json=query_results["data"],
headers=leaderboard_api_headers,
timeout=10,
)
except Exception as e:
logger.error(
f"Could not push results to leaderboard API: {e} for leaderboard {leaderboard_id}"
)
continue

try:
leaderboard_api_response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not push results to leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
)
continue

### get leaderboard from leaderboard API

leaderboard_api_info_url = (
f"{MOONSTREAM_ENGINE_URL}/leaderboard/info?leaderboard_id={leaderboard_id}"
)

leaderboard_api_response = requests.get(
leaderboard_api_info_url, headers=leaderboard_api_headers, timeout=10
)

try:
leaderboard_api_response.raise_for_status()
except requests.exceptions.HTTPError as http_error:
logger.error(
f"Could not get leaderboard info from leaderboard API: {http_error.response.text} with status code {http_error.response.status_code}"
)
continue

info = leaderboard_api_response.json()

logger.info(
f"Successfully pushed results to leaderboard {info['id']}:{blue_c} {info['title']} {end_c}"
)
logger.info(
f"can be check on:{green_c} {MOONSTREAM_ENGINE_URL}/leaderboard/?leaderboard_id={leaderboard_id} {end_c}"
)
except Exception as e:
traceback.print_exc()
logger.error(f"Could not get results for leaderboard: {e}")
task.cancel()


def main():
Expand All @@ -187,7 +224,7 @@ def main():
leaderboard_generator_parser.add_argument(
"--max-retries",
type=int,
default=100,
default=10,
help="Number of times to retry requests for Moonstream Query results",
)
leaderboard_generator_parser.add_argument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def get_results_for_moonstream_query(
params: Dict[str, Any],
blockchain: Optional[str] = None,
api_url: str = MOONSTREAM_API_URL,
max_retries: int = 100,
max_retries: int = 10,
interval: float = 30.0,
) -> Optional[Dict[str, Any]]:
"""
Expand Down
Loading