Skip to content

Commit f76c4b8

Browse files
fix: threadApiManager (#1449)
* fix theadedapimanager * pr comments * add test * rename test * add test_ticker_socket * get loop as dependenc yinjection --------- Co-authored-by: carlosmiei <[email protected]>
1 parent 2ce30ca commit f76c4b8

File tree

4 files changed

+63
-4
lines changed

4 files changed

+63
-4
lines changed

binance/streams.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1306,9 +1306,10 @@ class ThreadedWebsocketManager(ThreadedApiManager):
13061306
def __init__(
13071307
self, api_key: Optional[str] = None, api_secret: Optional[str] = None,
13081308
requests_params: Optional[Dict[str, Any]] = None, tld: str = 'com',
1309-
testnet: bool = False, session_params: Optional[Dict[str, Any]] = None
1309+
testnet: bool = False, session_params: Optional[Dict[str, Any]] = None,
1310+
loop: Optional[asyncio.AbstractEventLoop] = None
13101311
):
1311-
super().__init__(api_key, api_secret, requests_params, tld, testnet, session_params)
1312+
super().__init__(api_key, api_secret, requests_params, tld, testnet, session_params, loop)
13121313
self._bsm: Optional[BinanceSocketManager] = None
13131314

13141315
async def _before_socket_listener_start(self):

binance/threaded_stream.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@ class ThreadedApiManager(threading.Thread):
1111
def __init__(
1212
self, api_key: Optional[str] = None, api_secret: Optional[str] = None,
1313
requests_params: Optional[Dict[str, Any]] = None, tld: str = 'com',
14-
testnet: bool = False, session_params: Optional[Dict[str, Any]] = None
14+
testnet: bool = False, session_params: Optional[Dict[str, Any]] = None,
15+
_loop: Optional[asyncio.AbstractEventLoop] = None
1516
):
1617
"""Initialise the BinanceSocketManager
1718
1819
"""
1920
super().__init__()
20-
self._loop: asyncio.AbstractEventLoop = asyncio.get_event_loop() if asyncio.get_event_loop().is_running() else asyncio.new_event_loop()
21+
self._loop: asyncio.AbstractEventLoop = get_loop() if _loop is None else _loop
2122
self._client: Optional[AsyncClient] = None
2223
self._running: bool = True
2324
self._socket_running: Dict[str, bool] = {}

tests/test_socket_manager.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from binance import BinanceSocketManager, AsyncClient
2+
import pytest
3+
4+
5+
def assert_message(msg):
6+
assert msg['stream'] == '!ticker@arr'
7+
assert len(msg['data']) > 0
8+
9+
@pytest.mark.asyncio()
10+
async def test_ticker_socket():
11+
client = await AsyncClient.create(testnet=True)
12+
bm = BinanceSocketManager(client)
13+
14+
ts = bm.futures_ticker_socket()
15+
16+
async with ts as tscm:
17+
try:
18+
res = await tscm.recv()
19+
assert_message(res)
20+
except Exception as e:
21+
print(f"An error occurred: {e}")
22+
23+
await client.close_connection()
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
2+
3+
from binance import ThreadedWebsocketManager
4+
5+
6+
received_ohlcv = False
7+
received_depth = False
8+
9+
twm: ThreadedWebsocketManager
10+
11+
def handle_socket_message(msg):
12+
global received_ohlcv, received_depth
13+
print(msg)
14+
if 'e' in msg:
15+
if msg['e'] == 'depthUpdate':
16+
received_depth = True
17+
if msg['e'] == 'kline':
18+
received_ohlcv = True
19+
if received_depth and received_ohlcv:
20+
twm.stop()
21+
22+
def test_threaded_socket_manager():
23+
global twm
24+
twm = ThreadedWebsocketManager(api_key="", api_secret="", testnet=True)
25+
26+
symbol = "BTCUSDT"
27+
28+
twm.start()
29+
30+
twm.start_kline_socket(callback=handle_socket_message, symbol=symbol)
31+
32+
twm.start_depth_socket(callback=handle_socket_message, symbol=symbol)
33+
34+
twm.join()

0 commit comments

Comments
 (0)