Skip to content

Commit

Permalink
feat: made poll and Status.waitFor cancellable
Browse files Browse the repository at this point in the history
  • Loading branch information
tegefaulkes committed Jul 8, 2024
1 parent de86b0e commit 8b2f211
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 25 deletions.
12 changes: 10 additions & 2 deletions src/status/Status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import type {
StatusDead,
} from './types';
import type { FileSystem, FileHandle } from '../types';
import type { PromiseCancellable } from '@matrixai/async-cancellable';
import type { ContextTimed, ContextTimedInput } from '@matrixai/contexts';
import Logger from '@matrixai/logger';
import lock from 'fd-lock';
import { StartStop, ready } from '@matrixai/async-init/dist/StartStop';
import { context, timedCancellable } from '@matrixai/contexts/dist/decorators';
import * as statusUtils from './utils';
import * as statusErrors from './errors';
import * as statusEvents from './events';
Expand Down Expand Up @@ -299,9 +302,14 @@ class Status {
}
}

public waitFor(
status: StatusInfo['status'],
ctx?: Partial<ContextTimedInput>,
): PromiseCancellable<StatusInfo>;
@timedCancellable(true)
public async waitFor(
status: StatusInfo['status'],
timeout?: number,
@context ctx: ContextTimed,
): Promise<StatusInfo> {
let statusInfo;
try {
Expand All @@ -323,7 +331,7 @@ class Status {
return false;
},
50,
timeout,
ctx,
);
} catch (e) {
if (e instanceof errors.ErrorUtilsPollTimeout) {
Expand Down
76 changes: 68 additions & 8 deletions src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import type {
PromiseDeconstructed,
Callback,
} from '../types';
import type { ContextTimed, ContextTimedInput } from '@matrixai/contexts';
import os from 'os';
import process from 'process';
import path from 'path';
import nodesEvents from 'events';
import lexi from 'lexicographic-integer';
import { PromiseCancellable } from '@matrixai/async-cancellable';
import { timedCancellable } from '@matrixai/contexts/dist/functions';
import * as utilsErrors from './errors';

const AsyncFunction = (async () => {}).constructor;
Expand Down Expand Up @@ -93,6 +95,22 @@ async function sleep(ms: number): Promise<void> {
return await new Promise<void>((r) => setTimeout(r, ms));
}

function sleepCancellable(ms: number): PromiseCancellable<void> {
return new PromiseCancellable<void>((resolve, reject, signal) => {
if (signal.aborted) return reject(signal.reason);
const handleTimeout = () => {
signal.removeEventListener('abort', handleAbort);
resolve();
};
const handleAbort = () => {
clearTimeout(timer);
reject(signal.reason);
};
signal.addEventListener('abort', handleAbort, { once: true });
const timer = setTimeout(handleTimeout, ms);
});
}

/**
* Checks if value is an object.
* Arrays are also considered objects.
Expand Down Expand Up @@ -149,23 +167,31 @@ function getUnixtime(date: Date = new Date()) {
return Math.round(date.getTime() / 1000);
}

const sleepCancelReasonSymbol = Symbol('sleepCancelReasonSymbol');

/**
* Poll execution and use condition to accept or reject the results
*/
async function poll<T, E = any>(
async function poll_<T, E = any>(
ctx: ContextTimed,
f: () => T | PromiseLike<T>,
condition: {
(e: E, result?: undefined): boolean;
(e: null, result: T): boolean;
},
interval = 1000,
timeout?: number,
interval: number,
): Promise<T> {
const timer = timeout != null ? timerStart(timeout) : undefined;
let result: T;
const { p: abortP, resolveP: resolveAbortP } = promise();
const handleAbortP = () => resolveAbortP();
if (ctx.signal.aborted) {
resolveAbortP();
} else {
ctx.signal.addEventListener('abort', handleAbortP, { once: true });
}
try {
let result: T;
while (true) {
if (timer?.timedOut) {
if (ctx.signal.aborted) {
throw new utilsErrors.ErrorUtilsPollTimeout();
}
try {
Expand All @@ -178,13 +204,46 @@ async function poll<T, E = any>(
throw e;
}
}
await sleep(interval);
const sleepP = sleepCancellable(interval);
await Promise.race([sleepP, abortP])
.finally(async () => {
// Clean up
sleepP.cancel(sleepCancelReasonSymbol);
await sleepP;
})
.catch((e) => {
if (e !== sleepCancelReasonSymbol) throw e;
});
}
} finally {
if (timer != null) timerStop(timer);
resolveAbortP();
await abortP;
ctx.signal.removeEventListener('abort', handleAbortP);
}
}

const pollCancellable = timedCancellable(
poll_,
true,
undefined,
utilsErrors.ErrorUtilsPollTimeout,
);

/**
* Poll execution and use condition to accept or reject the results
*/
function poll<T, E = any>(
f: () => T | PromiseLike<T>,
condition: {
(e: E, result?: undefined): boolean;
(e: null, result: T): boolean;
},
interval = 1000,
ctx?: Partial<ContextTimedInput>,
): PromiseCancellable<T> {
return pollCancellable(ctx, f, condition, interval);
}

/**
* Convert callback-style to promise-style
* If this is applied to overloaded function
Expand Down Expand Up @@ -492,6 +551,7 @@ export {
dirEmpty,
pathIncludes,
sleep,
sleepCancellable,
isObject,
isEmptyObject,
filterEmptyObject,
Expand Down
17 changes: 16 additions & 1 deletion tests/status/Status.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ describe('Status', () => {
// In this case, it is possible that upon reacting to `LIVE` status
// When it reads the status, it has already changed to `STOPPING`
// Which means the `statusWaitFor` never resolves
const statusWaitFor = status.waitFor('LIVE', 1000);
const statusWaitFor = status.waitFor('LIVE', { timer: 1000 });
const p1 = status.finishStart({
clientHost: '',
clientPort: 0,
Expand All @@ -320,4 +320,19 @@ describe('Status', () => {
).toBe(true);
await status.stop({});
});
test('wait for is cancellable', async () => {
const status = new Status({
statusPath: path.join(dataDir, config.paths.statusBase),
statusLockPath: path.join(dataDir, config.paths.statusLockBase),
fs: fs,
logger: logger,
});
await status.start({ pid: 0 });
// Is in `STARTED` state, so `DEAD` state will never be reached
const statusWaitForP = status.waitFor('DEAD');
statusWaitForP.cancel('reason');
await expect(statusWaitForP).rejects.toThrow(
statusErrors.ErrorStatusTimeout,
);
});
});
27 changes: 13 additions & 14 deletions tests/tasks/TaskManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,15 @@ describe(TaskManager.name, () => {
}

let completed = false;
const waitForcompletionProm = (async () => {
const waitForCompletionProm = (async () => {
await Promise.all(pendingTasks);
completed = true;
})();

// Check for active tasks while tasks are still running
while (!completed) {
expect(taskManager.activeCount).toBeLessThanOrEqual(activeLimit);
await Promise.race([utils.sleep(100), waitForcompletionProm]);
await Promise.race([utils.sleep(100), waitForCompletionProm]);
}

await taskManager.stop();
Expand Down Expand Up @@ -555,7 +555,7 @@ describe(TaskManager.name, () => {
const handler = jest.fn();
const { p: pauseP, resolveP: resolvePauseP } = utils.promise();
handler.mockImplementation(async (ctx: ContextTimed) => {
const abortP = new Promise((resolve, reject) =>
const abortP = new Promise((_, reject) =>
ctx.signal.addEventListener('abort', () => reject(ctx.signal.reason)),
);
await Promise.race([pauseP, abortP]);
Expand Down Expand Up @@ -589,7 +589,7 @@ describe(TaskManager.name, () => {
return status![0] === 'active' && status![1] === 'active';
},
10,
1000,
{ timer: 1000 },
);
// Cancellation should reject promise
const taskPromise = task1.promise();
Expand Down Expand Up @@ -657,7 +657,7 @@ describe(TaskManager.name, () => {
const handler = jest.fn();
const { p: pauseP, resolveP: resolvePauseP } = utils.promise();
handler.mockImplementation(async (ctx: ContextTimed) => {
const abortP = new Promise((resolve, reject) =>
const abortP = new Promise((_, reject) =>
ctx.signal.addEventListener('abort', () =>
reject(
new tasksErrors.ErrorTaskRetry(undefined, {
Expand Down Expand Up @@ -696,7 +696,7 @@ describe(TaskManager.name, () => {
return status![0] === 'active' && status![1] === 'active';
},
10,
1000,
{ timer: 1000 },
);
await taskManager.stop();
await taskManager.start({ lazy: true });
Expand All @@ -710,7 +710,7 @@ describe(TaskManager.name, () => {
const handlerId1 = 'handler1' as TaskHandlerId;
const handler1 = jest.fn();
handler1.mockImplementation(async (ctx: ContextTimed) => {
const abortP = new Promise((resolve, reject) =>
const abortP = new Promise((_, reject) =>
ctx.signal.addEventListener('abort', () =>
reject(
new tasksErrors.ErrorTaskRetry(undefined, {
Expand All @@ -724,7 +724,7 @@ describe(TaskManager.name, () => {
const handlerId2 = 'handler2' as TaskHandlerId;
const handler2 = jest.fn();
handler2.mockImplementation(async (ctx: ContextTimed) => {
const abortP = new Promise((resolve, reject) =>
const abortP = new Promise((_, reject) =>
ctx.signal.addEventListener('abort', () => reject(ctx.signal.reason)),
);
await Promise.race([pauseP, abortP]);
Expand Down Expand Up @@ -758,7 +758,7 @@ describe(TaskManager.name, () => {
return status![0] === 'active' && status![1] === 'active';
},
10,
1000,
{ timer: 1000 },
);
await taskManager.stop();
// Tasks were run
Expand Down Expand Up @@ -973,7 +973,7 @@ describe(TaskManager.name, () => {
return status === 'queued';
},
10,
1000,
{ timer: 1000 },
);
await expect(
taskManager.updateTask(task1.id, {
Expand Down Expand Up @@ -1124,14 +1124,13 @@ describe(TaskManager.name, () => {
const tasks = await Promise.all(
taskIds.map((id) => taskManager.getTask(id)),
);
const statuses = tasks.map((task) => task!.status);
return statuses;
return tasks.map((task) => task!.status);
},
(_: any, statuses?: Array<TaskStatus>) => {
return statuses!.every((status) => status === 'queued');
},
10,
1000,
{ timer: 1000 },
);
// @ts-ignore: Then queueing, which makes the tasks active
await taskManager.startQueueing();
Expand All @@ -1144,7 +1143,7 @@ describe(TaskManager.name, () => {
const handler = jest.fn();
const { p: pauseP, resolveP: resolvePauseP } = utils.promise();
handler.mockImplementation(async (ctx: ContextTimed) => {
const abortP = new Promise((resolve, reject) =>
const abortP = new Promise((_, reject) =>
ctx.signal.addEventListener('abort', () => reject(ctx.signal.reason)),
);
await Promise.race([pauseP, abortP]);
Expand Down

0 comments on commit 8b2f211

Please sign in to comment.