Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ObjectPoolAllocator #903

Open
wants to merge 17 commits into
base: v3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions worker/fuzzer/src/RTC/FuzzerRtpPacket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,7 @@ void Fuzzer::RTC::RtpPacket::Fuzz(const uint8_t* data, size_t len)
packet->GetPayloadPadding();
packet->IsKeyFrame();

auto* clonedPacket = packet->Clone();

delete clonedPacket;
auto clonedPacket = packet->Clone();

// TODO: packet->RtxEncode(); // This cannot be tested this way.
// TODO: packet->RtxDecode(); // This cannot be tested this way.
Expand All @@ -190,5 +188,5 @@ void Fuzzer::RTC::RtpPacket::Fuzz(const uint8_t* data, size_t len)
// TODO: packet->ProcessPayload();
// TODO: packet->ShiftPayload();

delete packet;
::RTC::RtpPacket::Deallocate(packet);
}
3 changes: 2 additions & 1 deletion worker/fuzzer/src/RTC/FuzzerRtpStreamSend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@ void Fuzzer::RTC::RtpStreamSend::Fuzz(const uint8_t* data, size_t len)
}

delete stream;
delete packet;

::RTC::RtpPacket::Deallocate(packet);
}
3 changes: 2 additions & 1 deletion worker/include/RTC/DirectTransport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ namespace RTC
void SendRtpPacket(
RTC::Consumer* consumer,
RTC::RtpPacket* packet,
RTC::Transport::onSendCallback* cb = nullptr) override;
RTC::Transport::onSendCallback* cb = nullptr,
RTC::Transport::OnSendCallbackCtx* ctx = nullptr) override;
void SendRtcpPacket(RTC::RTCP::Packet* packet) override;
void SendRtcpCompoundPacket(RTC::RTCP::CompoundPacket* packet) override;
void SendMessage(
Expand Down
3 changes: 2 additions & 1 deletion worker/include/RTC/PipeTransport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ namespace RTC
void SendRtpPacket(
RTC::Consumer* consumer,
RTC::RtpPacket* packet,
RTC::Transport::onSendCallback* cb = nullptr) override;
RTC::Transport::onSendCallback* cb = nullptr,
RTC::Transport::OnSendCallbackCtx* ctx = nullptr) override;
void SendRtcpPacket(RTC::RTCP::Packet* packet) override;
void SendRtcpCompoundPacket(RTC::RTCP::CompoundPacket* packet) override;
void SendMessage(
Expand Down
3 changes: 2 additions & 1 deletion worker/include/RTC/PlainTransport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ namespace RTC
void SendRtpPacket(
RTC::Consumer* consumer,
RTC::RtpPacket* packet,
RTC::Transport::onSendCallback* cb = nullptr) override;
RTC::Transport::onSendCallback* cb = nullptr,
RTC::Transport::OnSendCallbackCtx* ctx = nullptr) override;
void SendRtcpPacket(RTC::RTCP::Packet* packet) override;
void SendRtcpCompoundPacket(RTC::RTCP::CompoundPacket* packet) override;
void SendMessage(
Expand Down
16 changes: 12 additions & 4 deletions worker/include/RTC/RtpPacket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ namespace RTC
class RtpPacket
{
public:
using Allocator = Utils::ObjectPoolAllocator<RtpPacket>;
using AllocatorTraits = std::allocator_traits<Allocator>;
// Memory to hold the cloned packet (with extra space for RTX encoding).
using RtpPacketBuffer = std::array<uint8_t, MtuSize + 100>;
using BufferAllocator = Utils::ObjectPoolAllocator<RtpPacket::RtpPacketBuffer>;
using BufferAllocatorTraits = std::allocator_traits<BufferAllocator>;

static void Deallocate(RtpPacket* packet);

/* Struct for RTP header. */
struct Header
{
Expand Down Expand Up @@ -133,7 +142,7 @@ namespace RTC

static RtpPacket* Parse(const uint8_t* data, size_t len);

private:
public:
jmillan marked this conversation as resolved.
Show resolved Hide resolved
RtpPacket(
Header* header,
HeaderExtension* headerExtension,
Expand All @@ -142,7 +151,6 @@ namespace RTC
uint8_t payloadPadding,
size_t size);

public:
~RtpPacket();

void Dump() const;
Expand Down Expand Up @@ -589,7 +597,7 @@ namespace RTC
return this->payloadDescriptorHandler->IsKeyFrame();
}

RtpPacket* Clone() const;
std::shared_ptr<RtpPacket> Clone() const;

void RtxEncode(uint8_t payloadType, uint32_t ssrc, uint16_t seq);

Expand Down Expand Up @@ -635,7 +643,7 @@ namespace RTC
std::shared_ptr<Codecs::PayloadDescriptorHandler> payloadDescriptorHandler;
// Buffer where this packet is allocated, can be `nullptr` if packet was
// parsed from externally provided buffer.
uint8_t* buffer{ nullptr };
RtpPacketBuffer* buffer{ nullptr };
};
} // namespace RTC

