diff --git a/.compose.env b/.compose.env deleted file mode 100644 index 2d81271..0000000 --- a/.compose.env +++ /dev/null @@ -1,75 +0,0 @@ -# When relayer is inside docker - -NGINX_AUTH_SECRET="FNiPGdfBuvqToL7PHu4EHzu7ehSr8+L5yV2kkxSutSo" -RUST_LOG=info -DATABASE_URL=postgres://relayer:relayer@database:5432/relayer -REDIS_HOSTNAME=redis://default:foobared@redis-server/0 -POSTGRESQL_URL = postgresql://postgres:my_password@postgresql-master:5432/my_database -BROKER=kafka:29092 -QUESTDB_URL = postgresql://quest:my_password@questdb:8812/qdb -QUESTDB_INFLUX_URL = questdb:9009 - -# When relayer is inside host machine -# REDIS_HOSTNAME=redis://default:foobared@localhost/0 -# POSTGRESQL_URL = postgresql://postgres:my_password@localhost:5432/my_database -# QUESTDB_URL = postgresql://quest:my_password@localhost:8812/qdb -# QUESTDB_INFLUX_URL = 127.0.0.1:9009 -# BROKER = localhost:9092 - -# bitmex socket connection url - -BITMEX_BTC_SOCKET_ORDERBOOK_URL=wss://ws.bitmex.com/realtime?subscribe=orderBookL2_25:XBTUSD -BITMEX_BTC_SOCKET_INSTRUMENT_URL=wss://ws.bitmex.com/realtime?subscribe=instrument - -# Binance BTC socket URL - -# https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-mini-ticker-stream - -# A single connection to stream.binance.com is only valid for 24 hours; expect to be disconnected at the 24 hour mark - -# The websocket server will send a ping frame every 3 minutes. If the websocket server does not receive a pong frame back from the connection within a 10 minute period, the connection will be disconnected. Unsolicited pong frames are allowed. - -BINANCE_BTC_SOCKET=wss://stream.binance.com:9443/ws/btcusdt@miniTicker - -# Binance BTC API URL - -BINANCE_BTC_API=https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT - -# coincap API 2.0 BTC Price Socket URL - -# https://docs.coincap.io/#37dcec0b-1f7b-4d98-b152-0217a6798058 - -COINCAP_BTC_SOCKET=wss://ws.coincap.io/prices?assets=bitcoin - - -RelayerVersion=1.000 -SnapshotVersion=1.001 - -# RPC loadbalance mode -# RPC_QUEUE_MODE = KAFKA -# RPC_QUEUE_MODE = AERON -RPC_QUEUE_MODE = DIRECT - -#RPC Thread Count -RPC_SERVER_THREAD = 15 -RPC_SERVER_SOCKETADDR = 0.0.0.0:3032 - - -KAFKA_STATUS = Enabled - -# kafka topics -# Topics should be already created before running the application -# kafkalib::kafka_topic::kafka_new_topic("BinanceMiniTickerPayload"); -# kafkalib::kafka_topic::kafka_new_topic("CLIENT-REQUEST"); -# kafkalib::kafka_topic::kafka_new_topic("TraderOrderEventLog1"); -# kafkalib::kafka_topic::kafka_new_topic("LendOrderEventLog1"); -# kafkalib::kafka_topic::kafka_new_topic("LendPoolEventLog1"); - -PRICE_LOG = BinanceMiniTickerPayload -RPC_CLIENT_REQUEST=CLIENT-REQUEST - -TRADERORDER_EVENT_LOG = CoreEventLogTopic -LENDORDER_EVENT_LOG = CoreEventLogTopic -LENDPOOL_EVENT_LOG = CoreEventLogTopic -CORE_EVENT_LOG = CoreEventLogTopic -SNAPSHOT_LOG = SnapShotLogTopic diff --git a/.env b/.env deleted file mode 100644 index 36637ef..0000000 --- a/.env +++ /dev/null @@ -1,74 +0,0 @@ -# When relayer is inside docker - -# DATABASE_URL=postgres://relayer:relayer@localhost:5433/relayer -# REDIS_HOSTNAME=redis://default:foobared@redis-server/0 -# POSTGRESQL_URL = postgresql://postgres:my_password@postgresql-master:5432/my_database -# BROKER=localhost:9092 -# QUESTDB_URL = postgresql://quest:my_password@questdb:8812/qdb -# QUESTDB_INFLUX_URL = questdb:9009 - -# When relayer is inside host machine -DATABASE_URL=postgres://relayer:relayer@localhost:5433/relayer -REDIS_HOSTNAME=redis://default:foobared@localhost/0 -POSTGRESQL_URL=postgresql://postgres:my_password@localhost:5432/my_database -QUESTDB_URL=postgresql://quest:my_password@localhost:8812/qdb -QUESTDB_INFLUX_URL=127.0.0.1:9009 -BROKER=localhost:9092 -DATABASE_URL=postgres://relayer:relayer@localhost:5433/relayer -# bitmex socket connection url - -BITMEX_BTC_SOCKET_ORDERBOOK_URL=wss://ws.bitmex.com/realtime?subscribe=orderBookL2_25:XBTUSD -BITMEX_BTC_SOCKET_INSTRUMENT_URL=wss://ws.bitmex.com/realtime?subscribe=instrument - -# Binance BTC socket URL - -# https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-mini-ticker-stream - -# A single connection to stream.binance.com is only valid for 24 hours; expect to be disconnected at the 24 hour mark - -# The websocket server will send a ping frame every 3 minutes. If the websocket server does not receive a pong frame back from the connection within a 10 minute period, the connection will be disconnected. Unsolicited pong frames are allowed. - -BINANCE_BTC_SOCKET=wss://stream.binance.com:9443/ws/btcusdt@miniTicker - -# Binance BTC API URL - -BINANCE_BTC_API=https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT - -# coincap API 2.0 BTC Price Socket URL - -# https://docs.coincap.io/#37dcec0b-1f7b-4d98-b152-0217a6798058 - -COINCAP_BTC_SOCKET=wss://ws.coincap.io/prices?assets=bitcoin - - -RelayerVersion=1.000 -SnapshotVersion=1.001 - -# RPC loadbalance mode -# RPC_QUEUE_MODE = KAFKA -# RPC_QUEUE_MODE = AERON -RPC_QUEUE_MODE = DIRECT - -#RPC Thread Count -RPC_SERVER_THREAD = 15 -RPC_SERVER_SOCKETADDR = 0.0.0.0:3032 - - -KAFKA_STATUS = Enabled - -# kafka topics -# Topics should be already created before running the application -# kafkalib::kafka_topic::kafka_new_topic("BinanceMiniTickerPayload"); -# kafkalib::kafka_topic::kafka_new_topic("CLIENT-REQUEST"); -# kafkalib::kafka_topic::kafka_new_topic("TraderOrderEventLog1"); -# kafkalib::kafka_topic::kafka_new_topic("LendOrderEventLog1"); -# kafkalib::kafka_topic::kafka_new_topic("LendPoolEventLog1"); - -PRICE_LOG = BinanceMiniTickerPayload -RPC_CLIENT_REQUEST = CLIENT-REQUEST - -TRADERORDER_EVENT_LOG = CoreEventLogTopic -LENDORDER_EVENT_LOG = CoreEventLogTopic -LENDPOOL_EVENT_LOG = CoreEventLogTopic -CORE_EVENT_LOG = CoreEventLogTopic -SNAPSHOT_LOG = SnapShotLogTopic diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..410613e --- /dev/null +++ b/.env.example @@ -0,0 +1,20 @@ +# Authentication and Logging Configuration +NGINX_AUTH_SECRET="FNiPGdfBuvqToL7PHu4EHzu7ehSr8+L5yV2kkxSutSo" # Secret key for NGINX authentication +RUST_LOG=info # Logging level for Rust applications + +# Database Connection URLs +DATABASE_URL= # Main PostgreSQL database connection +REDIS_HOSTNAME= # Redis cache connection +BROKER= # Kafka message broker connection + +# Event Logging Topics +PRICE_LOG=BinanceMiniTickerPayload # Topic for price updates +RPC_CLIENT_REQUEST=CLIENT-REQUEST # Topic for RPC client requests +RPC_CLIENT_FAILED_REQUEST=CLIENT-FAILED-REQUEST # Topic for failed RPC requests + +# Event Log Topics for Different Components +TRADERORDER_EVENT_LOG=CoreEventLogTopic # Topic for trader order events +LENDORDER_EVENT_LOG=CoreEventLogTopic # Topic for lending order events +LENDPOOL_EVENT_LOG=CoreEventLogTopic # Topic for lending pool events +CORE_EVENT_LOG=CoreEventLogTopic # Topic for core system events +SNAPSHOT_LOG=SnapShotLogTopic # Topic for snapshot events \ No newline at end of file diff --git a/.gitignore b/.gitignore index b8045f3..27c6cb4 100644 --- a/.gitignore +++ b/.gitignore @@ -28,4 +28,5 @@ node_modules #/target /Cargo.lock -.vscode \ No newline at end of file +.vscode +.env diff --git a/Cargo.toml b/Cargo.toml index 3e66589..3e32910 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,9 +11,9 @@ name = "archiver" [[bin]] name = "api" -# [lib] -# name = "relayerarchiverlib" -# path = "src/lib.rs" +[lib] +name = "relayerarchiverlib" +path = "src/lib.rs" [dependencies] @@ -67,14 +67,14 @@ tower-http = { version = "0.4", features = [ "validate-request", ] } tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } -twilight-relayer-rust = { git = "ssh://git@github.com/twilight-project/twilight-relayer.git", branch = "output-archiver-0.03.031" } - uuid = { version = "1.3.3", features = ["serde", "v4", "v7"] } verify-keplr-sign = "0.1.0" +twilight-relayer-rust = { git = "ssh://git@github.com/twilight-project/twilight-relayer.git", branch = "develop" } +redis = { version = "0.25.4", features = ["r2d2"] } [dependencies.zkos-relayer-wallet] -git = "ssh://github.com/twilight-project/zkos-relayer-wallet.git" +git = "ssh://git@github.com/twilight-project/zkos-relayer-wallet.git" branch = "develop" [dependencies.zkos-client-wallet] -git = "ssh://github.com/twilight-project/zkos-client-wallet.git" +git = "ssh://git@github.com/twilight-project/zkos-client-wallet.git" branch = "develop" diff --git a/Dockerfile b/Dockerfile index bcad935..b77f982 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,18 +1,22 @@ -FROM rust:1.72.0-slim-buster as builder +FROM rust:1.87 as builder ARG SSH_KEY RUN apt-get update && apt-get install -y openssh-client git libssl-dev build-essential libpq-dev pkg-config RUN mkdir /root/.ssh -RUN echo "${SSH_KEY}" > /root/.ssh/id_rsa && \ +RUN echo "${SSH_KEY}" > /root/.ssh/id_ed25519 && \ touch /root/.ssh/known_hosts && \ ssh-keyscan github.com >> /root/.ssh/known_hosts && \ - chmod 0600 /root/.ssh/id_rsa - + chmod 0600 /root/.ssh/id_ed25519 COPY . ./twilight-relayerAPI +WORKDIR /twilight-relayerAPI + +RUN git config --global url."ssh://git@github.com/".insteadOf https://github.com/ +RUN git config --global url."ssh://git@github.com/".insteadOf ssh://github.com/ -RUN --mount=type=cache,target=/usr/local/cargo/registry \ - --mount=type=cache,target=${PWD}/target \ - cd ./twilight-relayerAPI && \ +RUN eval `ssh-agent -s` && ssh-add /root/.ssh/id_ed25519 && \ + git config --global url."ssh://git@github.com/".insteadOf https://github.com/ && \ + git config --global url."ssh://git@github.com/".insteadOf ssh://github.com/ && \ + git config --global url."ssh://git@github.com/twilight-project/zkos-relayer-wallet.git".insteadOf https://github.com/twilight-project/zkos-relayer-wallet.git && \ cargo --config "net.git-fetch-with-cli = true" b --release --bins FROM debian:10-slim @@ -23,7 +27,7 @@ COPY --from=builder ./twilight-relayerAPI/target/release/api ./ COPY --from=builder ./twilight-relayerAPI/target/release/archiver ./ COPY --from=builder ./twilight-relayerAPI/target/release/auth ./ COPY ./scripts/run.sh ./ -COPY ./.compose.env .env +COPY ./.env ./.env ENTRYPOINT ["/app/run.sh"] diff --git a/README.md b/README.md index 1471432..b3581e5 100644 --- a/README.md +++ b/README.md @@ -3,12 +3,14 @@ For subscriptions and rpcs, see [here](./docs/API.md) ## install diesel_cli (Only necessary for development) + ```command sudo apt install libpq-dev cargo install diesel_cli --no-default-features --features postgres ``` ## running with docker-compose + Build the container, you will need an ssh-key with read-only access to the twilight-relayer repo for this: ```console @@ -34,7 +36,12 @@ Check the .env file for the DATABASE_URL environment variable, be sure it's set `cargo r --release --bin api` +## Run the auth server + +`cargo r --release --bin auth` + ## Testing + Tests are using uuid features that require additional compiler flags: ```command diff --git a/migrations/2024-03-26-213323_add_aggregation_tables/down.sql b/migrations/2024-03-26-213323_add_aggregation_tables/down.sql new file mode 100644 index 0000000..fc66c07 --- /dev/null +++ b/migrations/2024-03-26-213323_add_aggregation_tables/down.sql @@ -0,0 +1,16 @@ +-- This file should undo anything in `up.sql` +-- DROP INDEX trader_order_time; +-- DROP INDEX price_time; +DROP INDEX trader_order_uuid; + + +DROP TABLE candles_1min; +DROP TABLE candles_1hour; +DROP TABLE candles_1day; + +DROP FUNCTION get_ohlc_interval; +DROP FUNCTION get_volume_interval; +DROP FUNCTION get_candles_interval; +DROP FUNCTION update_candles_1min; +DROP FUNCTION update_candles_1hour; +DROP FUNCTION update_candles_1day; diff --git a/migrations/2024-03-26-213323_add_aggregation_tables/up.sql b/migrations/2024-03-26-213323_add_aggregation_tables/up.sql new file mode 100644 index 0000000..b6433e0 --- /dev/null +++ b/migrations/2024-03-26-213323_add_aggregation_tables/up.sql @@ -0,0 +1,179 @@ +-- Your SQL goes here + +-- CREATE INDEX trader_order_time ON trader_order(timestamp); +-- CREATE INDEX price_time ON btc_usd_price(timestamp); + +CREATE INDEX trader_order_uuid ON trader_order(uuid); + + +CREATE TABLE candles_1min ( + start_time timestamptz primary key, + end_time timestamptz not null, + low numeric not null, + high numeric not null, + open numeric not null, + close numeric not null, + trades integer not null, + btc_volume numeric not null, + usd_volume numeric not null +); + +CREATE TABLE candles_1hour ( + start_time timestamptz primary key, + end_time timestamptz not null, + low numeric not null, + high numeric not null, + open numeric not null, + close numeric not null, + trades integer not null, + btc_volume numeric not null, + usd_volume numeric not null +); + +CREATE TABLE candles_1day ( + start_time timestamptz primary key, + end_time timestamptz not null, + low numeric not null, + high numeric not null, + open numeric not null, + close numeric not null, + trades integer not null, + btc_volume numeric not null, + usd_volume numeric not null +); + +CREATE FUNCTION get_ohlc_interval(intvl interval, trunc_by text, since timestamptz) +RETURNS TABLE(start_time timestamptz, end_time timestamptz, high numeric, low numeric, open numeric, close numeric) +AS $$ SELECT DISTINCT + bucket as start_time, + bucket + intvl - interval '1 microsecond' as end_time, + max(price) over w as high, + min(price) over w as low, + first_value(price) over w as close, + last_value(price) over w as open +from ( + SELECT date_trunc(trunc_by, timestamp) as bucket,* + FROM btc_usd_price + WHERE timestamp BETWEEN since AND now() +) as t +WINDOW w as (partition by bucket order by timestamp asc ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) +$$ +LANGUAGE SQL; + + +CREATE FUNCTION get_volume_interval(intvl interval, trunc_by text, since timestamptz) +RETURNS TABLE(start_time timestamptz, end_time timestamptz, usd_volume numeric, btc_volume numeric, trades integer) +AS $$ SELECT DISTINCT + bucket as start_time, + bucket + intvl - interval '1 microsecond' as end_time, + sum(entryprice) over w as usd_volume, + sum(positionsize) over w as btc_volume, + count(*) over w as trades +FROM ( + select date_trunc(trunc_by, timestamp) as bucket,* + from trader_order + WHERE timestamp BETWEEN since AND now() +) as t +WINDOW w as (partition by bucket ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) +$$ +LANGUAGE SQL; + + +CREATE FUNCTION get_candles_interval(intvl interval, trunc_by text, since timestamptz) +RETURNS TABLE( + start_time timestamptz, + end_time timestamptz, + usd_volume numeric, + btc_volume numeric, + trades integer, + open numeric, + high numeric, + low numeric, + close numeric +) +AS $$ +WITH t1 AS ( + SELECT * FROM get_ohlc_interval(intvl, trunc_by, since) +), t2 AS ( + SELECT * FROM get_volume_interval(intvl, trunc_by, since) +) SELECT + coalesce(t1.start_time, t2.start_time) as start_time, + coalesce(t1.end_time, t2.end_time) as start_time, + coalesce(t2.usd_volume, 0) as usd_volume, + coalesce(t2.btc_volume, 0) as btc_volume, + coalesce(t2.trades, 0) as trades, + coalesce(t1.open, 0) as open, + coalesce(t1.high, 0) as high, + coalesce(t1.low, 0) as low, + coalesce(t1.close, 0) as close +FROM + t1 FULL OUTER JOIN t2 + ON t1.start_time = t2.start_time +$$ +LANGUAGE SQL; + +CREATE FUNCTION update_candles_1min() +RETURNS void +AS $$ INSERT INTO candles_1min ( + start_time, end_time, usd_volume, btc_volume, trades, open, high, low, close +) +SELECT * FROM get_candles_interval('1 minute', 'minute', (SELECT coalesce(max(start_time) - interval '10 minute', '1970-01-01 00:00:00.0000+00'::timestamptz) FROM candles_1min)) + ON CONFLICT(start_time) + DO UPDATE SET + start_time = excluded.start_time, + end_time = excluded.end_time, + usd_volume = excluded.usd_volume, + btc_volume = excluded.btc_volume, + trades = excluded.trades, + open = excluded.open, + high = excluded.high, + low = excluded.low, + close = excluded.close + ; + +$$ +LANGUAGE SQL; + +CREATE FUNCTION update_candles_1hour() +RETURNS void +AS $$ INSERT INTO candles_1hour ( + start_time, end_time, usd_volume, btc_volume, trades, open, high, low, close +) +SELECT * FROM get_candles_interval('1 hour', 'hour', (SELECT coalesce(max(start_time) - interval '10 minute', '1970-01-01 00:00:00.0000+00'::timestamptz) FROM candles_1hour)) + ON CONFLICT(start_time) + DO UPDATE SET + start_time = excluded.start_time, + end_time = excluded.end_time, + usd_volume = excluded.usd_volume, + btc_volume = excluded.btc_volume, + trades = excluded.trades, + open = excluded.open, + high = excluded.high, + low = excluded.low, + close = excluded.close + ; + +$$ +LANGUAGE SQL; + +CREATE FUNCTION update_candles_1day() +RETURNS void +AS $$ INSERT INTO candles_1day ( + start_time, end_time, usd_volume, btc_volume, trades, open, high, low, close +) +SELECT * FROM get_candles_interval('1 day', 'day', (SELECT coalesce(max(start_time) - interval '10 minute', '1970-01-01 00:00:00.0000+00'::timestamptz) FROM candles_1day)) + ON CONFLICT(start_time) + DO UPDATE SET + start_time = excluded.start_time, + end_time = excluded.end_time, + usd_volume = excluded.usd_volume, + btc_volume = excluded.btc_volume, + trades = excluded.trades, + open = excluded.open, + high = excluded.high, + low = excluded.low, + close = excluded.close + ; + +$$ +LANGUAGE SQL; diff --git a/migrations/2024-03-27-065655_alter_transaction_hash/down.sql b/migrations/2024-03-27-065655_alter_transaction_hash/down.sql new file mode 100644 index 0000000..80e0342 --- /dev/null +++ b/migrations/2024-03-27-065655_alter_transaction_hash/down.sql @@ -0,0 +1,5 @@ +-- This file should undo anything in `up.sql` + +ALTER TABLE "transaction_hash" DROP COLUMN "request_id"; + + diff --git a/migrations/2024-03-27-065655_alter_transaction_hash/up.sql b/migrations/2024-03-27-065655_alter_transaction_hash/up.sql new file mode 100644 index 0000000..c2fbf68 --- /dev/null +++ b/migrations/2024-03-27-065655_alter_transaction_hash/up.sql @@ -0,0 +1,4 @@ +-- Your SQL goes here +ALTER TABLE "transaction_hash" ADD COLUMN "request_id" VARCHAR; + + diff --git a/migrations/2024-03-27-095235_order_status/down.sql b/migrations/2024-03-27-095235_order_status/down.sql new file mode 100644 index 0000000..dd6fc44 --- /dev/null +++ b/migrations/2024-03-27-095235_order_status/down.sql @@ -0,0 +1,16 @@ +-- This file should undo anything in `up.sql` + + +ALTER TYPE order_status DROP VALUE 'DuplicateOrder'; +ALTER TYPE order_status DROP VALUE 'UtxoError'; +ALTER TYPE order_status DROP VALUE 'Error'; +ALTER TYPE order_status DROP VALUE 'NoResponseFromChain'; +ALTER TYPE order_status DROP VALUE 'BincodeError'; +ALTER TYPE order_status DROP VALUE 'HexCodeError'; +ALTER TYPE order_status DROP VALUE 'SerializationError'; +ALTER TYPE order_status DROP VALUE 'RequestSubmitted'; +ALTER TYPE order_status DROP VALUE 'OrderNotFound'; +ALTER TYPE order_status DROP VALUE 'RejectedFromChain'; +ALTER TYPE order_status DROP VALUE 'FilledUpdated'; +DROP INDEX transaction_hash_account_id; +DROP INDEX transaction_hash_request_id; \ No newline at end of file diff --git a/migrations/2024-03-27-095235_order_status/up.sql b/migrations/2024-03-27-095235_order_status/up.sql new file mode 100644 index 0000000..d86d3b5 --- /dev/null +++ b/migrations/2024-03-27-095235_order_status/up.sql @@ -0,0 +1,16 @@ +-- Your SQL goes here + + +ALTER TYPE order_status ADD VALUE 'DuplicateOrder'; +ALTER TYPE order_status ADD VALUE 'UtxoError'; +ALTER TYPE order_status ADD VALUE 'Error'; +ALTER TYPE order_status ADD VALUE 'NoResponseFromChain'; +ALTER TYPE order_status ADD VALUE 'BincodeError'; +ALTER TYPE order_status ADD VALUE 'HexCodeError'; +ALTER TYPE order_status ADD VALUE 'SerializationError'; +ALTER TYPE order_status ADD VALUE 'RequestSubmitted'; +ALTER TYPE order_status ADD VALUE 'OrderNotFound'; +ALTER TYPE order_status ADD VALUE 'RejectedFromChain'; +ALTER TYPE order_status ADD VALUE 'FilledUpdated'; +CREATE INDEX transaction_hash_account_id ON transaction_hash(account_id); +CREATE INDEX transaction_hash_request_id ON transaction_hash(request_id); \ No newline at end of file diff --git a/migrations/2024-06-20-130635_candle_data_trigger/down.sql b/migrations/2024-06-20-130635_candle_data_trigger/down.sql new file mode 100644 index 0000000..d9a93fe --- /dev/null +++ b/migrations/2024-06-20-130635_candle_data_trigger/down.sql @@ -0,0 +1 @@ +-- This file should undo anything in `up.sql` diff --git a/migrations/2024-06-20-130635_candle_data_trigger/up.sql b/migrations/2024-06-20-130635_candle_data_trigger/up.sql new file mode 100644 index 0000000..129796f --- /dev/null +++ b/migrations/2024-06-20-130635_candle_data_trigger/up.sql @@ -0,0 +1,86 @@ +-- Your SQL goes here + +-- FUNCTION: public.create_price_triger_after_insert() + +-- DROP FUNCTION IF EXISTS public.create_price_triger_after_insert(); + +CREATE OR REPLACE FUNCTION public.create_price_triger_after_insert_for_candle_data_generation() + RETURNS trigger + LANGUAGE 'plpgsql' + COST 100 + VOLATILE NOT LEAKPROOF +AS $BODY$ +BEGIN + + CASE + WHEN (NOW() <( New."timestamp" + INTERVAL '500 ms')) THEN + PERFORM update_candles_1min(); + PERFORM update_candles_1hour(); + PERFORM update_candles_1day(); + ELSE + NULL; + END CASE; + + RETURN New; +END; +$BODY$; + +ALTER FUNCTION public.create_price_triger_after_insert_for_candle_data_generation() + OWNER TO relayer; + + +-- Trigger: after_insert_price_trigger + +-- DROP TRIGGER IF EXISTS after_insert_price_trigger ON public.btc_usd_price; + +CREATE OR REPLACE TRIGGER after_insert_price_trigger_for_candle_data_generation + AFTER INSERT + ON public.btc_usd_price + FOR EACH ROW + EXECUTE FUNCTION public.create_price_triger_after_insert_for_candle_data_generation(); + + + +-- FUNCTION: public.update_request_id_after_pending_order_fill() + +-- DROP FUNCTION IF EXISTS public.update_request_id_after_pending_order_fill(); + +CREATE OR REPLACE FUNCTION public.update_request_id_after_pending_order_fill() + RETURNS trigger + LANGUAGE 'plpgsql' + COST 100 + VOLATILE NOT LEAKPROOF +AS $BODY$ +BEGIN + + CASE + WHEN (New.request_id is NULL and New.order_status = 'FILLED') THEN + UPDATE public.transaction_hash + SET request_id=(Select request_id from public.transaction_hash where order_id = New.order_id and order_status = 'PENDING' limit 1) + WHERE id = New.id and order_status = 'FILLED'; + + WHEN (New.request_id is NULL and New.order_status = 'LIQUIDATE') THEN + UPDATE public.transaction_hash + SET request_id=(Select request_id from public.transaction_hash where order_id = New.order_id and order_status = 'FILLED' limit 1) + WHERE id = New.id and order_status = 'LIQUIDATE'; + ELSE + NULL; + END CASE; + + RETURN New; +END; +$BODY$; + +ALTER FUNCTION public.update_request_id_after_pending_order_fill() + OWNER TO relayer; + + +-- Trigger: after_insert_update_request_id_after_pending_order_fill + +-- DROP TRIGGER IF EXISTS after_insert_update_request_id_after_pending_order_fill ON public.transaction_hash; + +CREATE OR REPLACE TRIGGER after_insert_update_request_id_after_pending_order_fill + AFTER INSERT + ON public.transaction_hash + FOR EACH ROW + EXECUTE FUNCTION public.update_request_id_after_pending_order_fill(); \ No newline at end of file diff --git a/migrations/2024-07-26-163222_funding_updates_table/down.sql b/migrations/2024-07-26-163222_funding_updates_table/down.sql new file mode 100644 index 0000000..1eccb8e --- /dev/null +++ b/migrations/2024-07-26-163222_funding_updates_table/down.sql @@ -0,0 +1,10 @@ +-- This file should undo anything in `up.sql` +DROP TABLE IF EXISTS public.trader_order_funding_updated; + +-- Index: timestamp_trader_order_funding_updated + +DROP INDEX IF EXISTS public.timestamp_trader_order_funding_updated; + +-- Index: trader_order_funding_updated_uuid + +DROP INDEX IF EXISTS public.trader_order_funding_updated_uuid; \ No newline at end of file diff --git a/migrations/2024-07-26-163222_funding_updates_table/up.sql b/migrations/2024-07-26-163222_funding_updates_table/up.sql new file mode 100644 index 0000000..5c72c0a --- /dev/null +++ b/migrations/2024-07-26-163222_funding_updates_table/up.sql @@ -0,0 +1,51 @@ +-- Your SQL goes here + +-- DROP TABLE IF EXISTS public.trader_order_funding_updated; + +CREATE TABLE IF NOT EXISTS public.trader_order_funding_updated +( + id bigint NOT NULL DEFAULT nextval('trader_order_id_seq'::regclass), + uuid character varying(64) COLLATE pg_catalog."default" NOT NULL, + account_id character varying COLLATE pg_catalog."default" NOT NULL, + position_type position_type NOT NULL, + order_status order_status NOT NULL, + order_type order_type NOT NULL, + entryprice numeric NOT NULL, + execution_price numeric NOT NULL, + positionsize numeric NOT NULL, + leverage numeric NOT NULL, + initial_margin numeric NOT NULL, + available_margin numeric NOT NULL, + "timestamp" timestamp with time zone NOT NULL, + bankruptcy_price numeric NOT NULL, + bankruptcy_value numeric NOT NULL, + maintenance_margin numeric NOT NULL, + liquidation_price numeric NOT NULL, + unrealized_pnl numeric NOT NULL, + settlement_price numeric NOT NULL, + entry_nonce bigint NOT NULL, + exit_nonce bigint NOT NULL, + entry_sequence bigint NOT NULL, + CONSTRAINT trader_order_funding_updated_pkey PRIMARY KEY (id) +) + +TABLESPACE pg_default; + +ALTER TABLE IF EXISTS public.trader_order_funding_updated + OWNER to relayer; +-- Index: timestamp_trader_order_funding_updated + +-- DROP INDEX IF EXISTS public.timestamp_trader_order_funding_updated; + +CREATE INDEX IF NOT EXISTS timestamp_trader_order_funding_updated + ON public.trader_order_funding_updated USING btree + ("timestamp" ASC NULLS LAST) + TABLESPACE pg_default; +-- Index: trader_order_funding_updated_uuid + +-- DROP INDEX IF EXISTS public.trader_order_funding_updated_uuid; + +CREATE INDEX IF NOT EXISTS trader_order_funding_updated_uuid + ON public.trader_order_funding_updated USING btree + (uuid COLLATE pg_catalog."default" ASC NULLS LAST) + TABLESPACE pg_default; \ No newline at end of file diff --git a/migrations/2025-06-23-213723_create_orderbook_view/down.sql b/migrations/2025-06-23-213723_create_orderbook_view/down.sql new file mode 100644 index 0000000..3dc2d36 --- /dev/null +++ b/migrations/2025-06-23-213723_create_orderbook_view/down.sql @@ -0,0 +1,8 @@ + +DROP VIEW IF EXISTS orderbook; + + +DROP INDEX IF EXISTS sorted_set_command_cmd_uuid; +DROP INDEX IF EXISTS trader_order_open_limit_idx; +DROP INDEX IF EXISTS trader_order_uuid_id_desc; +DROP INDEX IF EXISTS sorted_set_command_uuid_id_desc_close_limit; diff --git a/migrations/2025-06-23-213723_create_orderbook_view/up.sql b/migrations/2025-06-23-213723_create_orderbook_view/up.sql new file mode 100644 index 0000000..da261e1 --- /dev/null +++ b/migrations/2025-06-23-213723_create_orderbook_view/up.sql @@ -0,0 +1,110 @@ +-- Orderbook view +CREATE OR REPLACE VIEW orderbook AS +WITH ranked AS ( + SELECT t.*, + row_number() OVER ( + PARTITION BY t.uuid + ORDER BY t.id DESC + ) AS rn + FROM trader_order t +), + +sc_latest AS ( + SELECT uuid, + MAX(id) AS max_sc_id + FROM sorted_set_command + WHERE command IN ( + 'ADD_CLOSE_LIMIT_PRICE'::sorted_set_command_type, + 'UPDATE_CLOSE_LIMIT_PRICE'::sorted_set_command_type + ) + GROUP BY uuid +) + +-- /* === branch A: newest OPEN-LIMIT order per uuid ================= */ +SELECT r.id, + r.uuid, + r.account_id, + r.position_type, + r.order_status, + r.order_type, + CAST(r.entryprice AS numeric) AS entryprice, + r.execution_price, + r.positionsize, + r.leverage, + r.initial_margin, + r.available_margin, + r."timestamp", + r.bankruptcy_price, + r.bankruptcy_value, + r.maintenance_margin, + r.liquidation_price, + r.unrealized_pnl, + r.settlement_price, + r.entry_nonce, + r.exit_nonce, + r.entry_sequence +FROM ranked r +WHERE r.rn = 1 + AND r.order_type = 'LIMIT' + AND r.order_status NOT IN ('FILLED','CANCELLED','LIQUIDATE','SETTLED') + +UNION ALL + +-- /* === branch B: newest FILLED order + latest ADD/UPDATE cmd ====== */ +SELECT r.id, + r.uuid, + r.account_id, + CASE + WHEN r.position_type = 'LONG' THEN 'SHORT' + WHEN r.position_type = 'SHORT' THEN 'LONG' + ELSE r.position_type + END AS position_type, + r.order_status, + r.order_type, + CAST(sc.amount AS numeric) AS entryprice, + r.execution_price, + r.positionsize, + r.leverage, + r.initial_margin, + r.available_margin, + r."timestamp", + r.bankruptcy_price, + r.bankruptcy_value, + r.maintenance_margin, + r.liquidation_price, + r.unrealized_pnl, + r.settlement_price, + r.entry_nonce, + r.exit_nonce, + r.entry_sequence +FROM ranked r +JOIN sc_latest ls ON ls.uuid = r.uuid +JOIN sorted_set_command sc ON sc.id = ls.max_sc_id +WHERE r.rn = 1 + AND r.order_status = 'FILLED'; + +-- /* ================================================================ */ +-- /* 2️⃣ Performance indexes */ +-- /* ================================================================ */ + +-- /* Latest row look-ups for both open & filled branches */ +CREATE INDEX IF NOT EXISTS trader_order_uuid_id_desc + ON trader_order (uuid, id DESC); + +-- /* Selective index for open-LIMIT orders */ +CREATE INDEX IF NOT EXISTS trader_order_open_limit_idx + ON trader_order (uuid, id DESC) + WHERE order_type = 'LIMIT' + AND order_status NOT IN ('FILLED','CANCELLED','LIQUIDATE','SETTLED'); + +-- /* Join filter on any close-limit command (ADD or UPDATE) */ +CREATE INDEX IF NOT EXISTS sorted_set_command_cmd_uuid + ON sorted_set_command (command, uuid); + +-- /* Optional: index that matches sc_latest’s GROUP BY + MAX pattern */ +CREATE INDEX IF NOT EXISTS sorted_set_command_uuid_id_desc_close_limit + ON sorted_set_command (uuid, id DESC) + WHERE command IN ( + 'ADD_CLOSE_LIMIT_PRICE'::sorted_set_command_type, + 'UPDATE_CLOSE_LIMIT_PRICE'::sorted_set_command_type + ); diff --git a/redis_script.lua b/redis_script.lua new file mode 100644 index 0000000..6109d62 --- /dev/null +++ b/redis_script.lua @@ -0,0 +1,79 @@ + -- args: + local id = ARGV[1] + local status = ARGV[2] + local side = ARGV[3] + local price = tonumber(ARGV[4]) + local price_cents = tonumber(ARGV[5]) + local size = tonumber(ARGV[6]) + local timestamp = ARGV[7] + local time = tonumber(ARGV[8]) + local exp_time = tonumber(ARGV[9]) + -- OPEN_LIMIT or CLOSE_LIMIT or OPEN_MARKET or CLOSE_MARKET + local execution_type = ARGV[10] + redis.call('ECHO', 'id: ' .. id) + + if (status == "FILLED" or status == "SETTLED" or status == "LIQUIDATE") and execution_type ~= "CLOSE_LIMIT" then + local old_price = tonumber(redis.call('HGET', 'orders', id)) + redis.call('HDEL', 'orders', id) + + + local table = { order_id = id, side = side, price = price, positionsize = size, timestamp = timestamp } + + local order_json = cjson.encode(table) + redis.call('ZADD', 'recent_orders', time, order_json) + + + local result = tonumber(redis.pcall('ZRANGEBYSCORE', side, old_price, old_price)[1]) or 0 + if result == 0 then + return + end + local new_size = 0 + if (status == "SETTLED" or status == "LIQUIDATE") then + new_size = result - size + else + new_size = result - (size*old_price/price_cents) + end + + redis.call('ZREM', side, result) + + if new_size > 0 + then + redis.call('ZADD', side, old_price, new_size) + end + return + end + + -- settle order on limit + -- just opened a new order + if status == "PENDING" or (status == "FILLED" and execution_type == "CLOSE_LIMIT") then + -- if the limit order is already exist then remove the old limit price and position size + local is_exist =redis.call('HEXISTS', 'orders', id) + if is_exist == 1 then + local old_price = tonumber(redis.call('HGET', 'orders', id)) + redis.call('HDEL', 'orders', id) + local old_position_size = tonumber(redis.pcall('ZRANGEBYSCORE', side, old_price, old_price)[1]) or 0 + if old_position_size > 0 then + + local new_size = old_position_size - size + + redis.call('ZREM', side, old_position_size) + + if new_size > 0 + then + redis.call('ZADD', side, old_price, new_size) + end + end + end + -- add the new limit price and position size + redis.call('HSET', 'orders', id, price_cents) + + local result = tonumber(redis.pcall('ZRANGEBYSCORE', side, price_cents, price_cents)[1]) or 0 + local new_size = result + size + + if result ~= 0 then + redis.call('ZREM', side, result) + end + redis.call('ZADD', side, price_cents, new_size) + end + -- TODO: clean out expired > 24h... + redis.call('ZREMRANGEBYSCORE', 'recent_orders', 0, exp_time) \ No newline at end of file diff --git a/src/archiver.rs b/src/archiver.rs index deda284..23c9864 100644 --- a/src/archiver.rs +++ b/src/archiver.rs @@ -1,27 +1,120 @@ use crate::{database::*, error::ApiError, kafka::Completion, migrations}; +use bigdecimal::ToPrimitive; use chrono::prelude::*; +use chrono::TimeDelta; use crossbeam_channel::{Receiver, Sender}; use diesel::prelude::PgConnection; use diesel::r2d2::ConnectionManager; use log::{debug, error, info, trace}; use r2d2::PooledConnection; -use std::time::{Duration, Instant}; +use redis::Client; +use serde_json::json; +use std::{ + collections::HashMap, + time::{Duration, Instant}, +}; use twilight_relayer_rust::{ db::{self as relayer_db, Event}, relayer, }; const BATCH_INTERVAL: u64 = 100; -const BATCH_SIZE: usize = 500; +const BATCH_SIZE: usize = 2_000; const MAX_RETRIES: usize = 5; const RETRY_SLEEP: u64 = 2000; +const PIPELINE_CHUNK: usize = 512; type ManagedConnection = ConnectionManager; type ManagedPool = r2d2::Pool; +const UPDATE_FN: &str = r#" + -- args: + local id = ARGV[1] + local status = ARGV[2] + local side = ARGV[3] + local price = tonumber(ARGV[4]) + local price_cents = tonumber(ARGV[5]) + local size = tonumber(ARGV[6]) + local timestamp = ARGV[7] + local time = tonumber(ARGV[8]) + local exp_time = tonumber(ARGV[9]) + -- OPEN_LIMIT or CLOSE_LIMIT or OPEN_MARKET or CLOSE_MARKET + local execution_type = ARGV[10] + redis.call('ECHO', 'id: ' .. id) + + if (status == "FILLED" or status == "SETTLED" or status == "LIQUIDATE") and execution_type ~= "CLOSE_LIMIT" then + local old_price = tonumber(redis.call('HGET', 'orders', id)) + redis.call('HDEL', 'orders', id) + + + local table = { order_id = id, side = side, price = price, positionsize = size, timestamp = timestamp } + + local order_json = cjson.encode(table) + redis.call('ZADD', 'recent_orders', time, order_json) + + + local result = tonumber(redis.pcall('ZRANGEBYSCORE', side, old_price, old_price)[1]) or 0 + if result == 0 then + return + end + local new_size = 0 + if (status == "SETTLED" or status == "LIQUIDATE") then + new_size = result - size + else + new_size = result - (size*old_price/price_cents) + end + + redis.call('ZREM', side, result) + + if new_size > 0 + then + redis.call('ZADD', side, old_price, new_size) + end + return + end + + -- settle order on limit + -- just opened a new order + if status == "PENDING" or (status == "FILLED" and execution_type == "CLOSE_LIMIT") then + -- if the limit order is already exist then remove the old limit price and position size + local is_exist =redis.call('HEXISTS', 'orders', id) + if is_exist == 1 then + local old_price = tonumber(redis.call('HGET', 'orders', id)) + redis.call('HDEL', 'orders', id) + local old_position_size = tonumber(redis.pcall('ZRANGEBYSCORE', side, old_price, old_price)[1]) or 0 + if old_position_size > 0 then + + local new_size = old_position_size - size + + redis.call('ZREM', side, old_position_size) + + if new_size > 0 + then + redis.call('ZADD', side, old_price, new_size) + end + end + end + -- add the new limit price and position size + redis.call('HSET', 'orders', id, price_cents) + + local result = tonumber(redis.pcall('ZRANGEBYSCORE', side, price_cents, price_cents)[1]) or 0 + local new_size = result + size + + if result ~= 0 then + redis.call('ZREM', side, result) + end + redis.call('ZADD', side, price_cents, new_size) + end + -- TODO: clean out expired > 24h... + redis.call('ZREMRANGEBYSCORE', 'recent_orders', 0, exp_time) +"#; + pub struct DatabaseArchiver { + redis: Client, pool: ManagedPool, + script_sha: String, trader_orders: Vec, + trader_order_funding_updated: Vec, lend_orders: Vec, position_size: Vec, tx_hashes: Vec, @@ -34,15 +127,21 @@ pub struct DatabaseArchiver { impl DatabaseArchiver { /// Start an archiver, provided a postgres connection string. - pub fn from_host(database_url: String, completions: Sender) -> DatabaseArchiver { + pub fn from_host( + database_url: &str, + redis_url: &str, + completions: Sender, + ) -> DatabaseArchiver { let manager = ConnectionManager::::new(database_url); let pool = r2d2::Pool::new(manager).expect("Could not instantiate connection pool"); + let redis = Client::open(redis_url).expect("Could not establish redis connection"); let mut conn = pool.get().expect("Could not get pooled connection!"); migrations::run_migrations(&mut *conn).expect("Failed to run database migrations!"); let trader_orders = Vec::with_capacity(BATCH_SIZE); + let trader_order_funding_updated = Vec::with_capacity(BATCH_SIZE); let lend_orders = Vec::with_capacity(BATCH_SIZE); let position_size = Vec::with_capacity(BATCH_SIZE); let tx_hashes = Vec::with_capacity(BATCH_SIZE); @@ -51,9 +150,21 @@ impl DatabaseArchiver { let lend_pool_commands = Vec::with_capacity(BATCH_SIZE); let nonce = Nonce::get(&mut conn).expect("Failed to query for current nonce"); + Self::load_cache(pool.clone(), redis.clone()); + + let mut redis_conn = redis.get_connection().expect("Redis connection"); + let script_sha: String = redis::cmd("SCRIPT") + .arg("LOAD") + .arg(UPDATE_FN) + .query(&mut redis_conn) + .expect("Script load failed"); + DatabaseArchiver { + redis, pool, + script_sha, trader_orders, + trader_order_funding_updated, lend_orders, position_size, tx_hashes, @@ -65,6 +176,171 @@ impl DatabaseArchiver { } } + fn load_cache(pool: ManagedPool, redis: Client) { + let mut conn = pool.get().expect("Could not get pooled connection!"); + + let recent_orders = + TraderOrder::list_past_24hrs(&mut conn).expect("Failed to load recent orders"); + let order_book_orders = + TraderOrder::order_book_orders(&mut conn).expect("Failed to load order book"); + { + let mut pipe = redis::pipe(); + + let mut redis_conn = redis + .get_connection() + .expect("Failed to acquire redis connection"); + + let mut cmd = redis::cmd("DEL"); + cmd.arg("recent_orders"); + + pipe.add_command(cmd); + + // cmd.execute(&mut redis_conn); + let mut cmd = redis::cmd("DEL"); + cmd.arg("orders"); + + pipe.add_command(cmd); + // cmd.execute(&mut redis_conn); + + let mut cmd = redis::cmd("DEL"); + cmd.arg("bid"); + + pipe.add_command(cmd); + // cmd.execute(&mut redis_conn); + + let mut cmd = redis::cmd("DEL"); + cmd.arg("ask"); + // cmd.execute(&mut redis_conn); + pipe.add_command(cmd); + + pipe.atomic().execute(&mut redis_conn); + } + for chunk in recent_orders.chunks(PIPELINE_CHUNK) { + let mut redis_conn = redis + .get_connection() + .expect("Failed to acquire redis connection"); + let mut pipe = redis::pipe(); + + for item in chunk { + let timestamp = item.timestamp.timestamp_millis(); + + let order = json!({ + "order_id": item.order_id.clone(), + "side": item.side.to_string(), + "price": item.price.to_f64().unwrap(), + "positionsize": item.positionsize.to_f64().unwrap(), + "timestamp": item.timestamp.to_rfc3339(), + }); + + let order = serde_json::to_string(&order).expect("Invalid JSON"); + + let mut cmd = redis::cmd("ZADD"); + cmd.arg("recent_orders").arg(timestamp).arg(order); + pipe.add_command(cmd); + } + + pipe.atomic().execute(&mut redis_conn); + } + + let mut redis_conn = redis + .get_connection() + .expect("Failed to acquire redis connection"); + let mut bids = HashMap::new(); + let mut asks = HashMap::new(); + + for order in order_book_orders.iter() { + let key = (order.entryprice.to_f64().unwrap() * 100.0) as i64; + let positionsize = order.positionsize.to_f64().unwrap() as i64; + + if order.position_type == PositionType::SHORT { + asks.entry(key) + .and_modify(|size| *size += positionsize) + .or_insert(positionsize); + } else { + bids.entry(key) + .and_modify(|size| *size += positionsize) + .or_insert(positionsize); + } + + redis::cmd("HSET") + .arg("orders") + // .arg("id") + .arg(order.uuid.clone()) + // .arg("price") + .arg((order.entryprice.to_f64().unwrap() * 100.0) as i64) + .execute(&mut redis_conn); + } + + for (price, size) in bids.into_iter() { + redis::cmd("ZADD") + .arg("bid") + .arg(price) + .arg(size.to_i64().unwrap()) + .execute(&mut redis_conn); + } + + for (price, size) in asks.into_iter() { + redis::cmd("ZADD") + .arg("ask") + .arg(price) + .arg(size.to_i64().unwrap()) + .execute(&mut redis_conn); + } + } + + fn update_sorted_set(&self, _cmd: &relayer::SortedSetCommand) -> Result<(), ApiError> { + Ok(()) + } + + fn update_order_cache(&self, order: &InsertTraderOrder) -> Result<(), ApiError> { + let mut pipe = redis::pipe(); + let side; + let price; + let execution_type; + if order.order_status == OrderStatus::SETTLED + || order.order_status == OrderStatus::LIQUIDATE + { + execution_type = "CLOSE_MARKET"; + price = order.settlement_price.to_f64().unwrap(); + side = match order.position_type { + PositionType::LONG => "ask", + PositionType::SHORT => "bid", + } + } else { + execution_type = match order.order_type { + OrderType::LIMIT => "OPEN_LIMIT", + OrderType::MARKET => "OPEN_MARKET", + OrderType::DARK => "OPEN_MARKET", + _ => "", //lend not applicable + }; + price = order.entryprice.to_f64().unwrap(); + side = match order.position_type { + PositionType::LONG => "bid", + PositionType::SHORT => "ask", + }; + } + + let mut cmd = redis::cmd("EVALSHA"); + cmd.arg(&self.script_sha) + .arg(0) + .arg(order.uuid.clone()) + .arg(order.order_status.as_str()) + .arg(side) + .arg((price) as i64) + .arg((price * 100.0) as i64) + .arg(order.positionsize.to_f64().unwrap() as i64) + .arg(order.timestamp.to_rfc3339()) + .arg(order.timestamp.timestamp_millis() as i64) + .arg((Utc::now() - TimeDelta::days(1)).timestamp_millis() as i64) + .arg(execution_type); + + pipe.add_command(cmd); + let mut redis_conn = self.redis.get_connection()?; + pipe.atomic().execute(&mut redis_conn); + + Ok(()) + } + /// Fetch a connection, will retry MAX_RETRIES before giving up. fn get_conn(&self) -> Result, ApiError> { let mut retries = MAX_RETRIES; @@ -95,6 +371,7 @@ impl DatabaseArchiver { sorted_set_update: relayer::SortedSetCommand, ) -> Result<(), ApiError> { debug!("Appending sorted set update"); + let _ = self.update_sorted_set(&sorted_set_update); self.sorted_set.push(sorted_set_update); if self.sorted_set.len() == self.sorted_set.capacity() { @@ -179,6 +456,8 @@ impl DatabaseArchiver { /// queue. fn trader_order(&mut self, order: InsertTraderOrder) -> Result<(), ApiError> { debug!("Appending trader order"); + + let _ = self.update_order_cache(&order); self.trader_orders.push(order); if self.trader_orders.len() == self.trader_orders.capacity() { @@ -187,6 +466,55 @@ impl DatabaseArchiver { Ok(()) } + /// Add a trader order to the next update batch, if the queue is full, commit and clear the + /// queue. for trader order limit updates on settlement + fn trader_order_limit_update_redis( + &mut self, + order: InsertTraderOrder, + settlement_price: f64, + ) -> Result<(), ApiError> { + let mut pipe = redis::pipe(); + let side; + let price; + let execution_type; + if order.order_status == OrderStatus::SETTLED + || order.order_status == OrderStatus::LIQUIDATE + { + execution_type = "CLOSE_MARKET"; + price = order.settlement_price.to_f64().unwrap(); + side = match order.position_type { + PositionType::LONG => "ask", + PositionType::SHORT => "bid", + } + } else { + execution_type = "CLOSE_LIMIT"; + price = settlement_price; + side = match order.position_type { + PositionType::LONG => "ask", + PositionType::SHORT => "bid", + }; + } + + let mut cmd = redis::cmd("EVALSHA"); + cmd.arg(&self.script_sha) + .arg(0) + .arg(order.uuid.clone()) + .arg(order.order_status.as_str()) + .arg(side) + .arg((price) as i64) + .arg((price * 100.0) as i64) + .arg(order.positionsize.to_f64().unwrap() as i64) + .arg(order.timestamp.to_rfc3339()) + .arg(order.timestamp.timestamp_millis() as i64) + .arg((Utc::now() - TimeDelta::days(1)).timestamp_millis() as i64) + .arg(execution_type); + + pipe.add_command(cmd); + let mut redis_conn = self.redis.get_connection()?; + pipe.atomic().execute(&mut redis_conn); + + Ok(()) + } /// Commit a batch of trader orders to the database. If we're failing to update the database, we /// should exit. @@ -202,6 +530,36 @@ impl DatabaseArchiver { Ok(()) } + /// Add a trader order funidng update to the next update batch, if the queue is full, commit and clear the + /// queue. + fn trader_order_funding_update( + &mut self, + order: InsertTraderOrderFundingUpdates, + ) -> Result<(), ApiError> { + debug!("Appending trader order"); + self.trader_order_funding_updated.push(order); + + if self.trader_order_funding_updated.len() == self.trader_order_funding_updated.capacity() { + self.commit_trader_order_funding_updated()?; + } + + Ok(()) + } + + /// Commit a batch of trader orders funidng update to the database. If we're failing to update the database, we + /// should exit. + fn commit_trader_order_funding_updated(&mut self) -> Result<(), ApiError> { + debug!("Committing trader orders"); + + let mut conn = self.get_conn()?; + + let mut orders = Vec::with_capacity(self.trader_order_funding_updated.capacity()); + std::mem::swap(&mut orders, &mut self.trader_order_funding_updated); + + TraderOrderFundingUpdates::insert(&mut conn, orders)?; + + Ok(()) + } /// Add a lend order to the next update batch, if the queue is full, commit and clear the /// queue. @@ -292,6 +650,9 @@ impl DatabaseArchiver { if self.trader_orders.len() > 0 { self.commit_trader_orders()?; } + if self.trader_order_funding_updated.len() > 0 { + self.commit_trader_order_funding_updated()?; + } if self.lend_orders.len() > 0 { self.commit_lend_orders()?; @@ -329,7 +690,20 @@ impl DatabaseArchiver { self.trader_order(trader_order.into())?; } Event::TraderOrderFundingUpdate(trader_order, _cmd) => { - self.trader_order(trader_order.into())?; + self.trader_order_funding_update(trader_order.into())?; + } + // added for limit order update for settlement order + Event::TraderOrderLimitUpdate(trader_order, cmd, _seq) => { + let settlement_price = match cmd { + twilight_relayer_rust::relayer::RpcCommand::ExecuteTraderOrder( + execute_trader_order, + _meta, + _zkos_hex_string, + _request_id, + ) => execute_trader_order.execution_price, + _ => 0.0, // Default value for other command types + }; + self.trader_order_limit_update_redis(trader_order.into(), settlement_price)?; } Event::TraderOrderLiquidation(trader_order, _cmd, _seq) => { self.trader_order(trader_order.into())?; @@ -368,6 +742,7 @@ impl DatabaseArchiver { order_status, datetime, output, + request_id, ) => { let hash = NewTxHash { order_id: uuid.to_string(), @@ -377,9 +752,32 @@ impl DatabaseArchiver { order_status: order_status.into(), datetime, output, + request_id: Some(request_id), }; self.tx_hash(hash)?; } + Event::TxHashUpdate( + uuid, + account_id, + tx_hash, + order_type, + order_status, + datetime, + output, + ) => { + let hash = NewTxHash { + order_id: uuid.to_string(), + account_id, + tx_hash, + order_type: order_type.into(), + order_status: order_status.into(), + datetime, + output, + request_id: None, + }; + self.tx_hash(hash)?; + } + Event::AdvanceStateQueue(_, _) => {} } Ok(()) @@ -395,7 +793,7 @@ impl DatabaseArchiver { for msg in msgs { self.process_msg(msg)?; } - self.commit_orders()?; + self.completions .send(completion) .map_err(|e| ApiError::CrossbeamChannel(format!("{:?}", e)))?; diff --git a/src/bin/api.rs b/src/bin/api.rs index da6222c..e9d2610 100644 --- a/src/bin/api.rs +++ b/src/bin/api.rs @@ -1,11 +1,11 @@ use jsonrpsee::server::ServerBuilder; use log::info; +use relayerarchiverlib::{rpc, ws}; use std::{net::SocketAddr, time::Duration}; use structopt::StructOpt; use tokio::time::sleep; use tower::ServiceBuilder; use tower_http::cors::{Any, CorsLayer}; -use twilight_relayerAPI::{rpc, ws}; #[derive(Debug, StructOpt)] #[structopt(name = "Relayer API", about = "Twilight Relayer API server")] @@ -32,8 +32,8 @@ struct Opt { )] ws_listen_addr: SocketAddr, } - -#[tokio::main] +#[tokio::main(flavor = "multi_thread", worker_threads = 40)] +// #[tokio::main] async fn main() { let opts = Opt::from_args(); dotenv::dotenv().expect("dotenv file not found!"); @@ -45,6 +45,7 @@ async fn main() { .init(); let database_url = std::env::var("DATABASE_URL").expect("No database url found!"); + let redis_url = std::env::var("ORDERBOOK_REDIS").expect("No redis url found!"); info!("Database backend: {}", database_url); let cors = CorsLayer::new() @@ -66,7 +67,7 @@ async fn main() { .await .expect("Failed to build public API server"); - let methods = rpc::init_public_methods(&database_url); + let methods = rpc::init_public_methods(&database_url, &redis_url); let _pub_handle = public_server .start(methods) .expect("Failed to start API server"); @@ -80,7 +81,7 @@ async fn main() { .await .expect("Failed to build private API server"); - let methods = rpc::init_private_methods(&database_url); + let methods = rpc::init_private_methods(&database_url, &redis_url); let _priv_handle = private_server .start(methods) .expect("Failed to start API server"); @@ -92,7 +93,7 @@ async fn main() { .await .expect("Failed to build websocket server"); - let ws_methods = ws::init_methods(&database_url); + let ws_methods = ws::init_methods(&database_url, &redis_url); let _ws_handle = ws_server .start(ws_methods) .expect("Failed to start websocket server"); diff --git a/src/bin/archiver.rs b/src/bin/archiver.rs index cb50f4d..4c69173 100644 --- a/src/bin/archiver.rs +++ b/src/bin/archiver.rs @@ -1,10 +1,10 @@ use crossbeam_channel::unbounded; use log::warn; -use twilight_relayerAPI::kafka; -use twilight_relayerAPI::DatabaseArchiver; +use relayerarchiverlib::kafka; +use relayerarchiverlib::DatabaseArchiver; const SNAPSHOT_TOPIC: &str = "CoreEventLogTopic"; -const ARCHIVER_GROUP: &str = "Archiver"; +const ARCHIVER_GROUP: &str = "Archiver_Redis"; fn main() { tracing_subscriber::fmt::Subscriber::builder() @@ -18,12 +18,13 @@ fn main() { } let database_url = std::env::var("DATABASE_URL").expect("No database url found!"); + let redis_url = std::env::var("ORDERBOOK_REDIS").expect("No redis url found!"); let (tx, rx) = unbounded(); let (completions, _handle) = kafka::start_consumer(ARCHIVER_GROUP.into(), SNAPSHOT_TOPIC.into(), tx); - let database_worker = DatabaseArchiver::from_host(database_url, completions); + let database_worker = DatabaseArchiver::from_host(&database_url, &redis_url, completions); database_worker .run(rx) diff --git a/src/bin/auth.rs b/src/bin/auth.rs index ef0c168..5b9b67a 100644 --- a/src/bin/auth.rs +++ b/src/bin/auth.rs @@ -4,14 +4,14 @@ use hmac::{Hmac, Mac}; use http::{Request, StatusCode}; use hyper::{body::to_bytes, server::Server, Body, Response}; use log::debug; -use serde::{Deserialize, Serialize}; -use sha2::Sha256; -use std::net::SocketAddr; -use tower::{make::Shared, ServiceBuilder}; -use twilight_relayerAPI::{ +use relayerarchiverlib::{ auth::{AuthInfo, UserInfo}, database::{AddressCustomerId, CustomerApiKeyLinking}, }; +use serde::Deserialize; +use sha2::Sha256; +use std::net::SocketAddr; +use tower::{make::Shared, ServiceBuilder}; use verify_keplr_sign::{verify_arbitrary, Signature}; type HS = Hmac; @@ -48,7 +48,7 @@ async fn verify_signature(request: Request) -> VerifyResult { data, } = match serde_json::from_slice(&request) { Ok(auth) => auth, - Err(e) => return VerifyResult::InvalidJson, + Err(_e) => return VerifyResult::InvalidJson, }; let is_ok = verify_arbitrary( @@ -69,7 +69,7 @@ async fn register_handler(account_address: String) -> Result, htt let database_url = std::env::var("DATABASE_URL").expect("No database url set!"); let mut conn = match PgConnection::establish(&database_url) { Ok(c) => c, - Err(e) => { + Err(_e) => { return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Internal db error".into()); @@ -83,7 +83,7 @@ async fn register_handler(account_address: String) -> Result, htt "User already registered, call /regenerate if you need a new API key".into(), ); } - Err(e) => { + Err(_e) => { return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Internal db error".into()); @@ -92,7 +92,7 @@ async fn register_handler(account_address: String) -> Result, htt let (api_key, api_secret) = match CustomerApiKeyLinking::create(&mut conn, customer_id) { Ok(link) => (link.api_key, link.api_salt_key), - Err(e) => { + Err(_e) => { return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Internal db error".into()); @@ -114,7 +114,7 @@ async fn regenerate_handler(address: String) -> Result, http::Err let database_url = std::env::var("DATABASE_URL").expect("No database url set!"); let mut conn = match PgConnection::establish(&database_url) { Ok(c) => c, - Err(e) => { + Err(_e) => { return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Internal db error".into()); @@ -128,7 +128,7 @@ async fn regenerate_handler(address: String) -> Result, http::Err .status(StatusCode::NOT_FOUND) .body("User not found, please call /register".into()); } - Err(e) => { + Err(_e) => { return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Internal db error".into()); @@ -137,7 +137,7 @@ async fn regenerate_handler(address: String) -> Result, http::Err let (api_key, api_secret) = match CustomerApiKeyLinking::regenerate(&mut conn, customer_id) { Ok(link) => (link.api_key, link.api_salt_key), - Err(e) => { + Err(_e) => { return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Internal db error".into()); @@ -159,7 +159,7 @@ async fn check_signature(request: Request) -> Result, http: let database_url = std::env::var("DATABASE_URL").expect("No database url set!"); let mut conn = match PgConnection::establish(&database_url) { Ok(c) => c, - Err(e) => { + Err(_e) => { return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body("Internal db error".into()); @@ -174,12 +174,12 @@ async fn check_signature(request: Request) -> Result, http: api_key, sig, body, - datetime, + datetime: _, } = serde_json::from_slice(&request).expect("f"); let key = match CustomerApiKeyLinking::get_key(&mut conn, api_key) { Ok(k) => k, - Err(e) => { + Err(_e) => { return Response::builder() .status(StatusCode::UNAUTHORIZED) .body("No customer with that key".into()); diff --git a/src/database/models.rs b/src/database/models.rs index 6254e31..153720e 100644 --- a/src/database/models.rs +++ b/src/database/models.rs @@ -1,8 +1,10 @@ +#![allow(warnings)] use crate::database::{ schema::{ address_customer_id, btc_usd_price, current_nonce, customer_account, customer_apikey_linking, customer_order_linking, funding_rate, lend_order, lend_pool, - lend_pool_command, position_size_log, sorted_set_command, trader_order, transaction_hash, + lend_pool_command, position_size_log, sorted_set_command, trader_order, + trader_order_funding_updated, transaction_hash, }, sql_types::*, }; @@ -11,8 +13,8 @@ use crate::rpc::{ TradeVolumeArgs, TransactionHashArgs, }; use bigdecimal::{BigDecimal, FromPrimitive, ToPrimitive, Zero}; -use chrono::prelude::*; -use diesel::pg::Pg; +use chrono::{prelude::*, DurationRound}; +// use diesel::pg::Pg; use diesel::prelude::*; use itertools::join; use serde::{Deserialize, Serialize}; @@ -32,6 +34,7 @@ pub struct TxHash { pub order_status: OrderStatus, pub datetime: String, pub output: Option, + pub request_id: Option, } #[derive(Serialize, Deserialize, Debug, Clone, Insertable, Queryable)] @@ -44,6 +47,7 @@ pub struct NewTxHash { pub order_status: OrderStatus, pub datetime: String, pub output: Option, + pub request_id: Option, } impl TxHash { @@ -72,6 +76,18 @@ impl TxHash { transaction_hash.filter(account_id.eq(acct_id)).load(conn) } } + TransactionHashArgs::RequestId { + id: reqt_id, + status, + } => { + if let Some(status) = status { + transaction_hash + .filter(request_id.eq(reqt_id).and(order_status.eq(status))) + .load(conn) + } else { + transaction_hash.filter(request_id.eq(reqt_id)).load(conn) + } + } } } @@ -186,7 +202,7 @@ impl AddressCustomerId { .filter(address.eq(addr)) .first::(conn) { - Ok(o) => return Ok(None), + Ok(_o) => return Ok(None), Err(diesel::result::Error::NotFound) => { let mut account = NewCustomerAccount::default(); account.customer_registration_id = Uuid::new_v4().to_string(); @@ -402,6 +418,11 @@ impl LendPool { lend_pool.order_by(nonce.desc()).first(conn) } + pub fn get_pool_share_value(&self) -> f64 { + let tps = self.total_pool_share.to_f64().unwrap_or(1.0); + let tlv = self.total_locked_value.to_f64().unwrap_or(0.0); + tlv / tps * 100.0 + } pub fn insert( conn: &mut PgConnection, @@ -502,7 +523,7 @@ fn lend_pool_to_batch( let uuid = order.uuid.to_string(); vec![(LendPoolCommandType::ADD_FUNDING_DATA, uuid, pay).into()] } - relayer_db::LendPoolCommand::AddTraderOrderLiquidation(_, order, p) => { + relayer_db::LendPoolCommand::AddTraderOrderLiquidation(_, order, p, _) => { let pay = Some(BigDecimal::from_f64(p).expect("Invalid floating point number")); let uuid = order.uuid.to_string(); vec![(LendPoolCommandType::ADD_TRADER_ORDER_LIQUIDATION, uuid, pay).into()] @@ -794,6 +815,8 @@ pub struct BtcUsdPrice { #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Queryable, QueryableByName)] pub struct CandleData { + #[diesel(sql_type = diesel::sql_types::Timestamptz)] + pub updated_at: DateTime, #[diesel(sql_type = diesel::sql_types::Timestamptz)] pub start: DateTime, #[diesel(sql_type = diesel::sql_types::Timestamptz)] @@ -817,6 +840,14 @@ pub struct CandleData { } impl BtcUsdPrice { + pub fn update_candles(conn: &mut PgConnection) -> QueryResult<()> { + diesel::sql_query("SELECT * from update_candles_1min()").execute(conn)?; + diesel::sql_query("SELECT * from update_candles_1hour()").execute(conn)?; + diesel::sql_query("SELECT * from update_candles_1day()").execute(conn)?; + + Ok(()) + } + pub fn get(conn: &mut PgConnection) -> QueryResult { use crate::database::schema::btc_usd_price::dsl::*; @@ -852,77 +883,83 @@ impl BtcUsdPrice { limit: Option, offset: Option, ) -> QueryResult> { + let start: DateTime; + let table: String; + // temp for 24 hour candle change + // need to create new api for 24hour candle change data + match interval { + Interval::ONE_DAY_CHANGE => { + start = since + chrono::Duration::seconds(5); + table = "candles_1hour".into() + } + Interval::ONE_MINUTE + | Interval::FIVE_MINUTE + | Interval::FIFTEEN_MINUTE + | Interval::THIRTY_MINUTE => { + start = since.duration_trunc(interval.duration()).unwrap(); + table = "candles_1min".into() + } + Interval::ONE_HOUR + | Interval::FOUR_HOUR + | Interval::EIGHT_HOUR + | Interval::TWELVE_HOUR => { + start = since.duration_trunc(interval.duration()).unwrap(); + table = "candles_1hour".into() + } + Interval::ONE_DAY => { + start = since.duration_trunc(interval.duration()).unwrap(); + table = "candles_1day".into() + } + } let interval = interval.interval_sql(); - let trader_subquery = format!( + let subquery = format!( r#" - SELECT - window_start, - sum(entryprice) as usd_volume, - sum(positionsize) as btc_volume, - count(*) as trades - FROM ( - SELECT - t.timestamp as window_start, - coalesce(c.entryprice, 0) as entryprice, - coalesce(c.positionsize, 0) as positionsize - FROM generate_series('{}', now(), {}) t(timestamp) - LEFT JOIN trader_order c - ON c.timestamp BETWEEN t.timestamp AND t.timestamp + {} - ) as sq - GROUP BY window_start - "#, - since, interval, interval - ); - - let ohlc_subquery = format!( - r#" - SELECT - window_ts, - min(timestamp) as start, - max(timestamp) as end, - min(open) as open, - min(close) as close, - max(price) as high, - min(price) as low - FROM ( - SELECT - t.timestamp as window_ts, - c.*, - first_value(price) OVER (PARTITION BY t.timestamp ORDER BY c.timestamp asc) AS open, - first_value(price) OVER (PARTITION BY t.timestamp ORDER BY c.timestamp desc) AS close - FROM generate_series('{}', now(), {}) t(timestamp) - LEFT JOIN btc_usd_price c - ON c.timestamp BETWEEN t.timestamp AND t.timestamp + {} - ) as w - WHERE open IS NOT NULL - GROUP BY window_ts - "#, - since, interval, interval + with t as ( + select * from + generate_series('{}', now(), {}) timestamp + ), c as ( + select * from {} + where start_time between '{}' and now() + ) + select + t.timestamp as bucket, + c.start_time as start, + c.open, + c.close, + c.high, + c.low, + c.btc_volume, + c.trades, + c.usd_volume + from t + inner join c + on c.start_time >= t.timestamp AND c.start_time < t.timestamp + interval {} order by c.start_time + "#, + start, interval, table, start, interval ); let query = format!( r#" - WITH volumes AS ( - {} - ), ohlc AS ( - {} - ) - SELECT - ohlc.start, - ohlc.end, - ohlc.open, - ohlc.close, - ohlc.high, - ohlc.low, + select distinct + now() as updated_at, {} as resolution, - volumes.btc_volume as btc_volume, - volumes.trades as trades, - volumes.usd_volume as usd_volume - FROM volumes - JOIN ohlc ON volumes.window_start = ohlc.window_ts + bucket as start, + bucket + interval {} as end, + max(high) over w as high, + min(low) over w as low, + last_value(close) over w as close, + first_value(open) over w as open, + sum(btc_volume) over w as btc_volume, + sum(usd_volume) over w as usd_volume, + sum(trades) over w as trades + from ( + {} + ) as s + WINDOW w as (partition by bucket order by start asc ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) + order by start asc "#, - trader_subquery, ohlc_subquery, interval + interval, interval, subquery, ); let query = if let Some(limit) = limit { @@ -1117,8 +1154,39 @@ pub struct TraderOrder { pub entry_sequence: i64, } +#[derive( + Serialize, Deserialize, Debug, Clone, QueryableByName, Queryable, Insertable, AsChangeset, +)] +#[diesel(table_name = trader_order_funding_updated)] +pub struct TraderOrderFundingUpdates { + pub id: i64, + pub uuid: String, + pub account_id: String, + pub position_type: PositionType, + pub order_status: OrderStatus, + pub order_type: OrderType, + pub entryprice: BigDecimal, + pub execution_price: BigDecimal, + pub positionsize: BigDecimal, + pub leverage: BigDecimal, + pub initial_margin: BigDecimal, + pub available_margin: BigDecimal, + pub timestamp: DateTime, + pub bankruptcy_price: BigDecimal, + pub bankruptcy_value: BigDecimal, + pub maintenance_margin: BigDecimal, + pub liquidation_price: BigDecimal, + pub unrealized_pnl: BigDecimal, + pub settlement_price: BigDecimal, + pub entry_nonce: i64, + pub exit_nonce: i64, + pub entry_sequence: i64, +} + #[derive(Serialize, Deserialize, Debug, Clone, QueryableByName, Queryable)] pub struct RecentOrder { + #[diesel(sql_type = diesel::sql_types::Text)] + pub order_id: String, #[diesel(sql_type = crate::database::schema::sql_types::PositionType)] pub side: PositionType, #[diesel(sql_type = diesel::sql_types::Numeric)] @@ -1161,6 +1229,32 @@ pub struct InsertTraderOrder { pub entry_sequence: i64, } +#[derive(Serialize, Deserialize, Debug, Clone, Queryable, Insertable, AsChangeset)] +#[diesel(table_name = trader_order_funding_updated)] +pub struct InsertTraderOrderFundingUpdates { + pub uuid: String, + pub account_id: String, + pub position_type: PositionType, + pub order_status: OrderStatus, + pub order_type: OrderType, + pub entryprice: BigDecimal, + pub execution_price: BigDecimal, + pub positionsize: BigDecimal, + pub leverage: BigDecimal, + pub initial_margin: BigDecimal, + pub available_margin: BigDecimal, + pub timestamp: DateTime, + pub bankruptcy_price: BigDecimal, + pub bankruptcy_value: BigDecimal, + pub maintenance_margin: BigDecimal, + pub liquidation_price: BigDecimal, + pub unrealized_pnl: BigDecimal, + pub settlement_price: BigDecimal, + pub entry_nonce: i64, + pub exit_nonce: i64, + pub entry_sequence: i64, +} + #[derive( Serialize, Deserialize, Debug, Clone, Queryable, QueryableByName, Insertable, AsChangeset, )] @@ -1172,6 +1266,7 @@ pub struct OrderBookOrder { } #[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "lowercase")] pub enum NewOrderBookOrder { Bid { id: String, @@ -1189,18 +1284,34 @@ impl NewOrderBookOrder { pub fn new(to: TraderOrder) -> Self { if to.position_type == PositionType::LONG { Self::Bid { - id: to.uuid, + id: "".to_string(), positionsize: to.positionsize.to_f64().unwrap(), price: to.entryprice.to_f64().unwrap(), } } else { Self::Ask { - id: to.uuid, + id: "".to_string(), positionsize: to.positionsize.to_f64().unwrap(), price: to.entryprice.to_f64().unwrap(), } } } + // added for limit order update for settlement order + pub fn new_close_limit(to: TraderOrder, price: f64) -> Self { + if to.position_type == PositionType::SHORT { + Self::Bid { + id: "".to_string(), + positionsize: to.positionsize.to_f64().unwrap(), + price: price, + } + } else { + Self::Ask { + id: "".to_string(), + positionsize: to.positionsize.to_f64().unwrap(), + price: price, + } + } + } } pub fn unrealizedpnl( @@ -1248,6 +1359,25 @@ impl TraderOrder { .order(timestamp.desc()) .first(conn) } + pub fn get_by_signature( + conn: &mut PgConnection, + accountid: String, + ) -> QueryResult { + use crate::database::schema::trader_order::dsl::*; + trader_order + .filter(account_id.eq(accountid)) + .order(timestamp.desc()) + .first(conn) + } + pub fn get_by_uuid(conn: &mut PgConnection, order_id: String) -> QueryResult { + // use crate::database::schema::address_customer_id::dsl as addr_dsl; + use crate::database::schema::trader_order::dsl::*; + + trader_order + .filter(uuid.eq(order_id)) + .order(timestamp.desc()) + .first(conn) + } pub fn insert(conn: &mut PgConnection, orders: Vec) -> QueryResult { use crate::database::schema::trader_order::dsl::*; @@ -1270,7 +1400,7 @@ impl TraderOrder { .load(conn)?; let accounts: Vec<_> = accounts.into_iter().map(|a| a.address).collect(); - let price = BtcUsdPrice::get(conn)?; + let _price = BtcUsdPrice::get(conn)?; let closed = vec![ OrderStatus::PENDING, OrderStatus::CANCELLED, @@ -1401,7 +1531,7 @@ impl TraderOrder { args: TradeVolumeArgs, ) -> QueryResult { use crate::database::schema::address_customer_id::dsl as acct_dsl; - use crate::database::schema::trader_order::dsl::*; + // use crate::database::schema::trader_order::dsl::*; let accounts: Vec = acct_dsl::address_customer_id .filter(acct_dsl::customer_id.eq(customer_id)) @@ -1426,33 +1556,99 @@ impl TraderOrder { Ok(tv.volume.to_f64().unwrap()) } + pub fn order_book_orders(conn: &mut PgConnection) -> QueryResult> { + // let query = r#" + // SELECT * FROM trader_order + // WHERE id IN ( + // SELECT MAX(id) FROM trader_order + // WHERE order_type = 'LIMIT' + // GROUP BY uuid + // ) + // AND order_status NOT IN ('FILLED', 'CANCELLED', 'LIQUIDATE', 'SETTLED') + // "#; + let query = r#" + SELECT * FROM orderbook + "#; + + diesel::sql_query(query).get_results(conn) + } + pub fn order_book(conn: &mut PgConnection) -> QueryResult { - use crate::database::schema::trader_order::dsl::*; - use diesel::dsl::{max, sum}; + // use crate::database::schema::trader_order::dsl::*; + // use diesel::dsl::{max, sum}; let query = r#" - WITH orders AS ( - SELECT * FROM trader_order - WHERE id IN ( - SELECT MAX(id) FROM trader_order - WHERE order_type = 'LIMIT' - GROUP BY uuid - ) - AND order_status <> 'FILLED' + WITH orders AS ( + SELECT * FROM trader_order + WHERE id IN ( + SELECT MAX(id) FROM trader_order + WHERE order_type = 'LIMIT' AND position_type = 'SHORT' + GROUP BY uuid ) + AND order_status <> 'FILLED' AND order_status <> 'CANCELLED' AND order_status <> 'LIQUIDATE' + ), commands AS ( + SELECT MAX(id) as id,uuid FROM sorted_set_command + WHERE uuid IN ( SELECT uuid FROM orders ) + GROUP BY uuid + ), updates AS ( + SELECT * FROM sorted_set_command + WHERE id IN ( SELECT id FROM commands ) + ), updated AS( SELECT - MAX(uuid) AS uuid, - entryprice, - SUM(positionsize) AS positionsize + orders.uuid as uuid, + COALESCE(amount, entryprice) as entryprice, + positionsize as positionsize, + updates.command as command FROM orders - GROUP BY entryprice - ORDER BY positionsize DESC - LIMIT 10; + LEFT JOIN updates + ON updates.uuid = orders.uuid + ), sorted_set AS ( + SELECT * + FROM sorted_set_command + WHERE id IN ( + SELECT MAX(id) FROM sorted_set_command + WHERE uuid IS NOT NULL + GROUP BY uuid + ) + AND command IN ('ADD_CLOSE_LIMIT_PRICE', 'UPDATE_CLOSE_LIMIT_PRICE') + AND position_type ='SHORT' ORDER BY id DESC + ) + SELECT + trader_o.uuid AS uuid, + sort.amount AS entryprice, + trader_o.positionsize as positionsize + FROM ( + SELECT * + FROM trader_order + WHERE id IN ( + SELECT MAX(id) FROM trader_order + WHERE position_type = 'LONG' AND uuid IN ( + SELECT uuid + FROM sorted_Set + ) + GROUP BY uuid + ) + AND order_status = 'FILLED' + ) AS trader_o + LEFT OUTER JOIN ( + SELECT amount,uuid FROM sorted_Set + ) AS sort + ON trader_o.uuid = sort.uuid + UNION ALL + SELECT + MAX(uuid) AS uuid, + entryprice, + SUM(positionsize) AS positionsize + FROM updated + WHERE command IS NULL OR command <> 'REMOVE_CLOSE_LIMIT_PRICE' + GROUP BY entryprice + ORDER BY entryprice ASC + LIMIT 15; "#; let shorts: Vec = diesel::sql_query(query).get_results(conn)?; - let mut ask: Vec<_> = shorts + let ask: Vec<_> = shorts .into_iter() .map(|order| Ask { id: order.uuid, @@ -1469,21 +1665,69 @@ impl TraderOrder { WHERE order_type = 'LIMIT' AND position_type = 'LONG' GROUP BY uuid ) - AND order_status <> 'FILLED' + AND order_status <> 'FILLED' AND order_status <> 'CANCELLED' AND order_status <> 'LIQUIDATE' + ), commands AS ( + SELECT MAX(id) as id,uuid FROM sorted_set_command + WHERE uuid IN ( SELECT uuid FROM orders ) + GROUP BY uuid + ), updates AS ( + SELECT * FROM sorted_set_command + WHERE id IN ( SELECT id FROM commands ) + ), updated AS( + SELECT + orders.uuid as uuid, + COALESCE(amount, entryprice) as entryprice, + positionsize as positionsize, + updates.command as command + FROM orders + LEFT JOIN updates + ON updates.uuid = orders.uuid + ), sorted_set AS ( + SELECT * + FROM sorted_set_command + WHERE id IN ( + SELECT MAX(id) FROM sorted_set_command + WHERE uuid IS NOT NULL + GROUP BY uuid + ) + AND command IN ('ADD_CLOSE_LIMIT_PRICE', 'UPDATE_CLOSE_LIMIT_PRICE') + AND position_type ='SHORT' + ORDER BY id DESC ) + SELECT + trader_o.uuid AS uuid, + sort.amount AS entryprice, + trader_o.positionsize AS positionsize + FROM ( + SELECT * + FROM trader_order + WHERE id IN ( + SELECT MAX(id) FROM trader_order + WHERE position_type = 'SHORT' + AND uuid IN ( SELECT uuid FROM sorted_set) + GROUP BY uuid + ) + AND trader_order.order_status = 'FILLED' + ) AS trader_o + LEFT OUTER JOIN ( + SELECT amount,uuid FROM sorted_set + ) AS sort + ON trader_o.uuid = sort.uuid + UNION ALL SELECT MAX(uuid) AS uuid, entryprice, SUM(positionsize) AS positionsize - FROM orders + FROM updated + WHERE command IS NULL OR command <> 'REMOVE_CLOSE_LIMIT_PRICE' GROUP BY entryprice - ORDER BY positionsize DESC - LIMIT 10; + ORDER BY entryprice DESC + LIMIT 15; "#; let longs: Vec = diesel::sql_query(query).get_results(conn)?; - let mut bid = longs + let bid = longs .into_iter() .map(|order| Bid { id: order.uuid, @@ -1499,7 +1743,7 @@ impl TraderOrder { pub fn open_orders(conn: &mut PgConnection, customer_id: i64) -> QueryResult> { use crate::database::schema::address_customer_id::dsl as acct_dsl; - use crate::database::schema::trader_order::dsl::*; + // use crate::database::schema::trader_order::dsl::*; let account: Vec = acct_dsl::address_customer_id .filter(acct_dsl::customer_id.eq(customer_id)) @@ -1528,9 +1772,10 @@ impl TraderOrder { } pub fn list_past_24hrs(conn: &mut PgConnection) -> QueryResult> { - use crate::database::schema::trader_order::dsl::*; + // use crate::database::schema::trader_order::dsl::*; - let query = r#"SELECT + let query = r#"SELECT * FROM (SELECT + trader_order.uuid as order_id, trader_order.position_type as side, trader_order.entryprice as price, trader_order.positionsize as positionsize, @@ -1538,38 +1783,90 @@ impl TraderOrder { FROM trader_order INNER JOIN ( SELECT uuid,min(timestamp) AS timestamp - FROM trader_order GROUP BY uuid + FROM trader_order WHERE trader_order.order_status = 'FILLED' and timestamp > now() - INTERVAL '1 day' GROUP BY uuid order by timestamp desc limit 50 + ) as t ON trader_order.uuid = t.uuid AND trader_order.timestamp = t.timestamp - WHERE t.timestamp > now() - INTERVAL '1 day' - AND trader_order.order_status = 'FILLED' UNION ALL - + SELECT - trader_order.position_type as side, + trader_order.uuid as order_id, + ( + CASE WHEN trader_order.position_type = 'LONG' THEN position_type('SHORT') + ELSE position_type('LONG') + END + ) as side, trader_order.settlement_price as price, trader_order.positionsize as positionsize, trader_order.timestamp as timestamp + FROM trader_order INNER JOIN ( - SELECT uuid,min(timestamp) AS timestamp - FROM trader_order GROUP BY uuid + SELECT uuid,max(timestamp) AS timestamp + FROM trader_order + where order_status IN ('SETTLED', 'LIQUIDATE') + AND timestamp > now() - INTERVAL '1 day' + GROUP BY uuid order by timestamp desc limit 50 + ) as t ON trader_order.uuid = t.uuid AND trader_order.timestamp = t.timestamp - WHERE t.timestamp > now() - INTERVAL '1 day' - AND trader_order.order_status IN ('SETTLED', 'LIQUIDATE') + order by timestamp desc ) as recent_order order by timestamp desc limit 50 "#; diesel::sql_query(query).load(conn) } } +impl TraderOrderFundingUpdates { + pub fn insert( + conn: &mut PgConnection, + orders: Vec, + ) -> QueryResult { + use crate::database::schema::trader_order_funding_updated::dsl::*; + + let query = diesel::insert_into(trader_order_funding_updated).values(&orders); + + query.execute(conn) + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub struct OrderBook { pub bid: Vec, pub ask: Vec, } +impl OrderBook { + pub fn new(bid: Vec, ask: Vec) -> OrderBook { + OrderBook { bid, ask } + } + pub fn add_order(&mut self, order: NewOrderBookOrder) { + match order { + NewOrderBookOrder::Bid { + id, + positionsize, + price, + } => { + self.bid.push(Bid { + id: "".to_string(), + positionsize, + price, + }); + } + NewOrderBookOrder::Ask { + id, + positionsize, + price, + } => { + self.ask.push(Ask { + id: "".to_string(), + positionsize, + price, + }); + } + } + } +} #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Ask { @@ -1659,6 +1956,22 @@ impl LendOrder { .order(timestamp.desc()) .first(conn) } + pub fn get_by_signature(conn: &mut PgConnection, accountid: String) -> QueryResult { + use crate::database::schema::lend_order::dsl::*; + + lend_order + .filter(account_id.eq(accountid)) + .order(timestamp.desc()) + .first(conn) + } + pub fn get_by_uuid(conn: &mut PgConnection, order_id: String) -> QueryResult { + use crate::database::schema::lend_order::dsl::*; + + lend_order + .filter(uuid.eq(order_id)) + .order(timestamp.desc()) + .first(conn) + } pub fn insert(conn: &mut PgConnection, orders: Vec) -> QueryResult { use crate::database::schema::lend_order::dsl::*; @@ -1778,6 +2091,115 @@ impl From for InsertTraderOrder { } } } +impl From for TraderOrderFundingUpdates { + fn from(src: relayer::TraderOrder) -> TraderOrderFundingUpdates { + let relayer::TraderOrder { + uuid, + account_id, + position_type, + order_status, + order_type, + entryprice, + execution_price, + positionsize, + leverage, + initial_margin, + available_margin, + timestamp, + bankruptcy_price, + bankruptcy_value, + maintenance_margin, + liquidation_price, + unrealized_pnl, + settlement_price, + entry_nonce, + exit_nonce, + entry_sequence, + } = src; + + TraderOrderFundingUpdates { + id: 0, + uuid: uuid.to_string(), + account_id, + position_type: position_type.into(), + order_status: order_status.into(), + order_type: order_type.into(), + // TODO: maybe a TryFrom impl instead... + entryprice: BigDecimal::from_f64(entryprice).unwrap().round(2), + execution_price: BigDecimal::from_f64(execution_price).unwrap().round(2), + positionsize: BigDecimal::from_f64(positionsize).unwrap(), + leverage: BigDecimal::from_f64(leverage).unwrap(), + initial_margin: BigDecimal::from_f64(initial_margin).unwrap(), + available_margin: BigDecimal::from_f64(available_margin).unwrap().round(4), + timestamp: DateTime::parse_from_rfc3339(×tamp) + .expect("Bad datetime format") + .into(), + bankruptcy_price: BigDecimal::from_f64(bankruptcy_price).unwrap().round(2), + bankruptcy_value: BigDecimal::from_f64(bankruptcy_value).unwrap().round(4), + maintenance_margin: BigDecimal::from_f64(maintenance_margin).unwrap().round(4), + liquidation_price: BigDecimal::from_f64(liquidation_price).unwrap().round(2), + unrealized_pnl: BigDecimal::from_f64(unrealized_pnl).unwrap().round(2), + settlement_price: BigDecimal::from_f64(settlement_price).unwrap().round(2), + entry_nonce: entry_nonce as i64, + exit_nonce: exit_nonce as i64, + entry_sequence: entry_sequence as i64, + } + } +} +impl From for InsertTraderOrderFundingUpdates { + fn from(src: relayer::TraderOrder) -> InsertTraderOrderFundingUpdates { + let relayer::TraderOrder { + uuid, + account_id, + position_type, + order_status, + order_type, + entryprice, + execution_price, + positionsize, + leverage, + initial_margin, + available_margin, + timestamp, + bankruptcy_price, + bankruptcy_value, + maintenance_margin, + liquidation_price, + unrealized_pnl, + settlement_price, + entry_nonce, + exit_nonce, + entry_sequence, + } = src; + + InsertTraderOrderFundingUpdates { + uuid: uuid.to_string(), + account_id, + position_type: position_type.into(), + order_status: order_status.into(), + order_type: order_type.into(), + // TODO: maybe a TryFrom impl instead... + entryprice: BigDecimal::from_f64(entryprice).unwrap().round(2), + execution_price: BigDecimal::from_f64(execution_price).unwrap().round(2), + positionsize: BigDecimal::from_f64(positionsize).unwrap(), + leverage: BigDecimal::from_f64(leverage).unwrap(), + initial_margin: BigDecimal::from_f64(initial_margin).unwrap(), + available_margin: BigDecimal::from_f64(available_margin).unwrap().round(4), + timestamp: DateTime::parse_from_rfc3339(×tamp) + .expect("Bad datetime format") + .into(), + bankruptcy_price: BigDecimal::from_f64(bankruptcy_price).unwrap().round(2), + bankruptcy_value: BigDecimal::from_f64(bankruptcy_value).unwrap().round(4), + maintenance_margin: BigDecimal::from_f64(maintenance_margin).unwrap().round(4), + liquidation_price: BigDecimal::from_f64(liquidation_price).unwrap().round(2), + unrealized_pnl: BigDecimal::from_f64(unrealized_pnl).unwrap().round(2), + settlement_price: BigDecimal::from_f64(settlement_price).unwrap().round(2), + entry_nonce: entry_nonce as i64, + exit_nonce: exit_nonce as i64, + entry_sequence: entry_sequence as i64, + } + } +} impl From for InsertLendOrder { fn from(src: relayer::LendOrder) -> InsertLendOrder { diff --git a/src/database/schema.rs b/src/database/schema.rs index 3747a6f..7ef954a 100644 --- a/src/database/schema.rs +++ b/src/database/schema.rs @@ -42,6 +42,48 @@ diesel::table! { } } +diesel::table! { + candles_1day (start_time) { + start_time -> Timestamptz, + end_time -> Timestamptz, + low -> Numeric, + high -> Numeric, + open -> Numeric, + close -> Numeric, + trades -> Int4, + btc_volume -> Numeric, + usd_volume -> Numeric, + } +} + +diesel::table! { + candles_1hour (start_time) { + start_time -> Timestamptz, + end_time -> Timestamptz, + low -> Numeric, + high -> Numeric, + open -> Numeric, + close -> Numeric, + trades -> Int4, + btc_volume -> Numeric, + usd_volume -> Numeric, + } +} + +diesel::table! { + candles_1min (start_time) { + start_time -> Timestamptz, + end_time -> Timestamptz, + low -> Numeric, + high -> Numeric, + open -> Numeric, + close -> Numeric, + trades -> Int4, + btc_volume -> Numeric, + usd_volume -> Numeric, + } +} + diesel::table! { current_nonce (id) { id -> Int8, @@ -218,6 +260,39 @@ diesel::table! { } } +diesel::table! { + use diesel::sql_types::*; + use super::sql_types::PositionType; + use super::sql_types::OrderStatus; + use super::sql_types::OrderType; + + trader_order_funding_updated (id) { + id -> Int8, + #[max_length = 64] + uuid -> Varchar, + account_id -> Varchar, + position_type -> PositionType, + order_status -> OrderStatus, + order_type -> OrderType, + entryprice -> Numeric, + execution_price -> Numeric, + positionsize -> Numeric, + leverage -> Numeric, + initial_margin -> Numeric, + available_margin -> Numeric, + timestamp -> Timestamptz, + bankruptcy_price -> Numeric, + bankruptcy_value -> Numeric, + maintenance_margin -> Numeric, + liquidation_price -> Numeric, + unrealized_pnl -> Numeric, + settlement_price -> Numeric, + entry_nonce -> Int8, + exit_nonce -> Int8, + entry_sequence -> Int8, + } +} + diesel::table! { use diesel::sql_types::*; use super::sql_types::OrderType; @@ -232,6 +307,7 @@ diesel::table! { order_status -> OrderStatus, datetime -> Varchar, output -> Nullable, + request_id -> Nullable, } } @@ -242,6 +318,9 @@ diesel::joinable!(customer_order_linking -> customer_account (customer_account_i diesel::allow_tables_to_appear_in_same_query!( address_customer_id, btc_usd_price, + candles_1day, + candles_1hour, + candles_1min, current_nonce, customer_account, customer_apikey_linking, @@ -253,5 +332,35 @@ diesel::allow_tables_to_appear_in_same_query!( position_size_log, sorted_set_command, trader_order, + trader_order_funding_updated, transaction_hash, ); + +// // /* ---- View: orderbook -------------------------------------------- */ +// diesel::table! { +// // Read-only view. No primary key. +// orderbook (id) { +// id -> Int8, +// uuid -> Uuid, +// account_id -> Text, +// position_type -> Varchar, +// order_status -> Varchar, +// order_type -> Varchar, +// entryprice -> Numeric, +// execution_price -> Numeric, +// positionsize -> Numeric, +// leverage -> Numeric, +// initial_margin -> Numeric, +// available_margin -> Numeric, +// timestamp -> Timestamp, +// bankruptcy_price -> Numeric, +// bankruptcy_value -> Numeric, +// maintenance_margin -> Numeric, +// liquidation_price -> Numeric, +// unrealized_pnl -> Numeric, +// settlement_price -> Nullable, +// entry_nonce -> Nullable, +// exit_nonce -> Nullable, +// entry_sequence -> Nullable, +// } +// } diff --git a/src/database/sql_types.rs b/src/database/sql_types.rs index 2fed71b..23c8b45 100644 --- a/src/database/sql_types.rs +++ b/src/database/sql_types.rs @@ -1,13 +1,13 @@ #![allow(non_camel_case_types)] - +#![allow(warnings)] use crate::database::schema::sql_types::{ LendPoolCommandType as LendPoolCommandTypeSql, OrderStatus as OrderStatusSql, OrderType as OrderTypeSql, PositionSizeCommand as PositionSizeCommandSql, PositionType as PositionTypeSql, SortedSetCommandType as SortedSetCommandTypeSql, }; +use core::fmt::Display; use diesel::*; use diesel::{ - backend::Backend, deserialize::FromSql, pg::Pg, serialize::{self, IsNull, Output, ToSql}, @@ -15,7 +15,6 @@ use diesel::{ use relayerwalletlib::zkoswalletlib::relayer_types; use serde::{Deserialize, Serialize}; use std::io::Write; -use twilight_relayer_rust::relayer; #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub enum TXType { @@ -77,7 +76,9 @@ impl diesel::query_builder::QueryId for OrderTypeSql { #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, FromSqlRow, AsExpression)] #[diesel(sql_type = PositionTypeSql)] pub enum PositionType { + #[serde(alias = "bid", alias = "Long", alias = "Bid")] LONG, + #[serde(alias = "ask", alias = "Short", alias = "Ask")] SHORT, } @@ -90,19 +91,50 @@ pub enum OrderStatus { CANCELLED, PENDING, FILLED, + DuplicateOrder, + UtxoError, + Error, + NoResponseFromChain, + BincodeError, + HexCodeError, + SerializationError, + RequestSubmitted, + OrderNotFound, + RejectedFromChain, + FilledUpdated, } impl OrderStatus { + pub fn as_str(&self) -> &'static str { + use OrderStatus::*; + + match self { + SETTLED => "SETTLED", + LENDED => "LENDED", + LIQUIDATE => "LIQUIDATE", + CANCELLED => "CANCELLED", + PENDING => "PENDING", + FILLED => "FILLED", + DuplicateOrder => "DuplicateOrder", + UtxoError => "UtxoError", + Error => "Error", + NoResponseFromChain => "NoResponseFromChain", + BincodeError => "BincodeError", + HexCodeError => "HexCodeError", + SerializationError => "SerializationError", + RequestSubmitted => "RequestSubmitted", + OrderNotFound => "OrderNotFound", + RejectedFromChain => "RejectedFromChain", + FilledUpdated => "FilledUpdated", + } + } + pub fn is_cancelable(&self) -> bool { use OrderStatus::*; match self { - SETTLED => false, - LENDED => false, - LIQUIDATE => false, - CANCELLED => false, PENDING => true, - FILLED => false, + _ => false, } } @@ -110,12 +142,8 @@ impl OrderStatus { use OrderStatus::*; match self { - SETTLED => true, - LENDED => true, - LIQUIDATE => true, - CANCELLED => true, - PENDING => false, FILLED => true, + _ => false, } } } @@ -250,6 +278,17 @@ impl ToSql for OrderStatus { OrderStatus::CANCELLED => out.write_all(b"CANCELLED")?, OrderStatus::PENDING => out.write_all(b"PENDING")?, OrderStatus::FILLED => out.write_all(b"FILLED")?, + OrderStatus::DuplicateOrder => out.write_all(b"DuplicateOrder")?, + OrderStatus::UtxoError => out.write_all(b"UtxoError")?, + OrderStatus::Error => out.write_all(b"Error")?, + OrderStatus::NoResponseFromChain => out.write_all(b"NoResponseFromChain")?, + OrderStatus::BincodeError => out.write_all(b"BincodeError")?, + OrderStatus::HexCodeError => out.write_all(b"HexCodeError")?, + OrderStatus::SerializationError => out.write_all(b"SerializationError")?, + OrderStatus::RequestSubmitted => out.write_all(b"RequestSubmitted")?, + OrderStatus::OrderNotFound => out.write_all(b"OrderNotFound")?, + OrderStatus::RejectedFromChain => out.write_all(b"RejectedFromChain")?, + OrderStatus::FilledUpdated => out.write_all(b"FilledUpdated")?, } Ok(IsNull::No) } @@ -284,6 +323,17 @@ impl FromSql for OrderStatus { b"CANCELLED" => Ok(OrderStatus::CANCELLED), b"PENDING" => Ok(OrderStatus::PENDING), b"FILLED" => Ok(OrderStatus::FILLED), + b"DuplicateOrder" => Ok(OrderStatus::DuplicateOrder), + b"UtxoError" => Ok(OrderStatus::UtxoError), + b"Error" => Ok(OrderStatus::Error), + b"NoResponseFromChain" => Ok(OrderStatus::NoResponseFromChain), + b"BincodeError" => Ok(OrderStatus::BincodeError), + b"HexCodeError" => Ok(OrderStatus::HexCodeError), + b"SerializationError" => Ok(OrderStatus::SerializationError), + b"RequestSubmitted" => Ok(OrderStatus::RequestSubmitted), + b"OrderNotFound" => Ok(OrderStatus::OrderNotFound), + b"RejectedFromChain" => Ok(OrderStatus::RejectedFromChain), + b"FilledUpdated" => Ok(OrderStatus::FilledUpdated), _ => panic!("Invalid enum type in database!"), } } @@ -333,6 +383,15 @@ impl FromSql for PositionType { } } +impl Display for PositionType { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + match self { + PositionType::LONG => write!(fmt, "LONG"), + PositionType::SHORT => write!(fmt, "SHORT"), + } + } +} + impl From for OrderStatus { fn from(status: relayer_types::OrderStatus) -> OrderStatus { match status { @@ -342,6 +401,17 @@ impl From for OrderStatus { relayer_types::OrderStatus::CANCELLED => OrderStatus::CANCELLED, relayer_types::OrderStatus::PENDING => OrderStatus::PENDING, relayer_types::OrderStatus::FILLED => OrderStatus::FILLED, + relayer_types::OrderStatus::DuplicateOrder => OrderStatus::DuplicateOrder, + relayer_types::OrderStatus::UtxoError => OrderStatus::UtxoError, + relayer_types::OrderStatus::Error => OrderStatus::Error, + relayer_types::OrderStatus::NoResponseFromChain => OrderStatus::NoResponseFromChain, + relayer_types::OrderStatus::BincodeError => OrderStatus::BincodeError, + relayer_types::OrderStatus::HexCodeError => OrderStatus::HexCodeError, + relayer_types::OrderStatus::SerializationError => OrderStatus::SerializationError, + relayer_types::OrderStatus::RequestSubmitted => OrderStatus::RequestSubmitted, + relayer_types::OrderStatus::OrderNotFound => OrderStatus::OrderNotFound, + relayer_types::OrderStatus::RejectedFromChain => OrderStatus::RejectedFromChain, + relayer_types::OrderStatus::FilledUpdated => OrderStatus::FilledUpdated, } } } diff --git a/src/error.rs b/src/error.rs index 79b6d72..8ec4659 100644 --- a/src/error.rs +++ b/src/error.rs @@ -12,4 +12,6 @@ pub enum ApiError { JsonError(#[from] serde_json::Error), #[error("Connection pool error {0:?}")] R2d2(#[from] r2d2::Error), + #[error("Redis error {0:?}")] + Redis(#[from] redis::RedisError), } diff --git a/src/kafka.rs b/src/kafka.rs index 0ea7305..9b41b11 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -27,6 +27,8 @@ pub fn start_consumer( .create() .unwrap(); + con.client_mut().load_metadata_all().unwrap(); + let mut connection_status = true; while connection_status { let sender_clone = tx.clone(); @@ -45,7 +47,9 @@ pub fn start_consumer( let message: Event = match serde_json::from_str(&msg_data) { Ok(event) => event, Err(e) => { - panic!("Invalid message! {:?} {}", e, msg_data); + println!("Invalid message! {:?} {}\n", e, msg_data); + // continue; + Event::Stop(e.to_string()) } }; message diff --git a/src/rpc.rs b/src/rpc.rs index a0293d7..6eac06d 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,7 +1,8 @@ use diesel::prelude::PgConnection; use diesel::r2d2::ConnectionManager; use jsonrpsee::{core::error::Error, server::logger::Params, RpcModule}; -use kafka::producer::{Producer, Record, RequiredAcks}; +use kafka::producer::{Producer, RequiredAcks}; +use redis::Client; use serde::Serialize; use std::sync::{Arc, Mutex}; use tokio::time::Duration; @@ -9,10 +10,13 @@ use tokio::time::Duration; mod private_methods; mod public_methods; mod types; +mod util; + pub use types::{ CandleSubscription, Candles, HistoricalFundingArgs, HistoricalPriceArgs, Interval, Order, OrderHistoryArgs, OrderId, PnlArgs, RpcArgs, TradeVolumeArgs, TransactionHashArgs, }; +pub use util::{order_book, recent_orders}; type ManagedConnection = ConnectionManager; type ManagedPool = r2d2::Pool; @@ -22,6 +26,7 @@ type HandlerType = pub struct RelayerContext { pub pool: ManagedPool, + pub client: Client, pub kafka: Arc>, } @@ -35,20 +40,25 @@ fn register_method( } } -pub fn init_public_methods(database_url: &str) -> RpcModule { +pub fn init_public_methods(database_url: &str, redis_url: &str) -> RpcModule { let manager = ConnectionManager::::new(database_url); let pool = r2d2::Pool::new(manager).expect("Could not instantiate connection pool"); + let client = Client::open(redis_url).expect("Could not establish redis connection"); let broker_host = std::env::var("BROKER").expect("missing environment variable BROKER"); let broker = vec![broker_host.to_owned()]; - let mut kafka = Producer::from_hosts(broker) + let kafka = Producer::from_hosts(broker) .with_ack_timeout(Duration::from_secs(1)) .with_required_acks(RequiredAcks::One) .create() .unwrap(); let kafka = Arc::new(Mutex::new(kafka)); - let mut module = RpcModule::new(RelayerContext { pool, kafka }); + let mut module = RpcModule::new(RelayerContext { + client, + pool, + kafka, + }); register_method( &mut module, "btc_usd_price", @@ -99,23 +109,68 @@ pub fn init_public_methods(database_url: &str) -> RpcModule { "transaction_hashes", Box::new(public_methods::transaction_hashes), ); + register_method( + &mut module, + "trader_order_info", + Box::new(public_methods::trader_order_info), + ); + register_method( + &mut module, + "lend_order_info", + Box::new(public_methods::lend_order_info), + ); + register_method( + &mut module, + "submit_trade_order", + Box::new(public_methods::submit_trade_order), + ); + register_method( + &mut module, + "submit_lend_order", + Box::new(public_methods::submit_lend_order), + ); + register_method( + &mut module, + "settle_trade_order", + Box::new(public_methods::settle_trade_order), + ); + register_method( + &mut module, + "settle_lend_order", + Box::new(public_methods::settle_lend_order), + ); + register_method( + &mut module, + "cancel_trader_order", + Box::new(public_methods::cancel_trader_order), + ); + register_method( + &mut module, + "pool_share_value", + Box::new(public_methods::pool_share_value), + ); module } -pub fn init_private_methods(database_url: &str) -> RpcModule { +pub fn init_private_methods(database_url: &str, redis_url: &str) -> RpcModule { let manager = ConnectionManager::::new(database_url); let pool = r2d2::Pool::new(manager).expect("Could not instantiate connection pool"); + let client = Client::open(redis_url).expect("Could not establish redis connection"); let broker_host = std::env::var("BROKER").expect("missing environment variable BROKER"); let broker = vec![broker_host.to_owned()]; - let mut kafka = Producer::from_hosts(broker) + let kafka = Producer::from_hosts(broker) .with_ack_timeout(Duration::from_secs(1)) .with_required_acks(RequiredAcks::One) .create() .unwrap(); let kafka = Arc::new(Mutex::new(kafka)); - let mut module = RpcModule::new(RelayerContext { pool, kafka }); + let mut module = RpcModule::new(RelayerContext { + client, + pool, + kafka, + }); register_method( &mut module, @@ -137,6 +192,11 @@ pub fn init_private_methods(database_url: &str) -> RpcModule { "settle_trade_order", Box::new(private_methods::settle_trade_order), ); + register_method( + &mut module, + "cancel_trader_order", + Box::new(private_methods::cancel_trader_order), + ); register_method( &mut module, "submit_bulk_order", diff --git a/src/rpc/private_methods.rs b/src/rpc/private_methods.rs index 26648dd..ec4dcb0 100644 --- a/src/rpc/private_methods.rs +++ b/src/rpc/private_methods.rs @@ -1,13 +1,13 @@ use super::*; -use crate::{auth::AuthInfo, database::*}; -use chrono::prelude::*; +use crate::database::*; use jsonrpsee::{core::error::Error, server::logger::Params}; use kafka::producer::Record; -use log::info; use relayerwalletlib::verify_client_message::{ - verify_query_order, verify_settle_requests, verify_trade_lend_order, + verify_client_create_trader_order, verify_query_order, verify_settle_requests, + verify_trade_lend_order, }; use twilight_relayer_rust::relayer; +use zkoswalletlib::relayer_rpcclient::method::RequestResponse; pub(super) fn submit_lend_order( params: Params<'_>, @@ -33,12 +33,15 @@ pub(super) fn submit_lend_order( let mut order = tx.create_lend_order.clone(); let meta = relayer::Meta::default(); - - order.account_id = tx.input.input.as_owner_address().cloned().unwrap(); - let deposit = order.deposit / 10000.0; - let balance = order.balance / 10000.0; - order.deposit = deposit; - order.balance = balance; + let public_key = order.account_id.clone(); + order.balance = order.deposit; + let response = RequestResponse::new( + "Order request submitted successfully".to_string(), + public_key, + ); + let Ok(response_value) = serde_json::to_value(&response) else { + return Ok(format!("Invalid response").into()); + }; let Ok(mut conn) = ctx.pool.get() else { return Ok(format!("Database connection error").into()); @@ -48,8 +51,12 @@ pub(super) fn submit_lend_order( return Ok(format!("Failed to update customer id!").into()); } - let order = - relayer::RpcCommand::CreateLendOrder(order.clone(), meta, tx.input.encode_as_hex_string()); + let order = relayer::RpcCommand::CreateLendOrder( + order.clone(), + meta, + tx.input.encode_as_hex_string(), + response.get_id(), + ); let Ok(serialized) = serde_json::to_vec(&order) else { return Ok(format!("Could not serialize order").into()); }; @@ -58,7 +65,7 @@ pub(super) fn submit_lend_order( if let Err(e) = ctx.kafka.lock().expect("Lock poisoned!").send(&record) { Ok(format!("Could not send order {:?}", e).into()) } else { - Ok("OK".into()) + Ok(response_value) } } @@ -84,8 +91,15 @@ pub(super) fn settle_lend_order( return Ok(format!("Invalid order params").into()); } - let mut order = tx.execute_lend_order.clone(); - + let order = tx.execute_lend_order.clone(); + let public_key = order.account_id.clone(); + let response = RequestResponse::new( + "Order request submitted successfully".to_string(), + public_key, + ); + let Ok(response_value) = serde_json::to_value(&response) else { + return Ok(format!("Invalid response").into()); + }; let Ok(mut conn) = ctx.pool.get() else { return Ok(format!("Database connection error").into()); }; @@ -100,20 +114,18 @@ pub(super) fn settle_lend_order( return Ok(format!("Order not found").into()); }; - if ord.order_status.is_closed() { + if !ord.order_status.is_closed() { return Ok(format!("Order closed").into()); } let meta = relayer::Meta::default(); - let Some(account_id) = tx.msg.output.as_output_data().get_owner_address() else { - return Ok(format!("Missing owner address").into()); - }; - - order.account_id = account_id.to_string(); - - let order = - relayer::RpcCommand::ExecuteLendOrder(order.clone(), meta, tx.msg.encode_as_hex_string()); + let order = relayer::RpcCommand::ExecuteLendOrder( + order.clone(), + meta, + tx.msg.encode_as_hex_string(), + response.get_id(), + ); let Ok(serialized) = serde_json::to_vec(&order) else { return Ok(format!("Could not serialize order").into()); }; @@ -122,7 +134,7 @@ pub(super) fn settle_lend_order( if let Err(e) = ctx.kafka.lock().expect("Lock poisoned!").send(&record) { Ok(format!("Could not send order {:?}", e).into()) } else { - Ok("OK".into()) + Ok(response_value) } } @@ -140,22 +152,29 @@ pub(super) fn submit_trade_order( return Ok(format!("Invalid hex data").into()); }; - let Ok(tx) = bincode::deserialize::(&bytes) else { + let Ok(tx) = bincode::deserialize::(&bytes) else { return Ok(format!("Invalid bincode").into()); }; - if let Err(_) = verify_trade_lend_order(&tx.input) { + if let Err(_) = verify_client_create_trader_order(&tx.tx) { return Ok(format!("Invalid order params").into()); } + let Ok(transaction_ser) = bincode::serialize(&tx.tx) else { + return Ok(format!("Invalid bincode").into()); + }; + let mut order = tx.create_trader_order.clone(); let meta = relayer::Meta::default(); - - order.account_id = tx.input.input.as_owner_address().cloned().unwrap(); - let margin = order.initial_margin / 10000.0; - order.initial_margin = margin; - order.available_margin = margin; - + let public_key = order.account_id.clone(); + order.available_margin = order.initial_margin; + let response = RequestResponse::new( + "Order request submitted successfully".to_string(), + public_key, + ); + let Ok(response_value) = serde_json::to_value(&response) else { + return Ok(format!("Invalid response").into()); + }; let Ok(mut conn) = ctx.pool.get() else { return Ok(format!("Database connection error").into()); }; @@ -167,7 +186,8 @@ pub(super) fn submit_trade_order( let order = relayer::RpcCommand::CreateTraderOrder( order.clone(), meta, - tx.input.encode_as_hex_string(), + hex::encode(transaction_ser), + response.get_id(), ); let Ok(serialized) = serde_json::to_vec(&order) else { return Ok(format!("Could not serialize order").into()); @@ -177,7 +197,7 @@ pub(super) fn submit_trade_order( if let Err(e) = ctx.kafka.lock().expect("Lock poisoned!").send(&record) { Ok(format!("Could not send order {:?}", e).into()) } else { - Ok("OK".into()) + Ok(response_value) } } @@ -203,8 +223,15 @@ pub(super) fn settle_trade_order( return Ok(format!("Invalid order params").into()); } - let mut order = tx.execute_trader_order.clone(); - + let order = tx.execute_trader_order.clone(); + let public_key = order.account_id.clone(); + let response = RequestResponse::new( + "Order request submitted successfully".to_string(), + public_key, + ); + let Ok(response_value) = serde_json::to_value(&response) else { + return Ok(format!("Invalid response").into()); + }; let Ok(mut conn) = ctx.pool.get() else { return Ok(format!("Database connection error").into()); }; @@ -213,20 +240,18 @@ pub(super) fn settle_trade_order( return Ok(format!("Order not found").into()); }; - if ord.order_status.is_closed() { + if !ord.order_status.is_closed() { return Ok(format!("Order closed").into()); } let meta = relayer::Meta::default(); - let Some(account_id) = tx.msg.output.as_output_data().get_owner_address() else { - return Ok(format!("Missing owner address").into()); - }; - - order.account_id = account_id.to_string(); - - let order = - relayer::RpcCommand::ExecuteTraderOrder(order.clone(), meta, tx.msg.encode_as_hex_string()); + let order = relayer::RpcCommand::ExecuteTraderOrder( + order.clone(), + meta, + tx.msg.encode_as_hex_string(), + response.get_id(), + ); let Ok(serialized) = serde_json::to_vec(&order) else { return Ok(format!("Could not serialize order").into()); }; @@ -235,11 +260,11 @@ pub(super) fn settle_trade_order( if let Err(e) = ctx.kafka.lock().expect("Lock poisoned!").send(&record) { Ok(format!("Could not send order {:?}", e).into()) } else { - Ok("OK".into()) + Ok(response_value) } } -pub(super) fn cancel_order( +pub(super) fn cancel_trader_order( params: Params<'_>, ctx: &RelayerContext, ) -> Result { @@ -264,8 +289,15 @@ pub(super) fn cancel_order( return Ok(format!("Invalid order params").into()); } - let mut order = tx.cancel_trader_order.clone(); - + let order = tx.cancel_trader_order.clone(); + let public_key = order.account_id.clone(); + let response = RequestResponse::new( + "Order request submitted successfully".to_string(), + public_key, + ); + let Ok(response_value) = serde_json::to_value(&response) else { + return Ok(format!("Invalid response").into()); + }; let Ok(mut conn) = ctx.pool.get() else { return Ok(format!("Database connection error").into()); }; @@ -280,11 +312,12 @@ pub(super) fn cancel_order( let meta = relayer::Meta::default(); - let account_id = tx.msg.public_key.clone(); - order.account_id = account_id; - - let order = - relayer::RpcCommand::CancelTraderOrder(order.clone(), meta, tx.msg.encode_as_hex_string()); + let order = relayer::RpcCommand::CancelTraderOrder( + order.clone(), + meta, + tx.msg.encode_as_hex_string(), + response.get_id(), + ); let Ok(serialized) = serde_json::to_vec(&order) else { return Ok(format!("Could not serialize order").into()); }; @@ -293,18 +326,18 @@ pub(super) fn cancel_order( if let Err(e) = ctx.kafka.lock().expect("Lock poisoned!").send(&record) { Ok(format!("Could not send order {:?}", e).into()) } else { - Ok("OK".into()) + Ok(response_value) } } pub(super) fn submit_bulk_order( params: Params<'_>, - ctx: &RelayerContext, + _ctx: &RelayerContext, ) -> Result { - let topic = std::env::var("RPC_CLIENT_REQUEST").expect("No client topic!"); + let _topic = std::env::var("RPC_CLIENT_REQUEST").expect("No client topic!"); let args: RpcArgs> = params.parse()?; - let (account_id, orders) = args.unpack(); - let account_id = format!("{:016x}", account_id); + let (account_id, _orders) = args.unpack(); + let _account_id = format!("{:016x}", account_id); // TODO: bulk orders with ZkOS?? Ok("OK".into()) @@ -403,12 +436,12 @@ pub(super) fn lend_order_info( } } -pub(super) fn last_day_apy( - _: Params<'_>, - ctx: &RelayerContext, -) -> Result { - todo!("APY") -} +// pub(super) fn last_day_apy( +// _: Params<'_>, +// ctx: &RelayerContext, +// ) -> Result { +// todo!("APY") +// } pub(super) fn unrealized_pnl( params: Params<'_>, @@ -431,7 +464,7 @@ pub(super) fn open_orders( ctx: &RelayerContext, ) -> Result { let args: RpcArgs<()> = params.parse()?; - let (id, params) = args.unpack(); + let (id, _params) = args.unpack(); match ctx.pool.get() { Ok(mut conn) => match TraderOrder::open_orders(&mut conn, id) { diff --git a/src/rpc/public_methods.rs b/src/rpc/public_methods.rs index c045cab..348a4b8 100644 --- a/src/rpc/public_methods.rs +++ b/src/rpc/public_methods.rs @@ -2,6 +2,13 @@ use super::*; use crate::database::*; use chrono::prelude::*; use jsonrpsee::{core::error::Error, server::logger::Params}; +use kafka::producer::Record; +use relayerwalletlib::verify_client_message::{ + verify_client_create_trader_order, verify_query_order, verify_settle_requests, + verify_trade_lend_order, +}; +use twilight_relayer_rust::relayer; +use zkoswalletlib::relayer_rpcclient::method::RequestResponse; pub(super) fn btc_usd_price( _: Params<'_>, @@ -91,26 +98,26 @@ pub(super) fn open_limit_orders( _: Params<'_>, ctx: &RelayerContext, ) -> Result { - match ctx.pool.get() { - Ok(mut conn) => match TraderOrder::order_book(&mut conn) { - Ok(o) => Ok(serde_json::to_value(o).expect("Error converting response")), - Err(e) => Err(Error::Custom(format!("Error fetching order info: {:?}", e))), - }, - Err(e) => Err(Error::Custom(format!("Database error: {:?}", e))), - } + let Ok(mut conn) = ctx.client.get_connection() else { + return Ok("Redis connection error.".into()); + }; + + let book = order_book(&mut conn); + + Ok(serde_json::to_value(book).expect("Failed to serialize order book")) } pub(super) fn recent_trade_orders( _: Params<'_>, ctx: &RelayerContext, ) -> Result { - match ctx.pool.get() { - Ok(mut conn) => match TraderOrder::list_past_24hrs(&mut conn) { - Ok(o) => Ok(serde_json::to_value(o).expect("Error converting response")), - Err(e) => Err(Error::Custom(format!("Error fetching order info: {:?}", e))), - }, - Err(e) => Err(Error::Custom(format!("Database error: {:?}", e))), - } + let Ok(mut conn) = ctx.client.get_connection() else { + return Ok("Redis connection error.".into()); + }; + + let orders = recent_orders(&mut conn); + + Ok(serde_json::to_value(&orders).expect("Failed to serialize recent orders")) } pub(super) fn position_size( @@ -150,3 +157,364 @@ pub(super) fn transaction_hashes( pub(super) fn server_time(_: Params<'_>, _: &RelayerContext) -> Result { Ok(serde_json::to_value(Utc::now()).expect("Failed to get timestamp")) } + +pub(super) fn trader_order_info( + params: Params<'_>, + ctx: &RelayerContext, +) -> Result { + let Order { data } = params.parse()?; + let Ok(bytes) = hex::decode(&data) else { + return Ok(format!("Invalid hex data").into()); + }; + // println!("bytes:{:?}", bytes); + let Ok(tx) = bincode::deserialize::(&bytes) else { + return Ok(format!("Invalid bincode").into()); + }; + if let Err(arg) = verify_query_order( + tx.msg.clone(), + &bincode::serialize(&tx.query_trader_order).unwrap(), + ) { + return Ok(format!("Invalid order params:{:?}", arg).into()); + } + match ctx.pool.get() { + Ok(mut conn) => { + match TraderOrder::get_by_signature(&mut conn, tx.query_trader_order.account_id) { + Ok(o) => Ok(serde_json::to_value(o).expect("Error converting response")), + Err(e) => Err(Error::Custom(format!("Error fetching order info: {:?}", e))), + } + } + Err(e) => Err(Error::Custom(format!("Database error: {:?}", e))), + } +} + +pub(super) fn lend_order_info( + params: Params<'_>, + ctx: &RelayerContext, +) -> Result { + let Order { data } = params.parse()?; + let Ok(bytes) = hex::decode(&data) else { + return Ok(format!("Invalid hex data").into()); + }; + // println!("bytes:{:?}", bytes); + let Ok(tx) = bincode::deserialize::(&bytes) else { + return Ok(format!("Invalid bincode").into()); + }; + if let Err(arg) = verify_query_order( + tx.msg.clone(), + &bincode::serialize(&tx.query_lend_order).unwrap(), + ) { + return Ok(format!("Invalid order params:{:?}", arg).into()); + } + match ctx.pool.get() { + Ok(mut conn) => { + match LendOrder::get_by_signature(&mut conn, tx.query_lend_order.account_id) { + Ok(o) => Ok(serde_json::to_value(o).expect("Error converting response")), + Err(e) => Err(Error::Custom(format!("Error fetching order info: {:?}", e))), + } + } + Err(e) => Err(Error::Custom(format!("Database error: {:?}", e))), + } +} + +pub(super) fn submit_trade_order( + params: Params<'_>, + ctx: &RelayerContext, +) -> Result { + let topic = std::env::var("RPC_CLIENT_REQUEST").expect("No client topic!"); + let Order { data } = params.parse()?; + + let Ok(bytes) = hex::decode(&data) else { + return Ok(format!("Invalid hex data").into()); + }; + + let Ok(tx) = bincode::deserialize::(&bytes) else { + return Ok(format!("Invalid bincode").into()); + }; + + if let Err(_) = verify_client_create_trader_order(&tx.tx) { + return Ok(format!("Invalid order params").into()); + } + + let Ok(transaction_ser) = bincode::serialize(&tx.tx) else { + return Ok(format!("Invalid bincode").into()); + }; + + let mut order = tx.create_trader_order.clone(); + let meta = relayer::Meta::default(); + let public_key = order.account_id.clone(); + order.available_margin = order.initial_margin; + let response = RequestResponse::new( + "Order request submitted successfully".to_string(), + public_key, + ); + let Ok(response_value) = serde_json::to_value(&response) else { + return Ok(format!("Invalid response").into()); + }; + + let order = relayer::RpcCommand::CreateTraderOrder( + order.clone(), + meta, + hex::encode(transaction_ser), + response.get_id(), + ); + let Ok(serialized) = serde_json::to_vec(&order) else { + return Ok(format!("Could not serialize order").into()); + }; + + let record = Record::from_key_value(&topic, "CreateTraderOrder", serialized); + if let Err(e) = ctx.kafka.lock().expect("Lock poisoned!").send(&record) { + Ok(format!("Could not send order {:?}", e).into()) + } else { + Ok(response_value) + } +} + +pub(super) fn submit_lend_order( + params: Params<'_>, + ctx: &RelayerContext, +) -> Result { + let topic = std::env::var("RPC_CLIENT_REQUEST").expect("No client topic!"); + let Order { data } = params.parse()?; + + let Ok(bytes) = hex::decode(&data) else { + return Ok(format!("Invalid hex data").into()); + }; + + let Ok(tx) = bincode::deserialize::(&bytes) else { + return Ok(format!("Invalid bincode").into()); + }; + + if let Err(_) = verify_trade_lend_order(&tx.input) { + return Ok(format!("Invalid order params").into()); + } + + let mut order = tx.create_lend_order.clone(); + let public_key = order.account_id.clone(); + let meta = relayer::Meta::default(); + order.balance = order.deposit; + let response = RequestResponse::new( + "Order request submitted successfully".to_string(), + public_key, + ); + let Ok(response_value) = serde_json::to_value(&response) else { + return Ok(format!("Invalid response").into()); + }; + let order = relayer::RpcCommand::CreateLendOrder( + order.clone(), + meta, + tx.input.encode_as_hex_string(), + response.get_id(), + ); + let Ok(serialized) = serde_json::to_vec(&order) else { + return Ok(format!("Could not serialize order").into()); + }; + + let record = Record::from_key_value(&topic, "CreateLendOrder", serialized); + if let Err(e) = ctx.kafka.lock().expect("Lock poisoned!").send(&record) { + Ok(format!("Could not send order {:?}", e).into()) + } else { + Ok(response_value) + } +} + +pub(super) fn settle_trade_order( + params: Params<'_>, + ctx: &RelayerContext, +) -> Result { + let topic = std::env::var("RPC_CLIENT_REQUEST").expect("No client topic!"); + let Order { data } = params.parse()?; + + let Ok(bytes) = hex::decode(&data) else { + return Ok(format!("Invalid hex data").into()); + }; + + let Ok(tx) = bincode::deserialize::(&bytes) else { + return Ok(format!("Invalid bincode").into()); + }; + + if let Err(_) = verify_settle_requests(&tx.msg) { + return Ok(format!("Invalid order params").into()); + } + + let order = tx.execute_trader_order.clone(); + let public_key = order.account_id.clone(); + let response = RequestResponse::new( + "Order request submitted successfully".to_string(), + public_key, + ); + let Ok(response_value) = serde_json::to_value(&response) else { + return Ok(format!("Invalid response").into()); + }; + let Ok(mut conn) = ctx.pool.get() else { + return Ok(format!("Database connection error").into()); + }; + + let Ok(ord) = TraderOrder::get_by_uuid(&mut conn, order.uuid.to_string()) else { + return Ok(format!("Order not found").into()); + }; + + if !ord.order_status.is_closed() { + return Ok(format!("Order closed").into()); + } + + let meta = relayer::Meta::default(); + + let order = relayer::RpcCommand::ExecuteTraderOrder( + order.clone(), + meta, + tx.msg.encode_as_hex_string(), + response.get_id(), + ); + let Ok(serialized) = serde_json::to_vec(&order) else { + return Ok(format!("Could not serialize order").into()); + }; + + let record = Record::from_key_value(&topic, "ExecuteTraderOrder", serialized); + if let Err(e) = ctx.kafka.lock().expect("Lock poisoned!").send(&record) { + Ok(format!("Could not send order {:?}", e).into()) + } else { + Ok(response_value) + } +} + +pub(super) fn settle_lend_order( + params: Params<'_>, + ctx: &RelayerContext, +) -> Result { + let topic = std::env::var("RPC_CLIENT_REQUEST").expect("No client topic!"); + let Order { data } = params.parse()?; + + let Ok(bytes) = hex::decode(&data) else { + return Ok(format!("Invalid hex data").into()); + }; + + let Ok(tx) = bincode::deserialize::(&bytes) else { + return Ok(format!("Invalid bincode").into()); + }; + + if let Err(_) = verify_settle_requests(&tx.msg) { + return Ok(format!("Invalid order params").into()); + } + + let order = tx.execute_lend_order.clone(); + let public_key = order.account_id.clone(); + let response = RequestResponse::new( + "Order request submitted successfully".to_string(), + public_key, + ); + let Ok(response_value) = serde_json::to_value(&response) else { + return Ok(format!("Invalid response").into()); + }; + let Ok(mut conn) = ctx.pool.get() else { + return Ok(format!("Database connection error").into()); + }; + + let Ok(ord) = LendOrder::get_by_uuid(&mut conn, order.uuid.to_string()) else { + return Ok(format!("Order not found").into()); + }; + + if !ord.order_status.is_closed() { + return Ok(format!("Order closed").into()); + } + + let meta = relayer::Meta::default(); + + let order = relayer::RpcCommand::ExecuteLendOrder( + order.clone(), + meta, + tx.msg.encode_as_hex_string(), + response.get_id(), + ); + let Ok(serialized) = serde_json::to_vec(&order) else { + return Ok(format!("Could not serialize order").into()); + }; + + let record = Record::from_key_value(&topic, "ExecuteLendOrder", serialized); + if let Err(e) = ctx.kafka.lock().expect("Lock poisoned!").send(&record) { + Ok(format!("Could not send order {:?}", e).into()) + } else { + Ok(response_value) + } +} + +pub(super) fn cancel_trader_order( + params: Params<'_>, + ctx: &RelayerContext, +) -> Result { + let topic = std::env::var("RPC_CLIENT_REQUEST").expect("No client topic!"); + let Order { data } = params.parse()?; + + let Ok(bytes) = hex::decode(&data) else { + return Ok(format!("Invalid hex data").into()); + }; + + let Ok(tx) = bincode::deserialize::(&bytes) else { + return Ok(format!("Invalid bincode").into()); + }; + + if let Err(_) = verify_query_order( + tx.msg.convert_cancel_to_query(), + &bincode::serialize(&tx.cancel_trader_order).unwrap(), + ) { + return Ok(format!("Invalid order params").into()); + } + + let order = tx.cancel_trader_order.clone(); + let public_key = order.account_id.clone(); + let response = RequestResponse::new( + "Order request submitted successfully".to_string(), + public_key, + ); + let Ok(response_value) = serde_json::to_value(&response) else { + return Ok(format!("Invalid response").into()); + }; + let Ok(mut conn) = ctx.pool.get() else { + return Ok(format!("Database connection error").into()); + }; + + let Ok(ord) = TraderOrder::get_by_uuid(&mut conn, order.uuid.to_string()) else { + return Ok(format!("Order not found").into()); + }; + + if !ord.order_status.is_cancelable() { + return Ok(format!("Order not cancelable").into()); + } + + let meta = relayer::Meta::default(); + + let order = relayer::RpcCommand::CancelTraderOrder( + order.clone(), + meta, + tx.msg.encode_as_hex_string(), + response.get_id(), + ); + let Ok(serialized) = serde_json::to_vec(&order) else { + return Ok(format!("Could not serialize order").into()); + }; + + let record = Record::from_key_value(&topic, "CancelTraderOrder", serialized); + if let Err(e) = ctx.kafka.lock().expect("Lock poisoned!").send(&record) { + Ok(format!("Could not send order {:?}", e).into()) + } else { + Ok(response_value) + } +} + +pub(super) fn pool_share_value( + _params: Params<'_>, + ctx: &RelayerContext, +) -> Result { + match ctx.pool.get() { + Ok(mut conn) => match LendPool::get(&mut conn) { + Ok(o) => { + let value = o.get_pool_share_value(); + Ok(serde_json::to_value(value).expect("Error converting response")) + } + Err(e) => Err(Error::Custom(format!( + "Error fetching lend pool info: {:?}", + e + ))), + }, + Err(e) => Err(Error::Custom(format!("Database error: {:?}", e))), + } +} diff --git a/src/rpc/types.rs b/src/rpc/types.rs index f2e7741..015d15b 100644 --- a/src/rpc/types.rs +++ b/src/rpc/types.rs @@ -1,5 +1,5 @@ #![allow(non_camel_case_types)] - +#![allow(warnings)] // • Live Price Data // • Historical Price Data // • Funding Rate @@ -13,8 +13,7 @@ // • Server Time use crate::auth::UserInfo; use crate::database::OrderStatus; -use chrono::prelude::*; -use relayerwalletlib::zkoswalletlib::relayer_types::{OrderType, PositionType}; +use chrono::{prelude::*, Duration}; use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] @@ -61,6 +60,10 @@ pub enum TransactionHashArgs { id: String, status: Option, }, + RequestId { + id: String, + status: Option, + }, } #[derive(Debug, Serialize, Deserialize)] @@ -220,7 +223,7 @@ pub struct CandleSubscription { pub interval: Interval, } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Copy, Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)] pub enum Interval { ONE_MINUTE, FIVE_MINUTE, @@ -231,9 +234,25 @@ pub enum Interval { EIGHT_HOUR, TWELVE_HOUR, ONE_DAY, + ONE_DAY_CHANGE, } impl Interval { + pub fn duration(&self) -> Duration { + match self { + Interval::ONE_MINUTE => Duration::minutes(1), + Interval::FIVE_MINUTE => Duration::minutes(5), + Interval::FIFTEEN_MINUTE => Duration::minutes(15), + Interval::THIRTY_MINUTE => Duration::minutes(30), + Interval::ONE_HOUR => Duration::hours(1), + Interval::FOUR_HOUR => Duration::hours(4), + Interval::EIGHT_HOUR => Duration::hours(8), + Interval::TWELVE_HOUR => Duration::hours(12), + Interval::ONE_DAY => Duration::days(1), + Interval::ONE_DAY_CHANGE => Duration::days(1), + } + } + pub fn interval_sql(&self) -> String { match self { Interval::ONE_MINUTE => "'1 minute'", @@ -245,6 +264,7 @@ impl Interval { Interval::EIGHT_HOUR => "'8 hours'", Interval::TWELVE_HOUR => "'12 hours'", Interval::ONE_DAY => "'1 day'", + Interval::ONE_DAY_CHANGE => "'1 day'", } .into() } diff --git a/src/rpc/util.rs b/src/rpc/util.rs new file mode 100644 index 0000000..acf55bd --- /dev/null +++ b/src/rpc/util.rs @@ -0,0 +1,77 @@ +use crate::database::{Ask, Bid, OrderBook, RecentOrder}; +use chrono::{TimeDelta, Utc}; +use itertools::Itertools; + +const BOOK_LIMIT: usize = 10; +const RECENT_ORDER_LIMIT: usize = 25; + +pub fn order_book(conn: &mut redis::Connection) -> OrderBook { + let asks: redis::Iter = redis::cmd("ZSCAN") + .arg("ask") + .cursor_arg(0) + .clone() + .iter(conn) + .unwrap(); + + let ask: Vec<_> = asks + .chunks(2) + .into_iter() + .take(BOOK_LIMIT) + .map(|mut chunk| { + let positionsize = chunk.next().unwrap(); + let price = chunk.next().unwrap() / 100.0; + + Ask { + id: "".into(), + positionsize, + price, + } + }) + .collect(); + + let bids: redis::Iter = redis::cmd("ZSCAN") + .arg("bid") + .cursor_arg(0) + .clone() + .iter(conn) + .unwrap(); + + let bids: Vec<_> = bids.collect(); + let bid: Vec<_> = bids + .chunks(2) + .rev() + .into_iter() + .take(BOOK_LIMIT) + .map(|chunk| { + let positionsize = chunk[0]; + let price = chunk[1] / 100.0; + + Bid { + id: "".into(), + positionsize, + price, + } + }) + .collect(); + + OrderBook { ask, bid } +} + +pub fn recent_orders(conn: &mut redis::Connection) -> Vec { + let max = Utc::now(); + let min = max - TimeDelta::days(1); + + let orders: Vec = redis::cmd("ZRANGEBYSCORE") + .arg("recent_orders") + .arg(min.timestamp_millis()) + .arg(max.timestamp_millis()) + .query(conn) + .unwrap(); + + orders + .into_iter() + .rev() + .take(RECENT_ORDER_LIMIT) + .map(|order| serde_json::from_str(&order).expect("Invalid recent order!")) + .collect() +} diff --git a/src/ws.rs b/src/ws.rs index 72d851b..7caeec4 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -1,20 +1,27 @@ use crate::database::{NewOrderBookOrder, TraderOrder}; use crate::kafka::start_consumer; -use bigdecimal::ToPrimitive; +use crate::rpc::Interval; +// use bigdecimal::ToPrimitive; use chrono::prelude::*; use crossbeam_channel::{unbounded, Sender as CrossbeamSender}; use diesel::prelude::PgConnection; use diesel::r2d2::ConnectionManager; use jsonrpsee::RpcModule; use log::{error, info, trace}; +use redis::Client; use relayerwalletlib::zkoswalletlib::relayer_types::{OrderStatus, OrderType}; use serde::{Deserialize, Serialize}; -use std::time::{Duration, Instant}; +use std::{ + collections::HashMap, + sync::RwLock, + time::{Duration, Instant}, +}; use tokio::{ sync::broadcast::{channel, Sender}, task::JoinHandle, }; -use twilight_relayer_rust::{db::Event, relayer}; +use twilight_relayer_rust::db::Event; +use twilight_relayer_rust::relayer::PositionType; mod methods; @@ -27,10 +34,20 @@ const BROADCAST_CHANNEL_CAPACITY: usize = 20; type ManagedConnection = ConnectionManager; type ManagedPool = r2d2::Pool; +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct RecentOrder { + order_id: String, + side: PositionType, + price: f64, + positionsize: f64, + timestamp: String, +} pub struct WsContext { + client: Client, price_feed: Sender<(f64, DateTime)>, order_book: Sender, - recent_trades: Sender, + recent_trades: Sender, + pub candles: RwLock>>, pub pool: ManagedPool, _completions: CrossbeamSender, _watcher: JoinHandle<()>, @@ -38,10 +55,10 @@ pub struct WsContext { } impl WsContext { - pub fn with_pool(pool: ManagedPool) -> WsContext { + pub fn with_pool(pool: ManagedPool, client: Client) -> WsContext { let (price_feed, _) = channel::<(f64, DateTime)>(BROADCAST_CHANNEL_CAPACITY); let (order_book, _) = channel::(BROADCAST_CHANNEL_CAPACITY); - let (recent_trades, _) = channel::(BROADCAST_CHANNEL_CAPACITY); + let (recent_trades, _) = channel::(BROADCAST_CHANNEL_CAPACITY); let price_feed2 = price_feed.clone(); let order_book2 = order_book.clone(); @@ -77,12 +94,38 @@ impl WsContext { } match to.order_status { - OrderStatus::PENDING | OrderStatus::FILLED => { - recent_trades2.send(to); + OrderStatus::SETTLED + | OrderStatus::FILLED + | OrderStatus::LIQUIDATE => { + let recent_order = RecentOrder { + order_id: to.uuid.to_string(), + side: to.position_type.into(), + price: to.entryprice.into(), + positionsize: to.positionsize.into(), + timestamp: to.timestamp, + }; + let _ = recent_trades2.send(recent_order); } _ => {} } } + // added for limit order update for settlement order + Event::TraderOrderLimitUpdate(to, cmd, _seq) => { + let settlement_price = match cmd { + twilight_relayer_rust::relayer::RpcCommand::ExecuteTraderOrder(execute_trader_order, _meta, _zkos_hex_string, _request_id) => { + execute_trader_order.execution_price + } + _ => 0.0, // Default value for other command types + }; + + let order = NewOrderBookOrder::new_close_limit( + TraderOrder::from(to.clone()), + settlement_price, + ); + if let Err(e) = order_book2.send(order) { + info!("No order book subscribers present {:?}", e); + } + } Event::LendOrder(_lend_order, _cmd, _seq) => {} Event::FundingRateUpdate( _funding_rate, @@ -105,6 +148,8 @@ impl WsContext { ) => {} Event::Stop(_stop) => {} Event::TxHash(..) => {} + Event::TxHashUpdate(..) => {} + Event::AdvanceStateQueue(..) => {} } } if let Err(e) = notify.send(completion) { @@ -127,9 +172,11 @@ impl WsContext { }); WsContext { + client, price_feed, order_book, recent_trades, + candles: Default::default(), pool, _completions: completions, _watcher, @@ -138,11 +185,15 @@ impl WsContext { } } -pub fn init_methods(database_url: &str) -> RpcModule { +pub fn init_methods(database_url: &str, redis_url: &str) -> RpcModule { let manager = ConnectionManager::::new(database_url); - let pool = r2d2::Pool::new(manager).expect("Could not instantiate connection pool"); + let pool = r2d2::Pool::builder() + .max_size(50) + .build(manager) + .expect("Could not instantiate connection pool"); + let client = Client::open(redis_url).expect("Could not establish redis connection"); - let mut module = RpcModule::new(WsContext::with_pool(pool)); + let mut module = RpcModule::new(WsContext::with_pool(pool, client)); module .register_subscription( @@ -194,15 +245,15 @@ pub fn init_methods(database_url: &str) -> RpcModule { #[cfg(test)] mod tests { - use super::*; - use jsonrpsee::{ - core::{ - client::ClientT, - params::{ArrayParams, ObjectParams}, - }, - http_client::HttpClientBuilder, - server::ServerBuilder, - }; + // use super::*; + // use jsonrpsee::{ + // core::{ + // client::ClientT, + // params::{ArrayParams, ObjectParams}, + // }, + // http_client::HttpClientBuilder, + // server::ServerBuilder, + // }; // #[tokio::test] // async fn test_hello() { diff --git a/src/ws/methods.rs b/src/ws/methods.rs index afaaf09..ddb2cc4 100644 --- a/src/ws/methods.rs +++ b/src/ws/methods.rs @@ -1,7 +1,8 @@ +#![allow(warnings)] use crate::{ - database::{BtcUsdPrice, CandleData, TraderOrder}, + database::{Ask, Bid, BtcUsdPrice, OrderBook, TraderOrder}, error::ApiError, - rpc::{CandleSubscription, Interval}, + rpc::{order_book, CandleSubscription, Interval}, }; use chrono::prelude::*; use jsonrpsee::{ @@ -10,12 +11,9 @@ use jsonrpsee::{ }; use log::{error, info, warn}; use serde::Serialize; -use std::{ - sync::Arc, - time::{Duration, Instant}, -}; +use std::{sync::Arc, time::Duration}; use tokio::{ - sync::broadcast::{error::TryRecvError, Receiver}, + sync::broadcast::{channel, error::TryRecvError, Receiver}, task::JoinHandle, time::sleep, }; @@ -63,45 +61,73 @@ pub(super) fn candle_update( sink.accept()?; let CandleSubscription { interval } = params.parse()?; - // let time = match interval { - // Interval::ONE_MINUTE => chrono::Duration::minutes(1), - // Interval::FIVE_MINUTE => chrono::Duration::minutes(5), - // Interval::FIFTEEN_MINUTE => chrono::Duration::minutes(15), - // Interval::THIRTY_MINUTE => chrono::Duration::minutes(30), - // Interval::ONE_HOUR => chrono::Duration::minutes(60), - // Interval::FOUR_HOUR => chrono::Duration::hours(4), - // Interval::EIGHT_HOUR => chrono::Duration::hours(8), - // Interval::TWELVE_HOUR => chrono::Duration::hours(12), - // Interval::ONE_DAY => chrono::Duration::hours(24), - // _ => chrono::Duration::minutes(1), - // }; - // let mut last_candle_vec: Vec = Vec::new(); - let _: JoinHandle> = tokio::task::spawn(async move { - loop { - let mut conn = ctx.pool.get()?; - // let time_now = Utc::now(); - // let time_interval = chrono::Duration::seconds(time_now.second() as i64) - // + chrono::Duration::milliseconds(time_now.timestamp_millis() as i64) - // + chrono::Duration::microseconds(time_now.timestamp_micros() as i64) - // + chrono::Duration::nanoseconds(time_now.timestamp_nanos() as i64); - let since = Utc::now() - chrono::Duration::minutes(1); - let candles = BtcUsdPrice::candles(&mut conn, interval.clone(), since, None, None)?; - - let result = serde_json::to_value(&candles)?; - - if candles.len() > 0 { - // if candles.len() > 0 && candles != last_candle_vec { - // last_candle_vec = candles; - if let Err(e) = sink.send(&result) { - error!("Error sending candle updates: {:?}", e); + + let spawn = match ctx.candles.read() { + Ok(r) => !r.contains_key(&interval), + Err(e) => { + sink.send(&format!("RwLock poisoned!")); + return Ok(()); + } + }; + + if spawn { + info!("SPAWNING new subscriber for {:?}", interval); + let Ok(mut l) = ctx.candles.write() else { + sink.send(&"Write Lock poisoned!"); + return Ok(()); + }; + let (tx, _) = channel(10); + l.insert(interval, tx.clone()); + + let c = ctx.clone(); + let _: JoinHandle> = tokio::task::spawn(async move { + loop { + let mut conn = c.pool.get()?; + let since: DateTime = match interval { + Interval::ONE_DAY_CHANGE => Utc::now() - chrono::Duration::hours(24), + _ => Utc::now() - chrono::Duration::milliseconds(250), + }; + let candles = BtcUsdPrice::candles(&mut conn, interval, since, None, None)?; + + if candles.len() > 0 { + let result = serde_json::to_value(&candles)?; + if let Err(e) = tx.send(result) { + error!("Error sending candle updates: {:?}", e); + } + match interval { + Interval::ONE_DAY_CHANGE => { + sleep(Duration::from_millis(1000)).await; + } + _ => { + sleep(Duration::from_millis(250)).await; + } + }; + } else { + sleep(Duration::from_millis(250)).await; } - sleep(Duration::from_millis(500)).await; - } else { - sleep(Duration::from_millis(300)).await; - continue; + } + Ok(()) + }); + } + + let Ok(l) = ctx.candles.read() else { + sink.send(&"Failed to acquire rx candles channel"); + return Ok(()); + }; + + let mut rx = l.get(&interval).unwrap().subscribe(); + let _result = tokio::task::spawn(async move { + loop { + let Ok(msg) = rx.recv().await else { + error!("Recv channel broken!"); + break; + }; + + if let Err(e) = sink.send(&msg) { + error!("Error sending candle updates: {:?}", e); } } - Ok(()) + // Ok(()) }); Ok(()) @@ -120,11 +146,13 @@ pub(super) fn spawn_order_book( match rx.try_recv() { Ok(mesg) => { let mut conn = ctx.pool.get()?; - let orders = TraderOrder::order_book(&mut conn)?; - let result = serde_json::to_value(&orders)?; + let mut redis_conn = ctx.client.get_connection().expect("REDIS connection."); + // let mut orders = order_book(&mut redis_conn); + // orders.add_order(mesg); + let result = serde_json::to_value(&mesg)?; if let Err(e) = sink.send(&result) { - error!("Error sending candle updates: {:?}", e); + error!("Error sending orderbook updates: {:?}", e); } sleep(Duration::from_secs(5)).await; } @@ -149,7 +177,7 @@ pub(super) fn spawn_order_book( pub(super) fn heartbeat( _params: Params<'_>, mut sink: SubscriptionSink, - ctx: Arc, + _ctx: Arc, ) -> SubscriptionResult { sink.accept()?; @@ -161,7 +189,7 @@ pub(super) fn heartbeat( } sleep(Duration::from_secs(5)).await; } - Ok(()) + // Ok(()) }); Ok(())