@@ -23,6 +23,8 @@ import (
2323 "github.com/lucas-clemente/quic-go"
2424 "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
2525 "go.opentelemetry.io/otel"
26+ "go.opentelemetry.io/otel/metric/global"
27+ "go.opentelemetry.io/otel/metric/instrument"
2628 "go.opentelemetry.io/otel/trace"
2729 "nhooyr.io/websocket"
2830)
@@ -33,6 +35,12 @@ import (
3335var nClients uint64
3436var nQUICStreams uint64
3537
38+ // TODO: it'd be more elegant to use observers rather than counters, such that we could simply
39+ // observe the value of nClients and nQUICStreams instead of duplicating the increment/decrement
40+ // operations. However, the otel observer API seems more complicated than it's worth?
41+ var nClientsCounter instrument.Int64UpDownCounter
42+ var nQUICStreamsCounter instrument.Int64UpDownCounter
43+
3644// webSocketPacketConn wraps a websocket.Conn as a net.PacketConn
3745type websocketPacketConn struct {
3846 net.PacketConn
@@ -53,6 +61,7 @@ func (q websocketPacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error)
5361
5462func (q websocketPacketConn ) Close () error {
5563 defer log .Printf ("Closed a WebSocket connection! (%v total)\n " , atomic .AddUint64 (& nClients , ^ uint64 (0 )))
64+ defer nClientsCounter .Add (context .Background (), - 1 )
5665 return q .w .Close (websocket .StatusNormalClosure , "" )
5766}
5867
@@ -76,7 +85,6 @@ func (l proxyListener) Addr() net.Addr {
7685 return common .DebugAddr ("DEBUG NELSON WUZ HERE" )
7786}
7887
79- // TODO: Someone should scrutinize this
8088func generateTLSConfig () * tls.Config {
8189 key , err := rsa .GenerateKey (rand .Reader , 1024 )
8290 if err != nil {
@@ -130,6 +138,7 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) {
130138 }
131139
132140 log .Printf ("Accepted a new WebSocket connection! (%v total)\n " , atomic .AddUint64 (& nClients , 1 ))
141+ nClientsCounter .Add (r .Context (), 1 )
133142
134143 listener , err := quic .Listen (wspconn , l .tlsConfig , & common .QUICCfg )
135144 if err != nil {
@@ -163,9 +172,11 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) {
163172 }
164173
165174 log .Printf ("Accepted a new QUIC stream! (%v total)\n " , atomic .AddUint64 (& nQUICStreams , 1 ))
175+ nQUICStreamsCounter .Add (r .Context (), 1 )
166176
167177 l .connections <- common.QUICStreamNetConn {Stream : stream , OnClose : func () {
168178 defer log .Printf ("Closed a QUIC stream! (%v total)\n " , atomic .AddUint64 (& nQUICStreams , ^ uint64 (0 )))
179+ nQUICStreamsCounter .Add (r .Context (), - 1 )
169180 }}
170181 }
171182 }()
@@ -174,8 +185,24 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) {
174185
175186func main () {
176187 ctx := context .Background ()
177- closeFunc := telemetry .EnableOTELTracing (ctx )
178- defer func () { _ = closeFunc (ctx ) }()
188+ closeFuncTrace := telemetry .EnableOTELTracing (ctx )
189+ closeFuncMetric := telemetry .EnableOTELMetrics (ctx )
190+ defer func () {
191+ _ = closeFuncTrace (ctx )
192+ _ = closeFuncMetric (ctx )
193+ }()
194+
195+ m := global .Meter ("github.com/getlantern/broflake/egress" )
196+ var err error
197+ nClientsCounter , err = m .Int64UpDownCounter ("websocket-counter" )
198+ if err != nil {
199+ panic (err )
200+ }
201+
202+ nQUICStreamsCounter , err = m .Int64UpDownCounter ("quic-stream-counter" )
203+ if err != nil {
204+ panic (err )
205+ }
179206
180207 // We use this wrapped listener to enable our local HTTP proxy to listen for WebSocket connections
181208 l := proxyListener {
@@ -229,7 +256,7 @@ func main() {
229256
230257 http .Handle ("/ws" , otelhttp .NewHandler (http .HandlerFunc (l .handleWebsocket ), "/ws" ))
231258 log .Printf ("Egress server listening for WebSocket connections on %v\n \n " , srv .Addr )
232- err : = srv .ListenAndServe ()
259+ err = srv .ListenAndServe ()
233260 if err != nil {
234261 log .Println (err )
235262 }
0 commit comments