From 532ee8aa00c51e05d937b88e52359f1ee765db0c Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Fri, 22 Sep 2023 18:39:16 +0500 Subject: [PATCH 01/11] Separate send and confirm rpc calls into their own retry blocks --- consumer/src/solana.rs | 52 +++++++++++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/consumer/src/solana.rs b/consumer/src/solana.rs index 50c0dd5..b9bbf32 100644 --- a/consumer/src/solana.rs +++ b/consumer/src/solana.rs @@ -25,7 +25,10 @@ use mpl_token_metadata::{ }, state::{Creator, DataV2, EDITION, PREFIX}, }; -use solana_client::rpc_client::RpcClient as SolanaRpcClient; +use solana_client::{ + client_error::{ClientError, ClientErrorKind}, + rpc_client::RpcClient as SolanaRpcClient, +}; use solana_program::{ instruction::Instruction, program_pack::Pack, pubkey::Pubkey, system_instruction::create_account, system_program, @@ -33,7 +36,7 @@ use solana_program::{ use solana_sdk::{ signature::Signature, signer::{keypair::Keypair, Signer}, - transaction::Transaction, + transaction::{Transaction, TransactionError}, }; use solana_transaction_status::{UiInnerInstructions, UiInstruction, UiTransactionEncoding}; use spl_account_compression::{ @@ -68,6 +71,9 @@ macro_rules! call_with_retry { .with_min_delay(Duration::from_millis(30)) .with_max_times(10), ) + .notify(|err: &ClientError, dur: Duration| { + error!("retrying error {:?} in {:?}", err, dur); + }) .call() }}; } @@ -247,10 +253,7 @@ impl Solana { let signatures = transaction .signed_message_signatures .iter() - .map(|s| { - Signature::from_str(s) - .map_err(|e| anyhow!(format!("failed to parse signature: {e}"))) - }) + .map(|s| Signature::from_str(s).context("failed to parse signature")) .collect::>>()?; let message = bincode::deserialize( @@ -265,13 +268,40 @@ impl Solana { message, }; - call_with_retry!(self.rpc().send_and_confirm_transaction(&transaction)) - .map(|s| s.to_string()) - .map_err(|e| { - let msg = format!("failed to submit transaction: {e}"); + let signature = + call_with_retry!(self.rpc().send_transaction(&transaction)).map_err(|e| { + let msg = format!("failed to send transaction: {e}"); error!(msg); anyhow!(msg) - }) + })?; + + let signature = (|| { + let status = self + .rpc() + .get_signature_status(&signature) + .map_err(|e| e.kind)?; + + match status { + Some(Ok(_)) => Ok(signature), + Some(Err(e)) => Err(ClientErrorKind::TransactionError(e)), + None => Err(TransactionError::BlockhashNotFound.into()), + } + }) + .retry( + &ExponentialBuilder::default() + .with_jitter() + .with_min_delay(Duration::from_millis(10)) + .with_max_times(15), + ) + .when(|e| e.get_transaction_error() == Some(TransactionError::BlockhashNotFound)) + .notify(|err: &ClientErrorKind, dur: Duration| { + error!("retrying error {:?} in {:?}", err, dur); + }) + .call()?; + + Ok(signature.to_string()) + + } } From 54010e72f3da645041753bac2dcb9598b6833d8b Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Fri, 22 Sep 2023 19:05:19 +0500 Subject: [PATCH 02/11] fmt --- consumer/src/solana.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/consumer/src/solana.rs b/consumer/src/solana.rs index b9bbf32..4bb7af4 100644 --- a/consumer/src/solana.rs +++ b/consumer/src/solana.rs @@ -275,15 +275,14 @@ impl Solana { anyhow!(msg) })?; - let signature = (|| { + (|| { let status = self .rpc() .get_signature_status(&signature) .map_err(|e| e.kind)?; match status { - Some(Ok(_)) => Ok(signature), - Some(Err(e)) => Err(ClientErrorKind::TransactionError(e)), + Some(result) => result.map_err(ClientErrorKind::TransactionError), None => Err(TransactionError::BlockhashNotFound.into()), } }) @@ -293,15 +292,16 @@ impl Solana { .with_min_delay(Duration::from_millis(10)) .with_max_times(15), ) - .when(|e| e.get_transaction_error() == Some(TransactionError::BlockhashNotFound)) + .when(|e| { + e.get_transaction_error() == Some(TransactionError::BlockhashNotFound) + || e.get_transaction_error().is_none() + }) .notify(|err: &ClientErrorKind, dur: Duration| { error!("retrying error {:?} in {:?}", err, dur); }) .call()?; - Ok(signature.to_string()) - - + Ok(signature.to_string()) } } From c11d503eed8258f95dbb272e21301359372a124d Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Mon, 25 Sep 2023 22:28:49 +0500 Subject: [PATCH 03/11] Change min delay to 200ms for retry block of get signature status rpc call --- consumer/src/solana.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consumer/src/solana.rs b/consumer/src/solana.rs index 4bb7af4..d438996 100644 --- a/consumer/src/solana.rs +++ b/consumer/src/solana.rs @@ -289,7 +289,7 @@ impl Solana { .retry( &ExponentialBuilder::default() .with_jitter() - .with_min_delay(Duration::from_millis(10)) + .with_min_delay(Duration::from_millis(200)) .with_max_times(15), ) .when(|e| { From a2998f8cbc0cc4f631d06c9c2177f2811985b93f Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Tue, 26 Sep 2023 00:38:28 +0500 Subject: [PATCH 04/11] remove retries for send transaction --- consumer/src/solana.rs | 44 +++++++----------------------------------- 1 file changed, 7 insertions(+), 37 deletions(-) diff --git a/consumer/src/solana.rs b/consumer/src/solana.rs index d438996..ed505d4 100644 --- a/consumer/src/solana.rs +++ b/consumer/src/solana.rs @@ -25,10 +25,7 @@ use mpl_token_metadata::{ }, state::{Creator, DataV2, EDITION, PREFIX}, }; -use solana_client::{ - client_error::{ClientError, ClientErrorKind}, - rpc_client::RpcClient as SolanaRpcClient, -}; +use solana_client::{client_error::ClientError, rpc_client::RpcClient as SolanaRpcClient}; use solana_program::{ instruction::Instruction, program_pack::Pack, pubkey::Pubkey, system_instruction::create_account, system_program, @@ -36,7 +33,7 @@ use solana_program::{ use solana_sdk::{ signature::Signature, signer::{keypair::Keypair, Signer}, - transaction::{Transaction, TransactionError}, + transaction::Transaction, }; use solana_transaction_status::{UiInnerInstructions, UiInstruction, UiTransactionEncoding}; use spl_account_compression::{ @@ -268,38 +265,11 @@ impl Solana { message, }; - let signature = - call_with_retry!(self.rpc().send_transaction(&transaction)).map_err(|e| { - let msg = format!("failed to send transaction: {e}"); - error!(msg); - anyhow!(msg) - })?; - - (|| { - let status = self - .rpc() - .get_signature_status(&signature) - .map_err(|e| e.kind)?; - - match status { - Some(result) => result.map_err(ClientErrorKind::TransactionError), - None => Err(TransactionError::BlockhashNotFound.into()), - } - }) - .retry( - &ExponentialBuilder::default() - .with_jitter() - .with_min_delay(Duration::from_millis(200)) - .with_max_times(15), - ) - .when(|e| { - e.get_transaction_error() == Some(TransactionError::BlockhashNotFound) - || e.get_transaction_error().is_none() - }) - .notify(|err: &ClientErrorKind, dur: Duration| { - error!("retrying error {:?} in {:?}", err, dur); - }) - .call()?; + let signature = self.rpc().send_transaction(&transaction).map_err(|e| { + let msg = format!("failed to send transaction: {e}"); + error!(msg); + anyhow!(msg) + })?; Ok(signature.to_string()) } From 888aab475380304cad553909d258e1a4ecac1444 Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Tue, 26 Sep 2023 01:59:42 +0500 Subject: [PATCH 05/11] Add retry to get_transaction rpc call --- consumer/src/solana.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/consumer/src/solana.rs b/consumer/src/solana.rs index ed505d4..2982140 100644 --- a/consumer/src/solana.rs +++ b/consumer/src/solana.rs @@ -66,7 +66,7 @@ macro_rules! call_with_retry { &ExponentialBuilder::default() .with_jitter() .with_min_delay(Duration::from_millis(30)) - .with_max_times(10), + .with_max_times(15), ) .notify(|err: &ClientError, dur: Duration| { error!("retrying error {:?} in {:?}", err, dur); @@ -204,9 +204,10 @@ impl Solana { &self, signature: &Signature, ) -> Result { - let response = self - .rpc() - .get_transaction(signature, UiTransactionEncoding::Json)?; + let response = call_with_retry!( + self.rpc() + .get_transaction(signature, UiTransactionEncoding::Json) + )?; let meta = response .transaction From b9ffbd7def56c4db96d4b4a416a9bb05a88fa992 Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Mon, 2 Oct 2023 07:10:21 +0500 Subject: [PATCH 06/11] return if compression leaf not found --- indexer/src/processor.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/indexer/src/processor.rs b/indexer/src/processor.rs index b6531dd..71bc0f2 100644 --- a/indexer/src/processor.rs +++ b/indexer/src/processor.rs @@ -135,9 +135,12 @@ impl Processor { let asset_id = get_asset_id(&merkle_tree, tkn_instruction.nonce); let compression_leaf = - CompressionLeaf::find_by_asset_id(conn, asset_id.to_string()) - .await? - .context("compression leaf not found")?; + CompressionLeaf::find_by_asset_id(conn, asset_id.to_string()).await?; + + if compression_leaf.is_none() { + return Ok(()); + } + let compression_leaf = compression_leaf.context("Compression leaf not found")?; let collection_mint_id = compression_leaf.id; let leaf_owner = compression_leaf.leaf_owner.clone(); From 092b385c612bc57211574cd7a6ec5b563aa8094f Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Wed, 4 Oct 2023 08:20:08 +0500 Subject: [PATCH 07/11] use an unbounded channel to process transactions --- Cargo.lock | 15 ++++++++ core/src/db.rs | 2 +- indexer/Cargo.toml | 2 +- indexer/src/handler.rs | 75 ++++++++++++++++++++++++++++++++++++---- indexer/src/lib.rs | 3 ++ indexer/src/processor.rs | 45 +++--------------------- 6 files changed, 93 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 63769e3..cd49ec5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1458,6 +1458,20 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" +dependencies = [ + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.8" @@ -5406,6 +5420,7 @@ version = "0.1.0" dependencies = [ "anchor-lang", "backoff", + "crossbeam", "dashmap", "futures", "hex", diff --git a/core/src/db.rs b/core/src/db.rs index 2cc9c7f..18ce10b 100644 --- a/core/src/db.rs +++ b/core/src/db.rs @@ -12,7 +12,7 @@ pub struct DbArgs { pub connection_timeout: u64, #[arg(long, env, default_value_t = 10)] pub acquire_timeout: u64, - #[arg(long, env, default_value_t = 60)] + #[arg(long, env, default_value_t = 10)] pub idle_timeout: u64, #[arg(long, env)] pub database_url: String, diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index c9f3173..7b97ec1 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -29,7 +29,7 @@ dashmap = "5.4.0" spl-token = "=3.5.0" solana-client = "1.14" backoff = { version = "0.4.0", features = ["tokio"] } - +crossbeam = "0.8.2" [dependencies.hub-core] package = "holaplex-hub-core" diff --git a/indexer/src/handler.rs b/indexer/src/handler.rs index f70c3ee..223dd7e 100644 --- a/indexer/src/handler.rs +++ b/indexer/src/handler.rs @@ -1,16 +1,23 @@ use std::sync::Arc; +use crossbeam::channel::{self, Receiver, Sender}; +use dashmap::DashMap; use futures::{sink::SinkExt, stream::StreamExt}; use holaplex_hub_nfts_solana_core::{db::Connection, proto::SolanaNftEvents}; use hub_core::{ backon::{ExponentialBuilder, Retryable}, prelude::*, producer::Producer, - tokio, + tokio::{ + self, + task::{self, JoinSet}, + }, }; use solana_client::rpc_client::RpcClient; use yellowstone_grpc_client::GeyserGrpcClientError; -use yellowstone_grpc_proto::prelude::SubscribeRequest; +use yellowstone_grpc_proto::prelude::{ + subscribe_update::UpdateOneof, SubscribeRequest, SubscribeUpdateTransaction, +}; use crate::{processor::Processor, Args, GeyserGrpcConnector}; @@ -18,6 +25,10 @@ use crate::{processor::Processor, Args, GeyserGrpcConnector}; pub struct MessageHandler { connector: GeyserGrpcConnector, processor: Processor, + dashmap: DashMap>, + tx: Sender, + rx: Receiver, + parallelism: usize, } impl MessageHandler { @@ -26,6 +37,7 @@ impl MessageHandler { dragon_mouth_endpoint, dragon_mouth_x_token, solana_endpoint, + parallelism, db, } = args; @@ -35,12 +47,16 @@ impl MessageHandler { let rpc = Arc::new(RpcClient::new(solana_endpoint)); let connector = GeyserGrpcConnector::new(dragon_mouth_endpoint, dragon_mouth_x_token); - + let (tx, rx) = channel::unbounded(); let processor = Processor::new(db, rpc, producer); Ok(Self { connector, processor, + dashmap: DashMap::new(), + tx, + rx, + parallelism, }) } @@ -54,9 +70,23 @@ impl MessageHandler { .map_err(GeyserGrpcClientError::SubscribeSendError)?; while let Some(message) = stream.next().await { - self.processor.process(message).await?; + match message { + Ok(msg) => match msg.update_oneof { + Some(UpdateOneof::Transaction(tx)) => { + self.dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); + }, + Some(UpdateOneof::Slot(slot)) => { + if let Some((_, transactions)) = self.dashmap.remove(&slot.slot) { + for tx in transactions { + self.tx.send(tx)?; + } + } + }, + _ => {}, + }, + Err(error) => bail!("stream error: {:?}", error), + }; } - Ok(()) }) .retry( @@ -81,12 +111,42 @@ impl MessageHandler { }); let mpl_bubblegum_stream = tokio::spawn({ + let handler = self.clone(); async move { - self.connect(GeyserGrpcConnector::build_request(mpl_bubblegum::ID)) + handler + .connect(GeyserGrpcConnector::build_request(mpl_bubblegum::ID)) .await } }); + let rx = self.rx.clone(); + let processor = self.processor; + + let process_task = task::spawn(async move { + let mut set = JoinSet::new(); + + loop { + let processor = processor.clone(); + + while set.len() >= self.parallelism { + match set.join_next().await { + Some(Err(e)) => bail!("failed to join task {:?}", e), + Some(Ok(_)) | None => (), + } + } + + match rx.recv() { + Ok(tx) => { + set.spawn(processor.process_transaction(tx)); + }, + Err(e) => { + error!("{:?}", e); + return Result::<(), Error>::Err(e.into()); + }, + } + } + }); + tokio::select! { Err(e) = spl_token_stream => { bail!("spl token stream error: {:?}", e) @@ -94,6 +154,9 @@ impl MessageHandler { Err(e) = mpl_bubblegum_stream => { bail!("mpl bumblegum stream error: {:?}", e) } + Err(e) = process_task => { + bail!("Receiver err: {:?}", e) + } } } } diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs index 5b66883..fbee2fe 100644 --- a/indexer/src/lib.rs +++ b/indexer/src/lib.rs @@ -18,6 +18,9 @@ pub struct Args { #[arg(long, env)] pub solana_endpoint: String, + #[arg(long, short = 'p', env, default_value_t = 8)] + pub parallelism: usize, + #[command(flatten)] pub db: db::DbArgs, } diff --git a/indexer/src/processor.rs b/indexer/src/processor.rs index 71bc0f2..6847de8 100644 --- a/indexer/src/processor.rs +++ b/indexer/src/processor.rs @@ -2,7 +2,6 @@ use std::{convert::TryInto, sync::Arc}; use anchor_lang::AnchorDeserialize; use backoff::ExponentialBackoff; -use dashmap::DashMap; use holaplex_hub_nfts_solana_core::{ db::Connection, proto::{ @@ -13,25 +12,19 @@ use holaplex_hub_nfts_solana_core::{ CollectionMint, CompressionLeaf, }; use holaplex_hub_nfts_solana_entity::compression_leafs; -use hub_core::{prelude::*, producer::Producer, tokio::task}; +use hub_core::{prelude::*, producer::Producer}; use mpl_bubblegum::utils::get_asset_id; use solana_client::rpc_client::RpcClient; use solana_program::program_pack::Pack; use solana_sdk::{pubkey::Pubkey, signature::Signature}; use spl_token::{instruction::TokenInstruction, state::Account}; -use yellowstone_grpc_proto::{ - prelude::{ - subscribe_update::UpdateOneof, Message, SubscribeUpdate, SubscribeUpdateTransaction, - }, - tonic::Status, -}; +use yellowstone_grpc_proto::prelude::{Message, SubscribeUpdateTransaction}; #[derive(Clone)] pub struct Processor { db: Connection, rpc: Arc, producer: Producer, - dashmap: DashMap>, } impl Processor { @@ -40,38 +33,7 @@ impl Processor { rpc: Arc, producer: Producer, ) -> Self { - Self { - db, - rpc, - producer, - dashmap: DashMap::new(), - } - } - - pub(crate) async fn process(&self, message: Result) -> Result<()> { - match message { - Ok(msg) => match msg.update_oneof { - Some(UpdateOneof::Transaction(tx)) => { - self.dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); - }, - Some(UpdateOneof::Slot(slot)) => { - if let Some((_, transactions)) = self.dashmap.remove(&slot.slot) { - let handles: Vec<_> = transactions - .into_iter() - .map(|tx| task::spawn(self.clone().process_transaction(tx))) - .collect(); - - for handle in handles { - handle.await?? - } - } - }, - _ => {}, - }, - Err(error) => bail!("stream error: {:?}", error), - }; - - Ok(()) + Self { db, rpc, producer } } pub(crate) async fn process_transaction(self, tx: SubscribeUpdateTransaction) -> Result<()> { @@ -140,6 +102,7 @@ impl Processor { if compression_leaf.is_none() { return Ok(()); } + let compression_leaf = compression_leaf.context("Compression leaf not found")?; let collection_mint_id = compression_leaf.id; From 6f7d4c298c91ccdc53986bb8d6310a4af7e29b10 Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Wed, 4 Oct 2023 09:30:51 +0500 Subject: [PATCH 08/11] use tokio unbounded channel --- Cargo.lock | 15 --------------- indexer/Cargo.toml | 1 - indexer/src/handler.rs | 40 ++++++++++++++++++++-------------------- 3 files changed, 20 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cd49ec5..63769e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1458,20 +1458,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" -dependencies = [ - "cfg-if", - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-epoch", - "crossbeam-queue", - "crossbeam-utils", -] - [[package]] name = "crossbeam-channel" version = "0.5.8" @@ -5420,7 +5406,6 @@ version = "0.1.0" dependencies = [ "anchor-lang", "backoff", - "crossbeam", "dashmap", "futures", "hex", diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index 7b97ec1..23ff826 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -29,7 +29,6 @@ dashmap = "5.4.0" spl-token = "=3.5.0" solana-client = "1.14" backoff = { version = "0.4.0", features = ["tokio"] } -crossbeam = "0.8.2" [dependencies.hub-core] package = "holaplex-hub-core" diff --git a/indexer/src/handler.rs b/indexer/src/handler.rs index 223dd7e..2e6b21e 100644 --- a/indexer/src/handler.rs +++ b/indexer/src/handler.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use crossbeam::channel::{self, Receiver, Sender}; use dashmap::DashMap; use futures::{sink::SinkExt, stream::StreamExt}; use holaplex_hub_nfts_solana_core::{db::Connection, proto::SolanaNftEvents}; @@ -10,6 +9,10 @@ use hub_core::{ producer::Producer, tokio::{ self, + sync::{ + mpsc::{self, UnboundedReceiver, UnboundedSender}, + Mutex, + }, task::{self, JoinSet}, }, }; @@ -25,9 +28,8 @@ use crate::{processor::Processor, Args, GeyserGrpcConnector}; pub struct MessageHandler { connector: GeyserGrpcConnector, processor: Processor, - dashmap: DashMap>, - tx: Sender, - rx: Receiver, + tx: UnboundedSender, + rx: Arc>>, parallelism: usize, } @@ -47,15 +49,14 @@ impl MessageHandler { let rpc = Arc::new(RpcClient::new(solana_endpoint)); let connector = GeyserGrpcConnector::new(dragon_mouth_endpoint, dragon_mouth_x_token); - let (tx, rx) = channel::unbounded(); + let (tx, rx) = mpsc::unbounded_channel(); let processor = Processor::new(db, rpc, producer); Ok(Self { connector, processor, - dashmap: DashMap::new(), tx, - rx, + rx: Arc::new(Mutex::new(rx)), parallelism, }) } @@ -63,7 +64,7 @@ impl MessageHandler { async fn connect(&self, request: SubscribeRequest) -> Result<()> { (|| async { let (mut subscribe_tx, mut stream) = self.connector.subscribe().await?; - + let dashmap = DashMap::new(); subscribe_tx .send(request.clone()) .await @@ -73,10 +74,10 @@ impl MessageHandler { match message { Ok(msg) => match msg.update_oneof { Some(UpdateOneof::Transaction(tx)) => { - self.dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); + dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); }, Some(UpdateOneof::Slot(slot)) => { - if let Some((_, transactions)) = self.dashmap.remove(&slot.slot) { + if let Some((_, transactions)) = dashmap.remove(&slot.slot) { for tx in transactions { self.tx.send(tx)?; } @@ -119,7 +120,6 @@ impl MessageHandler { } }); - let rx = self.rx.clone(); let processor = self.processor; let process_task = task::spawn(async move { @@ -127,22 +127,22 @@ impl MessageHandler { loop { let processor = processor.clone(); + let mut rx = self.rx.lock().await; while set.len() >= self.parallelism { match set.join_next().await { - Some(Err(e)) => bail!("failed to join task {:?}", e), + Some(Err(e)) => { + return Result::<(), Error>::Err(anyhow!( + "failed to join task {:?}", + e + )); + }, Some(Ok(_)) | None => (), } } - match rx.recv() { - Ok(tx) => { - set.spawn(processor.process_transaction(tx)); - }, - Err(e) => { - error!("{:?}", e); - return Result::<(), Error>::Err(e.into()); - }, + if let Some(tx) = rx.recv().await { + set.spawn(processor.process_transaction(tx)); } } }); From 3091f2aef9e7699891499897aa08b6e6cc700e38 Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Wed, 4 Oct 2023 12:15:44 +0500 Subject: [PATCH 09/11] replace dashmap with hashmap --- Cargo.lock | 14 -------------- indexer/Cargo.toml | 1 - indexer/src/handler.rs | 7 +++---- 3 files changed, 3 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 63769e3..a14fdfc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1595,19 +1595,6 @@ dependencies = [ "syn 2.0.37", ] -[[package]] -name = "dashmap" -version = "5.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" -dependencies = [ - "cfg-if", - "hashbrown 0.14.0", - "lock_api", - "once_cell", - "parking_lot_core 0.9.8", -] - [[package]] name = "data-encoding" version = "2.4.0" @@ -5406,7 +5393,6 @@ version = "0.1.0" dependencies = [ "anchor-lang", "backoff", - "dashmap", "futures", "hex", "holaplex-hub-core", diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index 23ff826..3837b4c 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -25,7 +25,6 @@ solana-program = "1.14" anchor-lang = "0.26.0" yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc", tag = "v1.7.1+solana.1.16.1" } yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc", tag = "v1.7.1+solana.1.16.1" } -dashmap = "5.4.0" spl-token = "=3.5.0" solana-client = "1.14" backoff = { version = "0.4.0", features = ["tokio"] } diff --git a/indexer/src/handler.rs b/indexer/src/handler.rs index 2e6b21e..0abc9c0 100644 --- a/indexer/src/handler.rs +++ b/indexer/src/handler.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use dashmap::DashMap; use futures::{sink::SinkExt, stream::StreamExt}; use holaplex_hub_nfts_solana_core::{db::Connection, proto::SolanaNftEvents}; use hub_core::{ @@ -64,7 +63,7 @@ impl MessageHandler { async fn connect(&self, request: SubscribeRequest) -> Result<()> { (|| async { let (mut subscribe_tx, mut stream) = self.connector.subscribe().await?; - let dashmap = DashMap::new(); + let mut hashmap = std::collections::HashMap::new(); subscribe_tx .send(request.clone()) .await @@ -74,10 +73,10 @@ impl MessageHandler { match message { Ok(msg) => match msg.update_oneof { Some(UpdateOneof::Transaction(tx)) => { - dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); + hashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); }, Some(UpdateOneof::Slot(slot)) => { - if let Some((_, transactions)) = dashmap.remove(&slot.slot) { + if let Some(transactions) = hashmap.remove(&slot.slot) { for tx in transactions { self.tx.send(tx)?; } From 03977eb750831be1210077c6160b24cf6c2ad59f Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Wed, 4 Oct 2023 22:52:19 +0500 Subject: [PATCH 10/11] retry submit transaction --- consumer/src/solana.rs | 54 ++++++++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/consumer/src/solana.rs b/consumer/src/solana.rs index 2982140..3d1a748 100644 --- a/consumer/src/solana.rs +++ b/consumer/src/solana.rs @@ -25,7 +25,10 @@ use mpl_token_metadata::{ }, state::{Creator, DataV2, EDITION, PREFIX}, }; -use solana_client::{client_error::ClientError, rpc_client::RpcClient as SolanaRpcClient}; +use solana_client::{ + client_error::{ClientError, ClientErrorKind}, + rpc_client::RpcClient as SolanaRpcClient, +}; use solana_program::{ instruction::Instruction, program_pack::Pack, pubkey::Pubkey, system_instruction::create_account, system_program, @@ -59,7 +62,7 @@ use crate::{ }, }; -macro_rules! call_with_retry { +macro_rules! with_retry { ($expr:expr) => {{ (|| $expr) .retry( @@ -71,7 +74,6 @@ macro_rules! call_with_retry { .notify(|err: &ClientError, dur: Duration| { error!("retrying error {:?} in {:?}", err, dur); }) - .call() }}; } @@ -204,10 +206,11 @@ impl Solana { &self, signature: &Signature, ) -> Result { - let response = call_with_retry!( + let response = with_retry!( self.rpc() .get_transaction(signature, UiTransactionEncoding::Json) - )?; + ) + .call()?; let meta = response .transaction @@ -266,11 +269,19 @@ impl Solana { message, }; - let signature = self.rpc().send_transaction(&transaction).map_err(|e| { - let msg = format!("failed to send transaction: {e}"); - error!(msg); - anyhow!(msg) - })?; + let signature = with_retry!(self.rpc().send_and_confirm_transaction(&transaction)) + .when(|e| { + !matches!( + e.kind, + ClientErrorKind::TransactionError(_) | ClientErrorKind::SigningError(_) + ) + }) + .call() + .map_err(|e| { + let msg = format!("failed to send transaction: {e}"); + error!(msg); + anyhow!(msg) + })?; Ok(signature.to_string()) } @@ -324,8 +335,8 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { ); let len = spl_token::state::Mint::LEN; - let rent = call_with_retry!(rpc.get_minimum_balance_for_rent_exemption(len))?; - let blockhash = call_with_retry!(rpc.get_latest_blockhash())?; + let rent = with_retry!(rpc.get_minimum_balance_for_rent_exemption(len)).call()?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; let create_account_ins = solana_program::system_instruction::create_account( &payer, @@ -474,7 +485,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { None, ); - let blockhash = call_with_retry!(rpc.get_latest_blockhash())?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; let message = solana_program::message::Message::new_with_blockhash(&[ins], Some(&payer), &blockhash); @@ -526,7 +537,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { &mpl_token_metadata::ID, ); - let blockhash = call_with_retry!(rpc.get_latest_blockhash())?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; let update_ins: Instruction = mpl_token_metadata::instruction::update_metadata_accounts_v2( mpl_token_metadata::ID, @@ -589,7 +600,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { let mut message: solana_program::message::Message = bincode::deserialize(&revision.serialized_message)?; - let blockhash = call_with_retry!(rpc.get_latest_blockhash())?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; message.recent_blockhash = blockhash; Ok(TransactionResponse { @@ -674,7 +685,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { let instructions = vec![unverify_ins, verify_ins]; - let blockhash = call_with_retry!(rpc.get_latest_blockhash())?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; let message = solana_program::message::Message::new_with_blockhash( &instructions, @@ -744,7 +755,8 @@ impl<'a> MintBackend for E ]; let (metadata_key, _) = Pubkey::find_program_address(metadata_seeds, &program_pubkey); - let rent = call_with_retry!(rpc.get_minimum_balance_for_rent_exemption(state::Mint::LEN))?; + let rent = + with_retry!(rpc.get_minimum_balance_for_rent_exemption(state::Mint::LEN)).call()?; let mut instructions = vec![ create_account( @@ -782,7 +794,7 @@ impl<'a> MintBackend for E edition, )); - let blockhash = call_with_retry!(rpc.get_latest_blockhash())?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; let message = solana_program::message::Message::new_with_blockhash( &instructions, @@ -830,7 +842,7 @@ impl<'a> TransferBackend for Un let recipient: Pubkey = recipient_address.parse()?; let mint_address: Pubkey = collection_mint.mint.parse()?; let payer: Pubkey = self.0.treasury_wallet_address; - let blockhash = call_with_retry!(rpc.get_latest_blockhash())?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; let source_ata = get_associated_token_address(&sender, &mint_address); let destination_ata = get_associated_token_address(&recipient, &mint_address); @@ -1127,8 +1139,8 @@ impl<'a> MintBackend ); let associated_token_account = get_associated_token_address(&recipient, &mint.pubkey()); let len = spl_token::state::Mint::LEN; - let rent = call_with_retry!(rpc.get_minimum_balance_for_rent_exemption(len))?; - let blockhash = call_with_retry!(rpc.get_latest_blockhash())?; + let rent = with_retry!(rpc.get_minimum_balance_for_rent_exemption(len)).call()?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; let create_account_ins = solana_program::system_instruction::create_account( &payer, From 94c2abe6c9560f7726e4d68fc789efac1626f861 Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Fri, 6 Oct 2023 19:45:31 +0500 Subject: [PATCH 11/11] Support retry for failed compressed mint --- consumer/src/events.rs | 42 ++++++++++++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/consumer/src/events.rs b/consumer/src/events.rs index 22043ce..642ccda 100644 --- a/consumer/src/events.rs +++ b/consumer/src/events.rs @@ -546,11 +546,7 @@ impl Processor { self.process_nft( EventKind::RetryMintToCollection, &key, - self.retry_mint_to_collection( - &UncompressedRef(self.solana()), - &key, - payload, - ), + self.retry_mint_to_collection(&key, payload), ) .await }, @@ -625,11 +621,7 @@ impl Processor { self.process_nft( EventKind::RetryMintOpenDrop, &key, - self.retry_mint_to_collection( - &UncompressedRef(self.solana()), - &key, - payload, - ), + self.retry_mint_to_collection(&key, payload), ) .await }, @@ -1185,11 +1177,8 @@ impl Processor { Ok(tx.into()) } - async fn retry_mint_to_collection< - B: MintBackend, - >( + async fn retry_mint_to_collection( &self, - backend: &B, key: &SolanaNftEventKey, payload: MintMetaplexMetadataTransaction, ) -> ProcessResult { @@ -1202,6 +1191,31 @@ impl Processor { let collection = collection.ok_or(ProcessorErrorKind::RecordNotFound)?; + if payload.compressed { + let backend = &CompressedRef(self.solana()); + + let tx = backend + .mint(&collection, payload) + .map_err(ProcessorErrorKind::Solana)?; + + let leaf_model = CompressionLeaf::find_by_id(conn, id) + .await? + .ok_or(ProcessorErrorKind::RecordNotFound)?; + + let mut compression_leaf: compression_leafs::ActiveModel = leaf_model.into(); + + compression_leaf.merkle_tree = Set(tx.addresses.merkle_tree.to_string()); + compression_leaf.tree_authority = Set(tx.addresses.tree_authority.to_string()); + compression_leaf.tree_delegate = Set(tx.addresses.tree_delegate.to_string()); + compression_leaf.leaf_owner = Set(tx.addresses.leaf_owner.to_string()); + + compression_leaf.update(conn).await?; + + return Ok(tx.into()); + } + + let backend = &UncompressedRef(self.solana()); + let tx = backend .mint(&collection, payload) .map_err(ProcessorErrorKind::Solana)?;