From 0028391e20833c1bfa552aa2d140e304bd4f2684 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 15 Feb 2022 13:03:11 -0500 Subject: [PATCH 1/8] Basic module-script for spawning `marketstore`, needs correct bind mount usage --- piker/data/_ahab.py | 151 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 151 insertions(+) create mode 100644 piker/data/_ahab.py diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py new file mode 100644 index 0000000000..591c19a935 --- /dev/null +++ b/piker/data/_ahab.py @@ -0,0 +1,151 @@ +# piker: trading gear for hackers +# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Supervisor for docker with included specific-image service helpers. + +''' +from typing import Optional +from contextlib import contextmanager as cm +# import time + +import trio +import tractor +import docker +import json +# from docker.containers import Container +from requests import ConnectionError + +from ..log import get_logger, get_console_log + +log = get_logger(__name__) + + +_config = ''' +# mount this config using: +# sudo docker run --mount type=bind,source="$HOME/.config/piker/",target="/etc" -i -p 5993:5993 alpacamarkets/marketstore:latest +root_directory: data +listen_port: 5993 +grpc_listen_port: 5995 +log_level: info +queryable: true +stop_grace_period: 0 +wal_rotate_interval: 5 +stale_threshold: 5 +enable_add: true +enable_remove: false + +triggers: + - module: ondiskagg.so + on: "*/1Sec/OHLCV" + config: + # filter: "nasdaq" + destinations: + - 1Min + - 5Min + - 15Min + - 1H + - 1D + + - module: stream.so + on: '*/*/*' + # config: + # filter: "nasdaq" + +''' + + +@cm +def open_docker( + url: Optional[str] = None, + **kwargs, + +) -> docker.DockerClient: + + # yield docker.Client( + # base_url=url, + # **kwargs + # ) if url else + yield docker.from_env(**kwargs) + + +@tractor.context +async def open_marketstore_container( + ctx: tractor.Context, + **kwargs, + +) -> None: + log = get_console_log('info', name=__name__) + # this cli should "just work" + # sudo docker run --mount + # type=bind,source="$HOME/.config/piker/",target="/etc" -i -p + # 5993:5993 alpacamarkets/marketstore:latest + client = docker.from_env(**kwargs) + + # with open_docker() as client: + ctnr = client.containers.run( + 'alpacamarkets/marketstore:latest', + [ + '--mount', + 'type=bind,source="$HOME/.config/piker/",target="/etc"', + '-i', + '-p 5993:5993', + ], + detach=True, + ) + started: bool = False + logs = ctnr.logs(stream=True) + + with trio.move_on_after(0.5): + for entry in logs: + entry = entry.decode() + try: + record = json.loads(entry.strip()) + except json.JSONDecodeError: + if 'Error' in entry: + raise RuntimeError(entry) + # await tractor.breakpoint() + msg = record['msg'] + + if "launching tcp listener for all services..." in msg: + started = True + break + + await trio.sleep(0) + + if not started and ctnr not in client.containers.list(): + raise RuntimeError( + 'Failed to start `marketstore` check logs output for deats' + ) + + await ctx.started() + await tractor.breakpoint() + + +async def main(): + async with tractor.open_nursery( + loglevel='info', + ) as tn: + portal = await tn.start_actor('ahab', enable_modules=[__name__]) + + async with portal.open_context( + open_marketstore_container + + ) as (first, ctx): + await trio.sleep_forever() + +if __name__ == '__main__': + trio.run(main) From d18d0d77db5034ec1c7cbba6a146e2eb817e7f71 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 15 Feb 2022 14:03:24 -0500 Subject: [PATCH 2/8] Extract non-sudo user for config dir path --- piker/cli/__init__.py | 20 +++----------------- piker/config.py | 28 +++++++++++++++++++++++++++- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 97b6ebc3dc..8e9e466ace 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -16,20 +16,6 @@ log = get_logger('cli') DEFAULT_BROKER = 'questrade' -_config_dir = click.get_app_dir('piker') -_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') -_context_defaults = dict( - default_map={ - # Questrade specific quote poll rates - 'monitor': { - 'rate': 3, - }, - 'optschain': { - 'rate': 1, - }, - } -) - @click.command() @click.option('--loglevel', '-l', default='warning', help='Logging level') @@ -58,7 +44,7 @@ async def main(): trio.run(main) -@click.group(context_settings=_context_defaults) +@click.group(context_settings=config._context_defaults) @click.option( '--brokers', '-b', default=[DEFAULT_BROKER], @@ -87,8 +73,8 @@ def cli(ctx, brokers, loglevel, tl, configdir): 'loglevel': loglevel, 'tractorloglevel': None, 'log': get_console_log(loglevel), - 'confdir': _config_dir, - 'wl_path': _watchlists_data_path, + 'confdir': config._config_dir, + 'wl_path': config._watchlists_data_path, }) # allow enabling same loglevel in ``tractor`` machinery diff --git a/piker/config.py b/piker/config.py index 93a4737849..836da4323c 100644 --- a/piker/config.py +++ b/piker/config.py @@ -30,8 +30,34 @@ log = get_logger('broker-config') -_config_dir = click.get_app_dir('piker') +_config_dir = _click_config_dir = click.get_app_dir('piker') +_parent_user = os.environ.get('SUDO_USER') + +if _parent_user: + non_root_user_dir = os.path.expanduser( + f'~{_parent_user}' + ) + root = 'root' + _config_dir = ( + non_root_user_dir + + _click_config_dir[ + _click_config_dir.rfind(root) + len(root): + ] + ) + _file_name = 'brokers.toml' +_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') +_context_defaults = dict( + default_map={ + # Questrade specific quote poll rates + 'monitor': { + 'rate': 3, + }, + 'optschain': { + 'rate': 1, + }, + } +) def _override_config_dir( From 394218da968589f237e94361b3b4b823277726c7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 15 Feb 2022 22:07:50 -0500 Subject: [PATCH 3/8] Add a super simple `marketstore` container supervisor --- piker/data/_ahab.py | 198 ++++++++++++++++++++++++++++++-------------- 1 file changed, 135 insertions(+), 63 deletions(-) diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 591c19a935..08c0bcd1e7 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -18,25 +18,33 @@ Supervisor for docker with included specific-image service helpers. ''' -from typing import Optional -from contextlib import contextmanager as cm +from typing import ( + Optional, + # Any, +) +from contextlib import asynccontextmanager as acm # import time import trio import tractor import docker import json -# from docker.containers import Container -from requests import ConnectionError +from docker.models.containers import Container -from ..log import get_logger, get_console_log +from ..log import get_logger # , get_console_log +from ..config import _config_dir log = get_logger(__name__) _config = ''' +# piker's ``marketstore`` config. + # mount this config using: -# sudo docker run --mount type=bind,source="$HOME/.config/piker/",target="/etc" -i -p 5993:5993 alpacamarkets/marketstore:latest +# sudo docker run --mount \ +# type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \ +# 5993:5993 alpacamarkets/marketstore:latest + root_directory: data listen_port: 5993 grpc_listen_port: 5995 @@ -68,18 +76,58 @@ ''' -@cm -def open_docker( +@acm +async def open_docker( url: Optional[str] = None, **kwargs, ) -> docker.DockerClient: - # yield docker.Client( - # base_url=url, - # **kwargs - # ) if url else - yield docker.from_env(**kwargs) + client = docker.DockerClient( + base_url=url, + **kwargs + ) if url else docker.from_env(**kwargs) + + try: + yield client + finally: + # for c in client.containers.list(): + # c.kill() + client.close() + # client.api._custom_adapter.close() + + +# async def waitfor( +# cntr: Container, +# attr_path: tuple[str], +# expect=None, +# timeout: float = 0.5, + +# ) -> Any: +# ''' +# Wait for a container's attr value to be set. If ``expect`` is +# provided wait for the value to be set to that value. + +# This is an async version of the helper from our ``pytest-dockerctl`` +# plugin. + +# ''' +# def get(val, path): +# for key in path: +# val = val[key] +# return val + +# start = time.time() +# while time.time() - start < timeout: +# cntr.reload() +# val = get(cntr.attrs, attr_path) +# if expect is None and val: +# return val +# elif val == expect: +# return val +# else: +# raise TimeoutError("{} failed to be {}, value: \"{}\"".format( +# attr_path, expect if expect else 'not None', val)) @tractor.context @@ -88,64 +136,88 @@ async def open_marketstore_container( **kwargs, ) -> None: - log = get_console_log('info', name=__name__) - # this cli should "just work" - # sudo docker run --mount - # type=bind,source="$HOME/.config/piker/",target="/etc" -i -p - # 5993:5993 alpacamarkets/marketstore:latest - client = docker.from_env(**kwargs) - - # with open_docker() as client: - ctnr = client.containers.run( - 'alpacamarkets/marketstore:latest', - [ - '--mount', - 'type=bind,source="$HOME/.config/piker/",target="/etc"', - '-i', - '-p 5993:5993', - ], - detach=True, - ) - started: bool = False - logs = ctnr.logs(stream=True) - - with trio.move_on_after(0.5): - for entry in logs: - entry = entry.decode() - try: - record = json.loads(entry.strip()) - except json.JSONDecodeError: - if 'Error' in entry: - raise RuntimeError(entry) - # await tractor.breakpoint() - msg = record['msg'] - - if "launching tcp listener for all services..." in msg: - started = True - break - - await trio.sleep(0) - - if not started and ctnr not in client.containers.list(): - raise RuntimeError( - 'Failed to start `marketstore` check logs output for deats' + ''' + Start and supervise a marketstore instance with its config bind-mounted + in from the piker config directory on the system. + + The equivalent cli cmd to this code is: + + sudo docker run --mount \ + type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \ + 5993:5993 alpacamarkets/marketstore:latest + + ''' + # log = get_console_log('info', name=__name__) + + # client = docker.from_env(**kwargs) + async with open_docker() as client: + # create a mount from user's local piker config dir into container + config_dir_mnt = docker.types.Mount( + target='/etc', + source=_config_dir, + type='bind', ) - await ctx.started() - await tractor.breakpoint() + cntr: Container = client.containers.run( + 'alpacamarkets/marketstore:latest', + # do we need this for cmds? + # '-i', + + # '-p 5993:5993', + ports={'5993/tcp': 5993}, + mounts=[config_dir_mnt], + detach=True, + # stop_signal='SIGINT', + # init=True, + # remove=True, + ) + try: + started: bool = False + logs = cntr.logs(stream=True) + + with trio.move_on_after(0.5): + for entry in logs: + entry = entry.decode() + try: + record = json.loads(entry.strip()) + except json.JSONDecodeError: + if 'Error' in entry: + raise RuntimeError(entry) + msg = record['msg'] + + if "launching tcp listener for all services..." in msg: + started = True + break + + await trio.sleep(0) + + if not started and cntr not in client.containers.list(): + raise RuntimeError( + 'Failed to start `marketstore` check logs output for deats' + ) + + await ctx.started() + await trio.sleep_forever() + + finally: + cntr.stop() async def main(): + async with tractor.open_nursery( - loglevel='info', + loglevel='runtime', ) as tn: - portal = await tn.start_actor('ahab', enable_modules=[__name__]) - - async with portal.open_context( - open_marketstore_container - - ) as (first, ctx): + async with ( + ( + await tn.start_actor('ahab', enable_modules=[__name__]) + ).open_context( + open_marketstore_container + ) as (ctx, first), + ): + assert not first await trio.sleep_forever() + if __name__ == '__main__': trio.run(main) From aa286baa0778d3c41a8d0d154c23f2d29775b184 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Feb 2022 08:53:54 -0500 Subject: [PATCH 4/8] Drop import, it's got madness with and SIGINT? --- piker/config.py | 66 +++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/piker/config.py b/piker/config.py index 836da4323c..cf946405a4 100644 --- a/piker/config.py +++ b/piker/config.py @@ -17,6 +17,8 @@ """ Broker configuration mgmt. """ +import platform +import sys import os from os.path import dirname import shutil @@ -24,13 +26,73 @@ from bidict import bidict import toml -import click from .log import get_logger log = get_logger('broker-config') -_config_dir = _click_config_dir = click.get_app_dir('piker') + +# taken from ``click`` since apparently they have some +# super weirdness with sigint and sudo..no clue +def get_app_dir(app_name, roaming=True, force_posix=False): + r"""Returns the config folder for the application. The default behavior + is to return whatever is most appropriate for the operating system. + + To give you an idea, for an app called ``"Foo Bar"``, something like + the following folders could be returned: + + Mac OS X: + ``~/Library/Application Support/Foo Bar`` + Mac OS X (POSIX): + ``~/.foo-bar`` + Unix: + ``~/.config/foo-bar`` + Unix (POSIX): + ``~/.foo-bar`` + Win XP (roaming): + ``C:\Documents and Settings\\Local Settings\Application Data\Foo Bar`` + Win XP (not roaming): + ``C:\Documents and Settings\\Application Data\Foo Bar`` + Win 7 (roaming): + ``C:\Users\\AppData\Roaming\Foo Bar`` + Win 7 (not roaming): + ``C:\Users\\AppData\Local\Foo Bar`` + + .. versionadded:: 2.0 + + :param app_name: the application name. This should be properly capitalized + and can contain whitespace. + :param roaming: controls if the folder should be roaming or not on Windows. + Has no affect otherwise. + :param force_posix: if this is set to `True` then on any POSIX system the + folder will be stored in the home folder with a leading + dot instead of the XDG config home or darwin's + application support folder. + """ + + def _posixify(name): + return "-".join(name.split()).lower() + + # if WIN: + if platform.system() == 'Windows': + key = "APPDATA" if roaming else "LOCALAPPDATA" + folder = os.environ.get(key) + if folder is None: + folder = os.path.expanduser("~") + return os.path.join(folder, app_name) + if force_posix: + return os.path.join(os.path.expanduser("~/.{}".format(_posixify(app_name)))) + if sys.platform == "darwin": + return os.path.join( + os.path.expanduser("~/Library/Application Support"), app_name + ) + return os.path.join( + os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")), + _posixify(app_name), + ) + + +_config_dir = _click_config_dir = get_app_dir('piker') _parent_user = os.environ.get('SUDO_USER') if _parent_user: From d319a6310afd24397ef9ba59711d2c9a40efba20 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Feb 2022 08:55:22 -0500 Subject: [PATCH 5/8] Drop old client instantiate line --- piker/data/_ahab.py | 1 - 1 file changed, 1 deletion(-) diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 08c0bcd1e7..1e09a65a35 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -149,7 +149,6 @@ async def open_marketstore_container( ''' # log = get_console_log('info', name=__name__) - # client = docker.from_env(**kwargs) async with open_docker() as client: # create a mount from user's local piker config dir into container config_dir_mnt = docker.types.Mount( From 6a212df2bdcd0332ea0dd6251f96dce0ec6637f7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Feb 2022 08:56:10 -0500 Subject: [PATCH 6/8] Drop dependence on `msgpack` and `msgpack_numpy` --- piker/__init__.py | 7 ------- setup.py | 1 - 2 files changed, 8 deletions(-) diff --git a/piker/__init__.py b/piker/__init__.py index c84bfac2bb..a6437f8806 100644 --- a/piker/__init__.py +++ b/piker/__init__.py @@ -18,10 +18,3 @@ piker: trading gear for hackers. """ -import msgpack # noqa - -# TODO: remove this now right? -import msgpack_numpy - -# patch msgpack for numpy arrays -msgpack_numpy.patch() diff --git a/setup.py b/setup.py index 6f8fd89821..faaa8dace9 100755 --- a/setup.py +++ b/setup.py @@ -66,7 +66,6 @@ 'numpy', 'numba', 'pandas', - 'msgpack-numpy', # UI 'PyQt5', From 78e53943768fbea886a9ae52d7c1122c78b9382f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Feb 2022 12:16:07 -0500 Subject: [PATCH 7/8] Type annot updates --- piker/data/_sampling.py | 15 +++++++++------ piker/data/feed.py | 1 + 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 669f624eec..22d3660d40 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -16,27 +16,30 @@ """ Data buffers for fast shared humpy. + """ +from __future__ import annotations import time -from typing import Dict, List +from typing import TYPE_CHECKING import tractor import trio from trio_typing import TaskStatus -from ._sharedmem import ShmArray from ..log import get_logger +if TYPE_CHECKING: + from ._sharedmem import ShmArray log = get_logger(__name__) # TODO: we could stick these in a composed type to avoid # angering the "i hate module scoped variables crowd" (yawn). -_shms: Dict[int, List[ShmArray]] = {} -_start_increment: Dict[str, trio.Event] = {} -_incrementers: Dict[int, trio.CancelScope] = {} -_subscribers: Dict[str, tractor.Context] = {} +_shms: dict[int, list[ShmArray]] = {} +_start_increment: dict[str, trio.Event] = {} +_incrementers: dict[int, trio.CancelScope] = {} +_subscribers: dict[str, tractor.Context] = {} def shm_incrementing(shm_token_name: str) -> trio.Event: diff --git a/piker/data/feed.py b/piker/data/feed.py index 55f8b9b95b..100c31f019 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -20,6 +20,7 @@ This module is enabled for ``brokerd`` daemons. """ +from __future__ import annotations from dataclasses import dataclass, field from contextlib import asynccontextmanager from functools import partial From a90d20544784dd4589401041b6ee158888cab1a0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Feb 2022 12:20:17 -0500 Subject: [PATCH 8/8] Only throttle warn on rate >= display rate --- piker/ui/_display.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 3bfd327a84..14e1c2c74a 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -131,6 +131,7 @@ async def graphics_update_loop( # of copying it from last bar's close # - 1-5 sec bar lookback-autocorrection like tws does? # (would require a background history checker task) + display_rate = linked.godwidget.window.current_screen().refreshRate() chart = linked.chart @@ -215,7 +216,8 @@ async def graphics_update_loop( # in the absolute worst case we shouldn't see more then # twice the expected throttle rate right!? - and quote_rate >= _quote_throttle_rate * 1.5 + # and quote_rate >= _quote_throttle_rate * 2 + and quote_rate >= display_rate ): log.warning(f'High quote rate {symbol.key}: {quote_rate}')