Skip to content

Commit 040e865

Browse files
authored
221 allow specifying creditpolicy on superstream consumer (#225)
* added credit policy to super stream consumer * added missing test for async super stream consumer * custom policy test
1 parent 96f3443 commit 040e865

File tree

3 files changed

+33
-3
lines changed

3 files changed

+33
-3
lines changed

src/client.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ export class Client {
244244
}
245245

246246
public async declareSuperStreamConsumer(
247-
{ superStream, offset, consumerRef }: DeclareSuperStreamConsumerParams,
247+
{ superStream, offset, consumerRef, creditPolicy }: DeclareSuperStreamConsumerParams,
248248
handle: ConsumerFunc
249249
): Promise<SuperStreamConsumer> {
250250
const partitions = await this.queryPartitions({ superStream })
@@ -254,6 +254,7 @@ export class Client {
254254
consumerRef: consumerRef || `${superStream}-${randomUUID()}`,
255255
offset: offset || Offset.first(),
256256
partitions,
257+
creditPolicy,
257258
})
258259
}
259260

@@ -774,6 +775,7 @@ export interface DeclareSuperStreamConsumerParams {
774775
superStream: string
775776
consumerRef?: string
776777
offset?: Offset
778+
creditPolicy?: ConsumerCreditPolicy
777779
}
778780

779781
export interface SubscribeParams {

src/super_stream_consumer.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Client } from "./client"
22
import { Consumer, ConsumerFunc } from "./consumer"
3+
import { ConsumerCreditPolicy, defaultCreditPolicy } from "./consumer_credit_policy"
34
import { Offset } from "./requests/subscribe_request"
45

56
export class SuperStreamConsumer {
@@ -9,6 +10,7 @@ export class SuperStreamConsumer {
910
private locator: Client
1011
private partitions: string[]
1112
private offset: Offset
13+
private creditPolicy: ConsumerCreditPolicy
1214

1315
private constructor(
1416
readonly handle: ConsumerFunc,
@@ -18,20 +20,28 @@ export class SuperStreamConsumer {
1820
partitions: string[]
1921
consumerRef: string
2022
offset: Offset
23+
creditPolicy?: ConsumerCreditPolicy
2124
}
2225
) {
2326
this.superStream = params.superStream
2427
this.consumerRef = params.consumerRef
2528
this.locator = params.locator
2629
this.partitions = params.partitions
2730
this.offset = params.offset
31+
this.creditPolicy = params.creditPolicy || defaultCreditPolicy
2832
}
2933

3034
async start(): Promise<void> {
3135
await Promise.all(
3236
this.partitions.map(async (p) => {
3337
const partitionConsumer = await this.locator.declareConsumer(
34-
{ stream: p, consumerRef: this.consumerRef, offset: this.offset, singleActive: true },
38+
{
39+
stream: p,
40+
consumerRef: this.consumerRef,
41+
offset: this.offset,
42+
singleActive: true,
43+
creditPolicy: this.creditPolicy,
44+
},
3545
this.handle,
3646
this
3747
)
@@ -49,6 +59,7 @@ export class SuperStreamConsumer {
4959
partitions: string[]
5060
consumerRef: string
5161
offset: Offset
62+
creditPolicy?: ConsumerCreditPolicy
5263
}
5364
): Promise<SuperStreamConsumer> {
5465
const superStreamConsumer = new SuperStreamConsumer(handle, params)

test/e2e/superstream_consumer.test.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import { Message, MessageOptions } from "../../src/publisher"
44
import { range } from "../../src/util"
55
import { createClient, createStreamName } from "../support/fake_data"
66
import { Rabbit } from "../support/rabbit"
7-
import { eventually, password, username } from "../support/util"
7+
import { eventually, password, username, waitSleeping } from "../support/util"
88
import { randomUUID } from "crypto"
9+
import { creditsOnChunkCompleted } from "../../src/consumer_credit_policy"
910

1011
describe("super stream consumer", () => {
1112
let superStreamName: string
@@ -52,6 +53,22 @@ describe("super stream consumer", () => {
5253
})
5354
})
5455

56+
it("declaring an async super stream consumer on an existing super stream - no error is thrown", async () => {
57+
await client.declareSuperStreamConsumer({ superStream: superStreamName }, async (_message: Message) => {
58+
await waitSleeping(10)
59+
return
60+
})
61+
})
62+
63+
it("declaring a super stream consumer with a custom credit policy - no error is thrown", async () => {
64+
await client.declareSuperStreamConsumer(
65+
{ superStream: superStreamName, creditPolicy: creditsOnChunkCompleted(2, 1) },
66+
(_message: Message) => {
67+
return
68+
}
69+
)
70+
})
71+
5572
it("declaring a super stream consumer on an existing super stream - read a message", async () => {
5673
await sender(1)
5774
const messages: Message[] = []

0 commit comments

Comments
 (0)