Skip to content

Commit

Permalink
chore: support nested joins using JIT (#2323)
Browse files Browse the repository at this point in the history
Co-authored-by: Tushar Mathur <[email protected]>
  • Loading branch information
meskill and tusharmath committed Jul 2, 2024
1 parent ee1511b commit 7ee2238
Show file tree
Hide file tree
Showing 9 changed files with 996 additions and 84 deletions.
28 changes: 5 additions & 23 deletions src/core/jit/context.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,28 @@
use derive_getters::Getters;
use indexmap::IndexMap;

use super::Request;
use crate::core::ir::ResolverContextLike;

/// Rust representation of the GraphQL context available in the DSL
#[derive(Getters)]
#[derive(Getters, Debug)]
pub struct Context<'a, Input, Output> {
request: &'a Request<Input>,
parent: Option<&'a Output>,
value: Option<&'a Output>,
}

impl<'a, Input, Output> Clone for Context<'a, Input, Output> {
fn clone(&self) -> Self {
Self {
request: self.request,
parent: self.parent,
value: self.value,
}
Self { request: self.request, value: self.value }
}
}

impl<'a, Input, Output> Context<'a, Input, Output> {
pub fn new(request: &'a Request<Input>) -> Self {
Self { request, parent: None, value: None }
}

pub fn with_parent_value(&self, value: &'a Output) -> Self {
Self {
request: self.request,
parent: self.parent,
value: Some(value),
}
Self { request, value: None }
}

pub fn with_value(&self, value: &'a Output) -> Self {
Self {
request: self.request,
parent: self.parent,
value: Some(value),
}
Self { request: self.request, value: Some(value) }
}
}

Expand All @@ -61,7 +43,7 @@ impl<'a> ResolverContextLike for Context<'a, async_graphql::Value, async_graphql
todo!()
}

fn add_error(&self, error: async_graphql::ServerError) {
fn add_error(&self, _error: async_graphql::ServerError) {
todo!()
}
}
87 changes: 48 additions & 39 deletions src/core/jit/exec.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::borrow::Borrow;
use std::mem;
use std::sync::{Arc, Mutex};

