Cleanly closing streams #1793

Closed
3 tasks
achingbrain opened this issue Jun 5, 2023 · 3 comments · Fixed by #1864 · May be fixed by #1847
Labels
need/triage Needs initial labeling and prioritization

Comments

@achingbrain
Member

achingbrain commented Jun 5, 2023

I think this is a perfectly reasonable interaction:

```js
import { pbStream } from 'it-pb-stream'

const stream = await node.dialProtocol(remotePeer, '/my/protocol/1.0.0')
const pb = pbStream(stream)
pb.writePB({ hello: 'world' }, Message)
stream.close()
```

But the thing is it doesn't work, the following is thrown on the remote:

```
Error: stream ended before 1 bytes became available
  at file:///Users/alex/Documents/Workspaces/libp2p/js-libp2p/node_modules/it-reader/src/index.ts:70:9
  at Object.next (file:///Users/alex/Documents/Workspaces/libp2p/js-libp2p/node_modules/@libp2p/multistream-select/src/multistream.ts:62:23)
... more stack trace here
```

Behind the scenes, it-pb-stream has passed an it-pushable to stream.sink. pb.writePB pushes some bytes to the pushable, but because stream.sink consumes its source using for await...of, the bytes won't be read until the microtask queue is processed, since every step is a promise.

That read never happens in time: stream.close is synchronous, so the CLOSE message is queued for sending in the current macrotask and goes out before the DATA message.

We can see this if we enable trace logging:

```
  libp2p:mplex new initiator stream 0 +0ms
  libp2p:mplex:trace initiator stream 0 send { id: 0, type: 'NEW_STREAM (0)', data: '0' } +0ms
-- local pb.writePB(...) --
-- local stream.close() --
  libp2p:stream:trace outbound stream i0 close +0ms
  libp2p:stream:trace outbound stream i0 closeRead +0ms
  libp2p:stream:trace outbound stream i0 source end - err: undefined +0ms
  libp2p:stream:trace outbound stream i0 closeWrite +0ms
-- local sends close message  --
  libp2p:mplex:trace initiator stream 0 send { id: 0, type: 'CLOSE_INITIATOR (4)' } +2ms
  libp2p:stream:trace outbound stream i0 sink end - err: undefined +0ms
  libp2p:mplex initiator stream with id 0 and protocol undefined ended +3ms
-- local sends data message  --
  libp2p:mplex:trace initiator stream 0 send {
  id: 0,
  type: 'MESSAGE_INITIATOR (2)',
  data: '110a0b68656c6c6f20776f726c6410051801'
} +1ms
-- remote node notices incoming stream --
  libp2p:mplex:trace incoming message { id: 0, type: 'NEW_STREAM (0)', data: '0' } +1ms
  libp2p:mplex new receiver stream 0 +2ms
-- remote stream receives stream close --
  libp2p:mplex:trace incoming message { id: 0, type: 'CLOSE_INITIATOR (4)' } +0ms
  libp2p:stream:trace inbound stream r0 closeRead +2ms
  libp2p:stream:trace inbound stream r0 source end - err: undefined +0ms
-- remote stream receives data message but stream is already closed --
  libp2p:mplex:trace incoming message {
  id: 0,
  type: 'MESSAGE_INITIATOR (2)',
  data: '110a0b68656c6c6f20776f726c6410051801'
} +0ms
```

If we defer the stream.close() call until the pending microtasks have run, it gets queued behind the task that sends the DATA message and everything works:

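```js
import delay from 'delay' // assumption: the `delay` package from npm

pb.writePB({ hello: 'world' }, Message)
await delay(0) // let the queued DATA message go out first
stream.close()
```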
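
The ordering problem and the workaround can be reproduced without libp2p. What follows is a minimal sketch, not the real it-pushable or mplex code: `makePushable`, `makeStream` and the `wire` array are hypothetical stand-ins. The only assumptions are that the sink consumes its source with `for await...of` and that `close` is synchronous, as described above.

```js
// Hypothetical stand-in for a pushable async iterable.
function makePushable () {
  const queue = []
  let wake = null
  let ended = false
  return {
    push (value) { queue.push(value); wake?.() },
    end () { ended = true; wake?.() },
    async * [Symbol.asyncIterator] () {
      while (true) {
        while (queue.length > 0) yield queue.shift()
        if (ended) return
        // Sleep until push() or end() is called
        await new Promise(resolve => { wake = resolve })
        wake = null
      }
    }
  }
}

// Hypothetical stand-in for a muxed stream; `wire` records what gets sent.
function makeStream (wire) {
  return {
    // Chunks are only seen after at least one microtask tick
    async sink (source) {
      for await (const chunk of source) wire.push(`DATA ${chunk}`)
    },
    // Synchronous close: the CLOSE message is recorded immediately
    close () { wire.push('CLOSE') }
  }
}

async function demo (deferClose) {
  const wire = []
  const stream = makeStream(wire)
  const pushable = makePushable()
  const done = stream.sink(pushable) // starts the for await...of loop
  pushable.push('hello')
  if (deferClose) await new Promise(resolve => setTimeout(resolve, 0))
  stream.close()
  pushable.end() // lets the sink finish so the demo can return
  await done
  return wire
}

demo(false).then(wire => console.log(wire)) // [ 'CLOSE', 'DATA hello' ]
demo(true).then(wire => console.log(wire)) // [ 'DATA hello', 'CLOSE' ]
```

Without the deferral, `close` runs before the sink has had a chance to pull the chunk, which matches the trace log in the issue.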

.close should be a graceful close and ensure that all data is sent before closing the stream (subject to a timeout). If we need to end the stream abruptly we have .abort for that.

The problem is that, according to the interface, .close is synchronous. We can't block the thread while the data is flushed, so we'll need to make .close, .closeRead and .closeWrite asynchronous (.abort should remain synchronous as it's really for error handling).
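
To make the proposal concrete, here is a rough sketch of what an asynchronous graceful close next to a synchronous abort could look like. It is not the libp2p implementation: the `GracefulStream` class, its `wire` array, the `RESET` label and the default timeout are all hypothetical and only illustrate the intended semantics.

```js
// Hypothetical stream: names and the default timeout are illustrative only.
class GracefulStream {
  constructor (wire) {
    this.wire = wire // stands in for the muxer's outgoing messages
    this.queue = [] // chunks not yet handed to the transport
    this.flushing = null // promise for the in-progress flush, if any
    this.closed = false
  }

  write (chunk) {
    if (this.closed) throw new Error('stream is closed')
    this.queue.push(chunk)
    if (this.flushing == null) this.flushing = this.flush()
  }

  async flush () {
    while (this.queue.length > 0) {
      await Promise.resolve() // mimic an asynchronous transport write
      const chunk = this.queue.shift()
      if (chunk !== undefined) this.wire.push(`DATA ${chunk}`)
    }
    this.flushing = null
  }

  // Graceful: wait (up to `timeout` ms) for queued data, then send CLOSE
  async close ({ timeout = 1000 } = {}) {
    this.closed = true
    let timer
    const timedOut = new Promise((resolve, reject) => {
      timer = setTimeout(() => reject(new Error('close timed out')), timeout)
    })
    try {
      await Promise.race([this.flushing ?? Promise.resolve(), timedOut])
      this.wire.push('CLOSE')
    } finally {
      clearTimeout(timer)
    }
  }

  // Abrupt: stays synchronous and discards any unsent data
  abort (err) {
    this.closed = true
    this.queue.length = 0
    this.wire.push(`RESET (${err.message})`)
  }
}

const wire = []
const stream = new GracefulStream(wire)
stream.write('hello')
stream.close().then(() => console.log(wire)) // [ 'DATA hello', 'CLOSE' ]
```

Because `close` returns a promise, callers can await it without blocking the thread, while `abort` remains usable from synchronous error handlers.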

This is a breaking change, so it will cause a new breaking release of libp2p. I do wonder if this is a good time to make our streams more "webby" and have { readable: ReadableStream, writable: WritableStream } instead of { source: AsyncGenerator, sink: (source: AsyncGenerator) => Promise<void> }?

Either way we should get #1792 in first so we can deliver this as the minimum number of PRs and not several across here, libp2p/js-libp2p-interfaces, mplex and yamux.

  • interfaces
  • mplex
  • yamux

todo:
other components below above layers

@achingbrain achingbrain added the need/triage Needs initial labeling and prioritization label Jun 5, 2023
@achingbrain
Member Author

cc @maschad @wemeetagain @MarcoPolo

@maschad maschad added this to js-libp2p Jun 5, 2023
@maschad maschad moved this to 🥞Weekly Candidates/Discuss🎙 in js-libp2p Jun 5, 2023
achingbrain added a commit that referenced this issue Jun 23, 2023
Refactors streams from duplex async iterables:

```js
// Duplex<AsyncGenerator<Uint8Array, void, unknown>, Source<Uint8Array | Uint8ArrayList>, Promise<void>>
{
  source: AsyncGenerator<Uint8Array, void, unknown>
  sink: (source: Source<Uint8Array | Uint8ArrayList>) => Promise<void>
}
```

to `ReadableWritablePair<Uint8Array>`s:

```js
{
  readable: ReadableStream<Uint8Array>
  writable: WritableStream<Uint8Array>
}
```

Since the close methods for web streams are asynchronous, this lets
us close streams cleanly - that is, wait for any buffered data to
be sent/consumed before closing the stream.

We still need to be able to abort a stream in an emergency, so streams
have the following methods for graceful closing:

```js
stream.readable.cancel(reason?: any): Promise<void>
stream.writable.close(): Promise<void>

// or

stream.close(): Promise<void>
```

...and for emergency closing:

```js
stream.abort(err: Error): void
```

Connections and multiaddr connections have the same `close`/`abort`
semantics, but are still Duplexes since making them web streams
would mean we need to convert things like node streams (for tcp) to
web streams which would just make things slower.

Transports such as WebTransport and WebRTC already deal in web
streams when multiplexing so these no longer need to be converted to
Duplex streams so it's win-win.

Fixes #1793
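
The clean-close behaviour that the commit message relies on can be seen with a plain `WritableStream`. This is a small sketch rather than libp2p code: it assumes a runtime with a global `WritableStream` (browsers, or a recent Node.js), and the `wire` array and the 10 ms delay are made up to imitate a slow transport.

```js
async function demoWebStreamClose () {
  const wire = []
  const writable = new WritableStream({
    async write (chunk) {
      await new Promise(resolve => setTimeout(resolve, 10)) // slow transport
      wire.push(`DATA ${chunk}`)
    },
    close () {
      wire.push('CLOSE')
    }
  })

  const writer = writable.getWriter()
  writer.write('hello') // deliberately not awaited, like pb.writePB(...) in the issue
  await writer.close() // resolves only after queued chunks reach the sink
  return wire
}

demoWebStreamClose().then(wire => console.log(wire)) // [ 'DATA hello', 'CLOSE' ]
```

Even though the write is not awaited, the sink's `close` runs only after the queued chunk has been written, which is the ordering the original issue asked for.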
achingbrain added a commit that referenced this issue Jul 3, 2023
- Refactors the `.close`, `.closeRead` and `.closeWrite` methods on the `Stream` interface to be async
- The `Connection` interface now has `.close` and `.abort` methods
- `.close` on `Stream`s and `Connection`s waits for the internal message queues to empty before closing
- `.abort` on `Stream`s and `Connection`s closes the underlying stream immediately and discards any unsent data
- `@chainsafe/libp2p-yamux` now uses the `AbstractStream` class from `@libp2p/interface` the same as `@libp2p/mplex` and
`@libp2p/webrtc`

Follow-up PRs will be necessary to `@chainsafe/libp2p-yamux`, `@chainsafe/libp2p-gossipsub` and `@chainsafe/libp2p-noise`
though they will not block the release as their code is temporarily added to this repo to let CI run.

Fixes #1793
Fixes #656

BREAKING CHANGE: the `.close`, `.closeRead` and `.closeWrite` methods on the `Stream` interface are now asynchronous
achingbrain added a commit that referenced this issue Jul 20, 2023
- Refactors the `.close`, `.closeRead` and `.closeWrite` methods on the `Stream` interface to be async
- The `Connection` interface now has `.close` and `.abort` methods
- `.close` on `Stream`s and `Connection`s waits for the internal message queues to empty before closing
- `.abort` on `Stream`s and `Connection`s closes the underlying stream immediately and discards any unsent data
- `.reset` is removed from the `Stream` interface - instead call `.abort(err)` to signal a local error
- `.reset` is still present on the `AbstractStream` class - the muxer implementation should call this to signal a remote error
- `@chainsafe/libp2p-yamux` now uses the `AbstractStream` class from `@libp2p/interface`, the same as `@libp2p/mplex` and `@libp2p/webrtc` - all the logic around the *checks notes* 17 different ways to close a stream is contained there

Follow-up PRs will be necessary to `@chainsafe/libp2p-yamux`, `@chainsafe/libp2p-gossipsub` and `@chainsafe/libp2p-noise` though they will not block the release as their code is temporarily added to this repo to let CI run.

Fixes #1793
Fixes #656

BREAKING CHANGE: the `.close`, `.closeRead` and `.closeWrite` methods on the `Stream` interface are now asynchronous
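
For callers, the practical effect of this breaking change is that `.close` must now be awaited, with `.abort` as the synchronous fallback. One possible calling pattern is sketched below. The `closeOrAbort` helper and the two stand-in stream objects are hypothetical; only the `close(): Promise<void>` and `abort(err: Error): void` shapes come from the commit messages in this thread.

```js
// Hypothetical helper: try a graceful close, fall back to abort on failure.
async function closeOrAbort (stream) {
  try {
    await stream.close() // now returns a promise, so it must be awaited
    return 'closed'
  } catch (err) {
    stream.abort(err) // synchronous; discards any unsent data
    return 'aborted'
  }
}

// Stand-in streams so the sketch runs without libp2p.
const healthy = { close: async () => {}, abort: () => {} }
const broken = {
  aborted: null,
  close: async () => { throw new Error('close timed out') },
  abort (err) { this.aborted = err }
}

closeOrAbort(healthy).then(result => console.log(result)) // closed
closeOrAbort(broken).then(result => console.log(result, broken.aborted.message)) // aborted close timed out
```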
@github-project-automation github-project-automation bot moved this from 🥞Weekly Candidates/Discuss🎙 to 🎉Done in js-libp2p Jul 20, 2023
@marcus-pousette
Contributor

marcus-pousette commented Jan 16, 2024

I am trying to do something related to this. In a protocol handler I want to say Goodbye before stopping.
This message only seems to be delivered to peers if I add an additional delay after the write has been performed.

```js
async beforeStop() {
	// say goodbye
	// ... this.peers.send GOODBYE

	// wait for goodbye to be sent
	for (const peer of this.peers) {
		await peer[1].outboundStream?.onEmpty()
	}

	// await delay(3000) <-- Without this timeout receivers will never get my Goodbye
}
```

I am not sure, though, whether this is because the connection, and hence the inboundStreams, are closed before the messages arrive. Is there any strategy for "cleanly" closing inbound streams (and/or connections)? My gut feeling is that this is not possible, since you cannot predict which messages are in flight over a connection.

@achingbrain
Member Author

Could you share a link to a full example please? There are lots of variables here. Since this issue was closed, libp2p takes pains to ensure that a given buffer has been handed off to the underlying transport before the stream write resolves, but some transports are fundamentally unreliable, so it's hard to say for sure what's happening here without more information.
