diff --git a/Makefile b/Makefile index 64f5e3c..47d1fe1 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,7 @@ run: - go run ./examples/main.go + go run ./examples/candles/main.go build: - go build -o ./bin/example ./examples/main.go + go build -o ./bin/candles ./examples/candles/main.go + go build -o ./bin/account ./examples/account/main.go test: go test -v ./.../ --race diff --git a/account.go b/account.go index bcd5cb1..5f5ecf7 100644 --- a/account.go +++ b/account.go @@ -15,41 +15,57 @@ import ( type Order struct { Guid string `json:"guid"` + // The order id of the returned order. OrderId string `json:"orderId"` + // Is a timestamp in milliseconds since 1 Jan 1970. Created int64 `json:"created"` + // Is a timestamp in milliseconds since 1 Jan 1970. Updated int64 `json:"updated"` - // The current status of the order + + // The current status of the order. // Enum: "new" | "awaitingTrigger" | "canceled" | "canceledAuction" | "canceledSelfTradePrevention" | "canceledIOC" | "canceledFOK" | "canceledMarketProtection" | "canceledPostOnly" | "filled" | "partiallyFilled" | "expired" | "rejected" Status string `json:"status"` + // Side // Enum: "buy" | "sell" Side string `json:"side"` + // OrderType // Enum: "limit" | "market" OrderType string `json:"orderType"` + // Original amount. Amount float64 `json:"amount"` + // Amount remaining (lower than 'amount' after fills). AmountRemaining float64 `json:"amountRemaining"` + // The price of the order. Price float64 `json:"price"` + // Amount of 'onHoldCurrency' that is reserved for this order. This is released when orders are canceled. OnHold float64 `json:"onHold"` + // The currency placed on hold is the quote currency for sell orders and base currency for buy orders. OnHoldCurrency string `json:"onHoldCurrency"` + // Only for stop orders: The current price used in the trigger. This is based on the triggerAmount and triggerType. TriggerPrice float64 `json:"triggerPrice"` + // Only for stop orders: The value used for the triggerType to determine the triggerPrice. TriggerAmount float64 `json:"triggerAmount"` - // Only for stop orders + + // Only for stop orders. // Enum: "price" TriggerType string `json:"triggerType"` + // Only for stop orders: The reference price used for stop orders. // Enum: "lastTrade" | "bestBid" | "bestAsk" | "midPrice" TriggerReference string `json:"triggerReference"` + // Only for limit orders: Determines how long orders remain active. // Possible values: Good-Til-Canceled (GTC), Immediate-Or-Cancel (IOC), Fill-Or-Kill (FOK). // GTC orders will remain on the order book until they are filled or canceled. @@ -57,8 +73,10 @@ type Order struct { // FOK orders will fill against existing orders in its entirety, or will be canceled (if the entire order cannot be filled). // Enum: "GTC" | "IOC" | "FOK" TimeInForce string `json:"timeInForce"` + // Default: false PostOnly bool `json:"postOnly"` + // Self trading is not allowed on Bitvavo. Multiple options are available to prevent this from happening. // The default ‘decrementAndCancel’ decrements both orders by the amount that would have been filled, which in turn cancels the smallest of the two orders. // ‘cancelOldest’ will cancel the entire older order and places the new order. @@ -67,16 +85,19 @@ type Order struct { // Default: "decrementAndCancel" // Enum: "decrementAndCancel" | "cancelOldest" | "cancelNewest" | "cancelBoth" SelfTradePrevention string `json:"selfTradePrevention"` + // Whether this order is visible on the order book. Visible bool `json:"visible"` } type OrderEvent struct { - // Describes the returned event over the socket + // Describes the returned event over the socket. Event string `json:"event"` - // The market which was requested in the subscription + + // The market which was requested in the subscription. Market string `json:"market"` - // The order itself + + // The order itself. Order Order `json:"order"` } @@ -213,35 +234,36 @@ func (f *FillEvent) UnmarshalJSON(data []byte) error { return nil } -type AccountSubscription interface { +type AccountSub interface { // Order channel to receive order events // You can set the buffSize for this channel, 0 for no buffer Order(buffSize uint64) <-chan OrderEvent + // Order channel to receive fill events // You can set the buffSize for this channel, 0 for no buffer Fill(buffSize uint64) <-chan FillEvent } -type accountSubscription struct { +type accountSub struct { orderchn chan<- OrderEvent fillchn chan<- FillEvent } -func (a *accountSubscription) Order(buffSize uint64) <-chan OrderEvent { +func (a *accountSub) Order(buffSize uint64) <-chan OrderEvent { orderchn := make(chan OrderEvent, buffSize) a.orderchn = orderchn return orderchn } -func (a *accountSubscription) Fill(buffSize uint64) <-chan FillEvent { +func (a *accountSub) Fill(buffSize uint64) <-chan FillEvent { fillchn := make(chan FillEvent, buffSize) a.fillchn = fillchn return fillchn } -type AccountWsHandler interface { +type AccountEventHandler interface { // Subscribe to market - Subscribe(market string) (AccountSubscription, error) + Subscribe(market string) (AccountSub, error) // Unsubscribe from market Unsubscribe(market string) error @@ -250,26 +272,28 @@ type AccountWsHandler interface { UnsubscribeAll() error } -type accountWsHandler struct { +type accountEventHandler struct { apiKey string apiSecret string + windowTimeMs uint64 authenticated bool authchn chan bool writechn chan<- WebSocketMessage - subs *safemap.SafeMap[string, *accountSubscription] + subs *safemap.SafeMap[string, *accountSub] } -func newAccountWsHandler(apiKey string, apiSecret string, writechn chan<- WebSocketMessage) *accountWsHandler { - return &accountWsHandler{ - apiKey: apiKey, - apiSecret: apiSecret, - writechn: writechn, - authchn: make(chan bool), - subs: safemap.New[string, *accountSubscription](), +func newAccountEventHandler(apiKey string, apiSecret string, windowTimeMs uint64, writechn chan<- WebSocketMessage) *accountEventHandler { + return &accountEventHandler{ + apiKey: apiKey, + apiSecret: apiSecret, + windowTimeMs: windowTimeMs, + writechn: writechn, + authchn: make(chan bool), + subs: safemap.New[string, *accountSub](), } } -func (t *accountWsHandler) Subscribe(market string) (AccountSubscription, error) { +func (t *accountEventHandler) Subscribe(market string) (AccountSub, error) { if t.subs.Has(market) { return nil, fmt.Errorf("subscription already active for market: %s", market) } @@ -280,7 +304,7 @@ func (t *accountWsHandler) Subscribe(market string) (AccountSubscription, error) return nil, err } - subscription := new(accountSubscription) + subscription := new(accountSub) t.subs.Set(market, subscription) @@ -288,7 +312,7 @@ func (t *accountWsHandler) Subscribe(market string) (AccountSubscription, error) } -func (t *accountWsHandler) Unsubscribe(market string) error { +func (t *accountEventHandler) Unsubscribe(market string) error { sub, exist := t.subs.Get(market) if exist { @@ -310,7 +334,7 @@ func (t *accountWsHandler) Unsubscribe(market string) error { return fmt.Errorf("no subscription active for market: %s", market) } -func (t *accountWsHandler) UnsubscribeAll() error { +func (t *accountEventHandler) UnsubscribeAll() error { for sub := range t.subs.IterBuffered() { market := sub.Key if err := t.Unsubscribe(market); err != nil { @@ -320,7 +344,7 @@ func (t *accountWsHandler) UnsubscribeAll() error { return nil } -func (t *accountWsHandler) handleOrderMessage(bytes []byte) { +func (t *accountEventHandler) handleOrderMessage(bytes []byte) { var orderEvent *OrderEvent if err := json.Unmarshal(bytes, &orderEvent); err != nil { log.Logger().Error("Couldn't unmarshal message into OrderEvent", "message", string(bytes)) @@ -330,7 +354,7 @@ func (t *accountWsHandler) handleOrderMessage(bytes []byte) { } } -func (t *accountWsHandler) handleFillMessage(bytes []byte) { +func (t *accountEventHandler) handleFillMessage(bytes []byte) { var fillEvent *FillEvent if err := json.Unmarshal(bytes, &fillEvent); err != nil { log.Logger().Error("Couldn't unmarshal message into FillEvent", "message", string(bytes)) @@ -340,7 +364,7 @@ func (t *accountWsHandler) handleFillMessage(bytes []byte) { } } -func (t *accountWsHandler) handleAuthMessage(bytes []byte) { +func (t *accountEventHandler) handleAuthMessage(bytes []byte) { var authEvent *AuthEvent if err := json.Unmarshal(bytes, &authEvent); err != nil { log.Logger().Error("Couldn't unmarshal message into AuthEvent", "message", string(bytes)) @@ -350,30 +374,29 @@ func (t *accountWsHandler) handleAuthMessage(bytes []byte) { } } -func newWebSocketAuthMessage(apiKey string, apiSecret string) WebSocketMessage { +func newWebSocketAuthMessage(apiKey string, apiSecret string, windowTimeMs uint64) WebSocketMessage { timestamp := time.Now().UnixMilli() return WebSocketMessage{ Action: ActionAuthenticate.Value, Key: apiKey, Signature: createSignature(timestamp, apiSecret), Timestamp: timestamp, - Window: 10000, + Window: windowTimeMs, } } func createSignature(timestamp int64, apiSecret string) string { hash := hmac.New(sha256.New, []byte(apiSecret)) hash.Write([]byte(fmt.Sprintf("%dGET/v2/websocket", timestamp))) - sha := hex.EncodeToString(hash.Sum(nil)) - return sha + return hex.EncodeToString(hash.Sum(nil)) } -func (t *accountWsHandler) authenticate() { - t.writechn <- newWebSocketAuthMessage(t.apiKey, t.apiSecret) +func (t *accountEventHandler) authenticate() { + t.writechn <- newWebSocketAuthMessage(t.apiKey, t.apiSecret, t.windowTimeMs) t.authenticated = <-t.authchn } -func (t *accountWsHandler) reconnect() { +func (t *accountEventHandler) reconnect() { t.authenticated = false for sub := range t.subs.IterBuffered() { @@ -386,7 +409,7 @@ func (t *accountWsHandler) reconnect() { } } -func (t *accountWsHandler) withAuth(action func()) error { +func (t *accountEventHandler) withAuth(action func()) error { if !t.authenticated { t.authenticate() } @@ -399,7 +422,7 @@ func (t *accountWsHandler) withAuth(action func()) error { return fmt.Errorf("could not subscribe, authentication failed") } -func (t *accountWsHandler) hasOrderChn(market string) bool { +func (t *accountEventHandler) hasOrderChn(market string) bool { sub, exist := t.subs.Get(market) if exist { @@ -409,7 +432,7 @@ func (t *accountWsHandler) hasOrderChn(market string) bool { return false } -func (t *accountWsHandler) hasFillChn(market string) bool { +func (t *accountEventHandler) hasFillChn(market string) bool { sub, exist := t.subs.Get(market) if exist { diff --git a/book.go b/book.go index 1fefc54..1cfa100 100644 --- a/book.go +++ b/book.go @@ -11,27 +11,32 @@ import ( ) type BookEvent struct { - // Describes the returned event over the socket + // Describes the returned event over the socket. Event string `json:"event"` - // The market which was requested in the subscription + + // The market which was requested in the subscription. Market string `json:"market"` - // The book containing the bids and asks + + // The book containing the bids and asks. Book Book `json:"book"` } type Page struct { - // Bid / ask price + // Bid / ask price. Price float64 `json:"price"` - // Size of 0 means orders are no longer present at that price level, otherwise the returned size is the new total size on that price level + + // Size of 0 means orders are no longer present at that price level, otherwise the returned size is the new total size on that price level. Size float64 `json:"size"` } type Book struct { - // Integer which is increased by one for every update to the book. Useful for synchronizing. Resets to zero after restarting the matching engine + // Integer which is increased by one for every update to the book. Useful for synchronizing. Resets to zero after restarting the matching engine. Nonce int64 `json:"nonce"` + // Slice with all bids in the format [price, size], where an size of 0 means orders are no longer present at that price level, - // otherwise the returned size is the new total size on that price level + // otherwise the returned size is the new total size on that price level. Bids []Page `json:"bids"` + // Slice with all asks in the format [price, size], where an size of 0 means orders are no longer present at that price level, // otherwise the returned size is the new total size on that price level. Asks []Page `json:"asks"` @@ -85,32 +90,32 @@ func (b *BookEvent) UnmarshalJSON(bytes []byte) error { return nil } -type bookWsHandler struct { +type bookEventHandler struct { writechn chan<- WebSocketMessage subs *safemap.SafeMap[string, chan<- BookEvent] } -func newBookWsHandler(writechn chan<- WebSocketMessage) *bookWsHandler { - return &bookWsHandler{ +func newBookEventHandler(writechn chan<- WebSocketMessage) *bookEventHandler { + return &bookEventHandler{ writechn: writechn, subs: safemap.New[string, chan<- BookEvent](), } } -func (t *bookWsHandler) Subscribe(market string) (<-chan BookEvent, error) { +func (t *bookEventHandler) Subscribe(market string, buffSize uint64) (<-chan BookEvent, error) { if t.subs.Has(market) { return nil, fmt.Errorf("subscription already active for market: %s", market) } t.writechn <- newWebSocketMessage(ActionSubscribe, ChannelNameBook, market) - chn := make(chan BookEvent) + chn := make(chan BookEvent, buffSize) t.subs.Set(market, chn) return chn, nil } -func (t *bookWsHandler) Unsubscribe(market string) error { +func (t *bookEventHandler) Unsubscribe(market string) error { sub, exist := t.subs.Get(market) if exist { @@ -123,7 +128,7 @@ func (t *bookWsHandler) Unsubscribe(market string) error { return fmt.Errorf("no subscription active for market: %s", market) } -func (t *bookWsHandler) UnsubscribeAll() error { +func (t *bookEventHandler) UnsubscribeAll() error { for sub := range t.subs.IterBuffered() { market := sub.Key if err := t.Unsubscribe(market); err != nil { @@ -133,7 +138,7 @@ func (t *bookWsHandler) UnsubscribeAll() error { return nil } -func (t *bookWsHandler) handleMessage(bytes []byte) { +func (t *bookEventHandler) handleMessage(bytes []byte) { var bookEvent *BookEvent if err := json.Unmarshal(bytes, &bookEvent); err != nil { log.Logger().Error("Couldn't unmarshal message into BookEvent", "message", string(bytes)) @@ -148,7 +153,7 @@ func (t *bookWsHandler) handleMessage(bytes []byte) { } } -func (t *bookWsHandler) reconnect() { +func (t *bookEventHandler) reconnect() { for sub := range t.subs.IterBuffered() { market := sub.Key t.writechn <- newWebSocketMessage(ActionSubscribe, ChannelNameBook, market) diff --git a/candles.go b/candles.go index b9a1780..78a50c6 100644 --- a/candles.go +++ b/candles.go @@ -12,18 +12,21 @@ import ( ) type CandlesEvent struct { - // Describes the returned event over the socket + // Describes the returned event over the socket. Event string `json:"event"` - // The market which was requested in the subscription + + // The market which was requested in the subscription. Market string `json:"market"` - //The interval which was requested in the subscription + + //The interval which was requested in the subscription. Interval string `json:"interval"` - // The candle in the defined time period + + // The candle in the defined time period. Candle Candle `json:"candle"` } type Candle struct { - // Timestamp in unix milliseconds + // Timestamp in unix milliseconds. Timestamp int64 `json:"timestamp"` Open float64 `json:"open"` High float64 `json:"high"` @@ -55,9 +58,10 @@ func (c *Candle) UnmarshalJSON(data []byte) error { return nil } -type CandlesWsHandler interface { - // Subscribe to market with interval - Subscribe(market string, interval string) (<-chan CandlesEvent, error) +type CandlesEventHandler interface { + // Subscribe to market with interval. + // You can set the buffSize for this channel, 0 for no buffer + Subscribe(market string, interval string, buffSize uint64) (<-chan CandlesEvent, error) // Unsubscribe from market with interval Unsubscribe(market string, interval string) error @@ -66,13 +70,13 @@ type CandlesWsHandler interface { UnsubscribeAll() error } -type candleWsHandler struct { +type candlesEventHandler struct { writechn chan<- WebSocketMessage subs *safemap.SafeMap[string, chan<- CandlesEvent] } -func newCandleWsHandler(writechn chan<- WebSocketMessage) *candleWsHandler { - return &candleWsHandler{ +func newCandlesEventHandler(writechn chan<- WebSocketMessage) *candlesEventHandler { + return &candlesEventHandler{ writechn: writechn, subs: safemap.New[string, chan<- CandlesEvent](), } @@ -91,7 +95,7 @@ func newCandleWebSocketMessage(action Action, market string, interval string) We } } -func (c *candleWsHandler) Subscribe(market string, interval string) (<-chan CandlesEvent, error) { +func (c *candlesEventHandler) Subscribe(market string, interval string, buffSize uint64) (<-chan CandlesEvent, error) { key := getMapKey(market, interval) if c.subs.Has(key) { @@ -100,13 +104,13 @@ func (c *candleWsHandler) Subscribe(market string, interval string) (<-chan Cand c.writechn <- newCandleWebSocketMessage(ActionSubscribe, market, interval) - chn := make(chan CandlesEvent) + chn := make(chan CandlesEvent, buffSize) c.subs.Set(key, chn) return chn, nil } -func (c *candleWsHandler) Unsubscribe(market string, interval string) error { +func (c *candlesEventHandler) Unsubscribe(market string, interval string) error { key := getMapKey(market, interval) sub, exist := c.subs.Get(key) @@ -120,7 +124,7 @@ func (c *candleWsHandler) Unsubscribe(market string, interval string) error { return fmt.Errorf("no subscription active for market: %s with interval: %s", market, interval) } -func (c *candleWsHandler) UnsubscribeAll() error { +func (c *candlesEventHandler) UnsubscribeAll() error { for sub := range c.subs.IterBuffered() { market, interval := getMapKeyValue(sub.Key) if err := c.Unsubscribe(market, interval); err != nil { @@ -130,7 +134,7 @@ func (c *candleWsHandler) UnsubscribeAll() error { return nil } -func (c *candleWsHandler) handleMessage(bytes []byte) { +func (c *candlesEventHandler) handleMessage(bytes []byte) { var candleEvent *CandlesEvent if err := json.Unmarshal(bytes, &candleEvent); err != nil { log.Logger().Error("Couldn't unmarshal message into CandlesEvent", "message", string(bytes)) @@ -150,7 +154,7 @@ func (c *candleWsHandler) handleMessage(bytes []byte) { } } -func (c *candleWsHandler) reconnect() { +func (c *candlesEventHandler) reconnect() { for sub := range c.subs.IterBuffered() { market, interval := getMapKeyValue(sub.Key) c.writechn <- newCandleWebSocketMessage(ActionSubscribe, market, interval) diff --git a/examples/main.go b/examples/account/main.go similarity index 100% rename from examples/main.go rename to examples/account/main.go diff --git a/examples/candles/main.go b/examples/candles/main.go new file mode 100644 index 0000000..425cb52 --- /dev/null +++ b/examples/candles/main.go @@ -0,0 +1,23 @@ +package main + +import ( + "log" + + "github.com/larscom/go-bitvavo/v2" +) + +func main() { + ws, err := bitvavo.NewWebSocket(bitvavo.WithDebug(true)) + if err != nil { + log.Fatal(err) + } + + candlechn, err := ws.Candles().Subscribe("ETH-EUR", "5m", 0) + if err != nil { + log.Fatal(err) + } + + for value := range candlechn { + log.Println("value", value) + } +} diff --git a/ticker.go b/ticker.go index e4454ac..e5af59f 100644 --- a/ticker.go +++ b/ticker.go @@ -11,24 +11,30 @@ import ( ) type TickerEvent struct { - // Describes the returned event over the socket + // Describes the returned event over the socket. Event string `json:"event"` - // The market which was requested in the subscription + + // The market which was requested in the subscription. Market string `json:"market"` - // The ticker containing the prices + + // The ticker containing the prices. Ticker Ticker `json:"ticker"` } type Ticker struct { - // The price of the best (highest) bid offer available, only sent when either bestBid or bestBidSize has changed + // The price of the best (highest) bid offer available, only sent when either bestBid or bestBidSize has changed. BestBid float64 `json:"bestBid"` - // The size of the best (highest) bid offer available, only sent when either bestBid or bestBidSize has changed + + // The size of the best (highest) bid offer available, only sent when either bestBid or bestBidSize has changed. BestBidSize float64 `json:"bestBidSize"` - // The price of the best (lowest) ask offer available, only sent when either bestAsk or bestAskSize has changed + + // The price of the best (lowest) ask offer available, only sent when either bestAsk or bestAskSize has changed. BestAsk float64 `json:"bestAsk"` - // The size of the best (lowest) ask offer available, only sent when either bestAsk or bestAskSize has changed + + // The size of the best (lowest) ask offer available, only sent when either bestAsk or bestAskSize has changed. BestAskSize float64 `json:"bestAskSize"` - // The last price for which a trade has occurred, only sent when lastPrice has changed + + // The last price for which a trade has occurred, only sent when lastPrice has changed. LastPrice float64 `json:"lastPrice"` } @@ -61,32 +67,32 @@ func (t *TickerEvent) UnmarshalJSON(data []byte) error { return nil } -type tickerWsHandler struct { +type tickerEventHandler struct { writechn chan<- WebSocketMessage subs *safemap.SafeMap[string, chan<- TickerEvent] } -func newTickerWsHandler(writechn chan<- WebSocketMessage) *tickerWsHandler { - return &tickerWsHandler{ +func newTickerEventHandler(writechn chan<- WebSocketMessage) *tickerEventHandler { + return &tickerEventHandler{ writechn: writechn, subs: safemap.New[string, chan<- TickerEvent](), } } -func (t *tickerWsHandler) Subscribe(market string) (<-chan TickerEvent, error) { +func (t *tickerEventHandler) Subscribe(market string, buffSize uint64) (<-chan TickerEvent, error) { if t.subs.Has(market) { return nil, fmt.Errorf("subscription already active for market: %s", market) } t.writechn <- newWebSocketMessage(ActionSubscribe, ChannelNameTicker, market) - chn := make(chan TickerEvent) + chn := make(chan TickerEvent, buffSize) t.subs.Set(market, chn) return chn, nil } -func (t *tickerWsHandler) Unsubscribe(market string) error { +func (t *tickerEventHandler) Unsubscribe(market string) error { sub, exist := t.subs.Get(market) if exist { @@ -99,7 +105,7 @@ func (t *tickerWsHandler) Unsubscribe(market string) error { return fmt.Errorf("no subscription active for market: %s", market) } -func (t *tickerWsHandler) UnsubscribeAll() error { +func (t *tickerEventHandler) UnsubscribeAll() error { for sub := range t.subs.IterBuffered() { market := sub.Key if err := t.Unsubscribe(market); err != nil { @@ -109,7 +115,7 @@ func (t *tickerWsHandler) UnsubscribeAll() error { return nil } -func (t *tickerWsHandler) handleMessage(bytes []byte) { +func (t *tickerEventHandler) handleMessage(bytes []byte) { var tickerEvent *TickerEvent if err := json.Unmarshal(bytes, &tickerEvent); err != nil { log.Logger().Error("Couldn't unmarshal message into TickerEvent", "message", string(bytes)) @@ -124,7 +130,7 @@ func (t *tickerWsHandler) handleMessage(bytes []byte) { } } -func (t *tickerWsHandler) reconnect() { +func (t *tickerEventHandler) reconnect() { for sub := range t.subs.IterBuffered() { market := sub.Key t.writechn <- newWebSocketMessage(ActionSubscribe, ChannelNameTicker, market) diff --git a/ticker24h.go b/ticker24h.go index 049d53e..9f6a16c 100644 --- a/ticker24h.go +++ b/ticker24h.go @@ -11,42 +11,57 @@ import ( ) type Ticker24hEvent struct { - // Describes the returned event over the socket + // Describes the returned event over the socket. Event string `json:"event"` - // The market which was requested in the subscription + + // The market which was requested in the subscription. Market string `json:"market"` - // The ticker24h containing the prices etc + + // The ticker24h containing the prices etc. Ticker24h Ticker24h `json:"ticker24h"` } type Ticker24h struct { - // The open price of the 24 hour period + // The open price of the 24 hour period. Open float64 `json:"open"` - // The highest price for which a trade occurred in the 24 hour period + + // The highest price for which a trade occurred in the 24 hour period. High float64 `json:"high"` - // The lowest price for which a trade occurred in the 24 hour period + + // The lowest price for which a trade occurred in the 24 hour period. Low float64 `json:"low"` - // The last price for which a trade occurred in the 24 hour period + + // The last price for which a trade occurred in the 24 hour period. Last float64 `json:"last"` - // The total volume of the 24 hour period in base currency + + // The total volume of the 24 hour period in base currency. Volume float64 `json:"volume"` - // The total volume of the 24 hour period in quote currency + + // The total volume of the 24 hour period in quote currency. VolumeQuote float64 `json:"volumeQuote"` - // The best (highest) bid offer at the current moment + + // The best (highest) bid offer at the current moment. Bid float64 `json:"bid"` - // The size of the best (highest) bid offer + + // The size of the best (highest) bid offer. BidSize float64 `json:"bidSize"` - // The best (lowest) ask offer at the current moment + + // The best (lowest) ask offer at the current moment. Ask float64 `json:"ask"` - // The size of the best (lowest) ask offer + + // The size of the best (lowest) ask offer. AskSize float64 `json:"askSize"` - // Timestamp in unix milliseconds + + // Timestamp in unix milliseconds. Timestamp int64 `json:"timestamp"` - // Start timestamp in unix milliseconds + + // Start timestamp in unix milliseconds. StartTimestamp int64 `json:"startTimestamp"` - // Open timestamp in unix milliseconds + + // Open timestamp in unix milliseconds. OpenTimestamp int64 `json:"openTimestamp"` - // Close timestamp in unix milliseconds + + // Close timestamp in unix milliseconds. CloseTimestamp int64 `json:"closeTimestamp"` } @@ -105,32 +120,32 @@ func (t *Ticker24hEvent) UnmarshalJSON(bytes []byte) error { return nil } -type ticker24hWsHandler struct { +type ticker24hEventHandler struct { writechn chan<- WebSocketMessage subs *safemap.SafeMap[string, chan<- Ticker24hEvent] } -func newTicker24hWsHandler(writechn chan<- WebSocketMessage) *ticker24hWsHandler { - return &ticker24hWsHandler{ +func newTicker24hEventHandler(writechn chan<- WebSocketMessage) *ticker24hEventHandler { + return &ticker24hEventHandler{ writechn: writechn, subs: safemap.New[string, chan<- Ticker24hEvent](), } } -func (t *ticker24hWsHandler) Subscribe(market string) (<-chan Ticker24hEvent, error) { +func (t *ticker24hEventHandler) Subscribe(market string, buffSize uint64) (<-chan Ticker24hEvent, error) { if t.subs.Has(market) { return nil, fmt.Errorf("subscription already active for market: %s", market) } t.writechn <- newWebSocketMessage(ActionSubscribe, ChannelNameTicker24h, market) - chn := make(chan Ticker24hEvent) + chn := make(chan Ticker24hEvent, buffSize) t.subs.Set(market, chn) return chn, nil } -func (t *ticker24hWsHandler) Unsubscribe(market string) error { +func (t *ticker24hEventHandler) Unsubscribe(market string) error { sub, exist := t.subs.Get(market) if exist { @@ -143,7 +158,7 @@ func (t *ticker24hWsHandler) Unsubscribe(market string) error { return fmt.Errorf("no subscription active for market: %s", market) } -func (t *ticker24hWsHandler) UnsubscribeAll() error { +func (t *ticker24hEventHandler) UnsubscribeAll() error { for sub := range t.subs.IterBuffered() { market := sub.Key if err := t.Unsubscribe(market); err != nil { @@ -153,7 +168,7 @@ func (t *ticker24hWsHandler) UnsubscribeAll() error { return nil } -func (t *ticker24hWsHandler) handleMessage(bytes []byte) { +func (t *ticker24hEventHandler) handleMessage(bytes []byte) { var ticker24hEvent *Ticker24hEvent if err := json.Unmarshal(bytes, &ticker24hEvent); err != nil { log.Logger().Error("Couldn't unmarshal message into Ticker24hEvent", "message", string(bytes)) @@ -168,7 +183,7 @@ func (t *ticker24hWsHandler) handleMessage(bytes []byte) { } } -func (t *ticker24hWsHandler) reconnect() { +func (t *ticker24hEventHandler) reconnect() { for sub := range t.subs.IterBuffered() { market := sub.Key t.writechn <- newWebSocketMessage(ActionSubscribe, ChannelNameTicker24h, market) diff --git a/trades.go b/trades.go index 49ee696..e4c2146 100644 --- a/trades.go +++ b/trades.go @@ -11,25 +11,31 @@ import ( ) type TradesEvent struct { - // Describes the returned event over the socket + // Describes the returned event over the socket. Event string `json:"event"` - // The market which was requested in the subscription + + // The market which was requested in the subscription. Market string `json:"market"` - // The trade containing the price, side etc + + // The trade containing the price, side etc. Trade Trade `json:"trade"` } type Trade struct { - // The trade ID of the returned trade (UUID) + // The trade ID of the returned trade (UUID). Id string `json:"id"` - // The amount in base currency for which the trade has been made + + // The amount in base currency for which the trade has been made. Amount float64 `json:"amount"` - // The price in quote currency for which the trade has been made + + // The price in quote currency for which the trade has been made. Price float64 `json:"price"` - // The side for the taker + + // The side for the taker. // Enum: "buy" | "sell" Side string `json:"side"` - // Timestamp in unix milliseconds + + // Timestamp in unix milliseconds. Timestamp int64 `json:"timestamp"` } @@ -64,32 +70,32 @@ func (t *TradesEvent) UnmarshalJSON(bytes []byte) error { return nil } -type tradesWsHandler struct { +type tradesEventHandler struct { writechn chan<- WebSocketMessage subs *safemap.SafeMap[string, chan<- TradesEvent] } -func newTradesWsHandler(writechn chan<- WebSocketMessage) *tradesWsHandler { - return &tradesWsHandler{ +func newTradesEventHandler(writechn chan<- WebSocketMessage) *tradesEventHandler { + return &tradesEventHandler{ writechn: writechn, subs: safemap.New[string, chan<- TradesEvent](), } } -func (t *tradesWsHandler) Subscribe(market string) (<-chan TradesEvent, error) { +func (t *tradesEventHandler) Subscribe(market string, buffSize uint64) (<-chan TradesEvent, error) { if t.subs.Has(market) { return nil, fmt.Errorf("subscription already active for market: %s", market) } t.writechn <- newWebSocketMessage(ActionSubscribe, ChannelNameTrades, market) - chn := make(chan TradesEvent) + chn := make(chan TradesEvent, buffSize) t.subs.Set(market, chn) return chn, nil } -func (t *tradesWsHandler) Unsubscribe(market string) error { +func (t *tradesEventHandler) Unsubscribe(market string) error { sub, exist := t.subs.Get(market) if exist { @@ -102,7 +108,7 @@ func (t *tradesWsHandler) Unsubscribe(market string) error { return fmt.Errorf("no subscription active for market: %s", market) } -func (t *tradesWsHandler) UnsubscribeAll() error { +func (t *tradesEventHandler) UnsubscribeAll() error { for sub := range t.subs.IterBuffered() { market := sub.Key if err := t.Unsubscribe(market); err != nil { @@ -112,7 +118,7 @@ func (t *tradesWsHandler) UnsubscribeAll() error { return nil } -func (t *tradesWsHandler) handleMessage(bytes []byte) { +func (t *tradesEventHandler) handleMessage(bytes []byte) { var tradeEvent *TradesEvent if err := json.Unmarshal(bytes, &tradeEvent); err != nil { log.Logger().Error("Couldn't unmarshal message into TradesEvent", "message", string(bytes)) @@ -127,7 +133,7 @@ func (t *tradesWsHandler) handleMessage(bytes []byte) { } } -func (t *tradesWsHandler) reconnect() { +func (t *tradesEventHandler) reconnect() { for sub := range t.subs.IterBuffered() { market := sub.Key t.writechn <- newWebSocketMessage(ActionSubscribe, ChannelNameTrades, market) diff --git a/types.go b/types.go index b8c9ec7..e0c5817 100644 --- a/types.go +++ b/types.go @@ -40,7 +40,7 @@ var ( ) type SubscribedEvent struct { - // Describes the returned event over the socket + // Describes the returned event over the socket. Event string `json:"event"` // Subscriptions map[event][]markets @@ -48,9 +48,10 @@ type SubscribedEvent struct { } type AuthEvent struct { - // Describes the returned event over the socket + // Describes the returned event over the socket. Event string `json:"event"` - // Whether the user is authenticated + + // Whether the user is authenticated. Authenticated bool `json:"authenticated"` } @@ -68,14 +69,14 @@ type WebSocketMessage struct { Action string `json:"action"` Channels []Channel `json:"channels,omitempty"` - // Api Key + // Api Key. Key string `json:"key,omitempty"` - // SHA256 HMAC hex digest of timestamp + method + url + body + // SHA256 HMAC hex digest of timestamp + method + url + body. Signature string `json:"signature,omitempty"` - // The current timestamp in milliseconds since 1 Jan 1970 + // The current timestamp in milliseconds since 1 Jan 1970. Timestamp int64 `json:"timestamp,omitempty"` - // The window that allows execution of your request in milliseconds since 1 Jan 1970. The default value is 10000 (10s) and maximum value is 60000 (60s) - Window int64 `json:"window,omitempty"` + // The window that allows execution of your request in milliseconds since 1 Jan 1970. The default value is 10000 (10s) and maximum value is 60000 (60s). + Window uint64 `json:"window,omitempty"` } type Channel struct { diff --git a/websocket.go b/websocket.go index 31cc706..ab81e9d 100644 --- a/websocket.go +++ b/websocket.go @@ -16,14 +16,15 @@ const ( handshakeTimeout = 45 * time.Second ) -type WsHandler[T any] interface { - // Subscribe to market - Subscribe(market string) (<-chan T, error) +type EventHandler[T any] interface { + // Subscribe to market. + // You can set the buffSize for the underlying channel, 0 for no buffer. + Subscribe(market string, buffSize uint64) (<-chan T, error) - // Unsubscribe from market + // Unsubscribe from market. Unsubscribe(market string) error - // Unsubscribe from every market + // Unsubscribe from every market. UnsubscribeAll() error } @@ -31,39 +32,42 @@ type WebSocket interface { // Close everything, including subscriptions, underlying websockets, gracefull shutdown... Close() error - // Candles websocket handler to handle candle events and subscriptions - Candles() CandlesWsHandler + // Candles event handler to handle candle events and subscriptions. + Candles() CandlesEventHandler - // Ticker websocket handler to handle ticker events and subscriptions - Ticker() WsHandler[TickerEvent] + // Ticker event handler to handle ticker events and subscriptions. + Ticker() EventHandler[TickerEvent] - // Ticker24h websocket handler to handle ticker24h events and subscriptions - Ticker24h() WsHandler[Ticker24hEvent] + // Ticker24h event handler to handle ticker24h events and subscriptions. + Ticker24h() EventHandler[Ticker24hEvent] - // Trades websocket handler to handle trade events and subscriptions - Trades() WsHandler[TradesEvent] + // Trades event handler to handle trade events and subscriptions. + Trades() EventHandler[TradesEvent] - // Book websocket handler to handle book events and subscriptions - Book() WsHandler[BookEvent] + // Book event handler to handle book events and subscriptions. + Book() EventHandler[BookEvent] - // Account websocket handler to handle account events and subscriptions, requires authentication - Account(apiKey string, apiSecret string) AccountWsHandler + // Account event handler to handle account subscription and order/fill events, requires authentication. + Account(apiKey string, apiSecret string) AccountEventHandler } type webSocket struct { - reconnectCount int64 + reconnectCount uint64 autoReconnect bool conn *websocket.Conn writechn chan WebSocketMessage debug bool - // websocket handlers - candleWsHandler *candleWsHandler - tickerWsHandler *tickerWsHandler - ticker24hWsHandler *ticker24hWsHandler - tradesWsHandler *tradesWsHandler - bookWsHandler *bookWsHandler - accountWsHandler *accountWsHandler + // public + candlesEventHandler *candlesEventHandler + tickerEventHandler *tickerEventHandler + ticker24hEventHandler *ticker24hEventHandler + tradesEventHandler *tradesEventHandler + bookEventHandler *bookEventHandler + + // authenticated + accountEventHandler *accountEventHandler + windowTimeMs uint64 } func NewWebSocket(options ...Option) (WebSocket, error) { @@ -75,6 +79,7 @@ func NewWebSocket(options ...Option) (WebSocket, error) { ws := &webSocket{ conn: conn, autoReconnect: true, + windowTimeMs: 10000, writechn: make(chan WebSocketMessage), } @@ -106,56 +111,75 @@ func WithAutoReconnect(autoReconnect bool) Option { } } -func (ws *webSocket) Candles() CandlesWsHandler { - ws.candleWsHandler = newCandleWsHandler(ws.writechn) - return ws.candleWsHandler +// The time in milliseconds that your request is allowed to execute in. +// The default value is 10000 (10s), the maximum value is 60000 (60s). +func WithWindowTime(windowTimeMs uint64) Option { + return func(ws *webSocket) { + if windowTimeMs > 60000 { + windowTimeMs = 60000 + } + ws.windowTimeMs = windowTimeMs + } +} + +// The buff size for the write channel, by default the write channel is unbuffered. +// The write channel writes messages to the websocket. +func WithWriteBuffSize(buffSize uint64) Option { + return func(ws *webSocket) { + ws.writechn = make(chan WebSocketMessage, buffSize) + } +} + +func (ws *webSocket) Candles() CandlesEventHandler { + ws.candlesEventHandler = newCandlesEventHandler(ws.writechn) + return ws.candlesEventHandler } -func (ws *webSocket) Ticker() WsHandler[TickerEvent] { - ws.tickerWsHandler = newTickerWsHandler(ws.writechn) - return ws.tickerWsHandler +func (ws *webSocket) Ticker() EventHandler[TickerEvent] { + ws.tickerEventHandler = newTickerEventHandler(ws.writechn) + return ws.tickerEventHandler } -func (ws *webSocket) Ticker24h() WsHandler[Ticker24hEvent] { - ws.ticker24hWsHandler = newTicker24hWsHandler(ws.writechn) - return ws.ticker24hWsHandler +func (ws *webSocket) Ticker24h() EventHandler[Ticker24hEvent] { + ws.ticker24hEventHandler = newTicker24hEventHandler(ws.writechn) + return ws.ticker24hEventHandler } -func (ws *webSocket) Trades() WsHandler[TradesEvent] { - ws.tradesWsHandler = newTradesWsHandler(ws.writechn) - return ws.tradesWsHandler +func (ws *webSocket) Trades() EventHandler[TradesEvent] { + ws.tradesEventHandler = newTradesEventHandler(ws.writechn) + return ws.tradesEventHandler } -func (ws *webSocket) Book() WsHandler[BookEvent] { - ws.bookWsHandler = newBookWsHandler(ws.writechn) - return ws.bookWsHandler +func (ws *webSocket) Book() EventHandler[BookEvent] { + ws.bookEventHandler = newBookEventHandler(ws.writechn) + return ws.bookEventHandler } -func (ws *webSocket) Account(apiKey string, apiSecret string) AccountWsHandler { - ws.accountWsHandler = newAccountWsHandler(apiKey, apiSecret, ws.writechn) - return ws.accountWsHandler +func (ws *webSocket) Account(apiKey string, apiSecret string) AccountEventHandler { + ws.accountEventHandler = newAccountEventHandler(apiKey, apiSecret, ws.windowTimeMs, ws.writechn) + return ws.accountEventHandler } func (ws *webSocket) Close() error { defer close(ws.writechn) if ws.hasCandleWsHandler() { - ws.candleWsHandler.UnsubscribeAll() + ws.candlesEventHandler.UnsubscribeAll() } if ws.hasTickerWsHandler() { - ws.tickerWsHandler.UnsubscribeAll() + ws.tickerEventHandler.UnsubscribeAll() } if ws.hasTicker24hWsHandler() { - ws.ticker24hWsHandler.UnsubscribeAll() + ws.ticker24hEventHandler.UnsubscribeAll() } if ws.hasTradesWsHandler() { - ws.tradesWsHandler.UnsubscribeAll() + ws.tradesEventHandler.UnsubscribeAll() } if ws.hasBookWsHandler() { - ws.bookWsHandler.UnsubscribeAll() + ws.bookEventHandler.UnsubscribeAll() } if ws.hasAccountWsHandler() { - ws.accountWsHandler.UnsubscribeAll() + ws.accountEventHandler.UnsubscribeAll() } return ws.conn.Close() @@ -221,22 +245,22 @@ func (ws *webSocket) reconnect() { go ws.readLoop() if ws.hasCandleWsHandler() { - ws.candleWsHandler.reconnect() + ws.candlesEventHandler.reconnect() } if ws.hasTickerWsHandler() { - ws.tickerWsHandler.reconnect() + ws.tickerEventHandler.reconnect() } if ws.hasTicker24hWsHandler() { - ws.ticker24hWsHandler.reconnect() + ws.ticker24hEventHandler.reconnect() } if ws.hasTradesWsHandler() { - ws.tradesWsHandler.reconnect() + ws.tradesEventHandler.reconnect() } if ws.hasBookWsHandler() { - ws.bookWsHandler.reconnect() + ws.bookEventHandler.reconnect() } if ws.hasAccountWsHandler() { - ws.accountWsHandler.reconnect() + ws.accountEventHandler.reconnect() } } @@ -324,7 +348,7 @@ func (ws *webSocket) handleCandleEvent(bytes []byte) { ws.logDebug("Received candles event") if ws.hasCandleWsHandler() { - ws.candleWsHandler.handleMessage(bytes) + ws.candlesEventHandler.handleMessage(bytes) } } @@ -332,7 +356,7 @@ func (ws *webSocket) handleTickerEvent(bytes []byte) { ws.logDebug("Received ticker event") if ws.hasTickerWsHandler() { - ws.tickerWsHandler.handleMessage(bytes) + ws.tickerEventHandler.handleMessage(bytes) } } @@ -340,7 +364,7 @@ func (ws *webSocket) handleTicker24hEvent(bytes []byte) { ws.logDebug("Received ticker24h event") if ws.hasTicker24hWsHandler() { - ws.ticker24hWsHandler.handleMessage(bytes) + ws.ticker24hEventHandler.handleMessage(bytes) } } @@ -348,7 +372,7 @@ func (ws *webSocket) handleTradesEvent(bytes []byte) { ws.logDebug("Received trades event") if ws.hasTradesWsHandler() { - ws.tradesWsHandler.handleMessage(bytes) + ws.tradesEventHandler.handleMessage(bytes) } } @@ -356,7 +380,7 @@ func (ws *webSocket) handleBookEvent(bytes []byte) { ws.logDebug("Received book event") if ws.hasBookWsHandler() { - ws.bookWsHandler.handleMessage(bytes) + ws.bookEventHandler.handleMessage(bytes) } } @@ -364,7 +388,7 @@ func (ws *webSocket) handleOrderEvent(bytes []byte) { ws.logDebug("Received order event") if ws.hasAccountWsHandler() { - ws.accountWsHandler.handleOrderMessage(bytes) + ws.accountEventHandler.handleOrderMessage(bytes) } } @@ -372,7 +396,7 @@ func (ws *webSocket) handleFillEvent(bytes []byte) { ws.logDebug("Received fill event") if ws.hasAccountWsHandler() { - ws.accountWsHandler.handleFillMessage(bytes) + ws.accountEventHandler.handleFillMessage(bytes) } } @@ -380,32 +404,32 @@ func (ws *webSocket) handleAuthEvent(bytes []byte) { ws.logDebug("Received auth event") if ws.hasAccountWsHandler() { - ws.accountWsHandler.handleAuthMessage(bytes) + ws.accountEventHandler.handleAuthMessage(bytes) } } func (ws *webSocket) hasCandleWsHandler() bool { - return ws.candleWsHandler != nil + return ws.candlesEventHandler != nil } func (ws *webSocket) hasTickerWsHandler() bool { - return ws.tickerWsHandler != nil + return ws.tickerEventHandler != nil } func (ws *webSocket) hasTicker24hWsHandler() bool { - return ws.ticker24hWsHandler != nil + return ws.ticker24hEventHandler != nil } func (ws *webSocket) hasTradesWsHandler() bool { - return ws.tradesWsHandler != nil + return ws.tradesEventHandler != nil } func (ws *webSocket) hasBookWsHandler() bool { - return ws.bookWsHandler != nil + return ws.bookEventHandler != nil } func (ws *webSocket) hasAccountWsHandler() bool { - return ws.accountWsHandler != nil + return ws.accountEventHandler != nil } func (ws *webSocket) logDebug(message string, args ...any) {