Skip to content

Commit

Permalink
Make ObjectPoolAllocator from ObjectPool (#8)
Browse files Browse the repository at this point in the history
* Make ObjectPoolAllocator from ObjectPool
  • Loading branch information
Hartigan authored Mar 21, 2022
1 parent 217ceba commit 740f7f2
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 108 deletions.
29 changes: 22 additions & 7 deletions worker/include/RTC/RTCP/CompoundPacket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define MS_RTC_RTCP_COMPOUND_PACKET_HPP

#include "common.hpp"
#include "Utils.hpp"
#include "RTC/RTCP/ReceiverReport.hpp"
#include "RTC/RTCP/Sdes.hpp"
#include "RTC/RTCP/SenderReport.hpp"
Expand All @@ -15,7 +16,14 @@ namespace RTC
class CompoundPacket
{
public:
using UniquePtr = std::unique_ptr<CompoundPacket>;
struct CompoundPacketDeleter
{
void operator()(CompoundPacket* packet) const;
};

using UniquePtr = std::unique_ptr<CompoundPacket, CompoundPacketDeleter>;
using Allocator = Utils::ObjectPoolAllocator<CompoundPacket>;
using AllocatorTraits = std::allocator_traits<Allocator>;
static UniquePtr Create();

public:
Expand Down Expand Up @@ -57,11 +65,11 @@ namespace RTC
private:
// Use `CompoundPacket::Create()` instead
CompoundPacket() = default;
// Use `CompoundPacket::ReturnIntoPool()` instead
// Used by CompoundPacketDeleter
~CompoundPacket() = default;

friend struct std::default_delete<RTC::RTCP::CompoundPacket>;
static void ReturnIntoPool(CompoundPacket* packet);
friend struct CompoundPacketDeleter;
friend AllocatorTraits;

private:
uint8_t* header{ nullptr };
Expand All @@ -77,11 +85,18 @@ namespace RTC
namespace std
{
template<>
struct default_delete<RTC::RTCP::CompoundPacket>
struct allocator_traits<RTC::RTCP::CompoundPacket::Allocator>
{
void operator()(RTC::RTCP::CompoundPacket* ptr) const
template<typename... Args>
static void construct(
RTC::RTCP::CompoundPacket::Allocator& a, RTC::RTCP::CompoundPacket* p, Args&&... args)
{
new (p) RTC::RTCP::CompoundPacket(forward<Args>(args)...);
}

static void destroy(RTC::RTCP::CompoundPacket::Allocator& a, RTC::RTCP::CompoundPacket* p)
{
RTC::RTCP::CompoundPacket::ReturnIntoPool(ptr);
p->~CompoundPacket();
}
};
}; // namespace std
Expand Down
10 changes: 7 additions & 3 deletions worker/include/RTC/RtpPacket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ namespace RTC
public:
using RtpPacketBuffer = std::array<uint8_t, MtuSize + 100>;
using SharedPtr = std::shared_ptr<RtpPacket>;
using Allocator = Utils::ObjectPoolAllocator<RtpPacket>;
using AllocatorTraits = std::allocator_traits<Allocator>;
// Memory to hold the cloned packet (with extra space for RTX encoding).
using BufferAllocator = Utils::ObjectPoolAllocator<RtpPacket::RtpPacketBuffer>;
using BufferAllocatorTraits = std::allocator_traits<BufferAllocator>;

/* Struct for RTP header. */
struct Header
Expand Down Expand Up @@ -135,7 +140,7 @@ namespace RTC

static SharedPtr Parse(const uint8_t* data, size_t len);

private:
public:
RtpPacket(
Header* header,
HeaderExtension* headerExtension,
Expand Down Expand Up @@ -609,7 +614,7 @@ namespace RTC
void ShiftPayload(size_t payloadOffset, size_t shift, bool expand = true);

private:
friend SharedPtr;
friend AllocatorTraits;

void ParseExtensions();

Expand Down Expand Up @@ -643,5 +648,4 @@ namespace RTC
RtpPacketBuffer* buffer{ nullptr };
};
} // namespace RTC

#endif
3 changes: 3 additions & 0 deletions worker/include/RTC/RtpStreamSend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ namespace RTC
public:
struct StorageItem
{
using Allocator = Utils::ObjectPoolAllocator<RtpStreamSend::StorageItem>;
using AllocatorTraits = std::allocator_traits<Allocator>;

void Dump() const;

// Original packet.
Expand Down
3 changes: 3 additions & 0 deletions worker/include/RTC/Transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ namespace RTC
#else
struct OnSendCallbackCtx
{
using Allocator = Utils::ObjectPoolAllocator<Transport::OnSendCallbackCtx>;
using AllocatorTraits = std::allocator_traits<Allocator>;

RTC::TransportCongestionControlClient* tccClient;
webrtc::RtpPacketSendInfo packetInfo;
};
Expand Down
75 changes: 53 additions & 22 deletions worker/include/Utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,51 +351,82 @@ namespace Utils
}
};

