-
-
Notifications
You must be signed in to change notification settings - Fork 117
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[monitoring] Adding influxDB 2.x version support #274
Fixes #274
- Loading branch information
1 parent
0366a4b
commit 2e3043c
Showing
11 changed files
with
954 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,25 +1,38 @@ | ||
FROM python:3.9.19-slim-bullseye | ||
|
||
# Install system dependencies | ||
RUN apt update && \ | ||
apt install --yes zlib1g-dev libjpeg-dev gdal-bin libproj-dev \ | ||
libgeos-dev libspatialite-dev libsqlite3-mod-spatialite \ | ||
sqlite3 libsqlite3-dev openssl libssl-dev fping && \ | ||
rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/* | ||
|
||
# Upgrade pip and install Python dependencies | ||
RUN pip install -U pip setuptools wheel | ||
|
||
# Copy and install project dependencies | ||
COPY requirements-test.txt requirements.txt /opt/openwisp/ | ||
RUN pip install -r /opt/openwisp/requirements.txt && \ | ||
pip install -r /opt/openwisp/requirements-test.txt && \ | ||
rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/* | ||
|
||
# Copy project files and install the project | ||
ADD . /opt/openwisp | ||
RUN pip install -U /opt/openwisp && \ | ||
rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/* | ||
|
||
# Set working directory | ||
WORKDIR /opt/openwisp/tests/ | ||
|
||
# Set environment variables | ||
ENV NAME=openwisp-monitoring \ | ||
PYTHONBUFFERED=1 \ | ||
INFLUXDB_HOST=influxdb \ | ||
INFLUXDB1_HOST=influxdb \ | ||
INFLUXDB2_HOST=influxdb2 \ | ||
REDIS_HOST=redis | ||
CMD ["sh", "docker-entrypoint.sh"] | ||
|
||
# Expose the application port | ||
EXPOSE 8000 | ||
|
||
# Command to run the application | ||
CMD ["sh", "docker-entrypoint.sh"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
import logging | ||
|
||
from django.utils.functional import cached_property | ||
|
||
from openwisp_monitoring.utils import retry | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class BaseDatabaseClient: | ||
def __init__(self, db_name=None): | ||
self._db = None | ||
self.db_name = db_name | ||
|
||
@cached_property | ||
def db(self): | ||
raise NotImplementedError("Subclasses must implement `db` method") | ||
|
||
@retry | ||
def create_database(self): | ||
raise NotImplementedError("Subclasses must implement `create_database` method") | ||
|
||
@retry | ||
def drop_database(self): | ||
raise NotImplementedError("Subclasses must implement `drop_database` method") | ||
|
||
@retry | ||
def query(self, query): | ||
raise NotImplementedError("Subclasses must implement `query` method") | ||
|
||
def write(self, name, values, **kwargs): | ||
raise NotImplementedError("Subclasses must implement `write` method") | ||
|
||
def get_list_retention_policies(self, name=None): | ||
raise NotImplementedError( | ||
"Subclasses must implement `get_list_retention_policies` method" | ||
) | ||
|
||
def create_or_alter_retention_policy(self, name, duration): | ||
raise NotImplementedError( | ||
"Subclasses must implement `create_or_alter_retention_policy` method" | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
import logging | ||
|
||
from django.utils.functional import cached_property | ||
from influxdb_client import InfluxDBClient, Point | ||
from influxdb_client.client.exceptions import InfluxDBError | ||
from influxdb_client.client.write_api import SYNCHRONOUS | ||
|
||
from openwisp_monitoring.utils import retry | ||
|
||
from ...exceptions import TimeseriesWriteException | ||
from .. import TIMESERIES_DB | ||
from ..base import BaseDatabaseClient | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class DatabaseClient(BaseDatabaseClient): | ||
backend_name = 'influxdb2' | ||
|
||
def __init__(self, db_name=None): | ||
super().__init__(db_name) | ||
self.client_error = InfluxDBError | ||
|
||
@cached_property | ||
def db(self): | ||
return InfluxDBClient( | ||
url=f"http://{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}", | ||
token=TIMESERIES_DB['TOKEN'], | ||
org=TIMESERIES_DB['ORG'], | ||
bucket=self.db_name, | ||
) | ||
|
||
@retry | ||
def create_database(self): | ||
self.write_api = self.db.write_api(write_options=SYNCHRONOUS) | ||
self.query_api = self.db.query_api() | ||
logger.debug('Initialized APIs for InfluxDB 2.0') | ||
|
||
@retry | ||
def drop_database(self): | ||
pass # Implement as needed for InfluxDB 2.0 | ||
|
||
@retry | ||
def query(self, query): | ||
return self.query_api.query(query) | ||
|
||
def write(self, name, values, **kwargs): | ||
point = Point(name).time(self._get_timestamp(kwargs.get('timestamp'))) | ||
tags = kwargs.get('tags', {}) | ||
for tag, value in tags.items(): | ||
point.tag(tag, value) | ||
for field, value in values.items(): | ||
point.field(field, value) | ||
try: | ||
self.write_api.write(bucket=self.db_name, record=point) | ||
except InfluxDBError as e: | ||
raise TimeseriesWriteException(str(e)) | ||
|
||
@retry | ||
def get_list_retention_policies(self, name=None): | ||
bucket = self.db.buckets_api().find_bucket_by_name(name) | ||
if bucket: | ||
return bucket.retention_rules | ||
return [] | ||
|
||
@retry | ||
def create_or_alter_retention_policy(self, name, duration): | ||
bucket = self.db.buckets_api().find_bucket_by_name(name) | ||
retention_rules = [{"type": "expire", "everySeconds": duration}] | ||
if bucket: | ||
bucket.retention_rules = retention_rules | ||
self.db.buckets_api().update_bucket(bucket=bucket) | ||
else: | ||
self.db.buckets_api().create_bucket( | ||
bucket_name=name, | ||
retention_rules=retention_rules, | ||
org=TIMESERIES_DB["ORG"], | ||
) |
Oops, something went wrong.