Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions internal/test/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type RTPWithError struct {
// RTCPWithError is used to send a batch of rtcp packets or an error on a channel.
type RTCPWithError struct {
Packets []rtcp.Packet
Attr interceptor.Attributes
Err error
}

Expand Down Expand Up @@ -121,23 +122,23 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc
go func() {
buf := make([]byte, 1500)
for {
i, _, err := mockStream.rtcpReader.Read(buf, interceptor.Attributes{})
i, attr, err := mockStream.rtcpReader.Read(buf, interceptor.Attributes{})
if err != nil {
if !errors.Is(err, io.EOF) {
mockStream.rtcpInModified <- RTCPWithError{Err: err}
mockStream.rtcpInModified <- RTCPWithError{Attr: attr, Err: err}
}

return
}

pkts, err := rtcp.Unmarshal(buf[:i])
if err != nil {
mockStream.rtcpInModified <- RTCPWithError{Err: err}
mockStream.rtcpInModified <- RTCPWithError{Attr: attr, Err: err}

return
}

mockStream.rtcpInModified <- RTCPWithError{Packets: pkts}
mockStream.rtcpInModified <- RTCPWithError{Attr: attr, Packets: pkts}
}
}()
go func() {
Expand Down
68 changes: 68 additions & 0 deletions pkg/rtpfb/ccfb_receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package rtpfb

import (
"time"

"github.com/pion/interceptor/internal/ntp"
"github.com/pion/rtcp"
)

type acknowledgement struct {
seqNr uint16
arrived bool
arrival time.Time
ecn rtcp.ECN
}

func convertCCFB(ts time.Time, feedback *rtcp.CCFeedbackReport) (time.Time, map[uint32][]acknowledgement) {
if feedback == nil {
return time.Time{}, nil
}
result := map[uint32][]acknowledgement{}
referenceTime := ntp.ToTime32(feedback.ReportTimestamp, ts)
for _, rb := range feedback.ReportBlocks {
result[rb.MediaSSRC] = convertMetricBlock(referenceTime, rb.BeginSequence, rb.MetricBlocks)
}

return referenceTime, result
}

func convertMetricBlock(
reference time.Time,
seqNrOffset uint16,
blocks []rtcp.CCFeedbackMetricBlock,
) []acknowledgement {
reports := make([]acknowledgement, len(blocks))
for i, mb := range blocks {
if mb.Received {
arrival := time.Time{}

// RFC 8888 states: If the measurement is unavailable or if the
// arrival time of the RTP packet is after the time represented by
// the RTS field, then an ATO value of 0x1FFF MUST be reported for
// the packet. In that case, we set a zero time.Time value.
if mb.ArrivalTimeOffset != 0x1FFF {
delta := time.Duration((float64(mb.ArrivalTimeOffset) / 1024.0) * float64(time.Second))
arrival = reference.Add(-delta)
}
reports[i] = acknowledgement{
seqNr: seqNrOffset + uint16(i), // nolint:gosec
arrived: true,
arrival: arrival,
ecn: mb.ECN,
}
} else {
reports[i] = acknowledgement{
seqNr: seqNrOffset + uint16(i), // nolint:gosec
arrived: false,
arrival: time.Time{},
ecn: 0,
}
}
}

return reports
}
196 changes: 196 additions & 0 deletions pkg/rtpfb/ccfb_receiver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package rtpfb

import (
"fmt"
"testing"
"time"

"github.com/pion/interceptor/internal/ntp"
"github.com/pion/rtcp"
"github.com/stretchr/testify/assert"
)

func TestConvertCCFB(t *testing.T) {
timeZero := time.Now()
cases := []struct {
ts time.Time
feedback *rtcp.CCFeedbackReport
expect map[uint32][]acknowledgement
expectTS time.Time
}{
{},
{
ts: timeZero.Add(2 * time.Second),
feedback: &rtcp.CCFeedbackReport{
SenderSSRC: 1,
ReportBlocks: []rtcp.CCFeedbackReportBlock{
{
MediaSSRC: 2,
BeginSequence: 17,
MetricBlocks: []rtcp.CCFeedbackMetricBlock{
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 512,
},
},
},
},
ReportTimestamp: ntp.ToNTP32(timeZero.Add(time.Second)),
},
expect: map[uint32][]acknowledgement{
2: {
{
seqNr: 17,
arrived: true,
arrival: timeZero.Add(500 * time.Millisecond),
ecn: 0,
},
},
},
expectTS: timeZero.Add(time.Second),
},
}
for i, tc := range cases {
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
resTS, res := convertCCFB(tc.ts, tc.feedback)

assert.InDelta(t, tc.expectTS.UnixNano(), resTS.UnixNano(), float64(time.Millisecond.Nanoseconds()))

// Can't directly check equality since arrival timestamp conversions
// may be slightly off due to ntp conversions.
assert.Equal(t, len(tc.expect), len(res))
for i, acks := range tc.expect {
for j, ack := range acks {
assert.Equal(t, ack.seqNr, res[i][j].seqNr)
assert.Equal(t, ack.arrived, res[i][j].arrived)
assert.Equal(t, ack.ecn, res[i][j].ecn)
assert.InDelta(t, ack.arrival.UnixNano(), res[i][j].arrival.UnixNano(), float64(time.Millisecond.Nanoseconds()))
}
}
})
}
}

func TestConvertMetricBlock(t *testing.T) {
cases := []struct {
ts time.Time
reference time.Time
seqNrOffset uint16
blocks []rtcp.CCFeedbackMetricBlock
expected []acknowledgement
}{
{
ts: time.Time{},
reference: time.Time{},
seqNrOffset: 0,
blocks: []rtcp.CCFeedbackMetricBlock{},
expected: []acknowledgement{},
},
{
ts: time.Time{}.Add(2 * time.Second),
reference: time.Time{}.Add(time.Second),
seqNrOffset: 3,
blocks: []rtcp.CCFeedbackMetricBlock{
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 512,
},
{
Received: false,
ECN: 0,
ArrivalTimeOffset: 0,
},
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 0,
},
},
expected: []acknowledgement{
{
seqNr: 3,
arrived: true,
arrival: time.Time{}.Add(500 * time.Millisecond),
ecn: 0,
},
{
seqNr: 4,
arrived: false,
arrival: time.Time{},
ecn: 0,
},
{
seqNr: 5,
arrived: true,
arrival: time.Time{}.Add(time.Second),
ecn: 0,
},
},
},
{
ts: time.Time{}.Add(2 * time.Second),
reference: time.Time{}.Add(time.Second),
seqNrOffset: 3,
blocks: []rtcp.CCFeedbackMetricBlock{
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 512,
},
{
Received: false,
ECN: 0,
ArrivalTimeOffset: 0,
},
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 0,
},
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 0x1FFF,
},
},
expected: []acknowledgement{
{
seqNr: 3,
arrived: true,
arrival: time.Time{}.Add(500 * time.Millisecond),
ecn: 0,
},
{
seqNr: 4,
arrived: false,
arrival: time.Time{},
ecn: 0,
},
{
seqNr: 5,
arrived: true,
arrival: time.Time{}.Add(time.Second),
ecn: 0,
},
{
seqNr: 6,
arrived: true,
arrival: time.Time{},
ecn: 0,
},
},
},
}

for i, tc := range cases {
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
res := convertMetricBlock(tc.reference, tc.seqNrOffset, tc.blocks)
assert.Equal(t, tc.expected, res)
})
}
}
Loading
Loading