Skip to content

Commit

Permalink
feat: use weakref on calls (#187)
Browse files Browse the repository at this point in the history
* feat: use weakref on calls

* feat: optimize eth_call semaphore access

* Update _requests.py

* fix: use weakref.proxy instead of weakref.ref

* fix: weakrefs and slots conflict

* fix: cannot proxy a proxy

* fix: x not in list

* fix: rematerialize proxy
  • Loading branch information
BobTheBuidler committed Apr 26, 2024
1 parent 35543d6 commit 87235bc
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 14 deletions.
30 changes: 19 additions & 11 deletions dank_mids/_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
import asyncio
import logging
import time
import weakref
from collections import defaultdict
from concurrent.futures.process import BrokenProcessPool
from contextlib import suppress
from functools import cached_property, lru_cache
from functools import cached_property, lru_cache, partial
from typing import (TYPE_CHECKING, Any, DefaultDict, Dict, Generator, Generic,
Iterable, Iterator, List, NoReturn, Optional, Tuple,
TypeVar, Union)
Expand Down Expand Up @@ -49,7 +50,7 @@
class _RequestEvent(a_sync.Event):
def __init__(self, owner: "_RequestMeta") -> None:
super().__init__(debug_daemon_interval=300)
self._owner = owner
self._owner = weakref.proxy(owner)
def __repr__(self) -> str:
return f"<{self.__class__.__name__} object at {hex(id(self))} [{'set' if self.is_set() else 'unset'}, waiter:{self._owner}>"
def set(self):
Expand All @@ -60,7 +61,7 @@ def set(self):
self._loop.call_soon_threadsafe(super().set)

class _RequestMeta(Generic[_Response], metaclass=abc.ABCMeta):
__slots__ = 'controller', 'uid', '_response', '_done', '_start', '_batch'
__slots__ = 'controller', 'uid', '_response', '_done', '_start', '_batch', '__weakref__'
controller: "DankMiddlewareController"
def __init__(self) -> None:
self.uid = self.controller.call_uid.next
Expand Down Expand Up @@ -108,7 +109,7 @@ class RPCRequest(_RequestMeta[RawResponse]):
_types = set()

def __init__(self, controller: "DankMiddlewareController", method: RPCEndpoint, params: Any, retry: bool = False):
self.controller = controller
self.controller = weakref.proxy(controller)
self.method = method
self.params = params
self.should_batch = _should_batch_method(method)
Expand Down Expand Up @@ -350,15 +351,14 @@ def semaphore(self) -> a_sync.Semaphore:
_Request = TypeVar("_Request")

class _Batch(_RequestMeta[List[RPCResponse]], Iterable[_Request]):
__slots__ = 'calls', '_fut', '_lock', '_daemon'
_fut = None
__slots__ = 'calls', '_lock', '_daemon'
calls: List[_Request]

def __init__(self, controller: "DankMiddlewareController", calls: Iterable[_Request]):
self.controller = controller
self.calls = list(calls) # type: ignore
self._fut = None
self.controller = weakref.proxy(controller)
self.calls = [weakref.proxy(call, callback=self._remove) for call in calls]
self._lock = _AlertingRLock(name=self.__class__.__name__)
#self._len = 0
super().__init__()

def __bool__(self) -> bool:
Expand Down Expand Up @@ -432,6 +432,12 @@ def should_retry(self, e: Exception) -> bool:
elif "429" not in f"{e}":
logger.warning(f"unexpected {e.__class__.__name__}: {e}")
return len(self) > 1

def _remove(self, proxy: weakref.CallableProxyType) -> None:
try:
self.calls.remove(proxy)
except ValueError:
pass

mcall_encoder = abi.default_codec._registry.get_encoder("(bool,(address,bytes)[])")
mcall_decoder = abi.default_codec._registry.get_decoder("(uint256,uint256,(bool,bytes)[])")
Expand Down Expand Up @@ -828,7 +834,9 @@ def adjust_batch_size(self) -> None:

def _post_future_cleanup(self) -> None:
with self.controller.pools_closed_lock:
self.controller.pending_rpc_calls = JSONRPCBatch(self.controller)
# a hacky way to get the real controller obj from the weak reference proxy
controller = self.controller.__repr__.__self__
self.controller.pending_rpc_calls = JSONRPCBatch(controller)

def _log_exception(e: Exception) -> bool:
# NOTE: These errors are expected during normal use and are not indicative of any problem(s). No need to log them.
Expand All @@ -849,4 +857,4 @@ def _log_exception(e: Exception) -> bool:
return ENVS.DEBUG # type: ignore [attr-defined]
logger.warning("The following exception is being logged for informational purposes and does not indicate failure:")
logger.warning(e, exc_info=True)
return ENVS.DEBUG # type: ignore [attr-defined]
return ENVS.DEBUG # type: ignore [attr-defined]
6 changes: 3 additions & 3 deletions dank_mids/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def __init__(self, w3: Web3) -> None:
if self.mc3:
self.no_multicall.add(self.mc3.address)

self.method_semaphores = _MethodSemaphores(self)
self.eth_call_semaphores = _MethodSemaphores(self)["eth_call"] # TODO: refactor this out
# semaphores soon to be deprecated for smart queue
self.method_queues = _MethodQueues(self)
self.batcher = NotSoBrightBatcher()
Expand Down Expand Up @@ -141,7 +141,7 @@ async def __call__(self, method: RPCEndpoint, params: Any) -> RPCResponse:
logger.debug(f'making {self.request_type.__name__} {method} with params {params}')
if method != "eth_call":
return await RPCRequest(self, method, params)
async with self.method_semaphores[method][params[1]]:
async with self.eth_call_semaphores[params[1]]:
if params[0]["to"] not in self.no_multicall:
return await eth_call(self, params)
return await RPCRequest(self, method, params)
Expand Down Expand Up @@ -249,4 +249,4 @@ def needs_override_code_for_block(self, block: BlockId) -> bool:
return block < self.deploy_block

def __hash__(self) -> int:
return hash(self.address)
return hash(self.address)

0 comments on commit 87235bc

Please sign in to comment.