Skip to content

Commit

Permalink
Fix unsubscription when
Browse files Browse the repository at this point in the history
there's still no member id
  • Loading branch information
emasab committed Apr 26, 2024
1 parent 6e87253 commit 3e6533b
Showing 1 changed file with 50 additions and 2 deletions.
52 changes: 50 additions & 2 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -5299,12 +5299,61 @@ rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg,
return RD_KAFKA_RESP_ERR_NO_ERROR;
}

/**
* Remove existing topic subscription (KIP 848).
*/
static rd_kafka_resp_err_t
rd_kafka_cgrp_consumer_unsubscribe(rd_kafka_cgrp_t *rkcg,
rd_bool_t leave_group) {

rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNSUBSCRIBE",
"Group \"%.*s\": unsubscribe from current %ssubscription "
"of size %d (leave group=%s, has joined=%s, %s, "
"join-state %s)",
RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
rkcg->rkcg_subscription ? "" : "unset ",
rkcg->rkcg_subscription ? rkcg->rkcg_subscription->cnt : 0,
RD_STR_ToF(leave_group),
RD_STR_ToF(RD_KAFKA_CGRP_HAS_JOINED(rkcg)),
rkcg->rkcg_member_id ? rkcg->rkcg_member_id->str : "n/a",
rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);

rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers,
&rkcg->rkcg_max_poll_interval_tmr, 1 /*lock*/);

if (rkcg->rkcg_subscription) {
rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
rd_kafka_cgrp_subscription_set(rkcg, NULL);
}

/*
* Clean-up group leader duties, if any.
*/
rd_kafka_cgrp_group_leader_reset(rkcg, "unsubscribe");

if (leave_group)
rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE;

/* FIXME: Why are we only revoking if !assignment_lost ? */
if (!rd_kafka_cgrp_assignment_is_lost(rkcg))
rd_kafka_cgrp_revoke_all_rejoin(rkcg, rd_false /*not lost*/,
rd_true /*initiating*/,
"unsubscribe");

rkcg->rkcg_flags &= ~(RD_KAFKA_CGRP_F_SUBSCRIPTION |
RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION);

return RD_KAFKA_RESP_ERR_NO_ERROR;
}

/**
* Remove existing topic subscription.
*/
static rd_kafka_resp_err_t rd_kafka_cgrp_unsubscribe(rd_kafka_cgrp_t *rkcg,
rd_bool_t leave_group) {
if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER) {
return rd_kafka_cgrp_consumer_unsubscribe(rkcg, leave_group);
}

rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNSUBSCRIBE",
"Group \"%.*s\": unsubscribe from current %ssubscription "
Expand All @@ -5326,8 +5375,7 @@ static rd_kafka_resp_err_t rd_kafka_cgrp_unsubscribe(rd_kafka_cgrp_t *rkcg,
rd_kafka_cgrp_subscription_set(rkcg, NULL);
}

if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CLASSIC)
rd_kafka_cgrp_update_subscribed_topics(rkcg, NULL);
rd_kafka_cgrp_update_subscribed_topics(rkcg, NULL);

/*
* Clean-up group leader duties, if any.
Expand Down

0 comments on commit 3e6533b

Please sign in to comment.