Skip to content

Commit

Permalink
wip: adding Queue and Scheduler to PolykeyAgent
Browse files Browse the repository at this point in the history
  • Loading branch information
tegefaulkes committed Sep 6, 2022
1 parent f88b157 commit 47df43d
Showing 1 changed file with 53 additions and 0 deletions.
53 changes: 53 additions & 0 deletions src/PolykeyAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ import * as errors from './errors';
import * as utils from './utils';
import * as keysUtils from './keys/utils';
import * as nodesUtils from './nodes/utils';
import Scheduler from './tasks/Scheduler';
// FIXME: update name to queue, currently conflicts
import TaskQueue from './tasks/Queue';

type NetworkConfig = {
forwardHost?: Host;
Expand Down Expand Up @@ -87,6 +90,8 @@ class PolykeyAgent {
acl,
gestaltGraph,
proxy,
taskQueue,
taskScheduler,
nodeGraph,
queue,
nodeConnectionManager,
Expand Down Expand Up @@ -134,6 +139,8 @@ class PolykeyAgent {
acl?: ACL;
gestaltGraph?: GestaltGraph;
proxy?: Proxy;
taskQueue?: TaskQueue;
taskScheduler?: Scheduler;
nodeGraph?: NodeGraph;
queue?: Queue;
nodeConnectionManager?: NodeConnectionManager;
Expand Down Expand Up @@ -285,6 +292,26 @@ class PolykeyAgent {
keyManager,
logger: logger.getChild(NodeGraph.name),
}));
taskQueue =
taskQueue ??
(await TaskQueue.createQueue({
db,
keyManager,
concurrencyLimit: 3,
delay: true,
fresh,
handlers: {},
logger,
}));
taskScheduler =
taskScheduler ??
(await Scheduler.createScheduler({
db,
queue: taskQueue,
delay: true,
fresh,
logger,
}));
queue =
queue ??
new Queue({
Expand Down Expand Up @@ -373,6 +400,8 @@ class PolykeyAgent {
await notificationsManager?.stop();
await vaultManager?.stop();
await discovery?.stop();
await taskScheduler?.stop();
await taskQueue?.stop();
await proxy?.stop();
await gestaltGraph?.stop();
await acl?.stop();
Expand All @@ -396,6 +425,8 @@ class PolykeyAgent {
gestaltGraph,
proxy,
nodeGraph,
taskQueue,
taskScheduler,
queue,
nodeConnectionManager,
nodeManager,
Expand Down Expand Up @@ -429,6 +460,8 @@ class PolykeyAgent {
public readonly gestaltGraph: GestaltGraph;
public readonly proxy: Proxy;
public readonly nodeGraph: NodeGraph;
public readonly taskQueue: TaskQueue;
public readonly taskScheduler: Scheduler;
public readonly queue: Queue;
public readonly nodeConnectionManager: NodeConnectionManager;
public readonly nodeManager: NodeManager;
Expand All @@ -454,6 +487,8 @@ class PolykeyAgent {
gestaltGraph,
proxy,
nodeGraph,
taskQueue,
taskScheduler,
queue,
nodeConnectionManager,
nodeManager,
Expand All @@ -478,6 +513,8 @@ class PolykeyAgent {
gestaltGraph: GestaltGraph;
proxy: Proxy;
nodeGraph: NodeGraph;
taskQueue: TaskQueue;
taskScheduler: Scheduler;
queue: Queue;
nodeConnectionManager: NodeConnectionManager;
nodeManager: NodeManager;
Expand All @@ -504,6 +541,8 @@ class PolykeyAgent {
this.proxy = proxy;
this.discovery = discovery;
this.nodeGraph = nodeGraph;
this.taskQueue = taskQueue;
this.taskScheduler = taskScheduler;
this.queue = queue;
this.nodeConnectionManager = nodeConnectionManager;
this.nodeManager = nodeManager;
Expand Down Expand Up @@ -667,6 +706,8 @@ class PolykeyAgent {
proxyPort: networkConfig_.proxyPort,
tlsConfig,
});
await this.taskQueue.start({ fresh });
await this.taskScheduler.start({ fresh });
await this.queue.start();
await this.nodeManager.start();
await this.nodeConnectionManager.start({ nodeManager: this.nodeManager });
Expand All @@ -676,6 +717,8 @@ class PolykeyAgent {
await this.vaultManager.start({ fresh });
await this.notificationsManager.start({ fresh });
await this.sessionManager.start({ fresh });
await this.taskQueue.startTasks();
await this.taskScheduler.startDispatching();
await this.status.finishStart({
pid: process.pid,
nodeId: this.keyManager.getNodeId(),
Expand All @@ -693,11 +736,15 @@ class PolykeyAgent {
this.logger.warn(`Failed Starting ${this.constructor.name}`);
this.events.removeAllListeners();
await this.status?.beginStop({ pid: process.pid });
await this.taskScheduler.stopDispatching();
await this.taskQueue.stopTasks();
await this.sessionManager?.stop();
await this.notificationsManager?.stop();
await this.vaultManager?.stop();
await this.discovery?.stop();
await this.queue?.stop();
await this.taskScheduler?.stop();
await this.taskQueue?.stop();
await this.nodeGraph?.stop();
await this.nodeConnectionManager?.stop();
await this.nodeManager?.stop();
Expand All @@ -723,6 +770,8 @@ class PolykeyAgent {
this.logger.info(`Stopping ${this.constructor.name}`);
this.events.removeAllListeners();
await this.status.beginStop({ pid: process.pid });
await this.taskScheduler.stopDispatching();
await this.taskQueue.stopTasks();
await this.sessionManager.stop();
await this.notificationsManager.stop();
await this.vaultManager.stop();
Expand All @@ -731,6 +780,8 @@ class PolykeyAgent {
await this.nodeGraph.stop();
await this.nodeManager.stop();
await this.queue.stop();
await this.taskScheduler?.stop();
await this.taskQueue?.stop();
await this.proxy.stop();
await this.grpcServerAgent.stop();
await this.grpcServerClient.stop();
Expand All @@ -755,6 +806,8 @@ class PolykeyAgent {
await this.discovery.destroy();
await this.nodeGraph.destroy();
await this.gestaltGraph.destroy();
await this.taskScheduler.destroy();
await this.taskQueue.destroy();
await this.acl.destroy();
await this.sigchain.destroy();
await this.identitiesManager.destroy();
Expand Down

0 comments on commit 47df43d

Please sign in to comment.