Skip to content

Commit

Permalink
Make parallel example work
Browse files Browse the repository at this point in the history
  • Loading branch information
AnnaBlume99 committed Jun 30, 2023
1 parent a389b63 commit d028f2e
Showing 1 changed file with 143 additions and 45 deletions.
188 changes: 143 additions & 45 deletions rt/src/p2p/p2p.mts
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,6 @@ function setupBlockingHealthChecker (period) {
*/
import { PeerId } from '@libp2p/interface-peer-id'
import type { ConnectionManager } from '@libp2p/interface-connection-manager'

import { tcp } from '@libp2p/tcp'
import { webSockets } from '@libp2p/websockets'
Expand All @@ -681,7 +680,6 @@ import map from 'it-map'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { pushable } from 'it-pushable'
//import { PeerId } from '@libp2p/interface-peer-id'

let bootstrappers = [
'/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ',
Expand Down Expand Up @@ -750,6 +748,30 @@ async function obtainPeerId(nodeId) : Promise<PeerId> {
return id;
}

const _HEALTHCHECKPERIOD = 5000 // 2020-02-10; AA; this should be an option

function setupBlockingHealthChecker (period) {
let _lastHealth:number = Date.now()
let _healthCounter = 0;
let health_threshold = Math.max (period * 1.25 , period + 50)
// AA: 2020-02-10;
// The event queue always has a fair bit of latency, so we adjust for
// the minimal expected latency here; the constant of 50 is an
// empirically derived value, but needs to be critically reevaluated
// as the system evolves

function f() {
let now = Date.now()
// debug (`Health checker running ${now - _lastHealth}, ${new Date()}`)
if (now - _lastHealth > health_threshold) {
debug (`Potential blocking issue: ${_healthCounter} ${now - _lastHealth}`)
}
_lastHealth = now;
setTimeout(f, period);
}
f ()
}

let _node : Libp2p = null;
let _rt = null;

Expand Down Expand Up @@ -778,13 +800,13 @@ async function startp2p(nodeId, rt): Promise<PeerId> {
}
})

setupBlockingHealthChecker(_HEALTHCHECKPERIOD);

_node = nodeListener
_rt = rt
//let connectionManager = (nodeListener as any).components.connectionManager

await nodeListener.handle(_PROTOCOL, async ({ connection, stream }) => {
console.log('Handling protocol')
//streamToConsole(stream)
console.log(`Handling protocol, id: ${connection.remotePeer}`)
setupConnection(connection.remotePeer, stream)
})

