Skip to content

Commit

Permalink
Handle cases with multiple
Browse files Browse the repository at this point in the history
subscribe and unscribe calls or
unsubscribe calls while rejoining
  • Loading branch information
emasab committed May 2, 2024
1 parent 3e6533b commit 4c1ce7b
Showing 1 changed file with 87 additions and 36 deletions.
123 changes: 87 additions & 36 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,14 @@ rd_kafka_broker_t *rd_kafka_cgrp_get_coord(rd_kafka_cgrp_t *rkcg) {
return rkcg->rkcg_coord;
}

#define rd_kafka_cgrp_will_leave(rkcg) \
(rkcg->rkcg_flags & (RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE | \
RD_KAFKA_CGRP_F_WAIT_LEAVE))

#define rd_kafka_cgrp_consumer_will_rejoin(rkcg) \
(rkcg->rkcg_flags & \
(RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN | \
RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE))

/**
* @brief cgrp handling of LeaveGroup responses
Expand Down Expand Up @@ -909,6 +917,62 @@ static void rd_kafka_cgrp_consumer_reset(rd_kafka_cgrp_t *rkcg) {
RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE);
}

/**
* @brief Sets subscription for \p rkcg to \p rktparlist .
* Creates a metadata cache hint for all non-wildcard topics
* in new subscription.
*/
static void
rd_kafka_cgrp_subscription_set(rd_kafka_cgrp_t *rkcg,
rd_kafka_topic_partition_list_t *rktparlist) {
rkcg->rkcg_subscription = rktparlist;
if (rkcg->rkcg_subscription) {
/* Insert all non-wildcard topics in cache immediately.
* Otherwise a manual full metadata request could
* not cache the hinted topic and return an
* UNKNOWN_TOPIC_OR_PART error to the user. See #4589. */
rd_kafka_metadata_cache_hint_rktparlist(
rkcg->rkcg_rk, rkcg->rkcg_subscription, NULL,
0 /*dont replace*/);
}
}

/**
* @brief Apply next subscription in \p rkcg , if set.
*/
static void rd_kafka_cgrp_consumer_apply_next_subscribe(rd_kafka_cgrp_t *rkcg) {
if (rkcg->rkcg_next_subscription) {
if (unlikely(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)) {
rd_kafka_topic_partition_list_destroy(
rkcg->rkcg_next_subscription);
rkcg->rkcg_next_subscription = NULL;
return;
}

if (rkcg->rkcg_subscription)
rd_kafka_topic_partition_list_destroy(
rkcg->rkcg_subscription);

rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_SUBSCRIPTION;

if (rd_kafka_topic_partition_list_regex_cnt(
rkcg->rkcg_next_subscription) > 0)
rkcg->rkcg_flags |=
RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;

rkcg->rkcg_consumer_flags |=
RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE |
RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION;

rd_kafka_cgrp_subscription_set(rkcg,
rkcg->rkcg_next_subscription);
rkcg->rkcg_next_subscription = NULL;

rd_kafka_cgrp_consumer_expedite_next_heartbeat(
rkcg, "subscription changed");
}
}

/**
* @brief cgrp handling of ConsumerGroupHeartbeat response after leaving group
* @param opaque must be the cgrp handle.
Expand Down Expand Up @@ -952,6 +1016,7 @@ rd_kafka_cgrp_handle_ConsumerGroupHeartbeat_leave(rd_kafka_t *rk,
if (ErrorCode != RD_KAFKA_RESP_ERR__DESTROY) {
rd_assert(thrd_is_current(rk->rk_thread));
rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_LEAVE;
rd_kafka_cgrp_consumer_apply_next_subscribe(rkcg);
rd_kafka_cgrp_try_terminate(rkcg);
}

Expand Down Expand Up @@ -5168,21 +5233,6 @@ rd_kafka_cgrp_calculate_subscribe_revoking_partitions(
return revoking;
}

static void
rd_kafka_cgrp_subscription_set(rd_kafka_cgrp_t *rkcg,
rd_kafka_topic_partition_list_t *rktparlist) {
rkcg->rkcg_subscription = rktparlist;
if (rkcg->rkcg_subscription) {
/* Insert all non-wildcard topics in cache immediately.
* Otherwise a manual full metadata request could
* not cache the hinted topic and return an
* UNKNOWN_TOPIC_OR_PART error to the user. See #4589. */
rd_kafka_metadata_cache_hint_rktparlist(
rkcg->rkcg_rk, rkcg->rkcg_subscription, NULL,
0 /*dont replace*/);
}
}

