Skip to content

Commit

Permalink
Group replication monitoring: count transactions behind events
Browse files Browse the repository at this point in the history
  • Loading branch information
Valentin Rakush committed Nov 6, 2019
1 parent b3b8d64 commit 1ffd361
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 45 deletions.
4 changes: 4 additions & 0 deletions include/MySQL_Monitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ class MyGR_monitor_node {
MyGR_monitor_node(char *_a, int _p, int _whg);
~MyGR_monitor_node();
bool add_entry(unsigned long long _st, unsigned long long _ct, long long _tb, bool _pp, bool _ro, char *_error); // return true if status changed

int get_lag_behind_count(int txs_behind);
int get_timeout_count();
};


Expand Down Expand Up @@ -183,6 +186,7 @@ class MySQL_Monitor_State_Data {
int writer_hostgroup; // used only by group replication
bool writer_is_also_reader; // used only by group replication
int max_transactions_behind; // used only by group replication
int max_transactions_behind_count; // used only by group replication
int aws_aurora_max_lag_ms;
int aws_aurora_check_timeout_ms;
bool use_ssl;
Expand Down
1 change: 1 addition & 0 deletions include/MySQL_Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ class MySQL_Threads_Handler
int monitor_groupreplication_healthcheck_interval;
int monitor_groupreplication_healthcheck_timeout;
int monitor_groupreplication_healthcheck_max_timeout_count;
int monitor_groupreplication_max_transactions_behind_count;
int monitor_galera_healthcheck_interval;
int monitor_galera_healthcheck_timeout;
int monitor_galera_healthcheck_max_timeout_count;
Expand Down
2 changes: 1 addition & 1 deletion include/SQLite3_Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class SQLite3_Server {
#endif // TEST_GALERA
#ifdef TEST_GROUPREP
pthread_mutex_t grouprep_mutex;
void populate_grouprep_table(MySQL_Session *sess);
void populate_grouprep_table(MySQL_Session *sess, int txs_behind = 0);
void init_grouprep_ifaces_string(std::string& s);
#endif // TEST_GROUPREP
SQLite3_Server();
Expand Down
2 changes: 2 additions & 0 deletions include/proxysql_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,7 @@ __thread int mysql_thread___monitor_replication_lag_timeout;
__thread int mysql_thread___monitor_groupreplication_healthcheck_interval;
__thread int mysql_thread___monitor_groupreplication_healthcheck_timeout;
__thread int mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count;
__thread int mysql_thread___monitor_groupreplication_max_transactions_behind_count;
__thread int mysql_thread___monitor_galera_healthcheck_interval;
__thread int mysql_thread___monitor_galera_healthcheck_timeout;
__thread int mysql_thread___monitor_galera_healthcheck_max_timeout_count;
Expand Down Expand Up @@ -901,6 +902,7 @@ extern __thread int mysql_thread___monitor_replication_lag_timeout;
extern __thread int mysql_thread___monitor_groupreplication_healthcheck_interval;
extern __thread int mysql_thread___monitor_groupreplication_healthcheck_timeout;
extern __thread int mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count;
extern __thread int mysql_thread___monitor_groupreplication_max_transactions_behind_count;
extern __thread int mysql_thread___monitor_galera_healthcheck_interval;
extern __thread int mysql_thread___monitor_galera_healthcheck_timeout;
extern __thread int mysql_thread___monitor_galera_healthcheck_max_timeout_count;
Expand Down
2 changes: 1 addition & 1 deletion lib/MySQL_HostGroups_Manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1983,7 +1983,7 @@ void MySQL_HostGroups_Manager::generate_mysql_group_replication_hostgroups_table
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
char *query=(char *)"SELECT writer_hostgroup, hostname, port, MAX(use_ssl) use_ssl , writer_is_also_reader , max_transactions_behind FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=reader_hostgroup OR hostgroup_id=offline_hostgroup GROUP BY hostgroup, hostname, port";
char *query=(char *)"SELECT writer_hostgroup, hostname, port, MAX(use_ssl) use_ssl , writer_is_also_reader , max_transactions_behind FROM mysql_servers JOIN mysql_group_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=backup_writer_hostgroup OR hostgroup_id=reader_hostgroup OR hostgroup_id=offline_hostgroup GROUP BY hostgroup_id, hostname, port";
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (resultset) {
if (GloMyMon->Group_Replication_Hosts_resultset) {
Expand Down
131 changes: 93 additions & 38 deletions lib/MySQL_Monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1349,44 +1349,15 @@ void * monitor_group_replication_thread(void *arg) {
}
if (mmsd->mysql_error_msg) {
if (strncasecmp(mmsd->mysql_error_msg, (char *)"timeout", 7) == 0) {
int max_num_timeout = 10;
if (mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count < max_num_timeout) {
max_num_timeout = mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count;
}
unsigned long long start_times[max_num_timeout];
bool timeouts[max_num_timeout];
for (int i=0; i<max_num_timeout; i++) {
start_times[i]=0;
timeouts[i]=false;
}
for (int i=0; i<MyGR_Nentries; i++) {
if (node->last_entries[i].start_time) {
int smallidx = 0;
for (int j=0; j<max_num_timeout; j++) {
if (j!=smallidx) {
if (start_times[j] < start_times[smallidx]) {
smallidx = j;
}
}
}
if (start_times[smallidx] < node->last_entries[i].start_time) {
start_times[smallidx] = node->last_entries[i].start_time;
timeouts[smallidx] = false;
if (node->last_entries[i].error) {
if (strncasecmp(node->last_entries[i].error, (char *)"timeout", 7) == 0) {
timeouts[smallidx] = true;
}
}
}
}
}
for (int i=0; i<max_num_timeout; i++) {
if (timeouts[i]) {
num_timeouts++;
}
}
num_timeouts=node->get_timeout_count();
proxy_warning("%s:%d : group replication health check timeout count %d. Max threshold %d.\n",
mmsd->hostname, mmsd->port, num_timeouts, mmsd->max_transactions_behind_count);
}
}
int lag_counts = 0;
if (read_only) {
lag_counts = node->get_lag_behind_count(mmsd->max_transactions_behind);
}
pthread_mutex_unlock(&GloMyMon->group_replication_mutex);

// NOTE: we update MyHGM outside the mutex group_replication_mutex
Expand All @@ -1397,7 +1368,7 @@ void * monitor_group_replication_thread(void *arg) {
} else {
// it was a timeout. Check if we are having consecutive timeout
if (num_timeouts == mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count) {
proxy_error("Server %s:%d missed %d group replication checks. Number retires %d, Assuming offline\n",
proxy_error("Server %s:%d missed %d group replication checks. Number retries %d, Assuming offline\n",
mmsd->hostname, mmsd->port, num_timeouts, num_timeouts);
MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, mmsd->mysql_error_msg);
} else {
Expand All @@ -1409,7 +1380,7 @@ void * monitor_group_replication_thread(void *arg) {
MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"viable_candidate=NO");
} else {
if (read_only==true) {
if (transactions_behind > mmsd->max_transactions_behind) {
if (lag_counts >= mysql_thread___monitor_groupreplication_max_transactions_behind_count) {
MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"slave is lagging");
} else {
MyHGM->update_group_replication_set_read_only(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"read_only=YES");
Expand Down Expand Up @@ -2716,13 +2687,15 @@ void * MySQL_Monitor::monitor_group_replication() {
// resultset = MyHGM->execute_query(query, &error);
// assert(resultset);
if (Group_Replication_Hosts_resultset==NULL) {
proxy_error("Group replication hosts result set is absent\n");
goto __end_monitor_group_replication_loop;
// }
// if (error) {
// proxy_error("Error on %s : %s\n", query, error);
// goto __end_monitor_read_only_loop;
} else {
if (Group_Replication_Hosts_resultset->rows_count==0) {
proxy_error("Group replication hosts result set is empty\n");
goto __end_monitor_group_replication_loop;
}
int us=100;
Expand All @@ -2738,6 +2711,7 @@ void * MySQL_Monitor::monitor_group_replication() {
mmsd->writer_hostgroup=atoi(r->fields[0]);
mmsd->writer_is_also_reader=atoi(r->fields[4]);
mmsd->max_transactions_behind=atoi(r->fields[5]);
mmsd->max_transactions_behind_count=mysql_thread___monitor_groupreplication_max_transactions_behind_count;
mmsd->mondb=monitordb;
WorkItem* item;
item=new WorkItem(mmsd,monitor_group_replication_thread);
Expand Down Expand Up @@ -3210,6 +3184,85 @@ MyGR_monitor_node::~MyGR_monitor_node() {
}
}

/**
 * @brief Count how many of the most recent checks reported too much lag.
 *
 * Scans last_entries and keeps the N entries with the highest start_time,
 * where N is mysql_thread___monitor_groupreplication_max_transactions_behind_count
 * capped at 10. Entries whose start_time is 0 are skipped.
 *
 * @param txs_behind lag threshold; an entry counts as lagging when its
 *        transactions_behind is strictly greater than this value.
 * @return the number of lagging entries among the selected ones, or 0 when
 *         the configured window is not positive.
 */
int MyGR_monitor_node::get_lag_behind_count(int txs_behind) {
	// Hard upper bound on the window. It is also the size of the scratch
	// arrays, so no variable-length array (a non-standard extension in C++)
	// is required.
	const int max_window = 10;
	int window = mysql_thread___monitor_groupreplication_max_transactions_behind_count;
	if (window > max_window) {
		window = max_window;
	}
	if (window <= 0) {
		// Nothing to inspect. This also prevents indexing slot 0 of an
		// empty window in the selection loop below.
		return 0;
	}
	unsigned long long start_times[max_window];
	bool lags[max_window];
	for (int i=0; i<window; i++) {
		start_times[i]=0;
		lags[i]=false;
	}
	for (int i=0; i<MyGR_Nentries; i++) {
		if (last_entries[i].start_time == 0) {
			continue; // presumably a slot that was never filled
		}
		// locate the slot currently holding the smallest start_time
		int smallidx = 0;
		for (int j=1; j<window; j++) {
			if (start_times[j] < start_times[smallidx]) {
				smallidx = j;
			}
		}
		// replace that slot only if this entry is more recent
		if (start_times[smallidx] < last_entries[i].start_time) {
			start_times[smallidx] = last_entries[i].start_time;
			lags[smallidx] = (last_entries[i].transactions_behind > txs_behind);
		}
	}
	int lag_counts=0;
	for (int i=0; i<window; i++) {
		if (lags[i]) {
			lag_counts++;
		}
	}

	return lag_counts;
}

/**
 * @brief Count how many of the most recent checks ended with a timeout.
 *
 * Scans last_entries and keeps the N entries with the highest start_time,
 * where N is mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count
 * capped at 10. Entries whose start_time is 0 are skipped. An entry counts
 * as a timeout when its error string starts with "timeout" (case-insensitive).
 *
 * @return the number of timed-out entries among the selected ones, or 0 when
 *         the configured window is not positive.
 */
int MyGR_monitor_node::get_timeout_count() {
	// Hard upper bound on the window. It is also the size of the scratch
	// arrays, so no variable-length array (a non-standard extension in C++)
	// is required.
	const int max_window = 10;
	int window = mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count;
	if (window > max_window) {
		window = max_window;
	}
	if (window <= 0) {
		// Nothing to inspect. This also prevents indexing slot 0 of an
		// empty window in the selection loop below.
		return 0;
	}
	unsigned long long start_times[max_window];
	bool timeouts[max_window];
	for (int i=0; i<window; i++) {
		start_times[i]=0;
		timeouts[i]=false;
	}
	for (int i=0; i<MyGR_Nentries; i++) {
		if (last_entries[i].start_time == 0) {
			continue; // presumably a slot that was never filled
		}
		// locate the slot currently holding the smallest start_time
		int smallidx = 0;
		for (int j=1; j<window; j++) {
			if (start_times[j] < start_times[smallidx]) {
				smallidx = j;
			}
		}
		// replace that slot only if this entry is more recent
		if (start_times[smallidx] < last_entries[i].start_time) {
			start_times[smallidx] = last_entries[i].start_time;
			timeouts[smallidx] = (last_entries[i].error != NULL
				&& strncasecmp(last_entries[i].error, "timeout", 7) == 0);
		}
	}
	int num_timeouts = 0;
	for (int i=0; i<window; i++) {
		if (timeouts[i]) {
			num_timeouts++;
		}
	}
	return num_timeouts;
}

// return true if status changed
bool MyGR_monitor_node::add_entry(unsigned long long _st, unsigned long long _ct, long long _tb, bool _pp, bool _ro, char *_error) {
bool ret=false;
Expand Down Expand Up @@ -4694,3 +4747,5 @@ void MySQL_Monitor::evaluate_aws_aurora_results(unsigned int wHG, unsigned int r
}
#endif // TEST_AURORA
}


18 changes: 17 additions & 1 deletion lib/MySQL_Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ static char * mysql_thread_variables_names[]= {
(char *)"monitor_groupreplication_healthcheck_interval",
(char *)"monitor_groupreplication_healthcheck_timeout",
(char *)"monitor_groupreplication_healthcheck_max_timeout_count",
(char *)"monitor_groupreplication_max_transactions_behind_count",
(char *)"monitor_galera_healthcheck_interval",
(char *)"monitor_galera_healthcheck_timeout",
(char *)"monitor_galera_healthcheck_max_timeout_count",
Expand Down Expand Up @@ -400,8 +401,8 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
variables.monitor_replication_lag_timeout=1000;
variables.monitor_groupreplication_healthcheck_interval=5000;
variables.monitor_groupreplication_healthcheck_timeout=800;
variables.monitor_groupreplication_healthcheck_timeout=800;
variables.monitor_groupreplication_healthcheck_max_timeout_count=3;
variables.monitor_groupreplication_max_transactions_behind_count=3;
variables.monitor_galera_healthcheck_interval=5000;
variables.monitor_galera_healthcheck_timeout=800;
variables.monitor_galera_healthcheck_max_timeout_count=3;
Expand Down Expand Up @@ -775,6 +776,7 @@ int MySQL_Threads_Handler::get_variable_int(const char *name) {
if (!strcmp(name,"monitor_groupreplication_healthcheck_interval")) return (int)variables.monitor_groupreplication_healthcheck_interval;
if (!strcmp(name,"monitor_groupreplication_healthcheck_timeout")) return (int)variables.monitor_groupreplication_healthcheck_timeout;
if (!strcmp(name,"monitor_groupreplication_healthcheck_max_timeout_count")) return (int)variables.monitor_groupreplication_healthcheck_max_timeout_count;
if (!strcmp(name,"monitor_groupreplication_max_transactions_behind_count")) return (int)variables.monitor_groupreplication_max_transactions_behind_count;
}
if (b == 'a') {
if (!strcmp(name,"monitor_galera_healthcheck_interval")) return (int)variables.monitor_galera_healthcheck_interval;
Expand Down Expand Up @@ -1176,6 +1178,10 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f
sprintf(intbuf,"%d",variables.monitor_groupreplication_healthcheck_max_timeout_count);
return strdup(intbuf);
}
if (!strcasecmp(name,"monitor_groupreplication_max_transactions_behind_count")) {
sprintf(intbuf,"%d",variables.monitor_groupreplication_max_transactions_behind_count);
return strdup(intbuf);
}
if (!strcasecmp(name,"monitor_galera_healthcheck_interval")) {
sprintf(intbuf,"%d",variables.monitor_galera_healthcheck_interval);
return strdup(intbuf);
Expand Down Expand Up @@ -1733,6 +1739,15 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t
return false;
}
}
if (!strcasecmp(name,"monitor_groupreplication_max_transactions_behind_count")) {
int intv=atoi(value);
if (intv >= 1 && intv <= 10) {
variables.monitor_groupreplication_max_transactions_behind_count=intv;
return true;
} else {
return false;
}
}
if (!strcasecmp(name,"monitor_galera_healthcheck_interval")) {
int intv=atoi(value);
if (intv >= 50 && intv <= 7*24*3600*1000) {
Expand Down Expand Up @@ -4509,6 +4524,7 @@ void MySQL_Thread::refresh_variables() {
mysql_thread___monitor_groupreplication_healthcheck_interval=GloMTH->get_variable_int((char *)"monitor_groupreplication_healthcheck_interval");
mysql_thread___monitor_groupreplication_healthcheck_timeout=GloMTH->get_variable_int((char *)"monitor_groupreplication_healthcheck_timeout");
mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count=GloMTH->get_variable_int((char *)"monitor_groupreplication_healthcheck_max_timeout_count");
mysql_thread___monitor_groupreplication_max_transactions_behind_count=GloMTH->get_variable_int((char *)"monitor_groupreplication_max_transactions_behind_count");
mysql_thread___monitor_galera_healthcheck_interval=GloMTH->get_variable_int((char *)"monitor_galera_healthcheck_interval");
mysql_thread___monitor_galera_healthcheck_timeout=GloMTH->get_variable_int((char *)"monitor_galera_healthcheck_timeout");
mysql_thread___monitor_galera_healthcheck_max_timeout_count=GloMTH->get_variable_int((char *)"monitor_galera_healthcheck_max_timeout_count");
Expand Down
3 changes: 3 additions & 0 deletions lib/ProxySQL_Admin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10548,6 +10548,7 @@ void ProxySQL_Admin::enable_grouprep_testing() {
admindb->execute("INSERT INTO mysql_servers (hostgroup_id, hostname, use_ssl, comment) VALUES (3272, '127.2.1.1', 0, '')");
admindb->execute("INSERT INTO mysql_servers (hostgroup_id, hostname, use_ssl, comment) VALUES (3273, '127.2.1.2', 0, '')");
admindb->execute("INSERT INTO mysql_servers (hostgroup_id, hostname, use_ssl, comment) VALUES (3273, '127.2.1.3', 0, '')");
admindb->execute("DELETE FROM mysql_group_replication_hostgroups");
admindb->execute("INSERT INTO mysql_group_replication_hostgroups "
"(writer_hostgroup,backup_writer_hostgroup,reader_hostgroup,offline_hostgroup,active,max_writers,"
"writer_is_also_reader,max_transactions_behind) VALUES (3272,3274,3273,3271,1,1,1,0);");
Expand All @@ -10558,8 +10559,10 @@ void ProxySQL_Admin::enable_grouprep_testing() {
admindb->execute("UPDATE global_variables SET variable_value=5000 WHERE variable_name='mysql-monitor_groupreplication_healthcheck_interval'");
admindb->execute("UPDATE global_variables SET variable_value=800 WHERE variable_name='mysql-monitor_groupreplication_healthcheck_timeout'");
admindb->execute("UPDATE global_variables SET variable_value=3 WHERE variable_name='mysql-monitor_groupreplication_healthcheck_max_timeout_count'");
admindb->execute("UPDATE global_variables SET variable_value=3 WHERE variable_name='mysql-monitor_groupreplication_max_transactions_behind_count'");
load_mysql_variables_to_runtime();

admindb->execute("DELETE FROM mysql_users WHERE username='grouprep1'");
admindb->execute("INSERT INTO mysql_users (username,password,default_hostgroup) VALUES ('grouprep1','pass1',3272)");
init_users();

Expand Down
13 changes: 9 additions & 4 deletions lib/SQLite3_Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ static char *s_strdup(char *s) {
static int __SQLite3_Server_refresh_interval=1000;
static bool testTimeoutSequence[] = {true, false, true, false, true, false, true, false};
static int testIndex = 7;
static int testLag = 10;

extern Query_Cache *GloQC;
extern MySQL_Authentication *GloMyAuth;
Expand Down Expand Up @@ -516,7 +517,8 @@ void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *p
#ifdef TEST_GROUPREP
if (strstr(query_no_space,(char *)"GR_MEMBER_ROUTING_CANDIDATE_STATUS")) {
pthread_mutex_lock(&GloSQLite3Server->grouprep_mutex);
GloSQLite3Server->populate_grouprep_table(sess);
GloSQLite3Server->populate_grouprep_table(sess, testLag);
if (testLag > 0) testLag--;
}
#endif // TEST_GROUPREP
if (strstr(query_no_space,(char *)"Seconds_Behind_Master")) {
Expand Down Expand Up @@ -1026,16 +1028,19 @@ void SQLite3_Server::populate_aws_aurora_table(MySQL_Session *sess) {
#endif // TEST_AURORA

#ifdef TEST_GROUPREP
void SQLite3_Server::populate_grouprep_table(MySQL_Session *sess) {
void SQLite3_Server::populate_grouprep_table(MySQL_Session *sess, int txs_behind) {
// this function needs to be called with lock on mutex grouprep_mutex already acquired
//
sessdb->execute("DELETE FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS");
string myip = string(sess->client_myds->proxy_addr.addr);
string server_id = myip.substr(8,1);
if (server_id == "1")
sessdb->execute("INSERT INTO GR_MEMBER_ROUTING_CANDIDATE_STATUS (viable_candidate, read_only, transactions_behind) values ('YES', 'NO', 0)");
else
sessdb->execute("INSERT INTO GR_MEMBER_ROUTING_CANDIDATE_STATUS (viable_candidate, read_only, transactions_behind) values ('YES', 'YES', 0)");
else {
std::stringstream ss;
ss << "INSERT INTO GR_MEMBER_ROUTING_CANDIDATE_STATUS (viable_candidate, read_only, transactions_behind) values ('YES', 'YES', " << txs_behind << ")";
sessdb->execute(ss.str().c_str());
}
}
#endif // TEST_GROUPREP

Expand Down

0 comments on commit 1ffd361

Please sign in to comment.