dry-run feature
losman0s committed Apr 17, 2024
1 parent 9e9e6ab commit 0d3957b
Showing 3 changed files with 97 additions and 60 deletions.
1 change: 1 addition & 0 deletions observability/crates/event_indexer/Cargo.toml
@@ -18,6 +18,7 @@ path = "bin/inspect_tx.rs"
[features]
default = ["mainnet-beta"]
mainnet-beta = ["marginfi/mainnet-beta"]
dry-run = []

[dependencies]
anchor-lang = { workspace = true }
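
The dry-run feature added above is declared with an empty dependency list, so enabling it pulls in nothing extra; it only flips the #[cfg(feature = "dry-run")] and #[cfg(not(feature = "dry-run"))] switches introduced in the two source files below. A minimal standalone sketch of how such a flag behaves (illustration only, not code from this repository):

// Normal build:   cargo build
// Dry-run build:  cargo build --features dry-run
fn main() {
    #[cfg(feature = "dry-run")]
    {
        println!("dry-run build: events are only logged");
    }

    #[cfg(not(feature = "dry-run"))]
    {
        println!("normal build: events are written to the database");
    }
}

Because the selection happens at compile time, a dry-run binary contains no database-writing code at all; switching modes means rebuilding with a different feature set rather than passing a runtime flag.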
115 changes: 64 additions & 51 deletions observability/crates/event_indexer/bin/backfill_events.rs
@@ -191,8 +191,6 @@ pub async fn main() {
}

let parser = MarginfiEventParser::new(marginfi::ID, MARGINFI_GROUP_ADDRESS);
let mut entity_store = EntityStore::new(rpc_endpoint, config.database_url.clone());
let mut db_connection = establish_connection(config.database_url.clone());

let mut event_counter = 0;
let mut print_time = std::time::Instant::now();
@@ -202,6 +200,7 @@

let TransactionData {
transaction,
#[cfg(not(feature = "dry-run"))]
task_id,
..
} = match maybe_item {
@@ -235,59 +234,73 @@
print_time = std::time::Instant::now();
}

for MarginfiEventWithMeta {
event,
timestamp,
slot,
in_flashloan,
call_stack,
tx_sig,
} in events
#[cfg(not(feature = "dry-run"))]
{
let timestamp = DateTime::from_timestamp(timestamp, 0).unwrap().naive_utc();
let tx_sig = tx_sig.to_string();
let call_stack = serde_json::to_string(
&call_stack
.into_iter()
.map(|cs| cs.to_string())
.collect::<Vec<_>>(),
)
.unwrap_or_else(|_| "null".to_string());

let mut retries = 0;
retry(
ExponentialBackoffBuilder::<SystemClock>::new()
.with_max_interval(Duration::from_secs(5))
.build(),
|| match event.db_insert(
timestamp,
slot,
tx_sig.clone(),
in_flashloan,
call_stack.clone(),
&mut db_connection,
&mut entity_store,
) {
Ok(signatures) => Ok(signatures),
Err(e) => {
if retries > 5 {
error!(
let mut entity_store = EntityStore::new(rpc_endpoint, config.database_url.clone());
let mut db_connection = establish_connection(config.database_url.clone());

for MarginfiEventWithMeta {
event,
timestamp,
slot,
in_flashloan,
call_stack,
tx_sig,
} in events
{
let timestamp = DateTime::from_timestamp(timestamp, 0).unwrap().naive_utc();
let tx_sig = tx_sig.to_string();
let call_stack = serde_json::to_string(
&call_stack
.into_iter()
.map(|cs| cs.to_string())
.collect::<Vec<_>>(),
)
.unwrap_or_else(|_| "null".to_string());

#[cfg(feature = "dry-run")]
{
info!("Event: {:?} ({:?})", event, tx_sig);
}

#[cfg(not(feature = "dry-run"))]
{
let mut retries = 0;
retry(
ExponentialBackoffBuilder::<SystemClock>::new()
.with_max_interval(Duration::from_secs(5))
.build(),
|| match event.db_insert(
timestamp,
slot,
tx_sig.clone(),
in_flashloan,
call_stack.clone(),
&mut db_connection,
&mut entity_store,
) {
Ok(signatures) => Ok(signatures),
Err(e) => {
if retries > 5 {
error!(
"[{:?}] Failed to insert event after 5 retries: {:?} - {:?} ({:?})",
task_id, event, e, tx_sig
);
Err(backoff::Error::permanent(e))
} else {
warn!(
"[{:?}] Failed to insert event, retrying: {:?} - {:?} ({:?})",
task_id, event, e, tx_sig
);
retries += 1;
Err(backoff::Error::transient(e))
}
}
},
)
.unwrap();
Err(backoff::Error::permanent(e))
} else {
warn!(
"[{:?}] Failed to insert event, retrying: {:?} - {:?} ({:?})",
task_id, event, e, tx_sig
);
retries += 1;
Err(backoff::Error::transient(e))
}
}
},
)
.unwrap();
}
}
}
}
}
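
Taken together, the backfill changes above move construction of the database connection and entity store behind #[cfg(not(feature = "dry-run"))], so the event loop either logs each parsed event with info! (dry-run build) or inserts it through event.db_insert(...) with exponential-backoff retries (normal build). A compact, self-contained distillation of that shape follows; Event, DbConnection, connect and insert_event are illustrative stand-ins rather than the crate's real types, and the retry logic is elided:

#[derive(Debug)]
struct Event {
    tx_sig: String,
}

#[cfg(not(feature = "dry-run"))]
struct DbConnection;

#[cfg(not(feature = "dry-run"))]
fn connect() -> DbConnection {
    DbConnection
}

#[cfg(not(feature = "dry-run"))]
fn insert_event(_db: &mut DbConnection, event: &Event) {
    println!("persisted {:?}", event.tx_sig);
}

fn process_events(events: Vec<Event>) {
    // Only a persisting build ever opens a database connection.
    #[cfg(not(feature = "dry-run"))]
    let mut db_connection = connect();

    for event in events {
        #[cfg(feature = "dry-run")]
        {
            // Dry run: report what would have been written, touch nothing.
            println!("Event: {:?} ({:?})", event, event.tx_sig);
        }

        #[cfg(not(feature = "dry-run"))]
        {
            // Normal run: persist the event (the real code retries with backoff).
            insert_event(&mut db_connection, &event);
        }
    }
}

fn main() {
    process_events(vec![Event { tx_sig: "example-signature".to_string() }]);
}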
41 changes: 32 additions & 9 deletions observability/crates/event_indexer/src/indexer.rs
@@ -23,7 +23,11 @@ use yellowstone_grpc_proto::{
},
};

use crate::{db::establish_connection, entity_store::EntityStore, parser::MarginfiEvent};
use crate::{
db::establish_connection,
entity_store::EntityStore,
parser::{Event, MarginfiEvent},
};

use super::parser::{MarginfiEventParser, MarginfiEventWithMeta, MARGINFI_GROUP_ADDRESS};

@@ -56,14 +60,23 @@ impl EventIndexer {
.await
});

#[cfg(not(feature = "dry-run"))]
let mut db_connection = establish_connection(database_connection_url.clone());

let rpc_endpoint = format!("{}/{}", rpc_host, rpc_auth_token).to_string();
#[cfg(not(feature = "dry-run"))]
let mut entity_store = EntityStore::new(rpc_endpoint, database_connection_url);

tokio::spawn(
async move { store_events(&mut db_connection, event_rx, &mut entity_store).await },
);
tokio::spawn(async move {
store_events(
#[cfg(not(feature = "dry-run"))]
&mut db_connection,
event_rx,
#[cfg(not(feature = "dry-run"))]
&mut entity_store,
)
.await
});

Self {
parser,
@@ -273,9 +286,9 @@ async fn listen_to_updates(
}

async fn store_events(
db_connection: &mut PgConnection,
#[cfg(not(feature = "dry-run"))] db_connection: &mut PgConnection,
event_rx: Receiver<Vec<MarginfiEventWithMeta>>,
entity_store: &mut EntityStore,
#[cfg(not(feature = "dry-run"))] entity_store: &mut EntityStore,
) {
loop {
while let Ok(events) = event_rx.try_recv() {
@@ -289,8 +302,8 @@ async fn store_events(
tx_sig,
} in events
{
let timestamp = DateTime::from_timestamp(timestamp, 0).unwrap().naive_utc();
let tx_sig = tx_sig.to_string();
let timestamp = DateTime::from_timestamp(timestamp, 0).unwrap().naive_utc();
let call_stack = serde_json::to_string(
&call_stack
.into_iter()
@@ -299,8 +312,17 @@
)
.unwrap_or_else(|_| "null".to_string());

let mut retries = 0;
retry(
#[cfg(feature = "dry-run")]
{
if let Event::Liquidate(e) = event {
info!("Event: {:?} ({:?})", e, tx_sig);
}
}

#[cfg(not(feature = "dry-run"))]
{
let mut retries = 0;
retry(
ExponentialBackoffBuilder::<SystemClock>::new()
.with_max_interval(Duration::from_secs(5))
.build(),
@@ -333,6 +355,7 @@
},
)
.unwrap();
}
}
}
}
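
The store_events signature above takes its PgConnection and EntityStore arguments only in non-dry-run builds, by placing #[cfg(not(feature = "dry-run"))] directly on the function parameters; the tokio::spawn call site gates the matching arguments with the same attribute so the argument list always agrees with the compiled signature. The sketch below shows the parameter-gating side of that pattern; Db, Store, store_batch and persist are hypothetical placeholders, and for simplicity the call site gates the whole call rather than individual arguments as the commit does:

// Stand-ins for the real PgConnection and EntityStore.
struct Db;
struct Store;

// With --features dry-run the gated parameters are stripped, leaving
// store_batch(batch: Vec<String>); without it, all three parameters exist.
fn store_batch(
    #[cfg(not(feature = "dry-run"))] db: &mut Db,
    batch: Vec<String>,
    #[cfg(not(feature = "dry-run"))] store: &mut Store,
) {
    for item in batch {
        #[cfg(feature = "dry-run")]
        {
            println!("dry-run: would store {}", item);
        }

        #[cfg(not(feature = "dry-run"))]
        {
            // Placeholder for the real insert through db and store.
            persist(db, store, &item);
        }
    }
}

#[cfg(not(feature = "dry-run"))]
fn persist(_db: &mut Db, _store: &mut Store, item: &str) {
    println!("stored {}", item);
}

fn main() {
    #[cfg(feature = "dry-run")]
    {
        store_batch(vec!["event-1".to_string()]);
    }

    #[cfg(not(feature = "dry-run"))]
    {
        let mut db = Db;
        let mut store = Store;
        store_batch(&mut db, vec!["event-1".to_string()], &mut store);
    }
}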