/**
* @brief Handle a new subscription that is modifying an existing subscription
* in the COOPERATIVE case.
Expand Down Expand Up @@ -5321,6 +5371,12 @@ rd_kafka_cgrp_consumer_unsubscribe(rd_kafka_cgrp_t *rkcg,
rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers,
&rkcg->rkcg_max_poll_interval_tmr, 1 /*lock*/);

if (rkcg->rkcg_next_subscription) {
rd_kafka_topic_partition_list_destroy(
rkcg->rkcg_next_subscription);
rkcg->rkcg_next_subscription = NULL;
}

if (rkcg->rkcg_subscription) {
rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
rd_kafka_cgrp_subscription_set(rkcg, NULL);
Expand All @@ -5331,14 +5387,13 @@ rd_kafka_cgrp_consumer_unsubscribe(rd_kafka_cgrp_t *rkcg,
*/
rd_kafka_cgrp_group_leader_reset(rkcg, "unsubscribe");

if (leave_group)
if (leave_group && !rd_kafka_cgrp_consumer_will_rejoin(rkcg) &&
!rd_kafka_cgrp_will_leave(rkcg)) {
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE;

/* FIXME: Why are we only revoking if !assignment_lost ? */
if (!rd_kafka_cgrp_assignment_is_lost(rkcg))
rd_kafka_cgrp_revoke_all_rejoin(rkcg, rd_false /*not lost*/,
rd_true /*initiating*/,
"unsubscribe");
}

rkcg->rkcg_flags &= ~(RD_KAFKA_CGRP_F_SUBSCRIPTION |
RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION);
Expand Down Expand Up @@ -5989,6 +6044,7 @@ void rd_kafka_cgrp_consumer_serve(rd_kafka_cgrp_t *rkcg) {
case RD_KAFKA_CGRP_JOIN_STATE_INIT:
rkcg->rkcg_consumer_flags &=
~RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE;
rd_kafka_cgrp_consumer_apply_next_subscribe(rkcg);
full_request = rd_true;
break;
case RD_KAFKA_CGRP_JOIN_STATE_STEADY:
Expand Down Expand Up @@ -6066,25 +6122,20 @@ rd_kafka_cgrp_consumer_subscribe(rd_kafka_cgrp_t *rkcg,

rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;
if (rktparlist) {
if (rkcg->rkcg_subscription)
if (rkcg->rkcg_next_subscription)
rd_kafka_topic_partition_list_destroy(
rkcg->rkcg_subscription);

rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_SUBSCRIPTION;

if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0)
rkcg->rkcg_flags |=
RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;

rkcg->rkcg_consumer_flags |=
RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE |
RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION;

rd_kafka_cgrp_subscription_set(rkcg, rktparlist);
rd_kafka_cgrp_consumer_expedite_next_heartbeat(
rkcg, "subscription changed");
rkcg->rkcg_next_subscription);
rkcg->rkcg_next_subscription = rktparlist;

/* If member is leaving, new subscription
* will be applied after the leave
* ConsumerGroupHeartbeat */
if (!rd_kafka_cgrp_will_leave(rkcg)) {
rd_kafka_cgrp_consumer_apply_next_subscribe(rkcg);
}
} else {
rd_kafka_cgrp_unsubscribe(rkcg, rd_true /*leave group*/);
rd_kafka_cgrp_consumer_unsubscribe(rkcg,
rd_true /*leave group*/);
}

return RD_KAFKA_RESP_ERR_NO_ERROR;
Expand Down

0 comments on commit 4c1ce7b

Please sign in to comment.