Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added live trading support #1009

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added livetrading/__init__.py
Empty file.
85 changes: 85 additions & 0 deletions livetrading/broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from decimal import Decimal
from typing import Any, Dict, Optional

from livetrading.event import KLinesEventSource, Pair, PairInfo, TickersEventSource
from livetrading.rest_cli import RestClient
from livetrading.websocket_client import WSClient


class Broker:
"""A client for crypto currency exchange.

:param dispatcher: The event dispatcher.
:param config: Config settings for exchange.
"""
def __init__(
self, dispatcher, config
):
self.dispatcher = dispatcher
self.config = config
self.api_cli = RestClient(self.config)
self.cli: Optional[Any] = None # external libs as ccxt
self.ws_cli = WSClient(config)
self._cached_pairs: Dict[Pair] = {}

def subscribe_to_ticker_events(
self, pair: Pair, interval: str, event_handler
):
"""Registers a callable that will be called every ticker.

:param bar_duration: The bar duration. One of 1s, 1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h, 1d, 3d, 1w, 1M.
:param pair: The trading pair.
:param event_handler: A callable that receives an TickerEvent.
"""

event_source = TickersEventSource(pair, interval, self.ws_cli)
channel = "ticker"

self._subscribe_to_ws_channel_events(
channel,
event_handler,
event_source
)

def subscribe_to_bar_events(
self, pair: Pair, event_handler, interval
):
"""Registers a callable that will be called every bar.

:param pair: The trading pair.
:param event_handler: A callable that receives an BarEvent.
"""
event_source = KLinesEventSource(pair, self.ws_cli)
channel = event_source.ws_channel(interval)

self._subscribe_to_ws_channel_events(
channel,
event_handler,
event_source
)

def get_pair_info(self, pair: Pair) -> PairInfo:
"""Returns information about a trading pair.

:param pair: The trading pair.
"""
ret = self._cached_pairs.get(pair)
api_path = '/'.join(['products', pair])
if not ret:
pair_info = self.api_cli.call(method='GET', apipath=api_path)
self._cached_pairs[pair] = PairInfo(Decimal(pair_info['base_increment']),
Decimal(pair_info['quote_increment']))
return self._cached_pairs

def get_data_df(self, event_source):
data_source = self.ws_cli.event_sources[event_source]
return list(data_source.events)

def _subscribe_to_ws_channel_events(
self, channel: str, event_handler, event_source
):
# Set the event source for the channel.
self.ws_cli.set_channel_event_source(channel, event_source)

# Subscribe the event handler to the event source.
self.dispatcher.subscribe(event_source, event_handler)
4 changes: 4 additions & 0 deletions livetrading/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from configloader import ConfigLoader

config = ConfigLoader()
config.update_from_json_file('path_to_json_file')
17 changes: 17 additions & 0 deletions livetrading/converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import pandas as pd

DEFAULT_DATAFRAME_COLUMNS = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']

def ohlcv_to_dataframe(historical_data: list) -> pd.DataFrame:
"""
Converts historical data to a Dataframe
:param historical_data: list with candle (OHLCV) data
:return: DataFrame
"""
df = pd.DataFrame(
[{fn: getattr(f, fn) for fn in DEFAULT_DATAFRAME_COLUMNS} for f in historical_data]
)
df['Date'] = pd.to_datetime(df['Date'], unit='ms', utc=True, )
df = df.set_index('Date')
df = df.sort_index(ascending=True)
return df.head()
5 changes: 5 additions & 0 deletions livetrading/env
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"ws_url": "wss://ws-feed.exchange.coinbase.com",
"api_url": "https://api.exchange.coinbase.com/",
"ws_timeout": 5
}
236 changes: 236 additions & 0 deletions livetrading/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
import abc
import dataclasses
import datetime

from collections import deque
from dateutil.parser import isoparse
from typing import Optional


intervals = {
"1s": 1,
"1m": 60,
"3m": 3 * 60,
"5m": 5 * 60,
"15m": 15 * 60,
"30m": 30 * 60,
"1h": 3600,
"2h": 2 * 3600,
"4h": 4 * 3600,
"6h": 6 * 3600,
"8h": 8 * 3600,
"12h": 12 * 3600,
"1d": 86400,
"3d": 3 * 86400,
"1w": 7 * 86400,
"1M": 31 * 86400
}


@dataclasses.dataclass
class Bar:
"""A Bar, aka candlestick, is the summary of the trading activity in a given period.

:param date: The beginning of the period. It must have timezone information set.
:param pair: The trading pair.
:param open: The opening price.
:param high: The highest traded price.
:param low: The lowest traded price.
:param close: The closing price.
:param volume: The volume traded.
"""
date: datetime
pair: str
Open: float
High: float
Low: float
Close: float
Volume: float


