Skip to content

Commit

Permalink
Merge pull request #58 from holaplex/abdul/separate-streams
Browse files Browse the repository at this point in the history
Separate streams for spl token and mpl bumblegum
  • Loading branch information
kespinola authored Sep 20, 2023
2 parents 19bda01 + 56b97c3 commit b44c21d
Show file tree
Hide file tree
Showing 4 changed files with 316 additions and 248 deletions.
11 changes: 4 additions & 7 deletions indexer/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{collections::HashMap, vec};

use futures::{channel::mpsc::SendError, Sink, Stream};
use hub_core::prelude::*;
use solana_program::pubkey::Pubkey;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::{prelude::*, tonic::Status};

Expand Down Expand Up @@ -29,7 +30,7 @@ impl GeyserGrpcConnector {
Ok((subscribe_tx, stream))
}

pub fn build_request() -> SubscribeRequest {
pub fn build_request(program_id: Pubkey) -> SubscribeRequest {
let mut slots = HashMap::new();
slots.insert("client".to_owned(), SubscribeRequestFilterSlots {});

Expand All @@ -38,20 +39,16 @@ impl GeyserGrpcConnector {
vote: Some(false),
failed: Some(false),
signature: None,
account_include: vec![spl_token::ID.to_string(), mpl_bubblegum::ID.to_string()],
account_include: vec![program_id.to_string()],
account_exclude: Vec::new(),
account_required: Vec::new(),
});

SubscribeRequest {
accounts: HashMap::new(),
slots,
transactions,
blocks: HashMap::new(),
blocks_meta: HashMap::new(),
commitment: Some(CommitmentLevel::Finalized as i32),
accounts_data_slice: vec![],
entry: HashMap::new(),
..Default::default()
}
}
}
290 changes: 49 additions & 241 deletions indexer/src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,23 @@
use std::{convert::TryInto, sync::Arc};
use std::sync::Arc;

