Cleanly closing streams #1793
Comments
Refactors streams from duplex async iterables:

```js
{
  source: Duplex<AsyncGenerator<Uint8Array, void, unknown>, Source<Uint8Array | Uint8ArrayList>, Promise<void>
  sink: (Source<Uint8Array | Uint8ArrayList>) => Promise<void>
}
```

to `ReadableWritablePair<Uint8Array>`s:

```js
{
  readable: ReadableStream<Uint8Array>
  writable: WritableStream<Uint8Array>
}
```

Since the close methods for web streams are asynchronous, this lets us close streams cleanly - that is, wait for any buffered data to be sent/consumed before closing the stream.

We still need to be able to abort a stream in an emergency, so streams have the following methods for graceful closing:

```js
stream.readable.cancel(reason?: any): Promise<void>
stream.writable.close(): Promise<void>
// or
stream.close(): Promise<void>
```

and for emergency closing:

```js
stream.abort(err: Error): void
```

Connections and multiaddr connections have the same `close`/`abort` semantics, but are still Duplexes, since making them web streams would mean converting things like node streams (for tcp) to web streams, which would just make things slower.

Transports such as WebTransport and WebRTC already deal in web streams when multiplexing, so these no longer need to be converted to Duplex streams. It's win-win.

Fixes #1793
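To make the difference between the two closing modes concrete, here is a minimal sketch that uses the standard `WritableStream` API directly rather than libp2p's `Stream` type. The `createSlowWritable` helper and its 10 ms delay are hypothetical stand-ins for a transport, and the global `WritableStream` assumes Node.js 18+ or a browser.

```js
// Hypothetical slow sink: records each chunk after a short delay,
// standing in for a transport that takes time to send data.
function createSlowWritable (received) {
  return new WritableStream({
    async write (chunk) {
      await new Promise(resolve => setTimeout(resolve, 10))
      received.push(chunk)
    }
  })
}

async function gracefulClose () {
  const received = []
  const writer = createSlowWritable(received).getWriter()
  // Queue two chunks without waiting for either write to finish
  writer.write(Uint8Array.of(1)).catch(() => {})
  writer.write(Uint8Array.of(2)).catch(() => {})
  // close() resolves only after every queued chunk has reached the sink
  await writer.close()
  return received.length
}

async function emergencyAbort () {
  const received = []
  const writer = createSlowWritable(received).getWriter()
  writer.write(Uint8Array.of(1)).catch(() => {})
  writer.write(Uint8Array.of(2)).catch(() => {})
  // abort() rejects the pending writes and discards chunks still queued
  await writer.abort(new Error('emergency'))
  return received.length
}

gracefulClose().then(count => console.log('delivered after close:', count))
emergencyAbort().then(count => console.log('delivered after abort:', count))
```

With `close()` both chunks are delivered before the promise resolves; with `abort()` anything still waiting in the queue is dropped.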
- Refactors `.close`, `.closeRead` and `.closeWrite` methods on the `Stream` interface to be async
- The `Connection` interface now has `.close` and `.abort` methods
- `.close` on `Stream`s and `Connection`s waits for the internal message queues to empty before closing
- `.abort` on `Stream`s and `Connection`s closes the underlying stream immediately and discards any unsent data
- `@chainsafe/libp2p-yamux` now uses the `AbstractStream` class from `@libp2p/interface`, the same as `@libp2p/mplex` and `@libp2p/webrtc`

Follow-up PRs will be necessary to `@chainsafe/libp2p-yamux`, `@chainsafe/libp2p-gossipsub` and `@chainsafe/libp2p-noise`, though they will not block the release as their code is temporarily added to this repo to let CI run.

Fixes #1793
Fixes #656

BREAKING CHANGE: the `.close`, `.closeRead` and `.closeWrite` methods on the `Stream` interface are now asynchronous
- Refactors `.close`, `.closeRead` and `.closeWrite` methods on the `Stream` interface to be async
- The `Connection` interface now has `.close` and `.abort` methods
- `.close` on `Stream`s and `Connection`s waits for the internal message queues to empty before closing
- `.abort` on `Stream`s and `Connection`s closes the underlying stream immediately and discards any unsent data
- `.reset` is removed from the `Stream` interface - instead call `.abort(err)` to signal a local error
- `.reset` is still present on the `AbstractStream` class - the muxer implementation should call this to signal a remote error
- `@chainsafe/libp2p-yamux` now uses the `AbstractStream` class from `@libp2p/interface`, the same as `@libp2p/mplex` and `@libp2p/webrtc` - all the logic around the *checks notes* 17 different ways to close a stream is contained there

Follow-up PRs will be necessary to `@chainsafe/libp2p-yamux`, `@chainsafe/libp2p-gossipsub` and `@chainsafe/libp2p-noise`, though they will not block the release as their code is temporarily added to this repo to let CI run.

Fixes #1793
Fixes #656

BREAKING CHANGE: the `.close`, `.closeRead` and `.closeWrite` methods on the `Stream` interface are now asynchronous
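The close/abort contract in the list above can be sketched with a toy class. This is not libp2p's `AbstractStream`: `SketchStream`, its `send` callback and its `status` field are hypothetical names chosen only to illustrate "close waits for the queue to empty, abort discards unsent data".

```js
// Hypothetical stream illustrating the contract only, not libp2p's implementation
class SketchStream {
  constructor (send) {
    this.send = send // async function that hands one message to the transport
    this.queue = []
    this.draining = null
    this.status = 'open'
  }

  write (msg) {
    if (this.status !== 'open') throw new Error('stream is not open')
    this.queue.push(msg)
    // Start a drain loop if one is not already running
    if (this.draining == null) this.draining = this.drain()
  }

  async drain () {
    while (this.queue.length > 0) {
      await this.send(this.queue.shift())
    }
    this.draining = null
  }

  // Graceful: resolves once every queued message has been handed to send()
  async close () {
    this.status = 'closing'
    await this.draining
    if (this.status === 'closing') this.status = 'closed'
  }

  // Emergency: synchronous, drops anything still waiting in the queue
  abort (err) {
    this.queue.length = 0
    this.status = 'aborted'
    this.error = err
  }
}
```

For callers, the breaking change amounts to writing `await stream.close()` where they previously called `stream.close()` and moved on; `stream.abort(err)` stays a plain synchronous call.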
I am trying to do something related to this. In a protocol handler I want to say Goodbye before stopping.

```js
async beforeStop() {
  // say goodbye
  // ... this.peers.send GOODBYE
  // wait for goodbye to be sent
  for (const peer of this.peers) {
    await peer[1].outboundStream?.onEmpty()
  }
  // await delay(3000) <-- Without this timeout receivers will never get my Goodbye
}
```

I am not sure, though, whether this is related to the connection, and hence the inbound streams, being closed before the messages arrive. Is there any strategy for "cleanly" closing inbound streams (and/or connections)? My gut feeling says that this is not possible, since you cannot predict which messages are in flight over a connection.
Could you share a link to a full example please? There are lots of variables here - since this issue was closed, libp2p takes pains to ensure that a given buffer has been handed off to the underlying transport before the stream write resolves, but some transports are fundamentally unreliable, so it's hard to say for sure what's happening here without more information.
I think this is a perfectly reasonable interaction:

But the thing is it doesn't work, the following is thrown on the remote:

Behind the scenes `it-pb-stream` has passed an `it-pushable` to `stream.sink`, `pb.writePB` pushes some bytes to the pushable but because `stream.sink` consumes the passed stream using `for await..of`, the bytes won't be read until the microtask queue is processed since everything is a promise.

This never happens because `stream.close` is synchronous and causes the `CLOSE` message to be sent before the `DATA` message since it's queued for sending in the current macrotask.

We can see this if we enable trace logging:

If we move the `stream.close()` to the microtask queue it gets queued behind the task that sends the `DATA` message and everything works:

`.close` should be a graceful close and ensure that all data is sent before closing the stream (subject to a timeout). If we need to end the stream abruptly we have `.abort` for that.

The problem is that, according to the interface, `.close` is synchronous - we can't block the thread while this happens so we'll need to convert `.close`, `.closeRead` and `.closeWrite` to asynchronous to do this (`.abort` should remain synchronous as it's really for error handling).

This is a breaking change so will cause `[email protected]` to be released. I do wonder if this is a good time to make our streams more "webby" and have `{ readable: ReadableStream, writable: WritableStream }` instead of `{ source: AsyncGenerator, sink: (source: AsyncGenerator) => Promise<void> }`?

Either way we should get #1792 in first so we can deliver this as the minimum number of PRs and not several across here, libp2p/js-libp2p-interfaces, mplex and yamux.
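The ordering problem described above can be reproduced in miniature without any libp2p code. In this sketch `createStream`, `run` and the `log` array are hypothetical; the pushable is a hand-rolled stand-in for `it-pushable`. The deferred case waits on a zero-delay timer rather than a single microtask, so the result does not depend on how many promise ticks the `for await..of` consumer needs.

```js
// Hypothetical stream whose sink logs DATA messages and whose close() logs CLOSE
function createStream (log) {
  const queue = []
  let wake = null
  // Minimal pushable: an async iterable fed by push()
  const pushable = {
    push (value) {
      queue.push(value)
      wake?.()
    },
    async * [Symbol.asyncIterator] () {
      while (true) {
        while (queue.length > 0) yield queue.shift()
        await new Promise(resolve => { wake = resolve })
      }
    }
  }
  return {
    pushable,
    // The sink reads with for await..of, so chunks are only seen in later microtasks
    async sink (source) {
      for await (const chunk of source) log.push(`DATA ${chunk}`)
    },
    // Synchronous close: CLOSE is recorded immediately, in the current task
    close () {
      log.push('CLOSE')
    }
  }
}

async function run (deferClose) {
  const log = []
  const stream = createStream(log)
  stream.sink(stream.pushable)
  stream.pushable.push('hello')
  if (deferClose) {
    // Let all pending microtasks (the DATA send) finish before closing
    await new Promise(resolve => setTimeout(resolve, 0))
  }
  stream.close()
  await new Promise(resolve => setTimeout(resolve, 0))
  return log
}

run(false).then(log => console.log(log)) // [ 'CLOSE', 'DATA hello' ]
run(true).then(log => console.log(log)) // [ 'DATA hello', 'CLOSE' ]
```

An asynchronous `.close` that waits for queued data removes the need for callers to add this kind of deferral themselves.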
todo:
other components below above layers