Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions dtlstransport.go
Original file line number Diff line number Diff line change
@@ -530,36 +530,42 @@
func (t *DTLSTransport) streamsForSSRC(
ssrc SSRC,
streamInfo interceptor.StreamInfo,
) (*srtp.ReadStreamSRTP, interceptor.RTPReader, *srtp.ReadStreamSRTCP, interceptor.RTCPReader, error) {
) (*srtp.ReadStreamSRTP, interceptor.RTPReader, interceptor.RTPProcessor, *srtp.ReadStreamSRTCP, interceptor.RTCPReader, error) {

Check failure on line 533 in dtlstransport.go

GitHub Actions / lint / Go

The line is 129 characters long, which exceeds the maximum of 120 characters. (lll)
srtpSession, err := t.getSRTPSession()
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}

rtpReadStream, err := srtpSession.OpenReadStream(uint32(ssrc))
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err

Check warning on line 541 in dtlstransport.go

Codecov / codecov/patch

dtlstransport.go#L541

Added line #L541 was not covered by tests
}

rtpInterceptor := t.api.interceptor.BindRemoteStream(
rtpProcessor := t.api.interceptor.BindRemoteStream(
&streamInfo,
interceptor.RTPReaderFunc(
func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
n, err = rtpReadStream.Read(in)

return n, a, err
interceptor.RTPProcessorFunc(
func(s int, in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {

Check failure on line 547 in dtlstransport.go

GitHub Actions / lint / Go

unused-parameter: parameter 'in' seems to be unused, consider removing or renaming it as _ (revive)
return s, a, nil
},
),
)

rtpReader := interceptor.RTPReaderFunc(
func(in []byte, a interceptor.Attributes) (n int, attributes interceptor.Attributes, err error) {
n, err = rtpReadStream.Read(in)

return n, a, err
},
)

srtcpSession, err := t.getSRTCPSession()
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err

Check warning on line 563 in dtlstransport.go

Codecov / codecov/patch

dtlstransport.go#L563

Added line #L563 was not covered by tests
}

rtcpReadStream, err := srtcpSession.OpenReadStream(uint32(ssrc))
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err

Check warning on line 568 in dtlstransport.go

Codecov / codecov/patch

dtlstransport.go#L568

Added line #L568 was not covered by tests
}

rtcpInterceptor := t.api.interceptor.BindRTCPReader(interceptor.RTCPReaderFunc(
@@ -570,5 +576,5 @@
}),
)

return rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor, nil
return rtpReadStream, rtpReader, rtpProcessor, rtcpReadStream, rtcpInterceptor, nil
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -34,3 +34,5 @@ require (
golang.org/x/sys v0.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/pion/interceptor v0.1.37 => github.com/arjunshajitech/interceptor v0.0.0-20250313131735-36472c6290cb
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
github.com/arjunshajitech/interceptor v0.0.0-20250312123050-c9d476a510a1 h1:5p3Tm/VZUdN8aqLJp1noK/fAqggXJBHSsWXQJbksmw0=
github.com/arjunshajitech/interceptor v0.0.0-20250312123050-c9d476a510a1/go.mod h1:tYRp/5W3dEUrbYzdB49i4WictfIG2eEOSoFCb+oJAHY=
github.com/arjunshajitech/interceptor v0.0.0-20250313131735-36472c6290cb h1:qu70eQhcmCvNkrzYeVTDXS1RGmt14Qu5vo+sQH+q16w=
github.com/arjunshajitech/interceptor v0.0.0-20250313131735-36472c6290cb/go.mod h1:tYRp/5W3dEUrbYzdB49i4WictfIG2eEOSoFCb+oJAHY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
26 changes: 22 additions & 4 deletions interceptor_test.go
Original file line number Diff line number Diff line change
@@ -51,15 +51,15 @@
},
)
},
BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPProcessor) interceptor.RTPProcessor {
return interceptor.RTPProcessorFunc(func(i int, b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {

Check failure on line 55 in interceptor_test.go

GitHub Actions / lint / Go

The line is 128 characters long, which exceeds the maximum of 120 characters. (lll)
if a == nil {
a = interceptor.Attributes{}
}

a.Set("attribute", "value")

return reader.Read(b, a)
return reader.Process(i, b, a)
})
},
}, nil
@@ -146,7 +146,7 @@
UnbindLocalStreamFn: func(*interceptor.StreamInfo) {
atomic.AddUint32(&cntUnbindLocalStream, 1)
},
BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPProcessor) interceptor.RTPProcessor {
atomic.AddUint32(&cntBindRemoteStream, 1)

return reader
@@ -413,6 +413,24 @@
close(done)
})

pcOfferConnected := make(chan struct{})
pcAnswerConnected := make(chan struct{})

pc1.OnConnectionStateChange(func(state PeerConnectionState) {
if state == PeerConnectionStateConnected {
close(pcOfferConnected)
}
})

