Skip to content

Commit

Permalink
WIP [ci skip]
Browse files Browse the repository at this point in the history
  • Loading branch information
CMCDragonkai committed Nov 9, 2023
1 parent a360cec commit 71583a6
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 64 deletions.
120 changes: 62 additions & 58 deletions src/nodes/NodeGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { IdInternal } from '@matrixai/id';
import * as nodesUtils from './utils';
import * as nodesErrors from './errors';
import * as nodesEvents from './events';
import { getUnixtime, never } from '../utils';
import * as utils from '../utils';

/**
* NodeGraph is an implementation of Kademlia for maintaining peer to peer
Expand All @@ -33,10 +33,7 @@ import { getUnixtime, never } from '../utils';
* transactional resetting of the buckets if the own node ID changes.
*
* When the node ID changes, either due to key renewal or reset, we remap all
* existing records to the other space, and then we swap the active space
* pointer.
*
* Wait is this true?
* existing records to the other space, and then we swap the active space key.
*/
interface NodeGraph extends CreateDestroyStartStop {}
@CreateDestroyStartStop(
Expand Down Expand Up @@ -150,8 +147,6 @@ class NodeGraph {

public async destroy(): Promise<void> {
this.logger.info(`Destroying ${this.constructor.name}`);
// If the DB was stopped, the existing sublevel `this.nodeGraphDb` will not be valid
// Therefore we recreate the sublevel here
await this.db.clear(this.nodeGraphDbPath);
this.logger.info(`Destroyed ${this.constructor.name}`);
}
Expand All @@ -175,6 +170,10 @@ class NodeGraph {
return space;
}

/**
* Locks the bucket index for exclusive operations.
* This allows you sequence operations for any bucket.
*/
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async lockBucket(bucketIndex: number, tran: DBTransaction) {
const keyPath = [
Expand All @@ -184,13 +183,15 @@ class NodeGraph {
return await tran.lock(keyPath.join(''));
}

/**
* Gets the `NodeData` given a `NodeId`.
*/
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async getNode(
nodeId: NodeId,
tran?: DBTransaction,
): Promise<NodeData | undefined> {
const tranOrDb = tran ?? this.db;

const [bucketIndex] = this.bucketIndex(nodeId);
const bucketDomain = [
...this.nodeGraphBucketsDbPath,
Expand All @@ -201,10 +202,10 @@ class NodeGraph {
}

/**
* Get all nodes.
* Nodes are always sorted by `NodeBucketIndex` first
* Then secondly by the node IDs
* The `order` parameter applies to both, for example possible sorts:
* Get all `NodeData`.
*
* Results are sorted by `NodeBucketIndex` then `NodeId`.
* The `order` parameter applies to both, for example:
* NodeBucketIndex asc, NodeID asc
* NodeBucketIndex desc, NodeId desc
*/
Expand All @@ -219,7 +220,6 @@ class NodeGraph {
return yield* getNodes(tran);
});
}

for await (const [keyPath, nodeData] of tran.iterator<NodeData>(
this.nodeGraphBucketsDbPath,
{
Expand All @@ -233,11 +233,12 @@ class NodeGraph {
}

/**
* Will add a node to the node graph and increment the bucket count.
* If the node already existed it will be updated.
* @param nodeId NodeId to add to the NodeGraph
* @param nodeAddress Address information to add
* @param tran
* Sets a `NodeId` and `NodeAddress` to an appropriate bucket.
* If the `NodeId` already exists, it will be updated.
*
* Note that this does not respect any bucket limit.
* The bucket limit is managed by `NodeManager`.
* It will however increment the bucket count if it is a new record.
*/
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async setNode(
Expand All @@ -250,32 +251,21 @@ class NodeGraph {
this.setNode(nodeId, nodeAddress, tran),
);
}

const [bucketIndex, bucketKey] = this.bucketIndex(nodeId);
const lastUpdatedPath = [...this.nodeGraphLastUpdatedDbPath, bucketKey];
const nodeIdKey = nodesUtils.bucketDbKey(nodeId);
const bucketPath = [...this.nodeGraphBucketsDbPath, bucketKey, nodeIdKey];
const nodeData = await tran.get<NodeData>(bucketPath);
if (nodeData != null) {
this.logger.debug(
`Updating node ${nodesUtils.encodeNodeId(
nodeId,
)} in bucket ${bucketIndex}`,
);
// If the node already exists we want to remove the old `lastUpdated`
const lastUpdatedKey = nodesUtils.lastUpdatedKey(nodeData.lastUpdated);
await tran.del([...lastUpdatedPath, lastUpdatedKey, nodeIdKey]);
} else {
this.logger.debug(
`Adding node ${nodesUtils.encodeNodeId(
nodeId,
)} to bucket ${bucketIndex}`,
);
// It didn't exist, so we want to increment the bucket count
const count = await this.getBucketMetaProp(bucketIndex, 'count', tran);
await this.setBucketMetaProp(bucketIndex, 'count', count + 1, tran);
}
const lastUpdated = getUnixtime();
const lastUpdated = utils.getUnixtime();
await tran.put(bucketPath, {
address: nodeAddress,
lastUpdated,
Expand All @@ -288,29 +278,35 @@ class NodeGraph {
);
}

@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async getOldestNode(
bucketIndex: number,
limit: number = 1,
tran?: DBTransaction,
): Promise<Array<NodeId>> {
if (tran == null) {
return this.db.withTransactionF((tran) =>
this.getOldestNode(bucketIndex, limit, tran),
);
}
const bucketKey = nodesUtils.bucketKey(bucketIndex);
// Remove the oldest entry in the bucket
const oldestNodeIds: Array<NodeId> = [];
for await (const [keyPath] of tran.iterator(
[...this.nodeGraphLastUpdatedDbPath, bucketKey],
{ limit },
)) {
const { nodeId } = nodesUtils.parseLastUpdatedBucketDbKey(keyPath);
oldestNodeIds.push(nodeId);
}
return oldestNodeIds;
}
// /**
// * Gets node Ids
// * WHy don't you just get a bucket and iterate over it?
// * Or set the limit accordingly?
// * That seems strange
// */
// @ready(new nodesErrors.ErrorNodeGraphNotRunning())
// public async getOldestNode(
// bucketIndex: number,
// limit: number = 1,
// tran?: DBTransaction,
// ): Promise<Array<NodeId>> {
// if (tran == null) {
// return this.db.withTransactionF((tran) =>
// this.getOldestNode(bucketIndex, limit, tran),
// );
// }
// const bucketKey = nodesUtils.bucketKey(bucketIndex);
// // Remove the oldest entry in the bucket
// const oldestNodeIds: Array<NodeId> = [];
// for await (const [keyPath] of tran.iterator(
// [...this.nodeGraphLastUpdatedDbPath, bucketKey],
// { limit },
// )) {
// const { nodeId } = nodesUtils.parseLastUpdatedBucketDbKey(keyPath);
// oldestNodeIds.push(nodeId);
// }
// return oldestNodeIds;
// }

@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async unsetNode(nodeId: NodeId, tran?: DBTransaction): Promise<void> {
Expand Down Expand Up @@ -338,24 +334,28 @@ class NodeGraph {
}

/**
* Gets a bucket
* Gets a bucket.
* The bucket's node IDs is sorted lexicographically by default
* Alternatively you can acquire them sorted by lastUpdated timestamp
* or by distance to the own NodeId
* or by distance to the own NodeId.
*
* @param limit Limit the number of nodes returned, note that `-1` means
* no limit, but `Infinity` means `0`.
*/
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async getBucket(
bucketIndex: NodeBucketIndex,
sort: 'nodeId' | 'distance' | 'lastUpdated' = 'nodeId',
order: 'asc' | 'desc' = 'asc',
limit?: number,
tran?: DBTransaction,
): Promise<NodeBucket> {
if (tran == null) {
return this.db.withTransactionF((tran) =>
this.getBucket(bucketIndex, sort, order, tran),
this.getBucket(bucketIndex, sort, order, limit, tran),
);
}

if (bucketIndex < 0 || bucketIndex >= this.nodeIdBits) {
throw new nodesErrors.ErrorNodeGraphBucketIndex(
`bucketIndex must be between 0 and ${this.nodeIdBits - 1} inclusive`,
Expand All @@ -369,6 +369,7 @@ class NodeGraph {
{
reverse: order !== 'asc',
valueAsBuffer: false,
limit,
},
)) {
const nodeId = nodesUtils.parseBucketDbKey(key[0] as Buffer);
Expand All @@ -391,13 +392,14 @@ class NodeGraph {
[...this.nodeGraphLastUpdatedDbPath, bucketKey],
{
reverse: order !== 'asc',
limit,
},
)) {
const nodeId = IdInternal.fromBuffer<NodeId>(nodeIdBuffer);
bucketDbIterator.seek(nodeIdBuffer);
// eslint-disable-next-line
const iteratorResult = await bucketDbIterator.next();
if (iteratorResult == null) never();
if (iteratorResult == null) utils.never();
const [, nodeData] = iteratorResult;
bucket.push([nodeId, nodeData]);
}
Expand Down Expand Up @@ -726,6 +728,7 @@ class NodeGraph {
startingBucket,
undefined,
undefined,
undefined,
tran,
);
// We need to iterate over the key stream
Expand Down Expand Up @@ -783,6 +786,7 @@ class NodeGraph {
lastBucketIndex,
undefined,
undefined,
undefined,
tran,
);
// Pop off elements of the same bucket to avoid duplicates
Expand Down
19 changes: 13 additions & 6 deletions src/nodes/NodeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ class NodeManager {
bucketIndex,
undefined,
undefined,
undefined,
tran,
);
}
Expand Down Expand Up @@ -771,9 +772,14 @@ class NodeManager {
// We want to add a node but the bucket is full
if (force) {
// We just add the new node anyway without checking the old one
const oldNodeId = (
await this.nodeGraph.getOldestNode(bucketIndex, 1, tran)
).pop();
const bucket = await this.nodeGraph.getBucket(
bucketIndex,
'lastUpdated',
'asc',
1,
tran
);
const oldNodeId = bucket[0]?.[0];
if (oldNodeId == null) never();
this.logger.debug(
`Force was set, removing ${nodesUtils.encodeNodeId(
Expand Down Expand Up @@ -842,18 +848,19 @@ class NodeManager {
// Locking on bucket
await this.nodeGraph.lockBucket(bucketIndex, tran);
const semaphore = new Semaphore(3);

// Iterating over existing nodes
const bucket = await this.nodeGraph.getOldestNode(
const bucket = await this.nodeGraph.getBucket(
bucketIndex,
'lastUpdated',
'asc',
this.nodeGraph.nodeBucketLimit,
tran,
);
if (bucket == null) never();
let removedNodes = 0;
const unsetLock = new Lock();
const pendingPromises: Array<Promise<void>> = [];
for (const nodeId of bucket) {
for (const [nodeId] of bucket) {
// We want to retain seed nodes regardless of state, so skip them
if (this.nodeConnectionManager.isSeedNode(nodeId)) continue;
if (removedNodes >= pendingNodes.size) break;
Expand Down
27 changes: 27 additions & 0 deletions src/nodes/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,29 @@ import type { Host, Hostname, Port } from '../network/types';
*/
type NodeGraphSpace = '0' | '1';

/**
* Node address scopes allows the classification of the address.
* Local means that the address is locally routable.
* Global means that the address is globally routable.
*/
type NodeAddressScope = 'local' | 'global';

/**
* Node address.
*/
type NodeAddress = {
/**
* Host can be a host IP address or a hostname string.
*/
host: Host | Hostname;
/**
* Port of the node.
*/
port: Port;
/**
* Scopes can be used to classify the address.
* Multiple scopes is understood as set-union.
*/
scopes: Array<NodeAddressScope>;
};

Expand All @@ -22,8 +40,17 @@ type NodeBucketMeta = {
count: number;
};

/**
* This is the record value stored in the NodeGraph.
*/
type NodeData = {
/**
* The address of the node.
*/
address: NodeAddress;
/**
* Unix timestamp of when it was last updated.
*/
lastUpdated: number;
};

Expand Down

0 comments on commit 71583a6

Please sign in to comment.