-
Notifications
You must be signed in to change notification settings - Fork 20
feat: implement KIP-848 new consumer protocol #133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot for this contributions!
I left some comments but the general idea looks great.
Once you address those, I'll happily merge!
docs/consumer.md
Outdated
| | sessionTimeout | `number` | 1 minute | Amount of time in milliseconds to wait for a consumer to send the heartbeat before considering it down.<br/><br/> This is only relevant when Kafka creates a new group.<br/><br/> Not supported for `groupProtocol=consumer`, instead it is set with broker configuration property `group.consumer.session.timeout.ms`. | | ||
| | rebalanceTimeout | `number` | 2 minutes | Amount of time in milliseconds to wait for a consumer to confirm the rebalancing before considering it down.<br/><br/> This is only relevant when Kafka creates a new group. | | ||
| | heartbeatInterval | `number` | 3 seconds | Interval in milliseconds between heartbeats.<br/><br/> Not supported for `groupProtocol=consumer`, instead it is set with the broker configuration property `group.consumer.heartbeat.interval`. | | ||
| | groupProtocol | `'classic' \| 'consumer'` | `'classic'` | Group protocol to use. Use `'classic'` for the original consumer group protocol and `'consumer'` for the new protocol introduced in KIP-848.<br/><br/> The `'consumer'` protocol provides server-side partition assignment and incremental rebalancing behavior. | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please insert a link to the KIP!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/clients/consumer/options.ts
Outdated
| sessionTimeout: { type: 'number', minimum: 0 }, | ||
| rebalanceTimeout: { type: 'number', minimum: 0 }, | ||
| heartbeatInterval: { type: 'number', minimum: 0 }, | ||
| groupProtocol: { type: 'string', enum: ['classic', 'consumer'] }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please define this values in enumerations.ts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/clients/consumer/types.ts
Outdated
| (typeof MessagesStreamFallbackModes)[keyof typeof MessagesStreamFallbackModes] | ||
|
|
||
| export interface GroupOptions { | ||
| groupProtocol?: 'classic' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use an Enumeration here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
docker-compose.yml
Outdated
| broker-single: | ||
| # Rule of thumb: Confluent Kafka Version = Apache Kafka Version + 4.0.0 | ||
| image: &image confluentinc/cp-kafka:${KAFKA_VERSION:-7.9.0} | ||
| image: &image confluentinc/cp-kafka:${KAFKA_VERSION:-8.0.0} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do not change this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. The new consumer protocol is only supported for Kafka >= 4.0.0. The new tests are disabled for Kafka <= 3.9.0. You might need to tweak you local Kafka container versions to run the tests.
src/clients/consumer/consumer.ts
Outdated
| : function noopCloser (_: boolean, callback: CallbackWithPromise<void>) { | ||
| callback(null) | ||
| } | ||
| const closer = this.#useNewProtocol |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please do not nest ternary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/clients/consumer/consumer.ts
Outdated
| this.#cancelHeartbeat() | ||
| this.emitWithDebug('consumer:heartbeat', 'error', { error }) | ||
|
|
||
| const fenced = (error as any).response?.errorCode === 110 // FENCED_MEMBER_EPOCH |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use the constant here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
44259ab to
77ea990
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
|
Thanks |
Overview
Implements KIP-848, the new consumer rebalance protocol.
fixes #104
What is KIP-848?
KIP-848 introduces a new consumer group protocol that:
ConsumerGroupHeartbeatAPI instead of separate JoinGroup/SyncGroup/Heartbeat callsChanges
New Consumer Options:
groupProtocol:'classic'(default, existing protocol) or'consumer'(new protocol).groupRemoteAssignor: Specify server-side assignor ('uniform' or 'range'), or leave unset to let the server decide.ConsumerGroupHeartbeat response handling
heartbeatIntervalMs.Notes
heartbeatIntervalandsessionTimeout. This is enforced by new typing.