From 55bdf9b87a112e2c12dd1577b0914fdf21d7f85b Mon Sep 17 00:00:00 2001 From: Gwendal Raoul Date: Mon, 20 Mar 2023 13:05:59 +0100 Subject: [PATCH] Local History Service This new service registers to dbus and collect all the traffic to files for the specified period. User can define the number of days (1 file per day) of history and also the dst endpoints to keep. --- .../docker_local_history_service.yml | 71 +++++++ docker/local_history_service/Dockerfile | 59 ++++++ .../local_history_service.py | 182 ++++++++++++++++++ 3 files changed, 312 insertions(+) create mode 100644 .github/workflows/docker_local_history_service.yml create mode 100644 docker/local_history_service/Dockerfile create mode 100644 local_history_service/local_history_service.py diff --git a/.github/workflows/docker_local_history_service.yml b/.github/workflows/docker_local_history_service.yml new file mode 100644 index 00000000..902e998a --- /dev/null +++ b/.github/workflows/docker_local_history_service.yml @@ -0,0 +1,71 @@ +name: build local history service docker + +on: + push: + branches: + # temporary for testing purpose, to be removed once merged + - history_service + - master + paths: + - 'docker/gateway_local_history_service/Dockerfile' + - 'local_history_service/**' + - '.github/workflows/local_history_service.yml' + + workflow_dispatch: + inputs: + tag: + description: 'Tag to push on docker hub' + required: true + + release: + types: [created] + +env: + IMAGE_NAME: 'wirepas/gateway_local_history_service' + +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: checkout code + uses: actions/checkout@v2 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v1 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v1 + + - name: Set tag for push + if: github.event_name == 'push' + run: echo "TAG1=$IMAGE_NAME:edge" >> $GITHUB_ENV + + - name: Set tag for manually triggered + if: github.event_name == 'workflow_dispatch' + run: echo "TAG1=$IMAGE_NAME:${{ github.event.inputs.tag }}" >> $GITHUB_ENV + + - name: Set tag for release version + if: github.event_name == 'release' + run: echo "TAG1=$IMAGE_NAME:${{ github.event.release.tag_name }}" >> $GITHUB_ENV + + - name: Set additionnal latest tag also for official release + if: github.event_name == 'release' && !contains(github.event.release.tag_name, 'rc') + run: echo "TAG2=$IMAGE_NAME:latest" >> $GITHUB_ENV + + - name: Login to docker hub + uses: docker/login-action@v1 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + + - name: Build and push + uses: docker/build-push-action@v2 + with: + context: . + file: docker/local_history_service/Dockerfile + platforms: linux/amd64, linux/arm64, linux/arm/v7, linux/arm/v6 + push: true + build-args: GATEWAY_BUILD_SHA1= ${{ github.sha }} + tags: | + ${{ env.TAG1 }} + ${{ env.TAG2 }} diff --git a/docker/local_history_service/Dockerfile b/docker/local_history_service/Dockerfile new file mode 100644 index 00000000..6d442a16 --- /dev/null +++ b/docker/local_history_service/Dockerfile @@ -0,0 +1,59 @@ +# Duplicate of transport service for now +FROM python:3.10.8-alpine3.17 AS builder + +RUN adduser --disabled-password wirepas + +RUN apk add --no-cache \ + gcc \ + bash \ + build-base \ + make \ + musl-dev \ + elogind-dev \ + python3-dev \ + py3-gobject3 \ + cairo-dev \ + gobject-introspection-dev + +RUN python3 -m pip install wheel setuptools pkgconfig + +USER wirepas +WORKDIR /home/wirepas + +COPY --chown=wirepas ./python_transport /home/wirepas/python_transport +WORKDIR /home/wirepas/python_transport + +RUN ./utils/generate_wheel.sh + +USER wirepas + +RUN pip3 install dist/wirepas_gateway*.whl --no-deps --user + +RUN pip3 install pydbus==0.6.0 PyYAML==6.0.1 --user +RUN pip3 install gobject PyGObject --user + + +# Build the final image +FROM wirepas/wmm_alpine_cpp:1.2.3 as runner + +USER root + +# Variable set from CI +ARG GATEWAY_BUILD_SHA1=unset + +RUN apk add --no-cache libelogind glib gobject-introspection ndisc6 iproute2 sudo + +RUN adduser wirepas wheel + +USER wirepas + +ENV PATH="/home/wirepas/.local/bin:${PATH}" + +# Copy the built wheel and its dependencies from builder +COPY --from=builder /home/wirepas/.local /home/wirepas/.local + +COPY ./local_history_service/*.py /home/wirepas/local_history_service/ + +CMD ["python3", "/home/wirepas/local_history_service/local_history_service.py"] + +LABEL com.wirepas.gateway.build.sha1="${GATEWAY_BUILD_SHA1}" diff --git a/local_history_service/local_history_service.py b/local_history_service/local_history_service.py new file mode 100644 index 00000000..3c3fb980 --- /dev/null +++ b/local_history_service/local_history_service.py @@ -0,0 +1,182 @@ +# Copyright 2023 Wirepas Ltd licensed under Apache License, Version 2.0 +# +# See file LICENSE for full license details. + +import os +import sys +from datetime import datetime, timedelta +import argparse +import logging +import base64 + +from wirepas_gateway.dbus.dbus_client import BusClient +from wirepas_gateway import __pkg_name__ + + +class LocalHistoryService(BusClient): + def __init__(self, historical_days=5, file_path="", file_prefix="lhs", endpoints=None) -> None: + + super(LocalHistoryService, self).__init__( + ignored_ep_filter=None + ) + + self.historical_days = historical_days + self.file_path = file_path + self.file_prefix = file_prefix + self.endpoints = endpoints + + logging.info("Local history service started for %d days for EPs: %s", historical_days, endpoints) + + def on_data_received( + self, + sink_id, + timestamp, + src, + dst, + src_ep, + dst_ep, + travel_time, + qos, + hop_count, + data, + ): + if dst_ep not in self.endpoints: + logging.debug("Filtered EPs") + return + + # Get current time + now = datetime.now() + + # Compute the file name + file_suffix = now.strftime("_%d_%m_%Y") + target_file = os.path.join(self.file_path, self.file_prefix + file_suffix) + + logging.info("Packet received to be written to %s", target_file) + + # Check if we have created the file + file_created = not os.path.exists(target_file) + + with open(target_file, 'a') as cur_file: + cur_file.write("%d;%x;%d;%d;%s\n" % ( + now.timestamp(), + src, + src_ep, + dst_ep, + base64.b64encode(data)) + ) + + if file_created: + # File was created, check if we have to remove an older one + logging.info("Check if a file must be deleted") + file_suffix_to_remove = (now - timedelta(days=(self.historical_days + 1))).strftime("_%d_%m_%Y") + file_to_remove = os.path.join(self.file_path, self.file_prefix + file_suffix_to_remove) + + try: + logging.debug("Trying to remove file %s", file_to_remove) + os.remove(file_to_remove) + except OSError: + logging.debug("No file to remove") + +def str2none(value): + """ Ensures string to bool conversion """ + if value == "": + return None + return value + +def parse_setting_list(list_setting): + """ This function parse ep list specified from setting file or cmd line + + Input list has following format [1, 5, 10-15] as a string or list of string + and is expended as a single list [1, 5, 10, 11, 12, 13, 14, 15] + + Args: + list_setting(str or list): the list from setting file or cmd line. + + Returns: A single list of ep + """ + if isinstance(list_setting, str): + # List is a string from cmd line + list_setting = list_setting.replace("[", "") + list_setting = list_setting.replace("]", "") + list_setting = list_setting.split(",") + + single_list = [] + for ep in list_setting: + # Check if ep is directly an int + if isinstance(ep, int): + if ep < 0 or ep > 255: + raise SyntaxError("EP out of bound") + single_list.append(ep) + continue + + # Check if ep is a single ep as string + try: + ep = int(ep) + if ep < 0 or ep > 255: + raise SyntaxError("EP out of bound") + single_list.append(ep) + continue + except ValueError: + # Probably a range + pass + + # Check if ep is a range + try: + ep = ep.replace("'", "") + lower, upper = ep.split("-") + lower = int(lower) + upper = int(upper) + if lower > upper or lower < 0 or upper > 255: + raise SyntaxError("Wrong EP range value") + + single_list += list(range(lower, upper + 1)) + except (AttributeError, ValueError): + raise SyntaxError("Wrong EP range format") + + if len(single_list) == 0: + single_list = None + + return single_list + +if __name__ == "__main__": + + + debug_level = os.environ.get("WM_DEBUG_LEVEL", "info") + # Convert it in upper for logging config + debug_level = "{0}".format(debug_level.upper()) + + # enable its logger + logging.basicConfig( + format=f'%(asctime)s | [%(levelname)s] {__pkg_name__}@%(filename)s:%(lineno)d:%(message)s', + level=debug_level, + stream=sys.stdout + ) + + parser = argparse.ArgumentParser(fromfile_prefix_chars='@') + + parser.add_argument( + "--historical_days", + default=os.environ.get("WM_LHS_NUM_DAYS", 5), + action="store", + type=int, + help="Number of historical days to store (1 file per day)", + ) + + parser.add_argument( + "--historical_file_path", + default=os.environ.get("WM_LHS_PATH", ""), + action="store", + type=str, + help="Path to store files", + ) + + parser.add_argument( + "--endpoints_to_save", + type=str2none, + default=os.environ.get("WM_LHS_ENDPOINTS", None), + help=("Destination endpoints list to keep in history (all if not set)"), + ) + + args = parser.parse_args() + + LocalHistoryService(args.historical_days, args.historical_file_path, endpoints=parse_setting_list(args.endpoints_to_save)).run()