diff --git a/binance.py b/binance.py index a16744944..ea008de0a 100644 --- a/binance.py +++ b/binance.py @@ -3,7 +3,7 @@ import hmac import json import traceback -from time import time +from time import time, time_ns from urllib.parse import urlencode import aiohttp @@ -17,6 +17,8 @@ class BinanceBot(Bot): def __init__(self, config: dict): self.exchange = "binance" + self.max_n_orders_per_batch = 5 + self.max_n_cancellations_per_batch = 10 super().__init__(config) self.session = aiohttp.ClientSession() self.base_endpoint = "" @@ -35,25 +37,46 @@ async def public_get(self, url: str, params: dict = {}, base_endpoint=None) -> d traceback.print_exc() return {} - async def private_(self, type_: str, base_endpoint: str, url: str, params: dict = {}) -> dict: + async def private_( + self, type_: str, base_endpoint: str, url: str, params: dict = {}, data_: bool = False + ) -> dict: + def stringify(x): + if type(x) == bool: + return "true" if x else "false" + elif type(x) == float: + return format_float(x) + elif type(x) == int: + return str(x) + elif type(x) == list: + return json.dumps([stringify(y) for y in x]).replace(" ", "") + elif type(x) == dict: + return json.dumps({k: stringify(v) for k, v in x.items()}).replace(" ", "") + else: + return x + try: timestamp = int(time() * 1000) params.update({"timestamp": timestamp, "recvWindow": 5000}) for k in params: - if type(params[k]) == bool: - params[k] = "true" if params[k] else "false" - elif type(params[k]) == float: - params[k] = format_float(params[k]) + params[k] = stringify(params[k]) params = sort_dict_keys(params) params["signature"] = hmac.new( self.secret.encode("utf-8"), urlencode(params).encode("utf-8"), hashlib.sha256, ).hexdigest() - async with getattr(self.session, type_)( - base_endpoint + url, params=params, headers=self.headers - ) as response: - result = await response.text() + if data_: + async with getattr(self.session, type_)( + base_endpoint + url, data=params, headers=self.headers + ) as response: + result = await response.text() + else: + params_encoded = urlencode(params) + async with getattr(self.session, type_)( + base_endpoint + url, params=params_encoded, headers=self.headers + ) as response: + result = await response.text() + return json.loads(result) except Exception as e: print(f"error with private {type_} {base_endpoint} {url} {params}") @@ -68,12 +91,11 @@ async def private_get(self, url: str, params: dict = {}, base_endpoint: str = No params, ) - async def private_post(self, url: str, params: dict = {}, base_endpoint: str = None) -> dict: + async def private_post( + self, url: str, params: dict = {}, base_endpoint: str = None, data_: bool = False + ) -> dict: return await self.private_( - "post", - self.base_endpoint if base_endpoint is None else base_endpoint, - url, - params, + "post", self.base_endpoint if base_endpoint is None else base_endpoint, url, params, data_ ) async def private_put(self, url: str, params: dict = {}, base_endpoint: str = None) -> dict: @@ -84,80 +106,92 @@ async def private_put(self, url: str, params: dict = {}, base_endpoint: str = No params, ) - async def private_delete(self, url: str, params: dict = {}, base_endpoint: str = None) -> dict: + async def private_delete( + self, url: str, params: dict = {}, base_endpoint: str = None, data_: bool = False + ) -> dict: return await self.private_( "delete", self.base_endpoint if base_endpoint is None else base_endpoint, url, params, + data_, ) async def init_market_type(self): fapi_endpoint = "https://fapi.binance.com" dapi_endpoint = "https://dapi.binance.com" - self.exchange_info = await self.public_get( - "/fapi/v1/exchangeInfo", base_endpoint=fapi_endpoint - ) - if self.symbol in {e["symbol"] for e in self.exchange_info["symbols"]}: - print("linear perpetual") - self.market_type += "_linear_perpetual" - self.inverse = self.config["inverse"] = False - self.base_endpoint = fapi_endpoint - self.endpoints = { - "time": "/fapi/v1/time", - "position": "/fapi/v2/positionRisk", - "balance": "/fapi/v2/balance", - "exchange_info": "/fapi/v1/exchangeInfo", - "leverage_bracket": "/fapi/v1/leverageBracket", - "open_orders": "/fapi/v1/openOrders", - "ticker": "/fapi/v1/ticker/bookTicker", - "fills": "/fapi/v1/userTrades", - "income": "/fapi/v1/income", - "create_order": "/fapi/v1/order", - "cancel_order": "/fapi/v1/order", - "ticks": "/fapi/v1/aggTrades", - "ohlcvs": "/fapi/v1/klines", - "margin_type": "/fapi/v1/marginType", - "leverage": "/fapi/v1/leverage", - "position_side": "/fapi/v1/positionSide/dual", - "websocket": (ws := f"wss://fstream.binance.com/ws/"), - "websocket_market": ws + f"{self.symbol.lower()}@aggTrade", - "websocket_user": ws, - "listen_key": "/fapi/v1/listenKey", - } - else: + self.exchange_info = None + try: self.exchange_info = await self.public_get( - "/dapi/v1/exchangeInfo", base_endpoint=dapi_endpoint + "/fapi/v1/exchangeInfo", base_endpoint=fapi_endpoint ) if self.symbol in {e["symbol"] for e in self.exchange_info["symbols"]}: - print("inverse coin margined") - self.base_endpoint = dapi_endpoint - self.market_type += "_inverse_coin_margined" - self.inverse = self.config["inverse"] = True + print("linear perpetual") + self.market_type += "_linear_perpetual" + self.inverse = self.config["inverse"] = False + self.base_endpoint = fapi_endpoint self.endpoints = { - "time": "/dapi/v1/time", - "position": "/dapi/v1/positionRisk", - "balance": "/dapi/v1/balance", - "exchange_info": "/dapi/v1/exchangeInfo", - "leverage_bracket": "/dapi/v1/leverageBracket", - "open_orders": "/dapi/v1/openOrders", - "ticker": "/dapi/v1/ticker/bookTicker", - "fills": "/dapi/v1/userTrades", - "income": "/dapi/v1/income", - "create_order": "/dapi/v1/order", - "cancel_order": "/dapi/v1/order", - "ticks": "/dapi/v1/aggTrades", - "ohlcvs": "/dapi/v1/klines", - "margin_type": "/dapi/v1/marginType", - "leverage": "/dapi/v1/leverage", - "position_side": "/dapi/v1/positionSide/dual", - "websocket": (ws := f"wss://dstream.binance.com/ws/"), + "time": "/fapi/v1/time", + "position": "/fapi/v2/positionRisk", + "balance": "/fapi/v2/balance", + "exchange_info": "/fapi/v1/exchangeInfo", + "leverage_bracket": "/fapi/v1/leverageBracket", + "open_orders": "/fapi/v1/openOrders", + "ticker": "/fapi/v1/ticker/bookTicker", + "fills": "/fapi/v1/userTrades", + "income": "/fapi/v1/income", + "create_order": "/fapi/v1/order", + "cancel_order": "/fapi/v1/order", + "ticks": "/fapi/v1/aggTrades", + "ohlcvs": "/fapi/v1/klines", + "margin_type": "/fapi/v1/marginType", + "leverage": "/fapi/v1/leverage", + "position_side": "/fapi/v1/positionSide/dual", + "websocket": (ws := f"wss://fstream.binance.com/ws/"), "websocket_market": ws + f"{self.symbol.lower()}@aggTrade", "websocket_user": ws, - "listen_key": "/dapi/v1/listenKey", + "listen_key": "/fapi/v1/listenKey", + "batch_orders": "/fapi/v1/batchOrders", } else: - raise Exception(f"unknown symbol {self.symbol}") + self.exchange_info = await self.public_get( + "/dapi/v1/exchangeInfo", base_endpoint=dapi_endpoint + ) + if self.symbol in {e["symbol"] for e in self.exchange_info["symbols"]}: + print("inverse coin margined") + self.base_endpoint = dapi_endpoint + self.market_type += "_inverse_coin_margined" + self.inverse = self.config["inverse"] = True + self.endpoints = { + "time": "/dapi/v1/time", + "position": "/dapi/v1/positionRisk", + "balance": "/dapi/v1/balance", + "exchange_info": "/dapi/v1/exchangeInfo", + "leverage_bracket": "/dapi/v1/leverageBracket", + "open_orders": "/dapi/v1/openOrders", + "ticker": "/dapi/v1/ticker/bookTicker", + "fills": "/dapi/v1/userTrades", + "income": "/dapi/v1/income", + "create_order": "/dapi/v1/order", + "cancel_order": "/dapi/v1/order", + "ticks": "/dapi/v1/aggTrades", + "ohlcvs": "/dapi/v1/klines", + "margin_type": "/dapi/v1/marginType", + "leverage": "/dapi/v1/leverage", + "position_side": "/dapi/v1/positionSide/dual", + "websocket": (ws := f"wss://dstream.binance.com/ws/"), + "websocket_market": ws + f"{self.symbol.lower()}@aggTrade", + "websocket_user": ws, + "listen_key": "/dapi/v1/listenKey", + "batch_orders": "/dapi/v1/batchOrders", + } + else: + raise Exception(f"unknown symbol {self.symbol}") + except Exception as e: + logging.error(f"error initiating market type {e}") + print_async_exception(self.exchange_info) + traceback.print_exc() + raise Exception("stopping bot") self.spot_base_endpoint = "https://api.binance.com" self.endpoints["transfer"] = "/sapi/v1/asset/transfer" @@ -190,7 +224,6 @@ async def _init(self): self.min_cost = self.config["min_cost"] = 0.0 break - self.max_leverage = self.config["max_leverage"] = 25 await super()._init() await self.init_order_book() await self.update_position() @@ -245,72 +278,103 @@ async def init_exchange_config(self) -> bool: raise Exception("failed to set hedge mode") async def init_order_book(self): - ticker = await self.public_get(self.endpoints["ticker"], {"symbol": self.symbol}) - if "inverse_coin_margined" in self.market_type: - ticker = ticker[0] - self.ob = [float(ticker["bidPrice"]), float(ticker["askPrice"])] - self.price = np.random.choice(self.ob) + ticker = None + try: + ticker = await self.public_get(self.endpoints["ticker"], {"symbol": self.symbol}) + if "inverse_coin_margined" in self.market_type: + ticker = ticker[0] + self.ob = [float(ticker["bidPrice"]), float(ticker["askPrice"])] + self.price = np.random.choice(self.ob) + return True + except Exception as e: + logging.error(f"error updating order book {e}") + print_async_exception(ticker) + return False async def fetch_open_orders(self) -> [dict]: - return [ - { - "order_id": int(e["orderId"]), - "symbol": e["symbol"], - "price": float(e["price"]), - "qty": float(e["origQty"]), - "type": e["type"].lower(), - "side": e["side"].lower(), - "position_side": e["positionSide"].lower().replace("short", "short"), - "timestamp": int(e["time"]), - } - for e in await self.private_get(self.endpoints["open_orders"], {"symbol": self.symbol}) - ] + open_orders = None + try: + open_orders = await self.private_get( + self.endpoints["open_orders"], {"symbol": self.symbol} + ) + return [ + { + "order_id": int(e["orderId"]), + "symbol": e["symbol"], + "price": float(e["price"]), + "qty": float(e["origQty"]), + "type": e["type"].lower(), + "side": e["side"].lower(), + "position_side": e["positionSide"].lower().replace("short", "short"), + "timestamp": int(e["time"]), + } + for e in open_orders + ] + except Exception as e: + logging.error(f"error fetching open orders {e}") + print_async_exception(open_orders) + traceback.print_exc() + return False async def fetch_position(self) -> dict: - positions, balance = await asyncio.gather( - self.private_get( - self.endpoints["position"], - ( - {"symbol": self.symbol} - if "linear_perpetual" in self.market_type - else {"pair": self.pair} + positions, balance = None, None + try: + positions, balance = await asyncio.gather( + self.private_get( + self.endpoints["position"], + ( + {"symbol": self.symbol} + if "linear_perpetual" in self.market_type + else {"pair": self.pair} + ), ), - ), - self.private_get(self.endpoints["balance"], {}), - ) - assert all( - key in positions[0] for key in ["symbol", "positionAmt", "entryPrice"] - ), "bogus position fetch" - assert all( - key in balance[0] for key in ["asset", "balance", "crossUnPnl"] - ), "bogus balance fetch" - positions = [e for e in positions if e["symbol"] == self.symbol] - position = { - "long": {"size": 0.0, "price": 0.0, "liquidation_price": 0.0}, - "short": {"size": 0.0, "price": 0.0, "liquidation_price": 0.0}, - "wallet_balance": 0.0, - "equity": 0.0, - } - if positions: - for p in positions: - if p["positionSide"] == "LONG": - position["long"] = { - "size": float(p["positionAmt"]), - "price": float(p["entryPrice"]), - "liquidation_price": float(p["liquidationPrice"]), - } - elif p["positionSide"] == "SHORT": - position["short"] = { - "size": float(p["positionAmt"]), - "price": float(p["entryPrice"]), - "liquidation_price": float(p["liquidationPrice"]), - } - for e in balance: - if e["asset"] == (self.quot if "linear_perpetual" in self.market_type else self.coin): - position["wallet_balance"] = float(e["balance"]) - position["equity"] = position["wallet_balance"] + float(e["crossUnPnl"]) - break - return position + self.private_get(self.endpoints["balance"], {}), + ) + assert all( + key in positions[0] for key in ["symbol", "positionAmt", "entryPrice"] + ), "bogus position fetch" + assert all( + key in balance[0] for key in ["asset", "balance", "crossUnPnl"] + ), "bogus balance fetch" + positions = [e for e in positions if e["symbol"] == self.symbol] + position = { + "long": {"size": 0.0, "price": 0.0, "liquidation_price": 0.0}, + "short": {"size": 0.0, "price": 0.0, "liquidation_price": 0.0}, + "wallet_balance": 0.0, + "equity": 0.0, + } + if positions: + for p in positions: + if p["positionSide"] == "LONG": + position["long"] = { + "size": float(p["positionAmt"]), + "price": float(p["entryPrice"]), + "liquidation_price": float(p["liquidationPrice"]), + } + elif p["positionSide"] == "SHORT": + position["short"] = { + "size": float(p["positionAmt"]), + "price": float(p["entryPrice"]), + "liquidation_price": float(p["liquidationPrice"]), + } + for e in balance: + if e["asset"] == (self.quot if "linear_perpetual" in self.market_type else self.coin): + position["wallet_balance"] = float(e["balance"]) + position["equity"] = position["wallet_balance"] + float(e["crossUnPnl"]) + break + return position + except Exception as e: + logging.error(f"error fetching pos or balance {e}") + print_async_exception(positions) + print_async_exception(balance) + traceback.print_exc() + + async def execute_orders(self, orders: [dict]) -> [dict]: + if len(orders) == 0: + return [] + if len(orders) == 1: + return [await self.execute_order(orders[0])] + return await self.execute_batch_orders(orders) async def execute_order(self, order: dict) -> dict: o = None @@ -345,31 +409,74 @@ async def execute_order(self, order: dict) -> dict: traceback.print_exc() return {} - async def execute_cancellation(self, order: dict) -> dict: - cancellation = None + async def execute_batch_orders(self, orders: [dict]) -> [dict]: + executed = None try: - cancellation = await self.private_delete( - self.endpoints["cancel_order"], - {"symbol": self.symbol, "orderId": order["order_id"]}, + to_execute = [] + for order in orders: + params = { + "symbol": self.symbol, + "side": order["side"].upper(), + "positionSide": order["position_side"].replace("short", "short").upper(), + "type": order["type"].upper(), + "quantity": str(order["qty"]), + } + if params["type"] == "LIMIT": + params["timeInForce"] = "GTX" + params["price"] = order["price"] + if "custom_id" in order: + params[ + "newClientOrderId" + ] = f"{order['custom_id']}_{str(int(time() * 1000))[8:]}_{int(np.random.random() * 1000)}" + to_execute.append(params) + executed = await self.private_post( + self.endpoints["batch_orders"], {"batchOrders": to_execute}, data_=True ) + return [ + { + "symbol": self.symbol, + "side": ex["side"].lower(), + "position_side": ex["positionSide"].lower().replace("short", "short"), + "type": ex["type"].lower(), + "qty": float(ex["origQty"]), + "order_id": int(ex["orderId"]), + "price": float(ex["price"]), + } + for ex in executed + ] + except Exception as e: + print(f"error executing order {executed} {e}") + print_async_exception(executed) + traceback.print_exc() + return [] - return { - "symbol": self.symbol, - "side": cancellation["side"].lower(), - "order_id": int(cancellation["orderId"]), - "position_side": cancellation["positionSide"].lower().replace("short", "short"), - "qty": float(cancellation["origQty"]), - "price": float(cancellation["price"]), - } + async def execute_cancellations(self, orders: [dict]) -> [dict]: + if not orders: + return [] + cancellations = [] + symbol = orders[0]["symbol"] if "symbol" in orders[0] else self.symbol + try: + cancellations = await self.private_delete( + self.endpoints["batch_orders"], + {"symbol": symbol, "orderIdList": [order["order_id"] for order in orders]}, + data_=True, + ) + return [ + { + "symbol": symbol, + "side": cancellation["side"].lower(), + "order_id": int(cancellation["orderId"]), + "position_side": cancellation["positionSide"].lower().replace("short", "short"), + "qty": float(cancellation["origQty"]), + "price": float(cancellation["price"]), + } + for cancellation in cancellations + ] except Exception as e: - if cancellation is not None and "code" in cancellation and cancellation["code"] == -2011: - logging.error(f"error cancelling order {cancellation} {order}") # neater error message - else: - print(f"error cancelling order {order} {e}") - print_async_exception(cancellation) - traceback.print_exc() - self.ts_released["force_update"] = 0.0 - return {} + logging.error(f"error cancelling orders {orders} {e}") + print_async_exception(cancellations) + traceback.print_exc() + return [] async def fetch_fills( self, diff --git a/bitget.py b/bitget.py index 3b817ee63..86b84b20a 100644 --- a/bitget.py +++ b/bitget.py @@ -34,6 +34,8 @@ class BitgetBot(Bot): def __init__(self, config: dict): self.is_logged_into_user_stream = False self.exchange = "bitget" + self.max_n_orders_per_batch = 50 + self.max_n_cancellations_per_batch = 60 super().__init__(config) self.base_endpoint = "https://api.bitget.com" self.endpoints = { @@ -44,6 +46,8 @@ def __init__(self, config: dict): "ticker": "/api/mix/v1/market/ticker", "open_orders": "/api/mix/v1/order/current", "create_order": "/api/mix/v1/order/placeOrder", + "batch_orders": "/api/mix/v1/order/batch-orders", + "batch_cancel_orders": "/api/mix/v1/order/cancel-batch-orders", "cancel_order": "/api/mix/v1/order/cancel-order", "ticks": "/api/mix/v1/market/fills", "fills": "/api/mix/v1/order/fills", @@ -168,11 +172,22 @@ async def public_get(self, url: str, params: dict = {}) -> dict: async def private_( self, type_: str, base_endpoint: str, url: str, params: dict = {}, json_: bool = False ) -> dict: + def stringify(x): + if type(x) == bool: + return "true" if x else "false" + elif type(x) == float: + return format_float(x) + elif type(x) == int: + return str(x) + elif type(x) == list: + return [stringify(y) for y in x] + elif type(x) == dict: + return {k: stringify(v) for k, v in x.items()} + else: + return x timestamp = int(time() * 1000) - params = { - k: ("true" if v else "false") if type(v) == bool else str(v) for k, v in params.items() - } + params = {k: stringify(v) for k, v in params.items()} if type_ == "get": url = url + "?" + urlencode(sort_dict_keys(params)) to_sign = str(timestamp) + type_.upper() + url @@ -286,6 +301,13 @@ async def fetch_position(self) -> dict: return position + async def execute_orders(self, orders: [dict]) -> [dict]: + if len(orders) == 0: + return [] + if len(orders) == 1: + return [await self.execute_order(orders[0])] + return await self.execute_batch_orders(orders) + async def execute_order(self, order: dict) -> dict: o = None try: @@ -307,6 +329,7 @@ async def execute_order(self, order: dict) -> dict: custom_id = order["custom_id"] if "custom_id" in order else "0" params["clientOid"] = f"{self.broker_code}#{custom_id}_{random_str}" o = await self.private_post(self.endpoints["create_order"], params) + # print('debug create order', o, order) if o["data"]: # print('debug execute order', o) return { @@ -326,27 +349,81 @@ async def execute_order(self, order: dict) -> dict: traceback.print_exc() return {} - async def execute_cancellation(self, order: dict) -> dict: - cancellation = None + async def execute_batch_orders(self, orders: [dict]) -> [dict]: + executed = None try: - cancellation = await self.private_post( - self.endpoints["cancel_order"], - {"symbol": self.symbol, "marginCoin": self.margin_coin, "orderId": order["order_id"]}, + to_execute = [] + orders_with_custom_ids = [] + for order in orders: + params = { + "size": str(order["qty"]), + "side": self.order_side_map[order["side"]][order["position_side"]], + "orderType": order["type"], + "presetTakeProfitPrice": "", + "presetStopLossPrice": "", + } + if params["orderType"] == "limit": + params["timeInForceValue"] = "post_only" + params["price"] = str(order["price"]) + else: + params["timeInForceValue"] = "normal" + random_str = f"{str(int(time() * 1000))[-6:]}_{int(np.random.random() * 10000)}" + custom_id = order["custom_id"] if "custom_id" in order else "0" + params["clientOid"] = order[ + "custom_id" + ] = f"{self.broker_code}#{custom_id}_{random_str}" + orders_with_custom_ids.append({**order, **{"symbol": self.symbol}}) + to_execute.append(params) + executed = await self.private_post( + self.endpoints["batch_orders"], + {"symbol": self.symbol, "marginCoin": self.margin_coin, "orderDataList": to_execute}, ) - return { - "symbol": self.symbol, - "side": order["side"], - "order_id": cancellation["data"]["orderId"], - "position_side": order["position_side"], - "qty": order["qty"], - "price": order["price"], - } + formatted = [] + for ex in executed["data"]["orderInfo"]: + to_add = {"order_id": ex["orderId"], "custom_id": ex["clientOid"]} + for elm in orders_with_custom_ids: + if elm["custom_id"] == ex["clientOid"]: + to_add.update(elm) + formatted.append(to_add) + break + # print('debug execute batch orders', executed, orders, formatted) + return formatted except Exception as e: - print(f"error cancelling order {order} {e}") - print_async_exception(cancellation) + print(f"error executing order {executed} {e}") + print_async_exception(executed) traceback.print_exc() - self.ts_released["force_update"] = 0.0 - return {} + return [] + + async def execute_cancellations(self, orders: [dict]) -> [dict]: + if not orders: + return [] + cancellations = [] + symbol = orders[0]["symbol"] if "symbol" in orders[0] else self.symbol + try: + cancellations = await self.private_post( + self.endpoints["batch_cancel_orders"], + { + "symbol": symbol, + "marginCoin": self.margin_coin, + "orderIds": [order["order_id"] for order in orders], + }, + ) + + formatted = [] + for oid in cancellations["data"]["order_ids"]: + to_add = {"order_id": oid} + for order in orders: + if order["order_id"] == oid: + to_add.update(order) + formatted.append(to_add) + break + # print('debug cancel batch orders', cancellations, orders, formatted) + return formatted + except Exception as e: + logging.error(f"error cancelling orders {orders} {e}") + print_async_exception(cancellations) + traceback.print_exc() + return [] async def fetch_account(self): raise NotImplementedError("not implemented") @@ -582,7 +659,7 @@ async def init_exchange_config(self): ) print(res) except Exception as e: - print(e) + print("error initiating exchange config", e) def standardize_market_stream_event(self, data: dict) -> [dict]: if "action" not in data or data["action"] != "update": diff --git a/bybit.py b/bybit.py index 6c3bb240f..6a5fe40b1 100644 --- a/bybit.py +++ b/bybit.py @@ -11,7 +11,7 @@ import aiohttp import numpy as np -from njit_funcs import round_ +from njit_funcs import round_, calc_diff from passivbot import Bot, logging from procedures import print_async_exception, print_ from pure_funcs import ts_to_date, sort_dict_keys, date_to_ts @@ -41,7 +41,9 @@ def determine_pos_side(o: dict) -> str: class BybitBot(Bot): def __init__(self, config: dict): self.exchange = "bybit" - self.min_notional = 0.0 + self.min_notional = 1.0 + self.max_n_orders_per_batch = 5 + self.max_n_cancellations_per_batch = 10 super().__init__(config) self.base_endpoint = "https://api.bybit.com" if self.test_mode: @@ -58,7 +60,7 @@ def init_market_type(self): websockets_base_endpoint = "wss://stream.bybit.com" if self.test_mode: websockets_base_endpoint = "wss://stream-testnet.bybit.com" - + if self.symbol.endswith("USDT"): print("linear perpetual") self.market_type += "_linear_perpetual" @@ -117,7 +119,7 @@ def init_market_type(self): self.spot_base_endpoint = "https://api.bybit.com" if self.test_mode: self.spot_base_endpoint = "https://api-testnet.bybit.com" - + self.endpoints["spot_balance"] = "/spot/v1/account" self.endpoints["balance"] = "/v2/private/wallet/balance" self.endpoints["exchange_info"] = "/v2/public/symbols" @@ -271,6 +273,31 @@ async def fetch_position(self) -> dict: } return position + async def execute_orders(self, orders: [dict]) -> [dict]: + if not orders: + return [] + creations = [] + for order in sorted(orders, key=lambda x: calc_diff(x["price"], self.price)): + creation = None + try: + creation = asyncio.create_task(self.execute_order(order)) + creations.append((order, creation)) + except Exception as e: + print(f"error creating order {order} {e}") + print_async_exception(creation) + traceback.print_exc() + results = [] + for creation in creations: + result = None + try: + result = await creation[1] + results.append(result) + except Exception as e: + print(f"error creating order {creation} {e}") + print_async_exception(result) + traceback.print_exc() + return results + async def execute_order(self, order: dict) -> dict: o = None try: @@ -317,6 +344,31 @@ async def execute_order(self, order: dict) -> dict: traceback.print_exc() return {} + async def execute_cancellations(self, orders: [dict]) -> [dict]: + if not orders: + return [] + cancellations = [] + for order in sorted(orders, key=lambda x: calc_diff(x["price"], self.price)): + cancellation = None + try: + cancellation = asyncio.create_task(self.execute_cancellation(order)) + cancellations.append((order, cancellation)) + except Exception as e: + print(f"error cancelling order {order} {e}") + print_async_exception(cancellation) + traceback.print_exc() + results = [] + for cancellation in cancellations: + result = None + try: + result = await cancellation[1] + results.append(result) + except Exception as e: + print(f"error cancelling order {cancellation} {e}") + print_async_exception(result) + traceback.print_exc() + return results + async def execute_cancellation(self, order: dict) -> dict: cancellation = None try: @@ -333,9 +385,17 @@ async def execute_cancellation(self, order: dict) -> dict: "price": order["price"], } except Exception as e: - if cancellation is not None and "ret_code" in cancellation and cancellation["ret_code"] == 20001: - error_cropped = {k: v for k, v in cancellation.items() if k in ["ret_msg", "ret_code"]} - logging.error(f"error cancelling order {error_cropped} {order}") # neater error message + if ( + cancellation is not None + and "ret_code" in cancellation + and cancellation["ret_code"] == 20001 + ): + error_cropped = { + k: v for k, v in cancellation.items() if k in ["ret_msg", "ret_code"] + } + logging.error( + f"error cancelling order {error_cropped} {order}" + ) # neater error message else: print(f"error cancelling order {order} {e}") print_async_exception(cancellation) @@ -608,21 +668,29 @@ async def init_exchange_config(self): print(res) res = await self.private_post( "/private/linear/position/set-leverage", - {"symbol": self.symbol, "buy_leverage": self.leverage, "sell_leverage": self.leverage}, + { + "symbol": self.symbol, + "buy_leverage": self.leverage, + "sell_leverage": self.leverage, + }, ) print(res) elif "inverse_perpetual" in self.market_type: res = await self.private_post( "/v2/private/position/switch-isolated", - {"symbol": self.symbol, "is_isolated": False, - "buy_leverage": self.leverage, "sell_leverage": self.leverage}, + { + "symbol": self.symbol, + "is_isolated": False, + "buy_leverage": self.leverage, + "sell_leverage": self.leverage, + }, ) - print('1', res) + print("1", res) res = await self.private_post( "/v2/private/position/leverage/save", {"symbol": self.symbol, "leverage": self.leverage, "leverage_only": True}, ) - print('2', res) + print("2", res) except Exception as e: print(e) diff --git a/configs/live/neat_grid_mode.example.json b/configs/live/neat_grid_mode.example.json index 4e1b11376..6551fe8cb 100644 --- a/configs/live/neat_grid_mode.example.json +++ b/configs/live/neat_grid_mode.example.json @@ -1,34 +1,34 @@ -{"config_name": "neat_grid_118_symbols_626days", +{"config_name": "neat_grid_118_symbols_688days", "logging_level": 0, - "long": {"auto_unstuck_ema_dist": 0, - "auto_unstuck_wallet_exposure_threshold": 0.012195009245955374, + "long": {"auto_unstuck_ema_dist": 0.001, + "auto_unstuck_wallet_exposure_threshold": 0.1, "backwards_tp": true, - "ema_span_0": 240, - "ema_span_1": 307.0566300659701, + "ema_span_0": 2245.681580255934, + "ema_span_1": 983.7519751474474, "enabled": true, - "eprice_exp_base": 0.9814428052289045, - "eqty_exp_base": 1.8910056941476132, - "grid_span": 0.35120092600982644, - "initial_eprice_ema_dist": -0.0724883315452062, + "eprice_exp_base": 0.9, + "eqty_exp_base": 0.9, + "grid_span": 0.3634600582110678, + "initial_eprice_ema_dist": 0.006593185590130529, "initial_qty_pct": 0.015, - "markup_range": 0.029915290808625504, - "max_n_entry_orders": 9, - "min_markup": 0.003, - "n_close_orders": 10, + "markup_range": 0.03989354389116942, + "max_n_entry_orders": 26, + "min_markup": 0.00390450048454419, + "n_close_orders": 30, "wallet_exposure_limit": 0.1}, - "short": {"auto_unstuck_ema_dist": 0.02, - "auto_unstuck_wallet_exposure_threshold": 0.010010044896137589, + "short": {"auto_unstuck_ema_dist": 0.004166734020707882, + "auto_unstuck_wallet_exposure_threshold": 0.09483563420728498, "backwards_tp": true, - "ema_span_0": 3578.5992758249126, - "ema_span_1": 1300.2248624251254, + "ema_span_0": 240, + "ema_span_1": 240, "enabled": true, - "eprice_exp_base": 0.9, - "eqty_exp_base": 2.741199913514829, - "grid_span": 0.35422351795434553, - "initial_eprice_ema_dist": 0.005310285956060753, + "eprice_exp_base": 0.9030929538070822, + "eqty_exp_base": 1.139844561045326, + "grid_span": 0.5588225833929379, + "initial_eprice_ema_dist": -0.0403097381673301, "initial_qty_pct": 0.015, - "markup_range": 0.011750423363748088, - "max_n_entry_orders": 7, - "min_markup": 0.003, - "n_close_orders": 8, + "markup_range": 0.01710717452246016, + "max_n_entry_orders": 12, + "min_markup": 0.0032327368401944916, + "n_close_orders": 10, "wallet_exposure_limit": 0.1}} diff --git a/configs/optimize/harmony_search.hjson b/configs/optimize/harmony_search.hjson index 990ab0862..51bb26e1f 100644 --- a/configs/optimize/harmony_search.hjson +++ b/configs/optimize/harmony_search.hjson @@ -33,6 +33,10 @@ minimum_eqbal_ratio_min_long: -1 minimum_eqbal_ratio_min_short: -1 + # clip results: compute score on top performers only + # clip_threshold=0.1 means drop 10% worst performers; clip_threshold=0.0 means include all + clip_threshold: 0.4 + # will override starting configs' parameters do_long: true do_short: true diff --git a/configs/optimize/particle_swarm_optimization.hjson b/configs/optimize/particle_swarm_optimization.hjson index 9045db90d..016e7a557 100644 --- a/configs/optimize/particle_swarm_optimization.hjson +++ b/configs/optimize/particle_swarm_optimization.hjson @@ -5,8 +5,8 @@ c0: 1.0 # attraction to local best c1: 0.5 # attraction to global best - n_cpus: 4 - iters: 4000 + n_cpus: 6 + iters: 2000 # score = adg_realized_per_exposure @@ -33,6 +33,10 @@ minimum_eqbal_ratio_min_long: -1 minimum_eqbal_ratio_min_short: -1 + # clip results: compute score on top performers only + # clip_threshold=0.1 means drop 10% worst performers; clip_threshold=0.0 means include all + clip_threshold: 0.4 + # will override starting configs' parameters do_long: true do_short: true @@ -120,7 +124,7 @@ initial_qty_pct: [0.015, 0.015] initial_eprice_ema_dist: [-0.1, 0.01] wallet_exposure_limit: [0.1, 0.1] - ddown_factor: [0.05, 3.0] + ddown_factor: [0.05, 4.0] rentry_pprice_dist: [0.005, 0.05] rentry_pprice_dist_wallet_exposure_weighting: [0.0, 90.0] min_markup: [0.001, 0.004] @@ -136,7 +140,7 @@ initial_qty_pct: [0.015, 0.015] initial_eprice_ema_dist: [-0.1, 0.01] wallet_exposure_limit: [0.1, 0.1] - ddown_factor: [0.05, 3.0] + ddown_factor: [0.05, 4.0] rentry_pprice_dist: [0.005, 0.05] rentry_pprice_dist_wallet_exposure_weighting: [0.0, 90.0] min_markup: [0.001, 0.004] diff --git a/harmony_search.py b/harmony_search.py index 5d081a090..a6eb77416 100644 --- a/harmony_search.py +++ b/harmony_search.py @@ -23,6 +23,7 @@ sort_dict_keys, determine_passivbot_mode, get_empty_analysis, + calc_scores, ) from procedures import ( add_argparse_args, @@ -162,46 +163,13 @@ def post_process(self, wi: int): results = deepcopy(self.unfinished_evals[id_key]["single_results"]) if set(results) == set(self.symbols): # completed multisymbol iter - sides = ["long", "short"] - keys = [ - ("adg_realized_per_exposure", True), - ("pa_distance_std", False), - ("pa_distance_mean", False), - ("hrs_stuck_max", False), - ("loss_profit_ratio", False), - ("eqbal_ratio_min", True), - ] - means = {s: {} for s in sides} # adjusted means - scores = {s: -1.0 for s in sides} - raws = {s: {} for s in sides} # unadjusted means - for side in sides: - for key, mult in keys: - raws[side][key] = np.mean([v[f"{key}_{side}"] for v in results.values()]) - if (max_key := f"maximum_{key}_{side}") in self.config: - if self.config[max_key] >= 0.0: - ms = [ - max(self.config[max_key], v[f"{key}_{side}"]) - for v in results.values() - ] - means[side][key] = max(np.mean(ms), self.config[max_key]) - else: - means[side][key] = 1.0 - elif (min_key := f"minimum_{key}_{side}") in self.config: - if self.config[min_key] >= 0.0: - ms = [ - min(self.config[min_key], v[f"{key}_{side}"]) - for v in results.values() - ] - means[side][key] = min(np.mean(ms), self.config[min_key]) - else: - means[side][key] = 1.0 - else: - means[side][key] = np.mean([v[f"{key}_{side}"] for v in results.values()]) - if mult: - scores[side] *= means[side][key] - else: - scores[side] /= means[side][key] - + scores_res = calc_scores(self.config, results) + scores, means, raws, keys = ( + scores_res["scores"], + scores_res["means"], + scores_res["raws"], + scores_res["keys"], + ) # check whether initial eval or new harmony if "initial_eval_key" in cfg: self.hm[cfg["initial_eval_key"]]["long"]["score"] = scores["long"] diff --git a/inspect_opt_results.py b/inspect_opt_results.py index 4995722dd..e0aff4328 100755 --- a/inspect_opt_results.py +++ b/inspect_opt_results.py @@ -10,7 +10,7 @@ import argparse import hjson from procedures import load_live_config, dump_live_config, make_get_filepath -from pure_funcs import config_pretty_str, candidate_to_live_config +from pure_funcs import config_pretty_str, candidate_to_live_config, calc_scores from njit_funcs import round_dynamic @@ -29,6 +29,7 @@ def main(): ("hss", "maximum_hrs_stuck_max_short"), ("erl", "minimum_eqbal_ratio_min_long"), ("ers", "minimum_eqbal_ratio_min_short"), + ("ct", "clip_threshold"), ] for k0, k1 in weights_keys: parser.add_argument( @@ -78,49 +79,26 @@ def main(): print(f"{'n results': <{klen}} {len(results)}") sides = ["long", "short"] - keys = [ - ("adg_realized_per_exposure", True), - ("pa_distance_std", False), - ("pa_distance_mean", False), - ("hrs_stuck_max", False), - ("loss_profit_ratio", False), - ("eqbal_ratio_min", True), - ] all_scores = [] symbols = [s for s in results[0]["results"] if s != "config_no"] for r in results: - cfg = r["config"] + cfg = r["config"].copy() + cfg.update(minsmaxs) ress = r["results"] - - means = {s: {} for s in sides} # adjusted means - scores = {s: -1.0 for s in sides} - raws = {s: {} for s in sides} # unadjusted means all_scores.append({}) + scores_res = calc_scores(cfg, {s: r["results"][s] for s in symbols}) + scores, individual_scores, keys = ( + scores_res["scores"], + scores_res["individual_scores"], + scores_res["keys"], + ) for side in sides: - for key, mult in keys: - raws[side][key] = np.mean([ress[s][f"{key}_{side}"] for s in symbols]) - if (max_key := f"maximum_{key}_{side}") in minsmaxs: - if minsmaxs[max_key] >= 0.0: - ms = [max(minsmaxs[max_key], ress[s][f"{key}_{side}"]) for s in symbols] - means[side][key] = max(minsmaxs[max_key], np.mean(ms)) - else: - means[side][key] = 1.0 - elif (min_key := f"minimum_{key}_{side}") in minsmaxs: - if minsmaxs[min_key] >= 0.0: - ms = [min(minsmaxs[min_key], ress[s][f"{key}_{side}"]) for s in symbols] - means[side][key] = min(minsmaxs[min_key], np.mean(ms)) - else: - means[side][key] = 1.0 - else: - means[side][key] = np.mean([ress[s][f"{key}_{side}"] for s in symbols]) - if mult: - scores[side] *= means[side][key] - else: - scores[side] /= means[side][key] all_scores[-1][side] = { "config": cfg[side], "score": scores[side], - "stats": {s: {k: v for k, v in ress[s].items() if side in k} for s in symbols}, + "individual_scores": individual_scores[side], + "symbols_to_include": scores_res["symbols_to_include"][side], + "stats": {sym: {k: v for k, v in ress[sym].items() if side in k} for sym in symbols}, "config_no": ress["config_no"], } best_candidate = {} @@ -132,7 +110,7 @@ def main(): "short": best_candidate["short"]["config"], } for side in sides: - row_headers = ["symbol"] + [k[0] for k in keys] + row_headers = ["symbol"] + [k[0] for k in keys] + ["score"] table = PrettyTable(row_headers) for rh in row_headers: table.align[rh] = "l" @@ -140,17 +118,29 @@ def main(): f"{side} (config no. {best_candidate[side]['config_no']}," + f" score {round_dynamic(best_candidate[side]['score'], 6)})" ) - for s in sorted( + for sym in sorted( symbols, - key=lambda x: best_candidate[side]["stats"][x][f"adg_realized_per_exposure_{side}"], + key=lambda x: best_candidate[side]["individual_scores"][x], + reverse=True, ): - xs = [best_candidate[side]["stats"][s][f"{k[0]}_{side}"] for k in keys] - table.add_row([s] + [round_dynamic(x, 4) for x in xs]) + xs = [best_candidate[side]["stats"][sym][f"{k[0]}_{side}"] for k in keys] + table.add_row( + [("-> " if sym in best_candidate[side]["symbols_to_include"] else "") + sym] + + [round_dynamic(x, 4) for x in xs] + + [best_candidate[side]["individual_scores"][sym]] + ) means = [ - np.mean([best_candidate[side]["stats"][s_][f"{k[0]}_{side}"] for s_ in symbols]) + np.mean( + [ + best_candidate[side]["stats"][s_][f"{k[0]}_{side}"] + for s_ in symbols + if s_ in best_candidate[side]["symbols_to_include"] + ] + ) for k in keys ] - table.add_row(["mean"] + [round_dynamic(m, 4) for m in means]) + ind_scores_mean = np.mean([best_candidate[side]["individual_scores"][sym] for sym in symbols]) + table.add_row(["mean"] + [round_dynamic(m, 4) for m in means] + [ind_scores_mean]) print(table) live_config = candidate_to_live_config(best_config) if args.dump_live_config: diff --git a/particle_swarm_optimization.py b/particle_swarm_optimization.py index 57a69d763..3fb9ae0c0 100644 --- a/particle_swarm_optimization.py +++ b/particle_swarm_optimization.py @@ -23,6 +23,7 @@ sort_dict_keys, determine_passivbot_mode, get_empty_analysis, + calc_scores, ) from procedures import ( add_argparse_args, @@ -176,45 +177,13 @@ def post_process(self, wi: int): ) if set(results) == set(self.symbols): # completed multisymbol iter - sides = ["long", "short"] - keys = [ - ("adg_realized_per_exposure", True), - ("pa_distance_std", False), - ("pa_distance_mean", False), - ("hrs_stuck_max", False), - ("loss_profit_ratio", False), - ("eqbal_ratio_min", True), - ] - means = {s: {} for s in sides} # adjusted means - scores = {s: -1.0 for s in sides} - raws = {s: {} for s in sides} # unadjusted means - for side in sides: - for key, mult in keys: - raws[side][key] = np.mean([v[f"{key}_{side}"] for v in results.values()]) - if (max_key := f"maximum_{key}_{side}") in self.config: - if self.config[max_key] >= 0.0: - ms = [ - max(self.config[max_key], v[f"{key}_{side}"]) - for v in results.values() - ] - means[side][key] = max(np.mean(ms), self.config[max_key]) - else: - means[side][key] = 1.0 - elif (min_key := f"minimum_{key}_{side}") in self.config: - if self.config[min_key] >= 0.0: - ms = [ - min(self.config[min_key], v[f"{key}_{side}"]) - for v in results.values() - ] - means[side][key] = min(np.mean(ms), self.config[min_key]) - else: - means[side][key] = 1.0 - else: - means[side][key] = np.mean([v[f"{key}_{side}"] for v in results.values()]) - if mult: - scores[side] *= means[side][key] - else: - scores[side] /= means[side][key] + scores_res = calc_scores(self.config, results) + scores, means, raws, keys = ( + scores_res["scores"], + scores_res["means"], + scores_res["raws"], + scores_res["keys"], + ) self.swarm[swarm_key]["long"]["score"] = scores["long"] self.swarm[swarm_key]["short"]["score"] = scores["short"] @@ -703,6 +672,7 @@ async def main(): "maximum_pa_distance_mean_short", "maximum_loss_profit_ratio_long", "maximum_loss_profit_ratio_short", + "clip_threshold", ] if k in config and k not in [z[0] for z in lines] ] diff --git a/passivbot.py b/passivbot.py index 32430065c..7f17a9feb 100644 --- a/passivbot.py +++ b/passivbot.py @@ -125,18 +125,18 @@ def __init__(self, config: dict): self.process_websocket_ticks = True def set_config(self, config): - if "long_mode" not in config: - config["long_mode"] = None - if "short_mode" not in config: - config["short_mode"] = None - if "test_mode" not in config: - config["test_mode"] = False - if "assigned_balance" not in config: - config["assigned_balance"] = None - if "cross_wallet_pct" not in config: - config["cross_wallet_pct"] = 1.0 - if "price_distance_threshold" not in config: - config["price_distance_threshold"] = 0.5 + for k, v in [ + ("long_mode", None), + ("short_mode", None), + ("test_mode", False), + ("assigned_balance", None), + ("cross_wallet_pct", 1.0), + ("price_distance_threshold", 0.5), + ("c_mult", 1.0), + ("leverage", 7.0), + ]: + if k not in config: + config[k] = v self.passivbot_mode = config["passivbot_mode"] = determine_passivbot_mode(config) if config["cross_wallet_pct"] > 1.0 or config["cross_wallet_pct"] <= 0.0: logging.warning( @@ -226,11 +226,13 @@ async def update_open_orders(self) -> None: self.dump_log({"log_type": "open_orders", "data": open_orders}) self.open_orders = open_orders self.error_halt["update_open_orders"] = False + return True except Exception as e: self.error_halt["update_open_orders"] = True logging.error(f"error with update open orders {e}") traceback.print_exc() + return False finally: self.ts_released["update_open_orders"] = time() @@ -301,10 +303,12 @@ async def update_position(self) -> None: self.dump_log({"log_type": "position", "data": position}) self.position = position self.error_halt["update_position"] = False + return True except Exception as e: self.error_halt["update_position"] = True logging.error(f"error with update position {e}") traceback.print_exc() + return False finally: self.ts_released["update_position"] = time() @@ -343,29 +347,14 @@ async def create_orders(self, orders_to_create: [dict]) -> [dict]: return [] self.ts_locked["create_orders"] = time() try: - creations = [] - for oc in sorted(orders_to_create, key=lambda x: calc_diff(x["price"], self.price)): - try: - creations.append((oc, asyncio.create_task(self.execute_order(oc)))) - except Exception as e: - logging.error(f"error creating order a {oc} {e}") - created_orders = [] - for oc, c in creations: - try: - o = await c - created_orders.append(o) - if "side" in o: - logging.info( - f' created order {o["symbol"]} {o["side"]: <4} ' - + f'{o["position_side"]: <5} {o["qty"]} {o["price"]}' - ) - if o["order_id"] not in {x["order_id"] for x in self.open_orders}: - self.open_orders.append(o) - else: - logging.error(f"error creating order b {o} {oc}") - except Exception as e: - logging.error(f"error creating order c {oc} {c.exception()} {e}") - return created_orders + orders = await self.execute_orders(orders_to_create) + for order in sorted(orders, key=lambda x: calc_diff(x["price"], self.price)): + if "side" in order: + logging.info( + f' created order {order["symbol"]} {order["side"]: <4} ' + + f'{order["position_side"]: <5} {order["qty"]} {order["price"]}' + ) + return orders finally: self.ts_released["create_orders"] = time() @@ -381,41 +370,38 @@ async def cancel_orders(self, orders_to_cancel: [dict]) -> [dict]: if o["order_id"] not in oo_ids: oo_ids.add(o["order_id"]) orders_to_cancel_dedup.append(o) - for oc in orders_to_cancel_dedup: - try: - deletions.append((oc, asyncio.create_task(self.execute_cancellation(oc)))) - except Exception as e: - logging.error(f"error cancelling order c {oc} {e}") - cancelled_orders = [] - for oc, c in deletions: - try: - o = await c - cancelled_orders.append(o) - if "order_id" in o: + cancellations = None + try: + cancellations = await self.execute_cancellations(orders_to_cancel_dedup) + for cancellation in cancellations: + if "order_id" in cancellation: logging.info( - f'cancelled order {o["symbol"]} {o["side"]: <4} ' - + f'{o["position_side"]: <5} {o["qty"]} {o["price"]}' + f'cancelled order {cancellation["symbol"]} {cancellation["side"]: <4} ' + + f'{cancellation["position_side"]: <5} {cancellation["qty"]} {cancellation["price"]}' ) self.open_orders = [ - oo for oo in self.open_orders if oo["order_id"] != o["order_id"] + oo + for oo in self.open_orders + if oo["order_id"] != cancellation["order_id"] ] - except Exception as e: - logging.error(f"error cancelling order {oc} {e}") - print_async_exception(c) - return cancelled_orders + return cancellations + except Exception as e: + logging.error(f"error cancelling orders {cancellations} {e}") + print_async_exception(cancellations) + return [] finally: self.ts_released["cancel_orders"] = time() def stop(self, signum=None, frame=None) -> None: logging.info("Stopping passivbot, please wait...") - try: + self.stop_websocket = True + if not self.ohlcv: + try: + self.user_stream_task.cancel() + self.market_stream_task.cancel() - self.stop_websocket = True - self.user_stream_task.cancel() - self.market_stream_task.cancel() - - except Exception as e: - logging.error(f"An error occurred during shutdown: {e}") + except Exception as e: + logging.error(f"An error occurred during shutdown: {e}") def pause(self) -> None: self.process_websocket_ticks = False @@ -730,17 +716,23 @@ async def cancel_and_create(self): ideal_orders = [] all_orders = self.calc_orders() for o in all_orders: - if "ientry" in o["custom_id"] and calc_diff(o["price"], self.price) < 0.002: + if ( + not self.ohlcv + and "ientry" in o["custom_id"] + and calc_diff(o["price"], self.price) < 0.002 + ): # call update_position() before making initial entry orders # in case websocket has failed - logging.info("update_position with REST API before creating initial entries") + logging.info( + f"update_position with REST API before creating initial entries. Last price {self.price}" + ) await self.update_position() all_orders = self.calc_orders() break for o in all_orders: - if any(x in o["custom_id"] for x in ["ientry", "unstuck"]): + if any(x in o["custom_id"] for x in ["ientry", "unstuck"]) and not self.ohlcv: if calc_diff(o["price"], self.price) < 0.01: - # EMA based orders must be closer than 1% of current price + # EMA based orders must be closer than 1% of current price unless ohlcv mode ideal_orders.append(o) else: if calc_diff(o["price"], self.price) < self.price_distance_threshold: @@ -792,17 +784,16 @@ async def cancel_and_create(self): results = [] if to_cancel: - # to avoid building backlog, cancel n+1 orders, create n orders results.append( asyncio.create_task( - self.cancel_orders(to_cancel[: self.n_orders_per_execution + 1]) + self.cancel_orders(to_cancel[: self.max_n_cancellations_per_batch]) ) ) await asyncio.sleep( 0.01 ) # sleep 10 ms between sending cancellations and sending creations if to_create: - results.append(await self.create_orders(to_create[: self.n_orders_per_execution])) + results.append(await self.create_orders(to_create[: self.max_n_orders_per_batch])) return results finally: await asyncio.sleep(self.delay_between_executions) # sleep before releasing lock @@ -825,23 +816,26 @@ async def on_market_stream_event(self, ticks: [dict]): await asyncio.gather(self.update_position(), self.update_open_orders()) if now - self.heartbeat_ts > self.heartbeat_interval_seconds: # print heartbeat once an hour - logging.info(f"heartbeat {self.symbol}") - self.log_position_long() - self.log_position_short() - liq_price = self.position["long"]["liquidation_price"] - if calc_diff(self.position["short"]["liquidation_price"], self.price) < calc_diff( - liq_price, self.price - ): - liq_price = self.position["short"]["liquidation_price"] - logging.info( - f'balance: {round_dynamic(self.position["wallet_balance"], 6)}' - + f' equity: {round_dynamic(self.position["equity"], 6)} last price: {self.price}' - + f" liq: {round_(liq_price, self.price_step)}" - ) + self.heartbeat_print() self.heartbeat_ts = time() await self.cancel_and_create() - def log_position_long(self): + def heartbeat_print(self): + logging.info(f"heartbeat {self.symbol}") + self.log_position_long() + self.log_position_short() + liq_price = self.position["long"]["liquidation_price"] + if calc_diff(self.position["short"]["liquidation_price"], self.price) < calc_diff( + liq_price, self.price + ): + liq_price = self.position["short"]["liquidation_price"] + logging.info( + f'balance: {round_dynamic(self.position["wallet_balance"], 6)}' + + f' equity: {round_dynamic(self.position["equity"], 6)} last price: {self.price}' + + f" liq: {round_(liq_price, self.price_step)}" + ) + + def log_position_long(self, prev_pos=None): closes_long = sorted( [o for o in self.open_orders if o["side"] == "sell" and o["position_side"] == "long"], key=lambda x: x["price"], @@ -856,16 +850,25 @@ def log_position_long(self): lcqty, lcprice = ( (closes_long[0]["qty"], closes_long[0]["price"]) if closes_long else (0.0, 0.0) ) + prev_pos_line = ( + ( + f'long: {prev_pos["long"]["size"]} @' + + f' {round_(prev_pos["long"]["price"], self.price_step)} -> ' + ) + if prev_pos + else "" + ) logging.info( - f'long: {self.position["long"]["size"]} @' - + f' {round_dynamic(self.position["long"]["price"], 5)}' + prev_pos_line + + f'long: {self.position["long"]["size"]} @' + + f' {round_(self.position["long"]["price"], self.price_step)}' + f' lWE: {self.position["long"]["wallet_exposure"]:.4f}' + f' pprc diff {self.position["long"]["price"] / self.price - 1:.3f}' + f" EMAs: {[round_dynamic(e, 5) for e in self.emas_long]}" + f" e {leqty} @ {leprice} | c {lcqty} @ {lcprice}" ) - def log_position_short(self): + def log_position_short(self, prev_pos=None): closes_short = sorted( [o for o in self.open_orders if o["side"] == "buy" and o["position_side"] == "short"], key=lambda x: x["price"], @@ -885,9 +888,18 @@ def log_position_short(self): if self.position["short"]["price"] != 0.0 else 1.0 ) + prev_pos_line = ( + ( + f'short: {prev_pos["short"]["size"]} @' + + f' {round_(prev_pos["short"]["price"], self.price_step)} -> ' + ) + if prev_pos + else "" + ) logging.info( - f'short: {self.position["short"]["size"]} @' - + f' {round_dynamic(self.position["short"]["price"], 5)}' + prev_pos_line + + f'short: {self.position["short"]["size"]} @' + + f' {round_(self.position["short"]["price"], self.price_step)}' + f' sWE: {self.position["short"]["wallet_exposure"]:.4f}' + f" pprc diff {pprice_diff:.3f}" + f" EMAs: {[round_dynamic(e, 5) for e in self.emas_short]}" @@ -1056,17 +1068,80 @@ async def subscribe_to_market_stream(self, ws): async def subscribe_to_user_stream(self, ws): pass + async def start_ohlcv_mode(self): + await asyncio.gather(self.update_position(), self.update_open_orders()) + await self.init_exchange_config() + await self.init_order_book() + await self.init_emas() + logging.info("starting bot...") + while True: + now = time() + # print('secs until next', ((now + 60) - now % 60) - now) + while int(now) % 60 != 0: + if self.stop_websocket: + break + await asyncio.sleep(0.5) + now = time() + print( + f"\rcountdown: {((now + 60) - now % 60) - now:.1f} last price: {self.price} ", + end=" ", + ) + if self.stop_websocket: + break + await self.on_minute_mark() + await asyncio.sleep(1.0) -async def start_bot(bot): - while not bot.stop_websocket: + async def on_minute_mark(self): + # called each whole minute try: - await bot.start_websocket() - except Exception as e: - logging.warning( - "Websocket connection has been lost, attempting to reinitialize the bot... {e}", + print("\r", end="") + if time() - self.heartbeat_ts > self.heartbeat_interval_seconds: + # print heartbeat once an hour + self.heartbeat_print() + self.heartbeat_ts = time() + self.prev_price = self.ob[0] + prev_pos = self.position.copy() + res = await asyncio.gather( + self.update_position(), self.update_open_orders(), self.init_order_book() ) + # TODO catch when res != [True, True, True] + self.update_emas(self.ob[0], self.prev_price) + await self.cancel_and_create() + if prev_pos["wallet_balance"] != self.position["wallet_balance"]: + logging.info( + f"balance: {round_dynamic(prev_pos['wallet_balance'], 7)}" + + f" -> {round_dynamic(self.position['wallet_balance'], 7)}" + ) + if prev_pos["long"]["size"] != self.position["long"]["size"]: + plp = prev_pos["long"]["size"], round_(prev_pos["long"]["price"], self.price_step) + clp = self.position["long"]["size"], round_( + self.position["long"]["price"], self.price_step + ) + self.log_position_long(prev_pos) + if prev_pos["short"]["size"] != self.position["short"]["size"]: + psp = prev_pos["short"]["size"], round_(prev_pos["short"]["price"], self.price_step) + csp = self.position["short"]["size"], round_( + self.position["short"]["price"], self.price_step + ) + self.log_position_short(prev_pos) + except Exception as e: + logging.error(f"error on minute mark {e}") traceback.print_exc() - await asyncio.sleep(10) + + +async def start_bot(bot): + if bot.ohlcv: + await bot.start_ohlcv_mode() + else: + while not bot.stop_websocket: + try: + await bot.start_websocket() + except Exception as e: + logging.warning( + "Websocket connection has been lost, attempting to reinitialize the bot... {e}", + ) + traceback.print_exc() + await asyncio.sleep(10) async def main() -> None: @@ -1158,6 +1233,12 @@ async def main() -> None: action="store_true", help=f"if true, run on the test net instead of normal exchange. Supported exchanges: {TEST_MODE_SUPPORTED_EXCHANGES}", ) + parser.add_argument( + "-oh", + "--ohlcv", + action="store_true", + help=f"if true, execute to exchange only on each minute mark instead of continuously", + ) float_kwargs = [ ("-lmm", "--long_min_markup", "--long-min-markup", "long_min_markup"), @@ -1201,9 +1282,16 @@ async def main() -> None: logging.error(f"{e} failed to load config {args.live_config_path}") return config["exchange"] = exchange - for k in ["user", "api_keys", "symbol", "leverage", "price_distance_threshold"]: + for k in [ + "user", + "api_keys", + "symbol", + "leverage", + "price_distance_threshold", + "ohlcv", + "test_mode", + ]: config[k] = getattr(args, k) - config["test_mode"] = args.test_mode if config["test_mode"] and config["exchange"] not in TEST_MODE_SUPPORTED_EXCHANGES: raise IOError(f"Exchange {config['exchange']} is not supported in test mode.") config["market_type"] = args.market_type if args.market_type is not None else "futures" @@ -1282,6 +1370,7 @@ async def main() -> None: if "spot" in config["market_type"]: config = spotify_config(config) + logging.info(f"using config \n{config_pretty_str(denumpyize(config))}") if config["exchange"] == "binance": if "spot" in config["market_type"]: @@ -1308,7 +1397,11 @@ async def main() -> None: else: raise Exception("unknown exchange", config["exchange"]) - logging.info(f"using config \n{config_pretty_str(denumpyize(config))}") + if args.ohlcv: + logging.info( + "starting passivbot in ohlcv mode, using REST API only and updating once a minute" + ) + signal.signal(signal.SIGINT, bot.stop) signal.signal(signal.SIGTERM, bot.stop) await start_bot(bot) diff --git a/pure_funcs.py b/pure_funcs.py index a502211d8..5addb7663 100644 --- a/pure_funcs.py +++ b/pure_funcs.py @@ -285,6 +285,15 @@ def candidate_to_live_config(candidate_: dict) -> dict: n_days = (date_to_ts(result_dict["end_date"]) - date_to_ts(result_dict["start_date"])) / ( 1000 * 60 * 60 * 24 ) + elif "config_name" in candidate and "days" in candidate["config_name"]: + try: + cn = candidate["config_name"] + for i in range(len(cn) - 1, -1, -1): + if cn[i] == "_": + break + n_days = int(cn[i + 1 : cn.find("days")]) + except: + n_days = 0 else: n_days = 0 name += f"_{n_days:.0f}days" @@ -1127,3 +1136,67 @@ def strip_config(cfg: dict) -> dict: template["long"][k] = cfg["long"][k] template["short"][k] = cfg["short"][k] return template + + +def calc_scores(config: dict, results: dict): + sides = ["long", "short"] + keys = [ + ("adg_realized_per_exposure", True), + ("pa_distance_std", False), + ("pa_distance_mean", False), + ("hrs_stuck_max", False), + ("loss_profit_ratio", False), + ("eqbal_ratio_min", True), + ] + means = {side: {} for side in sides} # adjusted means + scores = {side: -1.0 for side in sides} + raws = {side: {} for side in sides} # unadjusted means + individual_raws = {side: {sym: {} for sym in results} for side in sides} + individual_vals = {side: {sym: {} for sym in results} for side in sides} + individual_scores = {side: {sym: -1.0 for sym in results} for side in sides} + symbols_to_include = {side: [] for side in sides} + for side in sides: + for sym in results: + for key, mult in keys: + individual_raws[side][sym][key] = results[sym][f"{key}_{side}"] + if (max_key := f"maximum_{key}_{side}") in config: + if config[max_key] >= 0.0: + val = max(config[max_key], results[sym][f"{key}_{side}"]) + else: + val = 1.0 + elif (min_key := f"minimum_{key}_{side}") in config: + if config[min_key] >= 0.0: + val = min(config[min_key], results[sym][f"{key}_{side}"]) + else: + val = 1.0 + else: + val = results[sym][f"{key}_{side}"] + individual_vals[side][sym][key] = val + if mult: + individual_scores[side][sym] *= val + else: + individual_scores[side][sym] /= val + raws[side] = { + key: np.mean([individual_raws[side][sym][key] for sym in results]) for key, _ in keys + } + symbols_to_include[side] = sorted( + individual_scores[side], key=lambda x: individual_scores[side][x] + )[: int(len(individual_scores[side]) * (1 - config["clip_threshold"]))] + # print(symbols_to_include, individual_scores[side], config["clip_threshold"]) + means[side] = { + key: np.mean([individual_vals[side][sym][key] for sym in symbols_to_include[side]]) + for key, _ in keys + } + for key, mult in keys: + if mult: + scores[side] *= means[side][key] + else: + scores[side] /= means[side][key] + return { + "scores": scores, + "means": means, + "raws": raws, + "individual_scores": individual_scores, + "keys": keys, + "symbols_to_include": symbols_to_include, + }