Skip to content

Commit

Permalink
feat: support query type, subscription-level publish
Browse files Browse the repository at this point in the history
  • Loading branch information
kettanaito committed Nov 15, 2024
1 parent cce5115 commit e24e6db
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 27 deletions.
4 changes: 1 addition & 3 deletions src/core/graphql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,7 @@ function createGraphQLLink(url: Path): GraphQLLink {
return {
query: createScopedGraphQLHandler(OperationTypeNode.QUERY, url),
mutation: createScopedGraphQLHandler(OperationTypeNode.MUTATION, url),
subscription: createGraphQLSubscriptionHandler(
internalPubSub.webSocketLink,
),
subscription: createGraphQLSubscriptionHandler(internalPubSub),
pubsub: internalPubSub.pubsub,
operation: createGraphQLOperationHandler(url),
}
Expand Down
94 changes: 76 additions & 18 deletions src/core/handlers/GraphQLSubscriptionHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,46 @@ export interface GraphQLPubsub {
/**
* Publishes the given payload to all GraphQL subscriptions.
*/
publish: (payload: { data?: Record<string, unknown> }) => void
publish: (
payload: { data?: Record<string, unknown> },
predicate?: (args: {
subscription: GraphQLWebSocketSubscriptionWithId
}) => boolean,
) => void
}

type GraphQLWebSocketOutgoingMessage =
| {
type: 'connection_init'
}
| {
type: 'subscribe'
id: string
payload: GraphQLWebSocketSubscription
}
| {
type: 'complete'
id: string
}

interface GraphQLWebSocketSubscription {
query: string
variables: Record<string, unknown>
extensions: Array<any>
}

interface GraphQLWebSocketSubscriptionWithId
extends GraphQLWebSocketSubscription {
id: string
}

export class GraphQLInternalPubsub {
public pubsub: GraphQLPubsub
public webSocketLink: WebSocketLink
private subscriptions: Set<string>
private subscriptions: Map<string, GraphQLWebSocketSubscriptionWithId>

constructor(public readonly url: Path) {
this.subscriptions = new Set()
this.subscriptions = new Map()

/**
* @fixme This isn't nice.
Expand All @@ -52,7 +82,7 @@ export class GraphQLInternalPubsub {
return
}

const message = jsonParse(event.data)
const message = jsonParse<GraphQLWebSocketOutgoingMessage>(event.data)

if (!message) {
return
Expand All @@ -65,7 +95,10 @@ export class GraphQLInternalPubsub {
}

case 'subscribe': {
this.subscriptions.add(message.id)
this.subscriptions.set(message.id, {
...message.payload,
id: message.id,
})
break
}

Expand All @@ -80,14 +113,16 @@ export class GraphQLInternalPubsub {

this.pubsub = {
handler: webSocketHandler,
publish: (payload) => {
for (const subscriptionId of this.subscriptions) {
this.webSocketLink.broadcast(
this.createSubscriptionMessage({
id: subscriptionId,
payload,
}),
)
publish: (payload, predicate = () => true) => {
for (const [, subscription] of this.subscriptions) {
if (predicate({ subscription })) {
this.webSocketLink.broadcast(
this.createSubscriptionMessage({
id: subscription.id,
payload,
}),
)
}
}
},
}
Expand All @@ -110,30 +145,32 @@ export type GraphQLSubscriptionHandler = <
| GraphQLHandlerNameSelector
| DocumentNode
| TypedDocumentNode<Query, Variables>,
resolver: (info: GraphQLSubscriptionHandlerInfo<Variables>) => void,
resolver: (info: GraphQLSubscriptionHandlerInfo<Query, Variables>) => void,
) => WebSocketHandler

export interface GraphQLSubscriptionHandlerInfo<
Query extends GraphQLQuery,
Variables extends GraphQLVariables,
> {
operationName: string
query: string
variables: Variables
pubsub: GraphQLSubscriptionHandlerPubsub<Query>
}

export function createGraphQLSubscriptionHandler(
webSocketLink: WebSocketLink,
internalPubsub: GraphQLInternalPubsub,
): GraphQLSubscriptionHandler {
return (operationName, resolver) => {
const webSocketHandler = webSocketLink.addEventListener(
const webSocketHandler = internalPubsub.webSocketLink.addEventListener(
'connection',
({ client }) => {
client.addEventListener('message', async (event) => {
if (typeof event.data !== 'string') {
return
}

const message = jsonParse(event.data)
const message = jsonParse<GraphQLWebSocketOutgoingMessage>(event.data)

if (
message != null &&
Expand All @@ -148,13 +185,19 @@ export function createGraphQLSubscriptionHandler(
node.operationType === OperationTypeNode.SUBSCRIPTION &&
node.operationName === operationName
) {
const pubsub = new GraphQLSubscriptionHandlerPubsub({
internalPubsub,
subscriptionId: message.id,
})

/**
* @todo Add the path parameters from the pubsub URL.
*/
resolver({
operationName: node.operationName,
query: message.payload.query,
variables: message.payload.variables,
variables: message.payload.variables as any,
pubsub,
})
}
}
Expand All @@ -165,3 +208,18 @@ export function createGraphQLSubscriptionHandler(
return webSocketHandler
}
}

class GraphQLSubscriptionHandlerPubsub<Query extends GraphQLQuery> {
constructor(
private readonly args: {
internalPubsub: GraphQLInternalPubsub
subscriptionId: string
},
) {}

public publish(payload: { data?: Query }): void {
this.args.internalPubsub.pubsub.publish(payload, ({ subscription }) => {
return subscription.id === this.args.subscriptionId
})
}
}
46 changes: 40 additions & 6 deletions test/typings/graphql.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,12 @@ it('graphql query cannot extract variable and reponse types', () => {

it('graphql mutation cannot extract variable and reponse types', () => {
const createUser = parse(`
mutation CreateUser {
user {
id
}
}
`)
mutation CreateUser {
user {
id
}
}
`)
graphql.mutation(createUser, () => {
return HttpResponse.json({
data: { arbitrary: true },
Expand All @@ -213,3 +213,37 @@ it('exposes a "subscription" method only on a GraphQL link', () => {
graphql.link('http://localhost:4000').subscription,
).toEqualTypeOf<GraphQLSubscriptionHandler>()
})

it('graphql subscroption accepts matching data publish', () => {
const api = graphql.link('http://localhost:4000/graphql')
api.subscription<{ commentAdded: { id: string; text: string } }>(
'OnCommentAdded',
({ pubsub }) => {
pubsub.publish({
data: {
commentAdded: {
id: '1',
text: 'Hello, world!',
},
},
})
},
)
})

it('graphql subscription does not allow mismatched data publish', () => {
const api = graphql.link('http://localhost:4000/graphql')
api.subscription<{ commentAdded: { id: string; text: string } }>(
'OnCommentAdded',
({ pubsub }) => {
pubsub.publish({
data: {
commentAdded: {
// @ts-expect-error number is not assignable to type string.
id: 123,
},
},
})
},
)
})

0 comments on commit e24e6db

Please sign in to comment.