Skip to content

Commit

Permalink
feat: implement Indexer API
Browse files Browse the repository at this point in the history
  • Loading branch information
zensh committed Dec 18, 2023
1 parent 60166f8 commit 044a19d
Show file tree
Hide file tree
Showing 19 changed files with 798 additions and 114 deletions.
2 changes: 2 additions & 0 deletions crates/internal/axum-web/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ serde = { workspace = true }
serde_json = { workspace = true }
structured-logger = { workspace = true }
tokio = { workspace = true }
scylla = "0.10"
zstd = "0.12"
validator = { version = "0.16", features = ["derive"] }
33 changes: 30 additions & 3 deletions crates/internal/axum-web/src/erring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use axum::{
response::{IntoResponse, Response},
Json,
};
use scylla::transport::query_result::SingleRowError;
use serde::{Deserialize, Serialize};
use std::{error::Error, fmt, fmt::Debug};

use crate::object::PackObject;
use validator::{ValidationError, ValidationErrors};

/// ErrorResponse is the response body for error.
#[derive(Deserialize, Serialize)]
Expand All @@ -20,7 +20,7 @@ pub struct SuccessResponse<T> {
#[serde(skip_serializing_if = "Option::is_none")]
pub total_size: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub next_page_token: Option<PackObject<Vec<u8>>>,
pub next_page_token: Option<String>,
pub result: T,
}

Expand Down Expand Up @@ -76,3 +76,30 @@ impl IntoResponse for HTTPError {
(status, body).into_response()
}
}

impl From<anyhow::Error> for HTTPError {
fn from(err: anyhow::Error) -> Self {
match err.downcast::<Self>() {
Ok(err) => err,
Err(sel) => match sel.downcast::<ValidationErrors>() {
Ok(sel) => HTTPError::new(400, format!("{:?}", sel)),
Err(sel) => match sel.downcast::<SingleRowError>() {
Ok(_) => HTTPError::new(404, "data not found".to_string()),
Err(sel) => HTTPError::new(500, format!("{:?}", sel)),
},
},
}
}
}

impl From<ValidationError> for HTTPError {
fn from(err: ValidationError) -> Self {
HTTPError::new(400, format!("{:?}", err))
}
}

impl From<ValidationErrors> for HTTPError {
fn from(err: ValidationErrors) -> Self {
HTTPError::new(400, format!("{:?}", err))
}
}
3 changes: 2 additions & 1 deletion crates/ns-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ reqwest = { version = "0.11", features = [
], default-features = false }
bitcoin = { version = "0.31", features = ["serde", "base64", "rand"] }
dotenvy = "0.15"
faster-hex = "0.8"
faster-hex = "0.9"
bitcoincore-rpc-json = "0.18.0"
scylla = "0.10"
tower = "0.4"
Expand All @@ -52,6 +52,7 @@ tower-http = { version = "0.5", features = [
"decompression-zstd",
"propagate-header",
] }
validator = { version = "0.16", features = ["derive"] }

[dev-dependencies]
hex-literal = "0.4"
196 changes: 196 additions & 0 deletions crates/ns-indexer/src/api/inscription.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
use axum::{
extract::{Query, State},
Extension,
};
use std::sync::Arc;
use validator::Validate;

use axum_web::{
context::ReqContext,
erring::{HTTPError, SuccessResponse},
object::PackObject,
};
use ns_protocol::index::{Inscription, InvalidInscription};

use crate::api::{IndexerAPI, QueryHeight, QueryName, QueryNamePagination};
use crate::db;

pub struct InscriptionAPI;

