diff --git a/main.py b/main.py index 800db0d..83f5fff 100644 --- a/main.py +++ b/main.py @@ -11,31 +11,6 @@ logger = Logger() logger.info("Starting binance db...") -pgurl = os.environ['BDB_POSTGRES_URL'] -pgport = os.environ['BDB_POSTGRES_PORT'] -pguser = os.environ['BDB_POSTGRES_USER'] -pgpass = os.environ['BDB_POSTGRES_PASS'] -conn = f'postgresql://{pguser}:{pgpass}@{pgurl}:{pgport}/{pguser}' - -# let pg start up. first run can take longer than others -logger.info("Waiting for Postgres...") -pg_try = 0 -while True: - time.sleep(5) - try: - bdb = BinanceDB(conn) - break - except: - pg_try += 1 - if pg_try > 5: - logger.error("Unable to connect to postgres") - exit(1) - -db = bdb.get_session() - -client = Client(api_key='', api_secret='') -bm = BinanceSocketManager(client) - PAIR = os.environ['PAIR'] INTERVAL = '1m' @@ -43,11 +18,38 @@ init_candles = [] def main(): - start_ws() - load_historical() + db = connect_db() + client = Client(api_key='', api_secret='') + bm = BinanceSocketManager(client) + + start_ws(db, bm) + load_historical(db, client) logger.info("Binance DB locked and loaded!") -def start_ws(): +def connect_db(): + pgurl = os.environ['BDB_POSTGRES_URL'] + pgport = os.environ['BDB_POSTGRES_PORT'] + pguser = os.environ['BDB_POSTGRES_USER'] + pgpass = os.environ['BDB_POSTGRES_PASS'] + conn = f'postgresql://{pguser}:{pgpass}@{pgurl}:{pgport}/{pguser}' + + # let pg start up. first run can take longer than others + logger.info("Waiting for Postgres...") + pg_try = 0 + while True: + time.sleep(5) + try: + bdb = BinanceDB(conn) + break + except: + pg_try += 1 + if pg_try > 5: + logger.error("Unable to connect to postgres") + exit(1) + + return bdb.get_session() + +def start_ws(db, bm): logger.info("Starting Binance WS...") pws = lambda x: process_ws(x, db) bm.start_kline_socket(PAIR, pws, interval=INTERVAL) @@ -69,40 +71,48 @@ def process_ws(msg, db): else: init_candles.append(candle) -def load_historical(): +def load_historical(db, client): logger.info("Getting historical data...") # if db already has data start there - newest_candle = get_newest_in_db() - if newest_candle == None: - starttime = '100 years ago UTC' - else: + newest_candle = get_newest_in_db(db) + if newest_candle != None: starttime = str(newest_candle.close_time) + else: + starttime = '100 years ago UTC' + logger.info("No data in DB. Getting all history...") + logger.info("This could take a while...") - klines = client.get_historical_klines(PAIR, '1m', starttime) + klines = client.get_historical_klines(PAIR, INTERVAL, starttime) + logger.info("Data retrieved, adding to db...") # last kline not closed will get from ws klines = klines[:-1] - candles = [] - for kline in klines: - candle = Candle(PAIR, kline) + for b in batch(klines, 1000): + for kline in b: + candle = Candle(PAIR, kline) - # long running imports can cause overlap - if candle not in init_candles: - candles.append(candle) + # long running imports can cause overlap + if candle not in init_candles: + db.add(candle) + db.commit() global init init = True - db.add_all(candles) db.add_all(init_candles) db.commit() logger.info("Historical data loaded...") -def get_newest_in_db(): +def get_newest_in_db(db): newest = db.query(Candle).filter_by(pair=PAIR).order_by(Candle.close_time.desc()).first() logger.info(f'Most recrent candle on start: {newest}') return newest +def batch(iterable, n=1): + l = len(iterable) + for ndx in range(0, l, n): + yield iterable[ndx:min(ndx + n, l)] + if __name__ == '__main__': main()