Skip to content

Commit 8c8491b

Browse files
committed
wip: remote dht
1 parent 29fa15d commit 8c8491b

13 files changed

+115
-214
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171
"@commitlint/config-conventional": "^19.5.0",
7272
"@openhps/core": "0.7.16",
7373
"@openhps/rdf": "0.4.101",
74-
"@openhps/solid": "0.2.18",
74+
"@openhps/solid": "0.2.21",
7575
"@types/chai": "^4.3.19",
7676
"@types/mocha": "^10.0.8",
7777
"@types/node": "^22.5.5",

src/models/DHTNode.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { DHTNetwork } from "../services/DHTNetwork";
2+
13
export type NodeID = number;
24

35
/**
@@ -6,7 +8,8 @@ export type NodeID = number;
68
export interface DHTNode {
79
nodeID: number;
810
collection: string;
9-
11+
network: DHTNetwork;
12+
1013
/**
1114
* Add a node to the network. This function is executed on the target node.
1215
* @param {NodeID} nodeID Node to add

src/models/RemoteDHTNode.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
import { DHTNetwork } from '../services';
12
import { DHTNode, NodeID } from './DHTNode';
23

34
/**
45
* Proxy handler for a remote DHT node
56
*/
67
export abstract class RemoteDHTNode implements DHTNode, ProxyHandler<DHTNode> {
8+
abstract network: DHTNetwork;
79
collection: string;
810
nodeID: number;
911

@@ -12,19 +14,25 @@ export abstract class RemoteDHTNode implements DHTNode, ProxyHandler<DHTNode> {
1214
case 'nodeID':
1315
return this.nodeID;
1416
case 'addNode':
15-
return this.addNode.bind(this);
17+
return this.addNode.bind(receiver);
1618
case 'removeNode':
17-
return this.removeNode.bind(this);
19+
return this.removeNode.bind(receiver);
1820
case 'store':
19-
return this.store.bind(this);
21+
return this.store.bind(receiver);
2022
case 'hasValue':
21-
return this.hasValue.bind(this);
23+
return this.hasValue.bind(receiver);
2224
case 'findValue':
23-
return this.findValue.bind(this);
25+
return this.findValue.bind(receiver);
2426
case 'ping':
25-
return this.ping.bind(this);
27+
return this.ping.bind(receiver);
2628
default:
27-
return Reflect.get(target, p, receiver);
29+
const targetProperty = Reflect.get(target, p, receiver);
30+
const selfProperty = Reflect.get(this, p, receiver);
31+
if (selfProperty) {
32+
return selfProperty;
33+
} else {
34+
return targetProperty;
35+
}
2836
}
2937
}
3038

src/models/ldht/LDHTAction.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { SerializableMember, SerializableObject } from '@openhps/core';
2-
import { IriString, SerializableThing, schema } from '@openhps/rdf';
1+
import { NumberType, SerializableMember, SerializableObject } from '@openhps/core';
2+
import { DataFactory, IriString, SerializableThing, Thing, schema } from '@openhps/rdf';
33
import { ldht } from '../../terms';
44

55
@SerializableObject({
@@ -14,29 +14,33 @@ export abstract class LDHTAction extends SerializableThing {
1414
rdf: {
1515
predicate: ldht.timeout,
1616
},
17+
numberType: NumberType.INTEGER
1718
})
1819
timeout?: number;
1920

2021
@SerializableMember({
2122
rdf: {
2223
predicate: schema.actionStatus,
23-
serialize: false
24+
serializer: (value: IriString) => DataFactory.namedNode(value),
25+
deserializer: (value: Thing) => value.value,
2426
},
2527
})
2628
actionStatus?: IriString;
2729

2830
@SerializableMember({
2931
rdf: {
3032
predicate: schema.agent,
31-
serialize: false
33+
serializer: (value: IriString) => DataFactory.namedNode(value),
34+
deserializer: (value: Thing) => value.value,
3235
},
3336
})
3437
agent?: IriString;
3538

3639
@SerializableMember({
3740
rdf: {
3841
predicate: schema.target,
39-
serialize: false
42+
serializer: (value: IriString) => DataFactory.namedNode(value),
43+
deserializer: (value: Thing) => value.value,
4044
},
4145
})
4246
target?: IriString;
@@ -47,6 +51,9 @@ export abstract class LDHTAction extends SerializableThing {
4751
}
4852
}
4953

54+
/**
55+
* Action status
56+
*/
5057
export class ActionStatus {
5158
static readonly CompletedActionStatus = schema.CompletedActionStatus;
5259
static readonly FailedActionStatus = schema.FailedActionStatus;

src/models/ldht/LDHTRemoteNode.ts

Lines changed: 0 additions & 59 deletions
This file was deleted.

src/models/ldht/LocalRDFNode.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ export class LocalRDFNode extends LocalDHTNode implements RDFNode {
9898
}
9999

100100
ping(): Promise<void> {
101-
return new Promise((resolve, reject) => {});
101+
return new Promise((resolve, reject) => {
102+
103+
});
102104
}
103105

104106
/**
@@ -111,7 +113,7 @@ export class LocalRDFNode extends LocalDHTNode implements RDFNode {
111113
super
112114
.addNode(nodeID)
113115
.then(() => {
114-
// Add in RDF store
116+
resolve();
115117
})
116118
.catch(reject);
117119
});

src/models/ldht/RDFNode.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ import { IriString } from '@openhps/rdf';
33
import { LDHTAction } from '.';
44

55
export interface RDFNode extends DHTNode {
6-
uri: IriString;
7-
actions: LDHTAction[];
6+
uri?: IriString;
7+
actions?: LDHTAction[];
88
/**
99
* URI where the data is stored
1010
*/

src/models/ldht/RemoteRDFNode.ts

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Action, DataFactory, IriString, RDFSerializer, schema } from '@openhps/rdf';
1+
import { Action, createChangeLog, DataFactory, IriString, RDFSerializer, schema, Store } from '@openhps/rdf';
22
import { NodeID } from '../DHTNode';
33
import { RemoteDHTNode } from '../RemoteDHTNode';
44
import { RDFNode } from './RDFNode';
@@ -14,25 +14,26 @@ import { LDHTPingAction } from './LDHTPingAction';
1414

