@@ -18,8 +18,14 @@ import (
1818
1919 "github.com/elazarl/goproxy"
2020 "github.com/getlantern/broflake/common"
21+ "github.com/getlantern/telemetry"
2122 "github.com/google/uuid"
2223 "github.com/lucas-clemente/quic-go"
24+ "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
25+ "go.opentelemetry.io/otel"
26+ "go.opentelemetry.io/otel/metric/global"
27+ "go.opentelemetry.io/otel/metric/instrument"
28+ "go.opentelemetry.io/otel/trace"
2329 "nhooyr.io/websocket"
2430)
2531
@@ -29,6 +35,12 @@ import (
2935var nClients uint64
3036var nQUICStreams uint64
3137
38+ // TODO: it'd be more elegant to use observers rather than counters, such that we could simply
39+ // observe the value of nClients and nQUICStreams instead of duplicating the increment/decrement
40+ // operations. However, the otel observer API seems more complicated than it's worth?
41+ var nClientsCounter instrument.Int64UpDownCounter
42+ var nQUICStreamsCounter instrument.Int64UpDownCounter
43+
3244// webSocketPacketConn wraps a websocket.Conn as a net.PacketConn
3345type websocketPacketConn struct {
3446 net.PacketConn
@@ -49,6 +61,7 @@ func (q websocketPacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error)
4961
5062func (q websocketPacketConn ) Close () error {
5163 defer log .Printf ("Closed a WebSocket connection! (%v total)\n " , atomic .AddUint64 (& nClients , ^ uint64 (0 )))
64+ defer nClientsCounter .Add (context .Background (), - 1 )
5265 return q .w .Close (websocket .StatusNormalClosure , "" )
5366}
5467
@@ -60,6 +73,7 @@ type proxyListener struct {
6073 net.Listener
6174 connections chan net.Conn
6275 tlsConfig * tls.Config
76+ tracer trace.Tracer
6377}
6478
6579func (l proxyListener ) Accept () (net.Conn , error ) {
@@ -71,7 +85,6 @@ func (l proxyListener) Addr() net.Addr {
7185 return common .DebugAddr ("DEBUG NELSON WUZ HERE" )
7286}
7387
74- // TODO: Someone should scrutinize this
7588func generateTLSConfig () * tls.Config {
7689 key , err := rsa .GenerateKey (rand .Reader , 1024 )
7790 if err != nil {
@@ -100,6 +113,9 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) {
100113 // patterns as strings using AcceptOptions.OriginPattern
101114 // TODO: disabling compression is a workaround for a WebKit bug:
102115 // https://github.com/getlantern/broflake/issues/45
116+ ctx := r .Context ()
117+ ctx , span := l .tracer .Start (ctx , "handleWebsocket" )
118+ defer span .End ()
103119 c , err := websocket .Accept (
104120 w ,
105121 r ,
@@ -117,23 +133,25 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) {
117133 defer wspconn .Close ()
118134
119135 if err != nil {
120- // TODO: this is the idiom for our WebSocket library, but we should log the err better
121- log .Println (err )
136+ span .RecordError (err )
122137 return
123138 }
124139
125140 log .Printf ("Accepted a new WebSocket connection! (%v total)\n " , atomic .AddUint64 (& nClients , 1 ))
141+ nClientsCounter .Add (r .Context (), 1 )
126142
127143 listener , err := quic .Listen (wspconn , l .tlsConfig , & common .QUICCfg )
128144 if err != nil {
129145 log .Printf ("Error creating QUIC listener: %v\n " , err )
146+ span .RecordError (err )
130147 return
131148 }
132149
133150 for {
134151 conn , err := listener .Accept (context .Background ())
135152 if err != nil {
136153 log .Printf ("%v QUIC listener error (%v), closing!\n " , wspconn .addr , err )
154+ span .RecordError (err )
137155 listener .Close ()
138156 break
139157 }
@@ -154,21 +172,44 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) {
154172 }
155173
156174 log .Printf ("Accepted a new QUIC stream! (%v total)\n " , atomic .AddUint64 (& nQUICStreams , 1 ))
175+ nQUICStreamsCounter .Add (r .Context (), 1 )
157176
158177 l .connections <- common.QUICStreamNetConn {Stream : stream , OnClose : func () {
159178 defer log .Printf ("Closed a QUIC stream! (%v total)\n " , atomic .AddUint64 (& nQUICStreams , ^ uint64 (0 )))
179+ nQUICStreamsCounter .Add (r .Context (), - 1 )
160180 }}
161181 }
162182 }()
163183 }
164184}
165185
166186func main () {
187+ ctx := context .Background ()
188+ closeFuncTrace := telemetry .EnableOTELTracing (ctx )
189+ closeFuncMetric := telemetry .EnableOTELMetrics (ctx )
190+ defer func () {
191+ _ = closeFuncTrace (ctx )
192+ _ = closeFuncMetric (ctx )
193+ }()
194+
195+ m := global .Meter ("github.com/getlantern/broflake/egress" )
196+ var err error
197+ nClientsCounter , err = m .Int64UpDownCounter ("websocket-counter" )
198+ if err != nil {
199+ panic (err )
200+ }
201+
202+ nQUICStreamsCounter , err = m .Int64UpDownCounter ("quic-stream-counter" )
203+ if err != nil {
204+ panic (err )
205+ }
206+
167207 // We use this wrapped listener to enable our local HTTP proxy to listen for WebSocket connections
168208 l := proxyListener {
169209 Listener : & net.TCPListener {},
170210 connections : make (chan net.Conn , 2048 ),
171211 tlsConfig : generateTLSConfig (),
212+ tracer : otel .Tracer ("websocket-tracer" ),
172213 }
173214
174215 // Instantiate our local HTTP CONNECT proxy
@@ -213,9 +254,9 @@ func main() {
213254 Addr : fmt .Sprintf (":%v" , port ),
214255 }
215256
216- http .HandleFunc ("/ws" , l .handleWebsocket )
257+ http .Handle ("/ws" , otelhttp . NewHandler ( http . HandlerFunc ( l .handleWebsocket ), "/ws" ) )
217258 log .Printf ("Egress server listening for WebSocket connections on %v\n \n " , srv .Addr )
218- err : = srv .ListenAndServe ()
259+ err = srv .ListenAndServe ()
219260 if err != nil {
220261 log .Println (err )
221262 }
0 commit comments