Skip to content
This repository has been archived by the owner on May 7, 2024. It is now read-only.

feat: add transport listener #31

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 41 additions & 34 deletions .aegir.js
Original file line number Diff line number Diff line change
@@ -1,51 +1,58 @@
import { spawn, exec } from "child_process";
import { existsSync } from "fs";
import { pipe } from 'it-pipe'
import { noise } from '@chainsafe/libp2p-noise'
import { createLibp2p } from 'libp2p'

/** @type {import('aegir/types').PartialOptions} */
export default {
test: {
async before() {
if (!existsSync("./go-libp2p-webtransport-server/main")) {
await new Promise((resolve, reject) => {
exec('go build -o main main.go',
{ cwd: "./go-libp2p-webtransport-server" },
(error, stdout, stderr) => {
if (error) {
reject(error)
console.error(`exec error: ${error}`);
return;
}
resolve()
});
})
}
async before () {
const { generateWebTransportCertificates } = await import('./dist/test/certificate.js')
const { webTransport } = await import('./dist/src/index.js')

const server = spawn('./main', [], { cwd: "./go-libp2p-webtransport-server", killSignal: "SIGINT" });
server.stderr.on('data', (data) => {
console.log(`stderr: ${data}`, typeof data);
const node = await createLibp2p({
addresses: {
listen: ['/ip4/0.0.0.0/udp/0/quic/webtransport']
},
transports: [webTransport({
certificates: await generateWebTransportCertificates([
{ shortName: 'C', value: 'DE' },
{ shortName: 'ST', value: 'Berlin' },
{ shortName: 'L', value: 'Berlin' },
{ shortName: 'O', value: '@libp2p/webtransport tests' },
{ shortName: 'CN', value: '127.0.0.1' }
], [{
// can be max 14 days according to the spec
days: 13
}, {
// can be max 14 days according to the spec
days: 13,
// start the second certificate after the first expires
start: new Date(Date.now() + (86400000 * 13))
}])
})],
connectionEncryption: [noise()]
})
const serverAddr = await (new Promise((resolve => {
server.stdout.on('data', (data) => {
console.log(`stdout: ${data}`, typeof data);
if (data.includes("addr=")) {
// Parse the addr out
resolve((data + "").match(/addr=([^\s]*)/)[1])
}
});
})))

await node.start()

await node.handle('echo', ({ stream }) => {
void pipe(stream, stream)
})

const multiaddrs = node.getMultiaddrs()

return {
server,
node,
env: {
serverAddr
serverAddr: multiaddrs[0].toString()
}
}
},
async after(_, { server }) {
server.kill("SIGINT")
async after (_, before) {
await before.node.stop()
}
},
build: {
bundlesizeMax: '18kB'
bundlesizeMax: '97kB'
}
}
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Loading this module through a script tag will make it's exports available as `Li

## Description

`libp2p-webtransport` is the WebTransport transport implementation compatible with libp2p.
`@libp2p/webtransport` is the WebTransport transport implementation compatible with libp2p.

## Usage

Expand Down
27 changes: 21 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -148,29 +148,44 @@
"lint": "aegir lint",
"dep-check": "aegir dep-check",
"build": "aegir build",
"test": "aegir test -t browser",
"test": "aegir test -t node -t browser",
"test:node": "aegir test -t node --cov",
"test:chrome": "aegir test -t browser --cov",
"test:chrome-webworker": "aegir test -t webworker",
"release": "aegir release",
"docs": "aegir docs"
},
"dependencies": {
"@chainsafe/libp2p-noise": "^11.0.0",
"@chainsafe/libp2p-noise": "^11.0.1",
"@fails-components/webtransport": "^0.1.2",
"@libp2p/interface-connection": "^3.0.2",
"@libp2p/interface-peer-id": "^2.0.0",
"@libp2p/interface-stream-muxer": "^3.0.0",
"@libp2p/interface-transport": "^2.0.0",
"@libp2p/interfaces": "^3.0.4",
"@libp2p/logger": "^2.0.2",
"@libp2p/peer-id": "^2.0.0",
"@libp2p/peer-id": "^2.0.3",
"@libp2p/utils": "^3.0.2",
"@multiformats/multiaddr": "^12.1.0",
"browser-readablestream-to-it": "^2.0.0",
"it-stream-types": "^1.0.4",
"multiformats": "^11.0.0",
"multiformats": "^11.0.2",
"node-forge": "^1.3.1",
"p-timeout": "^6.0.0",
"uint8arraylist": "^2.3.3"
},
"devDependencies": {
"@libp2p/interface-transport-compliance-tests": "^3.0.2",
"@libp2p/peer-id-factory": "^2.0.3",
"aegir": "^38.1.7",
"libp2p": "^0.43.2"
"iso-random-stream": "^2.0.2",
"it-pipe": "^2.0.4",
"libp2p": "^0.43.2",
"sinon": "^15.0.0"
},
"browser": {
"./dist/src/listener.js": "./dist/src/listener.browser.js"
"./dist/src/listener.js": "./dist/src/listener.browser.js",
"./dist/src/webtransport.js": "./dist/src/webtransport.browser.js",
"./dist/test/certificate.js": "./dist/test/certificate.browser.js"
}
}
127 changes: 127 additions & 0 deletions src/create-server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import { EventEmitter } from 'events'
import { logger } from '@libp2p/logger'
import { Http3Server, WebTransportSession } from '@fails-components/webtransport'
import pTimeout from 'p-timeout'

const log = logger('libp2p:webtransport:server')

export interface WebTransportServer extends EventEmitter {
listening: boolean
sessionTimeout: number

close: (callback?: () => void) => void
listen: () => void
address: () => { port: number, host: string, family: 'IPv4' | 'IPv6' } | null
}

class DefaultWebTransportServer extends EventEmitter implements WebTransportServer {
private readonly server: Http3Server
public listening: boolean
/**
* How long in ms to wait for an incoming session to be ready
*/
public sessionTimeout: number

constructor (init: any) {
super()

this.server = new Http3Server(init)
this.listening = false

this.sessionTimeout = 1000
}

close (callback?: () => void): void {
if (callback != null) {
this.addListener('close', callback)
}

this.server.stopServer()
this.server.closed
.then(() => {
this.listening = false
this.emit('close')
})
.catch((err) => {
this.emit('error', err)
})
}

listen (): void {
this.server.startServer()
this.server.ready
.then(() => {
this.listening = true
this.emit('listening')

this._processIncomingSessions().catch(err => {
this.emit('error', err)
})
})
.catch((err) => {
this.emit('error', err)
})
}

address (): { port: number, host: string, family: 'IPv4' | 'IPv6' } | null {
return this.server.address()
}

async _processIncomingSessions (): Promise<void> {
// FIXME: incompatible webtransport implementations
const paths = [
// Chrome
'/.well-known/libp2p-webtransport?type=noise',

// @fails-components/webtransport
'/.well-known/libp2p-webtransport'
]

await Promise.all(
paths.map(async path => {
const sessionStream = this.server.sessionStream(path)
const sessionReader = sessionStream.getReader()

while (true) {
const { done, value: session } = await sessionReader.read()

if (done) {
log('session reader finished')
break
}

void Promise.resolve()
.then(async () => {
const timeout = pTimeout(session.ready, {
milliseconds: this.sessionTimeout
})

try {
await timeout

this.emit('session', session)
} catch (err) {
log.error('error waiting for session to become ready', err)
} finally {
timeout.clear()
}
})
}
})
)
}
}

export interface SessionHandler {
(session: WebTransportSession): void
}

export function createServer (init: any, sessionHandler?: SessionHandler): WebTransportServer {
const server = new DefaultWebTransportServer(init)

if (sessionHandler != null) {
server.addListener('session', sessionHandler)
}

return server
}
Loading