From 1835c1731dc69f2e0b86ef8fed881eaeb165e30d Mon Sep 17 00:00:00 2001 From: Marek Polak Date: Wed, 25 Sep 2024 15:47:54 +0200 Subject: [PATCH] feat(utils): implement `getMutex` --- packages/utils/src/getMutex.ts | 39 ++++++ packages/utils/src/getSynchronize.ts | 25 +--- packages/utils/src/index.ts | 1 + packages/utils/tests/getMutex.test.ts | 168 ++++++++++++++++++++++++++ 4 files changed, 214 insertions(+), 19 deletions(-) create mode 100644 packages/utils/src/getMutex.ts create mode 100644 packages/utils/tests/getMutex.test.ts diff --git a/packages/utils/src/getMutex.ts b/packages/utils/src/getMutex.ts new file mode 100644 index 00000000000..fbbc7931a5c --- /dev/null +++ b/packages/utils/src/getMutex.ts @@ -0,0 +1,39 @@ +/** + * Ensures that since the awaited lock is obtained until unlock return from it + * is called, no other part of code can enter the same lock. + * + * Optionally, it also takes `lockId` param, in which case only actions with + * the same lock id are blocking each other. + * + * Example: + * + * ``` + * const lock = getMutex(); + * + * lock().then(unlock => writeToSocket('foo').finally(unlock)); + * lock().then(unlock => writeToSocket('bar').finally(unlock)); + * lock('differentLockId').then(unlock => writeToAnotherSocket('baz').finally(unlock)); + * + * const unlock = await lock(); + * await readFromSocket(); + * unlock(); + * ``` + */ +export const getMutex = () => { + const DEFAULT_ID = Symbol(); + const locks: Record> = {}; + + return async (lockId: keyof any = DEFAULT_ID) => { + while (locks[lockId]) { + await locks[lockId]; + } + let resolve = () => {}; + locks[lockId] = new Promise(res => { + resolve = res; + }).finally(() => { + delete locks[lockId]; + }); + + return resolve; + }; +}; diff --git a/packages/utils/src/getSynchronize.ts b/packages/utils/src/getSynchronize.ts index bf7cc3910a9..bdbc3f5d7d8 100644 --- a/packages/utils/src/getSynchronize.ts +++ b/packages/utils/src/getSynchronize.ts @@ -1,3 +1,5 @@ +import { getMutex } from './getMutex'; + /** * Ensures that all async actions passed to the returned function are called * immediately one after another, without interfering with each other. @@ -13,26 +15,11 @@ * synchronize(() => asyncAction3(), 'differentLockId'); * ``` */ -export const getSynchronize = () => { - const DEFAULT_ID = Symbol(); - const locks: Record> = {}; - - return ( - action: () => T, - lockId: keyof any = DEFAULT_ID, - ): T extends Promise ? T : Promise => { - const newLock = (locks[lockId] ?? Promise.resolve()) - .catch(() => {}) - .then(action) - .finally(() => { - if (locks[lockId] === newLock) { - delete locks[lockId]; - } - }); - locks[lockId] = newLock; +export const getSynchronize = (mutex?: ReturnType) => { + const lock = mutex ?? getMutex(); - return newLock as any; - }; + return (action: () => T, lockId?: keyof any): T extends Promise ? T : Promise => + lock(lockId).then(unlock => Promise.resolve().then(action).finally(unlock)) as any; }; export type Synchronize = ReturnType; diff --git a/packages/utils/src/index.ts b/packages/utils/src/index.ts index 6b1fe8ebc14..d1b93bc1c1a 100644 --- a/packages/utils/src/index.ts +++ b/packages/utils/src/index.ts @@ -17,6 +17,7 @@ export * from './createDeferredManager'; export * from './createLazy'; export * from './createTimeoutPromise'; export * from './getLocaleSeparators'; +export * from './getMutex'; export * from './getNumberFromPixelString'; export * from './getRandomNumberInRange'; export * from './getSynchronize'; diff --git a/packages/utils/tests/getMutex.test.ts b/packages/utils/tests/getMutex.test.ts new file mode 100644 index 00000000000..e5888874824 --- /dev/null +++ b/packages/utils/tests/getMutex.test.ts @@ -0,0 +1,168 @@ +import { getMutex } from '../src/getMutex'; +import { mockTime, unmockTime } from './utils/mockTime'; + +const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); + +const fail = (reason: string) => { + throw new Error(reason); +}; + +describe('getMutex', () => { + let state: any; + let lock: ReturnType; + + const sequence = async (...seq: [any, number][]) => { + for (const [str, ms] of seq) { + state = str; + + await delay(ms); + expect(state).toBe(str); + } + }; + + const sync = (value: any) => (state = value); + + beforeEach(() => { + state = 'init'; + lock = getMutex(); + + mockTime(); + }); + + afterEach(() => { + unmockTime(); + }); + + it('basic', async () => { + await Promise.all([ + lock().then(unlock => sequence(['init', 3], [42, 9], [null, 3]).finally(unlock)), + lock().then(unlock => + sequence(['init', 5], ['boo', 3], [{ foo: 'bar' }, 6]).finally(unlock), + ), + lock().then(unlock => + sequence([undefined, 8], [[1, 2, 3], 4], [NaN, 2]).finally(unlock), + ), + ]); + }); + + it('sync X async', async () => { + await Promise.all([ + expect( + lock().then(unlock => { + try { + return sync('foo'); + } finally { + unlock(); + } + }), + ).resolves.toBe('foo'), + lock().then(unlock => sequence([0, 3], ['a', 5]).finally(unlock)), + expect( + lock().then(unlock => { + try { + return sync([null, null]); + } finally { + unlock(); + } + }), + ).resolves.toStrictEqual([null, null]), + ]); + }); + + it('with errors', async () => { + await Promise.all([ + lock().then(unlock => sequence(['a', 9]).finally(unlock)), + expect( + lock().then(unlock => + delay(5) + .then(() => fail('err')) + .finally(unlock), + ), + ).rejects.toThrow('err'), + lock().then(unlock => sequence(['b', 11]).finally(unlock)), + expect( + lock().then(unlock => { + try { + fail('err'); + } finally { + unlock(); + } + }), + ).rejects.toThrow('err'), + lock().then(unlock => sequence(['c', 7]).finally(unlock)), + ]); + }); + + it('nested', done => { + lock().then(unlock => + sequence(['a', 3]) + .then(() => { + // 'c' registers after 'a' ended and while 'b' is running + delay(2).then(() => + lock() + .then(unlock2 => sequence(['c', 3]).finally(unlock2)) + .then(done), + ); + }) + .finally(unlock), + ); + lock().then(unlock => sequence(['b', 8]).finally(unlock)); + }); + + it('with keys', async () => { + let state1: any, state2: any; + + await Promise.all([ + lock('lock1').then(async unlock => { + state1 = 'a'; + await delay(3); + expect(state1).toBe('a'); + + state1 = 'b'; + await delay(9); + expect(state1).toBe('b'); + + state1 = 'c'; + await delay(3); + expect(state1).toBe('c'); + + unlock(); + }), + lock('lock2').then(async unlock => { + expect(state1).toBe('a'); + + state2 = 'g'; + await delay(8); + expect(state2).toBe('g'); + expect(state1).toBe('b'); + + state2 = 'h'; + await delay(11); + expect(state2).toBe('h'); + expect(state1).toBe('d'); + + state2 = 'i'; + await delay(12); + expect(state2).toBe('i'); + expect(state1).toBe('f'); + + unlock(); + }), + lock('lock1').then(async unlock => { + state1 = 'd'; + await delay(8); + expect(state1).toBe('d'); + + state1 = 'e'; + await delay(4); + expect(state1).toBe('e'); + + state1 = 'f'; + await delay(2); + expect(state1).toBe('f'); + + unlock(); + }), + ]); + }); +});