diff --git a/CHANGELOG.md b/CHANGELOG.md index 2de9f83148..0ee0810507 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,10 @@ librdkafka v2.5.0 is a feature release. -* Fix segfault when using long client id because of erased segment when using flexver. (#4689) + * Update bundled lz4 (#4726) + * Fix segfault when using long client id because of erased segment when using flexver. (#4689) + * Fix for a loop of ListOffset requests, happening in a Fetch From Follower + scenario, if such request is made to the follower (#4616, @kphelps). ## Enhancements @@ -14,6 +17,15 @@ librdkafka v2.5.0 is a feature release. ## Fixes +### Consumer fixes + + * Issues: #4616 + When an out of range on a follower caused an offset reset, the corresponding + ListOffsets request is made to the follower, causing a repeated + "Not leader for partition" error. Fixed by sending the request always + to the leader. + Happening since 1.5.0 (tested version) or previous ones (#4616, @kphelps). + ### General fixes * Issues: [confluentinc/confluent-kafka-dotnet#2084](https://github.com/confluentinc/confluent-kafka-dotnet/issues/2084) diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 49e6f76e6f..ac9d2a5f39 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -1351,7 +1351,7 @@ static void rd_kafka_toppar_handle_Offset(rd_kafka_t *rk, rd_kafka_toppar_lock(rktp); /* Drop reply from previous partition leader */ - if (err != RD_KAFKA_RESP_ERR__DESTROY && rktp->rktp_broker != rkb) + if (err != RD_KAFKA_RESP_ERR__DESTROY && rktp->rktp_leader != rkb) err = RD_KAFKA_RESP_ERR__OUTDATED; rd_kafka_toppar_unlock(rktp); @@ -1541,7 +1541,7 @@ void rd_kafka_toppar_offset_request(rd_kafka_toppar_t *rktp, rd_kafka_assert(NULL, thrd_is_current(rktp->rktp_rkt->rkt_rk->rk_thread)); - rkb = rktp->rktp_broker; + rkb = rktp->rktp_leader; if (!backoff_ms && (!rkb || rkb->rkb_source == RD_KAFKA_INTERNAL)) backoff_ms = 500; diff --git a/tests/0104-fetch_from_follower_mock.c b/tests/0104-fetch_from_follower_mock.c index 972ff9c518..6586124b06 100644 --- a/tests/0104-fetch_from_follower_mock.c +++ b/tests/0104-fetch_from_follower_mock.c @@ -29,7 +29,6 @@ #include "test.h" - /** * @name Fetch from follower tests using the mock broker. */ @@ -111,6 +110,11 @@ static void do_test_offset_reset(const char *auto_offset_reset) { else test_consumer_poll(auto_offset_reset, c, 0, 1, 0, msgcnt, NULL); + /* send another batch of messages to ensure the consumer isn't stuck */ + test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, 1000, + "bootstrap.servers", bootstraps, NULL); + test_consumer_poll("ASSIGN", c, 0, 1, 0, msgcnt, NULL); + test_consumer_close(c); rd_kafka_destroy(c);