From 2a480e82c461223c7445aec2d68f669169932f02 Mon Sep 17 00:00:00 2001 From: 0xZensh Date: Fri, 22 Dec 2023 18:42:02 +0800 Subject: [PATCH] feat: add ns-fetcher crate. --- crates/ns-fetcher/Cargo.toml | 37 +++++++ crates/ns-fetcher/README.md | 3 + crates/ns-fetcher/src/fetcher.rs | 123 ++++++++++++++++++++++ crates/ns-fetcher/src/indexer.rs | 129 ++++++++++++++++++++++++ crates/ns-fetcher/src/lib.rs | 2 + crates/ns-indexer/Cargo.toml | 4 +- crates/ns-indexer/cql/keyspace.cql | 3 + crates/ns-indexer/cql/keyspace_prod.cql | 3 + crates/ns-indexer/cql/schema.cql | 4 - crates/ns-indexer/src/db/scylladb.rs | 6 +- crates/ns-inscriber/Cargo.toml | 6 +- crates/ns-protocol/Cargo.toml | 2 +- 12 files changed, 310 insertions(+), 12 deletions(-) create mode 100644 crates/ns-fetcher/Cargo.toml create mode 100644 crates/ns-fetcher/README.md create mode 100644 crates/ns-fetcher/src/fetcher.rs create mode 100644 crates/ns-fetcher/src/indexer.rs create mode 100644 crates/ns-fetcher/src/lib.rs create mode 100644 crates/ns-indexer/cql/keyspace.cql create mode 100644 crates/ns-indexer/cql/keyspace_prod.cql diff --git a/crates/ns-fetcher/Cargo.toml b/crates/ns-fetcher/Cargo.toml new file mode 100644 index 0000000..927472c --- /dev/null +++ b/crates/ns-fetcher/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "ns-fetcher" +version = "0.1.0" +edition = "2021" +rust-version = "1.64" +description = "Fetch and validate inscriptions from ns-indexer service" +publish = true +repository = "https://github.com/ldclabs/ns-rs/tree/main/crates/ns-fetcher" +license = "CC0-1.0" + +[lib] + +[dependencies] +ns-protocol = { path = "../ns-protocol" } +anyhow = { workspace = true } +bytes = { workspace = true } +base64 = { workspace = true } +ciborium = { workspace = true } +ciborium-io = { workspace = true } +serde = { workspace = true } +tokio = { workspace = true } +futures = "0.3" +reqwest = { version = "0.11", features = [ + "rustls-tls", + "rustls-tls-webpki-roots", + "json", + "gzip", + "trust-dns", +], default-features = false } +hex = "0.4" +bloomfilter = "1" +async-stream = "0.3" +futures-core = "0.3" +futures-util = "0.3" + +[dev-dependencies] +hex-literal = "0.4" diff --git a/crates/ns-fetcher/README.md b/crates/ns-fetcher/README.md new file mode 100644 index 0000000..fcf40d3 --- /dev/null +++ b/crates/ns-fetcher/README.md @@ -0,0 +1,3 @@ +# ns-fetcher + +More information about the protocol can be found in the [protocol documentation](https://github.com/ldclabs/ns-protocol) diff --git a/crates/ns-fetcher/src/fetcher.rs b/crates/ns-fetcher/src/fetcher.rs new file mode 100644 index 0000000..e2692f0 --- /dev/null +++ b/crates/ns-fetcher/src/fetcher.rs @@ -0,0 +1,123 @@ +use async_stream::try_stream; +use bloomfilter::Bloom; +use futures_core::stream::Stream; + +use ns_protocol::index::{Inscription, NameState, ServiceState}; + +use crate::indexer::Client; + +// fetches all inscriptions and states from last accepted to bottom_height +pub fn fetch_desc( + cli: Client, + bottom_height: u64, +) -> impl Stream)>> { + try_stream! { + let last_accepted: Inscription = cli.get_last_accepted_inscription().await?; + let name_state: NameState = cli.get_name_state(&last_accepted.name).await?; + let service_state: ServiceState = cli.get_service_state(&last_accepted.name, last_accepted.data.payload.code).await?; + + let mut bloom = Bloom::new_for_fp_rate(last_accepted.height as usize, 0.0001); + let mut head_height = last_accepted.height; + let mut head_inscription = last_accepted.clone(); + + bloom.set(&head_inscription.name); + yield (last_accepted, Some((name_state, service_state))); + + loop { + if head_height == 0 || head_height < bottom_height { + break; + } + + head_height -= 1; + let inscription: Inscription = cli.get_inscription_by_height(head_height).await?; + if head_inscription.previous_hash != inscription.hash()? { + Err(anyhow::anyhow!("inscription({}): previous hash mismatch", inscription.height))?; + } + + head_inscription = inscription.clone(); + if bloom.check(&inscription.name) { + // latest name & service state returned in previous iteration + yield (inscription, None); + continue; + } + + let name_state: NameState = cli.get_name_state(&inscription.name).await?; + let service_state: ServiceState = cli.get_service_state(&inscription.name, inscription.data.payload.code).await?; + + bloom.set(&head_inscription.name); + // false positives are possible from bloom, but it's ok + if name_state.sequence > inscription.sequence { + yield (inscription, None); + continue; + } + + if name_state.sequence != inscription.sequence { + Err(anyhow::anyhow!("inscription({}): name_state sequence mismatch, expected {}, got {}", inscription.height, inscription.sequence, name_state.sequence))?; + } + if inscription.name_hash != name_state.hash()? { + Err(anyhow::anyhow!("inscription({}): name_hash mismatch", inscription.height))?; + } + + if service_state.sequence != inscription.sequence { + Err(anyhow::anyhow!("inscription({}): service_state sequence mismatch, expected {}, got {}", inscription.height, inscription.sequence, service_state.sequence))?; + } + if inscription.service_hash != service_state.hash()? { + Err(anyhow::anyhow!("inscription({}): service_hash mismatch", inscription.height))?; + } + + yield (inscription, Some((name_state, service_state))); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures_util::{pin_mut, stream::StreamExt}; + + use crate::indexer::ClientOptions; + + #[tokio::test(flavor = "current_thread")] + #[ignore] + async fn fetcher_works() { + let endpoint = std::env::var("INDEXER_ENDPOINT").unwrap_or_default(); + // let endpoint = "http://127.0.0.1::8080".to_string(); + if endpoint.is_empty() { + return; + } + + let cli = Client::new(&ClientOptions { endpoint }).await.unwrap(); + + let s = fetch_desc(cli, 0); + pin_mut!(s); // needed for iteration + + // first item is always the last accepted inscription + let (last_accepted, state) = s.next().await.unwrap().unwrap(); + assert!(last_accepted.height > 0); + assert!(state.is_some()); + let (name_state, service_state) = state.unwrap(); + assert_eq!(last_accepted.name, name_state.name); + assert_eq!(last_accepted.sequence, name_state.sequence); + assert_eq!(last_accepted.name, service_state.name); + assert_eq!(last_accepted.sequence, service_state.sequence); + assert_eq!(last_accepted.name_hash, name_state.hash().unwrap()); + assert_eq!(last_accepted.service_hash, service_state.hash().unwrap()); + + let mut state_exists = false; + while let Some(res) = s.next().await { + let (ins, state) = res.unwrap(); + println!("got {}, {}, {}", ins.height, ins.name, ins.sequence); + if let Some((name_state, service_state)) = state { + assert_eq!(ins.name, name_state.name); + assert_eq!(ins.sequence, name_state.sequence); + assert_eq!(ins.name, service_state.name); + assert_eq!(ins.sequence, service_state.sequence); + assert_eq!(ins.name_hash, name_state.hash().unwrap()); + assert_eq!(ins.service_hash, service_state.hash().unwrap()); + state_exists = true; + } + } + + assert!(state_exists); + } +} diff --git a/crates/ns-fetcher/src/indexer.rs b/crates/ns-fetcher/src/indexer.rs new file mode 100644 index 0000000..de8dad4 --- /dev/null +++ b/crates/ns-fetcher/src/indexer.rs @@ -0,0 +1,129 @@ +use ciborium::from_reader; +use reqwest::{header, ClientBuilder, Url}; +use serde::{de::DeserializeOwned, Deserialize}; +use tokio::time::{sleep, Duration}; + +use ns_protocol::{ + index::{Inscription, NameState, ServiceState}, + ns::Value, +}; + +static APP_USER_AGENT: &str = concat!( + "Mozilla/5.0 NS-Fetcher ", + env!("CARGO_PKG_NAME"), + "/", + env!("CARGO_PKG_VERSION"), +); + +pub struct Client { + client: reqwest::Client, + url: Url, +} + +pub struct ClientOptions { + pub endpoint: String, +} + +#[derive(Debug, Deserialize)] +struct Response { + result: Option, + error: Option, +} + +impl Client { + pub async fn new(opts: &ClientOptions) -> anyhow::Result { + let mut common_headers = header::HeaderMap::with_capacity(3); + common_headers.insert(header::ACCEPT, "application/cbor".parse()?); + common_headers.insert(header::CONTENT_TYPE, "application/cbor".parse()?); + common_headers.insert(header::ACCEPT_ENCODING, "gzip".parse()?); + + let url = reqwest::Url::parse(&opts.endpoint)?; + let client = ClientBuilder::new() + .use_rustls_tls() + .no_proxy() + .connect_timeout(Duration::from_secs(10)) + .timeout(Duration::from_secs(30)) + .user_agent(APP_USER_AGENT) + .default_headers(common_headers) + .gzip(true) + .build()?; + + let rpc = Self { client, url }; + rpc.ping().await?; + Ok(rpc) + } + + pub async fn ping(&self) -> anyhow::Result { + self.call("/healthz", &[]).await + } + + pub async fn get_last_accepted_inscription(&self) -> anyhow::Result { + self.call("/v1/inscription/get_last_accepted", &[]).await + } + + pub async fn get_inscription_by_height(&self, height: u64) -> anyhow::Result { + self.call( + "/v1/inscription/get_by_height", + &[("height", height.to_string().as_str())], + ) + .await + } + + pub async fn get_name_state(&self, name: &str) -> anyhow::Result { + self.call("/v1/name", &[("name", name)]).await + } + + pub async fn get_service_state(&self, name: &str, code: u64) -> anyhow::Result { + self.call( + "/v1/service", + &[("name", name), ("code", code.to_string().as_str())], + ) + .await + } + + pub async fn call( + &self, + path: &str, + query_pairs: &[(&str, &str)], + ) -> anyhow::Result { + let mut url = self.url.join(path)?; + if !query_pairs.is_empty() { + url.query_pairs_mut().extend_pairs(query_pairs); + } + + // retry if server error + let mut retry_secs = 0; + let res = loop { + match self.client.get(url.clone()).send().await { + Ok(res) => break res, + Err(err) => { + retry_secs += 1; + if retry_secs <= 5 { + sleep(Duration::from_secs(retry_secs)).await; + continue; + } else { + anyhow::bail!("Client: {}", err.to_string()); + } + } + } + }; + + let data = res.bytes().await?; + let output: Response = from_reader(&data[..]).map_err(|err| { + anyhow::anyhow!( + "Client: failed to parse response, {}, data: {}", + err.to_string(), + String::from_utf8_lossy(&data) + ) + })?; + + if let Some(error) = output.error { + anyhow::bail!("Client: {:?}", error); + } + + match output.result { + Some(result) => Ok(result), + None => Err(anyhow::anyhow!("Client: no result")), + } + } +} diff --git a/crates/ns-fetcher/src/lib.rs b/crates/ns-fetcher/src/lib.rs new file mode 100644 index 0000000..d00791e --- /dev/null +++ b/crates/ns-fetcher/src/lib.rs @@ -0,0 +1,2 @@ +pub mod fetcher; +pub mod indexer; diff --git a/crates/ns-indexer/Cargo.toml b/crates/ns-indexer/Cargo.toml index bdd7dc0..9e11f08 100644 --- a/crates/ns-indexer/Cargo.toml +++ b/crates/ns-indexer/Cargo.toml @@ -4,8 +4,8 @@ version = "0.1.0" edition = "2021" rust-version = "1.64" description = "Name & Service Protocol indexer service in Rust" -publish = false -repository = "https://github.com/ldclabs/ns-rs/crates/ns-indexer" +publish = true +repository = "https://github.com/ldclabs/ns-rs/tree/main/crates/ns-indexer" license = "CC0-1.0" [lib] diff --git a/crates/ns-indexer/cql/keyspace.cql b/crates/ns-indexer/cql/keyspace.cql new file mode 100644 index 0000000..f9b56a9 --- /dev/null +++ b/crates/ns-indexer/cql/keyspace.cql @@ -0,0 +1,3 @@ +CREATE KEYSPACE IF NOT EXISTS ns_indexer +WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': 1 }; +USE ns_indexer; diff --git a/crates/ns-indexer/cql/keyspace_prod.cql b/crates/ns-indexer/cql/keyspace_prod.cql new file mode 100644 index 0000000..74c0d5c --- /dev/null +++ b/crates/ns-indexer/cql/keyspace_prod.cql @@ -0,0 +1,3 @@ +CREATE KEYSPACE IF NOT EXISTS ns_indexer +WITH replication = { 'class': 'NetworkTopologyStrategy', 'replication_factor': '3' }; +USE ns_indexer; \ No newline at end of file diff --git a/crates/ns-indexer/cql/schema.cql b/crates/ns-indexer/cql/schema.cql index 89c1dd0..b5b84d6 100644 --- a/crates/ns-indexer/cql/schema.cql +++ b/crates/ns-indexer/cql/schema.cql @@ -1,7 +1,3 @@ -CREATE KEYSPACE IF NOT EXISTS ns_indexer -WITH replication = { 'class': 'NetworkTopologyStrategy', 'replication_factor': '3' }; -USE ns_indexer; - CREATE TABLE IF NOT EXISTS name_state ( name TEXT, -- unique name sequence BIGINT, -- name's latest sequence diff --git a/crates/ns-indexer/src/db/scylladb.rs b/crates/ns-indexer/src/db/scylladb.rs index 39d52a2..61cb20d 100644 --- a/crates/ns-indexer/src/db/scylladb.rs +++ b/crates/ns-indexer/src/db/scylladb.rs @@ -32,7 +32,7 @@ pub struct ScyllaDB { impl ScyllaDB { pub async fn new(cfg: &ScyllaDBOptions) -> anyhow::Result { let handle = ExecutionProfile::builder() - .consistency(Consistency::One) + .consistency(Consistency::Quorum) .serial_consistency(Some(SerialConsistency::Serial)) .request_timeout(Some(Duration::from_secs(5))) .build() @@ -189,8 +189,10 @@ mod tests { dotenvy::from_filename("sample.env").expect(".env file not found"); let db = get_db().await; + let schema = std::include_str!("../../cql/keyspace.cql"); + exec_cqls(db, schema).await.unwrap(); + let schema = std::include_str!("../../cql/schema.cql"); - println!("schema: {}", schema); exec_cqls(db, schema).await.unwrap(); } } diff --git a/crates/ns-inscriber/Cargo.toml b/crates/ns-inscriber/Cargo.toml index 7028ab9..1c24fc9 100644 --- a/crates/ns-inscriber/Cargo.toml +++ b/crates/ns-inscriber/Cargo.toml @@ -1,11 +1,11 @@ [package] name = "ns-inscriber" -version = "0.2.0" +version = "0.1.0" edition = "2021" rust-version = "1.64" description = "Name & Service Protocol inscriber service in Rust" -publish = false -repository = "https://github.com/ldclabs/ns-rs/crates/ns-inscriber" +publish = true +repository = "https://github.com/ldclabs/ns-rs/tree/main/crates/ns-inscriber" license = "CC0-1.0" [lib] diff --git a/crates/ns-protocol/Cargo.toml b/crates/ns-protocol/Cargo.toml index 2990343..de6fb0c 100644 --- a/crates/ns-protocol/Cargo.toml +++ b/crates/ns-protocol/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" rust-version = "1.64" description = "Name & Service Protocol in Rust" publish = true -repository = "https://github.com/ldclabs/ns-rs/crates/ns-protocol" +repository = "https://github.com/ldclabs/ns-rs/tree/main/crates/ns-protocol" license = "CC0-1.0" [lib]