Skip to content

[KIP-848]: Implemented KIP 848 changes in ListGroups API #328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: dev_add_describeGroup
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion examples/kafkajs/admin/list-groups.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@ async function adminStart() {
short: 's',
multiple: true,
default: [],
}
},
'types': {
type: 'string',
short: 't',
multiple: true,
default: [],
},
},
});

let {
'bootstrap-servers': bootstrapServers,
states: matchConsumerGroupStates,
types: matchConsumerGroupTypes,
timeout,
} = args.values;

Expand All @@ -36,6 +43,9 @@ async function adminStart() {
matchConsumerGroupStates = matchConsumerGroupStates.map(
state => ConsumerGroupStates[state]);

matchConsumerGroupTypes = matchConsumerGroupTypes.map(
type => ConsumerGroupTypes[type]);

const kafka = new Kafka({
kafkaJS: {
brokers: [bootstrapServers],
Expand All @@ -55,6 +65,7 @@ async function adminStart() {
console.log(`\tType: ${group.protocolType}`);
console.log(`\tIs simple: ${group.isSimpleConsumerGroup}`);
console.log(`\tState: ${group.state}`);
console.log(`\tType: ${group.type}`);
}
} catch(err) {
console.log('List topics failed', err);
Expand Down
2 changes: 2 additions & 0 deletions lib/admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ AdminClient.prototype.createPartitions = function (topic, totalPartitions, timeo
* May be unset (default: 5000).
* @param {Array<RdKafka.ConsumerGroupStates>?} options.matchConsumerGroupStates -
* A list of consumer group states to match. May be unset, fetches all states (default: unset).
* @param {Array<RdKafka.ConsumerGroupTypes>?} options.matchConsumerGroupTypes -
* A list of consumer group types to match. May be unset, fetches all types (default: unset).
* @param {function} cb - The callback to be executed when finished.
* @example
* // Valid ways to call this function:
Expand Down
4 changes: 3 additions & 1 deletion lib/kafkajs/_admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,9 @@ class Admin {
* May be unset (default: 5000).
* @param {Array<KafkaJS.ConsumerGroupStates>?} options.matchConsumerGroupStates -
* A list of consumer group states to match. May be unset, fetches all states (default: unset).
* @returns {Promise<{ groups: Array<{groupId: string, protocolType: string, isSimpleConsumerGroup: boolean, state: KafkaJS.ConsumerGroupStates}>, errors: Array<RdKafka.LibrdKafkaError> }>}
* @param {Array<KafkaJS.ConsumerGroupTypes>?} options.matchConsumerGroupTypes -
* A list of consumer group types to match. May be unset, fetches all types (default: unset).
* @returns {Promise<{ groups: Array<{groupId: string, protocolType: string, isSimpleConsumerGroup: boolean, state: KafkaJS.ConsumerGroupStates, type: KafkaJS.ConsumerGroupTypes}>, errors: Array<RdKafka.LibrdKafkaError> }>}
* Resolves with the list of consumer groups, rejects on error.
*/
async listGroups(options = {}) {
Expand Down
32 changes: 30 additions & 2 deletions src/admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,9 @@ Baton AdminClient::CreatePartitions(

Baton AdminClient::ListGroups(
bool is_match_states_set,
std::vector<rd_kafka_consumer_group_state_t> &match_states, int timeout_ms,
std::vector<rd_kafka_consumer_group_state_t> &match_states,
bool is_match_types_set,
std::vector<rd_kafka_consumer_group_type_t> &match_types, int timeout_ms,
/* out */ rd_kafka_event_t **event_response) {
if (!IsConnected()) {
return Baton(RdKafka::ERR__STATE);
Expand Down Expand Up @@ -515,6 +517,15 @@ Baton AdminClient::ListGroups(
}
}

if (is_match_types_set) {
rd_kafka_error_t *error =
rd_kafka_AdminOptions_set_match_consumer_group_types(
options, &match_types[0], match_types.size());
if (error) {
return Baton::BatonFromErrorAndDestroy(error);
}
}

// Create queue just for this operation.
rd_kafka_queue_t *rkqu = rd_kafka_queue_new(m_client->c_ptr());

Expand Down Expand Up @@ -1195,9 +1206,26 @@ NAN_METHOD(AdminClient::NodeListGroups) {
}
}

std::vector<rd_kafka_consumer_group_type_t> match_types;
v8::Local<v8::String> match_consumer_group_types_key =
Nan::New("matchConsumerGroupTypes").ToLocalChecked();
bool is_match_types_set =
Nan::Has(config, match_consumer_group_types_key).FromMaybe(false);
v8::Local<v8::Array> match_types_array = Nan::New<v8::Array>();

if (is_match_types_set) {
match_types_array = GetParameter<v8::Local<v8::Array>>(
config, "matchConsumerGroupTypes", match_types_array);
if (match_types_array->Length()) {
match_types = Conversion::Admin::FromV8GroupTypeArray(
match_types_array);
}
}

// Queue the work.
Nan::AsyncQueueWorker(new Workers::AdminClientListGroups(
callback, client, is_match_states_set, match_states, timeout_ms));
callback, client, is_match_states_set, match_states, is_match_types_set,
match_types, timeout_ms));
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class AdminClient : public Connection {
// Baton DescribeConfig(rd_kafka_NewTopic_t* topic, int timeout_ms);
Baton ListGroups(bool is_match_states_set,
std::vector<rd_kafka_consumer_group_state_t>& match_states,
bool is_match_types_set,
std::vector<rd_kafka_consumer_group_type_t>& match_types,
int timeout_ms,
rd_kafka_event_t** event_response);
Baton DescribeGroups(std::vector<std::string>& groups,
Expand Down
33 changes: 33 additions & 0 deletions src/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,35 @@ std::vector<rd_kafka_consumer_group_state_t> FromV8GroupStateArray(
return returnVec;
}

/**
* @brief Converts a v8 array of group types into a vector of
* rd_kafka_consumer_group_type_t.
*/
std::vector<rd_kafka_consumer_group_type_t> FromV8GroupTypeArray(
v8::Local<v8::Array> array) {
v8::Local<v8::Array> parameter = array.As<v8::Array>();
std::vector<rd_kafka_consumer_group_type_t> returnVec;
if (parameter->Length() >= 1) {
for (unsigned int i = 0; i < parameter->Length(); i++) {
v8::Local<v8::Value> v;
if (!Nan::Get(parameter, i).ToLocal(&v)) {
continue;
}
Nan::Maybe<int64_t> maybeT = Nan::To<int64_t>(v);
if (maybeT.IsNothing()) {
continue;
}
int64_t type_number = maybeT.FromJust();
if (type_number >= RD_KAFKA_CONSUMER_GROUP_TYPE__CNT) {
continue;
}
returnVec.push_back(
static_cast<rd_kafka_consumer_group_type_t>(type_number));
}
}
return returnVec;
}

/**
* @brief Converts a rd_kafka_ListConsumerGroups_result_t* into a v8 object.
*/
Expand All @@ -920,6 +949,7 @@ v8::Local<v8::Object> FromListConsumerGroupsResult(
protocolType: string,
isSimpleConsumerGroup: boolean,
state: ConsumerGroupState (internally a number)
type: ConsumerGroupType (internally a number)
}[],
errors: LibrdKafkaError[]
}
Expand Down Expand Up @@ -957,6 +987,9 @@ v8::Local<v8::Object> FromListConsumerGroupsResult(
Nan::Set(groupObject, Nan::New("state").ToLocalChecked(),
Nan::New<v8::Number>(rd_kafka_ConsumerGroupListing_state(group)));

Nan::Set(groupObject, Nan::New("type").ToLocalChecked(),
Nan::New<v8::Number>(rd_kafka_ConsumerGroupListing_type(group)));

Nan::Set(groups, i, groupObject);
}

Expand Down
4 changes: 4 additions & 0 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ rd_kafka_NewTopic_t **FromV8TopicObjectArray(v8::Local<v8::Array>);
std::vector<rd_kafka_consumer_group_state_t> FromV8GroupStateArray(
v8::Local<v8::Array>);

// ListGroups: request
std::vector<rd_kafka_consumer_group_type_t> FromV8GroupTypeArray(
v8::Local<v8::Array> array);

// ListGroups: response
v8::Local<v8::Object> FromListConsumerGroupsResult(
const rd_kafka_ListConsumerGroups_result_t *);
Expand Down
5 changes: 5 additions & 0 deletions src/workers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1317,11 +1317,15 @@ void AdminClientCreatePartitions::HandleErrorCallback() {
AdminClientListGroups::AdminClientListGroups(
Nan::Callback* callback, AdminClient* client, bool is_match_states_set,
std::vector<rd_kafka_consumer_group_state_t>& match_states,
bool is_match_types_set,
std::vector<rd_kafka_consumer_group_type_t>& match_types,
const int& timeout_ms)
: ErrorAwareWorker(callback),
m_client(client),
m_is_match_states_set(is_match_states_set),
m_match_states(match_states),
m_is_match_types_set(is_match_types_set),
m_match_types(match_types),
m_timeout_ms(timeout_ms) {}

AdminClientListGroups::~AdminClientListGroups() {
Expand All @@ -1332,6 +1336,7 @@ AdminClientListGroups::~AdminClientListGroups() {

void AdminClientListGroups::Execute() {
Baton b = m_client->ListGroups(m_is_match_states_set, m_match_states,
m_is_match_types_set, m_match_types,
m_timeout_ms, &m_event_response);
if (b.err() != RdKafka::ERR_NO_ERROR) {
SetErrorBaton(b);
Expand Down
4 changes: 4 additions & 0 deletions src/workers.h
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,8 @@ class AdminClientListGroups : public ErrorAwareWorker {
public:
AdminClientListGroups(Nan::Callback *, NodeKafka::AdminClient *, bool,
std::vector<rd_kafka_consumer_group_state_t> &,
bool,
std::vector<rd_kafka_consumer_group_type_t> &,
const int &);
~AdminClientListGroups();

Expand All @@ -543,6 +545,8 @@ class AdminClientListGroups : public ErrorAwareWorker {
NodeKafka::AdminClient *m_client;
const bool m_is_match_states_set;
std::vector<rd_kafka_consumer_group_state_t> m_match_states;
const bool m_is_match_types_set;
std::vector<rd_kafka_consumer_group_type_t> m_match_types;
const int m_timeout_ms;
rd_kafka_event_t *m_event_response;
};
Expand Down
10 changes: 8 additions & 2 deletions test/promisified/admin/list_groups.spec.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
jest.setTimeout(30000);

const {
testConsumerGroupProtocolClassic,
createConsumer,
secureRandom,
createTopic,
waitFor,
createAdmin,
} = require('../testhelpers');
const { ConsumerGroupStates, ErrorCodes } = require('../../../lib').KafkaJS;
const { ConsumerGroupStates, ConsumerGroupTypes, ErrorCodes } = require('../../../lib').KafkaJS;

describe('Admin > listGroups', () => {
let topicName, groupId, consumer, admin;
Expand All @@ -19,6 +20,7 @@ describe('Admin > listGroups', () => {
consumer = createConsumer({
groupId,
fromBeginning: true,
autoCommit: true,
});

await createTopic({ topic: topicName, partitions: 2 });
Expand Down Expand Up @@ -46,10 +48,12 @@ describe('Admin > listGroups', () => {
await consumer.run({ eachMessage: async () => {} });

await waitFor(() => consumer.assignment().length > 0, () => null, 1000);
const groupType = testConsumerGroupProtocolClassic() ? ConsumerGroupTypes.CLASSIC : ConsumerGroupTypes.CONSUMER;

await admin.connect();
let listGroupsResult = await admin.listGroups({
matchConsumerGroupStates: undefined,
matchConsumerGroupStates: [ConsumerGroupStates.STABLE,],
matchConsumerGroupTypes: [groupType,],
});
expect(listGroupsResult.errors).toEqual([]);
expect(listGroupsResult.groups).toEqual(
Expand All @@ -59,6 +63,7 @@ describe('Admin > listGroups', () => {
isSimpleConsumerGroup: false,
protocolType: 'consumer',
state: ConsumerGroupStates.STABLE,
type: groupType,
}),
])
);
Expand All @@ -76,6 +81,7 @@ describe('Admin > listGroups', () => {
isSimpleConsumerGroup: false,
protocolType: 'consumer',
state: ConsumerGroupStates.EMPTY,
type: groupType,
}),
])
);
Expand Down
7 changes: 5 additions & 2 deletions types/kafkajs.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import {
Node,
AclOperationTypes,
Uuid,
IsolationLevel
IsolationLevel,
ConsumerGroupTypes
} from './rdkafka'

import {
Expand All @@ -34,6 +35,7 @@ export {
AclOperationTypes,
Uuid,
IsolationLevel,
ConsumerGroupTypes
} from './rdkafka'

export interface OauthbearerProviderResponse {
Expand Down Expand Up @@ -413,7 +415,8 @@ export type Admin = {
listTopics(options?: { timeout?: number }): Promise<string[]>
listGroups(options?: {
timeout?: number,
matchConsumerGroupStates?: ConsumerGroupStates[]
matchConsumerGroupStates?: ConsumerGroupStates[],
matchConsumerGroupTypes?: ConsumerGroupTypes[]
}): Promise<{ groups: GroupOverview[], errors: LibrdKafkaError[] }>
describeGroups(
groups: string[],
Expand Down
3 changes: 2 additions & 1 deletion types/rdkafka.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ export interface GroupOverview {
protocolType: string;
isSimpleConsumerGroup: boolean;
state: ConsumerGroupStates;
type: ConsumerGroupTypes;
}

export enum AclOperationTypes {
Expand Down Expand Up @@ -504,7 +505,7 @@ export interface IAdminClient {
listTopics(options?: { timeout?: number }, cb?: (err: LibrdKafkaError, topics: string[]) => any): void;

listGroups(cb?: (err: LibrdKafkaError, result: { groups: GroupOverview[], errors: LibrdKafkaError[] }) => any): void;
listGroups(options?: { timeout?: number, matchConsumerGroupStates?: ConsumerGroupStates[] },
listGroups(options?: { timeout?: number, matchConsumerGroupStates?: ConsumerGroupStates[], matchConsumerGroupTypes?: ConsumerGroupTypes[] },
cb?: (err: LibrdKafkaError, result: { groups: GroupOverview[], errors: LibrdKafkaError[] }) => any): void;

describeGroups(groupIds: string[], cb?: (err: LibrdKafkaError, result: GroupDescriptions) => any): void;
Expand Down