Skip to content

Commit d397736

Browse files
authored
Merge pull request #34 from jsr-core/performance-fix
feat: performance fix
2 parents 227e7e5 + 2ce4940 commit d397736

20 files changed

+469
-110
lines changed

_raw_semaphore.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* @internal
3+
*/
4+
export class RawSemaphore {
5+
#resolves: (() => void)[] = [];
6+
#value: number;
7+
#size: number;
8+
9+
/**
10+
* Creates a new semaphore with the specified limit.
11+
*
12+
* @param size The maximum number of times the semaphore can be acquired before blocking.
13+
* @throws {RangeError} if the size is not a positive safe integer.
14+
*/
15+
constructor(size: number) {
16+
if (size <= 0 || !Number.isSafeInteger(size)) {
17+
throw new RangeError(
18+
`size must be a positive safe integer, got ${size}`,
19+
);
20+
}
21+
this.#value = size;
22+
this.#size = size;
23+
}
24+
25+
/**
26+
* Returns true if the semaphore is currently locked.
27+
*/
28+
get locked(): boolean {
29+
return this.#value === 0;
30+
}
31+
32+
/**
33+
* Acquires the semaphore, blocking until the semaphore is available.
34+
*/
35+
acquire(): Promise<void> {
36+
if (this.#value > 0) {
37+
this.#value -= 1;
38+
return Promise.resolve();
39+
} else {
40+
const { promise, resolve } = Promise.withResolvers<void>();
41+
this.#resolves.push(resolve);
42+
return promise;
43+
}
44+
}
45+
46+
/**
47+
* Releases the semaphore, allowing the next waiting operation to proceed.
48+
*/
49+
release(): void {
50+
const resolve = this.#resolves.shift();
51+
if (resolve) {
52+
resolve();
53+
} else if (this.#value < this.#size) {
54+
this.#value += 1;
55+
}
56+
}
57+
}

barrier.ts

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import { Notify } from "./notify.ts";
2-
31
/**
42
* A synchronization primitive that allows multiple tasks to wait until all of
53
* them have reached a certain point of execution before continuing.
@@ -26,8 +24,8 @@ import { Notify } from "./notify.ts";
2624
* ```
2725
*/
2826
export class Barrier {
29-
#notify = new Notify();
30-
#rest: number;
27+
#waiter: PromiseWithResolvers<void> = Promise.withResolvers();
28+
#value: number;
3129

3230
/**
3331
* Creates a new `Barrier` that blocks until `size` threads have called `wait`.
@@ -41,23 +39,24 @@ export class Barrier {
4139
`size must be a positive safe integer, got ${size}`,
4240
);
4341
}
44-
this.#rest = size;
42+
this.#value = size;
4543
}
4644

4745
/**
4846
* Wait for all threads to reach the barrier.
4947
* Blocks until all threads reach the barrier.
5048
*/
51-
async wait({ signal }: { signal?: AbortSignal } = {}): Promise<void> {
52-
signal?.throwIfAborted();
53-
this.#rest -= 1;
54-
if (this.#rest === 0) {
55-
await Promise.all([
56-
this.#notify.notified({ signal }),
57-
this.#notify.notifyAll(),
58-
]);
59-
} else {
60-
await this.#notify.notified({ signal });
49+
wait({ signal }: { signal?: AbortSignal } = {}): Promise<void> {
50+
if (signal?.aborted) {
51+
return Promise.reject(signal.reason);
52+
}
53+
const { promise, resolve, reject } = this.#waiter;
54+
const abort = () => reject(signal!.reason);
55+
signal?.addEventListener("abort", abort, { once: true });
56+
this.#value -= 1;
57+
if (this.#value === 0) {
58+
resolve();
6159
}
60+
return promise.finally(() => signal?.removeEventListener("abort", abort));
6261
}
6362
}

barrier_bench.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import { Barrier as Barrier100 } from "jsr:@core/asyncutil@~1.0.0/barrier";
2+
import { Barrier } from "./barrier.ts";
3+
4+
const length = 1_000;
5+
6+
Deno.bench({
7+
name: "current",
8+
fn: async () => {
9+
const barrier = new Barrier(length);
10+
await Promise.all(Array.from({ length }).map(() => barrier.wait()));
11+
},
12+
group: "Barrier#wait",
13+
baseline: true,
14+
});
15+
16+
Deno.bench({
17+
name: "v1.0.0",
18+
fn: async () => {
19+
const barrier = new Barrier100(length);
20+
await Promise.all(Array.from({ length }).map(() => barrier.wait()));
21+
},
22+
group: "Barrier#wait",
23+
});

deno.jsonc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
],
2727
"exclude": [
2828
"**/*_test.ts",
29+
"**/*_bench.ts",
2930
".*"
3031
]
3132
},

