From 30db5871b6c0e5706e249c133aa7663c441f3c2d Mon Sep 17 00:00:00 2001 From: 0xZensh Date: Sun, 24 Dec 2023 12:35:45 +0800 Subject: [PATCH] fix: fix UTXO handling --- crates/ns-indexer/src/db/model_inscription.rs | 6 +- crates/ns-indexer/src/db/model_name_state.rs | 12 +++- crates/ns-indexer/src/db/model_utxo.rs | 58 ++++++++++--------- crates/ns-indexer/src/db/scylladb.rs | 13 +++-- 4 files changed, 52 insertions(+), 37 deletions(-) diff --git a/crates/ns-indexer/src/db/model_inscription.rs b/crates/ns-indexer/src/db/model_inscription.rs index 713bcf1..2306e5a 100644 --- a/crates/ns-indexer/src/db/model_inscription.rs +++ b/crates/ns-indexer/src/db/model_inscription.rs @@ -408,7 +408,11 @@ impl Inscription { }; let _ = db - .batch(statements[start..end].to_vec(), &values[start..end]) + .batch( + scylladb::BatchType::Logged, + statements[start..end].to_vec(), + &values[start..end], + ) .await?; start = end; } diff --git a/crates/ns-indexer/src/db/model_name_state.rs b/crates/ns-indexer/src/db/model_name_state.rs index cc22a97..8129b8e 100644 --- a/crates/ns-indexer/src/db/model_name_state.rs +++ b/crates/ns-indexer/src/db/model_name_state.rs @@ -203,7 +203,9 @@ impl NameState { statements.push(query.as_str()); values.push((state.0, state.1 as i64)); } - let _ = db.batch(statements, values).await?; + let _ = db + .batch(scylladb::BatchType::Unlogged, statements, values) + .await?; Ok(()) } @@ -220,7 +222,9 @@ impl NameState { statements.push(query); values.push((state.0, state.1)); } - let _ = db.batch(statements, values).await?; + let _ = db + .batch(scylladb::BatchType::Unlogged, statements, values) + .await?; Ok(()) } @@ -248,7 +252,9 @@ impl NameState { statements.push(query.as_str()); values.push((state.0, state.1)); } - let _ = db.batch(statements, values).await?; + let _ = db + .batch(scylladb::BatchType::Unlogged, statements, values) + .await?; Ok(()) } diff --git a/crates/ns-indexer/src/db/model_utxo.rs b/crates/ns-indexer/src/db/model_utxo.rs index 2aca136..e9c5ef1 100644 --- a/crates/ns-indexer/src/db/model_utxo.rs +++ b/crates/ns-indexer/src/db/model_utxo.rs @@ -40,66 +40,70 @@ impl Utxo { spent: &Vec, unspent: &Vec<(Vec, utxo::UTXO)>, ) -> anyhow::Result<()> { - // delete spent utxos let mut start = 0; - while start < spent.len() { - let end = if start + 1000 > spent.len() { - spent.len() + while start < unspent.len() { + let end = if start + 1000 > unspent.len() { + unspent.len() } else { start + 1000 }; - let mut statements: Vec<&str> = Vec::with_capacity(end - start); - let mut values: Vec> = Vec::with_capacity(end - start); - let query = "DELETE FROM utxo WHERE txid=? AND vout=?"; + let mut statements: Vec<&str> = Vec::with_capacity(unspent.len()); + let mut values: Vec> = Vec::with_capacity(unspent.len()); + let query = "INSERT INTO utxo (txid,vout,amount,address) VALUES (?,?,?,?)"; - for tx in &spent[start..end] { + for tx in &unspent[start..end] { statements.push(query); - values.push(vec![tx.txid.to_cql(), (tx.vout as i32).to_cql()]); + let tx = Self::from_utxo(tx.0.clone(), &tx.1); + values.push(vec![ + tx.txid.to_cql(), + tx.vout.to_cql(), + tx.amount.to_cql(), + tx.address.to_cql(), + ]); } if statements.len() > 500 { log::info!(target: "ns-indexer", - action = "handle_spent_utxos", + action = "handle_unspent_utxos", statements = statements.len(); "", ); } - let _ = db.batch(statements, values).await?; + let _ = db + .batch(scylladb::BatchType::Unlogged, statements, values) + .await?; start = end; } + // delete spent utxos after insert unspent utxos let mut start = 0; - while start < unspent.len() { - let end = if start + 1000 > unspent.len() { - unspent.len() + while start < spent.len() { + let end = if start + 1000 > spent.len() { + spent.len() } else { start + 1000 }; - let mut statements: Vec<&str> = Vec::with_capacity(unspent.len()); - let mut values: Vec> = Vec::with_capacity(unspent.len()); - let query = "INSERT INTO utxo (txid,vout,amount,address) VALUES (?,?,?,?)"; + let mut statements: Vec<&str> = Vec::with_capacity(end - start); + let mut values: Vec> = Vec::with_capacity(end - start); + let query = "DELETE FROM utxo WHERE txid=? AND vout=?"; - for tx in &unspent[start..end] { + for tx in &spent[start..end] { statements.push(query); - let tx = Self::from_utxo(tx.0.clone(), &tx.1); - values.push(vec![ - tx.txid.to_cql(), - tx.vout.to_cql(), - tx.amount.to_cql(), - tx.address.to_cql(), - ]); + values.push(vec![tx.txid.to_cql(), (tx.vout as i32).to_cql()]); } if statements.len() > 500 { log::info!(target: "ns-indexer", - action = "handle_unspent_utxos", + action = "handle_spent_utxos", statements = statements.len(); "", ); } - let _ = db.batch(statements, values).await?; + let _ = db + .batch(scylladb::BatchType::Unlogged, statements, values) + .await?; start = end; } diff --git a/crates/ns-indexer/src/db/scylladb.rs b/crates/ns-indexer/src/db/scylladb.rs index 636e86a..e11a82a 100644 --- a/crates/ns-indexer/src/db/scylladb.rs +++ b/crates/ns-indexer/src/db/scylladb.rs @@ -12,7 +12,7 @@ use scylla::{ use std::{sync::Arc, time::Duration}; pub use scylla::{ - batch::Batch, + batch::{Batch, BatchStatement, BatchType}, frame::response::result::{ColumnType, Row}, query::Query, Bytes, @@ -97,13 +97,14 @@ impl ScyllaDB { // BATCH with conditions cannot span multiple tables pub async fn batch( &self, - statements: Vec<&str>, + batch_type: BatchType, + statements: Vec>, values: impl BatchValues, ) -> anyhow::Result { - let mut batch: Batch = Default::default(); - for statement in statements { - batch.append_statement(statement); - } + let batch = Batch::new_with_statements( + batch_type, + statements.into_iter().map(|s| s.into()).collect(), + ); let res = self.session.batch(&batch, values).await?; Ok(res) }