Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 43 additions & 13 deletions clientcore/broflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package clientcore

import (
"fmt"
"net/http"
"runtime"
"sync"
"time"
Expand All @@ -12,17 +13,18 @@ import (
)

type BroflakeEngine struct {
cTable *WorkerTable
pTable *WorkerTable
ui UI
wg *sync.WaitGroup
netstated string
tag string
netstateHeartbeat time.Duration
netstateStop chan struct{}
cTable *WorkerTable
pTable *WorkerTable
ui UI
wg *sync.WaitGroup
netstated string
tag string
netstateHeartbeat time.Duration
netstateStop chan struct{}
netstateHttpClient *http.Client
}

func NewBroflakeEngine(cTable, pTable *WorkerTable, ui UI, wg *sync.WaitGroup, netstated, tag string) *BroflakeEngine {
func NewBroflakeEngine(cTable, pTable *WorkerTable, ui UI, wg *sync.WaitGroup, netstated, tag string, httpClient *http.Client) *BroflakeEngine {
return &BroflakeEngine{
cTable,
pTable,
Expand All @@ -32,6 +34,7 @@ func NewBroflakeEngine(cTable, pTable *WorkerTable, ui UI, wg *sync.WaitGroup, n
tag,
1 * time.Minute,
make(chan struct{}, 0),
httpClient,
}
}

Expand All @@ -47,6 +50,7 @@ func (b *BroflakeEngine) start() {
for {
common.Debug("Netstate HEARTBEAT")
err := netstatecl.Exec(
b.netstateHttpClient,
b.netstated,
&netstatecl.Instruction{
Op: netstatecl.OpConsumerState,
Expand All @@ -71,10 +75,12 @@ func (b *BroflakeEngine) start() {
}
}

func (b *BroflakeEngine) stop() {
func (b *BroflakeEngine) stop() <-chan struct{} {
b.cTable.Stop()
b.pTable.Stop()

stopped := make(chan struct{})

go func() {
b.wg.Wait()

Expand All @@ -84,15 +90,23 @@ func (b *BroflakeEngine) stop() {

common.Debug("■ Broflake stopped.")
b.ui.OnReady()

close(stopped)
}()

return stopped
}

func (b *BroflakeEngine) Close() <-chan struct{} {
return b.stop()
}

func (b *BroflakeEngine) debug() {
common.Debugf("NumGoroutine: %v", runtime.NumGoroutine())
}

func NewBroflake(bfOpt *BroflakeOptions, rtcOpt *WebRTCOptions, egOpt *EgressOptions) (bfconn *BroflakeConn, ui *UIImpl, err error) {
if bfOpt.ClientType != "desktop" && bfOpt.ClientType != "widget" {
if bfOpt.ClientType != "desktop" && bfOpt.ClientType != "widget" && bfOpt.ClientType != "singbox-inbound" {
err = fmt.Errorf("invalid clientType '%v\n'", bfOpt.ClientType)
common.Debugf(err.Error())
return bfconn, ui, err
Expand Down Expand Up @@ -164,18 +178,30 @@ func NewBroflake(bfOpt *BroflakeOptions, rtcOpt *WebRTCOptions, egOpt *EgressOpt
}
}
pTable = NewWorkerTable(pfsms)
case "singbox-inbound": // only used in radiance/sing-box-extensions as an inbound protocol
// singbox-inbound peers share connectivity over WebRTC
var cfsms []WorkerFSM
for i := 0; i < bfOpt.CTableSize; i++ {
cfsms = append(cfsms, *NewProducerWebRTC(rtcOpt, &wgReady))
}
cTable = NewWorkerTable(cfsms)

// singbox-inbound peers consume connectivity from bfconn
var consumerUserStream *WorkerFSM
bfconn, consumerUserStream = NewConsumerUserStream(&wgReady)
pTable = NewWorkerTable([]WorkerFSM{*consumerUserStream})
}

// Step 2: Build Broflake
broflake := NewBroflakeEngine(cTable, pTable, ui, &wgReady, bfOpt.Netstated, rtcOpt.Tag)
broflake := NewBroflakeEngine(cTable, pTable, ui, &wgReady, bfOpt.Netstated, rtcOpt.Tag, bfOpt.NetstateHttpClient)

// Step 3: Init the UI (this constructs and exposes the JavaScript API as required)
ui.Init(broflake)

// Step 4: Set up the bus, bind upstream and downstream UI handlers
var bus = NewIpcObserver(
bfOpt.BusBufferSz,
UpstreamUIHandler(*ui, bfOpt.Netstated, rtcOpt.Tag),
UpstreamUIHandler(*ui, bfOpt.Netstated, rtcOpt.Tag, bfOpt.NetstateHttpClient),
DownstreamUIHandler(*ui, bfOpt.Netstated, rtcOpt.Tag),
)

Expand All @@ -187,6 +213,10 @@ func NewBroflake(bfOpt *BroflakeOptions, rtcOpt *WebRTCOptions, egOpt *EgressOpt
case "widget":
cRouter = NewConsumerRouter(bus.Downstream, cTable)
pRouter = NewProducerPoolRouter(bus.Upstream, pTable)
case "singbox-inbound":
cRouter = NewConsumerRouter(bus.Downstream, cTable)
// TODO: use of NewProducerSerialRouter() won't work. Find out why
pRouter = NewProducerPoolRouter(bus.Upstream, pTable)
}

// Step 6: Start the bus, init the routers, fire our UI events to announce that we're ready
Expand Down
56 changes: 48 additions & 8 deletions clientcore/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
// (no input data)
common.Debugf("Consumer state 0, constructing RTCPeerConnection...")

select {
case <-ctx.Done():
return 0, []interface{}{}
default:
}

// We're resetting this slot, so send a nil path assertion IPC message
com.tx <- IPCMsg{IpcType: PathAssertionIPC, Data: common.PathAssertion{}}

Expand Down Expand Up @@ -66,7 +72,7 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
// peerConnection, err := webrtcAPI.NewPeerConnection(config)

// Construct the RTCPeerConnection
peerConnection, err := webrtc.NewPeerConnection(config)
peerConnection, err := createPeerConnection(options.UDPConn, config)
if err != nil {
common.Debugf("Error creating RTCPeerConnection: %v", err)
return 0, []interface{}{}
Expand Down Expand Up @@ -153,15 +159,23 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
res, err := options.HttpClient.Do(req)
if err != nil {
common.Debugf("Couldn't subscribe to genesis stream at %v: %v", options.DiscoverySrv+options.Endpoint, err)
<-time.After(options.ErrorBackoff)
select {
case <-ctx.Done():
return 0, []interface{}{}
case <-time.After(options.ErrorBackoff):
}
return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed}
}
defer res.Body.Close()

// Handle bad protocol version
if res.StatusCode == http.StatusTeapot {
common.Debugf("Received 'bad protocol version' response")
<-time.After(options.ErrorBackoff)
select {
case <-ctx.Done():
return 0, []interface{}{}
case <-time.After(options.ErrorBackoff):
}
return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed}
}

Expand Down Expand Up @@ -205,7 +219,11 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
rt, _, err := common.DecodeSignalMsg(rawMsg)
if err != nil {
common.Debugf("Error decoding signal message: %v (msg: %v)", err, string(rawMsg))
<-time.After(options.ErrorBackoff)
select {
case <-ctx.Done():
return 0, []interface{}{}
case <-time.After(options.ErrorBackoff):
}
// Take the error in stride, continue listening to our existing HTTP request stream
continue
}
Expand All @@ -219,6 +237,8 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
break listenLoop
case <-patienceExpired:
break listenLoop
case <-ctx.Done():
return 0, []interface{}{}
}
}

Expand Down Expand Up @@ -302,15 +322,23 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
res, err := options.HttpClient.Do(req)
if err != nil {
common.Debugf("Couldn't signal offer SDP to %v: %v", options.DiscoverySrv+options.Endpoint, err)
<-time.After(options.ErrorBackoff)
select {
case <-ctx.Done():
return 0, []interface{}{}
case <-time.After(options.ErrorBackoff):
}
return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed}
}
defer res.Body.Close()

switch res.StatusCode {
case http.StatusTeapot:
common.Debugf("Received 'bad protocol version' response")
<-time.After(options.ErrorBackoff)
select {
case <-ctx.Done():
return 0, []interface{}{}
case <-time.After(options.ErrorBackoff):
}
return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed}
case http.StatusNotFound:
// We didn't win the connection
Expand Down Expand Up @@ -408,6 +436,9 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
// Borked!
peerConnection.Close() // TODO: there's an err we should handle here
return 0, []interface{}{}
case <-ctx.Done():
peerConnection.Close() // TODO: there's an err we should handle here
return 0, []interface{}{}
}

return 3, []interface{}{peerConnection, replyTo, candidates, connectionEstablished, connectionChange, connectionClosed}
Expand Down Expand Up @@ -461,7 +492,10 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
res, err := options.HttpClient.Do(req)
if err != nil {
common.Debugf("Couldn't signal ICE candidates to %v: %v", options.DiscoverySrv+options.Endpoint, err)
<-time.After(options.ErrorBackoff)
select {
case <-ctx.Done():
case <-time.After(options.ErrorBackoff):
}
// Borked!
peerConnection.Close() // TODO: there's an err we should handle here
return 0, []interface{}{}
Expand All @@ -471,7 +505,10 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
switch res.StatusCode {
case http.StatusTeapot:
common.Debugf("Received 'bad protocol version' response")
<-time.After(options.ErrorBackoff)
select {
case <-ctx.Done():
case <-time.After(options.ErrorBackoff):
}
// Borked!
peerConnection.Close() // TODO: there's an err we should handle here
return 0, []interface{}{}
Expand Down Expand Up @@ -518,6 +555,9 @@ func NewConsumerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
// Borked!
peerConnection.Close() // TODO: there's an err we should handle here
return 0, []interface{}{}
case <-ctx.Done():
peerConnection.Close() // TODO: there's an err we should handle here
return 0, []interface{}{}
}
}),
FSMstate(func(ctx context.Context, com *ipcChan, input []interface{}) (int, []interface{}) {
Expand Down
52 changes: 45 additions & 7 deletions clientcore/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
// (no input data)
common.Debugf("Producer state 0, constructing RTCPeerConnection...")

select {
case <-ctx.Done():
return 0, []interface{}{}
default:
}

// Populate the STUN cache if necessary
if scache.size() == 0 {
allSTUNSrvs, err := options.STUNBatch(math.MaxInt32)
Expand Down Expand Up @@ -61,7 +67,7 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
// peerConnection, err := webrtcAPI.NewPeerConnection(config)

// Construct the RTCPeerConnection
peerConnection, err := webrtc.NewPeerConnection(config)
peerConnection, err := createPeerConnection(options.UDPConn, config)
if err != nil {
common.Debugf("Error creating RTCPeerConnection: %v", err)
return 0, []interface{}{}
Expand Down Expand Up @@ -127,6 +133,12 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
connectionClosed := input[3].(chan struct{})
common.Debugf("Producer state 1...")

select {
case <-ctx.Done():
return 0, []interface{}{}
default:
}

// Do we have a non-nil path assertion, indicating that we have upstream connectivity to share?
// We find out by sending an ConnectivityCheckIPC message, which asks the process responsible
// for path assertions to send a message reflecting the current state of our path assertion.
Expand Down Expand Up @@ -192,7 +204,11 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
res, err := options.HttpClient.Do(req)
if err != nil {
common.Debugf("Couldn't signal genesis message to %v: %v", options.DiscoverySrv+options.Endpoint, err)
<-time.After(options.ErrorBackoff)
select {
case <-ctx.Done():
return 0, []interface{}{}
case <-time.After(options.ErrorBackoff):
}
return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed}
}
defer res.Body.Close()
Expand All @@ -202,7 +218,11 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
// Handle bad protocol version
if res.StatusCode == http.StatusTeapot {
common.Debugf("Received 'bad protocol version' response")
<-time.After(options.ErrorBackoff)
select {
case <-ctx.Done():
return 0, []interface{}{}
case <-time.After(options.ErrorBackoff):
}
return 1, []interface{}{peerConnection, connectionEstablished, connectionChange, connectionClosed}
}

Expand Down Expand Up @@ -282,10 +302,12 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
case <-time.After(options.ICEFailTimeout):
common.Debugf("Timeout, aborting ICE gathering!")
scache.drop()

// Borked!
peerConnection.Close() // TODO: there's an err we should handle here
return 0, []interface{}{}
case <-ctx.Done():
scache.drop()
peerConnection.Close()
return 0, []interface{}{}
}

// TODO: To maintain role agnosticism, we must assume that a producer can be censored, and
Expand Down Expand Up @@ -328,7 +350,10 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
res, err := options.HttpClient.Do(req)
if err != nil {
common.Debugf("Couldn't signal answer SDP to %v: %v", options.DiscoverySrv+options.Endpoint, err)
<-time.After(options.ErrorBackoff)
select {
case <-ctx.Done():
case <-time.After(options.ErrorBackoff):
}
// Borked!
peerConnection.Close() // TODO: there's an err we should handle here
return 0, []interface{}{}
Expand All @@ -338,7 +363,10 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
switch res.StatusCode {
case 418:
common.Debugf("Received 'bad protocol version' response")
<-time.After(options.ErrorBackoff)
select {
case <-ctx.Done():
case <-time.After(options.ErrorBackoff):
}
// Borked!
peerConnection.Close() // TODO: there's an err we should handle here
return 0, []interface{}{}
Expand Down Expand Up @@ -455,6 +483,9 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
// Borked!
peerConnection.Close() // TODO: there's an err we should handle here
return 0, []interface{}{}
case <-ctx.Done():
peerConnection.Close() // TODO: there's an err we should handle here
return 0, []interface{}{}
}

// XXX: This loop represents an alternate strategy for detecting NAT traversal success or
Expand Down Expand Up @@ -504,6 +535,13 @@ func NewProducerWebRTC(options *WebRTCOptions, wg *sync.WaitGroup) *WorkerFSM {
offer := input[5].(common.OfferMsg)
common.Debugf("Producer state 5...")

select {
case <-ctx.Done():
peerConnection.Close()
return 0, []interface{}{}
default:
}

// Announce the new connectivity situation for this slot
com.tx <- IPCMsg{
IpcType: ConsumerInfoIPC,
Expand Down
Loading