Skip to content

Conversation

@chiawendt
Copy link
Contributor

@chiawendt chiawendt commented Oct 6, 2025

Overview

Implements KIP-848, the new consumer rebalance protocol.

fixes #104

What is KIP-848?

KIP-848 introduces a new consumer group protocol that:

  • Moves partition assignment logic to the server-side
  • Enables incremental rebalancing instead of stop-the-world rebalances
  • Uses a single ConsumerGroupHeartbeat API instead of separate JoinGroup/SyncGroup/Heartbeat calls

Changes

New Consumer Options:

groupProtocol: 'classic' (default, existing protocol) or 'consumer' (new protocol).
groupRemoteAssignor: Specify server-side assignor ('uniform' or 'range'), or leave unset to let the server decide.

ConsumerGroupHeartbeat response handling

  1. handle fencing error → reset epoch to 0 and rejoin.
  2. handle other errors → retry heartbeat.
  3. Update member epoch and memberId.
  4. Schedule another heartbeat based on response heartbeatIntervalMs.
  5. Calculate assignments changes.
  6. If partitions are revoked → pause streams, autocommit, update assignments, resume stream, resend heartbeat.
  7. if partitions are assigned → update assignments, refresh offsets, fetch, resend heartbeat.

Notes

  1. You need Kafka >= 4.0.0 to test new consumer protocol.
  2. New protocol does not allow setting some classic group options like heartbeatInterval and sessionTimeout. This is enforced by new typing.

Copy link
Contributor

@ShogunPanda ShogunPanda left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot for this contributions!
I left some comments but the general idea looks great.
Once you address those, I'll happily merge!

docs/consumer.md Outdated
| sessionTimeout | `number` | 1 minute | Amount of time in milliseconds to wait for a consumer to send the heartbeat before considering it down.<br/><br/> This is only relevant when Kafka creates a new group.<br/><br/> Not supported for `groupProtocol=consumer`, instead it is set with broker configuration property `group.consumer.session.timeout.ms`. |
| rebalanceTimeout | `number` | 2 minutes | Amount of time in milliseconds to wait for a consumer to confirm the rebalancing before considering it down.<br/><br/> This is only relevant when Kafka creates a new group. |
| heartbeatInterval | `number` | 3 seconds | Interval in milliseconds between heartbeats.<br/><br/> Not supported for `groupProtocol=consumer`, instead it is set with the broker configuration property `group.consumer.heartbeat.interval`. |
| groupProtocol | `'classic' \| 'consumer'` | `'classic'` | Group protocol to use. Use `'classic'` for the original consumer group protocol and `'consumer'` for the new protocol introduced in KIP-848.<br/><br/> The `'consumer'` protocol provides server-side partition assignment and incremental rebalancing behavior. |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please insert a link to the KIP!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

sessionTimeout: { type: 'number', minimum: 0 },
rebalanceTimeout: { type: 'number', minimum: 0 },
heartbeatInterval: { type: 'number', minimum: 0 },
groupProtocol: { type: 'string', enum: ['classic', 'consumer'] },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please define this values in enumerations.ts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

(typeof MessagesStreamFallbackModes)[keyof typeof MessagesStreamFallbackModes]

export interface GroupOptions {
groupProtocol?: 'classic'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use an Enumeration here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

broker-single:
# Rule of thumb: Confluent Kafka Version = Apache Kafka Version + 4.0.0
image: &image confluentinc/cp-kafka:${KAFKA_VERSION:-7.9.0}
image: &image confluentinc/cp-kafka:${KAFKA_VERSION:-8.0.0}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not change this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. The new consumer protocol is only supported for Kafka >= 4.0.0. The new tests are disabled for Kafka <= 3.9.0. You might need to tweak you local Kafka container versions to run the tests.

: function noopCloser (_: boolean, callback: CallbackWithPromise<void>) {
callback(null)
}
const closer = this.#useNewProtocol
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not nest ternary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

this.#cancelHeartbeat()
this.emitWithDebug('consumer:heartbeat', 'error', { error })

const fenced = (error as any).response?.errorCode === 110 // FENCED_MEMBER_EPOCH
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the constant here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@chiawendt chiawendt force-pushed the kip848 branch 2 times, most recently from 44259ab to 77ea990 Compare October 17, 2025 15:17
Copy link
Member

@mcollina mcollina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@ShogunPanda ShogunPanda merged commit 8f56cee into platformatic:main Oct 21, 2025
18 checks passed
@chiawendt
Copy link
Contributor Author

Thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support incremental cooperative rebalancing

3 participants