Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/ahcorde/rolling/zenoh_secutiry_c…
Browse files Browse the repository at this point in the history
…onfig' into ahcorde/rolling/zenoh_secutiry_config
  • Loading branch information
ahcorde committed Jan 30, 2025
2 parents 6f2add0 + 7f349b4 commit 29040ff
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 28 deletions.
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,15 @@ The `ZENOH_ROUTER_CHECK_ATTEMPTS` environment variable can be used to configure
The behavior is explained in the table below.


| ZENOH_ROUTER_CHECK_ATTEMPTS | Session behavior |
|:---------------------------:|:----------------------------------------------------------------------------------------------------------------:|
| unset or 0 | Indefinitely waits for connection to a Zenoh router. |
| < 0 | Skips Zenoh router check. |
| ZENOH_ROUTER_CHECK_ATTEMPTS | Session behavior |
|:---------------------------:|:------------------------------------------------------------------------------------------------------------------:|
| 0 | Indefinitely waits for connection to a Zenoh router. |
| < 0 | Skips Zenoh router check. |
| > 0 | Attempts to connect to a Zenoh router in `ZENOH_ROUTER_CHECK_ATTEMPTS` attempts with 1 second wait between checks. |
| unset | Equivalent to `1`: the check is made only once. |

If after the configured number of attempts the Node is still not connected to a `Zenoh router`, the initialisation goes on anyway.
If a `Zenoh router` is started after initialization phase, the Node will automatically connect to it, and autoconnect to other Nodes if gossip scouting is enabled (true with default configuratiuon).

### Session and Router configs
`rmw_zenoh` relies on separate configurations files to configure the `Zenoh router` and `Zenoh session` respectively.
Expand Down
72 changes: 71 additions & 1 deletion rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@

/// Which endpoints to connect to. E.g. tcp/localhost:7447.
/// By configuring the endpoints, it is possible to tell zenoh which router/peer to connect to at startup.
///
/// For TCP/UDP on Linux, it is possible additionally specify the interface to be connected to:
/// E.g. tcp/192.168.0.1:7447#iface=eth0, for connect only if the IP address is reachable via the interface eth0
///
/// It is also possible to specify a priority range and/or a reliability setting to be used on the link.
/// For example `tcp/localhost?prio=6-7;rel=0` assigns priorities "data_low" and "background" to the established link.
///
/// For TCP and TLS links, it is possible to specify the TCP buffer sizes:
/// E.g. tcp/192.168.0.1:7447#so_sndbuf=65000;so_rcvbuf=65000
connect: {
/// timeout waiting for all endpoints connected (0: no retry, -1: infinite timeout)
/// Accepts a single value (e.g. timeout_ms: 0)
Expand Down Expand Up @@ -52,8 +59,15 @@
/// Which endpoints to listen on. E.g. tcp/0.0.0.0:7447.
/// By configuring the endpoints, it is possible to tell zenoh which are the endpoints that other routers,
/// peers, or client can use to establish a zenoh session.
///
/// For TCP/UDP on Linux, it is possible additionally specify the interface to be listened to:
/// E.g. tcp/0.0.0.0:7447#iface=eth0, for listen connection only on eth0
///
/// It is also possible to specify a priority range and/or a reliability setting to be used on the link.
/// For example `tcp/localhost?prio=6-7;rel=0` assigns priorities "data_low" and "background" to the established link.
///
/// For TCP and TLS links, it is possible to specify the TCP buffer sizes:
/// E.g. tcp/192.168.0.1:7447#so_sndbuf=65000;so_rcvbuf=65000
listen: {
/// timeout waiting for all listen endpoints (0: no retry, -1: infinite timeout)
/// Accepts a single value (e.g. timeout_ms: 0)
Expand Down Expand Up @@ -87,7 +101,18 @@
period_increase_factor: 2,
},
},

/// Configure the session open behavior.
open: {
/// Configure the conditions to be met before session open returns.
return_conditions: {
/// Session open waits to connect to scouted peers and routers before returning.
/// When set to false, first publications and queries after session open from peers may be lost.
connect_scouted: true,
/// Session open waits to receive initial declares from connected peers before returning.
/// Setting to false may cause extra traffic at startup from peers.
declares: true,
},
},
/// Configure the scouting mechanisms and their behaviours
scouting: {
/// In client mode, the period in milliseconds dedicated to scouting for a router before failing.
Expand Down Expand Up @@ -168,6 +193,27 @@
},
},

