diff --git a/clientcore/broflake.go b/clientcore/broflake.go index 56980c65..eff0d705 100644 --- a/clientcore/broflake.go +++ b/clientcore/broflake.go @@ -3,6 +3,7 @@ package clientcore import ( "fmt" + "net/http" "runtime" "sync" "time" @@ -12,17 +13,18 @@ import ( ) type BroflakeEngine struct { - cTable *WorkerTable - pTable *WorkerTable - ui UI - wg *sync.WaitGroup - netstated string - tag string - netstateHeartbeat time.Duration - netstateStop chan struct{} + cTable *WorkerTable + pTable *WorkerTable + ui UI + wg *sync.WaitGroup + netstated string + tag string + netstateHeartbeat time.Duration + netstateStop chan struct{} + netstateHttpClient *http.Client } -func NewBroflakeEngine(cTable, pTable *WorkerTable, ui UI, wg *sync.WaitGroup, netstated, tag string) *BroflakeEngine { +func NewBroflakeEngine(cTable, pTable *WorkerTable, ui UI, wg *sync.WaitGroup, netstated, tag string, httpClient *http.Client) *BroflakeEngine { return &BroflakeEngine{ cTable, pTable, @@ -32,6 +34,7 @@ func NewBroflakeEngine(cTable, pTable *WorkerTable, ui UI, wg *sync.WaitGroup, n tag, 1 * time.Minute, make(chan struct{}, 0), + httpClient, } } @@ -47,6 +50,7 @@ func (b *BroflakeEngine) start() { for { common.Debug("Netstate HEARTBEAT") err := netstatecl.Exec( + b.netstateHttpClient, b.netstated, &netstatecl.Instruction{ Op: netstatecl.OpConsumerState, @@ -71,10 +75,12 @@ func (b *BroflakeEngine) start() { } } -func (b *BroflakeEngine) stop() { +func (b *BroflakeEngine) stop() <-chan struct{} { b.cTable.Stop() b.pTable.Stop() + stopped := make(chan struct{}) + go func() { b.wg.Wait() @@ -84,7 +90,15 @@ func (b *BroflakeEngine) stop() { common.Debug("■ Broflake stopped.") b.ui.OnReady() + + close(stopped) }() + + return stopped +} + +func (b *BroflakeEngine) Close() <-chan struct{} { + return b.stop() } func (b *BroflakeEngine) debug() { @@ -92,7 +106,7 @@ func (b *BroflakeEngine) debug() { } func NewBroflake(bfOpt *BroflakeOptions, rtcOpt *WebRTCOptions, egOpt *EgressOptions) (bfconn *BroflakeConn, ui *UIImpl, err error) { - if bfOpt.ClientType != "desktop" && bfOpt.ClientType != "widget" { + if bfOpt.ClientType != "desktop" && bfOpt.ClientType != "widget" && bfOpt.ClientType != "singbox-inbound" { err = fmt.Errorf("invalid clientType '%v\n'", bfOpt.ClientType) common.Debugf(err.Error()) return bfconn, ui, err @@ -164,10 +178,22 @@ func NewBroflake(bfOpt *BroflakeOptions, rtcOpt *WebRTCOptions, egOpt *EgressOpt } } pTable = NewWorkerTable(pfsms) + case "singbox-inbound": // only used in radiance/sing-box-extensions as an inbound protocol + // singbox-inbound peers share connectivity over WebRTC + var cfsms []WorkerFSM + for i := 0; i < bfOpt.CTableSize; i++ { + cfsms = append(cfsms, *NewProducerWebRTC(rtcOpt, &wgReady)) + } + cTable = NewWorkerTable(cfsms) + + // singbox-inbound peers consume connectivity from bfconn + var consumerUserStream *WorkerFSM + bfconn, consumerUserStream = NewConsumerUserStream(&wgReady) + pTable = NewWorkerTable([]WorkerFSM{*consumerUserStream}) } // Step 2: Build Broflake - broflake := NewBroflakeEngine(cTable, pTable, ui, &wgReady, bfOpt.Netstated, rtcOpt.Tag) + broflake := NewBroflakeEngine(cTable, pTable, ui, &wgReady, bfOpt.Netstated, rtcOpt.Tag, bfOpt.NetstateHttpClient) // Step 3: Init the UI (this constructs and exposes the JavaScript API as required) ui.Init(broflake) @@ -175,7 +201,7 @@ func NewBroflake(bfOpt *BroflakeOptions, rtcOpt *WebRTCOptions, egOpt *EgressOpt // Step 4: Set up the bus, bind upstream and downstream UI handlers var bus = NewIpcObserver( bfOpt.BusBufferSz, - UpstreamUIHandler(*ui, bfOpt.Netstated, rtcOpt.Tag), + UpstreamUIHandler(*ui, bfOpt.Netstated, rtcOpt.Tag, bfOpt.NetstateHttpClient), DownstreamUIHandler(*ui, bfOpt.Netstated, rtcOpt.Tag), ) @@ -187,6 +213,10 @@ func NewBroflake(bfOpt *BroflakeOptions, rtcOpt *WebRTCOptions, egOpt *EgressOpt case "widget": cRouter = NewConsumerRouter(bus.Downstream, cTable) pRouter = NewProducerPoolRouter(bus.Upstream, pTable) + case "singbox-inbound": + cRouter = NewConsumerRouter(bus.Downstream, cTable) + // TODO: use of NewProducerSerialRouter() won't work. Find out why + pRouter = NewProducerPoolRouter(bus.Upstream, pTable) } // Step 6: Start the bus, init the routers, fire our UI events to announce that we're ready diff --git a/clientcore/consumer.go b/clientcore/consumer.go index caf47432..7504baf6 100644 --- a/clientcore/consumer.go +++ b/clientcore/consumer.go @@ -32,6 +32,12 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { // (no input data) common.Debugf("Consumer state 0, constructing RTCPeerConnection...") + select { + case <-ctx.Done(): + return 0, []interface{}{} + default: + } + // We're resetting this slot, so send a nil path assertion IPC message com.tx <- IPCMsg{IpcType: PathAssertionIPC, Data: common.PathAssertion{}} @@ -66,7 +72,7 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { // peerConnection, err := webrtcAPI.NewPeerConnection(config) // Construct the RTCPeerConnection - peerConnection, err := webrtc.NewPeerConnection(config) + peerConnection, err := createPeerConnection(options.UDPConn, config) if err != nil { common.Debugf("Error creating RTCPeerConnection: %v", err) return 0, []interface{}{} @@ -153,7 +159,11 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { res, err := options.HttpClient.Do(req) if err != nil { common.Debugf("Couldn't subscribe to genesis stream at %v: %v", options.DiscoverySrv+options.Endpoint, err) - <-time.After(options.ErrorBackoff) + select { + case <-ctx.Done(): + return 0, []interface{}{} + case <-time.After(options.ErrorBackoff): + } return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} } defer res.Body.Close() @@ -161,7 +171,11 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { // Handle bad protocol version if res.StatusCode == http.StatusTeapot { common.Debugf("Received 'bad protocol version' response") - <-time.After(options.ErrorBackoff) + select { + case <-ctx.Done(): + return 0, []interface{}{} + case <-time.After(options.ErrorBackoff): + } return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} } @@ -205,7 +219,11 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { rt, _, err := common.DecodeSignalMsg(rawMsg) if err != nil { common.Debugf("Error decoding signal message: %v (msg: %v)", err, string(rawMsg)) - <-time.After(options.ErrorBackoff) + select { + case <-ctx.Done(): + return 0, []interface{}{} + case <-time.After(options.ErrorBackoff): + } // Take the error in stride, continue listening to our existing HTTP request stream continue } @@ -219,6 +237,8 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { break listenLoop case <-patienceExpired: break listenLoop + case <-ctx.Done(): + return 0, []interface{}{} } } @@ -302,7 +322,11 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { res, err := options.HttpClient.Do(req) if err != nil { common.Debugf("Couldn't signal offer SDP to %v: %v", options.DiscoverySrv+options.Endpoint, err) - <-time.After(options.ErrorBackoff) + select { + case <-ctx.Done(): + return 0, []interface{}{} + case <-time.After(options.ErrorBackoff): + } return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} } defer res.Body.Close() @@ -310,7 +334,11 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { switch res.StatusCode { case http.StatusTeapot: common.Debugf("Received 'bad protocol version' response") - <-time.After(options.ErrorBackoff) + select { + case <-ctx.Done(): + return 0, []interface{}{} + case <-time.After(options.ErrorBackoff): + } return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} case http.StatusNotFound: // We didn't win the connection @@ -408,6 +436,9 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { // Borked! peerConnection.Close() // TODO: there's an err we should handle here return 0, []interface{}{} + case <-ctx.Done(): + peerConnection.Close() // TODO: there's an err we should handle here + return 0, []interface{}{} } return 3, []interface{}{peerConnection, replyTo, candidates, connectionEstablished, connectionChange, connectionClosed} @@ -461,7 +492,10 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { res, err := options.HttpClient.Do(req) if err != nil { common.Debugf("Couldn't signal ICE candidates to %v: %v", options.DiscoverySrv+options.Endpoint, err) - <-time.After(options.ErrorBackoff) + select { + case <-ctx.Done(): + case <-time.After(options.ErrorBackoff): + } // Borked! peerConnection.Close() // TODO: there's an err we should handle here return 0, []interface{}{} @@ -471,7 +505,10 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { switch res.StatusCode { case http.StatusTeapot: common.Debugf("Received 'bad protocol version' response") - <-time.After(options.ErrorBackoff) + select { + case <-ctx.Done(): + case <-time.After(options.ErrorBackoff): + } // Borked! peerConnection.Close() // TODO: there's an err we should handle here return 0, []interface{}{} @@ -518,6 +555,9 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { // Borked! peerConnection.Close() // TODO: there's an err we should handle here return 0, []interface{}{} + case <-ctx.Done(): + peerConnection.Close() // TODO: there's an err we should handle here + return 0, []interface{}{} } }), FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) { diff --git a/clientcore/producer.go b/clientcore/producer.go index 57f5760f..498eb07f 100644 --- a/clientcore/producer.go +++ b/clientcore/producer.go @@ -30,6 +30,12 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { // (no input data) common.Debugf("Producer state 0, constructing RTCPeerConnection...") + select { + case <-ctx.Done(): + return 0, []interface{}{} + default: + } + // Populate the STUN cache if necessary if scache.size() == 0 { allSTUNSrvs, err := options.STUNBatch(math.MaxInt32) @@ -61,7 +67,7 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { // peerConnection, err := webrtcAPI.NewPeerConnection(config) // Construct the RTCPeerConnection - peerConnection, err := webrtc.NewPeerConnection(config) + peerConnection, err := createPeerConnection(options.UDPConn, config) if err != nil { common.Debugf("Error creating RTCPeerConnection: %v", err) return 0, []interface{}{} @@ -127,6 +133,12 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { connectionClosed := input[3].(chan struct{}) common.Debugf("Producer state 1...") + select { + case <-ctx.Done(): + return 0, []interface{}{} + default: + } + // Do we have a non-nil path assertion, indicating that we have upstream connectivity to share? // We find out by sending an ConnectivityCheckIPC message, which asks the process responsible // for path assertions to send a message reflecting the current state of our path assertion. @@ -192,7 +204,11 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { res, err := options.HttpClient.Do(req) if err != nil { common.Debugf("Couldn't signal genesis message to %v: %v", options.DiscoverySrv+options.Endpoint, err) - <-time.After(options.ErrorBackoff) + select { + case <-ctx.Done(): + return 0, []interface{}{} + case <-time.After(options.ErrorBackoff): + } return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} } defer res.Body.Close() @@ -202,7 +218,11 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { // Handle bad protocol version if res.StatusCode == http.StatusTeapot { common.Debugf("Received 'bad protocol version' response") - <-time.After(options.ErrorBackoff) + select { + case <-ctx.Done(): + return 0, []interface{}{} + case <-time.After(options.ErrorBackoff): + } return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed} } @@ -282,10 +302,12 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { case <-time.After(options.ICEFailTimeout): common.Debugf("Timeout, aborting ICE gathering!") scache.drop() - - // Borked! peerConnection.Close() // TODO: there's an err we should handle here return 0, []interface{}{} + case <-ctx.Done(): + scache.drop() + peerConnection.Close() + return 0, []interface{}{} } // TODO: To maintain role agnosticism, we must assume that a producer can be censored, and @@ -328,7 +350,10 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { res, err := options.HttpClient.Do(req) if err != nil { common.Debugf("Couldn't signal answer SDP to %v: %v", options.DiscoverySrv+options.Endpoint, err) - <-time.After(options.ErrorBackoff) + select { + case <-ctx.Done(): + case <-time.After(options.ErrorBackoff): + } // Borked! peerConnection.Close() // TODO: there's an err we should handle here return 0, []interface{}{} @@ -338,7 +363,10 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { switch res.StatusCode { case 418: common.Debugf("Received 'bad protocol version' response") - <-time.After(options.ErrorBackoff) + select { + case <-ctx.Done(): + case <-time.After(options.ErrorBackoff): + } // Borked! peerConnection.Close() // TODO: there's an err we should handle here return 0, []interface{}{} @@ -455,6 +483,9 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { // Borked! peerConnection.Close() // TODO: there's an err we should handle here return 0, []interface{}{} + case <-ctx.Done(): + peerConnection.Close() // TODO: there's an err we should handle here + return 0, []interface{}{} } // XXX: This loop represents an alternate strategy for detecting NAT traversal success or @@ -504,6 +535,13 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM { offer := input[5].(common.OfferMsg) common.Debugf("Producer state 5...") + select { + case <-ctx.Done(): + peerConnection.Close() + return 0, []interface{}{} + default: + } + // Announce the new connectivity situation for this slot com.tx <- IPCMsg{ IpcType: ConsumerInfoIPC, diff --git a/clientcore/quic.go b/clientcore/quic.go index 4ff074d0..d778328d 100644 --- a/clientcore/quic.go +++ b/clientcore/quic.go @@ -69,7 +69,7 @@ type QUICLayer struct { // DialAndMaintainQUICConnection attempts to create and maintain an e2e QUIC connection by dialing // the other end, detecting if that connection breaks, and redialing. Forever. -func (c *QUICLayer) DialAndMaintainQUICConnection() { +func (c *QUICLayer) DialAndMaintainQUICConnection(connChan chan quic.Connection) { c.ctx, c.cancel = context.WithCancel(context.Background()) // State 1 of 2: Keep dialing until we acquire a connection @@ -99,6 +99,9 @@ func (c *QUICLayer) DialAndMaintainQUICConnection() { }() select { + case <-c.ctx.Done(): + common.Debugf("Cancelling QUIC dialer!") + return case err := <-connErr: common.Debugf("QUIC dial failed (%v), retrying...", err) case conn := <-connEstablished: @@ -106,6 +109,10 @@ func (c *QUICLayer) DialAndMaintainQUICConnection() { c.eventualConn.set(conn) c.mx.Unlock() common.Debug("QUIC connection established, ready to proxy!") + // announce the QUIC connection over the channel + if connChan != nil { + connChan <- conn + } // State 2 of 2: Connection established, block until we detect a half open or a ctx cancel _, err := conn.AcceptStream(c.ctx) diff --git a/clientcore/settings.go b/clientcore/settings.go index e4e5dcfa..0d5eba7c 100644 --- a/clientcore/settings.go +++ b/clientcore/settings.go @@ -4,6 +4,7 @@ import ( "bufio" "fmt" "math/rand" + "net" "net/http" "time" ) @@ -17,7 +18,8 @@ type WebRTCOptions struct { STUNBatch func(size uint32) (batch []string, err error) STUNBatchSize uint32 Tag string - HttpClient *http.Client + HttpClient *http.Client // for communicating with signal server + UDPConn net.PacketConn // for WebRTC communication Patience time.Duration ErrorBackoff time.Duration } @@ -94,21 +96,23 @@ func NewDefaultWebTransportEgressOptions(ca []byte) *EgressOptions { } type BroflakeOptions struct { - ClientType string - CTableSize int - PTableSize int - BusBufferSz int - Netstated string - WebTransport bool + ClientType string + CTableSize int + PTableSize int + BusBufferSz int + Netstated string + WebTransport bool + NetstateHttpClient *http.Client // for making http requests to netstated } func NewDefaultBroflakeOptions() *BroflakeOptions { return &BroflakeOptions{ - ClientType: "desktop", - CTableSize: 5, - PTableSize: 5, - BusBufferSz: 4096, - Netstated: "", + ClientType: "desktop", + CTableSize: 5, + PTableSize: 5, + BusBufferSz: 4096, + Netstated: "", + NetstateHttpClient: &http.Client{}, } } diff --git a/clientcore/ui.go b/clientcore/ui.go index 5aa248b4..fe5d71da 100644 --- a/clientcore/ui.go +++ b/clientcore/ui.go @@ -3,6 +3,7 @@ package clientcore import ( "net" + "net/http" "strconv" "sync" "sync/atomic" @@ -99,7 +100,7 @@ func DownstreamUIHandler(ui UIImpl, netstated, tag string) func(msg IPCMsg) { } } -func UpstreamUIHandler(ui UIImpl, netstated, tag string) func(msg IPCMsg) { +func UpstreamUIHandler(ui UIImpl, netstated, tag string, httpClient *http.Client) func(msg IPCMsg) { return func(msg IPCMsg) { switch msg.IpcType { case ConsumerInfoIPC: @@ -128,6 +129,7 @@ func UpstreamUIHandler(ui UIImpl, netstated, tag string) func(msg IPCMsg) { // Send it to netstated! err := netstatecl.Exec( + httpClient, netstated, inst, ) diff --git a/clientcore/user.go b/clientcore/user.go index 8360f038..4ff04e13 100644 --- a/clientcore/user.go +++ b/clientcore/user.go @@ -8,6 +8,8 @@ package clientcore import ( "context" + "fmt" + "io" "net" "sync" "time" @@ -18,19 +20,21 @@ import ( ) type BroflakeConn struct { - net.PacketConn writeChan chan IPCMsg readChan chan IPCMsg - addr common.DebugAddr + localAddr common.DebugAddr + remoteAddr common.DebugAddr readDeadline time.Time updateReadDeadline chan time.Time + + closeOnce sync.Once } -func (c BroflakeConn) LocalAddr() net.Addr { - return c.addr +func (c *BroflakeConn) LocalAddr() net.Addr { + return c.localAddr } -func (c BroflakeConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { +func (c *BroflakeConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { for { var ctx context.Context @@ -38,18 +42,27 @@ func (c BroflakeConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { if c.readDeadline.IsZero() { ctx, _ = context.WithCancel(context.Background()) } else { - ctx, _ = context.WithDeadline(context.Background(), c.readDeadline) + var cancel context.CancelFunc + ctx, cancel = context.WithDeadline(context.Background(), c.readDeadline) + defer cancel() } select { - case msg := <-c.readChan: + case msg, ok := <-c.readChan: + if !ok { + return 0, c.remoteAddr, io.EOF + } + // The read completed, let's return some bytes! - payload := msg.Data.([]byte) + payload, ok := msg.Data.([]byte) + if !ok { + return 0, c.remoteAddr, fmt.Errorf("wrong type of IPC message: %T", msg.Data) + } copy(p, payload) - return len(payload), common.DebugAddr("DEBUG NELSON WUZ HERE"), nil + return len(payload), c.remoteAddr, nil case <-ctx.Done(): // We're past our deadline, so let's return failure! - return 0, common.DebugAddr("DEBUG NELSON WUZ HERE"), ctx.Err() + return 0, c.remoteAddr, ctx.Err() case d := <-c.updateReadDeadline: // Someone updated the read deadline, so let's iterate to respect the new deadline c.readDeadline = d @@ -57,7 +70,7 @@ func (c BroflakeConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { } } -func (c BroflakeConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { +func (c *BroflakeConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { // TODO: This copy seems necessary to avoid a data race b := make([]byte, len(p)) copy(b, p) @@ -67,6 +80,7 @@ func (c BroflakeConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { // Do nothing, message sent default: // Drop the chunk if we can't keep up with the data rate + return 0, fmt.Errorf("dropped chunk") } return len(b), nil @@ -77,11 +91,22 @@ func (c BroflakeConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { // But when we bumped to quic-go 0.40, it emerged that the dialer wouldn't work unless we added // support for read deadlines. Since there's still no evidence that the dialer cares about write // deadlines, we haven't added support for those yet. -func (c BroflakeConn) SetReadDeadline(t time.Time) error { +func (c *BroflakeConn) SetReadDeadline(t time.Time) error { c.updateReadDeadline <- t return nil } +func (c *BroflakeConn) SetWriteDeadline(t time.Time) error { return nil } +func (c *BroflakeConn) SetDeadline(t time.Time) error { return c.SetReadDeadline(t) } + +func (c *BroflakeConn) Close() error { + c.closeOnce.Do(func() { + close(c.writeChan) + close(c.readChan) + }) + return nil +} + func NewProducerUserStream(wg *sync.WaitGroup) (*BroflakeConn, *WorkerFSM) { worker := NewWorkerFSM(wg, []FSMstate{ FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) { @@ -89,15 +114,45 @@ func NewProducerUserStream(wg *sync.WaitGroup) (*BroflakeConn, *WorkerFSM) { // (no input data) common.Debugf("User stream producer state 0...") // TODO: check for a non-nil path assertion to alert the UI that we're ready to proxy? - select {} + + <-ctx.Done() + return 0, nil + }), + }) + + bfconn := BroflakeConn{ + writeChan: worker.com.tx, + readChan: worker.com.rx, + localAddr: common.DebugAddr(uuid.NewString()), + remoteAddr: common.DebugAddr(uuid.NewString()), + readDeadline: time.Time{}, + updateReadDeadline: make(chan time.Time, 512), + } + + return &bfconn, worker +} + +func NewConsumerUserStream(wg *sync.WaitGroup) (*BroflakeConn, *WorkerFSM) { + worker := NewWorkerFSM(wg, []FSMstate{ + FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) { + // State 0 + // (no input data) + common.Debugf("User stream consumer state 0...") + + // Send a path assertion IPC message representing the connectivity now provided by this slot + allowAll := []common.Endpoint{{Host: "*", Distance: 1}} + com.tx <- IPCMsg{IpcType: PathAssertionIPC, Data: common.PathAssertion{Allow: allowAll}} + + <-ctx.Done() + return 0, nil }), }) bfconn := BroflakeConn{ - PacketConn: &net.UDPConn{}, writeChan: worker.com.tx, readChan: worker.com.rx, - addr: common.DebugAddr(uuid.NewString()), + localAddr: common.DebugAddr(uuid.NewString()), + remoteAddr: common.DebugAddr(uuid.NewString()), readDeadline: time.Time{}, updateReadDeadline: make(chan time.Time, 512), } diff --git a/clientcore/webrtc.go b/clientcore/webrtc.go new file mode 100644 index 00000000..ccf0c60a --- /dev/null +++ b/clientcore/webrtc.go @@ -0,0 +1,24 @@ +package clientcore + +import ( + "net" + + "github.com/pion/ice/v4" + "github.com/pion/webrtc/v4" +) + +// createPeerConnection creates a new RTCPeerConnection using the given PacketConn +func createPeerConnection(pconn net.PacketConn, config webrtc.Configuration) (*webrtc.PeerConnection, error) { + if pconn == nil { + return webrtc.NewPeerConnection(config) + } + udpMux := ice.NewUDPMuxDefault(ice.UDPMuxParams{ + UDPConn: pconn, + }) + se := webrtc.SettingEngine{} + se.SetICEUDPMux(udpMux) + se.SetNetworkTypes([]webrtc.NetworkType{webrtc.NetworkTypeUDP4, webrtc.NetworkTypeUDP6}) + se.SetInterfaceFilter(func(string) bool { return false }) + api := webrtc.NewAPI(webrtc.WithSettingEngine(se)) + return api.NewPeerConnection(config) +} diff --git a/cmd/proxy.go b/cmd/proxy.go index cb123c20..2567d99a 100644 --- a/cmd/proxy.go +++ b/cmd/proxy.go @@ -52,7 +52,7 @@ func runLocalProxy(port string, bfconn *clientcore.BroflakeConn, ca, sn string) return } - go ql.DialAndMaintainQUICConnection() + go ql.DialAndMaintainQUICConnection(nil) proxy.Tr = clientcore.CreateHTTPTransport(ql) proxy.OnRequest().DoFunc( diff --git a/common/network.go b/common/network.go index 174f46da..6dff95d9 100644 --- a/common/network.go +++ b/common/network.go @@ -20,6 +20,7 @@ var QUICCfg = quic.Config{ MaxIncomingUniStreams: int64(2 << 16), MaxIdleTimeout: 16 * time.Second, KeepAlivePeriod: 8 * time.Second, + EnableDatagrams: true, } type DebugAddr string @@ -55,12 +56,16 @@ func (c *QUICStreamNetConn) RemoteAddr() net.Addr { } func (c *QUICStreamNetConn) Close() error { + var err error c.closeOnce.Do(func() { if c.OnClose != nil { c.OnClose() } + c.Stream.CancelRead(quic.StreamErrorCode(0)) + c.Stream.CancelWrite(quic.StreamErrorCode(0)) + err = c.Stream.Close() }) - return c.Stream.Close() + return err } func IsPublicAddr(addr net.IP) bool { diff --git a/egress/egresslib.go b/egress/egresslib.go index 7805d2b4..b0abed8a 100644 --- a/egress/egresslib.go +++ b/egress/egresslib.go @@ -200,7 +200,8 @@ func (wtpconn webtransportPacketConn) SetWriteDeadline(t time.Time) error { // proxyListener implements net.Listener and listens for QUIC connections type proxyListener struct { - listeners []net.Listener // the listeners associated with it + listeners []net.Listener // the listeners associated with it + quicListener *quic.Listener mpconn *multiplexedPacketConn // the multiplexed PacketConn which will be used as the QUIC transport @@ -210,6 +211,9 @@ type proxyListener struct { closeMetrics func(ctx context.Context) error closeOnce sync.Once + + // this handler can be set that handles incoming packets + datagramHandler func(qconn quic.Connection) } func (l *proxyListener) Accept() (net.Conn, error) { @@ -233,6 +237,10 @@ func (l *proxyListener) Close() error { for _, listener := range l.listeners { err = errors.Join(err, listener.Close()) } + // close quic listener + if l.quicListener != nil { + err = errors.Join(err, l.quicListener.Close()) + } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -318,6 +326,7 @@ func (l *proxyListener) listenQUIC(pc net.PacketConn, quicConfig *quic.Config) { common.Debugf("Unable to start QUIC listener: %v", err) return } + l.quicListener = listener for { conn, err := listener.Accept(context.Background()) @@ -357,11 +366,16 @@ func (l *proxyListener) listenQUIC(pc net.PacketConn, quicConfig *quic.Config) { } } }() + + // call datagram handling callback if any + if l.datagramHandler != nil { + go l.datagramHandler(conn) + } } } // NewListenerFromPacketConn starts a QUIC listener on the given PacketConn and returns the QUIC listener -func NewListenerFromPacketConn(ctx context.Context, pc net.PacketConn, certPEM, keyPEM string) (net.Listener, error) { +func NewListenerFromPacketConn(ctx context.Context, pc net.PacketConn, certPEM, keyPEM string, datagramHandler func(qconn quic.Connection)) (net.Listener, error) { if err := otelInit(ctx); err != nil { return nil, err } @@ -371,8 +385,10 @@ func NewListenerFromPacketConn(ctx context.Context, pc net.PacketConn, certPEM, } l := &proxyListener{ - connections: make(chan net.Conn, 2048), - tlsConfig: tlsConfig, + connections: make(chan net.Conn, 2048), + tlsConfig: tlsConfig, + addr: pc.LocalAddr(), + datagramHandler: datagramHandler, } // start QUIC listener diff --git a/go.mod b/go.mod index d9f751e9..b93ed2e7 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/getlantern/quicwrapper v0.0.0-20250610202231-252f48357c93 github.com/getlantern/telemetry v0.0.0-20250606052628-8960164ec1f5 github.com/google/uuid v1.6.0 + github.com/pion/ice/v4 v4.0.10 github.com/pion/webrtc/v4 v4.1.2 github.com/quic-go/quic-go v0.50.1 github.com/stretchr/testify v1.10.0 @@ -59,7 +60,6 @@ require ( github.com/pion/datachannel v1.5.10 // indirect github.com/pion/dtls/v2 v2.2.12 // indirect github.com/pion/dtls/v3 v3.0.6 // indirect - github.com/pion/ice/v4 v4.0.10 // indirect github.com/pion/interceptor v0.1.40 // indirect github.com/pion/logging v0.2.4 // indirect github.com/pion/mdns/v2 v2.0.7 // indirect diff --git a/netstate/client/client.go b/netstate/client/client.go index b8342a36..413a1be7 100644 --- a/netstate/client/client.go +++ b/netstate/client/client.go @@ -40,13 +40,16 @@ func DecodeArgsOpConsumerState(args []string) [][]string { return decoded } -func Exec(netstated string, inst *Instruction) error { +func Exec(httpClient *http.Client, netstated string, inst *Instruction) error { serialized, err := json.Marshal(inst) if err != nil { return err } - res, err := http.Post(netstated, "application/json; charset=UTF-8", bytes.NewBuffer(serialized)) + if httpClient == nil { + httpClient = &http.Client{} + } + res, err := httpClient.Post(netstated, "application/json; charset=UTF-8", bytes.NewBuffer(serialized)) if err != nil { return err } diff --git a/netstate/d/netstated.go b/netstate/d/netstated.go index 719ff5f0..7b034725 100644 --- a/netstate/d/netstated.go +++ b/netstate/d/netstated.go @@ -110,6 +110,18 @@ func (g *multigraph) degree(v vertexLabel) int { return len(g.data[v].edges) } +func (g *multigraph) updateVertexEdge(label vertexLabel, edges []edge) { + g.Lock() + defer g.Unlock() + vv, ok := g.data[label] + if !ok { + common.Debugf("updateVertexEdge: label %v not found", label) + return + } + vv.edges = edges + g.data[label] = vv +} + // prune deletes expired vertices from this multigraph based on the delta between ttl and the current time // TODO: this is an unoptimized solution, requiring two passes through the data structure func (g *multigraph) prune(ttl time.Duration) { @@ -352,10 +364,7 @@ func handleExec(w http.ResponseWriter, r *http.Request) { world.addVertex(remoteLabel, lat, lon, clientTypeCensored) newEdges = append(newEdges, edge{label: remoteLabel, id: workerIdx}) } - - vv := world.data[localLabel] - vv.edges = newEdges - world.data[localLabel] = vv + world.updateVertexEdge(localLabel, newEdges) } w.WriteHeader(http.StatusOK)