Skip to content

Commit

Permalink
Merge pull request #618 from MatrixAI/feature-decentralized-signalling
Browse files Browse the repository at this point in the history
Decentralized Signalling
  • Loading branch information
tegefaulkes committed Dec 4, 2023
2 parents af59e43 + 37ef800 commit 28f0001
Show file tree
Hide file tree
Showing 66 changed files with 7,297 additions and 7,188 deletions.
11 changes: 2 additions & 9 deletions jest.config.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
const os = require('os');
const path = require('path');
const fs = require('fs');
const process = require('process');
const { pathsToModuleNameMapper } = require('ts-jest');
const { compilerOptions } = require('./tsconfig');

Expand All @@ -16,10 +13,6 @@ const globals = {
projectDir: __dirname,
// Absolute directory to the test root
testDir: path.join(__dirname, 'tests'),
// Default global data directory
dataDir: fs.mkdtempSync(
path.join(os.tmpdir(), 'polykey-test-global-'),
),
// Default asynchronous test timeout
defaultTimeout: 20000,
failedConnectionTimeout: 50000,
Expand All @@ -29,8 +22,8 @@ const globals = {

// The `globalSetup` and `globalTeardown` cannot access the `globals`
// They run in their own process context
// They can receive process environment
process.env['GLOBAL_DATA_DIR'] = globals.dataDir;
// They can however receive the process environment
// Use `process.env` to set variables

module.exports = {
testEnvironment: 'node',
Expand Down
105 changes: 52 additions & 53 deletions src/PolykeyAgent.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import type { DeepPartial, FileSystem, ObjectEmpty } from './types';
import type { PolykeyWorkerManagerInterface } from './workers/types';
import type { TLSConfig } from './network/types';
import type { SeedNodes } from './nodes/types';
import type { NodeAddress, NodeId, SeedNodes } from './nodes/types';
import type { Key, PasswordOpsLimit, PasswordMemLimit } from './keys/types';
import path from 'path';
import process from 'process';
import Logger from '@matrixai/logger';
import { DB } from '@matrixai/db';
import { MDNS } from '@matrixai/mdns';
import {
CreateDestroyStartStop,
ready,
Expand Down Expand Up @@ -76,7 +75,8 @@ type PolykeyAgentOptions = {
rpcParserBufferSize: number;
};
nodes: {
connectionIdleTimeoutTime: number;
connectionIdleTimeoutTimeMin: number;
connectionIdleTimeoutTimeScale: number;
connectionFindConcurrencyLimit: number;
connectionConnectTimeoutTime: number;
connectionKeepAliveTimeoutTime: number;
Expand Down Expand Up @@ -160,8 +160,10 @@ class PolykeyAgent {
rpcParserBufferSize: config.defaultsSystem.rpcParserBufferSize,
},
nodes: {
connectionIdleTimeoutTime:
config.defaultsSystem.nodesConnectionIdleTimeoutTime,
connectionIdleTimeoutTimeMin:
config.defaultsSystem.nodesConnectionIdleTimeoutTimeMin,
connectionIdleTimeoutTimeScale:
config.defaultsSystem.nodesConnectionIdleTimeoutTimeScale,
connectionFindConcurrencyLimit:
config.defaultsSystem.nodesConnectionFindConcurrencyLimit,
connectionFindLocalTimeoutTime:
Expand Down Expand Up @@ -213,7 +215,6 @@ class PolykeyAgent {
let gestaltGraph: GestaltGraph | undefined;
let identitiesManager: IdentitiesManager | undefined;
let nodeGraph: NodeGraph | undefined;
let mdns: MDNS | undefined;
let nodeConnectionManager: NodeConnectionManager | undefined;
let nodeManager: NodeManager | undefined;
let discovery: Discovery | undefined;
Expand Down Expand Up @@ -330,30 +331,20 @@ class PolykeyAgent {
keyRing,
logger: logger.getChild(NodeGraph.name),
});
mdns = new MDNS({
logger: logger.getChild(MDNS.name),
});
await mdns.start({
id: keyRing.getNodeId().toBuffer().readUint16BE(),
hostname: nodesUtils.encodeNodeId(keyRing.getNodeId()),
groups: optionsDefaulted.mdns.groups,
port: optionsDefaulted.mdns.port,
});
// Remove your own node ID if provided as a seed node
const nodeIdOwnEncoded = nodesUtils.encodeNodeId(keyRing.getNodeId());
delete optionsDefaulted.seedNodes[nodeIdOwnEncoded];
nodeConnectionManager = new NodeConnectionManager({
keyRing,
nodeGraph,
tlsConfig,
mdns,
seedNodes: optionsDefaulted.seedNodes,
connectionFindConcurrencyLimit:
optionsDefaulted.nodes.connectionFindConcurrencyLimit,
connectionFindLocalTimeoutTime:
optionsDefaulted.nodes.connectionFindLocalTimeoutTime,
connectionIdleTimeoutTime:
optionsDefaulted.nodes.connectionIdleTimeoutTime,
connectionIdleTimeoutTimeMin:
optionsDefaulted.nodes.connectionIdleTimeoutTimeMin,
connectionIdleTimeoutTimeScale:
optionsDefaulted.nodes.connectionIdleTimeoutTimeScale,
connectionConnectTimeoutTime:
optionsDefaulted.nodes.connectionConnectTimeoutTime,
connectionKeepAliveTimeoutTime:
Expand All @@ -374,22 +365,12 @@ class PolykeyAgent {
nodeConnectionManager,
taskManager,
gestaltGraph,
mdnsOptions: {
groups: optionsDefaulted.mdns.groups,
port: optionsDefaulted.mdns.port,
},
logger: logger.getChild(NodeManager.name),
});
await nodeManager.start();
// Add seed nodes to the nodeGraph
const setNodeProms = new Array<Promise<void>>();
for (const nodeIdEncoded in optionsDefaulted.seedNodes) {
const nodeId = nodesUtils.decodeNodeId(nodeIdEncoded);
if (nodeId == null) utils.never();
const setNodeProm = nodeManager.setNode(
nodeId,
optionsDefaulted.seedNodes[nodeIdEncoded],
true,
);
setNodeProms.push(setNodeProm);
}
await Promise.all(setNodeProms);
discovery = await Discovery.createDiscovery({
db,
keyRing,
Expand All @@ -403,7 +384,6 @@ class PolykeyAgent {
await NotificationsManager.createNotificationsManager({
acl,
db,
nodeConnectionManager,
nodeManager,
keyRing,
logger: logger.getChild(NotificationsManager.name),
Expand All @@ -412,7 +392,7 @@ class PolykeyAgent {
vaultManager = await VaultManager.createVaultManager({
vaultsPath,
keyRing,
nodeConnectionManager,
nodeManager,
notificationsManager,
gestaltGraph,
acl,
Expand Down Expand Up @@ -458,7 +438,6 @@ class PolykeyAgent {
await discovery?.stop();
await identitiesManager?.stop();
await gestaltGraph?.stop();
await mdns?.stop();
await acl?.stop();
await sigchain?.stop();
await certManager?.stop();
Expand All @@ -483,7 +462,6 @@ class PolykeyAgent {
acl,
gestaltGraph,
nodeGraph,
mdns,
taskManager,
nodeConnectionManager,
nodeManager,
Expand All @@ -504,6 +482,7 @@ class PolykeyAgent {
agentServicePort: optionsDefaulted.agentServicePort,
workers: optionsDefaulted.workers,
ipv6Only: optionsDefaulted.ipv6Only,
seedNodes: optionsDefaulted.seedNodes,
},
fresh,
});
Expand All @@ -523,7 +502,6 @@ class PolykeyAgent {
public readonly acl: ACL;
public readonly gestaltGraph: GestaltGraph;
public readonly nodeGraph: NodeGraph;
public readonly mdns: MDNS;
public readonly taskManager: TaskManager;
public readonly nodeConnectionManager: NodeConnectionManager;
public readonly nodeManager: NodeManager;
Expand Down Expand Up @@ -570,7 +548,6 @@ class PolykeyAgent {
acl,
gestaltGraph,
nodeGraph,
mdns,
taskManager,
nodeConnectionManager,
nodeManager,
Expand All @@ -594,7 +571,6 @@ class PolykeyAgent {
acl: ACL;
gestaltGraph: GestaltGraph;
nodeGraph: NodeGraph;
mdns: MDNS;
taskManager: TaskManager;
nodeConnectionManager: NodeConnectionManager;
nodeManager: NodeManager;
Expand All @@ -620,7 +596,6 @@ class PolykeyAgent {
this.gestaltGraph = gestaltGraph;
this.discovery = discovery;
this.nodeGraph = nodeGraph;
this.mdns = mdns;
this.taskManager = taskManager;
this.nodeConnectionManager = nodeConnectionManager;
this.nodeManager = nodeManager;
Expand Down Expand Up @@ -681,6 +656,7 @@ class PolykeyAgent {
groups: Array<string>;
port: number;
};
seedNodes: SeedNodes;
}>;
workers?: number;
fresh?: boolean;
Expand All @@ -698,6 +674,7 @@ class PolykeyAgent {
groups: config.defaultsSystem.mdnsGroups,
port: config.defaultsSystem.mdnsPort,
},
seedNodes: config.defaultsUser.seedNodes,
});
// Register event handlers
this.certManager.addEventListener(
Expand Down Expand Up @@ -768,18 +745,11 @@ class PolykeyAgent {
host: optionsDefaulted.clientServiceHost,
port: optionsDefaulted.clientServicePort,
});
await this.mdns.start({
id: this.keyRing.getNodeId().toBuffer().readUint16BE(),
hostname: nodesUtils.encodeNodeId(this.keyRing.getNodeId()),
groups: optionsDefaulted.mdns.groups,
port: optionsDefaulted.mdns.port,
});
await this.nodeManager.start();
await this.nodeConnectionManager.start({
host: optionsDefaulted.agentServiceHost,
port: optionsDefaulted.agentServicePort,
ipv6Only: optionsDefaulted.ipv6Only,
manifest: agentServerManifest({
agentService: agentServerManifest({
acl: this.acl,
db: this.db,
keyRing: this.keyRing,
Expand All @@ -792,8 +762,39 @@ class PolykeyAgent {
vaultManager: this.vaultManager,
}),
});
await this.nodeManager.start();
// Add seed nodes to the nodeGraph
const setNodeProms = new Array<Promise<void>>();
for (const nodeIdEncoded in optionsDefaulted.seedNodes) {
const nodeId = nodesUtils.decodeNodeId(nodeIdEncoded);
if (nodeId == null) utils.never();
const setNodeProm = this.nodeManager.setNode(
nodeId,
optionsDefaulted.seedNodes[nodeIdEncoded],
{
mode: 'direct',
connectedTime: 0,
scopes: ['global'],
},
true,
);
setNodeProms.push(setNodeProm);
}
await Promise.all(setNodeProms);
await this.nodeGraph.start({ fresh });
await this.nodeManager.syncNodeGraph(false);
const seedNodeEntries = Object.entries(
optionsDefaulted.seedNodes as SeedNodes,
);
if (seedNodeEntries.length > 0) {
const initialNodes = seedNodeEntries.map(
([nodeIdEncoded, nodeAddress]) => {
const nodeId = nodesUtils.decodeNodeId(nodeIdEncoded);
if (nodeId == null) utils.never('nodeId should be defined');
return [nodeId, nodeAddress] as [NodeId, NodeAddress];
},
);
await this.nodeManager.syncNodeGraph(initialNodes, undefined, false);
}
await this.discovery.start({ fresh });
await this.vaultManager.start({ fresh });
await this.notificationsManager.start({ fresh });
Expand Down Expand Up @@ -838,7 +839,6 @@ class PolykeyAgent {
await this.vaultManager?.stop();
await this.discovery?.stop();
await this.nodeGraph?.stop();
await this.mdns?.stop();
await this.nodeConnectionManager?.stop();
await this.nodeManager?.stop();
await this.clientService.stop({ force: true });
Expand Down Expand Up @@ -882,7 +882,6 @@ class PolykeyAgent {
await this.clientService.stop({ force: true });
await this.identitiesManager.stop();
await this.gestaltGraph.stop();
await this.mdns.stop();
await this.acl.stop();
await this.sigchain.stop();
await this.certManager.stop();
Expand Down
3 changes: 1 addition & 2 deletions src/bootstrap/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ async function bootstrapState({
await NotificationsManager.createNotificationsManager({
acl,
db,
nodeConnectionManager: {} as any, // No connections are attempted
nodeManager,
keyRing,
logger: logger.getChild(NotificationsManager.name),
Expand All @@ -206,7 +205,7 @@ async function bootstrapState({
db,
gestaltGraph,
keyRing,
nodeConnectionManager: {} as any, // No connections are attempted
nodeManager: {} as any, // No connections are attempted
vaultsPath,
notificationsManager,
logger: logger.getChild(VaultManager.name),
Expand Down
15 changes: 7 additions & 8 deletions src/client/handlers/NodesAdd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type {
NodesAddMessage,
} from '../types';
import type { NodeId } from '../../ids';
import type { Host, Hostname, Port } from '../../network/types';
import type { Host, Port } from '../../network/types';
import type NodeManager from '../../nodes/NodeManager';
import { UnaryHandler } from '@matrixai/rpc';
import * as ids from '../../ids';
Expand All @@ -32,13 +32,13 @@ class NodesAdd extends UnaryHandler<
port,
}: {
nodeId: NodeId;
host: Host | Hostname;
host: Host;
port: Port;
} = validateSync(
(keyPath, value) => {
return matchSync(keyPath)(
[['nodeId'], () => ids.parseNodeId(value)],
[['host'], () => networkUtils.parseHostOrHostname(value)],
[['host'], () => networkUtils.parseHost(value)],
[['port'], () => networkUtils.parsePort(value)],
() => value,
);
Expand All @@ -52,9 +52,7 @@ class NodesAdd extends UnaryHandler<
// Pinging to authenticate the node
if (
(input.ping ?? false) &&
!(await nodeManager.pingNode(nodeId, [
{ host, port, scopes: ['global'] },
]))
!(await nodeManager.pingNodeAddress(nodeId, host, port))
) {
throw new nodeErrors.ErrorNodePingFailed(
'Failed to authenticate target node',
Expand All @@ -64,9 +62,10 @@ class NodesAdd extends UnaryHandler<
await db.withTransactionF((tran) =>
nodeManager.setNode(
nodeId,
[host, port],
{
host,
port,
mode: 'direct',
connectedTime: Date.now(),
scopes: ['global'],
},
true,
Expand Down
Loading

0 comments on commit 28f0001

Please sign in to comment.