Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dcrdata/exchanges: connect to signalR websocket without signalR client #1945

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions cmd/dcrdata/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ require (
github.com/btcsuite/btcwallet/wtxmgr v1.5.0 // indirect
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd // indirect
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 // indirect
github.com/carterjones/go-cloudflare-scraper v0.1.2 // indirect
github.com/carterjones/signalr v0.3.5 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/companyzero/sntrup4591761 v0.0.0-20200131011700-2b0d299dbd22 // indirect
Expand Down Expand Up @@ -146,7 +144,6 @@ require (
github.com/prometheus/tsdb v0.7.1 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rjeczalik/notify v0.9.1 // indirect
github.com/robertkrimen/otto v0.0.0-20180617131154-15f95af6e78d // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 // indirect
Expand All @@ -167,6 +164,5 @@ require (
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/sourcemap.v1 v1.0.5 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
)
9 changes: 0 additions & 9 deletions cmd/dcrdata/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,6 @@ github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46f
github.com/bufbuild/buf v0.37.0/go.mod h1:lQ1m2HkIaGOFba6w/aC3KYBHhKEOESP3gaAEpS3dAFM=
github.com/caarlos0/env/v6 v6.9.3 h1:Tyg69hoVXDnpO5Qvpsu8EoquarbPyQb+YwExWHP8wWU=
github.com/caarlos0/env/v6 v6.9.3/go.mod h1:hvp/ryKXKipEkcuYjs9mI4bBCg+UI0Yhgm5Zu0ddvwc=
github.com/carterjones/go-cloudflare-scraper v0.1.2 h1:GNmlJEfhIVPVXaEItnPSDtwOpuJo6rFH2SFWuAjeEuM=
github.com/carterjones/go-cloudflare-scraper v0.1.2/go.mod h1:maO/ygX7QWbdh/TzHqr5uR42b2BW81g/05QRx7fpw38=
github.com/carterjones/signalr v0.3.5 h1:kJSw+6a9XmsOb/+9HWTnY8SjTrVOdpzCSPV/9IVS2nI=
github.com/carterjones/signalr v0.3.5/go.mod h1:SOGIwr/0/4GGNjHWSSginY66OVSaOeM85yWCNytdEwE=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand Down Expand Up @@ -453,7 +449,6 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elazarl/goproxy v0.0.0-20181111060418-2ce16c963a8a/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -1064,8 +1059,6 @@ github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rjeczalik/notify v0.9.1 h1:CLCKso/QK1snAlnhNR/CNvNiFU2saUtjV0bx3EwNeCE=
github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho=
github.com/robertkrimen/otto v0.0.0-20180617131154-15f95af6e78d h1:1VUlQbCfkoSGv7qP7Y+ro3ap1P1pPZxgdGVqiTVy5C4=
github.com/robertkrimen/otto v0.0.0-20180617131154-15f95af6e78d/go.mod h1:xvqspoSXJTIpemEonrMDFq6XzwHYYgToXWj5eRX1OtY=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand Down Expand Up @@ -1727,8 +1720,6 @@ gopkg.in/ini.v1 v1.66.4/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU=
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/sourcemap.v1 v1.0.5 h1:inv58fC9f9J3TK2Y2R1NPntXEn3/wjWHkonhIUODNTI=
gopkg.in/sourcemap.v1 v1.0.5/go.mod h1:2RlvNNSMglmRrcvhfuzp4hQHwOtjxlbjX7UPY/GXb78=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI=
Expand Down
140 changes: 61 additions & 79 deletions exchanges/exchanges.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
"decred.org/dcrdex/dex"
dexcandles "decred.org/dcrdex/dex/candles"
"decred.org/dcrdex/dex/msgjson"
"github.com/carterjones/signalr"
"github.com/carterjones/signalr/hubs"
dcrrates "github.com/decred/dcrdata/exchanges/v3/ratesproto"
)

Expand Down Expand Up @@ -122,7 +120,7 @@ var (
},
// Bittrex uses SignalR, which retrieves the actual websocket endpoint via
// HTTP.
Websocket: "socket.bittrex.com",
Websocket: "socket-v3.bittrex.com",
}
DragonExURLs = URLs{
Price: "https://openapi.dragonex.io/api/v1/market/real/?symbol_id=1520101",
Expand Down Expand Up @@ -453,20 +451,17 @@ type CommonExchange struct {
channels *BotChannels
wsMtx sync.RWMutex
ws websocketFeed
sr signalrClient
wsSync struct {
err error
errCount int
init time.Time
update time.Time
fail time.Time
}
// wsProcessor is only used for websockets, not SignalR. For SignalR, the
// callback function is passed as part of the signalrConfig.
// wsProcessor is used to process messages from websockets.
wsProcessor WebsocketProcessor
// Exchanges that use websockets or signalr to maintain a live orderbook can
// use the buy and sell slices to leverage some useful methods on
// CommonExchange.
// Exchanges that use websockets to maintain a live orderbook can use the
// buy and sell slices to leverage some useful methods on CommonExchange.
orderMtx sync.RWMutex
buys wsOrders
asks wsOrders
Expand Down Expand Up @@ -602,21 +597,9 @@ func (xc *CommonExchange) websocket() (websocketFeed, WebsocketProcessor) {
return xc.ws, xc.wsProcessor
}

// Grab the SignalR client, which is nil for most exchanges.
func (xc *CommonExchange) signalr() signalrClient {
xc.mtx.RLock()
defer xc.mtx.RUnlock()
return xc.sr
}

// Creates a websocket connection and starts a listen loop. Closes any existing
// connections for this exchange.
func (xc *CommonExchange) connectWebsocket(processor WebsocketProcessor, cfg *socketConfig) error {
ws, err := newSocketConnection(cfg)
if err != nil {
return err
}

// addWebsocketConnection adds a websocket connection and it's message processor
// to an exchange.
func (xc *CommonExchange) addWebsocketConnection(ws websocketFeed, processor WebsocketProcessor) {
xc.wsMtx.Lock()
// Ensure that any previous websocket is closed.
if xc.ws != nil {
Expand All @@ -627,6 +610,17 @@ func (xc *CommonExchange) connectWebsocket(processor WebsocketProcessor, cfg *so
xc.wsMtx.Unlock()

xc.startWebsocket()
}

// Creates a websocket connection and starts a listen loop. Closes any existing
// connections for this exchange.
func (xc *CommonExchange) connectWebsocket(processor WebsocketProcessor, cfg *socketConfig) error {
ws, err := newSocketConnection(cfg)
if err != nil {
return err
}

xc.addWebsocketConnection(ws, processor)
return nil
}

Expand All @@ -645,8 +639,7 @@ func (xc *CommonExchange) startWebsocket() {
}()
}

// wsSend sends a message on a standard websocket connection. For SignalR
// connections, use xc.sr.Send directly.
// wsSend sends a message on a standard websocket connection.
func (xc *CommonExchange) wsSend(msg interface{}) error {
ws, _ := xc.websocket()
if ws == nil {
Expand All @@ -656,6 +649,15 @@ func (xc *CommonExchange) wsSend(msg interface{}) error {
return ws.Write(msg)
}

// wsSendJSON is like wsSend but it encodes msg to JSON before sending.
func (xc *CommonExchange) wsSendJSON(msg interface{}) error {
ws, _ := xc.websocket()
if ws == nil {
return errors.New("no connection")
}
return ws.WriteJSON(msg)
}

// Checks whether the websocketFeed Done channel is closed.
func (xc *CommonExchange) wsListening() bool {
xc.wsMtx.RLock()
Expand All @@ -673,15 +675,6 @@ func (xc *CommonExchange) setWsFail(err error) {
// Clear the field to prevent double Close'ing.
xc.ws = nil
}
if xc.sr != nil {
// The carterjones/signalr can hang on Close. The goroutine is a stopgap while
// we migrate to a new signalr client.
// https://github.com/decred/dcrdata/issues/1818
go xc.sr.Close()
// Clear the field to prevent double Close'ing. signalr will hang on
// second call.
xc.sr = nil
}
xc.wsSync.err = err
xc.wsSync.errCount++
xc.wsSync.fail = time.Now()
Expand Down Expand Up @@ -731,26 +724,6 @@ func (xc *CommonExchange) wsErrorCount() int {
return xc.wsSync.errCount
}

// For exchanges that have SignalR-wrapped websockets, connectSignalr will be
// used instead of connectWebsocket.
func (xc *CommonExchange) connectSignalr(cfg *signalrConfig) (err error) {
if cfg.errHandler == nil {
cfg.errHandler = func(err error) {
xc.wsMtx.Lock()
xc.sr = nil
xc.wsMtx.Unlock()
xc.setWsFail(err)
}
}
xc.wsMtx.Lock()
defer xc.wsMtx.Unlock()
if xc.sr != nil {
xc.sr.Close()
}
xc.sr, err = newSignalrConnection(cfg)
return
}

// An intermediate order representation used to track an orderbook over a
// websocket connection.
type wsOrder struct {
Expand Down Expand Up @@ -860,7 +833,6 @@ func (xc *CommonExchange) wsDepthStatus(connector func()) (tryHttp, initializing
log.Errorf("%s websocket disabled. Too many errors. Will attempt to reconnect after %.1f minutes", xc.token, time.Until(okToTry).Minutes())
}
return

}

// Used to initialize the embedding exchanges.
Expand Down Expand Up @@ -1305,9 +1277,9 @@ func NewBittrex(client *http.Client, channels *BotChannels) (bittrex Exchange, e
}
go func() {
<-channels.done
sr := b.signalr()
if sr != nil {
sr.Close()
ws, _ := b.websocket()
if ws != nil {
ws.Close()
}
}()
bittrex = b
Expand Down Expand Up @@ -1429,7 +1401,7 @@ func translateBittrexCandlesticks(inSticks []*BittrexCandlestick) Candlesticks {

const maxBittrexQueueSize = 50

var bittrexSubscribeOrderbook = hubs.ClientMsg{
var bittrexSubscribeOrderbook = signalRClientMsg{
H: "c3",
M: "Subscribe",
A: []interface{}{[]interface{}{"orderbook_DCR-BTC_500", "heartbeat"}},
Expand All @@ -1443,7 +1415,7 @@ func (bittrex *BittrexExchange) processBittrexOrderbookPoint(order *BittrexOrder
case 0:
_, found := book[k]
if !found {
bittrex.setWsFail(fmt.Errorf("no order found for bittrex orderbook removal-type update at key %d\n", k))
bittrex.setWsFail(fmt.Errorf("no order found for bittrex orderbook removal-type update at key %d", k))
return
}
delete(book, k)
Expand Down Expand Up @@ -1554,7 +1526,7 @@ type BittrexOrderbookDelta struct {
Rate StringFloat `json:"rate"`
}

// BittrexWSMsg is used to parse the ridiculous signalr message format into
// BittrexWSMsg is used to parse the ridiculous signalR message format into
// something sane.
type BittrexWSMsg struct {
Name string
Expand All @@ -1566,7 +1538,7 @@ const (
BittrexMsgBookUpdate = "orderBook"
)

func decodeBittrexWSMessage(msg signalr.Message) ([]*BittrexWSMsg, error) {
func decodeBittrexWSMessage(msg signalRMessage) ([]*BittrexWSMsg, error) {
msgs := make([]*BittrexWSMsg, 0, len(msg.M))
for _, hubMsg := range msg.M {
msg := &BittrexWSMsg{
Expand Down Expand Up @@ -1613,11 +1585,23 @@ func decodeBittrexWSMessage(msg signalr.Message) ([]*BittrexWSMsg, error) {
return msgs, nil
}

// Handle the SignalR message. The message can be either a full orderbook at
// msg.R (msg.I == "1"), or a list of updates in msg.M[i].A.
func (bittrex *BittrexExchange) msgHandler(inMsg signalr.Message) {
// processWsMessage handles message from the bittrex websocket. The message can
// be either a full orderbook at msg.R (msg.I == "1"), or a list of updates in
// msg.M[i].A.
func (bittrex *BittrexExchange) processWsMessage(inMsg []byte) {
// Ignore KeepAlive messages.
if len(inMsg) == 2 && inMsg[0] == '{' && inMsg[1] == '}' {
return
}

var msg signalRMessage
err := json.Unmarshal(inMsg, &msg)
if err != nil {
bittrex.setWsFail(fmt.Errorf("unable to read message bytes: %v", err))
return
}

msgs, err := decodeBittrexWSMessage(inMsg)
msgs, err := decodeBittrexWSMessage(msg)
if err != nil {
bittrex.setWsFail(fmt.Errorf("Bittrex websocket message decode error: %v", err))
return
Expand Down Expand Up @@ -1685,20 +1669,18 @@ func (bittrex *BittrexExchange) connectWs() {
bittrex.orderSeq = 0
bittrex.orderMtx.Unlock()

err := bittrex.connectSignalr(&signalrConfig{
host: "socket-v3.bittrex.com",
protocol: "1.5",
endpoint: "/signalr",
msgHandler: bittrex.msgHandler,
})
conn, err := connectSignalRWebsocket(BittrexURLs.Websocket, "/signalr", nil)
if err != nil {
bittrex.setWsFail(err)
bittrex.setWsFail(fmt.Errorf("connectSignalrWebsocket error: %v", err))
return
}

// Subscribe to the feed. The full orderbook will be requested once the first
// delta is received.
err = bittrex.sr.Send(bittrexSubscribeOrderbook)
// Add the websocket feed to the bittrex exchange.
bittrex.addWebsocketConnection(conn, bittrex.processWsMessage)

// Subscribe to the feed. The full orderbook will be requested once the
// first delta is received.
err = bittrex.wsSendJSON(bittrexSubscribeOrderbook)
if err != nil {
bittrex.setWsFail(fmt.Errorf("Failed to send order update request to bittrex: %v", err))
return
Expand All @@ -1713,8 +1695,8 @@ func (bittrex *BittrexExchange) connectWs() {
bittrex.processFullOrderbook(book)
}

// Refresh retrieves and parses API data from Bittrex.
// Bittrex provides timestamps in a string format that is not quite RFC 3339.
// Refresh retrieves and parses API data from Bittrex. Bittrex provides
// timestamps in a string format that is not quite RFC 3339.
func (bittrex *BittrexExchange) Refresh() {
bittrex.LogRequest()
priceResponse := new(BittrexPriceResponse)
Expand Down
Loading