From d9bf2991f6c29b4a9fb6816c91645ed5924df54b Mon Sep 17 00:00:00 2001 From: 0x0001 <2239315+0x0001@users.noreply.github.com> Date: Fri, 29 Aug 2025 15:15:17 +0800 Subject: [PATCH 01/10] Add Binance announcement WebSocket support --- v2/annouancement_service.go | 56 ++++++++++++++++++++++ v2/websocket.go | 3 +- v2/websocket_service.go | 93 ++++++++++++++++++++++++++++++++---- v2/websocket_service_test.go | 42 ++++++++++++++++ 4 files changed, 185 insertions(+), 9 deletions(-) create mode 100644 v2/annouancement_service.go diff --git a/v2/annouancement_service.go b/v2/annouancement_service.go new file mode 100644 index 000000000..6d42f2fad --- /dev/null +++ b/v2/annouancement_service.go @@ -0,0 +1,56 @@ +package binance + +import ( + "crypto/rand" + "encoding/hex" + "errors" + "fmt" + "time" + + "github.com/adshao/go-binance/v2/common" +) + +// CreateAnnouncementParam creates a new WsAnnouncementParam for use with WsAnnouncementServe. +// +// Currently supports only WithRecvWindow option, which defaults to 6000 milliseconds +// if not specified. +func (c *Client) CreateAnnouncementParam(opts ...RequestOption) (WsAnnouncementParam, error) { + if c.APIKey == "" || c.SecretKey == "" { + return WsAnnouncementParam{}, errors.New("miss apikey or secret key") + } + kt := c.KeyType + if kt == "" { + kt = common.KeyTypeHmac + } + req := new(request) + for _, opt := range opts { + opt(req) + } + if req.recvWindow == 0 { + req.recvWindow = 6000 + } + + sf, err := common.SignFunc(kt) + if err != nil { + return WsAnnouncementParam{}, err + } + r := make([]byte, 16) + rand.Read(r) + random := hex.EncodeToString(r) + timestamp := time.Now().UnixMilli() + recvWindow := req.recvWindow + + param := WsAnnouncementParam{ + Random: random, + Topic: "com_announcement_en", + RecvWindow: recvWindow, + Timestamp: timestamp, + ApiKey: c.APIKey, + } + signature, err := sf(c.SecretKey, fmt.Sprintf("random=%s&topic=%s&recvWindow=%d×tamp=%d", param.Random, param.Topic, param.RecvWindow, param.Timestamp)) + if err != nil { + return WsAnnouncementParam{}, err + } + param.Signature = *signature + return param, nil +} diff --git a/v2/websocket.go b/v2/websocket.go index 864443787..e41b23e59 100644 --- a/v2/websocket.go +++ b/v2/websocket.go @@ -17,6 +17,7 @@ type ErrHandler func(err error) // WsConfig webservice configuration type WsConfig struct { Endpoint string + Header *http.Header Proxy *string } @@ -42,7 +43,7 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don EnableCompression: true, } - c, _, err := Dialer.Dial(cfg.Endpoint, nil) + c, _, err := Dialer.Dial(cfg.Endpoint, *cfg.Header) if err != nil { return nil, nil, err } diff --git a/v2/websocket_service.go b/v2/websocket_service.go index 8224af5a7..d9461bd3e 100644 --- a/v2/websocket_service.go +++ b/v2/websocket_service.go @@ -2,7 +2,9 @@ package binance import ( "encoding/json" + "errors" "fmt" + "net/http" "strings" "time" @@ -17,6 +19,7 @@ var ( BaseCombinedTestnetURL = "wss://stream.testnet.binance.vision/stream?streams=" BaseWsApiMainURL = "wss://ws-api.binance.com:443/ws-api/v3" BaseWsApiTestnetURL = "wss://ws-api.testnet.binance.vision/ws-api/v3" + BaseWsAnnouncementURL = "wss://api.binance.com/sapi/wss" // WebsocketTimeout is an interval for sending ping/pong messages if WebsocketKeepalive is enabled WebsocketTimeout = time.Second * 600 @@ -135,20 +138,20 @@ func WsCombinedPartialDepthServe(symbolLevels map[string]string, handler WsParti event.Symbol = strings.ToUpper(symbol) data := j.Get("data").MustMap() event.LastUpdateID, _ = data["lastUpdateId"].(json.Number).Int64() - bidsLen := len(data["bids"].([]interface{})) + bidsLen := len(data["bids"].([]any)) event.Bids = make([]Bid, bidsLen) for i := 0; i < bidsLen; i++ { - item := data["bids"].([]interface{})[i].([]interface{}) + item := data["bids"].([]any)[i].([]any) event.Bids[i] = Bid{ Price: item[0].(string), Quantity: item[1].(string), } } - asksLen := len(data["asks"].([]interface{})) + asksLen := len(data["asks"].([]any)) event.Asks = make([]Ask, asksLen) for i := 0; i < asksLen; i++ { - item := data["asks"].([]interface{})[i].([]interface{}) + item := data["asks"].([]any)[i].([]any) event.Asks[i] = Ask{ Price: item[0].(string), Quantity: item[1].(string), @@ -258,20 +261,20 @@ func wsCombinedDepthServe(endpoint string, handler WsDepthHandler, errHandler Er event.Time, _ = data["E"].(json.Number).Int64() event.LastUpdateID, _ = data["u"].(json.Number).Int64() event.FirstUpdateID, _ = data["U"].(json.Number).Int64() - bidsLen := len(data["b"].([]interface{})) + bidsLen := len(data["b"].([]any)) event.Bids = make([]Bid, bidsLen) for i := 0; i < bidsLen; i++ { - item := data["b"].([]interface{})[i].([]interface{}) + item := data["b"].([]any)[i].([]any) event.Bids[i] = Bid{ Price: item[0].(string), Quantity: item[1].(string), } } - asksLen := len(data["a"].([]interface{})) + asksLen := len(data["a"].([]any)) event.Asks = make([]Ask, asksLen) for i := 0; i < asksLen; i++ { - item := data["a"].([]interface{})[i].([]interface{}) + item := data["a"].([]any)[i].([]any) event.Asks[i] = Ask{ Price: item[0].(string), Quantity: item[1].(string), @@ -872,6 +875,80 @@ func WsApiInitReadWriteConn() (*websocket.Conn, error) { return conn, err } +type WsAnnouncementEvent struct { + CatalogID int64 `json:"catalogId"` + CatalogName string `json:"catalogName"` + PublishDate int64 `json:"publishDate"` + Title string `json:"title"` + Body string `json:"body"` + Disclaimer string `json:"disclaimer"` +} + +type WsAnnouncementParam struct { + Random string + Topic string + RecvWindow int64 + Timestamp int64 + Signature string + ApiKey string +} +type WsAnnouncementHandler func(event *WsAnnouncementEvent) + +// WsAnnouncementServe establishes a WebSocket connection to listen for Binance announcements. +// See API documentation: https://developers.binance.com/docs/cms/announcement +// +// Parameters: +// +// params - Should be created using client.CreateAnnouncementParam +// handler - Callback function to handle incoming announcement messages +// errHandler - Error callback function for connection errors +// +// Returns: +// +// doneC - Channel that closes when the connection terminates +// stopC - Channel that can be closed to stop the connection +// err - Any initial connection error +func WsAnnouncementServe(params WsAnnouncementParam, handler WsAnnouncementHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) { + if UseTestnet { + return nil, nil, errors.New("not support testnet") + } + endpoint := fmt.Sprintf("%s?random=%s&topic=%s&recvWindow=%d×tamp=%d&signature=%s", + BaseWsAnnouncementURL, params.Random, params.Topic, params.RecvWindow, params.Timestamp, params.Signature, + ) + + cfg := newWsConfig(endpoint) + cfg.Header = &http.Header{} + cfg.Header.Add("X-MBX-APIKEY", params.ApiKey) + wsHandler := func(message []byte) { + event := struct { + Type string `json:"type"` + Topic string `json:"topic"` + Data string `json:"data"` + }{} + + err := json.Unmarshal(message, &event) + if err != nil { + errHandler(err) + return + } + + if event.Type != "DATA" { + errHandler(errors.New("type is not DATA: " + event.Type)) + return + } + + if event.Topic != "com_announcement_en" { + errHandler(errors.New("topic is not com_announcement_en: " + event.Topic)) + return + } + + e := new(WsAnnouncementEvent) + json.Unmarshal([]byte(event.Data), &e) + handler(e) + } + return wsServe(cfg, wsHandler, errHandler) +} + // getWsApiEndpoint return the base endpoint of the API WS according the UseTestnet flag func getWsApiEndpoint() string { if UseTestnet { diff --git a/v2/websocket_service_test.go b/v2/websocket_service_test.go index 7379bb174..e0e3461ee 100644 --- a/v2/websocket_service_test.go +++ b/v2/websocket_service_test.go @@ -1678,3 +1678,45 @@ func (s *websocketServiceTestSuite) assertWsBookTickerEvent(e, a *WsBookTickerEv r.Equal(e.BestAskPrice, a.BestAskPrice, "BestAskPrice") r.Equal(e.BestAskQty, a.BestAskQty, "BestAskQty") } + +// https://binance-docs.github.io/apidocs/spot/en/#all-book-tickers-stream +func (s *websocketServiceTestSuite) TestWsAnnouncementServe() { + data := []byte(`{ + "type": "DATA", + "topic": "com_announcement_en", + "data": "{\"catalogId\":161,\"catalogName\":\"Delisting\",\"publishDate\":1753257631403,\"title\":\"Notice of...\",\"body\":\"This is...\",\"disclaimer\":\"Trade on-the-go...\"}" +}`) + fakeErrMsg := "fake error" + s.mockWsServe(data, errors.New(fakeErrMsg)) + defer s.assertWsServe() + + doneC, stopC, err := WsAnnouncementServe(WsAnnouncementParam{}, func(event *WsAnnouncementEvent) { + e := &WsAnnouncementEvent{ + CatalogID: 161, + CatalogName: "Delisting", + PublishDate: 1753257631403, + Title: "Notice of...", + Body: "This is...", + Disclaimer: "Trade on-the-go...", + } + _ = e + s.assertWsAnnouncementEvent(e, event) + }, + func(err error) { + s.r().EqualError(err, fakeErrMsg) + }) + + s.r().NoError(err) + stopC <- struct{}{} + <-doneC +} + +func (s *websocketServiceTestSuite) assertWsAnnouncementEvent(e, a *WsAnnouncementEvent) { + r := s.r() + r.Equal(e.CatalogID, a.CatalogID, "CatalogID") + r.Equal(e.CatalogName, a.CatalogName, "CatalogName") + r.Equal(e.PublishDate, a.PublishDate, "PublishDate") + r.Equal(e.Title, a.Title, "Title") + r.Equal(e.Body, a.Body, "Body") + r.Equal(e.Disclaimer, a.Disclaimer, "Disclaimer") +} From b70b844ec1dc37bd94a64008b68c1a958f31d830 Mon Sep 17 00:00:00 2001 From: 0x0001 <2239315+0x0001@users.noreply.github.com> Date: Fri, 29 Aug 2025 16:16:14 +0800 Subject: [PATCH 02/10] Add custom connection handler for websocket keepalive Allows passing a ConnHandler to wsServeWithConnHandler for custom connection management, enabling active keepalive via ping messages. WsAnnouncementServe now uses this to send periodic pings. --- v2/websocket.go | 34 ++++++++++++++++++++++++++++++++-- v2/websocket_service.go | 2 +- v2/websocket_service_test.go | 8 ++++---- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/v2/websocket.go b/v2/websocket.go index e41b23e59..ec5b44e0b 100644 --- a/v2/websocket.go +++ b/v2/websocket.go @@ -1,6 +1,7 @@ package binance import ( + "context" "net/http" "net/url" "time" @@ -28,7 +29,14 @@ func newWsConfig(endpoint string) *WsConfig { } } -var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) { +func wsServe(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) { + return wsServeWithConnHandler(cfg, handler, errHandler, nil) +} + +type ConnHandler func(context.Context, *websocket.Conn) + +// WsServeWithConnHandler serves websocket with custom connection handler, useful for custom keepalive +var wsServeWithConnHandler = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler, connHandler ConnHandler) (doneC, stopC chan struct{}, err error) { proxy := http.ProxyFromEnvironment if cfg.Proxy != nil { u, err := url.Parse(*cfg.Proxy) @@ -62,6 +70,13 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don keepAlive(c, WebsocketTimeout) } + // Custom connection handling, useful in active keepalive scenarios + if connHandler != nil { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go connHandler(ctx, c) + } + // Wait for the stopC channel to be closed. We do that in a // separate goroutine because ReadMessage is a blocking // operation. @@ -88,6 +103,21 @@ var wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (don return } +func keepAliveHandler(duration time.Duration) ConnHandler { + return func(ctx context.Context, c *websocket.Conn) { + ticker := time.NewTicker(duration) + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-ticker.C: + c.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(WebsocketPongTimeout)) + } + } + } +} + func keepAlive(c *websocket.Conn, timeout time.Duration) { ticker := time.NewTicker(timeout) @@ -137,7 +167,7 @@ var WsGetReadWriteConnection = func(cfg *WsConfig) (*websocket.Conn, error) { EnableCompression: false, } - c, _, err := Dialer.Dial(cfg.Endpoint, nil) + c, _, err := Dialer.Dial(cfg.Endpoint, *cfg.Header) if err != nil { return nil, err } diff --git a/v2/websocket_service.go b/v2/websocket_service.go index d9461bd3e..20c1c813f 100644 --- a/v2/websocket_service.go +++ b/v2/websocket_service.go @@ -946,7 +946,7 @@ func WsAnnouncementServe(params WsAnnouncementParam, handler WsAnnouncementHandl json.Unmarshal([]byte(event.Data), &e) handler(e) } - return wsServe(cfg, wsHandler, errHandler) + return wsServeWithConnHandler(cfg, wsHandler, errHandler, keepAliveHandler(30*time.Second)) } // getWsApiEndpoint return the base endpoint of the API WS according the UseTestnet flag diff --git a/v2/websocket_service_test.go b/v2/websocket_service_test.go index e0e3461ee..040a23fdc 100644 --- a/v2/websocket_service_test.go +++ b/v2/websocket_service_test.go @@ -9,7 +9,7 @@ import ( type websocketServiceTestSuite struct { baseTestSuite - origWsServe func(*WsConfig, WsHandler, ErrHandler) (chan struct{}, chan struct{}, error) + origWsServe func(*WsConfig, WsHandler, ErrHandler, ConnHandler) (chan struct{}, chan struct{}, error) serveCount int } @@ -18,16 +18,16 @@ func TestWebsocketService(t *testing.T) { } func (s *websocketServiceTestSuite) SetupTest() { - s.origWsServe = wsServe + s.origWsServe = wsServeWithConnHandler } func (s *websocketServiceTestSuite) TearDownTest() { - wsServe = s.origWsServe + wsServeWithConnHandler = s.origWsServe s.serveCount = 0 } func (s *websocketServiceTestSuite) mockWsServe(data []byte, err error) { - wsServe = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, innerErr error) { + wsServeWithConnHandler = func(cfg *WsConfig, handler WsHandler, errHandler ErrHandler, connHandler ConnHandler) (doneC, stopC chan struct{}, innerErr error) { s.serveCount++ doneC = make(chan struct{}) stopC = make(chan struct{}) From 395d12ec5430af603ecee94a95402845431d1434 Mon Sep 17 00:00:00 2001 From: 0x0001 <2239315+0x0001@users.noreply.github.com> Date: Fri, 29 Aug 2025 17:12:25 +0800 Subject: [PATCH 03/10] Refactor websocket keepalive handling and improve pong timeout --- v2/websocket.go | 38 ++++++++++++++++++++++++++++---------- v2/websocket_service.go | 2 +- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/v2/websocket.go b/v2/websocket.go index ec5b44e0b..24467b811 100644 --- a/v2/websocket.go +++ b/v2/websocket.go @@ -30,7 +30,13 @@ func newWsConfig(endpoint string) *WsConfig { } func wsServe(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) { - return wsServeWithConnHandler(cfg, handler, errHandler, nil) + return wsServeWithConnHandler(cfg, handler, errHandler, func(ctx context.Context, c *websocket.Conn) { + if WebsocketKeepalive { + // This function overwrites the default ping frame handler + // sent by the websocket API server + keepAlive(c, WebsocketTimeout) + } + }) } type ConnHandler func(context.Context, *websocket.Conn) @@ -64,11 +70,6 @@ var wsServeWithConnHandler = func(cfg *WsConfig, handler WsHandler, errHandler E // closed by the client. defer close(doneC) - if WebsocketKeepalive { - // This function overwrites the default ping frame handler - // sent by the websocket API server - keepAlive(c, WebsocketTimeout) - } // Custom connection handling, useful in active keepalive scenarios if connHandler != nil { @@ -103,16 +104,33 @@ var wsServeWithConnHandler = func(cfg *WsConfig, handler WsHandler, errHandler E return } -func keepAliveHandler(duration time.Duration) ConnHandler { +func keepAliveHandler(interval time.Duration, pongTimeout time.Duration) ConnHandler { return func(ctx context.Context, c *websocket.Conn) { - ticker := time.NewTicker(duration) + ticker := time.NewTicker(interval) + defer ticker.Stop() + + lastResponse := time.Now() + c.SetPongHandler(func(appData string) error { + lastResponse = time.Now() + return nil + }) + + lastPongTicker := time.NewTicker(pongTimeout) + defer lastPongTicker.Stop() + for { select { case <-ctx.Done(): - ticker.Stop() return case <-ticker.C: - c.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(WebsocketPongTimeout)) + if err := c.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(WebsocketPongTimeout)); err != nil { + return + } + case <-lastPongTicker.C: + if time.Since(lastResponse) > pongTimeout { + c.Close() + return + } } } } diff --git a/v2/websocket_service.go b/v2/websocket_service.go index 20c1c813f..69485a34c 100644 --- a/v2/websocket_service.go +++ b/v2/websocket_service.go @@ -946,7 +946,7 @@ func WsAnnouncementServe(params WsAnnouncementParam, handler WsAnnouncementHandl json.Unmarshal([]byte(event.Data), &e) handler(e) } - return wsServeWithConnHandler(cfg, wsHandler, errHandler, keepAliveHandler(30*time.Second)) + return wsServeWithConnHandler(cfg, wsHandler, errHandler, keepAliveHandler(30*time.Second, WebsocketTimeout)) } // getWsApiEndpoint return the base endpoint of the API WS according the UseTestnet flag From e0409a0cb17a0806ecd20992fd213d519ddc52d1 Mon Sep 17 00:00:00 2001 From: 0x0001 <2239315+0x0001@users.noreply.github.com> Date: Sun, 31 Aug 2025 10:33:08 +0800 Subject: [PATCH 04/10] Refactor websocket keepalive functions for clarity and context handling --- v2/websocket.go | 20 ++++++++++++-------- v2/websocket_service.go | 2 +- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/v2/websocket.go b/v2/websocket.go index 24467b811..4a8a976f2 100644 --- a/v2/websocket.go +++ b/v2/websocket.go @@ -34,7 +34,7 @@ func wsServe(cfg *WsConfig, handler WsHandler, errHandler ErrHandler) (doneC, st if WebsocketKeepalive { // This function overwrites the default ping frame handler // sent by the websocket API server - keepAlive(c, WebsocketTimeout) + keepAliveWithPong(ctx, c, WebsocketTimeout) } }) } @@ -104,7 +104,8 @@ var wsServeWithConnHandler = func(cfg *WsConfig, handler WsHandler, errHandler E return } -func keepAliveHandler(interval time.Duration, pongTimeout time.Duration) ConnHandler { +// keepAliveWithPing Keepalive by actively sending ping messages +func keepAliveWithPing(interval time.Duration, pongTimeout time.Duration) ConnHandler { return func(ctx context.Context, c *websocket.Conn) { ticker := time.NewTicker(interval) defer ticker.Stop() @@ -136,8 +137,10 @@ func keepAliveHandler(interval time.Duration, pongTimeout time.Duration) ConnHan } } -func keepAlive(c *websocket.Conn, timeout time.Duration) { +// keepAliveWithPong Keepalive by responding to ping messages +func keepAliveWithPong(ctx context.Context, c *websocket.Conn, timeout time.Duration) { ticker := time.NewTicker(timeout) + defer ticker.Stop() lastResponse := time.Now() @@ -157,16 +160,17 @@ func keepAlive(c *websocket.Conn, timeout time.Duration) { return nil }) - go func() { - defer ticker.Stop() - for { - <-ticker.C + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: if time.Since(lastResponse) > timeout { c.Close() return } } - }() + } } var WsGetReadWriteConnection = func(cfg *WsConfig) (*websocket.Conn, error) { diff --git a/v2/websocket_service.go b/v2/websocket_service.go index 69485a34c..5ffdd1d28 100644 --- a/v2/websocket_service.go +++ b/v2/websocket_service.go @@ -946,7 +946,7 @@ func WsAnnouncementServe(params WsAnnouncementParam, handler WsAnnouncementHandl json.Unmarshal([]byte(event.Data), &e) handler(e) } - return wsServeWithConnHandler(cfg, wsHandler, errHandler, keepAliveHandler(30*time.Second, WebsocketTimeout)) + return wsServeWithConnHandler(cfg, wsHandler, errHandler, keepAliveWithPing(30*time.Second, WebsocketTimeout)) } // getWsApiEndpoint return the base endpoint of the API WS according the UseTestnet flag From 692bacd56d95fc7ae476244df7766b253512d98c Mon Sep 17 00:00:00 2001 From: 0x0001 <2239315+0x0001@users.noreply.github.com> Date: Sun, 31 Aug 2025 10:46:10 +0800 Subject: [PATCH 05/10] Refactor WsConfig Header to non-pointer and initialize by default --- v2/websocket.go | 7 ++++--- v2/websocket_service.go | 2 -- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/v2/websocket.go b/v2/websocket.go index 4a8a976f2..9a8adb023 100644 --- a/v2/websocket.go +++ b/v2/websocket.go @@ -18,7 +18,7 @@ type ErrHandler func(err error) // WsConfig webservice configuration type WsConfig struct { Endpoint string - Header *http.Header + Header http.Header Proxy *string } @@ -26,6 +26,7 @@ func newWsConfig(endpoint string) *WsConfig { return &WsConfig{ Endpoint: endpoint, Proxy: getWsProxyUrl(), + Header: make(http.Header), } } @@ -57,7 +58,7 @@ var wsServeWithConnHandler = func(cfg *WsConfig, handler WsHandler, errHandler E EnableCompression: true, } - c, _, err := Dialer.Dial(cfg.Endpoint, *cfg.Header) + c, _, err := Dialer.Dial(cfg.Endpoint, cfg.Header) if err != nil { return nil, nil, err } @@ -189,7 +190,7 @@ var WsGetReadWriteConnection = func(cfg *WsConfig) (*websocket.Conn, error) { EnableCompression: false, } - c, _, err := Dialer.Dial(cfg.Endpoint, *cfg.Header) + c, _, err := Dialer.Dial(cfg.Endpoint, cfg.Header) if err != nil { return nil, err } diff --git a/v2/websocket_service.go b/v2/websocket_service.go index 5ffdd1d28..7667a3d6a 100644 --- a/v2/websocket_service.go +++ b/v2/websocket_service.go @@ -4,7 +4,6 @@ import ( "encoding/json" "errors" "fmt" - "net/http" "strings" "time" @@ -917,7 +916,6 @@ func WsAnnouncementServe(params WsAnnouncementParam, handler WsAnnouncementHandl ) cfg := newWsConfig(endpoint) - cfg.Header = &http.Header{} cfg.Header.Add("X-MBX-APIKEY", params.ApiKey) wsHandler := func(message []byte) { event := struct { From e43820ed557bac09fa386c71d3606ee3cf51cad8 Mon Sep 17 00:00:00 2001 From: 0x0001 <2239315+0x0001@users.noreply.github.com> Date: Wed, 3 Sep 2025 09:07:43 +0800 Subject: [PATCH 06/10] Fix typos and improve error handling in announcement service --- v2/{annouancement_service.go => announcement_service.go} | 2 +- v2/websocket_service.go | 7 +++++-- v2/websocket_service_test.go | 1 - 3 files changed, 6 insertions(+), 4 deletions(-) rename v2/{annouancement_service.go => announcement_service.go} (94%) diff --git a/v2/annouancement_service.go b/v2/announcement_service.go similarity index 94% rename from v2/annouancement_service.go rename to v2/announcement_service.go index 6d42f2fad..034b9f305 100644 --- a/v2/annouancement_service.go +++ b/v2/announcement_service.go @@ -16,7 +16,7 @@ import ( // if not specified. func (c *Client) CreateAnnouncementParam(opts ...RequestOption) (WsAnnouncementParam, error) { if c.APIKey == "" || c.SecretKey == "" { - return WsAnnouncementParam{}, errors.New("miss apikey or secret key") + return WsAnnouncementParam{}, errors.New("missing API key or secret key") } kt := c.KeyType if kt == "" { diff --git a/v2/websocket_service.go b/v2/websocket_service.go index 7667a3d6a..c65d7b219 100644 --- a/v2/websocket_service.go +++ b/v2/websocket_service.go @@ -909,7 +909,7 @@ type WsAnnouncementHandler func(event *WsAnnouncementEvent) // err - Any initial connection error func WsAnnouncementServe(params WsAnnouncementParam, handler WsAnnouncementHandler, errHandler ErrHandler) (doneC, stopC chan struct{}, err error) { if UseTestnet { - return nil, nil, errors.New("not support testnet") + return nil, nil, errors.New("testnet is not supported") } endpoint := fmt.Sprintf("%s?random=%s&topic=%s&recvWindow=%d×tamp=%d&signature=%s", BaseWsAnnouncementURL, params.Random, params.Topic, params.RecvWindow, params.Timestamp, params.Signature, @@ -941,7 +941,10 @@ func WsAnnouncementServe(params WsAnnouncementParam, handler WsAnnouncementHandl } e := new(WsAnnouncementEvent) - json.Unmarshal([]byte(event.Data), &e) + if err := json.Unmarshal([]byte(event.Data), &e); err != nil { + errHandler(err) + return + } handler(e) } return wsServeWithConnHandler(cfg, wsHandler, errHandler, keepAliveWithPing(30*time.Second, WebsocketTimeout)) diff --git a/v2/websocket_service_test.go b/v2/websocket_service_test.go index 040a23fdc..b8b7b75ef 100644 --- a/v2/websocket_service_test.go +++ b/v2/websocket_service_test.go @@ -1699,7 +1699,6 @@ func (s *websocketServiceTestSuite) TestWsAnnouncementServe() { Body: "This is...", Disclaimer: "Trade on-the-go...", } - _ = e s.assertWsAnnouncementEvent(e, event) }, func(err error) { From b234b72df37619490ee15aff4a23f1b1ab676eb9 Mon Sep 17 00:00:00 2001 From: 0x0001 <2239315+0x0001@users.noreply.github.com> Date: Tue, 23 Sep 2025 15:04:40 +0800 Subject: [PATCH 07/10] ADD atomic timestamp handling and new keepalive timeout config This commit introduces atomic operations for tracking the last response time in websocket keepalive handlers, ensuring thread-safe access to the timestamp. It also adds a new `WebsocketKeepaliveTimeout` configuration variable to control the ping/pong message interval when keepalive is enabled, replacing the previous use of `WebsocketPongTimeout` for this purpose. --- v2/websocket.go | 19 +++++++++++-------- v2/websocket_service.go | 2 ++ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/v2/websocket.go b/v2/websocket.go index 9a8adb023..257348b0c 100644 --- a/v2/websocket.go +++ b/v2/websocket.go @@ -4,6 +4,7 @@ import ( "context" "net/http" "net/url" + "sync/atomic" "time" "github.com/gorilla/websocket" @@ -111,9 +112,10 @@ func keepAliveWithPing(interval time.Duration, pongTimeout time.Duration) ConnHa ticker := time.NewTicker(interval) defer ticker.Stop() - lastResponse := time.Now() + var lastResponse int64 + atomic.StoreInt64(&lastResponse, time.Now().Unix()) c.SetPongHandler(func(appData string) error { - lastResponse = time.Now() + atomic.StoreInt64(&lastResponse, time.Now().Unix()) return nil }) @@ -125,11 +127,11 @@ func keepAliveWithPing(interval time.Duration, pongTimeout time.Duration) ConnHa case <-ctx.Done(): return case <-ticker.C: - if err := c.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(WebsocketPongTimeout)); err != nil { + if err := c.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(WebsocketKeepaliveTimeout)); err != nil { return } case <-lastPongTicker.C: - if time.Since(lastResponse) > pongTimeout { + if time.Since(time.Unix(atomic.LoadInt64(&lastResponse), 0)) > pongTimeout { c.Close() return } @@ -143,20 +145,21 @@ func keepAliveWithPong(ctx context.Context, c *websocket.Conn, timeout time.Dura ticker := time.NewTicker(timeout) defer ticker.Stop() - lastResponse := time.Now() + var lastResponse int64 + atomic.StoreInt64(&lastResponse, time.Now().Unix()) c.SetPingHandler(func(pingData string) error { // Respond with Pong using the server's PING payload err := c.WriteControl( websocket.PongMessage, []byte(pingData), - time.Now().Add(WebsocketPongTimeout), // Short deadline to ensure timely response + time.Now().Add(WebsocketKeepaliveTimeout), // Short deadline to ensure timely response ) if err != nil { return err } - lastResponse = time.Now() + atomic.StoreInt64(&lastResponse, time.Now().Unix()) return nil }) @@ -166,7 +169,7 @@ func keepAliveWithPong(ctx context.Context, c *websocket.Conn, timeout time.Dura case <-ctx.Done(): return case <-ticker.C: - if time.Since(lastResponse) > timeout { + if time.Since(time.Unix(atomic.LoadInt64(&lastResponse), 0)) > timeout { c.Close() return } diff --git a/v2/websocket_service.go b/v2/websocket_service.go index e3cd88428..99df740b0 100644 --- a/v2/websocket_service.go +++ b/v2/websocket_service.go @@ -24,6 +24,8 @@ var ( WebsocketTimeout = time.Second * 600 // WebsocketPongTimeout is an interval for sending a PONG frame in response to PING frame from server WebsocketPongTimeout = time.Second * 10 + // WebsocketKeepaliveTimeout is an interval for sending ping/pong messages if WebsocketKeepalive is enabled + WebsocketKeepaliveTimeout = time.Second * 10 // WebsocketKeepalive enables sending ping/pong messages to check the connection stability WebsocketKeepalive = true // WebsocketTimeoutReadWriteConnection is an interval for sending ping/pong messages if WebsocketKeepalive is enabled From 86649a8a7cc41fab9b762dbf930833eac44000ac Mon Sep 17 00:00:00 2001 From: 0x0001 <2239315+0x0001@users.noreply.github.com> Date: Tue, 23 Sep 2025 19:26:18 +0800 Subject: [PATCH 08/10] RENAME WebsocketKeepaliveTimeout to WebsocketPingTimeout The timeout variable names are now consistent with their actual usage: - WebsocketPingTimeout for PING frame deadlines - WebsocketPongTimeout for PONG frame deadlines This change affects both websocket.go and websocket_service.go files. --- v2/websocket.go | 4 ++-- v2/websocket_service.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/v2/websocket.go b/v2/websocket.go index 257348b0c..ac7a7a893 100644 --- a/v2/websocket.go +++ b/v2/websocket.go @@ -127,7 +127,7 @@ func keepAliveWithPing(interval time.Duration, pongTimeout time.Duration) ConnHa case <-ctx.Done(): return case <-ticker.C: - if err := c.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(WebsocketKeepaliveTimeout)); err != nil { + if err := c.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(WebsocketPingTimeout)); err != nil { return } case <-lastPongTicker.C: @@ -153,7 +153,7 @@ func keepAliveWithPong(ctx context.Context, c *websocket.Conn, timeout time.Dura err := c.WriteControl( websocket.PongMessage, []byte(pingData), - time.Now().Add(WebsocketKeepaliveTimeout), // Short deadline to ensure timely response + time.Now().Add(WebsocketPongTimeout), // Short deadline to ensure timely response ) if err != nil { return err diff --git a/v2/websocket_service.go b/v2/websocket_service.go index 99df740b0..645b5bb6a 100644 --- a/v2/websocket_service.go +++ b/v2/websocket_service.go @@ -24,8 +24,8 @@ var ( WebsocketTimeout = time.Second * 600 // WebsocketPongTimeout is an interval for sending a PONG frame in response to PING frame from server WebsocketPongTimeout = time.Second * 10 - // WebsocketKeepaliveTimeout is an interval for sending ping/pong messages if WebsocketKeepalive is enabled - WebsocketKeepaliveTimeout = time.Second * 10 + /// WebsocketPingTimeout is an interval for sending a PING frame in response to PONG frame from server + WebsocketPingTimeout = time.Second * 10 // WebsocketKeepalive enables sending ping/pong messages to check the connection stability WebsocketKeepalive = true // WebsocketTimeoutReadWriteConnection is an interval for sending ping/pong messages if WebsocketKeepalive is enabled From f2bd8a79aebb669abb445ad1e130613df30e93bc Mon Sep 17 00:00:00 2001 From: 0x0001 <2239315+0x0001@users.noreply.github.com> Date: Tue, 23 Sep 2025 19:50:12 +0800 Subject: [PATCH 09/10] Use Set instead of Add for X-MBX-APIKEY header --- v2/websocket_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/websocket_service.go b/v2/websocket_service.go index 645b5bb6a..67c001c34 100644 --- a/v2/websocket_service.go +++ b/v2/websocket_service.go @@ -955,7 +955,7 @@ func WsAnnouncementServe(params WsAnnouncementParam, handler WsAnnouncementHandl ) cfg := newWsConfig(endpoint) - cfg.Header.Add("X-MBX-APIKEY", params.ApiKey) + cfg.Header.Set("X-MBX-APIKEY", params.ApiKey) wsHandler := func(message []byte) { event := struct { Type string `json:"type"` From e1f6ce17caf9468099417c3a20872f286fa1db9f Mon Sep 17 00:00:00 2001 From: 0x0001 <2239315+0x0001@users.noreply.github.com> Date: Sun, 19 Oct 2025 09:32:28 +0800 Subject: [PATCH 10/10] Handle rand.Read error and fix ping comment --- v2/announcement_service.go | 4 +++- v2/websocket_service.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/v2/announcement_service.go b/v2/announcement_service.go index 034b9f305..40c6f8b58 100644 --- a/v2/announcement_service.go +++ b/v2/announcement_service.go @@ -35,7 +35,9 @@ func (c *Client) CreateAnnouncementParam(opts ...RequestOption) (WsAnnouncementP return WsAnnouncementParam{}, err } r := make([]byte, 16) - rand.Read(r) + if _, err := rand.Read(r); err != nil { + return WsAnnouncementParam{}, fmt.Errorf("failed to generate random bytes: %w", err) + } random := hex.EncodeToString(r) timestamp := time.Now().UnixMilli() recvWindow := req.recvWindow diff --git a/v2/websocket_service.go b/v2/websocket_service.go index 13ae60ff1..242a6caab 100644 --- a/v2/websocket_service.go +++ b/v2/websocket_service.go @@ -26,7 +26,7 @@ var ( WebsocketTimeout = time.Second * 600 // WebsocketPongTimeout is an interval for sending a PONG frame in response to PING frame from server WebsocketPongTimeout = time.Second * 10 - /// WebsocketPingTimeout is an interval for sending a PING frame in response to PONG frame from server + // WebsocketPingTimeout is an interval for waiting for a PONG response after sending a PING framer WebsocketPingTimeout = time.Second * 10 // WebsocketKeepalive enables sending ping/pong messages to check the connection stability WebsocketKeepalive = true