Skip to content

Commit 9cfda8b

Browse files
committed
Add EnableDataChannelBlockWrite to SettingEngine
The settingengine can enabled block write if datachannel is detached, makes it working like a normal net.Conn.
1 parent 6a3f355 commit 9cfda8b

File tree

4 files changed

+58
-1
lines changed

4 files changed

+58
-1
lines changed

datachannel.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,12 @@ func (d *DataChannel) ensureOpen() error {
450450
// pion/datachannel documentation for the correct way to handle the
451451
// resulting DataChannel object.
452452
func (d *DataChannel) Detach() (datachannel.ReadWriteCloser, error) {
453+
return d.DetachWithDeadline()
454+
}
455+
456+
// DetachWithDeadline allows you to detach the underlying datachannel.
457+
// It is the same as Detach but returns a ReadWriteCloserDeadliner.
458+
func (d *DataChannel) DetachWithDeadline() (datachannel.ReadWriteCloserDeadliner, error) {
453459
d.mu.Lock()
454460

455461
if !d.api.settingEngine.detach.DataChannels {

sctptransport.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func (r *SCTPTransport) GetCapabilities() SCTPCapabilities {
9898
// Start the SCTPTransport. Since both local and remote parties must mutually
9999
// create an SCTPTransport, SCTP SO (Simultaneous Open) is used to establish
100100
// a connection over SCTP.
101-
func (r *SCTPTransport) Start(SCTPCapabilities) error {
101+
func (r *SCTPTransport) Start(_ SCTPCapabilities) error {
102102
if r.isStarted {
103103
return nil
104104
}
@@ -114,6 +114,7 @@ func (r *SCTPTransport) Start(SCTPCapabilities) error {
114114
EnableZeroChecksum: r.api.settingEngine.sctp.enableZeroChecksum,
115115
LoggerFactory: r.api.settingEngine.LoggerFactory,
116116
RTOMax: float64(r.api.settingEngine.sctp.rtoMax) / float64(time.Millisecond),
117+
BlockWrite: r.api.settingEngine.detach.DataChannels && r.api.settingEngine.dataChannelBlockWrite,
117118
})
118119
if err != nil {
119120
return err

settingengine.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ type SettingEngine struct {
103103
iceMaxBindingRequests *uint16
104104
fireOnTrackBeforeFirstRTP bool
105105
disableCloseByDTLS bool
106+
dataChannelBlockWrite bool
106107
}
107108

108109
// getReceiveMTU returns the configured MTU. If SettingEngine's MTU is configured to 0 it returns the default
@@ -121,6 +122,12 @@ func (e *SettingEngine) DetachDataChannels() {
121122
e.detach.DataChannels = true
122123
}
123124

125+
// EnableDataChannelBlockWrite allows data channels to block on write,
126+
// it only works if DetachDataChannels is enabled
127+
func (e *SettingEngine) EnableDataChannelBlockWrite(nonblockWrite bool) {
128+
e.dataChannelBlockWrite = nonblockWrite
129+
}
130+
124131
// SetSRTPProtectionProfiles allows the user to override the default SRTP Protection Profiles
125132
// The default srtp protection profiles are provided by the function `defaultSrtpProtectionProfiles`
126133
func (e *SettingEngine) SetSRTPProtectionProfiles(profiles ...dtls.SRTPProtectionProfile) {

settingengine_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"testing"
1313
"time"
1414

15+
"github.com/pion/datachannel"
1516
"github.com/pion/dtls/v3/pkg/crypto/elliptic"
1617
"github.com/pion/dtls/v3/pkg/protocol/handshake"
1718
"github.com/pion/ice/v4"
@@ -417,3 +418,45 @@ func TestDisableCloseByDTLS(t *testing.T) {
417418
assert.True(t, offer.ConnectionState() == PeerConnectionStateConnected)
418419
assert.NoError(t, offer.Close())
419420
}
421+
422+
func TestEnableDataChannelBlockWrite(t *testing.T) {
423+
lim := test.TimeOut(time.Second * 30)
424+
defer lim.Stop()
425+
426+
report := test.CheckRoutines(t)
427+
defer report()
428+
429+
s := SettingEngine{}
430+
s.DetachDataChannels()
431+
s.EnableDataChannelBlockWrite(true)
432+
s.SetSCTPMaxReceiveBufferSize(1500)
433+
434+
offer, answer, err := NewAPI(WithSettingEngine(s)).newPair(Configuration{})
435+
assert.NoError(t, err)
436+
437+
dc, err := offer.CreateDataChannel("data", nil)
438+
assert.NoError(t, err)
439+
detachChan := make(chan datachannel.ReadWriteCloserDeadliner, 1)
440+
dc.OnOpen(func() {
441+
detached, err1 := dc.DetachWithDeadline()
442+
assert.NoError(t, err1)
443+
detachChan <- detached
444+
})
445+
446+
assert.NoError(t, signalPair(offer, answer))
447+
untilConnectionState(PeerConnectionStateConnected, offer, answer).Wait()
448+
449+
// write should block and return deadline exceeded since the receiver is not reading
450+
// and the buffer size is 1500 bytes
451+
rawDC := <-detachChan
452+
assert.NoError(t, rawDC.SetWriteDeadline(time.Now().Add(time.Second)))
453+
buf := make([]byte, 1000)
454+
for i := 0; i < 10; i++ {
455+
_, err = rawDC.Write(buf)
456+
if err != nil {
457+
break
458+
}
459+
}
460+
assert.ErrorIs(t, err, context.DeadlineExceeded)
461+
closePairNow(t, offer, answer)
462+
}

0 commit comments

Comments
 (0)