// /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config for overwritten values)
// qos: {
// /// Overwrite QoS options for PUT and DELETE messages
// publication: [
// {
// /// PUT and DELETE messages on key expressions that are included by these key expressions
// /// will have their QoS options overwritten by the given config.
// key_exprs: ["demo/**", "example/key"],
// /// Configurations that will be applied on the publisher.
// /// Options that are supplied here will overwrite the configuration given in Zenoh API
// config: {
// congestion_control: "block",
// priority: "data_high",
// express: true,
// reliability: "best_effort",
// allowed_destination: "remote",
// },
// },
// ],
// },

// /// The declarations aggregation strategy.
// aggregation: {
// /// A list of key-expressions for which all included subscribers will be aggregated into.
Expand Down Expand Up @@ -209,6 +255,7 @@
// "messages": [
// "put", "delete", "declare_subscriber",
// "query", "reply", "declare_queryable",
// "liveliness_token", "liveliness_query", "declare_liveliness_subscriber",
// ],
// "flows":["egress","ingress"],
// "permission": "allow",
Expand Down Expand Up @@ -293,6 +340,8 @@
transport: {
unicast: {
/// Timeout in milliseconds when opening a link
open_timeout: 10000,
/// Timeout in milliseconds when accepting a link
accept_timeout: 10000,
/// Maximum number of zenoh session in pending state while accepting
accept_pending: 100,
Expand Down Expand Up @@ -395,6 +444,8 @@
drop: {
/// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available.
wait_before_drop: 1000,
/// The maximum deadline limit for multi-fragment messages.
max_wait_before_drop_fragments: 50000,
},
/// Behavior pushing CongestionControl::Block messages to the queue.
block: {
Expand Down Expand Up @@ -450,7 +501,26 @@
// This could be dangerous because your CA can have signed a server cert for foo.com, that's later being used to host a server at baz.com.
// If you want your ca to verify that the server at baz.com is actually baz.com, let this be true (default).
verify_name_on_connect: true,
// Whether or not to close links when remote certificates expires.
// If set to true, links that require certificates (tls/quic) will automatically disconnect when the time of expiration of the remote certificate chain is reached
// note that mTLS (client authentication) is required for a listener to disconnect a client on expiration
close_link_on_expiration: false,
/// Optional configuration for TCP system buffers sizes for TLS links
///
/// Configure TCP read buffer size (bytes)
// so_rcvbuf: 123456,
/// Configure TCP write buffer size (bytes)
// so_sndbuf: 123456,
},
// // Configure optional TCP link specific parameters
// tcp: {
// /// Optional configuration for TCP system buffers sizes for TCP links
// ///
// /// Configure TCP read buffer size (bytes)
// // so_rcvbuf: 123456,
// /// Configure TCP write buffer size (bytes)
// // so_sndbuf: 123456,
// }
},
/// Shared memory configuration.
/// NOTE: shared memory can be used only if zenoh is compiled with "shared-memory" feature, otherwise
Expand Down
72 changes: 71 additions & 1 deletion rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@

/// Which endpoints to connect to. E.g. tcp/localhost:7447.
/// By configuring the endpoints, it is possible to tell zenoh which router/peer to connect to at startup.
///
/// For TCP/UDP on Linux, it is possible additionally specify the interface to be connected to:
/// E.g. tcp/192.168.0.1:7447#iface=eth0, for connect only if the IP address is reachable via the interface eth0
///
/// It is also possible to specify a priority range and/or a reliability setting to be used on the link.
/// For example `tcp/localhost?prio=6-7;rel=0` assigns priorities "data_low" and "background" to the established link.
///
/// For TCP and TLS links, it is possible to specify the TCP buffer sizes:
/// E.g. tcp/192.168.0.1:7447#so_sndbuf=65000;so_rcvbuf=65000
connect: {
/// timeout waiting for all endpoints connected (0: no retry, -1: infinite timeout)
/// Accepts a single value (e.g. timeout_ms: 0)
Expand Down Expand Up @@ -54,8 +61,15 @@
/// Which endpoints to listen on. E.g. tcp/0.0.0.0:7447.
/// By configuring the endpoints, it is possible to tell zenoh which are the endpoints that other routers,
/// peers, or client can use to establish a zenoh session.
///
/// For TCP/UDP on Linux, it is possible additionally specify the interface to be listened to:
/// E.g. tcp/0.0.0.0:7447#iface=eth0, for listen connection only on eth0
///
/// It is also possible to specify a priority range and/or a reliability setting to be used on the link.
/// For example `tcp/localhost?prio=6-7;rel=0` assigns priorities "data_low" and "background" to the established link.
///
/// For TCP and TLS links, it is possible to specify the TCP buffer sizes:
/// E.g. tcp/192.168.0.1:7447#so_sndbuf=65000;so_rcvbuf=65000
listen: {
/// timeout waiting for all listen endpoints (0: no retry, -1: infinite timeout)
/// Accepts a single value (e.g. timeout_ms: 0)
Expand Down Expand Up @@ -92,7 +106,18 @@
period_increase_factor: 2,
},
},

/// Configure the session open behavior.
open: {
/// Configure the conditions to be met before session open returns.
return_conditions: {
/// Session open waits to connect to scouted peers and routers before returning.
/// When set to false, first publications and queries after session open from peers may be lost.
connect_scouted: true,
/// Session open waits to receive initial declares from connected peers before returning.
/// Setting to false may cause extra traffic at startup from peers.
declares: true,
},
},
/// Configure the scouting mechanisms and their behaviours
scouting: {
/// In client mode, the period in milliseconds dedicated to scouting for a router before failing.
Expand Down Expand Up @@ -171,6 +196,27 @@
},
},

