Skip to content

Commit 5637661

Browse files
committed
Add E2E Test for RTX
Assert that generation of NACKs and sending of RTX operates as expected.
1 parent 32f7063 commit 5637661

File tree

8 files changed

+95
-104
lines changed

8 files changed

+95
-104
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
github.com/pion/datachannel v1.5.9
77
github.com/pion/dtls/v3 v3.0.2
88
github.com/pion/ice/v4 v4.0.1
9-
github.com/pion/interceptor v0.1.34
9+
github.com/pion/interceptor v0.1.36
1010
github.com/pion/logging v0.2.2
1111
github.com/pion/randutil v0.1.0
1212
github.com/pion/rtcp v1.2.14

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ github.com/pion/dtls/v3 v3.0.2 h1:425DEeJ/jfuTTghhUDW0GtYZYIwwMtnKKJNMcWccTX0=
4141
github.com/pion/dtls/v3 v3.0.2/go.mod h1:dfIXcFkKoujDQ+jtd8M6RgqKK3DuaUilm3YatAbGp5k=
4242
github.com/pion/ice/v4 v4.0.1 h1:2d3tPoTR90F3TcGYeXUwucGlXI3hds96cwv4kjZmb9s=
4343
github.com/pion/ice/v4 v4.0.1/go.mod h1:2dpakjpd7+74L5j3TAe6gvkbI5UIzOgAnkimm9SuHvA=
44-
github.com/pion/interceptor v0.1.34 h1:jb1MG9LTdQ4VVCSZDUbUzjeJNngzz4dBXcr2dL+ejfA=
45-
github.com/pion/interceptor v0.1.34/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y=
44+
github.com/pion/interceptor v0.1.36 h1:WNOZUs5Vec3+NHeY6uGo4nvbxCcRglrI//DlUwLnl/M=
45+
github.com/pion/interceptor v0.1.36/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y=
4646
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
4747
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
4848
github.com/pion/mdns/v2 v2.0.7 h1:c9kM8ewCgjslaAmicYMFQIde2H9/lrZpjBkN8VwoVtM=

peerconnection_go_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -931,7 +931,7 @@ func TestICERestart_Error_Handling(t *testing.T) {
931931
report := test.CheckRoutines(t)
932932
defer report()
933933

934-
offerPeerConnection, answerPeerConnection, wan := createVNetPair(t)
934+
offerPeerConnection, answerPeerConnection, wan := createVNetPair(t, nil)
935935

936936
pushICEState := func(i ICEConnectionState) { iceStates <- i }
937937
offerPeerConnection.OnICEConnectionStateChange(pushICEState)

peerconnection_media_test.go

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"errors"
1414
"fmt"
1515
"io"
16+
"math/rand"
1617
"regexp"
1718
"strings"
1819
"sync"
@@ -322,7 +323,7 @@ func TestPeerConnection_Media_Disconnected(t *testing.T) {
322323
m := &MediaEngine{}
323324
assert.NoError(t, m.RegisterDefaultCodecs())
324325

325-
pcOffer, pcAnswer, wan := createVNetPair(t)
326+
pcOffer, pcAnswer, wan := createVNetPair(t, nil)
326327

327328
keepPackets := &atomicBool{}
328329
keepPackets.set(true)
@@ -1780,3 +1781,76 @@ func TestPeerConnection_Zero_PayloadType(t *testing.T) {
17801781

17811782
closePairNow(t, pcOffer, pcAnswer)
17821783
}
1784+
1785+
// Assert that NACKs work E2E with no extra configuration. If media is sent over a lossy connection
1786+
// the user gets retransmitted RTP packets with no extra configuration
1787+
func Test_PeerConnection_RTX_E2E(t *testing.T) {
1788+
defer test.TimeOut(time.Second * 30).Stop()
1789+
1790+
pcOffer, pcAnswer, wan := createVNetPair(t, nil)
1791+
1792+
wan.AddChunkFilter(func(vnet.Chunk) bool {
1793+
return rand.Intn(5) != 4 //nolint: gosec
1794+
})
1795+
1796+
track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "track-id", "stream-id")
1797+
assert.NoError(t, err)
1798+
1799+
rtpSender, err := pcOffer.AddTrack(track)
1800+
assert.NoError(t, err)
1801+
1802+
go func() {
1803+
rtcpBuf := make([]byte, 1500)
1804+
for {
1805+
if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
1806+
return
1807+
}
1808+
}
1809+
}()
1810+
1811+
rtxSsrc := rtpSender.GetParameters().Encodings[0].RTX.SSRC
1812+
ssrc := rtpSender.GetParameters().Encodings[0].SSRC
1813+
1814+
rtxRead, rtxReadCancel := context.WithCancel(context.Background())
1815+
pcAnswer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
1816+
for {
1817+
pkt, attributes, readRTPErr := track.ReadRTP()
1818+
if errors.Is(readRTPErr, io.EOF) {
1819+
return
1820+
} else if pkt.PayloadType == 0 {
1821+
continue
1822+
}
1823+
1824+
assert.NotNil(t, pkt)
1825+
assert.Equal(t, pkt.SSRC, uint32(ssrc))
1826+
assert.Equal(t, pkt.PayloadType, uint8(96))
1827+
1828+
rtxPayloadType := attributes.Get(AttributeRtxPayloadType)
1829+
rtxSequenceNumber := attributes.Get(AttributeRtxSequenceNumber)
1830+
rtxSSRC := attributes.Get(AttributeRtxSsrc)
1831+
if rtxPayloadType != nil && rtxSequenceNumber != nil && rtxSSRC != nil {
1832+
assert.Equal(t, rtxPayloadType, uint8(97))
1833+
assert.Equal(t, rtxSSRC, uint32(rtxSsrc))
1834+
1835+
rtxReadCancel()
1836+
}
1837+
}
1838+
})
1839+
1840+
assert.NoError(t, signalPair(pcOffer, pcAnswer))
1841+
1842+
func() {
1843+
for {
1844+
select {
1845+
case <-time.After(20 * time.Millisecond):
1846+
writeErr := track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second})
1847+
assert.NoError(t, writeErr)
1848+
case <-rtxRead.Done():
1849+
return
1850+
}
1851+
}
1852+
}()
1853+
1854+
assert.NoError(t, wan.Stop())
1855+
closePairNow(t, pcOffer, pcAnswer)
1856+
}