Expand All @@ -22,7 +21,7 @@ pub struct Executor<Synth, IRExec> {

impl<Input, Output, Error, Synth, Exec> Executor<Synth, Exec>
where
Output: JsonLike<Output = Output>,
Output: JsonLike<Output = Output> + Default,
Synth: Synthesizer<Value = Result<Output, Error>>,
Exec: IRExecutor<Input = Input, Output = Output, Error = Error>,
{
Expand All @@ -34,7 +33,7 @@ where
let store: Arc<Mutex<Store<Result<Output, Error>>>> =
Arc::new(Mutex::new(Store::new(self.plan.size())));
let mut ctx = ExecutorInner::new(request, store.clone(), self.plan.to_owned(), &self.exec);
ctx.execute().await;
ctx.init().await;

let store = mem::replace(&mut *store.lock().unwrap(), Store::new(0));
store
Expand All @@ -51,75 +50,85 @@ struct ExecutorInner<'a, Input, Output, Error, Exec> {
request: Request<Input>,
store: Arc<Mutex<Store<Result<Output, Error>>>>,
plan: ExecutionPlan,
exec: &'a Exec,
ir_exec: &'a Exec,
}

impl<'a, Input, Output, Error, Exec> ExecutorInner<'a, Input, Output, Error, Exec>
where
Output: JsonLike<Output = Output>,
Output: JsonLike<Output = Output> + Default,
Exec: IRExecutor<Input = Input, Output = Output, Error = Error>,
{
fn new(
request: Request<Input>,
store: Arc<Mutex<Store<Result<Output, Error>>>>,
plan: ExecutionPlan,
exec: &'a Exec,
ir_exec: &'a Exec,
) -> Self {
Self { request, store, plan, exec }
Self { request, store, plan, ir_exec }
}

async fn execute(&mut self) {
async fn init(&mut self) {
join_all(self.plan.as_children().iter().map(|field| async {
let ctx = Context::new(&self.request);
self.execute_field(field, &ctx, false).await
self.execute(field, &ctx).await
}))
.await;
}

async fn execute_field<'b>(
async fn execute<'b>(
&'b self,
field: &'b Field<Children>,
ctx: &'b Context<'b, Input, Output>,
is_multi: bool,
) -> Result<(), Error> {
if let Some(ir) = &field.ir {
let result = self.exec.execute(ir, ctx).await;
let result = self.ir_exec.execute(ir, ctx).await;
if let Ok(ref value) = result {
// Array
if let Ok(array) = value.as_array_ok() {
let ctx = ctx.with_parent_value(value);
join_all(array.iter().map(|value| {
let ctx = ctx.with_value(value);

join_all(field.children().iter().map(|child| {
let ctx = ctx.clone();
async move {
let ctx = ctx.clone();
self.execute_field(child, ctx.clone().borrow(), true).await
}
}))
}))
.await;
// Check if the field expects a list
if field.type_of.is_list() {
// Check if the value is an array
if let Ok(array) = value.as_array_ok() {
let values = join_all(
field
.children()
.iter()
.filter_map(|field| field.ir.as_ref())
.map(|ir| {
join_all(array.iter().map(|value| {
let ctx = ctx.with_value(value);
// TODO: doesn't handle nested values
async move { self.ir_exec.execute(ir, &ctx).await }
}))
}),
)
.await;

// Object
} else {
join_all(field.children().iter().map(|child| {
let ctx = ctx.clone();
let value = &value;
async move {
let ctx = ctx.with_parent_value(value);
self.execute_field(child, &ctx, false).await
let mut store = self.store.lock().unwrap();
for (field, values) in field
.children()
.iter()
.filter(|field| field.ir.is_some())
.zip(values)
{
store.set_multiple(&field.id, values)
}
}
// TODO: We should throw an error stating that we expected
// a list type here but because the `Error` is a
// type-parameter, its not possible
}
// TODO: Validate if the value is an Object
// Has to be an Object, we don't do anything while executing if its a Scalar
else {
join_all(field.children().iter().map(|child| {
let ctx = ctx.with_value(value);
async move { self.execute(child, &ctx).await }
}))
.await;
}
}

if is_multi {
self.store.lock().unwrap().set_multiple(&field.id, result)
} else {
self.store.lock().unwrap().set_single(&field.id, result)
};
self.store.lock().unwrap().set_single(&field.id, result)
}
Ok(())
}
Expand Down
4 changes: 0 additions & 4 deletions src/core/jit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,8 @@ mod exec;
mod model;
mod store;
mod synth;
use std::sync::Arc;

use async_graphql::Value;
use builder::*;
use context::Context;
use exec::{Executor, IRExecutor};
use model::*;
use store::*;
mod context;
Expand Down
3 changes: 1 addition & 2 deletions src/core/jit/request.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::collections::HashMap;

use async_graphql::parser::types::OperationType;
use derive_setters::Setters;
use serde::Deserialize;

use super::{Builder, Error, ExecutionPlan, Result};
use crate::core::blueprint::Blueprint;

#[derive(Deserialize, Setters)]
#[derive(Debug, Deserialize, Setters)]
pub struct Request<Value> {
pub query: String,
pub operation_name: Option<String>,
Expand Down
32 changes: 20 additions & 12 deletions src/core/jit/store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::core::jit::model::FieldId;

#[derive(Debug)]
pub struct Store<A> {
map: Vec<Data<A>>,
data: Vec<Data<A>>,
}

#[derive(Clone)]
Expand All @@ -12,11 +13,23 @@ pub enum Data<A> {
/// Represents that the value was computed multiple times for the associated
/// field. The order is guaranteed by the executor to be the same as the
/// other of invocation and not the other of completion.
// TODO: there could be multiple inside multiple in case of nested resolvers that are resolved
// to lists
Multiple(Vec<A>),
/// Represents that the value is yet to be computed
Pending,
}

impl<A> std::fmt::Debug for Data<A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Single(_) => f.debug_tuple("Single").finish(),
Self::Multiple(arg0) => f.debug_tuple("Multiple").field(&arg0.len()).finish(),
Self::Pending => write!(f, "Pending"),
}
}
}

impl<A> Data<A> {
pub fn map<B>(self, ab: impl Fn(A) -> B) -> Data<B> {
match self {
Expand All @@ -29,27 +42,22 @@ impl<A> Data<A> {

impl<A> Store<A> {
pub fn new(size: usize) -> Self {
Store { map: (0..size).map(|_| Data::Pending).collect() }
Store { data: (0..size).map(|_| Data::Pending).collect() }
}

pub fn set(&mut self, field_id: FieldId, data: Data<A>) {
self.map.insert(field_id.as_usize(), data);
self.data[field_id.as_usize()] = data;
}

pub fn set_single(&mut self, field_id: &FieldId, data: A) {
self.map.insert(field_id.as_usize(), Data::Single(data));
self.data[field_id.as_usize()] = Data::Single(data);
}

pub fn set_multiple(&mut self, field_id: &FieldId, data: A) {
match self.map.get_mut(field_id.as_usize()) {
Some(Data::Multiple(values)) => values.push(data),
_ => self
.map
.insert(field_id.as_usize(), Data::Multiple(vec![data])),
}
pub fn set_multiple(&mut self, field_id: &FieldId, data: Vec<A>) {
self.data[field_id.as_usize()] = Data::Multiple(data);
}

pub fn get(&self, field_id: &FieldId) -> Option<&Data<A>> {
self.map.get(field_id.as_usize())
self.data.get(field_id.as_usize())
}
}
2 changes: 0 additions & 2 deletions src/core/jit/synth/synth_const.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::ops::Deref;

use async_graphql::{Name, Value};
use indexmap::IndexMap;

Expand Down
1 change: 0 additions & 1 deletion src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ pub mod has_headers;
pub mod helpers;
pub mod http;
pub mod ir;
#[allow(unused)]
pub mod jit;
pub mod json;
pub mod merge_right;
Expand Down
13 changes: 12 additions & 1 deletion tests/jit_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ mod tests {
let response = executor.execute(request).await;
let data = response.data;

insta::assert_json_snapshot!(data)
insta::assert_json_snapshot!(data);
}

#[tokio::test]
async fn test_executor_nested() {
// NOTE: This test makes a real HTTP call
let request = Request::new("query {posts {title userId user {id name email} }}");
let executor = new_executor(&request).await.unwrap();
let response = executor.execute(request).await;
let data = response.data;

insta::assert_json_snapshot!(data);
}
}
Loading

1 comment on commit 7ee2238

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running 30s test @ http://localhost:8000/graphql

4 threads and 100 connections

Thread Stats Avg Stdev Max +/- Stdev
Latency 7.45ms 3.27ms 79.80ms 71.23%
Req/Sec 3.39k 180.53 4.36k 89.61%

405965 requests in 30.10s, 2.03GB read

Requests/sec: 13487.86

Transfer/sec: 69.23MB

Please sign in to comment.