Skip to content

Commit b1b7ec4

Browse files
committed
feat(tarball): implement new NpmTarballWorkerPool class
1 parent b9a9e45 commit b1b7ec4

File tree

5 files changed

+387
-0
lines changed

5 files changed

+387
-0
lines changed

workspaces/tarball/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
"@nodesecure/mama": "^2.0.0",
4646
"@nodesecure/npm-types": "^1.2.0",
4747
"@nodesecure/utils": "^2.3.0",
48+
"@openally/result": "1.3.0",
49+
"hyperid": "3.3.0",
4850
"pacote": "^21.0.0"
4951
},
5052
"devDependencies": {
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
// Import Node.js Dependencies
2+
import { EventEmitter } from "node:events";
3+
import path from "node:path";
4+
5+
// Import Third-party Dependencies
6+
import hyperid from "hyperid";
7+
// import { type Result, Ok, Err } from "@openally/result";
8+
import type { AstAnalyserOptions } from "@nodesecure/js-x-ray";
9+
import type { Contact } from "@nodesecure/npm-types";
10+
11+
// Import Internal Dependencies
12+
import { PooledWorker } from "./PooledWorker.class.ts";
13+
14+
/**
 * Options accepted by the `NpmTarballWorkerPool` constructor.
 */
export interface NpmTarballWorkerPoolOptions {
  /**
   * How many workers the pool creates.
   * @default 4
   */
  workerCount?: number;
}

/**
 * A scan request as submitted by the caller.
 */
export interface WorkerTask {
  /** Location handed to `ManifestManager.fromPackageJSON` by the worker script. */
  location: string;
  /** Options handed to `NpmTarball.scanFiles` by the worker script. */
  astAnalyserOptions?: AstAnalyserOptions;
}

/**
 * A scan request tagged with the identifier used to match the worker reply
 * with the pending promise.
 */
export interface WorkerTaskWithId extends WorkerTask {
  id: string;
}

/** Reply posted by the worker script when the scan succeeded. */
interface WorkerTaskResultOk {
  id: string;
  result: ScanResultPayload;
}

/** Reply posted by the worker script when the scan threw; `error` is the error message. */
interface WorkerTaskResultErr {
  id: string;
  error: string;
}

/**
 * Message posted by the worker script for every task it receives.
 * The presence of the `error` key tells the two variants apart.
 */
export type WorkerTaskResult = WorkerTaskResultOk | WorkerTaskResultErr;
42+
43+
/**
 * File and dependency breakdown of a scanned package.
 */
interface ScanResultComposition {
  extensions: string[];
  files: string[];
  minified: string[];
  unused: string[];
  missing: string[];
  required_files: string[];
  required_nodejs: string[];
  required_thirdparty: string[];
  required_subpath: Record<string, string>;
}

/**
 * Plain-data summary of a package scan, as posted by the worker script
 * back to the pool.
 */
export interface ScanResultPayload {
  // Fields copied from the package manifest
  description?: string;
  engines?: Record<string, any>;
  repository?: any;
  scripts?: Record<string, string>;
  author?: Contact | null;
  /** The worker script sets this to `null` for workspaces. */
  integrity?: string | null;

  // Fields computed by the scan
  type: string;
  size: number;
  licenses: any[];
  uniqueLicenseIds: string[];
  warnings: any[];
  flags: string[];
  composition: ScanResultComposition;
}

/**
 * Settlers of the promise returned by `scan()`, kept until the worker replies.
 */
interface TaskPromiseHandler {
  resolve: (result: ScanResultPayload) => void;
  reject: (error: Error) => void;
}
74+
/**
 * Pool of `PooledWorker` instances used to scan npm packages off the main thread.
 *
 * Every call to `scan()` receives a unique id and is either handed to the first
 * available worker or queued (first in, first out) until a worker reports back.
 * The promise returned by `scan()` is settled from the worker's reply.
 *
 * Emits `"error"` when a worker reports an error (see `#onWorkerError` for the
 * exact conditions).
 */
export class NpmTarballWorkerPool extends EventEmitter {
  #generateTaskId = hyperid();
  #workers: PooledWorker[] = [];
  // Promise settlers of every task that has not been answered yet (running or queued).
  #processingTasks: Map<string, TaskPromiseHandler> = new Map();
  // Tasks waiting for a worker to become available.
  #waitingTasks: WorkerTaskWithId[] = [];
  // Id of the task currently assigned to each worker. Tracked here because the
  // worker may already have cleared its own `currentTaskId` when it reports an error.
  #runningTasks: Map<PooledWorker, string> = new Map();
  #isTerminated = false;

  /**
   * @param options pool configuration; `workerCount` defaults to 4
   * @throws {RangeError} when `workerCount` is not a finite number greater than
   * zero (such a pool could never run a task, or would never finish spawning)
   */
  constructor(
    options: NpmTarballWorkerPoolOptions = {}
  ) {
    super();

    const { workerCount = 4 } = options;
    if (!Number.isFinite(workerCount) || workerCount <= 0) {
      throw new RangeError(
        `workerCount must be a finite number greater than zero (received ${workerCount})`
      );
    }

    const workerPath = path.join(
      import.meta.dirname,
      "NpmTarballWorkerScript.js"
    );

    for (let i = 0; i < workerCount; i++) {
      this.#workers.push(
        new PooledWorker(workerPath, {
          onComplete: (source, message) => this.#onWorkerComplete(source, message),
          onError: (source, error) => this.#onWorkerError(source, error)
        })
      );
    }
  }

  /**
   * Hands `task` to `worker`. A synchronous failure of `worker.execute`
   * rejects the task's promise instead of escaping to the caller, so the
   * entry in `#processingTasks` is never left dangling.
   */
  #dispatch(
    worker: PooledWorker,
    task: WorkerTaskWithId
  ): void {
    this.#runningTasks.set(worker, task.id);

    try {
      worker.execute(task);
    }
    catch (error) {
      this.#runningTasks.delete(worker);

      const handler = this.#processingTasks.get(task.id);
      this.#processingTasks.delete(task.id);
      handler?.reject(
        error instanceof Error ? error : new Error(String(error))
      );
    }
  }

  /**
   * Feeds queued tasks to `worker` until one is accepted, the queue is empty
   * or the worker stops being available.
   */
  #runNextTask(
    worker: PooledWorker
  ): void {
    while (worker.isAvailable) {
      const nextTask = this.#waitingTasks.shift();
      if (!nextTask) {
        return;
      }

      this.#dispatch(worker, nextTask);
    }
  }

  /**
   * Settles the promise matching the worker reply, then gives the worker
   * the next queued task (if any).
   */
  #onWorkerComplete(
    worker: PooledWorker,
    message: WorkerTaskResult
  ): void {
    this.#runningTasks.delete(worker);

    const handler = this.#processingTasks.get(message.id);
    if (handler) {
      this.#processingTasks.delete(message.id);

      if ("error" in message) {
        handler.reject(new Error(message.error));
      }
      else {
        handler.resolve(message.result);
      }
    }

    this.#runNextTask(worker);
  }

  /**
   * Rejects the promise of the task the worker was running, keeps the queue
   * moving, then emits `"error"`.
   *
   * The event is emitted when the failure could not be delivered to a pending
   * `scan()` promise, or when at least one `"error"` listener is registered.
   * An EventEmitter throws when `"error"` is emitted without listener, so a
   * failure that already reached the caller through its promise is not thrown
   * a second time.
   */
  #onWorkerError(
    worker: PooledWorker,
    error: Error
  ): void {
    const taskId = this.#runningTasks.get(worker) ?? worker.currentTaskId;
    this.#runningTasks.delete(worker);

    let isDelivered = false;
    if (taskId) {
      const handler = this.#processingTasks.get(taskId);
      if (handler) {
        this.#processingTasks.delete(taskId);
        handler.reject(error);
        isDelivered = true;
      }
    }

    // Dispatch before emitting: emit() may throw and must not strand the queue.
    // NOTE(review): Node.js terminates a worker thread after an uncaught
    // exception; confirm PooledWorker is still able to run tasks afterwards.
    this.#runNextTask(worker);

    if (!isDelivered || this.listenerCount("error") > 0) {
      this.emit("error", error);
    }
  }

  /**
   * Submits a package to scan.
   *
   * @param task location of the package and optional analyser options
   * @returns a promise resolved with the scan payload, or rejected when the
   * pool is terminated, the task cannot be handed to a worker, the worker
   * replies with an error, or the worker itself fails
   */
  scan(
    task: WorkerTask
  ): Promise<ScanResultPayload> {
    if (this.#isTerminated) {
      return Promise.reject(
        new Error("NpmTarballWorkerPool has been terminated")
      );
    }

    // The generated id goes last so a stray `id` key on `task` cannot override it.
    const fullTask: WorkerTaskWithId = {
      ...task,
      id: this.#generateTaskId()
    };

    const { promise, resolve, reject } = Promise.withResolvers<ScanResultPayload>();
    this.#processingTasks.set(fullTask.id, { resolve, reject });

    const availableWorker = this.#workers.find((worker) => worker.isAvailable);
    if (availableWorker) {
      this.#dispatch(availableWorker, fullTask);
    }
    else {
      this.#waitingTasks.push(fullTask);
    }

    return promise;
  }

  /**
   * Rejects every pending task, drops the queue and terminates all workers.
   * Further calls to `scan()` are rejected.
   */
  async terminate(): Promise<void> {
    this.#isTerminated = true;

    const terminationError = new Error("NpmTarballWorkerPool terminated");
    for (const handler of this.#processingTasks.values()) {
      handler.reject(terminationError);
    }
    this.#processingTasks.clear();
    this.#runningTasks.clear();
    this.#waitingTasks = [];

    await Promise.all(
      this.#workers.map((worker) => worker.terminate())
    );
    this.#workers = [];
  }

  /** Lets the pool be used with `await using`; same as `terminate()`. */
  [Symbol.asyncDispose](): Promise<void> {
    return this.terminate();
  }
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Import Node.js Dependencies
2+
import { parentPort } from "node:worker_threads";
3+
import path from "node:path";
4+
5+
// Import Third-party Dependencies
6+
import { ManifestManager } from "@nodesecure/mama";
7+
import { type Warning } from "@nodesecure/js-x-ray";
8+
9+
// Import Internal Dependencies
10+
import { NpmTarball } from "./NpmTarball.class.ts";
11+
import {
12+
isSensitiveFile,
13+
booleanToFlags
14+
} from "../utils/index.ts";
15+
import {
16+
getEmptyPackageWarning,
17+
getSemVerWarning
18+
} from "../warnings.ts";
19+
20+
import type {
21+
WorkerTaskWithId,
22+
WorkerTaskResult,
23+
ScanResultPayload
24+
} from "./NpmTarballWorkerPool.class.ts";
25+
26+
// CONSTANTS
27+
const kNativeCodeExtensions = new Set([".gyp", ".c", ".cpp", ".node", ".so", ".h"]);
28+
29+
/**
 * Scans the package found at `task.location` and summarises the outcome as a
 * plain payload that can be posted back to the pool.
 *
 * @param task scan request (location and optional analyser options)
 * @returns manifest fields, licenses, warnings, flags and composition
 */
