Skip to content

Commit

Permalink
Merge pull request #7 from KuguHome/http-base64
Browse files Browse the repository at this point in the history
Adding HTTP output and base64 decoding option
  • Loading branch information
blavka authored Sep 11, 2019
2 parents 729c1a8 + c1d07c5 commit aafe6db
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
14 changes: 13 additions & 1 deletion mqtt2influxdb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ def port_range(port):
Optional('certfile'): os.path.exists,
Optional('keyfile'): os.path.exists,
},
Optional('http'): {
'destination': And(str, len),
'action': And(str, len),
Optional('username'): And(str, len),
Optional('password'): And(str, len)
},

'influxdb': {
'host': And(str, len),
'port': And(int, port_range),
Expand All @@ -43,9 +50,14 @@ def port_range(port):
'database': And(str, len),
Optional('ssl'): bool
},
Optional("base64decode"): {
'source': And(str, len, Use(str_or_jsonPath)),
'target': And(str, len)
},
'points': [{
'measurement': And(str, len, Use(str_or_jsonPath)),
'topic': And(str, len),
Optional('httpcontent'): {str: And(str, len, Use(str_or_jsonPath))},
Optional('fields'): Or({str: And(str, len, Use(str_or_jsonPath))}, And(str, len, Use(str_or_jsonPath))),
Optional('tags'): {str: And(str, len, Use(str_or_jsonPath))},
Optional('database'): And(str, len)
Expand All @@ -55,7 +67,7 @@ def port_range(port):

def load_config(config_filename):
with open(config_filename, 'r') as f:
config = yaml.load(f)
config = yaml.load(f, Loader=yaml.FullLoader)
try:
return schema.validate(config)
except SchemaError as e:
Expand Down
25 changes: 25 additions & 0 deletions mqtt2influxdb/mqtt2influxdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@
from paho.mqtt.client import topic_matches_sub
import influxdb
import jsonpath_ng
import requests
import base64
from requests.auth import HTTPBasicAuth
import http.client as http_client


class Mqtt2InfluxDB:

def __init__(self, config):

self._points = config['points']
self._config = config

self._influxdb = influxdb.InfluxDBClient(config['influxdb']['host'],
config['influxdb']['port'],
Expand Down Expand Up @@ -106,6 +111,12 @@ def _on_mqtt_message(self, client, userdata, message):
'time': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),
'tags': {},
'fields': {}}
if 'base64decode' in self._config:
data = self._get_value_from_str_or_JSONPath(self._config['base64decode']["source"], msg)
dataDecoded = base64.b64decode(data)
msg.update({"base64decoded": {self._config['base64decode']["target"]: {"raw": dataDecoded}}})
dataDecoded = dataDecoded.hex()
msg.update({"base64decoded": {self._config['base64decode']["target"]: {"hex": dataDecoded}}})

if 'fields' in point:
if isinstance(point['fields'], jsonpath_ng.JSONPath):
Expand Down Expand Up @@ -138,6 +149,20 @@ def _on_mqtt_message(self, client, userdata, message):

self._influxdb.write_points([record], database=point.get('database', None))

if 'http' in self._config:
http_record = {}
for key in point['httpcontent']:
val = self._get_value_from_str_or_JSONPath(point['httpcontent'][key], msg)
if val is None:
continue
http_record.update({key: val})

action = getattr(requests, self._config['http']['action'], None)
if action:
r = action(url=self._config['http']['destination'], data=http_record, auth=HTTPBasicAuth(self._config['http']['username'], self._config['http']['password']))
else:
print("Invalid HTTP method key!")

def _get_value_from_str_or_JSONPath(self, param, msg):
if isinstance(param, str):
return param
Expand Down

0 comments on commit aafe6db

Please sign in to comment.