From 89293d982cbf9f85d83aa4df72faa39188dac43b Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Fri, 23 Aug 2024 19:45:37 -0300 Subject: [PATCH] feat: make pipeline compatible with Conway era (#168) --- Cargo.lock | 199 ++++++++++++------ Cargo.toml | 16 +- src/crosscut/args.rs | 8 +- src/crosscut/epochs.rs | 4 +- src/crosscut/filters.rs | 11 +- src/enrich/sled.rs | 7 +- src/model.rs | 10 +- src/reducers/address_by_asset.rs | 39 ++-- src/reducers/address_by_txo.rs | 4 +- src/reducers/addresses_by_stake.rs | 6 +- src/reducers/asset_holders_by_asset_id.rs | 50 ++--- src/reducers/balance_by_address.rs | 4 +- src/reducers/block_header_by_hash.rs | 15 +- src/reducers/last_block_parameters.rs | 62 ++++-- src/reducers/mod.rs | 2 +- src/reducers/point_by_tx.rs | 4 +- src/reducers/pool_by_stake.rs | 6 +- src/reducers/supply_by_asset.rs | 21 +- src/reducers/tx_by_hash.rs | 2 +- src/reducers/tx_count_by_address.rs | 20 +- .../tx_count_by_native_token_policy_id.rs | 51 ++--- src/reducers/utxo_by_address.rs | 11 +- src/reducers/utxo_by_stake.rs | 8 +- src/reducers/utxos_by_asset.rs | 27 ++- src/reducers/worker.rs | 2 +- src/sources/n2c/chainsync.rs | 10 +- src/sources/n2c/transport.rs | 3 +- src/sources/n2n/chainsync.rs | 10 +- src/sources/n2n/mod.rs | 2 +- src/sources/n2n/transport.rs | 3 +- src/sources/utils.rs | 98 ++++----- 31 files changed, 398 insertions(+), 317 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b050a7e1..29b293fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -44,7 +44,7 @@ checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -169,7 +169,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -237,6 +237,21 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" +[[package]] +name = "crc" +version = "3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + [[package]] name = "crc32fast" version = "1.3.2" @@ -317,9 +332,9 @@ dependencies = [ [[package]] name = "cryptoxide" -version = "0.4.2" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "129eabb7b0b78644a3a7e7cf220714aba47463bb281f69fa7a71ca5d12564cca" +checksum = "382ce8820a5bb815055d3553a610e8cb542b2d767bbacea99038afda96cd760d" [[package]] name = "darling" @@ -342,7 +357,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn", + "syn 1.0.107", ] [[package]] @@ -353,7 +368,7 @@ checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" dependencies = [ "darling_core", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -538,7 +553,7 @@ checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -792,6 +807,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "0.4.8" @@ -884,7 +908,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -901,9 +925,19 @@ checksum = "5ef3a5eb0af5d357a7e44287d7ddd094f47de68cda7086c4917578f62e4294df" [[package]] name = "minicbor" -version = "0.18.0" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7005aaf257a59ff4de471a9d5538ec868a21586534fff7f85dd97d4043a6139" +dependencies = [ + "half", + "minicbor-derive", +] + +[[package]] +name = "minicbor" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a20020e8e2d1881d8736f64011bb5ff99f1db9947ce3089706945c8915695cb" +checksum = "9d15f4203d71fdf90903c2696e55426ac97a363c67b218488a73b534ce7aca10" dependencies = [ "half", "minicbor-derive", @@ -911,13 +945,13 @@ dependencies = [ [[package]] name = "minicbor-derive" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8608fb1c805b5b6b3d5ab7bd95c40c396df622b64d77b2d621a5eae1eed050ee" +checksum = "1154809406efdb7982841adb6311b3d095b46f78342dd646736122fe6b19e267" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1049,7 +1083,7 @@ checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1088,54 +1122,53 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64" [[package]] -name = "pallas" -version = "0.17.0" +name = "pallas-addresses" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3aa33418b6ad94a42dde318b035af9b3ea958ca43ca77e6e8a5e9f259f44a837" +checksum = "d628ad58404ddd733e8fe46fe9986489b46258a2ab1bb7b1c4b8e406b91b7cff" dependencies = [ - "pallas-addresses", - "pallas-codec", + "base58", + "bech32 0.9.1", + "crc", + "cryptoxide", + "hex", + "pallas-codec 0.29.0", "pallas-crypto", - "pallas-miniprotocols", - "pallas-multiplexer", - "pallas-primitives", - "pallas-traverse", + "thiserror", ] [[package]] -name = "pallas-addresses" -version = "0.17.0" +name = "pallas-codec" +version = "0.18.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33e4dbcadac1a429795eb111483b697fc848776aeb645d16aa9586849e03bcd7" +checksum = "8b6e03d05d42a663526d78c8b1d4f2554f09bbf4cc846e1a9e839c558bf6103c" dependencies = [ - "base58", - "bech32 0.9.1", "hex", - "pallas-codec", - "pallas-crypto", - "thiserror", + "minicbor 0.19.1", + "serde", ] [[package]] name = "pallas-codec" -version = "0.17.0" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0121cf6f8a780c073437f98f0bf35014de8f2661aa04ea63469b9360670b1263" +checksum = "da003a7360fa032b80d38b4a15573f885f412f2b3868772d49fb072197a9d5f9" dependencies = [ "hex", - "minicbor 0.18.0", + "minicbor 0.20.0", "serde", + "thiserror", ] [[package]] name = "pallas-crypto" -version = "0.17.0" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea13446a83190ea3d4fac6c4d177d001eda6a91edb9e01ffeb4570ac6d5dd929" +checksum = "c9248ed0e594bcb0f548393264519c7adea88874d8bd7cc86f894e8ba4e918c2" dependencies = [ "cryptoxide", "hex", - "pallas-codec", + "pallas-codec 0.29.0", "rand_core", "serde", "thiserror", @@ -1143,13 +1176,13 @@ dependencies = [ [[package]] name = "pallas-miniprotocols" -version = "0.17.0" +version = "0.18.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02feb6060d0d421e17126291cb1e87e0131028827607096063939718336a909d" +checksum = "a8a4754676d92ae351ad524d98bc32d70835856ee0623a45288bb50a5ee4b161" dependencies = [ "hex", - "itertools", - "pallas-codec", + "itertools 0.10.3", + "pallas-codec 0.18.2", "pallas-multiplexer", "thiserror", "tracing", @@ -1157,14 +1190,14 @@ dependencies = [ [[package]] name = "pallas-multiplexer" -version = "0.17.0" +version = "0.18.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c62837b151a664b7844769275877ffb6645e1440b1356888a1fca6e076c3c55" +checksum = "5d4b8c7f48cd3b41ce1fac3e829c723d2e76f1cee77b864446d65b2d12ee0aa3" dependencies = [ "byteorder", "hex", "log", - "pallas-codec", + "pallas-codec 0.18.2", "rand", "thiserror", "tracing", @@ -1172,15 +1205,15 @@ dependencies = [ [[package]] name = "pallas-primitives" -version = "0.17.0" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2cf19eaf7d399719bf2e70313345e0458a76efc2c9401244527419849228a55" +checksum = "c0fa55305212f7828651c8db024e1e286198c2fccb028bbb697c68990c044959" dependencies = [ "base58", "bech32 0.9.1", "hex", "log", - "pallas-codec", + "pallas-codec 0.29.0", "pallas-crypto", "serde", "serde_json", @@ -1188,15 +1221,18 @@ dependencies = [ [[package]] name = "pallas-traverse" -version = "0.17.0" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2315201959af3f9ecc8ac22a37ed31d0043aa33fe9115242f8e69d8814ded8e" +checksum = "49459bd0d2ba86fd909890a81e6238eaf051952d7e38ad63195301e72e8f458e" dependencies = [ "hex", + "itertools 0.13.0", "pallas-addresses", - "pallas-codec", + "pallas-codec 0.29.0", "pallas-crypto", "pallas-primitives", + "paste", + "serde", "thiserror", ] @@ -1225,6 +1261,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "pathdiff" version = "0.2.1" @@ -1270,7 +1312,7 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2", "quote", - "syn", + "syn 1.0.107", "version_check", ] @@ -1287,9 +1329,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.50" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" dependencies = [ "unicode-ident", ] @@ -1322,9 +1364,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.17" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "632d02bff7f874a36f33ea8bb416cd484b90cc66c1194b1a1110d067a7013f58" +checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" dependencies = [ "proc-macro2", ] @@ -1522,7 +1564,13 @@ dependencies = [ "minicbor 0.14.2", "net2", "openssl", - "pallas", + "pallas-addresses", + "pallas-codec 0.29.0", + "pallas-crypto", + "pallas-miniprotocols", + "pallas-multiplexer", + "pallas-primitives", + "pallas-traverse", "prometheus_exporter", "rayon", "redis", @@ -1573,22 +1621,22 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.144" +version = "1.0.208" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f747710de3dcd43b88c9168773254e809d8ddbdf9653b84e2554ab219f17860" +checksum = "cff085d2cb684faa248efb494c39b68e522822ac0de72ccf08109abde717cfb2" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.144" +version = "1.0.208" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94ed3a816fb1d101812f83e789f888322c34e291f894f19590dc310963e87a00" +checksum = "24008e81ff7613ed8e5ba0cfaf24e2c2f1e5b8a0495711e44fcd4882fca62bcf" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.75", ] [[package]] @@ -1633,7 +1681,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1709,6 +1757,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn" +version = "2.0.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6af063034fc1935ede7be0122941bafa9bacb949334d090b77ca98b5817c7d9" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "tempfile" version = "3.3.0" @@ -1750,22 +1809,22 @@ checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" [[package]] name = "thiserror" -version = "1.0.31" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd829fe32373d27f76265620b5309d0340cb8550f523c1dda251d6298069069a" +checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.31" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0396bc89e626244658bef819e22d0cc459e795a5ebe878e6ec336d1674a8d79a" +checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.75", ] [[package]] @@ -1891,7 +1950,7 @@ checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -2009,7 +2068,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 1.0.107", "wasm-bindgen-shared", ] @@ -2043,7 +2102,7 @@ checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/Cargo.toml b/Cargo.toml index b2c90928..9e38d384 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,9 +12,18 @@ authors = ["Santiago Carmuega "] [dependencies] -pallas = "0.17.0" +# pallas = "0.17.0" # pallas = { path = "../pallas/pallas" } # pallas = { git = "https://github.com/txpipe/pallas.git" } + +pallas-multiplexer = "0.18.2" +pallas-miniprotocols = "0.18.2" +pallas-primitives = "0.29.0" +pallas-traverse = "0.29.0" +pallas-addresses = "0.29.0" +pallas-codec = "0.29.0" +pallas-crypto = "0.29.0" + hex = "0.4.3" net2 = "0.2.37" bech32 = "0.8.1" @@ -22,10 +31,7 @@ clap = { version = "3.2.6", features = ["derive"] } log = "0.4.14" env_logger = "0.9.0" merge = "0.1.0" -config = { version = "0.13.0", default-features = false, features = [ - "toml", - "json", -] } +config = { version = "0.13.0", default-features = false, features = ["toml", "json"] } serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.79" minicbor = "0.14.1" diff --git a/src/crosscut/args.rs b/src/crosscut/args.rs index 352ae3ab..bcd54a8d 100644 --- a/src/crosscut/args.rs +++ b/src/crosscut/args.rs @@ -1,4 +1,4 @@ -use pallas::network::miniprotocols::{ +use pallas_miniprotocols::{ Point, MAINNET_MAGIC, TESTNET_MAGIC, @@ -151,7 +151,7 @@ impl IntersectConfig { /// Optional configuration to stop processing new blocks after processing: /// 1. a block with the given hash /// 2. the first block on or after a given absolute slot -/// 3. TODO: a total of X blocks +/// 3. TODO: a total of X blocks #[derive(Deserialize, Debug, Clone)] pub struct FinalizeConfig { until_hash: Option, @@ -174,13 +174,13 @@ pub fn should_finalize( return expected == &hex::encode(current); } } - + if let Some(max) = config.max_block_slot { if last_point.slot_or_default() >= max { return true; } } - + // if let Some(max) = config.max_block_quantity { // if block_count >= max { // return true; diff --git a/src/crosscut/epochs.rs b/src/crosscut/epochs.rs index b69b1108..6684d3e3 100644 --- a/src/crosscut/epochs.rs +++ b/src/crosscut/epochs.rs @@ -1,7 +1,7 @@ // TODO this is temporary, we should actually use this code from Pallas as this // is very generic code -use pallas::ledger::traverse::MultiEraBlock; +use pallas_traverse::MultiEraBlock; fn post_byron_epoch_for_slot(shelley_known_slot: u64, shelley_epoch_length: u32, slot: u64) -> u64 { let last_byron_epoch_no = 208; @@ -25,7 +25,7 @@ pub fn block_epoch(chain: &super::ChainWellKnownInfo, block: &MultiEraBlock) -> let slot = block.slot(); match block.era() { - pallas::ledger::traverse::Era::Byron => { + pallas_traverse::Era::Byron => { byron_epoch_for_slot(chain.byron_epoch_length, chain.byron_slot_length, slot) } _ => post_byron_epoch_for_slot(chain.shelley_known_slot, chain.shelley_epoch_length, slot), diff --git a/src/crosscut/filters.rs b/src/crosscut/filters.rs index 51f622b0..19f06247 100644 --- a/src/crosscut/filters.rs +++ b/src/crosscut/filters.rs @@ -1,7 +1,5 @@ -use pallas::ledger::{ - addresses::Address, - traverse::{MultiEraBlock, MultiEraTx}, -}; +use pallas_addresses::Address; +use pallas_traverse::{MultiEraBlock, MultiEraTx}; use serde::Deserialize; use crate::prelude::*; @@ -93,7 +91,6 @@ pub struct TransactionPattern { pub is_valid: Option, } - #[derive(Deserialize, Clone)] #[serde(rename_all = "snake_case")] pub enum Predicate { @@ -224,7 +221,7 @@ fn eval_block(block: &MultiEraBlock, pattern: &BlockPattern) -> Result Result { if let Some(b) = pattern.is_valid { - return Ok(tx.is_valid() == b) + return Ok(tx.is_valid() == b); } Ok(false) @@ -287,7 +284,7 @@ pub fn eval_predicate( #[cfg(test)] mod tests { - use pallas::ledger::traverse::MultiEraBlock; + use pallas_traverse::MultiEraBlock; use crate::{ crosscut::policies::{ErrorAction, RuntimePolicy}, diff --git a/src/enrich/sled.rs b/src/enrich/sled.rs index 069b2248..190c8fab 100644 --- a/src/enrich/sled.rs +++ b/src/enrich/sled.rs @@ -5,10 +5,9 @@ use gasket::{ runtime::{spawn_stage, WorkOutcome}, }; -use pallas::{ - codec::minicbor, - ledger::traverse::{Era, MultiEraBlock, MultiEraTx, OutputRef}, -}; +use pallas_codec::minicbor; +use pallas_traverse::{Era, MultiEraBlock, MultiEraTx, OutputRef}; + use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use serde::Deserialize; use sled::IVec; diff --git a/src/model.rs b/src/model.rs index cf9d2e9d..95343482 100644 --- a/src/model.rs +++ b/src/model.rs @@ -1,10 +1,8 @@ use std::{collections::HashMap, fmt::Debug}; -use pallas::{ - ledger::traverse::{Era, MultiEraBlock, MultiEraOutput, MultiEraTx, OutputRef}, - network::miniprotocols::Point, - crypto::hash::Hash, -}; +use pallas_crypto::hash::Hash; +use pallas_miniprotocols::Point; +use pallas_traverse::{Era, MultiEraBlock, MultiEraOutput, MultiEraTx, OutputRef}; use crate::prelude::*; @@ -60,7 +58,7 @@ impl BlockContext { .consumes() .iter() .map(|i| i.output_ref()) - .map(|r| self.find_utxo(&r).map(|u| (r,u))) + .map(|r| self.find_utxo(&r).map(|u| (r, u))) .map(|r| r.apply_policy(policy)) .collect::, _>>()? .into_iter() diff --git a/src/reducers/address_by_asset.rs b/src/reducers/address_by_asset.rs index 3eb7e017..ff80b1e2 100644 --- a/src/reducers/address_by_asset.rs +++ b/src/reducers/address_by_asset.rs @@ -1,5 +1,7 @@ -use pallas::ledger::traverse::MultiEraOutput; -use pallas::ledger::traverse::{Asset, MultiEraBlock}; +use pallas_traverse::MultiEraAsset; +use pallas_traverse::MultiEraBlock; +use pallas_traverse::MultiEraOutput; +use pallas_traverse::MultiEraPolicyAssets; use serde::Deserialize; use crate::{model, prelude::*}; @@ -19,16 +21,20 @@ pub struct Reducer { } impl Reducer { - fn to_string_output(&self, asset: Asset) -> Option { - match asset.policy_hex() { - Some(policy_id) if policy_id.eq(&self.config.policy_id_hex) => match asset { - Asset::NativeAsset(_, name, _) => match self.convert_to_ascii { - true => String::from_utf8(name).ok(), - false => Some(hex::encode(name)), - }, - _ => None, - }, - _ => None, + fn to_string_output( + &self, + policy: &MultiEraPolicyAssets, + asset: &MultiEraAsset, + ) -> Option { + let policy_id = policy.policy().to_string(); + + if policy_id.eq(&self.config.policy_id_hex) { + match self.convert_to_ascii { + true => asset.to_ascii_name(), + false => Some(hex::encode(asset.name())), + } + } else { + None } } @@ -40,7 +46,14 @@ impl Reducer { let asset_names: Vec<_> = txo .non_ada_assets() .into_iter() - .filter_map(|x| self.to_string_output(x)) + .flat_map(|policy| { + policy + .assets() + .iter() + .map(|asset| self.to_string_output(&policy, asset)) + .collect::>() + }) + .flatten() .collect(); if asset_names.is_empty() { diff --git a/src/reducers/address_by_txo.rs b/src/reducers/address_by_txo.rs index f9abad18..768c6851 100644 --- a/src/reducers/address_by_txo.rs +++ b/src/reducers/address_by_txo.rs @@ -1,6 +1,6 @@ use gasket::error::AsWorkError; -use pallas::crypto::hash::Hash; -use pallas::ledger::traverse::MultiEraBlock; +use pallas_crypto::hash::Hash; +use pallas_traverse::MultiEraBlock; use serde::Deserialize; use crate::prelude::*; diff --git a/src/reducers/addresses_by_stake.rs b/src/reducers/addresses_by_stake.rs index d6f95fd8..95033e57 100644 --- a/src/reducers/addresses_by_stake.rs +++ b/src/reducers/addresses_by_stake.rs @@ -1,5 +1,5 @@ -use pallas::ledger::addresses::{Address, StakeAddress}; -use pallas::ledger::traverse::MultiEraBlock; +use pallas_addresses::{Address, StakeAddress}; +use pallas_traverse::MultiEraBlock; use serde::Deserialize; use crate::{crosscut, model, prelude::*}; @@ -91,7 +91,7 @@ impl Config { #[cfg(test)] mod test { use super::any_address_to_stake_bech32; - use pallas::ledger::addresses::Address; + use pallas_addresses::Address; #[test] fn stake_bech32() { diff --git a/src/reducers/asset_holders_by_asset_id.rs b/src/reducers/asset_holders_by_asset_id.rs index f518676c..9cad8235 100644 --- a/src/reducers/asset_holders_by_asset_id.rs +++ b/src/reducers/asset_holders_by_asset_id.rs @@ -1,9 +1,9 @@ -use pallas::ledger::traverse::{Asset, MultiEraOutput}; -use pallas::ledger::traverse::{MultiEraBlock, OutputRef}; +use pallas_traverse::MultiEraOutput; +use pallas_traverse::{MultiEraBlock, OutputRef}; use serde::Deserialize; use crate::{crosscut, model, prelude::*}; -use pallas::crypto::hash::Hash; +use pallas_crypto::hash::Hash; use crate::crosscut::epochs::block_epoch; use std::str::FromStr; @@ -76,22 +76,18 @@ impl Reducer { let address = utxo.address().map(|addr| addr.to_string()).or_panic()?; - for asset in utxo.assets() { - match asset { - Asset::NativeAsset(policy_id, _, quantity) => { - if self.is_policy_id_accepted(&policy_id) { - let subject = asset.subject(); - let key = self.config_key(subject, epoch_no); - let delta = quantity as i64 * (-1); + for policy in utxo.non_ada_assets() { + for asset in policy.assets() { + if self.is_policy_id_accepted(policy.policy()) { + let subject = format!("{}.{}", policy.policy(), hex::encode(asset.name())); + let key = self.config_key(subject, epoch_no); + let delta = asset.output_coin().unwrap_or_default() as i64 * (-1); - let crdt = - model::CRDTCommand::SortedSetRemove(key, address.to_string(), delta); + let crdt = model::CRDTCommand::SortedSetRemove(key, address.to_string(), delta); - output.send(gasket::messaging::Message::from(crdt))?; - } + output.send(gasket::messaging::Message::from(crdt))?; } - _ => (), - }; + } } Ok(()) @@ -108,22 +104,18 @@ impl Reducer { .map(|addr| addr.to_string()) .or_panic()?; - for asset in tx_output.assets() { - match asset { - Asset::NativeAsset(policy_id, _, quantity) => { - if self.is_policy_id_accepted(&policy_id) { - let subject = asset.subject(); - let key = self.config_key(subject, epoch_no); - let delta = quantity as i64; + for policy in tx_output.non_ada_assets() { + for asset in policy.assets() { + if self.is_policy_id_accepted(policy.policy()) { + let subject = format!("{}.{}", policy.policy(), hex::encode(asset.name())); + let key = self.config_key(subject, epoch_no); + let delta = asset.output_coin().unwrap_or_default() as i64; - let crdt = - model::CRDTCommand::SortedSetAdd(key, address.to_string(), delta); + let crdt = model::CRDTCommand::SortedSetAdd(key, address.to_string(), delta); - output.send(gasket::messaging::Message::from(crdt))?; - } + output.send(gasket::messaging::Message::from(crdt))?; } - _ => {} - }; + } } Ok(()) diff --git a/src/reducers/balance_by_address.rs b/src/reducers/balance_by_address.rs index 745f2e61..0e209f87 100644 --- a/src/reducers/balance_by_address.rs +++ b/src/reducers/balance_by_address.rs @@ -1,5 +1,5 @@ -use pallas::ledger::traverse::MultiEraOutput; -use pallas::ledger::traverse::{MultiEraBlock, OutputRef}; +use pallas_traverse::MultiEraOutput; +use pallas_traverse::{MultiEraBlock, OutputRef}; use serde::Deserialize; use crate::{crosscut, model, prelude::*}; diff --git a/src/reducers/block_header_by_hash.rs b/src/reducers/block_header_by_hash.rs index c7c38ab4..8a35d6d2 100644 --- a/src/reducers/block_header_by_hash.rs +++ b/src/reducers/block_header_by_hash.rs @@ -1,4 +1,4 @@ -use pallas::ledger::traverse::MultiEraBlock; +use pallas_traverse::MultiEraBlock; use serde::Deserialize; use crate::prelude::*; @@ -23,20 +23,17 @@ impl Reducer { output: &mut super::OutputPort, ) -> Result<(), gasket::error::Error> { if filter_matches_block!(self, block, ctx) { - let value = block - .header() - .cbor() - .to_vec(); - + let value = block.header().cbor().to_vec(); + let crdt = model::CRDTCommand::any_write_wins( self.config.key_prefix.as_deref(), block.hash(), - value + value, ); - + output.send(gasket::messaging::Message::from(crdt))?; } - + Ok(()) } } diff --git a/src/reducers/last_block_parameters.rs b/src/reducers/last_block_parameters.rs index 3eb5e86d..a1a100e7 100644 --- a/src/reducers/last_block_parameters.rs +++ b/src/reducers/last_block_parameters.rs @@ -1,4 +1,4 @@ -use pallas::ledger::traverse::MultiEraBlock; +use pallas_traverse::MultiEraBlock; use serde::Deserialize; use crate::crosscut::epochs::block_epoch; @@ -16,7 +16,6 @@ pub struct Reducer { } impl Reducer { - pub fn current_epoch( &mut self, block: &MultiEraBlock, @@ -25,12 +24,15 @@ impl Reducer { ) -> Result<(), gasket::error::Error> { let epoch_no = block_epoch(&self.chain, block); - let crdt = model::CRDTCommand::AnyWriteWins(format!("{}.{}", key, "epoch_no"), Value::BigInt(epoch_no as i128)); + let crdt = model::CRDTCommand::AnyWriteWins( + format!("{}.{}", key, "epoch_no"), + Value::BigInt(epoch_no as i128), + ); output.send(gasket::messaging::Message::from(crdt))?; Result::Ok(()) - } + } pub fn current_height( &mut self, @@ -38,12 +40,15 @@ impl Reducer { key: &str, output: &mut super::OutputPort, ) -> Result<(), gasket::error::Error> { - let crdt = model::CRDTCommand::AnyWriteWins(format!("{}.{}", key, "height"), Value::BigInt(block.number() as i128)); + let crdt = model::CRDTCommand::AnyWriteWins( + format!("{}.{}", key, "height"), + Value::BigInt(block.number() as i128), + ); output.send(gasket::messaging::Message::from(crdt))?; Result::Ok(()) - } + } pub fn current_slot( &mut self, @@ -51,12 +56,15 @@ impl Reducer { key: &str, output: &mut super::OutputPort, ) -> Result<(), gasket::error::Error> { - let crdt = model::CRDTCommand::AnyWriteWins(format!("{}.{}", key, "slot_no"), Value::BigInt(block.slot() as i128)); + let crdt = model::CRDTCommand::AnyWriteWins( + format!("{}.{}", key, "slot_no"), + Value::BigInt(block.slot() as i128), + ); output.send(gasket::messaging::Message::from(crdt))?; Result::Ok(()) - } + } pub fn current_block_hash( &mut self, @@ -64,12 +72,15 @@ impl Reducer { key: &str, output: &mut super::OutputPort, ) -> Result<(), gasket::error::Error> { - let crdt = model::CRDTCommand::AnyWriteWins(format!("{}.{}", key, "block_hash"), Value::String(block.hash().to_string())); + let crdt = model::CRDTCommand::AnyWriteWins( + format!("{}.{}", key, "block_hash"), + Value::String(block.hash().to_string()), + ); output.send(gasket::messaging::Message::from(crdt))?; Result::Ok(()) - } + } pub fn current_block_era( &mut self, @@ -77,12 +88,15 @@ impl Reducer { key: &str, output: &mut super::OutputPort, ) -> Result<(), gasket::error::Error> { - let crdt = model::CRDTCommand::AnyWriteWins(format!("{}.{}", key, "block_era"), Value::String(block.era().to_string())); + let crdt = model::CRDTCommand::AnyWriteWins( + format!("{}.{}", key, "block_era"), + Value::String(block.era().to_string()), + ); output.send(gasket::messaging::Message::from(crdt))?; Result::Ok(()) - } + } pub fn current_block_last_tx_hash( &mut self, @@ -91,12 +105,18 @@ impl Reducer { output: &mut super::OutputPort, ) -> Result<(), gasket::error::Error> { if !block.is_empty() { - let crdt = model::CRDTCommand::AnyWriteWins(format!("{}.{}", key, "first_transaction_hash"), Value::String(block.txs().first().unwrap().hash().to_string())); - + let crdt = model::CRDTCommand::AnyWriteWins( + format!("{}.{}", key, "first_transaction_hash"), + Value::String(block.txs().first().unwrap().hash().to_string()), + ); + output.send(gasket::messaging::Message::from(crdt))?; - let crdt = model::CRDTCommand::AnyWriteWins(format!("{}.{}", key, "last_transaction_hash"), Value::String(block.txs().last().unwrap().hash().to_string())); - + let crdt = model::CRDTCommand::AnyWriteWins( + format!("{}.{}", key, "last_transaction_hash"), + Value::String(block.txs().last().unwrap().hash().to_string()), + ); + output.send(gasket::messaging::Message::from(crdt))?; } @@ -109,7 +129,10 @@ impl Reducer { key: &str, output: &mut super::OutputPort, ) -> Result<(), gasket::error::Error> { - let crdt = model::CRDTCommand::AnyWriteWins(format!("{}.{}", key, "transactions_count"), Value::BigInt(block.tx_count() as i128)); + let crdt = model::CRDTCommand::AnyWriteWins( + format!("{}.{}", key, "transactions_count"), + Value::BigInt(block.tx_count() as i128), + ); output.send(gasket::messaging::Message::from(crdt))?; @@ -121,7 +144,6 @@ impl Reducer { block: &'b MultiEraBlock<'b>, output: &mut super::OutputPort, ) -> Result<(), gasket::error::Error> { - let def_key_prefix = "last_block"; let key = match &self.config.key_prefix { @@ -142,9 +164,7 @@ impl Reducer { } impl Config { - pub fn plugin(self, - chain: &crosscut::ChainWellKnownInfo - ) -> super::Reducer { + pub fn plugin(self, chain: &crosscut::ChainWellKnownInfo) -> super::Reducer { let reducer = Reducer { config: self, chain: chain.clone(), diff --git a/src/reducers/mod.rs b/src/reducers/mod.rs index 042332ca..9420dc52 100644 --- a/src/reducers/mod.rs +++ b/src/reducers/mod.rs @@ -1,7 +1,7 @@ use std::time::Duration; use gasket::runtime::spawn_stage; -use pallas::ledger::traverse::MultiEraBlock; +use pallas_traverse::MultiEraBlock; use serde::Deserialize; use crate::{bootstrap, crosscut, model}; diff --git a/src/reducers/point_by_tx.rs b/src/reducers/point_by_tx.rs index 79a2b4e4..51c22895 100644 --- a/src/reducers/point_by_tx.rs +++ b/src/reducers/point_by_tx.rs @@ -1,5 +1,5 @@ -use pallas::crypto::hash::Hash; -use pallas::ledger::traverse::MultiEraBlock; +use pallas_crypto::hash::Hash; +use pallas_traverse::MultiEraBlock; use serde::Deserialize; use crate::model; diff --git a/src/reducers/pool_by_stake.rs b/src/reducers/pool_by_stake.rs index 925ee582..cbc5ca9a 100644 --- a/src/reducers/pool_by_stake.rs +++ b/src/reducers/pool_by_stake.rs @@ -1,6 +1,6 @@ -use pallas::ledger::primitives::alonzo; -use pallas::ledger::primitives::alonzo::{PoolKeyhash, StakeCredential}; -use pallas::ledger::traverse::MultiEraBlock; +use pallas_primitives::alonzo; +use pallas_primitives::alonzo::{PoolKeyhash, StakeCredential}; +use pallas_traverse::MultiEraBlock; use serde::Deserialize; use crate::model; diff --git a/src/reducers/supply_by_asset.rs b/src/reducers/supply_by_asset.rs index 95736129..3e7d9f56 100644 --- a/src/reducers/supply_by_asset.rs +++ b/src/reducers/supply_by_asset.rs @@ -1,9 +1,7 @@ use std::str::FromStr; -use gasket::error::AsWorkError; -use pallas::crypto::hash::Hash; -use pallas::ledger::traverse::Asset; -use pallas::ledger::traverse::MultiEraBlock; +use pallas_crypto::hash::Hash; +use pallas_traverse::MultiEraBlock; use serde::Deserialize; use crate::{crosscut, model}; @@ -31,7 +29,7 @@ impl Reducer { fn process_asset( &mut self, policy: &Hash<28>, - asset: &Vec, + asset: &[u8], qty: i64, output: &mut super::OutputPort, ) -> Result<(), gasket::error::Error> { @@ -58,11 +56,14 @@ impl Reducer { output: &mut super::OutputPort, ) -> Result<(), gasket::error::Error> { for tx in block.txs().into_iter() { - if let Some(mints) = tx.mint().as_alonzo() { - for (policy, assets) in mints.iter() { - for (name, amount) in assets.iter() { - self.process_asset(policy, name, *amount, output)?; - } + for mint in tx.mints() { + for asset in mint.assets() { + self.process_asset( + mint.policy(), + asset.name(), + asset.mint_coin().unwrap_or_default(), + output, + )?; } } } diff --git a/src/reducers/tx_by_hash.rs b/src/reducers/tx_by_hash.rs index 469505b9..e66c378f 100644 --- a/src/reducers/tx_by_hash.rs +++ b/src/reducers/tx_by_hash.rs @@ -1,4 +1,4 @@ -use pallas::ledger::traverse::{MultiEraBlock, MultiEraTx}; +use pallas_traverse::{MultiEraBlock, MultiEraTx}; use serde::Deserialize; use serde_json::json; diff --git a/src/reducers/tx_count_by_address.rs b/src/reducers/tx_count_by_address.rs index d8cbaef5..8b7bd932 100644 --- a/src/reducers/tx_count_by_address.rs +++ b/src/reducers/tx_count_by_address.rs @@ -1,5 +1,5 @@ -use pallas::ledger::traverse::MultiEraOutput; -use pallas::ledger::traverse::{MultiEraBlock, OutputRef}; +use pallas_traverse::MultiEraOutput; +use pallas_traverse::{MultiEraBlock, OutputRef}; use serde::Deserialize; use std::collections::HashSet; @@ -28,19 +28,19 @@ impl Reducer { let utxo = match utxo { Some(x) => x, - None => return Ok(()) + None => return Ok(()), }; let address = utxo.address().map(|addr| addr.to_string()).or_panic()?; - + if seen.insert(address.clone()) { let key = match &self.config.key_prefix { Some(prefix) => format!("{}.{}", prefix, address), None => format!("{}.{}", "txcount_by_address".to_string(), address), }; - + let crdt = model::CRDTCommand::PNCounter(key, 1); - + output.send(gasket::messaging::Message::from(crdt))?; } @@ -54,15 +54,15 @@ impl Reducer { output: &mut super::OutputPort, ) -> Result<(), gasket::error::Error> { let address = tx_output.address().map(|x| x.to_string()).or_panic()?; - + if seen.insert(address.clone()) { let key = match &self.config.key_prefix { Some(prefix) => format!("{}.{}", prefix, address), None => format!("{}.{}", "txcount_by_address".to_string(), address), }; - + let crdt = model::CRDTCommand::PNCounter(key, 1); - + output.send(gasket::messaging::Message::from(crdt))?; } @@ -78,7 +78,7 @@ impl Reducer { for tx in block.txs().into_iter() { if filter_matches!(self, block, &tx, ctx) { let mut seen = HashSet::new(); - + for input in tx.inputs().iter().map(|i| i.output_ref()) { self.process_inbound_txo(&ctx, &input, &mut seen, output)?; } diff --git a/src/reducers/tx_count_by_native_token_policy_id.rs b/src/reducers/tx_count_by_native_token_policy_id.rs index 5ec7c78c..ccf42a6d 100644 --- a/src/reducers/tx_count_by_native_token_policy_id.rs +++ b/src/reducers/tx_count_by_native_token_policy_id.rs @@ -1,6 +1,6 @@ use serde::Deserialize; -use pallas::ledger::traverse::{Feature, MultiEraBlock}; +use pallas_traverse::{Feature, MultiEraBlock}; use crate::crosscut::epochs::block_epoch; use crate::{crosscut, model}; @@ -26,14 +26,12 @@ impl Reducer { let def_key_prefix = "transaction_count_by_native_token_policy"; match &self.config.aggr_by { - Some(aggr_type) => { - match aggr_type { - AggrType::Epoch => { - return match &self.config.key_prefix { - Some(prefix) => format!("{}.{}.{}", prefix, policy_id, epoch_no), - None => format!("{}.{}", def_key_prefix.to_string(), policy_id), - }; - } + Some(aggr_type) => match aggr_type { + AggrType::Epoch => { + return match &self.config.key_prefix { + Some(prefix) => format!("{}.{}.{}", prefix, policy_id, epoch_no), + None => format!("{}.{}", def_key_prefix.to_string(), policy_id), + }; } }, None => { @@ -41,7 +39,7 @@ impl Reducer { Some(prefix) => format!("{}.{}", prefix, policy_id), None => format!("{}.{}", def_key_prefix.to_string(), policy_id), }; - }, + } }; } @@ -51,27 +49,22 @@ impl Reducer { output: &mut super::OutputPort, ) -> Result<(), gasket::error::Error> { if block.era().has_feature(Feature::MultiAssets) { - let epoch_no = block_epoch(&self.chain, block); for tx in block.txs() { if tx.is_valid() { - let mint = tx.mint(); + for policy in tx.mints() { + let policy_id = policy.policy().to_string(); - if let Some(mints) = mint.as_alonzo() { - for (policy, assets) in mints.iter() { - let policy_id = hex::encode(policy.as_slice()); + let number_of_minted_or_destroyed = policy.assets().len(); - let number_of_minted_or_destroyed = assets.len(); + let key = self.config_key(policy_id, epoch_no); - let key = self.config_key(policy_id, epoch_no); - - let crdt = model::CRDTCommand::PNCounter( - key, - number_of_minted_or_destroyed as i64, - ); - output.send(gasket::messaging::Message::from(crdt))?; - } + let crdt = model::CRDTCommand::PNCounter( + key, + number_of_minted_or_destroyed as i64, + ); + output.send(gasket::messaging::Message::from(crdt))?; } } } @@ -82,14 +75,12 @@ impl Reducer { } impl Config { - pub fn plugin(self, - chain: &crosscut::ChainWellKnownInfo - ) -> super::Reducer { - let reducer = Reducer { + pub fn plugin(self, chain: &crosscut::ChainWellKnownInfo) -> super::Reducer { + let reducer = Reducer { config: self, chain: chain.clone(), - }; + }; super::Reducer::TxCountByNativeTokenPolicyId(reducer) } -} \ No newline at end of file +} diff --git a/src/reducers/utxo_by_address.rs b/src/reducers/utxo_by_address.rs index 215e984a..0f5854d2 100644 --- a/src/reducers/utxo_by_address.rs +++ b/src/reducers/utxo_by_address.rs @@ -1,5 +1,5 @@ -use pallas::ledger::traverse::MultiEraOutput; -use pallas::ledger::traverse::{MultiEraBlock, MultiEraTx, OutputRef}; +use pallas_traverse::MultiEraOutput; +use pallas_traverse::{MultiEraBlock, MultiEraTx, OutputRef}; use serde::Deserialize; use crate::{crosscut, model, prelude::*}; @@ -26,7 +26,7 @@ impl Reducer { let utxo = match utxo { Some(x) => x, - None => return Ok(()) + None => return Ok(()), }; let address = utxo.address().map(|x| x.to_string()).or_panic()?; @@ -54,7 +54,10 @@ impl Reducer { output: &mut super::OutputPort, ) -> Result<(), gasket::error::Error> { let tx_hash = tx.hash(); - let address = tx_output.address().map(|addr| addr.to_string()).or_panic()?; + let address = tx_output + .address() + .map(|addr| addr.to_string()) + .or_panic()?; if let Some(addresses) = &self.config.filter { if let Err(_) = addresses.binary_search(&address) { diff --git a/src/reducers/utxo_by_stake.rs b/src/reducers/utxo_by_stake.rs index 882ef7b2..d7325e67 100644 --- a/src/reducers/utxo_by_stake.rs +++ b/src/reducers/utxo_by_stake.rs @@ -1,6 +1,6 @@ -use pallas::ledger::addresses::{self, Address, StakeAddress}; -use pallas::ledger::traverse::MultiEraOutput; -use pallas::ledger::traverse::{MultiEraBlock, MultiEraTx, OutputRef}; +use pallas_addresses::{self, Address, StakeAddress}; +use pallas_traverse::MultiEraOutput; +use pallas_traverse::{MultiEraBlock, MultiEraTx, OutputRef}; use serde::Deserialize; use crate::{crosscut, model, prelude::*}; @@ -129,7 +129,7 @@ impl Config { #[cfg(test)] mod test { use super::any_address_to_stake_bech32; - use pallas::ledger::addresses::Address; + use pallas_addresses::Address; #[test] fn stake_bech32() { diff --git a/src/reducers/utxos_by_asset.rs b/src/reducers/utxos_by_asset.rs index ba735bf8..3caae557 100644 --- a/src/reducers/utxos_by_asset.rs +++ b/src/reducers/utxos_by_asset.rs @@ -1,9 +1,8 @@ use std::str::FromStr; use gasket::error::AsWorkError; -use pallas::crypto::hash::Hash; -use pallas::ledger::traverse::Asset; -use pallas::ledger::traverse::{MultiEraBlock, MultiEraTx}; +use pallas_crypto::hash::Hash; +use pallas_traverse::MultiEraBlock; use serde::Deserialize; use crate::{crosscut, model}; @@ -33,7 +32,7 @@ impl Reducer { tx_hash: &Hash<32>, txo_idx: u64, policy: Hash<28>, - asset: Vec, + asset: &[u8], delta: i64, output: &mut super::OutputPort, ) -> Result<(), gasket::error::Error> { @@ -60,14 +59,14 @@ impl Reducer { ) -> Result<(), gasket::error::Error> { for tx in block.txs().into_iter() { for (tx_ref, tx_output) in ctx.find_consumed_txos(&tx, &self.policy).or_panic()? { - for asset in tx_output.assets() { - if let Asset::NativeAsset(policy, asset, delta) = asset { + for policy in tx_output.non_ada_assets() { + for asset in policy.assets() { self.process_asset( tx_ref.hash(), tx_ref.index(), - policy, - asset, - -1 * delta as i64, + *policy.policy(), + asset.name(), + -1 * asset.output_coin().unwrap_or_default() as i64, output, )?; } @@ -75,14 +74,14 @@ impl Reducer { } for (idx, txo) in tx.produces() { - for asset in txo.assets() { - if let Asset::NativeAsset(policy, asset, delta) = asset { + for policy in txo.non_ada_assets() { + for asset in policy.assets() { self.process_asset( &tx.hash(), idx as u64, - policy, - asset, - delta as i64, + *policy.policy(), + asset.name(), + asset.output_coin().unwrap_or_default() as i64, output, )?; } diff --git a/src/reducers/worker.rs b/src/reducers/worker.rs index e922a4e1..ef7d9d34 100644 --- a/src/reducers/worker.rs +++ b/src/reducers/worker.rs @@ -1,4 +1,4 @@ -use pallas::ledger::traverse::MultiEraBlock; +use pallas_traverse::MultiEraBlock; use crate::{crosscut, model, prelude::*}; diff --git a/src/sources/n2c/chainsync.rs b/src/sources/n2c/chainsync.rs index 74ed9912..fc374753 100644 --- a/src/sources/n2c/chainsync.rs +++ b/src/sources/n2c/chainsync.rs @@ -1,8 +1,8 @@ use gasket::error::AsWorkError; -use pallas::ledger::traverse::MultiEraBlock; -use pallas::network::miniprotocols::chainsync::BlockContent; -use pallas::network::miniprotocols::{chainsync, Point}; -use pallas::network::multiplexer::StdChannel; +use pallas_miniprotocols::chainsync::BlockContent; +use pallas_miniprotocols::{chainsync, Point}; +use pallas_multiplexer::StdChannel; +use pallas_traverse::MultiEraBlock; use std::collections::HashMap; use crate::prelude::*; @@ -173,7 +173,7 @@ impl gasket::runtime::Worker for Worker { let mut chainsync = chainsync::N2CClient::new(transport.channel5); let start = - utils::define_chainsync_start(&self.intersect, &mut self.cursor, &mut chainsync) + utils::define_chainsync_start_n2c(&self.intersect, &mut self.cursor, &mut chainsync) .or_retry()?; let start = start.ok_or(Error::IntersectNotFound).or_panic()?; diff --git a/src/sources/n2c/transport.rs b/src/sources/n2c/transport.rs index 387f64a2..24b6bbe9 100644 --- a/src/sources/n2c/transport.rs +++ b/src/sources/n2c/transport.rs @@ -1,4 +1,5 @@ -use pallas::network::{miniprotocols::handshake, multiplexer}; +use pallas_miniprotocols::handshake; +use pallas_multiplexer as multiplexer; pub struct Transport { pub channel5: multiplexer::StdChannel, diff --git a/src/sources/n2n/chainsync.rs b/src/sources/n2n/chainsync.rs index fa522553..5a93f1ea 100644 --- a/src/sources/n2n/chainsync.rs +++ b/src/sources/n2n/chainsync.rs @@ -1,9 +1,9 @@ -use pallas::ledger::traverse::MultiEraHeader; -use pallas::network::miniprotocols::chainsync::HeaderContent; -use pallas::network::miniprotocols::{blockfetch, chainsync, Point}; +use pallas_miniprotocols::chainsync::HeaderContent; +use pallas_miniprotocols::{blockfetch, chainsync, Point}; +use pallas_traverse::MultiEraHeader; use gasket::error::AsWorkError; -use pallas::network::multiplexer::StdChannel; +use pallas_multiplexer::StdChannel; use crate::sources::n2n::transport::Transport; use crate::{crosscut, model, sources::utils, storage, Error}; @@ -173,7 +173,7 @@ impl gasket::runtime::Worker for Worker { let mut chainsync = chainsync::N2NClient::new(transport.channel2); let start = - utils::define_chainsync_start(&self.intersect, &mut self.cursor, &mut chainsync) + utils::define_chainsync_start_n2n(&self.intersect, &mut self.cursor, &mut chainsync) .or_retry()?; let start = start.ok_or(Error::IntersectNotFound).or_panic()?; diff --git a/src/sources/n2n/mod.rs b/src/sources/n2n/mod.rs index db40cd1a..36bb8627 100644 --- a/src/sources/n2n/mod.rs +++ b/src/sources/n2n/mod.rs @@ -5,7 +5,7 @@ use std::time::Duration; use gasket::messaging::OutputPort; -use pallas::network::miniprotocols::Point; +use pallas_miniprotocols::Point; use serde::Deserialize; use crate::{bootstrap, crosscut, model, storage}; diff --git a/src/sources/n2n/transport.rs b/src/sources/n2n/transport.rs index 760e3fbb..4abd0435 100644 --- a/src/sources/n2n/transport.rs +++ b/src/sources/n2n/transport.rs @@ -1,4 +1,5 @@ -use pallas::network::{miniprotocols::handshake, multiplexer}; +use pallas_miniprotocols::handshake; +use pallas_multiplexer as multiplexer; pub struct Transport { pub channel2: multiplexer::StdChannel, diff --git a/src/sources/utils.rs b/src/sources/utils.rs index 3deceb60..38d59416 100644 --- a/src/sources/utils.rs +++ b/src/sources/utils.rs @@ -1,54 +1,58 @@ -use std::convert::TryInto; - -use pallas::{ - codec::Fragment, - network::{ - miniprotocols::{chainsync, Point}, - multiplexer::StdChannel, - }, +use pallas_miniprotocols::{ + chainsync::{self, N2CClient, N2NClient}, + Point, }; +use pallas_multiplexer::StdChannel; +use std::convert::TryInto; use crate::{crosscut, storage}; -pub fn define_chainsync_start( - intersect: &crosscut::IntersectConfig, - cursor: &mut storage::Cursor, - client: &mut chainsync::Client, -) -> Result, crate::Error> { - match cursor.last_point()? { - Some(x) => { - log::info!("found existing cursor in storage plugin: {:?}", x); - let point = x.try_into()?; - let (point, _) = client - .find_intersect(vec![point]) - .map_err(crate::Error::ouroboros)?; - return Ok(point); - } - None => log::info!("no cursor found in storage plugin"), - }; +macro_rules! define_chainsync_start { + ($fn:ident, $client:ident) => { + pub fn $fn( + intersect: &crosscut::IntersectConfig, + cursor: &mut storage::Cursor, + client: &mut $client, + ) -> Result, crate::Error> { + match cursor.last_point()? { + Some(x) => { + log::info!("found existing cursor in storage plugin: {:?}", x); + let point = x.try_into()?; + let (point, _) = client + .find_intersect(vec![point]) + .map_err(crate::Error::ouroboros)?; + return Ok(point); + } + None => log::info!("no cursor found in storage plugin"), + }; - match &intersect { - crosscut::IntersectConfig::Origin => { - let point = client.intersect_origin().map_err(crate::Error::ouroboros)?; - Ok(Some(point)) + match &intersect { + crosscut::IntersectConfig::Origin => { + let point = client.intersect_origin().map_err(crate::Error::ouroboros)?; + Ok(Some(point)) + } + crosscut::IntersectConfig::Tip => { + let point = client.intersect_tip().map_err(crate::Error::ouroboros)?; + Ok(Some(point)) + } + crosscut::IntersectConfig::Point(_, _) => { + let point = intersect.get_point().expect("point value"); + let (point, _) = client + .find_intersect(vec![point]) + .map_err(crate::Error::ouroboros)?; + Ok(point) + } + crosscut::IntersectConfig::Fallbacks(_) => { + let points = intersect.get_fallbacks().expect("fallback values"); + let (point, _) = client + .find_intersect(points) + .map_err(crate::Error::ouroboros)?; + Ok(point) + } + } } - crosscut::IntersectConfig::Tip => { - let point = client.intersect_tip().map_err(crate::Error::ouroboros)?; - Ok(Some(point)) - } - crosscut::IntersectConfig::Point(_, _) => { - let point = intersect.get_point().expect("point value"); - let (point, _) = client - .find_intersect(vec![point]) - .map_err(crate::Error::ouroboros)?; - Ok(point) - } - crosscut::IntersectConfig::Fallbacks(_) => { - let points = intersect.get_fallbacks().expect("fallback values"); - let (point, _) = client - .find_intersect(points) - .map_err(crate::Error::ouroboros)?; - Ok(point) - } - } + }; } + +define_chainsync_start!(define_chainsync_start_n2c, N2CClient); +define_chainsync_start!(define_chainsync_start_n2n, N2NClient);