diff --git a/CHANGELOG.md b/CHANGELOG.md index 7dd79e1e46..c199aa914b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ### NEXT +- `Consumer`: Fix sequence number gap ([PR #1494](https://github.com/versatica/mediasoup/pull/1494)). + ### 3.15.4 - `Worker`: Drop VP8 packets with a higher temporal layer than the current one ([PR #1009](https://github.com/versatica/mediasoup/pull/1009)). diff --git a/worker/include/Channel/ChannelSocket.hpp b/worker/include/Channel/ChannelSocket.hpp index 83935b8b81..85c74bb87e 100644 --- a/worker/include/Channel/ChannelSocket.hpp +++ b/worker/include/Channel/ChannelSocket.hpp @@ -82,6 +82,9 @@ namespace Channel }; public: +#ifdef MS_TEST + explicit ChannelSocket(); +#endif explicit ChannelSocket(int consumerFd, int producerFd); explicit ChannelSocket( ChannelReadFn channelReadFn, diff --git a/worker/meson.build b/worker/meson.build index 8de14990a4..35ba26b7a1 100644 --- a/worker/meson.build +++ b/worker/meson.build @@ -333,6 +333,7 @@ test_sources = [ 'test/src/RTC/TestRtpStreamSend.cpp', 'test/src/RTC/TestRtpStreamRecv.cpp', 'test/src/RTC/TestSeqManager.cpp', + 'test/src/RTC/TestSimpleConsumer.cpp', 'test/src/RTC/TestTrendCalculator.cpp', 'test/src/RTC/TestRtpEncodingParameters.cpp', 'test/src/RTC/TestTransportCongestionControlServer.cpp', diff --git a/worker/src/Channel/ChannelSocket.cpp b/worker/src/Channel/ChannelSocket.cpp index 144712501b..24810996a6 100644 --- a/worker/src/Channel/ChannelSocket.cpp +++ b/worker/src/Channel/ChannelSocket.cpp @@ -30,6 +30,13 @@ namespace Channel /* Instance methods. */ +#ifdef MS_TEST + ChannelSocket::ChannelSocket() + { + MS_TRACE_STD(); + } +#endif + ChannelSocket::ChannelSocket(int consumerFd, int producerFd) : consumerSocket(new ConsumerSocket(consumerFd, MessageMaxLen, this)), producerSocket(new ProducerSocket(producerFd, MessageMaxLen)) @@ -256,7 +263,7 @@ namespace Channel { this->channelWriteFn(payload, payloadLen, this->channelWriteCtx); } - else + else if (this->producerSocket) { this->producerSocket->Write(payload, payloadLen); } diff --git a/worker/src/RTC/SimpleConsumer.cpp b/worker/src/RTC/SimpleConsumer.cpp index 65deccbe2d..0c2c9ba176 100644 --- a/worker/src/RTC/SimpleConsumer.cpp +++ b/worker/src/RTC/SimpleConsumer.cpp @@ -320,6 +320,8 @@ namespace RTC packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::CONSUMER_INACTIVE); #endif + this->rtpSeqManager->Drop(packet->GetSequenceNumber()); + return; } @@ -335,6 +337,8 @@ namespace RTC packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::UNSUPPORTED_PAYLOAD_TYPE); #endif + this->rtpSeqManager->Drop(packet->GetSequenceNumber()); + return; } @@ -355,6 +359,8 @@ namespace RTC packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::DROPPED_BY_CODEC); #endif + this->rtpSeqManager->Drop(packet->GetSequenceNumber()); + return; } @@ -366,18 +372,20 @@ namespace RTC packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::NOT_A_KEYFRAME); #endif + this->rtpSeqManager->Drop(packet->GetSequenceNumber()); + return; } // Packets with only padding are not forwarded. if (packet->GetPayloadLength() == 0) { - this->rtpSeqManager->Drop(packet->GetSequenceNumber()); - #ifdef MS_RTC_LOGGER_RTP packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::EMPTY_PAYLOAD); #endif + this->rtpSeqManager->Drop(packet->GetSequenceNumber()); + return; } diff --git a/worker/src/RTC/SimulcastConsumer.cpp b/worker/src/RTC/SimulcastConsumer.cpp index 5ace36df3d..3fbc49737a 100644 --- a/worker/src/RTC/SimulcastConsumer.cpp +++ b/worker/src/RTC/SimulcastConsumer.cpp @@ -736,6 +736,8 @@ namespace RTC packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::CONSUMER_INACTIVE); #endif + this->rtpSeqManager->Drop(packet->GetSequenceNumber()); + return; } @@ -745,6 +747,8 @@ namespace RTC packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::INVALID_TARGET_LAYER); #endif + this->rtpSeqManager->Drop(packet->GetSequenceNumber()); + return; } @@ -777,6 +781,8 @@ namespace RTC packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::NOT_A_KEYFRAME); #endif + this->rtpSeqManager->Drop(packet->GetSequenceNumber()); + return; } @@ -804,6 +810,7 @@ namespace RTC packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::NOT_A_KEYFRAME); #endif + this->rtpSeqManager->Drop(packet->GetSequenceNumber()); return; } @@ -811,12 +818,12 @@ namespace RTC // not have payload other than padding, then drop it. if (spatialLayer == this->currentSpatialLayer && packet->GetPayloadLength() == 0) { - this->rtpSeqManager->Drop(packet->GetSequenceNumber()); - #ifdef MS_RTC_LOGGER_RTP packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::EMPTY_PAYLOAD); #endif + this->rtpSeqManager->Drop(packet->GetSequenceNumber()); + return; } @@ -935,6 +942,8 @@ namespace RTC packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::TOO_HIGH_TIMESTAMP_EXTRA_NEEDED); #endif + this->rtpSeqManager->Drop(packet->GetSequenceNumber()); + return; } @@ -980,6 +989,8 @@ namespace RTC RtcLogger::RtpPacket::DropReason::PACKET_PREVIOUS_TO_SPATIAL_LAYER_SWITCH); #endif + this->rtpSeqManager->Drop(packet->GetSequenceNumber()); + return; } else if (SeqManager::IsSeqHigherThan( @@ -1022,12 +1033,12 @@ namespace RTC // Rewrite payload if needed. Drop packet if necessary. if (!packet->ProcessPayload(this->encodingContext.get(), marker)) { - this->rtpSeqManager->Drop(packet->GetSequenceNumber()); - #ifdef MS_RTC_LOGGER_RTP packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::DROPPED_BY_CODEC); #endif + this->rtpSeqManager->Drop(packet->GetSequenceNumber()); + return; } diff --git a/worker/src/RTC/SvcConsumer.cpp b/worker/src/RTC/SvcConsumer.cpp index d5fb24d750..8635ac5d90 100644 --- a/worker/src/RTC/SvcConsumer.cpp +++ b/worker/src/RTC/SvcConsumer.cpp @@ -645,6 +645,8 @@ namespace RTC packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::CONSUMER_INACTIVE); #endif + this->rtpSeqManager->Drop(packet->GetSequenceNumber()); + return; } @@ -659,6 +661,8 @@ namespace RTC packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::INVALID_TARGET_LAYER); #endif + this->rtpSeqManager->Drop(packet->GetSequenceNumber()); + return; } @@ -674,6 +678,8 @@ namespace RTC packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::UNSUPPORTED_PAYLOAD_TYPE); #endif + this->rtpSeqManager->Drop(packet->GetSequenceNumber()); + return; } @@ -684,18 +690,20 @@ namespace RTC packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::NOT_A_KEYFRAME); #endif + this->rtpSeqManager->Drop(packet->GetSequenceNumber()); + return; } // Packets with only padding are not forwarded. if (packet->GetPayloadLength() == 0) { - this->rtpSeqManager->Drop(packet->GetSequenceNumber()); - #ifdef MS_RTC_LOGGER_RTP packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::EMPTY_PAYLOAD); #endif + this->rtpSeqManager->Drop(packet->GetSequenceNumber()); + return; } diff --git a/worker/test/src/RTC/TestSimpleConsumer.cpp b/worker/test/src/RTC/TestSimpleConsumer.cpp new file mode 100644 index 0000000000..4db23aa129 --- /dev/null +++ b/worker/test/src/RTC/TestSimpleConsumer.cpp @@ -0,0 +1,281 @@ +#include "flatbuffers/buffer.h" +#include "Channel/ChannelNotifier.hpp" +#include "Channel/ChannelSocket.hpp" +#include "FBS/rtpParameters.h" +#include "FBS/transport.h" +#include "RTC/RtpDictionaries.hpp" +#include "RTC/Shared.hpp" +#include "RTC/SimpleConsumer.hpp" +#include + +using namespace RTC; + +const uint8_t PayloadType = 111; +auto* channelMessageRegistrator = new ChannelMessageRegistrator(); +auto* channelSocket = new Channel::ChannelSocket(); +auto* channelNotifier = new Channel::ChannelNotifier(channelSocket); +auto shared = Shared(channelMessageRegistrator, channelNotifier); + +class RtpStreamRecvListener : public RtpStreamRecv::Listener +{ +public: + void OnRtpStreamScore(RtpStream* /*rtpStream*/, uint8_t /*score*/, uint8_t /*previousScore*/) override + { + } + + void OnRtpStreamSendRtcpPacket(RtpStreamRecv* rtpStream, RTCP::Packet* packet) override + { + } + + void OnRtpStreamNeedWorstRemoteFractionLost( + RTC::RtpStreamRecv* /*rtpStream*/, uint8_t& /*worstRemoteFractionLost*/) override + { + } +}; + +class ConsumerListener : public Consumer::Listener +{ + void OnConsumerSendRtpPacket(RTC::Consumer* /*consumer*/, RTC::RtpPacket* packet) final + { + this->sent.push_back(packet->GetSequenceNumber()); + }; + void OnConsumerRetransmitRtpPacket(RTC::Consumer* consumer, RTC::RtpPacket* packet) final + { + } + void OnConsumerKeyFrameRequested(RTC::Consumer* consumer, uint32_t mappedSsrc) final{}; + void OnConsumerNeedBitrateChange(RTC::Consumer* consumer) final{}; + void OnConsumerNeedZeroBitrate(RTC::Consumer* consumer) final{}; + void OnConsumerProducerClosed(RTC::Consumer* consumer) final{}; + +public: + // Verifies that the given number of packets have been sent, + // and that the sequence numbers are consecutive. + void Verify(size_t size) + { + REQUIRE(this->sent.size() == size); + + if (this->sent.size() <= 1) + { + return; + } + + auto currentSeq = this->sent[0]; + + for (auto it = std::next(this->sent.begin()); it != this->sent.end(); ++it) + { + REQUIRE(*it == currentSeq + 1); + currentSeq = *it; + } + } + +private: + std::vector sent; +}; + +flatbuffers::Offset<::flatbuffers::Vector<::flatbuffers::Offset>> CreateRtpEncodingParameters( + flatbuffers::FlatBufferBuilder& builder) +{ + std::vector> encodings; + + auto encoding = RTC::RtpEncodingParameters(); + + encoding.ssrc = 1234567890; + + encodings.emplace_back(encoding.FillBuffer(builder)); + + return builder.CreateVector(encodings); +}; + +flatbuffers::Offset CreateRtpParameters( + flatbuffers::FlatBufferBuilder& builder) +{ + auto rtpParameters = RTC::RtpParameters(); + auto codec = RTC::RtpCodecParameters(); + auto encoding = RTC::RtpEncodingParameters(); + + codec.mimeType.SetMimeType("audio/opus"); + codec.payloadType = PayloadType; + + encoding.ssrc = 1234567890; + + rtpParameters.mid = "mid"; + rtpParameters.codecs.emplace_back(codec); + rtpParameters.encodings.emplace_back(encoding); + rtpParameters.headerExtensions = std::vector(); + + return rtpParameters.FillBuffer(builder); +}; + +std::unique_ptr CreateConsumer(ConsumerListener* listener) +{ + flatbuffers::FlatBufferBuilder bufferBuilder; + + auto consumerId = bufferBuilder.CreateString("consumerId"); + auto producerId = bufferBuilder.CreateString("producerId"); + auto rtpParameters = CreateRtpParameters(bufferBuilder); + auto consumableEncodings = CreateRtpEncodingParameters(bufferBuilder); + + auto consumeRequestBuilder = FBS::Transport::ConsumeRequestBuilder(bufferBuilder); + + consumeRequestBuilder.add_consumerId(consumerId); + consumeRequestBuilder.add_producerId(producerId); + consumeRequestBuilder.add_kind(FBS::RtpParameters::MediaKind::AUDIO); + consumeRequestBuilder.add_rtpParameters(rtpParameters); + consumeRequestBuilder.add_type(FBS::RtpParameters::Type::SIMPLE); + consumeRequestBuilder.add_consumableRtpEncodings(consumableEncodings); + consumeRequestBuilder.add_paused(false); + consumeRequestBuilder.add_preferredLayers(0); + consumeRequestBuilder.add_ignoreDtx(false); + + auto offset = consumeRequestBuilder.Finish(); + bufferBuilder.Finish(offset); + + auto* buf = bufferBuilder.GetBufferPointer(); + + const auto* consumeRequest = flatbuffers::GetRoot(buf); + + return std::make_unique( + &shared, + consumeRequest->consumerId()->str(), + consumeRequest->producerId()->str(), + listener, + consumeRequest); +} + +std::unique_ptr CreateRtpStreamRecv() +{ + RtpStreamRecvListener streamRecvListener; + RtpStream::Params params; + + return std::make_unique(&streamRecvListener, params, 0u, false); +} + +SCENARIO("SimpleConsumer", "[rtp][consumer]") +{ + // clang-format off + uint8_t buffer[] = + { + 0x80, 0x01, 0x00, 0x08, + 0x00, 0x00, 0x00, 0x04, + 0x00, 0x00, 0x00, 0x05 + }; + // clang-format on + + SECTION("RTP packets are not forwarded when the consumer is not active") + { + auto listener = std::make_unique(); + auto consumer = CreateConsumer(listener.get()); + auto rtpStream = CreateRtpStreamRecv(); + + // Set producer scores and producer stream. + std::vector scores{ 10 }; + + consumer->ProducerRtpStreamScores(&scores); + consumer->ProducerNewRtpStream(rtpStream.get(), 1234); + + std::unique_ptr packet{ RtpPacket::Parse(buffer, sizeof(buffer)) }; + std::shared_ptr sharedPacket; + + packet->SetPayloadType(PayloadType); + packet->SetPayloadLength(64); + + consumer->SendRtpPacket(packet.get(), sharedPacket); + + listener->Verify(0); + } + + SECTION("RTP packets are not forwarded for unsupported payload types") + { + auto listener = std::make_unique(); + auto consumer = CreateConsumer(listener.get()); + + // Indicate that the transport is connected in order to activate the consumer. + dynamic_cast(consumer.get())->TransportConnected(); + + auto rtpStream = CreateRtpStreamRecv(); + + // Set producer scores and producer stream. + std::vector scores{ 10 }; + + consumer->ProducerRtpStreamScores(&scores); + consumer->ProducerNewRtpStream(rtpStream.get(), 1234); + + std::unique_ptr packet{ RtpPacket::Parse(buffer, sizeof(buffer)) }; + std::shared_ptr sharedPacket; + + packet->SetPayloadType(PayloadType + 1); + packet->SetPayloadLength(64); + + consumer->SendRtpPacket(packet.get(), sharedPacket); + + listener->Verify(0); + } + + SECTION("RTP packets with empty payload are not forwarded") + { + auto listener = std::make_unique(); + auto consumer = CreateConsumer(listener.get()); + + // Indicate that the transport is connected in order to activate the consumer. + dynamic_cast(consumer.get())->TransportConnected(); + + auto rtpStream = CreateRtpStreamRecv(); + + // Set producer scores and producer stream. + std::vector scores{ 10 }; + + consumer->ProducerRtpStreamScores(&scores); + consumer->ProducerNewRtpStream(rtpStream.get(), 1234); + + std::unique_ptr packet{ RtpPacket::Parse(buffer, sizeof(buffer)) }; + std::shared_ptr sharedPacket; + + packet->SetPayloadType(PayloadType + 1); + packet->SetPayloadLength(0); + + consumer->SendRtpPacket(packet.get(), sharedPacket); + + listener->Verify(0); + } + + SECTION("outgoing RTP packets are forwarded with increased sequence number") + { + auto listener = std::make_unique(); + auto consumer = CreateConsumer(listener.get()); + + // Indicate that the transport is connected in order to activate the consumer. + dynamic_cast(consumer.get())->TransportConnected(); + + auto rtpStream = CreateRtpStreamRecv(); + + // Set producer scores and producer stream. + std::vector scores{ 10 }; + + consumer->ProducerRtpStreamScores(&scores); + consumer->ProducerNewRtpStream(rtpStream.get(), 1234); + + std::unique_ptr packet{ RtpPacket::Parse(buffer, sizeof(buffer)) }; + std::shared_ptr sharedPacket; + + uint16_t seq = 1; + + packet->SetSequenceNumber(seq++); + packet->SetPayloadType(PayloadType); + packet->SetPayloadLength(64); + + consumer->SendRtpPacket(packet.get(), sharedPacket); + + packet->SetSequenceNumber(seq++); + consumer->SendRtpPacket(packet.get(), sharedPacket); + + packet->SetSequenceNumber(seq++); + packet->SetPayloadLength(0); + consumer->SendRtpPacket(packet.get(), sharedPacket); + + packet->SetSequenceNumber(seq++); + packet->SetPayloadLength(20); + consumer->SendRtpPacket(packet.get(), sharedPacket); + + listener->Verify(3); + } +}