Commit

feat: a lot, fix: a lot - no more hanging! (#73)
* feat: better optimized distribution of load to node

* feat: tenderly warning

* feat: use msgspec with type hints for json decoding

* feat: JIT decoding

* feat: use msgspec encoder

* feat: PartialResponse class to skip unnecessary json decoding (see the msgspec sketch after the change summary below)

* feat: bypass encoding of non-required `jsonrpc` field

* feat: decode in subprocess to unblock event loop

* feat: use subprocesses for brownie patch en/decoding

* feat: basic stats

* feat: ENVIRONMENT_VARIABLES file

* feat: logic for reducing jsonrpc batch size like mcall batch size

* feat: lift brownie call response decoding out of brownie call semaphore

* feat: BlockSemaphore

* feat: pre encode brownie calls

* feat: emergency exit for timed out calls - 60s default

* feat: rate limit calls to node, see if this helps with conn errs

* fix: hanging behavior
BobTheBuidler authored Jul 17, 2023
1 parent f44a44a commit 6a6be52
Showing 32 changed files with 1,723 additions and 746 deletions.
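
Several bullets above — msgspec with type hints, JIT decoding, and the PartialResponse class — share one idea: decode only the JSON-RPC fields you need, when you need them. Below is a minimal sketch of that pattern with msgspec; the struct is illustrative, not dank_mids' actual PartialResponse definition.

from typing import Optional

import msgspec

class PartialResponse(msgspec.Struct):
    # msgspec skips unknown fields like "jsonrpc", so they cost nothing to decode.
    id: int
    # msgspec.Raw defers decoding: the result bytes stay raw until someone asks.
    result: msgspec.Raw
    error: Optional[dict] = None

decoder = msgspec.json.Decoder(PartialResponse)
resp = decoder.decode(b'{"jsonrpc": "2.0", "id": 1, "result": "0x10"}')
assert msgspec.json.decode(resp.result) == "0x10"  # decoded only on demand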
3 changes: 2 additions & 1 deletion .gitignore
@@ -6,4 +6,5 @@ dank_mids.egg-info
.pytest_cache
.eggs
.mypy_cache
__pycache__
__pycache__
just-leave-me-here-and-dont-touch-me-plz.log
85 changes: 85 additions & 0 deletions dank_mids/ENVIRONMENT_VARIABLES.py
@@ -0,0 +1,85 @@

import logging

import a_sync
import typed_envs
from a_sync import AsyncProcessPoolExecutor

from dank_mids import _envs
from dank_mids._mode import OperationMode
from dank_mids.semaphores import BlockSemaphore

logger = logging.getLogger("dank_mids.envs")

if not typed_envs.logger.disabled:
    logger.info("For your information, you can tweak your configuration for optimal performance using any of the envs below:")

###############
# ENVIRONMENT #
###############

# What mode should dank mids operate in?
# NOTE: infura mode is required for now
# TODO: fix the other modes, set default='default', and make this verbose again
OPERATION_MODE = _envs.create_env("OPERATION_MODE", OperationMode, default="infura", verbose=False)

# Max number of rpc calls to include in one batch call
MAX_JSONRPC_BATCH_SIZE = _envs.create_env("MAX_JSONRPC_BATCH_SIZE", int, default=500)

# Enable Demo Mode?
demo_mode = _envs._deprecated_format.create_env("DEMO_MODE", bool, default=False, verbose=False)
DEMO_MODE = _envs.create_env("DEMO_MODE", bool, default=demo_mode, verbose=False)

# Are you calling a ganache fork? Can't use state override code
ganache_fork = _envs._deprecated_format.create_env("GANACHE_FORK", bool, default=False, verbose=False)
GANACHE_FORK = _envs.create_env("GANACHE_FORK", bool, default=ganache_fork)

# We set the default to 20 minutes to account for potentially long event loop times if you're doing serious work.
AIOHTTP_TIMEOUT = _envs.create_env("AIOHTTP_TIMEOUT", int, default=20*60, string_converter=int)

# Brownie call Semaphore
# Used because I experienced some OOM errors due to web3 formatters when I was batching an absurd number of brownie calls.
# We need a separate semaphore here because the method-specific semaphores are too late in the code to prevent this OOM issue.
brownie_semaphore = _envs._deprecated_format.create_env("BROWNIE_CALL_SEMAPHORE", int, default=100_000, string_converter=int, verbose=False)
BROWNIE_CALL_SEMAPHORE = _envs.create_env("BROWNIE_CALL_SEMAPHORE", BlockSemaphore, default=brownie_semaphore, string_converter=int, verbose=not OPERATION_MODE.infura)
BROWNIE_ENCODER_SEMAPHORE = _envs.create_env("BROWNIE_ENCODER_SEMAPHORE", BlockSemaphore, default=BROWNIE_CALL_SEMAPHORE._default_value * 2, string_converter=int, verbose=not OPERATION_MODE.infura)

# Processes for decoding. This determines process pool size, not total subprocess count.
# There are 3 pools, each initialized with the same value.
# NOTE: Don't stress, these are good for you and will not hog your CPU. You can disable them by setting the var to 0. # TODO: lol, you can't yet
BROWNIE_ENCODER_PROCESSES = _envs.create_env("BROWNIE_ENCODER_PROCESSES", AsyncProcessPoolExecutor, default=0 if OPERATION_MODE.infura else 1, string_converter=int, verbose=not OPERATION_MODE.infura)
BROWNIE_DECODER_PROCESSES = _envs.create_env("BROWNIE_DECODER_PROCESSES", AsyncProcessPoolExecutor, default=0 if OPERATION_MODE.infura else 1, string_converter=int, verbose=not OPERATION_MODE.infura)
MULTICALL_DECODER_PROCESSES = _envs.create_env("MULTICALL_DECODER_PROCESSES", AsyncProcessPoolExecutor, default=0 if OPERATION_MODE.infura else 1, string_converter=int, verbose=not OPERATION_MODE.infura)

# NOTE: EXPORT_STATS is not implemented
# TODO: implement this
EXPORT_STATS = _envs.create_env("EXPORT_STATS", bool, default=False, verbose=False)
# NOTE: COLLECT_STATS is implemented
COLLECT_STATS = _envs.create_env("COLLECT_STATS", bool, default=EXPORT_STATS, verbose=not EXPORT_STATS)

# You probably don't need to use this unless you know you need to
STUCK_CALL_TIMEOUT = _envs.create_env("STUCK_CALL_TIMEOUT", int, default=60)

# Method-specific Semaphores
method_semaphores = {
    "eth_call": _envs.create_env("ETH_CALL_SEMAPHORE", BlockSemaphore, default=BROWNIE_CALL_SEMAPHORE._value, string_converter=int),
    "eth_getBlock": _envs.create_env("ETH_GETBLOCK_SEMAPHORE", a_sync.Semaphore, default=50, string_converter=int),
    "eth_getLogs": _envs.create_env("ETH_GETLOGS_SEMAPHORE", a_sync.Semaphore, default=64, string_converter=int),
    "eth_getTransaction": _envs.create_env("ETH_GETTRANSACTION_SEMAPHORE", a_sync.Semaphore, default=100, string_converter=int),
}

if not typed_envs.logger.disabled:
    logger.info("More details can be found in dank_mids/ENVIRONMENT_VARIABLES.py")
    logger.info("NOTE: You can disable these logs by setting the `TYPEDENVS_SHUTUP` env var to any value.")


# Validate some stuffs

# NOTE: The other modes are (probably) bugging out right now. More investigation needed. For now, use infura mode.
if not OPERATION_MODE.infura:
    raise ValueError("Dank mids must be run in infura mode for now")

if OPERATION_MODE.infura:
    for process_pool in {MULTICALL_DECODER_PROCESSES, BROWNIE_DECODER_PROCESSES, BROWNIE_ENCODER_PROCESSES}:
        if process_pool._max_workers:
            raise ValueError(f"You cannot set env var {process_pool.name} while running dank in infura mode.")
11 changes: 11 additions & 0 deletions dank_mids/__init__.py
@@ -1,4 +1,15 @@



from dank_mids._how_is_this_real import _the_most_absurd_fix_youve_ever_seen
from dank_mids.controller import instances
from dank_mids.helpers import setup_dank_w3, setup_dank_w3_from_sync
from dank_mids.middleware import dank_middleware


def _configure_concurrent_future_work_queue_size():
    import concurrent.futures.process as _cfp
    _cfp.EXTRA_QUEUED_CALLS = 50_000

_configure_concurrent_future_work_queue_size()
_the_most_absurd_fix_youve_ever_seen()
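
For context: EXTRA_QUEUED_CALLS is a real module-level constant in CPython's concurrent.futures.process. The pipe feeding worker processes holds max_workers + EXTRA_QUEUED_CALLS call items (the default is 1), so raising it keeps workers saturated, at the cost of futures in the call queue no longer being cancellable. A standalone sketch of the same knob:

import concurrent.futures.process as _cfp
from concurrent.futures import ProcessPoolExecutor

# Must be set before the executor is constructed; the call queue is sized
# max_workers + EXTRA_QUEUED_CALLS at init time.
_cfp.EXTRA_QUEUED_CALLS = 10_000

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(pow, 2, n) for n in range(1_000)]
        print(sum(f.result() for f in futures))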
35 changes: 0 additions & 35 deletions dank_mids/_config.py

This file was deleted.

4 changes: 2 additions & 2 deletions dank_mids/_demo_mode.py
@@ -1,12 +1,12 @@
import logging
from typing import Any

from dank_mids._config import DEMO_MODE
from dank_mids import ENVIRONMENT_VARIABLES


class DummyLogger:
    """ Replace a `logging.Logger` object with a dummy to save precious time """
    def info(self, *args: Any, **kwargs: Any) -> None:
        ...

demo_logger = logging.getLogger("dank_mids.demo") if DEMO_MODE else DummyLogger()
demo_logger = logging.getLogger("dank_mids.demo") if ENVIRONMENT_VARIABLES.DEMO_MODE else DummyLogger()
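
The point of the swap is that call sites never branch on DEMO_MODE: they always call demo_logger.info(...), and outside demo mode that hits the no-op dummy. A hypothetical call site:

# Cheap no-op when DEMO_MODE is off; a real log record when it's on.
demo_logger.info("sent batch %s containing %s calls", 1, 500)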
8 changes: 8 additions & 0 deletions dank_mids/_envs.py
@@ -0,0 +1,8 @@

from typed_envs import EnvVarFactory

_factory = EnvVarFactory("DANKMIDS")
create_env = _factory.create_env

# This only applies to the oldest of dank's envs
_deprecated_format = EnvVarFactory("DANK_MIDS")
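
A hedged sketch of what the two factories imply for naming, assuming typed_envs joins the factory prefix and the variable name with an underscore (EXAMPLE_VAR is hypothetical):

import os
os.environ["DANKMIDS_EXAMPLE_VAR"] = "42"

from dank_mids._envs import create_env, _deprecated_format

# Reads DANKMIDS_EXAMPLE_VAR; the result behaves like an int with value 42.
new_style = create_env("EXAMPLE_VAR", int, default=0)
# The legacy factory would look for DANK_MIDS_EXAMPLE_VAR instead.
old_style = _deprecated_format.create_env("EXAMPLE_VAR", int, default=0)
assert new_style == 42 and old_style == 0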
45 changes: 45 additions & 0 deletions dank_mids/_exceptions.py
@@ -0,0 +1,45 @@

from typing import TYPE_CHECKING, Union
import logging

from aiohttp.client_exceptions import ClientResponseError

if TYPE_CHECKING:
from dank_mids.types import PartialRequest, PartialResponse


logger = logging.getLogger("dank_mids.exceptions")

class BadResponse(ValueError):
    def __init__(self, response: "PartialResponse") -> None:
        self.response = response
        super().__init__(response.to_dict())

class EmptyBatch(ValueError):
    pass

class ResponseNotReady(ValueError):
    pass

class PayloadTooLarge(BadResponse):
    pass

class DankMidsClientResponseError(ClientResponseError):
    """A wrapper around the standard aiohttp ClientResponseError that attaches the request that generated the error."""
    def __init__(
        self,
        exc: ClientResponseError,
        request: "PartialRequest",
    ) -> None:
        self.request = request
        super().__init__(exc.request_info, exc.history, code=exc.code, status=exc.status, message=exc.message, headers=exc.headers)
        self.args = (*exc.request_info, exc.history, request)
        self._exception = exc

internal_err_types = Union[AttributeError, TypeError, UnboundLocalError, NotImplementedError, RuntimeError, SyntaxError]

class DankMidsInternalError(Exception):
    def __init__(self, e: internal_err_types) -> None:
        logger.warning(f"unhandled exception inside dank mids internals: {e}", exc_info=True)
        self._original_exception = e
        super().__init__(e.__repr__())
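
A hypothetical triage sketch showing how a caller might tell these types apart; some_rpc_coro is a stand-in awaitable, not a real dank_mids API:

from dank_mids._exceptions import BadResponse, DankMidsInternalError, PayloadTooLarge

async def triage(some_rpc_coro):
    try:
        return await some_rpc_coro
    except PayloadTooLarge:
        raise  # the batch was too big for the node; retry in smaller pieces
    except BadResponse as e:
        print("node replied:", e.response.to_dict())  # the PartialResponse that failed
        raise
    except DankMidsInternalError:
        raise  # a bug inside dank mids internals; nothing for the caller to retry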
37 changes: 37 additions & 0 deletions dank_mids/_how_is_this_real.py
@@ -0,0 +1,37 @@

import logging
from logging.handlers import RotatingFileHandler

LOGS_PATH = 'just-leave-me-here-and-dont-touch-me-plz.log'
ONE_KB = 1024

silencer = RotatingFileHandler(LOGS_PATH, maxBytes=500*ONE_KB, backupCount=0)

def _the_most_absurd_fix_youve_ever_seen():
    """
    Somehow this resolves a race condition enough for the library to work as intended.
    How? I have no idea at all. Please don't ask me, it's magic.
    I'll debug this later once I've pushed a stable release to prod.
    """
    enable_logger_but_divert_stream('a_sync.abstract')
    enable_logger_but_divert_stream('a_sync.base')
    enable_logger_but_divert_stream('dank_mids.controller')
    enable_logger_but_divert_stream('dank_mids.session')
    enable_logger_but_divert_stream('dank_mids.semaphores')

def enable_logger_but_divert_stream(name, show_every_x=None):
    logger = logging.getLogger(name)
    if logging.root.isEnabledFor(logging.DEBUG) or logger.isEnabledFor(logging.DEBUG):
        # We don't need to do anything, the user has indicated they want the debug logs enabled and shown
        return
    # Break the logger apart from the root logger and its handlers
    logger.propagate = False
    # Enable the logger regardless of root settings
    logger.setLevel(logging.DEBUG)
    # remove the root handler that was added at basicConfig step (not sure if this is actually necessary)
    logger.removeHandler(logging.StreamHandler())
    # ensure the logger has no handlers
    assert logger.handlers == [], logger.handlers
    # add the silencer handler to direct the logs to a throwaway file
    logger.addHandler(silencer)
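
The diversion pattern itself — detach a logger from root, force it to DEBUG, and point it at a throwaway rotating file — is plain stdlib. A standalone demonstration:

import logging
from logging.handlers import RotatingFileHandler

noisy = logging.getLogger("some.noisy.module")
noisy.propagate = False        # detach from the root logger's handlers
noisy.setLevel(logging.DEBUG)  # keep emitting records regardless of root config
noisy.addHandler(RotatingFileHandler("noise.log", maxBytes=512 * 1024, backupCount=0))

noisy.debug("this lands in noise.log, never on the console")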

22 changes: 22 additions & 0 deletions dank_mids/_mode.py
@@ -0,0 +1,22 @@

from functools import cached_property

MODES = ["default", "application", "infura"]

class OperationMode(str):
    @cached_property
    def application(self) -> bool:
        # This mode keeps the event loop as unblocked as possible so an asyncio application can run as designed
        return self.mode == "application"
    @cached_property
    def default(self) -> bool:
        return self.mode == "default"
    @cached_property
    def infura(self) -> bool:
        # This mode minimizes the total number of calls sent to the node
        return self.mode == "infura"
    @property
    def mode(self) -> str:
        if self not in MODES:
            raise ValueError(f'dank mids operation mode {self} is invalid', f'valid modes: {MODES}', str(self))
        return self
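
Because OperationMode subclasses str and validates lazily inside .mode, an invalid mode only raises on first flag access. A quick sketch against the class as defined above:

from dank_mids._mode import OperationMode

mode = OperationMode("infura")
assert mode.infura and not mode.default and not mode.application

bad = OperationMode("turbo")  # constructing does not validate...
try:
    bad.infura                # ...but reading any flag does
except ValueError as exc:
    print(exc)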
77 changes: 77 additions & 0 deletions dank_mids/batch.py
@@ -0,0 +1,77 @@

import asyncio
import logging
from typing import TYPE_CHECKING, Any, Generator, List

from dank_mids._exceptions import DankMidsInternalError
from dank_mids.requests import JSONRPCBatch, RPCRequest, _Batch
from dank_mids.types import Multicalls

if TYPE_CHECKING:
from dank_mids.controller import DankMiddlewareController

MIN_SIZE = 1 # TODO: Play with this
CHECK = MIN_SIZE - 1

logger = logging.getLogger(__name__)

class DankBatch:
    """ A batch of jsonrpc batches. This is pretty much deprecated and needs to be refactored away."""
    def __init__(self, controller: "DankMiddlewareController", multicalls: Multicalls, rpc_calls: List[RPCRequest]):
        self.controller = controller
        self.multicalls = multicalls
        self.rpc_calls = rpc_calls
        self._started = False

    def __await__(self) -> Generator[Any, None, Any]:
        self.start()
        return self._await().__await__()

    async def _await(self) -> None:
        batches = tuple(self.coroutines)
        for batch, result in zip(batches, await asyncio.gather(*batches, return_exceptions=True)):
            if isinstance(result, Exception):
                if not isinstance(result, DankMidsInternalError):
                    logger.error(f"That's not good, there was an exception in a {batch.__class__.__name__}. These are supposed to be handled.\n{result}\n", exc_info=True)
                raise result

    def start(self) -> None:
        for mcall in self.multicalls.values():
            mcall.start(self, cleanup=False)
        for call in self.rpc_calls:
            call.start(self)
        self._started = True

    @property
    def coroutines(self) -> Generator["_Batch", None, None]:
        # Combine multicalls into one or more jsonrpc batches

        # Create empty batch
        working_batch = JSONRPCBatch(self.controller)

        check_len = min(CHECK, self.controller.batcher.step)
        # Go thru the multicalls and add calls to the batch
        for mcall in self.multicalls.values():
            # NOTE: If a multicall has fewer than `CHECK` calls, we should just throw the calls into a jsonrpc batch individually.
            try:  # NOTE: This should be faster than using len().
                mcall[check_len]
                working_batch.append(mcall, skip_check=True)
            except IndexError:
                working_batch.extend(mcall, skip_check=True)
            if working_batch.is_full:
                yield working_batch
                working_batch = JSONRPCBatch(self.controller)

        rpc_calls_to_batch = self.rpc_calls[:]
        while rpc_calls_to_batch:
            if working_batch.is_full:
                yield working_batch
                working_batch = JSONRPCBatch(self.controller)
            working_batch.append(rpc_calls_to_batch.pop(), skip_check=True)
        if working_batch:
            if working_batch.is_single_multicall:
                yield working_batch[0]  # type: ignore [misc]
            elif len(working_batch) == 1:
                yield working_batch[0].make_request()
            else:
                yield working_batch
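
The coroutines property is a fill-and-flush packer: append to a working batch, yield it when full, then flush the remainder. Stripped of the dank_mids specifics, a minimal sketch of the same loop:

from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def fill_and_flush(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    working: List[T] = []
    for item in items:
        working.append(item)
        if len(working) >= batch_size:  # mirrors working_batch.is_full
            yield working
            working = []
    if working:  # like the final `if working_batch:` flush above
        yield working

assert list(fill_and_flush(range(7), 3)) == [[0, 1, 2], [3, 4, 5], [6]]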