Skip to content

Commit f625c32

Browse files
authored
polish(Queue): further simplify algorithm (#4501)
=> further simplifies Queue algorithm at the expense of (potentially) a few extra micro-tasks. The extra micro-tasks actually may lead to a few extra items being batched depending on the exact nature of the executor, potentially an added side-benefit. Within our GraphQL code, there are no test changes, as the Queue executor just saves the push/stop methods and passes them to the IncrementalGraph.
1 parent fc37884 commit f625c32

File tree

2 files changed

+64
-46
lines changed

2 files changed

+64
-46
lines changed

src/execution/Queue.ts

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -48,38 +48,23 @@ export class Queue<T> {
4848
private async *subscribeImpl<U>(
4949
mapFn: (generator: Generator<T, void, void>) => U | undefined,
5050
): AsyncGenerator<U> {
51-
while (true) {
52-
if (this._stopped) {
53-
return;
54-
}
55-
56-
let mapped;
57-
// drain any items pushed prior to or between .next() calls
58-
while (
59-
this._items.length > 0 &&
60-
(mapped = mapFn(this.batch())) !== undefined
61-
) {
51+
let nextBatch: Generator<T> | undefined;
52+
// eslint-disable-next-line no-await-in-loop
53+
while ((nextBatch = await this._nextBatch()) !== undefined) {
54+
const mapped = mapFn(nextBatch);
55+
if (mapped !== undefined) {
6256
yield mapped;
63-
if (this._stopped) {
64-
return;
65-
}
6657
}
67-
68-
// wait for a yield-able batch
69-
do {
70-
// eslint-disable-next-line no-await-in-loop
71-
const nextBatch = await this._nextBatch();
72-
if (nextBatch === undefined || this._stopped) {
73-
return;
74-
}
75-
mapped = mapFn(nextBatch);
76-
} while (mapped === undefined);
77-
78-
yield mapped;
7958
}
8059
}
8160

8261
private _nextBatch(): Promise<Generator<T> | undefined> {
62+
if (this._stopped) {
63+
return Promise.resolve(undefined);
64+
}
65+
if (this._items.length) {
66+
return Promise.resolve(this.batch());
67+
}
8368
const { promise, resolve } = promiseWithResolvers<
8469
Generator<T> | undefined
8570
>();

src/execution/__tests__/Queue-test.ts

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,8 @@ describe('Queue', () => {
3131
expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] });
3232
});
3333

34-
it('should yield multiple batches', async () => {
34+
it('should yield sync and async pushed items in order', async () => {
3535
const queue = new Queue<number>(async (push) => {
36-
await resolveOnNextTick();
3736
push(1);
3837
push(2);
3938
push(3);
@@ -44,8 +43,51 @@ describe('Queue', () => {
4443
});
4544

4645
const sub = queue.subscribe((batch) => Array.from(batch));
47-
expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] });
48-
expect(await sub.next()).to.deep.equal({ done: false, value: [4, 5, 6] });
46+
expect(await sub.next()).to.deep.equal({
47+
done: false,
48+
value: [1, 2, 3, 4, 5, 6],
49+
});
50+
});
51+
52+
it('should yield sync and async pushed items in order, separated by macro-task boundary', async () => {
53+
const queue = new Queue<number>(async (push) => {
54+
push(1);
55+
push(2);
56+
push(3);
57+
// awaiting macro-task delay
58+
await new Promise((r) => setTimeout(r));
59+
push(4);
60+
push(5);
61+
push(6);
62+
});
63+
64+
const sub = queue.subscribe((batch) => Array.from(batch));
65+
expect(await sub.next()).to.deep.equal({
66+
done: false,
67+
value: [1, 2, 3],
68+
});
69+
expect(await sub.next()).to.deep.equal({
70+
done: false,
71+
value: [4, 5, 6],
72+
});
73+
});
74+
75+
it('should yield multiple async batches', async () => {
76+
const queue = new Queue<number>(async (push) => {
77+
for (let i = 1; i <= 28; i += 3) {
78+
// eslint-disable-next-line no-await-in-loop
79+
await resolveOnNextTick();
80+
push(i);
81+
push(i + 1);
82+
push(i + 2);
83+
}
84+
});
85+
86+
const sub = queue.subscribe((batch) => Array.from(batch)[0]);
87+
expect(await sub.next()).to.deep.equal({ done: false, value: 1 });
88+
expect(await sub.next()).to.deep.equal({ done: false, value: 4 });
89+
expect(await sub.next()).to.deep.equal({ done: false, value: 16 });
90+
expect(await sub.next()).to.deep.equal({ done: false, value: 28 });
4991
});
5092

5193
it('should allow the executor to indicate completion', async () => {
@@ -117,18 +159,11 @@ describe('Queue', () => {
117159

118160
it('should skip payloads when mapped to undefined, skipping first async payload', async () => {
119161
const queue = new Queue<number>(async (push) => {
120-
await resolveOnNextTick();
121-
push(1);
122-
await resolveOnNextTick();
123-
push(2);
124-
await resolveOnNextTick();
125-
push(3);
126-
await resolveOnNextTick();
127-
push(4);
128-
await resolveOnNextTick();
129-
push(5);
130-
await resolveOnNextTick();
131-
push(6);
162+
for (let i = 1; i <= 14; i += 1) {
163+
// eslint-disable-next-line no-await-in-loop
164+
await resolveOnNextTick();
165+
push(i);
166+
}
132167
});
133168

134169
const sub = queue.subscribe((batch) => {
@@ -138,10 +173,8 @@ describe('Queue', () => {
138173
}
139174
});
140175
expect(await sub.next()).to.deep.equal({ done: false, value: [2] });
141-
// [3, 4, 5] are batched as we await 2:
142-
// - one tick for the [AsyncGeneratorResumeNext] job
143-
// - one tick for the await within the withCleanUp next()
144-
expect(await sub.next()).to.deep.equal({ done: false, value: [6] });
176+
expect(await sub.next()).to.deep.equal({ done: false, value: [8] });
177+
expect(await sub.next()).to.deep.equal({ done: false, value: [14] });
145178
});
146179

147180
it('should condense pushes during map into the same batch', async () => {

0 commit comments

Comments
 (0)