Skip to content
Closed
9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,11 @@
"typescript": "5.2.2"
},
"dependencies": {
"@biothings-explorer/query_graph_handler": "workspace:../query_graph_handler",
"@biothings-explorer/smartapi-kg": "workspace:../smartapi-kg",
"@biothings-explorer/types": "workspace:../types",
"@biothings-explorer/utils": "workspace:../utils",
"@biothings-explorer/query_graph_handler": "workspace:*",
"@biothings-explorer/smartapi-kg": "workspace:*",
"@biothings-explorer/types": "workspace:*",
"@biothings-explorer/utils": "workspace:*",
"@biothings-explorer/call-apis": "workspace:*",
"@bull-board/api": "^5.9.1",
"@bull-board/express": "^5.9.1",
"@opentelemetry/api": "^1.7.0",
Expand Down
6 changes: 3 additions & 3 deletions src/controllers/threading/taskHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import * as Sentry from "@sentry/node";
import { ProfilingIntegration } from "@sentry/profiling-node";
import OpenTelemetry, { Span } from "@opentelemetry/api";
import { Telemetry } from "@biothings-explorer/utils";
import { InnerTaskData } from "@biothings-explorer/types";
import { InnerTaskData, ThreadMessage } from "@biothings-explorer/types";

// use SENTRY_DSN environment variable
try {
Expand Down Expand Up @@ -58,8 +58,8 @@ async function runTask({

global.SCHEMA_VERSION = "1.5.0";

global.parentPort = port;
port.postMessage({ threadId, registerId: true });
global.workerSide = port;
port.postMessage({ threadId, type: 'registerId' } satisfies ThreadMessage);
global.cachingTasks = [];

global.queryInformation = {
Expand Down
166 changes: 99 additions & 67 deletions src/controllers/threading/threadHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import { Telemetry } from "@biothings-explorer/utils";
import ErrorHandler from "../../middlewares/error";
import { Request, Response } from "express";
import { BullJob, PiscinaWaitTime, ThreadPool } from "../../types";
import { FrozenSubquery, Subquery, SubqueryRelay } from "@biothings-explorer/call-apis";
import { TaskInfo, InnerTaskData, QueryHandlerOptions } from "@biothings-explorer/types";
import { DialHome, TrapiQuery, TrapiResponse } from "@biothings-explorer/types";
import { ThreadMessage, TrapiQuery, TrapiResponse } from "@biothings-explorer/types";
import { Queue } from "bull";

const SYNC_MIN_CONCURRENCY = 2;
Expand Down Expand Up @@ -98,24 +99,26 @@ if (!global.threadpool && !Piscina.isWorkerThread && !(process.env.USE_THREADING
} as ThreadPool;
}

async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: string, job?: BullJob): Promise<DialHome> {
const subqueryRelay = new SubqueryRelay();

async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: string, job?: BullJob): Promise<ThreadMessage> {
return new Promise((resolve, reject) => {
let workerThreadID: string;
const abortController = new AbortController();
const { port1: toWorker, port2: fromWorker } = new MessageChannel();
const { port1: workerSide, port2: parentSide } = new MessageChannel();

// get otel context

const otelData: Partial<{ traceparent: string; tracestate: string }> = {};
propagation.inject(context.active(), otelData);
const { traceparent, tracestate } = otelData;

const taskData: InnerTaskData = { req: taskInfo, route, traceparent, tracestate, port: toWorker };
const taskData: InnerTaskData = { req: taskInfo, route, traceparent, tracestate, port: workerSide };
taskData.req.data.options = {...taskData.req.data.options, metakg: global.metakg?.ops, smartapi: global.smartapi} as QueryHandlerOptions;

// Propagate data between task runner and bull job
if (job) taskData.job = { jobId: job.id, queueName: job.queue.name };
const task = pool.run(taskData, { signal: abortController.signal, transferList: [toWorker] });
const task = pool.run(taskData, { signal: abortController.signal, transferList: [workerSide] });
if (job) {
void job.update({ ...job.data, abortController });
}
Expand Down Expand Up @@ -148,42 +151,48 @@ async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: stri
});

let reqDone = false;
let cacheInProgress = 0;
const cacheKeys: {
[cacheKey: string]: boolean;
} = {};

const timeout = parseInt(process.env.REQUEST_TIMEOUT ?? (60 * 5).toString()) * 1000;

fromWorker.on("message", (msg: DialHome) => {
if (msg.cacheInProgress) {
// Cache handler has started caching
cacheInProgress += 1;
} else if (msg.addCacheKey) {
// Hashed edge id cache in progress
cacheKeys[msg.addCacheKey] = false;
} else if (msg.completeCacheKey) {
// Hashed edge id cache complete
cacheKeys[msg.completeCacheKey] = true;
} else if (msg.registerId) {
// Worker registers itself for better tracking
workerThreadID = String(msg.threadId);
if (job) {
void job.update({ ...job.data, threadId });
}
} else if (typeof msg.cacheDone !== "undefined") {
cacheInProgress = msg.cacheDone
? cacheInProgress - 1 // A caching handler has finished caching
: 0; // Caching has been entirely cancelled
} else if (typeof msg.result !== "undefined") {
// Request has finished with a message
reqDone = true;
resolve(msg);
} else if (msg.err) {
// Request has resulted in a catchable error
reqDone = true;
reject(msg.err);
parentSide.on("message", async (msg: ThreadMessage) => {
// Dispatch on the message type sent by the worker over the MessageChannel.
switch (msg.type) {
  case "result":
    // Request has finished with a message; hand the whole message to the caller.
    reqDone = true;
    resolve(msg);
    break;
  case "error":
    // Request has resulted in a catchable error.
    reqDone = true;
    reject(msg.value as Error);
    break;
  case "registerId":
    // Worker registers itself for better tracking.
    workerThreadID = String(msg.threadId);
    if (job) {
      // NOTE(review): this stores the module-level `threadId`, not the
      // `workerThreadID` just received from the worker — confirm which one
      // consumers of the job data expect.
      void job.update({ ...job.data, threadId });
    }
    break;
  case "subqueryRequest": {
    // Braces keep the destructured names scoped to this case only.
    const { queries, options } = msg.value as {
      queries: FrozenSubquery[];
      options: QueryHandlerOptions;
    };
    debug(`Main thread receives ${queries.length} subqueries from worker.`);
    // NOTE(review): a rejection from Subquery.unfreeze() surfaces as a rejection
    // of this async listener, which nothing observes — consider handling it.
    const subqueries = await Promise.all(queries.map(query => Subquery.unfreeze(query)));
    // Each callback invocation is posted back over the same port the request
    // arrived on.
    subqueryRelay.subscribe(subqueries, options, ({ hash, records, logs, apiUnavailable }) => {
      parentSide.postMessage({
        threadId: 0,
        type: "subQueryResult",
        value: { hash, records, logs, apiUnavailable },
      } satisfies ThreadMessage);
    });
    break;
  }
  default:
    // Fix: the original template literal lacked `$`, so the log always printed
    // the literal text "{msg.threadId}" instead of the thread id.
    debug(`WARNING: received untyped message from thread ${msg.threadId}`);
    break;
}
if (reqDone && cacheInProgress <= 0 && job) {
if (reqDone && job) {
void job.progress(100);
}
});
Expand All @@ -192,17 +201,6 @@ async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: stri
// TODO better timeout handling for async?
if (timeout && pool !== global.threadpool.async) {
setTimeout(() => {
// Clean up any incompletely cached hashes to avoid issues pulling from cache
const activeKeys = Object.entries(cacheKeys)
.filter(([, complete]) => !complete)
.map(([key]) => key);
if (activeKeys.length) {
try {
void redisClient.client.delTimeout(activeKeys);
} catch (error) {
null;
}
}
abortController.abort();
reject(
new Error(
Expand Down Expand Up @@ -241,9 +239,44 @@ export async function runTask(req: Request, res: Response, route: string, useBul
}

if (process.env.USE_THREADING === "false") {
// Threading disabled, just use the provided function in main event loop
const response = (await tasks[route](taskInfo)) as TrapiResponse;
return response;
// Threading disabled: the task runs on the main event loop, but it still
// reports back through a MessageChannel ("inter thread messaging").
const { port1: workerSide, port2: parentSide } = new MessageChannel();
global.workerSide = workerSide;
// NOTE(review): `global.workerSide` is a single shared slot; overlapping
// requests in this mode would overwrite each other's port — confirm that
// un-threaded mode only ever handles one request at a time.

return new Promise((resolve, reject) => {
  parentSide.on("message", async (msg: ThreadMessage) => {
    switch (msg.type) {
      case "subqueryRequest": {
        // Braces keep the destructured names scoped to this case only.
        const { queries, options } = msg.value as {
          queries: FrozenSubquery[];
          options: QueryHandlerOptions;
        };
        debug(`Main thread receives ${queries.length} subqueries from worker.`);
        const subqueries = await Promise.all(queries.map(query => Subquery.unfreeze(query)));
        // Each callback invocation is posted back over the same port the
        // request arrived on.
        subqueryRelay.subscribe(subqueries, options, ({ hash, records, logs, apiUnavailable }) => {
          parentSide.postMessage({
            threadId: 0,
            type: "subQueryResult",
            value: { hash, records, logs, apiUnavailable },
          } satisfies ThreadMessage);
        });
        break;
      }
      case "result":
        // NOTE(review): `msg.status` is not applied to `res` here, unlike the
        // threaded path — confirm whether that is intended.
        resolve(msg.value as TrapiResponse);
        parentSide.close();
        break;
      case "error":
        reject(msg.value as Error);
        parentSide.close();
        break;
    }
  });

  // Start the task. Its outcome normally arrives as a "result"/"error" message,
  // but a failure that bypasses that channel must still settle this promise
  // instead of becoming an unhandled rejection that leaves the caller hanging.
  void (async () => {
    try {
      await tasks[route](taskInfo);
    } catch (error) {
      reject(error);
      parentSide.close();
    }
  })();
});
} else if (!(queryQueue && useBullSync)) {
// Redis unavailable or query not to sync queue such as asyncquery_status
const response = await queueTaskToWorkers(
Expand All @@ -252,13 +285,13 @@ export async function runTask(req: Request, res: Response, route: string, useBul
route,
);

if (typeof response.result !== "undefined") {
if (response.type === 'result') {
if (response.status) {
res?.status(response.status as number);
}
return response.result ? response.result : undefined; // null msg means keep response body empty
} else if (response.err) {
throw response.err;
return response.value ? (response.value as TrapiResponse) : undefined; // null msg means keep response body empty
} else if (response.type === "error") {
throw response.value as Error;
} else {
throw new Error("Threading Error: Task resolved without message");
}
Expand Down Expand Up @@ -325,30 +358,29 @@ export async function runBullJob(job: BullJob, route: string, useAsync = true) {
route,
job,
);
if (typeof response.result !== "undefined") {
return response.result ? response.result : undefined; // null result means keep response body empty
} else if (response.err) {
throw response.err;
if (response.type === "result") {
return response.value ? (response.value as TrapiResponse) : undefined; // null result means keep response body empty
} else if (response.type === "error") {
throw response.value as Error;
} else {
throw new Error("Threading Error: Task resolved without message");
}
}

export function taskResponse<T>(response: T, status: string | number = undefined): T {
if (global.parentPort) {
global.parentPort.postMessage({ threadId, result: response, status: status });
export function taskResponse<T>(response: T, status: number = undefined): T {
if (global.workerSide) {
global.workerSide.postMessage({ threadId, type: 'result', value: response, status } satisfies ThreadMessage);
return undefined;
} else {
return response;
}
return response;
}

export function taskError(error: Error): void {
if (global.parentPort) {
if (global.workerSide) {
if (ErrorHandler.shouldHandleError(error)) {
Telemetry.captureException(error);
}
global.parentPort.postMessage({ threadId, err: error });
global.workerSide.postMessage({ threadId, type: 'error', value: error } satisfies ThreadMessage);
return undefined;
} else {
throw error;
Expand Down
3 changes: 1 addition & 2 deletions src/middlewares/error.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import swaggerValidation from "./validate";
import { InvalidQueryGraphError, NotImplementedError } from "@biothings-explorer/query_graph_handler";
import PredicatesLoadingError from "../utils/errors/predicates_error";
import MetaKGLoadingError from "../utils/errors/metakg_error";
import ServerOverloadedError from "../utils/errors/server_overloaded_error";
Expand All @@ -8,7 +7,7 @@ const debug = Debug("bte:biothings-explorer-trapi:error_handler");
import * as Sentry from "@sentry/node";
import { Express, NextFunction, Request, Response } from "express";
import StatusError from "../utils/errors/status_error";
import { TrapiResponse } from "@biothings-explorer/types";
import { TrapiResponse, InvalidQueryGraphError, NotImplementedError } from "@biothings-explorer/types";

class ErrorHandler {
shouldHandleError(error: Error) {
Expand Down