Add option to disable price collector and improve configuration code #68

Open
wants to merge 5 commits into main
1 change: 1 addition & 0 deletions config-example.json
@@ -4,6 +4,7 @@
"refresh_interval_seconds": 10
},
"price_collector": {
"enable": true,
"refresh_interval_seconds": 10
},
"notifications": {
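
For reference, a config.json that turns the price collector off keeps the same shape as config-example.json, only with the new flag set to false. This is a sketch: the exporter_port value and the omitted notifications block are illustrative and should be copied from config-example.json.

{
    "exporter_port": 8000,
    "rpc_collector": {
        "refresh_interval_seconds": 10
    },
    "price_collector": {
        "enable": false,
        "refresh_interval_seconds": 10
    },
    "notifications": { ... }
}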
65 changes: 21 additions & 44 deletions monitor/__main__.py
@@ -1,5 +1,4 @@
import asyncio
import json
import logging
import sys
from asyncio.queues import Queue
@@ -10,7 +9,9 @@
from chia.util.default_root import DEFAULT_ROOT_PATH
from sqlalchemy.exc import OperationalError

from monitor.collectors import RpcCollector, WsCollector
from monitor.configuration import Configuration, read_config
from monitor.collectors.ws_collector import WsCollector
from monitor.collectors.rpc_collector import RpcCollector
from monitor.collectors.price_collector import PriceCollector
from monitor.database import ChiaEvent, session
from monitor.exporter import ChiaExporter
@@ -20,7 +21,7 @@
chia_config = load_config(DEFAULT_ROOT_PATH, "config.yaml")


def initilize_logging():
def initialize_logging():
handler = colorlog.StreamHandler()
log_date_format = "%Y-%m-%dT%H:%M:%S"
handler.setFormatter(
@@ -40,16 +41,16 @@ def persist_event(event: ChiaEvent):
db_session.commit()


async def aggregator(exporter: ChiaExporter, notifier: Optional[Notifier], rpc_refresh_interval: int,
price_refresh_interval: int) -> None:
async def aggregator(exporter: ChiaExporter, notifier: Optional[Notifier], config: Configuration) -> None:
rpc_collector = None
ws_collector = None
price_collector = None
event_queue = Queue()
logger = ChiaLogger()

try:
logging.info("🔌 Creating RPC Collector...")
rpc_collector = await RpcCollector.create(DEFAULT_ROOT_PATH, chia_config, event_queue, rpc_refresh_interval)
rpc_collector = await RpcCollector.create(DEFAULT_ROOT_PATH, chia_config, event_queue, config.rpc_collector)
except Exception as e:
logging.warning(f"Failed to create RPC collector. Continuing without it. {type(e).__name__}: {e}")

@@ -59,11 +60,13 @@ async def aggregator(exporter: ChiaExporter, notifier: Optional[Notifier], rpc_r
except Exception as e:
logging.warning(f"Failed to create WebSocket collector. Continuing without it. {type(e).__name__}: {e}")

try:
logging.info("🔌 Creating Price Collector...")
price_collector = await PriceCollector.create(DEFAULT_ROOT_PATH, chia_config, event_queue, price_refresh_interval)
except Exception as e:
logging.warning(f"Failed to create Price collector. Continuing without it. {type(e).__name__}: {e}")
if config.price_collector.enable:
try:
logging.info("🔌 Creating Price Collector...")
price_collector = await PriceCollector.create(DEFAULT_ROOT_PATH, chia_config, event_queue,
config.price_collector)
except Exception as e:
logging.warning(f"Failed to create Price collector. Continuing without it. {type(e).__name__}: {e}")

if rpc_collector and ws_collector:
logging.info("🚀 Starting monitoring loop!")
@@ -102,47 +105,21 @@ async def aggregator(exporter: ChiaExporter, notifier: Optional[Notifier], rpc_r
notifier.stop()


def read_config():
with open("config.json") as f:
config = json.load(f)
return config


if __name__ == "__main__":
initilize_logging()
initialize_logging()
try:
config = read_config()
except:
logging.error(
"Failed to read config.json. Please copy the config-example.json to config.json and configure it to your preferences."
)
sys.exit(1)

try:
exporter_port = config["exporter_port"]
rpc_refresh_interval = config["rpc_collector"]["refresh_interval_seconds"]
price_refresh_interval = enable_notifications = config["price_collector"]["refresh_interval_seconds"]
enable_notifications = config["notifications"]["enable"]
notifications_refresh_interval = config["notifications"]["refresh_interval_seconds"]
status_url = config["notifications"]["status_service_url"]
alert_url = config["notifications"]["alert_service_url"]
status_interval_minutes = config["notifications"]["status_interval_minutes"]
lost_plots_alert_threshold = config["notifications"]["lost_plots_alert_threshold"]
disable_proof_found_alert = config["notifications"]["disable_proof_found_alert"]
except KeyError as ex:
logging.error(
f"Failed to validate config. Missing required key {ex}. Please compare the fields of your config.json with the config-example.json and fix all inconsistencies."
)
except RuntimeError as ex:
logging.error(ex)
sys.exit(1)

exporter = ChiaExporter(exporter_port)
if enable_notifications:
notifier = Notifier(status_url, alert_url, status_interval_minutes, lost_plots_alert_threshold,
disable_proof_found_alert, notifications_refresh_interval)
exporter = ChiaExporter(config)
if config.notifier.enable_notifications:
notifier = Notifier(config.notifier)
else:
notifier = None

try:
asyncio.run(aggregator(exporter, notifier, rpc_refresh_interval, price_refresh_interval))
asyncio.run(aggregator(exporter, notifier, config))
except KeyboardInterrupt:
logging.info("👋 Bye!")
15 changes: 11 additions & 4 deletions monitor/collectors/price_collector.py
@@ -3,6 +3,7 @@
import asyncio
import logging
from asyncio.queues import Queue
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict
@@ -16,18 +17,24 @@
PRICE_API = f"https://api.coingecko.com/api/v3/simple/price?ids=chia&vs_currencies={','.join(VS_CURRENCIES)}"


@dataclass(frozen=True)
class PriceCollectorConfiguration:
enable: bool
refresh_interval_seconds: int


class PriceCollector(Collector):
session: aiohttp.ClientSession
refresh_interval_seconds: int
config: PriceCollectorConfiguration

@staticmethod
async def create(_root_path: Path, _net_config: Dict, event_queue: Queue[ChiaEvent],
refresh_interval_seconds: int) -> Collector:
config: PriceCollectorConfiguration) -> Collector:
self = PriceCollector()
self.config = config
self.log = logging.getLogger(__name__)
self.event_queue = event_queue
self.session = aiohttp.ClientSession()
self.refresh_interval_seconds = refresh_interval_seconds
return self

async def get_current_prices(self) -> None:
@@ -49,7 +56,7 @@ async def task(self) -> None:
except Exception as e:
self.log.warning(
f"Error while collecting prices. Trying again... {type(e).__name__}: {e}")
await asyncio.sleep(self.refresh_interval_seconds)
await asyncio.sleep(self.config.refresh_interval_seconds)

async def close(self) -> None:
await self.session.close()
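
Outside of __main__.py, the collector would now be created through the new dataclass rather than a bare integer. A minimal sketch (to be run inside an async function), assuming event_queue and chia_config are set up as in monitor/__main__.py; the interval value is illustrative and not part of this PR:

# Hypothetical wiring: build the config object and hand it to create().
price_config = PriceCollectorConfiguration(enable=True, refresh_interval_seconds=10)
price_collector = await PriceCollector.create(DEFAULT_ROOT_PATH, chia_config, event_queue, price_config)
asyncio.create_task(price_collector.task())  # background polling of the CoinGecko price API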
15 changes: 10 additions & 5 deletions monitor/collectors/rpc_collector.py
@@ -1,10 +1,10 @@
from __future__ import annotations

import asyncio
import json
import logging
import time
from asyncio import Queue
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Callable, Dict, List
@@ -21,6 +21,11 @@
HarvesterPlotsEvent, PoolStateEvent, WalletBalanceEvent)


@dataclass(frozen=True)
class RpcCollectorConfiguration:
refresh_interval_seconds: int


class RpcCollector(Collector):
full_node_client: FullNodeRpcClient
wallet_client: WalletRpcClient
@@ -30,11 +35,11 @@ class RpcCollector(Collector):
net_config: Dict
hostname: str
tasks: List[Callable]
refresh_interval_seconds: int
config: RpcCollectorConfiguration

@staticmethod
async def create(root_path: Path, net_config: Dict, event_queue: Queue[ChiaEvent],
refresh_interval_seconds: int) -> RpcCollector:
config: RpcCollectorConfiguration) -> RpcCollector:
self = RpcCollector()
self.log = logging.getLogger(__name__)
self.event_queue = event_queue
@@ -44,7 +49,7 @@ async def create(root_path: Path, net_config: Dict, event_queue: Queue[ChiaEvent
self.hostname = net_config["self_hostname"]
self.tasks = []
self.harvester_clients = []
self.refresh_interval_seconds = refresh_interval_seconds
self.config = config

try:
full_node_rpc_port = net_config["full_node"]["rpc_port"]
@@ -209,7 +214,7 @@ async def task(self) -> None:
except Exception as e:
self.log.warning(
f"Error while collecting events. Trying again... {type(e).__name__}: {e}")
await asyncio.sleep(self.refresh_interval_seconds)
await asyncio.sleep(self.config.refresh_interval_seconds)

@staticmethod
async def close_rpc_client(rpc_client: RpcClient) -> None:
49 changes: 49 additions & 0 deletions monitor/configuration.py
@@ -0,0 +1,49 @@
import json
from dataclasses import dataclass

from monitor.collectors.rpc_collector import RpcCollectorConfiguration
from monitor.collectors.price_collector import PriceCollectorConfiguration
from monitor.notifier import NotifierConfiguration


@dataclass(frozen=True)
class Configuration:
exporter_port: int
rpc_collector: RpcCollectorConfiguration
price_collector: PriceCollectorConfiguration
notifier: NotifierConfiguration


def read_config() -> Configuration:
try:
with open("config.json") as f:
config_json = json.load(f)
except Exception:
raise RuntimeError("Failed to read config.json. "
"Please copy the config-example.json to config.json and configure it to your preferences.")

try:
config = Configuration(
config_json["exporter_port"],
RpcCollectorConfiguration(
config_json["rpc_collector"]["refresh_interval_seconds"]
),
PriceCollectorConfiguration(
config_json["price_collector"]["enable"],
config_json["price_collector"]["refresh_interval_seconds"]
),
NotifierConfiguration(
config_json["notifications"]["enable"],
config_json["notifications"]["refresh_interval_seconds"],
config_json["notifications"]["status_interval_minutes"],
config_json["notifications"]["lost_plots_alert_threshold"],
config_json["notifications"]["disable_proof_found_alert"],
config_json["notifications"]["status_service_url"],
config_json["notifications"]["alert_service_url"]
)
)
except KeyError as ex:
raise RuntimeError(f"Failed to validate config. Missing required key {ex}. Please compare the fields of your "
f"config.json with the config-example.json and fix all inconsistencies. ")

return config
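
A short usage sketch of the new helper, mirroring how __main__.py consumes it in this PR; the logging calls are illustrative:

import logging
import sys

from monitor.configuration import read_config

try:
    config = read_config()  # raises RuntimeError on a missing file or missing keys
except RuntimeError as ex:
    logging.error(ex)
    sys.exit(1)

# Typed dataclass access replaces the old nested dict lookups.
if config.price_collector.enable:
    logging.info("Price collector enabled, refreshing every %d seconds",
                 config.price_collector.refresh_interval_seconds)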
19 changes: 11 additions & 8 deletions monitor/exporter.py
@@ -3,6 +3,7 @@
from monitor.database.events import (BlockchainStateEvent, ChiaEvent, ConnectionsEvent, FarmingInfoEvent,
HarvesterPlotsEvent, PoolStateEvent, PriceEvent, SignagePointEvent, WalletBalanceEvent)
from monitor.database.queries import get_signage_point_ts
from monitor.configuration import Configuration


class ChiaExporter:
@@ -50,16 +51,18 @@ class ChiaExporter:
num_pool_errors_24h_gauge = Gauge('chia_num_pool_errors_24h', 'Number of pool errors during the last 24 hours',
['p2', 'url'])

# Price metrics
price_usd_cents_gauge = Gauge('chia_price_usd_cent', 'Current Chia price in USD cent')
price_eur_cents_gauge = Gauge('chia_price_eur_cent', 'Current Chia price in EUR cent')
price_btc_satoshi_gauge = Gauge('chia_price_btc_satoshi', 'Current Chia price in BTC satoshi')
price_eth_gwei_gauge = Gauge('chia_price_eth_gwei', 'Current Chia price in ETH gwei')

last_signage_point: SignagePointEvent = None

def __init__(self, port: int) -> None:
start_http_server(port)
def __init_price_metrics(self):
self.price_usd_cents_gauge = Gauge('chia_price_usd_cent', 'Current Chia price in USD cent')
self.price_eur_cents_gauge = Gauge('chia_price_eur_cent', 'Current Chia price in EUR cent')
self.price_btc_satoshi_gauge = Gauge('chia_price_btc_satoshi', 'Current Chia price in BTC satoshi')
self.price_eth_gwei_gauge = Gauge('chia_price_eth_gwei', 'Current Chia price in ETH gwei')

def __init__(self, config: Configuration) -> None:
if config.price_collector.enable:
self.__init_price_metrics()
start_http_server(config.exporter_port)

def process_event(self, event: ChiaEvent) -> None:
if isinstance(event, HarvesterPlotsEvent):
28 changes: 19 additions & 9 deletions monitor/notifier.py
@@ -1,4 +1,5 @@
import logging
from dataclasses import dataclass
from threading import Thread
from time import sleep

@@ -9,27 +10,36 @@
PaymentNotification, SummaryNotification)


@dataclass(frozen=True)
class NotifierConfiguration:
enable_notifications: bool
refresh_interval_seconds: int
status_interval_minutes: int
lost_plots_alert_threshold: int
disable_proof_found_alert: bool
status_url: str
alert_url: str


class Notifier:
asset = AppriseAsset(async_mode=False)
status_apobj = Apprise(asset=asset)
alert_apobj = Apprise(asset=asset)
refresh_interval: int
stopped = False

def __init__(self, status_url: str, alert_url: str, status_interval_minutes: int,
lost_plots_alert_threshold: int, disable_proof_found_alert: bool,
refresh_interval_seconds: int) -> None:
def __init__(self, config: NotifierConfiguration) -> None:
self.log = logging.getLogger(__name__)
self.status_apobj.add(status_url)
self.alert_apobj.add(alert_url)
self.refresh_interval = refresh_interval_seconds
self.status_apobj.add(config.status_url)
self.alert_apobj.add(config.alert_url)
self.refresh_interval = config.refresh_interval_seconds
self.notifications = [
LostSyncNotification(self.alert_apobj),
LostPlotsNotification(self.alert_apobj, lost_plots_alert_threshold),
LostPlotsNotification(self.alert_apobj, config.lost_plots_alert_threshold),
PaymentNotification(self.alert_apobj),
SummaryNotification(self.status_apobj, status_interval_minutes),
SummaryNotification(self.status_apobj, config.status_interval_minutes),
]
if not disable_proof_found_alert:
if not config.disable_proof_found_alert:
self.notifications.append(FoundProofNotification(self.status_apobj))

def task(self) -> None:
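
For illustration, constructing the refactored Notifier directly with the new dataclass could look like the sketch below; the field order follows the dataclass definition above, and the Apprise URLs and numeric values are placeholders rather than values from this repository:

config = NotifierConfiguration(
    enable_notifications=True,
    refresh_interval_seconds=10,
    status_interval_minutes=60,
    lost_plots_alert_threshold=1,
    disable_proof_found_alert=False,
    status_url="mailto://user:password@example.com",  # placeholder Apprise URL
    alert_url="mailto://user:password@example.com",   # placeholder Apprise URL
)
notifier = Notifier(config)  # registers both Apprise targets and builds the notification list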