diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 397ddf5250..210132fb7f 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -5299,12 +5299,61 @@ rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg, return RD_KAFKA_RESP_ERR_NO_ERROR; } +/** + * Remove existing topic subscription (KIP 848). + */ +static rd_kafka_resp_err_t +rd_kafka_cgrp_consumer_unsubscribe(rd_kafka_cgrp_t *rkcg, + rd_bool_t leave_group) { + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNSUBSCRIBE", + "Group \"%.*s\": unsubscribe from current %ssubscription " + "of size %d (leave group=%s, has joined=%s, %s, " + "join-state %s)", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rkcg->rkcg_subscription ? "" : "unset ", + rkcg->rkcg_subscription ? rkcg->rkcg_subscription->cnt : 0, + RD_STR_ToF(leave_group), + RD_STR_ToF(RD_KAFKA_CGRP_HAS_JOINED(rkcg)), + rkcg->rkcg_member_id ? rkcg->rkcg_member_id->str : "n/a", + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); + + rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers, + &rkcg->rkcg_max_poll_interval_tmr, 1 /*lock*/); + + if (rkcg->rkcg_subscription) { + rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription); + rd_kafka_cgrp_subscription_set(rkcg, NULL); + } + + /* + * Clean-up group leader duties, if any. + */ + rd_kafka_cgrp_group_leader_reset(rkcg, "unsubscribe"); + + if (leave_group) + rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE; + + /* FIXME: Why are we only revoking if !assignment_lost ? */ + if (!rd_kafka_cgrp_assignment_is_lost(rkcg)) + rd_kafka_cgrp_revoke_all_rejoin(rkcg, rd_false /*not lost*/, + rd_true /*initiating*/, + "unsubscribe"); + + rkcg->rkcg_flags &= ~(RD_KAFKA_CGRP_F_SUBSCRIPTION | + RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} /** * Remove existing topic subscription. */ static rd_kafka_resp_err_t rd_kafka_cgrp_unsubscribe(rd_kafka_cgrp_t *rkcg, rd_bool_t leave_group) { + if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER) { + return rd_kafka_cgrp_consumer_unsubscribe(rkcg, leave_group); + } rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNSUBSCRIBE", "Group \"%.*s\": unsubscribe from current %ssubscription " @@ -5326,8 +5375,7 @@ static rd_kafka_resp_err_t rd_kafka_cgrp_unsubscribe(rd_kafka_cgrp_t *rkcg, rd_kafka_cgrp_subscription_set(rkcg, NULL); } - if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CLASSIC) - rd_kafka_cgrp_update_subscribed_topics(rkcg, NULL); + rd_kafka_cgrp_update_subscribed_topics(rkcg, NULL); /* * Clean-up group leader duties, if any.