Expand Down
3 changes: 3 additions & 0 deletions worker/include/RTC/RtpStreamSend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ namespace RTC
public:
struct StorageItem
{
using Allocator = Utils::ObjectPoolAllocator<RtpStreamSend::StorageItem>;
using AllocatorTraits = std::allocator_traits<Allocator>;

void Reset();

// Original packet.
Expand Down
6 changes: 5 additions & 1 deletion worker/include/RTC/TcpConnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ namespace RTC
~TcpConnection() override;

public:
void Send(const uint8_t* data, size_t len, ::TcpConnectionHandler::onSendCallback* cb);
void Send(
const uint8_t* data,
size_t len,
RTC::Transport::onSendCallback* cb,
RTC::Transport::OnSendCallbackCtx* ctx);

/* Pure virtual methods inherited from ::TcpConnectionHandler. */
public:
Expand Down
29 changes: 27 additions & 2 deletions worker/include/RTC/Transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,31 @@ namespace RTC
public Timer::Listener
{
protected:
using onSendCallback = const std::function<void(bool sent)>;
using onQueuedCallback = const std::function<void(bool queued, bool sctpSendBufferFull)>;

public:
#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
struct OnSendCallbackCtx
{
RTC::TransportCongestionControlClient* tccClient;
webrtc::RtpPacketSendInfo packetInfo;
RTC::SenderBandwidthEstimator* senderBwe;
RTC::SenderBandwidthEstimator::SentInfo sentInfo;
};
#else
struct OnSendCallbackCtx
{
using Allocator = Utils::ObjectPoolAllocator<Transport::OnSendCallbackCtx>;
using AllocatorTraits = std::allocator_traits<Allocator>;

RTC::TransportCongestionControlClient* tccClient;
webrtc::RtpPacketSendInfo packetInfo;
};
#endif
// This function MUST NOT be de-allocated manually and MUST be called EXACTLY once.
static void OnSendCallback(bool sent, OnSendCallbackCtx* ctx);
using onSendCallback = void(bool sent, OnSendCallbackCtx* ctx);

public:
class Listener
{
Expand Down Expand Up @@ -170,7 +192,10 @@ namespace RTC
private:
virtual bool IsConnected() const = 0;
virtual void SendRtpPacket(
RTC::Consumer* consumer, RTC::RtpPacket* packet, onSendCallback* cb = nullptr) = 0;
RTC::Consumer* consumer,
RTC::RtpPacket* packet,
onSendCallback* cb = nullptr,
OnSendCallbackCtx* ctx = nullptr) = 0;
void HandleRtcpPacket(RTC::RTCP::Packet* packet);
void SendRtcp(uint64_t nowMs);
virtual void SendRtcpPacket(RTC::RTCP::Packet* packet) = 0;
Expand Down
14 changes: 8 additions & 6 deletions worker/include/RTC/TransportTuple.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "common.hpp"
#include "Utils.hpp"
#include "RTC/TcpConnection.hpp"
#include "RTC/Transport.hpp"
#include "RTC/UdpSocket.hpp"
#include <nlohmann/json.hpp>
#include <string>
Expand All @@ -14,9 +15,6 @@ namespace RTC
{
class TransportTuple
{
protected:
using onSendCallback = const std::function<void(bool sent)>;

public:
enum class Protocol
{
Expand Down Expand Up @@ -85,12 +83,16 @@ namespace RTC
this->localAnnouncedIp = localAnnouncedIp;
}

void Send(const uint8_t* data, size_t len, RTC::TransportTuple::onSendCallback* cb = nullptr)
void Send(
const uint8_t* data,
size_t len,
Transport::onSendCallback* cb = nullptr,
Transport::OnSendCallbackCtx* ctx = nullptr)
{
if (this->protocol == Protocol::UDP)
this->udpSocket->Send(data, len, this->udpRemoteAddr, cb);
this->udpSocket->Send(data, len, this->udpRemoteAddr, cb, ctx);
else
this->tcpConnection->Send(data, len, cb);
this->tcpConnection->Send(data, len, cb, ctx);
}

Protocol GetProtocol() const
Expand Down
3 changes: 2 additions & 1 deletion worker/include/RTC/WebRtcTransport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ namespace RTC
void SendRtpPacket(
RTC::Consumer* consumer,
RTC::RtpPacket* packet,
RTC::Transport::onSendCallback* cb = nullptr) override;
RTC::Transport::onSendCallback* cb = nullptr,
RTC::Transport::OnSendCallbackCtx* ctx = nullptr) override;
void SendRtcpPacket(RTC::RTCP::Packet* packet) override;
void SendRtcpCompoundPacket(RTC::RTCP::CompoundPacket* packet) override;
void SendMessage(
Expand Down
81 changes: 81 additions & 0 deletions worker/include/Utils.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
#ifndef MS_UTILS_HPP
#define MS_UTILS_HPP

#ifndef MS_CLASS
#define MS_CLASS "Utils"
jmillan marked this conversation as resolved.
Show resolved Hide resolved
#endif

// #define MS_MEM_POOL_FREE_ON_RETURN 1

#include "common.hpp"
#include "Logger.hpp"
#include <openssl/evp.h>
#include <cmath>
#include <cstring> // std::memcmp(), std::memcpy()
Expand Down Expand Up @@ -353,6 +360,80 @@ namespace Utils
return false;
}
};

// Simple implementation of object pool only for single objects.
// Arrays are allocated as usual.
template<typename T>
class ObjectPoolAllocator
{
std::shared_ptr<std::vector<T*>> pool_data;

public:
typedef T value_type;
thread_local static Utils::ObjectPoolAllocator<T> Pool;

ObjectPoolAllocator()
{
pool_data = std::shared_ptr<std::vector<T*>>(
new std::vector<T*>(),
[](std::vector<T*>* pool)
{
for (auto* ptr : *pool)
{
std::free(ptr);
}
delete pool;
});
}

template<typename U>
ObjectPoolAllocator(const ObjectPoolAllocator<U>& other)
: pool_data(ObjectPoolAllocator<T>::Pool.pool_data)
{
}

~ObjectPoolAllocator()
{
}

T* allocate(size_t n)
{
MS_ASSERT(n == 1, "only single object can be allocated");

if (this->pool_data->empty())
{
return static_cast<T*>(std::malloc(sizeof(T)));
}

T* ptr = this->pool_data->back();
this->pool_data->pop_back();

return ptr;
}

void deallocate(T* ptr, size_t n)
{
if (!ptr)
{
return;
}

if (n > 1)
{
std::free(ptr);
return;
}

#ifdef MS_MEM_POOL_FREE_ON_RETURN
std::free(ptr);
#else
this->pool_data->push_back(ptr);
#endif
}
};

template<typename T>
thread_local Utils::ObjectPoolAllocator<T> Utils::ObjectPoolAllocator<T>::Pool;
} // namespace Utils

#endif
10 changes: 6 additions & 4 deletions worker/include/handles/TcpConnectionHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define MS_TCP_CONNECTION_HPP

#include "common.hpp"
#include "RTC/Transport.hpp"
#include <uv.h>
#include <string>

Expand Down Expand Up @@ -35,12 +36,12 @@ class TcpConnectionHandler
~UvWriteData()
{
delete[] this->store;
delete this->cb;
}

uv_write_t req;
uint8_t* store{ nullptr };
TcpConnectionHandler::onSendCallback* cb{ nullptr };
RTC::Transport::onSendCallback* cb{ nullptr };
RTC::Transport::OnSendCallbackCtx* ctx{ nullptr };
};

public:
Expand Down Expand Up @@ -71,7 +72,8 @@ class TcpConnectionHandler
size_t len1,
const uint8_t* data2,
size_t len2,
TcpConnectionHandler::onSendCallback* cb);
RTC::Transport::onSendCallback* cb,
RTC::Transport::OnSendCallbackCtx* ctx);
void ErrorReceiving();
const struct sockaddr* GetLocalAddress() const
{
Expand Down Expand Up @@ -117,7 +119,7 @@ class TcpConnectionHandler
public:
void OnUvReadAlloc(size_t suggestedSize, uv_buf_t* buf);
void OnUvRead(ssize_t nread, const uv_buf_t* buf);
void OnUvWrite(int status, onSendCallback* cb);
void OnUvWrite(int status, RTC::Transport::onSendCallback* cb, RTC::Transport::OnSendCallbackCtx* ctx);

/* Pure virtual methods that must be implemented by the subclass. */
protected:
Expand Down
Loading