diff --git a/worker/fuzzer/src/RTC/FuzzerRtpPacket.cpp b/worker/fuzzer/src/RTC/FuzzerRtpPacket.cpp index 4167e371d5..0536298b33 100644 --- a/worker/fuzzer/src/RTC/FuzzerRtpPacket.cpp +++ b/worker/fuzzer/src/RTC/FuzzerRtpPacket.cpp @@ -179,9 +179,7 @@ void Fuzzer::RTC::RtpPacket::Fuzz(const uint8_t* data, size_t len) packet->GetPayloadPadding(); packet->IsKeyFrame(); - auto* clonedPacket = packet->Clone(); - - delete clonedPacket; + auto clonedPacket = packet->Clone(); // TODO: packet->RtxEncode(); // This cannot be tested this way. // TODO: packet->RtxDecode(); // This cannot be tested this way. @@ -190,5 +188,5 @@ void Fuzzer::RTC::RtpPacket::Fuzz(const uint8_t* data, size_t len) // TODO: packet->ProcessPayload(); // TODO: packet->ShiftPayload(); - delete packet; + ::RTC::RtpPacket::Deallocate(packet); } diff --git a/worker/fuzzer/src/RTC/FuzzerRtpStreamSend.cpp b/worker/fuzzer/src/RTC/FuzzerRtpStreamSend.cpp index 88f8a806fb..b8c6d98e52 100644 --- a/worker/fuzzer/src/RTC/FuzzerRtpStreamSend.cpp +++ b/worker/fuzzer/src/RTC/FuzzerRtpStreamSend.cpp @@ -60,5 +60,6 @@ void Fuzzer::RTC::RtpStreamSend::Fuzz(const uint8_t* data, size_t len) } delete stream; - delete packet; + + ::RTC::RtpPacket::Deallocate(packet); } diff --git a/worker/include/ObjectPoolAllocator.hpp b/worker/include/ObjectPoolAllocator.hpp new file mode 100644 index 0000000000..72f045dfb4 --- /dev/null +++ b/worker/include/ObjectPoolAllocator.hpp @@ -0,0 +1,89 @@ +#ifndef MS_OBJECT_POOL_ALLOCATOR_HPP +#define MS_OBJECT_POOL_ALLOCATOR_HPP + +// #define MS_ALLOCATOR_FREE_ON_RETURN 1 + +#include "common.hpp" +#include + +namespace Utils +{ + // Simple implementation of object pool only for single objects. + // Arrays are allocated as usual. + template + class ObjectPoolAllocator + { + std::shared_ptr> pool_data; + + public: + typedef T value_type; + thread_local static Utils::ObjectPoolAllocator Pool; + + ObjectPoolAllocator() + { + pool_data = std::shared_ptr>( + new std::vector(), + [](std::vector* pool) + { + for (auto* ptr : *pool) + { + std::free(ptr); + } + delete pool; + }); + } + + template + ObjectPoolAllocator(const ObjectPoolAllocator& other) + : pool_data(ObjectPoolAllocator::Pool.pool_data) + { + } + + ~ObjectPoolAllocator() + { + } + + T* allocate(size_t n) + { + if (n > 1) + { + return static_cast(std::malloc(sizeof(T) * n)); + } + + if (this->pool_data->empty()) + { + return static_cast(std::malloc(sizeof(T))); + } + + T* ptr = this->pool_data->back(); + this->pool_data->pop_back(); + + return ptr; + } + + void deallocate(T* ptr, size_t n) + { + if (!ptr) + { + return; + } + + if (n > 1) + { + std::free(ptr); + return; + } + +#ifdef MS_ALLOCATOR_FREE_ON_RETURN + std::free(ptr); +#else + this->pool_data->push_back(ptr); +#endif + } + }; + + template + thread_local Utils::ObjectPoolAllocator Utils::ObjectPoolAllocator::Pool; +} // namespace Utils + +#endif diff --git a/worker/include/RTC/DirectTransport.hpp b/worker/include/RTC/DirectTransport.hpp index f8325ef07c..2a820e8354 100644 --- a/worker/include/RTC/DirectTransport.hpp +++ b/worker/include/RTC/DirectTransport.hpp @@ -20,7 +20,8 @@ namespace RTC void SendRtpPacket( RTC::Consumer* consumer, RTC::RtpPacket* packet, - RTC::Transport::onSendCallback* cb = nullptr) override; + RTC::Transport::onSendCallback* cb = nullptr, + RTC::Transport::OnSendCallbackCtx* ctx = nullptr) override; void SendRtcpPacket(RTC::RTCP::Packet* packet) override; void SendRtcpCompoundPacket(RTC::RTCP::CompoundPacket* packet) override; void SendMessage( diff --git a/worker/include/RTC/PipeTransport.hpp b/worker/include/RTC/PipeTransport.hpp index be6bee3cf8..256f2739c0 100644 --- a/worker/include/RTC/PipeTransport.hpp +++ b/worker/include/RTC/PipeTransport.hpp @@ -44,7 +44,8 @@ namespace RTC void SendRtpPacket( RTC::Consumer* consumer, RTC::RtpPacket* packet, - RTC::Transport::onSendCallback* cb = nullptr) override; + RTC::Transport::onSendCallback* cb = nullptr, + RTC::Transport::OnSendCallbackCtx* ctx = nullptr) override; void SendRtcpPacket(RTC::RTCP::Packet* packet) override; void SendRtcpCompoundPacket(RTC::RTCP::CompoundPacket* packet) override; void SendMessage( diff --git a/worker/include/RTC/PlainTransport.hpp b/worker/include/RTC/PlainTransport.hpp index b9609874cf..b464c296f8 100644 --- a/worker/include/RTC/PlainTransport.hpp +++ b/worker/include/RTC/PlainTransport.hpp @@ -45,7 +45,8 @@ namespace RTC void SendRtpPacket( RTC::Consumer* consumer, RTC::RtpPacket* packet, - RTC::Transport::onSendCallback* cb = nullptr) override; + RTC::Transport::onSendCallback* cb = nullptr, + RTC::Transport::OnSendCallbackCtx* ctx = nullptr) override; void SendRtcpPacket(RTC::RTCP::Packet* packet) override; void SendRtcpCompoundPacket(RTC::RTCP::CompoundPacket* packet) override; void SendMessage( diff --git a/worker/include/RTC/RtpPacket.hpp b/worker/include/RTC/RtpPacket.hpp index 0dece98d5b..ec4a214b00 100644 --- a/worker/include/RTC/RtpPacket.hpp +++ b/worker/include/RTC/RtpPacket.hpp @@ -2,6 +2,7 @@ #define MS_RTC_RTP_PACKET_HPP #include "common.hpp" +#include "ObjectPoolAllocator.hpp" #include "Utils.hpp" #include "RTC/Codecs/PayloadDescriptorHandler.hpp" #include @@ -23,6 +24,15 @@ namespace RTC class RtpPacket { public: + using Allocator = Utils::ObjectPoolAllocator; + using AllocatorTraits = std::allocator_traits; + // Memory to hold the cloned packet (with extra space for RTX encoding). + using RtpPacketBuffer = std::array; + using BufferAllocator = Utils::ObjectPoolAllocator; + using BufferAllocatorTraits = std::allocator_traits; + + static void Deallocate(RtpPacket* packet); + /* Struct for RTP header. */ struct Header { @@ -133,7 +143,7 @@ namespace RTC static RtpPacket* Parse(const uint8_t* data, size_t len); - private: + public: RtpPacket( Header* header, HeaderExtension* headerExtension, @@ -142,7 +152,6 @@ namespace RTC uint8_t payloadPadding, size_t size); - public: ~RtpPacket(); void Dump() const; @@ -589,7 +598,7 @@ namespace RTC return this->payloadDescriptorHandler->IsKeyFrame(); } - RtpPacket* Clone() const; + std::shared_ptr Clone() const; void RtxEncode(uint8_t payloadType, uint32_t ssrc, uint16_t seq); @@ -635,7 +644,7 @@ namespace RTC std::shared_ptr payloadDescriptorHandler; // Buffer where this packet is allocated, can be `nullptr` if packet was // parsed from externally provided buffer. - uint8_t* buffer{ nullptr }; + RtpPacketBuffer* buffer{ nullptr }; }; } // namespace RTC diff --git a/worker/include/RTC/RtpStreamSend.hpp b/worker/include/RTC/RtpStreamSend.hpp index e4b7e43f28..2bfb0fdcec 100644 --- a/worker/include/RTC/RtpStreamSend.hpp +++ b/worker/include/RTC/RtpStreamSend.hpp @@ -1,6 +1,7 @@ #ifndef MS_RTC_RTP_STREAM_SEND_HPP #define MS_RTC_RTP_STREAM_SEND_HPP +#include "ObjectPoolAllocator.hpp" #include "RTC/RateCalculator.hpp" #include "RTC/RtpStream.hpp" #include @@ -26,6 +27,9 @@ namespace RTC public: struct StorageItem { + using Allocator = Utils::ObjectPoolAllocator; + using AllocatorTraits = std::allocator_traits; + void Reset(); // Original packet. diff --git a/worker/include/RTC/TcpConnection.hpp b/worker/include/RTC/TcpConnection.hpp index 0aa17ebb62..420389297e 100644 --- a/worker/include/RTC/TcpConnection.hpp +++ b/worker/include/RTC/TcpConnection.hpp @@ -24,7 +24,11 @@ namespace RTC ~TcpConnection() override; public: - void Send(const uint8_t* data, size_t len, ::TcpConnectionHandler::onSendCallback* cb); + void Send( + const uint8_t* data, + size_t len, + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx); /* Pure virtual methods inherited from ::TcpConnectionHandler. */ public: diff --git a/worker/include/RTC/Transport.hpp b/worker/include/RTC/Transport.hpp index e8e97247e5..198feb27b6 100644 --- a/worker/include/RTC/Transport.hpp +++ b/worker/include/RTC/Transport.hpp @@ -4,6 +4,7 @@ #include "common.hpp" #include "DepLibUV.hpp" +#include "ObjectPoolAllocator.hpp" #include "Channel/ChannelRequest.hpp" #include "Channel/ChannelSocket.hpp" #include "PayloadChannel/PayloadChannelNotification.hpp" @@ -51,9 +52,31 @@ namespace RTC public Timer::Listener { protected: - using onSendCallback = const std::function; using onQueuedCallback = const std::function; + public: +#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR + struct OnSendCallbackCtx + { + RTC::TransportCongestionControlClient* tccClient; + webrtc::RtpPacketSendInfo packetInfo; + RTC::SenderBandwidthEstimator* senderBwe; + RTC::SenderBandwidthEstimator::SentInfo sentInfo; + }; +#else + struct OnSendCallbackCtx + { + using Allocator = Utils::ObjectPoolAllocator; + using AllocatorTraits = std::allocator_traits; + + RTC::TransportCongestionControlClient* tccClient; + webrtc::RtpPacketSendInfo packetInfo; + }; +#endif + // This function MUST NOT be de-allocated manually and MUST be called EXACTLY once. + static void OnSendCallback(bool sent, OnSendCallbackCtx* ctx); + using onSendCallback = void(bool sent, OnSendCallbackCtx* ctx); + public: class Listener { @@ -170,7 +193,10 @@ namespace RTC private: virtual bool IsConnected() const = 0; virtual void SendRtpPacket( - RTC::Consumer* consumer, RTC::RtpPacket* packet, onSendCallback* cb = nullptr) = 0; + RTC::Consumer* consumer, + RTC::RtpPacket* packet, + onSendCallback* cb = nullptr, + OnSendCallbackCtx* ctx = nullptr) = 0; void HandleRtcpPacket(RTC::RTCP::Packet* packet); void SendRtcp(uint64_t nowMs); virtual void SendRtcpPacket(RTC::RTCP::Packet* packet) = 0; diff --git a/worker/include/RTC/TransportTuple.hpp b/worker/include/RTC/TransportTuple.hpp index 535d221a68..07d77fd471 100644 --- a/worker/include/RTC/TransportTuple.hpp +++ b/worker/include/RTC/TransportTuple.hpp @@ -4,6 +4,7 @@ #include "common.hpp" #include "Utils.hpp" #include "RTC/TcpConnection.hpp" +#include "RTC/Transport.hpp" #include "RTC/UdpSocket.hpp" #include #include @@ -14,9 +15,6 @@ namespace RTC { class TransportTuple { - protected: - using onSendCallback = const std::function; - public: enum class Protocol { @@ -85,12 +83,16 @@ namespace RTC this->localAnnouncedIp = localAnnouncedIp; } - void Send(const uint8_t* data, size_t len, RTC::TransportTuple::onSendCallback* cb = nullptr) + void Send( + const uint8_t* data, + size_t len, + Transport::onSendCallback* cb = nullptr, + Transport::OnSendCallbackCtx* ctx = nullptr) { if (this->protocol == Protocol::UDP) - this->udpSocket->Send(data, len, this->udpRemoteAddr, cb); + this->udpSocket->Send(data, len, this->udpRemoteAddr, cb, ctx); else - this->tcpConnection->Send(data, len, cb); + this->tcpConnection->Send(data, len, cb, ctx); } Protocol GetProtocol() const diff --git a/worker/include/RTC/WebRtcTransport.hpp b/worker/include/RTC/WebRtcTransport.hpp index 1fd308d1dd..efe2e4a176 100644 --- a/worker/include/RTC/WebRtcTransport.hpp +++ b/worker/include/RTC/WebRtcTransport.hpp @@ -80,7 +80,8 @@ namespace RTC void SendRtpPacket( RTC::Consumer* consumer, RTC::RtpPacket* packet, - RTC::Transport::onSendCallback* cb = nullptr) override; + RTC::Transport::onSendCallback* cb = nullptr, + RTC::Transport::OnSendCallbackCtx* ctx = nullptr) override; void SendRtcpPacket(RTC::RTCP::Packet* packet) override; void SendRtcpCompoundPacket(RTC::RTCP::CompoundPacket* packet) override; void SendMessage( diff --git a/worker/include/handles/TcpConnectionHandler.hpp b/worker/include/handles/TcpConnectionHandler.hpp index 0b536225ba..7ef163c05c 100644 --- a/worker/include/handles/TcpConnectionHandler.hpp +++ b/worker/include/handles/TcpConnectionHandler.hpp @@ -2,6 +2,7 @@ #define MS_TCP_CONNECTION_HPP #include "common.hpp" +#include "RTC/Transport.hpp" #include #include @@ -35,12 +36,12 @@ class TcpConnectionHandler ~UvWriteData() { delete[] this->store; - delete this->cb; } uv_write_t req; uint8_t* store{ nullptr }; - TcpConnectionHandler::onSendCallback* cb{ nullptr }; + RTC::Transport::onSendCallback* cb{ nullptr }; + RTC::Transport::OnSendCallbackCtx* ctx{ nullptr }; }; public: @@ -71,7 +72,8 @@ class TcpConnectionHandler size_t len1, const uint8_t* data2, size_t len2, - TcpConnectionHandler::onSendCallback* cb); + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx); void ErrorReceiving(); const struct sockaddr* GetLocalAddress() const { @@ -117,7 +119,7 @@ class TcpConnectionHandler public: void OnUvReadAlloc(size_t suggestedSize, uv_buf_t* buf); void OnUvRead(ssize_t nread, const uv_buf_t* buf); - void OnUvWrite(int status, onSendCallback* cb); + void OnUvWrite(int status, RTC::Transport::onSendCallback* cb, RTC::Transport::OnSendCallbackCtx* ctx); /* Pure virtual methods that must be implemented by the subclass. */ protected: diff --git a/worker/include/handles/UdpSocketHandler.hpp b/worker/include/handles/UdpSocketHandler.hpp index 188f23b4fa..7b4cbd1b8c 100644 --- a/worker/include/handles/UdpSocketHandler.hpp +++ b/worker/include/handles/UdpSocketHandler.hpp @@ -2,14 +2,12 @@ #define MS_UDP_SOCKET_HPP #include "common.hpp" +#include "RTC/Transport.hpp" #include #include class UdpSocketHandler { -protected: - using onSendCallback = const std::function; - public: /* Struct for the data field of uv_req_t when sending a datagram. */ struct UvSendData @@ -25,12 +23,12 @@ class UdpSocketHandler ~UvSendData() { delete[] this->store; - delete this->cb; } uv_udp_send_t req; uint8_t* store{ nullptr }; - UdpSocketHandler::onSendCallback* cb{ nullptr }; + RTC::Transport::onSendCallback* cb{ nullptr }; + RTC::Transport::OnSendCallbackCtx* ctx{ nullptr }; }; public: @@ -50,7 +48,11 @@ class UdpSocketHandler } virtual void Dump() const; void Send( - const uint8_t* data, size_t len, const struct sockaddr* addr, UdpSocketHandler::onSendCallback* cb); + const uint8_t* data, + size_t len, + const struct sockaddr* addr, + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx); const struct sockaddr* GetLocalAddress() const { return reinterpret_cast(&this->localAddr); @@ -83,7 +85,7 @@ class UdpSocketHandler public: void OnUvRecvAlloc(size_t suggestedSize, uv_buf_t* buf); void OnUvRecv(ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned int flags); - void OnUvSend(int status, UdpSocketHandler::onSendCallback* cb); + void OnUvSend(int status, RTC::Transport::onSendCallback* cb, RTC::Transport::OnSendCallbackCtx* ctx); /* Pure virtual methods that must be implemented by the subclass. */ protected: diff --git a/worker/src/RTC/DirectTransport.cpp b/worker/src/RTC/DirectTransport.cpp index 9be530fe91..f50910dfac 100644 --- a/worker/src/RTC/DirectTransport.cpp +++ b/worker/src/RTC/DirectTransport.cpp @@ -111,7 +111,10 @@ namespace RTC } void DirectTransport::SendRtpPacket( - RTC::Consumer* consumer, RTC::RtpPacket* packet, RTC::Transport::onSendCallback* cb) + RTC::Consumer* consumer, + RTC::RtpPacket* packet, + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx) { MS_TRACE(); @@ -119,6 +122,11 @@ namespace RTC { MS_WARN_TAG(rtp, "cannot send RTP packet not associated to a Consumer"); + if (cb) + { + (*cb)(false, ctx); + } + return; } @@ -130,8 +138,7 @@ namespace RTC if (cb) { - (*cb)(true); - delete cb; + (*cb)(true, ctx); } // Increase send transmission. diff --git a/worker/src/RTC/PipeTransport.cpp b/worker/src/RTC/PipeTransport.cpp index 0092e001bd..80eaec7315 100644 --- a/worker/src/RTC/PipeTransport.cpp +++ b/worker/src/RTC/PipeTransport.cpp @@ -444,7 +444,10 @@ namespace RTC } void PipeTransport::SendRtpPacket( - RTC::Consumer* /*consumer*/, RTC::RtpPacket* packet, RTC::Transport::onSendCallback* cb) + RTC::Consumer* /*consumer*/, + RTC::RtpPacket* packet, + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx) { MS_TRACE(); @@ -452,8 +455,7 @@ namespace RTC { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -466,8 +468,7 @@ namespace RTC { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -475,7 +476,7 @@ namespace RTC auto len = static_cast(intLen); - this->tuple->Send(data, len, cb); + this->tuple->Send(data, len, cb, ctx); // Increase send transmission. RTC::Transport::DataSent(len); @@ -621,7 +622,7 @@ namespace RTC packet->GetPayloadType(), packet->GetSequenceNumber()); - delete packet; + RtpPacket::Deallocate(packet); } return; @@ -644,7 +645,7 @@ namespace RTC // Remove this SSRC. RecvStreamClosed(packet->GetSsrc()); - delete packet; + RtpPacket::Deallocate(packet); return; } diff --git a/worker/src/RTC/PlainTransport.cpp b/worker/src/RTC/PlainTransport.cpp index e020d21ab3..ce5afedd68 100644 --- a/worker/src/RTC/PlainTransport.cpp +++ b/worker/src/RTC/PlainTransport.cpp @@ -712,7 +712,10 @@ namespace RTC } void PlainTransport::SendRtpPacket( - RTC::Consumer* /*consumer*/, RTC::RtpPacket* packet, RTC::Transport::onSendCallback* cb) + RTC::Consumer* /*consumer*/, + RTC::RtpPacket* packet, + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx) { MS_TRACE(); @@ -720,8 +723,7 @@ namespace RTC { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -734,8 +736,7 @@ namespace RTC { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -743,7 +744,7 @@ namespace RTC auto len = static_cast(intLen); - this->tuple->Send(data, len, cb); + this->tuple->Send(data, len, cb, ctx); // Increase send transmission. RTC::Transport::DataSent(len); @@ -896,7 +897,7 @@ namespace RTC packet->GetPayloadType(), packet->GetSequenceNumber()); - delete packet; + RtpPacket::Deallocate(packet); } return; @@ -921,7 +922,7 @@ namespace RTC // Remove this SSRC. RecvStreamClosed(packet->GetSsrc()); - delete packet; + RtpPacket::Deallocate(packet); return; } @@ -957,7 +958,7 @@ namespace RTC // Remove this SSRC. RecvStreamClosed(packet->GetSsrc()); - delete packet; + RtpPacket::Deallocate(packet); return; } diff --git a/worker/src/RTC/RtpPacket.cpp b/worker/src/RTC/RtpPacket.cpp index f6a9905831..1cc6d85cac 100644 --- a/worker/src/RTC/RtpPacket.cpp +++ b/worker/src/RTC/RtpPacket.cpp @@ -9,6 +9,15 @@ namespace RTC { + /* Static Class methods. */ + + void RtpPacket::Deallocate(RtpPacket* packet) + { + // Destroy and deallocate the RtpPacket. + AllocatorTraits::destroy(Allocator::Pool, packet); + Allocator::Pool.deallocate(packet, 1); + } + /* Class methods. */ RtpPacket* RtpPacket::Parse(const uint8_t* data, size_t len) @@ -117,7 +126,19 @@ namespace RTC payloadLength + size_t{ payloadPadding }, "packet's computed size does not match received size"); - return new RtpPacket(header, headerExtension, payload, payloadLength, payloadPadding, len); + auto* rtpPacket = RtpPacket::Allocator::Pool.allocate(1); + + RtpPacket::AllocatorTraits::construct( + RtpPacket::Allocator::Pool, + rtpPacket, + header, + headerExtension, + payload, + payloadLength, + payloadPadding, + len); + + return rtpPacket; } /* Instance methods. */ @@ -145,9 +166,13 @@ namespace RTC { MS_TRACE(); + // This is a cloned RtpPacket. if (this->buffer) { - delete[] this->buffer; + // Destroy and deallocate the RtpPacket buffer. + BufferAllocatorTraits::destroy(BufferAllocator::Pool, this->buffer); + BufferAllocator::Pool.deallocate(this->buffer, 1); + this->buffer = nullptr; } } @@ -640,12 +665,15 @@ namespace RTC SetPayloadPaddingFlag(false); } - RtpPacket* RtpPacket::Clone() const + std::shared_ptr RtpPacket::Clone() const { MS_TRACE(); - auto* buffer = new uint8_t[MtuSize + 100]; - auto* ptr = const_cast(buffer); + auto* buffer = RtpPacket::BufferAllocator::Pool.allocate(1); + + RtpPacket::BufferAllocatorTraits::construct(RtpPacket::BufferAllocator::Pool, buffer); + + auto* ptr = const_cast(buffer->data()); size_t numBytes{ 0 }; @@ -699,28 +727,34 @@ namespace RTC ptr += size_t{ this->payloadPadding }; } - MS_ASSERT(static_cast(ptr - buffer) == this->size, "ptr - buffer == this->size"); + MS_ASSERT( + static_cast(ptr - buffer->data()) == this->size, "ptr - buffer->data() == this->size"); // Create the new RtpPacket instance and return it. - auto* packet = new RtpPacket( - newHeader, newHeaderExtension, newPayload, this->payloadLength, this->payloadPadding, this->size); - - // Keep already set extension ids. - packet->midExtensionId = this->midExtensionId; - packet->ridExtensionId = this->ridExtensionId; - packet->rridExtensionId = this->rridExtensionId; - packet->absSendTimeExtensionId = this->absSendTimeExtensionId; - packet->transportWideCc01ExtensionId = this->transportWideCc01ExtensionId; - packet->frameMarking07ExtensionId = this->frameMarking07ExtensionId; // Remove once RFC. - packet->frameMarkingExtensionId = this->frameMarkingExtensionId; - packet->ssrcAudioLevelExtensionId = this->ssrcAudioLevelExtensionId; - packet->videoOrientationExtensionId = this->videoOrientationExtensionId; - // Assign the payload descriptor handler. - packet->payloadDescriptorHandler = this->payloadDescriptorHandler; + std::shared_ptr shared = std::allocate_shared( + RtpPacket::Allocator::Pool, + newHeader, + newHeaderExtension, + newPayload, + this->payloadLength, + this->payloadPadding, + this->size); + + shared->midExtensionId = this->midExtensionId; + shared->ridExtensionId = this->ridExtensionId; + shared->rridExtensionId = this->rridExtensionId; + shared->absSendTimeExtensionId = this->absSendTimeExtensionId; + shared->transportWideCc01ExtensionId = this->transportWideCc01ExtensionId; + shared->frameMarking07ExtensionId = this->frameMarking07ExtensionId; // Remove once RFC. + shared->frameMarkingExtensionId = this->frameMarkingExtensionId; + shared->ssrcAudioLevelExtensionId = this->ssrcAudioLevelExtensionId; + shared->videoOrientationExtensionId = this->videoOrientationExtensionId; + // Clone payload descriptor handler. + shared->payloadDescriptorHandler = this->payloadDescriptorHandler; // Store allocated buffer. - packet->buffer = buffer; + shared->buffer = buffer; - return packet; + return shared; } // NOTE: The caller must ensure that the buffer/memmory of the packet has diff --git a/worker/src/RTC/RtpStreamSend.cpp b/worker/src/RTC/RtpStreamSend.cpp index f0cf95ac59..68d81c2604 100644 --- a/worker/src/RTC/RtpStreamSend.cpp +++ b/worker/src/RTC/RtpStreamSend.cpp @@ -123,7 +123,9 @@ namespace RTC auto* storageItem = this->buffer[0]; - delete storageItem; + // Destroy and deallocate the StorageItem. + StorageItem::AllocatorTraits::destroy(StorageItem::Allocator::Pool, storageItem); + StorageItem::Allocator::Pool.deallocate(storageItem, 1); this->buffer[0] = nullptr; @@ -143,10 +145,9 @@ namespace RTC if (!storageItem) continue; - // Reset the storage item (decrease RTP packet shared pointer counter). - storageItem->Reset(); - - delete storageItem; + // Destroy and deallocate the StorageItem. + StorageItem::AllocatorTraits::destroy(StorageItem::Allocator::Pool, storageItem); + StorageItem::Allocator::Pool.deallocate(storageItem, 1); } this->buffer.clear(); @@ -492,7 +493,10 @@ namespace RTC else { // Allocate a new storage item. - storageItem = new StorageItem(); + storageItem = StorageItem::Allocator::Pool.allocate(1); + // Memory is not initialized in any way, reset it. + // Create a new StorageItem instance in this memory. + StorageItem::AllocatorTraits::construct(StorageItem::Allocator::Pool, storageItem); this->storageItemBuffer.Insert(seq, storageItem); } @@ -500,7 +504,7 @@ namespace RTC // Only clone once and only if necessary. if (!sharedPacket.get()) { - sharedPacket.reset(packet->Clone()); + sharedPacket = packet->Clone(); } // Store original packet and some extra info into the storage item. diff --git a/worker/src/RTC/TcpConnection.cpp b/worker/src/RTC/TcpConnection.cpp index 511ec56190..98988ad76c 100644 --- a/worker/src/RTC/TcpConnection.cpp +++ b/worker/src/RTC/TcpConnection.cpp @@ -154,7 +154,11 @@ namespace RTC } } - void TcpConnection::Send(const uint8_t* data, size_t len, ::TcpConnectionHandler::onSendCallback* cb) + void TcpConnection::Send( + const uint8_t* data, + size_t len, + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx) { MS_TRACE(); @@ -163,6 +167,6 @@ namespace RTC uint8_t frameLen[2]; Utils::Byte::Set2Bytes(frameLen, 0, len); - ::TcpConnectionHandler::Write(frameLen, 2, data, len, cb); + ::TcpConnectionHandler::Write(frameLen, 2, data, len, cb, ctx); } } // namespace RTC diff --git a/worker/src/RTC/Transport.cpp b/worker/src/RTC/Transport.cpp index a9a68fa24c..5c79b00390 100644 --- a/worker/src/RTC/Transport.cpp +++ b/worker/src/RTC/Transport.cpp @@ -30,6 +30,32 @@ namespace RTC static size_t DefaultSctpSendBufferSize{ 262144 }; // 2^18. static size_t MaxSctpSendBufferSize{ 268435456 }; // 2^28. +#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR + void Transport::OnSendCallback(bool sent, OnSendCallbackCtx* ctx) + { + if (sent) + { + ctx->tccClient->PacketSent(ctx->packetInfo, DepLibUV::GetTimeMsInt64()); + + ctx->sentInfo.sentAtMs = DepLibUV::GetTimeMs(); + + ctx->senderBwe->RtpPacketSent(ctx->sentInfo); + } + + OnSendCallbackCtx::AllocatorTraits::destroy(OnSendCallbackCtx::Allocator::Pool, ctx); + OnSendCallbackCtx::Allocator::Pool.deallocate(ctx, 1); + } +#else + void Transport::OnSendCallback(bool sent, OnSendCallbackCtx* ctx) + { + if (sent) + ctx->tccClient->PacketSent(ctx->packetInfo, DepLibUV::GetTimeMsInt64()); + + OnSendCallbackCtx::AllocatorTraits::destroy(OnSendCallbackCtx::Allocator::Pool, ctx); + OnSendCallbackCtx::Allocator::Pool.deallocate(ctx, 1); + } +#endif + /* Instance methods. */ Transport::Transport(const std::string& id, Listener* listener, json& data) @@ -1587,7 +1613,7 @@ namespace RTC // Tell the child class to remove this SSRC. RecvStreamClosed(packet->GetSsrc()); - delete packet; + RtpPacket::Deallocate(packet); return; } @@ -1616,7 +1642,7 @@ namespace RTC default:; } - delete packet; + RtpPacket::Deallocate(packet); } void Transport::ReceiveRtcpPacket(RTC::RTCP::Packet* packet) @@ -2520,7 +2546,6 @@ namespace RTC { this->transportWideCcSeq++; - auto* tccClient = this->tccClient; webrtc::RtpPacketSendInfo packetInfo; packetInfo.ssrc = packet->GetSsrc(); @@ -2533,6 +2558,9 @@ namespace RTC // Indicate the pacer (and prober) that a packet is to be sent. this->tccClient->InsertPacket(packetInfo); + auto* ctx = OnSendCallbackCtx::Allocator::Pool.allocate(1); + + OnSendCallbackCtx::AllocatorTraits::construct(OnSendCallbackCtx::Allocator::Pool, ctx); #ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR auto* senderBwe = this->senderBwe; RTC::SenderBandwidthEstimator::SentInfo sentInfo; @@ -2541,30 +2569,15 @@ namespace RTC sentInfo.size = packet->GetSize(); sentInfo.sendingAtMs = DepLibUV::GetTimeMs(); - auto* cb = new onSendCallback( - [tccClient, &packetInfo, senderBwe, &sentInfo](bool sent) - { - if (sent) - { - tccClient->PacketSent(packetInfo, DepLibUV::GetTimeMsInt64()); - - sentInfo.sentAtMs = DepLibUV::GetTimeMs(); - - senderBwe->RtpPacketSent(sentInfo); - } - }); - - SendRtpPacket(consumer, packet, cb); + ctx->tccClient = this->tccClient; + ctx->packetInfo = packetInfo; + ctx->senderBwe = senderBwe; + ctx->sentInfo = sentInfo; #else - const auto* cb = new onSendCallback( - [tccClient, &packetInfo](bool sent) - { - if (sent) - tccClient->PacketSent(packetInfo, DepLibUV::GetTimeMsInt64()); - }); - - SendRtpPacket(consumer, packet, cb); + ctx->tccClient = this->tccClient; + ctx->packetInfo = packetInfo; #endif + SendRtpPacket(consumer, packet, OnSendCallback, ctx); } else { @@ -2592,7 +2605,6 @@ namespace RTC { this->transportWideCcSeq++; - auto* tccClient = this->tccClient; webrtc::RtpPacketSendInfo packetInfo; packetInfo.ssrc = packet->GetSsrc(); @@ -2605,6 +2617,9 @@ namespace RTC // Indicate the pacer (and prober) that a packet is to be sent. this->tccClient->InsertPacket(packetInfo); + auto* ctx = OnSendCallbackCtx::Allocator::Pool.allocate(1); + + OnSendCallbackCtx::AllocatorTraits::construct(OnSendCallbackCtx::Allocator::Pool, ctx); #ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR auto* senderBwe = this->senderBwe; RTC::SenderBandwidthEstimator::SentInfo sentInfo; @@ -2613,30 +2628,15 @@ namespace RTC sentInfo.size = packet->GetSize(); sentInfo.sendingAtMs = DepLibUV::GetTimeMs(); - auto* cb = new onSendCallback( - [tccClient, &packetInfo, senderBwe, &sentInfo](bool sent) - { - if (sent) - { - tccClient->PacketSent(packetInfo, DepLibUV::GetTimeMsInt64()); - - sentInfo.sentAtMs = DepLibUV::GetTimeMs(); - - senderBwe->RtpPacketSent(sentInfo); - } - }); - - SendRtpPacket(consumer, packet, cb); + ctx->tccClient = this->tccClient; + ctx->packetInfo = packetInfo; + ctx->senderBwe = senderBwe; + ctx->sentInfo = sentInfo; #else - const auto* cb = new onSendCallback( - [tccClient, &packetInfo](bool sent) - { - if (sent) - tccClient->PacketSent(packetInfo, DepLibUV::GetTimeMsInt64()); - }); - - SendRtpPacket(consumer, packet, cb); + ctx->tccClient = this->tccClient; + ctx->packetInfo = packetInfo; #endif + SendRtpPacket(consumer, packet, OnSendCallback, ctx); } else { @@ -2943,6 +2943,9 @@ namespace RTC // Indicate the pacer (and prober) that a packet is to be sent. this->tccClient->InsertPacket(packetInfo); + auto* ctx = OnSendCallbackCtx::Allocator::Pool.allocate(1); + + OnSendCallbackCtx::AllocatorTraits::construct(OnSendCallbackCtx::Allocator::Pool, ctx); #ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR auto* senderBwe = this->senderBwe; RTC::SenderBandwidthEstimator::SentInfo sentInfo; @@ -2952,30 +2955,15 @@ namespace RTC sentInfo.isProbation = true; sentInfo.sendingAtMs = DepLibUV::GetTimeMs(); - auto* cb = new onSendCallback( - [tccClient, &packetInfo, senderBwe, &sentInfo](bool sent) - { - if (sent) - { - tccClient->PacketSent(packetInfo, DepLibUV::GetTimeMsInt64()); - - sentInfo.sentAtMs = DepLibUV::GetTimeMs(); - - senderBwe->RtpPacketSent(sentInfo); - } - }); - - SendRtpPacket(nullptr, packet, cb); + ctx->tccClient = this->tccClient; + ctx->packetInfo = packetInfo; + ctx->senderBwe = senderBwe; + ctx->sentInfo = sentInfo; #else - const auto* cb = new onSendCallback( - [tccClient, &packetInfo](bool sent) - { - if (sent) - tccClient->PacketSent(packetInfo, DepLibUV::GetTimeMsInt64()); - }); - - SendRtpPacket(nullptr, packet, cb); + ctx->tccClient = this->tccClient; + ctx->packetInfo = packetInfo; #endif + SendRtpPacket(nullptr, packet, OnSendCallback, ctx); } else { diff --git a/worker/src/RTC/WebRtcTransport.cpp b/worker/src/RTC/WebRtcTransport.cpp index 698231dbbc..4da87055e9 100644 --- a/worker/src/RTC/WebRtcTransport.cpp +++ b/worker/src/RTC/WebRtcTransport.cpp @@ -831,7 +831,10 @@ namespace RTC } void WebRtcTransport::SendRtpPacket( - RTC::Consumer* /*consumer*/, RTC::RtpPacket* packet, RTC::Transport::onSendCallback* cb) + RTC::Consumer* /*consumer*/, + RTC::RtpPacket* packet, + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx) { MS_TRACE(); @@ -839,8 +842,7 @@ namespace RTC { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -853,8 +855,7 @@ namespace RTC if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -867,8 +868,7 @@ namespace RTC { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -876,7 +876,7 @@ namespace RTC auto len = static_cast(intLen); - this->iceServer->GetSelectedTuple()->Send(data, len, cb); + this->iceServer->GetSelectedTuple()->Send(data, len, cb, ctx); // Increase send transmission. RTC::Transport::DataSent(len); @@ -1121,7 +1121,7 @@ namespace RTC packet->GetPayloadType(), packet->GetSequenceNumber()); - delete packet; + RtpPacket::Deallocate(packet); } return; diff --git a/worker/src/handles/TcpConnectionHandler.cpp b/worker/src/handles/TcpConnectionHandler.cpp index 5fd6a7f045..7dfa24e272 100644 --- a/worker/src/handles/TcpConnectionHandler.cpp +++ b/worker/src/handles/TcpConnectionHandler.cpp @@ -32,11 +32,14 @@ inline static void onWrite(uv_write_t* req, int status) auto* handle = req->handle; auto* connection = static_cast(handle->data); auto* cb = writeData->cb; + auto* ctx = writeData->ctx; if (connection) - connection->OnUvWrite(status, cb); + connection->OnUvWrite(status, cb, ctx); + else if (cb) + (*cb)(false, ctx); - // Delete the UvWriteData struct and the cb. + // Delete the UvWriteData struct. delete writeData; } @@ -179,7 +182,8 @@ void TcpConnectionHandler::Write( size_t len1, const uint8_t* data2, size_t len2, - TcpConnectionHandler::onSendCallback* cb) + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx) { MS_TRACE(); @@ -187,8 +191,7 @@ void TcpConnectionHandler::Write( { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -198,8 +201,7 @@ void TcpConnectionHandler::Write( { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -225,8 +227,7 @@ void TcpConnectionHandler::Write( if (cb) { - (*cb)(true); - delete cb; + (*cb)(true, ctx); } return; @@ -267,7 +268,8 @@ void TcpConnectionHandler::Write( len2 - (static_cast(written) - len1)); } - writeData->cb = cb; + writeData->cb = cb; + writeData->ctx = ctx; uv_buf_t buffer = uv_buf_init(reinterpret_cast(writeData->store), pendingLen); @@ -283,9 +285,9 @@ void TcpConnectionHandler::Write( MS_WARN_DEV("uv_write() failed: %s", uv_strerror(err)); if (cb) - (*cb)(false); + (*cb)(false, ctx); - // Delete the UvWriteData struct (it will delete the store and cb too). + // Delete the UvWriteData struct (it will delete the store too). delete writeData; } else @@ -399,16 +401,15 @@ inline void TcpConnectionHandler::OnUvRead(ssize_t nread, const uv_buf_t* /*buf* } } -inline void TcpConnectionHandler::OnUvWrite(int status, TcpConnectionHandler::onSendCallback* cb) +inline void TcpConnectionHandler::OnUvWrite( + int status, RTC::Transport::onSendCallback* cb, RTC::Transport::OnSendCallbackCtx* ctx) { MS_TRACE(); - // NOTE: Do not delete cb here since it will be delete in onWrite() above. - if (status == 0) { if (cb) - (*cb)(true); + (*cb)(true, ctx); } else { @@ -418,7 +419,7 @@ inline void TcpConnectionHandler::OnUvWrite(int status, TcpConnectionHandler::on MS_WARN_DEV("write error, closing the connection: %s", uv_strerror(status)); if (cb) - (*cb)(false); + (*cb)(false, ctx); Close(); diff --git a/worker/src/handles/UdpSocketHandler.cpp b/worker/src/handles/UdpSocketHandler.cpp index 6cd1fa8e3e..39e74eea37 100644 --- a/worker/src/handles/UdpSocketHandler.cpp +++ b/worker/src/handles/UdpSocketHandler.cpp @@ -37,11 +37,14 @@ inline static void onSend(uv_udp_send_t* req, int status) auto* handle = req->handle; auto* socket = static_cast(handle->data); auto* cb = sendData->cb; + auto* ctx = sendData->ctx; if (socket) - socket->OnUvSend(status, cb); + socket->OnUvSend(status, cb, ctx); + else if (cb) + (*cb)(false, ctx); - // Delete the UvSendData struct (it will delete the store and cb too). + // Delete the UvSendData struct (it will delete the store too). delete sendData; } @@ -119,7 +122,11 @@ void UdpSocketHandler::Dump() const } void UdpSocketHandler::Send( - const uint8_t* data, size_t len, const struct sockaddr* addr, UdpSocketHandler::onSendCallback* cb) + const uint8_t* data, + size_t len, + const struct sockaddr* addr, + RTC::Transport::onSendCallback* cb, + RTC::Transport::OnSendCallbackCtx* ctx) { MS_TRACE(); @@ -127,8 +134,7 @@ void UdpSocketHandler::Send( { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -138,8 +144,7 @@ void UdpSocketHandler::Send( { if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -159,8 +164,7 @@ void UdpSocketHandler::Send( if (cb) { - (*cb)(true); - delete cb; + (*cb)(true, ctx); } return; @@ -174,8 +178,7 @@ void UdpSocketHandler::Send( if (cb) { - (*cb)(false); - delete cb; + (*cb)(false, ctx); } return; @@ -190,7 +193,8 @@ void UdpSocketHandler::Send( sendData->req.data = static_cast(sendData); std::memcpy(sendData->store, data, len); - sendData->cb = cb; + sendData->cb = cb; + sendData->ctx = ctx; buffer = uv_buf_init(reinterpret_cast(sendData->store), len); @@ -204,9 +208,9 @@ void UdpSocketHandler::Send( MS_WARN_DEV("uv_udp_send() failed: %s", uv_strerror(err)); if (cb) - (*cb)(false); + (*cb)(false, ctx); - // Delete the UvSendData struct (it will delete the store and cb too). + // Delete the UvSendData struct. delete sendData; } else @@ -284,16 +288,15 @@ inline void UdpSocketHandler::OnUvRecv( } } -inline void UdpSocketHandler::OnUvSend(int status, UdpSocketHandler::onSendCallback* cb) +inline void UdpSocketHandler::OnUvSend( + int status, RTC::Transport::onSendCallback* cb, RTC::Transport::OnSendCallbackCtx* ctx) { MS_TRACE(); - // NOTE: Do not delete cb here since it will be delete in onSend() above. - if (status == 0) { if (cb) - (*cb)(true); + (*cb)(true, ctx); } else { @@ -302,6 +305,6 @@ inline void UdpSocketHandler::OnUvSend(int status, UdpSocketHandler::onSendCallb #endif if (cb) - (*cb)(false); + (*cb)(false, ctx); } } diff --git a/worker/test/src/RTC/TestRtpPacket.cpp b/worker/test/src/RTC/TestRtpPacket.cpp index 56f9f16f01..0da69b89ca 100644 --- a/worker/test/src/RTC/TestRtpPacket.cpp +++ b/worker/test/src/RTC/TestRtpPacket.cpp @@ -6,6 +6,8 @@ #include #include +// #define PERFORMANCE_TEST 1 + using namespace RTC; static uint8_t buffer[65536]; @@ -47,7 +49,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(packet->ReadRid(rid) == false); REQUIRE(rid == ""); - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("parse packet2.raw") @@ -73,7 +75,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(packet->HasOneByteExtensions() == false); REQUIRE(packet->HasTwoBytesExtensions() == false); - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("parse packet3.raw") @@ -127,7 +129,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(packet->ReadAbsSendTime(absSendTime) == true); REQUIRE(absSendTime == 0x65341e); - auto* clonedPacket = packet->Clone(); + auto clonedPacket = packet->Clone(); std::memset(buffer, '0', sizeof(buffer)); @@ -163,8 +165,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(clonedPacket->ReadAbsSendTime(absSendTime) == true); REQUIRE(absSendTime == 0x65341e); - delete packet; - delete clonedPacket; + RTC::RtpPacket::Deallocate(packet); } SECTION("create RtpPacket without header extension") @@ -192,7 +193,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(packet->HasTwoBytesExtensions() == false); REQUIRE(packet->GetSsrc() == 5); - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("create RtpPacket with One-Byte header extension") @@ -233,7 +234,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(packet->GetPayloadLength() == 1000); REQUIRE(packet->GetSize() == 1028); - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("create RtpPacket with Two-Bytes header extension") @@ -299,7 +300,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(extenValue == nullptr); REQUIRE(extenLen == 0); - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("rtx encryption-decryption") @@ -340,7 +341,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") auto rtxPacket = packet->Clone(); - delete packet; + RTC::RtpPacket::Deallocate(packet); std::memset(buffer, '0', sizeof(buffer)); @@ -369,8 +370,6 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(rtxPacket->GetHeaderExtensionLength() == 12); REQUIRE(rtxPacket->HasOneByteExtensions() == false); REQUIRE(rtxPacket->HasTwoBytesExtensions()); - - delete rtxPacket; } SECTION("create RtpPacket and apply payload shift to it") @@ -481,7 +480,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(packet->GetPayloadPadding() == 0); REQUIRE(packet->GetSize() == 1028); - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("set One-Byte header extensions") @@ -645,7 +644,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(extenValue[2] == 0x03); REQUIRE(extenValue[3] == 0x00); - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("set Two-Bytes header extensions") @@ -790,7 +789,7 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(packet->HasExtension(24) == true); REQUIRE(extenLen == 4); - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("read frame-marking extension") @@ -840,6 +839,128 @@ SCENARIO("parse RTP packets", "[parser][rtp]") REQUIRE(frameMarking->lid == 1); REQUIRE(frameMarking->tl0picidx == 5); - delete packet; + RTC::RtpPacket::Deallocate(packet); + } + +#ifdef PERFORMANCE_TEST + SECTION("Parse()") + { + // clang-format off + uint8_t buffer[] = + { + 0b10010000, 0b00000001, 0, 8, + 0, 0, 0, 4, + 0, 0, 0, 5, + 0xBE, 0xDE, 0, 1, // Header Extension + 0b00110010, 0b10101011, 1, 5, + 1, 2, 3, 4 + }; + // clang-format on + + const uint8_t probes = 5; + size_t iterations = 1000000; + std::array durations; + + for (auto idx = 0; idx < probes; ++idx) + { + auto start = std::chrono::system_clock::now(); + + for (auto i = 0; i < iterations; ++i) + { + RtpPacket* packet = RtpPacket::Parse(buffer, sizeof(buffer)); + + if (!packet) + FAIL("not a RTP packet"); + + REQUIRE(packet->HasMarker() == false); + REQUIRE(packet->HasHeaderExtension() == true); + REQUIRE(packet->GetPayloadType() == 1); + REQUIRE(packet->GetSequenceNumber() == 8); + REQUIRE(packet->GetTimestamp() == 4); + REQUIRE(packet->GetSsrc() == 5); + REQUIRE(packet->GetHeaderExtensionId() == 0xBEDE); + REQUIRE(packet->GetHeaderExtensionLength() == 4); + REQUIRE(packet->HasOneByteExtensions()); + REQUIRE(packet->HasTwoBytesExtensions() == false); + REQUIRE(packet->GetPayloadLength() == 4); + + RTC::RtpPacket::Deallocate(packet); + } + + std::chrono::duration dur = std::chrono::system_clock::now() - start; + + durations[idx] = dur.count(); + } + + double sum{ 0 }; + + for (auto idx = 0; idx < probes; ++idx) + { + sum += durations[idx]; + } + + std::cout << +probes << " probes of " << iterations << " RtpPackets parsed in an AVG: \t" + << sum / probes << " seconds" << std::endl; + } + + SECTION("Clone()") + { + // clang-format off + uint8_t buffer[] = + { + 0b10010000, 0b00000001, 0, 8, + 0, 0, 0, 4, + 0, 0, 0, 5, + 0xBE, 0xDE, 0, 1, // Header Extension + 0b00110010, 0b10101011, 1, 5, + 1, 2, 3, 4 + }; + // clang-format on + + RtpPacket* packet = RtpPacket::Parse(buffer, sizeof(buffer)); + + if (!packet) + FAIL("not a RTP packet"); + + const uint8_t probes = 5; + size_t iterations = 1000000; + std::array durations; + + for (auto idx = 0; idx < probes; ++idx) + { + auto start = std::chrono::system_clock::now(); + + for (auto i = 0; i < iterations; ++i) + { + auto clone = packet->Clone(); + + REQUIRE(clone->HasMarker() == false); + REQUIRE(clone->HasHeaderExtension() == true); + REQUIRE(clone->GetPayloadType() == 1); + REQUIRE(clone->GetSequenceNumber() == 8); + REQUIRE(clone->GetTimestamp() == 4); + REQUIRE(clone->GetSsrc() == 5); + REQUIRE(clone->GetHeaderExtensionId() == 0xBEDE); + REQUIRE(clone->GetHeaderExtensionLength() == 4); + REQUIRE(clone->HasOneByteExtensions()); + REQUIRE(clone->HasTwoBytesExtensions() == false); + REQUIRE(clone->GetPayloadLength() == 4); + } + + std::chrono::duration dur = std::chrono::system_clock::now() - start; + + durations[idx] = dur.count(); + } + + double sum{ 0 }; + + for (auto idx = 0; idx < probes; ++idx) + { + sum += durations[idx]; + } + + std::cout << +probes << " probes of " << iterations << " RtpPackets cloned in an AVG: \t" + << sum / probes << " seconds" << std::endl; } +#endif } diff --git a/worker/test/src/RTC/TestRtpPacketH264Svc.cpp b/worker/test/src/RTC/TestRtpPacketH264Svc.cpp index 022bee4a13..195355fe15 100644 --- a/worker/test/src/RTC/TestRtpPacketH264Svc.cpp +++ b/worker/test/src/RTC/TestRtpPacketH264Svc.cpp @@ -12,7 +12,7 @@ using namespace RTC; static uint8_t buffer[65536]; static uint8_t buffer2[65536]; -SCENARIO("parse RTP packets with H264 SVC", "[parser][rtp]") +SCENARIO("parse RTP packets with H264 SVC", "[parser][rtp][h264]") { SECTION("parse I0-7.bin") { @@ -70,8 +70,7 @@ SCENARIO("parse RTP packets with H264 SVC", "[parser][rtp]") REQUIRE(payloadDescriptor->isKeyFrame == true); delete payloadDescriptor; - - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("parse I0-8.bin") @@ -130,8 +129,7 @@ SCENARIO("parse RTP packets with H264 SVC", "[parser][rtp]") REQUIRE(payloadDescriptor->isKeyFrame == false); delete payloadDescriptor; - - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("parse I0-5.bin") @@ -189,8 +187,7 @@ SCENARIO("parse RTP packets with H264 SVC", "[parser][rtp]") REQUIRE(payloadDescriptor->hasTlIndex == false); delete payloadDescriptor; - - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("parse I1-15.bin") @@ -249,8 +246,7 @@ SCENARIO("parse RTP packets with H264 SVC", "[parser][rtp]") REQUIRE(payloadDescriptor->isKeyFrame == false); delete payloadDescriptor; - - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("parse I0-14.bin") @@ -309,8 +305,7 @@ SCENARIO("parse RTP packets with H264 SVC", "[parser][rtp]") REQUIRE(payloadDescriptor->isKeyFrame == true); delete payloadDescriptor; - - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("parse 2SL-I14.bin") @@ -370,8 +365,7 @@ SCENARIO("parse RTP packets with H264 SVC", "[parser][rtp]") REQUIRE(payloadDescriptor->isKeyFrame == true); delete payloadDescriptor; - - delete packet; + RTC::RtpPacket::Deallocate(packet); } SECTION("create and test RTP files") @@ -446,7 +440,7 @@ SCENARIO("parse RTP packets with H264 SVC", "[parser][rtp]") rows++; delete payloadDescriptor; - delete packet; + RTC::RtpPacket::Deallocate(packet); } nf.close(); diff --git a/worker/test/src/RTC/TestRtpStreamSend.cpp b/worker/test/src/RTC/TestRtpStreamSend.cpp index b981730cc6..bb40f5855a 100644 --- a/worker/test/src/RTC/TestRtpStreamSend.cpp +++ b/worker/test/src/RTC/TestRtpStreamSend.cpp @@ -6,7 +6,7 @@ #include #include -// #define PERFORMANCE_TEST 1 +#define PERFORMANCE_TEST 1 using namespace RTC; @@ -441,24 +441,28 @@ SCENARIO("NACK and RTP packets retransmission", "[rtp][rtcp][nack]") auto start = std::chrono::system_clock::now(); + // Create packet. + auto* packet = RtpPacket::Parse(rtpBuffer1, 1500); + packet->SetSsrc(1111); + for (size_t i = 0; i < iterations; i++) { - // Create packet. - auto* packet = RtpPacket::Parse(rtpBuffer1, 1500); - packet->SetSsrc(1111); - - std::shared_ptr sharedPacket(packet); + std::shared_ptr sharedPacket; stream->ReceivePacket(packet, sharedPacket); + stream->Pause(); } std::chrono::duration dur = std::chrono::system_clock::now() - start; - std::cout << "nullptr && initialized shared_ptr: \t" << dur.count() << " seconds" << std::endl; + std::cout << iterations << " video RtpPackets processed in \t" << dur.count() + << " seconds for a NACK enabled stream" << std::endl; delete stream; - params.mimeType.type = RTC::RtpCodecMimeType::Type::AUDIO; - stream = new RtpStreamSend(&testRtpStreamListener, params, mid); + // Perform the same test with NACK disabled. + + params.useNack = false; + stream = new RtpStreamSend(&testRtpStreamListener, params, mid); start = std::chrono::system_clock::now(); @@ -466,17 +470,17 @@ SCENARIO("NACK and RTP packets retransmission", "[rtp][rtcp][nack]") { std::shared_ptr sharedPacket; - // Create packet. - auto* packet = RtpPacket::Parse(rtpBuffer1, 1500); - packet->SetSsrc(1111); - stream->ReceivePacket(packet, sharedPacket); + stream->Pause(); } dur = std::chrono::system_clock::now() - start; - std::cout << "raw && empty shared_ptr duration: \t" << dur.count() << " seconds" << std::endl; + std::cout << iterations << " video RtpPackets processed in \t" << dur.count() + << " seconds for a NACK disabled stream" << std::endl; delete stream; + + RTC::RtpPacket::Deallocate(packet); } #endif }