async function scanPackageInWorker(
  task: WorkerTaskWithId
): Promise<ScanResultPayload> {
  const { location, astAnalyserOptions } = task;

  const mama = await ManifestManager.fromPackageJSON(location);
  const scan = await new NpmTarball(mama).scanFiles(astAnalyserOptions);
  const { composition, conformance, code } = scan;

  // A package is considered empty when package.json is its only file
  const isEmptyPackage = composition.files.length === 1 &&
    composition.files.includes("package.json");

  const warnings: Warning[] = [
    ...(isEmptyPackage ? [getEmptyPackageWarning()] : []),
    ...(mama.hasZeroSemver ? [getSemVerWarning(mama.document.version!)] : []),
    ...code.warnings
  ];

  const analysis = code.groupAndAnalyseDependencies(mama);
  const { dependencies } = analysis;
  const licenseCount = conformance.uniqueLicenseIds.length;

  const computedFlags = booleanToFlags({
    ...analysis.flags,
    hasExternalCapacity: code.flags.hasExternalCapacity || analysis.flags.hasExternalCapacity,
    hasNoLicense: licenseCount === 0,
    hasMultipleLicenses: licenseCount > 1,
    hasMinifiedCode: code.minified.length > 0,
    hasWarnings: warnings.length > 0,
    hasBannedFile: composition.files.some((filePath) => isSensitiveFile(filePath)),
    hasNativeCode: mama.flags.isNative ||
      composition.files.some((file) => kNativeCodeExtensions.has(path.extname(file))),
    hasScript: mama.flags.hasUnsafeScripts
  });

  const manifest = mama.document;

  return {
    description: manifest.description,
    engines: manifest.engines,
    repository: manifest.repository,
    scripts: manifest.scripts,
    author: mama.author,
    // No integrity is reported for workspaces
    integrity: mama.isWorkspace ? null : mama.integrity,
    type: mama.moduleType,
    size: composition.size,
    licenses: conformance.licenses,
    uniqueLicenseIds: conformance.uniqueLicenseIds,
    warnings,
    flags: Array.from(computedFlags),
    composition: {
      extensions: [...composition.ext],
      files: composition.files,
      minified: code.minified,
      unused: dependencies.unused,
      missing: dependencies.missing,
      required_files: [...analysis.files],
      required_nodejs: dependencies.nodejs,
      required_thirdparty: dependencies.thirdparty,
      required_subpath: dependencies.subpathImports
    }
  };
}
110+
111+
// Runs every task received from the pool and always answers with a message
// carrying the task id: either the scan result or the error message.
parentPort?.on("message", async(task: WorkerTaskWithId) => {
  const reply = (payload: WorkerTaskResult): void => {
    parentPort?.postMessage(payload);
  };

  let result: ScanResultPayload;
  try {
    result = await scanPackageInWorker(task);
  }
  catch (error) {
    // Only the message is sent back, non-Error values are stringified
    const reason = error instanceof Error ? error.message : String(error);
    reply({ id: task.id, error: reason });

    return;
  }

  reply({ id: task.id, result });
});
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Import Node.js Dependencies
2+
import { Worker } from "node:worker_threads";
3+
4+
// Import Internal Dependencies
5+
import type {
6+
WorkerTaskWithId,
7+
WorkerTaskResult
8+
} from "./NpmTarballWorkerPool.class.ts";
9+
10+
/**
 * Callbacks through which a `PooledWorker` reports back to its owner.
 */
