Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ci: merge staging to master #763

Merged
merged 1 commit into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/status/Status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import type {
StatusDead,
} from './types';
import type { FileSystem, FileHandle } from '../types';
import type { PromiseCancellable } from '@matrixai/async-cancellable';
import type { ContextTimed, ContextTimedInput } from '@matrixai/contexts';
import Logger from '@matrixai/logger';
import lock from 'fd-lock';
import { StartStop, ready } from '@matrixai/async-init/dist/StartStop';
import { context, timedCancellable } from '@matrixai/contexts/dist/decorators';
import * as statusUtils from './utils';
import * as statusErrors from './errors';
import * as statusEvents from './events';
Expand Down Expand Up @@ -299,9 +302,14 @@ class Status {
}
}

public waitFor(
status: StatusInfo['status'],
ctx?: Partial<ContextTimedInput>,
): PromiseCancellable<StatusInfo>;
@timedCancellable(true)
public async waitFor(
status: StatusInfo['status'],
timeout?: number,
@context ctx: ContextTimed,
): Promise<StatusInfo> {
let statusInfo;
try {
Expand All @@ -323,7 +331,7 @@ class Status {
return false;
},
50,
timeout,
ctx,
);
} catch (e) {
if (e instanceof errors.ErrorUtilsPollTimeout) {
Expand Down
76 changes: 68 additions & 8 deletions src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import type {
PromiseDeconstructed,
Callback,
} from '../types';
import type { ContextTimed, ContextTimedInput } from '@matrixai/contexts';
import os from 'os';
import process from 'process';
import path from 'path';
import nodesEvents from 'events';
import lexi from 'lexicographic-integer';
import { PromiseCancellable } from '@matrixai/async-cancellable';
import { timedCancellable } from '@matrixai/contexts/dist/functions';
import * as utilsErrors from './errors';

const AsyncFunction = (async () => {}).constructor;
Expand Down Expand Up @@ -93,6 +95,22 @@ async function sleep(ms: number): Promise<void> {
return await new Promise<void>((r) => setTimeout(r, ms));
}

function sleepCancellable(ms: number): PromiseCancellable<void> {
return new PromiseCancellable<void>((resolve, reject, signal) => {
if (signal.aborted) return reject(signal.reason);
const handleTimeout = () => {
signal.removeEventListener('abort', handleAbort);
resolve();
};
const handleAbort = () => {
clearTimeout(timer);
reject(signal.reason);
};
signal.addEventListener('abort', handleAbort, { once: true });
const timer = setTimeout(handleTimeout, ms);
});
}

/**
* Checks if value is an object.
* Arrays are also considered objects.
Expand Down Expand Up @@ -149,23 +167,31 @@ function getUnixtime(date: Date = new Date()) {
return Math.round(date.getTime() / 1000);
}

const sleepCancelReasonSymbol = Symbol('sleepCancelReasonSymbol');

/**
* Poll execution and use condition to accept or reject the results
*/
async function poll<T, E = any>(
async function poll_<T, E = any>(
ctx: ContextTimed,
f: () => T | PromiseLike<T>,
condition: {
(e: E, result?: undefined): boolean;
(e: null, result: T): boolean;
},
interval = 1000,
timeout?: number,
interval: number,
): Promise<T> {
const timer = timeout != null ? timerStart(timeout) : undefined;
let result: T;
const { p: abortP, resolveP: resolveAbortP } = promise();
const handleAbortP = () => resolveAbortP();
if (ctx.signal.aborted) {
resolveAbortP();
} else {
ctx.signal.addEventListener('abort', handleAbortP, { once: true });
}
try {
let result: T;
while (true) {
if (timer?.timedOut) {
if (ctx.signal.aborted) {
throw new utilsErrors.ErrorUtilsPollTimeout();
}
try {
Expand All @@ -178,13 +204,46 @@ async function poll<T, E = any>(
throw e;
}
}
await sleep(interval);
const sleepP = sleepCancellable(interval);
await Promise.race([sleepP, abortP])
.finally(async () => {
// Clean up
sleepP.cancel(sleepCancelReasonSymbol);
await sleepP;
})
.catch((e) => {
if (e !== sleepCancelReasonSymbol) throw e;
});
}
} finally {
if (timer != null) timerStop(timer);
resolveAbortP();
await abortP;
ctx.signal.removeEventListener('abort', handleAbortP);
}
}

const pollCancellable = timedCancellable(
poll_,
true,
undefined,
utilsErrors.ErrorUtilsPollTimeout,
);

/**
* Poll execution and use condition to accept or reject the results
*/
function poll<T, E = any>(
f: () => T | PromiseLike<T>,
condition: {
(e: E, result?: undefined): boolean;
(e: null, result: T): boolean;
},
interval = 1000,
ctx?: Partial<ContextTimedInput>,
): PromiseCancellable<T> {
return pollCancellable(ctx, f, condition, interval);
}

/**
* Convert callback-style to promise-style
* If this is applied to overloaded function
Expand Down Expand Up @@ -492,6 +551,7 @@ export {
dirEmpty,
pathIncludes,
sleep,
sleepCancellable,
isObject,
isEmptyObject,
filterEmptyObject,
Expand Down
17 changes: 16 additions & 1 deletion tests/status/Status.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ describe('Status', () => {
// In this case, it is possible that upon reacting to `LIVE` status
// When it reads the status, it has already changed to `STOPPING`
// Which means the `statusWaitFor` never resolves
const statusWaitFor = status.waitFor('LIVE', 1000);
const statusWaitFor = status.waitFor('LIVE', { timer: 1000 });
const p1 = status.finishStart({
clientHost: '',
clientPort: 0,
Expand All @@ -320,4 +320,19 @@ describe('Status', () => {
).toBe(true);
await status.stop({});
});
test('wait for is cancellable', async () => {
const status = new Status({
statusPath: path.join(dataDir, config.paths.statusBase),
statusLockPath: path.join(dataDir, config.paths.statusLockBase),
fs: fs,
logger: logger,
});
await status.start({ pid: 0 });
// Is in `STARTED` state, so `DEAD` state will never be reached
const statusWaitForP = status.waitFor('DEAD');
statusWaitForP.cancel('reason');
await expect(statusWaitForP).rejects.toThrow(
statusErrors.ErrorStatusTimeout,
);
});
});
27 changes: 13 additions & 14 deletions tests/tasks/TaskManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,15 @@ describe(TaskManager.name, () => {
}

let completed = false;
const waitForcompletionProm = (async () => {
const waitForCompletionProm = (async () => {
await Promise.all(pendingTasks);
completed = true;
})();

// Check for active tasks while tasks are still running
while (!completed) {
expect(taskManager.activeCount).toBeLessThanOrEqual(activeLimit);
await Promise.race([utils.sleep(100), waitForcompletionProm]);
await Promise.race([utils.sleep(100), waitForCompletionProm]);
}

await taskManager.stop();
Expand Down Expand Up @@ -555,7 +555,7 @@ describe(TaskManager.name, () => {
const handler = jest.fn();
const { p: pauseP, resolveP: resolvePauseP } = utils.promise();
handler.mockImplementation(async (ctx: ContextTimed) => {
const abortP = new Promise((resolve, reject) =>
const abortP = new Promise((_, reject) =>
ctx.signal.addEventListener('abort', () => reject(ctx.signal.reason)),
);
await Promise.race([pauseP, abortP]);
Expand Down Expand Up @@ -589,7 +589,7 @@ describe(TaskManager.name, () => {
return status![0] === 'active' && status![1] === 'active';
},
10,
1000,
{ timer: 1000 },
);
// Cancellation should reject promise
const taskPromise = task1.promise();
Expand Down Expand Up @@ -657,7 +657,7 @@ describe(TaskManager.name, () => {
const handler = jest.fn();
const { p: pauseP, resolveP: resolvePauseP } = utils.promise();
handler.mockImplementation(async (ctx: ContextTimed) => {
const abortP = new Promise((resolve, reject) =>
const abortP = new Promise((_, reject) =>
ctx.signal.addEventListener('abort', () =>
reject(
new tasksErrors.ErrorTaskRetry(undefined, {
Expand Down Expand Up @@ -696,7 +696,7 @@ describe(TaskManager.name, () => {
return status![0] === 'active' && status![1] === 'active';
},
10,
1000,
{ timer: 1000 },
);
await taskManager.stop();
await taskManager.start({ lazy: true });
Expand All @@ -710,7 +710,7 @@ describe(TaskManager.name, () => {
const handlerId1 = 'handler1' as TaskHandlerId;
const handler1 = jest.fn();
handler1.mockImplementation(async (ctx: ContextTimed) => {
const abortP = new Promise((resolve, reject) =>
const abortP = new Promise((_, reject) =>
ctx.signal.addEventListener('abort', () =>
reject(
new tasksErrors.ErrorTaskRetry(undefined, {
Expand All @@ -724,7 +724,7 @@ describe(TaskManager.name, () => {
const handlerId2 = 'handler2' as TaskHandlerId;
const handler2 = jest.fn();
handler2.mockImplementation(async (ctx: ContextTimed) => {
const abortP = new Promise((resolve, reject) =>
const abortP = new Promise((_, reject) =>
ctx.signal.addEventListener('abort', () => reject(ctx.signal.reason)),
);
await Promise.race([pauseP, abortP]);
Expand Down Expand Up @@ -758,7 +758,7 @@ describe(TaskManager.name, () => {
return status![0] === 'active' && status![1] === 'active';
},
10,
1000,
{ timer: 1000 },
);
await taskManager.stop();
// Tasks were run
Expand Down Expand Up @@ -973,7 +973,7 @@ describe(TaskManager.name, () => {
return status === 'queued';
},
10,
1000,
{ timer: 1000 },
);
await expect(
taskManager.updateTask(task1.id, {
Expand Down Expand Up @@ -1124,14 +1124,13 @@ describe(TaskManager.name, () => {
const tasks = await Promise.all(
taskIds.map((id) => taskManager.getTask(id)),
);
const statuses = tasks.map((task) => task!.status);
return statuses;
return tasks.map((task) => task!.status);
},
(_: any, statuses?: Array<TaskStatus>) => {
return statuses!.every((status) => status === 'queued');
},
10,
1000,
{ timer: 1000 },
);
// @ts-ignore: Then queueing, which makes the tasks active
await taskManager.startQueueing();
Expand All @@ -1144,7 +1143,7 @@ describe(TaskManager.name, () => {
const handler = jest.fn();
const { p: pauseP, resolveP: resolvePauseP } = utils.promise();
handler.mockImplementation(async (ctx: ContextTimed) => {
const abortP = new Promise((resolve, reject) =>
const abortP = new Promise((_, reject) =>
ctx.signal.addEventListener('abort', () => reject(ctx.signal.reason)),
);
await Promise.race([pauseP, abortP]);
Expand Down