-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.ts
135 lines (117 loc) · 3.78 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/**
* task function
*
* example:
* ``` typescript
* () => import("fs").then((fs) => fs.readFileSync("/path/to/a/file"));
* ```
*/
export type Task<T = any> = () => T | PromiseLike<T>;
export interface IASAP {
/**
* concurrency level
*
* `< 1` to pause the instance, `>= 1` otherwise
*/
c: number;
/**
* enqueue a new task
* @param fn task to run
* @param priority task priority, smaller value means higher priority
* @returns a Promise resolves when the task gets executed
*
* example:
* ``` typescript
* for (let x of listOfUrls) {
* asap.q(() => fetch(x).then((res) => res.blob())); // Promise<Blob>
* }
* ```
*/
q<T>(fn: Task<T> | PromiseLike<Task<T>>, priority?: number): Promise<T>;
}
function ASAP(this: any, c: boolean | number = 1): any {
/**
* check if this function is used as a constructor
*/
if (!(this instanceof ASAP)) {
return new (ASAP as any)(c);
}
/**
* concurrency level
*/
let concurrency: number;
/**
* Set of functions which returns promises with priority
*/
const heap = new Set<[() => Promise<any>, number]>();
/**
* Set of pending/running promise methods
*/
const pending = new Set<() => Promise<any>>();
/**
* process the queue
*/
const process = (): void => {
const { size } = pending;
if (size < concurrency) {
Array.from(heap).sort(
// sort the heap from highest to lowest priority
([, a], [, b]) => a - b,
).slice(
0,
concurrency - size, // slice the array to the size of left concurrency value
).forEach((heapItem) => {
const [v] = heapItem;
// delete
heap.delete(heapItem);
// mark the promise function as pending
pending.add(v);
v().then(
() => {
// delete the promise function from pending list
pending.delete(v);
// process the task list as this task has just finished
process();
},
);
});
}
};
Object.defineProperties(this, {
c: {
get: () => concurrency,
set: (value: number) => {
// set the new concurrency level
concurrency = Math.max(Math.floor(value), 0);
// process the heap as concurrency level changed
process();
},
},
q: {
value: <T>(fn: Task<T> | PromiseLike<Task<T>>, priority?: number) => new Promise<T>((resolve, reject) => {
const promFn = () => {
// create a new promise in case when the `fn` throws anything
const prom = Promise.resolve(fn).then((v) => v());
// react on `fn` resolution and set the promise as completed
return prom.then(resolve, reject);
};
// push the promise function and priority to the task list
heap.add([promFn, priority || 0]);
// process the task list
process();
}),
},
});
// assign passed concurrency to the instance
(this as any).c = c;
}
export default (ASAP as any) as {
/**
* @param concurrency `false` or `< 1` if instance should be paused, `>= 1` for instance with given concurrency
*/
new(concurrency?: boolean | number): IASAP;
/**
* @param concurrency `false` or `< 1` if instance should be paused, `>= 1` for instance with given concurrency
*/
(concurrency?: boolean | number): IASAP;
};