Skip to content

Commit

Permalink
Master and Worker Nodes POC (#27)
Browse files Browse the repository at this point in the history
* Master and Worker Nodes POC

* Update constants.ts

---------

Co-authored-by: Vicente Eduardo Ferrer Garcia <[email protected]>
  • Loading branch information
Creatoon and viferga authored Feb 20, 2024
1 parent 16f69a6 commit ee48066
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 27 deletions.
71 changes: 61 additions & 10 deletions src/api.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { spawn } from 'child_process';
import { hostname } from 'os';
import * as path from 'path';

Expand All @@ -7,10 +8,13 @@ import upload from './controller/upload';

import {
allApplications,
childProcessResponse,
cps,
currentFile,
deployBody,
fetchBranchListBody,
fetchFilesFromRepoBody
fetchFilesFromRepoBody,
protocol
} from './constants';

import AppError from './utils/appError';
Expand All @@ -22,10 +26,10 @@ import {
ensureFolderExists,
execPromise,
exists,
installDependencies
installDependencies,
isIAllApps
} from './utils/utils';

import { handleJSONFiles } from './controller/deploy';
import { appsDirectory } from './utils/config';

const appsDir = appsDirectory();
Expand All @@ -34,7 +38,7 @@ export const callFnByName = (
req: Request,
res: Response,
next: NextFunction
): Response => {
): Response | void => {
if (!(req.params && req.params.name))
next(
new AppError(
Expand All @@ -46,7 +50,35 @@ export const callFnByName = (
const { appName: app, name } = req.params;
const args = Object.values(req.body);

return res.send(JSON.stringify(allApplications[app].funcs[name](...args)));
let responseSent = false; // Flag to track if response has been sent
let errorCame = false;

cps[app].send({
type: protocol.c,
fn: {
name,
args
}
});

cps[app].on('message', (data: childProcessResponse) => {
if (!responseSent) {
// Check if response has already been sent
if (data.type === protocol.r) {
responseSent = true; // Set flag to true to indicate response has been sent
return res.send(JSON.stringify(data.data));
} else {
errorCame = true;
}
}
});

// Default response in case the 'message' event is not triggered
if (!responseSent && errorCame) {
responseSent = true; // Set flag to true to indicate response has been sent
errorCame = false;
return res.send('Function calling error');
}
};

export const serveStatic = catchAsync(
Expand Down Expand Up @@ -177,11 +209,30 @@ export const deploy = catchAsync(

await installDependencies();

await handleJSONFiles(
currentFile.path,
currentFile.id,
req.body.version
);
const desiredPath = path.join(__dirname, '/worker/index.js');

const proc = spawn('metacall', [desiredPath], {
stdio: ['pipe', 'pipe', 'pipe', 'ipc']
});

proc.send({
type: protocol.l,
currentFile
});

// proc.stdout?.on('data', (data: Buffer) => {
// console.log('CP console log: -->>', data.toString());
// });

proc.on('message', (data: childProcessResponse) => {
if (data.type === protocol.g) {
if (isIAllApps(data.data)) {
const appName = Object.keys(data.data)[0];
cps[appName] = proc;
allApplications[appName] = data.data[appName];
}
}
});

res.status(200).json({
suffix: hostname(),
Expand Down
30 changes: 26 additions & 4 deletions src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { DeployStatus, MetaCallJSON } from '@metacall/protocol/deployment';
import { ChildProcess } from 'child_process';

export interface currentUploadedFile {
id: string;
Expand Down Expand Up @@ -57,6 +58,7 @@ export interface IApp {
suffix: string;
version: string;
packages: tpackages;
ports: number[];
}

export class App implements IApp {
Expand All @@ -65,24 +67,44 @@ export class App implements IApp {
public suffix: string;
public version: string;
public packages: tpackages;
public ports: number[];

constructor(
status: DeployStatus,
prefix: string,
suffix: string,
version: string,
packages: tpackages
packages: tpackages,
ports: number[]
) {
this.status = status;
this.prefix = prefix;
this.suffix = suffix;
this.version = version;
this.packages = packages;
this.ports = ports;
}
}

type IAppWithFunctions = IApp & {
funcs: Record<string, (...args: any[]) => any>; // eslint-disable-line
export type IAppWithFunctions = IApp & {
funcs: string[];
};

export const allApplications: Record<string, IAppWithFunctions> = {};
export type IAllApps = Record<string, IAppWithFunctions>;

export const allApplications: IAllApps = {};

export const protocol = {
i: 'installDependencies',
l: 'loadFunctions',
g: 'getApplicationMetadata',
c: 'callFunction',
r: 'functionInvokeResult'
};

export const cps: { [key: string]: ChildProcess } = {};

export interface childProcessResponse {
type: keyof typeof protocol;
data: unknown;
}
12 changes: 10 additions & 2 deletions src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ import { platform } from 'os';
import { join } from 'path';

import { LanguageId, MetaCallJSON } from '@metacall/protocol/deployment';
import { PackageError, generatePackage } from '@metacall/protocol/package';
import { generatePackage, PackageError } from '@metacall/protocol/package';
import { NextFunction, Request, RequestHandler, Response } from 'express';

import { createInstallDependenciesScript, currentFile } from '../constants';
import {
createInstallDependenciesScript,
currentFile,
IAllApps
} from '../constants';

export const dirName = (gitUrl: string): string =>
String(gitUrl.split('/')[gitUrl.split('/').length - 1]).replace('.git', '');
Expand Down Expand Up @@ -131,3 +135,7 @@ export const diff = (object1: any, object2: any): any => {

return object1; // eslint-disable-line
};

export function isIAllApps(data: any): data is IAllApps {

Check warning on line 139 in src/utils/utils.ts

View workflow job for this annotation

GitHub Actions / ci

Argument 'data' should be typed with a non-any type

Check warning on line 139 in src/utils/utils.ts

View workflow job for this annotation

GitHub Actions / ci

Unexpected any. Specify a different type

Check warning on line 139 in src/utils/utils.ts

View workflow job for this annotation

GitHub Actions / ci

Argument 'data' should be typed with a non-any type

Check warning on line 139 in src/utils/utils.ts

View workflow job for this annotation

GitHub Actions / ci

Unexpected any. Specify a different type

Check warning on line 139 in src/utils/utils.ts

View workflow job for this annotation

GitHub Actions / ci

Argument 'data' should be typed with a non-any type

Check warning on line 139 in src/utils/utils.ts

View workflow job for this annotation

GitHub Actions / ci

Unexpected any. Specify a different type

Check warning on line 139 in src/utils/utils.ts

View workflow job for this annotation

GitHub Actions / ci

Argument 'data' should be typed with a non-any type

Check warning on line 139 in src/utils/utils.ts

View workflow job for this annotation

GitHub Actions / ci

Unexpected any. Specify a different type
return typeof data === 'object' && data !== null;
}
60 changes: 49 additions & 11 deletions src/controller/deploy.ts → src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,27 @@ import {
metacall_load_from_configuration_export
} from 'metacall';
import { hostname } from 'os';
import { allApplications, App, currentFile } from '../constants';
import { createMetacallJsonFile, diff, getLangId } from '../utils/utils';
import {
App,
IAppWithFunctions,
currentUploadedFile,
protocol
} from '../constants';

import { createMetacallJsonFile, diff } from '../utils/utils';

let currentFile: currentUploadedFile = {
id: '',
type: '',
jsons: [],
runners: [],
path: ''
};

let allApplications: Record<string, IAppWithFunctions> = {};
let exactFnx: Record<string, (...args: any[]) => any>;

export const handleNoJSONFile = (
const handleNoJSONFile = (
jsonPaths: string[],
suffix: string,
version: string
Expand All @@ -19,18 +36,17 @@ export const handleNoJSONFile = (
hostname(),
suffix,
version,
{}
{},
[]
);

// TODO:
// type Modules = { module_ptr, funcs }[];
// const modules = jsonPaths.map(path => metacall_load_from_configuration_export(path));

const funcs = {};
let funcs: string[] = [];

jsonPaths.forEach(path => {
const previousInspect = metacall_inspect();
Object.assign(funcs, metacall_load_from_configuration_export(path));
exactFnx = metacall_load_from_configuration_export(path);
funcs = Object.keys(exactFnx);

const newInspect = metacall_inspect();
const inspect = diff(newInspect, previousInspect);
const langId = require(path).language_id;
Expand All @@ -46,10 +62,18 @@ export const handleNoJSONFile = (

currentApp.status = 'ready';
allApplications[currentApp.suffix] = { ...currentApp, funcs };

if (process.send) {
process.send({
type: protocol.g,
data: allApplications
});
}

currentApp = undefined;
};

export const handleJSONFiles = async (
const handleJSONFiles = async (
path: string,
suffix: string,
version: string
Expand All @@ -63,3 +87,17 @@ export const handleJSONFiles = async (
// FIXME Currently it do not support metacall.json syntax, else metacall-{runner}.json is fine and will work
handleNoJSONFile(jsonPath, suffix, version);
};

process.on('message', payload => {
if (payload.type === protocol.l) {
currentFile = payload.currentFile;
handleJSONFiles(currentFile.path, currentFile.id, 'v1');
} else if (payload.type === protocol.c) {
if (process.send) {
process.send({
type: protocol.r,
data: exactFnx[payload.fn.name](...payload.fn.args)
});
}
}
});

0 comments on commit ee48066

Please sign in to comment.