Skip to content

Commit

Permalink
Route events between processors.
Browse files Browse the repository at this point in the history
  • Loading branch information
ray-kast committed Jul 25, 2023
1 parent bae348a commit bf1deca
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 26 deletions.
63 changes: 42 additions & 21 deletions consumer/src/import.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use holaplex_hub_nfts_solana_core::{
db,
proto::{
solana_nft_events::Event as SolanaNftEvent, Attribute, CollectionImport, File, Metadata,
SolanaCollectionPayload, SolanaCreator, SolanaMintPayload, SolanaNftEventKey,
SolanaNftEvents,
nft_events::Event as NftEvent, solana_nft_events::Event as SolanaNftEvent, Attribute,
CollectionImport, File, Metadata, SolanaCollectionPayload, SolanaCreator,
SolanaMintPayload, SolanaNftEventKey, SolanaNftEvents,
},
sea_orm::{EntityTrait, ModelTrait, Set},
Collection,
Collection, Services,
};
use holaplex_hub_nfts_solana_entity::{collection_mints, collections, prelude::CollectionMints};
use hub_core::{chrono::Utc, prelude::*, producer::Producer, tokio, util::DebugShim, uuid::Uuid};
Expand All @@ -18,28 +18,44 @@ use crate::{
solana::Solana,
};

const MAX_LIMIT: u64 = 1000;

pub struct CollectionImporter {
// TODO: could this just be a newtype over events::Processor?
#[derive(Debug, Clone)]
pub struct Processor {
solana: DebugShim<Solana>,
db: db::Connection,
producer: Producer<SolanaNftEvents>,
solana: DebugShim<Solana>,
}

impl CollectionImporter {
impl Processor {
pub fn new(
solana: Solana,
db: db::Connection,
producer: Producer<SolanaNftEvents>,
solana: DebugShim<Solana>,
) -> Self {
Self {
solana: DebugShim(solana),
db,
producer,
solana,
}
}

pub async fn process(
pub async fn process(&self, msg: &Services) -> Result<Option<()>> {
match msg {
Services::Nfts(key, msg) => {
let key = SolanaNftEventKey::from(key.clone());

match msg.event {
Some(NftEvent::StartedImportingSolanaCollection(ref c)) => {
self.process_import(key, c.clone()).await.map(Some)
},
_ => Ok(None),
}
},
_ => Ok(None),
}
}

async fn process_import(
&self,
SolanaNftEventKey {
user_id,
Expand All @@ -48,6 +64,8 @@ impl CollectionImporter {
}: SolanaNftEventKey,
CollectionImport { mint_address }: CollectionImport,
) -> Result<()> {
const MAX_LIMIT: u64 = 1000;

let rpc = &self.solana.0.asset_rpc();
let db = &self.db;
let producer = &self.producer;

Check warning on line 71 in consumer/src/import.rs

View workflow job for this annotation

GitHub Actions / Cargo Test

unused variable: `producer`

Check warning on line 71 in consumer/src/import.rs

View workflow job for this annotation

GitHub Actions / clippy/check/doc

unused variable: `producer`
Expand Down Expand Up @@ -158,15 +176,18 @@ impl CollectionImporter {
let (metadata_pubkey, _) = find_metadata_account(&mint);

let (master_edition, _) = find_master_edition_account(&mint);
let collection_model = Collection::create(db, collections::ActiveModel {
master_edition: Set(master_edition.to_string()),
update_authority: Set(update_authority.to_string()),
associated_token_account: Set(ata.to_string()),
owner: Set(owner.to_string()),
mint: Set(mint.to_string()),
metadata: Set(metadata_pubkey.to_string()),
..Default::default()
})
let collection_model = Collection::create(
db,
collections::ActiveModel {
master_edition: Set(master_edition.to_string()),
update_authority: Set(update_authority.to_string()),
associated_token_account: Set(ata.to_string()),
owner: Set(owner.to_string()),
mint: Set(mint.to_string()),
metadata: Set(metadata_pubkey.to_string()),
..Default::default()
},
)
.await?;

producer
Expand Down
2 changes: 1 addition & 1 deletion consumer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
pub(crate) mod asset_api;
mod backend;
pub mod events;
mod import;
pub mod import;
pub mod solana;
use holaplex_hub_nfts_solana_core::db::DbArgs;
use hub_core::{clap, prelude::*};
Expand Down
24 changes: 20 additions & 4 deletions consumer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use holaplex_hub_nfts_solana::{events::Processor, solana::Solana, Args};
use holaplex_hub_nfts_solana::{events, import, solana::Solana, Args};
use holaplex_hub_nfts_solana_core::{db::Connection, proto::SolanaNftEvents, Services};
use hub_core::{prelude::*, tokio};

Expand All @@ -20,18 +20,34 @@ pub fn main() {
let solana = Solana::new(solana)?;

let cons = common.consumer_cfg.build::<Services>().await?;
let event_processor = Processor::new(solana, connection, producer);
// TODO: change these names once there are fewer in-flight feature branches
let import_processor =
import::Processor::new(solana.clone(), connection.clone(), producer.clone());
let event_processor = events::Processor::new(solana, connection, producer);

let mut stream = cons.stream();
loop {
let import_processor = import_processor.clone();
let event_processor = event_processor.clone();

match stream.next().await {
Some(Ok(msg)) => {
info!(?msg, "message received");

tokio::spawn(async move { event_processor.process(msg).await });
tokio::task::yield_now().await;
tokio::spawn(async move {
if let Some(()) = import_processor
.process(&msg)
.await
.map_err(|e| error!("Error processing import: {e:?}"))?
{
return Ok(());
}

event_processor
.process(msg)
.await
.map_err(|e| error!("Error processing event: {:?}", Error::new(e)))
});
},
None => (),
Some(Err(e)) => {
Expand Down

0 comments on commit bf1deca

Please sign in to comment.