export interface PooledWorkerEvents {
  /**
   * Called with every message posted by the worker thread. The worker is
   * available again (and `currentTaskId` is `null`) when this runs.
   */
  onComplete: (worker: PooledWorker, result: WorkerTaskResult) => void;
  /**
   * Called when the worker thread emits "error". While this callback runs,
   * `worker.currentTaskId` still holds the id of the task that was running
   * (if any) and `worker.isAvailable` is already `true`, so the owner can
   * both identify the failed task and call `execute()` with a new one.
   */
  onError: (worker: PooledWorker, error: Error) => void;
}

/**
 * Wraps a `node:worker_threads` Worker that runs one task at a time.
 *
 * Node.js terminates a worker thread once it has emitted "error"; this class
 * therefore starts a fresh thread the next time `execute()` is called after
 * an error, instead of posting to a dead thread.
 */
export class PooledWorker {
  #workerPath: string;
  #worker: Worker;
  #events: PooledWorkerEvents;
  #currentTaskId: string | null = null;
  // Kept apart from #currentTaskId so the id stays readable during onError.
  #isBusy = false;
  // Set when the thread emitted "error"; the thread is replaced lazily in
  // execute() so a script that fails on load cannot respawn in a loop.
  #needsRespawn = false;
  #isTerminated = false;

