Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: configured multiplexers #191

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ members = [".", "app", "entity", "migration", "mailer"]

[dependencies]
app = {path = "app"}
axum = "0.6.20"
entity = {path = "entity"}
futures = "0.3.28"
migration = {path = "migration"}# depends on your needs
prost = {workspace = true}
sea-orm = {version = "0.12.2", features = ["macros", "debug-print", "with-uuid", "sqlx-postgres", "runtime-tokio-rustls"]}
tokio = {workspace = true, features = ["full"]}
tonic = {workspace = true}
tonic.workspace = true

[build-dependencies]
tonic-build = "0.9.2"
Expand All @@ -39,7 +41,12 @@ exclude = ["docs"]
license-file = "./LICENSE"
name = "uranium"
version = "0.1.3"


[workspace.dependencies]
prost = "0.11.9"
tokio = {version = "1.20.1", features = ["full"]}
tonic = "0.9.2"
tonic-reflection = "0.10.0"
tower = { version = "0.4.13", features = ["full"] }
hyper = "0.14.27"
7 changes: 5 additions & 2 deletions app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,23 @@ error = "0.1.9"
fake = "2.6.1"
futures = "0.3.28"
http-body = "0.4.5"
hyper = {workspace = true}
jsonwebtoken = "8.3.0"
log = "0.4.19"
migration = {path = "../migration"}
prost = {workspace = true}
sea-orm = {version = "0.12.2", features = ["macros", "debug-print", "with-uuid", "sqlx-postgres", "runtime-tokio-rustls"]}
serde = "1.0.181"
serde_json = "1.0.104"
simple_logger = "4.2.0"
this = "0.3.0"
thiserror = "1.0.44"
tokio = {version = "1.29.1", features = ["full"]}
tonic = {workspace = true}
tonic-reflection = {workspace = true}
tower = {workspace = true}
tower-http = {version = "0.4.3", features = ["cors", "trace"]}
tracing = "0.1.37"
tracing-subscriber = {version = "0.3.17", features = ["env-filter"]}
uuid = "1.4.1"
validator = {version = "0.16.1", features = ["derive"]}
prost = {workspace = true}
tonic = {workspace = true}
1 change: 1 addition & 0 deletions app/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod app_state;
pub mod database;
pub mod multiplexer;
134 changes: 134 additions & 0 deletions app/src/config/multiplexer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/// the multiplexers is adapated from axum example
/// <https://github.com/tokio-rs/axum/blob/axum-v0.6.20/examples/rest-grpc-multiplex/src/main.rs>
///
/// The multiplexer shall allow request handlers to be consumed by http server and clients
/// FOr example
///
/// REST -> curl http://0.0.0.0:4564
/// gPRC -. grpcurl -plaintext -import-path ./proto -proto uranium.proto '[::1]:50051' uranium.Uranium/HealthCheck
///
/// would both return "Service health!"
///
///
use axum::{body::BoxBody, http::header::CONTENT_TYPE, response::IntoResponse};
use futures::{future::BoxFuture, ready};
use hyper::{Body, Request, Response};
use std::{
convert::Infallible,
task::{Context, Poll},
};
use tower::Service;

#[derive(Debug, Default)]
pub struct Multiplexers<A, B> {
rest: A,
grpc: B,
rest_ready: bool,
grpc_ready: bool,
}

impl<A, B> Multiplexers<A, B> {
pub fn new(rest: A, grpc: B) -> Self {
Self {
rest,
grpc,
rest_ready: false,
grpc_ready: false,
}
}

pub fn rest_ready(&mut self) {
self.rest_ready = true;
}

pub fn grpc_ready(&mut self) {
self.grpc_ready = true;
}
}

/// implement Clone trait for Multiplexers
impl<A: Clone + std::default::Default, B: Clone + std::default::Default> Clone
for Multiplexers<A, B>
{
fn clone(&self) -> Self {
Self {
rest: self.rest.clone(),
grpc: self.grpc.clone(),
/* result the grpc_ready and rest_ready to false // because the cloned services probably wont be ready */
..Default::default() // rest_ready: false,
// grpc_ready: false,
}
}
}

