diff --git a/CHANGELOG.md b/CHANGELOG.md index e4dbc55350..016c185896 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,9 +2,11 @@ librdkafka v2.5.0 is a feature release. -* Fix segfault when using long client id because of erased segment when using flexver. (#4689) -* Fix for an idempotent producer error, with a message batch not reconstructed - identically when retried (#4750) + * Fix segfault when using long client id because of erased segment when using flexver. (#4689) + * Fix for an idempotent producer error, with a message batch not reconstructed + identically when retried (#4750) + * Fix to remove fetch queue messages that blocked the destroy of rdkafka + instances (#4724) ## Enhancements @@ -32,6 +34,17 @@ librdkafka v2.5.0 is a feature release. in the batch. Happens since 2.2.0 (#4750). +### Consumer fixes + + * Issues: + Fix to remove fetch queue messages that blocked the destroy of rdkafka + instances. Circular dependencies from a partition fetch queue message to + the same partition blocked the destroy of an instance, that happened + in case the partition was removed from the cluster while it was being + consumed. Solved by purging internal partition queue, after being stopped + and removed, to allow reference count to reach zero and trigger a destroy. + Happening since 2.0.2 (#4724). + # librdkafka v2.4.0 diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index 685cf5bfc6..0893d99119 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -3407,6 +3407,8 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) { : (topic_err ? 
topic_err : RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)); + + rd_kafka_toppar_purge_internal_fetch_queue_maybe(rktp); } rd_kafka_toppar_unlock(rktp); diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 1917991ddd..dbc5c2dc69 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -3462,10 +3462,6 @@ static void rd_kafka_cgrp_partition_add(rd_kafka_cgrp_t *rkcg, */ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp) { - int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0; - rd_kafka_op_t *rko; - rd_kafka_q_t *rkq; - rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", "Group \"%s\": delete %s [%" PRId32 "]", rkcg->rkcg_group_id->str, rktp->rktp_rkt->rkt_topic->str, @@ -3475,54 +3471,7 @@ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg, rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP); rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP; - if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) { - /* Partition is being removed from the cluster and it's stopped, - * so rktp->rktp_fetchq->rkq_fwdq is NULL. 
- * Purge remaining operations in rktp->rktp_fetchq->rkq_q, - * while holding lock, to avoid circular references */ - rkq = rktp->rktp_fetchq; - mtx_lock(&rkq->rkq_lock); - rd_assert(!rkq->rkq_fwdq); - - rko = TAILQ_FIRST(&rkq->rkq_q); - while (rko) { - if (rko->rko_type != RD_KAFKA_OP_BARRIER && - rko->rko_type != RD_KAFKA_OP_FETCH) { - rd_kafka_log( - rkcg->rkcg_rk, LOG_WARNING, "PARTDEL", - "Purging toppar fetch queue buffer op" - "with unexpected type: %s", - rd_kafka_op2str(rko->rko_type)); - } - - if (rko->rko_type == RD_KAFKA_OP_BARRIER) - barrier_cnt++; - else if (rko->rko_type == RD_KAFKA_OP_FETCH) - message_cnt++; - else - other_cnt++; - - rko = TAILQ_NEXT(rko, rko_link); - cnt++; - } - - mtx_unlock(&rkq->rkq_lock); - - if (cnt) { - rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", - "Purge toppar fetch queue buffer " - "containing %d op(s) " - "(%d barrier(s), %d message(s), %d other)" - " to avoid " - "circular references", - cnt, barrier_cnt, message_cnt, other_cnt); - rd_kafka_q_purge(rktp->rktp_fetchq); - } else { - rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", - "Not purging toppar fetch queue buffer." - " No ops present in the buffer."); - } - } + rd_kafka_toppar_purge_internal_fetch_queue_maybe(rktp); rd_kafka_toppar_unlock(rktp); diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 451d06eb08..a84f8c4036 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -1009,7 +1009,71 @@ void rd_kafka_toppar_insert_msgq(rd_kafka_toppar_t *rktp, rd_kafka_toppar_unlock(rktp); } +/** + * @brief Purge internal fetch queue if toppar is stopped + * (RD_KAFKA_TOPPAR_FETCH_STOPPED) and removed from the cluster + * (RD_KAFKA_TOPPAR_F_REMOVE). Will be called from different places as it's + * removed starting from a metadata response and stopped from a rebalance or a + * consumer close. + * + * @remark Avoids circular dependencies from `rktp_fetchq` ops to the same + * toppar that stop destroying a consumer. 
+ * + * @locks rd_kafka_toppar_lock() MUST be held + */ +void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp) { + rd_kafka_q_t *rkq; + rkq = rktp->rktp_fetchq; + mtx_lock(&rkq->rkq_lock); + if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE && + !rktp->rktp_fetchq->rkq_fwdq) { + rd_kafka_op_t *rko; + int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0; + + /* Partition is being removed from the cluster and it's stopped, + * so rktp->rktp_fetchq->rkq_fwdq is NULL. + * Purge remaining operations in rktp->rktp_fetchq->rkq_q, + * while holding lock, to avoid circular references */ + rko = TAILQ_FIRST(&rkq->rkq_q); + while (rko) { + if (rko->rko_type != RD_KAFKA_OP_BARRIER && + rko->rko_type != RD_KAFKA_OP_FETCH) { + rd_kafka_log( + rktp->rktp_rkt->rkt_rk, LOG_WARNING, + "PARTDEL", + "Purging toppar fetch queue buffer op " + "with unexpected type: %s", + rd_kafka_op2str(rko->rko_type)); + } + + if (rko->rko_type == RD_KAFKA_OP_BARRIER) + barrier_cnt++; + else if (rko->rko_type == RD_KAFKA_OP_FETCH) + message_cnt++; + else + other_cnt++; + + rko = TAILQ_NEXT(rko, rko_link); + cnt++; + } + if (cnt) { + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL", + "Purge toppar fetch queue buffer " + "containing %d op(s) " + "(%d barrier(s), %d message(s), %d other)" + " to avoid " + "circular references", + cnt, barrier_cnt, message_cnt, other_cnt); + rd_kafka_q_purge0(rktp->rktp_fetchq, rd_false); + } else { + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL", + "Not purging toppar fetch queue buffer." + " No ops present in the buffer."); + } + } + mtx_unlock(&rkq->rkq_lock); +} /** * Helper method for purging queues when removing a toppar. 
diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h index b74daf8e2f..98ff431769 100644 --- a/src/rdkafka_partition.h +++ b/src/rdkafka_partition.h @@ -648,6 +648,8 @@ void rd_kafka_toppar_offset_request(rd_kafka_toppar_t *rktp, rd_kafka_fetch_pos_t query_pos, int backoff_ms); +void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp); + int rd_kafka_toppar_purge_queues(rd_kafka_toppar_t *rktp, int purge_flags, rd_bool_t include_xmit_msgq);