Skip to content

Commit

Permalink
Local History Service
Browse files Browse the repository at this point in the history
This new service registers to dbus and collect all the traffic to
files for the specified period.
User can define the number of days (1 file per day) of history and also the dst endpoints to keep.
  • Loading branch information
GwendalRaoul committed Oct 13, 2023
1 parent 9d72b6f commit 55bdf9b
Show file tree
Hide file tree
Showing 3 changed files with 312 additions and 0 deletions.
71 changes: 71 additions & 0 deletions .github/workflows/docker_local_history_service.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
name: build local history service docker

on:
push:
branches:
# temporary for testing purpose, to be removed once merged
- history_service
- master
paths:
- 'docker/gateway_local_history_service/Dockerfile'
- 'local_history_service/**'
- '.github/workflows/local_history_service.yml'

workflow_dispatch:
inputs:
tag:
description: 'Tag to push on docker hub'
required: true

release:
types: [created]

env:
IMAGE_NAME: 'wirepas/gateway_local_history_service'

jobs:
build:
runs-on: ubuntu-latest
steps:
- name: checkout code
uses: actions/checkout@v2

- name: Set up QEMU
uses: docker/setup-qemu-action@v1

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1

- name: Set tag for push
if: github.event_name == 'push'
run: echo "TAG1=$IMAGE_NAME:edge" >> $GITHUB_ENV

- name: Set tag for manually triggered
if: github.event_name == 'workflow_dispatch'
run: echo "TAG1=$IMAGE_NAME:${{ github.event.inputs.tag }}" >> $GITHUB_ENV

- name: Set tag for release version
if: github.event_name == 'release'
run: echo "TAG1=$IMAGE_NAME:${{ github.event.release.tag_name }}" >> $GITHUB_ENV

- name: Set additionnal latest tag also for official release
if: github.event_name == 'release' && !contains(github.event.release.tag_name, 'rc')
run: echo "TAG2=$IMAGE_NAME:latest" >> $GITHUB_ENV

- name: Login to docker hub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}

- name: Build and push
uses: docker/build-push-action@v2
with:
context: .
file: docker/local_history_service/Dockerfile
platforms: linux/amd64, linux/arm64, linux/arm/v7, linux/arm/v6
push: true
build-args: GATEWAY_BUILD_SHA1= ${{ github.sha }}
tags: |
${{ env.TAG1 }}
${{ env.TAG2 }}
59 changes: 59 additions & 0 deletions docker/local_history_service/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Duplicate of transport service for now
FROM python:3.10.8-alpine3.17 AS builder

RUN adduser --disabled-password wirepas

RUN apk add --no-cache \
gcc \
bash \
build-base \
make \
musl-dev \
elogind-dev \
python3-dev \
py3-gobject3 \
cairo-dev \
gobject-introspection-dev

RUN python3 -m pip install wheel setuptools pkgconfig

USER wirepas
WORKDIR /home/wirepas

COPY --chown=wirepas ./python_transport /home/wirepas/python_transport
WORKDIR /home/wirepas/python_transport

RUN ./utils/generate_wheel.sh

USER wirepas

RUN pip3 install dist/wirepas_gateway*.whl --no-deps --user

RUN pip3 install pydbus==0.6.0 PyYAML==6.0.1 --user
RUN pip3 install gobject PyGObject --user


# Build the final image
FROM wirepas/wmm_alpine_cpp:1.2.3 as runner

USER root

# Variable set from CI
ARG GATEWAY_BUILD_SHA1=unset

RUN apk add --no-cache libelogind glib gobject-introspection ndisc6 iproute2 sudo

RUN adduser wirepas wheel

USER wirepas

ENV PATH="/home/wirepas/.local/bin:${PATH}"

# Copy the built wheel and its dependencies from builder
COPY --from=builder /home/wirepas/.local /home/wirepas/.local

