Skip to content

Commit 7bc6676

Browse files
committed
feat(tarball): implement new NpmTarballWorkerPool class
1 parent b9a9e45 commit 7bc6676

File tree

5 files changed

+367
-0
lines changed

5 files changed

+367
-0
lines changed

workspaces/tarball/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
"@nodesecure/mama": "^2.0.0",
4646
"@nodesecure/npm-types": "^1.2.0",
4747
"@nodesecure/utils": "^2.3.0",
48+
"hyperid": "3.3.0",
4849
"pacote": "^21.0.0"
4950
},
5051
"devDependencies": {
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
// Import Node.js Dependencies
2+
import { EventEmitter } from "node:events";
3+
import path from "node:path";
4+
5+
// Import Third-party Dependencies
6+
import hyperid from "hyperid";
7+
import type { AstAnalyserOptions } from "@nodesecure/js-x-ray";
8+
import type { Contact } from "@nodesecure/npm-types";
9+
10+
// Import Internal Dependencies
11+
import { PooledWorker } from "./PooledWorker.class.ts";
12+
13+
export interface NpmTarballWorkerPoolOptions {
14+
/**
15+
* Number of workers in the pool
16+
* @default 4
17+
*/
18+
workerCount?: number;
19+
}
20+
21+
export interface WorkerTask {
22+
id: string;
23+
location: string;
24+
astAnalyserOptions?: AstAnalyserOptions;
25+
}
26+
27+
export interface WorkerTaskResult {
28+
id: string;
29+
location: string;
30+
result: ScanResultPayload | null;
31+
error: string | null;
32+
}
33+
34+
export interface ScanResultPayload {
35+
description?: string;
36+
engines?: Record<string, any>;
37+
repository?: any;
38+
scripts?: Record<string, string>;
39+
author?: Contact | null;
40+
integrity?: string | null;
41+
type: string;
42+
size: number;
43+
licenses: any[];
44+
uniqueLicenseIds: string[];
45+
warnings: any[];
46+
flags: string[];
47+
composition: {
48+
extensions: string[];
49+
files: string[];
50+
minified: string[];
51+
unused: string[];
52+
missing: string[];
53+
required_files: string[];
54+
required_nodejs: string[];
55+
required_thirdparty: string[];
56+
required_subpath: Record<string, string>;
57+
};
58+
}
59+
60+
interface TaskPromiseHandler {
61+
resolve: (result: WorkerTaskResult) => void;
62+
reject: (error: Error) => void;
63+
}
64+
65+
export class NpmTarballWorkerPool extends EventEmitter {
66+
#generateTaskId = hyperid();
67+
#workers: PooledWorker[] = [];
68+
#processingTasks: Map<string, TaskPromiseHandler> = new Map();
69+
#waitingTasks: WorkerTask[] = [];
70+
#isTerminated = false;
71+
72+
constructor(
73+
options: NpmTarballWorkerPoolOptions = {}
74+
) {
75+
super();
76+
77+
const { workerCount = 4 } = options;
78+
const workerPath = path.join(
79+
import.meta.dirname,
80+
"NpmTarballWorkerScript.js"
81+
);
82+
83+
for (let i = 0; i < workerCount; i++) {
84+
const worker = new PooledWorker(workerPath, {
85+
onComplete: (worker, result) => this.#onWorkerComplete(worker, result),
86+
onError: (worker, error) => this.#onWorkerError(worker, error)
87+
});
88+
89+
this.#workers.push(worker);
90+
}
91+
}
92+
93+
#onWorkerComplete(
94+
worker: PooledWorker,
95+
result: WorkerTaskResult
96+
): void {
97+
const handler = this.#processingTasks.get(result.id);
98+
if (handler) {
99+
this.#processingTasks.delete(result.id);
100+
handler.resolve(result);
101+
}
102+
103+
const nextTask = this.#waitingTasks.shift();
104+
if (nextTask) {
105+
worker.execute(nextTask);
106+
}
107+
}
108+
109+
#onWorkerError(
110+
worker: PooledWorker,
111+
error: Error
112+
): void {
113+
const taskId = worker.currentTaskId;
114+
if (taskId) {
115+
const handler = this.#processingTasks.get(taskId);
116+
if (handler) {
117+
this.#processingTasks.delete(taskId);
118+
handler.reject(error);
119+
}
120+
}
121+
122+
this.emit("error", error);
123+
const nextTask = this.#waitingTasks.shift();
124+
if (nextTask) {
125+
worker.execute(nextTask);
126+
}
127+
}
128+
129+
scan(
130+
task: Omit<WorkerTask, "id">
131+
): Promise<WorkerTaskResult> {
132+
if (this.#isTerminated) {
133+
return Promise.reject(
134+
new Error("NpmTarballWorkerPool has been terminated")
135+
);
136+
}
137+
138+
const fullTask: WorkerTask = {
139+
id: this.#generateTaskId(),
140+
...task
141+
};
142+
143+
const { promise, resolve, reject } = Promise.withResolvers<WorkerTaskResult>();
144+
this.#processingTasks.set(fullTask.id, { resolve, reject });
145+
146+
const availableWorker = this.#workers.find((worker) => worker.isAvailable) ?? null;
147+
if (availableWorker) {
148+
availableWorker.execute(fullTask);
149+
}
150+
else {
151+
this.#waitingTasks.push(fullTask);
152+
}
153+
154+
return promise;
155+
}
156+
157+
async terminate(): Promise<void> {
158+
this.#isTerminated = true;
159+
160+
const terminationError = new Error("NpmTarballWorkerPool terminated");
161+
for (const handler of this.#processingTasks.values()) {
162+
handler.reject(terminationError);
163+
}
164+
this.#processingTasks.clear();
165+
this.#waitingTasks = [];
166+
167+
await Promise.all(
168+
this.#workers.map((worker) => worker.terminate())
169+
);
170+
this.#workers = [];
171+
}
172+
173+
[Symbol.asyncDispose](): Promise<void> {
174+
return this.terminate();
175+
}
176+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
// Import Node.js Dependencies
2+
import { parentPort } from "node:worker_threads";
3+
import path from "node:path";
4+
5+
// Import Third-party Dependencies
6+
import { ManifestManager } from "@nodesecure/mama";
7+
import type { Warning } from "@nodesecure/js-x-ray";
8+
9+
// Import Internal Dependencies
10+
import { NpmTarball } from "./NpmTarball.class.ts";
11+
import {
12+
isSensitiveFile,
13+
booleanToFlags
14+
} from "../utils/index.ts";
15+
import {
16+
getEmptyPackageWarning,
17+
getSemVerWarning
18+
} from "../warnings.ts";
19+
20+
import type {
21+
WorkerTask,
22+
WorkerTaskResult,
23+
ScanResultPayload
24+
} from "./NpmTarballWorkerPool.class.ts";
25+
26+
// CONSTANTS
27+
const kNativeCodeExtensions = new Set([".gyp", ".c", ".cpp", ".node", ".so", ".h"]);
28+
29+
async function scanPackageInWorker(
30+
task: WorkerTask
31+
): Promise<ScanResultPayload> {
32+
const { location, astAnalyserOptions } = task;
33+
34+
const mama = await ManifestManager.fromPackageJSON(
35+
location
36+
);
37+
const tarex = new NpmTarball(mama);
38+
39+
const {
40+
composition,
41+
conformance,
42+
code
43+
} = await tarex.scanFiles(astAnalyserOptions);
44+
45+
const warnings: Warning[] = [];
46+
47+
// Check for empty package
48+
if (
49+
composition.files.length === 1 &&
50+
composition.files.includes("package.json")
51+
) {
52+
warnings.push(getEmptyPackageWarning());
53+
}
54+
55+
// Check for zero semver
56+
if (mama.hasZeroSemver) {
57+
warnings.push(getSemVerWarning(mama.document.version!));
58+
}
59+
60+
warnings.push(...code.warnings);
61+
62+
const {
63+
files,
64+
dependencies,
65+
flags
66+
} = code.groupAndAnalyseDependencies(mama);
67+
68+
const computedFlags = booleanToFlags({
69+
...flags,
70+
hasExternalCapacity: code.flags.hasExternalCapacity || flags.hasExternalCapacity,
71+
hasNoLicense: conformance.uniqueLicenseIds.length === 0,
72+
hasMultipleLicenses: conformance.uniqueLicenseIds.length > 1,
73+
hasMinifiedCode: code.minified.length > 0,
74+
hasWarnings: warnings.length > 0,
75+
hasBannedFile: composition.files.some((filePath) => isSensitiveFile(filePath)),
76+
hasNativeCode: mama.flags.isNative ||
77+
composition.files.some((file) => kNativeCodeExtensions.has(path.extname(file))),
78+
hasScript: mama.flags.hasUnsafeScripts
79+
});
80+
const {
81+
description, engines, repository, scripts
82+
} = mama.document;
83+
84+
return {
85+
description,
86+
engines,
87+
repository,
88+
scripts,
89+
author: mama.author,
90+
integrity: mama.isWorkspace ? null : mama.integrity,
91+
type: mama.moduleType,
92+
size: composition.size,
93+
licenses: conformance.licenses,
94+
uniqueLicenseIds: conformance.uniqueLicenseIds,
95+
warnings,
96+
flags: Array.from(computedFlags),
97+
composition: {
98+
extensions: [...composition.ext],
99+
files: composition.files,
100+
minified: code.minified,
101+
unused: dependencies.unused,
102+
missing: dependencies.missing,
103+
required_files: [...files],
104+
required_nodejs: dependencies.nodejs,
105+
required_thirdparty: dependencies.thirdparty,
106+
required_subpath: dependencies.subpathImports
107+
}
108+
};
109+
}
110+
111+
parentPort?.on("message", async(task: WorkerTask) => {
112+
const result: WorkerTaskResult = {
113+
id: task.id,
114+
location: task.location,
115+
result: null,
116+
error: null
117+
};
118+
119+
try {
120+
result.result = await scanPackageInWorker(task);
121+
}
122+
catch (error) {
123+
result.error = error instanceof Error ?
124+
error.message :
125+
String(error);
126+
}
127+
128+
parentPort?.postMessage(result);
129+
});
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Import Node.js Dependencies
2+
import { Worker } from "node:worker_threads";
3+
4+
// Import Internal Dependencies
5+
import type {
6+
WorkerTask,
7+
WorkerTaskResult
8+
} from "./NpmTarballWorkerPool.class.ts";
9+
10+
export interface PooledWorkerEvents {
11+
onComplete: (worker: PooledWorker, result: WorkerTaskResult) => void;
12+
onError: (worker: PooledWorker, error: Error) => void;
13+
}
14+
15+
export class PooledWorker {
16+
#worker: Worker;
17+
#currentTaskId: string | null = null;
18+
#events: PooledWorkerEvents;
19+
20+
constructor(
21+
workerPath: string,
22+
events: PooledWorkerEvents
23+
) {
24+
this.#events = events;
25+
this.#worker = new Worker(workerPath);
26+
27+
this.#worker.on("message", (message: WorkerTaskResult) => {
28+
this.#currentTaskId = null;
29+
this.#events.onComplete(this, message);
30+
});
31+
32+
this.#worker.on("error", (error: Error) => {
33+
this.#currentTaskId = null;
34+
this.#events.onError(this, error);
35+
});
36+
}
37+
38+
get isAvailable(): boolean {
39+
return this.#currentTaskId === null;
40+
}
41+
42+
get currentTaskId(): string | null {
43+
return this.#currentTaskId;
44+
}
45+
46+
execute(
47+
task: WorkerTask
48+
): void {
49+
if (!this.isAvailable) {
50+
throw new Error(`Worker is busy with task ${this.#currentTaskId}`);
51+
}
52+
53+
this.#currentTaskId = task.id;
54+
this.#worker.postMessage(task);
55+
}
56+
57+
terminate(): Promise<number> {
58+
return this.#worker.terminate();
59+
}
60+
}

workspaces/tarball/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
export * from "./tarball.ts";
22
export * from "./class/NpmTarball.class.ts";
3+
export * from "./class/NpmTarballWorkerPool.class.ts";

0 commit comments

Comments
 (0)