Description
In order to implement publisher confirms on my `SuperStreamPublisher` correctly, I found I needed to keep track of a "publishing key" for each message I publish. The publishing key is made up of the `extendedId` of the publisher (which is computed from the publisher id and the connection id using `computeExtendedPublisherId()`) and the `publishingId` of the message.
Keeping track using this publishing key ensures that message confirmations from the different (per-partition) publishers in the super stream do not get mixed up by the listener I register when creating the `Client`:
    const client = rabbitStreams.connect({
      ...
      listeners: {
        publish_confirm: (confirm) => {
          ....
        },
        publish_error: (error) => {
          ....
        },
      }
    });
The `confirm` and `error` parameters to these handlers do not include the `connectionId`, only the `publisherId` and the `publishingIds` of the confirmed/errored message publishes.
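To illustrate why the `publisherId` alone is not enough, here is a minimal sketch of the publishing key idea. `makePublishingKey` is a hypothetical helper, not part of the library, and the extended id strings are placeholders standing in for whatever `computeExtendedPublisherId()` returns.

```javascript
// Hypothetical helper (not a library function): combine the publisher's
// extended id with the message's publishing id into a single map key.
function makePublishingKey(extendedPublisherId, publishingId) {
  return `${extendedPublisherId}/${publishingId}`;
}

// Two per-partition publishers on different connections may share the same
// publisherId and publishingId. The extended ids below are placeholder
// strings, assumed to differ per connection.
const pendingMessages = new Map();
pendingMessages.set(makePublishingKey("0@conn-a", 1n), "message for partition A");
pendingMessages.set(makePublishingKey("0@conn-b", 1n), "message for partition B");

// Keyed by publisherId + publishingId alone, the second entry would have
// overwritten the first. With the extended id both entries survive.
console.log(pendingMessages.size);
```

With only `publisherId` and `publishingIds` available in the listener, there is no way to pick between these two entries.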
Additionally, when using `send()` on `SuperStreamPublisher`, the return value does not include the `publisherId` and `connectionId`, only the `publishingId`. This makes it hard to start waiting for the publish confirmation via the listener, for the same reason: the caller cannot build the full publishing key.
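For context, this is roughly the kind of waiting I mean, as a sketch only. `ConfirmTracker` and its methods are hypothetical names of my own, and the key string assumes that both `send()` and the listener can supply the connection-specific part, which is exactly what is missing today.

```javascript
// Hypothetical tracker (not part of the library): maps a publishing key to
// the promise callbacks of whoever is waiting for that confirmation.
class ConfirmTracker {
  constructor() {
    this.pending = new Map(); // publishing key -> { resolve, reject }
  }

  // Register interest in a key; the promise settles when the listener reports it.
  waitFor(key) {
    return new Promise((resolve, reject) => {
      this.pending.set(key, { resolve, reject });
    });
  }

  // Intended to be called from publish_confirm. Returns false for unknown keys.
  confirm(key) {
    const entry = this.pending.get(key);
    if (!entry) return false;
    this.pending.delete(key);
    entry.resolve();
    return true;
  }

  // Intended to be called from publish_error. Returns false for unknown keys.
  fail(key, reason) {
    const entry = this.pending.get(key);
    if (!entry) return false;
    this.pending.delete(key);
    entry.reject(new Error(reason));
    return true;
  }
}

const tracker = new ConfirmTracker();
const confirmed = tracker.waitFor("0@conn-a/1"); // placeholder key
tracker.confirm("0@conn-a/1"); // what the listener would do once it knows the key
confirmed.then(() => console.log("publish confirmed"));
```

This sketch ignores the case where a confirmation arrives before `waitFor()` has been called, which a real implementation would need to handle.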
In order to get this working in my application, I have made local changes to the stream client to add the above information. I'd be interested to understand if there is a way to implement this behaviour without my modifications (or with fewer modifications) to the library.
My changes are in the following PR: #276