|
35 | 35 | from dipdup.pysignalr import WebsocketMessage
|
36 | 36 | from dipdup.pysignalr import WebsocketProtocol
|
37 | 37 | from dipdup.pysignalr import WebsocketTransport
|
| 38 | +from dipdup.utils import Watchdog |
38 | 39 |
|
39 | 40 | WEB3_CACHE_SIZE = 256
|
40 | 41 |
|
@@ -66,6 +67,7 @@ def __init__(self, config: EvmNodeDatasourceConfig, merge_subscriptions: bool =
|
66 | 67 | self._subscription_ids: dict[str, EvmNodeSubscription] = {}
|
67 | 68 | self._logs_queue: Queue[dict[str, Any]] = Queue()
|
68 | 69 | self._heads: defaultdict[int, NodeHead] = defaultdict(NodeHead)
|
| 70 | + self._watchdog: Watchdog = Watchdog(self._http_config.connection_timeout) |
69 | 71 |
|
70 | 72 | self._on_connected_callbacks: set[EmptyCallback] = set()
|
71 | 73 | self._on_disconnected_callbacks: set[EmptyCallback] = set()
|
@@ -109,14 +111,22 @@ async def run(self) -> None:
|
109 | 111 | await asyncio.gather(
|
110 | 112 | self._ws_loop(),
|
111 | 113 | self._log_processor_loop(),
|
| 114 | + self._watchdog.run(), |
112 | 115 | )
|
113 | 116 |
|
114 | 117 | async def _log_processor_loop(self) -> None:
|
115 | 118 | while True:
|
116 | 119 | log_json = await self._logs_queue.get()
|
117 | 120 | level = int(log_json['blockNumber'], 16)
|
118 | 121 |
|
119 |
| - await self._heads[level].event.wait() |
| 122 | + try: |
| 123 | + await asyncio.wait_for( |
| 124 | + self._heads[level].event.wait(), |
| 125 | + timeout=self._http_config.connection_timeout, |
| 126 | + ) |
| 127 | + except asyncio.TimeoutError as e: |
| 128 | + msg = f'Head for level {level} not received in {self._http_config.connection_timeout} seconds' |
| 129 | + raise FrameworkException(msg) from e |
120 | 130 | timestamp = self._heads[level].timestamp
|
121 | 131 | if timestamp is None:
|
122 | 132 | raise FrameworkException('Head received but timestamp is None')
|
@@ -275,6 +285,7 @@ async def _on_message(self, message: Message) -> None:
|
275 | 285 | raise FrameworkException(f'Unknown message type: {type(message)}')
|
276 | 286 |
|
277 | 287 | data = message.data
|
| 288 | + self._watchdog.reset() |
278 | 289 |
|
279 | 290 | if 'id' in data:
|
280 | 291 | request_id = data['id']
|
|
0 commit comments