Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix usrsctp usage in Rust #1353

Draft
wants to merge 13 commits into
base: v3
Choose a base branch
from
3 changes: 2 additions & 1 deletion node/src/test/test-Consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@
}
});

test('transport.consume() succeeds', async () => {
console.log('REMOVE test.only');

Check failure on line 255 in node/src/test/test-Consumer.ts

View workflow job for this annotation

GitHub Actions / ci (macos-14, 20)

Unexpected console statement
test.only('transport.consume() succeeds', async () => {
const onObserverNewConsumer1 = jest.fn();

ctx.webRtcTransport2!.observer.once('newconsumer', onObserverNewConsumer1);
Expand Down
2 changes: 0 additions & 2 deletions node/src/test/test-node-sctp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,6 @@ test('ordered DataProducer delivers all SCTP messages to the DataConsumer', asyn
`ppid in message with id ${id} should be ${sctp.PPID.WEBRTC_BINARY} but it is ${ppid}`
)
);

return;
}
});
});
Expand Down
5 changes: 4 additions & 1 deletion worker/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,11 @@ fn main() {
.env("PYTHONPATH", &pythonpath)
.env("MEDIASOUP_OUT_DIR", &mediasoup_out_dir)
.env("MEDIASOUP_BUILDTYPE", build_type)
// Force forward slashes on Windows too, otherwise Meson thinks path is not absolute 🤷
// Force forward slashes on Windows too, otherwise Meson thinks path is
// not absolute 🤷.
.env("MEDIASOUP_INSTALL_DIR", &out_dir.replace('\\', "/"))
// In Rust we want to enable worker multi-thread usage.
.env("MEDIASOUP_ENABLE_MULTITHREAD", "true")
.spawn()
.expect("Failed to start")
.wait()
Expand Down
50 changes: 49 additions & 1 deletion worker/include/DepUsrSCTP.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,50 @@
#include "common.hpp"
#include "RTC/SctpAssociation.hpp"
#include "handles/TimerHandle.hpp"
#include <uv.h>
#include <absl/container/flat_hash_map.h>
#include <vector>

class DepUsrSCTP
{
private:
/* Struct for storing a pending SCTP message to be sent. */
struct SendSctpDataItem
{
// NOTE: We keep this struct simple, without explicit allocation or
// deallocation of members in constructor/destructor, and instead rely on
// the destructor of the main container SendSctpDataStore.

uint8_t* data{ nullptr };
size_t len{ 0u };
};

public:
/* Struct for storing pending datas to be sent. */
struct SendSctpDataStore
{
explicit SendSctpDataStore(RTC::SctpAssociation* sctpAssociation)
: sctpAssociation(sctpAssociation)
{
}
~SendSctpDataStore()
{
ClearItems();
}

void ClearItems()
{
for (auto& item : this->items)
{
delete[] item.data;
}
this->items.clear();
}

RTC::SctpAssociation* sctpAssociation{ nullptr };
std::vector<SendSctpDataItem> items;
};

private:
class Checker : public TimerHandle::Listener
{
Expand All @@ -33,16 +73,24 @@ class DepUsrSCTP
static void ClassDestroy();
static void CreateChecker();
static void CloseChecker();
static bool HasChecker()
{
return DepUsrSCTP::checker != nullptr;
}
static uintptr_t GetNextSctpAssociationId();
static void RegisterSctpAssociation(RTC::SctpAssociation* sctpAssociation);
static void DeregisterSctpAssociation(RTC::SctpAssociation* sctpAssociation);
static RTC::SctpAssociation* RetrieveSctpAssociation(uintptr_t id);
static void SendSctpData(RTC::SctpAssociation* sctpAssociation, uint8_t* data, size_t len);
static SendSctpDataStore* GetSendSctpDataStore(uv_async_t* handle);

private:
thread_local static Checker* checker;
static Checker* checker;
static uint64_t numSctpAssociations;
static uintptr_t nextSctpAssociationId;
static absl::flat_hash_map<uintptr_t, RTC::SctpAssociation*> mapIdSctpAssociation;
// Map of SendSctpDataStores indexed by uv_async_t*.
static absl::flat_hash_map<const uv_async_t*, SendSctpDataStore> mapAsyncHandlerSendSctpData;
};

#endif
11 changes: 9 additions & 2 deletions worker/include/RTC/SctpAssociation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "RTC/DataConsumer.hpp"
#include "RTC/DataProducer.hpp"
#include <usrsctp.h>
#include <uv.h>

namespace RTC
{
Expand Down Expand Up @@ -80,7 +81,11 @@ namespace RTC
public:
flatbuffers::Offset<FBS::SctpParameters::SctpParameters> FillBuffer(
flatbuffers::FlatBufferBuilder& builder) const;
void TransportConnected();
uv_async_t* GetAsyncHandle() const
{
return this->uvAsyncHandle;
}
void InitializeSyncHandle(uv_async_cb callback);
SctpState GetState() const
{
return this->state;
Expand All @@ -89,6 +94,7 @@ namespace RTC
{
return this->sctpBufferedAmount;
}
void TransportConnected();
void ProcessSctpData(const uint8_t* data, size_t len) const;
void SendSctpMessage(
RTC::DataConsumer* dataConsumer,
Expand All @@ -106,7 +112,7 @@ namespace RTC

/* Callbacks fired by usrsctp events. */
public:
void OnUsrSctpSendSctpData(void* buffer, size_t len);
void OnUsrSctpSendSctpData(uint8_t* data, size_t len);
void OnUsrSctpReceiveSctpData(
uint16_t streamId, uint16_t ssn, uint32_t ppid, int flags, const uint8_t* data, size_t len);
void OnUsrSctpReceiveSctpNotification(union sctp_notification* notification, size_t len);
Expand All @@ -125,6 +131,7 @@ namespace RTC
size_t sctpBufferedAmount{ 0u };
bool isDataChannel{ false };
// Allocated by this.
uv_async_t* uvAsyncHandle{ nullptr };
uint8_t* messageBuffer{ nullptr };
// Others.
SctpState state{ SctpState::NEW };
Expand Down
30 changes: 30 additions & 0 deletions worker/include/SCTP/UsrSctpChecker.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#ifndef MS_SCTP_USRSCTP_CHECKER_HPP
#define MS_SCTP_USRSCTP_CHECKER_HPP

#include "common.hpp"
#include "handles/TimerHandle.hpp"
#include <uv.h>

namespace SCTP
{
class UsrSctpChecker : public TimerHandle::Listener
{
public:
UsrSctpChecker();
~UsrSctpChecker() override;

public:
void Start();
void Stop();

/* Pure virtual methods inherited from TimerHandle::Listener. */
public:
void OnTimer(TimerHandle* timer) override;

private:
TimerHandle* timer{ nullptr };
uint64_t lastCalledAtMs{ 0u };
};
} // namespace SCTP

#endif
8 changes: 7 additions & 1 deletion worker/include/Worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
#include "RTC/Shared.hpp"
#include "RTC/WebRtcServer.hpp"
#include "handles/SignalHandle.hpp"
#include "handles/TimerHandle.hpp"
#include <flatbuffers/flatbuffer_builder.h>
#include <absl/container/flat_hash_map.h>
#include <string>

class Worker : public Channel::ChannelSocket::Listener,
public SignalHandle::Listener,
public RTC::Router::Listener
public RTC::Router::Listener,
public TimerHandle::Listener
{
public:
explicit Worker(Channel::ChannelSocket* channel);
Expand Down Expand Up @@ -49,6 +51,10 @@ class Worker : public Channel::ChannelSocket::Listener,
public:
RTC::WebRtcServer* OnRouterNeedWebRtcServer(RTC::Router* router, std::string& webRtcServerId) override;

/* Pure virtual methods inherited from TimerHandle::Listener. */
public:
void OnTimer(TimerHandle* timer) override;

private:
// Passed by argument.
Channel::ChannelSocket* channel{ nullptr };
Expand Down
3 changes: 2 additions & 1 deletion worker/include/handles/UdpSocketHandle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ class UdpSocketHandle
{
}

// Disable copy constructor because of the dynamically allocated data (store).
// Disable copy constructor because of the dynamically allocated data
// (store).
UvSendData(const UvSendData&) = delete;

~UvSendData()
Expand Down
7 changes: 7 additions & 0 deletions worker/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ common_sources = [
'src/RTC/RTCP/XR.cpp',
'src/RTC/RTCP/XrDelaySinceLastRr.cpp',
'src/RTC/RTCP/XrReceiverReferenceTime.cpp',
'src/SCTP/UsrSctpChecker.cpp',
]

openssl_proj = subproject(
Expand Down Expand Up @@ -298,6 +299,12 @@ if host_machine.system() == 'linux' and not get_option('ms_disable_liburing')
endif
endif

if get_option('ms_enable_multithread')
cpp_args += [
'-DMS_MULTITHREAD_ENABLED',
]
endif

libmediasoup_worker = library(
'libmediasoup-worker',
name_prefix: '',
Expand Down
1 change: 1 addition & 0 deletions worker/meson_options.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ option('ms_log_trace', type : 'boolean', value : false, description : 'When set
option('ms_log_file_line', type : 'boolean', value : false, description : 'When set to true, all the logging macros print more verbose information, including current file and line')
option('ms_rtc_logger_rtp', type : 'boolean', value : false, description : 'When set to true, prints a line with information for each RTP packet')
option('ms_disable_liburing', type : 'boolean', value : false, description : 'When set to true, disables liburing integration despite current host supports it')
option('ms_enable_multithread', type : 'boolean', value : false, description : 'When set to true, mediasoup worker is built assuming multi-thread usage')
8 changes: 8 additions & 0 deletions worker/src/DepLibUV.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
#include "DepLibUV.hpp"
#include "Logger.hpp"

// TODO: REMOVE
#include <fstream>

/* Static variables. */

thread_local uv_loop_t* DepLibUV::loop{ nullptr };
Expand All @@ -17,6 +20,11 @@ inline static void onCloseLoop(uv_handle_t* handle)

inline static void onWalk(uv_handle_t* handle, void* /*arg*/)
{
std::ofstream outfile;
outfile.open("/tmp/ms_log.txt", std::ios_base::app);
outfile << "---- onWalk\n";
outfile.flush();

// Must use MS_ERROR_STD since at this point the Channel is already closed.
MS_ERROR_STD(
"alive UV handle found (this shouldn't happen) [type:%s, active:%d, closing:%d, has_ref:%d]",
Expand Down
Loading
Loading