rtpreceiver_go_test.go

Lines changed: 1 addition & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,16 @@ package webrtc
88

99
import (
1010
"context"
11-
"encoding/binary"
12-
"errors"
13-
"io"
1411
"testing"
1512
"time"
1613

17-
"github.com/pion/rtp"
1814
"github.com/pion/sdp/v3"
19-
"github.com/pion/transport/v3/test"
2015
"github.com/pion/webrtc/v4/pkg/media"
2116
"github.com/stretchr/testify/assert"
2217
)
2318

2419
func TestSetRTPParameters(t *testing.T) {
25-
sender, receiver, wan := createVNetPair(t)
20+
sender, receiver, wan := createVNetPair(t, nil)
2621

2722
outgoingTrack, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
2823
assert.NoError(t, err)
@@ -75,89 +70,3 @@ func TestSetRTPParameters(t *testing.T) {
7570
assert.NoError(t, wan.Stop())
7671
closePairNow(t, sender, receiver)
7772
}
78-
79-
// Assert the behavior of reading a RTX with a distinct SSRC
80-
// All the attributes should be populated and the packet unpacked
81-
func Test_RTX_Read(t *testing.T) {
82-
defer test.TimeOut(time.Second * 30).Stop()
83-
84-
pcOffer, pcAnswer, err := newPair()
85-
assert.NoError(t, err)
86-
87-
track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "track-id", "stream-id")
88-
assert.NoError(t, err)
89-
90-
rtpSender, err := pcOffer.AddTrack(track)
91-
assert.NoError(t, err)
92-
93-
rtxSsrc := rtpSender.GetParameters().Encodings[0].RTX.SSRC
94-
ssrc := rtpSender.GetParameters().Encodings[0].SSRC
95-
96-
rtxRead, rtxReadCancel := context.WithCancel(context.Background())
97-
pcAnswer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
98-
for {
99-
pkt, attributes, readRTPErr := track.ReadRTP()
100-
if errors.Is(readRTPErr, io.EOF) {
101-
return
102-
} else if pkt.PayloadType == 0 {
103-
continue
104-
}
105-
106-
assert.NoError(t, readRTPErr)
107-
assert.NotNil(t, pkt)
108-
assert.Equal(t, pkt.SSRC, uint32(ssrc))
109-
assert.Equal(t, pkt.PayloadType, uint8(96))
110-
assert.Equal(t, pkt.Payload, []byte{0xB, 0xA, 0xD})
111-
112-
rtxPayloadType := attributes.Get(AttributeRtxPayloadType)
113-
rtxSequenceNumber := attributes.Get(AttributeRtxSequenceNumber)
114-
rtxSSRC := attributes.Get(AttributeRtxSsrc)
115-
if rtxPayloadType != nil && rtxSequenceNumber != nil && rtxSSRC != nil {
116-
assert.Equal(t, rtxPayloadType, uint8(97))
117-
assert.Equal(t, rtxSSRC, uint32(rtxSsrc))
118-
assert.Equal(t, rtxSequenceNumber, pkt.SequenceNumber+500)
119-
120-
rtxReadCancel()
121-
}
122-
}
123-
})
124-
125-
assert.NoError(t, signalPair(pcOffer, pcAnswer))
126-
127-
func() {
128-
for i := uint16(0); ; i++ {
129-
pkt := rtp.Packet{
130-
Header: rtp.Header{
131-
Version: 2,
132-
SSRC: uint32(ssrc),
133-
PayloadType: 96,
134-
SequenceNumber: i,
135-
},
136-
Payload: []byte{0xB, 0xA, 0xD},
137-
}
138-
139-
select {
140-
case <-time.After(20 * time.Millisecond):
141-
// Send the original packet
142-
err = track.WriteRTP(&pkt)
143-
assert.NoError(t, err)
144-
145-
rtxPayload := []byte{0x0, 0x0, 0xB, 0xA, 0xD}
146-
binary.BigEndian.PutUint16(rtxPayload[0:2], pkt.Header.SequenceNumber)
147-
148-
// Send the RTX
149-
_, err = track.bindings[0].writeStream.WriteRTP(&rtp.Header{
150-
Version: 2,
151-
SSRC: uint32(rtxSsrc),
152-
PayloadType: 97,
153-
SequenceNumber: i + 500,
154-
}, rtxPayload)
155-
assert.NoError(t, err)
156-
case <-rtxRead.Done():
157-
return
158-
}
159-
}
160-
}()
161-
162-
closePairNow(t, pcOffer, pcAnswer)
163-
}

