From 6fde804676d9125df134458eeee2e91360e70f3a Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 23 May 2024 16:16:55 +0200 Subject: [PATCH 1/7] Add zenohcxx dependency --- test/CMakeLists.txt | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 3501d45..49264ee 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -31,7 +31,16 @@ function(add_coverage_test Name) GTest::gmock pthread ) - target_include_directories(${Name} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include) + target_include_directories(${Name} + PRIVATE + $ + ${zenohc_INCLUDE_DIR} + ${zenohcxx_INCLUDE_DIR} + ${up-cpp_INCLUDE_DIR} + ${up-core-api_INCLUDE_DIR} + ${protobuf_INCLUDE_DIR} + ${spdlog_INCLUDE_DIR} + ) gtest_discover_tests(${Name} XML_OUTPUT_DIR results) endfunction() From 2860e03a57560b0c033c7b71d1645887c46d06d2 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 23 May 2024 20:51:54 +0200 Subject: [PATCH 2/7] Start drafting --- .../up-transport-zenoh-cpp/ZenohUTransport.h | 20 ++ src/ZenohUTransport.cpp | 257 ++++++++++++++++++ test/extra/PublisherSubscriberTest.cpp | 15 +- 3 files changed, 291 insertions(+), 1 deletion(-) diff --git a/include/up-transport-zenoh-cpp/ZenohUTransport.h b/include/up-transport-zenoh-cpp/ZenohUTransport.h index fbe28ed..9d97af7 100644 --- a/include/up-transport-zenoh-cpp/ZenohUTransport.h +++ b/include/up-transport-zenoh-cpp/ZenohUTransport.h @@ -15,7 +15,10 @@ #include #include +#include #include +#include +#include namespace uprotocol::transport { @@ -61,6 +64,8 @@ struct ZenohUTransport : public UTransport { /// @brief Represents the callable end of a callback connection. using CallableConn = typename UTransport::CallableConn; + using UuriKey = std::string; + using RpcCallbackMap = std::unordered_map; /// @brief Register listener to be called when UMessage is received /// for the given URI. @@ -95,6 +100,21 @@ struct ZenohUTransport : public UTransport { virtual void cleanupListener(CallableConn listener) override; private: + v1::UStatus sendPublishNotification_(const std::string& zenoh_key, + const std::string& payload, + const v1::UAttributes& attributes); + + v1::UStatus sendRequest_(const std::string& zenoh_key, + const std::string& payload, + const v1::UAttributes& attributes); + + v1::UStatus sendResponse_(const std::string& payload, + const v1::UAttributes& attributes); + + zenoh::Session session_; + + RpcCallbackMap rpcCallbackMap_; + std::mutex rpcCallbackMapMutex_; }; } // namespace uprotocol::transport diff --git a/src/ZenohUTransport.cpp b/src/ZenohUTransport.cpp index f8bf760..c299fe1 100644 --- a/src/ZenohUTransport.cpp +++ b/src/ZenohUTransport.cpp @@ -9,4 +9,261 @@ // // SPDX-License-Identifier: Apache-2.0 +#include +#include + +#include + #include "up-transport-zenoh-cpp/ZenohUTransport.h" + +namespace uprotocol::transport { + +using namespace zenoh; +using namespace uprotocol::v1; +using namespace uprotocol::datamodel; + +namespace { +/* +std::string to_hex_string(uint8_t byte) { + std::stringstream ss; + ss << std::hex << std::setw(2) << std::setfill('0') + << static_cast(byte); + return ss.str(); +} + +std::string get_uauth_from_uuri(const UUri& uri) { + if (!uri.authority_name().empty()) { + std::vector buf; + try { + buf = uri.to_bytes(); + } catch (const std::runtime_error& e) { + std::string msg = "Unable to transform UAuthority into micro form"; + std::cerr << msg << std::endl; + throw UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg); + } + + std::ostringstream oss; + for (auto c : buf) { + oss << std::hex << std::setw(2) << std::setfill('0') + << static_cast(c); + } + return oss.str(); + } else { + std::string msg = "UAuthority is empty"; + std::cerr << msg << std::endl; + throw UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg); + } +} +*/ +std::string toZenohKeyString(const UUri& uri) { + /* + if (uri.authority_name().has_value() && !uri.entity.has_value() && + !uri.resource.has_value()) { + try { + return "upr/" + UPClientZenoh::get_uauth_from_uuri(uri) + "/**"; + } catch (const std::runtime_error& e) { + std::string msg = + "Unable to get authority from UUri: " + std::string(e.what()); + std::cerr << msg << std::endl; + throw UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg); + } + } else { + std::vector micro_uuri; + try { + micro_uuri = uri.to_bytes(); + } catch (const std::runtime_error& e) { + std::string msg = "Unable to serialize into micro format: " + + std::string(e.what()); + std::cerr << msg << std::endl; + throw UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg); + } + + std::string micro_zenohKey; + if (micro_uuri.size() > 8) { + micro_zenohKey = "upr/"; + for (size_t i = 8; i < micro_uuri.size(); ++i) { + micro_zenohKey += to_hex_string(micro_uuri[i]); + } + micro_zenohKey += "/"; + } else { + micro_zenohKey = "upl/"; + } + + for (size_t i = 0; i < 8 && i < micro_uuri.size(); ++i) { + micro_zenohKey += to_hex_string(micro_uuri[i]); + } + + return micro_zenohKey; + } + */ + return ""; +} + +std::map uattributesToAttachment( + const UAttributes& attributes) { + std::map res; + return res; +} + +UAttributes attachmentToUattributes(const AttachmentView& attachment) { + UAttributes res; + return res; +} + +Priority mapZenohPriority(UPriority upriority) { + switch (upriority) { + case UPriority::UPRIORITY_CS0: + return Z_PRIORITY_BACKGROUND; + case UPriority::UPRIORITY_CS1: + return Z_PRIORITY_DATA_LOW; + case UPriority::UPRIORITY_CS2: + return Z_PRIORITY_DATA; + case UPriority::UPRIORITY_CS3: + return Z_PRIORITY_DATA_HIGH; + case UPriority::UPRIORITY_CS4: + return Z_PRIORITY_INTERACTIVE_LOW; + case UPriority::UPRIORITY_CS5: + return Z_PRIORITY_INTERACTIVE_HIGH; + case UPriority::UPRIORITY_CS6: + return Z_PRIORITY_REAL_TIME; + case UPriority::UPRIORITY_UNSPECIFIED: + default: + return Z_PRIORITY_DATA_LOW; + } +} + +} // namespace + +ZenohUTransport::ZenohUTransport(const UUri& defaultUri, + const std::filesystem::path& configFile) + : UTransport(defaultUri), + session_(expect(open( + std::move(expect(config_from_file(configFile.string().c_str())))))) {} + +UStatus ZenohUTransport::sendPublishNotification_( + const std::string& zenohKey, const std::string& payload, + const UAttributes& attributes) { + auto attachment = uattributesToAttachment(attributes); + + auto priority = mapZenohPriority(attributes.priority()); + + PutOptions options; + options.set_encoding(Z_ENCODING_PREFIX_APP_CUSTOM); + options.set_priority(priority); + options.set_attachment(attachment); + if (!session_.put(zenohKey, payload, options)) { + return UStatus(); // TODO: UCode::INTERNAL, "Unable to send with Zenoh" + } + + return UStatus(); +} + +UStatus ZenohUTransport::sendRequest_(const std::string& zenohKey, + const std::string& payload, + const UAttributes& attributes) { + auto uuidStr = serializer::uri::AsString().serialize(attributes.source()); + CallableConn respCallback; + { + std::unique_lock lock(rpcCallbackMapMutex_); + + if (auto respCallbackIt = rpcCallbackMap_.find(uuidStr); + respCallbackIt == rpcCallbackMap_.end()) { + return UStatus(); // TODO: UCode::UNAVAILABLE, "failed to find UUID + // = {}", uuidStr + + } else { + respCallback = respCallbackIt->second; + } + } + auto onReply = [&](Reply&& reply) { + auto result = reply.get(); + + if (auto sample = std::get_if(&result)) { + UAttributes attributes; + if (sample->get_attachment().check()) { + attributes = attachmentToUattributes(sample->get_attachment()); + } + std::string payload(sample->get_payload().as_string_view()); + UMessage message; + message.set_payload(payload); + message.set_allocated_attributes(&attributes); + (*respCallback)(message); + } else if (auto error = std::get_if(&result)) { + std::cout << "Received an error :" << error->as_string_view() + << "\n"; + } + }; + + auto attachment = uattributesToAttachment(attributes); + + GetOptions options; + options.set_target(Z_QUERY_TARGET_BEST_MATCHING); + options.set_value(Value(payload)); + options.set_attachment(attachment); + + auto onDone = []() {}; + + session_.get(zenohKey, "", {onReply, onDone}, options); + + return UStatus(); +} + +UStatus ZenohUTransport::sendResponse_(const std::string& payload, + const UAttributes& attributes) { + return UStatus(); +} + +v1::UStatus ZenohUTransport::sendImpl(const UMessage& message) { + if (!message.has_payload()) { + std::string msg = "Invalid UPayload"; + return UStatus(); // TODO: UCode::INVALID_ARGUMENT, msg + } + const auto& payload = message.payload(); + + const auto& attributes = message.attributes(); + if (attributes.type() == UMessageType::UMESSAGE_TYPE_UNSPECIFIED) { + std::string msg = "Invalid UAttributes"; + return UStatus(); // TODO: UCode::INVALID_ARGUMENT, msg + } + + switch (attributes.type()) { + case UMessageType::UMESSAGE_TYPE_PUBLISH: { + auto res = validator::message::isValidPublish(message); + // TODO: check res + std::string zenohKey = toZenohKeyString(attributes.source()); + return sendPublishNotification_(zenohKey, payload, attributes); + } + case UMessageType::UMESSAGE_TYPE_NOTIFICATION: { + auto res = validator::message::isValidNotification(message); + // TODO: check res + std::string zenohKey = toZenohKeyString(attributes.sink()); + return sendPublishNotification_(zenohKey, payload, attributes); + } + case UMessageType::UMESSAGE_TYPE_REQUEST: { + auto res = validator::message::isValidRpcRequest(message); + // TODO: check res + std::string zenohKey = toZenohKeyString(attributes.sink()); + return sendRequest_(zenohKey, payload, attributes); + } + case UMessageType::UMESSAGE_TYPE_RESPONSE: { + auto res = validator::message::isValidRpcResponse(message); + // TODO: check res + return sendResponse_(payload, attributes); + } + default: { + std::string msg = "Wrong Message type in UAttributes"; + return UStatus(); // TODO: UCode::INVALID_ARGUMENT, msg + } + } + return UStatus(); +} + +v1::UStatus ZenohUTransport::registerListenerImpl( + const v1::UUri& sink_filter, CallableConn&& listener, + std::optional&& source_filter) { + return v1::UStatus(); +} + +void ZenohUTransport::cleanupListener(CallableConn listener) {} + +} // namespace uprotocol::transport diff --git a/test/extra/PublisherSubscriberTest.cpp b/test/extra/PublisherSubscriberTest.cpp index 54e772d..5ba082f 100644 --- a/test/extra/PublisherSubscriberTest.cpp +++ b/test/extra/PublisherSubscriberTest.cpp @@ -10,6 +10,9 @@ // SPDX-License-Identifier: Apache-2.0 #include +#include + +#include "up-transport-zenoh-cpp/ZenohUTransport.h" namespace { @@ -31,7 +34,17 @@ class TestFixture : public testing::Test { static void TearDownTestSuite() {} }; +using namespace uprotocol::v1; +using namespace uprotocol::transport; + // TODO replace -TEST_F(TestFixture, SomeTestName) {} +TEST_F(TestFixture, PubSub) { + UUri uuri; + uuri.set_ue_id(1); + uuri.set_ue_version_major(1); + uuri.set_resource_id(1); + auto ut = + ZenohUTransport(uuri, "/home/sashacmc/src/zenoh/DEFAULT_CONFIG.json5"); +} } // namespace From f06dadb24225acb723e6aa84878182353e0f0f18 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 24 May 2024 21:46:17 +0200 Subject: [PATCH 3/7] Continue drafting --- .../up-transport-zenoh-cpp/ZenohUTransport.h | 89 ++++- src/ZenohUTransport.cpp | 362 ++++++++++++------ 2 files changed, 319 insertions(+), 132 deletions(-) diff --git a/include/up-transport-zenoh-cpp/ZenohUTransport.h b/include/up-transport-zenoh-cpp/ZenohUTransport.h index 9d97af7..97570d9 100644 --- a/include/up-transport-zenoh-cpp/ZenohUTransport.h +++ b/include/up-transport-zenoh-cpp/ZenohUTransport.h @@ -20,6 +20,37 @@ #include #include +namespace zenohc { + +class OwnedQuery { +public: + OwnedQuery(const z_query_t& query) : _query(z_query_clone(&query)) {} + + OwnedQuery(const OwnedQuery&) = delete; + OwnedQuery& operator=(const OwnedQuery&) = delete; + + ~OwnedQuery() { z_drop(&_query); } + + Query loan() const { return z_loan(_query); } + bool check() const { return z_check(_query); } + +private: + z_owned_query_t _query; +}; + +using OwnedQueryPtr = std::shared_ptr; + +} // namespace zenohc + +namespace { + +inline void hashCombine(size_t& seed, size_t value) { + // See boost::hash_combine + seed ^= value + 0x9e3779b9 + (seed << 6) + (seed >> 2); +} + +} // namespace + namespace uprotocol::transport { /// @brief Zenoh implementation of UTransport @@ -65,7 +96,37 @@ struct ZenohUTransport : public UTransport { /// @brief Represents the callable end of a callback connection. using CallableConn = typename UTransport::CallableConn; using UuriKey = std::string; + + struct ListenerKey { + CallableConn listener; + std::string zenoh_key; + + ListenerKey(CallableConn listener, const std::string& zenoh_key) + : listener(listener), zenoh_key(zenoh_key) {} + + bool operator==(const ListenerKey& other) const { + return listener == other.listener && zenoh_key == other.zenoh_key; + } + }; + + struct ListenerKeyHash { + std::size_t operator()(const ListenerKey& key) const { + std::hash hashCallableConn; + std::size_t seed = hashCallableConn(key.listener); + + std::hash hashString; + hashCombine(seed, hashString(key.zenoh_key)); + + return seed; + } + }; + using RpcCallbackMap = std::unordered_map; + using SubscriberMap = + std::unordered_map; + using QueryableMap = + std::unordered_map; + using QueryMap = std::unordered_map; /// @brief Register listener to be called when UMessage is received /// for the given URI. @@ -100,9 +161,14 @@ struct ZenohUTransport : public UTransport { virtual void cleanupListener(CallableConn listener) override; private: - v1::UStatus sendPublishNotification_(const std::string& zenoh_key, - const std::string& payload, - const v1::UAttributes& attributes); + v1::UStatus registerRequestListener_(const std::string& zenoh_key, + CallableConn listener); + + v1::UStatus registerResponseListener_(const std::string& zenoh_key, + CallableConn listener); + + v1::UStatus registerPublishNotificationListener_( + const std::string& zenoh_key, CallableConn listener); v1::UStatus sendRequest_(const std::string& zenoh_key, const std::string& payload, @@ -111,10 +177,23 @@ struct ZenohUTransport : public UTransport { v1::UStatus sendResponse_(const std::string& payload, const v1::UAttributes& attributes); + v1::UStatus sendPublishNotification_(const std::string& zenoh_key, + const std::string& payload, + const v1::UAttributes& attributes); + zenoh::Session session_; - RpcCallbackMap rpcCallbackMap_; - std::mutex rpcCallbackMapMutex_; + RpcCallbackMap rpc_callback_map_; + std::mutex rpc_callback_map_mutex_; + + SubscriberMap subscriber_map_; + std::mutex subscriber_map_mutex_; + + QueryableMap queryable_map_; + std::mutex queryable_map_mutex_; + + QueryMap query_map_; + std::mutex query_map_mutex_; }; } // namespace uprotocol::transport diff --git a/src/ZenohUTransport.cpp b/src/ZenohUTransport.cpp index c299fe1..e3f4391 100644 --- a/src/ZenohUTransport.cpp +++ b/src/ZenohUTransport.cpp @@ -10,7 +10,9 @@ // SPDX-License-Identifier: Apache-2.0 #include +#include #include +#include #include @@ -18,95 +20,96 @@ namespace uprotocol::transport { +const char UATTRIBUTE_VERSION = 1; + using namespace zenoh; using namespace uprotocol::v1; using namespace uprotocol::datamodel; namespace { -/* -std::string to_hex_string(uint8_t byte) { - std::stringstream ss; - ss << std::hex << std::setw(2) << std::setfill('0') - << static_cast(byte); - return ss.str(); + +UStatus uError(UCode code, std::string_view message) { + UStatus status; + status.set_code(code); + status.set_message(std::string(message)); + return status; } -std::string get_uauth_from_uuri(const UUri& uri) { - if (!uri.authority_name().empty()) { - std::vector buf; - try { - buf = uri.to_bytes(); - } catch (const std::runtime_error& e) { - std::string msg = "Unable to transform UAuthority into micro form"; - std::cerr << msg << std::endl; - throw UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg); - } - - std::ostringstream oss; - for (auto c : buf) { - oss << std::hex << std::setw(2) << std::setfill('0') - << static_cast(c); - } - return oss.str(); - } else { - std::string msg = "UAuthority is empty"; - std::cerr << msg << std::endl; - throw UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg); - } +std::string transformAuthority(const std::string& authority) { + std::string result; + for (char c : authority) { + if (std::isalnum(c)) { + result += std::tolower(c); + } else if (c == '.' || c == '-' || c == '_' || c == '*') { + result += c; + } else if (c == '$') { + result += "{dollar}"; + } else { + result += "{}"; + } + } + return result; } -*/ -std::string toZenohKeyString(const UUri& uri) { - /* - if (uri.authority_name().has_value() && !uri.entity.has_value() && - !uri.resource.has_value()) { - try { - return "upr/" + UPClientZenoh::get_uauth_from_uuri(uri) + "/**"; - } catch (const std::runtime_error& e) { - std::string msg = - "Unable to get authority from UUri: " + std::string(e.what()); - std::cerr << msg << std::endl; - throw UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg); - } + +// TODO: FFFF -> * not clear for spec +std::string toZenohKeyString(const UUri& source, + const std::optional& sink) { + std::ostringstream zenoh_key; + zenoh_key << std::uppercase << std::hex << std::setw(8) + << std::setfill('0'); + + zenoh_key << "up/"; + + zenoh_key << transformAuthority(source.authority_name()) << "/"; + zenoh_key << source.ue_id() << "/"; + zenoh_key << source.ue_version_major() << "/"; + zenoh_key << source.resource_id() << "/"; + + if (sink.has_value()) { + zenoh_key << transformAuthority(sink->authority_name()) << "/"; + zenoh_key << sink->ue_id() << "/"; + zenoh_key << sink->ue_version_major() << "/"; + zenoh_key << sink->resource_id(); } else { - std::vector micro_uuri; - try { - micro_uuri = uri.to_bytes(); - } catch (const std::runtime_error& e) { - std::string msg = "Unable to serialize into micro format: " + - std::string(e.what()); - std::cerr << msg << std::endl; - throw UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg); - } - - std::string micro_zenohKey; - if (micro_uuri.size() > 8) { - micro_zenohKey = "upr/"; - for (size_t i = 8; i < micro_uuri.size(); ++i) { - micro_zenohKey += to_hex_string(micro_uuri[i]); - } - micro_zenohKey += "/"; - } else { - micro_zenohKey = "upl/"; - } - - for (size_t i = 0; i < 8 && i < micro_uuri.size(); ++i) { - micro_zenohKey += to_hex_string(micro_uuri[i]); - } - - return micro_zenohKey; + zenoh_key << "/{}/{}/{}/{}"; } - */ - return ""; + return zenoh_key.str(); } -std::map uattributesToAttachment( +std::vector> uattributesToAttachment( const UAttributes& attributes) { - std::map res; + std::vector> res; + + std::string version(&UATTRIBUTE_VERSION, 1); + + std::string data; + attributes.SerializeToString(&data); + + res.push_back(std::make_pair("", version)); + res.push_back(std::make_pair("", data)); return res; } -UAttributes attachmentToUattributes(const AttachmentView& attachment) { +UAttributes attachmentToUAttributes(const AttachmentView& attachment) { + std::vector attachment_vec; + attachment.iterate( + [&](const BytesView& key, const BytesView& value) -> bool { + attachment_vec.push_back(value); + return true; + }); + + if (attachment_vec.size() != 2) { + // TODO: error report, exception? + } + + if (attachment_vec[0].get_len() == 1) { + if (attachment_vec[0].as_string_view()[0] != UATTRIBUTE_VERSION) { + // TODO: error report, exception? + } + }; UAttributes res; + // TODO: more efficient way? + res.ParseFromString(std::string(attachment_vec[1].as_string_view())); return res; } @@ -132,6 +135,19 @@ Priority mapZenohPriority(UPriority upriority) { } } +UMessage sampleToUMessage(const Sample& sample) { + UAttributes attributes; + if (sample.get_attachment().check()) { + attributes = attachmentToUAttributes(sample.get_attachment()); + } + std::string payload(sample.get_payload().as_string_view()); + UMessage message; + message.set_payload(payload); + message.set_allocated_attributes(&attributes); + + return message; +} + } // namespace ZenohUTransport::ZenohUTransport(const UUri& defaultUri, @@ -140,57 +156,76 @@ ZenohUTransport::ZenohUTransport(const UUri& defaultUri, session_(expect(open( std::move(expect(config_from_file(configFile.string().c_str())))))) {} -UStatus ZenohUTransport::sendPublishNotification_( - const std::string& zenohKey, const std::string& payload, - const UAttributes& attributes) { - auto attachment = uattributesToAttachment(attributes); +UStatus ZenohUTransport::registerRequestListener_(const std::string& zenoh_key, + CallableConn listener) { + auto on_query = [&](const Query& query) { + UAttributes attributes; + if (query.get_attachment().check()) { + attributes = attachmentToUAttributes(query.get_attachment()); + } + auto id_str = serializer::uuid::AsString().serialize(attributes.id()); + std::unique_lock lock(query_map_mutex_); + query_map_.insert(std::make_pair( + std::move(id_str), std::move(std::make_shared(query)))); + }; - auto priority = mapZenohPriority(attributes.priority()); + auto on_drop_queryable = []() {}; - PutOptions options; - options.set_encoding(Z_ENCODING_PREFIX_APP_CUSTOM); - options.set_priority(priority); - options.set_attachment(attachment); - if (!session_.put(zenohKey, payload, options)) { - return UStatus(); // TODO: UCode::INTERNAL, "Unable to send with Zenoh" - } + auto queryable = expect( + session_.declare_queryable(zenoh_key, {on_query, on_drop_queryable})); + + return UStatus(); +} + +UStatus ZenohUTransport::registerResponseListener_(const std::string& zenoh_key, + CallableConn listener) { + std::unique_lock lock(rpc_callback_map_mutex_); + rpc_callback_map_.insert(std::make_pair(zenoh_key, listener)); + + return UStatus(); +} + +UStatus ZenohUTransport::registerPublishNotificationListener_( + const std::string& zenoh_key, CallableConn listener) { + auto data_handler = [&](const Sample& sample) { + (*listener)(sampleToUMessage(sample)); + // invoke_nonblock_callback(&cb_sender, &listener_cloned, Ok(msg)); + }; + auto key = ListenerKey(listener, zenoh_key); + auto subscriber = expect( + session_.declare_subscriber(zenoh_key, data_handler)); + { + std::unique_lock lock(subscriber_map_mutex_); + subscriber_map_.insert( + std::make_pair(std::move(key), std::move(subscriber))); + } return UStatus(); } -UStatus ZenohUTransport::sendRequest_(const std::string& zenohKey, +UStatus ZenohUTransport::sendRequest_(const std::string& zenoh_key, const std::string& payload, const UAttributes& attributes) { - auto uuidStr = serializer::uri::AsString().serialize(attributes.source()); - CallableConn respCallback; + auto source_str = + serializer::uri::AsString().serialize(attributes.source()); + CallableConn resp_callback; { - std::unique_lock lock(rpcCallbackMapMutex_); - - if (auto respCallbackIt = rpcCallbackMap_.find(uuidStr); - respCallbackIt == rpcCallbackMap_.end()) { - return UStatus(); // TODO: UCode::UNAVAILABLE, "failed to find UUID - // = {}", uuidStr + std::unique_lock lock(rpc_callback_map_mutex_); + if (auto resp_callback_it = rpc_callback_map_.find(source_str); + resp_callback_it == rpc_callback_map_.end()) { + return uError(UCode::UNAVAILABLE, "failed to find UUID"); } else { - respCallback = respCallbackIt->second; + resp_callback = resp_callback_it->second; } } - auto onReply = [&](Reply&& reply) { + auto on_reply = [&](Reply&& reply) { auto result = reply.get(); if (auto sample = std::get_if(&result)) { - UAttributes attributes; - if (sample->get_attachment().check()) { - attributes = attachmentToUattributes(sample->get_attachment()); - } - std::string payload(sample->get_payload().as_string_view()); - UMessage message; - message.set_payload(payload); - message.set_allocated_attributes(&attributes); - (*respCallback)(message); + (*resp_callback)(sampleToUMessage(*sample)); } else if (auto error = std::get_if(&result)) { - std::cout << "Received an error :" << error->as_string_view() - << "\n"; + // TODO: error report } }; @@ -203,56 +238,103 @@ UStatus ZenohUTransport::sendRequest_(const std::string& zenohKey, auto onDone = []() {}; - session_.get(zenohKey, "", {onReply, onDone}, options); + session_.get(zenoh_key, "", {on_reply, onDone}, options); return UStatus(); } UStatus ZenohUTransport::sendResponse_(const std::string& payload, const UAttributes& attributes) { + auto reqid_str = serializer::uuid::AsString().serialize(attributes.reqid()); + OwnedQueryPtr query; + { + std::unique_lock lock(query_map_mutex_); + if (auto query_it = query_map_.find(reqid_str); + query_it == query_map_.end()) { + return uError(UCode::INTERNAL, "query doesn't exist"); + } else { + query = query_it->second; + } + } + + QueryReplyOptions options; + query->loan().reply(query->loan().get_keyexpr().as_string_view(), payload, + options); + + return UStatus(); +} + +UStatus ZenohUTransport::sendPublishNotification_( + const std::string& zenoh_key, const std::string& payload, + const UAttributes& attributes) { + auto attachment = uattributesToAttachment(attributes); + + auto priority = mapZenohPriority(attributes.priority()); + + PutOptions options; + options.set_encoding(Z_ENCODING_PREFIX_APP_CUSTOM); + options.set_priority(priority); + options.set_attachment(attachment); + if (!session_.put(zenoh_key, payload, options)) { + return uError(UCode::INTERNAL, "Unable to send with Zenoh"); + } + return UStatus(); } v1::UStatus ZenohUTransport::sendImpl(const UMessage& message) { if (!message.has_payload()) { - std::string msg = "Invalid UPayload"; - return UStatus(); // TODO: UCode::INVALID_ARGUMENT, msg + return uError(UCode::INVALID_ARGUMENT, "Invalid UPayload"); } const auto& payload = message.payload(); const auto& attributes = message.attributes(); if (attributes.type() == UMessageType::UMESSAGE_TYPE_UNSPECIFIED) { - std::string msg = "Invalid UAttributes"; - return UStatus(); // TODO: UCode::INVALID_ARGUMENT, msg + return uError(UCode::INVALID_ARGUMENT, "Invalid UAttributes"); } + std::string zenoh_key = + toZenohKeyString(attributes.sink(), attributes.source()); switch (attributes.type()) { case UMessageType::UMESSAGE_TYPE_PUBLISH: { - auto res = validator::message::isValidPublish(message); - // TODO: check res - std::string zenohKey = toZenohKeyString(attributes.source()); - return sendPublishNotification_(zenohKey, payload, attributes); + auto [valid, maybe_reason] = + validator::message::isValidPublish(message); + if (!valid) { + return uError(UCode::INVALID_ARGUMENT, + validator::message::message(*maybe_reason)); + } + return sendPublishNotification_(zenoh_key, payload, attributes); } case UMessageType::UMESSAGE_TYPE_NOTIFICATION: { - auto res = validator::message::isValidNotification(message); - // TODO: check res - std::string zenohKey = toZenohKeyString(attributes.sink()); - return sendPublishNotification_(zenohKey, payload, attributes); + auto [valid, maybe_reason] = + validator::message::isValidNotification(message); + if (!valid) { + return uError(UCode::INVALID_ARGUMENT, + validator::message::message(*maybe_reason)); + } + return sendPublishNotification_(zenoh_key, payload, attributes); } case UMessageType::UMESSAGE_TYPE_REQUEST: { - auto res = validator::message::isValidRpcRequest(message); - // TODO: check res - std::string zenohKey = toZenohKeyString(attributes.sink()); - return sendRequest_(zenohKey, payload, attributes); + auto [valid, maybe_reason] = + validator::message::isValidRpcRequest(message); + if (!valid) { + return uError(UCode::INVALID_ARGUMENT, + validator::message::message(*maybe_reason)); + } + return sendRequest_(zenoh_key, payload, attributes); } case UMessageType::UMESSAGE_TYPE_RESPONSE: { - auto res = validator::message::isValidRpcResponse(message); - // TODO: check res + auto [valid, maybe_reason] = + validator::message::isValidRpcResponse(message); + if (!valid) { + return uError(UCode::INVALID_ARGUMENT, + validator::message::message(*maybe_reason)); + } return sendResponse_(payload, attributes); } default: { - std::string msg = "Wrong Message type in UAttributes"; - return UStatus(); // TODO: UCode::INVALID_ARGUMENT, msg + return uError(UCode::INVALID_ARGUMENT, + "Wrong Message type in UAttributes"); } } return UStatus(); @@ -261,6 +343,32 @@ v1::UStatus ZenohUTransport::sendImpl(const UMessage& message) { v1::UStatus ZenohUTransport::registerListenerImpl( const v1::UUri& sink_filter, CallableConn&& listener, std::optional&& source_filter) { + std::string zenoh_key = toZenohKeyString(sink_filter, source_filter); + // TODO: Is 0 == none? + if (!sink_filter.authority_name().empty() && sink_filter.ue_id() == 0 && + sink_filter.resource_id() == 0) { + // This is special UUri which means we need to register for all of + // Publish, Notification, Request, and Response RPC response + registerResponseListener_(zenoh_key, listener); + registerRequestListener_(zenoh_key, listener); + registerPublishNotificationListener_(zenoh_key, listener); + } else { + auto [valid, maybe_reason] = validator::uri::isValid(sink_filter); + if (!valid) { + return uError(UCode::INVALID_ARGUMENT, + validator::uri::message(*maybe_reason)); + } + + if (std::get<0>(validator::uri::isValidRpcResponse(sink_filter))) { + registerResponseListener_(zenoh_key, std::move(listener)); + } else if (std::get<0>(validator::uri::isValidRpcMethod(sink_filter))) { + registerRequestListener_(zenoh_key, std::move(listener)); + } else { + registerPublishNotificationListener_(zenoh_key, + std::move(listener)); + } + } + return v1::UStatus(); } From 9e043a344eb67d74a5c82e6ec60c0a3c7298ca0e Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 14 Jun 2024 18:51:13 +0200 Subject: [PATCH 4/7] Update to use new CallableConn type --- .../up-transport-zenoh-cpp/ZenohUTransport.h | 34 +++++-------------- src/ZenohUTransport.cpp | 4 +-- 2 files changed, 11 insertions(+), 27 deletions(-) diff --git a/include/up-transport-zenoh-cpp/ZenohUTransport.h b/include/up-transport-zenoh-cpp/ZenohUTransport.h index 97570d9..1792dbb 100644 --- a/include/up-transport-zenoh-cpp/ZenohUTransport.h +++ b/include/up-transport-zenoh-cpp/ZenohUTransport.h @@ -42,15 +42,6 @@ using OwnedQueryPtr = std::shared_ptr; } // namespace zenohc -namespace { - -inline void hashCombine(size_t& seed, size_t value) { - // See boost::hash_combine - seed ^= value + 0x9e3779b9 + (seed << 6) + (seed >> 2); -} - -} // namespace - namespace uprotocol::transport { /// @brief Zenoh implementation of UTransport @@ -107,26 +98,19 @@ struct ZenohUTransport : public UTransport { bool operator==(const ListenerKey& other) const { return listener == other.listener && zenoh_key == other.zenoh_key; } - }; - - struct ListenerKeyHash { - std::size_t operator()(const ListenerKey& key) const { - std::hash hashCallableConn; - std::size_t seed = hashCallableConn(key.listener); - - std::hash hashString; - hashCombine(seed, hashString(key.zenoh_key)); - return seed; + bool operator<(const ListenerKey& other) const { + if (listener == other.listener) { + return zenoh_key < other.zenoh_key; + } + return listener < other.listener; } }; - using RpcCallbackMap = std::unordered_map; - using SubscriberMap = - std::unordered_map; - using QueryableMap = - std::unordered_map; - using QueryMap = std::unordered_map; + using RpcCallbackMap = std::map; + using SubscriberMap = std::map; + using QueryableMap = std::map; + using QueryMap = std::map; /// @brief Register listener to be called when UMessage is received /// for the given URI. diff --git a/src/ZenohUTransport.cpp b/src/ZenohUTransport.cpp index e3f4391..4dfbc39 100644 --- a/src/ZenohUTransport.cpp +++ b/src/ZenohUTransport.cpp @@ -188,7 +188,7 @@ UStatus ZenohUTransport::registerResponseListener_(const std::string& zenoh_key, UStatus ZenohUTransport::registerPublishNotificationListener_( const std::string& zenoh_key, CallableConn listener) { auto data_handler = [&](const Sample& sample) { - (*listener)(sampleToUMessage(sample)); + listener(sampleToUMessage(sample)); // invoke_nonblock_callback(&cb_sender, &listener_cloned, Ok(msg)); }; @@ -223,7 +223,7 @@ UStatus ZenohUTransport::sendRequest_(const std::string& zenoh_key, auto result = reply.get(); if (auto sample = std::get_if(&result)) { - (*resp_callback)(sampleToUMessage(*sample)); + resp_callback(sampleToUMessage(*sample)); } else if (auto error = std::get_if(&result)) { // TODO: error report } From d1f577a04b51fb99ce24efff76c7c4f0292168d2 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 21 Jun 2024 20:03:39 +0200 Subject: [PATCH 5/7] Update UUri conversion, UUri conversion test, PubSub test drafting --- CMakeLists.txt | 3 + .../up-transport-zenoh-cpp/ZenohUTransport.h | 19 ++++ src/ZenohUTransport.cpp | 95 +++++++++++-------- test/coverage/ZenohUTransportTest.cpp | 85 +++++++++++++++-- test/extra/PublisherSubscriberTest.cpp | 90 ++++++++++++++++-- 5 files changed, 237 insertions(+), 55 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 1c4e37a..5cd93c1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,6 +17,8 @@ find_package(spdlog REQUIRED) find_package(up-core-api REQUIRED) find_package(up-cpp REQUIRED) find_package(zenohcpp REQUIRED) +find_package(GTest REQUIRED) +include(GoogleTest) # TODO NEEDED? #add_definitions(-DSPDLOG_FMT_EXTERNAL) @@ -55,6 +57,7 @@ target_link_libraries(${PROJECT_NAME} up-cpp::up-cpp up-core-api::up-core-api protobuf::libprotobuf + GTest::gtest_main spdlog::spdlog) enable_testing() diff --git a/include/up-transport-zenoh-cpp/ZenohUTransport.h b/include/up-transport-zenoh-cpp/ZenohUTransport.h index 1792dbb..cc86ebb 100644 --- a/include/up-transport-zenoh-cpp/ZenohUTransport.h +++ b/include/up-transport-zenoh-cpp/ZenohUTransport.h @@ -12,6 +12,7 @@ #ifndef UP_TRANSPORT_ZENOH_CPP_ZENOHUTRANSPORT_H #define UP_TRANSPORT_ZENOH_CPP_ZENOHUTRANSPORT_H +#include #include #include @@ -145,6 +146,24 @@ struct ZenohUTransport : public UTransport { virtual void cleanupListener(CallableConn listener) override; private: + FRIEND_TEST(TestZenohUTransport, toZenohKeyString); + + static v1::UStatus uError(v1::UCode code, std::string_view message); + + static std::string toZenohKeyString( + const std::string& default_authority_name, const v1::UUri& source, + const std::optional& sink); + + static std::vector> + uattributesToAttachment(const v1::UAttributes& attributes); + + static v1::UAttributes attachmentToUAttributes( + const zenoh::AttachmentView& attachment); + + static zenoh::Priority mapZenohPriority(v1::UPriority upriority); + + static v1::UMessage sampleToUMessage(const zenoh::Sample& sample); + v1::UStatus registerRequestListener_(const std::string& zenoh_key, CallableConn listener); diff --git a/src/ZenohUTransport.cpp b/src/ZenohUTransport.cpp index 4dfbc39..935a019 100644 --- a/src/ZenohUTransport.cpp +++ b/src/ZenohUTransport.cpp @@ -22,62 +22,76 @@ namespace uprotocol::transport { const char UATTRIBUTE_VERSION = 1; +const uint32_t WILDCARD_ENTITY_ID = 0x0000FFFF; +const uint32_t WILDCARD_ENTITY_VERSION = 0x000000FF; +const uint32_t WILDCARD_RESOURCE_ID = 0x0000FFFF; + using namespace zenoh; using namespace uprotocol::v1; using namespace uprotocol::datamodel; -namespace { - -UStatus uError(UCode code, std::string_view message) { +UStatus ZenohUTransport::uError(UCode code, std::string_view message) { UStatus status; status.set_code(code); status.set_message(std::string(message)); return status; } -std::string transformAuthority(const std::string& authority) { - std::string result; - for (char c : authority) { - if (std::isalnum(c)) { - result += std::tolower(c); - } else if (c == '.' || c == '-' || c == '_' || c == '*') { - result += c; - } else if (c == '$') { - result += "{dollar}"; +std::string ZenohUTransport::toZenohKeyString( + const std::string& default_authority_name, const UUri& source, + const std::optional& sink) { + std::ostringstream zenoh_key; + + auto writeUUri = [&](const v1::UUri& uuri) { + zenoh_key << "/"; + + // authority_name + if (uuri.authority_name().empty()) { + zenoh_key << default_authority_name; } else { - result += "{}"; + zenoh_key << uuri.authority_name(); } - } - return result; -} + zenoh_key << "/"; -// TODO: FFFF -> * not clear for spec -std::string toZenohKeyString(const UUri& source, - const std::optional& sink) { - std::ostringstream zenoh_key; - zenoh_key << std::uppercase << std::hex << std::setw(8) - << std::setfill('0'); + // ue_id + if (uuri.ue_id() == WILDCARD_ENTITY_ID) { + zenoh_key << "*"; + } else { + zenoh_key << uuri.ue_id(); + } + zenoh_key << "/"; - zenoh_key << "up/"; + // ue_version_major + if (uuri.ue_version_major() == WILDCARD_ENTITY_VERSION) { + zenoh_key << "*"; + } else { + zenoh_key << uuri.ue_version_major(); + } + zenoh_key << "/"; - zenoh_key << transformAuthority(source.authority_name()) << "/"; - zenoh_key << source.ue_id() << "/"; - zenoh_key << source.ue_version_major() << "/"; - zenoh_key << source.resource_id() << "/"; + // resource_id + if (uuri.resource_id() == WILDCARD_RESOURCE_ID) { + zenoh_key << "*"; + } else { + zenoh_key << uuri.resource_id(); + } + }; + + zenoh_key << "up"; + zenoh_key << std::uppercase << std::hex; + + writeUUri(source); if (sink.has_value()) { - zenoh_key << transformAuthority(sink->authority_name()) << "/"; - zenoh_key << sink->ue_id() << "/"; - zenoh_key << sink->ue_version_major() << "/"; - zenoh_key << sink->resource_id(); + writeUUri(*sink); } else { zenoh_key << "/{}/{}/{}/{}"; } return zenoh_key.str(); } -std::vector> uattributesToAttachment( - const UAttributes& attributes) { +std::vector> +ZenohUTransport::uattributesToAttachment(const UAttributes& attributes) { std::vector> res; std::string version(&UATTRIBUTE_VERSION, 1); @@ -90,7 +104,8 @@ std::vector> uattributesToAttachment( return res; } -UAttributes attachmentToUAttributes(const AttachmentView& attachment) { +UAttributes ZenohUTransport::attachmentToUAttributes( + const AttachmentView& attachment) { std::vector attachment_vec; attachment.iterate( [&](const BytesView& key, const BytesView& value) -> bool { @@ -113,7 +128,7 @@ UAttributes attachmentToUAttributes(const AttachmentView& attachment) { return res; } -Priority mapZenohPriority(UPriority upriority) { +Priority ZenohUTransport::mapZenohPriority(UPriority upriority) { switch (upriority) { case UPriority::UPRIORITY_CS0: return Z_PRIORITY_BACKGROUND; @@ -135,7 +150,7 @@ Priority mapZenohPriority(UPriority upriority) { } } -UMessage sampleToUMessage(const Sample& sample) { +UMessage ZenohUTransport::sampleToUMessage(const Sample& sample) { UAttributes attributes; if (sample.get_attachment().check()) { attributes = attachmentToUAttributes(sample.get_attachment()); @@ -148,8 +163,6 @@ UMessage sampleToUMessage(const Sample& sample) { return message; } -} // namespace - ZenohUTransport::ZenohUTransport(const UUri& defaultUri, const std::filesystem::path& configFile) : UTransport(defaultUri), @@ -294,7 +307,8 @@ v1::UStatus ZenohUTransport::sendImpl(const UMessage& message) { } std::string zenoh_key = - toZenohKeyString(attributes.sink(), attributes.source()); + toZenohKeyString(getDefaultSource().authority_name(), attributes.sink(), + attributes.source()); switch (attributes.type()) { case UMessageType::UMESSAGE_TYPE_PUBLISH: { auto [valid, maybe_reason] = @@ -343,7 +357,8 @@ v1::UStatus ZenohUTransport::sendImpl(const UMessage& message) { v1::UStatus ZenohUTransport::registerListenerImpl( const v1::UUri& sink_filter, CallableConn&& listener, std::optional&& source_filter) { - std::string zenoh_key = toZenohKeyString(sink_filter, source_filter); + std::string zenoh_key = toZenohKeyString( + getDefaultSource().authority_name(), sink_filter, source_filter); // TODO: Is 0 == none? if (!sink_filter.authority_name().empty() && sink_filter.ue_id() == 0 && sink_filter.resource_id() == 0) { diff --git a/test/coverage/ZenohUTransportTest.cpp b/test/coverage/ZenohUTransportTest.cpp index 45c0352..4f281ff 100644 --- a/test/coverage/ZenohUTransportTest.cpp +++ b/test/coverage/ZenohUTransportTest.cpp @@ -10,11 +10,21 @@ // SPDX-License-Identifier: Apache-2.0 #include -#include +#include +#include -namespace { +#include -class TestFixture : public testing::Test { +#include "up-transport-zenoh-cpp/ZenohUTransport.h" + +namespace uprotocol::transport { + +using namespace uprotocol::v1; +using namespace uprotocol::transport; + +constexpr const char* AUTHORITY_NAME = "test"; + +class TestZenohUTransport : public testing::Test { protected: // Run once per TEST_F. // Used to set up clean environments per test. @@ -23,8 +33,8 @@ class TestFixture : public testing::Test { // Run once per execution of the test application. // Used for setup of all tests. Has access to this instance. - TestFixture() = default; - ~TestFixture() = default; + TestZenohUTransport() = default; + ~TestZenohUTransport() = default; // Run once per execution of the test application. // Used only for global setup outside of tests. @@ -32,7 +42,66 @@ class TestFixture : public testing::Test { static void TearDownTestSuite() {} }; -// TODO replace -TEST_F(TestFixture, SomeTestName) {} +uprotocol::v1::UUri create_uuri(const std::string& authority, uint32_t ue_id, + uint32_t ue_version_major, + uint32_t resource_id) { + uprotocol::v1::UUri uuri; + uuri.set_authority_name(authority); + uuri.set_ue_id(ue_id); + uuri.set_ue_version_major(ue_version_major); + uuri.set_resource_id(resource_id); + + return uuri; +} + +// TODO(sashacmc): config generation +TEST_F(TestZenohUTransport, ConstructDestroy) { + uprotocol::v1::UUri def_src_uuri; + def_src_uuri.set_authority_name(AUTHORITY_NAME); + def_src_uuri.set_ue_id(0x18000); + def_src_uuri.set_ue_version_major(1); + def_src_uuri.set_resource_id(0); + + zenoh::init_logger(); + try { + auto ut = ZenohUTransport(def_src_uuri, + "/home/sashacmc/src/up-client-zenoh-cpp/test/" + "extra/DEFAULT_CONFIG.json5"); + } catch (zenoh::ErrorMessage& e) { + throw std::runtime_error(std::string(e.as_string_view())); + } +} + +TEST_F(TestZenohUTransport, toZenohKeyString) { + EXPECT_EQ( + ZenohUTransport::toZenohKeyString( + "", create_uuri("192.168.1.100", 0x10AB, 3, 0x80CD), std::nullopt), + "up/192.168.1.100/10AB/3/80CD/{}/{}/{}/{}"); + + EXPECT_EQ(ZenohUTransport::toZenohKeyString( + "", create_uuri("192.168.1.100", 0x10AB, 3, 0x80CD), + create_uuri("192.168.1.101", 0x20EF, 4, 0)), + "up/192.168.1.100/10AB/3/80CD/192.168.1.101/20EF/4/0"); + + EXPECT_EQ(ZenohUTransport::toZenohKeyString( + "", create_uuri("*", 0xFFFF, 0xFF, 0xFFFF), + create_uuri("192.168.1.101", 0x20EF, 4, 0)), + "up/*/*/*/*/192.168.1.101/20EF/4/0"); + + EXPECT_EQ(ZenohUTransport::toZenohKeyString( + "", create_uuri("my-host1", 0x10AB, 3, 0), + create_uuri("my-host2", 0x20EF, 4, 0xB)), + "up/my-host1/10AB/3/0/my-host2/20EF/4/B"); + + EXPECT_EQ(ZenohUTransport::toZenohKeyString( + "", create_uuri("*", 0xFFFF, 0xFF, 0xFFFF), + create_uuri("my-host2", 0x20EF, 4, 0xB)), + "up/*/*/*/*/my-host2/20EF/4/B"); + + EXPECT_EQ(ZenohUTransport::toZenohKeyString( + "", create_uuri("*", 0xFFFF, 0xFF, 0xFFFF), + create_uuri("[::1]", 0xFFFF, 0xFF, 0xFFFF)), + "up/*/*/*/*/[::1]/*/*/*"); +} -} // namespace +} // namespace uprotocol::transport diff --git a/test/extra/PublisherSubscriberTest.cpp b/test/extra/PublisherSubscriberTest.cpp index 5ba082f..d3fc132 100644 --- a/test/extra/PublisherSubscriberTest.cpp +++ b/test/extra/PublisherSubscriberTest.cpp @@ -9,13 +9,22 @@ // // SPDX-License-Identifier: Apache-2.0 +#include #include #include +#include + +#include #include "up-transport-zenoh-cpp/ZenohUTransport.h" namespace { +using namespace uprotocol::v1; +using namespace uprotocol::transport; + +constexpr const char* AUTHORITY_NAME = "test"; + class TestFixture : public testing::Test { protected: // Run once per TEST_F. @@ -34,17 +43,84 @@ class TestFixture : public testing::Test { static void TearDownTestSuite() {} }; -using namespace uprotocol::v1; -using namespace uprotocol::transport; +using MsgDiff = google::protobuf::util::MessageDifferencer; + +uprotocol::v1::UUID* make_uuid() { + uint64_t timestamp = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + auto id = new uprotocol::v1::UUID(); + id->set_msb((timestamp << 16) | (8ULL << 12) | + (0x123ULL)); // version 8 ; counter = 0x123 + id->set_lsb((2ULL << 62) | (0xFFFFFFFFFFFFULL)); // variant 10 + return id; +} -// TODO replace +// TODO(sashacmc): config generation TEST_F(TestFixture, PubSub) { UUri uuri; - uuri.set_ue_id(1); + + uuri.set_authority_name(AUTHORITY_NAME); + uuri.set_ue_id(0x00010001); uuri.set_ue_version_major(1); - uuri.set_resource_id(1); + uuri.set_resource_id(0); + + zenoh::init_logger(); + try { + std::cerr << "Test MESSAGE" << std::endl; + auto ut = ZenohUTransport(uuri, + "/home/sashacmc/src/up-client-zenoh-cpp/test/" + "extra/DEFAULT_CONFIG.json5"); + + uprotocol::v1::UUri sink_filter; + sink_filter.set_authority_name(AUTHORITY_NAME); + sink_filter.set_ue_id(0x00010001); + sink_filter.set_ue_version_major(1); + sink_filter.set_resource_id(0x8000); + + uprotocol::v1::UUri source_filter; + source_filter.set_authority_name(AUTHORITY_NAME); + source_filter.set_ue_id(0x00010001); + source_filter.set_ue_version_major(1); + source_filter.set_resource_id(0x8000); + + uprotocol::v1::UMessage capture_msg; + size_t capture_count = 0; + auto action = [&](const uprotocol::v1::UMessage& msg) { + capture_msg = msg; + capture_count++; + }; + auto lhandle = ut.registerListener(sink_filter, action, source_filter); + EXPECT_TRUE(lhandle.has_value()); + auto handle = std::move(lhandle).value(); + EXPECT_TRUE(handle); + + const size_t max_count = 1; // 1000 * 100; + for (auto i = 0; i < max_count; i++) { + auto src = new uprotocol::v1::UUri(); + src->set_authority_name(AUTHORITY_NAME); + src->set_ue_id(0x00010001); + src->set_ue_version_major(1); + src->set_resource_id(0x8000); + + auto attr = new uprotocol::v1::UAttributes(); + attr->set_type(uprotocol::v1::UMESSAGE_TYPE_PUBLISH); + attr->set_allocated_source(src); + attr->set_allocated_id(make_uuid()); + attr->set_payload_format(uprotocol::v1::UPAYLOAD_FORMAT_PROTOBUF); + attr->set_ttl(1000); - auto ut = - ZenohUTransport(uuri, "/home/sashacmc/src/zenoh/DEFAULT_CONFIG.json5"); + uprotocol::v1::UMessage msg; + msg.set_allocated_attributes(attr); + msg.set_payload("payload"); + auto result = ut.send(msg); + EXPECT_EQ(i + 1, capture_count); + EXPECT_TRUE(MsgDiff::Equals(msg, capture_msg)); + } + handle.reset(); + } catch (zenoh::ErrorMessage& e) { + throw std::runtime_error(std::string(e.as_string_view())); + } } } // namespace From feff2b6524383ed3a7d0bca14f98e3dc3dd008bf Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Tue, 23 Jul 2024 12:17:22 +0200 Subject: [PATCH 6/7] Apply build_fixes.patch --- .../up-transport-zenoh-cpp/ZenohUTransport.h | 11 ++++----- test/coverage/ZenohUTransportTest.cpp | 23 ++++++++++++------- test/extra/PublisherSubscriberTest.cpp | 2 -- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/include/up-transport-zenoh-cpp/ZenohUTransport.h b/include/up-transport-zenoh-cpp/ZenohUTransport.h index cc86ebb..a50f5da 100644 --- a/include/up-transport-zenoh-cpp/ZenohUTransport.h +++ b/include/up-transport-zenoh-cpp/ZenohUTransport.h @@ -12,13 +12,14 @@ #ifndef UP_TRANSPORT_ZENOH_CPP_ZENOHUTRANSPORT_H #define UP_TRANSPORT_ZENOH_CPP_ZENOHUTRANSPORT_H -#include #include #include #include #include #include + +#define ZENOHCXX_ZENOHC #include namespace zenohc { @@ -145,15 +146,13 @@ struct ZenohUTransport : public UTransport { /// @param listener shared_ptr of the Connection that has been broken. virtual void cleanupListener(CallableConn listener) override; -private: - FRIEND_TEST(TestZenohUTransport, toZenohKeyString); - - static v1::UStatus uError(v1::UCode code, std::string_view message); - static std::string toZenohKeyString( const std::string& default_authority_name, const v1::UUri& source, const std::optional& sink); +private: + static v1::UStatus uError(v1::UCode code, std::string_view message); + static std::vector> uattributesToAttachment(const v1::UAttributes& attributes); diff --git a/test/coverage/ZenohUTransportTest.cpp b/test/coverage/ZenohUTransportTest.cpp index 4f281ff..8926e7c 100644 --- a/test/coverage/ZenohUTransportTest.cpp +++ b/test/coverage/ZenohUTransportTest.cpp @@ -13,8 +13,6 @@ #include #include -#include - #include "up-transport-zenoh-cpp/ZenohUTransport.h" namespace uprotocol::transport { @@ -72,33 +70,42 @@ TEST_F(TestZenohUTransport, ConstructDestroy) { } } +struct ExposeKeyString : public ZenohUTransport { + template + static auto toZenohKeyString(Args&&... args) { + return ZenohUTransport::toZenohKeyString(std::forward(args)...); + } +}; + TEST_F(TestZenohUTransport, toZenohKeyString) { + EXPECT_TRUE((std::is_base_of_v)); + EXPECT_EQ( - ZenohUTransport::toZenohKeyString( + ExposeKeyString::toZenohKeyString( "", create_uuri("192.168.1.100", 0x10AB, 3, 0x80CD), std::nullopt), "up/192.168.1.100/10AB/3/80CD/{}/{}/{}/{}"); - EXPECT_EQ(ZenohUTransport::toZenohKeyString( + EXPECT_EQ(ExposeKeyString::toZenohKeyString( "", create_uuri("192.168.1.100", 0x10AB, 3, 0x80CD), create_uuri("192.168.1.101", 0x20EF, 4, 0)), "up/192.168.1.100/10AB/3/80CD/192.168.1.101/20EF/4/0"); - EXPECT_EQ(ZenohUTransport::toZenohKeyString( + EXPECT_EQ(ExposeKeyString::toZenohKeyString( "", create_uuri("*", 0xFFFF, 0xFF, 0xFFFF), create_uuri("192.168.1.101", 0x20EF, 4, 0)), "up/*/*/*/*/192.168.1.101/20EF/4/0"); - EXPECT_EQ(ZenohUTransport::toZenohKeyString( + EXPECT_EQ(ExposeKeyString::toZenohKeyString( "", create_uuri("my-host1", 0x10AB, 3, 0), create_uuri("my-host2", 0x20EF, 4, 0xB)), "up/my-host1/10AB/3/0/my-host2/20EF/4/B"); - EXPECT_EQ(ZenohUTransport::toZenohKeyString( + EXPECT_EQ(ExposeKeyString::toZenohKeyString( "", create_uuri("*", 0xFFFF, 0xFF, 0xFFFF), create_uuri("my-host2", 0x20EF, 4, 0xB)), "up/*/*/*/*/my-host2/20EF/4/B"); - EXPECT_EQ(ZenohUTransport::toZenohKeyString( + EXPECT_EQ(ExposeKeyString::toZenohKeyString( "", create_uuri("*", 0xFFFF, 0xFF, 0xFFFF), create_uuri("[::1]", 0xFFFF, 0xFF, 0xFFFF)), "up/*/*/*/*/[::1]/*/*/*"); diff --git a/test/extra/PublisherSubscriberTest.cpp b/test/extra/PublisherSubscriberTest.cpp index d3fc132..ee83874 100644 --- a/test/extra/PublisherSubscriberTest.cpp +++ b/test/extra/PublisherSubscriberTest.cpp @@ -14,8 +14,6 @@ #include #include -#include - #include "up-transport-zenoh-cpp/ZenohUTransport.h" namespace { From cb9f1c73426c431b92c9ceb35a3ff928065a644d Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Wed, 24 Jul 2024 13:21:02 +0200 Subject: [PATCH 7/7] Update version, bug fix --- conanfile.txt | 4 +- .../up-transport-zenoh-cpp/ZenohUTransport.h | 2 + src/ZenohUTransport.cpp | 55 ++++++++++++++----- test/extra/PublisherSubscriberTest.cpp | 19 +++---- 4 files changed, 52 insertions(+), 28 deletions(-) diff --git a/conanfile.txt b/conanfile.txt index dfaed33..6ec79c2 100644 --- a/conanfile.txt +++ b/conanfile.txt @@ -1,6 +1,6 @@ [requires] -up-cpp/0.2.0 -zenohcpp/0.11.0 +up-cpp/[>=1.1.0] +zenohcpp/0.11.0.3 # Should result in using the packages from up-cpp spdlog/[>=1.13.0] up-core-api/[>=1.5.8] diff --git a/include/up-transport-zenoh-cpp/ZenohUTransport.h b/include/up-transport-zenoh-cpp/ZenohUTransport.h index a50f5da..76e35bd 100644 --- a/include/up-transport-zenoh-cpp/ZenohUTransport.h +++ b/include/up-transport-zenoh-cpp/ZenohUTransport.h @@ -163,6 +163,8 @@ struct ZenohUTransport : public UTransport { static v1::UMessage sampleToUMessage(const zenoh::Sample& sample); + static v1::UMessage queryToUMessage(const zenoh::Query& query); + v1::UStatus registerRequestListener_(const std::string& zenoh_key, CallableConn listener); diff --git a/src/ZenohUTransport.cpp b/src/ZenohUTransport.cpp index 935a019..0d6571c 100644 --- a/src/ZenohUTransport.cpp +++ b/src/ZenohUTransport.cpp @@ -151,14 +151,29 @@ Priority ZenohUTransport::mapZenohPriority(UPriority upriority) { } UMessage ZenohUTransport::sampleToUMessage(const Sample& sample) { - UAttributes attributes; + UMessage message; + + std::string payload(sample.get_payload().as_string_view()); + message.set_payload(payload); + if (sample.get_attachment().check()) { - attributes = attachmentToUAttributes(sample.get_attachment()); + *message.mutable_attributes() = + attachmentToUAttributes(sample.get_attachment()); } - std::string payload(sample.get_payload().as_string_view()); + + return message; +} + +UMessage ZenohUTransport::queryToUMessage(const Query& query) { UMessage message; + + std::string payload(query.get_value().as_string_view()); message.set_payload(payload); - message.set_allocated_attributes(&attributes); + + if (query.get_attachment().check()) { + *message.mutable_attributes() = + attachmentToUAttributes(query.get_attachment()); + } return message; } @@ -171,21 +186,27 @@ ZenohUTransport::ZenohUTransport(const UUri& defaultUri, UStatus ZenohUTransport::registerRequestListener_(const std::string& zenoh_key, CallableConn listener) { - auto on_query = [&](const Query& query) { + std::cout << "!!! registerRequestListener_" << std::endl; + + auto on_query = [listener, this](const Query& query) mutable { UAttributes attributes; if (query.get_attachment().check()) { attributes = attachmentToUAttributes(query.get_attachment()); } auto id_str = serializer::uuid::AsString().serialize(attributes.id()); - std::unique_lock lock(query_map_mutex_); - query_map_.insert(std::make_pair( - std::move(id_str), std::move(std::make_shared(query)))); + { + std::unique_lock lock(this->query_map_mutex_); + this->query_map_.insert( + std::make_pair(std::move(id_str), + std::move(std::make_shared(query)))); + } + listener(queryToUMessage(query)); }; auto on_drop_queryable = []() {}; - auto queryable = expect( - session_.declare_queryable(zenoh_key, {on_query, on_drop_queryable})); + auto queryable = expect(session_.declare_queryable( + zenoh_key, {std::move(on_query), std::move(on_drop_queryable)})); return UStatus(); } @@ -200,14 +221,16 @@ UStatus ZenohUTransport::registerResponseListener_(const std::string& zenoh_key, UStatus ZenohUTransport::registerPublishNotificationListener_( const std::string& zenoh_key, CallableConn listener) { - auto data_handler = [&](const Sample& sample) { + std::cout << "!!! registerPublishNotificationListener_: " << zenoh_key + << std::endl; + + auto data_handler = [listener](const Sample& sample) mutable { listener(sampleToUMessage(sample)); - // invoke_nonblock_callback(&cb_sender, &listener_cloned, Ok(msg)); }; auto key = ListenerKey(listener, zenoh_key); auto subscriber = expect( - session_.declare_subscriber(zenoh_key, data_handler)); + session_.declare_subscriber(zenoh_key, std::move(data_handler))); { std::unique_lock lock(subscriber_map_mutex_); subscriber_map_.insert( @@ -249,15 +272,17 @@ UStatus ZenohUTransport::sendRequest_(const std::string& zenoh_key, options.set_value(Value(payload)); options.set_attachment(attachment); - auto onDone = []() {}; + auto on_done = []() {}; - session_.get(zenoh_key, "", {on_reply, onDone}, options); + session_.get(zenoh_key, "", {std::move(on_reply), std::move(on_done)}, + options); return UStatus(); } UStatus ZenohUTransport::sendResponse_(const std::string& payload, const UAttributes& attributes) { + std::cout << "!!! sendResponse_" << std::endl; auto reqid_str = serializer::uuid::AsString().serialize(attributes.reqid()); OwnedQueryPtr query; { diff --git a/test/extra/PublisherSubscriberTest.cpp b/test/extra/PublisherSubscriberTest.cpp index ee83874..7b0d135 100644 --- a/test/extra/PublisherSubscriberTest.cpp +++ b/test/extra/PublisherSubscriberTest.cpp @@ -43,15 +43,8 @@ class TestFixture : public testing::Test { using MsgDiff = google::protobuf::util::MessageDifferencer; -uprotocol::v1::UUID* make_uuid() { - uint64_t timestamp = - std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - auto id = new uprotocol::v1::UUID(); - id->set_msb((timestamp << 16) | (8ULL << 12) | - (0x123ULL)); // version 8 ; counter = 0x123 - id->set_lsb((2ULL << 62) | (0xFFFFFFFFFFFFULL)); // variant 10 +uprotocol::v1::UUID make_uuid() { + auto id = uprotocol::datamodel::builder::UuidBuilder::getBuilder().build(); return id; } @@ -66,7 +59,6 @@ TEST_F(TestFixture, PubSub) { zenoh::init_logger(); try { - std::cerr << "Test MESSAGE" << std::endl; auto ut = ZenohUTransport(uuri, "/home/sashacmc/src/up-client-zenoh-cpp/test/" "extra/DEFAULT_CONFIG.json5"); @@ -85,16 +77,21 @@ TEST_F(TestFixture, PubSub) { uprotocol::v1::UMessage capture_msg; size_t capture_count = 0; + std::mutex msg_mutex; + std::condition_variable msg_cond; auto action = [&](const uprotocol::v1::UMessage& msg) { + std::cout << "??? action: " << msg.payload() << std::endl; + std::unique_lock lock(msg_mutex); capture_msg = msg; capture_count++; + msg_cond.notify_all(); }; auto lhandle = ut.registerListener(sink_filter, action, source_filter); EXPECT_TRUE(lhandle.has_value()); auto handle = std::move(lhandle).value(); EXPECT_TRUE(handle); - const size_t max_count = 1; // 1000 * 100; + const size_t max_count = 100 * 100; for (auto i = 0; i < max_count; i++) { auto src = new uprotocol::v1::UUri(); src->set_authority_name(AUTHORITY_NAME);