1515
@SerializableObject()
1616
export class RemoteRDFNode extends RemoteDHTNode implements RDFNode {
17-
@SerializableMember()
18-
uri: IriString;
19-
@SerializableArrayMember(LDHTAction)
20-
actions: LDHTAction[];
17+
uri?: IriString;
18+
actions?: LDHTAction[];
2119
dataUri: IriString;
2220
nodesUri: IriString;
2321
network: DHTRDFNetwork;
2422

2523
addNode(nodeID: NodeID): Promise<void> {
2624
return new Promise(async (resolve, reject) => {
27-
console.log("adding node");
28-
// Send an add node action
29-
const action = new LDHTAddNodeAction();
30-
action.actionStatus = schema.PotentialActionStatus;
31-
action.agent = (this.network.node as LocalRDFNode).uri;
32-
action.object = (await this.network.findNodeById(nodeID) as LocalRDFNode).uri;
33-
this.createAction(action).then(() => {
34-
resolve();
35-
}).catch(reject);
25+
try{
26+
// Send an add node action
27+
const action = new LDHTAddNodeAction();
28+
action.actionStatus = schema.PotentialActionStatus;
29+
action.agent = (this.network.node as LocalRDFNode).uri;
30+
action.object = (await this.network.findNodeById(nodeID) as RemoteRDFNode).uri;
31+
this.createAction(action).then(() => {
32+
resolve();
33+
}).catch(reject);
34+
} catch (e) {
35+
console.log(e);
36+
}
3637
});
3738
}
3839

@@ -110,16 +111,17 @@ export class RemoteRDFNode extends RemoteDHTNode implements RDFNode {
110111

111112
protected createAction<T extends LDHTAction>(action: T): Promise<T> {
112113
return new Promise((resolve, reject) => {
113-
console.log(this.uri, this.actions);
114114
const actionContainer = this.actions.find((a) => a.type === action.type);
115+
if (!actionContainer) {
116+
return reject(new Error("Action container not found!"));
117+
}
115118
const timestamp = new Date().getTime();
116-
const uri = `${actionContainer}${timestamp}.ttl`;
119+
const uri = `${actionContainer.target}${timestamp}.ttl`;
117120
const service = this.network.solidService;
118121
const session = service.session;
119-
service.getDatasetStore(session, uri).then((store) => {
120-
store.addQuads(RDFSerializer.serializeToQuads(action));
121-
return service.saveDatasetStore(session, uri, store);
122-
}).then(() => {
122+
const store = new Store();
123+
store.addQuads(RDFSerializer.serializeToQuads(action));
124+
service.saveDataset(session, uri, createChangeLog(store) as any).then(() => {
123125
resolve(action);
124126
}).catch(reject);
125127
});

src/services/DHTMemoryNetwork.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@ export class DHTMemoryNetwork extends DHTNetwork {
2020

2121
addNode(node: DHTNode): Promise<void> {
2222
return new Promise((resolve, reject) => {
23+
// Set reference to network
24+
node = this.nodeHandler ? new Proxy(node, this.nodeHandler) : node;
25+
node.network = this;
2326
// Add node locally
24-
this.nodes.set(node.nodeID, this.nodeHandler ? new Proxy(node, this.nodeHandler) : node);
25-
27+
this.nodes.set(node.nodeID, node);
28+
2629
Promise.all(
2730
Array.from(this.nodes.values()).map((otherNode) => {
28-
if (otherNode.nodeID !== node.nodeID) {
29-
return Promise.all([otherNode.addNode(node.nodeID), node.addNode(otherNode.nodeID)]);
30-
}
31-
return Promise.resolve();
31+
return Promise.all([otherNode.addNode(node.nodeID), node.addNode(otherNode.nodeID)]);
3232
}),
3333
)
3434
.then(() => resolve())

src/services/DHTNetwork.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,14 @@ export abstract class DHTNetwork {
5757
/**
5858
* Initialize the network
5959
* @param nodeID Node identifier to use as the local node
60-
* @param model Model to use
60+
* @param _model Model to use
6161
* @returns {Promise<void>} Promise when the network is initialized
6262
*/
63-
initialize(nodeID: number, model?: Model): Promise<void> {
63+
initialize(nodeID: number, _model?: Model): Promise<void> {
6464
return new Promise((resolve, reject) => {
6565
this.createLocalNode(nodeID)
6666
.then((node) => {
6767
this.node = node;
68-
return this.addNode(this.node);
69-
})
70-
.then(() => {
7168
resolve();
7269
})
7370
.catch(reject);

0 commit comments

Comments
 (0)