rtpreceiver_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"testing"
1212
"time"
1313

14+
"github.com/pion/interceptor"
1415
"github.com/pion/transport/v3/test"
1516
"github.com/pion/webrtc/v4/pkg/media"
1617
"github.com/stretchr/testify/assert"
@@ -25,7 +26,7 @@ func Test_RTPReceiver_SetReadDeadline(t *testing.T) {
2526
report := test.CheckRoutines(t)
2627
defer report()
2728

28-
sender, receiver, wan := createVNetPair(t)
29+
sender, receiver, wan := createVNetPair(t, &interceptor.Registry{})
2930

3031
track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
3132
assert.NoError(t, err)

rtpsender_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"testing"
1515
"time"
1616

17+
"github.com/pion/interceptor"
1718
"github.com/pion/transport/v3/test"
1819
"github.com/pion/webrtc/v4/pkg/media"
1920
"github.com/stretchr/testify/assert"
@@ -157,7 +158,7 @@ func Test_RTPSender_SetReadDeadline(t *testing.T) {
157158
report := test.CheckRoutines(t)
158159
defer report()
159160

160-
sender, receiver, wan := createVNetPair(t)
161+
sender, receiver, wan := createVNetPair(t, &interceptor.Registry{})
161162

162163
track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
163164
assert.NoError(t, err)

vnet_test.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
"github.com/stretchr/testify/assert"
1717
)
1818

19-
func createVNetPair(t *testing.T) (*PeerConnection, *PeerConnection, *vnet.Router) {
19+
func createVNetPair(t *testing.T, interceptorRegistry *interceptor.Registry) (*PeerConnection, *PeerConnection, *vnet.Router) {
2020
// Create a root router
2121
wan, err := vnet.NewRouter(&vnet.RouterConfig{
2222
CIDR: "1.2.3.0/24",
@@ -53,12 +53,18 @@ func createVNetPair(t *testing.T) (*PeerConnection, *PeerConnection, *vnet.Route
5353
// Start the virtual network by calling Start() on the root router
5454
assert.NoError(t, wan.Start())
5555

56-
offerInterceptorRegistry := &interceptor.Registry{}
57-
offerPeerConnection, err := NewAPI(WithSettingEngine(offerSettingEngine), WithInterceptorRegistry(offerInterceptorRegistry)).NewPeerConnection(Configuration{})
56+
offerOptions := []func(*API){WithSettingEngine(offerSettingEngine)}
57+
if interceptorRegistry != nil {
58+
offerOptions = append(offerOptions, WithInterceptorRegistry(interceptorRegistry))
59+
}
60+
offerPeerConnection, err := NewAPI(offerOptions...).NewPeerConnection(Configuration{})
5861
assert.NoError(t, err)
5962

60-
answerInterceptorRegistry := &interceptor.Registry{}
61-
answerPeerConnection, err := NewAPI(WithSettingEngine(answerSettingEngine), WithInterceptorRegistry(answerInterceptorRegistry)).NewPeerConnection(Configuration{})
63+
answerOptions := []func(*API){WithSettingEngine(answerSettingEngine)}
64+
if interceptorRegistry != nil {
65+
answerOptions = append(answerOptions, WithInterceptorRegistry(interceptorRegistry))
66+
}
67+
answerPeerConnection, err := NewAPI(answerOptions...).NewPeerConnection(Configuration{})
6268
assert.NoError(t, err)
6369

6470
return offerPeerConnection, answerPeerConnection, wan

0 commit comments

Comments
 (0)