Skip to content

Commit

Permalink
Add snapshot interval to book backends
Browse files Browse the repository at this point in the history
  • Loading branch information
bmoscon committed Sep 22, 2021
1 parent 9cadf8c commit 7a716ce
Show file tree
Hide file tree
Showing 13 changed files with 54 additions and 15 deletions.
4 changes: 3 additions & 1 deletion cryptofeed/backends/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ async def __call__(self, dtype, receipt_timestamp: float):

class BackendBookCallback:
async def __call__(self, book, receipt_timestamp: float):
if self.snapshots_only:
if self.snapshots_only or self.snapshot_interval == self.snapshot_count[book.symbol]:
data = book.to_dict(as_type=self.numeric_type)
del data['delta']
data['receipt_timestamp'] = receipt_timestamp
await self.write(data)
self.snapshot_count[book.symbol] = 0
else:
data = book.to_dict(delta=book.delta is not None, as_type=self.numeric_type)
data['receipt_timestamp'] = receipt_timestamp
if book.delta is None:
del data['delta']
await self.write(data)
self.snapshot_count[book.symbol] += 1
5 changes: 4 additions & 1 deletion cryptofeed/backends/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
from collections import defaultdict
import itertools
import logging
from datetime import datetime as dt
Expand Down Expand Up @@ -52,8 +53,10 @@ class FundingElastic(ElasticCallback, BackendCallback):
class BookElastic(ElasticCallback, BackendBookCallback):
default_index = 'book'

def __init__(self, *args, index='book', snapshots_only=False, **kwargs):
def __init__(self, *args, index='book', snapshots_only=False, snapshot_interval=1000, **kwargs):
self.snapshots_only = snapshots_only
self.snapshot_interval = snapshot_interval
self.snapshot_count = defaultdict(int)
super().__init__(*args, index=index, **kwargs)
self.addr = f"{self.addr}/_bulk"

Expand Down
5 changes: 4 additions & 1 deletion cryptofeed/backends/gcppubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
from collections import defaultdict
import os
import io
from typing import Optional
Expand Down Expand Up @@ -108,8 +109,10 @@ class FundingGCPPubSub(GCPPubSubCallback, BackendCallback):
class BookGCPPubSub(GCPPubSubCallback, BackendBookCallback):
default_key = 'book'

def __init__(self, *args, snapshots_only=False, **kwargs):
def __init__(self, *args, snapshots_only=False, snapshot_interval=1000, **kwargs):
self.snapshots_only = snapshots_only
self.snapshot_interval = snapshot_interval
self.snapshot_count = defaultdict(int)
super().__init__(*args, **kwargs)


Expand Down
5 changes: 4 additions & 1 deletion cryptofeed/backends/influxdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
from collections import defaultdict
import logging

from yapic import json
Expand Down Expand Up @@ -92,8 +93,10 @@ class FundingInflux(InfluxCallback, BackendCallback):
class BookInflux(InfluxCallback, BackendBookCallback):
default_key = 'book'

def __init__(self, *args, snapshots_only=False, **kwargs):
def __init__(self, *args, snapshots_only=False, snapshot_interval=1000, **kwargs):
self.snapshots_only = snapshots_only
self.snapshot_interval = snapshot_interval
self.snapshot_count = defaultdict(int)
super().__init__(*args, **kwargs)

def format(self, data):
Expand Down
5 changes: 4 additions & 1 deletion cryptofeed/backends/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
from collections import defaultdict
import asyncio

from aiokafka import AIOKafkaProducer
Expand Down Expand Up @@ -46,8 +47,10 @@ class FundingKafka(KafkaCallback, BackendCallback):
class BookKafka(KafkaCallback, BackendBookCallback):
default_key = 'book'

def __init__(self, *args, snapshots_only=False, **kwargs):
def __init__(self, *args, snapshots_only=False, snapshot_interval=1000, **kwargs):
self.snapshots_only = snapshots_only
self.snapshot_interval = snapshot_interval
self.snapshot_count = defaultdict(int)
super().__init__(*args, **kwargs)


Expand Down
7 changes: 5 additions & 2 deletions cryptofeed/backends/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
from collections import defaultdict

import bson
import motor.motor_asyncio


from cryptofeed.backends.backend import BackendBookCallback, BackendCallback


Expand Down Expand Up @@ -37,8 +38,10 @@ class FundingMongo(MongoCallback, BackendCallback):
class BookMongo(MongoCallback, BackendBookCallback):
default_key = 'book'

def __init__(self, *args, snapshots_only=False, **kwargs):
def __init__(self, *args, snapshots_only=False, snapshot_interval=1000, **kwargs):
self.snapshots_only = snapshots_only
self.snapshot_interval = snapshot_interval
self.snapshot_count = defaultdict(int)
super().__init__(*args, **kwargs)


Expand Down
5 changes: 4 additions & 1 deletion cryptofeed/backends/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
from collections import defaultdict
from datetime import datetime as dt
from typing import Tuple

Expand Down Expand Up @@ -145,8 +146,10 @@ class LiquidationsPostgres(PostgresCallback, BackendCallback):
class BookPostgres(PostgresCallback, BackendBookCallback):
default_table = 'book'