// /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config for overwritten values)
// qos: {
// /// Overwrite QoS options for PUT and DELETE messages
// publication: [
// {
// /// PUT and DELETE messages on key expressions that are included by these key expressions
// /// will have their QoS options overwritten by the given config.
// key_exprs: ["demo/**", "example/key"],
// /// Configurations that will be applied on the publisher.
// /// Options that are supplied here will overwrite the configuration given in Zenoh API
// config: {
// congestion_control: "block",
// priority: "data_high",
// express: true,
// reliability: "best_effort",
// allowed_destination: "remote",
// },
// },
// ],
// },

// /// The declarations aggregation strategy.
// aggregation: {
// /// A list of key-expressions for which all included subscribers will be aggregated into.
Expand Down Expand Up @@ -212,6 +258,7 @@
// "messages": [
// "put", "delete", "declare_subscriber",
// "query", "reply", "declare_queryable",
// "liveliness_token", "liveliness_query", "declare_liveliness_subscriber",
// ],
// "flows":["egress","ingress"],
// "permission": "allow",
Expand Down Expand Up @@ -296,6 +343,8 @@
transport: {
unicast: {
/// Timeout in milliseconds when opening a link
open_timeout: 10000,
/// Timeout in milliseconds when accepting a link
accept_timeout: 10000,
/// Maximum number of zenoh session in pending state while accepting
accept_pending: 100,
Expand Down Expand Up @@ -398,6 +447,8 @@
drop: {
/// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available.
wait_before_drop: 1000,
/// The maximum deadline limit for multi-fragment messages.
max_wait_before_drop_fragments: 50000,
},
/// Behavior pushing CongestionControl::Block messages to the queue.
block: {
Expand Down Expand Up @@ -453,7 +504,26 @@
// This could be dangerous because your CA can have signed a server cert for foo.com, that's later being used to host a server at baz.com.
// If you want your ca to verify that the server at baz.com is actually baz.com, let this be true (default).
verify_name_on_connect: true,
// Whether or not to close links when remote certificates expires.
// If set to true, links that require certificates (tls/quic) will automatically disconnect when the time of expiration of the remote certificate chain is reached
// note that mTLS (client authentication) is required for a listener to disconnect a client on expiration
close_link_on_expiration: false,
/// Optional configuration for TCP system buffers sizes for TLS links
///
/// Configure TCP read buffer size (bytes)
// so_rcvbuf: 123456,
/// Configure TCP write buffer size (bytes)
// so_sndbuf: 123456,
},
// // Configure optional TCP link specific parameters
// tcp: {
// /// Optional configuration for TCP system buffers sizes for TCP links
// ///
// /// Configure TCP read buffer size (bytes)
// // so_rcvbuf: 123456,
// /// Configure TCP write buffer size (bytes)
// // so_sndbuf: 123456,
// }
},
/// Shared memory configuration.
/// NOTE: shared memory can be used only if zenoh is compiled with "shared-memory" feature, otherwise
Expand Down
11 changes: 1 addition & 10 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,12 @@ AttachmentData::AttachmentData(
const std::array<uint8_t, RMW_GID_STORAGE_SIZE> source_gid)
: sequence_number_(sequence_number),
source_timestamp_(source_timestamp),
source_gid_(source_gid),
gid_hash_(hash_gid(source_gid))
source_gid_(source_gid)
{
}

AttachmentData::AttachmentData(AttachmentData && data)
{
gid_hash_ = std::move(data.gid_hash_);
sequence_number_ = std::move(data.sequence_number_);
source_timestamp_ = std::move(data.source_timestamp_);
source_gid_ = data.source_gid_;
Expand All @@ -66,12 +64,6 @@ std::array<uint8_t, RMW_GID_STORAGE_SIZE> AttachmentData::copy_gid() const
return source_gid_;
}

///=============================================================================
size_t AttachmentData::gid_hash() const
{
return gid_hash_;
}

zenoh::Bytes AttachmentData::serialize_to_zbytes()
{
auto serializer = zenoh::ext::Serializer();
Expand Down Expand Up @@ -104,6 +96,5 @@ AttachmentData::AttachmentData(const zenoh::Bytes & bytes)
throw std::runtime_error("source_gid is not found in the attachment.");
}
this->source_gid_ = deserializer.deserialize<std::array<uint8_t, RMW_GID_STORAGE_SIZE>>();
gid_hash_ = hash_gid(this->source_gid_);
}
} // namespace rmw_zenoh_cpp
2 changes: 0 additions & 2 deletions rmw_zenoh_cpp/src/detail/attachment_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,13 @@ class AttachmentData final
int64_t sequence_number() const;
int64_t source_timestamp() const;
std::array<uint8_t, RMW_GID_STORAGE_SIZE> copy_gid() const;
size_t gid_hash() const;

