Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ tower-http = { version = "0.4", features = [
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
uuid = { version = "1.3.3", features = ["serde", "v4", "v7"] }
verify-keplr-sign = "0.1.0"
twilight-relayer-rust = { git = "ssh://git@github.com/twilight-project/twilight-relayer.git", branch = "develop" }
twilight-relayer-rust = { git = "ssh://git@github.com/twilight-project/twilight-relayer.git", branch = "develop-fee" }
redis = { version = "0.25.4", features = ["r2d2"] }

[dependencies.zkos-relayer-wallet]
Expand Down
12 changes: 12 additions & 0 deletions migrations/2025-07-02-150000_fee_columns_and_fee_history/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Drop fee_history table
DROP TABLE IF EXISTS public.fee_history;

-- Remove fee columns from trader_order table
ALTER TABLE public.trader_order
DROP COLUMN IF EXISTS fee_filled,
DROP COLUMN IF EXISTS fee_settled;

-- Remove fee columns from trader_order_funding_updated table
ALTER TABLE public.trader_order_funding_updated
DROP COLUMN IF EXISTS fee_filled,
DROP COLUMN IF EXISTS fee_settled;
26 changes: 26 additions & 0 deletions migrations/2025-07-02-150000_fee_columns_and_fee_history/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-- Add fee columns to trader_order table
ALTER TABLE public.trader_order
ADD COLUMN IF NOT EXISTS fee_filled numeric NOT NULL DEFAULT 0,
ADD COLUMN IF NOT EXISTS fee_settled numeric NOT NULL DEFAULT 0;

-- Add fee columns to trader_order_funding_updated table
ALTER TABLE public.trader_order_funding_updated
ADD COLUMN IF NOT EXISTS fee_filled numeric NOT NULL DEFAULT 0,
ADD COLUMN IF NOT EXISTS fee_settled numeric NOT NULL DEFAULT 0;

-- Create fee_history table
CREATE TABLE IF NOT EXISTS public.fee_history
(
id bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
order_filled_on_market numeric NOT NULL,
order_filled_on_limit numeric NOT NULL,
order_settled_on_market numeric NOT NULL,
order_settled_on_limit numeric NOT NULL,
"timestamp" timestamptz NOT NULL,
CONSTRAINT fee_history_pkey PRIMARY KEY (id)
);

-- Index to speed up time based queries
CREATE INDEX IF NOT EXISTS fee_history_timestamp_idx
ON public.fee_history USING btree
("timestamp" ASC NULLS LAST);
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Drop the trigger
DROP TRIGGER IF EXISTS update_trader_order_trigger ON public.trader_order_funding_updated;

-- Drop the function
DROP FUNCTION IF EXISTS update_trader_order_from_funding();
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Create a function to update trader_order table
CREATE OR REPLACE FUNCTION update_trader_order_from_funding()
RETURNS TRIGGER AS $$
BEGIN
-- Update available_margin only when the order in trader_order is FILLED
UPDATE public.trader_order
SET available_margin = NEW.available_margin
WHERE uuid = NEW.uuid
AND order_status = 'FILLED';
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Create a trigger to call the function on insert or update
CREATE TRIGGER update_trader_order_trigger
AFTER INSERT OR UPDATE ON public.trader_order_funding_updated
FOR EACH ROW
EXECUTE FUNCTION update_trader_order_from_funding();
44 changes: 44 additions & 0 deletions src/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ pub struct DatabaseArchiver {
script_sha: String,
trader_orders: Vec<InsertTraderOrder>,
trader_order_funding_updated: Vec<InsertTraderOrderFundingUpdates>,
fee_history: Vec<NewFeeHistory>,
lend_orders: Vec<InsertLendOrder>,
position_size: Vec<PositionSizeUpdate>,
tx_hashes: Vec<NewTxHash>,
Expand Down Expand Up @@ -142,6 +143,7 @@ impl DatabaseArchiver {

let trader_orders = Vec::with_capacity(BATCH_SIZE);
let trader_order_funding_updated = Vec::with_capacity(BATCH_SIZE);
let fee_history = Vec::with_capacity(BATCH_SIZE);
let lend_orders = Vec::with_capacity(BATCH_SIZE);
let position_size = Vec::with_capacity(BATCH_SIZE);
let tx_hashes = Vec::with_capacity(BATCH_SIZE);
Expand All @@ -165,6 +167,7 @@ impl DatabaseArchiver {
script_sha,
trader_orders,
trader_order_funding_updated,
fee_history,
lend_orders,
position_size,
tx_hashes,
Expand Down Expand Up @@ -645,6 +648,35 @@ impl DatabaseArchiver {
Ok(())
}

/// Add a fee history to the next update batch, if the queue is full, commit and clear the
/// queue.
fn fee_history(&mut self, fee_history: NewFeeHistory) -> Result<(), ApiError> {
debug!("Appending fee history");

self.fee_history.push(fee_history);

if self.fee_history.len() == self.fee_history.capacity() {
self.commit_fee_history()?;
}

Ok(())
}

/// Commit a batch of fee history to the database. If we're failing to update the database, we
/// should exit.
fn commit_fee_history(&mut self) -> Result<(), ApiError> {
debug!("Committing fee history");

let mut conn = self.get_conn()?;

let mut fee_history = Vec::with_capacity(self.fee_history.capacity());
std::mem::swap(&mut fee_history, &mut self.fee_history);

FeeHistory::append(&mut conn, fee_history)?;

Ok(())
}

/// Commit any pending orders of any type, regardless of batch size.
fn commit_orders(&mut self) -> Result<(), ApiError> {
if self.trader_orders.len() > 0 {
Expand Down Expand Up @@ -677,12 +709,24 @@ impl DatabaseArchiver {
if self.lend_pool_commands.len() > 0 {
self.commit_lend_pool_commands()?;
}
if self.fee_history.len() > 0 {
self.commit_fee_history()?;
}

Ok(())
}

fn process_msg(&mut self, event: Event) -> Result<(), ApiError> {
match event {
Event::FeeUpdate(cmd, event_time) => match cmd {
relayer::RelayerCommand::UpdateFees(f_on_m, f_on_l, s_on_m, s_on_l) => {
info!("Fee update: {:?}, {:?}", cmd, event_time);
let fee_history =
NewFeeHistory::new(f_on_m, f_on_l, s_on_m, s_on_l, event_time);
self.fee_history(fee_history)?;
}
_ => {}
},
Event::TraderOrder(trader_order, _cmd, _seq) => {
self.trader_order(trader_order.into())?;
}
Expand Down
129 changes: 125 additions & 4 deletions src/database/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
use crate::database::{
schema::{
address_customer_id, btc_usd_price, current_nonce, customer_account,
customer_apikey_linking, customer_order_linking, funding_rate, lend_order, lend_pool,
lend_pool_command, position_size_log, sorted_set_command, trader_order,
customer_apikey_linking, customer_order_linking, fee_history, funding_rate, lend_order,
lend_pool, lend_pool_command, position_size_log, sorted_set_command, trader_order,
trader_order_funding_updated, transaction_hash,
},
sql_types::*,
};
use crate::rpc::{
HistoricalFundingArgs, HistoricalPriceArgs, Interval, OrderHistoryArgs, OrderId, PnlArgs,
TradeVolumeArgs, TransactionHashArgs,
HistoricalFeeArgs, HistoricalFundingArgs, HistoricalPriceArgs, Interval, OrderHistoryArgs,
OrderId, PnlArgs, TradeVolumeArgs, TransactionHashArgs,
};
use bigdecimal::{BigDecimal, FromPrimitive, ToPrimitive, Zero};
use chrono::{prelude::*, DurationRound};
Expand All @@ -23,6 +23,103 @@ use uuid::Uuid;

pub type PositionSizeUpdate = (relayer::PositionSizeLogCommand, relayer_db::PositionSizeLog);

#[derive(Serialize, Deserialize, Debug, Clone, Queryable, QueryableByName)]
#[diesel(table_name = fee_history)]
pub struct FeeHistory {
pub id: i64,
pub order_filled_on_market: BigDecimal,
pub order_filled_on_limit: BigDecimal,
pub order_settled_on_market: BigDecimal,
pub order_settled_on_limit: BigDecimal,
#[diesel(sql_type = diesel::sql_types::Timestamptz)]
pub timestamp: DateTime<Utc>,
}

#[derive(Serialize, Deserialize, Debug, Clone, Insertable, Queryable)]
#[diesel(table_name = fee_history)]
pub struct NewFeeHistory {
pub order_filled_on_market: BigDecimal,
pub order_filled_on_limit: BigDecimal,
pub order_settled_on_market: BigDecimal,
pub order_settled_on_limit: BigDecimal,
#[diesel(sql_type = diesel::sql_types::Timestamptz)]
pub timestamp: DateTime<Utc>,
}

impl NewFeeHistory {
pub fn new(
order_filled_on_market: f64,
order_filled_on_limit: f64,
order_settled_on_market: f64,
order_settled_on_limit: f64,
updated_at: String,
) -> Self {
Self {
order_filled_on_market: BigDecimal::from_f64(order_filled_on_market)
.unwrap_or(BigDecimal::from_f64(0.0).unwrap())
.round(4),
order_filled_on_limit: BigDecimal::from_f64(order_filled_on_limit)
.unwrap_or(BigDecimal::from_f64(0.0).unwrap())
.round(4),
order_settled_on_market: BigDecimal::from_f64(order_settled_on_market)
.unwrap_or(BigDecimal::from_f64(0.0).unwrap())
.round(4),
order_settled_on_limit: BigDecimal::from_f64(order_settled_on_limit)
.unwrap_or(BigDecimal::from_f64(0.0).unwrap())
.round(4),
timestamp: DateTime::parse_from_rfc3339(&updated_at)
.unwrap_or(Utc::now().with_timezone(&FixedOffset::east(0)))
.into(),
}
}
}

impl FeeHistory {
pub fn get(conn: &mut PgConnection) -> QueryResult<FeeHistory> {
use crate::database::schema::fee_history::dsl::*;

fee_history.order_by(timestamp.desc()).first(conn)
}

pub fn get_historical(
conn: &mut PgConnection,
args: HistoricalFeeArgs,
) -> QueryResult<Vec<FeeHistory>> {
use crate::database::schema::fee_history::dsl::*;
let HistoricalFeeArgs {
from,
to,
limit,
offset,
} = args;

fee_history
.filter(diesel::BoolExpressionMethods::and(
timestamp.ge(from),
timestamp.lt(to),
))
.limit(limit)
.offset(offset)
.load(conn)
}

pub fn create(conn: &mut PgConnection, new: NewFeeHistory) -> QueryResult<()> {
use crate::database::schema::fee_history::dsl::*;

diesel::insert_into(fee_history).values(new).execute(conn)?;

Ok(())
}

pub fn append(conn: &mut PgConnection, new_hashes: Vec<NewFeeHistory>) -> QueryResult<usize> {
use crate::database::schema::fee_history::dsl::*;

diesel::insert_into(fee_history)
.values(new_hashes)
.execute(conn)
}
}

#[derive(Serialize, Deserialize, Debug, Clone, Queryable, QueryableByName)]
#[diesel(table_name = transaction_hash)]
pub struct TxHash {
Expand Down Expand Up @@ -1152,6 +1249,8 @@ pub struct TraderOrder {
pub entry_nonce: i64,
pub exit_nonce: i64,
pub entry_sequence: i64,
pub fee_filled: BigDecimal,
pub fee_settled: BigDecimal,
}

#[derive(
Expand Down Expand Up @@ -1181,6 +1280,8 @@ pub struct TraderOrderFundingUpdates {
pub entry_nonce: i64,
pub exit_nonce: i64,
pub entry_sequence: i64,
pub fee_filled: BigDecimal,
pub fee_settled: BigDecimal,
}

#[derive(Serialize, Deserialize, Debug, Clone, QueryableByName, Queryable)]
Expand Down Expand Up @@ -1227,6 +1328,8 @@ pub struct InsertTraderOrder {
pub entry_nonce: i64,
pub exit_nonce: i64,
pub entry_sequence: i64,
pub fee_filled: BigDecimal,
pub fee_settled: BigDecimal,
}

#[derive(Serialize, Deserialize, Debug, Clone, Queryable, Insertable, AsChangeset)]
Expand All @@ -1253,6 +1356,8 @@ pub struct InsertTraderOrderFundingUpdates {
pub entry_nonce: i64,
pub exit_nonce: i64,
pub entry_sequence: i64,
pub fee_filled: BigDecimal,
pub fee_settled: BigDecimal,
}

#[derive(
Expand Down Expand Up @@ -2006,6 +2111,8 @@ impl From<relayer::TraderOrder> for TraderOrder {
entry_nonce,
exit_nonce,
entry_sequence,
fee_filled,
fee_settled,
} = src;

TraderOrder {
Expand Down Expand Up @@ -2034,6 +2141,8 @@ impl From<relayer::TraderOrder> for TraderOrder {
entry_nonce: entry_nonce as i64,
exit_nonce: exit_nonce as i64,
entry_sequence: entry_sequence as i64,
fee_filled: BigDecimal::from_f64(fee_filled).unwrap().round(4),
fee_settled: BigDecimal::from_f64(fee_settled).unwrap().round(4),
}
}
}
Expand Down Expand Up @@ -2061,6 +2170,8 @@ impl From<relayer::TraderOrder> for InsertTraderOrder {
entry_nonce,
exit_nonce,
entry_sequence,
fee_filled,
fee_settled,
} = src;

InsertTraderOrder {
Expand Down Expand Up @@ -2088,6 +2199,8 @@ impl From<relayer::TraderOrder> for InsertTraderOrder {
entry_nonce: entry_nonce as i64,
exit_nonce: exit_nonce as i64,
entry_sequence: entry_sequence as i64,
fee_filled: BigDecimal::from_f64(fee_filled).unwrap().round(4),
fee_settled: BigDecimal::from_f64(fee_settled).unwrap().round(4),
}
}
}
Expand Down Expand Up @@ -2115,6 +2228,8 @@ impl From<relayer::TraderOrder> for TraderOrderFundingUpdates {
entry_nonce,
exit_nonce,
entry_sequence,
fee_filled,
fee_settled,
} = src;

TraderOrderFundingUpdates {
Expand Down Expand Up @@ -2143,6 +2258,8 @@ impl From<relayer::TraderOrder> for TraderOrderFundingUpdates {
entry_nonce: entry_nonce as i64,
exit_nonce: exit_nonce as i64,
entry_sequence: entry_sequence as i64,
fee_filled: BigDecimal::from_f64(fee_filled).unwrap().round(4),
fee_settled: BigDecimal::from_f64(fee_settled).unwrap().round(4),
}
}
}
Expand Down Expand Up @@ -2170,6 +2287,8 @@ impl From<relayer::TraderOrder> for InsertTraderOrderFundingUpdates {
entry_nonce,
exit_nonce,
entry_sequence,
fee_filled,
fee_settled,
} = src;

InsertTraderOrderFundingUpdates {
Expand Down Expand Up @@ -2197,6 +2316,8 @@ impl From<relayer::TraderOrder> for InsertTraderOrderFundingUpdates {
entry_nonce: entry_nonce as i64,
exit_nonce: exit_nonce as i64,
entry_sequence: entry_sequence as i64,
fee_filled: BigDecimal::from_f64(fee_filled).unwrap().round(4),
fee_settled: BigDecimal::from_f64(fee_settled).unwrap().round(4),
}
}
}
Expand Down
Loading
Loading