Skip to content

Commit 8aa1567

Browse files
authored
polish(IncrementalPublisher): simplify early return handling (#4503)
use just a map of stream records to early returns instead of: (1) a new subtype of stream record, (2) a new property on the new subtype, and (3) a set of stream records with that property.
1 parent d4807a6 commit 8aa1567

File tree

3 files changed

+45
-73
lines changed

3 files changed

+45
-73
lines changed

src/execution/IncrementalPublisher.ts

Lines changed: 26 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { invariant } from '../jsutils/invariant.js';
21
import type { ObjMap } from '../jsutils/ObjMap.js';
32
import { pathToArray } from '../jsutils/Path.js';
43

@@ -7,7 +6,6 @@ import type { GraphQLError } from '../error/GraphQLError.js';
76
import type { AbortSignalListener } from './AbortSignalListener.js';
87
import { IncrementalGraph } from './IncrementalGraph.js';
98
import type {
10-
CancellableStreamRecord,
119
CompletedExecutionGroup,
1210
CompletedResult,
1311
DeferredFragmentRecord,
@@ -21,23 +19,25 @@ import type {
2119
InitialIncrementalExecutionResult,
2220
PendingResult,
2321
StreamItemsResult,
22+
StreamRecord,
2423
SubsequentIncrementalExecutionResult,
2524
} from './types.js';
26-
import {
27-
isCancellableStreamRecord,
28-
isCompletedExecutionGroup,
29-
isFailedExecutionGroup,
30-
} from './types.js';
25+
import { isCompletedExecutionGroup, isFailedExecutionGroup } from './types.js';
3126
import { withCleanup } from './withCleanup.js';
3227

28+
// eslint-disable-next-line max-params
3329
export function buildIncrementalResponse(
34-
context: IncrementalPublisherContext,
3530
result: ObjMap<unknown>,
3631
errors: ReadonlyArray<GraphQLError>,
3732
newDeferredFragmentRecords: ReadonlyArray<DeferredFragmentRecord> | undefined,
3833
incrementalDataRecords: ReadonlyArray<IncrementalDataRecord>,
34+
earlyReturns: Map<StreamRecord, () => Promise<unknown>>,
35+
abortSignalListener: AbortSignalListener | undefined,
3936
): ExperimentalIncrementalExecutionResults {
40-
const incrementalPublisher = new IncrementalPublisher(context);
37+
const incrementalPublisher = new IncrementalPublisher(
38+
earlyReturns,
39+
abortSignalListener,
40+
);
4141
return incrementalPublisher.buildResponse(
4242
result,
4343
errors,
@@ -46,11 +46,6 @@ export function buildIncrementalResponse(
4646
);
4747
}
4848

49-
interface IncrementalPublisherContext {
50-
abortSignalListener: AbortSignalListener | undefined;
51-
cancellableStreams: Set<CancellableStreamRecord> | undefined;
52-
}
53-
5449
interface SubsequentIncrementalExecutionResultContext {
5550
pending: Array<PendingResult>;
5651
incremental: Array<IncrementalResult>;
@@ -64,12 +59,17 @@ interface SubsequentIncrementalExecutionResultContext {
6459
* @internal
6560
*/
6661
class IncrementalPublisher {
67-
private _context: IncrementalPublisherContext;
62+
private _earlyReturns: Map<StreamRecord, () => Promise<unknown>>;
63+
private _abortSignalListener: AbortSignalListener | undefined;
6864
private _nextId: number;
6965
private _incrementalGraph: IncrementalGraph;
7066

71-
constructor(context: IncrementalPublisherContext) {
72-
this._context = context;
67+
constructor(
68+
earlyReturns: Map<StreamRecord, () => Promise<unknown>>,
69+
abortSignalListener: AbortSignalListener | undefined,
70+
) {
71+
this._earlyReturns = earlyReturns;
72+
this._abortSignalListener = abortSignalListener;
7373
this._nextId = 0;
7474
this._incrementalGraph = new IncrementalGraph();
7575
}
@@ -100,7 +100,7 @@ class IncrementalPublisher {
100100
return {
101101
initialResult,
102102
subsequentResults: withCleanup(subsequentResults, async () => {
103-
this._context.abortSignalListener?.disconnect();
103+
this._abortSignalListener?.disconnect();
104104
await this._returnAsyncIteratorsIgnoringErrors();
105105
}),
106106
};
@@ -241,21 +241,18 @@ class IncrementalPublisher {
241241
errors: streamItemsResult.errors,
242242
});
243243
this._incrementalGraph.removeStream(streamRecord);
244-
if (isCancellableStreamRecord(streamRecord)) {
245-
invariant(this._context.cancellableStreams !== undefined);
246-
this._context.cancellableStreams.delete(streamRecord);
247-
streamRecord.earlyReturn().catch(() => {
244+
const earlyReturn = this._earlyReturns.get(streamRecord);
245+
if (earlyReturn !== undefined) {
246+
earlyReturn().catch(() => {
248247
/* c8 ignore next 1 */
249248
// ignore error
250249
});
250+
this._earlyReturns.delete(streamRecord);
251251
}
252252
} else if (streamItemsResult.result === undefined) {
253253
context.completed.push({ id });
254254
this._incrementalGraph.removeStream(streamRecord);
255-
if (isCancellableStreamRecord(streamRecord)) {
256-
invariant(this._context.cancellableStreams !== undefined);
257-
this._context.cancellableStreams.delete(streamRecord);
258-
}
255+
this._earlyReturns.delete(streamRecord);
259256
} else {
260257
const incrementalEntry: IncrementalStreamResult = {
261258
id,
@@ -310,14 +307,10 @@ class IncrementalPublisher {
310307
}
311308

312309
private async _returnAsyncIterators(): Promise<void> {
313-
const cancellableStreams = this._context.cancellableStreams;
314-
if (cancellableStreams === undefined) {
315-
return;
316-
}
317310
const promises: Array<Promise<unknown>> = [];
318-
for (const streamRecord of cancellableStreams) {
319-
if (streamRecord.earlyReturn !== undefined) {
320-
promises.push(streamRecord.earlyReturn());
311+
for (const earlyReturn of this._earlyReturns.values()) {
312+
if (earlyReturn !== undefined) {
313+
promises.push(earlyReturn());
321314
}
322315
}
323316
await Promise.all(promises);

src/execution/execute.ts

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ import { getVariableSignature } from './getVariableSignature.js';
7272
import { buildIncrementalResponse } from './IncrementalPublisher.js';
7373
import { mapAsyncIterable } from './mapAsyncIterable.js';
7474
import type {
75-
CancellableStreamRecord,
7675
CompletedExecutionGroup,
7776
ExecutionResult,
7877
ExperimentalIncrementalExecutionResults,
@@ -172,7 +171,7 @@ export interface ExecutionContext {
172171
errors: Array<GraphQLError>;
173172
abortSignalListener: AbortSignalListener | undefined;
174173
completed: boolean;
175-
cancellableStreams: Set<CancellableStreamRecord> | undefined;
174+
earlyReturns: Map<StreamRecord, () => Promise<unknown>> | undefined;
176175
errorPropagation: boolean;
177176
}
178177

@@ -344,7 +343,7 @@ export function experimentalExecuteQueryOrMutationOrSubscriptionEvent(
344343
? new AbortSignalListener(abortSignal)
345344
: undefined,
346345
completed: false,
347-
cancellableStreams: undefined,
346+
earlyReturns: undefined,
348347
errorPropagation: errorPropagation(validatedExecutionArgs.operation),
349348
};
350349
try {
@@ -428,11 +427,12 @@ function buildDataResponse(
428427
}
429428

430429
return buildIncrementalResponse(
431-
exeContext,
432430
data,
433431
errors,
434432
newDeferredFragmentRecords,
435433
incrementalDataRecords,
434+
(exeContext.earlyReturns ??= new Map()),
435+
exeContext.abortSignalListener,
436436
);
437437
}
438438

@@ -1351,10 +1351,6 @@ async function completeAsyncIteratorValue(
13511351
fieldDetailsList,
13521352
path,
13531353
);
1354-
const earlyReturn =
1355-
asyncIterator.return === undefined
1356-
? undefined
1357-
: asyncIterator.return.bind(asyncIterator);
13581354
try {
13591355
while (true) {
13601356
if (streamUsage && index >= streamUsage.initialCount) {
@@ -1368,22 +1364,17 @@ async function completeAsyncIteratorValue(
13681364
itemType,
13691365
);
13701366

1371-
let streamRecord: StreamRecord | CancellableStreamRecord;
1372-
if (earlyReturn === undefined) {
1373-
streamRecord = {
1374-
label: streamUsage.label,
1375-
path,
1376-
streamItemQueue,
1377-
};
1378-
} else {
1379-
streamRecord = {
1380-
label: streamUsage.label,
1381-
path,
1382-
earlyReturn,
1383-
streamItemQueue,
1384-
};
1385-
exeContext.cancellableStreams ??= new Set();
1386-
exeContext.cancellableStreams.add(streamRecord);
1367+
const streamRecord: StreamRecord = {
1368+
label: streamUsage.label,
1369+
path,
1370+
streamItemQueue,
1371+
};
1372+
if (asyncIterator.return !== undefined) {
1373+
exeContext.earlyReturns ??= new Map();
1374+
exeContext.earlyReturns.set(
1375+
streamRecord,
1376+
asyncIterator.return.bind(asyncIterator),
1377+
);
13871378
}
13881379

13891380
addIncrementalDataRecords(graphqlWrappedResult, [streamRecord]);
@@ -1450,12 +1441,10 @@ async function completeAsyncIteratorValue(
14501441
index++;
14511442
}
14521443
} catch (error) {
1453-
if (earlyReturn !== undefined) {
1454-
earlyReturn().catch(() => {
1455-
/* c8 ignore next 1 */
1456-
// ignore error
1457-
});
1458-
}
1444+
asyncIterator.return?.().catch(() => {
1445+
/* c8 ignore next 1 */
1446+
// ignore error
1447+
});
14591448
throw error;
14601449
}
14611450

src/execution/types.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -273,16 +273,6 @@ export interface StreamItemsResult {
273273
incrementalDataRecords?: ReadonlyArray<IncrementalDataRecord> | undefined;
274274
}
275275

276-
export interface CancellableStreamRecord extends StreamRecord {
277-
earlyReturn: () => Promise<unknown>;
278-
}
279-
280-
export function isCancellableStreamRecord(
281-
deliveryGroup: DeliveryGroup,
282-
): deliveryGroup is CancellableStreamRecord {
283-
return 'earlyReturn' in deliveryGroup;
284-
}
285-
286276
export type IncrementalDataRecord = PendingExecutionGroup | StreamRecord;
287277

288278
export type IncrementalDataRecordResult =

0 commit comments

Comments
 (0)