deno.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lock.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Mutex } from "./mutex.ts";
1+
import { RawSemaphore } from "./_raw_semaphore.ts";
22

33
/**
44
* A mutual exclusion lock that provides safe concurrent access to a shared value.
@@ -16,7 +16,7 @@ import { Mutex } from "./mutex.ts";
1616
* ```
1717
*/
1818
export class Lock<T> {
19-
#mu = new Mutex();
19+
#sem = new RawSemaphore(1);
2020
#value: T;
2121

2222
/**
@@ -32,7 +32,7 @@ export class Lock<T> {
3232
* Returns true if the lock is currently locked, false otherwise.
3333
*/
3434
get locked(): boolean {
35-
return this.#mu.locked;
35+
return this.#sem.locked;
3636
}
3737

3838
/**
@@ -43,7 +43,11 @@ export class Lock<T> {
4343
* @returns A Promise that resolves with the result of the function.
4444
*/
4545
async lock<R>(fn: (value: T) => R | PromiseLike<R>): Promise<R> {
46-
using _lock = await this.#mu.acquire();
47-
return await fn(this.#value);
46+
await this.#sem.acquire();
47+
try {
48+
return await fn(this.#value);
49+
} finally {
50+
this.#sem.release();
51+
}
4852
}
4953
}

lock_bench.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import { Lock as Lock100 } from "jsr:@core/asyncutil@~1.0.0/lock";
2+
import { Lock } from "./lock.ts";
3+
4+
const length = 1_000;
5+
6+
Deno.bench({
7+
name: "current",
8+
fn: async () => {
9+
const lock = new Lock(0);
10+
await Promise.all(Array.from({ length }).map(() => lock.lock(() => {})));
11+
},
12+
group: "Lock#lock",
13+
baseline: true,
14+
});
15+
16+
Deno.bench({
17+
name: "v1.0.0",
18+
fn: async () => {
19+
const lock = new Lock100(0);
20+
await Promise.all(Array.from({ length }).map(() => lock.lock(() => {})));
21+
},
22+
group: "Lock#lock",
23+
});

mutex.ts

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { RawSemaphore } from "./_raw_semaphore.ts";
2+
13
/**
24
* A mutex (mutual exclusion) is a synchronization primitive that grants
35
* exclusive access to a shared resource.
@@ -26,33 +28,25 @@
2628
* ```
2729
*/
2830
export class Mutex {
29-
#waiters: Set<Promise<void>> = new Set();
31+
#sem: RawSemaphore = new RawSemaphore(1);
3032

3133
/**
3234
* Returns true if the mutex is locked, false otherwise.
3335
*/
3436
get locked(): boolean {
35-
return this.#waiters.size > 0;
37+
return this.#sem.locked;
3638
}
3739

3840
/**
3941
* Acquire the mutex and return a promise with disposable that releases the mutex when disposed.
4042
*
4143
* @returns A Promise with Disposable that releases the mutex when disposed.
4244
*/
43-
acquire(): Promise<Disposable> & Disposable {
44-
const waiters = [...this.#waiters];
45-
const { promise, resolve } = Promise.withResolvers<void>();
46-
this.#waiters.add(promise);
47-
const disposable = {
45+
acquire(): Promise<Disposable> {
46+
return this.#sem.acquire().then(() => ({
4847
[Symbol.dispose]: () => {
49-
resolve();
50-
this.#waiters.delete(promise);
48+
this.#sem.release();
5149
},
52-
};
53-
return Object.assign(
54-
Promise.all(waiters).then(() => disposable),
55-
disposable,
56-
);
50+
}));
5751
}
5852
}

mutex_bench.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import { Mutex as Mutex100 } from "jsr:@core/asyncutil@~1.0.0/mutex";
2+
import { Mutex } from "./mutex.ts";
3+
4+
const length = 1_000;
5+
6+
Deno.bench({
7+
name: "current",
8+
fn: async () => {
9+
const mutex = new Mutex();
10+
await Promise.all(
11+
Array.from({ length }).map(async () => {
12+
const lock = await mutex.acquire();
13+
lock[Symbol.dispose]();
14+
}),
15+
);
16+
},
17+
group: "Mutex#wait",
18+
baseline: true,
19+
});
20+
21+
Deno.bench({
22+
name: "v1.0.0",
23+
fn: async () => {
24+
const mutex = new Mutex100();
25+
await Promise.all(
26+
Array.from({ length }).map(async () => {
27+
const lock = await mutex.acquire();
28+
lock[Symbol.dispose]();
29+
}),
30+
);
31+
},
32+
group: "Mutex#wait",
33+
});

notify.ts

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
import { iter } from "@core/iterutil/iter";
2-
import { take } from "@core/iterutil/take";
3-
41
/**
52
* Async notifier that allows one or more "waiters" to wait for a notification.
63
*
@@ -23,13 +20,13 @@ import { take } from "@core/iterutil/take";
2320
* ```
2421
*/
2522
export class Notify {
26-
#waiters: Set<PromiseWithResolvers<void>> = new Set();
23+
#waiters: PromiseWithResolvers<void>[] = [];
2724

2825
/**
2926
* Returns the number of waiters that are waiting for notification.
3027
*/
3128
get waiterCount(): number {
32-
return this.#waiters.size;
29+
return this.#waiters.length;
3330
}
3431

3532
/**
@@ -43,40 +40,37 @@ export class Notify {
4340
if (n <= 0 || !Number.isSafeInteger(n)) {
4441
throw new RangeError(`n must be a positive safe integer, got ${n}`);
4542
}
46-
const it = iter(this.#waiters);
47-
for (const waiter of take(it, n)) {
48-
waiter.resolve();
49-
}
50-
this.#waiters = new Set(it);
43+
this.#waiters.splice(0, n).forEach(({ resolve }) => resolve());
5144
}
5245

5346
/**
5447
* Notifies all waiters that are waiting for notification. Resolves each of the notified waiters.
5548
*/
5649
notifyAll(): void {
57-
for (const waiter of this.#waiters) {
58-
waiter.resolve();
59-
}
60-
this.#waiters = new Set();
50+
this.#waiters.forEach(({ resolve }) => resolve());
51+
this.#waiters = [];
6152
}
6253

6354
/**
6455
* Asynchronously waits for notification. The caller's execution is suspended until
6556
* the `notify` method is called. The method returns a Promise that resolves when the caller is notified.
6657
* Optionally takes an AbortSignal to abort the waiting if the signal is aborted.
6758
*/
68-
async notified({ signal }: { signal?: AbortSignal } = {}): Promise<void> {
59+
notified({ signal }: { signal?: AbortSignal } = {}): Promise<void> {
6960
if (signal?.aborted) {
70-
throw signal.reason;
61+
return Promise.reject(signal.reason);
7162
}
72-
const waiter = Promise.withResolvers<void>();
7363
const abort = () => {
74-
this.#waiters.delete(waiter);
75-
waiter.reject(signal!.reason);
64+
const waiter = this.#waiters.shift();
65+
if (waiter) {
66+
waiter.reject(signal!.reason);
67+
}
7668
};
7769
signal?.addEventListener("abort", abort, { once: true });
78-
this.#waiters.add(waiter);
79-
await waiter.promise;
80-
signal?.removeEventListener("abort", abort);
70+
const waiter = Promise.withResolvers<void>();
71+
this.#waiters.push(waiter);
72+
return waiter.promise.finally(() => {
73+
signal?.removeEventListener("abort", abort);
74+
});
8175
}
8276
}

0 commit comments

Comments
 (0)