Skip to content

Commit

Permalink
Support zero-copy transmissions in NTS
Browse files Browse the repository at this point in the history
  • Loading branch information
smtrfnv authored Dec 12, 2023
1 parent a512761 commit 87a2533
Show file tree
Hide file tree
Showing 46 changed files with 3,175 additions and 1,043 deletions.
3 changes: 2 additions & 1 deletion groups/ntc/ntcd/ntcd_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,8 @@ class Session : public ntsi::DatagramSocket,
/// Read data from the socket error queue. Then if the specified
/// 'notifications' is not null parse fetched data to extract control
/// messages into the specified 'notifications'. Return the error.
ntsa::Error receiveNotifications(ntsa::NotificationQueue* notifications);
ntsa::Error receiveNotifications(ntsa::NotificationQueue* notifications)
BSLS_KEYWORD_OVERRIDE;

/// Shutdown the stream socket in the specified 'direction'. Return the
/// error.
Expand Down
13 changes: 13 additions & 0 deletions groups/nts/ntsa/ntsa_notification.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ Notification::Notification(const Notification& original)
new (d_timestamp.buffer())
ntsa::Timestamp(original.d_timestamp.object());
break;
case (ntsa::NotificationType::e_ZERO_COPY):
new (d_zeroCopy.buffer()) ntsa::ZeroCopy(original.d_zeroCopy.object());
break;
default:
BSLS_ASSERT(d_type == ntsa::NotificationType::e_UNDEFINED);
}
Expand All @@ -50,6 +53,9 @@ Notification& Notification::operator=(const Notification& other)
case (ntsa::NotificationType::e_TIMESTAMP):
new (d_timestamp.buffer()) ntsa::Timestamp(other.d_timestamp.object());
break;
case (ntsa::NotificationType::e_ZERO_COPY):
new (d_zeroCopy.buffer()) ntsa::ZeroCopy(other.d_zeroCopy.object());
break;
default:
BSLS_ASSERT(d_type == ntsa::NotificationType::e_UNDEFINED);
}
Expand All @@ -66,6 +72,8 @@ bool Notification::equals(const Notification& other) const
switch (d_type) {
case ntsa::NotificationType::e_TIMESTAMP:
return d_timestamp.object() == other.d_timestamp.object();
case ntsa::NotificationType::e_ZERO_COPY:
return d_zeroCopy.object() == other.d_zeroCopy.object();
default:
return true;
}
Expand All @@ -80,6 +88,8 @@ bool Notification::less(const Notification& other) const
switch (d_type) {
case ntsa::NotificationType::e_TIMESTAMP:
return d_timestamp.object() < other.d_timestamp.object();
case ntsa::NotificationType::e_ZERO_COPY:
return d_zeroCopy.object() < other.d_zeroCopy.object();
default:
return true;
}
Expand All @@ -93,6 +103,9 @@ bsl::ostream& Notification::print(bsl::ostream& stream,
case ntsa::NotificationType::e_TIMESTAMP:
d_timestamp.object().print(stream, level, spacesPerLevel);
break;
case ntsa::NotificationType::e_ZERO_COPY:
d_zeroCopy.object().print(stream, level, spacesPerLevel);
break;
default:
BSLS_ASSERT(d_type == ntsa::NotificationType::e_UNDEFINED);
stream << "UNDEFINED";
Expand Down
134 changes: 98 additions & 36 deletions groups/nts/ntsa/ntsa_notification.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ BSLS_IDENT("$Id: $")

#include <ntsa_notificationtype.h>
#include <ntsa_timestamp.h>
#include <ntsa_zerocopy.h>
#include <ntscfg_platform.h>
#include <ntsscm_version.h>
#include <bsls_objectbuffer.h>
Expand All @@ -31,8 +32,8 @@ namespace ntsa {
/// Provide a union of notifications.
///
/// @details
/// Provide a value-semantic type that represents a discriminated
/// union of notifications.
/// Provide a value-semantic type that represents a discriminated union of
/// notifications.
//
/// @par Thread Safety
/// This class is not thread safe.
Expand All @@ -42,6 +43,7 @@ class Notification
{
union {
bsls::ObjectBuffer<ntsa::Timestamp> d_timestamp;
bsls::ObjectBuffer<ntsa::ZeroCopy> d_zeroCopy;
};

ntsa::NotificationType::Value d_type;
Expand All @@ -50,19 +52,18 @@ class Notification
/// Create a new notification having an undefined type.
Notification();

/// Create a new notification the same value as the specified 'other'
/// object.
/// Create a new notification having the same value as the specified
/// 'other' object.
Notification(const Notification& other);

/// Destroy this object.
~Notification();

/// Assign the value of the specified 'other' object to this object.
/// Return a reference to this modifiable object.
/// Assign the value of the specified 'other' object to this object. Return
/// a reference to this modifiable object.
Notification& operator=(const Notification& other);

/// Reset the value of this object to its value upon default
/// construction.
/// Reset the value of this object to its value upon default construction.
void reset();

/// Select the "timestamp" representation. Return a reference to the
Expand All @@ -71,54 +72,69 @@ class Notification

/// Select the "timestamp" representation initially having the specified
/// 'value'. Return a reference to the modifiable representation.
ntsa::Timestamp& makeTimestamp(const ntsa::Timestamp& ts);
ntsa::Timestamp& makeTimestamp(const ntsa::Timestamp& value);

/// Return a reference to the modifiable "timestamp" representation.
/// The behavior is undefined unless 'isTimestamp()' is true.
/// Select the "zeroCopy" representation. Return a reference to the
/// modifiable representation.
ntsa::ZeroCopy& makeZeroCopy();

/// Select the "zeroCopy" representation initially having the specified
/// 'value'. Return a reference to the modifiable representation.
ntsa::ZeroCopy& makeZeroCopy(const ntsa::ZeroCopy& value);

/// Return a reference to the "timestamp" representation. The behavior is
/// undefined unless 'isTimestamp()' is true.
const ntsa::Timestamp& timestamp() const;

/// Return a reference to the "zeroCopy" representation. The behavior is
/// undefined unless 'isZeroCopy()' is true.
const ntsa::ZeroCopy& zeroCopy() const;

/// Return the type of the notification representation.
ntsa::NotificationType::Value type() const;

/// Return true if the "timestamp" representation is currently selected,
/// otherwise return false.
bool isTimestamp() const;

/// Return true if the "zeroCopy" representation is currently selected,
/// otherwise return false.
bool isZeroCopy() const;

/// Return true if the notification representation is undefined, otherwise
/// return false.
bool isUndefined() const;

/// Return true if this object has the same value as the specified
/// 'other' object, otherwise return false.
/// Return true if this object has the same value as the specified 'other'
/// object, otherwise return false.
bool equals(const Notification& other) const;

/// Return true if the value of this object is less than the value of
/// the specified 'other' object, otherwise return false.
/// Return true if the value of this object is less than the value of the
/// specified 'other' object, otherwise return false.
bool less(const Notification& other) const;

/// Format this object to the specified output 'stream' at the
/// optionally specified indentation 'level' and return a reference to
/// the modifiable 'stream'. If 'level' is specified, optionally
/// specify 'spacesPerLevel', the number of spaces per indentation level
/// for this and all of its nested objects. Each line is indented by
/// the absolute value of 'level * spacesPerLevel'. If 'level' is
/// negative, suppress indentation of the first line. If
/// 'spacesPerLevel' is negative, suppress line breaks and format the
/// entire output on one line. If 'stream' is initially invalid, this
/// operation has no effect. Note that a trailing newline is provided
/// in multiline mode only.
/// Format this object to the specified output 'stream' at the optionally
/// specified indentation 'level' and return a reference to the modifiable
/// 'stream'. If 'level' is specified, optionally specify
/// 'spacesPerLevel', the number of spaces per indentation level for this
/// and all of its nested objects. Each line is indented by the absolute
/// value of 'level * spacesPerLevel'. If 'level' is negative, suppress
/// indentation of the first line. If 'spacesPerLevel' is negative,
/// suppress line breaks and format the entire output on one line. If
/// 'stream' is initially invalid, this operation has no effect. Note that
/// a trailing newline is provided in multiline mode only.
bsl::ostream& print(bsl::ostream& stream,
int level = 0,
int spacesPerLevel = 4) const;

/// Defines the traits of this type. These traits can be used to select,
/// at compile-time, the most efficient algorithm to manipulate objects
/// of this type.
/// Defines the traits of this type. These traits can be used to select, at
/// compile-time, the most efficient algorithm to manipulate objects of
/// this type.
NTSCFG_DECLARE_NESTED_BITWISE_MOVABLE_TRAITS(Notification);
};

/// Write the specified 'object' to the specified 'stream'. Return a
/// modifiable reference to the 'stream'.
/// Write the specified 'object' to the specified 'stream'. Return a modifiable
/// reference to the 'stream'.
///
/// @related ntsa::Notification
bsl::ostream& operator<<(bsl::ostream& stream, const Notification& object);
Expand All @@ -141,8 +157,8 @@ bool operator!=(const Notification& lhs, const Notification& rhs);
/// @related ntsa::Notification
bool operator<(const Notification& lhs, const Notification& rhs);

/// Contribute the values of the salient attributes of the specified 'value'
/// to the specified hash 'algorithm'.
/// Contribute the values of the salient attributes of the specified 'value' to
/// the specified hash 'algorithm'.
///
/// @related ntsa::Notification
template <typename HASH_ALGORITHM>
Expand Down Expand Up @@ -181,27 +197,64 @@ ntsa::Timestamp& Notification::makeTimestamp()
}

NTSCFG_INLINE
ntsa::Timestamp& Notification::makeTimestamp(const ntsa::Timestamp& ts)
ntsa::Timestamp& Notification::makeTimestamp(const ntsa::Timestamp& value)
{
if (d_type == ntsa::NotificationType::e_TIMESTAMP) {
d_timestamp.object() = ts;
d_timestamp.object() = value;
}
else {
this->reset();
new (d_timestamp.buffer()) ntsa::Timestamp(ts);
new (d_timestamp.buffer()) ntsa::Timestamp(value);
d_type = ntsa::NotificationType::e_TIMESTAMP;
}

return d_timestamp.object();
}

NTSCFG_INLINE
ntsa::ZeroCopy& Notification::makeZeroCopy()
{
if (d_type == ntsa::NotificationType::e_ZERO_COPY) {
d_zeroCopy.object() = ntsa::ZeroCopy();
}
else {
this->reset();
new (d_zeroCopy.buffer()) ntsa::ZeroCopy();
d_type = ntsa::NotificationType::e_ZERO_COPY;
}

return d_zeroCopy.object();
}

NTSCFG_INLINE
ntsa::ZeroCopy& Notification::makeZeroCopy(const ntsa::ZeroCopy& value)
{
if (d_type == ntsa::NotificationType::e_ZERO_COPY) {
d_zeroCopy.object() = value;
}
else {
this->reset();
new (d_zeroCopy.buffer()) ntsa::ZeroCopy(value);
d_type = ntsa::NotificationType::e_ZERO_COPY;
}

return d_zeroCopy.object();
}

NTSCFG_INLINE
const ntsa::Timestamp& Notification::timestamp() const
{
BSLS_ASSERT(d_type == ntsa::NotificationType::e_TIMESTAMP);
return d_timestamp.object();
}

NTSCFG_INLINE
const ntsa::ZeroCopy& Notification::zeroCopy() const
{
BSLS_ASSERT(d_type == ntsa::NotificationType::e_ZERO_COPY);
return d_zeroCopy.object();
}

NTSCFG_INLINE
ntsa::NotificationType::Value Notification::type() const
{
Expand All @@ -214,6 +267,12 @@ bool Notification::isTimestamp() const
return d_type == ntsa::NotificationType::e_TIMESTAMP;
}

NTSCFG_INLINE
bool Notification::isZeroCopy() const
{
return d_type == ntsa::NotificationType::e_ZERO_COPY;
}

NTSCFG_INLINE
bool Notification::isUndefined() const
{
Expand All @@ -228,6 +287,9 @@ void hashAppend(HASH_ALGORITHM& algorithm, const Notification& value)
if (value.isTimestamp()) {
hashAppend(algorithm, value.timestamp());
}
else if (value.isZeroCopy()) {
hashAppend(algorithm, value.zeroCopy());
}
}

} // close package namespace
Expand Down
44 changes: 44 additions & 0 deletions groups/nts/ntsa/ntsa_notification.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ NTSCFG_TEST_CASE(1)
Notification n;
NTSCFG_TEST_TRUE(n.isUndefined());
NTSCFG_TEST_FALSE(n.isTimestamp());
NTSCFG_TEST_FALSE(n.isZeroCopy());
NTSCFG_TEST_EQ(n.type(), NotificationType::e_UNDEFINED);