zenoh::Bytes serialize_to_zbytes();

private:
int64_t sequence_number_;
int64_t source_timestamp_;
std::array<uint8_t, RMW_GID_STORAGE_SIZE> source_gid_;
size_t gid_hash_;
};
} // namespace rmw_zenoh_cpp

Expand Down
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
constexpr int64_t ticks_between_print(std::chrono::milliseconds(1000) / sleep_time);
do {
zenoh::ZResult result;
this->session_->get_routers_z_id(&result);
if (result == Z_OK) {
const auto zids = this->session_->get_routers_z_id(&result);
if (result == Z_OK && !zids.empty()) {
break;
}
if ((connection_attempts % ticks_between_print) == 0) {
Expand Down
5 changes: 0 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ SubscriptionData::Message::Message(
{
}

///=============================================================================
SubscriptionData::Message::~Message()
{
}

///=============================================================================
std::shared_ptr<SubscriptionData> SubscriptionData::make(
std::shared_ptr<zenoh::Session> session,
Expand Down
2 changes: 1 addition & 1 deletion rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
uint64_t recv_ts,
AttachmentData && attachment);

~Message();
~Message() = default;

Payload payload;
uint64_t recv_timestamp;
Expand Down
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,7 @@ rmw_serialize(
auto data_length = tss.get_estimated_serialized_size(ros_message, callbacks);
if (serialized_message->buffer_capacity < data_length) {
if (rmw_serialized_message_resize(serialized_message, data_length) != RMW_RET_OK) {
rmw_reset_error();
RMW_SET_ERROR_MSG("unable to dynamically resize serialized message");
return RMW_RET_ERROR;
}
Expand Down
6 changes: 4 additions & 2 deletions zenoh_cpp_vendor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ set(ZENOHC_CARGO_FLAGS "--no-default-features$<SEMICOLON>--features=shared-memor
# Set VCS_VERSION to include latest changes from zenoh/zenoh-c to benefit from :
# - https://github.com/eclipse-zenoh/zenoh/pull/1685 (Fix deadlock in advanced subscription undeclaration)
# - https://github.com/eclipse-zenoh/zenoh/pull/1696 (Fix SHM Garbage Collection (GC) policy)
# - https://github.com/eclipse-zenoh/zenoh/pull/1708 (Fix gossip with TLS endpoints)
# - https://github.com/eclipse-zenoh/zenoh/pull/1717 (Improve performance of a large number of peers)
ament_vendor(zenoh_c_vendor
VCS_URL https://github.com/eclipse-zenoh/zenoh-c.git
VCS_VERSION 61d8fcc136ce4ed36d921a32244da4f3d81a6097
VCS_VERSION 328736fe9bb9b654b1d9f47eecfc6d52f0d7d587
CMAKE_ARGS
"-DZENOHC_CARGO_FLAGS=${ZENOHC_CARGO_FLAGS}"
"-DZENOHC_BUILD_WITH_UNSTABLE_API=TRUE"
Expand All @@ -36,7 +38,7 @@ ament_export_dependencies(zenohc)
# - https://github.com/eclipse-zenoh/zenoh-cpp/pull/363 (Fix memory leak in string deserialization)
ament_vendor(zenoh_cpp_vendor
VCS_URL https://github.com/eclipse-zenoh/zenoh-cpp
VCS_VERSION 05942637c29d3346ad18bab5a178aeebf4be5d62
VCS_VERSION bbfef04e843289aae70b5aa060a925e8ee5b1b6f
CMAKE_ARGS
-DZENOHCXX_ZENOHC=OFF
)
Expand Down

0 comments on commit 29040ff

Please sign in to comment.