  /**
   * @param workerPath path of the script run by the worker thread
   * @param events callbacks notified of task completion and worker errors
   */
  constructor(
    workerPath: string,
    events: PooledWorkerEvents
  ) {
    this.#workerPath = workerPath;
    this.#events = events;
    this.#worker = this.#spawn();
  }

  /** Starts a worker thread and wires its "message" and "error" events. */
  #spawn(): Worker {
    const worker = new Worker(this.#workerPath);

    worker.on("message", (message: WorkerTaskResult) => {
      this.#isBusy = false;
      this.#currentTaskId = null;
      this.#events.onComplete(this, message);
    });

    worker.on("error", (error: Error) => {
      this.#isBusy = false;
      this.#needsRespawn = true;

      try {
        this.#events.onError(this, error);
      }
      finally {
        // Forget the failed task unless the callback already started a new one.
        if (!this.#isBusy) {
          this.#currentTaskId = null;
        }
      }
    });

    return worker;
  }

  /** `true` when no task is running and `execute()` can be called. */
  get isAvailable(): boolean {
    return !this.#isBusy;
  }

  /**
   * Id of the running task, or `null`. During an `onError` callback this is
   * the id of the task that was running when the thread failed.
   */
  get currentTaskId(): string | null {
    return this.#currentTaskId;
  }

  /**
   * Posts `task` to the worker thread.
   *
   * @throws {Error} when a task is already running
   * @throws when `task` cannot be cloned by `postMessage`; the worker stays
   * available in that case
   */
  execute(
    task: WorkerTaskWithId
  ): void {
    if (!this.isAvailable) {
      throw new Error(`Worker is busy with task ${this.#currentTaskId}`);
    }

    if (this.#needsRespawn && !this.#isTerminated) {
      this.#worker = this.#spawn();
      this.#needsRespawn = false;
    }

    // Post first: postMessage throws synchronously on values that cannot be
    // cloned, and the worker must not be left marked as busy when it does.
    this.#worker.postMessage(task);
    this.#currentTaskId = task.id;
    this.#isBusy = true;
  }

  /** Stops the worker thread; no thread is started again afterwards. */
  terminate(): Promise<number> {
    this.#isTerminated = true;

    return this.#worker.terminate();
  }
}

workspaces/tarball/src/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,8 @@
11
export * from "./tarball.ts";
22
export * from "./class/NpmTarball.class.ts";
3+
export {
4+
NpmTarballWorkerPool,
5+
type WorkerTask,
6+
type NpmTarballWorkerPoolOptions,
7+
type ScanResultPayload
8+
} from "./class/NpmTarballWorkerPool.class.ts";

0 commit comments

Comments
 (0)