This repository has been archived by the owner on Sep 26, 2022. It is now read-only.

System monitor #148

Status: Open. Wants to merge 19 commits into base: master.
4 changes: 3 additions & 1 deletion .gitignore
@@ -1,6 +1,6 @@
.vscode/
.idea/
.venv/
.venv*
*.log
.DS_Store
bitcoin.conf*
@@ -18,3 +18,5 @@ htmlcov
docs/
.teos
.teos_cli
*.orig
/monitor/monitor.conf
34 changes: 34 additions & 0 deletions monitor/README.md
@@ -0,0 +1,34 @@
This is a system monitor for viewing available user slots, appointments, and other data related to Teos. Data is loaded and searched with Elasticsearch and visualized with Kibana to produce something like this:
Member: The README would need some work, but I'll leave the nits for the final version.


![Dashboard example](https://ibb.co/ypBtfdM)

### Prerequisites

You need to already be running a bitcoin node and a Teos watchtower (see https://github.com/talaia-labs/python-teos).

### Installation

Install and start Elasticsearch and Kibana; both need to be running for this visualization tool to work.

- https://www.elastic.co/guide/en/elasticsearch/reference/current/install-elasticsearch.html
- https://www.elastic.co/guide/en/kibana/current/install.html

### Dependencies

Install the dependencies by running:

```pip install -r requirements.txt```

### Config

You also need to create a config file in this directory; `sample-monitor.conf` provides an example.
orbitalturtle marked this conversation as resolved.

Create a file named `monitor.conf` in this directory with the correct configuration values, including the host and port where Elasticsearch and Kibana are running (on localhost or on another host).
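For reference, a minimal `monitor.conf` might look like the following. The option names come from the monitor's defaults; the exact file layout (including the section name) is an assumption here, so defer to `sample-monitor.conf` for the authoritative format:

```ini
[monitor]
ES_HOST = localhost
ES_PORT = 9200
KIBANA_HOST = localhost
KIBANA_PORT = 5601
API_BIND = localhost
API_PORT = 9814
```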

### Run it

Follow the same instructions as shown here for running the module: https://github.com/talaia-labs/python-teos/blob/master/INSTALL.md

In short, run it with:

```python3 -m monitor.monitor_start```
13 changes: 13 additions & 0 deletions monitor/__init__.py
@@ -0,0 +1,13 @@
import os

MONITOR_DIR = os.path.expanduser("~/.teos_monitor/")
MONITOR_CONF = "monitor.conf"

MONITOR_DEFAULT_CONF = {
"ES_HOST": {"value": "localhost", "type": str},
"ES_PORT": {"value": 9200, "type": int},
"KIBANA_HOST": {"value": "localhost", "type": str},
"KIBANA_PORT": {"value": 5601, "type": int},
"API_BIND": {"value": "localhost", "type": str},
"API_PORT": {"value": 9814, "type": int},
}
orbitalturtle marked this conversation as resolved.
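As a sketch of how defaults like these can be combined with values read from `monitor.conf`, each entry's `type` is used to cast the raw (string) value from the file. The `build_conf` helper below is hypothetical, not part of this PR:

```python
MONITOR_DEFAULT_CONF = {
    "ES_HOST": {"value": "localhost", "type": str},
    "ES_PORT": {"value": 9200, "type": int},
}


def build_conf(defaults, overrides):
    # Start from each default value, overlay any value found in the config
    # file, and cast the result to the declared type.
    conf = {}
    for key, spec in defaults.items():
        raw = overrides.get(key, spec["value"])
        conf[key] = spec["type"](raw)
    return conf


# Values read from a config file arrive as strings and are cast to int here.
conf = build_conf(MONITOR_DEFAULT_CONF, {"ES_PORT": "9300"})
```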
264 changes: 264 additions & 0 deletions monitor/data_loader.py
@@ -0,0 +1,264 @@
import json
import time

from elasticsearch import Elasticsearch, helpers
from elasticsearch.client import IndicesClient

from cli import teos_cli
from common.logger import Logger

LOG_PREFIX = "System Monitor"
logger = Logger(actor="Data loader", log_name_prefix=LOG_PREFIX)


class DataLoader:
"""
The :class:`DataLoader` is in charge of the monitor's Elasticsearch functionality for loading and searching through data.

Args:
es_host (:obj:`str`): The host Elasticsearch is listening on.
Member: Aren't ES host and port only used to create the Elasticsearch object? If so they don't need to be stored.

es_port (:obj:`int`): The port Elasticsearch is listening on.
api_host (:obj:`str`): The host Teos is listening on.
api_port (:obj:`int`): The port Teos is listening on.
log_file (:obj:`str`): The path to the log file ES will pull data from.

Attributes:
es_host (:obj:`str`): The host Elasticsearch is running on.
es_port (:obj:`int`): The port Elasticsearch is running on.
cloud_id (:obj:`str`): Elasticsearch cloud id, if Elasticsearch Cloud is being used.
Member: Can be removed.

es (:obj:`Elasticsearch <elasticsearch.Elasticsearch>`): The Elasticsearch client used to search the data to be visualized.
index_client (:obj:`IndicesClient <elasticsearch.client.IndicesClient>`): The index client where log data is stored.
log_path (:obj:`str`): The path to the log file that data will be pulled from and analyzed by ES.
api_host (:obj:`str`): The host Teos is listening on.
api_port (:obj:`int`): The port Teos is listening on.

"""

def __init__(self, es_host, es_port, api_host, api_port, log_file):
self.es_host = es_host
self.es_port = es_port
self.es = Elasticsearch([
{'host': self.es_host, 'port': self.es_port}
])
self.index_client = IndicesClient(self.es)
self.log_path = log_file
self.api_host = api_host
self.api_port = api_port

def start(self):
"""Loads data to be visualized in Kibana"""

if self.index_client.exists("logs"):
self.delete_index("logs")

# Pull the watchtower logs into Elasticsearch.
self.create_index("logs")
log_data = self.load_logs(self.log_path)
self.index_data_bulk("logs", log_data)

# Grab the other data we need to visualize a graph.
self.load_and_index_other_data()

def create_index(self, index):
"""
Create index with a particular mapping.

Args:
index (:obj:`str`): Index the mapping is in.

"""

body = {
"mappings": {
"properties": {
"doc.time": {
"type": "date",
"format": "epoch_second||strict_date_optional_time||dd/MM/yyyy HH:mm:ss"
},
"doc.error.code": {
"type": "integer"
},
"doc.watcher_appts": {
"type": "integer"
},
"doc.responder_appts": {
"type": "integer"
}
}
}
}

resp = self.index_client.create(index, body)
Member: resp is not being used. If there's no return nor checks to be done, you may just call the method.
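For reference, a document matching the mapping above might look like this standalone sketch (the field values are illustrative):

```python
import time

# Illustrative document shape for the "logs" index mapping defined above.
sample_doc = {
    "doc.time": int(time.time()),  # epoch_second, one of the accepted date formats
    "doc.error.code": 0,
    "doc.watcher_appts": 12,
    "doc.responder_appts": 3,
}
```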


# TODO: Logs are constantly being updated. Keep that data updated
def load_logs(self, log_path):
Member: This can be static.

"""
Reads teos log into a list.

Args:
log_path (:obj:`str`): The path to the log file.

Returns:
:obj:`list`: A list of logs in dict form.

Raises:
FileNotFoundError: If path doesn't correspond to an existing log file.

"""

# Load the initial log file.
logs = []
with open(log_path, "r") as log_file:
for log in log_file:
log_data = json.loads(log.strip())
logs.append(log_data)

return logs

# TODO: Throw an error if the file is empty or if data isn't JSON-y.
Member: This would also raise a FileNotFoundError if the log file is not found, but you're not handling exceptions in the main method. Not a big deal since we should move from this approach to an online approach, but keep in mind adding a general try/catch if you're bubbling exceptions up.
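`load_logs` expects the teos log to be one JSON object per line. A standalone sketch of that parsing, with made-up log lines, and skipping blank lines rather than raising on them:

```python
import json


def parse_log_lines(lines):
    # Each line is expected to be a standalone JSON object, as in load_logs.
    logs = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines instead of failing on them
        logs.append(json.loads(line))
    return logs


sample_lines = [
    '{"time": "20/05/2020 12:00:00", "message": "Watcher started"}',
    "",
    '{"time": "20/05/2020 12:00:05", "message": "Appointment accepted"}',
]
logs = parse_log_lines(sample_lines)
```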


def load_and_index_other_data(self):
"""
Loads and indexes the rest of the data into Elasticsearch that we'll need to visualize using Kibana.

"""

# Grab # of appointments in watcher and responder
num_appts = self.get_num_appointments()
watcher_appts = num_appts[0]
responder_appts = num_appts[1]

# self.es.search for the watcher_appts doc... if it exists, then update the item.
Member: Is this an old line / comment?


# index current number of appointments in watcher and responder
self.index_item("logs", "watcher_appts", watcher_appts)
self.index_item("logs", "responder_appts", responder_appts)

def index_item(self, index, field, value):
"""
Indexes logs in elasticsearch so they can be searched.

Args:
index (:obj:`str`): The index to which we want to load data.
field (:obj:`str`): The field of the data to be loaded.
value (:obj:`str`): The value of the data to be loaded.

"""

body = {
"doc.{}".format(field): value,
"doc.time": time.time()
}

resp = self.es.index(index, body)
Member: Same, resp not being used.


@staticmethod
def gen_data(index, data):
"""
Formats logs so they can be sent to Elasticsearch in bulk.

Args:
log_data (:obj:`list`): A list of logs in dict form.
Member: The args do not match.


Yields:
:obj:`dict`: A dict conforming to the required format for sending data to elasticsearch in bulk.
"""

for log in data:
yield {
"_index": index,
"doc": log
}

def index_data_bulk(self, index, data):
"""
Indexes logs in elasticsearch so they can be searched.

Args:
index (:obj:`str`): The index to which we want to load data.
data (:obj:`list`): A list of data in dict form.

Returns:
response (:obj:`tuple`): The first element is the number of logs successfully indexed; if there are errors, the second element contains them.

Raises:
elasticsearch.helpers.errors.BulkIndexError: Raised if indexing log data fails.

"""

response = helpers.bulk(self.es, self.gen_data(index, data))

# The response is a tuple of two items: 1) The number of items successfully indexed. 2) Any errors returned.
if response[0] <= 0:
logger.error("None of the logs were indexed. Log data might be in the wrong form.")

return response
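The actions that `gen_data` yields for `helpers.bulk` have this shape, shown here as a standalone sketch with made-up log entries:

```python
def gen_data(index, data):
    # Wrap each log dict in the minimal action format accepted by
    # elasticsearch.helpers.bulk: a target index plus the document body.
    for log in data:
        yield {"_index": index, "doc": log}


actions = list(gen_data("logs", [{"message": "started"}, {"message": "stopped"}]))
```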

def get_num_appointments(self):
"""
Gets number of appointments the tower is storing in the watcher and responder, so we can load this data into Elasticsearch.

Returns:
:obj:`list`: A list where the first element is the number of watcher appointments and the second is the number of responder appointments.
"""

teos_url = "http://{}:{}".format(self.api_host, self.api_port)

resp = teos_cli.get_all_appointments(teos_url)

response = json.loads(resp)

watcher_appts = len(response.get("watcher_appointments"))
responder_appts = len(response.get("responder_trackers"))

return [watcher_appts, responder_appts]
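A standalone sketch of the counting step above, using a made-up `get_all_appointments` response body whose two top-level keys match what this method reads:

```python
import json

# Hypothetical response body; the two top-level keys are the ones
# get_num_appointments reads from the teos API.
resp = json.dumps({
    "watcher_appointments": {"uuid1": {}, "uuid2": {}},
    "responder_trackers": {"uuid3": {}},
})

response = json.loads(resp)
counts = [len(response["watcher_appointments"]), len(response["responder_trackers"])]
```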

def delete_index(self, index):
"""
Deletes the chosen index of Elasticsearch.

Args:
index (:obj:`str`): The ES index to delete.
"""

self.index_client.delete(index)

# For testing purposes...
def search_logs(self, field, keyword, index):
Member: This does not seem to be used anymore. If the only purpose is testing, maybe it could be part of a local temp file or the testing suite.

"""
Searches Elasticsearch for data with a certain field and keyword.

Args:
field (:obj:`str`): The search field.
keyword (:obj:`str`): The search keyword.
index (:obj:`str`): The index in Elasticsearch to search through.

Returns:
:obj:`dict`: A dict describing the results, including the first 10 docs matching the search words.
"""

body = {
"query": {"match": {"doc.{}".format(field): keyword}}
}
results = self.es.search(index=index, body=body)

return results


def get_all_logs(self):
Member: This method does not seem to be used anywhere.

"""
Retrieves all logs in the logs index of Elasticsearch.

Returns:
:obj:`dict`: A dict describing the results, including the first 10 docs.
"""

body = {
"query": { "match_all": {} }
}
results = self.es.search(index="logs", body=body)

results = json.dumps(results, indent=4)

return results