def __init__(self, *args, snapshots_only=False, **kwargs):
def __init__(self, *args, snapshots_only=False, snapshot_interval=1000, **kwargs):
self.snapshots_only = snapshots_only
self.snapshot_interval = snapshot_interval
self.snapshot_count = defaultdict(int)
super().__init__(*args, **kwargs)


Expand Down
5 changes: 4 additions & 1 deletion cryptofeed/backends/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
associated with this software.
'''
import asyncio
from collections import defaultdict

import aio_pika
from yapic import json
Expand Down Expand Up @@ -78,8 +79,10 @@ class FundingRabbit(RabbitCallback, BackendCallback):


class BookRabbit(RabbitCallback, BackendBookCallback):
def __init__(self, *args, snapshots_only=False, **kwargs):
def __init__(self, *args, snapshots_only=False, snapshot_interval=1000, **kwargs):
self.snapshots_only = snapshots_only
self.snapshot_interval = snapshot_interval
self.snapshot_count = defaultdict(int)
super().__init__(*args, **kwargs)


Expand Down
10 changes: 8 additions & 2 deletions cryptofeed/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
from collections import defaultdict
import asyncio

import aioredis
from yapic import json

Expand Down Expand Up @@ -133,8 +135,10 @@ class FundingStream(RedisStreamCallback, BackendCallback):
class BookRedis(RedisZSetCallback, BackendBookCallback):
default_key = 'book'

def __init__(self, *args, snapshots_only=False, score_key='receipt_timestamp', **kwargs):
def __init__(self, *args, snapshots_only=False, snapshot_interval=1000, score_key='receipt_timestamp', **kwargs):
self.snapshots_only = snapshots_only
self.snapshot_interval = snapshot_interval
self.snapshot_count = defaultdict(int)
super().__init__(*args, score_key=score_key, **kwargs)

async def write(self, data: dict):
Expand All @@ -149,8 +153,10 @@ async def write(self, data: dict):
class BookStream(RedisStreamCallback, BackendBookCallback):
default_key = 'book'

def __init__(self, *args, snapshots_only=False, **kwargs):
def __init__(self, *args, snapshots_only=False, snapshot_interval=1000, **kwargs):
self.snapshots_only = snapshots_only
self.snapshot_interval = snapshot_interval
self.snapshot_count = defaultdict(int)
super().__init__(*args, **kwargs)

async def write(self, data: dict):
Expand Down
5 changes: 4 additions & 1 deletion cryptofeed/backends/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
from collections import defaultdict
import asyncio
import logging
from textwrap import wrap
Expand Down Expand Up @@ -111,8 +112,10 @@ class FundingSocket(SocketCallback, BackendCallback):
class BookSocket(SocketCallback, BackendBookCallback):
default_key = 'book'

def __init__(self, *args, snapshots_only=False, **kwargs):
def __init__(self, *args, snapshots_only=False, snapshot_interval=1000, **kwargs):
self.snapshots_only = snapshots_only
self.snapshot_interval = snapshot_interval
self.snapshot_count = defaultdict(int)
super().__init__(*args, **kwargs)


Expand Down
5 changes: 4 additions & 1 deletion cryptofeed/backends/victoriametrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
from collections import defaultdict
import logging

from cryptofeed.backends.backend import BackendBookCallback, BackendCallback
Expand Down Expand Up @@ -109,8 +110,10 @@ class FundingVictoriaMetrics(VictoriaMetricsCallback, BackendCallback):


class BookVictoriaMetrics(VictoriaMetricsBookCallback, BackendBookCallback):
def __init__(self, *args, snapshots_only=False, **kwargs):
def __init__(self, *args, snapshots_only=False, snapshot_interval=1000, **kwargs):
self.snapshots_only = snapshots_only
self.snapshot_interval = snapshot_interval
self.snapshot_count = defaultdict(int)
super().__init__(*args, **kwargs)

async def write(self, data):
Expand Down
6 changes: 5 additions & 1 deletion cryptofeed/backends/zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
from collections import defaultdict

import zmq
import zmq.asyncio
from yapic import json
Expand Down Expand Up @@ -48,8 +50,10 @@ class FundingZMQ(ZMQCallback, BackendCallback):
class BookZMQ(ZMQCallback, BackendBookCallback):
default_key = 'book'

def __init__(self, *args, snapshots_only=False, **kwargs):
def __init__(self, *args, snapshots_only=False, snapshot_interval=1000, **kwargs):
self.snapshots_only = snapshots_only
self.snapshot_interval = snapshot_interval
self.snapshot_count = defaultdict(int)
super().__init__(*args, **kwargs)


Expand Down
2 changes: 1 addition & 1 deletion examples/demo_zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def main():
p.start()

f = FeedHandler()
f.add_feed(Kraken(max_depth=2, channels=[L2_BOOK], symbols=['ETH-USD'], callbacks={L2_BOOK: BookZMQ(snapshots_only=True, port=5678)}))
f.add_feed(Kraken(max_depth=2, channels=[L2_BOOK], symbols=['ETH-USD', 'BTC-USD'], callbacks={L2_BOOK: BookZMQ(snapshots_only=False, snapshot_interval=2, port=5678)}))
f.add_feed(Coinbase(channels=[TICKER], symbols=['BTC-USD'], callbacks={TICKER: TickerZMQ(port=5678)}))

f.run()
Expand Down

0 comments on commit 7a716ce

Please sign in to comment.