From 8b2f2114176e643ab326ebf85cff11f3b0aa7921 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Mon, 8 Jul 2024 18:09:18 +1000 Subject: [PATCH] feat: made `poll` and `Status.waitFor` cancellable --- src/status/Status.ts | 12 +++++- src/utils/utils.ts | 76 +++++++++++++++++++++++++++++---- tests/status/Status.test.ts | 17 +++++++- tests/tasks/TaskManager.test.ts | 27 ++++++------ 4 files changed, 107 insertions(+), 25 deletions(-) diff --git a/src/status/Status.ts b/src/status/Status.ts index eac8ecba7..c4d7671e2 100644 --- a/src/status/Status.ts +++ b/src/status/Status.ts @@ -6,9 +6,12 @@ import type { StatusDead, } from './types'; import type { FileSystem, FileHandle } from '../types'; +import type { PromiseCancellable } from '@matrixai/async-cancellable'; +import type { ContextTimed, ContextTimedInput } from '@matrixai/contexts'; import Logger from '@matrixai/logger'; import lock from 'fd-lock'; import { StartStop, ready } from '@matrixai/async-init/dist/StartStop'; +import { context, timedCancellable } from '@matrixai/contexts/dist/decorators'; import * as statusUtils from './utils'; import * as statusErrors from './errors'; import * as statusEvents from './events'; @@ -299,9 +302,14 @@ class Status { } } + public waitFor( + status: StatusInfo['status'], + ctx?: Partial, + ): PromiseCancellable; + @timedCancellable(true) public async waitFor( status: StatusInfo['status'], - timeout?: number, + @context ctx: ContextTimed, ): Promise { let statusInfo; try { @@ -323,7 +331,7 @@ class Status { return false; }, 50, - timeout, + ctx, ); } catch (e) { if (e instanceof errors.ErrorUtilsPollTimeout) { diff --git a/src/utils/utils.ts b/src/utils/utils.ts index 9f70b7361..cd4dbeda4 100644 --- a/src/utils/utils.ts +++ b/src/utils/utils.ts @@ -5,12 +5,14 @@ import type { PromiseDeconstructed, Callback, } from '../types'; +import type { ContextTimed, ContextTimedInput } from '@matrixai/contexts'; import os from 'os'; import process from 'process'; import path from 'path'; import nodesEvents from 'events'; import lexi from 'lexicographic-integer'; import { PromiseCancellable } from '@matrixai/async-cancellable'; +import { timedCancellable } from '@matrixai/contexts/dist/functions'; import * as utilsErrors from './errors'; const AsyncFunction = (async () => {}).constructor; @@ -93,6 +95,22 @@ async function sleep(ms: number): Promise { return await new Promise((r) => setTimeout(r, ms)); } +function sleepCancellable(ms: number): PromiseCancellable { + return new PromiseCancellable((resolve, reject, signal) => { + if (signal.aborted) return reject(signal.reason); + const handleTimeout = () => { + signal.removeEventListener('abort', handleAbort); + resolve(); + }; + const handleAbort = () => { + clearTimeout(timer); + reject(signal.reason); + }; + signal.addEventListener('abort', handleAbort, { once: true }); + const timer = setTimeout(handleTimeout, ms); + }); +} + /** * Checks if value is an object. * Arrays are also considered objects. @@ -149,23 +167,31 @@ function getUnixtime(date: Date = new Date()) { return Math.round(date.getTime() / 1000); } +const sleepCancelReasonSymbol = Symbol('sleepCancelReasonSymbol'); + /** * Poll execution and use condition to accept or reject the results */ -async function poll( +async function poll_( + ctx: ContextTimed, f: () => T | PromiseLike, condition: { (e: E, result?: undefined): boolean; (e: null, result: T): boolean; }, - interval = 1000, - timeout?: number, + interval: number, ): Promise { - const timer = timeout != null ? timerStart(timeout) : undefined; + let result: T; + const { p: abortP, resolveP: resolveAbortP } = promise(); + const handleAbortP = () => resolveAbortP(); + if (ctx.signal.aborted) { + resolveAbortP(); + } else { + ctx.signal.addEventListener('abort', handleAbortP, { once: true }); + } try { - let result: T; while (true) { - if (timer?.timedOut) { + if (ctx.signal.aborted) { throw new utilsErrors.ErrorUtilsPollTimeout(); } try { @@ -178,13 +204,46 @@ async function poll( throw e; } } - await sleep(interval); + const sleepP = sleepCancellable(interval); + await Promise.race([sleepP, abortP]) + .finally(async () => { + // Clean up + sleepP.cancel(sleepCancelReasonSymbol); + await sleepP; + }) + .catch((e) => { + if (e !== sleepCancelReasonSymbol) throw e; + }); } } finally { - if (timer != null) timerStop(timer); + resolveAbortP(); + await abortP; + ctx.signal.removeEventListener('abort', handleAbortP); } } +const pollCancellable = timedCancellable( + poll_, + true, + undefined, + utilsErrors.ErrorUtilsPollTimeout, +); + +/** + * Poll execution and use condition to accept or reject the results + */ +function poll( + f: () => T | PromiseLike, + condition: { + (e: E, result?: undefined): boolean; + (e: null, result: T): boolean; + }, + interval = 1000, + ctx?: Partial, +): PromiseCancellable { + return pollCancellable(ctx, f, condition, interval); +} + /** * Convert callback-style to promise-style * If this is applied to overloaded function @@ -492,6 +551,7 @@ export { dirEmpty, pathIncludes, sleep, + sleepCancellable, isObject, isEmptyObject, filterEmptyObject, diff --git a/tests/status/Status.test.ts b/tests/status/Status.test.ts index 0ed5b22be..d1ec9da46 100644 --- a/tests/status/Status.test.ts +++ b/tests/status/Status.test.ts @@ -294,7 +294,7 @@ describe('Status', () => { // In this case, it is possible that upon reacting to `LIVE` status // When it reads the status, it has already changed to `STOPPING` // Which means the `statusWaitFor` never resolves - const statusWaitFor = status.waitFor('LIVE', 1000); + const statusWaitFor = status.waitFor('LIVE', { timer: 1000 }); const p1 = status.finishStart({ clientHost: '', clientPort: 0, @@ -320,4 +320,19 @@ describe('Status', () => { ).toBe(true); await status.stop({}); }); + test('wait for is cancellable', async () => { + const status = new Status({ + statusPath: path.join(dataDir, config.paths.statusBase), + statusLockPath: path.join(dataDir, config.paths.statusLockBase), + fs: fs, + logger: logger, + }); + await status.start({ pid: 0 }); + // Is in `STARTED` state, so `DEAD` state will never be reached + const statusWaitForP = status.waitFor('DEAD'); + statusWaitForP.cancel('reason'); + await expect(statusWaitForP).rejects.toThrow( + statusErrors.ErrorStatusTimeout, + ); + }); }); diff --git a/tests/tasks/TaskManager.test.ts b/tests/tasks/TaskManager.test.ts index 5334eed35..7d8a6c8d9 100644 --- a/tests/tasks/TaskManager.test.ts +++ b/tests/tasks/TaskManager.test.ts @@ -261,7 +261,7 @@ describe(TaskManager.name, () => { } let completed = false; - const waitForcompletionProm = (async () => { + const waitForCompletionProm = (async () => { await Promise.all(pendingTasks); completed = true; })(); @@ -269,7 +269,7 @@ describe(TaskManager.name, () => { // Check for active tasks while tasks are still running while (!completed) { expect(taskManager.activeCount).toBeLessThanOrEqual(activeLimit); - await Promise.race([utils.sleep(100), waitForcompletionProm]); + await Promise.race([utils.sleep(100), waitForCompletionProm]); } await taskManager.stop(); @@ -555,7 +555,7 @@ describe(TaskManager.name, () => { const handler = jest.fn(); const { p: pauseP, resolveP: resolvePauseP } = utils.promise(); handler.mockImplementation(async (ctx: ContextTimed) => { - const abortP = new Promise((resolve, reject) => + const abortP = new Promise((_, reject) => ctx.signal.addEventListener('abort', () => reject(ctx.signal.reason)), ); await Promise.race([pauseP, abortP]); @@ -589,7 +589,7 @@ describe(TaskManager.name, () => { return status![0] === 'active' && status![1] === 'active'; }, 10, - 1000, + { timer: 1000 }, ); // Cancellation should reject promise const taskPromise = task1.promise(); @@ -657,7 +657,7 @@ describe(TaskManager.name, () => { const handler = jest.fn(); const { p: pauseP, resolveP: resolvePauseP } = utils.promise(); handler.mockImplementation(async (ctx: ContextTimed) => { - const abortP = new Promise((resolve, reject) => + const abortP = new Promise((_, reject) => ctx.signal.addEventListener('abort', () => reject( new tasksErrors.ErrorTaskRetry(undefined, { @@ -696,7 +696,7 @@ describe(TaskManager.name, () => { return status![0] === 'active' && status![1] === 'active'; }, 10, - 1000, + { timer: 1000 }, ); await taskManager.stop(); await taskManager.start({ lazy: true }); @@ -710,7 +710,7 @@ describe(TaskManager.name, () => { const handlerId1 = 'handler1' as TaskHandlerId; const handler1 = jest.fn(); handler1.mockImplementation(async (ctx: ContextTimed) => { - const abortP = new Promise((resolve, reject) => + const abortP = new Promise((_, reject) => ctx.signal.addEventListener('abort', () => reject( new tasksErrors.ErrorTaskRetry(undefined, { @@ -724,7 +724,7 @@ describe(TaskManager.name, () => { const handlerId2 = 'handler2' as TaskHandlerId; const handler2 = jest.fn(); handler2.mockImplementation(async (ctx: ContextTimed) => { - const abortP = new Promise((resolve, reject) => + const abortP = new Promise((_, reject) => ctx.signal.addEventListener('abort', () => reject(ctx.signal.reason)), ); await Promise.race([pauseP, abortP]); @@ -758,7 +758,7 @@ describe(TaskManager.name, () => { return status![0] === 'active' && status![1] === 'active'; }, 10, - 1000, + { timer: 1000 }, ); await taskManager.stop(); // Tasks were run @@ -973,7 +973,7 @@ describe(TaskManager.name, () => { return status === 'queued'; }, 10, - 1000, + { timer: 1000 }, ); await expect( taskManager.updateTask(task1.id, { @@ -1124,14 +1124,13 @@ describe(TaskManager.name, () => { const tasks = await Promise.all( taskIds.map((id) => taskManager.getTask(id)), ); - const statuses = tasks.map((task) => task!.status); - return statuses; + return tasks.map((task) => task!.status); }, (_: any, statuses?: Array) => { return statuses!.every((status) => status === 'queued'); }, 10, - 1000, + { timer: 1000 }, ); // @ts-ignore: Then queueing, which makes the tasks active await taskManager.startQueueing(); @@ -1144,7 +1143,7 @@ describe(TaskManager.name, () => { const handler = jest.fn(); const { p: pauseP, resolveP: resolvePauseP } = utils.promise(); handler.mockImplementation(async (ctx: ContextTimed) => { - const abortP = new Promise((resolve, reject) => + const abortP = new Promise((_, reject) => ctx.signal.addEventListener('abort', () => reject(ctx.signal.reason)), ); await Promise.race([pauseP, abortP]);