diff --git a/src/client.ts b/src/client.ts index 9aaf5dd..9b73a11 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,6 +1,6 @@ import {AxiosInstance, AxiosRequestConfig} from 'axios'; import * as gql from 'graphql'; -import {get as traverse, isEmpty} from 'lodash'; +import {get as traverse, isEmpty, unset} from 'lodash'; import pino, {Logger} from 'pino'; import VError from 'verror'; @@ -324,10 +324,35 @@ export class FarosClient { paginator = paginatedQuery, args: Map = new Map() ): AsyncIterable { - const {query, edgesPath, pageInfoPath} = paginator(rawQuery); + const {query, edgesPath, edgeIdPath, pageInfoPath} = paginator(rawQuery); // eslint-disable-next-line @typescript-eslint/no-this-alias const self = this; - if (isEmpty(pageInfoPath)) { + if (edgeIdPath?.length) { + return { + async *[Symbol.asyncIterator](): AsyncIterator { + let id = ''; + let hasNextPage = true; + while (hasNextPage) { + const data = await self.gqlNoDirectives(graph, query, { + limit: pageSize, + id, + ...Object.fromEntries(args.entries()), + }); + const edges = traverse(data, edgesPath) || []; + for (const edge of edges) { + yield edge; + id = traverse(edge, edgeIdPath); + unset(edge, edgeIdPath); + if (!id) { + break; + } + } + // break on partial page + hasNextPage = edges.length === pageSize; + } + }, + }; + } else if (isEmpty(pageInfoPath)) { // use offset and limit return { async *[Symbol.asyncIterator](): AsyncIterator { diff --git a/src/graphql/graphql.ts b/src/graphql/graphql.ts index 19de9c1..5cc2bcc 100644 --- a/src/graphql/graphql.ts +++ b/src/graphql/graphql.ts @@ -19,6 +19,8 @@ export type RecordIterable = AsyncOrSyncIterable; export interface PaginatedQuery { readonly query: string; readonly edgesPath: ReadonlyArray; + // Relative to edge path + readonly edgeIdPath?: ReadonlyArray; readonly pageInfoPath: ReadonlyArray; } @@ -281,8 +283,14 @@ export function paginatedQuery(query: string): PaginatedQuery { } export function paginatedQueryV2(query: string): PaginatedQuery { - return process.env.GRAPHQL_V2_PAGINATOR === 'relay' ? - paginatedWithRelayV2(query) : paginateWithOffsetLimitV2(query); + switch (process.env.GRAPHQL_V2_PAGINATOR) { + case 'relay': + return paginatedWithRelayV2(query); + case 'keyset': + return paginateWithKeysetV2(query); + default: + return paginateWithOffsetLimitV2(query); + } } /** @@ -427,6 +435,156 @@ function createOperationDefinition( }; } +function mergeWhereClauses(clauses: any[]): any { + // extract individual predicates (e.g. {uid: {_eq: true}}) + const fields = _.flatMap(clauses, (c) => _.get(c, 'value')); + // place within _and clause + return { + kind: 'Argument', + name: { + kind: 'Name', + value: 'where', + }, + value: { + kind: 'ObjectValue', + fields: [ + { + kind: 'ObjectField', + name: { + kind: 'Name', + value: '_and', + }, + value: { + kind: 'ListValue', + values: fields, + }, + }, + ], + }, + }; +} + +/** + * Paginate v2 queries with where clause and order by on id + * https://hasura.io/docs/latest/queries/postgres/pagination/#keyset-cursor-based-pagination + */ +export function paginateWithKeysetV2(query: string): PaginatedQuery { + const edgesPath: string[] = []; + const ast = gql.visit(gql.parse(query), { + Document(node) { + if (node.definitions.length !== 1) { + throw invalidQuery( + 'document should contain a single query operation definition' + ); + } + }, + OperationDefinition(node) { + if (node.operation !== 'query') { + throw invalidQuery('only query operations are supported'); + } + + // Add pagination variables to query operation + return createOperationDefinition(node, [ + ['id', 'String'], + ['limit', 'Int'], + ]); + }, + Field: { + enter(node) { + if (edgesPath.length) { + // Skip rest of nodes once edges path has been set + return false; + } + edgesPath.push(node.name.value); + return { + ...node, + arguments: [ + mergeWhereClauses([ + ...(node.arguments?.filter((n) => n.name.value === 'where') ?? + []), + { + kind: 'Argument', + name: {kind: 'Name', value: 'where'}, + value: { + kind: 'ObjectValue', + fields: [ + { + kind: 'ObjectField', + name: { + kind: 'Name', + value: 'id', + }, + value: { + kind: 'ObjectValue', + fields: [ + { + kind: 'ObjectField', + name: { + kind: 'Name', + value: '_gt', + }, + value: { + kind: 'Variable', + name: { + kind: 'Name', + value: 'id', + }, + }, + }, + ], + }, + }, + ], + }, + }, + ]), + { + kind: 'Argument', + name: {kind: 'Name', value: 'order_by'}, + value: { + kind: 'ObjectValue', + fields: [ + { + kind: 'ObjectField', + name: {kind: 'Name', value: 'id'}, + value: {kind: 'EnumValue', value: 'asc'}, + }, + ], + }, + }, + { + kind: 'Argument', + name: {kind: 'Name', value: 'limit'}, + value: { + kind: 'Variable', + name: {kind: 'Name', value: 'limit'}, + }, + }, + ], + selectionSet: { + kind: 'SelectionSet', + selections: [ + { + kind: 'Field', + alias: {kind: 'Name', value: '_id'}, + name: {kind: 'Name', value: 'id'}, + }, + ...(node.selectionSet?.selections ?? []), + ], + }, + }; + }, + }, + }); + + return { + query: gql.print(ast), + edgesPath, + edgeIdPath: ['_id'], + pageInfoPath: [], + }; +} + /** * Paginate v2 queries with limit and offsets. */ diff --git a/test/graphql.test.ts b/test/graphql.test.ts index 60e3d0a..c68f581 100644 --- a/test/graphql.test.ts +++ b/test/graphql.test.ts @@ -196,6 +196,17 @@ describe('graphql', () => { expect(paginatedQuery.pageInfoPath).toBeEmpty(); }); + test('paginated keyset v2 query', async () => { + const query = await loadQueryFile('commits-v2.gql'); + const paginatedQuery = sut.paginateWithKeysetV2(query); + const expectedQuery = + await loadQueryFile('paginated-commits-keyset-v2.gql'); + expect(paginatedQuery.query).toEqual(expectedQuery); + expect(paginatedQuery.edgesPath).toEqual(['vcs_Commit']); + expect(paginatedQuery.edgeIdPath).toEqual(['_id']); + expect(paginatedQuery.pageInfoPath).toBeEmpty(); + }); + test('build incremental V2', () => { const type = graphSchemaV2.getType('cicd_Build'); const query1 = sut.buildIncrementalQueryV2(type as gql.GraphQLObjectType); diff --git a/test/resources/queries/paginated-commits-keyset-v2.gql b/test/resources/queries/paginated-commits-keyset-v2.gql new file mode 100644 index 0000000..15ae635 --- /dev/null +++ b/test/resources/queries/paginated-commits-keyset-v2.gql @@ -0,0 +1,10 @@ +query paginatedQuery($id: String, $limit: Int, $from: timestamptz, $to: timestamptz) { + vcs_Commit(where: {_and: [{refreshedAt: {_gte: $from, _lt: $to}}, {id: {_gt: $id}}]}, order_by: {id: asc} +, limit: $limit) { + _id: id, + id + author { + ownerId: id + } + } +}