Skip to content

Commit

Permalink
Refactor to move Uuid functions
Browse files Browse the repository at this point in the history
to their own file
  • Loading branch information
emasab committed May 8, 2024
1 parent a4bf487 commit c2d9472
Show file tree
Hide file tree
Showing 10 changed files with 295 additions and 226 deletions.
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ set(
rdkafka_mock_cgrp.c
rdkafka_error.c
rdkafka_fetcher.c
rdkafka_uuid.c
rdlist.c
rdlog.c
rdmurmur2.c
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_txnmgr.c rdkafka_coord.c rdbase64.c \
rdvarint.c rdbuf.c rdmap.c rdunittest.c \
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c \
rdkafka_error.c rdkafka_fetcher.c \
rdkafka_error.c rdkafka_fetcher.c rdkafka_uuid.c \
$(SRCS_y)

HDRS= rdkafka.h rdkafka_mock.h
Expand Down
151 changes: 0 additions & 151 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -5121,154 +5121,3 @@ int rd_kafka_errno(void) {
int rd_kafka_unittest(void) {
return rd_unittest();
}


/**
* Creates a new UUID.
*
* @return A newly allocated UUID.
*/
rd_kafka_Uuid_t *rd_kafka_Uuid_new(int64_t most_significant_bits,
int64_t least_significant_bits) {
rd_kafka_Uuid_t *uuid = rd_calloc(1, sizeof(rd_kafka_Uuid_t));
uuid->most_significant_bits = most_significant_bits;
uuid->least_significant_bits = least_significant_bits;
return uuid;
}

/**
* Returns a newly allocated copy of the given UUID.
*
* @param uuid UUID to copy.
* @return Copy of the provided UUID.
*
* @remark Dynamically allocated. Deallocate (free) after use.
*/
rd_kafka_Uuid_t *rd_kafka_Uuid_copy(const rd_kafka_Uuid_t *uuid) {
rd_kafka_Uuid_t *copy_uuid = rd_kafka_Uuid_new(
uuid->most_significant_bits, uuid->least_significant_bits);
if (*uuid->base64str)
memcpy(copy_uuid->base64str, uuid->base64str, 23);
return copy_uuid;
}

/**
* Returns a new non cryptographically secure UUIDv4 (random).
*
* @return A UUIDv4.
*
* @remark Must be freed after use using rd_kafka_Uuid_destroy().
*/
rd_kafka_Uuid_t rd_kafka_Uuid_random() {
int i;
unsigned char rand_values_bytes[16] = {0};
uint64_t *rand_values_uint64 = (uint64_t *)rand_values_bytes;
unsigned char *rand_values_app;
rd_kafka_Uuid_t ret = RD_KAFKA_UUID_ZERO;
for (i = 0; i < 16; i += 2) {
uint16_t rand_uint16 = (uint16_t)rd_jitter(0, INT16_MAX - 1);
/* No need to convert endianess here because it's still only
* a random value. */
rand_values_app = (unsigned char *)&rand_uint16;
rand_values_bytes[i] |= rand_values_app[0];
rand_values_bytes[i + 1] |= rand_values_app[1];
}

rand_values_bytes[6] &= 0x0f; /* clear version */
rand_values_bytes[6] |= 0x40; /* version 4 */
rand_values_bytes[8] &= 0x3f; /* clear variant */
rand_values_bytes[8] |= 0x80; /* IETF variant */

ret.most_significant_bits = be64toh(rand_values_uint64[0]);
ret.least_significant_bits = be64toh(rand_values_uint64[1]);
return ret;
}

/**
* @brief Destroy the provided uuid.
*
* @param uuid UUID
*/
void rd_kafka_Uuid_destroy(rd_kafka_Uuid_t *uuid) {
rd_free(uuid);
}

/**
* @brief Computes canonical encoding for the given uuid string.
* Mainly useful for testing.
*
* @param uuid UUID for which canonical encoding is required.
*
* @return canonical encoded string for the given UUID.
*
* @remark Must be freed after use.
*/
const char *rd_kafka_Uuid_str(const rd_kafka_Uuid_t *uuid) {
int i, j;
unsigned char bytes[16];
char *ret = rd_calloc(37, sizeof(*ret));

for (i = 0; i < 8; i++) {
#if __BYTE_ORDER == __LITTLE_ENDIAN
j = 7 - i;
#elif __BYTE_ORDER == __BIG_ENDIAN
j = i;
#endif
bytes[i] = (uuid->most_significant_bits >> (8 * j)) & 0xFF;
bytes[8 + i] = (uuid->least_significant_bits >> (8 * j)) & 0xFF;
}

rd_snprintf(ret, 37,
"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%"
"02x%02x%02x",
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5],
bytes[6], bytes[7], bytes[8], bytes[9], bytes[10],
bytes[11], bytes[12], bytes[13], bytes[14], bytes[15]);
return ret;
}

const char *rd_kafka_Uuid_base64str(const rd_kafka_Uuid_t *uuid) {
if (*uuid->base64str)
return uuid->base64str;

rd_chariov_t in_base64;
char *out_base64_str;
char *uuid_bytes;
uint64_t input_uuid[2];

input_uuid[0] = htobe64(uuid->most_significant_bits);
input_uuid[1] = htobe64(uuid->least_significant_bits);
uuid_bytes = (char *)input_uuid;
in_base64.ptr = uuid_bytes;
in_base64.size = sizeof(uuid->most_significant_bits) +
sizeof(uuid->least_significant_bits);

out_base64_str = rd_base64_encode_str(&in_base64);
if (!out_base64_str)
return NULL;

rd_strlcpy((char *)uuid->base64str, out_base64_str,
23 /* Removing extra ('=') padding */);
rd_free(out_base64_str);
return uuid->base64str;
}

unsigned int rd_kafka_Uuid_hash(const rd_kafka_Uuid_t *uuid) {
unsigned char bytes[16];
memcpy(bytes, &uuid->most_significant_bits, 8);
memcpy(&bytes[8], &uuid->least_significant_bits, 8);
return rd_bytes_hash(bytes, 16);
}

unsigned int rd_kafka_Uuid_map_hash(const void *key) {
return rd_kafka_Uuid_hash(key);
}

int64_t rd_kafka_Uuid_least_significant_bits(const rd_kafka_Uuid_t *uuid) {
return uuid->least_significant_bits;
}


int64_t rd_kafka_Uuid_most_significant_bits(const rd_kafka_Uuid_t *uuid) {
return uuid->most_significant_bits;
}
2 changes: 1 addition & 1 deletion src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -2774,7 +2774,7 @@ rd_kafka_cgrp_consumer_assignment_with_metadata(
}
if (missing_topic_ids && *missing_topic_ids)
rd_list_deduplicate(missing_topic_ids,
(void *)rd_kafka_Uuid_ptr_cmp);
(void *)rd_list_Uuid_cmp);
return assignment_with_metadata;
}

Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ typedef struct rd_kafka_fetch_pos_s {



#include "rdkafka_uuid.h"
#include "rdkafka_op.h"
#include "rdkafka_queue.h"
#include "rdkafka_msg.h"
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ rd_kafka_parse_Metadata0(rd_kafka_broker_t *rkb,
missing_topic_ids,
rd_list_remove_cmp(missing_topic_ids,
&mdi->topics[i].topic_id,
(void *)rd_kafka_Uuid_ptr_cmp));
(void *)rd_list_Uuid_cmp));
/* Only update cache when not asking
* for all topics or cache entry
* already exists. */
Expand Down
72 changes: 0 additions & 72 deletions src/rdkafka_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -569,78 +569,6 @@ typedef struct rd_kafka_buf_s rd_kafka_buf_t;
#define RD_KAFKAP_MSGSET_V2_OF_RecordCount \
(8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4)


