Skip to content

Commit

Permalink
Implement KuCoin orderbook WebSocket listener (#828)
Browse files Browse the repository at this point in the history
  • Loading branch information
v36u authored Feb 6, 2024
1 parent 3400769 commit 65f2427
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 4 deletions.
4 changes: 2 additions & 2 deletions src/ExchangeSharp/API/Exchanges/Kraken/ExchangeKrakenAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1484,7 +1484,7 @@ private async Task<List<string>> GetMarketSymbolList(string[] marketSymbols)
/// <returns></returns>
protected override async Task<IWebSocket> OnGetDeltaOrderBookWebSocketAsync(
Action<ExchangeOrderBook> callback,
int maxCount = 20,
int maxCount = 100,
params string[] marketSymbols
)
{
Expand Down Expand Up @@ -1639,7 +1639,7 @@ params string[] marketSymbols
{
Event = ActionType.Subscribe,
Pairs = marketSymbols.ToList(),
SubscriptionSettings = new Subscription { Name = "book", Depth = 100 }
SubscriptionSettings = new Subscription { Name = "book", Depth = maxCount }
};
await _socket.SendMessageAsync(channelAction);
}
Expand Down
128 changes: 127 additions & 1 deletion src/ExchangeSharp/API/Exchanges/KuCoin/ExchangeKuCoinAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ The above copyright notice and this permission notice shall be included in all c
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using ExchangeSharp.API.Exchanges.Kraken.Models.Request;
using ExchangeSharp.API.Exchanges.Kraken.Models.Types;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

Expand All @@ -39,6 +41,7 @@ private ExchangeKuCoinAPI()
NonceEndPointStyle = NonceStyle.UnixMilliseconds;
MarketSymbolSeparator = "-";
RateLimit = new RateGate(20, TimeSpan.FromSeconds(60.0));
WebSocketOrderBookType = WebSocketOrderBookType.FullBookFirstThenDeltas;
}

public override string PeriodSecondsToString(int seconds)
Expand Down Expand Up @@ -233,7 +236,9 @@ protected override async Task<ExchangeOrderBook> OnGetOrderBookAsync(
JToken token = await MakeJsonRequestAsync<JToken>(
"/market/orderbook/level2_" + maxCount + "?symbol=" + marketSymbol
);
return token.ParseOrderBookFromJTokenArrays(asks: "asks", bids: "bids");
var book = token.ParseOrderBookFromJTokenArrays(asks: "asks", bids: "bids", sequence: "sequence");
book.MarketSymbol = marketSymbol;
return book;
}

protected override async Task<ExchangeTicker> OnGetTickerAsync(string marketSymbol)
Expand Down Expand Up @@ -804,6 +809,127 @@ await _socket.SendMessageAsync(
);
}

protected override async Task<IWebSocket> OnGetDeltaOrderBookWebSocketAsync(
Action<ExchangeOrderBook> callback,
int maxCount = 20,
params string[] marketSymbols
)
{
if (marketSymbols == null || marketSymbols.Length == 0)
{
marketSymbols = (await GetMarketSymbolsAsync(true)).ToArray();
}

var initialSequenceIds = new Dictionary<string, long>();

var websocketUrlToken = GetWebsocketBulletToken();

return await ConnectPublicWebSocketAsync(
$"?token={websocketUrlToken}&acceptUserMessage=true",
messageCallback: (_socket, msg) =>
{
var message = msg.ToStringFromUTF8();
var deserializedMessage = JsonConvert.DeserializeObject(message) as JObject;

var data = deserializedMessage["data"];
if (data is null)
{
return Task.CompletedTask;
}

var changes = data["changes"];
var time = data["time"];
var symbol = data["symbol"];
if (changes is null || time is null || symbol is null)
{
return Task.CompletedTask;
}

var parsedTime = time.ConvertInvariant<long>();
var lastUpdatedDateTime = DateTimeOffset
.FromUnixTimeMilliseconds(parsedTime)
.DateTime;

var deltaBook = new ExchangeOrderBook
{
IsFromSnapshot = false,
ExchangeName = ExchangeName.Kucoin,
SequenceId = parsedTime,
MarketSymbol = symbol.ToString(),
LastUpdatedUtc = lastUpdatedDateTime,
};

var rawAsks = changes["asks"] as JArray;
foreach (var rawAsk in rawAsks)
{
var sequence = rawAsk[2].ConvertInvariant<long>();
if (sequence <= initialSequenceIds[deltaBook.MarketSymbol])
{
// A deprecated update should be ignored
continue;
}

var price = rawAsk[0].ConvertInvariant<decimal>();
var quantity = rawAsk[1].ConvertInvariant<decimal>();

deltaBook.Asks[price] = new ExchangeOrderPrice
{
Price = price,
Amount = quantity,
};
}

var rawBids = changes["bids"] as JArray;
foreach (var rawBid in rawBids)
{
var sequence = rawBid[2].ConvertInvariant<long>();
if (sequence <= initialSequenceIds[deltaBook.MarketSymbol])
{
// A deprecated update should be ignored
continue;
}

var price = rawBid[0].ConvertInvariant<decimal>();
var quantity = rawBid[1].ConvertInvariant<decimal>();

deltaBook.Bids[price] = new ExchangeOrderPrice
{
Price = price,
Amount = quantity,
};
}

callback(deltaBook);

return Task.CompletedTask;
},
connectCallback: async (_socket) =>
{
// Get full order book snapshot when connecting
foreach (var marketSymbol in marketSymbols)
{
var initialBook = await OnGetOrderBookAsync(marketSymbol, maxCount);
initialBook.IsFromSnapshot = true;

callback(initialBook);

initialSequenceIds[marketSymbol] = initialBook.SequenceId;
}

var id = CryptoUtility.UtcNow.Ticks;
var topic = $"/market/level2:{string.Join(",", marketSymbols)}";
await _socket.SendMessageAsync(
new
{
id = id++,
type = "subscribe",
topic = topic,
}
);
}
);
}

#endregion Websockets

#region Private Functions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static class ExchangeAPIExtensions
public static async Task<IWebSocket> GetFullOrderBookWebSocketAsync(
this IOrderBookProvider api,
Action<ExchangeOrderBook> callback,
int maxCount = 20,
int maxCount = 100,
params string[] symbols
)
{
Expand Down

0 comments on commit 65f2427

Please sign in to comment.