Skip to content

Commit

Permalink
feat: add ns-fetcher crate.
Browse files Browse the repository at this point in the history
  • Loading branch information
zensh committed Dec 22, 2023
1 parent cda050f commit 2a480e8
Show file tree
Hide file tree
Showing 12 changed files with 310 additions and 12 deletions.
37 changes: 37 additions & 0 deletions crates/ns-fetcher/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
[package]
name = "ns-fetcher"
version = "0.1.0"
edition = "2021"
rust-version = "1.64"
description = "Fetch and validate inscriptions from ns-indexer service"
publish = true
repository = "https://github.com/ldclabs/ns-rs/tree/main/crates/ns-fetcher"
license = "CC0-1.0"

[lib]

[dependencies]
ns-protocol = { path = "../ns-protocol" }
anyhow = { workspace = true }
bytes = { workspace = true }
base64 = { workspace = true }
ciborium = { workspace = true }
ciborium-io = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
futures = "0.3"
reqwest = { version = "0.11", features = [
"rustls-tls",
"rustls-tls-webpki-roots",
"json",
"gzip",
"trust-dns",
], default-features = false }
hex = "0.4"
bloomfilter = "1"
async-stream = "0.3"
futures-core = "0.3"
futures-util = "0.3"

[dev-dependencies]
hex-literal = "0.4"
3 changes: 3 additions & 0 deletions crates/ns-fetcher/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# ns-fetcher

