Skip to content

Commit c47f890

Browse files
thatsnotrightSean-Der
authored andcommitted
SampleBuilder: Port to use jitter buffer
1 parent cd73129 commit c47f890

File tree

2 files changed

+50
-23
lines changed

2 files changed

+50
-23
lines changed

pkg/media/samplebuilder/samplebuilder.go

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"math"
99
"time"
1010

11+
"github.com/pion/interceptor/pkg/jitterbuffer"
1112
"github.com/pion/rtp"
1213
"github.com/pion/webrtc/v4/pkg/media"
1314
)
@@ -16,7 +17,7 @@ import (
1617
type SampleBuilder struct {
1718
maxLate uint16 // how many packets to wait until we get a valid Sample
1819
maxLateTimestamp uint32 // max timestamp between old and new timestamps before dropping packets
19-
buffer [math.MaxUint16 + 1]*rtp.Packet
20+
buffer *jitterbuffer.JitterBuffer
2021
preparedSamples [math.MaxUint16 + 1]*media.Sample
2122

2223
// Interface that allows us to take RTP packets to samples
@@ -60,7 +61,7 @@ type SampleBuilder struct {
6061
// The depacketizer extracts media samples from RTP packets.
6162
// Several depacketizers are available in package github.com/pion/rtp/codecs.
6263
func New(maxLate uint16, depacketizer rtp.Depacketizer, sampleRate uint32, opts ...Option) *SampleBuilder {
63-
s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate}
64+
s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate, buffer: jitterbuffer.New(jitterbuffer.WithMinimumPacketCount(1))}
6465
for _, o := range opts {
6566
o(s)
6667
}
@@ -76,7 +77,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool {
7677
var foundTail *rtp.Packet
7778

7879
for i := location.head; i != location.tail; i++ {
79-
if packet := s.buffer[i]; packet != nil {
80+
if packet, _ := s.buffer.PeekAtSequence(i); packet != nil {
8081
foundHead = packet
8182
break
8283
}
@@ -87,7 +88,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool {
8788
}
8889

8990
for i := location.tail - 1; i != location.head; i-- {
90-
if packet := s.buffer[i]; packet != nil {
91+
if packet, _ := s.buffer.PeekAtSequence(i); packet != nil {
9192
foundTail = packet
9293
break
9394
}
@@ -105,16 +106,16 @@ func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timesta
105106
if location.empty() {
106107
return 0, false
107108
}
108-
packet := s.buffer[location.head]
109-
if packet == nil {
109+
packet, err := s.buffer.PeekAtSequence(location.head)
110+
if packet == nil || err != nil {
110111
return 0, false
111112
}
112113
return packet.Timestamp, true
113114
}
114115

115116
func (s *SampleBuilder) releasePacket(i uint16) {
116117
var p *rtp.Packet
117-
p, s.buffer[i] = s.buffer[i], nil
118+
p, _ = s.buffer.PopAtSequence(i)
118119
if p != nil && s.packetReleaseHandler != nil {
119120
s.packetReleaseHandler(p)
120121
}
@@ -178,7 +179,7 @@ func (s *SampleBuilder) purgeBuffers(flush bool) {
178179
// Push does not copy the input. If you wish to reuse
179180
// this memory make sure to copy before calling Push
180181
func (s *SampleBuilder) Push(p *rtp.Packet) {
181-
s.buffer[p.SequenceNumber] = p
182+
s.buffer.Push(p)
182183

183184
switch s.filled.compare(p.SequenceNumber) {
184185
case slCompareVoid:
@@ -220,14 +221,19 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
220221

221222
var consume sampleSequenceLocation
222223

223-
for i := s.active.head; s.buffer[i] != nil && s.active.compare(i) != slCompareAfter; i++ {
224-
if s.depacketizer.IsPartitionTail(s.buffer[i].Marker, s.buffer[i].Payload) {
224+
for i := s.active.head; s.active.compare(i) != slCompareAfter; i++ {
225+
pkt, err := s.buffer.PeekAtSequence(i)
226+
if pkt == nil || err != nil {
227+
break
228+
}
229+
230+
if s.depacketizer.IsPartitionTail(pkt.Marker, pkt.Payload) {
225231
consume.head = s.active.head
226232
consume.tail = i + 1
227233
break
228234
}
229235
headTimestamp, hasData := s.fetchTimestamp(s.active)
230-
if hasData && s.buffer[i].Timestamp != headTimestamp {
236+
if hasData && pkt.Timestamp != headTimestamp {
231237
consume.head = s.active.head
232238
consume.tail = i
233239
break
@@ -237,8 +243,8 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
237243
if consume.empty() {
238244
return nil
239245
}
240-
241-
if !purgingBuffers && s.buffer[consume.tail] == nil {
246+
pkt, _ := s.buffer.PeekAtSequence(consume.tail)
247+
if !purgingBuffers && pkt == nil {
242248
// wait for the next packet after this set of packets to arrive
243249
// to ensure at least one post sample timestamp is known
244250
// (unless we have to release right now)
@@ -250,8 +256,10 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
250256

251257
// scan for any packet after the current and use that time stamp as the diff point
252258
for i := consume.tail; i < s.active.tail; i++ {
253-
if s.buffer[i] != nil {
254-
afterTimestamp = s.buffer[i].Timestamp
259+
pkt, _ = s.buffer.PeekAtSequence(i)
260+
261+
if pkt != nil {
262+
afterTimestamp = pkt.Timestamp
255263
break
256264
}
257265
}
@@ -261,10 +269,11 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
261269

262270
// prior to decoding all the packets, check if this packet
263271
// would end being disposed anyway
264-
if !s.depacketizer.IsPartitionHead(s.buffer[consume.head].Payload) {
272+
pkt, err := s.buffer.PeekAtSequence(consume.head)
273+
if err == nil && !s.depacketizer.IsPartitionHead(pkt.Payload) {
265274
isPadding := false
266275
for i := consume.head; i != consume.tail; i++ {
267-
if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == s.buffer[i].Timestamp && len(s.buffer[i].Payload) == 0 {
276+
if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == pkt.Timestamp && len(pkt.Payload) == 0 {
268277
isPadding = true
269278
}
270279
}
@@ -282,16 +291,22 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
282291
var metadata interface{}
283292
var rtpHeaders []*rtp.Header
284293
for i := consume.head; i != consume.tail; i++ {
285-
p, err := s.depacketizer.Unmarshal(s.buffer[i].Payload)
294+
pkt, err := s.buffer.PeekAtSequence(i)
295+
if err != nil {
296+
return nil
297+
}
298+
p, err := s.depacketizer.Unmarshal(pkt.Payload)
286299
if err != nil {
287300
return nil
288301
}
289302
if i == consume.head && s.packetHeadHandler != nil {
290303
metadata = s.packetHeadHandler(s.depacketizer)
291304
}
292305
if s.returnRTPHeaders {
293-
h := s.buffer[i].Header.Clone()
294-
rtpHeaders = append(rtpHeaders, &h)
306+
if packet, _ := s.buffer.PeekAtSequence(i); packet != nil {
307+
h := pkt.Header.Clone()
308+
rtpHeaders = append(rtpHeaders, &h)
309+
}
295310
}
296311

297312
data = append(data, p...)
@@ -389,3 +404,11 @@ func WithRTPHeaders(enable bool) Option {
389404
o.returnRTPHeaders = enable
390405
}
391406
}
407+
408+
// WithJitterBufferMinimumLength sets the minimum number of packets which must first
409+
// be received before starting any playback
410+
func WithJitterBufferMinimumLength(length uint16) Option {
411+
return func(o *SampleBuilder) {
412+
o.buffer = jitterbuffer.New(jitterbuffer.WithMinimumPacketCount(length))
413+
}
414+
}

pkg/media/samplebuilder/samplebuilder_test.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -394,14 +394,18 @@ func TestSampleBuilderCleanReference(t *testing.T) {
394394
s.Push(pkt5)
395395

396396
for i := 0; i < 3; i++ {
397-
if s.buffer[(i+int(seqStart))%0x10000] != nil {
397+
pkt, err := s.buffer.PeekAtSequence(uint16((i + int(seqStart)) % 0x10000))
398+
399+
if pkt != nil || err == nil {
398400
t.Errorf("Old packet (%d) is not unreferenced (maxLate: 10, pushed: 12)", i)
399401
}
400402
}
401-
if s.buffer[(14+int(seqStart))%0x10000] != pkt4 {
403+
pkt, err := s.buffer.PeekAtSequence(uint16((14 + int(seqStart)) % 0x10000))
404+
if pkt != pkt4 || err != nil {
402405
t.Error("New packet must be referenced after jump")
403406
}
404-
if s.buffer[(12+int(seqStart))%0x10000] != pkt5 {
407+
pkt, err = s.buffer.PeekAtSequence(uint16((12 + int(seqStart)) % 0x10000))
408+
if pkt != pkt5 || err != nil {
405409
t.Error("New packet must be referenced after jump")
406410
}
407411
})

0 commit comments

Comments
 (0)