Skip to content

Commit 83cfeea

Browse files
cnderrauberSean-Der
authored andcommitted
Update RtxSSRC for simulcast track remote
Fix #2751, updates remote track's rtx ssrc for simulcast track doesn't contain rtx ssrc in sdp since readRTX relies on rtx ssrc to determine if it has a rtx stream.
1 parent 4ca42e7 commit 83cfeea

File tree

3 files changed

+228
-0
lines changed

3 files changed

+228
-0
lines changed

peerconnection_media_test.go

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"errors"
1414
"fmt"
1515
"io"
16+
"regexp"
1617
"strings"
1718
"sync"
1819
"sync/atomic"
@@ -26,6 +27,7 @@ import (
2627
"github.com/pion/sdp/v3"
2728
"github.com/pion/transport/v3/test"
2829
"github.com/pion/transport/v3/vnet"
30+
"github.com/pion/webrtc/v4/internal/util"
2931
"github.com/pion/webrtc/v4/pkg/media"
3032
"github.com/stretchr/testify/assert"
3133
"github.com/stretchr/testify/require"
@@ -1388,6 +1390,222 @@ func TestPeerConnection_Simulcast(t *testing.T) {
13881390
})
13891391
}
13901392

1393+
type simulcastTestTrackLocal struct {
1394+
*TrackLocalStaticRTP
1395+
}
1396+
1397+
// don't use ssrc&payload in bindings to let the test write different stream packets.
1398+
func (s *simulcastTestTrackLocal) WriteRTP(pkt *rtp.Packet) error {
1399+
packet := getPacketAllocationFromPool()
1400+
1401+
defer resetPacketPoolAllocation(packet)
1402+
1403+
*packet = *pkt
1404+
1405+
s.mu.RLock()
1406+
defer s.mu.RUnlock()
1407+
1408+
writeErrs := []error{}
1409+
1410+
for _, b := range s.bindings {
1411+
if _, err := b.writeStream.WriteRTP(&packet.Header, packet.Payload); err != nil {
1412+
writeErrs = append(writeErrs, err)
1413+
}
1414+
}
1415+
1416+
return util.FlattenErrs(writeErrs)
1417+
}
1418+
1419+
func TestPeerConnection_Simulcast_RTX(t *testing.T) {
1420+
lim := test.TimeOut(time.Second * 30)
1421+
defer lim.Stop()
1422+
1423+
report := test.CheckRoutines(t)
1424+
defer report()
1425+
1426+
rids := []string{"a", "b"}
1427+
pcOffer, pcAnswer, err := newPair()
1428+
assert.NoError(t, err)
1429+
1430+
vp8WriterAStatic, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[0]))
1431+
assert.NoError(t, err)
1432+
1433+
vp8WriterBStatic, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[1]))
1434+
assert.NoError(t, err)
1435+
1436+
vp8WriterA, vp8WriterB := &simulcastTestTrackLocal{vp8WriterAStatic}, &simulcastTestTrackLocal{vp8WriterBStatic}
1437+
1438+
sender, err := pcOffer.AddTrack(vp8WriterA)
1439+
assert.NoError(t, err)
1440+
assert.NotNil(t, sender)
1441+
1442+
assert.NoError(t, sender.AddEncoding(vp8WriterB))
1443+
1444+
var ridMapLock sync.RWMutex
1445+
ridMap := map[string]int{}
1446+
1447+
assertRidCorrect := func(t *testing.T) {
1448+
ridMapLock.Lock()
1449+
defer ridMapLock.Unlock()
1450+
1451+
for _, rid := range rids {
1452+
assert.Equal(t, ridMap[rid], 1)
1453+
}
1454+
assert.Equal(t, len(ridMap), 2)
1455+
}
1456+
1457+
ridsFullfilled := func() bool {
1458+
ridMapLock.Lock()
1459+
defer ridMapLock.Unlock()
1460+
1461+
ridCount := len(ridMap)
1462+
return ridCount == 2
1463+
}
1464+
1465+
var rtxPacketRead atomic.Int32
1466+
var wg sync.WaitGroup
1467+
wg.Add(2)
1468+
1469+
pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) {
1470+
ridMapLock.Lock()
1471+
ridMap[trackRemote.RID()] = ridMap[trackRemote.RID()] + 1
1472+
ridMapLock.Unlock()
1473+
1474+
defer wg.Done()
1475+
1476+
for {
1477+
_, attr, rerr := trackRemote.ReadRTP()
1478+
if rerr != nil {
1479+
break
1480+
}
1481+
if pt, ok := attr.Get(AttributeRtxPayloadType).(byte); ok {
1482+
if pt == 97 {
1483+
rtxPacketRead.Add(1)
1484+
}
1485+
}
1486+
}
1487+
})
1488+
1489+
parameters := sender.GetParameters()
1490+
assert.Equal(t, "a", parameters.Encodings[0].RID)
1491+
assert.Equal(t, "b", parameters.Encodings[1].RID)
1492+
1493+
var midID, ridID, rsid uint8
1494+
for _, extension := range parameters.HeaderExtensions {
1495+
switch extension.URI {
1496+
case sdp.SDESMidURI:
1497+
midID = uint8(extension.ID)
1498+
case sdp.SDESRTPStreamIDURI:
1499+
ridID = uint8(extension.ID)
1500+
case sdesRepairRTPStreamIDURI:
1501+
rsid = uint8(extension.ID)
1502+
}
1503+
}
1504+
assert.NotZero(t, midID)
1505+
assert.NotZero(t, ridID)
1506+
assert.NotZero(t, rsid)
1507+
1508+
err = signalPairWithModification(pcOffer, pcAnswer, func(sdp string) string {
1509+
// Original chrome sdp contains no ssrc info https://pastebin.com/raw/JTjX6zg6
1510+
re := regexp.MustCompile("(?m)[\r\n]+^.*a=ssrc.*$")
1511+
res := re.ReplaceAllString(sdp, "")
1512+
return res
1513+
})
1514+
assert.NoError(t, err)
1515+
1516+
// padding only packets should not affect simulcast probe
1517+
var sequenceNumber uint16
1518+
for sequenceNumber = 0; sequenceNumber < simulcastProbeCount+10; sequenceNumber++ {
1519+
time.Sleep(20 * time.Millisecond)
1520+
1521+
for i, track := range []*simulcastTestTrackLocal{vp8WriterA, vp8WriterB} {
1522+
pkt := &rtp.Packet{
1523+
Header: rtp.Header{
1524+
Version: 2,
1525+
SequenceNumber: sequenceNumber,
1526+
PayloadType: 96,
1527+
Padding: true,
1528+
SSRC: uint32(i),
1529+
},
1530+
Payload: []byte{0x00, 0x02},
1531+
}
1532+
1533+
assert.NoError(t, track.WriteRTP(pkt))
1534+
}
1535+
}
1536+
assert.False(t, ridsFullfilled(), "Simulcast probe should not be fulfilled by padding only packets")
1537+
1538+
for ; !ridsFullfilled(); sequenceNumber++ {
1539+
time.Sleep(20 * time.Millisecond)
1540+
1541+
for i, track := range []*simulcastTestTrackLocal{vp8WriterA, vp8WriterB} {
1542+
pkt := &rtp.Packet{
1543+
Header: rtp.Header{
1544+
Version: 2,
1545+
SequenceNumber: sequenceNumber,
1546+
PayloadType: 96,
1547+
SSRC: uint32(i),
1548+
},
1549+
Payload: []byte{0x00},
1550+
}
1551+
assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0")))
1552+
assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID())))
1553+
1554+
assert.NoError(t, track.WriteRTP(pkt))
1555+
}
1556+
}
1557+
1558+
assertRidCorrect(t)
1559+
1560+
for i := 0; i < simulcastProbeCount+10; i++ {
1561+
sequenceNumber++
1562+
time.Sleep(10 * time.Millisecond)
1563+
1564+
for j, track := range []*simulcastTestTrackLocal{vp8WriterA, vp8WriterB} {
1565+
pkt := &rtp.Packet{
1566+
Header: rtp.Header{
1567+
Version: 2,
1568+
SequenceNumber: sequenceNumber,
1569+
PayloadType: 97,
1570+
SSRC: uint32(100 + j),
1571+
},
1572+
Payload: []byte{0x00, 0x00, 0x00, 0x00, 0x00},
1573+
}
1574+
assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0")))
1575+
assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID())))
1576+
assert.NoError(t, pkt.Header.SetExtension(rsid, []byte(track.RID())))
1577+
1578+
assert.NoError(t, track.WriteRTP(pkt))
1579+
}
1580+
}
1581+
1582+
for ; rtxPacketRead.Load() == 0; sequenceNumber++ {
1583+
time.Sleep(20 * time.Millisecond)
1584+
1585+
for i, track := range []*simulcastTestTrackLocal{vp8WriterA, vp8WriterB} {
1586+
pkt := &rtp.Packet{
1587+
Header: rtp.Header{
1588+
Version: 2,
1589+
SequenceNumber: sequenceNumber,
1590+
PayloadType: 96,
1591+
SSRC: uint32(i),
1592+
},
1593+
Payload: []byte{0x00},
1594+
}
1595+
assert.NoError(t, pkt.Header.SetExtension(midID, []byte("0")))
1596+
assert.NoError(t, pkt.Header.SetExtension(ridID, []byte(track.RID())))
1597+
1598+
assert.NoError(t, track.WriteRTP(pkt))
1599+
}
1600+
}
1601+
1602+
closePairNow(t, pcOffer, pcAnswer)
1603+
1604+
wg.Wait()
1605+
1606+
assert.Greater(t, rtxPacketRead.Load(), int32(0), "no rtx packet read")
1607+
}
1608+
13911609
// Everytime we receieve a new SSRC we probe it and try to determine the proper way to handle it.
13921610
// In most cases a Track explicitly declares a SSRC and a OnTrack is fired. In two cases we don't
13931611
// know the SSRC ahead of time

rtpreceiver.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,10 @@ func (r *RTPReceiver) receiveForRtx(ssrc SSRC, rsid string, streamInfo *intercep
418418
for i := range r.tracks {
419419
if r.tracks[i].track.RID() == rsid {
420420
track = &r.tracks[i]
421+
if track.track.RtxSSRC() == 0 {
422+
track.track.setRtxSSRC(SSRC(streamInfo.SSRC))
423+
}
424+
break
421425
}
422426
}
423427
}

track_remote.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,3 +224,9 @@ func (t *TrackRemote) HasRTX() bool {
224224
defer t.mu.RUnlock()
225225
return t.rtxSsrc != 0
226226
}
227+
228+
func (t *TrackRemote) setRtxSSRC(ssrc SSRC) {
229+
t.mu.Lock()
230+
defer t.mu.Unlock()
231+
t.rtxSsrc = ssrc
232+
}

0 commit comments

Comments
 (0)