pc2.OnConnectionStateChange(func(state PeerConnectionState) {
if state == PeerConnectionStateConnected {
close(pcAnswerConnected)
}
})

<-pcOfferConnected
<-pcAnswerConnected

go func() {
for i := 0; i < numPackets; i++ {
time.Sleep(20 * time.Millisecond)
9 changes: 5 additions & 4 deletions peerconnection.go
Original file line number Diff line number Diff line change
@@ -1732,7 +1732,7 @@
params.Codecs[0].RTPCodecCapability,
params.HeaderExtensions,
)
readStream, interceptor, rtcpReadStream, rtcpInterceptor, err := pc.dtlsTransport.streamsForSSRC(ssrc, *streamInfo)
readStream, rtpReader, rtpProcessor, rtcpReadStream, rtcpInterceptor, err := pc.dtlsTransport.streamsForSSRC(ssrc, *streamInfo)

Check failure on line 1735 in peerconnection.go

GitHub Actions / lint / Go

The line is 128 characters long, which exceeds the maximum of 120 characters. (lll)
if err != nil {
return err
}
@@ -1746,7 +1746,7 @@
readCount--
}

i, _, err := interceptor.Read(b, nil)
i, _, err := rtpReader.Read(b, nil)
if err != nil {
return err
}
@@ -1775,15 +1775,16 @@
receiver.mu.Lock()
defer receiver.mu.Unlock()

return receiver.receiveForRtx(SSRC(0), rsid, streamInfo, readStream, interceptor, rtcpReadStream, rtcpInterceptor)
return receiver.receiveForRtx(SSRC(0), rsid, streamInfo, readStream, rtpReader, rtpProcessor, rtcpReadStream, rtcpInterceptor)

Check failure on line 1778 in peerconnection.go

GitHub Actions / lint / Go

The line is 130 characters long, which exceeds the maximum of 120 characters. (lll)
}

track, err := receiver.receiveForRid(
rid,
params,
streamInfo,
readStream,
interceptor,
rtpReader,
rtpProcessor,
rtcpReadStream,
rtcpInterceptor,
)
10 changes: 8 additions & 2 deletions peerconnection_media_test.go
Original file line number Diff line number Diff line change
@@ -150,10 +150,11 @@

go func() {
for {
time.Sleep(time.Millisecond * 100)
if pcOffer.ICEConnectionState() != ICEConnectionStateConnected {
time.Sleep(time.Millisecond * 100)
continue

Check failure on line 155 in peerconnection_media_test.go

GitHub Actions / lint / Go

continue with no blank line before (nlreturn)
}

if routineErr := vp8Track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}); routineErr != nil {
fmt.Println(routineErr)
}
@@ -169,6 +170,12 @@
}()

go func() {
for {
if pcOffer.ICEConnectionState() == ICEConnectionStateConnected {
break
}
time.Sleep(time.Millisecond * 100)
}
parameters := sender.GetParameters()

for {
@@ -190,7 +197,6 @@
}
}
}()

