From 40fe575a6f31c9cd5e4ba5a04b1961ca6b1c16dd Mon Sep 17 00:00:00 2001 From: Edoe Tsur Date: Thu, 19 Sep 2024 10:05:29 +0300 Subject: [PATCH] 0.1.5 --- package.json | 2 +- .../cooperativeStickyAssigner/index.js | 4 +- .../cooperativeStickyAssigner/utils.js | 20 +++------ src/consumer/consumerGroup.js | 45 +++++++++---------- 4 files changed, 31 insertions(+), 40 deletions(-) diff --git a/package.json b/package.json index 2281950..fbafa43 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@sweet-security/kafkas", - "version": "0.1.4", + "version": "0.1.5", "description": "An kafkajs decorator with cooperative sticky assigner support", "main": "index.js", "types": "types/index.d.ts", diff --git a/src/consumer/assigners/cooperativeStickyAssigner/index.js b/src/consumer/assigners/cooperativeStickyAssigner/index.js index bbf3d52..ab80252 100644 --- a/src/consumer/assigners/cooperativeStickyAssigner/index.js +++ b/src/consumer/assigners/cooperativeStickyAssigner/index.js @@ -39,10 +39,10 @@ module.exports = ({ cluster }) => ({ unloadOverloadedMembers(assignment, avgPartitions) } // Step 2: If not already assigned, distribute using round-robin balancing - const unassignedPartitions = getUnassignedPartitions(currentAssignment, topicsPartitions) + const unassignedPartitions = getUnassignedPartitions(assignment, topicsPartitions) for (const unassignedPartition of unassignedPartitions) { const memberWithLeastPartitions = minBy(members, member => - getMemberAssignedPartitionCount(currentAssignment, member.memberId) + getMemberAssignedPartitionCount(assignment, member.memberId) )?.memberId if (!memberWithLeastPartitions) { diff --git a/src/consumer/assigners/cooperativeStickyAssigner/utils.js b/src/consumer/assigners/cooperativeStickyAssigner/utils.js index 6c19f15..b602d49 100644 --- a/src/consumer/assigners/cooperativeStickyAssigner/utils.js +++ b/src/consumer/assigners/cooperativeStickyAssigner/utils.js @@ -1,4 +1,4 @@ -const { orderBy } = require('lodash') +const { minBy } = require('lodash') const hasImbalance = (assignment, avgPartitions) => { return Object.values(assignment).some( topicPartitions => Object.values(topicPartitions).flat().length > avgPartitions @@ -13,15 +13,13 @@ const unloadOverloadedMembers = (assignment, avgPartitions) => { if (partitionsToRemove > 0) { let partitionsRemovedCount = 0 - - // Sort by partition count - const sortedAssignedTopics = orderBy( - Object.entries(assignment[memberId]), - [([_, assignedTopicPartitions]) => assignedTopicPartitions.length], - 'desc' - ) - for (const [topic, partitions] of sortedAssignedTopics) { + while (partitionsRemovedCount < partitionsToRemove) { + // Sort by partition count + const [topic, partitions] = minBy(Object.entries(assignment[memberId]), [ + ([_, assignedTopicPartitions]) => assignedTopicPartitions.length, + ]) + const removedPartitionId = partitions.pop() partitionsRemovedCount++ @@ -32,10 +30,6 @@ const unloadOverloadedMembers = (assignment, avgPartitions) => { if (partitions.length === 0) { delete assignment[memberId][topic] } - - if (partitionsRemovedCount > partitionsToRemove) { - break - } } } } diff --git a/src/consumer/consumerGroup.js b/src/consumer/consumerGroup.js index 006a340..952b95e 100644 --- a/src/consumer/consumerGroup.js +++ b/src/consumer/consumerGroup.js @@ -265,30 +265,6 @@ module.exports = class ConsumerGroup { const assignedTopics = keys(decodedAssignment) const topicsNotSubscribed = arrayDiff(assignedTopics, topicsSubscribed) - if (this.groupProtocol === 'CooperativeStickyAssigner') { - const ownedTopicPartitions = this.assigned() - const ownedPartitions = ownedTopicPartitions.flatMap(ownedTopicPartitions => - ownedTopicPartitions.partitions.map( - assignedTopicPartition => `${ownedTopicPartitions.topic}-${assignedTopicPartition}` - ) - ) - - const assignedPartitions = Object.keys(decodedAssignment).flatMap(assignedTopic => - decodedAssignment[assignedTopic].map( - assignedTopicPartition => `${assignedTopic}-${assignedTopicPartition}` - ) - ) - - const revokedPartitions = arrayDiff(ownedPartitions, assignedPartitions) - if (revokedPartitions.length > 0) { - this.logger.info('`Rejoining due to revoked partitions', { - ownedPartitions, - revokedPartitions, - }) - await this[PRIVATE.SYNC]() - } - } - if (topicsNotSubscribed.length > 0) { const payload = { groupId, @@ -364,6 +340,27 @@ module.exports = class ConsumerGroup { generationId, memberId, }) + + if (this.groupProtocol === 'CooperativeStickyAssigner') { + const ownedTopicPartitions = this.assigned() + const ownedPartitions = ownedTopicPartitions.flatMap(ownedTopicPartitions => + ownedTopicPartitions.partitions.map( + assignedTopicPartition => `${ownedTopicPartitions.topic}-${assignedTopicPartition}` + ) + ) + + const assignedPartitions = Object.keys(decodedAssignment).flatMap(assignedTopic => + decodedAssignment[assignedTopic].map( + assignedTopicPartition => `${assignedTopic}-${assignedTopicPartition}` + ) + ) + + const revokedPartitions = arrayDiff(ownedPartitions, assignedPartitions) + if (revokedPartitions.length > 0) { + this.logger.info('Rejoining due to revoked partitions', { revokedPartitions }) + this.joinAndSync() + } + } } joinAndSync() {