COPY ./local_history_service/*.py /home/wirepas/local_history_service/

CMD ["python3", "/home/wirepas/local_history_service/local_history_service.py"]

LABEL com.wirepas.gateway.build.sha1="${GATEWAY_BUILD_SHA1}"
182 changes: 182 additions & 0 deletions local_history_service/local_history_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# Copyright 2023 Wirepas Ltd licensed under Apache License, Version 2.0
#
# See file LICENSE for full license details.

import os
import sys
from datetime import datetime, timedelta
import argparse
import logging
import base64

from wirepas_gateway.dbus.dbus_client import BusClient
from wirepas_gateway import __pkg_name__


class LocalHistoryService(BusClient):
def __init__(self, historical_days=5, file_path="", file_prefix="lhs", endpoints=None) -> None:

super(LocalHistoryService, self).__init__(
ignored_ep_filter=None
)

self.historical_days = historical_days
self.file_path = file_path
self.file_prefix = file_prefix
self.endpoints = endpoints

logging.info("Local history service started for %d days for EPs: %s", historical_days, endpoints)

def on_data_received(
self,
sink_id,
timestamp,
src,
dst,
src_ep,
dst_ep,
travel_time,
qos,
hop_count,
data,
):
if dst_ep not in self.endpoints:
logging.debug("Filtered EPs")
return

# Get current time
now = datetime.now()

# Compute the file name
file_suffix = now.strftime("_%d_%m_%Y")
target_file = os.path.join(self.file_path, self.file_prefix + file_suffix)

logging.info("Packet received to be written to %s", target_file)

# Check if we have created the file
file_created = not os.path.exists(target_file)

with open(target_file, 'a') as cur_file:
cur_file.write("%d;%x;%d;%d;%s\n" % (
now.timestamp(),
src,
src_ep,
dst_ep,
base64.b64encode(data))
)

if file_created:
# File was created, check if we have to remove an older one
logging.info("Check if a file must be deleted")
file_suffix_to_remove = (now - timedelta(days=(self.historical_days + 1))).strftime("_%d_%m_%Y")
file_to_remove = os.path.join(self.file_path, self.file_prefix + file_suffix_to_remove)

try:
logging.debug("Trying to remove file %s", file_to_remove)
os.remove(file_to_remove)
except OSError:
logging.debug("No file to remove")

def str2none(value):
""" Ensures string to bool conversion """
if value == "":
return None
return value

def parse_setting_list(list_setting):
""" This function parse ep list specified from setting file or cmd line
Input list has following format [1, 5, 10-15] as a string or list of string
and is expended as a single list [1, 5, 10, 11, 12, 13, 14, 15]
Args:
list_setting(str or list): the list from setting file or cmd line.
Returns: A single list of ep
"""
if isinstance(list_setting, str):
# List is a string from cmd line
list_setting = list_setting.replace("[", "")
list_setting = list_setting.replace("]", "")
list_setting = list_setting.split(",")

single_list = []
for ep in list_setting:
# Check if ep is directly an int
if isinstance(ep, int):
if ep < 0 or ep > 255:
raise SyntaxError("EP out of bound")
single_list.append(ep)
continue

# Check if ep is a single ep as string
try:
ep = int(ep)
if ep < 0 or ep > 255:
raise SyntaxError("EP out of bound")
single_list.append(ep)
continue
except ValueError:
# Probably a range
pass

# Check if ep is a range
try:
ep = ep.replace("'", "")
lower, upper = ep.split("-")
lower = int(lower)
upper = int(upper)
if lower > upper or lower < 0 or upper > 255:
raise SyntaxError("Wrong EP range value")

single_list += list(range(lower, upper + 1))
except (AttributeError, ValueError):
raise SyntaxError("Wrong EP range format")

if len(single_list) == 0:
single_list = None

return single_list

if __name__ == "__main__":


debug_level = os.environ.get("WM_DEBUG_LEVEL", "info")
# Convert it in upper for logging config
debug_level = "{0}".format(debug_level.upper())

# enable its logger
logging.basicConfig(
format=f'%(asctime)s | [%(levelname)s] {__pkg_name__}@%(filename)s:%(lineno)d:%(message)s',
level=debug_level,
stream=sys.stdout
)

parser = argparse.ArgumentParser(fromfile_prefix_chars='@')

parser.add_argument(
"--historical_days",
default=os.environ.get("WM_LHS_NUM_DAYS", 5),
action="store",
type=int,
help="Number of historical days to store (1 file per day)",
)

parser.add_argument(
"--historical_file_path",
default=os.environ.get("WM_LHS_PATH", ""),
action="store",
type=str,
help="Path to store files",
)

parser.add_argument(
"--endpoints_to_save",
type=str2none,
default=os.environ.get("WM_LHS_ENDPOINTS", None),
help=("Destination endpoints list to keep in history (all if not set)"),
)

args = parser.parse_args()

LocalHistoryService(args.historical_days, args.historical_file_path, endpoints=parse_setting_list(args.endpoints_to_save)).run()

0 comments on commit 55bdf9b

Please sign in to comment.