From 0635490bdb2b8c7537b621400096f7c4f520413b Mon Sep 17 00:00:00 2001 From: Rene Schwermer Date: Fri, 30 Jun 2023 11:12:34 +0200 Subject: [PATCH] Add pubsub functionality via MQTT Signed-off-by: Rene Schwermer --- CMakeLists.txt | 14 +++- doc/BUILDING.md | 84 +++++++++++-------- doc/geds_ansible.yml | 123 +++++++++++++++++++++++++++ src/metadataservice/CMakeLists.txt | 2 + src/metadataservice/PubSubMQTT.h | 130 +++++++++++++++++++++++++++++ src/metadataservice/main.cpp | 30 ++++++- src/python/create.py | 24 +++++- src/python/read.py | 30 +++++++ 8 files changed, 395 insertions(+), 42 deletions(-) create mode 100644 doc/geds_ansible.yml create mode 100644 src/metadataservice/PubSubMQTT.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 8d6002db..b5b8290f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -41,7 +41,19 @@ endif() set_property(GLOBAL PROPERTY USE_FOLDERS ON) -set(GEDS_EXTRA_COMPILER_FLAGS -Wall -Wextra -Werror) # -Wpedantic # error: ISO C++ does not support ‘__int128’ for ‘type name’ [-Werror=pedantic] +# Added for pub/sub service +# Requires to remove GEDS_EXTRA_COMPILER_FLAGS -Werror +include(FetchContent) +FetchContent_Declare(MQTT + GIT_REPOSITORY https://github.com/eclipse/paho.mqtt.c.git + GIT_TAG v1.3.8) +FetchContent_MakeAvailable(MQTT) +FetchContent_Declare(MQTT_CXX + GIT_REPOSITORY https://github.com/eclipse/paho.mqtt.cpp + GIT_TAG v1.2.0) +FetchContent_MakeAvailable(MQTT_CXX) + +set(GEDS_EXTRA_COMPILER_FLAGS -Wall -Wextra) # -Wpedantic # error: ISO C++ does not support ‘__int128’ for ‘type name’ [-Werror=pedantic] set(GEDS_EXTRA_LINKER_FLAGS) # if(CMAKE_CXX_COMPILER_ID MATCHES "Clang") diff --git a/doc/BUILDING.md b/doc/BUILDING.md index 277b4cbe..ad2cf3b9 100644 --- a/doc/BUILDING.md +++ b/doc/BUILDING.md @@ -1,31 +1,23 @@ -# Building - -## CMake -Install CMake > 3.20. - -- Build commands: - ```bash - cmake -DCMAKE_BUILD_TYPE=Debug -S . -B $BUILD_DIR - cmake --build $BUILD_DIR - ``` - -- Test commands: - ```bash - cmake --build $BUILD_DIR -t test - ``` - -- Install command: - ```bash - cmake --install $BUILD_DIR --prefix $INSTALL_DIR --component geds - ``` - -## Docker - -`build-docker.sh` builds a docker container with GRPC and a build of GEDS in `/usr/local/opt/geds`. - -## Dependencies - -### MacOS +# Building GEDS + +- [Workflow](#workflow) +- [Instructions for MacOS](#instructions-for-macos) +- [Instructions for Windows](#instructions-for-windows) +- [Instructions for Linux](#instructions-for-linux) +- [Deploying via Docker](#deploying-via-docker) +- [Deploying via Ansible](#deploying-via-ansible) + +## Workflow +The general workflow of building GEDS from source is: +1. Pull GEDS repository: `git pull https://github.com/IBM/GEDS.git` +2. Install dependencies, e.g. `cmake` version > 3.20 (check via `cmake --version`) +3. Create `build` and `install` directory in the GEDS folder and set environment variables: `export $BUILD_DIR=~/GEDS/build` & `export $INSTALL_DIR=~/GEDS/bin` +4. Build Boost +5. Build AWS SDK +6. Build GEDS +7. Install GEDS + +## Instructions for MacOS Install the following dependencies through homebrew: @@ -54,23 +46,41 @@ Finally build it with: cmake --build . --target all ``` -### Linux +## Instructions for Windows +Coming -Install the following dependencies: +## Instructions for Linux +Install GEDS dependencies: ``` -apt-get install -y \ - clang \ - curl wget \ - build-essential gcc ninja-build \ - openjdk-11-jdk \ - python3.9 python3.9-dev python3-distutils +sudo apt install -y clang curl wget build-essential gcc ninja-build openjdk-11-jdk python3-dev python3-distutils cmake ``` -and a recent version (>= 3.20) of CMake: +CMake version >= 3.20: ``` CMAKE_VERSION=3.22.4 wget --quiet -O cmake.tar.gz https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-linux-x86_64.tar.gz \ && tar xf cmake.tar.gz --strip-components=1 -C /usr/local/ \ && rm cmake.tar.gz ``` + +Install AWS SDK dependecies: +``` +sudo apt install libcurl4-openssl-dev libssl-de uuid-dev zlib1g-dev libpulse-dev +``` + +Build AWS SDK: `/bin/bash build-aws-sdk.sh` + +Build Boost: `/bin/bash build-boost.sh` + +Build GEDS: +1. Check if environment variables are correctly set via `printenv | grep BUILD_DIR` and `printenv | grep INSTALL_DIR` +2. `cmake -DCMAKE_BUILD_TYPE=Debug -S . -B $BUILD_DIR` +3. `cmake --build $BUILD_DIR -j 4` (-j specifies the number of cores to use) +4. `cmake --install $BUILD_DIR --prefix $INSTALL_DIR --component geds` + +## Deploying via Docker +`build-docker.sh` builds a docker container with GRPC and a build of GEDS in `/usr/local/opt/geds`. + +## Deploying via Ansible +We offer an Ansible playbook to automate GEDS building from source on multiple clients. diff --git a/doc/geds_ansible.yml b/doc/geds_ansible.yml new file mode 100644 index 00000000..cdb19177 --- /dev/null +++ b/doc/geds_ansible.yml @@ -0,0 +1,123 @@ +--- +- hosts: geds + name: Update all apt packages + become: false + vars: + ansible_python_interpreter: /usr/bin/python3 + remote_home: "{{ ansible_env.HOME }}" + + tasks: + - name: Update and upgrade + tags: update + become: true + apt: + upgrade: yes + update_cache: yes + + - name: Reboot + tags: reboot + become: true + reboot: + + - name: Install GEDS dependencies + tags: dependencies + become: true + apt: + pkg: + - clang + - curl + - wget + - build-essential + - gcc + - ninja-build + - openjdk-11-jdk + - python3-dev + - python3-distutils + - cmake + state: latest + update_cache: yes + + - name: Create GEDS directory + tags: git + become: false + file: + path: "{{ remote_home }}/GEDS" + state: directory + + - name: Git clone GEDS + tags: git + become: false + ansible.builtin.git: + repo: "https://github.com/IBM/GEDS.git" + dest: "{{ remote_home }}/GEDS/" + + - name: AWS dependencies + tags: aws + become: true + apt: + pkg: + - libcurl4-openssl-dev + - libssl-dev + - uuid-dev + - zlib1g-dev + - libpulse-dev + state: latest + update_cache: yes + + - name: Build AWS + tags: aws + become: false + ansible.builtin.command: /bin/bash build-aws-sdk.sh + async: 3600 + poll: 30 + args: + chdir: "{{ remote_home }}/GEDS" + + - name: Build boost + tags: boost + become: false + ansible.builtin.command: /bin/bash build-boost.sh + args: + chdir: "{{ remote_home }}/GEDS" + + - name: Create GEDS build directory + tags: geds + become: false + file: + path: "{{ remote_home }}/GEDS/build" + state: directory + + - name: Build GEDS + tags: geds + become: false + ansible.builtin.command: cmake -DCMAKE_BUILD_TYPE=Debug -S . -B $BUILD_DIR + args: + chdir: "{{ remote_home }}/GEDS" + environment: + BUILD_DIR: "{{ remote_home }}/GEDS/build" + + - name: Build GEDS + tags: geds + become: false + ansible.builtin.command: cmake --build $BUILD_DIR -j 4 + args: + chdir: "{{ remote_home }}/GEDS" + environment: + BUILD_DIR: "{{ remote_home }}/GEDS/build" + + - name: Create GEDS install directory + tags: geds + become: false + file: + path: "{{ remote_home }}/GEDS/bin" + state: directory + + - name: Install GEDS + tags: geds + become: false + ansible.builtin.command: cmake --install $BUILD_DIR --prefix $INSTALL_DIR --component geds + args: + chdir: "{{ remote_home }}/GEDS" + environment: + BUILD_DIR: "{{ remote_home }}/GEDS/build" + INSTALL_DIR: "{{ remote_home }}/GEDS/bin" diff --git a/src/metadataservice/CMakeLists.txt b/src/metadataservice/CMakeLists.txt index 07d70ae8..0951f2a1 100644 --- a/src/metadataservice/CMakeLists.txt +++ b/src/metadataservice/CMakeLists.txt @@ -10,6 +10,7 @@ set(SOURCES ObjectStoreHandler.h S3Helper.cpp S3Helper.h + PubSubMQTT.h ) add_library(libmetadataservice STATIC ${SOURCES}) @@ -19,6 +20,7 @@ target_link_libraries(libmetadataservice geds_utility geds_proto geds_s3 + paho-mqttpp3 ) target_compile_options(libmetadataservice PUBLIC ${GEDS_EXTRA_COMPILER_FLAGS}) target_compile_definitions(libmetadataservice diff --git a/src/metadataservice/PubSubMQTT.h b/src/metadataservice/PubSubMQTT.h new file mode 100644 index 00000000..ad5c5ce1 --- /dev/null +++ b/src/metadataservice/PubSubMQTT.h @@ -0,0 +1,130 @@ +/** + * Copyright 2022- IBM Inc. All rights reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "mqtt/async_client.h" + + +// https://github.com/eclipse/paho.mqtt.cpp/issues/141 +mqtt::async_client_ptr createClient(std::string serverAddress, + std::string clientID){ + // Use MQTT v5 to enable no_local: + // This flag avoids receiving messages from the same host by telling + // the broker not to send messages received with a client ID to a + // subscriber with the same client ID + + mqtt::create_options createOpts = mqtt::create_options(); + createOpts.set_mqtt_verison(5); + std::string persistDir= "None"; + auto client_ptr = std::make_shared(serverAddress, + clientID, + createOpts, + persistDir); + std::cout << "Created MQTT client to " + serverAddress + " and ID " + clientID << std::endl; + return client_ptr; +} + +mqtt::async_client_ptr connectClient(std::shared_ptr client_ptr, + std::string node){ + auto connOpts = std::make_shared(); + + if (node == "server"){ + connOpts->set_keep_alive_interval(20); + connOpts->set_clean_session(false); + connOpts->set_automatic_reconnect(true); + connOpts->set_mqtt_version(5); + } + if (node == "client"){ + connOpts->set_clean_session(false); + connOpts->set_mqtt_version(5); + } + client_ptr->connect(*connOpts)->wait(); + std::cout << "Connected MQTT client" << std::endl; + return client_ptr; +} + +void publishData(mqtt::async_client_ptr client_ptr, + std::string topic, + int QoS, + std::string data){ + mqtt::topic top(*client_ptr, topic, QoS, true); + mqtt::message_ptr message = mqtt::make_message(topic, data); + message->set_qos(QoS); + message->set_payload("A single message"); + + client_ptr->publish(message); + std::cout << "Published data to " + topic << std::endl; +} + +mqtt::async_client_ptr subscribe(mqtt::async_client_ptr client_ptr, + std::string topic, + int QoS){ + mqtt::subscribe_options subOpts; + subOpts.set_no_local(true); // Only works with MQTT v5 + + client_ptr->subscribe(topic, QoS, subOpts)->wait(); + std::cout << "Subscribed to " + topic << std::endl; + return client_ptr; +} + +void unsubscribe(mqtt::async_client_ptr client_ptr, + std::string topic){ + client_ptr->unsubscribe(topic)->wait(); + client_ptr->stop_consuming(); + std::cout << "Unsubscribed from " + topic << std::endl; +} + +std::tuple consumeMessage(mqtt::async_client_ptr client_ptr){ + client_ptr->start_consuming(); + auto msg = client_ptr->consume_message(); + // msg->get_payload() + return std::make_tuple(msg->get_topic(), msg->to_string()); +} + +void disconnectClient(mqtt::async_client_ptr client_ptr){ + client_ptr->disconnect()->wait(); + std::cout << "Disconnected client" << std::endl; +} + + +// Example publisher +// --------------------------------------------------------------- +// #include "PubSub.h" +// +// int main(int argc, char **argv) { +// mqtt::async_client_ptr client_ptr = createClient("tcp://localhost:1883", +// "mds_server"); +// std::string node_type = "server"; +// mqtt::async_client_ptr connected_client_ptr = connectClient(client_ptr, +// node_type); +// publishData(connected_client_ptr, "home/file1", 1, "Hello World"); +// disconnectClient(connected_client_ptr); +// return 0; +// } + +// Example subscriber +// --------------------------------------------------------------- +// #include "PubSub.h" +// +// int main(void) +// { +// mqtt::async_client_ptr client_ptr = createClient("tcp://localhost:1883", +// "mds_client"); +// std::string node_type = "client"; +// mqtt::async_client_ptr connected_client_ptr = connectClient(client_ptr, +// node_type); +// mqtt::async_client_ptr subscribed_client_ptr = subscribe(connected_client_ptr, +// "home/file1", 1); +// while (true) { +// auto msg = consumeMessage(subscribed_client_ptr); +// std::string topic = std::get<0>(msg); +// std::string payload = std::get<1>(msg); +// +// if (payload.empty()){ +// break; +// } +// std::cout << topic + " " + payload << std::endl; +// } +// return 0; +// } diff --git a/src/metadataservice/main.cpp b/src/metadataservice/main.cpp index b0383537..8ee298b4 100644 --- a/src/metadataservice/main.cpp +++ b/src/metadataservice/main.cpp @@ -4,24 +4,50 @@ */ #include +#include #include #include #include "GRPCServer.h" #include "Ports.h" +#include "PubSubMQTT.h" -ABSL_FLAG(std::string, address, "0.0.0.0", "Server interface address."); +ABSL_FLAG(std::string, address, "localhost", "Server interface address."); ABSL_FLAG(uint16_t, port, defaultMetdataServerPort, "Port."); int main(int argc, char **argv) { absl::ParseCommandLine(argc, argv); auto serverAddress = FLAGS_address.CurrentValue() + ":" + FLAGS_port.CurrentValue(); + + std::thread thread_geds([serverAddress]{ GRPCServer service(serverAddress); auto status = service.startAndWait(); if (!status.ok()) { std::cerr << status.message() << std::endl; - exit(1); } + }); + + mqtt::async_client_ptr client_ptr = createClient("172.24.33.70:1883", + "mds_server"); + std::string nodeType = "server"; + mqtt::async_client_ptr connected_client_ptr = connectClient(client_ptr, nodeType); + mqtt::async_client_ptr subscribed_client_ptr = subscribe(connected_client_ptr, + "bucket/#", 1); + + std::thread thread_mqtt([connected_client_ptr, subscribed_client_ptr]{ + while (true) { + auto msg = consumeMessage(subscribed_client_ptr); + std::string topic = std::get<0>(msg); + std::string payload = std::get<1>(msg); + if (payload.empty()){ + break; + } + publishData(connected_client_ptr, topic, 1, "Hello World"); + } + }); + + thread_geds.join(); + thread_mqtt.join(); return 0; } diff --git a/src/python/create.py b/src/python/create.py index 5e031688..7afc7f8f 100644 --- a/src/python/create.py +++ b/src/python/create.py @@ -3,12 +3,24 @@ # SPDX-License-Identifier: Apache-2.0 # +# stdlib import os -from time import sleep +import json +import time +# third party +import paho.mqtt.client as mqtt_client + +# relative from pygeds import status, GEDS, GEDSConfig -METADATA_SERVER = os.environ.get("GEDS_METADATASERVER", "zac13:4381") +METADATA_SERVER: str = os.environ.get("GEDS_METADATASERVER", "zac13:4381") +MQTT: bool = False + + +if MQTT: + client = mqtt_client.Client(client_id="client1") + client.connect("172.24.33.70", 1883) instance = GEDS(GEDSConfig(METADATA_SERVER)) try: @@ -37,6 +49,14 @@ print(f"Read: {message_read.decode()}") file.seal() +if MQTT: + client = mqtt_client.Client() + client.connect("172.24.33.70", 1883) + client.loop_start() + client.publish("bucket/testfile", json.dumps({"file_name": "testfile", + "created": True}).encode()) + client.disconnect() + client.loop_stop() file2 = instance.create("bucket2", "testfile2") file2.write(message_read, 0, len(message)) diff --git a/src/python/read.py b/src/python/read.py index ed0ae01b..25d2fb6e 100644 --- a/src/python/read.py +++ b/src/python/read.py @@ -3,11 +3,31 @@ # SPDX-License-Identifier: Apache-2.0 # +# stdlib import os +import time +# third party +import paho.mqtt.client as mqtt_client + +# relative from pygeds import status, GEDS, GEDSConfig + METADATA_SERVER = os.environ.get("GEDS_METADATASERVER", "zac13:4381") +MQTT: bool = False + + +def on_message(client, userdata, msg): + print(f"Received {msg.payload} on topic {msg.topic}") + file = instance.open("bucket", "testfile") + time.sleep(.1) + buffer = bytearray(file.size) + l = file.read(buffer, 0, len(buffer)) + print(f"Read {l} bytes") + print(buffer.decode("utf-8")) + + instance = GEDS(GEDSConfig(METADATA_SERVER)) try: instance.start() @@ -21,3 +41,13 @@ l = file.read(buffer, 0, len(buffer)) print(f"Read {l} bytes") print(buffer.decode("utf-8")) + +if MQTT: + client = mqtt_client.Client(client_id="client2") + client.on_message = on_message + + client.connect("172.24.33.70", 1883) + client.subscribe("bucket/testfile") + + client.loop_forever() + client.disconnect()