From 8a962600f37d2dbc2c137d8a9bdcbfa6cef35920 Mon Sep 17 00:00:00 2001 From: Giacomo Rebonato Date: Thu, 25 Nov 2021 12:33:55 +0100 Subject: [PATCH 1/7] feat: gateway batching queries --- lib/gateway.js | 75 +---- lib/gateway/get-query-result.js | 142 +++++++++ lib/gateway/service-map.js | 1 + test/gateway/aliases-with-batching.js | 213 +++++++++++++ .../custom-directives-with-batching.js | 286 ++++++++++++++++++ test/gateway/get-query-result.js | 216 +++++++++++++ .../include-directive-with-batching.js | 214 +++++++++++++ test/gateway/load-balancing-with-batching.js | 198 ++++++++++++ test/gateway/with-batching.js | 188 ++++++++++++ 9 files changed, 1463 insertions(+), 70 deletions(-) create mode 100644 lib/gateway/get-query-result.js create mode 100644 test/gateway/aliases-with-batching.js create mode 100644 test/gateway/custom-directives-with-batching.js create mode 100644 test/gateway/get-query-result.js create mode 100644 test/gateway/include-directive-with-batching.js create mode 100644 test/gateway/load-balancing-with-batching.js create mode 100644 test/gateway/with-batching.js diff --git a/lib/gateway.js b/lib/gateway.js index 530721da..440e4587 100644 --- a/lib/gateway.js +++ b/lib/gateway.js @@ -4,8 +4,7 @@ const { getNamedType, isObjectType, isScalarType, - Kind, - parse + Kind } = require('graphql') const { Factory } = require('single-user-cache') const buildFederatedSchema = require('./federation') @@ -18,9 +17,8 @@ const { kEntityResolvers } = require('./gateway/make-resolver') const { MER_ERR_GQL_GATEWAY_REFRESH, MER_ERR_GQL_GATEWAY_INIT } = require('./errors') -const { preGatewayExecutionHandler } = require('./handlers') const findValueTypes = require('./gateway/find-value-types') - +const getQueryResult = require('./gateway/get-query-result') const allSettled = require('promise.allsettled') function isDefaultType (type) { @@ -354,72 +352,9 @@ async function buildGateway (gatewayOpts, app) { * */ factory.add(`${service}Entity`, async (queries) => { - const q = [...new Set(queries.map(q => q.query))] - - const resultIndexes = [] - let queryIndex = 0 - const mergedQueries = queries.reduce((acc, curr) => { - if (!acc[curr.query]) { - acc[curr.query] = curr.variables - resultIndexes[q.indexOf(curr.query)] = [] - } else { - acc[curr.query].representations = [ - ...acc[curr.query].representations, - ...curr.variables.representations - ] - } - - for (let i = 0; i < curr.variables.representations.length; i++) { - resultIndexes[q.indexOf(curr.query)].push(queryIndex) - } - - queryIndex++ - - return acc - }, {}) - - const result = [] - - // Gateway query here - await Promise.all(Object.entries(mergedQueries).map(async ([query, variables], queryIndex, entries) => { - // Trigger preGatewayExecution hook for entities - let modifiedQuery - if (queries[queryIndex].context.preGatewayExecution !== null) { - ({ modifiedQuery } = await preGatewayExecutionHandler({ - schema: serviceDefinition.schema, - document: parse(query), - context: queries[queryIndex].context, - service: { name: service } - })) - } - - const response = await serviceDefinition.sendRequest({ - originalRequestHeaders: queries[queryIndex].originalRequestHeaders, - body: JSON.stringify({ - query: modifiedQuery || query, - variables - }), - context: queries[queryIndex].context - }) - - let entityIndex = 0 - for (const entity of response.json.data._entities) { - if (!result[resultIndexes[queryIndex][entityIndex]]) { - result[resultIndexes[queryIndex][entityIndex]] = { - ...response, - json: { - data: { - _entities: [entity] - } - } - } - } else { - result[resultIndexes[queryIndex][entityIndex]].json.data._entities.push(entity) - } - - entityIndex++ - } - })) + const result = await getQueryResult({ + queries, serviceDefinition, service + }) return result }, query => query.id) diff --git a/lib/gateway/get-query-result.js b/lib/gateway/get-query-result.js new file mode 100644 index 00000000..8dc64736 --- /dev/null +++ b/lib/gateway/get-query-result.js @@ -0,0 +1,142 @@ +'use strict' + +const { + parse +} = require('graphql') +const { preGatewayExecutionHandler } = require('../handlers') + +const mergeQueries = (queries) => { + const q = [...new Set(queries.map(q => q.query))] + const resultIndexes = [] + const mergedQueries = queries.reduce((acc, curr, queryIndex) => { + if (!acc[curr.query]) { + acc[curr.query] = curr.variables + resultIndexes[q.indexOf(curr.query)] = [] + } else { + acc[curr.query].representations = [ + ...acc[curr.query].representations, + ...curr.variables.representations + ] + } + + for (let i = 0; i < curr.variables.representations.length; i++) { + resultIndexes[q.indexOf(curr.query)].push(queryIndex) + } + + return acc + }, {}) + + return { mergedQueries, resultIndexes } +} + +const getBactchedResult = async ({ mergeQueriesResult, queries, serviceDefinition, service }) => { + const { mergedQueries, resultIndexes } = mergeQueriesResult + const context = queries[0].context + const originalRequestHeaders = queries[0].originalRequestHeaders + const batchedQueries = [] + + for (const [query, variables] of Object.entries(mergedQueries)) { + const document = parse(query) + let modifiedQuery + + if (context.preGatewayExecution !== null) { + ({ modifiedQuery } = await preGatewayExecutionHandler({ + schema: serviceDefinition.schema, + document, + context, + service: { name: service } + })) + } + + batchedQueries.push({ + operationName: document.definitions.find(d => d.kind === 'OperationDefinition').name.value, + query: query || modifiedQuery, + variables + }) + } + + const response = await serviceDefinition.sendRequest({ + originalRequestHeaders, + body: JSON.stringify(batchedQueries), + context + }) + + return buildResult({ resultIndexes, data: response.json }) +} + +const buildResult = ({ resultIndexes, data }) => { + const result = [] + + for (const [queryIndex, queryResponse] of data.entries()) { + let entityIndex = 0 + + for (const entity of queryResponse.data._entities) { + if (!result[resultIndexes[queryIndex][entityIndex]]) { + result[resultIndexes[queryIndex][entityIndex]] = { + ...queryResponse, + json: { + data: { + _entities: [entity] + } + } + } + } else { + result[resultIndexes[queryIndex][entityIndex]].json.data._entities.push(entity) + } + + entityIndex++ + } + } + + return result +} + +const getResult = async ({ mergeQueriesResult, serviceDefinition, queries, service }) => { + const { mergedQueries, resultIndexes } = mergeQueriesResult + const jsons = [] + const queriesEntries = Object.entries(mergedQueries) + + for await (const [queryIndex, [query, variables]] of queriesEntries.entries()) { + let modifiedQuery + + if (queries[queryIndex].context.preGatewayExecution !== null) { + ({ modifiedQuery } = await preGatewayExecutionHandler({ + schema: serviceDefinition.schema, + document: parse(query), + context: queries[queryIndex].context, + service: { name: service } + })) + } + + const response = await serviceDefinition.sendRequest({ + originalRequestHeaders: queries[queryIndex].originalRequestHeaders, + body: JSON.stringify({ + query: modifiedQuery || query, + variables + }), + context: queries[queryIndex].context + }) + + jsons.push(response.json) + } + + return buildResult({ data: jsons, resultIndexes }) +} + +const getQueryResult = async ({ queries, serviceDefinition, service }) => { + const mergeQueriesResult = mergeQueries(queries) + const params = { + mergeQueriesResult, + service, + serviceDefinition, + queries + } + + if (serviceDefinition.allowBatchedQueries) { + return getBactchedResult({ ...params }) + } + + return getResult({ ...params }) +} + +module.exports = getQueryResult diff --git a/lib/gateway/service-map.js b/lib/gateway/service-map.js index ab53f754..42db4dcb 100644 --- a/lib/gateway/service-map.js +++ b/lib/gateway/service-map.js @@ -221,6 +221,7 @@ async function buildServiceMap (services, errorHandler) { } serviceMap[service.name].name = service.name + serviceMap[service.name].allowBatchedQueries = service.allowBatchedQueries } await pmap(services, mapper, { concurrency: 8 }) diff --git a/test/gateway/aliases-with-batching.js b/test/gateway/aliases-with-batching.js new file mode 100644 index 00000000..a570cab5 --- /dev/null +++ b/test/gateway/aliases-with-batching.js @@ -0,0 +1,213 @@ +'use strict' + +const { test } = require('tap') +const Fastify = require('fastify') +const GQL = require('../..') + +async function createTestService (t, schema, resolvers = {}) { + const service = Fastify() + service.register(GQL, { + schema, + resolvers, + federationMetadata: true, + allowBatchedQueries: true + }) + await service.listen(0) + return [service, service.server.address().port] +} + +const users = { + u1: { + id: 'u1', + name: 'John' + }, + u2: { + id: 'u2', + name: 'Jane' + } +} + +const posts = { + p1: { + pid: 'p1', + title: 'Post 1', + content: 'Content 1', + authorId: 'u1' + }, + p2: { + pid: 'p2', + title: 'Post 2', + content: 'Content 2', + authorId: 'u2' + }, + p3: { + pid: 'p3', + title: 'Post 3', + content: 'Content 3', + authorId: 'u1' + }, + p4: { + pid: 'p4', + title: 'Post 4', + content: 'Content 4', + authorId: 'u1' + } +} + +async function createTestGatewayServer (t) { + // User service + const userServiceSchema = ` + type Query @extends { + me: User + } + + type Metadata { + info: String! + } + + type User @key(fields: "id") { + id: ID! + name: String! + quote(input: String!): String! + metadata(input: String!): Metadata! + }` + const userServiceResolvers = { + Query: { + me: (root, args, context, info) => { + return users.u1 + } + }, + User: { + quote: (user, args, context, info) => { + return args.input + }, + metadata: (user, args, context, info) => { + return { + info: args.input + } + }, + __resolveReference: (user, args, context, info) => { + return users[user.id] + } + } + } + const [userService, userServicePort] = await createTestService(t, userServiceSchema, userServiceResolvers) + + // Post service + const postServiceSchema = ` + type Post @key(fields: "pid") { + pid: ID! + } + + type User @key(fields: "id") @extends { + id: ID! @external + topPosts(count: Int!): [Post] + }` + const postServiceResolvers = { + User: { + topPosts: (user, { count }, context, info) => { + return Object.values(posts).filter(p => p.authorId === user.id).slice(0, count) + } + } + } + const [postService, postServicePort] = await createTestService(t, postServiceSchema, postServiceResolvers) + + const gateway = Fastify() + t.teardown(async () => { + await gateway.close() + await userService.close() + await postService.close() + }) + gateway.register(GQL, { + gateway: { + services: [{ + name: 'user', + url: `http://localhost:${userServicePort}/graphql`, + allowBatchedQueries: true + }, { + name: 'post', + url: `http://localhost:${postServicePort}/graphql`, + allowBatchedQueries: true + }] + } + }) + return gateway +} + +test('gateway with batching - should support aliases', async (t) => { + t.plan(1) + const app = await createTestGatewayServer(t) + + const query = ` + query { + user: me { + id + name + newName: name + otherName: name + quote(input: "quote") + firstQuote: quote(input: "foo") + secondQuote: quote(input: "bar") + metadata(input: "info") { + info + } + originalMetadata: metadata(input: "hello") { + hi: info + ho: info + } + moreMetadata: metadata(input: "hi") { + info + } + somePosts: topPosts(count: 1) { + pid + } + morePosts: topPosts(count: 2) { + pid + } + } + }` + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: { + user: { + id: 'u1', + name: 'John', + newName: 'John', + otherName: 'John', + quote: 'quote', + firstQuote: 'foo', + secondQuote: 'bar', + metadata: { + info: 'info' + }, + originalMetadata: { + hi: 'hello', + ho: 'hello' + }, + moreMetadata: { + info: 'hi' + }, + somePosts: [ + { + pid: 'p1' + } + ], + morePosts: [ + { + pid: 'p1' + }, + { + pid: 'p3' + } + ] + } + } + }) +}) diff --git a/test/gateway/custom-directives-with-batching.js b/test/gateway/custom-directives-with-batching.js new file mode 100644 index 00000000..51e041d2 --- /dev/null +++ b/test/gateway/custom-directives-with-batching.js @@ -0,0 +1,286 @@ +'use strict' + +const t = require('tap') +const Fastify = require('fastify') +const GQL = require('../..') +const { MER_ERR_GQL_GATEWAY_DUPLICATE_DIRECTIVE } = require('../../lib/errors') + +async function createTestService (t, schema, resolvers = {}) { + const service = Fastify() + service.register(GQL, { + schema, + resolvers, + federationMetadata: true, + allowBatchedQueries: true + }) + await service.listen(0) + return [service, service.server.address().port] +} + +const users = { + u1: { + id: 'u1', + name: 'John' + }, + u2: { + id: 'u2', + name: 'Jane' + } +} + +const posts = { + p1: { + pid: 'p1', + title: 'Post 1', + content: 'Content 1', + authorId: 'u1' + }, + p2: { + pid: 'p2', + title: 'Post 2', + content: 'Content 2', + authorId: 'u2' + }, + p3: { + pid: 'p3', + title: 'Post 3', + content: 'Content 3', + authorId: 'u1' + }, + p4: { + pid: 'p4', + title: 'Post 4', + content: 'Content 4', + authorId: 'u1' + } +} + +const query = ` + query { + me { + id + name + topPosts(count: 2) { + pid + author { + id + } + } + } + topPosts(count: 2) { + pid + } + } +` + +async function createUserService (directiveDefinition) { + const userServiceSchema = ` + ${directiveDefinition} + + type Query @extends { + me: User @custom + } + + type User @key(fields: "id") { + id: ID! + name: String! @custom + }` + const userServiceResolvers = { + Query: { + me: (root, args, context, info) => { + return users.u1 + } + }, + User: { + __resolveReference: (user, args, context, info) => { + return users[user.id] + } + } + } + return createTestService(t, userServiceSchema, userServiceResolvers) +} + +async function createPostService (directiveDefinition) { + const postServiceSchema = ` + ${directiveDefinition} + + type Post @key(fields: "pid") { + pid: ID! @custom + author: User + } + + extend type Query { + topPosts(count: Int): [Post] + } + + type User @key(fields: "id") @extends { + id: ID! @external + topPosts(count: Int!): [Post] + }` + const postServiceResolvers = { + Post: { + __resolveReference: (post, args, context, info) => { + return posts[post.pid] + }, + author: (post, args, context, info) => { + return { + __typename: 'User', + id: post.authorId + } + } + }, + User: { + topPosts: (user, { count }, context, info) => { + return Object.values(posts).filter(p => p.authorId === user.id).slice(0, count) + } + }, + Query: { + topPosts: (root, { count = 2 }) => Object.values(posts).slice(0, count) + } + } + return createTestService(t, postServiceSchema, postServiceResolvers) +} + +t.test('gateway with batching', t => { + t.plan(2) + + t.test('should de-duplicate custom directives on the gateway', async (t) => { + t.plan(4) + + const [userService, userServicePort] = await createUserService('directive @custom(input: ID) on OBJECT | FIELD_DEFINITION') + const [postService, postServicePort] = await createPostService('directive @custom(input: ID) on OBJECT | FIELD_DEFINITION') + const gateway = Fastify() + t.teardown(async () => { + await gateway.close() + await userService.close() + await postService.close() + }) + gateway.register(GQL, { + gateway: { + services: [{ + name: 'user', + url: `http://localhost:${userServicePort}/graphql`, + allowBatchedQueries: true + }, { + name: 'post', + url: `http://localhost:${postServicePort}/graphql`, + allowBatchedQueries: true + }] + } + }) + await gateway.ready() + + const userDirectiveNames = userService.graphql.schema.getDirectives().map(directive => directive.name) + t.same(userDirectiveNames, [ + 'include', + 'skip', + 'deprecated', + 'specifiedBy', + 'external', + 'requires', + 'provides', + 'key', + 'extends', + 'custom' + ]) + + const postDirectiveNames = userService.graphql.schema.getDirectives().map(directive => directive.name) + t.same(postDirectiveNames, [ + 'include', + 'skip', + 'deprecated', + 'specifiedBy', + 'external', + 'requires', + 'provides', + 'key', + 'extends', + 'custom' + ]) + + const gatewayDirectiveNames = gateway.graphql.schema.getDirectives().map(directive => directive.name) + t.same(gatewayDirectiveNames, [ + 'include', + 'skip', + 'deprecated', + 'specifiedBy', + 'custom' + ]) + + const res = await gateway.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: { + id: 'u1', + name: 'John', + topPosts: [ + { + pid: 'p1', + author: { + id: 'u1' + } + }, + { + pid: 'p3', + author: { + id: 'u1' + } + } + ] + }, + topPosts: [ + { + pid: 'p1' + }, + { + pid: 'p2' + } + ] + } + }) + }) + + t.test('should error on startup when different definitions of custom directives with the same name are present in federated services', async (t) => { + t.plan(1) + + const [userService, userServicePort] = await createUserService('directive @custom(input: ID) on OBJECT | FIELD_DEFINITION') + const [postService, postServicePort] = await createPostService('directive @custom(input: String) on OBJECT | FIELD_DEFINITION') + const serviceOpts = { + keepAliveTimeout: 10, // milliseconds + keepAliveMaxTimeout: 10 // milliseconds + } + + const gateway = Fastify() + t.teardown(async () => { + await gateway.close() + await userService.close() + await postService.close() + }) + gateway.register(GQL, { + gateway: { + services: [ + { + ...serviceOpts, + name: 'user', + url: `http://localhost:${userServicePort}/graphql`, + allowBatchedQueries: true + }, + { + ...serviceOpts, + name: 'post', + url: `http://localhost:${postServicePort}/graphql`, + allowBatchedQueries: true + } + ] + } + }) + + await t.rejects(gateway.ready(), new MER_ERR_GQL_GATEWAY_DUPLICATE_DIRECTIVE('custom')) + }) +}) diff --git a/test/gateway/get-query-result.js b/test/gateway/get-query-result.js new file mode 100644 index 00000000..c642f1dd --- /dev/null +++ b/test/gateway/get-query-result.js @@ -0,0 +1,216 @@ +'use strict' + +const getQueryResult = require('../../lib/gateway/get-query-result') +const { test } = require('tap') + +const query1 = ` +query EntitiesQuery($representations: [_Any!]!) { + _entities(representations: $representations) { + __typename + ... on User { + topPosts(count: 1) { + pid + __typename + pid + } + } + } +} + +` +const query2 = ` +query EntitiesQuery($representations: [_Any!]!) { + _entities(representations: $representations) { + __typename + ... on User { + topPosts(count: 2) { + pid + __typename + pid + } + } + } +} +` + +test('it works with a basic example', async (t) => { + const result = await getQueryResult({ + queries: [ + { + context: { + preGatewayExecution: null + }, + query: query1, + variables: { + representations: [ + { + __typename: 'User', + id: 'u1' + } + ] + } + } + ], + serviceDefinition: { + sendRequest: async () => ({ + json: { + data: { + _entities: [ + { + __typename: 'User', + topPosts: { + pid: 'p1', + __typename: 'Post' + } + } + ] + } + } + }) + } + }) + + t.same(result[0].data._entities[0], { + __typename: 'User', + topPosts: { + pid: 'p1', + __typename: 'Post' + } + }) +}) + +test('it works with a basic example and batched queries', async (t) => { + const result = await getQueryResult({ + queries: [ + { + context: { + preGatewayExecution: null + }, + query: query1, + variables: { + representations: [ + { + __typename: 'User', + id: 'u1' + } + ] + } + }, + { + context: { + preGatewayExecution: null + }, + query: query2, + variables: { + representations: [ + { + __typename: 'User', + id: 'u1' + } + ] + } + } + ], + serviceDefinition: { + allowBatchedQueries: true, + sendRequest: async () => ({ + json: [ + { + data: { + _entities: [ + { + __typename: 'User', + topPosts: [{ pid: 'p1', __typename: 'Post' }] + } + ] + } + }, + { + data: { + _entities: [ + { + __typename: 'User', + topPosts: [ + { pid: 'p1', __typename: 'Post' }, + { pid: 'p3', __typename: 'Post' } + ] + } + ] + } + } + ] + }) + } + }) + + t.same(result, [ + { + data: { + _entities: [ + { + __typename: 'User', + topPosts: [ + { + pid: 'p1', + __typename: 'Post' + } + ] + } + ] + }, + json: { + data: { + _entities: [ + { + __typename: 'User', + topPosts: [ + { + pid: 'p1', + __typename: 'Post' + } + ] + } + ] + } + } + }, + { + data: { + _entities: [ + { + __typename: 'User', + topPosts: [ + { + pid: 'p1', + __typename: 'Post' + }, + { + pid: 'p3', + __typename: 'Post' + } + ] + } + ] + }, + json: { + data: { + _entities: [ + { + __typename: 'User', + topPosts: [ + { + pid: 'p1', + __typename: 'Post' + }, + { + pid: 'p3', + __typename: 'Post' + } + ] + } + ] + } + } + } + ]) +}) diff --git a/test/gateway/include-directive-with-batching.js b/test/gateway/include-directive-with-batching.js new file mode 100644 index 00000000..01630b83 --- /dev/null +++ b/test/gateway/include-directive-with-batching.js @@ -0,0 +1,214 @@ +'use strict' + +const { test } = require('tap') +const Fastify = require('fastify') +const GQL = require('../..') + +async function createTestService (t, schema, resolvers = {}) { + const service = Fastify() + service.register(GQL, { + schema, + resolvers, + federationMetadata: true, + allowBatchedQueries: true + }) + await service.listen(0) + return [service, service.server.address().port] +} + +const users = { + u1: { + id: 'u1', + name: 'John' + }, + u2: { + id: 'u2', + name: 'Jane' + } +} + +const posts = { + p1: { + pid: 'p1', + title: 'Post 1', + content: 'Content 1', + authorId: 'u1' + }, + p2: { + pid: 'p2', + title: 'Post 2', + content: 'Content 2', + authorId: 'u2' + }, + p3: { + pid: 'p3', + title: 'Post 3', + content: 'Content 3', + authorId: 'u1' + }, + p4: { + pid: 'p4', + title: 'Post 4', + content: 'Content 4', + authorId: 'u1' + } +} + +async function createTestGatewayServer (t) { + // User service + const userServiceSchema = ` + type Query @extends { + me: User + } + + type Metadata { + info: String! + } + + type User @key(fields: "id") { + id: ID! + name: String! + metadata(input: String!): Metadata! + }` + const userServiceResolvers = { + Query: { + me: (root, args, context, info) => { + return users.u1 + } + }, + User: { + metadata: (user, args, context, info) => { + return { + info: args.input + } + } + } + } + const [userService, userServicePort] = await createTestService(t, userServiceSchema, userServiceResolvers) + + // Post service + const postServiceSchema = ` + type Post @key(fields: "pid") { + pid: ID! + } + + type User @key(fields: "id") @extends { + id: ID! @external + topPosts(count: Int!): [Post] + }` + const postServiceResolvers = { + User: { + topPosts: (user, { count }, context, info) => { + return Object.values(posts).filter(p => p.authorId === user.id).slice(0, count) + } + } + } + const [postService, postServicePort] = await createTestService(t, postServiceSchema, postServiceResolvers) + + const gateway = Fastify() + t.teardown(async () => { + await gateway.close() + await userService.close() + await postService.close() + }) + gateway.register(GQL, { + gateway: { + services: [{ + name: 'user', + url: `http://localhost:${userServicePort}/graphql`, + allowBatchedQueries: true + }, { + name: 'post', + url: `http://localhost:${postServicePort}/graphql`, + allowBatchedQueries: true + }] + } + }) + return gateway +} + +test('gateway - should support truthy include directive', async (t) => { + t.plan(1) + const app = await createTestGatewayServer(t) + + const variables = { + shouldInclude: true, + input: 'hello' + } + const query = ` + query GetMe($input: String!, $shouldInclude: Boolean!) { + me { + id + name + metadata(input: $input) @include(if: $shouldInclude) { + info + } + topPosts(count: 1) @include(if: $shouldInclude) { + pid + } + } + }` + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query, variables }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: { + id: 'u1', + name: 'John', + metadata: { + info: 'hello' + }, + topPosts: [ + { + pid: 'p1' + } + ] + } + } + }) +}) + +test('gateway - should support falsy include directive', async (t) => { + t.plan(1) + const app = await createTestGatewayServer(t) + + const variables = { + shouldInclude: false, + input: 'hello' + } + const query = ` + query GetMe($input: String!, $shouldInclude: Boolean!) { + me { + id + name + metadata(input: $input) @include(if: $shouldInclude) { + info + } + topPosts(count: 1) @include(if: $shouldInclude) { + pid + } + } + }` + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query, variables }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: { + id: 'u1', + name: 'John' + } + } + }) +}) diff --git a/test/gateway/load-balancing-with-batching.js b/test/gateway/load-balancing-with-batching.js new file mode 100644 index 00000000..a5b49b7e --- /dev/null +++ b/test/gateway/load-balancing-with-batching.js @@ -0,0 +1,198 @@ +'use strict' + +const { test } = require('tap') +const Fastify = require('fastify') +const GQL = require('../..') + +async function createTestService (t, schema, resolvers = {}, fn = async () => {}) { + const service = Fastify() + service.addHook('preHandler', fn) + service.register(GQL, { + schema, + resolvers, + federationMetadata: true, + allowBatchedQueries: true + }) + await service.listen(0) + return [service, service.server.address().port] +} + +const users = { + u1: { + id: 'u1', + name: 'John' + }, + u2: { + id: 'u2', + name: 'Jane' + } +} + +const posts = { + p1: { + pid: 'p1', + title: 'Post 1', + content: 'Content 1', + authorId: 'u1' + }, + p2: { + pid: 'p2', + title: 'Post 2', + content: 'Content 2', + authorId: 'u2' + }, + p3: { + pid: 'p3', + title: 'Post 3', + content: 'Content 3', + authorId: 'u1' + }, + p4: { + pid: 'p4', + title: 'Post 4', + content: 'Content 4', + authorId: 'u1' + } +} + +test('load balances two peers', async (t) => { + // User service + const userServiceSchema = ` + type Query @extends { + me: User + } + + type Metadata { + info: String! + } + + type User @key(fields: "id") { + id: ID! + name: String! + metadata(input: String!): Metadata! + }` + const userServiceResolvers = { + Query: { + me: (root, args, context, info) => { + return users.u1 + } + }, + User: { + metadata: (user, args, context, info) => { + return { + info: args.input + } + } + } + } + let user1called = 0 + let user2called = 0 + const [userService1, userServicePort1] = await createTestService(t, userServiceSchema, userServiceResolvers, async () => { + user1called++ + }) + const [userService2, userServicePort2] = await createTestService(t, userServiceSchema, userServiceResolvers, async () => { + user2called++ + }) + + // Post service + const postServiceSchema = ` + type Post @key(fields: "pid") { + pid: ID! + } + + type User @key(fields: "id") @extends { + id: ID! @external + topPosts(count: Int!): [Post] + }` + const postServiceResolvers = { + User: { + topPosts: (user, { count }, context, info) => { + return Object.values(posts).filter(p => p.authorId === user.id).slice(0, count) + } + } + } + const [postService, postServicePort] = await createTestService(t, postServiceSchema, postServiceResolvers) + + const gateway = Fastify() + t.teardown(async () => { + await gateway.close() + await userService1.close() + await userService2.close() + await postService.close() + }) + + gateway.register(GQL, { + gateway: { + services: [{ + name: 'user', + url: [`http://localhost:${userServicePort1}/graphql`, `http://localhost:${userServicePort2}/graphql`], + allowBatchedQueries: true + }, { + name: 'post', + url: `http://localhost:${postServicePort}/graphql`, + allowBatchedQueries: true + }] + } + }) + await gateway + + const variables = { + shouldSkip: true, + input: 'hello' + } + const query = ` + query GetMe($input: String!, $shouldSkip: Boolean!) { + me { + id + name + metadata(input: $input) @skip(if: $shouldSkip) { + info + } + topPosts(count: 1) @skip(if: $shouldSkip) { + pid + } + } + }` + + { + const res = await gateway.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query, variables }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: { + id: 'u1', + name: 'John' + } + } + }) + } + + { + const res = await gateway.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query, variables }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: { + id: 'u1', + name: 'John' + } + } + }) + } + + // Called two times, one to get the schema and one for the query + t.equal(user1called, 2) + + // Called one time, one one for the query + t.equal(user2called, 1) +}) diff --git a/test/gateway/with-batching.js b/test/gateway/with-batching.js new file mode 100644 index 00000000..c6db28aa --- /dev/null +++ b/test/gateway/with-batching.js @@ -0,0 +1,188 @@ +'use strict' + +const { test } = require('tap') +const Fastify = require('fastify') +const GQL = require('../..') + +async function createTestService (t, schema, resolvers = {}, allowBatchedQueries = false) { + const service = Fastify() + service.register(GQL, { + schema, + resolvers, + federationMetadata: true, + allowBatchedQueries + }) + await service.listen(0) + return [service, service.server.address().port] +} + +const users = { + u1: { + id: 'u1', + name: 'John' + }, + u2: { + id: 'u2', + name: 'Jane' + } +} + +const posts = { + p1: { + pid: 'p1', + title: 'Post 1', + content: 'Content 1', + authorId: 'u1' + }, + p2: { + pid: 'p2', + title: 'Post 2', + content: 'Content 2', + authorId: 'u2' + }, + p3: { + pid: 'p3', + title: 'Post 3', + content: 'Content 3', + authorId: 'u1' + }, + p4: { + pid: 'p4', + title: 'Post 4', + content: 'Content 4', + authorId: 'u1' + } +} + +async function createTestGatewayServer (t, allowBatchedQueries = false) { + // User service + const userServiceSchema = ` + type Query @extends { + me: User + } + + type Metadata { + info: String! + } + + type User @key(fields: "id") { + id: ID! + name: String! + quote(input: String!): String! + metadata(input: String!): Metadata! + }` + const userServiceResolvers = { + Query: { + me: (root, args, context, info) => { + return users.u1 + } + }, + User: { + quote: (user, args, context, info) => { + return args.input + }, + metadata: (user, args, context, info) => { + return { + info: args.input + } + }, + __resolveReference: (user, args, context, info) => { + return users[user.id] + } + } + } + const [userService, userServicePort] = await createTestService(t, userServiceSchema, userServiceResolvers, allowBatchedQueries) + + // Post service + const postServiceSchema = ` + type Post @key(fields: "pid") { + pid: ID! + } + + type User @key(fields: "id") @extends { + id: ID! @external + topPosts(count: Int!): [Post] + }` + const postServiceResolvers = { + User: { + topPosts: (user, { count }, context, info) => { + return Object.values(posts).filter(p => p.authorId === user.id).slice(0, count) + } + } + } + const [postService, postServicePort] = await createTestService(t, postServiceSchema, postServiceResolvers, allowBatchedQueries) + + const gateway = Fastify() + t.teardown(async () => { + await gateway.close() + await userService.close() + await postService.close() + }) + gateway.register(GQL, { + gateway: { + services: [{ + name: 'user', + url: `http://localhost:${userServicePort}/graphql`, + allowBatchedQueries + }, { + name: 'post', + url: `http://localhost:${postServicePort}/graphql`, + allowBatchedQueries + }] + } + }) + return gateway +} + +test('it returns the same data if batching is enabled', async (t) => { + t.plan(1) + const app1 = await createTestGatewayServer(t) + const app2 = await createTestGatewayServer(t, true) + + const query = ` + query { + user: me { + id + name + newName: name + otherName: name + quote(input: "quote") + firstQuote: quote(input: "foo") + secondQuote: quote(input: "bar") + metadata(input: "info") { + info + } + originalMetadata: metadata(input: "hello") { + hi: info + ho: info + } + moreMetadata: metadata(input: "hi") { + info + } + somePosts: topPosts(count: 1) { + pid + } + morePosts: topPosts(count: 2) { + pid + } + } + }` + + const res1 = await app1.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + await app1.close() + + const res2 = await app2.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res1.body), JSON.parse(res2.body)) +}) From 4f3d03819658c9c2bff6e8aa736f92d72809e83b Mon Sep 17 00:00:00 2001 From: Giacomo Rebonato Date: Thu, 25 Nov 2021 18:27:02 +0100 Subject: [PATCH 2/7] feat: batching queries with gateway - unit testing --- lib/gateway.js | 5 +- lib/gateway/get-query-result.js | 25 +- lib/gateway/make-resolver.js | 3 +- test/gateway/get-query-result.js | 208 ++--- test/gateway/hooks-with-batching.js | 1240 +++++++++++++++++++++++++++ 5 files changed, 1320 insertions(+), 161 deletions(-) create mode 100644 test/gateway/hooks-with-batching.js diff --git a/lib/gateway.js b/lib/gateway.js index 440e4587..7acda4bc 100644 --- a/lib/gateway.js +++ b/lib/gateway.js @@ -352,8 +352,11 @@ async function buildGateway (gatewayOpts, app) { * */ factory.add(`${service}Entity`, async (queries) => { + // context is the same for each request, but unfortunately it's not acessible from onRequest + // where we do factory.create(). What is a cleaner option? + const context = queries[0].context const result = await getQueryResult({ - queries, serviceDefinition, service + context, queries, serviceDefinition, service }) return result diff --git a/lib/gateway/get-query-result.js b/lib/gateway/get-query-result.js index 8dc64736..b4611c58 100644 --- a/lib/gateway/get-query-result.js +++ b/lib/gateway/get-query-result.js @@ -29,10 +29,8 @@ const mergeQueries = (queries) => { return { mergedQueries, resultIndexes } } -const getBactchedResult = async ({ mergeQueriesResult, queries, serviceDefinition, service }) => { +const getBactchedResult = async ({ mergeQueriesResult, context, serviceDefinition, service }) => { const { mergedQueries, resultIndexes } = mergeQueriesResult - const context = queries[0].context - const originalRequestHeaders = queries[0].originalRequestHeaders const batchedQueries = [] for (const [query, variables] of Object.entries(mergedQueries)) { @@ -50,13 +48,13 @@ const getBactchedResult = async ({ mergeQueriesResult, queries, serviceDefinitio batchedQueries.push({ operationName: document.definitions.find(d => d.kind === 'OperationDefinition').name.value, - query: query || modifiedQuery, + query: modifiedQuery || query, variables }) } const response = await serviceDefinition.sendRequest({ - originalRequestHeaders, + originalRequestHeaders: context.reply.request.headers, body: JSON.stringify(batchedQueries), context }) @@ -91,30 +89,30 @@ const buildResult = ({ resultIndexes, data }) => { return result } -const getResult = async ({ mergeQueriesResult, serviceDefinition, queries, service }) => { +const getResult = async ({ mergeQueriesResult, serviceDefinition, context, service }) => { const { mergedQueries, resultIndexes } = mergeQueriesResult const jsons = [] const queriesEntries = Object.entries(mergedQueries) - for await (const [queryIndex, [query, variables]] of queriesEntries.entries()) { + for await (const [query, variables] of queriesEntries) { let modifiedQuery - if (queries[queryIndex].context.preGatewayExecution !== null) { + if (context.preGatewayExecution !== null) { ({ modifiedQuery } = await preGatewayExecutionHandler({ schema: serviceDefinition.schema, document: parse(query), - context: queries[queryIndex].context, + context, service: { name: service } })) } const response = await serviceDefinition.sendRequest({ - originalRequestHeaders: queries[queryIndex].originalRequestHeaders, + originalRequestHeaders: context.reply.request.headers, body: JSON.stringify({ query: modifiedQuery || query, variables }), - context: queries[queryIndex].context + context }) jsons.push(response.json) @@ -123,13 +121,14 @@ const getResult = async ({ mergeQueriesResult, serviceDefinition, queries, servi return buildResult({ data: jsons, resultIndexes }) } -const getQueryResult = async ({ queries, serviceDefinition, service }) => { +const getQueryResult = async ({ context, queries, serviceDefinition, service }) => { const mergeQueriesResult = mergeQueries(queries) const params = { mergeQueriesResult, service, serviceDefinition, - queries + queries, + context } if (serviceDefinition.allowBatchedQueries) { diff --git a/lib/gateway/make-resolver.js b/lib/gateway/make-resolver.js index ef25c726..e0c87c05 100644 --- a/lib/gateway/make-resolver.js +++ b/lib/gateway/make-resolver.js @@ -506,10 +506,11 @@ function makeResolver ({ service, createOperation, transformData, isQuery, isRef return transformData(response) } + // This method is declared in gateway.js inside of onRequest + // hence it's unique per request. const response = await reply[kEntityResolvers][`${service.name}Entity`]({ query, variables, - originalRequestHeaders: reply.request.headers, context, id: queryId }) diff --git a/test/gateway/get-query-result.js b/test/gateway/get-query-result.js index c642f1dd..b78353ab 100644 --- a/test/gateway/get-query-result.js +++ b/test/gateway/get-query-result.js @@ -3,12 +3,12 @@ const getQueryResult = require('../../lib/gateway/get-query-result') const { test } = require('tap') -const query1 = ` +const getQueryWithCount = (count) => ` query EntitiesQuery($representations: [_Any!]!) { _entities(representations: $representations) { __typename ... on User { - topPosts(count: 1) { + topPosts(count: ${count}) { pid __typename pid @@ -16,31 +16,54 @@ query EntitiesQuery($representations: [_Any!]!) { } } } - ` -const query2 = ` -query EntitiesQuery($representations: [_Any!]!) { - _entities(representations: $representations) { - __typename - ... on User { - topPosts(count: 2) { - pid - __typename - pid - } + +const createEntity = (pid) => ({ + __typename: 'User', + topPosts: { + pid, + __typename: 'Post' + } +}) + +const createNotBatchedResponse = (...entities) => ({ + json: { + data: { + _entities: [...entities] } } -} -` +}) + +const createBatchedResponse = (...entities) => ({ + json: [ + { + data: { + _entities: [...entities] + } + }, + { + data: { + _entities: [...entities] + } + } + ] +}) test('it works with a basic example', async (t) => { + const entity1 = createEntity('p1') + const entity2 = createEntity('p2') const result = await getQueryResult({ + context: { + preGatewayExecution: null, + reply: { + request: { + headers: {} + } + } + }, queries: [ { - context: { - preGatewayExecution: null - }, - query: query1, + query: getQueryWithCount(1), variables: { representations: [ { @@ -52,41 +75,29 @@ test('it works with a basic example', async (t) => { } ], serviceDefinition: { - sendRequest: async () => ({ - json: { - data: { - _entities: [ - { - __typename: 'User', - topPosts: { - pid: 'p1', - __typename: 'Post' - } - } - ] - } - } - }) + sendRequest: async () => createNotBatchedResponse(entity1, entity2) } }) - t.same(result[0].data._entities[0], { - __typename: 'User', - topPosts: { - pid: 'p1', - __typename: 'Post' - } - }) + t.same(result[0].data._entities[0], entity1) + t.same(result[0].data._entities[1], entity2) }) test('it works with a basic example and batched queries', async (t) => { + const entity1 = createEntity('p3') + const entity2 = createEntity('p4') const result = await getQueryResult({ + context: { + preGatewayExecution: null, + reply: { + request: { + headers: {} + } + } + }, queries: [ { - context: { - preGatewayExecution: null - }, - query: query1, + query: getQueryWithCount(1), variables: { representations: [ { @@ -97,10 +108,7 @@ test('it works with a basic example and batched queries', async (t) => { } }, { - context: { - preGatewayExecution: null - }, - query: query2, + query: getQueryWithCount(2), variables: { representations: [ { @@ -113,104 +121,12 @@ test('it works with a basic example and batched queries', async (t) => { ], serviceDefinition: { allowBatchedQueries: true, - sendRequest: async () => ({ - json: [ - { - data: { - _entities: [ - { - __typename: 'User', - topPosts: [{ pid: 'p1', __typename: 'Post' }] - } - ] - } - }, - { - data: { - _entities: [ - { - __typename: 'User', - topPosts: [ - { pid: 'p1', __typename: 'Post' }, - { pid: 'p3', __typename: 'Post' } - ] - } - ] - } - } - ] - }) + sendRequest: async () => createBatchedResponse(entity1, entity2) } }) - t.same(result, [ - { - data: { - _entities: [ - { - __typename: 'User', - topPosts: [ - { - pid: 'p1', - __typename: 'Post' - } - ] - } - ] - }, - json: { - data: { - _entities: [ - { - __typename: 'User', - topPosts: [ - { - pid: 'p1', - __typename: 'Post' - } - ] - } - ] - } - } - }, - { - data: { - _entities: [ - { - __typename: 'User', - topPosts: [ - { - pid: 'p1', - __typename: 'Post' - }, - { - pid: 'p3', - __typename: 'Post' - } - ] - } - ] - }, - json: { - data: { - _entities: [ - { - __typename: 'User', - topPosts: [ - { - pid: 'p1', - __typename: 'Post' - }, - { - pid: 'p3', - __typename: 'Post' - } - ] - } - ] - } - } - } - ]) + t.same(result[0].data._entities[0], entity1) + t.same(result[0].data._entities[1], entity2) + t.same(result[1].data._entities[0], entity1) + t.same(result[1].data._entities[1], entity2) }) diff --git a/test/gateway/hooks-with-batching.js b/test/gateway/hooks-with-batching.js new file mode 100644 index 00000000..9ff9518e --- /dev/null +++ b/test/gateway/hooks-with-batching.js @@ -0,0 +1,1240 @@ +'use strict' + +const { test } = require('tap') +const Fastify = require('fastify') +const { GraphQLSchema, parse } = require('graphql') +const { promisify } = require('util') +const GQL = require('../..') + +const immediate = promisify(setImmediate) + +async function createTestService (t, schema, resolvers = {}) { + const service = Fastify() + service.register(GQL, { + schema, + resolvers, + federationMetadata: true, + allowBatchedQueries: true + }) + await service.listen(0) + return [service, service.server.address().port] +} + +const users = { + u1: { + id: 'u1', + name: 'John' + }, + u2: { + id: 'u2', + name: 'Jane' + } +} + +const posts = { + p1: { + pid: 'p1', + title: 'Post 1', + content: 'Content 1', + authorId: 'u1' + }, + p2: { + pid: 'p2', + title: 'Post 2', + content: 'Content 2', + authorId: 'u2' + }, + p3: { + pid: 'p3', + title: 'Post 3', + content: 'Content 3', + authorId: 'u1' + }, + p4: { + pid: 'p4', + title: 'Post 4', + content: 'Content 4', + authorId: 'u1' + } +} + +const query = ` + query { + me { + id + name + topPosts(count: 2) { + pid + author { + id + } + } + } + topPosts(count: 2) { + pid + } + } +` + +async function createTestGatewayServer (t, opts = {}) { + // User service + const userServiceSchema = ` + type Query @extends { + me: User + } + + type User @key(fields: "id") { + id: ID! + name: String! + }` + const userServiceResolvers = { + Query: { + me: (root, args, context, info) => { + return users.u1 + } + }, + User: { + __resolveReference: (user, args, context, info) => { + return users[user.id] + } + } + } + const [userService, userServicePort] = await createTestService(t, userServiceSchema, userServiceResolvers) + + // Post service + const postServiceSchema = ` + type Post @key(fields: "pid") { + pid: ID! + author: User + } + + extend type Query { + topPosts(count: Int): [Post] + } + + type User @key(fields: "id") @extends { + id: ID! @external + topPosts(count: Int!): [Post] + }` + const postServiceResolvers = { + Post: { + __resolveReference: (post, args, context, info) => { + return posts[post.pid] + }, + author: (post, args, context, info) => { + return { + __typename: 'User', + id: post.authorId + } + } + }, + User: { + topPosts: (user, { count }, context, info) => { + return Object.values(posts).filter(p => p.authorId === user.id).slice(0, count) + } + }, + Query: { + topPosts: (root, { count = 2 }) => Object.values(posts).slice(0, count) + } + } + const [postService, postServicePort] = await createTestService(t, postServiceSchema, postServiceResolvers) + + const gateway = Fastify() + t.teardown(async () => { + await gateway.close() + await userService.close() + await postService.close() + }) + gateway.register(GQL, { + ...opts, + gateway: { + services: [{ + name: 'user', + url: `http://localhost:${userServicePort}/graphql`, + allowBatchedQueries: true + }, { + name: 'post', + url: `http://localhost:${postServicePort}/graphql`, + allowBatchedQueries: true + }] + } + }) + return gateway +} + +// ----- +// hooks +// ----- +test('gateway - hooks', async (t) => { + t.plan(32) + const app = await createTestGatewayServer(t) + + app.graphql.addHook('preParsing', async function (schema, source, context) { + await immediate() + t.type(schema, GraphQLSchema) + t.equal(source, query) + t.type(context, 'object') + t.ok('preParsing called') + }) + + app.graphql.addHook('preValidation', async function (schema, document, context) { + await immediate() + t.type(schema, GraphQLSchema) + t.same(document, parse(query)) + t.type(context, 'object') + t.ok('preValidation called') + }) + + app.graphql.addHook('preExecution', async function (schema, document, context) { + await immediate() + t.type(schema, GraphQLSchema) + t.same(document, parse(query)) + t.type(context, 'object') + t.ok('preExecution called') + }) + + // Execution events: + // - once for user service query + // - once for post service query + // - once for reference type topPosts on User + // - once for reference type author on Post + app.graphql.addHook('preGatewayExecution', async function (schema, document, context) { + await immediate() + t.type(schema, GraphQLSchema) + t.type(document, 'object') + t.type(context, 'object') + t.ok('preGatewayExecution called') + }) + + app.graphql.addHook('onResolution', async function (execution, context) { + await immediate() + t.type(execution, 'object') + t.type(context, 'object') + t.ok('onResolution called') + }) + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: { + id: 'u1', + name: 'John', + topPosts: [ + { + pid: 'p1', + author: { + id: 'u1' + } + }, + { + pid: 'p3', + author: { + id: 'u1' + } + } + ] + }, + topPosts: [ + { + pid: 'p1' + }, + { + pid: 'p2' + } + ] + } + }) +}) + +test('gateway - hooks validation should handle invalid hook names', async (t) => { + t.plan(1) + const app = await createTestGatewayServer(t) + + try { + app.graphql.addHook('unsupportedHook', async () => {}) + } catch (e) { + t.equal(e.message, 'unsupportedHook hook not supported!') + } +}) + +test('gateway - hooks validation should handle invalid hook name types', async (t) => { + t.plan(2) + const app = await createTestGatewayServer(t) + + try { + app.graphql.addHook(1, async () => {}) + } catch (e) { + t.equal(e.code, 'MER_ERR_HOOK_INVALID_TYPE') + t.equal(e.message, 'The hook name must be a string') + } +}) + +test('gateway - hooks validation should handle invalid hook handlers', async (t) => { + t.plan(2) + const app = await createTestGatewayServer(t) + + try { + app.graphql.addHook('preParsing', 'not a function') + } catch (e) { + t.equal(e.code, 'MER_ERR_HOOK_INVALID_HANDLER') + t.equal(e.message, 'The hook callback must be a function') + } +}) + +test('gateway - hooks should trigger when JIT is enabled', async (t) => { + t.plan(60) + const app = await createTestGatewayServer(t, { jit: 1 }) + + app.graphql.addHook('preParsing', async function (schema, source, context) { + await immediate() + t.type(schema, GraphQLSchema) + t.equal(source, query) + t.type(context, 'object') + t.ok('preParsing called') + }) + + // preValidation is not triggered a second time + app.graphql.addHook('preValidation', async function (schema, document, context) { + await immediate() + t.type(schema, GraphQLSchema) + t.same(document, parse(query)) + t.type(context, 'object') + t.ok('preValidation called') + }) + + app.graphql.addHook('preExecution', async function (schema, document, context) { + await immediate() + t.type(schema, GraphQLSchema) + t.same(document, parse(query)) + t.type(context, 'object') + t.ok('preExecution called') + }) + + // Execution events: + // - once for user service query + // - once for post service query + // - once for reference type topPosts on User + // - once for reference type author on Post + app.graphql.addHook('preGatewayExecution', async function (schema, document, context) { + await immediate() + t.type(schema, GraphQLSchema) + t.type(document, 'object') + t.type(context, 'object') + t.ok('preGatewayExecution called') + }) + + app.graphql.addHook('onResolution', async function (execution, context) { + await immediate() + t.type(execution, 'object') + t.type(context, 'object') + t.ok('onResolution called') + }) + + { + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: { + id: 'u1', + name: 'John', + topPosts: [ + { + pid: 'p1', + author: { + id: 'u1' + } + }, + { + pid: 'p3', + author: { + id: 'u1' + } + } + ] + }, + topPosts: [ + { + pid: 'p1' + }, + { + pid: 'p2' + } + ] + } + }) + } + + { + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: { + id: 'u1', + name: 'John', + topPosts: [ + { + pid: 'p1', + author: { + id: 'u1' + } + }, + { + pid: 'p3', + author: { + id: 'u1' + } + } + ] + }, + topPosts: [ + { + pid: 'p1' + }, + { + pid: 'p2' + } + ] + } + }) + } +}) + +// -------------------- +// preParsing +// -------------------- +test('gateway - preParsing hooks should handle errors', async t => { + t.plan(4) + const app = await createTestGatewayServer(t) + + app.graphql.addHook('preParsing', async (schema, source, context) => { + t.type(schema, GraphQLSchema) + t.equal(source, query) + t.type(context, 'object') + throw new Error('a preParsing error occured') + }) + + app.graphql.addHook('preParsing', async (schema, source, context) => { + t.fail('this should not be called') + }) + + app.graphql.addHook('preValidation', async (schema, document, context) => { + t.fail('this should not be called') + }) + + app.graphql.addHook('preExecution', async (schema, operation, context) => { + t.fail('this should not be called') + }) + + app.graphql.addHook('onResolution', async (execution, context) => { + t.fail('this should not be called') + }) + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: null, + errors: [ + { + message: 'a preParsing error occured' + } + ] + }) +}) + +test('gateway - preParsing hooks should be able to put values onto the context', async t => { + t.plan(8) + const app = await createTestGatewayServer(t) + + app.graphql.addHook('preParsing', async (schema, source, context) => { + t.type(schema, GraphQLSchema) + t.equal(source, query) + t.type(context, 'object') + context.foo = 'bar' + }) + + app.graphql.addHook('preParsing', async (schema, source, context) => { + t.type(schema, GraphQLSchema) + t.equal(source, query) + t.type(context, 'object') + t.equal(context.foo, 'bar') + }) + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: { + id: 'u1', + name: 'John', + topPosts: [ + { + pid: 'p1', + author: { + id: 'u1' + } + }, + { + pid: 'p3', + author: { + id: 'u1' + } + } + ] + }, + topPosts: [ + { + pid: 'p1' + }, + { + pid: 'p2' + } + ] + } + }) +}) + +// -------------- +// preValidation +// -------------- +test('gateway - preValidation hooks should handle errors', async t => { + t.plan(4) + const app = await createTestGatewayServer(t) + + app.graphql.addHook('preValidation', async (schema, document, context) => { + t.type(schema, GraphQLSchema) + t.same(document, parse(query)) + t.type(context, 'object') + throw new Error('a preValidation error occured') + }) + + app.graphql.addHook('preValidation', async (schema, document, context) => { + t.fail('this should not be called') + }) + + app.graphql.addHook('preExecution', async (schema, document, context) => { + t.fail('this should not be called') + }) + + app.graphql.addHook('onResolution', async (execution, context) => { + t.fail('this should not be called') + }) + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: null, + errors: [ + { + message: 'a preValidation error occured' + } + ] + }) +}) + +test('gateway - preValidation hooks should be able to put values onto the context', async t => { + t.plan(8) + const app = await createTestGatewayServer(t) + + app.graphql.addHook('preValidation', async (schema, document, context) => { + t.type(schema, GraphQLSchema) + t.same(document, parse(query)) + t.type(context, 'object') + context.foo = 'bar' + }) + + app.graphql.addHook('preValidation', async (schema, document, context) => { + t.type(schema, GraphQLSchema) + t.same(document, parse(query)) + t.type(context, 'object') + t.equal(context.foo, 'bar') + }) + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: { + id: 'u1', + name: 'John', + topPosts: [ + { + pid: 'p1', + author: { + id: 'u1' + } + }, + { + pid: 'p3', + author: { + id: 'u1' + } + } + ] + }, + topPosts: [ + { + pid: 'p1' + }, + { + pid: 'p2' + } + ] + } + }) +}) + +// ------------- +// preExecution +// ------------- +test('gateway - preExecution hooks should handle errors', async t => { + t.plan(4) + const app = await createTestGatewayServer(t) + + app.graphql.addHook('preExecution', async (schema, document, context) => { + t.type(schema, GraphQLSchema) + t.same(document, parse(query)) + t.type(context, 'object') + throw new Error('a preExecution error occured') + }) + + app.graphql.addHook('preExecution', async (schema, document, context) => { + t.fail('this should not be called') + }) + + app.graphql.addHook('onResolution', async (execution, context) => { + t.fail('this should not be called') + }) + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: null, + errors: [ + { + message: 'a preExecution error occured' + } + ] + }) +}) + +test('gateway - preExecution hooks should be able to put values onto the context', async t => { + t.plan(8) + const app = await createTestGatewayServer(t) + + app.graphql.addHook('preExecution', async (schema, document, context) => { + t.type(schema, GraphQLSchema) + t.same(document, parse(query)) + t.type(context, 'object') + context.foo = 'bar' + }) + + app.graphql.addHook('preExecution', async (schema, document, context) => { + t.type(schema, GraphQLSchema) + t.same(document, parse(query)) + t.type(context, 'object') + t.equal(context.foo, 'bar') + }) + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: { + id: 'u1', + name: 'John', + topPosts: [ + { + pid: 'p1', + author: { + id: 'u1' + } + }, + { + pid: 'p3', + author: { + id: 'u1' + } + } + ] + }, + topPosts: [ + { + pid: 'p1' + }, + { + pid: 'p2' + } + ] + } + }) +}) + +test('gateway - preExecution hooks should be able to modify the request document', async t => { + t.plan(5) + const app = await createTestGatewayServer(t) + + app.graphql.addHook('preExecution', async (schema, document, context) => { + t.type(schema, GraphQLSchema) + t.same(document, parse(query)) + t.type(context, 'object') + t.ok('preExecution called') + const documentClone = JSON.parse(JSON.stringify(document)) + documentClone.definitions[0].selectionSet.selections = [documentClone.definitions[0].selectionSet.selections[0]] + return { + document: documentClone + } + }) + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: { + id: 'u1', + name: 'John', + topPosts: [ + { + pid: 'p1', + author: { + id: 'u1' + } + }, + { + pid: 'p3', + author: { + id: 'u1' + } + } + ] + } + } + }) +}) + +test('gateway - preExecution hooks should be able to add to the errors array', async t => { + t.plan(9) + const app = await createTestGatewayServer(t) + + app.graphql.addHook('preExecution', async (schema, document, context) => { + t.type(schema, GraphQLSchema) + t.same(document, parse(query)) + t.type(context, 'object') + t.ok('preExecution called for foo error') + return { + errors: [new Error('foo')] + } + }) + + app.graphql.addHook('preExecution', async (schema, document, context) => { + t.type(schema, GraphQLSchema) + t.same(document, parse(query)) + t.type(context, 'object') + t.ok('preExecution called for foo error') + return { + errors: [new Error('bar')] + } + }) + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: { + id: 'u1', + name: 'John', + topPosts: [ + { + pid: 'p1', + author: { + id: 'u1' + } + }, + { + pid: 'p3', + author: { + id: 'u1' + } + } + ] + }, + topPosts: [ + { + pid: 'p1' + }, + { + pid: 'p2' + } + ] + }, + errors: [ + { + message: 'foo' + }, + { + message: 'bar' + } + ] + }) +}) + +// ------------------- +// preGatewayExecution +// ------------------- +test('gateway - preGatewayExecution hooks should handle errors', async t => { + t.plan(10) + const app = await createTestGatewayServer(t) + + app.graphql.addHook('preGatewayExecution', async (schema, document, context) => { + t.type(schema, GraphQLSchema) + t.type(document, 'object') + t.type(context, 'object') + throw new Error('a preGatewayExecution error occured') + }) + + app.graphql.addHook('preGatewayExecution', async (schema, document, context) => { + t.fail('this should not be called') + }) + + // This should still be called in the gateway + app.graphql.addHook('onResolution', async (execution, context) => { + t.type(execution, 'object') + t.type(context, 'object') + t.ok('onResolution called') + }) + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: null, + topPosts: null + }, + errors: [ + { + message: 'a preGatewayExecution error occured', + locations: [{ line: 3, column: 5 }], + path: ['me'] + }, + { + message: 'a preGatewayExecution error occured', + locations: [{ line: 13, column: 5 }], + path: ['topPosts'] + } + ] + }) +}) + +test('gateway - preGatewayExecution hooks should be able to put values onto the context', async t => { + t.plan(29) + const app = await createTestGatewayServer(t) + + app.graphql.addHook('preGatewayExecution', async (schema, document, context) => { + t.type(schema, GraphQLSchema) + t.type(document, 'object') + t.type(context, 'object') + context[document.definitions[0].name.value] = 'bar' + }) + + app.graphql.addHook('preGatewayExecution', async (schema, document, context) => { + t.type(schema, GraphQLSchema) + t.type(document, 'object') + t.type(context, 'object') + t.equal(context[document.definitions[0].name.value], 'bar') + }) + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: { + id: 'u1', + name: 'John', + topPosts: [ + { + pid: 'p1', + author: { + id: 'u1' + } + }, + { + pid: 'p3', + author: { + id: 'u1' + } + } + ] + }, + topPosts: [ + { + pid: 'p1' + }, + { + pid: 'p2' + } + ] + } + }) +}) + +test('gateway - preGatewayExecution hooks should be able to add to the errors array', async t => { + t.plan(33) + const app = await createTestGatewayServer(t) + + app.graphql.addHook('preGatewayExecution', async (schema, document, context) => { + t.type(schema, GraphQLSchema) + t.type(document, 'object') + t.type(context, 'object') + t.ok('preGatewayExecution called for foo error') + return { + errors: [new Error(`foo - ${document.definitions[0].name.value}`)] + } + }) + + app.graphql.addHook('preGatewayExecution', async (schema, document, context) => { + t.type(schema, GraphQLSchema) + t.type(document, 'object') + t.type(context, 'object') + t.ok('preGatewayExecution called for foo error') + return { + errors: [new Error(`bar - ${document.definitions[0].name.value}`)] + } + }) + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: { + id: 'u1', + name: 'John', + topPosts: [ + { + pid: 'p1', + author: { + id: 'u1' + } + }, + { + pid: 'p3', + author: { + id: 'u1' + } + } + ] + }, + topPosts: [ + { + pid: 'p1' + }, + { + pid: 'p2' + } + ] + }, + errors: [ + { + message: 'foo - Query_me' + }, + { + message: 'bar - Query_me' + }, + { + message: 'foo - Query_topPosts' + }, + { + message: 'bar - Query_topPosts' + }, + { + message: 'foo - EntitiesQuery' + }, + { + message: 'bar - EntitiesQuery' + }, + { + message: 'foo - EntitiesQuery' + }, + { + message: 'bar - EntitiesQuery' + } + ] + }) +}) + +test('gateway - preGatewayExecution hooks should be able to modify the request document', async t => { + t.plan(17) + const app = await createTestGatewayServer(t) + + app.graphql.addHook('preGatewayExecution', async (schema, document, context) => { + t.type(schema, GraphQLSchema) + t.type(document, 'object') + t.type(context, 'object') + t.ok('preGatewayExecution called') + if (document.definitions[0].name.value === 'EntitiesQuery') { + if (document.definitions[0].selectionSet.selections[0].selectionSet.selections[1].selectionSet.selections[0].arguments[0]) { + const documentClone = JSON.parse(JSON.stringify(document)) + documentClone.definitions[0].selectionSet.selections[0].selectionSet.selections[1].selectionSet.selections[0].arguments[0].value.value = 1 + return { + document: documentClone + } + } + } + }) + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: { + id: 'u1', + name: 'John', + topPosts: [ + { + pid: 'p1', + author: { + id: 'u1' + } + } + ] + }, + topPosts: [ + { + pid: 'p1' + }, + { + pid: 'p2' + } + ] + } + }) +}) + +test('gateway - preGatewayExecution hooks should contain service metadata', async (t) => { + t.plan(21) + const app = await createTestGatewayServer(t) + + // Execution events: + // - user service: once for user service query + // - post service: once for post service query + // - post service: once for reference type topPosts on User + // - user service: once for reference type author on Post + app.graphql.addHook('preGatewayExecution', async function (schema, document, context, service) { + await immediate() + t.type(schema, GraphQLSchema) + t.type(document, 'object') + t.type(context, 'object') + if (typeof service === 'object' && service.name === 'user') { + t.equal(service.name, 'user') + } else if (typeof service === 'object' && service.name === 'post') { + t.equal(service.name, 'post') + } else { + t.fail('service metadata should be correctly populated') + return + } + t.ok('preGatewayExecution called') + }) + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: { + id: 'u1', + name: 'John', + topPosts: [ + { + pid: 'p1', + author: { + id: 'u1' + } + }, + { + pid: 'p3', + author: { + id: 'u1' + } + } + ] + }, + topPosts: [ + { + pid: 'p1' + }, + { + pid: 'p2' + } + ] + } + }) +}) + +// ------------- +// onResolution +// ------------- +test('gateway - onResolution hooks should handle errors', async t => { + t.plan(3) + const app = await createTestGatewayServer(t) + + app.graphql.addHook('onResolution', async (execution, context) => { + t.type(execution, 'object') + t.type(context, 'object') + throw new Error('a onResolution error occured') + }) + + app.graphql.addHook('onResolution', async (execution, context) => { + t.fail('this should not be called') + }) + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: null, + errors: [ + { + message: 'a onResolution error occured' + } + ] + }) +}) + +test('gateway - onResolution hooks should be able to put values onto the context', async t => { + t.plan(6) + const app = await createTestGatewayServer(t) + + app.graphql.addHook('onResolution', async (execution, context) => { + t.type(execution, 'object') + t.type(context, 'object') + context.foo = 'bar' + }) + + app.graphql.addHook('onResolution', async (execution, context) => { + t.type(execution, 'object') + t.type(context, 'object') + t.equal(context.foo, 'bar') + }) + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res.body), { + data: { + me: { + id: 'u1', + name: 'John', + topPosts: [ + { + pid: 'p1', + author: { + id: 'u1' + } + }, + { + pid: 'p3', + author: { + id: 'u1' + } + } + ] + }, + topPosts: [ + { + pid: 'p1' + }, + { + pid: 'p2' + } + ] + } + }) +}) From ed26498f9467a1e6486f667461517831dd364897 Mon Sep 17 00:00:00 2001 From: Giacomo Rebonato Date: Thu, 25 Nov 2021 18:50:06 +0100 Subject: [PATCH 3/7] feat: gateway query batching - parallel execution --- lib/gateway/get-query-result.js | 61 ++++++++++++++++---------------- lib/gateway/make-resolver.js | 1 + test/gateway/get-query-result.js | 5 +++ 3 files changed, 36 insertions(+), 31 deletions(-) diff --git a/lib/gateway/get-query-result.js b/lib/gateway/get-query-result.js index b4611c58..a899feb4 100644 --- a/lib/gateway/get-query-result.js +++ b/lib/gateway/get-query-result.js @@ -1,8 +1,5 @@ 'use strict' -const { - parse -} = require('graphql') const { preGatewayExecutionHandler } = require('../handlers') const mergeQueries = (queries) => { @@ -10,11 +7,14 @@ const mergeQueries = (queries) => { const resultIndexes = [] const mergedQueries = queries.reduce((acc, curr, queryIndex) => { if (!acc[curr.query]) { - acc[curr.query] = curr.variables + acc[curr.query] = { + document: curr.document, + variables: curr.variables + } resultIndexes[q.indexOf(curr.query)] = [] } else { - acc[curr.query].representations = [ - ...acc[curr.query].representations, + acc[curr.query].variables.representations = [ + ...acc[curr.query].variables.representations, ...curr.variables.representations ] } @@ -33,8 +33,7 @@ const getBactchedResult = async ({ mergeQueriesResult, context, serviceDefinitio const { mergedQueries, resultIndexes } = mergeQueriesResult const batchedQueries = [] - for (const [query, variables] of Object.entries(mergedQueries)) { - const document = parse(query) + for (const [query, { document, variables }] of Object.entries(mergedQueries)) { let modifiedQuery if (context.preGatewayExecution !== null) { @@ -91,34 +90,34 @@ const buildResult = ({ resultIndexes, data }) => { const getResult = async ({ mergeQueriesResult, serviceDefinition, context, service }) => { const { mergedQueries, resultIndexes } = mergeQueriesResult - const jsons = [] const queriesEntries = Object.entries(mergedQueries) + const data = await Promise.all( + queriesEntries.map(async ([query, { document, variables }]) => { + let modifiedQuery + + if (context.preGatewayExecution !== null) { + ({ modifiedQuery } = await preGatewayExecutionHandler({ + schema: serviceDefinition.schema, + document, + context, + service: { name: service } + })) + } - for await (const [query, variables] of queriesEntries) { - let modifiedQuery + const response = await serviceDefinition.sendRequest({ + originalRequestHeaders: context.reply.request.headers, + body: JSON.stringify({ + query: modifiedQuery || query, + variables + }), + context + }) - if (context.preGatewayExecution !== null) { - ({ modifiedQuery } = await preGatewayExecutionHandler({ - schema: serviceDefinition.schema, - document: parse(query), - context, - service: { name: service } - })) - } - - const response = await serviceDefinition.sendRequest({ - originalRequestHeaders: context.reply.request.headers, - body: JSON.stringify({ - query: modifiedQuery || query, - variables - }), - context + return response.json }) + ) - jsons.push(response.json) - } - - return buildResult({ data: jsons, resultIndexes }) + return buildResult({ data, resultIndexes }) } const getQueryResult = async ({ context, queries, serviceDefinition, service }) => { diff --git a/lib/gateway/make-resolver.js b/lib/gateway/make-resolver.js index 421ca840..db47537f 100644 --- a/lib/gateway/make-resolver.js +++ b/lib/gateway/make-resolver.js @@ -511,6 +511,7 @@ function makeResolver ({ service, createOperation, transformData, isQuery, isRef // This method is declared in gateway.js inside of onRequest // hence it's unique per request. const response = await entityResolvers[`${service.name}Entity`]({ + document: operation, query, variables, context, diff --git a/test/gateway/get-query-result.js b/test/gateway/get-query-result.js index b78353ab..093212c3 100644 --- a/test/gateway/get-query-result.js +++ b/test/gateway/get-query-result.js @@ -1,5 +1,6 @@ 'use strict' +const { parse } = require('graphql') const getQueryResult = require('../../lib/gateway/get-query-result') const { test } = require('tap') @@ -61,8 +62,10 @@ test('it works with a basic example', async (t) => { } } }, + queries: [ { + document: parse(getQueryWithCount(1)), query: getQueryWithCount(1), variables: { representations: [ @@ -97,6 +100,7 @@ test('it works with a basic example and batched queries', async (t) => { }, queries: [ { + document: parse(getQueryWithCount(1)), query: getQueryWithCount(1), variables: { representations: [ @@ -108,6 +112,7 @@ test('it works with a basic example and batched queries', async (t) => { } }, { + document: parse(getQueryWithCount(2)), query: getQueryWithCount(2), variables: { representations: [ From 4419f583ff25d279a663099d4b7823956a3fefa2 Mon Sep 17 00:00:00 2001 From: Giacomo Rebonato Date: Thu, 25 Nov 2021 18:56:42 +0100 Subject: [PATCH 4/7] doc: gateway with batching - fixed messsage --- lib/gateway.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/gateway.js b/lib/gateway.js index 7acda4bc..68469d04 100644 --- a/lib/gateway.js +++ b/lib/gateway.js @@ -352,7 +352,7 @@ async function buildGateway (gatewayOpts, app) { * */ factory.add(`${service}Entity`, async (queries) => { - // context is the same for each request, but unfortunately it's not acessible from onRequest + // context is the same for each query, but unfortunately it's not acessible from onRequest // where we do factory.create(). What is a cleaner option? const context = queries[0].context const result = await getQueryResult({ From 6fba68e2582464c99946143de461432c3ca64a95 Mon Sep 17 00:00:00 2001 From: Giacomo Rebonato Date: Fri, 26 Nov 2021 12:32:16 +0100 Subject: [PATCH 5/7] feat: batched queries to services - updated docs - better testing - setting batched as default --- docs/federation.md | 54 ++++- lib/gateway.js | 5 +- lib/gateway/default-service-definition.js | 5 + .../batching-on-both-gateway-and-services.js | 216 ++++++++++++++++++ test/gateway/errors-with-batching.js | 152 ++++++++++++ 5 files changed, 423 insertions(+), 9 deletions(-) create mode 100644 lib/gateway/default-service-definition.js create mode 100644 test/gateway/batching-on-both-gateway-and-services.js create mode 100644 test/gateway/errors-with-batching.js diff --git a/docs/federation.md b/docs/federation.md index c485d960..49df9460 100644 --- a/docs/federation.md +++ b/docs/federation.md @@ -1,13 +1,17 @@ # mercurius -- [Federation metadata support](#federation-metadata-support) -- [Federation with \_\_resolveReference caching](#federation-with-__resolvereference-caching) -- [Use GraphQL server as a Gateway for federated schemas](#use-graphql-server-as-a-gateway-for-federated-schemas) - - [Periodically refresh federated schemas in Gateway mode](#periodically-refresh-federated-schemas-in-gateway-mode) - - [Programmatically refresh federated schemas in Gateway mode](#programmatically-refresh-federated-schemas-in-gateway-mode) - - [Using Gateway mode with a schema registry](#using-gateway-mode-with-a-schema-registry) - - [Flag service as mandatory in Gateway mode](#flag-service-as-mandatory-in-gateway-mode) - - [Using a custom errorHandler for handling downstream service errors in Gateway mode](#using-a-custom-errorhandler-for-handling-downstream-service-errors-in-gateway-mode) +- [mercurius](#mercurius) + - [Federation](#federation) + - [Federation metadata support](#federation-metadata-support) + - [Federation with \_\_resolveReference caching](#federation-with-__resolvereference-caching) + - [Use GraphQL server as a Gateway for federated schemas](#use-graphql-server-as-a-gateway-for-federated-schemas) + - [Periodically refresh federated schemas in Gateway mode](#periodically-refresh-federated-schemas-in-gateway-mode) + - [Programmatically refresh federated schemas in Gateway mode](#programmatically-refresh-federated-schemas-in-gateway-mode) + - [Using Gateway mode with a schema registry](#using-gateway-mode-with-a-schema-registry) + - [Flag service as mandatory in Gateway mode](#flag-service-as-mandatory-in-gateway-mode) + - [Batched Queries to services](#batched-queries-to-services) + - [Using a custom errorHandler for handling downstream service errors in Gateway mode](#using-a-custom-errorhandler-for-handling-downstream-service-errors-in-gateway-mode) + - [Securely parse service responses in Gateway mode](#securely-parse-service-responses-in-gateway-mode) ## Federation @@ -351,6 +355,40 @@ server.register(mercurius, { server.listen(3002) ``` +#### Batched Queries to services + +To fully leverage the DataLoader pattern a Gateway assumes that can send a request with batched queries to its services. +This configuration can be disabled if the service doesn't support batched queries instead. + + +```js +const Fastify = require('fastify') +const mercurius = require('mercurius') + +const server = Fastify() + +server.register(mercurius, { + graphiql: true, + gateway: { + services: [ + { + name: 'user', + url: 'http://localhost:3000/graphql' // queries will be batched into one request + // same as setting allowBatchedQueries: true + }, + { + name: 'company', + url: 'http://localhost:3001/graphql', // one request for each of the queries + allowBatchedQueries: false + } + ] + }, + pollingInterval: 2000 +}) + +server.listen(3002) +``` + #### Using a custom errorHandler for handling downstream service errors in Gateway mode Service which uses Gateway mode can process different types of issues that can be obtained from remote services (for example, Network Error, Downstream Error, etc.). A developer can provide a function (`gateway.errorHandler`) that can process these errors. diff --git a/lib/gateway.js b/lib/gateway.js index 68469d04..4e797e2e 100644 --- a/lib/gateway.js +++ b/lib/gateway.js @@ -20,6 +20,7 @@ const { MER_ERR_GQL_GATEWAY_REFRESH, MER_ERR_GQL_GATEWAY_INIT } = require('./err const findValueTypes = require('./gateway/find-value-types') const getQueryResult = require('./gateway/get-query-result') const allSettled = require('promise.allsettled') +const defaultServiceDefinition = require('./gateway/default-service-definition') function isDefaultType (type) { return [ @@ -287,7 +288,9 @@ async function buildGateway (gatewayOpts, app) { reply[kEntityResolvers] = factory.create() }) - for (const [service, serviceDefinition] of Object.entries(serviceMap)) { + for (const [service, serviceDefinitionFromGateway] of Object.entries(serviceMap)) { + const serviceDefinition = { ...defaultServiceDefinition, ...serviceDefinitionFromGateway } + for (const type of serviceDefinition.types) { allTypes.push(serviceDefinition.schema.getTypeMap()[type]) typeToServiceMap[type] = service diff --git a/lib/gateway/default-service-definition.js b/lib/gateway/default-service-definition.js new file mode 100644 index 00000000..44ee7403 --- /dev/null +++ b/lib/gateway/default-service-definition.js @@ -0,0 +1,5 @@ +const defaultServiceDefinition = { + allowBatchedQueries: true +} + +module.exports = defaultServiceDefinition diff --git a/test/gateway/batching-on-both-gateway-and-services.js b/test/gateway/batching-on-both-gateway-and-services.js new file mode 100644 index 00000000..82d26dbb --- /dev/null +++ b/test/gateway/batching-on-both-gateway-and-services.js @@ -0,0 +1,216 @@ +'use strict' + +const { test } = require('tap') +const Fastify = require('fastify') +const GQL = require('../..') + +async function createTestService (t, schema, resolvers = {}) { + const service = Fastify() + service.register(GQL, { + schema, + resolvers, + federationMetadata: true, + allowBatchedQueries: true + }) + await service.listen(0) + return [service, service.server.address().port] +} + +const users = { + u1: { + id: 'u1', + name: 'John' + }, + u2: { + id: 'u2', + name: 'Jane' + } +} + +const posts = { + p1: { + pid: 'p1', + title: 'Post 1', + content: 'Content 1', + authorId: 'u1' + }, + p2: { + pid: 'p2', + title: 'Post 2', + content: 'Content 2', + authorId: 'u2' + }, + p3: { + pid: 'p3', + title: 'Post 3', + content: 'Content 3', + authorId: 'u1' + }, + p4: { + pid: 'p4', + title: 'Post 4', + content: 'Content 4', + authorId: 'u1' + } +} + +async function createTestGatewayServer (t) { + // User service + const userServiceSchema = ` + type Query @extends { + me: User + } + + type Metadata { + info: String! + } + + type User @key(fields: "id") { + id: ID! + name: String! + quote(input: String!): String! + metadata(input: String!): Metadata! + }` + const userServiceResolvers = { + Query: { + me: (root, args, context, info) => { + return users.u1 + } + }, + User: { + quote: (user, args, context, info) => { + return args.input + }, + metadata: (user, args, context, info) => { + return { + info: args.input + } + }, + __resolveReference: (user, args, context, info) => { + return users[user.id] + } + } + } + const [userService, userServicePort] = await createTestService(t, userServiceSchema, userServiceResolvers) + + // Post service + const postServiceSchema = ` + type Post @key(fields: "pid") { + pid: ID! + } + + type User @key(fields: "id") @extends { + id: ID! @external + topPosts(count: Int!): [Post] + }` + const postServiceResolvers = { + User: { + topPosts: (user, { count }, context, info) => { + return Object.values(posts).filter(p => p.authorId === user.id).slice(0, count) + } + } + } + const [postService, postServicePort] = await createTestService(t, postServiceSchema, postServiceResolvers) + + const gateway = Fastify() + t.teardown(async () => { + await gateway.close() + await userService.close() + await postService.close() + }) + gateway.register(GQL, { + allowBatchedQueries: true, + gateway: { + services: [{ + name: 'user', + url: `http://localhost:${userServicePort}/graphql`, + allowBatchedQueries: true + }, { + name: 'post', + url: `http://localhost:${postServicePort}/graphql`, + allowBatchedQueries: true + }] + } + }) + return gateway +} + +test('gateway with batching - should support aliases', async (t) => { + t.plan(1) + const app = await createTestGatewayServer(t) + + const query = ` + query getUser { + user: me { + id + name + newName: name + otherName: name + quote(input: "quote") + firstQuote: quote(input: "foo") + secondQuote: quote(input: "bar") + metadata(input: "info") { + info + } + originalMetadata: metadata(input: "hello") { + hi: info + ho: info + } + moreMetadata: metadata(input: "hi") { + info + } + somePosts: topPosts(count: 1) { + pid + } + morePosts: topPosts(count: 2) { + pid + } + } + }` + + const res = await app.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify([ + { operationName: 'getUser', query } + ]) + }) + + t.same(JSON.parse(res.body)[0], { + data: { + user: { + id: 'u1', + name: 'John', + newName: 'John', + otherName: 'John', + quote: 'quote', + firstQuote: 'foo', + secondQuote: 'bar', + metadata: { + info: 'info' + }, + originalMetadata: { + hi: 'hello', + ho: 'hello' + }, + moreMetadata: { + info: 'hi' + }, + somePosts: [ + { + pid: 'p1' + } + ], + morePosts: [ + { + pid: 'p1' + }, + { + pid: 'p3' + } + ] + } + } + }) +}) diff --git a/test/gateway/errors-with-batching.js b/test/gateway/errors-with-batching.js new file mode 100644 index 00000000..09063985 --- /dev/null +++ b/test/gateway/errors-with-batching.js @@ -0,0 +1,152 @@ +'use strict' + +const { test } = require('tap') +const Fastify = require('fastify') +const GQL = require('../..') +const { ErrorWithProps } = require('../../') + +async function createTestService (t, schema, resolvers = {}, allowBatchedQueries = false) { + const service = Fastify() + service.register(GQL, { + schema, + resolvers, + federationMetadata: true, + allowBatchedQueries + }) + await service.listen(0) + return [service, service.server.address().port] +} + +async function createTestGatewayServer (t, allowBatchedQueries = false) { + // User service + const userServiceSchema = ` + type Query @extends { + me: User + } + + type Metadata { + info: String! + } + + type User @key(fields: "id") { + id: ID! + name: String! + quote(input: String!): String! + metadata(input: String!): Metadata! + }` + const userServiceResolvers = { + Query: { + me: (root, args, context, info) => { + throw new ErrorWithProps('Invalid User ID', { + id: 4, + code: 'USER_ID_INVALID' + }) + } + }, + User: { + quote: (user, args, context, info) => { + throw new ErrorWithProps('Invalid Quote', { + id: 4, + code: 'QUOTE_ID_INVALID' + }) + } + } + } + const [userService, userServicePort] = await createTestService(t, userServiceSchema, userServiceResolvers, allowBatchedQueries) + + // Post service + const postServiceSchema = ` + type Post @key(fields: "pid") { + pid: ID! + } + + type User @key(fields: "id") @extends { + id: ID! @external + topPosts(count: Int!): [Post] + }` + const postServiceResolvers = { + User: { + topPosts: (user, { count }, context, info) => { + throw new ErrorWithProps('Invalid Quote', { + id: 4, + code: 'NO_TOP_POSTS' + }) + } + } + } + const [postService, postServicePort] = await createTestService(t, postServiceSchema, postServiceResolvers, allowBatchedQueries) + + const gateway = Fastify() + t.teardown(async () => { + await gateway.close() + await userService.close() + await postService.close() + }) + gateway.register(GQL, { + gateway: { + services: [{ + name: 'user', + url: `http://localhost:${userServicePort}/graphql`, + allowBatchedQueries + }, { + name: 'post', + url: `http://localhost:${postServicePort}/graphql`, + allowBatchedQueries + }] + } + }) + return gateway +} + +test('it returns the same error if batching is enabled', async (t) => { + t.plan(1) + const app1 = await createTestGatewayServer(t) + const app2 = await createTestGatewayServer(t, true) + + const query = ` + query { + user: me { + id + name + newName: name + otherName: name + quote(input: "quote") + firstQuote: quote(input: "foo") + secondQuote: quote(input: "bar") + metadata(input: "info") { + info + } + originalMetadata: metadata(input: "hello") { + hi: info + ho: info + } + moreMetadata: metadata(input: "hi") { + info + } + somePosts: topPosts(count: 1) { + pid + } + morePosts: topPosts(count: 2) { + pid + } + } + }` + + const res1 = await app1.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + await app1.close() + + const res2 = await app2.inject({ + method: 'POST', + headers: { 'content-type': 'application/json' }, + url: '/graphql', + body: JSON.stringify({ query }) + }) + + t.same(JSON.parse(res1.body), JSON.parse(res2.body)) +}) From 144ae99dcea95d99e825d57911b1fab89129b186 Mon Sep 17 00:00:00 2001 From: Giacomo Rebonato Date: Fri, 26 Nov 2021 15:30:11 +0100 Subject: [PATCH 6/7] feat: batch gateway requests - improved jsdoc - set default batching to false --- lib/gateway.js | 5 +-- lib/gateway/default-service-definition.js | 5 --- lib/gateway/get-query-result.js | 53 +++++++++++++++++++---- 3 files changed, 46 insertions(+), 17 deletions(-) delete mode 100644 lib/gateway/default-service-definition.js diff --git a/lib/gateway.js b/lib/gateway.js index 4e797e2e..68469d04 100644 --- a/lib/gateway.js +++ b/lib/gateway.js @@ -20,7 +20,6 @@ const { MER_ERR_GQL_GATEWAY_REFRESH, MER_ERR_GQL_GATEWAY_INIT } = require('./err const findValueTypes = require('./gateway/find-value-types') const getQueryResult = require('./gateway/get-query-result') const allSettled = require('promise.allsettled') -const defaultServiceDefinition = require('./gateway/default-service-definition') function isDefaultType (type) { return [ @@ -288,9 +287,7 @@ async function buildGateway (gatewayOpts, app) { reply[kEntityResolvers] = factory.create() }) - for (const [service, serviceDefinitionFromGateway] of Object.entries(serviceMap)) { - const serviceDefinition = { ...defaultServiceDefinition, ...serviceDefinitionFromGateway } - + for (const [service, serviceDefinition] of Object.entries(serviceMap)) { for (const type of serviceDefinition.types) { allTypes.push(serviceDefinition.schema.getTypeMap()[type]) typeToServiceMap[type] = service diff --git a/lib/gateway/default-service-definition.js b/lib/gateway/default-service-definition.js deleted file mode 100644 index 44ee7403..00000000 --- a/lib/gateway/default-service-definition.js +++ /dev/null @@ -1,5 +0,0 @@ -const defaultServiceDefinition = { - allowBatchedQueries: true -} - -module.exports = defaultServiceDefinition diff --git a/lib/gateway/get-query-result.js b/lib/gateway/get-query-result.js index a899feb4..5e0a85c3 100644 --- a/lib/gateway/get-query-result.js +++ b/lib/gateway/get-query-result.js @@ -2,7 +2,16 @@ const { preGatewayExecutionHandler } = require('../handlers') -const mergeQueries = (queries) => { +/** + * @typedef {Object.} GroupedQueries + */ + +/** + * Group GraphQL queries by their string and map them to their variables and document. + * @param {Array} queries + * @returns {GroupedQueries} + */ +function groupQueriesByDefinition (queries) { const q = [...new Set(queries.map(q => q.query))] const resultIndexes = [] const mergedQueries = queries.reduce((acc, curr, queryIndex) => { @@ -29,7 +38,13 @@ const mergeQueries = (queries) => { return { mergedQueries, resultIndexes } } -const getBactchedResult = async ({ mergeQueriesResult, context, serviceDefinition, service }) => { +/** + * Fetches queries result from the service with batching (1 request for all the queries). + * @param {Object} params + * @param {Object} params.service The service that will receive one request with the batched queries + * @returns {Array} result + */ +async function fetchBactchedResult ({ mergeQueriesResult, context, serviceDefinition, service }) { const { mergedQueries, resultIndexes } = mergeQueriesResult const batchedQueries = [] @@ -61,7 +76,14 @@ const getBactchedResult = async ({ mergeQueriesResult, context, serviceDefinitio return buildResult({ resultIndexes, data: response.json }) } -const buildResult = ({ resultIndexes, data }) => { +/** + * + * @param {Object} params + * @param {Array} params.resultIndexes Array used to map results with queries + * @param {Array} params.data Array of data returned from GraphQL end point + * @returns {Array} result + */ +function buildResult ({ resultIndexes, data }) { const result = [] for (const [queryIndex, queryResponse] of data.entries()) { @@ -88,7 +110,14 @@ const buildResult = ({ resultIndexes, data }) => { return result } -const getResult = async ({ mergeQueriesResult, serviceDefinition, context, service }) => { +/** + * Fetches queries result from the service without batching (1 request for each query) + * @param {Object} params + * @param {GroupedQueries} params.mergeQueriesResult + * @param {Object} params.service The service that will receive requests for the queries + * @returns {Array} result + */ +async function fetchResult ({ mergeQueriesResult, serviceDefinition, context, service }) { const { mergedQueries, resultIndexes } = mergeQueriesResult const queriesEntries = Object.entries(mergedQueries) const data = await Promise.all( @@ -120,8 +149,16 @@ const getResult = async ({ mergeQueriesResult, serviceDefinition, context, servi return buildResult({ data, resultIndexes }) } -const getQueryResult = async ({ context, queries, serviceDefinition, service }) => { - const mergeQueriesResult = mergeQueries(queries) +/** + * Fetches queries results from their shared service and returns array of data. + * It batches queries into one request if allowBatchedQueries is true for the service. + * @param {Object} params + * @param {Array} params.queries The list of queries to be executed + * @param {Object} params.service The service to send requests to + * @returns {Array} The array of results + */ +async function getQueryResult ({ context, queries, serviceDefinition, service }) { + const mergeQueriesResult = groupQueriesByDefinition(queries) const params = { mergeQueriesResult, service, @@ -131,10 +168,10 @@ const getQueryResult = async ({ context, queries, serviceDefinition, service }) } if (serviceDefinition.allowBatchedQueries) { - return getBactchedResult({ ...params }) + return fetchBactchedResult({ ...params }) } - return getResult({ ...params }) + return fetchResult({ ...params }) } module.exports = getQueryResult From 5d0644d29a2eb5e9867c3a6c2791075f2479a280 Mon Sep 17 00:00:00 2001 From: Giacomo Rebonato Date: Sat, 27 Nov 2021 11:32:36 +0100 Subject: [PATCH 7/7] doc: batched queries for services - updating doc about defaults - link to batched queries doc --- docs/federation.md | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/docs/federation.md b/docs/federation.md index 49df9460..8034a78f 100644 --- a/docs/federation.md +++ b/docs/federation.md @@ -357,8 +357,9 @@ server.listen(3002) #### Batched Queries to services -To fully leverage the DataLoader pattern a Gateway assumes that can send a request with batched queries to its services. -This configuration can be disabled if the service doesn't support batched queries instead. +To fully leverage the DataLoader pattern we can tell the Gateway which of its services support [batched queries](batched-queries.md). +In this case the service will receive a request body with an array of queries to execute. +Enabling batched queries for a service that doesn't support it will generate errors. ```js @@ -373,13 +374,13 @@ server.register(mercurius, { services: [ { name: 'user', - url: 'http://localhost:3000/graphql' // queries will be batched into one request - // same as setting allowBatchedQueries: true + url: 'http://localhost:3000/graphql' + allowBatchedQueries: true }, { name: 'company', - url: 'http://localhost:3001/graphql', // one request for each of the queries - allowBatchedQueries: false + url: 'http://localhost:3001/graphql', + allowBatchedQueries: false } ] },