@@ -16,7 +16,7 @@ const transportCCURI = "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide
16
16
type ccfbAttributesKeyType uint32
17
17
18
18
// CCFBAttributesKey is the key which can be used to retrieve the Report objects
19
- // from the interceptor.Attributes
19
+ // from the interceptor.Attributes.
20
20
const CCFBAttributesKey ccfbAttributesKeyType = iota
21
21
22
22
// A Report contains Arrival and Departure (from the remote end) times of a RTCP
@@ -33,60 +33,66 @@ type history interface {
33
33
getReportForAck ([]acknowledgement ) []PacketReport
34
34
}
35
35
36
- // Option can be used to set initial options on CCFB interceptors
36
+ // Option can be used to set initial options on CCFB interceptors.
37
37
type Option func (* Interceptor ) error
38
38
39
39
// HistorySize sets the size of the history of outgoing packets.
40
40
func HistorySize (size int ) Option {
41
41
return func (i * Interceptor ) error {
42
42
i .historySize = size
43
+
43
44
return nil
44
45
}
45
46
}
46
47
47
48
func timeFactory (f func () time.Time ) Option {
48
49
return func (i * Interceptor ) error {
49
50
i .timestamp = f
51
+
50
52
return nil
51
53
}
52
54
}
53
55
54
56
func historyFactory (f func (int ) history ) Option {
55
57
return func (i * Interceptor ) error {
56
58
i .historyFactory = f
59
+
57
60
return nil
58
61
}
59
62
}
60
63
64
+ // nolint
61
65
func ccfbConverterFactory (f func (ts time.Time , feedback * rtcp.CCFeedbackReport ) (time.Time , map [uint32 ][]acknowledgement )) Option {
62
66
return func (i * Interceptor ) error {
63
67
i .convertCCFB = f
68
+
64
69
return nil
65
70
}
66
71
}
67
72
68
73
func twccConverterFactory (f func (feedback * rtcp.TransportLayerCC ) (time.Time , map [uint32 ][]acknowledgement )) Option {
69
74
return func (i * Interceptor ) error {
70
75
i .convertTWCC = f
76
+
71
77
return nil
72
78
}
73
79
}
74
80
75
- // InterceptorFactory is a factory for CCFB interceptors
81
+ // InterceptorFactory is a factory for CCFB interceptors.
76
82
type InterceptorFactory struct {
77
83
opts []Option
78
84
}
79
85
80
- // NewInterceptor returns a new CCFB InterceptorFactory
86
+ // NewInterceptor returns a new CCFB InterceptorFactory.
81
87
func NewInterceptor (opts ... Option ) (* InterceptorFactory , error ) {
82
88
return & InterceptorFactory {
83
89
opts : opts ,
84
90
}, nil
85
91
}
86
92
87
- // NewInterceptor returns a new ccfb.Interceptor
93
+ // NewInterceptor returns a new ccfb.Interceptor.
88
94
func (f * InterceptorFactory ) NewInterceptor (_ string ) (interceptor.Interceptor , error ) {
89
- i := & Interceptor {
95
+ in := & Interceptor {
90
96
NoOp : interceptor.NoOp {},
91
97
lock : sync.Mutex {},
92
98
log : logging .NewDefaultLoggerFactory ().NewLogger ("ccfb_interceptor" ),
@@ -100,11 +106,12 @@ func (f *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor,
100
106
},
101
107
}
102
108
for _ , opt := range f .opts {
103
- if err := opt (i ); err != nil {
109
+ if err := opt (in ); err != nil {
104
110
return nil , err
105
111
}
106
112
}
107
- return i , nil
113
+
114
+ return in , nil
108
115
}
109
116
110
117
// Interceptor implements a congestion control feedback receiver. It keeps track
@@ -129,13 +136,17 @@ type Interceptor struct {
129
136
}
130
137
131
138
// BindLocalStream implements interceptor.Interceptor.
132
- func (i * Interceptor ) BindLocalStream (info * interceptor.StreamInfo , writer interceptor.RTPWriter ) interceptor.RTPWriter {
139
+ func (i * Interceptor ) BindLocalStream (
140
+ info * interceptor.StreamInfo ,
141
+ writer interceptor.RTPWriter ,
142
+ ) interceptor.RTPWriter {
133
143
var twccHdrExtID uint8
134
144
var useTWCC bool
135
145
for _ , e := range info .RTPHeaderExtensions {
136
146
if e .URI == transportCCURI {
137
147
twccHdrExtID = uint8 (e .ID ) // nolint:gosec
138
148
useTWCC = true
149
+
139
150
break
140
151
}
141
152
}
@@ -149,6 +160,7 @@ func (i *Interceptor) BindLocalStream(info *interceptor.StreamInfo, writer inter
149
160
}
150
161
i .ssrcToHistory [ssrc ] = i .historyFactory (i .historySize )
151
162
163
+ // nolint
152
164
return interceptor .RTPWriterFunc (func (header * rtp.Header , payload []byte , attributes interceptor.Attributes ) (int , error ) {
153
165
i .lock .Lock ()
154
166
defer i .lock .Unlock ()
@@ -162,7 +174,11 @@ func (i *Interceptor) BindLocalStream(info *interceptor.StreamInfo, writer inter
162
174
if useTWCC {
163
175
var twccHdrExt rtp.TransportCCExtension
164
176
if err := twccHdrExt .Unmarshal (header .GetExtension (twccHdrExtID )); err != nil {
165
- i .log .Warnf ("CCFB configured for TWCC, but failed to get TWCC header extension from outgoing packet. Falling back to saving history for CCFB feedback reports. err: %v" , err )
177
+ i .log .Warnf (
178
+ "CCFB configured for TWCC, but failed to get TWCC header extension from outgoing packet." +
179
+ "Falling back to saving history for CCFB feedback reports. err: %v" ,
180
+ err ,
181
+ )
166
182
if _ , ok := i .ssrcToHistory [ssrc ]; ! ok {
167
183
i .ssrcToHistory [ssrc ] = i .historyFactory (i .historySize )
168
184
}
@@ -174,6 +190,7 @@ func (i *Interceptor) BindLocalStream(info *interceptor.StreamInfo, writer inter
174
190
if err := i .ssrcToHistory [ssrc ].add (seqNr , header .MarshalSize ()+ len (payload ), i .timestamp ()); err != nil {
175
191
return 0 , err
176
192
}
193
+
177
194
return writer .Write (header , payload , attributes )
178
195
})
179
196
}
@@ -226,6 +243,7 @@ func (i *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.
226
243
})
227
244
}
228
245
attr .Set (CCFBAttributesKey , res )
246
+
229
247
return n , attr , err
230
248
})
231
249
}
0 commit comments