Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify worker class #342

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
"types": "dist/index.d.ts",
"scripts": {
"build": "tsc",
"test": "./node_modules/.bin/jest",
"coverage": "./node_modules/.bin/jest --coverage",
"dev": "./node_modules/.bin/tsc --watch",
"lint": "./node_modules/.bin/tslint -c tslint.json --project test/_config/tsconfig.json \"src/**/*.ts\" \"test/**/*.ts\""
"test": "jest",
"coverage": "jest --coverage",
"dev": "tsc --watch",
"lint": "tslint -c tslint.json --project test/_config/tsconfig.json \"src/**/*.ts\" \"test/**/*.ts\""
},
"author": "Thomas Dondorf",
"repository": {
Expand Down
2 changes: 0 additions & 2 deletions src/Cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,6 @@ export default class Cluster<JobData = any, ReturnData = any> extends EventEmitt
}

const worker = new Worker<JobData, ReturnData>({
cluster: this,
args: [''], // this.options.args,
browser: workerBrowserInstance,
id: workerId,
});
Expand Down
75 changes: 28 additions & 47 deletions src/Worker.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,12 @@

import Job from './Job';
import Cluster, { TaskFunction } from './Cluster';
import { Page } from 'puppeteer';
import { timeoutExecute, debugGenerator, log } from './util';
import { TaskFunction } from './Cluster';
import { debugGenerator, log, timeoutExecute } from './util';
import { inspect } from 'util';
import { WorkerInstance, JobInstance } from './concurrency/ConcurrencyImplementation';
import { WorkerInstance } from './concurrency/ConcurrencyImplementation';

const debug = debugGenerator('Worker');

const DEFAULT_OPTIONS = {
args: [],
};

interface WorkerOptions {
cluster: Cluster;
args: string[];
id: number;
browser: WorkerInstance;
}
Expand All @@ -33,18 +25,23 @@ export interface WorkData {

export type WorkResult = WorkError | WorkData;

export default class Worker<JobData, ReturnData> implements WorkerOptions {
const success = (data: any): WorkData => ({
data,
type: 'success',
});

const error = (errorState?: Error): WorkError => ({
type: 'error',
error: errorState || new Error(),
});

cluster: Cluster;
args: string[];
export default class Worker<JobData, ReturnData> {
id: number;
browser: WorkerInstance;

activeTarget: Job<JobData, ReturnData> | null = null;

public constructor({ cluster, args, id, browser }: WorkerOptions) {
this.cluster = cluster;
this.args = args;
public constructor({ id, browser }: WorkerOptions) {
this.id = id;
this.browser = browser;

Expand All @@ -58,27 +55,8 @@ export default class Worker<JobData, ReturnData> implements WorkerOptions {
): Promise<WorkResult> {
this.activeTarget = job;

let jobInstance: JobInstance | null = null;
let page: Page | null = null;

let tries = 0;

while (jobInstance === null) {
try {
jobInstance = await this.browser.jobInstance();
page = jobInstance.resources.page;
} catch (err) {
debug(`Error getting browser page (try: ${tries}), message: ${err.message}`);
await this.browser.repair();
tries += 1;
if (tries >= BROWSER_INSTANCE_TRIES) {
throw new Error('Unable to get browser page');
}
}
}

// We can be sure that page is set now, otherwise an exception would've been thrown
page = page as Page; // this is just for TypeScript
const jobInstance = await this.getJobInstance();
const page = jobInstance.resources.page;

let errorState: Error | null = null;

Expand Down Expand Up @@ -119,17 +97,20 @@ export default class Worker<JobData, ReturnData> implements WorkerOptions {
}

this.activeTarget = null;
return errorState ? error(errorState) : success(result);
}

if (errorState) {
return {
type: 'error',
error: errorState || new Error('asf'),
};
private async getJobInstance() {
for (let attempt = 0; attempt < BROWSER_INSTANCE_TRIES; attempt += 1) {
try {
return await this.browser.jobInstance();
} catch (err) {
debug(`Error getting browser page (try: ${attempt}), message: ${err.message}`);
await this.browser.repair();
}
}
return {
data: result,
type: 'success',
};

throw new Error('Unable to get browser page');
}

public async close(): Promise<void> {
Expand Down