Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZenohUTransport implementaton #54

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ find_package(spdlog REQUIRED)
find_package(up-core-api REQUIRED)
find_package(up-cpp REQUIRED)
find_package(zenohc REQUIRED)
find_package(zenohcxx REQUIRED zenohc::lib)
find_package(GTest REQUIRED)
include(GoogleTest)

# TODO NEEDED?
#add_definitions(-DSPDLOG_FMT_EXTERNAL)
Expand All @@ -42,6 +45,7 @@ target_include_directories(${PROJECT_NAME}
$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>
$<BUILD_INTERFACE:${CMAKE_BINARY_DIR}>
${zenohc_INCLUDE_DIR}
${zenohcxx_INCLUDE_DIR}
${up-cpp_INCLUDE_DIR}
${up-core-api_INCLUDE_DIR}
${protobuf_INCLUDE_DIR}
Expand All @@ -52,9 +56,11 @@ set_property(TARGET ${PROJECT_NAME} PROPERTY POSITION_INDEPENDENT_CODE ON)
target_link_libraries(${PROJECT_NAME}
PRIVATE
zenohc::lib
zenohcxx::zenohc::lib
up-cpp::up-cpp
up-core-api::up-core-api
protobuf::libprotobuf
GTest::gtest_main
spdlog::spdlog)

enable_testing()
Expand Down
102 changes: 102 additions & 0 deletions include/up-transport-zenoh-cpp/ZenohUTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,36 @@
#ifndef UP_TRANSPORT_ZENOH_CPP_ZENOHUTRANSPORT_H
#define UP_TRANSPORT_ZENOH_CPP_ZENOHUTRANSPORT_H

#include <gtest/gtest.h>
#include <up-cpp/transport/UTransport.h>

#include <filesystem>
#include <mutex>
#include <optional>
#include <unordered_map>
#include <zenoh.hxx>

namespace zenohc {

class OwnedQuery {
public:
OwnedQuery(const z_query_t& query) : _query(z_query_clone(&query)) {}

OwnedQuery(const OwnedQuery&) = delete;
OwnedQuery& operator=(const OwnedQuery&) = delete;

~OwnedQuery() { z_drop(&_query); }

Query loan() const { return z_loan(_query); }
bool check() const { return z_check(_query); }

private:
z_owned_query_t _query;
};

using OwnedQueryPtr = std::shared_ptr<OwnedQuery>;

} // namespace zenohc

namespace uprotocol::transport {

Expand Down Expand Up @@ -61,6 +87,31 @@ struct ZenohUTransport : public UTransport {

/// @brief Represents the callable end of a callback connection.
using CallableConn = typename UTransport::CallableConn;
using UuriKey = std::string;

struct ListenerKey {
CallableConn listener;
std::string zenoh_key;

ListenerKey(CallableConn listener, const std::string& zenoh_key)
: listener(listener), zenoh_key(zenoh_key) {}

bool operator==(const ListenerKey& other) const {
return listener == other.listener && zenoh_key == other.zenoh_key;
}

bool operator<(const ListenerKey& other) const {
if (listener == other.listener) {
return zenoh_key < other.zenoh_key;
}
return listener < other.listener;
}
};

using RpcCallbackMap = std::map<UuriKey, CallableConn>;
using SubscriberMap = std::map<ListenerKey, zenoh::Subscriber>;
using QueryableMap = std::map<ListenerKey, zenoh::Queryable>;
using QueryMap = std::map<std::string, zenoh::OwnedQueryPtr>;

/// @brief Register listener to be called when UMessage is received
/// for the given URI.
Expand Down Expand Up @@ -95,6 +146,57 @@ struct ZenohUTransport : public UTransport {
virtual void cleanupListener(CallableConn listener) override;

private:
FRIEND_TEST(TestZenohUTransport, toZenohKeyString);

static v1::UStatus uError(v1::UCode code, std::string_view message);

static std::string toZenohKeyString(
const std::string& default_authority_name, const v1::UUri& source,
const std::optional<v1::UUri>& sink);

static std::vector<std::pair<std::string, std::string>>
uattributesToAttachment(const v1::UAttributes& attributes);

static v1::UAttributes attachmentToUAttributes(
const zenoh::AttachmentView& attachment);

static zenoh::Priority mapZenohPriority(v1::UPriority upriority);

static v1::UMessage sampleToUMessage(const zenoh::Sample& sample);

v1::UStatus registerRequestListener_(const std::string& zenoh_key,
CallableConn listener);

v1::UStatus registerResponseListener_(const std::string& zenoh_key,
CallableConn listener);

v1::UStatus registerPublishNotificationListener_(
const std::string& zenoh_key, CallableConn listener);

v1::UStatus sendRequest_(const std::string& zenoh_key,
const std::string& payload,
const v1::UAttributes& attributes);

v1::UStatus sendResponse_(const std::string& payload,
const v1::UAttributes& attributes);

v1::UStatus sendPublishNotification_(const std::string& zenoh_key,
const std::string& payload,
const v1::UAttributes& attributes);

zenoh::Session session_;

RpcCallbackMap rpc_callback_map_;
std::mutex rpc_callback_map_mutex_;

SubscriberMap subscriber_map_;
std::mutex subscriber_map_mutex_;

QueryableMap queryable_map_;
std::mutex queryable_map_mutex_;

QueryMap query_map_;
std::mutex query_map_mutex_;
};

} // namespace uprotocol::transport
Expand Down
Loading
Loading