go func() {
if _, _, routineErr := sender.Read(make([]byte, 1400)); routineErr == nil {
close(awaitRTCPSenderRecv)
50 changes: 36 additions & 14 deletions rtpreceiver.go
Original file line number Diff line number Diff line change
@@ -26,14 +26,16 @@

streamInfo, repairStreamInfo *interceptor.StreamInfo

rtpReadStream *srtp.ReadStreamSRTP
rtpInterceptor interceptor.RTPReader
rtpReadStream *srtp.ReadStreamSRTP
rtpReader interceptor.RTPReader
rtpProcessor interceptor.RTPProcessor

rtcpReadStream *srtp.ReadStreamSRTCP
rtcpInterceptor interceptor.RTCPReader

repairReadStream *srtp.ReadStreamSRTP
repairInterceptor interceptor.RTPReader
repairReader interceptor.RTPReader
repairProcessor interceptor.RTPProcessor
repairStreamChannel chan rtxPacketWithAttributes

repairRtcpReadStream *srtp.ReadStreamSRTCP
@@ -228,13 +230,13 @@
var err error

//nolint:lll // # TODO refactor
if streams.rtpReadStream, streams.rtpInterceptor, streams.rtcpReadStream, streams.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *streams.streamInfo); err != nil {
if streams.rtpReadStream, streams.rtpReader, streams.rtpProcessor, streams.rtcpReadStream, streams.rtcpInterceptor, err = r.transport.streamsForSSRC(parameters.Encodings[i].SSRC, *streams.streamInfo); err != nil {
return err
}

if rtxSsrc := parameters.Encodings[i].RTX.SSRC; rtxSsrc != 0 {
streamInfo := createStreamInfo("", rtxSsrc, 0, 0, 0, 0, 0, codec, globalParams.HeaderExtensions)
rtpReadStream, rtpInterceptor, rtcpReadStream, rtcpInterceptor, err := r.transport.streamsForSSRC(
rtpReadStream, rtpReader, rtpProcessor, rtcpReadStream, rtcpInterceptor, err := r.transport.streamsForSSRC(
rtxSsrc,
*streamInfo,
)
@@ -247,7 +249,8 @@
"",
streamInfo,
rtpReadStream,
rtpInterceptor,
rtpReader,
rtpProcessor,
rtcpReadStream,
rtcpInterceptor,
); err != nil {
@@ -412,7 +415,11 @@
}

if t := r.streamsForTrack(reader); t != nil {
return t.rtpInterceptor.Read(b, a)
i, attr, err := t.rtpReader.Read(b, a)
if err != nil {
return 0, nil, err
}
return t.rtpProcessor.Process(i, b, attr)

Check failure on line 422 in rtpreceiver.go

GitHub Actions / lint / Go

return with no blank line before (nlreturn)
}

return 0, nil, fmt.Errorf("%w: %d", errRTPReceiverWithSSRCTrackStreamNotFound, reader.SSRC())
@@ -425,7 +432,8 @@
params RTPParameters,
streamInfo *interceptor.StreamInfo,
rtpReadStream *srtp.ReadStreamSRTP,
rtpInterceptor interceptor.RTPReader,
rtpReader interceptor.RTPReader,
rtpProcessor interceptor.RTPProcessor,
rtcpReadStream *srtp.ReadStreamSRTCP,
rtcpInterceptor interceptor.RTCPReader,
) (*TrackRemote, error) {
@@ -443,7 +451,8 @@

r.tracks[i].streamInfo = streamInfo
r.tracks[i].rtpReadStream = rtpReadStream
r.tracks[i].rtpInterceptor = rtpInterceptor
r.tracks[i].rtpReader = rtpReader
r.tracks[i].rtpProcessor = rtpProcessor
r.tracks[i].rtcpReadStream = rtcpReadStream
r.tracks[i].rtcpInterceptor = rtcpInterceptor

@@ -457,12 +466,13 @@
// receiveForRtx starts a routine that processes the repair stream.
//
//nolint:cyclop
func (r *RTPReceiver) receiveForRtx(

Check failure on line 469 in rtpreceiver.go

GitHub Actions / lint / Go

cognitive complexity 33 of func `(*RTPReceiver).receiveForRtx` is high (> 30) (gocognit)
ssrc SSRC,
rsid string,
streamInfo *interceptor.StreamInfo,
rtpReadStream *srtp.ReadStreamSRTP,
rtpInterceptor interceptor.RTPReader,
rtpReader interceptor.RTPReader,
rtpProcessor interceptor.RTPProcessor,
rtcpReadStream *srtp.ReadStreamSRTCP,
rtcpInterceptor interceptor.RTCPReader,
) error {
@@ -488,15 +498,21 @@

track.repairStreamInfo = streamInfo
track.repairReadStream = rtpReadStream
track.repairInterceptor = rtpInterceptor
track.repairReader = rtpReader
track.repairProcessor = rtpProcessor
track.repairRtcpReadStream = rtcpReadStream
track.repairRtcpInterceptor = rtcpInterceptor
track.repairStreamChannel = make(chan rtxPacketWithAttributes, 50)

go func() {
for {
b := r.rtxPool.Get().([]byte) // nolint:forcetypeassert
i, attributes, err := track.repairInterceptor.Read(b, nil)
i, attributes, err := track.repairReader.Read(b, nil)
if err != nil {
r.rtxPool.Put(b) // nolint:staticcheck
return

Check failure on line 513 in rtpreceiver.go

GitHub Actions / lint / Go

return with no blank line before (nlreturn)
}
i, attributes, err = track.repairProcessor.Process(i, b, attributes)
if err != nil {
r.rtxPool.Put(b) // nolint:staticcheck

@@ -590,7 +606,7 @@
}

// readRTX returns an RTX packet if one is available on the RTX track, otherwise returns nil.
func (r *RTPReceiver) readRTX(reader *TrackRemote) *rtxPacketWithAttributes {
func (r *RTPReceiver) readRTX(b []byte, reader *TrackRemote) *rtxPacketWithAttributes {
if !reader.HasRTX() {
return nil
}
@@ -604,7 +620,13 @@
if t := r.streamsForTrack(reader); t != nil {
select {
case rtxPacketReceived := <-t.repairStreamChannel:
return &rtxPacketReceived
{
n := copy(b, rtxPacketReceived.pkt)
_, _, err := t.rtpProcessor.Process(n, b, nil)
if err == nil {
return &rtxPacketReceived
}
}
default:
}
}
2 changes: 1 addition & 1 deletion track_remote.go
Original file line number Diff line number Diff line change
@@ -135,7 +135,7 @@ func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes,
}

// If there's a separate RTX track and an RTX packet is available, return that
if rtxPacketReceived := receiver.readRTX(t); rtxPacketReceived != nil {
if rtxPacketReceived := receiver.readRTX(b, t); rtxPacketReceived != nil {
n = copy(b, rtxPacketReceived.pkt)
attributes = rtxPacketReceived.attributes
rtxPacketReceived.release()