Skip to content

Improvements to target (i) memcached servers that support the binary protocol over UDP, and (ii) resource-contrained memcached servers (supporting only short keys); and to evaluate the latency of each transaction. #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
21 changes: 18 additions & 3 deletions client.cpp
Original file line number Diff line number Diff line change
@@ -275,7 +275,7 @@ int client::connect(void)
evbuffer_drain(m_write_buf, evbuffer_get_length(m_write_buf));

if (m_unix_sockaddr != NULL) {
m_sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
m_sockfd = socket(AF_UNIX, m_config->use_udp ? SOCK_DGRAM : SOCK_STREAM, 0);
if (m_sockfd < 0) {
return -errno;
}
@@ -300,8 +300,10 @@ int client::connect(void)
error = setsockopt(m_sockfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
assert(error == 0);

error = setsockopt(m_sockfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
assert(error == 0);
if (!m_config->use_udp) {
error = setsockopt(m_sockfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
assert(error == 0);
}
}

// set non-blcoking behavior
@@ -555,6 +557,7 @@ void client::create_request(void)
assert(key != NULL);
assert(keylen > 0);

keylen = 6; // FIXME const
benchmark_debug_log("GET key=[%.*s]\n", keylen, key);
cmd_size = m_protocol->write_command_get(key, keylen, m_config->data_offset);

@@ -618,13 +621,25 @@ void client::handle_response(request *request, protocol_response *response)
{
switch (request->m_type) {
case rt_get:
if (m_config->transaction_latency) {
// NOTE using printf adds latency to the client because of the system call, but we're measuring transaction latency, not throughput.
// FIXME might be preferable to print to some user-specified file, rather than to stdout
printf("GET %lu\n", ts_diff_now(request->m_sent_time));
}

m_stats.update_get_op(NULL,
request->m_size + response->get_total_len(),
ts_diff_now(request->m_sent_time),
response->get_hits(),
request->m_keys - response->get_hits());
break;
case rt_set:
if (m_config->transaction_latency) {
// NOTE using printf adds latency to the client because of the system call, but we're measuring transaction latency, not throughput.
// FIXME might be preferable to print to some user-specified file, rather than to stdout
printf("SET %lu\n", ts_diff_now(request->m_sent_time));
}

m_stats.update_set_op(NULL,
request->m_size + response->get_total_len(),
ts_diff_now(request->m_sent_time));
6 changes: 3 additions & 3 deletions config_types.cpp
Original file line number Diff line number Diff line change
@@ -187,8 +187,8 @@ const char* config_weight_list::print(char *buf, int buf_len)
}


server_addr::server_addr(const char *hostname, int port) :
m_hostname(hostname), m_port(port), m_server_addr(NULL), m_used_addr(NULL), m_last_error(0)
server_addr::server_addr(const char *hostname, int port, transport_protocol protocol) :
m_hostname(hostname), m_port(port), m_protocol(protocol), m_server_addr(NULL), m_used_addr(NULL), m_last_error(0)
{
int error = resolve();

@@ -215,7 +215,7 @@ int server_addr::resolve(void)

memset(&hints, 0, sizeof(hints));
hints.ai_flags = AI_PASSIVE;
hints.ai_socktype = SOCK_STREAM;
hints.ai_socktype = m_protocol;
hints.ai_family = AF_INET; // Don't play with IPv6 for now...

snprintf(port_str, sizeof(port_str)-1, "%u", m_port);
4 changes: 3 additions & 1 deletion config_types.h
Original file line number Diff line number Diff line change
@@ -77,7 +77,8 @@ struct connect_info {
};

struct server_addr {
server_addr(const char *hostname, int port);
enum transport_protocol {TCP=SOCK_STREAM, UDP=SOCK_DGRAM};
server_addr(const char *hostname, int port, transport_protocol proto);
virtual ~server_addr();

int get_connect_info(struct connect_info *ci);
@@ -88,6 +89,7 @@ struct server_addr {

std::string m_hostname;
int m_port;
int m_protocol;
struct addrinfo *m_server_addr;
struct addrinfo *m_used_addr;
int m_last_error;
16 changes: 16 additions & 0 deletions libmemcached_protocol/binary.h
Original file line number Diff line number Diff line change
@@ -179,6 +179,22 @@ extern "C"
PROTOCOL_BINARY_RAW_BYTES = 0x00
} protocol_binary_datatypes;

/**
* Definition of the extra header that is used when using the
* memcached binary protocol over UDP. This header lies between
* the UDP header and the memcached datagram.
* See https://github.com/memcached/memcached/blob/master/doc/protocol.txt#L922
*/
typedef union {
struct {
uint16_t request_id;
uint16_t sequence_no;
uint16_t total_datagrams;
uint16_t reserved; // Must be set to 0
} header;
uint8_t bytes[8];
} protocol_binary_udp_header;

/**
* Definition of the header structure for a request packet.
* See section 2
47 changes: 40 additions & 7 deletions memtier_benchmark.cpp
Original file line number Diff line number Diff line change
@@ -119,7 +119,10 @@ static void config_print(FILE *file, struct benchmark_config *cfg)
"multi_key_get = %u\n"
"authenticate = %s\n"
"select-db = %d\n"
"no-expiry = %s\n",
"no-expiry = %s\n"
"transaction_latency = %s\n"
"key-width = %u\n"
"udp = %s\n",
cfg->server,
cfg->port,
cfg->unix_socket,
@@ -155,7 +158,10 @@ static void config_print(FILE *file, struct benchmark_config *cfg)
cfg->multi_key_get,
cfg->authenticate ? cfg->authenticate : "",
cfg->select_db,
cfg->no_expiry ? "yes" : "no");
cfg->no_expiry ? "yes" : "no",
cfg->transaction_latency ? "yes" : "no",
cfg->key_width,
cfg->use_udp ? "yes" : "no");
}

static void config_init_defaults(struct benchmark_config *cfg)
@@ -196,6 +202,8 @@ static void config_init_defaults(struct benchmark_config *cfg)
}
if (!cfg->requests && !cfg->test_time)
cfg->requests = 10000;
if (!cfg->key_width)
cfg->key_width = OBJECT_GENERATOR_KEY_WIDTH;
}

static int generate_random_seed()
@@ -241,7 +249,10 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
o_generate_keys,
o_multi_key_get,
o_select_db,
o_no_expiry
o_no_expiry,
o_transaction_latency,
o_key_width,
o_use_udp,
};

static struct option long_options[] = {
@@ -287,6 +298,9 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
{ "no-expiry", 0, 0, o_no_expiry },
{ "help", 0, 0, 'h' },
{ "version", 0, 0, 'v' },
{ "transaction_latency", 0, 0, o_transaction_latency },
{ "key-width", 1, 0, o_key_width },
{ "udp", 0, 0, o_use_udp },
{ NULL, 0, 0, 0 }
};

@@ -553,6 +567,22 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
case o_no_expiry:
cfg->no_expiry = true;
break;
case o_transaction_latency:
cfg->transaction_latency = true;
break;
case o_key_width:
endptr = NULL;
cfg->key_width = (unsigned short) strtoul(optarg, &endptr, 10);
if (!cfg->key_width || cfg->key_width > OBJECT_GENERATOR_KEY_WIDTH || !endptr || *endptr != '\0') {
fprintf(stderr, "error: key-width must be a number in the range [1-%u].\n", OBJECT_GENERATOR_KEY_WIDTH);
return -1;
}
break;
case o_use_udp:
//FIXME check that key and value size can fit in a datagram
fprintf(stderr, "Warning: generating traffic over UDP is still experimental. Check that the size of requests can fit in a UDP datagram.");
cfg->use_udp = true;
break;
default:
return -1;
break;
@@ -570,6 +600,7 @@ void usage() {
" -s, --server=ADDR Server address (default: localhost)\n"
" -p, --port=PORT Server port (default: 6379)\n"
" -S, --unix-socket=SOCKET UNIX Domain socket name (default: none)\n"
" --udp Connect using UDP rather than TCP (default: false)\n"
" -P, --protocol=PROTOCOL Protocol to use (default: redis). Other\n"
" supported protocols are memcache_text,\n"
" memcache_binary.\n"
@@ -597,6 +628,7 @@ void usage() {
" --select-db=DB DB number to select, when testing a redis server\n"
" --distinct-client-seed Use a different random seed for each client\n"
" --randomize random seed based on timestamp (default is constant value)\n"
" --transaction_latency Measure and report the latency of each transaction\n"
"\n"
"Object Options:\n"
" -d --data-size=SIZE Object data size (default: 32)\n"
@@ -619,6 +651,7 @@ void usage() {
" --no-expiry Ignore expiry information in imported data\n"
"\n"
"Key Options:\n"
" --key-width=NUMBER Maximum key size (default: \"250\")\n"
" --key-prefix=PREFIX Prefix for keys (default: \"memtier-\")\n"
" --key-minimum=NUMBER Key ID minimum value (default: 0)\n"
" --key-maximum=NUMBER Key ID maximum value (default: 10000000)\n"
@@ -651,7 +684,7 @@ struct cg_thread {
cg_thread(unsigned int id, benchmark_config* config, object_generator* obj_gen) :
m_thread_id(id), m_config(config), m_obj_gen(obj_gen), m_cg(NULL), m_protocol(NULL), m_finished(false)
{
m_protocol = protocol_factory(m_config->protocol);
m_protocol = protocol_factory(m_config->protocol, m_config->use_udp);
assert(m_protocol != NULL);

m_cg = new client_group(m_config, m_protocol, m_obj_gen);
@@ -868,7 +901,7 @@ int main(int argc, char *argv[])

if (cfg.server != NULL && cfg.port > 0) {
try {
cfg.server_addr = new server_addr(cfg.server, cfg.port);
cfg.server_addr = new server_addr(cfg.server, cfg.port, cfg.use_udp ? server_addr::UDP : server_addr::TCP);
} catch (std::runtime_error& e) {
benchmark_error_log("%s:%u: error: %s\n",
cfg.server, cfg.port, e.what());
@@ -903,7 +936,7 @@ int main(int argc, char *argv[])
exit(1);
}

obj_gen = new object_generator();
obj_gen = new object_generator(cfg.key_width);
assert(obj_gen != NULL);
} else {
// check paramters
@@ -1094,7 +1127,7 @@ int main(int argc, char *argv[])
// If needed, data verification is done now...
if (cfg.data_verify) {
struct event_base *verify_event_base = event_base_new();
abstract_protocol *verify_protocol = protocol_factory(cfg.protocol);
abstract_protocol *verify_protocol = protocol_factory(cfg.protocol, cfg.use_udp);
verify_client *client = new verify_client(verify_event_base, &cfg, verify_protocol, obj_gen);

fprintf(outfile, "\n\nPerforming data verification...\n");
3 changes: 3 additions & 0 deletions memtier_benchmark.h
Original file line number Diff line number Diff line change
@@ -75,6 +75,9 @@ struct benchmark_config {
int select_db;
bool no_expiry;
bool resolve_on_connect;
bool transaction_latency;
unsigned short key_width;
bool use_udp;
};


45 changes: 42 additions & 3 deletions obj_gen.cpp
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
#include <unistd.h>
#include <fcntl.h>
#include <math.h>
#include <stdexcept>

#ifdef HAVE_ASSERT_H
#include <assert.h>
@@ -117,6 +118,7 @@ int gaussian_noise::gaussian_distribution_range(double stddev, double median, in
}

object_generator::object_generator() :
m_key_width(OBJECT_GENERATOR_KEY_WIDTH),
m_data_size_type(data_size_unknown),
m_data_size_pattern(NULL),
m_random_data(false),
@@ -136,7 +138,31 @@ object_generator::object_generator() :
m_data_size.size_list = NULL;
}

object_generator::object_generator(unsigned int key_width) :
m_key_width(key_width),
m_data_size_type(data_size_unknown),
m_data_size_pattern(NULL),
m_random_data(false),
m_expiry_min(0),
m_expiry_max(0),
m_key_prefix(NULL),
m_key_min(0),
m_key_max(0),
m_key_stddev(0),
m_key_median(0),
m_value_buffer(NULL),
m_random_fd(-1)
{
assert(m_key_width <= OBJECT_GENERATOR_KEY_WIDTH);

for (int i = 0; i < OBJECT_GENERATOR_KEY_ITERATORS; i++)
m_next_key[i] = 0;

m_data_size.size_list = NULL;
}

object_generator::object_generator(const object_generator& copy) :
m_key_width(copy.m_key_width),
m_data_size_type(copy.m_data_size_type),
m_data_size(copy.m_data_size),
m_data_size_pattern(copy.m_data_size_pattern),
@@ -150,7 +176,6 @@ object_generator::object_generator(const object_generator& copy) :
m_key_median(copy.m_key_median),
m_value_buffer(NULL),
m_random_fd(-1)

{
if (m_data_size_type == data_size_weighted &&
m_data_size.size_list != NULL) {
@@ -304,15 +329,29 @@ void object_generator::set_expiry_range(unsigned int expiry_min, unsigned int ex
m_expiry_max = expiry_max;
}

void object_generator::check_key_size()
{
unsigned int width_of_key_prefix = m_key_prefix == NULL ? 0 : strlen(m_key_prefix);
unsigned int width_of_key_max = (unsigned)log10((double)m_key_max) + 1;
if (width_of_key_prefix + width_of_key_max > m_key_width) {
char str [200];
sprintf(str, "Key prefix '%s' (length %u) exceeds maximum key width (%u) when combined with the maximum key index (%u, length %u)", m_key_prefix, width_of_key_prefix, m_key_width, m_key_max, width_of_key_max);
throw std::logic_error(str);
}
}

void object_generator::set_key_prefix(const char *key_prefix)
{
m_key_prefix = key_prefix;
check_key_size();
}

void object_generator::set_key_range(unsigned int key_min, unsigned int key_max)
{
assert (key_min <= key_max);
m_key_min = key_min;
m_key_max = key_max;
check_key_size();
}

void object_generator::set_key_distribution(double key_stddev, double key_median)
@@ -360,9 +399,9 @@ const char* object_generator::get_key(int iter, unsigned int *len)
{
unsigned int l;
m_key_index = get_key_index(iter);

// format key
l = snprintf(m_key_buffer, sizeof(m_key_buffer)-1,
l = snprintf(m_key_buffer, m_key_width - 1,
"%s%u", m_key_prefix, m_key_index);
if (len != NULL) *len = l;

6 changes: 5 additions & 1 deletion obj_gen.h
Original file line number Diff line number Diff line change
@@ -75,10 +75,12 @@ class data_object {
#define OBJECT_GENERATOR_KEY_GET_ITER 0
#define OBJECT_GENERATOR_KEY_RANDOM -1
#define OBJECT_GENERATOR_KEY_GAUSSIAN -2
#define OBJECT_GENERATOR_KEY_WIDTH 250

class object_generator {
public:
enum data_size_type { data_size_unknown, data_size_fixed, data_size_range, data_size_weighted };
const unsigned int m_key_width;
protected:
data_size_type m_data_size_type;
union {
@@ -103,7 +105,7 @@ class object_generator {
unsigned int m_next_key[OBJECT_GENERATOR_KEY_ITERATORS];

unsigned int m_key_index;
char m_key_buffer[250];
char m_key_buffer[OBJECT_GENERATOR_KEY_WIDTH];
char *m_value_buffer;
int m_random_fd;
gaussian_noise m_random;
@@ -116,6 +118,7 @@ class object_generator {
unsigned int get_key_index(int iter);
public:
object_generator();
object_generator(unsigned int key_width);
object_generator(const object_generator& copy);
virtual ~object_generator();
virtual object_generator* clone(void);
@@ -130,6 +133,7 @@ class object_generator {
void set_key_range(unsigned int key_min, unsigned int key_max);
void set_key_distribution(double key_stddev, double key_median);
void set_random_seed(int seed);
void check_key_size();

virtual const char* get_key(int iter, unsigned int *len);
virtual data_object* get_object(int iter);
44 changes: 37 additions & 7 deletions protocol.cpp
Original file line number Diff line number Diff line change
@@ -410,7 +410,7 @@ int memcache_text_protocol::write_command_set(const char *key, int key_len, cons
assert(value != NULL);
assert(value_len > 0);
int size = 0;

size = evbuffer_add_printf(m_write_buf,
"set %.*s 0 %u %u\r\n", key_len, key, expiry, value_len);
evbuffer_add(m_write_buf, value, value_len);
@@ -565,11 +565,12 @@ class memcache_binary_protocol : public abstract_protocol {
response_state m_response_state;
protocol_binary_response_no_extras m_response_hdr;
size_t m_response_len;
bool m_over_udp;

const char* status_text(void);
public:
memcache_binary_protocol() : m_response_state(rs_initial), m_response_len(0) { }
virtual memcache_binary_protocol* clone(void) { return new memcache_binary_protocol(); }
memcache_binary_protocol(bool over_udp) : m_response_state(rs_initial), m_response_len(0), m_over_udp(over_udp) { }
virtual memcache_binary_protocol* clone(void) { return new memcache_binary_protocol(m_over_udp); }
virtual int select_db(int db);
virtual int authenticate(const char *credentials);
virtual int write_command_set(const char *key, int key_len, const char *value, int value_len, int expiry, unsigned int offset);
@@ -620,6 +621,18 @@ int memcache_binary_protocol::authenticate(const char *credentials)
return sizeof(req) + user_len + passwd_len + 2 + sizeof(mechanism) - 1;
}

// FIXME this currently produces a constant value.
int append_binary_udp_header(struct evbuffer* write_buf) {
protocol_binary_udp_header binary_udp_head;
memset(&binary_udp_head, 0, sizeof(binary_udp_head));
binary_udp_head.header.request_id = 0;
binary_udp_head.header.sequence_no = 0;
binary_udp_head.header.total_datagrams = 0x0100; //16-bit "0x01" in big endian.
binary_udp_head.header.reserved = 0;
evbuffer_add(write_buf, &binary_udp_head, sizeof(binary_udp_head));
return sizeof(binary_udp_head);
}

int memcache_binary_protocol::write_command_set(const char *key, int key_len, const char *value, int value_len, int expiry, unsigned int offset)
{
assert(key != NULL);
@@ -638,11 +651,16 @@ int memcache_binary_protocol::write_command_set(const char *key, int key_len, co
req.message.header.request.extlen = sizeof(req.message.body);
req.message.body.expiration = htonl(expiry);

int binary_udp_head_size = 0;
if (m_over_udp) {
binary_udp_head_size = append_binary_udp_header(m_write_buf);
}

evbuffer_add(m_write_buf, &req, sizeof(req));
evbuffer_add(m_write_buf, key, key_len);
evbuffer_add(m_write_buf, value, value_len);

return sizeof(req) + key_len + value_len;
return binary_udp_head_size + sizeof(req) + key_len + value_len;
}

int memcache_binary_protocol::write_command_get(const char *key, int key_len, unsigned int offset)
@@ -660,10 +678,15 @@ int memcache_binary_protocol::write_command_get(const char *key, int key_len, un
req.message.header.request.bodylen = htonl(key_len);
req.message.header.request.extlen = 0;

int binary_udp_head_size = 0;
if (m_over_udp) {
binary_udp_head_size = append_binary_udp_header(m_write_buf);
}

evbuffer_add(m_write_buf, &req, sizeof(req));
evbuffer_add(m_write_buf, key, key_len);

return sizeof(req) + key_len;
return binary_udp_head_size + sizeof(req) + key_len;
}

int memcache_binary_protocol::write_command_multi_get(const keylist *keylist)
@@ -722,6 +745,13 @@ int memcache_binary_protocol::parse_response(void)
if (evbuffer_get_length(m_read_buf) < sizeof(m_response_hdr))
return 0; // no header yet?

if (m_over_udp) {
protocol_binary_udp_header header;
// FIXME we currently do not check the returned header.
ret = evbuffer_remove(m_read_buf, (void *)&header, sizeof(header));
assert(ret == sizeof(header));
}

ret = evbuffer_remove(m_read_buf, (void *)&m_response_hdr, sizeof(m_response_hdr));
assert(ret == sizeof(m_response_hdr));

@@ -800,7 +830,7 @@ int memcache_binary_protocol::parse_response(void)

/////////////////////////////////////////////////////////////////////////

class abstract_protocol *protocol_factory(const char *proto_name)
class abstract_protocol *protocol_factory(const char *proto_name, bool over_udp)
{
assert(proto_name != NULL);

@@ -809,7 +839,7 @@ class abstract_protocol *protocol_factory(const char *proto_name)
} else if (strcmp(proto_name, "memcache_text") == 0) {
return new memcache_text_protocol();
} else if (strcmp(proto_name, "memcache_binary") == 0) {
return new memcache_binary_protocol();
return new memcache_binary_protocol(over_udp);
} else {
benchmark_error_log("Error: unknown protocol '%s'.\n", proto_name);
return NULL;
2 changes: 1 addition & 1 deletion protocol.h
Original file line number Diff line number Diff line change
@@ -102,6 +102,6 @@ class abstract_protocol {
struct protocol_response* get_response(void) { return &m_last_response; }
};

class abstract_protocol *protocol_factory(const char *proto_name);
class abstract_protocol *protocol_factory(const char *proto_name, bool over_udp);

#endif /* _PROTOCOL_H */