impl InscriptionAPI {
pub async fn get_last_accepted(
Extension(ctx): Extension<Arc<ReqContext>>,
to: PackObject<()>,
State(api): State<Arc<IndexerAPI>>,
) -> Result<PackObject<SuccessResponse<Option<Inscription>>>, HTTPError> {
ctx.set("action", "get_last_accepted_inscription".into())
.await;

let last_accepted_state = api.state.last_accepted.read().await;

Ok(to.with(SuccessResponse::new(last_accepted_state.clone())))
}

pub async fn get_best(
Extension(ctx): Extension<Arc<ReqContext>>,
to: PackObject<()>,
State(api): State<Arc<IndexerAPI>>,
) -> Result<PackObject<SuccessResponse<Option<Inscription>>>, HTTPError> {
ctx.set("action", "get_best_inscription".into()).await;

let best_inscriptions_state = api.state.best_inscriptions.read().await;
let mut inscription = best_inscriptions_state.last().cloned();
if inscription.is_none() {
let last_accepted_state = api.state.last_accepted.read().await;
inscription = last_accepted_state.clone();
}

Ok(to.with(SuccessResponse::new(inscription)))
}

pub async fn get(
State(app): State<Arc<IndexerAPI>>,
Extension(ctx): Extension<Arc<ReqContext>>,
to: PackObject<()>,
input: Query<QueryName>,
) -> Result<PackObject<SuccessResponse<Inscription>>, HTTPError> {
input.validate()?;
if input.sequence.is_none() {
return Err(HTTPError::new(400, "sequence is required".to_string()));
}

let name = input.name.clone();
let sequence = input.sequence.unwrap();
ctx.set_kvs(vec![
("action", "get_inscription".into()),
("name", name.clone().into()),
("sequence", sequence.into()),
])
.await;

let mut inscription = db::Inscription::with_pk(name, sequence);
inscription.get_one(&app.scylla, vec![]).await?;

Ok(to.with(SuccessResponse::new(inscription.to_index()?)))
}

pub async fn get_by_height(
State(app): State<Arc<IndexerAPI>>,
Extension(ctx): Extension<Arc<ReqContext>>,
to: PackObject<()>,
input: Query<QueryHeight>,
) -> Result<PackObject<SuccessResponse<Inscription>>, HTTPError> {
input.validate()?;

let height = input.height;
ctx.set_kvs(vec![
("action", "get_inscription_by_height".into()),
("height", height.into()),
])
.await;

let inscription = db::Inscription::get_by_height(&app.scylla, height, vec![]).await?;

Ok(to.with(SuccessResponse::new(inscription.to_index()?)))
}

pub async fn list_best(
Extension(ctx): Extension<Arc<ReqContext>>,
to: PackObject<()>,
State(api): State<Arc<IndexerAPI>>,
) -> Result<PackObject<SuccessResponse<Vec<Inscription>>>, HTTPError> {
ctx.set("action", "list_best_inscriptions".into()).await;
let best_inscriptions_state = api.state.best_inscriptions.read().await;
Ok(to.with(SuccessResponse::new(best_inscriptions_state.clone())))
}

pub async fn list_by_block_height(
State(app): State<Arc<IndexerAPI>>,
Extension(ctx): Extension<Arc<ReqContext>>,
to: PackObject<()>,
input: Query<QueryHeight>,
) -> Result<PackObject<SuccessResponse<Vec<Inscription>>>, HTTPError> {
input.validate()?;

let height = input.height;
ctx.set_kvs(vec![
("action", "list_inscriptions_block_height".into()),
("height", height.into()),
])
.await;

let res = db::Inscription::list_by_block_height(&app.scylla, height, vec![]).await?;
let mut inscriptions: Vec<Inscription> = Vec::with_capacity(res.len());
for i in res {
inscriptions.push(i.to_index()?);
}
Ok(to.with(SuccessResponse::new(inscriptions)))
}

pub async fn list_by_name(
State(app): State<Arc<IndexerAPI>>,
Extension(ctx): Extension<Arc<ReqContext>>,
to: PackObject<()>,
input: Query<QueryNamePagination>,
) -> Result<PackObject<SuccessResponse<Vec<Inscription>>>, HTTPError> {
input.validate()?;

let name = input.name.clone();
ctx.set_kvs(vec![
("action", "list_inscriptions_by_name".into()),
("name", name.clone().into()),
])
.await;

let res = db::Inscription::list_by_name(
&app.scylla,
&name,
vec![],
input.page_size.unwrap_or(10),
input.page_token,
)
.await?;
let mut inscriptions: Vec<Inscription> = Vec::with_capacity(res.len());
for i in res {
inscriptions.push(i.to_index()?);
}
let next_sequence = if let Some(last) = inscriptions.last() {
last.sequence
} else {
0
};
Ok(to.with(SuccessResponse {
total_size: None,
next_page_token: if next_sequence > 0 {
Some(next_sequence.to_string())
} else {
None
},
result: inscriptions,
}))
}

pub async fn list_invalid_by_name(
State(app): State<Arc<IndexerAPI>>,
Extension(ctx): Extension<Arc<ReqContext>>,
to: PackObject<()>,
input: Query<QueryName>,
) -> Result<PackObject<SuccessResponse<Vec<InvalidInscription>>>, HTTPError> {
input.validate()?;

let name = input.name.clone();
ctx.set_kvs(vec![
("action", "list_invalid_inscriptions_by_name".into()),
("name", name.clone().into()),
])
.await;

let res = db::InvalidInscription::list_by_name(&app.scylla, &name).await?;
let mut inscriptions: Vec<InvalidInscription> = Vec::with_capacity(res.len());
for i in res {
inscriptions.push(i.to_index()?);
}

Ok(to.with(SuccessResponse::new(inscriptions)))
}
}
108 changes: 108 additions & 0 deletions crates/ns-indexer/src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use axum::extract::State;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use validator::{Validate, ValidationError};

