Skip to content

Commit e17ce04

Browse files
committed
Revert 7c8bfbd and add test
Don't block Close on spawned goroutines
1 parent 6988aff commit e17ce04

File tree

4 files changed

+38
-152
lines changed

4 files changed

+38
-152
lines changed

datachannel.go

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ type DataChannel struct {
4040
readyState atomic.Value // DataChannelState
4141
bufferedAmountLowThreshold uint64
4242
detachCalled bool
43-
readLoopActive chan struct{}
4443

4544
// The binaryType represents attribute MUST, on getting, return the value to
4645
// which it was last set. On setting, if the new value is either the string
@@ -328,7 +327,6 @@ func (d *DataChannel) handleOpen(dc *datachannel.DataChannel, isRemote, isAlread
328327
defer d.mu.Unlock()
329328

330329
if !d.api.settingEngine.detach.DataChannels {
331-
d.readLoopActive = make(chan struct{})
332330
go d.readLoop()
333331
}
334332
}
@@ -352,7 +350,6 @@ func (d *DataChannel) onError(err error) {
352350
}
353351

354352
func (d *DataChannel) readLoop() {
355-
defer close(d.readLoopActive)
356353
buffer := make([]byte, dataChannelBufferSize)
357354
for {
358355
n, isString, err := d.dataChannel.ReadDataChannel(buffer)
@@ -452,22 +449,6 @@ func (d *DataChannel) Detach() (datachannel.ReadWriteCloser, error) {
452449
// Close Closes the DataChannel. It may be called regardless of whether
453450
// the DataChannel object was created by this peer or the remote peer.
454451
func (d *DataChannel) Close() error {
455-
return d.close(false)
456-
}
457-
458-
// Normally, close only stops writes from happening, so waitForReadsDone=true
459-
// will wait for reads to be finished based on underlying SCTP association
460-
// closure or a SCTP reset stream from the other side. This is safe to call
461-
// with waitForReadsDone=true after tearing down a PeerConnection but not
462-
// necessarily before. For example, if you used a vnet and dropped all packets
463-
// right before closing the DataChannel, you'd need never see a reset stream.
464-
func (d *DataChannel) close(waitForReadsDone bool) error {
465-
if waitForReadsDone && d.readLoopActive != nil {
466-
defer func() {
467-
<-d.readLoopActive
468-
}()
469-
}
470-
471452
d.mu.Lock()
472453
haveSctpTransport := d.dataChannel != nil
473454
d.mu.Unlock()

peerconnection.go

Lines changed: 4 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ type PeerConnection struct {
5656
idpLoginURL *string
5757

5858
isClosed *atomicBool
59-
isClosedDone chan struct{}
6059
isNegotiationNeeded *atomicBool
6160
updateNegotiationNeededFlagOnEmptyChain *atomicBool
6261

@@ -117,7 +116,6 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
117116
ICECandidatePoolSize: 0,
118117
},
119118
isClosed: &atomicBool{},
120-
isClosedDone: make(chan struct{}),
121119
isNegotiationNeeded: &atomicBool{},
122120
updateNegotiationNeededFlagOnEmptyChain: &atomicBool{},
123121
lastOffer: "",
@@ -2046,31 +2044,14 @@ func (pc *PeerConnection) writeRTCP(pkts []rtcp.Packet, _ interceptor.Attributes
20462044
return pc.dtlsTransport.WriteRTCP(pkts)
20472045
}
20482046

2049-
// Close ends the PeerConnection.
2050-
// It will make a best effort to wait for all underlying goroutines it spawned to finish,
2051-
// except for cases that would cause deadlocks with itself.
2047+
// Close ends the PeerConnection
20522048
func (pc *PeerConnection) Close() error {
20532049
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #1)
20542050
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #2)
20552051
if pc.isClosed.swap(true) {
2056-
// someone else got here first but may still be closing (e.g. via DTLS close_notify)
2057-
<-pc.isClosedDone
20582052
return nil
20592053
}
2060-
defer close(pc.isClosedDone)
20612054

2062-
// Try closing everything and collect the errors
2063-
// Shutdown strategy:
2064-
// 1. Close all data channels.
2065-
// 2. All Conn close by closing their underlying Conn.
2066-
// 3. A Mux stops this chain. It won't close the underlying
2067-
// Conn if one of the endpoints is closed down. To
2068-
// continue the chain the Mux has to be closed.
2069-
pc.sctpTransport.lock.Lock()
2070-
closeErrs := make([]error, 0, 4+len(pc.sctpTransport.dataChannels))
2071-
pc.sctpTransport.lock.Unlock()
2072-
2073-
// canon steps
20742055
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3)
20752056
pc.signalingState.Set(SignalingStateClosed)
20762057

@@ -2080,6 +2061,7 @@ func (pc *PeerConnection) Close() error {
20802061
// 2. A Mux stops this chain. It won't close the underlying
20812062
// Conn if one of the endpoints is closed down. To
20822063
// continue the chain the Mux has to be closed.
2064+
closeErrs := make([]error, 4)
20832065

20842066
closeErrs = append(closeErrs, pc.api.interceptor.Close())
20852067

@@ -2106,6 +2088,7 @@ func (pc *PeerConnection) Close() error {
21062088

21072089
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #7)
21082090
closeErrs = append(closeErrs, pc.dtlsTransport.Stop())
2091+
21092092
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #8, #9, #10)
21102093
if pc.iceTransport != nil {
21112094
closeErrs = append(closeErrs, pc.iceTransport.Stop())
@@ -2114,13 +2097,6 @@ func (pc *PeerConnection) Close() error {
21142097
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #11)
21152098
pc.updateConnectionState(pc.ICEConnectionState(), pc.dtlsTransport.State())
21162099

2117-
// non-canon steps
2118-
pc.sctpTransport.lock.Lock()
2119-
for _, d := range pc.sctpTransport.dataChannels {
2120-
closeErrs = append(closeErrs, d.close(true))
2121-
}
2122-
pc.sctpTransport.lock.Unlock()
2123-
21242100
return util.FlattenErrs(closeErrs)
21252101
}
21262102

@@ -2292,11 +2268,8 @@ func (pc *PeerConnection) startTransports(iceRole ICERole, dtlsRole DTLSRole, re
22922268
}
22932269

22942270
pc.dtlsTransport.internalOnCloseHandler = func() {
2295-
if pc.isClosed.get() {
2296-
return
2297-
}
2298-
22992271
pc.log.Info("Closing PeerConnection from DTLS CloseNotify")
2272+
23002273
go func() {
23012274
if pcClosErr := pc.Close(); pcClosErr != nil {
23022275
pc.log.Warnf("Failed to close PeerConnection from DTLS CloseNotify: %s", pcClosErr)

peerconnection_close_test.go

Lines changed: 0 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
package webrtc
88

99
import (
10-
"runtime"
11-
"strings"
1210
"testing"
1311
"time"
1412

@@ -181,103 +179,3 @@ func TestPeerConnection_Close_DuringICE(t *testing.T) {
181179
t.Error("pcOffer.Close() Timeout")
182180
}
183181
}
184-
185-
func TestPeerConnection_CloseWithIncomingMessages(t *testing.T) {
186-
// Limit runtime in case of deadlocks
187-
lim := test.TimeOut(time.Second * 20)
188-
defer lim.Stop()
189-
190-
report := CheckRoutinesIntolerant(t)
191-
defer report()
192-
193-
pcOffer, pcAnswer, err := newPair()
194-
if err != nil {
195-
t.Fatal(err)
196-
}
197-
198-
var dcAnswer *DataChannel
199-
answerDataChannelOpened := make(chan struct{})
200-
pcAnswer.OnDataChannel(func(d *DataChannel) {
201-
// Make sure this is the data channel we were looking for. (Not the one
202-
// created in signalPair).
203-
if d.Label() != "data" {
204-
return
205-
}
206-
dcAnswer = d
207-
close(answerDataChannelOpened)
208-
})
209-
210-
dcOffer, err := pcOffer.CreateDataChannel("data", nil)
211-
if err != nil {
212-
t.Fatal(err)
213-
}
214-
215-
offerDataChannelOpened := make(chan struct{})
216-
dcOffer.OnOpen(func() {
217-
close(offerDataChannelOpened)
218-
})
219-
220-
err = signalPair(pcOffer, pcAnswer)
221-
if err != nil {
222-
t.Fatal(err)
223-
}
224-
225-
<-offerDataChannelOpened
226-
<-answerDataChannelOpened
227-
228-
msgNum := 0
229-
dcOffer.OnMessage(func(_ DataChannelMessage) {
230-
t.Log("msg", msgNum)
231-
msgNum++
232-
})
233-
234-
// send 50 messages, then close pcOffer, and then send another 50
235-
for i := 0; i < 100; i++ {
236-
if i == 50 {
237-
err = pcOffer.Close()
238-
if err != nil {
239-
t.Fatal(err)
240-
}
241-
}
242-
_ = dcAnswer.Send([]byte("hello!"))
243-
}
244-
245-
err = pcAnswer.Close()
246-
if err != nil {
247-
t.Fatal(err)
248-
}
249-
}
250-
251-
// CheckRoutinesIntolerant is used to check for leaked go-routines.
252-
// It differs from test.CheckRoutines in that it won't wait at all
253-
// for lingering goroutines. This is helpful for tests that need
254-
// to ensure clean closure of resources.
255-
func CheckRoutinesIntolerant(t *testing.T) func() {
256-
return func() {
257-
routines := getRoutines()
258-
if len(routines) == 0 {
259-
return
260-
}
261-
t.Fatalf("%s: \n%s", "Unexpected routines on test end", strings.Join(routines, "\n\n")) // nolint
262-
}
263-
}
264-
265-
func getRoutines() []string {
266-
buf := make([]byte, 2<<20)
267-
buf = buf[:runtime.Stack(buf, true)]
268-
return filterRoutines(strings.Split(string(buf), "\n\n"))
269-
}
270-
271-
func filterRoutines(routines []string) []string {
272-
result := []string{}
273-
for _, stack := range routines {
274-
if stack == "" || // Empty
275-
strings.Contains(stack, "testing.Main(") || // Tests
276-
strings.Contains(stack, "testing.(*T).Run(") || // Test run
277-
strings.Contains(stack, "getRoutines(") { // This routine
278-
continue
279-
}
280-
result = append(result, stack)
281-
}
282-
return result
283-
}

peerconnection_go_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1606,3 +1606,37 @@ func TestPeerConnectionState(t *testing.T) {
16061606
assert.NoError(t, pc.Close())
16071607
assert.Equal(t, PeerConnectionStateClosed, pc.ConnectionState())
16081608
}
1609+
1610+
func TestPeerConnectionDeadlock(t *testing.T) {
1611+
lim := test.TimeOut(time.Second * 5)
1612+
defer lim.Stop()
1613+
1614+
report := test.CheckRoutines(t)
1615+
defer report()
1616+
1617+
closeHdlr := func(peerConnection *PeerConnection) {
1618+
peerConnection.OnICEConnectionStateChange(func(i ICEConnectionState) {
1619+
if i == ICEConnectionStateFailed || i == ICEConnectionStateClosed {
1620+
if err := peerConnection.Close(); err != nil {
1621+
assert.NoError(t, err)
1622+
}
1623+
}
1624+
})
1625+
}
1626+
1627+
pcOffer, pcAnswer, err := NewAPI().newPair(Configuration{})
1628+
assert.NoError(t, err)
1629+
1630+
assert.NoError(t, signalPair(pcOffer, pcAnswer))
1631+
1632+
onDataChannel, onDataChannelCancel := context.WithCancel(context.Background())
1633+
pcAnswer.OnDataChannel(func(*DataChannel) {
1634+
onDataChannelCancel()
1635+
})
1636+
<-onDataChannel.Done()
1637+
1638+
closeHdlr(pcOffer)
1639+
closeHdlr(pcAnswer)
1640+
1641+
closePairNow(t, pcOffer, pcAnswer)
1642+
}

0 commit comments

Comments
 (0)