Skip to content

Commit 4e4a67d

Browse files
committed
Add PeerConnection.GracefulClose
1 parent b8d3a7b commit 4e4a67d

File tree

10 files changed

+314
-24
lines changed

10 files changed

+314
-24
lines changed

datachannel.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ type DataChannel struct {
4040
readyState atomic.Value // DataChannelState
4141
bufferedAmountLowThreshold uint64
4242
detachCalled bool
43+
readLoopActive chan struct{}
44+
isGracefulClosed bool
4345

4446
// The binaryType represents attribute MUST, on getting, return the value to
4547
// which it was last set. On setting, if the new value is either the string
@@ -225,6 +227,10 @@ func (d *DataChannel) OnOpen(f func()) {
225227
func (d *DataChannel) onOpen() {
226228
d.mu.RLock()
227229
handler := d.onOpenHandler
230+
if d.isGracefulClosed {
231+
d.mu.RUnlock()
232+
return
233+
}
228234
d.mu.RUnlock()
229235

230236
if handler != nil {
@@ -252,6 +258,10 @@ func (d *DataChannel) OnDial(f func()) {
252258
func (d *DataChannel) onDial() {
253259
d.mu.RLock()
254260
handler := d.onDialHandler
261+
if d.isGracefulClosed {
262+
d.mu.RUnlock()
263+
return
264+
}
255265
d.mu.RUnlock()
256266

257267
if handler != nil {
@@ -261,6 +271,10 @@ func (d *DataChannel) onDial() {
261271

262272
// OnClose sets an event handler which is invoked when
263273
// the underlying data transport has been closed.
274+
// Note: Due to backwards compatibility, there is a chance that
275+
// OnClose can be called, even if the GracefulClose is used.
276+
// If this is the case for you, you can deregister OnClose
277+
// prior to GracefulClose.
264278
func (d *DataChannel) OnClose(f func()) {
265279
d.mu.Lock()
266280
defer d.mu.Unlock()
@@ -292,6 +306,10 @@ func (d *DataChannel) OnMessage(f func(msg DataChannelMessage)) {
292306
func (d *DataChannel) onMessage(msg DataChannelMessage) {
293307
d.mu.RLock()
294308
handler := d.onMessageHandler
309+
if d.isGracefulClosed {
310+
d.mu.RUnlock()
311+
return
312+
}
295313
d.mu.RUnlock()
296314

297315
if handler == nil {
@@ -302,6 +320,10 @@ func (d *DataChannel) onMessage(msg DataChannelMessage) {
302320

303321
func (d *DataChannel) handleOpen(dc *datachannel.DataChannel, isRemote, isAlreadyNegotiated bool) {
304322
d.mu.Lock()
323+
if d.isGracefulClosed {
324+
d.mu.Unlock()
325+
return
326+
}
305327
d.dataChannel = dc
306328
bufferedAmountLowThreshold := d.bufferedAmountLowThreshold
307329
onBufferedAmountLow := d.onBufferedAmountLow
@@ -326,7 +348,12 @@ func (d *DataChannel) handleOpen(dc *datachannel.DataChannel, isRemote, isAlread
326348
d.mu.Lock()
327349
defer d.mu.Unlock()
328350

351+
if d.isGracefulClosed {
352+
return
353+
}
354+
329355
if !d.api.settingEngine.detach.DataChannels {
356+
d.readLoopActive = make(chan struct{})
330357
go d.readLoop()
331358
}
332359
}
@@ -342,6 +369,10 @@ func (d *DataChannel) OnError(f func(err error)) {
342369
func (d *DataChannel) onError(err error) {
343370
d.mu.RLock()
344371
handler := d.onErrorHandler
372+
if d.isGracefulClosed {
373+
d.mu.RUnlock()
374+
return
375+
}
345376
d.mu.RUnlock()
346377

347378
if handler != nil {
@@ -356,6 +387,12 @@ var rlBufPool = sync.Pool{New: func() interface{} {
356387
}}
357388

358389
func (d *DataChannel) readLoop() {
390+
defer func() {
391+
d.mu.Lock()
392+
readLoopActive := d.readLoopActive
393+
d.mu.Unlock()
394+
defer close(readLoopActive)
395+
}()
359396
for {
360397
buffer := rlBufPool.Get().([]byte) //nolint:forcetypeassert
361398
n, isString, err := d.dataChannel.ReadDataChannel(buffer)
@@ -438,7 +475,32 @@ func (d *DataChannel) Detach() (datachannel.ReadWriteCloser, error) {
438475
// Close Closes the DataChannel. It may be called regardless of whether
439476
// the DataChannel object was created by this peer or the remote peer.
440477
func (d *DataChannel) Close() error {
478+
return d.close(false)
479+
}
480+
481+
// GracefulClose Closes the DataChannel. It may be called regardless of whether
482+
// the DataChannel object was created by this peer or the remote peer. It also waits
483+
// for any goroutines it started to complete. This is only safe to call outside of
484+
// DataChannel callbacks or if in a callback, in its own goroutine.
485+
func (d *DataChannel) GracefulClose() error {
486+
return d.close(true)
487+
}
488+
489+
// Normally, close only stops writes from happening, so graceful=true
490+
// will wait for reads to be finished based on underlying SCTP association
491+
// closure or a SCTP reset stream from the other side. This is safe to call
492+
// with graceful=true after tearing down a PeerConnection but not
493+
// necessarily before. For example, if you used a vnet and dropped all packets
494+
// right before closing the DataChannel, you'd need never see a reset stream.
495+
func (d *DataChannel) close(shouldGracefullyClose bool) error {
441496
d.mu.Lock()
497+
d.isGracefulClosed = true
498+
readLoopActive := d.readLoopActive
499+
if shouldGracefullyClose && readLoopActive != nil {
500+
defer func() {
501+
<-readLoopActive
502+
}()
503+
}
442504
haveSctpTransport := d.dataChannel != nil
443505
d.mu.Unlock()
444506

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ go 1.17
55
require (
66
github.com/pion/datachannel v1.5.8
77
github.com/pion/dtls/v2 v2.2.12
8-
github.com/pion/ice/v2 v2.3.31
8+
github.com/pion/ice/v2 v2.3.34
99
github.com/pion/interceptor v0.1.29
1010
github.com/pion/logging v0.2.2
1111
github.com/pion/randutil v0.1.0
@@ -15,7 +15,7 @@ require (
1515
github.com/pion/sdp/v3 v3.0.9
1616
github.com/pion/srtp/v2 v2.0.20
1717
github.com/pion/stun v0.6.1
18-
github.com/pion/transport/v2 v2.2.8
18+
github.com/pion/transport/v2 v2.2.10
1919
github.com/sclevine/agouti v3.0.0+incompatible
2020
github.com/stretchr/testify v1.9.0
2121
golang.org/x/net v0.22.0

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ github.com/pion/datachannel v1.5.8/go.mod h1:PgmdpoaNBLX9HNzNClmdki4DYW5JtI7Yibu
4545
github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s=
4646
github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk=
4747
github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
48-
github.com/pion/ice/v2 v2.3.31 h1:qag/YqiOn5qPi0kgeVdsytxjx8szuriWSIeXKu8dDQc=
49-
github.com/pion/ice/v2 v2.3.31/go.mod h1:8fac0+qftclGy1tYd/nfwfHC729BLaxtVqMdMVCAVPU=
48+
github.com/pion/ice/v2 v2.3.34 h1:Ic1ppYCj4tUOcPAp76U6F3fVrlSw8A9JtRXLqw6BbUM=
49+
github.com/pion/ice/v2 v2.3.34/go.mod h1:mBF7lnigdqgtB+YHkaY/Y6s6tsyRyo4u4rPGRuOjUBQ=
5050
github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M=
5151
github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4=
5252
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
@@ -74,8 +74,8 @@ github.com/pion/stun v0.6.1/go.mod h1:/hO7APkX4hZKu/D0f2lHzNyvdkTGtIy3NDmLR7kSz/
7474
github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g=
7575
github.com/pion/transport/v2 v2.2.3/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0=
7676
github.com/pion/transport/v2 v2.2.4/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0=
77-
github.com/pion/transport/v2 v2.2.8 h1:HzsqGBChgtF4Cj47gu51l5hONuK/NwgbZL17CMSuwS0=
78-
github.com/pion/transport/v2 v2.2.8/go.mod h1:sq1kSLWs+cHW9E+2fJP95QudkzbK7wscs8yYgQToO5E=
77+
github.com/pion/transport/v2 v2.2.10 h1:ucLBLE8nuxiHfvkFKnkDQRYWYfp8ejf4YBOPfaQpw6Q=
78+
github.com/pion/transport/v2 v2.2.10/go.mod h1:sq1kSLWs+cHW9E+2fJP95QudkzbK7wscs8yYgQToO5E=
7979
github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0=
8080
github.com/pion/transport/v3 v3.0.2 h1:r+40RJR25S9w3jbA6/5uEPTzcdn7ncyU44RWCbHkLg4=
8181
github.com/pion/transport/v3 v3.0.2/go.mod h1:nIToODoOlb5If2jF9y2Igfx3PFYWfuXi37m0IlWa/D0=

icegatherer.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,13 +188,31 @@ func (g *ICEGatherer) Gather() error {
188188

189189
// Close prunes all local candidates, and closes the ports.
190190
func (g *ICEGatherer) Close() error {
191+
return g.close(false /* shouldGracefullyClose */)
192+
}
193+
194+
// GracefulClose prunes all local candidates, and closes the ports. It also waits
195+
// for any goroutines it started to complete. This is only safe to call outside of
196+
// ICEGatherer callbacks or if in a callback, in its own goroutine.
197+
func (g *ICEGatherer) GracefulClose() error {
198+
return g.close(true /* shouldGracefullyClose */)
199+
}
200+
201+
func (g *ICEGatherer) close(shouldGracefullyClose bool) error {
191202
g.lock.Lock()
192203
defer g.lock.Unlock()
193204

194205
if g.agent == nil {
195206
return nil
196-
} else if err := g.agent.Close(); err != nil {
197-
return err
207+
}
208+
if shouldGracefullyClose {
209+
if err := g.agent.GracefulClose(); err != nil {
210+
return err
211+
}
212+
} else {
213+
if err := g.agent.Close(); err != nil {
214+
return err
215+
}
198216
}
199217

200218
g.agent = nil

icetransport.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/pion/ice/v2"
1717
"github.com/pion/logging"
1818
"github.com/pion/webrtc/v3/internal/mux"
19+
"github.com/pion/webrtc/v3/internal/util"
1920
)
2021

2122
// ICETransport allows an application access to information about the ICE
@@ -187,6 +188,17 @@ func (t *ICETransport) restart() error {
187188

188189
// Stop irreversibly stops the ICETransport.
189190
func (t *ICETransport) Stop() error {
191+
return t.stop(false /* shouldGracefullyClose */)
192+
}
193+
194+
// GracefulStop irreversibly stops the ICETransport. It also waits
195+
// for any goroutines it started to complete. This is only safe to call outside of
196+
// ICETransport callbacks or if in a callback, in its own goroutine.
197+
func (t *ICETransport) GracefulStop() error {
198+
return t.stop(true /* shouldGracefullyClose */)
199+
}
200+
201+
func (t *ICETransport) stop(shouldGracefullyClose bool) error {
190202
t.lock.Lock()
191203
defer t.lock.Unlock()
192204

@@ -197,8 +209,18 @@ func (t *ICETransport) Stop() error {
197209
}
198210

199211
if t.mux != nil {
200-
return t.mux.Close()
212+
var closeErrs []error
213+
if shouldGracefullyClose && t.gatherer != nil {
214+
// we can't access icegatherer/icetransport.Close via
215+
// mux's net.Conn Close so we call it earlier here.
216+
closeErrs = append(closeErrs, t.gatherer.GracefulClose())
217+
}
218+
closeErrs = append(closeErrs, t.mux.Close())
219+
return util.FlattenErrs(closeErrs)
201220
} else if t.gatherer != nil {
221+
if shouldGracefullyClose {
222+
return t.gatherer.GracefulClose()
223+
}
202224
return t.gatherer.Close()
203225
}
204226
return nil

internal/mux/mux.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ func (m *Mux) readLoop() {
120120
}
121121

122122
if err = m.dispatch(buf[:n]); err != nil {
123+
if errors.Is(err, io.ErrClosedPipe) {
124+
// if the buffer was closed, that's not an error we care to report
125+
return
126+
}
123127
m.log.Errorf("mux: ending readLoop dispatch error %s", err.Error())
124128
return
125129
}

operations.go

Lines changed: 58 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,13 @@ type operation func()
1313

1414
// Operations is a task executor.
1515
type operations struct {
16-
mu sync.Mutex
17-
busy bool
18-
ops *list.List
16+
mu sync.Mutex
17+
busyCh chan struct{}
18+
ops *list.List
1919

2020
updateNegotiationNeededFlagOnEmptyChain *atomicBool
2121
onNegotiationNeeded func()
22+
isClosed bool
2223
}
2324

2425
func newOperations(
@@ -33,21 +34,34 @@ func newOperations(
3334
}
3435

3536
// Enqueue adds a new action to be executed. If there are no actions scheduled,
36-
// the execution will start immediately in a new goroutine.
37+
// the execution will start immediately in a new goroutine. If the queue has been
38+
// closed, the operation will be dropped. The queue is only deliberately closed
39+
// by a user.
3740
func (o *operations) Enqueue(op operation) {
41+
o.mu.Lock()
42+
defer o.mu.Unlock()
43+
_ = o.tryEnqueue(op)
44+
}
45+
46+
// tryEnqueue attempts to enqueue the given operation. It returns false
47+
// if the op is invalid or the queue is closed. mu must be locked by
48+
// tryEnqueue's caller.
49+
func (o *operations) tryEnqueue(op operation) bool {
3850
if op == nil {
39-
return
51+
return false
4052
}
4153

42-
o.mu.Lock()
43-
running := o.busy
54+
if o.isClosed {
55+
return false
56+
}
4457
o.ops.PushBack(op)
45-
o.busy = true
46-
o.mu.Unlock()
4758

48-
if !running {
59+
if o.busyCh == nil {
60+
o.busyCh = make(chan struct{})
4961
go o.start()
5062
}
63+
64+
return true
5165
}
5266

5367
// IsEmpty checks if there are tasks in the queue
@@ -62,12 +76,38 @@ func (o *operations) IsEmpty() bool {
6276
func (o *operations) Done() {
6377
var wg sync.WaitGroup
6478
wg.Add(1)
65-
o.Enqueue(func() {
79+
o.mu.Lock()
80+
enqueued := o.tryEnqueue(func() {
6681
wg.Done()
6782
})
83+
o.mu.Unlock()
84+
if !enqueued {
85+
return
86+
}
6887
wg.Wait()
6988
}
7089

90+
// GracefulClose waits for the operations queue to be cleared and forbids
91+
// new operations from being enqueued.
92+
func (o *operations) GracefulClose() {
93+
o.mu.Lock()
94+
if o.isClosed {
95+
o.mu.Unlock()
96+
return
97+
}
98+
// do not enqueue anymore ops from here on
99+
// o.isClosed=true will also not allow a new busyCh
100+
// to be created.
101+
o.isClosed = true
102+
103+
busyCh := o.busyCh
104+
o.mu.Unlock()
105+
if busyCh == nil {
106+
return
107+
}
108+
<-busyCh
109+
}
110+
71111
func (o *operations) pop() func() {
72112
o.mu.Lock()
73113
defer o.mu.Unlock()
@@ -87,12 +127,17 @@ func (o *operations) start() {
87127
defer func() {
88128
o.mu.Lock()
89129
defer o.mu.Unlock()
90-
if o.ops.Len() == 0 {
91-
o.busy = false
130+
// this wil lbe the most recent busy chan
131+
close(o.busyCh)
132+
133+
if o.ops.Len() == 0 || o.isClosed {
134+
o.busyCh = nil
92135
return
93136
}
137+
94138
// either a new operation was enqueued while we
95139
// were busy, or an operation panicked
140+
o.busyCh = make(chan struct{})
96141
go o.start()
97142
}()
98143

0 commit comments

Comments
 (0)