Skip to content

Commit 8801f9b

Browse files
Connection Error Tracking & Auto-Reconnect
1 parent 09052d3 commit 8801f9b

16 files changed

+969
-17
lines changed

.gitignore

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,18 @@ tests/tls/*
3333
*.txt
3434
!/tests/test_requirements.txt
3535
__pycache__
36+
*.csv
37+
*.json
3638

3739
# Code coverage with lcov/gcov
3840
*.gcno
3941
*.gcov
4042
*.gcda
4143
*.info
44+
45+
# redis related
46+
*.rdb
47+
*.aof
48+
appendonlydir/
49+
*.conf
50+

client.cpp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,18 @@ int client_group::create_clients(int num)
622622
}
623623

624624
m_clients.push_back(c);
625+
626+
// Add jitter between connection creation (except for the last connection)
627+
if (i < num - 1 && m_config->thread_conn_start_max_jitter_micros > 0) {
628+
unsigned int jitter_range = m_config->thread_conn_start_max_jitter_micros - m_config->thread_conn_start_min_jitter_micros;
629+
unsigned int jitter_micros = m_config->thread_conn_start_min_jitter_micros;
630+
631+
if (jitter_range > 0) {
632+
jitter_micros += rand() % (jitter_range + 1);
633+
}
634+
635+
usleep(jitter_micros);
636+
}
625637
}
626638

627639
return num;
@@ -646,6 +658,32 @@ void client_group::run(void)
646658
event_base_dispatch(m_base);
647659
}
648660

661+
void client_group::interrupt(void)
662+
{
663+
// Mark all clients as interrupted
664+
set_all_clients_interrupted();
665+
// Break the event loop to stop processing
666+
event_base_loopbreak(m_base);
667+
// Set end time for all clients as close as possible to the loop break
668+
finalize_all_clients();
669+
}
670+
671+
void client_group::finalize_all_clients(void)
672+
{
673+
for (std::vector<client*>::iterator i = m_clients.begin(); i != m_clients.end(); i++) {
674+
client* c = *i;
675+
c->set_end_time();
676+
}
677+
}
678+
679+
void client_group::set_all_clients_interrupted(void)
680+
{
681+
for (std::vector<client*>::iterator i = m_clients.begin(); i != m_clients.end(); i++) {
682+
client* c = *i;
683+
c->get_stats()->set_interrupted(true);
684+
}
685+
}
686+
649687
unsigned long int client_group::get_total_bytes(void)
650688
{
651689
unsigned long int total_bytes = 0;
@@ -688,6 +726,16 @@ unsigned long int client_group::get_duration_usec(void)
688726
return duration;
689727
}
690728

729+
unsigned long int client_group::get_total_connection_errors(void)
730+
{
731+
unsigned long int total_errors = 0;
732+
for (std::vector<client*>::iterator i = m_clients.begin(); i != m_clients.end(); i++) {
733+
total_errors += (*i)->get_stats()->get_total_connection_errors();
734+
}
735+
736+
return total_errors;
737+
}
738+
691739
void client_group::merge_run_stats(run_stats* target)
692740
{
693741
assert(target != NULL);

client.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,18 +210,22 @@ class client_group {
210210
int create_clients(int count);
211211
int prepare(void);
212212
void run(void);
213+
void interrupt(void);
214+
void finalize_all_clients(void);
215+
void set_all_clients_interrupted(void);
213216

214217
void write_client_stats(const char *prefix);
215218

216219
struct event_base *get_event_base(void) { return m_base; }
217220
benchmark_config *get_config(void) { return m_config; }
218221
abstract_protocol* get_protocol(void) { return m_protocol; }
219-
object_generator* get_obj_gen(void) { return m_obj_gen; }
222+
object_generator* get_obj_gen(void) { return m_obj_gen; }
220223

221224
unsigned long int get_total_bytes(void);
222225
unsigned long int get_total_ops(void);
223226
unsigned long int get_total_latency(void);
224227
unsigned long int get_duration_usec(void);
228+
unsigned long int get_total_connection_errors(void);
225229

226230
void merge_run_stats(run_stats* target);
227231
};

cluster_client.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ bool cluster_client::connect_shard_connection(shard_connection* sc, char* addres
209209
memcpy(ci.addr_buf, addr_info->ai_addr, addr_info->ai_addrlen);
210210
ci.ci_addr = (struct sockaddr *) ci.addr_buf;
211211
ci.ci_addrlen = addr_info->ai_addrlen;
212+
212213
freeaddrinfo(addr_info);
213214

214215
// call connect
@@ -497,4 +498,3 @@ void cluster_client::handle_response(unsigned int conn_id, struct timeval timest
497498
// continue with base class
498499
client::handle_response(conn_id, timestamp, request, response);
499500
}
500-

memtier_benchmark.1

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,24 @@ Number of concurrent pipelined requests (default: 1)
128128
\fB\-\-reconnect\-interval\fR=\fI\,NUM\/\fR
129129
Number of requests after which re\-connection is performed
130130
.TP
131+
\fB\-\-reconnect\-on\-error\fR
132+
Enable automatic reconnection on connection errors (default: disabled)
133+
.TP
134+
\fB\-\-max\-reconnect\-attempts\fR=\fI\,NUM\/\fR
135+
Maximum number of reconnection attempts, 0 for unlimited (default: 0)
136+
.TP
137+
\fB\-\-reconnect\-backoff\-factor\fR=\fI\,NUM\/\fR
138+
Backoff factor for reconnection delays, 0 for no backoff (default: 0)
139+
.TP
140+
\fB\-\-connection\-timeout\fR=\fI\,SECS\/\fR
141+
Connection timeout in seconds, 0 to disable (default: 0)
142+
.TP
143+
\fB\-\-thread\-conn\-start\-min\-jitter\-micros\fR=\fI\,NUM\/\fR
144+
Minimum jitter in microseconds between connection creation (default: 0)
145+
.TP
146+
\fB\-\-thread\-conn\-start\-max\-jitter\-micros\fR=\fI\,NUM\/\fR
147+
Maximum jitter in microseconds between connection creation (default: 0)
148+
.TP
131149
\fB\-\-multi\-key\-get\fR=\fI\,NUM\/\fR
132150
Enable multi\-key get commands, up to NUM keys (default: 0)
133151
.TP

0 commit comments

Comments
 (0)