Skip to content

Commit

Permalink
fix: handle stream closing gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Nov 15, 2024
1 parent a1b0132 commit ed074bb
Showing 1 changed file with 22 additions and 16 deletions.
38 changes: 22 additions & 16 deletions packages/protocol-ping/src/ping.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { randomBytes } from '@libp2p/crypto'
import { ProtocolError, TimeoutError } from '@libp2p/interface'
import { ProtocolError, TimeoutError, setMaxListeners } from '@libp2p/interface'
import { byteStream } from 'it-byte-stream'
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
import { PROTOCOL_PREFIX, PROTOCOL_NAME, PING_LENGTH, PROTOCOL_VERSION, TIMEOUT, MAX_INBOUND_STREAMS, MAX_OUTBOUND_STREAMS } from './constants.js'
Expand Down Expand Up @@ -60,44 +60,50 @@ export class PingService implements Startable, PingServiceInterface {
const { stream } = data
const start = Date.now()
const bytes = byteStream(stream)
let pinged = false

Promise.resolve().then(async () => {
while (true) {
const signal = AbortSignal.timeout(this.timeout)
setMaxListeners(Infinity, signal)
signal.addEventListener('abort', () => {
stream?.abort(new TimeoutError('ping timeout'))
})

const buf = await bytes.read(PING_LENGTH, {
signal
}).catch((err: Error) => {
if (stream.readStatus === 'ready') throw err
return null
})

if (buf === null) break

await bytes.write(buf, {
signal
})
}
// close the stream
if (stream.status === 'open') {
const signal = AbortSignal.timeout(this.timeout)
signal.addEventListener('abort', () => {
stream?.abort(new TimeoutError('close timeout'))
})

await stream.close({ signal })
pinged = true
}
})
.catch(err => {
this.log.error('incoming ping from %p failed with error', data.connection.remotePeer, err)
// ignore the error if we've processed at least one ping, the remote
// closed the stream and we handled or are handling the close cleanly
if (pinged && err.name === 'UnexpectedEOFError' && stream.readStatus !== 'ready') {
return
}

Check warning on line 88 in packages/protocol-ping/src/ping.ts

View check run for this annotation

Codecov / codecov/patch

packages/protocol-ping/src/ping.ts#L87-L88

Added lines #L87 - L88 were not covered by tests

this.log.error('incoming ping from %p failed with error - %e', data.connection.remotePeer, err)
stream?.abort(err)
})
.finally(() => {
const ms = Date.now() - start
this.log('incoming ping from %p complete in %dms', data.connection.remotePeer, ms)

const signal = AbortSignal.timeout(this.timeout)
setMaxListeners(Infinity, signal)

stream.close({
signal
})
.catch(err => {
this.log.error('error closing ping stream from %p - %e', data.connection.remotePeer, err)
stream?.abort(err)

Check warning on line 105 in packages/protocol-ping/src/ping.ts

View check run for this annotation

Codecov / codecov/patch

packages/protocol-ping/src/ping.ts#L104-L105

Added lines #L104 - L105 were not covered by tests
})
})
}

Expand Down

0 comments on commit ed074bb

Please sign in to comment.