Timestamp& ts = n.makeTimestamp();
NTSCFG_TEST_TRUE(n.isTimestamp());
NTSCFG_TEST_FALSE(n.isUndefined());
NTSCFG_TEST_FALSE(n.isZeroCopy());
NTSCFG_TEST_EQ(n.type(), NotificationType::e_TIMESTAMP);

ts.setId(id);
Expand Down Expand Up @@ -90,6 +92,9 @@ NTSCFG_TEST_CASE(4)
n1.makeTimestamp();

NTSCFG_TEST_NE(n1, n2);

n2.makeZeroCopy();
NTSCFG_TEST_NE(n1, n2);
}

NTSCFG_TEST_CASE(5)
Expand All @@ -112,12 +117,51 @@ NTSCFG_TEST_CASE(5)
NTSCFG_TEST_EQ(n2.timestamp(), t);
}

NTSCFG_TEST_CASE(6)
{
Notification n;
NTSCFG_TEST_TRUE(n.isUndefined());
NTSCFG_TEST_FALSE(n.isZeroCopy());
NTSCFG_TEST_EQ(n.type(), NotificationType::e_UNDEFINED);

ZeroCopy& zc = n.makeZeroCopy();
NTSCFG_TEST_TRUE(n.isZeroCopy());
NTSCFG_TEST_FALSE(n.isUndefined());
NTSCFG_TEST_FALSE(n.isTimestamp());
NTSCFG_TEST_EQ(n.type(), NotificationType::e_ZERO_COPY);

zc.setFrom(1);
zc.setTo(22);
zc.setCode(1);
NTSCFG_TEST_EQ(n.zeroCopy(), zc);
}

NTSCFG_TEST_CASE(7)
{
ZeroCopy zc;
zc.setFrom(1);
zc.setTo(22);
zc.setCode(1);

Notification n;
n.makeZeroCopy(zc);

NTSCFG_TEST_TRUE(n.isZeroCopy());

n.reset();
NTSCFG_TEST_TRUE(n.isUndefined());
NTSCFG_TEST_FALSE(n.isZeroCopy());
NTSCFG_TEST_EQ(n.type(), NotificationType::e_UNDEFINED);
}

NTSCFG_TEST_DRIVER
{
NTSCFG_TEST_REGISTER(1);
NTSCFG_TEST_REGISTER(2);
NTSCFG_TEST_REGISTER(3);
NTSCFG_TEST_REGISTER(4);
NTSCFG_TEST_REGISTER(5);
NTSCFG_TEST_REGISTER(6);
NTSCFG_TEST_REGISTER(7);
}
NTSCFG_TEST_DRIVER_END;
Loading

0 comments on commit 87a2533

Please sign in to comment.