More information about the protocol can be found in the [protocol documentation](https://github.com/ldclabs/ns-protocol)
123 changes: 123 additions & 0 deletions crates/ns-fetcher/src/fetcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use async_stream::try_stream;
use bloomfilter::Bloom;
use futures_core::stream::Stream;

use ns_protocol::index::{Inscription, NameState, ServiceState};

use crate::indexer::Client;

// fetches all inscriptions and states from last accepted to bottom_height
pub fn fetch_desc(
cli: Client,
bottom_height: u64,
) -> impl Stream<Item = anyhow::Result<(Inscription, Option<(NameState, ServiceState)>)>> {
try_stream! {
let last_accepted: Inscription = cli.get_last_accepted_inscription().await?;
let name_state: NameState = cli.get_name_state(&last_accepted.name).await?;
let service_state: ServiceState = cli.get_service_state(&last_accepted.name, last_accepted.data.payload.code).await?;

let mut bloom = Bloom::new_for_fp_rate(last_accepted.height as usize, 0.0001);
let mut head_height = last_accepted.height;
let mut head_inscription = last_accepted.clone();

bloom.set(&head_inscription.name);
yield (last_accepted, Some((name_state, service_state)));

loop {
if head_height == 0 || head_height < bottom_height {
break;
}

head_height -= 1;
let inscription: Inscription = cli.get_inscription_by_height(head_height).await?;
if head_inscription.previous_hash != inscription.hash()? {
Err(anyhow::anyhow!("inscription({}): previous hash mismatch", inscription.height))?;
}

head_inscription = inscription.clone();
if bloom.check(&inscription.name) {
// latest name & service state returned in previous iteration
yield (inscription, None);
continue;
}

let name_state: NameState = cli.get_name_state(&inscription.name).await?;
let service_state: ServiceState = cli.get_service_state(&inscription.name, inscription.data.payload.code).await?;

bloom.set(&head_inscription.name);
// false positives are possible from bloom, but it's ok
if name_state.sequence > inscription.sequence {
yield (inscription, None);
continue;
}

if name_state.sequence != inscription.sequence {
Err(anyhow::anyhow!("inscription({}): name_state sequence mismatch, expected {}, got {}", inscription.height, inscription.sequence, name_state.sequence))?;
}
if inscription.name_hash != name_state.hash()? {
Err(anyhow::anyhow!("inscription({}): name_hash mismatch", inscription.height))?;
}

if service_state.sequence != inscription.sequence {
Err(anyhow::anyhow!("inscription({}): service_state sequence mismatch, expected {}, got {}", inscription.height, inscription.sequence, service_state.sequence))?;
}
if inscription.service_hash != service_state.hash()? {
Err(anyhow::anyhow!("inscription({}): service_hash mismatch", inscription.height))?;
}

yield (inscription, Some((name_state, service_state)));
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures_util::{pin_mut, stream::StreamExt};

use crate::indexer::ClientOptions;

#[tokio::test(flavor = "current_thread")]
#[ignore]
async fn fetcher_works() {
let endpoint = std::env::var("INDEXER_ENDPOINT").unwrap_or_default();
// let endpoint = "http://127.0.0.1::8080".to_string();
if endpoint.is_empty() {
return;
}

let cli = Client::new(&ClientOptions { endpoint }).await.unwrap();

let s = fetch_desc(cli, 0);
pin_mut!(s); // needed for iteration

// first item is always the last accepted inscription
let (last_accepted, state) = s.next().await.unwrap().unwrap();
assert!(last_accepted.height > 0);
assert!(state.is_some());
let (name_state, service_state) = state.unwrap();
assert_eq!(last_accepted.name, name_state.name);
assert_eq!(last_accepted.sequence, name_state.sequence);
assert_eq!(last_accepted.name, service_state.name);
assert_eq!(last_accepted.sequence, service_state.sequence);
assert_eq!(last_accepted.name_hash, name_state.hash().unwrap());
assert_eq!(last_accepted.service_hash, service_state.hash().unwrap());

let mut state_exists = false;
while let Some(res) = s.next().await {
let (ins, state) = res.unwrap();
println!("got {}, {}, {}", ins.height, ins.name, ins.sequence);
if let Some((name_state, service_state)) = state {
assert_eq!(ins.name, name_state.name);
assert_eq!(ins.sequence, name_state.sequence);
assert_eq!(ins.name, service_state.name);
assert_eq!(ins.sequence, service_state.sequence);
assert_eq!(ins.name_hash, name_state.hash().unwrap());
assert_eq!(ins.service_hash, service_state.hash().unwrap());
state_exists = true;
}
}

assert!(state_exists);
}
}
129 changes: 129 additions & 0 deletions crates/ns-fetcher/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use ciborium::from_reader;
use reqwest::{header, ClientBuilder, Url};
use serde::{de::DeserializeOwned, Deserialize};
use tokio::time::{sleep, Duration};

use ns_protocol::{
index::{Inscription, NameState, ServiceState},
ns::Value,
};

static APP_USER_AGENT: &str = concat!(
"Mozilla/5.0 NS-Fetcher ",
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION"),
);

pub struct Client {
client: reqwest::Client,
url: Url,
}

pub struct ClientOptions {
pub endpoint: String,
}

#[derive(Debug, Deserialize)]
struct Response<T> {
result: Option<T>,
error: Option<Value>,
}

impl Client {
pub async fn new(opts: &ClientOptions) -> anyhow::Result<Self> {
let mut common_headers = header::HeaderMap::with_capacity(3);
common_headers.insert(header::ACCEPT, "application/cbor".parse()?);
common_headers.insert(header::CONTENT_TYPE, "application/cbor".parse()?);
common_headers.insert(header::ACCEPT_ENCODING, "gzip".parse()?);

let url = reqwest::Url::parse(&opts.endpoint)?;
let client = ClientBuilder::new()
.use_rustls_tls()
.no_proxy()
.connect_timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(30))
.user_agent(APP_USER_AGENT)
.default_headers(common_headers)
.gzip(true)
.build()?;

let rpc = Self { client, url };
rpc.ping().await?;
Ok(rpc)
}

pub async fn ping(&self) -> anyhow::Result<Value> {
self.call("/healthz", &[]).await
}

pub async fn get_last_accepted_inscription(&self) -> anyhow::Result<Inscription> {
self.call("/v1/inscription/get_last_accepted", &[]).await
}

pub async fn get_inscription_by_height(&self, height: u64) -> anyhow::Result<Inscription> {
self.call(
"/v1/inscription/get_by_height",
&[("height", height.to_string().as_str())],
)
.await
}

pub async fn get_name_state(&self, name: &str) -> anyhow::Result<NameState> {
self.call("/v1/name", &[("name", name)]).await
}

pub async fn get_service_state(&self, name: &str, code: u64) -> anyhow::Result<ServiceState> {
self.call(
"/v1/service",
&[("name", name), ("code", code.to_string().as_str())],
)
.await
}

pub async fn call<T: DeserializeOwned>(
&self,
path: &str,
query_pairs: &[(&str, &str)],
) -> anyhow::Result<T> {
let mut url = self.url.join(path)?;
if !query_pairs.is_empty() {
url.query_pairs_mut().extend_pairs(query_pairs);
}

// retry if server error
let mut retry_secs = 0;
let res = loop {
match self.client.get(url.clone()).send().await {
Ok(res) => break res,
Err(err) => {
retry_secs += 1;
if retry_secs <= 5 {
sleep(Duration::from_secs(retry_secs)).await;
continue;
} else {
anyhow::bail!("Client: {}", err.to_string());
}
}
}
};

let data = res.bytes().await?;
let output: Response<T> = from_reader(&data[..]).map_err(|err| {
anyhow::anyhow!(
"Client: failed to parse response, {}, data: {}",
err.to_string(),
String::from_utf8_lossy(&data)
)
})?;

if let Some(error) = output.error {
anyhow::bail!("Client: {:?}", error);
}

match output.result {
Some(result) => Ok(result),
None => Err(anyhow::anyhow!("Client: no result")),
}
}
}
2 changes: 2 additions & 0 deletions crates/ns-fetcher/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod fetcher;
pub mod indexer;
4 changes: 2 additions & 2 deletions crates/ns-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ version = "0.1.0"
edition = "2021"
rust-version = "1.64"
description = "Name & Service Protocol indexer service in Rust"
publish = false
repository = "https://github.com/ldclabs/ns-rs/crates/ns-indexer"
publish = true
repository = "https://github.com/ldclabs/ns-rs/tree/main/crates/ns-indexer"
license = "CC0-1.0"

[lib]
Expand Down
3 changes: 3 additions & 0 deletions crates/ns-indexer/cql/keyspace.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE KEYSPACE IF NOT EXISTS ns_indexer
WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': 1 };
USE ns_indexer;
3 changes: 3 additions & 0 deletions crates/ns-indexer/cql/keyspace_prod.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE KEYSPACE IF NOT EXISTS ns_indexer
WITH replication = { 'class': 'NetworkTopologyStrategy', 'replication_factor': '3' };
USE ns_indexer;
4 changes: 0 additions & 4 deletions crates/ns-indexer/cql/schema.cql
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
CREATE KEYSPACE IF NOT EXISTS ns_indexer
WITH replication = { 'class': 'NetworkTopologyStrategy', 'replication_factor': '3' };
USE ns_indexer;

CREATE TABLE IF NOT EXISTS name_state (
name TEXT, -- unique name
sequence BIGINT, -- name's latest sequence
Expand Down
6 changes: 4 additions & 2 deletions crates/ns-indexer/src/db/scylladb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct ScyllaDB {
impl ScyllaDB {
pub async fn new(cfg: &ScyllaDBOptions) -> anyhow::Result<Self> {
let handle = ExecutionProfile::builder()
.consistency(Consistency::One)
.consistency(Consistency::Quorum)
.serial_consistency(Some(SerialConsistency::Serial))
.request_timeout(Some(Duration::from_secs(5)))
.build()
Expand Down Expand Up @@ -189,8 +189,10 @@ mod tests {
dotenvy::from_filename("sample.env").expect(".env file not found");
let db = get_db().await;

let schema = std::include_str!("../../cql/keyspace.cql");
exec_cqls(db, schema).await.unwrap();

let schema = std::include_str!("../../cql/schema.cql");
println!("schema: {}", schema);
exec_cqls(db, schema).await.unwrap();
}
}
6 changes: 3 additions & 3 deletions crates/ns-inscriber/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
[package]
name = "ns-inscriber"
version = "0.2.0"
version = "0.1.0"
edition = "2021"
rust-version = "1.64"
description = "Name & Service Protocol inscriber service in Rust"
publish = false
repository = "https://github.com/ldclabs/ns-rs/crates/ns-inscriber"
publish = true
repository = "https://github.com/ldclabs/ns-rs/tree/main/crates/ns-inscriber"
license = "CC0-1.0"

[lib]
Expand Down
2 changes: 1 addition & 1 deletion crates/ns-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2021"
rust-version = "1.64"
description = "Name & Service Protocol in Rust"
publish = true
repository = "https://github.com/ldclabs/ns-rs/crates/ns-protocol"
repository = "https://github.com/ldclabs/ns-rs/tree/main/crates/ns-protocol"
license = "CC0-1.0"

[lib]
Expand Down

0 comments on commit 2a480e8

Please sign in to comment.