/**
* @struct Struct representing UUID protocol primitive type.
*/
typedef struct rd_kafka_Uuid_s {
int64_t
most_significant_bits; /**< Most significant 64 bits for the UUID */
int64_t least_significant_bits; /**< Least significant 64 bits for the
UUID */
char base64str[23]; /**< base64 encoding for the uuid. By default, it is
lazy loaded. Use function
`rd_kafka_Uuid_base64str()` as a getter for this
field. */
} rd_kafka_Uuid_t;

#define RD_KAFKA_UUID_ZERO \
(rd_kafka_Uuid_t) { \
0, 0, "" \
}

#define RD_KAFKA_UUID_IS_ZERO(uuid) \
(!rd_kafka_Uuid_cmp(uuid, RD_KAFKA_UUID_ZERO))

#define RD_KAFKA_UUID_METADATA_TOPIC_ID \
(rd_kafka_Uuid_t) { \
0, 1, "" \
}

static RD_INLINE RD_UNUSED int rd_kafka_Uuid_cmp(rd_kafka_Uuid_t a,
rd_kafka_Uuid_t b) {
if (a.most_significant_bits < b.most_significant_bits)
return -1;
if (a.most_significant_bits > b.most_significant_bits)
return 1;
if (a.least_significant_bits < b.least_significant_bits)
return -1;
if (a.least_significant_bits > b.least_significant_bits)
return 1;
return 0;
}

static RD_INLINE RD_UNUSED int rd_kafka_Uuid_ptr_cmp(void *a, void *b) {
rd_kafka_Uuid_t *a_uuid = a, *b_uuid = b;
return rd_kafka_Uuid_cmp(*a_uuid, *b_uuid);
}

rd_kafka_Uuid_t rd_kafka_Uuid_random();

const char *rd_kafka_Uuid_str(const rd_kafka_Uuid_t *uuid);

unsigned int rd_kafka_Uuid_hash(const rd_kafka_Uuid_t *uuid);

unsigned int rd_kafka_Uuid_map_hash(const void *key);

/**
* @brief UUID copier for rd_list_copy()
*/
static RD_UNUSED void *rd_list_Uuid_copy(const void *elem, void *opaque) {
return (void *)rd_kafka_Uuid_copy((rd_kafka_Uuid_t *)elem);
}

static RD_INLINE RD_UNUSED void rd_list_Uuid_destroy(void *uuid) {
rd_kafka_Uuid_destroy((rd_kafka_Uuid_t *)uuid);
}

static RD_INLINE RD_UNUSED int rd_list_Uuid_cmp(const void *uuid1,
const void *uuid2) {
return rd_kafka_Uuid_cmp(*((rd_kafka_Uuid_t *)uuid1),
*((rd_kafka_Uuid_t *)uuid2));
}


/**
* @name Producer ID and Epoch for the Idempotent Producer
* @{
Expand Down
Loading

0 comments on commit c2d9472

Please sign in to comment.