Skip to content

Commit

Permalink
Fix HeadUpdate seq_nums
Browse files Browse the repository at this point in the history
  • Loading branch information
riordanp committed Apr 8, 2023
1 parent b691220 commit 66ef1c0
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 44 deletions.
63 changes: 21 additions & 42 deletions service-mango-fills/src/fill_event_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn publish_changes_perp(
header: &EventQueueHeader,
events: &EventQueueEvents,
prev_seq_num: u64,
prev_head: u64,
prev_head: usize,
prev_events: &EventQueueEvents,
fill_update_sender: &async_channel::Sender<FillEventFilterMessage>,
metric_events_new: &mut MetricU64,
Expand Down Expand Up @@ -166,34 +166,32 @@ fn publish_changes_perp(
}
}

let head_idx = header.head();
let head = head_idx as u64;
let head = header.head();

let head_seq_num = if events[head_idx].event_type == EventType::Fill as u8 {
let event: PerpFillEvent = bytemuck::cast(events[head_idx]);
event.seq_num
} else if events[head_idx].event_type == EventType::Out as u8 {
let event: PerpOutEvent = bytemuck::cast(events[head_idx]);
event.seq_num
let head_seq_num = if events[head - 1].event_type == EventType::Fill as u8 {
let event: PerpFillEvent = bytemuck::cast(events[head - 1]);
event.seq_num + 1
} else if events[head - 1].event_type == EventType::Out as u8 {
let event: PerpOutEvent = bytemuck::cast(events[head - 1]);
event.seq_num + 1
} else {
0
};

let prev_head_idx = prev_head as usize;
let prev_head_seq_num = if prev_events[prev_head_idx].event_type == EventType::Fill as u8 {
let event: PerpFillEvent = bytemuck::cast(prev_events[prev_head_idx]);
event.seq_num
} else if prev_events[prev_head_idx].event_type == EventType::Out as u8 {
let event: PerpOutEvent = bytemuck::cast(prev_events[prev_head_idx]);
event.seq_num
let prev_head_seq_num = if prev_events[prev_head - 1].event_type == EventType::Fill as u8 {
let event: PerpFillEvent = bytemuck::cast(prev_events[prev_head - 1]);
event.seq_num + 1
} else if prev_events[prev_head - 1].event_type == EventType::Out as u8 {
let event: PerpOutEvent = bytemuck::cast(prev_events[prev_head - 1]);
event.seq_num + 1
} else {
0
};

// publish a head update event if the head increased (events were consumed)
if head > prev_head {
// publish a head update event if the head changed (events were consumed)
if head != prev_head {
metric_head_update.increment();

fill_update_sender
.try_send(FillEventFilterMessage::HeadUpdate(HeadUpdate {
head,
Expand All @@ -209,25 +207,6 @@ fn publish_changes_perp(
.unwrap(); // TODO: use anyhow to bubble up error
}

// revoke head update event if it decreased (fork)
if head < prev_head {
metric_head_revoke.increment();

fill_update_sender
.try_send(FillEventFilterMessage::HeadUpdate(HeadUpdate {
head,
prev_head,
head_seq_num,
prev_head_seq_num,
status: FillUpdateStatus::Revoke,
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
slot,
write_version,
}))
.unwrap(); // TODO: use anyhow to bubble up error
}

fill_update_sender
.try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint {
slot,
Expand Down Expand Up @@ -560,7 +539,7 @@ pub async fn init(
seq_num_cache.get(&evq_pk_string),
head_cache.get(&evq_pk_string),
) {
(Some(prev_seq_num), Some(old_head)) => match perp_events_cache
(Some(prev_seq_num), Some(prev_head)) => match perp_events_cache
.get(&evq_pk_string)
{
Some(prev_events) => publish_changes_perp(
Expand All @@ -570,7 +549,7 @@ pub async fn init(
&event_queue.header,
&event_queue.buf,
*prev_seq_num,
*old_head,
*prev_head,
prev_events,
&fill_update_sender,
&mut metric_events_new,
Expand All @@ -589,7 +568,7 @@ pub async fn init(
seq_num_cache
.insert(evq_pk_string.clone(), event_queue.header.seq_num.clone());
head_cache
.insert(evq_pk_string.clone(), event_queue.header.head() as u64);
.insert(evq_pk_string.clone(), event_queue.header.head());
perp_events_cache
.insert(evq_pk_string.clone(), event_queue.buf.clone());
} else {
Expand Down Expand Up @@ -634,7 +613,7 @@ pub async fn init(
}

seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone());
head_cache.insert(evq_pk_string.clone(), header.head);
head_cache.insert(evq_pk_string.clone(), header.head as usize);
serum_events_cache
.insert(evq_pk_string.clone(), events.clone().to_vec());
}
Expand Down
4 changes: 2 additions & 2 deletions service-mango-fills/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ impl Serialize for FillUpdate {

#[derive(Clone, Debug)]
pub struct HeadUpdate {
pub head: u64,
pub prev_head: u64,
pub head: usize,
pub prev_head: usize,
pub head_seq_num: u64,
pub prev_head_seq_num: u64,
pub status: FillUpdateStatus,
Expand Down

0 comments on commit 66ef1c0

Please sign in to comment.