Skip to content

Commit 74fc31f

Browse files
committed
dequeue multiple tasks when needed
1 parent fdcc707 commit 74fc31f

File tree

4 files changed

+78
-15
lines changed

4 files changed

+78
-15
lines changed

.vscode/launch.json

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,24 @@
11
{
22
"version": "0.2.0",
33
"configurations": [
4+
{
5+
"type": "node",
6+
"request": "launch",
7+
"name": "Launch mocha",
8+
"skipFiles": [
9+
"<node_internals>/**"
10+
],
11+
"program": "${workspaceFolder}/node_modules/mocha/lib/cli/cli.js"
12+
},
413
{
514
"type": "node",
615
"request": "launch",
716
"name": "Launch test",
17+
"runtimeExecutable": "ts-node",
818
"skipFiles": [
919
"<node_internals>/**"
1020
],
11-
"program": "${workspaceFolder}/test.js"
21+
"program": "${workspaceFolder}/test/stress.ts"
1222
}
1323
]
1424
}

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
### [2.1.1]
99
- Raise an exception when trying to add a task with an already existing id (instead of failing when trying to run it)
10+
- Ability to dequeue multiple tasks when scheduling (fixes not reaching the maximum concurrency limit when the cool down is very low but more than zero)
1011

1112
## [2.1.0] 2023-01-25
1213
- Builtin ultra-light heap implementation from [`heap-js`](https://github.com/ignlg/heap-js) by @ignlg

src/index.ts

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -72,22 +72,23 @@ export class Queue<T = unknown> {
7272
*/
7373
tryRun(): void {
7474
debug('tryRun');
75-
this.nextTimer = null;
76-
if (!this.queueWaiting.peek() || this.queueRunning.size >= this.maxConcurrent) return;
77-
78-
/* Wait if it is too soon */
79-
if (Date.now() - this.lastRun < this.minCycle) {
80-
debug('will throttle', Date.now() % 1000, (this.minCycle + this.lastRun) % 1000, Date.now() - this.lastRun);
81-
if (this.nextTimer === null) {
82-
this.nextTimer = new Promise((resolve) => setTimeout(() => {
83-
this.tryRun();
84-
resolve();
85-
}, this.minCycle - Date.now() + this.lastRun));
75+
while (this.queueWaiting.peek() && this.queueRunning.size < this.maxConcurrent) {
76+
/* Wait if it is too soon */
77+
if (Date.now() - this.lastRun < this.minCycle) {
78+
debug(`will throttle, now=${Date.now() % 1000}, next=${(this.minCycle + this.lastRun) % 1000}, elapsed=${Date.now() - this.lastRun}`);
79+
if (this.nextTimer === null) {
80+
this.nextTimer = new Promise((resolve) => setTimeout(() => {
81+
this.nextTimer = null;
82+
this.tryRun();
83+
resolve();
84+
}, this.minCycle - Date.now() + this.lastRun));
85+
}
86+
return;
8687
}
87-
} else {
88+
8889
/* Choose the next task to run and unblock its promise */
8990
const next = this.queueWaiting.pop();
90-
debug('wont throttle', this.lastRun % 1000, Date.now() % 1000, 'next is ', next?.hash);
91+
debug(`wont throttle, last=${this.lastRun % 1000}, now=${Date.now() % 1000}, next is ${next?.hash}`);
9192
if (next !== undefined) {
9293
let finishSignal;
9394
const finishWait = new Promise<void>((resolve) => {
@@ -153,7 +154,7 @@ export class Queue<T = unknown> {
153154
await wait;
154155

155156
this.lastRun = Date.now();
156-
debug(hash, 'will run', this.lastRun % 1000, Date.now() % 1000);
157+
debug(`will run ${hash} last=${this.lastRun % 1000}, now=${Date.now() % 1000}`);
157158
}
158159

159160
/**
@@ -191,6 +192,8 @@ export class Queue<T = unknown> {
191192

192193
/**
193194
* Returns a promise that resolves when the queue is empty
195+
* (or there are no more than <maxWaiting> waiting tasks
196+
* if the argument is provided)
194197
*
195198
* @method flush
196199
* @return {Promise<void>}

test/stress.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import { Queue } from '../src/index';
2+
3+
const CONCURRENCY = 1000;
4+
const DELAY = 100;
5+
const CYCLE = 1;
6+
const TOTAL = 1e5;
7+
const ECHO = 1e3;
8+
9+
const q = new Queue<number>(CONCURRENCY, CYCLE);
10+
let counter = 0;
11+
let last = -1;
12+
let concurrency = 0;
13+
14+
async function main() {
15+
while (counter < TOTAL) {
16+
const me = counter++;
17+
if (me % ECHO == 0) {
18+
console.log('- scheduled', me);
19+
}
20+
if (q.stat().waiting > ECHO * 2) {
21+
console.log('- throttling');
22+
await q.flush();
23+
}
24+
q.wait(me, 0).then(() => {
25+
concurrency++;
26+
if (concurrency > CONCURRENCY) {
27+
throw new Error('overflow ' + concurrency);
28+
}
29+
if (me % ECHO == 0) {
30+
console.log('- run', me, concurrency);
31+
}
32+
setTimeout(() => {
33+
if (me % ECHO == 0) {
34+
console.log('- finished', me, concurrency);
35+
}
36+
if (last >= me) {
37+
throw new Error('desync');
38+
}
39+
last = me;
40+
q.end(me);
41+
concurrency--;
42+
}, DELAY);
43+
});
44+
}
45+
await q.flush();
46+
console.log('last', last);
47+
}
48+
49+
main();

0 commit comments

Comments
 (0)