Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { RouteQuery } from "./requests/route_query"
import { StreamStatsRequest } from "./requests/stream_stats_request"
import { Offset, SubscribeRequest } from "./requests/subscribe_request"
import { UnsubscribeRequest } from "./requests/unsubscribe_request"
import { MetadataUpdateListener, PublishConfirmListener, PublishErrorListener } from "./response_decoder"
import { MetadataUpdateListener } from "./response_decoder"
import { ConsumerUpdateQuery } from "./responses/consumer_update_query"
import { CreateStreamResponse } from "./responses/create_stream_response"
import { CreateSuperStreamResponse } from "./responses/create_super_stream_response"
Expand All @@ -43,8 +43,12 @@ import { SuperStreamConsumer } from "./super_stream_consumer"
import { MessageKeyExtractorFunction, SuperStreamPublisher } from "./super_stream_publisher"
import { DEFAULT_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, ResponseCode, sample, wait } from "./util"
import { ConsumerCreditPolicy, CreditRequestWrapper, defaultCreditPolicy } from "./consumer_credit_policy"
import { PublishConfirmResponse } from "./responses/publish_confirm_response"
import { PublishErrorResponse } from "./responses/publish_error_response"

export type ConnectionClosedListener = (hadError: boolean) => void
export type ConnectionPublishConfirmListener = (confirm: PublishConfirmResponse, connectionId: string) => void
export type ConnectionPublishErrorListener = (confirm: PublishErrorResponse, connectionId: string) => void

export type ClosingParams = { closingCode: number; closingReason: string; manuallyClose?: boolean }

Expand Down Expand Up @@ -738,8 +742,8 @@ export class Client {

export type ClientListenersParams = {
metadata_update?: MetadataUpdateListener
publish_confirm?: PublishConfirmListener
publish_error?: PublishErrorListener
publish_confirm?: ConnectionPublishConfirmListener
publish_error?: ConnectionPublishErrorListener
connection_closed?: ConnectionClosedListener
}

Expand Down
10 changes: 8 additions & 2 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,14 @@ export class Connection {
})

if (listeners?.metadata_update) this.decoder.on("metadata_update", listeners.metadata_update)
if (listeners?.publish_confirm) this.decoder.on("publish_confirm", listeners.publish_confirm)
if (listeners?.publish_error) this.decoder.on("publish_error", listeners.publish_error)
if (listeners?.publish_confirm) {
const publishConfirmListener = listeners.publish_confirm
this.decoder.on("publish_confirm", (confirm) => publishConfirmListener(confirm, this.connectionId))
}
if (listeners?.publish_error) {
const publishErrorListener = listeners.publish_error
this.decoder.on("publish_error", (confirm) => publishErrorListener(confirm, this.connectionId))
}
if (listeners?.deliverV1) this.decoder.on("deliverV1", listeners.deliverV1)
if (listeners?.deliverV2) this.decoder.on("deliverV2", listeners.deliverV2)
if (listeners?.consumer_update_query) this.decoder.on("consumer_update_query", listeners.consumer_update_query)
Expand Down
4 changes: 2 additions & 2 deletions src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ export interface Publisher {

export type FilterFunc = (msg: Message) => string | undefined
type PublishConfirmCallback = (err: number | null, publishingIds: bigint[]) => void
export type SendResult = { sent: boolean; publishingId: bigint }
export type SendResult = { sent: boolean; publishingId: bigint; publisher: Publisher }
export class StreamPublisher implements Publisher {
private connection: Connection
private stream: string
Expand Down Expand Up @@ -304,7 +304,7 @@ export class StreamPublisher implements Publisher {
}
this.checkMessageSize(publishRequestMessage)
const sendCycleNeeded = this.add(publishRequestMessage)
const result = { sent: false, publishingId: publishRequestMessage.publishingId }
const result = { sent: false, publishingId: publishRequestMessage.publishingId, publisher: this }
if (sendCycleNeeded) {
result.sent = await this.sendBuffer()
}
Expand Down
2 changes: 1 addition & 1 deletion src/responses/publish_confirm_response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export class PublishConfirmResponse implements Response {
static readonly Version = 1

public publishingIds: bigint[]
private publisherId: number
readonly publisherId: number
constructor(private response: RawPublishConfirmResponse) {
if (this.response.key !== PublishConfirmResponse.key) {
throw new Error(`Unable to create ${PublishConfirmResponse.name} from data of type ${this.response.key}`)
Expand Down
2 changes: 1 addition & 1 deletion src/responses/publish_error_response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export class PublishErrorResponse implements Response {
static key = 0x0004
static readonly Version = 1

private publisherId: number
readonly publisherId: number
public publishingError: PublishingError
constructor(private response: RawPublishErrorResponse) {
if (this.response.key !== PublishErrorResponse.key) {
Expand Down
Loading