@dataclasses.dataclass
class Pair:
"""A trading pair.

:param base_symbol: The base symbol.
:param quote_symbol: The quote symbol.
"""
base_symbol: str
quote_symbol: str

def __str__(self):
# change format here to reflect corresponding exchange
return "{}-{}".format(self.base_symbol, self.quote_symbol)


@dataclasses.dataclass
class PairInfo:
"""Information about a trading pair.

:param base_increment: The increment for the base symbol.
:param quote_increment: The increment for the quote symbol.
"""
base_increment: float
quote_increment: float


class Ticker:
"""A Ticker constantly updating stream of information about a stock.
:param datetime: The beginning of the period. It must have timezone information set.
:param pair: The trading pair.
:param open: The opening price.
:param high: The highest traded price.
:param low: The lowest traded price.
:param price: The price.
:param volume: The volume traded.
"""
def __init__(self, pair: Pair, json: dict):
self.pair: Pair = pair
self.json: dict = json
self.Date = isoparse(json['time'])
self.Volume = float(json["volume_24h"])
self.Open = float(json["open_24h"])
self.High = float(json["high_24h"])
self.Low = float(json["low_24h"])
self.Close = float(json["price"])


class KlineBar(Bar):
"""
K-line, aka candlestick, is a chart marked with the opening price, closing price,
highest price, and lowest price to reflect price changes.
:param pair: The trading pair.
:param json: Message json.
"""
def __init__(self, pair: Pair, json: dict):
super().__init__(
datetime.utcfromtimestamp(
int(json["t"] / 1e3).replace(tzinfo=datetime.timezone.utc)),
pair, float(json["o"]), float(json["h"]),
float(json["l"]), float(json["c"]), float(json["v"])
)
self.pair: Pair = pair
self.json: dict = json


class EventProducer:
"""Base class for event producers.
.. note::

Main method is for main functions that should be performed for an event producer.
Finalize method is called on error or stop.
"""
def main(self):
"""Override to run the loop that produces events."""
pass

def finalize(self):
"""Override to perform task and transaction cancellation."""
pass


class Event:
"""Base class for events.

:param when: The datetime when the event occurred.
Used to calculate the datetime for the next event.
It must have timezone information set.
"""

def __init__(self, when: datetime.datetime):
self.when: datetime.datetime = when


class EventSource(metaclass=abc.ABCMeta):
"""Base class for events storage.

:param producer: EventProducer.
"""

def __init__(self, producer: Optional[EventProducer] = None):
self.producer = producer
self.events = deque()


class ChannelEventSource(EventSource):
"""Base class for websockets channels.

:param producer: EventProducer.
"""
def __init__(self, producer: EventProducer):
super().__init__(producer=producer)

@abc.abstractmethod
def push_to_queue(self, message: dict):
raise NotImplementedError()


class TickersEventSource(ChannelEventSource):
"""An event source for :class:`Ticker` instances.

:param pair: The trading pair.
"""
def __init__(self, pair: Pair, when: datetime, producer: EventProducer):
super().__init__(producer=producer)
self.pair: Pair = pair
self.when = intervals.get(when)

def push_to_queue(self, message: dict):
timestamp = message["time"]
dt = isoparse(timestamp) + datetime.timedelta(seconds=self.when)
self.events.append(TickerEvent(
dt,
Ticker(self.pair, message)))


class KLinesEventSource(EventSource):
"""An event source for :class:`KLineBar` instances.

:param pair: The trading pair..
"""
def __init__(self, pair: Pair, producer: EventProducer):
super().__init__(producer=producer)
self.pair: Pair = pair

def push_to_queue(self, message: dict):
kline_event = message["data"]
kline = kline_event["k"]
# Wait for the last update to the kline.
if kline["x"] is False:
return
self.events.append(BarEvent(
datetime.utcfromtimestamp(
int(kline_event["E"] / 1e3).replace(tzinfo=datetime.timezone.utc)),
KlineBar(self.pair, kline)))

def ws_channel(self, interval: str) -> str:
"""
Generate websocket channel
"""
return "{}@kline_{}".format(
"{}{}".format(self.pair.base_symbol.upper(), self.pair.quote_symbol.upper()).lower(),
interval)


class BarEvent(Event):
"""An event for :class:`Bar` instances.

:param when: The datetime when the event occurred. It must have timezone information set.
:param bar: The bar.
"""
def __init__(self, when, bar: Bar):
super().__init__(when)

self.data = bar


class TickerEvent(Event):
"""An event for :class:`Ticker` instances.

:param when: The datetime when the event occurred. It must have timezone information set.
:param ticker: The Ticker.
"""
def __init__(self, when, ticker: Ticker):
super().__init__(when)

self.data = ticker
Loading