Skip to content

Commit

Permalink
PR feedback 10th round
Browse files Browse the repository at this point in the history
  • Loading branch information
anchitj committed Jul 8, 2024
1 parent bbd316b commit 9616eba
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 12 deletions.
2 changes: 2 additions & 0 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -2107,6 +2107,8 @@ release of librdkafka.
| 50 | DescribeUserScramCredentials | 0 | 0 |
| 51 | AlterUserScramCredentials | 0 | 0 |
| 68 | ConsumerGroupHeartbeat | 0 | 0 |
| 71 | GetTelemetrySubscriptions | 0 | 0 |
| 72 | PushTelemetry | 0 | 0 |

# Recommendations for language binding developers

Expand Down
14 changes: 10 additions & 4 deletions examples/producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>


/* Typical include path would be <librdkafka/rdkafka.h>, but this program
Expand Down Expand Up @@ -84,9 +85,10 @@ int main(int argc, char **argv) {
rd_kafka_t *rk; /* Producer instance handle */
rd_kafka_conf_t *conf; /* Temporary configuration object */
char errstr[512]; /* librdkafka API error reporting buffer */
char buf[512]; /* Message value temporary buffer */
const char *brokers; /* Argument: broker list */
const char *topic; /* Argument: topic to produce to */
char buf[512] =
"sadkjnasdfbjkjknsad"; /* Message value temporary buffer */
const char *brokers; /* Argument: broker list */
const char *topic; /* Argument: topic to produce to */

/*
* Argument validation
Expand Down Expand Up @@ -115,6 +117,9 @@ int main(int argc, char **argv) {
return 1;
}

rd_kafka_conf_set(conf, "debug", "all", NULL, 0);
rd_kafka_conf_set(conf, "enable.metrics.push", "false", NULL, 0);

/* Set the delivery report callback.
* This callback will be called once per message to inform
* the application if delivery succeeded or failed.
Expand Down Expand Up @@ -145,7 +150,7 @@ int main(int argc, char **argv) {
"%% Or just hit enter to only serve delivery reports\n"
"%% Press Ctrl-C or Ctrl-D to exit\n");

while (run && fgets(buf, sizeof(buf), stdin)) {
while (run) {
size_t len = strlen(buf);
rd_kafka_resp_err_t err;

Expand Down Expand Up @@ -229,6 +234,7 @@ int main(int argc, char **argv) {
* delivery report callback served (and any other callbacks
* you register). */
rd_kafka_poll(rk, 0 /*non-blocking*/);
usleep(500000);
}


Expand Down
10 changes: 5 additions & 5 deletions src/rdkafka_telemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ static rd_kafka_broker_t *rd_kafka_get_preferred_broker(rd_kafka_t *rk) {

rd_kafka_dbg(rk, TELEMETRY, "SETBROKER",
"Lost preferred broker, switching to new "
"preferred broker %d\n",
"preferred broker %" PRId32 "\n",
rkb ? rd_kafka_broker_id(rkb) : -1);
}
mtx_unlock(&rk->rk_telemetry.lock);
Expand Down Expand Up @@ -211,7 +211,7 @@ static void rd_kafka_match_requested_metrics(rd_kafka_t *rk) {
}

rd_kafka_dbg(rk, TELEMETRY, "GETSUBSCRIPTIONS",
"Matched metrics: %" PRIdsz,
"Matched metrics: %" PRIusz,
rk->rk_telemetry.matched_metrics_cnt);
}

Expand Down Expand Up @@ -352,16 +352,16 @@ static void rd_kafka_send_push_telemetry(rd_kafka_t *rk,
if (compressed_metrics_payload_size >
(size_t)rk->rk_telemetry.telemetry_max_bytes) {
rd_kafka_log(rk, LOG_WARNING, "TELEMETRY",
"Metrics payload size %" PRIdsz
"Metrics payload size %" PRIusz
" exceeds telemetry_max_bytes %" PRId32
"specified by the broker.",
compressed_metrics_payload_size,
rk->rk_telemetry.telemetry_max_bytes);
}

rd_kafka_dbg(rk, TELEMETRY, "PUSH",
"Sending PushTelemetryRequest with terminating = %d",
terminating);
"Sending PushTelemetryRequest with terminating = %s",
RD_STR_ToF(terminating));
rd_kafka_PushTelemetryRequest(
rkb, &rk->rk_telemetry.client_instance_id,
rk->rk_telemetry.subscription_id, terminating, compression_used,
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_telemetry_decode.c
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ bool unit_test_telemetry(rd_kafka_telemetry_producer_metric_name_t metric_name,
rd_buf_t *rbuf = rd_kafka_telemetry_encode_metrics(rk);
void *metrics_payload = rbuf->rbuf_wpos->seg_p;
size_t metrics_payload_size = rbuf->rbuf_wpos->seg_of;
RD_UT_SAY("metrics_payload_size: %zu", metrics_payload_size);
RD_UT_SAY("metrics_payload_size: %" PRIusz, metrics_payload_size);

RD_UT_ASSERT(metrics_payload_size != 0, "Metrics payload zero");

Expand Down
4 changes: 2 additions & 2 deletions src/rdkafka_telemetry_encode.c
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ rd_buf_t *rd_kafka_telemetry_encode_metrics(rd_kafka_t *rk) {
total_metrics_count);
metric_names = rd_malloc(sizeof(char *) * total_metrics_count);
rd_kafka_dbg(rk, TELEMETRY, "PUSH",
"Total metrics to be encoded count: %ld",
"Total metrics to be encoded count: %" PRIusz,
total_metrics_count);


Expand Down Expand Up @@ -806,7 +806,7 @@ rd_buf_t *rd_kafka_telemetry_encode_metrics(rd_kafka_t *rk) {
goto fail;
}
rd_kafka_dbg(rk, TELEMETRY, "PUSH",
"Push Telemetry metrics encoded, size: %ld",
"Push Telemetry metrics encoded, size: %" PRIusz,
stream.bytes_written);
rd_buf_write(rbuf, NULL, stream.bytes_written);

Expand Down

0 comments on commit 9616eba

Please sign in to comment.