// Simple implementation of object pool only for single objects
// Arrays are allocated as usual
template<typename T>
class ObjectPool
class ObjectPoolAllocator
{
std::shared_ptr<std::vector<T*>> pool_data;

public:
~ObjectPool()
typedef T value_type;
thread_local static Utils::ObjectPoolAllocator<T> Pool;

ObjectPoolAllocator()
{
for (auto ptr : this->pool)
{
std::free(ptr);
}
pool_data = std::shared_ptr<std::vector<T*>>(

This comment has been minimized.

Copy link
@jmillan

jmillan Sep 9, 2022

Member

Also, why is the internall pool vector a shared pointer?

This comment has been minimized.

Copy link
@Hartigan

Hartigan Sep 9, 2022

Author

We use reference to thread local pool for new allocator created via rebind operation. Rebind needed for single allocation for shared pointer structure and object structure

This comment has been minimized.

Copy link
@jmillan

jmillan Sep 9, 2022

Member

I'm probably missing something important.

This comment has been minimized.

Copy link
@Hartigan

Hartigan Sep 9, 2022

Author

new std::shared_ptr(new Object()) — two allocation’s

std::make_shared<Object>() — single memory allocation

std::allocate_shared — single allocate via pool. But we need create ObjectPoolAllocator<std::shared_ptr<T>> from ObjectPoolAllocator<T>. It’s case of usage of rebind

new std::vector<T*>(),
[](std::vector<T*>* pool)
{
for (auto* ptr : *pool)
{
std::free(ptr);
}
delete pool;
});
}

// Get pointer to allocated memory. This can be newly allocated memory or re-use of previously
// returned object. Object is not initialized and shouldn't be considered to be in a valid state.
T* Allocate()
template<typename U>
ObjectPoolAllocator(const ObjectPoolAllocator<U>& other)
: pool_data(ObjectPoolAllocator<T>::Pool.pool_data)

This comment has been minimized.

Copy link
@jmillan

jmillan Sep 9, 2022

Member

@Hartigan, that the reason for this implementation of the copy constructor?

I've removed it in #903 and seems not to be needed at all.

This comment has been minimized.

Copy link
@Hartigan

Hartigan Sep 9, 2022

Author

@jmillan it’s not copy constructor. It’s constructor for rebind operation: https://en.cppreference.com/w/cpp/memory/allocator_traits

This comment has been minimized.

Copy link
@jmillan

jmillan Sep 9, 2022

Member

I removed its body (assignment of pool_data) in #903 and it's properly working. Do you know why it's working without the body or whether it may make something not work as expected?

This comment has been minimized.

Copy link
@Hartigan

Hartigan Sep 9, 2022

Author

I expect NPE in std::allocate_shared<RtpPacket, RtpPacket::Allocator>. You can write simple test with std::allocate_shared call and with your changes.

This comment has been minimized.

Copy link
@jmillan

jmillan Sep 9, 2022

Member

NPE

Does it stand for Null Pointer Exception?

The branch of PR #903 is working with an empty constructor for rebind.

Since it is working with an empty constructor I'm wondering if it's needed in our case or otherwise which are the implications of it being empty.

You can write simple test with std::allocate_shared call and with your changes.

There are already tests for RtpPacket that use it, with no problem.

This comment has been minimized.

Copy link
@jmillan

jmillan Sep 9, 2022

Member

Forget this.., I thought I had removed the implementation but it's there... I'll test without it. Sorry for the confusion @Hartigan.

This comment has been minimized.

Copy link
@jmillan

jmillan Sep 9, 2022

Member

And yes, it crashes if the implementation is removed 👍

{
if (this->pool.empty())
}

~ObjectPoolAllocator()
{
}

T* allocate(size_t n)
{
if (n > 1)
{
return static_cast<T*>(std::malloc(sizeof(T) * n));
}

if (this->pool_data->empty())
{
return static_cast<T*>(std::malloc(sizeof(T)));
}

T* ptr = this->pool.back();
this->pool.pop_back();
T* ptr = this->pool_data->back();
this->pool_data->pop_back();

return ptr;
}

// Return allocated memory into internal pool for future use, make sure to run destructor before
// returning memory, ObjectPool will only de-allocate memory on exit.
void Return(T* ptr)
void deallocate(T* ptr, size_t n)
{
if (ptr)
if (!ptr)
{
return;
}

if (n > 1)
{
#ifdef MS_MEM_POOL_FREE_ON_RETURN
std::free(ptr);
return;
}

#ifdef MS_MEM_POOL_FREE_ON_RETURN
std::free(ptr);
#else
this->pool.push_back(ptr);
this->pool_data->push_back(ptr);
#endif
}
}

