A week of hard coding
zensh committed Dec 17, 2023
1 parent dea015f commit 60166f8
Showing 47 changed files with 5,226 additions and 132 deletions.
16 changes: 16 additions & 0 deletions .env.sample
@@ -0,0 +1,16 @@
BITCOIN_RPC_URL=http://127.0.0.1:8332
BITCOIN_RPC_USER=test
BITCOIN_RPC_PASSWORD=123456

# ----- ns-indexer env -----
LOG_LEVEL=info
INDEXER_SERVER_WORKER_THREADS=0 # defaults to the number of cpus on the system
INDEXER_SERVER_ADDR=0.0.0.0:8080
INDEXER_START_HEIGHT=0
# more nodes split by comma
SCYLLA_NODES=127.0.0.1:9042
SCYLLA_USERNAME=""
SCYLLA_PASSWORD=""
SCYLLA_KEYSPACE=ns_indexer

# ----- ns-inscriber env -----
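
For context, a minimal sketch of how ns-indexer might load these variables at startup, assuming the dotenvy crate is added; IndexerConfig and its fields are illustrative names, not the crate's actual types:

// Hypothetical config loader for the variables in .env.sample above.
use std::env;

#[derive(Debug)]
struct IndexerConfig {
    bitcoin_rpc_url: String,
    bitcoin_rpc_user: String,
    bitcoin_rpc_password: String,
    server_addr: String,
    start_height: u64,
    scylla_nodes: Vec<String>,
}

fn load_config() -> anyhow::Result<IndexerConfig> {
    dotenvy::dotenv().ok(); // load .env if present, ignore when missing

    Ok(IndexerConfig {
        bitcoin_rpc_url: env::var("BITCOIN_RPC_URL")?,
        bitcoin_rpc_user: env::var("BITCOIN_RPC_USER")?,
        bitcoin_rpc_password: env::var("BITCOIN_RPC_PASSWORD")?,
        server_addr: env::var("INDEXER_SERVER_ADDR").unwrap_or_else(|_| "0.0.0.0:8080".to_string()),
        start_height: env::var("INDEXER_START_HEIGHT").unwrap_or_else(|_| "0".to_string()).parse()?,
        // SCYLLA_NODES may list several nodes separated by commas
        scylla_nodes: env::var("SCYLLA_NODES")?
            .split(',')
            .map(|s| s.trim().to_string())
            .collect(),
    })
}
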
12 changes: 10 additions & 2 deletions .github/workflows/ci.yml
@@ -7,11 +7,19 @@ on:
jobs:
test:
runs-on: ubuntu-latest
services:
scylladb1:
image: scylladb/scylla:5.2
ports:
- 9042:9042
options: --health-cmd "cqlsh --debug" --health-interval 5s --health-retries 10
volumes:
- ${{ github.workspace }}:/workspace
steps:
- uses: actions/checkout@v4
- name: Run clippy
run: cargo clippy --verbose --all-targets --all-features
- name: Run tests
run: cargo test --verbose --workspace -- --nocapture
- name: Run all tests
run: cargo test --verbose --workspace -- --nocapture --include-ignored
# - name: Run all tests
# run: cargo test --verbose --workspace -- --nocapture --include-ignored
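
The scylladb1 service gives integration tests a real database on 127.0.0.1:9042. A hedged sketch of the test shape this enables, assuming the scylla driver crate; the #[ignore] attribute keeps it out of the plain cargo test step, so it runs only when --include-ignored is passed:

// Hypothetical integration test against the CI ScyllaDB service.
#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn scylla_roundtrip() {
    use scylla::SessionBuilder;

    // Connect to the service container exposed on the runner's localhost.
    let session = SessionBuilder::new()
        .known_node("127.0.0.1:9042")
        .build()
        .await
        .expect("connect to local ScyllaDB");

    // Smoke-check the connection with a trivial query.
    session
        .query("SELECT release_version FROM system.local", &[])
        .await
        .expect("query system.local");
}
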
1 change: 1 addition & 0 deletions .gitignore
@@ -12,3 +12,4 @@ Cargo.lock

# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb
.env
1 change: 1 addition & 0 deletions .vscode/settings.json
@@ -0,0 +1 @@
{}
25 changes: 4 additions & 21 deletions Cargo.toml
@@ -11,13 +11,13 @@ license = "CC0-1.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[workspace]
members = ["crates/*"]
members = ["crates/internal/*", "crates/ns-*"]
# exclude = ["crates/internal"]

[workspace.dependencies]
anyhow = "1"
async-trait = "0.1"
axum = { version = "0.6", features = [
"headers",
axum = { version = "0.7", features = [
"http1",
"http2",
"json",
@@ -37,28 +37,11 @@ mime = "0.3"
serde = "1"
serde_json = { version = "1", features = ["preserve_order"] }
structured-logger = "1"
tokio = { version = "1", features = [
"fs",
"io-util",
"io-std",
"macros",
"net",
"parking_lot",
"process",
"rt",
"rt-multi-thread",
"signal",
"sync",
"time",
], default-features = true }
tokio = { version = "1", features = ["full"] }

[dev-dependencies]
faster-hex = "0.8"
hex-literal = "0.4"

[[bin]]
name = "ns-indexer"
path = "crates/ns-indexer/bin/main.rs"

[profile.release]
lto = true
28 changes: 28 additions & 0 deletions crates/internal/axum-web/Cargo.toml
@@ -0,0 +1,28 @@
[package]
name = "axum-web"
version = "0.1.0"
edition = "2021"
rust-version = "1.64"
description = ""
publish = false
repository = "https://github.com/ldclabs/ns-rs/crates/internal/axum-web"
license = "CC0-1.0"

[lib]

[dependencies]
anyhow = { workspace = true }
axum = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
base64 = { workspace = true }
ciborium = { workspace = true }
ciborium-io = { workspace = true }
libflate = { workspace = true }
log = { workspace = true }
mime = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
structured-logger = { workspace = true }
tokio = { workspace = true }
zstd = "0.12"
85 changes: 85 additions & 0 deletions crates/internal/axum-web/src/context.rs
@@ -0,0 +1,85 @@
use axum::{
body::Body,
http::{header, HeaderMap, Request},
middleware::Next,
response::Response,
};
use serde_json::Value;
use std::{collections::BTreeMap, sync::Arc, time::Instant};
use tokio::sync::RwLock;

pub use structured_logger::unix_ms;

pub struct ReqContext {
pub rid: String, // from x-request-id header
pub unix_ms: u64,
pub start: Instant,
pub kv: RwLock<BTreeMap<String, Value>>,
}

impl ReqContext {
pub fn new(rid: &str) -> Self {
Self {
rid: rid.to_string(),
unix_ms: unix_ms(),
start: Instant::now(),
kv: RwLock::new(BTreeMap::new()),
}
}

pub async fn set(&self, key: &str, value: Value) {
let mut kv = self.kv.write().await;
kv.insert(key.to_string(), value);
}

pub async fn set_kvs(&self, kvs: Vec<(&str, Value)>) {
let mut kv = self.kv.write().await;
for item in kvs {
kv.insert(item.0.to_string(), item.1);
}
}
}

pub async fn middleware(mut req: Request<Body>, next: Next) -> Response {
let method = req.method().to_string();
let uri = req.uri().to_string();
let rid = extract_header(req.headers(), "x-request-id", || "".to_string());

let ctx = Arc::new(ReqContext::new(&rid));
req.extensions_mut().insert(ctx.clone());

let res = next.run(req).await;
let kv = ctx.kv.read().await;
let status = res.status().as_u16();
let headers = res.headers();
let ct = headers
.get(header::CONTENT_TYPE)
.map_or("", |v| v.to_str().unwrap_or_default());
let ce = headers
.get(header::CONTENT_ENCODING)
.map_or("", |v| v.to_str().unwrap_or_default());
log::info!(target: "api",
method = method,
uri = uri,
rid = rid,
status = status,
start = ctx.unix_ms,
elapsed = ctx.start.elapsed().as_millis() as u64,
ctype = ct,
encoding = ce,
kv = log::as_serde!(*kv);
"",
);

res
}

pub fn extract_header(hm: &HeaderMap, key: &str, or: impl FnOnce() -> String) -> String {
match hm.get(key) {
None => or(),
Some(v) => match v.to_str() {
Ok(s) => s.to_string(),
Err(_) => or(),
},
}
}
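
A hedged sketch of how this middleware and ReqContext might be wired into an axum 0.7 router by a consuming service; the route, handler, and the axum_web import path are illustrative:

use std::sync::Arc;

use axum::{middleware, routing::get, Extension, Router};
use axum_web::context::{self, ReqContext};

// Handlers can read the per-request context the middleware put into the
// request extensions and attach extra fields for the access log.
async fn health(Extension(ctx): Extension<Arc<ReqContext>>) -> &'static str {
    ctx.set("handler", serde_json::json!("health")).await;
    "ok"
}

fn app() -> Router {
    Router::new()
        .route("/healthz", get(health))
        // Runs around every request: creates the context, then logs method,
        // uri, rid, status, elapsed time and the collected kv pairs.
        .layer(middleware::from_fn(context::middleware))
}
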
124 changes: 124 additions & 0 deletions crates/internal/axum-web/src/encoding.rs
@@ -0,0 +1,124 @@
use axum::http::header;
use libflate::gzip::{Decoder, Encoder};
use std::{io, string::ToString};

// recommended minimum size for compression.
pub const MIN_ENCODING_SIZE: u16 = 128;

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Encoding {
Zstd,
Gzip,
Identity,
}

impl ToString for Encoding {
fn to_string(&self) -> String {
match self {
Self::Zstd => "zstd".to_string(),
Self::Gzip => "gzip".to_string(),
Self::Identity => "identity".to_string(),
}
}
}

impl Encoding {
pub fn identity(&self) -> bool {
matches!(self, Self::Identity)
}

pub fn from_header_value(val: Option<&header::HeaderValue>) -> Self {
if let Some(val) = val {
if let Ok(val) = val.to_str() {
if val.contains("zstd") {
return Self::Zstd;
} else if val.contains("gzip") {
return Self::Gzip;
}
}
}
Self::Identity
}

pub fn header_value(&self) -> header::HeaderValue {
match self {
Self::Zstd => header::HeaderValue::from_static("zstd"),
Self::Gzip => header::HeaderValue::from_static("gzip"),
Self::Identity => header::HeaderValue::from_static("identity"),
}
}

pub fn encode_all<R: io::Read>(&self, r: R) -> Result<Vec<u8>, io::Error> {
match self {
Self::Zstd => {
let buf = zstd::stream::encode_all(r, 9)?;
Ok(buf)
}
Self::Gzip => {
let mut encoder = Encoder::new(Vec::new())?;
let mut r = r;
let _ = io::copy(&mut r, &mut encoder);
encoder.finish().into_result()
}
Self::Identity => Err(io::Error::new(
io::ErrorKind::Unsupported,
"identity encoding not supported",
)),
}
}

pub fn decode_all<R: io::Read>(&self, r: R) -> Result<Vec<u8>, io::Error> {
use io::Read;
match self {
Self::Zstd => {
let buf = zstd::stream::decode_all(r)?;
Ok(buf)
}
Self::Gzip => {
let mut decoder = Decoder::new(r)?;
let mut buf = Vec::new();
decoder.read_to_end(&mut buf)?;
Ok(buf)
}
Self::Identity => Err(io::Error::new(
io::ErrorKind::Unsupported,
"identity decoding not supported",
)),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn gzip_encode_decode() {
let enc = Encoding::from_header_value(Some(&Encoding::Gzip.header_value()));
assert_eq!(enc, Encoding::Gzip);

let data = r#"[{"id":"------","texts":[]},{"id":"Esp9G6","texts":["Stream:"]},{"id": "------","texts":[]},{"id":"ykuRdu","texts":["Internet Engineering Task Force (IETF)"]}]"#;

let encoded = enc.encode_all(data.as_bytes()).unwrap();
println!("{}, {}", data.len(), encoded.len());
assert!(encoded.len() < data.len());

let decoded = enc.decode_all(encoded.as_slice()).unwrap();
assert_eq!(data.as_bytes(), decoded.as_slice());
}

#[test]
fn zstd_encode_decode() {
let enc = Encoding::from_header_value(Some(&Encoding::Zstd.header_value()));
assert_eq!(enc, Encoding::Zstd);

let data = r#"[{"id":"------","texts":[]},{"id":"Esp9G6","texts":["Stream:"]},{"id": "------","texts":[]},{"id":"ykuRdu","texts":["Internet Engineering Task Force (IETF)"]}]"#;

let encoded = enc.encode_all(data.as_bytes()).unwrap();
println!("{}, {}", data.len(), encoded.len());
assert!(encoded.len() < data.len());

let decoded = enc.decode_all(encoded.as_slice()).unwrap();
assert_eq!(data.as_bytes(), decoded.as_slice());
}
}
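
A hedged sketch of how a handler might use this type for response compression; maybe_compress and the axum_web::encoding import path are illustrative, not the crate's actual API:

use axum::http::{header, HeaderMap};
use axum_web::encoding::{Encoding, MIN_ENCODING_SIZE};

// Pick an encoding from the client's Accept-Encoding header and compress the
// body when it is large enough; otherwise return it unchanged.
fn maybe_compress(req_headers: &HeaderMap, body: Vec<u8>) -> (Encoding, Vec<u8>) {
    let enc = Encoding::from_header_value(req_headers.get(header::ACCEPT_ENCODING));
    if enc.identity() || body.len() < MIN_ENCODING_SIZE as usize {
        return (Encoding::Identity, body);
    }
    match enc.encode_all(body.as_slice()) {
        Ok(compressed) => (enc, compressed),
        // On failure, fall back to the uncompressed body.
        Err(_) => (Encoding::Identity, body),
    }
}

The caller would then set the Content-Encoding response header from enc.header_value() whenever a compressed body is returned.
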