use axum_web::erring::{HTTPError, SuccessResponse};
use axum_web::object::PackObject;
use ns_protocol::ns;

use crate::db::scylladb::ScyllaDB;
use crate::indexer::{Indexer, IndexerState};

mod inscription;
mod name;
mod service;

pub use inscription::InscriptionAPI;
pub use name::NameAPI;
pub use service::ServiceAPI;

#[derive(Serialize, Deserialize)]
pub struct AppVersion {
pub name: String,
pub version: String,
}

#[derive(Serialize, Deserialize)]
pub struct AppHealth {
pub block_height: u64,
pub inscription_height: u64,
}

pub struct IndexerAPI {
pub(crate) scylla: Arc<ScyllaDB>,
pub(crate) state: Arc<IndexerState>,
}

impl IndexerAPI {
pub fn new(indexer: Arc<Indexer>) -> Self {
Self {
scylla: indexer.scylla.clone(),
state: indexer.state.clone(),
}
}
}

pub async fn version(
to: PackObject<()>,
State(_): State<Arc<IndexerAPI>>,
) -> PackObject<AppVersion> {
to.with(AppVersion {
name: crate::APP_NAME.to_string(),
version: crate::APP_VERSION.to_string(),
})
}

pub async fn healthz(
to: PackObject<()>,
State(api): State<Arc<IndexerAPI>>,
) -> Result<PackObject<SuccessResponse<AppHealth>>, HTTPError> {
let last_accepted_state = api.state.last_accepted.read().await;
let (block_height, height) = match *last_accepted_state {
Some(ref last_accepted) => (last_accepted.block_height, last_accepted.height),
None => (0, 0),
};
Ok(to.with(SuccessResponse::new(AppHealth {
block_height,
inscription_height: height,
})))
}

#[derive(Debug, Deserialize, Validate)]
pub struct QueryName {
#[validate(custom = "validate_name")]
pub name: String,
#[validate(range(min = 0))]
pub sequence: Option<i64>,
#[validate(range(min = 0))]
pub code: Option<i64>,
}

#[derive(Debug, Deserialize, Validate)]
pub struct QueryHeight {
#[validate(range(min = 0))]
pub height: i64,
}

#[derive(Debug, Deserialize, Validate)]
pub struct QueryNamePagination {
#[validate(custom = "validate_name")]
pub name: String,
pub page_token: Option<i64>,
#[validate(range(min = 2, max = 1000))]
pub page_size: Option<u16>,
}

#[derive(Debug, Deserialize, Validate)]
pub struct QueryPubkey {
pub pubkey: String,
}

fn validate_name(name: &str) -> Result<(), ValidationError> {
if !ns::valid_name(name) {
return Err(ValidationError::new("invalid name"));
}

Ok(())
}
Loading

0 comments on commit 044a19d

Please sign in to comment.