Expand Down Expand Up @@ -827,21 +849,23 @@ async function sendp2p(id : PeerId, procId, obj) {

async function push_wrap(id: any, data: any) {
debug (`push_wrap`)
//let peerId = PeerId
let connections = _node.getConnections(id)
let needsToDial = true;
let p = null
if(connections.length >= 1) {
let connection = connections[0]
let streams = connection.streams
if(streams.length >= 1) {
let stream = streams[0]
p = (stream as any).p;
needsToDial = p == undefined;
} else {
debug("streams array empty")
throw new Error
throw new Error //AB: What to do then? Can this happen?
}
} else {
debug("connections array empty")
}
if(needsToDial) {
debug("needs to dial")
let stream = await dial(id.toString())
debug("dialed to obtain stream")
p = (stream as any).p;
Expand All @@ -862,8 +886,7 @@ async function push_wrap(id: any, data: any) {
// issues. we report the errors
// and redial
debug (`push wrap error`)
processExpectedNetworkErrors(err, "push_wrap");
throw err
processExpectedNetworkErrors(err, "push_wrap");
}
}
}
Expand All @@ -885,12 +908,11 @@ function setupConnection(peerId : PeerId, stream) {
try {
for await (const x of source) {
//console.log('Input: ', x)
inputHandler (id, x, peerId)
inputHandler (id, x, peerId) //AB: Something is wonky with the parameters!
}
} catch (err) {
debug (`try catch of the source`)
processExpectedNetworkErrors(err, "setupConnection/pipe");
throw err;
}

debug(`deleting entry for ${id}`);
Expand All @@ -904,44 +926,121 @@ function setupConnection(peerId : PeerId, stream) {
)
stream.p = p; // Storing a reference to the pushable on the stream
// We rely on the p2p library to keep track of streams
debug(`Connection set up with ${id}`);
}

/*function nPeers( ) {
return _node.getPeers().length
}
async function getPeerInfo(id:string) {
const peerId = peerIdFromString(id);
return new Promise ((resolve, reject) => {
let n_attempts = 0;
async function try_find_peer () {
if (_node.peerStore.has(peerId)) {
debug ("peer info is in the store")
resolve (_node.peerStore.get(peerId))
} else {
try {
debug (`calling peerRouting.findPeer ${peerId}`)
const peerInfo = await _node.peerRouting.findPeer (peerId, {timeout:1000});
debug ("findPeer returned")
resolve (peerInfo);
} catch (err) {
debug (`try_find_peer exception`)
if (nPeers() > 0 ) {
n_attempts ++ ;
}
if (err instanceof AggregateError) {
for (let e of err.errors) {
debug (`Find peer error with code: ${e}, ${e.code}`)
}
} else {
debug (`Find peer error: ${err.toString()}`)
throw err;
}
if (n_attempts > 5) {
debug (`Resolving to empty peer info`)
resolve (new PeerInfo(peerId))
// reject (err);
} else {
debug (`try_find_peer: attempt ${n_attempts} failed with ${nPeers()} nodes connected`)
// addPending (try_find_peer);
setTimeout (try_find_peer, 500)
}
}
}
}
try_find_peer ();
});
}
let _relay_id = null;
async function getPeerInfoWithRelay(id:any) {
let known_nodes = p2pconfig.known_nodes;
for (let ni of known_nodes) {
if (ni.nodeid == id) {
// found a known node!
let pi = new PeerInfo (PeerId.createFromB58String(id));
pi.multiaddrs.add (multiaddr(`${ni.ip}`))
debug(`node ${ni.nodeid} will be contacted directly via IP: ${ni.ip}`)
return pi
}
}
debug ("the node is not known; using relay information")
let pi:any = await getPeerInfo (id)
if (_relay_id) {
pi.multiaddrs.add( multiaddr(`/p2p/${_relay_id}/p2p-circuit/p2p/${id}`))
}
// for (let i = 0; i < p2pconfig.relays.length; i++ ) {
// pi.multiaddrs.add( multiaddr(`${p2pconfig.relays[i]}/p2p-circuit/p2p/${id}`))
// }
return pi
}*/
//AB: fix!

function dial(id) {
let i = 0;
let timeout = 2000;
return new Promise((resolve, reject) =>{
async function iterate() {
try {
/*const peerInfo = await getPeerInfoWithRelay(id);
debug ("find peer succeeded");
debug (`dialing will use the following addresses:`)
peerInfo.multiaddrs.forEach( m => {debug (m.toString() ) });
debug (">> -- end of address list -- << ")*/
let peerId : PeerId = peerIdFromString(id)
debug (`trying to dial ${peerId}, attempt number ${i}`)
const stream = await _node.dialProtocol(peerId, _PROTOCOL)
debug ("dial successful")
setupConnection (peerId, stream);
resolve ( stream );
} catch ( err ) {
processExpectedNetworkErrors (err, "dial");

// if the error is suppressed we move on to trying 10 times
// with exponential backoff
// 2020-02-10; AA: TODO: this code has a hardcoded constant
if (i <= 10) {
debug (`dial failed, we retry in ${timeout} seconds`)
debug(err)
setTimeout (iterate, timeout);
i ++ ;
timeout *= 2
} else {
debug (`we are giving up on dialing`)
reject (err);
}
}
async function iterate() {
try {
/*const peerInfo = await getPeerInfoWithRelay(id);
debug ("find peer succeeded");
debug (`dialing will use the following addresses:`)
peerInfo.multiaddrs.forEach( m => {debug (m.toString() ) });
debug (">> -- end of address list -- << ")*/
let peerId : PeerId = peerIdFromString(id)
debug (`trying to dial ${peerId}, attempt number ${i}`)
const stream = await _node.dialProtocol(peerId, _PROTOCOL)
debug ("dial successful")
setupConnection (peerId, stream);
resolve ( stream );
} catch ( err ) {
processExpectedNetworkErrors (err, "dial");

// if the error is suppressed we move on to trying 10 times
// with exponential backoff
// 2020-02-10; AA: TODO: this code has a hardcoded constant
if (i <= 10) {
debug (`dial failed, we retry in ${timeout} seconds`)
debug(err)
setTimeout (iterate, timeout);
i++;
timeout *= 2;
} else {
debug (`we are giving up on dialing`)
reject (err);
}
}
iterate ()
}
iterate ()
})
}

Expand Down Expand Up @@ -1162,7 +1261,6 @@ function processExpectedNetworkErrors (err, source="source unknown") {
}
}


export let p2p = {
startp2p: (arg1, arg2) => {
return startp2p(arg1, arg2)
Expand Down

0 comments on commit d028f2e

Please sign in to comment.