private:
std::vector<T*> pool;
};

template<typename T>
thread_local Utils::ObjectPoolAllocator<T> Utils::ObjectPoolAllocator<T>::Pool;
} // namespace Utils

#endif
26 changes: 12 additions & 14 deletions worker/src/RTC/RTCP/CompoundPacket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,13 @@ namespace RTC
{
namespace RTCP
{
thread_local static Utils::ObjectPool<CompoundPacket> CompoundPacketPool;

/* Instance methods. */

CompoundPacket::UniquePtr CompoundPacket::Create()
{
auto* packet = CompoundPacketPool.Allocate();

return UniquePtr(new (packet) CompoundPacket());
}

void CompoundPacket::ReturnIntoPool(CompoundPacket* packet)
{
if (packet)
{
packet->~CompoundPacket();
CompoundPacketPool.Return(packet);
}
auto* packet = CompoundPacket::Allocator::Pool.allocate(1);
CompoundPacket::AllocatorTraits::construct(CompoundPacket::Allocator::Pool, packet);
return UniquePtr(packet);
}

void CompoundPacket::Serialize(uint8_t* data)
Expand Down Expand Up @@ -163,5 +152,14 @@ namespace RTC

this->xrPacket.AddReport(report);
}

void CompoundPacket::CompoundPacketDeleter::operator()(CompoundPacket* packet) const
{
if (packet)
{
CompoundPacket::AllocatorTraits::destroy(CompoundPacket::Allocator::Pool, packet);
CompoundPacket::Allocator::Pool.deallocate(packet, 1);
}
}
} // namespace RTCP
} // namespace RTC
71 changes: 24 additions & 47 deletions worker/src/RTC/RtpPacket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@

namespace RTC
{
thread_local static Utils::ObjectPool<RtpPacket> RtpPacketPool;
// Memory to hold the cloned packet (with extra space for RTX encoding).
thread_local static Utils::ObjectPool<RtpPacket::RtpPacketBuffer> RtpPacketBufferPool;

/* Class methods. */

RtpPacket::SharedPtr RtpPacket::Parse(const uint8_t* data, size_t len)
Expand Down Expand Up @@ -121,21 +117,8 @@ namespace RTC
payloadLength + size_t{ payloadPadding },
"packet's computed size does not match received size");

auto* packet = RtpPacketPool.Allocate();
new (packet) RtpPacket(header, headerExtension, payload, payloadLength, payloadPadding, len);

