diff --git a/Cargo.lock b/Cargo.lock index 63769e3..a14fdfc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1595,19 +1595,6 @@ dependencies = [ "syn 2.0.37", ] -[[package]] -name = "dashmap" -version = "5.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" -dependencies = [ - "cfg-if", - "hashbrown 0.14.0", - "lock_api", - "once_cell", - "parking_lot_core 0.9.8", -] - [[package]] name = "data-encoding" version = "2.4.0" @@ -5406,7 +5393,6 @@ version = "0.1.0" dependencies = [ "anchor-lang", "backoff", - "dashmap", "futures", "hex", "holaplex-hub-core", diff --git a/consumer/src/events.rs b/consumer/src/events.rs index 22043ce..642ccda 100644 --- a/consumer/src/events.rs +++ b/consumer/src/events.rs @@ -546,11 +546,7 @@ impl Processor { self.process_nft( EventKind::RetryMintToCollection, &key, - self.retry_mint_to_collection( - &UncompressedRef(self.solana()), - &key, - payload, - ), + self.retry_mint_to_collection(&key, payload), ) .await }, @@ -625,11 +621,7 @@ impl Processor { self.process_nft( EventKind::RetryMintOpenDrop, &key, - self.retry_mint_to_collection( - &UncompressedRef(self.solana()), - &key, - payload, - ), + self.retry_mint_to_collection(&key, payload), ) .await }, @@ -1185,11 +1177,8 @@ impl Processor { Ok(tx.into()) } - async fn retry_mint_to_collection< - B: MintBackend, - >( + async fn retry_mint_to_collection( &self, - backend: &B, key: &SolanaNftEventKey, payload: MintMetaplexMetadataTransaction, ) -> ProcessResult { @@ -1202,6 +1191,31 @@ impl Processor { let collection = collection.ok_or(ProcessorErrorKind::RecordNotFound)?; + if payload.compressed { + let backend = &CompressedRef(self.solana()); + + let tx = backend + .mint(&collection, payload) + .map_err(ProcessorErrorKind::Solana)?; + + let leaf_model = CompressionLeaf::find_by_id(conn, id) + .await? + .ok_or(ProcessorErrorKind::RecordNotFound)?; + + let mut compression_leaf: compression_leafs::ActiveModel = leaf_model.into(); + + compression_leaf.merkle_tree = Set(tx.addresses.merkle_tree.to_string()); + compression_leaf.tree_authority = Set(tx.addresses.tree_authority.to_string()); + compression_leaf.tree_delegate = Set(tx.addresses.tree_delegate.to_string()); + compression_leaf.leaf_owner = Set(tx.addresses.leaf_owner.to_string()); + + compression_leaf.update(conn).await?; + + return Ok(tx.into()); + } + + let backend = &UncompressedRef(self.solana()); + let tx = backend .mint(&collection, payload) .map_err(ProcessorErrorKind::Solana)?; diff --git a/consumer/src/solana.rs b/consumer/src/solana.rs index 50c0dd5..3d1a748 100644 --- a/consumer/src/solana.rs +++ b/consumer/src/solana.rs @@ -25,7 +25,10 @@ use mpl_token_metadata::{ }, state::{Creator, DataV2, EDITION, PREFIX}, }; -use solana_client::rpc_client::RpcClient as SolanaRpcClient; +use solana_client::{ + client_error::{ClientError, ClientErrorKind}, + rpc_client::RpcClient as SolanaRpcClient, +}; use solana_program::{ instruction::Instruction, program_pack::Pack, pubkey::Pubkey, system_instruction::create_account, system_program, @@ -59,16 +62,18 @@ use crate::{ }, }; -macro_rules! call_with_retry { +macro_rules! with_retry { ($expr:expr) => {{ (|| $expr) .retry( &ExponentialBuilder::default() .with_jitter() .with_min_delay(Duration::from_millis(30)) - .with_max_times(10), + .with_max_times(15), ) - .call() + .notify(|err: &ClientError, dur: Duration| { + error!("retrying error {:?} in {:?}", err, dur); + }) }}; } @@ -201,9 +206,11 @@ impl Solana { &self, signature: &Signature, ) -> Result { - let response = self - .rpc() - .get_transaction(signature, UiTransactionEncoding::Json)?; + let response = with_retry!( + self.rpc() + .get_transaction(signature, UiTransactionEncoding::Json) + ) + .call()?; let meta = response .transaction @@ -247,10 +254,7 @@ impl Solana { let signatures = transaction .signed_message_signatures .iter() - .map(|s| { - Signature::from_str(s) - .map_err(|e| anyhow!(format!("failed to parse signature: {e}"))) - }) + .map(|s| Signature::from_str(s).context("failed to parse signature")) .collect::>>()?; let message = bincode::deserialize( @@ -265,13 +269,21 @@ impl Solana { message, }; - call_with_retry!(self.rpc().send_and_confirm_transaction(&transaction)) - .map(|s| s.to_string()) + let signature = with_retry!(self.rpc().send_and_confirm_transaction(&transaction)) + .when(|e| { + !matches!( + e.kind, + ClientErrorKind::TransactionError(_) | ClientErrorKind::SigningError(_) + ) + }) + .call() .map_err(|e| { - let msg = format!("failed to submit transaction: {e}"); + let msg = format!("failed to send transaction: {e}"); error!(msg); anyhow!(msg) - }) + })?; + + Ok(signature.to_string()) } } @@ -323,8 +335,8 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { ); let len = spl_token::state::Mint::LEN; - let rent = call_with_retry!(rpc.get_minimum_balance_for_rent_exemption(len))?; - let blockhash = call_with_retry!(rpc.get_latest_blockhash())?; + let rent = with_retry!(rpc.get_minimum_balance_for_rent_exemption(len)).call()?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; let create_account_ins = solana_program::system_instruction::create_account( &payer, @@ -473,7 +485,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { None, ); - let blockhash = call_with_retry!(rpc.get_latest_blockhash())?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; let message = solana_program::message::Message::new_with_blockhash(&[ins], Some(&payer), &blockhash); @@ -525,7 +537,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { &mpl_token_metadata::ID, ); - let blockhash = call_with_retry!(rpc.get_latest_blockhash())?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; let update_ins: Instruction = mpl_token_metadata::instruction::update_metadata_accounts_v2( mpl_token_metadata::ID, @@ -588,7 +600,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { let mut message: solana_program::message::Message = bincode::deserialize(&revision.serialized_message)?; - let blockhash = call_with_retry!(rpc.get_latest_blockhash())?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; message.recent_blockhash = blockhash; Ok(TransactionResponse { @@ -673,7 +685,7 @@ impl<'a> CollectionBackend for UncompressedRef<'a> { let instructions = vec![unverify_ins, verify_ins]; - let blockhash = call_with_retry!(rpc.get_latest_blockhash())?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; let message = solana_program::message::Message::new_with_blockhash( &instructions, @@ -743,7 +755,8 @@ impl<'a> MintBackend for E ]; let (metadata_key, _) = Pubkey::find_program_address(metadata_seeds, &program_pubkey); - let rent = call_with_retry!(rpc.get_minimum_balance_for_rent_exemption(state::Mint::LEN))?; + let rent = + with_retry!(rpc.get_minimum_balance_for_rent_exemption(state::Mint::LEN)).call()?; let mut instructions = vec![ create_account( @@ -781,7 +794,7 @@ impl<'a> MintBackend for E edition, )); - let blockhash = call_with_retry!(rpc.get_latest_blockhash())?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; let message = solana_program::message::Message::new_with_blockhash( &instructions, @@ -829,7 +842,7 @@ impl<'a> TransferBackend for Un let recipient: Pubkey = recipient_address.parse()?; let mint_address: Pubkey = collection_mint.mint.parse()?; let payer: Pubkey = self.0.treasury_wallet_address; - let blockhash = call_with_retry!(rpc.get_latest_blockhash())?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; let source_ata = get_associated_token_address(&sender, &mint_address); let destination_ata = get_associated_token_address(&recipient, &mint_address); @@ -1126,8 +1139,8 @@ impl<'a> MintBackend ); let associated_token_account = get_associated_token_address(&recipient, &mint.pubkey()); let len = spl_token::state::Mint::LEN; - let rent = call_with_retry!(rpc.get_minimum_balance_for_rent_exemption(len))?; - let blockhash = call_with_retry!(rpc.get_latest_blockhash())?; + let rent = with_retry!(rpc.get_minimum_balance_for_rent_exemption(len)).call()?; + let blockhash = with_retry!(rpc.get_latest_blockhash()).call()?; let create_account_ins = solana_program::system_instruction::create_account( &payer, diff --git a/core/src/db.rs b/core/src/db.rs index 2cc9c7f..18ce10b 100644 --- a/core/src/db.rs +++ b/core/src/db.rs @@ -12,7 +12,7 @@ pub struct DbArgs { pub connection_timeout: u64, #[arg(long, env, default_value_t = 10)] pub acquire_timeout: u64, - #[arg(long, env, default_value_t = 60)] + #[arg(long, env, default_value_t = 10)] pub idle_timeout: u64, #[arg(long, env)] pub database_url: String, diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index c9f3173..3837b4c 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -25,12 +25,10 @@ solana-program = "1.14" anchor-lang = "0.26.0" yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc", tag = "v1.7.1+solana.1.16.1" } yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc", tag = "v1.7.1+solana.1.16.1" } -dashmap = "5.4.0" spl-token = "=3.5.0" solana-client = "1.14" backoff = { version = "0.4.0", features = ["tokio"] } - [dependencies.hub-core] package = "holaplex-hub-core" version = "0.5.6" diff --git a/indexer/src/handler.rs b/indexer/src/handler.rs index f70c3ee..0abc9c0 100644 --- a/indexer/src/handler.rs +++ b/indexer/src/handler.rs @@ -6,11 +6,20 @@ use hub_core::{ backon::{ExponentialBuilder, Retryable}, prelude::*, producer::Producer, - tokio, + tokio::{ + self, + sync::{ + mpsc::{self, UnboundedReceiver, UnboundedSender}, + Mutex, + }, + task::{self, JoinSet}, + }, }; use solana_client::rpc_client::RpcClient; use yellowstone_grpc_client::GeyserGrpcClientError; -use yellowstone_grpc_proto::prelude::SubscribeRequest; +use yellowstone_grpc_proto::prelude::{ + subscribe_update::UpdateOneof, SubscribeRequest, SubscribeUpdateTransaction, +}; use crate::{processor::Processor, Args, GeyserGrpcConnector}; @@ -18,6 +27,9 @@ use crate::{processor::Processor, Args, GeyserGrpcConnector}; pub struct MessageHandler { connector: GeyserGrpcConnector, processor: Processor, + tx: UnboundedSender, + rx: Arc>>, + parallelism: usize, } impl MessageHandler { @@ -26,6 +38,7 @@ impl MessageHandler { dragon_mouth_endpoint, dragon_mouth_x_token, solana_endpoint, + parallelism, db, } = args; @@ -35,28 +48,45 @@ impl MessageHandler { let rpc = Arc::new(RpcClient::new(solana_endpoint)); let connector = GeyserGrpcConnector::new(dragon_mouth_endpoint, dragon_mouth_x_token); - + let (tx, rx) = mpsc::unbounded_channel(); let processor = Processor::new(db, rpc, producer); Ok(Self { connector, processor, + tx, + rx: Arc::new(Mutex::new(rx)), + parallelism, }) } async fn connect(&self, request: SubscribeRequest) -> Result<()> { (|| async { let (mut subscribe_tx, mut stream) = self.connector.subscribe().await?; - + let mut hashmap = std::collections::HashMap::new(); subscribe_tx .send(request.clone()) .await .map_err(GeyserGrpcClientError::SubscribeSendError)?; while let Some(message) = stream.next().await { - self.processor.process(message).await?; + match message { + Ok(msg) => match msg.update_oneof { + Some(UpdateOneof::Transaction(tx)) => { + hashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); + }, + Some(UpdateOneof::Slot(slot)) => { + if let Some(transactions) = hashmap.remove(&slot.slot) { + for tx in transactions { + self.tx.send(tx)?; + } + } + }, + _ => {}, + }, + Err(error) => bail!("stream error: {:?}", error), + }; } - Ok(()) }) .retry( @@ -81,12 +111,41 @@ impl MessageHandler { }); let mpl_bubblegum_stream = tokio::spawn({ + let handler = self.clone(); async move { - self.connect(GeyserGrpcConnector::build_request(mpl_bubblegum::ID)) + handler + .connect(GeyserGrpcConnector::build_request(mpl_bubblegum::ID)) .await } }); + let processor = self.processor; + + let process_task = task::spawn(async move { + let mut set = JoinSet::new(); + + loop { + let processor = processor.clone(); + let mut rx = self.rx.lock().await; + + while set.len() >= self.parallelism { + match set.join_next().await { + Some(Err(e)) => { + return Result::<(), Error>::Err(anyhow!( + "failed to join task {:?}", + e + )); + }, + Some(Ok(_)) | None => (), + } + } + + if let Some(tx) = rx.recv().await { + set.spawn(processor.process_transaction(tx)); + } + } + }); + tokio::select! { Err(e) = spl_token_stream => { bail!("spl token stream error: {:?}", e) @@ -94,6 +153,9 @@ impl MessageHandler { Err(e) = mpl_bubblegum_stream => { bail!("mpl bumblegum stream error: {:?}", e) } + Err(e) = process_task => { + bail!("Receiver err: {:?}", e) + } } } } diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs index 5b66883..fbee2fe 100644 --- a/indexer/src/lib.rs +++ b/indexer/src/lib.rs @@ -18,6 +18,9 @@ pub struct Args { #[arg(long, env)] pub solana_endpoint: String, + #[arg(long, short = 'p', env, default_value_t = 8)] + pub parallelism: usize, + #[command(flatten)] pub db: db::DbArgs, } diff --git a/indexer/src/processor.rs b/indexer/src/processor.rs index b6531dd..6847de8 100644 --- a/indexer/src/processor.rs +++ b/indexer/src/processor.rs @@ -2,7 +2,6 @@ use std::{convert::TryInto, sync::Arc}; use anchor_lang::AnchorDeserialize; use backoff::ExponentialBackoff; -use dashmap::DashMap; use holaplex_hub_nfts_solana_core::{ db::Connection, proto::{ @@ -13,25 +12,19 @@ use holaplex_hub_nfts_solana_core::{ CollectionMint, CompressionLeaf, }; use holaplex_hub_nfts_solana_entity::compression_leafs; -use hub_core::{prelude::*, producer::Producer, tokio::task}; +use hub_core::{prelude::*, producer::Producer}; use mpl_bubblegum::utils::get_asset_id; use solana_client::rpc_client::RpcClient; use solana_program::program_pack::Pack; use solana_sdk::{pubkey::Pubkey, signature::Signature}; use spl_token::{instruction::TokenInstruction, state::Account}; -use yellowstone_grpc_proto::{ - prelude::{ - subscribe_update::UpdateOneof, Message, SubscribeUpdate, SubscribeUpdateTransaction, - }, - tonic::Status, -}; +use yellowstone_grpc_proto::prelude::{Message, SubscribeUpdateTransaction}; #[derive(Clone)] pub struct Processor { db: Connection, rpc: Arc, producer: Producer, - dashmap: DashMap>, } impl Processor { @@ -40,38 +33,7 @@ impl Processor { rpc: Arc, producer: Producer, ) -> Self { - Self { - db, - rpc, - producer, - dashmap: DashMap::new(), - } - } - - pub(crate) async fn process(&self, message: Result) -> Result<()> { - match message { - Ok(msg) => match msg.update_oneof { - Some(UpdateOneof::Transaction(tx)) => { - self.dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); - }, - Some(UpdateOneof::Slot(slot)) => { - if let Some((_, transactions)) = self.dashmap.remove(&slot.slot) { - let handles: Vec<_> = transactions - .into_iter() - .map(|tx| task::spawn(self.clone().process_transaction(tx))) - .collect(); - - for handle in handles { - handle.await?? - } - } - }, - _ => {}, - }, - Err(error) => bail!("stream error: {:?}", error), - }; - - Ok(()) + Self { db, rpc, producer } } pub(crate) async fn process_transaction(self, tx: SubscribeUpdateTransaction) -> Result<()> { @@ -135,9 +97,13 @@ impl Processor { let asset_id = get_asset_id(&merkle_tree, tkn_instruction.nonce); let compression_leaf = - CompressionLeaf::find_by_asset_id(conn, asset_id.to_string()) - .await? - .context("compression leaf not found")?; + CompressionLeaf::find_by_asset_id(conn, asset_id.to_string()).await?; + + if compression_leaf.is_none() { + return Ok(()); + } + + let compression_leaf = compression_leaf.context("Compression leaf not found")?; let collection_mint_id = compression_leaf.id; let leaf_owner = compression_leaf.leaf_owner.clone();