impl<A, B> Service<Request<Body>> for Multiplexers<A, B>
where
A: Service<Request<Body>, Error = Infallible>,
A::Response: IntoResponse,
A::Future: Send + 'static,
B: Service<Request<Body>>,
B::Response: IntoResponse,
B::Future: Send + 'static,
{
type Response = Response<BoxBody>;
type Error = B::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// drive readiness for each inner service and record which is ready
loop {
match (self.rest_ready, self.grpc_ready) {
(true, true) => {
return Ok(()).into();
}
(false, _) => {
ready!(self.rest.poll_ready(cx)).map_err(|err| match err {})?;
self.rest_ready = true;
}
(_, false) => {
ready!(self.grpc.poll_ready(cx))?;
self.grpc_ready = true;
}
}
}
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
// require users to call `poll_ready` first, if they don't we're allowed to panic
// as per the `tower::Service` contract
assert!(
self.grpc_ready,
"grpc service not ready. Did you forget to call `poll_ready`?"
);
assert!(
self.rest_ready,
"rest service not ready. Did you forget to call `poll_ready`?"
);

// if we get a grpc request call the grpc service, otherwise call the rest service
// when calling a service it becomes not-ready so we have drive readiness again
if is_grpc_request(&req) {
self.grpc_ready = false;
let future = self.grpc.call(req);
Box::pin(async move {
let res = future.await?;
Ok(res.into_response())
})
} else {
self.rest_ready = false;
let future = self.rest.call(req);
Box::pin(async move {
let res = future.await.map_err(|err| match err {})?;
Ok(res.into_response())
})
}
}
}

fn is_grpc_request<B>(req: &Request<B>) -> bool {
req.headers()
.get(CONTENT_TYPE)
.map(|content_type| content_type.as_bytes())
.filter(|content_type| content_type.starts_with(b"application/grpc"))
.is_some()
}
52 changes: 37 additions & 15 deletions app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@ use axum::{
RequestPartsExt, Router,
};

use crate::common::uranium::uranium_server::{Uranium, UraniumServer};
use crate::common::uranium::{HealthCheckRequest, HealthCheckResponse};
use tonic::{transport::Server, Request, Response as GrpcResponse, Status};
use crate::{
common::uranium::uranium_server::{Uranium, UraniumServer},
config::multiplexer::Multiplexers,
};
use crate::{
common::uranium::{HealthCheckRequest, HealthCheckResponse},
config::app_state::AppState,
};
use tonic::{transport::Server, Response as GrpcResponse};

// use migration::{sea_orm::DatabaseConnection, Migrator, MigratorTrait};
use sea_orm::{ConnectOptions, Database};
Expand All @@ -21,21 +27,23 @@ use tower_http::{

use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

use crate::config::app_state::AppState;

mod common;
mod config;
mod extractors;
mod handlers;
mod router;
mod utils;

mod proto {
pub(crate) const FILE_DESCRIPTOR_SET: &[u8] = include_bytes!("./common/uranium.rs");
}

/// the grpc server
#[derive(Debug, Default)]
pub struct UraniumCore;
pub struct GrpcService;

#[tonic::async_trait]
impl Uranium for UraniumCore {
impl Uranium for GrpcService {
#[doc = " Returns the current health of the Uranium service."]
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
Expand All @@ -54,7 +62,7 @@ impl Uranium for UraniumCore {
/// the grpc server
pub async fn grpc_server() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse()?;
let server = UraniumCore::default();
let server = GrpcService::default();

Server::builder()
.add_service(UraniumServer::new(server))
Expand All @@ -74,12 +82,13 @@ pub async fn run() {
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
// axum logs rejections from built-in extractors with the `axum::rejection`
// target, at `TRACE` level. `axum::rejection=trace` enables showing those events
"example_tracing_aka_logging=debug,tower_http=debug,axum::rejection=trace".into()
"uranium_trace=debug,tower_http=debug,axum::rejection=trace".into()
}),
)
.with(tracing_subscriber::fmt::layer())
.init();

// configure the service
let database_connection_string =
env::var("DATABASE_URL").expect("database URL is not provided in env variable");

Expand Down Expand Up @@ -109,7 +118,7 @@ pub async fn run() {
database: connection,
};
// build our application with some routes
let app = Router::new()
let rest = Router::new()
.route("/", get(health_check))
.nest("/:version/", router::routes(&state))
.layer(trace)
Expand All @@ -118,10 +127,22 @@ pub async fn run() {

// run the migration
// Migrator::up(&connection, None).await.unwrap();
/* let port: u32 = std::env::var("PORT")
.unwrap_or(53467.to_string())
.parse::<u32>()
.ok(); */

// configure the grpc service

// build the grpc service
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
.build()
.unwrap();

let grpc = tonic::transport::Server::builder()
.add_service(reflection_service)
.add_service(UraniumServer::new(GrpcService::default()))
.into_service();

// combine them into one service
let service = Multiplexers::new(rest, grpc);

let port = std::env::var("PORT")
.ok()
Expand All @@ -131,7 +152,8 @@ pub async fn run() {
let addr = SocketAddr::from(([0, 0, 0, 0], port));
println!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.serve(tower::make::Shared::new(service))
// .serve(app.into_make_service())
.await
.unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.protoc_arg("--experimental_allow_proto3_optional") // for older systems
// .build_client(true) // don't compile the client code
.build_server(true)
.file_descriptor_set_path(out_dir.join("store_descriptor.bin"))
.file_descriptor_set_path(out_dir.join("uranium_descriptor.bin"))
.out_dir("./app/src/common")
.compile(&[proto_file], &["proto"])?;

Expand Down
Loading
Loading