diff --git a/packages/integration-tests/test/fixtures/base-options.browser.ts b/packages/integration-tests/test/fixtures/base-options.browser.ts index bfa7fd4488..66bbea33c8 100644 --- a/packages/integration-tests/test/fixtures/base-options.browser.ts +++ b/packages/integration-tests/test/fixtures/base-options.browser.ts @@ -1,7 +1,6 @@ import { yamux } from '@chainsafe/libp2p-yamux' import { circuitRelayTransport } from '@libp2p/circuit-relay-v2' import { identify } from '@libp2p/identify' -import { mockConnectionGater } from '@libp2p/interface-compliance-tests/mocks' import { mplex } from '@libp2p/mplex' import { plaintext } from '@libp2p/plaintext' import { webRTC } from '@libp2p/webrtc' @@ -32,7 +31,9 @@ export function createBaseOptions false + }, services: { identify: identify() } diff --git a/packages/interface-compliance-tests/package.json b/packages/interface-compliance-tests/package.json index bc0d4af4ad..6dcbbfc550 100644 --- a/packages/interface-compliance-tests/package.json +++ b/packages/interface-compliance-tests/package.json @@ -136,16 +136,15 @@ "p-defer": "^4.0.1", "p-event": "^6.0.1", "p-limit": "^6.0.0", + "p-retry": "^6.2.0", "p-wait-for": "^5.0.2", "protons-runtime": "^5.4.0", "race-signal": "^1.1.0", "sinon": "^18.0.0", - "tdigest": "^0.1.2", "uint8arraylist": "^2.4.8", "uint8arrays": "^5.1.0" }, "devDependencies": { - "@types/tdigest": "^0.1.4", "protons": "^7.5.0" } } diff --git a/packages/interface-compliance-tests/src/mocks/connection-gater.ts b/packages/interface-compliance-tests/src/mocks/connection-gater.ts deleted file mode 100644 index eaaf62038b..0000000000 --- a/packages/interface-compliance-tests/src/mocks/connection-gater.ts +++ /dev/null @@ -1,18 +0,0 @@ -import type { ConnectionGater } from '@libp2p/interface' - -export function mockConnectionGater (): ConnectionGater { - return { - denyDialPeer: async () => Promise.resolve(false), - denyDialMultiaddr: async () => Promise.resolve(false), - denyInboundConnection: async () => Promise.resolve(false), - denyOutboundConnection: async () => Promise.resolve(false), - denyInboundEncryptedConnection: async () => Promise.resolve(false), - denyOutboundEncryptedConnection: async () => Promise.resolve(false), - denyInboundUpgradedConnection: async () => Promise.resolve(false), - denyOutboundUpgradedConnection: async () => Promise.resolve(false), - denyInboundRelayReservation: async () => Promise.resolve(false), - denyOutboundRelayedConnection: async () => Promise.resolve(false), - denyInboundRelayedConnection: async () => Promise.resolve(false), - filterMultiaddrForPeer: async () => Promise.resolve(true) - } -} diff --git a/packages/interface-compliance-tests/src/mocks/index.ts b/packages/interface-compliance-tests/src/mocks/index.ts index 1c94ae79da..bdf8c1a5a3 100644 --- a/packages/interface-compliance-tests/src/mocks/index.ts +++ b/packages/interface-compliance-tests/src/mocks/index.ts @@ -1,4 +1,3 @@ -export { mockConnectionGater } from './connection-gater.js' export { mockConnectionManager, mockNetwork } from './connection-manager.js' export { mockConnection, mockStream, streamPair, connectionPair } from './connection.js' export { mockMultiaddrConnection, mockMultiaddrConnPair } from './multiaddr-connection.js' @@ -6,7 +5,6 @@ export { mockMuxer } from './muxer.js' export { mockRegistrar } from './registrar.js' export { mockUpgrader } from './upgrader.js' export { mockDuplex } from './duplex.js' -export { mockMetrics } from './metrics.js' export type { MockUpgraderInit } from './upgrader.js' export type { MockNetworkComponents, MockConnectionManagerComponents, MockNetwork } from './connection-manager.js' export type { MockConnectionOptions, StreamInit, StreamPairInit } from './connection.js' diff --git a/packages/interface-compliance-tests/src/mocks/metrics.ts b/packages/interface-compliance-tests/src/mocks/metrics.ts deleted file mode 100644 index 8bcd27c366..0000000000 --- a/packages/interface-compliance-tests/src/mocks/metrics.ts +++ /dev/null @@ -1,385 +0,0 @@ -import { TDigest } from 'tdigest' -import type { MultiaddrConnection, Stream, Connection, Metric, MetricGroup, StopTimer, Metrics, CalculatedMetricOptions, MetricOptions, Histogram, HistogramOptions, HistogramGroup, Summary, SummaryOptions, SummaryGroup, CalculatedHistogramOptions, CalculatedSummaryOptions } from '@libp2p/interface' - -class DefaultMetric implements Metric { - public value: number = 0 - - update (value: number): void { - this.value = value - } - - increment (value: number = 1): void { - this.value += value - } - - decrement (value: number = 1): void { - this.value -= value - } - - reset (): void { - this.value = 0 - } - - timer (): StopTimer { - const start = Date.now() - - return () => { - this.value = Date.now() - start - } - } -} - -class DefaultGroupMetric implements MetricGroup { - public values: Record = {} - - update (values: Record): void { - Object.entries(values).forEach(([key, value]) => { - this.values[key] = value - }) - } - - increment (values: Record): void { - Object.entries(values).forEach(([key, value]) => { - this.values[key] = this.values[key] ?? 0 - const inc = typeof value === 'number' ? value : 1 - - this.values[key] += Number(inc) - }) - } - - decrement (values: Record): void { - Object.entries(values).forEach(([key, value]) => { - this.values[key] = this.values[key] ?? 0 - const dec = typeof value === 'number' ? value : 1 - - this.values[key] -= Number(dec) - }) - } - - reset (): void { - this.values = {} - } - - timer (key: string): StopTimer { - const start = Date.now() - - return () => { - this.values[key] = Date.now() - start - } - } -} - -class DefaultHistogram implements Histogram { - public bucketValues = new Map() - public countValue: number = 0 - public sumValue: number = 0 - - constructor (opts: HistogramOptions) { - const buckets = [ - ...(opts.buckets ?? [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]), - Infinity - ] - for (const bucket of buckets) { - this.bucketValues.set(bucket, 0) - } - } - - observe (value: number): void { - this.countValue++ - this.sumValue += value - - for (const [bucket, count] of this.bucketValues.entries()) { - if (value <= bucket) { - this.bucketValues.set(bucket, count + 1) - } - } - } - - reset (): void { - this.countValue = 0 - this.sumValue = 0 - for (const bucket of this.bucketValues.keys()) { - this.bucketValues.set(bucket, 0) - } - } - - timer (): StopTimer { - const start = Date.now() - - return () => { - this.observe(Date.now() - start) - } - } -} - -class DefaultHistogramGroup implements HistogramGroup { - public histograms: Record = {} - - constructor (opts: HistogramOptions) { - this.histograms = {} - } - - observe (values: Partial>): void { - for (const [key, value] of Object.entries(values) as Array<[string, number]>) { - if (this.histograms[key] === undefined) { - this.histograms[key] = new DefaultHistogram({}) - } - - this.histograms[key].observe(value) - } - } - - reset (): void { - for (const histogram of Object.values(this.histograms)) { - histogram.reset() - } - } - - timer (key: string): StopTimer { - const start = Date.now() - - return () => { - this.observe({ [key]: Date.now() - start }) - } - } -} - -class DefaultSummary implements Summary { - public sumValue: number = 0 - public countValue: number = 0 - public percentiles: number[] - public tdigest = new TDigest(0.01) - private readonly compressCount: number - - constructor (opts: SummaryOptions) { - this.percentiles = opts.percentiles ?? [0.01, 0.05, 0.5, 0.9, 0.95, 0.99, 0.999] - this.compressCount = opts.compressCount ?? 1000 - } - - observe (value: number): void { - this.sumValue += value - this.countValue++ - - this.tdigest.push(value) - if (this.tdigest.size() > this.compressCount) { - this.tdigest.compress() - } - } - - reset (): void { - this.sumValue = 0 - this.countValue = 0 - - this.tdigest.reset() - } - - timer (): StopTimer { - const start = Date.now() - - return () => { - this.observe(Date.now() - start) - } - } -} - -class DefaultSummaryGroup implements SummaryGroup { - public summaries: Record = {} - private readonly opts: SummaryOptions - - constructor (opts: SummaryOptions) { - this.summaries = {} - this.opts = opts - } - - observe (values: Record): void { - for (const [key, value] of Object.entries(values)) { - if (this.summaries[key] === undefined) { - this.summaries[key] = new DefaultSummary(this.opts) - } - - this.summaries[key].observe(value) - } - } - - reset (): void { - for (const summary of Object.values(this.summaries)) { - summary.reset() - } - } - - timer (key: string): StopTimer { - const start = Date.now() - - return () => { - this.observe({ [key]: Date.now() - start }) - } - } -} - -class MockMetrics implements Metrics { - public metrics = new Map() - - trackMultiaddrConnection (maConn: MultiaddrConnection): void { - - } - - trackProtocolStream (stream: Stream, connection: Connection): void { - - } - - registerMetric (name: string, opts: CalculatedMetricOptions): void - registerMetric (name: string, opts?: MetricOptions): Metric - registerMetric (name: string, opts: any): any { - if (name == null || name.trim() === '') { - throw new Error('Metric name is required') - } - - if (opts?.calculate != null) { - // calculated metric - this.metrics.set(name, opts.calculate) - return - } - - const metric = new DefaultMetric() - this.metrics.set(name, metric) - - return metric - } - - registerCounter (name: string, opts: CalculatedMetricOptions): void - registerCounter (name: string, opts?: MetricOptions): Metric - registerCounter (name: string, opts: any): any { - if (name == null || name.trim() === '') { - throw new Error('Metric name is required') - } - - if (opts?.calculate != null) { - // calculated metric - this.metrics.set(name, opts.calculate) - return - } - - const metric = new DefaultMetric() - this.metrics.set(name, metric) - - return metric - } - - registerMetricGroup (name: string, opts: CalculatedMetricOptions>): void - registerMetricGroup (name: string, opts?: MetricOptions): MetricGroup - registerMetricGroup (name: string, opts: any): any { - if (name == null || name.trim() === '') { - throw new Error('Metric name is required') - } - - if (opts?.calculate != null) { - // calculated metric - this.metrics.set(name, opts.calculate) - return - } - - const metric = new DefaultGroupMetric() - this.metrics.set(name, metric) - - return metric - } - - registerCounterGroup (name: string, opts: CalculatedMetricOptions>): void - registerCounterGroup (name: string, opts?: MetricOptions): MetricGroup - registerCounterGroup (name: string, opts: any): any { - if (name == null || name.trim() === '') { - throw new Error('Metric name is required') - } - - if (opts?.calculate != null) { - // calculated metric - this.metrics.set(name, opts.calculate) - return - } - - const metric = new DefaultGroupMetric() - this.metrics.set(name, metric) - - return metric - } - - registerHistogram (name: string, opts: CalculatedHistogramOptions): void - registerHistogram (name: string, opts?: HistogramOptions): Histogram - registerHistogram (name: string, opts: any = {}): any { - if (name == null || name.trim() === '') { - throw new Error('Metric name is required') - } - - if (opts?.calculate != null) { - // calculated metric - this.metrics.set(name, opts.calculate) - return - } - - const metric = new DefaultHistogram(opts) - this.metrics.set(name, metric) - - return metric - } - - registerHistogramGroup (name: string, opts: CalculatedHistogramOptions>): void - registerHistogramGroup (name: string, opts?: HistogramOptions): HistogramGroup - registerHistogramGroup (name: string, opts: any = {}): any { - if (name == null || name.trim() === '') { - throw new Error('Metric name is required') - } - - if (opts?.calculate != null) { - // calculated metric - this.metrics.set(name, opts.calculate) - return - } - - const metric = new DefaultHistogramGroup(opts) - this.metrics.set(name, metric) - - return metric - } - - registerSummary (name: string, opts: CalculatedSummaryOptions): void - registerSummary (name: string, opts?: SummaryOptions): Summary - registerSummary (name: string, opts: any = {}): any { - if (name == null || name.trim() === '') { - throw new Error('Metric name is required') - } - - if (opts?.calculate != null) { - // calculated metric - this.metrics.set(name, opts.calculate) - return - } - - const metric = new DefaultSummary(opts) - this.metrics.set(name, metric) - - return metric - } - - registerSummaryGroup (name: string, opts: CalculatedSummaryOptions>): void - registerSummaryGroup (name: string, opts?: SummaryOptions): SummaryGroup - registerSummaryGroup (name: string, opts: any = {}): any { - if (name == null || name.trim() === '') { - throw new Error('Metric name is required') - } - - if (opts?.calculate != null) { - // calculated metric - this.metrics.set(name, opts.calculate) - return - } - - const metric = new DefaultSummaryGroup(opts) - this.metrics.set(name, metric) - - return metric - } -} - -export function mockMetrics (): () => Metrics { - return () => new MockMetrics() -} diff --git a/packages/interface-compliance-tests/src/mocks/peer-discovery.ts b/packages/interface-compliance-tests/src/mocks/peer-discovery.ts deleted file mode 100644 index 83fb7c5f45..0000000000 --- a/packages/interface-compliance-tests/src/mocks/peer-discovery.ts +++ /dev/null @@ -1,59 +0,0 @@ -import { generateKeyPair } from '@libp2p/crypto/keys' -import { TypedEventEmitter, peerDiscoverySymbol } from '@libp2p/interface' -import { peerIdFromPrivateKey } from '@libp2p/peer-id' -import { multiaddr } from '@multiformats/multiaddr' -import type { PeerDiscovery, PeerDiscoveryEvents, PeerInfo } from '@libp2p/interface' - -interface MockDiscoveryInit { - discoveryDelay?: number -} - -/** - * Emits 'peer' events on discovery. - */ -export class MockDiscovery extends TypedEventEmitter implements PeerDiscovery { - public readonly options: MockDiscoveryInit - private _isRunning: boolean - private _timer: any - - constructor (init = {}) { - super() - - this.options = init - this._isRunning = false - } - - readonly [peerDiscoverySymbol] = this - - start (): void { - this._isRunning = true - this._discoverPeer() - } - - stop (): void { - clearTimeout(this._timer) - this._isRunning = false - } - - isStarted (): boolean { - return this._isRunning - } - - _discoverPeer (): void { - if (!this._isRunning) return - - generateKeyPair('Ed25519') - .then(key => { - const peerId = peerIdFromPrivateKey(key) - this._timer = setTimeout(() => { - this.safeDispatchEvent('peer', { - detail: { - id: peerId, - multiaddrs: [multiaddr('/ip4/127.0.0.1/tcp/8000')] - } - }) - }, this.options.discoveryDelay ?? 1000) - }) - .catch(() => {}) - } -} diff --git a/packages/interface-compliance-tests/src/transport/index.ts b/packages/interface-compliance-tests/src/transport/index.ts index eca91c1627..9b12d7bfa9 100644 --- a/packages/interface-compliance-tests/src/transport/index.ts +++ b/packages/interface-compliance-tests/src/transport/index.ts @@ -1,15 +1,18 @@ import { stop } from '@libp2p/interface' import { expect } from 'aegir/chai' import delay from 'delay' +import drain from 'it-drain' +import { pushable } from 'it-pushable' import pDefer from 'p-defer' import { pEvent } from 'p-event' +import pRetry from 'p-retry' import pWaitFor from 'p-wait-for' import { raceSignal } from 'race-signal' import { isValidTick } from '../is-valid-tick.js' import { createPeer, getTransportManager, getUpgrader, slowNetwork } from './utils.js' import type { TestSetup } from '../index.js' import type { Echo } from '@libp2p/echo' -import type { Connection, Libp2p } from '@libp2p/interface' +import type { Connection, Libp2p, Stream, StreamHandler } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' import type { MultiaddrMatcher } from '@multiformats/multiaddr-matcher' import type { Libp2pInit } from 'libp2p' @@ -259,6 +262,249 @@ export default (common: TestSetup): void => { expect(output).to.equalBytes(input) } }) + + it('can close a stream for reading but send a large amount of data', async function () { + const timeout = 120_000 + this.timeout(timeout); + ({ dialer, listener, dialAddrs } = await getSetup(common)) + + if (listener == null) { + return this.skip() + } + + const protocol = '/send-data/1.0.0' + const chunkSize = 1024 + const bytes = chunkSize * 1024 * 10 + const deferred = pDefer() + + await listener.handle(protocol, ({ stream }) => { + Promise.resolve().then(async () => { + let read = 0 + + for await (const buf of stream.source) { + read += buf.byteLength + + if (read === bytes) { + deferred.resolve() + break + } + } + }) + .catch(err => { + deferred.reject(err) + stream.abort(err) + }) + }) + + const stream = await dialer.dialProtocol(dialAddrs[0], protocol) + + await stream.closeRead() + + await stream.sink((async function * () { + for (let i = 0; i < bytes; i += chunkSize) { + yield new Uint8Array(chunkSize) + } + })()) + + await stream.close() + + await deferred.promise + }) + + it('can close a stream for writing but receive a large amount of data', async function () { + const timeout = 120_000 + this.timeout(timeout); + ({ dialer, listener, dialAddrs } = await getSetup(common)) + + if (listener == null) { + return this.skip() + } + + const protocol = '/receive-data/1.0.0' + const chunkSize = 1024 + const bytes = chunkSize * 1024 * 10 + const deferred = pDefer() + + await listener.handle(protocol, ({ stream }) => { + Promise.resolve().then(async () => { + await stream.sink((async function * () { + for (let i = 0; i < bytes; i += chunkSize) { + yield new Uint8Array(chunkSize) + } + })()) + + await stream.close() + }) + .catch(err => { + deferred.reject(err) + stream.abort(err) + }) + }) + + const stream = await dialer.dialProtocol(dialAddrs[0], protocol) + + await stream.closeWrite() + + let read = 0 + + for await (const buf of stream.source) { + read += buf.byteLength + } + + expect(read).to.equal(bytes) + }) + + it('can close local stream for writing and reading while a remote stream is writing', async function () { + ({ dialer, listener, dialAddrs } = await getSetup(common)) + + if (listener == null) { + return this.skip() + } + + /** + * NodeA NodeB + * | <--- STOP_SENDING | + * | FIN ---> | + * | <--- FIN | + * | FIN_ACK ---> | + * | <--- FIN_ACK | + */ + + const getRemoteStream = pDefer() + const protocol = '/close-local-while-remote-writes/1.0.0' + + const streamHandler: StreamHandler = ({ stream }) => { + void Promise.resolve().then(async () => { + getRemoteStream.resolve(stream) + }) + } + + await listener.handle(protocol, (info) => { + streamHandler(info) + }, { + runOnLimitedConnection: true + }) + + const connection = await dialer.dial(dialAddrs[0]) + + // open a stream on the echo protocol + const stream = await connection.newStream(protocol, { + runOnLimitedConnection: true + }) + + // close the write end immediately + const p = stream.closeWrite() + + const remoteStream = await getRemoteStream.promise + // close the readable end of the remote stream + await remoteStream.closeRead() + + // keep the remote write end open, this should delay the FIN_ACK reply to the local stream + const remoteInputStream = pushable() + void remoteStream.sink(remoteInputStream) + + // wait for remote to receive local close-write + await pRetry(() => { + if (remoteStream.readStatus !== 'closed') { + throw new Error('Remote stream read status ' + remoteStream.readStatus) + } + }, { + minTimeout: 100 + }) + + // remote closes write + remoteInputStream.end() + + // wait to receive FIN_ACK + await p + + // wait for remote to notice closure + await pRetry(() => { + if (remoteStream.status !== 'closed') { + throw new Error('Remote stream not closed') + } + }) + + assertStreamClosed(stream) + assertStreamClosed(remoteStream) + }) + + it('can close local stream for writing and reading while a remote stream is writing using source/sink', async function () { + ({ dialer, listener, dialAddrs } = await getSetup(common)) + + if (listener == null) { + return this.skip() + } + + /** + * NodeA NodeB + * | FIN ---> | + * | <--- FIN | + * | FIN_ACK ---> | + * | <--- FIN_ACK | + */ + + const getRemoteStream = pDefer() + const protocol = '/close-local-while-remote-reads/1.0.0' + + const streamHandler: StreamHandler = ({ stream }) => { + void Promise.resolve().then(async () => { + getRemoteStream.resolve(stream) + }) + } + + await listener.handle(protocol, (info) => { + streamHandler(info) + }, { + runOnLimitedConnection: true + }) + + const connection = await dialer.dial(dialAddrs[0]) + + // open a stream on the echo protocol + const stream = await connection.newStream(protocol, { + runOnLimitedConnection: true + }) + + // keep the remote write end open, this should delay the FIN_ACK reply to the local stream + const p = stream.sink([]) + + const remoteStream = await getRemoteStream.promise + // close the readable end of the remote stream + await remoteStream.closeRead() + // readable end should finish + await drain(remoteStream.source) + + // wait for remote to receive local close-write + await pRetry(() => { + if (remoteStream.readStatus !== 'closed') { + throw new Error('Remote stream read status ' + remoteStream.readStatus) + } + }, { + minTimeout: 100 + }) + + // remote closes write + await remoteStream.sink([]) + + // wait to receive FIN_ACK + await p + + // close read end of stream + await stream.closeRead() + // readable end should finish + await drain(stream.source) + + // wait for remote to notice closure + await pRetry(() => { + if (remoteStream.status !== 'closed') { + throw new Error('Remote stream not closed') + } + }) + + assertStreamClosed(stream) + assertStreamClosed(remoteStream) + }) }) describe('events', () => { @@ -323,3 +569,13 @@ export default (common: TestSetup): void => { }) }) } + +function assertStreamClosed (stream: Stream): void { + expect(stream.status).to.equal('closed') + expect(stream.readStatus).to.equal('closed') + expect(stream.writeStatus).to.equal('closed') + + expect(stream.timeline.close).to.be.a('number') + expect(stream.timeline.closeRead).to.be.a('number') + expect(stream.timeline.closeWrite).to.be.a('number') +} diff --git a/packages/interface-compliance-tests/test/mocks/peer-discovery.spec.ts b/packages/interface-compliance-tests/test/mocks/peer-discovery.spec.ts deleted file mode 100644 index 33d136ab95..0000000000 --- a/packages/interface-compliance-tests/test/mocks/peer-discovery.spec.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { MockDiscovery } from '../../src/mocks/peer-discovery.js' -import tests from '../../src/peer-discovery/index.js' - -describe('mock peer discovery compliance tests', () => { - let intervalId: any - - tests({ - async setup () { - const mockDiscovery = new MockDiscovery({ - discoveryDelay: 1 - }) - - intervalId = setInterval(mockDiscovery._discoverPeer, 1000) - - return mockDiscovery - }, - async teardown () { - clearInterval(intervalId) - } - }) -}) diff --git a/packages/kad-dht/test/utils/test-dht.ts b/packages/kad-dht/test/utils/test-dht.ts index 349cfdb8ea..793afdecf6 100644 --- a/packages/kad-dht/test/utils/test-dht.ts +++ b/packages/kad-dht/test/utils/test-dht.ts @@ -33,7 +33,6 @@ export class TestDHT { privateKey, datastore: new MemoryDatastore(), registrar: mockRegistrar(), - // connectionGater: mockConnectionGater(), addressManager: stubInterface(), peerStore: stubInterface(), connectionManager: stubInterface(), diff --git a/packages/protocol-perf/test/run._ts b/packages/protocol-perf/test/run._ts new file mode 100644 index 0000000000..79dc00f4e8 --- /dev/null +++ b/packages/protocol-perf/test/run._ts @@ -0,0 +1,63 @@ +/* eslint-disable no-console */ +import { noise } from '@chainsafe/libp2p-noise' +import { yamux } from '@chainsafe/libp2p-yamux' +import { mplex } from '@libp2p/mplex' +import { plaintext } from '@libp2p/plaintext' +import { tcp } from '@libp2p/tcp' +import { createLibp2p, type Libp2p } from 'libp2p' +import { perf, type PerfOutput, type Perf } from '../src/index.js' + +const ONE_MEG = 1024 * 1024 +const DOWNLOAD_BYTES = ONE_MEG * 1024 * 5 + +async function createNode (): Promise> { + return createLibp2p({ + addresses: { + listen: [ + '/ip4/0.0.0.0/tcp/0' + ] + }, + transports: [ + tcp() + ], + connectionEncrypters: [ + noise(), plaintext() + ], + streamMuxers: [ + yamux(), mplex() + ], + services: { + perf: perf() + }, + connectionManager: { + minConnections: 0 + } + }) +} + +const libp2p1 = await createNode() +const libp2p2 = await createNode() + +let last: PerfOutput | undefined + +for await (const output of libp2p1.services.perf.measurePerformance(libp2p2.getMultiaddrs()[0], 0, DOWNLOAD_BYTES)) { + last = output + console.info(output) + + console.info((output.downloadBytes / (1024 * 1024)) / output.timeSeconds, 'MB/s') +} + +if (last?.type === 'final') { + console.info((last.downloadBytes / (1024 * 1024)) / last.timeSeconds, 'MB/s') +} + +await libp2p1.stop() +await libp2p2.stop() + +// plaintext/yamux - 1354 MB/s +// plaintext/mplex - 34478 MB/s +// noise/yamux - 60 MB/s +// noise/mplex - 62 MB/s + +// noise/yamux/native crypto - 282 MB/s +// noise/mplex/native crypto - 420 MB/s diff --git a/packages/transport-webrtc/.aegir.js b/packages/transport-webrtc/.aegir.js index a01c942193..315fec321e 100644 --- a/packages/transport-webrtc/.aegir.js +++ b/packages/transport-webrtc/.aegir.js @@ -2,54 +2,5 @@ export default { build: { bundlesizeMax: '117KB' - }, - test: { - before: async () => { - const { createLibp2p } = await import('libp2p') - const { circuitRelayServer } = await import('@libp2p/circuit-relay-v2') - const { webSockets } = await import('@libp2p/websockets') - const { noise } = await import('@chainsafe/libp2p-noise') - const { yamux } = await import('@chainsafe/libp2p-yamux') - - // start a relay node for use in the tests - const relay = await createLibp2p({ - addresses: { - listen: [ - '/ip4/127.0.0.1/tcp/0/ws' - ] - }, - transports: [ - webSockets() - ], - connectionEncrypters: [ - noise() - ], - streamMuxers: [ - yamux() - ], - services: { - relay: circuitRelayServer({ - reservations: { - maxReservations: Infinity - } - }) - }, - connectionManager: { - inboundConnectionThreshold: Infinity - } - }) - - const multiaddrs = relay.getMultiaddrs().map(ma => ma.toString()) - - return { - relay, - env: { - RELAY_MULTIADDR: multiaddrs[0] - } - } - }, - after: async (_, before) => { - await before.relay.stop() - } } } diff --git a/packages/transport-webrtc/package.json b/packages/transport-webrtc/package.json index b7203692ba..87af8a01b1 100644 --- a/packages/transport-webrtc/package.json +++ b/packages/transport-webrtc/package.json @@ -76,23 +76,14 @@ "uint8arrays": "^5.1.0" }, "devDependencies": { - "@chainsafe/libp2p-yamux": "^7.0.0", - "@libp2p/circuit-relay-v2": "^3.1.0", "@libp2p/crypto": "^5.0.6", - "@libp2p/identify": "^3.0.10", "@libp2p/interface-compliance-tests": "^6.1.8", "@libp2p/logger": "^5.1.3", - "@libp2p/websockets": "^9.0.11", "@types/sinon": "^17.0.3", "aegir": "^44.0.1", "delay": "^6.0.0", - "it-drain": "^3.0.7", "it-length": "^3.0.6", - "it-map": "^3.1.0", "it-pair": "^2.0.6", - "it-pipe": "^3.0.1", - "it-to-buffer": "^4.0.7", - "libp2p": "^2.2.1", "p-retry": "^6.2.0", "protons": "^7.5.0", "sinon": "^18.0.0", diff --git a/packages/transport-webrtc/test/basics.spec.ts b/packages/transport-webrtc/test/basics.spec.ts deleted file mode 100644 index 122d91efdc..0000000000 --- a/packages/transport-webrtc/test/basics.spec.ts +++ /dev/null @@ -1,354 +0,0 @@ -/* eslint-disable @typescript-eslint/no-unused-expressions */ - -import { noise } from '@chainsafe/libp2p-noise' -import { yamux } from '@chainsafe/libp2p-yamux' -import { circuitRelayTransport } from '@libp2p/circuit-relay-v2' -import { identify } from '@libp2p/identify' -import { webSockets } from '@libp2p/websockets' -import * as filter from '@libp2p/websockets/filters' -import { multiaddr } from '@multiformats/multiaddr' -import { WebRTC } from '@multiformats/multiaddr-matcher' -import { expect } from 'aegir/chai' -import drain from 'it-drain' -import map from 'it-map' -import { pipe } from 'it-pipe' -import { pushable } from 'it-pushable' -import toBuffer from 'it-to-buffer' -import { createLibp2p } from 'libp2p' -import pDefer from 'p-defer' -import pRetry from 'p-retry' -import { webRTC } from '../src/index.js' -import type { Libp2p, Connection, Stream, StreamHandler } from '@libp2p/interface' - -async function createNode (): Promise { - return createLibp2p({ - addresses: { - listen: [ - '/webrtc', - `${process.env.RELAY_MULTIADDR}/p2p-circuit` - ] - }, - transports: [ - webSockets({ - filter: filter.all - }), - circuitRelayTransport(), - webRTC() - ], - connectionEncrypters: [ - noise() - ], - streamMuxers: [ - yamux() - ], - connectionGater: { - denyDialMultiaddr: () => false - }, - services: { - identify: identify() - } - }) -} - -describe('basics', () => { - const echo = '/echo/1.0.0' - - let localNode: Libp2p - let remoteNode: Libp2p - let streamHandler: StreamHandler - - async function connectNodes (): Promise { - const remoteAddr = remoteNode.getMultiaddrs() - .filter(ma => WebRTC.exactMatch(ma)).pop() - - if (remoteAddr == null) { - throw new Error('Remote peer could not listen on relay') - } - - await remoteNode.handle(echo, (info) => { - streamHandler(info) - }, { - runOnLimitedConnection: true - }) - - const connection = await localNode.dial(remoteAddr) - - // disconnect both from relay - await localNode.hangUp(multiaddr(process.env.RELAY_MULTIADDR)) - await remoteNode.hangUp(multiaddr(process.env.RELAY_MULTIADDR)) - - return connection - } - - beforeEach(async () => { - streamHandler = ({ stream }) => { - void pipe( - stream, - stream - ) - } - - localNode = await createNode() - remoteNode = await createNode() - }) - - afterEach(async () => { - if (localNode != null) { - await localNode.stop() - } - - if (remoteNode != null) { - await remoteNode.stop() - } - }) - - it('can dial through a relay', async () => { - const connection = await connectNodes() - - // open a stream on the echo protocol - const stream = await connection.newStream(echo) - - // send and receive some data - const input = new Array(5).fill(0).map(() => new Uint8Array(10)) - const output = await pipe( - input, - stream, - (source) => map(source, list => list.subarray()), - async (source) => toBuffer(source) - ) - - // asset that we got the right data - expect(output).to.equalBytes(toBuffer(input)) - }) - - it('reports remote addresses correctly', async () => { - const initatorConnection = await connectNodes() - expect(initatorConnection.remoteAddr.toString()).to.equal(`${process.env.RELAY_MULTIADDR}/p2p-circuit/webrtc/p2p/${remoteNode.peerId}`) - - const receiverConnections = remoteNode.getConnections(localNode.peerId) - .filter(conn => conn.remoteAddr.toString() === `/webrtc/p2p/${localNode.peerId}`) - expect(receiverConnections).to.have.lengthOf(1) - }) - - it('can send a large file', async () => { - const connection = await connectNodes() - - // open a stream on the echo protocol - const stream = await connection.newStream(echo, { - runOnLimitedConnection: true - }) - - // send and receive some data - const input = new Array(5).fill(0).map(() => new Uint8Array(1024 * 1024)) - const output = await pipe( - input, - stream, - (source) => map(source, list => list.subarray()), - async (source) => toBuffer(source) - ) - - // asset that we got the right data - expect(output).to.equalBytes(toBuffer(input)) - }) - - it('can close local stream for reading but send a large file', async () => { - let output: Uint8Array = new Uint8Array(0) - const streamClosed = pDefer() - - streamHandler = ({ stream }) => { - void Promise.resolve().then(async () => { - output = await toBuffer(map(stream.source, (buf) => buf.subarray())) - await stream.close() - streamClosed.resolve() - }) - } - - const connection = await connectNodes() - - // open a stream on the echo protocol - const stream = await connection.newStream(echo, { - runOnLimitedConnection: true - }) - - // close for reading - await stream.closeRead() - - // send some data - const input = new Array(5).fill(0).map(() => new Uint8Array(1024 * 1024)) - - await stream.sink(input) - await stream.close() - - // wait for remote to receive all data - await streamClosed.promise - - // asset that we got the right data - expect(output).to.equalBytes(toBuffer(input)) - }) - - it('can close local stream for writing but receive a large file', async () => { - const input = new Array(5).fill(0).map(() => new Uint8Array(1024 * 1024)) - - streamHandler = ({ stream }) => { - void Promise.resolve().then(async () => { - // send some data - await stream.sink(input) - await stream.close() - }) - } - - const connection = await connectNodes() - - // open a stream on the echo protocol - const stream = await connection.newStream(echo, { - runOnLimitedConnection: true - }) - - // close for reading - await stream.closeWrite() - - // receive some data - const output = await toBuffer(map(stream.source, (buf) => buf.subarray())) - - await stream.close() - - // asset that we got the right data - expect(output).to.equalBytes(toBuffer(input)) - }) - - it('can close local stream for writing and reading while a remote stream is writing', async () => { - /** - * NodeA NodeB - * | <--- STOP_SENDING | - * | FIN ---> | - * | <--- FIN | - * | FIN_ACK ---> | - * | <--- FIN_ACK | - */ - - const getRemoteStream = pDefer() - - streamHandler = ({ stream }) => { - void Promise.resolve().then(async () => { - getRemoteStream.resolve(stream) - }) - } - - const connection = await connectNodes() - - // open a stream on the echo protocol - const stream = await connection.newStream(echo, { - runOnLimitedConnection: true - }) - - // close the write end immediately - const p = stream.closeWrite() - - const remoteStream = await getRemoteStream.promise - // close the readable end of the remote stream - await remoteStream.closeRead() - - // keep the remote write end open, this should delay the FIN_ACK reply to the local stream - const remoteInputStream = pushable() - void remoteStream.sink(remoteInputStream) - - // wait for remote to receive local close-write - await pRetry(() => { - if (remoteStream.readStatus !== 'closed') { - throw new Error('Remote stream read status ' + remoteStream.readStatus) - } - }, { - minTimeout: 100 - }) - - // remote closes write - remoteInputStream.end() - - // wait to receive FIN_ACK - await p - - // wait for remote to notice closure - await pRetry(() => { - if (remoteStream.status !== 'closed') { - throw new Error('Remote stream not closed') - } - }) - - assertStreamClosed(stream) - assertStreamClosed(remoteStream) - }) - - it('can close local stream for writing and reading while a remote stream is writing using source/sink', async () => { - /** - * NodeA NodeB - * | FIN ---> | - * | <--- FIN | - * | FIN_ACK ---> | - * | <--- FIN_ACK | - */ - - const getRemoteStream = pDefer() - - streamHandler = ({ stream }) => { - void Promise.resolve().then(async () => { - getRemoteStream.resolve(stream) - }) - } - - const connection = await connectNodes() - - // open a stream on the echo protocol - const stream = await connection.newStream(echo, { - runOnLimitedConnection: true - }) - - // keep the remote write end open, this should delay the FIN_ACK reply to the local stream - const p = stream.sink([]) - - const remoteStream = await getRemoteStream.promise - // close the readable end of the remote stream - await remoteStream.closeRead() - // readable end should finish - await drain(remoteStream.source) - - // wait for remote to receive local close-write - await pRetry(() => { - if (remoteStream.readStatus !== 'closed') { - throw new Error('Remote stream read status ' + remoteStream.readStatus) - } - }, { - minTimeout: 100 - }) - - // remote closes write - await remoteStream.sink([]) - - // wait to receive FIN_ACK - await p - - // close read end of stream - await stream.closeRead() - // readable end should finish - await drain(stream.source) - - // wait for remote to notice closure - await pRetry(() => { - if (remoteStream.status !== 'closed') { - throw new Error('Remote stream not closed') - } - }) - - assertStreamClosed(stream) - assertStreamClosed(remoteStream) - }) -}) - -function assertStreamClosed (stream: Stream): void { - expect(stream.status).to.equal('closed') - expect(stream.readStatus).to.equal('closed') - expect(stream.writeStatus).to.equal('closed') - - expect(stream.timeline.close).to.be.a('number') - expect(stream.timeline.closeRead).to.be.a('number') - expect(stream.timeline.closeWrite).to.be.a('number') -} diff --git a/packages/transport-webrtc/test/peer.spec.ts b/packages/transport-webrtc/test/peer.spec.ts index 5f455ce8e5..aa951d34d1 100644 --- a/packages/transport-webrtc/test/peer.spec.ts +++ b/packages/transport-webrtc/test/peer.spec.ts @@ -1,5 +1,5 @@ import { generateKeyPair } from '@libp2p/crypto/keys' -import { mockRegistrar, mockUpgrader, streamPair } from '@libp2p/interface-compliance-tests/mocks' +import { streamPair } from '@libp2p/interface-compliance-tests/mocks' import { defaultLogger, logger } from '@libp2p/logger' import { peerIdFromPrivateKey } from '@libp2p/peer-id' import { multiaddr, type Multiaddr } from '@multiformats/multiaddr' @@ -16,8 +16,8 @@ import { Message } from '../src/private-to-private/pb/message.js' import { handleIncomingStream } from '../src/private-to-private/signaling-stream-handler.js' import { SIGNALING_PROTO_ID, WebRTCTransport, splitAddr } from '../src/private-to-private/transport.js' import { RTCPeerConnection, RTCSessionDescription } from '../src/webrtc/index.js' -import type { Logger, Connection, Stream, ComponentLogger } from '@libp2p/interface' -import type { ConnectionManager, TransportManager } from '@libp2p/interface-internal' +import type { Logger, Connection, Stream, ComponentLogger, Upgrader } from '@libp2p/interface' +import type { ConnectionManager, Registrar, TransportManager } from '@libp2p/interface-internal' const browser = detect() @@ -250,8 +250,8 @@ describe('webrtc filter', () => { transportManager: stubInterface(), connectionManager: stubInterface(), peerId: Sinon.stub() as any, - registrar: mockRegistrar(), - upgrader: mockUpgrader({}), + registrar: stubInterface(), + upgrader: stubInterface(), logger: defaultLogger() }) diff --git a/packages/transport-webrtc/test/transport.spec.ts b/packages/transport-webrtc/test/transport.spec.ts index 6a977d77ae..bcde79bf60 100644 --- a/packages/transport-webrtc/test/transport.spec.ts +++ b/packages/transport-webrtc/test/transport.spec.ts @@ -1,34 +1,26 @@ /* eslint-disable @typescript-eslint/no-floating-promises */ import { generateKeyPair } from '@libp2p/crypto/keys' -import { type CreateListenerOptions, transportSymbol, type Metrics } from '@libp2p/interface' -import { mockMetrics, mockUpgrader } from '@libp2p/interface-compliance-tests/mocks' +import { transportSymbol } from '@libp2p/interface' import { defaultLogger } from '@libp2p/logger' import { peerIdFromPrivateKey } from '@libp2p/peer-id' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' +import { stubInterface } from 'sinon-ts' import { UnimplementedError } from '../src/error.js' import { WebRTCDirectTransport, type WebRTCDirectTransportComponents } from '../src/private-to-public/transport.js' import { expectError } from './util.js' - -function ignoredDialOption (): CreateListenerOptions { - const upgrader = mockUpgrader({}) - return { upgrader } -} +import type { Upgrader } from '@libp2p/interface' describe('WebRTCDirect Transport', () => { - let metrics: Metrics let components: WebRTCDirectTransportComponents before(async () => { - metrics = mockMetrics()() - const privateKey = await generateKeyPair('Ed25519') components = { peerId: peerIdFromPrivateKey(privateKey), privateKey, - metrics, logger: defaultLogger() } }) @@ -41,16 +33,19 @@ describe('WebRTCDirect Transport', () => { it('can dial', async () => { const ma = multiaddr('/ip4/1.2.3.4/udp/1234/webrtc-direct/certhash/uEiAUqV7kzvM1wI5DYDc1RbcekYVmXli_Qprlw3IkiEg6tQ/p2p/12D3KooWGDMwwqrpcYKpKCgxuKT2NfqPqa94QnkoBBpqvCaiCzWd') const transport = new WebRTCDirectTransport(components) - const options = ignoredDialOption() // don't await as this isn't an e2e test - transport.dial(ma, options) + transport.dial(ma, { + upgrader: stubInterface() + }) }) it('createListner throws', () => { const t = new WebRTCDirectTransport(components) try { - t.createListener(ignoredDialOption()) + t.createListener({ + upgrader: stubInterface() + }) expect('Should have thrown').to.equal('but did not') } catch (e) { expect(e).to.be.instanceOf(UnimplementedError) @@ -92,7 +87,9 @@ describe('WebRTCDirect Transport', () => { const transport = new WebRTCDirectTransport(components) try { - await transport.dial(ma, ignoredDialOption()) + await transport.dial(ma, { + upgrader: stubInterface() + }) } catch (error: any) { const expected = 'WebRTC transport error: There was a problem with the Multiaddr which was passed in: we need to have the remote\'s PeerId' expectError(error, expected)