Skip to content

Commit

Permalink
feat(utils): implement getMutex
Browse files Browse the repository at this point in the history
  • Loading branch information
marekrjpolak authored and mroz22 committed Oct 2, 2024
1 parent ba3580a commit 1835c17
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 19 deletions.
39 changes: 39 additions & 0 deletions packages/utils/src/getMutex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Ensures that since the awaited lock is obtained until unlock return from it
* is called, no other part of code can enter the same lock.
*
* Optionally, it also takes `lockId` param, in which case only actions with
* the same lock id are blocking each other.
*
* Example:
*
* ```
* const lock = getMutex();
*
* lock().then(unlock => writeToSocket('foo').finally(unlock));
* lock().then(unlock => writeToSocket('bar').finally(unlock));
* lock('differentLockId').then(unlock => writeToAnotherSocket('baz').finally(unlock));
*
* const unlock = await lock();
* await readFromSocket();
* unlock();
* ```
*/
export const getMutex = () => {
const DEFAULT_ID = Symbol();
const locks: Record<keyof any, Promise<void>> = {};

return async (lockId: keyof any = DEFAULT_ID) => {
while (locks[lockId]) {
await locks[lockId];
}
let resolve = () => {};
locks[lockId] = new Promise<void>(res => {
resolve = res;
}).finally(() => {
delete locks[lockId];
});

return resolve;
};
};
25 changes: 6 additions & 19 deletions packages/utils/src/getSynchronize.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { getMutex } from './getMutex';

/**
* Ensures that all async actions passed to the returned function are called
* immediately one after another, without interfering with each other.
Expand All @@ -13,26 +15,11 @@
* synchronize(() => asyncAction3(), 'differentLockId');
* ```
*/
export const getSynchronize = () => {
const DEFAULT_ID = Symbol();
const locks: Record<keyof any, Promise<unknown>> = {};

return <T>(
action: () => T,
lockId: keyof any = DEFAULT_ID,
): T extends Promise<unknown> ? T : Promise<T> => {
const newLock = (locks[lockId] ?? Promise.resolve())
.catch(() => {})
.then(action)
.finally(() => {
if (locks[lockId] === newLock) {
delete locks[lockId];
}
});
locks[lockId] = newLock;
export const getSynchronize = (mutex?: ReturnType<typeof getMutex>) => {
const lock = mutex ?? getMutex();

return newLock as any;
};
return <T>(action: () => T, lockId?: keyof any): T extends Promise<unknown> ? T : Promise<T> =>
lock(lockId).then(unlock => Promise.resolve().then(action).finally(unlock)) as any;
};

export type Synchronize = ReturnType<typeof getSynchronize>;
1 change: 1 addition & 0 deletions packages/utils/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export * from './createDeferredManager';
export * from './createLazy';
export * from './createTimeoutPromise';
export * from './getLocaleSeparators';
export * from './getMutex';
export * from './getNumberFromPixelString';
export * from './getRandomNumberInRange';
export * from './getSynchronize';
Expand Down
168 changes: 168 additions & 0 deletions packages/utils/tests/getMutex.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import { getMutex } from '../src/getMutex';
import { mockTime, unmockTime } from './utils/mockTime';

const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));

const fail = (reason: string) => {
throw new Error(reason);
};

describe('getMutex', () => {
let state: any;
let lock: ReturnType<typeof getMutex>;

const sequence = async (...seq: [any, number][]) => {
for (const [str, ms] of seq) {
state = str;

await delay(ms);
expect(state).toBe(str);
}
};

const sync = (value: any) => (state = value);

beforeEach(() => {
state = 'init';
lock = getMutex();

mockTime();
});

afterEach(() => {
unmockTime();
});

it('basic', async () => {
await Promise.all([
lock().then(unlock => sequence(['init', 3], [42, 9], [null, 3]).finally(unlock)),
lock().then(unlock =>
sequence(['init', 5], ['boo', 3], [{ foo: 'bar' }, 6]).finally(unlock),
),
lock().then(unlock =>
sequence([undefined, 8], [[1, 2, 3], 4], [NaN, 2]).finally(unlock),
),
]);
});

it('sync X async', async () => {
await Promise.all([
expect(
lock().then(unlock => {
try {
return sync('foo');
} finally {
unlock();
}
}),
).resolves.toBe('foo'),
lock().then(unlock => sequence([0, 3], ['a', 5]).finally(unlock)),
expect(
lock().then(unlock => {
try {
return sync([null, null]);
} finally {
unlock();
}
}),
).resolves.toStrictEqual([null, null]),
]);
});

it('with errors', async () => {
await Promise.all([
lock().then(unlock => sequence(['a', 9]).finally(unlock)),
expect(
lock().then(unlock =>
delay(5)
.then(() => fail('err'))
.finally(unlock),
),
).rejects.toThrow('err'),
lock().then(unlock => sequence(['b', 11]).finally(unlock)),
expect(
lock().then(unlock => {
try {
fail('err');
} finally {
unlock();
}
}),
).rejects.toThrow('err'),
lock().then(unlock => sequence(['c', 7]).finally(unlock)),
]);
});

it('nested', done => {
lock().then(unlock =>
sequence(['a', 3])
.then(() => {
// 'c' registers after 'a' ended and while 'b' is running
delay(2).then(() =>
lock()
.then(unlock2 => sequence(['c', 3]).finally(unlock2))
.then(done),
);
})
.finally(unlock),
);
lock().then(unlock => sequence(['b', 8]).finally(unlock));
});

it('with keys', async () => {
let state1: any, state2: any;

await Promise.all([
lock('lock1').then(async unlock => {
state1 = 'a';
await delay(3);
expect(state1).toBe('a');

state1 = 'b';
await delay(9);
expect(state1).toBe('b');

state1 = 'c';
await delay(3);
expect(state1).toBe('c');

unlock();
}),
lock('lock2').then(async unlock => {
expect(state1).toBe('a');

state2 = 'g';
await delay(8);
expect(state2).toBe('g');
expect(state1).toBe('b');

state2 = 'h';
await delay(11);
expect(state2).toBe('h');
expect(state1).toBe('d');

state2 = 'i';
await delay(12);
expect(state2).toBe('i');
expect(state1).toBe('f');

unlock();
}),
lock('lock1').then(async unlock => {
state1 = 'd';
await delay(8);
expect(state1).toBe('d');

state1 = 'e';
await delay(4);
expect(state1).toBe('e');

state1 = 'f';
await delay(2);
expect(state1).toBe('f');

unlock();
}),
]);
});
});

0 comments on commit 1835c17

Please sign in to comment.