Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtimes/core: stream service to service #1426

Merged
merged 12 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions runtimes/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ tokio-postgres = { version = "0.7.10", features = [
"with-chrono-0_4",
] }
tokio-util = "0.7.10"
tokio-tungstenite = "0.21.0"
futures-util = "0.3.30"
rand = "0.8.5"
env_logger = "0.10.1"
Expand Down
115 changes: 115 additions & 0 deletions runtimes/core/src/api/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::time::SystemTime;

use anyhow::Context;
use serde::de::DeserializeOwned;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use url::Url;

use encore::runtime::v1 as pb;
Expand All @@ -20,6 +21,8 @@ use crate::trace::Tracer;
use crate::{api, encore, model, secrets, EncoreName, Hosted};

use super::reqauth::meta::MetaMapMut;
use super::websocket_client::WebSocketClient;
use super::HandshakeSchema;

/// Tracks where services are located and how to call them.
pub struct ServiceRegistry {
Expand Down Expand Up @@ -136,6 +139,30 @@ impl ServiceRegistry {
result
}

pub async fn connect_stream(
&self,
endpoint_name: &EndpointName,
data: JSONPayload,
source: Option<&model::Request>,
) -> APIResult<WebSocketClient> {
let call = model::APICall {
source,
target: endpoint_name,
};
let start_event_id = self.tracer.rpc_call_start(&call);

let result = self
.do_connect_stream(endpoint_name, data, source, start_event_id)
.await;

if let Some(start_event_id) = start_event_id {
self.tracer
.rpc_call_end(&call, start_event_id, result.as_ref().err());
}

result
}

async fn do_api_call(
&self,
endpoint_name: &EndpointName,
Expand Down Expand Up @@ -226,6 +253,94 @@ impl ServiceRegistry {
}
}

async fn do_connect_stream(
&self,
endpoint_name: &EndpointName,
mut data: JSONPayload,
source: Option<&model::Request>,
start_event_id: Option<TraceEventId>,
) -> APIResult<WebSocketClient> {
let base_url = self
.base_urls
.get(endpoint_name.service())
.ok_or_else(|| api::Error {
code: api::ErrCode::NotFound,
message: "service not found".into(),
internal_message: Some(format!(
"no service discovery configuration found for service {}",
endpoint_name.service()
)),
stack: None,
details: None,
})?;

let Some(endpoint) = self.endpoints.get(endpoint_name) else {
return Err(api::Error {
code: api::ErrCode::NotFound,
message: "endpoint not found".into(),
internal_message: Some(format!(
"endpoint {} not found in application metadata",
endpoint_name
)),
stack: None,
details: None,
});
};

let Some(handshake) = &endpoint.handshake else {
return Err(api::Error {
code: api::ErrCode::NotFound,
message: "no handshake schema found".into(),
internal_message: Some(format!(
"endpoint {} doesn't have a handshake schema specified",
endpoint_name
)),
stack: None,
details: None,
});
};

let req_path = handshake.path().to_request_path(&mut data)?;

let base_url = base_url
.replace("http://", "ws://")
.replace("https://", "wss://");

let req_url = Url::parse(&format!("{}{}", base_url, req_path)).map_err(|_| api::Error {
code: api::ErrCode::Internal,
message: "failed to build endpoint url".into(),
internal_message: Some(format!(
"failed to build endpoint url for endpoint {}",
endpoint_name
)),
stack: None,
details: None,
})?;

let mut req = req_url
.into_client_request()
.map_err(|e| api::Error::invalid_argument("unable to create request", e))?;

if let HandshakeSchema::Request(req_schema) = handshake.as_ref() {
if let Some(qry) = &req_schema.query {
qry.to_outgoing_request(&mut data, &mut req)?;
}

if let Some(hdr) = &req_schema.header {
hdr.to_outgoing_request(&mut data, &mut req)?;
}
}

self.propagate_call_meta(req.headers_mut(), endpoint, source, start_event_id)
.map_err(api::Error::internal)?;

let incoming = endpoint.response.clone();
let outgoing = endpoint.request[0].clone();
let schema = schema::Stream::new(incoming, outgoing);

WebSocketClient::connect(req, schema).await
}

fn propagate_call_meta(
&self,
headers: &mut reqwest::header::HeaderMap,
Expand Down
105 changes: 69 additions & 36 deletions runtimes/core/src/api/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use serde::Serialize;

use crate::api::reqauth::{platform, svcauth, CallMeta};
use crate::api::schema::encoding::{
handshake_encoding, request_encoding, response_encoding, ReqSchemaUnderConstruction,
SchemaUnderConstruction,
handshake_encoding, request_encoding, response_encoding, HandshakeSchemaUnderConstruction,
ReqSchemaUnderConstruction, SchemaUnderConstruction,
};
use crate::api::schema::{JSONPayload, Method};
use crate::api::{jsonschema, schema, ErrCode, Error};
Expand Down Expand Up @@ -121,12 +121,30 @@ pub enum ResponseData {
Raw(axum::http::Response<axum::body::Body>),
}

/// Schema variations for stream handshake
#[derive(Debug)]
pub enum HandshakeSchema {
// Handshake with only a path, no parseable data
Path(schema::Path),
// Handshake with a request schema
Request(schema::Request),
}

impl HandshakeSchema {
pub fn path(&self) -> &schema::Path {
match self {
HandshakeSchema::Path(path) => path,
HandshakeSchema::Request(schema::Request { path, .. }) => path,
}
}
}

/// Represents a single API Endpoint.
#[derive(Debug)]
pub struct Endpoint {
pub name: EndpointName,
pub path: meta::Path,
pub handshake: Option<Arc<schema::Request>>,
pub handshake: Option<Arc<HandshakeSchema>>,
pub request: Vec<Arc<schema::Request>>,
pub response: Arc<schema::Response>,

Expand Down Expand Up @@ -198,7 +216,7 @@ pub fn endpoints_from_meta(
struct EndpointUnderConstruction<'a> {
svc: &'a meta::Service,
ep: &'a meta::Rpc,
handshake_schema: Option<ReqSchemaUnderConstruction>,
handshake_schema: Option<HandshakeSchemaUnderConstruction>,
request_schemas: Vec<ReqSchemaUnderConstruction>,
response_schema: SchemaUnderConstruction,
}
Expand Down Expand Up @@ -235,14 +253,49 @@ pub fn endpoints_from_meta(
for ep in endpoints {
let mut request_schemas = Vec::with_capacity(ep.request_schemas.len());
let raw = rpc::Protocol::try_from(ep.ep.proto).is_ok_and(|p| p == rpc::Protocol::Raw);

let handshake_schema = ep
.handshake_schema
.map(|schema| schema.build(&registry))
.transpose()?;

let handshake = handshake_schema
.map(|handshake_schema| -> anyhow::Result<Arc<HandshakeSchema>> {
let path = handshake_schema
.schema
.path
.context("endpoint must have a path defined")?;

let handshake_schema = if handshake_schema.parse_data {
let handshake_schema = schema::Request {
methods: vec![],
path,
header: handshake_schema.schema.header,
query: handshake_schema.schema.query,
body: schema::RequestBody::Typed(None),
stream: false,
};

HandshakeSchema::Request(handshake_schema)
} else {
HandshakeSchema::Path(path)
};

Ok(Arc::new(handshake_schema))
})
.transpose()?;

for req_schema in ep.request_schemas {
let req_schema = req_schema.build(&registry)?;
let path = req_schema
.schema
.path
.or_else(|| handshake.as_ref().map(|hs| hs.path().clone()))
.context("endpoint must have path defined")?;

request_schemas.push(Arc::new(schema::Request {
methods: req_schema.methods,
path: req_schema
.schema
.path
.context("endpoint must have path defined")?,
path,
header: req_schema.schema.header,
query: req_schema.schema.query,
body: if raw {
Expand All @@ -254,28 +307,6 @@ pub fn endpoints_from_meta(
}));
}
let resp_schema = ep.response_schema.build(&registry)?;
let handshake_schema = ep
.handshake_schema
.map(|schema| schema.build(&registry))
.transpose()?;

let handshake = handshake_schema
.map(|handshake_schema| -> anyhow::Result<Arc<schema::Request>> {
let handshake_schema = schema::Request {
methods: handshake_schema.methods,
path: handshake_schema
.schema
.path
.context("endpoint must have a path defined")?,
header: handshake_schema.schema.header,
query: handshake_schema.schema.query,
body: schema::RequestBody::Typed(None),
stream: false,
};

Ok(Arc::new(handshake_schema))
})
.transpose()?;

// We only support a single gateway right now.
let exposed = ep.ep.expose.contains_key("api-gateway");
Expand Down Expand Up @@ -382,13 +413,15 @@ impl EndpointHandler {

let meta = CallMeta::parse_with_caller(&self.shared.inbound_svc_auth, &parts.headers)?;

let parsed_payload = if stream_direction.is_none() {
req_schema.extract(&mut parts, body).await?
} else if let Some(handshake_schema) = &self.endpoint.handshake {
// handshake does not have a body
handshake_schema.extract_parts(&mut parts).await?
let parsed_payload = if let Some(handshake_schema) = &self.endpoint.handshake {
match handshake_schema.as_ref() {
HandshakeSchema::Request(req_schema) => {
req_schema.extract(&mut parts, body).await?
}
HandshakeSchema::Path(_) => None,
}
} else {
None
req_schema.extract(&mut parts, body).await?
};

// Extract caller information.
Expand Down
11 changes: 11 additions & 0 deletions runtimes/core/src/api/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::trace::Tracer;
use crate::{api, model, pubsub, secrets, EncoreName, EndpointName, Hosted};

use super::encore_routes::healthz;
use super::websocket_client::WebSocketClient;

pub struct ManagerConfig<'a> {
pub meta: &'a meta::Data,
Expand Down Expand Up @@ -317,6 +318,16 @@ impl Manager {
self.service_registry.api_call(endpoint_name, data, source)
}

pub fn stream<'a>(
&'a self,
endpoint_name: &'a EndpointName,
data: JSONPayload,
source: Option<&'a model::Request>,
) -> impl Future<Output = APIResult<WebSocketClient>> + 'a {
self.service_registry
.connect_stream(endpoint_name, data, source)
}

/// Starts serving the API.
pub fn start_serving(&self) -> tokio::task::JoinHandle<anyhow::Result<()>> {
let api = self.api_server.as_ref().map(|srv| srv.router());
Expand Down
1 change: 1 addition & 0 deletions runtimes/core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod schema;
mod server;
mod static_assets;
pub mod websocket;
pub mod websocket_client;

pub use endpoint::*;
pub use error::*;
Expand Down
2 changes: 1 addition & 1 deletion runtimes/core/src/api/schema/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl Body {
}
}

impl ToOutgoingRequest for Body {
impl ToOutgoingRequest<reqwest::Request> for Body {
fn to_outgoing_request(
&self,
payload: &mut JSONPayload,
Expand Down
Loading
Loading