Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dark vlm #260

Merged
merged 17 commits into from
Jan 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions piker/brokers/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import trio
import tractor
from tractor.experimental import msgpub
from async_generator import asynccontextmanager

from ..log import get_logger, get_console_log
Expand Down Expand Up @@ -98,7 +99,7 @@ class BrokerFeed:
)


@tractor.msg.pub(tasks=['stock', 'option'])
@msgpub(tasks=['stock', 'option'])
async def stream_poll_requests(
get_topics: Callable,
get_quotes: Coroutine,
Expand Down Expand Up @@ -293,7 +294,7 @@ async def start_quote_stream(

await stream_poll_requests(

# ``msg.pub`` required kwargs
# ``trionics.msgpub`` required kwargs
task_name=feed_type,
ctx=ctx,
topics=symbols,
Expand Down
12 changes: 10 additions & 2 deletions piker/brokers/ib.py
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,11 @@ async def get_client(
# https://interactivebrokers.github.io/tws-api/tick_types.html
tick_types = {
77: 'trade',
48: 'utrade',

# a "utrade" aka an off exchange "unreportable" (dark) vlm:
# https://interactivebrokers.github.io/tws-api/tick_types.html#rt_volume
48: 'dark_trade',

0: 'bsize',
1: 'bid',
2: 'ask',
Expand All @@ -1046,13 +1050,17 @@ async def get_client(
def normalize(
ticker: Ticker,
calc_price: bool = False

) -> dict:
# convert named tuples to dicts so we send usable keys
new_ticks = []
for tick in ticker.ticks:
if tick and not isinstance(tick, dict):
td = tick._asdict()
td['type'] = tick_types.get(td['tickType'], 'n/a')
td['type'] = tick_types.get(
td['tickType'],
'n/a',
)

new_ticks.append(td)

Expand Down
2 changes: 0 additions & 2 deletions piker/brokers/kraken.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,6 @@ async def subscribe(ws: wsproto.WSConnection):
quote = ohlc
topic = quote['symbol'].lower()

# XXX: format required by ``tractor.msg.pub``
# requires a ``Dict[topic: str, quote: dict]``
await send_chan.send({topic: quote})


Expand Down
12 changes: 8 additions & 4 deletions piker/data/_normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@

def iterticks(
quote: dict,
types: Tuple[str] = ('trade', 'utrade'),
types: Tuple[str] = ('trade', 'dark_trade'),

) -> AsyncIterator:
"""Iterate through ticks delivered per quote cycle.
"""
'''
Iterate through ticks delivered per quote cycle.

'''
# print(f"{quote}\n\n")
ticks = quote.get('ticks', ())
if ticks:
for tick in ticks:
# print(f"{quote['symbol']}: {tick}")
if tick.get('type') in types:
ttype = tick.get('type')
if ttype in types:
yield tick
11 changes: 5 additions & 6 deletions piker/data/_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,11 @@ async def sample_and_broadcast(
# TODO: ``numba`` this!
for sym, quote in quotes.items():

# TODO: in theory you can send the IPC msg *before*
# writing to the sharedmem array to decrease latency,
# however, that will require `tractor.msg.pub` support
# here or at least some way to prevent task switching
# at the yield such that the array write isn't delayed
# while another consumer is serviced..
# TODO: in theory you can send the IPC msg *before* writing
# to the sharedmem array to decrease latency, however, that
# will require at least some way to prevent task switching
# at the yield such that the array write isn't delayed while
# another consumer is serviced..

# start writing the shm buffer with appropriate
# trade data
Expand Down
163 changes: 163 additions & 0 deletions piker/fsp/_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship of pikers)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

'''
FSP (financial signal processing) apis.

'''

# TODO: things to figure the heck out:
# - how to handle non-plottable values (pyqtgraph has facility for this
# now in `arrayToQPath()`)
# - composition of fsps / implicit chaining syntax (we need an issue)

from __future__ import annotations
from functools import partial
from typing import (
Any,
Callable,
Awaitable,
Optional,
)

import numpy as np
import tractor
from tractor.msg import NamespacePath

from ..data._sharedmem import (
ShmArray,
maybe_open_shm_array,
)
from ..log import get_logger

log = get_logger(__name__)

# global fsp registry filled out by @fsp decorator below
_fsp_registry = {}


def _load_builtins() -> dict[tuple, Callable]:

# import to implicity trigger registration via ``@fsp``
from . import _momo # noqa
from . import _volume # noqa

return _fsp_registry


class Fsp:
'''
"Financial signal processor" decorator wrapped async function.

'''

# TODO: checkout the advanced features from ``wrapt``:
# - dynamic enable toggling,
# https://wrapt.readthedocs.io/en/latest/decorators.html#dynamically-disabling-decorators
# - custom object proxies, might be useful for implementing n-compose
# https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-object-proxies
# - custom function wrappers,
# https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-function-wrappers

def __init__(
self,
func: Callable[..., Awaitable],
*,
outputs: tuple[str] = (),
display_name: Optional[str] = None,
**config,

) -> None:

# TODO (maybe):
# - type introspection?
# - should we make this a wrapt object proxy?
self.func = func
self.__name__ = func.__name__ # XXX: must have func-object name

self.ns_path: tuple[str, str] = NamespacePath.from_ref(func)
self.outputs = outputs
self.config: dict[str, Any] = config

# register with declared set.
_fsp_registry[self.ns_path] = func

@property
def name(self) -> str:
return self.__name__

def __call__(
self,

# TODO: when we settle on py3.10 we should probably use the new
# type annots from pep 612:
# https://www.python.org/dev/peps/pep-0612/
# instance,
*args,
**kwargs
):
return self.func(*args, **kwargs)


def fsp(
wrapped=None,
*,
outputs: tuple[str] = (),
display_name: Optional[str] = None,
**config,

) -> Fsp:

if wrapped is None:
return partial(
Fsp,
outputs=outputs,
display_name=display_name,
**config,
)

return Fsp(wrapped, outputs=(wrapped.__name__,))


def maybe_mk_fsp_shm(
sym: str,
target: fsp,
readonly: bool = True,

) -> (ShmArray, bool):
'''
Allocate a single row shm array for an symbol-fsp pair if none
exists, otherwise load the shm already existing for that token.

'''
uid = tractor.current_actor().uid

# TODO: load output types from `Fsp`
# - should `index` be a required internal field?
fsp_dtype = np.dtype(
[('index', int)] +
[(field_name, float) for field_name in target.outputs]
)

key = f'{sym}.fsp.{target.name}.{".".join(uid)}'

shm, opened = maybe_open_shm_array(
key,
# TODO: create entry for each time frame
dtype=fsp_dtype,
readonly=True,
)
return shm, opened
Loading