Skip to content
This repository has been archived by the owner on Jul 4, 2024. It is now read-only.

Commit

Permalink
cleanup, add buffsize
Browse files Browse the repository at this point in the history
  • Loading branch information
larscom committed Nov 26, 2023
1 parent ed6504d commit 1d15ebf
Show file tree
Hide file tree
Showing 11 changed files with 317 additions and 209 deletions.
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
run:
go run ./examples/main.go
go run ./examples/candles/main.go
build:
go build -o ./bin/example ./examples/main.go
go build -o ./bin/candles ./examples/candles/main.go
go build -o ./bin/account ./examples/account/main.go
test:
go test -v ./.../ --race
97 changes: 60 additions & 37 deletions account.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,50 +15,68 @@ import (

type Order struct {
Guid string `json:"guid"`

// The order id of the returned order.
OrderId string `json:"orderId"`

// Is a timestamp in milliseconds since 1 Jan 1970.
Created int64 `json:"created"`

// Is a timestamp in milliseconds since 1 Jan 1970.
Updated int64 `json:"updated"`
// The current status of the order

// The current status of the order.
// Enum: "new" | "awaitingTrigger" | "canceled" | "canceledAuction" | "canceledSelfTradePrevention" | "canceledIOC" | "canceledFOK" | "canceledMarketProtection" | "canceledPostOnly" | "filled" | "partiallyFilled" | "expired" | "rejected"
Status string `json:"status"`

// Side
// Enum: "buy" | "sell"
Side string `json:"side"`

// OrderType
// Enum: "limit" | "market"
OrderType string `json:"orderType"`

// Original amount.
Amount float64 `json:"amount"`

// Amount remaining (lower than 'amount' after fills).
AmountRemaining float64 `json:"amountRemaining"`

// The price of the order.
Price float64 `json:"price"`

// Amount of 'onHoldCurrency' that is reserved for this order. This is released when orders are canceled.
OnHold float64 `json:"onHold"`

// The currency placed on hold is the quote currency for sell orders and base currency for buy orders.
OnHoldCurrency string `json:"onHoldCurrency"`

// Only for stop orders: The current price used in the trigger. This is based on the triggerAmount and triggerType.
TriggerPrice float64 `json:"triggerPrice"`

// Only for stop orders: The value used for the triggerType to determine the triggerPrice.
TriggerAmount float64 `json:"triggerAmount"`
// Only for stop orders

// Only for stop orders.
// Enum: "price"
TriggerType string `json:"triggerType"`

// Only for stop orders: The reference price used for stop orders.
// Enum: "lastTrade" | "bestBid" | "bestAsk" | "midPrice"
TriggerReference string `json:"triggerReference"`

// Only for limit orders: Determines how long orders remain active.
// Possible values: Good-Til-Canceled (GTC), Immediate-Or-Cancel (IOC), Fill-Or-Kill (FOK).
// GTC orders will remain on the order book until they are filled or canceled.
// IOC orders will fill against existing orders, but will cancel any remaining amount after that.
// FOK orders will fill against existing orders in its entirety, or will be canceled (if the entire order cannot be filled).
// Enum: "GTC" | "IOC" | "FOK"
TimeInForce string `json:"timeInForce"`

// Default: false
PostOnly bool `json:"postOnly"`

// Self trading is not allowed on Bitvavo. Multiple options are available to prevent this from happening.
// The default ‘decrementAndCancel’ decrements both orders by the amount that would have been filled, which in turn cancels the smallest of the two orders.
// ‘cancelOldest’ will cancel the entire older order and places the new order.
Expand All @@ -67,16 +85,19 @@ type Order struct {
// Default: "decrementAndCancel"
// Enum: "decrementAndCancel" | "cancelOldest" | "cancelNewest" | "cancelBoth"
SelfTradePrevention string `json:"selfTradePrevention"`

// Whether this order is visible on the order book.
Visible bool `json:"visible"`
}

type OrderEvent struct {
// Describes the returned event over the socket
// Describes the returned event over the socket.
Event string `json:"event"`
// The market which was requested in the subscription

// The market which was requested in the subscription.
Market string `json:"market"`
// The order itself

// The order itself.
Order Order `json:"order"`
}

Expand Down Expand Up @@ -213,35 +234,36 @@ func (f *FillEvent) UnmarshalJSON(data []byte) error {
return nil
}

type AccountSubscription interface {
type AccountSub interface {
// Order channel to receive order events
// You can set the buffSize for this channel, 0 for no buffer
Order(buffSize uint64) <-chan OrderEvent

// Order channel to receive fill events
// You can set the buffSize for this channel, 0 for no buffer
Fill(buffSize uint64) <-chan FillEvent
}

type accountSubscription struct {
type accountSub struct {
orderchn chan<- OrderEvent
fillchn chan<- FillEvent
}

func (a *accountSubscription) Order(buffSize uint64) <-chan OrderEvent {
func (a *accountSub) Order(buffSize uint64) <-chan OrderEvent {
orderchn := make(chan OrderEvent, buffSize)
a.orderchn = orderchn
return orderchn
}

func (a *accountSubscription) Fill(buffSize uint64) <-chan FillEvent {
func (a *accountSub) Fill(buffSize uint64) <-chan FillEvent {
fillchn := make(chan FillEvent, buffSize)
a.fillchn = fillchn
return fillchn
}

type AccountWsHandler interface {
type AccountEventHandler interface {
// Subscribe to market
Subscribe(market string) (AccountSubscription, error)
Subscribe(market string) (AccountSub, error)

// Unsubscribe from market
Unsubscribe(market string) error
Expand All @@ -250,26 +272,28 @@ type AccountWsHandler interface {
UnsubscribeAll() error
}

type accountWsHandler struct {
type accountEventHandler struct {
apiKey string
apiSecret string
windowTimeMs uint64
authenticated bool
authchn chan bool
writechn chan<- WebSocketMessage
subs *safemap.SafeMap[string, *accountSubscription]
subs *safemap.SafeMap[string, *accountSub]
}

func newAccountWsHandler(apiKey string, apiSecret string, writechn chan<- WebSocketMessage) *accountWsHandler {
return &accountWsHandler{
apiKey: apiKey,
apiSecret: apiSecret,
writechn: writechn,
authchn: make(chan bool),
subs: safemap.New[string, *accountSubscription](),
func newAccountEventHandler(apiKey string, apiSecret string, windowTimeMs uint64, writechn chan<- WebSocketMessage) *accountEventHandler {
return &accountEventHandler{
apiKey: apiKey,
apiSecret: apiSecret,
windowTimeMs: windowTimeMs,
writechn: writechn,
authchn: make(chan bool),
subs: safemap.New[string, *accountSub](),
}
}

func (t *accountWsHandler) Subscribe(market string) (AccountSubscription, error) {
func (t *accountEventHandler) Subscribe(market string) (AccountSub, error) {
if t.subs.Has(market) {
return nil, fmt.Errorf("subscription already active for market: %s", market)
}
Expand All @@ -280,15 +304,15 @@ func (t *accountWsHandler) Subscribe(market string) (AccountSubscription, error)
return nil, err
}

subscription := new(accountSubscription)
subscription := new(accountSub)

t.subs.Set(market, subscription)

return subscription, nil

}

func (t *accountWsHandler) Unsubscribe(market string) error {
func (t *accountEventHandler) Unsubscribe(market string) error {
sub, exist := t.subs.Get(market)

if exist {
Expand All @@ -310,7 +334,7 @@ func (t *accountWsHandler) Unsubscribe(market string) error {
return fmt.Errorf("no subscription active for market: %s", market)
}

func (t *accountWsHandler) UnsubscribeAll() error {
func (t *accountEventHandler) UnsubscribeAll() error {
for sub := range t.subs.IterBuffered() {
market := sub.Key
if err := t.Unsubscribe(market); err != nil {
Expand All @@ -320,7 +344,7 @@ func (t *accountWsHandler) UnsubscribeAll() error {
return nil
}

func (t *accountWsHandler) handleOrderMessage(bytes []byte) {
func (t *accountEventHandler) handleOrderMessage(bytes []byte) {
var orderEvent *OrderEvent
if err := json.Unmarshal(bytes, &orderEvent); err != nil {
log.Logger().Error("Couldn't unmarshal message into OrderEvent", "message", string(bytes))
Expand All @@ -330,7 +354,7 @@ func (t *accountWsHandler) handleOrderMessage(bytes []byte) {
}
}

func (t *accountWsHandler) handleFillMessage(bytes []byte) {
func (t *accountEventHandler) handleFillMessage(bytes []byte) {
var fillEvent *FillEvent
if err := json.Unmarshal(bytes, &fillEvent); err != nil {
log.Logger().Error("Couldn't unmarshal message into FillEvent", "message", string(bytes))
Expand All @@ -340,7 +364,7 @@ func (t *accountWsHandler) handleFillMessage(bytes []byte) {
}
}

func (t *accountWsHandler) handleAuthMessage(bytes []byte) {
func (t *accountEventHandler) handleAuthMessage(bytes []byte) {
var authEvent *AuthEvent
if err := json.Unmarshal(bytes, &authEvent); err != nil {
log.Logger().Error("Couldn't unmarshal message into AuthEvent", "message", string(bytes))
Expand All @@ -350,30 +374,29 @@ func (t *accountWsHandler) handleAuthMessage(bytes []byte) {
}
}

func newWebSocketAuthMessage(apiKey string, apiSecret string) WebSocketMessage {
func newWebSocketAuthMessage(apiKey string, apiSecret string, windowTimeMs uint64) WebSocketMessage {
timestamp := time.Now().UnixMilli()
return WebSocketMessage{
Action: ActionAuthenticate.Value,
Key: apiKey,
Signature: createSignature(timestamp, apiSecret),
Timestamp: timestamp,
Window: 10000,
Window: windowTimeMs,
}
}

func createSignature(timestamp int64, apiSecret string) string {
hash := hmac.New(sha256.New, []byte(apiSecret))
hash.Write([]byte(fmt.Sprintf("%dGET/v2/websocket", timestamp)))
sha := hex.EncodeToString(hash.Sum(nil))
return sha
return hex.EncodeToString(hash.Sum(nil))
}

func (t *accountWsHandler) authenticate() {
t.writechn <- newWebSocketAuthMessage(t.apiKey, t.apiSecret)
func (t *accountEventHandler) authenticate() {
t.writechn <- newWebSocketAuthMessage(t.apiKey, t.apiSecret, t.windowTimeMs)
t.authenticated = <-t.authchn
}

func (t *accountWsHandler) reconnect() {
func (t *accountEventHandler) reconnect() {
t.authenticated = false

for sub := range t.subs.IterBuffered() {
Expand All @@ -386,7 +409,7 @@ func (t *accountWsHandler) reconnect() {
}
}

func (t *accountWsHandler) withAuth(action func()) error {
func (t *accountEventHandler) withAuth(action func()) error {
if !t.authenticated {
t.authenticate()
}
Expand All @@ -399,7 +422,7 @@ func (t *accountWsHandler) withAuth(action func()) error {
return fmt.Errorf("could not subscribe, authentication failed")
}

func (t *accountWsHandler) hasOrderChn(market string) bool {
func (t *accountEventHandler) hasOrderChn(market string) bool {
sub, exist := t.subs.Get(market)

if exist {
Expand All @@ -409,7 +432,7 @@ func (t *accountWsHandler) hasOrderChn(market string) bool {
return false
}

func (t *accountWsHandler) hasFillChn(market string) bool {
func (t *accountEventHandler) hasFillChn(market string) bool {
sub, exist := t.subs.Get(market)

if exist {
Expand Down
Loading

0 comments on commit 1d15ebf

Please sign in to comment.