diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 210132fb7f..5927ff9cba 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -835,6 +835,14 @@ rd_kafka_broker_t *rd_kafka_cgrp_get_coord(rd_kafka_cgrp_t *rkcg) { return rkcg->rkcg_coord; } +#define rd_kafka_cgrp_will_leave(rkcg) \ + (rkcg->rkcg_flags & (RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE | \ + RD_KAFKA_CGRP_F_WAIT_LEAVE)) + +#define rd_kafka_cgrp_consumer_will_rejoin(rkcg) \ + (rkcg->rkcg_flags & \ + (RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN | \ + RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE)) /** * @brief cgrp handling of LeaveGroup responses @@ -909,6 +917,62 @@ static void rd_kafka_cgrp_consumer_reset(rd_kafka_cgrp_t *rkcg) { RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE); } +/** + * @brief Sets subscription for \p rkcg to \p rktparlist . + * Creates a metadata cache hint for all non-wildcard topics + * in new subscription. + */ +static void +rd_kafka_cgrp_subscription_set(rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *rktparlist) { + rkcg->rkcg_subscription = rktparlist; + if (rkcg->rkcg_subscription) { + /* Insert all non-wildcard topics in cache immediately. + * Otherwise a manual full metadata request could + * not cache the hinted topic and return an + * UNKNOWN_TOPIC_OR_PART error to the user. See #4589. */ + rd_kafka_metadata_cache_hint_rktparlist( + rkcg->rkcg_rk, rkcg->rkcg_subscription, NULL, + 0 /*dont replace*/); + } +} + +/** + * @brief Apply next subscription in \p rkcg , if set. + */ +static void rd_kafka_cgrp_consumer_apply_next_subscribe(rd_kafka_cgrp_t *rkcg) { + if (rkcg->rkcg_next_subscription) { + if (unlikely(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)) { + rd_kafka_topic_partition_list_destroy( + rkcg->rkcg_next_subscription); + rkcg->rkcg_next_subscription = NULL; + return; + } + + if (rkcg->rkcg_subscription) + rd_kafka_topic_partition_list_destroy( + rkcg->rkcg_subscription); + + rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_SUBSCRIPTION; + + if (rd_kafka_topic_partition_list_regex_cnt( + rkcg->rkcg_next_subscription) > 0) + rkcg->rkcg_flags |= + RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION; + + rkcg->rkcg_consumer_flags |= + RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE | + RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION; + + rd_kafka_cgrp_subscription_set(rkcg, + rkcg->rkcg_next_subscription); + rkcg->rkcg_next_subscription = NULL; + + rd_kafka_cgrp_consumer_expedite_next_heartbeat( + rkcg, "subscription changed"); + } +} + /** * @brief cgrp handling of ConsumerGroupHeartbeat response after leaving group * @param opaque must be the cgrp handle. @@ -952,6 +1016,7 @@ rd_kafka_cgrp_handle_ConsumerGroupHeartbeat_leave(rd_kafka_t *rk, if (ErrorCode != RD_KAFKA_RESP_ERR__DESTROY) { rd_assert(thrd_is_current(rk->rk_thread)); rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_LEAVE; + rd_kafka_cgrp_consumer_apply_next_subscribe(rkcg); rd_kafka_cgrp_try_terminate(rkcg); } @@ -5168,21 +5233,6 @@ rd_kafka_cgrp_calculate_subscribe_revoking_partitions( return revoking; } -static void -rd_kafka_cgrp_subscription_set(rd_kafka_cgrp_t *rkcg, - rd_kafka_topic_partition_list_t *rktparlist) { - rkcg->rkcg_subscription = rktparlist; - if (rkcg->rkcg_subscription) { - /* Insert all non-wildcard topics in cache immediately. - * Otherwise a manual full metadata request could - * not cache the hinted topic and return an - * UNKNOWN_TOPIC_OR_PART error to the user. See #4589. */ - rd_kafka_metadata_cache_hint_rktparlist( - rkcg->rkcg_rk, rkcg->rkcg_subscription, NULL, - 0 /*dont replace*/); - } -} - /** * @brief Handle a new subscription that is modifying an existing subscription * in the COOPERATIVE case. @@ -5321,6 +5371,12 @@ rd_kafka_cgrp_consumer_unsubscribe(rd_kafka_cgrp_t *rkcg, rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers, &rkcg->rkcg_max_poll_interval_tmr, 1 /*lock*/); + if (rkcg->rkcg_next_subscription) { + rd_kafka_topic_partition_list_destroy( + rkcg->rkcg_next_subscription); + rkcg->rkcg_next_subscription = NULL; + } + if (rkcg->rkcg_subscription) { rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription); rd_kafka_cgrp_subscription_set(rkcg, NULL); @@ -5331,14 +5387,13 @@ rd_kafka_cgrp_consumer_unsubscribe(rd_kafka_cgrp_t *rkcg, */ rd_kafka_cgrp_group_leader_reset(rkcg, "unsubscribe"); - if (leave_group) + if (leave_group && !rd_kafka_cgrp_consumer_will_rejoin(rkcg) && + !rd_kafka_cgrp_will_leave(rkcg)) { rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE; - - /* FIXME: Why are we only revoking if !assignment_lost ? */ - if (!rd_kafka_cgrp_assignment_is_lost(rkcg)) rd_kafka_cgrp_revoke_all_rejoin(rkcg, rd_false /*not lost*/, rd_true /*initiating*/, "unsubscribe"); + } rkcg->rkcg_flags &= ~(RD_KAFKA_CGRP_F_SUBSCRIPTION | RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION); @@ -5989,6 +6044,7 @@ void rd_kafka_cgrp_consumer_serve(rd_kafka_cgrp_t *rkcg) { case RD_KAFKA_CGRP_JOIN_STATE_INIT: rkcg->rkcg_consumer_flags &= ~RD_KAFKA_CGRP_CONSUMER_F_WAIT_REJOIN_TO_COMPLETE; + rd_kafka_cgrp_consumer_apply_next_subscribe(rkcg); full_request = rd_true; break; case RD_KAFKA_CGRP_JOIN_STATE_STEADY: @@ -6066,25 +6122,20 @@ rd_kafka_cgrp_consumer_subscribe(rd_kafka_cgrp_t *rkcg, rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION; if (rktparlist) { - if (rkcg->rkcg_subscription) + if (rkcg->rkcg_next_subscription) rd_kafka_topic_partition_list_destroy( - rkcg->rkcg_subscription); - - rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_SUBSCRIPTION; - - if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0) - rkcg->rkcg_flags |= - RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION; - - rkcg->rkcg_consumer_flags |= - RD_KAFKA_CGRP_CONSUMER_F_SUBSCRIBED_ONCE | - RD_KAFKA_CGRP_CONSUMER_F_SEND_NEW_SUBSCRIPTION; - - rd_kafka_cgrp_subscription_set(rkcg, rktparlist); - rd_kafka_cgrp_consumer_expedite_next_heartbeat( - rkcg, "subscription changed"); + rkcg->rkcg_next_subscription); + rkcg->rkcg_next_subscription = rktparlist; + + /* If member is leaving, new subscription + * will be applied after the leave + * ConsumerGroupHeartbeat */ + if (!rd_kafka_cgrp_will_leave(rkcg)) { + rd_kafka_cgrp_consumer_apply_next_subscribe(rkcg); + } } else { - rd_kafka_cgrp_unsubscribe(rkcg, rd_true /*leave group*/); + rd_kafka_cgrp_consumer_unsubscribe(rkcg, + rd_true /*leave group*/); } return RD_KAFKA_RESP_ERR_NO_ERROR;