diff --git a/indexer/src/connector.rs b/indexer/src/connector.rs index 3fe3389..5a443a3 100644 --- a/indexer/src/connector.rs +++ b/indexer/src/connector.rs @@ -2,6 +2,7 @@ use std::{collections::HashMap, vec}; use futures::{channel::mpsc::SendError, Sink, Stream}; use hub_core::prelude::*; +use solana_program::pubkey::Pubkey; use yellowstone_grpc_client::GeyserGrpcClient; use yellowstone_grpc_proto::{prelude::*, tonic::Status}; @@ -29,7 +30,7 @@ impl GeyserGrpcConnector { Ok((subscribe_tx, stream)) } - pub fn build_request() -> SubscribeRequest { + pub fn build_request(program_id: Pubkey) -> SubscribeRequest { let mut slots = HashMap::new(); slots.insert("client".to_owned(), SubscribeRequestFilterSlots {}); @@ -38,20 +39,16 @@ impl GeyserGrpcConnector { vote: Some(false), failed: Some(false), signature: None, - account_include: vec![spl_token::ID.to_string(), mpl_bubblegum::ID.to_string()], + account_include: vec![program_id.to_string()], account_exclude: Vec::new(), account_required: Vec::new(), }); SubscribeRequest { - accounts: HashMap::new(), slots, transactions, - blocks: HashMap::new(), - blocks_meta: HashMap::new(), commitment: Some(CommitmentLevel::Finalized as i32), - accounts_data_slice: vec![], - entry: HashMap::new(), + ..Default::default() } } } diff --git a/indexer/src/handler.rs b/indexer/src/handler.rs index cdde0fc..f70c3ee 100644 --- a/indexer/src/handler.rs +++ b/indexer/src/handler.rs @@ -1,42 +1,23 @@ -use std::{convert::TryInto, sync::Arc}; +use std::sync::Arc; -use anchor_lang::AnchorDeserialize; -use backoff::ExponentialBackoff; -use dashmap::DashMap; use futures::{sink::SinkExt, stream::StreamExt}; -use holaplex_hub_nfts_solana_core::{ - db::Connection, - proto::{ - solana_nft_events::Event::UpdateMintOwner, MintOwnershipUpdate, SolanaNftEventKey, - SolanaNftEvents, - }, - sea_orm::Set, - CollectionMint, CompressionLeaf, +use holaplex_hub_nfts_solana_core::{db::Connection, proto::SolanaNftEvents}; +use hub_core::{ + backon::{ExponentialBuilder, Retryable}, + prelude::*, + producer::Producer, + tokio, }; -use holaplex_hub_nfts_solana_entity::compression_leafs; -use hub_core::{prelude::*, producer::Producer, tokio::task}; -use mpl_bubblegum::utils::get_asset_id; use solana_client::rpc_client::RpcClient; -use solana_program::program_pack::Pack; -use solana_sdk::{pubkey::Pubkey, signature::Signature}; -use spl_token::{instruction::TokenInstruction, state::Account}; use yellowstone_grpc_client::GeyserGrpcClientError; -use yellowstone_grpc_proto::{ - prelude::{ - subscribe_update::UpdateOneof, Message, SubscribeUpdate, SubscribeUpdateTransaction, - }, - tonic::Status, -}; +use yellowstone_grpc_proto::prelude::SubscribeRequest; -use crate::{Args, GeyserGrpcConnector}; +use crate::{processor::Processor, Args, GeyserGrpcConnector}; #[derive(Clone)] pub struct MessageHandler { - db: Connection, - dashmap: DashMap>, - rpc: Arc, connector: GeyserGrpcConnector, - producer: Producer, + processor: Processor, } impl MessageHandler { @@ -51,241 +32,68 @@ impl MessageHandler { let db = Connection::new(db) .await .context("failed to get database connection")?; - let dashmap: DashMap> = DashMap::new(); + let rpc = Arc::new(RpcClient::new(solana_endpoint)); let connector = GeyserGrpcConnector::new(dragon_mouth_endpoint, dragon_mouth_x_token); + let processor = Processor::new(db, rpc, producer); + Ok(Self { - db, - dashmap, - rpc, connector, - producer, + processor, }) } - pub async fn run(&self) -> Result<()> { - let request = GeyserGrpcConnector::build_request(); - let (mut subscribe_tx, mut stream) = self.connector.subscribe().await?; - - loop { - let request = request.clone(); + async fn connect(&self, request: SubscribeRequest) -> Result<()> { + (|| async { + let (mut subscribe_tx, mut stream) = self.connector.subscribe().await?; subscribe_tx - .send(request) + .send(request.clone()) .await .map_err(GeyserGrpcClientError::SubscribeSendError)?; while let Some(message) = stream.next().await { - self.handle_message(message).await?; + self.processor.process(message).await?; } - } - } - - async fn handle_message(&self, message: Result) -> Result<()> { - match message { - Ok(msg) => match msg.update_oneof { - Some(UpdateOneof::Transaction(tx)) => { - self.dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); - }, - Some(UpdateOneof::Slot(slot)) => { - if let Some((_, transactions)) = self.dashmap.remove(&slot.slot) { - for tx in transactions { - task::spawn(self.clone().process_transaction(tx)); - } - } - }, - _ => {}, - }, - Err(error) => return Err(anyhow!("stream error: {:?}", error)), - }; - Ok(()) + Ok(()) + }) + .retry( + &ExponentialBuilder::default() + .with_max_times(10) + .with_jitter(), + ) + .notify(|err: &Error, dur: Duration| { + error!("stream error: {:?} retrying in {:?}", err, dur); + }) + .await } - async fn process_transaction(self, tx: SubscribeUpdateTransaction) -> Result<()> { - let message = tx - .transaction - .clone() - .context("SubscribeTransactionInfo not found")? - .transaction - .context("Transaction not found")? - .message - .context("Message not found")?; - let sig = tx - .transaction - .as_ref() - .ok_or_else(|| anyhow!("failed to get transaction"))? - .signature - .clone(); - - let keys = message.clone().account_keys; - - for (idx, key) in message.clone().account_keys.iter().enumerate() { - let key: &[u8] = key; - let k = Pubkey::try_from(key)?; - if k == spl_token::ID { - self.process_spl_token_transaction(idx, &keys, &sig, &message) - .await?; - } else if k == mpl_bubblegum::ID { - self.process_mpl_bubblegum_transaction(idx, &keys, &sig, &message) - .await?; + pub async fn run(self) -> Result<()> { + let spl_token_stream = tokio::spawn({ + let handler = self.clone(); + async move { + handler + .connect(GeyserGrpcConnector::build_request(spl_token::ID)) + .await } - } - - Ok(()) - } - - async fn process_mpl_bubblegum_transaction( - &self, - program_account_index: usize, - keys: &[Vec], - sig: &Vec, - message: &Message, - ) -> Result<()> { - for ins in message.instructions.iter() { - let account_indices = ins.accounts.clone(); - let program_idx: usize = ins.program_id_index.try_into()?; - - if program_idx == program_account_index { - let conn = self.db.get(); - let data = ins.data.clone(); - let data = data.as_slice(); - - let tkn_instruction = - mpl_bubblegum::instruction::Transfer::try_from_slice(&data[8..])?; - let new_leaf_owner_account_index = account_indices[3]; - let merkle_tree_account_index = account_indices[4]; - let new_leaf_owner_bytes: &[u8] = &keys[new_leaf_owner_account_index as usize]; - let merkle_tree_bytes: &[u8] = &keys[merkle_tree_account_index as usize]; - let new_leaf_owner = Pubkey::try_from(new_leaf_owner_bytes)?; - let merkle_tree = Pubkey::try_from(merkle_tree_bytes)?; - - let asset_id = get_asset_id(&merkle_tree, tkn_instruction.nonce); - - let compression_leaf = - CompressionLeaf::find_by_asset_id(conn, asset_id.to_string()) - .await? - .context("compression leaf not found")?; + }); - let collection_mint_id = compression_leaf.id; - let leaf_owner = compression_leaf.leaf_owner.clone(); - let mut compression_leaf: compression_leafs::ActiveModel = compression_leaf.into(); - - compression_leaf.leaf_owner = Set(new_leaf_owner.to_string()); - - CompressionLeaf::update(conn, compression_leaf).await?; - - self.producer - .send( - Some(&SolanaNftEvents { - event: Some(UpdateMintOwner(MintOwnershipUpdate { - mint_address: asset_id.to_string(), - sender: leaf_owner, - recipient: new_leaf_owner.to_string(), - tx_signature: Signature::new(sig.as_slice()).to_string(), - })), - }), - Some(&SolanaNftEventKey { - id: collection_mint_id.to_string(), - ..Default::default() - }), - ) - .await?; + let mpl_bubblegum_stream = tokio::spawn({ + async move { + self.connect(GeyserGrpcConnector::build_request(mpl_bubblegum::ID)) + .await } - } - - Ok(()) - } - - async fn process_spl_token_transaction( - &self, - program_account_index: usize, - keys: &[Vec], - sig: &Vec, - message: &Message, - ) -> Result<()> { - let conn = self.db.get(); - for ins in message.instructions.iter() { - let account_indices = ins.accounts.clone(); - let program_idx: usize = ins.program_id_index.try_into()?; - - if program_idx == program_account_index { - let data = ins.data.clone(); - let data = data.as_slice(); - let tkn_instruction = spl_token::instruction::TokenInstruction::unpack(data)?; - - let transfer_info = match tkn_instruction { - TokenInstruction::TransferChecked { amount, .. } => Some((amount, 2)), - TokenInstruction::Transfer { amount } => Some((amount, 1)), - _ => None, - }; - - if transfer_info.is_none() { - continue; - } - - if let Some((1, destination_ata_index)) = transfer_info { - let source_account_index = account_indices[0]; - let source_bytes: &[u8] = &keys[source_account_index as usize]; - let source = Pubkey::try_from(source_bytes)?; + }); - let collection_mint = - CollectionMint::find_by_ata(conn, source.to_string()).await?; - - if collection_mint.is_none() { - return Ok(()); - } - - let destination_account_index = account_indices[destination_ata_index]; - let destination_bytes: &[u8] = &keys[destination_account_index as usize]; - let destination = Pubkey::try_from(destination_bytes)?; - - let acct = fetch_account(&self.rpc, &destination).await?; - let destination_tkn_act = Account::unpack(&acct.data)?; - let new_owner = destination_tkn_act.owner.to_string(); - let mint = collection_mint.context("No mint found")?; - - CollectionMint::update_owner_and_ata( - conn, - &mint, - new_owner.clone(), - destination.to_string(), - ) - .await?; - - self.producer - .send( - Some(&SolanaNftEvents { - event: Some(UpdateMintOwner(MintOwnershipUpdate { - mint_address: destination_tkn_act.mint.to_string(), - sender: mint.owner.to_string(), - recipient: new_owner, - tx_signature: Signature::new(sig.as_slice()).to_string(), - })), - }), - Some(&SolanaNftEventKey { - id: mint.id.to_string(), - ..Default::default() - }), - ) - .await?; - } + tokio::select! { + Err(e) = spl_token_stream => { + bail!("spl token stream error: {:?}", e) + }, + Err(e) = mpl_bubblegum_stream => { + bail!("mpl bumblegum stream error: {:?}", e) } } - - Ok(()) } } - -async fn fetch_account( - rpc: &Arc, - address: &Pubkey, -) -> Result { - backoff::future::retry(ExponentialBackoff::default(), || async { - let acct = rpc.get_account(address)?; - - Ok(acct) - }) - .await -} diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs index 53d0373..5b66883 100644 --- a/indexer/src/lib.rs +++ b/indexer/src/lib.rs @@ -1,5 +1,6 @@ mod connector; mod handler; +mod processor; use clap::{arg, command}; pub use connector::GeyserGrpcConnector; pub use handler::MessageHandler; diff --git a/indexer/src/processor.rs b/indexer/src/processor.rs new file mode 100644 index 0000000..b6531dd --- /dev/null +++ b/indexer/src/processor.rs @@ -0,0 +1,262 @@ +use std::{convert::TryInto, sync::Arc}; + +use anchor_lang::AnchorDeserialize; +use backoff::ExponentialBackoff; +use dashmap::DashMap; +use holaplex_hub_nfts_solana_core::{ + db::Connection, + proto::{ + solana_nft_events::Event::UpdateMintOwner, MintOwnershipUpdate, SolanaNftEventKey, + SolanaNftEvents, + }, + sea_orm::Set, + CollectionMint, CompressionLeaf, +}; +use holaplex_hub_nfts_solana_entity::compression_leafs; +use hub_core::{prelude::*, producer::Producer, tokio::task}; +use mpl_bubblegum::utils::get_asset_id; +use solana_client::rpc_client::RpcClient; +use solana_program::program_pack::Pack; +use solana_sdk::{pubkey::Pubkey, signature::Signature}; +use spl_token::{instruction::TokenInstruction, state::Account}; +use yellowstone_grpc_proto::{ + prelude::{ + subscribe_update::UpdateOneof, Message, SubscribeUpdate, SubscribeUpdateTransaction, + }, + tonic::Status, +}; + +#[derive(Clone)] +pub struct Processor { + db: Connection, + rpc: Arc, + producer: Producer, + dashmap: DashMap>, +} + +impl Processor { + pub(crate) fn new( + db: Connection, + rpc: Arc, + producer: Producer, + ) -> Self { + Self { + db, + rpc, + producer, + dashmap: DashMap::new(), + } + } + + pub(crate) async fn process(&self, message: Result) -> Result<()> { + match message { + Ok(msg) => match msg.update_oneof { + Some(UpdateOneof::Transaction(tx)) => { + self.dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx); + }, + Some(UpdateOneof::Slot(slot)) => { + if let Some((_, transactions)) = self.dashmap.remove(&slot.slot) { + let handles: Vec<_> = transactions + .into_iter() + .map(|tx| task::spawn(self.clone().process_transaction(tx))) + .collect(); + + for handle in handles { + handle.await?? + } + } + }, + _ => {}, + }, + Err(error) => bail!("stream error: {:?}", error), + }; + + Ok(()) + } + + pub(crate) async fn process_transaction(self, tx: SubscribeUpdateTransaction) -> Result<()> { + let message = tx + .transaction + .clone() + .context("SubscribeTransactionInfo not found")? + .transaction + .context("Transaction not found")? + .message + .context("Message not found")?; + let sig = tx + .transaction + .as_ref() + .ok_or_else(|| anyhow!("failed to get transaction"))? + .signature + .clone(); + + let keys = message.clone().account_keys; + + for (idx, key) in message.clone().account_keys.iter().enumerate() { + let key: &[u8] = key; + let k = Pubkey::try_from(key)?; + if k == spl_token::ID { + self.process_spl_token_transaction(idx, &keys, &sig, &message) + .await?; + } else if k == mpl_bubblegum::ID { + self.process_mpl_bubblegum_transaction(idx, &keys, &sig, &message) + .await?; + } + } + + Ok(()) + } + + pub(crate) async fn process_mpl_bubblegum_transaction( + &self, + program_account_index: usize, + keys: &[Vec], + sig: &Vec, + message: &Message, + ) -> Result<()> { + for ins in message.instructions.iter() { + let account_indices = ins.accounts.clone(); + let program_idx: usize = ins.program_id_index.try_into()?; + + if program_idx == program_account_index { + let conn = self.db.get(); + let data = ins.data.clone(); + let data = data.as_slice(); + + let tkn_instruction = + mpl_bubblegum::instruction::Transfer::try_from_slice(&data[8..])?; + let new_leaf_owner_account_index = account_indices[3]; + let merkle_tree_account_index = account_indices[4]; + let new_leaf_owner_bytes: &[u8] = &keys[new_leaf_owner_account_index as usize]; + let merkle_tree_bytes: &[u8] = &keys[merkle_tree_account_index as usize]; + let new_leaf_owner = Pubkey::try_from(new_leaf_owner_bytes)?; + let merkle_tree = Pubkey::try_from(merkle_tree_bytes)?; + + let asset_id = get_asset_id(&merkle_tree, tkn_instruction.nonce); + + let compression_leaf = + CompressionLeaf::find_by_asset_id(conn, asset_id.to_string()) + .await? + .context("compression leaf not found")?; + + let collection_mint_id = compression_leaf.id; + let leaf_owner = compression_leaf.leaf_owner.clone(); + let mut compression_leaf: compression_leafs::ActiveModel = compression_leaf.into(); + + compression_leaf.leaf_owner = Set(new_leaf_owner.to_string()); + + CompressionLeaf::update(conn, compression_leaf).await?; + + self.producer + .send( + Some(&SolanaNftEvents { + event: Some(UpdateMintOwner(MintOwnershipUpdate { + mint_address: asset_id.to_string(), + sender: leaf_owner, + recipient: new_leaf_owner.to_string(), + tx_signature: Signature::new(sig.as_slice()).to_string(), + })), + }), + Some(&SolanaNftEventKey { + id: collection_mint_id.to_string(), + ..Default::default() + }), + ) + .await?; + } + } + + Ok(()) + } + + pub(crate) async fn process_spl_token_transaction( + &self, + program_account_index: usize, + keys: &[Vec], + sig: &Vec, + message: &Message, + ) -> Result<()> { + let conn = self.db.get(); + for ins in message.instructions.iter() { + let account_indices = ins.accounts.clone(); + let program_idx: usize = ins.program_id_index.try_into()?; + + if program_idx == program_account_index { + let data = ins.data.clone(); + let data = data.as_slice(); + let tkn_instruction = spl_token::instruction::TokenInstruction::unpack(data)?; + + let transfer_info = match tkn_instruction { + TokenInstruction::TransferChecked { amount, .. } => Some((amount, 2)), + TokenInstruction::Transfer { amount } => Some((amount, 1)), + _ => None, + }; + + if transfer_info.is_none() { + continue; + } + + if let Some((1, destination_ata_index)) = transfer_info { + let source_account_index = account_indices[0]; + let source_bytes: &[u8] = &keys[source_account_index as usize]; + let source = Pubkey::try_from(source_bytes)?; + + let collection_mint = + CollectionMint::find_by_ata(conn, source.to_string()).await?; + + if collection_mint.is_none() { + return Ok(()); + } + + let destination_account_index = account_indices[destination_ata_index]; + let destination_bytes: &[u8] = &keys[destination_account_index as usize]; + let destination = Pubkey::try_from(destination_bytes)?; + + let acct = fetch_account(&self.rpc, &destination).await?; + let destination_tkn_act = Account::unpack(&acct.data)?; + let new_owner = destination_tkn_act.owner.to_string(); + let mint = collection_mint.context("No mint found")?; + + CollectionMint::update_owner_and_ata( + conn, + &mint, + new_owner.clone(), + destination.to_string(), + ) + .await?; + + self.producer + .send( + Some(&SolanaNftEvents { + event: Some(UpdateMintOwner(MintOwnershipUpdate { + mint_address: destination_tkn_act.mint.to_string(), + sender: mint.owner.to_string(), + recipient: new_owner, + tx_signature: Signature::new(sig.as_slice()).to_string(), + })), + }), + Some(&SolanaNftEventKey { + id: mint.id.to_string(), + ..Default::default() + }), + ) + .await?; + } + } + } + + Ok(()) + } +} + +async fn fetch_account( + rpc: &Arc, + address: &Pubkey, +) -> Result { + backoff::future::retry(ExponentialBackoff::default(), || async { + let acct = rpc.get_account(address)?; + + Ok(acct) + }) + .await +}