Merge pull request #76 from skalenetwork/fix-metrics-collector
Restructure metrics collector
dmytrotkk authored Nov 1, 2024
2 parents 95f4e5f + ec820b7 commit b6e3344
Showing 4 changed files with 122 additions and 54 deletions.
5 changes: 3 additions & 2 deletions metrics/requirements-dev.txt
@@ -1,5 +1,6 @@
 ruff==0.6.4
 pytest==8.3.3
 Faker==28.4.1
-eth-typing==4.1.0
-pytest-aiohttp==1.0.5
+pytest-aiohttp==1.0.5
+eth-typing==4.0.0
+eth-utils==4.0.0
110 changes: 66 additions & 44 deletions metrics/src/collector.py
@@ -22,7 +22,7 @@
 import asyncio
 import aiohttp
 from datetime import datetime, timedelta
-from typing import Tuple, Optional, Dict
+from typing import Tuple, Optional, Dict, List
 from aiohttp import ClientError, ClientSession

 from src.explorer import get_address_counters_url, get_chain_stats
@@ -36,24 +36,27 @@
     GITHUB_RAW_URL,
     OFFCHAIN_KEY,
 )
-from src.metrics_types import AddressCounter, AddressCountersMap, MetricsData, ChainMetrics
-
+from src.metrics_types import (
+    AddressCounter,
+    AddressCountersMap,
+    MetricsData,
+    ChainMetrics,
+    AddressType,
+)

 logger = logging.getLogger(__name__)


-def get_metadata_url(network_name: str) -> str:
-    return f'{GITHUB_RAW_URL}/skalenetwork/skale-network/master/metadata/{network_name}/chains.json'
-
-
-async def download_metadata(session, network_name: str) -> Dict:
-    url = get_metadata_url(network_name)
+async def download_metadata(session: ClientSession, network_name: str) -> Dict:
+    """Download and parse network metadata."""
+    url = f'{GITHUB_RAW_URL}/skalenetwork/skale-network/master/metadata/{network_name}/chains.json'
     async with session.get(url) as response:
-        metadata_srt = await response.text()
-        return json.loads(metadata_srt)
+        metadata_str = await response.text()
+        return json.loads(metadata_str)


 def get_empty_address_counter() -> AddressCounter:
+    """Return an empty counter structure with zero values."""
     return {
         'gas_usage_count': '0',
         'token_transfers_count': '0',
Expand All @@ -65,47 +68,59 @@ def get_empty_address_counter() -> AddressCounter:
}


async def fetch_address_data(session: ClientSession, url: str) -> AddressCounter:
async def fetch_address_data(
session: ClientSession, url: str, chain_name: str, app_name: str, address: str
) -> AddressCounter:
"""Fetch and process address data, updating DB immediately."""
async with session.get(url) as response:
if response.status == 404:
data = await response.json()
if data.get('message') == 'Not found':
logger.warning(f'Address not found at {url}. Returning empty counter.')
return get_empty_address_counter()
response.raise_for_status()
return await response.json()

current_data: Dict = await response.json()

await update_transaction_counts(chain_name, app_name, address, current_data)

today = datetime.now().date()
yesterday = today - timedelta(days=1)
week_ago = today - timedelta(days=7)
month_ago = today - timedelta(days=30)

transactions_last_day = await get_address_transaction_counts(
chain_name, app_name, address, yesterday, yesterday
)
transactions_last_7_days = await get_address_transaction_counts(
chain_name, app_name, address, week_ago, yesterday
)
transactions_last_30_days = await get_address_transaction_counts(
chain_name, app_name, address, month_ago, yesterday
)

result: AddressCounter = {
'gas_usage_count': str(current_data.get('gas_usage_count', '0')),
'token_transfers_count': str(current_data.get('token_transfers_count', '0')),
'transactions_count': str(current_data.get('transactions_count', '0')),
'validations_count': str(current_data.get('validations_count', '0')),
'transactions_last_day': transactions_last_day,
'transactions_last_7_days': transactions_last_7_days,
'transactions_last_30_days': transactions_last_30_days,
}
logger.info(f'Fetched data for {address} at {url}: {result}')
return result


async def get_address_counters(
session: ClientSession, network: str, chain_name: str, app_name: str, address: str
) -> AddressCounter:
"""Get address counters with retries."""
url = get_address_counters_url(network, chain_name, address)

for attempt in range(API_ERROR_RETRIES):
try:
data = await fetch_address_data(session, url)

today = datetime.now().date()
yesterday = today - timedelta(days=1)
week_ago = today - timedelta(days=7)
month_ago = today - timedelta(days=30)

transactions_last_day = await get_address_transaction_counts(
chain_name, app_name, address, yesterday, yesterday
)
transactions_last_7_days = await get_address_transaction_counts(
chain_name, app_name, address, week_ago, yesterday
)
transactions_last_30_days = await get_address_transaction_counts(
chain_name, app_name, address, month_ago, yesterday
)

data['transactions_last_day'] = transactions_last_day
data['transactions_last_7_days'] = transactions_last_7_days
data['transactions_last_30_days'] = transactions_last_30_days

await update_transaction_counts(chain_name, app_name, address, data)

return data
return await fetch_address_data(session, url, chain_name, app_name, address)
except ClientError as e:
if attempt < API_ERROR_RETRIES - 1:
logger.warning(f'Attempt {attempt + 1} failed for {url}. Retrying... Error: {e}')
Expand All @@ -117,19 +132,22 @@ async def get_address_counters(


async def get_all_address_counters(
session, network, chain_name, app_name, addresses
session: ClientSession, network: str, chain_name: str, app_name: str, addresses: List[str]
) -> AddressCountersMap:
results = [
await get_address_counters(session, network, chain_name, app_name, address)
"""Get counters for multiple addresses concurrently."""
tasks = [
get_address_counters(session, network, chain_name, app_name, address)
for address in addresses
]
return dict(zip(addresses, results))
results = await asyncio.gather(*tasks)
return {AddressType(addr): counter for addr, counter in zip(addresses, results)}


async def fetch_counters_for_app(
session, network_name, chain_name, app_name, app_info
session: ClientSession, network_name: str, chain_name: str, app_name: str, app_info: Dict
) -> Tuple[str, Optional[AddressCountersMap]]:
logger.info(f'fetching counters for app {app_name}')
"""Fetch counters for a specific app."""
logger.info(f'Fetching counters for app {app_name}')
if 'contracts' in app_info:
counters = await get_all_address_counters(
session, network_name, chain_name, app_name, app_info['contracts']
@@ -138,7 +156,10 @@ async def fetch_counters_for_app(
     return app_name, None


-async def fetch_counters_for_apps(session, chain_info, network_name, chain_name):
+async def fetch_counters_for_apps(
+    session: ClientSession, chain_info: Dict, network_name: str, chain_name: str
+) -> List[Tuple[str, Optional[AddressCountersMap]]]:
+    """Fetch counters for all apps in a chain concurrently."""
     tasks = [
         fetch_counters_for_app(session, network_name, chain_name, app_name, app_info)
         for app_name, app_info in chain_info['apps'].items()
@@ -147,6 +168,7 @@ async def fetch_counters_for_apps(session, chain_info, network_name, chain_name)


 async def collect_metrics(network_name: str) -> MetricsData:
+    """Collect all metrics and save to file."""
     async with aiohttp.ClientSession() as session:
         metadata = await download_metadata(session, network_name)
         metrics: Dict[str, ChainMetrics] = {}
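For orientation, a minimal usage sketch of the restructured collector (a sketch only: it assumes the repository's metrics/src package is importable and that published metadata exists for the chosen network; the __main__ wrapper below is illustrative and not part of this commit):

import asyncio

from src.collector import collect_metrics


async def main() -> None:
    # collect_metrics downloads chains.json metadata, fetches per-address
    # counters for every app contract and returns the aggregated MetricsData.
    metrics = await collect_metrics('mainnet')
    print(metrics)


if __name__ == '__main__':
    asyncio.run(main())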
20 changes: 20 additions & 0 deletions metrics/tests/conftest.py
@@ -43,6 +43,7 @@
 TEST_NETWORK = 'testnet'
 TEST_CHAIN = 'chain2'
 TEST_ADDRESS = '0x1234'
+TEST_APP = 'test-app'


 @pytest.fixture
@@ -82,6 +83,25 @@ def latest_day_counters():
     return get_latest_day_counters()


+@pytest.fixture
+def mock_db_data():
+    return {
+        'transactions_last_day': 50,
+        'transactions_last_7_days': 300,
+        'transactions_last_30_days': 1000,
+    }
+
+
+@pytest.fixture
+def mock_address_data():
+    return {
+        'gas_usage_count': '16935',
+        'token_transfers_count': '174',
+        'transactions_count': '1734',
+        'validations_count': '22',
+    }
+
+
 @pytest.fixture
 def mock_chain_stats_data():
     return CHAIN_STATS
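The tests below also rely on client and mock_explorer_url fixtures defined in the unchanged part of conftest.py. A hypothetical minimal version of such a client fixture, using the aiohttp_client fixture from pytest-aiohttp (the route and payload here are assumptions for illustration, not the repository's actual code):

import pytest
from aiohttp import web


@pytest.fixture
async def client(aiohttp_client, mock_address_data):
    # Serve a fake explorer endpoint that answers the counters route
    # with the mocked address data.
    async def counters_handler(request: web.Request) -> web.Response:
        return web.json_response(mock_address_data)

    app = web.Application()
    app.router.add_get('/api/v2/addresses/0x1234/counters', counters_handler)
    return await aiohttp_client(app)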
41 changes: 33 additions & 8 deletions metrics/tests/test_metrics_collector.py
@@ -1,7 +1,8 @@
 import pytest
+from unittest.mock import patch
 from typing import Dict
 from src.collector import get_chain_stats, fetch_address_data
-from conftest import TEST_NETWORK, TEST_CHAIN
+from conftest import TEST_NETWORK, TEST_CHAIN, TEST_ADDRESS, TEST_APP

 pytestmark = pytest.mark.asyncio
 pytest_plugins = ('pytest_asyncio',)
@@ -13,10 +14,34 @@ async def test_get_chain_stats(mock_chain_stats_data: Dict, client, mock_explore
     assert result == mock_chain_stats_data


-async def test_fetch_address_data(client, mock_explorer_url) -> None:
-    result = await fetch_address_data(client, '/api/v2/addresses/0x1234/counters')
-    assert isinstance(result, dict)
-    assert result['gas_usage_count'] == '16935'
-    assert result['token_transfers_count'] == '174'
-    assert result['transactions_count'] == '1734'
-    assert result['validations_count'] == '22'
+async def test_fetch_address_data_success(
+    client, mock_explorer_url, mock_address_data: Dict[str, str], mock_db_data: Dict[str, int]
+) -> None:
+    with (
+        patch('src.collector.update_transaction_counts') as mock_update,
+        patch('src.collector.get_address_transaction_counts') as mock_get_counts,
+    ):
+        mock_get_counts.side_effect = [
+            mock_db_data['transactions_last_day'],
+            mock_db_data['transactions_last_7_days'],
+            mock_db_data['transactions_last_30_days'],
+        ]
+
+        result = await fetch_address_data(
+            client, '/api/v2/addresses/0x1234/counters', TEST_CHAIN, TEST_APP, TEST_ADDRESS
+        )
+
+        # Verify the result type and content
+        assert isinstance(result, dict)
+        assert result['gas_usage_count'] == mock_address_data['gas_usage_count']
+        assert result['token_transfers_count'] == mock_address_data['token_transfers_count']
+        assert result['transactions_count'] == mock_address_data['transactions_count']
+        assert result['validations_count'] == mock_address_data['validations_count']
+
+        # Verify historical data
+        assert result['transactions_last_day'] == mock_db_data['transactions_last_day']
+        assert result['transactions_last_7_days'] == mock_db_data['transactions_last_7_days']
+        assert result['transactions_last_30_days'] == mock_db_data['transactions_last_30_days']
+
+        # Verify database was updated
+        mock_update.assert_called_once()
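A natural companion test for the 404 branch (a sketch, not part of this commit; it assumes the mock explorer answers an unknown address with HTTP 404 and a {'message': 'Not found'} JSON body, so fetch_address_data falls back to the empty counter before touching the database):

async def test_fetch_address_data_not_found(client, mock_explorer_url) -> None:
    # The early-return path never reaches the DB helpers, so no patching is needed.
    result = await fetch_address_data(
        client, '/api/v2/addresses/0xdead/counters', TEST_CHAIN, TEST_APP, '0xdead'
    )
    assert result['gas_usage_count'] == '0'
    assert result['token_transfers_count'] == '0'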
