Skip to content

Commit

Permalink
fix get historical
Browse files Browse the repository at this point in the history
  • Loading branch information
sam-kleiner committed Jun 17, 2018
1 parent 7103210 commit 171368c
Showing 1 changed file with 52 additions and 42 deletions.
94 changes: 52 additions & 42 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,45 @@
# Module-level logger used by every function in this importer.
logger = Logger()
logger.info("Starting binance db...")

# NOTE(review): the connection code below duplicates the body of connect_db()
# further down — it appears to be pre-commit residue from the diff view of
# commit 171368c ("fix get historical"), which moved this logic into
# connect_db(). Confirm against the post-commit file before relying on it
# executing at import time.
pgurl = os.environ['BDB_POSTGRES_URL']
pgport = os.environ['BDB_POSTGRES_PORT']
pguser = os.environ['BDB_POSTGRES_USER']
pgpass = os.environ['BDB_POSTGRES_PASS']
# Database name is taken to be the same as the user name — TODO confirm.
conn = f'postgresql://{pguser}:{pgpass}@{pgurl}:{pgport}/{pguser}'

# let pg start up. first run can take longer than others
logger.info("Waiting for Postgres...")
pg_try = 0
while True:
    time.sleep(5)
    try:
        bdb = BinanceDB(conn)
        break
    except:
        pg_try += 1
        if pg_try > 5:
            logger.error("Unable to connect to postgres")
            exit(1)

db = bdb.get_session()

# Binance REST client (empty keys: public-data endpoints only) and the
# websocket manager built on top of it.
client = Client(api_key='', api_secret='')
bm = BinanceSocketManager(client)

# Trading pair to mirror, taken from the environment; candle interval is
# fixed at one minute.
PAIR = os.environ['PAIR']
INTERVAL = '1m'

# Shared state between the websocket callback and the historical loader:
# init flips to True once the backfill finishes; until then closed candles
# arriving over the websocket are buffered in init_candles.
init = False
init_candles = []

def main():
    """Connect to Postgres, build the Binance clients, then start the
    websocket stream and backfill historical candles.

    The websocket is started before the (potentially long) backfill so no
    closed candle is missed while history is imported.
    """
    # Removed stale zero-argument calls to start_ws()/load_historical()
    # (pre-commit residue): both now require explicit dependencies.
    db = connect_db()
    client = Client(api_key='', api_secret='')
    bm = BinanceSocketManager(client)

    start_ws(db, bm)
    load_historical(db, client)
    logger.info("Binance DB locked and loaded!")

def connect_db():
    """Build the Postgres URL from the environment and return a DB session.

    Retries the connection (5 s between attempts, at most 6 attempts) to
    give the Postgres container time to start; terminates the process if
    it never comes up.

    Returns:
        A session object from BinanceDB.get_session().

    Raises:
        SystemExit: when all connection attempts fail.
        KeyError: when a required BDB_POSTGRES_* variable is unset.
    """
    pgurl = os.environ['BDB_POSTGRES_URL']
    pgport = os.environ['BDB_POSTGRES_PORT']
    pguser = os.environ['BDB_POSTGRES_USER']
    pgpass = os.environ['BDB_POSTGRES_PASS']
    # NOTE(review): database name assumed equal to the user name — confirm.
    conn = f'postgresql://{pguser}:{pgpass}@{pgurl}:{pgport}/{pguser}'

    # let pg start up. first run can take longer than others
    logger.info("Waiting for Postgres...")
    pg_try = 0
    while True:
        time.sleep(5)
        try:
            bdb = BinanceDB(conn)
            break
        # Was a bare `except:`, which also swallowed KeyboardInterrupt and
        # SystemExit; Exception keeps the retry semantics for real errors.
        except Exception:
            pg_try += 1
            if pg_try > 5:
                logger.error("Unable to connect to postgres")
                # exit() depends on the `site` module; SystemExit is
                # always available and behaves identically here.
                raise SystemExit(1)

    return bdb.get_session()

def start_ws(db, bm):
logger.info("Starting Binance WS...")
pws = lambda x: process_ws(x, db)
bm.start_kline_socket(PAIR, pws, interval=INTERVAL)
Expand All @@ -69,40 +71,48 @@ def process_ws(msg, db):
else:
init_candles.append(candle)

def load_historical(db, client):
    """Backfill closed candles for PAIR from Binance into the database.

    Resumes from the newest candle already stored (or fetches all history
    when the table is empty), inserts in batches of 1000 with a commit per
    batch, then persists any candles the websocket buffered while the
    backfill was running and flips the module-level `init` flag so the
    websocket callback switches to direct inserts.

    Args:
        db: active DB session (from connect_db()).
        client: Binance REST client used for the klines fetch.
    """
    # Removed the interleaved pre-commit lines (duplicate zero-argument
    # get_newest_in_db() call, '1m' literal instead of INTERVAL, and the
    # unbatched candles-list accumulation) left over from the diff view.
    logger.info("Getting historical data...")

    # if db already has data start there
    newest_candle = get_newest_in_db(db)
    if newest_candle is not None:  # was `!= None`
        starttime = str(newest_candle.close_time)
    else:
        starttime = '100 years ago UTC'
        logger.info("No data in DB. Getting all history...")
        logger.info("This could take a while...")

    klines = client.get_historical_klines(PAIR, INTERVAL, starttime)
    logger.info("Data retrieved, adding to db...")

    # last kline not closed will get from ws
    klines = klines[:-1]

    # Commit every 1000 rows so very long imports make durable progress.
    for b in batch(klines, 1000):
        for kline in b:
            candle = Candle(PAIR, kline)

            # long running imports can cause overlap
            if candle not in init_candles:
                db.add(candle)
        db.commit()

    # NOTE(review): candles arriving between this flag flip and the
    # add_all below could race with the websocket callback — confirm the
    # callback thread's behavior around `init`.
    global init
    init = True
    db.add_all(init_candles)
    db.commit()
    logger.info("Historical data loaded...")

def get_newest_in_db(db):
    """Return the most recently closed Candle stored for PAIR, or None
    when the table has no rows for the pair.

    Args:
        db: active DB session to query.
    """
    newest = db.query(Candle).filter_by(pair=PAIR).order_by(Candle.close_time.desc()).first()
    # Fixed log-message typo: "recrent" -> "recent".
    logger.info(f'Most recent candle on start: {newest}')
    return newest

def batch(iterable, n=1):
    """Yield successive slices of *iterable*, each of length at most *n*.

    The final slice is shorter when len(iterable) is not a multiple of n.
    Like the original, raises ValueError (from range) when n == 0.
    """
    size = len(iterable)
    # Renamed `l` (PEP 8 E741 ambiguous name) and dropped the redundant
    # min(): slicing already clamps at the end of the sequence.
    for start in range(0, size, n):
        yield iterable[start:start + n]

# Run the importer only when executed directly as a script, not on import.
if __name__ == '__main__':
    main()

0 comments on commit 171368c

Please sign in to comment.