Skip to content

Commit

Permalink
Merge pull request #25 from sweet-security/fix-assignment-bug
Browse files Browse the repository at this point in the history
0.1.5
  • Loading branch information
TsurEdoe authored Sep 19, 2024
2 parents bb0d909 + 40fe575 commit 312bd53
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 40 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@sweet-security/kafkas",
"version": "0.1.4",
"version": "0.1.5",
"description": "An kafkajs decorator with cooperative sticky assigner support",
"main": "index.js",
"types": "types/index.d.ts",
Expand Down
4 changes: 2 additions & 2 deletions src/consumer/assigners/cooperativeStickyAssigner/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ module.exports = ({ cluster }) => ({
unloadOverloadedMembers(assignment, avgPartitions)
}
// Step 2: If not already assigned, distribute using round-robin balancing
const unassignedPartitions = getUnassignedPartitions(currentAssignment, topicsPartitions)
const unassignedPartitions = getUnassignedPartitions(assignment, topicsPartitions)
for (const unassignedPartition of unassignedPartitions) {
const memberWithLeastPartitions = minBy(members, member =>
getMemberAssignedPartitionCount(currentAssignment, member.memberId)
getMemberAssignedPartitionCount(assignment, member.memberId)
)?.memberId

if (!memberWithLeastPartitions) {
Expand Down
20 changes: 7 additions & 13 deletions src/consumer/assigners/cooperativeStickyAssigner/utils.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { orderBy } = require('lodash')
const { minBy } = require('lodash')
const hasImbalance = (assignment, avgPartitions) => {
return Object.values(assignment).some(
topicPartitions => Object.values(topicPartitions).flat().length > avgPartitions
Expand All @@ -13,15 +13,13 @@ const unloadOverloadedMembers = (assignment, avgPartitions) => {

if (partitionsToRemove > 0) {
let partitionsRemovedCount = 0

// Sort by partition count
const sortedAssignedTopics = orderBy(
Object.entries(assignment[memberId]),
[([_, assignedTopicPartitions]) => assignedTopicPartitions.length],
'desc'
)

for (const [topic, partitions] of sortedAssignedTopics) {
while (partitionsRemovedCount < partitionsToRemove) {
// Sort by partition count
const [topic, partitions] = minBy(Object.entries(assignment[memberId]), [
([_, assignedTopicPartitions]) => assignedTopicPartitions.length,
])

const removedPartitionId = partitions.pop()
partitionsRemovedCount++

Expand All @@ -32,10 +30,6 @@ const unloadOverloadedMembers = (assignment, avgPartitions) => {
if (partitions.length === 0) {
delete assignment[memberId][topic]
}

if (partitionsRemovedCount > partitionsToRemove) {
break
}
}
}
}
Expand Down
45 changes: 21 additions & 24 deletions src/consumer/consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -265,30 +265,6 @@ module.exports = class ConsumerGroup {
const assignedTopics = keys(decodedAssignment)
const topicsNotSubscribed = arrayDiff(assignedTopics, topicsSubscribed)

if (this.groupProtocol === 'CooperativeStickyAssigner') {
const ownedTopicPartitions = this.assigned()
const ownedPartitions = ownedTopicPartitions.flatMap(ownedTopicPartitions =>
ownedTopicPartitions.partitions.map(
assignedTopicPartition => `${ownedTopicPartitions.topic}-${assignedTopicPartition}`
)
)

const assignedPartitions = Object.keys(decodedAssignment).flatMap(assignedTopic =>
decodedAssignment[assignedTopic].map(
assignedTopicPartition => `${assignedTopic}-${assignedTopicPartition}`
)
)

const revokedPartitions = arrayDiff(ownedPartitions, assignedPartitions)
if (revokedPartitions.length > 0) {
this.logger.info('`Rejoining due to revoked partitions', {
ownedPartitions,
revokedPartitions,
})
await this[PRIVATE.SYNC]()
}
}

if (topicsNotSubscribed.length > 0) {
const payload = {
groupId,
Expand Down Expand Up @@ -364,6 +340,27 @@ module.exports = class ConsumerGroup {
generationId,
memberId,
})

if (this.groupProtocol === 'CooperativeStickyAssigner') {
const ownedTopicPartitions = this.assigned()
const ownedPartitions = ownedTopicPartitions.flatMap(ownedTopicPartitions =>
ownedTopicPartitions.partitions.map(
assignedTopicPartition => `${ownedTopicPartitions.topic}-${assignedTopicPartition}`
)
)

const assignedPartitions = Object.keys(decodedAssignment).flatMap(assignedTopic =>
decodedAssignment[assignedTopic].map(
assignedTopicPartition => `${assignedTopic}-${assignedTopicPartition}`
)
)

const revokedPartitions = arrayDiff(ownedPartitions, assignedPartitions)
if (revokedPartitions.length > 0) {
this.logger.info('Rejoining due to revoked partitions', { revokedPartitions })
this.joinAndSync()
}
}
}

joinAndSync() {
Expand Down

0 comments on commit 312bd53

Please sign in to comment.