Single Subquery Queue/Thread Messaging Changes #32

Open · wants to merge 8 commits into main
2 changes: 1 addition & 1 deletion .github/workflows/test_ws_codecov.yml
@@ -32,7 +32,7 @@ jobs:
run: |
pnpm run clone
pnpm run git checkout ${{ steps.branch-name.outputs.current_branch }}
pnpm i
pnpm i || pnpm i
pnpm --filter bte-server test-cov

- name: Send coverage report to codecov for visualization
1 change: 1 addition & 0 deletions package.json
@@ -78,6 +78,7 @@
"@biothings-explorer/smartapi-kg": "workspace:../smartapi-kg",
"@biothings-explorer/types": "workspace:../types",
"@biothings-explorer/utils": "workspace:../utils",
"@biothings-explorer/call-apis": "workspace:../call-apis",
"@bull-board/api": "^5.9.1",
"@bull-board/express": "^5.9.1",
"@opentelemetry/api": "^1.7.0",
6 changes: 3 additions & 3 deletions src/controllers/threading/taskHandler.ts
@@ -16,7 +16,7 @@ import * as Sentry from "@sentry/node";
import { ProfilingIntegration } from "@sentry/profiling-node";
import OpenTelemetry, { Span } from "@opentelemetry/api";
import { Telemetry } from "@biothings-explorer/utils";
import { InnerTaskData } from "@biothings-explorer/types";
import { InnerTaskData, ThreadMessage } from "@biothings-explorer/types";

// use SENTRY_DSN environment variable
try {
@@ -58,8 +58,8 @@ async function runTask({

global.SCHEMA_VERSION = "1.5.0";

global.parentPort = port;
port.postMessage({ threadId, registerId: true });
global.workerSide = port;
port.postMessage({ threadId, type: 'registerId' } satisfies ThreadMessage);
global.cachingTasks = [];

global.queryInformation = {
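
Note: the `ThreadMessage` type that both sides now `satisfies`-check against comes from `@biothings-explorer/types` and is not part of this diff. A minimal sketch of what such a discriminated union could look like, inferred only from the message shapes used in this PR (`registerId`, `result`, `error`, `subqueryRequest`, `subQueryResult`); the real definition may differ:

```typescript
// Hypothetical sketch; the actual type lives in @biothings-explorer/types.
export type ThreadMessageType =
  | "registerId"
  | "result"
  | "error"
  | "subqueryRequest"
  | "subQueryResult";

export interface ThreadMessage {
  threadId: number;        // id of the sending thread (0 for the main thread)
  type: ThreadMessageType; // discriminant used by the switch in threadHandler.ts
  value?: unknown;         // payload: TrapiResponse, Error, subquery batch, etc.
  status?: number;         // optional HTTP status forwarded with a "result"
}
```
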
122 changes: 60 additions & 62 deletions src/controllers/threading/threadHandler.ts
@@ -15,8 +15,9 @@ import { Telemetry } from "@biothings-explorer/utils";
import ErrorHandler from "../../middlewares/error";
import { Request, Response } from "express";
import { BullJob, PiscinaWaitTime, ThreadPool } from "../../types";
import { FrozenSubquery, Subquery, SubqueryRelay } from "@biothings-explorer/call-apis";
import { TaskInfo, InnerTaskData, QueryHandlerOptions } from "@biothings-explorer/types";
import { DialHome, TrapiQuery, TrapiResponse } from "@biothings-explorer/types";
import { ThreadMessage, TrapiQuery, TrapiResponse } from "@biothings-explorer/types";
import { Queue } from "bull";

const SYNC_MIN_CONCURRENCY = 2;
@@ -98,24 +99,26 @@ if (!global.threadpool && !Piscina.isWorkerThread && !(process.env.USE_THREADING
} as ThreadPool;
}

async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: string, job?: BullJob): Promise<DialHome> {
const subqueryRelay = new SubqueryRelay();

async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: string, job?: BullJob): Promise<ThreadMessage> {
return new Promise((resolve, reject) => {
let workerThreadID: string;
const abortController = new AbortController();
const { port1: toWorker, port2: fromWorker } = new MessageChannel();
const { port1: workerSide, port2: parentSide } = new MessageChannel();

// get otel context

const otelData: Partial<{ traceparent: string; tracestate: string }> = {};
propagation.inject(context.active(), otelData);
const { traceparent, tracestate } = otelData;

const taskData: InnerTaskData = { req: taskInfo, route, traceparent, tracestate, port: toWorker };
const taskData: InnerTaskData = { req: taskInfo, route, traceparent, tracestate, port: workerSide };
taskData.req.data.options = {...taskData.req.data.options, metakg: global.metakg?.ops, smartapi: global.smartapi} as QueryHandlerOptions;

// Propagate data between task runner and bull job
if (job) taskData.job = { jobId: job.id, queueName: job.queue.name };
const task = pool.run(taskData, { signal: abortController.signal, transferList: [toWorker] });
const task = pool.run(taskData, { signal: abortController.signal, transferList: [workerSide] });
if (job) {
void job.update({ ...job.data, abortController });
}
@@ -148,42 +151,48 @@ async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: stri
});

let reqDone = false;
let cacheInProgress = 0;
const cacheKeys: {
[cacheKey: string]: boolean;
} = {};

const timeout = parseInt(process.env.REQUEST_TIMEOUT ?? (60 * 5).toString()) * 1000;

fromWorker.on("message", (msg: DialHome) => {
if (msg.cacheInProgress) {
// Cache handler has started caching
cacheInProgress += 1;
} else if (msg.addCacheKey) {
// Hashed edge id cache in progress
cacheKeys[msg.addCacheKey] = false;
} else if (msg.completeCacheKey) {
// Hashed edge id cache complete
cacheKeys[msg.completeCacheKey] = true;
} else if (msg.registerId) {
// Worker registers itself for better tracking
workerThreadID = String(msg.threadId);
if (job) {
void job.update({ ...job.data, threadId });
}
} else if (typeof msg.cacheDone !== "undefined") {
cacheInProgress = msg.cacheDone
? cacheInProgress - 1 // A caching handler has finished caching
: 0; // Caching has been entirely cancelled
} else if (typeof msg.result !== "undefined") {
// Request has finished with a message
reqDone = true;
resolve(msg);
} else if (msg.err) {
// Request has resulted in a catchable error
reqDone = true;
reject(msg.err);
parentSide.on("message", async (msg: ThreadMessage) => {
switch (msg.type) {
default:
debug(`WARNING: received untyped message from thread ${msg.threadId}`);
break;
case "result":
reqDone = true;
resolve(msg);
break;
case "error":
reqDone = true;
reject(msg.value as Error);
break;
case "registerId":
workerThreadID = String(msg.threadId);
if (job) {
void job.update({ ...job.data, threadId });
}
break;
case "subqueryRequest":
const { queries, options } = msg.value as {
queries: FrozenSubquery[];
options: QueryHandlerOptions;
};
debug(`Main thread receives ${queries.length} subqueries from worker.`);
subqueryRelay.subscribe(
await Promise.all(queries.map(async query => await Subquery.unfreeze(query))),
options,
({ hash, records, logs, apiUnavailable }) => {
parentSide.postMessage({
threadId: 0,
type: "subQueryResult",
value: { hash, records, logs, apiUnavailable },
} satisfies ThreadMessage);
},
);
break;
}
if (reqDone && cacheInProgress <= 0 && job) {
if (reqDone && job) {
void job.progress(100);
}
});
@@ -192,17 +201,6 @@ async function queueTaskToWorkers(pool: Piscina, taskInfo: TaskInfo, route: stri
// TODO better timeout handling for async?
if (timeout && pool !== global.threadpool.async) {
setTimeout(() => {
// Clean up any incompletely cached hashes to avoid issues pulling from cache
const activeKeys = Object.entries(cacheKeys)
.filter(([, complete]) => !complete)
.map(([key]) => key);
if (activeKeys.length) {
try {
void redisClient.client.delTimeout(activeKeys);
} catch (error) {
null;
}
}
abortController.abort();
reject(
new Error(
@@ -252,13 +250,13 @@ export async function runTask(req: Request, res: Response, route: string, useBul
route,
);

if (typeof response.result !== "undefined") {
if (response.type === 'result') {
if (response.status) {
res?.status(response.status as number);
}
return response.result ? response.result : undefined; // null msg means keep response body empty
} else if (response.err) {
throw response.err;
return response.value ? (response.value as TrapiResponse) : undefined; // null msg means keep response body empty
} else if (response.type === "error") {
throw response.value as Error;
} else {
throw new Error("Threading Error: Task resolved without message");
}
@@ -325,30 +323,30 @@ export async function runBullJob(job: BullJob, route: string, useAsync = true) {
route,
job,
);
if (typeof response.result !== "undefined") {
return response.result ? response.result : undefined; // null result means keep response body empty
} else if (response.err) {
throw response.err;
if (response.type === "result") {
return response.value ? (response.value as TrapiResponse) : undefined; // null result means keep response body empty
} else if (response.type === "error") {
throw response.value as Error;
} else {
throw new Error("Threading Error: Task resolved without message");
}
}

export function taskResponse<T>(response: T, status: string | number = undefined): T {
if (global.parentPort) {
global.parentPort.postMessage({ threadId, result: response, status: status });
export function taskResponse<T>(response: T, status: number = undefined): T {
if (global.workerSide) {
global.workerSide.postMessage({ threadId, type: 'result', value: response, status } satisfies ThreadMessage);
return undefined;
} else {
return response;
}
}

export function taskError(error: Error): void {
if (global.parentPort) {
if (global.workerSide) {
if (ErrorHandler.shouldHandleError(error)) {
Telemetry.captureException(error);
}
global.parentPort.postMessage({ threadId, err: error });
global.workerSide.postMessage({ threadId, type: 'error', value: error } satisfies ThreadMessage);
return undefined;
} else {
throw error;
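
For context on the relay flow added above: a worker serializes its subqueries, posts a `subqueryRequest` to the main thread, and the main thread runs them through `SubqueryRelay`, streaming each completed batch back as a `subQueryResult`. The worker-side counterpart is not shown in this diff; a rough sketch of what it might look like, using only the message shapes visible here (helper names are illustrative, not taken from the PR):

```typescript
// Hypothetical worker-side sketch; the real implementation lives in
// @biothings-explorer/call-apis and may differ.
import { threadId } from "worker_threads";
import { ThreadMessage, QueryHandlerOptions } from "@biothings-explorer/types";
import { FrozenSubquery } from "@biothings-explorer/call-apis";

// Ask the main thread to execute a batch of serialized subqueries.
export function relaySubqueries(queries: FrozenSubquery[], options: QueryHandlerOptions): void {
  global.workerSide.postMessage({
    threadId,
    type: "subqueryRequest",
    value: { queries, options },
  } satisfies ThreadMessage);
}

// Each finished subquery comes back on the same port as a "subQueryResult".
global.workerSide.on("message", (msg: ThreadMessage) => {
  if (msg.type !== "subQueryResult") return;
  const { hash, records, logs, apiUnavailable } = msg.value as {
    hash: string;
    records: unknown[];
    logs: unknown[];
    apiUnavailable: boolean;
  };
  // ...match `hash` to the pending subquery and resolve it with records/logs.
});
```
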
3 changes: 1 addition & 2 deletions src/middlewares/error.ts
@@ -1,5 +1,4 @@
import swaggerValidation from "./validate";
import { InvalidQueryGraphError } from "@biothings-explorer/query_graph_handler";
import PredicatesLoadingError from "../utils/errors/predicates_error";
import MetaKGLoadingError from "../utils/errors/metakg_error";
import ServerOverloadedError from "../utils/errors/server_overloaded_error";
@@ -8,7 +7,7 @@ const debug = Debug("bte:biothings-explorer-trapi:error_handler");
import * as Sentry from "@sentry/node";
import { Express, NextFunction, Request, Response } from "express";
import StatusError from "../utils/errors/status_error";
import { TrapiResponse } from "@biothings-explorer/types";
import { TrapiResponse, InvalidQueryGraphError } from "@biothings-explorer/types";

class ErrorHandler {
shouldHandleError(error: Error) {