Skip to content

Commit 5003ed5

Browse files
committed
FEC interceptor enhancements
1 parent 8d3fc6d commit 5003ed5

File tree

8 files changed

+523
-40
lines changed

8 files changed

+523
-40
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
Pion Interceptor
44
<br>
55
</h1>
6-
<h4 align="center">RTCP and RTCP processors for building real time communications</h4>
6+
<h4 align="center">RTP and RTCP processors for building real time communications</h4>
77
<p align="center">
88
<a href="https://pion.ly"><img src="https://img.shields.io/badge/pion-interceptor-gray.svg?longCache=true&colorB=brightgreen" alt="Pion Interceptor"></a>
99
<a href="https://discord.gg/PngbdqpFbt"><img src="https://img.shields.io/badge/join-us%20on%20discord-gray.svg?longCache=true&logo=discord&colorB=brightblue" alt="join us on Discord"></a> <a href="https://bsky.app/profile/pion.ly"><img src="https://img.shields.io/badge/follow-us%20on%20bluesky-gray.svg?longCache=true&logo=bluesky&colorB=brightblue" alt="Follow us on Bluesky"></a>
@@ -36,12 +36,12 @@ by anyone. With the following tenets in mind.
3636
* [Google Congestion Control](https://github.com/pion/interceptor/tree/master/pkg/gcc)
3737
* [Stats](https://github.com/pion/interceptor/tree/master/pkg/stats) A [webrtc-stats](https://www.w3.org/TR/webrtc-stats/) compliant statistics generation
3838
* [Interval PLI](https://github.com/pion/interceptor/tree/master/pkg/intervalpli) Generate PLI on a interval. Useful when no decoder is available.
39+
* [FlexFec](https://github.com/pion/interceptor/tree/master/pkg/flexfec)[FlexFEC-03](https://datatracker.ietf.org/doc/html/draft-ietf-payload-flexible-fec-scheme-03) encoder implementation
3940

4041
### Planned Interceptors
4142
* Bandwidth Estimation
4243
- [NADA](https://tools.ietf.org/html/rfc8698)
4344
* JitterBuffer, re-order packets and wait for arrival
44-
* [FlexFec](https://tools.ietf.org/html/draft-ietf-payload-flexible-fec-scheme-20)
4545
* [RTCP Feedback for Congestion Control](https://datatracker.ietf.org/doc/html/rfc8888) the standardized alternative to TWCC.
4646

4747
### Interceptor Public API

pkg/flexfec/encoder_interceptor.go

Lines changed: 74 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,30 @@
44
package flexfec
55

66
import (
7+
"errors"
8+
"sync"
9+
710
"github.com/pion/interceptor"
811
"github.com/pion/rtp"
912
)
1013

14+
// streamState holds the state for a single stream.
15+
type streamState struct {
16+
mu sync.Mutex
17+
flexFecEncoder FlexEncoder
18+
packetBuffer []rtp.Packet
19+
}
20+
1121
// FecInterceptor implements FlexFec.
1222
type FecInterceptor struct {
1323
interceptor.NoOp
14-
flexFecEncoder FlexEncoder
15-
packetBuffer []rtp.Packet
16-
minNumMediaPackets uint32
24+
mu sync.Mutex
25+
streams map[uint32]*streamState
26+
numMediaPackets uint32
27+
numFecPackets uint32
28+
encoderFactory EncoderFactory
1729
}
1830

19-
// FecOption can be used to set initial options on Fec encoder interceptors.
20-
type FecOption func(d *FecInterceptor) error
21-
2231
// FecInterceptorFactory creates new FecInterceptors.
2332
type FecInterceptorFactory struct {
2433
opts []FecOption
@@ -31,54 +40,89 @@ func NewFecInterceptor(opts ...FecOption) (*FecInterceptorFactory, error) {
3140

3241
// NewInterceptor constructs a new FecInterceptor.
3342
func (r *FecInterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
34-
// Hardcoded for now:
35-
// Min num media packets to encode FEC -> 5
36-
// Min num fec packets -> 1
37-
3843
interceptor := &FecInterceptor{
39-
packetBuffer: make([]rtp.Packet, 0),
40-
minNumMediaPackets: 5,
44+
streams: make(map[uint32]*streamState),
45+
numMediaPackets: 5,
46+
numFecPackets: 2,
47+
encoderFactory: FlexEncoder03Factory{},
48+
}
49+
50+
for _, opt := range r.opts {
51+
if err := opt(interceptor); err != nil {
52+
return nil, err
53+
}
4154
}
4255

4356
return interceptor, nil
4457
}
4558

59+
// UnbindLocalStream removes the stream state for a specific SSRC.
60+
func (r *FecInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
61+
r.mu.Lock()
62+
defer r.mu.Unlock()
63+
64+
delete(r.streams, info.SSRC)
65+
}
66+
4667
// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
4768
// will be called once per rtp packet.
4869
func (r *FecInterceptor) BindLocalStream(
4970
info *interceptor.StreamInfo, writer interceptor.RTPWriter,
5071
) interceptor.RTPWriter {
51-
// Chromium supports version flexfec-03 of existing draft, this is the one we will configure by default
52-
// although we should support configuring the latest (flexfec-20) as well.
53-
r.flexFecEncoder = NewFlexEncoder03(info.PayloadType, info.SSRC)
72+
if info.PayloadTypeForwardErrorCorrection == 0 || info.SSRCForwardErrorCorrection == 0 {
73+
return writer
74+
}
75+
76+
mediaSSRC := info.SSRC
77+
78+
r.mu.Lock()
79+
stream := &streamState{
80+
// Chromium supports version flexfec-03 of existing draft, this is the one we will configure by default
81+
// although we should support configuring the latest (flexfec-20) as well.
82+
flexFecEncoder: r.encoderFactory.NewEncoder(info.PayloadTypeForwardErrorCorrection, info.SSRCForwardErrorCorrection),
83+
packetBuffer: make([]rtp.Packet, 0),
84+
}
85+
r.streams[mediaSSRC] = stream
86+
r.mu.Unlock()
5487

5588
return interceptor.RTPWriterFunc(
5689
func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
57-
r.packetBuffer = append(r.packetBuffer, rtp.Packet{
90+
// Ignore non-media packets
91+
if header.SSRC != mediaSSRC {
92+
return writer.Write(header, payload, attributes)
93+
}
94+
95+
var fecPackets []rtp.Packet
96+
stream.mu.Lock()
97+
stream.packetBuffer = append(stream.packetBuffer, rtp.Packet{
5898
Header: *header,
5999
Payload: payload,
60100
})
61101

62-
// Send the media RTP packet
63-
result, err := writer.Write(header, payload, attributes)
102+
// Check if we have enough packets to generate FEC
103+
if len(stream.packetBuffer) == int(r.numMediaPackets) {
104+
fecPackets = stream.flexFecEncoder.EncodeFec(stream.packetBuffer, r.numFecPackets)
105+
// Reset the packet buffer now that we've sent the corresponding FEC packets.
106+
stream.packetBuffer = nil
107+
}
108+
stream.mu.Unlock()
64109

65-
// Send the FEC packets
66-
var fecPackets []rtp.Packet
67-
if len(r.packetBuffer) == int(r.minNumMediaPackets) {
68-
fecPackets = r.flexFecEncoder.EncodeFec(r.packetBuffer, 2)
110+
var errs []error
111+
result, err := writer.Write(header, payload, attributes)
112+
if err != nil {
113+
errs = append(errs, err)
114+
}
69115

70-
for i := range fecPackets {
71-
fecResult, fecErr := writer.Write(&(fecPackets[i].Header), fecPackets[i].Payload, attributes)
116+
for _, packet := range fecPackets {
117+
header := packet.Header
72118

73-
if fecErr != nil && fecResult == 0 {
74-
break
75-
}
119+
_, err = writer.Write(&header, packet.Payload, attributes)
120+
if err != nil {
121+
errs = append(errs, err)
76122
}
77-
// Reset the packet buffer now that we've sent the corresponding FEC packets.
78-
r.packetBuffer = nil
79123
}
80124

81-
return result, err
125+
return result, errors.Join(errs...)
82126
},
83127
)
84128
}

0 commit comments

Comments
 (0)