Skip to content

Commit bc30165

Browse files
aler9Sean-Der
authored andcommitted
Implement Sender and Receiver reports
Two interceptor are added to generate sender and receiver reports from incoming and outgoing RTP and RTCP packets.
1 parent 9e5bf02 commit bc30165

10 files changed

+1100
-0
lines changed

internal/test/mock_time.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package test
2+
3+
import (
4+
"sync"
5+
"time"
6+
)
7+
8+
// MockTime is a helper to replace time.Now() for testing purposes.
9+
type MockTime struct {
10+
m sync.RWMutex
11+
curNow time.Time
12+
}
13+
14+
// SetNow sets the current time.
15+
func (t *MockTime) SetNow(n time.Time) {
16+
t.m.Lock()
17+
defer t.m.Unlock()
18+
t.curNow = n
19+
}
20+
21+
// Now returns the current time.
22+
func (t *MockTime) Now() time.Time {
23+
t.m.RLock()
24+
defer t.m.RUnlock()
25+
return t.curNow
26+
}

pkg/report/receiver_interceptor.go

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package report
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"github.com/pion/interceptor"
8+
"github.com/pion/logging"
9+
"github.com/pion/rtcp"
10+
"github.com/pion/rtp"
11+
)
12+
13+
// ReceiverInterceptor interceptor generates receiver reports.
14+
type ReceiverInterceptor struct {
15+
interceptor.NoOp
16+
interval time.Duration
17+
now func() time.Time
18+
streams sync.Map
19+
log logging.LeveledLogger
20+
m sync.Mutex
21+
wg sync.WaitGroup
22+
close chan struct{}
23+
}
24+
25+
// NewReceiverInterceptor returns a new ReceiverInterceptor interceptor.
26+
func NewReceiverInterceptor(opts ...ReceiverOption) (*ReceiverInterceptor, error) {
27+
r := &ReceiverInterceptor{
28+
interval: 1 * time.Second,
29+
now: time.Now,
30+
log: logging.NewDefaultLoggerFactory().NewLogger("receiver_interceptor"),
31+
close: make(chan struct{}),
32+
}
33+
34+
for _, opt := range opts {
35+
if err := opt(r); err != nil {
36+
return nil, err
37+
}
38+
}
39+
40+
return r, nil
41+
}
42+
43+
func (r *ReceiverInterceptor) isClosed() bool {
44+
select {
45+
case <-r.close:
46+
return true
47+
default:
48+
return false
49+
}
50+
}
51+
52+
// Close closes the interceptor.
53+
func (r *ReceiverInterceptor) Close() error {
54+
defer r.wg.Wait()
55+
r.m.Lock()
56+
defer r.m.Unlock()
57+
58+
if !r.isClosed() {
59+
close(r.close)
60+
}
61+
62+
return nil
63+
}
64+
65+
// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
66+
// will be called once per packet batch.
67+
func (r *ReceiverInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
68+
r.m.Lock()
69+
defer r.m.Unlock()
70+
71+
if r.isClosed() {
72+
return writer
73+
}
74+
75+
r.wg.Add(1)
76+
77+
go r.loop(writer)
78+
79+
return writer
80+
}
81+
82+
func (r *ReceiverInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
83+
defer r.wg.Done()
84+
85+
ticker := time.NewTicker(r.interval)
86+
for {
87+
select {
88+
case <-ticker.C:
89+
now := r.now()
90+
r.streams.Range(func(key, value interface{}) bool {
91+
stream := value.(*receiverStream)
92+
93+
var pkts []rtcp.Packet
94+
95+
pkts = append(pkts, stream.generateReport(now))
96+
97+
if _, err := rtcpWriter.Write(pkts, interceptor.Attributes{}); err != nil {
98+
r.log.Warnf("failed sending: %+v", err)
99+
}
100+
101+
return true
102+
})
103+
104+
case <-r.close:
105+
return
106+
}
107+
}
108+
}
109+
110+
// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
111+
// will be called once per rtp packet.
112+
func (r *ReceiverInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
113+
stream := newReceiverStream(info.SSRC, info.ClockRate)
114+
r.streams.Store(info.SSRC, stream)
115+
116+
return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
117+
i, attr, err := reader.Read(b, a)
118+
if err != nil {
119+
return 0, nil, err
120+
}
121+
122+
pkt := rtp.Packet{}
123+
if err = pkt.Unmarshal(b[:i]); err != nil {
124+
return 0, nil, err
125+
}
126+
127+
stream.processRTP(r.now(), &pkt)
128+
129+
return i, attr, nil
130+
})
131+
}
132+
133+
// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
134+
func (r *ReceiverInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
135+
r.streams.Delete(info.SSRC)
136+
}
137+
138+
// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
139+
// change in the future. The returned method will be called once per packet batch.
140+
func (r *ReceiverInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
141+
return interceptor.RTCPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
142+
i, attr, err := reader.Read(b, a)
143+
if err != nil {
144+
return 0, nil, err
145+
}
146+
147+
pkts, err := rtcp.Unmarshal(b[:i])
148+
if err != nil {
149+
return 0, nil, err
150+
}
151+
152+
for _, pkt := range pkts {
153+
if sr, ok := (pkt).(*rtcp.SenderReport); ok {
154+
value, ok := r.streams.Load(sr.SSRC)
155+
if !ok {
156+
continue
157+
}
158+
159+
stream := value.(*receiverStream)
160+
stream.processSenderReport(r.now(), sr)
161+
}
162+
}
163+
164+
return i, attr, nil
165+
})
166+
}

0 commit comments

Comments
 (0)