diff --git a/relay/Makefile b/relay/Makefile index bed8798..a9b75d1 100644 --- a/relay/Makefile +++ b/relay/Makefile @@ -2,4 +2,5 @@ start-relay: ./troupe-p2p-relay --key=keys/relay.priv generate-relay-key: + mkdir -p keys node $(TROUPE)/rt/built/p2p/mkid.mjs --privkeyfile=keys/relay.priv --idfile=keys/relay.id --verbose \ No newline at end of file diff --git a/relay/relay.mjs b/relay/relay.mjs new file mode 100644 index 0000000..18f1d7b --- /dev/null +++ b/relay/relay.mjs @@ -0,0 +1,46 @@ +import { noise } from '@chainsafe/libp2p-noise' +import { yamux } from '@chainsafe/libp2p-yamux' +import { mplex } from '@libp2p/mplex' +import { webSockets } from '@libp2p/websockets' +import { createLibp2p } from 'libp2p' +import { circuitRelayServer } from 'libp2p/circuit-relay' +import { identifyService } from 'libp2p/identify' +import { createFromJSON } from '@libp2p/peer-id-factory' + +async function main () { + const id = await createFromJSON({id : "12D3KooWShh9qmeS1UEgwWpjAsrjsigu8UGh8DRKyx1UG6HeHzjf", + privKey : "CAESQEQ7HBed1HEMpRHdhDmsJOlzHsVNBEWVc7DjEzuQtElv+uET7jQtZlGNKpltf2w4P7UqMdSYm4cYAGzjHcGcSj4="}); + const node = await createLibp2p({ + peerId : id, + addresses: { + listen: ['/ip4/0.0.0.0/tcp/5555/ws'] + // TODO check "What is next?" section + // announce: ['/dns4/auto-relay.libp2p.io/tcp/443/wss/p2p/QmWDn2LY8nannvSWJzruUYoLZ4vV83vfCBwd8DipvdgQc3'] + }, + transports: [ + webSockets() + ], + connectionEncryption: [ + noise() + ], + streamMuxers: [ + yamux(), + mplex() + ], + services: { + identify: identifyService(), + relay: circuitRelayServer() + } + }) + + await node.handle("/trouperelay/keepalive", async ({ connection, stream }) => { + console.log(`Relay handling protocol, id: ${connection.remotePeer}`) + //setupConnection(connection.remotePeer, stream) + }) + + console.log(`Node started with id ${node.peerId.toString()}`) + console.log('Listening on:') + node.getMultiaddrs().forEach((ma) => console.log(ma.toString())) +} + +main() \ No newline at end of file diff --git a/rt/src/p2p/p2p.mts b/rt/src/p2p/p2p.mts index 3a689b2..93c95b5 100644 --- a/rt/src/p2p/p2p.mts +++ b/rt/src/p2p/p2p.mts @@ -58,7 +58,7 @@ the libp2p). // LOGGING AND DEBUGGING -import yargs from 'yargs' +/*import yargs from 'yargs' let logLevel = yargs.argv.debugp2p? 'debug':'info' let __port = yargs.argv.port || 0 @@ -76,7 +76,7 @@ let logger; const info = x => logger.info(x) const debug = x => logger.debug(x) -const error = x => logger.error(x); +const error = x => logger.error(x);*/ /* @@ -100,7 +100,7 @@ const p2pconfig = require('./p2pconfig.js') // const Pushable = require('pull-pushable') -import {v4 as uuidv4} from 'uuid' +/*import {v4 as uuidv4} from 'uuid' const MessageType = { @@ -110,7 +110,7 @@ const MessageType = { TEST: 3, WHEREIS: 4, WHEREISOK: 5 -} +}*/ /* @@ -661,6 +661,37 @@ function setupBlockingHealthChecker (period) { } */ +// LOGGING AND DEBUGGING + +import yargs from 'yargs' +let logLevel = yargs.argv.debugp2p? 'debug':'info' +let __port = yargs.argv.port || 0 + +const _PROTOCOL = "/troupe/1.0.0" + +let logger; +(async() => { + let { mkLogger } = await import ('../logger.mjs'); + logger = mkLogger ('p2p', logLevel) +})() + +// const logger = require('../logger.js').mkLogger('p2p',logLevel); + +const info = x => logger.info(x) +const debug = x => logger.debug(x) +const error = x => logger.error(x); + +import {v4 as uuidv4} from 'uuid' + +const MessageType = { + SPAWN: 0, + SPAWNOK: 1, + SEND: 2, + TEST: 3, + WHEREIS: 4, + WHEREISOK: 5 +} + import { PeerId } from '@libp2p/interface-peer-id' import { tcp } from '@libp2p/tcp' @@ -680,8 +711,18 @@ import map from 'it-map' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { pushable } from 'it-pushable' +import p2pconfig from './p2pconfig.js'; //AB: ecmascript? +import { multiaddr } from '@multiformats/multiaddr' +import { identifyService } from 'libp2p/identify' +import { circuitRelayTransport } from 'libp2p/circuit-relay' + +//import { secio } from 'libp2p-secio'; //AB: deprecated - find out why it's needed +import pkg from 'libp2p-secio' +import { nodeTrustLevel } from '../TrustManager.mjs'; +const {secio} = pkg; -let bootstrappers = [ + +let bootstrappers = [ //AB: bootstrap known_nodes from config?? Make findNode easier '/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ', '/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN', '/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb', @@ -696,23 +737,29 @@ async function createLibp2p (_options) { const defaults = { transports: [ tcp(), - webSockets() + webSockets(), + circuitRelayTransport({ + discoverRelays: 2 //AB: what to set this to? + }) ], streamMuxers: [ yamux(), mplex() ], connectionEncryption: [ - noise() + noise(), ], peerDiscovery: [ /*bootstrap({ list: bootstrappers }),*/ - mdns({ + /*mdns({ interval: 20e3 - }) + })*/ ], + services: { + identify: identifyService() + } /*dht: kadDHT({ kBucketSize: 20, clientMode: true // Whether to run the WAN DHT in client or server mode (default: client mode) @@ -743,7 +790,7 @@ async function obtainPeerId(nodeId) : Promise { } } /*let _peerInfo = new PeerInfo(id) - _peerInfo.multiaddrs.add(`/ip4/0.0.0.0/tcp/${__port}`); + _peerInfo.multiaddrs.add(`/ip4/0.0.0.0/tcp/${__port}`); //AB: do this??? already in adresses, possibly return _peerInfo;*/ return id; } @@ -789,48 +836,58 @@ async function startp2p(nodeId, rt): Promise { let id : PeerId = await obtainPeerId(nodeId); - let nodeListener : Libp2p = await createLibp2p({ - peerId: id, - addresses: { - listen: ['/ip4/0.0.0.0/tcp/0'] - }, - connectionManager : { - maxConnections: Infinity, - minConnections: 0 - } - }) - + try { + let nodeListener : Libp2p = await createLibp2p({ + peerId: id, + addresses: { + listen: ['/ip4/0.0.0.0/tcp/0'] + }, + connectionManager : { + maxConnections: Infinity, + minConnections: 0 + } + }) + + _node = nodeListener + _rt = rt + } catch (err) { + debug(`Something wrong while creating Libp2p node: ${err}`) + throw err + } + setupBlockingHealthChecker(_HEALTHCHECKPERIOD); - - _node = nodeListener - _rt = rt - - await nodeListener.handle(_PROTOCOL, async ({ connection, stream }) => { - console.log(`Handling protocol, id: ${connection.remotePeer}`) + + await _node.handle(_PROTOCOL, async ({ connection, stream }) => { + debug(`Handling protocol, id: ${connection.remotePeer}`) setupConnection(connection.remotePeer, stream) }) - nodeListener.addEventListener('peer:discovery', async (evt) => { + _node.addEventListener('peer:discovery', async (evt) => { const peerInfo = evt.detail - console.log('Discovered:', peerInfo.id.toString()) // add to PeerStore??? - - /*setTimeout(() => { - console.log('Sending whereIs') - let data = "Hello" - whereisp2p(peerInfo.id, peerInfo.id.toString()) - }, 3000)*/ - - //await dial(peerInfo.id.toString()) - //const stream = await nodeListener.dialProtocol(peerInfo.id, _PROTOCOL) - //setupConnection(peerInfo.id, stream) - //stdinToStream(stream) + debug(`Discovered: ${peerInfo.id.toString()}`) // add to PeerStore??? }) - nodeListener.addEventListener('peer:connect', (evt) => { + _node.addEventListener('peer:connect', (evt) => { const peerId = evt.detail - console.log('Connection established to:', peerId.toString()) + debug(`Connection established to: ${peerId.toString()}`) }) + _node.addEventListener('peer:disconnect', (evt) => { + let id = evt.detail + debug (`-- disconnect: ${id}`) + if (_relayTable[id.toString()]) { + debug (`deleting relay table entry`) + delete _relayTable[id.toString()] + } + }) + + debug("p2p node started") + + /*for (let relay_addr of p2pconfig.relays ) { + keepAliveRelay(relay_addr); + }*/ + keepAliveRelay("/ip4/127.0.0.1/tcp/5555/ws/p2p/12D3KooWShh9qmeS1UEgwWpjAsrjsigu8UGh8DRKyx1UG6HeHzjf") + debug(`id is ${id.toString()}`) return id } @@ -853,7 +910,7 @@ async function push_wrap(id: any, data: any) { let needsToDial = true; let p = null if(connections.length >= 1) { - let connection = connections[0] + let connection = connections[0] //AB: check all connections let streams = connection.streams if(streams.length >= 1) { let stream = streams[0] @@ -929,24 +986,35 @@ function setupConnection(peerId : PeerId, stream) { debug(`Connection set up with ${id}`); } -/*function nPeers( ) { +function nPeers( ) { return _node.getPeers().length } -async function getPeerInfo(id:string) { +async function getPeerInfo(id:string) : Promise{ const peerId = peerIdFromString(id); return new Promise ((resolve, reject) => { let n_attempts = 0; async function try_find_peer () { - if (_node.peerStore.has(peerId)) { + if (await _node.peerStore.has(peerId)) { debug ("peer info is in the store") - resolve (_node.peerStore.get(peerId)) + try { + let foundPeer = await _node.peerStore.get(peerId) + resolve (foundPeer.id) + } catch (err) { + debug(`Error in getPeerInfo: ${err}`) + throw err + } } else { try { debug (`calling peerRouting.findPeer ${peerId}`) - const peerInfo = await _node.peerRouting.findPeer (peerId, {timeout:1000}); + const peerInfo = await _node.peerRouting.findPeer (peerId, {signal : AbortSignal.timeout(1000)}); debug ("findPeer returned") - resolve (peerInfo); + await _node.peerStore.patch(peerInfo.id, { + multiaddrs: + peerInfo.multiaddrs + }) //AB: necessary?? + debug("added multiaddr to store") + resolve (peerInfo.id); } catch (err) { debug (`try_find_peer exception`) if (nPeers() > 0 ) { @@ -959,12 +1027,12 @@ async function getPeerInfo(id:string) { } } else { debug (`Find peer error: ${err.toString()}`) - throw err; + //throw err; } if (n_attempts > 5) { debug (`Resolving to empty peer info`) - resolve (new PeerInfo(peerId)) + resolve (peerId) // reject (err); } else { debug (`try_find_peer: attempt ${n_attempts} failed with ${nPeers()} nodes connected`) @@ -978,32 +1046,106 @@ async function getPeerInfo(id:string) { }); } +export interface IHash { + [details: string] : any; +} //AB: use in-built functionality instead + let _relay_id = null; +let _KEEPALIVE = 5000 //AB: can tag peers with "Keep_alive" in the peerstore +let _keepAliveCounter = 0; +let _relayTable:IHash = {} + +async function dialRelay (relay_addr) { + debug (`dialing relay ${relay_addr}`) + let id = relay_addr.split('/').pop(); + const relayId = peerIdFromString(id) + await _node.peerStore.patch(relayId, { + multiaddrs: [ + multiaddr(`${relay_addr}`) + ] + }) + await _node.peerStore.merge(relayId, { + tags : { + 'keep-alive' : {} + } + }); + debug (`Added relay address`) + const conn = await _node.dial(relayId); + debug (`Got relay connection`) + const stream = await _node.dialProtocol (relayId, "/trouperelay/keepalive") + debug (`Got relay stream`) + const peerId = conn.remotePeer + _relay_id = peerId.toString() + debug (`~~ relay dialed, keep alive counter is ${_keepAliveCounter++}`) + const p = pushable({ objectMode : true }) + + pipe (stream.source, + async (source: any) => { + let ss = "" + for await (const msg of source ) { + debug (`~~ relay says:${msg.toString().trim()}`) + } + }) + pipe (p, + (stream.sink as any)); + + _relayTable[id] = p; + return p; +} + +async function keepAliveRelay (relay_addr:string) { + let id = relay_addr.split('/').pop(); + debug (`relay id is ${id}`) + let timeout = _KEEPALIVE; + async function f () { + try { + let p = _relayTable[id] ? _relayTable[id] : await dialRelay (relay_addr) + p.push (`keep alive request ${_keepAliveCounter++}\n`) + timeout = _KEEPALIVE + } catch (err ) { + timeout = timeout < 600e3 ? timeout * 2 : timeout // exponential backoff with 10 min limit + processExpectedNetworkErrors(err, "relay") + debug (`~~ error reaching the relay server; we will retry again in ${timeout/1000} seconds`) + } + setTimeout(f,timeout) + } + f () + } + async function getPeerInfoWithRelay(id:any) { let known_nodes = p2pconfig.known_nodes; for (let ni of known_nodes) { if (ni.nodeid == id) { // found a known node! - let pi = new PeerInfo (PeerId.createFromB58String(id)); - pi.multiaddrs.add (multiaddr(`${ni.ip}`)) + let pi = peerIdFromString(id); + await _node.peerStore.patch(pi, { + multiaddrs: [ + multiaddr(`${ni.ip}`) + ] + }) + //pi.multiaddrs.add (multiaddr(`${ni.ip}`)) debug(`node ${ni.nodeid} will be contacted directly via IP: ${ni.ip}`) return pi } } debug ("the node is not known; using relay information") - let pi:any = await getPeerInfo (id) + let pi = await getPeerInfo(id) if (_relay_id) { - pi.multiaddrs.add( multiaddr(`/p2p/${_relay_id}/p2p-circuit/p2p/${id}`)) + //pi.multiaddrs.add( multiaddr(`/p2p/${_relay_id}/p2p-circuit/p2p/${id}`)) + await _node.peerStore.patch(pi, { + multiaddrs: [ + multiaddr(`/p2p/${_relay_id}/p2p-circuit/p2p/${id}`) + ] + }) } // for (let i = 0; i < p2pconfig.relays.length; i++ ) { // pi.multiaddrs.add( multiaddr(`${p2pconfig.relays[i]}/p2p-circuit/p2p/${id}`)) // } return pi -}*/ -//AB: fix! +} function dial(id) { let i = 0; @@ -1016,7 +1158,7 @@ function dial(id) { debug (`dialing will use the following addresses:`) peerInfo.multiaddrs.forEach( m => {debug (m.toString() ) }); debug (">> -- end of address list -- << ")*/ - let peerId : PeerId = peerIdFromString(id) + let peerId : PeerId = await getPeerInfoWithRelay(id) debug (`trying to dial ${peerId}, attempt number ${i}`) const stream = await _node.dialProtocol(peerId, _PROTOCOL) debug ("dial successful") @@ -1153,7 +1295,7 @@ async function whereisp2p(id, str) { function f () { push_wrap(id, { - messageType: MessageType.WHEREIS, + messageType : MessageType.WHEREIS, whereisNonce : whereisNonce, message : str });