-
Notifications
You must be signed in to change notification settings - Fork 766
Add Binance announcement WebSocket support #750
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
d9bf299
b70b844
395d12e
e0409a0
692bacd
e43820e
ce42051
b234b72
86649a8
f2bd8a7
f72bfec
2b8465d
e1f6ce1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| package binance | ||
|
|
||
| import ( | ||
| "crypto/rand" | ||
| "encoding/hex" | ||
| "errors" | ||
| "fmt" | ||
| "time" | ||
|
|
||
| "github.com/adshao/go-binance/v2/common" | ||
| ) | ||
|
|
||
| // CreateAnnouncementParam creates a new WsAnnouncementParam for use with WsAnnouncementServe. | ||
| // | ||
| // Currently supports only WithRecvWindow option, which defaults to 6000 milliseconds | ||
| // if not specified. | ||
| func (c *Client) CreateAnnouncementParam(opts ...RequestOption) (WsAnnouncementParam, error) { | ||
| if c.APIKey == "" || c.SecretKey == "" { | ||
| return WsAnnouncementParam{}, errors.New("missing API key or secret key") | ||
| } | ||
| kt := c.KeyType | ||
| if kt == "" { | ||
| kt = common.KeyTypeHmac | ||
| } | ||
| req := new(request) | ||
| for _, opt := range opts { | ||
| opt(req) | ||
| } | ||
| if req.recvWindow == 0 { | ||
| req.recvWindow = 6000 | ||
| } | ||
|
|
||
| sf, err := common.SignFunc(kt) | ||
| if err != nil { | ||
| return WsAnnouncementParam{}, err | ||
| } | ||
| r := make([]byte, 16) | ||
| if _, err := rand.Read(r); err != nil { | ||
| return WsAnnouncementParam{}, fmt.Errorf("failed to generate random bytes: %w", err) | ||
| } | ||
| random := hex.EncodeToString(r) | ||
| timestamp := time.Now().UnixMilli() | ||
| recvWindow := req.recvWindow | ||
|
|
||
| param := WsAnnouncementParam{ | ||
| Random: random, | ||
| Topic: "com_announcement_en", | ||
| RecvWindow: recvWindow, | ||
| Timestamp: timestamp, | ||
| ApiKey: c.APIKey, | ||
| } | ||
| signature, err := sf(c.SecretKey, fmt.Sprintf("random=%s&topic=%s&recvWindow=%d×tamp=%d", param.Random, param.Topic, param.RecvWindow, param.Timestamp)) | ||
| if err != nil { | ||
| return WsAnnouncementParam{}, err | ||
| } | ||
| param.Signature = *signature | ||
| return param, nil | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,10 @@ | ||
| package binance | ||
|
|
||
| import ( | ||
| "context" | ||
| "net/http" | ||
| "net/url" | ||
| "sync/atomic" | ||
| "time" | ||
|
|
||
| "github.com/gorilla/websocket" | ||
|
|
@@ -17,17 +19,32 @@ type ErrHandler func(err error) | |
| // WsConfig webservice configuration | ||
| type WsConfig struct { | ||
| Endpoint string | ||
| Header http.Header | ||
| Proxy *string | ||
| } | ||
|
|
||
| func newWsConfig(endpoint string) *WsConfig { | ||
| return &WsConfig{ | ||
| Endpoint: endpoint, | ||
| Proxy: getWsProxyUrl(), | ||
| Header: make(http.Header), | ||
| } | ||
| } | ||
|
|
||
| var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) { | ||
| func wsServe(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) { | ||
| return wsServeWithConnHandler(cfg, handler, errHandler, func(ctx context.Context, c *websocket.Conn) { | ||
| if WebsocketKeepalive { | ||
| // This function overwrites the default ping frame handler | ||
| // sent by the websocket API server | ||
| keepAliveWithPong(ctx, c, WebsocketTimeout) | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| type ConnHandler func(context.Context, *websocket.Conn) | ||
|
|
||
| // WsServeWithConnHandler serves websocket with custom connection handler, useful for custom keepalive | ||
| var wsServeWithConnHandler = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler, connHandler ConnHandler) (doneC, stopC chan struct{}, err error) { | ||
| proxy := http.ProxyFromEnvironment | ||
| if cfg.Proxy != nil { | ||
| u, err := url.Parse(*cfg.Proxy) | ||
|
|
@@ -42,7 +59,7 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don | |
| EnableCompression: true, | ||
| } | ||
|
|
||
| c, _, err := Dialer.Dial(cfg.Endpoint, nil) | ||
| c, _, err := Dialer.Dial(cfg.Endpoint, cfg.Header) | ||
| if err != nil { | ||
| return nil, nil, err | ||
| } | ||
|
|
@@ -55,10 +72,12 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don | |
| // closed by the client. | ||
|
|
||
| defer close(doneC) | ||
| if WebsocketKeepalive { | ||
| // This function overwrites the default ping frame handler | ||
| // sent by the websocket API server | ||
| keepAlive(c, WebsocketTimeout) | ||
|
|
||
| // Custom connection handling, useful in active keepalive scenarios | ||
| if connHandler != nil { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you wrap keepalive in another connHandler? The connHandler isn't used in other place. How do you think we keep in the old way? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The websocket for the Binance announcement requires active ping messages. I want to put passive keepalive and active keepalive into one logic. The active keepalive is used in the |
||
| ctx, cancel := context.WithCancel(context.Background()) | ||
| defer cancel() | ||
| go connHandler(ctx, c) | ||
| } | ||
|
|
||
| // Wait for the stopC channel to be closed. We do that in a | ||
|
|
@@ -87,10 +106,47 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don | |
| return | ||
| } | ||
|
|
||
| func keepAlive(c *websocket.Conn, timeout time.Duration) { | ||
| // keepAliveWithPing Keepalive by actively sending ping messages | ||
| func keepAliveWithPing(interval time.Duration, pongTimeout time.Duration) ConnHandler { | ||
| return func(ctx context.Context, c *websocket.Conn) { | ||
| ticker := time.NewTicker(interval) | ||
| defer ticker.Stop() | ||
|
|
||
| var lastResponse int64 | ||
| atomic.StoreInt64(&lastResponse, time.Now().Unix()) | ||
| c.SetPongHandler(func(appData string) error { | ||
| atomic.StoreInt64(&lastResponse, time.Now().Unix()) | ||
| return nil | ||
| }) | ||
|
|
||
| lastPongTicker := time.NewTicker(pongTimeout) | ||
| defer lastPongTicker.Stop() | ||
|
|
||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case <-ticker.C: | ||
| if err := c.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(WebsocketPingTimeout)); err != nil { | ||
| return | ||
| } | ||
| case <-lastPongTicker.C: | ||
| if time.Since(time.Unix(atomic.LoadInt64(&lastResponse), 0)) > pongTimeout { | ||
| c.Close() | ||
| return | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // keepAliveWithPong Keepalive by responding to ping messages | ||
| func keepAliveWithPong(ctx context.Context, c *websocket.Conn, timeout time.Duration) { | ||
| ticker := time.NewTicker(timeout) | ||
| defer ticker.Stop() | ||
|
|
||
| lastResponse := time.Now() | ||
| var lastResponse int64 | ||
| atomic.StoreInt64(&lastResponse, time.Now().Unix()) | ||
|
|
||
| c.SetPingHandler(func(pingData string) error { | ||
| // Respond with Pong using the server's PING payload | ||
|
|
@@ -103,21 +159,22 @@ func keepAlive(c *websocket.Conn, timeout time.Duration) { | |
| return err | ||
| } | ||
|
|
||
| lastResponse = time.Now() | ||
| atomic.StoreInt64(&lastResponse, time.Now().Unix()) | ||
|
|
||
| return nil | ||
| }) | ||
|
|
||
| go func() { | ||
| defer ticker.Stop() | ||
| for { | ||
| <-ticker.C | ||
| if time.Since(lastResponse) > timeout { | ||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we close the connection when context is cancelled? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The go func() {
defer close(doneC) // 1
if connHandler != nil {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // 2
go connHandler(ctx, c)
}
silent := false
go func() {
select {
case <-stopC:
silent = true
case <-doneC:
}
c.Close() // 3
}()
for {
_, message, err := c.ReadMessage()
if err != nil {
if !silent {
errHandler(err)
}
return
}
handler(message)
}
}()// 2 go routine will only cause context cancel when it exits |
||
| return | ||
| case <-ticker.C: | ||
| if time.Since(time.Unix(atomic.LoadInt64(&lastResponse), 0)) > timeout { | ||
| c.Close() | ||
| return | ||
| } | ||
| } | ||
| }() | ||
| } | ||
| } | ||
|
|
||
| var WsGetReadWriteConnection = func(cfg *WsConfig) (*websocket.Conn, error) { | ||
|
|
@@ -136,7 +193,7 @@ var WsGetReadWriteConnection = func(cfg *WsConfig) (*websocket.Conn, error) { | |
| EnableCompression: false, | ||
| } | ||
|
|
||
| c, _, err := Dialer.Dial(cfg.Endpoint, nil) | ||
| c, _, err := Dialer.Dial(cfg.Endpoint, cfg.Header) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ package binance | |
|
|
||
| import ( | ||
| "encoding/json" | ||
| "errors" | ||
| "fmt" | ||
| "strings" | ||
| "time" | ||
|
|
@@ -19,11 +20,14 @@ var ( | |
| BaseCombinedTestnetURL = "wss://stream.testnet.binance.vision/stream?streams=" | ||
| BaseWsApiMainURL = "wss://ws-api.binance.com:443/ws-api/v3" | ||
| BaseWsApiTestnetURL = "wss://ws-api.testnet.binance.vision/ws-api/v3" | ||
| BaseWsAnnouncementURL = "wss://api.binance.com/sapi/wss" | ||
|
|
||
| // WebsocketTimeout is an interval for sending ping/pong messages if WebsocketKeepalive is enabled | ||
| WebsocketTimeout = time.Second * 600 | ||
| // WebsocketPongTimeout is an interval for sending a PONG frame in response to PING frame from server | ||
| WebsocketPongTimeout = time.Second * 10 | ||
| // WebsocketPingTimeout is an interval for waiting for a PONG response after sending a PING framer | ||
| WebsocketPingTimeout = time.Second * 10 | ||
| // WebsocketKeepalive enables sending ping/pong messages to check the connection stability | ||
| WebsocketKeepalive = true | ||
| // WebsocketTimeoutReadWriteConnection is an interval for sending ping/pong messages if WebsocketKeepalive is enabled | ||
|
|
@@ -137,20 +141,20 @@ func WsCombinedPartialDepthServe(symbolLevels map[string]string, handler WsParti | |
| event.Symbol = strings.ToUpper(symbol) | ||
| data := j.Get("data").MustMap() | ||
| event.LastUpdateID, _ = data["lastUpdateId"].(json.Number).Int64() | ||
| bidsLen := len(data["bids"].([]interface{})) | ||
| bidsLen := len(data["bids"].([]any)) | ||
| event.Bids = make([]Bid, bidsLen) | ||
| for i := 0; i < bidsLen; i++ { | ||
| item := data["bids"].([]interface{})[i].([]interface{}) | ||
| item := data["bids"].([]any)[i].([]any) | ||
| event.Bids[i] = Bid{ | ||
| Price: item[0].(string), | ||
| Quantity: item[1].(string), | ||
| } | ||
| } | ||
| asksLen := len(data["asks"].([]interface{})) | ||
| asksLen := len(data["asks"].([]any)) | ||
| event.Asks = make([]Ask, asksLen) | ||
| for i := 0; i < asksLen; i++ { | ||
|
|
||
| item := data["asks"].([]interface{})[i].([]interface{}) | ||
| item := data["asks"].([]any)[i].([]any) | ||
| event.Asks[i] = Ask{ | ||
| Price: item[0].(string), | ||
| Quantity: item[1].(string), | ||
|
|
@@ -260,20 +264,20 @@ func wsCombinedDepthServe(endpoint string, handler WsDepthHandler, errHandler Er | |
| event.Time, _ = data["E"].(json.Number).Int64() | ||
| event.LastUpdateID, _ = data["u"].(json.Number).Int64() | ||
| event.FirstUpdateID, _ = data["U"].(json.Number).Int64() | ||
| bidsLen := len(data["b"].([]interface{})) | ||
| bidsLen := len(data["b"].([]any)) | ||
| event.Bids = make([]Bid, bidsLen) | ||
| for i := 0; i < bidsLen; i++ { | ||
| item := data["b"].([]interface{})[i].([]interface{}) | ||
| item := data["b"].([]any)[i].([]any) | ||
| event.Bids[i] = Bid{ | ||
| Price: item[0].(string), | ||
| Quantity: item[1].(string), | ||
| } | ||
| } | ||
| asksLen := len(data["a"].([]interface{})) | ||
| asksLen := len(data["a"].([]any)) | ||
| event.Asks = make([]Ask, asksLen) | ||
| for i := 0; i < asksLen; i++ { | ||
|
|
||
| item := data["a"].([]interface{})[i].([]interface{}) | ||
| item := data["a"].([]any)[i].([]any) | ||
| event.Asks[i] = Ask{ | ||
| Price: item[0].(string), | ||
| Quantity: item[1].(string), | ||
|
|
@@ -1084,6 +1088,82 @@ func WsApiInitReadWriteConn() (*gorilla.Conn, error) { | |
| return conn, err | ||
| } | ||
|
|
||
| type WsAnnouncementEvent struct { | ||
| CatalogID int64 `json:"catalogId"` | ||
| CatalogName string `json:"catalogName"` | ||
| PublishDate int64 `json:"publishDate"` | ||
| Title string `json:"title"` | ||
| Body string `json:"body"` | ||
| Disclaimer string `json:"disclaimer"` | ||
| } | ||
|
|
||
| type WsAnnouncementParam struct { | ||
| Random string | ||
| Topic string | ||
| RecvWindow int64 | ||
| Timestamp int64 | ||
| Signature string | ||
| ApiKey string | ||
| } | ||
| type WsAnnouncementHandler func(event *WsAnnouncementEvent) | ||
|
|
||
| // WsAnnouncementServe establishes a WebSocket connection to listen for Binance announcements. | ||
| // See API documentation: https://developers.binance.com/docs/cms/announcement | ||
| // | ||
| // Parameters: | ||
| // | ||
| // params - Should be created using client.CreateAnnouncementParam | ||
| // handler - Callback function to handle incoming announcement messages | ||
| // errHandler - Error callback function for connection errors | ||
| // | ||
| // Returns: | ||
| // | ||
| // doneC - Channel that closes when the connection terminates | ||
| // stopC - Channel that can be closed to stop the connection | ||
| // err - Any initial connection error | ||
| func WsAnnouncementServe(params WsAnnouncementParam, handler WsAnnouncementHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) { | ||
| if UseTestnet { | ||
| return nil, nil, errors.New("testnet is not supported") | ||
| } | ||
| endpoint := fmt.Sprintf("%s?random=%s&topic=%s&recvWindow=%d×tamp=%d&signature=%s", | ||
| BaseWsAnnouncementURL, params.Random, params.Topic, params.RecvWindow, params.Timestamp, params.Signature, | ||
| ) | ||
|
|
||
| cfg := newWsConfig(endpoint) | ||
| cfg.Header.Set("X-MBX-APIKEY", params.ApiKey) | ||
| wsHandler := func(message []byte) { | ||
| event := struct { | ||
| Type string `json:"type"` | ||
| Topic string `json:"topic"` | ||
| Data string `json:"data"` | ||
| }{} | ||
|
|
||
| err := json.Unmarshal(message, &event) | ||
| if err != nil { | ||
| errHandler(err) | ||
| return | ||
| } | ||
|
|
||
| if event.Type != "DATA" { | ||
| errHandler(errors.New("type is not DATA: " + event.Type)) | ||
| return | ||
| } | ||
|
|
||
| if event.Topic != "com_announcement_en" { | ||
| errHandler(errors.New("topic is not com_announcement_en: " + event.Topic)) | ||
| return | ||
| } | ||
|
Comment on lines
+1152
to
+1155
|
||
|
|
||
| e := new(WsAnnouncementEvent) | ||
| if err := json.Unmarshal([]byte(event.Data), &e); err != nil { | ||
| errHandler(err) | ||
| return | ||
| } | ||
| handler(e) | ||
| } | ||
| return wsServeWithConnHandler(cfg, wsHandler, errHandler, keepAliveWithPing(30*time.Second, WebsocketTimeout)) | ||
|
Comment on lines
+1162
to
+1164
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @sc0Vu here, send ping actively, in the last arguments There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I meant There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My bad, you're right. |
||
| } | ||
|
|
||
| // getWsApiEndpoint return the base endpoint of the API WS according the UseTestnet flag | ||
| func getWsApiEndpoint() string { | ||
| if UseTestnet { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the docs (https://developers.binance.com/docs/cms/general-info), looks like we need to send ping actively.
wsServeseems to keep connection by replying ping from server. Does that work?