From 71032104c9fdbfcf03855f0ac7da160137f21d63 Mon Sep 17 00:00:00 2001 From: Sam Kleiner Date: Sat, 16 Jun 2018 18:37:44 -0400 Subject: [PATCH] configure postgres --- .dockerignore | 1 + README.md | 2 +- docker-compose.yml | 23 +++++++++----- main.py | 74 ++++++++++++++++++++++++++++++++++++++++------ setup.py | 2 +- 5 files changed, 84 insertions(+), 18 deletions(-) create mode 100644 .dockerignore diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..ed18ea5 --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +pgdata/* \ No newline at end of file diff --git a/README.md b/README.md index 04a3d6b..b4ea2b1 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Binance DB -A local cache for binance data, stored in Postgress +A local cache for binance data, stored in Postgres ## Setup diff --git a/docker-compose.yml b/docker-compose.yml index 7a3be13..676be51 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,14 +4,23 @@ services: image: stoicperlman/binance-db build: context: . - restart: always + restart: on-failure:3 environment: - BDB_POSTGRESS_URL: postgress - BDB_POSTGRESS_USER: binancedb - BDB_POSTGRESS_PASS: ${BDB_POSTGRESS_PASS} - posgress: + PAIR: BTCUSDT + BDB_POSTGRES_URL: postgres + BDB_POSTGRES_PORT: 5432 + BDB_POSTGRES_DB: binancedb + BDB_POSTGRES_USER: binancedb + BDB_POSTGRES_PASS: ${BDB_POSTGRES_PASS} + postgres: image: postgres:alpine restart: always + ports: + - 5432:5432 environment: - POSTGRESS_USER: binancedb - POSTGRESS_PASS: ${BDB_POSTGRESS_PASS} \ No newline at end of file + POSTGRES_DB: binancedb + POSTGRES_USER: binancedb + POSTGRES_PASSWORD: ${BDB_POSTGRES_PASS} + PGDATA: /var/lib/postgresql/data/pgdata + volumes: + - ./pgdata:/var/lib/postgresql/data \ No newline at end of file diff --git a/main.py b/main.py index b6a67aa..800db0d 100644 --- a/main.py +++ b/main.py @@ -1,3 +1,5 @@ +import os +import time from binance_db.db import BinanceDB from binance_db.util.logger import Logger from binance_db.candle import Candle, WSCandle @@ -7,21 +9,49 @@ from binance.websockets import BinanceSocketManager logger = Logger() +logger.info("Starting binance db...") + +pgurl = os.environ['BDB_POSTGRES_URL'] +pgport = os.environ['BDB_POSTGRES_PORT'] +pguser = os.environ['BDB_POSTGRES_USER'] +pgpass = os.environ['BDB_POSTGRES_PASS'] +conn = f'postgresql://{pguser}:{pgpass}@{pgurl}:{pgport}/{pguser}' + +# let pg start up. first run can take longer than others +logger.info("Waiting for Postgres...") +pg_try = 0 +while True: + time.sleep(5) + try: + bdb = BinanceDB(conn) + break + except: + pg_try += 1 + if pg_try > 5: + logger.error("Unable to connect to postgres") + exit(1) -bdb = BinanceDB('sqlite:///:memory:', echo=True) db = bdb.get_session() client = Client(api_key='', api_secret='') bm = BinanceSocketManager(client) -PAIR = 'BTCUSDT' +PAIR = os.environ['PAIR'] INTERVAL = '1m' +init = False +init_candles = [] + def main(): + start_ws() + load_historical() + logger.info("Binance DB locked and loaded!") + +def start_ws(): + logger.info("Starting Binance WS...") pws = lambda x: process_ws(x, db) bm.start_kline_socket(PAIR, pws, interval=INTERVAL) bm.start() - load_historical() def process_ws(msg, db): if msg[ws.EVENT_TYPE] == ws.ERROR_EVENT: @@ -31,22 +61,48 @@ def process_ws(msg, db): candle = WSCandle(msg) if candle.closed: - logger.debug(f'New candle: {candle}') - db.add(candle) - db.commit() + global init + logger.info(f'New candle: {candle}') + if init: + db.add(candle) + db.commit() + else: + init_candles.append(candle) def load_historical(): - klines = client.get_historical_klines('BTCUSDT', '1m', '10 minutes ago UTC') + logger.info("Getting historical data...") + + # if db already has data start there + newest_candle = get_newest_in_db() + if newest_candle == None: + starttime = '100 years ago UTC' + else: + starttime = str(newest_candle.close_time) + + klines = client.get_historical_klines(PAIR, '1m', starttime) - # last isn't closed and will be added by ws + # last kline not closed will get from ws klines = klines[:-1] candles = [] for kline in klines: - candles.append(Candle(PAIR, kline)) + candle = Candle(PAIR, kline) + # long running imports can cause overlap + if candle not in init_candles: + candles.append(candle) + + global init + init = True db.add_all(candles) + db.add_all(init_candles) db.commit() + logger.info("Historical data loaded...") + +def get_newest_in_db(): + newest = db.query(Candle).filter_by(pair=PAIR).order_by(Candle.close_time.desc()).first() + logger.info(f'Most recrent candle on start: {newest}') + return newest if __name__ == '__main__': main() diff --git a/setup.py b/setup.py index e2a21ec..97c71c9 100644 --- a/setup.py +++ b/setup.py @@ -25,5 +25,5 @@ ], keywords='binance data cache', packages=['binance_db'], - install_requires=['sqlalchemy', 'python-binance'] + install_requires=['sqlalchemy', 'psycopg2-binary', 'python-binance'] )