diff --git a/config-example.json b/config-example.json index 669c1d7..ee17ff9 100644 --- a/config-example.json +++ b/config-example.json @@ -4,6 +4,7 @@ "refresh_interval_seconds": 10 }, "price_collector": { + "enable": true, "refresh_interval_seconds": 10 }, "notifications": { diff --git a/monitor/__main__.py b/monitor/__main__.py index 3bc24f6..29d17bf 100644 --- a/monitor/__main__.py +++ b/monitor/__main__.py @@ -1,5 +1,4 @@ import asyncio -import json import logging import sys from asyncio.queues import Queue @@ -10,7 +9,9 @@ from chia.util.default_root import DEFAULT_ROOT_PATH from sqlalchemy.exc import OperationalError -from monitor.collectors import RpcCollector, WsCollector +from monitor.configuration import Configuration, read_config +from monitor.collectors.ws_collector import WsCollector +from monitor.collectors.rpc_collector import RpcCollector from monitor.collectors.price_collector import PriceCollector from monitor.database import ChiaEvent, session from monitor.exporter import ChiaExporter @@ -20,7 +21,7 @@ chia_config = load_config(DEFAULT_ROOT_PATH, "config.yaml") -def initilize_logging(): +def initialize_logging(): handler = colorlog.StreamHandler() log_date_format = "%Y-%m-%dT%H:%M:%S" handler.setFormatter( @@ -40,16 +41,16 @@ def persist_event(event: ChiaEvent): db_session.commit() -async def aggregator(exporter: ChiaExporter, notifier: Optional[Notifier], rpc_refresh_interval: int, - price_refresh_interval: int) -> None: +async def aggregator(exporter: ChiaExporter, notifier: Optional[Notifier], config: Configuration) -> None: rpc_collector = None ws_collector = None + price_collector = None event_queue = Queue() logger = ChiaLogger() try: logging.info("🔌 Creating RPC Collector...") - rpc_collector = await RpcCollector.create(DEFAULT_ROOT_PATH, chia_config, event_queue, rpc_refresh_interval) + rpc_collector = await RpcCollector.create(DEFAULT_ROOT_PATH, chia_config, event_queue, config.rpc_collector) except Exception as e: logging.warning(f"Failed to create RPC collector. Continuing without it. {type(e).__name__}: {e}") @@ -59,11 +60,13 @@ async def aggregator(exporter: ChiaExporter, notifier: Optional[Notifier], rpc_r except Exception as e: logging.warning(f"Failed to create WebSocket collector. Continuing without it. {type(e).__name__}: {e}") - try: - logging.info("🔌 Creating Price Collector...") - price_collector = await PriceCollector.create(DEFAULT_ROOT_PATH, chia_config, event_queue, price_refresh_interval) - except Exception as e: - logging.warning(f"Failed to create Price collector. Continuing without it. {type(e).__name__}: {e}") + if config.price_collector.enable: + try: + logging.info("🔌 Creating Price Collector...") + price_collector = await PriceCollector.create(DEFAULT_ROOT_PATH, chia_config, event_queue, + config.price_collector) + except Exception as e: + logging.warning(f"Failed to create Price collector. Continuing without it. {type(e).__name__}: {e}") if rpc_collector and ws_collector: logging.info("🚀 Starting monitoring loop!") @@ -102,47 +105,21 @@ async def aggregator(exporter: ChiaExporter, notifier: Optional[Notifier], rpc_r notifier.stop() -def read_config(): - with open("config.json") as f: - config = json.load(f) - return config - - if __name__ == "__main__": - initilize_logging() + initialize_logging() try: config = read_config() - except: - logging.error( - "Failed to read config.json. Please copy the config-example.json to config.json and configure it to your preferences." - ) - sys.exit(1) - - try: - exporter_port = config["exporter_port"] - rpc_refresh_interval = config["rpc_collector"]["refresh_interval_seconds"] - price_refresh_interval = enable_notifications = config["price_collector"]["refresh_interval_seconds"] - enable_notifications = config["notifications"]["enable"] - notifications_refresh_interval = config["notifications"]["refresh_interval_seconds"] - status_url = config["notifications"]["status_service_url"] - alert_url = config["notifications"]["alert_service_url"] - status_interval_minutes = config["notifications"]["status_interval_minutes"] - lost_plots_alert_threshold = config["notifications"]["lost_plots_alert_threshold"] - disable_proof_found_alert = config["notifications"]["disable_proof_found_alert"] - except KeyError as ex: - logging.error( - f"Failed to validate config. Missing required key {ex}. Please compare the fields of your config.json with the config-example.json and fix all inconsistencies." - ) + except RuntimeError as ex: + logging.error(ex) sys.exit(1) - exporter = ChiaExporter(exporter_port) - if enable_notifications: - notifier = Notifier(status_url, alert_url, status_interval_minutes, lost_plots_alert_threshold, - disable_proof_found_alert, notifications_refresh_interval) + exporter = ChiaExporter(config) + if config.notifier.enable_notifications: + notifier = Notifier(config.notifier) else: notifier = None try: - asyncio.run(aggregator(exporter, notifier, rpc_refresh_interval, price_refresh_interval)) + asyncio.run(aggregator(exporter, notifier, config)) except KeyboardInterrupt: logging.info("👋 Bye!") diff --git a/monitor/collectors/price_collector.py b/monitor/collectors/price_collector.py index 4b4f37d..f36c2a0 100644 --- a/monitor/collectors/price_collector.py +++ b/monitor/collectors/price_collector.py @@ -3,6 +3,7 @@ import asyncio import logging from asyncio.queues import Queue +from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Dict @@ -16,18 +17,24 @@ PRICE_API = f"https://api.coingecko.com/api/v3/simple/price?ids=chia&vs_currencies={','.join(VS_CURRENCIES)}" +@dataclass(frozen=True) +class PriceCollectorConfiguration: + enable: bool + refresh_interval_seconds: int + + class PriceCollector(Collector): session: aiohttp.ClientSession - refresh_interval_seconds: int + config: PriceCollectorConfiguration @staticmethod async def create(_root_path: Path, _net_config: Dict, event_queue: Queue[ChiaEvent], - refresh_interval_seconds: int) -> Collector: + config: PriceCollectorConfiguration) -> Collector: self = PriceCollector() + self.config = config self.log = logging.getLogger(__name__) self.event_queue = event_queue self.session = aiohttp.ClientSession() - self.refresh_interval_seconds = refresh_interval_seconds return self async def get_current_prices(self) -> None: @@ -49,7 +56,7 @@ async def task(self) -> None: except Exception as e: self.log.warning( f"Error while collecting prices. Trying again... {type(e).__name__}: {e}") - await asyncio.sleep(self.refresh_interval_seconds) + await asyncio.sleep(self.config.refresh_interval_seconds) async def close(self) -> None: await self.session.close() diff --git a/monitor/collectors/rpc_collector.py b/monitor/collectors/rpc_collector.py index 877dda5..c68a0f9 100644 --- a/monitor/collectors/rpc_collector.py +++ b/monitor/collectors/rpc_collector.py @@ -1,10 +1,10 @@ from __future__ import annotations import asyncio -import json import logging import time from asyncio import Queue +from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Callable, Dict, List @@ -21,6 +21,11 @@ HarvesterPlotsEvent, PoolStateEvent, WalletBalanceEvent) +@dataclass(frozen=True) +class RpcCollectorConfiguration: + refresh_interval_seconds: int + + class RpcCollector(Collector): full_node_client: FullNodeRpcClient wallet_client: WalletRpcClient @@ -30,11 +35,11 @@ class RpcCollector(Collector): net_config: Dict hostname: str tasks: List[Callable] - refresh_interval_seconds: int + config: RpcCollectorConfiguration @staticmethod async def create(root_path: Path, net_config: Dict, event_queue: Queue[ChiaEvent], - refresh_interval_seconds: int) -> RpcCollector: + config: RpcCollectorConfiguration) -> RpcCollector: self = RpcCollector() self.log = logging.getLogger(__name__) self.event_queue = event_queue @@ -44,7 +49,7 @@ async def create(root_path: Path, net_config: Dict, event_queue: Queue[ChiaEvent self.hostname = net_config["self_hostname"] self.tasks = [] self.harvester_clients = [] - self.refresh_interval_seconds = refresh_interval_seconds + self.config = config try: full_node_rpc_port = net_config["full_node"]["rpc_port"] @@ -209,7 +214,7 @@ async def task(self) -> None: except Exception as e: self.log.warning( f"Error while collecting events. Trying again... {type(e).__name__}: {e}") - await asyncio.sleep(self.refresh_interval_seconds) + await asyncio.sleep(self.config.refresh_interval_seconds) @staticmethod async def close_rpc_client(rpc_client: RpcClient) -> None: diff --git a/monitor/configuration.py b/monitor/configuration.py new file mode 100644 index 0000000..366d5ac --- /dev/null +++ b/monitor/configuration.py @@ -0,0 +1,49 @@ +import json +from dataclasses import dataclass + +from monitor.collectors.rpc_collector import RpcCollectorConfiguration +from monitor.collectors.price_collector import PriceCollectorConfiguration +from monitor.notifier import NotifierConfiguration + + +@dataclass(frozen=True) +class Configuration: + exporter_port: int + rpc_collector: RpcCollectorConfiguration + price_collector: PriceCollectorConfiguration + notifier: NotifierConfiguration + + +def read_config() -> Configuration: + try: + with open("config.json") as f: + config_json = json.load(f) + except Exception: + raise RuntimeError("Failed to read config.json. " + "Please copy the config-example.json to config.json and configure it to your preferences.") + + try: + config = Configuration( + config_json["exporter_port"], + RpcCollectorConfiguration( + config_json["rpc_collector"]["refresh_interval_seconds"] + ), + PriceCollectorConfiguration( + config_json["price_collector"]["enable"], + config_json["price_collector"]["refresh_interval_seconds"] + ), + NotifierConfiguration( + config_json["notifications"]["enable"], + config_json["notifications"]["refresh_interval_seconds"], + config_json["notifications"]["status_interval_minutes"], + config_json["notifications"]["lost_plots_alert_threshold"], + config_json["notifications"]["disable_proof_found_alert"], + config_json["notifications"]["status_service_url"], + config_json["notifications"]["alert_service_url"] + ) + ) + except KeyError as ex: + raise RuntimeError(f"Failed to validate config. Missing required key {ex}. Please compare the fields of your " + f"config.json with the config-example.json and fix all inconsistencies. ") + + return config diff --git a/monitor/exporter.py b/monitor/exporter.py index 6ac47b8..41aaacb 100644 --- a/monitor/exporter.py +++ b/monitor/exporter.py @@ -3,6 +3,7 @@ from monitor.database.events import (BlockchainStateEvent, ChiaEvent, ConnectionsEvent, FarmingInfoEvent, HarvesterPlotsEvent, PoolStateEvent, PriceEvent, SignagePointEvent, WalletBalanceEvent) from monitor.database.queries import get_signage_point_ts +from monitor.configuration import Configuration class ChiaExporter: @@ -50,16 +51,18 @@ class ChiaExporter: num_pool_errors_24h_gauge = Gauge('chia_num_pool_errors_24h', 'Number of pool errors during the last 24 hours', ['p2', 'url']) - # Price metrics - price_usd_cents_gauge = Gauge('chia_price_usd_cent', 'Current Chia price in USD cent') - price_eur_cents_gauge = Gauge('chia_price_eur_cent', 'Current Chia price in EUR cent') - price_btc_satoshi_gauge = Gauge('chia_price_btc_satoshi', 'Current Chia price in BTC satoshi') - price_eth_gwei_gauge = Gauge('chia_price_eth_gwei', 'Current Chia price in ETH gwei') - last_signage_point: SignagePointEvent = None - def __init__(self, port: int) -> None: - start_http_server(port) + def __init_price_metrics(self): + self.price_usd_cents_gauge = Gauge('chia_price_usd_cent', 'Current Chia price in USD cent') + self.price_eur_cents_gauge = Gauge('chia_price_eur_cent', 'Current Chia price in EUR cent') + self.price_btc_satoshi_gauge = Gauge('chia_price_btc_satoshi', 'Current Chia price in BTC satoshi') + self.price_eth_gwei_gauge = Gauge('chia_price_eth_gwei', 'Current Chia price in ETH gwei') + + def __init__(self, config: Configuration) -> None: + if config.price_collector.enable: + self.__init_price_metrics() + start_http_server(config.exporter_port) def process_event(self, event: ChiaEvent) -> None: if isinstance(event, HarvesterPlotsEvent): diff --git a/monitor/notifier.py b/monitor/notifier.py index 5bb668c..3544219 100644 --- a/monitor/notifier.py +++ b/monitor/notifier.py @@ -1,4 +1,5 @@ import logging +from dataclasses import dataclass from threading import Thread from time import sleep @@ -9,6 +10,17 @@ PaymentNotification, SummaryNotification) +@dataclass(frozen=True) +class NotifierConfiguration: + enable_notifications: bool + refresh_interval_seconds: int + status_interval_minutes: int + lost_plots_alert_threshold: int + disable_proof_found_alert: bool + status_url: str + alert_url: str + + class Notifier: asset = AppriseAsset(async_mode=False) status_apobj = Apprise(asset=asset) @@ -16,20 +28,18 @@ class Notifier: refresh_interval: int stopped = False - def __init__(self, status_url: str, alert_url: str, status_interval_minutes: int, - lost_plots_alert_threshold: int, disable_proof_found_alert: bool, - refresh_interval_seconds: int) -> None: + def __init__(self, config: NotifierConfiguration) -> None: self.log = logging.getLogger(__name__) - self.status_apobj.add(status_url) - self.alert_apobj.add(alert_url) - self.refresh_interval = refresh_interval_seconds + self.status_apobj.add(config.status_url) + self.alert_apobj.add(config.alert_url) + self.refresh_interval = config.refresh_interval_seconds self.notifications = [ LostSyncNotification(self.alert_apobj), - LostPlotsNotification(self.alert_apobj, lost_plots_alert_threshold), + LostPlotsNotification(self.alert_apobj, config.lost_plots_alert_threshold), PaymentNotification(self.alert_apobj), - SummaryNotification(self.status_apobj, status_interval_minutes), + SummaryNotification(self.status_apobj, config.status_interval_minutes), ] - if not disable_proof_found_alert: + if not config.disable_proof_found_alert: self.notifications.append(FoundProofNotification(self.status_apobj)) def task(self) -> None: