diff --git a/consumer/src/events.rs b/consumer/src/events.rs index 2205044..ad9c092 100644 --- a/consumer/src/events.rs +++ b/consumer/src/events.rs @@ -4,33 +4,27 @@ use holaplex_hub_nfts_solana_core::{ nft_events::Event as NftEvent, solana_nft_events::Event as SolanaNftEvent, treasury_events::{Event as TreasuryEvent, SolanaTransactionResult, TransactionStatus}, - Attribute, CollectionImport, File, Metadata, MetaplexMasterEditionTransaction, - MintMetaplexEditionTransaction, MintMetaplexMetadataTransaction, SolanaCollectionPayload, - SolanaCompletedMintTransaction, SolanaCompletedTransferTransaction, - SolanaCompletedUpdateTransaction, SolanaCreator, SolanaFailedTransaction, - SolanaMintPayload, SolanaNftEventKey, SolanaNftEvents, SolanaPendingTransaction, + MetaplexMasterEditionTransaction, MintMetaplexEditionTransaction, + MintMetaplexMetadataTransaction, SolanaCompletedMintTransaction, + SolanaCompletedTransferTransaction, SolanaCompletedUpdateTransaction, + SolanaFailedTransaction, SolanaNftEventKey, SolanaNftEvents, SolanaPendingTransaction, SolanaTransactionFailureReason, TransferMetaplexAssetTransaction, }, - sea_orm::{DbErr, EntityTrait, ModelTrait, Set}, + sea_orm::{DbErr, Set}, Collection, CollectionMint, CompressionLeaf, Services, }; -use holaplex_hub_nfts_solana_entity::{ - collection_mints, collections, compression_leafs, prelude::CollectionMints, -}; +use holaplex_hub_nfts_solana_entity::{collection_mints, collections, compression_leafs}; use hub_core::{ chrono::Utc, prelude::*, producer::{Producer, SendError}, - thiserror, tokio, + thiserror, util::DebugShim, uuid, uuid::Uuid, }; -use mpl_token_metadata::pda::{find_master_edition_account, find_metadata_account}; -use spl_associated_token_account::get_associated_token_address; use crate::{ - asset_api::{self, Asset, RpcClient}, backend::{ CollectionBackend, MasterEditionAddresses, MintBackend, MintEditionAddresses, MintMetaplexAddresses, TransferBackend, @@ -38,8 +32,6 @@ use crate::{ solana::{CompressedRef, EditionRef, Solana, UncompressedRef}, }; -const MAX_LIMIT: u64 = 1000; - #[derive(Debug, thiserror::Error)] pub enum ProcessorErrorKind { #[error("Associated record not found in database")] @@ -55,10 +47,6 @@ pub enum ProcessorErrorKind { InvalidUuid(#[from] uuid::Error), #[error("Database error")] DbError(#[from] DbErr), - #[error("Asset api error")] - AssetApi(#[from] jsonrpsee::core::Error), - #[error("Array Index not found")] - IndexNotFound, } #[derive(Debug, thiserror::Error)] @@ -86,7 +74,6 @@ enum ErrorSource { TreasuryStatus, TreasurySuccess, TreasuryFailure, - ImportCollectionFailure, } impl ErrorSource { @@ -97,7 +84,6 @@ impl ErrorSource { Self::TreasuryStatus => "treasury status check", Self::TreasurySuccess => "treasury success response", Self::TreasuryFailure => "treasury success failure", - Self::ImportCollectionFailure => "collection import failure", } } } @@ -115,7 +101,6 @@ pub enum EventKind { UpdateCollection, MintToCollection, RetryMintToCollection, - ImportCollection, } impl EventKind { @@ -132,32 +117,26 @@ impl EventKind { Self::UpdateCollection => "collection update", Self::MintToCollection => "mint to collection", Self::RetryMintToCollection => "mint to collection retry", - Self::ImportCollection => "import collection", } } - fn into_sign_request(self, tx: SolanaPendingTransaction) -> Result { + fn into_sign_request(self, tx: SolanaPendingTransaction) -> SolanaNftEvent { match self { - EventKind::CreateDrop => Ok(SolanaNftEvent::CreateDropSigningRequested(tx)), - EventKind::MintDrop => Ok(SolanaNftEvent::MintDropSigningRequested(tx)), - EventKind::UpdateDrop => Ok(SolanaNftEvent::UpdateDropSigningRequested(tx)), - EventKind::TransferAsset => Ok(SolanaNftEvent::TransferAssetSigningRequested(tx)), - EventKind::RetryCreateDrop => Ok(SolanaNftEvent::RetryCreateDropSigningRequested(tx)), - EventKind::RetryMintDrop => Ok(SolanaNftEvent::RetryMintDropSigningRequested(tx)), - EventKind::CreateCollection => Ok(SolanaNftEvent::CreateCollectionSigningRequested(tx)), - EventKind::UpdateCollection => Ok(SolanaNftEvent::UpdateCollectionSigningRequested(tx)), + EventKind::CreateDrop => SolanaNftEvent::CreateDropSigningRequested(tx), + EventKind::MintDrop => SolanaNftEvent::MintDropSigningRequested(tx), + EventKind::UpdateDrop => SolanaNftEvent::UpdateDropSigningRequested(tx), + EventKind::TransferAsset => SolanaNftEvent::TransferAssetSigningRequested(tx), + EventKind::RetryCreateDrop => SolanaNftEvent::RetryCreateDropSigningRequested(tx), + EventKind::RetryMintDrop => SolanaNftEvent::RetryMintDropSigningRequested(tx), + EventKind::CreateCollection => SolanaNftEvent::CreateCollectionSigningRequested(tx), + EventKind::UpdateCollection => SolanaNftEvent::UpdateCollectionSigningRequested(tx), EventKind::RetryCreateCollection => { - Ok(SolanaNftEvent::RetryCreateCollectionSigningRequested(tx)) + SolanaNftEvent::RetryCreateCollectionSigningRequested(tx) }, - EventKind::MintToCollection => Ok(SolanaNftEvent::MintToCollectionSigningRequested(tx)), + EventKind::MintToCollection => SolanaNftEvent::MintToCollectionSigningRequested(tx), EventKind::RetryMintToCollection => { - Ok(SolanaNftEvent::RetryMintToCollectionSigningRequested(tx)) + SolanaNftEvent::RetryMintToCollectionSigningRequested(tx) }, - EventKind::ImportCollection => Err(ProcessorError::new( - ProcessorErrorKind::Solana(anyhow!("Invalid Operation")), - EventKind::ImportCollection, - ErrorSource::ImportCollectionFailure, - )), } } @@ -169,19 +148,17 @@ impl EventKind { ) -> ProcessResult { let id = || Uuid::parse_str(&key.id); - match self { + Ok(match self { Self::CreateDrop => { let id = id()?; let collection = Collection::find_by_id(db, id) .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; - Ok(SolanaNftEvent::CreateDropSubmitted( - SolanaCompletedMintTransaction { - signature, - address: collection.mint, - }, - )) + SolanaNftEvent::CreateDropSubmitted(SolanaCompletedMintTransaction { + signature, + address: collection.mint, + }) }, Self::CreateCollection => { let id = id()?; @@ -190,12 +167,10 @@ impl EventKind { .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; - Ok(SolanaNftEvent::CreateCollectionSubmitted( - SolanaCompletedMintTransaction { - signature, - address: collection.mint, - }, - )) + SolanaNftEvent::CreateCollectionSubmitted(SolanaCompletedMintTransaction { + signature, + address: collection.mint, + }) }, Self::RetryCreateCollection => { let id = id()?; @@ -204,12 +179,15 @@ impl EventKind { .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; - Ok(SolanaNftEvent::RetryCreateCollectionSubmitted( - SolanaCompletedMintTransaction { - signature, - address: collection.mint, - }, - )) + SolanaNftEvent::RetryCreateCollectionSubmitted(SolanaCompletedMintTransaction { + signature, + address: collection.mint, + }) + }, + Self::UpdateCollection => { + SolanaNftEvent::UpdateCollectionSubmitted(SolanaCompletedUpdateTransaction { + signature, + }) }, Self::MintToCollection => { let id = id()?; @@ -217,47 +195,40 @@ impl EventKind { .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; - Ok(SolanaNftEvent::MintToCollectionSubmitted( - SolanaCompletedMintTransaction { - signature, - address: collection_mint.mint.to_string(), - }, - )) + SolanaNftEvent::MintToCollectionSubmitted(SolanaCompletedMintTransaction { + signature, + address: collection_mint.mint.to_string(), + }) }, - Self::UpdateCollection => Ok(SolanaNftEvent::UpdateCollectionSubmitted( - SolanaCompletedUpdateTransaction { signature }, - )), Self::MintDrop => { let id = id()?; let collection_mint = CollectionMint::find_by_id(db, id) .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; - Ok(SolanaNftEvent::MintDropSubmitted( - SolanaCompletedMintTransaction { - signature, - address: collection_mint.mint, - }, - )) + SolanaNftEvent::MintDropSubmitted(SolanaCompletedMintTransaction { + signature, + address: collection_mint.mint, + }) + }, + Self::UpdateDrop => { + SolanaNftEvent::UpdateDropSubmitted(SolanaCompletedUpdateTransaction { signature }) + }, + Self::TransferAsset => { + SolanaNftEvent::TransferAssetSubmitted(SolanaCompletedTransferTransaction { + signature, + }) }, - Self::UpdateDrop => Ok(SolanaNftEvent::UpdateDropSubmitted( - SolanaCompletedUpdateTransaction { signature }, - )), - Self::TransferAsset => Ok(SolanaNftEvent::TransferAssetSubmitted( - SolanaCompletedTransferTransaction { signature }, - )), Self::RetryCreateDrop => { let id = id()?; let collection = Collection::find_by_id(db, id) .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; - Ok(SolanaNftEvent::RetryCreateDropSubmitted( - SolanaCompletedMintTransaction { - signature, - address: collection.mint, - }, - )) + SolanaNftEvent::RetryCreateDropSubmitted(SolanaCompletedMintTransaction { + signature, + address: collection.mint, + }) }, Self::RetryMintDrop => { let id = id()?; @@ -265,12 +236,10 @@ impl EventKind { .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; - Ok(SolanaNftEvent::RetryMintDropSubmitted( - SolanaCompletedMintTransaction { - signature, - address: collection_mint.mint, - }, - )) + SolanaNftEvent::RetryMintDropSubmitted(SolanaCompletedMintTransaction { + signature, + address: collection_mint.mint, + }) }, Self::RetryMintToCollection => { let id = id()?; @@ -278,32 +247,27 @@ impl EventKind { .await? .ok_or(ProcessorErrorKind::RecordNotFound)?; - Ok(SolanaNftEvent::RetryMintToCollectionSubmitted( - SolanaCompletedMintTransaction { - signature, - address: collection_mint.mint, - }, - )) + SolanaNftEvent::RetryMintToCollectionSubmitted(SolanaCompletedMintTransaction { + signature, + address: collection_mint.mint, + }) }, - - Self::ImportCollection => Err(ProcessorErrorKind::Solana(anyhow!("Invalid Operation"))), - } + }) } - fn into_failure(self, tx: SolanaFailedTransaction) -> ProcessResult { + fn into_failure(self, tx: SolanaFailedTransaction) -> SolanaNftEvent { match self { - Self::CreateDrop => Ok(SolanaNftEvent::CreateDropFailed(tx)), - Self::MintDrop => Ok(SolanaNftEvent::MintDropFailed(tx)), - Self::UpdateDrop => Ok(SolanaNftEvent::UpdateDropFailed(tx)), - Self::TransferAsset => Ok(SolanaNftEvent::TransferAssetFailed(tx)), - Self::RetryCreateDrop => Ok(SolanaNftEvent::RetryCreateDropFailed(tx)), - Self::RetryMintDrop => Ok(SolanaNftEvent::RetryMintDropFailed(tx)), - Self::CreateCollection => Ok(SolanaNftEvent::CreateCollectionFailed(tx)), - Self::RetryCreateCollection => Ok(SolanaNftEvent::RetryCreateCollectionFailed(tx)), - Self::UpdateCollection => Ok(SolanaNftEvent::UpdateCollectionFailed(tx)), - Self::MintToCollection => Ok(SolanaNftEvent::MintToCollectionFailed(tx)), - Self::RetryMintToCollection => Ok(SolanaNftEvent::RetryMintToCollectionFailed(tx)), - Self::ImportCollection => Err(ProcessorErrorKind::Solana(anyhow!("Invalid Operation"))), + Self::CreateDrop => SolanaNftEvent::CreateDropFailed(tx), + Self::MintDrop => SolanaNftEvent::MintDropFailed(tx), + Self::UpdateDrop => SolanaNftEvent::UpdateDropFailed(tx), + Self::TransferAsset => SolanaNftEvent::TransferAssetFailed(tx), + Self::RetryCreateDrop => SolanaNftEvent::RetryCreateDropFailed(tx), + Self::RetryMintDrop => SolanaNftEvent::RetryMintDropFailed(tx), + Self::CreateCollection => SolanaNftEvent::CreateCollectionFailed(tx), + Self::RetryCreateCollection => SolanaNftEvent::RetryCreateCollectionFailed(tx), + Self::UpdateCollection => SolanaNftEvent::UpdateCollectionFailed(tx), + Self::MintToCollection => SolanaNftEvent::MintToCollectionFailed(tx), + Self::RetryMintToCollection => SolanaNftEvent::RetryMintToCollectionFailed(tx), } } } @@ -437,15 +401,6 @@ impl Processor { ) .await }, - Some(NftEvent::StartedImportingSolanaCollection(payload)) => { - self.import_collection(key, payload).await.map_err(|k| { - ProcessorError::new( - k, - EventKind::ImportCollection, - ErrorSource::ImportCollectionFailure, - ) - }) - }, _ => Ok(()), } }, @@ -511,7 +466,7 @@ impl Processor { .producer .send( Some(&SolanaNftEvents { - event: Some(kind.into_sign_request(tx)?), + event: Some(kind.into_sign_request(tx)), }), Some(key), ) @@ -595,7 +550,7 @@ impl Processor { Some(&SolanaNftEvents { event: Some(kind.into_failure(SolanaFailedTransaction { reason: reason as i32, - })?), + })), }), Some(key), ) @@ -623,18 +578,18 @@ impl Processor { } = tx.addresses; let id = key.id.parse()?; - let collection = collections::ActiveModel { - id: Set(id), - master_edition: Set(master_edition.to_string()), - owner: Set(owner.to_string()), - metadata: Set(metadata.to_string()), - associated_token_account: Set(associated_token_account.to_string()), - mint: Set(mint.to_string()), - update_authority: Set(update_authority.to_string()), - ..Default::default() + let collection = collections::Model { + id, + master_edition: master_edition.to_string(), + owner: owner.to_string(), + metadata: metadata.to_string(), + associated_token_account: associated_token_account.to_string(), + mint: mint.to_string(), + update_authority: update_authority.to_string(), + created_at: Utc::now().naive_utc(), }; - Collection::create(&self.db, collection).await?; + Collection::create(&self.db, collection.into()).await?; Ok(tx.into()) } @@ -876,294 +831,4 @@ impl Processor { Ok(tx.into()) } - - async fn import_collection( - &self, - SolanaNftEventKey { - user_id, - project_id, - .. - }: SolanaNftEventKey, - CollectionImport { mint_address }: CollectionImport, - ) -> ProcessResult<()> { - let rpc = &self.solana.0.asset_rpc(); - let db = &self.db; - let producer = &self.producer; - let mut page = 1; - - let collection = rpc - .get_asset(&mint_address) - .await - .map_err(ProcessorErrorKind::AssetApi)?; - - let collection_model = Collection::find_by_mint(db, collection.id.to_string()).await?; - if let Some(collection_model) = collection_model { - info!( - "Deleting already indexed collection: {:?}", - collection_model.id - ); - - collection_model.delete(db.get()).await?; - } - - info!("Importing collection: {:?}", collection.id.to_string()); - - let collection_model = index_collection( - project_id.clone(), - user_id.clone(), - collection, - db, - producer, - ) - .await?; - - loop { - let result = rpc - .search_assets(vec!["collection", &mint_address], page) - .await - .map_err(ProcessorErrorKind::AssetApi)?; - - let mut mints: Vec = Vec::new(); - - for asset in result.items { - let project_id = project_id.clone(); - let user_id = user_id.clone(); - - // Check whether NFT is burned - if asset.ownership.owner.0.is_empty() { - continue; - } - - info!("Importing mint: {:?}", asset.id.to_string()); - let model = emit_index_collection_mint_event( - project_id, - user_id, - collection_model.id, - asset, - producer.clone(), - ) - .await?; - - mints.push(model.into()); - } - - CollectionMints::insert_many(mints) - .exec(self.db.get()) - .await?; - - if result.total < MAX_LIMIT { - break; - } - page += 1; - } - - Ok(()) - } -} - -async fn index_collection( - project_id: String, - user_id: String, - collection: Asset, - db: &db::Connection, - producer: &Producer, -) -> ProcessResult { - let owner = collection.ownership.owner.into(); - let mint = collection.id.into(); - let seller_fee_basis_points = collection.royalty.basis_points; - let metadata = collection.content.metadata; - let files: Vec = collection - .content - .files - .map(|fs| fs.iter().map(Into::into).collect()) - .unwrap_or_default(); - - let image = files - .iter() - .find(|f| f.mime.as_ref().map_or(false, |m| m.contains("image"))) - .map(|f| f.uri.clone()) - .unwrap_or_default(); - - let attributes = metadata - .attributes - .clone() - .map(|attributes| attributes.iter().map(Into::into).collect::>()) - .unwrap_or_default(); - - let creators = collection - .creators - .iter() - .map(|c| SolanaCreator { - address: c.address.to_string(), - verified: c.verified, - share: c.share, - }) - .collect::>(); - // Collection Model fields - let update_authority = &collection - .authorities - .get(0) - .ok_or(ProcessorErrorKind::IndexNotFound)? - .address; - - let ata = get_associated_token_address(&owner, &mint); - let (metadata_pubkey, _) = find_metadata_account(&mint); - - let (master_edition, _) = find_master_edition_account(&mint); - let collection_model = Collection::create(db, collections::ActiveModel { - master_edition: Set(master_edition.to_string()), - update_authority: Set(update_authority.to_string()), - associated_token_account: Set(ata.to_string()), - owner: Set(owner.to_string()), - mint: Set(mint.to_string()), - metadata: Set(metadata_pubkey.to_string()), - ..Default::default() - }) - .await?; - - producer - .send( - Some(&SolanaNftEvents { - event: Some(SolanaNftEvent::ImportedExternalCollection( - SolanaCollectionPayload { - supply: collection.supply.map(|s| s.print_max_supply), - mint_address: mint.to_string(), - seller_fee_basis_points, - creators, - metadata: Some(Metadata { - name: metadata.name, - description: metadata.description, - symbol: metadata.symbol, - attributes, - uri: collection.content.json_uri, - image, - }), - files, - update_authority: update_authority.to_string(), - }, - )), - }), - Some(&SolanaNftEventKey { - id: collection_model.id.to_string(), - project_id, - user_id, - }), - ) - .await - .map_err(ProcessorErrorKind::SendError)?; - - Ok(collection_model) -} - -async fn emit_index_collection_mint_event( - project_id: String, - user_id: String, - collection: Uuid, - asset: Asset, - producer: Producer, -) -> ProcessResult { - let owner = asset.ownership.owner.into(); - - let mint = asset.id.into(); - let ata = get_associated_token_address(&owner, &mint); - let seller_fee_basis_points = asset.royalty.basis_points; - let metadata = asset.content.metadata; - let update_authority = asset - .authorities - .get(0) - .ok_or(ProcessorErrorKind::IndexNotFound)? - .address - .clone(); - - let files = asset - .content - .files - .map(|fs| fs.iter().map(Into::into).collect::>()) - .unwrap_or_default(); - - let image = files - .iter() - .find(|f| f.mime.as_ref().map_or(false, |m| m.contains("image"))) - .map(|f| f.uri.clone()) - .unwrap_or_default(); - - let attributes = metadata - .attributes - .map(|attributes| attributes.iter().map(Into::into).collect()) - .unwrap_or_default(); - - let creators = asset - .creators - .iter() - .map(|c| SolanaCreator { - address: c.address.to_string(), - verified: c.verified, - share: c.share, - }) - .collect::>(); - - #[allow(clippy::cast_sign_loss)] - let uuid = Uuid::from_u64_pair(Utc::now().timestamp_nanos() as u64, rand::random::()); - - let mint_model = collection_mints::Model { - id: uuid, - collection_id: collection, - mint: mint.to_string(), - owner: owner.to_string(), - associated_token_account: ata.to_string(), - created_at: Utc::now().naive_utc(), - }; - - tokio::spawn(async move { - producer - .send( - Some(&SolanaNftEvents { - event: Some(SolanaNftEvent::ImportedExternalMint(SolanaMintPayload { - collection_id: collection.to_string(), - mint_address: mint.to_string(), - owner: owner.to_string(), - seller_fee_basis_points, - compressed: asset.compression.compressed, - creators, - metadata: Some(Metadata { - name: metadata.name, - description: metadata.description, - symbol: metadata.symbol, - attributes, - uri: asset.content.json_uri, - image, - }), - files, - update_authority: update_authority.to_string(), - })), - }), - Some(&SolanaNftEventKey { - id: uuid.to_string(), - user_id, - project_id, - }), - ) - .await - .map_err(ProcessorErrorKind::SendError) - }); - - Ok(mint_model) -} - -impl From<&asset_api::File> for File { - fn from(file: &asset_api::File) -> Self { - Self { - uri: file.uri.clone(), - mime: file.mime.clone(), - } - } -} - -impl From<&asset_api::Attribute> for Attribute { - fn from(attr: &asset_api::Attribute) -> Self { - Self { - value: attr.value.to_string(), - trait_type: attr.trait_type.to_string(), - } - } } diff --git a/consumer/src/import.rs b/consumer/src/import.rs new file mode 100644 index 0000000..5a77dc3 --- /dev/null +++ b/consumer/src/import.rs @@ -0,0 +1,318 @@ +use holaplex_hub_nfts_solana_core::{ + db, + proto::{ + solana_nft_events::Event as SolanaNftEvent, Attribute, CollectionImport, File, Metadata, + SolanaCollectionPayload, SolanaCreator, SolanaMintPayload, SolanaNftEventKey, + SolanaNftEvents, + }, + sea_orm::{EntityTrait, ModelTrait, Set}, + Collection, +}; +use holaplex_hub_nfts_solana_entity::{collection_mints, collections, prelude::CollectionMints}; +use hub_core::{chrono::Utc, prelude::*, producer::Producer, tokio, util::DebugShim, uuid::Uuid}; +use mpl_token_metadata::pda::{find_master_edition_account, find_metadata_account}; +use spl_associated_token_account::get_associated_token_address; + +use crate::{ + asset_api::{self, Asset, RpcClient}, + solana::Solana, +}; + +const MAX_LIMIT: u64 = 1000; + +pub struct CollectionImporter { + db: db::Connection, + producer: Producer, + solana: DebugShim, +} + +impl CollectionImporter { + pub fn new( + db: db::Connection, + producer: Producer, + solana: DebugShim, + ) -> Self { + Self { + db, + producer, + solana, + } + } + + pub async fn process( + &self, + SolanaNftEventKey { + user_id, + project_id, + .. + }: SolanaNftEventKey, + CollectionImport { mint_address }: CollectionImport, + ) -> Result<()> { + let rpc = &self.solana.0.asset_rpc(); + let db = &self.db; + let producer = &self.producer; + let mut page = 1; + + let collection = rpc.get_asset(&mint_address).await?; + + let collection_model = Collection::find_by_mint(db, collection.id.to_string()).await?; + if let Some(collection_model) = collection_model { + info!( + "Deleting already indexed collection: {:?}", + collection_model.id + ); + + collection_model.delete(db.get()).await?; + } + + info!("Importing collection: {:?}", collection.id.to_string()); + + let collection_model = self + .index_collection(project_id.clone(), user_id.clone(), collection) + .await?; + + loop { + let result = rpc + .search_assets(vec!["collection", &mint_address], page) + .await?; + + let mut mints: Vec = Vec::new(); + + for asset in result.items { + let project_id = project_id.clone(); + let user_id = user_id.clone(); + + // Check whether NFT is burned + if asset.ownership.owner.0.is_empty() { + continue; + } + + info!("Importing mint: {:?}", asset.id.to_string()); + let model = self + .collection_mint_event(project_id, user_id, collection_model.id, asset) + .await?; + + mints.push(model.into()); + } + + CollectionMints::insert_many(mints) + .exec(self.db.get()) + .await?; + + if result.total < MAX_LIMIT { + break; + } + page += 1; + } + + Ok(()) + } + + async fn index_collection( + &self, + project_id: String, + user_id: String, + collection: Asset, + ) -> Result { + let db = &self.db; + let producer = &self.producer; + let owner = collection.ownership.owner.into(); + let mint = collection.id.into(); + let seller_fee_basis_points = collection.royalty.basis_points; + let metadata = collection.content.metadata; + let files: Vec = collection + .content + .files + .map(|fs| fs.iter().map(Into::into).collect()) + .unwrap_or_default(); + + let image = files + .iter() + .find(|f| f.mime.as_ref().map_or(false, |m| m.contains("image"))) + .map(|f| f.uri.clone()) + .unwrap_or_default(); + + let attributes = metadata + .attributes + .clone() + .map(|attributes| attributes.iter().map(Into::into).collect::>()) + .unwrap_or_default(); + + let creators = collection + .creators + .iter() + .map(|c| SolanaCreator { + address: c.address.to_string(), + verified: c.verified, + share: c.share, + }) + .collect::>(); + // Collection Model fields + let update_authority = &collection + .authorities + .get(0) + .context("Invalid index")? + .address; + + let ata = get_associated_token_address(&owner, &mint); + let (metadata_pubkey, _) = find_metadata_account(&mint); + + let (master_edition, _) = find_master_edition_account(&mint); + let collection_model = Collection::create(db, collections::ActiveModel { + master_edition: Set(master_edition.to_string()), + update_authority: Set(update_authority.to_string()), + associated_token_account: Set(ata.to_string()), + owner: Set(owner.to_string()), + mint: Set(mint.to_string()), + metadata: Set(metadata_pubkey.to_string()), + ..Default::default() + }) + .await?; + + producer + .send( + Some(&SolanaNftEvents { + event: Some(SolanaNftEvent::ImportedExternalCollection( + SolanaCollectionPayload { + supply: collection.supply.map(|s| s.print_max_supply), + mint_address: mint.to_string(), + seller_fee_basis_points, + creators, + metadata: Some(Metadata { + name: metadata.name, + description: metadata.description, + symbol: metadata.symbol, + attributes, + uri: collection.content.json_uri, + image, + }), + files, + update_authority: update_authority.to_string(), + }, + )), + }), + Some(&SolanaNftEventKey { + id: collection_model.id.to_string(), + project_id, + user_id, + }), + ) + .await?; + + Ok(collection_model) + } + + async fn collection_mint_event( + &self, + project_id: String, + user_id: String, + collection: Uuid, + asset: Asset, + ) -> Result { + let db = &self.db; + let producer = self.producer.clone(); + let owner = asset.ownership.owner.into(); + + let mint = asset.id.into(); + let ata = get_associated_token_address(&owner, &mint); + let seller_fee_basis_points = asset.royalty.basis_points; + let metadata = asset.content.metadata; + let update_authority = asset + .authorities + .get(0) + .context("Invalid index")? + .address + .clone(); + + let files = asset + .content + .files + .map(|fs| fs.iter().map(Into::into).collect::>()) + .unwrap_or_default(); + + let image = files + .iter() + .find(|f| f.mime.as_ref().map_or(false, |m| m.contains("image"))) + .map(|f| f.uri.clone()) + .unwrap_or_default(); + + let attributes = metadata + .attributes + .map(|attributes| attributes.iter().map(Into::into).collect()) + .unwrap_or_default(); + + let creators = asset + .creators + .iter() + .map(|c| SolanaCreator { + address: c.address.to_string(), + verified: c.verified, + share: c.share, + }) + .collect::>(); + + #[allow(clippy::cast_sign_loss)] + let uuid = Uuid::from_u64_pair(Utc::now().timestamp_nanos() as u64, rand::random::()); + + let mint_model = collection_mints::Model { + id: uuid, + collection_id: collection, + mint: mint.to_string(), + owner: owner.to_string(), + associated_token_account: ata.to_string(), + created_at: Utc::now().naive_utc(), + }; + + tokio::spawn(async move { + producer + .send( + Some(&SolanaNftEvents { + event: Some(SolanaNftEvent::ImportedExternalMint(SolanaMintPayload { + collection_id: collection.to_string(), + mint_address: mint.to_string(), + owner: owner.to_string(), + seller_fee_basis_points, + compressed: asset.compression.compressed, + creators, + metadata: Some(Metadata { + name: metadata.name, + description: metadata.description, + symbol: metadata.symbol, + attributes, + uri: asset.content.json_uri, + image, + }), + files, + update_authority: update_authority.to_string(), + })), + }), + Some(&SolanaNftEventKey { + id: uuid.to_string(), + user_id, + project_id, + }), + ) + .await + }); + + Ok(mint_model) + } +} + +impl From<&asset_api::File> for File { + fn from(file: &asset_api::File) -> Self { + Self { + uri: file.uri.clone(), + mime: file.mime.clone(), + } + } +} + +impl From<&asset_api::Attribute> for Attribute { + fn from(attr: &asset_api::Attribute) -> Self { + Self { + value: attr.value.to_string(), + trait_type: attr.trait_type.to_string(), + } + } +} diff --git a/consumer/src/lib.rs b/consumer/src/lib.rs index 7dc73f0..65651f9 100644 --- a/consumer/src/lib.rs +++ b/consumer/src/lib.rs @@ -5,8 +5,8 @@ pub(crate) mod asset_api; mod backend; pub mod events; +mod import; pub mod solana; - use holaplex_hub_nfts_solana_core::db::DbArgs; use hub_core::{clap, prelude::*}; use solana::SolanaArgs;