use anchor_lang::AnchorDeserialize;
use backoff::ExponentialBackoff;
use dashmap::DashMap;
use futures::{sink::SinkExt, stream::StreamExt};
use holaplex_hub_nfts_solana_core::{
db::Connection,
proto::{
solana_nft_events::Event::UpdateMintOwner, MintOwnershipUpdate, SolanaNftEventKey,
SolanaNftEvents,
},
sea_orm::Set,
CollectionMint, CompressionLeaf,
use holaplex_hub_nfts_solana_core::{db::Connection, proto::SolanaNftEvents};
use hub_core::{
backon::{ExponentialBuilder, Retryable},
prelude::*,
producer::Producer,
tokio,
};
use holaplex_hub_nfts_solana_entity::compression_leafs;
use hub_core::{prelude::*, producer::Producer, tokio::task};
use mpl_bubblegum::utils::get_asset_id;
use solana_client::rpc_client::RpcClient;
use solana_program::program_pack::Pack;
use solana_sdk::{pubkey::Pubkey, signature::Signature};
use spl_token::{instruction::TokenInstruction, state::Account};
use yellowstone_grpc_client::GeyserGrpcClientError;
use yellowstone_grpc_proto::{
prelude::{
subscribe_update::UpdateOneof, Message, SubscribeUpdate, SubscribeUpdateTransaction,
},
tonic::Status,
};
use yellowstone_grpc_proto::prelude::SubscribeRequest;

use crate::{Args, GeyserGrpcConnector};
use crate::{processor::Processor, Args, GeyserGrpcConnector};

#[derive(Clone)]
pub struct MessageHandler {
db: Connection,
dashmap: DashMap<u64, Vec<SubscribeUpdateTransaction>>,
rpc: Arc<RpcClient>,
connector: GeyserGrpcConnector,
producer: Producer<SolanaNftEvents>,
processor: Processor,
}

impl MessageHandler {
Expand All @@ -51,241 +32,68 @@ impl MessageHandler {
let db = Connection::new(db)
.await
.context("failed to get database connection")?;
let dashmap: DashMap<u64, Vec<SubscribeUpdateTransaction>> = DashMap::new();

let rpc = Arc::new(RpcClient::new(solana_endpoint));
let connector = GeyserGrpcConnector::new(dragon_mouth_endpoint, dragon_mouth_x_token);

let processor = Processor::new(db, rpc, producer);

Ok(Self {
db,
dashmap,
rpc,
connector,
producer,
processor,
})
}

pub async fn run(&self) -> Result<()> {
let request = GeyserGrpcConnector::build_request();
let (mut subscribe_tx, mut stream) = self.connector.subscribe().await?;

loop {
let request = request.clone();
async fn connect(&self, request: SubscribeRequest) -> Result<()> {
(|| async {
let (mut subscribe_tx, mut stream) = self.connector.subscribe().await?;

subscribe_tx
.send(request)
.send(request.clone())
.await
.map_err(GeyserGrpcClientError::SubscribeSendError)?;

while let Some(message) = stream.next().await {
self.handle_message(message).await?;
self.processor.process(message).await?;
}
}
}

async fn handle_message(&self, message: Result<SubscribeUpdate, Status>) -> Result<()> {
match message {
Ok(msg) => match msg.update_oneof {
Some(UpdateOneof::Transaction(tx)) => {
self.dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx);
},
Some(UpdateOneof::Slot(slot)) => {
if let Some((_, transactions)) = self.dashmap.remove(&slot.slot) {
for tx in transactions {
task::spawn(self.clone().process_transaction(tx));
}
}
},
_ => {},
},
Err(error) => return Err(anyhow!("stream error: {:?}", error)),
};

Ok(())
Ok(())
})
.retry(
&ExponentialBuilder::default()
.with_max_times(10)
.with_jitter(),
)
.notify(|err: &Error, dur: Duration| {
error!("stream error: {:?} retrying in {:?}", err, dur);
})
.await
}

async fn process_transaction(self, tx: SubscribeUpdateTransaction) -> Result<()> {
let message = tx
.transaction
.clone()
.context("SubscribeTransactionInfo not found")?
.transaction
.context("Transaction not found")?
.message
.context("Message not found")?;
let sig = tx
.transaction
.as_ref()
.ok_or_else(|| anyhow!("failed to get transaction"))?
.signature
.clone();

let keys = message.clone().account_keys;

for (idx, key) in message.clone().account_keys.iter().enumerate() {
let key: &[u8] = key;
let k = Pubkey::try_from(key)?;
if k == spl_token::ID {
self.process_spl_token_transaction(idx, &keys, &sig, &message)
.await?;
} else if k == mpl_bubblegum::ID {
self.process_mpl_bubblegum_transaction(idx, &keys, &sig, &message)
.await?;
pub async fn run(self) -> Result<()> {
let spl_token_stream = tokio::spawn({
let handler = self.clone();
async move {
handler
.connect(GeyserGrpcConnector::build_request(spl_token::ID))
.await
}
}

Ok(())
}

async fn process_mpl_bubblegum_transaction(
&self,
program_account_index: usize,
keys: &[Vec<u8>],
sig: &Vec<u8>,
message: &Message,
) -> Result<()> {
for ins in message.instructions.iter() {
let account_indices = ins.accounts.clone();
let program_idx: usize = ins.program_id_index.try_into()?;

if program_idx == program_account_index {
let conn = self.db.get();
let data = ins.data.clone();
let data = data.as_slice();

let tkn_instruction =
mpl_bubblegum::instruction::Transfer::try_from_slice(&data[8..])?;
let new_leaf_owner_account_index = account_indices[3];
let merkle_tree_account_index = account_indices[4];
let new_leaf_owner_bytes: &[u8] = &keys[new_leaf_owner_account_index as usize];
let merkle_tree_bytes: &[u8] = &keys[merkle_tree_account_index as usize];
let new_leaf_owner = Pubkey::try_from(new_leaf_owner_bytes)?;
let merkle_tree = Pubkey::try_from(merkle_tree_bytes)?;

let asset_id = get_asset_id(&merkle_tree, tkn_instruction.nonce);

let compression_leaf =
CompressionLeaf::find_by_asset_id(conn, asset_id.to_string())
.await?
.context("compression leaf not found")?;
});

let collection_mint_id = compression_leaf.id;
let leaf_owner = compression_leaf.leaf_owner.clone();
let mut compression_leaf: compression_leafs::ActiveModel = compression_leaf.into();

compression_leaf.leaf_owner = Set(new_leaf_owner.to_string());

CompressionLeaf::update(conn, compression_leaf).await?;

self.producer
.send(
Some(&SolanaNftEvents {
event: Some(UpdateMintOwner(MintOwnershipUpdate {
mint_address: asset_id.to_string(),
sender: leaf_owner,
recipient: new_leaf_owner.to_string(),
tx_signature: Signature::new(sig.as_slice()).to_string(),
})),
}),
Some(&SolanaNftEventKey {
id: collection_mint_id.to_string(),
..Default::default()
}),
)
.await?;
let mpl_bubblegum_stream = tokio::spawn({
async move {
self.connect(GeyserGrpcConnector::build_request(mpl_bubblegum::ID))
.await
}
}

Ok(())
}

async fn process_spl_token_transaction(
&self,
program_account_index: usize,
keys: &[Vec<u8>],
sig: &Vec<u8>,
message: &Message,
) -> Result<()> {
let conn = self.db.get();
for ins in message.instructions.iter() {
let account_indices = ins.accounts.clone();
let program_idx: usize = ins.program_id_index.try_into()?;

if program_idx == program_account_index {
let data = ins.data.clone();
let data = data.as_slice();
let tkn_instruction = spl_token::instruction::TokenInstruction::unpack(data)?;

let transfer_info = match tkn_instruction {
TokenInstruction::TransferChecked { amount, .. } => Some((amount, 2)),
TokenInstruction::Transfer { amount } => Some((amount, 1)),
_ => None,
};

if transfer_info.is_none() {
continue;
}

if let Some((1, destination_ata_index)) = transfer_info {
let source_account_index = account_indices[0];
let source_bytes: &[u8] = &keys[source_account_index as usize];
let source = Pubkey::try_from(source_bytes)?;
});

let collection_mint =
CollectionMint::find_by_ata(conn, source.to_string()).await?;

if collection_mint.is_none() {
return Ok(());
}

let destination_account_index = account_indices[destination_ata_index];
let destination_bytes: &[u8] = &keys[destination_account_index as usize];
let destination = Pubkey::try_from(destination_bytes)?;

let acct = fetch_account(&self.rpc, &destination).await?;
let destination_tkn_act = Account::unpack(&acct.data)?;
let new_owner = destination_tkn_act.owner.to_string();
let mint = collection_mint.context("No mint found")?;

CollectionMint::update_owner_and_ata(
conn,
&mint,
new_owner.clone(),
destination.to_string(),
)
.await?;

self.producer
.send(
Some(&SolanaNftEvents {
event: Some(UpdateMintOwner(MintOwnershipUpdate {
mint_address: destination_tkn_act.mint.to_string(),
sender: mint.owner.to_string(),
recipient: new_owner,
tx_signature: Signature::new(sig.as_slice()).to_string(),
})),
}),
Some(&SolanaNftEventKey {
id: mint.id.to_string(),
..Default::default()
}),
)
.await?;
}
tokio::select! {
Err(e) = spl_token_stream => {
bail!("spl token stream error: {:?}", e)
},
Err(e) = mpl_bubblegum_stream => {
bail!("mpl bumblegum stream error: {:?}", e)
}
}

Ok(())
}
}

async fn fetch_account(
rpc: &Arc<RpcClient>,
address: &Pubkey,
) -> Result<solana_sdk::account::Account, solana_client::client_error::ClientError> {
backoff::future::retry(ExponentialBackoff::default(), || async {
let acct = rpc.get_account(address)?;

Ok(acct)
})
.await
}
1 change: 1 addition & 0 deletions indexer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod connector;
mod handler;
mod processor;
use clap::{arg, command};
pub use connector::GeyserGrpcConnector;
pub use handler::MessageHandler;
Expand Down
Loading

0 comments on commit b44c21d

Please sign in to comment.