Skip to content

Commit fc37884

Browse files
authored
refactor: extract Queue from IncrementalPublisher and IncrementalGraph (#4498)
A Queue is a batching async-generator meant to be consumed in the following pattern: ```ts for await (const item of queue.subscribe(mapBatch)) { .... // use item } ``` where `mapBatch` is a function that takes a batch of type `Generator<T>` and returns a single item of type `U | undefined` where `undefined` means no item is emitted. Items are produced as managed by an executor function passed to the Queue constructor, a la repeaters, see https://repeater.js.org/) A `push()` method is provided as an argument to the executor so that `push()` can be private to the code that constructs the Queue (although it can be saved to be passed to other code). A `stop()` method is also provided as a the second argument to the executor for convenience, but it is also available on the queue object itself so that it can be called within the executor or by the consumer to end early. So this works: ```ts const queue = new Queue( async (push, stop) => { push(1); push(2) await Promise.resolve(); push(3); stop(); }, ); const sub = queue.subscribe(batch => Array.from(batch)); const batch1 = await sub.next(); // batch1 = [1, 2] const batch2 = await sub.next(); // batch2 = [3] ``` as does this: ```ts let push; const queue = new Queue( (_push) => { push = _push; }, ); push(1); push(2); const sub = queue.subscribe(batch => Array.from(batch)); const batch1 = await sub.next(); // batch1 = [1, 2] const batch2Promise = sub.next(); await Promise.resolve(); push(3); queue.stop(); // using the stop() method on queue avoid the need to set stop = _stop const batch2 = await batch2Promise; // batch2 = [3] ``` Note: concurrent calls to `subscribe` will reference the same queue and are not encouraged. Using queues, we are able to remove all logic for handling the implicit queue from the IncrementalGraph and IncrementalPublisher, retaining only the `_handleCompletedBatch()`, while adding only the required `push()` and `stop()` calls within the IncrementalGraph. Tests do not change, except that we have some extra ticks from the use of our new generators (including the use of `withCleanup()` to wrap, and so we have to adjust a few tick-sensitive tests.
1 parent a401bf6 commit fc37884

File tree

5 files changed

+355
-100
lines changed

5 files changed

+355
-100
lines changed

src/execution/IncrementalGraph.ts

Lines changed: 32 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { BoxedPromiseOrValue } from '../jsutils/BoxedPromiseOrValue.js';
22
import { invariant } from '../jsutils/invariant.js';
33
import { isPromise } from '../jsutils/isPromise.js';
4-
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';
54

65
import type { GraphQLError } from '../error/GraphQLError.js';
76

7+
import { Queue } from './Queue.js';
88
import type {
99
DeferredFragmentRecord,
1010
DeliveryGroup,
@@ -22,16 +22,18 @@ import { isDeferredFragmentRecord, isPendingExecutionGroup } from './types.js';
2222
*/
2323
export class IncrementalGraph {
2424
private _rootNodes: Set<DeliveryGroup>;
25-
26-
private _completedQueue: Array<IncrementalDataRecordResult>;
27-
private _nextQueue: Array<
28-
(iterable: Iterable<IncrementalDataRecordResult> | undefined) => void
29-
>;
25+
private _completed: Queue<IncrementalDataRecordResult>;
26+
// _push and _stop are assigned in the executor which is executed
27+
// synchronously by the Queue constructor.
28+
private _push!: (item: IncrementalDataRecordResult) => void;
29+
private _stop!: () => void;
3030

3131
constructor() {
3232
this._rootNodes = new Set();
33-
this._completedQueue = [];
34-
this._nextQueue = [];
33+
this._completed = new Queue<IncrementalDataRecordResult>((push, stop) => {
34+
this._push = push;
35+
this._stop = stop;
36+
});
3537
}
3638

3739
getNewRootNodes(
@@ -92,29 +94,6 @@ export class IncrementalGraph {
9294
}
9395
}
9496

95-
*currentCompletedBatch(): Generator<IncrementalDataRecordResult> {
96-
let completed;
97-
while ((completed = this._completedQueue.shift()) !== undefined) {
98-
yield completed;
99-
}
100-
}
101-
102-
nextCompletedBatch(): Promise<
103-
Iterable<IncrementalDataRecordResult> | undefined
104-
> {
105-
const { promise, resolve } = promiseWithResolvers<
106-
Iterable<IncrementalDataRecordResult> | undefined
107-
>();
108-
this._nextQueue.push(resolve);
109-
return promise;
110-
}
111-
112-
abort(): void {
113-
for (const resolve of this._nextQueue) {
114-
resolve(undefined);
115-
}
116-
}
117-
11897
hasNext(): boolean {
11998
return this._rootNodes.size > 0;
12099
}
@@ -146,17 +125,30 @@ export class IncrementalGraph {
146125
const newRootNodes = this._promoteNonEmptyToRoot(
147126
deferredFragmentRecord.children,
148127
);
128+
this._maybeStop();
149129
return { newRootNodes, successfulExecutionGroups };
150130
}
151131

152132
removeDeferredFragment(
153133
deferredFragmentRecord: DeferredFragmentRecord,
154134
): boolean {
155-
return this._rootNodes.delete(deferredFragmentRecord);
135+
const deleted = this._rootNodes.delete(deferredFragmentRecord);
136+
if (!deleted) {
137+
return false;
138+
}
139+
this._maybeStop();
140+
return true;
156141
}
157142

158143
removeStream(streamRecord: StreamRecord): void {
159144
this._rootNodes.delete(streamRecord);
145+
this._maybeStop();
146+
}
147+
148+
subscribe<U>(
149+
mapFn: (generator: Generator<IncrementalDataRecordResult>) => U | undefined,
150+
): AsyncGenerator<U, void, void> {
151+
return this._completed.subscribe(mapFn);
160152
}
161153

162154
private _addIncrementalDataRecords(
@@ -246,9 +238,9 @@ export class IncrementalGraph {
246238
const value = completedExecutionGroup.value;
247239
if (isPromise(value)) {
248240
// eslint-disable-next-line @typescript-eslint/no-floating-promises
249-
value.then((resolved) => this._enqueue(resolved));
241+
value.then((resolved) => this._push(resolved));
250242
} else {
251-
this._enqueue(value);
243+
this._push(value);
252244
}
253245
}
254246

@@ -266,7 +258,7 @@ export class IncrementalGraph {
266258
: streamItemRecord().value;
267259
if (isPromise(result)) {
268260
if (items.length > 0) {
269-
this._enqueue({
261+
this._push({
270262
streamRecord,
271263
result:
272264
// TODO add additional test case or rework for coverage
@@ -290,14 +282,14 @@ export class IncrementalGraph {
290282
}
291283
if (result.item === undefined) {
292284
if (items.length > 0) {
293-
this._enqueue({
285+
this._push({
294286
streamRecord,
295287
result: errors.length > 0 ? { items, errors } : { items },
296288
newDeferredFragmentRecords,
297289
incrementalDataRecords,
298290
});
299291
}
300-
this._enqueue(
292+
this._push(
301293
result.errors === undefined
302294
? { streamRecord }
303295
: {
@@ -320,12 +312,9 @@ export class IncrementalGraph {
320312
}
321313
}
322314

323-
private _enqueue(completed: IncrementalDataRecordResult): void {
324-
this._completedQueue.push(completed);
325-
const next = this._nextQueue.shift();
326-
if (next === undefined) {
327-
return;
315+
private _maybeStop(): void {
316+
if (!this.hasNext()) {
317+
this._stop();
328318
}
329-
next(this.currentCompletedBatch());
330319
}
331320
}

src/execution/IncrementalPublisher.ts

Lines changed: 38 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,11 @@ interface SubsequentIncrementalExecutionResultContext {
6464
* @internal
6565
*/
6666
class IncrementalPublisher {
67-
private _isDone: boolean;
6867
private _context: IncrementalPublisherContext;
6968
private _nextId: number;
7069
private _incrementalGraph: IncrementalGraph;
7170

7271
constructor(context: IncrementalPublisherContext) {
73-
this._isDone = false;
7472
this._context = context;
7573
this._nextId = 0;
7674
this._incrementalGraph = new IncrementalGraph();
@@ -95,14 +93,17 @@ class IncrementalPublisher {
9593
? { errors, data, pending, hasNext: true }
9694
: { data, pending, hasNext: true };
9795

98-
const subsequentResults = withCleanup(this._subscribe(), async () => {
99-
this._isDone = true;
100-
this._context.abortSignalListener?.disconnect();
101-
this._incrementalGraph.abort();
102-
await this._returnAsyncIteratorsIgnoringErrors();
103-
});
96+
const subsequentResults = this._incrementalGraph.subscribe((batch) =>
97+
this._handleCompletedBatch(batch),
98+
);
10499

105-
return { initialResult, subsequentResults };
100+
return {
101+
initialResult,
102+
subsequentResults: withCleanup(subsequentResults, async () => {
103+
this._context.abortSignalListener?.disconnect();
104+
await this._returnAsyncIteratorsIgnoringErrors();
105+
}),
106+
};
106107
}
107108

108109
private _toPendingResults(
@@ -128,55 +129,39 @@ class IncrementalPublisher {
128129
return String(this._nextId++);
129130
}
130131

131-
private async *_subscribe(): AsyncGenerator<
132-
SubsequentIncrementalExecutionResult,
133-
void,
134-
void
135-
> {
136-
while (!this._isDone) {
137-
const context: SubsequentIncrementalExecutionResultContext = {
138-
pending: [],
139-
incremental: [],
140-
completed: [],
141-
};
142-
143-
let batch: Iterable<IncrementalDataRecordResult> | undefined =
144-
this._incrementalGraph.currentCompletedBatch();
145-
do {
146-
for (const completedResult of batch) {
147-
this._handleCompletedIncrementalData(completedResult, context);
148-
}
149-
150-
const { incremental, completed } = context;
151-
if (incremental.length > 0 || completed.length > 0) {
152-
const hasNext = this._incrementalGraph.hasNext();
153-
154-
if (!hasNext) {
155-
this._isDone = true;
156-
}
132+
private _handleCompletedBatch(
133+
batch: Iterable<IncrementalDataRecordResult>,
134+
): SubsequentIncrementalExecutionResult | undefined {
135+
const context: SubsequentIncrementalExecutionResultContext = {
136+
pending: [],
137+
incremental: [],
138+
completed: [],
139+
};
157140

158-
const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult =
159-
{ hasNext };
141+
for (const completedResult of batch) {
142+
this._handleCompletedIncrementalData(completedResult, context);
143+
}
160144

161-
const pending = context.pending;
162-
if (pending.length > 0) {
163-
subsequentIncrementalExecutionResult.pending = pending;
164-
}
165-
if (incremental.length > 0) {
166-
subsequentIncrementalExecutionResult.incremental = incremental;
167-
}
168-
if (completed.length > 0) {
169-
subsequentIncrementalExecutionResult.completed = completed;
170-
}
145+
const { incremental, completed } = context;
146+
if (incremental.length === 0 && completed.length === 0) {
147+
return;
148+
}
171149

172-
yield subsequentIncrementalExecutionResult;
173-
break;
174-
}
150+
const hasNext = this._incrementalGraph.hasNext();
175151

176-
// eslint-disable-next-line no-await-in-loop
177-
batch = await this._incrementalGraph.nextCompletedBatch();
178-
} while (batch !== undefined);
152+
const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult =
153+
{ hasNext };
154+
const pending = context.pending;
155+
if (pending.length > 0) {
156+
subsequentIncrementalExecutionResult.pending = pending;
157+
}
158+
if (incremental.length > 0) {
159+
subsequentIncrementalExecutionResult.incremental = incremental;
160+
}
161+
if (completed.length > 0) {
162+
subsequentIncrementalExecutionResult.completed = completed;
179163
}
164+
return subsequentIncrementalExecutionResult;
180165
}
181166

182167
private _handleCompletedIncrementalData(

src/execution/Queue.ts

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import { isPromise } from '../jsutils/isPromise.js';
2+
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
3+
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';
4+
5+
import { withCleanup } from './withCleanup.js';
6+
7+
/**
8+
* @internal
9+
*/
10+
export class Queue<T> {
11+
private _items: Array<T>;
12+
private _stopped: boolean;
13+
private _resolvers: Array<(iterable: Generator<T> | undefined) => void>;
14+
15+
constructor(
16+
executor: (
17+
push: (item: T) => void,
18+
stop: () => void,
19+
) => PromiseOrValue<void>,
20+
) {
21+
this._items = [];
22+
this._stopped = false;
23+
this._resolvers = [];
24+
let result;
25+
try {
26+
result = executor(this._push.bind(this), this.stop.bind(this));
27+
} catch {
28+
// ignore sync executor errors
29+
}
30+
if (isPromise(result)) {
31+
result.catch(() => {
32+
/* ignore async executor errors */
33+
});
34+
}
35+
}
36+
37+
stop(): void {
38+
this._stopped = true;
39+
this._resolve(undefined);
40+
}
41+
42+
subscribe<U>(
43+
mapFn: (generator: Generator<T>) => U | undefined,
44+
): AsyncGenerator<U, void, void> {
45+
return withCleanup(this.subscribeImpl(mapFn), () => this.stop());
46+
}
47+
48+
private async *subscribeImpl<U>(
49+
mapFn: (generator: Generator<T, void, void>) => U | undefined,
50+
): AsyncGenerator<U> {
51+
while (true) {
52+
if (this._stopped) {
53+
return;
54+
}
55+
56+
let mapped;
57+
// drain any items pushed prior to or between .next() calls
58+
while (
59+
this._items.length > 0 &&
60+
(mapped = mapFn(this.batch())) !== undefined
61+
) {
62+
yield mapped;
63+
if (this._stopped) {
64+
return;
65+
}
66+
}
67+
68+
// wait for a yield-able batch
69+
do {
70+
// eslint-disable-next-line no-await-in-loop
71+
const nextBatch = await this._nextBatch();
72+
if (nextBatch === undefined || this._stopped) {
73+
return;
74+
}
75+
mapped = mapFn(nextBatch);
76+
} while (mapped === undefined);
77+
78+
yield mapped;
79+
}
80+
}
81+
82+
private _nextBatch(): Promise<Generator<T> | undefined> {
83+
const { promise, resolve } = promiseWithResolvers<
84+
Generator<T> | undefined
85+
>();
86+
this._resolvers.push(resolve);
87+
return promise;
88+
}
89+
90+
private _push(item: T): void {
91+
this._items.push(item);
92+
this._resolve(this.batch());
93+
}
94+
95+
private _resolve(maybeIterable: Generator<T> | undefined): void {
96+
for (const resolve of this._resolvers) {
97+
resolve(maybeIterable);
98+
}
99+
this._resolvers = [];
100+
}
101+
102+
private *batch(): Generator<T> {
103+
let item: T | undefined;
104+
while ((item = this._items.shift()) !== undefined) {
105+
yield item;
106+
}
107+
}
108+
}

0 commit comments

Comments
 (0)