Skip to content

Commit

Permalink
Make pingpong example work
Browse files Browse the repository at this point in the history
  • Loading branch information
AnnaBlume99 committed Jun 29, 2023
1 parent b973d23 commit a389b63
Showing 1 changed file with 27 additions and 16 deletions.
43 changes: 27 additions & 16 deletions rt/src/p2p/p2p.mts
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ async function startp2p(nodeId, rt): Promise<PeerId> {
//let connectionManager = (nodeListener as any).components.connectionManager

await nodeListener.handle(_PROTOCOL, async ({ connection, stream }) => {
//console.log('Handling protocol')
console.log('Handling protocol')
//streamToConsole(stream)
setupConnection(connection.remotePeer, stream)
})
Expand Down Expand Up @@ -862,7 +862,7 @@ async function push_wrap(id: any, data: any) {
// issues. we report the errors
// and redial
debug (`push wrap error`)
//processExpectedNetworkErrors(err, "push_wrap");
processExpectedNetworkErrors(err, "push_wrap");
throw err
}
}
Expand All @@ -889,7 +889,7 @@ function setupConnection(peerId : PeerId, stream) {
}
} catch (err) {
debug (`try catch of the source`)
//processExpectedNetworkErrors(err, "setupConnection/pipe");
processExpectedNetworkErrors(err, "setupConnection/pipe");
throw err;
}

Expand Down Expand Up @@ -924,7 +924,7 @@ function dial(id) {
setupConnection (peerId, stream);
resolve ( stream );
} catch ( err ) {
//processExpectedNetworkErrors (err, "dial");
processExpectedNetworkErrors (err, "dial");

// if the error is suppressed we move on to trying 10 times
// with exponential backoff
Expand Down Expand Up @@ -983,7 +983,7 @@ async function inputHandler(id, input, fromNodeId_) {
let fromNodeId = fromNodeId_.toString()
debug ("-- input handler")
switch (input.messageType) {
/*case (MessageType.SPAWN):
case (MessageType.SPAWN):
if (_rt.remoteSpawnOK()) {
debug ("RECEIVED SPAWN")
let x = await _rt.spawnFromRemote (input.message, fromNodeId)
Expand All @@ -1006,7 +1006,7 @@ async function inputHandler(id, input, fromNodeId_) {
// something is fishy;
debug("something is fishy; no matching callback for the nonce");
}
break;*/
break;

case (MessageType.SEND):
debug (`SEND ${fromNodeId}`);
Expand Down Expand Up @@ -1074,7 +1074,22 @@ async function whereisp2p(id, str) {
}

let _whereisNonces = {};
let _unacknowledged:any = {}
let _unacknowledged:any = {};
let _spawnNonces = {};

async function spawnp2p(id, data) {
const spawnNonce = uuidv4();
return new Promise ((resolve, reject) => {
_spawnNonces[spawnNonce] = (err, data) => {
if (err) { reject (err) } else { resolve (data)}
};
push_wrap(id, {
messageType: MessageType.SPAWN,
spawnNonce: spawnNonce,
message: data
})
});
}

function addUnacknowledged (id, uuid, f) {
if (!_unacknowledged[id]){
Expand All @@ -1093,11 +1108,10 @@ function reissueUnacknowledged (id:string) {
}
}

/*
function processExpectedNetworkErrors (err, source="source unknown") {
debug (`error source: ${source}`);
if (err instanceof AggregateError) {
for (const e of err ) {
for (const e of err.errors ) {
processExpectedNetworkErrors (e, source)
}
} else {
Expand Down Expand Up @@ -1146,29 +1160,26 @@ function processExpectedNetworkErrors (err, source="source unknown") {
throw err;
}
}
}*/
}


export let p2p = {
startp2p: (arg1, arg2) => {
return startp2p(arg1, arg2)
},
spawnp2p: (arg1, arg2) => {
throw new Error("Spawn p2p")
return spawnp2p(arg1, arg2)
},
//_troupeP2P.spawnp2p(arg1, arg2),
sendp2p: (arg1, arg2, arg3) => {
return sendp2p(arg1, arg2, arg3)
},
//_troupeP2P.sendp2p(arg1, arg2, arg3),
whereisp2p: (arg1, arg2) => {
return whereisp2p(arg1, arg2)
},
//_troupeP2P.whereisp2p (arg1, arg2),
stopp2p: async () => {
return await _node.stop()
},
processExpectedNetworkErrors: (arg1, arg2) => {
throw new Error("Error p2p")
}, //processExpectedNetworkErrors
processExpectedNetworkErrors(arg1, arg2)
},
}

0 comments on commit a389b63

Please sign in to comment.