SharedPtr shared(
packet,
/*Deleter*/
[](RtpPacket* packet)
{
// Call destructor manually since memory was pre-allocated upfront.
packet->~RtpPacket();
// Return packet into object pool for future reuse of memory allocation.
RtpPacketPool.Return(packet);
});

return shared;
return std::allocate_shared<RtpPacket, Utils::ObjectPoolAllocator<RtpPacket>>(
RtpPacket::Allocator::Pool, header, headerExtension, payload, payloadLength, payloadPadding, len);
}

/* Instance methods. */
Expand Down Expand Up @@ -166,7 +149,7 @@ namespace RTC
if (this->buffer)
{
this->buffer->~array();
RtpPacketBufferPool.Return(this->buffer);
RtpPacket::BufferAllocator::Pool.deallocate(this->buffer, 1);
this->buffer = nullptr;
}
}
Expand Down Expand Up @@ -657,8 +640,8 @@ namespace RTC
{
MS_TRACE();

auto* buffer = RtpPacketBufferPool.Allocate();
new (buffer) RtpPacketBuffer();
auto* buffer = RtpPacket::BufferAllocator::Pool.allocate(1);
RtpPacket::BufferAllocatorTraits::construct(RtpPacket::BufferAllocator::Pool, buffer);

auto* ptr = const_cast<uint8_t*>(buffer->data());
size_t numBytes{ 0 };
Expand Down Expand Up @@ -714,35 +697,29 @@ namespace RTC
}

// Create the new RtpPacket instance and return it.
auto* packet = RtpPacketPool.Allocate();
new (packet) RtpPacket(
newHeader, newHeaderExtension, newPayload, this->payloadLength, this->payloadPadding, this->size);

SharedPtr shared(
packet,
/*Deleter*/
[](RtpPacket* packet)
{
// Call destructor manually since memory was pre-allocated upfront.
packet->~RtpPacket();
// Return packet into object pool for future reuse of memory allocation.
RtpPacketPool.Return(packet);
});
SharedPtr shared = std::allocate_shared<RtpPacket, RtpPacket::Allocator>(
RtpPacket::Allocator::Pool,
newHeader,
newHeaderExtension,
newPayload,
this->payloadLength,
this->payloadPadding,
this->size);

// Keep already set extension ids.
packet->midExtensionId = this->midExtensionId;
packet->ridExtensionId = this->ridExtensionId;
packet->rridExtensionId = this->rridExtensionId;
packet->absSendTimeExtensionId = this->absSendTimeExtensionId;
packet->transportWideCc01ExtensionId = this->transportWideCc01ExtensionId;
packet->frameMarking07ExtensionId = this->frameMarking07ExtensionId; // Remove once RFC.
packet->frameMarkingExtensionId = this->frameMarkingExtensionId;
packet->ssrcAudioLevelExtensionId = this->ssrcAudioLevelExtensionId;
packet->videoOrientationExtensionId = this->videoOrientationExtensionId;
shared->midExtensionId = this->midExtensionId;
shared->ridExtensionId = this->ridExtensionId;
shared->rridExtensionId = this->rridExtensionId;
shared->absSendTimeExtensionId = this->absSendTimeExtensionId;
shared->transportWideCc01ExtensionId = this->transportWideCc01ExtensionId;
shared->frameMarking07ExtensionId = this->frameMarking07ExtensionId; // Remove once RFC.
shared->frameMarkingExtensionId = this->frameMarkingExtensionId;
shared->ssrcAudioLevelExtensionId = this->ssrcAudioLevelExtensionId;
shared->videoOrientationExtensionId = this->videoOrientationExtensionId;
// Clone payload descriptor handler.
packet->payloadDescriptorHandler = this->payloadDescriptorHandler;
shared->payloadDescriptorHandler = this->payloadDescriptorHandler;
// Store allocated buffer.
packet->buffer = buffer;
shared->buffer = buffer;

return shared;
}
Expand Down
Loading

0 comments on commit 740f7f2

Please sign in to comment.