Skip to content

Commit

Permalink
Refactor inputHandler and push_wrap
Browse files Browse the repository at this point in the history
  • Loading branch information
AnnaBlume99 committed Jul 19, 2023
1 parent e807f53 commit 5a21828
Showing 1 changed file with 54 additions and 49 deletions.
103 changes: 54 additions & 49 deletions rt/src/p2p/p2p.mts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ the libp2p).
*/

// AB: TODO:
// - relay pushables??
// - connection / stream / needsToDial
// - comments
// - discuss w/ Aslan
// - types... :(

// IMPORTS

import { PeerId } from '@libp2p/interface-peer-id';
Expand Down Expand Up @@ -139,7 +146,7 @@ async function startp2p(nodeId, rt: any): Promise<PeerId> {
},
connectionManager : {
maxConnections: Infinity,
minConnections: 0 //What are good numbers here?
minConnections: 0 //AB: What are good numbers here?
}
});

Expand Down Expand Up @@ -188,7 +195,7 @@ async function startp2p(nodeId, rt: any): Promise<PeerId> {
/*for (let relay_addr of p2pconfig.relays ) { //AB: update relay addresses in p2pconfig
keepAliveRelay(relay_addr);
}*/ //AB: update relay addresses in p2pconfig
keepAliveRelay("/ip4/134.209.92.133/tcp/5555/ws/p2p/12D3KooWShh9qmeS1UEgwWpjAsrjsigu8UGh8DRKyx1UG6HeHzjf");
//keepAliveRelay("/ip4/134.209.92.133/tcp/5555/ws/p2p/12D3KooWShh9qmeS1UEgwWpjAsrjsigu8UGh8DRKyx1UG6HeHzjf");

return id;
}
Expand All @@ -214,9 +221,9 @@ async function createLibp2p(_options) {
/*bootstrap({
list: bootstrappers
}),*/ //AB: should we use bootstrappers?
/*mdns({
mdns({
interval: 20e3
})*/
})
],
services: {
identify: identifyService()
Expand Down Expand Up @@ -326,7 +333,7 @@ async function getPeerInfoWithRelay(id:any) {
return pi;
}

async function getPeerInfo(id:string) : Promise<PeerId>{
async function getPeerInfo(id:string) : Promise<PeerId> {
const peerId = peerIdFromString(id);

return new Promise ((resolve, reject) => {
Expand All @@ -346,11 +353,13 @@ async function getPeerInfo(id:string) : Promise<PeerId>{
debug(`calling peerRouting.findPeer ${peerId}`);
const peerInfo = await _node.peerRouting.findPeer(peerId, {signal : AbortSignal.timeout(1000)});
debug ("findPeer returned");

await _node.peerStore.patch(peerInfo.id, {
multiaddrs:
peerInfo.multiaddrs
}); //AB: necessary??
});
debug("added multiaddr to store");

resolve (peerInfo.id);
} catch (err) {
debug(`try_find_peer exception`);
Expand Down Expand Up @@ -403,7 +412,7 @@ function setupConnection(peerId : PeerId, stream): void {
async (source) => {
try {
for await (const x of source) {
inputHandler(id, x, peerId); //AB: Something is wonky with the parameters!
inputHandler(id, x);
}
} catch (err) {
error(`error in pipe`);
Expand All @@ -426,15 +435,14 @@ function setupConnection(peerId : PeerId, stream): void {
debug(`Connection set up with ${id}`);
}

async function inputHandler(id, input, fromNodeId_) { //AB: fix parameters!!!
let fromNodeId = fromNodeId_.toString();
async function inputHandler(id, input) {
debug ("-- input handler");
switch (input.messageType) {
case (MessageType.SPAWN):
if(_rt.remoteSpawnOK()) {
debug("RECEIVED SPAWN");

let x = await _rt.spawnFromRemote(input.message, fromNodeId);
let x = await _rt.spawnFromRemote(input.message, id);
push_wrap(id, {
messageType: MessageType.SPAWNOK,
spawnNonce: input.spawnNonce,
Expand All @@ -455,11 +463,11 @@ async function inputHandler(id, input, fromNodeId_) { //AB: fix parameters!!!
break;

case (MessageType.SEND):
debug (`SEND from ${fromNodeId}`);
debug (`SEND from ${id}`);
_rt.receiveFromRemote(
input.pid,
input.message,
fromNodeId
id
);
break;

Expand Down Expand Up @@ -507,7 +515,7 @@ let _relay_id = null;
let _keepAliveCounter = 0; //AB: can tag peers with "Keep_alive" in the peerstore (does it work?)
let _relayTable: IHash = {}; //AB: use in-built functionality instead?

async function keepAliveRelay(relay_addr:string) {
async function keepAliveRelay(relay_addr: string) {
let id = relay_addr.split('/').pop();
debug(`relay id is ${id}`);
let timeout = _KEEPALIVE;
Expand Down Expand Up @@ -540,11 +548,11 @@ async function dialRelay(relay_addr) {
}
});

debug (`Added relay address`);
debug(`Added relay address`);
const conn = await _node.dial(relayId); //AB: necessary??
debug (`Got relay connection`);
debug(`Got relay connection`);
const stream = await _node.dialProtocol(relayId, _RELAY_PROTOCOL);
debug (`Got relay stream`);
debug(`Got relay stream`);
const peerId = conn.remotePeer;
_relay_id = peerId.toString();
debug(`~~ relay dialed, keep alive counter is ${_keepAliveCounter++}`);
Expand Down Expand Up @@ -578,47 +586,44 @@ async function sendp2p(id : PeerId, procId, obj) {
}

async function push_wrap(id: any, data: any) {
debug(`push_wrap`);
let connections = _node.getConnections(id);
let needsToDial = true;
let p = null;

if(connections.length >= 1) {
let connection = connections[0]; //AB: check all connections
let streams = connection.streams;
if(streams.length >= 1) {
let stream = streams[0];
p = (stream as any).p;
needsToDial = p == undefined;
} else {
error("streams array empty");
throw new Error //AB: What to do then? Can this happen?
while (true) {
debug(`push_wrap`);
let connections = _node.getConnections(id);
let needsToDial = true;
let p = null;

break_loop:
for(const connection of connections) {
let streams = connection.streams;
for(const stream of streams) {
p = (stream as any).p;
needsToDial = p == undefined;

if(!needsToDial) {
break break_loop;
}
}
}
}
if(needsToDial) {
debug("needs to dial");
let stream = await dial(id.toString());
debug("dialed to obtain stream");
p = (stream as any).p;
}

debug (`push_wrap; stream obtained`);

while (true) {
try {
debug(`push_wrap; pushing`);
if(needsToDial) {
debug("needs to dial");
let stream = await dial(id.toString());
debug("dialed to obtain stream");
p = (stream as any).p;
}

debug (`push_wrap; stream obtained; pushing`);
await p.push(data);
debug(`push_wrap; data pushed into the stream`);
break;
} catch (err) {
// the stream we have used is
// no good for whatever reason;
// most likely there are networking
// issues. we report the errors
// and redial
// the stream we have used is no good for whatever reason;
// most likely there are networking issues.
// we report the errors and redial
error(`push wrap error`);
processExpectedNetworkErrors(err, "push_wrap");
}
}
}
}

Expand Down Expand Up @@ -763,7 +768,7 @@ function processExpectedNetworkErrors(err, source="source unknown") {
default:
error (`Unhandled error case with error code ${err.code}`)
throw err;
}
}
} else {
error (`Unhandled general error case ${err}`)
throw err;
Expand Down

0 comments on commit 5a21828

Please sign in to comment.