Skip to content

Commit

Permalink
fix: add dedupe capability on all IO operations (#2945)
Browse files Browse the repository at this point in the history
Co-authored-by: Tushar Mathur <[email protected]>
  • Loading branch information
ssddOnTop and tusharmath authored Oct 1, 2024
1 parent cec5896 commit 8de419f
Show file tree
Hide file tree
Showing 33 changed files with 354 additions and 130 deletions.
56 changes: 49 additions & 7 deletions generated/.tailcallrc.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ directive @cache(
Provides the ability to refer to multiple fields in the Query or Mutation root.
"""
directive @call(
"""
Enables deduplication of IO operations to enhance performance.This flag prevents
duplicate IO requests from being executed concurrently, reducing resource load. Caution:
May lead to issues with APIs that expect unique results for identical inputs, such
as nonce-based APIs.
"""
dedupe: Boolean
"""
Steps are composed together to form a call. If you have multiple steps, the output
of the previous step is passed as input to the next step.
Expand Down Expand Up @@ -71,6 +78,13 @@ directive @graphQL(
"""
batch: Boolean!
"""
Enables deduplication of IO operations to enhance performance.This flag prevents
duplicate IO requests from being executed concurrently, reducing resource load. Caution:
May lead to issues with APIs that expect unique results for identical inputs, such
as nonce-based APIs.
"""
dedupe: Boolean
"""
The headers parameter allows you to customize the headers of the GraphQL request
made by the `@graphQL` operator. It is used by specifying a key-value map of header
names and their values.
Expand Down Expand Up @@ -111,6 +125,13 @@ directive @grpc(
"""
body: JSON
"""
Enables deduplication of IO operations to enhance performance.This flag prevents
duplicate IO requests from being executed concurrently, reducing resource load. Caution:
May lead to issues with APIs that expect unique results for identical inputs, such
as nonce-based APIs.
"""
dedupe: Boolean
"""
The `headers` parameter allows you to customize the headers of the HTTP request made
by the `@grpc` operator. It is used by specifying a key-value map of header names
and their values. Note: content-type is automatically set to application/grpc
Expand Down Expand Up @@ -148,6 +169,13 @@ directive @http(
"""
body: String
"""
Enables deduplication of IO operations to enhance performance.This flag prevents
duplicate IO requests from being executed concurrently, reducing resource load. Caution:
May lead to issues with APIs that expect unique results for identical inputs, such
as nonce-based APIs.
"""
dedupe: Boolean
"""
The `encoding` parameter specifies the encoding of the request body. It can be `ApplicationJson`
or `ApplicationXWwwFormUrlEncoded`. @default `ApplicationJson`.
"""
Expand Down Expand Up @@ -254,13 +282,6 @@ directive @server(
"""
batchRequests: Boolean
"""
Enables deduplication of IO operations to enhance performance.This flag prevents
duplicate IO requests from being executed concurrently, reducing resource load. Caution:
May lead to issues with APIs that expect unique results for identical inputs, such
as nonce-based APIs.
"""
dedupe: Boolean
"""
`enableFederation` enables functionality to Tailcall server to act as a federation
subgraph.
"""
Expand Down Expand Up @@ -752,6 +773,13 @@ input GraphQL {
"""
batch: Boolean!
"""
Enables deduplication of IO operations to enhance performance.This flag prevents
duplicate IO requests from being executed concurrently, reducing resource load. Caution:
May lead to issues with APIs that expect unique results for identical inputs, such
as nonce-based APIs.
"""
dedupe: Boolean
"""
The headers parameter allows you to customize the headers of the GraphQL request
made by the `@graphQL` operator. It is used by specifying a key-value map of header
names and their values.
Expand Down Expand Up @@ -792,6 +820,13 @@ input Grpc {
"""
body: JSON
"""
Enables deduplication of IO operations to enhance performance.This flag prevents
duplicate IO requests from being executed concurrently, reducing resource load. Caution:
May lead to issues with APIs that expect unique results for identical inputs, such
as nonce-based APIs.
"""
dedupe: Boolean
"""
The `headers` parameter allows you to customize the headers of the HTTP request made
by the `@grpc` operator. It is used by specifying a key-value map of header names
and their values. Note: content-type is automatically set to application/grpc
Expand Down Expand Up @@ -829,6 +864,13 @@ input Http {
"""
body: String
"""
Enables deduplication of IO operations to enhance performance.This flag prevents
duplicate IO requests from being executed concurrently, reducing resource load. Caution:
May lead to issues with APIs that expect unique results for identical inputs, such
as nonce-based APIs.
"""
dedupe: Boolean
"""
The `encoding` parameter specifies the encoding of the request body. It can be `ApplicationJson`
or `ApplicationXWwwFormUrlEncoded`. @default `ApplicationJson`.
"""
Expand Down
35 changes: 28 additions & 7 deletions generated/.tailcallrc.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@
"steps"
],
"properties": {
"dedupe": {
"description": "Enables deduplication of IO operations to enhance performance.\n\nThis flag prevents duplicate IO requests from being executed concurrently, reducing resource load. Caution: May lead to issues with APIs that expect unique results for identical inputs, such as nonce-based APIs.",
"type": [
"boolean",
"null"
]
},
"steps": {
"description": "Steps are composed together to form a call. If you have multiple steps, the output of the previous step is passed as input to the next step.",
"type": "array",
Expand Down Expand Up @@ -541,6 +548,13 @@
"description": "If the upstream GraphQL server supports request batching, you can specify the 'batch' argument to batch several requests into a single batch request.\n\nMake sure you have also specified batch settings to the `@upstream` and to the `@graphQL` operator.",
"type": "boolean"
},
"dedupe": {
"description": "Enables deduplication of IO operations to enhance performance.\n\nThis flag prevents duplicate IO requests from being executed concurrently, reducing resource load. Caution: May lead to issues with APIs that expect unique results for identical inputs, such as nonce-based APIs.",
"type": [
"boolean",
"null"
]
},
"headers": {
"description": "The headers parameter allows you to customize the headers of the GraphQL request made by the `@graphQL` operator. It is used by specifying a key-value map of header names and their values.",
"type": "array",
Expand Down Expand Up @@ -579,6 +593,13 @@
"body": {
"description": "This refers to the arguments of your gRPC call. You can pass it as a static object or use Mustache template for dynamic parameters. These parameters will be added in the body in `protobuf` format."
},
"dedupe": {
"description": "Enables deduplication of IO operations to enhance performance.\n\nThis flag prevents duplicate IO requests from being executed concurrently, reducing resource load. Caution: May lead to issues with APIs that expect unique results for identical inputs, such as nonce-based APIs.",
"type": [
"boolean",
"null"
]
},
"headers": {
"description": "The `headers` parameter allows you to customize the headers of the HTTP request made by the `@grpc` operator. It is used by specifying a key-value map of header names and their values. Note: content-type is automatically set to application/grpc",
"type": "array",
Expand Down Expand Up @@ -669,6 +690,13 @@
"null"
]
},
"dedupe": {
"description": "Enables deduplication of IO operations to enhance performance.\n\nThis flag prevents duplicate IO requests from being executed concurrently, reducing resource load. Caution: May lead to issues with APIs that expect unique results for identical inputs, such as nonce-based APIs.",
"type": [
"boolean",
"null"
]
},
"encoding": {
"description": "The `encoding` parameter specifies the encoding of the request body. It can be `ApplicationJson` or `ApplicationXWwwFormUrlEncoded`. @default `ApplicationJson`.",
"allOf": [
Expand Down Expand Up @@ -1018,13 +1046,6 @@
"null"
]
},
"dedupe": {
"description": "Enables deduplication of IO operations to enhance performance.\n\nThis flag prevents duplicate IO requests from being executed concurrently, reducing resource load. Caution: May lead to issues with APIs that expect unique results for identical inputs, such as nonce-based APIs.",
"type": [
"boolean",
"null"
]
},
"enableFederation": {
"description": "`enableFederation` enables functionality to Tailcall server to act as a federation subgraph.",
"type": [
Expand Down
22 changes: 16 additions & 6 deletions src/core/app_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::sync::Arc;

use async_graphql::dynamic::{self, DynamicRequest};
use async_graphql_value::ConstValue;
use hyper::body::Bytes;

use crate::core::async_graphql_hyper::OperationId;
use crate::core::auth::context::GlobalAuthContext;
Expand All @@ -11,7 +10,7 @@ use crate::core::data_loader::{DataLoader, DedupeResult};
use crate::core::graphql::GraphqlDataLoader;
use crate::core::grpc;
use crate::core::grpc::data_loader::GrpcDataLoader;
use crate::core::http::{DataLoaderRequest, HttpDataLoader, Response};
use crate::core::http::{DataLoaderRequest, HttpDataLoader};
use crate::core::ir::model::{DataLoaderId, IoId, IO, IR};
use crate::core::ir::Error;
use crate::core::rest::{Checked, EndpointSet};
Expand All @@ -27,7 +26,7 @@ pub struct AppContext {
pub endpoints: EndpointSet<Checked>,
pub auth_ctx: Arc<GlobalAuthContext>,
pub dedupe_handler: Arc<DedupeResult<IoId, ConstValue, Error>>,
pub dedupe_operation_handler: DedupeResult<OperationId, Response<Bytes>, Error>,
pub dedupe_operation_handler: DedupeResult<OperationId, Arc<async_graphql::Response>, Error>,
}

impl AppContext {
Expand All @@ -48,9 +47,15 @@ impl AppContext {
expr.modify(&mut |expr| match expr {
IR::IO(io) => match io {
IO::Http {
req_template, group_by, http_filter, is_list, ..
req_template,
group_by,
http_filter,
is_list,
dedupe,
..
} => {
let is_list = *is_list;
let dedupe = *dedupe;
let data_loader = HttpDataLoader::new(
runtime.clone(),
group_by.clone(),
Expand All @@ -64,14 +69,16 @@ impl AppContext {
dl_id: Some(DataLoaderId::new(http_data_loaders.len())),
http_filter: http_filter.clone(),
is_list,
dedupe,
}));

http_data_loaders.push(data_loader);

result
}

IO::GraphQL { req_template, field_name, batch, .. } => {
IO::GraphQL { req_template, field_name, batch, dedupe, .. } => {
let dedupe = *dedupe;
let graphql_data_loader =
GraphqlDataLoader::new(runtime.clone(), *batch)
.into_data_loader(
Expand All @@ -83,14 +90,16 @@ impl AppContext {
field_name: field_name.clone(),
batch: *batch,
dl_id: Some(DataLoaderId::new(gql_data_loaders.len())),
dedupe,
}));

gql_data_loaders.push(graphql_data_loader);

result
}

IO::Grpc { req_template, group_by, .. } => {
IO::Grpc { req_template, group_by, dedupe, .. } => {
let dedupe = *dedupe;
let data_loader = GrpcDataLoader {
runtime: runtime.clone(),
operation: req_template.operation.clone(),
Expand All @@ -104,6 +113,7 @@ impl AppContext {
req_template: req_template.clone(),
group_by: group_by.clone(),
dl_id: Some(DataLoaderId::new(grpc_data_loaders.len())),
dedupe,
}));

grpc_data_loaders.push(data_loader);
Expand Down
3 changes: 2 additions & 1 deletion src/core/blueprint/operators/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ pub fn compile_graphql(
.map(|req_template| {
let field_name = graphql.name.clone();
let batch = graphql.batch;
IR::IO(IO::GraphQL { req_template, field_name, batch, dl_id: None })
let dedupe = graphql.dedupe.unwrap_or_default();
IR::IO(IO::GraphQL { req_template, field_name, batch, dl_id: None, dedupe })
})
}

Expand Down
4 changes: 3 additions & 1 deletion src/core/blueprint/operators/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ pub fn compile_grpc(inputs: CompileGrpc) -> Valid<IR, String> {
let field = inputs.field;
let grpc = inputs.grpc;
let validate_with_schema = inputs.validate_with_schema;
let dedupe = grpc.dedupe.unwrap_or_default();

Valid::from(GrpcMethod::try_from(grpc.method.as_str()))
.and_then(|method| {
Expand Down Expand Up @@ -201,9 +202,10 @@ pub fn compile_grpc(inputs: CompileGrpc) -> Valid<IR, String> {
req_template,
group_by: Some(GroupBy::new(grpc.batch_key.clone(), None)),
dl_id: None,
dedupe,
})
} else {
IR::IO(IO::Grpc { req_template, group_by: None, dl_id: None })
IR::IO(IO::Grpc { req_template, group_by: None, dl_id: None, dedupe })
}
})
}
Expand Down
4 changes: 4 additions & 0 deletions src/core/blueprint/operators/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ pub fn compile_http(
http: &config::Http,
is_list: bool,
) -> Valid<IR, String> {
let dedupe = http.dedupe.unwrap_or_default();

Valid::<(), String>::fail("GroupBy is only supported for GET requests".to_string())
.when(|| !http.batch_key.is_empty() && http.method != Method::GET)
.and(
Expand Down Expand Up @@ -81,6 +83,7 @@ pub fn compile_http(
dl_id: None,
http_filter,
is_list,
dedupe,
})
} else {
IR::IO(IO::Http {
Expand All @@ -89,6 +92,7 @@ pub fn compile_http(
dl_id: None,
http_filter,
is_list,
dedupe,
})
}
})
Expand Down
2 changes: 0 additions & 2 deletions src/core/blueprint/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pub struct Server {
pub cors: Option<Cors>,
pub experimental_headers: HashSet<HeaderName>,
pub auth: Option<Auth>,
pub dedupe: bool,
pub routes: Routes,
}

Expand Down Expand Up @@ -150,7 +149,6 @@ impl TryFrom<crate::core::config::ConfigModule> for Server {
script,
cors,
auth,
dedupe: config_server.get_dedupe(),
routes: config_server.get_routes(),
}
},
Expand Down
Loading

1 comment on commit 8de419f

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running 30s test @ http://localhost:8000/graphql

4 threads and 100 connections

Thread Stats Avg Stdev Max +/- Stdev
Latency 7.26ms 3.07ms 36.87ms 70.56%
Req/Sec 3.48k 363.41 4.80k 94.67%

416300 requests in 30.05s, 777.67MB read

Requests/sec: 13854.80

Transfer/sec: 25.88MB

Please sign in to comment.