diff --git a/src/core/graphql.ts b/src/core/graphql.ts index bf1661413..82f8cdab1 100644 --- a/src/core/graphql.ts +++ b/src/core/graphql.ts @@ -148,9 +148,7 @@ function createGraphQLLink(url: Path): GraphQLLink { return { query: createScopedGraphQLHandler(OperationTypeNode.QUERY, url), mutation: createScopedGraphQLHandler(OperationTypeNode.MUTATION, url), - subscription: createGraphQLSubscriptionHandler( - internalPubSub.webSocketLink, - ), + subscription: createGraphQLSubscriptionHandler(internalPubSub), pubsub: internalPubSub.pubsub, operation: createGraphQLOperationHandler(url), } diff --git a/src/core/handlers/GraphQLSubscriptionHandler.ts b/src/core/handlers/GraphQLSubscriptionHandler.ts index 61c9f6905..88098a5c6 100644 --- a/src/core/handlers/GraphQLSubscriptionHandler.ts +++ b/src/core/handlers/GraphQLSubscriptionHandler.ts @@ -20,16 +20,46 @@ export interface GraphQLPubsub { /** * Publishes the given payload to all GraphQL subscriptions. */ - publish: (payload: { data?: Record }) => void + publish: ( + payload: { data?: Record }, + predicate?: (args: { + subscription: GraphQLWebSocketSubscriptionWithId + }) => boolean, + ) => void +} + +type GraphQLWebSocketOutgoingMessage = + | { + type: 'connection_init' + } + | { + type: 'subscribe' + id: string + payload: GraphQLWebSocketSubscription + } + | { + type: 'complete' + id: string + } + +interface GraphQLWebSocketSubscription { + query: string + variables: Record + extensions: Array +} + +interface GraphQLWebSocketSubscriptionWithId + extends GraphQLWebSocketSubscription { + id: string } export class GraphQLInternalPubsub { public pubsub: GraphQLPubsub public webSocketLink: WebSocketLink - private subscriptions: Set + private subscriptions: Map constructor(public readonly url: Path) { - this.subscriptions = new Set() + this.subscriptions = new Map() /** * @fixme This isn't nice. @@ -52,7 +82,7 @@ export class GraphQLInternalPubsub { return } - const message = jsonParse(event.data) + const message = jsonParse(event.data) if (!message) { return @@ -65,7 +95,10 @@ export class GraphQLInternalPubsub { } case 'subscribe': { - this.subscriptions.add(message.id) + this.subscriptions.set(message.id, { + ...message.payload, + id: message.id, + }) break } @@ -80,14 +113,16 @@ export class GraphQLInternalPubsub { this.pubsub = { handler: webSocketHandler, - publish: (payload) => { - for (const subscriptionId of this.subscriptions) { - this.webSocketLink.broadcast( - this.createSubscriptionMessage({ - id: subscriptionId, - payload, - }), - ) + publish: (payload, predicate = () => true) => { + for (const [, subscription] of this.subscriptions) { + if (predicate({ subscription })) { + this.webSocketLink.broadcast( + this.createSubscriptionMessage({ + id: subscription.id, + payload, + }), + ) + } } }, } @@ -110,22 +145,24 @@ export type GraphQLSubscriptionHandler = < | GraphQLHandlerNameSelector | DocumentNode | TypedDocumentNode, - resolver: (info: GraphQLSubscriptionHandlerInfo) => void, + resolver: (info: GraphQLSubscriptionHandlerInfo) => void, ) => WebSocketHandler export interface GraphQLSubscriptionHandlerInfo< + Query extends GraphQLQuery, Variables extends GraphQLVariables, > { operationName: string query: string variables: Variables + pubsub: GraphQLSubscriptionHandlerPubsub } export function createGraphQLSubscriptionHandler( - webSocketLink: WebSocketLink, + internalPubsub: GraphQLInternalPubsub, ): GraphQLSubscriptionHandler { return (operationName, resolver) => { - const webSocketHandler = webSocketLink.addEventListener( + const webSocketHandler = internalPubsub.webSocketLink.addEventListener( 'connection', ({ client }) => { client.addEventListener('message', async (event) => { @@ -133,7 +170,7 @@ export function createGraphQLSubscriptionHandler( return } - const message = jsonParse(event.data) + const message = jsonParse(event.data) if ( message != null && @@ -148,13 +185,19 @@ export function createGraphQLSubscriptionHandler( node.operationType === OperationTypeNode.SUBSCRIPTION && node.operationName === operationName ) { + const pubsub = new GraphQLSubscriptionHandlerPubsub({ + internalPubsub, + subscriptionId: message.id, + }) + /** * @todo Add the path parameters from the pubsub URL. */ resolver({ operationName: node.operationName, query: message.payload.query, - variables: message.payload.variables, + variables: message.payload.variables as any, + pubsub, }) } } @@ -165,3 +208,18 @@ export function createGraphQLSubscriptionHandler( return webSocketHandler } } + +class GraphQLSubscriptionHandlerPubsub { + constructor( + private readonly args: { + internalPubsub: GraphQLInternalPubsub + subscriptionId: string + }, + ) {} + + public publish(payload: { data?: Query }): void { + this.args.internalPubsub.pubsub.publish(payload, ({ subscription }) => { + return subscription.id === this.args.subscriptionId + }) + } +} diff --git a/test/typings/graphql.test-d.ts b/test/typings/graphql.test-d.ts index b819b0f57..3676b36a3 100644 --- a/test/typings/graphql.test-d.ts +++ b/test/typings/graphql.test-d.ts @@ -191,12 +191,12 @@ it('graphql query cannot extract variable and reponse types', () => { it('graphql mutation cannot extract variable and reponse types', () => { const createUser = parse(` - mutation CreateUser { - user { - id - } - } - `) +mutation CreateUser { + user { + id + } +} + `) graphql.mutation(createUser, () => { return HttpResponse.json({ data: { arbitrary: true }, @@ -213,3 +213,37 @@ it('exposes a "subscription" method only on a GraphQL link', () => { graphql.link('http://localhost:4000').subscription, ).toEqualTypeOf() }) + +it('graphql subscroption accepts matching data publish', () => { + const api = graphql.link('http://localhost:4000/graphql') + api.subscription<{ commentAdded: { id: string; text: string } }>( + 'onCommentAdded', + ({ pubsub }) => { + pubsub.publish({ + data: { + commentAdded: { + id: '1', + text: 'Hello, world!', + }, + }, + }) + }, + ) +}) + +it('graphql subscription does not allow mismatched data publish', () => { + const api = graphql.link('http://localhost:4000/graphql') + api.subscription<{ commentAdded: { id: string; text: string } }>( + 'onCommentAdded', + ({ pubsub }) => { + pubsub.publish({ + data: { + commentAdded: { + // @ts-expect-error number is not assignable to type string. + id: 123, + }, + }, + }) + }, + ) +})