
Sync producer

Julien Debon edited this page Jan 31, 2020 · 6 revisions

Due to librdkafka's asynchronous nature, it is not trivial to implement a synchronous produce interface in the library that can be used in parallel with the standard asynchronous interface.

The best solution at this time is to provide a sync interface in your application, which fortunately is a trivial thing to do.

Note: The code has a data race; this is just a sample.
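The note does not say where the race is. One plausible reading, stated here as an assumption: delivery report callbacks run on whichever thread calls rd_kafka_poll(), so if several threads use sync_produce() at once, one thread's poll can serve another thread's report and write its err variable while the owner is reading it. A minimal sketch of a result slot that makes this hand-off well defined with C11 atomics follows. The names result_slot_t, slot_init(), slot_complete(), slot_try_get() and SLOT_PENDING are hypothetical and not part of librdkafka.

```c
#include <stdatomic.h>

#define SLOT_PENDING (-12345)  /* hypothetical sentinel: no result yet */

/* Hypothetical result slot shared by the waiting thread and the callback. */
typedef struct {
        atomic_int err;
} result_slot_t;

/* Mark the slot as pending before the message is produced. */
static void slot_init (result_slot_t *slot) {
        atomic_init(&slot->err, SLOT_PENDING);
}

/* Would be called from the delivery report callback, on any polling thread. */
static void slot_complete (result_slot_t *slot, int err) {
        atomic_store_explicit(&slot->err, err, memory_order_release);
}

/* Called by the waiting thread: returns 1 and fills *err once completed,
 * returns 0 while the result is still pending. */
static int slot_try_get (result_slot_t *slot, int *err) {
        int v = atomic_load_explicit(&slot->err, memory_order_acquire);

        if (v == SLOT_PENDING)
                return 0;
        *err = v;
        return 1;
}
```

In such a variant the wrapper would pass a pointer to the slot as the message opaque and loop on slot_try_get() instead of comparing a plain variable.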

Sync produce wrapper

This function produces a message, passes a pointer to a local stack variable as the message opaque, and waits for that variable to change value by calling rd_kafka_poll(), which in turn calls the delivery report callback msg_delivered().

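/* Producer handle, created in my_init_code() */
static rd_kafka_t *rk;

rd_kafka_resp_err_t sync_produce (rd_kafka_topic_t *rkt, int32_t partition,
                                  void *payload, size_t len,
                                  const void *key, size_t keylen) {
        /* Sentinel value meaning "no delivery report received yet" */
        rd_kafka_resp_err_t err = -12345;

        /* &err is the per-message opaque handed to the delivery report callback */
        if (rd_kafka_produce(rkt, partition, 0, payload, len,
                             key, keylen, &err) == -1)
                return rd_kafka_last_error();

        /* Serve delivery reports until the callback overwrites the sentinel */
        while (err == -12345)
                rd_kafka_poll(rk, 1000);

        return err;
}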

Delivery report callback

Simply stores the result of the produce request through the provided errp pointer, which points to the err variable in sync_produce().

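static void msg_delivered (rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
                           void *opaque) {
        /* For produced messages _private holds the per-message opaque
         * passed to rd_kafka_produce(), here the address of err. */
        if (rkmessage->_private) {
                rd_kafka_resp_err_t *errp = (rd_kafka_resp_err_t *)rkmessage->_private;
                *errp = rkmessage->err;
        }
}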
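To see the round trip without a broker, the wrapper and the callback can be mimicked with a small stand-in queue. This is only a sketch of the pattern: every mock_* name below is hypothetical and merely imitates the roles of rd_kafka_produce(), rd_kafka_poll() and the delivery report callback; none of it is librdkafka API.

```c
#include <stddef.h>

enum { RESULT_PENDING = -12345 };  /* same sentinel idea as in sync_produce() */

/* Hypothetical stand-in for a delivery report. */
typedef struct {
        int   err;     /* simulated delivery result, 0 means success */
        void *opaque;  /* per-message opaque given at produce time */
} mock_message_t;

typedef void (mock_dr_cb_t) (const mock_message_t *msg);

/* Hypothetical stand-in for the producer handle. */
typedef struct {
        mock_message_t pending[8];  /* delivery reports not yet served */
        size_t         cnt;
        mock_dr_cb_t  *dr_cb;
} mock_producer_t;

/* Enqueue a message; its delivery report is served by a later poll. */
static int mock_produce (mock_producer_t *p, int simulated_err, void *opaque) {
        if (p->cnt == sizeof(p->pending) / sizeof(p->pending[0]))
                return -1;  /* queue full */
        p->pending[p->cnt].err    = simulated_err;
        p->pending[p->cnt].opaque = opaque;
        p->cnt++;
        return 0;
}

/* Serve all queued delivery reports by invoking the callback. */
static int mock_poll (mock_producer_t *p) {
        int served = 0;

        while (p->cnt > 0) {
                p->cnt--;
                p->dr_cb(&p->pending[p->cnt]);
                served++;
        }
        return served;
}

/* Delivery report callback: copy the result into the caller's variable. */
static void mock_delivered (const mock_message_t *msg) {
        if (msg->opaque)
                *(int *)msg->opaque = msg->err;
}

/* Blocking wrapper: the stack variable is the opaque; poll until it changes. */
static int mock_sync_produce (mock_producer_t *p, int simulated_err) {
        int err = RESULT_PENDING;

        if (mock_produce(p, simulated_err, &err) == -1)
                return -1;

        while (err == RESULT_PENDING)
                mock_poll(p);

        return err;
}
```

The stack variable stays valid for the whole wait because the wrapper does not return until the callback has written to it. That is also why giving up early on a timeout would be unsafe with this design: a late delivery report would write through a dangling pointer.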

Initialization code

The application's initialization code for librdkafka must set up the delivery report callback.

void my_init_code (void) {
        char errstr[512];
        rd_kafka_conf_t *rk_conf = rd_kafka_conf_new();

        /* Set delivery report callback */
        rd_kafka_conf_set_dr_msg_cb(rk_conf, msg_delivered);
        /* Minimize wait-for-larger-batch delay (since there will be no batching) */
        rd_kafka_conf_set(rk_conf, "queue.buffering.max.ms", "1", errstr, sizeof(errstr));
        /* Minimize wait-for-socket delay (otherwise you will lose 100ms per message instead of just the RTT).
         * Note: this property is deprecated and ignored by newer librdkafka releases. */
        rd_kafka_conf_set(rk_conf, "socket.blocking.max.ms", "1", errstr, sizeof(errstr));

        /* rk is the global producer handle that sync_produce() polls */
        rk = rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf, errstr, sizeof(errstr));
        if (!rk)
                ERROR(errstr);

        /* create topics, etc.. */
}