diff --git a/Cargo.toml b/Cargo.toml index 3e32910..e100990 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,7 +69,7 @@ tower-http = { version = "0.4", features = [ tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } uuid = { version = "1.3.3", features = ["serde", "v4", "v7"] } verify-keplr-sign = "0.1.0" -twilight-relayer-rust = { git = "ssh://git@github.com/twilight-project/twilight-relayer.git", branch = "develop" } +twilight-relayer-rust = { git = "ssh://git@github.com/twilight-project/twilight-relayer.git", branch = "develop-fee" } redis = { version = "0.25.4", features = ["r2d2"] } [dependencies.zkos-relayer-wallet] diff --git a/migrations/2025-07-02-150000_fee_columns_and_fee_history/down.sql b/migrations/2025-07-02-150000_fee_columns_and_fee_history/down.sql new file mode 100644 index 0000000..e4bbe2a --- /dev/null +++ b/migrations/2025-07-02-150000_fee_columns_and_fee_history/down.sql @@ -0,0 +1,12 @@ +-- Drop fee_history table +DROP TABLE IF EXISTS public.fee_history; + +-- Remove fee columns from trader_order table +ALTER TABLE public.trader_order + DROP COLUMN IF EXISTS fee_filled, + DROP COLUMN IF EXISTS fee_settled; + +-- Remove fee columns from trader_order_funding_updated table +ALTER TABLE public.trader_order_funding_updated + DROP COLUMN IF EXISTS fee_filled, + DROP COLUMN IF EXISTS fee_settled; \ No newline at end of file diff --git a/migrations/2025-07-02-150000_fee_columns_and_fee_history/up.sql b/migrations/2025-07-02-150000_fee_columns_and_fee_history/up.sql new file mode 100644 index 0000000..a87fbcc --- /dev/null +++ b/migrations/2025-07-02-150000_fee_columns_and_fee_history/up.sql @@ -0,0 +1,26 @@ +-- Add fee columns to trader_order table +ALTER TABLE public.trader_order + ADD COLUMN IF NOT EXISTS fee_filled numeric NOT NULL DEFAULT 0, + ADD COLUMN IF NOT EXISTS fee_settled numeric NOT NULL DEFAULT 0; + +-- Add fee columns to trader_order_funding_updated table +ALTER TABLE public.trader_order_funding_updated + ADD COLUMN IF NOT EXISTS fee_filled numeric NOT NULL DEFAULT 0, + ADD COLUMN IF NOT EXISTS fee_settled numeric NOT NULL DEFAULT 0; + +-- Create fee_history table +CREATE TABLE IF NOT EXISTS public.fee_history +( + id bigint NOT NULL GENERATED ALWAYS AS IDENTITY, + order_filled_on_market numeric NOT NULL, + order_filled_on_limit numeric NOT NULL, + order_settled_on_market numeric NOT NULL, + order_settled_on_limit numeric NOT NULL, + "timestamp" timestamptz NOT NULL, + CONSTRAINT fee_history_pkey PRIMARY KEY (id) +); + +-- Index to speed up time based queries +CREATE INDEX IF NOT EXISTS fee_history_timestamp_idx + ON public.fee_history USING btree + ("timestamp" ASC NULLS LAST); \ No newline at end of file diff --git a/migrations/2025-07-03-120000_create_trigger_on_funding_updated/down.sql b/migrations/2025-07-03-120000_create_trigger_on_funding_updated/down.sql new file mode 100644 index 0000000..5e62fe1 --- /dev/null +++ b/migrations/2025-07-03-120000_create_trigger_on_funding_updated/down.sql @@ -0,0 +1,5 @@ +-- Drop the trigger +DROP TRIGGER IF EXISTS update_trader_order_trigger ON public.trader_order_funding_updated; + +-- Drop the function +DROP FUNCTION IF EXISTS update_trader_order_from_funding(); \ No newline at end of file diff --git a/migrations/2025-07-03-120000_create_trigger_on_funding_updated/up.sql b/migrations/2025-07-03-120000_create_trigger_on_funding_updated/up.sql new file mode 100644 index 0000000..1cc4c1f --- /dev/null +++ b/migrations/2025-07-03-120000_create_trigger_on_funding_updated/up.sql @@ -0,0 +1,18 @@ +-- Create a function to update trader_order table +CREATE OR REPLACE FUNCTION update_trader_order_from_funding() +RETURNS TRIGGER AS $$ +BEGIN + -- Update available_margin only when the order in trader_order is FILLED + UPDATE public.trader_order + SET available_margin = NEW.available_margin + WHERE uuid = NEW.uuid + AND order_status = 'FILLED'; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- Create a trigger to call the function on insert or update +CREATE TRIGGER update_trader_order_trigger +AFTER INSERT OR UPDATE ON public.trader_order_funding_updated +FOR EACH ROW +EXECUTE FUNCTION update_trader_order_from_funding(); \ No newline at end of file diff --git a/src/archiver.rs b/src/archiver.rs index 23c9864..7e76cd6 100644 --- a/src/archiver.rs +++ b/src/archiver.rs @@ -115,6 +115,7 @@ pub struct DatabaseArchiver { script_sha: String, trader_orders: Vec, trader_order_funding_updated: Vec, + fee_history: Vec, lend_orders: Vec, position_size: Vec, tx_hashes: Vec, @@ -142,6 +143,7 @@ impl DatabaseArchiver { let trader_orders = Vec::with_capacity(BATCH_SIZE); let trader_order_funding_updated = Vec::with_capacity(BATCH_SIZE); + let fee_history = Vec::with_capacity(BATCH_SIZE); let lend_orders = Vec::with_capacity(BATCH_SIZE); let position_size = Vec::with_capacity(BATCH_SIZE); let tx_hashes = Vec::with_capacity(BATCH_SIZE); @@ -165,6 +167,7 @@ impl DatabaseArchiver { script_sha, trader_orders, trader_order_funding_updated, + fee_history, lend_orders, position_size, tx_hashes, @@ -645,6 +648,35 @@ impl DatabaseArchiver { Ok(()) } + /// Add a fee history to the next update batch, if the queue is full, commit and clear the + /// queue. + fn fee_history(&mut self, fee_history: NewFeeHistory) -> Result<(), ApiError> { + debug!("Appending fee history"); + + self.fee_history.push(fee_history); + + if self.fee_history.len() == self.fee_history.capacity() { + self.commit_fee_history()?; + } + + Ok(()) + } + + /// Commit a batch of fee history to the database. If we're failing to update the database, we + /// should exit. + fn commit_fee_history(&mut self) -> Result<(), ApiError> { + debug!("Committing fee history"); + + let mut conn = self.get_conn()?; + + let mut fee_history = Vec::with_capacity(self.fee_history.capacity()); + std::mem::swap(&mut fee_history, &mut self.fee_history); + + FeeHistory::append(&mut conn, fee_history)?; + + Ok(()) + } + /// Commit any pending orders of any type, regardless of batch size. fn commit_orders(&mut self) -> Result<(), ApiError> { if self.trader_orders.len() > 0 { @@ -677,12 +709,24 @@ impl DatabaseArchiver { if self.lend_pool_commands.len() > 0 { self.commit_lend_pool_commands()?; } + if self.fee_history.len() > 0 { + self.commit_fee_history()?; + } Ok(()) } fn process_msg(&mut self, event: Event) -> Result<(), ApiError> { match event { + Event::FeeUpdate(cmd, event_time) => match cmd { + relayer::RelayerCommand::UpdateFees(f_on_m, f_on_l, s_on_m, s_on_l) => { + info!("Fee update: {:?}, {:?}", cmd, event_time); + let fee_history = + NewFeeHistory::new(f_on_m, f_on_l, s_on_m, s_on_l, event_time); + self.fee_history(fee_history)?; + } + _ => {} + }, Event::TraderOrder(trader_order, _cmd, _seq) => { self.trader_order(trader_order.into())?; } diff --git a/src/database/models.rs b/src/database/models.rs index 153720e..67daca5 100644 --- a/src/database/models.rs +++ b/src/database/models.rs @@ -2,15 +2,15 @@ use crate::database::{ schema::{ address_customer_id, btc_usd_price, current_nonce, customer_account, - customer_apikey_linking, customer_order_linking, funding_rate, lend_order, lend_pool, - lend_pool_command, position_size_log, sorted_set_command, trader_order, + customer_apikey_linking, customer_order_linking, fee_history, funding_rate, lend_order, + lend_pool, lend_pool_command, position_size_log, sorted_set_command, trader_order, trader_order_funding_updated, transaction_hash, }, sql_types::*, }; use crate::rpc::{ - HistoricalFundingArgs, HistoricalPriceArgs, Interval, OrderHistoryArgs, OrderId, PnlArgs, - TradeVolumeArgs, TransactionHashArgs, + HistoricalFeeArgs, HistoricalFundingArgs, HistoricalPriceArgs, Interval, OrderHistoryArgs, + OrderId, PnlArgs, TradeVolumeArgs, TransactionHashArgs, }; use bigdecimal::{BigDecimal, FromPrimitive, ToPrimitive, Zero}; use chrono::{prelude::*, DurationRound}; @@ -23,6 +23,103 @@ use uuid::Uuid; pub type PositionSizeUpdate = (relayer::PositionSizeLogCommand, relayer_db::PositionSizeLog); +#[derive(Serialize, Deserialize, Debug, Clone, Queryable, QueryableByName)] +#[diesel(table_name = fee_history)] +pub struct FeeHistory { + pub id: i64, + pub order_filled_on_market: BigDecimal, + pub order_filled_on_limit: BigDecimal, + pub order_settled_on_market: BigDecimal, + pub order_settled_on_limit: BigDecimal, + #[diesel(sql_type = diesel::sql_types::Timestamptz)] + pub timestamp: DateTime, +} + +#[derive(Serialize, Deserialize, Debug, Clone, Insertable, Queryable)] +#[diesel(table_name = fee_history)] +pub struct NewFeeHistory { + pub order_filled_on_market: BigDecimal, + pub order_filled_on_limit: BigDecimal, + pub order_settled_on_market: BigDecimal, + pub order_settled_on_limit: BigDecimal, + #[diesel(sql_type = diesel::sql_types::Timestamptz)] + pub timestamp: DateTime, +} + +impl NewFeeHistory { + pub fn new( + order_filled_on_market: f64, + order_filled_on_limit: f64, + order_settled_on_market: f64, + order_settled_on_limit: f64, + updated_at: String, + ) -> Self { + Self { + order_filled_on_market: BigDecimal::from_f64(order_filled_on_market) + .unwrap_or(BigDecimal::from_f64(0.0).unwrap()) + .round(4), + order_filled_on_limit: BigDecimal::from_f64(order_filled_on_limit) + .unwrap_or(BigDecimal::from_f64(0.0).unwrap()) + .round(4), + order_settled_on_market: BigDecimal::from_f64(order_settled_on_market) + .unwrap_or(BigDecimal::from_f64(0.0).unwrap()) + .round(4), + order_settled_on_limit: BigDecimal::from_f64(order_settled_on_limit) + .unwrap_or(BigDecimal::from_f64(0.0).unwrap()) + .round(4), + timestamp: DateTime::parse_from_rfc3339(&updated_at) + .unwrap_or(Utc::now().with_timezone(&FixedOffset::east(0))) + .into(), + } + } +} + +impl FeeHistory { + pub fn get(conn: &mut PgConnection) -> QueryResult { + use crate::database::schema::fee_history::dsl::*; + + fee_history.order_by(timestamp.desc()).first(conn) + } + + pub fn get_historical( + conn: &mut PgConnection, + args: HistoricalFeeArgs, + ) -> QueryResult> { + use crate::database::schema::fee_history::dsl::*; + let HistoricalFeeArgs { + from, + to, + limit, + offset, + } = args; + + fee_history + .filter(diesel::BoolExpressionMethods::and( + timestamp.ge(from), + timestamp.lt(to), + )) + .limit(limit) + .offset(offset) + .load(conn) + } + + pub fn create(conn: &mut PgConnection, new: NewFeeHistory) -> QueryResult<()> { + use crate::database::schema::fee_history::dsl::*; + + diesel::insert_into(fee_history).values(new).execute(conn)?; + + Ok(()) + } + + pub fn append(conn: &mut PgConnection, new_hashes: Vec) -> QueryResult { + use crate::database::schema::fee_history::dsl::*; + + diesel::insert_into(fee_history) + .values(new_hashes) + .execute(conn) + } +} + #[derive(Serialize, Deserialize, Debug, Clone, Queryable, QueryableByName)] #[diesel(table_name = transaction_hash)] pub struct TxHash { @@ -1152,6 +1249,8 @@ pub struct TraderOrder { pub entry_nonce: i64, pub exit_nonce: i64, pub entry_sequence: i64, + pub fee_filled: BigDecimal, + pub fee_settled: BigDecimal, } #[derive( @@ -1181,6 +1280,8 @@ pub struct TraderOrderFundingUpdates { pub entry_nonce: i64, pub exit_nonce: i64, pub entry_sequence: i64, + pub fee_filled: BigDecimal, + pub fee_settled: BigDecimal, } #[derive(Serialize, Deserialize, Debug, Clone, QueryableByName, Queryable)] @@ -1227,6 +1328,8 @@ pub struct InsertTraderOrder { pub entry_nonce: i64, pub exit_nonce: i64, pub entry_sequence: i64, + pub fee_filled: BigDecimal, + pub fee_settled: BigDecimal, } #[derive(Serialize, Deserialize, Debug, Clone, Queryable, Insertable, AsChangeset)] @@ -1253,6 +1356,8 @@ pub struct InsertTraderOrderFundingUpdates { pub entry_nonce: i64, pub exit_nonce: i64, pub entry_sequence: i64, + pub fee_filled: BigDecimal, + pub fee_settled: BigDecimal, } #[derive( @@ -2006,6 +2111,8 @@ impl From for TraderOrder { entry_nonce, exit_nonce, entry_sequence, + fee_filled, + fee_settled, } = src; TraderOrder { @@ -2034,6 +2141,8 @@ impl From for TraderOrder { entry_nonce: entry_nonce as i64, exit_nonce: exit_nonce as i64, entry_sequence: entry_sequence as i64, + fee_filled: BigDecimal::from_f64(fee_filled).unwrap().round(4), + fee_settled: BigDecimal::from_f64(fee_settled).unwrap().round(4), } } } @@ -2061,6 +2170,8 @@ impl From for InsertTraderOrder { entry_nonce, exit_nonce, entry_sequence, + fee_filled, + fee_settled, } = src; InsertTraderOrder { @@ -2088,6 +2199,8 @@ impl From for InsertTraderOrder { entry_nonce: entry_nonce as i64, exit_nonce: exit_nonce as i64, entry_sequence: entry_sequence as i64, + fee_filled: BigDecimal::from_f64(fee_filled).unwrap().round(4), + fee_settled: BigDecimal::from_f64(fee_settled).unwrap().round(4), } } } @@ -2115,6 +2228,8 @@ impl From for TraderOrderFundingUpdates { entry_nonce, exit_nonce, entry_sequence, + fee_filled, + fee_settled, } = src; TraderOrderFundingUpdates { @@ -2143,6 +2258,8 @@ impl From for TraderOrderFundingUpdates { entry_nonce: entry_nonce as i64, exit_nonce: exit_nonce as i64, entry_sequence: entry_sequence as i64, + fee_filled: BigDecimal::from_f64(fee_filled).unwrap().round(4), + fee_settled: BigDecimal::from_f64(fee_settled).unwrap().round(4), } } } @@ -2170,6 +2287,8 @@ impl From for InsertTraderOrderFundingUpdates { entry_nonce, exit_nonce, entry_sequence, + fee_filled, + fee_settled, } = src; InsertTraderOrderFundingUpdates { @@ -2197,6 +2316,8 @@ impl From for InsertTraderOrderFundingUpdates { entry_nonce: entry_nonce as i64, exit_nonce: exit_nonce as i64, entry_sequence: entry_sequence as i64, + fee_filled: BigDecimal::from_f64(fee_filled).unwrap().round(4), + fee_settled: BigDecimal::from_f64(fee_settled).unwrap().round(4), } } } diff --git a/src/database/schema.rs b/src/database/schema.rs index 7ef954a..e0fbcc0 100644 --- a/src/database/schema.rs +++ b/src/database/schema.rs @@ -129,6 +129,17 @@ diesel::table! { } } +diesel::table! { + fee_history (id) { + id -> Int8, + order_filled_on_market -> Numeric, + order_filled_on_limit -> Numeric, + order_settled_on_market -> Numeric, + order_settled_on_limit -> Numeric, + timestamp -> Timestamptz, + } +} + diesel::table! { funding_rate (id) { id -> Int8, @@ -257,6 +268,8 @@ diesel::table! { entry_nonce -> Int8, exit_nonce -> Int8, entry_sequence -> Int8, + fee_filled -> Numeric, + fee_settled -> Numeric, } } @@ -290,6 +303,8 @@ diesel::table! { entry_nonce -> Int8, exit_nonce -> Int8, entry_sequence -> Int8, + fee_filled -> Numeric, + fee_settled -> Numeric, } } @@ -325,6 +340,7 @@ diesel::allow_tables_to_appear_in_same_query!( customer_account, customer_apikey_linking, customer_order_linking, + fee_history, funding_rate, lend_order, lend_pool, @@ -335,32 +351,3 @@ diesel::allow_tables_to_appear_in_same_query!( trader_order_funding_updated, transaction_hash, ); - -// // /* ---- View: orderbook -------------------------------------------- */ -// diesel::table! { -// // Read-only view. No primary key. -// orderbook (id) { -// id -> Int8, -// uuid -> Uuid, -// account_id -> Text, -// position_type -> Varchar, -// order_status -> Varchar, -// order_type -> Varchar, -// entryprice -> Numeric, -// execution_price -> Numeric, -// positionsize -> Numeric, -// leverage -> Numeric, -// initial_margin -> Numeric, -// available_margin -> Numeric, -// timestamp -> Timestamp, -// bankruptcy_price -> Numeric, -// bankruptcy_value -> Numeric, -// maintenance_margin -> Numeric, -// liquidation_price -> Numeric, -// unrealized_pnl -> Numeric, -// settlement_price -> Nullable, -// entry_nonce -> Nullable, -// exit_nonce -> Nullable, -// entry_sequence -> Nullable, -// } -// } diff --git a/src/rpc.rs b/src/rpc.rs index 6eac06d..bbdd7d9 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -13,8 +13,9 @@ mod types; mod util; pub use types::{ - CandleSubscription, Candles, HistoricalFundingArgs, HistoricalPriceArgs, Interval, Order, - OrderHistoryArgs, OrderId, PnlArgs, RpcArgs, TradeVolumeArgs, TransactionHashArgs, + CandleSubscription, Candles, HistoricalFeeArgs, HistoricalFundingArgs, HistoricalPriceArgs, + Interval, Order, OrderHistoryArgs, OrderId, PnlArgs, RpcArgs, TradeVolumeArgs, + TransactionHashArgs, }; pub use util::{order_book, recent_orders}; @@ -89,6 +90,16 @@ pub fn init_public_methods(database_url: &str, redis_url: &str) -> RpcModule Err(Error::Custom(format!("Database error: {:?}", e))), } } +pub(super) fn historical_fee_rate( + params: Params<'_>, + ctx: &RelayerContext, +) -> Result { + let args: HistoricalFeeArgs = params.parse()?; + + match ctx.pool.get() { + Ok(mut conn) => match FeeHistory::get_historical(&mut conn, args) { + Ok(o) => Ok(serde_json::to_value(o).expect("Error converting response")), + Err(e) => Err(Error::Custom(format!("Error fetching order info: {:?}", e))), + }, + Err(e) => Err(Error::Custom(format!("Database error: {:?}", e))), + } +} + +pub(super) fn get_fee_rate( + _: Params<'_>, + ctx: &RelayerContext, +) -> Result { + match ctx.pool.get() { + Ok(mut conn) => match FeeHistory::get(&mut conn) { + Ok(o) => Ok(serde_json::to_value(o).expect("Error converting response")), + Err(e) => Err(Error::Custom(format!("Error fetching order info: {:?}", e))), + }, + Err(e) => Err(Error::Custom(format!("Database error: {:?}", e))), + } +} pub(super) fn open_limit_orders( _: Params<'_>, diff --git a/src/rpc/types.rs b/src/rpc/types.rs index 015d15b..bd5d781 100644 --- a/src/rpc/types.rs +++ b/src/rpc/types.rs @@ -11,6 +11,7 @@ // • Candle data (Kline data: 1min, 5min, 15min, 30min, 1hr, 4hr, 8hr, 12hr, 24hr) // • Position Size (For Long, Short and Total) // • Server Time +// • Fee History use crate::auth::UserInfo; use crate::database::OrderStatus; use chrono::{prelude::*, Duration}; @@ -285,3 +286,36 @@ pub struct HistoricalFundingArgs { pub limit: i64, pub offset: i64, } + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct FeeHistory { + pub order_filled_on_market: f64, + pub order_filled_on_limit: f64, + pub order_settled_on_market: f64, + pub order_settled_on_limit: f64, + pub updated_at: DateTime, +} +// impl FeeHistory { +// pub fn new( +// order_filled_on_market: f64, +// order_filled_on_limit: f64, +// order_settled_on_market: f64, +// order_settled_on_limit: f64, +// updated_at: DateTime, +// ) -> Self { +// Self { +// order_filled_on_market, +// order_filled_on_limit, +// order_settled_on_market, +// order_settled_on_limit, +// updated_at, +// } +// } + +#[derive(Debug, Serialize, Deserialize)] +pub struct HistoricalFeeArgs { + pub from: DateTime, + pub to: DateTime, + pub limit: i64, + pub offset: i64, +} diff --git a/src/ws.rs b/src/ws.rs index 7caeec4..ba9a53d 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -81,6 +81,24 @@ impl WsContext { Ok((completion, msgs)) => { for msg in msgs { match msg { + Event::FeeUpdate(cmd, event_time) => match cmd { + twilight_relayer_rust::relayer::RelayerCommand::UpdateFees( + order_filled_on_market, + order_filled_on_limit, + order_settled_on_limit, + order_settled_on_market, + ) => { + info!( + "Fee update: {:?}, {:?}, {:?}, {:?} at {:?}", + order_filled_on_market, + order_filled_on_limit, + order_settled_on_limit, + order_settled_on_market, + event_time, + ); + } + _ => {} + }, Event::TraderOrder(to, ..) | Event::TraderOrderUpdate(to, ..) | Event::TraderOrderFundingUpdate(to, ..)