Skip to content

[Rust] Multiplexed streams core#188

Open
danilonajkov-db wants to merge 2 commits intomainfrom
stream-multiplexing-core
Open

[Rust] Multiplexed streams core#188
danilonajkov-db wants to merge 2 commits intomainfrom
stream-multiplexing-core

Conversation

@danilonajkov-db
Copy link
Copy Markdown
Contributor

@danilonajkov-db danilonajkov-db commented Apr 2, 2026

What changes are proposed in this pull request?

Added a MultiplexedStream abstraction that manages N sub-streams behind the scenes. Records are distributed
across streams via round-robin, giving customers higher throughput with a simple API.

  • [TODO: Decide if this is the right approach] Bitshift-encoded MessageId: Instead of maintaining a HashMap to map mux-level IDs to (stream, offset) pairs, we encode the stream index in the upper 6 bits of the offset returned by the sub-stream. This gives O(1) encode/decode with zero allocations and no locks on the hot path. Supports up to 64 sub-streams.
  • Separate connection per stream: During development I discovered that tonic's Channel serializes requests
    through a single tower Buffer worker - cloning the channel shares the same connection. I changed ZerobusSdk
    to create a new channel per create_stream call so each stream gets its own TCP connection, enabling true
    parallel ingestion and ack processing.
  • Poison-on-failure semantics: If any sub-stream fails, the entire MultiplexedStream shuts down — flushes
    healthy streams best-effort, collects all unacked records, and blocks further ingestion. Callers can
    retrieve unacked records via get_unacked_records().
  • MessageId API: The multiplexed stream returns opaque MessageId values (not raw offsets) from ingest_record
    / ingest_records. Callers use wait_for_message_id to await acknowledgment. This makes it clear that these
    IDs are not ordered offsets.
  1. Once stream builder API is merged , ill expand it to be the only way of creating multiplexed streams. It will handle creating the streams and managing this multiplex object. For now this PR just has the core logic for multiplexing
  2. Once the PR for better rust mock tests is merged Ill expand the testing sute with more and better tests.

All current changes are currently guarded by "testing" flag

How is this tested?

New unit test suite, benchmarked performance increase

@danilonajkov-db danilonajkov-db changed the title cleanup [All] Multiplexed streams core Apr 2, 2026
@danilonajkov-db danilonajkov-db marked this pull request as ready for review April 2, 2026 16:00
@danilonajkov-db danilonajkov-db changed the title [All] Multiplexed streams core [Rust] Multiplexed streams core Apr 2, 2026
@elenagaljak-db
Copy link
Copy Markdown
Contributor

I thought that we decided to go into the approach of this being like a default thing? With a global ZerobusStream that would act as a single or as MultiplexStream depending on the feature? Is there any specific reason you decided not to?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants